1#![doc = include_str!("../README.md")]
2use std::{
8 collections::HashSet,
9 fs::{self, File, OpenOptions},
10 ops::Deref,
11 os::unix::io::AsRawFd,
12 path::{Path, PathBuf},
13 sync::{Arc, Weak},
14};
15
16use libc::off_t;
17use log::debug;
18use memmap2::{MmapMut, MmapOptions};
19use parking_lot::{RwLock, RwLockReadGuard};
20
21pub mod error;
22mod layout;
23mod reader;
24mod region;
25mod regions;
26
27pub use error::*;
28use layout::*;
29use rayon::prelude::*;
30pub use reader::*;
31pub use region::*;
32use regions::*;
33
/// Allocation granularity in bytes: file growth, region reservations and
/// hole punching all operate on multiples of this.
pub const PAGE_SIZE: u64 = 4096;
/// Mask helper for rounding up to a `PAGE_SIZE` multiple
/// (see `ceil_number_to_page_size_multiple`).
pub const PAGE_SIZE_MINUS_1: u64 = PAGE_SIZE - 1;
/// One gibibyte; sampling stride used by `approx_has_punchable_data`.
const GB: usize = 1024 * 1024 * 1024;
37
/// Cheaply cloneable handle to the shared database state.
///
/// Cloning bumps the inner [`Arc`]; every clone operates on the same file,
/// memory map, region index and layout. All [`DatabaseInner`] fields are
/// reachable through `Deref`.
#[derive(Debug, Clone)]
pub struct Database(Arc<DatabaseInner>);
44
/// Shared state behind every [`Database`] handle.
#[derive(Debug)]
pub struct DatabaseInner {
    /// Directory the database lives in; the data file is `<path>/data`.
    path: PathBuf,
    /// Per-region metadata and the id -> region index (has its own backing
    /// store — see `Regions::open` / `Regions::flush`).
    regions: RwLock<Regions>,
    /// In-memory map of the data file: where regions and holes live.
    layout: RwLock<Layout>,
    /// The advisory-locked `data` file backing the mmap.
    file: RwLock<File>,
    /// Writable memory map of the whole data file; recreated after every
    /// resize or hole punch.
    mmap: RwLock<MmapMut>,
}
53
impl Database {
    /// Opens (or creates) the database stored under `path`.
    ///
    /// Equivalent to [`Database::open_with_min_len`] with a minimum length
    /// of zero.
    pub fn open(path: &Path) -> Result<Self> {
        Self::open_with_min_len(path, 0)
    }

    /// Opens (or creates) the database stored under `path`, ensuring the
    /// backing data file is at least `min_len` bytes long.
    ///
    /// Steps: create the directory tree, open and advisory-lock the `data`
    /// file, grow it to `min_len` if needed, memory-map it, then load the
    /// persisted region metadata and rebuild the in-memory [`Layout`].
    ///
    /// # Errors
    /// Fails on any I/O error, if the file lock cannot be taken, or if the
    /// region metadata cannot be loaded.
    pub fn open_with_min_len(path: &Path, min_len: u64) -> Result<Self> {
        fs::create_dir_all(path)?;

        let file = OpenOptions::new()
            .read(true)
            .create(true)
            .write(true)
            .truncate(false)
            .open(Self::data_path_(path))?;
        debug!("File opened.");

        // Advisory lock: refuse to open the same data file twice.
        file.try_lock()?;
        debug!("File locked.");

        let file_len = file.metadata()?.len();
        if file_len < min_len {
            file.set_len(min_len)?;
            debug!("File extended.");
            file.sync_all()?;
        }

        let regions = Regions::open(path)?;
        let mmap = Self::create_mmap(&file)?;
        debug!("Mmap created.");

        let db = Self(Arc::new(DatabaseInner {
            path: path.to_owned(),
            file: RwLock::new(file),
            mmap: RwLock::new(mmap),
            regions: RwLock::new(regions),
            layout: RwLock::new(Layout::default()),
        }));

        // Field access below goes through `Deref<Target = Arc<DatabaseInner>>`.
        db.regions.write().fill_index_to_region(&db)?;
        debug!("Filled regions.");
        *db.layout.write() = Layout::from(&*db.regions.read());
        debug!("Layout created.");

        Ok(db)
    }

