faucet_core/
idempotency.rs1use serde::{Deserialize, Serialize};
11use serde_json::Value;
12
13#[derive(
15 Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize, schemars::JsonSchema,
16)]
17#[serde(rename_all = "snake_case")]
18pub enum DeliveryMode {
19 #[default]
22 AtLeastOnce,
23 ExactlyOnce,
27}
28
/// Key whose value `1` identifies a JSON object as a state envelope
/// produced by `wrap_state` (and recognised by `unwrap_state`).
const EO_MARKER: &str = "__faucet_eo";

/// Envelope key under which the caller's bookmark is stored
/// (JSON `null` when there is no bookmark).
const EO_BOOKMARK: &str = "bookmark";

/// Envelope key under which the `u64` sequence number is stored.
const EO_SEQ: &str = "seq";
33
/// Number of characters in a formatted token.
///
/// `u64::MAX` has 20 decimal digits, so every `u64` fits without truncation.
const TOKEN_WIDTH: usize = 20;

/// Renders `seq` as a decimal string left-padded with zeros to
/// [`TOKEN_WIDTH`] characters.
///
/// Because every token has the same length, comparing two tokens as strings
/// gives the same ordering as comparing the underlying numbers.
pub fn format_token(seq: u64) -> String {
    format!("{seq:0width$}", width = TOKEN_WIDTH)
}
42
/// Parses a token string back into its sequence number.
///
/// Surrounding whitespace is ignored. Returns `None` when the remaining text
/// is not accepted by `u64`'s `FromStr` implementation (empty input,
/// non-digits, a value larger than `u64::MAX`, ...).
pub fn parse_token(s: &str) -> Option<u64> {
    let trimmed = s.trim();
    // The target integer type is inferred from the return type.
    trimmed.parse().ok()
}
47
48pub fn wrap_state(bookmark: Option<&Value>, seq: u64) -> Value {
50 serde_json::json!({
51 EO_MARKER: 1,
52 EO_BOOKMARK: bookmark.cloned().unwrap_or(Value::Null),
53 EO_SEQ: seq,
54 })
55}
56
57pub fn unwrap_state(value: &Value) -> (Option<Value>, u64) {
64 if let Value::Object(map) = value
65 && map.get(EO_MARKER).and_then(Value::as_u64) == Some(1)
66 {
67 let bookmark = match map.get(EO_BOOKMARK) {
68 None | Some(Value::Null) => None,
69 Some(v) => Some(v.clone()),
70 };
71 let seq = map.get(EO_SEQ).and_then(Value::as_u64).unwrap_or(0);
72 return (bookmark, seq);
73 }
74 let bookmark = if value.is_null() {
76 None
77 } else {
78 Some(value.clone())
79 };
80 (bookmark, 0)
81}
82
/// Table name `_faucet_commit_token`.
///
/// NOTE(review): presumably the destination-side table that records the last
/// committed token per scope — confirm against the code that uses it.
pub const COMMIT_TOKEN_TABLE: &str = "_faucet_commit_token";

/// Column name `scope`; presumably a column of [`COMMIT_TOKEN_TABLE`] — confirm.
pub const COMMIT_TOKEN_SCOPE_COL: &str = "scope";

/// Column name `token`; presumably a column of [`COMMIT_TOKEN_TABLE`] — confirm.
pub const COMMIT_TOKEN_TOKEN_COL: &str = "token";

/// Property key `faucet.commit-scope`.
///
/// NOTE(review): the name suggests an Iceberg property carrying the commit
/// scope — confirm where it is written and read.
pub const ICEBERG_SCOPE_PROP: &str = "faucet.commit-scope";

/// Property key `faucet.commit-token`.
///
/// NOTE(review): the name suggests an Iceberg property carrying the commit
/// token — confirm where it is written and read.
pub const ICEBERG_TOKEN_PROP: &str = "faucet.commit-token";
93
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Formatting then parsing is lossless, and string order matches numeric order.
    #[test]
    fn token_round_trips_and_orders_lexicographically() {
        assert_eq!(format_token(42).len(), TOKEN_WIDTH);

        // Round trip at a typical value and at both ends of the u64 range.
        assert_eq!(parse_token(&format_token(42)), Some(42));
        assert_eq!(parse_token(&format_token(0)), Some(0));
        assert_eq!(parse_token(&format_token(u64::MAX)), Some(u64::MAX));

        // Zero padding keeps lexicographic order aligned with numeric order.
        assert!(format_token(9) < format_token(10));
        assert!(format_token(2) < format_token(1000));
    }

    /// Non-numeric and empty input do not parse.
    #[test]
    fn parse_token_rejects_garbage() {
        assert_eq!(parse_token("abc"), None);
        assert_eq!(parse_token(""), None);
    }

    /// An envelope built from a bookmark and a sequence yields both back.
    #[test]
    fn wrap_then_unwrap_preserves_bookmark_and_seq() {
        let bookmark = json!({"lsn": "0/16B2D58"});
        let envelope = wrap_state(Some(&bookmark), 7);

        let (out_bookmark, out_seq) = unwrap_state(&envelope);

        assert_eq!(out_bookmark, Some(bookmark));
        assert_eq!(out_seq, 7);
    }

    /// A missing bookmark survives the round trip as `None`.
    #[test]
    fn wrap_none_bookmark_unwraps_to_none() {
        let envelope = wrap_state(None, 3);

        let (out_bookmark, out_seq) = unwrap_state(&envelope);

        assert_eq!(out_bookmark, None);
        assert_eq!(out_seq, 3);
    }

    /// Values that are not envelopes are returned whole, with sequence zero.
    #[test]
    fn legacy_bare_bookmark_unwraps_with_seq_zero() {
        // Scalar bookmark.
        let (scalar_bookmark, scalar_seq) = unwrap_state(&json!("2024-12-01"));
        assert_eq!(scalar_bookmark, Some(json!("2024-12-01")));
        assert_eq!(scalar_seq, 0);

        // Object bookmark without the marker key.
        let (object_bookmark, object_seq) = unwrap_state(&json!({"updated_at": "2024-12-01"}));
        assert_eq!(object_bookmark, Some(json!({"updated_at": "2024-12-01"})));
        assert_eq!(object_seq, 0);
    }

    /// The marker key alone is not enough: its value must be the sentinel `1`.
    #[test]
    fn object_with_non_sentinel_marker_is_treated_as_bare_bookmark() {
        let candidate = json!({"__faucet_eo": null, "offset": 500});

        let (out_bookmark, out_seq) = unwrap_state(&candidate);

        assert_eq!(out_bookmark, Some(candidate));
        assert_eq!(out_seq, 0);
    }

    /// JSON `null` means "no bookmark, no sequence yet".
    #[test]
    fn null_value_unwraps_to_none_seq_zero() {
        let (out_bookmark, out_seq) = unwrap_state(&json!(null));

        assert_eq!(out_bookmark, None);
        assert_eq!(out_seq, 0);
    }

    /// Wire names are snake_case and the default variant is `AtLeastOnce`.
    #[test]
    fn delivery_mode_serde_is_snake_case_and_defaults_at_least_once() {
        assert_eq!(DeliveryMode::default(), DeliveryMode::AtLeastOnce);

        let encoded = serde_json::to_string(&DeliveryMode::ExactlyOnce).unwrap();
        assert_eq!(encoded, "\"exactly_once\"");

        let decoded: DeliveryMode = serde_json::from_str("\"at_least_once\"").unwrap();
        assert_eq!(decoded, DeliveryMode::AtLeastOnce);
    }
}