allsource_core/application/services/
consumer.rs1use dashmap::DashMap;
2use serde::{Deserialize, Serialize};
3use std::sync::Arc;
4
5#[derive(Debug, Clone, Serialize, Deserialize)]
8pub struct Consumer {
9 pub consumer_id: String,
10 pub event_type_filters: Vec<String>,
13 pub cursor_position: Option<u64>,
17}
18
19pub struct ConsumerRegistry {
24 consumers: Arc<DashMap<String, Consumer>>,
25}
26
27impl Default for ConsumerRegistry {
28 fn default() -> Self {
29 Self::new()
30 }
31}
32
33impl ConsumerRegistry {
34 pub fn new() -> Self {
35 Self {
36 consumers: Arc::new(DashMap::new()),
37 }
38 }
39
40 pub fn register(&self, consumer_id: String, event_type_filters: Vec<String>) -> Consumer {
42 let consumer = self
43 .consumers
44 .entry(consumer_id.clone())
45 .or_insert_with(|| Consumer {
46 consumer_id: consumer_id.clone(),
47 event_type_filters: event_type_filters.clone(),
48 cursor_position: None,
49 });
50
51 let mut c = consumer.clone();
53 if c.event_type_filters != event_type_filters {
54 drop(consumer);
55 self.consumers.alter(&consumer_id, |_, mut existing| {
56 existing.event_type_filters = event_type_filters;
57 c = existing.clone();
58 existing
59 });
60 }
61
62 c
63 }
64
65 pub fn get(&self, consumer_id: &str) -> Option<Consumer> {
67 self.consumers.get(consumer_id).map(|c| c.clone())
68 }
69
70 pub fn get_or_create(&self, consumer_id: &str) -> Consumer {
72 self.consumers
73 .entry(consumer_id.to_string())
74 .or_insert_with(|| Consumer {
75 consumer_id: consumer_id.to_string(),
76 event_type_filters: vec![],
77 cursor_position: None,
78 })
79 .clone()
80 }
81
82 pub fn ack(&self, consumer_id: &str, position: u64, max_offset: u64) -> Result<(), String> {
85 if position > max_offset {
86 return Err(format!(
87 "Position {} is beyond the latest event offset {}",
88 position, max_offset
89 ));
90 }
91
92 let mut entry = self
93 .consumers
94 .entry(consumer_id.to_string())
95 .or_insert_with(|| Consumer {
96 consumer_id: consumer_id.to_string(),
97 event_type_filters: vec![],
98 cursor_position: None,
99 });
100
101 let current = entry.cursor_position.unwrap_or(0);
103 if position > current {
104 entry.cursor_position = Some(position);
105 }
106
107 Ok(())
108 }
109
110 pub fn restore(&self, consumer: Consumer) {
112 self.consumers
113 .insert(consumer.consumer_id.clone(), consumer);
114 }
115
116 pub fn matches_filters(event_type: &str, filters: &[String]) -> bool {
119 if filters.is_empty() {
120 return true;
121 }
122 filters.iter().any(|filter| {
123 if let Some(prefix) = filter.strip_suffix(".*") {
124 event_type.starts_with(prefix)
125 && event_type
126 .as_bytes()
127 .get(prefix.len())
128 .is_none_or(|&b| b == b'.')
129 } else {
130 event_type == filter
131 }
132 })
133 }
134}
135
136#[cfg(test)]
137mod tests {
138 use super::*;
139
140 #[test]
141 fn test_register_and_get() {
142 let registry = ConsumerRegistry::new();
143 let c = registry.register("c1".into(), vec!["scheduler.*".into()]);
144 assert_eq!(c.consumer_id, "c1");
145 assert_eq!(c.event_type_filters, vec!["scheduler.*"]);
146 assert_eq!(c.cursor_position, None);
147
148 let fetched = registry.get("c1").unwrap();
149 assert_eq!(fetched.consumer_id, "c1");
150 }
151
152 #[test]
153 fn test_get_or_create() {
154 let registry = ConsumerRegistry::new();
155 assert!(registry.get("c1").is_none());
156
157 let c = registry.get_or_create("c1");
158 assert_eq!(c.consumer_id, "c1");
159 assert!(c.event_type_filters.is_empty());
160
161 let c2 = registry.get_or_create("c1");
163 assert_eq!(c2.consumer_id, "c1");
164 }
165
166 #[test]
167 fn test_ack_advances_cursor() {
168 let registry = ConsumerRegistry::new();
169 registry.register("c1".into(), vec![]);
170
171 registry.ack("c1", 5, 10).unwrap();
172 assert_eq!(registry.get("c1").unwrap().cursor_position, Some(5));
173
174 registry.ack("c1", 8, 10).unwrap();
176 assert_eq!(registry.get("c1").unwrap().cursor_position, Some(8));
177 }
178
179 #[test]
180 fn test_ack_idempotent_no_regression() {
181 let registry = ConsumerRegistry::new();
182 registry.register("c1".into(), vec![]);
183
184 registry.ack("c1", 5, 10).unwrap();
185 registry.ack("c1", 3, 10).unwrap();
187 assert_eq!(registry.get("c1").unwrap().cursor_position, Some(5));
188 }
189
190 #[test]
191 fn test_ack_beyond_max_fails() {
192 let registry = ConsumerRegistry::new();
193 registry.register("c1".into(), vec![]);
194
195 let result = registry.ack("c1", 15, 10);
196 assert!(result.is_err());
197 }
198
199 #[test]
200 fn test_ack_auto_creates_consumer() {
201 let registry = ConsumerRegistry::new();
202 registry.ack("c1", 5, 10).unwrap();
203 assert_eq!(registry.get("c1").unwrap().cursor_position, Some(5));
204 }
205
206 #[test]
207 fn test_matches_filters_empty() {
208 assert!(ConsumerRegistry::matches_filters("anything", &[]));
209 }
210
211 #[test]
212 fn test_matches_filters_prefix() {
213 let filters = vec!["scheduler.*".to_string()];
214 assert!(ConsumerRegistry::matches_filters(
215 "scheduler.started",
216 &filters
217 ));
218 assert!(ConsumerRegistry::matches_filters(
219 "scheduler.completed",
220 &filters
221 ));
222 assert!(!ConsumerRegistry::matches_filters(
223 "trade.executed",
224 &filters
225 ));
226 }
227
228 #[test]
229 fn test_matches_filters_exact() {
230 let filters = vec!["scheduler.started".to_string()];
231 assert!(ConsumerRegistry::matches_filters(
232 "scheduler.started",
233 &filters
234 ));
235 assert!(!ConsumerRegistry::matches_filters(
236 "scheduler.completed",
237 &filters
238 ));
239 }
240
241 #[test]
242 fn test_matches_filters_multiple() {
243 let filters = vec!["scheduler.*".to_string(), "index.*".to_string()];
244 assert!(ConsumerRegistry::matches_filters(
245 "scheduler.started",
246 &filters
247 ));
248 assert!(ConsumerRegistry::matches_filters(
249 "index.created",
250 &filters
251 ));
252 assert!(!ConsumerRegistry::matches_filters(
253 "trade.executed",
254 &filters
255 ));
256 }
257
258 #[test]
259 fn test_restore() {
260 let registry = ConsumerRegistry::new();
261 registry.restore(Consumer {
262 consumer_id: "c1".into(),
263 event_type_filters: vec!["scheduler.*".into()],
264 cursor_position: Some(42),
265 });
266
267 let c = registry.get("c1").unwrap();
268 assert_eq!(c.cursor_position, Some(42));
269 assert_eq!(c.event_type_filters, vec!["scheduler.*"]);
270 }
271}