//! possum/lib.rs — crate root.

1#![allow(clippy::unused_unit)]
2
3use std::borrow::Borrow;
4use std::cmp::min;
5use std::collections::{BTreeSet, HashMap, HashSet};
6use std::ffi::{OsStr, OsString};
7use std::fmt::{Debug, Display, Formatter};
8use std::fs::{read_dir, remove_dir, remove_file, File, OpenOptions};
9use std::io::SeekFrom::{End, Start};
10use std::io::{ErrorKind, Read, Seek, Write};
11use std::num::TryFromIntError;
12use std::ops::{Deref, DerefMut};
13use std::path::{Path, PathBuf};
14use std::rc::Rc;
15use std::sync::OnceLock;
16use std::time::Duration;
17use std::{fs, io, str};
18
19use anyhow::{anyhow, bail, Context, Result};
20use cfg_if::cfg_if;
21use chrono::NaiveDateTime;
22use env::flocking;
23pub use error::*;
24use exclusive_file::ExclusiveFile;
25use file_id::FileId;
26pub use handle::Handle;
27use memmap2::Mmap;
28use num::Integer;
29use ownedtx::OwnedTx;
30use positioned_io::ReadAt;
31use rand::Rng;
32use rusqlite::types::{FromSql, FromSqlError, FromSqlResult, ToSql, ToSqlOutput, ValueRef};
33use rusqlite::Error::QueryReturnedNoRows;
34use rusqlite::{params, CachedStatement, Connection, Statement, TransactionBehavior};
35use stable_deref_trait::StableDeref;
36use sys::*;
37use tempfile::TempDir;
38#[cfg(test)]
39pub use test_log::test;
40use tracing::*;
41use ErrorKind::InvalidInput;
42
43use crate::item::Item;
44use crate::walk::walk_dir;
45use crate::ValueLocation::{Nonzero, ZeroLength};
46
47mod c_api;
48mod cpathbuf;
49mod dir;
50mod error;
51mod exclusive_file;
52mod file_id;
53pub(crate) mod handle;
54mod item;
55mod owned_cell;
56pub mod sys;
57#[cfg(feature = "testing")]
58pub mod testing;
59#[cfg(test)]
60mod tests;
61mod tx;
62pub use tx::*;
63mod ownedtx;
64pub mod walk;
65pub use dir::*;
66pub mod env;
67mod reader;
68use reader::Reader;
69
70// Concurrency related stuff that's replaced by loom or shuttle.
71pub mod concurrency;
72use concurrency::*;
73
74use self::concurrency::sync::{Arc, Mutex, MutexGuard, RwLock, RwLockReadGuard};
75use crate::handle::WithHandle;
76
77/// Type to be exposed eventually from the lib instead of anyhow. Should be useful for the C API.
78pub type PubResult<T> = Result<T, Error>;
79
/// A snapshot's private copy (clone) of a values file, plus a lazily created mmap.
#[derive(Debug)]
struct FileClone {
    file: File,
    /// This exists to destroy clone tempdirs after they become empty.
    #[allow(unused)]
    tempdir: Option<Arc<TempDir>>,
    // Lazily initialized by `get_mmap`.
    mmap: Option<Mmap>,
    // Length of the cloned data; also used as the mmap length.
    len: u64,
}

/// Cache of file clones keyed by file id.
type FileCloneCache = HashMap<FileId, Arc<Mutex<FileClone>>>;
91
impl FileClone {
    /// Lazily memory-maps the clone (copy-on-write, read-only) and caches the mapping;
    /// subsequent calls return the cached map.
    fn get_mmap(&mut self) -> io::Result<&Mmap> {
        let mmap_opt = &mut self.mmap;
        if let Some(mmap) = mmap_opt {
            return Ok(mmap);
        }
        // SAFETY-NOTE(review): mapping is unsound if another process mutates the file
        // concurrently — presumably snapshot clones are immutable once created; confirm.
        let mmap = unsafe {
            memmap2::MmapOptions::new()
                .len(self.len.try_into().unwrap())
                .map_copy_read_only(&self.file)
        }?;
        assert_eq!(mmap.len() as u64, self.len);
        Ok(mmap_opt.insert(mmap))
    }
}
107
/// A staged key/value write: the value bytes are already on disk, pending a manifest insert.
#[derive(Debug)]
struct PendingWrite {
    key: Vec<u8>,
    // Offset of the value within its values file.
    value_file_offset: u64,
    value_length: u64,
    // Id of the file holding the value bytes.
    value_file_id: FileId,
}
115
/// SQL that creates the manifest database schema, bundled at compile time.
const MANIFEST_SCHEMA_SQL: &str = include_str!("../manifest.sql");

