Skip to main content

rivven_core/
offset.rs

1use std::collections::HashMap;
2use std::path::PathBuf;
3use std::sync::Arc;
4use tokio::sync::RwLock;
5use tracing::{debug, warn};
6
/// Consumer group name -> committed offsets of every topic in that group.
type GroupOffsets = HashMap<String, TopicOffsets>;
/// Topic name -> committed offsets of each of its partitions.
type TopicOffsets = HashMap<String, PartitionOffsets>;
/// Partition id -> committed offset.
type PartitionOffsets = HashMap<u32, u64>;
13
14/// Manages consumer offsets for topics and partitions.
15///
16/// Supports optional file-based persistence — when a `data_dir` is provided,
17/// offsets are atomically checkpointed to `<data_dir>/offsets.json` on every
18/// commit and loaded on startup.
19#[derive(Debug, Clone)]
20pub struct OffsetManager {
21    /// Map of consumer_group -> topic -> partition -> offset
22    offsets: Arc<RwLock<GroupOffsets>>,
23    /// Optional persistence path
24    data_dir: Option<PathBuf>,
25}
26
27impl OffsetManager {
28    /// Create a new in-memory offset manager (no persistence)
29    pub fn new() -> Self {
30        Self {
31            offsets: Arc::new(RwLock::new(HashMap::new())),
32            data_dir: None,
33        }
34    }
35
36    /// Create an offset manager that persists offsets to disk.
37    ///
38    /// Loads existing offsets from `<data_dir>/offsets.json` if the file exists.
39    pub fn with_persistence(data_dir: PathBuf) -> Self {
40        std::fs::create_dir_all(&data_dir).ok();
41        let path = data_dir.join("offsets.json");
42
43        let offsets = if path.exists() {
44            match std::fs::read_to_string(&path) {
45                Ok(content) => match serde_json::from_str::<GroupOffsets>(&content) {
46                    Ok(loaded) => {
47                        debug!(
48                            "Loaded {} consumer group offsets from {}",
49                            loaded.len(),
50                            path.display()
51                        );
52                        loaded
53                    }
54                    Err(e) => {
55                        warn!("Failed to parse offsets file {}: {}", path.display(), e);
56                        HashMap::new()
57                    }
58                },
59                Err(e) => {
60                    warn!("Failed to read offsets file {}: {}", path.display(), e);
61                    HashMap::new()
62                }
63            }
64        } else {
65            HashMap::new()
66        };
67
68        Self {
69            offsets: Arc::new(RwLock::new(offsets)),
70            data_dir: Some(data_dir),
71        }
72    }
73
74    /// Atomically checkpoint offsets to disk (if persistence is enabled)
75    async fn checkpoint(&self) {
76        if let Some(ref dir) = self.data_dir {
77            let offsets = self.offsets.read().await;
78            let path = dir.join("offsets.json");
79            let tmp_path = dir.join("offsets.json.tmp");
80
81            match serde_json::to_string(&*offsets) {
82                Ok(json) => {
83                    // Atomic write: write to temp file, then rename
84                    if let Err(e) = std::fs::write(&tmp_path, json.as_bytes()) {
85                        warn!("Failed to write offset checkpoint: {}", e);
86                        return;
87                    }
88                    if let Err(e) = std::fs::rename(&tmp_path, &path) {
89                        warn!("Failed to rename offset checkpoint: {}", e);
90                    }
91                }
92                Err(e) => {
93                    warn!("Failed to serialize offsets: {}", e);
94                }
95            }
96        }
97    }
98
99    /// Commit an offset for a consumer group
100    pub async fn commit_offset(
101        &self,
102        consumer_group: &str,
103        topic: &str,
104        partition: u32,
105        offset: u64,
106    ) {
107        {
108            let mut offsets = self.offsets.write().await;
109
110            offsets
111                .entry(consumer_group.to_string())
112                .or_insert_with(HashMap::new)
113                .entry(topic.to_string())
114                .or_insert_with(HashMap::new)
115                .insert(partition, offset);
116        }
117
118        self.checkpoint().await;
119    }
120
121    /// Get the committed offset for a consumer group
122    pub async fn get_offset(
123        &self,
124        consumer_group: &str,
125        topic: &str,
126        partition: u32,
127    ) -> Option<u64> {
128        let offsets = self.offsets.read().await;
129
130        offsets
131            .get(consumer_group)
132            .and_then(|topics| topics.get(topic))
133            .and_then(|partitions| partitions.get(&partition))
134            .copied()
135    }
136
137    /// Reset offsets for a consumer group
138    pub async fn reset_offsets(&self, consumer_group: &str) {
139        {
140            let mut offsets = self.offsets.write().await;
141            offsets.remove(consumer_group);
142        }
143        self.checkpoint().await;
144    }
145
146    /// List all consumer groups with committed offsets
147    pub async fn list_groups(&self) -> Vec<String> {
148        let offsets = self.offsets.read().await;
149        offsets.keys().cloned().collect()
150    }
151
152    /// Get all offsets for a consumer group
153    /// Returns: topic → partition → offset
154    pub async fn get_group_offsets(
155        &self,
156        consumer_group: &str,
157    ) -> Option<HashMap<String, HashMap<u32, u64>>> {
158        let offsets = self.offsets.read().await;
159        offsets.get(consumer_group).cloned()
160    }
161
162    /// Delete a consumer group and all its offsets
163    pub async fn delete_group(&self, consumer_group: &str) -> bool {
164        let removed = {
165            let mut offsets = self.offsets.write().await;
166            offsets.remove(consumer_group).is_some()
167        };
168        if removed {
169            self.checkpoint().await;
170        }
171        removed
172    }
173}
174
175impl Default for OffsetManager {
176    fn default() -> Self {
177        Self::new()
178    }
179}
180
#[cfg(test)]
mod tests {
    use super::*;

