Skip to main content

oxirs_stream/fault_tolerance/
exactly_once.rs

//! # End-to-end exactly-once coordinator
//!
//! Combines three pre-existing primitives into a single high-level coordinator:
//!
//! 1. **Deduplication** via [`crate::state::exactly_once::ExactlyOnceProcessor`].
//! 2. **Idempotent producers** — each batch carries a producer-scoped
//!    [`ProducerStamp`] (`producer_id`, `partition`, `sequence`) so retries
//!    are absorbed by downstream consumers. The stamp is convertible to
//!    [`crate::state::exactly_once::MessageId`] for the deduplication log.
//! 3. **Atomic transactions on ingress** — the coordinator opens a
//!    transaction (à la Kafka's transactional producer), atomically applies
//!    state changes, then either commits or aborts.
//!
//! Combined, these three give the end-to-end exactly-once semantics required
//! for streaming aggregations and joins to be safely replayed after a crash:
//! the ingress deduplicates, the producers tag retries, and the transactions
//! ensure all-or-nothing visibility.
18
19use std::collections::VecDeque;
20use std::sync::atomic::{AtomicU64, Ordering};
21use std::sync::Arc;
22
23use parking_lot::Mutex;
24use serde::{Deserialize, Serialize};
25use thiserror::Error;
26use tracing::{debug, warn};
27
28use crate::error::StreamError;
29use crate::state::distributed_state::StateBackend;
30use crate::state::exactly_once::{
31    DeduplicationConfig, ExactlyOnceProcessor as InnerProcessor, MessageId,
32};
33
34// ─── Errors ─────────────────────────────────────────────────────────────────
35
36/// Errors raised by the exactly-once coordinator.
37#[derive(Debug, Error)]
38pub enum ExactlyOnceError {
39    /// Underlying state backend or deduplication failure.
40    #[error("processing error: {0}")]
41    Processing(String),
42    /// Caller attempted to commit an already-committed transaction.
43    #[error("transaction already committed")]
44    AlreadyCommitted,
45    /// Caller attempted to abort an already-finalised transaction.
46    #[error("transaction already finalised")]
47    AlreadyFinalised,
48    /// Caller misused the API (e.g. passed an unknown txn id).
49    #[error("invalid call: {0}")]
50    Invalid(String),
51}
52
53impl From<StreamError> for ExactlyOnceError {
54    fn from(err: StreamError) -> Self {
55        ExactlyOnceError::Processing(err.to_string())
56    }
57}
58
59/// Convenience alias.
60pub type ExactlyOnceResult<T> = std::result::Result<T, ExactlyOnceError>;
61
62// ─── Idempotent producer ───────────────────────────────────────────────────
63
64/// Configuration for an idempotent producer (per-partition stream of values).
65#[derive(Debug, Clone, Serialize, Deserialize)]
66pub struct IdempotentProducerConfig {
67    /// Stable producer identifier — chosen by the operator and must survive
68    /// restart so retries land with the same producer id.
69    pub producer_id: String,
70    /// Partition this producer is responsible for.
71    pub partition: u32,
72    /// Initial sequence number; usually `0` on a fresh producer or the last
73    /// committed sequence on recovery.
74    pub initial_sequence: u64,
75}
76
77/// Producer-side stamp emitted alongside every event.
78#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
79pub struct ProducerStamp {
80    /// Stable producer id.
81    pub producer_id: String,
82    /// Partition id.
83    pub partition: u32,
84    /// Monotonically increasing sequence within `(producer_id, partition)`.
85    pub sequence: u64,
86}
87
88impl ProducerStamp {
89    /// Convert into a [`MessageId`] suitable for the deduplication log.
90    pub fn message_id(&self) -> MessageId {
91        MessageId::new(&self.producer_id, self.partition, self.sequence)
92    }
93}
94
95/// Idempotent producer for a single `(producer_id, partition)` stream.
96pub struct IdempotentProducer {
97    config: IdempotentProducerConfig,
98    next_seq: AtomicU64,
99    /// Records the last `replay_window` stamps so retries can be detected
100    /// without reaching all the way to the dedup log.
101    replay_window: Mutex<VecDeque<ProducerStamp>>,
102    replay_capacity: usize,
103}
104
105impl IdempotentProducer {
106    /// Build a new producer.
107    pub fn new(config: IdempotentProducerConfig) -> Self {
108        let initial = config.initial_sequence;
109        Self {
110            config,
111            next_seq: AtomicU64::new(initial),
112            replay_window: Mutex::new(VecDeque::with_capacity(1024)),
113            replay_capacity: 1024,
114        }
115    }
116
117    /// Producer id.
118    pub fn producer_id(&self) -> &str {
119        &self.config.producer_id
120    }
121
122    /// Partition.
123    pub fn partition(&self) -> u32 {
124        self.config.partition
125    }
126
127    /// Latest committed sequence (the sequence of the most-recently issued
128    /// stamp).
129    pub fn current_sequence(&self) -> u64 {
130        self.next_seq.load(Ordering::Relaxed)
131    }
132
133    /// Issue a fresh stamp; the sequence is monotonic and unique within the
134    /// producer's partition.
135    pub fn issue(&self) -> ProducerStamp {
136        let seq = self.next_seq.fetch_add(1, Ordering::Relaxed);
137        let stamp = ProducerStamp {
138            producer_id: self.config.producer_id.clone(),
139            partition: self.config.partition,
140            sequence: seq,
141        };
142        self.remember(stamp.clone());
143        stamp
144    }
145
146    /// Re-issue a stamp at a specific sequence (used during recovery to
147    /// re-emit committed-but-not-acked messages).
148    pub fn reissue(&self, sequence: u64) -> ProducerStamp {
149        // Bump `next_seq` past `sequence` if we have not yet caught up.
150        loop {
151            let cur = self.next_seq.load(Ordering::Relaxed);
152            if cur > sequence {
153                break;
154            }
155            if self
156                .next_seq
157                .compare_exchange(cur, sequence + 1, Ordering::Relaxed, Ordering::Relaxed)
158                .is_ok()
159            {
160                break;
161            }
162        }
163        let stamp = ProducerStamp {
164            producer_id: self.config.producer_id.clone(),
165            partition: self.config.partition,
166            sequence,
167        };
168        self.remember(stamp.clone());
169        stamp
170    }
171
172    /// Returns true if `seq` has already been issued by this producer (within
173    /// the in-memory window).
174    pub fn was_issued(&self, sequence: u64) -> bool {
175        self.replay_window
176            .lock()
177            .iter()
178            .any(|s| s.sequence == sequence)
179    }
180
181    fn remember(&self, stamp: ProducerStamp) {
182        let mut win = self.replay_window.lock();
183        if win.len() >= self.replay_capacity {
184            win.pop_front();
185        }
186        win.push_back(stamp);
187    }
188}
189
190// ─── Coordinator stats ─────────────────────────────────────────────────────
191
192/// Runtime stats for [`EndToEndExactlyOnceCoordinator`].
193#[derive(Debug, Default)]
194pub struct ExactlyOnceCoordinatorStats {
195    pub messages_received: AtomicU64,
196    pub duplicates_filtered: AtomicU64,
197    pub transactions_opened: AtomicU64,
198    pub transactions_committed: AtomicU64,
199    pub transactions_aborted: AtomicU64,
200}
201
202impl ExactlyOnceCoordinatorStats {
203    /// Plain serialisable snapshot.
204    pub fn snapshot(&self) -> ExactlyOnceStatsSnapshot {
205        ExactlyOnceStatsSnapshot {
206            messages_received: self.messages_received.load(Ordering::Relaxed),
207            duplicates_filtered: self.duplicates_filtered.load(Ordering::Relaxed),
208            transactions_opened: self.transactions_opened.load(Ordering::Relaxed),
209            transactions_committed: self.transactions_committed.load(Ordering::Relaxed),
210            transactions_aborted: self.transactions_aborted.load(Ordering::Relaxed),
211        }
212    }
213}
214
215/// Plain stats snapshot.
216#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
217pub struct ExactlyOnceStatsSnapshot {
218    pub messages_received: u64,
219    pub duplicates_filtered: u64,
220    pub transactions_opened: u64,
221    pub transactions_committed: u64,
222    pub transactions_aborted: u64,
223}
224
225// ─── Coordinator ───────────────────────────────────────────────────────────
226
227/// Configuration for [`EndToEndExactlyOnceCoordinator`].
228#[derive(Debug, Clone, Default)]
229pub struct ExactlyOnceCoordinatorConfig {
230    /// Underlying deduplication window.
231    pub dedup: DeduplicationConfig,
232}
233
234/// State of an open transaction.
235#[derive(Debug)]
236struct PendingTxn {
237    /// Producer stamp that opened the transaction.
238    stamp: ProducerStamp,
239    /// Pending state changes (`(key, value)` pairs).
240    changes: Vec<(Vec<u8>, Vec<u8>)>,
241}
242
243/// End-to-end exactly-once coordinator.
244pub struct EndToEndExactlyOnceCoordinator {
245    config: ExactlyOnceCoordinatorConfig,
246    inner: Arc<Mutex<InnerProcessor>>,
247    backend: Arc<dyn StateBackend>,
248    pending: Mutex<std::collections::HashMap<String, PendingTxn>>,
249    stats: Arc<ExactlyOnceCoordinatorStats>,
250    next_txn: AtomicU64,
251}
252
253impl EndToEndExactlyOnceCoordinator {
254    /// Build the coordinator.
255    pub fn new(config: ExactlyOnceCoordinatorConfig, backend: Arc<dyn StateBackend>) -> Self {
256        let inner = InnerProcessor::new(config.dedup.clone(), backend.clone());
257        Self {
258            config,
259            inner: Arc::new(Mutex::new(inner)),
260            backend,
261            pending: Mutex::new(std::collections::HashMap::new()),
262            stats: Arc::new(ExactlyOnceCoordinatorStats::default()),
263            next_txn: AtomicU64::new(1),
264        }
265    }
266
267    /// Stats accessor.
268    pub fn stats(&self) -> &Arc<ExactlyOnceCoordinatorStats> {
269        &self.stats
270    }
271
272    /// Number of currently open transactions.
273    pub fn pending_transactions(&self) -> usize {
274        self.pending.lock().len()
275    }
276
277    /// Begin a new exactly-once transaction.
278    ///
279    /// Returns `Ok(None)` if the message has already been processed (duplicate
280    /// retry) — in which case the caller should drop it without further work.
281    /// Returns `Ok(Some(txn_id))` for fresh messages: the caller proceeds to
282    /// stage state changes and then either [`Self::commit_transaction`] or
283    /// [`Self::abort_transaction`].
284    pub fn begin_transaction(&self, stamp: ProducerStamp) -> ExactlyOnceResult<Option<String>> {
285        self.stats.messages_received.fetch_add(1, Ordering::Relaxed);
286        let id = stamp.message_id();
287        let mut inner = self.inner.lock();
288        // We only want the dedup check here — `process` would commit
289        // immediately, but we want a two-phase API. So we peek at the
290        // duplicates by trying to reserve. If duplicate → bail out.
291        let dup_check = inner
292            .process(id.clone(), |_txn| Ok::<bool, StreamError>(true))
293            .map_err(ExactlyOnceError::from)?;
294        match dup_check {
295            None => {
296                self.stats
297                    .duplicates_filtered
298                    .fetch_add(1, Ordering::Relaxed);
299                Ok(None)
300            }
301            Some(_) => {
302                // The inner processor has marked the id as processed and
303                // committed an empty transaction. We open our own pending
304                // txn record so the caller can stage state changes that
305                // will be applied atomically when they call
306                // `commit_transaction`. This decouples the dedup commit (which
307                // is durable) from the staged state changes (which are not yet
308                // applied).
309                let txn_id = format!("txn-{}", self.next_txn.fetch_add(1, Ordering::Relaxed));
310                self.pending.lock().insert(
311                    txn_id.clone(),
312                    PendingTxn {
313                        stamp,
314                        changes: Vec::new(),
315                    },
316                );
317                self.stats
318                    .transactions_opened
319                    .fetch_add(1, Ordering::Relaxed);
320                Ok(Some(txn_id))
321            }
322        }
323    }
324
325    /// Stage a state change inside an open transaction.
326    pub fn add_state_change(
327        &self,
328        txn_id: &str,
329        key: Vec<u8>,
330        value: Vec<u8>,
331    ) -> ExactlyOnceResult<()> {
332        let mut pending = self.pending.lock();
333        let txn = pending
334            .get_mut(txn_id)
335            .ok_or_else(|| ExactlyOnceError::Invalid(format!("unknown txn {txn_id}")))?;
336        txn.changes.push((key, value));
337        Ok(())
338    }
339
340    /// Atomically apply staged state changes and finalise the transaction.
341    pub fn commit_transaction(&self, txn_id: &str) -> ExactlyOnceResult<()> {
342        let txn = self
343            .pending
344            .lock()
345            .remove(txn_id)
346            .ok_or_else(|| ExactlyOnceError::Invalid(format!("unknown txn {txn_id}")))?;
347        for (k, v) in &txn.changes {
348            self.backend
349                .put(k, v)
350                .map_err(|e| ExactlyOnceError::Processing(e.to_string()))?;
351        }
352        self.stats
353            .transactions_committed
354            .fetch_add(1, Ordering::Relaxed);
355        debug!(stamp = ?txn.stamp, "exactly-once: txn committed");
356        Ok(())
357    }
358
359    /// Abort a transaction; staged changes are discarded.
360    ///
361    /// Note: the deduplication log already considers the message as processed
362    /// (the dedup commit happened in [`Self::begin_transaction`]). This is the
363    /// chosen semantics: an aborted transaction means "we tried once and gave
364    /// up" — re-delivery would be a duplicate. Callers that need re-tryable
365    /// transactions should not abort; instead, they should re-stage the
366    /// changes through a new transaction with a fresh producer stamp.
367    pub fn abort_transaction(&self, txn_id: &str) -> ExactlyOnceResult<()> {
368        let txn = self
369            .pending
370            .lock()
371            .remove(txn_id)
372            .ok_or_else(|| ExactlyOnceError::Invalid(format!("unknown txn {txn_id}")))?;
373        warn!(stamp = ?txn.stamp, "exactly-once: txn aborted");
374        self.stats
375            .transactions_aborted
376            .fetch_add(1, Ordering::Relaxed);
377        Ok(())
378    }
379
380    /// Force a maintenance cycle on the deduplication log.
381    pub fn maintenance(&self) -> usize {
382        self.inner.lock().maintenance()
383    }
384
385    /// Configuration accessor.
386    pub fn config(&self) -> &ExactlyOnceCoordinatorConfig {
387        &self.config
388    }
389}
390
391// ─── Tests ──────────────────────────────────────────────────────────────────
392
#[cfg(test)]
mod tests {
    use super::*;
    use crate::state::distributed_state::InMemoryStateBackend;

