seqdb/
lib.rs

1#![doc = include_str!(concat!("../", env!("CARGO_PKG_README")))]
2#![doc = "\n## Example\n"]
3#![doc = "\n```rust"]
4#![doc = include_str!("../examples/db.rs")]
5#![doc = "```\n"]
6
7use std::{
8    fs::{self, File, OpenOptions},
9    ops::Deref,
10    os::unix::io::AsRawFd,
11    path::{Path, PathBuf},
12    sync::Arc,
13};
14
15use libc::off_t;
16use memmap2::{MmapMut, MmapOptions};
17use parking_lot::{RwLock, RwLockReadGuard};
18
19pub mod error;
20mod identifier;
21mod layout;
22mod reader;
23mod region;
24mod regions;
25
26pub use error::*;
27pub use identifier::*;
28use layout::*;
29use rayon::prelude::*;
30pub use reader::*;
31pub use region::*;
32use regions::*;
33
/// Allocation granularity: regions start on, and file lengths are rounded to,
/// 4 KiB page boundaries.
pub const PAGE_SIZE: u64 = 4096;
/// `PAGE_SIZE - 1`, used as a bit mask in page-alignment arithmetic.
pub const PAGE_SIZE_MINUS_1: u64 = PAGE_SIZE - 1;
/// One gibibyte — sampling stride used by `approx_has_punchable_data`.
const GB: u64 = 1024 * 1024 * 1024;
37
/// Cheaply clonable handle to an open database: a shared pointer to
/// [`DatabaseInner`], which holds all the actual state.
#[derive(Debug, Clone)]
pub struct Database(Arc<DatabaseInner>);
40
impl Database {
    /// Opens (or creates) the database rooted at directory `path` and wraps
    /// it in a shareable handle. See [`DatabaseInner::open`] for details.
    pub fn open(path: &Path) -> Result<Self> {
        Ok(Self(Arc::new(DatabaseInner::open(path)?)))
    }
}
46
// Forwards every `DatabaseInner` method through the handle so callers can use
// `db.method()` directly.
// NOTE(review): `Deref` used to expose inherent methods is the "Deref
// polymorphism" pattern, usually discouraged — kept because callers depend on
// the forwarding, and changing it would break the public interface.
impl Deref for Database {
    type Target = DatabaseInner;
    fn deref(&self) -> &Self::Target {
        &self.0
    }
}
53
/// Shared state behind a [`Database`] handle.
///
/// All fields are individually locked; several methods take more than one
/// lock, so be mindful of acquisition order when modifying this type
/// (`punch_holes` uses file → mmap → regions → layout).
#[derive(Debug)]
pub struct DatabaseInner {
    // Root directory of the database; the data file lives at `<path>/data`.
    path: PathBuf,
    // Region metadata (id ↔ index ↔ record), persisted in its own file.
    regions: RwLock<Regions>,
    // In-memory map of used/hole byte ranges in the data file (derived, not
    // persisted — rebuilt from `regions` on open).
    layout: RwLock<Layout>,
    // Handle to the single `data` file backing all regions.
    file: RwLock<File>,
    // Writable map over the whole data file; recreated whenever it grows.
    mmap: RwLock<MmapMut>,
}
62
63impl DatabaseInner {
    /// Opens or creates the database directory at `path`.
    ///
    /// Creates the directory tree, loads the persisted region table, derives
    /// the layout (used/hole ranges) from it, opens and advisorily locks the
    /// `data` file, and maps the file into memory.
    ///
    /// # Errors
    /// Any I/O error from directory creation, file open/lock, or mapping, and
    /// any error from loading the region table.
    fn open(path: &Path) -> Result<Self> {
        fs::create_dir_all(path)?;

        let regions = Regions::open(path)?;

        // The layout is derived from the region table, never persisted.
        let layout = Layout::from(&regions);

        let file = OpenOptions::new()
            .read(true)
            .create(true)
            .write(true)
            .truncate(false)
            .open(Self::data_path_(path))?;
        // Advisory lock: fail fast if another process has this database open.
        file.try_lock()?;

        let mmap = Self::create_mmap(&file)?;

        Ok(Self {
            path: path.to_owned(),
            file: RwLock::new(file),
            mmap: RwLock::new(mmap),
            regions: RwLock::new(regions),
            layout: RwLock::new(layout),
        })
    }
89
90    pub fn file_len(&self) -> Result<u64> {
91        Ok(self.file.read().metadata()?.len())
92    }
93
94    pub fn set_min_len(&self, len: u64) -> Result<()> {
95        let len = Self::ceil_number_to_page_size_multiple(len);
96
97        let file_len = self.file_len()?;
98        if file_len < len {
99            let mut mmap = self.mmap.write();
100            let file = self.file.write();
101            file.set_len(len)?;
102            *mmap = Self::create_mmap(&file)?;
103            Ok(())
104        } else {
105            Ok(())
106        }
107    }
108
    /// Pre-allocates capacity for at least `regions` regions: grows the
    /// region-table file to hold that many records, and reserves one data
    /// page per region in the data file.
    pub fn set_min_regions(&self, regions: usize) -> Result<()> {
        self.regions
            .write()
            .set_min_len((regions * SIZE_OF_REGION) as u64)?;
        self.set_min_len(regions as u64 * PAGE_SIZE)
    }
115
116    pub fn create_region_if_needed(&self, id: &str) -> Result<(usize, Arc<RwLock<Region>>)> {
117        let regions = self.regions.read();
118        if let Some(index) = regions.get_region_index_from_id(id) {
119            return Ok((index, regions.get_region_from_index(index).unwrap()));
120        }
121        drop(regions);
122
123        let mut regions = self.regions.write();
124        let mut layout = self.layout.write();
125
126        let start = if let Some(start) = layout.find_smallest_adequate_hole(PAGE_SIZE) {
127            layout.remove_or_compress_hole(start, PAGE_SIZE);
128            start
129        } else {
130            let start = layout
131                .get_last_region_index()
132                .map(|index| {
133                    let region_opt = regions.get_region_from_index(index);
134                    let region = region_opt.as_ref().unwrap().read();
135                    region.start() + region.reserved()
136                })
137                .unwrap_or_default();
138
139            let len = start + PAGE_SIZE;
140
141            self.set_min_len(len)?;
142
143            start
144        };
145
146        let (index, region) = regions.create_region(id.to_owned(), start)?;
147
148        layout.insert_region(start, index);
149
150        Ok((index, region))
151    }
152
    /// Looks up a region by identifier and returns a read guard on it.
    ///
    /// # Errors
    /// `Error::Str("Unknown region")` if the identifier does not resolve.
    ///
    /// # Safety (review note)
    /// The guard's lifetime is transmuted to `'static`, but it actually
    /// borrows the `RwLock` inside the `Arc` obtained from `regions`. If
    /// `remove_region` drops the last `Arc` for this region while the guard
    /// is alive, the guard dangles (use-after-free). Nothing in this function
    /// keeps the `Arc` alive — callers must not outlive the region. Fixing
    /// this properly requires changing the return type (e.g. an owned guard),
    /// which would break the public interface.
    pub fn get_region(&self, identifier: Identifier) -> Result<RwLockReadGuard<'static, Region>> {
        let regions = self.regions.read();
        let region_opt = regions.get_region(identifier);
        let region_arc = region_opt.ok_or(Error::Str("Unknown region"))?;
        let region = region_arc.read();
        // SAFETY: see review note above — relies on the region's Arc staying
        // alive in `regions` for as long as the returned guard is used.
        let region: RwLockReadGuard<'static, Region> = unsafe { std::mem::transmute(region) };
        Ok(region)
    }
