1pub mod mapper;
3
4use std::{borrow::Cow, io::{BufWriter, Read, Seek, Write}, ops::Range};
5use crate::errors::Error;
6use mapper::Mapper;
7
/// A `(range, data)` pair: a `u64` range together with a byte buffer that may
/// be borrowed for `'l` or owned (see [`Cow`]).
///
/// NOTE(review): this has the same shape as the `(range, data)` entries that
/// `Layer::write_unchecked` stores, where the range is the address span covered
/// by the bytes — confirm that this alias is meant to name those entries.
pub type Section<'l> = (Range<u64>, Cow<'l, [u8]>);
9
10#[derive(Debug)]
12pub struct Layer<'l, Stream: Write + Read + Seek> {
13 pub bounds: Option<Range<u64>>,
15 mapper: Mapper<'l>,
17 pub size: u64,
19 pub read_cursor: (u64, usize),
21 stream: Stream,
23}
24
25#[inline]
27fn get_u64(buffer: &[u8], range: Range<usize>) -> Result<u64, Error> {
28 Ok(u64::from_be_bytes(
29 if let Some(Ok(x)) = buffer.get(range).map(|x| x.try_into())
30 { x }
31 else {
32 return Err(Error::DBCorrupt(Box::new(Error::InvalidLayer)));
33 }
34 ))
35}
36
/// Callback for [`Iterator::scan`] that stops an iterator of `Result`s at the
/// first error.
///
/// `Ok` values are passed through as `Some`. On the first `Err`, the error is
/// stored in the slot behind `err` and `None` is returned, which ends the scan;
/// the caller inspects the slot afterwards.
#[inline]
fn until_err<T, E>(err: &mut &mut Result<(), E>, item: Result<T, E>) -> Option<T> {
    let failure = match item {
        Ok(value) => return Some(value),
        Err(failure) => failure,
    };

    **err = Err(failure);
    None
}
48
49impl<'l, Stream: Write + Read + Seek> Layer<'l, Stream> {
50 #[inline]
51 pub fn new(stream: Stream) -> Self {
52 Self {
53 bounds: None,
54 mapper: Mapper::new(),
55 size: 0,
56 read_cursor: (0, 0),
57 stream,
58 }
59 }
60
61 #[inline]
62 pub fn load(mut stream: Stream) -> Result<Self, Error> {
63 let mut buffer = [0u8; (u64::BITS as usize/8) * 3]; match stream.read_exact(&mut buffer) {
65 Ok(_) => (),
66 Err(_) => return Err(Error::DBCorrupt(Box::new(Error::InvalidLayer))),
67 };
68
69
70 let size = get_u64(&buffer, 0..8)?;
72 let bounds = get_u64(&buffer, 8..16)?..get_u64(&buffer, 16..24)?;
73
74 Ok(Self {
75 bounds: Some(bounds),
76 mapper: Mapper::Disk,
77 size,
78 read_cursor: (0, 0),
79 stream,
80 })
81 }
82
83 #[inline]
85 pub fn check_collisions(&mut self, range: &Range<u64>) -> Result<Box<[Range<u64>]>, Error> {
86 match self.bounds.as_ref() {
88 Some(bounds) => if bounds.end < range.start || range.end < bounds.start { return Ok(Box::new([])) },
89 None => return Ok(Box::new([])),
90 }
91
92 let mut err = Ok(());
93 let out = self.mapper.iter(&mut self.stream, self.size, REWIND_IDX)?
94 .scan(&mut err, until_err) .filter(|(r, _)| range.start < r.end && r.start < range.end)
96 .map(|(r, _)| range.start.max(r.start)..std::cmp::min(range.end, r.end))
97 .collect();
98 err?;
99 Ok(out)
100 }
101
102 #[inline]
104 pub fn check_non_collisions(&self, range: &Range<u64>, collisions: &[Range<u64>]) -> Box<[Range<u64>]> { let mut output = Vec::new();
106 let mut last_end = range.start;
107
108 for r in collisions.iter() {
109 if r.start > last_end {
110 output.push(last_end..r.start);
111 } last_end = r.end;
112 }
113
114 if last_end != range.end {
115 output.push(last_end..range.end);
116 } output.into_boxed_slice()
117 }
118
119 #[inline]
123 pub fn read_unchecked(&mut self, addr: &Range<u64>) -> Result<(Range<usize>, Cow<[u8]>), Error> {
124 let mut err = Ok(());
125 let out = self.mapper.iter(&mut self.stream, self.size, REWIND_IDX)? .scan(&mut err, until_err) .find(|(r, _)| r.start <= addr.start && addr.end <= r.end) .map(|(r, x)| ((addr.start-r.start) as usize..(addr.end-r.start) as usize, x));
129 err?;
130 out
131 .map(Ok)
132 .unwrap_or(Err(Error::OutOfBounds))
133 }
134
135 #[inline]
139 pub fn write_unchecked(&mut self, idx: u64, data: Cow<'l, [u8]>) -> Result<(), Error> {
140 let (mapper, write_cursor) = self.mapper.get_writer()?;
142 let range = idx..idx+data.len() as u64;
143
144 let map_idx = if write_cursor.0 == idx {
146 write_cursor.1
147 } else {
148 mapper
149 .iter()
150 .enumerate()
151 .find(|(_, (r, _))| r.start > idx)
152 .map(|(i, _)| i)
153 .unwrap_or(0) };
155
156 mapper.insert(map_idx, (range.clone(), data));
158 *write_cursor = (range.end, map_idx+1);
159 self.size += range.end - range.start;
160
161 self.bounds = Some(match self.bounds {
163 Some(ref x) => std::cmp::min(x.start, range.start)..std::cmp::max(x.end, range.end),
164 None => range,
165 });
166
167 Ok(())
168 }
169
170 pub fn flush(&mut self) -> Result<(), Error> {
172 const BUFFER_SIZE: usize = 1024 * 1024 * 4; let (bounds, mapper) = if let (Some(b), Mapper::Heap { mapper, .. }) = (&self.bounds, &self.mapper) { (b, mapper) } else { return Ok(()) };
176 let mut file = BufWriter::with_capacity(BUFFER_SIZE, &mut self.stream);
177
178 file.rewind()?;
180
181 file.write_all(&self.size.to_be_bytes())?;
183 file.write_all(&bounds.start.to_be_bytes())?;
184 file.write_all(&bounds.end.to_be_bytes())?;
185
186 for (range, data) in mapper {
188 file.write_all(&range.start.to_be_bytes())?;
189 file.write_all(&range.end.to_be_bytes())?;
190 file.write_all(data)?;
191 }
192
193 file.flush()?;
195 self.mapper = Mapper::Disk;
196
197 Ok(())
198 }
199}
200
/// Byte length of the layer header: three `u64` words (`size`, `bounds.start`,
/// `bounds.end`), i.e. 24 bytes.
///
/// NOTE(review): passed to `Mapper::iter` by the read paths, presumably as the
/// stream offset at which the section records begin — confirm in `mapper`.
pub const REWIND_IDX: u64 = 3 * std::mem::size_of::<u64>() as u64;