rawdb/
lib.rs

#![doc = include_str!("../README.md")]
// #![doc = "\n## Example\n"]
// #![doc = "\n```rust"]
// #![doc = include_str!("../examples/db.rs")]
// #![doc = "```\n"]

use std::{
    collections::HashSet,
    fs::{self, File, OpenOptions},
    ops::Deref,
    os::unix::io::AsRawFd,
    path::{Path, PathBuf},
    sync::{Arc, Weak},
};

use libc::off_t;
use memmap2::{MmapMut, MmapOptions};
use parking_lot::{RwLock, RwLockReadGuard};

pub mod error;
mod layout;
mod reader;
mod region;
mod regions;

pub use error::*;
use layout::*;
use rayon::prelude::*;
pub use reader::*;
pub use region::*;
use regions::*;

pub const PAGE_SIZE: u64 = 4096;
pub const PAGE_SIZE_MINUS_1: u64 = PAGE_SIZE - 1;
const GB: usize = 1024 * 1024 * 1024;

#[derive(Debug, Clone)]
pub struct Database(Arc<DatabaseInner>);

#[derive(Debug)]
pub struct DatabaseInner {
    path: PathBuf,
    regions: RwLock<Regions>,
    layout: RwLock<Layout>,
    file: RwLock<File>,
    mmap: RwLock<MmapMut>,
}

impl Database {
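    /// Open (or create) the database directory at `path`.
    ///
    /// A minimal usage sketch, assuming the crate is named `rawdb` (as its
    /// directory suggests) and that `Result` is the alias re-exported from
    /// the `error` module:
    ///
    /// ```no_run
    /// # fn main() -> rawdb::Result<()> {
    /// use std::path::Path;
    /// use rawdb::Database;
    ///
    /// let db = Database::open(Path::new("/tmp/my-db"))?;
    /// let region = db.create_region_if_needed("prices")?;
    /// db.write_all_to_region(&region, b"hello")?;
    /// db.flush()?;
    /// # Ok(())
    /// # }
    /// ```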
    pub fn open(path: &Path) -> Result<Self> {
        fs::create_dir_all(path)?;

        let file = OpenOptions::new()
            .read(true)
            .create(true)
            .write(true)
            .truncate(false)
            .open(Self::data_path_(path))?;
        file.try_lock()?;

        let regions = Regions::open(path)?;
        let mmap = Self::create_mmap(&file)?;

        let db = Self(Arc::new(DatabaseInner {
            path: path.to_owned(),
            file: RwLock::new(file),
            mmap: RwLock::new(mmap),
            regions: RwLock::new(regions),
            layout: RwLock::new(Layout::default()),
        }));

        db.regions.write().fill_index_to_region(&db)?;
        *db.layout.write() = Layout::from(&*db.regions.read());

        // Ensure directory entries are durable
        File::open(path)?.sync_data()?;

        Ok(db)
    }

    pub fn file_len(&self) -> Result<u64> {
        Ok(self.file.read().metadata()?.len())
    }

    pub fn set_min_len(&self, len: u64) -> Result<()> {
        let len = Self::ceil_number_to_page_size_multiple(len);

        if self.file_len()? < len {
            let mut mmap = self.mmap.write();
            let file = self.file.write();
            file.set_len(len)?;
            *mmap = Self::create_mmap(&file)?;
        }

        Ok(())
    }

    pub fn set_min_regions(&self, regions: usize) -> Result<()> {
        self.regions
            .write()
            .set_min_len((regions * SIZE_OF_REGION_METADATA) as u64)?;
        self.set_min_len(regions as u64 * PAGE_SIZE)
    }

    pub fn get_region(&self, id: &str) -> Option<Region> {
        self.regions.read().get_region_from_id(id).cloned()
    }

    pub fn create_region_if_needed(&self, id: &str) -> Result<Region> {
        if let Some(region) = self.get_region(id) {
            return Ok(region);
        }

        let mut regions = self.regions.write();
        let mut layout = self.layout.write();

        let start = if let Some(start) = layout.find_smallest_adequate_hole(PAGE_SIZE) {
            layout.remove_or_compress_hole(start, PAGE_SIZE);
            start
        } else {
            let start = layout
                .get_last_region()
                .map(|(_, region)| {
                    let region_meta = region.meta().read();
                    region_meta.start() + region_meta.reserved()
                })
                .unwrap_or_default();

            let len = start + PAGE_SIZE;

            self.set_min_len(len)?;

            start
        };

        let region = regions.create_region(self, id.to_owned(), start)?;

        layout.insert_region(start, &region);

        Ok(region)
    }

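    /// Append `data` at the end of `region`, growing its reservation (and the
    /// backing file) as needed. See `write_all_to_region_at` for positional
    /// writes and `truncate_write_all_to_region` for write-then-truncate
    /// semantics; all three delegate to `write_all_to_region_at_` below.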
    #[inline]
    pub fn write_all_to_region(&self, region: &Region, data: &[u8]) -> Result<()> {
        self.write_all_to_region_at_(region, data, None, false)
    }

    #[inline]
    pub fn write_all_to_region_at(&self, region: &Region, data: &[u8], at: u64) -> Result<()> {
        self.write_all_to_region_at_(region, data, Some(at), false)
    }

    #[inline]
    pub fn truncate_write_all_to_region(
        &self,
        region: &Region,
        at: u64,
        data: &[u8],
    ) -> Result<()> {
        self.write_all_to_region_at_(region, data, Some(at), true)
    }

    fn write_all_to_region_at_(
        &self,
        region: &Region,
        data: &[u8],
        at: Option<u64>,
        truncate: bool,
    ) -> Result<()> {
        let region_meta = region.meta().read();
        let start = region_meta.start();
        let reserved = region_meta.reserved();
        let len = region_meta.len();
        drop(region_meta);

        let data_len = data.len() as u64;

        // Validate the write position if one was specified.
        // Checking `at > len` is sufficient: `len <= reserved` always holds,
        // so `at <= len` implies `at <= reserved`.
        if let Some(at_val) = at
            && at_val > len
        {
            return Err(Error::WriteOutOfBounds {
                position: at_val,
                region_len: len,
            });
        }

        let new_len = at.map_or(len + data_len, |at| {
            let new_len = at + data_len;
            if truncate { new_len } else { new_len.max(len) }
        });
        let write_start = start + at.unwrap_or(len);


        // Write to reserved space if possible
        if new_len <= reserved {
            // info!(
            //     "Write {data_len} bytes to {region_index} reserved space at {write_start} (start = {start}, at = {at:?}, len = {len})"
            // );

            // An append only touches bytes past `len`, which readers cannot
            // see until `set_len` below, so the copy may happen before taking
            // the region's write lock. A positional write overlaps live data,
            // so it is performed while holding the lock.
            if at.is_none() {
                self.write(write_start, data);
            }

            let mut region_meta = region.meta().write();

            if at.is_some() {
                self.write(write_start, data);
            }

            region_meta.set_len(new_len);

            return Ok(());
        }

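        // Grow the reservation geometrically (doubling, like `Vec`) so that
        // repeated appends amortize the cost of moving the region.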
        assert!(new_len > reserved);
        let mut new_reserved = reserved;
        while new_len > new_reserved {
            new_reserved *= 2;
        }
        assert!(new_len <= new_reserved);
        let added_reserve = new_reserved - reserved;

        let mut layout = self.layout.write();

        // If the region is the last thing in the file, just extend the file
        // and keep writing in place
        if layout.is_last_anything(region) {
            // info!("{region_index} Append to file at {write_start}");

            self.set_min_len(start + new_reserved)?;
            let mut region_meta = region.meta().write();
            region_meta.set_reserved(new_reserved);
            drop(region_meta);
            drop(layout);

            self.write(write_start, data);

            let mut region_meta = region.meta().write();
            region_meta.set_len(new_len);

            return Ok(());
        }


        // Expand the region to the right if the adjacent hole is wide enough
        let hole_start = start + reserved;
        if layout
            .get_hole(hole_start)
            .is_some_and(|gap| gap >= added_reserve)
        {
            // info!("Expand {region_index} to hole");

            layout.remove_or_compress_hole(hole_start, added_reserve);
            let mut region_meta = region.meta().write();
            region_meta.set_reserved(new_reserved);
            drop(region_meta);
            drop(layout);

            self.write(write_start, data);

            let mut region_meta = region.meta().write();
            region_meta.set_len(new_len);

            return Ok(());
        }

        // Find a hole big enough to move the whole region into
        if let Some(hole_start) = layout.find_smallest_adequate_hole(new_reserved) {
            // info!("Move {region_index} to hole at {hole_start}");

            layout.remove_or_compress_hole(hole_start, new_reserved);
            drop(layout);

            self.write(
                hole_start,
                &self.mmap.read()[start as usize..write_start as usize],
            );

            self.write(hole_start + at.unwrap_or(len), data);

            let mut layout = self.layout.write();
            layout.move_region(hole_start, region)?;

            let mut region_meta = region.meta().write();
            region_meta.set_start(hole_start);
            region_meta.set_reserved(new_reserved);
            region_meta.set_len(new_len);

            return Ok(());
        }

        // Otherwise move the region to the end of the file
        let new_start = layout.len();
        // info!(
        //     "Move {region_index} to the end, from {start}..{} to {new_start}..{}",
        //     start + reserved,
        //     new_start + new_reserved
        // );
        self.set_min_len(new_start + new_reserved)?;
        layout.reserve(new_start, new_reserved);
        drop(layout);

        // Copy the existing data to the new location, then write the payload
        self.write(
            new_start,
            &self.mmap.read()[start as usize..write_start as usize],
        );
        self.write(new_start + at.unwrap_or(len), data);

        let mut layout = self.layout.write();
        layout.move_region(new_start, region)?;
        assert!(layout.reserved(new_start) == Some(new_reserved));

        let mut region_meta = region.meta().write();
        region_meta.set_start(new_start);
        region_meta.set_reserved(new_reserved);
        region_meta.set_len(new_len);

        Ok(())
    }


    #[inline]
    fn write(&self, at: u64, data: &[u8]) {
        let mmap = self.mmap.read();
        let data_len = data.len();
        let start = at as usize;
        let end = start + data_len;
        if end > mmap.len() {
            unreachable!("Trying to write beyond mmap")
        }

        // SAFETY: the read guard keeps the mapping alive for the duration of
        // the copy; exclusive access to this byte range is coordinated by the
        // callers through the region and layout locks.
        (unsafe { std::slice::from_raw_parts_mut(mmap.as_ptr() as *mut u8, mmap.len()) })
            [start..end]
            .copy_from_slice(data);
    }


    /// Truncate the region so that its length becomes `from` (an offset
    /// relative to the region's start).
    ///
    /// Non-destructive: only the length metadata is updated; the underlying
    /// bytes stay in place until they are overwritten or hole-punched.
    pub fn truncate_region(&self, region: &Region, from: u64) -> Result<()> {
        let mut region_meta = region.meta().write();
        let len = region_meta.len();
        if from == len {
            return Ok(());
        } else if from > len {
            return Err(Error::TruncateInvalid {
                from,
                current_len: len,
            });
        }
        region_meta.set_len(from);
        Ok(())
    }


    pub fn remove_region_with_id(&self, id: &str) -> Result<Option<Region>> {
        let Some(region) = self.get_region(id) else {
            return Ok(None);
        };
        self.remove_region(region)
    }

    pub fn remove_region(&self, region: Region) -> Result<Option<Region>> {
        let mut regions = self.regions.write();
        let mut layout = self.layout.write();
        layout.remove_region(&region)?;
        regions.remove_region(region)
    }

    pub fn retain_regions(&self, mut ids: HashSet<String>) -> Result<()> {
        let regions_to_remove = self
            .regions
            .read()
            .id_to_index()
            .keys()
            .filter(|id| !ids.remove(&**id))
            .flat_map(|id| self.get_region(id))
            .collect::<Vec<Region>>();

        regions_to_remove
            .into_iter()
            .try_for_each(|region| -> Result<()> {
                self.remove_region(region)?;
                Ok(())
            })
    }


    #[inline]
    fn create_mmap(file: &File) -> Result<MmapMut> {
        Ok(unsafe { MmapOptions::new().map_mut(file)? })
    }

    #[inline]
    pub fn mmap(&self) -> RwLockReadGuard<'_, MmapMut> {
        self.mmap.read()
    }

    #[inline]
    pub fn regions(&self) -> RwLockReadGuard<'_, Regions> {
        self.regions.read()
    }

    #[inline]
    pub fn layout(&self) -> RwLockReadGuard<'_, Layout> {
        self.layout.read()
    }

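    /// Round `num` up to the nearest multiple of `PAGE_SIZE` with the usual
    /// power-of-two mask trick: add `PAGE_SIZE - 1`, then clear the low bits.
    /// For example: 0 -> 0, 1 -> 4096, 4096 -> 4096, 5000 -> 8192.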
    #[inline]
    fn ceil_number_to_page_size_multiple(num: u64) -> u64 {
        (num + PAGE_SIZE_MINUS_1) & !PAGE_SIZE_MINUS_1
    }

    #[inline]
    fn data_path(&self) -> PathBuf {
        Self::data_path_(&self.path)
    }

    #[inline]
    fn data_path_(path: &Path) -> PathBuf {
        path.join("data")
    }

    /// Open a dedicated file handle for sequential reading.
    /// This enables optimal kernel readahead for iteration.
    #[inline]
    pub fn open_read_only_file(&self) -> Result<File> {
        File::open(self.data_path()).map_err(Error::from)
    }


    /// Human-readable on-disk size of the data file, as reported by `du -h`
    /// (a Unix-only debugging helper).
    pub fn disk_usage(&self) -> String {
        let path = self.data_path();

        let output = std::process::Command::new("du")
            .arg("-h")
            .arg(&path)
            .output()
            .expect("Failed to run du");

        String::from_utf8_lossy(&output.stdout)
            .replace(path.to_str().unwrap(), " ")
            .trim()
            .to_string()
    }


    pub fn flush(&self) -> Result<()> {
        let mmap = self.mmap.read();
        let regions = self.regions.read();
        mmap.flush()?;
        regions.flush()?;

        // Now that metadata is durable, pending holes can be reused
        self.layout.write().promote_pending_holes();

        Ok(())
    }

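    /// Flush first, then punch: a hole is only safe to deallocate once the
    /// metadata that stops readers from pointing into it is durable on disk.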
    #[inline]
    pub fn compact(&self) -> Result<()> {
        self.flush()?;
        self.punch_holes()
    }

    fn punch_holes(&self) -> Result<()> {
        let file = self.file.write();
        let mut mmap = self.mmap.write();
        let regions = self.regions.read();
        let layout = self.layout.read();

        let mut punched = regions
            .index_to_region()
            .par_iter()
            .flatten()
            .map(|region| -> Result<usize> {
                let region_meta = region.meta().read();
                let rstart = region_meta.start();
                let len = region_meta.len();
                let reserved = region_meta.reserved();
                let ceil_len = Self::ceil_number_to_page_size_multiple(len);
                assert!(len <= ceil_len);
                if ceil_len > reserved {
                    unreachable!("region len (rounded up) exceeds its reservation")
                } else if ceil_len < reserved {
                    let start = rstart + ceil_len;
                    let hole = reserved - ceil_len;
                    if Self::approx_has_punchable_data(&mmap, start, hole) {
                        Self::punch_hole(&file, start, hole)?;
                        return Ok(1);
                    }
                }
                Ok(0)
            })
            .sum::<Result<usize>>()?;

        punched += layout
            .start_to_hole()
            .par_iter()
            .map(|(&start, &hole)| -> Result<usize> {
                if Self::approx_has_punchable_data(&mmap, start, hole) {
                    Self::punch_hole(&file, start, hole)?;
                    return Ok(1);
                }
                Ok(0)
            })
            .sum::<Result<usize>>()?;

        if punched > 0 {
            unsafe {
                libc::fsync(file.as_raw_fd());
            }
            *mmap = Self::create_mmap(&file)?;
        }

        Ok(())
    }

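    /// Cheap, sampled check for whether `[start, start + len)` still holds any
    /// non-zero bytes worth punching. Instead of scanning the whole range it
    /// probes the first and last byte of the first page, of the last page, and
    /// of one page at each GB boundary in between; stale bytes that escape the
    /// probes merely stay allocated until a later compaction.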
    fn approx_has_punchable_data(mmap: &MmapMut, start: u64, len: u64) -> bool {
        assert!(start.is_multiple_of(PAGE_SIZE));
        assert!(len.is_multiple_of(PAGE_SIZE));

        let start = start as usize;
        let len = len as usize;

        let min = start;
        let max = start + len;
        let check = |start, end| {
            assert!(start >= min);
            assert!(end < max);
            let start_is_some = mmap[start] != 0;
            let end_is_some = mmap[end] != 0;
            start_is_some || end_is_some
        };

        let first_page_start = start;
        let first_page_end = start + PAGE_SIZE as usize - 1;
        if check(first_page_start, first_page_end) {
            return true;
        }

        let last_page_start = start + len - PAGE_SIZE as usize;
        let last_page_end = start + len - 1;
        if check(last_page_start, last_page_end) {
            return true;
        }

        if len > GB {
            let num_gb_checks = len / GB;
            for i in 1..num_gb_checks {
                let gb_boundary = start + i * GB;
                let page_start = gb_boundary;
                let page_end = gb_boundary + PAGE_SIZE as usize - 1;

                if check(page_start, page_end) {
                    return true;
                }
            }
        }

        false
    }

    #[cfg(target_os = "macos")]
    fn punch_hole(file: &File, start: u64, length: u64) -> Result<()> {
        let fpunchhole = FPunchhole {
            fp_flags: 0,
            reserved: 0,
            fp_offset: start as libc::off_t,
            fp_length: length as libc::off_t,
        };

        let result = unsafe {
            libc::fcntl(
                file.as_raw_fd(),
                libc::F_PUNCHHOLE,
                &fpunchhole as *const FPunchhole,
            )
        };

        if result == -1 {
            let err = std::io::Error::last_os_error();
            return Err(Error::HolePunchFailed {
                start,
                len: length,
                source: err,
            });
        }

        Ok(())
    }

    #[cfg(target_os = "linux")]
    fn punch_hole(file: &File, start: u64, length: u64) -> Result<()> {
        let result = unsafe {
            libc::fallocate(
                file.as_raw_fd(),
                libc::FALLOC_FL_PUNCH_HOLE | libc::FALLOC_FL_KEEP_SIZE,
                start as libc::off_t,
                length as libc::off_t,
            )
        };

        if result == -1 {
            let err = std::io::Error::last_os_error();
            return Err(Error::HolePunchFailed {
                start,
                len: length,
                source: err,
            });
        }

        Ok(())
    }

    #[cfg(target_os = "freebsd")]
    fn punch_hole(file: &File, start: u64, length: u64) -> Result<()> {
        let fd = file.as_raw_fd();

        let mut spacectl = libc::spacectl_range {
            r_offset: start as libc::off_t,
            r_len: length as libc::off_t,
        };

        let result = unsafe {
            libc::fspacectl(
                fd,
                libc::SPACECTL_DEALLOC,
                &spacectl as *const libc::spacectl_range,
                0,
                &mut spacectl as *mut libc::spacectl_range,
            )
        };

        if result == -1 {
            let err = std::io::Error::last_os_error();
            return Err(Error::HolePunchFailed {
                start,
                len: length,
                source: err,
            });
        }

        Ok(())
    }

    #[cfg(not(any(target_os = "macos", target_os = "linux", target_os = "freebsd")))]
    fn punch_hole(_file: &File, _start: u64, _length: u64) -> Result<()> {
        Err(Error::String(
            "Hole punching not supported on this platform".to_string(),
        ))
    }

    #[inline]
    pub fn path(&self) -> &Path {
        &self.path
    }

    #[inline]
    pub fn weak_clone(&self) -> WeakDatabase {
        WeakDatabase(Arc::downgrade(&self.0))
    }
}

impl Deref for Database {
    type Target = Arc<DatabaseInner>;
    fn deref(&self) -> &Self::Target {
        &self.0
    }
}

/// Mirrors the C `fpunchhole_t` argument to `fcntl(F_PUNCHHOLE)` on macOS.
#[repr(C)]
struct FPunchhole {
    fp_flags: u32,
    reserved: u32,
    fp_offset: off_t,
    fp_length: off_t,
}

#[derive(Debug, Clone)]
pub struct WeakDatabase(Weak<DatabaseInner>);

impl WeakDatabase {
    pub fn upgrade(&self) -> Database {
        Database(
            self.0
                .upgrade()
                .expect("Database was dropped while Region still exists"),
        )
    }
}
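
#[cfg(test)]
mod tests {
    use super::*;

    // A smoke-test sketch rather than a full suite; it assumes `tempfile` is
    // available as a dev-dependency (swap in any scratch directory otherwise)
    // and that the crate's `Error` implements `Debug` so the test can return
    // `Result<()>`.
    #[test]
    fn write_then_read_back() -> Result<()> {
        let dir = tempfile::tempdir()?;
        let db = Database::open(dir.path())?;

        let region = db.create_region_if_needed("smoke")?;
        db.write_all_to_region(&region, b"hello")?;
        db.flush()?;

        // Read the bytes back through the shared mmap.
        let (start, len) = {
            let meta = region.meta().read();
            (meta.start() as usize, meta.len() as usize)
        };
        assert_eq!(&db.mmap()[start..start + len], b"hello");
        Ok(())
    }
}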