Skip to main content

allsource_core/application/services/
exactly_once.rs

//! Exactly-once stream processing guarantees
//!
//! Provides deduplication and idempotency tracking for event ingestion and
//! pipeline processing, ensuring that retried or duplicated events don't
//! result in duplicate state changes.
//!
//! ## Mechanisms
//!
//! 1. **Idempotency keys**: Clients include an `idempotency_key` in event metadata.
//!    The registry tracks which keys have been processed and reports duplicates.
//!
//! 2. **Consumer offsets**: Pipeline processors track their read offset per stream.
//!    On restart, processing can resume from the last committed offset.
//!
//! 3. **Processing receipts**: Each event processed by a pipeline gets a receipt
//!    keyed by the (pipeline_id, event_id) pair. Receipts let a pipeline skip
//!    events it has already handled during replays.
//!
//! ## TTL
//!
//! Idempotency keys expire after a configurable window (default 24 hours) to
//! bound memory usage. Consumer offsets and processing receipts persist
//! indefinitely.
22
23use chrono::{DateTime, Duration, Utc};
24use dashmap::DashMap;
25use serde::{Deserialize, Serialize};
26use uuid::Uuid;
27
28/// Configuration for exactly-once processing
29#[derive(Debug, Clone)]
30pub struct ExactlyOnceConfig {
31    /// How long to keep idempotency keys before expiry
32    pub idempotency_ttl: Duration,
33    /// Maximum number of idempotency keys to retain (LRU eviction)
34    pub max_idempotency_keys: usize,
35}
36
37impl Default for ExactlyOnceConfig {
38    fn default() -> Self {
39        Self {
40            idempotency_ttl: Duration::hours(24),
41            max_idempotency_keys: 1_000_000,
42        }
43    }
44}
45
46/// An idempotency record
47#[derive(Debug, Clone, Serialize)]
48struct IdempotencyRecord {
49    /// The event ID that was created for this key
50    event_id: Uuid,
51    /// When this key was first seen
52    created_at: DateTime<Utc>,
53}
54
55/// Consumer offset for a pipeline
56#[derive(Debug, Clone, Serialize, Deserialize)]
57pub struct ConsumerOffset {
58    /// Pipeline ID
59    pub pipeline_id: String,
60    /// Stream/partition identifier
61    pub stream_id: String,
62    /// Last committed event offset (index into the event store)
63    pub offset: usize,
64    /// Last committed event ID
65    pub last_event_id: Uuid,
66    /// When offset was last updated
67    pub updated_at: DateTime<Utc>,
68}
69
70/// Processing receipt proving an event was handled by a pipeline
71#[derive(Debug, Clone, Hash, Eq, PartialEq)]
72struct ProcessingReceipt {
73    pipeline_id: String,
74    event_id: Uuid,
75}
76
77/// Result of an idempotency check
78#[derive(Debug, Clone, Serialize)]
79pub enum IdempotencyResult {
80    /// Key is new, proceed with processing
81    New,
82    /// Key was already processed, return the original event_id
83    Duplicate { original_event_id: Uuid },
84}
85
86/// Exactly-once processing registry
87pub struct ExactlyOnceRegistry {
88    config: ExactlyOnceConfig,
89    /// Idempotency key → record
90    idempotency_keys: DashMap<String, IdempotencyRecord>,
91    /// (pipeline_id, stream_id) → consumer offset
92    consumer_offsets: DashMap<(String, String), ConsumerOffset>,
93    /// Set of processing receipts
94    processing_receipts: DashMap<ProcessingReceipt, DateTime<Utc>>,
95}
96
97impl ExactlyOnceRegistry {
98    pub fn new(config: ExactlyOnceConfig) -> Self {
99        Self {
100            config,
101            idempotency_keys: DashMap::new(),
102            consumer_offsets: DashMap::new(),
103            processing_receipts: DashMap::new(),
104        }
105    }
106
107    /// Check and register an idempotency key
108    ///
109    /// Returns `New` if the key hasn't been seen (and registers it),
110    /// or `Duplicate` with the original event_id if already processed.
111    pub fn check_idempotency(&self, key: &str, event_id: Uuid) -> IdempotencyResult {
112        let now = Utc::now();
113
114        // Check if key exists and is not expired
115        if let Some(existing) = self.idempotency_keys.get(key) {
116            let age = now - existing.created_at;
117            if age < self.config.idempotency_ttl {
118                return IdempotencyResult::Duplicate {
119                    original_event_id: existing.event_id,
120                };
121            }
122            // Expired — remove and treat as new
123            drop(existing);
124            self.idempotency_keys.remove(key);
125        }
126
127        // Evict oldest keys if at capacity
128        if self.idempotency_keys.len() >= self.config.max_idempotency_keys {
129            self.evict_expired_keys();
130        }
131
132        // Register the new key
133        self.idempotency_keys.insert(
134            key.to_string(),
135            IdempotencyRecord {
136                event_id,
137                created_at: now,
138            },
139        );
140
141        IdempotencyResult::New
142    }
143
144    /// Commit a consumer offset for a pipeline
145    pub fn commit_offset(&self, pipeline_id: &str, stream_id: &str, offset: usize, event_id: Uuid) {
146        let key = (pipeline_id.to_string(), stream_id.to_string());
147        self.consumer_offsets.insert(
148            key,
149            ConsumerOffset {
150                pipeline_id: pipeline_id.to_string(),
151                stream_id: stream_id.to_string(),
152                offset,
153                last_event_id: event_id,
154                updated_at: Utc::now(),
155            },
156        );
157    }
158
159    /// Get the last committed offset for a pipeline
160    pub fn get_offset(&self, pipeline_id: &str, stream_id: &str) -> Option<ConsumerOffset> {
161        let key = (pipeline_id.to_string(), stream_id.to_string());
162        self.consumer_offsets.get(&key).map(|v| v.clone())
163    }
164
165    /// Record that a pipeline has processed an event
166    pub fn record_processing(&self, pipeline_id: &str, event_id: Uuid) {
167        let receipt = ProcessingReceipt {
168            pipeline_id: pipeline_id.to_string(),
169            event_id,
170        };
171        self.processing_receipts.insert(receipt, Utc::now());
172    }
173
174    /// Check if a pipeline has already processed an event
175    pub fn was_processed(&self, pipeline_id: &str, event_id: Uuid) -> bool {
176        let receipt = ProcessingReceipt {
177            pipeline_id: pipeline_id.to_string(),
178            event_id,
179        };
180        self.processing_receipts.contains_key(&receipt)
181    }
182
183    /// Remove expired idempotency keys
184    fn evict_expired_keys(&self) {
185        let now = Utc::now();
186        let ttl = self.config.idempotency_ttl;
187        self.idempotency_keys
188            .retain(|_, record| now - record.created_at < ttl);
189    }
190
191    /// Statistics about the exactly-once registry
192    pub fn stats(&self) -> ExactlyOnceStats {
193        ExactlyOnceStats {
194            idempotency_keys: self.idempotency_keys.len(),
195            consumer_offsets: self.consumer_offsets.len(),
196            processing_receipts: self.processing_receipts.len(),
197        }
198    }
199}
200
201/// Exactly-once registry statistics
202#[derive(Debug, Clone, Serialize)]
203pub struct ExactlyOnceStats {
204    pub idempotency_keys: usize,
205    pub consumer_offsets: usize,
206    pub processing_receipts: usize,
207}
208
209/// Extract an idempotency key from event metadata
210pub fn extract_idempotency_key(metadata: &Option<serde_json::Value>) -> Option<String> {
211    metadata
212        .as_ref()?
213        .get("idempotency_key")
214        .and_then(|v| v.as_str())
215        .map(std::string::ToString::to_string)
216}
217
#[cfg(test)]
mod tests {
    use super::*;