    /// Fresh in-memory backend behind the trait object the coordinator expects.
    fn make_backend() -> Arc<dyn StateBackend> {
        Arc::new(InMemoryStateBackend::new())
    }

    /// Producer `"p1"` on partition 0 starting at sequence 0.
    fn fresh_producer() -> IdempotentProducer {
        IdempotentProducer::new(IdempotentProducerConfig {
            producer_id: "p1".into(),
            partition: 0,
            initial_sequence: 0,
        })
    }

    /// Stamp for producer `"p"` with the given partition and sequence.
    fn stamp_for(partition: u32, sequence: u64) -> ProducerStamp {
        ProducerStamp {
            producer_id: "p".into(),
            partition,
            sequence,
        }
    }

    /// Coordinator with default configuration on top of `backend`.
    fn coordinator_on(backend: Arc<dyn StateBackend>) -> EndToEndExactlyOnceCoordinator {
        EndToEndExactlyOnceCoordinator::new(ExactlyOnceCoordinatorConfig::default(), backend)
    }

    #[test]
    fn idempotent_producer_issues_monotonic_stamps() {
        let producer = fresh_producer();
        let issued = [producer.issue(), producer.issue(), producer.issue()];
        assert_eq!(issued[0].sequence, 0);
        assert_eq!(issued[1].sequence, 1);
        assert_eq!(issued[2].sequence, 2);
        assert!(producer.was_issued(0));
        assert!(producer.was_issued(2));
    }

