1use super::*;
2use crate::store::catalog::ColumnCatalog;
3use crate::store::descriptor::{
4 ChunkMetadata, ColumnDescriptor, DescriptorIterator, DescriptorPageHeader,
5};
6use crate::store::scan::filter::FilterDispatch;
7use crate::store::scan::{FilterPrimitive, FilterResult};
8use crate::types::{LogicalFieldId, RowId, TableId};
9use arrow::array::{Array, ArrayRef, BooleanArray, UInt32Array, UInt64Array};
10use arrow::compute::{self, SortColumn, lexsort_to_indices};
11use arrow::datatypes::DataType;
12use arrow::record_batch::RecordBatch;
13use llkv_expr::typed_predicate::Predicate;
14use llkv_result::{Error, Result};
15use llkv_storage::{
16 constants::CATALOG_ROOT_PKEY,
17 pager::{BatchGet, BatchPut, GetResult, Pager},
18 serialization::{deserialize_array, serialize_array},
19 types::PhysicalKey,
20};
21
22use rustc_hash::{FxHashMap, FxHashSet};
23use simd_r_drive_entry_handle::EntryHandle;
24use std::sync::{Arc, RwLock};
25
/// Columnar store keyed by [`LogicalFieldId`], persisting all state
/// through a [`Pager`].
pub struct ColumnStore<P: Pager> {
    // Shared pager handle; all reads/writes go through batch_get/batch_put.
    pub(crate) pager: Arc<P>,
    // Maps logical field ids to the physical keys of their column
    // descriptors; persisted under CATALOG_ROOT_PKEY (see `open`).
    pub(crate) catalog: Arc<RwLock<ColumnCatalog>>,
    // Store tuning (e.g. varwidth_fallback_rows_per_slice used in append).
    cfg: ColumnStoreConfig,
    // Per-field Arrow DataType cache; avoids descriptor reads on lookup.
    dtype_cache: DTypeCache<P>,
    // Registers indexes and stages their updates for new chunks.
    index_manager: IndexManager<P>,
}
66
67impl<P> Clone for ColumnStore<P>
68where
69 P: Pager<Blob = EntryHandle> + Send + Sync,
70{
71 fn clone(&self) -> Self {
72 Self {
73 pager: Arc::clone(&self.pager),
74 catalog: Arc::clone(&self.catalog),
75 cfg: self.cfg.clone(),
76 dtype_cache: self.dtype_cache.clone(),
77 index_manager: self.index_manager.clone(),
78 }
79 }
80}
81
82impl<P> ColumnStore<P>
83where
84 P: Pager<Blob = EntryHandle> + Send + Sync,
85{
86 pub fn open(pager: Arc<P>) -> Result<Self> {
96 let cfg = ColumnStoreConfig::default();
97 let catalog = match pager
98 .batch_get(&[BatchGet::Raw {
99 key: CATALOG_ROOT_PKEY,
100 }])?
101 .pop()
102 {
103 Some(GetResult::Raw { bytes, .. }) => ColumnCatalog::from_bytes(bytes.as_ref())?,
104 _ => ColumnCatalog::default(),
105 };
106 let arc_catalog = Arc::new(RwLock::new(catalog));
107
108 let index_manager = IndexManager::new(Arc::clone(&pager));
109
110 Ok(Self {
111 pager: Arc::clone(&pager),
112 catalog: Arc::clone(&arc_catalog),
113 cfg,
114 dtype_cache: DTypeCache::new(Arc::clone(&pager), Arc::clone(&arc_catalog)),
115 index_manager,
116 })
117 }
118
    /// Register an index of `kind` over `field_id`'s column, delegating
    /// to the [`IndexManager`].
    pub fn register_index(&self, field_id: LogicalFieldId, kind: IndexKind) -> Result<()> {
        self.index_manager.register_index(self, field_id, kind)
    }
131
132 pub fn has_field(&self, field_id: LogicalFieldId) -> bool {
134 let catalog = self.catalog.read().unwrap();
135 catalog.map.contains_key(&field_id)
136 }
137
    /// Remove an index of `kind` from `field_id`'s column, delegating to
    /// the [`IndexManager`].
    pub fn unregister_index(&self, field_id: LogicalFieldId, kind: IndexKind) -> Result<()> {
        self.index_manager.unregister_index(self, field_id, kind)
    }
149
150 pub fn data_type(&self, field_id: LogicalFieldId) -> Result<DataType> {
159 if let Some(dt) = self.dtype_cache.cached_data_type(field_id) {
160 return Ok(dt);
161 }
162 self.dtype_cache.dtype_for_field(field_id)
163 }
164
165 pub fn update_data_type(
174 &self,
175 field_id: LogicalFieldId,
176 new_data_type: &DataType,
177 ) -> Result<()> {
178 let descriptor_pk = {
180 let catalog = self.catalog.read().unwrap();
181 *catalog.map.get(&field_id).ok_or_else(|| Error::NotFound)?
182 };
183
184 let mut descriptor = match self
186 .pager
187 .batch_get(&[BatchGet::Raw { key: descriptor_pk }])?
188 .pop()
189 {
190 Some(GetResult::Raw { bytes, .. }) => ColumnDescriptor::from_le_bytes(bytes.as_ref()),
191 _ => return Err(Error::NotFound),
192 };
193
194 let new_fingerprint = DTypeCache::<P>::dtype_fingerprint(new_data_type);
196 if new_fingerprint != 0 {
197 DTypeCache::<P>::set_desc_dtype_fingerprint(&mut descriptor, new_fingerprint);
198 }
199
200 self.pager.batch_put(&[BatchPut::Raw {
202 key: descriptor_pk,
203 bytes: descriptor.to_le_bytes(),
204 }])?;
205
206 self.dtype_cache.insert(field_id, new_data_type.clone());
208
209 Ok(())
210 }
211
212 pub fn ensure_column_registered(
218 &self,
219 field_id: LogicalFieldId,
220 data_type: &DataType,
221 ) -> Result<()> {
222 let rid_field_id = rowid_fid(field_id);
223
224 let mut catalog_dirty = false;
225 let descriptor_pk;
226 let rid_descriptor_pk;
227
228 {
229 let mut catalog = self.catalog.write().unwrap();
230 descriptor_pk = if let Some(&pk) = catalog.map.get(&field_id) {
231 pk
232 } else {
233 let pk = self.pager.alloc_many(1)?[0];
234 catalog.map.insert(field_id, pk);
235 catalog_dirty = true;
236 pk
237 };
238
239 rid_descriptor_pk = if let Some(&pk) = catalog.map.get(&rid_field_id) {
240 pk
241 } else {
242 let pk = self.pager.alloc_many(1)?[0];
243 catalog.map.insert(rid_field_id, pk);
244 catalog_dirty = true;
245 pk
246 };
247 }
248
249 let mut puts: Vec<BatchPut> = Vec::new();
250
251 let data_descriptor_missing = self
252 .pager
253 .batch_get(&[BatchGet::Raw { key: descriptor_pk }])?
254 .pop()
255 .and_then(|r| match r {
256 GetResult::Raw { bytes, .. } => Some(bytes),
257 _ => None,
258 })
259 .is_none();
260
261 if data_descriptor_missing {
262 let (mut descriptor, tail_page) =
263 ColumnDescriptor::load_or_create(Arc::clone(&self.pager), descriptor_pk, field_id)?;
264 let fingerprint = DTypeCache::<P>::dtype_fingerprint(data_type);
265 if fingerprint != 0 {
266 DTypeCache::<P>::set_desc_dtype_fingerprint(&mut descriptor, fingerprint);
267 }
268 puts.push(BatchPut::Raw {
269 key: descriptor.tail_page_pk,
270 bytes: tail_page,
271 });
272 puts.push(BatchPut::Raw {
273 key: descriptor_pk,
274 bytes: descriptor.to_le_bytes(),
275 });
276 }
277
278 let rid_descriptor_missing = self
279 .pager
280 .batch_get(&[BatchGet::Raw {
281 key: rid_descriptor_pk,
282 }])?
283 .pop()
284 .and_then(|r| match r {
285 GetResult::Raw { bytes, .. } => Some(bytes),
286 _ => None,
287 })
288 .is_none();
289
290 if rid_descriptor_missing {
291 let (mut rid_descriptor, tail_page) = ColumnDescriptor::load_or_create(
292 Arc::clone(&self.pager),
293 rid_descriptor_pk,
294 rid_field_id,
295 )?;
296 let fingerprint = DTypeCache::<P>::dtype_fingerprint(&DataType::UInt64);
297 if fingerprint != 0 {
298 DTypeCache::<P>::set_desc_dtype_fingerprint(&mut rid_descriptor, fingerprint);
299 }
300 puts.push(BatchPut::Raw {
301 key: rid_descriptor.tail_page_pk,
302 bytes: tail_page,
303 });
304 puts.push(BatchPut::Raw {
305 key: rid_descriptor_pk,
306 bytes: rid_descriptor.to_le_bytes(),
307 });
308 }
309
310 self.dtype_cache.insert(field_id, data_type.clone());
311
312 if catalog_dirty {
313 let catalog_bytes = {
314 let catalog = self.catalog.read().unwrap();
315 catalog.to_bytes()
316 };
317 puts.push(BatchPut::Raw {
318 key: CATALOG_ROOT_PKEY,
319 bytes: catalog_bytes,
320 });
321 }
322
323 if !puts.is_empty() {
324 self.pager.batch_put(&puts)?;
325 }
326
327 Ok(())
328 }
329
330 pub fn filter_row_ids<T>(
340 &self,
341 field_id: LogicalFieldId,
342 predicate: &Predicate<T::Value>,
343 ) -> Result<Vec<u64>>
344 where
345 T: FilterDispatch,
346 {
347 tracing::trace!(field=?field_id, "filter_row_ids start");
348 let res = T::run_filter(self, field_id, predicate);
349 if let Err(ref err) = res {
350 tracing::trace!(field=?field_id, error=?err, "filter_row_ids error");
351 } else {
352 tracing::trace!(field=?field_id, "filter_row_ids ok");
353 }
354 res
355 }
356
    /// Run `predicate` over `field_id`'s column through the
    /// [`FilterPrimitive`] dispatch, returning its [`FilterResult`].
    pub fn filter_matches<T, F>(
        &self,
        field_id: LogicalFieldId,
        predicate: F,
    ) -> Result<FilterResult>
    where
        T: FilterPrimitive,
        F: FnMut(T::Native) -> bool,
    {
        T::run_filter_with_result(self, field_id, predicate)
    }
382
383 pub fn list_persisted_indexes(&self, field_id: LogicalFieldId) -> Result<Vec<IndexKind>> {
392 let catalog = self.catalog.read().unwrap();
393 let descriptor_pk = *catalog.map.get(&field_id).ok_or(Error::NotFound)?;
394
395 let desc_blob = self
396 .pager
397 .batch_get(&[BatchGet::Raw { key: descriptor_pk }])?
398 .pop()
399 .and_then(|r| match r {
400 GetResult::Raw { bytes, .. } => Some(bytes),
401 _ => None,
402 })
403 .ok_or(Error::NotFound)?;
404 let descriptor = ColumnDescriptor::from_le_bytes(desc_blob.as_ref());
405
406 let kinds = descriptor.get_indexes()?;
407 Ok(kinds)
408 }
409
410 pub fn total_rows_for_field(&self, field_id: LogicalFieldId) -> Result<u64> {
419 let catalog = self.catalog.read().unwrap();
420 let desc_pk = *catalog.map.get(&field_id).ok_or(Error::NotFound)?;
421 drop(catalog);
422
423 let desc_blob = self
424 .pager
425 .batch_get(&[BatchGet::Raw { key: desc_pk }])?
426 .pop()
427 .and_then(|r| match r {
428 GetResult::Raw { bytes, .. } => Some(bytes),
429 _ => None,
430 })
431 .ok_or(Error::NotFound)?;
432
433 let desc = ColumnDescriptor::from_le_bytes(desc_blob.as_ref());
434 Ok(desc.total_row_count)
435 }
436
437 pub fn total_rows_for_table(&self, table_id: crate::types::TableId) -> Result<u64> {
446 use crate::types::Namespace;
447 let catalog = self.catalog.read().unwrap();
449 let candidates: Vec<LogicalFieldId> = catalog
451 .map
452 .keys()
453 .filter(|fid| fid.namespace() == Namespace::UserData && fid.table_id() == table_id)
454 .copied()
455 .collect();
456 drop(catalog);
457
458 if candidates.is_empty() {
459 return Ok(0);
460 }
461
462 let mut max_rows: u64 = 0;
464 for field in candidates {
465 let rows = self.total_rows_for_field(field)?;
466 if rows > max_rows {
467 max_rows = rows;
468 }
469 }
470 Ok(max_rows)
471 }
472
473 pub fn user_field_ids_for_table(&self, table_id: crate::types::TableId) -> Vec<LogicalFieldId> {
479 use crate::types::Namespace;
480
481 let catalog = self.catalog.read().unwrap();
482 catalog
483 .map
484 .keys()
485 .filter(|fid| fid.namespace() == Namespace::UserData && fid.table_id() == table_id)
486 .copied()
487 .collect()
488 }
489
490 pub fn has_row_id(&self, field_id: LogicalFieldId, row_id: RowId) -> Result<bool> {
500 let rid_fid = rowid_fid(field_id);
501 let catalog = self.catalog.read().unwrap();
502 let rid_desc_pk = *catalog.map.get(&rid_fid).ok_or(Error::NotFound)?;
503 let rid_desc_blob = self
504 .pager
505 .batch_get(&[BatchGet::Raw { key: rid_desc_pk }])?
506 .pop()
507 .and_then(|r| match r {
508 GetResult::Raw { bytes, .. } => Some(bytes),
509 _ => None,
510 })
511 .ok_or(Error::NotFound)?;
512 let rid_desc = ColumnDescriptor::from_le_bytes(rid_desc_blob.as_ref());
513 drop(catalog);
514
515 for m in DescriptorIterator::new(self.pager.as_ref(), rid_desc.head_page_pk) {
517 let meta = m?;
518 if meta.row_count == 0 {
519 continue;
520 }
521 if (meta.min_val_u64 != 0 || meta.max_val_u64 != 0) && row_id < meta.min_val_u64
522 || row_id > meta.max_val_u64
523 {
524 continue;
525 }
526 let mut gets = vec![BatchGet::Raw { key: meta.chunk_pk }];
528 if meta.value_order_perm_pk != 0 {
529 gets.push(BatchGet::Raw {
530 key: meta.value_order_perm_pk,
531 });
532 }
533 let results = self.pager.batch_get(&gets)?;
534 let mut rid_blob: Option<EntryHandle> = None;
535 let mut perm_blob: Option<EntryHandle> = None;
536 for r in results {
537 if let GetResult::Raw { key, bytes } = r {
538 if key == meta.chunk_pk {
539 rid_blob = Some(bytes);
540 } else if key == meta.value_order_perm_pk {
541 perm_blob = Some(bytes);
542 }
543 }
544 }
545 let Some(rid_blob) = rid_blob else { continue };
547 let rid_any = deserialize_array(rid_blob)?;
548 let rids = rid_any
549 .as_any()
550 .downcast_ref::<UInt64Array>()
551 .ok_or_else(|| Error::Internal("rid downcast".into()))?;
552 if let Some(pblob) = perm_blob {
553 let perm_any = deserialize_array(pblob)?;
554 let perm = perm_any
555 .as_any()
556 .downcast_ref::<UInt32Array>()
557 .ok_or_else(|| Error::Internal("perm not u32".into()))?;
558 let mut lo: isize = 0;
560 let mut hi: isize = (perm.len() as isize) - 1;
561 while lo <= hi {
562 let mid = ((lo + hi) >> 1) as usize;
563 let rid = rids.value(perm.value(mid) as usize);
564 if rid == row_id {
565 return Ok(true);
566 } else if rid < row_id {
567 lo = mid as isize + 1;
568 } else {
569 hi = mid as isize - 1;
570 }
571 }
572 } else {
573 let mut lo: isize = 0;
575 let mut hi: isize = (rids.len() as isize) - 1;
576 while lo <= hi {
577 let mid = ((lo + hi) >> 1) as usize;
578 let rid = rids.value(mid);
579 if rid == row_id {
580 return Ok(true);
581 } else if rid < row_id {
582 lo = mid as isize + 1;
583 } else {
584 hi = mid as isize - 1;
585 }
586 }
587 }
588 }
589 Ok(false)
590 }
591
    /// Append `batch` to the store with last-writer-wins semantics keyed
    /// on the mandatory `row_id` (UInt64) column.
    ///
    /// Phases:
    /// 1. Sort the batch by `row_id` if it is not already ascending.
    /// 2. LWW-rewrite existing chunks that already contain incoming ids
    ///    (a null incoming value deletes the row) under the catalog lock.
    /// 3. Filter rewritten ids out of the batch; only new rows remain.
    /// 4. Split each remaining column into chunks, stage data / row-id /
    ///    metadata / index writes, then flush them in one `batch_put`.
    ///
    /// Columns are matched to fields via the `FIELD_ID_META_KEY` schema
    /// metadata entry; columns without it are silently skipped in phase 2.
    #[allow(unused_variables, unused_assignments)] pub fn append(&self, batch: &RecordBatch) -> Result<()> {
        tracing::trace!(
            num_columns = batch.num_columns(),
            num_rows = batch.num_rows(),
            "ColumnStore::append BEGIN"
        );
        // Backing storage for a re-sorted copy; `batch_ref` borrows it
        // when the input was out of order.
        let working_batch: RecordBatch;
        let batch_ref = {
            let schema = batch.schema();
            let row_id_idx = schema
                .index_of(ROW_ID_COLUMN_NAME)
                .map_err(|_| Error::Internal("row_id column required".into()))?;
            let row_id_any = batch.column(row_id_idx).clone();
            let row_id_arr = row_id_any
                .as_any()
                .downcast_ref::<UInt64Array>()
                .ok_or_else(|| Error::Internal("row_id downcast failed".into()))?;

            // Single linear scan to detect ascending row ids.
            let mut is_sorted = true;
            if !row_id_arr.is_empty() {
                let mut last = row_id_arr.value(0);
                for i in 1..row_id_arr.len() {
                    let current = row_id_arr.value(i);
                    if current < last {
                        is_sorted = false;
                        break;
                    }
                    last = current;
                }
            }

            if is_sorted {
                batch
            } else {
                // Re-order every column by the row_id sort permutation.
                let sort_col = SortColumn {
                    values: row_id_any,
                    options: None,
                };
                let idx = lexsort_to_indices(&[sort_col], None)?;
                let perm = idx
                    .as_any()
                    .downcast_ref::<UInt32Array>()
                    .ok_or_else(|| Error::Internal("perm not u32".into()))?;
                let mut cols: Vec<ArrayRef> = Vec::with_capacity(batch.num_columns());
                for i in 0..batch.num_columns() {
                    cols.push(compute::take(batch.column(i), perm, None)?);
                }
                working_batch = RecordBatch::try_new(schema.clone(), cols)
                    .map_err(|e| Error::Internal(format!("record batch rebuild: {e}")))?;
                &working_batch
            }
        };

        tracing::trace!("ColumnStore::append PHASE 1 complete - batch preprocessed");

        let schema = batch_ref.schema();
        let row_id_idx = schema
            .index_of(ROW_ID_COLUMN_NAME)
            .map_err(|_| Error::Internal("row_id column required".into()))?;

        let row_id_arr = batch_ref
            .column(row_id_idx)
            .as_any()
            .downcast_ref::<UInt64Array>()
            .ok_or_else(|| Error::Internal("row_id downcast failed".into()))?;
        // row_id -> position in the (sorted) batch; the LWW rewrite uses
        // this to pull replacement values.
        let mut incoming_ids_map = FxHashMap::default();
        incoming_ids_map.reserve(row_id_arr.len());
        for i in 0..row_id_arr.len() {
            incoming_ids_map.insert(row_id_arr.value(i), i);
        }

        let mut catalog_dirty = false;
        let mut puts_rewrites: Vec<BatchPut> = Vec::new();
        // Row ids consumed by in-place rewrites; they must not be
        // appended again below.
        let mut all_rewritten_ids = FxHashSet::default();

        // Hold the catalog write lock across all per-field rewrites.
        let mut catalog_lock = self.catalog.write().unwrap();
        for i in 0..batch_ref.num_columns() {
            if i == row_id_idx {
                continue;
            }
            let field = schema.field(i);
            if let Some(field_id_str) = field.metadata().get(crate::store::FIELD_ID_META_KEY) {
                let field_id = field_id_str
                    .parse::<u64>()
                    .map(LogicalFieldId::from)
                    .map_err(|e| Error::Internal(format!("Invalid field_id: {}", e)))?;

                let rewritten = self.lww_rewrite_for_field(
                    &mut catalog_lock,
                    field_id,
                    &incoming_ids_map,
                    batch_ref.column(i),
                    batch_ref.column(row_id_idx),
                    &mut puts_rewrites,
                )?;
                all_rewritten_ids.extend(rewritten);
            }
        }
        drop(catalog_lock);

        if !puts_rewrites.is_empty() {
            self.pager.batch_put(&puts_rewrites)?;
        }

        tracing::trace!("ColumnStore::append PHASE 2 complete - LWW rewrites done");

        // Keep only rows whose ids were not handled by the rewrites.
        let batch_to_append = if !all_rewritten_ids.is_empty() {
            let keep_mask: Vec<bool> = (0..row_id_arr.len())
                .map(|i| !all_rewritten_ids.contains(&row_id_arr.value(i)))
                .collect();
            let keep_array = BooleanArray::from(keep_mask);
            compute::filter_record_batch(batch_ref, &keep_array)?
        } else {
            batch_ref.clone()
        };

        if batch_to_append.num_rows() == 0 {
            tracing::trace!("ColumnStore::append early exit - no new rows to append");
            return Ok(());
        }

        tracing::trace!("ColumnStore::append PHASE 3 complete - filtered for new rows");

        let append_schema = batch_to_append.schema();
        let append_row_id_idx = append_schema.index_of(ROW_ID_COLUMN_NAME)?;
        let append_row_id_any: ArrayRef = Arc::clone(batch_to_append.column(append_row_id_idx));
        let mut puts_appends: Vec<BatchPut> = Vec::new();

        for (i, array) in batch_to_append.columns().iter().enumerate() {
            if i == append_row_id_idx {
                continue;
            }

            let field = append_schema.field(i);

            let field_id = field
                .metadata()
                .get(crate::store::FIELD_ID_META_KEY)
                .ok_or_else(|| Error::Internal("Missing field_id".into()))?
                .parse::<u64>()
                .map(LogicalFieldId::from)
                .map_err(|e| Error::Internal(format!("Invalid field_id: {}", e)))?;

            self.dtype_cache.insert(field_id, field.data_type().clone());

            // Nulls are not stored: strip them together with the
            // matching row ids before chunking.
            let (array_clean, rids_clean) = if array.null_count() == 0 {
                (array.clone(), append_row_id_any.clone())
            } else {
                let keep =
                    BooleanArray::from_iter((0..array.len()).map(|j| Some(!array.is_null(j))));
                let a = compute::filter(array, &keep)?;
                let r = compute::filter(&append_row_id_any, &keep)?;
                (a, r)
            };

            if array_clean.is_empty() {
                continue;
            }

            // Resolve (or allocate) descriptor keys for the data column
            // and its row-id shadow column.
            // NOTE(review): alloc_many failures inside or_insert_with
            // panic via unwrap instead of propagating as Err.
            let (descriptor_pk, rid_descriptor_pk, rid_fid) = {
                let mut catalog = self.catalog.write().unwrap();
                let pk1 = *catalog.map.entry(field_id).or_insert_with(|| {
                    catalog_dirty = true;
                    self.pager.alloc_many(1).unwrap()[0]
                });
                let r_fid = rowid_fid(field_id);
                let pk2 = *catalog.map.entry(r_fid).or_insert_with(|| {
                    catalog_dirty = true;
                    self.pager.alloc_many(1).unwrap()[0]
                });
                (pk1, pk2, r_fid)
            };

            let (mut data_descriptor, mut data_tail_page) =
                ColumnDescriptor::load_or_create(Arc::clone(&self.pager), descriptor_pk, field_id)
                    .map_err(|e| {
                        tracing::error!(
                            ?field_id,
                            descriptor_pk,
                            error = ?e,
                            "append: load_or_create failed for data descriptor"
                        );
                        e
                    })?;
            let (mut rid_descriptor, mut rid_tail_page) = ColumnDescriptor::load_or_create(
                Arc::clone(&self.pager),
                rid_descriptor_pk,
                rid_fid,
            )
            .map_err(|e| {
                tracing::error!(
                    ?rid_fid,
                    rid_descriptor_pk,
                    error = ?e,
                    "append: load_or_create failed for rid descriptor"
                );
                e
            })?;

            // Every data column gets a Presence index staged on its
            // descriptor.
            self.index_manager
                .stage_index_registration(&mut data_descriptor, IndexKind::Presence)?;

            // Split the column into chunks near TARGET_CHUNK_BYTES.
            let slices = split_to_target_bytes(
                &array_clean,
                TARGET_CHUNK_BYTES,
                self.cfg.varwidth_fallback_rows_per_slice,
            );
            let mut row_off = 0usize;

            for s in slices {
                let rows = s.len();
                let data_pk = self.pager.alloc_many(1)?[0];
                // Re-base the slice to offset 0 before serializing.
                let s_norm = zero_offset(&s);
                let data_bytes = serialize_array(s_norm.as_ref())?;
                puts_appends.push(BatchPut::Raw {
                    key: data_pk,
                    bytes: data_bytes,
                });

                let rid_slice: ArrayRef = rids_clean.slice(row_off, rows);
                let rid_norm = zero_offset(&rid_slice);
                let rid_pk = self.pager.alloc_many(1)?[0];
                let rid_bytes = serialize_array(rid_norm.as_ref())?;
                puts_appends.push(BatchPut::Raw {
                    key: rid_pk,
                    bytes: rid_bytes,
                });

                // Min/max row id of this chunk, recorded so point lookups
                // (e.g. has_row_id) can skip non-overlapping chunks.
                let rids_for_meta = rid_norm.as_any().downcast_ref::<UInt64Array>().unwrap();
                let (min, max) = if !rids_for_meta.is_empty() {
                    let mut min_val = rids_for_meta.value(0);
                    let mut max_val = rids_for_meta.value(0);
                    for i in 1..rids_for_meta.len() {
                        let v = rids_for_meta.value(i);
                        if v < min_val {
                            min_val = v;
                        }
                        if v > max_val {
                            max_val = v;
                        }
                    }
                    (min_val, max_val)
                } else {
                    (0, 0)
                };

                let mut data_meta = ChunkMetadata {
                    chunk_pk: data_pk,
                    row_count: rows as u64,
                    serialized_bytes: s_norm.get_array_memory_size() as u64,
                    max_val_u64: u64::MAX,
                    ..Default::default()
                };
                let mut rid_meta = ChunkMetadata {
                    chunk_pk: rid_pk,
                    row_count: rows as u64,
                    serialized_bytes: rid_norm.get_array_memory_size() as u64,
                    min_val_u64: min,
                    max_val_u64: max,
                    ..Default::default()
                };

                // Registered indexes stage their own writes for the new
                // chunk; may mutate both metadata records.
                self.index_manager.stage_updates_for_new_chunk(
                    field_id,
                    &data_descriptor,
                    &s_norm,
                    &rid_norm,
                    &mut data_meta,
                    &mut rid_meta,
                    &mut puts_appends,
                )?;

                self.append_meta_in_loop(
                    &mut data_descriptor,
                    &mut data_tail_page,
                    data_meta,
                    &mut puts_appends,
                )?;
                self.append_meta_in_loop(
                    &mut rid_descriptor,
                    &mut rid_tail_page,
                    rid_meta,
                    &mut puts_appends,
                )?;
                row_off += rows;
            }

            // Persist both descriptors and their tail metadata pages.
            puts_appends.push(BatchPut::Raw {
                key: data_descriptor.tail_page_pk,
                bytes: data_tail_page,
            });
            puts_appends.push(BatchPut::Raw {
                key: descriptor_pk,
                bytes: data_descriptor.to_le_bytes(),
            });
            puts_appends.push(BatchPut::Raw {
                key: rid_descriptor.tail_page_pk,
                bytes: rid_tail_page,
            });
            puts_appends.push(BatchPut::Raw {
                key: rid_descriptor_pk,
                bytes: rid_descriptor.to_le_bytes(),
            });
        }

        if catalog_dirty {
            let catalog = self.catalog.read().unwrap();
            puts_appends.push(BatchPut::Raw {
                key: CATALOG_ROOT_PKEY,
                bytes: catalog.to_bytes(),
            });
        }

        if !puts_appends.is_empty() {
            self.pager.batch_put(&puts_appends)?;
        }
        tracing::trace!("ColumnStore::append END - success");
        Ok(())
    }
