1use super::*;
2use crate::store::catalog::ColumnCatalog;
3use crate::store::descriptor::{
4 ChunkMetadata, ColumnDescriptor, DescriptorIterator, DescriptorPageHeader,
5};
6use crate::store::scan::filter::FilterDispatch;
7use crate::store::scan::{FilterPrimitive, FilterResult};
8use crate::types::{LogicalFieldId, RowId, TableId};
9use arrow::array::{Array, ArrayRef, BooleanArray, UInt32Array, UInt64Array};
10use arrow::compute::{self, SortColumn, lexsort_to_indices};
11use arrow::datatypes::DataType;
12use arrow::record_batch::RecordBatch;
13use llkv_expr::typed_predicate::Predicate;
14use llkv_result::{Error, Result};
15use llkv_storage::{
16 constants::CATALOG_ROOT_PKEY,
17 pager::{BatchGet, BatchPut, GetResult, Pager},
18 serialization::{deserialize_array, serialize_array},
19 types::PhysicalKey,
20};
21
22use rustc_hash::{FxHashMap, FxHashSet};
23use simd_r_drive_entry_handle::EntryHandle;
24use std::sync::{Arc, RwLock};
25
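/// Column-oriented store layered on a [`Pager`].
///
/// The catalog maps each [`LogicalFieldId`] to a chain of descriptor pages
/// holding per-chunk metadata; every user column also has a shadow row-id
/// column so appends, last-write-wins rewrites, and deletes can be resolved
/// by row id.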
26pub struct ColumnStore<P: Pager> {
51 pub(crate) pager: Arc<P>,
52 pub(crate) catalog: Arc<RwLock<ColumnCatalog>>,
53 cfg: ColumnStoreConfig,
54 dtype_cache: DTypeCache<P>,
55 index_manager: IndexManager<P>,
56}
57
58impl<P> Clone for ColumnStore<P>
59where
60 P: Pager<Blob = EntryHandle> + Send + Sync,
61{
62 fn clone(&self) -> Self {
63 Self {
64 pager: Arc::clone(&self.pager),
65 catalog: Arc::clone(&self.catalog),
66 cfg: self.cfg.clone(),
67 dtype_cache: self.dtype_cache.clone(),
68 index_manager: self.index_manager.clone(),
69 }
70 }
71}
72
73impl<P> ColumnStore<P>
74where
75 P: Pager<Blob = EntryHandle> + Send + Sync,
76{
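/// Opens a store over `pager`, loading the persisted catalog from
/// `CATALOG_ROOT_PKEY` when it exists and starting from an empty catalog
/// otherwise.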
77 pub fn open(pager: Arc<P>) -> Result<Self> {
87 let cfg = ColumnStoreConfig::default();
88 let catalog = match pager
89 .batch_get(&[BatchGet::Raw {
90 key: CATALOG_ROOT_PKEY,
91 }])?
92 .pop()
93 {
94 Some(GetResult::Raw { bytes, .. }) => ColumnCatalog::from_bytes(bytes.as_ref())?,
95 _ => ColumnCatalog::default(),
96 };
97 let arc_catalog = Arc::new(RwLock::new(catalog));
98
99 let index_manager = IndexManager::new(Arc::clone(&pager));
100
101 Ok(Self {
102 pager: Arc::clone(&pager),
103 catalog: Arc::clone(&arc_catalog),
104 cfg,
105 dtype_cache: DTypeCache::new(Arc::clone(&pager), Arc::clone(&arc_catalog)),
106 index_manager,
107 })
108 }
109
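/// Registers a persisted index of `kind` on `field_id`, delegating to the
/// index manager.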
110 pub fn register_index(&self, field_id: LogicalFieldId, kind: IndexKind) -> Result<()> {
120 self.index_manager.register_index(self, field_id, kind)
121 }
122
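/// Returns `true` if `field_id` has an entry in the catalog.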
123 pub fn has_field(&self, field_id: LogicalFieldId) -> bool {
125 let catalog = self.catalog.read().unwrap();
126 catalog.map.contains_key(&field_id)
127 }
128
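/// Removes a previously registered index of `kind` from `field_id`,
/// delegating to the index manager.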
129 pub fn unregister_index(&self, field_id: LogicalFieldId, kind: IndexKind) -> Result<()> {
138 self.index_manager.unregister_index(self, field_id, kind)
139 }
140
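/// Returns the Arrow [`DataType`] recorded for `field_id`, consulting the
/// in-memory dtype cache before falling back to persisted metadata.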
141 pub fn data_type(&self, field_id: LogicalFieldId) -> Result<DataType> {
150 if let Some(dt) = self.dtype_cache.cached_data_type(field_id) {
151 return Ok(dt);
152 }
153 self.dtype_cache.dtype_for_field(field_id)
154 }
155
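/// Ensures that catalog entries and descriptors exist for `field_id` and
/// its shadow row-id column, allocating and persisting them if missing,
/// and caches the column's data type.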
156 pub fn ensure_column_registered(
162 &self,
163 field_id: LogicalFieldId,
164 data_type: &DataType,
165 ) -> Result<()> {
166 let rid_field_id = rowid_fid(field_id);
167
168 let mut catalog_dirty = false;
169 let descriptor_pk;
170 let rid_descriptor_pk;
171
172 {
173 let mut catalog = self.catalog.write().unwrap();
174 descriptor_pk = if let Some(&pk) = catalog.map.get(&field_id) {
175 pk
176 } else {
177 let pk = self.pager.alloc_many(1)?[0];
178 catalog.map.insert(field_id, pk);
179 catalog_dirty = true;
180 pk
181 };
182
183 rid_descriptor_pk = if let Some(&pk) = catalog.map.get(&rid_field_id) {
184 pk
185 } else {
186 let pk = self.pager.alloc_many(1)?[0];
187 catalog.map.insert(rid_field_id, pk);
188 catalog_dirty = true;
189 pk
190 };
191 }
192
193 let mut puts: Vec<BatchPut> = Vec::new();
194
195 let data_descriptor_missing = self
196 .pager
197 .batch_get(&[BatchGet::Raw { key: descriptor_pk }])?
198 .pop()
199 .and_then(|r| match r {
200 GetResult::Raw { bytes, .. } => Some(bytes),
201 _ => None,
202 })
203 .is_none();
204
205 if data_descriptor_missing {
206 let (mut descriptor, tail_page) =
207 ColumnDescriptor::load_or_create(Arc::clone(&self.pager), descriptor_pk, field_id)?;
208 let fingerprint = DTypeCache::<P>::dtype_fingerprint(data_type);
209 if fingerprint != 0 {
210 DTypeCache::<P>::set_desc_dtype_fingerprint(&mut descriptor, fingerprint);
211 }
212 puts.push(BatchPut::Raw {
213 key: descriptor.tail_page_pk,
214 bytes: tail_page,
215 });
216 puts.push(BatchPut::Raw {
217 key: descriptor_pk,
218 bytes: descriptor.to_le_bytes(),
219 });
220 }
221
222 let rid_descriptor_missing = self
223 .pager
224 .batch_get(&[BatchGet::Raw {
225 key: rid_descriptor_pk,
226 }])?
227 .pop()
228 .and_then(|r| match r {
229 GetResult::Raw { bytes, .. } => Some(bytes),
230 _ => None,
231 })
232 .is_none();
233
234 if rid_descriptor_missing {
235 let (mut rid_descriptor, tail_page) = ColumnDescriptor::load_or_create(
236 Arc::clone(&self.pager),
237 rid_descriptor_pk,
238 rid_field_id,
239 )?;
240 let fingerprint = DTypeCache::<P>::dtype_fingerprint(&DataType::UInt64);
241 if fingerprint != 0 {
242 DTypeCache::<P>::set_desc_dtype_fingerprint(&mut rid_descriptor, fingerprint);
243 }
244 puts.push(BatchPut::Raw {
245 key: rid_descriptor.tail_page_pk,
246 bytes: tail_page,
247 });
248 puts.push(BatchPut::Raw {
249 key: rid_descriptor_pk,
250 bytes: rid_descriptor.to_le_bytes(),
251 });
252 }
253
254 self.dtype_cache.insert(field_id, data_type.clone());
255
256 if catalog_dirty {
257 let catalog_bytes = {
258 let catalog = self.catalog.read().unwrap();
259 catalog.to_bytes()
260 };
261 puts.push(BatchPut::Raw {
262 key: CATALOG_ROOT_PKEY,
263 bytes: catalog_bytes,
264 });
265 }
266
267 if !puts.is_empty() {
268 self.pager.batch_put(&puts)?;
269 }
270
271 Ok(())
272 }
273
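/// Evaluates `predicate` against the column identified by `field_id` and
/// returns the matching row ids, dispatching through the column's
/// [`FilterDispatch`] implementation.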
274 pub fn filter_row_ids<T>(
284 &self,
285 field_id: LogicalFieldId,
286 predicate: &Predicate<T::Value>,
287 ) -> Result<Vec<u64>>
288 where
289 T: FilterDispatch,
290 {
291 tracing::trace!(field=?field_id, "filter_row_ids start");
292 let res = T::run_filter(self, field_id, predicate);
293 if let Err(ref err) = res {
294 tracing::trace!(field=?field_id, error=?err, "filter_row_ids error");
295 } else {
296 tracing::trace!(field=?field_id, "filter_row_ids ok");
297 }
298 res
299 }
300
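/// Runs a caller-supplied predicate over the column's values and returns a
/// [`FilterResult`] describing the matching rows.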
301 pub fn filter_matches<T, F>(
303 &self,
304 field_id: LogicalFieldId,
305 predicate: F,
306 ) -> Result<FilterResult>
307 where
308 T: FilterPrimitive,
309 F: FnMut(T::Native) -> bool,
310 {
311 T::run_filter_with_result(self, field_id, predicate)
312 }
313
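/// Lists the index kinds recorded on the column's descriptor.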
314 pub fn list_persisted_indexes(&self, field_id: LogicalFieldId) -> Result<Vec<IndexKind>> {
323 let catalog = self.catalog.read().unwrap();
324 let descriptor_pk = *catalog.map.get(&field_id).ok_or(Error::NotFound)?;
325
326 let desc_blob = self
327 .pager
328 .batch_get(&[BatchGet::Raw { key: descriptor_pk }])?
329 .pop()
330 .and_then(|r| match r {
331 GetResult::Raw { bytes, .. } => Some(bytes),
332 _ => None,
333 })
334 .ok_or(Error::NotFound)?;
335 let descriptor = ColumnDescriptor::from_le_bytes(desc_blob.as_ref());
336
337 let kinds = descriptor.get_indexes()?;
338 Ok(kinds)
339 }
340
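/// Returns the total row count recorded in the column's descriptor.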
341 pub fn total_rows_for_field(&self, field_id: LogicalFieldId) -> Result<u64> {
350 let catalog = self.catalog.read().unwrap();
351 let desc_pk = *catalog.map.get(&field_id).ok_or(Error::NotFound)?;
352 drop(catalog);
353
354 let desc_blob = self
355 .pager
356 .batch_get(&[BatchGet::Raw { key: desc_pk }])?
357 .pop()
358 .and_then(|r| match r {
359 GetResult::Raw { bytes, .. } => Some(bytes),
360 _ => None,
361 })
362 .ok_or(Error::NotFound)?;
363
364 let desc = ColumnDescriptor::from_le_bytes(desc_blob.as_ref());
365 Ok(desc.total_row_count)
366 }
367
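/// Returns the row count for `table_id`, computed as the maximum
/// descriptor row count across the table's user-data columns.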
368 pub fn total_rows_for_table(&self, table_id: crate::types::TableId) -> Result<u64> {
377 use crate::types::Namespace;
378 let catalog = self.catalog.read().unwrap();
380 let candidates: Vec<LogicalFieldId> = catalog
382 .map
383 .keys()
384 .filter(|fid| fid.namespace() == Namespace::UserData && fid.table_id() == table_id)
385 .copied()
386 .collect();
387 drop(catalog);
388
389 if candidates.is_empty() {
390 return Ok(0);
391 }
392
393 let mut max_rows: u64 = 0;
395 for field in candidates {
396 let rows = self.total_rows_for_field(field)?;
397 if rows > max_rows {
398 max_rows = rows;
399 }
400 }
401 Ok(max_rows)
402 }
403
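/// Returns every user-data field id registered for `table_id`.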
404 pub fn user_field_ids_for_table(&self, table_id: crate::types::TableId) -> Vec<LogicalFieldId> {
410 use crate::types::Namespace;
411
412 let catalog = self.catalog.read().unwrap();
413 catalog
414 .map
415 .keys()
416 .filter(|fid| fid.namespace() == Namespace::UserData && fid.table_id() == table_id)
417 .copied()
418 .collect()
419 }
420
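/// Returns `true` if the shadow row-id column for `field_id` contains
/// `row_id`, pruning chunks by their recorded min/max row ids and binary
/// searching inside the remaining candidates.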
421 pub fn has_row_id(&self, field_id: LogicalFieldId, row_id: RowId) -> Result<bool> {
431 let rid_fid = rowid_fid(field_id);
432 let catalog = self.catalog.read().unwrap();
433 let rid_desc_pk = *catalog.map.get(&rid_fid).ok_or(Error::NotFound)?;
434 let rid_desc_blob = self
435 .pager
436 .batch_get(&[BatchGet::Raw { key: rid_desc_pk }])?
437 .pop()
438 .and_then(|r| match r {
439 GetResult::Raw { bytes, .. } => Some(bytes),
440 _ => None,
441 })
442 .ok_or(Error::NotFound)?;
443 let rid_desc = ColumnDescriptor::from_le_bytes(rid_desc_blob.as_ref());
444 drop(catalog);
445
446 for m in DescriptorIterator::new(self.pager.as_ref(), rid_desc.head_page_pk) {
448 let meta = m?;
449 if meta.row_count == 0 {
450 continue;
451 }
452 if (meta.min_val_u64 != 0 || meta.max_val_u64 != 0)
453 && (row_id < meta.min_val_u64 || row_id > meta.max_val_u64)
454 {
455 continue;
456 }
457 let mut gets = vec![BatchGet::Raw { key: meta.chunk_pk }];
459 if meta.value_order_perm_pk != 0 {
460 gets.push(BatchGet::Raw {
461 key: meta.value_order_perm_pk,
462 });
463 }
464 let results = self.pager.batch_get(&gets)?;
465 let mut rid_blob: Option<EntryHandle> = None;
466 let mut perm_blob: Option<EntryHandle> = None;
467 for r in results {
468 if let GetResult::Raw { key, bytes } = r {
469 if key == meta.chunk_pk {
470 rid_blob = Some(bytes);
471 } else if key == meta.value_order_perm_pk {
472 perm_blob = Some(bytes);
473 }
474 }
475 }
476 let Some(rid_blob) = rid_blob else { continue };
478 let rid_any = deserialize_array(rid_blob)?;
479 let rids = rid_any
480 .as_any()
481 .downcast_ref::<UInt64Array>()
482 .ok_or_else(|| Error::Internal("rid downcast".into()))?;
483 if let Some(pblob) = perm_blob {
484 let perm_any = deserialize_array(pblob)?;
485 let perm = perm_any
486 .as_any()
487 .downcast_ref::<UInt32Array>()
488 .ok_or_else(|| Error::Internal("perm not u32".into()))?;
489 let mut lo: isize = 0;
491 let mut hi: isize = (perm.len() as isize) - 1;
492 while lo <= hi {
493 let mid = ((lo + hi) >> 1) as usize;
494 let rid = rids.value(perm.value(mid) as usize);
495 if rid == row_id {
496 return Ok(true);
497 } else if rid < row_id {
498 lo = mid as isize + 1;
499 } else {
500 hi = mid as isize - 1;
501 }
502 }
503 } else {
504 let mut lo: isize = 0;
506 let mut hi: isize = (rids.len() as isize) - 1;
507 while lo <= hi {
508 let mid = ((lo + hi) >> 1) as usize;
509 let rid = rids.value(mid);
510 if rid == row_id {
511 return Ok(true);
512 } else if rid < row_id {
513 lo = mid as isize + 1;
514 } else {
515 hi = mid as isize - 1;
516 }
517 }
518 }
519 }
520 Ok(false)
521 }
522
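/// Appends `batch` to the store with last-write-wins semantics.
///
/// The batch must contain a `row_id` column, and every other column must
/// carry a `field_id` entry in its Arrow field metadata. Rows are sorted by
/// row id if necessary, row ids that already exist are rewritten in place
/// (a null value deletes the stored row), and the remaining rows are
/// appended as new data chunks alongside their shadow row-id chunks.
///
/// A minimal usage sketch; `MemPager` and the field id `42` are
/// illustrative assumptions rather than part of this crate's API:
///
/// ```ignore
/// use std::collections::HashMap;
/// use std::sync::Arc;
/// use arrow::array::{ArrayRef, Int64Array, UInt64Array};
/// use arrow::datatypes::{DataType, Field, Schema};
/// use arrow::record_batch::RecordBatch;
///
/// let store = ColumnStore::open(Arc::new(MemPager::default()))?;
///
/// let row_id = Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false);
/// let value = Field::new("value", DataType::Int64, true).with_metadata(HashMap::from([(
///     FIELD_ID_META_KEY.to_string(),
///     "42".to_string(),
/// )]));
/// let schema = Arc::new(Schema::new(vec![row_id, value]));
///
/// let batch = RecordBatch::try_new(
///     schema,
///     vec![
///         Arc::new(UInt64Array::from(vec![0u64, 1, 2])) as ArrayRef,
///         Arc::new(Int64Array::from(vec![10i64, 20, 30])) as ArrayRef,
///     ],
/// )?;
/// store.append(&batch)?;
/// ```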
523 #[allow(unused_variables, unused_assignments)]
pub fn append(&self, batch: &RecordBatch) -> Result<()> {
554 tracing::trace!(
555 num_columns = batch.num_columns(),
556 num_rows = batch.num_rows(),
557 "ColumnStore::append BEGIN"
558 );
559 let working_batch: RecordBatch;
565 let batch_ref = {
566 let schema = batch.schema();
567 let row_id_idx = schema
568 .index_of(ROW_ID_COLUMN_NAME)
569 .map_err(|_| Error::Internal("row_id column required".into()))?;
570 let row_id_any = batch.column(row_id_idx).clone();
571 let row_id_arr = row_id_any
572 .as_any()
573 .downcast_ref::<UInt64Array>()
574 .ok_or_else(|| Error::Internal("row_id downcast failed".into()))?;
575
576 let mut is_sorted = true;
578 if !row_id_arr.is_empty() {
579 let mut last = row_id_arr.value(0);
580 for i in 1..row_id_arr.len() {
581 let current = row_id_arr.value(i);
582 if current < last {
583 is_sorted = false;
584 break;
585 }
586 last = current;
587 }
588 }
589
590 if is_sorted {
593 batch
594 } else {
595 let sort_col = SortColumn {
596 values: row_id_any,
597 options: None,
598 };
599 let idx = lexsort_to_indices(&[sort_col], None)?;
600 let perm = idx
601 .as_any()
602 .downcast_ref::<UInt32Array>()
603 .ok_or_else(|| Error::Internal("perm not u32".into()))?;
604 let mut cols: Vec<ArrayRef> = Vec::with_capacity(batch.num_columns());
605 for i in 0..batch.num_columns() {
606 cols.push(compute::take(batch.column(i), perm, None)?);
607 }
608 working_batch = RecordBatch::try_new(schema.clone(), cols)
609 .map_err(|e| Error::Internal(format!("record batch rebuild: {e}")))?;
610 &working_batch
611 }
612 };
613
614 tracing::trace!("ColumnStore::append PHASE 1 complete - batch preprocessed");
615
616 let schema = batch_ref.schema();
621 let row_id_idx = schema
622 .index_of(ROW_ID_COLUMN_NAME)
623 .map_err(|_| Error::Internal("row_id column required".into()))?;
624
625 let row_id_arr = batch_ref
627 .column(row_id_idx)
628 .as_any()
629 .downcast_ref::<UInt64Array>()
630 .ok_or_else(|| Error::Internal("row_id downcast failed".into()))?;
631 let mut incoming_ids_map = FxHashMap::default();
632 incoming_ids_map.reserve(row_id_arr.len());
633 for i in 0..row_id_arr.len() {
634 incoming_ids_map.insert(row_id_arr.value(i), i);
635 }
636
637 let mut catalog_dirty = false;
639 let mut puts_rewrites: Vec<BatchPut> = Vec::new();
640 let mut all_rewritten_ids = FxHashSet::default();
641
642 let mut catalog_lock = self.catalog.write().unwrap();
644 for i in 0..batch_ref.num_columns() {
645 if i == row_id_idx {
646 continue;
647 }
648 let field = schema.field(i);
649 if let Some(field_id_str) = field.metadata().get(crate::store::FIELD_ID_META_KEY) {
650 let field_id = field_id_str
651 .parse::<u64>()
652 .map(LogicalFieldId::from)
653 .map_err(|e| Error::Internal(format!("Invalid field_id: {}", e)))?;
654
655 let rewritten = self.lww_rewrite_for_field(
658 &mut catalog_lock,
659 field_id,
660 &incoming_ids_map,
661 batch_ref.column(i),
662 batch_ref.column(row_id_idx),
663 &mut puts_rewrites,
664 )?;
665 all_rewritten_ids.extend(rewritten);
666 }
667 }
668 drop(catalog_lock);
669
670 if !puts_rewrites.is_empty() {
672 self.pager.batch_put(&puts_rewrites)?;
673 }
674
675 tracing::trace!("ColumnStore::append PHASE 2 complete - LWW rewrites done");
676
677 let batch_to_append = if !all_rewritten_ids.is_empty() {
681 let keep_mask: Vec<bool> = (0..row_id_arr.len())
682 .map(|i| !all_rewritten_ids.contains(&row_id_arr.value(i)))
683 .collect();
684 let keep_array = BooleanArray::from(keep_mask);
685 compute::filter_record_batch(batch_ref, &keep_array)?
686 } else {
687 batch_ref.clone()
688 };
689
690 if batch_to_append.num_rows() == 0 {
692 tracing::trace!("ColumnStore::append early exit - no new rows to append");
693 return Ok(());
694 }
695
696 tracing::trace!("ColumnStore::append PHASE 3 complete - filtered for new rows");
697
698 let append_schema = batch_to_append.schema();
702 let append_row_id_idx = append_schema.index_of(ROW_ID_COLUMN_NAME)?;
703 let append_row_id_any: ArrayRef = Arc::clone(batch_to_append.column(append_row_id_idx));
704 let mut puts_appends: Vec<BatchPut> = Vec::new();
705
706 for (i, array) in batch_to_append.columns().iter().enumerate() {
708 if i == append_row_id_idx {
709 continue;
710 }
711
712 let field = append_schema.field(i);
713
714 let field_id = field
715 .metadata()
716 .get(crate::store::FIELD_ID_META_KEY)
717 .ok_or_else(|| Error::Internal("Missing field_id".into()))?
718 .parse::<u64>()
719 .map(LogicalFieldId::from)
720 .map_err(|e| Error::Internal(format!("Invalid field_id: {}", e)))?;
721
722 self.dtype_cache.insert(field_id, field.data_type().clone());
725
726 let (array_clean, rids_clean) = if array.null_count() == 0 {
729 (array.clone(), append_row_id_any.clone())
730 } else {
731 let keep =
732 BooleanArray::from_iter((0..array.len()).map(|j| Some(!array.is_null(j))));
733 let a = compute::filter(array, &keep)?;
734 let r = compute::filter(&append_row_id_any, &keep)?;
735 (a, r)
736 };
737
738 if array_clean.is_empty() {
739 continue;
740 }
741
742 let (descriptor_pk, rid_descriptor_pk, rid_fid) = {
745 let mut catalog = self.catalog.write().unwrap();
746 let pk1 = *catalog.map.entry(field_id).or_insert_with(|| {
747 catalog_dirty = true;
748 self.pager.alloc_many(1).unwrap()[0]
749 });
750 let r_fid = rowid_fid(field_id);
751 let pk2 = *catalog.map.entry(r_fid).or_insert_with(|| {
752 catalog_dirty = true;
753 self.pager.alloc_many(1).unwrap()[0]
754 });
755 (pk1, pk2, r_fid)
756 };
757
758 let (mut data_descriptor, mut data_tail_page) =
761 ColumnDescriptor::load_or_create(Arc::clone(&self.pager), descriptor_pk, field_id)
762 .map_err(|e| {
763 tracing::error!(
764 ?field_id,
765 descriptor_pk,
766 error = ?e,
767 "append: load_or_create failed for data descriptor"
768 );
769 e
770 })?;
771 let (mut rid_descriptor, mut rid_tail_page) = ColumnDescriptor::load_or_create(
772 Arc::clone(&self.pager),
773 rid_descriptor_pk,
774 rid_fid,
775 )
776 .map_err(|e| {
777 tracing::error!(
778 ?rid_fid,
779 rid_descriptor_pk,
780 error = ?e,
781 "append: load_or_create failed for rid descriptor"
782 );
783 e
784 })?;
785
786 self.index_manager
790 .stage_index_registration(&mut data_descriptor, IndexKind::Presence)?;
791
792 let slices = split_to_target_bytes(
794 &array_clean,
795 TARGET_CHUNK_BYTES,
796 self.cfg.varwidth_fallback_rows_per_slice,
797 );
798 let mut row_off = 0usize;
799
800 for s in slices {
802 let rows = s.len();
803 let data_pk = self.pager.alloc_many(1)?[0];
805 let s_norm = zero_offset(&s);
806 let data_bytes = serialize_array(s_norm.as_ref())?;
807 puts_appends.push(BatchPut::Raw {
808 key: data_pk,
809 bytes: data_bytes,
810 });
811
812 let rid_slice: ArrayRef = rids_clean.slice(row_off, rows);
814 let rid_norm = zero_offset(&rid_slice);
815 let rid_pk = self.pager.alloc_many(1)?[0];
816 let rid_bytes = serialize_array(rid_norm.as_ref())?;
817 puts_appends.push(BatchPut::Raw {
818 key: rid_pk,
819 bytes: rid_bytes,
820 });
821
822 let rids_for_meta = rid_norm.as_any().downcast_ref::<UInt64Array>().unwrap();
824 let (min, max) = if !rids_for_meta.is_empty() {
825 let mut min_val = rids_for_meta.value(0);
826 let mut max_val = rids_for_meta.value(0);
827 for i in 1..rids_for_meta.len() {
828 let v = rids_for_meta.value(i);
829 if v < min_val {
830 min_val = v;
831 }
832 if v > max_val {
833 max_val = v;
834 }
835 }
836 (min_val, max_val)
837 } else {
838 (0, 0)
839 };
840
841 let mut data_meta = ChunkMetadata {
844 chunk_pk: data_pk,
845 row_count: rows as u64,
846 serialized_bytes: s_norm.get_array_memory_size() as u64,
847 max_val_u64: u64::MAX,
848 ..Default::default()
849 };
850 let mut rid_meta = ChunkMetadata {
851 chunk_pk: rid_pk,
852 row_count: rows as u64,
853 serialized_bytes: rid_norm.get_array_memory_size() as u64,
854 min_val_u64: min,
855 max_val_u64: max,
856 ..Default::default()
857 };
858
859 self.index_manager.stage_updates_for_new_chunk(
865 field_id,
866 &data_descriptor,
867 &s_norm,
868 &rid_norm,
869 &mut data_meta,
870 &mut rid_meta,
871 &mut puts_appends,
872 )?;
873
874 self.append_meta_in_loop(
876 &mut data_descriptor,
877 &mut data_tail_page,
878 data_meta,
879 &mut puts_appends,
880 )?;
881 self.append_meta_in_loop(
882 &mut rid_descriptor,
883 &mut rid_tail_page,
884 rid_meta,
885 &mut puts_appends,
886 )?;
887 row_off += rows;
888 }
889
890 puts_appends.push(BatchPut::Raw {
893 key: data_descriptor.tail_page_pk,
894 bytes: data_tail_page,
895 });
896 puts_appends.push(BatchPut::Raw {
897 key: descriptor_pk,
898 bytes: data_descriptor.to_le_bytes(),
899 });
900 puts_appends.push(BatchPut::Raw {
901 key: rid_descriptor.tail_page_pk,
902 bytes: rid_tail_page,
903 });
904 puts_appends.push(BatchPut::Raw {
905 key: rid_descriptor_pk,
906 bytes: rid_descriptor.to_le_bytes(),
907 });
908 }
909
910 if catalog_dirty {
913 let catalog = self.catalog.read().unwrap();
914 puts_appends.push(BatchPut::Raw {
915 key: CATALOG_ROOT_PKEY,
916 bytes: catalog.to_bytes(),
917 });
918 }
919
920 if !puts_appends.is_empty() {
924 self.pager.batch_put(&puts_appends)?;
925 }
926 tracing::trace!("ColumnStore::append END - success");
927 Ok(())
928 }
929
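/// Applies last-write-wins semantics for a single column: scans the
/// existing row-id chunks for row ids that also appear in the incoming
/// batch, rewrites the affected data and row-id chunks in place (non-null
/// incoming values are upserted, null values delete the row), refreshes any
/// value-order permutations, and returns the set of row ids handled here.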
930 fn lww_rewrite_for_field(
931 &self,
932 catalog: &mut ColumnCatalog,
933 field_id: LogicalFieldId,
934 incoming_ids_map: &FxHashMap<u64, usize>,
935 incoming_data: &ArrayRef,
936 incoming_row_ids: &ArrayRef,
937 puts: &mut Vec<BatchPut>,
938 ) -> Result<FxHashSet<u64>> {
939 use crate::store::descriptor::DescriptorIterator;
940 use crate::store::ingest::ChunkEdit;
941
942 if incoming_ids_map.is_empty() {
944 return Ok(FxHashSet::default());
945 }
946 let incoming_ids: FxHashSet<u64> = incoming_ids_map.keys().copied().collect();
947
948 let desc_pk_data = match catalog.map.get(&field_id) {
950 Some(pk) => *pk,
951 None => return Ok(FxHashSet::default()),
952 };
953 let rid_fid = rowid_fid(field_id);
954 let desc_pk_rid = match catalog.map.get(&rid_fid) {
955 Some(pk) => *pk,
956 None => return Ok(FxHashSet::default()),
957 };
958
959 let gets = vec![
961 BatchGet::Raw { key: desc_pk_data },
962 BatchGet::Raw { key: desc_pk_rid },
963 ];
964 let results = self.pager.batch_get(&gets)?;
965 let mut blobs_by_pk = FxHashMap::default();
966 for r in results {
967 if let GetResult::Raw { key, bytes } = r {
968 blobs_by_pk.insert(key, bytes);
969 }
970 }
971
972 let desc_blob_data = blobs_by_pk.remove(&desc_pk_data).ok_or_else(|| {
973 tracing::error!(
974 ?field_id,
975 desc_pk_data,
976 "lww_rewrite: data descriptor blob not found in pager"
977 );
978 Error::NotFound
979 })?;
980 let mut descriptor_data = ColumnDescriptor::from_le_bytes(desc_blob_data.as_ref());
981
982 let desc_blob_rid = blobs_by_pk.remove(&desc_pk_rid).ok_or_else(|| {
983 tracing::error!(
984 ?rid_fid,
985 desc_pk_rid,
986 "lww_rewrite: rid descriptor blob not found in pager"
987 );
988 Error::NotFound
989 })?;
990 let mut descriptor_rid = ColumnDescriptor::from_le_bytes(desc_blob_rid.as_ref());
991
992 tracing::trace!(?field_id, "lww_rewrite: descriptors loaded successfully");
993
994 let mut metas_data: Vec<ChunkMetadata> = Vec::new();
996 let mut metas_rid: Vec<ChunkMetadata> = Vec::new();
997 for m in DescriptorIterator::new(self.pager.as_ref(), descriptor_data.head_page_pk) {
998 metas_data.push(m.map_err(|e| {
999 tracing::error!(?field_id, error = ?e, "lww_rewrite: failed to iterate data descriptor");
1000 e
1001 })?);
1002 }
1003 for m in DescriptorIterator::new(self.pager.as_ref(), descriptor_rid.head_page_pk) {
1004 metas_rid.push(m.map_err(|e| {
1005 tracing::error!(?rid_fid, error = ?e, "lww_rewrite: failed to iterate rid descriptor");
1006 e
1007 })?);
1008 }
1009
1010 tracing::trace!(
1011 ?field_id,
1012 data_chunks = metas_data.len(),
1013 rid_chunks = metas_rid.len(),
1014 "lww_rewrite: chunk metadata collected"
1015 );
1016
1017 let rid_in = incoming_row_ids
1019 .as_any()
1020 .downcast_ref::<UInt64Array>()
1021 .ok_or_else(|| Error::Internal("row_id must be u64".into()))?;
1022 let mut ids_to_delete = FxHashSet::default();
1023 let mut ids_to_upsert = FxHashSet::default();
1024 for i in 0..rid_in.len() {
1025 let rid = rid_in.value(i);
1026 if incoming_data.is_null(i) {
1027 ids_to_delete.insert(rid);
1028 } else {
1029 ids_to_upsert.insert(rid);
1030 }
1031 }
1032
1033 let mut rewritten_ids = FxHashSet::default();
1035 let mut hit_up: FxHashMap<usize, Vec<u64>> = FxHashMap::default();
1036 let mut hit_del: FxHashMap<usize, Vec<u64>> = FxHashMap::default();
1037
1038 let n = metas_data.len().min(metas_rid.len());
1039 if n > 0 {
1040 let mut gets_rid = Vec::with_capacity(n);
1042 for rm in metas_rid.iter().take(n) {
1043 gets_rid.push(BatchGet::Raw { key: rm.chunk_pk });
1044 }
1045 let rid_results = self.pager.batch_get(&gets_rid)?;
1046 let mut rid_blobs: FxHashMap<PhysicalKey, EntryHandle> = FxHashMap::default();
1047 for r in rid_results {
1048 if let GetResult::Raw { key, bytes } = r {
1049 rid_blobs.insert(key, bytes);
1050 }
1051 }
1052
1053 for (i, meta_rid) in metas_rid.iter().enumerate().take(n) {
1054 if let Some(rid_blob) = rid_blobs.get(&meta_rid.chunk_pk) {
1055 let rid_arr_any = deserialize_array(rid_blob.clone())?;
1056 let rid_arr = rid_arr_any
1057 .as_any()
1058 .downcast_ref::<UInt64Array>()
1059 .ok_or_else(|| Error::Internal("rid type mismatch".into()))?;
1060 for j in 0..rid_arr.len() {
1061 let rid = rid_arr.value(j);
1062 if incoming_ids.contains(&rid) {
1063 if ids_to_delete.contains(&rid) {
1064 hit_del.entry(i).or_default().push(rid);
1065 } else if ids_to_upsert.contains(&rid) {
1066 hit_up.entry(i).or_default().push(rid);
1067 }
1068 rewritten_ids.insert(rid);
1069 }
1070 }
1071 }
1072 }
1073 }
1074
1075 if hit_up.is_empty() && hit_del.is_empty() {
1076 return Ok(rewritten_ids);
1077 }
1078
1079 let mut hit_set = FxHashSet::default();
1081 hit_set.extend(hit_up.keys().copied());
1082 hit_set.extend(hit_del.keys().copied());
1083 let hit_idxs: Vec<usize> = hit_set.into_iter().collect();
1084
1085 let mut gets = Vec::with_capacity(hit_idxs.len() * 2);
1086 for &i in &hit_idxs {
1087 gets.push(BatchGet::Raw {
1088 key: metas_data[i].chunk_pk,
1089 });
1090 gets.push(BatchGet::Raw {
1091 key: metas_rid[i].chunk_pk,
1092 });
1093 }
1094 let results = self.pager.batch_get(&gets)?;
1095 let mut blob_map: FxHashMap<PhysicalKey, EntryHandle> = FxHashMap::default();
1096 for r in results {
1097 if let GetResult::Raw { key, bytes } = r {
1098 blob_map.insert(key, bytes);
1099 }
1100 }
1101
1102 for i in hit_idxs {
1104 let old_data_arr =
1105 deserialize_array(blob_map.get(&metas_data[i].chunk_pk).unwrap().clone())?;
1106 let old_rid_arr_any =
1107 deserialize_array(blob_map.get(&metas_rid[i].chunk_pk).unwrap().clone())?;
1108 let old_rid_arr = old_rid_arr_any
1109 .as_any()
1110 .downcast_ref::<UInt64Array>()
1111 .unwrap();
1112
1113 let up_vec = hit_up.remove(&i).unwrap_or_default();
1114 let del_vec = hit_del.remove(&i).unwrap_or_default();
1115
1116 let edit = ChunkEdit::from_lww_upsert(
1118 old_rid_arr,
1119 &up_vec,
1120 &del_vec,
1121 incoming_data,
1122 incoming_row_ids,
1123 incoming_ids_map,
1124 )?;
1125
1126 let (new_data_arr, new_rid_arr) =
1127 ChunkEdit::apply_edit_to_arrays(&old_data_arr, Some(&old_rid_arr_any), &edit)?;
1128
1129 let data_bytes = serialize_array(&new_data_arr)?;
1131 puts.push(BatchPut::Raw {
1132 key: metas_data[i].chunk_pk,
1133 bytes: data_bytes,
1134 });
1135 metas_data[i].row_count = new_data_arr.len() as u64;
1136 metas_data[i].serialized_bytes = new_data_arr.get_array_memory_size() as u64;
1137
1138 if let Some(rarr) = new_rid_arr {
1140 let rid_bytes = serialize_array(&rarr)?;
1141 puts.push(BatchPut::Raw {
1142 key: metas_rid[i].chunk_pk,
1143 bytes: rid_bytes,
1144 });
1145 metas_rid[i].row_count = rarr.len() as u64;
1146 metas_rid[i].serialized_bytes = rarr.get_array_memory_size() as u64;
1147 }
1148
1149 if metas_data[i].value_order_perm_pk != 0 {
1151 let sort_col = SortColumn {
1152 values: new_data_arr,
1153 options: None,
1154 };
1155 let idx = lexsort_to_indices(&[sort_col], None)?;
1156 let perm_bytes = serialize_array(&idx)?;
1157 puts.push(BatchPut::Raw {
1158 key: metas_data[i].value_order_perm_pk,
1159 bytes: perm_bytes,
1160 });
1161 }
1162 }
1163
1164 descriptor_data.rewrite_pages(
1166 Arc::clone(&self.pager),
1167 desc_pk_data,
1168 &mut metas_data,
1169 puts,
1170 )?;
1171 descriptor_rid.rewrite_pages(Arc::clone(&self.pager), desc_pk_rid, &mut metas_rid, puts)?;
1172
1173 Ok(rewritten_ids)
1174 }
1175
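/// Stages chunk rewrites that drop the given row positions (ascending,
/// unique offsets into the column's concatenated chunks) from one column,
/// updating the data and row-id chunks and any value-order permutations.
/// Returns `true` if any chunk was modified.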
1176 fn stage_delete_rows_for_field(
1177 &self,
1178 field_id: LogicalFieldId,
1179 rows_to_delete: &[RowId],
1180 staged_puts: &mut Vec<BatchPut>,
1181 ) -> Result<bool> {
1182 tracing::warn!(
1183 field_id = ?field_id,
1184 rows = rows_to_delete.len(),
1185 "delete_rows stage_delete_rows_for_field: start"
1186 );
1187 use crate::store::descriptor::DescriptorIterator;
1188 use crate::store::ingest::ChunkEdit;
1189
1190 if rows_to_delete.is_empty() {
1191 return Ok(false);
1192 }
1193
1194 let mut del_iter = rows_to_delete.iter().copied();
1196 let mut cur_del = del_iter.next();
1197 let mut last_seen: Option<u64> = cur_del;
1198
1199 let catalog = self.catalog.read().unwrap();
1201 let desc_pk = match catalog.map.get(&field_id) {
1202 Some(pk) => *pk,
1203 None => {
1204 tracing::trace!(
1205 field_id = ?field_id,
1206 "delete_rows stage_delete_rows_for_field: data descriptor missing"
1207 );
1208 return Err(Error::NotFound);
1209 }
1210 };
1211 let rid_fid = rowid_fid(field_id);
1212 let desc_pk_rid = catalog.map.get(&rid_fid).copied();
1213 tracing::warn!(
1214 field_id = ?field_id,
1215 desc_pk,
1216 desc_pk_rid = ?desc_pk_rid,
1217 "delete_rows stage_delete_rows_for_field: descriptor keys"
1218 );
1219
1220 let mut gets = vec![BatchGet::Raw { key: desc_pk }];
1222 if let Some(pk) = desc_pk_rid {
1223 gets.push(BatchGet::Raw { key: pk });
1224 }
1225 let results = match self.pager.batch_get(&gets) {
1226 Ok(res) => res,
1227 Err(err) => {
1228 tracing::trace!(
1229 field_id = ?field_id,
1230 error = ?err,
1231 "delete_rows stage_delete_rows_for_field: descriptor batch_get failed"
1232 );
1233 return Err(err);
1234 }
1235 };
1236 let mut blobs_by_pk = FxHashMap::default();
1237 for res in results {
1238 if let GetResult::Raw { key, bytes } = res {
1239 blobs_by_pk.insert(key, bytes);
1240 }
1241 }
1242
1243 tracing::warn!(
1244 field_id = ?field_id,
1245 desc_blob_found = blobs_by_pk.contains_key(&desc_pk),
1246 rid_blob_found = desc_pk_rid.map(|pk| blobs_by_pk.contains_key(&pk)),
1247 "delete_rows stage_delete_rows_for_field: descriptor fetch status"
1248 );
1249
1250 let desc_blob = blobs_by_pk.remove(&desc_pk).ok_or_else(|| {
1251 tracing::trace!(
1252 field_id = ?field_id,
1253 desc_pk,
1254 "delete_rows stage_delete_rows_for_field: descriptor blob missing"
1255 );
1256 Error::Internal(format!(
1257 "descriptor pk={} missing during delete_rows for field {:?}",
1258 desc_pk, field_id
1259 ))
1260 })?;
1261 let mut descriptor = ColumnDescriptor::from_le_bytes(desc_blob.as_ref());
1262
1263 let mut metas: Vec<ChunkMetadata> = Vec::new();
1265 for m in DescriptorIterator::new(self.pager.as_ref(), descriptor.head_page_pk) {
1266 metas.push(m?);
1267 }
1268 if metas.is_empty() {
1269 drop(catalog);
1270 return Ok(false);
1271 }
1272
1273 let mut metas_rid: Vec<ChunkMetadata> = Vec::new();
1275 let mut descriptor_rid: Option<ColumnDescriptor> = None;
1276 tracing::warn!(
1277 field_id = ?field_id,
1278 metas_len = metas.len(),
1279 desc_pk_rid = ?desc_pk_rid,
1280 "delete_rows stage_delete_rows_for_field: data metas loaded"
1281 );
1282 if let Some(pk_rid) = desc_pk_rid
1283 && let Some(desc_blob_rid) = blobs_by_pk.remove(&pk_rid)
1284 {
1285 let d_rid = ColumnDescriptor::from_le_bytes(desc_blob_rid.as_ref());
1286 for m in DescriptorIterator::new(self.pager.as_ref(), d_rid.head_page_pk) {
1287 metas_rid.push(m?);
1288 }
1289 descriptor_rid = Some(d_rid);
1290 }
1291
1292 tracing::warn!(
1293 field_id = ?field_id,
1294 metas_rid_len = metas_rid.len(),
1295 "delete_rows stage_delete_rows_for_field: rowid metas loaded"
1296 );
1297
1298 let mut cum_rows = 0u64;
1299 let mut any_changed = false;
1300
1301 for (i, meta) in metas.iter_mut().enumerate() {
1302 let start_u64 = cum_rows;
1303 let end_u64 = start_u64 + meta.row_count;
1304
1305 while let Some(d) = cur_del {
1307 if d < start_u64
1308 && let Some(prev) = last_seen
1309 {
1310 if d < prev {
1311 return Err(Error::Internal(
1312 "rows_to_delete must be ascending/unique".into(),
1313 ));
1314 }
1315
1316 last_seen = Some(d);
1317 cur_del = del_iter.next();
1318 } else {
1319 break;
1320 }
1321 }
1322
1323 let rows = meta.row_count as usize;
1325 let mut del_local: FxHashSet<usize> = FxHashSet::default();
1326 while let Some(d) = cur_del {
1327 if d >= end_u64 {
1328 break;
1329 }
1330 del_local.insert((d - start_u64) as usize);
1331 last_seen = Some(d);
1332 cur_del = del_iter.next();
1333 }
1334
1335 if del_local.is_empty() {
1336 cum_rows = end_u64;
1337 continue;
1338 }
1339
1340 let mut chunk_gets = vec![BatchGet::Raw { key: meta.chunk_pk }];
1342 if let Some(rm) = metas_rid.get(i) {
1343 chunk_gets.push(BatchGet::Raw { key: rm.chunk_pk });
1344 }
1345 let chunk_results = match self.pager.batch_get(&chunk_gets) {
1346 Ok(res) => res,
1347 Err(err) => {
1348 tracing::trace!(
1349 field_id = ?field_id,
1350 chunk_pk = meta.chunk_pk,
1351 error = ?err,
1352 "delete_rows stage_delete_rows_for_field: chunk batch_get failed"
1353 );
1354 return Err(err);
1355 }
1356 };
1357 let mut chunk_blobs = FxHashMap::default();
1358 for res in chunk_results {
1359 if let GetResult::Raw { key, bytes } = res {
1360 chunk_blobs.insert(key, bytes);
1361 }
1362 }
1363
1364 tracing::warn!(
1365 field_id = ?field_id,
1366 chunk_pk = meta.chunk_pk,
1367 rid_chunk_pk = metas_rid.get(i).map(|rm| rm.chunk_pk),
1368 data_found = chunk_blobs.contains_key(&meta.chunk_pk),
1369 rid_found = metas_rid
1370 .get(i)
1371 .map(|rm| chunk_blobs.contains_key(&rm.chunk_pk)),
1372 "delete_rows stage_delete_rows_for_field: chunk fetch status"
1373 );
1374
1375 let data_blob = match chunk_blobs.remove(&meta.chunk_pk) {
1376 Some(bytes) => bytes,
1377 None => {
1378 tracing::trace!(
1379 field_id = ?field_id,
1380 chunk_pk = meta.chunk_pk,
1381 "delete_rows stage_delete_rows_for_field: chunk missing"
1382 );
1383 return Err(Error::NotFound);
1384 }
1385 };
1386 let data_arr = deserialize_array(data_blob)?;
1387
1388 let rid_arr_any = if let Some(rm) = metas_rid.get(i) {
1389 let rid_blob = match chunk_blobs.remove(&rm.chunk_pk) {
1390 Some(bytes) => bytes,
1391 None => {
1392 tracing::trace!(
1393 field_id = ?field_id,
1394 rowid_chunk_pk = rm.chunk_pk,
1395 "delete_rows stage_delete_rows_for_field: rowid chunk missing"
1396 );
1397 return Err(Error::NotFound);
1398 }
1399 };
1400 Some(deserialize_array(rid_blob)?)
1401 } else {
1402 None
1403 };
1404
1405 let edit = ChunkEdit::from_delete_indices(rows, &del_local);
1407
1408 let (new_data_arr, new_rid_arr) =
1410 ChunkEdit::apply_edit_to_arrays(&data_arr, rid_arr_any.as_ref(), &edit)?;
1411
1412 let data_bytes = serialize_array(&new_data_arr)?;
1414 staged_puts.push(BatchPut::Raw {
1415 key: meta.chunk_pk,
1416 bytes: data_bytes,
1417 });
1418 meta.row_count = new_data_arr.len() as u64;
1419 meta.serialized_bytes = new_data_arr.get_array_memory_size() as u64;
1420
1421 if let (Some(rm), Some(rids)) = (metas_rid.get_mut(i), new_rid_arr) {
1424 let rid_bytes = serialize_array(&rids)?;
1425 staged_puts.push(BatchPut::Raw {
1426 key: rm.chunk_pk,
1427 bytes: rid_bytes,
1428 });
1429 rm.row_count = rids.len() as u64;
1430 rm.serialized_bytes = rids.get_array_memory_size() as u64;
1431 }
1432
1433 if meta.value_order_perm_pk != 0 {
1435 let sort_column = SortColumn {
1436 values: new_data_arr,
1437 options: None,
1438 };
1439 let indices = lexsort_to_indices(&[sort_column], None)?;
1440 let perm_bytes = serialize_array(&indices)?;
1441 staged_puts.push(BatchPut::Raw {
1442 key: meta.value_order_perm_pk,
1443 bytes: perm_bytes,
1444 });
1445 }
1446
1447 cum_rows = end_u64;
1448 any_changed = true;
1449 }
1450
1451 descriptor.rewrite_pages(Arc::clone(&self.pager), desc_pk, &mut metas, staged_puts)?;
1453 if let (Some(rid_pk), Some(mut rid_desc)) = (desc_pk_rid, descriptor_rid) {
1454 rid_desc.rewrite_pages(Arc::clone(&self.pager), rid_pk, &mut metas_rid, staged_puts)?;
1455 }
1456 drop(catalog);
1457 tracing::trace!(
1458 field_id = ?field_id,
1459 changed = any_changed,
1460 "delete_rows stage_delete_rows_for_field: finished stage"
1461 );
1462 Ok(any_changed)
1463 }
1464
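/// Deletes the given rows from every listed column. All fields must belong
/// to the same table; the staged chunk rewrites are written in one batch,
/// and each column that changed is then run through bounded compaction.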
1465 pub fn delete_rows(&self, fields: &[LogicalFieldId], rows_to_delete: &[RowId]) -> Result<()> {
1471 if fields.is_empty() || rows_to_delete.is_empty() {
1472 return Ok(());
1473 }
1474
1475 let mut puts = Vec::new();
1476 let mut touched: FxHashSet<LogicalFieldId> = FxHashSet::default();
1477 let mut table_id: Option<TableId> = None;
1478
1479 tracing::warn!(
1480 fields = fields.len(),
1481 rows = rows_to_delete.len(),
1482 "delete_rows begin"
1483 );
1484 for field_id in fields {
1485 tracing::warn!(field = ?field_id, "delete_rows iter field");
1486 if let Some(expected) = table_id {
1487 if field_id.table_id() != expected {
1488 return Err(Error::InvalidArgumentError(
1489 "delete_rows requires fields from the same table".into(),
1490 ));
1491 }
1492 } else {
1493 table_id = Some(field_id.table_id());
1494 }
1495
1496 if self.stage_delete_rows_for_field(*field_id, rows_to_delete, &mut puts)? {
1497 touched.insert(*field_id);
1498 }
1499 }
1500
1501 if puts.is_empty() {
1502 return Ok(());
1503 }
1504
1505 self.pager.batch_put(&puts)?;
1506
1507 tracing::warn!(touched = touched.len(), "delete_rows apply writes");
1508
1509 for field_id in touched {
1510 self.compact_field_bounded(field_id)?;
1511 }
1512 tracing::warn!("delete_rows complete");
1513 Ok(())
1514 }
1515
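/// Rewrites a column's descriptor page chain so that it contains exactly
/// `new_metas`: the existing head page is reused when possible, extra pages
/// are allocated or freed as the entry count changes, and the descriptor's
/// totals are updated (or zeroed when `new_metas` is empty).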
1516 pub(crate) fn write_descriptor_chain(
1520 &self,
1521 descriptor_pk: PhysicalKey,
1522 descriptor: &mut ColumnDescriptor,
1523 new_metas: &[ChunkMetadata],
1524 puts: &mut Vec<BatchPut>,
1525 frees: &mut Vec<PhysicalKey>,
1526 ) -> Result<()> {
1527 let mut old_pages = Vec::new();
1529 let mut pk = descriptor.head_page_pk;
1530 while pk != 0 {
1531 let page_blob = self
1532 .pager
1533 .batch_get(&[BatchGet::Raw { key: pk }])?
1534 .pop()
1535 .and_then(|res| match res {
1536 GetResult::Raw { bytes, .. } => Some(bytes),
1537 _ => None,
1538 })
1539 .ok_or(Error::NotFound)?;
1540 let header = DescriptorPageHeader::from_le_bytes(
1541 &page_blob.as_ref()[..DescriptorPageHeader::DISK_SIZE],
1542 );
1543 old_pages.push(pk);
1544 pk = header.next_page_pk;
1545 }
1546
1547 let per = DESCRIPTOR_ENTRIES_PER_PAGE;
1549 let need_pages = if new_metas.is_empty() {
1550 0
1551 } else {
1552 new_metas.len().div_ceil(per)
1553 };
1554 if need_pages == 0 {
1556 frees.extend(old_pages.iter().copied());
1557
1558 descriptor.head_page_pk = 0;
1563 descriptor.tail_page_pk = 0;
1564 descriptor.total_row_count = 0;
1565 descriptor.total_chunk_count = 0;
1566 puts.push(BatchPut::Raw {
1567 key: descriptor_pk,
1568 bytes: descriptor.to_le_bytes(),
1569 });
1570 return Ok(());
1571 }
1572
1573 let mut pages = Vec::with_capacity(need_pages);
1575 if !old_pages.is_empty() {
1576 pages.push(old_pages[0]);
1577 } else {
1578 pages.push(self.pager.alloc_many(1)?[0]);
1579 descriptor.head_page_pk = pages[0];
1580 }
1581 if need_pages > pages.len() {
1582 let extra = self.pager.alloc_many(need_pages - pages.len())?;
1583 pages.extend(extra);
1584 }
1585
1586 if old_pages.len() > need_pages {
1588 frees.extend(old_pages[need_pages..].iter().copied());
1589 }
1590
1591 let mut off = 0usize;
1593 for (i, page_pk) in pages.iter().copied().enumerate() {
1594 let remain = new_metas.len() - off;
1595 let count = remain.min(per);
1596 let next = if i + 1 < pages.len() { pages[i + 1] } else { 0 };
1597 let header = DescriptorPageHeader {
1598 next_page_pk: next,
1599 entry_count: count as u32,
1600 _padding: [0; 4],
1601 };
1602 let mut page_bytes = header.to_le_bytes().to_vec();
1603 for m in &new_metas[off..off + count] {
1604 page_bytes.extend_from_slice(&m.to_le_bytes());
1605 }
1606 puts.push(BatchPut::Raw {
1607 key: page_pk,
1608 bytes: page_bytes,
1609 });
1610 off += count;
1611 }
1612
1613 descriptor.tail_page_pk = *pages.last().unwrap();
1614 descriptor.total_chunk_count = new_metas.len() as u64;
1615 descriptor.total_row_count = new_metas.iter().map(|m| m.row_count).sum();
1616 puts.push(BatchPut::Raw {
1617 key: descriptor_pk,
1618 bytes: descriptor.to_le_bytes(),
1619 });
1620 Ok(())
1621 }
1622
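/// Bounded compaction for one column: runs of undersized chunks (bounded by
/// `MAX_MERGE_RUN_BYTES`) are merged and re-split toward
/// `TARGET_CHUNK_BYTES`, rebuilding row-id chunks, min/max statistics, and
/// any required permutations; if no chunks remain the column is removed
/// from the catalog.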
1623 fn compact_field_bounded(&self, field_id: LogicalFieldId) -> Result<()> {
1627 let mut catalog = self.catalog.write().unwrap();
1629
1630 let desc_pk = match catalog.map.get(&field_id) {
1631 Some(&pk) => pk,
1632 None => return Ok(()),
1633 };
1634 let rid_fid = rowid_fid(field_id);
1635 let desc_pk_rid = match catalog.map.get(&rid_fid) {
1636 Some(&pk) => pk,
1637 None => return Ok(()),
1638 };
1639
1640 let gets = vec![
1642 BatchGet::Raw { key: desc_pk },
1643 BatchGet::Raw { key: desc_pk_rid },
1644 ];
1645 let results = self.pager.batch_get(&gets)?;
1646 let mut blobs_by_pk = FxHashMap::default();
1647 for res in results {
1648 if let GetResult::Raw { key, bytes } = res {
1649 blobs_by_pk.insert(key, bytes);
1650 }
1651 }
1652 let desc_blob = blobs_by_pk.remove(&desc_pk).ok_or(Error::NotFound)?;
1653 let mut desc = ColumnDescriptor::from_le_bytes(desc_blob.as_ref());
1654 let desc_blob_rid = blobs_by_pk.remove(&desc_pk_rid).ok_or(Error::NotFound)?;
1655 let mut desc_rid = ColumnDescriptor::from_le_bytes(desc_blob_rid.as_ref());
1656
1657 let mut metas = Vec::new();
1659 for m in DescriptorIterator::new(self.pager.as_ref(), desc.head_page_pk) {
1660 metas.push(m?);
1661 }
1662 let mut metas_rid = Vec::new();
1663 for m in DescriptorIterator::new(self.pager.as_ref(), desc_rid.head_page_pk) {
1664 metas_rid.push(m?);
1665 }
1666 if metas.is_empty() || metas_rid.is_empty() {
1667 return Ok(());
1668 }
1669
1670 let mut puts: Vec<BatchPut> = Vec::new();
1671 let mut frees: Vec<PhysicalKey> = Vec::new();
1672 let mut new_metas: Vec<ChunkMetadata> = Vec::new();
1673 let mut new_rid_metas: Vec<ChunkMetadata> = Vec::new();
1674
1675 let mut i = 0usize;
1676 while i < metas.len() {
1677 let sz = metas[i].serialized_bytes as usize;
1678 if sz >= MIN_CHUNK_BYTES {
1680 new_metas.push(metas[i]);
1681 new_rid_metas.push(metas_rid[i]);
1682 i += 1;
1683 continue;
1684 }
1685
1686 let mut j = i;
1688 let mut run_bytes = 0usize;
1689 while j < metas.len() {
1690 let b = metas[j].serialized_bytes as usize;
1691 if b >= TARGET_CHUNK_BYTES {
1692 break;
1693 }
1694 if run_bytes + b > MAX_MERGE_RUN_BYTES {
1695 break;
1696 }
1697 run_bytes += b;
1698 j += 1;
1699 }
1700 if j == i + 1 {
1701 new_metas.push(metas[i]);
1702 new_rid_metas.push(metas_rid[i]);
1703 i += 1;
1704 continue;
1705 }
1706
1707 let mut gets = Vec::with_capacity((j - i) * 2);
1709 for k in i..j {
1710 gets.push(BatchGet::Raw {
1711 key: metas[k].chunk_pk,
1712 });
1713 gets.push(BatchGet::Raw {
1714 key: metas_rid[k].chunk_pk,
1715 });
1716 }
1717 let results = self.pager.batch_get(&gets)?;
1718 let mut by_pk: FxHashMap<PhysicalKey, EntryHandle> = FxHashMap::default();
1719 for r in results {
1720 match r {
1721 GetResult::Raw { key, bytes } => {
1722 by_pk.insert(key, bytes);
1723 }
1724 _ => return Err(Error::NotFound),
1725 }
1726 }
1727 let mut data_parts = Vec::with_capacity(j - i);
1728 let mut rid_parts = Vec::with_capacity(j - i);
1729 for k in i..j {
1730 let db = by_pk.get(&metas[k].chunk_pk).ok_or(Error::NotFound)?;
1731 data_parts.push(deserialize_array(db.clone())?);
1732 let rb = by_pk.get(&metas_rid[k].chunk_pk).ok_or(Error::NotFound)?;
1733 rid_parts.push(deserialize_array(rb.clone())?);
1734 }
1735 let merged_data = concat_many(data_parts.iter().collect())?;
1736 let merged_rid_any = concat_many(rid_parts.iter().collect())?;
1737
1738 let slices = split_to_target_bytes(
1740 &merged_data,
1741 TARGET_CHUNK_BYTES,
1742 self.cfg.varwidth_fallback_rows_per_slice,
1743 );
1744 let mut rid_off = 0usize;
1745 let need_perms = metas[i..j].iter().any(|m| m.value_order_perm_pk != 0);
1746
1747 for s in slices {
1748 let rows = s.len();
1749 let rid_ref: ArrayRef = merged_rid_any.slice(rid_off, rows);
1751 let rid_norm = zero_offset(&rid_ref);
1752 let rid_pk = self.pager.alloc_many(1)?[0];
1753 let rid_bytes = serialize_array(rid_norm.as_ref())?;
1754
1755 let data_pk = self.pager.alloc_many(1)?[0];
1756 let s_norm = zero_offset(&s);
1757 let data_bytes = serialize_array(s_norm.as_ref())?;
1758 puts.push(BatchPut::Raw {
1759 key: data_pk,
1760 bytes: data_bytes,
1761 });
1762 puts.push(BatchPut::Raw {
1763 key: rid_pk,
1764 bytes: rid_bytes,
1765 });
1766 let mut meta = ChunkMetadata {
1767 chunk_pk: data_pk,
1768 row_count: rows as u64,
1769 serialized_bytes: s_norm.get_array_memory_size() as u64,
1770 max_val_u64: u64::MAX,
1771 ..Default::default()
1772 };
1773 if need_perms {
1775 let sort_col = SortColumn {
1776 values: s.clone(),
1777 options: None,
1778 };
1779 let idx = lexsort_to_indices(&[sort_col], None)?;
1780 let perm_bytes = serialize_array(&idx)?;
1781 let perm_pk = self.pager.alloc_many(1)?[0];
1782 puts.push(BatchPut::Raw {
1783 key: perm_pk,
1784 bytes: perm_bytes,
1785 });
1786 meta.value_order_perm_pk = perm_pk;
1787 }
1788
1789 let rid_any = rid_norm.clone();
1791 let rids = rid_any
1792 .as_any()
1793 .downcast_ref::<UInt64Array>()
1794 .ok_or_else(|| Error::Internal("rid downcast".into()))?;
1795 let mut min = u64::MAX;
1796 let mut max = 0u64;
1797 let mut sorted_rids = true;
1798 let mut last_v = 0u64;
1799 for ii in 0..rids.len() {
1800 let v = rids.value(ii);
1801 if ii == 0 {
1802 last_v = v;
1803 } else if v < last_v {
1804 sorted_rids = false;
1805 } else {
1806 last_v = v;
1807 }
1808 if v < min {
1809 min = v;
1810 }
1811 if v > max {
1812 max = v;
1813 }
1814 }
1815 let mut rid_perm_pk = 0u64;
1816 if !sorted_rids {
1817 let rid_sort_col = SortColumn {
1818 values: rid_any,
1819 options: None,
1820 };
1821 let rid_idx = lexsort_to_indices(&[rid_sort_col], None)?;
1822 let rid_perm_bytes = serialize_array(&rid_idx)?;
1823 rid_perm_pk = self.pager.alloc_many(1)?[0];
1824 puts.push(BatchPut::Raw {
1825 key: rid_perm_pk,
1826 bytes: rid_perm_bytes,
1827 });
1828 }
1829 let rid_meta = ChunkMetadata {
1830 chunk_pk: rid_pk,
1831 value_order_perm_pk: rid_perm_pk,
1832 row_count: rows as u64,
1833 serialized_bytes: rid_norm.get_array_memory_size() as u64,
1834 min_val_u64: if rows > 0 { min } else { 0 },
1835 max_val_u64: if rows > 0 { max } else { 0 },
1836 };
1837 new_metas.push(meta);
1838 new_rid_metas.push(rid_meta);
1839 rid_off += rows;
1840 }
1841
1842 for k in i..j {
1844 frees.push(metas[k].chunk_pk);
1845 if metas[k].value_order_perm_pk != 0 {
1846 frees.push(metas[k].value_order_perm_pk);
1847 }
1848 frees.push(metas_rid[k].chunk_pk);
1849 if metas_rid[k].value_order_perm_pk != 0 {
1850 frees.push(metas_rid[k].value_order_perm_pk);
1851 }
1852 }
1853
1854 i = j;
1855 }
1856
1857 if new_metas.is_empty() {
1859 self.write_descriptor_chain(desc_pk, &mut desc, &[], &mut puts, &mut frees)?;
1861 self.write_descriptor_chain(desc_pk_rid, &mut desc_rid, &[], &mut puts, &mut frees)?;
1862 catalog.map.remove(&field_id);
1863 catalog.map.remove(&rid_fid);
1864 puts.push(BatchPut::Raw {
1865 key: CATALOG_ROOT_PKEY,
1866 bytes: catalog.to_bytes(),
1867 });
1868 if !puts.is_empty() {
1869 self.pager.batch_put(&puts)?;
1870 }
1871 if !frees.is_empty() {
1872 self.pager.free_many(&frees)?;
1873 }
1874 return Ok(());
1875 }
1876
1877 self.write_descriptor_chain(desc_pk, &mut desc, &new_metas, &mut puts, &mut frees)?;
1879 self.write_descriptor_chain(
1880 desc_pk_rid,
1881 &mut desc_rid,
1882 &new_rid_metas,
1883 &mut puts,
1884 &mut frees,
1885 )?;
1886 if !puts.is_empty() {
1888 self.pager.batch_put(&puts)?;
1889 }
1890 if !frees.is_empty() {
1891 self.pager.free_many(&frees)?;
1892 }
1893
1894 Ok(())
1895 }
1896
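/// Appends one chunk's metadata to the descriptor's in-memory tail page,
/// starting a new tail page when the current one is full, and bumps the
/// descriptor's row and chunk totals.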
1897 fn append_meta_in_loop(
1900 &self,
1901 descriptor: &mut ColumnDescriptor,
1902 tail_page_bytes: &mut Vec<u8>,
1903 meta: ChunkMetadata,
1904 puts: &mut Vec<BatchPut>,
1905 ) -> Result<()> {
1906 let mut header = DescriptorPageHeader::from_le_bytes(
1907 &tail_page_bytes[..DescriptorPageHeader::DISK_SIZE],
1908 );
1909 if tail_page_bytes.len() + ChunkMetadata::DISK_SIZE <= TARGET_DESCRIPTOR_PAGE_BYTES
1910 && (header.entry_count as usize) < DESCRIPTOR_ENTRIES_PER_PAGE
1911 {
1912 tail_page_bytes.extend_from_slice(&meta.to_le_bytes());
1914 header.entry_count += 1;
1915 tail_page_bytes[..DescriptorPageHeader::DISK_SIZE]
1916 .copy_from_slice(&header.to_le_bytes());
1917 } else {
1918 let new_tail_pk = self.pager.alloc_many(1)?[0];
1920 header.next_page_pk = new_tail_pk;
1921 tail_page_bytes[..DescriptorPageHeader::DISK_SIZE]
1922 .copy_from_slice(&header.to_le_bytes());
1923 let full_page_to_write = std::mem::take(tail_page_bytes);
1926 puts.push(BatchPut::Raw {
1927 key: descriptor.tail_page_pk,
1928 bytes: full_page_to_write,
1929 });
1930 let new_header = DescriptorPageHeader {
1932 next_page_pk: 0,
1933 entry_count: 1,
1934 _padding: [0; 4],
1935 };
1936 let mut new_page_bytes = new_header.to_le_bytes().to_vec();
1937 new_page_bytes.extend_from_slice(&meta.to_le_bytes());
1938 descriptor.tail_page_pk = new_tail_pk;
1940 *tail_page_bytes = new_page_bytes;
1941 }
1942
1943 descriptor.total_row_count += meta.row_count;
1944 descriptor.total_chunk_count += 1;
1945 Ok(())
1946 }
1947
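/// Walks every descriptor chain and verifies that the field id, total row
/// count, and total chunk count stored in each descriptor match the chunk
/// metadata actually present in its pages.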
1948 pub fn verify_integrity(&self) -> Result<()> {
1958 let catalog = self.catalog.read().unwrap();
1959 for (&field_id, &descriptor_pk) in &catalog.map {
1960 let desc_blob = self
1961 .pager
1962 .batch_get(&[BatchGet::Raw { key: descriptor_pk }])?
1963 .pop()
1964 .and_then(|r| match r {
1965 GetResult::Raw { bytes, .. } => Some(bytes),
1966 _ => None,
1967 })
1968 .ok_or_else(|| {
1969 Error::Internal(format!(
1970 "Catalog points to missing descriptor pk={}",
1971 descriptor_pk
1972 ))
1973 })?;
1974 let descriptor = ColumnDescriptor::from_le_bytes(desc_blob.as_ref());
1975 if descriptor.field_id != field_id {
1976 return Err(Error::Internal(format!(
1977 "Descriptor at pk={} has wrong field_id: expected {:?}, \
1978 got {:?}",
1979 descriptor_pk, field_id, descriptor.field_id
1980 )));
1981 }
1982
1983 let mut actual_rows = 0;
1984 let mut actual_chunks = 0;
1985 let mut current_page_pk = descriptor.head_page_pk;
1986 while current_page_pk != 0 {
1987 let page_blob = self
1988 .pager
1989 .batch_get(&[BatchGet::Raw {
1990 key: current_page_pk,
1991 }])?
1992 .pop()
1993 .and_then(|r| match r {
1994 GetResult::Raw { bytes, .. } => Some(bytes),
1995 _ => None,
1996 })
1997 .ok_or_else(|| {
1998 Error::Internal(format!(
1999 "Descriptor page chain broken at pk={}",
2000 current_page_pk
2001 ))
2002 })?;
2003 let header = DescriptorPageHeader::from_le_bytes(
2004 &page_blob.as_ref()[..DescriptorPageHeader::DISK_SIZE],
2005 );
2006 for i in 0..(header.entry_count as usize) {
2007 let off = DescriptorPageHeader::DISK_SIZE + i * ChunkMetadata::DISK_SIZE;
2008 let end = off + ChunkMetadata::DISK_SIZE;
2009 let meta = ChunkMetadata::from_le_bytes(&page_blob.as_ref()[off..end]);
2010 actual_rows += meta.row_count;
2011 actual_chunks += 1;
2012 }
2013 current_page_pk = header.next_page_pk;
2014 }
2015
2016 if descriptor.total_row_count != actual_rows {
2017 return Err(Error::Internal(format!(
2018 "Row count mismatch for field {:?}: descriptor says {}, \
2019 actual is {}",
2020 field_id, descriptor.total_row_count, actual_rows
2021 )));
2022 }
2023 if descriptor.total_chunk_count != actual_chunks {
2024 return Err(Error::Internal(format!(
2025 "Chunk count mismatch for field {:?}: descriptor says {}, \
2026 actual is {}",
2027 field_id, descriptor.total_chunk_count, actual_chunks
2028 )));
2029 }
2030 }
2031 Ok(())
2032 }
2033
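/// Collects per-column layout statistics: total rows, total chunks, and the
/// entry count and byte size of each descriptor page.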
2034 pub fn get_layout_stats(&self) -> Result<Vec<ColumnLayoutStats>> {
2039 let catalog = self.catalog.read().unwrap();
2040 let mut all_stats = Vec::new();
2041
2042 for (&field_id, &descriptor_pk) in &catalog.map {
2043 let desc_blob = self
2044 .pager
2045 .batch_get(&[BatchGet::Raw { key: descriptor_pk }])?
2046 .pop()
2047 .and_then(|r| match r {
2048 GetResult::Raw { bytes, .. } => Some(bytes),
2049 _ => None,
2050 })
2051 .ok_or(Error::NotFound)?;
2052 let descriptor = ColumnDescriptor::from_le_bytes(desc_blob.as_ref());
2053
2054 let mut page_stats = Vec::new();
2055 let mut current_page_pk = descriptor.head_page_pk;
2056 while current_page_pk != 0 {
2057 let page_blob = self
2058 .pager
2059 .batch_get(&[BatchGet::Raw {
2060 key: current_page_pk,
2061 }])?
2062 .pop()
2063 .and_then(|r| match r {
2064 GetResult::Raw { bytes, .. } => Some(bytes),
2065 _ => None,
2066 })
2067 .ok_or(Error::NotFound)?;
2068 let header = DescriptorPageHeader::from_le_bytes(
2069 &page_blob.as_ref()[..DescriptorPageHeader::DISK_SIZE],
2070 );
2071 page_stats.push(DescriptorPageStats {
2072 page_pk: current_page_pk,
2073 entry_count: header.entry_count,
2074 page_size_bytes: page_blob.as_ref().len(),
2075 });
2076 current_page_pk = header.next_page_pk;
2077 }
2078
2079 all_stats.push(ColumnLayoutStats {
2080 field_id,
2081 total_rows: descriptor.total_row_count,
2082 total_chunks: descriptor.total_chunk_count,
2083 pages: page_stats,
2084 });
2085 }
2086 Ok(all_stats)
2087 }
2088}