/// Executes the bundled manifest schema SQL against `conn`.
fn init_manifest_schema(conn: &rusqlite::Connection) -> rusqlite::Result<()> {
    conn.execute_batch(MANIFEST_SCHEMA_SQL)
}
121
/// The start of a write, before an exclusive file has been allocated. This allows for bringing your
/// own file, such as by rename, or file clone.
pub struct BeginWriteValue<'writer, H>
where
    H: WithHandle,
{
    // The batch this value write will be staged into.
    batch: &'writer mut BatchWriter<H>,
}
130
impl<H> BeginWriteValue<'_, H>
where
    H: WithHandle,
{
    // TODO: On Linux and Windows, this should be possible without creating a new file. I'm not sure
    // if it's worth it however, since cloned blocks have to be a of a minimum size and alignment.
    // See also
    // https://stackoverflow.com/questions/65505765/difference-of-ficlone-vs-ficlonerange-vs-copy-file-range-for-copy-on-write-supp
    // for a discussion on efficient ways to copy values that could be supported.
    /// Clone an entire file in. If cloning fails, this will fall back to copying the provided file.
    /// Its file position may be altered.
    pub fn clone_file(self, file: &mut File) -> PubResult<ValueWriter> {
        // If the dir doesn't support cloning at all, go straight to copying.
        if !self
            .batch
            .handle
            .with_handle(Handle::dir_supports_file_cloning)
        {
            return self.copy_file(file);
        }
        // Retry with fresh random file ids until the clone lands on a non-colliding name.
        let dst_path = loop {
            let dst_path = self
                .batch
                .handle
                .with_handle(|handle| handle.dir.path().join(FileId::random().values_file_path()));
            match fclonefile_noflags(file, &dst_path) {
                // Cloning isn't actually supported here (e.g. filesystem lacks reflinks).
                Err(err) if CloneFileError::is_unsupported(&err) => {
                    return self.copy_file(file);
                }
                // Random name collision: pick another id.
                Err(err) if err.is_file_already_exists() => continue,
                Err(err) => return Err(err.into()),
                Ok(()) => break dst_path,
            }
        };
        // Should we delete this file if we fail to open it exclusively? I think it is possible that
        // someone else could open it before us. In that case we probably want to punch out the part
        // we cloned and move on.
        // NOTE(review): `unwrap` assumes nobody grabbed the exclusive lock on the freshly
        // cloned file before us — TODO confirm this can't race.
        let exclusive_file = ExclusiveFile::open(dst_path)?.unwrap();
        Ok(ValueWriter {
            exclusive_file,
            value_file_offset: 0,
        })
    }

    /// Assigns an exclusive file for writing, and copies the entire source file.
    fn copy_file(self, file: &mut File) -> PubResult<ValueWriter> {
        let mut value_writer = self.begin()?;
        // Need to rewind the file since we're cloning the whole thing.
        file.seek(Start(0))?;
        value_writer.copy_from(file)?;
        Ok(value_writer)
    }

    /// Assign an exclusive file for writing a value.
    pub fn begin(self) -> PubResult<ValueWriter> {
        let mut exclusive_file = self.batch.get_exclusive_file()?;
        Ok(ValueWriter {
            value_file_offset: exclusive_file.next_write_offset()?,
            exclusive_file,
        })
    }
}
192
// TODO: Implement Drop for ValueWriter?
/// An in-progress value write into an exclusive file.
#[derive(Debug)]
pub struct ValueWriter {
    exclusive_file: ExclusiveFile,
    // Offset in the exclusive file where this value begins.
    value_file_offset: u64,
}
199
impl ValueWriter {
    /// Direct access to the underlying file, e.g. for positioned writes.
    pub fn get_file(&mut self) -> Result<&mut File> {
        Ok(&mut self.exclusive_file.inner)
    }

    /// Appends all of `value` to the exclusive file, returning the byte count copied.
    /// On copy failure the file cursor is rewound so the space can be reused.
    pub fn copy_from(&mut self, mut value: impl Read) -> PubResult<u64> {
        let value_file_offset = self.exclusive_file.next_write_offset()?;
        let value_length = match std::io::copy(&mut value, &mut self.exclusive_file.inner) {
            Ok(ok) => ok,
            Err(err) => {
                self.exclusive_file
                    .inner
                    .seek(Start(value_file_offset))
                    .expect("should rewind failed copy");
                return Err(err.into());
            }
        };
        Ok(value_length)
    }

