Skip to main content

rivven_core/
offset.rs

1use std::collections::HashMap;
2use std::sync::Arc;
3use tokio::sync::RwLock;
4
/// Outermost level: consumer group name -> that group's [`TopicOffsets`].
type GroupOffsets = HashMap<String, TopicOffsets>;
/// Middle level: topic name -> that topic's [`PartitionOffsets`].
type TopicOffsets = HashMap<String, PartitionOffsets>;
/// Innermost level: partition id (`u32`) -> offset (`u64`).
type PartitionOffsets = HashMap<u32, u64>;
11
12/// Manages consumer offsets for topics and partitions
13#[derive(Debug, Clone)]
14pub struct OffsetManager {
15    /// Map of consumer_group -> topic -> partition -> offset
16    offsets: Arc<RwLock<GroupOffsets>>,
17}
18
19impl OffsetManager {
20    /// Create a new offset manager
21    pub fn new() -> Self {
22        Self {
23            offsets: Arc::new(RwLock::new(HashMap::new())),
24        }
25    }
26
27    /// Commit an offset for a consumer group
28    pub async fn commit_offset(
29        &self,
30        consumer_group: &str,
31        topic: &str,
32        partition: u32,
33        offset: u64,
34    ) {
35        let mut offsets = self.offsets.write().await;
36
37        offsets
38            .entry(consumer_group.to_string())
39            .or_insert_with(HashMap::new)
40            .entry(topic.to_string())
41            .or_insert_with(HashMap::new)
42            .insert(partition, offset);
43    }
44
45    /// Get the committed offset for a consumer group
46    pub async fn get_offset(
47        &self,
48        consumer_group: &str,
49        topic: &str,
50        partition: u32,
51    ) -> Option<u64> {
52        let offsets = self.offsets.read().await;
53
54        offsets
55            .get(consumer_group)
56            .and_then(|topics| topics.get(topic))
57            .and_then(|partitions| partitions.get(&partition))
58            .copied()
59    }
60
61    /// Reset offsets for a consumer group
62    pub async fn reset_offsets(&self, consumer_group: &str) {
63        let mut offsets = self.offsets.write().await;
64        offsets.remove(consumer_group);
65    }
66
67    /// List all consumer groups with committed offsets
68    pub async fn list_groups(&self) -> Vec<String> {
69        let offsets = self.offsets.read().await;
70        offsets.keys().cloned().collect()
71    }
72
73    /// Get all offsets for a consumer group
74    /// Returns: topic → partition → offset
75    pub async fn get_group_offsets(
76        &self,
77        consumer_group: &str,
78    ) -> Option<HashMap<String, HashMap<u32, u64>>> {
79        let offsets = self.offsets.read().await;
80        offsets.get(consumer_group).cloned()
81    }
82
83    /// Delete a consumer group and all its offsets
84    pub async fn delete_group(&self, consumer_group: &str) -> bool {
85        let mut offsets = self.offsets.write().await;
86        offsets.remove(consumer_group).is_some()
87    }
88}
89
90impl Default for OffsetManager {
91    fn default() -> Self {
92        Self::new()
93    }
94}
95
#[cfg(test)]
mod tests {
    use super::*;

    /// A committed offset reads back as stored; a partition that was never
    /// committed yields `None`.
    #[tokio::test]
    async fn test_offset_management() {
        let offsets = OffsetManager::new();

        offsets.commit_offset("group1", "topic1", 0, 100).await;

        assert_eq!(offsets.get_offset("group1", "topic1", 0).await, Some(100));
        assert_eq!(offsets.get_offset("group1", "topic1", 1).await, None);
    }

    /// After resetting a group, its previously committed offset is gone.
    #[tokio::test]
    async fn test_reset_offsets() {
        let offsets = OffsetManager::new();

        offsets.commit_offset("group1", "topic1", 0, 100).await;
        offsets.reset_offsets("group1").await;

        assert_eq!(offsets.get_offset("group1", "topic1", 0).await, None);
    }
}
123}