161
    /// Builds a [`Reader`] over one region's bytes.
    ///
    /// The reader holds a read guard on the mmap for its whole lifetime, so
    /// the file cannot be remapped (grown/punched) while it is alive.
    pub fn create_region_reader<'a>(&'a self, identifier: Identifier) -> Result<Reader<'a>> {
        let mmap: RwLockReadGuard<'a, MmapMut> = self.mmap.read();
        let region = self.get_region(identifier)?;
        Ok(Reader::new(mmap, region))
    }
167
    /// Appends `data` at the end of the region (offset = current length).
    #[inline]
    pub fn write_all_to_region(&self, identifier: Identifier, data: &[u8]) -> Result<()> {
        self.write_all_to_region_at_(identifier, data, None, false)
    }
172
    /// Writes `data` at region-relative offset `at` (must be `<= len`).
    /// The region keeps any existing bytes past the written range.
    #[inline]
    pub fn write_all_to_region_at(
        &self,
        identifier: Identifier,
        data: &[u8],
        at: u64,
    ) -> Result<()> {
        self.write_all_to_region_at_(identifier, data, Some(at), false)
    }
182
    /// Writes `data` at offset `at` and truncates the region to end exactly
    /// at `at + data.len()`.
    ///
    /// NOTE(review): parameter order here is `(identifier, at, data)` while
    /// `write_all_to_region_at` uses `(identifier, data, at)` — inconsistent,
    /// but callers depend on it, so it is left unchanged.
    #[inline]
    pub fn truncate_write_all_to_region(
        &self,
        identifier: Identifier,
        at: u64,
        data: &[u8],
    ) -> Result<()> {
        self.write_all_to_region_at_(identifier, data, Some(at), true)
    }
