Skip to main content

oxirs_stream/backend/
mod.rs

//! # Stream Backend Abstraction
//!
//! This module provides a unified interface for different streaming backends,
//! including Kafka, NATS, Redis, Kinesis, and Pulsar.

6use async_trait::async_trait;
7use serde::{Deserialize, Serialize};
8use std::collections::HashMap;
9
10use crate::consumer::ConsumerGroup;
11use crate::error::StreamResult;
12use crate::event::StreamEvent;
13use crate::types::{Offset, PartitionId, StreamPosition, TopicName};
14
15/// Common trait for all streaming backends
16#[async_trait]
17pub trait StreamBackend: Send + Sync {
18    /// Get the name of this backend
19    fn name(&self) -> &'static str;
20
21    /// Connect to the backend
22    async fn connect(&mut self) -> StreamResult<()>;
23
24    /// Disconnect from the backend
25    async fn disconnect(&mut self) -> StreamResult<()>;
26
27    /// Create a new topic/stream
28    async fn create_topic(&self, topic: &TopicName, partitions: u32) -> StreamResult<()>;
29
30    /// Delete a topic/stream
31    async fn delete_topic(&self, topic: &TopicName) -> StreamResult<()>;
32
33    /// List all topics/streams
34    async fn list_topics(&self) -> StreamResult<Vec<TopicName>>;
35
36    /// Send a single event
37    async fn send_event(&self, topic: &TopicName, event: StreamEvent) -> StreamResult<Offset>;
38
39    /// Send multiple events as a batch
40    async fn send_batch(
41        &self,
42        topic: &TopicName,
43        events: Vec<StreamEvent>,
44    ) -> StreamResult<Vec<Offset>>;
45
46    /// Receive events from a topic
47    async fn receive_events(
48        &self,
49        topic: &TopicName,
50        consumer_group: Option<&ConsumerGroup>,
51        position: StreamPosition,
52        max_events: usize,
53    ) -> StreamResult<Vec<(StreamEvent, Offset)>>;
54
55    /// Commit consumer offset
56    async fn commit_offset(
57        &self,
58        topic: &TopicName,
59        consumer_group: &ConsumerGroup,
60        partition: PartitionId,
61        offset: Offset,
62    ) -> StreamResult<()>;
63
64    /// Seek to a specific position
65    async fn seek(
66        &self,
67        topic: &TopicName,
68        consumer_group: &ConsumerGroup,
69        partition: PartitionId,
70        position: StreamPosition,
71    ) -> StreamResult<()>;
72
73    /// Get consumer lag information
74    async fn get_consumer_lag(
75        &self,
76        topic: &TopicName,
77        consumer_group: &ConsumerGroup,
78    ) -> StreamResult<HashMap<PartitionId, u64>>;
79
80    /// Get topic metadata
81    async fn get_topic_metadata(&self, topic: &TopicName) -> StreamResult<HashMap<String, String>>;
82}
83
84/// Backend configuration
85#[derive(Debug, Clone, Serialize, Deserialize)]
86pub struct StreamBackendConfig {
87    pub backend_type: BackendType,
88    pub connection_timeout_ms: u64,
89    pub retry_attempts: u32,
90    pub retry_delay_ms: u64,
91    pub health_check_interval_ms: u64,
92}
93
94/// Backend types
95#[derive(Debug, Clone, Serialize, Deserialize, Eq, Hash, PartialEq)]
96pub enum BackendType {
97    Kafka,
98    Nats,
99    Redis,
100    Kinesis,
101    Pulsar,
102    RabbitMQ,
103    Memory,
104}
105
106impl Default for StreamBackendConfig {
107    fn default() -> Self {
108        Self {
109            backend_type: BackendType::Memory,
110            connection_timeout_ms: 5000,
111            retry_attempts: 3,
112            retry_delay_ms: 100,
113            health_check_interval_ms: 30000,
114        }
115    }
116}
117
118// Re-export backend implementations
119pub mod memory;
120
121#[cfg(feature = "redis")]
122pub mod redis;
123
124#[cfg(feature = "kafka")]
125pub mod kafka;
126
127#[cfg(feature = "nats")]
128pub mod nats;
129
130#[cfg(feature = "kinesis")]
131pub mod kinesis;
132
133#[cfg(feature = "pulsar")]
134pub mod pulsar;
135
136#[cfg(feature = "rabbitmq")]
137pub mod rabbitmq;
138
139#[cfg(feature = "mqtt")]
140pub mod mqtt;
141
142#[cfg(feature = "opcua")]
143pub mod opcua;
144
145#[cfg(feature = "kafka")]
146pub mod kafka_schema_registry;