oxirs-stream 0.2.4

Real-time streaming support with Kafka/NATS/MQTT/OPC-UA I/O, RDF Patch, and SPARQL Update delta
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
//! # Exactly-Once Processing Semantics
//!
//! Idempotent operator execution via deduplication log + atomic transaction
//! log.  Inspired by Apache Kafka Streams and Apache Flink's exactly-once mode.
//!
//! The design achieves exactly-once by combining:
//! 1. **Deduplication log** — remembers message IDs within a sliding time window.
//! 2. **Transactions** — atomically records state changes and marks messages as
//!    processed so the two updates are either both visible or neither.

use crate::error::StreamError;
use crate::state::distributed_state::StateBackend;
use std::collections::{HashMap, VecDeque};
use std::time::{Duration, Instant};
use uuid::Uuid;

// ─── Message ID ───────────────────────────────────────────────────────────────

/// Uniquely identifies a single message in the stream.
///
/// The combination of producer + partition + sequence forms a monotonically
/// ordered identifier within a producer's partition.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct MessageId {
    pub producer_id: String,
    pub partition: u32,
    pub sequence: u64,
}

impl MessageId {
    /// Create a new message identifier.
    pub fn new(producer: &str, partition: u32, seq: u64) -> Self {
        Self {
            producer_id: producer.to_string(),
            partition,
            sequence: seq,
        }
    }

    /// Serialize to a compact string suitable for hashing and logging.
    ///
    /// Format: `<producer_id>/<partition>/<sequence>`
    pub fn serialize(&self) -> String {
        format!("{}/{}/{}", self.producer_id, self.partition, self.sequence)
    }

    /// Parse a message ID from its serialized string form.
    ///
    /// Returns an error if the string does not match the expected format.
    pub fn parse(s: &str) -> Result<Self, StreamError> {
        let parts: Vec<&str> = s.splitn(3, '/').collect();
        if parts.len() != 3 {
            return Err(StreamError::InvalidInput(format!(
                "MessageId must be '<producer>/<partition>/<seq>', got: {s}"
            )));
        }

        let partition = parts[1].parse::<u32>().map_err(|e| {
            StreamError::InvalidInput(format!("invalid partition in MessageId: {e}"))
        })?;

        let sequence = parts[2].parse::<u64>().map_err(|e| {
            StreamError::InvalidInput(format!("invalid sequence in MessageId: {e}"))
        })?;

        Ok(Self {
            producer_id: parts[0].to_string(),
            partition,
            sequence,
        })
    }
}

// ─── Deduplication window ─────────────────────────────────────────────────────

/// Configuration for the deduplication sliding window.
#[derive(Debug, Clone)]
pub struct DeduplicationConfig {
    /// How long to remember processed messages.
    pub window_duration: Duration,
    /// Maximum number of message IDs to track before LRU eviction.
    pub max_tracked: usize,
}

impl Default for DeduplicationConfig {
    fn default() -> Self {
        Self {
            window_duration: Duration::from_secs(300), // 5 minutes
            max_tracked: 100_000,
        }
    }
}

/// Sliding-window deduplication log.
///
/// Tracks recently processed message IDs so duplicate deliveries can be
/// detected and discarded.  Memory is bounded by both time and cardinality.
pub struct DeduplicationLog {
    config: DeduplicationConfig,
    /// Map from message ID → time it was processed.
    processed: HashMap<MessageId, Instant>,
    /// FIFO queue of (id, processed_at) used for ordered eviction.
    eviction_queue: VecDeque<(MessageId, Instant)>,
}

impl DeduplicationLog {
    /// Create an empty deduplication log with the given configuration.
    pub fn new(config: DeduplicationConfig) -> Self {
        Self {
            processed: HashMap::new(),
            eviction_queue: VecDeque::new(),
            config,
        }
    }

    /// Returns `true` if the message has already been processed.
    pub fn is_duplicate(&self, id: &MessageId) -> bool {
        match self.processed.get(id) {
            None => false,
            Some(&processed_at) => {
                // Still within the deduplication window?
                processed_at.elapsed() < self.config.window_duration
            }
        }
    }

    /// Record that a message has been processed successfully.
    ///
    /// If the log is full (`max_tracked` reached), the oldest entry is evicted.
    pub fn mark_processed(&mut self, id: MessageId) {
        let now = Instant::now();

        // Evict by capacity if needed
        while self.processed.len() >= self.config.max_tracked {
            if let Some((oldest_id, _)) = self.eviction_queue.pop_front() {
                self.processed.remove(&oldest_id);
            } else {
                break;
            }
        }

        self.eviction_queue.push_back((id.clone(), now));
        self.processed.insert(id, now);
    }

    /// Remove entries that have aged out of the time window.
    ///
    /// Returns the number of evicted entries.
    pub fn evict_expired(&mut self) -> usize {
        let deadline = self.config.window_duration;
        let mut evicted = 0usize;

        while let Some((id, ts)) = self.eviction_queue.front() {
            if ts.elapsed() >= deadline {
                let id = id.clone();
                self.eviction_queue.pop_front();
                self.processed.remove(&id);
                evicted += 1;
            } else {
                break;
            }
        }

        evicted
    }

