oxirs_stream/backend/
mod.rs1use async_trait::async_trait;
7use serde::{Deserialize, Serialize};
8use std::collections::HashMap;
9
10use crate::consumer::ConsumerGroup;
11use crate::error::StreamResult;
12use crate::event::StreamEvent;
13use crate::types::{Offset, PartitionId, StreamPosition, TopicName};
14
15#[async_trait]
17pub trait StreamBackend: Send + Sync {
18 fn name(&self) -> &'static str;
20
21 async fn connect(&mut self) -> StreamResult<()>;
23
24 async fn disconnect(&mut self) -> StreamResult<()>;
26
27 async fn create_topic(&self, topic: &TopicName, partitions: u32) -> StreamResult<()>;
29
30 async fn delete_topic(&self, topic: &TopicName) -> StreamResult<()>;
32
33 async fn list_topics(&self) -> StreamResult<Vec<TopicName>>;
35
36 async fn send_event(&self, topic: &TopicName, event: StreamEvent) -> StreamResult<Offset>;
38
39 async fn send_batch(
41 &self,
42 topic: &TopicName,
43 events: Vec<StreamEvent>,
44 ) -> StreamResult<Vec<Offset>>;
45
46 async fn receive_events(
48 &self,
49 topic: &TopicName,
50 consumer_group: Option<&ConsumerGroup>,
51 position: StreamPosition,
52 max_events: usize,
53 ) -> StreamResult<Vec<(StreamEvent, Offset)>>;
54
55 async fn commit_offset(
57 &self,
58 topic: &TopicName,
59 consumer_group: &ConsumerGroup,
60 partition: PartitionId,
61 offset: Offset,
62 ) -> StreamResult<()>;
63
64 async fn seek(
66 &self,
67 topic: &TopicName,
68 consumer_group: &ConsumerGroup,
69 partition: PartitionId,
70 position: StreamPosition,
71 ) -> StreamResult<()>;
72
73 async fn get_consumer_lag(
75 &self,
76 topic: &TopicName,
77 consumer_group: &ConsumerGroup,
78 ) -> StreamResult<HashMap<PartitionId, u64>>;
79
80 async fn get_topic_metadata(&self, topic: &TopicName) -> StreamResult<HashMap<String, String>>;
82}
83
84#[derive(Debug, Clone, Serialize, Deserialize)]
86pub struct StreamBackendConfig {
87 pub backend_type: BackendType,
88 pub connection_timeout_ms: u64,
89 pub retry_attempts: u32,
90 pub retry_delay_ms: u64,
91 pub health_check_interval_ms: u64,
92}
93
94#[derive(Debug, Clone, Serialize, Deserialize, Eq, Hash, PartialEq)]
96pub enum BackendType {
97 Kafka,
98 Nats,
99 Redis,
100 Kinesis,
101 Pulsar,
102 RabbitMQ,
103 Memory,
104}
105
106impl Default for StreamBackendConfig {
107 fn default() -> Self {
108 Self {
109 backend_type: BackendType::Memory,
110 connection_timeout_ms: 5000,
111 retry_attempts: 3,
112 retry_delay_ms: 100,
113 health_check_interval_ms: 30000,
114 }
115 }
116}
117
118pub mod memory;
120
121#[cfg(feature = "redis")]
122pub mod redis;
123
124#[cfg(feature = "kafka")]
125pub mod kafka;
126
127#[cfg(feature = "nats")]
128pub mod nats;
129
130#[cfg(feature = "kinesis")]
131pub mod kinesis;
132
133#[cfg(feature = "pulsar")]
134pub mod pulsar;
135
136#[cfg(feature = "rabbitmq")]
137pub mod rabbitmq;
138
139#[cfg(feature = "mqtt")]
140pub mod mqtt;
141
142#[cfg(feature = "opcua")]
143pub mod opcua;
144
145#[cfg(feature = "kafka")]
146pub mod kafka_schema_registry;