    /// Bytes written to this value so far (current write offset minus the value's start).
    pub fn value_length(&mut self) -> io::Result<u64> {
        Ok(self.exclusive_file.next_write_offset()? - self.value_file_offset)
    }
}
224
225impl Write for ValueWriter {
226    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
227        let file = &mut self.exclusive_file.inner;
228        file.write(buf)
229    }
230
231    fn flush(&mut self) -> io::Result<()> {
232        // This makes no sense until commit.
233        Ok(())
234    }
235}
236
/// A staged rename of an existing value to a new key, applied at commit.
#[derive(Debug)]
struct ValueRename {
    value: Value,
    new_key: Vec<u8>,
}
242
/// Manages uncommitted writes
#[derive(Debug)]
pub struct BatchWriter<H>
where
    H: WithHandle,
{
    handle: H,
    // Files checked out for writing; returned to the handle on commit/drop.
    exclusive_files: Vec<ExclusiveFile>,
    // Writes staged via `stage_write`, applied at commit.
    pending_writes: Vec<PendingWrite>,
    // Renames staged via `rename_value`, applied at commit.
    value_renames: Vec<ValueRename>,
}
254
255impl<H> BatchWriter<H>
256where
257    H: WithHandle,
258{
259    pub fn new(handle: H) -> Self {
260        Self {
261            handle,
262            exclusive_files: Default::default(),
263            pending_writes: Default::default(),
264            value_renames: Default::default(),
265        }
266    }
267}
268
/// Inner representation of timestamps (chrono's timezone-naive date-time).
pub type TimestampInner = NaiveDateTime;

/// A last-used timestamp, stored in the manifest with millisecond precision.
#[derive(Debug, PartialEq, Copy, Clone, PartialOrd)]
pub struct Timestamp(TimestampInner);
273
impl FromSql for Timestamp {
    /// Decodes a timestamp stored as integer milliseconds since the Unix epoch.
    fn column_result(value: ValueRef<'_>) -> FromSqlResult<Self> {
        let int_time = value.as_i64()?;
        Ok(Self(
            TimestampInner::from_timestamp_millis(int_time)
                .ok_or(FromSqlError::OutOfRange(int_time))?,
        ))
    }
}
283
// This may only be public for external tests.
/// Granularity at which last-used timestamps are stored (milliseconds).
pub const LAST_USED_RESOLUTION: Duration = Duration::from_millis(1);
286
impl Deref for Timestamp {
    type Target = TimestampInner;

    /// Allows calling chrono methods directly on `Timestamp`.
    fn deref(&self) -> &Self::Target {
        &self.0
    }
}
294
/// Result of a successful `BatchWriter` commit.
pub struct WriteCommitResult {
    // Number of staged writes applied.
    count: usize,
}

impl WriteCommitResult {
    /// Number of keys written by the commit.
    pub fn count(&self) -> usize {
        self.count
    }
}
304
/// Manifest columns selected when loading a `Value`, in `Value::from_row` order.
const VALUE_COLUMN_NAMES: &[&str] = &["file_id", "file_offset", "value_length", "last_used"];

/// Comma-separated form of `VALUE_COLUMN_NAMES`, joined once per process and cached.
fn value_columns_sql() -> &'static str {
    static JOINED: OnceLock<String> = OnceLock::new();
    let joined = JOINED.get_or_init(|| VALUE_COLUMN_NAMES.join(", "));
    joined.as_str()
}
311
impl<H> BatchWriter<H>
where
    H: WithHandle,
{
    /// Pops a previously used exclusive file, or asks the handle for a new one.
    fn get_exclusive_file(&mut self) -> Result<ExclusiveFile> {
        if let Some(ef) = self.exclusive_files.pop() {
            debug!("reusing exclusive file from writer");
            return Ok(ef);
        }
        self.handle.with_handle(Handle::get_exclusive_file)
    }

    /// Queues the finished value write under `key`; it is applied at `commit`.
    pub fn stage_write(&mut self, key: Vec<u8>, mut value: ValueWriter) -> anyhow::Result<()> {
        let value_length = match value.value_length() {
            Ok(ok) => ok,
            Err(err) => {
                // Try to undo the partial write so the file offset can be reused.
                if let Err(err) = value
                    .exclusive_file
                    .revert_to_offset(value.value_file_offset)
                {
                    error!("error reverting value write: {:#?}", err);
                }
                // The ExclusiveFile is probably broken in some way if we couldn't seek on it. Don't
                // return it to the BatchWriter.
                return Err(err.into());
            }
        };
        let exclusive_file = value.exclusive_file;
        let value_file_id = exclusive_file.id;
        // Keep the file around for reuse by later writes in this batch.
        self.exclusive_files.push(exclusive_file);
        self.pending_writes.push(PendingWrite {
            key,
            value_file_offset: value.value_file_offset,
            value_length,
            value_file_id,
        });
        Ok(())
    }

    /// Starts a new value write tied to this batch.
    pub fn new_value(&mut self) -> BeginWriteValue<H> {
        BeginWriteValue { batch: self }
    }

    /// Queues renaming an existing value to `key`, applied at commit.
    pub fn rename_value(&mut self, value: Value, key: Vec<u8>) {
        self.value_renames.push(ValueRename {
            value,
            new_key: key,
        });
    }

    /// Atomically applies all staged writes and renames.
    pub fn commit(self) -> Result<WriteCommitResult> {
        self.commit_inner(|| {})
    }

    /// Commit implementation; `before_write` is a hook invoked before each pending
    /// write is applied inside the transaction.
    fn commit_inner(mut self, before_write: impl Fn()) -> Result<WriteCommitResult> {
        if flocking() {
            // Downgrade the write locks before publishing the values via the manifest.
            for ef in &mut self.exclusive_files {
                assert!(ef.downgrade_lock()?);
            }
        }
        let write_commit_res = self.handle.with_handle(|handle| {
            let mut transaction: OwnedTx = handle.start_immediate_transaction()?;
            let mut write_commit_res = WriteCommitResult { count: 0 };
            for pw in self.pending_writes.drain(..) {
                before_write();
                // Replace semantics: remove any existing value for the key first.
                transaction.delete_key(&pw.key)?;
                transaction.insert_key(pw)?;
                write_commit_res.count += 1;
            }
            for vr in self.value_renames.drain(..) {
                transaction.rename_value(&vr.value, vr.new_key)?;
            }
            // TODO: On error here, rewind the exclusive to undo any writes that just occurred.
            let work = transaction.commit().context("commit transaction")?;
            work.complete();
            anyhow::Ok(write_commit_res)
        })?;
        self.flush_exclusive_files();
        Ok(write_commit_res)
    }

    /// Flush Writer's exclusive files and return them to the Handle pool.
    fn flush_exclusive_files(&mut self) {
        for ef in &mut self.exclusive_files {
            // NOTE(review): panics on flush failure — presumably deliberate since the
            // manifest transaction already committed against these files; confirm.
            ef.committed().unwrap();
        }
        self.return_exclusive_files_to_handle()
    }

    /// Hands remaining exclusive files back to the handle's pool (no-op when flocking).
    fn return_exclusive_files_to_handle(&mut self) {
        // When we're flocking, we can't have writers and readers at the same time and still be
        // able to punch values asynchronously.
        if flocking() {
            return;
        }
        self.handle.with_handle(|handle| {
            let mut handle_exclusive_files = handle.exclusive_files.lock().unwrap();
            for ef in self.exclusive_files.drain(..) {
                debug!("returning exclusive file {} to handle", ef.id);
                assert!(handle_exclusive_files.insert(ef.id, ef).is_none());
            }
        })
    }
}
416
impl<H> Drop for BatchWriter<H>
where
    H: WithHandle,
{
    /// Returns any still-held exclusive files to the handle pool so an aborted
    /// (uncommitted) writer doesn't leak them.
    fn drop(&mut self) {
        self.return_exclusive_files_to_handle()
    }
}
425
/// Length of a value in bytes.
type ValueLength = u64;

