Skip to main content

faucet_core/
idempotency.rs

//! Exactly-once / idempotent delivery primitives.
//!
//! The pipeline issues a monotonically increasing **commit token** for every
//! page that carries a bookmark. The token is stored in the
//! [`StateStore`](crate::state::StateStore) value alongside the bookmark, and
//! the sink also commits it inside its own write transaction. A crash between
//! "sink durably wrote" and "state persisted" is therefore resolved on resume
//! by skipping pages the sink has already committed. See
//! `docs/superpowers/specs/2026-06-09-exactly-once-delivery-design.md`.
9
10use serde::{Deserialize, Serialize};
11use serde_json::Value;
12
13/// Delivery guarantee for a pipeline run.
14#[derive(
15    Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize, schemars::JsonSchema,
16)]
17#[serde(rename_all = "snake_case")]
18pub enum DeliveryMode {
19    /// Today's behaviour: a page may be re-delivered after a crash between the
20    /// sink write and the bookmark persist. Downstream must tolerate duplicates.
21    #[default]
22    AtLeastOnce,
23    /// The sink durably records a per-page commit token atomically with the
24    /// data; on resume the pipeline skips already-committed pages. Requires a
25    /// state store, an idempotent sink, and a deterministic-replay source.
26    ExactlyOnce,
27}
28
/// Object key whose value `1` marks a state value as the exactly-once wrapper.
const EO_MARKER: &str = "__faucet_eo";
/// Wrapper key holding the pipeline bookmark (`null` when there is none).
const EO_BOOKMARK: &str = "bookmark";
/// Wrapper key holding the page sequence number recorded with the bookmark.
const EO_SEQ: &str = "seq";

/// Number of characters in every commit token.
///
/// Computed as the digit count of `u64::MAX` (which is 20), so zero-padded
/// tokens compare lexicographically exactly as their sequence numbers compare
/// numerically, across the whole `u64` range.
const TOKEN_WIDTH: usize = u64::MAX.ilog10() as usize + 1;
37
38/// Render a page sequence as a fixed-width, lexicographically-ordered token.
39pub fn format_token(seq: u64) -> String {
40    format!("{seq:0TOKEN_WIDTH$}")
41}
42
/// Parse a token produced by [`format_token`] back into its sequence number.
///
/// Surrounding whitespace is ignored before parsing. Any input that is not a
/// valid `u64` yields `None`: empty, non-numeric, negative, or larger than
/// `u64::MAX`. Parsing is lenient about form: unpadded digits and a leading
/// `+` are accepted, because this delegates to `str::parse::<u64>`.
pub fn parse_token(s: &str) -> Option<u64> {
    let digits = s.trim();
    digits.parse().ok()
}
47
48/// Wrap a bookmark + sequence into the exactly-once state value.
49pub fn wrap_state(bookmark: Option<&Value>, seq: u64) -> Value {
50    serde_json::json!({
51        EO_MARKER: 1,
52        EO_BOOKMARK: bookmark.cloned().unwrap_or(Value::Null),
53        EO_SEQ: seq,
54    })
55}
56
57/// Unwrap a stored state value into `(bookmark, seq)`.
58///
59/// A value that is the exactly-once wrapper object unwraps to its inner
60/// bookmark + seq. Anything else is treated as a legacy/at-least-once **bare
61/// bookmark** with `seq = 0` — so switching an existing pipeline to
62/// `exactly_once` resumes cleanly (the sink's own watermark is authoritative).
63pub fn unwrap_state(value: &Value) -> (Option<Value>, u64) {
64    if let Value::Object(map) = value
65        && map.get(EO_MARKER).and_then(Value::as_u64) == Some(1)
66    {
67        let bookmark = match map.get(EO_BOOKMARK) {
68            None | Some(Value::Null) => None,
69            Some(v) => Some(v.clone()),
70        };
71        let seq = map.get(EO_SEQ).and_then(Value::as_u64).unwrap_or(0);
72        return (bookmark, seq);
73    }
74    // Legacy bare bookmark.
75    let bookmark = if value.is_null() {
76        None
77    } else {
78        Some(value.clone())
79    };
80    (bookmark, 0)
81}
82
/// Name of the watermark table into which SQL sinks UPSERT the commit token.
pub const COMMIT_TOKEN_TABLE: &str = "_faucet_commit_token";
/// Watermark-table column that stores the pipeline state key (`{name}::{row_id}`).
pub const COMMIT_TOKEN_SCOPE_COL: &str = "scope";
/// Watermark-table column that stores the most recently committed token.
pub const COMMIT_TOKEN_TOKEN_COL: &str = "token";

