use ave_actors_store::{
Error, StoreOperation,
config::{MachineSpec, resolve_spec},
database::{Collection, DbManager, State},
};
use rusqlite::{Connection, Error as SqliteError, OpenFlags, params};
use tracing::{debug, error, info};
use std::{
collections::VecDeque,
path::PathBuf,
sync::{Arc, Mutex},
};
use std::{fs, path::Path};
/// Boxed iterator over `(String, Vec<u8>)` entries; each item is either an entry or the
/// store error that interrupted iteration.
type EntryIterator = Box<dyn Iterator<Item = Result<(String, Vec<u8>), Error>>>;
/// Number of rows requested per query by the chunked iterator (used as the SQL `LIMIT`).
const ITER_CHUNK_SIZE: usize = 1_000;
/// Manager for a single SQLite database file; creates tables and hands out
/// `SqliteCollection` handles that each own their own connections.
#[derive(Clone)]
pub struct SqliteManager {
/// Full path of the database file (`database.db` inside the directory passed to `new`).
path: Arc<PathBuf>,
/// When true connections use `PRAGMA synchronous=FULL`, otherwise `NORMAL`.
durability: bool,
/// PRAGMA values computed from the machine spec and applied to every connection opened.
tuning: SqliteTuning,
/// Shared connection used for table creation and for the checkpoint performed in `stop`.
admin_conn: Arc<Mutex<Connection>>,
}
impl SqliteManager {
fn validate_identifier(identifier: &str) -> Result<(), Error> {
let mut chars = identifier.chars();
let Some(first) = chars.next() else {
return Err(Error::CreateStore {
reason: "invalid SQLite identifier: empty".to_owned(),
});
};
let valid_start = first == '_' || first.is_ascii_alphabetic();
let valid_rest =
chars.all(|ch| ch == '_' || ch.is_ascii_alphanumeric());
if valid_start && valid_rest {
return Ok(());
}
Err(Error::CreateStore {
reason: format!(
"invalid SQLite identifier '{identifier}': allowed pattern is [A-Za-z_][A-Za-z0-9_]*"
),
})
}
fn open_managed_connection(&self) -> Result<Connection, Error> {
open_with_tuning(self.path.as_ref(), self.durability, self.tuning)
}
fn create_store_handle(
&self,
identifier: &str,
prefix: &str,
) -> Result<SqliteCollection, Error> {
let read_conn = self.open_managed_connection()?;
let write_conn = self.open_managed_connection()?;
Ok(SqliteCollection::new(
read_conn, write_conn, identifier, prefix,
))
}
pub fn new(
path: &PathBuf,
durability: bool,
spec: Option<MachineSpec>,
) -> Result<Self, Error> {
info!("Creating SQLite database manager");
if !Path::new(&path).exists() {
debug!("Path does not exist, creating it");
fs::create_dir_all(path).map_err(|e| {
error!(path = %path.display(), error = %e, "Failed to create SQLite directory");
Error::CreateStore {
reason: format!(
"fail SQLite create directory: {}",
e
),
}
})?;
}
let path = path.join("database.db");
let spec = resolve_spec(spec);
let tuning = tuning_for_ram(spec.ram_mb);
info!(
"SQLite tuning: ram_mb={}, cpu_cores={}",
spec.ram_mb, spec.cpu_cores
);
debug!("Opening SQLite connection");
let conn = open_with_tuning(&path, durability, tuning).map_err(|e| {
error!(path = %path.display(), error = %e, "Failed to open SQLite connection");
Error::CreateStore { reason: format!("fail SQLite open connection: {}", e) }
})?;
debug!("SQLite database manager created successfully");
Ok(Self {
path: Arc::new(path),
durability,
tuning,
admin_conn: Arc::new(Mutex::new(conn)),
})
}
}
impl DbManager<SqliteCollection, SqliteCollection> for SqliteManager {
fn create_state(
&self,
identifier: &str,
prefix: &str,
) -> Result<SqliteCollection, Error> {
Self::validate_identifier(identifier)?;
let stmt = format!(
"CREATE TABLE IF NOT EXISTS {} (prefix TEXT NOT NULL, value \
BLOB NOT NULL, PRIMARY KEY (prefix))",
identifier
);
{
let conn = self.admin_conn.lock().map_err(|e| {
error!(error = %e, "Failed to acquire connection lock for state creation");
Error::Store {
operation: StoreOperation::LockConnection,
reason: format!("{}", e),
}
})?;
conn.execute(stmt.as_str(), ()).map_err(|e| {
error!(table = identifier, error = %e, "Failed to create state table");
Error::CreateStore { reason: format!("fail SQLite create table: {}", e) }
})?;
}
debug!(table = identifier, prefix = prefix, "State table created");
self.create_store_handle(identifier, prefix)
}
fn create_collection(
&self,
identifier: &str,
prefix: &str,
) -> Result<SqliteCollection, Error> {
Self::validate_identifier(identifier)?;
let stmt = format!(
"CREATE TABLE IF NOT EXISTS {} (prefix TEXT NOT NULL, sn TEXT NOT NULL, value \
BLOB NOT NULL, PRIMARY KEY (prefix, sn))",
identifier
);
{
let conn = self.admin_conn.lock().map_err(|e| {
error!(error = %e, "Failed to acquire connection lock for collection creation");
Error::Store {
operation: StoreOperation::LockConnection,
reason: format!("{}", e),
}
})?;
conn.execute(stmt.as_str(), ()).map_err(|e| {
error!(table = identifier, error = %e, "Failed to create collection table");
Error::CreateStore { reason: format!("fail SQLite create table: {}", e) }
})?;
}
debug!(
table = identifier,
prefix = prefix,
"Collection table created"
);
self.create_store_handle(identifier, prefix)
}
fn stop(&mut self) -> Result<(), Error> {
debug!("Stopping SQLite manager, flushing WAL");
let conn = self.admin_conn.lock().map_err(|e| {
error!(error = %e, "Failed to acquire connection lock on stop");
Error::Store {
operation: StoreOperation::LockConnection,
reason: format!("{}", e),
}
})?;
conn.execute_batch("PRAGMA optimize; PRAGMA wal_checkpoint(TRUNCATE);")
.map_err(|e| {
error!(error = %e, "Failed to checkpoint WAL on stop");
Error::Store {
operation: StoreOperation::WalCheckpoint,
reason: format!("{}", e),
}
})?;
drop(conn);
debug!("SQLite WAL checkpoint complete");
Ok(())
}
}
/// Handle onto one table of the SQLite store, scoped to a single `prefix`.
///
/// The same type implements both the `State` and the `Collection` traits.
pub struct SqliteCollection {
/// Connection used by the read paths (`get` and iteration); shared with iterators via `Arc`.
read_conn: Arc<Mutex<Connection>>,
/// Connection used by the write paths (`put`, `del`, `purge`).
write_conn: Arc<Mutex<Connection>>,
/// Name of the backing table; interpolated directly into the SQL text of each statement.
table: String,
/// Value bound to the `prefix` column in every statement issued through this handle.
prefix: String,
}
impl SqliteCollection {
pub fn new(
read_conn: Connection,
write_conn: Connection,
table: &str,
prefix: &str,
) -> Self {
Self {
read_conn: Arc::new(Mutex::new(read_conn)),
write_conn: Arc::new(Mutex::new(write_conn)),
table: table.to_owned(),
prefix: prefix.to_owned(),
}
}
fn make_iter(&self, reverse: bool) -> EntryIterator {
Box::new(SqliteChunkedIterator::new(
self.read_conn.clone(),
self.table.clone(),
self.prefix.clone(),
reverse,
))
}
fn state_key(&self) -> String {
self.prefix.clone()
}
fn collection_key(&self, key: &str) -> String {
format!("{}.{}", self.prefix, key)
}
fn map_get_error(&self, error: SqliteError, key: String) -> Error {
match error {
SqliteError::QueryReturnedNoRows => Error::EntryNotFound { key },
other => Error::Get {
key,
reason: format!("{}", other),
},
}
}
}
/// Iterator that reads one prefix's rows in chunks of at most `ITER_CHUNK_SIZE`, resuming
/// each query after the last `sn` it has fetched.
struct SqliteChunkedIterator {
/// Shared connection the chunk queries run on.
conn: Arc<Mutex<Connection>>,
/// Table the rows are read from; interpolated into the query text.
table: String,
/// Prefix whose rows are iterated.
prefix: String,
/// When true rows are produced in descending `sn` order, otherwise ascending.
reverse: bool,
/// Rows already fetched but not yet yielded.
buffer: VecDeque<(String, Vec<u8>)>,
/// `sn` of the last row fetched so far; `None` until the first chunk has been read.
last_key: Option<String>,
/// Set when no further chunk should be requested (end of data or a failed fetch).
exhausted: bool,
}
impl SqliteChunkedIterator {
const fn new(
conn: Arc<Mutex<Connection>>,
table: String,
prefix: String,
reverse: bool,
) -> Self {
Self {
conn,
table,
prefix,
reverse,
buffer: VecDeque::new(),
last_key: None,
exhausted: false,
}
}
fn fetch_chunk(&mut self) -> Result<(), Error> {
let conn = self.conn.lock().map_err(|e| {
error!(table = %self.table, error = %e, "Failed to acquire lock for chunk fetch");
Error::Store {
operation: StoreOperation::LockConnection,
reason: format!("{}", e),
}
})?;
let order = if self.reverse { "DESC" } else { "ASC" };
let cmp = if self.reverse { "<" } else { ">" };
let rows: Vec<(String, Vec<u8>)> = match &self.last_key {
None => {
let q = format!(
"SELECT sn, value FROM {} WHERE prefix = ?1 ORDER BY sn {} LIMIT {}",
self.table, order, ITER_CHUNK_SIZE
);
conn.prepare(&q).and_then(|mut s| {
s.query_map(params![self.prefix], |r| {
Ok((r.get(0)?, r.get(1)?))
})
.and_then(|rows| rows.collect())
})
.map_err(|e| {
error!(table = %self.table, error = %e, "Failed to fetch first chunk from DB");
Error::Get {
key: self.prefix.clone(),
reason: format!("{}", e),
}
})?
}
Some(last) => {
let q = format!(
"SELECT sn, value FROM {} WHERE prefix = ?1 AND sn {} ?2 ORDER BY sn {} LIMIT {}",
self.table, cmp, order, ITER_CHUNK_SIZE
);
let last = last.clone();
conn.prepare(&q).and_then(|mut s| {
s.query_map(params![self.prefix, last], |r| {
Ok((r.get(0)?, r.get(1)?))
})
.and_then(|rows| rows.collect())
})
.map_err(|e| {
error!(table = %self.table, error = %e, "Failed to fetch next chunk from DB");
Error::Get {
key: self.prefix.clone(),
reason: format!("{}", e),
}
})?
}
};
if rows.is_empty() {
self.exhausted = true;
} else {
self.last_key = rows.last().map(|(k, _)| k.clone());
self.buffer.extend(rows);
}
Ok(())
}
}
impl Iterator for SqliteChunkedIterator {
type Item = Result<(String, Vec<u8>), Error>;
fn next(&mut self) -> Option<Self::Item> {
if self.buffer.is_empty()
&& !self.exhausted
&& let Err(error) = self.fetch_chunk()
{
self.exhausted = true;
return Some(Err(error));
}
self.buffer.pop_front().map(Ok)
}
}
// In every method below a failed `lock()` is reported as `StoreOperation::LockConnection`:
// the failure is a mutex lock error, not a failure to open a connection.
impl State for SqliteCollection {
    /// Reads the value stored for this handle's prefix.
    ///
    /// # Errors
    /// `Error::EntryNotFound` when no row exists for the prefix, `Error::Get` for any other
    /// SQLite failure, `Error::Store` with `LockConnection` when the read mutex cannot be
    /// locked.
    fn get(&self) -> Result<Vec<u8>, Error> {
        let query =
            format!("SELECT value FROM {} WHERE prefix = ?1", &self.table);
        let row: Vec<u8> = self
            .read_conn
            .lock()
            .map_err(|e| {
                error!(error = %e, "Failed to acquire connection lock for state get");
                Error::Store {
                    operation: StoreOperation::LockConnection,
                    reason: format!("{}", e),
                }
            })?
            .query_row(&query, params![self.prefix], |row| row.get(0))
            .map_err(|e| self.map_get_error(e, self.state_key()))?;
        Ok(row)
    }