/// A stored value's metadata: where its bytes live and when it was last used.
#[derive(Debug, Clone, PartialEq, Copy)]
pub struct Value {
    pub location: ValueLocation,
    // Stored at `LAST_USED_RESOLUTION` precision.
    last_used: Timestamp,
}
433
/// Storage location info for a non-zero-length value.
#[derive(Debug, Clone, PartialEq, Ord, PartialOrd, Eq, Copy)]
pub struct NonzeroValueLocation {
    // File holding the value bytes.
    pub file_id: FileId,
    // Byte offset of the value within that file.
    pub file_offset: u64,
    pub length: ValueLength,
}
441
/// Location data for a value. Includes the case where the value is empty and so doesn't require
/// allocation.
#[derive(Debug, Clone, PartialEq, Copy)]
pub enum ValueLocation {
    /// Empty value: no file storage at all.
    ZeroLength,
    /// Value with bytes stored in a values file.
    Nonzero(NonzeroValueLocation),
}
449
450impl ValueLocation {
451    pub fn into_non_zero(self) -> Option<NonzeroValueLocation> {
452        match self {
453            ZeroLength => None,
454            Nonzero(a) => Some(a),
455        }
456    }
457
458    pub fn file_offset(&self) -> Option<u64> {
459        match self {
460            ZeroLength => None,
461            Nonzero(NonzeroValueLocation { file_offset, .. }) => Some(*file_offset),
462        }
463    }
464
465    pub fn file_id(&self) -> Option<&FileId> {
466        match self {
467            ZeroLength => None,
468            Nonzero(NonzeroValueLocation { file_id, .. }) => Some(file_id),
469        }
470    }
471
472    pub fn length(&self) -> u64 {
473        match self {
474            ZeroLength => 0,
475            Nonzero(NonzeroValueLocation { length, .. }) => *length,
476        }
477    }
478}
479
impl Deref for Value {
    type Target = ValueLocation;

    /// A `Value` dereferences to its location, exposing `length()`, `file_id()`, etc.
    fn deref(&self) -> &Self::Target {
        &self.location
    }
}
487
impl Value {
    /// Builds a `Value` from a row selected with `value_columns_sql()` column order:
    /// file_id, file_offset, value_length, last_used.
    fn from_row(row: &rusqlite::Row) -> rusqlite::Result<Self> {
        Self::from_column_values(row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?)
    }