    /// Current length of the backing data file in bytes.
    pub fn file_len(&self) -> Result<u64> {
        Ok(self.file.read().metadata()?.len())
    }

    /// Grows the data file (and remaps it) so it is at least `len` bytes,
    /// rounded up to a page-size multiple. No-op when already large enough;
    /// the file is never shrunk here.
    ///
    /// NOTE(review): this acquires `mmap` then `file`, while `punch_holes`
    /// acquires `file` then `mmap` — a lock-order inversion that could
    /// deadlock if both run concurrently; worth confirming/fixing.
    pub fn set_min_len(&self, len: u64) -> Result<()> {
        let len = Self::ceil_number_to_page_size_multiple(len);

        let file_len = self.file_len()?;
        if file_len >= len {
            return Ok(());
        }

        // Hold the mmap write lock across the resize so nobody uses a
        // mapping of the old length while the file grows.
        let mut mmap = self.mmap.write();
        let file = self.file.write();
        file.set_len(len)?;
        file.sync_all()?;
        *mmap = Self::create_mmap(&file)?;
        Ok(())
    }

    /// Pre-sizes the database for at least `regions` regions: grows the
    /// region-metadata store and reserves one data page per region.
    pub fn set_min_regions(&self, regions: usize) -> Result<()> {
        self.regions
            .write()
            .set_min_len((regions * SIZE_OF_REGION_METADATA) as u64)?;
        self.set_min_len(regions as u64 * PAGE_SIZE)
    }

    /// Looks up a region by its string id, returning a cloned handle.
    pub fn get_region(&self, id: &str) -> Option<Region> {
        self.regions.read().get_region_from_id(id).cloned()
    }

    /// Returns the region named `id`, creating it (with one page reserved)
    /// if it does not exist yet.
    ///
    /// Placement: reuse the smallest hole that fits one page, otherwise
    /// append after the last region's reservation (growing the file if
    /// necessary).
    pub fn create_region_if_needed(&self, id: &str) -> Result<Region> {
        // Fast path: already exists.
        if let Some(region) = self.get_region(id) {
            return Ok(region);
        }

        let mut regions = self.regions.write();
        let mut layout = self.layout.write();

        let start = if let Some(start) = layout.find_smallest_adequate_hole(PAGE_SIZE) {
            layout.remove_or_compress_hole(start, PAGE_SIZE);
            start
        } else {
            // Append past the end of the last region's reservation,
            // or at offset 0 for an empty database.
            let start = layout
                .get_last_region()
                .map(|(_, region)| {
                    let region_meta = region.meta().read();
                    region_meta.start() + region_meta.reserved()
                })
                .unwrap_or_default();

            let len = start + PAGE_SIZE;

            self.set_min_len(len)?;

            start
        };

        let region = regions.create_region(self, id.to_owned(), start)?;

        layout.insert_region(start, &region);

        Ok(region)
    }

    /// Appends `data` at the end of `region`, growing it as needed.
    #[inline]
    pub fn write_all_to_region(&self, region: &Region, data: &[u8]) -> Result<()> {
        self.write_all_to_region_at_(region, data, None, false)
    }

    /// Writes `data` at offset `at` within `region`. The region length only
    /// grows (it is never cut back below its current length).
    #[inline]
    pub fn write_all_to_region_at(&self, region: &Region, data: &[u8], at: u64) -> Result<()> {
        self.write_all_to_region_at_(region, data, Some(at), false)
    }