192
    /// Core write path shared by the public `write_*` / `truncate_write_*`
    /// methods.
    ///
    /// Semantics:
    /// - `at == None`: append `data` at the region's current length.
    /// - `at == Some(a)`: write at offset `a` (asserts `a <= len`); the new
    ///   length is `a + data.len()`, clamped up to the old length unless
    ///   `truncate` is set.
    ///
    /// If the new length exceeds the region's reservation, the reservation is
    /// doubled until it fits and the region is grown in place (when it is the
    /// last thing in the file, or a big-enough hole follows it) or relocated
    /// (into a hole elsewhere, or appended at the end of the file).
    fn write_all_to_region_at_(
        &self,
        identifier: Identifier,
        data: &[u8],
        at: Option<u64>,
        truncate: bool,
    ) -> Result<()> {
        let regions = self.regions.read();
        let Some(region_lock) = regions.get_region(identifier.clone()) else {
            return Err(Error::Str("Unknown region"));
        };

        let region_index = regions.identifier_to_index(identifier).unwrap();

        // Snapshot the region geometry, then release the guard so the region
        // stays readable while we plan the write.
        let region = region_lock.read();
        let start = region.start();
        let reserved = region.reserved();
        let len = region.len();
        drop(region);

        let data_len = data.len() as u64;
        // New logical length after this write (see doc comment above).
        let new_len = at.map_or(len + data_len, |at| {
            assert!(at <= len);
            let new_len = at + data_len;
            if truncate { new_len } else { new_len.max(len) }
        });
        // Absolute file offset where `data` will land.
        let write_start = start + at.unwrap_or(len);

        // NOTE(review): dead in practice — the `assert!(at <= len)` above
        // already fires first whenever `at > len`, and `len <= reserved` is
        // an invariant elsewhere.
        if at.is_some_and(|at| at > reserved) {
            return Err(Error::Str("Invalid at parameter"));
        }

        // Write to reserved space if possible
        if new_len <= reserved {
            // info!(
            //     "Write {data_len} bytes to {region_index} reserved space at {write_start} (start = {start}, at = {at:?}, len = {len})"
            // );

            // Appends write past `len`, invisible to readers until `set_len`
            // below, so they can happen outside the region write lock.
            if at.is_none() {
                self.write(write_start, data);
            }

            let mut region = region_lock.write();

            // In-place overwrites touch live bytes, so they are done while
            // holding the region write lock.
            if at.is_some() {
                self.write(write_start, data);
            }

            region.set_len(new_len);
            regions.write_to_mmap(&region, region_index);

            return Ok(());
        }

        assert!(new_len > reserved);
        // Grow the reservation geometrically until the new length fits.
        let mut new_reserved = reserved;
        while new_len > new_reserved {
            new_reserved *= 2;
        }
        assert!(new_len <= new_reserved);
        let added_reserve = new_reserved - reserved;

        let mut layout = self.layout.write();

        // If is last continue writing
        if layout.is_last_anything(region_index) {
            // info!("{region_index} Append to file at {write_start}");

            self.set_min_len(start + new_reserved)?;
            let mut region = region_lock.write();
            region.set_reserved(new_reserved);
            drop(region);
            drop(layout);

            self.write(write_start, data);

            let mut region = region_lock.write();
            region.set_len(new_len);
            regions.write_to_mmap(&region, region_index);

            return Ok(());
        }

        // Expand region to the right if gap is wide enough
        let hole_start = start + reserved;
        if layout
            .get_hole(hole_start)
            .is_some_and(|gap| gap >= added_reserve)
        {
            // info!("Expand {region_index} to hole");

            layout.remove_or_compress_hole(hole_start, added_reserve);
            let mut region = region_lock.write();
            region.set_reserved(new_reserved);
            drop(region);
            drop(layout);

            self.write(write_start, data);

            let mut region = region_lock.write();
            region.set_len(new_len);
            regions.write_to_mmap(&region, region_index);

            return Ok(());
        }

        // Find hole big enough to move the region
        if let Some(hole_start) = layout.find_smallest_adequate_hole(new_reserved) {
            // info!("Move {region_index} to hole at {hole_start}");

            layout.remove_or_compress_hole(hole_start, new_reserved);
            drop(layout);

            // Copy only the prefix [start, write_start): to reach this point
            // `new_len > reserved >= len`, so the write always covers the
            // old tail — nothing after `write_start` needs to survive.
            self.write(
                hole_start,
                &self.mmap.read()[start as usize..write_start as usize],
            );
            self.write(hole_start + at.unwrap_or(len), data);

            let mut region = region_lock.write();
            let mut layout = self.layout.write();
            layout.move_region(hole_start, region_index, &region)?;
            drop(layout);

            // Old bytes are left behind as a hole for `punch_holes` to free.
            region.set_start(hole_start);
            region.set_reserved(new_reserved);
            region.set_len(new_len);
            regions.write_to_mmap(&region, region_index);

            return Ok(());
        }

        let new_start = layout.len(&regions);
        // Write at the end
        // info!(
        //     "Move {region_index} to the end, from {start}..{} to {new_start}..{}",
        //     start + reserved,
        //     new_start + new_reserved
        // );
        self.set_min_len(new_start + new_reserved)?;
        layout.reserve(new_start, new_reserved);
        drop(layout);

        // Same prefix-copy reasoning as the hole-relocation branch above.
        self.write(
            new_start,
            &self.mmap.read()[start as usize..write_start as usize],
        );
        self.write(new_start + at.unwrap_or(len), data);

        let mut region = region_lock.write();
        let mut layout = self.layout.write();
        layout.move_region(new_start, region_index, &region)?;
        assert!(layout.reserved(new_start) == Some(new_reserved));
        drop(layout);

        region.set_start(new_start);
        region.set_reserved(new_reserved);
        region.set_len(new_len);
        regions.write_to_mmap(&region, region_index);

        Ok(())
    }
