// oxirs_stream/state/exactly_once.rs

//! # Exactly-Once Processing Semantics
//!
//! Idempotent operator execution via a deduplication log plus per-message
//! transactions.  Inspired by Apache Kafka Streams and Apache Flink's
//! exactly-once mode.
//!
//! The design aims for exactly-once processing by combining:
//! 1. **Deduplication log** — remembers message IDs within a sliding time window.
//! 2. **Transactions** — group the state changes produced by a message with the
//!    message itself; on commit the state changes are written first and the
//!    message is marked as processed only afterwards (a best-effort ordered
//!    commit, not a WAL-backed atomic one).

use std::collections::{HashMap, VecDeque};
use std::time::{Duration, Instant};

use uuid::Uuid;

use crate::error::StreamError;
use crate::state::distributed_state::StateBackend;

// ─── Message ID ───────────────────────────────────────────────────────────────

/// Identity of a single message in the stream.
///
/// A message is pinned down by the producer that emitted it, the partition it
/// was written to, and its sequence number within that producer's partition.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct MessageId {
    /// Identifier of the emitting producer.
    pub producer_id: String,
    /// Partition the message belongs to.
    pub partition: u32,
    /// Sequence number of the message within the producer's partition.
    pub sequence: u64,
}

30impl MessageId {
31    /// Create a new message identifier.
32    pub fn new(producer: &str, partition: u32, seq: u64) -> Self {
33        Self {
34            producer_id: producer.to_string(),
35            partition,
36            sequence: seq,
37        }
38    }
39
40    /// Serialize to a compact string suitable for hashing and logging.
41    ///
42    /// Format: `<producer_id>/<partition>/<sequence>`
43    pub fn serialize(&self) -> String {
44        format!("{}/{}/{}", self.producer_id, self.partition, self.sequence)
45    }
46
47    /// Parse a message ID from its serialized string form.
48    ///
49    /// Returns an error if the string does not match the expected format.
50    pub fn parse(s: &str) -> Result<Self, StreamError> {
51        let parts: Vec<&str> = s.splitn(3, '/').collect();
52        if parts.len() != 3 {
53            return Err(StreamError::InvalidInput(format!(
54                "MessageId must be '<producer>/<partition>/<seq>', got: {s}"
55            )));
56        }
57
58        let partition = parts[1].parse::<u32>().map_err(|e| {
59            StreamError::InvalidInput(format!("invalid partition in MessageId: {e}"))
60        })?;
61
62        let sequence = parts[2].parse::<u64>().map_err(|e| {
63            StreamError::InvalidInput(format!("invalid sequence in MessageId: {e}"))
64        })?;
65
66        Ok(Self {
67            producer_id: parts[0].to_string(),
68            partition,
69            sequence,
70        })
71    }
72}
73
// ─── Deduplication window ─────────────────────────────────────────────────────

/// Tuning knobs for the deduplication sliding window.
#[derive(Debug, Clone)]
pub struct DeduplicationConfig {
    /// How long a processed message ID keeps being reported as a duplicate.
    pub window_duration: Duration,
    /// Upper bound on the number of tracked message IDs; once reached, the
    /// oldest tracked entries are evicted to make room.
    pub max_tracked: usize,
}

impl Default for DeduplicationConfig {
    /// A five-minute window tracking at most 100 000 message IDs.
    fn default() -> Self {
        const FIVE_MINUTES: Duration = Duration::from_secs(5 * 60);
        const DEFAULT_MAX_TRACKED: usize = 100_000;

        DeduplicationConfig {
            window_duration: FIVE_MINUTES,
            max_tracked: DEFAULT_MAX_TRACKED,
        }
    }
}

