// reddb_server/storage/engine/bulk_writer.rs
//! Page-Based Bulk Writer — like PostgreSQL COPY FROM
//!
//! Writes rows directly into B-tree leaf pages, bypassing:
//! - UnifiedEntity object creation
//! - HashMap allocation
//! - B-tree traversal & splitting
//! - Bloom filter, memtable, cross-refs
//!
//! Each row is serialized as a cell and packed sequentially into pages.
//! Pages are linked as a doubly-linked list and written to the pager in batch.

use std::sync::Arc;

use super::page::{Page, PageType, CONTENT_SIZE, HEADER_SIZE, PAGE_SIZE};
use super::pager::Pager;
use crate::storage::schema::Value;
18/// Offset where leaf data starts (after header + prev/next leaf pointers)
19const LEAF_DATA_OFFSET: usize = HEADER_SIZE + 8; // 40 bytes
20
21/// Maximum usable space per leaf page for cell data
22const MAX_LEAF_DATA: usize = PAGE_SIZE - LEAF_DATA_OFFSET;
23
24/// Serialize a row's fields into a compact byte buffer.
25///
26/// Format: [num_fields: u8][field1_type: u8][field1_data]...[fieldN_type: u8][fieldN_data]
27/// Text: [type=1][len:u16][bytes]
28/// Int:  [type=2][i64 LE 8 bytes]
29/// Float:[type=3][f64 LE 8 bytes]
30/// Bool: [type=4][u8]
31/// Null: [type=0]
32#[inline]
33pub fn serialize_row(values: &[Value]) -> Vec<u8> {
34    let mut buf = Vec::with_capacity(64);
35    buf.push(values.len() as u8);
36
37    for val in values {
38        match val {
39            Value::Null => buf.push(0),
40            Value::Text(s) => {
41                buf.push(1);
42                let bytes = s.as_bytes();
43                buf.extend_from_slice(&(bytes.len() as u16).to_le_bytes());
44                buf.extend_from_slice(bytes);
45            }
46            Value::Integer(n) => {
47                buf.push(2);
48                buf.extend_from_slice(&n.to_le_bytes());
49            }
50            Value::Float(f) => {
51                buf.push(3);
52                buf.extend_from_slice(&f.to_le_bytes());
53            }
54            Value::Boolean(b) => {
55                buf.push(4);
56                buf.push(*b as u8);
57            }
58            Value::UnsignedInteger(n) => {
59                buf.push(5);
60                buf.extend_from_slice(&n.to_le_bytes());
61            }
62            // For all other types, serialize as text representation
63            other => {
64                buf.push(1);
65                let s = format!("{other:?}");
66                let bytes = s.as_bytes();
67                buf.extend_from_slice(&(bytes.len() as u16).to_le_bytes());
68                buf.extend_from_slice(bytes);
69            }
70        }
71    }
72    buf
73}
74
75/// Bulk writer that packs rows directly into B-tree leaf pages.
76pub struct PageBulkWriter {
77    pager: Arc<Pager>,
78    /// Completed pages ready to write
79    pages: Vec<(u32, Page)>,
80    /// Current page being filled
81    current: Option<Page>,
82    /// Current write offset within the page
83    offset: usize,
84    /// Cell count in current page
85    cell_count: u16,
86    /// Total rows written
87    total_rows: u64,
88    /// Auto-incrementing entity ID
89    next_id: u64,
90}
91
92impl PageBulkWriter {
93    pub fn new(pager: Arc<Pager>, start_id: u64) -> Self {
94        Self {
95            pager,
96            pages: Vec::new(),
97            current: None,
98            offset: LEAF_DATA_OFFSET,
99            cell_count: 0,
100            total_rows: 0,
101            next_id: start_id,
102        }
103    }
104
105    /// Write a single row (key = entity_id as u64 LE, value = serialized row).
106    #[inline]
107    pub fn write_row(&mut self, values: &[Value]) -> Result<u64, String> {
108        let id = self.next_id;
109        self.next_id += 1;
110
111        let key = id.to_le_bytes();
112        let value = serialize_row(values);
113
114        // Cell format: [key_len:u16][val_len:u16][key][value]
115        let cell_size = 4 + key.len() + value.len();
116
117        if cell_size > MAX_LEAF_DATA {
118            return Err("row too large for page".into());
119        }
120
121        // Check if current page has space
122        if self.current.is_none() || self.offset + cell_size > PAGE_SIZE {
123            self.seal_current_page()?;
124            self.allocate_new_page()?;
125        }
126
127        // Write cell directly into page bytes
128        let page = self.current.as_mut().unwrap();
129        let data = page.as_bytes_mut();
130
131        data[self.offset..self.offset + 2].copy_from_slice(&(key.len() as u16).to_le_bytes());
132        data[self.offset + 2..self.offset + 4].copy_from_slice(&(value.len() as u16).to_le_bytes());
133        data[self.offset + 4..self.offset + 4 + key.len()].copy_from_slice(&key);
134        data[self.offset + 4 + key.len()..self.offset + 4 + key.len() + value.len()]
135            .copy_from_slice(&value);
136
137        self.offset += cell_size;
138        self.cell_count += 1;
139        self.total_rows += 1;
140
141        Ok(id)
142    }
143
144    /// Write a row DIRECTLY into page buffer — zero intermediate Vec allocation.
145    /// Serializes values inline into the current page's byte array.
146    #[inline]
147    pub fn write_row_direct(&mut self, values: &[Value]) -> Result<u64, String> {
148        let id = self.next_id;
149        self.next_id += 1;
150
151        // Estimate cell size (overestimate is fine — we check bounds)
152        let estimated_size = 4 + 8 + 1 + values.len() * 12; // key_len+val_len+key+header+worst_case
153        if self.current.is_none() || self.offset + estimated_size > PAGE_SIZE {
154            self.seal_current_page()?;
155            self.allocate_new_page()?;
156        }
157
158        let page = self.current.as_mut().unwrap();
159        let data = page.as_bytes_mut();
160
161        // Reserve space for cell header [key_len:u16][val_len:u16]
162        let header_pos = self.offset;
163        let key_start = header_pos + 4;
164
165        // Write key (entity ID as u64 LE)
166        data[key_start..key_start + 8].copy_from_slice(&id.to_le_bytes());
167        let mut pos = key_start + 8;
168
169        // Write value count
170        data[pos] = values.len() as u8;
171        pos += 1;
172
173        // Serialize each value DIRECTLY into page buffer
174        for val in values {
175            if pos >= PAGE_SIZE - 16 {
176                // Not enough space — fall back to next page
177                // (This shouldn't happen with proper estimation, but safety first)
178                self.offset = header_pos; // rewind
179                self.seal_current_page()?;
180                self.allocate_new_page()?;
181                // Retry via the Vec-based path
182                return self.write_row(values);
183            }
184            match val {
185                Value::Null => {
186                    data[pos] = 0;
187                    pos += 1;
188                }
189                Value::Text(s) => {
190                    let b = s.as_bytes();
191                    data[pos] = 1;
192                    pos += 1;
193                    data[pos..pos + 2].copy_from_slice(&(b.len() as u16).to_le_bytes());
194                    pos += 2;
195                    if pos + b.len() < PAGE_SIZE {
196                        data[pos..pos + b.len()].copy_from_slice(b);
197                        pos += b.len();
198                    }
199                }
200                Value::Integer(n) => {
201                    data[pos] = 2;
202                    pos += 1;
203                    data[pos..pos + 8].copy_from_slice(&n.to_le_bytes());
204                    pos += 8;
205                }
206                Value::Float(f) => {
207                    data[pos] = 3;
208                    pos += 1;
209                    data[pos..pos + 8].copy_from_slice(&f.to_le_bytes());
210                    pos += 8;
211                }
212                Value::Boolean(b) => {
213                    data[pos] = 4;
214                    pos += 1;
215                    data[pos] = *b as u8;
216                    pos += 1;
217                }
218                Value::UnsignedInteger(n) => {
219                    data[pos] = 5;
220                    pos += 1;
221                    data[pos..pos + 8].copy_from_slice(&n.to_le_bytes());
222                    pos += 8;
223                }
224                _ => {
225                    data[pos] = 0;
226                    pos += 1; // null for unsupported types in direct mode
227                }
228            }
229        }
230
231        // Write cell header retroactively
232        let val_len = (pos - key_start - 8) as u16;
233        data[header_pos..header_pos + 2].copy_from_slice(&8u16.to_le_bytes()); // key_len = 8
234        data[header_pos + 2..header_pos + 4].copy_from_slice(&val_len.to_le_bytes());
235
236        self.offset = pos;
237        self.cell_count += 1;
238        self.total_rows += 1;
239
240        Ok(id)
241    }
242
243    /// Write many rows at once.
244    pub fn write_rows(&mut self, rows: &[Vec<Value>]) -> Result<Vec<u64>, String> {
245        let mut ids = Vec::with_capacity(rows.len());
246        for row in rows {
247            ids.push(self.write_row(row)?);
248        }
249        Ok(ids)
250    }
251
252    /// Finish writing — seal current page, link all pages, write to pager.
253    pub fn finish(mut self) -> Result<BulkWriteResult, String> {
254        self.seal_current_page()?;
255
256        if self.pages.is_empty() {
257            return Ok(BulkWriteResult {
258                total_rows: 0,
259                total_pages: 0,
260                first_page_id: 0,
261                first_entity_id: 0,
262            });
263        }
264
265        // Link leaf pages (doubly-linked list)
266        let page_ids: Vec<u32> = self.pages.iter().map(|(id, _)| *id).collect();
267        for i in 0..self.pages.len() {
268            let prev = if i > 0 { page_ids[i - 1] } else { 0 };
269            let next = if i + 1 < page_ids.len() {
270                page_ids[i + 1]
271            } else {
272                0
273            };
274            let data = self.pages[i].1.as_bytes_mut();
275            data[HEADER_SIZE..HEADER_SIZE + 4].copy_from_slice(&prev.to_le_bytes());
276            data[HEADER_SIZE + 4..HEADER_SIZE + 8].copy_from_slice(&next.to_le_bytes());
277        }
278
279        // Write all pages to pager — skip per-page checksum for speed
280        let first_page_id = page_ids[0];
281        let total_pages = self.pages.len();
282        for (page_id, page) in self.pages {
283            self.pager
284                .write_page_no_checksum(page_id, page)
285                .map_err(|e| format!("pager write: {e}"))?;
286        }
287
288        Ok(BulkWriteResult {
289            total_rows: self.total_rows,
290            total_pages: total_pages as u64,
291            first_page_id,
292            first_entity_id: self.next_id - self.total_rows,
293        })
294    }
295
296    fn seal_current_page(&mut self) -> Result<(), String> {
297        if let Some(mut page) = self.current.take() {
298            page.set_cell_count(self.cell_count);
299            let page_id = page.page_id();
300            self.pages.push((page_id, page));
301            self.cell_count = 0;
302            self.offset = LEAF_DATA_OFFSET;
303        }
304        Ok(())
305    }
306
307    fn allocate_new_page(&mut self) -> Result<(), String> {
308        let page = self
309            .pager
310            .allocate_page(PageType::BTreeLeaf)
311            .map_err(|e| format!("allocate page: {e}"))?;
312        self.current = Some(page);
313        self.offset = LEAF_DATA_OFFSET;
314        self.cell_count = 0;
315        Ok(())
316    }
317}
318
/// Result of a bulk write operation.
#[derive(Debug)]
pub struct BulkWriteResult {
    /// Number of rows written across all pages.
    pub total_rows: u64,
    /// Number of leaf pages produced.
    pub total_pages: u64,
    /// Page ID of the first leaf in the linked list (0 when no rows were written).
    pub first_page_id: u32,
    /// Entity ID assigned to the first row (0 when no rows were written).
    pub first_entity_id: u64,
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_serialize_row() {
        let row = vec![
            Value::text("Alice".to_string()),
            Value::Integer(30),
            Value::Float(95.5),
            Value::Boolean(true),
            Value::Null,
        ];
        let bytes = serialize_row(&row);
        assert_eq!(bytes[0], 5); // num_fields
        assert_eq!(bytes[1], 1); // first field tagged as text
        // Exact size per the format: 1 (count) + 8 (text) + 9 (int) + 9 (float)
        // + 2 (bool) + 1 (null) = 30 bytes.
        assert_eq!(bytes.len(), 30);
        assert!(bytes.len() < 64);
    }

    #[test]
    fn test_serialize_row_compact() {
        // A typical user row: name(text), email(text), age(int), city(text), score(float), ts(text)
        let row = vec![
            Value::text("User_123".to_string()),
            Value::text("user_123@test.com".to_string()),
            Value::Integer(35),
            Value::text("NYC".to_string()),
            Value::Float(95.5),
            Value::text("2024-01-01".to_string()),
        ];
        let bytes = serialize_row(&row);
        // Should be very compact: ~60 bytes for a typical row
        println!("Row size: {} bytes", bytes.len());
        assert!(
            bytes.len() < 100,
            "Row should be < 100 bytes, got {}",
            bytes.len()
        );
    }
}