1use super::*;
2use crate::utils::{encode_record, encode_varint, parse_varint};
3use tokio::fs::OpenOptions;
4use tokio::io::{AsyncReadExt, AsyncWriteExt, SeekFrom};
5use yykv_types::{DsError, DsValue};
6
7type Result<T> = std::result::Result<T, DsError>;
8
9impl NativeWriter {
10 pub async fn init_leaf_page(&self, page_id: u32) -> Result<()> {
14 self.ensure_initialized().await?;
15 let mut file = OpenOptions::new()
16 .write(true)
17 .open(&self.path)
18 .await
19 .map_err(|e| DsError::io(e.to_string()))?;
20 let page_size = self.get_page_size().await?;
21 let offset = (page_id as u64 - 1) * page_size as u64;
22
23 let mut page_data = vec![0u8; page_size as usize];
24
25 page_data[0] = 0x0D;
28 let header_offset = if page_id == 1 { 100 } else { 0 };
34 page_data[header_offset] = 0x0D;
35
36 let content_start = page_size as u16;
37 page_data[header_offset + 5..header_offset + 7]
38 .copy_from_slice(&content_start.to_be_bytes());
39
40 file.seek(SeekFrom::Start(offset))
41 .await
42 .map_err(|e| DsError::io(e.to_string()))?;
43 file.write_all(&page_data)
44 .await
45 .map_err(|e| DsError::io(e.to_string()))?;
46
47 Ok(())
48 }
49
50 pub async fn insert_into_leaf_page(
52 &self,
53 page_id: u32,
54 row_id: i64,
55 values: &[DsValue],
56 path: Vec<u32>,
57 ) -> Result<()> {
58 self.ensure_initialized().await?;
59 let mut file = OpenOptions::new()
60 .read(true)
61 .write(true)
62 .open(&self.path)
63 .await
64 .map_err(|e| DsError::io(format!("Failed to open database file: {}", e)))?;
65 let page_size = self.get_page_size().await?;
66 let offset = (page_id as u64 - 1) * (page_size as u64);
67
68 let mut page_data = vec![0u8; page_size as usize];
69 file.seek(SeekFrom::Start(offset))
70 .await
71 .map_err(|e| DsError::io(e.to_string()))?;
72 file.read_exact(&mut page_data)
73 .await
74 .map_err(|e| DsError::io(e.to_string()))?;
75
76 let record = encode_record(values);
77 let payload_size = record.len() as u64;
78
79 let (stored_len, has_overflow) =
80 self.get_stored_payload_info(0x0D, payload_size, page_size);
81 let mut cell_data = Vec::new();
82 cell_data.extend(encode_varint(payload_size));
83 cell_data.extend(encode_varint(row_id as u64));
84 cell_data.extend_from_slice(&record[..stored_len]);
85
86 if has_overflow {
87 let overflow_page_id = self.write_overflow_pages(&record[stored_len..]).await?;
88 cell_data.extend_from_slice(&overflow_page_id.to_be_bytes());
89 }
90
91 let header_offset = if page_id == 1 { 100 } else { 0 };
92 let cell_count =
93 u16::from_be_bytes([page_data[header_offset + 3], page_data[header_offset + 4]]);
94 let cell_content_offset =
95 u16::from_be_bytes([page_data[header_offset + 5], page_data[header_offset + 6]]);
96 let cell_content_offset = if cell_content_offset == 0 {
97 page_size as u16
98 } else {
99 cell_content_offset
100 };
101
102 let pointer_array_end = header_offset + 8 + (cell_count as usize * 2);
103 let free_space = cell_content_offset as usize - pointer_array_end;
104
105 if cell_data.len() + 2 > free_space {
106 return Box::pin(self.split_table_leaf_page(page_id, row_id, values, path)).await;
107 }
108
109 let new_cell_offset = cell_content_offset - cell_data.len() as u16;
110 page_data[new_cell_offset as usize..new_cell_offset as usize + cell_data.len()]
111 .copy_from_slice(&cell_data);
112
113 let mut pointer_array = Vec::new();
114 for i in 0..cell_count {
115 let off = header_offset + 8 + (i as usize * 2);
116 pointer_array.push(u16::from_be_bytes([page_data[off], page_data[off + 1]]));
117 }
118
119 let mut insert_pos = cell_count as usize;
120 let mut is_update = false;
121 for i in 0..cell_count {
122 let cell_offset = pointer_array[i as usize] as usize;
123 let (_p_size, consumed) = parse_varint(&page_data[cell_offset..]);
124 let (existing_row_id, _) = parse_varint(&page_data[cell_offset + consumed..]);
125 if (existing_row_id as i64) == row_id {
126 insert_pos = i as usize;
127 is_update = true;
128 break;
129 } else if (existing_row_id as i64) > row_id {
130 insert_pos = i as usize;
131 break;
132 }
133 }
134
135 if is_update {
136 pointer_array[insert_pos] = new_cell_offset;
140 } else {
141 pointer_array.insert(insert_pos, new_cell_offset);
142 }
143
144 let final_cell_count = pointer_array.len() as u16;
145 page_data[header_offset + 3..header_offset + 5]
146 .copy_from_slice(&final_cell_count.to_be_bytes());
147 page_data[header_offset + 5..header_offset + 7]
148 .copy_from_slice(&new_cell_offset.to_be_bytes());
149
150 for (i, &ptr) in pointer_array.iter().enumerate() {
151 let off = header_offset + 8 + (i * 2);
152 page_data[off..off + 2].copy_from_slice(&ptr.to_be_bytes());
153 }
154
155 file.seek(SeekFrom::Start(offset))
156 .await
157 .map_err(|e| DsError::io(e.to_string()))?;
158 file.write_all(&page_data)
159 .await
160 .map_err(|e| DsError::io(e.to_string()))?;
161
162 Ok(())
163 }
164
165 pub async fn delete_from_leaf_page(&self, page_id: u32, row_id: i64) -> Result<bool> {
167 self.ensure_initialized().await?;
168 let mut file = OpenOptions::new()
169 .read(true)
170 .write(true)
171 .open(&self.path)
172 .await
173 .map_err(|e| DsError::io(format!("Failed to open database file: {}", e)))?;
174 let page_size = self.get_page_size().await?;
175 let offset = (page_id as u64 - 1) * (page_size as u64);
176
177 let mut page_data = vec![0u8; page_size as usize];
178 file.seek(SeekFrom::Start(offset))
179 .await
180 .map_err(|e| DsError::io(e.to_string()))?;
181 file.read_exact(&mut page_data)
182 .await
183 .map_err(|e| DsError::io(e.to_string()))?;
184
185 let header_offset = if page_id == 1 { 100 } else { 0 };
186 let cell_count =
187 u16::from_be_bytes([page_data[header_offset + 3], page_data[header_offset + 4]]);
188
189 let mut pointer_array = Vec::new();
190 let mut target_idx = None;
191 for i in 0..cell_count {
192 let off = header_offset + 8 + (i as usize * 2);
193 let cell_ptr = u16::from_be_bytes([page_data[off], page_data[off + 1]]);
194
195 let (_p_size, consumed) = parse_varint(&page_data[cell_ptr as usize..]);
196 let (existing_row_id, _) = parse_varint(&page_data[cell_ptr as usize + consumed..]);
197
198 if (existing_row_id as i64) == row_id {
199 target_idx = Some(i as usize);
200 } else {
201 pointer_array.push(cell_ptr);
202 }
203 }
204
205 if let Some(_) = target_idx {
206 let new_cell_count = pointer_array.len() as u16;
207 page_data[header_offset + 3..header_offset + 5]
208 .copy_from_slice(&new_cell_count.to_be_bytes());
209
210 for i in 0..cell_count {
212 let off = header_offset + 8 + (i as usize * 2);
213 page_data[off..off + 2].copy_from_slice(&[0, 0]);
214 }
215
216 for (i, &ptr) in pointer_array.iter().enumerate() {
218 let off = header_offset + 8 + (i * 2);
219 page_data[off..off + 2].copy_from_slice(&ptr.to_be_bytes());
220 }
221
222 file.seek(SeekFrom::Start(offset))
223 .await
224 .map_err(|e| DsError::io(e.to_string()))?;
225 file.write_all(&page_data)
226 .await
227 .map_err(|e| DsError::io(e.to_string()))?;
228 Ok(true)
229 } else {
230 Ok(false)
231 }
232 }
233
234 async fn split_table_leaf_page(
236 &self,
237 page_id: u32,
238 row_id: i64,
239 values: &[DsValue],
240 mut path: Vec<u32>,
241 ) -> Result<()> {
242 let mut file = OpenOptions::new()
243 .read(true)
244 .write(true)
245 .open(&self.path)
246 .await
247 .map_err(|e| DsError::io(e.to_string()))?;
248 let page_size = self.get_page_size().await?;
249 let offset = (page_id as u64 - 1) * page_size as u64;
250
251 let mut old_page_data = vec![0u8; page_size as usize];
252 file.seek(SeekFrom::Start(offset))
253 .await
254 .map_err(|e| DsError::io(e.to_string()))?;
255 file.read_exact(&mut old_page_data)
256 .await
257 .map_err(|e| DsError::io(e.to_string()))?;
258
259 let header_offset = if page_id == 1 { 100 } else { 0 };
260 let cell_count = u16::from_be_bytes([
261 old_page_data[header_offset + 3],
262 old_page_data[header_offset + 4],
263 ]);
264
265 let mut all_cells = Vec::new();
266 for i in 0..cell_count {
267 let off = u16::from_be_bytes([
268 old_page_data[header_offset + 8 + i as usize * 2],
269 old_page_data[header_offset + 9 + i as usize * 2],
270 ]) as usize;
271 let (payload_size, consumed) = parse_varint(&old_page_data[off..]);
272 let (rid, consumed2) = parse_varint(&old_page_data[off + consumed..]);
273 let (stored_len, has_overflow) =
274 self.get_stored_payload_info(0x0D, payload_size, page_size);
275 let total_size = consumed + consumed2 + stored_len + if has_overflow { 4 } else { 0 };
276 all_cells.push((rid as i64, old_page_data[off..off + total_size].to_vec()));
277 }
278
279 let record = encode_record(values);
280 let payload_size = record.len() as u64;
281 let (stored_len, has_overflow) =
282 self.get_stored_payload_info(0x0D, payload_size, page_size);
283 let mut new_cell_data = Vec::new();
284 new_cell_data.extend(encode_varint(payload_size));
285 new_cell_data.extend(encode_varint(row_id as u64));
286 new_cell_data.extend_from_slice(&record[..stored_len]);
287 if has_overflow {
288 let overflow_page_id = self.write_overflow_pages(&record[stored_len..]).await?;
289 new_cell_data.extend_from_slice(&overflow_page_id.to_be_bytes());
290 }
291 all_cells.push((row_id, new_cell_data));
292 all_cells.sort_by_key(|c| c.0);
293
294 let new_page_id = self.allocate_page().await?;
295 let mut new_page_data = vec![0u8; page_size as usize];
296 new_page_data[0] = 0x0D;
297
298 let mid_idx = all_cells.len() / 2;
299 let mut updated_old_page_data = vec![0u8; page_size as usize];
300 updated_old_page_data[..header_offset].copy_from_slice(&old_page_data[..header_offset]);
301 updated_old_page_data[header_offset] = 0x0D;
302 let mut old_cell_content_offset = page_size as u16;
303 let mut old_pointer_array = Vec::new();
304 for i in 0..mid_idx {
305 let cell_data = &all_cells[i].1;
306 old_cell_content_offset -= cell_data.len() as u16;
307 updated_old_page_data[old_cell_content_offset as usize
308 ..old_cell_content_offset as usize + cell_data.len()]
309 .copy_from_slice(cell_data);
310 old_pointer_array.push(old_cell_content_offset);
311 }
312 updated_old_page_data[header_offset + 3..header_offset + 5]
313 .copy_from_slice(&(old_pointer_array.len() as u16).to_be_bytes());
314 updated_old_page_data[header_offset + 5..header_offset + 7]
315 .copy_from_slice(&old_cell_content_offset.to_be_bytes());
316 for (i, &ptr) in old_pointer_array.iter().enumerate() {
317 updated_old_page_data[header_offset + 8 + i * 2..header_offset + 10 + i * 2]
318 .copy_from_slice(&ptr.to_be_bytes());
319 }
320
321 let mut new_cell_content_offset = page_size as u16;
322 let mut new_pointer_array = Vec::new();
323 for i in mid_idx..all_cells.len() {
324 let cell_data = &all_cells[i].1;
325 new_cell_content_offset -= cell_data.len() as u16;
326 new_page_data[new_cell_content_offset as usize
327 ..new_cell_content_offset as usize + cell_data.len()]
328 .copy_from_slice(cell_data);
329 new_pointer_array.push(new_cell_content_offset);
330 }
331 new_page_data[3..5].copy_from_slice(&(new_pointer_array.len() as u16).to_be_bytes());
332 new_page_data[5..7].copy_from_slice(&new_cell_content_offset.to_be_bytes());
333 for (i, &ptr) in new_pointer_array.iter().enumerate() {
334 new_page_data[8 + i * 2..10 + i * 2].copy_from_slice(&ptr.to_be_bytes());
335 }
336
337 file.seek(SeekFrom::Start(offset))
338 .await
339 .map_err(|e| DsError::io(e.to_string()))?;
340 file.write_all(&updated_old_page_data)
341 .await
342 .map_err(|e| DsError::io(e.to_string()))?;
343
344 let new_page_offset = (new_page_id as u64 - 1) * page_size as u64;
345 file.seek(SeekFrom::Start(new_page_offset))
346 .await
347 .map_err(|e| DsError::io(e.to_string()))?;
348 file.write_all(&new_page_data)
349 .await
350 .map_err(|e| DsError::io(e.to_string()))?;
351
352 let separator_key = all_cells[mid_idx].0;
353 path.pop();
354 if let Some(parent_id) = path.last() {
355 Box::pin(self.insert_into_table_interior_page(
356 *parent_id,
357 page_id,
358 separator_key,
359 path,
360 ))
361 .await?;
362 } else {
363 self.create_new_table_root(page_id, new_page_id, separator_key)
364 .await?;
365 }
366 Ok(())
367 }
368
369 async fn insert_into_table_interior_page(
371 &self,
372 page_id: u32,
373 left_child: u32,
374 key: i64,
375 path: Vec<u32>,
376 ) -> Result<()> {
377 let mut file = OpenOptions::new()
378 .read(true)
379 .write(true)
380 .open(&self.path)
381 .await
382 .map_err(|e| DsError::io(e.to_string()))?;
383 let page_size = self.get_page_size().await?;
384 let offset = (page_id as u64 - 1) * page_size as u64;
385
386 let mut page_data = vec![0u8; page_size as usize];
387 file.seek(SeekFrom::Start(offset))
388 .await
389 .map_err(|e| DsError::io(e.to_string()))?;
390 file.read_exact(&mut page_data)
391 .await
392 .map_err(|e| DsError::io(e.to_string()))?;
393
394 let header_offset = if page_id == 1 { 100 } else { 0 };
395 let cell_count =
396 u16::from_be_bytes([page_data[header_offset + 3], page_data[header_offset + 4]]);
397 let cell_content_offset =
398 u16::from_be_bytes([page_data[header_offset + 5], page_data[header_offset + 6]]);
399 let cell_content_offset = if cell_content_offset == 0 {
400 page_size as u16
401 } else {
402 cell_content_offset
403 };
404
405 let mut cell_data = Vec::new();
406 cell_data.extend_from_slice(&left_child.to_be_bytes());
407 cell_data.extend(encode_varint(key as u64));
408
409 let pointer_array_end = header_offset + 12 + (cell_count as usize * 2);
410 let free_space = cell_content_offset as usize - pointer_array_end;
411
412 if cell_data.len() + 2 > free_space {
413 return Box::pin(self.split_table_interior_page(page_id, left_child, key, path)).await;
414 }
415
416 let new_cell_offset = cell_content_offset - cell_data.len() as u16;
417 page_data[new_cell_offset as usize..new_cell_offset as usize + cell_data.len()]
418 .copy_from_slice(&cell_data);
419
420 let mut pointer_array = Vec::new();
421 for i in 0..cell_count {
422 let off = header_offset + 12 + (i as usize * 2);
423 pointer_array.push(u16::from_be_bytes([page_data[off], page_data[off + 1]]));
424 }
425
426 let mut insert_pos = cell_count as usize;
427 for i in 0..cell_count {
428 let off = pointer_array[i as usize] as usize;
429 let (existing_key, _) = parse_varint(&page_data[off + 4..]);
430 if existing_key as i64 >= key {
431 insert_pos = i as usize;
432 break;
433 }
434 }
435 pointer_array.insert(insert_pos, new_cell_offset);
436
437 for (i, &ptr) in pointer_array.iter().enumerate() {
438 let off = header_offset + 12 + (i * 2);
439 page_data[off..off + 2].copy_from_slice(&ptr.to_be_bytes());
440 }
441
442 let new_cell_count = cell_count + 1;
443 page_data[header_offset + 3..header_offset + 5]
444 .copy_from_slice(&new_cell_count.to_be_bytes());
445 page_data[header_offset + 5..header_offset + 7]
446 .copy_from_slice(&new_cell_offset.to_be_bytes());
447
448 file.seek(SeekFrom::Start(offset))
449 .await
450 .map_err(|e| DsError::io(e.to_string()))?;
451 file.write_all(&page_data)
452 .await
453 .map_err(|e| DsError::io(e.to_string()))?;
454
455 Ok(())
456 }
457
458 async fn split_table_interior_page(
460 &self,
461 page_id: u32,
462 new_left_child: u32,
463 new_key: i64,
464 mut path: Vec<u32>,
465 ) -> Result<()> {
466 let mut file = OpenOptions::new()
467 .read(true)
468 .write(true)
469 .open(&self.path)
470 .await
471 .map_err(|e| DsError::io(e.to_string()))?;
472 let page_size = self.get_page_size().await?;
473 let offset = (page_id as u64 - 1) * page_size as u64;
474
475 let mut old_page_data = vec![0u8; page_size as usize];
476 file.seek(SeekFrom::Start(offset))
477 .await
478 .map_err(|e| DsError::io(e.to_string()))?;
479 file.read_exact(&mut old_page_data)
480 .await
481 .map_err(|e| DsError::io(e.to_string()))?;
482
483 let header_offset = if page_id == 1 { 100 } else { 0 };
484 let cell_count = u16::from_be_bytes([
485 old_page_data[header_offset + 3],
486 old_page_data[header_offset + 4],
487 ]);
488 let right_most_ptr = u32::from_be_bytes([
489 old_page_data[header_offset + 8],
490 old_page_data[header_offset + 9],
491 old_page_data[header_offset + 10],
492 old_page_data[header_offset + 11],
493 ]);
494
495 let mut all_cells = Vec::new();
496 for i in 0..cell_count {
497 let off = u16::from_be_bytes([
498 old_page_data[header_offset + 12 + i as usize * 2],
499 old_page_data[header_offset + 13 + i as usize * 2],
500 ]) as usize;
501 let (k, consumed) = parse_varint(&old_page_data[off + 4..]);
502 all_cells.push((k as i64, old_page_data[off..off + 4 + consumed].to_vec()));
503 }
504
505 let mut new_cell_data = Vec::new();
506 new_cell_data.extend_from_slice(&new_left_child.to_be_bytes());
507 new_cell_data.extend(encode_varint(new_key as u64));
508 all_cells.push((new_key, new_cell_data));
509 all_cells.sort_by_key(|c| c.0);
510
511 let mid_idx = all_cells.len() / 2;
512 let separator_key = all_cells[mid_idx].0;
513 let new_right_ptr = u32::from_be_bytes([
514 all_cells[mid_idx].1[0],
515 all_cells[mid_idx].1[1],
516 all_cells[mid_idx].1[2],
517 all_cells[mid_idx].1[3],
518 ]);
519
520 let mut updated_old_page_data = vec![0u8; page_size as usize];
521 updated_old_page_data[..header_offset].copy_from_slice(&old_page_data[..header_offset]);
522 updated_old_page_data[header_offset] = 0x05;
523 updated_old_page_data[header_offset + 8..header_offset + 12]
524 .copy_from_slice(&new_right_ptr.to_be_bytes());
525 let mut old_cell_content_offset = page_size as u16;
526 let mut old_pointer_array = Vec::new();
527 for i in 0..mid_idx {
528 let cell_data = &all_cells[i].1;
529 old_cell_content_offset -= cell_data.len() as u16;
530 updated_old_page_data[old_cell_content_offset as usize
531 ..old_cell_content_offset as usize + cell_data.len()]
532 .copy_from_slice(cell_data);
533 old_pointer_array.push(old_cell_content_offset);
534 }
535 updated_old_page_data[header_offset + 3..header_offset + 5]
536 .copy_from_slice(&(old_pointer_array.len() as u16).to_be_bytes());
537 updated_old_page_data[header_offset + 5..header_offset + 7]
538 .copy_from_slice(&old_cell_content_offset.to_be_bytes());
539 for (i, &ptr) in old_pointer_array.iter().enumerate() {
540 updated_old_page_data[header_offset + 12 + i * 2..header_offset + 14 + i * 2]
541 .copy_from_slice(&ptr.to_be_bytes());
542 }
543
544 let new_page_id = self.allocate_page().await?;
545 let mut new_page_data = vec![0u8; page_size as usize];
546 new_page_data[0] = 0x05;
547 new_page_data[8..12].copy_from_slice(&right_most_ptr.to_be_bytes());
548 let mut new_cell_content_offset = page_size as u16;
549 let mut new_pointer_array = Vec::new();
550 for i in mid_idx + 1..all_cells.len() {
551 let cell_data = &all_cells[i].1;
552 new_cell_content_offset -= cell_data.len() as u16;
553 new_page_data[new_cell_content_offset as usize
554 ..new_cell_content_offset as usize + cell_data.len()]
555 .copy_from_slice(cell_data);
556 new_pointer_array.push(new_cell_content_offset);
557 }
558 new_page_data[3..5].copy_from_slice(&(new_pointer_array.len() as u16).to_be_bytes());
559 new_page_data[5..7].copy_from_slice(&new_cell_content_offset.to_be_bytes());
560 for (i, &ptr) in new_pointer_array.iter().enumerate() {
561 new_page_data[12 + i * 2..14 + i * 2].copy_from_slice(&ptr.to_be_bytes());
562 }
563
564 file.seek(SeekFrom::Start(offset))
565 .await
566 .map_err(|e| DsError::io(e.to_string()))?;
567 file.write_all(&updated_old_page_data)
568 .await
569 .map_err(|e| DsError::io(e.to_string()))?;
570
571 let new_page_offset = (new_page_id as u64 - 1) * page_size as u64;
572 file.seek(SeekFrom::Start(new_page_offset))
573 .await
574 .map_err(|e| DsError::io(e.to_string()))?;
575 file.write_all(&new_page_data)
576 .await
577 .map_err(|e| DsError::io(e.to_string()))?;
578
579 path.pop();
580 if let Some(parent_id) = path.last() {
581 Box::pin(self.insert_into_table_interior_page(
582 *parent_id,
583 page_id,
584 separator_key,
585 path,
586 ))
587 .await?;
588 } else {
589 self.create_new_table_root(page_id, new_page_id, separator_key)
590 .await?;
591 }
592 Ok(())
593 }
594
595 async fn create_new_table_root(
597 &self,
598 left_child: u32,
599 right_child: u32,
600 key: i64,
601 ) -> Result<()> {
602 let mut file = OpenOptions::new()
603 .read(true)
604 .write(true)
605 .open(&self.path)
606 .await
607 .map_err(|e| DsError::io(e.to_string()))?;
608 let page_size = self.get_page_size().await?;
609
610 let mut root_page_data = vec![0u8; page_size as usize];
611 file.seek(SeekFrom::Start(0))
612 .await
613 .map_err(|e| DsError::io(e.to_string()))?;
614 file.read_exact(&mut root_page_data)
615 .await
616 .map_err(|e| DsError::io(e.to_string()))?;
617
618 root_page_data[100] = 0x05;
620 root_page_data[101] = 0; root_page_data[102] = 0;
622 root_page_data[103] = 0; root_page_data[104] = 1;
624
625 let mut cell_data = Vec::new();
626 cell_data.extend_from_slice(&left_child.to_be_bytes());
627 cell_data.extend(encode_varint(key as u64));
628
629 let cell_offset = page_size as u16 - cell_data.len() as u16;
630 root_page_data[cell_offset as usize..cell_offset as usize + cell_data.len()]
631 .copy_from_slice(&cell_data);
632 root_page_data[105..107].copy_from_slice(&cell_offset.to_be_bytes());
633 root_page_data[108..112].copy_from_slice(&right_child.to_be_bytes());
634 root_page_data[112..114].copy_from_slice(&cell_offset.to_be_bytes());
635
636 file.seek(SeekFrom::Start(0))
637 .await
638 .map_err(|e| DsError::io(e.to_string()))?;
639 file.write_all(&root_page_data)
640 .await
641 .map_err(|e| DsError::io(e.to_string()))?;
642 Ok(())
643 }
644}