    /// Writes `data` at offset `at` and truncates the region so that it
    /// ends exactly at `at + data.len()`.
    #[inline]
    pub fn truncate_write_all_to_region(
        &self,
        region: &Region,
        at: u64,
        data: &[u8],
    ) -> Result<()> {
        self.write_all_to_region_at_(region, data, Some(at), true)
    }

    /// Shared implementation of the `write_all_to_region*` family.
    ///
    /// `at == None` means "append at the current length". With `truncate`
    /// set, the new length is exactly `at + data.len()`; otherwise it is
    /// the max of that and the old length.
    ///
    /// When the new length exceeds the reservation (which is doubled until
    /// it fits), growth is attempted in order:
    /// 1. region is the last occupant of the file -> extend in place;
    /// 2. a hole directly after the region is big enough -> absorb it;
    /// 3. some other hole fits the whole new reservation -> relocate there;
    /// 4. otherwise -> relocate to a fresh reservation at the end of the
    ///    layout.
    ///
    /// # Errors
    /// [`Error::WriteOutOfBounds`] if `at` is past the current length, plus
    /// any I/O error from growing the file.
    fn write_all_to_region_at_(
        &self,
        region: &Region,
        data: &[u8],
        at: Option<u64>,
        truncate: bool,
    ) -> Result<()> {
        // Snapshot the metadata, then release the read lock before writing.
        let region_meta = region.meta().read();
        let start = region_meta.start();
        let reserved = region_meta.reserved();
        let len = region_meta.len();
        drop(region_meta);

        let data_len = data.len() as u64;

        // A positional write may begin anywhere up to (and including) the
        // current end, but not beyond it.
        if let Some(at_val) = at
            && at_val > len
        {
            return Err(Error::WriteOutOfBounds {
                position: at_val,
                region_len: len,
            });
        }

        let new_len = at.map_or(len + data_len, |at| {
            let new_len = at + data_len;
            if truncate { new_len } else { new_len.max(len) }
        });
        let write_start = start + at.unwrap_or(len);

        // Fast path: the write fits inside the existing reservation.
        if new_len <= reserved {
            if at.is_none() {
                // Append: the bytes land past `len`, so readers cannot see
                // them until `set_len` below publishes the new length.
                self.write(write_start, data);
            }

            let mut region_meta = region.meta().write();

            if at.is_some() {
                // In-place overwrite is done while holding the meta write
                // lock — presumably so readers cannot observe a torn
                // update; TODO(review) confirm the intended ordering.
                self.write(write_start, data);
            }

            region_meta.set_len(new_len);

            return Ok(());
        }

        // Need more space: double the reservation until the write fits.
        assert!(new_len > reserved);
        let mut new_reserved = reserved;
        while new_len > new_reserved {
            new_reserved *= 2;
        }
        assert!(new_len <= new_reserved);
        let added_reserve = new_reserved - reserved;

        let mut layout = self.layout.write();

        // Case 1: region is the last occupant — just extend the file.
        if layout.is_last_anything(region) {
            self.set_min_len(start + new_reserved)?;
            let mut region_meta = region.meta().write();
            region_meta.set_reserved(new_reserved);
            drop(region_meta);
            drop(layout);

            self.write(write_start, data);

            let mut region_meta = region.meta().write();
            region_meta.set_len(new_len);

            return Ok(());
        }

        // Case 2: the hole immediately after the region can absorb the
        // growth in place.
        let hole_start = start + reserved;
        if layout
            .get_hole(hole_start)
            .is_some_and(|gap| gap >= added_reserve)
        {
            layout.remove_or_compress_hole(hole_start, added_reserve);
            let mut region_meta = region.meta().write();
            region_meta.set_reserved(new_reserved);
            drop(region_meta);
            drop(layout);

            self.write(write_start, data);

            let mut region_meta = region.meta().write();
            region_meta.set_len(new_len);

            return Ok(());
        }

        // Case 3: relocate into the smallest hole that fits the whole new
        // reservation, copying the live prefix first.
        if let Some(hole_start) = layout.find_smallest_adequate_hole(new_reserved) {
            layout.remove_or_compress_hole(hole_start, new_reserved);
            drop(layout);

            // Copy old content [start, write_start) to the new location.
            // NOTE(review): `self.write` re-acquires the mmap read lock
            // while this guard is still live; parking_lot read locks are
            // not recursion-safe when a writer is queued — confirm.
            self.write(
                hole_start,
                &self.mmap.read()[start as usize..write_start as usize],
            );

            self.write(hole_start + at.unwrap_or(len), data);

            let mut layout = self.layout.write();
            layout.move_region(hole_start, region)?;

            let mut region_meta = region.meta().write();
            region_meta.set_start(hole_start);
            region_meta.set_reserved(new_reserved);
            region_meta.set_len(new_len);

            return Ok(());
        }

        // Case 4: relocate to a brand-new reservation appended at the end
        // of the layout (growing the file if necessary).
        let new_start = layout.len();
        self.set_min_len(new_start + new_reserved)?;
        layout.reserve(new_start, new_reserved);
        drop(layout);

        self.write(
            new_start,
            &self.mmap.read()[start as usize..write_start as usize],
        );
        self.write(new_start + at.unwrap_or(len), data);

        let mut layout = self.layout.write();
        layout.move_region(new_start, region)?;
        assert!(layout.reserved(new_start) == Some(new_reserved));

        let mut region_meta = region.meta().write();
        region_meta.set_start(new_start);
        region_meta.set_reserved(new_reserved);
        region_meta.set_len(new_len);

        Ok(())
    }