355
    /// Copies `data` into the memory map at absolute file offset `at`.
    ///
    /// Only a *read* lock on the mmap is taken: the lock guards remapping,
    /// not byte-level access.
    ///
    /// # Panics
    /// If the write would run past the end of the map — callers must have
    /// grown the file first (`set_min_len`).
    fn write(&self, at: u64, data: &[u8]) {
        let mmap = self.mmap.read();

        let data_len = data.len();
        let start = at as usize;
        let end = start + data_len;

        if end > mmap.len() {
            unreachable!("Trying to write beyond mmap")
        }

        // SAFETY/NOTE(review): this creates a mutable view of the map behind
        // a shared read guard. Soundness relies on callers never issuing
        // overlapping concurrent writes (each region is serialized by its own
        // lock in the write path) — overlapping writers would be a data race.
        let slice = unsafe { std::slice::from_raw_parts_mut(mmap.as_ptr() as *mut u8, mmap.len()) };

        slice[start..end].copy_from_slice(data);
    }
371
372    ///
373    /// From relative to start
374    ///
375    /// Non destructive
376    ///
377    pub fn truncate_region(&self, identifier: Identifier, from: u64) -> Result<()> {
378        let Some(region) = self.regions.read().get_region(identifier.clone()) else {
379            return Err(Error::Str("Unknown region"));
380        };
381        let mut region_ = region.write();
382        let len = region_.len();
383        if from == len {
384            return Ok(());
385        } else if from > len {
386            return Err(Error::Str("Truncating further than length"));
387        }
388        region_.set_len(from);
389        Ok(())
390    }
391
    /// Removes a region: deletes its record from the region table and returns
    /// its reserved range to the layout as a hole.
    ///
    /// Returns the removed region's lock, or `None` if the identifier was
    /// unknown. The data bytes themselves remain on disk until `punch_holes`
    /// reclaims them.
    pub fn remove_region(&self, identifier: Identifier) -> Result<Option<Arc<RwLock<Region>>>> {
        let mut regions = self.regions.write();

        let mut layout = self.layout.write();

        // Resolve the index *before* the record is removed below.
        let index_opt = regions.identifier_to_index(identifier.clone());

        let Some(region) = regions.remove_region(identifier)? else {
            return Ok(None);
        };

        let index = index_opt.unwrap();

        // Taking the write lock also waits out any in-flight users of this
        // region before its range is handed back to the layout.
        let region_ = region.write();

        layout.remove_region(index, &region_)?;

        drop(region_);

        Ok(Some(region))
    }
