use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use tokio::runtime::Handle;
use turso_core::{
Buffer, Completion, File, IO, OpenFlags,
io::FileSyncType,
io::clock::{Clock, DefaultClock, MonotonicInstant, WallClockInstant},
};
use crate::fs::FileSystem;
use super::engine::EngineResult;
/// In-memory image of one VFS-backed file.
///
/// All reads/writes operate on `bytes`; the content is only persisted to the
/// backing `FileSystem` when `BashkitVfsIO::flush_dirty` runs.
pub(super) struct VfsFile {
    // Destination path used when the buffer is flushed to the filesystem.
    path: PathBuf,
    // Complete file contents; a Mutex because `File` methods take `&self`.
    bytes: Mutex<Vec<u8>>,
    // Set by `pwrite`/`truncate`, cleared after a successful flush.
    dirty: AtomicBool,
    // Hard cap on the in-memory size; writes/truncates past it are rejected.
    max_file_bytes: usize,
}
impl VfsFile {
    /// Wrap an already-loaded byte image of `path`, starting clean (not
    /// dirty) under the given size cap.
    fn new(path: PathBuf, bytes: Vec<u8>, max_file_bytes: usize) -> Self {
        let bytes = Mutex::new(bytes);
        let dirty = AtomicBool::new(false);
        Self {
            path,
            bytes,
            dirty,
            max_file_bytes,
        }
    }

    /// Error returned whenever an operation would grow the file past
    /// `max_file_bytes`.
    fn cap_error(&self) -> turso_core::LimboError {
        let msg = format!(
            "sqlite: VFS file exceeds {} bytes cap",
            self.max_file_bytes
        );
        turso_core::LimboError::InternalError(msg)
    }
}
/// Lock the byte buffer, recovering the data even if a previous holder
/// panicked: poisoning is deliberately ignored because the Vec stays
/// structurally valid regardless of where the panic happened.
fn lock_bytes(m: &Mutex<Vec<u8>>) -> std::sync::MutexGuard<'_, Vec<u8>> {
    match m.lock() {
        Ok(guard) => guard,
        Err(poisoned) => poisoned.into_inner(),
    }
}
impl File for VfsFile {
    /// Locking is a no-op: each `VfsFile` is a private in-memory buffer,
    /// so there is no cross-process contention to arbitrate.
    fn lock_file(&self, _exclusive: bool) -> turso_core::Result<()> {
        Ok(())
    }

    fn unlock_file(&self) -> turso_core::Result<()> {
        Ok(())
    }

    /// Read into the completion's buffer starting at `pos`.
    ///
    /// A read past EOF completes with 0; a short read zero-fills the tail
    /// of the destination buffer and reports only the bytes copied.
    fn pread(&self, pos: u64, c: Completion) -> turso_core::Result<Completion> {
        let r = c.as_read();
        let read_buf = r.buf();
        let read_len = read_buf.len();
        if read_len == 0 {
            c.complete(0);
            return Ok(c);
        }
        // A position that does not fit in usize is necessarily past EOF
        // (the buffer can hold at most usize::MAX bytes). Previously this
        // was a lossy `as` cast that could truncate on 32-bit targets.
        let Ok(pos_usize) = usize::try_from(pos) else {
            c.complete(0);
            return Ok(c);
        };
        let buf = lock_bytes(&self.bytes);
        if pos_usize >= buf.len() {
            drop(buf);
            c.complete(0);
            return Ok(c);
        }
        let take = read_len.min(buf.len() - pos_usize);
        read_buf.as_mut_slice()[..take].copy_from_slice(&buf[pos_usize..pos_usize + take]);
        drop(buf);
        // Zero the tail so a short read never exposes stale buffer contents.
        for byte in &mut read_buf.as_mut_slice()[take..] {
            *byte = 0;
        }
        // Complete outside the lock: the completion callback may re-enter
        // this file, and running it under the mutex could deadlock.
        c.complete(take as i32);
        Ok(c)
    }

