1#![allow(unused)]
3
4use std::{
5 collections::{HashMap, HashSet},
6 fmt::Debug,
7 io::ErrorKind,
8 sync::Arc,
9};
10
11use culprit::{Culprit, ResultExt};
12use graft_client::{
13 ClientErr,
14 runtime::{
15 runtime::Runtime,
16 storage::{
17 StorageErr,
18 volume_state::{SyncDirection, VolumeConfig},
19 },
20 },
21};
22use graft_core::{VolumeId, gid::GidParseErr};
23use graft_tracing::TracingConsumer;
24use parking_lot::Mutex;
25use sqlite_plugin::{
26 flags::{AccessFlags, LockLevel, OpenKind, OpenOpts},
27 logger::{SqliteLogLevel, SqliteLogger},
28 vars::{
29 self, SQLITE_BUSY, SQLITE_BUSY_SNAPSHOT, SQLITE_CANTOPEN, SQLITE_INTERNAL, SQLITE_IOERR,
30 SQLITE_IOERR_ACCESS, SQLITE_NOTFOUND, SQLITE_READONLY,
31 },
32 vfs::{Pragma, PragmaErr, SqliteErr, Vfs, VfsHandle, VfsResult},
33};
34use thiserror::Error;
35use tryiter::TryIteratorExt;
36
37use crate::{
38 file::{FileHandle, VfsFile, mem_file::MemFile, vol_file::VolFile},
39 pragma::GraftPragma,
40};
41
42#[derive(Debug, Error)]
43pub enum ErrCtx {
44 #[error("Graft client error: {0}")]
45 Client(#[from] ClientErr),
46
47 #[error("Failed to parse VolumeId: {0}")]
48 GidParseErr(#[from] GidParseErr),
49
50 #[error("Unknown Pragma")]
51 UnknownPragma,
52
53 #[error("Cant open Volume")]
54 CantOpen,
55
56 #[error("Transaction is busy")]
57 Busy,
58
59 #[error("The transaction snapshot is no longer current")]
60 BusySnapshot,
61
62 #[error("Invalid lock transition")]
63 InvalidLockTransition,
64
65 #[error("Invalid volume state")]
66 InvalidVolumeState,
67}
68
69impl ErrCtx {
70 #[inline]
71 fn wrap<T>(mut cb: impl FnOnce() -> culprit::Result<T, ErrCtx>) -> VfsResult<T> {
72 match cb() {
73 Ok(t) => Ok(t),
74 Err(err) => {
75 let code = match err.ctx() {
76 ErrCtx::UnknownPragma => SQLITE_NOTFOUND,
77 ErrCtx::CantOpen => SQLITE_CANTOPEN,
78 ErrCtx::Busy => SQLITE_BUSY,
79 ErrCtx::BusySnapshot => SQLITE_BUSY_SNAPSHOT,
80 ErrCtx::Client(err) => Self::map_client_err(err),
81 _ => SQLITE_INTERNAL,
82 };
83 if code == SQLITE_INTERNAL {
84 tracing::error!("{}", err);
85 }
86 Err(code)
87 }
88 }
89 }
90
91 fn map_client_err(err: &ClientErr) -> SqliteErr {
92 match err {
93 ClientErr::GraftErr(err) => {
94 if err.code().is_client() {
95 SQLITE_INTERNAL
96 } else {
97 SQLITE_IOERR
98 }
99 }
100 ClientErr::HttpErr(_) => SQLITE_IOERR,
101 ClientErr::StorageErr(store_err) => match store_err {
102 StorageErr::ConcurrentWrite => SQLITE_BUSY_SNAPSHOT,
103 StorageErr::FjallErr(err) => match Self::extract_ioerr(err) {
104 Some(_) => SQLITE_IOERR,
105 None => SQLITE_INTERNAL,
106 },
107 StorageErr::IoErr(err) => SQLITE_IOERR,
108 _ => SQLITE_INTERNAL,
109 },
110 ClientErr::IoErr(kind) => SQLITE_IOERR,
111 _ => SQLITE_INTERNAL,
112 }
113 }
114
115 fn extract_ioerr<'a>(
116 mut err: &'a (dyn std::error::Error + 'static),
117 ) -> Option<&'a std::io::Error> {
118 while let Some(source) = err.source() {
119 err = source;
120 }
121 err.downcast_ref::<std::io::Error>()
122 }
123}
124
125impl<T> From<ErrCtx> for culprit::Result<T, ErrCtx> {
126 fn from(err: ErrCtx) -> culprit::Result<T, ErrCtx> {
127 Err(Culprit::new(err))
128 }
129}
130
131pub struct GraftVfs {
132 runtime: Runtime,
133 locks: Mutex<HashMap<VolumeId, Arc<Mutex<()>>>>,
134}
135
136impl GraftVfs {
137 pub fn new(runtime: Runtime) -> Self {
138 Self { runtime, locks: Default::default() }
139 }
140}
141
142impl Vfs for GraftVfs {
143 type Handle = FileHandle;
144
145 fn register_logger(&self, logger: SqliteLogger) {
146 #[derive(Clone)]
147 struct Writer(Arc<Mutex<SqliteLogger>>);
148
149 impl std::io::Write for Writer {
150 #[inline]
151 fn write(&mut self, data: &[u8]) -> std::io::Result<usize> {
152 self.0.lock().log(SqliteLogLevel::Notice, data);
153 Ok(data.len())
154 }
155
156 #[inline]
157 fn flush(&mut self) -> std::io::Result<()> {
158 Ok(())
159 }
160 }
161
162 let writer = Writer(Arc::new(Mutex::new(logger)));
163 let make_writer = move || writer.clone();
164 graft_tracing::init_tracing_with_writer(TracingConsumer::Tool, None, make_writer);
165 }
166
167 fn canonical_path<'a>(
168 &self,
169 path: std::borrow::Cow<'a, str>,
170 ) -> VfsResult<std::borrow::Cow<'a, str>> {
171 if path == "random" {
172 Ok(VolumeId::random().pretty().into())
173 } else {
174 Ok(path)
175 }
176 }
177
178 fn pragma(
179 &self,
180 handle: &mut Self::Handle,
181 pragma: Pragma<'_>,
182 ) -> Result<Option<String>, PragmaErr> {
183 tracing::trace!("pragma: file={handle:?}, pragma={pragma:?}");
184 if let FileHandle::VolFile(file) = handle {
185 GraftPragma::try_from(&pragma)?.eval(&self.runtime, file)
186 } else {
187 Err(PragmaErr::NotFound)
188 }
189 }
190
191 fn access(&self, path: &str, flags: AccessFlags) -> VfsResult<bool> {
192 tracing::trace!("access: path={path:?}; flags={flags:?}");
193 ErrCtx::wrap(move || {
194 if let Ok(vid) = path.parse::<VolumeId>() {
195 Ok(self.runtime.volume_exists(vid).or_into_ctx()?)
196 } else {
197 Ok(false)
198 }
199 })
200 }
201
202 fn open(&self, path: Option<&str>, opts: OpenOpts) -> VfsResult<Self::Handle> {
203 tracing::trace!("open: path={path:?}, opts={opts:?}");
204 ErrCtx::wrap(move || {
205 if opts.kind() == OpenKind::MainDb {
207 if let Some(path) = path {
208 let vid: VolumeId = path.parse()?;
209
210 let reserved_lock = self.locks.lock().entry(vid.clone()).or_default().clone();
212
213 let handle = self
214 .runtime
215 .open_volume(&vid, VolumeConfig::new(SyncDirection::Both))
216 .or_into_ctx()?;
217 return Ok(VolFile::new(handle, opts, reserved_lock).into());
218 }
219 }
220
221 Ok(MemFile::default().into())
223 })
224 }
225
226 fn close(&self, handle: Self::Handle) -> VfsResult<()> {
227 tracing::trace!("close: file={handle:?}");
228 ErrCtx::wrap(move || {
229 match handle {
230 FileHandle::MemFile(_) => Ok(()),
231 FileHandle::VolFile(vol_file) => {
232 if vol_file.opts().delete_on_close() {
233 self.runtime
235 .update_volume_config(vol_file.handle().vid(), |conf| {
236 conf.with_sync(SyncDirection::Disabled)
237 })
238 .or_into_ctx()?;
239 }
240
241 let handle = vol_file.close();
243
244 let mut locks = self.locks.lock();
245 let reserved_lock = locks
246 .get(handle.vid())
247 .expect("reserved lock missing from lock manager");
248
249 if Arc::strong_count(reserved_lock) == 1 {
254 locks.remove(handle.vid());
255 }
256
257 Ok(())
258 }
259 }
260 })
261 }
262
263 fn delete(&self, path: &str) -> VfsResult<()> {
264 tracing::trace!("delete: path={path:?}");
265 ErrCtx::wrap(|| {
266 if let Ok(vid) = path.parse() {
267 self.runtime
269 .update_volume_config(&vid, |conf| conf.with_sync(SyncDirection::Disabled))
270 .or_into_ctx()?;
271 }
272 Ok(())
273 })
274 }
275
276 fn lock(&self, handle: &mut Self::Handle, level: LockLevel) -> VfsResult<()> {
277 tracing::trace!("lock: file={handle:?}, level={level:?}");
278 ErrCtx::wrap(move || handle.lock(level))
279 }
280
281 fn unlock(&self, handle: &mut Self::Handle, level: LockLevel) -> VfsResult<()> {
282 tracing::trace!("unlock: file={handle:?}, level={level:?}");
283 ErrCtx::wrap(move || handle.unlock(level))
284 }
285
286 fn file_size(&self, handle: &mut Self::Handle) -> VfsResult<usize> {
287 tracing::trace!("file_size: handle={handle:?}");
288 ErrCtx::wrap(move || handle.file_size())
289 }
290
291 fn truncate(&self, handle: &mut Self::Handle, size: usize) -> VfsResult<()> {
292 tracing::trace!("truncate: handle={handle:?}, size={size}");
293 ErrCtx::wrap(move || handle.truncate(size))
294 }
295
296 fn write(&self, handle: &mut Self::Handle, offset: usize, data: &[u8]) -> VfsResult<usize> {
297 tracing::trace!(
298 "write: handle={handle:?}, offset={offset}, len={}",
299 data.len()
300 );
301 ErrCtx::wrap(move || handle.write(offset, data))
302 }
303
304 fn read(&self, handle: &mut Self::Handle, offset: usize, data: &mut [u8]) -> VfsResult<usize> {
305 tracing::trace!(
306 "read: handle={handle:?}, offset={offset}, len={}",
307 data.len()
308 );
309 ErrCtx::wrap(move || handle.read(offset, data))
310 }
311}