allsource_core/application/services/
exactly_once.rs1use chrono::{DateTime, Duration, Utc};
24use dashmap::DashMap;
25use serde::{Deserialize, Serialize};
26use uuid::Uuid;
27
28#[derive(Debug, Clone)]
30pub struct ExactlyOnceConfig {
31 pub idempotency_ttl: Duration,
33 pub max_idempotency_keys: usize,
35}
36
37impl Default for ExactlyOnceConfig {
38 fn default() -> Self {
39 Self {
40 idempotency_ttl: Duration::hours(24),
41 max_idempotency_keys: 1_000_000,
42 }
43 }
44}
45
46#[derive(Debug, Clone, Serialize)]
48struct IdempotencyRecord {
49 event_id: Uuid,
51 created_at: DateTime<Utc>,
53}
54
55#[derive(Debug, Clone, Serialize, Deserialize)]
57pub struct ConsumerOffset {
58 pub pipeline_id: String,
60 pub stream_id: String,
62 pub offset: usize,
64 pub last_event_id: Uuid,
66 pub updated_at: DateTime<Utc>,
68}
69
70#[derive(Debug, Clone, Hash, Eq, PartialEq)]
72struct ProcessingReceipt {
73 pipeline_id: String,
74 event_id: Uuid,
75}
76
77#[derive(Debug, Clone, Serialize)]
79pub enum IdempotencyResult {
80 New,
82 Duplicate { original_event_id: Uuid },
84}
85
86pub struct ExactlyOnceRegistry {
88 config: ExactlyOnceConfig,
89 idempotency_keys: DashMap<String, IdempotencyRecord>,
91 consumer_offsets: DashMap<(String, String), ConsumerOffset>,
93 processing_receipts: DashMap<ProcessingReceipt, DateTime<Utc>>,
95}
96
97impl ExactlyOnceRegistry {
98 pub fn new(config: ExactlyOnceConfig) -> Self {
99 Self {
100 config,
101 idempotency_keys: DashMap::new(),
102 consumer_offsets: DashMap::new(),
103 processing_receipts: DashMap::new(),
104 }
105 }
106
107 pub fn check_idempotency(&self, key: &str, event_id: Uuid) -> IdempotencyResult {
112 let now = Utc::now();
113
114 if let Some(existing) = self.idempotency_keys.get(key) {
116 let age = now - existing.created_at;
117 if age < self.config.idempotency_ttl {
118 return IdempotencyResult::Duplicate {
119 original_event_id: existing.event_id,
120 };
121 }
122 drop(existing);
124 self.idempotency_keys.remove(key);
125 }
126
127 if self.idempotency_keys.len() >= self.config.max_idempotency_keys {
129 self.evict_expired_keys();
130 }
131
132 self.idempotency_keys.insert(
134 key.to_string(),
135 IdempotencyRecord {
136 event_id,
137 created_at: now,
138 },
139 );
140
141 IdempotencyResult::New
142 }
143
144 pub fn commit_offset(&self, pipeline_id: &str, stream_id: &str, offset: usize, event_id: Uuid) {
146 let key = (pipeline_id.to_string(), stream_id.to_string());
147 self.consumer_offsets.insert(
148 key,
149 ConsumerOffset {
150 pipeline_id: pipeline_id.to_string(),
151 stream_id: stream_id.to_string(),
152 offset,
153 last_event_id: event_id,
154 updated_at: Utc::now(),
155 },
156 );
157 }
158
159 pub fn get_offset(&self, pipeline_id: &str, stream_id: &str) -> Option<ConsumerOffset> {
161 let key = (pipeline_id.to_string(), stream_id.to_string());
162 self.consumer_offsets.get(&key).map(|v| v.clone())
163 }
164
165 pub fn record_processing(&self, pipeline_id: &str, event_id: Uuid) {
167 let receipt = ProcessingReceipt {
168 pipeline_id: pipeline_id.to_string(),
169 event_id,
170 };
171 self.processing_receipts.insert(receipt, Utc::now());
172 }
173
174 pub fn was_processed(&self, pipeline_id: &str, event_id: Uuid) -> bool {
176 let receipt = ProcessingReceipt {
177 pipeline_id: pipeline_id.to_string(),
178 event_id,
179 };
180 self.processing_receipts.contains_key(&receipt)
181 }
182
183 fn evict_expired_keys(&self) {
185 let now = Utc::now();
186 let ttl = self.config.idempotency_ttl;
187 self.idempotency_keys
188 .retain(|_, record| now - record.created_at < ttl);
189 }
190
191 pub fn stats(&self) -> ExactlyOnceStats {
193 ExactlyOnceStats {
194 idempotency_keys: self.idempotency_keys.len(),
195 consumer_offsets: self.consumer_offsets.len(),
196 processing_receipts: self.processing_receipts.len(),
197 }
198 }
199}
200
201#[derive(Debug, Clone, Serialize)]
203pub struct ExactlyOnceStats {
204 pub idempotency_keys: usize,
205 pub consumer_offsets: usize,
206 pub processing_receipts: usize,
207}
208
209pub fn extract_idempotency_key(metadata: &Option<serde_json::Value>) -> Option<String> {
211 metadata
212 .as_ref()?
213 .get("idempotency_key")
214 .and_then(|v| v.as_str())
215 .map(|s| s.to_string())
216}
217
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a registry with the default configuration.
    fn registry() -> ExactlyOnceRegistry {
        let config = ExactlyOnceConfig::default();
        ExactlyOnceRegistry::new(config)
    }

    /// A key seen for the first time is reported as new.
    #[test]
    fn test_new_idempotency_key() {
        let reg = registry();
        let other = reg.check_idempotency("key-1", Uuid::new_v4());
        assert!(
            matches!(other, IdempotencyResult::New),
            "Expected New, got {other:?}"
        );
    }

    /// A repeated key is a duplicate and reports the first event id.
    #[test]
    fn test_duplicate_idempotency_key() {
        let reg = registry();
        let first = Uuid::new_v4();
        let replay = Uuid::new_v4();
        reg.check_idempotency("key-1", first);

        let other = reg.check_idempotency("key-1", replay);
        if let IdempotencyResult::Duplicate { original_event_id } = other {
            assert_eq!(original_event_id, first);
        } else {
            panic!("Expected Duplicate, got {other:?}");
        }
    }

    /// With a negative TTL every record is already expired, so a repeated
    /// key is accepted as new again.
    #[test]
    fn test_expired_key_treated_as_new() {
        let config = ExactlyOnceConfig {
            idempotency_ttl: Duration::seconds(-1),
            ..ExactlyOnceConfig::default()
        };
        let reg = ExactlyOnceRegistry::new(config);
        let first = Uuid::new_v4();
        let second = Uuid::new_v4();
        reg.check_idempotency("key-1", first);

        let other = reg.check_idempotency("key-1", second);
        assert!(
            matches!(other, IdempotencyResult::New),
            "Expected New (expired), got {other:?}"
        );
    }

    /// A committed offset can be read back unchanged.
    #[test]
    fn test_consumer_offset_commit_and_get() {
        let reg = registry();
        let event = Uuid::new_v4();
        reg.commit_offset("pipeline-1", "stream-a", 42, event);

        let stored = reg.get_offset("pipeline-1", "stream-a").unwrap();
        assert_eq!(stored.offset, 42);
        assert_eq!(stored.last_event_id, event);
    }

    /// Looking up a pair that never committed yields `None`.
    #[test]
    fn test_consumer_offset_not_found() {
        let reg = registry();
        let missing = reg.get_offset("nonexistent", "stream");
        assert!(missing.is_none());
    }

    /// A receipt is visible only after it has been recorded.
    #[test]
    fn test_processing_receipts() {
        let reg = registry();
        let event = Uuid::new_v4();

        assert!(!reg.was_processed("pipeline-1", event));
        reg.record_processing("pipeline-1", event);
        assert!(reg.was_processed("pipeline-1", event));
    }

    /// Receipts are scoped to the pipeline that recorded them.
    #[test]
    fn test_processing_receipt_different_pipeline() {
        let reg = registry();
        let event = Uuid::new_v4();
        reg.record_processing("pipeline-1", event);

        assert!(!reg.was_processed("pipeline-2", event));
    }

    /// Key extraction handles present, absent and `None` metadata.
    #[test]
    fn test_extract_idempotency_key() {
        let with_key = Some(serde_json::json!({"idempotency_key": "abc-123"}));
        assert_eq!(
            extract_idempotency_key(&with_key).as_deref(),
            Some("abc-123")
        );

        let without_key = Some(serde_json::json!({"source": "web"}));
        assert!(extract_idempotency_key(&without_key).is_none());

        assert!(extract_idempotency_key(&None).is_none());
    }

    /// Each map contributes its own entry count to the stats.
    #[test]
    fn test_stats() {
        let reg = registry();
        let event = Uuid::new_v4();
        reg.check_idempotency("k1", event);
        reg.commit_offset("p1", "s1", 0, event);
        reg.record_processing("p1", event);

        let ExactlyOnceStats {
            idempotency_keys,
            consumer_offsets,
            processing_receipts,
        } = reg.stats();
        assert_eq!(idempotency_keys, 1);
        assert_eq!(consumer_offsets, 1);
        assert_eq!(processing_receipts, 1);
    }

    /// Reaching the capacity threshold sweeps expired records before the
    /// new key is stored.
    #[test]
    fn test_eviction_on_capacity() {
        let reg = ExactlyOnceRegistry::new(ExactlyOnceConfig {
            idempotency_ttl: Duration::seconds(-1),
            max_idempotency_keys: 2,
        });

        // Pre-fill the map to capacity with records well past any TTL.
        for stale_key in ["old-1", "old-2"] {
            reg.idempotency_keys.insert(
                stale_key.to_string(),
                IdempotencyRecord {
                    event_id: Uuid::new_v4(),
                    created_at: Utc::now() - Duration::hours(48),
                },
            );
        }

        reg.check_idempotency("new-key", Uuid::new_v4());
        assert!(reg.idempotency_keys.len() <= 2);
    }
}