    /// Copies `data` into the mmap at absolute file offset `at`.
    ///
    /// # Panics
    /// `unreachable!` if the write would run past the end of the mapping —
    /// callers are expected to have reserved the space first.
    ///
    /// SAFETY(review): builds a `&mut` view over the whole map from a
    /// *read* guard by casting `as_ptr()` to `*mut u8`. Soundness relies on
    /// callers never writing overlapping ranges concurrently, and the cast
    /// from a shared reference is dubious under Rust's aliasing rules —
    /// flagged for review, behavior left as-is.
    #[inline]
    fn write(&self, at: u64, data: &[u8]) {
        let mmap = self.mmap.read();
        let data_len = data.len();
        let start = at as usize;
        let end = start + data_len;
        if end > mmap.len() {
            unreachable!("Trying to write beyond mmap")
        }

        (unsafe { std::slice::from_raw_parts_mut(mmap.as_ptr() as *mut u8, mmap.len()) })
            [start..end]
            .copy_from_slice(data);
    }

    /// Shrinks `region`'s logical length to `from` bytes.
    ///
    /// Only the length changes; the reservation is kept (unused pages are
    /// reclaimed later by `compact`/`punch_holes`).
    ///
    /// # Errors
    /// [`Error::TruncateInvalid`] if `from` is beyond the current length.
    pub fn truncate_region(&self, region: &Region, from: u64) -> Result<()> {
        let mut region_meta = region.meta().write();
        let len = region_meta.len();
        if from == len {
            return Ok(());
        } else if from > len {
            return Err(Error::TruncateInvalid {
                from,
                current_len: len,
            });
        }
        region_meta.set_len(from);
        Ok(())
    }

    /// Removes the region named `id`, if any, returning the removed handle.
    pub fn remove_region_with_id(&self, id: &str) -> Result<Option<Region>> {
        let Some(region) = self.get_region(id) else {
            return Ok(None);
        };
        self.remove_region(region)
    }

    /// Removes `region` from the layout (leaving a hole) and from the
    /// region index.
    pub fn remove_region(&self, region: Region) -> Result<Option<Region>> {
        let mut regions = self.regions.write();
        let mut layout = self.layout.write();
        layout.remove_region(&region)?;
        regions.remove_region(region)
    }

