Skip to main content

allsource_core/application/services/
consumer.rs

1use crate::{
2    domain::value_objects::system_stream::{SystemDomain, consumer_events, system_entity_id_value},
3    infrastructure::persistence::SystemMetadataStore,
4};
5use dashmap::DashMap;
6use serde::{Deserialize, Serialize};
7use serde_json::json;
8use std::sync::Arc;
9
10/// A durable consumer tracks a cursor (last-acknowledged position) so it can
11/// resume from where it left off after disconnection or restart.
12#[derive(Debug, Clone, Serialize, Deserialize)]
13pub struct Consumer {
14    pub consumer_id: String,
15    /// Event type prefix filters (e.g. `["scheduler.*", "index.*"]`).
16    /// Empty means "all events".
17    pub event_type_filters: Vec<String>,
18    /// Global event offset of the last acknowledged event.
19    /// Events after this offset are "unprocessed" for this consumer.
20    /// `None` means the consumer hasn't acked anything yet (start from beginning).
21    pub cursor_position: Option<u64>,
22}
23
24/// Registry that manages durable consumers with persistent cursor positions.
25///
26/// Consumer state is stored as system events in the WAL pipeline so it
27/// survives Core restarts. In-memory state is held in a DashMap for O(1) access.
28///
29/// Two modes:
30/// - **In-memory** (`new()`): no persistence, for tests and backward compatibility
31/// - **Durable** (`new_durable()`): backed by `SystemMetadataStore`, survives restarts
32pub struct ConsumerRegistry {
33    consumers: Arc<DashMap<String, Consumer>>,
34    system_store: Option<Arc<SystemMetadataStore>>,
35}
36
37impl Default for ConsumerRegistry {
38    fn default() -> Self {
39        Self::new()
40    }
41}
42
43impl ConsumerRegistry {
44    /// Create an in-memory-only consumer registry (no persistence).
45    pub fn new() -> Self {
46        Self {
47            consumers: Arc::new(DashMap::new()),
48            system_store: None,
49        }
50    }
51
52    /// Create a durable consumer registry backed by `SystemMetadataStore`.
53    ///
54    /// On construction, replays all `_system.consumer.*` events to rebuild
55    /// the in-memory cache from WAL.
56    pub fn new_durable(system_store: Arc<SystemMetadataStore>) -> Self {
57        let registry = Self {
58            consumers: Arc::new(DashMap::new()),
59            system_store: Some(system_store),
60        };
61        registry.rebuild_cache();
62        registry
63    }
64
65    /// Register a consumer (or update its filters if it already exists).
66    pub fn register(&self, consumer_id: &str, event_type_filters: &[String]) -> Consumer {
67        let consumer = self
68            .consumers
69            .entry(consumer_id.to_string())
70            .or_insert_with(|| Consumer {
71                consumer_id: consumer_id.to_string(),
72                event_type_filters: event_type_filters.to_vec(),
73                cursor_position: None,
74            });
75
76        // Update filters if consumer already existed
77        let mut c = consumer.clone();
78        if c.event_type_filters != event_type_filters {
79            drop(consumer);
80            self.consumers.alter(consumer_id, |_, mut existing| {
81                existing.event_type_filters = event_type_filters.to_vec();
82                c = existing.clone();
83                existing
84            });
85        }
86
87        // Persist registration event
88        if let Some(ref store) = self.system_store {
89            let entity_id = system_entity_id_value(SystemDomain::Consumer, consumer_id);
90            let payload = json!({
91                "consumer_id": consumer_id,
92                "event_type_filters": event_type_filters,
93            });
94            if let Err(e) =
95                store.append_system_event(consumer_events::REGISTERED, entity_id, payload, None)
96            {
97                tracing::warn!("Failed to persist consumer registration: {}", e);
98            }
99        }
100
101        c
102    }
103
104    /// Get a consumer by ID. Returns None if not registered.
105    pub fn get(&self, consumer_id: &str) -> Option<Consumer> {
106        self.consumers.get(consumer_id).map(|c| c.clone())
107    }
108
109    /// Get or implicitly create a consumer.
110    pub fn get_or_create(&self, consumer_id: &str) -> Consumer {
111        self.consumers
112            .entry(consumer_id.to_string())
113            .or_insert_with(|| Consumer {
114                consumer_id: consumer_id.to_string(),
115                event_type_filters: vec![],
116                cursor_position: None,
117            })
118            .clone()
119    }
120
121    /// Acknowledge events up to a given global offset.
122    /// Returns Ok(()) on success, Err if the position is beyond the max offset.
123    pub fn ack(&self, consumer_id: &str, position: u64, max_offset: u64) -> Result<(), String> {
124        if position > max_offset {
125            return Err(format!(
126                "Position {position} is beyond the latest event offset {max_offset}"
127            ));
128        }
129
130        let mut entry = self
131            .consumers
132            .entry(consumer_id.to_string())
133            .or_insert_with(|| Consumer {
134                consumer_id: consumer_id.to_string(),
135                event_type_filters: vec![],
136                cursor_position: None,
137            });
138
139        // Only advance the cursor (idempotent: acking an older position is a no-op)
140        let current = entry.cursor_position.unwrap_or(0);
141        if position > current {
142            entry.cursor_position = Some(position);
143
144            // Persist ack event
145            if let Some(ref store) = self.system_store {
146                let entity_id = system_entity_id_value(SystemDomain::Consumer, consumer_id);
147                let payload = json!({
148                    "consumer_id": consumer_id,
149                    "position": position,
150                });
151                if let Err(e) = store.append_system_event(
152                    consumer_events::ACK_UPDATED,
153                    entity_id,
154                    payload,
155                    None,
156                ) {
157                    tracing::warn!("Failed to persist consumer ack: {}", e);
158                }
159            }
160        }
161
162        Ok(())
163    }
164
165    /// Restore consumer state (called during WAL recovery).
166    pub fn restore(&self, consumer: Consumer) {
167        self.consumers
168            .insert(consumer.consumer_id.clone(), consumer);
169    }
170
171    /// Number of registered consumers.
172    pub fn count(&self) -> usize {
173        self.consumers.len()
174    }
175
176    /// Rebuild in-memory cache from system store events.
177    fn rebuild_cache(&self) {
178        let Some(ref store) = self.system_store else {
179            return;
180        };
181        let events = store.read_stream(SystemDomain::Consumer);
182        for event in &events {
183            self.apply_event(event);
184        }
185        tracing::info!(
186            "Rebuilt consumer cache: {} consumers from {} events",
187            self.consumers.len(),
188            events.len()
189        );
190    }
191
192    /// Apply a single system event to the in-memory cache.
193    fn apply_event(&self, event: &crate::domain::entities::Event) {
194        let event_type = event.event_type_str();
195        let payload = event.payload();
196
197        match event_type {
198            consumer_events::REGISTERED => {
199                let consumer_id = payload
200                    .get("consumer_id")
201                    .and_then(|v| v.as_str())
202                    .unwrap_or_default()
203                    .to_string();
204                let event_type_filters: Vec<String> = payload
205                    .get("event_type_filters")
206                    .and_then(|v| v.as_array())
207                    .map(|arr| {
208                        arr.iter()
209                            .filter_map(|v| v.as_str().map(String::from))
210                            .collect()
211                    })
212                    .unwrap_or_default();
213
214                if !consumer_id.is_empty() {
215                    self.consumers.insert(
216                        consumer_id.clone(),
217                        Consumer {
218                            consumer_id,
219                            event_type_filters,
220                            cursor_position: None,
221                        },
222                    );
223                }
224            }
225            consumer_events::ACK_UPDATED => {
226                let consumer_id = payload
227                    .get("consumer_id")
228                    .and_then(|v| v.as_str())
229                    .unwrap_or_default()
230                    .to_string();
231                let position = payload.get("position").and_then(serde_json::Value::as_u64);
232
233                if !consumer_id.is_empty() {
234                    self.consumers
235                        .entry(consumer_id.clone())
236                        .and_modify(|c| {
237                            // Only advance — never regress
238                            if let Some(pos) = position {
239                                let current = c.cursor_position.unwrap_or(0);
240                                if pos > current {
241                                    c.cursor_position = Some(pos);
242                                }
243                            }
244                        })
245                        .or_insert_with(|| Consumer {
246                            consumer_id,
247                            event_type_filters: vec![],
248                            cursor_position: position,
249                        });
250                }
251            }
252            consumer_events::DELETED => {
253                let consumer_id = payload
254                    .get("consumer_id")
255                    .and_then(|v| v.as_str())
256                    .unwrap_or_default();
257                if !consumer_id.is_empty() {
258                    self.consumers.remove(consumer_id);
259                }
260            }
261            _ => {}
262        }
263    }
264
265    /// Check if an event type matches a consumer's filters.
266    /// Empty filters = match all.
267    pub fn matches_filters(event_type: &str, filters: &[String]) -> bool {
268        if filters.is_empty() {
269            return true;
270        }
271        filters.iter().any(|filter| {
272            if let Some(prefix) = filter.strip_suffix(".*") {
273                event_type.starts_with(prefix)
274                    && event_type
275                        .as_bytes()
276                        .get(prefix.len())
277                        .is_none_or(|&b| b == b'.')
278            } else {
279                event_type == filter
280            }
281        })
282    }
283}
284
285#[cfg(test)]
286mod tests {
287    use super::*;
288
289    #[test]
290    fn test_register_and_get() {
291        let registry = ConsumerRegistry::new();
292        let c = registry.register("c1", &["scheduler.*".into()]);
293        assert_eq!(c.consumer_id, "c1");
294        assert_eq!(c.event_type_filters, vec!["scheduler.*"]);
295        assert_eq!(c.cursor_position, None);
296
297        let fetched = registry.get("c1").unwrap();
298        assert_eq!(fetched.consumer_id, "c1");
299    }
300
301    #[test]
302    fn test_get_or_create() {
303        let registry = ConsumerRegistry::new();
304        assert!(registry.get("c1").is_none());
305
306        let c = registry.get_or_create("c1");
307        assert_eq!(c.consumer_id, "c1");
308        assert!(c.event_type_filters.is_empty());
309
310        // Second call returns same consumer
311        let c2 = registry.get_or_create("c1");
312        assert_eq!(c2.consumer_id, "c1");
313    }
314
315    #[test]
316    fn test_ack_advances_cursor() {
317        let registry = ConsumerRegistry::new();
318        registry.register("c1", &[]);
319
320        registry.ack("c1", 5, 10).unwrap();
321        assert_eq!(registry.get("c1").unwrap().cursor_position, Some(5));
322
323        // Advance further
324        registry.ack("c1", 8, 10).unwrap();
325        assert_eq!(registry.get("c1").unwrap().cursor_position, Some(8));
326    }
327
328    #[test]
329    fn test_ack_idempotent_no_regression() {
330        let registry = ConsumerRegistry::new();
331        registry.register("c1", &[]);
332
333        registry.ack("c1", 5, 10).unwrap();
334        // Acking an earlier position is a no-op
335        registry.ack("c1", 3, 10).unwrap();
336        assert_eq!(registry.get("c1").unwrap().cursor_position, Some(5));
337    }
338
339    #[test]
340    fn test_ack_beyond_max_fails() {
341        let registry = ConsumerRegistry::new();
342        registry.register("c1", &[]);
343
344        let result = registry.ack("c1", 15, 10);
345        assert!(result.is_err());
346    }
347
348    #[test]
349    fn test_ack_auto_creates_consumer() {
350        let registry = ConsumerRegistry::new();
351        registry.ack("c1", 5, 10).unwrap();
352        assert_eq!(registry.get("c1").unwrap().cursor_position, Some(5));
353    }
354
355    #[test]
356    fn test_matches_filters_empty() {
357        assert!(ConsumerRegistry::matches_filters("anything", &[]));
358    }
359
360    #[test]
361    fn test_matches_filters_prefix() {
362        let filters = vec!["scheduler.*".to_string()];
363        assert!(ConsumerRegistry::matches_filters(
364            "scheduler.started",
365            &filters
366        ));
367        assert!(ConsumerRegistry::matches_filters(
368            "scheduler.completed",
369            &filters
370        ));
371        assert!(!ConsumerRegistry::matches_filters(
372            "trade.executed",
373            &filters
374        ));
375    }
376
377    #[test]
378    fn test_matches_filters_exact() {
379        let filters = vec!["scheduler.started".to_string()];
380        assert!(ConsumerRegistry::matches_filters(
381            "scheduler.started",
382            &filters
383        ));
384        assert!(!ConsumerRegistry::matches_filters(
385            "scheduler.completed",
386            &filters
387        ));
388    }
389
390    #[test]
391    fn test_matches_filters_multiple() {
392        let filters = vec!["scheduler.*".to_string(), "index.*".to_string()];
393        assert!(ConsumerRegistry::matches_filters(
394            "scheduler.started",
395            &filters
396        ));
397        assert!(ConsumerRegistry::matches_filters("index.created", &filters));
398        assert!(!ConsumerRegistry::matches_filters(
399            "trade.executed",
400            &filters
401        ));
402    }
403
404    #[test]
405    fn test_restore() {
406        let registry = ConsumerRegistry::new();
407        registry.restore(Consumer {
408            consumer_id: "c1".into(),
409            event_type_filters: vec!["scheduler.*".into()],
410            cursor_position: Some(42),
411        });
412
413        let c = registry.get("c1").unwrap();
414        assert_eq!(c.cursor_position, Some(42));
415        assert_eq!(c.event_type_filters, vec!["scheduler.*"]);
416    }
417
418    #[test]
419    fn test_count() {
420        let registry = ConsumerRegistry::new();
421        assert_eq!(registry.count(), 0);
422        registry.register("c1", &[]);
423        assert_eq!(registry.count(), 1);
424        registry.register("c2", &[]);
425        assert_eq!(registry.count(), 2);
426    }
427
428    #[test]
429    fn test_in_memory_mode_still_works() {
430        // Ensure new() works without a system store
431        let registry = ConsumerRegistry::new();
432        registry.register("c1", &["trade.*".into()]);
433        registry.ack("c1", 10, 100).unwrap();
434        let c = registry.get("c1").unwrap();
435        assert_eq!(c.cursor_position, Some(10));
436        assert_eq!(c.event_type_filters, vec!["trade.*"]);
437    }
438
439    #[test]
440    fn test_durable_register_persists() {
441        let temp_dir = tempfile::TempDir::new().unwrap();
442        let store = Arc::new(SystemMetadataStore::new(temp_dir.path()).unwrap());
443
444        // Register a consumer with durable registry
445        {
446            let registry = ConsumerRegistry::new_durable(store.clone());
447            registry.register("c1", &["scheduler.*".into()]);
448            assert_eq!(registry.count(), 1);
449        }
450
451        // Recreate from same store — should recover consumer
452        {
453            let registry = ConsumerRegistry::new_durable(store.clone());
454            assert_eq!(registry.count(), 1);
455            let c = registry.get("c1").unwrap();
456            assert_eq!(c.consumer_id, "c1");
457            assert_eq!(c.event_type_filters, vec!["scheduler.*"]);
458            assert_eq!(c.cursor_position, None);
459        }
460    }
461
462    #[test]
463    fn test_durable_ack_persists() {
464        let temp_dir = tempfile::TempDir::new().unwrap();
465        let store = Arc::new(SystemMetadataStore::new(temp_dir.path()).unwrap());
466
467        // Register + ack
468        {
469            let registry = ConsumerRegistry::new_durable(store.clone());
470            registry.register("c1", &[]);
471            registry.ack("c1", 42, 100).unwrap();
472            assert_eq!(registry.get("c1").unwrap().cursor_position, Some(42));
473        }
474
475        // Recreate — cursor should survive
476        {
477            let registry = ConsumerRegistry::new_durable(store.clone());
478            let c = registry.get("c1").unwrap();
479            assert_eq!(c.cursor_position, Some(42));
480        }
481    }
482
483    #[test]
484    fn test_durable_recovery_multiple_acks() {
485        let temp_dir = tempfile::TempDir::new().unwrap();
486        let store = Arc::new(SystemMetadataStore::new(temp_dir.path()).unwrap());
487
488        // Ack multiple times
489        {
490            let registry = ConsumerRegistry::new_durable(store.clone());
491            registry.register("c1", &[]);
492            registry.ack("c1", 10, 100).unwrap();
493            registry.ack("c1", 25, 100).unwrap();
494            registry.ack("c1", 50, 100).unwrap();
495        }
496
497        // Only latest position should be recovered
498        {
499            let registry = ConsumerRegistry::new_durable(store.clone());
500            let c = registry.get("c1").unwrap();
501            assert_eq!(c.cursor_position, Some(50));
502        }
503    }
504}