possum/
handle.rs

1use rusqlite::{TransactionBehavior, TransactionState};
2
3use super::*;
4
/// Per-handle limits, set with `Handle::set_instance_limits`.
///
/// `#[repr(C)]`: field order and types are part of this struct's layout, so don't reorder or
/// retype fields without checking every place that relies on that layout.
#[derive(Default, Debug)]
#[repr(C)]
pub struct Limits {
    /// Optional cap on the summed length of stored values. The derived `Default` is `None`.
    /// NOTE(review): presumably `None` means "no limit"; the enforcement is in `apply_limits`,
    /// which is not shown here — confirm there.
    pub max_value_length_sum: Option<u64>,
    /// Opt-out flag for hole punching. The derived `Default` is `false`; it appears to be
    /// phrased negatively so that default keeps punching enabled.
    // Invert this logic when there are defaults and mutators.
    pub disable_hole_punching: bool,
}
12
/// Bounded sender that hands batches of deleted value locations to the value puncher thread
/// (used by `Handle::send_values_for_delete`).
type DeletedValuesSender = sync::mpsc::SyncSender<Vec<NonzeroValueLocation>>;
14
/// Provides access to a storage directory. Manages manifest access, file cloning, file writers,
/// configuration, value eviction etc.
#[derive(Debug)]
pub struct Handle {
    /// Connection to the manifest database inside `dir`. Transactions started through this
    /// handle keep the mutex locked for as long as they live.
    pub(crate) conn: Mutex<Connection>,
    /// Pool of exclusive values files, keyed by id, that `get_exclusive_file` hands out before
    /// opening existing files or creating new ones.
    pub(crate) exclusive_files: Mutex<HashMap<FileId, ExclusiveFile>>,
    /// The storage directory this handle manages.
    pub(crate) dir: Dir,
    /// File clone cache. NOTE(review): not used directly in this file — see its users.
    pub(crate) clones: Mutex<FileCloneCache>,
    /// Limits set via `set_instance_limits`; `Default` on construction.
    pub(crate) instance_limits: Limits,
    /// Sends deleted value locations to the value puncher thread. Set to `Some` in `new`.
    deleted_values: Option<DeletedValuesSender>,
    /// The value puncher thread. Intentionally never joined; it keeps running until it has
    /// punched everything it was sent.
    _value_puncher: Option<thread::JoinHandle<()>>,
    /// Lets callers wait for the value puncher thread to exit.
    value_puncher_done: ValuePuncherDone,
}
28
/// Manifest schema version, kept in the 4-byte `user_version` field of the SQLite database
/// header: https://sqlite.org/fileformat2.html#database_header.
type ManifestUserVersion = u32;
31
32impl Handle {
33    /// Whether file cloning should be attempted.
34    pub fn dir_supports_file_cloning(&self) -> bool {
35        self.dir.supports_file_cloning()
36    }
37
38    pub fn set_instance_limits(&mut self, limits: Limits) -> Result<()> {
39        self.instance_limits = limits;
40        self.start_deferred_transaction()?.apply_limits()
41    }
42
43    pub fn dir(&self) -> &Path {
44        self.dir.as_ref()
45    }
46
47    pub(crate) fn get_exclusive_file(&self) -> Result<ExclusiveFile> {
48        {
49            let mut files = self.exclusive_files.lock().unwrap();
50            // How do we avoid cloning the key and skipping the unnecessary remove check? Do we need a
51            // pop method on HashMap?
52            if let Some(id) = files.keys().next().cloned() {
53                let file = files.remove(&id).unwrap();
54                debug_assert_eq!(id, file.id);
55                debug!("using exclusive file {} from handle", &file.id);
56                return Ok(file);
57            }
58        }
59        trace!("about to open existing files");
60        if let Some(file) = self.open_existing_exclusive_file()? {
61            debug!("opened existing values file {}", file.id);
62            return Ok(file);
63        }
64        trace!("here");
65        let ret = ExclusiveFile::new(&self.dir);
66        if let Ok(file) = &ret {
67            debug!("created new exclusive file {}", file.id);
68        }
69        ret
70    }
71
72    fn open_existing_exclusive_file(&self) -> Result<Option<ExclusiveFile>> {
73        for res in read_dir(&self.dir)? {
74            let entry = res?;
75            if !entry.file_type()?.is_file() {
76                continue;
77            }
78            if !ExclusiveFile::valid_file_name(entry.file_name().to_str().unwrap()) {
79                continue;
80            }
81            let path = entry.path();
82            debug!(?path, "opening existing file");
83            match ExclusiveFile::open(path.clone()) {
84                Ok(ef) => return Ok(ef),
85                Err(err) => {
86                    debug!(?path, ?err, "open");
87                }
88            }
89        }
90        Ok(None)
91    }
92
93    // Expected manifest sqlite user version field value.
94    const USER_VERSION: u32 = 3;
95
96    pub fn new(dir: PathBuf) -> Result<Self> {
97        let sqlite_version = rusqlite::version_number();
98        // TODO: Why?
99        if sqlite_version < 3042000 {
100            bail!(
101                "sqlite version {} below minimum {}",
102                rusqlite::version(),
103                "3.42"
104            );
105        }
106        let dir = Dir::new(dir).context("new Dir")?;
107        let mut conn = Connection::open(dir.path().join(MANIFEST_DB_FILE_NAME))?;
108        Self::init_sqlite_conn(&mut conn, &dir)?;
109        let (deleted_values, receiver) = sync::mpsc::sync_channel(10);
110        let (value_puncher_done_sender, value_puncher_done) = sync::mpsc::sync_channel(0);
111        let value_puncher_done = ValuePuncherDone(Arc::new(Mutex::new(value_puncher_done)));
112        let handle = Self {
113            conn: Mutex::new(conn),
114            exclusive_files: Default::default(),
115            dir: dir.clone(),
116            clones: Default::default(),
117            instance_limits: Default::default(),
118            deleted_values: Some(deleted_values),
119            // Don't wait on this, at least in the Drop handler, because it stays alive until it
120            // succeeds in punching everything.
121            _value_puncher: Some(thread::spawn(move || -> () {
122                let _value_puncher_done_sender = value_puncher_done_sender;
123                if let Err(err) = Self::value_puncher(dir, receiver) {
124                    error!("value puncher thread failed with {err:?}");
125                }
126            })),
127            value_puncher_done,
128        };
129        Ok(handle)
130    }
131
132    fn retry_while_busy<T>(mut f: impl FnMut() -> rusqlite::Result<T>) -> rusqlite::Result<T> {
133        loop {
134            match f() {
135                Err(rusqlite::Error::SqliteFailure(err, _))
136                    if err.code == rusqlite::ErrorCode::DatabaseBusy =>
137                {
138                    std::thread::sleep(Duration::from_secs(1));
139                }
140                default => return default,
141            }
142        }
143    }
144
145    fn init_sqlite_conn(conn: &mut Connection, dir: &Dir) -> anyhow::Result<()> {
146        Self::retry_while_busy(|| conn.pragma_update(None, "synchronous", "off"))?;
147
148        let get_user_version = |conn: &Connection| -> Result<ManifestUserVersion, _> {
149            conn.pragma_query_value(None, "user_version", |row| row.get(0))
150        };
151        let user_version: ManifestUserVersion = get_user_version(conn)?;
152        // I'm not sure about the greater than case. Should we return an error?
153        if user_version >= Self::USER_VERSION {
154            return Ok(());
155        }
156        // This initialization/upgrade process doesn't seem to be safe to perform when there's
157        // multiple connections to a database. To avoid issues upgrading databases, you might want
158        // to create a single synchronous handle to do the upgrade before creating new handles
159        // concurrently.
160
161        // If the journal mode is WAL, we seem to get database locking errors even when we think we
162        // have the lock.
163        conn.pragma_update(None, "journal_mode", "delete")?;
164        // After the next read/write, we should be the only ones working on the database. Since we
165        // can't use transactions there's no other choice.
166        conn.pragma_update(None, "locking_mode", "exclusive")?;
167        let user_version = get_user_version(conn)?;
168        if user_version < Self::USER_VERSION {
169            use rusqlite::config::DbConfig::SQLITE_DBCONFIG_RESET_DATABASE;
170            conn.set_db_config(SQLITE_DBCONFIG_RESET_DATABASE, true)?;
171            // This can't be done in a transaction, an exclusive one would have been nice.
172            conn.execute("vacuum", [])?;
173            conn.set_db_config(SQLITE_DBCONFIG_RESET_DATABASE, false)?;
174            Self::delete_all_values_files(dir)?;
175            // It might be possible special case doing this from user_version=0 (a brand new database).
176            init_manifest_schema(conn)?;
177            conn.pragma_update(None, "user_version", Self::USER_VERSION)?;
178        }
179        let mode: String =
180            conn.pragma_update_and_check(None, "locking_mode", "normal", |row| row.get(0))?;
181        assert_eq!(mode, "normal");
182        // This is sticky and sync-safe. Must access the database after setting the locking mode to
183        // make the change, so now is a good time to set the journal mode. I think if WAL is already
184        // set, this doesn't change the locking_mode.
185        conn.pragma_update(None, "journal_mode", "wal")?;
186        Ok(())
187    }
188
189    /// Delete all values files, ensuring they're not in use first.
190    fn delete_all_values_files(dir: &Dir) -> anyhow::Result<()> {
191        for entry in dir.walk_dir()? {
192            let path = &entry.path;
193            if !matches!(entry.entry_type, EntryType::ValuesFile) {
194                continue;
195            }
196            let file = OpenOptions::new().write(true).open(path)?;
197            if !file.lock_max_segment(LockExclusiveNonblock)? {
198                warn!(?path, "file for deletion is locked. blocking");
199                assert!(file.lock_max_segment(LockExclusive)?);
200            }
201            debug!(?path, "deleting file");
202            remove_file(path)?;
203        }
204        Ok(())
205    }
206
207    pub fn cleanup_snapshots(&self) -> PubResult<()> {
208        delete_unused_snapshots(self.dir.path()).map_err(Into::into)
209    }
210
211    pub fn block_size(&self) -> u64 {
212        self.dir.block_size()
213    }
214
215    pub fn new_writer(&self) -> Result<BatchWriter<&Handle>> {
216        Ok(BatchWriter::new(self))
217    }
218
219    pub(crate) fn start_immediate_transaction(&self) -> rusqlite::Result<OwnedTx> {
220        self.start_writable_transaction_with_behaviour(TransactionBehavior::Immediate)
221    }
222
223    pub(crate) fn start_writable_transaction_with_behaviour(
224        &self,
225        behaviour: TransactionBehavior,
226    ) -> rusqlite::Result<OwnedTx> {
227        Ok(self
228            .start_transaction(|conn, handle| {
229                let tx_res = run_blocking(|| {
230                    // We're holding the write lock around the Connection, I think we're safe to
231                    // pass it to another thread. For some reason the return type, the
232                    // rusqlite::Transaction is what spits the dummy. It doesn't implement Send,
233                    // even though it only has a reference to Connection internally. So we put it
234                    // inside CanSend to return it from the thread then pop it out.
235                    conn.transaction_with_behavior(behaviour).map(CanSend)
236                });
237                let rtx = tx_res?.0;
238                Ok(Transaction::new(rtx, handle))
239            })?
240            .into())
241    }
242
243    /// Starts a deferred transaction (the default). There is no guaranteed read-only transaction
244    /// mode. There might be pragmas that can limit to read only statements.
245    pub fn start_deferred_transaction_for_read(&self) -> rusqlite::Result<OwnedReadTx> {
246        Ok(self
247            .start_transaction(|conn, _handle| {
248                let rtx = conn.transaction_with_behavior(TransactionBehavior::Deferred)?;
249                Ok(ReadTransactionOwned(rtx))
250            })?
251            .into())
252    }
253
254    /// Starts a deferred transaction (the default). This might upgrade to a write transaction if
255    /// appropriate. I'm not sure about the semantics of doing that yet. This might be useful for
256    /// operations that become writes depending on certain conditions, but could violate some
257    /// expectations around locking. TBD.
258    pub(crate) fn start_deferred_transaction(&self) -> rusqlite::Result<OwnedTx> {
259        self.start_writable_transaction_with_behaviour(TransactionBehavior::Deferred)
260    }
261
262    /// Begins a read transaction.
263    pub fn read(&self) -> rusqlite::Result<Reader<OwnedTx>> {
264        let reader = Reader {
265            owned_tx: self
266                .start_writable_transaction_with_behaviour(TransactionBehavior::Immediate)?,
267            reads: Default::default(),
268        };
269        Ok(reader)
270    }
271
272    pub fn read_single(&self, key: &[u8]) -> Result<Option<SnapshotValue<Value>>> {
273        let mut reader = self.read()?;
274        let Some(value) = reader.add(key)? else {
275            return Ok(None);
276        };
277        let snapshot = reader.begin()?;
278        Ok(Some(snapshot.value(value)))
279    }
280
281    pub fn single_write_from(
282        &self,
283        key: Vec<u8>,
284        r: impl Read,
285    ) -> Result<(u64, WriteCommitResult)> {
286        let mut writer = self.new_writer()?;
287        let mut value = writer.new_value().begin()?;
288        trace!("got value writer");
289        let n = value.copy_from(r)?;
290        writer.stage_write(key, value)?;
291        let commit = writer.commit()?;
292        Ok((n, commit))
293    }
294
295    pub fn single_delete(&self, key: &[u8]) -> PubResult<Option<c_api::PossumStat>> {
296        let mut tx = self.start_deferred_transaction()?;
297        let deleted = tx.delete_key(key)?;
298        // Maybe it's okay just to commit anyway, since we have a deferred transaction and sqlite
299        // might know nothing has changed.
300        if deleted.is_some() {
301            tx.commit()?.complete();
302        }
303        Ok(deleted)
304    }
305
306    pub fn clone_from_file(&mut self, key: Vec<u8>, file: &mut File) -> Result<u64> {
307        let mut writer = self.new_writer()?;
308        let mut value = writer.new_value().clone_file(file)?;
309        let n = value.value_length()?;
310        writer.stage_write(key, value)?;
311        writer.commit()?;
312        Ok(n)
313    }
314
315    pub fn rename_item(&mut self, from: &[u8], to: &[u8]) -> PubResult<Timestamp> {
316        let mut tx = self.start_immediate_transaction()?;
317        let last_used = tx.rename_item(from, to)?;
318        tx.commit()?.complete();
319        Ok(last_used)
320    }
321
322    /// Walks the underlying files in the possum directory.
323    pub fn walk_dir(&self) -> Result<Vec<walk::Entry>> {
324        crate::walk::walk_dir(&self.dir)
325    }
326
327    pub fn list_items(&self, prefix: &[u8]) -> PubResult<Vec<Item>> {
328        self.start_deferred_transaction_for_read()?
329            .list_items(prefix)
330    }
331
332    /// Punches values in batches with its own dedicated connection and read-only transactions.
333    fn value_puncher(
334        dir: Dir,
335        values_receiver: sync::mpsc::Receiver<Vec<NonzeroValueLocation>>,
336    ) -> Result<()> {
337        let manifest_path = dir.path().join(MANIFEST_DB_FILE_NAME);
338        use rusqlite::OpenFlags;
339        let mut conn = Connection::open_with_flags(
340            manifest_path,
341            OpenFlags::SQLITE_OPEN_READ_ONLY
342                | OpenFlags::SQLITE_OPEN_NO_MUTEX
343                | OpenFlags::SQLITE_OPEN_URI,
344        )?;
345        const RETRY_DURATION: Duration = Duration::from_secs(1);
346        let mut pending_values: Vec<_> = Default::default();
347        let mut values_receiver_opt = Some(values_receiver);
348        while values_receiver_opt.is_some() || !pending_values.is_empty() {
349            match &values_receiver_opt {
350                Some(values_receiver) => {
351                    let timeout = if pending_values.is_empty() {
352                        Duration::MAX
353                    } else {
354                        RETRY_DURATION
355                    };
356                    let recv_result = values_receiver.recv_timeout(timeout);
357                    use std::sync::mpsc::RecvTimeoutError;
358                    match recv_result {
359                        Ok(mut values) => {
360                            pending_values.append(&mut values);
361                            // Drain the channel
362                            while let Ok(more_values) = values_receiver.try_recv() {
363                                pending_values.extend(more_values);
364                            }
365                        }
366                        Err(RecvTimeoutError::Timeout) => {}
367                        Err(RecvTimeoutError::Disconnected) => {
368                            // Don't try receiving again.
369                            values_receiver_opt = None;
370                        }
371                    }
372                }
373                None => {
374                    std::thread::sleep(RETRY_DURATION);
375                }
376            }
377            let tx = conn.transaction_with_behavior(TransactionBehavior::Deferred)?;
378            let tx = ReadTransactionOwned(tx);
379            pending_values = Self::punch_values(&dir, pending_values, &tx)?;
380            debug_assert_ne!(tx.0.transaction_state(None)?, TransactionState::Write);
381        }
382        Ok(())
383    }
384
385    /// Starts a read transaction to determine punch boundaries. Since punching is never expanded to
386    /// offsets above the targeted values, ongoing writes should not be affected.
387    pub(crate) fn punch_values(
388        dir: &Dir,
389        values: Vec<NonzeroValueLocation>,
390        transaction: &ReadTransactionOwned,
391    ) -> PubResult<Vec<NonzeroValueLocation>> {
392        let mut failed = Vec::with_capacity(values.len());
393        for v in values {
394            let NonzeroValueLocation {
395                file_id,
396                file_offset,
397                length,
398            } = &v;
399            let value_length = length;
400            let msg = format!(
401                "deleting value at {:?} {} {}",
402                file_id, file_offset, value_length
403            );
404            debug!("{}", msg);
405            // self.handle.clones.lock().unwrap().remove(&file_id);
406            if !punch_value(PunchValueOptions {
407                dir: dir.path(),
408                file_id,
409                offset: *file_offset,
410                length: *value_length,
411                tx: transaction,
412                block_size: dir.block_size(),
413                constraints: Default::default(),
414            })
415            .context(msg)?
416            {
417                failed.push(v);
418            }
419        }
420        Ok(failed)
421    }
422
423    pub(crate) fn send_values_for_delete(&self, values: Vec<NonzeroValueLocation>) {
424        use std::sync::mpsc::TrySendError::*;
425        let sender = self.deleted_values.as_ref().unwrap();
426        match sender.try_send(values) {
427            Ok(()) => (),
428            Err(Disconnected(values)) => {
429                error!("sending {values:?}: channel disconnected");
430            }
431            Err(Full(values)) => {
432                warn!("channel full while sending values. blocking.");
433                sender.send(values).unwrap()
434            }
435        }
436    }
437
438    /// Returns something that can be used to test if the value puncher routine for this Handle has returned.
439    pub fn get_value_puncher_done(&self) -> ValuePuncherDone {
440        ValuePuncherDone(Arc::clone(&self.value_puncher_done.0))
441    }
442
443    pub fn move_prefix(&self, from: &[u8], to: &[u8]) -> Result<()> {
444        let mut tx = self.start_deferred_transaction()?;
445        let items = tx.list_items(from)?;
446        let mut to_vec = to.to_vec();
447        for item in items {
448            to_vec.truncate(to.len());
449            to_vec.extend_from_slice(item.key.strip_prefix(from).unwrap());
450            tx.rename_item(&item.key, &to_vec)?;
451        }
452        tx.commit()?.complete();
453        Ok(())
454    }
455
456    pub fn delete_prefix(&self, prefix: impl AsRef<[u8]>) -> PubResult<()> {
457        let mut tx = self.start_deferred_transaction()?;
458        for item in tx.list_items(prefix.as_ref())? {
459            tx.delete_key(&item.key)?;
460        }
461        tx.commit()?.complete();
462        Ok(())
463    }
464}
465
466use item::Item;
467
468use crate::c_api::{PossumHandle, PossumHandleRc};
469use crate::dir::Dir;
470use crate::owned_cell::{MutOwnedCell, OwnedCell};
471use crate::ownedtx::{OwnedReadTx, OwnedTxInner};
472use crate::tx::ReadTransaction;
473use crate::walk::EntryType;
474
475impl Drop for Handle {
476    fn drop(&mut self) {
477        // self.deleted_values.take();
478        // if let Some(join_handle) = self.value_puncher.take() {
479        //     join_handle.thread().unpark();
480        //     join_handle.join().unwrap()
481        // }
482    }
483}
484
/// Waiter for a `Handle`'s value puncher thread.
///
/// Wraps the receiving end of a channel whose sender is owned by the puncher thread. Nothing is
/// ever sent on it, so it disconnects exactly when that thread finishes.
#[derive(Debug)]
pub struct ValuePuncherDone(Arc<Mutex<sync::mpsc::Receiver<()>>>);