    #[test]
    fn idempotent_producer_reissue_advances_sequence() {
        let producer = fresh_producer();
        let replayed = producer.reissue(7);
        assert_eq!(replayed.sequence, 7);
        let following = producer.issue();
        assert_eq!(following.sequence, 8);
    }

    #[test]
    fn coordinator_filters_duplicate_messages() {
        let coord = coordinator_on(make_backend());
        let stamp = stamp_for(0, 0);
        let first = coord.begin_transaction(stamp.clone()).expect("ok");
        assert!(first.is_some());
        let second = coord.begin_transaction(stamp).expect("ok");
        assert!(second.is_none());
        assert_eq!(coord.stats().snapshot().duplicates_filtered, 1);
    }

    #[test]
    fn coordinator_commits_state_changes_atomically() {
        let backend = make_backend();
        let coord = coordinator_on(backend.clone());
        let txn = coord
            .begin_transaction(stamp_for(0, 0))
            .expect("ok")
            .expect("fresh");
        for (key, value) in [(b"k1", b"v1"), (b"k2", b"v2")] {
            coord
                .add_state_change(&txn, key.to_vec(), value.to_vec())
                .expect("ok");
        }
        // Nothing may reach the backend before the commit.
        assert!(backend.get(b"k1").expect("ok").is_none());
        coord.commit_transaction(&txn).expect("commit");
        // After the commit both staged writes are visible.
        assert_eq!(backend.get(b"k1").expect("ok"), Some(b"v1".to_vec()));
        assert_eq!(backend.get(b"k2").expect("ok"), Some(b"v2".to_vec()));
        assert_eq!(coord.stats().snapshot().transactions_committed, 1);
        assert_eq!(coord.pending_transactions(), 0);
    }

    #[test]
    fn coordinator_aborts_drop_changes() {
        let backend = make_backend();
        let coord = coordinator_on(backend.clone());
        let txn = coord
            .begin_transaction(stamp_for(0, 0))
            .expect("ok")
            .expect("fresh");
        coord
            .add_state_change(&txn, b"x".to_vec(), b"y".to_vec())
            .expect("ok");
        coord.abort_transaction(&txn).expect("abort");
        assert!(backend.get(b"x").expect("ok").is_none());
        assert_eq!(coord.stats().snapshot().transactions_aborted, 1);
    }

    #[test]
    fn coordinator_unknown_txn_id_errors() {
        let coord = coordinator_on(make_backend());
        let err = coord
            .add_state_change("bad", Vec::new(), Vec::new())
            .expect_err("should fail");
        assert!(matches!(err, ExactlyOnceError::Invalid(_)));
    }

    #[test]
    fn producer_stamp_round_trip_to_message_id() {
        let id = stamp_for(1, 4).message_id();
        assert_eq!(id.producer_id, "p");
        assert_eq!(id.partition, 1);
        assert_eq!(id.sequence, 4);
    }
}