Skip to main content

we_trust_sqlite/writer/
btree.rs

1use super::*;
2use crate::utils::{encode_record, encode_varint, parse_varint};
3use tokio::fs::OpenOptions;
4use tokio::io::{AsyncReadExt, AsyncWriteExt, SeekFrom};
5use yykv_types::{DsError, DsValue};
6
7type Result<T> = std::result::Result<T, DsError>;
8
9impl NativeWriter {
10    // --- Table B-Tree Write Logic ---
11
12    /// 初始化一个新的叶子表页面
13    pub async fn init_leaf_page(&self, page_id: u32) -> Result<()> {
14        self.ensure_initialized().await?;
15        let mut file = OpenOptions::new()
16            .write(true)
17            .open(&self.path)
18            .await
19            .map_err(|e| DsError::io(e.to_string()))?;
20        let page_size = self.get_page_size().await?;
21        let offset = (page_id as u64 - 1) * page_size as u64;
22
23        let mut page_data = vec![0u8; page_size as usize];
24
25        // Page Header
26        // Byte 0: Page Type (0x0D for Leaf Table)
27        page_data[0] = 0x0D;
28        // Bytes 1-2: First freeblock (0)
29        // Bytes 3-4: Number of cells (0)
30        // Bytes 5-6: Start of cell content area (page_size)
31        // Bytes 7: Fragmented free bytes (0)
32
33        let header_offset = if page_id == 1 { 100 } else { 0 };
34        page_data[header_offset] = 0x0D;
35
36        let content_start = page_size as u16;
37        page_data[header_offset + 5..header_offset + 7]
38            .copy_from_slice(&content_start.to_be_bytes());
39
40        file.seek(SeekFrom::Start(offset))
41            .await
42            .map_err(|e| DsError::io(e.to_string()))?;
43        file.write_all(&page_data)
44            .await
45            .map_err(|e| DsError::io(e.to_string()))?;
46
47        Ok(())
48    }
49
50    /// 在叶子表页面插入一行数据
51    pub async fn insert_into_leaf_page(
52        &self,
53        page_id: u32,
54        row_id: i64,
55        values: &[DsValue],
56        path: Vec<u32>,
57    ) -> Result<()> {
58        self.ensure_initialized().await?;
59        let mut file = OpenOptions::new()
60            .read(true)
61            .write(true)
62            .open(&self.path)
63            .await
64            .map_err(|e| DsError::io(format!("Failed to open database file: {}", e)))?;
65        let page_size = self.get_page_size().await?;
66        let offset = (page_id as u64 - 1) * (page_size as u64);
67
68        let mut page_data = vec![0u8; page_size as usize];
69        file.seek(SeekFrom::Start(offset))
70            .await
71            .map_err(|e| DsError::io(e.to_string()))?;
72        file.read_exact(&mut page_data)
73            .await
74            .map_err(|e| DsError::io(e.to_string()))?;
75
76        let record = encode_record(values);
77        let payload_size = record.len() as u64;
78
79        let (stored_len, has_overflow) =
80            self.get_stored_payload_info(0x0D, payload_size, page_size);
81        let mut cell_data = Vec::new();
82        cell_data.extend(encode_varint(payload_size));
83        cell_data.extend(encode_varint(row_id as u64));
84        cell_data.extend_from_slice(&record[..stored_len]);
85
86        if has_overflow {
87            let overflow_page_id = self.write_overflow_pages(&record[stored_len..]).await?;
88            cell_data.extend_from_slice(&overflow_page_id.to_be_bytes());
89        }
90
91        let header_offset = if page_id == 1 { 100 } else { 0 };
92        let cell_count =
93            u16::from_be_bytes([page_data[header_offset + 3], page_data[header_offset + 4]]);
94        let cell_content_offset =
95            u16::from_be_bytes([page_data[header_offset + 5], page_data[header_offset + 6]]);
96        let cell_content_offset = if cell_content_offset == 0 {
97            page_size as u16
98        } else {
99            cell_content_offset
100        };
101
102        let pointer_array_end = header_offset + 8 + (cell_count as usize * 2);
103        let free_space = cell_content_offset as usize - pointer_array_end;
104
105        if cell_data.len() + 2 > free_space {
106            return Box::pin(self.split_table_leaf_page(page_id, row_id, values, path)).await;
107        }
108
109        let new_cell_offset = cell_content_offset - cell_data.len() as u16;
110        page_data[new_cell_offset as usize..new_cell_offset as usize + cell_data.len()]
111            .copy_from_slice(&cell_data);
112
113        let mut pointer_array = Vec::new();
114        for i in 0..cell_count {
115            let off = header_offset + 8 + (i as usize * 2);
116            pointer_array.push(u16::from_be_bytes([page_data[off], page_data[off + 1]]));
117        }
118
119        let mut insert_pos = cell_count as usize;
120        let mut is_update = false;
121        for i in 0..cell_count {
122            let cell_offset = pointer_array[i as usize] as usize;
123            let (_p_size, consumed) = parse_varint(&page_data[cell_offset..]);
124            let (existing_row_id, _) = parse_varint(&page_data[cell_offset + consumed..]);
125            if (existing_row_id as i64) == row_id {
126                insert_pos = i as usize;
127                is_update = true;
128                break;
129            } else if (existing_row_id as i64) > row_id {
130                insert_pos = i as usize;
131                break;
132            }
133        }
134
135        if is_update {
136            // Simple update: just replace the pointer in the array.
137            // Note: This leaves the old cell as "fragmented" or "free" space.
138            // SQLite would normally add it to the freeblock list.
139            pointer_array[insert_pos] = new_cell_offset;
140        } else {
141            pointer_array.insert(insert_pos, new_cell_offset);
142        }
143
144        let final_cell_count = pointer_array.len() as u16;
145        page_data[header_offset + 3..header_offset + 5]
146            .copy_from_slice(&final_cell_count.to_be_bytes());
147        page_data[header_offset + 5..header_offset + 7]
148            .copy_from_slice(&new_cell_offset.to_be_bytes());
149
150        for (i, &ptr) in pointer_array.iter().enumerate() {
151            let off = header_offset + 8 + (i * 2);
152            page_data[off..off + 2].copy_from_slice(&ptr.to_be_bytes());
153        }
154
155        file.seek(SeekFrom::Start(offset))
156            .await
157            .map_err(|e| DsError::io(e.to_string()))?;
158        file.write_all(&page_data)
159            .await
160            .map_err(|e| DsError::io(e.to_string()))?;
161
162        Ok(())
163    }
164
165    /// 从叶子表页面删除一行数据
166    pub async fn delete_from_leaf_page(&self, page_id: u32, row_id: i64) -> Result<bool> {
167        self.ensure_initialized().await?;
168        let mut file = OpenOptions::new()
169            .read(true)
170            .write(true)
171            .open(&self.path)
172            .await
173            .map_err(|e| DsError::io(format!("Failed to open database file: {}", e)))?;
174        let page_size = self.get_page_size().await?;
175        let offset = (page_id as u64 - 1) * (page_size as u64);
176
177        let mut page_data = vec![0u8; page_size as usize];
178        file.seek(SeekFrom::Start(offset))
179            .await
180            .map_err(|e| DsError::io(e.to_string()))?;
181        file.read_exact(&mut page_data)
182            .await
183            .map_err(|e| DsError::io(e.to_string()))?;
184
185        let header_offset = if page_id == 1 { 100 } else { 0 };
186        let cell_count =
187            u16::from_be_bytes([page_data[header_offset + 3], page_data[header_offset + 4]]);
188
189        let mut pointer_array = Vec::new();
190        let mut target_idx = None;
191        for i in 0..cell_count {
192            let off = header_offset + 8 + (i as usize * 2);
193            let cell_ptr = u16::from_be_bytes([page_data[off], page_data[off + 1]]);
194
195            let (_p_size, consumed) = parse_varint(&page_data[cell_ptr as usize..]);
196            let (existing_row_id, _) = parse_varint(&page_data[cell_ptr as usize + consumed..]);
197
198            if (existing_row_id as i64) == row_id {
199                target_idx = Some(i as usize);
200            } else {
201                pointer_array.push(cell_ptr);
202            }
203        }
204
205        if let Some(_) = target_idx {
206            let new_cell_count = pointer_array.len() as u16;
207            page_data[header_offset + 3..header_offset + 5]
208                .copy_from_slice(&new_cell_count.to_be_bytes());
209
210            // Clear old pointer array
211            for i in 0..cell_count {
212                let off = header_offset + 8 + (i as usize * 2);
213                page_data[off..off + 2].copy_from_slice(&[0, 0]);
214            }
215
216            // Write new pointer array
217            for (i, &ptr) in pointer_array.iter().enumerate() {
218                let off = header_offset + 8 + (i * 2);
219                page_data[off..off + 2].copy_from_slice(&ptr.to_be_bytes());
220            }
221
222            file.seek(SeekFrom::Start(offset))
223                .await
224                .map_err(|e| DsError::io(e.to_string()))?;
225            file.write_all(&page_data)
226                .await
227                .map_err(|e| DsError::io(e.to_string()))?;
228            Ok(true)
229        } else {
230            Ok(false)
231        }
232    }
233
234    /// 拆分表叶子页面
235    async fn split_table_leaf_page(
236        &self,
237        page_id: u32,
238        row_id: i64,
239        values: &[DsValue],
240        mut path: Vec<u32>,
241    ) -> Result<()> {
242        let mut file = OpenOptions::new()
243            .read(true)
244            .write(true)
245            .open(&self.path)
246            .await
247            .map_err(|e| DsError::io(e.to_string()))?;
248        let page_size = self.get_page_size().await?;
249        let offset = (page_id as u64 - 1) * page_size as u64;
250
251        let mut old_page_data = vec![0u8; page_size as usize];
252        file.seek(SeekFrom::Start(offset))
253            .await
254            .map_err(|e| DsError::io(e.to_string()))?;
255        file.read_exact(&mut old_page_data)
256            .await
257            .map_err(|e| DsError::io(e.to_string()))?;
258
259        let header_offset = if page_id == 1 { 100 } else { 0 };
260        let cell_count = u16::from_be_bytes([
261            old_page_data[header_offset + 3],
262            old_page_data[header_offset + 4],
263        ]);
264
265        let mut all_cells = Vec::new();
266        for i in 0..cell_count {
267            let off = u16::from_be_bytes([
268                old_page_data[header_offset + 8 + i as usize * 2],
269                old_page_data[header_offset + 9 + i as usize * 2],
270            ]) as usize;
271            let (payload_size, consumed) = parse_varint(&old_page_data[off..]);
272            let (rid, consumed2) = parse_varint(&old_page_data[off + consumed..]);
273            let (stored_len, has_overflow) =
274                self.get_stored_payload_info(0x0D, payload_size, page_size);
275            let total_size = consumed + consumed2 + stored_len + if has_overflow { 4 } else { 0 };
276            all_cells.push((rid as i64, old_page_data[off..off + total_size].to_vec()));
277        }
278
279        let record = encode_record(values);
280        let payload_size = record.len() as u64;
281        let (stored_len, has_overflow) =
282            self.get_stored_payload_info(0x0D, payload_size, page_size);
283        let mut new_cell_data = Vec::new();
284        new_cell_data.extend(encode_varint(payload_size));
285        new_cell_data.extend(encode_varint(row_id as u64));
286        new_cell_data.extend_from_slice(&record[..stored_len]);
287        if has_overflow {
288            let overflow_page_id = self.write_overflow_pages(&record[stored_len..]).await?;
289            new_cell_data.extend_from_slice(&overflow_page_id.to_be_bytes());
290        }
291        all_cells.push((row_id, new_cell_data));
292        all_cells.sort_by_key(|c| c.0);
293
294        let new_page_id = self.allocate_page().await?;
295        let mut new_page_data = vec![0u8; page_size as usize];
296        new_page_data[0] = 0x0D;
297
298        let mid_idx = all_cells.len() / 2;
299        let mut updated_old_page_data = vec![0u8; page_size as usize];
300        updated_old_page_data[..header_offset].copy_from_slice(&old_page_data[..header_offset]);
301        updated_old_page_data[header_offset] = 0x0D;
302        let mut old_cell_content_offset = page_size as u16;
303        let mut old_pointer_array = Vec::new();
304        for i in 0..mid_idx {
305            let cell_data = &all_cells[i].1;
306            old_cell_content_offset -= cell_data.len() as u16;
307            updated_old_page_data[old_cell_content_offset as usize
308                ..old_cell_content_offset as usize + cell_data.len()]
309                .copy_from_slice(cell_data);
310            old_pointer_array.push(old_cell_content_offset);
311        }
312        updated_old_page_data[header_offset + 3..header_offset + 5]
313            .copy_from_slice(&(old_pointer_array.len() as u16).to_be_bytes());
314        updated_old_page_data[header_offset + 5..header_offset + 7]
315            .copy_from_slice(&old_cell_content_offset.to_be_bytes());
316        for (i, &ptr) in old_pointer_array.iter().enumerate() {
317            updated_old_page_data[header_offset + 8 + i * 2..header_offset + 10 + i * 2]
318                .copy_from_slice(&ptr.to_be_bytes());
319        }
320
321        let mut new_cell_content_offset = page_size as u16;
322        let mut new_pointer_array = Vec::new();
323        for i in mid_idx..all_cells.len() {
324            let cell_data = &all_cells[i].1;
325            new_cell_content_offset -= cell_data.len() as u16;
326            new_page_data[new_cell_content_offset as usize
327                ..new_cell_content_offset as usize + cell_data.len()]
328                .copy_from_slice(cell_data);
329            new_pointer_array.push(new_cell_content_offset);
330        }
331        new_page_data[3..5].copy_from_slice(&(new_pointer_array.len() as u16).to_be_bytes());
332        new_page_data[5..7].copy_from_slice(&new_cell_content_offset.to_be_bytes());
333        for (i, &ptr) in new_pointer_array.iter().enumerate() {
334            new_page_data[8 + i * 2..10 + i * 2].copy_from_slice(&ptr.to_be_bytes());
335        }
336
337        file.seek(SeekFrom::Start(offset))
338            .await
339            .map_err(|e| DsError::io(e.to_string()))?;
340        file.write_all(&updated_old_page_data)
341            .await
342            .map_err(|e| DsError::io(e.to_string()))?;
343
344        let new_page_offset = (new_page_id as u64 - 1) * page_size as u64;
345        file.seek(SeekFrom::Start(new_page_offset))
346            .await
347            .map_err(|e| DsError::io(e.to_string()))?;
348        file.write_all(&new_page_data)
349            .await
350            .map_err(|e| DsError::io(e.to_string()))?;
351
352        let separator_key = all_cells[mid_idx].0;
353        path.pop();
354        if let Some(parent_id) = path.last() {
355            Box::pin(self.insert_into_table_interior_page(
356                *parent_id,
357                page_id,
358                separator_key,
359                path,
360            ))
361            .await?;
362        } else {
363            self.create_new_table_root(page_id, new_page_id, separator_key)
364                .await?;
365        }
366        Ok(())
367    }
368
369    /// 插入到表内部页面
370    async fn insert_into_table_interior_page(
371        &self,
372        page_id: u32,
373        left_child: u32,
374        key: i64,
375        path: Vec<u32>,
376    ) -> Result<()> {
377        let mut file = OpenOptions::new()
378            .read(true)
379            .write(true)
380            .open(&self.path)
381            .await
382            .map_err(|e| DsError::io(e.to_string()))?;
383        let page_size = self.get_page_size().await?;
384        let offset = (page_id as u64 - 1) * page_size as u64;
385
386        let mut page_data = vec![0u8; page_size as usize];
387        file.seek(SeekFrom::Start(offset))
388            .await
389            .map_err(|e| DsError::io(e.to_string()))?;
390        file.read_exact(&mut page_data)
391            .await
392            .map_err(|e| DsError::io(e.to_string()))?;
393
394        let header_offset = if page_id == 1 { 100 } else { 0 };
395        let cell_count =
396            u16::from_be_bytes([page_data[header_offset + 3], page_data[header_offset + 4]]);
397        let cell_content_offset =
398            u16::from_be_bytes([page_data[header_offset + 5], page_data[header_offset + 6]]);
399        let cell_content_offset = if cell_content_offset == 0 {
400            page_size as u16
401        } else {
402            cell_content_offset
403        };
404
405        let mut cell_data = Vec::new();
406        cell_data.extend_from_slice(&left_child.to_be_bytes());
407        cell_data.extend(encode_varint(key as u64));
408
409        let pointer_array_end = header_offset + 12 + (cell_count as usize * 2);
410        let free_space = cell_content_offset as usize - pointer_array_end;
411
412        if cell_data.len() + 2 > free_space {
413            return Box::pin(self.split_table_interior_page(page_id, left_child, key, path)).await;
414        }
415
416        let new_cell_offset = cell_content_offset - cell_data.len() as u16;
417        page_data[new_cell_offset as usize..new_cell_offset as usize + cell_data.len()]
418            .copy_from_slice(&cell_data);
419
420        let mut pointer_array = Vec::new();
421        for i in 0..cell_count {
422            let off = header_offset + 12 + (i as usize * 2);
423            pointer_array.push(u16::from_be_bytes([page_data[off], page_data[off + 1]]));
424        }
425
426        let mut insert_pos = cell_count as usize;
427        for i in 0..cell_count {
428            let off = pointer_array[i as usize] as usize;
429            let (existing_key, _) = parse_varint(&page_data[off + 4..]);
430            if existing_key as i64 >= key {
431                insert_pos = i as usize;
432                break;
433            }
434        }
435        pointer_array.insert(insert_pos, new_cell_offset);
436
437        for (i, &ptr) in pointer_array.iter().enumerate() {
438            let off = header_offset + 12 + (i * 2);
439            page_data[off..off + 2].copy_from_slice(&ptr.to_be_bytes());
440        }
441
442        let new_cell_count = cell_count + 1;
443        page_data[header_offset + 3..header_offset + 5]
444            .copy_from_slice(&new_cell_count.to_be_bytes());
445        page_data[header_offset + 5..header_offset + 7]
446            .copy_from_slice(&new_cell_offset.to_be_bytes());
447
448        file.seek(SeekFrom::Start(offset))
449            .await
450            .map_err(|e| DsError::io(e.to_string()))?;
451        file.write_all(&page_data)
452            .await
453            .map_err(|e| DsError::io(e.to_string()))?;
454
455        Ok(())
456    }
457
458    /// 表内部页面拆分
459    async fn split_table_interior_page(
460        &self,
461        page_id: u32,
462        new_left_child: u32,
463        new_key: i64,
464        mut path: Vec<u32>,
465    ) -> Result<()> {
466        let mut file = OpenOptions::new()
467            .read(true)
468            .write(true)
469            .open(&self.path)
470            .await
471            .map_err(|e| DsError::io(e.to_string()))?;
472        let page_size = self.get_page_size().await?;
473        let offset = (page_id as u64 - 1) * page_size as u64;
474
475        let mut old_page_data = vec![0u8; page_size as usize];
476        file.seek(SeekFrom::Start(offset))
477            .await
478            .map_err(|e| DsError::io(e.to_string()))?;
479        file.read_exact(&mut old_page_data)
480            .await
481            .map_err(|e| DsError::io(e.to_string()))?;
482
483        let header_offset = if page_id == 1 { 100 } else { 0 };
484        let cell_count = u16::from_be_bytes([
485            old_page_data[header_offset + 3],
486            old_page_data[header_offset + 4],
487        ]);
488        let right_most_ptr = u32::from_be_bytes([
489            old_page_data[header_offset + 8],
490            old_page_data[header_offset + 9],
491            old_page_data[header_offset + 10],
492            old_page_data[header_offset + 11],
493        ]);
494
495        let mut all_cells = Vec::new();
496        for i in 0..cell_count {
497            let off = u16::from_be_bytes([
498                old_page_data[header_offset + 12 + i as usize * 2],
499                old_page_data[header_offset + 13 + i as usize * 2],
500            ]) as usize;
501            let (k, consumed) = parse_varint(&old_page_data[off + 4..]);
502            all_cells.push((k as i64, old_page_data[off..off + 4 + consumed].to_vec()));
503        }
504
505        let mut new_cell_data = Vec::new();
506        new_cell_data.extend_from_slice(&new_left_child.to_be_bytes());
507        new_cell_data.extend(encode_varint(new_key as u64));
508        all_cells.push((new_key, new_cell_data));
509        all_cells.sort_by_key(|c| c.0);
510
511        let mid_idx = all_cells.len() / 2;
512        let separator_key = all_cells[mid_idx].0;
513        let new_right_ptr = u32::from_be_bytes([
514            all_cells[mid_idx].1[0],
515            all_cells[mid_idx].1[1],
516            all_cells[mid_idx].1[2],
517            all_cells[mid_idx].1[3],
518        ]);
519
520        let mut updated_old_page_data = vec![0u8; page_size as usize];
521        updated_old_page_data[..header_offset].copy_from_slice(&old_page_data[..header_offset]);
522        updated_old_page_data[header_offset] = 0x05;
523        updated_old_page_data[header_offset + 8..header_offset + 12]
524            .copy_from_slice(&new_right_ptr.to_be_bytes());
525        let mut old_cell_content_offset = page_size as u16;
526        let mut old_pointer_array = Vec::new();
527        for i in 0..mid_idx {
528            let cell_data = &all_cells[i].1;
529            old_cell_content_offset -= cell_data.len() as u16;
530            updated_old_page_data[old_cell_content_offset as usize
531                ..old_cell_content_offset as usize + cell_data.len()]
532                .copy_from_slice(cell_data);
533            old_pointer_array.push(old_cell_content_offset);
534        }
535        updated_old_page_data[header_offset + 3..header_offset + 5]
536            .copy_from_slice(&(old_pointer_array.len() as u16).to_be_bytes());
537        updated_old_page_data[header_offset + 5..header_offset + 7]
538            .copy_from_slice(&old_cell_content_offset.to_be_bytes());
539        for (i, &ptr) in old_pointer_array.iter().enumerate() {
540            updated_old_page_data[header_offset + 12 + i * 2..header_offset + 14 + i * 2]
541                .copy_from_slice(&ptr.to_be_bytes());
542        }
543
544        let new_page_id = self.allocate_page().await?;
545        let mut new_page_data = vec![0u8; page_size as usize];
546        new_page_data[0] = 0x05;
547        new_page_data[8..12].copy_from_slice(&right_most_ptr.to_be_bytes());
548        let mut new_cell_content_offset = page_size as u16;
549        let mut new_pointer_array = Vec::new();
550        for i in mid_idx + 1..all_cells.len() {
551            let cell_data = &all_cells[i].1;
552            new_cell_content_offset -= cell_data.len() as u16;
553            new_page_data[new_cell_content_offset as usize
554                ..new_cell_content_offset as usize + cell_data.len()]
555                .copy_from_slice(cell_data);
556            new_pointer_array.push(new_cell_content_offset);
557        }
558        new_page_data[3..5].copy_from_slice(&(new_pointer_array.len() as u16).to_be_bytes());
559        new_page_data[5..7].copy_from_slice(&new_cell_content_offset.to_be_bytes());
560        for (i, &ptr) in new_pointer_array.iter().enumerate() {
561            new_page_data[12 + i * 2..14 + i * 2].copy_from_slice(&ptr.to_be_bytes());
562        }
563
564        file.seek(SeekFrom::Start(offset))
565            .await
566            .map_err(|e| DsError::io(e.to_string()))?;
567        file.write_all(&updated_old_page_data)
568            .await
569            .map_err(|e| DsError::io(e.to_string()))?;
570
571        let new_page_offset = (new_page_id as u64 - 1) * page_size as u64;
572        file.seek(SeekFrom::Start(new_page_offset))
573            .await
574            .map_err(|e| DsError::io(e.to_string()))?;
575        file.write_all(&new_page_data)
576            .await
577            .map_err(|e| DsError::io(e.to_string()))?;
578
579        path.pop();
580        if let Some(parent_id) = path.last() {
581            Box::pin(self.insert_into_table_interior_page(
582                *parent_id,
583                page_id,
584                separator_key,
585                path,
586            ))
587            .await?;
588        } else {
589            self.create_new_table_root(page_id, new_page_id, separator_key)
590                .await?;
591        }
592        Ok(())
593    }
594
595    /// 创建新的表根页面
596    async fn create_new_table_root(
597        &self,
598        left_child: u32,
599        right_child: u32,
600        key: i64,
601    ) -> Result<()> {
602        let mut file = OpenOptions::new()
603            .read(true)
604            .write(true)
605            .open(&self.path)
606            .await
607            .map_err(|e| DsError::io(e.to_string()))?;
608        let page_size = self.get_page_size().await?;
609
610        let mut root_page_data = vec![0u8; page_size as usize];
611        file.seek(SeekFrom::Start(0))
612            .await
613            .map_err(|e| DsError::io(e.to_string()))?;
614        file.read_exact(&mut root_page_data)
615            .await
616            .map_err(|e| DsError::io(e.to_string()))?;
617
618        // Root becomes an interior table page (0x05)
619        root_page_data[100] = 0x05;
620        root_page_data[101] = 0; // freeblock
621        root_page_data[102] = 0;
622        root_page_data[103] = 0; // cell count
623        root_page_data[104] = 1;
624
625        let mut cell_data = Vec::new();
626        cell_data.extend_from_slice(&left_child.to_be_bytes());
627        cell_data.extend(encode_varint(key as u64));
628
629        let cell_offset = page_size as u16 - cell_data.len() as u16;
630        root_page_data[cell_offset as usize..cell_offset as usize + cell_data.len()]
631            .copy_from_slice(&cell_data);
632        root_page_data[105..107].copy_from_slice(&cell_offset.to_be_bytes());
633        root_page_data[108..112].copy_from_slice(&right_child.to_be_bytes());
634        root_page_data[112..114].copy_from_slice(&cell_offset.to_be_bytes());
635
636        file.seek(SeekFrom::Start(0))
637            .await
638            .map_err(|e| DsError::io(e.to_string()))?;
639        file.write_all(&root_page_data)
640            .await
641            .map_err(|e| DsError::io(e.to_string()))?;
642        Ok(())
643    }
644}