    /// Removes every region whose id is NOT in `ids`.
    ///
    /// NOTE(review): `get_region` re-acquires the regions read lock while
    /// the guard from the chain below is still live — recursive read
    /// acquisition; confirm this cannot deadlock against a queued writer.
    pub fn retain_regions(&self, mut ids: HashSet<String>) -> Result<()> {
        let regions_to_remove = self
            .regions
            .read()
            .id_to_index()
            .keys()
            // `remove` returns false for ids we were not asked to keep.
            .filter(|id| !ids.remove(&**id))
            .flat_map(|id| self.get_region(id))
            .collect::<Vec<Region>>();

        regions_to_remove
            .into_iter()
            .try_for_each(|region| -> Result<()> {
                self.remove_region(region)?;
                Ok(())
            })
    }

    /// Memory-maps the whole of `file` read-write.
    #[inline]
    fn create_mmap(file: &File) -> Result<MmapMut> {
        // SAFETY: file-backed maps are only sound while the file is not
        // resized behind our back; the database keeps the file locked and
        // remaps after every resize/punch.
        Ok(unsafe { MmapOptions::new().map_mut(file)? })
    }

    /// Read access to the raw memory map.
    #[inline]
    pub fn mmap(&self) -> RwLockReadGuard<'_, MmapMut> {
        self.mmap.read()
    }

    /// Read access to the region index.
    #[inline]
    pub fn regions(&self) -> RwLockReadGuard<'_, Regions> {
        self.regions.read()
    }

    /// Read access to the file layout (regions and holes).
    #[inline]
    pub fn layout(&self) -> RwLockReadGuard<'_, Layout> {
        self.layout.read()
    }

    /// Rounds `num` up to the next multiple of `PAGE_SIZE`.
    #[inline]
    fn ceil_number_to_page_size_multiple(num: u64) -> u64 {
        (num + PAGE_SIZE_MINUS_1) & !PAGE_SIZE_MINUS_1
    }

    /// Path of this database's data file.
    #[inline]
    fn data_path(&self) -> PathBuf {
        Self::data_path_(&self.path)
    }
    /// Data file path for a database rooted at `path`.
    #[inline]
    fn data_path_(path: &Path) -> PathBuf {
        path.join("data")
    }

    /// Opens an independent read-only handle to the data file.
    #[inline]
    pub fn open_read_only_file(&self) -> Result<File> {
        File::open(self.data_path()).map_err(Error::from)
    }

    /// Human-readable on-disk usage of the data file, as reported by the
    /// external `du -h` command (the path itself is stripped from the
    /// output).
    ///
    /// # Panics
    /// Panics if `du` cannot be spawned or the path is not valid UTF-8.
    pub fn disk_usage(&self) -> String {
        let path = self.data_path();

        let output = std::process::Command::new("du")
            .arg("-h")
            .arg(&path)
            .output()
            .expect("Failed to run du");

        String::from_utf8_lossy(&output.stdout)
            .replace(path.to_str().unwrap(), " ")
            .trim()
            .to_string()
    }

    /// Flushes the mmap and the region metadata to disk, then promotes the
    /// layout's pending holes — presumably holes only become safe to punch
    /// once the data that obsoleted them is durable; TODO(review) confirm.
    pub fn flush(&self) -> Result<()> {
        let mmap = self.mmap.read();
        let regions = self.regions.read();
        mmap.flush()?;
        regions.flush()?;

        self.layout.write().promote_pending_holes();

        Ok(())
    }

    /// Flushes, then punches holes to return unused pages to the OS.
    #[inline]
    pub fn compact(&self) -> Result<()> {
        self.flush()?;
        self.punch_holes()
    }

    /// Deallocates (hole-punches) page-aligned space holding no live data:
    /// the tail of every region's reservation past its length, and every
    /// explicit hole in the layout. Remaps the file afterwards if anything
    /// was punched.
    ///
    /// NOTE(review): lock order here is `file` -> `mmap`, the reverse of
    /// `set_min_len` — see the note there.
    fn punch_holes(&self) -> Result<()> {
        let file = self.file.write();
        let mut mmap = self.mmap.write();
        let regions = self.regions.read();
        let layout = self.layout.read();

        // Reclaim each region's reserved-but-unused tail (in parallel).
        let mut punched = regions
            .index_to_region()
            .par_iter()
            .flatten()
            .map(|region| -> Result<usize> {
                let region_meta = region.meta().read();
                let rstart = region_meta.start();
                let len = region_meta.len();
                let reserved = region_meta.reserved();
                let ceil_len = Self::ceil_number_to_page_size_multiple(len);
                assert!(len <= ceil_len);
                if ceil_len > reserved {
                    // Invariant: a region never outgrows its reservation.
                    panic!()
                } else if ceil_len < reserved {
                    let start = rstart + ceil_len;
                    let hole = reserved - ceil_len;
                    // Skip ranges that already look all-zero (cheap probe).
                    if Self::approx_has_punchable_data(&mmap, start, hole) {
                        Self::punch_hole(&file, start, hole)?;
                        return Ok(1);
                    }
                }
                Ok(0)
            })
            .sum::<Result<usize>>()?;

        // Reclaim the layout's explicit holes as well.
        punched += layout
            .start_to_hole()
            .par_iter()
            .map(|(&start, &hole)| -> Result<usize> {
                if Self::approx_has_punchable_data(&mmap, start, hole) {
                    Self::punch_hole(&file, start, hole)?;
                    return Ok(1);
                }
                Ok(0)
            })
            .sum::<Result<usize>>()?;

        if punched > 0 {
            // Make the deallocation durable, then remap to pick it up.
            unsafe {
                libc::fsync(file.as_raw_fd());
            }
            *mmap = Self::create_mmap(&file)?;
        }

        Ok(())
    }

    /// Cheap heuristic: does the page-aligned range `[start, start + len)`
    /// contain any nonzero byte worth punching?
    ///
    /// Only probes the first and last byte of the first page, of the last
    /// page, and of one page at each GiB boundary in between — it can miss
    /// nonzero bytes elsewhere, hence "approx".
    fn approx_has_punchable_data(mmap: &MmapMut, start: u64, len: u64) -> bool {
        assert!(start.is_multiple_of(PAGE_SIZE));
        assert!(len.is_multiple_of(PAGE_SIZE));

        let start = start as usize;
        let len = len as usize;

        let min = start;
        let max = start + len;
        // Probe just the two boundary bytes of a candidate page.
        let check = |start, end| {
            assert!(start >= min);
            assert!(end < max);
            let start_is_some = mmap[start] != 0;
            let end_is_some = mmap[end] != 0;
            start_is_some || end_is_some
        };

        let first_page_start = start;
        let first_page_end = start + PAGE_SIZE as usize - 1;
        if check(first_page_start, first_page_end) {
            return true;
        }

        let last_page_start = start + len - PAGE_SIZE as usize;
        let last_page_end = start + len - 1;
        if check(last_page_start, last_page_end) {
            return true;
        }

        // For multi-GiB ranges, also sample one page per GiB boundary.
        if len > GB {
            let num_gb_checks = len / GB;
            for i in 1..num_gb_checks {
                let gb_boundary = start + i * GB;
                let page_start = gb_boundary;
                let page_end = gb_boundary + PAGE_SIZE as usize - 1;

                if check(page_start, page_end) {
                    return true;
                }
            }
        }

        false
    }

    /// macOS: deallocate `[start, start + length)` via `fcntl(F_PUNCHHOLE)`.
    #[cfg(target_os = "macos")]
    fn punch_hole(file: &File, start: u64, length: u64) -> Result<()> {
        let fpunchhole = FPunchhole {
            fp_flags: 0,
            reserved: 0,
            fp_offset: start as libc::off_t,
            fp_length: length as libc::off_t,
        };

        let result = unsafe {
            libc::fcntl(
                file.as_raw_fd(),
                libc::F_PUNCHHOLE,
                &fpunchhole as *const FPunchhole,
            )
        };

        if result == -1 {
            let err = std::io::Error::last_os_error();
            return Err(Error::HolePunchFailed {
                start,
                len: length,
                source: err,
            });
        }

        Ok(())
    }

    /// Linux: deallocate via `fallocate(FALLOC_FL_PUNCH_HOLE)`;
    /// `FALLOC_FL_KEEP_SIZE` preserves the file length.
    #[cfg(target_os = "linux")]
    fn punch_hole(file: &File, start: u64, length: u64) -> Result<()> {
        let result = unsafe {
            libc::fallocate(
                file.as_raw_fd(),
                libc::FALLOC_FL_PUNCH_HOLE | libc::FALLOC_FL_KEEP_SIZE,
                start as libc::off_t,
                length as libc::off_t,
            )
        };

        if result == -1 {
            let err = std::io::Error::last_os_error();
            return Err(Error::HolePunchFailed {
                start,
                len: length,
                source: err,
            });
        }

        Ok(())
    }

    /// FreeBSD: deallocate via `fspacectl(SPACECTL_DEALLOC)`.
    #[cfg(target_os = "freebsd")]
    fn punch_hole(file: &File, start: u64, length: u64) -> Result<()> {
        let fd = file.as_raw_fd();

        let mut spacectl = libc::spacectl_range {
            r_offset: start as libc::off_t,
            r_len: length as libc::off_t,
        };

        let result = unsafe {
            libc::fspacectl(
                fd,
                libc::SPACECTL_DEALLOC,
                &spacectl as *const libc::spacectl_range,
                0,
                &mut spacectl as *mut libc::spacectl_range,
            )
        };

        if result == -1 {
            let err = std::io::Error::last_os_error();
            return Err(Error::HolePunchFailed {
                start,
                len: length,
                source: err,
            });
        }

        Ok(())
    }

    /// Fallback for unsupported platforms: always errors.
    #[cfg(not(any(target_os = "macos", target_os = "linux", target_os = "freebsd")))]
    fn punch_hole(_file: &File, _start: u64, _length: u64) -> Result<()> {
        Err(Error::String(
            "Hole punching not supported on this platform".to_string(),
        ))
    }

    /// Root directory this database lives in.
    #[inline]
    pub fn path(&self) -> &Path {
        &self.path
    }

    /// Downgrades to a non-owning [`WeakDatabase`] handle.
    #[inline]
    pub fn weak_clone(&self) -> WeakDatabase {
        WeakDatabase(Arc::downgrade(&self.0))
    }
}
688
689impl Deref for Database {
690 type Target = Arc<DatabaseInner>;
691 fn deref(&self) -> &Self::Target {
692 &self.0
693 }
694}
695
/// Argument block for the macOS `fcntl(F_PUNCHHOLE)` call; mirrors the C
/// `fpunchhole_t` layout, hence `#[repr(C)]`.
///
/// Only the macOS `punch_hole` implementation references this type, so it
/// is compiled out on every other platform to avoid a `dead_code` warning.
#[cfg(target_os = "macos")]
#[repr(C)]
struct FPunchhole {
    /// No flags are currently defined; always 0 here.
    fp_flags: u32,
    /// Reserved by the API; always 0 here.
    reserved: u32,
    /// Byte offset at which the hole starts.
    fp_offset: off_t,
    /// Length of the hole in bytes.
    fp_length: off_t,
}
703
/// Non-owning handle to the database state, created via
/// [`Database::weak_clone`]; holding one does not keep the database alive.
#[derive(Debug, Clone)]
pub struct WeakDatabase(Weak<DatabaseInner>);
710
711impl WeakDatabase {
712 pub fn upgrade(&self) -> Database {
713 Database(
714 self.0
715 .upgrade()
716 .expect("Database was dropped while Region still exists"),
717 )
718 }
719}