/// Iceberg snapshot-summary property carrying the commit scope.
pub const ICEBERG_SCOPE_PROP: &str = "faucet.commit-scope";
/// Iceberg snapshot-summary property carrying the commit token.
pub const ICEBERG_TOKEN_PROP: &str = "faucet.commit-token";
93
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    #[test]
    fn token_round_trips_and_orders_lexicographically() {
        assert_eq!(format_token(42).len(), TOKEN_WIDTH);
        // The widest possible token must still fit exactly; this is the
        // invariant TOKEN_WIDTH exists to guarantee.
        assert_eq!(format_token(u64::MAX).len(), TOKEN_WIDTH);
        assert_eq!(format_token(0), "0".repeat(TOKEN_WIDTH));
        assert_eq!(parse_token(&format_token(42)), Some(42));
        assert_eq!(parse_token(&format_token(0)), Some(0));
        assert_eq!(parse_token(&format_token(u64::MAX)), Some(u64::MAX));
        assert!(format_token(9) < format_token(10));
        assert!(format_token(2) < format_token(1000));
        // String order must agree with numeric order at the top of the range too.
        assert!(format_token(u64::MAX - 1) < format_token(u64::MAX));
    }

    #[test]
    fn parse_token_rejects_garbage() {
        assert_eq!(parse_token("abc"), None);
        assert_eq!(parse_token(""), None);
        assert_eq!(parse_token("-1"), None);
        // One past u64::MAX overflows; it must be rejected, not wrapped or saturated.
        assert_eq!(parse_token("18446744073709551616"), None);
    }

    #[test]
    fn parse_token_tolerates_surrounding_whitespace() {
        let padded = format!("  {}\n", format_token(42));
        assert_eq!(parse_token(&padded), Some(42));
    }

    #[test]
    fn wrap_then_unwrap_preserves_bookmark_and_seq() {
        let bm = json!({"lsn": "0/16B2D58"});
        let wrapped = wrap_state(Some(&bm), 7);
        let (got_bm, got_seq) = unwrap_state(&wrapped);
        assert_eq!(got_bm, Some(bm));
        assert_eq!(got_seq, 7);
    }

    #[test]
    fn wrapper_layout_is_stable() {
        // The persisted shape is read back by later runs, so pin it exactly.
        let wrapped = wrap_state(Some(&json!({"offset": 5})), 9);
        assert_eq!(
            wrapped,
            json!({"__faucet_eo": 1, "bookmark": {"offset": 5}, "seq": 9})
        );
    }

    #[test]
    fn wrap_none_bookmark_unwraps_to_none() {
        let wrapped = wrap_state(None, 3);
        let (got_bm, got_seq) = unwrap_state(&wrapped);
        assert_eq!(got_bm, None);
        assert_eq!(got_seq, 3);
    }

    #[test]
    fn wrap_explicit_null_bookmark_unwraps_to_none() {
        let wrapped = wrap_state(Some(&json!(null)), 4);
        let (got_bm, got_seq) = unwrap_state(&wrapped);
        assert_eq!(got_bm, None);
        assert_eq!(got_seq, 4);
    }

    #[test]
    fn wrapper_without_seq_defaults_to_zero() {
        let (bm, seq) = unwrap_state(&json!({"__faucet_eo": 1, "bookmark": "x"}));
        assert_eq!(bm, Some(json!("x")));
        assert_eq!(seq, 0);
    }

    #[test]
    fn legacy_bare_bookmark_unwraps_with_seq_zero() {
        let (bm, seq) = unwrap_state(&json!("2024-12-01"));
        assert_eq!(bm, Some(json!("2024-12-01")));
        assert_eq!(seq, 0);
        let (bm2, seq2) = unwrap_state(&json!({"updated_at": "2024-12-01"}));
        assert_eq!(bm2, Some(json!({"updated_at": "2024-12-01"})));
        assert_eq!(seq2, 0);
    }

    #[test]
    fn object_with_non_sentinel_marker_is_treated_as_bare_bookmark() {
        // A legacy/user object that merely contains the key must NOT be misread
        // as an EO wrapper — only the typed sentinel `1` counts.
        let v = json!({"__faucet_eo": null, "offset": 500});
        let (bm, seq) = unwrap_state(&v);
        assert_eq!(bm, Some(v));
        assert_eq!(seq, 0);

        let v2 = json!({"__faucet_eo": 2, "offset": 500});
        let (bm2, seq2) = unwrap_state(&v2);
        assert_eq!(bm2, Some(v2));
        assert_eq!(seq2, 0);
    }

    #[test]
    fn null_value_unwraps_to_none_seq_zero() {
        let (bm, seq) = unwrap_state(&json!(null));
        assert_eq!(bm, None);
        assert_eq!(seq, 0);
    }

    #[test]
    fn delivery_mode_serde_is_snake_case_and_defaults_at_least_once() {
        assert_eq!(DeliveryMode::default(), DeliveryMode::AtLeastOnce);
        assert_eq!(
            serde_json::to_string(&DeliveryMode::ExactlyOnce).unwrap(),
            "\"exactly_once\""
        );
        let m: DeliveryMode = serde_json::from_str("\"at_least_once\"").unwrap();
        assert_eq!(m, DeliveryMode::AtLeastOnce);
        let eo: DeliveryMode = serde_json::from_str("\"exactly_once\"").unwrap();
        assert_eq!(eo, DeliveryMode::ExactlyOnce);
        // Only the snake_case spelling is accepted.
        assert!(serde_json::from_str::<DeliveryMode>("\"ExactlyOnce\"").is_err());
    }
}