    /// Number of message IDs currently tracked.
    pub fn size(&self) -> usize {
        self.processed.len()
    }
}

// ─── Exactly-once transaction ─────────────────────────────────────────────────

/// An atomic unit of work that combines message acknowledgment with state
/// mutations.
///
/// Either the entire transaction commits (marking the messages as processed
/// and writing all state changes to the backend) or nothing happens.
pub struct ExactlyOnceTransaction {
    /// Unique identifier for this transaction (for idempotent replays).
    pub transaction_id: String,
    /// Messages consumed by this transaction.
    pub messages: Vec<MessageId>,
    /// State mutations: `(namespaced_key, value_bytes)`.
    pub state_changes: Vec<(Vec<u8>, Vec<u8>)>,
    pub started_at: Instant,
    pub committed: bool,
}

impl Default for ExactlyOnceTransaction {
    fn default() -> Self {
        Self::new()
    }
}

impl ExactlyOnceTransaction {
    /// Start a new transaction.
    pub fn new() -> Self {
        Self {
            transaction_id: Uuid::new_v4().to_string(),
            messages: Vec::new(),
            state_changes: Vec::new(),
            started_at: Instant::now(),
            committed: false,
        }
    }

    /// Register a message as part of this transaction.
    pub fn add_message(&mut self, id: MessageId) {
        self.messages.push(id);
    }

    /// Record a state mutation to be applied atomically at commit time.
    pub fn add_state_change(&mut self, key: Vec<u8>, value: Vec<u8>) {
        self.state_changes.push((key, value));
    }

    /// Commit this transaction.
    ///
    /// The commit:
    /// 1. Applies all state changes to the backend.
    /// 2. Marks all consumed messages as processed in the deduplication log.
    ///
    /// In production this would be wrapped in a WAL write; here we do a
    /// best-effort ordered commit (state first, then dedup log).
    pub fn commit(
        mut self,
        dedup_log: &mut DeduplicationLog,
        backend: &dyn StateBackend,
    ) -> Result<(), StreamError> {
        if self.committed {
            return Err(StreamError::InvalidOperation(format!(
                "transaction {} already committed",
                self.transaction_id
            )));
        }

        // Phase 1: Apply state changes
        for (key, value) in &self.state_changes {
            backend.put(key, value)?;
        }

        // Phase 2: Mark messages as processed
        for id in self.messages.drain(..) {
            dedup_log.mark_processed(id);
        }

        self.committed = true;
        Ok(())
    }
}

// ─── High-level exactly-once processor ───────────────────────────────────────

/// Wraps a state backend and deduplication log to provide exactly-once
/// processing guarantees.
pub struct ExactlyOnceProcessor {
    dedup_log: DeduplicationLog,
    backend: std::sync::Arc<dyn StateBackend>,
    /// Number of duplicates filtered so far.
    duplicates_filtered: u64,
    /// Number of messages processed exactly once.
    messages_processed: u64,
}

impl ExactlyOnceProcessor {
    /// Create a new processor.
    pub fn new(config: DeduplicationConfig, backend: std::sync::Arc<dyn StateBackend>) -> Self {
        Self {
            dedup_log: DeduplicationLog::new(config),
            backend,
            duplicates_filtered: 0,
            messages_processed: 0,
        }
    }

    /// Process a message exactly once.
    ///
    /// If `id` has already been processed within the deduplication window the
    /// closure is NOT invoked and `Ok(None)` is returned.
    ///
    /// Otherwise the closure is called with a fresh transaction.  The closure
    /// is expected to add state changes to the transaction; this method then
    /// commits it and returns `Ok(Some(result))`.
    pub fn process<R, F>(&mut self, id: MessageId, processor: F) -> Result<Option<R>, StreamError>
    where
        F: FnOnce(&mut ExactlyOnceTransaction) -> Result<R, StreamError>,
    {
        // Deduplicate
        if self.dedup_log.is_duplicate(&id) {
            self.duplicates_filtered += 1;
            return Ok(None);
        }

        let mut txn = ExactlyOnceTransaction::new();
        txn.add_message(id);

        let result = processor(&mut txn)?;
        txn.commit(&mut self.dedup_log, self.backend.as_ref())?;
        self.messages_processed += 1;

        Ok(Some(result))
    }

    /// Perform maintenance: evict expired dedup entries.
    pub fn maintenance(&mut self) -> usize {
        self.dedup_log.evict_expired()
    }

    /// Statistics about this processor.
    pub fn stats(&self) -> ExactlyOnceStats {
        ExactlyOnceStats {
            dedup_window_size: self.dedup_log.size(),
            duplicates_filtered: self.duplicates_filtered,
            messages_processed: self.messages_processed,
        }
    }
}

