1#![doc = include_str!(concat!("../", env!("CARGO_PKG_README")))]
2#![doc = "\n## Example\n"]
3#![doc = "\n```rust"]
4#![doc = include_str!("../examples/db.rs")]
5#![doc = "```\n"]
6
7use std::{
8 collections::HashSet,
9 fs::{self, File, OpenOptions},
10 ops::Deref,
11 os::unix::io::AsRawFd,
12 path::{Path, PathBuf},
13 sync::Arc,
14};
15
16use allocative::Allocative;
17use libc::off_t;
18use memmap2::{MmapMut, MmapOptions};
19use parking_lot::{RwLock, RwLockReadGuard};
20
21pub mod error;
22mod identifier;
23mod layout;
24mod reader;
25mod region;
26mod regions;
27
28pub use error::*;
29pub use identifier::*;
30use layout::*;
31use rayon::prelude::*;
32pub use reader::*;
33pub use region::*;
34use regions::*;
35
/// Size in bytes of one page; data-file lengths are rounded up to a multiple of this value.
pub const PAGE_SIZE: u64 = 4096;
/// `PAGE_SIZE - 1`, used as the low-bit mask when rounding a length up to a page boundary.
pub const PAGE_SIZE_MINUS_1: u64 = PAGE_SIZE - 1;
/// One gibibyte in bytes; the sampling stride used when probing large holes for leftover data.
const GB: u64 = 1024 * 1024 * 1024;
39
/// Shared handle to an open database.
///
/// Wraps an `Arc<DatabaseInner>`, so cloning the handle clones the `Arc` and every
/// clone refers to the same underlying state.
#[derive(Debug, Clone, Allocative)]
pub struct Database(Arc<DatabaseInner>);
42
43impl Database {
44 pub fn open(path: &Path) -> Result<Self> {
45 Ok(Self(Arc::new(DatabaseInner::open(path)?)))
46 }
47}
48
49impl Deref for Database {
50 type Target = DatabaseInner;
51 fn deref(&self) -> &Self::Target {
52 &self.0
53 }
54}
55
/// State shared by every [`Database`] handle.
///
/// Each field sits behind its own `RwLock`; methods acquire the locks they need
/// individually.
#[derive(Debug, Allocative)]
pub struct DatabaseInner {
    /// Directory the database was opened from; the data file lives at `<path>/data`.
    path: PathBuf,
    /// Region table, opened from `path` by `Regions::open`.
    regions: RwLock<Regions>,
    /// Layout of regions and holes in the data file, initially derived from `regions`.
    layout: RwLock<Layout>,
    /// Handle to the data file (locked with `try_lock` when the database is opened).
    #[allocative(skip)]
    file: RwLock<File>,
    /// Mutable memory mapping of the data file; replaced whenever the file is resized.
    #[allocative(skip)]
    mmap: RwLock<MmapMut>,
}
66
67impl DatabaseInner {
68 fn open(path: &Path) -> Result<Self> {
69 fs::create_dir_all(path)?;
70
71 let regions = Regions::open(path)?;
72
73 let layout = Layout::from(®ions);
74
75 let file = OpenOptions::new()
76 .read(true)
77 .create(true)
78 .write(true)
79 .truncate(false)
80 .open(Self::data_path_(path))?;
81 file.try_lock()?;
82
83 let mmap = Self::create_mmap(&file)?;
84
85 Ok(Self {
86 path: path.to_owned(),
87 file: RwLock::new(file),
88 mmap: RwLock::new(mmap),
89 regions: RwLock::new(regions),
90 layout: RwLock::new(layout),
91 })
92 }
93
94 pub fn file_len(&self) -> Result<u64> {
95 Ok(self.file.read().metadata()?.len())
96 }
97
98 pub fn set_min_len(&self, len: u64) -> Result<()> {
99 let len = Self::ceil_number_to_page_size_multiple(len);
100
101 let file_len = self.file_len()?;
102 if file_len < len {
103 let mut mmap = self.mmap.write();
104 let file = self.file.write();
105 file.set_len(len)?;
106 *mmap = Self::create_mmap(&file)?;
107 Ok(())
108 } else {
109 Ok(())
110 }
111 }
112
113 pub fn set_min_regions(&self, regions: usize) -> Result<()> {
114 self.regions
115 .write()
116 .set_min_len((regions * SIZE_OF_REGION) as u64)?;
117 self.set_min_len(regions as u64 * PAGE_SIZE)
118 }
119
120 pub fn create_region_if_needed(&self, id: &str) -> Result<(usize, Arc<RwLock<Region>>)> {
121 let regions = self.regions.read();
122 if let Some(index) = regions.get_region_index_from_id(id) {
123 return Ok((index, regions.get_region_from_index(index).unwrap()));
124 }
125 drop(regions);
126
127 let mut regions = self.regions.write();
128 let mut layout = self.layout.write();
129
130 let start = if let Some(start) = layout.find_smallest_adequate_hole(PAGE_SIZE) {
131 layout.remove_or_compress_hole(start, PAGE_SIZE);
132 start
133 } else {
134 let start = layout
135 .get_last_region_index()
136 .map(|index| {
137 let region_opt = regions.get_region_from_index(index);
138 let region = region_opt.as_ref().unwrap().read();
139 region.start() + region.reserved()
140 })
141 .unwrap_or_default();
142
143 let len = start + PAGE_SIZE;
144
145 self.set_min_len(len)?;
146
147 start
148 };
149
150 let (index, region) = regions.create_region(id.to_owned(), start)?;
151
152 layout.insert_region(start, index);
153
154 Ok((index, region))
155 }
156
    /// Returns a read guard on the region designated by `identifier`.
    ///
    /// # Errors
    ///
    /// Returns `Error::Str("Unknown region")` when the region table has no such region.
    ///
    /// # Safety considerations
    ///
    /// The guard's lifetime is forcibly extended to `'static` with `transmute`. See the
    /// note on the `unsafe` block below.
    pub fn get_region(&self, identifier: Identifier) -> Result<RwLockReadGuard<'static, Region>> {
        let regions = self.regions.read();
        let region_opt = regions.get_region(identifier);
        let region_arc = region_opt.ok_or(Error::Str("Unknown region"))?;
        let region = region_arc.read();
        // NOTE(review): `region` borrows from the local `region_arc`, which is dropped when
        // this function returns. The returned guard is therefore only valid while some
        // other owner (presumably the region table) keeps the same lock allocation alive;
        // if the region is removed and its last `Arc` dropped while the guard is still
        // held, the guard dangles. TODO confirm the invariant or return an owning guard.
        let region: RwLockReadGuard<'static, Region> = unsafe { std::mem::transmute(region) };
        Ok(region)
    }
