1#![doc = include_str!(concat!("../", env!("CARGO_PKG_README")))]
2#![doc = "\n## Example\n"]
3#![doc = "\n```rust"]
4#![doc = include_str!("../examples/db.rs")]
5#![doc = "```\n"]
6
7use std::{
8 fs::{self, File, OpenOptions},
9 ops::Deref,
10 os::unix::io::AsRawFd,
11 path::{Path, PathBuf},
12 sync::Arc,
13};
14
15use libc::off_t;
16use memmap2::{MmapMut, MmapOptions};
17use parking_lot::{RwLock, RwLockReadGuard};
18
19pub mod error;
20mod identifier;
21mod layout;
22mod reader;
23mod region;
24mod regions;
25
26pub use error::*;
27pub use identifier::*;
28use layout::*;
29use rayon::prelude::*;
30pub use reader::*;
31pub use region::*;
32use regions::*;
33
/// Size in bytes of one page; data-file lengths are rounded up to a multiple of it.
pub const PAGE_SIZE: u64 = 1 << 12;
/// Mask of the offset-within-page bits, i.e. `PAGE_SIZE - 1`.
pub const PAGE_SIZE_MINUS_1: u64 = PAGE_SIZE - 1;
/// One gibibyte, in bytes.
const GB: u64 = 1 << 30;
37
38#[derive(Debug, Clone)]
39pub struct Database(Arc<DatabaseInner>);
40
41impl Database {
42 pub fn open(path: &Path) -> Result<Self> {
43 Ok(Self(Arc::new(DatabaseInner::open(path)?)))
44 }
45}
46
47impl Deref for Database {
48 type Target = DatabaseInner;
49 fn deref(&self) -> &Self::Target {
50 &self.0
51 }
52}
53
/// State shared by all clones of a [`Database`] handle.
///
/// Each mutable part sits behind its own `RwLock`, which is what lets the methods take `&self`.
#[derive(Debug)]
pub struct DatabaseInner {
    /// Directory given to `open`; the data file lives at `path/data`.
    path: PathBuf,
    /// Region table loaded by `Regions::open`.
    regions: RwLock<Regions>,
    /// Layout derived from the region table when the database is opened.
    layout: RwLock<Layout>,
    /// Handle to the data file; `open` takes a lock on it with `try_lock`.
    file: RwLock<File>,
    /// Mutable memory map of the data file; re-created after the file grows or holes are punched.
    mmap: RwLock<MmapMut>,
}
62
63impl DatabaseInner {
64 fn open(path: &Path) -> Result<Self> {
65 fs::create_dir_all(path)?;
66
67 let regions = Regions::open(path)?;
68
69 let layout = Layout::from(®ions);
70
71 let file = OpenOptions::new()
72 .read(true)
73 .create(true)
74 .write(true)
75 .truncate(false)
76 .open(Self::data_path_(path))?;
77 file.try_lock()?;
78
79 let mmap = Self::create_mmap(&file)?;
80
81 Ok(Self {
82 path: path.to_owned(),
83 file: RwLock::new(file),
84 mmap: RwLock::new(mmap),
85 regions: RwLock::new(regions),
86 layout: RwLock::new(layout),
87 })
88 }
89
90 pub fn file_len(&self) -> Result<u64> {
91 Ok(self.file.read().metadata()?.len())
92 }
93
94 pub fn set_min_len(&self, len: u64) -> Result<()> {
95 let len = Self::ceil_number_to_page_size_multiple(len);
96
97 let file_len = self.file_len()?;
98 if file_len < len {
99 let mut mmap = self.mmap.write();
100 let file = self.file.write();
101 file.set_len(len)?;
102 *mmap = Self::create_mmap(&file)?;
103 Ok(())
104 } else {
105 Ok(())
106 }
107 }
108
109 pub fn set_min_regions(&self, regions: usize) -> Result<()> {
110 self.regions
111 .write()
112 .set_min_len((regions * SIZE_OF_REGION) as u64)?;
113 self.set_min_len(regions as u64 * PAGE_SIZE)
114 }
115
116 pub fn create_region_if_needed(&self, id: &str) -> Result<(usize, Arc<RwLock<Region>>)> {
117 let regions = self.regions.read();
118 if let Some(index) = regions.get_region_index_from_id(id) {
119 return Ok((index, regions.get_region_from_index(index).unwrap()));
120 }
121 drop(regions);
122
123 let mut regions = self.regions.write();
124 let mut layout = self.layout.write();
125
126 let start = if let Some(start) = layout.find_smallest_adequate_hole(PAGE_SIZE) {
127 layout.remove_or_compress_hole(start, PAGE_SIZE);
128 start
129 } else {
130 let start = layout
131 .get_last_region_index()
132 .map(|index| {
133 let region_opt = regions.get_region_from_index(index);
134 let region = region_opt.as_ref().unwrap().read();
135 region.start() + region.reserved()
136 })
137 .unwrap_or_default();
138
139 let len = start + PAGE_SIZE;
140
141 self.set_min_len(len)?;
142
143 start
144 };
145
146 let (index, region) = regions.create_region(id.to_owned(), start)?;
147
148 layout.insert_region(start, index);
149
150 Ok((index, region))
151 }
152
153 pub fn get_region(&self, identifier: Identifier) -> Result<RwLockReadGuard<'static, Region>> {
154 let regions = self.regions.read();
155 let region_opt = regions.get_region(identifier);
156 let region_arc = region_opt.ok_or(Error::Str("Unknown region"))?;
157 let region = region_arc.read();
158 let region: RwLockReadGuard<'static, Region> = unsafe { std::mem::transmute(region) };
159 Ok(region)
160 }
161
162 pub fn create_region_reader<'a>(&'a self, identifier: Identifier) -> Result<Reader<'a>> {
163 let mmap: RwLockReadGuard<'a, MmapMut> = self.mmap.read();
164 let region = self.get_region(identifier)?;
165 Ok(Reader::new(mmap, region))
166 }
167
168 #[inline]
169 pub fn write_all_to_region(&self, identifier: Identifier, data: &[u8]) -> Result<()> {
170 self.write_all_to_region_at_(identifier, data, None, false)
171 }
172
173 #[inline]
174 pub fn write_all_to_region_at(
175 &self,
176 identifier: Identifier,
177 data: &[u8],
178 at: u64,
179 ) -> Result<()> {
180 self.write_all_to_region_at_(identifier, data, Some(at), false)
181 }
182
183 #[inline]
184 pub fn truncate_write_all_to_region(
185 &self,
186 identifier: Identifier,
187 at: u64,
188 data: &[u8],
189 ) -> Result<()> {
190 self.write_all_to_region_at_(identifier, data, Some(at), true)
191 }
192
193 fn write_all_to_region_at_(
194 &self,
195 identifier: Identifier,
196 data: &[u8],
197 at: Option<u64>,
198 truncate: bool,
199 ) -> Result<()> {
200 let regions = self.regions.read();
201 let Some(region_lock) = regions.get_region(identifier.clone()) else {
202 return Err(Error::Str("Unknown region"));
203 };
204
205 let region_index = regions.identifier_to_index(identifier).unwrap();
206
207 let region = region_lock.read();
208 let start = region.start();
209 let reserved = region.reserved();
210 let len = region.len();
211 drop(region);
212
213 let data_len = data.len() as u64;
214 let new_len = at.map_or(len + data_len, |at| {
215 assert!(at <= len);
216 let new_len = at + data_len;
217 if truncate { new_len } else { new_len.max(len) }
218 });
219 let write_start = start + at.unwrap_or(len);
220
221 if at.is_some_and(|at| at > reserved) {
222 return Err(Error::Str("Invalid at parameter"));
223 }
224
225 if new_len <= reserved {
227 if at.is_none() {
232 self.write(write_start, data);
233 }
234
235 let mut region = region_lock.write();
236
237 if at.is_some() {
238 self.write(write_start, data);
239 }
240
241 region.set_len(new_len);
242 regions.write_to_mmap(®ion, region_index);
243
244 return Ok(());
245 }
246
247 assert!(new_len > reserved);
248 let mut new_reserved = reserved;
249 while new_len > new_reserved {
250 new_reserved *= 2;
251 }
252 assert!(new_len <= new_reserved);
253 let added_reserve = new_reserved - reserved;
254
255 let mut layout = self.layout.write();
256
257 if layout.is_last_anything(region_index) {
259 self.set_min_len(start + new_reserved)?;
262 let mut region = region_lock.write();
263 region.set_reserved(new_reserved);
264 drop(region);
265 drop(layout);
266
267 self.write(write_start, data);
268
269 let mut region = region_lock.write();
270 region.set_len(new_len);
271 regions.write_to_mmap(®ion, region_index);
272
273 return Ok(());
274 }
275
276 let hole_start = start + reserved;
278 if layout
279 .get_hole(hole_start)
280 .is_some_and(|gap| gap >= added_reserve)
281 {
282 layout.remove_or_compress_hole(hole_start, added_reserve);
285 let mut region = region_lock.write();
286 region.set_reserved(new_reserved);
287 drop(region);
288 drop(layout);
289
290 self.write(write_start, data);
291
292 let mut region = region_lock.write();
293 region.set_len(new_len);
294 regions.write_to_mmap(®ion, region_index);
295
296 return Ok(());
297 }
298
299 if let Some(hole_start) = layout.find_smallest_adequate_hole(new_reserved) {
301 layout.remove_or_compress_hole(hole_start, new_reserved);
304 drop(layout);
305
306 self.write(
307 hole_start,
308 &self.mmap.read()[start as usize..write_start as usize],
309 );
310 self.write(hole_start + at.unwrap_or(len), data);
311
312 let mut region = region_lock.write();
313 let mut layout = self.layout.write();
314 layout.move_region(hole_start, region_index, ®ion)?;
315 drop(layout);
316
317 region.set_start(hole_start);
318 region.set_reserved(new_reserved);
319 region.set_len(new_len);
320 regions.write_to_mmap(®ion, region_index);
321
322 return Ok(());
323 }
324
325 let new_start = layout.len(®ions);
326 self.set_min_len(new_start + new_reserved)?;
333 layout.reserve(new_start, new_reserved);
334 drop(layout);
335
336 self.write(
337 new_start,
338 &self.mmap.read()[start as usize..write_start as usize],
339 );
340 self.write(new_start + at.unwrap_or(len), data);
341
342 let mut region = region_lock.write();
343 let mut layout = self.layout.write();
344 layout.move_region(new_start, region_index, ®ion)?;
345 assert!(layout.reserved(new_start) == Some(new_reserved));
346 drop(layout);
347
348 region.set_start(new_start);
349 region.set_reserved(new_reserved);
350 region.set_len(new_len);
351 regions.write_to_mmap(®ion, region_index);
352
353 Ok(())
354 }
355
356 fn write(&self, at: u64, data: &[u8]) {
357 let mmap = self.mmap.read();
358
359 let data_len = data.len();
360 let start = at as usize;
361 let end = start + data_len;
362
363 if end > mmap.len() {
364 unreachable!("Trying to write beyond mmap")
365 }
366
367 let slice = unsafe { std::slice::from_raw_parts_mut(mmap.as_ptr() as *mut u8, mmap.len()) };
368
369 slice[start..end].copy_from_slice(data);
370 }
371
372 pub fn truncate_region(&self, identifier: Identifier, from: u64) -> Result<()> {
378 let Some(region) = self.regions.read().get_region(identifier.clone()) else {
379 return Err(Error::Str("Unknown region"));
380 };
381 let mut region_ = region.write();
382 let len = region_.len();
383 if from == len {
384 return Ok(());
385 } else if from > len {
386 return Err(Error::Str("Truncating further than length"));
387 }
388 region_.set_len(from);
389 Ok(())
390 }
391
392 pub fn remove_region(&self, identifier: Identifier) -> Result<Option<Arc<RwLock<Region>>>> {
393 let mut regions = self.regions.write();
394
395 let mut layout = self.layout.write();
396
397 let index_opt = regions.identifier_to_index(identifier.clone());
398
399 let Some(region) = regions.remove_region(identifier)? else {
400 return Ok(None);
401 };
402
403 let index = index_opt.unwrap();
404
405 let region_ = region.write();
406
407 layout.remove_region(index, ®ion_)?;
408
409 drop(region_);
410
411 Ok(Some(region))
412 }
413
414 fn create_mmap(file: &File) -> Result<MmapMut> {
415 Ok(unsafe { MmapOptions::new().map_mut(file)? })
416 }
417
418 pub fn regions(&self) -> RwLockReadGuard<'_, Regions> {
419 self.regions.read()
420 }
421
422 pub fn layout(&self) -> RwLockReadGuard<'_, Layout> {
423 self.layout.read()
424 }
425
426 pub fn mmap(&self) -> RwLockReadGuard<'_, MmapMut> {
427 self.mmap.read()
428 }
429
430 fn ceil_number_to_page_size_multiple(num: u64) -> u64 {
431 (num + PAGE_SIZE_MINUS_1) & !PAGE_SIZE_MINUS_1
432 }
433
434 fn data_path(&self) -> PathBuf {
435 Self::data_path_(&self.path)
436 }
/// Path of the data file of the database stored in the directory `path`: `path/data`.
fn data_path_(path: &Path) -> PathBuf {
    let mut data_file = path.to_path_buf();
    data_file.push("data");
    data_file
}
440
441 pub fn disk_usage(&self) -> String {
442 let path = self.data_path();
443
444 let output = std::process::Command::new("du")
445 .arg("-h")
446 .arg(&path)
447 .output()
448 .expect("Failed to run du");
449
450 String::from_utf8_lossy(&output.stdout)
451 .replace(path.to_str().unwrap(), " ")
452 .trim()
453 .to_string()
454 }
455
456 pub fn flush(&self) -> Result<()> {
457 let mmap = self.mmap.read();
458 let regions = self.regions.read();
459 mmap.flush()?;
460 regions.flush()
461 }
462
463 pub fn flush_then_punch(&self) -> Result<()> {
464 self.flush()?;
465 self.punch_holes()
466 }
467
468 pub fn punch_holes(&self) -> Result<()> {
469 let file = self.file.write();
470 let mut mmap = self.mmap.write();
471 let regions = self.regions.read();
472 let layout = self.layout.read();
473
474 let mut punched = regions
475 .index_to_region()
476 .par_iter()
477 .flatten()
478 .map(|region_lock| -> Result<usize> {
479 let region = region_lock.read();
480 let rstart = region.start();
481 let len = region.len();
482 let reserved = region.reserved();
483 let ceil_len = Self::ceil_number_to_page_size_multiple(len);
484 assert!(len <= ceil_len);
485 if ceil_len > reserved {
486 panic!()
487 } else if ceil_len < reserved {
488 let start = rstart + ceil_len;
489 let hole = reserved - ceil_len;
490 if Self::approx_has_punchable_data(&mmap, start, hole) {
491 Self::punch_hole(&file, start, hole)?;
497 return Ok(1);
498 }
499 }
500 Ok(0)
501 })
502 .sum::<Result<usize>>()?;
503
504 punched += layout
505 .start_to_hole()
506 .par_iter()
507 .map(|(&start, &hole)| -> Result<usize> {
508 if Self::approx_has_punchable_data(&mmap, start, hole) {
509 Self::punch_hole(&file, start, hole)?;
512 Ok(1)
513 } else {
514 Ok(0)
515 }
516 })
517 .sum::<Result<usize>>()?;
518
519 if punched > 0 {
520 unsafe {
521 libc::fsync(file.as_raw_fd());
522 }
523 *mmap = Self::create_mmap(&file)?;
525 }
526
527 Ok(())
528 }
529
530 fn approx_has_punchable_data(mmap: &MmapMut, start: u64, len: u64) -> bool {
531 assert!(start % PAGE_SIZE == 0);
532 assert!(len % PAGE_SIZE == 0);
533
534 let min = start as usize;
535 let max = (start + len) as usize;
536 let check = |start, end| {
537 assert!(start >= min);
538 assert!(end < max);
539 let start_is_some = mmap[start] != 0;
540 let end_is_some = mmap[end] != 0;
544 start_is_some || end_is_some
548 };
549
550 let first_page_start = start as usize;
552 let first_page_end = (start + PAGE_SIZE - 1) as usize;
553 if check(first_page_start, first_page_end) {
554 return true;
555 }
556
557 let last_page_start = (start + len - PAGE_SIZE) as usize;
559 let last_page_end = (start + len - 1) as usize;
560 if check(last_page_start, last_page_end) {
561 return true;
562 }
563
564 if len > GB {
566 let num_gb_checks = (len / GB) as usize;
567 for i in 1..num_gb_checks {
568 let gb_boundary = start + (i as u64 * GB);
569 let page_start = gb_boundary as usize;
570 let page_end = (gb_boundary + PAGE_SIZE - 1) as usize;
571
572 if check(page_start, page_end) {
573 return true;
574 }
575 }
576 }
577
578 false
579 }
580
/// Deallocates `length` bytes of `file` starting at offset `start` (macOS, `F_PUNCHHOLE`).
///
/// # Errors
/// Returns `Error::String` carrying the OS error when the `fcntl` call fails.
#[cfg(target_os = "macos")]
fn punch_hole(file: &File, start: u64, length: u64) -> Result<()> {
    let request = FPunchhole {
        fp_flags: 0,
        reserved: 0,
        fp_offset: start as libc::off_t,
        fp_length: length as libc::off_t,
    };
    let fd = file.as_raw_fd();

    let status = unsafe { libc::fcntl(fd, libc::F_PUNCHHOLE, &request as *const FPunchhole) };

    match status {
        -1 => {
            let err = std::io::Error::last_os_error();
            Err(Error::String(format!("Failed to punch hole: {err}")))
        }
        _ => Ok(()),
    }
}
605
606 #[cfg(target_os = "linux")]
607 fn punch_hole(file: &File, start: u64, length: u64) -> Result<()> {
608 let result = unsafe {
609 libc::fallocate(
610 file.as_raw_fd(),
611 libc::FALLOC_FL_PUNCH_HOLE | libc::FALLOC_FL_KEEP_SIZE,
612 start as libc::off_t,
613 length as libc::off_t,
614 )
615 };
616
617 if result == -1 {
618 let err = std::io::Error::last_os_error();
619 return Err(Error::String(format!("Failed to punch hole: {err}")));
620 }
621
622 Ok(())
623 }
624
/// Fallback for platforms other than macOS and Linux: always fails, nothing is punched.
///
/// # Errors
/// Always returns `Error::String("Hole punching not supported on this platform")`.
#[cfg(not(any(target_os = "macos", target_os = "linux")))]
fn punch_hole(_file: &File, _start: u64, _length: u64) -> Result<()> {
    let message = "Hole punching not supported on this platform".to_string();
    Err(Error::String(message))
}
631}
632
/// Argument passed by pointer to `fcntl(fd, F_PUNCHHOLE, ..)` in the macOS `punch_hole`.
///
/// `#[repr(C)]` gives it C field layout, as needed for a struct handed to a C call.
#[repr(C)]
struct FPunchhole {
    /// Always set to 0 by `punch_hole`.
    fp_flags: u32,
    /// Always set to 0 by `punch_hole`.
    reserved: u32,
    /// Byte offset in the file at which the hole starts.
    fp_offset: off_t,
    /// Length of the hole in bytes.
    fp_length: off_t,
}