use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Instant;
use turso_core::{Connection, Database, IO, MemoryIO, OpenFlags, StepResult, Value};
use super::vfs_io::BashkitVfsIO;
/// Cut-off point for a running statement.
///
/// The value is `Copy`, so it can be passed by value to each statement
/// that has to honour the same limit.
#[derive(Debug, Clone, Copy)]
pub(super) struct Deadline {
/// Instant at which the deadline is reached; `None` means there is no
/// limit and the deadline never expires.
pub deadline: Option<Instant>,
}
impl Deadline {
    /// Builds a deadline that is reached `max_duration` from now.
    ///
    /// A zero `max_duration` means "no limit": the resulting deadline never
    /// expires. A duration so large that the target instant cannot be
    /// represented is treated the same way instead of panicking.
    pub(super) fn new(max_duration: std::time::Duration) -> Self {
        let deadline = if max_duration.is_zero() {
            None
        } else {
            // `Instant + Duration` panics on overflow (e.g. `Duration::MAX`);
            // `checked_add` yields `None`, which already means "never expires".
            Instant::now().checked_add(max_duration)
        };
        Self { deadline }
    }

    /// Returns `true` once the current time has reached the deadline.
    ///
    /// Always `false` when no deadline is set.
    pub(super) fn expired(&self) -> bool {
        self.deadline.is_some_and(|limit| Instant::now() >= limit)
    }
}
/// Result alias used by the engine: failures are carried as plain,
/// human-readable `String` messages.
pub(super) type EngineResult<T> = std::result::Result<T, String>;
/// Returns a fresh `:memory:bashkit-<n>` path on every call.
///
/// The numeric suffix comes from a process-wide counter that starts at zero
/// and is bumped atomically, so two calls never return the same path within
/// one process.
fn unique_memory_path() -> String {
    static NEXT_SUFFIX: AtomicU64 = AtomicU64::new(0);

    let suffix = NEXT_SUFFIX.fetch_add(1, Ordering::Relaxed);
    let mut path = String::from(":memory:bashkit-");
    path.push_str(&suffix.to_string());
    path
}
/// I/O implementation an engine runs on.
///
/// The concrete handle is kept (rather than only an `Arc<dyn IO>`) so the
/// engine can reach backend-specific operations.
pub(super) enum Backend {
/// Database backed by turso's `MemoryIO`.
Memory(Arc<MemoryIO>),
/// Database backed by the project's `BashkitVfsIO`.
Vfs(Arc<BashkitVfsIO>),
}
/// Everything collected from running a single statement.
#[derive(Debug, Default)]
pub(super) struct StatementOutcome {
/// Column names reported by the prepared statement, in column-index order.
pub columns: Vec<String>,
/// One entry per returned row; each entry holds that row's values in
/// column-index order.
pub rows: Vec<Vec<Value>>,
/// Value of `Connection::changes()` read after the statement finished
/// (presumably the affected-row count — confirm against turso_core).
pub changes: i64,
}
/// A turso database plus one connection to it, bound to a specific I/O
/// backend.
pub(super) struct SqliteEngine {
/// I/O backend the database was opened on.
backend: Backend,
/// Database handle. Not read after construction in this file; presumably
/// held so the database outlives the connection — TODO confirm.
_db: Arc<Database>,
/// Connection used for every statement and for checkpointing.
conn: Arc<Connection>,
/// Path of the database file inside the `MemoryIO`. Only set by
/// `open_memory`; `snapshot_bytes` returns `None` without it.
memory_path: Option<String>,
}
impl SqliteEngine {
pub(super) fn open_memory(initial_bytes: Option<&[u8]>) -> EngineResult<Self> {
let io: Arc<MemoryIO> = Arc::new(MemoryIO::new());
let path = unique_memory_path();
if let Some(bytes) = initial_bytes
&& !bytes.is_empty()
{
seed_memory_io(&io, &path, bytes).map_err(turso_msg)?;
}
let io_dyn: Arc<dyn IO> = io.clone();
let db = Database::open_file(io_dyn, &path).map_err(turso_msg)?;
let conn = db.connect().map_err(turso_msg)?;
Ok(Self {
backend: Backend::Memory(io),
_db: db,
conn,
memory_path: Some(path),
})
}
pub(super) fn open_pure_memory() -> EngineResult<Self> {
let io: Arc<MemoryIO> = Arc::new(MemoryIO::new());
let io_dyn: Arc<dyn IO> = io.clone();
let db = Database::open_file(io_dyn, ":memory:").map_err(turso_msg)?;
let conn = db.connect().map_err(turso_msg)?;
Ok(Self {
backend: Backend::Memory(io),
_db: db,
conn,
memory_path: None,
})
}
pub(super) fn open_vfs(io: Arc<BashkitVfsIO>, path_in_io: &str) -> EngineResult<Self> {
let io_dyn: Arc<dyn IO> = io.clone();
let db = Database::open_file(io_dyn, path_in_io).map_err(turso_msg)?;
let conn = db.connect().map_err(turso_msg)?;
Ok(Self {
backend: Backend::Vfs(io),
_db: db,
conn,
memory_path: None,
})
}
pub(super) fn execute(&self, sql: &str, deadline: Deadline) -> EngineResult<StatementOutcome> {
let mut stmt = self.conn.prepare(sql).map_err(turso_msg)?;
let mut outcome = StatementOutcome::default();
for idx in 0..stmt.num_columns() {
outcome.columns.push(stmt.get_column_name(idx).to_string());
}
loop {
if deadline.expired() {
stmt.interrupt();
return Err("query timed out".to_string());
}
match stmt.step().map_err(turso_msg)? {
StepResult::Row => {
if let Some(row) = stmt.row() {
let values: Vec<Value> = (0..stmt.num_columns())
.map(|idx| row.get_value(idx).clone())
.collect();
outcome.rows.push(values);
}
}
StepResult::Done => break,
StepResult::IO => {
self.io_step()?;
}
StepResult::Busy | StepResult::Interrupt => {
return Err("query was interrupted or database is busy".to_string());
}
}
}
outcome.changes = self.conn.changes();
Ok(outcome)
}
fn io_step(&self) -> EngineResult<()> {
match &self.backend {
Backend::Memory(io) => io.step().map_err(turso_msg),
Backend::Vfs(io) => io.step().map_err(turso_msg),
}
}
pub(super) fn snapshot_bytes(&self) -> Option<Vec<u8>> {
let Backend::Memory(io) = &self.backend else {
return None;
};
let path = self.memory_path.as_deref()?;
let _ = self.conn.checkpoint(turso_core::CheckpointMode::Truncate {
upper_bound_inclusive: None,
});
let file = io.open_file(path, OpenFlags::None, false).ok()?;
let size = file.size().ok()? as usize;
if size == 0 {
return Some(Vec::new());
}
Some(read_all(&file, size))
}
pub(super) async fn flush_dirty(&self) -> EngineResult<usize> {
match &self.backend {
Backend::Memory(_) => Ok(0),
Backend::Vfs(io) => io.flush_dirty().await,
}
}
pub(super) fn close(&self) {
let _ = self.conn.close();
}
}
/// Closes the connection when the engine goes out of scope.
impl Drop for SqliteEngine {
    fn drop(&mut self) {
        // Same path as an explicit `close()` call.
        Self::close(self);
    }
}
/// Reads the first `size` bytes of `file`, in chunks of at most 4096 bytes.
///
/// The outcome of each `pread` is discarded, so a chunk whose read fails is
/// returned as zeros. The chunk buffer is inspected right after `pread`
/// returns — this assumes the backend completes reads synchronously
/// (NOTE(review): confirm for every `File` implementation passed here).
fn read_all(file: &Arc<dyn turso_core::File>, size: usize) -> Vec<u8> {
    use turso_core::{Buffer, Completion};

    const CHUNK_SIZE: usize = 4096;

    let mut out: Vec<u8> = Vec::with_capacity(size);
    for offset in (0..size).step_by(CHUNK_SIZE) {
        let len = CHUNK_SIZE.min(size - offset);
        let chunk = Arc::new(Buffer::new(vec![0u8; len]));
        let completion = Completion::new_read(chunk.clone(), |_res| None);
        let _ = file.pread(offset as u64, completion);
        out.extend_from_slice(&chunk.as_slice()[..len]);
    }
    out
}
/// Creates the file at `path` inside `io` and writes `bytes` at offset 0.
///
/// With empty `bytes` the file is still created but nothing is written.
/// The write's completion handle is not inspected afterwards — this assumes
/// `MemoryIO` finishes the write before `pwrite` returns (TODO confirm).
///
/// # Errors
/// Propagates the turso error from opening the file or issuing the write.
fn seed_memory_io(
    io: &Arc<MemoryIO>,
    path: &str,
    bytes: &[u8],
) -> std::result::Result<(), turso_core::LimboError> {
    use turso_core::{Buffer, Completion, OpenFlags};

    let file = io.open_file(path, OpenFlags::Create, false)?;
    if !bytes.is_empty() {
        let payload = Arc::new(Buffer::new(bytes.to_vec()));
        let on_done = Completion::new_write(|_| {});
        let _pending = file.pwrite(0, payload, on_done)?;
    }
    Ok(())
}
/// Converts a turso error into the plain message string used by
/// `EngineResult`, via the error's `Display` output.
fn turso_msg(e: turso_core::LimboError) -> String {
    ToString::to_string(&e)
}