1use std::mem;
2
3use crate::{
4 medium::{mapping, Bytes, BytesMut, Mapping},
5 storage::{Page, PageId},
6};
7
8pub(crate) struct Chunk<B> {
22 len: Mapping<B, Len>,
23 next: Mapping<B, PageId>,
24 body: B,
25}
26
27type Len = u32;
28
29const LEN_OVERFLOW_FLAG: Len = Len::MAX;
33
34unsafe impl<B> Page<B> for Chunk<B>
35where
36 B: Bytes,
37{
38 #[inline]
39 fn from_bytes(bytes: B) -> mapping::Result<Self> {
40 assert!(
41 bytes.len() < LEN_OVERFLOW_FLAG as usize,
42 "page size is too large"
43 );
44
45 let (len, remaining) = unsafe { Mapping::split(bytes)? };
46 let (next, body) = unsafe { Mapping::split(remaining)? };
47 Ok(Self { len, next, body })
48 }
49}
50
51impl<B> Chunk<B>
52where
53 B: Bytes,
54{
55 #[inline]
57 pub(crate) fn len(&self) -> u32 {
58 if self.is_overflow() {
59 self.body.len() as u32
60 } else {
61 *self.len
62 }
63 }
64
65 #[inline]
67 pub(crate) fn next(&self) -> Option<PageId> {
68 self.is_overflow().then(|| *self.next)
69 }
70
71 #[inline]
73 pub(crate) fn body(&self) -> mapping::Result<&[u8]> {
74 let range = ..self.len() as usize;
75 mapping::check_range(&range, &self.body)?;
76 Ok(&self.body[range])
77 }
78
79 #[inline]
81 fn is_overflow(&self) -> bool {
82 *self.len == LEN_OVERFLOW_FLAG
83 }
84}
85
86impl<B> Chunk<B>
87where
88 B: BytesMut,
89{
90 pub(crate) fn assign<'a>(&mut self, slice: &'a [u8]) -> Option<(&'a [u8], &mut PageId)> {
92 if slice.len() > self.body.len() {
93 *self.len = LEN_OVERFLOW_FLAG;
94
95 let (body, remaining) = slice.split_at(self.body.len());
96 self.body.copy_from_slice(body);
97
98 Some((remaining, &mut self.next))
99 } else {
100 *self.len = slice.len() as u32;
101
102 self.body[..slice.len()].copy_from_slice(slice);
103 None
104 }
105 }
106}
107
108impl Chunk<()> {
109 pub(crate) fn read<'a, F>(id: PageId, mut obtain: F) -> mapping::Result<Vec<u8>>
111 where
112 F: FnMut(PageId) -> mapping::Result<Chunk<&'a [u8]>>,
113 {
114 let (mut res, mut next_id) = (Vec::new(), Some(id));
115
116 while let Some(id) = next_id {
117 let chunk = obtain(id)?;
118 res.extend_from_slice(chunk.body()?);
119 next_id = chunk.next();
120 }
121
122 Ok(res)
123 }
124
125 pub(crate) fn write<'a, F>(mut slice: &[u8], mut alloc: F) -> mapping::Result<PageId>
127 where
128 F: FnMut() -> mapping::Result<(PageId, Chunk<&'a mut [u8]>)>,
129 {
130 let (id, mut chunk) = alloc()?;
131
132 while let Some((remaining, next_id)) = chunk.assign(slice) {
133 (*next_id, chunk) = alloc()?;
134 slice = remaining;
135 }
136
137 Ok(id)
138 }
139
140 pub(crate) fn delete<'a, O, D>(id: PageId, mut obtain: O, mut delete: D) -> mapping::Result<()>
142 where
143 O: FnMut(PageId) -> mapping::Result<Chunk<&'a [u8]>>,
144 D: FnMut(PageId) -> mapping::Result<()>,
145 {
146 let mut next_id = Some(id);
147
148 while let Some(id) = next_id {
149 let chunk = obtain(id)?;
150 next_id = chunk.next();
151 delete(id)?;
152 }
153
154 Ok(())
155 }
156
157 #[inline]
159 pub(crate) fn count(len: u32, page_size: u32) -> u32 {
160 let capacity = page_size - mem::size_of::<Len>() as u32 - mem::size_of::<PageId>() as u32;
161 (len - 1) / capacity + 1
162 }
163}
164
165#[cfg(test)]
166mod tests {
167 use std::collections::{HashMap, HashSet};
168
169 use super::Chunk;
170 use crate::{
171 medium::{mapping::Result, mempool::MemoryPool},
172 storage::{Page, PageId},
173 };
174
175 #[test]
176 fn test_chunk() -> Result<()> {
177 const PAGE_SIZE: usize = 20;
178
179 let mut id = PageId::from_raw(0);
180 let mut pages = HashMap::new();
181 let pool = MemoryPool::new(PAGE_SIZE, 0);
182
183 let bytes = b"ThetaDB is suitable for use on mobile clients with \"High-Read, Low-Write\" demands, it uses B+ Tree as the foundational layer for index management.";
184
185 let id = Chunk::write(bytes.as_ref(), || {
187 let id = id.incr();
188 let cell = pages.entry(id).or_insert(pool.obtain_cell());
189 let chunk = Chunk::from_bytes(unsafe { cell.as_mut_slice() })?;
190 Ok((id, chunk))
191 })?;
192
193 assert_eq!(
194 Chunk::count(bytes.len() as u32, PAGE_SIZE as u32),
195 pages.len() as u32
196 );
197
198 let res = Chunk::read(id, |id| {
200 let cell = pages.get(&id).unwrap();
201 Chunk::from_bytes(unsafe { cell.as_slice() })
202 })?;
203
204 assert_eq!(bytes, res.as_slice());
205
206 let mut page_ids = HashSet::new();
208
209 Chunk::delete(
210 id,
211 |id| {
212 let cell = pages.get(&id).unwrap();
213 Chunk::from_bytes(unsafe { cell.as_slice() })
214 },
215 |id| {
216 page_ids.insert(id);
217 Ok(())
218 },
219 )?;
220
221 assert_eq!(pages.into_keys().collect::<HashSet<_>>(), page_ids);
222
223 Ok(())
224 }
225}