reddb_server/storage/engine/
bulk_writer.rs1use std::sync::Arc;
13
14use super::page::{Page, PageType, CONTENT_SIZE, HEADER_SIZE, PAGE_SIZE};
15use super::pager::Pager;
16use crate::storage::schema::Value;
17
18const LEAF_DATA_OFFSET: usize = HEADER_SIZE + 8; const MAX_LEAF_DATA: usize = PAGE_SIZE - LEAF_DATA_OFFSET;
23
24#[inline]
33pub fn serialize_row(values: &[Value]) -> Vec<u8> {
34 let mut buf = Vec::with_capacity(64);
35 buf.push(values.len() as u8);
36
37 for val in values {
38 match val {
39 Value::Null => buf.push(0),
40 Value::Text(s) => {
41 buf.push(1);
42 let bytes = s.as_bytes();
43 buf.extend_from_slice(&(bytes.len() as u16).to_le_bytes());
44 buf.extend_from_slice(bytes);
45 }
46 Value::Integer(n) => {
47 buf.push(2);
48 buf.extend_from_slice(&n.to_le_bytes());
49 }
50 Value::Float(f) => {
51 buf.push(3);
52 buf.extend_from_slice(&f.to_le_bytes());
53 }
54 Value::Boolean(b) => {
55 buf.push(4);
56 buf.push(*b as u8);
57 }
58 Value::UnsignedInteger(n) => {
59 buf.push(5);
60 buf.extend_from_slice(&n.to_le_bytes());
61 }
62 other => {
64 buf.push(1);
65 let s = format!("{other:?}");
66 let bytes = s.as_bytes();
67 buf.extend_from_slice(&(bytes.len() as u16).to_le_bytes());
68 buf.extend_from_slice(bytes);
69 }
70 }
71 }
72 buf
73}
74
75pub struct PageBulkWriter {
77 pager: Arc<Pager>,
78 pages: Vec<(u32, Page)>,
80 current: Option<Page>,
82 offset: usize,
84 cell_count: u16,
86 total_rows: u64,
88 next_id: u64,
90}
91
92impl PageBulkWriter {
93 pub fn new(pager: Arc<Pager>, start_id: u64) -> Self {
94 Self {
95 pager,
96 pages: Vec::new(),
97 current: None,
98 offset: LEAF_DATA_OFFSET,
99 cell_count: 0,
100 total_rows: 0,
101 next_id: start_id,
102 }
103 }
104
105 #[inline]
107 pub fn write_row(&mut self, values: &[Value]) -> Result<u64, String> {
108 let id = self.next_id;
109 self.next_id += 1;
110
111 let key = id.to_le_bytes();
112 let value = serialize_row(values);
113
114 let cell_size = 4 + key.len() + value.len();
116
117 if cell_size > MAX_LEAF_DATA {
118 return Err("row too large for page".into());
119 }
120
121 if self.current.is_none() || self.offset + cell_size > PAGE_SIZE {
123 self.seal_current_page()?;
124 self.allocate_new_page()?;
125 }
126
127 let page = self.current.as_mut().unwrap();
129 let data = page.as_bytes_mut();
130
131 data[self.offset..self.offset + 2].copy_from_slice(&(key.len() as u16).to_le_bytes());
132 data[self.offset + 2..self.offset + 4].copy_from_slice(&(value.len() as u16).to_le_bytes());
133 data[self.offset + 4..self.offset + 4 + key.len()].copy_from_slice(&key);
134 data[self.offset + 4 + key.len()..self.offset + 4 + key.len() + value.len()]
135 .copy_from_slice(&value);
136
137 self.offset += cell_size;
138 self.cell_count += 1;
139 self.total_rows += 1;
140
141 Ok(id)
142 }
143
144 #[inline]
147 pub fn write_row_direct(&mut self, values: &[Value]) -> Result<u64, String> {
148 let id = self.next_id;
149 self.next_id += 1;
150
151 let estimated_size = 4 + 8 + 1 + values.len() * 12; if self.current.is_none() || self.offset + estimated_size > PAGE_SIZE {
154 self.seal_current_page()?;
155 self.allocate_new_page()?;
156 }
157
158 let page = self.current.as_mut().unwrap();
159 let data = page.as_bytes_mut();
160
161 let header_pos = self.offset;
163 let key_start = header_pos + 4;
164
165 data[key_start..key_start + 8].copy_from_slice(&id.to_le_bytes());
167 let mut pos = key_start + 8;
168
169 data[pos] = values.len() as u8;
171 pos += 1;
172
173 for val in values {
175 if pos >= PAGE_SIZE - 16 {
176 self.offset = header_pos; self.seal_current_page()?;
180 self.allocate_new_page()?;
181 return self.write_row(values);
183 }
184 match val {
185 Value::Null => {
186 data[pos] = 0;
187 pos += 1;
188 }
189 Value::Text(s) => {
190 let b = s.as_bytes();
191 data[pos] = 1;
192 pos += 1;
193 data[pos..pos + 2].copy_from_slice(&(b.len() as u16).to_le_bytes());
194 pos += 2;
195 if pos + b.len() < PAGE_SIZE {
196 data[pos..pos + b.len()].copy_from_slice(b);
197 pos += b.len();
198 }
199 }
200 Value::Integer(n) => {
201 data[pos] = 2;
202 pos += 1;
203 data[pos..pos + 8].copy_from_slice(&n.to_le_bytes());
204 pos += 8;
205 }
206 Value::Float(f) => {
207 data[pos] = 3;
208 pos += 1;
209 data[pos..pos + 8].copy_from_slice(&f.to_le_bytes());
210 pos += 8;
211 }
212 Value::Boolean(b) => {
213 data[pos] = 4;
214 pos += 1;
215 data[pos] = *b as u8;
216 pos += 1;
217 }
218 Value::UnsignedInteger(n) => {
219 data[pos] = 5;
220 pos += 1;
221 data[pos..pos + 8].copy_from_slice(&n.to_le_bytes());
222 pos += 8;
223 }
224 _ => {
225 data[pos] = 0;
226 pos += 1; }
228 }
229 }
230
231 let val_len = (pos - key_start - 8) as u16;
233 data[header_pos..header_pos + 2].copy_from_slice(&8u16.to_le_bytes()); data[header_pos + 2..header_pos + 4].copy_from_slice(&val_len.to_le_bytes());
235
236 self.offset = pos;
237 self.cell_count += 1;
238 self.total_rows += 1;
239
240 Ok(id)
241 }
242
243 pub fn write_rows(&mut self, rows: &[Vec<Value>]) -> Result<Vec<u64>, String> {
245 let mut ids = Vec::with_capacity(rows.len());
246 for row in rows {
247 ids.push(self.write_row(row)?);
248 }
249 Ok(ids)
250 }
251
252 pub fn finish(mut self) -> Result<BulkWriteResult, String> {
254 self.seal_current_page()?;
255
256 if self.pages.is_empty() {
257 return Ok(BulkWriteResult {
258 total_rows: 0,
259 total_pages: 0,
260 first_page_id: 0,
261 first_entity_id: 0,
262 });
263 }
264
265 let page_ids: Vec<u32> = self.pages.iter().map(|(id, _)| *id).collect();
267 for i in 0..self.pages.len() {
268 let prev = if i > 0 { page_ids[i - 1] } else { 0 };
269 let next = if i + 1 < page_ids.len() {
270 page_ids[i + 1]
271 } else {
272 0
273 };
274 let data = self.pages[i].1.as_bytes_mut();
275 data[HEADER_SIZE..HEADER_SIZE + 4].copy_from_slice(&prev.to_le_bytes());
276 data[HEADER_SIZE + 4..HEADER_SIZE + 8].copy_from_slice(&next.to_le_bytes());
277 }
278
279 let first_page_id = page_ids[0];
281 let total_pages = self.pages.len();
282 for (page_id, page) in self.pages {
283 self.pager
284 .write_page_no_checksum(page_id, page)
285 .map_err(|e| format!("pager write: {e}"))?;
286 }
287
288 Ok(BulkWriteResult {
289 total_rows: self.total_rows,
290 total_pages: total_pages as u64,
291 first_page_id,
292 first_entity_id: self.next_id - self.total_rows,
293 })
294 }
295
296 fn seal_current_page(&mut self) -> Result<(), String> {
297 if let Some(mut page) = self.current.take() {
298 page.set_cell_count(self.cell_count);
299 let page_id = page.page_id();
300 self.pages.push((page_id, page));
301 self.cell_count = 0;
302 self.offset = LEAF_DATA_OFFSET;
303 }
304 Ok(())
305 }
306
307 fn allocate_new_page(&mut self) -> Result<(), String> {
308 let page = self
309 .pager
310 .allocate_page(PageType::BTreeLeaf)
311 .map_err(|e| format!("allocate page: {e}"))?;
312 self.current = Some(page);
313 self.offset = LEAF_DATA_OFFSET;
314 self.cell_count = 0;
315 Ok(())
316 }
317}
318
/// Summary returned by `PageBulkWriter::finish`.
///
/// All fields are zero when no row was written.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BulkWriteResult {
    /// Number of rows written.
    pub total_rows: u64,
    /// Number of pages handed to the pager.
    pub total_pages: u64,
    /// Id of the first page in the chain.
    pub first_page_id: u32,
    /// Id of the first row written.
    pub first_entity_id: u64,
}
327
#[cfg(test)]
mod tests {
    use super::*;

    /// A mixed row starts with its column count, followed by the tag of the
    /// first column, and stays small.
    #[test]
    fn test_serialize_row() {
        let encoded = serialize_row(&[
            Value::text("Alice".to_string()),
            Value::Integer(30),
            Value::Float(95.5),
            Value::Boolean(true),
            Value::Null,
        ]);

        // Five columns.
        assert_eq!(encoded[0], 5);
        // First column is text (tag 1).
        assert_eq!(encoded[1], 1);
        assert!(encoded.len() < 64);
    }

    /// A typical six-column user row encodes to fewer than 100 bytes.
    #[test]
    fn test_serialize_row_compact() {
        let encoded = serialize_row(&[
            Value::text("User_123".to_string()),
            Value::text("user_123@test.com".to_string()),
            Value::Integer(35),
            Value::text("NYC".to_string()),
            Value::Float(95.5),
            Value::text("2024-01-01".to_string()),
        ]);
        let size = encoded.len();

        println!("Row size: {} bytes", size);
        assert!(size < 100, "Row should be < 100 bytes, got {}", size);
    }
}