    /// Inserts or replaces the value stored for this handle's prefix.
    ///
    /// # Errors
    /// `Error::Store` with `LockConnection` when the write mutex cannot be locked, or with
    /// `Insert` when the statement fails.
    fn put(&mut self, data: &[u8]) -> Result<(), Error> {
        let stmt = format!(
            "INSERT OR REPLACE INTO {} (prefix, value) VALUES (?1, ?2)",
            &self.table
        );
        self.write_conn
            .lock()
            .map_err(|e| {
                error!(error = %e, "Failed to acquire connection lock for state put");
                Error::Store {
                    operation: StoreOperation::LockConnection,
                    reason: format!("{}", e),
                }
            })?
            .execute(&stmt, params![self.prefix, data])
            .map_err(|e| {
                error!(table = %self.table, error = %e, "Failed to put state");
                Error::Store {
                    operation: StoreOperation::Insert,
                    reason: format!("{}", e),
                }
            })?;
        Ok(())
    }

    /// Deletes the row for this handle's prefix.
    ///
    /// # Errors
    /// `Error::EntryNotFound` when no row was deleted, `Error::Store` with `LockConnection`
    /// when the write mutex cannot be locked, or with `Delete` when the statement fails.
    fn del(&mut self) -> Result<(), Error> {
        let stmt = format!("DELETE FROM {} WHERE prefix = ?1", &self.table);
        let affected_rows = self
            .write_conn
            .lock()
            .map_err(|e| {
                error!(error = %e, "Failed to acquire connection lock for state delete");
                Error::Store {
                    operation: StoreOperation::LockConnection,
                    reason: format!("{}", e),
                }
            })?
            .execute(&stmt, params![self.prefix])
            .map_err(|e| {
                error!(table = %self.table, error = %e, "Failed to delete state");
                Error::Store {
                    operation: StoreOperation::Delete,
                    reason: format!("{}", e),
                }
            })?;
        if affected_rows == 0 {
            return Err(Error::EntryNotFound {
                key: self.state_key(),
            });
        }
        Ok(())
    }

