use super::*;
use crate::serialization::{deserialize_array, serialize_array};
use crate::store::catalog::ColumnCatalog;
use crate::store::descriptor::{
    ChunkMetadata, ColumnDescriptor, DescriptorIterator, DescriptorPageHeader,
};
use crate::store::scan::filter::FilterDispatch;
use crate::store::scan::{FilterPrimitive, FilterResult};
use arrow::array::{Array, ArrayRef, BooleanArray, UInt32Array, UInt64Array};
use arrow::compute::{self, SortColumn, lexsort_to_indices};
use arrow::datatypes::DataType;
use arrow::record_batch::RecordBatch;
use llkv_expr::typed_predicate::Predicate;
use llkv_result::{Error, Result};
use llkv_storage::{
    constants::CATALOG_ROOT_PKEY,
    pager::{BatchGet, BatchPut, GetResult, Pager},
    types::PhysicalKey,
};
use llkv_types::ids::{LogicalFieldId, RowId, TableId};

use rustc_hash::{FxHashMap, FxHashSet};
use simd_r_drive_entry_handle::EntryHandle;
use std::sync::{Arc, RwLock};

pub struct ColumnStore<P: Pager> {
    pub(crate) pager: Arc<P>,
    pub(crate) catalog: Arc<RwLock<ColumnCatalog>>,
    cfg: ColumnStoreConfig,
    dtype_cache: DTypeCache<P>,
    index_manager: IndexManager<P>,
}

impl<P> Clone for ColumnStore<P>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    fn clone(&self) -> Self {
        Self {
            pager: Arc::clone(&self.pager),
            catalog: Arc::clone(&self.catalog),
            cfg: self.cfg.clone(),
            dtype_cache: self.dtype_cache.clone(),
            index_manager: self.index_manager.clone(),
        }
    }
}

impl<P> ColumnStore<P>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    pub fn open(pager: Arc<P>) -> Result<Self> {
        let cfg = ColumnStoreConfig::default();
        let catalog = match pager
            .batch_get(&[BatchGet::Raw {
                key: CATALOG_ROOT_PKEY,
            }])?
            .pop()
        {
            Some(GetResult::Raw { bytes, .. }) => ColumnCatalog::from_bytes(bytes.as_ref())?,
            _ => ColumnCatalog::default(),
        };
        let arc_catalog = Arc::new(RwLock::new(catalog));

        let index_manager = IndexManager::new(Arc::clone(&pager));

        Ok(Self {
            pager: Arc::clone(&pager),
            catalog: Arc::clone(&arc_catalog),
            cfg,
            dtype_cache: DTypeCache::new(Arc::clone(&pager), Arc::clone(&arc_catalog)),
            index_manager,
        })
    }

    pub fn write_hints(&self) -> ColumnStoreWriteHints {
        ColumnStoreWriteHints::from_config(&self.cfg)
    }

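    /// Registers a persisted index of `kind` on `field_id` via the store's
    /// `IndexManager`; `unregister_index` below is the inverse.
    ///
    /// ```ignore
    /// // Hedged sketch: maintain a presence index for fast row-id probes.
    /// store.register_index(field_id, IndexKind::Presence)?;
    /// ```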
    pub fn register_index(&self, field_id: LogicalFieldId, kind: IndexKind) -> Result<()> {
        self.index_manager.register_index(self, field_id, kind)
    }

    pub fn has_field(&self, field_id: LogicalFieldId) -> bool {
        let catalog = self.catalog.read().unwrap();
        catalog.map.contains_key(&field_id)
    }

    pub fn unregister_index(&self, field_id: LogicalFieldId, kind: IndexKind) -> Result<()> {
        self.index_manager.unregister_index(self, field_id, kind)
    }

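    /// Returns the Arrow `DataType` of `field_id`, consulting the in-memory
    /// dtype cache before falling back to the persisted descriptor
    /// fingerprint.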
    pub fn data_type(&self, field_id: LogicalFieldId) -> Result<DataType> {
        if let Some(dt) = self.dtype_cache.cached_data_type(field_id) {
            return Ok(dt);
        }
        self.dtype_cache.dtype_for_field(field_id)
    }

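    /// Rewrites the dtype fingerprint stored on the column's descriptor and
    /// refreshes the dtype cache. Fails with `Error::NotFound` when the
    /// field has no catalog entry or its descriptor blob is missing.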
    pub fn update_data_type(
        &self,
        field_id: LogicalFieldId,
        new_data_type: &DataType,
    ) -> Result<()> {
        let descriptor_pk = {
            let catalog = self.catalog.read().unwrap();
            *catalog.map.get(&field_id).ok_or(Error::NotFound)?
        };

        let mut descriptor = match self
            .pager
            .batch_get(&[BatchGet::Raw { key: descriptor_pk }])?
            .pop()
        {
            Some(GetResult::Raw { bytes, .. }) => ColumnDescriptor::from_le_bytes(bytes.as_ref()),
            _ => return Err(Error::NotFound),
        };

        let new_fingerprint = DTypeCache::<P>::dtype_fingerprint(new_data_type);
        if new_fingerprint != 0 {
            DTypeCache::<P>::set_desc_dtype_fingerprint(&mut descriptor, new_fingerprint);
        }

        self.pager.batch_put(&[BatchPut::Raw {
            key: descriptor_pk,
            bytes: descriptor.to_le_bytes(),
        }])?;

        self.dtype_cache.insert(field_id, new_data_type.clone());

        Ok(())
    }

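    /// Ensures that both the data column and its shadow row-id column have
    /// catalog entries and persisted descriptors, allocating and writing
    /// whatever is missing. Safe to call repeatedly; existing descriptors
    /// are left untouched.
    ///
    /// ```ignore
    /// // Hedged sketch: pre-register a column before its first append.
    /// store.ensure_column_registered(field_id, &DataType::UInt64)?;
    /// ```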
    pub fn ensure_column_registered(
        &self,
        field_id: LogicalFieldId,
        data_type: &DataType,
    ) -> Result<()> {
        let rid_field_id = rowid_fid(field_id);

        let mut catalog_dirty = false;
        let descriptor_pk;
        let rid_descriptor_pk;

        {
            let mut catalog = self.catalog.write().unwrap();
            descriptor_pk = if let Some(&pk) = catalog.map.get(&field_id) {
                pk
            } else {
                let pk = self.pager.alloc_many(1)?[0];
                catalog.map.insert(field_id, pk);
                catalog_dirty = true;
                pk
            };

            rid_descriptor_pk = if let Some(&pk) = catalog.map.get(&rid_field_id) {
                pk
            } else {
                let pk = self.pager.alloc_many(1)?[0];
                catalog.map.insert(rid_field_id, pk);
                catalog_dirty = true;
                pk
            };
        }

        let mut puts: Vec<BatchPut> = Vec::new();

        let data_descriptor_missing = self
            .pager
            .batch_get(&[BatchGet::Raw { key: descriptor_pk }])?
            .pop()
            .and_then(|r| match r {
                GetResult::Raw { bytes, .. } => Some(bytes),
                _ => None,
            })
            .is_none();

        if data_descriptor_missing {
            let (mut descriptor, tail_page) =
                ColumnDescriptor::load_or_create(Arc::clone(&self.pager), descriptor_pk, field_id)?;
            let fingerprint = DTypeCache::<P>::dtype_fingerprint(data_type);
            if fingerprint != 0 {
                DTypeCache::<P>::set_desc_dtype_fingerprint(&mut descriptor, fingerprint);
            }
            puts.push(BatchPut::Raw {
                key: descriptor.tail_page_pk,
                bytes: tail_page,
            });
            puts.push(BatchPut::Raw {
                key: descriptor_pk,
                bytes: descriptor.to_le_bytes(),
            });
        }

        let rid_descriptor_missing = self
            .pager
            .batch_get(&[BatchGet::Raw {
                key: rid_descriptor_pk,
            }])?
            .pop()
            .and_then(|r| match r {
                GetResult::Raw { bytes, .. } => Some(bytes),
                _ => None,
            })
            .is_none();

        if rid_descriptor_missing {
            let (mut rid_descriptor, tail_page) = ColumnDescriptor::load_or_create(
                Arc::clone(&self.pager),
                rid_descriptor_pk,
                rid_field_id,
            )?;
            let fingerprint = DTypeCache::<P>::dtype_fingerprint(&DataType::UInt64);
            if fingerprint != 0 {
                DTypeCache::<P>::set_desc_dtype_fingerprint(&mut rid_descriptor, fingerprint);
            }
            puts.push(BatchPut::Raw {
                key: rid_descriptor.tail_page_pk,
                bytes: tail_page,
            });
            puts.push(BatchPut::Raw {
                key: rid_descriptor_pk,
                bytes: rid_descriptor.to_le_bytes(),
            });
        }

        self.dtype_cache.insert(field_id, data_type.clone());

        if catalog_dirty {
            let catalog_bytes = {
                let catalog = self.catalog.read().unwrap();
                catalog.to_bytes()
            };
            puts.push(BatchPut::Raw {
                key: CATALOG_ROOT_PKEY,
                bytes: catalog_bytes,
            });
        }

        if !puts.is_empty() {
            self.pager.batch_put(&puts)?;
        }

        Ok(())
    }

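    /// Evaluates a typed predicate against one column and returns the
    /// matching row ids, with dispatch driven by the `FilterDispatch`
    /// implementation selected through `T`.
    ///
    /// A hedged sketch; the dispatch type and predicate value below are
    /// illustrative, not a confirmed API surface:
    ///
    /// ```ignore
    /// let rows: Vec<u64> =
    ///     store.filter_row_ids::<U64Dispatch>(field_id, &predicate)?;
    /// ```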
    pub fn filter_row_ids<T>(
        &self,
        field_id: LogicalFieldId,
        predicate: &Predicate<T::Value>,
    ) -> Result<Vec<u64>>
    where
        T: FilterDispatch,
    {
        tracing::trace!(field=?field_id, "filter_row_ids start");
        let res = T::run_filter(self, field_id, predicate);
        if let Err(ref err) = res {
            tracing::trace!(field=?field_id, error=?err, "filter_row_ids error");
        } else {
            tracing::trace!(field=?field_id, "filter_row_ids ok");
        }
        res
    }

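    /// Runs a closure over the column's native values and returns a
    /// `FilterResult`. Unlike `filter_row_ids`, the predicate here is an
    /// arbitrary `FnMut(T::Native) -> bool` rather than a typed `Predicate`.
    ///
    /// ```ignore
    /// // Hedged sketch: keep even values; `U64Primitive` is illustrative.
    /// let result = store.filter_matches::<U64Primitive, _>(field_id, |v| v % 2 == 0)?;
    /// ```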
    pub fn filter_matches<T, F>(
        &self,
        field_id: LogicalFieldId,
        predicate: F,
    ) -> Result<FilterResult>
    where
        T: FilterPrimitive,
        F: FnMut(T::Native) -> bool,
    {
        T::run_filter_with_result(self, field_id, predicate)
    }

    pub fn list_persisted_indexes(&self, field_id: LogicalFieldId) -> Result<Vec<IndexKind>> {
        let catalog = self.catalog.read().unwrap();
        let descriptor_pk = *catalog.map.get(&field_id).ok_or(Error::NotFound)?;

        let desc_blob = self
            .pager
            .batch_get(&[BatchGet::Raw { key: descriptor_pk }])?
            .pop()
            .and_then(|r| match r {
                GetResult::Raw { bytes, .. } => Some(bytes),
                _ => None,
            })
            .ok_or(Error::NotFound)?;
        let descriptor = ColumnDescriptor::from_le_bytes(desc_blob.as_ref());

        let kinds = descriptor.get_indexes()?;
        Ok(kinds)
    }

    pub fn total_rows_for_field(&self, field_id: LogicalFieldId) -> Result<u64> {
        let catalog = self.catalog.read().unwrap();
        let desc_pk = *catalog.map.get(&field_id).ok_or(Error::NotFound)?;
        drop(catalog);

        let desc_blob = self
            .pager
            .batch_get(&[BatchGet::Raw { key: desc_pk }])?
            .pop()
            .and_then(|r| match r {
                GetResult::Raw { bytes, .. } => Some(bytes),
                _ => None,
            })
            .ok_or(Error::NotFound)?;

        let desc = ColumnDescriptor::from_le_bytes(desc_blob.as_ref());
        Ok(desc.total_row_count)
    }

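    /// Returns the row count for `table_id`, taken as the maximum
    /// `total_row_count` across the table's user-data columns. Columns can
    /// lag one another after partial appends, so the widest column wins.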
    pub fn total_rows_for_table(&self, table_id: TableId) -> Result<u64> {
        use llkv_types::ids::LogicalStorageNamespace;
        let catalog = self.catalog.read().unwrap();
        let candidates: Vec<LogicalFieldId> = catalog
            .map
            .keys()
            .filter(|fid| {
                fid.namespace() == LogicalStorageNamespace::UserData && fid.table_id() == table_id
            })
            .copied()
            .collect();
        drop(catalog);

        if candidates.is_empty() {
            return Ok(0);
        }

        let mut max_rows: u64 = 0;
        for field in candidates {
            let rows = self.total_rows_for_field(field)?;
            if rows > max_rows {
                max_rows = rows;
            }
        }
        Ok(max_rows)
    }

    pub fn user_field_ids_for_table(&self, table_id: TableId) -> Vec<LogicalFieldId> {
        use llkv_types::ids::LogicalStorageNamespace;

        let catalog = self.catalog.read().unwrap();
        catalog
            .map
            .keys()
            .filter(|fid| {
                fid.namespace() == LogicalStorageNamespace::UserData && fid.table_id() == table_id
            })
            .copied()
            .collect()
    }

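    /// Removes a column together with its shadow row-id column. Every
    /// physical key reachable from both descriptor chains is collected
    /// first, the catalog entries are dropped and persisted, and only then
    /// are the keys freed. If freeing fails, the catalog entries are
    /// restored before the error is returned.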
    pub fn remove_column(&self, field_id: LogicalFieldId) -> Result<()> {
        let rowid_field = rowid_fid(field_id);
        let (descriptor_pk, rowid_descriptor_pk) = {
            let catalog = self.catalog.read().unwrap();
            let descriptor_pk = *catalog.map.get(&field_id).ok_or(Error::NotFound)?;
            let rowid_descriptor_pk = catalog.map.get(&rowid_field).copied();
            (descriptor_pk, rowid_descriptor_pk)
        };

        let mut free_keys: FxHashSet<PhysicalKey> = FxHashSet::default();
        self.collect_descriptor_allocations(descriptor_pk, &mut free_keys)?;
        if let Some(rid_pk) = rowid_descriptor_pk {
            self.collect_descriptor_allocations(rid_pk, &mut free_keys)?;
        }

        let mut catalog = self.catalog.write().unwrap();
        let removed_value = catalog.map.remove(&field_id).is_some();
        let removed_rowid = catalog.map.remove(&rowid_field);
        drop(catalog);

        if !removed_value {
            return Err(Error::NotFound);
        }

        let catalog_bytes = {
            let catalog = self.catalog.read().unwrap();
            catalog.to_bytes()
        };

        self.pager.batch_put(&[BatchPut::Raw {
            key: CATALOG_ROOT_PKEY,
            bytes: catalog_bytes,
        }])?;

        if !free_keys.is_empty() {
            let mut frees: Vec<PhysicalKey> = free_keys.into_iter().collect();
            frees.sort_unstable();
            if let Err(err) = self.pager.free_many(&frees) {
                // Freeing failed: restore the catalog entries so the column
                // stays reachable, then surface the original error.
                let mut catalog = self.catalog.write().unwrap();
                catalog.map.insert(field_id, descriptor_pk);
                if let Some(rid_pk) = rowid_descriptor_pk.or(removed_rowid) {
                    catalog.map.insert(rowid_field, rid_pk);
                }
                drop(catalog);

                let restore_bytes = {
                    let catalog = self.catalog.read().unwrap();
                    catalog.to_bytes()
                };

                let _ = self.pager.batch_put(&[BatchPut::Raw {
                    key: CATALOG_ROOT_PKEY,
                    bytes: restore_bytes,
                }]);

                return Err(err);
            }
        }

        Ok(())
    }

    fn collect_descriptor_allocations(
        &self,
        descriptor_pk: PhysicalKey,
        keys: &mut FxHashSet<PhysicalKey>,
    ) -> Result<()> {
        if descriptor_pk == 0 {
            return Ok(());
        }

        let Some(GetResult::Raw { bytes, .. }) = self
            .pager
            .batch_get(&[BatchGet::Raw { key: descriptor_pk }])?
            .pop()
        else {
            return Ok(());
        };

        keys.insert(descriptor_pk);

        let descriptor = ColumnDescriptor::from_le_bytes(bytes.as_ref());
        let mut page_pk = descriptor.head_page_pk;

        while page_pk != 0 {
            let page_blob = self
                .pager
                .batch_get(&[BatchGet::Raw { key: page_pk }])?
                .pop()
                .and_then(|r| match r {
                    GetResult::Raw { bytes, .. } => Some(bytes),
                    _ => None,
                })
                .ok_or(Error::NotFound)?;
            let page_bytes = page_blob.as_ref();
            if page_bytes.len() < DescriptorPageHeader::DISK_SIZE {
                return Err(Error::Internal("descriptor page truncated".into()));
            }
            let header =
                DescriptorPageHeader::from_le_bytes(&page_bytes[..DescriptorPageHeader::DISK_SIZE]);
            keys.insert(page_pk);

            let mut offset = DescriptorPageHeader::DISK_SIZE;
            for _ in 0..header.entry_count as usize {
                let end = offset + ChunkMetadata::DISK_SIZE;
                if end > page_bytes.len() {
                    break;
                }
                let meta = ChunkMetadata::from_le_bytes(&page_bytes[offset..end]);
                if meta.chunk_pk != 0 {
                    keys.insert(meta.chunk_pk);
                }
                if meta.value_order_perm_pk != 0 {
                    keys.insert(meta.value_order_perm_pk);
                }
                offset = end;
            }

            page_pk = header.next_page_pk;
        }

        Ok(())
    }

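    /// Reports whether `row_id` exists in the column's shadow row-id
    /// chunks. Chunks whose recorded min/max bounds exclude `row_id` are
    /// skipped; candidate chunks are probed with a binary search, going
    /// through the value-order permutation when one is persisted.
    ///
    /// ```ignore
    /// let present = store.has_row_id(field_id, 42)?;
    /// ```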
    pub fn has_row_id(&self, field_id: LogicalFieldId, row_id: RowId) -> Result<bool> {
        let rid_fid = rowid_fid(field_id);
        let catalog = self.catalog.read().unwrap();
        let rid_desc_pk = *catalog.map.get(&rid_fid).ok_or(Error::NotFound)?;
        let rid_desc_blob = self
            .pager
            .batch_get(&[BatchGet::Raw { key: rid_desc_pk }])?
            .pop()
            .and_then(|r| match r {
                GetResult::Raw { bytes, .. } => Some(bytes),
                _ => None,
            })
            .ok_or(Error::NotFound)?;
        let rid_desc = ColumnDescriptor::from_le_bytes(rid_desc_blob.as_ref());
        drop(catalog);

        for m in DescriptorIterator::new(self.pager.as_ref(), rid_desc.head_page_pk) {
            let meta = m?;
            if meta.row_count == 0 {
                continue;
            }
            // Bounds of (0, 0) mean "unset"; skip only when the bounds are
            // known and row_id falls outside [min, max].
            let bounds_known = meta.min_val_u64 != 0 || meta.max_val_u64 != 0;
            if bounds_known && (row_id < meta.min_val_u64 || row_id > meta.max_val_u64) {
                continue;
            }
            let mut gets = vec![BatchGet::Raw { key: meta.chunk_pk }];
            if meta.value_order_perm_pk != 0 {
                gets.push(BatchGet::Raw {
                    key: meta.value_order_perm_pk,
                });
            }
            let results = self.pager.batch_get(&gets)?;
            let mut rid_blob: Option<EntryHandle> = None;
            let mut perm_blob: Option<EntryHandle> = None;
            for r in results {
                if let GetResult::Raw { key, bytes } = r {
                    if key == meta.chunk_pk {
                        rid_blob = Some(bytes);
                    } else if key == meta.value_order_perm_pk {
                        perm_blob = Some(bytes);
                    }
                }
            }
            let Some(rid_blob) = rid_blob else { continue };
            let rid_any = deserialize_array(rid_blob)?;
            let rids = rid_any
                .as_any()
                .downcast_ref::<UInt64Array>()
                .ok_or_else(|| Error::Internal("rid downcast".into()))?;
            if let Some(pblob) = perm_blob {
                let perm_any = deserialize_array(pblob)?;
                let perm = perm_any
                    .as_any()
                    .downcast_ref::<UInt32Array>()
                    .ok_or_else(|| Error::Internal("perm not u32".into()))?;
                let mut lo: isize = 0;
                let mut hi: isize = (perm.len() as isize) - 1;
                while lo <= hi {
                    let mid = ((lo + hi) >> 1) as usize;
                    let rid = rids.value(perm.value(mid) as usize);
                    if rid == row_id {
                        return Ok(true);
                    } else if rid < row_id {
                        lo = mid as isize + 1;
                    } else {
                        hi = mid as isize - 1;
                    }
                }
            } else {
                let mut lo: isize = 0;
                let mut hi: isize = (rids.len() as isize) - 1;
                while lo <= hi {
                    let mid = ((lo + hi) >> 1) as usize;
                    let rid = rids.value(mid);
                    if rid == row_id {
                        return Ok(true);
                    } else if rid < row_id {
                        lo = mid as isize + 1;
                    } else {
                        hi = mid as isize - 1;
                    }
                }
            }
        }
        Ok(false)
    }

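    /// Appends a `RecordBatch` to the store. The batch must carry a u64
    /// `row_id` column; rows are sorted by row id when necessary, existing
    /// rows with matching ids are rewritten last-writer-wins (nulls acting
    /// as deletes), and the surviving values are split into chunks, staged
    /// alongside their row-id shadows and index updates, and committed with
    /// a single `batch_put`.
    ///
    /// A hedged sketch of a caller; the schema and field ids are
    /// illustrative, not part of this API:
    ///
    /// ```ignore
    /// use std::sync::Arc;
    /// use arrow::array::UInt64Array;
    /// use arrow::record_batch::RecordBatch;
    ///
    /// // Assumes a schema whose data field carries FIELD_ID_META_KEY metadata.
    /// let batch = RecordBatch::try_new(
    ///     schema, // hypothetical: row_id column plus one data column
    ///     vec![
    ///         Arc::new(UInt64Array::from(vec![1, 2, 3])),    // row ids
    ///         Arc::new(UInt64Array::from(vec![10, 20, 30])), // values
    ///     ],
    /// )?;
    /// store.append(&batch)?;
    /// ```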
    #[allow(unused_variables, unused_assignments)]
    pub fn append(&self, batch: &RecordBatch) -> Result<()> {
        tracing::trace!(
            num_columns = batch.num_columns(),
            num_rows = batch.num_rows(),
            "ColumnStore::append BEGIN"
        );

        let working_batch: RecordBatch;
        let batch_ref = {
            let schema = batch.schema();
            let row_id_idx = schema
                .index_of(ROW_ID_COLUMN_NAME)
                .map_err(|_| Error::Internal("row_id column required".into()))?;
            let row_id_any = batch.column(row_id_idx).clone();
            let row_id_arr = row_id_any
                .as_any()
                .downcast_ref::<UInt64Array>()
                .ok_or_else(|| Error::Internal("row_id downcast failed".into()))?;

            let mut is_sorted = true;
            if !row_id_arr.is_empty() {
                let mut last = row_id_arr.value(0);
                for i in 1..row_id_arr.len() {
                    let current = row_id_arr.value(i);
                    if current < last {
                        is_sorted = false;
                        break;
                    }
                    last = current;
                }
            }

            if is_sorted {
                batch
            } else {
                let sort_col = SortColumn {
                    values: row_id_any,
                    options: None,
                };
                let idx = lexsort_to_indices(&[sort_col], None)?;
                let perm = idx
                    .as_any()
                    .downcast_ref::<UInt32Array>()
                    .ok_or_else(|| Error::Internal("perm not u32".into()))?;
                let mut cols: Vec<ArrayRef> = Vec::with_capacity(batch.num_columns());
                for i in 0..batch.num_columns() {
                    cols.push(compute::take(batch.column(i), perm, None)?);
                }
                working_batch = RecordBatch::try_new(schema.clone(), cols)
                    .map_err(|e| Error::Internal(format!("record batch rebuild: {e}")))?;
                &working_batch
            }
        };

        tracing::trace!("ColumnStore::append PHASE 1 complete - batch preprocessed");

        let schema = batch_ref.schema();
        let row_id_idx = schema
            .index_of(ROW_ID_COLUMN_NAME)
            .map_err(|_| Error::Internal("row_id column required".into()))?;

        let row_id_arr = batch_ref
            .column(row_id_idx)
            .as_any()
            .downcast_ref::<UInt64Array>()
            .ok_or_else(|| Error::Internal("row_id downcast failed".into()))?;
        let mut incoming_ids_map = FxHashMap::default();
        incoming_ids_map.reserve(row_id_arr.len());
        for i in 0..row_id_arr.len() {
            incoming_ids_map.insert(row_id_arr.value(i), i);
        }

        let mut catalog_dirty = false;
        let mut all_puts: Vec<BatchPut> = Vec::new();

        for i in 0..batch_ref.num_columns() {
            if i == row_id_idx {
                continue;
            }
            let field = schema.field(i);
            let field_id =
                if let Some(field_id_str) = field.metadata().get(crate::store::FIELD_ID_META_KEY) {
                    field_id_str
                        .parse::<u64>()
                        .map(LogicalFieldId::from)
                        .map_err(|e| Error::Internal(format!("Invalid field_id: {}", e)))?
                } else {
                    continue;
                };

            let mut catalog_lock = self.catalog.write().unwrap();
            let rewritten_ids = self.lww_rewrite_for_field(
                &mut catalog_lock,
                field_id,
                &incoming_ids_map,
                batch_ref.column(i),
                batch_ref.column(row_id_idx),
                &mut all_puts,
            )?;

            let (descriptor_pk, rid_descriptor_pk, rid_fid) = {
                let pk1 = *catalog_lock.map.entry(field_id).or_insert_with(|| {
                    catalog_dirty = true;
                    self.pager.alloc_many(1).unwrap()[0]
                });
                let r_fid = rowid_fid(field_id);
                let pk2 = *catalog_lock.map.entry(r_fid).or_insert_with(|| {
                    catalog_dirty = true;
                    self.pager.alloc_many(1).unwrap()[0]
                });
                (pk1, pk2, r_fid)
            };
            drop(catalog_lock);

            let array = batch_ref.column(i);
            let (array_clean, rids_clean) = if rewritten_ids.is_empty() {
                if array.null_count() == 0 {
                    (array.clone(), batch_ref.column(row_id_idx).clone())
                } else {
                    let keep =
                        BooleanArray::from_iter((0..array.len()).map(|j| Some(!array.is_null(j))));
                    (
                        compute::filter(array, &keep)?,
                        compute::filter(batch_ref.column(row_id_idx), &keep)?,
                    )
                }
            } else {
                let keep = BooleanArray::from_iter((0..array.len()).map(|j| {
                    let rid = row_id_arr.value(j);
                    Some(!rewritten_ids.contains(&rid) && !array.is_null(j))
                }));
                (
                    compute::filter(array, &keep)?,
                    compute::filter(batch_ref.column(row_id_idx), &keep)?,
                )
            };

            if array_clean.is_empty() {
                self.ensure_column_registered(field_id, field.data_type())?;
                continue;
            }

            self.dtype_cache.insert(field_id, field.data_type().clone());

            let (mut data_descriptor, mut data_tail_page) =
                ColumnDescriptor::load_or_create(Arc::clone(&self.pager), descriptor_pk, field_id)
                    .map_err(|e| {
                        tracing::error!(?field_id, descriptor_pk, error = ?e, "append: load_or_create failed for data descriptor");
                        e
                    })?;
            let (mut rid_descriptor, mut rid_tail_page) = ColumnDescriptor::load_or_create(
                Arc::clone(&self.pager),
                rid_descriptor_pk,
                rid_fid,
            )
            .map_err(|e| {
                tracing::error!(?rid_fid, rid_descriptor_pk, error = ?e, "append: load_or_create failed for rid descriptor");
                e
            })?;

            self.index_manager
                .stage_index_registration(&mut data_descriptor, IndexKind::Presence)?;

            let slices = split_to_target_bytes(
                &array_clean,
                TARGET_CHUNK_BYTES,
                self.cfg.varwidth_fallback_rows_per_slice,
            );
            let mut row_off = 0usize;

            for s in slices {
                let rows = s.len();
                let data_pk = self.pager.alloc_many(1)?[0];
                let s_norm = zero_offset(&s);
                let data_bytes = serialize_array(s_norm.as_ref())?;
                all_puts.push(BatchPut::Raw {
                    key: data_pk,
                    bytes: data_bytes,
                });

                let rid_slice: ArrayRef = rids_clean.slice(row_off, rows);
                let rid_norm = zero_offset(&rid_slice);
                let rid_pk = self.pager.alloc_many(1)?[0];
                let rid_bytes = serialize_array(rid_norm.as_ref())?;
                all_puts.push(BatchPut::Raw {
                    key: rid_pk,
                    bytes: rid_bytes,
                });

                let rids_for_meta = rid_norm.as_any().downcast_ref::<UInt64Array>().unwrap();
                let (min, max) = if !rids_for_meta.is_empty() {
                    let mut min_val = rids_for_meta.value(0);
                    let mut max_val = rids_for_meta.value(0);
                    for i in 1..rids_for_meta.len() {
                        let v = rids_for_meta.value(i);
                        if v < min_val {
                            min_val = v;
                        }
                        if v > max_val {
                            max_val = v;
                        }
                    }
                    (min_val, max_val)
                } else {
                    (0, 0)
                };

                let mut data_meta = ChunkMetadata {
                    chunk_pk: data_pk,
                    row_count: rows as u64,
                    serialized_bytes: s_norm.get_array_memory_size() as u64,
                    max_val_u64: u64::MAX,
                    ..Default::default()
                };
                let mut rid_meta = ChunkMetadata {
                    chunk_pk: rid_pk,
                    row_count: rows as u64,
                    serialized_bytes: rid_norm.get_array_memory_size() as u64,
                    min_val_u64: min,
                    max_val_u64: max,
                    ..Default::default()
                };

                self.index_manager.stage_updates_for_new_chunk(
                    field_id,
                    &data_descriptor,
                    &s_norm,
                    &rid_norm,
                    &mut data_meta,
                    &mut rid_meta,
                    &mut all_puts,
                )?;

                self.append_meta_in_loop(
                    &mut data_descriptor,
                    &mut data_tail_page,
                    data_meta,
                    &mut all_puts,
                )?;
                self.append_meta_in_loop(
                    &mut rid_descriptor,
                    &mut rid_tail_page,
                    rid_meta,
                    &mut all_puts,
                )?;
                row_off += rows;
            }

            all_puts.push(BatchPut::Raw {
                key: data_descriptor.tail_page_pk,
                bytes: data_tail_page,
            });
            all_puts.push(BatchPut::Raw {
                key: descriptor_pk,
                bytes: data_descriptor.to_le_bytes(),
            });
            all_puts.push(BatchPut::Raw {
                key: rid_descriptor.tail_page_pk,
                bytes: rid_tail_page,
            });
            all_puts.push(BatchPut::Raw {
                key: rid_descriptor_pk,
                bytes: rid_descriptor.to_le_bytes(),
            });
        }

        if catalog_dirty {
            let catalog = self.catalog.read().unwrap();
            all_puts.push(BatchPut::Raw {
                key: CATALOG_ROOT_PKEY,
                bytes: catalog.to_bytes(),
            });
        }

        if !all_puts.is_empty() {
            self.pager.batch_put(&all_puts)?;
        }
        tracing::trace!("ColumnStore::append END - success");
        Ok(())
    }

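    /// Applies last-writer-wins rewrites for one field against persisted
    /// chunks: incoming non-null values overwrite rows with matching ids,
    /// and incoming nulls delete them. Chunk row-id min/max bounds gate an
    /// O(1) fast path when the incoming id range cannot overlap any chunk.
    /// Returns the row ids rewritten in place, which the caller must
    /// exclude from the fresh append.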
    fn lww_rewrite_for_field(
        &self,
        catalog: &mut ColumnCatalog,
        field_id: LogicalFieldId,
        incoming_ids_map: &FxHashMap<u64, usize>,
        incoming_data: &ArrayRef,
        incoming_row_ids: &ArrayRef,
        puts: &mut Vec<BatchPut>,
    ) -> Result<FxHashSet<u64>> {
        use crate::store::descriptor::DescriptorIterator;
        use crate::store::ingest::ChunkEdit;

        if incoming_ids_map.is_empty() {
            return Ok(FxHashSet::default());
        }
        let incoming_ids: FxHashSet<u64> = incoming_ids_map.keys().copied().collect();

        let desc_pk_data = match catalog.map.get(&field_id) {
            Some(pk) => *pk,
            None => return Ok(FxHashSet::default()),
        };
        let rid_fid = rowid_fid(field_id);
        let desc_pk_rid = match catalog.map.get(&rid_fid) {
            Some(pk) => *pk,
            None => return Ok(FxHashSet::default()),
        };

        let gets = vec![
            BatchGet::Raw { key: desc_pk_data },
            BatchGet::Raw { key: desc_pk_rid },
        ];
        let results = self.pager.batch_get(&gets)?;
        let mut blobs_by_pk = FxHashMap::default();
        for r in results {
            if let GetResult::Raw { key, bytes } = r {
                blobs_by_pk.insert(key, bytes);
            }
        }

        let desc_blob_data = blobs_by_pk.remove(&desc_pk_data).ok_or_else(|| {
            tracing::error!(
                ?field_id,
                desc_pk_data,
                "lww_rewrite: data descriptor blob not found in pager"
            );
            Error::NotFound
        })?;
        let mut descriptor_data = ColumnDescriptor::from_le_bytes(desc_blob_data.as_ref());

        let desc_blob_rid = blobs_by_pk.remove(&desc_pk_rid).ok_or_else(|| {
            tracing::error!(
                ?rid_fid,
                desc_pk_rid,
                "lww_rewrite: rid descriptor blob not found in pager"
            );
            Error::NotFound
        })?;
        let mut descriptor_rid = ColumnDescriptor::from_le_bytes(desc_blob_rid.as_ref());

        tracing::trace!(?field_id, "lww_rewrite: descriptors loaded successfully");

        let mut metas_data: Vec<ChunkMetadata> = Vec::new();
        let mut metas_rid: Vec<ChunkMetadata> = Vec::new();
        for m in DescriptorIterator::new(self.pager.as_ref(), descriptor_data.head_page_pk) {
            metas_data.push(m.map_err(|e| {
                tracing::error!(?field_id, error = ?e, "lww_rewrite: failed to iterate data descriptor");
                e
            })?);
        }
        for m in DescriptorIterator::new(self.pager.as_ref(), descriptor_rid.head_page_pk) {
            metas_rid.push(m.map_err(|e| {
                tracing::error!(?rid_fid, error = ?e, "lww_rewrite: failed to iterate rid descriptor");
                e
            })?);
        }

        tracing::trace!(
            ?field_id,
            data_chunks = metas_data.len(),
            rid_chunks = metas_rid.len(),
            "lww_rewrite: chunk metadata collected"
        );

        let rid_in = incoming_row_ids
            .as_any()
            .downcast_ref::<UInt64Array>()
            .ok_or_else(|| Error::Internal("row_id must be u64".into()))?;
        let mut ids_to_delete = FxHashSet::default();
        let mut ids_to_upsert = FxHashSet::default();
        for i in 0..rid_in.len() {
            let rid = rid_in.value(i);
            if incoming_data.is_null(i) {
                ids_to_delete.insert(rid);
            } else {
                ids_to_upsert.insert(rid);
            }
        }

        let (incoming_min, incoming_max) = if !rid_in.is_empty() {
            let mut min_val = rid_in.value(0);
            let mut max_val = rid_in.value(0);
            for i in 1..rid_in.len() {
                let rid = rid_in.value(i);
                if rid < min_val {
                    min_val = rid;
                }
                if rid > max_val {
                    max_val = rid;
                }
            }
            (min_val, max_val)
        } else {
            (0, 0)
        };

        let n = metas_data.len().min(metas_rid.len());
        if n > 0 && !rid_in.is_empty() {
            let mut has_overlap = false;
            for meta_rid in metas_rid.iter().take(n) {
                if incoming_min <= meta_rid.max_val_u64 && meta_rid.min_val_u64 <= incoming_max {
                    has_overlap = true;
                    break;
                }
            }

            if !has_overlap {
                tracing::trace!(
                    ?field_id,
                    incoming_min,
                    incoming_max,
                    "lww_rewrite: no overlap detected, skipping chunk scan (O(1) fast path)"
                );
                return Ok(FxHashSet::default());
            }
        }

        let mut rewritten_ids = FxHashSet::default();
        let mut hit_up: FxHashMap<usize, Vec<u64>> = FxHashMap::default();
        let mut hit_del: FxHashMap<usize, Vec<u64>> = FxHashMap::default();

        if n > 0 {
            let mut chunks_to_fetch = Vec::new();
            for (idx, meta_rid) in metas_rid.iter().enumerate().take(n) {
                if incoming_min <= meta_rid.max_val_u64 && meta_rid.min_val_u64 <= incoming_max {
                    chunks_to_fetch.push(idx);
                }
            }

            if chunks_to_fetch.is_empty() {
                return Ok(FxHashSet::default());
            }

            let mut gets_rid = Vec::with_capacity(chunks_to_fetch.len());
            for &idx in &chunks_to_fetch {
                gets_rid.push(BatchGet::Raw {
                    key: metas_rid[idx].chunk_pk,
                });
            }
            let rid_results = self.pager.batch_get(&gets_rid)?;
            let mut rid_blobs: FxHashMap<PhysicalKey, EntryHandle> = FxHashMap::default();
            for r in rid_results {
                if let GetResult::Raw { key, bytes } = r {
                    rid_blobs.insert(key, bytes);
                }
            }

            for &idx in &chunks_to_fetch {
                let meta_rid = &metas_rid[idx];
                if let Some(rid_blob) = rid_blobs.get(&meta_rid.chunk_pk) {
                    let rid_arr_any = deserialize_array(rid_blob.clone())?;
                    let rid_arr = rid_arr_any
                        .as_any()
                        .downcast_ref::<UInt64Array>()
                        .ok_or_else(|| Error::Internal("rid type mismatch".into()))?;
                    for j in 0..rid_arr.len() {
                        let rid = rid_arr.value(j);
                        if incoming_ids.contains(&rid) {
                            if ids_to_delete.contains(&rid) {
                                hit_del.entry(idx).or_default().push(rid);
                            } else if ids_to_upsert.contains(&rid) {
                                hit_up.entry(idx).or_default().push(rid);
                            }
                            rewritten_ids.insert(rid);
                        }
                    }
                }
            }
        }

        if hit_up.is_empty() && hit_del.is_empty() {
            return Ok(rewritten_ids);
        }

        let mut hit_set = FxHashSet::default();
        hit_set.extend(hit_up.keys().copied());
        hit_set.extend(hit_del.keys().copied());
        let hit_idxs: Vec<usize> = hit_set.into_iter().collect();

        let mut gets = Vec::with_capacity(hit_idxs.len() * 2);
        for &i in &hit_idxs {
            gets.push(BatchGet::Raw {
                key: metas_data[i].chunk_pk,
            });
            gets.push(BatchGet::Raw {
                key: metas_rid[i].chunk_pk,
            });
        }
        let results = self.pager.batch_get(&gets)?;
        let mut blob_map: FxHashMap<PhysicalKey, EntryHandle> = FxHashMap::default();
        for r in results {
            if let GetResult::Raw { key, bytes } = r {
                blob_map.insert(key, bytes);
            }
        }

        for i in hit_idxs {
            let old_data_arr =
                deserialize_array(blob_map.get(&metas_data[i].chunk_pk).unwrap().clone())?;
            let old_rid_arr_any =
                deserialize_array(blob_map.get(&metas_rid[i].chunk_pk).unwrap().clone())?;
            let old_rid_arr = old_rid_arr_any
                .as_any()
                .downcast_ref::<UInt64Array>()
                .unwrap();

            let up_vec = hit_up.remove(&i).unwrap_or_default();
            let del_vec = hit_del.remove(&i).unwrap_or_default();

            let edit = ChunkEdit::from_lww_upsert(
                old_rid_arr,
                &up_vec,
                &del_vec,
                incoming_data,
                incoming_row_ids,
                incoming_ids_map,
            )?;

            let (new_data_arr, new_rid_arr) =
                ChunkEdit::apply_edit_to_arrays(&old_data_arr, Some(&old_rid_arr_any), &edit)?;

            let data_bytes = serialize_array(&new_data_arr)?;
            puts.push(BatchPut::Raw {
                key: metas_data[i].chunk_pk,
                bytes: data_bytes,
            });
            metas_data[i].row_count = new_data_arr.len() as u64;
            metas_data[i].serialized_bytes = new_data_arr.get_array_memory_size() as u64;

            if let Some(rarr) = new_rid_arr {
                let rid_bytes = serialize_array(&rarr)?;
                puts.push(BatchPut::Raw {
                    key: metas_rid[i].chunk_pk,
                    bytes: rid_bytes,
                });
                metas_rid[i].row_count = rarr.len() as u64;
                metas_rid[i].serialized_bytes = rarr.get_array_memory_size() as u64;
            }

            if metas_data[i].value_order_perm_pk != 0 {
                let sort_col = SortColumn {
                    values: new_data_arr,
                    options: None,
                };
                let idx = lexsort_to_indices(&[sort_col], None)?;
                let perm_bytes = serialize_array(&idx)?;
                puts.push(BatchPut::Raw {
                    key: metas_data[i].value_order_perm_pk,
                    bytes: perm_bytes,
                });
            }
        }

        descriptor_data.rewrite_pages(
            Arc::clone(&self.pager),
            desc_pk_data,
            &mut metas_data,
            puts,
        )?;
        descriptor_rid.rewrite_pages(Arc::clone(&self.pager), desc_pk_rid, &mut metas_rid, puts)?;

        Ok(rewritten_ids)
    }

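    /// Stages chunk rewrites that remove the given physical row positions
    /// from one field. `rows_to_delete` must be ascending and unique. The
    /// chunk chain is walked once; only touched chunks (plus their row-id
    /// shadows and sort permutations) are rewritten, with the writes staged
    /// into `staged_puts` for the caller to commit. Returns whether any
    /// chunk changed.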
    fn stage_delete_rows_for_field(
        &self,
        field_id: LogicalFieldId,
        rows_to_delete: &[RowId],
        staged_puts: &mut Vec<BatchPut>,
    ) -> Result<bool> {
        tracing::trace!(
            field_id = ?field_id,
            rows = rows_to_delete.len(),
            "delete_rows stage_delete_rows_for_field: start"
        );
        use crate::store::descriptor::DescriptorIterator;
        use crate::store::ingest::ChunkEdit;

        if rows_to_delete.is_empty() {
            return Ok(false);
        }

        let mut del_iter = rows_to_delete.iter().copied();
        let mut cur_del = del_iter.next();
        let mut last_seen: Option<u64> = cur_del;

        let catalog = self.catalog.read().unwrap();
        let desc_pk = match catalog.map.get(&field_id) {
            Some(pk) => *pk,
            None => {
                tracing::trace!(
                    field_id = ?field_id,
                    "delete_rows stage_delete_rows_for_field: data descriptor missing"
                );
                return Err(Error::NotFound);
            }
        };
        let rid_fid = rowid_fid(field_id);
        let desc_pk_rid = catalog.map.get(&rid_fid).copied();
        tracing::trace!(
            field_id = ?field_id,
            desc_pk,
            desc_pk_rid = ?desc_pk_rid,
            "delete_rows stage_delete_rows_for_field: descriptor keys"
        );

        let mut gets = vec![BatchGet::Raw { key: desc_pk }];
        if let Some(pk) = desc_pk_rid {
            gets.push(BatchGet::Raw { key: pk });
        }
        let results = match self.pager.batch_get(&gets) {
            Ok(res) => res,
            Err(err) => {
                tracing::trace!(
                    field_id = ?field_id,
                    error = ?err,
                    "delete_rows stage_delete_rows_for_field: descriptor batch_get failed"
                );
                return Err(err);
            }
        };
        let mut blobs_by_pk = FxHashMap::default();
        for res in results {
            if let GetResult::Raw { key, bytes } = res {
                blobs_by_pk.insert(key, bytes);
            }
        }

        tracing::trace!(
            field_id = ?field_id,
            desc_blob_found = blobs_by_pk.contains_key(&desc_pk),
            rid_blob_found = desc_pk_rid.map(|pk| blobs_by_pk.contains_key(&pk)),
            "delete_rows stage_delete_rows_for_field: descriptor fetch status"
        );

        let desc_blob = blobs_by_pk.remove(&desc_pk).ok_or_else(|| {
            tracing::trace!(
                field_id = ?field_id,
                desc_pk,
                "delete_rows stage_delete_rows_for_field: descriptor blob missing"
            );
            Error::Internal(format!(
                "descriptor pk={} missing during delete_rows for field {:?}",
                desc_pk, field_id
            ))
        })?;
        let mut descriptor = ColumnDescriptor::from_le_bytes(desc_blob.as_ref());

        let mut metas: Vec<ChunkMetadata> = Vec::new();
        for m in DescriptorIterator::new(self.pager.as_ref(), descriptor.head_page_pk) {
            metas.push(m?);
        }
        if metas.is_empty() {
            drop(catalog);
            return Ok(false);
        }

        let mut metas_rid: Vec<ChunkMetadata> = Vec::new();
        let mut descriptor_rid: Option<ColumnDescriptor> = None;
        tracing::trace!(
            field_id = ?field_id,
            metas_len = metas.len(),
            desc_pk_rid = ?desc_pk_rid,
            "delete_rows stage_delete_rows_for_field: data metas loaded"
        );
        if let Some(pk_rid) = desc_pk_rid
            && let Some(desc_blob_rid) = blobs_by_pk.remove(&pk_rid)
        {
            let d_rid = ColumnDescriptor::from_le_bytes(desc_blob_rid.as_ref());
            for m in DescriptorIterator::new(self.pager.as_ref(), d_rid.head_page_pk) {
                metas_rid.push(m?);
            }
            descriptor_rid = Some(d_rid);
        }

        tracing::trace!(
            field_id = ?field_id,
            metas_rid_len = metas_rid.len(),
            "delete_rows stage_delete_rows_for_field: rowid metas loaded"
        );

        let mut cum_rows = 0u64;
        let mut any_changed = false;

        for (i, meta) in metas.iter_mut().enumerate() {
            let start_u64 = cum_rows;
            let end_u64 = start_u64 + meta.row_count;

            // Drain delete positions that fall before this chunk, enforcing
            // the ascending/unique contract as we go.
            while let Some(d) = cur_del {
                if d < start_u64
                    && let Some(prev) = last_seen
                {
                    if d < prev {
                        return Err(Error::Internal(
                            "rows_to_delete must be ascending/unique".into(),
                        ));
                    }

                    last_seen = Some(d);
                    cur_del = del_iter.next();
                } else {
                    break;
                }
            }

            let rows = meta.row_count as usize;
            let mut del_local: FxHashSet<usize> = FxHashSet::default();
            while let Some(d) = cur_del {
                if d >= end_u64 {
                    break;
                }
                del_local.insert((d - start_u64) as usize);
                last_seen = Some(d);
                cur_del = del_iter.next();
            }

            if del_local.is_empty() {
                cum_rows = end_u64;
                continue;
            }

            let mut chunk_gets = vec![BatchGet::Raw { key: meta.chunk_pk }];
            if let Some(rm) = metas_rid.get(i) {
                chunk_gets.push(BatchGet::Raw { key: rm.chunk_pk });
            }
            let chunk_results = match self.pager.batch_get(&chunk_gets) {
                Ok(res) => res,
                Err(err) => {
                    tracing::trace!(
                        field_id = ?field_id,
                        chunk_pk = meta.chunk_pk,
                        error = ?err,
                        "delete_rows stage_delete_rows_for_field: chunk batch_get failed"
                    );
                    return Err(err);
                }
            };
            let mut chunk_blobs = FxHashMap::default();
            for res in chunk_results {
                if let GetResult::Raw { key, bytes } = res {
                    chunk_blobs.insert(key, bytes);
                }
            }

            tracing::trace!(
                field_id = ?field_id,
                chunk_pk = meta.chunk_pk,
                rid_chunk_pk = metas_rid.get(i).map(|rm| rm.chunk_pk),
                data_found = chunk_blobs.contains_key(&meta.chunk_pk),
                rid_found = metas_rid
                    .get(i)
                    .map(|rm| chunk_blobs.contains_key(&rm.chunk_pk)),
                "delete_rows stage_delete_rows_for_field: chunk fetch status"
            );

            let data_blob = match chunk_blobs.remove(&meta.chunk_pk) {
                Some(bytes) => bytes,
                None => {
                    tracing::trace!(
                        field_id = ?field_id,
                        chunk_pk = meta.chunk_pk,
                        "delete_rows stage_delete_rows_for_field: chunk missing"
                    );
                    return Err(Error::NotFound);
                }
            };
            let data_arr = deserialize_array(data_blob)?;

            let rid_arr_any = if let Some(rm) = metas_rid.get(i) {
                let rid_blob = match chunk_blobs.remove(&rm.chunk_pk) {
                    Some(bytes) => bytes,
                    None => {
                        tracing::trace!(
                            field_id = ?field_id,
                            rowid_chunk_pk = rm.chunk_pk,
                            "delete_rows stage_delete_rows_for_field: rowid chunk missing"
                        );
                        return Err(Error::NotFound);
                    }
                };
                Some(deserialize_array(rid_blob)?)
            } else {
                None
            };

            let edit = ChunkEdit::from_delete_indices(rows, &del_local);

            let (new_data_arr, new_rid_arr) =
                ChunkEdit::apply_edit_to_arrays(&data_arr, rid_arr_any.as_ref(), &edit)?;

            let data_bytes = serialize_array(&new_data_arr)?;
            staged_puts.push(BatchPut::Raw {
                key: meta.chunk_pk,
                bytes: data_bytes,
            });
            meta.row_count = new_data_arr.len() as u64;
            meta.serialized_bytes = new_data_arr.get_array_memory_size() as u64;

            if let (Some(rm), Some(rids)) = (metas_rid.get_mut(i), new_rid_arr) {
                let rid_bytes = serialize_array(&rids)?;
                staged_puts.push(BatchPut::Raw {
                    key: rm.chunk_pk,
                    bytes: rid_bytes,
                });
                rm.row_count = rids.len() as u64;
                rm.serialized_bytes = rids.get_array_memory_size() as u64;
            }

            if meta.value_order_perm_pk != 0 {
                let sort_column = SortColumn {
                    values: new_data_arr,
                    options: None,
                };
                let indices = lexsort_to_indices(&[sort_column], None)?;
                let perm_bytes = serialize_array(&indices)?;
                staged_puts.push(BatchPut::Raw {
                    key: meta.value_order_perm_pk,
                    bytes: perm_bytes,
                });
            }

            cum_rows = end_u64;
            any_changed = true;
        }

        descriptor.rewrite_pages(Arc::clone(&self.pager), desc_pk, &mut metas, staged_puts)?;
        if let (Some(rid_pk), Some(mut rid_desc)) = (desc_pk_rid, descriptor_rid) {
            rid_desc.rewrite_pages(Arc::clone(&self.pager), rid_pk, &mut metas_rid, staged_puts)?;
        }
        drop(catalog);
        tracing::trace!(
            field_id = ?field_id,
            changed = any_changed,
            "delete_rows stage_delete_rows_for_field: finished stage"
        );
        Ok(any_changed)
    }

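    /// Deletes the given physical row positions from every listed field.
    /// All fields must belong to the same table and `rows_to_delete` must
    /// be ascending and unique. Staged rewrites are committed in a single
    /// `batch_put`, after which each touched field is compacted.
    ///
    /// ```ignore
    /// // Hedged sketch: drop rows 3 and 7 from two columns of one table.
    /// store.delete_rows(&[col_a, col_b], &[3, 7])?;
    /// ```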
    pub fn delete_rows(&self, fields: &[LogicalFieldId], rows_to_delete: &[RowId]) -> Result<()> {
        if fields.is_empty() || rows_to_delete.is_empty() {
            return Ok(());
        }

        let mut puts = Vec::new();
        let mut touched: FxHashSet<LogicalFieldId> = FxHashSet::default();
        let mut table_id: Option<TableId> = None;

        tracing::trace!(
            fields = fields.len(),
            rows = rows_to_delete.len(),
            "delete_rows begin"
        );
        for field_id in fields {
            tracing::trace!(field = ?field_id, "delete_rows iter field");
            if let Some(expected) = table_id {
                if field_id.table_id() != expected {
                    return Err(Error::InvalidArgumentError(
                        "delete_rows requires fields from the same table".into(),
                    ));
                }
            } else {
                table_id = Some(field_id.table_id());
            }

            if self.stage_delete_rows_for_field(*field_id, rows_to_delete, &mut puts)? {
                touched.insert(*field_id);
            }
        }

        if puts.is_empty() {
            return Ok(());
        }

        self.pager.batch_put(&puts)?;

        tracing::trace!(touched = touched.len(), "delete_rows apply writes");

        for field_id in touched {
            self.compact_field_bounded(field_id)?;
        }
        tracing::trace!("delete_rows complete");
        Ok(())
    }

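    /// Rewrites a descriptor's page chain so it holds exactly `new_metas`,
    /// reusing existing pages where possible, allocating or freeing pages
    /// as the entry count requires, and refreshing the descriptor totals.
    /// An empty `new_metas` collapses the chain and zeroes the descriptor.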
    pub(crate) fn write_descriptor_chain(
        &self,
        descriptor_pk: PhysicalKey,
        descriptor: &mut ColumnDescriptor,
        new_metas: &[ChunkMetadata],
        puts: &mut Vec<BatchPut>,
        frees: &mut Vec<PhysicalKey>,
    ) -> Result<()> {
        let mut old_pages = Vec::new();
        let mut pk = descriptor.head_page_pk;
        while pk != 0 {
            let page_blob = self
                .pager
                .batch_get(&[BatchGet::Raw { key: pk }])?
                .pop()
                .and_then(|res| match res {
                    GetResult::Raw { bytes, .. } => Some(bytes),
                    _ => None,
                })
                .ok_or(Error::NotFound)?;
            let header = DescriptorPageHeader::from_le_bytes(
                &page_blob.as_ref()[..DescriptorPageHeader::DISK_SIZE],
            );
            old_pages.push(pk);
            pk = header.next_page_pk;
        }

        let per = DESCRIPTOR_ENTRIES_PER_PAGE;
        let need_pages = if new_metas.is_empty() {
            0
        } else {
            new_metas.len().div_ceil(per)
        };
        if need_pages == 0 {
            frees.extend(old_pages.iter().copied());

            descriptor.head_page_pk = 0;
            descriptor.tail_page_pk = 0;
            descriptor.total_row_count = 0;
            descriptor.total_chunk_count = 0;
            puts.push(BatchPut::Raw {
                key: descriptor_pk,
                bytes: descriptor.to_le_bytes(),
            });
            return Ok(());
        }

        let mut pages = Vec::with_capacity(need_pages);
        if !old_pages.is_empty() {
            // Reuse as many existing pages as the new chain needs; any
            // surplus old pages are freed below.
            pages.extend(old_pages.iter().copied().take(need_pages));
        } else {
            pages.push(self.pager.alloc_many(1)?[0]);
            descriptor.head_page_pk = pages[0];
        }
        if need_pages > pages.len() {
            let extra = self.pager.alloc_many(need_pages - pages.len())?;
            pages.extend(extra);
        }

        if old_pages.len() > need_pages {
            frees.extend(old_pages[need_pages..].iter().copied());
        }

        let mut off = 0usize;
        for (i, page_pk) in pages.iter().copied().enumerate() {
            let remain = new_metas.len() - off;
            let count = remain.min(per);
            let next = if i + 1 < pages.len() { pages[i + 1] } else { 0 };
            let header = DescriptorPageHeader {
                next_page_pk: next,
                entry_count: count as u32,
                _padding: [0; 4],
            };
            let mut page_bytes = header.to_le_bytes().to_vec();
            for m in &new_metas[off..off + count] {
                page_bytes.extend_from_slice(&m.to_le_bytes());
            }
            puts.push(BatchPut::Raw {
                key: page_pk,
                bytes: page_bytes,
            });
            off += count;
        }

        descriptor.tail_page_pk = *pages.last().unwrap();
        descriptor.total_chunk_count = new_metas.len() as u64;
        descriptor.total_row_count = new_metas.iter().map(|m| m.row_count).sum();
        puts.push(BatchPut::Raw {
            key: descriptor_pk,
            bytes: descriptor.to_le_bytes(),
        });
        Ok(())
    }

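    /// Bounded compaction for one field: merges runs of undersized chunks
    /// (below `MIN_CHUNK_BYTES`, capped at `MAX_MERGE_RUN_BYTES` per run)
    /// back into target-sized chunks, regenerates sort permutations where
    /// the originals had them, and rewrites both descriptor chains. If no
    /// chunks survive, the field is dropped from the catalog entirely.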
    fn compact_field_bounded(&self, field_id: LogicalFieldId) -> Result<()> {
        let mut catalog = self.catalog.write().unwrap();

        let desc_pk = match catalog.map.get(&field_id) {
            Some(&pk) => pk,
            None => return Ok(()),
        };
        let rid_fid = rowid_fid(field_id);
        let desc_pk_rid = match catalog.map.get(&rid_fid) {
            Some(&pk) => pk,
            None => return Ok(()),
        };

        let gets = vec![
            BatchGet::Raw { key: desc_pk },
            BatchGet::Raw { key: desc_pk_rid },
        ];
        let results = self.pager.batch_get(&gets)?;
        let mut blobs_by_pk = FxHashMap::default();
        for res in results {
            if let GetResult::Raw { key, bytes } = res {
                blobs_by_pk.insert(key, bytes);
            }
        }
        let desc_blob = blobs_by_pk.remove(&desc_pk).ok_or(Error::NotFound)?;
        let mut desc = ColumnDescriptor::from_le_bytes(desc_blob.as_ref());
        let desc_blob_rid = blobs_by_pk.remove(&desc_pk_rid).ok_or(Error::NotFound)?;
        let mut desc_rid = ColumnDescriptor::from_le_bytes(desc_blob_rid.as_ref());

        let mut metas = Vec::new();
        for m in DescriptorIterator::new(self.pager.as_ref(), desc.head_page_pk) {
            metas.push(m?);
        }
        let mut metas_rid = Vec::new();
        for m in DescriptorIterator::new(self.pager.as_ref(), desc_rid.head_page_pk) {
            metas_rid.push(m?);
        }
        if metas.is_empty() || metas_rid.is_empty() {
            return Ok(());
        }

        let mut puts: Vec<BatchPut> = Vec::new();
        let mut frees: Vec<PhysicalKey> = Vec::new();
        let mut new_metas: Vec<ChunkMetadata> = Vec::new();
        let mut new_rid_metas: Vec<ChunkMetadata> = Vec::new();

        let mut i = 0usize;
        while i < metas.len() {
            let sz = metas[i].serialized_bytes as usize;
            if sz >= MIN_CHUNK_BYTES {
                new_metas.push(metas[i]);
                new_rid_metas.push(metas_rid[i]);
                i += 1;
                continue;
            }

            let mut j = i;
            let mut run_bytes = 0usize;
            while j < metas.len() {
                let b = metas[j].serialized_bytes as usize;
                if b >= TARGET_CHUNK_BYTES {
                    break;
                }
                if run_bytes + b > MAX_MERGE_RUN_BYTES {
                    break;
                }
                run_bytes += b;
                j += 1;
            }
            if j == i + 1 && sz >= MIN_CHUNK_BYTES {
                new_metas.push(metas[i]);
                new_rid_metas.push(metas_rid[i]);
                i += 1;
                continue;
            }

            let mut gets = Vec::with_capacity((j - i) * 2);
            for k in i..j {
                gets.push(BatchGet::Raw {
                    key: metas[k].chunk_pk,
                });
                gets.push(BatchGet::Raw {
                    key: metas_rid[k].chunk_pk,
                });
            }
            let results = self.pager.batch_get(&gets)?;
            let mut by_pk: FxHashMap<PhysicalKey, EntryHandle> = FxHashMap::default();
            for r in results {
                match r {
                    GetResult::Raw { key, bytes } => {
                        by_pk.insert(key, bytes);
                    }
                    _ => return Err(Error::NotFound),
                }
            }
            let mut data_parts = Vec::with_capacity(j - i);
            let mut rid_parts = Vec::with_capacity(j - i);
            for k in i..j {
                let db = by_pk.get(&metas[k].chunk_pk).ok_or(Error::NotFound)?;
                data_parts.push(deserialize_array(db.clone())?);
                let rb = by_pk.get(&metas_rid[k].chunk_pk).ok_or(Error::NotFound)?;
                rid_parts.push(deserialize_array(rb.clone())?);
            }
            let merged_data = concat_many(data_parts.iter().collect())?;
            let merged_rid_any = concat_many(rid_parts.iter().collect())?;

            let slices = split_to_target_bytes(
                &merged_data,
                TARGET_CHUNK_BYTES,
                self.cfg.varwidth_fallback_rows_per_slice,
            );
            let mut rid_off = 0usize;
            let need_perms = metas[i..j].iter().any(|m| m.value_order_perm_pk != 0);

            for s in slices {
                let rows = s.len();
                let rid_ref: ArrayRef = merged_rid_any.slice(rid_off, rows);
                let rid_norm = zero_offset(&rid_ref);
                let rid_pk = self.pager.alloc_many(1)?[0];
                let rid_bytes = serialize_array(rid_norm.as_ref())?;

                let data_pk = self.pager.alloc_many(1)?[0];
                let s_norm = zero_offset(&s);
                let data_bytes = serialize_array(s_norm.as_ref())?;
                puts.push(BatchPut::Raw {
                    key: data_pk,
                    bytes: data_bytes,
                });
                puts.push(BatchPut::Raw {
                    key: rid_pk,
                    bytes: rid_bytes,
                });
                let mut meta = ChunkMetadata {
                    chunk_pk: data_pk,
                    row_count: rows as u64,
                    serialized_bytes: s_norm.get_array_memory_size() as u64,
                    max_val_u64: u64::MAX,
                    ..Default::default()
                };
                if need_perms {
                    let sort_col = SortColumn {
                        values: s.clone(),
                        options: None,
                    };
                    let idx = lexsort_to_indices(&[sort_col], None)?;
                    let perm_bytes = serialize_array(&idx)?;
                    let perm_pk = self.pager.alloc_many(1)?[0];
                    puts.push(BatchPut::Raw {
                        key: perm_pk,
                        bytes: perm_bytes,
                    });
                    meta.value_order_perm_pk = perm_pk;
                }

                let rid_any = rid_norm.clone();
                let rids = rid_any
                    .as_any()
                    .downcast_ref::<UInt64Array>()
                    .ok_or_else(|| Error::Internal("rid downcast".into()))?;
                let mut min = u64::MAX;
                let mut max = 0u64;
                let mut sorted_rids = true;
                let mut last_v = 0u64;
                for ii in 0..rids.len() {
                    let v = rids.value(ii);
                    if ii == 0 {
                        last_v = v;
                    } else if v < last_v {
                        sorted_rids = false;
                    } else {
                        last_v = v;
                    }
                    if v < min {
                        min = v;
                    }
                    if v > max {
                        max = v;
                    }
                }
                let mut rid_perm_pk = 0u64;
                if !sorted_rids {
                    let rid_sort_col = SortColumn {
                        values: rid_any,
                        options: None,
                    };
                    let rid_idx = lexsort_to_indices(&[rid_sort_col], None)?;
                    let rid_perm_bytes = serialize_array(&rid_idx)?;
                    rid_perm_pk = self.pager.alloc_many(1)?[0];
                    puts.push(BatchPut::Raw {
                        key: rid_perm_pk,
                        bytes: rid_perm_bytes,
                    });
                }
                let rid_meta = ChunkMetadata {
                    chunk_pk: rid_pk,
                    value_order_perm_pk: rid_perm_pk,
                    row_count: rows as u64,
                    serialized_bytes: rid_norm.get_array_memory_size() as u64,
                    min_val_u64: if rows > 0 { min } else { 0 },
                    max_val_u64: if rows > 0 { max } else { 0 },
                };
                new_metas.push(meta);
                new_rid_metas.push(rid_meta);
                rid_off += rows;
            }

            for k in i..j {
                frees.push(metas[k].chunk_pk);
                if metas[k].value_order_perm_pk != 0 {
                    frees.push(metas[k].value_order_perm_pk);
                }
                frees.push(metas_rid[k].chunk_pk);
                if metas_rid[k].value_order_perm_pk != 0 {
                    frees.push(metas_rid[k].value_order_perm_pk);
                }
            }

            i = j;
        }

        if new_metas.is_empty() {
            self.write_descriptor_chain(desc_pk, &mut desc, &[], &mut puts, &mut frees)?;
            self.write_descriptor_chain(desc_pk_rid, &mut desc_rid, &[], &mut puts, &mut frees)?;
            catalog.map.remove(&field_id);
            catalog.map.remove(&rid_fid);
            puts.push(BatchPut::Raw {
                key: CATALOG_ROOT_PKEY,
                bytes: catalog.to_bytes(),
            });
            if !puts.is_empty() {
                self.pager.batch_put(&puts)?;
            }
            if !frees.is_empty() {
                self.pager.free_many(&frees)?;
            }
            return Ok(());
        }

        self.write_descriptor_chain(desc_pk, &mut desc, &new_metas, &mut puts, &mut frees)?;
        self.write_descriptor_chain(
            desc_pk_rid,
            &mut desc_rid,
            &new_rid_metas,
            &mut puts,
            &mut frees,
        )?;
        if !puts.is_empty() {
            self.pager.batch_put(&puts)?;
        }
        if !frees.is_empty() {
            self.pager.free_many(&frees)?;
        }

        Ok(())
    }

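    /// Appends one `ChunkMetadata` entry to the in-memory tail page,
    /// sealing the tail and starting a fresh page when either the byte
    /// budget or the per-page entry cap would be exceeded, and bumps the
    /// descriptor totals. The caller persists the final tail page and
    /// descriptor afterwards.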
    fn append_meta_in_loop(
        &self,
        descriptor: &mut ColumnDescriptor,
        tail_page_bytes: &mut Vec<u8>,
        meta: ChunkMetadata,
        puts: &mut Vec<BatchPut>,
    ) -> Result<()> {
        let mut header = DescriptorPageHeader::from_le_bytes(
            &tail_page_bytes[..DescriptorPageHeader::DISK_SIZE],
        );
        if tail_page_bytes.len() + ChunkMetadata::DISK_SIZE <= TARGET_DESCRIPTOR_PAGE_BYTES
            && (header.entry_count as usize) < DESCRIPTOR_ENTRIES_PER_PAGE
        {
            tail_page_bytes.extend_from_slice(&meta.to_le_bytes());
            header.entry_count += 1;
            tail_page_bytes[..DescriptorPageHeader::DISK_SIZE]
                .copy_from_slice(&header.to_le_bytes());
        } else {
            let new_tail_pk = self.pager.alloc_many(1)?[0];
            header.next_page_pk = new_tail_pk;
            tail_page_bytes[..DescriptorPageHeader::DISK_SIZE]
                .copy_from_slice(&header.to_le_bytes());
            let full_page_to_write = std::mem::take(tail_page_bytes);
            puts.push(BatchPut::Raw {
                key: descriptor.tail_page_pk,
                bytes: full_page_to_write,
            });
            let new_header = DescriptorPageHeader {
                next_page_pk: 0,
                entry_count: 1,
                _padding: [0; 4],
            };
            let mut new_page_bytes = new_header.to_le_bytes().to_vec();
            new_page_bytes.extend_from_slice(&meta.to_le_bytes());
            descriptor.tail_page_pk = new_tail_pk;
            *tail_page_bytes = new_page_bytes;
        }

        descriptor.total_row_count += meta.row_count;
        descriptor.total_chunk_count += 1;
        Ok(())
    }

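    /// Walks every catalog entry and checks that the descriptor exists,
    /// that its `field_id` matches the catalog key, and that its row and
    /// chunk totals agree with the metadata actually stored in its page
    /// chain.
    ///
    /// ```ignore
    /// store.verify_integrity()?; // sanity check after bulk maintenance
    /// ```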
    pub fn verify_integrity(&self) -> Result<()> {
        let catalog = self.catalog.read().unwrap();
        for (&field_id, &descriptor_pk) in &catalog.map {
            let desc_blob = self
                .pager
                .batch_get(&[BatchGet::Raw { key: descriptor_pk }])?
                .pop()
                .and_then(|r| match r {
                    GetResult::Raw { bytes, .. } => Some(bytes),
                    _ => None,
                })
                .ok_or_else(|| {
                    Error::Internal(format!(
                        "Catalog points to missing descriptor pk={}",
                        descriptor_pk
                    ))
                })?;
            let descriptor = ColumnDescriptor::from_le_bytes(desc_blob.as_ref());
            if descriptor.field_id != field_id {
                return Err(Error::Internal(format!(
                    "Descriptor at pk={} has wrong field_id: expected {:?}, \
                     got {:?}",
                    descriptor_pk, field_id, descriptor.field_id
                )));
            }

            let mut actual_rows = 0;
            let mut actual_chunks = 0;
            let mut current_page_pk = descriptor.head_page_pk;
            while current_page_pk != 0 {
                let page_blob = self
                    .pager
                    .batch_get(&[BatchGet::Raw {
                        key: current_page_pk,
                    }])?
                    .pop()
                    .and_then(|r| match r {
                        GetResult::Raw { bytes, .. } => Some(bytes),
                        _ => None,
                    })
                    .ok_or_else(|| {
                        Error::Internal(format!(
                            "Descriptor page chain broken at pk={}",
                            current_page_pk
                        ))
                    })?;
                let header = DescriptorPageHeader::from_le_bytes(
                    &page_blob.as_ref()[..DescriptorPageHeader::DISK_SIZE],
                );
                for i in 0..(header.entry_count as usize) {
                    let off = DescriptorPageHeader::DISK_SIZE + i * ChunkMetadata::DISK_SIZE;
                    let end = off + ChunkMetadata::DISK_SIZE;
                    let meta = ChunkMetadata::from_le_bytes(&page_blob.as_ref()[off..end]);
                    actual_rows += meta.row_count;
                    actual_chunks += 1;
                }
                current_page_pk = header.next_page_pk;
            }

            if descriptor.total_row_count != actual_rows {
                return Err(Error::Internal(format!(
                    "Row count mismatch for field {:?}: descriptor says {}, \
                     actual is {}",
                    field_id, descriptor.total_row_count, actual_rows
                )));
            }
            if descriptor.total_chunk_count != actual_chunks {
                return Err(Error::Internal(format!(
                    "Chunk count mismatch for field {:?}: descriptor says {}, \
                     actual is {}",
                    field_id, descriptor.total_chunk_count, actual_chunks
                )));
            }
        }
        Ok(())
    }

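    /// Collects per-column layout statistics: total rows, total chunks,
    /// and the entry count and byte size of each descriptor page.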
    pub fn get_layout_stats(&self) -> Result<Vec<ColumnLayoutStats>> {
        let catalog = self.catalog.read().unwrap();
        let mut all_stats = Vec::new();

        for (&field_id, &descriptor_pk) in &catalog.map {
            let desc_blob = self
                .pager
                .batch_get(&[BatchGet::Raw { key: descriptor_pk }])?
                .pop()
                .and_then(|r| match r {
                    GetResult::Raw { bytes, .. } => Some(bytes),
                    _ => None,
                })
                .ok_or(Error::NotFound)?;
            let descriptor = ColumnDescriptor::from_le_bytes(desc_blob.as_ref());

            let mut page_stats = Vec::new();
            let mut current_page_pk = descriptor.head_page_pk;
            while current_page_pk != 0 {
                let page_blob = self
                    .pager
                    .batch_get(&[BatchGet::Raw {
                        key: current_page_pk,
                    }])?
                    .pop()
                    .and_then(|r| match r {
                        GetResult::Raw { bytes, .. } => Some(bytes),
                        _ => None,
                    })
                    .ok_or(Error::NotFound)?;
                let header = DescriptorPageHeader::from_le_bytes(
                    &page_blob.as_ref()[..DescriptorPageHeader::DISK_SIZE],
                );
                page_stats.push(DescriptorPageStats {
                    page_pk: current_page_pk,
                    entry_count: header.entry_count,
                    page_size_bytes: page_blob.as_ref().len(),
                });
                current_page_pk = header.next_page_pk;
            }

            all_stats.push(ColumnLayoutStats {
                field_id,
                total_rows: descriptor.total_row_count,
                total_chunks: descriptor.total_chunk_count,
                pages: page_stats,
            });
        }
        Ok(all_stats)
    }
}