    /// Write `buffer` at `pos`, extending the file with zeros if the write
    /// starts past EOF. Rejects any write that would exceed the size cap.
    fn pwrite(
        &self,
        pos: u64,
        buffer: Arc<Buffer>,
        c: Completion,
    ) -> turso_core::Result<Completion> {
        let pos_usize = usize::try_from(pos).map_err(|_| self.cap_error())?;
        let needed = pos_usize
            .checked_add(buffer.len())
            .ok_or_else(|| self.cap_error())?;
        if needed > self.max_file_bytes {
            return Err(self.cap_error());
        }
        {
            let mut buf = lock_bytes(&self.bytes);
            if needed > buf.len() {
                // Writing past EOF implicitly extends the file with zeros.
                buf.resize(needed, 0);
            }
            if !buffer.is_empty() {
                buf[pos_usize..pos_usize + buffer.len()].copy_from_slice(buffer.as_slice());
            }
        }
        self.dirty.store(true, Ordering::Release);
        // Complete outside the lock (see pread) to avoid re-entrancy deadlock.
        c.complete(buffer.len() as i32);
        Ok(c)
    }

    /// Sync is a no-op for the in-memory buffer; durability is provided
    /// separately by `BashkitVfsIO::flush_dirty`.
    fn sync(&self, c: Completion, _sync_type: FileSyncType) -> turso_core::Result<Completion> {
        c.complete(0);
        Ok(c)
    }

    fn size(&self) -> turso_core::Result<u64> {
        Ok(lock_bytes(&self.bytes).len() as u64)
    }

    /// Resize the file to `len`, zero-filling when growing. Rejects sizes
    /// past the cap.
    fn truncate(&self, len: u64, c: Completion) -> turso_core::Result<Completion> {
        let len_usize = usize::try_from(len).map_err(|_| self.cap_error())?;
        if len_usize > self.max_file_bytes {
            return Err(self.cap_error());
        }
        {
            let mut buf = lock_bytes(&self.bytes);
            buf.resize(len_usize, 0);
        }
        self.dirty.store(true, Ordering::Release);
        // Complete outside the lock (see pread) to avoid re-entrancy deadlock.
        c.complete(0);
        Ok(c)
    }
}
/// `turso_core::IO` implementation that keeps database files in memory and
/// persists them to a `FileSystem` on explicit `flush_dirty` calls.
pub(super) struct BashkitVfsIO {
    // Backing store used to load files on open and persist them on flush.
    fs: Arc<dyn FileSystem>,
    // Cache of files opened through this IO, keyed by the exact path string
    // passed to `open_file`. Reopening a path yields the same `VfsFile`.
    open_files: Mutex<HashMap<String, Arc<VfsFile>>>,
    // Tokio handle used to run the async `FileSystem` calls from the
    // synchronous `IO` trait methods (via `run_async`).
    handle: Handle,
    // Per-file size cap propagated to every `VfsFile` this IO creates.
    max_file_bytes: usize,
}
impl BashkitVfsIO {
    /// Build an IO backed by `fs`, enforcing `max_file_bytes` per file.
    pub(super) fn new_with_cap(
        fs: Arc<dyn FileSystem>,
        handle: Handle,
        max_file_bytes: usize,
    ) -> Arc<Self> {
        Arc::new(Self {
            fs,
            open_files: Mutex::new(HashMap::new()),
            handle,
            max_file_bytes,
        })
    }

    /// Lock the open-file map, recovering from a poisoned mutex (the map
    /// stays structurally valid even if a holder panicked).
    fn open_files_lock(&self) -> std::sync::MutexGuard<'_, HashMap<String, Arc<VfsFile>>> {
        self.open_files.lock().unwrap_or_else(|e| e.into_inner())
    }

    /// Persist every dirty open file to the backing filesystem.
    ///
    /// Returns the number of files written. On the first failure an error
    /// message is returned; the failed file's dirty flag is restored so it
    /// is retried on the next call.
    pub(super) async fn flush_dirty(&self) -> EngineResult<usize> {
        let to_flush: Vec<Arc<VfsFile>> = {
            let map = self.open_files_lock();
            map.values()
                .filter(|f| f.dirty.load(Ordering::Acquire))
                .cloned()
                .collect()
        };
        let mut count = 0usize;
        for file in &to_flush {
            // Clear the flag *before* snapshotting the bytes: a concurrent
            // write that lands after the snapshot re-marks the file and is
            // flushed on the next call. Clearing after the write (as the
            // previous version did) silently dropped such updates.
            file.dirty.store(false, Ordering::Release);
            let bytes = lock_bytes(&file.bytes).clone();
            if let Some(parent) = file.path.parent()
                && !parent.as_os_str().is_empty()
                && !self.fs.exists(parent).await.unwrap_or(false)
            {
                // Nothing was written; restore the flag for a later retry.
                file.dirty.store(true, Ordering::Release);
                return Err(format!(
                    "parent directory does not exist: {}",
                    parent.display()
                ));
            }
            if let Err(e) = self.fs.write_file(&file.path, &bytes).await {
                file.dirty.store(true, Ordering::Release);
                return Err(format!("flush failed for {}: {e}", file.path.display()));
            }
            count += 1;
        }
        Ok(count)
    }
}
impl Clock for BashkitVfsIO {
    /// Monotonic time, delegated straight to the process-default clock.
    fn current_time_monotonic(&self) -> MonotonicInstant {
        let default_clock = DefaultClock;
        default_clock.current_time_monotonic()
    }