    /// Registry with the default configuration (24h TTL, 1M-key threshold).
    fn registry() -> ExactlyOnceRegistry {
        ExactlyOnceRegistry::new(ExactlyOnceConfig::default())
    }

    /// Idempotency record stamped `age` in the past, for seeding the map directly.
    fn aged_record(age: Duration) -> IdempotencyRecord {
        IdempotencyRecord {
            event_id: Uuid::new_v4(),
            created_at: Utc::now() - age,
        }
    }

    #[test]
    fn test_new_idempotency_key() {
        let reg = registry();
        let id = Uuid::new_v4();
        match reg.check_idempotency("key-1", id) {
            IdempotencyResult::New => {}
            IdempotencyResult::Duplicate { .. } => panic!("Expected New, got Duplicate"),
        }
    }

    #[test]
    fn test_duplicate_idempotency_key() {
        let reg = registry();
        let id1 = Uuid::new_v4();
        let id2 = Uuid::new_v4();
        reg.check_idempotency("key-1", id1);
        match reg.check_idempotency("key-1", id2) {
            IdempotencyResult::Duplicate { original_event_id } => {
                assert_eq!(original_event_id, id1);
            }
            IdempotencyResult::New => panic!("Expected Duplicate, got New"),
        }
    }

    #[test]
    fn test_expired_key_treated_as_new() {
        // A negative TTL makes every record count as expired immediately.
        let reg = ExactlyOnceRegistry::new(ExactlyOnceConfig {
            idempotency_ttl: Duration::seconds(-1),
            ..Default::default()
        });
        let id1 = Uuid::new_v4();
        let id2 = Uuid::new_v4();
        reg.check_idempotency("key-1", id1);
        match reg.check_idempotency("key-1", id2) {
            IdempotencyResult::New => {}
            IdempotencyResult::Duplicate { .. } => panic!("Expected New (expired), got Duplicate"),
        }
    }

