ubiquisync_sql/hlc_storage.rs
1//! SQL-backed [`HlcStorage`] for the hybrid logical clock.
2//!
3//! Owns the clock's schema — a single-row register table, named from a caller
4//! prefix — and the SQL behind it. [`save`](HlcStorage::save) enqueues into a
5//! [`DbBatch`] sink rather than committing on its own, so the clock state lands
6//! in the same transaction as the write it timestamps. The persist upsert is
7//! MAX-guarded, so an out-of-order commit can never lower the stored clock.
8//!
9//! The clock is read exactly once, at startup: [`SqlHlcStorage::open`] ensures
10//! the table and loads the seed asynchronously, then caches it so the
11//! synchronous [`load`](HlcStorage::load) the service calls just returns it.
12
13use ubiquisync_core::hlc::HlcStorage;
14
15use crate::{
16 db::{Db, DbBatch, DbError, DbType, DbValue},
17 dialect::SqlDialect,
18 util::quote_ident,
19};
20
/// Persistence for the clock register, scoped to a table-name prefix.
///
/// Holds no database handle: it carries the seed that was read when the
/// storage was opened and the SQL text used to persist later clock values.
/// `Debug` is derived so the storage can be logged or embedded in other
/// `Debug` types.
#[derive(Debug)]
pub struct SqlHlcStorage {
    /// Seed read from the register at `open`, or `None` for a fresh store.
    seed: Option<u64>,
    /// Pre-rendered MAX-guarded upsert, built once from the resolved table name.
    persist_sql: String,
}
28
29impl SqlHlcStorage {
30 /// Ensure the clock register exists and read its seed — the clock's only
31 /// read, done once at startup. `prefix` namespaces the table so multiple
32 /// stores can share a database.
33 pub async fn open(db: &dyn Db, prefix: &str) -> Result<Self, DbError> {
34 let table = quote_ident(&format!("{prefix}__hlc"));
35 db.exec(&create_sql(&table, db.dialect()), &[]).await?;
36 let seed = match db.query(&load_sql(&table), &[]).await?.first() {
37 Some(row) => Some(row.get_u64(0)?),
38 None => None,
39 };
40 Ok(Self {
41 seed,
42 persist_sql: persist_sql(&table, db.dialect()),
43 })
44 }
45}
46
47impl HlcStorage for SqlHlcStorage {
48 type Error = DbError;
49 type Sink = dyn DbBatch;
50
51 /// Return the seed cached at `open`. No I/O — the read already happened.
52 fn load(&self) -> Result<Option<u64>, Self::Error> {
53 Ok(self.seed)
54 }
55
56 /// Enqueue the clock-state upsert into `sink`. Durable when the caller
57 /// commits the sink. The packed `u64` occupies the full 64-bit width
58 /// (48-bit millis `<< 16` | 16-bit counter), so [`DbValue::from_u64`]
59 /// rejects a value past `i64::MAX` rather than wrap it negative — which
60 /// would also break the signed `MAX`-guard merge. The real clock stays far
61 /// below that bound (millis below 2^47, ~year 6400).
62 fn save(&self, sink: &mut Self::Sink, raw: u64) -> Result<(), Self::Error> {
63 sink.add_statement(&self.persist_sql, &[DbValue::from_u64(raw)?]);
64 Ok(())
65 }
66}
67
68/// DDL for the register: exactly one row, pinned at `id = 1`.
69fn create_sql(table: &str, dialect: SqlDialect) -> String {
70 let int_type = DbType::Integer.sql_type(dialect);
71 format!(
72 "CREATE TABLE IF NOT EXISTS {table} (\n \
73 id {int_type} PRIMARY KEY CHECK (id = 1),\n \
74 ts {int_type} NOT NULL DEFAULT 0\n)"
75 )
76}
77
/// Renders the `SELECT` that fetches `ts` from the register row (`id = 1`).
/// `table` is spliced in verbatim.
fn load_sql(table: &str) -> String {
    ["SELECT ts FROM ", table, " WHERE id = 1"].concat()
}
82
83/// MAX-guarded upsert: a stale commit cannot lower the stored clock below a
84/// value an earlier-issued (but later-committed) write set.
85fn persist_sql(table: &str, dialect: SqlDialect) -> String {
86 let max = dialect.scalar_max();
87 let p1 = dialect.placeholder(1);
88 format!(
89 "INSERT INTO {table} (id, ts) VALUES (1, {p1}) \
90 ON CONFLICT(id) DO UPDATE SET ts = {max}(COALESCE(ts, 0), EXCLUDED.ts)"
91 )
92}