/// Runtime statistics for an exactly-once processor.
#[derive(Debug, Clone)]
pub struct ExactlyOnceStats {
    pub dedup_window_size: usize,
    pub duplicates_filtered: u64,
    pub messages_processed: u64,
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::state::distributed_state::InMemoryStateBackend;
    use std::sync::Arc;

    // ── MessageId ────────────────────────────────────────────────────────────

    #[test]
    fn test_message_id_round_trip() {
        let id = MessageId::new("producer-1", 3, 42);
        let serialized = id.serialize();
        assert_eq!(serialized, "producer-1/3/42");

        let parsed = MessageId::parse(&serialized).unwrap();
        assert_eq!(parsed, id);
    }

    #[test]
    fn test_message_id_parse_error() {
        assert!(MessageId::parse("bad").is_err());
        assert!(MessageId::parse("a/b").is_err());
        assert!(MessageId::parse("a/notnum/1").is_err());
        assert!(MessageId::parse("a/1/notnum").is_err());
    }

    // ── DeduplicationLog ─────────────────────────────────────────────────────

    #[test]
    fn test_dedup_log_basic() {
        let config = DeduplicationConfig {
            window_duration: Duration::from_secs(60),
            max_tracked: 1000,
        };
        let mut log = DeduplicationLog::new(config);

        let id = MessageId::new("p", 0, 1);
        assert!(!log.is_duplicate(&id));

        log.mark_processed(id.clone());
        assert!(log.is_duplicate(&id));
        assert_eq!(log.size(), 1);
    }

    #[test]
    fn test_dedup_log_capacity_eviction() {
        let config = DeduplicationConfig {
            window_duration: Duration::from_secs(60),
            max_tracked: 3,
        };
        let mut log = DeduplicationLog::new(config);

        for i in 0..5u64 {
            log.mark_processed(MessageId::new("p", 0, i));
        }

        // Never exceeds max_tracked
        assert!(log.size() <= 3);
    }

    #[test]
    fn test_dedup_log_expiry() {
        let config = DeduplicationConfig {
            // Very short window for test speed
            window_duration: Duration::from_millis(50),
            max_tracked: 1000,
        };
        let mut log = DeduplicationLog::new(config);

        let id = MessageId::new("p", 0, 99);
        log.mark_processed(id.clone());
        assert!(log.is_duplicate(&id));

        std::thread::sleep(Duration::from_millis(60));

        // After window, no longer a duplicate (even before eviction)
        assert!(!log.is_duplicate(&id));

        // evict_expired should clean up
        let evicted = log.evict_expired();
        assert_eq!(evicted, 1);
        assert_eq!(log.size(), 0);
    }

    // ── ExactlyOnceTransaction ───────────────────────────────────────────────

    #[test]
    fn test_transaction_commit_applies_state() {
        let backend = InMemoryStateBackend::new();
        let mut dedup = DeduplicationLog::new(DeduplicationConfig::default());

        let mut txn = ExactlyOnceTransaction::new();
        txn.add_message(MessageId::new("p", 0, 1));
        txn.add_state_change(
            b"counter".to_vec(),
            b"\x01\x00\x00\x00\x00\x00\x00\x00".to_vec(),
        );

        txn.commit(&mut dedup, &backend).unwrap();

        assert_eq!(
            backend.get(b"counter").unwrap().as_deref(),
            Some(b"\x01\x00\x00\x00\x00\x00\x00\x00".as_ref())
        );
        assert!(dedup.is_duplicate(&MessageId::new("p", 0, 1)));
    }

    #[test]
    fn test_transaction_double_commit_fails() {
        let backend = InMemoryStateBackend::new();
        let mut dedup = DeduplicationLog::new(DeduplicationConfig::default());

        let txn = ExactlyOnceTransaction::new();
        // Commit once
        txn.commit(&mut dedup, &backend).unwrap();

        // A second commit on the same object would fail (committed flag set),
        // but since commit consumes self we can't actually call it twice on
        // the same object.  We verify the flag was set by checking the
        // structure of a committed transaction.
        // (Rust's ownership model prevents double-commit.)
    }

    // ── ExactlyOnceProcessor ─────────────────────────────────────────────────

    #[test]
    fn test_processor_exactly_once() {
        let backend = Arc::new(InMemoryStateBackend::new());
        let mut processor = ExactlyOnceProcessor::new(DeduplicationConfig::default(), backend);

        let id = MessageId::new("prod", 0, 1);
        let mut call_count = 0u32;

        // First delivery
        let result = processor
            .process(id.clone(), |_txn| {
                call_count += 1;
                Ok(42u32)
            })
            .unwrap();
        assert_eq!(result, Some(42u32));
        assert_eq!(call_count, 1);

        // Duplicate delivery — closure must NOT be called
        let result = processor
            .process(id, |_txn| {
                call_count += 1;
                Ok(99u32)
            })
            .unwrap();
        assert_eq!(result, None);
        assert_eq!(call_count, 1); // closure was NOT invoked

        let stats = processor.stats();
        assert_eq!(stats.messages_processed, 1);
        assert_eq!(stats.duplicates_filtered, 1);
    }
}