1#![allow(unused)]
3
4use std::{
5 collections::{HashMap, HashSet},
6 fmt::Debug,
7 io::ErrorKind,
8 sync::Arc,
9};
10
11use culprit::{Culprit, ResultExt};
12use graft_client::{
13 ClientErr,
14 runtime::{
15 runtime::Runtime,
16 storage::{
17 StorageErr,
18 volume_state::{SyncDirection, VolumeConfig},
19 },
20 },
21};
22use graft_core::{VolumeId, gid::GidParseErr};
23use graft_tracing::TracingConsumer;
24use parking_lot::Mutex;
25use sqlite_plugin::{
26 flags::{AccessFlags, LockLevel, OpenKind, OpenOpts},
27 logger::{SqliteLogLevel, SqliteLogger},
28 vars::{
29 self, SQLITE_BUSY, SQLITE_BUSY_SNAPSHOT, SQLITE_CANTOPEN, SQLITE_INTERNAL, SQLITE_IOERR,
30 SQLITE_IOERR_ACCESS, SQLITE_NOTFOUND, SQLITE_READONLY,
31 },
32 vfs::{Pragma, PragmaErr, SqliteErr, Vfs, VfsHandle, VfsResult},
33};
34use thiserror::Error;
35use tryiter::TryIteratorExt;
36
37use crate::{
38 file::{FileHandle, VfsFile, mem_file::MemFile, vol_file::VolFile},
39 pragma::GraftPragma,
40};
41
42#[derive(Debug, Error)]
43pub enum ErrCtx {
44 #[error("Graft client error: {0}")]
45 Client(#[from] ClientErr),
46
47 #[error("Failed to parse VolumeId: {0}")]
48 GidParseErr(#[from] GidParseErr),
49
50 #[error("Unknown Pragma")]
51 UnknownPragma,
52
53 #[error("Cant open Volume")]
54 CantOpen,
55
56 #[error("Transaction is busy")]
57 Busy,
58
59 #[error("The transaction snapshot is no longer current")]
60 BusySnapshot,
61
62 #[error("Invalid lock transition")]
63 InvalidLockTransition,
64
65 #[error("Invalid volume state")]
66 InvalidVolumeState,
67
68 #[error(transparent)]
69 FmtErr(#[from] std::fmt::Error),
70}
71
72impl ErrCtx {
73 #[inline]
74 fn wrap<T>(mut cb: impl FnOnce() -> culprit::Result<T, ErrCtx>) -> VfsResult<T> {
75 match cb() {
76 Ok(t) => Ok(t),
77 Err(err) => {
78 let code = err.ctx().sqlite_err();
79 if code == SQLITE_INTERNAL {
80 tracing::error!("{}", err);
81 }
82 Err(code)
83 }
84 }
85 }
86
87 fn sqlite_err(&self) -> SqliteErr {
88 match self {
89 ErrCtx::UnknownPragma => SQLITE_NOTFOUND,
90 ErrCtx::CantOpen => SQLITE_CANTOPEN,
91 ErrCtx::Busy => SQLITE_BUSY,
92 ErrCtx::BusySnapshot => SQLITE_BUSY_SNAPSHOT,
93 ErrCtx::Client(err) => Self::map_client_err(err),
94 _ => SQLITE_INTERNAL,
95 }
96 }
97
98 fn map_client_err(err: &ClientErr) -> SqliteErr {
99 match err {
100 ClientErr::GraftErr(err) => {
101 if err.code().is_client() {
102 SQLITE_INTERNAL
103 } else {
104 SQLITE_IOERR
105 }
106 }
107 ClientErr::HttpErr(_) => SQLITE_IOERR,
108 ClientErr::StorageErr(store_err) => match store_err {
109 StorageErr::ConcurrentWrite => SQLITE_BUSY_SNAPSHOT,
110 StorageErr::FjallErr(err) => match Self::extract_ioerr(err) {
111 Some(_) => SQLITE_IOERR,
112 None => SQLITE_INTERNAL,
113 },
114 StorageErr::IoErr(err) => SQLITE_IOERR,
115 _ => SQLITE_INTERNAL,
116 },
117 ClientErr::IoErr(kind) => SQLITE_IOERR,
118 _ => SQLITE_INTERNAL,
119 }
120 }
121
122 fn extract_ioerr<'a>(
123 mut err: &'a (dyn std::error::Error + 'static),
124 ) -> Option<&'a std::io::Error> {
125 while let Some(source) = err.source() {
126 err = source;
127 }
128 err.downcast_ref::<std::io::Error>()
129 }
130}
131
132impl<T> From<ErrCtx> for culprit::Result<T, ErrCtx> {
133 fn from(err: ErrCtx) -> culprit::Result<T, ErrCtx> {
134 Err(Culprit::new(err))
135 }
136}
137
138pub struct GraftVfs {
139 runtime: Runtime,
140 locks: Mutex<HashMap<VolumeId, Arc<Mutex<()>>>>,
141}
142
143impl GraftVfs {
144 pub fn new(runtime: Runtime) -> Self {
145 Self { runtime, locks: Default::default() }
146 }
147}
148
149impl Vfs for GraftVfs {
150 type Handle = FileHandle;
151
152 fn device_characteristics(&self) -> i32 {
153 vars::SQLITE_IOCAP_ATOMIC512 |
155 vars::SQLITE_IOCAP_ATOMIC1K |
156 vars::SQLITE_IOCAP_ATOMIC2K |
157 vars::SQLITE_IOCAP_ATOMIC4K |
158 vars::SQLITE_IOCAP_POWERSAFE_OVERWRITE |
162 vars::SQLITE_IOCAP_SAFE_APPEND |
165 vars::SQLITE_IOCAP_SEQUENTIAL
167 }
168
169 fn register_logger(&self, logger: SqliteLogger) {
170 #[derive(Clone)]
171 struct Writer(Arc<Mutex<SqliteLogger>>);
172
173 impl std::io::Write for Writer {
174 #[inline]
175 fn write(&mut self, data: &[u8]) -> std::io::Result<usize> {
176 self.0.lock().log(SqliteLogLevel::Notice, data);
177 Ok(data.len())
178 }
179
180 #[inline]
181 fn flush(&mut self) -> std::io::Result<()> {
182 Ok(())
183 }
184 }
185
186 let writer = Writer(Arc::new(Mutex::new(logger)));
187 let make_writer = move || writer.clone();
188 graft_tracing::init_tracing_with_writer(
189 TracingConsumer::Tool,
190 Some(self.runtime.cid().short()),
191 make_writer,
192 );
193 }
194
195 fn canonical_path<'a>(
196 &self,
197 path: std::borrow::Cow<'a, str>,
198 ) -> VfsResult<std::borrow::Cow<'a, str>> {
199 if path == "random" {
200 Ok(VolumeId::random().pretty().into())
201 } else {
202 Ok(path)
203 }
204 }
205
206 fn pragma(
207 &self,
208 handle: &mut Self::Handle,
209 pragma: Pragma<'_>,
210 ) -> Result<Option<String>, PragmaErr> {
211 tracing::trace!("pragma: file={handle:?}, pragma={pragma:?}");
212 if let FileHandle::VolFile(file) = handle {
213 match GraftPragma::try_from(&pragma)?.eval(&self.runtime, file) {
214 Ok(val) => Ok(val),
215 Err(err) => Err(PragmaErr::Fail(
216 err.ctx().sqlite_err(),
217 Some(format!("{err:?}")),
218 )),
219 }
220 } else {
221 Err(PragmaErr::NotFound)
222 }
223 }
224
225 fn access(&self, path: &str, flags: AccessFlags) -> VfsResult<bool> {
226 tracing::trace!("access: path={path:?}; flags={flags:?}");
227 ErrCtx::wrap(move || {
228 if let Ok(vid) = path.parse::<VolumeId>() {
229 Ok(self.runtime.volume_exists(vid).or_into_ctx()?)
230 } else {
231 Ok(false)
232 }
233 })
234 }
235
236 fn open(&self, path: Option<&str>, opts: OpenOpts) -> VfsResult<Self::Handle> {
237 tracing::trace!("open: path={path:?}, opts={opts:?}");
238 ErrCtx::wrap(move || {
239 if opts.kind() == OpenKind::MainDb {
241 if let Some(path) = path {
242 let vid: VolumeId = path.parse()?;
243
244 let reserved_lock = self.locks.lock().entry(vid.clone()).or_default().clone();
246
247 let handle = self
248 .runtime
249 .open_volume(&vid, VolumeConfig::new(SyncDirection::Both))
250 .or_into_ctx()?;
251 return Ok(VolFile::new(handle, opts, reserved_lock).into());
252 }
253 }
254
255 Ok(MemFile::default().into())
257 })
258 }
259
260 fn close(&self, handle: Self::Handle) -> VfsResult<()> {
261 tracing::trace!("close: file={handle:?}");
262 ErrCtx::wrap(move || {
263 match handle {
264 FileHandle::MemFile(_) => Ok(()),
265 FileHandle::VolFile(vol_file) => {
266 if vol_file.opts().delete_on_close() {
267 self.runtime
269 .update_volume_config(vol_file.vid(), |conf| {
270 conf.with_sync(SyncDirection::Disabled)
271 })
272 .or_into_ctx()?;
273 }
274
275 let handle = vol_file.close();
277
278 let mut locks = self.locks.lock();
279 let reserved_lock = locks
280 .get(handle.vid())
281 .expect("reserved lock missing from lock manager");
282
283 if Arc::strong_count(reserved_lock) == 1 {
288 locks.remove(handle.vid());
289 }
290
291 Ok(())
292 }
293 }
294 })
295 }
296
297 fn delete(&self, path: &str) -> VfsResult<()> {
298 tracing::trace!("delete: path={path:?}");
299 ErrCtx::wrap(|| {
300 if let Ok(vid) = path.parse() {
301 self.runtime
303 .update_volume_config(&vid, |conf| conf.with_sync(SyncDirection::Disabled))
304 .or_into_ctx()?;
305 }
306 Ok(())
307 })
308 }
309
310 fn lock(&self, handle: &mut Self::Handle, level: LockLevel) -> VfsResult<()> {
311 tracing::trace!("lock: file={handle:?}, level={level:?}");
312 ErrCtx::wrap(move || handle.lock(level))
313 }
314
315 fn unlock(&self, handle: &mut Self::Handle, level: LockLevel) -> VfsResult<()> {
316 tracing::trace!("unlock: file={handle:?}, level={level:?}");
317 ErrCtx::wrap(move || handle.unlock(level))
318 }
319
320 fn file_size(&self, handle: &mut Self::Handle) -> VfsResult<usize> {
321 tracing::trace!("file_size: handle={handle:?}");
322 ErrCtx::wrap(move || handle.file_size())
323 }
324
325 fn truncate(&self, handle: &mut Self::Handle, size: usize) -> VfsResult<()> {
326 tracing::trace!("truncate: handle={handle:?}, size={size}");
327 ErrCtx::wrap(move || handle.truncate(size))
328 }
329
330 fn write(&self, handle: &mut Self::Handle, offset: usize, data: &[u8]) -> VfsResult<usize> {
331 tracing::trace!(
332 "write: handle={handle:?}, offset={offset}, len={}",
333 data.len()
334 );
335 ErrCtx::wrap(move || handle.write(offset, data))
336 }
337
338 fn read(&self, handle: &mut Self::Handle, offset: usize, data: &mut [u8]) -> VfsResult<usize> {
339 tracing::trace!(
340 "read: handle={handle:?}, offset={offset}, len={}",
341 data.len()
342 );
343 ErrCtx::wrap(move || handle.read(offset, data))
344 }
345}