    /// Assembles a `Value` from manifest columns. Zero-length values must have NULL
    /// file_id/file_offset; non-zero-length values must have both set (asserted).
    fn from_column_values(
        file_id: Option<FileId>,
        file_offset: Option<u64>,
        length: ValueLength,
        last_used: Timestamp,
    ) -> rusqlite::Result<Self> {
        let location = if length == 0 {
            assert_eq!(file_id, None);
            assert_eq!(file_offset, None);
            ZeroLength
        } else {
            Nonzero(NonzeroValueLocation {
                file_id: file_id.unwrap(),
                file_offset: file_offset.unwrap(),
                length,
            })
        };
        Ok(Value {
            location,
            last_used,
        })
    }

    /// Timestamp of the last use recorded for this value.
    pub fn last_used(&self) -> Timestamp {
        self.last_used
    }
}
520
impl AsRef<Value> for Value {
    /// Identity impl so APIs can accept owned values and references generically.
    fn as_ref(&self) -> &Value {
        self
    }
}
526
impl AsMut<Snapshot> for Snapshot {
    /// Identity impl for APIs generic over `AsMut<Snapshot>`.
    fn as_mut(&mut self) -> &mut Snapshot {
        self
    }
}

impl AsRef<Snapshot> for Snapshot {
    /// Identity impl for APIs generic over `AsRef<Snapshot>`.
    fn as_ref(&self) -> &Self {
        self
    }
}
538
/// A point-in-time view of values, backed by cloned value files keyed by file id.
#[derive(Debug)]
pub struct Snapshot {
    file_clones: HashMap<FileId, Arc<Mutex<FileClone>>>,
}
543
/// A value resolved against a `Snapshot`, paired with its backing file clone.
#[derive(Debug)]
pub struct SnapshotValue<V> {
    value: V,
    // This is Some if value is Nonzero.
    cloned_file: Option<Arc<Mutex<FileClone>>>,
}
550
impl<V> Deref for SnapshotValue<V> {
    type Target = V;

