use super::*;
use crate::store::catalog::ColumnCatalog;
use crate::store::descriptor::{
    ChunkMetadata, ColumnDescriptor, DescriptorIterator, DescriptorPageHeader,
};
use crate::store::scan::filter::FilterDispatch;
use crate::store::scan::{FilterPrimitive, FilterResult};
use crate::types::{LogicalFieldId, RowId, TableId};
use arrow::array::{Array, ArrayRef, BooleanArray, UInt32Array, UInt64Array};
use arrow::compute::{self, SortColumn, lexsort_to_indices};
use arrow::datatypes::DataType;
use arrow::record_batch::RecordBatch;
use llkv_expr::typed_predicate::Predicate;
use llkv_result::{Error, Result};
use llkv_storage::{
    constants::CATALOG_ROOT_PKEY,
    pager::{BatchGet, BatchPut, GetResult, Pager},
    serialization::{deserialize_array, serialize_array},
    types::PhysicalKey,
};

use rustc_hash::{FxHashMap, FxHashSet};
use simd_r_drive_entry_handle::EntryHandle;
use std::sync::{Arc, RwLock};

pub struct ColumnStore<P: Pager> {
    pub(crate) pager: Arc<P>,
    pub(crate) catalog: Arc<RwLock<ColumnCatalog>>,
    cfg: ColumnStoreConfig,
    dtype_cache: DTypeCache<P>,
    index_manager: IndexManager<P>,
}

impl<P> ColumnStore<P>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
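    /// Opens a `ColumnStore` backed by `pager`, loading the persisted
    /// `ColumnCatalog` from `CATALOG_ROOT_PKEY` when one exists and starting
    /// from an empty catalog otherwise.
    ///
    /// A minimal usage sketch; `MemPager` is an illustrative pager type
    /// assumed here, not something defined in this module:
    ///
    /// ```ignore
    /// let store = ColumnStore::open(Arc::new(MemPager::default()))?;
    /// ```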
    pub fn open(pager: Arc<P>) -> Result<Self> {
        let cfg = ColumnStoreConfig::default();
        let catalog = match pager
            .batch_get(&[BatchGet::Raw {
                key: CATALOG_ROOT_PKEY,
            }])?
            .pop()
        {
            Some(GetResult::Raw { bytes, .. }) => ColumnCatalog::from_bytes(bytes.as_ref())?,
            _ => ColumnCatalog::default(),
        };
        let arc_catalog = Arc::new(RwLock::new(catalog));

        let index_manager = IndexManager::new(Arc::clone(&pager));

        Ok(Self {
            pager: Arc::clone(&pager),
            catalog: Arc::clone(&arc_catalog),
            cfg,
            dtype_cache: DTypeCache::new(Arc::clone(&pager), Arc::clone(&arc_catalog)),
            index_manager,
        })
    }

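    /// Registers a persisted index of the given `IndexKind` for `field_id`,
    /// delegating to the `IndexManager`.
    ///
    /// Sketch; the field id and index kind below are illustrative:
    ///
    /// ```ignore
    /// store.register_index(LogicalFieldId::from(42u64), IndexKind::Presence)?;
    /// ```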
    pub fn register_index(&self, field_id: LogicalFieldId, kind: IndexKind) -> Result<()> {
        self.index_manager.register_index(self, field_id, kind)
    }

    pub fn unregister_index(&self, field_id: LogicalFieldId, kind: IndexKind) -> Result<()> {
        self.index_manager.unregister_index(self, field_id, kind)
    }

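    /// Returns the Arrow `DataType` stored for `field_id`, answering from the
    /// in-memory dtype cache when possible and falling back to the persisted
    /// descriptor otherwise.
    ///
    /// ```ignore
    /// let dt = store.data_type(field_id)?;
    /// ```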
    pub fn data_type(&self, field_id: LogicalFieldId) -> Result<DataType> {
        if let Some(dt) = self.dtype_cache.cached_data_type(field_id) {
            return Ok(dt);
        }
        self.dtype_cache.dtype_for_field(field_id)
    }

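    /// Evaluates `predicate` against every chunk of `field_id` and returns the
    /// matching row ids. The concrete filtering strategy is chosen by the
    /// `FilterDispatch` implementation selected through `T`; see
    /// `filter_matches` below for the closure-based variant that returns a
    /// `FilterResult` instead of raw row ids.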
    pub fn filter_row_ids<T>(
        &self,
        field_id: LogicalFieldId,
        predicate: &Predicate<T::Value>,
    ) -> Result<Vec<u64>>
    where
        T: FilterDispatch,
    {
        tracing::trace!(field=?field_id, "filter_row_ids start");
        let res = T::run_filter(self, field_id, predicate);
        if let Err(ref err) = res {
            tracing::trace!(field=?field_id, error=?err, "filter_row_ids error");
        } else {
            tracing::trace!(field=?field_id, "filter_row_ids ok");
        }
        res
    }

    pub fn filter_matches<T, F>(
        &self,
        field_id: LogicalFieldId,
        predicate: F,
    ) -> Result<FilterResult>
    where
        T: FilterPrimitive,
        F: FnMut(T::Native) -> bool,
    {
        T::run_filter_with_result(self, field_id, predicate)
    }

    pub fn list_persisted_indexes(&self, field_id: LogicalFieldId) -> Result<Vec<IndexKind>> {
        let catalog = self.catalog.read().unwrap();
        let descriptor_pk = *catalog.map.get(&field_id).ok_or(Error::NotFound)?;

        let desc_blob = self
            .pager
            .batch_get(&[BatchGet::Raw { key: descriptor_pk }])?
            .pop()
            .and_then(|r| match r {
                GetResult::Raw { bytes, .. } => Some(bytes),
                _ => None,
            })
            .ok_or(Error::NotFound)?;
        let descriptor = ColumnDescriptor::from_le_bytes(desc_blob.as_ref());

        let kinds = descriptor.get_indexes()?;
        Ok(kinds)
    }

    pub fn total_rows_for_field(&self, field_id: LogicalFieldId) -> Result<u64> {
        let catalog = self.catalog.read().unwrap();
        let desc_pk = *catalog.map.get(&field_id).ok_or(Error::NotFound)?;
        drop(catalog);

        let desc_blob = self
            .pager
            .batch_get(&[BatchGet::Raw { key: desc_pk }])?
            .pop()
            .and_then(|r| match r {
                GetResult::Raw { bytes, .. } => Some(bytes),
                _ => None,
            })
            .ok_or(Error::NotFound)?;

        let desc = ColumnDescriptor::from_le_bytes(desc_blob.as_ref());
        Ok(desc.total_row_count)
    }

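    /// Returns the row count for a table, computed as the maximum
    /// `total_row_count` over the table's user-data columns (columns can have
    /// different lengths when some fields are sparsely populated).
    ///
    /// Sketch, given some existing `table_id`:
    ///
    /// ```ignore
    /// let rows = store.total_rows_for_table(table_id)?;
    /// ```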
    pub fn total_rows_for_table(&self, table_id: crate::types::TableId) -> Result<u64> {
        use crate::types::Namespace;
        let catalog = self.catalog.read().unwrap();
        let candidates: Vec<LogicalFieldId> = catalog
            .map
            .keys()
            .filter(|fid| fid.namespace() == Namespace::UserData && fid.table_id() == table_id)
            .copied()
            .collect();
        drop(catalog);

        if candidates.is_empty() {
            return Ok(0);
        }

        let mut max_rows: u64 = 0;
        for field in candidates {
            let rows = self.total_rows_for_field(field)?;
            if rows > max_rows {
                max_rows = rows;
            }
        }
        Ok(max_rows)
    }

    pub fn user_field_ids_for_table(&self, table_id: crate::types::TableId) -> Vec<LogicalFieldId> {
        use crate::types::Namespace;

        let catalog = self.catalog.read().unwrap();
        catalog
            .map
            .keys()
            .filter(|fid| fid.namespace() == Namespace::UserData && fid.table_id() == table_id)
            .copied()
            .collect()
    }

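    /// Reports whether `row_id` has a value in the row-id shadow column of
    /// `field_id`. Chunks whose min/max row-id metadata excludes `row_id` are
    /// skipped; the remaining chunks are binary-searched, using the persisted
    /// value-order permutation when one exists.
    ///
    /// ```ignore
    /// if store.has_row_id(field_id, 7)? {
    ///     // row 7 has a value for this column
    /// }
    /// ```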
    pub fn has_row_id(&self, field_id: LogicalFieldId, row_id: RowId) -> Result<bool> {
        let rid_fid = rowid_fid(field_id);
        let catalog = self.catalog.read().unwrap();
        let rid_desc_pk = *catalog.map.get(&rid_fid).ok_or(Error::NotFound)?;
        let rid_desc_blob = self
            .pager
            .batch_get(&[BatchGet::Raw { key: rid_desc_pk }])?
            .pop()
            .and_then(|r| match r {
                GetResult::Raw { bytes, .. } => Some(bytes),
                _ => None,
            })
            .ok_or(Error::NotFound)?;
        let rid_desc = ColumnDescriptor::from_le_bytes(rid_desc_blob.as_ref());
        drop(catalog);

        for m in DescriptorIterator::new(self.pager.as_ref(), rid_desc.head_page_pk) {
            let meta = m?;
            if meta.row_count == 0 {
                continue;
            }
            if (meta.min_val_u64 != 0 || meta.max_val_u64 != 0)
                && (row_id < meta.min_val_u64 || row_id > meta.max_val_u64)
            {
                continue;
            }
            let mut gets = vec![BatchGet::Raw { key: meta.chunk_pk }];
            if meta.value_order_perm_pk != 0 {
                gets.push(BatchGet::Raw {
                    key: meta.value_order_perm_pk,
                });
            }
            let results = self.pager.batch_get(&gets)?;
            let mut rid_blob: Option<EntryHandle> = None;
            let mut perm_blob: Option<EntryHandle> = None;
            for r in results {
                if let GetResult::Raw { key, bytes } = r {
                    if key == meta.chunk_pk {
                        rid_blob = Some(bytes);
                    } else if key == meta.value_order_perm_pk {
                        perm_blob = Some(bytes);
                    }
                }
            }
            let Some(rid_blob) = rid_blob else { continue };
            let rid_any = deserialize_array(rid_blob)?;
            let rids = rid_any
                .as_any()
                .downcast_ref::<UInt64Array>()
                .ok_or_else(|| Error::Internal("rid downcast".into()))?;
            if let Some(pblob) = perm_blob {
                let perm_any = deserialize_array(pblob)?;
                let perm = perm_any
                    .as_any()
                    .downcast_ref::<UInt32Array>()
                    .ok_or_else(|| Error::Internal("perm not u32".into()))?;
                let mut lo: isize = 0;
                let mut hi: isize = (perm.len() as isize) - 1;
                while lo <= hi {
                    let mid = ((lo + hi) >> 1) as usize;
                    let rid = rids.value(perm.value(mid) as usize);
                    if rid == row_id {
                        return Ok(true);
                    } else if rid < row_id {
                        lo = mid as isize + 1;
                    } else {
                        hi = mid as isize - 1;
                    }
                }
            } else {
                let mut lo: isize = 0;
                let mut hi: isize = (rids.len() as isize) - 1;
                while lo <= hi {
                    let mid = ((lo + hi) >> 1) as usize;
                    let rid = rids.value(mid);
                    if rid == row_id {
                        return Ok(true);
                    } else if rid < row_id {
                        lo = mid as isize + 1;
                    } else {
                        hi = mid as isize - 1;
                    }
                }
            }
        }
        Ok(false)
    }

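    /// Appends a `RecordBatch` whose schema carries a row-id column named
    /// `ROW_ID_COLUMN_NAME` plus data columns tagged with the
    /// `FIELD_ID_META_KEY` metadata entry. The batch is sorted by row id if
    /// needed, row ids that already exist are rewritten in place with
    /// last-writer-wins semantics, and the remaining rows are appended as new
    /// chunks.
    ///
    /// A minimal usage sketch; the schema construction is illustrative and
    /// assumes a single nullable `u64` data column registered under field id
    /// `42`:
    ///
    /// ```ignore
    /// use arrow::array::UInt64Array;
    /// use arrow::datatypes::{DataType, Field, Schema};
    /// use std::collections::HashMap;
    ///
    /// let row_ids = Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false);
    /// let data = Field::new("col", DataType::UInt64, true).with_metadata(HashMap::from([(
    ///     FIELD_ID_META_KEY.to_string(),
    ///     "42".to_string(),
    /// )]));
    /// let schema = Arc::new(Schema::new(vec![row_ids, data]));
    /// let batch = RecordBatch::try_new(
    ///     schema,
    ///     vec![
    ///         Arc::new(UInt64Array::from(vec![0, 1, 2])),
    ///         Arc::new(UInt64Array::from(vec![10, 20, 30])),
    ///     ],
    /// )?;
    /// store.append(&batch)?;
    /// ```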
    #[allow(unused_variables, unused_assignments)]
    pub fn append(&self, batch: &RecordBatch) -> Result<()> {
        // Ensure the batch is processed in row_id order; re-sort every column
        // if the incoming row ids are not already ascending.
        let working_batch: RecordBatch;
        let batch_ref = {
            let schema = batch.schema();
            let row_id_idx = schema
                .index_of(ROW_ID_COLUMN_NAME)
                .map_err(|_| Error::Internal("row_id column required".into()))?;
            let row_id_any = batch.column(row_id_idx).clone();
            let row_id_arr = row_id_any
                .as_any()
                .downcast_ref::<UInt64Array>()
                .ok_or_else(|| Error::Internal("row_id downcast failed".into()))?;

            let mut is_sorted = true;
            if !row_id_arr.is_empty() {
                let mut last = row_id_arr.value(0);
                for i in 1..row_id_arr.len() {
                    let current = row_id_arr.value(i);
                    if current < last {
                        is_sorted = false;
                        break;
                    }
                    last = current;
                }
            }

            if is_sorted {
                batch
            } else {
                let sort_col = SortColumn {
                    values: row_id_any,
                    options: None,
                };
                let idx = lexsort_to_indices(&[sort_col], None)?;
                let perm = idx
                    .as_any()
                    .downcast_ref::<UInt32Array>()
                    .ok_or_else(|| Error::Internal("perm not u32".into()))?;
                let mut cols: Vec<ArrayRef> = Vec::with_capacity(batch.num_columns());
                for i in 0..batch.num_columns() {
                    cols.push(compute::take(batch.column(i), perm, None)?);
                }
                working_batch = RecordBatch::try_new(schema.clone(), cols)
                    .map_err(|e| Error::Internal(format!("record batch rebuild: {e}")))?;
                &working_batch
            }
        };

        let schema = batch_ref.schema();
        let row_id_idx = schema
            .index_of(ROW_ID_COLUMN_NAME)
            .map_err(|_| Error::Internal("row_id column required".into()))?;

        // Map each incoming row id to its position in the (possibly re-sorted) batch.
        let row_id_arr = batch_ref
            .column(row_id_idx)
            .as_any()
            .downcast_ref::<UInt64Array>()
            .ok_or_else(|| Error::Internal("row_id downcast failed".into()))?;
        let mut incoming_ids_map = FxHashMap::default();
        incoming_ids_map.reserve(row_id_arr.len());
        for i in 0..row_id_arr.len() {
            incoming_ids_map.insert(row_id_arr.value(i), i);
        }

        let mut catalog_dirty = false;
        let mut puts_rewrites: Vec<BatchPut> = Vec::new();
        let mut all_rewritten_ids = FxHashSet::default();

        // First pass: last-writer-wins rewrites of chunks that already contain
        // any of the incoming row ids.
        let mut catalog_lock = self.catalog.write().unwrap();
        for i in 0..batch_ref.num_columns() {
            if i == row_id_idx {
                continue;
            }
            let field = schema.field(i);
            if let Some(field_id_str) = field.metadata().get(crate::store::FIELD_ID_META_KEY) {
                let field_id = field_id_str
                    .parse::<u64>()
                    .map(LogicalFieldId::from)
                    .map_err(|e| Error::Internal(format!("Invalid field_id: {}", e)))?;

                let rewritten = self.lww_rewrite_for_field(
                    &mut catalog_lock,
                    field_id,
                    &incoming_ids_map,
                    batch_ref.column(i),
                    batch_ref.column(row_id_idx),
                    &mut puts_rewrites,
                )?;
                all_rewritten_ids.extend(rewritten);
            }
        }
        drop(catalog_lock);

        if !puts_rewrites.is_empty() {
            self.pager.batch_put(&puts_rewrites)?;
        }

        // Rows handled by the in-place rewrites are dropped; only genuinely
        // new row ids continue to the append path.
        let batch_to_append = if !all_rewritten_ids.is_empty() {
            let keep_mask: Vec<bool> = (0..row_id_arr.len())
                .map(|i| !all_rewritten_ids.contains(&row_id_arr.value(i)))
                .collect();
            let keep_array = BooleanArray::from(keep_mask);
            compute::filter_record_batch(batch_ref, &keep_array)?
        } else {
            batch_ref.clone()
        };

        if batch_to_append.num_rows() == 0 {
            return Ok(());
        }

        let append_schema = batch_to_append.schema();
        let append_row_id_idx = append_schema.index_of(ROW_ID_COLUMN_NAME)?;
        let append_row_id_any: ArrayRef = Arc::clone(batch_to_append.column(append_row_id_idx));
        let mut puts_appends: Vec<BatchPut> = Vec::new();

        // Second pass: append the remaining rows, one column at a time.
        for (i, array) in batch_to_append.columns().iter().enumerate() {
            if i == append_row_id_idx {
                continue;
            }

            let field = append_schema.field(i);

            let field_id = field
                .metadata()
                .get(crate::store::FIELD_ID_META_KEY)
                .ok_or_else(|| Error::Internal("Missing field_id".into()))?
                .parse::<u64>()
                .map(LogicalFieldId::from)
                .map_err(|e| Error::Internal(format!("Invalid field_id: {}", e)))?;

            self.dtype_cache.insert(field_id, field.data_type().clone());

            // Null values are not stored; strip them together with their row ids.
            let (array_clean, rids_clean) = if array.null_count() == 0 {
                (array.clone(), append_row_id_any.clone())
            } else {
                let keep =
                    BooleanArray::from_iter((0..array.len()).map(|j| Some(!array.is_null(j))));
                let a = compute::filter(array, &keep)?;
                let r = compute::filter(&append_row_id_any, &keep)?;
                (a, r)
            };

            if array_clean.is_empty() {
                continue;
            }

            let (descriptor_pk, rid_descriptor_pk, rid_fid) = {
                let mut catalog = self.catalog.write().unwrap();
                let pk1 = *catalog.map.entry(field_id).or_insert_with(|| {
                    catalog_dirty = true;
                    self.pager.alloc_many(1).unwrap()[0]
                });
                let r_fid = rowid_fid(field_id);
                let pk2 = *catalog.map.entry(r_fid).or_insert_with(|| {
                    catalog_dirty = true;
                    self.pager.alloc_many(1).unwrap()[0]
                });
                (pk1, pk2, r_fid)
            };

            let (mut data_descriptor, mut data_tail_page) =
                ColumnDescriptor::load_or_create(Arc::clone(&self.pager), descriptor_pk, field_id)?;
            let (mut rid_descriptor, mut rid_tail_page) = ColumnDescriptor::load_or_create(
                Arc::clone(&self.pager),
                rid_descriptor_pk,
                rid_fid,
            )?;

            self.index_manager
                .stage_index_registration(&mut data_descriptor, IndexKind::Presence)?;

            let slices = split_to_target_bytes(
                &array_clean,
                TARGET_CHUNK_BYTES,
                self.cfg.varwidth_fallback_rows_per_slice,
            );
            let mut row_off = 0usize;

            for s in slices {
                let rows = s.len();
                let data_pk = self.pager.alloc_many(1)?[0];
                let s_norm = zero_offset(&s);
                let data_bytes = serialize_array(s_norm.as_ref())?;
                puts_appends.push(BatchPut::Raw {
                    key: data_pk,
                    bytes: data_bytes,
                });

                let rid_slice: ArrayRef = rids_clean.slice(row_off, rows);
                let rid_norm = zero_offset(&rid_slice);
                let rid_pk = self.pager.alloc_many(1)?[0];
                let rid_bytes = serialize_array(rid_norm.as_ref())?;
                puts_appends.push(BatchPut::Raw {
                    key: rid_pk,
                    bytes: rid_bytes,
                });

                let rids_for_meta = rid_norm.as_any().downcast_ref::<UInt64Array>().unwrap();
                let (min, max) = if !rids_for_meta.is_empty() {
                    let mut min_val = rids_for_meta.value(0);
                    let mut max_val = rids_for_meta.value(0);
                    for i in 1..rids_for_meta.len() {
                        let v = rids_for_meta.value(i);
                        if v < min_val {
                            min_val = v;
                        }
                        if v > max_val {
                            max_val = v;
                        }
                    }
                    (min_val, max_val)
                } else {
                    (0, 0)
                };

                let mut data_meta = ChunkMetadata {
                    chunk_pk: data_pk,
                    row_count: rows as u64,
                    serialized_bytes: s_norm.get_array_memory_size() as u64,
                    max_val_u64: u64::MAX,
                    ..Default::default()
                };
                let mut rid_meta = ChunkMetadata {
                    chunk_pk: rid_pk,
                    row_count: rows as u64,
                    serialized_bytes: rid_norm.get_array_memory_size() as u64,
                    min_val_u64: min,
                    max_val_u64: max,
                    ..Default::default()
                };

                self.index_manager.stage_updates_for_new_chunk(
                    field_id,
                    &data_descriptor,
                    &s_norm,
                    &rid_norm,
                    &mut data_meta,
                    &mut rid_meta,
                    &mut puts_appends,
                )?;

                self.append_meta_in_loop(
                    &mut data_descriptor,
                    &mut data_tail_page,
                    data_meta,
                    &mut puts_appends,
                )?;
                self.append_meta_in_loop(
                    &mut rid_descriptor,
                    &mut rid_tail_page,
                    rid_meta,
                    &mut puts_appends,
                )?;
                row_off += rows;
            }

            puts_appends.push(BatchPut::Raw {
                key: data_descriptor.tail_page_pk,
                bytes: data_tail_page,
            });
            puts_appends.push(BatchPut::Raw {
                key: descriptor_pk,
                bytes: data_descriptor.to_le_bytes(),
            });
            puts_appends.push(BatchPut::Raw {
                key: rid_descriptor.tail_page_pk,
                bytes: rid_tail_page,
            });
            puts_appends.push(BatchPut::Raw {
                key: rid_descriptor_pk,
                bytes: rid_descriptor.to_le_bytes(),
            });
        }

        // Persist the catalog only when new columns were registered above.
        if catalog_dirty {
            let catalog = self.catalog.read().unwrap();
            puts_appends.push(BatchPut::Raw {
                key: CATALOG_ROOT_PKEY,
                bytes: catalog.to_bytes(),
            });
        }

        if !puts_appends.is_empty() {
            self.pager.batch_put(&puts_appends)?;
        }
        Ok(())
    }

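    /// Applies last-writer-wins semantics for one column: any incoming row id
    /// that already exists in a persisted chunk is either overwritten in place
    /// (non-null incoming value) or removed (null incoming value). Returns the
    /// set of row ids handled here so `append` can exclude them from the
    /// fresh-append path.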
    fn lww_rewrite_for_field(
        &self,
        catalog: &mut ColumnCatalog,
        field_id: LogicalFieldId,
        incoming_ids_map: &FxHashMap<u64, usize>,
        incoming_data: &ArrayRef,
        incoming_row_ids: &ArrayRef,
        puts: &mut Vec<BatchPut>,
    ) -> Result<FxHashSet<u64>> {
        use crate::store::descriptor::DescriptorIterator;
        use crate::store::ingest::ChunkEdit;

        if incoming_ids_map.is_empty() {
            return Ok(FxHashSet::default());
        }
        let incoming_ids: FxHashSet<u64> = incoming_ids_map.keys().copied().collect();

        let desc_pk_data = match catalog.map.get(&field_id) {
            Some(pk) => *pk,
            None => return Ok(FxHashSet::default()),
        };
        let rid_fid = rowid_fid(field_id);
        let desc_pk_rid = match catalog.map.get(&rid_fid) {
            Some(pk) => *pk,
            None => return Ok(FxHashSet::default()),
        };

        let gets = vec![
            BatchGet::Raw { key: desc_pk_data },
            BatchGet::Raw { key: desc_pk_rid },
        ];
        let results = self.pager.batch_get(&gets)?;
        let mut blobs_by_pk = FxHashMap::default();
        for r in results {
            if let GetResult::Raw { key, bytes } = r {
                blobs_by_pk.insert(key, bytes);
            }
        }

        let desc_blob_data = blobs_by_pk.remove(&desc_pk_data).ok_or(Error::NotFound)?;
        let mut descriptor_data = ColumnDescriptor::from_le_bytes(desc_blob_data.as_ref());

        let desc_blob_rid = blobs_by_pk.remove(&desc_pk_rid).ok_or(Error::NotFound)?;
        let mut descriptor_rid = ColumnDescriptor::from_le_bytes(desc_blob_rid.as_ref());

        let mut metas_data: Vec<ChunkMetadata> = Vec::new();
        let mut metas_rid: Vec<ChunkMetadata> = Vec::new();
        for m in DescriptorIterator::new(self.pager.as_ref(), descriptor_data.head_page_pk) {
            metas_data.push(m?);
        }
        for m in DescriptorIterator::new(self.pager.as_ref(), descriptor_rid.head_page_pk) {
            metas_rid.push(m?);
        }

        let rid_in = incoming_row_ids
            .as_any()
            .downcast_ref::<UInt64Array>()
            .ok_or_else(|| Error::Internal("row_id must be u64".into()))?;
        let mut ids_to_delete = FxHashSet::default();
        let mut ids_to_upsert = FxHashSet::default();
        for i in 0..rid_in.len() {
            let rid = rid_in.value(i);
            if incoming_data.is_null(i) {
                ids_to_delete.insert(rid);
            } else {
                ids_to_upsert.insert(rid);
            }
        }

        let mut rewritten_ids = FxHashSet::default();
        let mut hit_up: FxHashMap<usize, Vec<u64>> = FxHashMap::default();
        let mut hit_del: FxHashMap<usize, Vec<u64>> = FxHashMap::default();

        let n = metas_data.len().min(metas_rid.len());
        if n > 0 {
            let mut gets_rid = Vec::with_capacity(n);
            for rm in metas_rid.iter().take(n) {
                gets_rid.push(BatchGet::Raw { key: rm.chunk_pk });
            }
            let rid_results = self.pager.batch_get(&gets_rid)?;
            let mut rid_blobs: FxHashMap<PhysicalKey, EntryHandle> = FxHashMap::default();
            for r in rid_results {
                if let GetResult::Raw { key, bytes } = r {
                    rid_blobs.insert(key, bytes);
                }
            }

            for (i, meta_rid) in metas_rid.iter().enumerate().take(n) {
                if let Some(rid_blob) = rid_blobs.get(&meta_rid.chunk_pk) {
                    let rid_arr_any = deserialize_array(rid_blob.clone())?;
                    let rid_arr = rid_arr_any
                        .as_any()
                        .downcast_ref::<UInt64Array>()
                        .ok_or_else(|| Error::Internal("rid type mismatch".into()))?;
                    for j in 0..rid_arr.len() {
                        let rid = rid_arr.value(j);
                        if incoming_ids.contains(&rid) {
                            if ids_to_delete.contains(&rid) {
                                hit_del.entry(i).or_default().push(rid);
                            } else if ids_to_upsert.contains(&rid) {
                                hit_up.entry(i).or_default().push(rid);
                            }
                            rewritten_ids.insert(rid);
                        }
                    }
                }
            }
        }

        if hit_up.is_empty() && hit_del.is_empty() {
            return Ok(rewritten_ids);
        }

        let mut hit_set = FxHashSet::default();
        hit_set.extend(hit_up.keys().copied());
        hit_set.extend(hit_del.keys().copied());
        let hit_idxs: Vec<usize> = hit_set.into_iter().collect();

        let mut gets = Vec::with_capacity(hit_idxs.len() * 2);
        for &i in &hit_idxs {
            gets.push(BatchGet::Raw {
                key: metas_data[i].chunk_pk,
            });
            gets.push(BatchGet::Raw {
                key: metas_rid[i].chunk_pk,
            });
        }
        let results = self.pager.batch_get(&gets)?;
        let mut blob_map: FxHashMap<PhysicalKey, EntryHandle> = FxHashMap::default();
        for r in results {
            if let GetResult::Raw { key, bytes } = r {
                blob_map.insert(key, bytes);
            }
        }

        for i in hit_idxs {
            let old_data_arr =
                deserialize_array(blob_map.get(&metas_data[i].chunk_pk).unwrap().clone())?;
            let old_rid_arr_any =
                deserialize_array(blob_map.get(&metas_rid[i].chunk_pk).unwrap().clone())?;
            let old_rid_arr = old_rid_arr_any
                .as_any()
                .downcast_ref::<UInt64Array>()
                .unwrap();

            let up_vec = hit_up.remove(&i).unwrap_or_default();
            let del_vec = hit_del.remove(&i).unwrap_or_default();

            let edit = ChunkEdit::from_lww_upsert(
                old_rid_arr,
                &up_vec,
                &del_vec,
                incoming_data,
                incoming_row_ids,
                incoming_ids_map,
            )?;

            let (new_data_arr, new_rid_arr) =
                ChunkEdit::apply_edit_to_arrays(&old_data_arr, Some(&old_rid_arr_any), &edit)?;

            let data_bytes = serialize_array(&new_data_arr)?;
            puts.push(BatchPut::Raw {
                key: metas_data[i].chunk_pk,
                bytes: data_bytes,
            });
            metas_data[i].row_count = new_data_arr.len() as u64;
            metas_data[i].serialized_bytes = new_data_arr.get_array_memory_size() as u64;

            if let Some(rarr) = new_rid_arr {
                let rid_bytes = serialize_array(&rarr)?;
                puts.push(BatchPut::Raw {
                    key: metas_rid[i].chunk_pk,
                    bytes: rid_bytes,
                });
                metas_rid[i].row_count = rarr.len() as u64;
                metas_rid[i].serialized_bytes = rarr.get_array_memory_size() as u64;
            }

            if metas_data[i].value_order_perm_pk != 0 {
                let sort_col = SortColumn {
                    values: new_data_arr,
                    options: None,
                };
                let idx = lexsort_to_indices(&[sort_col], None)?;
                let perm_bytes = serialize_array(&idx)?;
                puts.push(BatchPut::Raw {
                    key: metas_data[i].value_order_perm_pk,
                    bytes: perm_bytes,
                });
            }
        }

        descriptor_data.rewrite_pages(
            Arc::clone(&self.pager),
            desc_pk_data,
            &mut metas_data,
            puts,
        )?;
        descriptor_rid.rewrite_pages(Arc::clone(&self.pager), desc_pk_rid, &mut metas_rid, puts)?;

        Ok(rewritten_ids)
    }

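    /// Stages the chunk and descriptor rewrites needed to delete the given
    /// rows from one column, without issuing the writes itself. The values in
    /// `rows_to_delete` must be ascending and unique; each one is resolved to
    /// a chunk by walking the cumulative per-chunk row counts. Returns `true`
    /// if any chunk was modified.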
    fn stage_delete_rows_for_field(
        &self,
        field_id: LogicalFieldId,
        rows_to_delete: &[RowId],
        staged_puts: &mut Vec<BatchPut>,
    ) -> Result<bool> {
        tracing::warn!(
            field_id = ?field_id,
            rows = rows_to_delete.len(),
            "delete_rows stage_delete_rows_for_field: start"
        );
        use crate::store::descriptor::DescriptorIterator;
        use crate::store::ingest::ChunkEdit;

        if rows_to_delete.is_empty() {
            return Ok(false);
        }

        let mut del_iter = rows_to_delete.iter().copied();
        let mut cur_del = del_iter.next();
        let mut last_seen: Option<u64> = cur_del;

        let catalog = self.catalog.read().unwrap();
        let desc_pk = match catalog.map.get(&field_id) {
            Some(pk) => *pk,
            None => {
                tracing::trace!(
                    field_id = ?field_id,
                    "delete_rows stage_delete_rows_for_field: data descriptor missing"
                );
                return Err(Error::NotFound);
            }
        };
        let rid_fid = rowid_fid(field_id);
        let desc_pk_rid = catalog.map.get(&rid_fid).copied();
        tracing::warn!(
            field_id = ?field_id,
            desc_pk,
            desc_pk_rid = ?desc_pk_rid,
            "delete_rows stage_delete_rows_for_field: descriptor keys"
        );

        let mut gets = vec![BatchGet::Raw { key: desc_pk }];
        if let Some(pk) = desc_pk_rid {
            gets.push(BatchGet::Raw { key: pk });
        }
        let results = match self.pager.batch_get(&gets) {
            Ok(res) => res,
            Err(err) => {
                tracing::trace!(
                    field_id = ?field_id,
                    error = ?err,
                    "delete_rows stage_delete_rows_for_field: descriptor batch_get failed"
                );
                return Err(err);
            }
        };
        let mut blobs_by_pk = FxHashMap::default();
        for res in results {
            if let GetResult::Raw { key, bytes } = res {
                blobs_by_pk.insert(key, bytes);
            }
        }

        tracing::warn!(
            field_id = ?field_id,
            desc_blob_found = blobs_by_pk.contains_key(&desc_pk),
            rid_blob_found = desc_pk_rid.map(|pk| blobs_by_pk.contains_key(&pk)),
            "delete_rows stage_delete_rows_for_field: descriptor fetch status"
        );

        let desc_blob = blobs_by_pk.remove(&desc_pk).ok_or_else(|| {
            tracing::trace!(
                field_id = ?field_id,
                desc_pk,
                "delete_rows stage_delete_rows_for_field: descriptor blob missing"
            );
            Error::Internal(format!(
                "descriptor pk={} missing during delete_rows for field {:?}",
                desc_pk, field_id
            ))
        })?;
        let mut descriptor = ColumnDescriptor::from_le_bytes(desc_blob.as_ref());

        let mut metas: Vec<ChunkMetadata> = Vec::new();
        for m in DescriptorIterator::new(self.pager.as_ref(), descriptor.head_page_pk) {
            metas.push(m?);
        }
        if metas.is_empty() {
            drop(catalog);
            return Ok(false);
        }

        let mut metas_rid: Vec<ChunkMetadata> = Vec::new();
        let mut descriptor_rid: Option<ColumnDescriptor> = None;
        tracing::warn!(
            field_id = ?field_id,
            metas_len = metas.len(),
            desc_pk_rid = ?desc_pk_rid,
            "delete_rows stage_delete_rows_for_field: data metas loaded"
        );
        if let Some(pk_rid) = desc_pk_rid
            && let Some(desc_blob_rid) = blobs_by_pk.remove(&pk_rid)
        {
            let d_rid = ColumnDescriptor::from_le_bytes(desc_blob_rid.as_ref());
            for m in DescriptorIterator::new(self.pager.as_ref(), d_rid.head_page_pk) {
                metas_rid.push(m?);
            }
            descriptor_rid = Some(d_rid);
        }

        tracing::warn!(
            field_id = ?field_id,
            metas_rid_len = metas_rid.len(),
            "delete_rows stage_delete_rows_for_field: rowid metas loaded"
        );

        let mut cum_rows = 0u64;
        let mut any_changed = false;

        for (i, meta) in metas.iter_mut().enumerate() {
            let start_u64 = cum_rows;
            let end_u64 = start_u64 + meta.row_count;

            while let Some(d) = cur_del {
                if d < start_u64
                    && let Some(prev) = last_seen
                {
                    if d < prev {
                        return Err(Error::Internal(
                            "rows_to_delete must be ascending/unique".into(),
                        ));
                    }

                    last_seen = Some(d);
                    cur_del = del_iter.next();
                } else {
                    break;
                }
            }

            let rows = meta.row_count as usize;
            let mut del_local: FxHashSet<usize> = FxHashSet::default();
            while let Some(d) = cur_del {
                if d >= end_u64 {
                    break;
                }
                del_local.insert((d - start_u64) as usize);
                last_seen = Some(d);
                cur_del = del_iter.next();
            }

            if del_local.is_empty() {
                cum_rows = end_u64;
                continue;
            }

            let mut chunk_gets = vec![BatchGet::Raw { key: meta.chunk_pk }];
            if let Some(rm) = metas_rid.get(i) {
                chunk_gets.push(BatchGet::Raw { key: rm.chunk_pk });
            }
            let chunk_results = match self.pager.batch_get(&chunk_gets) {
                Ok(res) => res,
                Err(err) => {
                    tracing::trace!(
                        field_id = ?field_id,
                        chunk_pk = meta.chunk_pk,
                        error = ?err,
                        "delete_rows stage_delete_rows_for_field: chunk batch_get failed"
                    );
                    return Err(err);
                }
            };
            let mut chunk_blobs = FxHashMap::default();
            for res in chunk_results {
                if let GetResult::Raw { key, bytes } = res {
                    chunk_blobs.insert(key, bytes);
                }
            }

            tracing::warn!(
                field_id = ?field_id,
                chunk_pk = meta.chunk_pk,
                rid_chunk_pk = metas_rid.get(i).map(|rm| rm.chunk_pk),
                data_found = chunk_blobs.contains_key(&meta.chunk_pk),
                rid_found = metas_rid
                    .get(i)
                    .map(|rm| chunk_blobs.contains_key(&rm.chunk_pk)),
                "delete_rows stage_delete_rows_for_field: chunk fetch status"
            );

            let data_blob = match chunk_blobs.remove(&meta.chunk_pk) {
                Some(bytes) => bytes,
                None => {
                    tracing::trace!(
                        field_id = ?field_id,
                        chunk_pk = meta.chunk_pk,
                        "delete_rows stage_delete_rows_for_field: chunk missing"
                    );
                    return Err(Error::NotFound);
                }
            };
            let data_arr = deserialize_array(data_blob)?;

            let rid_arr_any = if let Some(rm) = metas_rid.get(i) {
                let rid_blob = match chunk_blobs.remove(&rm.chunk_pk) {
                    Some(bytes) => bytes,
                    None => {
                        tracing::trace!(
                            field_id = ?field_id,
                            rowid_chunk_pk = rm.chunk_pk,
                            "delete_rows stage_delete_rows_for_field: rowid chunk missing"
                        );
                        return Err(Error::NotFound);
                    }
                };
                Some(deserialize_array(rid_blob)?)
            } else {
                None
            };

            let edit = ChunkEdit::from_delete_indices(rows, &del_local);

            let (new_data_arr, new_rid_arr) =
                ChunkEdit::apply_edit_to_arrays(&data_arr, rid_arr_any.as_ref(), &edit)?;

            let data_bytes = serialize_array(&new_data_arr)?;
            staged_puts.push(BatchPut::Raw {
                key: meta.chunk_pk,
                bytes: data_bytes,
            });
            meta.row_count = new_data_arr.len() as u64;
            meta.serialized_bytes = new_data_arr.get_array_memory_size() as u64;

            if let (Some(rm), Some(rids)) = (metas_rid.get_mut(i), new_rid_arr) {
                let rid_bytes = serialize_array(&rids)?;
                staged_puts.push(BatchPut::Raw {
                    key: rm.chunk_pk,
                    bytes: rid_bytes,
                });
                rm.row_count = rids.len() as u64;
                rm.serialized_bytes = rids.get_array_memory_size() as u64;
            }

            if meta.value_order_perm_pk != 0 {
                let sort_column = SortColumn {
                    values: new_data_arr,
                    options: None,
                };
                let indices = lexsort_to_indices(&[sort_column], None)?;
                let perm_bytes = serialize_array(&indices)?;
                staged_puts.push(BatchPut::Raw {
                    key: meta.value_order_perm_pk,
                    bytes: perm_bytes,
                });
            }

            cum_rows = end_u64;
            any_changed = true;
        }

        descriptor.rewrite_pages(Arc::clone(&self.pager), desc_pk, &mut metas, staged_puts)?;
        if let (Some(rid_pk), Some(mut rid_desc)) = (desc_pk_rid, descriptor_rid) {
            rid_desc.rewrite_pages(Arc::clone(&self.pager), rid_pk, &mut metas_rid, staged_puts)?;
        }
        drop(catalog);
        tracing::trace!(
            field_id = ?field_id,
            changed = any_changed,
            "delete_rows stage_delete_rows_for_field: finished stage"
        );
        Ok(any_changed)
    }

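    /// Deletes the given rows from every listed column, then runs bounded
    /// compaction on each column that actually changed. All fields must belong
    /// to the same table.
    ///
    /// ```ignore
    /// // Remove rows 3 and 7 from every user column of a table.
    /// let fields = store.user_field_ids_for_table(table_id);
    /// store.delete_rows(&fields, &[3, 7])?;
    /// ```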
    pub fn delete_rows(&self, fields: &[LogicalFieldId], rows_to_delete: &[RowId]) -> Result<()> {
        if fields.is_empty() || rows_to_delete.is_empty() {
            return Ok(());
        }

        let mut puts = Vec::new();
        let mut touched: FxHashSet<LogicalFieldId> = FxHashSet::default();
        let mut table_id: Option<TableId> = None;

        tracing::warn!(
            fields = fields.len(),
            rows = rows_to_delete.len(),
            "delete_rows begin"
        );
        for field_id in fields {
            tracing::warn!(field = ?field_id, "delete_rows iter field");
            if let Some(expected) = table_id {
                if field_id.table_id() != expected {
                    return Err(Error::InvalidArgumentError(
                        "delete_rows requires fields from the same table".into(),
                    ));
                }
            } else {
                table_id = Some(field_id.table_id());
            }

            if self.stage_delete_rows_for_field(*field_id, rows_to_delete, &mut puts)? {
                touched.insert(*field_id);
            }
        }

        if puts.is_empty() {
            return Ok(());
        }

        self.pager.batch_put(&puts)?;

        tracing::warn!(touched = touched.len(), "delete_rows apply writes");

        for field_id in touched {
            self.compact_field_bounded(field_id)?;
        }
        tracing::warn!("delete_rows complete");
        Ok(())
    }

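    /// Rewrites a column's descriptor page chain so that it contains exactly
    /// `new_metas`, reusing the existing head page when possible, allocating
    /// additional pages as needed, and queueing surplus old pages in `frees`.
    /// The descriptor's row and chunk totals are recomputed from `new_metas`.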
    pub(crate) fn write_descriptor_chain(
        &self,
        descriptor_pk: PhysicalKey,
        descriptor: &mut ColumnDescriptor,
        new_metas: &[ChunkMetadata],
        puts: &mut Vec<BatchPut>,
        frees: &mut Vec<PhysicalKey>,
    ) -> Result<()> {
        let mut old_pages = Vec::new();
        let mut pk = descriptor.head_page_pk;
        while pk != 0 {
            let page_blob = self
                .pager
                .batch_get(&[BatchGet::Raw { key: pk }])?
                .pop()
                .and_then(|res| match res {
                    GetResult::Raw { bytes, .. } => Some(bytes),
                    _ => None,
                })
                .ok_or(Error::NotFound)?;
            let header = DescriptorPageHeader::from_le_bytes(
                &page_blob.as_ref()[..DescriptorPageHeader::DISK_SIZE],
            );
            old_pages.push(pk);
            pk = header.next_page_pk;
        }

        let per = DESCRIPTOR_ENTRIES_PER_PAGE;
        let need_pages = if new_metas.is_empty() {
            0
        } else {
            new_metas.len().div_ceil(per)
        };
        if need_pages == 0 {
            frees.extend(old_pages.iter().copied());
            descriptor.total_row_count = 0;
            descriptor.total_chunk_count = 0;
            puts.push(BatchPut::Raw {
                key: descriptor_pk,
                bytes: descriptor.to_le_bytes(),
            });
            return Ok(());
        }

        let mut pages = Vec::with_capacity(need_pages);
        if !old_pages.is_empty() {
            pages.push(old_pages[0]);
        } else {
            pages.push(self.pager.alloc_many(1)?[0]);
            descriptor.head_page_pk = pages[0];
        }
        if need_pages > pages.len() {
            let extra = self.pager.alloc_many(need_pages - pages.len())?;
            pages.extend(extra);
        }

        if old_pages.len() > need_pages {
            frees.extend(old_pages[need_pages..].iter().copied());
        }

        let mut off = 0usize;
        for (i, page_pk) in pages.iter().copied().enumerate() {
            let remain = new_metas.len() - off;
            let count = remain.min(per);
            let next = if i + 1 < pages.len() { pages[i + 1] } else { 0 };
            let header = DescriptorPageHeader {
                next_page_pk: next,
                entry_count: count as u32,
                _padding: [0; 4],
            };
            let mut page_bytes = header.to_le_bytes().to_vec();
            for m in &new_metas[off..off + count] {
                page_bytes.extend_from_slice(&m.to_le_bytes());
            }
            puts.push(BatchPut::Raw {
                key: page_pk,
                bytes: page_bytes,
            });
            off += count;
        }

        descriptor.tail_page_pk = *pages.last().unwrap();
        descriptor.total_chunk_count = new_metas.len() as u64;
        descriptor.total_row_count = new_metas.iter().map(|m| m.row_count).sum();
        puts.push(BatchPut::Raw {
            key: descriptor_pk,
            bytes: descriptor.to_le_bytes(),
        });
        Ok(())
    }

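    /// Bounded compaction for one column after deletes: when a chunk falls
    /// below `MIN_CHUNK_BYTES`, it and its neighbors below `TARGET_CHUNK_BYTES`
    /// are merged (capped at `MAX_MERGE_RUN_BYTES` per run) and re-split
    /// toward `TARGET_CHUNK_BYTES`, permutations are rebuilt where needed, and
    /// both descriptor chains are rewritten. If the column ends up empty, its
    /// catalog entries are removed entirely.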
    fn compact_field_bounded(&self, field_id: LogicalFieldId) -> Result<()> {
        let mut catalog = self.catalog.write().unwrap();

        let desc_pk = match catalog.map.get(&field_id) {
            Some(&pk) => pk,
            None => return Ok(()),
        };
        let rid_fid = rowid_fid(field_id);
        let desc_pk_rid = match catalog.map.get(&rid_fid) {
            Some(&pk) => pk,
            None => return Ok(()),
        };

        let gets = vec![
            BatchGet::Raw { key: desc_pk },
            BatchGet::Raw { key: desc_pk_rid },
        ];
        let results = self.pager.batch_get(&gets)?;
        let mut blobs_by_pk = FxHashMap::default();
        for res in results {
            if let GetResult::Raw { key, bytes } = res {
                blobs_by_pk.insert(key, bytes);
            }
        }
        let desc_blob = blobs_by_pk.remove(&desc_pk).ok_or(Error::NotFound)?;
        let mut desc = ColumnDescriptor::from_le_bytes(desc_blob.as_ref());
        let desc_blob_rid = blobs_by_pk.remove(&desc_pk_rid).ok_or(Error::NotFound)?;
        let mut desc_rid = ColumnDescriptor::from_le_bytes(desc_blob_rid.as_ref());

        let mut metas = Vec::new();
        for m in DescriptorIterator::new(self.pager.as_ref(), desc.head_page_pk) {
            metas.push(m?);
        }
        let mut metas_rid = Vec::new();
        for m in DescriptorIterator::new(self.pager.as_ref(), desc_rid.head_page_pk) {
            metas_rid.push(m?);
        }
        if metas.is_empty() || metas_rid.is_empty() {
            return Ok(());
        }

        let mut puts: Vec<BatchPut> = Vec::new();
        let mut frees: Vec<PhysicalKey> = Vec::new();
        let mut new_metas: Vec<ChunkMetadata> = Vec::new();
        let mut new_rid_metas: Vec<ChunkMetadata> = Vec::new();

        let mut i = 0usize;
        while i < metas.len() {
            let sz = metas[i].serialized_bytes as usize;
            if sz >= MIN_CHUNK_BYTES {
                new_metas.push(metas[i]);
                new_rid_metas.push(metas_rid[i]);
                i += 1;
                continue;
            }

            let mut j = i;
            let mut run_bytes = 0usize;
            while j < metas.len() {
                let b = metas[j].serialized_bytes as usize;
                if b >= TARGET_CHUNK_BYTES {
                    break;
                }
                if run_bytes + b > MAX_MERGE_RUN_BYTES {
                    break;
                }
                run_bytes += b;
                j += 1;
            }
            if j == i + 1 && sz >= MIN_CHUNK_BYTES {
                new_metas.push(metas[i]);
                new_rid_metas.push(metas_rid[i]);
                i += 1;
                continue;
            }

            let mut gets = Vec::with_capacity((j - i) * 2);
            for k in i..j {
                gets.push(BatchGet::Raw {
                    key: metas[k].chunk_pk,
                });
                gets.push(BatchGet::Raw {
                    key: metas_rid[k].chunk_pk,
                });
            }
            let results = self.pager.batch_get(&gets)?;
            let mut by_pk: FxHashMap<PhysicalKey, EntryHandle> = FxHashMap::default();
            for r in results {
                match r {
                    GetResult::Raw { key, bytes } => {
                        by_pk.insert(key, bytes);
                    }
                    _ => return Err(Error::NotFound),
                }
            }
            let mut data_parts = Vec::with_capacity(j - i);
            let mut rid_parts = Vec::with_capacity(j - i);
            for k in i..j {
                let db = by_pk.get(&metas[k].chunk_pk).ok_or(Error::NotFound)?;
                data_parts.push(deserialize_array(db.clone())?);
                let rb = by_pk.get(&metas_rid[k].chunk_pk).ok_or(Error::NotFound)?;
                rid_parts.push(deserialize_array(rb.clone())?);
            }
            let merged_data = concat_many(data_parts.iter().collect())?;
            let merged_rid_any = concat_many(rid_parts.iter().collect())?;

            let slices = split_to_target_bytes(
                &merged_data,
                TARGET_CHUNK_BYTES,
                self.cfg.varwidth_fallback_rows_per_slice,
            );
            let mut rid_off = 0usize;
            let need_perms = metas[i..j].iter().any(|m| m.value_order_perm_pk != 0);

            for s in slices {
                let rows = s.len();
                let rid_ref: ArrayRef = merged_rid_any.slice(rid_off, rows);
                let rid_norm = zero_offset(&rid_ref);
                let rid_pk = self.pager.alloc_many(1)?[0];
                let rid_bytes = serialize_array(rid_norm.as_ref())?;

                let data_pk = self.pager.alloc_many(1)?[0];
                let s_norm = zero_offset(&s);
                let data_bytes = serialize_array(s_norm.as_ref())?;
                puts.push(BatchPut::Raw {
                    key: data_pk,
                    bytes: data_bytes,
                });
                puts.push(BatchPut::Raw {
                    key: rid_pk,
                    bytes: rid_bytes,
                });
                let mut meta = ChunkMetadata {
                    chunk_pk: data_pk,
                    row_count: rows as u64,
                    serialized_bytes: s_norm.get_array_memory_size() as u64,
                    max_val_u64: u64::MAX,
                    ..Default::default()
                };
                if need_perms {
                    let sort_col = SortColumn {
                        values: s.clone(),
                        options: None,
                    };
                    let idx = lexsort_to_indices(&[sort_col], None)?;
                    let perm_bytes = serialize_array(&idx)?;
                    let perm_pk = self.pager.alloc_many(1)?[0];
                    puts.push(BatchPut::Raw {
                        key: perm_pk,
                        bytes: perm_bytes,
                    });
                    meta.value_order_perm_pk = perm_pk;
                }

                let rid_any = rid_norm.clone();
                let rids = rid_any
                    .as_any()
                    .downcast_ref::<UInt64Array>()
                    .ok_or_else(|| Error::Internal("rid downcast".into()))?;
                let mut min = u64::MAX;
                let mut max = 0u64;
                let mut sorted_rids = true;
                let mut last_v = 0u64;
                for ii in 0..rids.len() {
                    let v = rids.value(ii);
                    if ii == 0 {
                        last_v = v;
                    } else if v < last_v {
                        sorted_rids = false;
                    } else {
                        last_v = v;
                    }
                    if v < min {
                        min = v;
                    }
                    if v > max {
                        max = v;
                    }
                }
                let mut rid_perm_pk = 0u64;
                if !sorted_rids {
                    let rid_sort_col = SortColumn {
                        values: rid_any,
                        options: None,
                    };
                    let rid_idx = lexsort_to_indices(&[rid_sort_col], None)?;
                    let rid_perm_bytes = serialize_array(&rid_idx)?;
                    rid_perm_pk = self.pager.alloc_many(1)?[0];
                    puts.push(BatchPut::Raw {
                        key: rid_perm_pk,
                        bytes: rid_perm_bytes,
                    });
                }
                let rid_meta = ChunkMetadata {
                    chunk_pk: rid_pk,
                    value_order_perm_pk: rid_perm_pk,
                    row_count: rows as u64,
                    serialized_bytes: rid_norm.get_array_memory_size() as u64,
                    min_val_u64: if rows > 0 { min } else { 0 },
                    max_val_u64: if rows > 0 { max } else { 0 },
                };
                new_metas.push(meta);
                new_rid_metas.push(rid_meta);
                rid_off += rows;
            }

            for k in i..j {
                frees.push(metas[k].chunk_pk);
                if metas[k].value_order_perm_pk != 0 {
                    frees.push(metas[k].value_order_perm_pk);
                }
                frees.push(metas_rid[k].chunk_pk);
                if metas_rid[k].value_order_perm_pk != 0 {
                    frees.push(metas_rid[k].value_order_perm_pk);
                }
            }

            i = j;
        }

        if new_metas.is_empty() {
            self.write_descriptor_chain(desc_pk, &mut desc, &[], &mut puts, &mut frees)?;
            self.write_descriptor_chain(desc_pk_rid, &mut desc_rid, &[], &mut puts, &mut frees)?;
            catalog.map.remove(&field_id);
            catalog.map.remove(&rid_fid);
            puts.push(BatchPut::Raw {
                key: CATALOG_ROOT_PKEY,
                bytes: catalog.to_bytes(),
            });
            if !puts.is_empty() {
                self.pager.batch_put(&puts)?;
            }
            if !frees.is_empty() {
                self.pager.free_many(&frees)?;
            }
            return Ok(());
        }

        self.write_descriptor_chain(desc_pk, &mut desc, &new_metas, &mut puts, &mut frees)?;
        self.write_descriptor_chain(
            desc_pk_rid,
            &mut desc_rid,
            &new_rid_metas,
            &mut puts,
            &mut frees,
        )?;
        if !puts.is_empty() {
            self.pager.batch_put(&puts)?;
        }
        if !frees.is_empty() {
            self.pager.free_many(&frees)?;
        }

        Ok(())
    }

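    /// Appends one `ChunkMetadata` entry to the in-memory tail descriptor
    /// page, rolling over to a freshly allocated page (and queueing the full
    /// page for writing) when the tail page is out of space, then bumps the
    /// descriptor's row and chunk totals.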
    fn append_meta_in_loop(
        &self,
        descriptor: &mut ColumnDescriptor,
        tail_page_bytes: &mut Vec<u8>,
        meta: ChunkMetadata,
        puts: &mut Vec<BatchPut>,
    ) -> Result<()> {
        let mut header = DescriptorPageHeader::from_le_bytes(
            &tail_page_bytes[..DescriptorPageHeader::DISK_SIZE],
        );
        if tail_page_bytes.len() + ChunkMetadata::DISK_SIZE <= TARGET_DESCRIPTOR_PAGE_BYTES
            && (header.entry_count as usize) < DESCRIPTOR_ENTRIES_PER_PAGE
        {
            tail_page_bytes.extend_from_slice(&meta.to_le_bytes());
            header.entry_count += 1;
            tail_page_bytes[..DescriptorPageHeader::DISK_SIZE]
                .copy_from_slice(&header.to_le_bytes());
        } else {
            let new_tail_pk = self.pager.alloc_many(1)?[0];
            header.next_page_pk = new_tail_pk;
            tail_page_bytes[..DescriptorPageHeader::DISK_SIZE]
                .copy_from_slice(&header.to_le_bytes());
            let full_page_to_write = std::mem::take(tail_page_bytes);
            puts.push(BatchPut::Raw {
                key: descriptor.tail_page_pk,
                bytes: full_page_to_write,
            });
            let new_header = DescriptorPageHeader {
                next_page_pk: 0,
                entry_count: 1,
                _padding: [0; 4],
            };
            let mut new_page_bytes = new_header.to_le_bytes().to_vec();
            new_page_bytes.extend_from_slice(&meta.to_le_bytes());
            descriptor.tail_page_pk = new_tail_pk;
            *tail_page_bytes = new_page_bytes;
        }

        descriptor.total_row_count += meta.row_count;
        descriptor.total_chunk_count += 1;
        Ok(())
    }

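    /// Walks every descriptor chain registered in the catalog and checks that
    /// the stored row and chunk totals match what the chunk metadata actually
    /// adds up to.
    ///
    /// ```ignore
    /// store.verify_integrity()?;
    /// ```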
    pub fn verify_integrity(&self) -> Result<()> {
        let catalog = self.catalog.read().unwrap();
        for (&field_id, &descriptor_pk) in &catalog.map {
            let desc_blob = self
                .pager
                .batch_get(&[BatchGet::Raw { key: descriptor_pk }])?
                .pop()
                .and_then(|r| match r {
                    GetResult::Raw { bytes, .. } => Some(bytes),
                    _ => None,
                })
                .ok_or_else(|| {
                    Error::Internal(format!(
                        "Catalog points to missing descriptor pk={}",
                        descriptor_pk
                    ))
                })?;
            let descriptor = ColumnDescriptor::from_le_bytes(desc_blob.as_ref());
            if descriptor.field_id != field_id {
                return Err(Error::Internal(format!(
                    "Descriptor at pk={} has wrong field_id: expected {:?}, \
                     got {:?}",
                    descriptor_pk, field_id, descriptor.field_id
                )));
            }

            let mut actual_rows = 0;
            let mut actual_chunks = 0;
            let mut current_page_pk = descriptor.head_page_pk;
            while current_page_pk != 0 {
                let page_blob = self
                    .pager
                    .batch_get(&[BatchGet::Raw {
                        key: current_page_pk,
                    }])?
                    .pop()
                    .and_then(|r| match r {
                        GetResult::Raw { bytes, .. } => Some(bytes),
                        _ => None,
                    })
                    .ok_or_else(|| {
                        Error::Internal(format!(
                            "Descriptor page chain broken at pk={}",
                            current_page_pk
                        ))
                    })?;
                let header = DescriptorPageHeader::from_le_bytes(
                    &page_blob.as_ref()[..DescriptorPageHeader::DISK_SIZE],
                );
                for i in 0..(header.entry_count as usize) {
                    let off = DescriptorPageHeader::DISK_SIZE + i * ChunkMetadata::DISK_SIZE;
                    let end = off + ChunkMetadata::DISK_SIZE;
                    let meta = ChunkMetadata::from_le_bytes(&page_blob.as_ref()[off..end]);
                    actual_rows += meta.row_count;
                    actual_chunks += 1;
                }
                current_page_pk = header.next_page_pk;
            }

            if descriptor.total_row_count != actual_rows {
                return Err(Error::Internal(format!(
                    "Row count mismatch for field {:?}: descriptor says {}, \
                     actual is {}",
                    field_id, descriptor.total_row_count, actual_rows
                )));
            }
            if descriptor.total_chunk_count != actual_chunks {
                return Err(Error::Internal(format!(
                    "Chunk count mismatch for field {:?}: descriptor says {}, \
                     actual is {}",
                    field_id, descriptor.total_chunk_count, actual_chunks
                )));
            }
        }
        Ok(())
    }

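    /// Collects per-column layout statistics (total rows, total chunks, and
    /// one `DescriptorPageStats` entry per descriptor page) for every column
    /// in the catalog.
    ///
    /// ```ignore
    /// for col in store.get_layout_stats()? {
    ///     println!("{:?}: {} rows in {} chunks", col.field_id, col.total_rows, col.total_chunks);
    /// }
    /// ```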
    pub fn get_layout_stats(&self) -> Result<Vec<ColumnLayoutStats>> {
        let catalog = self.catalog.read().unwrap();
        let mut all_stats = Vec::new();

        for (&field_id, &descriptor_pk) in &catalog.map {
            let desc_blob = self
                .pager
                .batch_get(&[BatchGet::Raw { key: descriptor_pk }])?
                .pop()
                .and_then(|r| match r {
                    GetResult::Raw { bytes, .. } => Some(bytes),
                    _ => None,
                })
                .ok_or(Error::NotFound)?;
            let descriptor = ColumnDescriptor::from_le_bytes(desc_blob.as_ref());

            let mut page_stats = Vec::new();
            let mut current_page_pk = descriptor.head_page_pk;
            while current_page_pk != 0 {
                let page_blob = self
                    .pager
                    .batch_get(&[BatchGet::Raw {
                        key: current_page_pk,
                    }])?
                    .pop()
                    .and_then(|r| match r {
                        GetResult::Raw { bytes, .. } => Some(bytes),
                        _ => None,
                    })
                    .ok_or(Error::NotFound)?;
                let header = DescriptorPageHeader::from_le_bytes(
                    &page_blob.as_ref()[..DescriptorPageHeader::DISK_SIZE],
                );
                page_stats.push(DescriptorPageStats {
                    page_pk: current_page_pk,
                    entry_count: header.entry_count,
                    page_size_bytes: page_blob.as_ref().len(),
                });
                current_page_pk = header.next_page_pk;
            }

            all_stats.push(ColumnLayoutStats {
                field_id,
                total_rows: descriptor.total_row_count,
                total_chunks: descriptor.total_chunk_count,
                pages: page_stats,
            });
        }
        Ok(all_stats)
    }
}