thetadb/
chunk.rs

1use std::mem;
2
3use crate::{
4    medium::{mapping, Bytes, BytesMut, Mapping},
5    storage::{Page, PageId},
6};
7
/// Represents a chunk of a page chain in ThetaDB.
///
/// A page chain is a sequence of pages that are linked together to store
/// a large piece of data (e.g., Freelist, Overflow page) that cannot fit into a single page.
///
/// Each page of the chain is laid out as a length header, a next-page header and
/// the payload area, in that order.
///
/// # Chunk Page Chain
///
/// ```plain
///           ┌────────────────────────┐           ┌────────────────────────┐
/// ┌─────┬───┴──┬───────────────────┐ │ ┌─────┬───┴──┬───────────────────┐ │ ┌─────┐
/// │ Len │ Next │      Payload      │ └▶│ Len │ Next │      Payload      │ └▶│ ••• │
/// └─────┴──────┴───────────────────┘   └─────┴──────┴───────────────────┘   └─────┘
/// ```
pub(crate) struct Chunk<B> {
    /// Length header: the number of payload bytes stored in this page, or
    /// `LEN_OVERFLOW_FLAG` when the payload fills the page and continues in the next one.
    len: Mapping<B, Len>,
    /// Id of the next page in the chain; only reported to callers while the chunk
    /// is overflowed.
    next: Mapping<B, PageId>,
    /// Payload area: whatever remains of the page after the two headers.
    body: B,
}
26
/// Integer type of the length header stored at the front of every chunk page.
type Len = u32;