94/// Sliding-window deduplication log.
95///
96/// Tracks recently processed message IDs so duplicate deliveries can be
97/// detected and discarded.  Memory is bounded by both time and cardinality.
98pub struct DeduplicationLog {
99    config: DeduplicationConfig,
100    /// Map from message ID → time it was processed.
101    processed: HashMap<MessageId, Instant>,
102    /// FIFO queue of (id, processed_at) used for ordered eviction.
103    eviction_queue: VecDeque<(MessageId, Instant)>,
104}
105
106impl DeduplicationLog {
107    /// Create an empty deduplication log with the given configuration.
108    pub fn new(config: DeduplicationConfig) -> Self {
109        Self {
110            processed: HashMap::new(),
111            eviction_queue: VecDeque::new(),
112            config,
113        }
114    }
115
116    /// Returns `true` if the message has already been processed.
117    pub fn is_duplicate(&self, id: &MessageId) -> bool {
118        match self.processed.get(id) {
119            None => false,
120            Some(&processed_at) => {
121                // Still within the deduplication window?
122                processed_at.elapsed() < self.config.window_duration
123            }
124        }
125    }
126
127    /// Record that a message has been processed successfully.
128    ///
129    /// If the log is full (`max_tracked` reached), the oldest entry is evicted.
130    pub fn mark_processed(&mut self, id: MessageId) {
131        let now = Instant::now();
132
133        // Evict by capacity if needed
134        while self.processed.len() >= self.config.max_tracked {
135            if let Some((oldest_id, _)) = self.eviction_queue.pop_front() {
136                self.processed.remove(&oldest_id);
137            } else {
138                break;
139            }
140        }
141
142        self.eviction_queue.push_back((id.clone(), now));
143        self.processed.insert(id, now);
144    }
145
146    /// Remove entries that have aged out of the time window.
147    ///
148    /// Returns the number of evicted entries.
149    pub fn evict_expired(&mut self) -> usize {
150        let deadline = self.config.window_duration;
151        let mut evicted = 0usize;
152
153        while let Some((id, ts)) = self.eviction_queue.front() {
154            if ts.elapsed() >= deadline {
155                let id = id.clone();
156                self.eviction_queue.pop_front();
157                self.processed.remove(&id);
158                evicted += 1;
159            } else {
160                break;
161            }
162        }
163
164        evicted
165    }
166
167    /// Number of message IDs currently tracked.
168    pub fn size(&self) -> usize {
169        self.processed.len()
170    }
171}
172
173// ─── Exactly-once transaction ─────────────────────────────────────────────────
174
175/// An atomic unit of work that combines message acknowledgment with state
176/// mutations.
177///
178/// Either the entire transaction commits (marking the messages as processed
179/// and writing all state changes to the backend) or nothing happens.
180pub struct ExactlyOnceTransaction {
181    /// Unique identifier for this transaction (for idempotent replays).
182    pub transaction_id: String,
183    /// Messages consumed by this transaction.
184    pub messages: Vec<MessageId>,
185    /// State mutations: `(namespaced_key, value_bytes)`.
186    pub state_changes: Vec<(Vec<u8>, Vec<u8>)>,
187    pub started_at: Instant,
188    pub committed: bool,
189}
190
191impl Default for ExactlyOnceTransaction {
192    fn default() -> Self {
193        Self::new()
194    }
195}
196
197impl ExactlyOnceTransaction {
198    /// Start a new transaction.
199    pub fn new() -> Self {
200        Self {
201            transaction_id: Uuid::new_v4().to_string(),
202            messages: Vec::new(),
203            state_changes: Vec::new(),
204            started_at: Instant::now(),
205            committed: false,
206        }
207    }
208
209    /// Register a message as part of this transaction.
210    pub fn add_message(&mut self, id: MessageId) {
211        self.messages.push(id);
212    }
213
214    /// Record a state mutation to be applied atomically at commit time.
215    pub fn add_state_change(&mut self, key: Vec<u8>, value: Vec<u8>) {
216        self.state_changes.push((key, value));
217    }
218
219    /// Commit this transaction.
220    ///
221    /// The commit:
222    /// 1. Applies all state changes to the backend.
223    /// 2. Marks all consumed messages as processed in the deduplication log.
224    ///
225    /// In production this would be wrapped in a WAL write; here we do a
226    /// best-effort ordered commit (state first, then dedup log).
227    pub fn commit(
228        mut self,
229        dedup_log: &mut DeduplicationLog,
230        backend: &dyn StateBackend,
231    ) -> Result<(), StreamError> {
232        if self.committed {
233            return Err(StreamError::InvalidOperation(format!(
234                "transaction {} already committed",
235                self.transaction_id
236            )));
237        }
238
239        // Phase 1: Apply state changes
240        for (key, value) in &self.state_changes {
241            backend.put(key, value)?;
242        }
243
244        // Phase 2: Mark messages as processed
245        for id in self.messages.drain(..) {
246            dedup_log.mark_processed(id);
247        }
248
249        self.committed = true;
250        Ok(())
251    }
252}
253
254// ─── High-level exactly-once processor ───────────────────────────────────────
255
256/// Wraps a state backend and deduplication log to provide exactly-once
257/// processing guarantees.
258pub struct ExactlyOnceProcessor {
259    dedup_log: DeduplicationLog,
260    backend: std::sync::Arc<dyn StateBackend>,
261    /// Number of duplicates filtered so far.
262    duplicates_filtered: u64,
263    /// Number of messages processed exactly once.
264    messages_processed: u64,
265}
266
267impl ExactlyOnceProcessor {
268    /// Create a new processor.
269    pub fn new(config: DeduplicationConfig, backend: std::sync::Arc<dyn StateBackend>) -> Self {
270        Self {
271            dedup_log: DeduplicationLog::new(config),
272            backend,
273            duplicates_filtered: 0,
274            messages_processed: 0,
275        }
276    }
277
278    /// Process a message exactly once.
279    ///
280    /// If `id` has already been processed within the deduplication window the
281    /// closure is NOT invoked and `Ok(None)` is returned.
282    ///
283    /// Otherwise the closure is called with a fresh transaction.  The closure
284    /// is expected to add state changes to the transaction; this method then
285    /// commits it and returns `Ok(Some(result))`.
286    pub fn process<R, F>(&mut self, id: MessageId, processor: F) -> Result<Option<R>, StreamError>
287    where
288        F: FnOnce(&mut ExactlyOnceTransaction) -> Result<R, StreamError>,
289    {
290        // Deduplicate
291        if self.dedup_log.is_duplicate(&id) {
292            self.duplicates_filtered += 1;
293            return Ok(None);
294        }
295
296        let mut txn = ExactlyOnceTransaction::new();
297        txn.add_message(id);
298
299        let result = processor(&mut txn)?;
300        txn.commit(&mut self.dedup_log, self.backend.as_ref())?;
301        self.messages_processed += 1;
302
303        Ok(Some(result))
304    }
305
306    /// Perform maintenance: evict expired dedup entries.
307    pub fn maintenance(&mut self) -> usize {
308        self.dedup_log.evict_expired()
309    }
310
311    /// Statistics about this processor.
312    pub fn stats(&self) -> ExactlyOnceStats {
313        ExactlyOnceStats {
314            dedup_window_size: self.dedup_log.size(),
315            duplicates_filtered: self.duplicates_filtered,
316            messages_processed: self.messages_processed,
317        }
318    }
319}
320
/// Runtime statistics for an exactly-once processor.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct ExactlyOnceStats {
    /// Number of message IDs currently held in the deduplication log.
    pub dedup_window_size: usize,
    /// Number of deliveries skipped as duplicates.
    pub duplicates_filtered: u64,
    /// Number of deliveries whose transaction committed successfully.
    pub messages_processed: u64,
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::state::distributed_state::InMemoryStateBackend;
    use std::sync::Arc;

    // ── MessageId ────────────────────────────────────────────────────────────

    #[test]
    fn test_message_id_round_trip() {
        let id = MessageId::new("producer-1", 3, 42);
        let serialized = id.serialize();
        assert_eq!(serialized, "producer-1/3/42");

        let parsed = MessageId::parse(&serialized).unwrap();
        assert_eq!(parsed, id);
    }

    #[test]
    fn test_message_id_parse_error() {
        assert!(MessageId::parse("bad").is_err());
        assert!(MessageId::parse("a/b").is_err());
        assert!(MessageId::parse("a/notnum/1").is_err());
        assert!(MessageId::parse("a/1/notnum").is_err());
    }

    // ── DeduplicationLog ─────────────────────────────────────────────────────

    #[test]
    fn test_dedup_log_basic() {
        let config = DeduplicationConfig {
            window_duration: Duration::from_secs(60),
            max_tracked: 1000,
        };
        let mut log = DeduplicationLog::new(config);

        let id = MessageId::new("p", 0, 1);
        assert!(!log.is_duplicate(&id));

        log.mark_processed(id.clone());
        assert!(log.is_duplicate(&id));
        assert_eq!(log.size(), 1);
    }

    #[test]
    fn test_dedup_log_capacity_eviction() {
        let config = DeduplicationConfig {
            window_duration: Duration::from_secs(60),
            max_tracked: 3,
        };
        let mut log = DeduplicationLog::new(config);

        for i in 0..5u64 {
            log.mark_processed(MessageId::new("p", 0, i));
        }

        // Never exceeds max_tracked
        assert!(log.size() <= 3);
    }

    #[test]
    fn test_dedup_log_expiry() {
        let config = DeduplicationConfig {
            // Very short window for test speed
            window_duration: Duration::from_millis(50),
            max_tracked: 1000,
        };
        let mut log = DeduplicationLog::new(config);

        let id = MessageId::new("p", 0, 99);
        log.mark_processed(id.clone());
        assert!(log.is_duplicate(&id));

        std::thread::sleep(Duration::from_millis(60));

        // After window, no longer a duplicate (even before eviction)
        assert!(!log.is_duplicate(&id));

        // evict_expired should clean up
        let evicted = log.evict_expired();
        assert_eq!(evicted, 1);
        assert_eq!(log.size(), 0);
    }

    // ── ExactlyOnceTransaction ───────────────────────────────────────────────

    #[test]
    fn test_transaction_commit_applies_state() {
        let backend = InMemoryStateBackend::new();
        let mut dedup = DeduplicationLog::new(DeduplicationConfig::default());

        let mut txn = ExactlyOnceTransaction::new();
        txn.add_message(MessageId::new("p", 0, 1));
        txn.add_state_change(
            b"counter".to_vec(),
            b"\x01\x00\x00\x00\x00\x00\x00\x00".to_vec(),
        );

        txn.commit(&mut dedup, &backend).unwrap();

        assert_eq!(
            backend.get(b"counter").unwrap().as_deref(),
            Some(b"\x01\x00\x00\x00\x00\x00\x00\x00".as_ref())
        );
        assert!(dedup.is_duplicate(&MessageId::new("p", 0, 1)));
    }

    #[test]
    fn test_transaction_double_commit_fails() {
        let backend = InMemoryStateBackend::new();
        let mut dedup = DeduplicationLog::new(DeduplicationConfig::default());

        // `commit` consumes the transaction, so calling it twice on the same
        // value does not compile.  The remaining way to reach the guard is a
        // transaction whose public `committed` flag is already set.
        let mut txn = ExactlyOnceTransaction::new();
        txn.add_message(MessageId::new("p", 0, 7));
        txn.add_state_change(b"never-written".to_vec(), b"x".to_vec());
        txn.committed = true;

        let result = txn.commit(&mut dedup, &backend);
        assert!(matches!(result, Err(StreamError::InvalidOperation(_))));

        // The rejected commit must have no side effects.
        assert!(backend.get(b"never-written").unwrap().is_none());
        assert!(!dedup.is_duplicate(&MessageId::new("p", 0, 7)));
        assert_eq!(dedup.size(), 0);
    }

    // ── ExactlyOnceProcessor ─────────────────────────────────────────────────

    #[test]
    fn test_processor_exactly_once() {
        let backend = Arc::new(InMemoryStateBackend::new());
        let mut processor = ExactlyOnceProcessor::new(DeduplicationConfig::default(), backend);

        let id = MessageId::new("prod", 0, 1);
        let mut call_count = 0u32;

        // First delivery
        let result = processor
            .process(id.clone(), |_txn| {
                call_count += 1;
                Ok(42u32)
            })
            .unwrap();
        assert_eq!(result, Some(42u32));
        assert_eq!(call_count, 1);

        // Duplicate delivery — closure must NOT be called
        let result = processor
            .process(id, |_txn| {
                call_count += 1;
                Ok(99u32)
            })
            .unwrap();
        assert_eq!(result, None);
        assert_eq!(call_count, 1); // closure was NOT invoked

        let stats = processor.stats();
        assert_eq!(stats.messages_processed, 1);
        assert_eq!(stats.duplicates_filtered, 1);
    }

    #[test]
    fn test_processor_error_leaves_message_unprocessed() {
        let backend = Arc::new(InMemoryStateBackend::new());
        let mut processor = ExactlyOnceProcessor::new(DeduplicationConfig::default(), backend);
        let id = MessageId::new("prod", 0, 2);

        let failed: Result<Option<u32>, StreamError> = processor.process(id.clone(), |_txn| {
            Err(StreamError::InvalidInput("boom".to_string()))
        });
        assert!(failed.is_err());

        // A failed attempt must not be recorded, so a redelivery is processed.
        let retried = processor.process(id, |_txn| Ok(7u32)).unwrap();
        assert_eq!(retried, Some(7u32));

        let stats = processor.stats();
        assert_eq!(stats.messages_processed, 1);
        assert_eq!(stats.duplicates_filtered, 0);
    }

    #[test]
    fn test_processor_commits_state_changes() {
        let backend = Arc::new(InMemoryStateBackend::new());
        let mut processor =
            ExactlyOnceProcessor::new(DeduplicationConfig::default(), backend.clone());

        let result = processor
            .process(MessageId::new("prod", 1, 1), |txn| {
                txn.add_state_change(b"key".to_vec(), b"value".to_vec());
                Ok(())
            })
            .unwrap();
        assert_eq!(result, Some(()));

        assert_eq!(
            backend.get(b"key").unwrap().as_deref(),
            Some(b"value".as_ref())
        );
        assert_eq!(processor.stats().dedup_window_size, 1);
    }
}