Skip to main content

allsource_core/application/services/
consumer.rs

1use dashmap::DashMap;
2use serde::{Deserialize, Serialize};
3use std::sync::Arc;
4
5/// A durable consumer tracks a cursor (last-acknowledged position) so it can
6/// resume from where it left off after disconnection or restart.
7#[derive(Debug, Clone, Serialize, Deserialize)]
8pub struct Consumer {
9    pub consumer_id: String,
10    /// Event type prefix filters (e.g. ["scheduler.*", "index.*"]).
11    /// Empty means "all events".
12    pub event_type_filters: Vec<String>,
13    /// Global event offset of the last acknowledged event.
14    /// Events after this offset are "unprocessed" for this consumer.
15    /// `None` means the consumer hasn't acked anything yet (start from beginning).
16    pub cursor_position: Option<u64>,
17}
18
19/// Registry that manages durable consumers with persistent cursor positions.
20///
21/// Consumer state is stored as system events in the WAL pipeline so it
22/// survives Core restarts. In-memory state is held in a DashMap for O(1) access.
23pub struct ConsumerRegistry {
24    consumers: Arc<DashMap<String, Consumer>>,
25}
26
27impl Default for ConsumerRegistry {
28    fn default() -> Self {
29        Self::new()
30    }
31}
32
33impl ConsumerRegistry {
34    pub fn new() -> Self {
35        Self {
36            consumers: Arc::new(DashMap::new()),
37        }
38    }
39
40    /// Register a consumer (or update its filters if it already exists).
41    pub fn register(&self, consumer_id: String, event_type_filters: Vec<String>) -> Consumer {
42        let consumer = self
43            .consumers
44            .entry(consumer_id.clone())
45            .or_insert_with(|| Consumer {
46                consumer_id: consumer_id.clone(),
47                event_type_filters: event_type_filters.clone(),
48                cursor_position: None,
49            });
50
51        // Update filters if consumer already existed
52        let mut c = consumer.clone();
53        if c.event_type_filters != event_type_filters {
54            drop(consumer);
55            self.consumers.alter(&consumer_id, |_, mut existing| {
56                existing.event_type_filters = event_type_filters;
57                c = existing.clone();
58                existing
59            });
60        }
61
62        c
63    }
64
65    /// Get a consumer by ID. Returns None if not registered.
66    pub fn get(&self, consumer_id: &str) -> Option<Consumer> {
67        self.consumers.get(consumer_id).map(|c| c.clone())
68    }
69
70    /// Get or implicitly create a consumer.
71    pub fn get_or_create(&self, consumer_id: &str) -> Consumer {
72        self.consumers
73            .entry(consumer_id.to_string())
74            .or_insert_with(|| Consumer {
75                consumer_id: consumer_id.to_string(),
76                event_type_filters: vec![],
77                cursor_position: None,
78            })
79            .clone()
80    }
81
82    /// Acknowledge events up to a given global offset.
83    /// Returns Ok(()) on success, Err if the position is beyond the max offset.
84    pub fn ack(&self, consumer_id: &str, position: u64, max_offset: u64) -> Result<(), String> {
85        if position > max_offset {
86            return Err(format!(
87                "Position {} is beyond the latest event offset {}",
88                position, max_offset
89            ));
90        }
91
92        let mut entry = self
93            .consumers
94            .entry(consumer_id.to_string())
95            .or_insert_with(|| Consumer {
96                consumer_id: consumer_id.to_string(),
97                event_type_filters: vec![],
98                cursor_position: None,
99            });
100
101        // Only advance the cursor (idempotent: acking an older position is a no-op)
102        let current = entry.cursor_position.unwrap_or(0);
103        if position > current {
104            entry.cursor_position = Some(position);
105        }
106
107        Ok(())
108    }
109
110    /// Restore consumer state (called during WAL recovery).
111    pub fn restore(&self, consumer: Consumer) {
112        self.consumers
113            .insert(consumer.consumer_id.clone(), consumer);
114    }
115
116    /// Check if an event type matches a consumer's filters.
117    /// Empty filters = match all.
118    pub fn matches_filters(event_type: &str, filters: &[String]) -> bool {
119        if filters.is_empty() {
120            return true;
121        }
122        filters.iter().any(|filter| {
123            if let Some(prefix) = filter.strip_suffix(".*") {
124                event_type.starts_with(prefix)
125                    && event_type
126                        .as_bytes()
127                        .get(prefix.len())
128                        .is_none_or(|&b| b == b'.')
129            } else {
130                event_type == filter
131            }
132        })
133    }
134}
135
136#[cfg(test)]
137mod tests {
138    use super::*;
139
140    #[test]
141    fn test_register_and_get() {
142        let registry = ConsumerRegistry::new();
143        let c = registry.register("c1".into(), vec!["scheduler.*".into()]);
144        assert_eq!(c.consumer_id, "c1");
145        assert_eq!(c.event_type_filters, vec!["scheduler.*"]);
146        assert_eq!(c.cursor_position, None);
147
148        let fetched = registry.get("c1").unwrap();
149        assert_eq!(fetched.consumer_id, "c1");
150    }
151
152    #[test]
153    fn test_get_or_create() {
154        let registry = ConsumerRegistry::new();
155        assert!(registry.get("c1").is_none());
156
157        let c = registry.get_or_create("c1");
158        assert_eq!(c.consumer_id, "c1");
159        assert!(c.event_type_filters.is_empty());
160
161        // Second call returns same consumer
162        let c2 = registry.get_or_create("c1");
163        assert_eq!(c2.consumer_id, "c1");
164    }
165
166    #[test]
167    fn test_ack_advances_cursor() {
168        let registry = ConsumerRegistry::new();
169        registry.register("c1".into(), vec![]);
170
171        registry.ack("c1", 5, 10).unwrap();
172        assert_eq!(registry.get("c1").unwrap().cursor_position, Some(5));
173
174        // Advance further
175        registry.ack("c1", 8, 10).unwrap();
176        assert_eq!(registry.get("c1").unwrap().cursor_position, Some(8));
177    }
178
179    #[test]
180    fn test_ack_idempotent_no_regression() {
181        let registry = ConsumerRegistry::new();
182        registry.register("c1".into(), vec![]);
183
184        registry.ack("c1", 5, 10).unwrap();
185        // Acking an earlier position is a no-op
186        registry.ack("c1", 3, 10).unwrap();
187        assert_eq!(registry.get("c1").unwrap().cursor_position, Some(5));
188    }
189
190    #[test]
191    fn test_ack_beyond_max_fails() {
192        let registry = ConsumerRegistry::new();
193        registry.register("c1".into(), vec![]);
194
195        let result = registry.ack("c1", 15, 10);
196        assert!(result.is_err());
197    }
198
199    #[test]
200    fn test_ack_auto_creates_consumer() {
201        let registry = ConsumerRegistry::new();
202        registry.ack("c1", 5, 10).unwrap();
203        assert_eq!(registry.get("c1").unwrap().cursor_position, Some(5));
204    }
205
206    #[test]
207    fn test_matches_filters_empty() {
208        assert!(ConsumerRegistry::matches_filters("anything", &[]));
209    }
210
211    #[test]
212    fn test_matches_filters_prefix() {
213        let filters = vec!["scheduler.*".to_string()];
214        assert!(ConsumerRegistry::matches_filters(
215            "scheduler.started",
216            &filters
217        ));
218        assert!(ConsumerRegistry::matches_filters(
219            "scheduler.completed",
220            &filters
221        ));
222        assert!(!ConsumerRegistry::matches_filters(
223            "trade.executed",
224            &filters
225        ));
226    }
227
228    #[test]
229    fn test_matches_filters_exact() {
230        let filters = vec!["scheduler.started".to_string()];
231        assert!(ConsumerRegistry::matches_filters(
232            "scheduler.started",
233            &filters
234        ));
235        assert!(!ConsumerRegistry::matches_filters(
236            "scheduler.completed",
237            &filters
238        ));
239    }
240
241    #[test]
242    fn test_matches_filters_multiple() {
243        let filters = vec!["scheduler.*".to_string(), "index.*".to_string()];
244        assert!(ConsumerRegistry::matches_filters(
245            "scheduler.started",
246            &filters
247        ));
248        assert!(ConsumerRegistry::matches_filters(
249            "index.created",
250            &filters
251        ));
252        assert!(!ConsumerRegistry::matches_filters(
253            "trade.executed",
254            &filters
255        ));
256    }
257
258    #[test]
259    fn test_restore() {
260        let registry = ConsumerRegistry::new();
261        registry.restore(Consumer {
262            consumer_id: "c1".into(),
263            event_type_filters: vec!["scheduler.*".into()],
264            cursor_position: Some(42),
265        });
266
267        let c = registry.get("c1").unwrap();
268        assert_eq!(c.cursor_position, Some(42));
269        assert_eq!(c.event_type_filters, vec!["scheduler.*"]);
270    }
271}