bindb/storage/
dynamic.rs

1use std::{fs::File, marker::PhantomData, path::{Path, PathBuf}, pin::pin};
2use binbuf::{bytes_ptr, fixed::BufPartialEq, BytesPtr, Entry, Fixed as _};
3use super::OpenMode;
4
5pub use {entry_id::Value as EntryId, header::Value as Header};
6use memmap2::{Mmap, MmapMut, MmapOptions};
7
8pub mod entry_id;
9pub mod header;
10
11binbuf::fixed! {
12    #[derive(Clone)]
13    pub struct FreeLocation {
14        #[lens(buf_start)]
15        start: u64,
16        #[lens(buf_end)]
17        end: u64,
18    }
19
20    buf! { pub struct FreeLocationBuf<P>(FreeLocation, P); }
21    impl I for FreeLocation {
22        type Buf<P> = FreeLocationBuf<P>;
23    }
24    impl Code for FreeLocation {}
25}
26
27impl<P: BytesPtr> FreeLocationBuf<P> {
28    fn start(self) -> binbuf::Buf<u64, P> { FreeLocation::buf_start(self) }
29    fn end(self) -> binbuf::Buf<u64, P> { FreeLocation::buf_end(self) }
30}
31
32#[derive(Debug)]
33pub enum AddError {
34    Io(std::io::Error),
35    FixedSwapRemove(super::fixed::SwapRemoveError),
36}
37
38#[derive(Debug)]
39pub enum RemoveError {
40    Io(std::io::Error),
41    FixedSwapRemove(super::fixed::SwapRemoveError),
42    FixedAdd(super::fixed::AddError),
43}
44
45#[derive(Debug)]
46pub enum OpenError {
47    Io(std::io::Error),
48    FixedOpen(super::fixed::OpenError)
49}
50
51pub struct OpenFiles {
52    pub entries: File,
53    pub free_locations: File,
54}
55
56pub struct OpenMaxMargins {
57    pub entries: u64,
58    pub free_locations: u64,
59}
60
61
62pub struct OpenConfig {
63    pub mode: OpenMode,
64    pub files: OpenFiles,
65    pub max_margins: OpenMaxMargins,
66}
67
68pub struct Value<E> {
69    len: u64,
70    bytes_len: u64,
71    free_locations: super::Fixed<FreeLocation>,
72    entries_file: File,
73    entries_mmap: MmapMut,
74    margin: u64,
75    max_margin: u64,
76    _marker: PhantomData<fn() -> E>
77}
78
79impl<E: binbuf::Dynamic> Value<E> {
80    pub unsafe fn open(OpenConfig { mode, files, max_margins }: OpenConfig) -> Result<Self, OpenError> {
81        if let OpenMode::New = mode {
82            files.entries.set_len(Header::LEN as u64).map_err(OpenError::Io)?;
83        }
84        let mut entries_mmap = MmapMut::map_mut(&files.entries).map_err(OpenError::Io)?;
85        let header = match mode {
86            OpenMode::Existing => binbuf::fixed::decode::<Header, _>(
87                Header::buf(bytes_ptr::Const::new(entries_mmap[0 .. Header::LEN].as_ptr(), Header::LEN))
88            ),
89            OpenMode::New => unsafe {
90                binbuf::fixed::encode_ptr(
91                    bytes_ptr::Mut::from_slice(&mut entries_mmap[0 .. Header::LEN]),
92                    &Header { len: 0, bytes_len: 0 }
93                );
94                Header { len: 0, bytes_len: 0 }
95            }
96        };
97        Ok(Self {
98            len: header.len,
99            bytes_len: header.bytes_len,
100            free_locations: super::Fixed::open(mode, files.free_locations, max_margins.free_locations).map_err(OpenError::FixedOpen)?,
101            entries_file: files.entries,
102            entries_mmap,
103            margin: 0,
104            max_margin: max_margins.entries,
105            _marker: PhantomData
106        })
107    }
108
109    fn entry_offset(&self, id: EntryId) -> usize {
110        Header::LEN + id.0 as usize
111    }
112
113    fn header_buf(&self) -> binbuf::BufConst<Header> {
114        let ptr = unsafe { bytes_ptr::Const::from_slice(&self.entries_mmap[0 .. Header::LEN]) };
115        unsafe { Header::buf(ptr) }
116    }
117
118    fn header_buf_mut(&mut self) -> binbuf::BufMut<Header> {
119        let ptr = unsafe { bytes_ptr::Mut::from_slice(&mut self.entries_mmap[0 .. Header::LEN]) };
120        unsafe { Header::buf(ptr) }
121    }
122
123    fn set_bytes_len(&mut self, value: u64) {
124        self.bytes_len = value;
125        value.encode(Header::buf_bytes_len(self.header_buf_mut()));
126    }
127
128    // Doesn't check if id is valid. It's impossible to check that.
129    // Id may be pointing to garbage.
130    pub unsafe fn buf_unchecked(&self, id: EntryId) -> binbuf::BufConst<E> {
131        let ptr = bytes_ptr::Const::from_slice(
132            self.entries_mmap.get_unchecked(self.entry_offset(id) ..)
133        );
134        E::buf(ptr)
135    }
136
137    pub unsafe fn buf_mut_unchecked(&mut self, id: EntryId) -> binbuf::BufMut<E> {
138        let offset = self.entry_offset(id);
139        let ptr = bytes_ptr::Mut::from_slice(
140            self.entries_mmap.get_unchecked_mut(offset ..)
141        );
142        E::buf(ptr)
143    }
144
145    pub fn add(&mut self, entry: impl binbuf::dynamic::Readable<E>) -> Result<EntryId, AddError> {
146        let entry_len = entry.len();
147        let entry_len_u64 = entry_len as u64;
148        for loc_id in self.free_locations.all_ids() {
149            let (loc, _) = binbuf::dynamic::decode::<FreeLocation>(unsafe { self.free_locations.buf_unchecked(loc_id) });
150            let loc_len = loc.end - loc.start;
151            if loc_len >= entry_len_u64 {
152                let entry_id = EntryId(loc.start);
153                let buf = unsafe { self.buf_mut_unchecked(entry_id) };
154                let written_len = entry.write_to(buf);
155                debug_assert_eq!(written_len, entry_len);
156                let left_len = loc_len - entry_len_u64;
157                if left_len == 0 {
158                    unsafe { self.free_locations.swap_remove(loc_id).map_err(AddError::FixedSwapRemove) }?;
159                } else {
160                    self.free_locations.set(loc_id, &FreeLocation {
161                        start: loc.start + entry_len_u64,
162                        end: loc.end,
163                    });
164                }
165                return Ok(entry_id);
166            }
167        }
168
169        if self.margin < entry_len_u64 {
170            let margin_extra = self.margin + ((entry_len_u64 - self.margin) / self.max_margin + 1) * self.max_margin;
171            let new_len = self.entry_offset(
172                EntryId(self.bytes_len + margin_extra)
173            );
174            self.entries_file.set_len(new_len as u64).map_err(AddError::Io)?;
175            self.entries_mmap = unsafe { MmapOptions::new().len(new_len).map_mut(&self.entries_file).map_err(AddError::Io)? };
176
177            let entry_id = EntryId(self.bytes_len);
178            let written_len = entry.write_to(unsafe { self.buf_mut_unchecked(entry_id) });
179            debug_assert_eq!(written_len, entry_len);
180            self.set_bytes_len(self.bytes_len + entry_len_u64);
181            self.margin = margin_extra - entry_len_u64;
182            Ok(entry_id)
183
184        } else {
185            let entry_id = EntryId(self.bytes_len);
186            let written_len = entry.write_to(unsafe { self.buf_mut_unchecked(entry_id) });
187            debug_assert_eq!(written_len, entry_len);
188            self.set_bytes_len(self.bytes_len + entry_len_u64);
189            self.margin -= entry_len_u64;
190            Ok(entry_id)
191        }
192    }
193
194    pub fn free_locations_len(&self) -> u64 {
195        self.free_locations.len()
196    }
197
198    pub unsafe fn remove(&mut self, id: EntryId) -> Result<(), RemoveError> {
199        let entry_len = binbuf::dynamic::buf_len::<E>(self.buf_unchecked(id));
200        let entry_len_u64 = entry_len as u64;
201        let mut entry_loc_store = pin!([0; FreeLocation::LEN]);
202        let entry_loc = FreeLocation { start: id.0, end: id.0 + entry_len_u64 };
203
204        let entry_loc_buf = binbuf::entry::buf_mut_from_slice::<FreeLocation>(&mut *entry_loc_store);
205        entry_loc.encode(entry_loc_buf);
206
207        let is_last_entry = entry_loc.end == self.bytes_len;
208
209        let mut loc_expanded = (false, is_last_entry);
210        let mut loc_id = 0u64;
211        
212        loop {
213            if loc_id >= self.free_locations.len() {
214                if !loc_expanded.0 && !loc_expanded.1 {
215                }
216                break;
217            }
218            let loc_buf = self.free_locations.buf_unchecked(loc_id);
219
220            if !loc_expanded.0 && binbuf::fixed::decode::<u64, _>(loc_buf.end()).buf_eq(
221                binbuf::fixed::buf_to_const::<u64, _>(entry_loc_buf.start())
222            ) {
223                binbuf::fixed::buf_copy_to::<u64>(loc_buf.start(), entry_loc_buf.start());
224                self.free_locations.swap_remove(loc_id).map_err(RemoveError::FixedSwapRemove)?;
225                loc_expanded.0 = true;
226
227            } else if !loc_expanded.1 && binbuf::fixed::decode::<u64, _>(loc_buf.start()).buf_eq(
228                binbuf::fixed::buf_to_const::<u64, _>(entry_loc_buf.end())
229            ) {
230                binbuf::fixed::buf_copy_to::<u64>(loc_buf.end(), entry_loc_buf.end());
231                self.free_locations.swap_remove(loc_id).map_err(RemoveError::FixedSwapRemove)?;
232                loc_expanded.1 = true;
233
234            } else {
235                loc_id += 1;
236            }
237
238            if loc_expanded.0 && loc_expanded.1 {
239                break;
240            }
241        }
242
243        if is_last_entry {
244            let size_dec = binbuf::fixed::decode::<u64, _>(entry_loc_buf.end())
245                - binbuf::fixed::decode::<u64, _>(entry_loc_buf.start());
246            self.set_bytes_len(self.bytes_len - size_dec);
247            self.margin += size_dec;
248            if self.margin >= self.max_margin {
249                let new_len = self.entry_offset(EntryId(self.bytes_len + self.margin % self.max_margin));
250                self.entries_file.set_len(new_len as u64).map_err(RemoveError::Io)?;
251                self.entries_mmap = unsafe { MmapOptions::new().len(new_len).map_mut(&self.entries_file).map_err(RemoveError::Io)? };
252                self.margin = self.margin % self.max_margin;
253            }
254        } else {
255            self.free_locations.add(entry_loc_buf).map_err(RemoveError::FixedAdd)?;
256        }
257        drop(entry_loc_store);
258        Ok(())
259    }
260}
261
262impl<E: binbuf::dynamic::Decode> Value<E> {
263    // Make sure ID is valid!
264    pub unsafe fn get(&self, id: EntryId) -> E {
265        E::decode(self.buf_unchecked(id)).0
266    }
267}