    /// Deletes the row for this handle's prefix; unlike `del`, deleting nothing is not an
    /// error.
    ///
    /// # Errors
    /// `Error::Store` with `LockConnection` when the write mutex cannot be locked, or with
    /// `Purge` when the statement fails.
    fn purge(&mut self) -> Result<(), Error> {
        let stmt = format!("DELETE FROM {} WHERE prefix = ?1", &self.table);
        self.write_conn
            .lock()
            .map_err(|e| {
                error!(error = %e, "Failed to acquire connection lock for state purge");
                Error::Store {
                    operation: StoreOperation::LockConnection,
                    reason: format!("{}", e),
                }
            })?
            .execute(&stmt, params![self.prefix])
            .map_err(|e| {
                error!(table = %self.table, error = %e, "Failed to purge state");
                Error::Store {
                    operation: StoreOperation::Purge,
                    reason: format!("{}", e),
                }
            })?;
        debug!(table = %self.table, "State purged");
        Ok(())
    }

    /// Name of the backing table.
    fn name(&self) -> &str {
        self.table.as_str()
    }
}
impl Collection for SqliteCollection {
fn get(&self, key: &str) -> Result<Vec<u8>, Error> {
let query = format!(
"SELECT value FROM {} WHERE prefix = ?1 AND sn = ?2",
&self.table
);
let collection_key = self.collection_key(key);
let row: Vec<u8> = self.read_conn.lock().map_err(|e| {
error!(error = %e, "Failed to acquire connection lock for collection get");
Error::Store {
operation: StoreOperation::OpenConnection,
reason: format!("{}", e),
}
})?.query_row(&query, params![self.prefix, key], |row| row.get(0))
.map_err(|e| self.map_get_error(e, collection_key))?;
Ok(row)
}
fn put(&mut self, key: &str, data: &[u8]) -> Result<(), Error> {
let stmt = format!(
"INSERT OR REPLACE INTO {} (prefix, sn, value) VALUES (?1, ?2, ?3)",
&self.table
);
self.write_conn.lock().map_err(|e| {
error!(error = %e, "Failed to acquire connection lock for collection put");
Error::Store {
operation: StoreOperation::OpenConnection,
reason: format!("{}", e),
}
})?.execute(&stmt, params![self.prefix, key, data])
.map_err(|e| {
error!(table = %self.table, key = key, error = %e, "Failed to put collection entry");
Error::Store {
operation: StoreOperation::Insert,
reason: format!("{}", e),
}
})?;
Ok(())
}
fn del(&mut self, key: &str) -> Result<(), Error> {
let stmt = format!(
"DELETE FROM {} WHERE prefix = ?1 AND sn = ?2",
&self.table
);
let affected_rows = self.write_conn.lock().map_err(|e| {
error!(error = %e, "Failed to acquire connection lock for collection delete");
Error::Store {
operation: StoreOperation::OpenConnection,
reason: format!("{}", e),
}
})?.execute(&stmt, params![self.prefix, key])
.map_err(|e| {
error!(table = %self.table, key = key, error = %e, "Failed to delete collection entry");
Error::Store {
operation: StoreOperation::Delete,
reason: format!("{}", e),
}
})?;
if affected_rows == 0 {
return Err(Error::EntryNotFound {
key: self.collection_key(key),
});
}
Ok(())
}
fn purge(&mut self) -> Result<(), Error> {
let stmt = format!("DELETE FROM {} WHERE prefix = ?1", &self.table);
self.write_conn.lock().map_err(|e| {
error!(error = %e, "Failed to acquire connection lock for collection purge");
Error::Store {
operation: StoreOperation::OpenConnection,
reason: format!("{}", e),
}
})?.execute(&stmt, params![self.prefix])
.map_err(|e| {
error!(table = %self.table, error = %e, "Failed to purge collection");
Error::Store {
operation: StoreOperation::Purge,
reason: format!("{}", e),
}
})?;
debug!(table = %self.table, "Collection purged");
Ok(())
}
fn last(&self) -> Result<Option<(String, Vec<u8>)>, Error> {
let mut iter = self.iter(true)?;
iter.next().transpose()
}
fn iter<'a>(
&'a self,
reverse: bool,
) -> Result<
Box<dyn Iterator<Item = Result<(String, Vec<u8>), Error>> + 'a>,
Error,
> {
Ok(self.make_iter(reverse))
}
fn name(&self) -> &str {
self.table.as_str()
}
}
fn open_with_tuning<P: AsRef<Path>>(
path: P,
durability: bool,
tuning: SqliteTuning,
) -> Result<Connection, Error> {
let path = path.as_ref();
debug!(path = %path.display(), "Opening SQLite database");
let flags =
OpenFlags::SQLITE_OPEN_READ_WRITE | OpenFlags::SQLITE_OPEN_CREATE;
let conn = Connection::open_with_flags(path, flags).map_err(|e| {
error!(path = %path.display(), error = %e, "Failed to open SQLite database");
Error::Store {
operation: StoreOperation::OpenConnection,
reason: format!("{}", e),
}
})?;
let sync_mode = if durability { "FULL" } else { "NORMAL" };
conn.execute_batch(
format!(
"
PRAGMA journal_mode=WAL;
PRAGMA busy_timeout=5000;
PRAGMA synchronous={};
PRAGMA wal_autocheckpoint={}; -- pages
PRAGMA journal_size_limit={}; -- bytes
PRAGMA temp_store=MEMORY;
PRAGMA cache_size={}; -- negative = KB
PRAGMA mmap_size={}; -- bytes
PRAGMA optimize=0x10002; -- analyze + run on open (cheap)
",
sync_mode,
tuning.wal_autocheckpoint_pages,
tuning.journal_size_limit_bytes,
tuning.cache_size_kb,
tuning.mmap_size_bytes,
)
.as_str(),
)
.map_err(|e| {
error!(error = %e, "Failed to execute SQLite PRAGMA statements");
Error::Store {
operation: StoreOperation::ExecuteBatch,
reason: format!("{}", e),
}
})?;
debug!("SQLite database opened and configured successfully");
Ok(conn)
}
/// Derives SQLite PRAGMA values from the machine's RAM size in megabytes.
///
/// * page cache: 2% of RAM, clamped to 8..=1024 MB (stored as negative KB, the form
///   `PRAGMA cache_size` interprets as a size rather than a page count);
/// * mmap: half of the cache size, at most 128 MB, in bytes;
/// * WAL auto-checkpoint: 128 pages per cache MB, clamped to 1_000..=8_000 pages;
/// * journal size limit: three times the checkpoint threshold at 4096 bytes per page,
///   clamped to 32..=256 MiB.
fn tuning_for_ram(ram_mb: u64) -> SqliteTuning {
    // 2% of RAM. Written as `/ 50`, which equals `* 2 / 100` in integer arithmetic but
    // cannot overflow for very large `ram_mb`.
    let cache_mb = (ram_mb / 50).clamp(8, 1024);
    // Lossless: the value was just clamped to 8..=1024.
    let cache_mb = cache_mb as i64;

    let cache_size_kb = -(cache_mb * 1024);
    let mmap_size_bytes = (cache_mb / 2).min(128) * 1024 * 1024;
    let wal_autocheckpoint_pages = (cache_mb * 128).clamp(1_000, 8_000);
    let journal_size_limit_bytes = (wal_autocheckpoint_pages * 4096 * 3)
        .clamp(32 * 1024 * 1024, 256 * 1024 * 1024);

    SqliteTuning {
        wal_autocheckpoint_pages,
        journal_size_limit_bytes,
        cache_size_kb,
        mmap_size_bytes,
    }
}