    /// Wall-clock time, delegated straight to the process-default clock.
    fn current_time_wall_clock(&self) -> WallClockInstant {
        let default_clock = DefaultClock;
        default_clock.current_time_wall_clock()
    }
}
impl IO for BashkitVfsIO {
    /// Open (or, with `OpenFlags::Create`, create) the file at `path`.
    ///
    /// Files are cached: reopening a path returns the same `VfsFile`, so
    /// all handles share one in-memory buffer.
    fn open_file(
        &self,
        path: &str,
        flags: OpenFlags,
        _direct: bool,
    ) -> turso_core::Result<Arc<dyn File>> {
        if let Some(existing) = self.open_files_lock().get(path).cloned() {
            return Ok(existing as Arc<dyn File>);
        }
        let pb = PathBuf::from(path);
        let cap = self.max_file_bytes;
        // Load outside the map lock; the filesystem read is async and slow.
        let bytes_opt = run_async(&self.handle, {
            let fs = self.fs.clone();
            let pb = pb.clone();
            move || async move { fs.read_file(&pb).await.ok() }
        });
        let bytes = match bytes_opt {
            Some(b) => {
                if b.len() > cap {
                    return Err(turso_core::LimboError::InternalError(format!(
                        "sqlite: VFS file exceeds {} bytes cap",
                        cap
                    )));
                }
                b
            }
            None => {
                if !flags.contains(OpenFlags::Create) {
                    return Err(turso_core::LimboError::InternalError(format!(
                        "sqlite: file not found: {path}"
                    )));
                }
                Vec::new()
            }
        };
        let file = Arc::new(VfsFile::new(pb, bytes, cap));
        // Re-check under the lock: another thread may have opened the same
        // path while we were reading. Keeping the first entry ensures both
        // callers share one buffer; the previous unconditional insert could
        // clobber a concurrently-created file and split writes across two
        // buffers.
        let mut map = self.open_files_lock();
        let entry = map.entry(path.to_string()).or_insert(file);
        Ok(entry.clone() as Arc<dyn File>)
    }

    /// Drop the cached in-memory file and best-effort delete the backing
    /// file; filesystem removal errors are intentionally ignored.
    fn remove_file(&self, path: &str) -> turso_core::Result<()> {
        self.open_files_lock().remove(path);
        run_async(&self.handle, {
            let fs = self.fs.clone();
            let pb = PathBuf::from(path);
            move || async move {
                let _ = fs.remove(&pb, false).await;
            }
        });
        Ok(())
    }
}
/// Drive an async closure to completion from a synchronous context.
///
/// `block_on` runs on a fresh scoped thread: this sidesteps tokio's
/// "cannot block_on within a runtime" panic when the caller is already on
/// a runtime worker, while the scope keeps non-`'static` futures usable.
fn run_async<F, Fut, R>(handle: &tokio::runtime::Handle, make_fut: F) -> R
where
    F: FnOnce() -> Fut + Send + 'static,
    Fut: std::future::Future<Output = R> + Send,
    R: Send + 'static,
{
    let rt = handle.clone();
    std::thread::scope(|s| {
        let worker = s.spawn(move || rt.block_on(make_fut()));
        worker.join().expect("vfs_io thread panicked")
    })
}
/// Return the ambient tokio runtime handle, or a handle to a lazily-built
/// process-wide fallback runtime when called outside any tokio context.
pub(super) fn current_handle_or_default() -> Handle {
    use std::sync::OnceLock;
    // Built at most once; lives for the rest of the process.
    static FALLBACK: OnceLock<tokio::runtime::Runtime> = OnceLock::new();
    match Handle::try_current() {
        Ok(current) => current,
        Err(_) => {
            let rt = FALLBACK.get_or_init(|| {
                tokio::runtime::Builder::new_current_thread()
                    .enable_all()
                    .build()
                    .expect("fallback runtime")
            });
            rt.handle().clone()
        }
    }
}