1use std::collections::HashMap;
2use std::path::PathBuf;
3use std::sync::Arc;
4use tokio::sync::RwLock;
5use tracing::{debug, warn};
6
/// Committed offset for each partition id of a single topic.
type PartitionOffsets = HashMap<u32, u64>;
/// Per-partition offsets keyed by topic name.
type TopicOffsets = HashMap<String, PartitionOffsets>;
/// Per-topic offsets keyed by consumer group name; this is the shape that is
/// serialized to and loaded from the offsets file.
type GroupOffsets = HashMap<String, TopicOffsets>;
13
/// Tracks committed offsets per consumer group, topic and partition.
///
/// The offset table sits behind an `Arc`, so a clone of the manager shares
/// the same table as the value it was cloned from.
#[derive(Debug, Clone)]
pub struct OffsetManager {
    /// Shared offset table, nested as group -> topic -> partition -> offset.
    offsets: Arc<RwLock<GroupOffsets>>,
    /// Directory that holds the `offsets.json` checkpoint; `None` keeps the
    /// offsets in memory only.
    data_dir: Option<PathBuf>,
}
26
27impl OffsetManager {
28 pub fn new() -> Self {
30 Self {
31 offsets: Arc::new(RwLock::new(HashMap::new())),
32 data_dir: None,
33 }
34 }
35
36 pub fn with_persistence(data_dir: PathBuf) -> Self {
40 std::fs::create_dir_all(&data_dir).ok();
41 let path = data_dir.join("offsets.json");
42
43 let offsets = if path.exists() {
44 match std::fs::read_to_string(&path) {
45 Ok(content) => match serde_json::from_str::<GroupOffsets>(&content) {
46 Ok(loaded) => {
47 debug!(
48 "Loaded {} consumer group offsets from {}",
49 loaded.len(),
50 path.display()
51 );
52 loaded
53 }
54 Err(e) => {
55 warn!("Failed to parse offsets file {}: {}", path.display(), e);
56 HashMap::new()
57 }
58 },
59 Err(e) => {
60 warn!("Failed to read offsets file {}: {}", path.display(), e);
61 HashMap::new()
62 }
63 }
64 } else {
65 HashMap::new()
66 };
67
68 Self {
69 offsets: Arc::new(RwLock::new(offsets)),
70 data_dir: Some(data_dir),
71 }
72 }
73
74 async fn checkpoint(&self) {
76 if let Some(ref dir) = self.data_dir {
77 let offsets = self.offsets.read().await;
78 let path = dir.join("offsets.json");
79 let tmp_path = dir.join("offsets.json.tmp");
80
81 match serde_json::to_string(&*offsets) {
82 Ok(json) => {
83 if let Err(e) = std::fs::write(&tmp_path, json.as_bytes()) {
85 warn!("Failed to write offset checkpoint: {}", e);
86 return;
87 }
88 if let Err(e) = std::fs::rename(&tmp_path, &path) {
89 warn!("Failed to rename offset checkpoint: {}", e);
90 }
91 }
92 Err(e) => {
93 warn!("Failed to serialize offsets: {}", e);
94 }
95 }
96 }
97 }
98
99 pub async fn commit_offset(
101 &self,
102 consumer_group: &str,
103 topic: &str,
104 partition: u32,
105 offset: u64,
106 ) {
107 {
108 let mut offsets = self.offsets.write().await;
109
110 offsets
111 .entry(consumer_group.to_string())
112 .or_insert_with(HashMap::new)
113 .entry(topic.to_string())
114 .or_insert_with(HashMap::new)
115 .insert(partition, offset);
116 }
117
118 self.checkpoint().await;
119 }
120
121 pub async fn get_offset(
123 &self,
124 consumer_group: &str,
125 topic: &str,
126 partition: u32,
127 ) -> Option<u64> {
128 let offsets = self.offsets.read().await;
129
130 offsets
131 .get(consumer_group)
132 .and_then(|topics| topics.get(topic))
133 .and_then(|partitions| partitions.get(&partition))
134 .copied()
135 }
136
137 pub async fn reset_offsets(&self, consumer_group: &str) {
139 {
140 let mut offsets = self.offsets.write().await;
141 offsets.remove(consumer_group);
142 }
143 self.checkpoint().await;
144 }
145
146 pub async fn list_groups(&self) -> Vec<String> {
148 let offsets = self.offsets.read().await;
149 offsets.keys().cloned().collect()
150 }
151
152 pub async fn get_group_offsets(
155 &self,
156 consumer_group: &str,
157 ) -> Option<HashMap<String, HashMap<u32, u64>>> {
158 let offsets = self.offsets.read().await;
159 offsets.get(consumer_group).cloned()
160 }
161
162 pub async fn delete_group(&self, consumer_group: &str) -> bool {
164 let removed = {
165 let mut offsets = self.offsets.write().await;
166 offsets.remove(consumer_group).is_some()
167 };
168 if removed {
169 self.checkpoint().await;
170 }
171 removed
172 }
173}
174
175impl Default for OffsetManager {
176 fn default() -> Self {
177 Self::new()
178 }
179}
180
#[cfg(test)]
mod tests {
    use super::*;

    /// A committed offset can be read back; an untouched partition yields `None`.
    #[tokio::test]
    async fn test_offset_management() {
        let mgr = OffsetManager::new();

        mgr.commit_offset("group1", "topic1", 0, 100).await;

        assert_eq!(mgr.get_offset("group1", "topic1", 0).await, Some(100));
        assert_eq!(mgr.get_offset("group1", "topic1", 1).await, None);
    }

    /// Resetting a group forgets the offsets it committed earlier.
    #[tokio::test]
    async fn test_reset_offsets() {
        let mgr = OffsetManager::new();

        mgr.commit_offset("group1", "topic1", 0, 100).await;
        mgr.reset_offsets("group1").await;

        assert_eq!(mgr.get_offset("group1", "topic1", 0).await, None);
    }

    /// Offsets committed through one manager are visible to a fresh manager
    /// opened on the same directory.
    #[tokio::test]
    async fn test_persistence_round_trip() {
        let tmp = tempfile::tempdir().unwrap();
        let root = tmp.path().to_path_buf();

        let writer = OffsetManager::with_persistence(root.clone());
        for (group, topic, partition, offset) in [
            ("grp1", "orders", 0, 42),
            ("grp1", "orders", 1, 99),
            ("grp2", "events", 0, 7),
        ] {
            writer.commit_offset(group, topic, partition, offset).await;
        }
        // Discard the first manager so the reads below can only come from disk.
        drop(writer);

        let reader = OffsetManager::with_persistence(root);
        let first = reader.get_offset("grp1", "orders", 0).await;
        assert_eq!(first, Some(42));
        let second = reader.get_offset("grp1", "orders", 1).await;
        assert_eq!(second, Some(99));
        let third = reader.get_offset("grp2", "events", 0).await;
        assert_eq!(third, Some(7));
    }

    /// Deleting a group is reflected in the on-disk checkpoint as well.
    #[tokio::test]
    async fn test_persistence_delete_group() {
        let tmp = tempfile::tempdir().unwrap();
        let root = tmp.path().to_path_buf();

        let first = OffsetManager::with_persistence(root.clone());
        first.commit_offset("grp1", "t", 0, 10).await;
        let deleted = first.delete_group("grp1").await;
        assert!(deleted);

        let reopened = OffsetManager::with_persistence(root);
        assert_eq!(reopened.get_offset("grp1", "t", 0).await, None);
    }
}