use ubiquisync_core::hlc::HlcStorage;
use crate::{
db::{Db, DbBatch, DbError, DbType, DbValue},
dialect::SqlDialect,
util::quote_ident,
};
pub struct SqlHlcStorage {
seed: Option<u64>,
persist_sql: String,
}
impl SqlHlcStorage {
pub async fn open(db: &dyn Db, prefix: &str) -> Result<Self, DbError> {
let table = quote_ident(&format!("{prefix}__hlc"));
db.exec(&create_sql(&table, db.dialect()), &[]).await?;
let seed = match db.query(&load_sql(&table), &[]).await?.first() {
Some(row) => Some(row.get_u64(0)?),
None => None,
};
Ok(Self {
seed,
persist_sql: persist_sql(&table, db.dialect()),
})
}
}
impl HlcStorage for SqlHlcStorage {
type Error = DbError;
type Sink = dyn DbBatch;
fn load(&self) -> Result<Option<u64>, Self::Error> {
Ok(self.seed)
}
fn save(&self, sink: &mut Self::Sink, raw: u64) -> Result<(), Self::Error> {
sink.add_statement(&self.persist_sql, &[DbValue::from_u64(raw)?]);
Ok(())
}
}
fn create_sql(table: &str, dialect: SqlDialect) -> String {
let int_type = DbType::Integer.sql_type(dialect);
format!(
"CREATE TABLE IF NOT EXISTS {table} (\n \
id {int_type} PRIMARY KEY CHECK (id = 1),\n \
ts {int_type} NOT NULL DEFAULT 0\n)"
)
}
fn load_sql(table: &str) -> String {
format!("SELECT ts FROM {table} WHERE id = 1")
}
fn persist_sql(table: &str, dialect: SqlDialect) -> String {
let max = dialect.scalar_max();
let p1 = dialect.placeholder(1);
format!(
"INSERT INTO {table} (id, ts) VALUES (1, {p1}) \
ON CONFLICT(id) DO UPDATE SET ts = {max}(COALESCE(ts, 0), EXCLUDED.ts)"
)
}