1#![allow(clippy::unused_unit)]
2
3use std::borrow::Borrow;
4use std::cmp::min;
5use std::collections::{BTreeSet, HashMap, HashSet};
6use std::ffi::{OsStr, OsString};
7use std::fmt::{Debug, Display, Formatter};
8use std::fs::{read_dir, remove_dir, remove_file, File, OpenOptions};
9use std::io::SeekFrom::{End, Start};
10use std::io::{ErrorKind, Read, Seek, Write};
11use std::num::TryFromIntError;
12use std::ops::{Deref, DerefMut};
13use std::path::{Path, PathBuf};
14use std::rc::Rc;
15use std::sync::OnceLock;
16use std::time::Duration;
17use std::{fs, io, str};
18
19use anyhow::{anyhow, bail, Context, Result};
20use cfg_if::cfg_if;
21use chrono::NaiveDateTime;
22use env::flocking;
23pub use error::*;
24use exclusive_file::ExclusiveFile;
25use file_id::FileId;
26pub use handle::Handle;
27use memmap2::Mmap;
28use num::Integer;
29use ownedtx::OwnedTx;
30use positioned_io::ReadAt;
31use rand::Rng;
32use rusqlite::types::{FromSql, FromSqlError, FromSqlResult, ToSql, ToSqlOutput, ValueRef};
33use rusqlite::Error::QueryReturnedNoRows;
34use rusqlite::{params, CachedStatement, Connection, Statement, TransactionBehavior};
35use stable_deref_trait::StableDeref;
36use sys::*;
37use tempfile::TempDir;
38#[cfg(test)]
39pub use test_log::test;
40use tracing::*;
41use ErrorKind::InvalidInput;
42
43use crate::item::Item;
44use crate::walk::walk_dir;
45use crate::ValueLocation::{Nonzero, ZeroLength};
46
47mod c_api;
48mod cpathbuf;
49mod dir;
50mod error;
51mod exclusive_file;
52mod file_id;
53pub(crate) mod handle;
54mod item;
55mod owned_cell;
56pub mod sys;
57#[cfg(feature = "testing")]
58pub mod testing;
59#[cfg(test)]
60mod tests;
61mod tx;
62pub use tx::*;
63mod ownedtx;
64pub mod walk;
65pub use dir::*;
66pub mod env;
67mod reader;
68use reader::Reader;
69
70pub mod concurrency;
72use concurrency::*;
73
74use self::concurrency::sync::{Arc, Mutex, MutexGuard, RwLock, RwLockReadGuard};
75use crate::handle::WithHandle;
76
/// Result alias used by the public API, with the `Error` in scope here as the error type.
/// NOTE(review): `Error` presumably comes from the `pub use error::*` re-export — confirm.
pub type PubResult<T> = Result<T, Error>;
79
/// An open file together with an optional, lazily created memory map of it.
#[derive(Debug)]
struct FileClone {
    /// Handle used for positioned/seeking reads and as the source of the memory map.
    file: File,
    /// NOTE(review): presumably held only to keep the temporary directory alive while this
    /// clone exists — confirm against where `FileClone` is constructed.
    #[allow(unused)]
    tempdir: Option<Arc<TempDir>>,
    /// Memory map of `file`; filled in by the first call to `get_mmap`.
    mmap: Option<Mmap>,
    /// Number of bytes to map; `get_mmap` asserts the created map has exactly this length.
    len: u64,
}
89
/// Shared, lockable [`FileClone`]s keyed by the ID of the file they were made from.
type FileCloneCache = HashMap<FileId, Arc<Mutex<FileClone>>>;
91
92impl FileClone {
93 fn get_mmap(&mut self) -> io::Result<&Mmap> {
94 let mmap_opt = &mut self.mmap;
95 if let Some(mmap) = mmap_opt {
96 return Ok(mmap);
97 }
98 let mmap = unsafe {
99 memmap2::MmapOptions::new()
100 .len(self.len.try_into().unwrap())
101 .map_copy_read_only(&self.file)
102 }?;
103 assert_eq!(mmap.len() as u64, self.len);
104 Ok(mmap_opt.insert(mmap))
105 }
106}
107
/// A staged value write that has not yet been recorded in the manifest.
#[derive(Debug)]
struct PendingWrite {
    /// Key the value will be stored under.
    key: Vec<u8>,
    /// Offset in the values file at which the value's bytes begin.
    value_file_offset: u64,
    /// Number of bytes written for the value.
    value_length: u64,
    /// ID of the values file the bytes were written to.
    value_file_id: FileId,
}
115
/// SQL text embedded at compile time from `../manifest.sql`; run by `init_manifest_schema`.
const MANIFEST_SCHEMA_SQL: &str = include_str!("../manifest.sql");
117
118fn init_manifest_schema(conn: &rusqlite::Connection) -> rusqlite::Result<()> {
119 conn.execute_batch(MANIFEST_SCHEMA_SQL)
120}
121
/// Entry point for starting a new value write against a [`BatchWriter`].
///
/// Borrows the batch mutably and is consumed by whichever method starts the write.
pub struct BeginWriteValue<'writer, H>
where
    H: WithHandle,
{
    /// The batch the new value is being started for.
    batch: &'writer mut BatchWriter<H>,
}
130
131impl<H> BeginWriteValue<'_, H>
132where
133 H: WithHandle,
134{
135 pub fn clone_file(self, file: &mut File) -> PubResult<ValueWriter> {
143 if !self
144 .batch
145 .handle
146 .with_handle(Handle::dir_supports_file_cloning)
147 {
148 return self.copy_file(file);
149 }
150 let dst_path = loop {
151 let dst_path = self
152 .batch
153 .handle
154 .with_handle(|handle| handle.dir.path().join(FileId::random().values_file_path()));
155 match fclonefile_noflags(file, &dst_path) {
156 Err(err) if CloneFileError::is_unsupported(&err) => {
157 return self.copy_file(file);
158 }
159 Err(err) if err.is_file_already_exists() => continue,
160 Err(err) => return Err(err.into()),
161 Ok(()) => break dst_path,
162 }
163 };
164 let exclusive_file = ExclusiveFile::open(dst_path)?.unwrap();
168 Ok(ValueWriter {
169 exclusive_file,
170 value_file_offset: 0,
171 })
172 }
173
174 fn copy_file(self, file: &mut File) -> PubResult<ValueWriter> {
176 let mut value_writer = self.begin()?;
177 file.seek(Start(0))?;
179 value_writer.copy_from(file)?;
180 Ok(value_writer)
181 }
182
183 pub fn begin(self) -> PubResult<ValueWriter> {
185 let mut exclusive_file = self.batch.get_exclusive_file()?;
186 Ok(ValueWriter {
187 value_file_offset: exclusive_file.next_write_offset()?,
188 exclusive_file,
189 })
190 }
191}
192
/// An in-progress value write: the exclusive file being written and where the value began.
#[derive(Debug)]
pub struct ValueWriter {
    /// The exclusively held values file receiving the bytes.
    exclusive_file: ExclusiveFile,
    /// Offset in `exclusive_file` at which this value starts.
    value_file_offset: u64,
}
199
200impl ValueWriter {
201 pub fn get_file(&mut self) -> Result<&mut File> {
202 Ok(&mut self.exclusive_file.inner)
203 }
204
205 pub fn copy_from(&mut self, mut value: impl Read) -> PubResult<u64> {
206 let value_file_offset = self.exclusive_file.next_write_offset()?;
207 let value_length = match std::io::copy(&mut value, &mut self.exclusive_file.inner) {
208 Ok(ok) => ok,
209 Err(err) => {
210 self.exclusive_file
211 .inner
212 .seek(Start(value_file_offset))
213 .expect("should rewind failed copy");
214 return Err(err.into());
215 }
216 };
217 Ok(value_length)
218 }
219
220 pub fn value_length(&mut self) -> io::Result<u64> {
221 Ok(self.exclusive_file.next_write_offset()? - self.value_file_offset)
222 }
223}
224
225impl Write for ValueWriter {
226 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
227 let file = &mut self.exclusive_file.inner;
228 file.write(buf)
229 }
230
231 fn flush(&mut self) -> io::Result<()> {
232 Ok(())
234 }
235}
236
/// A staged request to move an existing value to a new key.
#[derive(Debug)]
struct ValueRename {
    /// The value being renamed.
    value: Value,
    /// The key the value should be reachable under after commit.
    new_key: Vec<u8>,
}
242
/// Collects value writes and renames so they can be committed to the manifest together.
#[derive(Debug)]
pub struct BatchWriter<H>
where
    H: WithHandle,
{
    /// Access to the store handle.
    handle: H,
    /// Exclusive files held by this batch; reused for subsequent writes.
    exclusive_files: Vec<ExclusiveFile>,
    /// Writes staged via `stage_write`, applied on commit.
    pending_writes: Vec<PendingWrite>,
    /// Renames staged via `rename_value`, applied on commit after the writes.
    value_renames: Vec<ValueRename>,
}
254
255impl<H> BatchWriter<H>
256where
257 H: WithHandle,
258{
259 pub fn new(handle: H) -> Self {
260 Self {
261 handle,
262 exclusive_files: Default::default(),
263 pending_writes: Default::default(),
264 value_renames: Default::default(),
265 }
266 }
267}
268
/// The date-time representation wrapped by [`Timestamp`].
pub type TimestampInner = NaiveDateTime;
270
/// Newtype over [`TimestampInner`]; decoded from SQL integers holding milliseconds.
#[derive(Debug, PartialEq, Copy, Clone, PartialOrd)]
pub struct Timestamp(TimestampInner);
273
274impl FromSql for Timestamp {
275 fn column_result(value: ValueRef<'_>) -> FromSqlResult<Self> {
276 let int_time = value.as_i64()?;
277 Ok(Self(
278 TimestampInner::from_timestamp_millis(int_time)
279 .ok_or(FromSqlError::OutOfRange(int_time))?,
280 ))
281 }
282}
283
/// One millisecond.
/// NOTE(review): presumably the granularity at which last-used times are stored — confirm.
pub const LAST_USED_RESOLUTION: Duration = Duration::from_millis(1);
286
287impl Deref for Timestamp {
288 type Target = TimestampInner;
289
290 fn deref(&self) -> &Self::Target {
291 &self.0
292 }
293}
294
/// Summary of a committed batch.
pub struct WriteCommitResult {
    /// Number of pending writes applied during the commit.
    count: usize,
}
298
299impl WriteCommitResult {
300 pub fn count(&self) -> usize {
301 self.count
302 }
303}
304
/// Column names joined by `value_columns_sql`; `Value::from_row` reads row indices 0..=3,
/// presumably in this order — confirm against the queries that use the joined list.
const VALUE_COLUMN_NAMES: &[&str] = &["file_id", "file_offset", "value_length", "last_used"];
306
307fn value_columns_sql() -> &'static str {
308 static ONCE: OnceLock<String> = OnceLock::new();
309 ONCE.get_or_init(|| VALUE_COLUMN_NAMES.join(", ")).as_str()
310}
311
312impl<H> BatchWriter<H>
313where
314 H: WithHandle,
315{
316 fn get_exclusive_file(&mut self) -> Result<ExclusiveFile> {
317 if let Some(ef) = self.exclusive_files.pop() {
318 debug!("reusing exclusive file from writer");
319 return Ok(ef);
320 }
321 self.handle.with_handle(Handle::get_exclusive_file)
322 }
323
324 pub fn stage_write(&mut self, key: Vec<u8>, mut value: ValueWriter) -> anyhow::Result<()> {
325 let value_length = match value.value_length() {
326 Ok(ok) => ok,
327 Err(err) => {
328 if let Err(err) = value
329 .exclusive_file
330 .revert_to_offset(value.value_file_offset)
331 {
332 error!("error reverting value write: {:#?}", err);
333 }
334 return Err(err.into());
337 }
338 };
339 let exclusive_file = value.exclusive_file;
340 let value_file_id = exclusive_file.id;
341 self.exclusive_files.push(exclusive_file);
342 self.pending_writes.push(PendingWrite {
343 key,
344 value_file_offset: value.value_file_offset,
345 value_length,
346 value_file_id,
347 });
348 Ok(())
349 }
350
351 pub fn new_value(&mut self) -> BeginWriteValue<H> {
352 BeginWriteValue { batch: self }
353 }
354
355 pub fn rename_value(&mut self, value: Value, key: Vec<u8>) {
356 self.value_renames.push(ValueRename {
357 value,
358 new_key: key,
359 });
360 }
361
362 pub fn commit(self) -> Result<WriteCommitResult> {
363 self.commit_inner(|| {})
364 }
365
366 fn commit_inner(mut self, before_write: impl Fn()) -> Result<WriteCommitResult> {
367 if flocking() {
368 for ef in &mut self.exclusive_files {
369 assert!(ef.downgrade_lock()?);
370 }
371 }
372 let write_commit_res = self.handle.with_handle(|handle| {
373 let mut transaction: OwnedTx = handle.start_immediate_transaction()?;
374 let mut write_commit_res = WriteCommitResult { count: 0 };
375 for pw in self.pending_writes.drain(..) {
376 before_write();
377 transaction.delete_key(&pw.key)?;
378 transaction.insert_key(pw)?;
379 write_commit_res.count += 1;
380 }
381 for vr in self.value_renames.drain(..) {
382 transaction.rename_value(&vr.value, vr.new_key)?;
383 }
384 let work = transaction.commit().context("commit transaction")?;
386 work.complete();
387 anyhow::Ok(write_commit_res)
388 })?;
389 self.flush_exclusive_files();
390 Ok(write_commit_res)
391 }
392
393 fn flush_exclusive_files(&mut self) {
395 for ef in &mut self.exclusive_files {
396 ef.committed().unwrap();
397 }
398 self.return_exclusive_files_to_handle()
399 }
400
401 fn return_exclusive_files_to_handle(&mut self) {
402 if flocking() {
405 return;
406 }
407 self.handle.with_handle(|handle| {
408 let mut handle_exclusive_files = handle.exclusive_files.lock().unwrap();
409 for ef in self.exclusive_files.drain(..) {
410 debug!("returning exclusive file {} to handle", ef.id);
411 assert!(handle_exclusive_files.insert(ef.id, ef).is_none());
412 }
413 })
414 }
415}
416
417impl<H> Drop for BatchWriter<H>
418where
419 H: WithHandle,
420{
421 fn drop(&mut self) {
422 self.return_exclusive_files_to_handle()
423 }
424}
425
/// Length of a stored value in bytes.
type ValueLength = u64;
427
/// A stored value's location plus its last-used time, as read from a manifest row.
#[derive(Debug, Clone, PartialEq, Copy)]
pub struct Value {
    /// Where the value's bytes live, or `ZeroLength` if it has none.
    pub location: ValueLocation,
    /// Last-used time read alongside the location.
    last_used: Timestamp,
}
433
/// Location of a value that has at least one byte.
#[derive(Debug, Clone, PartialEq, Ord, PartialOrd, Eq, Copy)]
pub struct NonzeroValueLocation {
    /// ID of the values file holding the bytes.
    pub file_id: FileId,
    /// Offset of the first byte within that file.
    pub file_offset: u64,
    /// Number of bytes in the value.
    pub length: ValueLength,
}
441
/// Where a value's bytes are stored.
#[derive(Debug, Clone, PartialEq, Copy)]
pub enum ValueLocation {
    /// The value is empty and has no file or offset.
    ZeroLength,
    /// The value occupies a range in a values file.
    Nonzero(NonzeroValueLocation),
}
449
450impl ValueLocation {
451 pub fn into_non_zero(self) -> Option<NonzeroValueLocation> {
452 match self {
453 ZeroLength => None,
454 Nonzero(a) => Some(a),
455 }
456 }
457
458 pub fn file_offset(&self) -> Option<u64> {
459 match self {
460 ZeroLength => None,
461 Nonzero(NonzeroValueLocation { file_offset, .. }) => Some(*file_offset),
462 }
463 }
464
465 pub fn file_id(&self) -> Option<&FileId> {
466 match self {
467 ZeroLength => None,
468 Nonzero(NonzeroValueLocation { file_id, .. }) => Some(file_id),
469 }
470 }
471
472 pub fn length(&self) -> u64 {
473 match self {
474 ZeroLength => 0,
475 Nonzero(NonzeroValueLocation { length, .. }) => *length,
476 }
477 }
478}
479
480impl Deref for Value {
481 type Target = ValueLocation;
482
483 fn deref(&self) -> &Self::Target {
484 &self.location
485 }
486}
487
488impl Value {
489 fn from_row(row: &rusqlite::Row) -> rusqlite::Result<Self> {
490 Self::from_column_values(row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?)
491 }
492
493 fn from_column_values(
494 file_id: Option<FileId>,
495 file_offset: Option<u64>,
496 length: ValueLength,
497 last_used: Timestamp,
498 ) -> rusqlite::Result<Self> {
499 let location = if length == 0 {
500 assert_eq!(file_id, None);
501 assert_eq!(file_offset, None);
502 ZeroLength
503 } else {
504 Nonzero(NonzeroValueLocation {
505 file_id: file_id.unwrap(),
506 file_offset: file_offset.unwrap(),
507 length,
508 })
509 };
510 Ok(Value {
511 location,
512 last_used,
513 })
514 }
515
516 pub fn last_used(&self) -> Timestamp {
517 self.last_used
518 }
519}
520
/// Identity conversion so APIs bounded by `AsRef<Value>` accept a `Value` directly.
impl AsRef<Value> for Value {
    fn as_ref(&self) -> &Value {
        self
    }
}
526
/// Identity conversion so APIs bounded by `AsMut<Snapshot>` accept a `Snapshot` directly.
impl AsMut<Snapshot> for Snapshot {
    fn as_mut(&mut self) -> &mut Snapshot {
        self
    }
}
532
/// Identity conversion so APIs bounded by `AsRef<Snapshot>` accept a `Snapshot` directly.
impl AsRef<Snapshot> for Snapshot {
    fn as_ref(&self) -> &Self {
        self
    }
}
538
/// A set of file clones that values can be read from.
#[derive(Debug)]
pub struct Snapshot {
    /// File clones keyed by the ID of the values file they stand in for; `value` looks up a
    /// value's file here.
    file_clones: HashMap<FileId, Arc<Mutex<FileClone>>>,
}
543
/// A value paired with the file clone its bytes are read from.
#[derive(Debug)]
pub struct SnapshotValue<V> {
    /// The wrapped value; exposed through `Deref`.
    value: V,
    /// The clone holding the value's bytes; `None` when the value has no file ID.
    cloned_file: Option<Arc<Mutex<FileClone>>>,
}
550
551impl<V> Deref for SnapshotValue<V> {
552 type Target = V;
553
554 fn deref(&self) -> &Self::Target {
555 &self.value
556 }
557}
558
559impl Snapshot {
560 pub fn value<V>(&self, value: V) -> SnapshotValue<V>
561 where
562 V: AsRef<Value>,
563 {
564 SnapshotValue {
565 cloned_file: value
566 .as_ref()
567 .file_id()
568 .map(|file_id| Arc::clone(self.file_clones.get(file_id).unwrap())),
569 value,
570 }
571 }
572}
573
574impl<V> ReadAt for SnapshotValue<V>
575where
576 V: AsRef<Value>,
577{
578 fn read_at(&self, pos: u64, mut buf: &mut [u8]) -> io::Result<usize> {
579 if false {
580 let n = self.view(|view| {
583 let r = view;
584 r.read_at(pos, buf)
585 })??;
586 Ok(n)
588 } else {
589 match self.value.as_ref().location {
590 ValueLocation::ZeroLength => Ok(0),
591 Nonzero(NonzeroValueLocation {
592 file_offset,
593 length,
594 ..
595 }) => {
596 if pos >= length {
597 return Ok(0);
598 }
599 let available = length - pos;
600 buf = buf
601 .split_at_mut(min(buf.len() as u64, available) as usize)
602 .0;
603 let mut file_clone = self.file_clone().unwrap().lock().unwrap();
604 let file = &mut file_clone.file;
605 let file_offset = file_offset + pos;
606 let res = file.read_at(file_offset, buf);
608 debug!(
609 ?file,
610 ?file_offset,
611 len = buf.len(),
612 ?res,
613 "snapshot value read_at"
614 );
615 res
616 }
617 }
618 }
619 }
620}
621
622impl<V> SnapshotValue<V>
623where
624 V: AsRef<Value>,
625{
626 fn file_clone(&self) -> Option<&Arc<Mutex<FileClone>>> {
627 self.cloned_file.as_ref()
628 }
629
630 pub fn view<R>(&self, f: impl FnOnce(&[u8]) -> R) -> io::Result<R> {
631 let value = self.value.as_ref();
632 match value.location {
633 Nonzero(NonzeroValueLocation {
634 file_offset,
635 length,
636 ..
637 }) => {
638 let file_clone = self.file_clone().unwrap();
639 let start = to_usize_io(file_offset)?;
640 let usize_length = to_usize_io(length)?;
641 let end =
642 usize::checked_add(start, usize_length).ok_or_else(make_to_usize_io_error)?;
643 let mut mutex_guard = file_clone.lock().unwrap();
644 let mmap = mutex_guard.get_mmap()?;
645 Ok(f(&mmap[start..end]))
646 }
647 ZeroLength => Ok(f(&[])),
648 }
649 }
650
651 pub fn read(&self, mut buf: &mut [u8]) -> Result<usize> {
652 match self.value.as_ref().location {
653 ValueLocation::ZeroLength => Ok(0),
654 Nonzero(NonzeroValueLocation {
655 file_offset,
656 length,
657 ..
658 }) => {
659 buf = buf.split_at_mut(min(buf.len() as u64, length) as usize).0;
660 let mut file_clone = self.file_clone().unwrap().lock().unwrap();
661 let file = &mut file_clone.file;
662 file.seek(Start(file_offset))?;
663 let res = file.read(buf);
664 debug!(
665 ?file,
666 ?file_offset,
667 len = buf.len(),
668 ?res,
669 "snapshot value read"
670 );
671 res.map_err(Into::into)
672 }
673 }
674 }
675
676 pub fn new_reader(&self) -> impl Read + '_ {
677 positioned_io::Cursor::new(self)
678 }
679
680 pub fn leak_snapshot_dir(&self) {
683 if let Some(file_clone) = self.file_clone() {
684 if let Some(tempdir) = &file_clone.lock().unwrap().tempdir {
685 std::mem::forget(Arc::clone(tempdir));
686 }
687 }
688 }
689}
690
/// A prepared statement paired with the file ID that `begin` binds as its only parameter.
pub struct FileValues<'a, S>
where
    S: Deref<Target = Statement<'a>> + DerefMut + 'a,
{
    /// The statement to run; each result row is decoded with `Value::from_row`.
    stmt: S,
    /// The file whose values are queried.
    file_id: FileId,
}
700
701impl<'a, S> FileValues<'a, S>
702where
703 S: Deref<Target = Statement<'a>> + DerefMut + 'a,
704{
705 pub fn begin(
706 &mut self,
707 ) -> rusqlite::Result<impl Iterator<Item = rusqlite::Result<Value>> + '_> {
708 self.stmt.query_map([self.file_id], Value::from_row)
709 }
710}
711
/// A byte range described by a starting offset and a length. Ordering compares `offset`
/// first, then `len` (derived field order).
#[derive(Ord, PartialOrd, Eq, PartialEq, Copy, Clone)]
struct ReadExtent {
    /// Start of the range.
    pub offset: u64,
    /// Number of bytes in the range.
    pub len: u64,
}
717
718#[allow(dead_code)]
719fn floored_multiple<T>(value: T, multiple: T) -> T
720where
721 T: Integer + Copy,
722{
723 multiple * (value / multiple)
725}
726
727pub fn ceil_multiple<T>(value: T, multiple: T) -> T
729where
730 T: Integer + Copy,
731{
732 (value + multiple - T::one()) / multiple * multiple
733}
734
735fn open_file_id(options: &OpenOptions, dir: &Path, file_id: &FileId) -> io::Result<File> {
736 options.open(file_path(dir, file_id))
737}
738
739fn file_path(dir: &Path, file_id: impl AsRef<FileId>) -> PathBuf {
740 dir.join(file_id.as_ref().values_file_path())
741}
742
743fn random_file_name_in_dir(dir: &Path, prefix: &str) -> PathBuf {
744 let base = random_file_name(prefix);
745 dir.join(base)
746}
747
/// Number of random alphanumeric characters `random_file_name` appends to its prefix.
const FILE_NAME_RAND_LENGTH: usize = 8;
/// NOTE(review): presumably the file-name prefix of values files — confirm at the use sites.
const VALUES_FILE_NAME_PREFIX: &str = "values-";
/// NOTE(review): presumably the name prefix of snapshot directories — confirm at the use sites.
const SNAPSHOT_DIR_NAME_PREFIX: &str = "snapshot-";
751
752fn random_file_name(prefix: &str) -> OsString {
753 let mut begin = prefix.as_bytes().to_vec();
754 begin.extend(
755 rand::thread_rng()
756 .sample_iter(rand::distributions::Alphanumeric)
757 .take(FILE_NAME_RAND_LENGTH),
758 );
759 unsafe { OsString::from_encoded_bytes_unchecked(begin) }
760}
761
/// File name of the manifest database.
/// NOTE(review): presumably resolved relative to the store directory — confirm at use sites.
pub const MANIFEST_DB_FILE_NAME: &str = "manifest.db";
763
/// Switches controlling how far `punch_value` may go when reclaiming a value's space.
struct PunchValueConstraints {
    /// Always recompute the start from the end of the preceding data, even when the given
    /// offset is already block aligned.
    greedy_start: bool,
    /// After punching, check that the hole appeared (a failed check is only logged).
    check_hole: bool,
    /// Extend the end of the range to the next value's offset, or towards the end of the file.
    greedy_end: bool,
    /// Permit truncating the file when nothing follows the punched range.
    allow_truncate: bool,
    /// Permit removing the file entirely; also makes a missing file count as success.
    allow_remove: bool,
}
771
772impl Default for PunchValueConstraints {
773 fn default() -> Self {
774 Self {
775 greedy_start: true,
776 check_hole: true,
777 greedy_end: true,
778 allow_truncate: true,
779 allow_remove: true,
780 }
781 }
782}
783
/// Arguments for `punch_value`.
struct PunchValueOptions<'a> {
    /// Directory containing the values file.
    dir: &'a Path,
    /// ID of the values file to punch.
    file_id: &'a FileId,
    /// Start offset of the value's bytes within the file.
    offset: u64,
    /// Length of the value in bytes.
    length: u64,
    /// Read transaction used to look up neighbouring values in the same file.
    tx: &'a ReadTransactionOwned<'a>,
    /// Block size that the punched range is aligned to.
    block_size: u64,
    /// Switches controlling greediness, truncation, removal and hole checking.
    constraints: PunchValueConstraints,
}
793
/// Reclaims the disk space of a value by punching a block-aligned hole over it, or by
/// truncating or removing the file when nothing follows the value.
///
/// Returns `Ok(false)` only when the segment to punch could not be locked. Every other
/// non-error path returns `Ok(true)`, including the cases where the file is missing (with
/// `allow_remove`) or the aligned range turns out to be empty.
///
/// # Panics
/// Panics if the aligned offset is not a multiple of the block size, if the computed length
/// is less than `-block_size`, or if an offset/length conversion fails.
fn punch_value(opts: PunchValueOptions) -> Result<bool> {
    let PunchValueOptions {
        dir,
        file_id,
        offset,
        length,
        tx,
        block_size,
        constraints:
            PunchValueConstraints {
                greedy_start,
                check_hole: check_holes,
                allow_truncate,
                allow_remove,
                greedy_end,
            },
    } = opts;
    // Hard-wired off, so the `cloning_lock_aware` branch below is never taken.
    let cloning_lock_aware = false;
    // Work in signed arithmetic: aligning the start forward can make the length negative.
    let mut offset = offset as i64;
    let mut length = length as i64;
    let block_size = block_size as i64;
    let file_path = file_path(dir, file_id);
    let mut file = match OpenOptions::new().write(true).open(&file_path) {
        // A missing file counts as already reclaimed when removal is allowed.
        Err(err) if err.kind() == ErrorKind::NotFound && allow_remove => return Ok(true),
        Err(err) => return Err(err).context("opening value file"),
        Ok(ok) => ok,
    };
    if offset % block_size != 0 || greedy_start {
        // NOTE(review): presumably the end offset of the last value before `offset` in this
        // file — confirm in the transaction implementation.
        let last_end_offset = tx.query_last_end_offset(file_id, offset as u64)?;
        // Start at the first block boundary at or after that end offset.
        let new_offset = ceil_multiple(last_end_offset, block_size as u64) as i64;
        // Keep the end of the range fixed while the start moves.
        length += offset - new_offset;
        offset = new_offset;
    }
    assert_eq!(offset % block_size, 0);
    if greedy_end {
        let next_offset = tx.next_value_offset(file_id, (offset + length).try_into().unwrap())?;
        let end_offset = match next_offset {
            // No value follows in this file.
            None => {
                let locked_file = file
                    .lock_max_segment(LockExclusiveNonblock)
                    .context("locking value file")?;
                let file_end = file.seek(End(0))? as i64;
                if locked_file {
                    // We hold the lock and nothing follows: drop the file or cut it short.
                    if offset == 0 && allow_remove {
                        remove_file(file_path).context("removing value file")?;
                        return Ok(true);
                    } else if allow_truncate {
                        file.set_len(offset as u64)?;
                        return Ok(true);
                    }
                    file_end
                } else if cloning_lock_aware {
                    floored_multiple(file_end, block_size)
                } else {
                    // Couldn't lock the file: stay within the value's own range.
                    floored_multiple(offset + length, block_size)
                }
            }
            // Stop at the last block boundary at or before the next value.
            Some(next_offset) => floored_multiple(next_offset as i64, block_size),
        };
        let new_length = end_offset - offset;
        length = new_length;
    } else {
        let end_offset = floored_multiple(offset + length, block_size);
        length = end_offset - offset;
    }
    debug!(target: "punching", "punching {} {} for {}", file_id, offset, length);
    assert!(length >= -block_size);
    // Nothing block-aligned is left to punch.
    if length <= 0 {
        return Ok(true);
    }
    assert_eq!(offset % block_size, 0);
    if !file.lock_segment(LockExclusiveNonblock, Some(length as u64), offset as u64)? {
        warn!(%file_id, %offset, %length, "can't punch, file segment locked");
        return Ok(false);
    }
    debug!(?file, %offset, %length, "punching");
    punchfile(
        &file,
        offset.try_into().unwrap(),
        length.try_into().unwrap(),
    )
    .with_context(|| format!("length {}", length))?;
    if check_holes {
        // A failed hole check is only logged; the punch itself already succeeded.
        if let Err(err) = check_hole(&mut file, offset as u64, length as u64) {
            warn!("checking hole: {}", err);
        }
    }
    Ok(true)
}
902
903pub fn check_hole(file: &mut File, offset: u64, length: u64) -> Result<()> {
905 match seekhole::seek_hole_whence(file, offset, seekhole::RegionType::Data)? {
906 Some(seek_offset) if seek_offset >= offset + length => Ok(()),
908 None => Ok(()),
910 otherwise => {
911 bail!("punched hole didn't appear: {:?}", otherwise)
912 }
913 }
914}
915
916fn delete_unused_snapshots(dir: &Path) -> Result<()> {
917 use walk::EntryType::*;
918 for entry in walk_dir(dir).context("walking dir")? {
919 match entry.entry_type {
920 SnapshotDir => {
921 let res = remove_dir(&entry.path);
924 debug!("removing snapshot dir {:?}: {:?}", &entry.path, res);
925 }
926 SnapshotValue => {
927 match std::fs::OpenOptions::new().write(true).open(&entry.path) {
928 Err(err) if err.kind() == ErrorKind::NotFound => {}
929 Err(err) => {
930 return Err(err)
931 .with_context(|| format!("opening snapshot value {:?}", &entry.path))
932 }
933 Ok(file) => {
934 if file
935 .lock_max_segment(LockExclusiveNonblock)
936 .context("locking snapshot value")?
937 {
938 let res = remove_file(&entry.path);
939 debug!("removing snapshot value file {:?}: {:?}", &entry.path, res);
940 let _ = remove_dir(
942 entry
943 .path
944 .parent()
945 .expect("snapshot values must have a parent dir"),
946 );
947 } else {
948 debug!("not deleting {:?}, still in use", &entry.path);
949 }
950 }
951 };
952 }
953 _ => {}
954 }
955 }
956 Ok(())
957}
958
959fn to_usize_io<F>(from: F) -> io::Result<usize>
960where
961 usize: TryFrom<F, Error = TryFromIntError>,
962{
963 convert_int_io(from)
964}
965
966fn convert_int_io<F, T>(from: F) -> io::Result<T>
967where
968 T: TryFrom<F, Error = TryFromIntError>,
969{
970 from.try_into()
971 .map_err(|_: TryFromIntError| make_to_usize_io_error())
972}
973
974fn make_to_usize_io_error() -> io::Error {
975 io::Error::new(TO_USIZE_IO_ERROR_KIND, TO_USIZE_IO_ERR_PAYLOAD)
976}
977
/// Error kind of the error built by `make_to_usize_io_error`.
const TO_USIZE_IO_ERROR_KIND: ErrorKind = InvalidInput;
/// Message payload of the error built by `make_to_usize_io_error`.
const TO_USIZE_IO_ERR_PAYLOAD: &str = "can't convert to usize";
980
/// Increments `arr` in place, treating it as a big-endian unsigned integer.
///
/// Returns `true` if the increment fit. Returns `false` if every byte was `0xff` (or the
/// slice is empty), in which case the array has wrapped around to all zeros.
fn inc_big_endian_array(arr: &mut [u8]) -> bool {
    // Walk from the least significant byte, carrying while bytes overflow.
    for byte in arr.iter_mut().rev() {
        match byte.checked_add(1) {
            Some(bumped) => {
                *byte = bumped;
                return true;
            }
            None => *byte = 0,
        }
    }
    false
}