999
    /// Apply last-writer-wins updates for one field: incoming row ids that
    /// already exist in persisted chunks are rewritten in place (upsert),
    /// or removed when their incoming value is null (delete).
    ///
    /// Returns the set of row ids handled here so `append` can exclude
    /// them from the fresh-append path. The caller holds the catalog
    /// write lock and passes the guarded catalog in as `catalog`; staged
    /// writes are pushed to `puts` for the caller to flush.
    fn lww_rewrite_for_field(
        &self,
        catalog: &mut ColumnCatalog,
        field_id: LogicalFieldId,
        incoming_ids_map: &FxHashMap<u64, usize>,
        incoming_data: &ArrayRef,
        incoming_row_ids: &ArrayRef,
        puts: &mut Vec<BatchPut>,
    ) -> Result<FxHashSet<u64>> {
        use crate::store::descriptor::DescriptorIterator;
        use crate::store::ingest::ChunkEdit;

        if incoming_ids_map.is_empty() {
            return Ok(FxHashSet::default());
        }
        let incoming_ids: FxHashSet<u64> = incoming_ids_map.keys().copied().collect();

        // Unknown field or missing row-id shadow column: nothing persisted
        // to rewrite, so every incoming id is new.
        let desc_pk_data = match catalog.map.get(&field_id) {
            Some(pk) => *pk,
            None => return Ok(FxHashSet::default()),
        };
        let rid_fid = rowid_fid(field_id);
        let desc_pk_rid = match catalog.map.get(&rid_fid) {
            Some(pk) => *pk,
            None => return Ok(FxHashSet::default()),
        };

        // Fetch both descriptors in one round-trip.
        let gets = vec![
            BatchGet::Raw { key: desc_pk_data },
            BatchGet::Raw { key: desc_pk_rid },
        ];
        let results = self.pager.batch_get(&gets)?;
        let mut blobs_by_pk = FxHashMap::default();
        for r in results {
            if let GetResult::Raw { key, bytes } = r {
                blobs_by_pk.insert(key, bytes);
            }
        }

        let desc_blob_data = blobs_by_pk.remove(&desc_pk_data).ok_or_else(|| {
            tracing::error!(
                ?field_id,
                desc_pk_data,
                "lww_rewrite: data descriptor blob not found in pager"
            );
            Error::NotFound
        })?;
        let mut descriptor_data = ColumnDescriptor::from_le_bytes(desc_blob_data.as_ref());

        let desc_blob_rid = blobs_by_pk.remove(&desc_pk_rid).ok_or_else(|| {
            tracing::error!(
                ?rid_fid,
                desc_pk_rid,
                "lww_rewrite: rid descriptor blob not found in pager"
            );
            Error::NotFound
        })?;
        let mut descriptor_rid = ColumnDescriptor::from_le_bytes(desc_blob_rid.as_ref());

        tracing::trace!(?field_id, "lww_rewrite: descriptors loaded successfully");

        // Collect chunk metadata for both columns; indexes are expected
        // to line up pairwise (see the min() below).
        let mut metas_data: Vec<ChunkMetadata> = Vec::new();
        let mut metas_rid: Vec<ChunkMetadata> = Vec::new();
        for m in DescriptorIterator::new(self.pager.as_ref(), descriptor_data.head_page_pk) {
            metas_data.push(m.map_err(|e| {
                tracing::error!(?field_id, error = ?e, "lww_rewrite: failed to iterate data descriptor");
                e
            })?);
        }
        for m in DescriptorIterator::new(self.pager.as_ref(), descriptor_rid.head_page_pk) {
            metas_rid.push(m.map_err(|e| {
                tracing::error!(?rid_fid, error = ?e, "lww_rewrite: failed to iterate rid descriptor");
                e
            })?);
        }

        tracing::trace!(
            ?field_id,
            data_chunks = metas_data.len(),
            rid_chunks = metas_rid.len(),
            "lww_rewrite: chunk metadata collected"
        );

        // Partition incoming ids: a null incoming value means delete,
        // anything else is an upsert.
        let rid_in = incoming_row_ids
            .as_any()
            .downcast_ref::<UInt64Array>()
            .ok_or_else(|| Error::Internal("row_id must be u64".into()))?;
        let mut ids_to_delete = FxHashSet::default();
        let mut ids_to_upsert = FxHashSet::default();
        for i in 0..rid_in.len() {
            let rid = rid_in.value(i);
            if incoming_data.is_null(i) {
                ids_to_delete.insert(rid);
            } else {
                ids_to_upsert.insert(rid);
            }
        }

        let mut rewritten_ids = FxHashSet::default();
        // chunk index -> row ids hit in that chunk, split by operation.
        let mut hit_up: FxHashMap<usize, Vec<u64>> = FxHashMap::default();
        let mut hit_del: FxHashMap<usize, Vec<u64>> = FxHashMap::default();

        let n = metas_data.len().min(metas_rid.len());
        if n > 0 {
            // Scan every rid chunk for overlap with the incoming id set.
            let mut gets_rid = Vec::with_capacity(n);
            for rm in metas_rid.iter().take(n) {
                gets_rid.push(BatchGet::Raw { key: rm.chunk_pk });
            }
            let rid_results = self.pager.batch_get(&gets_rid)?;
            let mut rid_blobs: FxHashMap<PhysicalKey, EntryHandle> = FxHashMap::default();
            for r in rid_results {
                if let GetResult::Raw { key, bytes } = r {
                    rid_blobs.insert(key, bytes);
                }
            }

            for (i, meta_rid) in metas_rid.iter().enumerate().take(n) {
                if let Some(rid_blob) = rid_blobs.get(&meta_rid.chunk_pk) {
                    let rid_arr_any = deserialize_array(rid_blob.clone())?;
                    let rid_arr = rid_arr_any
                        .as_any()
                        .downcast_ref::<UInt64Array>()
                        .ok_or_else(|| Error::Internal("rid type mismatch".into()))?;
                    for j in 0..rid_arr.len() {
                        let rid = rid_arr.value(j);
                        if incoming_ids.contains(&rid) {
                            if ids_to_delete.contains(&rid) {
                                hit_del.entry(i).or_default().push(rid);
                            } else if ids_to_upsert.contains(&rid) {
                                hit_up.entry(i).or_default().push(rid);
                            }
                            rewritten_ids.insert(rid);
                        }
                    }
                }
            }
        }

        if hit_up.is_empty() && hit_del.is_empty() {
            return Ok(rewritten_ids);
        }

        // Fetch data + rid blobs for every chunk that needs editing.
        let mut hit_set = FxHashSet::default();
        hit_set.extend(hit_up.keys().copied());
        hit_set.extend(hit_del.keys().copied());
        let hit_idxs: Vec<usize> = hit_set.into_iter().collect();

        let mut gets = Vec::with_capacity(hit_idxs.len() * 2);
        for &i in &hit_idxs {
            gets.push(BatchGet::Raw {
                key: metas_data[i].chunk_pk,
            });
            gets.push(BatchGet::Raw {
                key: metas_rid[i].chunk_pk,
            });
        }
        let results = self.pager.batch_get(&gets)?;
        let mut blob_map: FxHashMap<PhysicalKey, EntryHandle> = FxHashMap::default();
        for r in results {
            if let GetResult::Raw { key, bytes } = r {
                blob_map.insert(key, bytes);
            }
        }

        for i in hit_idxs {
            let old_data_arr =
                deserialize_array(blob_map.get(&metas_data[i].chunk_pk).unwrap().clone())?;
            let old_rid_arr_any =
                deserialize_array(blob_map.get(&metas_rid[i].chunk_pk).unwrap().clone())?;
            let old_rid_arr = old_rid_arr_any
                .as_any()
                .downcast_ref::<UInt64Array>()
                .unwrap();

            let up_vec = hit_up.remove(&i).unwrap_or_default();
            let del_vec = hit_del.remove(&i).unwrap_or_default();

            // Compute the edit (replaced / dropped rows) for this chunk.
            let edit = ChunkEdit::from_lww_upsert(
                old_rid_arr,
                &up_vec,
                &del_vec,
                incoming_data,
                incoming_row_ids,
                incoming_ids_map,
            )?;

            let (new_data_arr, new_rid_arr) =
                ChunkEdit::apply_edit_to_arrays(&old_data_arr, Some(&old_rid_arr_any), &edit)?;

            // Overwrite the chunk in place and refresh its metadata.
            let data_bytes = serialize_array(&new_data_arr)?;
            puts.push(BatchPut::Raw {
                key: metas_data[i].chunk_pk,
                bytes: data_bytes,
            });
            metas_data[i].row_count = new_data_arr.len() as u64;
            metas_data[i].serialized_bytes = new_data_arr.get_array_memory_size() as u64;

            if let Some(rarr) = new_rid_arr {
                let rid_bytes = serialize_array(&rarr)?;
                puts.push(BatchPut::Raw {
                    key: metas_rid[i].chunk_pk,
                    bytes: rid_bytes,
                });
                metas_rid[i].row_count = rarr.len() as u64;
                metas_rid[i].serialized_bytes = rarr.get_array_memory_size() as u64;
            }

            // Keep the value-order permutation index in sync with the
            // edited data chunk.
            if metas_data[i].value_order_perm_pk != 0 {
                let sort_col = SortColumn {
                    values: new_data_arr,
                    options: None,
                };
                let idx = lexsort_to_indices(&[sort_col], None)?;
                let perm_bytes = serialize_array(&idx)?;
                puts.push(BatchPut::Raw {
                    key: metas_data[i].value_order_perm_pk,
                    bytes: perm_bytes,
                });
            }
        }

        // Rewrite descriptor pages so the updated chunk metadata persists.
        descriptor_data.rewrite_pages(
            Arc::clone(&self.pager),
            desc_pk_data,
            &mut metas_data,
            puts,
        )?;
        descriptor_rid.rewrite_pages(Arc::clone(&self.pager), desc_pk_rid, &mut metas_rid, puts)?;

        Ok(rewritten_ids)
    }
