use crate::BitemporalError;
use rusqlite::{params, Connection};
use std::path::Path;
#[derive(Debug)]
pub struct SqliteDb {
conn: Connection,
}
impl SqliteDb {
pub fn open(path: impl AsRef<Path>) -> Result<Self, BitemporalError> {
let conn = Connection::open(path)
.map_err(|e| BitemporalError::DatabaseError(format!("failed to open sqlite db: {e}")))?;
let db = Self { conn };
db.migrate()?;
Ok(db)
}
pub fn open_in_memory() -> Result<Self, BitemporalError> {
let conn = Connection::open_in_memory().map_err(|e| {
BitemporalError::DatabaseError(format!("failed to open in-memory sqlite db: {e}"))
})?;
let db = Self { conn };
db.migrate()?;
Ok(db)
}
fn migrate(&self) -> Result<(), BitemporalError> {
self.conn
.execute_batch(
"CREATE TABLE IF NOT EXISTS bitemporal_records (
record_id TEXT NOT NULL,
valid_time INTEGER NOT NULL,
recorded_time INTEGER NOT NULL,
superseded_by TEXT,
value_json BLOB NOT NULL,
PRIMARY KEY (record_id, valid_time, recorded_time)
);
CREATE INDEX IF NOT EXISTS idx_bt_recorded ON bitemporal_records(recorded_time);
CREATE INDEX IF NOT EXISTS idx_bt_superseded ON bitemporal_records(superseded_by);",
)
.map_err(|e| BitemporalError::DatabaseError(format!("migration failed: {e}")))?;
Ok(())
}
pub fn insert(
&self,
record: crate::BitemporalRecord<serde_json::Value>,
) -> Result<usize, BitemporalError> {
let value_bytes = serde_json::to_vec(&record.value).map_err(|e| {
BitemporalError::SerializationError(format!("value serialization failed: {e}"))
})?;
let tx = self
.conn
.unchecked_transaction()
.map_err(|e| BitemporalError::DatabaseError(format!("failed to begin tx: {e}")))?;
let superseded_count = tx
.execute(
"UPDATE bitemporal_records
SET superseded_by = ?1
WHERE record_id = ?2
AND superseded_by IS NULL",
params![record.id, record.id],
)
.map_err(|e| {
BitemporalError::DatabaseError(format!("supersession update failed: {e}"))
})?;
tx.execute(
"INSERT INTO bitemporal_records
(record_id, valid_time, recorded_time, superseded_by, value_json)
VALUES (?1, ?2, ?3, NULL, ?4)",
params![
record.id,
record.valid_time.timestamp(),
record.recorded_time.timestamp(),
value_bytes,
],
)
.map_err(|e| BitemporalError::DatabaseError(format!("insert failed: {e}")))?;
tx.commit()
.map_err(|e| BitemporalError::DatabaseError(format!("commit failed: {e}")))?;
Ok(superseded_count)
}
pub fn snapshot_at(
&self,
recorded_time: chrono::DateTime<chrono::Utc>,
) -> Result<Vec<crate::BitemporalRecord<serde_json::Value>>, BitemporalError> {
let mut stmt = self
.conn
.prepare(
"SELECT record_id, valid_time, recorded_time, value_json
FROM bitemporal_records
WHERE recorded_time <= ?1
ORDER BY record_id, recorded_time DESC",
)
.map_err(|e| BitemporalError::DatabaseError(format!("snapshot prepare failed: {e}")))?;
let rows = stmt
.query_map(params![recorded_time.timestamp()], |row| {
let id: String = row.get(0)?;
let valid_ts: i64 = row.get(1)?;
let recorded_ts: i64 = row.get(2)?;
let value_bytes: Vec<u8> = row.get(3)?;
let value: serde_json::Value = serde_json::from_slice(&value_bytes).map_err(
|e| {
rusqlite::Error::FromSqlConversionFailure(
3,
rusqlite::types::Type::Blob,
Box::new(e),
)
},
)?;
let valid_time = chrono::DateTime::<chrono::Utc>::from_timestamp(valid_ts, 0)
.ok_or_else(|| rusqlite::Error::InvalidQuery)?;
let recorded_time =
chrono::DateTime::<chrono::Utc>::from_timestamp(recorded_ts, 0)
.ok_or_else(|| rusqlite::Error::InvalidQuery)?;
Ok(crate::BitemporalRecord {
id,
valid_time,
recorded_time,
value,
})
})
.map_err(|e| BitemporalError::DatabaseError(format!("snapshot query failed: {e}")))?;
let mut by_id: std::collections::BTreeMap<String, crate::BitemporalRecord<serde_json::Value>> =
std::collections::BTreeMap::new();
for row in rows {
let r = row.map_err(|e| BitemporalError::DatabaseError(format!("row decode: {e}")))?;
by_id
.entry(r.id.clone())
.and_modify(|existing| {
if r.recorded_time > existing.recorded_time {
*existing = r.clone();
}
})
.or_insert(r);
}
Ok(by_id.into_values().collect())
}
}