    #[test]
    fn test_consumer_offset_commit_and_get() {
        let reg = registry();
        let eid = Uuid::new_v4();
        reg.commit_offset("pipeline-1", "stream-a", 42, eid);
        let offset = reg.get_offset("pipeline-1", "stream-a").unwrap();
        assert_eq!(offset.offset, 42);
        assert_eq!(offset.last_event_id, eid);
    }

    #[test]
    fn test_consumer_offset_not_found() {
        let reg = registry();
        assert!(reg.get_offset("nonexistent", "stream").is_none());
    }

    #[test]
    fn test_processing_receipts() {
        let reg = registry();
        let eid = Uuid::new_v4();
        assert!(!reg.was_processed("pipeline-1", eid));
        reg.record_processing("pipeline-1", eid);
        assert!(reg.was_processed("pipeline-1", eid));
    }

    #[test]
    fn test_processing_receipt_different_pipeline() {
        let reg = registry();
        let eid = Uuid::new_v4();
        reg.record_processing("pipeline-1", eid);
        assert!(!reg.was_processed("pipeline-2", eid));
    }

    #[test]
    fn test_extract_idempotency_key() {
        let meta = Some(serde_json::json!({"idempotency_key": "abc-123"}));
        assert_eq!(extract_idempotency_key(&meta), Some("abc-123".to_string()));

        let no_key = Some(serde_json::json!({"source": "web"}));
        assert_eq!(extract_idempotency_key(&no_key), None);

        assert_eq!(extract_idempotency_key(&None), None);
    }

    #[test]
    fn test_stats() {
        let reg = registry();
        let id = Uuid::new_v4();
        reg.check_idempotency("k1", id);
        reg.commit_offset("p1", "s1", 0, id);
        reg.record_processing("p1", id);
        let stats = reg.stats();
        assert_eq!(stats.idempotency_keys, 1);
        assert_eq!(stats.consumer_offsets, 1);
        assert_eq!(stats.processing_receipts, 1);
    }

    #[test]
    fn test_eviction_on_capacity() {
        let reg = ExactlyOnceRegistry::new(ExactlyOnceConfig {
            idempotency_ttl: Duration::seconds(-1), // Everything counts as expired
            max_idempotency_keys: 2,
        });
        // Fill to capacity with expired keys, bypassing check_idempotency.
        reg.idempotency_keys
            .insert("old-1".to_string(), aged_record(Duration::hours(48)));
        reg.idempotency_keys
            .insert("old-2".to_string(), aged_record(Duration::hours(48)));

        // Registering a new key at capacity must purge the expired ones first.
        let id = Uuid::new_v4();
        assert!(matches!(
            reg.check_idempotency("new-key", id),
            IdempotencyResult::New
        ));

        // Exactly the new key remains; both expired keys are gone.
        assert_eq!(reg.idempotency_keys.len(), 1);
        assert!(reg.idempotency_keys.contains_key("new-key"));
        assert!(!reg.idempotency_keys.contains_key("old-1"));
        assert!(!reg.idempotency_keys.contains_key("old-2"));
    }
}
345        reg.check_idempotency("new-key", id);
346        // Old expired keys should have been evicted
347        assert!(reg.idempotency_keys.len() <= 2);
348    }
349}