    /// A committed offset can be read back; an untouched partition has none.
    #[tokio::test]
    async fn test_offset_management() {
        let store = OffsetManager::new();
        store.commit_offset("group1", "topic1", 0, 100).await;

        assert_eq!(store.get_offset("group1", "topic1", 0).await, Some(100));
        assert_eq!(store.get_offset("group1", "topic1", 1).await, None);
    }

    /// Resetting a group discards its previously committed offsets.
    #[tokio::test]
    async fn test_reset_offsets() {
        let store = OffsetManager::new();
        store.commit_offset("group1", "topic1", 0, 100).await;

        store.reset_offsets("group1").await;

        assert_eq!(store.get_offset("group1", "topic1", 0).await, None);
    }

    /// Offsets written by one manager are visible to a fresh manager that is
    /// opened on the same directory.
    #[tokio::test]
    async fn test_persistence_round_trip() {
        let tmp = tempfile::tempdir().unwrap();
        let root = tmp.path().to_path_buf();

        // First manager: commit, then drop it before reopening.
        let writer = OffsetManager::with_persistence(root.clone());
        writer.commit_offset("grp1", "orders", 0, 42).await;
        writer.commit_offset("grp1", "orders", 1, 99).await;
        writer.commit_offset("grp2", "events", 0, 7).await;
        drop(writer);

        // Second manager: everything must have been loaded from disk.
        let reader = OffsetManager::with_persistence(root);
        assert_eq!(reader.get_offset("grp1", "orders", 0).await, Some(42));
        assert_eq!(reader.get_offset("grp1", "orders", 1).await, Some(99));
        assert_eq!(reader.get_offset("grp2", "events", 0).await, Some(7));
    }

    /// Deleting a group is persisted, so it stays deleted after a reload.
    #[tokio::test]
    async fn test_persistence_delete_group() {
        let tmp = tempfile::tempdir().unwrap();
        let root = tmp.path().to_path_buf();

        let first = OffsetManager::with_persistence(root.clone());
        first.commit_offset("grp1", "t", 0, 10).await;
        let deleted = first.delete_group("grp1").await;
        assert!(deleted);

        // Reopen on the same directory: the group must not come back.
        let second = OffsetManager::with_persistence(root);
        assert_eq!(second.get_offset("grp1", "t", 0).await, None);
    }
}
242}