1use super::*;
2use crate::store::catalog::ColumnCatalog;
3use crate::store::descriptor::{
4 ChunkMetadata, ColumnDescriptor, DescriptorIterator, DescriptorPageHeader,
5};
6use crate::store::scan::filter::FilterDispatch;
7use crate::store::scan::{FilterPrimitive, FilterResult};
8use crate::types::{LogicalFieldId, RowId, TableId};
9use arrow::array::{Array, ArrayRef, BooleanArray, UInt32Array, UInt64Array};
10use arrow::compute::{self, SortColumn, lexsort_to_indices};
11use arrow::datatypes::DataType;
12use arrow::record_batch::RecordBatch;
13use llkv_expr::typed_predicate::Predicate;
14use llkv_result::{Error, Result};
15use llkv_storage::{
16 constants::CATALOG_ROOT_PKEY,
17 pager::{BatchGet, BatchPut, GetResult, Pager},
18 serialization::{deserialize_array, serialize_array},
19 types::PhysicalKey,
20};
21
22use rustc_hash::{FxHashMap, FxHashSet};
23use simd_r_drive_entry_handle::EntryHandle;
24use std::sync::{Arc, RwLock};
25
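/// Column-oriented storage engine backed by a [`Pager`].
///
/// Each logical column is persisted as a chain of serialized Arrow chunks plus
/// a shadow row-id column; per-column descriptors track the chunk metadata and
/// are reachable from a single catalog blob stored at `CATALOG_ROOT_PKEY`.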
26pub struct ColumnStore<P: Pager> {
51 pub(crate) pager: Arc<P>,
52 pub(crate) catalog: Arc<RwLock<ColumnCatalog>>,
53 cfg: ColumnStoreConfig,
54 dtype_cache: DTypeCache<P>,
55 index_manager: IndexManager<P>,
56}
57
58impl<P> Clone for ColumnStore<P>
59where
60 P: Pager<Blob = EntryHandle> + Send + Sync,
61{
62 fn clone(&self) -> Self {
63 Self {
64 pager: Arc::clone(&self.pager),
65 catalog: Arc::clone(&self.catalog),
66 cfg: self.cfg.clone(),
67 dtype_cache: self.dtype_cache.clone(),
68 index_manager: self.index_manager.clone(),
69 }
70 }
71}
72
73impl<P> ColumnStore<P>
74where
75 P: Pager<Blob = EntryHandle> + Send + Sync,
76{
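    /// Opens a store on top of `pager`, loading the column catalog from
    /// `CATALOG_ROOT_PKEY` or starting from an empty catalog when none has
    /// been written yet.
    ///
    /// Minimal usage sketch; `MemPager` is an illustrative stand-in for any
    /// pager whose blobs are `EntryHandle`s, not a type defined here:
    ///
    /// ```ignore
    /// let pager = Arc::new(MemPager::default());
    /// let store = ColumnStore::open(pager)?;
    /// ```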
77 pub fn open(pager: Arc<P>) -> Result<Self> {
87 let cfg = ColumnStoreConfig::default();
88 let catalog = match pager
89 .batch_get(&[BatchGet::Raw {
90 key: CATALOG_ROOT_PKEY,
91 }])?
92 .pop()
93 {
94 Some(GetResult::Raw { bytes, .. }) => ColumnCatalog::from_bytes(bytes.as_ref())?,
95 _ => ColumnCatalog::default(),
96 };
97 let arc_catalog = Arc::new(RwLock::new(catalog));
98
99 let index_manager = IndexManager::new(Arc::clone(&pager));
100
101 Ok(Self {
102 pager: Arc::clone(&pager),
103 catalog: Arc::clone(&arc_catalog),
104 cfg,
105 dtype_cache: DTypeCache::new(Arc::clone(&pager), Arc::clone(&arc_catalog)),
106 index_manager,
107 })
108 }
109
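    /// Registers a persisted index of `kind` for `field_id`; the work is
    /// delegated to the store's index manager.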
110 pub fn register_index(&self, field_id: LogicalFieldId, kind: IndexKind) -> Result<()> {
120 self.index_manager.register_index(self, field_id, kind)
121 }
122
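    /// Returns `true` when the catalog has an entry for `field_id`.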
123 pub fn has_field(&self, field_id: LogicalFieldId) -> bool {
125 let catalog = self.catalog.read().unwrap();
126 catalog.map.contains_key(&field_id)
127 }
128
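    /// Removes a previously registered index of `kind` for `field_id` via the
    /// index manager.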
129 pub fn unregister_index(&self, field_id: LogicalFieldId, kind: IndexKind) -> Result<()> {
138 self.index_manager.unregister_index(self, field_id, kind)
139 }
140
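    /// Resolves the Arrow [`DataType`] of `field_id`, answering from the
    /// in-memory dtype cache when possible.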
141 pub fn data_type(&self, field_id: LogicalFieldId) -> Result<DataType> {
150 if let Some(dt) = self.dtype_cache.cached_data_type(field_id) {
151 return Ok(dt);
152 }
153 self.dtype_cache.dtype_for_field(field_id)
154 }
155
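    /// Ensures catalog entries and descriptors exist for `field_id` and its
    /// shadow row-id column, stamping the column's data-type fingerprint on a
    /// freshly created descriptor. Calling this repeatedly is harmless:
    /// existing descriptors are left untouched.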
156 pub fn ensure_column_registered(
162 &self,
163 field_id: LogicalFieldId,
164 data_type: &DataType,
165 ) -> Result<()> {
166 let rid_field_id = rowid_fid(field_id);
167
168 let mut catalog_dirty = false;
169 let descriptor_pk;
170 let rid_descriptor_pk;
171
172 {
173 let mut catalog = self.catalog.write().unwrap();
174 descriptor_pk = if let Some(&pk) = catalog.map.get(&field_id) {
175 pk
176 } else {
177 let pk = self.pager.alloc_many(1)?[0];
178 catalog.map.insert(field_id, pk);
179 catalog_dirty = true;
180 pk
181 };
182
183 rid_descriptor_pk = if let Some(&pk) = catalog.map.get(&rid_field_id) {
184 pk
185 } else {
186 let pk = self.pager.alloc_many(1)?[0];
187 catalog.map.insert(rid_field_id, pk);
188 catalog_dirty = true;
189 pk
190 };
191 }
192
193 let mut puts: Vec<BatchPut> = Vec::new();
194
195 let data_descriptor_missing = self
196 .pager
197 .batch_get(&[BatchGet::Raw { key: descriptor_pk }])?
198 .pop()
199 .and_then(|r| match r {
200 GetResult::Raw { bytes, .. } => Some(bytes),
201 _ => None,
202 })
203 .is_none();
204
205 if data_descriptor_missing {
206 let (mut descriptor, tail_page) =
207 ColumnDescriptor::load_or_create(Arc::clone(&self.pager), descriptor_pk, field_id)?;
208 let fingerprint = DTypeCache::<P>::dtype_fingerprint(data_type);
209 if fingerprint != 0 {
210 DTypeCache::<P>::set_desc_dtype_fingerprint(&mut descriptor, fingerprint);
211 }
212 puts.push(BatchPut::Raw {
213 key: descriptor.tail_page_pk,
214 bytes: tail_page,
215 });
216 puts.push(BatchPut::Raw {
217 key: descriptor_pk,
218 bytes: descriptor.to_le_bytes(),
219 });
220 }
221
222 let rid_descriptor_missing = self
223 .pager
224 .batch_get(&[BatchGet::Raw {
225 key: rid_descriptor_pk,
226 }])?
227 .pop()
228 .and_then(|r| match r {
229 GetResult::Raw { bytes, .. } => Some(bytes),
230 _ => None,
231 })
232 .is_none();
233
234 if rid_descriptor_missing {
235 let (mut rid_descriptor, tail_page) = ColumnDescriptor::load_or_create(
236 Arc::clone(&self.pager),
237 rid_descriptor_pk,
238 rid_field_id,
239 )?;
240 let fingerprint = DTypeCache::<P>::dtype_fingerprint(&DataType::UInt64);
241 if fingerprint != 0 {
242 DTypeCache::<P>::set_desc_dtype_fingerprint(&mut rid_descriptor, fingerprint);
243 }
244 puts.push(BatchPut::Raw {
245 key: rid_descriptor.tail_page_pk,
246 bytes: tail_page,
247 });
248 puts.push(BatchPut::Raw {
249 key: rid_descriptor_pk,
250 bytes: rid_descriptor.to_le_bytes(),
251 });
252 }
253
254 self.dtype_cache.insert(field_id, data_type.clone());
255
256 if catalog_dirty {
257 let catalog_bytes = {
258 let catalog = self.catalog.read().unwrap();
259 catalog.to_bytes()
260 };
261 puts.push(BatchPut::Raw {
262 key: CATALOG_ROOT_PKEY,
263 bytes: catalog_bytes,
264 });
265 }
266
267 if !puts.is_empty() {
268 self.pager.batch_put(&puts)?;
269 }
270
271 Ok(())
272 }
273
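    /// Returns the row ids of `field_id` whose values satisfy `predicate`; the
    /// scan itself is dispatched through the type-specific [`FilterDispatch`]
    /// implementation.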
274 pub fn filter_row_ids<T>(
284 &self,
285 field_id: LogicalFieldId,
286 predicate: &Predicate<T::Value>,
287 ) -> Result<Vec<u64>>
288 where
289 T: FilterDispatch,
290 {
291 tracing::trace!(field=?field_id, "filter_row_ids start");
292 let res = T::run_filter(self, field_id, predicate);
293 if let Err(ref err) = res {
294 tracing::trace!(field=?field_id, error=?err, "filter_row_ids error");
295 } else {
296 tracing::trace!(field=?field_id, "filter_row_ids ok");
297 }
298 res
299 }
300
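    /// Like [`Self::filter_row_ids`], but evaluates a caller-supplied closure
    /// over the column's native values and returns a [`FilterResult`].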
301 pub fn filter_matches<T, F>(
316 &self,
317 field_id: LogicalFieldId,
318 predicate: F,
319 ) -> Result<FilterResult>
320 where
321 T: FilterPrimitive,
322 F: FnMut(T::Native) -> bool,
323 {
324 T::run_filter_with_result(self, field_id, predicate)
325 }
326
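    /// Lists the index kinds recorded on the persisted descriptor of
    /// `field_id`.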
327 pub fn list_persisted_indexes(&self, field_id: LogicalFieldId) -> Result<Vec<IndexKind>> {
336 let catalog = self.catalog.read().unwrap();
337 let descriptor_pk = *catalog.map.get(&field_id).ok_or(Error::NotFound)?;
338
339 let desc_blob = self
340 .pager
341 .batch_get(&[BatchGet::Raw { key: descriptor_pk }])?
342 .pop()
343 .and_then(|r| match r {
344 GetResult::Raw { bytes, .. } => Some(bytes),
345 _ => None,
346 })
347 .ok_or(Error::NotFound)?;
348 let descriptor = ColumnDescriptor::from_le_bytes(desc_blob.as_ref());
349
350 let kinds = descriptor.get_indexes()?;
351 Ok(kinds)
352 }
353
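    /// Returns the `total_row_count` recorded on the persisted descriptor of
    /// `field_id`.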
354 pub fn total_rows_for_field(&self, field_id: LogicalFieldId) -> Result<u64> {
363 let catalog = self.catalog.read().unwrap();
364 let desc_pk = *catalog.map.get(&field_id).ok_or(Error::NotFound)?;
365 drop(catalog);
366
367 let desc_blob = self
368 .pager
369 .batch_get(&[BatchGet::Raw { key: desc_pk }])?
370 .pop()
371 .and_then(|r| match r {
372 GetResult::Raw { bytes, .. } => Some(bytes),
373 _ => None,
374 })
375 .ok_or(Error::NotFound)?;
376
377 let desc = ColumnDescriptor::from_le_bytes(desc_blob.as_ref());
378 Ok(desc.total_row_count)
379 }
380
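    /// Returns the row count of `table_id`, taken as the maximum
    /// `total_row_count` over the table's user-data columns (individual
    /// columns can be shorter because null values are never stored).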
381 pub fn total_rows_for_table(&self, table_id: crate::types::TableId) -> Result<u64> {
390 use crate::types::Namespace;
391 let catalog = self.catalog.read().unwrap();
393 let candidates: Vec<LogicalFieldId> = catalog
395 .map
396 .keys()
397 .filter(|fid| fid.namespace() == Namespace::UserData && fid.table_id() == table_id)
398 .copied()
399 .collect();
400 drop(catalog);
401
402 if candidates.is_empty() {
403 return Ok(0);
404 }
405
406 let mut max_rows: u64 = 0;
408 for field in candidates {
409 let rows = self.total_rows_for_field(field)?;
410 if rows > max_rows {
411 max_rows = rows;
412 }
413 }
414 Ok(max_rows)
415 }
416
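    /// Returns every user-data [`LogicalFieldId`] registered for `table_id`.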
417 pub fn user_field_ids_for_table(&self, table_id: crate::types::TableId) -> Vec<LogicalFieldId> {
423 use crate::types::Namespace;
424
425 let catalog = self.catalog.read().unwrap();
426 catalog
427 .map
428 .keys()
429 .filter(|fid| fid.namespace() == Namespace::UserData && fid.table_id() == table_id)
430 .copied()
431 .collect()
432 }
433
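    /// Reports whether `row_id` exists in `field_id`.
    ///
    /// Walks the shadow row-id column chunk by chunk, pruning chunks whose
    /// persisted min/max bounds exclude `row_id`, and binary-searches each
    /// remaining chunk (through its value-order permutation when present).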
434 pub fn has_row_id(&self, field_id: LogicalFieldId, row_id: RowId) -> Result<bool> {
444 let rid_fid = rowid_fid(field_id);
445 let catalog = self.catalog.read().unwrap();
446 let rid_desc_pk = *catalog.map.get(&rid_fid).ok_or(Error::NotFound)?;
447 let rid_desc_blob = self
448 .pager
449 .batch_get(&[BatchGet::Raw { key: rid_desc_pk }])?
450 .pop()
451 .and_then(|r| match r {
452 GetResult::Raw { bytes, .. } => Some(bytes),
453 _ => None,
454 })
455 .ok_or(Error::NotFound)?;
456 let rid_desc = ColumnDescriptor::from_le_bytes(rid_desc_blob.as_ref());
457 drop(catalog);
458
459 for m in DescriptorIterator::new(self.pager.as_ref(), rid_desc.head_page_pk) {
461 let meta = m?;
462 if meta.row_count == 0 {
463 continue;
464 }
            // Skip this chunk when its persisted row-id bounds exclude `row_id`;
            // the bounds are only trusted when at least one of them is non-zero.
            if (meta.min_val_u64 != 0 || meta.max_val_u64 != 0)
                && (row_id < meta.min_val_u64 || row_id > meta.max_val_u64)
            {
                continue;
            }
470 let mut gets = vec![BatchGet::Raw { key: meta.chunk_pk }];
472 if meta.value_order_perm_pk != 0 {
473 gets.push(BatchGet::Raw {
474 key: meta.value_order_perm_pk,
475 });
476 }
477 let results = self.pager.batch_get(&gets)?;
478 let mut rid_blob: Option<EntryHandle> = None;
479 let mut perm_blob: Option<EntryHandle> = None;
480 for r in results {
481 if let GetResult::Raw { key, bytes } = r {
482 if key == meta.chunk_pk {
483 rid_blob = Some(bytes);
484 } else if key == meta.value_order_perm_pk {
485 perm_blob = Some(bytes);
486 }
487 }
488 }
489 let Some(rid_blob) = rid_blob else { continue };
491 let rid_any = deserialize_array(rid_blob)?;
492 let rids = rid_any
493 .as_any()
494 .downcast_ref::<UInt64Array>()
495 .ok_or_else(|| Error::Internal("rid downcast".into()))?;
496 if let Some(pblob) = perm_blob {
497 let perm_any = deserialize_array(pblob)?;
498 let perm = perm_any
499 .as_any()
500 .downcast_ref::<UInt32Array>()
501 .ok_or_else(|| Error::Internal("perm not u32".into()))?;
502 let mut lo: isize = 0;
504 let mut hi: isize = (perm.len() as isize) - 1;
505 while lo <= hi {
506 let mid = ((lo + hi) >> 1) as usize;
507 let rid = rids.value(perm.value(mid) as usize);
508 if rid == row_id {
509 return Ok(true);
510 } else if rid < row_id {
511 lo = mid as isize + 1;
512 } else {
513 hi = mid as isize - 1;
514 }
515 }
516 } else {
517 let mut lo: isize = 0;
519 let mut hi: isize = (rids.len() as isize) - 1;
520 while lo <= hi {
521 let mid = ((lo + hi) >> 1) as usize;
522 let rid = rids.value(mid);
523 if rid == row_id {
524 return Ok(true);
525 } else if rid < row_id {
526 lo = mid as isize + 1;
527 } else {
528 hi = mid as isize - 1;
529 }
530 }
531 }
532 }
533 Ok(false)
534 }
535
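    /// Appends a [`RecordBatch`] with last-writer-wins semantics.
    ///
    /// The batch must contain a `row_id` column (`ROW_ID_COLUMN_NAME`); every
    /// other field must carry a `FIELD_ID_META_KEY` metadata entry naming its
    /// [`LogicalFieldId`]. The batch is sorted by row id if necessary, row ids
    /// that already exist on disk are rewritten in place (an incoming null
    /// deletes the stored value), and the remaining rows are appended as new
    /// chunks alongside their row-id chunks.
    ///
    /// A sketch of a caller, assuming a store opened as in [`Self::open`]; the
    /// field id `42` and the column name are illustrative only:
    ///
    /// ```ignore
    /// use std::collections::HashMap;
    /// use std::sync::Arc;
    /// use arrow::array::{ArrayRef, UInt64Array};
    /// use arrow::datatypes::{DataType, Field, Schema};
    /// use arrow::record_batch::RecordBatch;
    ///
    /// let schema = Arc::new(Schema::new(vec![
    ///     Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
    ///     Field::new("score", DataType::UInt64, true).with_metadata(HashMap::from([(
    ///         FIELD_ID_META_KEY.to_string(),
    ///         "42".to_string(),
    ///     )])),
    /// ]));
    /// let columns: Vec<ArrayRef> = vec![
    ///     Arc::new(UInt64Array::from(vec![0_u64, 1, 2])),
    ///     Arc::new(UInt64Array::from(vec![10_u64, 20, 30])),
    /// ];
    /// store.append(&RecordBatch::try_new(schema, columns)?)?;
    /// ```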
    #[allow(unused_variables, unused_assignments)]
    pub fn append(&self, batch: &RecordBatch) -> Result<()> {
568 tracing::trace!(
569 num_columns = batch.num_columns(),
570 num_rows = batch.num_rows(),
571 "ColumnStore::append BEGIN"
572 );
573 let working_batch: RecordBatch;
579 let batch_ref = {
580 let schema = batch.schema();
581 let row_id_idx = schema
582 .index_of(ROW_ID_COLUMN_NAME)
583 .map_err(|_| Error::Internal("row_id column required".into()))?;
584 let row_id_any = batch.column(row_id_idx).clone();
585 let row_id_arr = row_id_any
586 .as_any()
587 .downcast_ref::<UInt64Array>()
588 .ok_or_else(|| Error::Internal("row_id downcast failed".into()))?;
589
590 let mut is_sorted = true;
592 if !row_id_arr.is_empty() {
593 let mut last = row_id_arr.value(0);
594 for i in 1..row_id_arr.len() {
595 let current = row_id_arr.value(i);
596 if current < last {
597 is_sorted = false;
598 break;
599 }
600 last = current;
601 }
602 }
603
604 if is_sorted {
607 batch
608 } else {
609 let sort_col = SortColumn {
610 values: row_id_any,
611 options: None,
612 };
613 let idx = lexsort_to_indices(&[sort_col], None)?;
614 let perm = idx
615 .as_any()
616 .downcast_ref::<UInt32Array>()
617 .ok_or_else(|| Error::Internal("perm not u32".into()))?;
618 let mut cols: Vec<ArrayRef> = Vec::with_capacity(batch.num_columns());
619 for i in 0..batch.num_columns() {
620 cols.push(compute::take(batch.column(i), perm, None)?);
621 }
622 working_batch = RecordBatch::try_new(schema.clone(), cols)
623 .map_err(|e| Error::Internal(format!("record batch rebuild: {e}")))?;
624 &working_batch
625 }
626 };
627
628 tracing::trace!("ColumnStore::append PHASE 1 complete - batch preprocessed");
629
630 let schema = batch_ref.schema();
635 let row_id_idx = schema
636 .index_of(ROW_ID_COLUMN_NAME)
637 .map_err(|_| Error::Internal("row_id column required".into()))?;
638
639 let row_id_arr = batch_ref
641 .column(row_id_idx)
642 .as_any()
643 .downcast_ref::<UInt64Array>()
644 .ok_or_else(|| Error::Internal("row_id downcast failed".into()))?;
645 let mut incoming_ids_map = FxHashMap::default();
646 incoming_ids_map.reserve(row_id_arr.len());
647 for i in 0..row_id_arr.len() {
648 incoming_ids_map.insert(row_id_arr.value(i), i);
649 }
650
651 let mut catalog_dirty = false;
653 let mut puts_rewrites: Vec<BatchPut> = Vec::new();
654 let mut all_rewritten_ids = FxHashSet::default();
655
656 let mut catalog_lock = self.catalog.write().unwrap();
658 for i in 0..batch_ref.num_columns() {
659 if i == row_id_idx {
660 continue;
661 }
662 let field = schema.field(i);
663 if let Some(field_id_str) = field.metadata().get(crate::store::FIELD_ID_META_KEY) {
664 let field_id = field_id_str
665 .parse::<u64>()
666 .map(LogicalFieldId::from)
667 .map_err(|e| Error::Internal(format!("Invalid field_id: {}", e)))?;
668
669 let rewritten = self.lww_rewrite_for_field(
672 &mut catalog_lock,
673 field_id,
674 &incoming_ids_map,
675 batch_ref.column(i),
676 batch_ref.column(row_id_idx),
677 &mut puts_rewrites,
678 )?;
679 all_rewritten_ids.extend(rewritten);
680 }
681 }
682 drop(catalog_lock);
683
684 if !puts_rewrites.is_empty() {
686 self.pager.batch_put(&puts_rewrites)?;
687 }
688
689 tracing::trace!("ColumnStore::append PHASE 2 complete - LWW rewrites done");
690
691 let batch_to_append = if !all_rewritten_ids.is_empty() {
695 let keep_mask: Vec<bool> = (0..row_id_arr.len())
696 .map(|i| !all_rewritten_ids.contains(&row_id_arr.value(i)))
697 .collect();
698 let keep_array = BooleanArray::from(keep_mask);
699 compute::filter_record_batch(batch_ref, &keep_array)?
700 } else {
701 batch_ref.clone()
702 };
703
704 if batch_to_append.num_rows() == 0 {
706 tracing::trace!("ColumnStore::append early exit - no new rows to append");
707 return Ok(());
708 }
709
710 tracing::trace!("ColumnStore::append PHASE 3 complete - filtered for new rows");
711
712 let append_schema = batch_to_append.schema();
716 let append_row_id_idx = append_schema.index_of(ROW_ID_COLUMN_NAME)?;
717 let append_row_id_any: ArrayRef = Arc::clone(batch_to_append.column(append_row_id_idx));
718 let mut puts_appends: Vec<BatchPut> = Vec::new();
719
720 for (i, array) in batch_to_append.columns().iter().enumerate() {
722 if i == append_row_id_idx {
723 continue;
724 }
725
726 let field = append_schema.field(i);
727
728 let field_id = field
729 .metadata()
730 .get(crate::store::FIELD_ID_META_KEY)
731 .ok_or_else(|| Error::Internal("Missing field_id".into()))?
732 .parse::<u64>()
733 .map(LogicalFieldId::from)
734 .map_err(|e| Error::Internal(format!("Invalid field_id: {}", e)))?;
735
736 self.dtype_cache.insert(field_id, field.data_type().clone());
739
740 let (array_clean, rids_clean) = if array.null_count() == 0 {
743 (array.clone(), append_row_id_any.clone())
744 } else {
745 let keep =
746 BooleanArray::from_iter((0..array.len()).map(|j| Some(!array.is_null(j))));
747 let a = compute::filter(array, &keep)?;
748 let r = compute::filter(&append_row_id_any, &keep)?;
749 (a, r)
750 };
751
752 if array_clean.is_empty() {
753 continue;
754 }
755
756 let (descriptor_pk, rid_descriptor_pk, rid_fid) = {
759 let mut catalog = self.catalog.write().unwrap();
760 let pk1 = *catalog.map.entry(field_id).or_insert_with(|| {
761 catalog_dirty = true;
762 self.pager.alloc_many(1).unwrap()[0]
763 });
764 let r_fid = rowid_fid(field_id);
765 let pk2 = *catalog.map.entry(r_fid).or_insert_with(|| {
766 catalog_dirty = true;
767 self.pager.alloc_many(1).unwrap()[0]
768 });
769 (pk1, pk2, r_fid)
770 };
771
772 let (mut data_descriptor, mut data_tail_page) =
775 ColumnDescriptor::load_or_create(Arc::clone(&self.pager), descriptor_pk, field_id)
776 .map_err(|e| {
777 tracing::error!(
778 ?field_id,
779 descriptor_pk,
780 error = ?e,
781 "append: load_or_create failed for data descriptor"
782 );
783 e
784 })?;
785 let (mut rid_descriptor, mut rid_tail_page) = ColumnDescriptor::load_or_create(
786 Arc::clone(&self.pager),
787 rid_descriptor_pk,
788 rid_fid,
789 )
790 .map_err(|e| {
791 tracing::error!(
792 ?rid_fid,
793 rid_descriptor_pk,
794 error = ?e,
795 "append: load_or_create failed for rid descriptor"
796 );
797 e
798 })?;
799
800 self.index_manager
804 .stage_index_registration(&mut data_descriptor, IndexKind::Presence)?;
805
806 let slices = split_to_target_bytes(
808 &array_clean,
809 TARGET_CHUNK_BYTES,
810 self.cfg.varwidth_fallback_rows_per_slice,
811 );
812 let mut row_off = 0usize;
813
814 for s in slices {
816 let rows = s.len();
817 let data_pk = self.pager.alloc_many(1)?[0];
819 let s_norm = zero_offset(&s);
820 let data_bytes = serialize_array(s_norm.as_ref())?;
821 puts_appends.push(BatchPut::Raw {
822 key: data_pk,
823 bytes: data_bytes,
824 });
825
826 let rid_slice: ArrayRef = rids_clean.slice(row_off, rows);
828 let rid_norm = zero_offset(&rid_slice);
829 let rid_pk = self.pager.alloc_many(1)?[0];
830 let rid_bytes = serialize_array(rid_norm.as_ref())?;
831 puts_appends.push(BatchPut::Raw {
832 key: rid_pk,
833 bytes: rid_bytes,
834 });
835
836 let rids_for_meta = rid_norm.as_any().downcast_ref::<UInt64Array>().unwrap();
838 let (min, max) = if !rids_for_meta.is_empty() {
839 let mut min_val = rids_for_meta.value(0);
840 let mut max_val = rids_for_meta.value(0);
841 for i in 1..rids_for_meta.len() {
842 let v = rids_for_meta.value(i);
843 if v < min_val {
844 min_val = v;
845 }
846 if v > max_val {
847 max_val = v;
848 }
849 }
850 (min_val, max_val)
851 } else {
852 (0, 0)
853 };
854
855 let mut data_meta = ChunkMetadata {
858 chunk_pk: data_pk,
859 row_count: rows as u64,
860 serialized_bytes: s_norm.get_array_memory_size() as u64,
861 max_val_u64: u64::MAX,
862 ..Default::default()
863 };
864 let mut rid_meta = ChunkMetadata {
865 chunk_pk: rid_pk,
866 row_count: rows as u64,
867 serialized_bytes: rid_norm.get_array_memory_size() as u64,
868 min_val_u64: min,
869 max_val_u64: max,
870 ..Default::default()
871 };
872
873 self.index_manager.stage_updates_for_new_chunk(
879 field_id,
880 &data_descriptor,
881 &s_norm,
882 &rid_norm,
883 &mut data_meta,
884 &mut rid_meta,
885 &mut puts_appends,
886 )?;
887
888 self.append_meta_in_loop(
890 &mut data_descriptor,
891 &mut data_tail_page,
892 data_meta,
893 &mut puts_appends,
894 )?;
895 self.append_meta_in_loop(
896 &mut rid_descriptor,
897 &mut rid_tail_page,
898 rid_meta,
899 &mut puts_appends,
900 )?;
901 row_off += rows;
902 }
903
904 puts_appends.push(BatchPut::Raw {
907 key: data_descriptor.tail_page_pk,
908 bytes: data_tail_page,
909 });
910 puts_appends.push(BatchPut::Raw {
911 key: descriptor_pk,
912 bytes: data_descriptor.to_le_bytes(),
913 });
914 puts_appends.push(BatchPut::Raw {
915 key: rid_descriptor.tail_page_pk,
916 bytes: rid_tail_page,
917 });
918 puts_appends.push(BatchPut::Raw {
919 key: rid_descriptor_pk,
920 bytes: rid_descriptor.to_le_bytes(),
921 });
922 }
923
924 if catalog_dirty {
927 let catalog = self.catalog.read().unwrap();
928 puts_appends.push(BatchPut::Raw {
929 key: CATALOG_ROOT_PKEY,
930 bytes: catalog.to_bytes(),
931 });
932 }
933
934 if !puts_appends.is_empty() {
938 self.pager.batch_put(&puts_appends)?;
939 }
940 tracing::trace!("ColumnStore::append END - success");
941 Ok(())
942 }
943
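    /// Applies last-writer-wins rewrites for one column: incoming row ids that
    /// already exist on disk are overwritten with the new value, or removed
    /// when the incoming value is null, and the touched data chunks, row-id
    /// chunks, and sort permutations are staged into `puts`. Returns the set
    /// of row ids handled here so the caller can exclude them from the plain
    /// append path.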
944 fn lww_rewrite_for_field(
945 &self,
946 catalog: &mut ColumnCatalog,
947 field_id: LogicalFieldId,
948 incoming_ids_map: &FxHashMap<u64, usize>,
949 incoming_data: &ArrayRef,
950 incoming_row_ids: &ArrayRef,
951 puts: &mut Vec<BatchPut>,
952 ) -> Result<FxHashSet<u64>> {
953 use crate::store::descriptor::DescriptorIterator;
954 use crate::store::ingest::ChunkEdit;
955
956 if incoming_ids_map.is_empty() {
958 return Ok(FxHashSet::default());
959 }
960 let incoming_ids: FxHashSet<u64> = incoming_ids_map.keys().copied().collect();
961
962 let desc_pk_data = match catalog.map.get(&field_id) {
964 Some(pk) => *pk,
965 None => return Ok(FxHashSet::default()),
966 };
967 let rid_fid = rowid_fid(field_id);
968 let desc_pk_rid = match catalog.map.get(&rid_fid) {
969 Some(pk) => *pk,
970 None => return Ok(FxHashSet::default()),
971 };
972
973 let gets = vec![
975 BatchGet::Raw { key: desc_pk_data },
976 BatchGet::Raw { key: desc_pk_rid },
977 ];
978 let results = self.pager.batch_get(&gets)?;
979 let mut blobs_by_pk = FxHashMap::default();
980 for r in results {
981 if let GetResult::Raw { key, bytes } = r {
982 blobs_by_pk.insert(key, bytes);
983 }
984 }
985
986 let desc_blob_data = blobs_by_pk.remove(&desc_pk_data).ok_or_else(|| {
987 tracing::error!(
988 ?field_id,
989 desc_pk_data,
990 "lww_rewrite: data descriptor blob not found in pager"
991 );
992 Error::NotFound
993 })?;
994 let mut descriptor_data = ColumnDescriptor::from_le_bytes(desc_blob_data.as_ref());
995
996 let desc_blob_rid = blobs_by_pk.remove(&desc_pk_rid).ok_or_else(|| {
997 tracing::error!(
998 ?rid_fid,
999 desc_pk_rid,
1000 "lww_rewrite: rid descriptor blob not found in pager"
1001 );
1002 Error::NotFound
1003 })?;
1004 let mut descriptor_rid = ColumnDescriptor::from_le_bytes(desc_blob_rid.as_ref());
1005
1006 tracing::trace!(?field_id, "lww_rewrite: descriptors loaded successfully");
1007
1008 let mut metas_data: Vec<ChunkMetadata> = Vec::new();
1010 let mut metas_rid: Vec<ChunkMetadata> = Vec::new();
1011 for m in DescriptorIterator::new(self.pager.as_ref(), descriptor_data.head_page_pk) {
1012 metas_data.push(m.map_err(|e| {
1013 tracing::error!(?field_id, error = ?e, "lww_rewrite: failed to iterate data descriptor");
1014 e
1015 })?);
1016 }
1017 for m in DescriptorIterator::new(self.pager.as_ref(), descriptor_rid.head_page_pk) {
1018 metas_rid.push(m.map_err(|e| {
1019 tracing::error!(?rid_fid, error = ?e, "lww_rewrite: failed to iterate rid descriptor");
1020 e
1021 })?);
1022 }
1023
1024 tracing::trace!(
1025 ?field_id,
1026 data_chunks = metas_data.len(),
1027 rid_chunks = metas_rid.len(),
1028 "lww_rewrite: chunk metadata collected"
1029 );
1030
1031 let rid_in = incoming_row_ids
1033 .as_any()
1034 .downcast_ref::<UInt64Array>()
1035 .ok_or_else(|| Error::Internal("row_id must be u64".into()))?;
1036 let mut ids_to_delete = FxHashSet::default();
1037 let mut ids_to_upsert = FxHashSet::default();
1038 for i in 0..rid_in.len() {
1039 let rid = rid_in.value(i);
1040 if incoming_data.is_null(i) {
1041 ids_to_delete.insert(rid);
1042 } else {
1043 ids_to_upsert.insert(rid);
1044 }
1045 }
1046
1047 let mut rewritten_ids = FxHashSet::default();
1049 let mut hit_up: FxHashMap<usize, Vec<u64>> = FxHashMap::default();
1050 let mut hit_del: FxHashMap<usize, Vec<u64>> = FxHashMap::default();
1051
1052 let n = metas_data.len().min(metas_rid.len());
1053 if n > 0 {
1054 let mut gets_rid = Vec::with_capacity(n);
1056 for rm in metas_rid.iter().take(n) {
1057 gets_rid.push(BatchGet::Raw { key: rm.chunk_pk });
1058 }
1059 let rid_results = self.pager.batch_get(&gets_rid)?;
1060 let mut rid_blobs: FxHashMap<PhysicalKey, EntryHandle> = FxHashMap::default();
1061 for r in rid_results {
1062 if let GetResult::Raw { key, bytes } = r {
1063 rid_blobs.insert(key, bytes);
1064 }
1065 }
1066
1067 for (i, meta_rid) in metas_rid.iter().enumerate().take(n) {
1068 if let Some(rid_blob) = rid_blobs.get(&meta_rid.chunk_pk) {
1069 let rid_arr_any = deserialize_array(rid_blob.clone())?;
1070 let rid_arr = rid_arr_any
1071 .as_any()
1072 .downcast_ref::<UInt64Array>()
1073 .ok_or_else(|| Error::Internal("rid type mismatch".into()))?;
1074 for j in 0..rid_arr.len() {
1075 let rid = rid_arr.value(j);
1076 if incoming_ids.contains(&rid) {
1077 if ids_to_delete.contains(&rid) {
1078 hit_del.entry(i).or_default().push(rid);
1079 } else if ids_to_upsert.contains(&rid) {
1080 hit_up.entry(i).or_default().push(rid);
1081 }
1082 rewritten_ids.insert(rid);
1083 }
1084 }
1085 }
1086 }
1087 }
1088
1089 if hit_up.is_empty() && hit_del.is_empty() {
1090 return Ok(rewritten_ids);
1091 }
1092
1093 let mut hit_set = FxHashSet::default();
1095 hit_set.extend(hit_up.keys().copied());
1096 hit_set.extend(hit_del.keys().copied());
1097 let hit_idxs: Vec<usize> = hit_set.into_iter().collect();
1098
1099 let mut gets = Vec::with_capacity(hit_idxs.len() * 2);
1100 for &i in &hit_idxs {
1101 gets.push(BatchGet::Raw {
1102 key: metas_data[i].chunk_pk,
1103 });
1104 gets.push(BatchGet::Raw {
1105 key: metas_rid[i].chunk_pk,
1106 });
1107 }
1108 let results = self.pager.batch_get(&gets)?;
1109 let mut blob_map: FxHashMap<PhysicalKey, EntryHandle> = FxHashMap::default();
1110 for r in results {
1111 if let GetResult::Raw { key, bytes } = r {
1112 blob_map.insert(key, bytes);
1113 }
1114 }
1115
1116 for i in hit_idxs {
1118 let old_data_arr =
1119 deserialize_array(blob_map.get(&metas_data[i].chunk_pk).unwrap().clone())?;
1120 let old_rid_arr_any =
1121 deserialize_array(blob_map.get(&metas_rid[i].chunk_pk).unwrap().clone())?;
1122 let old_rid_arr = old_rid_arr_any
1123 .as_any()
1124 .downcast_ref::<UInt64Array>()
1125 .unwrap();
1126
1127 let up_vec = hit_up.remove(&i).unwrap_or_default();
1128 let del_vec = hit_del.remove(&i).unwrap_or_default();
1129
1130 let edit = ChunkEdit::from_lww_upsert(
1132 old_rid_arr,
1133 &up_vec,
1134 &del_vec,
1135 incoming_data,
1136 incoming_row_ids,
1137 incoming_ids_map,
1138 )?;
1139
1140 let (new_data_arr, new_rid_arr) =
1141 ChunkEdit::apply_edit_to_arrays(&old_data_arr, Some(&old_rid_arr_any), &edit)?;
1142
1143 let data_bytes = serialize_array(&new_data_arr)?;
1145 puts.push(BatchPut::Raw {
1146 key: metas_data[i].chunk_pk,
1147 bytes: data_bytes,
1148 });
1149 metas_data[i].row_count = new_data_arr.len() as u64;
1150 metas_data[i].serialized_bytes = new_data_arr.get_array_memory_size() as u64;
1151
1152 if let Some(rarr) = new_rid_arr {
1154 let rid_bytes = serialize_array(&rarr)?;
1155 puts.push(BatchPut::Raw {
1156 key: metas_rid[i].chunk_pk,
1157 bytes: rid_bytes,
1158 });
1159 metas_rid[i].row_count = rarr.len() as u64;
1160 metas_rid[i].serialized_bytes = rarr.get_array_memory_size() as u64;
1161 }
1162
1163 if metas_data[i].value_order_perm_pk != 0 {
1165 let sort_col = SortColumn {
1166 values: new_data_arr,
1167 options: None,
1168 };
1169 let idx = lexsort_to_indices(&[sort_col], None)?;
1170 let perm_bytes = serialize_array(&idx)?;
1171 puts.push(BatchPut::Raw {
1172 key: metas_data[i].value_order_perm_pk,
1173 bytes: perm_bytes,
1174 });
1175 }
1176 }
1177
1178 descriptor_data.rewrite_pages(
1180 Arc::clone(&self.pager),
1181 desc_pk_data,
1182 &mut metas_data,
1183 puts,
1184 )?;
1185 descriptor_rid.rewrite_pages(Arc::clone(&self.pager), desc_pk_rid, &mut metas_rid, puts)?;
1186
1187 Ok(rewritten_ids)
1188 }
1189
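    /// Stages deletion of `rows_to_delete` for a single column. Entries are
    /// resolved against the column's cumulative chunk row counts and must be
    /// ascending and unique; affected data and row-id chunks are rewritten
    /// into `staged_puts`. Returns `true` when at least one chunk changed.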
1190 fn stage_delete_rows_for_field(
1191 &self,
1192 field_id: LogicalFieldId,
1193 rows_to_delete: &[RowId],
1194 staged_puts: &mut Vec<BatchPut>,
1195 ) -> Result<bool> {
1196 tracing::warn!(
1197 field_id = ?field_id,
1198 rows = rows_to_delete.len(),
1199 "delete_rows stage_delete_rows_for_field: start"
1200 );
1201 use crate::store::descriptor::DescriptorIterator;
1202 use crate::store::ingest::ChunkEdit;
1203
1204 if rows_to_delete.is_empty() {
1205 return Ok(false);
1206 }
1207
1208 let mut del_iter = rows_to_delete.iter().copied();
1210 let mut cur_del = del_iter.next();
1211 let mut last_seen: Option<u64> = cur_del;
1212
1213 let catalog = self.catalog.read().unwrap();
1215 let desc_pk = match catalog.map.get(&field_id) {
1216 Some(pk) => *pk,
1217 None => {
1218 tracing::trace!(
1219 field_id = ?field_id,
1220 "delete_rows stage_delete_rows_for_field: data descriptor missing"
1221 );
1222 return Err(Error::NotFound);
1223 }
1224 };
1225 let rid_fid = rowid_fid(field_id);
1226 let desc_pk_rid = catalog.map.get(&rid_fid).copied();
1227 tracing::warn!(
1228 field_id = ?field_id,
1229 desc_pk,
1230 desc_pk_rid = ?desc_pk_rid,
1231 "delete_rows stage_delete_rows_for_field: descriptor keys"
1232 );
1233
1234 let mut gets = vec![BatchGet::Raw { key: desc_pk }];
1236 if let Some(pk) = desc_pk_rid {
1237 gets.push(BatchGet::Raw { key: pk });
1238 }
1239 let results = match self.pager.batch_get(&gets) {
1240 Ok(res) => res,
1241 Err(err) => {
1242 tracing::trace!(
1243 field_id = ?field_id,
1244 error = ?err,
1245 "delete_rows stage_delete_rows_for_field: descriptor batch_get failed"
1246 );
1247 return Err(err);
1248 }
1249 };
1250 let mut blobs_by_pk = FxHashMap::default();
1251 for res in results {
1252 if let GetResult::Raw { key, bytes } = res {
1253 blobs_by_pk.insert(key, bytes);
1254 }
1255 }
1256
1257 tracing::warn!(
1258 field_id = ?field_id,
1259 desc_blob_found = blobs_by_pk.contains_key(&desc_pk),
1260 rid_blob_found = desc_pk_rid.map(|pk| blobs_by_pk.contains_key(&pk)),
1261 "delete_rows stage_delete_rows_for_field: descriptor fetch status"
1262 );
1263
1264 let desc_blob = blobs_by_pk.remove(&desc_pk).ok_or_else(|| {
1265 tracing::trace!(
1266 field_id = ?field_id,
1267 desc_pk,
1268 "delete_rows stage_delete_rows_for_field: descriptor blob missing"
1269 );
1270 Error::Internal(format!(
1271 "descriptor pk={} missing during delete_rows for field {:?}",
1272 desc_pk, field_id
1273 ))
1274 })?;
1275 let mut descriptor = ColumnDescriptor::from_le_bytes(desc_blob.as_ref());
1276
1277 let mut metas: Vec<ChunkMetadata> = Vec::new();
1279 for m in DescriptorIterator::new(self.pager.as_ref(), descriptor.head_page_pk) {
1280 metas.push(m?);
1281 }
1282 if metas.is_empty() {
1283 drop(catalog);
1284 return Ok(false);
1285 }
1286
1287 let mut metas_rid: Vec<ChunkMetadata> = Vec::new();
1289 let mut descriptor_rid: Option<ColumnDescriptor> = None;
1290 tracing::warn!(
1291 field_id = ?field_id,
1292 metas_len = metas.len(),
1293 desc_pk_rid = ?desc_pk_rid,
1294 "delete_rows stage_delete_rows_for_field: data metas loaded"
1295 );
1296 if let Some(pk_rid) = desc_pk_rid
1297 && let Some(desc_blob_rid) = blobs_by_pk.remove(&pk_rid)
1298 {
1299 let d_rid = ColumnDescriptor::from_le_bytes(desc_blob_rid.as_ref());
1300 for m in DescriptorIterator::new(self.pager.as_ref(), d_rid.head_page_pk) {
1301 metas_rid.push(m?);
1302 }
1303 descriptor_rid = Some(d_rid);
1304 }
1305
1306 tracing::warn!(
1307 field_id = ?field_id,
1308 metas_rid_len = metas_rid.len(),
1309 "delete_rows stage_delete_rows_for_field: rowid metas loaded"
1310 );
1311
1312 let mut cum_rows = 0u64;
1313 let mut any_changed = false;
1314
1315 for (i, meta) in metas.iter_mut().enumerate() {
1316 let start_u64 = cum_rows;
1317 let end_u64 = start_u64 + meta.row_count;
1318
1319 while let Some(d) = cur_del {
1321 if d < start_u64
1322 && let Some(prev) = last_seen
1323 {
1324 if d < prev {
1325 return Err(Error::Internal(
1326 "rows_to_delete must be ascending/unique".into(),
1327 ));
1328 }
1329
1330 last_seen = Some(d);
1331 cur_del = del_iter.next();
1332 } else {
1333 break;
1334 }
1335 }
1336
1337 let rows = meta.row_count as usize;
1339 let mut del_local: FxHashSet<usize> = FxHashSet::default();
1340 while let Some(d) = cur_del {
1341 if d >= end_u64 {
1342 break;
1343 }
1344 del_local.insert((d - start_u64) as usize);
1345 last_seen = Some(d);
1346 cur_del = del_iter.next();
1347 }
1348
1349 if del_local.is_empty() {
1350 cum_rows = end_u64;
1351 continue;
1352 }
1353
1354 let mut chunk_gets = vec![BatchGet::Raw { key: meta.chunk_pk }];
1356 if let Some(rm) = metas_rid.get(i) {
1357 chunk_gets.push(BatchGet::Raw { key: rm.chunk_pk });
1358 }
1359 let chunk_results = match self.pager.batch_get(&chunk_gets) {
1360 Ok(res) => res,
1361 Err(err) => {
1362 tracing::trace!(
1363 field_id = ?field_id,
1364 chunk_pk = meta.chunk_pk,
1365 error = ?err,
1366 "delete_rows stage_delete_rows_for_field: chunk batch_get failed"
1367 );
1368 return Err(err);
1369 }
1370 };
1371 let mut chunk_blobs = FxHashMap::default();
1372 for res in chunk_results {
1373 if let GetResult::Raw { key, bytes } = res {
1374 chunk_blobs.insert(key, bytes);
1375 }
1376 }
1377
1378 tracing::warn!(
1379 field_id = ?field_id,
1380 chunk_pk = meta.chunk_pk,
1381 rid_chunk_pk = metas_rid.get(i).map(|rm| rm.chunk_pk),
1382 data_found = chunk_blobs.contains_key(&meta.chunk_pk),
1383 rid_found = metas_rid
1384 .get(i)
1385 .map(|rm| chunk_blobs.contains_key(&rm.chunk_pk)),
1386 "delete_rows stage_delete_rows_for_field: chunk fetch status"
1387 );
1388
1389 let data_blob = match chunk_blobs.remove(&meta.chunk_pk) {
1390 Some(bytes) => bytes,
1391 None => {
1392 tracing::trace!(
1393 field_id = ?field_id,
1394 chunk_pk = meta.chunk_pk,
1395 "delete_rows stage_delete_rows_for_field: chunk missing"
1396 );
1397 return Err(Error::NotFound);
1398 }
1399 };
1400 let data_arr = deserialize_array(data_blob)?;
1401
1402 let rid_arr_any = if let Some(rm) = metas_rid.get(i) {
1403 let rid_blob = match chunk_blobs.remove(&rm.chunk_pk) {
1404 Some(bytes) => bytes,
1405 None => {
1406 tracing::trace!(
1407 field_id = ?field_id,
1408 rowid_chunk_pk = rm.chunk_pk,
1409 "delete_rows stage_delete_rows_for_field: rowid chunk missing"
1410 );
1411 return Err(Error::NotFound);
1412 }
1413 };
1414 Some(deserialize_array(rid_blob)?)
1415 } else {
1416 None
1417 };
1418
1419 let edit = ChunkEdit::from_delete_indices(rows, &del_local);
1421
1422 let (new_data_arr, new_rid_arr) =
1424 ChunkEdit::apply_edit_to_arrays(&data_arr, rid_arr_any.as_ref(), &edit)?;
1425
1426 let data_bytes = serialize_array(&new_data_arr)?;
1428 staged_puts.push(BatchPut::Raw {
1429 key: meta.chunk_pk,
1430 bytes: data_bytes,
1431 });
1432 meta.row_count = new_data_arr.len() as u64;
1433 meta.serialized_bytes = new_data_arr.get_array_memory_size() as u64;
1434
            if let (Some(rm), Some(rids)) = (metas_rid.get_mut(i), new_rid_arr) {
1438 let rid_bytes = serialize_array(&rids)?;
1439 staged_puts.push(BatchPut::Raw {
1440 key: rm.chunk_pk,
1441 bytes: rid_bytes,
1442 });
1443 rm.row_count = rids.len() as u64;
1444 rm.serialized_bytes = rids.get_array_memory_size() as u64;
1445 }
1446
1447 if meta.value_order_perm_pk != 0 {
1449 let sort_column = SortColumn {
1450 values: new_data_arr,
1451 options: None,
1452 };
1453 let indices = lexsort_to_indices(&[sort_column], None)?;
1454 let perm_bytes = serialize_array(&indices)?;
1455 staged_puts.push(BatchPut::Raw {
1456 key: meta.value_order_perm_pk,
1457 bytes: perm_bytes,
1458 });
1459 }
1460
1461 cum_rows = end_u64;
1462 any_changed = true;
1463 }
1464
1465 descriptor.rewrite_pages(Arc::clone(&self.pager), desc_pk, &mut metas, staged_puts)?;
1467 if let (Some(rid_pk), Some(mut rid_desc)) = (desc_pk_rid, descriptor_rid) {
1468 rid_desc.rewrite_pages(Arc::clone(&self.pager), rid_pk, &mut metas_rid, staged_puts)?;
1469 }
1470 drop(catalog);
1471 tracing::trace!(
1472 field_id = ?field_id,
1473 changed = any_changed,
1474 "delete_rows stage_delete_rows_for_field: finished stage"
1475 );
1476 Ok(any_changed)
1477 }
1478
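    /// Deletes `rows_to_delete` from every listed field (all fields must
    /// belong to the same table), applies the staged chunk rewrites in one
    /// batch, then runs bounded compaction on each field that changed.
    ///
    /// ```ignore
    /// // Sketch: `col_a` and `col_b` are LogicalFieldIds of the same table.
    /// store.delete_rows(&[col_a, col_b], &[0, 1])?;
    /// ```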
1479 pub fn delete_rows(&self, fields: &[LogicalFieldId], rows_to_delete: &[RowId]) -> Result<()> {
1485 if fields.is_empty() || rows_to_delete.is_empty() {
1486 return Ok(());
1487 }
1488
1489 let mut puts = Vec::new();
1490 let mut touched: FxHashSet<LogicalFieldId> = FxHashSet::default();
1491 let mut table_id: Option<TableId> = None;
1492
1493 tracing::warn!(
1494 fields = fields.len(),
1495 rows = rows_to_delete.len(),
1496 "delete_rows begin"
1497 );
1498 for field_id in fields {
1499 tracing::warn!(field = ?field_id, "delete_rows iter field");
1500 if let Some(expected) = table_id {
1501 if field_id.table_id() != expected {
1502 return Err(Error::InvalidArgumentError(
1503 "delete_rows requires fields from the same table".into(),
1504 ));
1505 }
1506 } else {
1507 table_id = Some(field_id.table_id());
1508 }
1509
1510 if self.stage_delete_rows_for_field(*field_id, rows_to_delete, &mut puts)? {
1511 touched.insert(*field_id);
1512 }
1513 }
1514
1515 if puts.is_empty() {
1516 return Ok(());
1517 }
1518
1519 self.pager.batch_put(&puts)?;
1520
1521 tracing::warn!(touched = touched.len(), "delete_rows apply writes");
1522
1523 for field_id in touched {
1524 self.compact_field_bounded(field_id)?;
1525 }
1526 tracing::warn!("delete_rows complete");
1527 Ok(())
1528 }
1529
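    /// Rewrites a descriptor's page chain so that it contains exactly
    /// `new_metas`, reusing existing pages where possible, allocating extras
    /// as needed, and queueing surplus old pages in `frees`. The descriptor's
    /// totals are recomputed from the new metadata; an empty `new_metas`
    /// resets the descriptor and frees the whole chain.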
1530 pub(crate) fn write_descriptor_chain(
1536 &self,
1537 descriptor_pk: PhysicalKey,
1538 descriptor: &mut ColumnDescriptor,
1539 new_metas: &[ChunkMetadata],
1540 puts: &mut Vec<BatchPut>,
1541 frees: &mut Vec<PhysicalKey>,
1542 ) -> Result<()> {
1543 let mut old_pages = Vec::new();
1545 let mut pk = descriptor.head_page_pk;
1546 while pk != 0 {
1547 let page_blob = self
1548 .pager
1549 .batch_get(&[BatchGet::Raw { key: pk }])?
1550 .pop()
1551 .and_then(|res| match res {
1552 GetResult::Raw { bytes, .. } => Some(bytes),
1553 _ => None,
1554 })
1555 .ok_or(Error::NotFound)?;
1556 let header = DescriptorPageHeader::from_le_bytes(
1557 &page_blob.as_ref()[..DescriptorPageHeader::DISK_SIZE],
1558 );
1559 old_pages.push(pk);
1560 pk = header.next_page_pk;
1561 }
1562
1563 let per = DESCRIPTOR_ENTRIES_PER_PAGE;
1565 let need_pages = if new_metas.is_empty() {
1566 0
1567 } else {
1568 new_metas.len().div_ceil(per)
1569 };
1570 if need_pages == 0 {
1572 frees.extend(old_pages.iter().copied());
1573
1574 descriptor.head_page_pk = 0;
1579 descriptor.tail_page_pk = 0;
1580 descriptor.total_row_count = 0;
1581 descriptor.total_chunk_count = 0;
1582 puts.push(BatchPut::Raw {
1583 key: descriptor_pk,
1584 bytes: descriptor.to_le_bytes(),
1585 });
1586 return Ok(());
1587 }
1588
        let mut pages = Vec::with_capacity(need_pages);
        // Reuse the old pages in order before allocating new ones; only the
        // surplus tail of the old chain is released below.
        pages.extend(old_pages.iter().copied().take(need_pages));
        if pages.is_empty() {
            pages.push(self.pager.alloc_many(1)?[0]);
            descriptor.head_page_pk = pages[0];
        }
        if need_pages > pages.len() {
            let extra = self.pager.alloc_many(need_pages - pages.len())?;
            pages.extend(extra);
        }

        if old_pages.len() > need_pages {
            frees.extend(old_pages[need_pages..].iter().copied());
        }
1606
1607 let mut off = 0usize;
1609 for (i, page_pk) in pages.iter().copied().enumerate() {
1610 let remain = new_metas.len() - off;
1611 let count = remain.min(per);
1612 let next = if i + 1 < pages.len() { pages[i + 1] } else { 0 };
1613 let header = DescriptorPageHeader {
1614 next_page_pk: next,
1615 entry_count: count as u32,
1616 _padding: [0; 4],
1617 };
1618 let mut page_bytes = header.to_le_bytes().to_vec();
1619 for m in &new_metas[off..off + count] {
1620 page_bytes.extend_from_slice(&m.to_le_bytes());
1621 }
1622 puts.push(BatchPut::Raw {
1623 key: page_pk,
1624 bytes: page_bytes,
1625 });
1626 off += count;
1627 }
1628
1629 descriptor.tail_page_pk = *pages.last().unwrap();
1630 descriptor.total_chunk_count = new_metas.len() as u64;
1631 descriptor.total_row_count = new_metas.iter().map(|m| m.row_count).sum();
1632 puts.push(BatchPut::Raw {
1633 key: descriptor_pk,
1634 bytes: descriptor.to_le_bytes(),
1635 });
1636 Ok(())
1637 }
1638
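    /// Bounded compaction for one column: consecutive runs of chunks smaller
    /// than `MIN_CHUNK_BYTES` (capped at `MAX_MERGE_RUN_BYTES` per run) are
    /// concatenated and re-split toward `TARGET_CHUNK_BYTES`, sort
    /// permutations are rebuilt where the old chunks had them, and both
    /// descriptor chains are rewritten. A column that ends up empty is dropped
    /// from the catalog.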
1639 fn compact_field_bounded(&self, field_id: LogicalFieldId) -> Result<()> {
1643 let mut catalog = self.catalog.write().unwrap();
1645
1646 let desc_pk = match catalog.map.get(&field_id) {
1647 Some(&pk) => pk,
1648 None => return Ok(()),
1649 };
1650 let rid_fid = rowid_fid(field_id);
1651 let desc_pk_rid = match catalog.map.get(&rid_fid) {
1652 Some(&pk) => pk,
1653 None => return Ok(()),
1654 };
1655
1656 let gets = vec![
1658 BatchGet::Raw { key: desc_pk },
1659 BatchGet::Raw { key: desc_pk_rid },
1660 ];
1661 let results = self.pager.batch_get(&gets)?;
1662 let mut blobs_by_pk = FxHashMap::default();
1663 for res in results {
1664 if let GetResult::Raw { key, bytes } = res {
1665 blobs_by_pk.insert(key, bytes);
1666 }
1667 }
1668 let desc_blob = blobs_by_pk.remove(&desc_pk).ok_or(Error::NotFound)?;
1669 let mut desc = ColumnDescriptor::from_le_bytes(desc_blob.as_ref());
1670 let desc_blob_rid = blobs_by_pk.remove(&desc_pk_rid).ok_or(Error::NotFound)?;
1671 let mut desc_rid = ColumnDescriptor::from_le_bytes(desc_blob_rid.as_ref());
1672
1673 let mut metas = Vec::new();
1675 for m in DescriptorIterator::new(self.pager.as_ref(), desc.head_page_pk) {
1676 metas.push(m?);
1677 }
1678 let mut metas_rid = Vec::new();
1679 for m in DescriptorIterator::new(self.pager.as_ref(), desc_rid.head_page_pk) {
1680 metas_rid.push(m?);
1681 }
1682 if metas.is_empty() || metas_rid.is_empty() {
1683 return Ok(());
1684 }
1685
1686 let mut puts: Vec<BatchPut> = Vec::new();
1687 let mut frees: Vec<PhysicalKey> = Vec::new();
1688 let mut new_metas: Vec<ChunkMetadata> = Vec::new();
1689 let mut new_rid_metas: Vec<ChunkMetadata> = Vec::new();
1690
1691 let mut i = 0usize;
1692 while i < metas.len() {
1693 let sz = metas[i].serialized_bytes as usize;
1694 if sz >= MIN_CHUNK_BYTES {
1696 new_metas.push(metas[i]);
1697 new_rid_metas.push(metas_rid[i]);
1698 i += 1;
1699 continue;
1700 }
1701
1702 let mut j = i;
1704 let mut run_bytes = 0usize;
1705 while j < metas.len() {
1706 let b = metas[j].serialized_bytes as usize;
1707 if b >= TARGET_CHUNK_BYTES {
1708 break;
1709 }
1710 if run_bytes + b > MAX_MERGE_RUN_BYTES {
1711 break;
1712 }
1713 run_bytes += b;
1714 j += 1;
1715 }
1716 if j == i + 1 && sz >= MIN_CHUNK_BYTES {
1717 new_metas.push(metas[i]);
1718 new_rid_metas.push(metas_rid[i]);
1719 i += 1;
1720 continue;
1721 }
1722
1723 let mut gets = Vec::with_capacity((j - i) * 2);
1725 for k in i..j {
1726 gets.push(BatchGet::Raw {
1727 key: metas[k].chunk_pk,
1728 });
1729 gets.push(BatchGet::Raw {
1730 key: metas_rid[k].chunk_pk,
1731 });
1732 }
1733 let results = self.pager.batch_get(&gets)?;
1734 let mut by_pk: FxHashMap<PhysicalKey, EntryHandle> = FxHashMap::default();
1735 for r in results {
1736 match r {
1737 GetResult::Raw { key, bytes } => {
1738 by_pk.insert(key, bytes);
1739 }
1740 _ => return Err(Error::NotFound),
1741 }
1742 }
1743 let mut data_parts = Vec::with_capacity(j - i);
1744 let mut rid_parts = Vec::with_capacity(j - i);
1745 for k in i..j {
1746 let db = by_pk.get(&metas[k].chunk_pk).ok_or(Error::NotFound)?;
1747 data_parts.push(deserialize_array(db.clone())?);
1748 let rb = by_pk.get(&metas_rid[k].chunk_pk).ok_or(Error::NotFound)?;
1749 rid_parts.push(deserialize_array(rb.clone())?);
1750 }
1751 let merged_data = concat_many(data_parts.iter().collect())?;
1752 let merged_rid_any = concat_many(rid_parts.iter().collect())?;
1753
1754 let slices = split_to_target_bytes(
1756 &merged_data,
1757 TARGET_CHUNK_BYTES,
1758 self.cfg.varwidth_fallback_rows_per_slice,
1759 );
1760 let mut rid_off = 0usize;
1761 let need_perms = metas[i..j].iter().any(|m| m.value_order_perm_pk != 0);
1762
1763 for s in slices {
1764 let rows = s.len();
1765 let rid_ref: ArrayRef = merged_rid_any.slice(rid_off, rows);
1767 let rid_norm = zero_offset(&rid_ref);
1768 let rid_pk = self.pager.alloc_many(1)?[0];
1769 let rid_bytes = serialize_array(rid_norm.as_ref())?;
1770
1771 let data_pk = self.pager.alloc_many(1)?[0];
1772 let s_norm = zero_offset(&s);
1773 let data_bytes = serialize_array(s_norm.as_ref())?;
1774 puts.push(BatchPut::Raw {
1775 key: data_pk,
1776 bytes: data_bytes,
1777 });
1778 puts.push(BatchPut::Raw {
1779 key: rid_pk,
1780 bytes: rid_bytes,
1781 });
1782 let mut meta = ChunkMetadata {
1783 chunk_pk: data_pk,
1784 row_count: rows as u64,
1785 serialized_bytes: s_norm.get_array_memory_size() as u64,
1786 max_val_u64: u64::MAX,
1787 ..Default::default()
1788 };
1789 if need_perms {
1791 let sort_col = SortColumn {
1792 values: s.clone(),
1793 options: None,
1794 };
1795 let idx = lexsort_to_indices(&[sort_col], None)?;
1796 let perm_bytes = serialize_array(&idx)?;
1797 let perm_pk = self.pager.alloc_many(1)?[0];
1798 puts.push(BatchPut::Raw {
1799 key: perm_pk,
1800 bytes: perm_bytes,
1801 });
1802 meta.value_order_perm_pk = perm_pk;
1803 }
1804
1805 let rid_any = rid_norm.clone();
1807 let rids = rid_any
1808 .as_any()
1809 .downcast_ref::<UInt64Array>()
1810 .ok_or_else(|| Error::Internal("rid downcast".into()))?;
1811 let mut min = u64::MAX;
1812 let mut max = 0u64;
1813 let mut sorted_rids = true;
1814 let mut last_v = 0u64;
1815 for ii in 0..rids.len() {
1816 let v = rids.value(ii);
1817 if ii == 0 {
1818 last_v = v;
1819 } else if v < last_v {
1820 sorted_rids = false;
1821 } else {
1822 last_v = v;
1823 }
1824 if v < min {
1825 min = v;
1826 }
1827 if v > max {
1828 max = v;
1829 }
1830 }
1831 let mut rid_perm_pk = 0u64;
1832 if !sorted_rids {
1833 let rid_sort_col = SortColumn {
1834 values: rid_any,
1835 options: None,
1836 };
1837 let rid_idx = lexsort_to_indices(&[rid_sort_col], None)?;
1838 let rid_perm_bytes = serialize_array(&rid_idx)?;
1839 rid_perm_pk = self.pager.alloc_many(1)?[0];
1840 puts.push(BatchPut::Raw {
1841 key: rid_perm_pk,
1842 bytes: rid_perm_bytes,
1843 });
1844 }
1845 let rid_meta = ChunkMetadata {
1846 chunk_pk: rid_pk,
1847 value_order_perm_pk: rid_perm_pk,
1848 row_count: rows as u64,
1849 serialized_bytes: rid_norm.get_array_memory_size() as u64,
1850 min_val_u64: if rows > 0 { min } else { 0 },
1851 max_val_u64: if rows > 0 { max } else { 0 },
1852 };
1853 new_metas.push(meta);
1854 new_rid_metas.push(rid_meta);
1855 rid_off += rows;
1856 }
1857
1858 for k in i..j {
1860 frees.push(metas[k].chunk_pk);
1861 if metas[k].value_order_perm_pk != 0 {
1862 frees.push(metas[k].value_order_perm_pk);
1863 }
1864 frees.push(metas_rid[k].chunk_pk);
1865 if metas_rid[k].value_order_perm_pk != 0 {
1866 frees.push(metas_rid[k].value_order_perm_pk);
1867 }
1868 }
1869
1870 i = j;
1871 }
1872
1873 if new_metas.is_empty() {
1875 self.write_descriptor_chain(desc_pk, &mut desc, &[], &mut puts, &mut frees)?;
1877 self.write_descriptor_chain(desc_pk_rid, &mut desc_rid, &[], &mut puts, &mut frees)?;
1878 catalog.map.remove(&field_id);
1879 catalog.map.remove(&rid_fid);
1880 puts.push(BatchPut::Raw {
1881 key: CATALOG_ROOT_PKEY,
1882 bytes: catalog.to_bytes(),
1883 });
1884 if !puts.is_empty() {
1885 self.pager.batch_put(&puts)?;
1886 }
1887 if !frees.is_empty() {
1888 self.pager.free_many(&frees)?;
1889 }
1890 return Ok(());
1891 }
1892
1893 self.write_descriptor_chain(desc_pk, &mut desc, &new_metas, &mut puts, &mut frees)?;
1895 self.write_descriptor_chain(
1896 desc_pk_rid,
1897 &mut desc_rid,
1898 &new_rid_metas,
1899 &mut puts,
1900 &mut frees,
1901 )?;
1902 if !puts.is_empty() {
1904 self.pager.batch_put(&puts)?;
1905 }
1906 if !frees.is_empty() {
1907 self.pager.free_many(&frees)?;
1908 }
1909
1910 Ok(())
1911 }
1912
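    /// Appends one chunk's metadata to the in-memory tail descriptor page,
    /// spilling the full page into `puts` and starting a fresh tail page when
    /// the current one is out of space, then bumps the descriptor's row and
    /// chunk totals.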
1913 fn append_meta_in_loop(
1916 &self,
1917 descriptor: &mut ColumnDescriptor,
1918 tail_page_bytes: &mut Vec<u8>,
1919 meta: ChunkMetadata,
1920 puts: &mut Vec<BatchPut>,
1921 ) -> Result<()> {
1922 let mut header = DescriptorPageHeader::from_le_bytes(
1923 &tail_page_bytes[..DescriptorPageHeader::DISK_SIZE],
1924 );
1925 if tail_page_bytes.len() + ChunkMetadata::DISK_SIZE <= TARGET_DESCRIPTOR_PAGE_BYTES
1926 && (header.entry_count as usize) < DESCRIPTOR_ENTRIES_PER_PAGE
1927 {
1928 tail_page_bytes.extend_from_slice(&meta.to_le_bytes());
1930 header.entry_count += 1;
1931 tail_page_bytes[..DescriptorPageHeader::DISK_SIZE]
1932 .copy_from_slice(&header.to_le_bytes());
1933 } else {
1934 let new_tail_pk = self.pager.alloc_many(1)?[0];
1936 header.next_page_pk = new_tail_pk;
1937 tail_page_bytes[..DescriptorPageHeader::DISK_SIZE]
1938 .copy_from_slice(&header.to_le_bytes());
1939 let full_page_to_write = std::mem::take(tail_page_bytes);
1942 puts.push(BatchPut::Raw {
1943 key: descriptor.tail_page_pk,
1944 bytes: full_page_to_write,
1945 });
1946 let new_header = DescriptorPageHeader {
1948 next_page_pk: 0,
1949 entry_count: 1,
1950 _padding: [0; 4],
1951 };
1952 let mut new_page_bytes = new_header.to_le_bytes().to_vec();
1953 new_page_bytes.extend_from_slice(&meta.to_le_bytes());
1954 descriptor.tail_page_pk = new_tail_pk;
1956 *tail_page_bytes = new_page_bytes;
1957 }
1958
1959 descriptor.total_row_count += meta.row_count;
1960 descriptor.total_chunk_count += 1;
1961 Ok(())
1962 }
1963
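    /// Walks every descriptor reachable from the catalog and verifies that its
    /// stored field id, total row count, and total chunk count agree with the
    /// chunk metadata actually present in its page chain.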
1964 pub fn verify_integrity(&self) -> Result<()> {
1974 let catalog = self.catalog.read().unwrap();
1975 for (&field_id, &descriptor_pk) in &catalog.map {
1976 let desc_blob = self
1977 .pager
1978 .batch_get(&[BatchGet::Raw { key: descriptor_pk }])?
1979 .pop()
1980 .and_then(|r| match r {
1981 GetResult::Raw { bytes, .. } => Some(bytes),
1982 _ => None,
1983 })
1984 .ok_or_else(|| {
1985 Error::Internal(format!(
1986 "Catalog points to missing descriptor pk={}",
1987 descriptor_pk
1988 ))
1989 })?;
1990 let descriptor = ColumnDescriptor::from_le_bytes(desc_blob.as_ref());
1991 if descriptor.field_id != field_id {
1992 return Err(Error::Internal(format!(
1993 "Descriptor at pk={} has wrong field_id: expected {:?}, \
1994 got {:?}",
1995 descriptor_pk, field_id, descriptor.field_id
1996 )));
1997 }
1998
1999 let mut actual_rows = 0;
2000 let mut actual_chunks = 0;
2001 let mut current_page_pk = descriptor.head_page_pk;
2002 while current_page_pk != 0 {
2003 let page_blob = self
2004 .pager
2005 .batch_get(&[BatchGet::Raw {
2006 key: current_page_pk,
2007 }])?
2008 .pop()
2009 .and_then(|r| match r {
2010 GetResult::Raw { bytes, .. } => Some(bytes),
2011 _ => None,
2012 })
2013 .ok_or_else(|| {
2014 Error::Internal(format!(
2015 "Descriptor page chain broken at pk={}",
2016 current_page_pk
2017 ))
2018 })?;
2019 let header = DescriptorPageHeader::from_le_bytes(
2020 &page_blob.as_ref()[..DescriptorPageHeader::DISK_SIZE],
2021 );
2022 for i in 0..(header.entry_count as usize) {
2023 let off = DescriptorPageHeader::DISK_SIZE + i * ChunkMetadata::DISK_SIZE;
2024 let end = off + ChunkMetadata::DISK_SIZE;
2025 let meta = ChunkMetadata::from_le_bytes(&page_blob.as_ref()[off..end]);
2026 actual_rows += meta.row_count;
2027 actual_chunks += 1;
2028 }
2029 current_page_pk = header.next_page_pk;
2030 }
2031
2032 if descriptor.total_row_count != actual_rows {
2033 return Err(Error::Internal(format!(
2034 "Row count mismatch for field {:?}: descriptor says {}, \
2035 actual is {}",
2036 field_id, descriptor.total_row_count, actual_rows
2037 )));
2038 }
2039 if descriptor.total_chunk_count != actual_chunks {
2040 return Err(Error::Internal(format!(
2041 "Chunk count mismatch for field {:?}: descriptor says {}, \
2042 actual is {}",
2043 field_id, descriptor.total_chunk_count, actual_chunks
2044 )));
2045 }
2046 }
2047 Ok(())
2048 }
2049
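    /// Collects per-column layout statistics: row and chunk totals plus the
    /// size and entry count of every descriptor page.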
2050 pub fn get_layout_stats(&self) -> Result<Vec<ColumnLayoutStats>> {
2055 let catalog = self.catalog.read().unwrap();
2056 let mut all_stats = Vec::new();
2057
2058 for (&field_id, &descriptor_pk) in &catalog.map {
2059 let desc_blob = self
2060 .pager
2061 .batch_get(&[BatchGet::Raw { key: descriptor_pk }])?
2062 .pop()
2063 .and_then(|r| match r {
2064 GetResult::Raw { bytes, .. } => Some(bytes),
2065 _ => None,
2066 })
2067 .ok_or(Error::NotFound)?;
2068 let descriptor = ColumnDescriptor::from_le_bytes(desc_blob.as_ref());
2069
2070 let mut page_stats = Vec::new();
2071 let mut current_page_pk = descriptor.head_page_pk;
2072 while current_page_pk != 0 {
2073 let page_blob = self
2074 .pager
2075 .batch_get(&[BatchGet::Raw {
2076 key: current_page_pk,
2077 }])?
2078 .pop()
2079 .and_then(|r| match r {
2080 GetResult::Raw { bytes, .. } => Some(bytes),
2081 _ => None,
2082 })
2083 .ok_or(Error::NotFound)?;
2084 let header = DescriptorPageHeader::from_le_bytes(
2085 &page_blob.as_ref()[..DescriptorPageHeader::DISK_SIZE],
2086 );
2087 page_stats.push(DescriptorPageStats {
2088 page_pk: current_page_pk,
2089 entry_count: header.entry_count,
2090 page_size_bytes: page_blob.as_ref().len(),
2091 });
2092 current_page_pk = header.next_page_pk;
2093 }
2094
2095 all_stats.push(ColumnLayoutStats {
2096 field_id,
2097 total_rows: descriptor.total_row_count,
2098 total_chunks: descriptor.total_chunk_count,
2099 pages: page_stats,
2100 });
2101 }
2102 Ok(all_stats)
2103 }
2104}