413
    /// Maps the entire data file read/write.
    ///
    /// SAFETY: `map_mut` is unsafe because the mapping's contents can change
    /// underneath us if the file is modified externally; the advisory file
    /// lock taken in `open` is what guards against other processes doing so.
    fn create_mmap(file: &File) -> Result<MmapMut> {
        Ok(unsafe { MmapOptions::new().map_mut(file)? })
    }
417
    /// Read access to the region table.
    pub fn regions(&self) -> RwLockReadGuard<'_, Regions> {
        self.regions.read()
    }
421
    /// Read access to the file layout (used/hole ranges).
    pub fn layout(&self) -> RwLockReadGuard<'_, Layout> {
        self.layout.read()
    }
425
    /// Read access to the memory map of the data file.
    pub fn mmap(&self) -> RwLockReadGuard<'_, MmapMut> {
        self.mmap.read()
    }
429
430    fn ceil_number_to_page_size_multiple(num: u64) -> u64 {
431        (num + PAGE_SIZE_MINUS_1) & !PAGE_SIZE_MINUS_1
432    }
433
    /// Path of this database's data file (`<path>/data`).
    fn data_path(&self) -> PathBuf {
        Self::data_path_(&self.path)
    }
437    fn data_path_(path: &Path) -> PathBuf {
438        path.join("data")
439    }
440
441    pub fn disk_usage(&self) -> String {
442        let path = self.data_path();
443
444        let output = std::process::Command::new("du")
445            .arg("-h")
446            .arg(&path)
447            .output()
448            .expect("Failed to run du");
449
450        String::from_utf8_lossy(&output.stdout)
451            .replace(path.to_str().unwrap(), " ")
452            .trim()
453            .to_string()
454    }
455
    /// Flushes the data mmap and then the region table to disk.
    pub fn flush(&self) -> Result<()> {
        let mmap = self.mmap.read();
        let regions = self.regions.read();
        mmap.flush()?;
        regions.flush()
    }
