use std::path::{Path, PathBuf};
use std::sync::Mutex;
use rusqlite::{Connection, OptionalExtension, params};
use crate::cursor::CursorStore;
use crate::error::IndexerError;
/// Cursor store that keeps its data in a SQLite database.
///
/// Holds the path the database was opened from together with the open
/// connection. The connection sits behind a `Mutex`, so every use of it
/// first has to take the lock.
pub struct SqliteCursorStore {
/// Path handed to `open`; returned by `path()` and shown in `Debug` output.
path: PathBuf,
/// The open SQLite connection, guarded by a mutex.
conn: Mutex<Connection>,
}
/// Hand-written `Debug` that prints only the database path.
///
/// The connection field is left out, and the output is marked as
/// non-exhaustive to signal that a field was omitted.
impl std::fmt::Debug for SqliteCursorStore {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut repr = f.debug_struct("SqliteCursorStore");
        repr.field("path", &self.path);
        repr.finish_non_exhaustive()
    }
}
impl SqliteCursorStore {
pub fn open<P: AsRef<Path>>(path: P) -> Result<Self, IndexerError> {
let path = path.as_ref().to_path_buf();
let conn = Connection::open(&path)
.map_err(|e| IndexerError::Cursor(format!("open sqlite {}: {e}", path.display())))?;
conn.pragma_update(None, "journal_mode", "WAL")
.map_err(|e| IndexerError::Cursor(format!("pragma journal_mode: {e}")))?;
conn.pragma_update(None, "synchronous", "NORMAL")
.map_err(|e| IndexerError::Cursor(format!("pragma synchronous: {e}")))?;
conn.execute(
"CREATE TABLE IF NOT EXISTS cursors (
subscription_id TEXT PRIMARY KEY NOT NULL,
seq INTEGER NOT NULL
)",
[],
)
.map_err(|e| IndexerError::Cursor(format!("create cursors table: {e}")))?;
Ok(Self {
path,
conn: Mutex::new(conn),
})
}
#[must_use]
pub fn path(&self) -> &Path {
&self.path
}
}
impl CursorStore for SqliteCursorStore {
async fn load(&self, subscription_id: &str) -> Result<Option<u64>, IndexerError> {
let conn = self
.conn
.lock()
.map_err(|e| IndexerError::Cursor(format!("sqlite mutex poisoned: {e}")))?;
let seq: Option<i64> = conn
.query_row(
"SELECT seq FROM cursors WHERE subscription_id = ?1",
params![subscription_id],
|row| row.get(0),
)
.optional()
.map_err(|e| IndexerError::Cursor(format!("select cursor: {e}")))?;
match seq {
Some(v) if v < 0 => Err(IndexerError::Cursor(format!(
"negative cursor in sqlite for {subscription_id}: {v}"
))),
#[allow(clippy::cast_sign_loss)]
Some(v) => Ok(Some(v as u64)),
None => Ok(None),
}
}
async fn commit(&self, subscription_id: &str, seq: u64) -> Result<(), IndexerError> {
if seq > i64::MAX as u64 {
return Err(IndexerError::Cursor(format!(
"cursor seq {seq} exceeds sqlite i64 range"
)));
}
let conn = self
.conn
.lock()
.map_err(|e| IndexerError::Cursor(format!("sqlite mutex poisoned: {e}")))?;
#[allow(clippy::cast_possible_wrap)]
conn.execute(
"INSERT INTO cursors (subscription_id, seq) VALUES (?1, ?2)
ON CONFLICT(subscription_id) DO UPDATE SET seq = excluded.seq",
params![subscription_id, seq as i64],
)
.map_err(|e| IndexerError::Cursor(format!("upsert cursor: {e}")))?;
Ok(())
}
async fn list(&self) -> Result<Vec<(String, u64)>, IndexerError> {
let conn = self
.conn
.lock()
.map_err(|e| IndexerError::Cursor(format!("sqlite mutex poisoned: {e}")))?;
let mut stmt = conn
.prepare("SELECT subscription_id, seq FROM cursors ORDER BY subscription_id")
.map_err(|e| IndexerError::Cursor(format!("prepare list: {e}")))?;
let rows = stmt
.query_map([], |row| {
Ok((row.get::<_, String>(0)?, row.get::<_, i64>(1)?))
})
.map_err(|e| IndexerError::Cursor(format!("query list: {e}")))?;
let mut out = Vec::new();
for r in rows {
let (id, seq) = r.map_err(|e| IndexerError::Cursor(format!("row: {e}")))?;
if seq < 0 {
return Err(IndexerError::Cursor(format!(
"negative cursor in sqlite for {id}: {seq}"
)));
}
#[allow(clippy::cast_sign_loss)]
out.push((id, seq as u64));
}
Ok(out)
}
}