use serde::{Deserialize, Serialize};
use serde_json::Value;
#[derive(
Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize, schemars::JsonSchema,
)]
#[serde(rename_all = "snake_case")]
pub enum DeliveryMode {
#[default]
AtLeastOnce,
ExactlyOnce,
}
const EO_MARKER: &str = "__faucet_eo";
const EO_BOOKMARK: &str = "bookmark";
const EO_SEQ: &str = "seq";
const TOKEN_WIDTH: usize = 20;
pub fn format_token(seq: u64) -> String {
format!("{seq:0TOKEN_WIDTH$}")
}
pub fn parse_token(s: &str) -> Option<u64> {
s.trim().parse::<u64>().ok()
}
pub fn wrap_state(bookmark: Option<&Value>, seq: u64) -> Value {
serde_json::json!({
EO_MARKER: 1,
EO_BOOKMARK: bookmark.cloned().unwrap_or(Value::Null),
EO_SEQ: seq,
})
}
pub fn unwrap_state(value: &Value) -> (Option<Value>, u64) {
if let Value::Object(map) = value
&& map.get(EO_MARKER).and_then(Value::as_u64) == Some(1)
{
let bookmark = match map.get(EO_BOOKMARK) {
None | Some(Value::Null) => None,
Some(v) => Some(v.clone()),
};
let seq = map.get(EO_SEQ).and_then(Value::as_u64).unwrap_or(0);
return (bookmark, seq);
}
let bookmark = if value.is_null() {
None
} else {
Some(value.clone())
};
(bookmark, 0)
}
pub const COMMIT_TOKEN_TABLE: &str = "_faucet_commit_token";
pub const COMMIT_TOKEN_SCOPE_COL: &str = "scope";
pub const COMMIT_TOKEN_TOKEN_COL: &str = "token";
pub const ICEBERG_SCOPE_PROP: &str = "faucet.commit-scope";
pub const ICEBERG_TOKEN_PROP: &str = "faucet.commit-token";
#[cfg(test)]
mod tests {
use super::*;
use serde_json::json;
#[test]
fn token_round_trips_and_orders_lexicographically() {
assert_eq!(format_token(42).len(), TOKEN_WIDTH);
assert_eq!(parse_token(&format_token(42)), Some(42));
assert_eq!(parse_token(&format_token(0)), Some(0));
assert_eq!(parse_token(&format_token(u64::MAX)), Some(u64::MAX));
assert!(format_token(9) < format_token(10));
assert!(format_token(2) < format_token(1000));
}
#[test]
fn parse_token_rejects_garbage() {
assert_eq!(parse_token("abc"), None);
assert_eq!(parse_token(""), None);
}
#[test]
fn wrap_then_unwrap_preserves_bookmark_and_seq() {
let bm = json!({"lsn": "0/16B2D58"});
let wrapped = wrap_state(Some(&bm), 7);
let (got_bm, got_seq) = unwrap_state(&wrapped);
assert_eq!(got_bm, Some(bm));
assert_eq!(got_seq, 7);
}
#[test]
fn wrap_none_bookmark_unwraps_to_none() {
let wrapped = wrap_state(None, 3);
let (got_bm, got_seq) = unwrap_state(&wrapped);
assert_eq!(got_bm, None);
assert_eq!(got_seq, 3);
}
#[test]
fn legacy_bare_bookmark_unwraps_with_seq_zero() {
let (bm, seq) = unwrap_state(&json!("2024-12-01"));
assert_eq!(bm, Some(json!("2024-12-01")));
assert_eq!(seq, 0);
let (bm2, seq2) = unwrap_state(&json!({"updated_at": "2024-12-01"}));
assert_eq!(bm2, Some(json!({"updated_at": "2024-12-01"})));
assert_eq!(seq2, 0);
}
#[test]
fn object_with_non_sentinel_marker_is_treated_as_bare_bookmark() {
let v = json!({"__faucet_eo": null, "offset": 500});
let (bm, seq) = unwrap_state(&v);
assert_eq!(bm, Some(v));
assert_eq!(seq, 0);
}
#[test]
fn null_value_unwraps_to_none_seq_zero() {
let (bm, seq) = unwrap_state(&json!(null));
assert_eq!(bm, None);
assert_eq!(seq, 0);
}
#[test]
fn delivery_mode_serde_is_snake_case_and_defaults_at_least_once() {
assert_eq!(DeliveryMode::default(), DeliveryMode::AtLeastOnce);
assert_eq!(
serde_json::to_string(&DeliveryMode::ExactlyOnce).unwrap(),
"\"exactly_once\""
);
let m: DeliveryMode = serde_json::from_str("\"at_least_once\"").unwrap();
assert_eq!(m, DeliveryMode::AtLeastOnce);
}
}