/// PRAGMA values applied to every connection when it is opened.
#[derive(Clone, Copy)]
struct SqliteTuning {
    /// Value for `PRAGMA wal_autocheckpoint`, in pages.
    wal_autocheckpoint_pages: i64,
    /// Value for `PRAGMA journal_size_limit`, in bytes.
    journal_size_limit_bytes: i64,
    /// Value for `PRAGMA cache_size`; negative, i.e. expressed in KB.
    cache_size_kb: i64,
    /// Value for `PRAGMA mmap_size`, in bytes.
    mmap_size_bytes: i64,
}
#[cfg(test)]
mod tests {
    use super::*;
    use ave_actors_store::{
        database::{Collection, DbManager},
        test_store_trait,
    };

    /// Returns the path of a temporary directory, creating the directory when it is not
    /// present on disk.
    pub fn create_temp_dir() -> String {
        let location = temp_dir();
        match fs::metadata(&location) {
            Ok(_) => {}
            Err(_) => fs::create_dir_all(&location).unwrap(),
        }
        location
    }

    /// Asks `tempfile` for a fresh directory and returns its path as a `String`.
    ///
    /// NOTE(review): the `TempDir` guard goes out of scope when this function returns,
    /// which presumably removes the directory again — that would explain why
    /// `create_temp_dir` re-creates it. Confirm against the `tempfile` documentation.
    fn temp_dir() -> String {
        let guard =
            tempfile::tempdir().expect("Can not create temporal directory.");
        let text = guard.path().to_str().unwrap().to_owned();
        text
    }

    /// Test-only default: a non-durable manager rooted in a fresh temporary directory.
    impl Default for SqliteManager {
        fn default() -> Self {
            let root = PathBuf::from(create_temp_dir());
            SqliteManager::new(&root, false, None)
                .expect("Cannot create the database")
        }
    }

    test_store_trait! {
        unit_test_sqlite_manager:SqliteManager:SqliteCollection
    }
}