impl ValuePuncherDone {
    /// Blocks until the value puncher thread has exited (immediately if it already has).
    ///
    /// All clones share one receiver, so concurrent waiters take turns on its mutex.
    ///
    /// # Panics
    ///
    /// Panics if a message is ever received, or if the mutex is poisoned.
    pub fn wait(&self) {
        let receiver = self.0.lock().unwrap();
        let disconnected = receiver.recv().is_err();
        assert!(disconnected);
    }
}
496
/// Starts a transaction on a handle's manifest connection. The transaction is bundled with
/// whatever owns the handle and the connection lock.
///
/// `T` is the transaction type produced by `make_tx`. `'h` is the lifetime of the borrowed
/// handle, and therefore of the locked connection.
pub(crate) trait StartTransaction<'h, T> {
    /// Owner that keeps the lock guard(s) alive together with the transaction `T`.
    type Owned;
    /// Handle value passed to `make_tx` alongside the connection.
    type TxHandle;
    /// Locks the connection and builds the transaction with `make_tx`.
    fn start_transaction(
        self,
        // This part allows returning different transaction wrappers.
        make_tx: impl FnOnce(&'h mut Connection, Self::TxHandle) -> rusqlite::Result<T>,
    ) -> rusqlite::Result<Self::Owned>;
}
507
508impl<'h, T> StartTransaction<'h, T> for &'h Handle {
509    type Owned = OwnedTxInner<'h, T>;
510    type TxHandle = &'h Handle;
511    fn start_transaction(
512        self,
513        make_tx: impl FnOnce(&'h mut Connection, Self::TxHandle) -> rusqlite::Result<T>,
514    ) -> rusqlite::Result<Self::Owned> {
515        let guard = self.conn.lock().unwrap();
516        MutOwnedCell::try_make(guard, |conn| make_tx(conn, self))
517    }
518}
519
/// Transactions through a shared `PossumHandleRc`.
///
/// The result nests three owners:
/// 1. the `PossumHandleRc` itself;
/// 2. an `Rc` around a read guard on the handle, shared so `make_tx` also receives it;
/// 3. the connection's mutex guard, which owns the transaction `T`.
///
/// The handle therefore stays read-locked, and the connection locked, for as long as the
/// transaction lives.
impl<'h, T> StartTransaction<'h, T> for PossumHandleRc {
    type Owned = OwnedCell<
        Self,
        OwnedCell<Rc<RwLockReadGuard<'h, Handle>>, MutOwnedCell<MutexGuard<'h, Connection>, T>>,
    >;
    type TxHandle = Rc<RwLockReadGuard<'h, Handle>>;
    // type TxHandle = &'h Handle;
    fn start_transaction(
        self,
        make_tx: impl FnOnce(&'h mut Connection, Self::TxHandle) -> rusqlite::Result<T>,
    ) -> rusqlite::Result<Self::Owned> {
        OwnedCell::try_make(self, |handle_lock| {
            // One read guard, shared between the middle owner and `make_tx`.
            let handle_guard = Rc::new(handle_lock.read().unwrap());
            OwnedCell::try_make(handle_guard.clone(), |handle| {
                MutOwnedCell::try_make(handle.conn.lock().unwrap(), |conn| {
                    make_tx(conn, handle_guard)
                })
            })
        })
    }
}
541
/// Runs a closure with a borrowed [`Handle`], abstracting over how the handle is held.
pub trait WithHandle {
    /// Calls `f` with the handle and returns its result.
    fn with_handle<R>(&self, f: impl FnOnce(&Handle) -> R) -> R;
}
545
546impl WithHandle for &Handle {
547    fn with_handle<R>(&self, f: impl FnOnce(&Handle) -> R) -> R {
548        f(self)
549    }
550}
551
552impl WithHandle for PossumHandle {
553    fn with_handle<R>(&self, f: impl FnOnce(&Handle) -> R) -> R {
554        f(&self.read().unwrap())
555    }
556}
557
558impl AsRef<Handle> for Handle {
559    fn as_ref(&self) -> &Handle {
560        self
561    }
562}
563
564impl AsRef<Handle> for Rc<RwLockReadGuard<'_, Handle>> {
565    fn as_ref(&self) -> &Handle {
566        self.deref()
567    }
568}
569
/// Wrapper that asserts its contents may be moved to another thread.
///
/// Its only use in this file is moving a `rusqlite::Transaction` out of the `run_blocking`
/// closure in `Handle::start_writable_transaction_with_behaviour`.
struct CanSend<T>(T);

// SAFETY: This impl is unconditional, so it is only justified by the single use above. There,
// the wrapped `rusqlite::Transaction` borrows a `Connection` whose mutex guard the caller holds
// for the whole operation, so no other thread can use that connection concurrently.
// NOTE(review): this is NOT sound for arbitrary `T` (e.g. `Rc`, or a connection nobody has
// locked). Keep the type private and don't reuse it elsewhere.
unsafe impl<T> Send for CanSend<T> {}