use crate::{
HammerworkError,
events::{EventFilter, EventManager, EventSubscription, JobLifecycleEvent},
};
#[cfg(feature = "kafka")]
use rdkafka::{
ClientConfig,
message::{Header, OwnedHeaders},
producer::{FutureProducer, FutureRecord, Producer},
};
#[cfg(feature = "google-pubsub")]
use google_cloud_pubsub::{
client::{Client, ClientConfig as PubSubClientConfig},
publisher::Publisher,
};
#[cfg(feature = "google-pubsub")]
use google_cloud_pubsub::client::google_cloud_auth::credentials::CredentialsFile;
#[cfg(feature = "google-pubsub")]
use google_cloud_googleapis::pubsub::v1::PubsubMessage;
#[cfg(feature = "kinesis")]
use aws_sdk_kinesis::{Client as KinesisClient, config::Region};
#[cfg(feature = "kinesis")]
use aws_config::BehaviorVersion;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::{collections::HashMap, sync::Arc, time::Duration};
use tokio::sync::{RwLock, Semaphore};
use uuid::Uuid;
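/// Serde adapter that round-trips a `Uuid` as its hyphenated string form,
/// for use with `#[serde(with = "uuid_string")]` on the ID fields below.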
mod uuid_string {
use serde::{Deserialize, Deserializer, Serializer};
use uuid::Uuid;
pub fn serialize<S>(uuid: &Uuid, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
serializer.serialize_str(&uuid.to_string())
}
pub fn deserialize<'de, D>(deserializer: D) -> Result<Uuid, D::Error>
where
D: Deserializer<'de>,
{
use serde::de::Error;
let s = String::deserialize(deserializer)?;
Uuid::parse_str(&s).map_err(D::Error::custom)
}
}
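/// Configuration for one outbound event stream: which events to forward
/// (`filter`), where to send them (`backend`), and how deliveries are
/// keyed, encoded, buffered, and retried.
///
/// A minimal sketch using the builder methods defined at the bottom of
/// this module (the backend values are illustrative):
///
/// ```rust,ignore
/// let stream = StreamConfig::new(
///     "audit".to_string(),
///     StreamBackend::Kafka {
///         brokers: vec!["localhost:9092".to_string()],
///         topic: "hammerwork-events".to_string(),
///         config: HashMap::new(),
///     },
/// )
/// .with_partitioning(PartitioningStrategy::QueueName)
/// .with_serialization(SerializationFormat::Json);
/// ```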
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StreamConfig {
#[serde(with = "uuid_string")]
pub id: Uuid,
pub name: String,
pub backend: StreamBackend,
pub filter: EventFilter,
pub partitioning: PartitioningStrategy,
pub serialization: SerializationFormat,
pub retry_policy: StreamRetryPolicy,
pub enabled: bool,
pub buffer_config: BufferConfig,
}
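/// The streaming service a stream publishes to. Each variant carries that
/// service's connection settings plus a free-form `config` map that is
/// passed through to the underlying client.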
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type")]
pub enum StreamBackend {
Kafka {
brokers: Vec<String>,
topic: String,
config: HashMap<String, String>,
},
Kinesis {
region: String,
stream_name: String,
access_key_id: Option<String>,
secret_access_key: Option<String>,
config: HashMap<String, String>,
},
PubSub {
project_id: String,
topic_name: String,
service_account_key: Option<String>,
config: HashMap<String, String>,
},
}
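/// How a partition key is derived for each event (see
/// `StreamManager::calculate_partition_key`). `None` lets the backend pick,
/// the field variants key on a single event attribute, `Custom` reads a
/// metadata value, and `Hash` buckets a combination of fields.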
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type")]
pub enum PartitioningStrategy {
None,
JobId,
QueueName,
Priority,
EventType,
Custom { metadata_key: String },
Hash { fields: Vec<PartitionField> },
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum PartitionField {
JobId,
QueueName,
Priority,
EventType,
MetadataKey(String),
}
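/// Wire format for streamed event payloads. `Json` is always available;
/// `MessagePack`, `Avro`, and `Protobuf` require the `streaming` feature
/// and produce a `Streaming` error when it is disabled.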
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum SerializationFormat {
Json,
Avro { schema_registry_url: String },
Protobuf { schema_definition: String },
MessagePack,
}
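/// Exponential-backoff settings for redelivering failed events. With the
/// defaults below, the intended schedule is 1s, 2s, 4s, 8s, ... doubling up
/// to the 300s cap, optionally with jitter.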
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StreamRetryPolicy {
pub max_attempts: u32,
pub initial_delay_secs: u64,
pub max_delay_secs: u64,
pub backoff_multiplier: f64,
pub use_jitter: bool,
}
impl Default for StreamRetryPolicy {
fn default() -> Self {
Self {
max_attempts: 5,
initial_delay_secs: 1,
max_delay_secs: 300,
backoff_multiplier: 2.0,
use_jitter: true,
}
}
}
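/// Buffering limits for a stream's processing task: a batch is flushed once
/// it reaches `batch_size` events, once `max_buffer_time_secs` have elapsed
/// since the last flush, or once `max_events` are buffered.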
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BufferConfig {
pub max_events: usize,
pub max_buffer_time_secs: u64,
pub batch_size: usize,
}
impl Default for BufferConfig {
fn default() -> Self {
Self {
max_events: 1000,
max_buffer_time_secs: 5,
batch_size: 100,
}
}
}
impl Default for StreamConfig {
fn default() -> Self {
Self {
id: Uuid::new_v4(),
name: "default_stream".to_string(),
backend: StreamBackend::Kafka {
brokers: vec!["localhost:9092".to_string()],
topic: "hammerwork-events".to_string(),
config: HashMap::new(),
},
filter: EventFilter::new(),
partitioning: PartitioningStrategy::None,
serialization: SerializationFormat::Json,
retry_policy: StreamRetryPolicy::default(),
enabled: true,
buffer_config: BufferConfig::default(),
}
}
}
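/// An event prepared for delivery: the original lifecycle event plus its
/// computed partition key, serialized payload, and transport headers.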
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StreamedEvent {
pub event: JobLifecycleEvent,
pub partition_key: Option<String>,
pub serialized_data: Vec<u8>,
pub streamed_at: DateTime<Utc>,
pub headers: HashMap<String, String>,
}
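/// The outcome of one delivery attempt for one event.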
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StreamDelivery {
#[serde(with = "uuid_string")]
pub delivery_id: Uuid,
#[serde(with = "uuid_string")]
pub stream_id: Uuid,
#[serde(with = "uuid_string")]
pub event_id: Uuid,
pub success: bool,
pub error_message: Option<String>,
pub attempted_at: DateTime<Utc>,
pub duration_ms: Option<u64>,
pub attempt_number: u32,
pub partition: Option<String>,
}
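/// Rolling delivery statistics for a single stream, updated after each
/// batch by `StreamManager::update_stream_stats`.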
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StreamStats {
#[serde(with = "uuid_string")]
pub stream_id: Uuid,
pub total_events: u64,
pub successful_deliveries: u64,
pub failed_deliveries: u64,
pub success_rate: f64,
pub avg_delivery_time_ms: f64,
pub buffered_events: u64,
pub last_success_at: Option<DateTime<Utc>>,
pub last_failure_at: Option<DateTime<Utc>>,
pub calculated_at: DateTime<Utc>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StreamManagerGlobalStats {
pub total_streams: usize,
pub active_streams: usize,
pub total_events: u64,
pub successful_deliveries: u64,
pub failed_deliveries: u64,
}
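/// Coordinates all configured streams: stores their configs, backend
/// processors, event subscriptions, and per-stream statistics, and spawns a
/// buffering/flush task for each added stream.
///
/// A minimal sketch (`backend` is any `StreamBackend` value; paths are
/// abbreviated and may need adjusting to the crate's re-exports):
///
/// ```rust,ignore
/// let event_manager = Arc::new(EventManager::new_default());
/// let manager = StreamManager::new_default(event_manager);
/// let stream = StreamConfig::new("orders".to_string(), backend);
/// let stream_id = stream.id;
/// manager.add_stream(stream).await?;
/// // ... later ...
/// manager.remove_stream(stream_id).await?;
/// ```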
pub struct StreamManager {
streams: Arc<RwLock<HashMap<Uuid, StreamConfig>>>,
subscriptions: Arc<RwLock<HashMap<Uuid, EventSubscription>>>,
event_manager: Arc<EventManager>,
processors: Arc<RwLock<HashMap<Uuid, Box<dyn StreamProcessor + Send + Sync>>>>,
processing_semaphore: Arc<Semaphore>,
stats: Arc<RwLock<HashMap<Uuid, StreamStats>>>,
config: StreamManagerConfig,
}
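/// Tuning knobs for the manager itself: the cap on concurrent batch
/// deliveries, whether stream operations are logged, and the intended
/// global flush interval (`global_flush_interval_secs`).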
#[derive(Debug, Clone)]
pub struct StreamManagerConfig {
pub max_concurrent_processors: usize,
pub log_operations: bool,
pub global_flush_interval_secs: u64,
}
impl Default for StreamManagerConfig {
fn default() -> Self {
Self {
max_concurrent_processors: 50,
log_operations: true,
global_flush_interval_secs: 10,
}
}
}
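/// Backend-neutral delivery interface implemented by the Kafka, Kinesis,
/// and Pub/Sub processors below. `send_batch` returns one `StreamDelivery`
/// per event so a single failure does not fail the whole batch.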
#[async_trait::async_trait]
pub trait StreamProcessor {
async fn send_batch(&self, events: Vec<StreamedEvent>) -> crate::Result<Vec<StreamDelivery>>;
async fn health_check(&self) -> crate::Result<bool>;
async fn get_stats(&self) -> crate::Result<HashMap<String, serde_json::Value>>;
async fn shutdown(&self) -> crate::Result<()>;
}
impl StreamManager {
pub fn new(event_manager: Arc<EventManager>, config: StreamManagerConfig) -> Self {
Self {
streams: Arc::new(RwLock::new(HashMap::new())),
subscriptions: Arc::new(RwLock::new(HashMap::new())),
event_manager,
processors: Arc::new(RwLock::new(HashMap::new())),
processing_semaphore: Arc::new(Semaphore::new(config.max_concurrent_processors)),
stats: Arc::new(RwLock::new(HashMap::new())),
config,
}
}
pub fn new_default(event_manager: Arc<EventManager>) -> Self {
Self::new(event_manager, StreamManagerConfig::default())
}
pub async fn add_stream(&self, stream: StreamConfig) -> crate::Result<()> {
let stream_id = stream.id;
let processor = self.create_processor(&stream).await?;
let subscription = self.event_manager.subscribe(stream.filter.clone()).await?;
{
let mut streams = self.streams.write().await;
streams.insert(stream_id, stream.clone());
}
{
let mut processors = self.processors.write().await;
processors.insert(stream_id, processor);
}
{
let mut subscriptions = self.subscriptions.write().await;
subscriptions.insert(stream_id, subscription);
}
{
let mut stats = self.stats.write().await;
stats.insert(
stream_id,
StreamStats {
stream_id,
total_events: 0,
successful_deliveries: 0,
failed_deliveries: 0,
success_rate: 0.0,
avg_delivery_time_ms: 0.0,
buffered_events: 0,
last_success_at: None,
last_failure_at: None,
calculated_at: Utc::now(),
},
);
}
self.start_stream_processing_task(stream_id).await;
if self.config.log_operations {
tracing::info!("Added stream: {} ({:?})", stream.name, stream.backend);
}
Ok(())
}
pub async fn remove_stream(&self, stream_id: Uuid) -> crate::Result<()> {
{
let mut processors = self.processors.write().await;
if let Some(processor) = processors.remove(&stream_id) {
processor.shutdown().await?;
}
}
{
let mut subscriptions = self.subscriptions.write().await;
if let Some(subscription) = subscriptions.remove(&stream_id) {
self.event_manager.unsubscribe(subscription.id).await?;
}
}
{
let mut streams = self.streams.write().await;
streams.remove(&stream_id);
}
{
let mut stats = self.stats.write().await;
stats.remove(&stream_id);
}
if self.config.log_operations {
tracing::info!("Removed stream: {}", stream_id);
}
Ok(())
}
pub async fn get_stream(&self, stream_id: Uuid) -> Option<StreamConfig> {
let streams = self.streams.read().await;
streams.get(&stream_id).cloned()
}
pub async fn list_streams(&self) -> Vec<StreamConfig> {
let streams = self.streams.read().await;
streams.values().cloned().collect()
}
pub async fn enable_stream(&self, stream_id: Uuid) -> crate::Result<()> {
let mut streams = self.streams.write().await;
if let Some(stream) = streams.get_mut(&stream_id) {
stream.enabled = true;
Ok(())
} else {
Err(crate::HammerworkError::Queue {
message: format!("Stream {} not found", stream_id),
})
}
}
pub async fn disable_stream(&self, stream_id: Uuid) -> crate::Result<()> {
let mut streams = self.streams.write().await;
if let Some(stream) = streams.get_mut(&stream_id) {
stream.enabled = false;
Ok(())
} else {
Err(crate::HammerworkError::Queue {
message: format!("Stream {} not found", stream_id),
})
}
}
pub async fn get_stream_stats(&self, stream_id: Uuid) -> Option<StreamStats> {
let stats = self.stats.read().await;
stats.get(&stream_id).cloned()
}
pub async fn get_all_stream_stats(&self) -> Vec<StreamStats> {
let stats = self.stats.read().await;
stats.values().cloned().collect()
}
pub async fn get_stats(&self) -> StreamManagerGlobalStats {
let streams = self.streams.read().await;
let stats = self.stats.read().await;
let total_streams = streams.len();
let active_streams = streams.values().filter(|s| s.enabled).count();
let total_events = stats.values().map(|s| s.total_events).sum();
let successful_deliveries = stats.values().map(|s| s.successful_deliveries).sum();
let failed_deliveries = stats.values().map(|s| s.failed_deliveries).sum();
StreamManagerGlobalStats {
total_streams,
active_streams,
total_events,
successful_deliveries,
failed_deliveries,
}
}
async fn create_processor(
&self,
stream: &StreamConfig,
) -> crate::Result<Box<dyn StreamProcessor + Send + Sync>> {
match &stream.backend {
StreamBackend::Kafka {
brokers,
topic,
config,
} => Ok(Box::new(
KafkaProcessor::new(brokers.clone(), topic.clone(), config.clone()).await?,
)),
StreamBackend::Kinesis {
region,
stream_name,
access_key_id,
secret_access_key,
config,
} => Ok(Box::new(
KinesisProcessor::new(
region.clone(),
stream_name.clone(),
access_key_id.clone(),
secret_access_key.clone(),
config.clone(),
)
.await?,
)),
StreamBackend::PubSub {
project_id,
topic_name,
service_account_key,
config,
} => Ok(Box::new(
PubSubProcessor::new(
project_id.clone(),
topic_name.clone(),
service_account_key.clone(),
config.clone(),
)
.await?,
)),
}
}
async fn start_stream_processing_task(&self, stream_id: Uuid) {
let streams = self.streams.clone();
let subscriptions = self.subscriptions.clone();
let processors = self.processors.clone();
let stats = self.stats.clone();
let processing_semaphore = self.processing_semaphore.clone();
let config = self.config.clone();
tokio::spawn(async move {
let mut event_buffer: Vec<JobLifecycleEvent> = Vec::new();
let mut last_flush = std::time::Instant::now();
// Subscribe once, before the loop. Resubscribing on every iteration
// would silently drop any events published between iterations.
let mut receiver = {
let subscriptions = subscriptions.read().await;
match subscriptions.get(&stream_id) {
Some(subscription) => subscription.receiver.resubscribe(),
None => return,
}
};
loop {
let stream = {
let streams = streams.read().await;
match streams.get(&stream_id) {
Some(stream) => stream.clone(),
None => {
// Stream was removed; stop the task.
break;
}
}
};
if !stream.enabled {
// Disabled streams stay registered so enable_stream() takes effect
// without restarting the task.
tokio::time::sleep(Duration::from_secs(1)).await;
continue;
}
let should_flush = event_buffer.len() >= stream.buffer_config.batch_size
|| last_flush.elapsed().as_secs() >= stream.buffer_config.max_buffer_time_secs
|| event_buffer.len() >= stream.buffer_config.max_events;
if should_flush && !event_buffer.is_empty() {
let events_to_process = std::mem::take(&mut event_buffer);
last_flush = std::time::Instant::now();
let stream_clone = stream.clone();
let processors_clone = processors.clone();
let stats_clone = stats.clone();
let config_clone = config.clone();
let semaphore_clone = processing_semaphore.clone();
tokio::spawn(async move {
// The semaphore bounds concurrent batch deliveries; acquire() only
// fails if the semaphore is closed, in which case we just drop out.
let Ok(_permit) = semaphore_clone.acquire().await else {
return;
};
Self::process_event_batch(
stream_id,
stream_clone,
events_to_process,
processors_clone,
stats_clone,
config_clone,
)
.await;
});
}
match tokio::time::timeout(Duration::from_secs(1), receiver.recv()).await {
Ok(Ok(event)) => {
if stream.filter.matches(&event) {
event_buffer.push(event);
}
}
Ok(Err(tokio::sync::broadcast::error::RecvError::Lagged(skipped))) => {
// The broadcast buffer overflowed; log and keep consuming instead of
// killing the task.
tracing::warn!(
"Stream {} receiver lagged; {} events skipped",
stream_id,
skipped
);
}
Ok(Err(_)) => {
// Channel closed; no more events will arrive.
break;
}
Err(_) => {
// recv() timed out; loop again so time-based flushes still fire.
continue;
}
}
}
});
}
async fn process_event_batch(
stream_id: Uuid,
stream: StreamConfig,
events: Vec<JobLifecycleEvent>,
processors: Arc<RwLock<HashMap<Uuid, Box<dyn StreamProcessor + Send + Sync>>>>,
stats: Arc<RwLock<HashMap<Uuid, StreamStats>>>,
config: StreamManagerConfig,
) {
// Serialize and partition each event, dropping any that fail to prepare.
let mut streamed_events = Vec::new();
for event in events {
match Self::prepare_streamed_event(event, &stream) {
Ok(streamed_event) => streamed_events.push(streamed_event),
Err(e) => {
if config.log_operations {
tracing::error!("Failed to prepare streamed event: {}", e);
}
}
}
}
if streamed_events.is_empty() {
return;
}
if config.log_operations {
tracing::debug!(
"Processing batch of {} events for stream {}",
streamed_events.len(),
stream.name
);
}
// Deliver the batch through this stream's processor and record the actual
// outcome, rather than counting events as delivered without sending them.
let batch_len = streamed_events.len();
let send_result = {
let processors = processors.read().await;
match processors.get(&stream_id) {
Some(processor) => processor.send_batch(streamed_events).await,
None => return,
}
};
match send_result {
Ok(deliveries) => {
let successes = deliveries.iter().filter(|d| d.success).count();
let failures = deliveries.len() - successes;
if successes > 0 {
Self::update_stream_stats(stream_id, true, successes, stats.clone()).await;
}
if failures > 0 {
Self::update_stream_stats(stream_id, false, failures, stats).await;
}
}
Err(e) => {
if config.log_operations {
tracing::error!("Batch delivery failed for stream {}: {}", stream.name, e);
}
Self::update_stream_stats(stream_id, false, batch_len, stats).await;
}
}
}
fn prepare_streamed_event(
event: JobLifecycleEvent,
stream: &StreamConfig,
) -> crate::Result<StreamedEvent> {
let partition_key = Self::calculate_partition_key(&event, &stream.partitioning);
let serialized_data = match &stream.serialization {
SerializationFormat::Json => serde_json::to_vec(&event)?,
#[cfg(feature = "streaming")]
SerializationFormat::MessagePack => {
rmp_serde::to_vec(&event).map_err(|e| HammerworkError::Streaming {
message: format!("MessagePack serialization failed: {}", e),
})?
}
#[cfg(not(feature = "streaming"))]
SerializationFormat::MessagePack => {
return Err(HammerworkError::Streaming {
message: "MessagePack serialization requires 'streaming' feature".to_string(),
});
}
#[cfg(feature = "streaming")]
SerializationFormat::Avro {
schema_registry_url,
} => Self::serialize_avro(&event, schema_registry_url)?,
#[cfg(not(feature = "streaming"))]
SerializationFormat::Avro { .. } => {
return Err(HammerworkError::Streaming {
message: "Avro serialization requires 'streaming' feature".to_string(),
});
}
#[cfg(feature = "streaming")]
SerializationFormat::Protobuf { schema_definition } => {
Self::serialize_protobuf(&event, schema_definition)?
}
#[cfg(not(feature = "streaming"))]
SerializationFormat::Protobuf { .. } => {
return Err(HammerworkError::Streaming {
message: "Protobuf serialization requires 'streaming' feature".to_string(),
});
}
};
let mut headers = HashMap::new();
headers.insert("event_type".to_string(), event.event_type.to_string());
headers.insert("queue_name".to_string(), event.queue_name.clone());
headers.insert("priority".to_string(), event.priority.to_string());
headers.insert("timestamp".to_string(), event.timestamp.to_rfc3339());
Ok(StreamedEvent {
event,
partition_key,
serialized_data,
streamed_at: Utc::now(),
headers,
})
}
#[cfg(feature = "streaming")]
fn serialize_avro(
event: &JobLifecycleEvent,
_schema_registry_url: &str,
) -> crate::Result<Vec<u8>> {
use apache_avro::{Schema, Writer};
let schema = Schema::parse_str(
r#"
{
"type": "record",
"name": "JobLifecycleEvent",
"fields": [
{"name": "job_id", "type": "string"},
{"name": "queue_name", "type": "string"},
{"name": "event_type", "type": "string"},
{"name": "priority", "type": "string"},
{"name": "timestamp", "type": "string"},
{"name": "metadata", "type": {"type": "map", "values": "string"}}
]
}
"#,
)
.map_err(|e| HammerworkError::Streaming {
message: format!("Failed to parse Avro schema: {}", e),
})?;
let mut writer = Writer::new(&schema, Vec::new());
let mut record =
apache_avro::types::Record::new(&schema).ok_or_else(|| HammerworkError::Streaming {
message: "Failed to create Avro record".to_string(),
})?;
record.put("job_id", event.job_id.to_string());
record.put("queue_name", event.queue_name.clone());
record.put("event_type", event.event_type.to_string());
record.put("priority", event.priority.to_string());
record.put("timestamp", event.timestamp.to_rfc3339());
record.put("metadata", event.metadata.clone());
writer
.append(record)
.map_err(|e| HammerworkError::Streaming {
message: format!("Failed to append Avro record: {}", e),
})?;
writer.into_inner().map_err(|e| HammerworkError::Streaming {
message: format!("Failed to finalize Avro writer: {}", e),
})
}
#[cfg(feature = "streaming")]
fn serialize_protobuf(
event: &JobLifecycleEvent,
_schema_definition: &str,
) -> crate::Result<Vec<u8>> {
use prost::Message;
#[derive(prost::Message)]
struct JobLifecycleEventProto {
#[prost(string, tag = "1")]
job_id: String,
#[prost(string, tag = "2")]
queue_name: String,
#[prost(string, tag = "3")]
event_type: String,
#[prost(string, tag = "4")]
priority: String,
#[prost(string, tag = "5")]
timestamp: String,
#[prost(map = "string, string", tag = "6")]
metadata: std::collections::HashMap<String, String>,
}
let proto_event = JobLifecycleEventProto {
job_id: event.job_id.to_string(),
queue_name: event.queue_name.clone(),
event_type: event.event_type.to_string(),
priority: event.priority.to_string(),
timestamp: event.timestamp.to_rfc3339(),
metadata: event.metadata.clone(),
};
let mut buf = Vec::new();
proto_event
.encode(&mut buf)
.map_err(|e| HammerworkError::Streaming {
message: format!("Failed to encode Protobuf message: {}", e),
})?;
Ok(buf)
}
fn calculate_partition_key(
event: &JobLifecycleEvent,
strategy: &PartitioningStrategy,
) -> Option<String> {
match strategy {
PartitioningStrategy::None => None,
PartitioningStrategy::JobId => Some(event.job_id.to_string()),
PartitioningStrategy::QueueName => Some(event.queue_name.clone()),
PartitioningStrategy::Priority => Some(event.priority.to_string()),
PartitioningStrategy::EventType => Some(event.event_type.to_string()),
PartitioningStrategy::Custom { metadata_key } => {
event.metadata.get(metadata_key).cloned()
}
PartitioningStrategy::Hash { fields } => {
let mut hash_input = String::new();
for field in fields {
match field {
PartitionField::JobId => hash_input.push_str(&event.job_id.to_string()),
PartitionField::QueueName => hash_input.push_str(&event.queue_name),
PartitionField::Priority => {
hash_input.push_str(&event.priority.to_string())
}
PartitionField::EventType => {
hash_input.push_str(&event.event_type.to_string())
}
PartitionField::MetadataKey(key) => {
if let Some(value) = event.metadata.get(key) {
hash_input.push_str(value);
}
}
}
hash_input.push('|');
}
// Cheap, deterministic bucketing: sum of char codes modulo 1000. Stable
// for a given field combination, but not a uniformly distributed hash.
let hash = hash_input.chars().map(|c| c as u32).sum::<u32>();
Some(format!("{}", hash % 1000))
}
}
}
async fn update_stream_stats(
stream_id: Uuid,
success: bool,
event_count: usize,
stats: Arc<RwLock<HashMap<Uuid, StreamStats>>>,
) {
let mut stats_map = stats.write().await;
if let Some(stream_stats) = stats_map.get_mut(&stream_id) {
stream_stats.total_events += event_count as u64;
if success {
stream_stats.successful_deliveries += event_count as u64;
stream_stats.last_success_at = Some(Utc::now());
} else {
stream_stats.failed_deliveries += event_count as u64;
stream_stats.last_failure_at = Some(Utc::now());
}
stream_stats.success_rate =
stream_stats.successful_deliveries as f64 / stream_stats.total_events as f64;
stream_stats.calculated_at = Utc::now();
}
}
}
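/// Kafka-backed `StreamProcessor` built on `rdkafka`'s `FutureProducer`.
/// Without the `kafka` feature, a same-shaped stub simulates deliveries
/// (optionally failing at a `test.error.rate` drawn from `config`).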
#[cfg(feature = "kafka")]
pub struct KafkaProcessor {
producer: FutureProducer,
topic: String,
config: HashMap<String, String>,
}
#[cfg(not(feature = "kafka"))]
pub struct KafkaProcessor {
brokers: Vec<String>,
topic: String,
config: HashMap<String, String>,
}
#[cfg(feature = "kafka")]
impl KafkaProcessor {
pub async fn new(
brokers: Vec<String>,
topic: String,
config: HashMap<String, String>,
) -> crate::Result<Self> {
let mut client_config = ClientConfig::new();
client_config.set("bootstrap.servers", brokers.join(","));
for (key, value) in &config {
client_config.set(key, value);
}
if !config.contains_key("message.timeout.ms") {
client_config.set("message.timeout.ms", "5000");
}
if !config.contains_key("queue.buffering.max.messages") {
client_config.set("queue.buffering.max.messages", "100000");
}
let producer: FutureProducer =
client_config
.create()
.map_err(|e| HammerworkError::Streaming {
message: format!("Failed to create Kafka producer: {}", e),
})?;
Ok(Self {
producer,
topic,
config,
})
}
}
#[cfg(not(feature = "kafka"))]
impl KafkaProcessor {
pub async fn new(
brokers: Vec<String>,
topic: String,
config: HashMap<String, String>,
) -> crate::Result<Self> {
Ok(Self {
brokers,
topic,
config,
})
}
}
#[cfg(feature = "kafka")]
#[async_trait::async_trait]
impl StreamProcessor for KafkaProcessor {
async fn send_batch(&self, events: Vec<StreamedEvent>) -> crate::Result<Vec<StreamDelivery>> {
let mut deliveries = Vec::new();
for event in events {
let delivery_start = Utc::now();
let mut record = FutureRecord::to(&self.topic).payload(&event.serialized_data);
if let Some(ref key) = event.partition_key {
record = record.key(key);
}
if !event.headers.is_empty() {
let mut headers = OwnedHeaders::new();
for (header_key, header_value) in &event.headers {
headers = headers.insert(Header {
key: header_key,
value: Some(header_value),
});
}
record = record.headers(headers);
}
let result = self
.producer
.send(record, tokio::time::Duration::from_secs(5))
.await;
let delivery_end = Utc::now();
let duration_ms = delivery_end
.signed_duration_since(delivery_start)
.num_milliseconds()
.max(0) as u64;
let (success, error_message, partition) = match result {
Ok((partition, offset)) => {
tracing::debug!(
"Message delivered to partition {} at offset {}",
partition,
offset
);
(true, None, Some(partition.to_string()))
}
Err((kafka_error, _)) => {
let error_msg = format!("Kafka delivery failed: {}", kafka_error);
tracing::error!("{}", error_msg);
(false, Some(error_msg), event.partition_key)
}
};
deliveries.push(StreamDelivery {
delivery_id: Uuid::new_v4(),
// Processors are not told which stream they serve, so this is a
// placeholder rather than the owning stream's id.
stream_id: Uuid::new_v4(),
event_id: event.event.event_id,
success,
error_message,
attempted_at: delivery_start,
duration_ms: Some(duration_ms),
attempt_number: 1,
partition,
});
}
Ok(deliveries)
}
async fn health_check(&self) -> crate::Result<bool> {
let health_check_timeout_ms = self
.config
.get("health.check.timeout.ms")
.and_then(|v| v.parse::<u64>().ok())
.unwrap_or(5000);
let timeout = tokio::time::Duration::from_millis(health_check_timeout_ms);
let result = tokio::task::spawn_blocking({
let producer = self.producer.clone();
let topic = self.topic.clone();
move || producer.client().fetch_metadata(Some(&topic), timeout)
})
.await;
let health_result = match result {
Ok(metadata_result) => match metadata_result {
Ok(metadata) => {
let topic_metadata = metadata.topics().iter().find(|t| t.name() == self.topic);
match topic_metadata {
Some(topic) => {
if topic.partitions().is_empty() {
tracing::warn!("Topic {} has no partitions", self.topic);
false
} else {
tracing::debug!(
"Kafka health check passed for topic {}",
self.topic
);
true
}
}
None => {
tracing::warn!("Topic {} not found in metadata", self.topic);
false
}
}
}
Err(e) => {
tracing::error!("Kafka health check failed: {}", e);
false
}
},
Err(e) => {
tracing::error!("Kafka health check task failed: {}", e);
false
}
};
Ok(health_result)
}
async fn get_stats(&self) -> crate::Result<HashMap<String, serde_json::Value>> {
let mut stats = HashMap::new();
stats.insert(
"type".to_string(),
serde_json::Value::String("kafka".to_string()),
);
let brokers = self
.config
.get("bootstrap.servers")
.map(|b| {
b.split(',')
.map(|s| s.trim().to_string())
.collect::<Vec<_>>()
})
.unwrap_or_default();
stats.insert(
"brokers".to_string(),
serde_json::Value::Array(
brokers
.iter()
.map(|b| serde_json::Value::String(b.clone()))
.collect(),
),
);
stats.insert(
"topic".to_string(),
serde_json::Value::String(self.topic.clone()),
);
stats.insert(
"producer_available".to_string(),
serde_json::Value::Bool(true),
);
stats.insert(
"config".to_string(),
serde_json::Value::Object(
self.config
.iter()
.map(|(k, v)| (k.clone(), serde_json::Value::String(v.clone())))
.collect(),
),
);
Ok(stats)
}
async fn shutdown(&self) -> crate::Result<()> {
let producer = self.producer.clone();
let flush_result = tokio::task::spawn_blocking(move || {
producer.flush(tokio::time::Duration::from_secs(10))
})
.await;
match flush_result {
Ok(Ok(())) => {
tracing::debug!("Kafka producer flushed successfully during shutdown");
}
Ok(Err(e)) => {
tracing::warn!("Error flushing Kafka producer during shutdown: {}", e);
}
Err(e) => {
tracing::warn!("Flush task failed during shutdown: {}", e);
}
}
Ok(())
}
}
#[cfg(not(feature = "kafka"))]
#[async_trait::async_trait]
impl StreamProcessor for KafkaProcessor {
async fn send_batch(&self, events: Vec<StreamedEvent>) -> crate::Result<Vec<StreamDelivery>> {
let start_time = Utc::now();
let mut deliveries = Vec::new();
let batch_delay_ms = self
.config
.get("batch.delay.ms")
.and_then(|v| v.parse::<u64>().ok())
.unwrap_or(10);
tokio::time::sleep(tokio::time::Duration::from_millis(batch_delay_ms)).await;
let error_rate = self
.config
.get("test.error.rate")
.and_then(|v| v.parse::<f64>().ok())
.unwrap_or(0.0);
for event in events {
let success = rand::random::<f64>() > error_rate;
// Measure elapsed time forward (now - start), matching the real processor.
let duration = Utc::now()
.signed_duration_since(start_time)
.num_milliseconds()
.max(0) as u64;
deliveries.push(StreamDelivery {
delivery_id: Uuid::new_v4(),
stream_id: Uuid::new_v4(),
event_id: event.event.event_id,
success,
error_message: if success {
None
} else {
Some(format!(
"Simulated Kafka delivery failure to topic: {}",
self.topic
))
},
attempted_at: start_time,
duration_ms: Some(duration),
attempt_number: 1,
partition: event.partition_key,
});
}
Ok(deliveries)
}
async fn health_check(&self) -> crate::Result<bool> {
let health_check_timeout_ms = self
.config
.get("health.check.timeout.ms")
.and_then(|v| v.parse::<u64>().ok())
.unwrap_or(5000);
tokio::time::timeout(
tokio::time::Duration::from_millis(health_check_timeout_ms),
async {
for broker in &self.brokers {
tracing::debug!("Health checking Kafka broker: {}", broker);
}
Ok(true)
},
)
.await
.unwrap_or(Ok(false))
}
async fn get_stats(&self) -> crate::Result<HashMap<String, serde_json::Value>> {
let mut stats = HashMap::new();
stats.insert(
"type".to_string(),
serde_json::Value::String("kafka".to_string()),
);
stats.insert(
"brokers".to_string(),
serde_json::Value::Array(
self.brokers
.iter()
.map(|b| serde_json::Value::String(b.clone()))
.collect(),
),
);
stats.insert(
"topic".to_string(),
serde_json::Value::String(self.topic.clone()),
);
stats.insert(
"config".to_string(),
serde_json::Value::Object(
self.config
.iter()
.map(|(k, v)| (k.clone(), serde_json::Value::String(v.clone())))
.collect(),
),
);
Ok(stats)
}
async fn shutdown(&self) -> crate::Result<()> {
Ok(())
}
}
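/// AWS Kinesis-backed `StreamProcessor` using `aws_sdk_kinesis::Client`
/// with a `PutRecord` call per event. Without the `kinesis` feature, a stub
/// reports every delivery as failed with an explanatory error.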
#[cfg(feature = "kinesis")]
pub struct KinesisProcessor {
region: String,
stream_name: String,
access_key_id: Option<String>,
secret_access_key: Option<String>,
config: HashMap<String, String>,
client: KinesisClient,
}
#[cfg(not(feature = "kinesis"))]
pub struct KinesisProcessor {
region: String,
stream_name: String,
access_key_id: Option<String>,
secret_access_key: Option<String>,
config: HashMap<String, String>,
}
#[cfg(feature = "kinesis")]
impl KinesisProcessor {
pub async fn new(
region: String,
stream_name: String,
access_key_id: Option<String>,
secret_access_key: Option<String>,
config: HashMap<String, String>,
) -> crate::Result<Self> {
let aws_config =
if let (Some(access_key), Some(secret_key)) = (&access_key_id, &secret_access_key) {
let credentials = aws_sdk_kinesis::config::Credentials::new(
access_key,
secret_key,
None,
None,
"hammerwork",
);
aws_config::defaults(BehaviorVersion::latest())
.region(Region::new(region.clone()))
.credentials_provider(credentials)
.load()
.await
} else {
aws_config::defaults(BehaviorVersion::latest())
.region(Region::new(region.clone()))
.load()
.await
};
let client = KinesisClient::new(&aws_config);
Ok(Self {
region,
stream_name,
access_key_id,
secret_access_key,
config,
client,
})
}
}
#[cfg(not(feature = "kinesis"))]
impl KinesisProcessor {
pub async fn new(
region: String,
stream_name: String,
access_key_id: Option<String>,
secret_access_key: Option<String>,
config: HashMap<String, String>,
) -> crate::Result<Self> {
Ok(Self {
region,
stream_name,
access_key_id,
secret_access_key,
config,
})
}
}
#[cfg(feature = "kinesis")]
#[async_trait::async_trait]
impl StreamProcessor for KinesisProcessor {
async fn send_batch(&self, events: Vec<StreamedEvent>) -> crate::Result<Vec<StreamDelivery>> {
let start_time = std::time::Instant::now();
let mut deliveries = Vec::new();
for event in events {
let delivery_start = std::time::Instant::now();
let delivery_id = Uuid::new_v4();
let payload =
serde_json::to_vec(&event.event).map_err(|e| HammerworkError::Streaming {
message: format!("Failed to serialize event: {}", e),
})?;
let partition_key = event.partition_key.clone().unwrap_or_else(|| {
event.event.event_id.to_string()
});
let put_result = self
.client
.put_record()
.stream_name(&self.stream_name)
.data(aws_sdk_kinesis::primitives::Blob::new(payload))
.partition_key(&partition_key)
.send()
.await;
let delivery = match put_result {
Ok(output) => {
let sequence_number = output.sequence_number();
let shard_id = output.shard_id();
tracing::debug!(
"Published event to Kinesis: sequence={}, shard={}",
sequence_number,
shard_id
);
StreamDelivery {
delivery_id,
stream_id: Uuid::new_v4(),
event_id: event.event.event_id,
success: true,
error_message: None,
attempted_at: Utc::now(),
duration_ms: Some(delivery_start.elapsed().as_millis() as u64),
attempt_number: 1,
partition: Some(partition_key),
}
}
Err(e) => {
let error_msg = format!("Failed to publish to Kinesis: {}", e);
tracing::error!("{}", error_msg);
StreamDelivery {
delivery_id,
stream_id: Uuid::new_v4(),
event_id: event.event.event_id,
success: false,
error_message: Some(error_msg),
attempted_at: Utc::now(),
duration_ms: Some(delivery_start.elapsed().as_millis() as u64),
attempt_number: 1,
partition: Some(partition_key),
}
}
};
deliveries.push(delivery);
}
tracing::info!(
"Sent {} events to Kinesis stream '{}' in {}ms",
deliveries.len(),
self.stream_name,
start_time.elapsed().as_millis()
);
Ok(deliveries)
}
async fn health_check(&self) -> crate::Result<bool> {
match self
.client
.describe_stream()
.stream_name(&self.stream_name)
.send()
.await
{
Ok(output) => {
if let Some(stream_description) = output.stream_description() {
let status = stream_description.stream_status();
tracing::debug!("Kinesis stream '{}' status: {:?}", self.stream_name, status);
match status {
aws_sdk_kinesis::types::StreamStatus::Active
| aws_sdk_kinesis::types::StreamStatus::Updating => Ok(true),
_ => {
tracing::warn!(
"Kinesis stream '{}' is not in a healthy state: {:?}",
self.stream_name,
status
);
Ok(false)
}
}
} else {
tracing::warn!(
"Kinesis stream '{}' description not available",
self.stream_name
);
Ok(false)
}
}
Err(e) => {
tracing::warn!(
"Kinesis health check failed for stream '{}': {}",
self.stream_name,
e
);
Ok(false)
}
}
}
async fn get_stats(&self) -> crate::Result<HashMap<String, serde_json::Value>> {
let mut stats = HashMap::new();
stats.insert(
"type".to_string(),
serde_json::Value::String("kinesis".to_string()),
);
stats.insert(
"region".to_string(),
serde_json::Value::String(self.region.clone()),
);
stats.insert(
"stream_name".to_string(),
serde_json::Value::String(self.stream_name.clone()),
);
stats.insert(
"credentials_configured".to_string(),
serde_json::Value::Bool(
self.access_key_id.is_some() && self.secret_access_key.is_some(),
),
);
if let Some(ref access_key_id) = self.access_key_id {
stats.insert(
"access_key_id".to_string(),
serde_json::Value::String(format!(
"{}***",
&access_key_id[..4.min(access_key_id.len())]
)),
);
}
stats.insert(
"config".to_string(),
serde_json::Value::Object(
self.config
.iter()
.map(|(k, v)| (k.clone(), serde_json::Value::String(v.clone())))
.collect(),
),
);
stats.insert("feature_enabled".to_string(), serde_json::Value::Bool(true));
Ok(stats)
}
async fn shutdown(&self) -> crate::Result<()> {
tracing::info!(
"Kinesis processor shutdown for stream '{}'.",
self.stream_name
);
Ok(())
}
}
#[cfg(not(feature = "kinesis"))]
#[async_trait::async_trait]
impl StreamProcessor for KinesisProcessor {
async fn send_batch(&self, events: Vec<StreamedEvent>) -> crate::Result<Vec<StreamDelivery>> {
let mut deliveries = Vec::new();
for event in events {
deliveries.push(StreamDelivery {
delivery_id: Uuid::new_v4(),
stream_id: Uuid::new_v4(),
event_id: event.event.event_id,
success: false,
error_message: Some("AWS Kinesis feature not enabled. Enable 'kinesis' feature to use this backend.".to_string()),
attempted_at: Utc::now(),
duration_ms: Some(0),
attempt_number: 1,
partition: event.partition_key,
});
}
Ok(deliveries)
}
async fn health_check(&self) -> crate::Result<bool> {
Ok(false)
}
async fn get_stats(&self) -> crate::Result<HashMap<String, serde_json::Value>> {
let mut stats = HashMap::new();
stats.insert(
"type".to_string(),
serde_json::Value::String("kinesis".to_string()),
);
stats.insert(
"region".to_string(),
serde_json::Value::String(self.region.clone()),
);
stats.insert(
"stream_name".to_string(),
serde_json::Value::String(self.stream_name.clone()),
);
stats.insert(
"credentials_configured".to_string(),
serde_json::Value::Bool(
self.access_key_id.is_some() && self.secret_access_key.is_some(),
),
);
stats.insert(
"feature_enabled".to_string(),
serde_json::Value::Bool(false),
);
stats.insert(
"error".to_string(),
serde_json::Value::String("AWS Kinesis feature not enabled".to_string()),
);
Ok(stats)
}
async fn shutdown(&self) -> crate::Result<()> {
Ok(())
}
}
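/// Google Cloud Pub/Sub-backed `StreamProcessor`; events are published with
/// their metadata as message attributes and the partition key as the
/// ordering key. Without the `google-pubsub` feature, a stub reports every
/// delivery as failed.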
#[cfg(feature = "google-pubsub")]
pub struct PubSubProcessor {
project_id: String,
topic_name: String,
service_account_key: Option<String>,
config: HashMap<String, String>,
publisher: Publisher,
}
#[cfg(not(feature = "google-pubsub"))]
pub struct PubSubProcessor {
project_id: String,
topic_name: String,
service_account_key: Option<String>,
config: HashMap<String, String>,
}
#[cfg(feature = "google-pubsub")]
impl PubSubProcessor {
pub async fn new(
project_id: String,
topic_name: String,
service_account_key: Option<String>,
config: HashMap<String, String>,
) -> crate::Result<Self> {
let client_config = if let Some(service_account_json) = &service_account_key {
let credentials = serde_json::from_str::<CredentialsFile>(service_account_json)
.map_err(|e| HammerworkError::Streaming {
message: format!("Invalid service account credentials: {}", e),
})?;
PubSubClientConfig::default()
.with_credentials(credentials)
.await
.map_err(|e| HammerworkError::Streaming {
message: format!("Failed to configure Pub/Sub client: {}", e),
})?
} else {
PubSubClientConfig::default()
.with_auth()
.await
.map_err(|e| HammerworkError::Streaming {
message: format!(
"Failed to initialize Pub/Sub client with default credentials: {}",
e
),
})?
};
let client = Client::new(client_config)
.await
.map_err(|e| HammerworkError::Streaming {
message: format!("Failed to create Pub/Sub client: {}", e),
})?;
let topic = client.topic(&topic_name);
let publisher = topic.new_publisher(None);
Ok(Self {
project_id,
topic_name,
service_account_key,
config,
publisher,
})
}
}
#[cfg(not(feature = "google-pubsub"))]
impl PubSubProcessor {
pub async fn new(
project_id: String,
topic_name: String,
service_account_key: Option<String>,
config: HashMap<String, String>,
) -> crate::Result<Self> {
Ok(Self {
project_id,
topic_name,
service_account_key,
config,
})
}
}
#[cfg(feature = "google-pubsub")]
#[async_trait::async_trait]
impl StreamProcessor for PubSubProcessor {
async fn send_batch(&self, events: Vec<StreamedEvent>) -> crate::Result<Vec<StreamDelivery>> {
let start_time = std::time::Instant::now();
let mut deliveries = Vec::new();
for event in events {
let delivery_start = std::time::Instant::now();
let delivery_id = Uuid::new_v4();
let payload =
serde_json::to_vec(&event.event).map_err(|e| HammerworkError::Streaming {
message: format!("Failed to serialize event: {}", e),
})?;
let mut attributes = std::collections::HashMap::new();
attributes.insert("event_id".to_string(), event.event.event_id.to_string());
attributes.insert("job_id".to_string(), event.event.job_id.to_string());
attributes.insert("queue_name".to_string(), event.event.queue_name.clone());
attributes.insert(
"event_type".to_string(),
format!("{:?}", event.event.event_type),
);
attributes.insert("timestamp".to_string(), event.event.timestamp.to_rfc3339());
if let Some(partition_key) = &event.partition_key {
attributes.insert("partition_key".to_string(), partition_key.clone());
}
let message = PubsubMessage {
data: payload,
attributes,
message_id: String::new(),
publish_time: None,
ordering_key: event.partition_key.clone().unwrap_or_default(),
};
let awaiter = self.publisher.publish(message).await;
let delivery = match awaiter.get().await {
Ok(message_id) => {
tracing::debug!("Published message to Pub/Sub: {}", message_id);
StreamDelivery {
delivery_id,
stream_id: Uuid::new_v4(),
event_id: event.event.event_id,
success: true,
error_message: None,
attempted_at: Utc::now(),
duration_ms: Some(delivery_start.elapsed().as_millis() as u64),
attempt_number: 1,
partition: event.partition_key,
}
}
Err(e) => {
let error_msg = format!("Failed to publish to Pub/Sub: {}", e);
tracing::error!("{}", error_msg);
StreamDelivery {
delivery_id,
stream_id: Uuid::new_v4(),
event_id: event.event.event_id,
success: false,
error_message: Some(error_msg),
attempted_at: Utc::now(),
duration_ms: Some(delivery_start.elapsed().as_millis() as u64),
attempt_number: 1,
partition: event.partition_key,
}
}
};
deliveries.push(delivery);
}
tracing::info!(
"Sent {} events to Pub/Sub topic '{}' in {}ms",
deliveries.len(),
self.topic_name,
start_time.elapsed().as_millis()
);
Ok(deliveries)
}
async fn health_check(&self) -> crate::Result<bool> {
tracing::debug!("Pub/Sub health check for topic '{}'.", self.topic_name);
Ok(true)
}
async fn get_stats(&self) -> crate::Result<HashMap<String, serde_json::Value>> {
let mut stats = HashMap::new();
stats.insert(
"type".to_string(),
serde_json::Value::String("pubsub".to_string()),
);
stats.insert(
"project_id".to_string(),
serde_json::Value::String(self.project_id.clone()),
);
stats.insert(
"topic_name".to_string(),
serde_json::Value::String(self.topic_name.clone()),
);
stats.insert(
"service_account_configured".to_string(),
serde_json::Value::Bool(self.service_account_key.is_some()),
);
stats.insert(
"config".to_string(),
serde_json::Value::Object(
self.config
.iter()
.map(|(k, v)| (k.clone(), serde_json::Value::String(v.clone())))
.collect(),
),
);
stats.insert("feature_enabled".to_string(), serde_json::Value::Bool(true));
Ok(stats)
}
async fn shutdown(&self) -> crate::Result<()> {
tracing::info!(
"Pub/Sub publisher shutdown for topic '{}'.",
self.topic_name
);
Ok(())
}
}
#[cfg(not(feature = "google-pubsub"))]
#[async_trait::async_trait]
impl StreamProcessor for PubSubProcessor {
async fn send_batch(&self, events: Vec<StreamedEvent>) -> crate::Result<Vec<StreamDelivery>> {
let mut deliveries = Vec::new();
for event in events {
deliveries.push(StreamDelivery {
delivery_id: Uuid::new_v4(),
stream_id: Uuid::new_v4(),
event_id: event.event.event_id,
success: false,
error_message: Some("Google Pub/Sub feature not enabled. Enable 'google-pubsub' feature to use this backend.".to_string()),
attempted_at: Utc::now(),
duration_ms: Some(0),
attempt_number: 1,
partition: event.partition_key,
});
}
Ok(deliveries)
}
async fn health_check(&self) -> crate::Result<bool> {
Ok(false)
}
async fn get_stats(&self) -> crate::Result<HashMap<String, serde_json::Value>> {
let mut stats = HashMap::new();
stats.insert(
"type".to_string(),
serde_json::Value::String("pubsub".to_string()),
);
stats.insert(
"project_id".to_string(),
serde_json::Value::String(self.project_id.clone()),
);
stats.insert(
"topic_name".to_string(),
serde_json::Value::String(self.topic_name.clone()),
);
stats.insert(
"service_account_configured".to_string(),
serde_json::Value::Bool(self.service_account_key.is_some()),
);
stats.insert(
"feature_enabled".to_string(),
serde_json::Value::Bool(false),
);
stats.insert(
"error".to_string(),
serde_json::Value::String("Google Pub/Sub feature not enabled".to_string()),
);
Ok(stats)
}
async fn shutdown(&self) -> crate::Result<()> {
Ok(())
}
}
impl StreamConfig {
pub fn new(name: String, backend: StreamBackend) -> Self {
Self {
id: Uuid::new_v4(),
name,
backend,
filter: EventFilter::default(),
partitioning: PartitioningStrategy::None,
serialization: SerializationFormat::Json,
retry_policy: StreamRetryPolicy::default(),
enabled: true,
buffer_config: BufferConfig::default(),
}
}
pub fn with_filter(mut self, filter: EventFilter) -> Self {
self.filter = filter;
self
}
pub fn with_partitioning(mut self, partitioning: PartitioningStrategy) -> Self {
self.partitioning = partitioning;
self
}
pub fn with_serialization(mut self, serialization: SerializationFormat) -> Self {
self.serialization = serialization;
self
}
pub fn with_retry_policy(mut self, retry_policy: StreamRetryPolicy) -> Self {
self.retry_policy = retry_policy;
self
}
pub fn with_buffer_config(mut self, buffer_config: BufferConfig) -> Self {
self.buffer_config = buffer_config;
self
}
pub fn enabled(mut self, enabled: bool) -> Self {
self.enabled = enabled;
self
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::{events::JobLifecycleEventType, priority::JobPriority};
#[test]
fn test_stream_config_creation() {
let backend = StreamBackend::Kafka {
brokers: vec!["localhost:9092".to_string()],
topic: "job-events".to_string(),
config: HashMap::new(),
};
let stream = StreamConfig::new("test-stream".to_string(), backend)
.with_partitioning(PartitioningStrategy::QueueName)
.with_serialization(SerializationFormat::Json)
.enabled(true);
assert_eq!(stream.name, "test-stream");
assert!(stream.enabled);
assert!(matches!(
stream.partitioning,
PartitioningStrategy::QueueName
));
assert!(matches!(stream.serialization, SerializationFormat::Json));
}
#[test]
fn test_partitioning_strategies() {
let event = JobLifecycleEvent {
event_id: Uuid::new_v4(),
job_id: Uuid::new_v4(),
queue_name: "email_queue".to_string(),
event_type: JobLifecycleEventType::Completed,
priority: JobPriority::High,
timestamp: Utc::now(),
processing_time_ms: Some(1000),
error: None,
payload: None,
metadata: {
let mut metadata = HashMap::new();
metadata.insert("source".to_string(), "api".to_string());
metadata
},
};
assert_eq!(
StreamManager::calculate_partition_key(&event, &PartitioningStrategy::None),
None
);
assert_eq!(
StreamManager::calculate_partition_key(&event, &PartitioningStrategy::QueueName),
Some("email_queue".to_string())
);
assert_eq!(
StreamManager::calculate_partition_key(&event, &PartitioningStrategy::Priority),
Some("high".to_string())
);
assert_eq!(
StreamManager::calculate_partition_key(&event, &PartitioningStrategy::EventType),
Some("completed".to_string())
);
assert_eq!(
StreamManager::calculate_partition_key(
&event,
&PartitioningStrategy::Custom {
metadata_key: "source".to_string()
}
),
Some("api".to_string())
);
let hash_result = StreamManager::calculate_partition_key(
&event,
&PartitioningStrategy::Hash {
fields: vec![PartitionField::QueueName, PartitionField::Priority],
},
);
assert!(hash_result.is_some());
}
#[test]
fn test_stream_backends() {
let kafka = StreamBackend::Kafka {
brokers: vec!["localhost:9092".to_string()],
topic: "events".to_string(),
config: HashMap::new(),
};
let kinesis = StreamBackend::Kinesis {
region: "us-east-1".to_string(),
stream_name: "job-events".to_string(),
access_key_id: None,
secret_access_key: None,
config: HashMap::new(),
};
let pubsub = StreamBackend::PubSub {
project_id: "my-project".to_string(),
topic_name: "job-events".to_string(),
service_account_key: None,
config: HashMap::new(),
};
match kafka {
StreamBackend::Kafka { brokers, topic, .. } => {
assert_eq!(brokers.len(), 1);
assert_eq!(topic, "events");
}
_ => panic!("Wrong backend type"),
}
match kinesis {
StreamBackend::Kinesis {
region,
stream_name,
..
} => {
assert_eq!(region, "us-east-1");
assert_eq!(stream_name, "job-events");
}
_ => panic!("Wrong backend type"),
}
match pubsub {
StreamBackend::PubSub {
project_id,
topic_name,
..
} => {
assert_eq!(project_id, "my-project");
assert_eq!(topic_name, "job-events");
}
_ => panic!("Wrong backend type"),
}
}
#[test]
fn test_retry_policy() {
let policy = StreamRetryPolicy {
max_attempts: 3,
initial_delay_secs: 1,
max_delay_secs: 60,
backoff_multiplier: 2.0,
use_jitter: false,
};
assert_eq!(policy.max_attempts, 3);
assert_eq!(policy.initial_delay_secs, 1);
assert_eq!(policy.backoff_multiplier, 2.0);
assert!(!policy.use_jitter);
}
#[test]
fn test_buffer_config() {
let buffer = BufferConfig {
max_events: 500,
max_buffer_time_secs: 10,
batch_size: 50,
};
assert_eq!(buffer.max_events, 500);
assert_eq!(buffer.max_buffer_time_secs, 10);
assert_eq!(buffer.batch_size, 50);
}
#[test]
fn test_serialization_formats() {
let json = SerializationFormat::Json;
let avro = SerializationFormat::Avro {
schema_registry_url: "http://localhost:8081".to_string(),
};
let protobuf = SerializationFormat::Protobuf {
schema_definition: "syntax = \"proto3\";".to_string(),
};
let msgpack = SerializationFormat::MessagePack;
assert!(matches!(json, SerializationFormat::Json));
assert!(matches!(avro, SerializationFormat::Avro { .. }));
assert!(matches!(protobuf, SerializationFormat::Protobuf { .. }));
assert!(matches!(msgpack, SerializationFormat::MessagePack));
}
fn create_test_job_lifecycle_event() -> JobLifecycleEvent {
JobLifecycleEvent {
event_id: Uuid::new_v4(),
job_id: Uuid::new_v4(),
queue_name: "test_queue".to_string(),
event_type: JobLifecycleEventType::Started,
priority: crate::priority::JobPriority::Normal,
timestamp: chrono::Utc::now(),
processing_time_ms: Some(150),
error: None,
payload: Some(serde_json::json!({"message": "test payload"})),
metadata: {
let mut metadata = HashMap::new();
metadata.insert("test_key".to_string(), "test_value".to_string());
metadata.insert("priority".to_string(), "normal".to_string());
metadata
},
}
}
#[test]
fn test_json_serialization() {
let event = create_test_job_lifecycle_event();
let stream_config = StreamConfig {
name: "test_stream".to_string(),
serialization: SerializationFormat::Json,
..Default::default()
};
let result = StreamManager::prepare_streamed_event(event.clone(), &stream_config);
assert!(result.is_ok());
let streamed_event = result.unwrap();
assert!(!streamed_event.serialized_data.is_empty());
let deserialized: JobLifecycleEvent =
serde_json::from_slice(&streamed_event.serialized_data).unwrap();
assert_eq!(deserialized.queue_name, event.queue_name);
assert_eq!(deserialized.event_type, event.event_type);
assert_eq!(deserialized.priority, event.priority);
assert_eq!(deserialized.metadata, event.metadata);
}
#[test]
#[cfg(feature = "streaming")]
fn test_messagepack_serialization() {
let event = create_test_job_lifecycle_event();
let stream_config = StreamConfig {
name: "test_stream".to_string(),
serialization: SerializationFormat::MessagePack,
..Default::default()
};
let result = StreamManager::prepare_streamed_event(event.clone(), &stream_config);
assert!(result.is_ok());
let streamed_event = result.unwrap();
assert!(!streamed_event.serialized_data.is_empty());
let deserialized: JobLifecycleEvent =
rmp_serde::from_slice(&streamed_event.serialized_data).unwrap();
assert_eq!(deserialized.queue_name, event.queue_name);
assert_eq!(deserialized.event_type, event.event_type);
assert_eq!(deserialized.priority, event.priority);
assert_eq!(deserialized.metadata, event.metadata);
let json_size = serde_json::to_vec(&event).unwrap().len();
assert!(streamed_event.serialized_data.len() <= json_size);
}
#[test]
#[cfg(not(feature = "streaming"))]
fn test_messagepack_serialization_requires_feature() {
let event = create_test_job_lifecycle_event();
let stream_config = StreamConfig {
name: "test_stream".to_string(),
serialization: SerializationFormat::MessagePack,
..Default::default()
};
let result = StreamManager::prepare_streamed_event(event, &stream_config);
assert!(result.is_err());
let error = result.unwrap_err();
if let crate::error::HammerworkError::Streaming { message } = error {
assert!(message.contains("streaming"));
assert!(message.contains("feature"));
} else {
panic!("Expected Streaming error");
}
}
#[test]
#[cfg(feature = "streaming")]
fn test_avro_serialization() {
let event = create_test_job_lifecycle_event();
let stream_config = StreamConfig {
name: "test_stream".to_string(),
serialization: SerializationFormat::Avro {
schema_registry_url: "http://localhost:8081".to_string(),
},
..Default::default()
};
let result = StreamManager::prepare_streamed_event(event.clone(), &stream_config);
assert!(result.is_ok());
let streamed_event = result.unwrap();
assert!(!streamed_event.serialized_data.is_empty());
assert!(streamed_event.serialized_data.len() > 10);
}
#[test]
#[cfg(not(feature = "streaming"))]
fn test_avro_serialization_requires_feature() {
let event = create_test_job_lifecycle_event();
let stream_config = StreamConfig {
name: "test_stream".to_string(),
serialization: SerializationFormat::Avro {
schema_registry_url: "http://localhost:8081".to_string(),
},
..Default::default()
};
let result = StreamManager::prepare_streamed_event(event, &stream_config);
assert!(result.is_err());
let error = result.unwrap_err();
if let crate::error::HammerworkError::Streaming { message } = error {
assert!(message.contains("streaming"));
assert!(message.contains("feature"));
} else {
panic!("Expected Streaming error");
}
}
#[test]
#[cfg(feature = "streaming")]
fn test_protobuf_serialization() {
let event = create_test_job_lifecycle_event();
let stream_config = StreamConfig {
name: "test_stream".to_string(),
serialization: SerializationFormat::Protobuf {
schema_definition: "syntax = \"proto3\";".to_string(),
},
..Default::default()
};
let result = StreamManager::prepare_streamed_event(event.clone(), &stream_config);
assert!(result.is_ok());
let streamed_event = result.unwrap();
assert!(!streamed_event.serialized_data.is_empty());
assert!(streamed_event.serialized_data.len() > 5);
let json_size = serde_json::to_vec(&event).unwrap().len();
assert!(streamed_event.serialized_data.len() <= json_size);
}
#[test]
#[cfg(not(feature = "streaming"))]
fn test_protobuf_serialization_requires_feature() {
let event = create_test_job_lifecycle_event();
let stream_config = StreamConfig {
name: "test_stream".to_string(),
serialization: SerializationFormat::Protobuf {
schema_definition: "syntax = \"proto3\";".to_string(),
},
..Default::default()
};
let result = StreamManager::prepare_streamed_event(event, &stream_config);
assert!(result.is_err());
let error = result.unwrap_err();
if let crate::error::HammerworkError::Streaming { message } = error {
assert!(message.contains("streaming"));
assert!(message.contains("feature"));
} else {
panic!("Expected Streaming error");
}
}
#[test]
#[cfg(feature = "streaming")]
fn test_serialization_format_size_comparison() {
let event = create_test_job_lifecycle_event();
let json_config = StreamConfig {
name: "json_stream".to_string(),
serialization: SerializationFormat::Json,
..Default::default()
};
let msgpack_config = StreamConfig {
name: "msgpack_stream".to_string(),
serialization: SerializationFormat::MessagePack,
..Default::default()
};
let avro_config = StreamConfig {
name: "avro_stream".to_string(),
serialization: SerializationFormat::Avro {
schema_registry_url: "http://localhost:8081".to_string(),
},
..Default::default()
};
let protobuf_config = StreamConfig {
name: "protobuf_stream".to_string(),
serialization: SerializationFormat::Protobuf {
schema_definition: "syntax = \"proto3\";".to_string(),
},
..Default::default()
};
let json_result =
StreamManager::prepare_streamed_event(event.clone(), &json_config).unwrap();
let msgpack_result =
StreamManager::prepare_streamed_event(event.clone(), &msgpack_config).unwrap();
let avro_result =
StreamManager::prepare_streamed_event(event.clone(), &avro_config).unwrap();
let protobuf_result =
StreamManager::prepare_streamed_event(event.clone(), &protobuf_config).unwrap();
assert!(!json_result.serialized_data.is_empty());
assert!(!msgpack_result.serialized_data.is_empty());
assert!(!avro_result.serialized_data.is_empty());
assert!(!protobuf_result.serialized_data.is_empty());
let json_size = json_result.serialized_data.len();
let msgpack_size = msgpack_result.serialized_data.len();
let protobuf_size = protobuf_result.serialized_data.len();
assert!(
msgpack_size <= json_size,
"MessagePack ({} bytes) should be <= JSON ({} bytes)",
msgpack_size,
json_size
);
assert!(
protobuf_size <= json_size,
"Protobuf ({} bytes) should be <= JSON ({} bytes)",
protobuf_size,
json_size
);
}
#[tokio::test]
async fn test_stream_manager_creation() {
let config = StreamManagerConfig::default();
let event_manager = Arc::new(EventManager::new_default());
let manager = StreamManager::new(event_manager, config);
let stats = manager.get_stats().await;
assert_eq!(stats.total_streams, 0);
assert_eq!(stats.active_streams, 0);
}
#[tokio::test]
async fn test_stream_manager_add_remove_stream() {
let config = StreamManagerConfig::default();
let event_manager = Arc::new(EventManager::new_default());
let manager = StreamManager::new(event_manager, config);
let backend = StreamBackend::Kafka {
brokers: vec!["localhost:9092".to_string()],
topic: "test-events".to_string(),
config: HashMap::new(),
};
let stream_config = StreamConfig::new("test-stream".to_string(), backend)
.with_partitioning(PartitioningStrategy::QueueName)
.with_serialization(SerializationFormat::Json);
let stream_id = stream_config.id;
manager.add_stream(stream_config).await.unwrap();
let stats = manager.get_stats().await;
assert_eq!(stats.total_streams, 1);
assert_eq!(stats.active_streams, 1);
manager.remove_stream(stream_id).await.unwrap();
let stats = manager.get_stats().await;
assert_eq!(stats.total_streams, 0);
assert_eq!(stats.active_streams, 0);
}
#[tokio::test]
async fn test_stream_manager_enable_disable() {
let config = StreamManagerConfig::default();
let event_manager = Arc::new(EventManager::new_default());
let manager = StreamManager::new(event_manager, config);
let backend = StreamBackend::Kinesis {
region: "us-west-2".to_string(),
stream_name: "test-stream".to_string(),
access_key_id: None,
secret_access_key: None,
config: HashMap::new(),
};
let stream_config = StreamConfig::new("test-stream".to_string(), backend);
let stream_id = stream_config.id;
manager.add_stream(stream_config).await.unwrap();
let stats = manager.get_stats().await;
assert_eq!(stats.active_streams, 1);
manager.disable_stream(stream_id).await.unwrap();
let stats = manager.get_stats().await;
assert_eq!(stats.active_streams, 0);
assert_eq!(stats.total_streams, 1);
manager.enable_stream(stream_id).await.unwrap();
let stats = manager.get_stats().await;
assert_eq!(stats.active_streams, 1);
}
#[test]
fn test_streamed_event_creation() {
let event = JobLifecycleEvent {
event_id: Uuid::new_v4(),
job_id: Uuid::new_v4(),
queue_name: "test_queue".to_string(),
event_type: JobLifecycleEventType::Completed,
priority: JobPriority::Normal,
timestamp: Utc::now(),
processing_time_ms: Some(1500),
error: None,
payload: Some(serde_json::json!({"key": "value"})),
metadata: HashMap::new(),
};
let _stream_config = StreamConfig::new(
"test-stream".to_string(),
StreamBackend::Kafka {
brokers: vec!["localhost:9092".to_string()],
topic: "events".to_string(),
config: HashMap::new(),
},
);
let streamed_event = StreamedEvent {
event,
partition_key: Some("partition-1".to_string()),
serialized_data: b"serialized data".to_vec(),
streamed_at: Utc::now(),
headers: HashMap::new(),
};
assert!(streamed_event.partition_key.is_some());
assert!(!streamed_event.serialized_data.is_empty());
assert_eq!(
streamed_event.event.event_type,
JobLifecycleEventType::Completed
);
}
#[test]
fn test_stream_delivery_result() {
let delivery = StreamDelivery {
delivery_id: Uuid::new_v4(),
stream_id: Uuid::new_v4(),
event_id: Uuid::new_v4(),
success: true,
error_message: None,
attempted_at: Utc::now(),
duration_ms: Some(100),
attempt_number: 1,
partition: Some("partition-0".to_string()),
};
assert!(delivery.success);
assert!(delivery.error_message.is_none());
assert_eq!(delivery.attempt_number, 1);
assert_eq!(delivery.duration_ms, Some(100));
}
#[test]
fn test_stream_stats_aggregation() {
let mut stats = StreamStats {
stream_id: Uuid::new_v4(),
total_events: 1000,
successful_deliveries: 950,
failed_deliveries: 50,
success_rate: 0.95,
avg_delivery_time_ms: 125.5,
buffered_events: 25,
last_success_at: Some(Utc::now() - chrono::Duration::minutes(2)),
last_failure_at: Some(Utc::now() - chrono::Duration::minutes(10)),
calculated_at: Utc::now(),
};
assert_eq!(
stats.total_events,
stats.successful_deliveries + stats.failed_deliveries
);
assert!((stats.success_rate - 0.95).abs() < f64::EPSILON);
assert!(stats.last_success_at.is_some());
assert!(stats.last_failure_at.is_some());
assert_eq!(stats.buffered_events, 25);
stats.total_events += 1;
stats.successful_deliveries += 1;
stats.success_rate = stats.successful_deliveries as f64 / stats.total_events as f64;
assert_eq!(stats.total_events, 1001);
assert_eq!(stats.successful_deliveries, 951);
assert!((stats.success_rate - 951.0 / 1001.0).abs() < f64::EPSILON);
}
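// A Kafka processor should be constructible from broker and topic settings alone.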
#[tokio::test]
async fn test_kafka_processor_creation() {
let processor = KafkaProcessor::new(
vec!["localhost:9092".to_string()],
"test-topic".to_string(),
HashMap::new(),
)
.await
.unwrap();
assert_eq!(processor.topic, "test-topic");
}
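// A Kinesis processor should be constructible without explicit credentials.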
#[tokio::test]
async fn test_kinesis_processor_creation() {
let processor = KinesisProcessor::new(
"us-east-1".to_string(),
"test-stream".to_string(),
None,
None,
HashMap::new(),
)
.await
.unwrap();
assert_eq!(processor.region, "us-east-1");
assert_eq!(processor.stream_name, "test-stream");
assert!(processor.access_key_id.is_none());
}
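// A Pub/Sub processor should be constructible without a service account key.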
#[tokio::test]
async fn test_pubsub_processor_creation() {
let processor = PubSubProcessor::new(
"my-project".to_string(),
"test-topic".to_string(),
None,
HashMap::new(),
)
.await
.unwrap();
assert_eq!(processor.project_id, "my-project");
assert_eq!(processor.topic_name, "test-topic");
assert!(processor.service_account_key.is_none());
}
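// A freshly created Kafka processor should report a passing health check.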
#[tokio::test]
async fn test_stream_processor_health_check() {
let processor = KafkaProcessor::new(
vec!["localhost:9092".to_string()],
"test-topic".to_string(),
HashMap::new(),
)
.await
.unwrap();
let health = processor.health_check().await.unwrap();
assert!(health);
}
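// Sending a single-event batch should yield exactly one successful delivery record.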
#[tokio::test]
async fn test_stream_processor_batch_sending() {
let processor = KafkaProcessor::new(
vec!["localhost:9092".to_string()],
"test-topic".to_string(),
HashMap::new(),
)
.await
.unwrap();
let event = JobLifecycleEvent {
event_id: Uuid::new_v4(),
job_id: Uuid::new_v4(),
queue_name: "test".to_string(),
event_type: JobLifecycleEventType::Completed,
priority: JobPriority::Normal,
timestamp: Utc::now(),
processing_time_ms: Some(1000),
error: None,
payload: None,
metadata: HashMap::new(),
};
let streamed_event = StreamedEvent {
event,
partition_key: Some("partition-0".to_string()),
serialized_data: b"test data".to_vec(),
streamed_at: Utc::now(),
headers: HashMap::new(),
};
let events = vec![streamed_event];
let deliveries = processor.send_batch(events).await.unwrap();
assert_eq!(deliveries.len(), 1);
assert!(deliveries[0].success);
}
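// Partition keys derived from queue name, priority, and event type should match the event's fields,
// and hash-based keys should be deterministic across repeated calls.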
#[test]
fn test_partition_field_combinations() {
let event = JobLifecycleEvent {
event_id: Uuid::new_v4(),
job_id: Uuid::new_v4(),
queue_name: "test_queue".to_string(),
event_type: JobLifecycleEventType::Started,
priority: JobPriority::High,
timestamp: Utc::now(),
processing_time_ms: None,
error: None,
payload: None,
metadata: HashMap::new(),
};
let queue_partition =
StreamManager::calculate_partition_key(&event, &PartitioningStrategy::QueueName);
assert_eq!(queue_partition, Some("test_queue".to_string()));
let priority_partition =
StreamManager::calculate_partition_key(&event, &PartitioningStrategy::Priority);
assert_eq!(priority_partition, Some("high".to_string()));
let event_type_partition =
StreamManager::calculate_partition_key(&event, &PartitioningStrategy::EventType);
assert_eq!(event_type_partition, Some("started".to_string()));
let hash_partition = StreamManager::calculate_partition_key(
&event,
&PartitioningStrategy::Hash {
fields: vec![
PartitionField::QueueName,
PartitionField::Priority,
PartitionField::EventType,
],
},
);
assert!(hash_partition.is_some());
let hash_partition2 = StreamManager::calculate_partition_key(
&event,
&PartitioningStrategy::Hash {
fields: vec![
PartitionField::QueueName,
PartitionField::Priority,
PartitionField::EventType,
],
},
);
assert_eq!(hash_partition, hash_partition2);
}
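// The builder should apply filter, partitioning, serialization, retry, and buffer settings onto the resulting StreamConfig.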
#[test]
fn test_stream_config_builder() {
let backend = StreamBackend::PubSub {
project_id: "test-project".to_string(),
topic_name: "events".to_string(),
service_account_key: None,
config: HashMap::new(),
};
let filter = EventFilter::new().with_event_types(vec![
JobLifecycleEventType::Completed,
JobLifecycleEventType::Failed,
]);
let retry_policy = StreamRetryPolicy {
max_attempts: 5,
initial_delay_secs: 2,
max_delay_secs: 120,
backoff_multiplier: 2.0,
use_jitter: true,
};
let buffer_config = BufferConfig {
max_events: 1000,
max_buffer_time_secs: 30,
batch_size: 100,
};
let stream = StreamConfig::new("comprehensive-stream".to_string(), backend)
.with_filter(filter)
.with_partitioning(PartitioningStrategy::Hash {
fields: vec![PartitionField::QueueName, PartitionField::Priority],
})
.with_serialization(SerializationFormat::Avro {
schema_registry_url: "http://schema-registry:8081".to_string(),
})
.with_retry_policy(retry_policy)
.with_buffer_config(buffer_config)
.enabled(true);
assert_eq!(stream.name, "comprehensive-stream");
assert!(stream.enabled);
assert!(matches!(
stream.partitioning,
PartitioningStrategy::Hash { .. }
));
assert!(matches!(
stream.serialization,
SerializationFormat::Avro { .. }
));
assert_eq!(stream.retry_policy.max_attempts, 5);
assert!(stream.retry_policy.use_jitter);
assert_eq!(stream.buffer_config.max_events, 1000);
assert_eq!(stream.filter.event_types.len(), 2);
}
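// StreamManagerConfig defaults: 50 concurrent processors, operation logging on, 10-second global flush interval.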
#[test]
fn test_stream_manager_config_defaults() {
let config = StreamManagerConfig::default();
assert_eq!(config.max_concurrent_processors, 50);
assert!(config.log_operations);
assert_eq!(config.global_flush_interval_secs, 10);
}
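// A StreamConfig should survive a JSON round trip, including its internally tagged backend variant.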
#[test]
fn test_stream_config_serialization() {
let backend = StreamBackend::Kinesis {
region: "eu-west-1".to_string(),
stream_name: "production-events".to_string(),
access_key_id: Some("AKIA...".to_string()),
secret_access_key: Some("secret".to_string()),
config: {
let mut config = HashMap::new();
config.insert("batch_size".to_string(), "500".to_string());
config
},
};
let stream_config = StreamConfig::new("prod-stream".to_string(), backend)
.with_partitioning(PartitioningStrategy::Custom {
metadata_key: "tenant_id".to_string(),
})
.with_serialization(SerializationFormat::Protobuf {
schema_definition: "syntax = \"proto3\"; message Event { string id = 1; }"
.to_string(),
});
let serialized = serde_json::to_string(&stream_config).unwrap();
let deserialized: StreamConfig = serde_json::from_str(&serialized).unwrap();
assert_eq!(deserialized.name, stream_config.name);
assert_eq!(deserialized.enabled, stream_config.enabled);
match &deserialized.backend {
StreamBackend::Kinesis {
region,
stream_name,
..
} => {
assert_eq!(region, "eu-west-1");
assert_eq!(stream_name, "production-events");
}
_ => panic!("Wrong backend type after deserialization"),
}
}
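// Processor stats should expose the backend type plus its identifying fields as JSON values.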
#[tokio::test]
async fn test_stream_processor_stats() {
let processor = PubSubProcessor::new(
"test-project".to_string(),
"test-topic".to_string(),
None,
HashMap::new(),
)
.await
.unwrap();
let stats = processor.get_stats().await.unwrap();
assert_eq!(
stats["type"],
serde_json::Value::String("pubsub".to_string())
);
assert_eq!(
stats["project_id"],
serde_json::Value::String("test-project".to_string())
);
assert_eq!(
stats["topic_name"],
serde_json::Value::String("test-topic".to_string())
);
}
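// BufferConfig defaults: 1000 buffered events, 5-second flush window, batches of 100.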
#[test]
fn test_buffer_config_defaults() {
let buffer = BufferConfig::default();
assert_eq!(buffer.max_events, 1000);
assert_eq!(buffer.max_buffer_time_secs, 5);
assert_eq!(buffer.batch_size, 100);
}
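// StreamRetryPolicy defaults: 5 attempts with jittered exponential backoff from 1s up to 300s.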
#[test]
fn test_stream_retry_policy_defaults() {
let policy = StreamRetryPolicy::default();
assert_eq!(policy.max_attempts, 5);
assert_eq!(policy.initial_delay_secs, 1);
assert_eq!(policy.max_delay_secs, 300);
assert!((policy.backoff_multiplier - 2.0).abs() < f64::EPSILON);
assert!(policy.use_jitter);
}
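// Custom partitioning should read the configured metadata key and yield None when the key is absent.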
#[test]
fn test_custom_metadata_partitioning() {
let mut metadata = HashMap::new();
metadata.insert("tenant_id".to_string(), "tenant_123".to_string());
metadata.insert("region".to_string(), "us-west-2".to_string());
let event = JobLifecycleEvent {
event_id: Uuid::new_v4(),
job_id: Uuid::new_v4(),
queue_name: "multi_tenant_queue".to_string(),
event_type: JobLifecycleEventType::Enqueued,
priority: JobPriority::Normal,
timestamp: Utc::now(),
processing_time_ms: None,
error: None,
payload: None,
metadata,
};
let tenant_partition = StreamManager::calculate_partition_key(
&event,
&PartitioningStrategy::Custom {
metadata_key: "tenant_id".to_string(),
},
);
assert_eq!(tenant_partition, Some("tenant_123".to_string()));
let region_partition = StreamManager::calculate_partition_key(
&event,
&PartitioningStrategy::Custom {
metadata_key: "region".to_string(),
},
);
assert_eq!(region_partition, Some("us-west-2".to_string()));
let missing_partition = StreamManager::calculate_partition_key(
&event,
&PartitioningStrategy::Custom {
metadata_key: "missing_key".to_string(),
},
);
assert_eq!(missing_partition, None);
}
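// Backend-specific config maps should be accepted at construction time and surfaced through get_stats().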
#[tokio::test]
async fn test_stream_processor_configuration_usage() {
let mut kafka_config = HashMap::new();
kafka_config.insert("batch.delay.ms".to_string(), "50".to_string());
kafka_config.insert("test.error.rate".to_string(), "0.1".to_string());
kafka_config.insert("health.check.timeout.ms".to_string(), "1000".to_string());
let kafka_processor = KafkaProcessor::new(
vec!["localhost:9092".to_string()],
"test-topic".to_string(),
kafka_config,
)
.await
.unwrap();
let stats = kafka_processor.get_stats().await.unwrap();
assert_eq!(
stats.get("type").unwrap(),
&serde_json::Value::String("kafka".to_string())
);
assert!(stats.contains_key("config"));
let mut kinesis_config = HashMap::new();
kinesis_config.insert("retries".to_string(), "3".to_string());
let kinesis_processor = KinesisProcessor::new(
"us-west-2".to_string(),
"test-stream".to_string(),
Some("AKIA***".to_string()),
Some("secret".to_string()),
kinesis_config,
)
.await
.unwrap();
let kinesis_stats = kinesis_processor.get_stats().await.unwrap();
assert_eq!(
kinesis_stats.get("type").unwrap(),
&serde_json::Value::String("kinesis".to_string())
);
assert!(kinesis_stats.contains_key("access_key_id"));
assert!(kinesis_stats.contains_key("config"));
let mut pubsub_config = HashMap::new();
pubsub_config.insert("max_messages".to_string(), "1000".to_string());
let pubsub_processor = PubSubProcessor::new(
"my-project".to_string(),
"test-topic".to_string(),
Some("service-account-key".to_string()),
pubsub_config,
)
.await
.unwrap();
let pubsub_stats = pubsub_processor.get_stats().await.unwrap();
assert_eq!(
pubsub_stats.get("type").unwrap(),
&serde_json::Value::String("pubsub".to_string())
);
assert_eq!(
pubsub_stats.get("service_account_configured").unwrap(),
&serde_json::Value::Bool(true)
);
assert!(pubsub_stats.contains_key("config"));
}
}