use super::*;
use crate::store::catalog::ColumnCatalog;
use crate::store::descriptor::{
    ChunkMetadata, ColumnDescriptor, DescriptorIterator, DescriptorPageHeader,
};
use crate::store::scan::filter::FilterDispatch;
use crate::store::scan::{FilterPrimitive, FilterResult};
use crate::types::LogicalFieldId;
use arrow::array::{Array, ArrayRef, BooleanArray, UInt32Array, UInt64Array};
use arrow::compute::{self, SortColumn, lexsort_to_indices};
use arrow::datatypes::DataType;
use arrow::record_batch::RecordBatch;
use llkv_expr::typed_predicate::Predicate;
use llkv_result::{Error, Result};
use llkv_storage::{
    constants::CATALOG_ROOT_PKEY,
    pager::{BatchGet, BatchPut, GetResult, Pager},
    serialization::{deserialize_array, serialize_array},
    types::PhysicalKey,
};

use rustc_hash::{FxHashMap, FxHashSet};
use simd_r_drive_entry_handle::EntryHandle;
use std::sync::{Arc, RwLock};

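/// Column store layered on a [`Pager`].
///
/// Persistent layout, as used throughout this file: a root catalog blob at
/// `CATALOG_ROOT_PKEY` maps each [`LogicalFieldId`] to the physical key of a
/// [`ColumnDescriptor`]; each descriptor heads a linked chain of descriptor
/// pages holding one [`ChunkMetadata`] entry per serialized Arrow chunk. Every
/// user column is shadowed by a row-id column (see `rowid_fid`) that stores
/// the ascending `u64` row ids for its chunks.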
pub struct ColumnStore<P: Pager> {
    pub(crate) pager: Arc<P>,
    pub(crate) catalog: Arc<RwLock<ColumnCatalog>>,
    cfg: ColumnStoreConfig,
    dtype_cache: DTypeCache<P>,
    index_manager: IndexManager<P>,
}

impl<P> ColumnStore<P>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
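    /// Opens a store over `pager`, loading the column catalog from
    /// `CATALOG_ROOT_PKEY` or starting with an empty catalog if none has been
    /// persisted yet. A minimal sketch; `MemPager` here is a hypothetical
    /// stand-in for any `Pager<Blob = EntryHandle> + Send + Sync`:
    ///
    /// ```ignore
    /// let pager = Arc::new(MemPager::default()); // assumed in-memory pager
    /// let store = ColumnStore::open(Arc::clone(&pager))?;
    /// ```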
    pub fn open(pager: Arc<P>) -> Result<Self> {
        let cfg = ColumnStoreConfig::default();
        let catalog = match pager
            .batch_get(&[BatchGet::Raw {
                key: CATALOG_ROOT_PKEY,
            }])?
            .pop()
        {
            Some(GetResult::Raw { bytes, .. }) => ColumnCatalog::from_bytes(bytes.as_ref())?,
            _ => ColumnCatalog::default(),
        };
        let arc_catalog = Arc::new(RwLock::new(catalog));

        let index_manager = IndexManager::new(Arc::clone(&pager));

        Ok(Self {
            pager: Arc::clone(&pager),
            catalog: Arc::clone(&arc_catalog),
            cfg,
            dtype_cache: DTypeCache::new(Arc::clone(&pager), Arc::clone(&arc_catalog)),
            index_manager,
        })
    }

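    /// Registers a persisted index of the given kind for `field_id`,
    /// delegating to the store's [`IndexManager`]; see
    /// [`Self::unregister_index`] for the inverse. A small usage sketch:
    ///
    /// ```ignore
    /// store.register_index(field_id, IndexKind::Presence)?;
    /// ```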
    pub fn register_index(&self, field_id: LogicalFieldId, kind: IndexKind) -> Result<()> {
        self.index_manager.register_index(self, field_id, kind)
    }

    pub fn unregister_index(&self, field_id: LogicalFieldId, kind: IndexKind) -> Result<()> {
        self.index_manager.unregister_index(self, field_id, kind)
    }

    pub fn data_type(&self, field_id: LogicalFieldId) -> Result<DataType> {
        if let Some(dt) = self.dtype_cache.cached_data_type(field_id) {
            return Ok(dt);
        }
        self.dtype_cache.dtype_for_field(field_id)
    }

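    /// Returns the row ids whose values match `predicate`, dispatching on the
    /// column's primitive type via [`FilterDispatch`]. A hedged sketch,
    /// assuming `u64` implements `FilterDispatch` and that a suitable
    /// `Predicate` has been built by the caller:
    ///
    /// ```ignore
    /// let hits: Vec<u64> = store.filter_row_ids::<u64>(field_id, &predicate)?;
    /// ```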
    pub fn filter_row_ids<T>(
        &self,
        field_id: LogicalFieldId,
        predicate: &Predicate<T::Value>,
    ) -> Result<Vec<u64>>
    where
        T: FilterDispatch,
    {
        T::run_filter(self, field_id, predicate)
    }

    pub fn filter_matches<T, F>(
        &self,
        field_id: LogicalFieldId,
        predicate: F,
    ) -> Result<FilterResult>
    where
        T: FilterPrimitive,
        F: FnMut(T::Native) -> bool,
    {
        T::run_filter_with_result(self, field_id, predicate)
    }

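    /// Lists the index kinds recorded in the column's persisted descriptor;
    /// this reads only the descriptor blob, not any chunk data.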
    pub fn list_persisted_indexes(&self, field_id: LogicalFieldId) -> Result<Vec<IndexKind>> {
        let catalog = self.catalog.read().unwrap();
        let descriptor_pk = *catalog.map.get(&field_id).ok_or(Error::NotFound)?;

        let desc_blob = self
            .pager
            .batch_get(&[BatchGet::Raw { key: descriptor_pk }])?
            .pop()
            .and_then(|r| match r {
                GetResult::Raw { bytes, .. } => Some(bytes),
                _ => None,
            })
            .ok_or(Error::NotFound)?;
        let descriptor = ColumnDescriptor::from_le_bytes(desc_blob.as_ref());

        let kinds = descriptor.get_indexes()?;
        Ok(kinds)
    }

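    /// Returns the `total_row_count` recorded in the field's descriptor
    /// without touching chunk data.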
    pub fn total_rows_for_field(&self, field_id: LogicalFieldId) -> Result<u64> {
        let catalog = self.catalog.read().unwrap();
        let desc_pk = *catalog.map.get(&field_id).ok_or(Error::NotFound)?;
        drop(catalog);

        let desc_blob = self
            .pager
            .batch_get(&[BatchGet::Raw { key: desc_pk }])?
            .pop()
            .and_then(|r| match r {
                GetResult::Raw { bytes, .. } => Some(bytes),
                _ => None,
            })
            .ok_or(Error::NotFound)?;

        let desc = ColumnDescriptor::from_le_bytes(desc_blob.as_ref());
        Ok(desc.total_row_count)
    }

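    /// Returns the row count for `table_id`, taken as the maximum
    /// `total_row_count` across the table's user-data columns. Columns can
    /// have different lengths because null values are never persisted, so the
    /// widest column is the best available estimate of the table's row count.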
    pub fn total_rows_for_table(&self, table_id: crate::types::TableId) -> Result<u64> {
        use crate::types::Namespace;
        let catalog = self.catalog.read().unwrap();
        let candidates: Vec<LogicalFieldId> = catalog
            .map
            .keys()
            .filter(|fid| fid.namespace() == Namespace::UserData && fid.table_id() == table_id)
            .copied()
            .collect();
        drop(catalog);

        if candidates.is_empty() {
            return Ok(0);
        }

        let mut max_rows: u64 = 0;
        for field in candidates {
            let rows = self.total_rows_for_field(field)?;
            if rows > max_rows {
                max_rows = rows;
            }
        }
        Ok(max_rows)
    }

    pub fn user_field_ids_for_table(&self, table_id: crate::types::TableId) -> Vec<LogicalFieldId> {
        use crate::types::Namespace;

        let catalog = self.catalog.read().unwrap();
        catalog
            .map
            .keys()
            .filter(|fid| fid.namespace() == Namespace::UserData && fid.table_id() == table_id)
            .copied()
            .collect()
    }

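    /// Reports whether `row_id` is present in the shadow row-id column of
    /// `field_id`. Chunks are pruned via their recorded min/max row-id bounds
    /// and then binary-searched, through the value-order permutation when one
    /// is persisted and directly otherwise.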
    pub fn has_row_id(&self, field_id: LogicalFieldId, row_id: u64) -> Result<bool> {
        let rid_fid = rowid_fid(field_id);
        let catalog = self.catalog.read().unwrap();
        let rid_desc_pk = *catalog.map.get(&rid_fid).ok_or(Error::NotFound)?;
        let rid_desc_blob = self
            .pager
            .batch_get(&[BatchGet::Raw { key: rid_desc_pk }])?
            .pop()
            .and_then(|r| match r {
                GetResult::Raw { bytes, .. } => Some(bytes),
                _ => None,
            })
            .ok_or(Error::NotFound)?;
        let rid_desc = ColumnDescriptor::from_le_bytes(rid_desc_blob.as_ref());
        drop(catalog);

        for m in DescriptorIterator::new(self.pager.as_ref(), rid_desc.head_page_pk) {
            let meta = m?;
            if meta.row_count == 0 {
                continue;
            }
            // Prune on the recorded row-id bounds. The range test must be
            // parenthesized as a unit: `&&` binds tighter than `||`, and
            // without the grouping, chunks with unset (0, 0) bounds would be
            // skipped for any nonzero row_id.
            if (meta.min_val_u64 != 0 || meta.max_val_u64 != 0)
                && (row_id < meta.min_val_u64 || row_id > meta.max_val_u64)
            {
                continue;
            }
            let mut gets = vec![BatchGet::Raw { key: meta.chunk_pk }];
            if meta.value_order_perm_pk != 0 {
                gets.push(BatchGet::Raw {
                    key: meta.value_order_perm_pk,
                });
            }
            let results = self.pager.batch_get(&gets)?;
            let mut rid_blob: Option<EntryHandle> = None;
            let mut perm_blob: Option<EntryHandle> = None;
            for r in results {
                if let GetResult::Raw { key, bytes } = r {
                    if key == meta.chunk_pk {
                        rid_blob = Some(bytes);
                    } else if key == meta.value_order_perm_pk {
                        perm_blob = Some(bytes);
                    }
                }
            }
            let Some(rid_blob) = rid_blob else { continue };
            let rid_any = deserialize_array(rid_blob)?;
            let rids = rid_any
                .as_any()
                .downcast_ref::<UInt64Array>()
                .ok_or_else(|| Error::Internal("rid downcast".into()))?;
            if let Some(pblob) = perm_blob {
                let perm_any = deserialize_array(pblob)?;
                let perm = perm_any
                    .as_any()
                    .downcast_ref::<UInt32Array>()
                    .ok_or_else(|| Error::Internal("perm not u32".into()))?;
                // Binary search through the value-order permutation.
                let mut lo: isize = 0;
                let mut hi: isize = (perm.len() as isize) - 1;
                while lo <= hi {
                    let mid = ((lo + hi) >> 1) as usize;
                    let rid = rids.value(perm.value(mid) as usize);
                    if rid == row_id {
                        return Ok(true);
                    } else if rid < row_id {
                        lo = mid as isize + 1;
                    } else {
                        hi = mid as isize - 1;
                    }
                }
            } else {
                // Row-id chunks are written in ascending order, so binary
                // search the array directly.
                let mut lo: isize = 0;
                let mut hi: isize = (rids.len() as isize) - 1;
                while lo <= hi {
                    let mid = ((lo + hi) >> 1) as usize;
                    let rid = rids.value(mid);
                    if rid == row_id {
                        return Ok(true);
                    } else if rid < row_id {
                        lo = mid as isize + 1;
                    } else {
                        hi = mid as isize - 1;
                    }
                }
            }
        }
        Ok(false)
    }

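    /// Appends `batch` with last-writer-wins (LWW) semantics.
    ///
    /// The batch must contain a `u64` column named by `ROW_ID_COLUMN_NAME`;
    /// every other column is matched to a stored column through a `field_id`
    /// entry in its Arrow field metadata. Row ids that already exist are
    /// rewritten in place (an incoming null deletes the stored value); the
    /// remaining rows are null-filtered, split into chunks of roughly
    /// `TARGET_CHUNK_BYTES`, and appended. A hedged sketch of building a
    /// compatible batch; the schema and column names here are illustrative,
    /// not fixed by this API:
    ///
    /// ```ignore
    /// use std::collections::HashMap;
    ///
    /// let mut md = HashMap::new();
    /// md.insert("field_id".to_string(), "42".to_string());
    /// let schema = Arc::new(Schema::new(vec![
    ///     Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
    ///     Field::new("values", DataType::UInt64, true).with_metadata(md),
    /// ]));
    /// let batch = RecordBatch::try_new(schema, vec![row_ids, values])?;
    /// store.append(&batch)?;
    /// ```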
    // `working_batch` holds a re-sorted copy only when the input is unsorted;
    // the `allow` silences warnings on the already-sorted path.
    #[allow(unused_variables, unused_assignments)]
    pub fn append(&self, batch: &RecordBatch) -> Result<()> {
        let working_batch: RecordBatch;
        let batch_ref = {
            let schema = batch.schema();
            let row_id_idx = schema
                .index_of(ROW_ID_COLUMN_NAME)
                .map_err(|_| Error::Internal("row_id column required".into()))?;
            let row_id_any = batch.column(row_id_idx).clone();
            let row_id_arr = row_id_any
                .as_any()
                .downcast_ref::<UInt64Array>()
                .ok_or_else(|| Error::Internal("row_id downcast failed".into()))?;

            let mut is_sorted = true;
            if !row_id_arr.is_empty() {
                let mut last = row_id_arr.value(0);
                for i in 1..row_id_arr.len() {
                    let current = row_id_arr.value(i);
                    if current < last {
                        is_sorted = false;
                        break;
                    }
                    last = current;
                }
            }

            if is_sorted {
                batch
            } else {
                // Reorder every column by ascending row_id before ingesting.
                let sort_col = SortColumn {
                    values: row_id_any,
                    options: None,
                };
                let idx = lexsort_to_indices(&[sort_col], None)?;
                let perm = idx
                    .as_any()
                    .downcast_ref::<UInt32Array>()
                    .ok_or_else(|| Error::Internal("perm not u32".into()))?;
                let mut cols: Vec<ArrayRef> = Vec::with_capacity(batch.num_columns());
                for i in 0..batch.num_columns() {
                    cols.push(compute::take(batch.column(i), perm, None)?);
                }
                working_batch = RecordBatch::try_new(schema.clone(), cols)
                    .map_err(|e| Error::Internal(format!("record batch rebuild: {e}")))?;
                &working_batch
            }
        };

        let schema = batch_ref.schema();
        let row_id_idx = schema
            .index_of(ROW_ID_COLUMN_NAME)
            .map_err(|_| Error::Internal("row_id column required".into()))?;

        let row_id_arr = batch_ref
            .column(row_id_idx)
            .as_any()
            .downcast_ref::<UInt64Array>()
            .ok_or_else(|| Error::Internal("row_id downcast failed".into()))?;
        let mut incoming_ids_map = FxHashMap::default();
        incoming_ids_map.reserve(row_id_arr.len());
        for i in 0..row_id_arr.len() {
            incoming_ids_map.insert(row_id_arr.value(i), i);
        }

        let mut catalog_dirty = false;
        let mut puts_rewrites: Vec<BatchPut> = Vec::new();
        let mut all_rewritten_ids = FxHashSet::default();

        let mut catalog_lock = self.catalog.write().unwrap();
        for i in 0..batch_ref.num_columns() {
            if i == row_id_idx {
                continue;
            }
            let field = schema.field(i);
            if let Some(field_id_str) = field.metadata().get("field_id") {
                let field_id = field_id_str
                    .parse::<u64>()
                    .map(LogicalFieldId::from)
                    .map_err(|e| Error::Internal(format!("Invalid field_id: {}", e)))?;

                let rewritten = self.lww_rewrite_for_field(
                    &mut catalog_lock,
                    field_id,
                    &incoming_ids_map,
                    batch_ref.column(i),
                    batch_ref.column(row_id_idx),
                    &mut puts_rewrites,
                )?;
                all_rewritten_ids.extend(rewritten);
            }
        }
        drop(catalog_lock);

        if !puts_rewrites.is_empty() {
            self.pager.batch_put(&puts_rewrites)?;
        }

        let batch_to_append = if !all_rewritten_ids.is_empty() {
            // Drop the rows that were handled by the LWW rewrite pass above.
            let keep_mask: Vec<bool> = (0..row_id_arr.len())
                .map(|i| !all_rewritten_ids.contains(&row_id_arr.value(i)))
                .collect();
            let keep_array = BooleanArray::from(keep_mask);
            compute::filter_record_batch(batch_ref, &keep_array)?
        } else {
            batch_ref.clone()
        };

        if batch_to_append.num_rows() == 0 {
            return Ok(());
        }

        let append_schema = batch_to_append.schema();
        let append_row_id_idx = append_schema.index_of(ROW_ID_COLUMN_NAME)?;
        let append_row_id_any: ArrayRef = Arc::clone(batch_to_append.column(append_row_id_idx));
        let mut puts_appends: Vec<BatchPut> = Vec::new();
471
472 for (i, array) in batch_to_append.columns().iter().enumerate() {
474 if i == append_row_id_idx {
475 continue;
476 }
477
478 let field = append_schema.field(i);
479 let field_id = field
480 .metadata()
481 .get("field_id")
482 .ok_or_else(|| Error::Internal("Missing field_id".into()))?
483 .parse::<u64>()
484 .map(LogicalFieldId::from)
485 .map_err(|e| Error::Internal(format!("Invalid field_id: {}", e)))?;
486
487 self.dtype_cache.insert(field_id, field.data_type().clone());
490
491 let (array_clean, rids_clean) = if array.null_count() == 0 {
494 (array.clone(), append_row_id_any.clone())
495 } else {
496 let keep =
497 BooleanArray::from_iter((0..array.len()).map(|j| Some(!array.is_null(j))));
498 let a = compute::filter(array, &keep)?;
499 let r = compute::filter(&append_row_id_any, &keep)?;
500 (a, r)
501 };
502
503 if array_clean.is_empty() {
504 continue;
505 }
506
507 let (descriptor_pk, rid_descriptor_pk, rid_fid) = {
510 let mut catalog = self.catalog.write().unwrap();
511 let pk1 = *catalog.map.entry(field_id).or_insert_with(|| {
512 catalog_dirty = true;
513 self.pager.alloc_many(1).unwrap()[0]
514 });
515 let r_fid = rowid_fid(field_id);
516 let pk2 = *catalog.map.entry(r_fid).or_insert_with(|| {
517 catalog_dirty = true;
518 self.pager.alloc_many(1).unwrap()[0]
519 });
520 (pk1, pk2, r_fid)
521 };
522
523 let (mut data_descriptor, mut data_tail_page) =
526 ColumnDescriptor::load_or_create(Arc::clone(&self.pager), descriptor_pk, field_id)?;
527 let (mut rid_descriptor, mut rid_tail_page) = ColumnDescriptor::load_or_create(
528 Arc::clone(&self.pager),
529 rid_descriptor_pk,
530 rid_fid,
531 )?;
532
533 self.index_manager
537 .stage_index_registration(&mut data_descriptor, IndexKind::Presence)?;
538
539 let slices = split_to_target_bytes(
541 &array_clean,
542 TARGET_CHUNK_BYTES,
543 self.cfg.varwidth_fallback_rows_per_slice,
544 );
545 let mut row_off = 0usize;
546
            for s in slices {
                let rows = s.len();
                let data_pk = self.pager.alloc_many(1)?[0];
                let s_norm = zero_offset(&s);
                let data_bytes = serialize_array(s_norm.as_ref())?;
                puts_appends.push(BatchPut::Raw {
                    key: data_pk,
                    bytes: data_bytes,
                });

                let rid_slice: ArrayRef = rids_clean.slice(row_off, rows);
                let rid_norm = zero_offset(&rid_slice);
                let rid_pk = self.pager.alloc_many(1)?[0];
                let rid_bytes = serialize_array(rid_norm.as_ref())?;
                puts_appends.push(BatchPut::Raw {
                    key: rid_pk,
                    bytes: rid_bytes,
                });

                let rids_for_meta = rid_norm.as_any().downcast_ref::<UInt64Array>().unwrap();
                let (min, max) = if !rids_for_meta.is_empty() {
                    let mut min_val = rids_for_meta.value(0);
                    let mut max_val = rids_for_meta.value(0);
                    for i in 1..rids_for_meta.len() {
                        let v = rids_for_meta.value(i);
                        if v < min_val {
                            min_val = v;
                        }
                        if v > max_val {
                            max_val = v;
                        }
                    }
                    (min_val, max_val)
                } else {
                    (0, 0)
                };

                // `serialized_bytes` records the in-memory Arrow footprint of
                // the chunk; it serves as the size heuristic for compaction.
                let mut data_meta = ChunkMetadata {
                    chunk_pk: data_pk,
                    row_count: rows as u64,
                    serialized_bytes: s_norm.get_array_memory_size() as u64,
                    max_val_u64: u64::MAX,
                    ..Default::default()
                };
                let mut rid_meta = ChunkMetadata {
                    chunk_pk: rid_pk,
                    row_count: rows as u64,
                    serialized_bytes: rid_norm.get_array_memory_size() as u64,
                    min_val_u64: min,
                    max_val_u64: max,
                    ..Default::default()
                };

                self.index_manager.stage_updates_for_new_chunk(
                    field_id,
                    &data_descriptor,
                    &s_norm,
                    &rid_norm,
                    &mut data_meta,
                    &mut rid_meta,
                    &mut puts_appends,
                )?;

                self.append_meta_in_loop(
                    &mut data_descriptor,
                    &mut data_tail_page,
                    data_meta,
                    &mut puts_appends,
                )?;
                self.append_meta_in_loop(
                    &mut rid_descriptor,
                    &mut rid_tail_page,
                    rid_meta,
                    &mut puts_appends,
                )?;
                row_off += rows;
            }

            puts_appends.push(BatchPut::Raw {
                key: data_descriptor.tail_page_pk,
                bytes: data_tail_page,
            });
            puts_appends.push(BatchPut::Raw {
                key: descriptor_pk,
                bytes: data_descriptor.to_le_bytes(),
            });
            puts_appends.push(BatchPut::Raw {
                key: rid_descriptor.tail_page_pk,
                bytes: rid_tail_page,
            });
            puts_appends.push(BatchPut::Raw {
                key: rid_descriptor_pk,
                bytes: rid_descriptor.to_le_bytes(),
            });
        }

        if catalog_dirty {
            let catalog = self.catalog.read().unwrap();
            puts_appends.push(BatchPut::Raw {
                key: CATALOG_ROOT_PKEY,
                bytes: catalog.to_bytes(),
            });
        }

        if !puts_appends.is_empty() {
            self.pager.batch_put(&puts_appends)?;
        }
        Ok(())
    }
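
    /// LWW rewrite pass used by [`Self::append`]: for each existing chunk of
    /// `field_id` that contains one of the incoming row ids, rewrites the
    /// chunk in place with upserted values (or with the row removed when the
    /// incoming value is null) and returns the set of row ids handled here,
    /// which `append` then excludes from the fresh-append path.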
    fn lww_rewrite_for_field(
        &self,
        catalog: &mut ColumnCatalog,
        field_id: LogicalFieldId,
        incoming_ids_map: &FxHashMap<u64, usize>,
        incoming_data: &ArrayRef,
        incoming_row_ids: &ArrayRef,
        puts: &mut Vec<BatchPut>,
    ) -> Result<FxHashSet<u64>> {
        use crate::store::ingest::ChunkEdit;

        if incoming_ids_map.is_empty() {
            return Ok(FxHashSet::default());
        }
        let incoming_ids: FxHashSet<u64> = incoming_ids_map.keys().copied().collect();

        let desc_pk_data = match catalog.map.get(&field_id) {
            Some(pk) => *pk,
            None => return Ok(FxHashSet::default()),
        };
        let rid_fid = rowid_fid(field_id);
        let desc_pk_rid = match catalog.map.get(&rid_fid) {
            Some(pk) => *pk,
            None => return Ok(FxHashSet::default()),
        };

        let gets = vec![
            BatchGet::Raw { key: desc_pk_data },
            BatchGet::Raw { key: desc_pk_rid },
        ];
        let results = self.pager.batch_get(&gets)?;
        let mut blobs_by_pk = FxHashMap::default();
        for r in results {
            if let GetResult::Raw { key, bytes } = r {
                blobs_by_pk.insert(key, bytes);
            }
        }

        let desc_blob_data = blobs_by_pk.remove(&desc_pk_data).ok_or(Error::NotFound)?;
        let mut descriptor_data = ColumnDescriptor::from_le_bytes(desc_blob_data.as_ref());

        let desc_blob_rid = blobs_by_pk.remove(&desc_pk_rid).ok_or(Error::NotFound)?;
        let mut descriptor_rid = ColumnDescriptor::from_le_bytes(desc_blob_rid.as_ref());

        let mut metas_data: Vec<ChunkMetadata> = Vec::new();
        let mut metas_rid: Vec<ChunkMetadata> = Vec::new();
        for m in DescriptorIterator::new(self.pager.as_ref(), descriptor_data.head_page_pk) {
            metas_data.push(m?);
        }
        for m in DescriptorIterator::new(self.pager.as_ref(), descriptor_rid.head_page_pk) {
            metas_rid.push(m?);
        }

        let rid_in = incoming_row_ids
            .as_any()
            .downcast_ref::<UInt64Array>()
            .ok_or_else(|| Error::Internal("row_id must be u64".into()))?;
        let mut ids_to_delete = FxHashSet::default();
        let mut ids_to_upsert = FxHashSet::default();
        for i in 0..rid_in.len() {
            let rid = rid_in.value(i);
            if incoming_data.is_null(i) {
                ids_to_delete.insert(rid);
            } else {
                ids_to_upsert.insert(rid);
            }
        }

        let mut rewritten_ids = FxHashSet::default();
        let mut hit_up: FxHashMap<usize, Vec<u64>> = FxHashMap::default();
        let mut hit_del: FxHashMap<usize, Vec<u64>> = FxHashMap::default();

        let n = metas_data.len().min(metas_rid.len());
        if n > 0 {
            let mut gets_rid = Vec::with_capacity(n);
            for rm in metas_rid.iter().take(n) {
                gets_rid.push(BatchGet::Raw { key: rm.chunk_pk });
            }
            let rid_results = self.pager.batch_get(&gets_rid)?;
            let mut rid_blobs: FxHashMap<PhysicalKey, EntryHandle> = FxHashMap::default();
            for r in rid_results {
                if let GetResult::Raw { key, bytes } = r {
                    rid_blobs.insert(key, bytes);
                }
            }

            for (i, meta_rid) in metas_rid.iter().enumerate().take(n) {
                if let Some(rid_blob) = rid_blobs.get(&meta_rid.chunk_pk) {
                    let rid_arr_any = deserialize_array(rid_blob.clone())?;
                    let rid_arr = rid_arr_any
                        .as_any()
                        .downcast_ref::<UInt64Array>()
                        .ok_or_else(|| Error::Internal("rid type mismatch".into()))?;
                    for j in 0..rid_arr.len() {
                        let rid = rid_arr.value(j);
                        if incoming_ids.contains(&rid) {
                            if ids_to_delete.contains(&rid) {
                                hit_del.entry(i).or_default().push(rid);
                            } else if ids_to_upsert.contains(&rid) {
                                hit_up.entry(i).or_default().push(rid);
                            }
                            rewritten_ids.insert(rid);
                        }
                    }
                }
            }
        }

        if hit_up.is_empty() && hit_del.is_empty() {
            return Ok(rewritten_ids);
        }

        let mut hit_set = FxHashSet::default();
        hit_set.extend(hit_up.keys().copied());
        hit_set.extend(hit_del.keys().copied());
        let hit_idxs: Vec<usize> = hit_set.into_iter().collect();

        let mut gets = Vec::with_capacity(hit_idxs.len() * 2);
        for &i in &hit_idxs {
            gets.push(BatchGet::Raw {
                key: metas_data[i].chunk_pk,
            });
            gets.push(BatchGet::Raw {
                key: metas_rid[i].chunk_pk,
            });
        }
        let results = self.pager.batch_get(&gets)?;
        let mut blob_map: FxHashMap<PhysicalKey, EntryHandle> = FxHashMap::default();
        for r in results {
            if let GetResult::Raw { key, bytes } = r {
                blob_map.insert(key, bytes);
            }
        }

        for i in hit_idxs {
            let old_data_arr =
                deserialize_array(blob_map.get(&metas_data[i].chunk_pk).unwrap().clone())?;
            let old_rid_arr_any =
                deserialize_array(blob_map.get(&metas_rid[i].chunk_pk).unwrap().clone())?;
            let old_rid_arr = old_rid_arr_any
                .as_any()
                .downcast_ref::<UInt64Array>()
                .unwrap();

            let up_vec = hit_up.remove(&i).unwrap_or_default();
            let del_vec = hit_del.remove(&i).unwrap_or_default();

            let edit = ChunkEdit::from_lww_upsert(
                old_rid_arr,
                &up_vec,
                &del_vec,
                incoming_data,
                incoming_row_ids,
                incoming_ids_map,
            )?;

            let (new_data_arr, new_rid_arr) =
                ChunkEdit::apply_edit_to_arrays(&old_data_arr, Some(&old_rid_arr_any), &edit)?;

            let data_bytes = serialize_array(&new_data_arr)?;
            puts.push(BatchPut::Raw {
                key: metas_data[i].chunk_pk,
                bytes: data_bytes,
            });
            metas_data[i].row_count = new_data_arr.len() as u64;
            metas_data[i].serialized_bytes = new_data_arr.get_array_memory_size() as u64;

            if let Some(rarr) = new_rid_arr {
                let rid_bytes = serialize_array(&rarr)?;
                puts.push(BatchPut::Raw {
                    key: metas_rid[i].chunk_pk,
                    bytes: rid_bytes,
                });
                metas_rid[i].row_count = rarr.len() as u64;
                metas_rid[i].serialized_bytes = rarr.get_array_memory_size() as u64;
            }

            if metas_data[i].value_order_perm_pk != 0 {
                // The chunk contents changed, so its sort permutation is
                // stale; rebuild and overwrite it in place.
                let sort_col = SortColumn {
                    values: new_data_arr,
                    options: None,
                };
                let idx = lexsort_to_indices(&[sort_col], None)?;
                let perm_bytes = serialize_array(&idx)?;
                puts.push(BatchPut::Raw {
                    key: metas_data[i].value_order_perm_pk,
                    bytes: perm_bytes,
                });
            }
        }

        descriptor_data.rewrite_pages(
            Arc::clone(&self.pager),
            desc_pk_data,
            &mut metas_data,
            puts,
        )?;
        descriptor_rid.rewrite_pages(Arc::clone(&self.pager), desc_pk_rid, &mut metas_rid, puts)?;

        Ok(rewritten_ids)
    }

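    /// Deletes rows from `field_id` (and its shadow row-id column) by their
    /// position in storage order; positions are resolved against the
    /// cumulative row counts of the chunk chain, so they are not row ids.
    /// `rows_to_delete` must yield ascending, unique values. Affected chunks
    /// are rewritten and a bounded compaction pass runs afterwards. A small
    /// sketch:
    ///
    /// ```ignore
    /// // Remove the 4th, 8th, and 43rd stored rows of this column.
    /// store.delete_rows(field_id, vec![3, 7, 42])?;
    /// ```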
    pub fn delete_rows<I>(&self, field_id: LogicalFieldId, rows_to_delete: I) -> Result<()>
    where
        I: IntoIterator<Item = u64>,
    {
        use crate::store::ingest::ChunkEdit;

        let mut del_iter = rows_to_delete.into_iter();
        let mut cur_del = del_iter.next();
        let mut last_seen: Option<u64> = cur_del;

        let catalog = self.catalog.read().unwrap();
        let desc_pk = *catalog.map.get(&field_id).ok_or(Error::NotFound)?;
        let rid_fid = rowid_fid(field_id);
        let desc_pk_rid = catalog.map.get(&rid_fid).copied();

        let mut gets = vec![BatchGet::Raw { key: desc_pk }];
        if let Some(pk) = desc_pk_rid {
            gets.push(BatchGet::Raw { key: pk });
        }
        let results = self.pager.batch_get(&gets)?;
        let mut blobs_by_pk = FxHashMap::default();
        for res in results {
            if let GetResult::Raw { key, bytes } = res {
                blobs_by_pk.insert(key, bytes);
            }
        }

        let desc_blob = blobs_by_pk.remove(&desc_pk).ok_or(Error::NotFound)?;
        let mut descriptor = ColumnDescriptor::from_le_bytes(desc_blob.as_ref());

        let mut metas: Vec<ChunkMetadata> = Vec::new();
        for m in DescriptorIterator::new(self.pager.as_ref(), descriptor.head_page_pk) {
            metas.push(m?);
        }
        if metas.is_empty() {
            drop(catalog);
            self.compact_field_bounded(field_id)?;
            return Ok(());
        }

        let mut metas_rid: Vec<ChunkMetadata> = Vec::new();
        let mut descriptor_rid: Option<ColumnDescriptor> = None;
        if let Some(pk_rid) = desc_pk_rid
            && let Some(desc_blob_rid) = blobs_by_pk.remove(&pk_rid)
        {
            let d_rid = ColumnDescriptor::from_le_bytes(desc_blob_rid.as_ref());
            for m in DescriptorIterator::new(self.pager.as_ref(), d_rid.head_page_pk) {
                metas_rid.push(m?);
            }
            descriptor_rid = Some(d_rid);
        }

        let mut puts = Vec::new();
        let mut cum_rows = 0u64;

        for (i, meta) in metas.iter_mut().enumerate() {
            let start_u64 = cum_rows;
            let end_u64 = start_u64 + meta.row_count;

            // Consume any delete positions that fall before this chunk,
            // enforcing that the input stays ascending as we go.
            while let Some(d) = cur_del {
                if d < start_u64
                    && let Some(prev) = last_seen
                {
                    if d < prev {
                        return Err(Error::Internal(
                            "rows_to_delete must be ascending/unique".into(),
                        ));
                    }

                    last_seen = Some(d);
                    cur_del = del_iter.next();
                } else {
                    break;
                }
            }

            let rows = meta.row_count as usize;
            let mut del_local: FxHashSet<usize> = FxHashSet::default();
            while let Some(d) = cur_del {
                if d >= end_u64 {
                    break;
                }
                del_local.insert((d - start_u64) as usize);
                last_seen = Some(d);
                cur_del = del_iter.next();
            }

            if del_local.is_empty() {
                cum_rows = end_u64;
                continue;
            }

            let mut chunk_gets = vec![BatchGet::Raw { key: meta.chunk_pk }];
            if let Some(rm) = metas_rid.get(i) {
                chunk_gets.push(BatchGet::Raw { key: rm.chunk_pk });
            }
            let chunk_results = self.pager.batch_get(&chunk_gets)?;
            let mut chunk_blobs = FxHashMap::default();
            for res in chunk_results {
                if let GetResult::Raw { key, bytes } = res {
                    chunk_blobs.insert(key, bytes);
                }
            }

            let data_blob = chunk_blobs.remove(&meta.chunk_pk).ok_or(Error::NotFound)?;
            let data_arr = deserialize_array(data_blob)?;

            let rid_arr_any = if let Some(rm) = metas_rid.get(i) {
                let rid_blob = chunk_blobs.remove(&rm.chunk_pk).ok_or(Error::NotFound)?;
                Some(deserialize_array(rid_blob)?)
            } else {
                None
            };

            let edit = ChunkEdit::from_delete_indices(rows, &del_local);

            let (new_data_arr, new_rid_arr) =
                ChunkEdit::apply_edit_to_arrays(&data_arr, rid_arr_any.as_ref(), &edit)?;

            let data_bytes = serialize_array(&new_data_arr)?;
            puts.push(BatchPut::Raw {
                key: meta.chunk_pk,
                bytes: data_bytes,
            });
            meta.row_count = new_data_arr.len() as u64;
            meta.serialized_bytes = new_data_arr.get_array_memory_size() as u64;

            if let (Some(rm), Some(rids)) = (metas_rid.get_mut(i), new_rid_arr) {
                let rid_bytes = serialize_array(&rids)?;
                puts.push(BatchPut::Raw {
                    key: rm.chunk_pk,
                    bytes: rid_bytes,
                });
                rm.row_count = rids.len() as u64;
                rm.serialized_bytes = rids.get_array_memory_size() as u64;
            }

            if meta.value_order_perm_pk != 0 {
                let sort_column = SortColumn {
                    values: new_data_arr,
                    options: None,
                };
                let indices = lexsort_to_indices(&[sort_column], None)?;
                let perm_bytes = serialize_array(&indices)?;
                puts.push(BatchPut::Raw {
                    key: meta.value_order_perm_pk,
                    bytes: perm_bytes,
                });
            }

            cum_rows = end_u64;
        }

        descriptor.rewrite_pages(Arc::clone(&self.pager), desc_pk, &mut metas, &mut puts)?;
        if let (Some(rid_pk), Some(mut rid_desc)) = (desc_pk_rid, descriptor_rid) {
            rid_desc.rewrite_pages(Arc::clone(&self.pager), rid_pk, &mut metas_rid, &mut puts)?;
        }
        if !puts.is_empty() {
            self.pager.batch_put(&puts)?;
        }

        drop(catalog);
        self.compact_field_bounded(field_id)
    }

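    /// Rewrites a descriptor's page chain so that it holds exactly
    /// `new_metas`, reusing existing pages where possible, allocating new ones
    /// as needed, and scheduling surplus pages for freeing via `frees`.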
    pub(crate) fn write_descriptor_chain(
        &self,
        descriptor_pk: PhysicalKey,
        descriptor: &mut ColumnDescriptor,
        new_metas: &[ChunkMetadata],
        puts: &mut Vec<BatchPut>,
        frees: &mut Vec<PhysicalKey>,
    ) -> Result<()> {
        // Walk the existing chain so its pages can be reused or freed.
        let mut old_pages = Vec::new();
        let mut pk = descriptor.head_page_pk;
        while pk != 0 {
            let page_blob = self
                .pager
                .batch_get(&[BatchGet::Raw { key: pk }])?
                .pop()
                .and_then(|res| match res {
                    GetResult::Raw { bytes, .. } => Some(bytes),
                    _ => None,
                })
                .ok_or(Error::NotFound)?;
            let header = DescriptorPageHeader::from_le_bytes(
                &page_blob.as_ref()[..DescriptorPageHeader::DISK_SIZE],
            );
            old_pages.push(pk);
            pk = header.next_page_pk;
        }

        let per = DESCRIPTOR_ENTRIES_PER_PAGE;
        let need_pages = if new_metas.is_empty() {
            0
        } else {
            new_metas.len().div_ceil(per)
        };
        if need_pages == 0 {
            frees.extend(old_pages.iter().copied());
            descriptor.total_row_count = 0;
            descriptor.total_chunk_count = 0;
            puts.push(BatchPut::Raw {
                key: descriptor_pk,
                bytes: descriptor.to_le_bytes(),
            });
            return Ok(());
        }

        // Reuse the prefix of the old chain: the free list below only releases
        // pages beyond `need_pages`, so every reusable page must be rewritten
        // in place rather than left unreferenced.
        let mut pages: Vec<PhysicalKey> = old_pages.iter().copied().take(need_pages).collect();
        if pages.is_empty() {
            pages.push(self.pager.alloc_many(1)?[0]);
        }
        descriptor.head_page_pk = pages[0];
        if need_pages > pages.len() {
            let extra = self.pager.alloc_many(need_pages - pages.len())?;
            pages.extend(extra);
        }

        if old_pages.len() > need_pages {
            frees.extend(old_pages[need_pages..].iter().copied());
        }

        let mut off = 0usize;
        for (i, page_pk) in pages.iter().copied().enumerate() {
            let remain = new_metas.len() - off;
            let count = remain.min(per);
            let next = if i + 1 < pages.len() { pages[i + 1] } else { 0 };
            let header = DescriptorPageHeader {
                next_page_pk: next,
                entry_count: count as u32,
                _padding: [0; 4],
            };
            let mut page_bytes = header.to_le_bytes().to_vec();
            for m in &new_metas[off..off + count] {
                page_bytes.extend_from_slice(&m.to_le_bytes());
            }
            puts.push(BatchPut::Raw {
                key: page_pk,
                bytes: page_bytes,
            });
            off += count;
        }

        descriptor.tail_page_pk = *pages.last().unwrap();
        descriptor.total_chunk_count = new_metas.len() as u64;
        descriptor.total_row_count = new_metas.iter().map(|m| m.row_count).sum();
        puts.push(BatchPut::Raw {
            key: descriptor_pk,
            bytes: descriptor.to_le_bytes(),
        });
        Ok(())
    }

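    /// Bounded compaction pass: merges runs of undersized chunks (below
    /// `MIN_CHUNK_BYTES`), capping each merge at `MAX_MERGE_RUN_BYTES`, then
    /// re-splits the merged data toward `TARGET_CHUNK_BYTES`, regenerating
    /// sort permutations where the replaced chunks had them. If no chunks
    /// remain, the field and its shadow row-id column are removed from the
    /// catalog.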
    fn compact_field_bounded(&self, field_id: LogicalFieldId) -> Result<()> {
        let mut catalog = self.catalog.write().unwrap();

        let desc_pk = match catalog.map.get(&field_id) {
            Some(&pk) => pk,
            None => return Ok(()),
        };
        let rid_fid = rowid_fid(field_id);
        let desc_pk_rid = match catalog.map.get(&rid_fid) {
            Some(&pk) => pk,
            None => return Ok(()),
        };

        let gets = vec![
            BatchGet::Raw { key: desc_pk },
            BatchGet::Raw { key: desc_pk_rid },
        ];
        let results = self.pager.batch_get(&gets)?;
        let mut blobs_by_pk = FxHashMap::default();
        for res in results {
            if let GetResult::Raw { key, bytes } = res {
                blobs_by_pk.insert(key, bytes);
            }
        }
        let desc_blob = blobs_by_pk.remove(&desc_pk).ok_or(Error::NotFound)?;
        let mut desc = ColumnDescriptor::from_le_bytes(desc_blob.as_ref());
        let desc_blob_rid = blobs_by_pk.remove(&desc_pk_rid).ok_or(Error::NotFound)?;
        let mut desc_rid = ColumnDescriptor::from_le_bytes(desc_blob_rid.as_ref());

        let mut metas = Vec::new();
        for m in DescriptorIterator::new(self.pager.as_ref(), desc.head_page_pk) {
            metas.push(m?);
        }
        let mut metas_rid = Vec::new();
        for m in DescriptorIterator::new(self.pager.as_ref(), desc_rid.head_page_pk) {
            metas_rid.push(m?);
        }
        if metas.is_empty() || metas_rid.is_empty() {
            return Ok(());
        }

        let mut puts: Vec<BatchPut> = Vec::new();
        let mut frees: Vec<PhysicalKey> = Vec::new();
        let mut new_metas: Vec<ChunkMetadata> = Vec::new();
        let mut new_rid_metas: Vec<ChunkMetadata> = Vec::new();

        let mut i = 0usize;
        while i < metas.len() {
            let sz = metas[i].serialized_bytes as usize;
            if sz >= MIN_CHUNK_BYTES {
                // Large enough already; keep the chunk as-is.
                new_metas.push(metas[i]);
                new_rid_metas.push(metas_rid[i]);
                i += 1;
                continue;
            }

            // Grow a run [i, j) of undersized chunks to merge, bounded by
            // MAX_MERGE_RUN_BYTES per pass.
            let mut j = i;
            let mut run_bytes = 0usize;
            while j < metas.len() {
                let b = metas[j].serialized_bytes as usize;
                if b >= TARGET_CHUNK_BYTES {
                    break;
                }
                if run_bytes + b > MAX_MERGE_RUN_BYTES {
                    break;
                }
                run_bytes += b;
                j += 1;
            }
            if j == i + 1 {
                // A lone undersized chunk has nothing to merge with in this
                // pass; rewriting it alone would be pure churn.
                new_metas.push(metas[i]);
                new_rid_metas.push(metas_rid[i]);
                i += 1;
                continue;
            }

            let mut gets = Vec::with_capacity((j - i) * 2);
            for k in i..j {
                gets.push(BatchGet::Raw {
                    key: metas[k].chunk_pk,
                });
                gets.push(BatchGet::Raw {
                    key: metas_rid[k].chunk_pk,
                });
            }
            let results = self.pager.batch_get(&gets)?;
            let mut by_pk: FxHashMap<PhysicalKey, EntryHandle> = FxHashMap::default();
            for r in results {
                match r {
                    GetResult::Raw { key, bytes } => {
                        by_pk.insert(key, bytes);
                    }
                    _ => return Err(Error::NotFound),
                }
            }
            let mut data_parts = Vec::with_capacity(j - i);
            let mut rid_parts = Vec::with_capacity(j - i);
            for k in i..j {
                let db = by_pk.get(&metas[k].chunk_pk).ok_or(Error::NotFound)?;
                data_parts.push(deserialize_array(db.clone())?);
                let rb = by_pk.get(&metas_rid[k].chunk_pk).ok_or(Error::NotFound)?;
                rid_parts.push(deserialize_array(rb.clone())?);
            }
            let merged_data = concat_many(data_parts.iter().collect())?;
            let merged_rid_any = concat_many(rid_parts.iter().collect())?;

            let slices = split_to_target_bytes(
                &merged_data,
                TARGET_CHUNK_BYTES,
                self.cfg.varwidth_fallback_rows_per_slice,
            );
            let mut rid_off = 0usize;
            let need_perms = metas[i..j].iter().any(|m| m.value_order_perm_pk != 0);

            for s in slices {
                let rows = s.len();
                let rid_ref: ArrayRef = merged_rid_any.slice(rid_off, rows);
                let rid_norm = zero_offset(&rid_ref);
                let rid_pk = self.pager.alloc_many(1)?[0];
                let rid_bytes = serialize_array(rid_norm.as_ref())?;

                let data_pk = self.pager.alloc_many(1)?[0];
                let s_norm = zero_offset(&s);
                let data_bytes = serialize_array(s_norm.as_ref())?;
                puts.push(BatchPut::Raw {
                    key: data_pk,
                    bytes: data_bytes,
                });
                puts.push(BatchPut::Raw {
                    key: rid_pk,
                    bytes: rid_bytes,
                });
                let mut meta = ChunkMetadata {
                    chunk_pk: data_pk,
                    row_count: rows as u64,
                    serialized_bytes: s_norm.get_array_memory_size() as u64,
                    max_val_u64: u64::MAX,
                    ..Default::default()
                };
                if need_perms {
                    let sort_col = SortColumn {
                        values: s.clone(),
                        options: None,
                    };
                    let idx = lexsort_to_indices(&[sort_col], None)?;
                    let perm_bytes = serialize_array(&idx)?;
                    let perm_pk = self.pager.alloc_many(1)?[0];
                    puts.push(BatchPut::Raw {
                        key: perm_pk,
                        bytes: perm_bytes,
                    });
                    meta.value_order_perm_pk = perm_pk;
                }

                // Track min/max and sortedness of the row ids in one pass.
                let rid_any = rid_norm.clone();
                let rids = rid_any
                    .as_any()
                    .downcast_ref::<UInt64Array>()
                    .ok_or_else(|| Error::Internal("rid downcast".into()))?;
                let mut min = u64::MAX;
                let mut max = 0u64;
                let mut sorted_rids = true;
                let mut last_v = 0u64;
                for ii in 0..rids.len() {
                    let v = rids.value(ii);
                    if ii == 0 {
                        last_v = v;
                    } else if v < last_v {
                        sorted_rids = false;
                    } else {
                        last_v = v;
                    }
                    if v < min {
                        min = v;
                    }
                    if v > max {
                        max = v;
                    }
                }
                let mut rid_perm_pk = 0u64;
                if !sorted_rids {
                    let rid_sort_col = SortColumn {
                        values: rid_any,
                        options: None,
                    };
                    let rid_idx = lexsort_to_indices(&[rid_sort_col], None)?;
                    let rid_perm_bytes = serialize_array(&rid_idx)?;
                    rid_perm_pk = self.pager.alloc_many(1)?[0];
                    puts.push(BatchPut::Raw {
                        key: rid_perm_pk,
                        bytes: rid_perm_bytes,
                    });
                }
                let rid_meta = ChunkMetadata {
                    chunk_pk: rid_pk,
                    value_order_perm_pk: rid_perm_pk,
                    row_count: rows as u64,
                    serialized_bytes: rid_norm.get_array_memory_size() as u64,
                    min_val_u64: if rows > 0 { min } else { 0 },
                    max_val_u64: if rows > 0 { max } else { 0 },
                };
                new_metas.push(meta);
                new_rid_metas.push(rid_meta);
                rid_off += rows;
            }

            // Free the replaced chunks and any permutations they owned.
            for k in i..j {
                frees.push(metas[k].chunk_pk);
                if metas[k].value_order_perm_pk != 0 {
                    frees.push(metas[k].value_order_perm_pk);
                }
                frees.push(metas_rid[k].chunk_pk);
                if metas_rid[k].value_order_perm_pk != 0 {
                    frees.push(metas_rid[k].value_order_perm_pk);
                }
            }

            i = j;
        }

        if new_metas.is_empty() {
            // Nothing left: drop both columns from the catalog entirely.
            self.write_descriptor_chain(desc_pk, &mut desc, &[], &mut puts, &mut frees)?;
            self.write_descriptor_chain(desc_pk_rid, &mut desc_rid, &[], &mut puts, &mut frees)?;
            catalog.map.remove(&field_id);
            catalog.map.remove(&rid_fid);
            puts.push(BatchPut::Raw {
                key: CATALOG_ROOT_PKEY,
                bytes: catalog.to_bytes(),
            });
            if !puts.is_empty() {
                self.pager.batch_put(&puts)?;
            }
            if !frees.is_empty() {
                self.pager.free_many(&frees)?;
            }
            return Ok(());
        }

        self.write_descriptor_chain(desc_pk, &mut desc, &new_metas, &mut puts, &mut frees)?;
        self.write_descriptor_chain(
            desc_pk_rid,
            &mut desc_rid,
            &new_rid_metas,
            &mut puts,
            &mut frees,
        )?;
        if !puts.is_empty() {
            self.pager.batch_put(&puts)?;
        }
        if !frees.is_empty() {
            self.pager.free_many(&frees)?;
        }

        Ok(())
    }

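    /// Appends one [`ChunkMetadata`] entry to the in-memory tail page,
    /// spilling to a freshly allocated tail page when the current page is
    /// full, and keeps the descriptor's running totals in sync.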
    fn append_meta_in_loop(
        &self,
        descriptor: &mut ColumnDescriptor,
        tail_page_bytes: &mut Vec<u8>,
        meta: ChunkMetadata,
        puts: &mut Vec<BatchPut>,
    ) -> Result<()> {
        let mut header = DescriptorPageHeader::from_le_bytes(
            &tail_page_bytes[..DescriptorPageHeader::DISK_SIZE],
        );
        if tail_page_bytes.len() + ChunkMetadata::DISK_SIZE <= TARGET_DESCRIPTOR_PAGE_BYTES
            && (header.entry_count as usize) < DESCRIPTOR_ENTRIES_PER_PAGE
        {
            // Room left: append the entry to the current tail page in memory.
            tail_page_bytes.extend_from_slice(&meta.to_le_bytes());
            header.entry_count += 1;
            tail_page_bytes[..DescriptorPageHeader::DISK_SIZE]
                .copy_from_slice(&header.to_le_bytes());
        } else {
            // Tail page is full: link it to a new page, flush it, and start
            // the new tail with this entry.
            let new_tail_pk = self.pager.alloc_many(1)?[0];
            header.next_page_pk = new_tail_pk;
            tail_page_bytes[..DescriptorPageHeader::DISK_SIZE]
                .copy_from_slice(&header.to_le_bytes());
            let full_page_to_write = std::mem::take(tail_page_bytes);
            puts.push(BatchPut::Raw {
                key: descriptor.tail_page_pk,
                bytes: full_page_to_write,
            });
            let new_header = DescriptorPageHeader {
                next_page_pk: 0,
                entry_count: 1,
                _padding: [0; 4],
            };
            let mut new_page_bytes = new_header.to_le_bytes().to_vec();
            new_page_bytes.extend_from_slice(&meta.to_le_bytes());
            descriptor.tail_page_pk = new_tail_pk;
            *tail_page_bytes = new_page_bytes;
        }

        descriptor.total_row_count += meta.row_count;
        descriptor.total_chunk_count += 1;
        Ok(())
    }

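    /// Walks every descriptor chain and verifies that the persisted totals
    /// match the per-chunk metadata. Reads all descriptor pages but no chunk
    /// data; intended for tests and debugging.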
    pub fn verify_integrity(&self) -> Result<()> {
        let catalog = self.catalog.read().unwrap();
        for (&field_id, &descriptor_pk) in &catalog.map {
            let desc_blob = self
                .pager
                .batch_get(&[BatchGet::Raw { key: descriptor_pk }])?
                .pop()
                .and_then(|r| match r {
                    GetResult::Raw { bytes, .. } => Some(bytes),
                    _ => None,
                })
                .ok_or_else(|| {
                    Error::Internal(format!(
                        "Catalog points to missing descriptor pk={}",
                        descriptor_pk
                    ))
                })?;
            let descriptor = ColumnDescriptor::from_le_bytes(desc_blob.as_ref());
            if descriptor.field_id != field_id {
                return Err(Error::Internal(format!(
                    "Descriptor at pk={} has wrong field_id: expected {:?}, got {:?}",
                    descriptor_pk, field_id, descriptor.field_id
                )));
            }

            let mut actual_rows = 0;
            let mut actual_chunks = 0;
            let mut current_page_pk = descriptor.head_page_pk;
            while current_page_pk != 0 {
                let page_blob = self
                    .pager
                    .batch_get(&[BatchGet::Raw {
                        key: current_page_pk,
                    }])?
                    .pop()
                    .and_then(|r| match r {
                        GetResult::Raw { bytes, .. } => Some(bytes),
                        _ => None,
                    })
                    .ok_or_else(|| {
                        Error::Internal(format!(
                            "Descriptor page chain broken at pk={}",
                            current_page_pk
                        ))
                    })?;
                let header = DescriptorPageHeader::from_le_bytes(
                    &page_blob.as_ref()[..DescriptorPageHeader::DISK_SIZE],
                );
                for i in 0..(header.entry_count as usize) {
                    let off = DescriptorPageHeader::DISK_SIZE + i * ChunkMetadata::DISK_SIZE;
                    let end = off + ChunkMetadata::DISK_SIZE;
                    let meta = ChunkMetadata::from_le_bytes(&page_blob.as_ref()[off..end]);
                    actual_rows += meta.row_count;
                    actual_chunks += 1;
                }
                current_page_pk = header.next_page_pk;
            }

            if descriptor.total_row_count != actual_rows {
                return Err(Error::Internal(format!(
                    "Row count mismatch for field {:?}: descriptor says {}, actual is {}",
                    field_id, descriptor.total_row_count, actual_rows
                )));
            }
            if descriptor.total_chunk_count != actual_chunks {
                return Err(Error::Internal(format!(
                    "Chunk count mismatch for field {:?}: descriptor says {}, actual is {}",
                    field_id, descriptor.total_chunk_count, actual_chunks
                )));
            }
        }
        Ok(())
    }

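    /// Collects per-column layout statistics (row and chunk totals plus one
    /// entry per descriptor page), e.g. for inspecting chunk layout.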
    pub fn get_layout_stats(&self) -> Result<Vec<ColumnLayoutStats>> {
        let catalog = self.catalog.read().unwrap();
        let mut all_stats = Vec::new();

        for (&field_id, &descriptor_pk) in &catalog.map {
            let desc_blob = self
                .pager
                .batch_get(&[BatchGet::Raw { key: descriptor_pk }])?
                .pop()
                .and_then(|r| match r {
                    GetResult::Raw { bytes, .. } => Some(bytes),
                    _ => None,
                })
                .ok_or(Error::NotFound)?;
            let descriptor = ColumnDescriptor::from_le_bytes(desc_blob.as_ref());

            let mut page_stats = Vec::new();
            let mut current_page_pk = descriptor.head_page_pk;
            while current_page_pk != 0 {
                let page_blob = self
                    .pager
                    .batch_get(&[BatchGet::Raw {
                        key: current_page_pk,
                    }])?
                    .pop()
                    .and_then(|r| match r {
                        GetResult::Raw { bytes, .. } => Some(bytes),
                        _ => None,
                    })
                    .ok_or(Error::NotFound)?;
                let header = DescriptorPageHeader::from_le_bytes(
                    &page_blob.as_ref()[..DescriptorPageHeader::DISK_SIZE],
                );
                page_stats.push(DescriptorPageStats {
                    page_pk: current_page_pk,
                    entry_count: header.entry_count,
                    page_size_bytes: page_blob.as_ref().len(),
                });
                current_page_pk = header.next_page_pk;
            }

            all_stats.push(ColumnLayoutStats {
                field_id,
                total_rows: descriptor.total_row_count,
                total_chunks: descriptor.total_chunk_count,
                pages: page_stats,
            });
        }
        Ok(all_stats)
    }
}