stack_db/base/layer.rs

//! A layer/frame that gets *stacked* to form the database
pub mod mapper;

use std::{borrow::Cow, io::{BufWriter, Read, Seek, Write}, ops::Range};
use crate::errors::Error;
use mapper::Mapper;

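/// A single section of a layer: the absolute address range it covers and its bytes (borrowed or owned)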
pub type Section<'l> = (Range<u64>, Cow<'l, [u8]>);

/// Represents a layer (held either on the heap or on disk) that *stacks* to form the stack-db
#[derive(Debug)]
pub struct Layer<'l, Stream: Write + Read + Seek> {
    /// The bounds of the layer; the full range of addresses it covers
    pub bounds: Option<Range<u64>>,
    /// The mapper that maps to either the heap or the disk
    mapper: Mapper<'l>,
    /// The total size of all the writes in the layer
    pub size: u64,
    /// The current read cursor, used to speed up sequential reads
    pub read_cursor: (u64, usize),
    /// The underlying file reader/writer
    stream: Stream,
}

/// Reads a big-endian `u64` from the given byte range of a buffer
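/// (e.g. `get_u64(&buffer, 0..8)` decodes the first eight bytes),
/// returning a corruption error if `range` is out of bounds or not eight bytes long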
#[inline]
fn get_u64(buffer: &[u8], range: Range<usize>) -> Result<u64, Error> {
    match buffer.get(range).map(|bytes| bytes.try_into()) {
        Some(Ok(bytes)) => Ok(u64::from_be_bytes(bytes)),
        _ => Err(Error::DBCorrupt(Box::new(Error::InvalidLayer))),
    }
}

/// Used for error handling in iterator chains
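/// (passed to [`Iterator::scan`] so that the first `Err` halts iteration and is stashed in the provided slot)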
#[inline]
fn until_err<T, E>(err: &mut &mut Result<(), E>, item: Result<T, E>) -> Option<T> {
    match item {
        Ok(item) => Some(item),
        Err(e) => {
            **err = Err(e);
            None
        }
    }
}

impl<'l, Stream: Write + Read + Seek> Layer<'l, Stream> {
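    /// Creates a new, empty in-memory (heap) layer on top of `stream`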
    #[inline]
    pub fn new(stream: Stream) -> Self {
        Self {
            bounds: None,
            mapper: Mapper::new(),
            size: 0,
            read_cursor: (0, 0),
            stream,
        }
    }

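    /// Loads an existing (read-only) layer from the stream's current position by parsing
    /// its header: three big-endian `u64`s (`size`, `bounds.start`, `bounds.end`)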
    #[inline]
    pub fn load(mut stream: Stream) -> Result<Self, Error> {
        let mut buffer = [0u8; (u64::BITS as usize / 8) * 3]; // buffer for three `u64` values: `size`, `bounds.start`, `bounds.end`
        if stream.read_exact(&mut buffer).is_err() {
            return Err(Error::DBCorrupt(Box::new(Error::InvalidLayer)));
        }

        // read the metadata; return a corruption error on failure
        let size = get_u64(&buffer, 0..8)?;
        let bounds = get_u64(&buffer, 8..16)?..get_u64(&buffer, 16..24)?;

        Ok(Self {
            bounds: Some(bounds),
            mapper: Mapper::Disk,
            size,
            read_cursor: (0, 0),
            stream,
        })
    }

    /// Checks for collisions between `range` and the sections already written in this layer,
    /// returning the overlapping ranges clamped to `range`
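    /// (e.g. if the layer holds sections `0..4` and `10..14`, `check_collisions(&(2..12))` yields `[2..4, 10..12]`)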
    #[inline]
    pub fn check_collisions(&mut self, range: &Range<u64>) -> Result<Box<[Range<u64>]>, Error> {
        // if the range lies outside the layer's bounds (or the layer is empty) there is nothing to collide with
        match self.bounds.as_ref() {
            Some(bounds) => if bounds.end < range.start || range.end < bounds.start { return Ok(Box::new([])) },
            None => return Ok(Box::new([])),
        }

        let mut err = Ok(());
        let out = self.mapper.iter(&mut self.stream, self.size, REWIND_IDX)?
            .scan(&mut err, until_err) // handles the errors
            .filter(|(r, _)| range.start < r.end && r.start < range.end) // keep only the overlapping sections
            .map(|(r, _)| range.start.max(r.start)..range.end.min(r.end)) // clamp each overlap to `range`
            .collect();
        err?;
        Ok(out)
    }

    /// Takes the **ordered** output of `check_collisions` and returns its inverse: the sub-ranges of `range` *not* covered by any collision
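    /// (e.g. for `range = 0..10` and collisions `[2..4, 6..8]`, the returned gaps are `[0..2, 4..6, 8..10]`)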
    #[inline]
    pub fn check_non_collisions(&self, range: &Range<u64>, collisions: &[Range<u64>]) -> Box<[Range<u64>]> { // todo: find a better, purely functional solution
        let mut output = Vec::new();
        let mut last_end = range.start;

        // push the gap before each collision, then skip past it
        for r in collisions.iter() {
            if r.start > last_end {
                output.push(last_end..r.start);
            }
            last_end = r.end;
        }

        // push the trailing gap after the last collision
        if last_end != range.end {
            output.push(last_end..range.end);
        }

        output.into_boxed_slice()
    }

    /// Reads from the layer unchecked, returning the section's data and the requested range *relative to that section*.
    ///
    /// **Warning:** returns an out-of-bounds error if the read spans more than one section *(each read must lie within a single section of the layer)*
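    /// (e.g. if a section covers `10..20`, reading `12..16` yields the relative range `2..6` along with that section's data)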
    #[inline]
    pub fn read_unchecked(&mut self, addr: &Range<u64>) -> Result<(Range<usize>, Cow<[u8]>), Error> {
        let mut err = Ok(());
        let out = self.mapper.iter(&mut self.stream, self.size, REWIND_IDX)? // todo: use the read cursor so sequential reads don't iterate from the start
            .scan(&mut err, until_err) // handles errors
            .find(|(r, _)| r.start <= addr.start && addr.end <= r.end) // the read must lie entirely within a single section
            .map(|(r, x)| ((addr.start - r.start) as usize..(addr.end - r.start) as usize, x)); // convert to a section-relative range
        err?;
        out.ok_or(Error::OutOfBounds)
    }

    /// Writes to the heap layer without checking for collisions
    ///
    /// **WARNING:** the layer will be corrupted if any writes collide; this function is meant to be used internally
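    ///
    /// Writing `n` bytes at `idx` records the section `idx..idx + n`, grows `size` by `n`, widens `bounds`,
    /// and leaves the write cursor just past the new section so that a sequential follow-up write skips the search.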
    #[inline]
    pub fn write_unchecked(&mut self, idx: u64, data: Cow<'l, [u8]>) -> Result<(), Error> {
        // cannot write to a read-only (disk) layer
        let (mapper, write_cursor) = self.mapper.get_writer()?;
        let range = idx..idx + data.len() as u64;

        // get the idx in the map to insert at (the map is kept sorted by section start)
        let map_idx = if write_cursor.0 == idx {
            // sequential write: reuse the cached cursor position
            write_cursor.1
        } else {
            // insert after the last section that starts at or before `idx`
            mapper
                .iter()
                .enumerate()
                .take_while(|(_, (r, _))| r.start <= idx)
                .last()
                .map(|(i, _)| i + 1)
                .unwrap_or(0) // empty map, or every section starts after `idx`: insert at the front
        };

        // insert the data into the map and update the write cursor & size
        mapper.insert(map_idx, (range.clone(), data));
        *write_cursor = (range.end, map_idx + 1);
        self.size += range.end - range.start;

        // update the bounds
        self.bounds = Some(match self.bounds {
            Some(ref x) => std::cmp::min(x.start, range.start)..std::cmp::max(x.end, range.end),
            None => range,
        });

        Ok(())
    }

    /// Moves the layer from the **heap** to **disk**, serialising it onto the underlying stream.
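    ///
    /// On-disk layout: a header of three big-endian `u64`s (`size`, `bounds.start`, `bounds.end`),
    /// followed by each section in order as `range.start`, `range.end` (big-endian `u64`s) and then its raw bytes.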
    pub fn flush(&mut self) -> Result<(), Error> {
        const BUFFER_SIZE: usize = 1024 * 1024 * 4; // 4MiB buffer size

        // don't flush an empty layer or a read-only (already on-disk) layer
        let (bounds, mapper) = match (&self.bounds, &self.mapper) {
            (Some(bounds), Mapper::Heap { mapper, .. }) => (bounds, mapper),
            _ => return Ok(()),
        };
        let mut file = BufWriter::with_capacity(BUFFER_SIZE, &mut self.stream);

        // write from the start of the stream
        file.rewind()?;

        // write the header: the size & bounds of the layer
        file.write_all(&self.size.to_be_bytes())?;
        file.write_all(&bounds.start.to_be_bytes())?;
        file.write_all(&bounds.end.to_be_bytes())?;

        // write each section; the map is assumed to be sorted already
        for (range, data) in mapper {
            file.write_all(&range.start.to_be_bytes())?;
            file.write_all(&range.end.to_be_bytes())?;
            file.write_all(data)?;
        }

        // flush the file and switch to a (read-only) disk layer
        file.flush()?;
        self.mapper = Mapper::Disk;

        Ok(())
    }
}

/// The size in bytes of a layer's header; skips the three `u64`s: `layer_size`, `layer_bound.start` and `layer_bound.end`
pub const REWIND_IDX: u64 = 8 + 8 + 8;
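
// A minimal usage sketch (illustrative, not exhaustive): it assumes an in-memory
// `std::io::Cursor<Vec<u8>>` as the backing stream and that a freshly created layer
// lives on the heap until `flush` is called.
#[cfg(test)]
mod usage_sketch {
    use super::*;
    use std::borrow::Cow;
    use std::io::{Cursor, Seek};

    #[test]
    fn write_flush_load_roundtrip() {
        // a fresh, writable layer over an in-memory stream
        let mut layer = Layer::new(Cursor::new(Vec::new()));

        // write 4 bytes at address 4; the layer now covers `4..8`
        assert!(layer.write_unchecked(4, Cow::Borrowed(&b"data"[..])).is_ok());
        assert_eq!(layer.bounds, Some(4..8));
        assert_eq!(layer.size, 4);

        // serialise the heap layer onto the stream (it becomes read-only)
        assert!(layer.flush().is_ok());

        // re-open the serialised layer: the header read by `load` should match
        let mut stream = layer.stream;
        stream.rewind().unwrap();
        let loaded = match Layer::load(stream) {
            Ok(layer) => layer,
            Err(_) => panic!("the flushed header should parse"),
        };
        assert_eq!(loaded.size, 4);
        assert_eq!(loaded.bounds, Some(4..8));
    }
}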