1#![allow(unused)]
3
4use std::{
5 collections::{HashMap, HashSet},
6 fmt::Debug,
7 io::ErrorKind,
8 sync::Arc,
9};
10
11use culprit::{Culprit, ResultExt};
12use graft_client::{
13 ClientErr,
14 runtime::{
15 runtime::Runtime,
16 storage::{
17 StorageErr,
18 volume_state::{SyncDirection, VolumeConfig},
19 },
20 },
21};
22use graft_core::{VolumeId, gid::GidParseErr};
23use graft_tracing::TracingConsumer;
24use parking_lot::Mutex;
25use sqlite_plugin::{
26 flags::{AccessFlags, LockLevel, OpenKind, OpenOpts},
27 logger::{SqliteLogLevel, SqliteLogger},
28 vars::{
29 self, SQLITE_BUSY, SQLITE_BUSY_SNAPSHOT, SQLITE_CANTOPEN, SQLITE_INTERNAL, SQLITE_IOERR,
30 SQLITE_IOERR_ACCESS, SQLITE_NOTFOUND, SQLITE_READONLY,
31 },
32 vfs::{Pragma, PragmaErr, SqliteErr, Vfs, VfsHandle, VfsResult},
33};
34use thiserror::Error;
35use tryiter::TryIteratorExt;
36
37use crate::{
38 file::{FileHandle, VfsFile, mem_file::MemFile, vol_file::VolFile},
39 pragma::GraftPragma,
40};
41
42#[derive(Debug, Error)]
43pub enum ErrCtx {
44 #[error("Graft client error: {0}")]
45 Client(#[from] ClientErr),
46
47 #[error("Failed to parse VolumeId: {0}")]
48 GidParseErr(#[from] GidParseErr),
49
50 #[error("Unknown Pragma")]
51 UnknownPragma,
52
53 #[error("Cant open Volume")]
54 CantOpen,
55
56 #[error("Transaction is busy")]
57 Busy,
58
59 #[error("The transaction snapshot is no longer current")]
60 BusySnapshot,
61
62 #[error("Invalid lock transition")]
63 InvalidLockTransition,
64
65 #[error("Invalid volume state")]
66 InvalidVolumeState,
67}
68
69impl ErrCtx {
70 #[inline]
71 fn wrap<T>(mut cb: impl FnOnce() -> culprit::Result<T, ErrCtx>) -> VfsResult<T> {
72 match cb() {
73 Ok(t) => Ok(t),
74 Err(err) => {
75 let code = match err.ctx() {
76 ErrCtx::UnknownPragma => SQLITE_NOTFOUND,
77 ErrCtx::CantOpen => SQLITE_CANTOPEN,
78 ErrCtx::Busy => SQLITE_BUSY,
79 ErrCtx::BusySnapshot => SQLITE_BUSY_SNAPSHOT,
80 ErrCtx::Client(err) => Self::map_client_err(err),
81 _ => SQLITE_INTERNAL,
82 };
83 if code == SQLITE_INTERNAL {
84 tracing::error!("{}", err);
85 }
86 Err(code)
87 }
88 }
89 }
90
91 fn map_client_err(err: &ClientErr) -> SqliteErr {
92 match err {
93 ClientErr::GraftErr(err) => {
94 if err.code().is_client() {
95 SQLITE_INTERNAL
96 } else {
97 SQLITE_IOERR
98 }
99 }
100 ClientErr::HttpErr(_) => SQLITE_IOERR,
101 ClientErr::StorageErr(store_err) => match store_err {
102 StorageErr::ConcurrentWrite => SQLITE_BUSY_SNAPSHOT,
103 StorageErr::FjallErr(err) => match Self::extract_ioerr(err) {
104 Some(_) => SQLITE_IOERR,
105 None => SQLITE_INTERNAL,
106 },
107 StorageErr::IoErr(err) => SQLITE_IOERR,
108 _ => SQLITE_INTERNAL,
109 },
110 ClientErr::IoErr(kind) => SQLITE_IOERR,
111 _ => SQLITE_INTERNAL,
112 }
113 }
114
115 fn extract_ioerr<'a>(
116 mut err: &'a (dyn std::error::Error + 'static),
117 ) -> Option<&'a std::io::Error> {
118 while let Some(source) = err.source() {
119 err = source;
120 }
121 err.downcast_ref::<std::io::Error>()
122 }
123}
124
125impl<T> From<ErrCtx> for culprit::Result<T, ErrCtx> {
126 fn from(err: ErrCtx) -> culprit::Result<T, ErrCtx> {
127 Err(Culprit::new(err))
128 }
129}
130
131pub struct GraftVfs {
132 runtime: Runtime,
133 locks: Mutex<HashMap<VolumeId, Arc<Mutex<()>>>>,
134}
135
136impl GraftVfs {
137 pub fn new(runtime: Runtime) -> Self {
138 Self { runtime, locks: Default::default() }
139 }
140}
141
142impl Vfs for GraftVfs {
143 type Handle = FileHandle;
144
145 fn register_logger(&self, logger: SqliteLogger) {
146 #[derive(Clone)]
147 struct Writer(Arc<Mutex<SqliteLogger>>);
148
149 impl std::io::Write for Writer {
150 #[inline]
151 fn write(&mut self, data: &[u8]) -> std::io::Result<usize> {
152 self.0.lock().log(SqliteLogLevel::Notice, data);
153 Ok(data.len())
154 }
155
156 #[inline]
157 fn flush(&mut self) -> std::io::Result<()> {
158 Ok(())
159 }
160 }
161
162 let writer = Writer(Arc::new(Mutex::new(logger)));
163 let make_writer = move || writer.clone();
164 graft_tracing::init_tracing_with_writer(
165 TracingConsumer::Tool,
166 Some(self.runtime.cid().short()),
167 make_writer,
168 );
169 }
170
171 fn canonical_path<'a>(
172 &self,
173 path: std::borrow::Cow<'a, str>,
174 ) -> VfsResult<std::borrow::Cow<'a, str>> {
175 if path == "random" {
176 Ok(VolumeId::random().pretty().into())
177 } else {
178 Ok(path)
179 }
180 }
181
182 fn pragma(
183 &self,
184 handle: &mut Self::Handle,
185 pragma: Pragma<'_>,
186 ) -> Result<Option<String>, PragmaErr> {
187 tracing::trace!("pragma: file={handle:?}, pragma={pragma:?}");
188 if let FileHandle::VolFile(file) = handle {
189 GraftPragma::try_from(&pragma)?.eval(&self.runtime, file)
190 } else {
191 Err(PragmaErr::NotFound)
192 }
193 }
194
195 fn access(&self, path: &str, flags: AccessFlags) -> VfsResult<bool> {
196 tracing::trace!("access: path={path:?}; flags={flags:?}");
197 ErrCtx::wrap(move || {
198 if let Ok(vid) = path.parse::<VolumeId>() {
199 Ok(self.runtime.volume_exists(vid).or_into_ctx()?)
200 } else {
201 Ok(false)
202 }
203 })
204 }
205
206 fn open(&self, path: Option<&str>, opts: OpenOpts) -> VfsResult<Self::Handle> {
207 tracing::trace!("open: path={path:?}, opts={opts:?}");
208 ErrCtx::wrap(move || {
209 if opts.kind() == OpenKind::MainDb {
211 if let Some(path) = path {
212 let vid: VolumeId = path.parse()?;
213
214 let reserved_lock = self.locks.lock().entry(vid.clone()).or_default().clone();
216
217 let handle = self
218 .runtime
219 .open_volume(&vid, VolumeConfig::new(SyncDirection::Both))
220 .or_into_ctx()?;
221 return Ok(VolFile::new(handle, opts, reserved_lock).into());
222 }
223 }
224
225 Ok(MemFile::default().into())
227 })
228 }
229
230 fn close(&self, handle: Self::Handle) -> VfsResult<()> {
231 tracing::trace!("close: file={handle:?}");
232 ErrCtx::wrap(move || {
233 match handle {
234 FileHandle::MemFile(_) => Ok(()),
235 FileHandle::VolFile(vol_file) => {
236 if vol_file.opts().delete_on_close() {
237 self.runtime
239 .update_volume_config(vol_file.handle().vid(), |conf| {
240 conf.with_sync(SyncDirection::Disabled)
241 })
242 .or_into_ctx()?;
243 }
244
245 let handle = vol_file.close();
247
248 let mut locks = self.locks.lock();
249 let reserved_lock = locks
250 .get(handle.vid())
251 .expect("reserved lock missing from lock manager");
252
253 if Arc::strong_count(reserved_lock) == 1 {
258 locks.remove(handle.vid());
259 }
260
261 Ok(())
262 }
263 }
264 })
265 }
266
267 fn delete(&self, path: &str) -> VfsResult<()> {
268 tracing::trace!("delete: path={path:?}");
269 ErrCtx::wrap(|| {
270 if let Ok(vid) = path.parse() {
271 self.runtime
273 .update_volume_config(&vid, |conf| conf.with_sync(SyncDirection::Disabled))
274 .or_into_ctx()?;
275 }
276 Ok(())
277 })
278 }
279
280 fn lock(&self, handle: &mut Self::Handle, level: LockLevel) -> VfsResult<()> {
281 tracing::trace!("lock: file={handle:?}, level={level:?}");
282 ErrCtx::wrap(move || handle.lock(level))
283 }
284
285 fn unlock(&self, handle: &mut Self::Handle, level: LockLevel) -> VfsResult<()> {
286 tracing::trace!("unlock: file={handle:?}, level={level:?}");
287 ErrCtx::wrap(move || handle.unlock(level))
288 }
289
290 fn file_size(&self, handle: &mut Self::Handle) -> VfsResult<usize> {
291 tracing::trace!("file_size: handle={handle:?}");
292 ErrCtx::wrap(move || handle.file_size())
293 }
294
295 fn truncate(&self, handle: &mut Self::Handle, size: usize) -> VfsResult<()> {
296 tracing::trace!("truncate: handle={handle:?}, size={size}");
297 ErrCtx::wrap(move || handle.truncate(size))
298 }
299
300 fn write(&self, handle: &mut Self::Handle, offset: usize, data: &[u8]) -> VfsResult<usize> {
301 tracing::trace!(
302 "write: handle={handle:?}, offset={offset}, len={}",
303 data.len()
304 );
305 ErrCtx::wrap(move || handle.write(offset, data))
306 }
307
308 fn read(&self, handle: &mut Self::Handle, offset: usize, data: &mut [u8]) -> VfsResult<usize> {
309 tracing::trace!(
310 "read: handle={handle:?}, offset={offset}, len={}",
311 data.len()
312 );
313 ErrCtx::wrap(move || handle.read(offset, data))
314 }
315}