165
166 pub fn create_region_reader<'a>(&'a self, identifier: Identifier) -> Result<Reader<'a>> {
167 let mmap: RwLockReadGuard<'a, MmapMut> = self.mmap.read();
168 let region = self.get_region(identifier)?;
169 Ok(Reader::new(mmap, region))
170 }
171
172 #[inline]
173 pub fn write_all_to_region(&self, identifier: Identifier, data: &[u8]) -> Result<()> {
174 self.write_all_to_region_at_(identifier, data, None, false)
175 }
176
177 #[inline]
178 pub fn write_all_to_region_at(
179 &self,
180 identifier: Identifier,
181 data: &[u8],
182 at: u64,
183 ) -> Result<()> {
184 self.write_all_to_region_at_(identifier, data, Some(at), false)
185 }
186
187 #[inline]
188 pub fn truncate_write_all_to_region(
189 &self,
190 identifier: Identifier,
191 at: u64,
192 data: &[u8],
193 ) -> Result<()> {
194 self.write_all_to_region_at_(identifier, data, Some(at), true)
195 }
196
197 fn write_all_to_region_at_(
198 &self,
199 identifier: Identifier,
200 data: &[u8],
201 at: Option<u64>,
202 truncate: bool,
203 ) -> Result<()> {
204 let regions = self.regions.read();
205 let Some(region_lock) = regions.get_region(identifier.clone()) else {
206 return Err(Error::Str("Unknown region"));
207 };
208
209 let region_index = regions.identifier_to_index(identifier).unwrap();
210
211 let region = region_lock.read();
212 let start = region.start();
213 let reserved = region.reserved();
214 let len = region.len();
215 drop(region);
216
217 let data_len = data.len() as u64;
218 let new_len = at.map_or(len + data_len, |at| {
219 assert!(at <= len);
220 let new_len = at + data_len;
221 if truncate { new_len } else { new_len.max(len) }
222 });
223 let write_start = start + at.unwrap_or(len);
224
225 if at.is_some_and(|at| at > reserved) {
226 return Err(Error::Str("Invalid at parameter"));
227 }
228
229 if new_len <= reserved {
231 if at.is_none() {
236 self.write(write_start, data);
237 }
238
239 let mut region = region_lock.write();
240
241 if at.is_some() {
242 self.write(write_start, data);
243 }
244
245 region.set_len(new_len);
246 regions.write_to_mmap(®ion, region_index);
247
248 return Ok(());
249 }
250
251 assert!(new_len > reserved);
252 let mut new_reserved = reserved;
253 while new_len > new_reserved {
254 new_reserved *= 2;
255 }
256 assert!(new_len <= new_reserved);
257 let added_reserve = new_reserved - reserved;
258
259 let mut layout = self.layout.write();
260
261 if layout.is_last_anything(region_index) {
263 self.set_min_len(start + new_reserved)?;
266 let mut region = region_lock.write();
267 region.set_reserved(new_reserved);
268 drop(region);
269 drop(layout);
270
271 self.write(write_start, data);
272
273 let mut region = region_lock.write();
274 region.set_len(new_len);
275 regions.write_to_mmap(®ion, region_index);
276
277 return Ok(());
278 }
279
280 let hole_start = start + reserved;
282 if layout
283 .get_hole(hole_start)
284 .is_some_and(|gap| gap >= added_reserve)
285 {
286 layout.remove_or_compress_hole(hole_start, added_reserve);
289 let mut region = region_lock.write();
290 region.set_reserved(new_reserved);
291 drop(region);
292 drop(layout);
293
294 self.write(write_start, data);
295
296 let mut region = region_lock.write();
297 region.set_len(new_len);
298 regions.write_to_mmap(®ion, region_index);
299
300 return Ok(());
301 }
302
303 if let Some(hole_start) = layout.find_smallest_adequate_hole(new_reserved) {
305 layout.remove_or_compress_hole(hole_start, new_reserved);
308 drop(layout);
309
310 self.write(
311 hole_start,
312 &self.mmap.read()[start as usize..write_start as usize],
313 );
314 self.write(hole_start + at.unwrap_or(len), data);
315
316 let mut region = region_lock.write();
317 let mut layout = self.layout.write();
318 layout.move_region(hole_start, region_index, ®ion)?;
319 drop(layout);
320
321 region.set_start(hole_start);
322 region.set_reserved(new_reserved);
323 region.set_len(new_len);
324 regions.write_to_mmap(®ion, region_index);
325
326 return Ok(());
327 }
328
329 let new_start = layout.len(®ions);
330 self.set_min_len(new_start + new_reserved)?;
337 layout.reserve(new_start, new_reserved);
338 drop(layout);
339
340 self.write(
341 new_start,
342 &self.mmap.read()[start as usize..write_start as usize],
343 );
344 self.write(new_start + at.unwrap_or(len), data);
345
346 let mut region = region_lock.write();
347 let mut layout = self.layout.write();
348 layout.move_region(new_start, region_index, ®ion)?;
349 assert!(layout.reserved(new_start) == Some(new_reserved));
350 drop(layout);
351
352 region.set_start(new_start);
353 region.set_reserved(new_reserved);
354 region.set_len(new_len);
355 regions.write_to_mmap(®ion, region_index);
356
357 Ok(())
358 }
359
360 fn write(&self, at: u64, data: &[u8]) {
361 let mmap = self.mmap.read();
362
363 let data_len = data.len();
364 let start = at as usize;
365 let end = start + data_len;
366
367 if end > mmap.len() {
368 unreachable!("Trying to write beyond mmap")
369 }
370
371 let slice = unsafe { std::slice::from_raw_parts_mut(mmap.as_ptr() as *mut u8, mmap.len()) };
372
373 slice[start..end].copy_from_slice(data);
374 }
375
376 pub fn truncate_region(&self, identifier: Identifier, from: u64) -> Result<()> {
382 let Some(region) = self.regions.read().get_region(identifier.clone()) else {
383 return Err(Error::Str("Unknown region"));
384 };
385 let mut region_ = region.write();
386 let len = region_.len();
387 if from == len {
388 return Ok(());
389 } else if from > len {
390 return Err(Error::Str("Truncating further than length"));
391 }
392 region_.set_len(from);
393 Ok(())
394 }
395
396 pub fn remove_region(&self, identifier: Identifier) -> Result<Option<Arc<RwLock<Region>>>> {
397 let mut regions = self.regions.write();
398
399 let mut layout = self.layout.write();
400
401 let index_opt = regions.identifier_to_index(identifier.clone());
402
403 let Some(region) = regions.remove_region(identifier)? else {
404 return Ok(None);
405 };
406
407 let index = index_opt.unwrap();
408
409 let region_ = region.write();
410
411 layout.remove_region(index, ®ion_)?;
412
413 drop(region_);
414
415 Ok(Some(region))
416 }
417
418 pub fn retain_regions(&self, mut ids: HashSet<String>) -> Result<()> {
419 let ids_to_remove = self
420 .regions
421 .read()
422 .id_to_index()
423 .keys()
424 .filter(|id| !ids.remove(&**id))
425 .cloned()
426 .collect::<Vec<String>>();
427
428 ids_to_remove.into_iter().try_for_each(|id| -> Result<()> {
429 self.remove_region(id.into())?;
430 Ok(())
431 })
432 }
433
434 fn create_mmap(file: &File) -> Result<MmapMut> {
435 Ok(unsafe { MmapOptions::new().map_mut(file)? })
436 }
437
    /// Acquires and returns a read guard on the region table.
    pub fn regions(&self) -> RwLockReadGuard<'_, Regions> {
        self.regions.read()
    }
