Skip to main content

oxirs_stream/
consumer.rs

//! # Stream Consumer
//!
//! Consumer types and configuration for streaming backends.
4
5use serde::{Deserialize, Serialize};
6
7/// Consumer configuration
8#[derive(Debug, Clone, Serialize, Deserialize)]
9pub struct ConsumerConfig {
10    pub group_id: String,
11    pub consumer_id: Option<String>,
12    pub auto_commit: bool,
13    pub commit_interval_ms: u64,
14    pub max_poll_records: usize,
15    pub session_timeout_ms: u64,
16    pub heartbeat_interval_ms: u64,
17    pub enable_auto_offset_store: bool,
18    pub isolation_level: IsolationLevel,
19}
20
21impl Default for ConsumerConfig {
22    fn default() -> Self {
23        Self {
24            group_id: "oxirs-consumer-group".to_string(),
25            consumer_id: None,
26            auto_commit: true,
27            commit_interval_ms: 5000,
28            max_poll_records: 500,
29            session_timeout_ms: 30000,
30            heartbeat_interval_ms: 3000,
31            enable_auto_offset_store: true,
32            isolation_level: IsolationLevel::ReadCommitted,
33        }
34    }
35}
36
37/// Consumer group representation
38#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
39pub struct ConsumerGroup {
40    name: String,
41    consumer_id: Option<String>,
42}
43
44impl ConsumerGroup {
45    pub fn new(name: String) -> Self {
46        Self {
47            name,
48            consumer_id: None,
49        }
50    }
51
52    pub fn with_consumer_id(mut self, consumer_id: String) -> Self {
53        self.consumer_id = Some(consumer_id);
54        self
55    }
56
57    pub fn name(&self) -> &str {
58        &self.name
59    }
60
61    pub fn consumer_id(&self) -> Option<&str> {
62        self.consumer_id.as_deref()
63    }
64}
65
66/// Transaction isolation levels
67#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
68pub enum IsolationLevel {
69    ReadUncommitted,
70    ReadCommitted,
71}
72
73/// Consumer position reset strategy
74#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
75pub enum OffsetReset {
76    Earliest,
77    #[default]
78    Latest,
79    None,
80}