use std::marker::PhantomData;
use ubiquisync_core::{
codec::{CodecError, DecodedEntry, IndexableOp, TAG_EXPUNGED},
hlc::Timestamp,
log_entry::LogEntry,
sync::PeerCursors,
uuid::Uuid,
};
use crate::{
db::{Db, DbBatch, DbError, DbType, DbValue, ValueBinder},
util::quote_ident,
};
#[async_trait::async_trait]
pub trait LogTracker<Op>: Sized + Send + Sync {
async fn init(db: &dyn Db, prefix: &str) -> Result<Self, DbError>;
fn track_one(
&self,
peer_id: &Uuid,
entry_idx: u64,
timestamp: Timestamp,
server_user_id: Option<Uuid>,
op: &Op,
batch: &mut dyn DbBatch,
) -> Result<(), LogTrackerError>;
fn track_expunged(
&self,
peer_id: &Uuid,
entry_idx: u64,
hash: &blake3::Hash,
batch: &mut dyn DbBatch,
) -> Result<(), LogTrackerError>;
async fn all_cursors(&self, db: &dyn Db) -> Result<PeerCursors, DbError>;
}
#[async_trait::async_trait]
pub trait HistoryTracker<Op>: LogTracker<Op> {
async fn read_entries(
&self,
db: &dyn Db,
peer_id: &Uuid,
from: u64,
limit: u64,
) -> Result<Vec<(u64, DecodedEntry<Op>)>, LogTrackerError>;
}
#[derive(Debug, thiserror::Error)]
pub enum LogTrackerError {
#[error("codec error: {0}")]
Codec(#[from] CodecError),
#[error("db error: {0}")]
Db(#[from] DbError),
}
pub struct LogIndexTracker<Op> {
quoted_table_name: String,
_phantom: PhantomData<Op>,
}
#[async_trait::async_trait]
impl<Op: IndexableOp + Send + Sync> LogTracker<Op> for LogIndexTracker<Op> {
async fn init(db: &dyn Db, prefix: &str) -> Result<Self, DbError> {
let quoted_table_name = quote_ident(&format!("{prefix}__oplog"));
let dialect = db.dialect();
let int_type = DbType::Integer.sql_type(dialect);
let blob_type = DbType::Blob.sql_type(dialect);
let uuid_type = DbType::Uuid.sql_type(dialect);
let without_rowid = dialect.without_rowid();
let sql = format!(
"CREATE TABLE IF NOT EXISTS {quoted_table_name} (\
peer_id {uuid_type} NOT NULL,\
entry_idx {int_type} NOT NULL,\
server_user_id {uuid_type} NULL,\
ts {int_type} NOT NULL,\
tag {int_type} NOT NULL,\
index_key {blob_type} NULL,\
index_value {blob_type} NULL,\
PRIMARY KEY(peer_id, entry_idx))\
{without_rowid};"
);
db.exec(&sql, &[]).await?;
Ok(Self {
quoted_table_name,
_phantom: Default::default(),
})
}
fn track_one(
&self,
peer_id: &Uuid,
entry_idx: u64,
timestamp: Timestamp,
server_user_id: Option<Uuid>,
op: &Op,
batch: &mut dyn DbBatch,
) -> Result<(), LogTrackerError> {
let mut value_binder = ValueBinder::new(batch.dialect());
let peer_id_bind = value_binder.bind_next(DbValue::Uuid(*peer_id));
let entry_idx_bind = value_binder.bind_next(DbValue::from_u64(entry_idx)?);
let server_user_id_bind = if let Some(server_user_id) = server_user_id {
value_binder.bind_next(DbValue::Uuid(server_user_id))
} else {
value_binder.bind_next(DbValue::Null)
};
let ts_bind = value_binder.bind_next(DbValue::from_u64(timestamp.raw())?);
let index_entry = op.to_index_entry()?;
let tag_bind = value_binder.bind_next(DbValue::Integer(index_entry.tag as i64));
let index_key_bind = value_binder.bind_next(DbValue::Blob(index_entry.key));
let value_bind = value_binder.bind_next(DbValue::Blob(index_entry.value));
let sql = format!(
"INSERT INTO {} (\"peer_id\", \"entry_idx\", \"server_user_id\", \"ts\", \"tag\", \"index_key\", \"index_value\") \
VALUES({peer_id_bind}, {entry_idx_bind}, {server_user_id_bind}, {ts_bind}, {tag_bind}, {index_key_bind}, {value_bind})",
self.quoted_table_name
);
batch.add_statement(&sql, &value_binder.values());
Ok(())
}
async fn all_cursors(&self, db: &dyn Db) -> Result<PeerCursors, DbError> {
let sql = format!(
"SELECT \"peer_id\", MAX(\"entry_idx\") FROM {} GROUP BY \"peer_id\"",
self.quoted_table_name
);
let rows = db.query(&sql, &[]).await?;
let mut cursors = PeerCursors::new();
for row in &rows {
cursors.insert(row.get_uuid(0)?, row.get_u64(1)? + 1);
}
Ok(cursors)
}
fn track_expunged(
&self,
peer_id: &Uuid,
entry_idx: u64,
hash: &blake3::Hash,
batch: &mut dyn DbBatch,
) -> Result<(), LogTrackerError> {
let mut value_binder = ValueBinder::new(batch.dialect());
let peer_id_bind = value_binder.bind_next(DbValue::Uuid(*peer_id));
let entry_idx_bind = value_binder.bind_next(DbValue::from_u64(entry_idx)?);
let tag_bind = value_binder.bind_next(DbValue::Integer(TAG_EXPUNGED as i64));
let hash_bind = value_binder.bind_next(DbValue::Blob(hash.as_bytes().to_vec()));
let sql = format!(
"INSERT INTO {} (\"peer_id\", \"entry_idx\", \"server_user_id\", \"ts\", \"tag\", \"index_key\", \"index_value\") \
VALUES({peer_id_bind}, {entry_idx_bind}, NULL, 0, {tag_bind}, NULL, {hash_bind})",
self.quoted_table_name
);
batch.add_statement(&sql, &value_binder.values());
Ok(())
}
}
#[async_trait::async_trait]
impl<Op: IndexableOp + Send + Sync> HistoryTracker<Op> for LogIndexTracker<Op> {
async fn read_entries(
&self,
db: &dyn Db,
peer_id: &Uuid,
from: u64,
limit: u64,
) -> Result<Vec<(u64, DecodedEntry<Op>)>, LogTrackerError> {
let mut binder = ValueBinder::new(db.dialect());
let peer_bind = binder.bind_next(DbValue::Uuid(*peer_id));
let from_bind = binder.bind_next(DbValue::from_u64(from)?);
let limit_bind = binder.bind_next(DbValue::from_u64(limit)?);
let sql = format!(
"SELECT \"entry_idx\", \"server_user_id\", \"ts\", \"tag\", \"index_key\", \"index_value\" \
FROM {} WHERE \"peer_id\" = {peer_bind} AND \"entry_idx\" >= {from_bind} \
ORDER BY \"entry_idx\" ASC LIMIT {limit_bind}",
self.quoted_table_name
);
let rows = db.query(&sql, &binder.values()).await?;
let mut out = Vec::with_capacity(rows.len());
for row in &rows {
let idx = row.get_u64(0)?;
let tag = u8::try_from(row.get_i64(3)?).map_err(|_| DbError::TypeMismatch {
col: 3,
expected: "u8 tag",
})?;
let decoded = if tag == TAG_EXPUNGED {
let bytes: [u8; 32] =
row.get_blob(5)?
.try_into()
.map_err(|_| DbError::TypeMismatch {
col: 5,
expected: "32-byte hash",
})?;
DecodedEntry::Expunged(blake3::Hash::from_bytes(bytes))
} else {
let op = Op::from_index_parts(
tag,
row.get_optional_blob(4)?.unwrap_or_default(),
row.get_optional_blob(5)?.unwrap_or_default(),
)?;
DecodedEntry::LogEntry(LogEntry {
server_user_id: row.get_optional_uuid(1)?,
timestamp: Timestamp::from_raw(row.get_u64(2)?),
op,
})
};
out.push((idx, decoded));
}
Ok(out)
}
}