// rawdb/lib.rs

#![doc = include_str!("../README.md")]
// #![doc = "\n## Example\n"]
// #![doc = "\n```rust"]
// #![doc = include_str!("../examples/db.rs")]
// #![doc = "```\n"]

use std::{
    collections::HashSet,
    fs::{self, File, OpenOptions},
    ops::Deref,
    os::unix::io::AsRawFd,
    path::{Path, PathBuf},
    sync::{Arc, Weak},
};

use libc::off_t;
use log::debug;
use memmap2::{MmapMut, MmapOptions};
use parking_lot::{RwLock, RwLockReadGuard};

pub mod error;
mod layout;
mod reader;
mod region;
mod regions;

pub use error::*;
use layout::*;
use rayon::prelude::*;
pub use reader::*;
pub use region::*;
use regions::*;

pub const PAGE_SIZE: u64 = 4096;
pub const PAGE_SIZE_MINUS_1: u64 = PAGE_SIZE - 1;
const GB: usize = 1024 * 1024 * 1024;

/// Memory-mapped database with dynamic space allocation and hole punching.
///
/// Provides efficient storage through memory mapping with automatic region management,
/// space reclamation via hole punching, and dynamic file growth as needed.
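///
/// # Example
///
/// A minimal sketch (the path and region id are illustrative; the example
/// assumes this crate is built as `rawdb`, with `Result` re-exported from
/// the `error` module as it is at the crate root):
///
/// ```no_run
/// use std::path::Path;
///
/// fn main() -> rawdb::Result<()> {
///     let db = rawdb::Database::open(Path::new("my-db"))?;
///     let region = db.create_region_if_needed("my-region")?;
///     db.write_all_to_region(&region, b"hello")?;
///     db.flush()?;
///     Ok(())
/// }
/// ```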
#[derive(Debug, Clone)]
pub struct Database(Arc<DatabaseInner>);

#[derive(Debug)]
pub struct DatabaseInner {
    path: PathBuf,
    regions: RwLock<Regions>,
    layout: RwLock<Layout>,
    file: RwLock<File>,
    mmap: RwLock<MmapMut>,
}

impl Database {
    /// Opens or creates a database at the specified path.
    pub fn open(path: &Path) -> Result<Self> {
        Self::open_with_min_len(path, 0)
    }

    /// Opens or creates a database with a minimum initial file size.
    pub fn open_with_min_len(path: &Path, min_len: u64) -> Result<Self> {
        fs::create_dir_all(path)?;

        let file = OpenOptions::new()
            .read(true)
            .create(true)
            .write(true)
            .truncate(false)
            .open(Self::data_path_(path))?;
        debug!("File opened.");

        file.try_lock()?;
        debug!("File locked.");

        let file_len = file.metadata()?.len();
        if file_len < min_len {
            file.set_len(min_len)?;
            debug!("File extended.");
            file.sync_all()?;
        }

        let regions = Regions::open(path)?;
        let mmap = Self::create_mmap(&file)?;
        debug!("Mmap created.");

        let db = Self(Arc::new(DatabaseInner {
            path: path.to_owned(),
            file: RwLock::new(file),
            mmap: RwLock::new(mmap),
            regions: RwLock::new(regions),
            layout: RwLock::new(Layout::default()),
        }));

        db.regions.write().fill_index_to_region(&db)?;
        debug!("Filled regions.");
        *db.layout.write() = Layout::from(&*db.regions.read());
        debug!("Layout created.");

        Ok(db)
    }

    pub fn file_len(&self) -> Result<u64> {
        Ok(self.file.read().metadata()?.len())
    }

    pub fn set_min_len(&self, len: u64) -> Result<()> {
        let len = Self::ceil_number_to_page_size_multiple(len);

        let file_len = self.file_len()?;
        if file_len >= len {
            return Ok(());
        }

        let mut mmap = self.mmap.write();
        let file = self.file.write();
        file.set_len(len)?;
        file.sync_all()?;
        *mmap = Self::create_mmap(&file)?;
        Ok(())
    }

    pub fn set_min_regions(&self, regions: usize) -> Result<()> {
        self.regions
            .write()
            .set_min_len((regions * SIZE_OF_REGION_METADATA) as u64)?;
        self.set_min_len(regions as u64 * PAGE_SIZE)
    }

    /// Gets an existing region by ID.
    pub fn get_region(&self, id: &str) -> Option<Region> {
        self.regions.read().get_region_from_id(id).cloned()
    }

    /// Creates a region with the given ID, or returns it if it already exists.
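    ///
    /// Allocation: the new region reuses the smallest layout hole that can
    /// hold one page; otherwise a fresh page is reserved past the last
    /// region, growing the file if needed.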
    pub fn create_region_if_needed(&self, id: &str) -> Result<Region> {
        if let Some(region) = self.get_region(id) {
            return Ok(region);
        }

        let mut regions = self.regions.write();
        let mut layout = self.layout.write();

        let start = if let Some(start) = layout.find_smallest_adequate_hole(PAGE_SIZE) {
            layout.remove_or_compress_hole(start, PAGE_SIZE);
            start
        } else {
            let start = layout
                .get_last_region()
                .map(|(_, region)| {
                    let region_meta = region.meta().read();
                    region_meta.start() + region_meta.reserved()
                })
                .unwrap_or_default();

            let len = start + PAGE_SIZE;

            self.set_min_len(len)?;

            start
        };

        let region = regions.create_region(self, id.to_owned(), start)?;

        layout.insert_region(start, &region);

        Ok(region)
    }

    #[inline]
    pub fn write_all_to_region(&self, region: &Region, data: &[u8]) -> Result<()> {
        self.write_all_to_region_at_(region, data, None, false)
    }

    #[inline]
    pub fn write_all_to_region_at(&self, region: &Region, data: &[u8], at: u64) -> Result<()> {
        self.write_all_to_region_at_(region, data, Some(at), false)
    }

    #[inline]
    pub fn truncate_write_all_to_region(
        &self,
        region: &Region,
        at: u64,
        data: &[u8],
    ) -> Result<()> {
        self.write_all_to_region_at_(region, data, Some(at), true)
    }

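    // Write strategy, tried in order:
    //   1. The new length fits in the region's reserved space: write in place.
    //   2. The region is last in the file: grow the file and append.
    //   3. The hole immediately to the right is wide enough: expand into it.
    //   4. Some hole fits the grown reservation: move the region there.
    //   5. Otherwise: move the region to the end of the file.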
    fn write_all_to_region_at_(
        &self,
        region: &Region,
        data: &[u8],
        at: Option<u64>,
        truncate: bool,
    ) -> Result<()> {
        let region_meta = region.meta().read();
        let start = region_meta.start();
        let reserved = region_meta.reserved();
        let len = region_meta.len();
        drop(region_meta);

        let data_len = data.len() as u64;

        // Validate write position if specified
        // Note: checking `at > len` is sufficient since `len <= reserved` is always true
        // Therefore if `at <= len`, then `at <= reserved` must also be true
        if let Some(at_val) = at
            && at_val > len
        {
            return Err(Error::WriteOutOfBounds {
                position: at_val,
                region_len: len,
            });
        }

        let new_len = at.map_or(len + data_len, |at| {
            let new_len = at + data_len;
            if truncate { new_len } else { new_len.max(len) }
        });
        let write_start = start + at.unwrap_or(len);

        // Write to reserved space if possible
        if new_len <= reserved {
            // info!(
            //     "Write {data_len} bytes to {region_index} reserved space at {write_start} (start = {start}, at = {at:?}, len = {len})"
            // );

            if at.is_none() {
                self.write(write_start, data);
            }

            let mut region_meta = region.meta().write();

            if at.is_some() {
                self.write(write_start, data);
            }

            region_meta.set_len(new_len);

            return Ok(());
        }

        assert!(new_len > reserved);
        let mut new_reserved = reserved;
        while new_len > new_reserved {
            new_reserved *= 2;
        }
        assert!(new_len <= new_reserved);
        let added_reserve = new_reserved - reserved;

        let mut layout = self.layout.write();

        // If the region is last in the file, grow the file and keep appending in place
        if layout.is_last_anything(region) {
            // info!("{region_index} Append to file at {write_start}");

            self.set_min_len(start + new_reserved)?;
            let mut region_meta = region.meta().write();
            region_meta.set_reserved(new_reserved);
            drop(region_meta);
            drop(layout);

            self.write(write_start, data);

            let mut region_meta = region.meta().write();
            region_meta.set_len(new_len);

            return Ok(());
        }

        // Expand region to the right if gap is wide enough
        let hole_start = start + reserved;
        if layout
            .get_hole(hole_start)
            .is_some_and(|gap| gap >= added_reserve)
        {
            // info!("Expand {region_index} to hole");

            layout.remove_or_compress_hole(hole_start, added_reserve);
            let mut region_meta = region.meta().write();
            region_meta.set_reserved(new_reserved);
            drop(region_meta);
            drop(layout);

            self.write(write_start, data);

            let mut region_meta = region.meta().write();
            region_meta.set_len(new_len);

            return Ok(());
        }

        // Find hole big enough to move the region
        if let Some(hole_start) = layout.find_smallest_adequate_hole(new_reserved) {
            // info!("Move {region_index} to hole at {hole_start}");

            layout.remove_or_compress_hole(hole_start, new_reserved);
            drop(layout);

            self.write(
                hole_start,
                &self.mmap.read()[start as usize..write_start as usize],
            );

            self.write(hole_start + at.unwrap_or(len), data);

            let mut layout = self.layout.write();
            layout.move_region(hole_start, region)?;

            let mut region_meta = region.meta().write();
            region_meta.set_start(hole_start);
            region_meta.set_reserved(new_reserved);
            region_meta.set_len(new_len);

            return Ok(());
        }

        let new_start = layout.len();
        // Write at the end
        // info!(
        //     "Move {region_index} to the end, from {start}..{} to {new_start}..{}",
        //     start + reserved,
        //     new_start + new_reserved
        // );
        self.set_min_len(new_start + new_reserved)?;
        layout.reserve(new_start, new_reserved);
        drop(layout);

        // Read existing data and write to new location
        self.write(
            new_start,
            &self.mmap.read()[start as usize..write_start as usize],
        );
        self.write(new_start + at.unwrap_or(len), data);

        let mut layout = self.layout.write();
        layout.move_region(new_start, region)?;
        assert!(layout.reserved(new_start) == Some(new_reserved));

        let mut region_meta = region.meta().write();
        region_meta.set_start(new_start);
        region_meta.set_reserved(new_reserved);
        region_meta.set_len(new_len);

        Ok(())
    }

    #[inline]
    fn write(&self, at: u64, data: &[u8]) {
        let mmap = self.mmap.read();
        let data_len = data.len();
        let start = at as usize;
        let end = start + data_len;
        if end > mmap.len() {
            unreachable!("Trying to write beyond mmap")
        }

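        // SAFETY: the read guard prevents the mapping from being unmapped or
        // remapped mid-write (remapping requires the write lock); callers are
        // expected to coordinate overlapping writes to the same range.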
        (unsafe { std::slice::from_raw_parts_mut(mmap.as_ptr() as *mut u8, mmap.len()) })
            [start..end]
            .copy_from_slice(data);
    }

    /// Truncates the region's length to `from` bytes, counted from the
    /// region's start.
    ///
    /// Non-destructive: only the length metadata is updated; the bytes past
    /// `from` stay on disk until reclaimed by [`Self::compact`].
    pub fn truncate_region(&self, region: &Region, from: u64) -> Result<()> {
        let mut region_meta = region.meta().write();
        let len = region_meta.len();
        if from == len {
            return Ok(());
        } else if from > len {
            return Err(Error::TruncateInvalid {
                from,
                current_len: len,
            });
        }
        region_meta.set_len(from);
        Ok(())
    }

    pub fn remove_region_with_id(&self, id: &str) -> Result<Option<Region>> {
        let Some(region) = self.get_region(id) else {
            return Ok(None);
        };
        self.remove_region(region)
    }

    pub fn remove_region(&self, region: Region) -> Result<Option<Region>> {
        let mut regions = self.regions.write();
        let mut layout = self.layout.write();
        layout.remove_region(&region)?;
        regions.remove_region(region)
    }

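    /// Removes every region whose id is not in `ids`.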
    pub fn retain_regions(&self, mut ids: HashSet<String>) -> Result<()> {
        let regions_to_remove = self
            .regions
            .read()
            .id_to_index()
            .keys()
            .filter(|id| !ids.remove(&**id))
            .flat_map(|id| self.get_region(id))
            .collect::<Vec<Region>>();

        regions_to_remove
            .into_iter()
            .try_for_each(|region| -> Result<()> {
                self.remove_region(region)?;
                Ok(())
            })
    }

    #[inline]
    fn create_mmap(file: &File) -> Result<MmapMut> {
        Ok(unsafe { MmapOptions::new().map_mut(file)? })
    }

    #[inline]
    pub fn mmap(&self) -> RwLockReadGuard<'_, MmapMut> {
        self.mmap.read()
    }

    #[inline]
    pub fn regions(&self) -> RwLockReadGuard<'_, Regions> {
        self.regions.read()
    }

    #[inline]
    pub fn layout(&self) -> RwLockReadGuard<'_, Layout> {
        self.layout.read()
    }

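    /// Rounds `num` up to the next multiple of `PAGE_SIZE` using the usual
    /// power-of-two trick: add `PAGE_SIZE - 1`, then mask off the low bits
    /// (e.g. 1 -> 4096, 4096 -> 4096, 4097 -> 8192).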
    #[inline]
    fn ceil_number_to_page_size_multiple(num: u64) -> u64 {
        (num + PAGE_SIZE_MINUS_1) & !PAGE_SIZE_MINUS_1
    }

    #[inline]
    fn data_path(&self) -> PathBuf {
        Self::data_path_(&self.path)
    }
    #[inline]
    fn data_path_(path: &Path) -> PathBuf {
        path.join("data")
    }

    /// Opens a dedicated file handle for sequential reading.
    /// A separate descriptor keeps its own readahead state, so the kernel
    /// can prefetch effectively during iteration.
    #[inline]
    pub fn open_read_only_file(&self) -> Result<File> {
        File::open(self.data_path()).map_err(Error::from)
    }

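    /// Reports the data file's on-disk usage by shelling out to `du -h`,
    /// which counts allocated blocks rather than the file's logical length,
    /// so punched holes are excluded.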
    pub fn disk_usage(&self) -> String {
        let path = self.data_path();

        let output = std::process::Command::new("du")
            .arg("-h")
            .arg(&path)
            .output()
            .expect("Failed to run du");

        String::from_utf8_lossy(&output.stdout)
            .replace(path.to_str().unwrap(), " ")
            .trim()
            .to_string()
    }

    pub fn flush(&self) -> Result<()> {
        let mmap = self.mmap.read();
        let regions = self.regions.read();
        mmap.flush()?;
        regions.flush()?;

        // Now that metadata is durable, pending holes can be reused
        self.layout.write().promote_pending_holes();

        Ok(())
    }

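    /// Flushes data and metadata, then punches holes so unused reserved
    /// space is returned to the filesystem.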
    #[inline]
    pub fn compact(&self) -> Result<()> {
        self.flush()?;
        self.punch_holes()
    }

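    // Punches holes in the unused reserved tail of every region and in every
    // layout hole, then fsyncs and remaps the file if anything was punched.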
    fn punch_holes(&self) -> Result<()> {
        let file = self.file.write();
        let mut mmap = self.mmap.write();
        let regions = self.regions.read();
        let layout = self.layout.read();

        let mut punched = regions
            .index_to_region()
            .par_iter()
            .flatten()
            .map(|region| -> Result<usize> {
                let region_meta = region.meta().read();
                let rstart = region_meta.start();
                let len = region_meta.len();
                let reserved = region_meta.reserved();
                let ceil_len = Self::ceil_number_to_page_size_multiple(len);
                assert!(len <= ceil_len);
                if ceil_len > reserved {
                    panic!("region len ({len}) rounds past its reserved space ({reserved})")
                } else if ceil_len < reserved {
                    let start = rstart + ceil_len;
                    let hole = reserved - ceil_len;
                    if Self::approx_has_punchable_data(&mmap, start, hole) {
                        Self::punch_hole(&file, start, hole)?;
                        return Ok(1);
                    }
                }
                Ok(0)
            })
            .sum::<Result<usize>>()?;

        punched += layout
            .start_to_hole()
            .par_iter()
            .map(|(&start, &hole)| -> Result<usize> {
                if Self::approx_has_punchable_data(&mmap, start, hole) {
                    Self::punch_hole(&file, start, hole)?;
                    return Ok(1);
                }
                Ok(0)
            })
            .sum::<Result<usize>>()?;

        if punched > 0 {
            unsafe {
                libc::fsync(file.as_raw_fd());
            }
            *mmap = Self::create_mmap(&file)?;
        }

        Ok(())
    }

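    /// Cheap heuristic for "does this range still hold nonzero data?": it
    /// samples the first and last byte of the range's first page, its last
    /// page, and the page at each GB boundary in between. A false negative
    /// only means a hole of stale data goes unpunched; the checked ranges
    /// never contain live region data.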
    fn approx_has_punchable_data(mmap: &MmapMut, start: u64, len: u64) -> bool {
        assert!(start.is_multiple_of(PAGE_SIZE));
        assert!(len.is_multiple_of(PAGE_SIZE));

        let start = start as usize;
        let len = len as usize;

        let min = start;
        let max = start + len;
        let check = |start, end| {
            assert!(start >= min);
            assert!(end < max);
            let start_is_some = mmap[start] != 0;
            let end_is_some = mmap[end] != 0;
            start_is_some || end_is_some
        };

        let first_page_start = start;
        let first_page_end = start + PAGE_SIZE as usize - 1;
        if check(first_page_start, first_page_end) {
            return true;
        }

        let last_page_start = start + len - PAGE_SIZE as usize;
        let last_page_end = start + len - 1;
        if check(last_page_start, last_page_end) {
            return true;
        }

        if len > GB {
            let num_gb_checks = len / GB;
            for i in 1..num_gb_checks {
                let gb_boundary = start + i * GB;
                let page_start = gb_boundary;
                let page_end = gb_boundary + PAGE_SIZE as usize - 1;

                if check(page_start, page_end) {
                    return true;
                }
            }
        }

        false
    }

    #[cfg(target_os = "macos")]
    fn punch_hole(file: &File, start: u64, length: u64) -> Result<()> {
        let fpunchhole = FPunchhole {
            fp_flags: 0,
            reserved: 0,
            fp_offset: start as libc::off_t,
            fp_length: length as libc::off_t,
        };

        let result = unsafe {
            libc::fcntl(
                file.as_raw_fd(),
                libc::F_PUNCHHOLE,
                &fpunchhole as *const FPunchhole,
            )
        };

        if result == -1 {
            let err = std::io::Error::last_os_error();
            return Err(Error::HolePunchFailed {
                start,
                len: length,
                source: err,
            });
        }

        Ok(())
    }

    #[cfg(target_os = "linux")]
    fn punch_hole(file: &File, start: u64, length: u64) -> Result<()> {
        let result = unsafe {
            libc::fallocate(
                file.as_raw_fd(),
                libc::FALLOC_FL_PUNCH_HOLE | libc::FALLOC_FL_KEEP_SIZE,
                start as libc::off_t,
                length as libc::off_t,
            )
        };

        if result == -1 {
            let err = std::io::Error::last_os_error();
            return Err(Error::HolePunchFailed {
                start,
                len: length,
                source: err,
            });
        }

        Ok(())
    }

    #[cfg(target_os = "freebsd")]
    fn punch_hole(file: &File, start: u64, length: u64) -> Result<()> {
        let fd = file.as_raw_fd();

        let mut spacectl = libc::spacectl_range {
            r_offset: start as libc::off_t,
            r_len: length as libc::off_t,
        };

        let result = unsafe {
            libc::fspacectl(
                fd,
                libc::SPACECTL_DEALLOC,
                &spacectl as *const libc::spacectl_range,
                0,
                &mut spacectl as *mut libc::spacectl_range,
            )
        };

        if result == -1 {
            let err = std::io::Error::last_os_error();
            return Err(Error::HolePunchFailed {
                start,
                len: length,
                source: err,
            });
        }

        Ok(())
    }

    #[cfg(not(any(target_os = "macos", target_os = "linux", target_os = "freebsd")))]
    fn punch_hole(_file: &File, _start: u64, _length: u64) -> Result<()> {
        Err(Error::String(
            "Hole punching not supported on this platform".to_string(),
        ))
    }

    #[inline]
    pub fn path(&self) -> &Path {
        &self.path
    }

    #[inline]
    pub fn weak_clone(&self) -> WeakDatabase {
        WeakDatabase(Arc::downgrade(&self.0))
    }
}

impl Deref for Database {
    type Target = Arc<DatabaseInner>;
    fn deref(&self) -> &Self::Target {
        &self.0
    }
}

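/// Argument block for the macOS `F_PUNCHHOLE` fcntl; the field layout
/// mirrors the system's `fpunchhole_t`.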
#[repr(C)]
struct FPunchhole {
    fp_flags: u32,
    reserved: u32,
    fp_offset: off_t,
    fp_length: off_t,
}

/// Weak reference to a Database that doesn't prevent it from being dropped.
///
/// Used internally by regions to avoid circular references while maintaining
/// access to the parent database.
#[derive(Debug, Clone)]
pub struct WeakDatabase(Weak<DatabaseInner>);

impl WeakDatabase {
    pub fn upgrade(&self) -> Database {
        Database(
            self.0
                .upgrade()
                .expect("Database was dropped while Region still exists"),
        )
    }
}