seqdb/
lib.rs

1#![doc = include_str!(concat!("../", env!("CARGO_PKG_README")))]
2#![doc = "\n## Example\n"]
3#![doc = "\n```rust"]
4#![doc = include_str!("../examples/db.rs")]
5#![doc = "```\n"]
6
7use std::{
8    collections::HashSet,
9    fs::{self, File, OpenOptions},
10    ops::Deref,
11    os::unix::io::AsRawFd,
12    path::{Path, PathBuf},
13    sync::Arc,
14};
15
16use allocative::Allocative;
17use libc::off_t;
18use memmap2::{MmapMut, MmapOptions};
19use parking_lot::{RwLock, RwLockReadGuard};
20
21pub mod error;
22mod identifier;
23mod layout;
24mod reader;
25mod region;
26mod regions;
27
28pub use error::*;
29pub use identifier::*;
30use layout::*;
31use rayon::prelude::*;
32pub use reader::*;
33pub use region::*;
34use regions::*;
35
/// Granularity, in bytes, of region reservations, data-file growth and hole punching.
pub const PAGE_SIZE: u64 = 1 << 12;
/// Low-bit mask within a page (`PAGE_SIZE - 1`), used to round lengths up to a page.
pub const PAGE_SIZE_MINUS_1: u64 = PAGE_SIZE - 1;
/// One gibibyte, the sampling stride used when probing large ranges for data.
const GB: u64 = 1 << 30;
39
40#[derive(Debug, Clone, Allocative)]
41pub struct Database(Arc<DatabaseInner>);
42
43impl Database {
44    pub fn open(path: &Path) -> Result<Self> {
45        Ok(Self(Arc::new(DatabaseInner::open(path)?)))
46    }
47}
48
49impl Deref for Database {
50    type Target = DatabaseInner;
51    fn deref(&self) -> &Self::Target {
52        &self.0
53    }
54}
55
56#[derive(Debug, Allocative)]
57pub struct DatabaseInner {
58    path: PathBuf,
59    regions: RwLock<Regions>,
60    layout: RwLock<Layout>,
61    #[allocative(skip)]
62    file: RwLock<File>,
63    #[allocative(skip)]
64    mmap: RwLock<MmapMut>,
65}
66
67impl DatabaseInner {
68    fn open(path: &Path) -> Result<Self> {
69        fs::create_dir_all(path)?;
70
71        let regions = Regions::open(path)?;
72
73        let layout = Layout::from(&regions);
74
75        let file = OpenOptions::new()
76            .read(true)
77            .create(true)
78            .write(true)
79            .truncate(false)
80            .open(Self::data_path_(path))?;
81        file.try_lock()?;
82
83        let mmap = Self::create_mmap(&file)?;
84
85        Ok(Self {
86            path: path.to_owned(),
87            file: RwLock::new(file),
88            mmap: RwLock::new(mmap),
89            regions: RwLock::new(regions),
90            layout: RwLock::new(layout),
91        })
92    }
93
94    pub fn file_len(&self) -> Result<u64> {
95        Ok(self.file.read().metadata()?.len())
96    }
97
98    pub fn set_min_len(&self, len: u64) -> Result<()> {
99        let len = Self::ceil_number_to_page_size_multiple(len);
100
101        let file_len = self.file_len()?;
102        if file_len < len {
103            let mut mmap = self.mmap.write();
104            let file = self.file.write();
105            file.set_len(len)?;
106            *mmap = Self::create_mmap(&file)?;
107            Ok(())
108        } else {
109            Ok(())
110        }
111    }
112
113    pub fn set_min_regions(&self, regions: usize) -> Result<()> {
114        self.regions
115            .write()
116            .set_min_len((regions * SIZE_OF_REGION) as u64)?;
117        self.set_min_len(regions as u64 * PAGE_SIZE)
118    }
119
120    pub fn create_region_if_needed(&self, id: &str) -> Result<(usize, Arc<RwLock<Region>>)> {
121        let regions = self.regions.read();
122        if let Some(index) = regions.get_region_index_from_id(id) {
123            return Ok((index, regions.get_region_from_index(index).unwrap()));
124        }
125        drop(regions);
126
127        let mut regions = self.regions.write();
128        let mut layout = self.layout.write();
129
130        let start = if let Some(start) = layout.find_smallest_adequate_hole(PAGE_SIZE) {
131            layout.remove_or_compress_hole(start, PAGE_SIZE);
132            start
133        } else {
134            let start = layout
135                .get_last_region_index()
136                .map(|index| {
137                    let region_opt = regions.get_region_from_index(index);
138                    let region = region_opt.as_ref().unwrap().read();
139                    region.start() + region.reserved()
140                })
141                .unwrap_or_default();
142
143            let len = start + PAGE_SIZE;
144
145            self.set_min_len(len)?;
146
147            start
148        };
149
150        let (index, region) = regions.create_region(id.to_owned(), start)?;
151
152        layout.insert_region(start, index);
153
154        Ok((index, region))
155    }
156
157    pub fn get_region(&self, identifier: Identifier) -> Result<RwLockReadGuard<'static, Region>> {
158        let regions = self.regions.read();
159        let region_opt = regions.get_region(identifier);
160        let region_arc = region_opt.ok_or(Error::Str("Unknown region"))?;
161        let region = region_arc.read();
162        let region: RwLockReadGuard<'static, Region> = unsafe { std::mem::transmute(region) };
163        Ok(region)
164    }
165
166    pub fn create_region_reader<'a>(&'a self, identifier: Identifier) -> Result<Reader<'a>> {
167        let mmap: RwLockReadGuard<'a, MmapMut> = self.mmap.read();
168        let region = self.get_region(identifier)?;
169        Ok(Reader::new(mmap, region))
170    }
171
172    #[inline]
173    pub fn write_all_to_region(&self, identifier: Identifier, data: &[u8]) -> Result<()> {
174        self.write_all_to_region_at_(identifier, data, None, false)
175    }
176
177    #[inline]
178    pub fn write_all_to_region_at(
179        &self,
180        identifier: Identifier,
181        data: &[u8],
182        at: u64,
183    ) -> Result<()> {
184        self.write_all_to_region_at_(identifier, data, Some(at), false)
185    }
186
187    #[inline]
188    pub fn truncate_write_all_to_region(
189        &self,
190        identifier: Identifier,
191        at: u64,
192        data: &[u8],
193    ) -> Result<()> {
194        self.write_all_to_region_at_(identifier, data, Some(at), true)
195    }
196
197    fn write_all_to_region_at_(
198        &self,
199        identifier: Identifier,
200        data: &[u8],
201        at: Option<u64>,
202        truncate: bool,
203    ) -> Result<()> {
204        let regions = self.regions.read();
205        let Some(region_lock) = regions.get_region(identifier.clone()) else {
206            return Err(Error::Str("Unknown region"));
207        };
208
209        let region_index = regions.identifier_to_index(identifier).unwrap();
210
211        let region = region_lock.read();
212        let start = region.start();
213        let reserved = region.reserved();
214        let len = region.len();
215        drop(region);
216
217        let data_len = data.len() as u64;
218        let new_len = at.map_or(len + data_len, |at| {
219            assert!(at <= len);
220            let new_len = at + data_len;
221            if truncate { new_len } else { new_len.max(len) }
222        });
223        let write_start = start + at.unwrap_or(len);
224
225        if at.is_some_and(|at| at > reserved) {
226            return Err(Error::Str("Invalid at parameter"));
227        }
228
229        // Write to reserved space if possible
230        if new_len <= reserved {
231            // info!(
232            //     "Write {data_len} bytes to {region_index} reserved space at {write_start} (start = {start}, at = {at:?}, len = {len})"
233            // );
234
235            if at.is_none() {
236                self.write(write_start, data);
237            }
238
239            let mut region = region_lock.write();
240
241            if at.is_some() {
242                self.write(write_start, data);
243            }
244
245            region.set_len(new_len);
246            regions.write_to_mmap(&region, region_index);
247
248            return Ok(());
249        }
250
251        assert!(new_len > reserved);
252        let mut new_reserved = reserved;
253        while new_len > new_reserved {
254            new_reserved *= 2;
255        }
256        assert!(new_len <= new_reserved);
257        let added_reserve = new_reserved - reserved;
258
259        let mut layout = self.layout.write();
260
261        // If is last continue writing
262        if layout.is_last_anything(region_index) {
263            // info!("{region_index} Append to file at {write_start}");
264
265            self.set_min_len(start + new_reserved)?;
266            let mut region = region_lock.write();
267            region.set_reserved(new_reserved);
268            drop(region);
269            drop(layout);
270
271            self.write(write_start, data);
272
273            let mut region = region_lock.write();
274            region.set_len(new_len);
275            regions.write_to_mmap(&region, region_index);
276
277            return Ok(());
278        }
279
280        // Expand region to the right if gap is wide enough
281        let hole_start = start + reserved;
282        if layout
283            .get_hole(hole_start)
284            .is_some_and(|gap| gap >= added_reserve)
285        {
286            // info!("Expand {region_index} to hole");
287
288            layout.remove_or_compress_hole(hole_start, added_reserve);
289            let mut region = region_lock.write();
290            region.set_reserved(new_reserved);
291            drop(region);
292            drop(layout);
293
294            self.write(write_start, data);
295
296            let mut region = region_lock.write();
297            region.set_len(new_len);
298            regions.write_to_mmap(&region, region_index);
299
300            return Ok(());
301        }
302
303        // Find hole big enough to move the region
304        if let Some(hole_start) = layout.find_smallest_adequate_hole(new_reserved) {
305            // info!("Move {region_index} to hole at {hole_start}");
306
307            layout.remove_or_compress_hole(hole_start, new_reserved);
308            drop(layout);
309
310            self.write(
311                hole_start,
312                &self.mmap.read()[start as usize..write_start as usize],
313            );
314            self.write(hole_start + at.unwrap_or(len), data);
315
316            let mut region = region_lock.write();
317            let mut layout = self.layout.write();
318            layout.move_region(hole_start, region_index, &region)?;
319            drop(layout);
320
321            region.set_start(hole_start);
322            region.set_reserved(new_reserved);
323            region.set_len(new_len);
324            regions.write_to_mmap(&region, region_index);
325
326            return Ok(());
327        }
328
329        let new_start = layout.len(&regions);
330        // Write at the end
331        // info!(
332        //     "Move {region_index} to the end, from {start}..{} to {new_start}..{}",
333        //     start + reserved,
334        //     new_start + new_reserved
335        // );
336        self.set_min_len(new_start + new_reserved)?;
337        layout.reserve(new_start, new_reserved);
338        drop(layout);
339
340        self.write(
341            new_start,
342            &self.mmap.read()[start as usize..write_start as usize],
343        );
344        self.write(new_start + at.unwrap_or(len), data);
345
346        let mut region = region_lock.write();
347        let mut layout = self.layout.write();
348        layout.move_region(new_start, region_index, &region)?;
349        assert!(layout.reserved(new_start) == Some(new_reserved));
350        drop(layout);
351
352        region.set_start(new_start);
353        region.set_reserved(new_reserved);
354        region.set_len(new_len);
355        regions.write_to_mmap(&region, region_index);
356
357        Ok(())
358    }
359
360    fn write(&self, at: u64, data: &[u8]) {
361        let mmap = self.mmap.read();
362
363        let data_len = data.len();
364        let start = at as usize;
365        let end = start + data_len;
366
367        if end > mmap.len() {
368            unreachable!("Trying to write beyond mmap")
369        }
370
371        let slice = unsafe { std::slice::from_raw_parts_mut(mmap.as_ptr() as *mut u8, mmap.len()) };
372
373        slice[start..end].copy_from_slice(data);
374    }
375
376    ///
377    /// From relative to start
378    ///
379    /// Non destructive
380    ///
381    pub fn truncate_region(&self, identifier: Identifier, from: u64) -> Result<()> {
382        let Some(region) = self.regions.read().get_region(identifier.clone()) else {
383            return Err(Error::Str("Unknown region"));
384        };
385        let mut region_ = region.write();
386        let len = region_.len();
387        if from == len {
388            return Ok(());
389        } else if from > len {
390            return Err(Error::Str("Truncating further than length"));
391        }
392        region_.set_len(from);
393        Ok(())
394    }
395
396    pub fn remove_region(&self, identifier: Identifier) -> Result<Option<Arc<RwLock<Region>>>> {
397        let mut regions = self.regions.write();
398
399        let mut layout = self.layout.write();
400
401        let index_opt = regions.identifier_to_index(identifier.clone());
402
403        let Some(region) = regions.remove_region(identifier)? else {
404            return Ok(None);
405        };
406
407        let index = index_opt.unwrap();
408
409        let region_ = region.write();
410
411        layout.remove_region(index, &region_)?;
412
413        drop(region_);
414
415        Ok(Some(region))
416    }
417
418    pub fn retain_regions(&self, mut ids: HashSet<String>) -> Result<()> {
419        let ids_to_remove = self
420            .regions
421            .read()
422            .id_to_index()
423            .keys()
424            .filter(|id| !ids.remove(&**id))
425            .cloned()
426            .collect::<Vec<String>>();
427
428        ids_to_remove.into_iter().try_for_each(|id| -> Result<()> {
429            self.remove_region(id.into())?;
430            Ok(())
431        })
432    }
433
434    fn create_mmap(file: &File) -> Result<MmapMut> {
435        Ok(unsafe { MmapOptions::new().map_mut(file)? })
436    }
437
438    pub fn regions(&self) -> RwLockReadGuard<'_, Regions> {
439        self.regions.read()
440    }
441
442    pub fn layout(&self) -> RwLockReadGuard<'_, Layout> {
443        self.layout.read()
444    }
445
446    pub fn mmap(&self) -> RwLockReadGuard<'_, MmapMut> {
447        self.mmap.read()
448    }
449
450    fn ceil_number_to_page_size_multiple(num: u64) -> u64 {
451        (num + PAGE_SIZE_MINUS_1) & !PAGE_SIZE_MINUS_1
452    }
453
454    fn data_path(&self) -> PathBuf {
455        Self::data_path_(&self.path)
456    }
457    fn data_path_(path: &Path) -> PathBuf {
458        path.join("data")
459    }
460
461    pub fn disk_usage(&self) -> String {
462        let path = self.data_path();
463
464        let output = std::process::Command::new("du")
465            .arg("-h")
466            .arg(&path)
467            .output()
468            .expect("Failed to run du");
469
470        String::from_utf8_lossy(&output.stdout)
471            .replace(path.to_str().unwrap(), " ")
472            .trim()
473            .to_string()
474    }
475
476    pub fn flush(&self) -> Result<()> {
477        let mmap = self.mmap.read();
478        let regions = self.regions.read();
479        mmap.flush()?;
480        regions.flush()
481    }
482
483    pub fn flush_then_punch(&self) -> Result<()> {
484        self.flush()?;
485        self.punch_holes()
486    }
487
488    pub fn punch_holes(&self) -> Result<()> {
489        let file = self.file.write();
490        let mut mmap = self.mmap.write();
491        let regions = self.regions.read();
492        let layout = self.layout.read();
493
494        let mut punched = regions
495            .index_to_region()
496            .par_iter()
497            .flatten()
498            .map(|region_lock| -> Result<usize> {
499                let region = region_lock.read();
500                let rstart = region.start();
501                let len = region.len();
502                let reserved = region.reserved();
503                let ceil_len = Self::ceil_number_to_page_size_multiple(len);
504                assert!(len <= ceil_len);
505                if ceil_len > reserved {
506                    panic!()
507                } else if ceil_len < reserved {
508                    let start = rstart + ceil_len;
509                    let hole = reserved - ceil_len;
510                    if Self::approx_has_punchable_data(&mmap, start, hole) {
511                        // info!(
512                        //     "dbg: {:?}",
513                        //     (region, rstart, len, ceil_len, reserved, start, hole)
514                        // );
515                        // info!("Punching a hole of {hole} bytes at {start}...");
516                        Self::punch_hole(&file, start, hole)?;
517                        return Ok(1);
518                    }
519                }
520                Ok(0)
521            })
522            .sum::<Result<usize>>()?;
523
524        punched += layout
525            .start_to_hole()
526            .par_iter()
527            .map(|(&start, &hole)| -> Result<usize> {
528                if Self::approx_has_punchable_data(&mmap, start, hole) {
529                    // info!("dbg: {:?}", (start, hole));
530                    // info!("Punching a hole of {hole} bytes at {start}...");
531                    Self::punch_hole(&file, start, hole)?;
532                    Ok(1)
533                } else {
534                    Ok(0)
535                }
536            })
537            .sum::<Result<usize>>()?;
538
539        if punched > 0 {
540            unsafe {
541                libc::fsync(file.as_raw_fd());
542            }
543            // info!("Remaping post hole punching...");
544            *mmap = Self::create_mmap(&file)?;
545        }
546
547        Ok(())
548    }
549
550    fn approx_has_punchable_data(mmap: &MmapMut, start: u64, len: u64) -> bool {
551        assert!(start % PAGE_SIZE == 0);
552        assert!(len % PAGE_SIZE == 0);
553
554        let min = start as usize;
555        let max = (start + len) as usize;
556        let check = |start, end| {
557            assert!(start >= min);
558            assert!(end < max);
559            let start_is_some = mmap[start] != 0;
560            // if start_is_some {
561            // info!("mmap[start = {}] = {}", start, mmap[start])
562            // }
563            let end_is_some = mmap[end] != 0;
564            // if end_is_some {
565            // info!("mmap[end = {}] = {}", end, mmap[end])
566            // }
567            start_is_some || end_is_some
568        };
569
570        // Check first page (first and last byte)
571        let first_page_start = start as usize;
572        let first_page_end = (start + PAGE_SIZE - 1) as usize;
573        if check(first_page_start, first_page_end) {
574            return true;
575        }
576
577        // Check last page (first and last byte)
578        let last_page_start = (start + len - PAGE_SIZE) as usize;
579        let last_page_end = (start + len - 1) as usize;
580        if check(last_page_start, last_page_end) {
581            return true;
582        }
583
584        // For large lengths, check at 1GB intervals
585        if len > GB {
586            let num_gb_checks = (len / GB) as usize;
587            for i in 1..num_gb_checks {
588                let gb_boundary = start + (i as u64 * GB);
589                let page_start = gb_boundary as usize;
590                let page_end = (gb_boundary + PAGE_SIZE - 1) as usize;
591
592                if check(page_start, page_end) {
593                    return true;
594                }
595            }
596        }
597
598        false
599    }
600
601    #[cfg(target_os = "macos")]
602    fn punch_hole(file: &File, start: u64, length: u64) -> Result<()> {
603        let fpunchhole = FPunchhole {
604            fp_flags: 0,
605            reserved: 0,
606            fp_offset: start as libc::off_t,
607            fp_length: length as libc::off_t,
608        };
609
610        let result = unsafe {
611            libc::fcntl(
612                file.as_raw_fd(),
613                libc::F_PUNCHHOLE,
614                &fpunchhole as *const FPunchhole,
615            )
616        };
617
618        if result == -1 {
619            let err = std::io::Error::last_os_error();
620            return Err(Error::String(format!("Failed to punch hole: {err}")));
621        }
622
623        Ok(())
624    }
625
626    #[cfg(target_os = "linux")]
627    fn punch_hole(file: &File, start: u64, length: u64) -> Result<()> {
628        let result = unsafe {
629            libc::fallocate(
630                file.as_raw_fd(),
631                libc::FALLOC_FL_PUNCH_HOLE | libc::FALLOC_FL_KEEP_SIZE,
632                start as libc::off_t,
633                length as libc::off_t,
634            )
635        };
636
637        if result == -1 {
638            let err = std::io::Error::last_os_error();
639            return Err(Error::String(format!("Failed to punch hole: {err}")));
640        }
641
642        Ok(())
643    }
644
645    #[cfg(target_os = "freebsd")]
646    fn punch_hole(file: &File, start: u64, length: u64) -> Result<()> {
647        let fd = file.as_raw_fd();
648
649        let mut spacectl = libc::spacectl_range {
650            r_offset: offset as libc::off_t,
651            r_len: length as libc::off_t,
652        };
653
654        let result = unsafe {
655            libc::fspacectl(
656                fd,
657                libc::SPACECTL_DEALLOC,
658                &spacectl as *const libc::spacectl_range,
659                0,
660                &mut spacectl as *mut libc::spacectl_range,
661            )
662        };
663
664        if result == -1 {
665            let err = std::io::Error::last_os_error();
666            return Err(Error::String(format!("Failed to punch hole: {err}")));
667        }
668
669        Ok(())
670    }
671
672    #[cfg(not(any(target_os = "macos", target_os = "linux", target_os = "freebsd")))]
673    fn punch_hole(_file: &File, _start: u64, _length: u64) -> Result<()> {
674        Err(Error::String(
675            "Hole punching not supported on this platform".to_string(),
676        ))
677    }
678
679    pub fn path(&self) -> &Path {
680        &self.path
681    }
682}
683
684#[repr(C)]
685struct FPunchhole {
686    fp_flags: u32,
687    reserved: u32,
688    fp_offset: off_t,
689    fp_length: off_t,
690}