use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use uuid::Uuid;
/// Schema version for serialized [`WalStateDoc`] values (the tests stamp it into the
/// `schema_version` field).
pub const SCHEMA_VERSION: u32 = 1;
/// Byte width of the `i128` wrapped by every id newtype.
const ID_BYTE_LEN: usize = std::mem::size_of::<i128>();
/// Length of an id's hex form: two hex digits per byte.
const ID_HEX_LEN: usize = 2 * ID_BYTE_LEN;
/// Error returned by the macro-generated `from_hex` constructors of the id newtypes when the
/// input is not a valid hex-encoded id.
#[derive(Debug, thiserror::Error)]
pub enum IdParseError {
/// The input's length in bytes (`str::len`) is not `ID_HEX_LEN`.
// NOTE: the "32" in the message duplicates `ID_HEX_LEN`; keep them in sync.
#[error("id hex must be exactly 32 chars; got {len}")]
WrongLength { len: usize },
/// A two-character pair could not be decoded as one hex byte. `pos` is the byte offset at
/// which that pair starts; `snippet` is the pair's text.
#[error("id hex contains non-hex at byte position {pos}: {snippet:?}")]
InvalidHex { pos: usize, snippet: String },
}
/// Defines a `Copy` id newtype wrapping an `i128`.
///
/// Each generated type serializes as exactly `ID_HEX_LEN` lowercase hex digits (its
/// big-endian two's-complement bytes) and deserializes from the same form via `from_hex`.
/// Any attributes given before the name, including doc comments, are applied to the struct.
macro_rules! define_id_type {
    ($(#[$meta:meta])* $name:ident) => {
        $(#[$meta])*
        #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
        pub struct $name(pub i128);

        impl $name {
            /// Encodes the id as `ID_HEX_LEN` lowercase hex digits of its big-endian bytes.
            ///
            /// Negative values are written in two's complement (e.g. `-1` is all `f`s).
            pub fn to_hex(self) -> String {
                // `as u128` reinterprets the bits (no truncation). Zero-padding to the full
                // width keeps leading zero bytes, matching the big-endian byte layout.
                format!("{:0width$x}", self.0 as u128, width = ID_HEX_LEN)
            }

            /// Parses the form produced by `to_hex`; upper- and lower-case digits are accepted.
            ///
            /// # Errors
            ///
            /// - `IdParseError::WrongLength` if `s` is not exactly `ID_HEX_LEN` bytes long.
            /// - `IdParseError::InvalidHex` for the first two-byte pair that is not two hex
            ///   digits; `pos` is that pair's starting byte offset.
            pub fn from_hex(s: &str) -> Result<Self, IdParseError> {
                let raw = s.as_bytes();
                if raw.len() != ID_HEX_LEN {
                    return Err(IdParseError::WrongLength { len: raw.len() });
                }
                // Decode on raw bytes rather than `&s[a..b]` plus `u8::from_str_radix`.
                // Slicing panics when a multi-byte UTF-8 char straddles a pair boundary, and
                // `from_str_radix` accepts a leading `+` sign, which is not a hex digit.
                let hex_digit = |b: u8| char::from(b).to_digit(16);
                let mut bytes = [0u8; ID_BYTE_LEN];
                for (i, (byte, pair)) in bytes.iter_mut().zip(raw.chunks_exact(2)).enumerate() {
                    match (hex_digit(pair[0]), hex_digit(pair[1])) {
                        // Both digits are < 16, so the combined value always fits in a u8.
                        (Some(hi), Some(lo)) => *byte = (hi * 16 + lo) as u8,
                        _ => {
                            return Err(IdParseError::InvalidHex {
                                pos: 2 * i,
                                snippet: String::from_utf8_lossy(pair).into_owned(),
                            });
                        }
                    }
                }
                Ok(Self(i128::from_be_bytes(bytes)))
            }
        }

        impl Serialize for $name {
            fn serialize<S: serde::Serializer>(&self, s: S) -> Result<S::Ok, S::Error> {
                s.serialize_str(&self.to_hex())
            }
        }

        impl<'de> Deserialize<'de> for $name {
            fn deserialize<D: serde::Deserializer<'de>>(d: D) -> Result<Self, D::Error> {
                let s = String::deserialize(d)?;
                Self::from_hex(&s).map_err(serde::de::Error::custom)
            }
        }
    };
}
define_id_type! {
/// Id type of [`WalStateDoc::wal_id`].
WalId
}
define_id_type! {
/// Row id type, used for [`WalStateDoc::target_ids`], the bounds of [`IdSpan`], and
/// [`TombstoneEntry::target_id`].
RowId
}
define_id_type! {
/// Id type used as [`Lease::owner`].
SupertableHandleId
}
/// Kind of operation recorded in [`WalStateDoc::op_kind`].
///
/// Serialized as the upper-cased variant name (`"UPDATE"`, `"DELETE"`); any other string
/// fails to deserialize.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "UPPERCASE")]
pub enum OpKind {
Update,
Delete,
}
/// Lifecycle state stored in [`WalStateDoc::state`], serialized as `"INTENT"`, `"APPENDED"`
/// or `"COMPLETE"`.
///
/// NOTE(review): the names suggest the progression Intent -> Appended -> Complete; nothing in
/// this type enforces an order between states.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "UPPERCASE")]
pub enum WalState {
Intent,
Appended,
Complete,
}
/// Lease recorded in [`WalStateDoc::lease`]: who holds it and the time window it covers.
///
/// Plain data. Nothing here checks that `expires_at` is not before `acquired_at`, or compares
/// either timestamp with the current time.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct Lease {
/// Holder of the lease.
pub owner: SupertableHandleId,
/// When the lease was acquired (UTC).
pub acquired_at: DateTime<Utc>,
/// When the lease expires (UTC); expiry is not enforced by this type.
pub expires_at: DateTime<Utc>,
}
/// One progress record in [`WalStateDoc::tombstone_progress`], tracking a single target row.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct TombstoneEntry {
/// Row this entry tracks.
pub target_id: RowId,
/// Current outcome for `target_id`.
pub outcome: TombstoneOutcome,
/// Superfile in which the row was tombstoned, if recorded. Deserializes to `None` when the
/// field is absent.
#[serde(default)]
pub tombstoned_in_superfile: Option<Uuid>,
}
/// Outcome stored in [`TombstoneEntry::outcome`], serialized as `"PENDING"`, `"TOMBSTONED"`
/// or `"NOTFOUND"`.
///
/// NOTE: `rename_all = "UPPERCASE"` inserts no word separators, so `NotFound` is written as
/// `NOTFOUND` (not `NOT_FOUND`). Changing the rule would change the persisted format.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "UPPERCASE")]
pub enum TombstoneOutcome {
Pending,
Tombstoned,
NotFound,
}
/// Record pairing a compaction id with the time it was sealed.
///
/// NOTE(review): not referenced elsewhere in this section; presumably written by the
/// compaction path — confirm against callers.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct SealRecord {
/// Id of the compaction this record refers to.
pub compaction_id: Uuid,
/// Seal timestamp (UTC).
pub sealed_at: DateTime<Utc>,
}
/// Inclusive range of row ids, `first..=last`; see [`IdSpan::len`].
///
/// A span with `first > last` is reported as empty by [`IdSpan::is_empty`].
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct IdSpan {
/// First id in the span (inclusive).
pub first: RowId,
/// Last id in the span (inclusive).
pub last: RowId,
}
impl IdSpan {
    /// Number of ids in the inclusive span `first..=last`.
    ///
    /// Returns 0 for an inverted span (`first > last`), so `len() == 0` exactly when
    /// [`IdSpan::is_empty`] is true. Spans holding more than `u64::MAX` ids are possible
    /// because the bounds are `i128`. They saturate at `u64::MAX` rather than wrapping or
    /// truncating.
    pub fn len(&self) -> u64 {
        if self.is_empty() {
            return 0;
        }
        // `abs_diff` is exact as a `u128` even for `i128::MIN..=i128::MAX`, where
        // `last - first + 1` would overflow `i128`.
        let distance = self.last.0.abs_diff(self.first.0);
        if distance >= u128::from(u64::MAX) {
            // `distance + 1` ids cannot be represented in a u64.
            u64::MAX
        } else {
            // distance < u64::MAX, so the cast is lossless and `+ 1` cannot overflow.
            distance as u64 + 1
        }
    }

