use azoth_core::{
error::{AzothError, Result},
traits::ProjectionStore,
types::EventId,
ProjectionConfig,
};
use parking_lot::Mutex;
use rusqlite::{Connection, OpenFlags};
use std::path::Path;
use std::sync::Arc;
use crate::read_pool::SqliteReadPool;
use crate::schema;
use crate::txn::SimpleProjectionTxn;
pub struct SqliteProjectionStore {
write_conn: Arc<Mutex<Connection>>,
read_conn: Arc<Mutex<Connection>>,
config: ProjectionConfig,
read_pool: Option<Arc<SqliteReadPool>>,
}
impl SqliteProjectionStore {
pub fn conn(&self) -> &Arc<Mutex<Connection>> {
&self.write_conn
}
pub fn write_conn(&self) -> &Arc<Mutex<Connection>> {
&self.write_conn
}
fn open_read_connection(path: &Path, cfg: &ProjectionConfig) -> Result<Connection> {
let conn = Connection::open_with_flags(
path,
OpenFlags::SQLITE_OPEN_READ_ONLY | OpenFlags::SQLITE_OPEN_NO_MUTEX,
)
.map_err(|e| AzothError::Projection(e.to_string()))?;
conn.pragma_update(None, "cache_size", cfg.cache_size)
.map_err(|e| AzothError::Config(e.to_string()))?;
Ok(conn)
}
fn init_schema(conn: &Connection) -> Result<()> {
conn.execute(
"CREATE TABLE IF NOT EXISTS projection_meta (
id INTEGER PRIMARY KEY CHECK (id = 0),
last_applied_event_id INTEGER NOT NULL DEFAULT -1,
schema_version INTEGER NOT NULL,
updated_at TEXT NOT NULL DEFAULT (datetime('now'))
)",
[],
)
.map_err(|e| AzothError::Projection(e.to_string()))?;
conn.execute(
"INSERT OR IGNORE INTO projection_meta (id, last_applied_event_id, schema_version)
VALUES (0, -1, 0)",
[],
)
.map_err(|e| AzothError::Projection(e.to_string()))?;
Ok(())
}
fn configure_connection(conn: &Connection, cfg: &ProjectionConfig) -> Result<()> {
if cfg.wal_mode {
conn.pragma_update(None, "journal_mode", "WAL")
.map_err(|e| AzothError::Config(e.to_string()))?;
}
let sync_mode = match cfg.synchronous {
azoth_core::config::SynchronousMode::Full => "FULL",
azoth_core::config::SynchronousMode::Normal => "NORMAL",
azoth_core::config::SynchronousMode::Off => "OFF",
};
conn.pragma_update(None, "synchronous", sync_mode)
.map_err(|e| AzothError::Config(e.to_string()))?;
conn.pragma_update(None, "foreign_keys", "ON")
.map_err(|e| AzothError::Config(e.to_string()))?;
conn.pragma_update(None, "cache_size", cfg.cache_size)
.map_err(|e| AzothError::Config(e.to_string()))?;
Ok(())
}
pub async fn query_async<F, R>(&self, f: F) -> Result<R>
where
F: FnOnce(&Connection) -> Result<R> + Send + 'static,
R: Send + 'static,
{
if let Some(pool) = &self.read_pool {
let pool = Arc::clone(pool);
return tokio::task::spawn_blocking(move || {
let conn = pool.acquire_blocking()?;
f(conn.connection())
})
.await
.map_err(|e| AzothError::Projection(format!("Query task failed: {}", e)))?;
}
let conn = self.read_conn.clone();
tokio::task::spawn_blocking(move || {
let conn_guard = conn.lock();
f(&conn_guard)
})
.await
.map_err(|e| AzothError::Projection(format!("Query task failed: {}", e)))?
}
pub fn query<F, R>(&self, f: F) -> Result<R>
where
F: FnOnce(&Connection) -> Result<R>,
{
if let Some(pool) = &self.read_pool {
let conn = pool.acquire_blocking()?;
return f(conn.connection());
}
let conn_guard = self.read_conn.lock();
f(&conn_guard)
}
pub async fn execute_async<F>(&self, f: F) -> Result<()>
where
F: FnOnce(&Connection) -> Result<()> + Send + 'static,
{
let conn = self.write_conn.clone();
tokio::task::spawn_blocking(move || {
let conn_guard = conn.lock();
f(&conn_guard)
})
.await
.map_err(|e| AzothError::Projection(format!("Execute task failed: {}", e)))?
}
pub fn execute<F>(&self, f: F) -> Result<()>
where
F: FnOnce(&Connection) -> Result<()>,
{
let conn_guard = self.write_conn.lock();
f(&conn_guard)
}
pub fn transaction<F>(&self, f: F) -> Result<()>
where
F: FnOnce(&rusqlite::Transaction) -> Result<()>,
{
let mut conn_guard = self.write_conn.lock();
let tx = conn_guard
.transaction()
.map_err(|e| AzothError::Projection(e.to_string()))?;
f(&tx)?;
tx.commit()
.map_err(|e| AzothError::Projection(e.to_string()))?;
Ok(())
}
pub async fn transaction_async<F>(&self, f: F) -> Result<()>
where
F: FnOnce(&rusqlite::Transaction) -> Result<()> + Send + 'static,
{
let conn = self.write_conn.clone();
tokio::task::spawn_blocking(move || {
let mut conn_guard = conn.lock();
let tx = conn_guard
.transaction()
.map_err(|e| AzothError::Projection(e.to_string()))?;
f(&tx)?;
tx.commit()
.map_err(|e| AzothError::Projection(e.to_string()))?;
Ok(())
})
.await
.map_err(|e| AzothError::Projection(format!("Transaction task failed: {}", e)))?
}
pub fn read_pool(&self) -> Option<&Arc<SqliteReadPool>> {
self.read_pool.as_ref()
}
pub fn has_read_pool(&self) -> bool {
self.read_pool.is_some()
}
pub fn db_path(&self) -> &Path {
&self.config.path
}
}
impl ProjectionStore for SqliteProjectionStore {
type Txn<'a> = SimpleProjectionTxn<'a>;
fn open(cfg: ProjectionConfig) -> Result<Self> {
if let Some(parent) = cfg.path.parent() {
std::fs::create_dir_all(parent)?;
}
let write_conn = Connection::open_with_flags(
&cfg.path,
OpenFlags::SQLITE_OPEN_READ_WRITE | OpenFlags::SQLITE_OPEN_CREATE,
)
.map_err(|e| AzothError::Projection(e.to_string()))?;
Self::configure_connection(&write_conn, &cfg)?;
Self::init_schema(&write_conn)?;
let read_conn = Self::open_read_connection(&cfg.path, &cfg)?;
let read_pool = if cfg.read_pool.enabled {
Some(Arc::new(SqliteReadPool::new(
&cfg.path,
cfg.read_pool.clone(),
)?))
} else {
None
};
Ok(Self {
write_conn: Arc::new(Mutex::new(write_conn)),
read_conn: Arc::new(Mutex::new(read_conn)),
config: cfg,
read_pool,
})
}
fn close(&self) -> Result<()> {
Ok(())
}
fn begin_txn(&self) -> Result<Self::Txn<'_>> {
let guard = self.write_conn.lock();
SimpleProjectionTxn::new(guard)
}
fn get_cursor(&self) -> Result<EventId> {
let conn = self.read_conn.lock();
let cursor: i64 = conn
.query_row(
"SELECT last_applied_event_id FROM projection_meta WHERE id = 0",
[],
|row| row.get(0),
)
.map_err(|e| AzothError::Projection(e.to_string()))?;
Ok(cursor as EventId)
}
fn migrate(&self, target_version: u32) -> Result<()> {
let conn = self.write_conn.lock();
schema::migrate(&conn, target_version)
}
fn backup_to(&self, path: &Path) -> Result<()> {
{
let conn = self.write_conn.lock();
let mut stmt = conn
.prepare("PRAGMA wal_checkpoint(RESTART)")
.map_err(|e| AzothError::Projection(e.to_string()))?;
let mut rows = stmt
.query([])
.map_err(|e| AzothError::Projection(e.to_string()))?;
while let Ok(Some(_)) = rows.next() {}
}
let src_path = &self.config.path;
std::fs::copy(src_path, path)?;
Ok(())
}
fn restore_from(path: &Path, cfg: ProjectionConfig) -> Result<Self> {
std::fs::copy(path, &cfg.path)?;
Self::open(cfg)
}
fn schema_version(&self) -> Result<u32> {
let conn = self.read_conn.lock();
let version: i64 = conn
.query_row(
"SELECT schema_version FROM projection_meta WHERE id = 0",
[],
|row| row.get(0),
)
.map_err(|e| AzothError::Projection(e.to_string()))?;
Ok(version as u32)
}
}