use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use crate::consumer::ConsumerGroup;
use crate::error::StreamResult;
use crate::event::StreamEvent;
use crate::types::{Offset, PartitionId, StreamPosition, TopicName};
/// Common asynchronous interface that every stream backend implements.
///
/// The trait groups five kinds of operations: connection lifecycle
/// (`connect` / `disconnect`), topic administration, event production,
/// event consumption with consumer-group offset control, and introspection.
/// Implementors must be `Send + Sync`. Every fallible operation reports
/// failure through [`StreamResult`].
///
/// NOTE(review): this block only declares signatures. Behavioral details in
/// the method docs below are inferred from names and types and are marked as
/// assumptions — confirm them against the concrete backends.
#[async_trait]
pub trait StreamBackend: Send + Sync {
/// Returns the backend's name as a `'static` string.
fn name(&self) -> &'static str;
/// Connects to the backend. Takes `&mut self`, so the implementor may
/// update its own state while connecting.
///
/// NOTE(review): presumably must succeed before the other methods are
/// usable — confirm.
async fn connect(&mut self) -> StreamResult<()>;
/// Disconnects from the backend. Takes `&mut self`, like `connect`.
async fn disconnect(&mut self) -> StreamResult<()>;
/// Creates `topic` with the requested number of `partitions`.
///
/// NOTE(review): behavior when the topic already exists, or when the
/// backend has no notion of partitions, is not specified here — confirm.
async fn create_topic(&self, topic: &TopicName, partitions: u32) -> StreamResult<()>;
/// Deletes `topic`.
async fn delete_topic(&self, topic: &TopicName) -> StreamResult<()>;
/// Returns the names of the topics reported by the backend.
async fn list_topics(&self) -> StreamResult<Vec<TopicName>>;
/// Sends a single `event` to `topic` and returns an [`Offset`].
///
/// NOTE(review): presumably the offset assigned to the stored event —
/// confirm.
async fn send_event(&self, topic: &TopicName, event: StreamEvent) -> StreamResult<Offset>;
/// Sends several `events` to `topic` and returns a vector of [`Offset`]s.
///
/// NOTE(review): presumably one offset per event, in input order; whether
/// the batch is atomic is not specified here — confirm.
async fn send_batch(
&self,
topic: &TopicName,
events: Vec<StreamEvent>,
) -> StreamResult<Vec<Offset>>;
/// Reads events from `topic`, returning each event paired with an
/// [`Offset`].
///
/// `consumer_group` is optional; `position` selects where reading starts;
/// `max_events` is presumably an upper bound on the number of returned
/// events — TODO confirm.
async fn receive_events(
&self,
topic: &TopicName,
consumer_group: Option<&ConsumerGroup>,
position: StreamPosition,
max_events: usize,
) -> StreamResult<Vec<(StreamEvent, Offset)>>;
/// Records `offset` for `consumer_group` on the given `partition` of
/// `topic`.
///
/// NOTE(review): whether `offset` denotes the last processed event or the
/// next one to read is not visible here — confirm.
async fn commit_offset(
&self,
topic: &TopicName,
consumer_group: &ConsumerGroup,
partition: PartitionId,
offset: Offset,
) -> StreamResult<()>;
/// Moves `consumer_group`'s read position on `partition` of `topic` to
/// `position`.
async fn seek(
&self,
topic: &TopicName,
consumer_group: &ConsumerGroup,
partition: PartitionId,
position: StreamPosition,
) -> StreamResult<()>;
/// Returns a per-partition `u64` lag value for `consumer_group` on
/// `topic`, keyed by [`PartitionId`].
///
/// NOTE(review): presumably the number of events the group is behind the
/// end of each partition — confirm the unit.
async fn get_consumer_lag(
&self,
topic: &TopicName,
consumer_group: &ConsumerGroup,
) -> StreamResult<HashMap<PartitionId, u64>>;
/// Returns metadata about `topic` as string key/value pairs.
///
/// NOTE(review): the set of keys is not defined here and is presumably
/// backend-specific — confirm.
async fn get_topic_metadata(&self, topic: &TopicName) -> StreamResult<HashMap<String, String>>;
}
/// Settings for a stream backend: which backend to use plus connection,
/// retry and health-check timing.
///
/// Serializable via serde. The `Default` impl in this file selects
/// `BackendType::Memory` with a 5000 / 3 / 100 / 30000 set of values for the
/// fields below, in declaration order.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StreamBackendConfig {
/// Which backend this configuration targets.
pub backend_type: BackendType,
/// Connection timeout; milliseconds per the `_ms` suffix.
pub connection_timeout_ms: u64,
/// Number of retry attempts.
// NOTE(review): whether this counts retries only or includes the first
// attempt is not visible here — confirm with the code that consumes it.
pub retry_attempts: u32,
/// Delay associated with retries; milliseconds per the `_ms` suffix.
pub retry_delay_ms: u64,
/// Interval between health checks; milliseconds per the `_ms` suffix.
pub health_check_interval_ms: u64,
}
/// Selects the kind of stream backend.
///
/// Derives `Eq` and `Hash`, so values can be used as map or set keys.
/// Each variant presumably corresponds to the like-named messaging system
/// or service.
///
/// NOTE(review): this file also declares `mqtt` and `opcua` modules, but
/// there is no matching variant here — confirm whether that is intentional.
#[derive(Debug, Clone, Serialize, Deserialize, Eq, Hash, PartialEq)]
pub enum BackendType {
/// Kafka backend.
Kafka,
/// NATS backend.
Nats,
/// Redis backend.
Redis,
/// Kinesis backend.
Kinesis,
/// Pulsar backend.
Pulsar,
/// RabbitMQ backend.
RabbitMQ,
/// In-memory backend; the value chosen by `StreamBackendConfig::default()`.
Memory,
}
impl Default for StreamBackendConfig {
fn default() -> Self {
Self {
backend_type: BackendType::Memory,
connection_timeout_ms: 5000,
retry_attempts: 3,
retry_delay_ms: 100,
health_check_interval_ms: 30000,
}
}
}
// Backend submodules (presumably one backend implementation per module).
//
// `memory` is always compiled. Every other module is compiled only when the
// Cargo feature named in its `cfg` attribute is enabled.
pub mod memory;
#[cfg(feature = "redis")]
pub mod redis;
#[cfg(feature = "kafka")]
pub mod kafka;
#[cfg(feature = "nats")]
pub mod nats;
#[cfg(feature = "kinesis")]
pub mod kinesis;
#[cfg(feature = "pulsar")]
pub mod pulsar;
#[cfg(feature = "rabbitmq")]
pub mod rabbitmq;
#[cfg(feature = "mqtt")]
pub mod mqtt;
#[cfg(feature = "opcua")]
pub mod opcua;
// Has no feature of its own: it is gated on the same `kafka` feature as the
// `kafka` module above.
#[cfg(feature = "kafka")]
pub mod kafka_schema_registry;