1245
1246 fn stage_delete_rows_for_field(
1247 &self,
1248 field_id: LogicalFieldId,
1249 rows_to_delete: &[RowId],
1250 staged_puts: &mut Vec<BatchPut>,
1251 ) -> Result<bool> {
1252 tracing::warn!(
1253 field_id = ?field_id,
1254 rows = rows_to_delete.len(),
1255 "delete_rows stage_delete_rows_for_field: start"
1256 );
1257 use crate::store::descriptor::DescriptorIterator;
1258 use crate::store::ingest::ChunkEdit;
1259
1260 if rows_to_delete.is_empty() {
1261 return Ok(false);
1262 }
1263
1264 let mut del_iter = rows_to_delete.iter().copied();
1266 let mut cur_del = del_iter.next();
1267 let mut last_seen: Option<u64> = cur_del;
1268
1269 let catalog = self.catalog.read().unwrap();
1271 let desc_pk = match catalog.map.get(&field_id) {
1272 Some(pk) => *pk,
1273 None => {
1274 tracing::trace!(
1275 field_id = ?field_id,
1276 "delete_rows stage_delete_rows_for_field: data descriptor missing"
1277 );
1278 return Err(Error::NotFound);
1279 }
1280 };
1281 let rid_fid = rowid_fid(field_id);
1282 let desc_pk_rid = catalog.map.get(&rid_fid).copied();
1283 tracing::warn!(
1284 field_id = ?field_id,
1285 desc_pk,
1286 desc_pk_rid = ?desc_pk_rid,
1287 "delete_rows stage_delete_rows_for_field: descriptor keys"
1288 );
1289
1290 let mut gets = vec![BatchGet::Raw { key: desc_pk }];
1292 if let Some(pk) = desc_pk_rid {
1293 gets.push(BatchGet::Raw { key: pk });
1294 }
1295 let results = match self.pager.batch_get(&gets) {
1296 Ok(res) => res,
1297 Err(err) => {
1298 tracing::trace!(
1299 field_id = ?field_id,
1300 error = ?err,
1301 "delete_rows stage_delete_rows_for_field: descriptor batch_get failed"
1302 );
1303 return Err(err);
1304 }
1305 };
1306 let mut blobs_by_pk = FxHashMap::default();
1307 for res in results {
1308 if let GetResult::Raw { key, bytes } = res {
1309 blobs_by_pk.insert(key, bytes);
1310 }
1311 }
1312
1313 tracing::warn!(
1314 field_id = ?field_id,
1315 desc_blob_found = blobs_by_pk.contains_key(&desc_pk),
1316 rid_blob_found = desc_pk_rid.map(|pk| blobs_by_pk.contains_key(&pk)),
1317 "delete_rows stage_delete_rows_for_field: descriptor fetch status"
1318 );
1319
1320 let desc_blob = blobs_by_pk.remove(&desc_pk).ok_or_else(|| {
1321 tracing::trace!(
1322 field_id = ?field_id,
1323 desc_pk,
1324 "delete_rows stage_delete_rows_for_field: descriptor blob missing"
1325 );
1326 Error::Internal(format!(
1327 "descriptor pk={} missing during delete_rows for field {:?}",
1328 desc_pk, field_id
1329 ))
1330 })?;
1331 let mut descriptor = ColumnDescriptor::from_le_bytes(desc_blob.as_ref());
1332
1333 let mut metas: Vec<ChunkMetadata> = Vec::new();
1335 for m in DescriptorIterator::new(self.pager.as_ref(), descriptor.head_page_pk) {
1336 metas.push(m?);
1337 }
1338 if metas.is_empty() {
1339 drop(catalog);
1340 return Ok(false);
1341 }
1342
1343 let mut metas_rid: Vec<ChunkMetadata> = Vec::new();
1345 let mut descriptor_rid: Option<ColumnDescriptor> = None;
1346 tracing::warn!(
1347 field_id = ?field_id,
1348 metas_len = metas.len(),
1349 desc_pk_rid = ?desc_pk_rid,
1350 "delete_rows stage_delete_rows_for_field: data metas loaded"
1351 );
1352 if let Some(pk_rid) = desc_pk_rid
1353 && let Some(desc_blob_rid) = blobs_by_pk.remove(&pk_rid)
1354 {
1355 let d_rid = ColumnDescriptor::from_le_bytes(desc_blob_rid.as_ref());
1356 for m in DescriptorIterator::new(self.pager.as_ref(), d_rid.head_page_pk) {
1357 metas_rid.push(m?);
1358 }
1359 descriptor_rid = Some(d_rid);
1360 }
1361
1362 tracing::warn!(
1363 field_id = ?field_id,
1364 metas_rid_len = metas_rid.len(),
1365 "delete_rows stage_delete_rows_for_field: rowid metas loaded"
1366 );
1367
1368 let mut cum_rows = 0u64;
1369 let mut any_changed = false;
1370
1371 for (i, meta) in metas.iter_mut().enumerate() {
1372 let start_u64 = cum_rows;
1373 let end_u64 = start_u64 + meta.row_count;
1374
1375 while let Some(d) = cur_del {
1377 if d < start_u64
1378 && let Some(prev) = last_seen
1379 {
1380 if d < prev {
1381 return Err(Error::Internal(
1382 "rows_to_delete must be ascending/unique".into(),
1383 ));
1384 }
1385
1386 last_seen = Some(d);
1387 cur_del = del_iter.next();
1388 } else {
1389 break;
1390 }
1391 }
1392
1393 let rows = meta.row_count as usize;
1395 let mut del_local: FxHashSet<usize> = FxHashSet::default();
1396 while let Some(d) = cur_del {
1397 if d >= end_u64 {
1398 break;
1399 }
1400 del_local.insert((d - start_u64) as usize);
1401 last_seen = Some(d);
1402 cur_del = del_iter.next();
1403 }
1404
1405 if del_local.is_empty() {
1406 cum_rows = end_u64;
1407 continue;
1408 }
1409
1410 let mut chunk_gets = vec![BatchGet::Raw { key: meta.chunk_pk }];
1412 if let Some(rm) = metas_rid.get(i) {
1413 chunk_gets.push(BatchGet::Raw { key: rm.chunk_pk });
1414 }
1415 let chunk_results = match self.pager.batch_get(&chunk_gets) {
1416 Ok(res) => res,
1417 Err(err) => {
1418 tracing::trace!(
1419 field_id = ?field_id,
1420 chunk_pk = meta.chunk_pk,
1421 error = ?err,
1422 "delete_rows stage_delete_rows_for_field: chunk batch_get failed"
1423 );
1424 return Err(err);
1425 }
1426 };
1427 let mut chunk_blobs = FxHashMap::default();
1428 for res in chunk_results {
1429 if let GetResult::Raw { key, bytes } = res {
1430 chunk_blobs.insert(key, bytes);
1431 }
1432 }
1433
1434 tracing::warn!(
1435 field_id = ?field_id,
1436 chunk_pk = meta.chunk_pk,
1437 rid_chunk_pk = metas_rid.get(i).map(|rm| rm.chunk_pk),
1438 data_found = chunk_blobs.contains_key(&meta.chunk_pk),
1439 rid_found = metas_rid
1440 .get(i)
1441 .map(|rm| chunk_blobs.contains_key(&rm.chunk_pk)),
1442 "delete_rows stage_delete_rows_for_field: chunk fetch status"
1443 );
1444
1445 let data_blob = match chunk_blobs.remove(&meta.chunk_pk) {
1446 Some(bytes) => bytes,
1447 None => {
1448 tracing::trace!(
1449 field_id = ?field_id,
1450 chunk_pk = meta.chunk_pk,
1451 "delete_rows stage_delete_rows_for_field: chunk missing"
1452 );
1453 return Err(Error::NotFound);
1454 }
1455 };
1456 let data_arr = deserialize_array(data_blob)?;
1457
1458 let rid_arr_any = if let Some(rm) = metas_rid.get(i) {
1459 let rid_blob = match chunk_blobs.remove(&rm.chunk_pk) {
1460 Some(bytes) => bytes,
1461 None => {
1462 tracing::trace!(
1463 field_id = ?field_id,
1464 rowid_chunk_pk = rm.chunk_pk,
1465 "delete_rows stage_delete_rows_for_field: rowid chunk missing"
1466 );
1467 return Err(Error::NotFound);
1468 }
1469 };
1470 Some(deserialize_array(rid_blob)?)
1471 } else {
1472 None
1473 };
1474
1475 let edit = ChunkEdit::from_delete_indices(rows, &del_local);
1477
1478 let (new_data_arr, new_rid_arr) =
1480 ChunkEdit::apply_edit_to_arrays(&data_arr, rid_arr_any.as_ref(), &edit)?;
1481
1482 let data_bytes = serialize_array(&new_data_arr)?;
1484 staged_puts.push(BatchPut::Raw {
1485 key: meta.chunk_pk,
1486 bytes: data_bytes,
1487 });
1488 meta.row_count = new_data_arr.len() as u64;
1489 meta.serialized_bytes = new_data_arr.get_array_memory_size() as u64;
1490
1491 if let (Some(_), Some(rids)) = (metas_rid.get_mut(i), new_rid_arr) {
1493 let rm = metas_rid.get_mut(i).unwrap();
1494 let rid_bytes = serialize_array(&rids)?;
1495 staged_puts.push(BatchPut::Raw {
1496 key: rm.chunk_pk,
1497 bytes: rid_bytes,
1498 });
1499 rm.row_count = rids.len() as u64;
1500 rm.serialized_bytes = rids.get_array_memory_size() as u64;
1501 }
1502
1503 if meta.value_order_perm_pk != 0 {
1505 let sort_column = SortColumn {
1506 values: new_data_arr,
1507 options: None,
1508 };
1509 let indices = lexsort_to_indices(&[sort_column], None)?;
1510 let perm_bytes = serialize_array(&indices)?;
1511 staged_puts.push(BatchPut::Raw {
1512 key: meta.value_order_perm_pk,
1513 bytes: perm_bytes,
1514 });
1515 }
1516
1517 cum_rows = end_u64;
1518 any_changed = true;
1519 }
1520
1521 descriptor.rewrite_pages(Arc::clone(&self.pager), desc_pk, &mut metas, staged_puts)?;
1523 if let (Some(rid_pk), Some(mut rid_desc)) = (desc_pk_rid, descriptor_rid) {
1524 rid_desc.rewrite_pages(Arc::clone(&self.pager), rid_pk, &mut metas_rid, staged_puts)?;
1525 }
1526 drop(catalog);
1527 tracing::trace!(
1528 field_id = ?field_id,
1529 changed = any_changed,
1530 "delete_rows stage_delete_rows_for_field: finished stage"
1531 );
1532 Ok(any_changed)
1533 }
1534
1535 pub fn delete_rows(&self, fields: &[LogicalFieldId], rows_to_delete: &[RowId]) -> Result<()> {
1541 if fields.is_empty() || rows_to_delete.is_empty() {
1542 return Ok(());
1543 }
1544
1545 let mut puts = Vec::new();
1546 let mut touched: FxHashSet<LogicalFieldId> = FxHashSet::default();
1547 let mut table_id: Option<TableId> = None;
1548
1549 tracing::warn!(
1550 fields = fields.len(),
1551 rows = rows_to_delete.len(),
1552 "delete_rows begin"
1553 );
1554 for field_id in fields {
1555 tracing::warn!(field = ?field_id, "delete_rows iter field");
1556 if let Some(expected) = table_id {
1557 if field_id.table_id() != expected {
1558 return Err(Error::InvalidArgumentError(
1559 "delete_rows requires fields from the same table".into(),
1560 ));
1561 }
1562 } else {
1563 table_id = Some(field_id.table_id());
1564 }
1565
1566 if self.stage_delete_rows_for_field(*field_id, rows_to_delete, &mut puts)? {
1567 touched.insert(*field_id);
1568 }
1569 }
1570
1571 if puts.is_empty() {
1572 return Ok(());
1573 }
1574
1575 self.pager.batch_put(&puts)?;
1576
1577 tracing::warn!(touched = touched.len(), "delete_rows apply writes");
1578
1579 for field_id in touched {
1580 self.compact_field_bounded(field_id)?;
1581 }
1582 tracing::warn!("delete_rows complete");
1583 Ok(())
1584 }
1585
1586 pub(crate) fn write_descriptor_chain(
1592 &self,
1593 descriptor_pk: PhysicalKey,
1594 descriptor: &mut ColumnDescriptor,
1595 new_metas: &[ChunkMetadata],
1596 puts: &mut Vec<BatchPut>,
1597 frees: &mut Vec<PhysicalKey>,
1598 ) -> Result<()> {
1599 let mut old_pages = Vec::new();
1601 let mut pk = descriptor.head_page_pk;
1602 while pk != 0 {
1603 let page_blob = self
1604 .pager
1605 .batch_get(&[BatchGet::Raw { key: pk }])?
1606 .pop()
1607 .and_then(|res| match res {
1608 GetResult::Raw { bytes, .. } => Some(bytes),
1609 _ => None,
1610 })
1611 .ok_or(Error::NotFound)?;
1612 let header = DescriptorPageHeader::from_le_bytes(
1613 &page_blob.as_ref()[..DescriptorPageHeader::DISK_SIZE],
1614 );
1615 old_pages.push(pk);
1616 pk = header.next_page_pk;
1617 }
1618
1619 let per = DESCRIPTOR_ENTRIES_PER_PAGE;
1621 let need_pages = if new_metas.is_empty() {
1622 0
1623 } else {
1624 new_metas.len().div_ceil(per)
1625 };
1626 if need_pages == 0 {
1628 frees.extend(old_pages.iter().copied());
1629
1630 descriptor.head_page_pk = 0;
1635 descriptor.tail_page_pk = 0;
1636 descriptor.total_row_count = 0;
1637 descriptor.total_chunk_count = 0;
1638 puts.push(BatchPut::Raw {
1639 key: descriptor_pk,
1640 bytes: descriptor.to_le_bytes(),
1641 });
1642 return Ok(());
1643 }
1644
1645 let mut pages = Vec::with_capacity(need_pages);
1647 if !old_pages.is_empty() {
1648 pages.push(old_pages[0]);
1649 } else {
1650 pages.push(self.pager.alloc_many(1)?[0]);
1651 descriptor.head_page_pk = pages[0];
1652 }
1653 if need_pages > pages.len() {
1654 let extra = self.pager.alloc_many(need_pages - pages.len())?;
1655 pages.extend(extra);
1656 }
1657
1658 if old_pages.len() > need_pages {
1660 frees.extend(old_pages[need_pages..].iter().copied());
1661 }
1662
1663 let mut off = 0usize;
1665 for (i, page_pk) in pages.iter().copied().enumerate() {
1666 let remain = new_metas.len() - off;
1667 let count = remain.min(per);
1668 let next = if i + 1 < pages.len() { pages[i + 1] } else { 0 };
1669 let header = DescriptorPageHeader {
1670 next_page_pk: next,
1671 entry_count: count as u32,
1672 _padding: [0; 4],
1673 };
1674 let mut page_bytes = header.to_le_bytes().to_vec();
1675 for m in &new_metas[off..off + count] {
1676 page_bytes.extend_from_slice(&m.to_le_bytes());
1677 }
1678 puts.push(BatchPut::Raw {
1679 key: page_pk,
1680 bytes: page_bytes,
1681 });
1682 off += count;
1683 }
1684
1685 descriptor.tail_page_pk = *pages.last().unwrap();
1686 descriptor.total_chunk_count = new_metas.len() as u64;
1687 descriptor.total_row_count = new_metas.iter().map(|m| m.row_count).sum();
1688 puts.push(BatchPut::Raw {
1689 key: descriptor_pk,
1690 bytes: descriptor.to_le_bytes(),
1691 });
1692 Ok(())
1693 }
1694
    /// Bounded compaction pass for a single column after row deletions.
    ///
    /// Walks the column's data chunks alongside its shadow row-id chunks and
    /// merges runs of undersized chunks (below `MIN_CHUNK_BYTES`) into fresh
    /// chunks sized toward `TARGET_CHUNK_BYTES`, with each merge run capped
    /// at `MAX_MERGE_RUN_BYTES`. Both descriptor chains are rewritten, all
    /// staged writes are committed in one `batch_put`, and replaced chunk /
    /// permutation pages are freed afterwards. If compaction leaves the
    /// column empty, the field and its row-id shadow are removed from the
    /// catalog entirely.
    fn compact_field_bounded(&self, field_id: LogicalFieldId) -> Result<()> {
        // Hold the catalog write lock for the whole pass so concurrent
        // readers/writers never observe a half-rewritten chain.
        let mut catalog = self.catalog.write().unwrap();

        let desc_pk = match catalog.map.get(&field_id) {
            Some(&pk) => pk,
            None => return Ok(()),
        };
        // The paired row-id column must exist too; without it there is
        // nothing coherent to compact.
        let rid_fid = rowid_fid(field_id);
        let desc_pk_rid = match catalog.map.get(&rid_fid) {
            Some(&pk) => pk,
            None => return Ok(()),
        };

        // Fetch both descriptors in a single pager round-trip.
        let gets = vec![
            BatchGet::Raw { key: desc_pk },
            BatchGet::Raw { key: desc_pk_rid },
        ];
        let results = self.pager.batch_get(&gets)?;
        let mut blobs_by_pk = FxHashMap::default();
        for res in results {
            if let GetResult::Raw { key, bytes } = res {
                blobs_by_pk.insert(key, bytes);
            }
        }
        let desc_blob = blobs_by_pk.remove(&desc_pk).ok_or(Error::NotFound)?;
        let mut desc = ColumnDescriptor::from_le_bytes(desc_blob.as_ref());
        let desc_blob_rid = blobs_by_pk.remove(&desc_pk_rid).ok_or(Error::NotFound)?;
        let mut desc_rid = ColumnDescriptor::from_le_bytes(desc_blob_rid.as_ref());

        // Materialize chunk metadata for both chains.
        // NOTE(review): the loop below indexes `metas_rid[i]` in lockstep
        // with `metas[i]`, i.e. it assumes the two chains are index-aligned
        // with equal length — confirm the write path upholds that invariant.
        let mut metas = Vec::new();
        for m in DescriptorIterator::new(self.pager.as_ref(), desc.head_page_pk) {
            metas.push(m?);
        }
        let mut metas_rid = Vec::new();
        for m in DescriptorIterator::new(self.pager.as_ref(), desc_rid.head_page_pk) {
            metas_rid.push(m?);
        }
        if metas.is_empty() || metas_rid.is_empty() {
            return Ok(());
        }

        let mut puts: Vec<BatchPut> = Vec::new();
        let mut frees: Vec<PhysicalKey> = Vec::new();
        let mut new_metas: Vec<ChunkMetadata> = Vec::new();
        let mut new_rid_metas: Vec<ChunkMetadata> = Vec::new();

        let mut i = 0usize;
        while i < metas.len() {
            let sz = metas[i].serialized_bytes as usize;
            // Chunks already at or above the minimum are kept as-is.
            if sz >= MIN_CHUNK_BYTES {
                new_metas.push(metas[i]);
                new_rid_metas.push(metas_rid[i]);
                i += 1;
                continue;
            }

            // Grow a run [i, j) of small chunks to merge, stopping at a
            // chunk that is already target-sized or when the run would
            // exceed the merge budget.
            let mut j = i;
            let mut run_bytes = 0usize;
            while j < metas.len() {
                let b = metas[j].serialized_bytes as usize;
                if b >= TARGET_CHUNK_BYTES {
                    break;
                }
                if run_bytes + b > MAX_MERGE_RUN_BYTES {
                    break;
                }
                run_bytes += b;
                j += 1;
            }
            // NOTE(review): `sz >= MIN_CHUNK_BYTES` is always false at this
            // point (that case was handled above), so this guard never fires
            // and a lone small chunk still gets rewritten below — confirm
            // whether `j == i + 1` alone was the intended skip condition.
            if j == i + 1 && sz >= MIN_CHUNK_BYTES {
                new_metas.push(metas[i]);
                new_rid_metas.push(metas_rid[i]);
                i += 1;
                continue;
            }

            // Fetch every data and row-id chunk in the run at once.
            let mut gets = Vec::with_capacity((j - i) * 2);
            for k in i..j {
                gets.push(BatchGet::Raw {
                    key: metas[k].chunk_pk,
                });
                gets.push(BatchGet::Raw {
                    key: metas_rid[k].chunk_pk,
                });
            }
            let results = self.pager.batch_get(&gets)?;
            let mut by_pk: FxHashMap<PhysicalKey, EntryHandle> = FxHashMap::default();
            for r in results {
                match r {
                    GetResult::Raw { key, bytes } => {
                        by_pk.insert(key, bytes);
                    }
                    _ => return Err(Error::NotFound),
                }
            }
            let mut data_parts = Vec::with_capacity(j - i);
            let mut rid_parts = Vec::with_capacity(j - i);
            for k in i..j {
                let db = by_pk.get(&metas[k].chunk_pk).ok_or(Error::NotFound)?;
                data_parts.push(deserialize_array(db.clone())?);
                let rb = by_pk.get(&metas_rid[k].chunk_pk).ok_or(Error::NotFound)?;
                rid_parts.push(deserialize_array(rb.clone())?);
            }
            // Concatenate the run, then re-split into target-sized slices.
            let merged_data = concat_many(data_parts.iter().collect())?;
            let merged_rid_any = concat_many(rid_parts.iter().collect())?;

            let slices = split_to_target_bytes(
                &merged_data,
                TARGET_CHUNK_BYTES,
                self.cfg.varwidth_fallback_rows_per_slice,
            );
            let mut rid_off = 0usize;
            // Rebuild value-order permutations only if any source chunk in
            // the run carried one.
            let need_perms = metas[i..j].iter().any(|m| m.value_order_perm_pk != 0);

            for s in slices {
                let rows = s.len();
                // Row-id slice paralleling this data slice.
                let rid_ref: ArrayRef = merged_rid_any.slice(rid_off, rows);
                // zero_offset re-bases the slice so it serializes standalone.
                let rid_norm = zero_offset(&rid_ref);
                let rid_pk = self.pager.alloc_many(1)?[0];
                let rid_bytes = serialize_array(rid_norm.as_ref())?;

                let data_pk = self.pager.alloc_many(1)?[0];
                let s_norm = zero_offset(&s);
                let data_bytes = serialize_array(s_norm.as_ref())?;
                puts.push(BatchPut::Raw {
                    key: data_pk,
                    bytes: data_bytes,
                });
                puts.push(BatchPut::Raw {
                    key: rid_pk,
                    bytes: rid_bytes,
                });
                // max_val_u64 = u64::MAX here — presumably an "unknown
                // bound" sentinel for data chunks; confirm against
                // ChunkMetadata's consumers.
                let mut meta = ChunkMetadata {
                    chunk_pk: data_pk,
                    row_count: rows as u64,
                    serialized_bytes: s_norm.get_array_memory_size() as u64,
                    max_val_u64: u64::MAX,
                    ..Default::default()
                };
                if need_perms {
                    // Value-order permutation over this slice's data.
                    let sort_col = SortColumn {
                        values: s.clone(),
                        options: None,
                    };
                    let idx = lexsort_to_indices(&[sort_col], None)?;
                    let perm_bytes = serialize_array(&idx)?;
                    let perm_pk = self.pager.alloc_many(1)?[0];
                    puts.push(BatchPut::Raw {
                        key: perm_pk,
                        bytes: perm_bytes,
                    });
                    meta.value_order_perm_pk = perm_pk;
                }

                // Single scan over the row ids: min/max for the metadata
                // plus a sortedness check so already-ordered chunks skip the
                // permutation build.
                let rid_any = rid_norm.clone();
                let rids = rid_any
                    .as_any()
                    .downcast_ref::<UInt64Array>()
                    .ok_or_else(|| Error::Internal("rid downcast".into()))?;
                let mut min = u64::MAX;
                let mut max = 0u64;
                let mut sorted_rids = true;
                let mut last_v = 0u64;
                for ii in 0..rids.len() {
                    let v = rids.value(ii);
                    if ii == 0 {
                        last_v = v;
                    } else if v < last_v {
                        sorted_rids = false;
                    } else {
                        last_v = v;
                    }
                    if v < min {
                        min = v;
                    }
                    if v > max {
                        max = v;
                    }
                }
                // 0 acts as the "no permutation" sentinel for perm keys.
                let mut rid_perm_pk = 0u64;
                if !sorted_rids {
                    let rid_sort_col = SortColumn {
                        values: rid_any,
                        options: None,
                    };
                    let rid_idx = lexsort_to_indices(&[rid_sort_col], None)?;
                    let rid_perm_bytes = serialize_array(&rid_idx)?;
                    rid_perm_pk = self.pager.alloc_many(1)?[0];
                    puts.push(BatchPut::Raw {
                        key: rid_perm_pk,
                        bytes: rid_perm_bytes,
                    });
                }
                let rid_meta = ChunkMetadata {
                    chunk_pk: rid_pk,
                    value_order_perm_pk: rid_perm_pk,
                    row_count: rows as u64,
                    serialized_bytes: rid_norm.get_array_memory_size() as u64,
                    min_val_u64: if rows > 0 { min } else { 0 },
                    max_val_u64: if rows > 0 { max } else { 0 },
                };
                new_metas.push(meta);
                new_rid_metas.push(rid_meta);
                rid_off += rows;
            }

            // The merged source chunks (and their permutations) are dead.
            for k in i..j {
                frees.push(metas[k].chunk_pk);
                if metas[k].value_order_perm_pk != 0 {
                    frees.push(metas[k].value_order_perm_pk);
                }
                frees.push(metas_rid[k].chunk_pk);
                if metas_rid[k].value_order_perm_pk != 0 {
                    frees.push(metas_rid[k].value_order_perm_pk);
                }
            }

            i = j;
        }

        // Column fully emptied: tear down both chains and drop the field
        // (and its row-id shadow) from the catalog.
        if new_metas.is_empty() {
            self.write_descriptor_chain(desc_pk, &mut desc, &[], &mut puts, &mut frees)?;
            self.write_descriptor_chain(desc_pk_rid, &mut desc_rid, &[], &mut puts, &mut frees)?;
            catalog.map.remove(&field_id);
            catalog.map.remove(&rid_fid);
            puts.push(BatchPut::Raw {
                key: CATALOG_ROOT_PKEY,
                bytes: catalog.to_bytes(),
            });
            if !puts.is_empty() {
                self.pager.batch_put(&puts)?;
            }
            if !frees.is_empty() {
                self.pager.free_many(&frees)?;
            }
            return Ok(());
        }

        // Persist both rewritten chains; staged writes are committed before
        // the replaced pages are freed.
        self.write_descriptor_chain(desc_pk, &mut desc, &new_metas, &mut puts, &mut frees)?;
        self.write_descriptor_chain(
            desc_pk_rid,
            &mut desc_rid,
            &new_rid_metas,
            &mut puts,
            &mut frees,
        )?;
        if !puts.is_empty() {
            self.pager.batch_put(&puts)?;
        }
        if !frees.is_empty() {
            self.pager.free_many(&frees)?;
        }

        Ok(())
    }
