1use std::collections::HashMap;
2use std::sync::Arc;
3use tokio::sync::RwLock;
4
5type PartitionOffsets = HashMap<u32, u64>;
7type TopicOffsets = HashMap<String, PartitionOffsets>;
9type GroupOffsets = HashMap<String, TopicOffsets>;
11
12#[derive(Debug, Clone)]
14pub struct OffsetManager {
15 offsets: Arc<RwLock<GroupOffsets>>,
17}
18
19impl OffsetManager {
20 pub fn new() -> Self {
22 Self {
23 offsets: Arc::new(RwLock::new(HashMap::new())),
24 }
25 }
26
27 pub async fn commit_offset(
29 &self,
30 consumer_group: &str,
31 topic: &str,
32 partition: u32,
33 offset: u64,
34 ) {
35 let mut offsets = self.offsets.write().await;
36
37 offsets
38 .entry(consumer_group.to_string())
39 .or_insert_with(HashMap::new)
40 .entry(topic.to_string())
41 .or_insert_with(HashMap::new)
42 .insert(partition, offset);
43 }
44
45 pub async fn get_offset(
47 &self,
48 consumer_group: &str,
49 topic: &str,
50 partition: u32,
51 ) -> Option<u64> {
52 let offsets = self.offsets.read().await;
53
54 offsets
55 .get(consumer_group)
56 .and_then(|topics| topics.get(topic))
57 .and_then(|partitions| partitions.get(&partition))
58 .copied()
59 }
60
61 pub async fn reset_offsets(&self, consumer_group: &str) {
63 let mut offsets = self.offsets.write().await;
64 offsets.remove(consumer_group);
65 }
66
67 pub async fn list_groups(&self) -> Vec<String> {
69 let offsets = self.offsets.read().await;
70 offsets.keys().cloned().collect()
71 }
72
73 pub async fn get_group_offsets(
76 &self,
77 consumer_group: &str,
78 ) -> Option<HashMap<String, HashMap<u32, u64>>> {
79 let offsets = self.offsets.read().await;
80 offsets.get(consumer_group).cloned()
81 }
82
83 pub async fn delete_group(&self, consumer_group: &str) -> bool {
85 let mut offsets = self.offsets.write().await;
86 offsets.remove(consumer_group).is_some()
87 }
88}
89
90impl Default for OffsetManager {
91 fn default() -> Self {
92 Self::new()
93 }
94}
95
96#[cfg(test)]
97mod tests {
98 use super::*;
99
100 #[tokio::test]
101 async fn test_offset_management() {
102 let manager = OffsetManager::new();
103
104 manager.commit_offset("group1", "topic1", 0, 100).await;
105
106 let offset = manager.get_offset("group1", "topic1", 0).await;
107 assert_eq!(offset, Some(100));
108
109 let missing = manager.get_offset("group1", "topic1", 1).await;
110 assert_eq!(missing, None);
111 }
112
113 #[tokio::test]
114 async fn test_reset_offsets() {
115 let manager = OffsetManager::new();
116
117 manager.commit_offset("group1", "topic1", 0, 100).await;
118 manager.reset_offsets("group1").await;
119
120 let offset = manager.get_offset("group1", "topic1", 0).await;
121 assert_eq!(offset, None);
122 }
123}