Skip to main content

ubiquisync_sql/
hlc_storage.rs

1//! SQL-backed [`HlcStorage`] for the hybrid logical clock.
2//!
3//! Owns the clock's schema — a single-row register table, named from a caller
4//! prefix — and the SQL behind it. [`save`](HlcStorage::save) enqueues into a
5//! [`DbBatch`] sink rather than committing on its own, so the clock state lands
6//! in the same transaction as the write it timestamps. The persist upsert is
7//! MAX-guarded, so an out-of-order commit can never lower the stored clock.
8//!
9//! The clock is read exactly once, at startup: [`SqlHlcStorage::open`] ensures
10//! the table and loads the seed asynchronously, then caches it so the
11//! synchronous [`load`](HlcStorage::load) the service calls just returns it.
12
13use ubiquisync_core::hlc::HlcStorage;
14
15use crate::{
16    db::{Db, DbBatch, DbError, DbType, DbValue},
17    dialect::SqlDialect,
18    util::quote_ident,
19};
20
21/// Persistence for the clock register, scoped to a table-name prefix.
22pub struct SqlHlcStorage {
23    /// Seed read from the register at `open`, or `None` for a fresh store.
24    seed: Option<u64>,
25    /// Pre-rendered MAX-guarded upsert, built once from the resolved table name.
26    persist_sql: String,
27}
28
29impl SqlHlcStorage {
30    /// Ensure the clock register exists and read its seed — the clock's only
31    /// read, done once at startup. `prefix` namespaces the table so multiple
32    /// stores can share a database.
33    pub async fn open(db: &dyn Db, prefix: &str) -> Result<Self, DbError> {
34        let table = quote_ident(&format!("{prefix}__hlc"));
35        db.exec(&create_sql(&table, db.dialect()), &[]).await?;
36        let seed = match db.query(&load_sql(&table), &[]).await?.first() {
37            Some(row) => Some(row.get_u64(0)?),
38            None => None,
39        };
40        Ok(Self {
41            seed,
42            persist_sql: persist_sql(&table, db.dialect()),
43        })
44    }
45}
46
47impl HlcStorage for SqlHlcStorage {
48    type Error = DbError;
49    type Sink = dyn DbBatch;
50
51    /// Return the seed cached at `open`. No I/O — the read already happened.
52    fn load(&self) -> Result<Option<u64>, Self::Error> {
53        Ok(self.seed)
54    }
55
56    /// Enqueue the clock-state upsert into `sink`. Durable when the caller
57    /// commits the sink. The packed `u64` occupies the full 64-bit width
58    /// (48-bit millis `<< 16` | 16-bit counter), so [`DbValue::from_u64`]
59    /// rejects a value past `i64::MAX` rather than wrap it negative — which
60    /// would also break the signed `MAX`-guard merge. The real clock stays far
61    /// below that bound (millis below 2^47, ~year 6400).
62    fn save(&self, sink: &mut Self::Sink, raw: u64) -> Result<(), Self::Error> {
63        sink.add_statement(&self.persist_sql, &[DbValue::from_u64(raw)?]);
64        Ok(())
65    }
66}
67
68/// DDL for the register: exactly one row, pinned at `id = 1`.
69fn create_sql(table: &str, dialect: SqlDialect) -> String {
70    let int_type = DbType::Integer.sql_type(dialect);
71    format!(
72        "CREATE TABLE IF NOT EXISTS {table} (\n    \
73         id {int_type} PRIMARY KEY CHECK (id = 1),\n    \
74         ts {int_type} NOT NULL DEFAULT 0\n)"
75    )
76}
77
78/// Reads the single register row.
79fn load_sql(table: &str) -> String {
80    format!("SELECT ts FROM {table} WHERE id = 1")
81}
82
83/// MAX-guarded upsert: a stale commit cannot lower the stored clock below a
84/// value an earlier-issued (but later-committed) write set.
85fn persist_sql(table: &str, dialect: SqlDialect) -> String {
86    let max = dialect.scalar_max();
87    let p1 = dialect.placeholder(1);
88    format!(
89        "INSERT INTO {table} (id, ts) VALUES (1, {p1}) \
90         ON CONFLICT(id) DO UPDATE SET ts = {max}(COALESCE(ts, 0), EXCLUDED.ts)"
91    )
92}