    /// Exposes the wrapped value directly.
    fn deref(&self) -> &Self::Target {
        &self.value
    }
}
558
impl Snapshot {
    /// Wraps `value` with the snapshot's file clone for its file id, enabling reads.
    ///
    /// NOTE(review): `unwrap` panics if the value's file is not part of this snapshot —
    /// presumably callers only pass values captured by the same snapshot; confirm.
    pub fn value<V>(&self, value: V) -> SnapshotValue<V>
    where
        V: AsRef<Value>,
    {
        SnapshotValue {
            cloned_file: value
                .as_ref()
                .file_id()
                .map(|file_id| Arc::clone(self.file_clones.get(file_id).unwrap())),
            value,
        }
    }
}
573
574impl<V> ReadAt for SnapshotValue<V>
575where
576    V: AsRef<Value>,
577{
578    fn read_at(&self, pos: u64, mut buf: &mut [u8]) -> io::Result<usize> {
579        if false {
580            // TODO: Create a thiserror or io::Error for non-usize pos.
581            // let pos = usize::try_from(pos).expect("pos should be usize");
582            let n = self.view(|view| {
583                let r = view;
584                r.read_at(pos, buf)
585            })??;
586            // dbg!(buf.split_at(n).0);
587            Ok(n)
588        } else {
589            match self.value.as_ref().location {
590                ValueLocation::ZeroLength => Ok(0),
591                Nonzero(NonzeroValueLocation {
592                    file_offset,
593                    length,
594                    ..
595                }) => {
596                    if pos >= length {
597                        return Ok(0);
598                    }
599                    let available = length - pos;
600                    buf = buf
601                        .split_at_mut(min(buf.len() as u64, available) as usize)
602                        .0;
603                    let mut file_clone = self.file_clone().unwrap().lock().unwrap();
604                    let file = &mut file_clone.file;
605                    let file_offset = file_offset + pos;
606                    // Getting lazy: Using positioned-io's ReadAt because it works on Windows.
607                    let res = file.read_at(file_offset, buf);
608                    debug!(
609                        ?file,
610                        ?file_offset,
611                        len = buf.len(),
612                        ?res,
613                        "snapshot value read_at"
614                    );
615                    res
616                }
617            }
618        }
619    }
620}
621
impl<V> SnapshotValue<V>
where
    V: AsRef<Value>,
{
    /// The file clone backing this value; `None` only for zero-length values.
    fn file_clone(&self) -> Option<&Arc<Mutex<FileClone>>> {
        self.cloned_file.as_ref()
    }

    /// Runs `f` over the value's bytes via a cached memory map of the file clone.
    /// Zero-length values see an empty slice.
    pub fn view<R>(&self, f: impl FnOnce(&[u8]) -> R) -> io::Result<R> {
        let value = self.value.as_ref();
        match value.location {
            Nonzero(NonzeroValueLocation {
                file_offset,
                length,
                ..
            }) => {
                let file_clone = self.file_clone().unwrap();
                let start = to_usize_io(file_offset)?;
                let usize_length = to_usize_io(length)?;
                // Guard against offset + length overflowing usize before slicing the mmap.
                let end =
                    usize::checked_add(start, usize_length).ok_or_else(make_to_usize_io_error)?;
                let mut mutex_guard = file_clone.lock().unwrap();
                let mmap = mutex_guard.get_mmap()?;
                Ok(f(&mmap[start..end]))
            }
            ZeroLength => Ok(f(&[])),
        }
    }

    /// Reads from the start of the value into `buf` (clamped to the value's length)
    /// by seeking the shared clone file. Returns the number of bytes read.
    pub fn read(&self, mut buf: &mut [u8]) -> Result<usize> {
        match self.value.as_ref().location {
            ValueLocation::ZeroLength => Ok(0),
            Nonzero(NonzeroValueLocation {
                file_offset,
                length,
                ..
            }) => {
                buf = buf.split_at_mut(min(buf.len() as u64, length) as usize).0;
                let mut file_clone = self.file_clone().unwrap().lock().unwrap();
                let file = &mut file_clone.file;
                file.seek(Start(file_offset))?;
                let res = file.read(buf);
                debug!(
                    ?file,
                    ?file_offset,
                    len = buf.len(),
                    ?res,
                    "snapshot value read"
                );
                res.map_err(Into::into)
            }
        }
    }

    /// A `Read` adapter over this value, starting at position 0.
    pub fn new_reader(&self) -> impl Read + '_ {
        positioned_io::Cursor::new(self)
    }

    /// For testing: Leak a reference to the snapshot tempdir so it's not cleaned up when all
    /// references are forgotten. This could possibly be used from internal tests instead.
    pub fn leak_snapshot_dir(&self) {
        if let Some(file_clone) = self.file_clone() {
            if let Some(tempdir) = &file_clone.lock().unwrap().tempdir {
                std::mem::forget(Arc::clone(tempdir));
            }
        }
    }
}
690
/// A value holding a statement temporary for starting an iterator over the Values of a value file.
/// I couldn't seem to get the code to work without this.
pub struct FileValues<'a, S>
where
    S: Deref<Target = Statement<'a>> + DerefMut + 'a,
{
    // Prepared statement selecting the values of a single file.
    stmt: S,
    // The file whose values are iterated.
    file_id: FileId,
}
700
impl<'a, S> FileValues<'a, S>
where
    S: Deref<Target = Statement<'a>> + DerefMut + 'a,
{
    /// Executes the prepared statement, yielding each `Value` stored in `file_id`.
    pub fn begin(
        &mut self,
    ) -> rusqlite::Result<impl Iterator<Item = rusqlite::Result<Value>> + '_> {
        self.stmt.query_map([self.file_id], Value::from_row)
    }
}
711
/// A byte range (offset + len) within a values file.
#[derive(Ord, PartialOrd, Eq, PartialEq, Copy, Clone)]
struct ReadExtent {
    pub offset: u64,
    pub len: u64,
}
717
718#[allow(dead_code)]
719fn floored_multiple<T>(value: T, multiple: T) -> T
720where
721    T: Integer + Copy,
722{
723    // What about value - value % multiple?
724    multiple * (value / multiple)
725}
726
/// Divides value by multiple, always rounding up. Very common operation for allocators.
///
/// NOTE(review): `value + multiple - 1` can overflow for values near `T::MAX`; visible
/// callers (e.g. `punch_value`) only pass u64 file offsets well below that — confirm if
/// used elsewhere.
pub fn ceil_multiple<T>(value: T, multiple: T) -> T
where
    T: Integer + Copy,
{
    (value + multiple - T::one()) / multiple * multiple
}
734
/// Opens the values file for `file_id` under `dir` with the given open options.
fn open_file_id(options: &OpenOptions, dir: &Path, file_id: &FileId) -> io::Result<File> {
    options.open(file_path(dir, file_id))
}
738
/// Path of the values file for `file_id` inside `dir`.
fn file_path(dir: &Path, file_id: impl AsRef<FileId>) -> PathBuf {
    dir.join(file_id.as_ref().values_file_path())
}
742
743fn random_file_name_in_dir(dir: &Path, prefix: &str) -> PathBuf {
744    let base = random_file_name(prefix);
745    dir.join(base)
746}
747
/// Number of random characters appended to generated file/dir names.
const FILE_NAME_RAND_LENGTH: usize = 8;
/// Name prefix for values files.
const VALUES_FILE_NAME_PREFIX: &str = "values-";
/// Name prefix for snapshot directories.
const SNAPSHOT_DIR_NAME_PREFIX: &str = "snapshot-";
751
752fn random_file_name(prefix: &str) -> OsString {
753    let mut begin = prefix.as_bytes().to_vec();
754    begin.extend(
755        rand::thread_rng()
756            .sample_iter(rand::distributions::Alphanumeric)
757            .take(FILE_NAME_RAND_LENGTH),
758    );
759    unsafe { OsString::from_encoded_bytes_unchecked(begin) }
760}
761
/// File name of the SQLite manifest database inside a possum directory.
pub const MANIFEST_DB_FILE_NAME: &str = "manifest.db";
763
/// Flags controlling how aggressively `punch_value` reclaims space.
struct PunchValueConstraints {
    // Extend the punch backwards toward the end of the previous value.
    greedy_start: bool,
    // Verify the punched region reads back as a hole afterwards.
    check_hole: bool,
    // Extend the punch forwards to the next value or the end of file.
    greedy_end: bool,
    // Allow truncating the file when the punch reaches the end of it.
    allow_truncate: bool,
    // Allow deleting the file entirely when it holds no values.
    allow_remove: bool,
}
771
impl Default for PunchValueConstraints {
    /// Everything enabled: maximum space reclamation.
    fn default() -> Self {
        Self {
            greedy_start: true,
            check_hole: true,
            greedy_end: true,
            allow_truncate: true,
            allow_remove: true,
        }
    }
}
783
/// Arguments to `punch_value`, bundling the target region and its context.
struct PunchValueOptions<'a> {
    dir: &'a Path,
    file_id: &'a FileId,
    // Start of the region to punch, in bytes.
    offset: u64,
    // Length of the region to punch, in bytes.
    length: u64,
    // Read transaction used to query neighbouring value extents.
    tx: &'a ReadTransactionOwned<'a>,
    // Filesystem block size; punches are aligned to this.
    block_size: u64,
    constraints: PunchValueConstraints,
}
793
// Can't do this as &mut self for dumb Rust reasons.
/// Punches a hole over a value's byte range in its values file, expanding the region to
/// block boundaries and into neighbouring free space as allowed by `constraints`.
/// Returns `Ok(false)` when the segment is locked elsewhere and the punch should be
/// retried later; `Ok(true)` when the space was reclaimed (or nothing needed doing).
fn punch_value(opts: PunchValueOptions) -> Result<bool> {
    let PunchValueOptions {
        dir,
        file_id,
        offset,
        length,
        tx,
        block_size,
        constraints:
            PunchValueConstraints {
                greedy_start,
                check_hole: check_holes,
                allow_truncate,
                allow_remove,
                greedy_end,
            },
    } = opts;
    let cloning_lock_aware = false;
    // Make signed for easier arithmetic.
    let mut offset = offset as i64;
    let mut length = length as i64;
    let block_size = block_size as i64;
    let file_path = file_path(dir, file_id);
    // Punching values probably requires write permission.
    let mut file = match OpenOptions::new().write(true).open(&file_path) {
        // The file could have already been deleted by a previous punch.
        Err(err) if err.kind() == ErrorKind::NotFound && allow_remove => return Ok(true),
        Err(err) => return Err(err).context("opening value file"),
        Ok(ok) => ok,
    };
    // Find out how far back we can punch and start there, correcting for block boundaries as we go.
    if offset % block_size != 0 || greedy_start {
        let last_end_offset = tx.query_last_end_offset(file_id, offset as u64)?;
        // Round up the end of the last value.
        let new_offset = ceil_multiple(last_end_offset, block_size as u64) as i64;
        // Because these are u64 we can't deal with overflow into negatives.
        length += offset - new_offset;
        offset = new_offset;
    }
    assert_eq!(offset % block_size, 0);
    if greedy_end {
        let next_offset = tx.next_value_offset(file_id, (offset + length).try_into().unwrap())?;
        let end_offset = match next_offset {
            None => {
                // Lock the file to see if we can expand to the end of the file.
                let locked_file = file
                    .lock_max_segment(LockExclusiveNonblock)
                    .context("locking value file")?;
                // Get the file length after we have tried locking the file.
                let file_end = file.seek(End(0))? as i64;
                if locked_file {
                    // I think it's okay to remove and truncate files if cloning doesn't use locks,
                    // because there are no values in this file to clone.
                    if offset == 0 && allow_remove {
                        remove_file(file_path).context("removing value file")?;
                        return Ok(true);
                    } else if allow_truncate {
                        file.set_len(offset as u64)?;
                        return Ok(true);
                    }
                    file_end
                } else if cloning_lock_aware {
                    // Round the punch region down the beginning of the last block. We aren't sure
                    // if someone is writing to the file.
                    floored_multiple(file_end, block_size)
                } else {
                    floored_multiple(offset + length, block_size)
                }
            }
            // Stop the punch just before the block containing the next value.
            Some(next_offset) => floored_multiple(next_offset as i64, block_size),
        };
        let new_length = end_offset - offset;
        length = new_length;
    } else {
        // Not greedy: shrink the punch to whole blocks within the requested region.
        let end_offset = floored_multiple(offset + length, block_size);
        length = end_offset - offset;
    }
    debug!(target: "punching", "punching {} {} for {}", file_id, offset, length);
    // If a punch is rounded up, and the end is rounded down, they cross each other by exactly a
    // full block.
    assert!(length >= -block_size);
    if length <= 0 {
        return Ok(true);
    }
    assert_eq!(offset % block_size, 0);
    if !file.lock_segment(LockExclusiveNonblock, Some(length as u64), offset as u64)? {
        // TODO: If we can't delete immediately, we should schedule to try again later. Maybe
        // spinning up a thread, or putting in a slow queue.
        warn!(%file_id, %offset, %length, "can't punch, file segment locked");
        return Ok(false);
    }
    debug!(?file, %offset, %length, "punching");
    punchfile(
        &file,
        offset.try_into().unwrap(),
        length.try_into().unwrap(),
    )
    .with_context(|| format!("length {}", length))?;
    // nix::fcntl::fcntl(file.as_raw_fd(), nix::fcntl::F_FULLFSYNC)?;
    // file.flush()?;
    if check_holes {
        if let Err(err) = check_hole(&mut file, offset as u64, length as u64) {
            warn!("checking hole: {}", err);
        }
    }
    Ok(true)
}
902
903/// Checks that there's no data allocated in the region provided.
904pub fn check_hole(file: &mut File, offset: u64, length: u64) -> Result<()> {
905    match seekhole::seek_hole_whence(file, offset, seekhole::RegionType::Data)? {
906        // Data starts after the hole we just punched.
907        Some(seek_offset) if seek_offset >= offset + length => Ok(()),
908        // There's no data after the hole we just punched.
909        None => Ok(()),
910        otherwise => {
911            bail!("punched hole didn't appear: {:?}", otherwise)
912        }
913    }
914}
915
/// Removes snapshot directories and snapshot value files that are no longer in use.
/// A snapshot value counts as unused when an exclusive lock can be taken on it.
/// Removal itself is best-effort: failures are logged, not returned.
fn delete_unused_snapshots(dir: &Path) -> Result<()> {
    use walk::EntryType::*;
    for entry in walk_dir(dir).context("walking dir")? {
        match entry.entry_type {
            SnapshotDir => {
                // If the dir is not empty, it will be attempted after each snapshot value inside
                // anyway.
                let res = remove_dir(&entry.path);
                debug!("removing snapshot dir {:?}: {:?}", &entry.path, res);
            }
            SnapshotValue => {
                match std::fs::OpenOptions::new().write(true).open(&entry.path) {
                    // Someone else removed it first; nothing to do.
                    Err(err) if err.kind() == ErrorKind::NotFound => {}
                    Err(err) => {
                        return Err(err)
                            .with_context(|| format!("opening snapshot value {:?}", &entry.path))
                    }
                    Ok(file) => {
                        if file
                            .lock_max_segment(LockExclusiveNonblock)
                            .context("locking snapshot value")?
                        {
                            let res = remove_file(&entry.path);
                            debug!("removing snapshot value file {:?}: {:?}", &entry.path, res);
                            // Try to delete the parent directory, if it's empty it will succeed.
                            let _ = remove_dir(
                                entry
                                    .path
                                    .parent()
                                    .expect("snapshot values must have a parent dir"),
                            );
                        } else {
                            debug!("not deleting {:?}, still in use", &entry.path);
                        }
                    }
                };
            }
            _ => {}
        }
    }
    Ok(())
}
958
/// Converts any integer to `usize`, mapping failure to an `InvalidInput` io error.
fn to_usize_io<F>(from: F) -> io::Result<usize>
where
    usize: TryFrom<F, Error = TryFromIntError>,
{
    convert_int_io(from)
}

/// Generic checked integer conversion with an io error on overflow/underflow.
fn convert_int_io<F, T>(from: F) -> io::Result<T>
where
    T: TryFrom<F, Error = TryFromIntError>,
{
    T::try_from(from).map_err(|_| make_to_usize_io_error())
}

/// The io error produced when an integer doesn't fit in `usize`.
fn make_to_usize_io_error() -> io::Error {
    io::Error::new(TO_USIZE_IO_ERROR_KIND, TO_USIZE_IO_ERR_PAYLOAD)
}

const TO_USIZE_IO_ERROR_KIND: ErrorKind = InvalidInput;
const TO_USIZE_IO_ERR_PAYLOAD: &str = "can't convert to usize";
980
/// Increments the right most byte, overflowing leftwards. Returns false if incrementing the array
/// overflows the available bytes (including when the array is empty).
fn inc_big_endian_array(arr: &mut [u8]) -> bool {
    for byte in arr.iter_mut().rev() {
        match byte.checked_add(1) {
            // This byte absorbed the increment; no carry remains.
            Some(incremented) => {
                *byte = incremented;
                return true;
            }
            // Byte was 0xFF: wrap to zero and carry into the next byte leftwards.
            None => *byte = 0,
        }
    }
    false
}