    /// Returns `true` when the span is inverted (`first > last`) and so contains no ids.
    pub fn is_empty(&self) -> bool {
        self.first.0 > self.last.0
    }
}
/// Serialized state of one WAL entry.
///
/// Deserialization ignores unknown fields (there is no `deny_unknown_fields`). Fields marked
/// `#[serde(default)]` may be absent; all others must be present. `schema_version` is carried
/// as-is, and nothing in this type checks it against [`SCHEMA_VERSION`].
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct WalStateDoc {
/// Id of this WAL entry; serialized as a 32-char hex string.
pub wal_id: WalId,
/// Schema version the document was written with.
pub schema_version: u32,
/// Whether the entry records an update or a delete.
pub op_kind: OpKind,
/// Lifecycle state of the entry.
pub state: WalState,
/// Creation time (UTC).
pub created_at: DateTime<Utc>,
/// Current lease, if any; `None` when absent.
#[serde(default)]
pub lease: Option<Lease>,
/// Textual representation of the operation's predicate.
pub predicate_repr: String,
/// Rows the operation targets.
pub target_ids: Vec<RowId>,
/// Count of new rows, if recorded (the tests set it only for `OpKind::Update`).
#[serde(default)]
pub new_row_count: Option<u32>,
/// Content hash of the new rows as a string; its encoding is not fixed by this type.
#[serde(default)]
pub new_row_content_hash: Option<String>,
/// Superfile id allocated ahead of time, if any.
#[serde(default)]
pub preallocated_superfile_id: Option<Uuid>,
/// Row-id ranges minted for this entry; empty when absent.
#[serde(default)]
pub minted_id_spans: Vec<IdSpan>,
/// Per-target tombstoning progress.
pub tombstone_progress: Vec<TombstoneEntry>,
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Fixed id whose bytes are all distinct, so byte-order mistakes show up in the hex.
    fn sample_wal_id() -> WalId {
        WalId(0x0011_2233_4455_6677_8899_AABB_CCDD_EEFF_i128)
    }

    #[test]
    fn wal_id_hex_round_trips() {
        let w = sample_wal_id();
        let h = w.to_hex();
        assert_eq!(h.len(), 32);
        assert_eq!(h, h.to_lowercase());
        assert_eq!(WalId::from_hex(&h).expect("parse"), w);
    }

    #[test]
    fn wal_id_hex_zero_pads_high_zero_bytes() {
        let h = WalId(1).to_hex();
        assert_eq!(h, "00000000000000000000000000000001");
    }

    #[test]
    fn wal_id_hex_preserves_be_byte_order() {
        let w = WalId(i128::from_be_bytes([
            0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0A, 0x0B, 0x0C, 0x0D, 0x0E,
            0x0F, 0x00,
        ]));
        assert_eq!(w.to_hex(), "0102030405060708090a0b0c0d0e0f00");
    }

    #[test]
    fn wal_id_hex_of_negative_is_twos_complement() {
        let h = WalId(-1).to_hex();
        assert_eq!(h, "f".repeat(32));
        assert_eq!(WalId::from_hex(&h).expect("parse"), WalId(-1));
    }

    #[test]
    fn wal_id_from_hex_accepts_uppercase() {
        let parsed = WalId::from_hex("0102030405060708090A0B0C0D0E0F00").expect("parse");
        assert_eq!(parsed.to_hex(), "0102030405060708090a0b0c0d0e0f00");
    }

    #[test]
    fn wal_id_from_hex_rejects_wrong_length() {
        assert!(matches!(
            WalId::from_hex("0000"),
            Err(IdParseError::WrongLength { len: 4 })
        ));
        let too_long = "0".repeat(33);
        assert!(matches!(
            WalId::from_hex(&too_long),
            Err(IdParseError::WrongLength { len: 33 })
        ));
    }

    #[test]
    fn wal_id_from_hex_rejects_non_hex() {
        let mut s = "0".repeat(30);
        s.push_str("zz");
        assert!(matches!(
            WalId::from_hex(&s),
            Err(IdParseError::InvalidHex { pos: 30, ref snippet }) if snippet == "zz"
        ));
    }

    #[test]
    fn ids_serialize_as_hex_json_strings() {
        let json = serde_json::to_string(&RowId(255)).expect("encode");
        assert_eq!(json, format!("\"{}ff\"", "0".repeat(30)));
        let decoded: RowId = serde_json::from_str(&json).expect("decode");
        assert_eq!(decoded, RowId(255));
        // Ids are strings on the wire; a bare JSON number is rejected.
        assert!(serde_json::from_str::<RowId>("255").is_err());
    }

    #[test]
    fn enums_serialize_as_uppercase_names() {
        assert_eq!(serde_json::to_string(&OpKind::Update).expect("encode"), r#""UPDATE""#);
        assert_eq!(serde_json::to_string(&OpKind::Delete).expect("encode"), r#""DELETE""#);
        assert_eq!(
            serde_json::to_string(&WalState::Appended).expect("encode"),
            r#""APPENDED""#
        );
        // `rename_all = "UPPERCASE"` adds no separator: this pins `NOTFOUND`, not `NOT_FOUND`.
        assert_eq!(
            serde_json::to_string(&TombstoneOutcome::NotFound).expect("encode"),
            r#""NOTFOUND""#
        );
    }

    /// A fully-populated update document used by the round-trip tests.
    fn sample_state_doc() -> WalStateDoc {
        WalStateDoc {
            wal_id: sample_wal_id(),
            schema_version: SCHEMA_VERSION,
            op_kind: OpKind::Update,
            state: WalState::Intent,
            created_at: "2026-05-30T10:00:00Z".parse().expect("ts"),
            lease: Some(Lease {
                owner: SupertableHandleId(42),
                acquired_at: "2026-05-30T10:00:01Z".parse().expect("ts"),
                expires_at: "2026-05-30T10:01:01Z".parse().expect("ts"),
            }),
            predicate_repr: "status = 'pending'".into(),
            target_ids: vec![RowId(1), RowId(2), RowId(3)],
            new_row_count: Some(3),
            new_row_content_hash: Some("deadbeef".into()),
            preallocated_superfile_id: Some(Uuid::nil()),
            minted_id_spans: vec![
                IdSpan {
                    first: RowId(100),
                    last: RowId(101),
                },
                IdSpan {
                    first: RowId(200),
                    last: RowId(200),
                },
            ],
            // One pending entry per target id.
            tombstone_progress: [1, 2, 3]
                .iter()
                .map(|&n| TombstoneEntry {
                    target_id: RowId(n),
                    outcome: TombstoneOutcome::Pending,
                    tombstoned_in_superfile: None,
                })
                .collect(),
        }
    }

    #[test]
    fn state_doc_round_trips_update_through_json() {
        let original = sample_state_doc();
        let json = serde_json::to_string(&original).expect("encode");
        let decoded: WalStateDoc = serde_json::from_str(&json).expect("decode");
        assert_eq!(decoded, original);
    }

    #[test]
    fn state_doc_round_trips_delete_through_json() {
        let original = WalStateDoc {
            op_kind: OpKind::Delete,
            new_row_count: None,
            new_row_content_hash: None,
            preallocated_superfile_id: None,
            minted_id_spans: Vec::new(),
            ..sample_state_doc()
        };
        let json = serde_json::to_string(&original).expect("encode");
        let decoded: WalStateDoc = serde_json::from_str(&json).expect("decode");
        assert_eq!(decoded, original);
    }

    #[test]
    fn state_doc_tolerates_unknown_fields_on_read() {
        let mut json: serde_json::Value = serde_json::to_value(sample_state_doc()).expect("encode");
        json.as_object_mut()
            .expect("object")
            .insert("priority".into(), serde_json::json!("high"));
        let serialized = serde_json::to_string(&json).expect("re-encode");
        let decoded: WalStateDoc =
            serde_json::from_str(&serialized).expect("decode w/ extra field");
        // The extra field is ignored and every known field survives intact.
        assert_eq!(decoded, sample_state_doc());
    }

    #[test]
    fn state_doc_defaults_omitted_optional_fields() {
        let json = serde_json::json!({
            "wal_id": WalId(1).to_hex(),
            "schema_version": 1,
            "op_kind": "DELETE",
            "state": "INTENT",
            "created_at": "2026-05-30T10:00:00Z",
            "predicate_repr": "x",
            "target_ids": [],
            "tombstone_progress": []
        });
        let doc: WalStateDoc = serde_json::from_value(json).expect("decode minimal doc");
        assert_eq!(doc.wal_id, WalId(1));
        assert_eq!(doc.op_kind, OpKind::Delete);
        assert_eq!(doc.lease, None);
        assert_eq!(doc.new_row_count, None);
        assert_eq!(doc.new_row_content_hash, None);
        assert_eq!(doc.preallocated_superfile_id, None);
        assert!(doc.minted_id_spans.is_empty());
    }

    #[test]
    fn state_doc_rejects_unknown_op_kind() {
        let json = r#"{
            "wal_id": "00000000000000000000000000000001",
            "schema_version": 1,
            "op_kind": "MERGE",
            "state": "INTENT",
            "created_at": "2026-05-30T10:00:00Z",
            "predicate_repr": "x",
            "target_ids": [],
            "tombstone_progress": []
        }"#;
        let err = serde_json::from_str::<WalStateDoc>(json).expect_err("must fail");
        let msg = err.to_string();
        assert!(
            msg.contains("unknown variant") && msg.contains("MERGE"),
            "expected unknown-variant error naming MERGE; got: {msg}"
        );
    }

    #[test]
    fn id_span_len_inclusive() {
        let s = IdSpan {
            first: RowId(10),
            last: RowId(14),
        };
        assert_eq!(s.len(), 5);
        let single = IdSpan {
            first: RowId(7),
            last: RowId(7),
        };
        assert_eq!(single.len(), 1);
    }

    #[test]
    fn id_span_is_empty_only_when_inverted() {
        assert!(!IdSpan { first: RowId(7), last: RowId(7) }.is_empty());
        assert!(IdSpan { first: RowId(8), last: RowId(7) }.is_empty());
    }
}