462
    /// Flushes everything to disk, then releases unused reserved space
    /// (see [`Self::punch_holes`]).
    pub fn flush_then_punch(&self) -> Result<()> {
        self.flush()?;
        self.punch_holes()
    }
467
    /// Releases disk space for page-aligned ranges that are reserved but
    /// unused: the tail of each region beyond its (page-rounded) length, and
    /// every hole tracked by the layout.
    ///
    /// Only ranges where sampling (`approx_has_punchable_data`) sees non-zero
    /// bytes are punched. If anything was punched, the file is fsync'ed and
    /// the mmap recreated so the freed pages read back as zeroes.
    pub fn punch_holes(&self) -> Result<()> {
        // Lock order: file → mmap → regions → layout. Holding the write locks
        // blocks all readers/writers while holes are punched and remapped.
        let file = self.file.write();
        let mut mmap = self.mmap.write();
        let regions = self.regions.read();
        let layout = self.layout.read();

        // 1) Punch the unused reserved tail of every region, in parallel.
        let mut punched = regions
            .index_to_region()
            .par_iter()
            .flatten()
            .map(|region_lock| -> Result<usize> {
                let region = region_lock.read();
                let rstart = region.start();
                let len = region.len();
                let reserved = region.reserved();
                let ceil_len = Self::ceil_number_to_page_size_multiple(len);
                assert!(len <= ceil_len);
                if ceil_len > reserved {
                    // Invariant violation: a region's length (rounded up) must
                    // never exceed its reservation.
                    panic!()
                } else if ceil_len < reserved {
                    let start = rstart + ceil_len;
                    let hole = reserved - ceil_len;
                    if Self::approx_has_punchable_data(&mmap, start, hole) {
                        // info!(
                        //     "dbg: {:?}",
                        //     (region, rstart, len, ceil_len, reserved, start, hole)
                        // );
                        // info!("Punching a hole of {hole} bytes at {start}...");
                        Self::punch_hole(&file, start, hole)?;
                        return Ok(1);
                    }
                }
                Ok(0)
            })
            .sum::<Result<usize>>()?;

        // 2) Punch every hole the layout tracks between regions.
        punched += layout
            .start_to_hole()
            .par_iter()
            .map(|(&start, &hole)| -> Result<usize> {
                if Self::approx_has_punchable_data(&mmap, start, hole) {
                    // info!("dbg: {:?}", (start, hole));
                    // info!("Punching a hole of {hole} bytes at {start}...");
                    Self::punch_hole(&file, start, hole)?;
                    Ok(1)
                } else {
                    Ok(0)
                }
            })
            .sum::<Result<usize>>()?;

        if punched > 0 {
            // Make the punches durable before re-reading the file's contents.
            unsafe {
                libc::fsync(file.as_raw_fd());
            }
            // info!("Remaping post hole punching...");
            *mmap = Self::create_mmap(&file)?;
        }

        Ok(())
    }
