stack_db/base/
database.rs

1//! The user-facing interface for interacting with multiple layers at once
2
3use std::{borrow::Cow, ops::Range};
4use crate::errors::Error;
5use self::allocator::Allocator;
6use super::layer::Layer;
7pub mod allocator;
8
9#[derive(Debug)]
10pub struct StackDB<'l, A: Allocator<'l>> {
11    /// The layer allocator for the database
12    alloc: A,
13    /// If there is a heap layer or not
14    heap_layer: bool,
15    /// The actual layers in the database
16    layers: Vec<Layer<'l, A::LayerStream>>,
17}
18
19impl<'l, A: Allocator<'l>> StackDB<'l, A> {
20    /// creates a database interface; either loads an existing db or creates a new one.
21    #[inline]
22    pub fn new(alloc: A) -> Result<Self, Error> {
23        Ok(Self {
24            heap_layer: false,
25            layers: alloc.load_layers()?,
26            alloc,
27        })
28    }
29
30    /// Either grabs the heap layer or creates a new one
31    #[inline]
32    fn get_heap_layer(&mut self) -> Result<&mut Layer<'l, A::LayerStream>, Error> {
33        if self.heap_layer {
34            return Ok(self.layers.last_mut().unwrap());
35        }
36
37        self.layers.push(self.alloc.add_layer()?);
38        self.heap_layer = true;
39        self.get_heap_layer()
40    }
41
42    /// Reads data from either the heap or disk layers
43    #[inline]
44    pub fn read(&mut self, addr: Range<u64>) -> Result<Box<[u8]>, Error> {
45        let mut data = vec![0u8; (addr.end-addr.start) as usize].into_boxed_slice();
46        let mut missing: Vec<Range<u64>> = vec![addr.clone()]; // data that hasn't been read yet
47
48        #[inline]
49        fn write_into(data: &[u8], out: &mut [u8]) {
50            data.iter()
51                .enumerate()
52                .for_each(|(i, b)| out[i] = *b);
53        }
54
55        for layer in self.layers.iter_mut().rev() {
56            if missing.is_empty() { break };
57            let mut collisions = Vec::new();
58            let mut non_collisions = Vec::new();
59
60            // find the parts of the range that belong to the layer's sections
61            for miss in missing.iter() {
62                let mut miss_collisions = layer.check_collisions(miss)?;
63                miss_collisions.sort_unstable_by_key(|r| r.start); // for later: fix the collisions function so that it's automatically sorted.
64
65                non_collisions.append(&mut layer.check_non_collisions(miss, &miss_collisions).into_vec());
66                collisions.append(&mut miss_collisions.into_vec());
67            } missing = non_collisions;
68
69            // actually read the values
70            for range in collisions.iter() {
71                let read = layer.read_unchecked(range)?;
72                write_into(&read.1[read.0], &mut data[(range.start-addr.start) as usize..(range.end-addr.start) as usize]);
73            }
74        }
75
76        if !missing.is_empty() { return Err(Error::OutOfBounds) } // note: otherwise it will just return 0s for the areas not covered by layers
77
78        Ok(data)
79    }
80
81    /// Rebases and drops overwritten layers (the database history)
82    /// by compressing all the layers into one to save space
83    ///
84    /// **Warning:** will temporarity double database size
85    #[inline]
86    pub fn rebase(&mut self, buffer_size: u64) -> Result<(), Error> {
87        if self.layers.is_empty() || self.layers.last().unwrap().bounds.is_none() { return Ok(()) }; // do nothing if database is empty
88        self.commit()?;
89        let old_layers = self.layers.len();
90
91        let db_bounds = self.layers.iter()
92            .filter_map(|x| x.bounds.as_ref())
93            .fold((u64::MAX, u64::MIN), |x, y| (std::cmp::min(x.0, y.start), std::cmp::max(x.1, y.end)));
94        let db_bounds = db_bounds.0..db_bounds.1;
95        
96        // Write all the changes into the top layer
97        let mut idx = db_bounds.start;
98        while idx < db_bounds.end {
99            let end = std::cmp::min(db_bounds.end, idx+buffer_size);
100            let buffer = self.read(idx..end)?;
101            self.write(idx, &buffer)?;
102            self.commit()?; // as to not bomb your memory
103            idx = end;
104        }
105
106        // Drop all the other layers
107        self.alloc.rebase(old_layers)?;
108        let mut layers = Vec::with_capacity(self.layers.len()-old_layers);
109        layers.extend(self.layers.drain(old_layers..));
110        self.layers = layers;
111
112        Ok(())
113    }
114
115    /// Writes data to the heap layer (collisions are fine) (`flush` to commit the heap layers to disk)
116    #[inline]
117    pub fn write(&mut self, addr: u64, data: &[u8]) -> Result<(), Error> {
118        let layer = self.get_heap_layer()?;
119        let range = addr..addr + data.len() as u64;
120
121        let r_normal = (range.start-addr)as usize..(range.end-addr)as usize;
122        let mut data = data[r_normal].to_vec();
123        data.shrink_to_fit();
124
125        layer.write_unchecked(range.start, Cow::Owned(data))?;
126
127        Ok(())
128    }
129
130    /// Commits / writes the read-write layer's (on the heap) writes to the database (on the disk); making it read-only
131    #[inline]
132    pub fn commit(&mut self) -> Result<(), Error> {
133        if !self.heap_layer { return Ok(()) };
134
135        let layer = self.layers.last_mut().unwrap();
136        // Don't flush if layer is empty
137        if layer.bounds.is_none() { return Ok(()) };
138        layer.flush()?;
139        self.heap_layer = false;
140
141        Ok(())
142    }
143}