441
    /// Acquires and returns a read guard on the layout.
    pub fn layout(&self) -> RwLockReadGuard<'_, Layout> {
        self.layout.read()
    }
445
    /// Acquires and returns a read guard on the memory mapping of the data file.
    pub fn mmap(&self) -> RwLockReadGuard<'_, MmapMut> {
        self.mmap.read()
    }
449
450 fn ceil_number_to_page_size_multiple(num: u64) -> u64 {
451 (num + PAGE_SIZE_MINUS_1) & !PAGE_SIZE_MINUS_1
452 }
453
454 fn data_path(&self) -> PathBuf {
455 Self::data_path_(&self.path)
456 }
457 fn data_path_(path: &Path) -> PathBuf {
458 path.join("data")
459 }
460
461 pub fn disk_usage(&self) -> String {
462 let path = self.data_path();
463
464 let output = std::process::Command::new("du")
465 .arg("-h")
466 .arg(&path)
467 .output()
468 .expect("Failed to run du");
469
470 String::from_utf8_lossy(&output.stdout)
471 .replace(path.to_str().unwrap(), " ")
472 .trim()
473 .to_string()
474 }
475
476 pub fn flush(&self) -> Result<()> {
477 let mmap = self.mmap.read();
478 let regions = self.regions.read();
479 mmap.flush()?;
480 regions.flush()
481 }
482
483 pub fn flush_then_punch(&self) -> Result<()> {
484 self.flush()?;
485 self.punch_holes()
486 }
487
488 pub fn punch_holes(&self) -> Result<()> {
489 let file = self.file.write();
490 let mut mmap = self.mmap.write();
491 let regions = self.regions.read();
492 let layout = self.layout.read();
493
494 let mut punched = regions
495 .index_to_region()
496 .par_iter()
497 .flatten()
498 .map(|region_lock| -> Result<usize> {
499 let region = region_lock.read();
500 let rstart = region.start();
501 let len = region.len();
502 let reserved = region.reserved();
503 let ceil_len = Self::ceil_number_to_page_size_multiple(len);
504 assert!(len <= ceil_len);
505 if ceil_len > reserved {
506 panic!()
507 } else if ceil_len < reserved {
508 let start = rstart + ceil_len;
509 let hole = reserved - ceil_len;
510 if Self::approx_has_punchable_data(&mmap, start, hole) {
511 Self::punch_hole(&file, start, hole)?;
517 return Ok(1);
518 }
519 }
520 Ok(0)
521 })
522 .sum::<Result<usize>>()?;
523
524 punched += layout
525 .start_to_hole()
526 .par_iter()
527 .map(|(&start, &hole)| -> Result<usize> {
528 if Self::approx_has_punchable_data(&mmap, start, hole) {
529 Self::punch_hole(&file, start, hole)?;
532 Ok(1)
533 } else {
534 Ok(0)
535 }
536 })
537 .sum::<Result<usize>>()?;
538
539 if punched > 0 {
540 unsafe {
541 libc::fsync(file.as_raw_fd());
542 }
543 *mmap = Self::create_mmap(&file)?;
545 }
546
547 Ok(())
548 }
549
550 fn approx_has_punchable_data(mmap: &MmapMut, start: u64, len: u64) -> bool {
551 assert!(start % PAGE_SIZE == 0);
552 assert!(len % PAGE_SIZE == 0);
553
554 let min = start as usize;
555 let max = (start + len) as usize;
556 let check = |start, end| {
557 assert!(start >= min);
558 assert!(end < max);
559 let start_is_some = mmap[start] != 0;
560 let end_is_some = mmap[end] != 0;
564 start_is_some || end_is_some
568 };
569
570 let first_page_start = start as usize;
572 let first_page_end = (start + PAGE_SIZE - 1) as usize;
573 if check(first_page_start, first_page_end) {
574 return true;
575 }
576
577 let last_page_start = (start + len - PAGE_SIZE) as usize;
579 let last_page_end = (start + len - 1) as usize;
580 if check(last_page_start, last_page_end) {
581 return true;
582 }
583
584 if len > GB {
586 let num_gb_checks = (len / GB) as usize;
587 for i in 1..num_gb_checks {
588 let gb_boundary = start + (i as u64 * GB);
589 let page_start = gb_boundary as usize;
590 let page_end = (gb_boundary + PAGE_SIZE - 1) as usize;
591
592 if check(page_start, page_end) {
593 return true;
594 }
595 }
596 }
597
598 false
599 }
600
601 #[cfg(target_os = "macos")]
602 fn punch_hole(file: &File, start: u64, length: u64) -> Result<()> {
603 let fpunchhole = FPunchhole {
604 fp_flags: 0,
605 reserved: 0,
606 fp_offset: start as libc::off_t,
607 fp_length: length as libc::off_t,
608 };
609
610 let result = unsafe {
611 libc::fcntl(
612 file.as_raw_fd(),
613 libc::F_PUNCHHOLE,
614 &fpunchhole as *const FPunchhole,
615 )
616 };
617
618 if result == -1 {
619 let err = std::io::Error::last_os_error();
620 return Err(Error::String(format!("Failed to punch hole: {err}")));
621 }
622
623 Ok(())
624 }
625
626 #[cfg(target_os = "linux")]
627 fn punch_hole(file: &File, start: u64, length: u64) -> Result<()> {
628 let result = unsafe {
629 libc::fallocate(
630 file.as_raw_fd(),
631 libc::FALLOC_FL_PUNCH_HOLE | libc::FALLOC_FL_KEEP_SIZE,
632 start as libc::off_t,
633 length as libc::off_t,
634 )
635 };
636
637 if result == -1 {
638 let err = std::io::Error::last_os_error();
639 return Err(Error::String(format!("Failed to punch hole: {err}")));
640 }
641
642 Ok(())
643 }
644
645 #[cfg(target_os = "freebsd")]
646 fn punch_hole(file: &File, start: u64, length: u64) -> Result<()> {
647 let fd = file.as_raw_fd();
648
649 let mut spacectl = libc::spacectl_range {
650 r_offset: offset as libc::off_t,
651 r_len: length as libc::off_t,
652 };
653
654 let result = unsafe {
655 libc::fspacectl(
656 fd,
657 libc::SPACECTL_DEALLOC,
658 &spacectl as *const libc::spacectl_range,
659 0,
660 &mut spacectl as *mut libc::spacectl_range,
661 )
662 };
663
664 if result == -1 {
665 let err = std::io::Error::last_os_error();
666 return Err(Error::String(format!("Failed to punch hole: {err}")));
667 }
668
669 Ok(())
670 }
671
672 #[cfg(not(any(target_os = "macos", target_os = "linux", target_os = "freebsd")))]
673 fn punch_hole(_file: &File, _start: u64, _length: u64) -> Result<()> {
674 Err(Error::String(
675 "Hole punching not supported on this platform".to_string(),
676 ))
677 }
678
679 pub fn path(&self) -> &Path {
680 &self.path
681 }
682}
683
/// C-layout argument block that the macOS `punch_hole` passes to `fcntl(F_PUNCHHOLE)`.
///
/// NOTE(review): the field order is presumably meant to mirror the system's
/// `fpunchhole_t`; verify against the platform headers.
#[repr(C)]
struct FPunchhole {
    /// Flags; the caller in this file always passes 0.
    fp_flags: u32,
    /// Reserved field; the caller in this file always passes 0.
    reserved: u32,
    /// Byte offset at which the hole starts.
    fp_offset: off_t,
    /// Length of the hole in bytes.
    fp_length: off_t,
}