/// Indicates that a chunk has overflowed, i.e., the length of the data exceeds the capacity
/// of the chunk. When a chunk is overflowed, the `len` field is set to `LEN_OVERFLOW_FLAG`,
/// and the remaining data is stored in the next chunk in the chain.
const LEN_OVERFLOW_FLAG: Len = Len::MAX;
33
34unsafe impl<B> Page<B> for Chunk<B>
35where
36    B: Bytes,
37{
38    #[inline]
39    fn from_bytes(bytes: B) -> mapping::Result<Self> {
40        assert!(
41            bytes.len() < LEN_OVERFLOW_FLAG as usize,
42            "page size is too large"
43        );
44
45        let (len, remaining) = unsafe { Mapping::split(bytes)? };
46        let (next, body) = unsafe { Mapping::split(remaining)? };
47        Ok(Self { len, next, body })
48    }
49}
50
51impl<B> Chunk<B>
52where
53    B: Bytes,
54{
55    /// The length of the chunk.
56    #[inline]
57    pub(crate) fn len(&self) -> u32 {
58        if self.is_overflow() {
59            self.body.len() as u32
60        } else {
61            *self.len
62        }
63    }
64
65    /// The id of the next chunk page in the chain.
66    #[inline]
67    pub(crate) fn next(&self) -> Option<PageId> {
68        self.is_overflow().then(|| *self.next)
69    }
70
71    /// The data of the chunk.
72    #[inline]
73    pub(crate) fn body(&self) -> mapping::Result<&[u8]> {
74        let range = ..self.len() as usize;
75        mapping::check_range(&range, &self.body)?;
76        Ok(&self.body[range])
77    }
78
79    /// Checks if the chunk is overflowed.
80    #[inline]
81    fn is_overflow(&self) -> bool {
82        *self.len == LEN_OVERFLOW_FLAG
83    }
84}
85
86impl<B> Chunk<B>
87where
88    B: BytesMut,
89{
90    /// Assign a slice of bytes to the chunk.
91    pub(crate) fn assign<'a>(&mut self, slice: &'a [u8]) -> Option<(&'a [u8], &mut PageId)> {
92        if slice.len() > self.body.len() {
93            *self.len = LEN_OVERFLOW_FLAG;
94
95            let (body, remaining) = slice.split_at(self.body.len());
96            self.body.copy_from_slice(body);
97
98            Some((remaining, &mut self.next))
99        } else {
100            *self.len = slice.len() as u32;
101
102            self.body[..slice.len()].copy_from_slice(slice);
103            None
104        }
105    }
106}
107
108impl Chunk<()> {
109    /// Reads a page chain into a byte vector.
110    pub(crate) fn read<'a, F>(id: PageId, mut obtain: F) -> mapping::Result<Vec<u8>>
111    where
112        F: FnMut(PageId) -> mapping::Result<Chunk<&'a [u8]>>,
113    {
114        let (mut res, mut next_id) = (Vec::new(), Some(id));
115
116        while let Some(id) = next_id {
117            let chunk = obtain(id)?;
118            res.extend_from_slice(chunk.body()?);
119            next_id = chunk.next();
120        }
121
122        Ok(res)
123    }
124
125    /// Writes a byte slice into a page chain.
126    pub(crate) fn write<'a, F>(mut slice: &[u8], mut alloc: F) -> mapping::Result<PageId>
127    where
128        F: FnMut() -> mapping::Result<(PageId, Chunk<&'a mut [u8]>)>,
129    {
130        let (id, mut chunk) = alloc()?;
131
132        while let Some((remaining, next_id)) = chunk.assign(slice) {
133            (*next_id, chunk) = alloc()?;
134            slice = remaining;
135        }
136
137        Ok(id)
138    }
139
140    /// Deletes a page chain.
141    pub(crate) fn delete<'a, O, D>(id: PageId, mut obtain: O, mut delete: D) -> mapping::Result<()>
142    where
143        O: FnMut(PageId) -> mapping::Result<Chunk<&'a [u8]>>,
144        D: FnMut(PageId) -> mapping::Result<()>,
145    {
146        let mut next_id = Some(id);
147
148        while let Some(id) = next_id {
149            let chunk = obtain(id)?;
150            next_id = chunk.next();
151            delete(id)?;
152        }
153
154        Ok(())
155    }
156
157    /// Counts the number of chunks needed to store a given length of data.
158    #[inline]
159    pub(crate) fn count(len: u32, page_size: u32) -> u32 {
160        let capacity = page_size - mem::size_of::<Len>() as u32 - mem::size_of::<PageId>() as u32;
161        (len - 1) / capacity + 1
162    }
163}
164
#[cfg(test)]
mod tests {
    use std::collections::{HashMap, HashSet};

    use super::Chunk;
    use crate::{
        medium::{mapping::Result, mempool::MemoryPool},
        storage::{Page, PageId},
    };

    /// Round-trips a payload through a chain of tiny pages: write it, check the
    /// number of pages used, read it back, then delete the chain and check that
    /// every page was visited.
    #[test]
    fn test_chunk() -> Result<()> {
        const PAGE_SIZE: usize = 20;

        let mut cursor = PageId::from_raw(0);
        let mut pages = HashMap::new();
        let pool = MemoryPool::new(PAGE_SIZE, 0);

        let payload = b"ThetaDB is suitable for use on mobile clients with \"High-Read, Low-Write\" demands, it uses B+ Tree as the foundational layer for index management.";

        // Write: every allocation hands out a fresh id backed by a pool cell.
        let head = Chunk::write(payload.as_ref(), || {
            let page_id = cursor.incr();
            let cell = pages.entry(page_id).or_insert(pool.obtain_cell());
            let chunk = Chunk::from_bytes(unsafe { cell.as_mut_slice() })?;
            Ok((page_id, chunk))
        })?;

        let expected_pages = Chunk::count(payload.len() as u32, PAGE_SIZE as u32);
        assert_eq!(expected_pages, pages.len() as u32);

        // Read: the chain must reproduce the payload byte for byte.
        let restored = Chunk::read(head, |page_id| {
            let cell = pages.get(&page_id).unwrap();
            Chunk::from_bytes(unsafe { cell.as_slice() })
        })?;

        assert_eq!(payload, restored.as_slice());

        // Delete: record every id handed to the delete callback.
        let mut deleted = HashSet::new();

        Chunk::delete(
            head,
            |page_id| {
                let cell = pages.get(&page_id).unwrap();
                Chunk::from_bytes(unsafe { cell.as_slice() })
            },
            |page_id| {
                deleted.insert(page_id);
                Ok(())
            },
        )?;

        assert_eq!(pages.into_keys().collect::<HashSet<_>>(), deleted);

        Ok(())
    }
}
225}