#![doc = include_str!("../README.md")]
use std::{
    collections::HashSet,
    fs::{self, File, OpenOptions},
    ops::Deref,
    os::unix::io::AsRawFd,
    path::{Path, PathBuf},
    sync::{Arc, Weak},
};

use libc::off_t;
use memmap2::{MmapMut, MmapOptions};
use parking_lot::{RwLock, RwLockReadGuard};

pub mod error;
mod layout;
mod reader;
mod region;
mod regions;

pub use error::*;
use layout::*;
use rayon::prelude::*;
pub use reader::*;
pub use region::*;
use regions::*;

pub const PAGE_SIZE: u64 = 4096;
pub const PAGE_SIZE_MINUS_1: u64 = PAGE_SIZE - 1;
const GB: usize = 1024 * 1024 * 1024;

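/// Cheaply cloneable handle (an `Arc` around the shared state) to an
/// mmap-backed store of named, growable byte regions in a single data file.
///
/// A minimal usage sketch, not taken from the original sources: it assumes
/// this crate is in scope as `db` and that `/tmp/example-db` is writable.
///
/// ```ignore
/// use std::path::Path;
///
/// let database = db::Database::open(Path::new("/tmp/example-db"))?;
/// let region = database.create_region_if_needed("events")?;
/// database.write_all_to_region(&region, b"hello")?;
/// database.flush()?;
/// ```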
#[derive(Debug, Clone)]
pub struct Database(Arc<DatabaseInner>);

#[derive(Debug)]
pub struct DatabaseInner {
    path: PathBuf,
    regions: RwLock<Regions>,
    layout: RwLock<Layout>,
    file: RwLock<File>,
    mmap: RwLock<MmapMut>,
}

impl Database {
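    /// Opens the database rooted at `path`, creating the directory, the
    /// backing data file, and the region metadata as needed. The data file is
    /// locked via `File::try_lock`, so a concurrent open of the same path
    /// fails instead of racing on the mmap.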
    pub fn open(path: &Path) -> Result<Self> {
        fs::create_dir_all(path)?;

        let file = OpenOptions::new()
            .read(true)
            .create(true)
            .write(true)
            .truncate(false)
            .open(Self::data_path_(path))?;
        // Refuse to open a database that another handle already holds.
        file.try_lock()?;

        let regions = Regions::open(path)?;
        let mmap = Self::create_mmap(&file)?;

        let db = Self(Arc::new(DatabaseInner {
            path: path.to_owned(),
            file: RwLock::new(file),
            mmap: RwLock::new(mmap),
            regions: RwLock::new(regions),
            layout: RwLock::new(Layout::default()),
        }));

        db.regions.write().fill_index_to_region(&db)?;
        *db.layout.write() = Layout::from(&*db.regions.read());

        // Sync the directory itself so freshly created entries are durable.
        File::open(path)?.sync_data()?;

        Ok(db)
    }

    pub fn file_len(&self) -> Result<u64> {
        Ok(self.file.read().metadata()?.len())
    }

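    /// Grows the data file to at least `len` bytes (rounded up to a page
    /// multiple) and remaps it. Never shrinks the file.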
    pub fn set_min_len(&self, len: u64) -> Result<()> {
        let len = Self::ceil_number_to_page_size_multiple(len);

        let file_len = self.file_len()?;
        if file_len < len {
            // Take the mmap write lock first so no reader is left holding a
            // view of the old mapping, then grow the file and remap it.
            let mut mmap = self.mmap.write();
            let file = self.file.write();
            file.set_len(len)?;
            *mmap = Self::create_mmap(&file)?;
        }
        Ok(())
    }

    /// Ensures there is room for at least `regions` regions: metadata space
    /// in the region index and one page per region in the data file.
    pub fn set_min_regions(&self, regions: usize) -> Result<()> {
        self.regions
            .write()
            .set_min_len((regions * SIZE_OF_REGION_METADATA) as u64)?;
        self.set_min_len(regions as u64 * PAGE_SIZE)
    }

    pub fn get_region(&self, id: &str) -> Option<Region> {
        self.regions.read().get_region_from_id(id).cloned()
    }

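    /// Returns the region named `id`, creating it if it does not exist yet.
    /// A new region gets one page, placed either in the smallest existing
    /// hole that fits or appended after the last region.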
    pub fn create_region_if_needed(&self, id: &str) -> Result<Region> {
        if let Some(region) = self.get_region(id) {
            return Ok(region);
        }

        let mut regions = self.regions.write();
        let mut layout = self.layout.write();

        let start = if let Some(start) = layout.find_smallest_adequate_hole(PAGE_SIZE) {
            layout.remove_or_compress_hole(start, PAGE_SIZE);
            start
        } else {
            // No hole is big enough: append one page past the last region.
            let start = layout
                .get_last_region()
                .map(|(_, region)| {
                    let region_meta = region.meta().read();
                    region_meta.start() + region_meta.reserved()
                })
                .unwrap_or_default();

            let len = start + PAGE_SIZE;

            self.set_min_len(len)?;

            start
        };

        let region = regions.create_region(self, id.to_owned(), start)?;

        layout.insert_region(start, &region);

        Ok(region)
    }

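    /// Appends `data` to the end of `region`. See
    /// [`Self::write_all_to_region_at`] for positioned writes and
    /// [`Self::truncate_write_all_to_region`] for write-then-truncate.
    ///
    /// A sketch of the three variants, not from the original sources, which
    /// assumes `db` and a freshly created (empty) `region` as in the example
    /// on [`Database`]:
    ///
    /// ```ignore
    /// db.write_all_to_region(&region, b"abc")?;           // len = 3, bytes "abc"
    /// db.write_all_to_region_at(&region, b"xyz", 1)?;     // len = 4, bytes "axyz"
    /// db.truncate_write_all_to_region(&region, 2, b"!")?; // len = 3, bytes "ax!"
    /// ```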
    #[inline]
    pub fn write_all_to_region(&self, region: &Region, data: &[u8]) -> Result<()> {
        self.write_all_to_region_at_(region, data, None, false)
    }

    /// Writes `data` at offset `at`, extending the region if the write runs
    /// past its current end. `at` may be at most the current length.
    #[inline]
    pub fn write_all_to_region_at(&self, region: &Region, data: &[u8], at: u64) -> Result<()> {
        self.write_all_to_region_at_(region, data, Some(at), false)
    }

    /// Writes `data` at offset `at` and truncates the region so it ends right
    /// after the written bytes.
    #[inline]
    pub fn truncate_write_all_to_region(
        &self,
        region: &Region,
        at: u64,
        data: &[u8],
    ) -> Result<()> {
        self.write_all_to_region_at_(region, data, Some(at), true)
    }

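    // Core write path. The cases below, summarized here (our reading,
    // inferred from the code):
    //
    // 1. The new length still fits the region's reservation: write in place.
    // 2. It doesn't fit, so double `reserved` until it does. Then either
    //    a) the region is last in the file: grow the file, extend in place;
    //    b) the hole right after the region is big enough: absorb it;
    //    c) some other hole fits the doubled reservation: move there;
    //    d) otherwise: move the region past the end of the layout.
    //
    // Example of the doubling: reserved = 4096 and new_len = 9000 gives
    // new_reserved = 16384 (4096 -> 8192 -> 16384).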
    fn write_all_to_region_at_(
        &self,
        region: &Region,
        data: &[u8],
        at: Option<u64>,
        truncate: bool,
    ) -> Result<()> {
        let region_meta = region.meta().read();
        let start = region_meta.start();
        let reserved = region_meta.reserved();
        let len = region_meta.len();
        drop(region_meta);

        let data_len = data.len() as u64;

        // Writing past the current end (not merely up to it) is an error.
        if let Some(at_val) = at
            && at_val > len
        {
            return Err(Error::WriteOutOfBounds {
                position: at_val,
                region_len: len,
            });
        }

        let new_len = at.map_or(len + data_len, |at| {
            let new_len = at + data_len;
            if truncate { new_len } else { new_len.max(len) }
        });
        let write_start = start + at.unwrap_or(len);

        // Case 1: the write fits inside the current reservation.
        if new_len <= reserved {
            // An append lands beyond `len`, where no reader looks until
            // `set_len` below publishes it, so it needs no lock.
            if at.is_none() {
                self.write(write_start, data);
            }

            let mut region_meta = region.meta().write();

            // A positioned write can touch bytes a reader could otherwise
            // observe, so it happens under the metadata write lock.
            if at.is_some() {
                self.write(write_start, data);
            }

            region_meta.set_len(new_len);

            return Ok(());
        }

        // Case 2: double the reservation until the new length fits.
        assert!(new_len > reserved);
        let mut new_reserved = reserved;
        while new_len > new_reserved {
            new_reserved *= 2;
        }
        assert!(new_len <= new_reserved);
        let added_reserve = new_reserved - reserved;

        let mut layout = self.layout.write();

        // Case 2a: the region is last in the file, so grow the file and
        // extend the reservation in place.
        if layout.is_last_anything(region) {
            self.set_min_len(start + new_reserved)?;
            let mut region_meta = region.meta().write();
            region_meta.set_reserved(new_reserved);
            drop(region_meta);
            drop(layout);

            self.write(write_start, data);

            let mut region_meta = region.meta().write();
            region_meta.set_len(new_len);

            return Ok(());
        }

        // Case 2b: the hole immediately after the region is big enough, so
        // absorb exactly the extra reservation from its front.
        let hole_start = start + reserved;
        if layout
            .get_hole(hole_start)
            .is_some_and(|gap| gap >= added_reserve)
        {
            layout.remove_or_compress_hole(hole_start, added_reserve);
            let mut region_meta = region.meta().write();
            region_meta.set_reserved(new_reserved);
            drop(region_meta);
            drop(layout);

            self.write(write_start, data);

            let mut region_meta = region.meta().write();
            region_meta.set_len(new_len);

            return Ok(());
        }

        // Case 2c: some other hole can hold the doubled reservation, so copy
        // the region there and write the new bytes at their final offset.
        if let Some(hole_start) = layout.find_smallest_adequate_hole(new_reserved) {
            layout.remove_or_compress_hole(hole_start, new_reserved);
            drop(layout);

            // Copy the bytes that precede the write, then the new data.
            self.write(
                hole_start,
                &self.mmap.read()[start as usize..write_start as usize],
            );

            self.write(hole_start + at.unwrap_or(len), data);

            let mut layout = self.layout.write();
            layout.move_region(hole_start, region)?;

            let mut region_meta = region.meta().write();
            region_meta.set_start(hole_start);
            region_meta.set_reserved(new_reserved);
            region_meta.set_len(new_len);

            return Ok(());
        }

        // Case 2d: no hole fits, so move the region past everything else.
        let new_start = layout.len();
        self.set_min_len(new_start + new_reserved)?;
        layout.reserve(new_start, new_reserved);
        drop(layout);

        self.write(
            new_start,
            &self.mmap.read()[start as usize..write_start as usize],
        );
        self.write(new_start + at.unwrap_or(len), data);

        let mut layout = self.layout.write();
        layout.move_region(new_start, region)?;
        assert_eq!(layout.reserved(new_start), Some(new_reserved));

        let mut region_meta = region.meta().write();
        region_meta.set_start(new_start);
        region_meta.set_reserved(new_reserved);
        region_meta.set_len(new_len);

        Ok(())
    }

    #[inline]
    fn write(&self, at: u64, data: &[u8]) {
        let mmap = self.mmap.read();
        let data_len = data.len();
        let start = at as usize;
        let end = start + data_len;
        if end > mmap.len() {
            unreachable!("Trying to write beyond mmap")
        }

        // SAFETY: the mapping is writable; callers coordinate overlapping
        // writes through the region/layout locks, and the bounds were just
        // checked above.
        (unsafe { std::slice::from_raw_parts_mut(mmap.as_ptr() as *mut u8, mmap.len()) })
            [start..end]
            .copy_from_slice(data);
    }

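    /// Shortens `region` to `from` bytes. Truncating to the current length is
    /// a no-op; truncating past it is an error. The bytes stay reserved and
    /// are reclaimed later by [`Self::compact`].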
    pub fn truncate_region(&self, region: &Region, from: u64) -> Result<()> {
        let mut region_meta = region.meta().write();
        let len = region_meta.len();
        if from == len {
            return Ok(());
        } else if from > len {
            return Err(Error::TruncateInvalid {
                from,
                current_len: len,
            });
        }
        region_meta.set_len(from);
        Ok(())
    }

    pub fn remove_region_with_id(&self, id: &str) -> Result<Option<Region>> {
        let Some(region) = self.get_region(id) else {
            return Ok(None);
        };
        self.remove_region(region)
    }

    pub fn remove_region(&self, region: Region) -> Result<Option<Region>> {
        let mut regions = self.regions.write();
        let mut layout = self.layout.write();
        layout.remove_region(&region)?;
        regions.remove_region(region)
    }

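    /// Removes every region whose id is not in `ids`, keeping the rest.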
    pub fn retain_regions(&self, mut ids: HashSet<String>) -> Result<()> {
        // `ids.remove` both tests membership and shrinks the set, so each
        // surviving id is matched at most once.
        let regions_to_remove = self
            .regions
            .read()
            .id_to_index()
            .keys()
            .filter(|id| !ids.remove(&**id))
            .flat_map(|id| self.get_region(id))
            .collect::<Vec<Region>>();

        regions_to_remove
            .into_iter()
            .try_for_each(|region| -> Result<()> {
                self.remove_region(region)?;
                Ok(())
            })
    }

    #[inline]
    fn create_mmap(file: &File) -> Result<MmapMut> {
        // SAFETY: the file is exclusively locked by this handle, so no other
        // process is expected to resize it underneath the mapping.
        Ok(unsafe { MmapOptions::new().map_mut(file)? })
    }

    #[inline]
    pub fn mmap(&self) -> RwLockReadGuard<'_, MmapMut> {
        self.mmap.read()
    }

    #[inline]
    pub fn regions(&self) -> RwLockReadGuard<'_, Regions> {
        self.regions.read()
    }

    #[inline]
    pub fn layout(&self) -> RwLockReadGuard<'_, Layout> {
        self.layout.read()
    }

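    // Rounds up to the next multiple of PAGE_SIZE with the usual
    // power-of-two trick: add PAGE_SIZE - 1, then clear the low bits. For
    // example, 5000 -> (5000 + 4095) & !4095 = 8192, and 8192 stays 8192.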
    #[inline]
    fn ceil_number_to_page_size_multiple(num: u64) -> u64 {
        (num + PAGE_SIZE_MINUS_1) & !PAGE_SIZE_MINUS_1
    }

    #[inline]
    fn data_path(&self) -> PathBuf {
        Self::data_path_(&self.path)
    }

    #[inline]
    fn data_path_(path: &Path) -> PathBuf {
        path.join("data")
    }

    /// Opens a fresh read-only handle to the data file.
    #[inline]
    pub fn open_read_only_file(&self) -> Result<File> {
        File::open(self.data_path()).map_err(Error::from)
    }

    /// Debug helper: shells out to `du -h` and returns the human-readable
    /// size of the data file, with the path itself stripped from the output.
    pub fn disk_usage(&self) -> String {
        let path = self.data_path();

        let output = std::process::Command::new("du")
            .arg("-h")
            .arg(&path)
            .output()
            .expect("Failed to run du");

        String::from_utf8_lossy(&output.stdout)
            .replace(path.to_str().unwrap(), " ")
            .trim()
            .to_string()
    }

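    /// Flushes the mmap and the region metadata to disk, then promotes
    /// pending holes in the layout so a later [`Self::compact`] can punch
    /// them out of the file.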
    pub fn flush(&self) -> Result<()> {
        let mmap = self.mmap.read();
        let regions = self.regions.read();
        mmap.flush()?;
        regions.flush()?;

        self.layout.write().promote_pending_holes();

        Ok(())
    }

    /// Flushes, then returns unused pages to the filesystem by punching
    /// holes. The file length is unchanged; only disk usage shrinks.
    #[inline]
    pub fn compact(&self) -> Result<()> {
        self.flush()?;
        self.punch_holes()
    }

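    // Scans, in parallel, (a) each region's slack between its page-rounded
    // length and its reservation and (b) every explicit hole in the layout.
    // Any such range that still appears to hold data is punched out of the
    // file; if anything was punched, the file is fsynced and remapped.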
    fn punch_holes(&self) -> Result<()> {
        let file = self.file.write();
        let mut mmap = self.mmap.write();
        let regions = self.regions.read();
        let layout = self.layout.read();

        // Punch each region's slack tail: the pages between the region's
        // page-rounded length and its reservation.
        let mut punched = regions
            .index_to_region()
            .par_iter()
            .flatten()
            .map(|region| -> Result<usize> {
                let region_meta = region.meta().read();
                let rstart = region_meta.start();
                let len = region_meta.len();
                let reserved = region_meta.reserved();
                let ceil_len = Self::ceil_number_to_page_size_multiple(len);
                assert!(len <= ceil_len);
                if ceil_len > reserved {
                    unreachable!("a region's rounded length cannot exceed its reservation")
                } else if ceil_len < reserved {
                    let start = rstart + ceil_len;
                    let hole = reserved - ceil_len;
                    if Self::approx_has_punchable_data(&mmap, start, hole) {
                        Self::punch_hole(&file, start, hole)?;
                        return Ok(1);
                    }
                }
                Ok(0)
            })
            .sum::<Result<usize>>()?;

        // Punch the layout's explicit holes as well.
        punched += layout
            .start_to_hole()
            .par_iter()
            .map(|(&start, &hole)| -> Result<usize> {
                if Self::approx_has_punchable_data(&mmap, start, hole) {
                    Self::punch_hole(&file, start, hole)?;
                    return Ok(1);
                }
                Ok(0)
            })
            .sum::<Result<usize>>()?;

        if punched > 0 {
            // Make the deallocation durable, then remap so the mapping
            // observes the newly zeroed pages.
            unsafe {
                libc::fsync(file.as_raw_fd());
            }
            *mmap = Self::create_mmap(&file)?;
        }

        Ok(())
    }

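    // Cheap, conservative probe for whether `[start, start + len)` holds any
    // nonzero data worth punching. Rather than scanning every byte, it
    // samples the first and last byte of the first page, of the last page,
    // and of one page at each whole-GB boundary in between; already-punched
    // ranges read back as zeros, so sampling zeros usually means the range
    // needs no work. Nonzero bytes lying only between the sampled pages are
    // missed, which costs a skipped punch, never corruption.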
    fn approx_has_punchable_data(mmap: &MmapMut, start: u64, len: u64) -> bool {
        assert!(start.is_multiple_of(PAGE_SIZE));
        assert!(len.is_multiple_of(PAGE_SIZE));

        let start = start as usize;
        let len = len as usize;

        let min = start;
        let max = start + len;
        // Reports whether either boundary byte of a sampled page is nonzero.
        let check = |start, end| {
            assert!(start >= min);
            assert!(end < max);
            let start_is_some = mmap[start] != 0;
            let end_is_some = mmap[end] != 0;
            start_is_some || end_is_some
        };

        let first_page_start = start;
        let first_page_end = start + PAGE_SIZE as usize - 1;
        if check(first_page_start, first_page_end) {
            return true;
        }

        let last_page_start = start + len - PAGE_SIZE as usize;
        let last_page_end = start + len - 1;
        if check(last_page_start, last_page_end) {
            return true;
        }

        // For multi-GB ranges, also sample one page at each GB boundary.
        if len > GB {
            let num_gb_checks = len / GB;
            for i in 1..num_gb_checks {
                let gb_boundary = start + i * GB;
                let page_start = gb_boundary;
                let page_end = gb_boundary + PAGE_SIZE as usize - 1;

                if check(page_start, page_end) {
                    return true;
                }
            }
        }

        false
    }

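    // Hole punching is platform-specific: macOS exposes it via the
    // F_PUNCHHOLE fcntl, Linux via fallocate(FALLOC_FL_PUNCH_HOLE |
    // FALLOC_FL_KEEP_SIZE), and FreeBSD via fspacectl(SPACECTL_DEALLOC).
    // Everything else gets a runtime error from the fallback below.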
    #[cfg(target_os = "macos")]
    fn punch_hole(file: &File, start: u64, length: u64) -> Result<()> {
        let fpunchhole = FPunchhole {
            fp_flags: 0,
            reserved: 0,
            fp_offset: start as libc::off_t,
            fp_length: length as libc::off_t,
        };

        let result = unsafe {
            libc::fcntl(
                file.as_raw_fd(),
                libc::F_PUNCHHOLE,
                &fpunchhole as *const FPunchhole,
            )
        };

        if result == -1 {
            let err = std::io::Error::last_os_error();
            return Err(Error::HolePunchFailed {
                start,
                len: length,
                source: err,
            });
        }

        Ok(())
    }

    #[cfg(target_os = "linux")]
    fn punch_hole(file: &File, start: u64, length: u64) -> Result<()> {
        let result = unsafe {
            libc::fallocate(
                file.as_raw_fd(),
                libc::FALLOC_FL_PUNCH_HOLE | libc::FALLOC_FL_KEEP_SIZE,
                start as libc::off_t,
                length as libc::off_t,
            )
        };

        if result == -1 {
            let err = std::io::Error::last_os_error();
            return Err(Error::HolePunchFailed {
                start,
                len: length,
                source: err,
            });
        }

        Ok(())
    }

    #[cfg(target_os = "freebsd")]
    fn punch_hole(file: &File, start: u64, length: u64) -> Result<()> {
        let fd = file.as_raw_fd();

        let mut spacectl = libc::spacectl_range {
            r_offset: start as libc::off_t,
            r_len: length as libc::off_t,
        };

        let result = unsafe {
            libc::fspacectl(
                fd,
                libc::SPACECTL_DEALLOC,
                &spacectl as *const libc::spacectl_range,
                0,
                &mut spacectl as *mut libc::spacectl_range,
            )
        };

        if result == -1 {
            let err = std::io::Error::last_os_error();
            return Err(Error::HolePunchFailed {
                start,
                len: length,
                source: err,
            });
        }

        Ok(())
    }

    #[cfg(not(any(target_os = "macos", target_os = "linux", target_os = "freebsd")))]
    fn punch_hole(_file: &File, _start: u64, _length: u64) -> Result<()> {
        Err(Error::String(
            "Hole punching not supported on this platform".to_string(),
        ))
    }

    #[inline]
    pub fn path(&self) -> &Path {
        &self.path
    }

    /// Downgrades to a [`WeakDatabase`] that does not keep the database
    /// alive, e.g. for back-references held by regions.
    #[inline]
    pub fn weak_clone(&self) -> WeakDatabase {
        WeakDatabase(Arc::downgrade(&self.0))
    }
}

impl Deref for Database {
    type Target = Arc<DatabaseInner>;
    fn deref(&self) -> &Self::Target {
        &self.0
    }
}

/// In-memory layout of the `fpunchhole_t` argument taken by macOS's
/// `F_PUNCHHOLE` fcntl.
#[repr(C)]
struct FPunchhole {
    fp_flags: u32,
    reserved: u32,
    fp_offset: off_t,
    fp_length: off_t,
}

/// Non-owning handle to a [`Database`]; see [`Database::weak_clone`].
#[derive(Debug, Clone)]
pub struct WeakDatabase(Weak<DatabaseInner>);

impl WeakDatabase {
    /// Upgrades back to a [`Database`], panicking if it has been dropped.
    pub fn upgrade(&self) -> Database {
        Database(
            self.0
                .upgrade()
                .expect("Database was dropped while Region still exists"),
        )
    }
}