allsource_core/application/services/
consumer.rs1use crate::{
2 domain::value_objects::system_stream::{SystemDomain, consumer_events, system_entity_id_value},
3 infrastructure::persistence::SystemMetadataStore,
4};
5use dashmap::DashMap;
6use serde::{Deserialize, Serialize};
7use serde_json::json;
8use std::sync::Arc;
9
10#[derive(Debug, Clone, Serialize, Deserialize)]
13pub struct Consumer {
14 pub consumer_id: String,
15 pub event_type_filters: Vec<String>,
18 pub cursor_position: Option<u64>,
22}
23
/// Registry of [`Consumer`] records keyed by consumer id.
///
/// State always lives in an in-memory concurrent map. When a
/// `SystemMetadataStore` is attached (see `new_durable`), `register` and `ack`
/// additionally append events to it, and the map is rebuilt from those events
/// on construction.
pub struct ConsumerRegistry {
    // Live consumer state, keyed by `consumer_id`.
    consumers: Arc<DashMap<String, Consumer>>,
    // `Some` only for registries built with `new_durable`; `None` means
    // purely in-memory operation.
    system_store: Option<Arc<SystemMetadataStore>>,
}
36
37impl Default for ConsumerRegistry {
38 fn default() -> Self {
39 Self::new()
40 }
41}
42
43impl ConsumerRegistry {
44 pub fn new() -> Self {
46 Self {
47 consumers: Arc::new(DashMap::new()),
48 system_store: None,
49 }
50 }
51
52 pub fn new_durable(system_store: Arc<SystemMetadataStore>) -> Self {
57 let registry = Self {
58 consumers: Arc::new(DashMap::new()),
59 system_store: Some(system_store),
60 };
61 registry.rebuild_cache();
62 registry
63 }
64
65 pub fn register(&self, consumer_id: &str, event_type_filters: &[String]) -> Consumer {
67 let consumer = self
68 .consumers
69 .entry(consumer_id.to_string())
70 .or_insert_with(|| Consumer {
71 consumer_id: consumer_id.to_string(),
72 event_type_filters: event_type_filters.to_vec(),
73 cursor_position: None,
74 });
75
76 let mut c = consumer.clone();
78 if c.event_type_filters != event_type_filters {
79 drop(consumer);
80 self.consumers.alter(consumer_id, |_, mut existing| {
81 existing.event_type_filters = event_type_filters.to_vec();
82 c = existing.clone();
83 existing
84 });
85 }
86
87 if let Some(ref store) = self.system_store {
89 let entity_id = system_entity_id_value(SystemDomain::Consumer, consumer_id);
90 let payload = json!({
91 "consumer_id": consumer_id,
92 "event_type_filters": event_type_filters,
93 });
94 if let Err(e) =
95 store.append_system_event(consumer_events::REGISTERED, entity_id, payload, None)
96 {
97 tracing::warn!("Failed to persist consumer registration: {}", e);
98 }
99 }
100
101 c
102 }
103
104 pub fn get(&self, consumer_id: &str) -> Option<Consumer> {
106 self.consumers.get(consumer_id).map(|c| c.clone())
107 }
108
109 pub fn get_or_create(&self, consumer_id: &str) -> Consumer {
111 self.consumers
112 .entry(consumer_id.to_string())
113 .or_insert_with(|| Consumer {
114 consumer_id: consumer_id.to_string(),
115 event_type_filters: vec![],
116 cursor_position: None,
117 })
118 .clone()
119 }
120
121 pub fn ack(&self, consumer_id: &str, position: u64, max_offset: u64) -> Result<(), String> {
124 if position > max_offset {
125 return Err(format!(
126 "Position {position} is beyond the latest event offset {max_offset}"
127 ));
128 }
129
130 let mut entry = self
131 .consumers
132 .entry(consumer_id.to_string())
133 .or_insert_with(|| Consumer {
134 consumer_id: consumer_id.to_string(),
135 event_type_filters: vec![],
136 cursor_position: None,
137 });
138
139 let current = entry.cursor_position.unwrap_or(0);
141 if position > current {
142 entry.cursor_position = Some(position);
143
144 if let Some(ref store) = self.system_store {
146 let entity_id = system_entity_id_value(SystemDomain::Consumer, consumer_id);
147 let payload = json!({
148 "consumer_id": consumer_id,
149 "position": position,
150 });
151 if let Err(e) = store.append_system_event(
152 consumer_events::ACK_UPDATED,
153 entity_id,
154 payload,
155 None,
156 ) {
157 tracing::warn!("Failed to persist consumer ack: {}", e);
158 }
159 }
160 }
161
162 Ok(())
163 }
164
165 pub fn restore(&self, consumer: Consumer) {
167 self.consumers
168 .insert(consumer.consumer_id.clone(), consumer);
169 }
170
    /// Returns the number of consumers currently held in the in-memory map.
    pub fn count(&self) -> usize {
        self.consumers.len()
    }
175
176 fn rebuild_cache(&self) {
178 let Some(ref store) = self.system_store else {
179 return;
180 };
181 let events = store.read_stream(SystemDomain::Consumer);
182 for event in &events {
183 self.apply_event(event);
184 }
185 tracing::info!(
186 "Rebuilt consumer cache: {} consumers from {} events",
187 self.consumers.len(),
188 events.len()
189 );
190 }
191
192 fn apply_event(&self, event: &crate::domain::entities::Event) {
194 let event_type = event.event_type_str();
195 let payload = event.payload();
196
197 match event_type {
198 consumer_events::REGISTERED => {
199 let consumer_id = payload
200 .get("consumer_id")
201 .and_then(|v| v.as_str())
202 .unwrap_or_default()
203 .to_string();
204 let event_type_filters: Vec<String> = payload
205 .get("event_type_filters")
206 .and_then(|v| v.as_array())
207 .map(|arr| {
208 arr.iter()
209 .filter_map(|v| v.as_str().map(String::from))
210 .collect()
211 })
212 .unwrap_or_default();
213
214 if !consumer_id.is_empty() {
215 self.consumers.insert(
216 consumer_id.clone(),
217 Consumer {
218 consumer_id,
219 event_type_filters,
220 cursor_position: None,
221 },
222 );
223 }
224 }
225 consumer_events::ACK_UPDATED => {
226 let consumer_id = payload
227 .get("consumer_id")
228 .and_then(|v| v.as_str())
229 .unwrap_or_default()
230 .to_string();
231 let position = payload.get("position").and_then(serde_json::Value::as_u64);
232
233 if !consumer_id.is_empty() {
234 self.consumers
235 .entry(consumer_id.clone())
236 .and_modify(|c| {
237 if let Some(pos) = position {
239 let current = c.cursor_position.unwrap_or(0);
240 if pos > current {
241 c.cursor_position = Some(pos);
242 }
243 }
244 })
245 .or_insert_with(|| Consumer {
246 consumer_id,
247 event_type_filters: vec![],
248 cursor_position: position,
249 });
250 }
251 }
252 consumer_events::DELETED => {
253 let consumer_id = payload
254 .get("consumer_id")
255 .and_then(|v| v.as_str())
256 .unwrap_or_default();
257 if !consumer_id.is_empty() {
258 self.consumers.remove(consumer_id);
259 }
260 }
261 _ => {}
262 }
263 }
264
265 pub fn matches_filters(event_type: &str, filters: &[String]) -> bool {
268 if filters.is_empty() {
269 return true;
270 }
271 filters.iter().any(|filter| {
272 if let Some(prefix) = filter.strip_suffix(".*") {
273 event_type.starts_with(prefix)
274 && event_type
275 .as_bytes()
276 .get(prefix.len())
277 .is_none_or(|&b| b == b'.')
278 } else {
279 event_type == filter
280 }
281 })
282 }
283}
284
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an owned filter list from string literals.
    fn filters(patterns: &[&str]) -> Vec<String> {
        patterns.iter().map(|p| (*p).to_string()).collect()
    }

    /// Opens a `SystemMetadataStore` in a fresh temporary directory. The
    /// `TempDir` is returned alongside so the directory outlives the store.
    fn temp_store() -> (tempfile::TempDir, Arc<SystemMetadataStore>) {
        let dir = tempfile::TempDir::new().unwrap();
        let store = Arc::new(SystemMetadataStore::new(dir.path()).unwrap());
        (dir, store)
    }

    #[test]
    fn test_register_and_get() {
        let registry = ConsumerRegistry::new();

        let registered = registry.register("c1", &filters(&["scheduler.*"]));
        assert_eq!(registered.consumer_id, "c1");
        assert_eq!(registered.event_type_filters, vec!["scheduler.*"]);
        assert_eq!(registered.cursor_position, None);

        let looked_up = registry.get("c1").unwrap();
        assert_eq!(looked_up.consumer_id, "c1");
    }

    #[test]
    fn test_get_or_create() {
        let registry = ConsumerRegistry::new();
        assert!(registry.get("c1").is_none());

        let created = registry.get_or_create("c1");
        assert_eq!(created.consumer_id, "c1");
        assert!(created.event_type_filters.is_empty());

        // A second call returns the record that already exists.
        let existing = registry.get_or_create("c1");
        assert_eq!(existing.consumer_id, "c1");
    }

    #[test]
    fn test_ack_advances_cursor() {
        let registry = ConsumerRegistry::new();
        registry.register("c1", &[]);

        registry.ack("c1", 5, 10).unwrap();
        assert_eq!(registry.get("c1").unwrap().cursor_position, Some(5));

        registry.ack("c1", 8, 10).unwrap();
        assert_eq!(registry.get("c1").unwrap().cursor_position, Some(8));
    }

    #[test]
    fn test_ack_idempotent_no_regression() {
        let registry = ConsumerRegistry::new();
        registry.register("c1", &[]);

        registry.ack("c1", 5, 10).unwrap();
        // An older position is accepted but must not move the cursor back.
        registry.ack("c1", 3, 10).unwrap();
        assert_eq!(registry.get("c1").unwrap().cursor_position, Some(5));
    }

    #[test]
    fn test_ack_beyond_max_fails() {
        let registry = ConsumerRegistry::new();
        registry.register("c1", &[]);

        let outcome = registry.ack("c1", 15, 10);
        assert!(outcome.is_err());
    }

    #[test]
    fn test_ack_auto_creates_consumer() {
        let registry = ConsumerRegistry::new();
        registry.ack("c1", 5, 10).unwrap();
        assert_eq!(registry.get("c1").unwrap().cursor_position, Some(5));
    }

    #[test]
    fn test_matches_filters_empty() {
        assert!(ConsumerRegistry::matches_filters("anything", &[]));
    }

    #[test]
    fn test_matches_filters_prefix() {
        let wildcard = filters(&["scheduler.*"]);
        assert!(ConsumerRegistry::matches_filters(
            "scheduler.started",
            &wildcard
        ));
        assert!(ConsumerRegistry::matches_filters(
            "scheduler.completed",
            &wildcard
        ));
        assert!(!ConsumerRegistry::matches_filters(
            "trade.executed",
            &wildcard
        ));
    }

    #[test]
    fn test_matches_filters_exact() {
        let exact = filters(&["scheduler.started"]);
        assert!(ConsumerRegistry::matches_filters(
            "scheduler.started",
            &exact
        ));
        assert!(!ConsumerRegistry::matches_filters(
            "scheduler.completed",
            &exact
        ));
    }

    #[test]
    fn test_matches_filters_multiple() {
        let either = filters(&["scheduler.*", "index.*"]);
        assert!(ConsumerRegistry::matches_filters(
            "scheduler.started",
            &either
        ));
        assert!(ConsumerRegistry::matches_filters("index.created", &either));
        assert!(!ConsumerRegistry::matches_filters(
            "trade.executed",
            &either
        ));
    }

    #[test]
    fn test_restore() {
        let registry = ConsumerRegistry::new();
        let snapshot = Consumer {
            consumer_id: "c1".into(),
            event_type_filters: filters(&["scheduler.*"]),
            cursor_position: Some(42),
        };
        registry.restore(snapshot);

        let restored = registry.get("c1").unwrap();
        assert_eq!(restored.cursor_position, Some(42));
        assert_eq!(restored.event_type_filters, vec!["scheduler.*"]);
    }

    #[test]
    fn test_count() {
        let registry = ConsumerRegistry::new();
        assert_eq!(registry.count(), 0);

        registry.register("c1", &[]);
        assert_eq!(registry.count(), 1);

        registry.register("c2", &[]);
        assert_eq!(registry.count(), 2);
    }

    #[test]
    fn test_in_memory_mode_still_works() {
        // No system store attached: register and ack must still work.
        let registry = ConsumerRegistry::new();
        registry.register("c1", &filters(&["trade.*"]));
        registry.ack("c1", 10, 100).unwrap();

        let consumer = registry.get("c1").unwrap();
        assert_eq!(consumer.cursor_position, Some(10));
        assert_eq!(consumer.event_type_filters, vec!["trade.*"]);
    }

    #[test]
    fn test_durable_register_persists() {
        let (_dir, store) = temp_store();

        // First registry instance: register and let it go out of scope.
        let first = ConsumerRegistry::new_durable(Arc::clone(&store));
        first.register("c1", &filters(&["scheduler.*"]));
        assert_eq!(first.count(), 1);
        drop(first);

        // Second instance over the same store must recover the registration.
        let recovered = ConsumerRegistry::new_durable(Arc::clone(&store));
        assert_eq!(recovered.count(), 1);
        let consumer = recovered.get("c1").unwrap();
        assert_eq!(consumer.consumer_id, "c1");
        assert_eq!(consumer.event_type_filters, vec!["scheduler.*"]);
        assert_eq!(consumer.cursor_position, None);
    }

    #[test]
    fn test_durable_ack_persists() {
        let (_dir, store) = temp_store();

        let first = ConsumerRegistry::new_durable(Arc::clone(&store));
        first.register("c1", &[]);
        first.ack("c1", 42, 100).unwrap();
        assert_eq!(first.get("c1").unwrap().cursor_position, Some(42));
        drop(first);

        // The acknowledged position must survive a rebuild from the store.
        let recovered = ConsumerRegistry::new_durable(Arc::clone(&store));
        let consumer = recovered.get("c1").unwrap();
        assert_eq!(consumer.cursor_position, Some(42));
    }

    #[test]
    fn test_durable_recovery_multiple_acks() {
        let (_dir, store) = temp_store();

        let first = ConsumerRegistry::new_durable(Arc::clone(&store));
        first.register("c1", &[]);
        for position in [10, 25, 50] {
            first.ack("c1", position, 100).unwrap();
        }
        drop(first);

        // Replay must end on the last (highest) acknowledged position.
        let recovered = ConsumerRegistry::new_durable(Arc::clone(&store));
        let consumer = recovered.get("c1").unwrap();
        assert_eq!(consumer.cursor_position, Some(50));
    }
}