graft_sqlite/
vfs.rs

1// TODO: remove this once the vfs is implemented
2#![allow(unused)]
3
4use std::{
5    collections::{HashMap, HashSet},
6    fmt::Debug,
7    io::ErrorKind,
8    sync::Arc,
9};
10
11use culprit::{Culprit, ResultExt};
12use graft_client::{
13    ClientErr,
14    runtime::{
15        runtime::Runtime,
16        storage::{
17            StorageErr,
18            volume_state::{SyncDirection, VolumeConfig},
19        },
20    },
21};
22use graft_core::{VolumeId, gid::GidParseErr};
23use graft_tracing::TracingConsumer;
24use parking_lot::Mutex;
25use sqlite_plugin::{
26    flags::{AccessFlags, LockLevel, OpenKind, OpenOpts},
27    logger::{SqliteLogLevel, SqliteLogger},
28    vars::{
29        self, SQLITE_BUSY, SQLITE_BUSY_SNAPSHOT, SQLITE_CANTOPEN, SQLITE_INTERNAL, SQLITE_IOERR,
30        SQLITE_IOERR_ACCESS, SQLITE_NOTFOUND, SQLITE_READONLY,
31    },
32    vfs::{Pragma, PragmaErr, SqliteErr, Vfs, VfsHandle, VfsResult},
33};
34use thiserror::Error;
35use tryiter::TryIteratorExt;
36
37use crate::{
38    file::{FileHandle, VfsFile, mem_file::MemFile, vol_file::VolFile},
39    pragma::GraftPragma,
40};
41
42#[derive(Debug, Error)]
43pub enum ErrCtx {
44    #[error("Graft client error: {0}")]
45    Client(#[from] ClientErr),
46
47    #[error("Failed to parse VolumeId: {0}")]
48    GidParseErr(#[from] GidParseErr),
49
50    #[error("Unknown Pragma")]
51    UnknownPragma,
52
53    #[error("Cant open Volume")]
54    CantOpen,
55
56    #[error("Transaction is busy")]
57    Busy,
58
59    #[error("The transaction snapshot is no longer current")]
60    BusySnapshot,
61
62    #[error("Invalid lock transition")]
63    InvalidLockTransition,
64
65    #[error("Invalid volume state")]
66    InvalidVolumeState,
67
68    #[error(transparent)]
69    FmtErr(#[from] std::fmt::Error),
70}
71
72impl ErrCtx {
73    #[inline]
74    fn wrap<T>(mut cb: impl FnOnce() -> culprit::Result<T, ErrCtx>) -> VfsResult<T> {
75        match cb() {
76            Ok(t) => Ok(t),
77            Err(err) => {
78                let code = err.ctx().sqlite_err();
79                if code == SQLITE_INTERNAL {
80                    tracing::error!("{}", err);
81                }
82                Err(code)
83            }
84        }
85    }
86
87    fn sqlite_err(&self) -> SqliteErr {
88        match self {
89            ErrCtx::UnknownPragma => SQLITE_NOTFOUND,
90            ErrCtx::CantOpen => SQLITE_CANTOPEN,
91            ErrCtx::Busy => SQLITE_BUSY,
92            ErrCtx::BusySnapshot => SQLITE_BUSY_SNAPSHOT,
93            ErrCtx::Client(err) => Self::map_client_err(err),
94            _ => SQLITE_INTERNAL,
95        }
96    }
97
98    fn map_client_err(err: &ClientErr) -> SqliteErr {
99        match err {
100            ClientErr::GraftErr(err) => {
101                if err.code().is_client() {
102                    SQLITE_INTERNAL
103                } else {
104                    SQLITE_IOERR
105                }
106            }
107            ClientErr::HttpErr(_) => SQLITE_IOERR,
108            ClientErr::StorageErr(store_err) => match store_err {
109                StorageErr::ConcurrentWrite => SQLITE_BUSY_SNAPSHOT,
110                StorageErr::FjallErr(err) => match Self::extract_ioerr(err) {
111                    Some(_) => SQLITE_IOERR,
112                    None => SQLITE_INTERNAL,
113                },
114                StorageErr::IoErr(err) => SQLITE_IOERR,
115                _ => SQLITE_INTERNAL,
116            },
117            ClientErr::IoErr(kind) => SQLITE_IOERR,
118            _ => SQLITE_INTERNAL,
119        }
120    }
121
122    fn extract_ioerr<'a>(
123        mut err: &'a (dyn std::error::Error + 'static),
124    ) -> Option<&'a std::io::Error> {
125        while let Some(source) = err.source() {
126            err = source;
127        }
128        err.downcast_ref::<std::io::Error>()
129    }
130}
131
132impl<T> From<ErrCtx> for culprit::Result<T, ErrCtx> {
133    fn from(err: ErrCtx) -> culprit::Result<T, ErrCtx> {
134        Err(Culprit::new(err))
135    }
136}
137
138pub struct GraftVfs {
139    runtime: Runtime,
140    locks: Mutex<HashMap<VolumeId, Arc<Mutex<()>>>>,
141}
142
143impl GraftVfs {
144    pub fn new(runtime: Runtime) -> Self {
145        Self { runtime, locks: Default::default() }
146    }
147}
148
149impl Vfs for GraftVfs {
150    type Handle = FileHandle;
151
152    fn device_characteristics(&self) -> i32 {
153        // writes up to a single page are atomic
154        vars::SQLITE_IOCAP_ATOMIC512 |
155        vars::SQLITE_IOCAP_ATOMIC1K |
156        vars::SQLITE_IOCAP_ATOMIC2K |
157        vars::SQLITE_IOCAP_ATOMIC4K |
158        // after reboot following a crash or power loss, the only bytes in a file that were written
159        // at the application level might have changed and that adjacent bytes, even bytes within
160        // the same sector are guaranteed to be unchanged
161        vars::SQLITE_IOCAP_POWERSAFE_OVERWRITE |
162        // when data is appended to a file, the data is appended first then the size of the file is
163        // extended, never the other way around
164        vars::SQLITE_IOCAP_SAFE_APPEND |
165        // information is written to disk in the same order as calls to xWrite()
166        vars::SQLITE_IOCAP_SEQUENTIAL
167    }
168
169    fn register_logger(&self, logger: SqliteLogger) {
170        #[derive(Clone)]
171        struct Writer(Arc<Mutex<SqliteLogger>>);
172
173        impl std::io::Write for Writer {
174            #[inline]
175            fn write(&mut self, data: &[u8]) -> std::io::Result<usize> {
176                self.0.lock().log(SqliteLogLevel::Notice, data);
177                Ok(data.len())
178            }
179
180            #[inline]
181            fn flush(&mut self) -> std::io::Result<()> {
182                Ok(())
183            }
184        }
185
186        let writer = Writer(Arc::new(Mutex::new(logger)));
187        let make_writer = move || writer.clone();
188        graft_tracing::init_tracing_with_writer(
189            TracingConsumer::Tool,
190            Some(self.runtime.cid().short()),
191            make_writer,
192        );
193    }
194
195    fn canonical_path<'a>(
196        &self,
197        path: std::borrow::Cow<'a, str>,
198    ) -> VfsResult<std::borrow::Cow<'a, str>> {
199        if path == "random" {
200            Ok(VolumeId::random().pretty().into())
201        } else {
202            Ok(path)
203        }
204    }
205
206    fn pragma(
207        &self,
208        handle: &mut Self::Handle,
209        pragma: Pragma<'_>,
210    ) -> Result<Option<String>, PragmaErr> {
211        tracing::trace!("pragma: file={handle:?}, pragma={pragma:?}");
212        if let FileHandle::VolFile(file) = handle {
213            match GraftPragma::try_from(&pragma)?.eval(&self.runtime, file) {
214                Ok(val) => Ok(val),
215                Err(err) => Err(PragmaErr::Fail(
216                    err.ctx().sqlite_err(),
217                    Some(format!("{err:?}")),
218                )),
219            }
220        } else {
221            Err(PragmaErr::NotFound)
222        }
223    }
224
225    fn access(&self, path: &str, flags: AccessFlags) -> VfsResult<bool> {
226        tracing::trace!("access: path={path:?}; flags={flags:?}");
227        ErrCtx::wrap(move || {
228            if let Ok(vid) = path.parse::<VolumeId>() {
229                Ok(self.runtime.volume_exists(vid).or_into_ctx()?)
230            } else {
231                Ok(false)
232            }
233        })
234    }
235
236    fn open(&self, path: Option<&str>, opts: OpenOpts) -> VfsResult<Self::Handle> {
237        tracing::trace!("open: path={path:?}, opts={opts:?}");
238        ErrCtx::wrap(move || {
239            // we only open a Volume for main database files named after a Volume ID
240            if opts.kind() == OpenKind::MainDb {
241                if let Some(path) = path {
242                    let vid: VolumeId = path.parse()?;
243
244                    // get or create a reserved lock for this Volume
245                    let reserved_lock = self.locks.lock().entry(vid.clone()).or_default().clone();
246
247                    let handle = self
248                        .runtime
249                        .open_volume(&vid, VolumeConfig::new(SyncDirection::Both))
250                        .or_into_ctx()?;
251                    return Ok(VolFile::new(handle, opts, reserved_lock).into());
252                }
253            }
254
255            // all other files use in-memory storage
256            Ok(MemFile::default().into())
257        })
258    }
259
260    fn close(&self, handle: Self::Handle) -> VfsResult<()> {
261        tracing::trace!("close: file={handle:?}");
262        ErrCtx::wrap(move || {
263            match handle {
264                FileHandle::MemFile(_) => Ok(()),
265                FileHandle::VolFile(vol_file) => {
266                    if vol_file.opts().delete_on_close() {
267                        // TODO: do we want to actually delete volumes? or mark them for deletion?
268                        self.runtime
269                            .update_volume_config(vol_file.vid(), |conf| {
270                                conf.with_sync(SyncDirection::Disabled)
271                            })
272                            .or_into_ctx()?;
273                    }
274
275                    // close and drop the vol_file
276                    let handle = vol_file.close();
277
278                    let mut locks = self.locks.lock();
279                    let reserved_lock = locks
280                        .get(handle.vid())
281                        .expect("reserved lock missing from lock manager");
282
283                    // clean up the lock if this was the last reference
284                    // SAFETY: we are holding a lock on the lock manager,
285                    // preventing any concurrent opens from incrementing the
286                    // reference count
287                    if Arc::strong_count(reserved_lock) == 1 {
288                        locks.remove(handle.vid());
289                    }
290
291                    Ok(())
292                }
293            }
294        })
295    }
296
297    fn delete(&self, path: &str) -> VfsResult<()> {
298        tracing::trace!("delete: path={path:?}");
299        ErrCtx::wrap(|| {
300            if let Ok(vid) = path.parse() {
301                // TODO: do we want to actually delete volumes? or mark them for deletion?
302                self.runtime
303                    .update_volume_config(&vid, |conf| conf.with_sync(SyncDirection::Disabled))
304                    .or_into_ctx()?;
305            }
306            Ok(())
307        })
308    }
309
310    fn lock(&self, handle: &mut Self::Handle, level: LockLevel) -> VfsResult<()> {
311        tracing::trace!("lock: file={handle:?}, level={level:?}");
312        ErrCtx::wrap(move || handle.lock(level))
313    }
314
315    fn unlock(&self, handle: &mut Self::Handle, level: LockLevel) -> VfsResult<()> {
316        tracing::trace!("unlock: file={handle:?}, level={level:?}");
317        ErrCtx::wrap(move || handle.unlock(level))
318    }
319
320    fn file_size(&self, handle: &mut Self::Handle) -> VfsResult<usize> {
321        tracing::trace!("file_size: handle={handle:?}");
322        ErrCtx::wrap(move || handle.file_size())
323    }
324
325    fn truncate(&self, handle: &mut Self::Handle, size: usize) -> VfsResult<()> {
326        tracing::trace!("truncate: handle={handle:?}, size={size}");
327        ErrCtx::wrap(move || handle.truncate(size))
328    }
329
330    fn write(&self, handle: &mut Self::Handle, offset: usize, data: &[u8]) -> VfsResult<usize> {
331        tracing::trace!(
332            "write: handle={handle:?}, offset={offset}, len={}",
333            data.len()
334        );
335        ErrCtx::wrap(move || handle.write(offset, data))
336    }
337
338    fn read(&self, handle: &mut Self::Handle, offset: usize, data: &mut [u8]) -> VfsResult<usize> {
339        tracing::trace!(
340            "read: handle={handle:?}, offset={offset}, len={}",
341            data.len()
342        );
343        ErrCtx::wrap(move || handle.read(offset, data))
344    }
345}