529
530    fn approx_has_punchable_data(mmap: &MmapMut, start: u64, len: u64) -> bool {
531        assert!(start % PAGE_SIZE == 0);
532        assert!(len % PAGE_SIZE == 0);
533
534        let min = start as usize;
535        let max = (start + len) as usize;
536        let check = |start, end| {
537            assert!(start >= min);
538            assert!(end < max);
539            let start_is_some = mmap[start] != 0;
540            // if start_is_some {
541            // info!("mmap[start = {}] = {}", start, mmap[start])
542            // }
543            let end_is_some = mmap[end] != 0;
544            // if end_is_some {
545            // info!("mmap[end = {}] = {}", end, mmap[end])
546            // }
547            start_is_some || end_is_some
548        };
549
550        // Check first page (first and last byte)
551        let first_page_start = start as usize;
552        let first_page_end = (start + PAGE_SIZE - 1) as usize;
553        if check(first_page_start, first_page_end) {
554            return true;
555        }
556
557        // Check last page (first and last byte)
558        let last_page_start = (start + len - PAGE_SIZE) as usize;
559        let last_page_end = (start + len - 1) as usize;
560        if check(last_page_start, last_page_end) {
561            return true;
562        }
563
564        // For large lengths, check at 1GB intervals
565        if len > GB {
566            let num_gb_checks = (len / GB) as usize;
567            for i in 1..num_gb_checks {
568                let gb_boundary = start + (i as u64 * GB);
569                let page_start = gb_boundary as usize;
570                let page_end = (gb_boundary + PAGE_SIZE - 1) as usize;
571
572                if check(page_start, page_end) {
573                    return true;
574                }
575            }
576        }
577
578        false
579    }
580
    /// Deallocates `length` bytes of the file at offset `start` on macOS via
    /// `fcntl(F_PUNCHHOLE)`; the file size is unchanged, the range reads back
    /// as zeroes.
    ///
    /// # Errors
    /// The OS error if the fcntl call fails (e.g. unaligned range or a
    /// filesystem without hole support).
    #[cfg(target_os = "macos")]
    fn punch_hole(file: &File, start: u64, length: u64) -> Result<()> {
        let fpunchhole = FPunchhole {
            fp_flags: 0,
            reserved: 0,
            fp_offset: start as libc::off_t,
            fp_length: length as libc::off_t,
        };

        let result = unsafe {
            libc::fcntl(
                file.as_raw_fd(),
                libc::F_PUNCHHOLE,
                &fpunchhole as *const FPunchhole,
            )
        };

        if result == -1 {
            let err = std::io::Error::last_os_error();
            return Err(Error::String(format!("Failed to punch hole: {err}")));
        }

        Ok(())
    }
605
    /// Deallocates `length` bytes of the file at offset `start` on Linux via
    /// `fallocate(FALLOC_FL_PUNCH_HOLE)`; `KEEP_SIZE` leaves the file length
    /// unchanged and the range reads back as zeroes.
    ///
    /// # Errors
    /// The OS error if the fallocate call fails (e.g. a filesystem that does
    /// not support hole punching).
    #[cfg(target_os = "linux")]
    fn punch_hole(file: &File, start: u64, length: u64) -> Result<()> {
        let result = unsafe {
            libc::fallocate(
                file.as_raw_fd(),
                libc::FALLOC_FL_PUNCH_HOLE | libc::FALLOC_FL_KEEP_SIZE,
                start as libc::off_t,
                length as libc::off_t,
            )
        };

        if result == -1 {
            let err = std::io::Error::last_os_error();
            return Err(Error::String(format!("Failed to punch hole: {err}")));
        }

        Ok(())
    }
624
    /// Fallback for platforms without a hole-punching implementation here:
    /// always fails, so `punch_holes` surfaces the limitation instead of
    /// silently skipping.
    #[cfg(not(any(target_os = "macos", target_os = "linux")))]
    fn punch_hole(_file: &File, _start: u64, _length: u64) -> Result<()> {
        Err(Error::String(
            "Hole punching not supported on this platform".to_string(),
        ))
    }
631}
632
/// Argument struct for the macOS `fcntl(F_PUNCHHOLE)` call; `#[repr(C)]`
/// because the layout must match the kernel's `fpunchhole_t`.
// NOTE(review): only used by the macOS `punch_hole`; on other targets this
// struct (and the `off_t` import) is dead code — consider gating both with
// `#[cfg(target_os = "macos")]`.
#[repr(C)]
struct FPunchhole {
    fp_flags: u32,     // no flags defined; pass 0
    reserved: u32,     // must be zero
    fp_offset: off_t,  // byte offset of the hole
    fp_length: off_t,  // byte length of the hole
}