1968
1969 fn append_meta_in_loop(
1972 &self,
1973 descriptor: &mut ColumnDescriptor,
1974 tail_page_bytes: &mut Vec<u8>,
1975 meta: ChunkMetadata,
1976 puts: &mut Vec<BatchPut>,
1977 ) -> Result<()> {
1978 let mut header = DescriptorPageHeader::from_le_bytes(
1979 &tail_page_bytes[..DescriptorPageHeader::DISK_SIZE],
1980 );
1981 if tail_page_bytes.len() + ChunkMetadata::DISK_SIZE <= TARGET_DESCRIPTOR_PAGE_BYTES
1982 && (header.entry_count as usize) < DESCRIPTOR_ENTRIES_PER_PAGE
1983 {
1984 tail_page_bytes.extend_from_slice(&meta.to_le_bytes());
1986 header.entry_count += 1;
1987 tail_page_bytes[..DescriptorPageHeader::DISK_SIZE]
1988 .copy_from_slice(&header.to_le_bytes());
1989 } else {
1990 let new_tail_pk = self.pager.alloc_many(1)?[0];
1992 header.next_page_pk = new_tail_pk;
1993 tail_page_bytes[..DescriptorPageHeader::DISK_SIZE]
1994 .copy_from_slice(&header.to_le_bytes());
1995 let full_page_to_write = std::mem::take(tail_page_bytes);
1998 puts.push(BatchPut::Raw {
1999 key: descriptor.tail_page_pk,
2000 bytes: full_page_to_write,
2001 });
2002 let new_header = DescriptorPageHeader {
2004 next_page_pk: 0,
2005 entry_count: 1,
2006 _padding: [0; 4],
2007 };
2008 let mut new_page_bytes = new_header.to_le_bytes().to_vec();
2009 new_page_bytes.extend_from_slice(&meta.to_le_bytes());
2010 descriptor.tail_page_pk = new_tail_pk;
2012 *tail_page_bytes = new_page_bytes;
2013 }
2014
2015 descriptor.total_row_count += meta.row_count;
2016 descriptor.total_chunk_count += 1;
2017 Ok(())
2018 }
2019
2020 pub fn verify_integrity(&self) -> Result<()> {
2030 let catalog = self.catalog.read().unwrap();
2031 for (&field_id, &descriptor_pk) in &catalog.map {
2032 let desc_blob = self
2033 .pager
2034 .batch_get(&[BatchGet::Raw { key: descriptor_pk }])?
2035 .pop()
2036 .and_then(|r| match r {
2037 GetResult::Raw { bytes, .. } => Some(bytes),
2038 _ => None,
2039 })
2040 .ok_or_else(|| {
2041 Error::Internal(format!(
2042 "Catalog points to missing descriptor pk={}",
2043 descriptor_pk
2044 ))
2045 })?;
2046 let descriptor = ColumnDescriptor::from_le_bytes(desc_blob.as_ref());
2047 if descriptor.field_id != field_id {
2048 return Err(Error::Internal(format!(
2049 "Descriptor at pk={} has wrong field_id: expected {:?}, \
2050 got {:?}",
2051 descriptor_pk, field_id, descriptor.field_id
2052 )));
2053 }
2054
2055 let mut actual_rows = 0;
2056 let mut actual_chunks = 0;
2057 let mut current_page_pk = descriptor.head_page_pk;
2058 while current_page_pk != 0 {
2059 let page_blob = self
2060 .pager
2061 .batch_get(&[BatchGet::Raw {
2062 key: current_page_pk,
2063 }])?
2064 .pop()
2065 .and_then(|r| match r {
2066 GetResult::Raw { bytes, .. } => Some(bytes),
2067 _ => None,
2068 })
2069 .ok_or_else(|| {
2070 Error::Internal(format!(
2071 "Descriptor page chain broken at pk={}",
2072 current_page_pk
2073 ))
2074 })?;
2075 let header = DescriptorPageHeader::from_le_bytes(
2076 &page_blob.as_ref()[..DescriptorPageHeader::DISK_SIZE],
2077 );
2078 for i in 0..(header.entry_count as usize) {
2079 let off = DescriptorPageHeader::DISK_SIZE + i * ChunkMetadata::DISK_SIZE;
2080 let end = off + ChunkMetadata::DISK_SIZE;
2081 let meta = ChunkMetadata::from_le_bytes(&page_blob.as_ref()[off..end]);
2082 actual_rows += meta.row_count;
2083 actual_chunks += 1;
2084 }
2085 current_page_pk = header.next_page_pk;
2086 }
2087
2088 if descriptor.total_row_count != actual_rows {
2089 return Err(Error::Internal(format!(
2090 "Row count mismatch for field {:?}: descriptor says {}, \
2091 actual is {}",
2092 field_id, descriptor.total_row_count, actual_rows
2093 )));
2094 }
2095 if descriptor.total_chunk_count != actual_chunks {
2096 return Err(Error::Internal(format!(
2097 "Chunk count mismatch for field {:?}: descriptor says {}, \
2098 actual is {}",
2099 field_id, descriptor.total_chunk_count, actual_chunks
2100 )));
2101 }
2102 }
2103 Ok(())
2104 }
2105
2106 pub fn get_layout_stats(&self) -> Result<Vec<ColumnLayoutStats>> {
2111 let catalog = self.catalog.read().unwrap();
2112 let mut all_stats = Vec::new();
2113
2114 for (&field_id, &descriptor_pk) in &catalog.map {
2115 let desc_blob = self
2116 .pager
2117 .batch_get(&[BatchGet::Raw { key: descriptor_pk }])?
2118 .pop()
2119 .and_then(|r| match r {
2120 GetResult::Raw { bytes, .. } => Some(bytes),
2121 _ => None,
2122 })
2123 .ok_or(Error::NotFound)?;
2124 let descriptor = ColumnDescriptor::from_le_bytes(desc_blob.as_ref());
2125
2126 let mut page_stats = Vec::new();
2127 let mut current_page_pk = descriptor.head_page_pk;
2128 while current_page_pk != 0 {
2129 let page_blob = self
2130 .pager
2131 .batch_get(&[BatchGet::Raw {
2132 key: current_page_pk,
2133 }])?
2134 .pop()
2135 .and_then(|r| match r {
2136 GetResult::Raw { bytes, .. } => Some(bytes),
2137 _ => None,
2138 })
2139 .ok_or(Error::NotFound)?;
2140 let header = DescriptorPageHeader::from_le_bytes(
2141 &page_blob.as_ref()[..DescriptorPageHeader::DISK_SIZE],
2142 );
2143 page_stats.push(DescriptorPageStats {
2144 page_pk: current_page_pk,
2145 entry_count: header.entry_count,
2146 page_size_bytes: page_blob.as_ref().len(),
2147 });
2148 current_page_pk = header.next_page_pk;
2149 }
2150
2151 all_stats.push(ColumnLayoutStats {
2152 field_id,
2153 total_rows: descriptor.total_row_count,
2154 total_chunks: descriptor.total_chunk_count,
2155 pages: page_stats,
2156 });
2157 }
2158 Ok(all_stats)
2159 }
2160}