graft_sqlite/
vfs.rs

1// TODO: remove this once the vfs is implemented
2#![allow(unused)]
3
4use std::{
5    collections::{HashMap, HashSet},
6    fmt::Debug,
7    io::ErrorKind,
8    sync::Arc,
9};
10
11use culprit::{Culprit, ResultExt};
12use graft_client::{
13    ClientErr,
14    runtime::{
15        runtime::Runtime,
16        storage::{
17            StorageErr,
18            volume_state::{SyncDirection, VolumeConfig},
19        },
20    },
21};
22use graft_core::{VolumeId, gid::GidParseErr};
23use graft_tracing::TracingConsumer;
24use parking_lot::Mutex;
25use sqlite_plugin::{
26    flags::{AccessFlags, LockLevel, OpenKind, OpenOpts},
27    logger::{SqliteLogLevel, SqliteLogger},
28    vars::{
29        self, SQLITE_BUSY, SQLITE_BUSY_SNAPSHOT, SQLITE_CANTOPEN, SQLITE_INTERNAL, SQLITE_IOERR,
30        SQLITE_IOERR_ACCESS, SQLITE_NOTFOUND, SQLITE_READONLY,
31    },
32    vfs::{Pragma, PragmaErr, SqliteErr, Vfs, VfsHandle, VfsResult},
33};
34use thiserror::Error;
35use tryiter::TryIteratorExt;
36
37use crate::{
38    file::{FileHandle, VfsFile, mem_file::MemFile, vol_file::VolFile},
39    pragma::GraftPragma,
40};
41
42#[derive(Debug, Error)]
43pub enum ErrCtx {
44    #[error("Graft client error: {0}")]
45    Client(#[from] ClientErr),
46
47    #[error("Failed to parse VolumeId: {0}")]
48    GidParseErr(#[from] GidParseErr),
49
50    #[error("Unknown Pragma")]
51    UnknownPragma,
52
53    #[error("Cant open Volume")]
54    CantOpen,
55
56    #[error("Transaction is busy")]
57    Busy,
58
59    #[error("The transaction snapshot is no longer current")]
60    BusySnapshot,
61
62    #[error("Invalid lock transition")]
63    InvalidLockTransition,
64
65    #[error("Invalid volume state")]
66    InvalidVolumeState,
67}
68
69impl ErrCtx {
70    #[inline]
71    fn wrap<T>(mut cb: impl FnOnce() -> culprit::Result<T, ErrCtx>) -> VfsResult<T> {
72        match cb() {
73            Ok(t) => Ok(t),
74            Err(err) => {
75                let code = match err.ctx() {
76                    ErrCtx::UnknownPragma => SQLITE_NOTFOUND,
77                    ErrCtx::CantOpen => SQLITE_CANTOPEN,
78                    ErrCtx::Busy => SQLITE_BUSY,
79                    ErrCtx::BusySnapshot => SQLITE_BUSY_SNAPSHOT,
80                    ErrCtx::Client(err) => Self::map_client_err(err),
81                    _ => SQLITE_INTERNAL,
82                };
83                if code == SQLITE_INTERNAL {
84                    tracing::error!("{}", err);
85                }
86                Err(code)
87            }
88        }
89    }
90
91    fn map_client_err(err: &ClientErr) -> SqliteErr {
92        match err {
93            ClientErr::GraftErr(err) => {
94                if err.code().is_client() {
95                    SQLITE_INTERNAL
96                } else {
97                    SQLITE_IOERR
98                }
99            }
100            ClientErr::HttpErr(_) => SQLITE_IOERR,
101            ClientErr::StorageErr(store_err) => match store_err {
102                StorageErr::ConcurrentWrite => SQLITE_BUSY_SNAPSHOT,
103                StorageErr::FjallErr(err) => match Self::extract_ioerr(err) {
104                    Some(_) => SQLITE_IOERR,
105                    None => SQLITE_INTERNAL,
106                },
107                StorageErr::IoErr(err) => SQLITE_IOERR,
108                _ => SQLITE_INTERNAL,
109            },
110            ClientErr::IoErr(kind) => SQLITE_IOERR,
111            _ => SQLITE_INTERNAL,
112        }
113    }
114
115    fn extract_ioerr<'a>(
116        mut err: &'a (dyn std::error::Error + 'static),
117    ) -> Option<&'a std::io::Error> {
118        while let Some(source) = err.source() {
119            err = source;
120        }
121        err.downcast_ref::<std::io::Error>()
122    }
123}
124
125impl<T> From<ErrCtx> for culprit::Result<T, ErrCtx> {
126    fn from(err: ErrCtx) -> culprit::Result<T, ErrCtx> {
127        Err(Culprit::new(err))
128    }
129}
130
131pub struct GraftVfs {
132    runtime: Runtime,
133    locks: Mutex<HashMap<VolumeId, Arc<Mutex<()>>>>,
134}
135
136impl GraftVfs {
137    pub fn new(runtime: Runtime) -> Self {
138        Self { runtime, locks: Default::default() }
139    }
140}
141
142impl Vfs for GraftVfs {
143    type Handle = FileHandle;
144
145    fn register_logger(&self, logger: SqliteLogger) {
146        #[derive(Clone)]
147        struct Writer(Arc<Mutex<SqliteLogger>>);
148
149        impl std::io::Write for Writer {
150            #[inline]
151            fn write(&mut self, data: &[u8]) -> std::io::Result<usize> {
152                self.0.lock().log(SqliteLogLevel::Notice, data);
153                Ok(data.len())
154            }
155
156            #[inline]
157            fn flush(&mut self) -> std::io::Result<()> {
158                Ok(())
159            }
160        }
161
162        let writer = Writer(Arc::new(Mutex::new(logger)));
163        let make_writer = move || writer.clone();
164        graft_tracing::init_tracing_with_writer(TracingConsumer::Tool, None, make_writer);
165    }
166
167    fn canonical_path<'a>(
168        &self,
169        path: std::borrow::Cow<'a, str>,
170    ) -> VfsResult<std::borrow::Cow<'a, str>> {
171        if path == "random" {
172            Ok(VolumeId::random().pretty().into())
173        } else {
174            Ok(path)
175        }
176    }
177
178    fn pragma(
179        &self,
180        handle: &mut Self::Handle,
181        pragma: Pragma<'_>,
182    ) -> Result<Option<String>, PragmaErr> {
183        tracing::trace!("pragma: file={handle:?}, pragma={pragma:?}");
184        if let FileHandle::VolFile(file) = handle {
185            GraftPragma::try_from(&pragma)?.eval(&self.runtime, file)
186        } else {
187            Err(PragmaErr::NotFound)
188        }
189    }
190
191    fn access(&self, path: &str, flags: AccessFlags) -> VfsResult<bool> {
192        tracing::trace!("access: path={path:?}; flags={flags:?}");
193        ErrCtx::wrap(move || {
194            if let Ok(vid) = path.parse::<VolumeId>() {
195                Ok(self.runtime.volume_exists(vid).or_into_ctx()?)
196            } else {
197                Ok(false)
198            }
199        })
200    }
201
202    fn open(&self, path: Option<&str>, opts: OpenOpts) -> VfsResult<Self::Handle> {
203        tracing::trace!("open: path={path:?}, opts={opts:?}");
204        ErrCtx::wrap(move || {
205            // we only open a Volume for main database files named after a Volume ID
206            if opts.kind() == OpenKind::MainDb {
207                if let Some(path) = path {
208                    let vid: VolumeId = path.parse()?;
209
210                    // get or create a reserved lock for this Volume
211                    let reserved_lock = self.locks.lock().entry(vid.clone()).or_default().clone();
212
213                    let handle = self
214                        .runtime
215                        .open_volume(&vid, VolumeConfig::new(SyncDirection::Both))
216                        .or_into_ctx()?;
217                    return Ok(VolFile::new(handle, opts, reserved_lock).into());
218                }
219            }
220
221            // all other files use in-memory storage
222            Ok(MemFile::default().into())
223        })
224    }
225
226    fn close(&self, handle: Self::Handle) -> VfsResult<()> {
227        tracing::trace!("close: file={handle:?}");
228        ErrCtx::wrap(move || {
229            match handle {
230                FileHandle::MemFile(_) => Ok(()),
231                FileHandle::VolFile(vol_file) => {
232                    if vol_file.opts().delete_on_close() {
233                        // TODO: do we want to actually delete volumes? or mark them for deletion?
234                        self.runtime
235                            .update_volume_config(vol_file.handle().vid(), |conf| {
236                                conf.with_sync(SyncDirection::Disabled)
237                            })
238                            .or_into_ctx()?;
239                    }
240
241                    // close and drop the vol_file
242                    let handle = vol_file.close();
243
244                    let mut locks = self.locks.lock();
245                    let reserved_lock = locks
246                        .get(handle.vid())
247                        .expect("reserved lock missing from lock manager");
248
249                    // clean up the lock if this was the last reference
250                    // SAFETY: we are holding a lock on the lock manager,
251                    // preventing any concurrent opens from incrementing the
252                    // reference count
253                    if Arc::strong_count(reserved_lock) == 1 {
254                        locks.remove(handle.vid());
255                    }
256
257                    Ok(())
258                }
259            }
260        })
261    }
262
263    fn delete(&self, path: &str) -> VfsResult<()> {
264        tracing::trace!("delete: path={path:?}");
265        ErrCtx::wrap(|| {
266            if let Ok(vid) = path.parse() {
267                // TODO: do we want to actually delete volumes? or mark them for deletion?
268                self.runtime
269                    .update_volume_config(&vid, |conf| conf.with_sync(SyncDirection::Disabled))
270                    .or_into_ctx()?;
271            }
272            Ok(())
273        })
274    }
275
276    fn lock(&self, handle: &mut Self::Handle, level: LockLevel) -> VfsResult<()> {
277        tracing::trace!("lock: file={handle:?}, level={level:?}");
278        ErrCtx::wrap(move || handle.lock(level))
279    }
280
281    fn unlock(&self, handle: &mut Self::Handle, level: LockLevel) -> VfsResult<()> {
282        tracing::trace!("unlock: file={handle:?}, level={level:?}");
283        ErrCtx::wrap(move || handle.unlock(level))
284    }
285
286    fn file_size(&self, handle: &mut Self::Handle) -> VfsResult<usize> {
287        tracing::trace!("file_size: handle={handle:?}");
288        ErrCtx::wrap(move || handle.file_size())
289    }
290
291    fn truncate(&self, handle: &mut Self::Handle, size: usize) -> VfsResult<()> {
292        tracing::trace!("truncate: handle={handle:?}, size={size}");
293        ErrCtx::wrap(move || handle.truncate(size))
294    }
295
296    fn write(&self, handle: &mut Self::Handle, offset: usize, data: &[u8]) -> VfsResult<usize> {
297        tracing::trace!(
298            "write: handle={handle:?}, offset={offset}, len={}",
299            data.len()
300        );
301        ErrCtx::wrap(move || handle.write(offset, data))
302    }
303
304    fn read(&self, handle: &mut Self::Handle, offset: usize, data: &mut [u8]) -> VfsResult<usize> {
305        tracing::trace!(
306            "read: handle={handle:?}, offset={offset}, len={}",
307            data.len()
308        );
309        ErrCtx::wrap(move || handle.read(offset, data))
310    }
311}