graft_sqlite/
vfs.rs

1// TODO: remove this once the vfs is implemented
2#![allow(unused)]
3
4use std::{
5    collections::{HashMap, HashSet},
6    fmt::Debug,
7    io::ErrorKind,
8    sync::Arc,
9};
10
11use culprit::{Culprit, ResultExt};
12use graft_client::{
13    ClientErr,
14    runtime::{
15        runtime::Runtime,
16        storage::{
17            StorageErr,
18            volume_state::{SyncDirection, VolumeConfig},
19        },
20    },
21};
22use graft_core::{VolumeId, gid::GidParseErr};
23use graft_tracing::TracingConsumer;
24use parking_lot::Mutex;
25use sqlite_plugin::{
26    flags::{AccessFlags, LockLevel, OpenKind, OpenOpts},
27    logger::{SqliteLogLevel, SqliteLogger},
28    vars::{
29        self, SQLITE_BUSY, SQLITE_BUSY_SNAPSHOT, SQLITE_CANTOPEN, SQLITE_INTERNAL, SQLITE_IOERR,
30        SQLITE_IOERR_ACCESS, SQLITE_NOTFOUND, SQLITE_READONLY,
31    },
32    vfs::{Pragma, PragmaErr, SqliteErr, Vfs, VfsHandle, VfsResult},
33};
34use thiserror::Error;
35use tryiter::TryIteratorExt;
36
37use crate::{
38    file::{FileHandle, VfsFile, mem_file::MemFile, vol_file::VolFile},
39    pragma::GraftPragma,
40};
41
42#[derive(Debug, Error)]
43pub enum ErrCtx {
44    #[error("Graft client error: {0}")]
45    Client(#[from] ClientErr),
46
47    #[error("Failed to parse VolumeId: {0}")]
48    GidParseErr(#[from] GidParseErr),
49
50    #[error("Unknown Pragma")]
51    UnknownPragma,
52
53    #[error("Cant open Volume")]
54    CantOpen,
55
56    #[error("Transaction is busy")]
57    Busy,
58
59    #[error("The transaction snapshot is no longer current")]
60    BusySnapshot,
61
62    #[error("Invalid lock transition")]
63    InvalidLockTransition,
64
65    #[error("Invalid volume state")]
66    InvalidVolumeState,
67}
68
69impl ErrCtx {
70    #[inline]
71    fn wrap<T>(mut cb: impl FnOnce() -> culprit::Result<T, ErrCtx>) -> VfsResult<T> {
72        match cb() {
73            Ok(t) => Ok(t),
74            Err(err) => {
75                let code = match err.ctx() {
76                    ErrCtx::UnknownPragma => SQLITE_NOTFOUND,
77                    ErrCtx::CantOpen => SQLITE_CANTOPEN,
78                    ErrCtx::Busy => SQLITE_BUSY,
79                    ErrCtx::BusySnapshot => SQLITE_BUSY_SNAPSHOT,
80                    ErrCtx::Client(err) => Self::map_client_err(err),
81                    _ => SQLITE_INTERNAL,
82                };
83                if code == SQLITE_INTERNAL {
84                    tracing::error!("{}", err);
85                }
86                Err(code)
87            }
88        }
89    }
90
91    fn map_client_err(err: &ClientErr) -> SqliteErr {
92        match err {
93            ClientErr::GraftErr(err) => {
94                if err.code().is_client() {
95                    SQLITE_INTERNAL
96                } else {
97                    SQLITE_IOERR
98                }
99            }
100            ClientErr::HttpErr(_) => SQLITE_IOERR,
101            ClientErr::StorageErr(store_err) => match store_err {
102                StorageErr::ConcurrentWrite => SQLITE_BUSY_SNAPSHOT,
103                StorageErr::FjallErr(err) => match Self::extract_ioerr(err) {
104                    Some(_) => SQLITE_IOERR,
105                    None => SQLITE_INTERNAL,
106                },
107                StorageErr::IoErr(err) => SQLITE_IOERR,
108                _ => SQLITE_INTERNAL,
109            },
110            ClientErr::IoErr(kind) => SQLITE_IOERR,
111            _ => SQLITE_INTERNAL,
112        }
113    }
114
115    fn extract_ioerr<'a>(
116        mut err: &'a (dyn std::error::Error + 'static),
117    ) -> Option<&'a std::io::Error> {
118        while let Some(source) = err.source() {
119            err = source;
120        }
121        err.downcast_ref::<std::io::Error>()
122    }
123}
124
125impl<T> From<ErrCtx> for culprit::Result<T, ErrCtx> {
126    fn from(err: ErrCtx) -> culprit::Result<T, ErrCtx> {
127        Err(Culprit::new(err))
128    }
129}
130
131pub struct GraftVfs {
132    runtime: Runtime,
133    locks: Mutex<HashMap<VolumeId, Arc<Mutex<()>>>>,
134}
135
136impl GraftVfs {
137    pub fn new(runtime: Runtime) -> Self {
138        Self { runtime, locks: Default::default() }
139    }
140}
141
142impl Vfs for GraftVfs {
143    type Handle = FileHandle;
144
145    fn register_logger(&self, logger: SqliteLogger) {
146        #[derive(Clone)]
147        struct Writer(Arc<Mutex<SqliteLogger>>);
148
149        impl std::io::Write for Writer {
150            #[inline]
151            fn write(&mut self, data: &[u8]) -> std::io::Result<usize> {
152                self.0.lock().log(SqliteLogLevel::Notice, data);
153                Ok(data.len())
154            }
155
156            #[inline]
157            fn flush(&mut self) -> std::io::Result<()> {
158                Ok(())
159            }
160        }
161
162        let writer = Writer(Arc::new(Mutex::new(logger)));
163        let make_writer = move || writer.clone();
164        graft_tracing::init_tracing_with_writer(
165            TracingConsumer::Tool,
166            Some(self.runtime.cid().short()),
167            make_writer,
168        );
169    }
170
171    fn canonical_path<'a>(
172        &self,
173        path: std::borrow::Cow<'a, str>,
174    ) -> VfsResult<std::borrow::Cow<'a, str>> {
175        if path == "random" {
176            Ok(VolumeId::random().pretty().into())
177        } else {
178            Ok(path)
179        }
180    }
181
182    fn pragma(
183        &self,
184        handle: &mut Self::Handle,
185        pragma: Pragma<'_>,
186    ) -> Result<Option<String>, PragmaErr> {
187        tracing::trace!("pragma: file={handle:?}, pragma={pragma:?}");
188        if let FileHandle::VolFile(file) = handle {
189            GraftPragma::try_from(&pragma)?.eval(&self.runtime, file)
190        } else {
191            Err(PragmaErr::NotFound)
192        }
193    }
194
195    fn access(&self, path: &str, flags: AccessFlags) -> VfsResult<bool> {
196        tracing::trace!("access: path={path:?}; flags={flags:?}");
197        ErrCtx::wrap(move || {
198            if let Ok(vid) = path.parse::<VolumeId>() {
199                Ok(self.runtime.volume_exists(vid).or_into_ctx()?)
200            } else {
201                Ok(false)
202            }
203        })
204    }
205
206    fn open(&self, path: Option<&str>, opts: OpenOpts) -> VfsResult<Self::Handle> {
207        tracing::trace!("open: path={path:?}, opts={opts:?}");
208        ErrCtx::wrap(move || {
209            // we only open a Volume for main database files named after a Volume ID
210            if opts.kind() == OpenKind::MainDb {
211                if let Some(path) = path {
212                    let vid: VolumeId = path.parse()?;
213
214                    // get or create a reserved lock for this Volume
215                    let reserved_lock = self.locks.lock().entry(vid.clone()).or_default().clone();
216
217                    let handle = self
218                        .runtime
219                        .open_volume(&vid, VolumeConfig::new(SyncDirection::Both))
220                        .or_into_ctx()?;
221                    return Ok(VolFile::new(handle, opts, reserved_lock).into());
222                }
223            }
224
225            // all other files use in-memory storage
226            Ok(MemFile::default().into())
227        })
228    }
229
230    fn close(&self, handle: Self::Handle) -> VfsResult<()> {
231        tracing::trace!("close: file={handle:?}");
232        ErrCtx::wrap(move || {
233            match handle {
234                FileHandle::MemFile(_) => Ok(()),
235                FileHandle::VolFile(vol_file) => {
236                    if vol_file.opts().delete_on_close() {
237                        // TODO: do we want to actually delete volumes? or mark them for deletion?
238                        self.runtime
239                            .update_volume_config(vol_file.handle().vid(), |conf| {
240                                conf.with_sync(SyncDirection::Disabled)
241                            })
242                            .or_into_ctx()?;
243                    }
244
245                    // close and drop the vol_file
246                    let handle = vol_file.close();
247
248                    let mut locks = self.locks.lock();
249                    let reserved_lock = locks
250                        .get(handle.vid())
251                        .expect("reserved lock missing from lock manager");
252
253                    // clean up the lock if this was the last reference
254                    // SAFETY: we are holding a lock on the lock manager,
255                    // preventing any concurrent opens from incrementing the
256                    // reference count
257                    if Arc::strong_count(reserved_lock) == 1 {
258                        locks.remove(handle.vid());
259                    }
260
261                    Ok(())
262                }
263            }
264        })
265    }
266
267    fn delete(&self, path: &str) -> VfsResult<()> {
268        tracing::trace!("delete: path={path:?}");
269        ErrCtx::wrap(|| {
270            if let Ok(vid) = path.parse() {
271                // TODO: do we want to actually delete volumes? or mark them for deletion?
272                self.runtime
273                    .update_volume_config(&vid, |conf| conf.with_sync(SyncDirection::Disabled))
274                    .or_into_ctx()?;
275            }
276            Ok(())
277        })
278    }
279
280    fn lock(&self, handle: &mut Self::Handle, level: LockLevel) -> VfsResult<()> {
281        tracing::trace!("lock: file={handle:?}, level={level:?}");
282        ErrCtx::wrap(move || handle.lock(level))
283    }
284
285    fn unlock(&self, handle: &mut Self::Handle, level: LockLevel) -> VfsResult<()> {
286        tracing::trace!("unlock: file={handle:?}, level={level:?}");
287        ErrCtx::wrap(move || handle.unlock(level))
288    }
289
290    fn file_size(&self, handle: &mut Self::Handle) -> VfsResult<usize> {
291        tracing::trace!("file_size: handle={handle:?}");
292        ErrCtx::wrap(move || handle.file_size())
293    }
294
295    fn truncate(&self, handle: &mut Self::Handle, size: usize) -> VfsResult<()> {
296        tracing::trace!("truncate: handle={handle:?}, size={size}");
297        ErrCtx::wrap(move || handle.truncate(size))
298    }
299
300    fn write(&self, handle: &mut Self::Handle, offset: usize, data: &[u8]) -> VfsResult<usize> {
301        tracing::trace!(
302            "write: handle={handle:?}, offset={offset}, len={}",
303            data.len()
304        );
305        ErrCtx::wrap(move || handle.write(offset, data))
306    }
307
308    fn read(&self, handle: &mut Self::Handle, offset: usize, data: &mut [u8]) -> VfsResult<usize> {
309        tracing::trace!(
310            "read: handle={handle:?}, offset={offset}, len={}",
311            data.len()
312        );
313        ErrCtx::wrap(move || handle.read(offset, data))
314    }
315}