use super::*;
use crate::store::catalog::ColumnCatalog;
use crate::store::descriptor::{
    ChunkMetadata, ColumnDescriptor, DescriptorIterator, DescriptorPageHeader,
};
use crate::store::scan::filter::FilterDispatch;
use crate::store::scan::{FilterPrimitive, FilterResult};
use crate::types::{LogicalFieldId, RowId, TableId};
use arrow::array::{Array, ArrayRef, BooleanArray, UInt32Array, UInt64Array};
use arrow::compute::{self, SortColumn, lexsort_to_indices};
use arrow::datatypes::DataType;
use arrow::record_batch::RecordBatch;
use llkv_expr::typed_predicate::Predicate;
use llkv_result::{Error, Result};
use llkv_storage::{
    constants::CATALOG_ROOT_PKEY,
    pager::{BatchGet, BatchPut, GetResult, Pager},
    serialization::{deserialize_array, serialize_array},
    types::PhysicalKey,
};

use rustc_hash::{FxHashMap, FxHashSet};
use simd_r_drive_entry_handle::EntryHandle;
use std::sync::{Arc, RwLock};

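/// Column-oriented store built on top of a [`Pager`].
///
/// Holds the persistent column catalog, a cache of per-column Arrow data
/// types, and the index manager that maintains per-column index state.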
pub struct ColumnStore<P: Pager> {
    pub(crate) pager: Arc<P>,
    pub(crate) catalog: Arc<RwLock<ColumnCatalog>>,
    cfg: ColumnStoreConfig,
    dtype_cache: DTypeCache<P>,
    index_manager: IndexManager<P>,
}

impl<P> Clone for ColumnStore<P>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    fn clone(&self) -> Self {
        Self {
            pager: Arc::clone(&self.pager),
            catalog: Arc::clone(&self.catalog),
            cfg: self.cfg.clone(),
            dtype_cache: self.dtype_cache.clone(),
            index_manager: self.index_manager.clone(),
        }
    }
}

impl<P> ColumnStore<P>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
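    /// Opens a store over `pager`, loading the catalog persisted at
    /// `CATALOG_ROOT_PKEY` if present and starting from an empty catalog otherwise.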
    pub fn open(pager: Arc<P>) -> Result<Self> {
        let cfg = ColumnStoreConfig::default();
        let catalog = match pager
            .batch_get(&[BatchGet::Raw {
                key: CATALOG_ROOT_PKEY,
            }])?
            .pop()
        {
            Some(GetResult::Raw { bytes, .. }) => ColumnCatalog::from_bytes(bytes.as_ref())?,
            _ => ColumnCatalog::default(),
        };
        let arc_catalog = Arc::new(RwLock::new(catalog));

        let index_manager = IndexManager::new(Arc::clone(&pager));

        Ok(Self {
            pager: Arc::clone(&pager),
            catalog: Arc::clone(&arc_catalog),
            cfg,
            dtype_cache: DTypeCache::new(Arc::clone(&pager), Arc::clone(&arc_catalog)),
            index_manager,
        })
    }

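    /// Registers an index of the given `kind` for `field_id` via the index manager.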
    pub fn register_index(&self, field_id: LogicalFieldId, kind: IndexKind) -> Result<()> {
        self.index_manager.register_index(self, field_id, kind)
    }

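    /// Returns `true` if `field_id` is registered in the column catalog.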
    pub fn has_field(&self, field_id: LogicalFieldId) -> bool {
        let catalog = self.catalog.read().unwrap();
        catalog.map.contains_key(&field_id)
    }

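    /// Removes a previously registered index of the given `kind` for `field_id`.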
    pub fn unregister_index(&self, field_id: LogicalFieldId, kind: IndexKind) -> Result<()> {
        self.index_manager.unregister_index(self, field_id, kind)
    }

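    /// Returns the Arrow [`DataType`] for `field_id`, preferring the in-memory
    /// cache and falling back to the persisted descriptor.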
    pub fn data_type(&self, field_id: LogicalFieldId) -> Result<DataType> {
        if let Some(dt) = self.dtype_cache.cached_data_type(field_id) {
            return Ok(dt);
        }
        self.dtype_cache.dtype_for_field(field_id)
    }

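    /// Rewrites the data-type fingerprint stored in the column's descriptor and
    /// refreshes the cached [`DataType`] for `field_id`.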
    pub fn update_data_type(
        &self,
        field_id: LogicalFieldId,
        new_data_type: &DataType,
    ) -> Result<()> {
        let descriptor_pk = {
            let catalog = self.catalog.read().unwrap();
            *catalog.map.get(&field_id).ok_or_else(|| Error::NotFound)?
        };

        let mut descriptor = match self
            .pager
            .batch_get(&[BatchGet::Raw { key: descriptor_pk }])?
            .pop()
        {
            Some(GetResult::Raw { bytes, .. }) => ColumnDescriptor::from_le_bytes(bytes.as_ref()),
            _ => return Err(Error::NotFound),
        };

        let new_fingerprint = DTypeCache::<P>::dtype_fingerprint(new_data_type);
        if new_fingerprint != 0 {
            DTypeCache::<P>::set_desc_dtype_fingerprint(&mut descriptor, new_fingerprint);
        }

        self.pager.batch_put(&[BatchPut::Raw {
            key: descriptor_pk,
            bytes: descriptor.to_le_bytes(),
        }])?;

        self.dtype_cache.insert(field_id, new_data_type.clone());

        Ok(())
    }

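    /// Makes sure `field_id` and its shadow row-id column both have catalog
    /// entries and on-disk descriptors, creating empty ones on first use.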
    pub fn ensure_column_registered(
        &self,
        field_id: LogicalFieldId,
        data_type: &DataType,
    ) -> Result<()> {
        let rid_field_id = rowid_fid(field_id);

        let mut catalog_dirty = false;
        let descriptor_pk;
        let rid_descriptor_pk;

        {
            let mut catalog = self.catalog.write().unwrap();
            descriptor_pk = if let Some(&pk) = catalog.map.get(&field_id) {
                pk
            } else {
                let pk = self.pager.alloc_many(1)?[0];
                catalog.map.insert(field_id, pk);
                catalog_dirty = true;
                pk
            };

            rid_descriptor_pk = if let Some(&pk) = catalog.map.get(&rid_field_id) {
                pk
            } else {
                let pk = self.pager.alloc_many(1)?[0];
                catalog.map.insert(rid_field_id, pk);
                catalog_dirty = true;
                pk
            };
        }

        let mut puts: Vec<BatchPut> = Vec::new();

        let data_descriptor_missing = self
            .pager
            .batch_get(&[BatchGet::Raw { key: descriptor_pk }])?
            .pop()
            .and_then(|r| match r {
                GetResult::Raw { bytes, .. } => Some(bytes),
                _ => None,
            })
            .is_none();

        if data_descriptor_missing {
            let (mut descriptor, tail_page) =
                ColumnDescriptor::load_or_create(Arc::clone(&self.pager), descriptor_pk, field_id)?;
            let fingerprint = DTypeCache::<P>::dtype_fingerprint(data_type);
            if fingerprint != 0 {
                DTypeCache::<P>::set_desc_dtype_fingerprint(&mut descriptor, fingerprint);
            }
            puts.push(BatchPut::Raw {
                key: descriptor.tail_page_pk,
                bytes: tail_page,
            });
            puts.push(BatchPut::Raw {
                key: descriptor_pk,
                bytes: descriptor.to_le_bytes(),
            });
        }

        let rid_descriptor_missing = self
            .pager
            .batch_get(&[BatchGet::Raw {
                key: rid_descriptor_pk,
            }])?
            .pop()
            .and_then(|r| match r {
                GetResult::Raw { bytes, .. } => Some(bytes),
                _ => None,
            })
            .is_none();

        if rid_descriptor_missing {
            let (mut rid_descriptor, tail_page) = ColumnDescriptor::load_or_create(
                Arc::clone(&self.pager),
                rid_descriptor_pk,
                rid_field_id,
            )?;
            let fingerprint = DTypeCache::<P>::dtype_fingerprint(&DataType::UInt64);
            if fingerprint != 0 {
                DTypeCache::<P>::set_desc_dtype_fingerprint(&mut rid_descriptor, fingerprint);
            }
            puts.push(BatchPut::Raw {
                key: rid_descriptor.tail_page_pk,
                bytes: tail_page,
            });
            puts.push(BatchPut::Raw {
                key: rid_descriptor_pk,
                bytes: rid_descriptor.to_le_bytes(),
            });
        }

        self.dtype_cache.insert(field_id, data_type.clone());

        if catalog_dirty {
            let catalog_bytes = {
                let catalog = self.catalog.read().unwrap();
                catalog.to_bytes()
            };
            puts.push(BatchPut::Raw {
                key: CATALOG_ROOT_PKEY,
                bytes: catalog_bytes,
            });
        }

        if !puts.is_empty() {
            self.pager.batch_put(&puts)?;
        }

        Ok(())
    }

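    /// Evaluates `predicate` over the column and returns the matching row ids.
    /// The actual scan is delegated to the [`FilterDispatch`] implementation for `T`.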
    pub fn filter_row_ids<T>(
        &self,
        field_id: LogicalFieldId,
        predicate: &Predicate<T::Value>,
    ) -> Result<Vec<u64>>
    where
        T: FilterDispatch,
    {
        tracing::trace!(field=?field_id, "filter_row_ids start");
        let res = T::run_filter(self, field_id, predicate);
        if let Err(ref err) = res {
            tracing::trace!(field=?field_id, error=?err, "filter_row_ids error");
        } else {
            tracing::trace!(field=?field_id, "filter_row_ids ok");
        }
        res
    }

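    /// Runs an arbitrary predicate over a primitive column and returns the
    /// matches as a [`FilterResult`].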
    pub fn filter_matches<T, F>(
        &self,
        field_id: LogicalFieldId,
        predicate: F,
    ) -> Result<FilterResult>
    where
        T: FilterPrimitive,
        F: FnMut(T::Native) -> bool,
    {
        T::run_filter_with_result(self, field_id, predicate)
    }

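    /// Lists the index kinds recorded in the persisted descriptor for `field_id`.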
    pub fn list_persisted_indexes(&self, field_id: LogicalFieldId) -> Result<Vec<IndexKind>> {
        let catalog = self.catalog.read().unwrap();
        let descriptor_pk = *catalog.map.get(&field_id).ok_or(Error::NotFound)?;

        let desc_blob = self
            .pager
            .batch_get(&[BatchGet::Raw { key: descriptor_pk }])?
            .pop()
            .and_then(|r| match r {
                GetResult::Raw { bytes, .. } => Some(bytes),
                _ => None,
            })
            .ok_or(Error::NotFound)?;
        let descriptor = ColumnDescriptor::from_le_bytes(desc_blob.as_ref());

        let kinds = descriptor.get_indexes()?;
        Ok(kinds)
    }

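    /// Returns the total row count recorded in the descriptor for `field_id`.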
    pub fn total_rows_for_field(&self, field_id: LogicalFieldId) -> Result<u64> {
        let catalog = self.catalog.read().unwrap();
        let desc_pk = *catalog.map.get(&field_id).ok_or(Error::NotFound)?;
        drop(catalog);

        let desc_blob = self
            .pager
            .batch_get(&[BatchGet::Raw { key: desc_pk }])?
            .pop()
            .and_then(|r| match r {
                GetResult::Raw { bytes, .. } => Some(bytes),
                _ => None,
            })
            .ok_or(Error::NotFound)?;

        let desc = ColumnDescriptor::from_le_bytes(desc_blob.as_ref());
        Ok(desc.total_row_count)
    }

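    /// Returns the row count for `table_id`, computed as the maximum row count
    /// over the table's user-data columns (individual columns may hold fewer rows).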
    pub fn total_rows_for_table(&self, table_id: crate::types::TableId) -> Result<u64> {
        use crate::types::Namespace;
        let catalog = self.catalog.read().unwrap();
        let candidates: Vec<LogicalFieldId> = catalog
            .map
            .keys()
            .filter(|fid| fid.namespace() == Namespace::UserData && fid.table_id() == table_id)
            .copied()
            .collect();
        drop(catalog);

        if candidates.is_empty() {
            return Ok(0);
        }

        let mut max_rows: u64 = 0;
        for field in candidates {
            let rows = self.total_rows_for_field(field)?;
            if rows > max_rows {
                max_rows = rows;
            }
        }
        Ok(max_rows)
    }

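    /// Returns all user-data field ids registered for `table_id`.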
    pub fn user_field_ids_for_table(&self, table_id: crate::types::TableId) -> Vec<LogicalFieldId> {
        use crate::types::Namespace;

        let catalog = self.catalog.read().unwrap();
        catalog
            .map
            .keys()
            .filter(|fid| fid.namespace() == Namespace::UserData && fid.table_id() == table_id)
            .copied()
            .collect()
    }

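    /// Reports whether `row_id` exists for `field_id` by probing the shadow
    /// row-id column: chunks are pruned via their min/max metadata, then
    /// binary-searched (through the value-order permutation when one exists).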
    pub fn has_row_id(&self, field_id: LogicalFieldId, row_id: RowId) -> Result<bool> {
        let rid_fid = rowid_fid(field_id);
        let catalog = self.catalog.read().unwrap();
        let rid_desc_pk = *catalog.map.get(&rid_fid).ok_or(Error::NotFound)?;
        let rid_desc_blob = self
            .pager
            .batch_get(&[BatchGet::Raw { key: rid_desc_pk }])?
            .pop()
            .and_then(|r| match r {
                GetResult::Raw { bytes, .. } => Some(bytes),
                _ => None,
            })
            .ok_or(Error::NotFound)?;
        let rid_desc = ColumnDescriptor::from_le_bytes(rid_desc_blob.as_ref());
        drop(catalog);

        for m in DescriptorIterator::new(self.pager.as_ref(), rid_desc.head_page_pk) {
            let meta = m?;
            if meta.row_count == 0 {
                continue;
            }
            // Skip chunks whose recorded row-id bounds exclude `row_id`. Bounds of
            // (0, 0) mean "not recorded", so they never prune.
            if (meta.min_val_u64 != 0 || meta.max_val_u64 != 0)
                && (row_id < meta.min_val_u64 || row_id > meta.max_val_u64)
            {
                continue;
            }
            let mut gets = vec![BatchGet::Raw { key: meta.chunk_pk }];
            if meta.value_order_perm_pk != 0 {
                gets.push(BatchGet::Raw {
                    key: meta.value_order_perm_pk,
                });
            }
            let results = self.pager.batch_get(&gets)?;
            let mut rid_blob: Option<EntryHandle> = None;
            let mut perm_blob: Option<EntryHandle> = None;
            for r in results {
                if let GetResult::Raw { key, bytes } = r {
                    if key == meta.chunk_pk {
                        rid_blob = Some(bytes);
                    } else if key == meta.value_order_perm_pk {
                        perm_blob = Some(bytes);
                    }
                }
            }
            let Some(rid_blob) = rid_blob else { continue };
            let rid_any = deserialize_array(rid_blob)?;
            let rids = rid_any
                .as_any()
                .downcast_ref::<UInt64Array>()
                .ok_or_else(|| Error::Internal("rid downcast".into()))?;
            if let Some(pblob) = perm_blob {
                let perm_any = deserialize_array(pblob)?;
                let perm = perm_any
                    .as_any()
                    .downcast_ref::<UInt32Array>()
                    .ok_or_else(|| Error::Internal("perm not u32".into()))?;
                // Binary search through the stored value-order permutation.
                let mut lo: isize = 0;
                let mut hi: isize = (perm.len() as isize) - 1;
                while lo <= hi {
                    let mid = ((lo + hi) >> 1) as usize;
                    let rid = rids.value(perm.value(mid) as usize);
                    if rid == row_id {
                        return Ok(true);
                    } else if rid < row_id {
                        lo = mid as isize + 1;
                    } else {
                        hi = mid as isize - 1;
                    }
                }
            } else {
                // Without a permutation the row-id chunk is already ascending.
                let mut lo: isize = 0;
                let mut hi: isize = (rids.len() as isize) - 1;
                while lo <= hi {
                    let mid = ((lo + hi) >> 1) as usize;
                    let rid = rids.value(mid);
                    if rid == row_id {
                        return Ok(true);
                    } else if rid < row_id {
                        lo = mid as isize + 1;
                    } else {
                        hi = mid as isize - 1;
                    }
                }
            }
        }
        Ok(false)
    }

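    /// Appends a [`RecordBatch`] with last-writer-wins (LWW) semantics.
    ///
    /// The batch must contain a `ROW_ID_COLUMN_NAME` column of type `UInt64`;
    /// every other column must carry a `FIELD_ID_META_KEY` entry in its field
    /// metadata. Incoming row ids that already exist are rewritten in place (a
    /// null value deletes that row for the column); the remaining rows are
    /// appended as new chunks, with per-chunk index updates staged through the
    /// index manager.
    ///
    /// A minimal usage sketch (illustrative only; the `"score"` column, the
    /// field id string `"42"`, and the `store` binding are assumptions, not
    /// part of this API):
    ///
    /// ```rust,ignore
    /// use std::{collections::HashMap, sync::Arc};
    /// use arrow::array::{ArrayRef, UInt64Array};
    /// use arrow::datatypes::{DataType, Field, Schema};
    /// use arrow::record_batch::RecordBatch;
    ///
    /// let row_ids: ArrayRef = Arc::new(UInt64Array::from(vec![0u64, 1, 2]));
    /// let values: ArrayRef = Arc::new(UInt64Array::from(vec![10u64, 20, 30]));
    /// let mut meta = HashMap::new();
    /// meta.insert(FIELD_ID_META_KEY.to_string(), "42".to_string());
    /// let schema = Arc::new(Schema::new(vec![
    ///     Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
    ///     Field::new("score", DataType::UInt64, true).with_metadata(meta),
    /// ]));
    /// let batch = RecordBatch::try_new(schema, vec![row_ids, values])?;
    /// store.append(&batch)?;
    /// ```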
    #[allow(unused_variables, unused_assignments)]
    pub fn append(&self, batch: &RecordBatch) -> Result<()> {
        tracing::trace!(
            num_columns = batch.num_columns(),
            num_rows = batch.num_rows(),
            "ColumnStore::append BEGIN"
        );
        let working_batch: RecordBatch;
        let batch_ref = {
            let schema = batch.schema();
            let row_id_idx = schema
                .index_of(ROW_ID_COLUMN_NAME)
                .map_err(|_| Error::Internal("row_id column required".into()))?;
            let row_id_any = batch.column(row_id_idx).clone();
            let row_id_arr = row_id_any
                .as_any()
                .downcast_ref::<UInt64Array>()
                .ok_or_else(|| Error::Internal("row_id downcast failed".into()))?;

            let mut is_sorted = true;
            if !row_id_arr.is_empty() {
                let mut last = row_id_arr.value(0);
                for i in 1..row_id_arr.len() {
                    let current = row_id_arr.value(i);
                    if current < last {
                        is_sorted = false;
                        break;
                    }
                    last = current;
                }
            }

            if is_sorted {
                batch
            } else {
                let sort_col = SortColumn {
                    values: row_id_any,
                    options: None,
                };
                let idx = lexsort_to_indices(&[sort_col], None)?;
                let perm = idx
                    .as_any()
                    .downcast_ref::<UInt32Array>()
                    .ok_or_else(|| Error::Internal("perm not u32".into()))?;
                let mut cols: Vec<ArrayRef> = Vec::with_capacity(batch.num_columns());
                for i in 0..batch.num_columns() {
                    cols.push(compute::take(batch.column(i), perm, None)?);
                }
                working_batch = RecordBatch::try_new(schema.clone(), cols)
                    .map_err(|e| Error::Internal(format!("record batch rebuild: {e}")))?;
                &working_batch
            }
        };

        tracing::trace!("ColumnStore::append PHASE 1 complete - batch preprocessed");

        let schema = batch_ref.schema();
        let row_id_idx = schema
            .index_of(ROW_ID_COLUMN_NAME)
            .map_err(|_| Error::Internal("row_id column required".into()))?;

        let row_id_arr = batch_ref
            .column(row_id_idx)
            .as_any()
            .downcast_ref::<UInt64Array>()
            .ok_or_else(|| Error::Internal("row_id downcast failed".into()))?;
        let mut incoming_ids_map = FxHashMap::default();
        incoming_ids_map.reserve(row_id_arr.len());
        for i in 0..row_id_arr.len() {
            incoming_ids_map.insert(row_id_arr.value(i), i);
        }

        let mut catalog_dirty = false;
        let mut puts_rewrites: Vec<BatchPut> = Vec::new();
        let mut all_rewritten_ids = FxHashSet::default();

        let mut catalog_lock = self.catalog.write().unwrap();
        for i in 0..batch_ref.num_columns() {
            if i == row_id_idx {
                continue;
            }
            let field = schema.field(i);
            if let Some(field_id_str) = field.metadata().get(crate::store::FIELD_ID_META_KEY) {
                let field_id = field_id_str
                    .parse::<u64>()
                    .map(LogicalFieldId::from)
                    .map_err(|e| Error::Internal(format!("Invalid field_id: {}", e)))?;

                let rewritten = self.lww_rewrite_for_field(
                    &mut catalog_lock,
                    field_id,
                    &incoming_ids_map,
                    batch_ref.column(i),
                    batch_ref.column(row_id_idx),
                    &mut puts_rewrites,
                )?;
                all_rewritten_ids.extend(rewritten);
            }
        }
        drop(catalog_lock);

        if !puts_rewrites.is_empty() {
            self.pager.batch_put(&puts_rewrites)?;
        }

        tracing::trace!("ColumnStore::append PHASE 2 complete - LWW rewrites done");

        let batch_to_append = if !all_rewritten_ids.is_empty() {
            let keep_mask: Vec<bool> = (0..row_id_arr.len())
                .map(|i| !all_rewritten_ids.contains(&row_id_arr.value(i)))
                .collect();
            let keep_array = BooleanArray::from(keep_mask);
            compute::filter_record_batch(batch_ref, &keep_array)?
        } else {
            batch_ref.clone()
        };

        if batch_to_append.num_rows() == 0 {
            tracing::trace!("ColumnStore::append early exit - no new rows to append");
            return Ok(());
        }

        tracing::trace!("ColumnStore::append PHASE 3 complete - filtered for new rows");

        let append_schema = batch_to_append.schema();
        let append_row_id_idx = append_schema.index_of(ROW_ID_COLUMN_NAME)?;
        let append_row_id_any: ArrayRef = Arc::clone(batch_to_append.column(append_row_id_idx));
        let mut puts_appends: Vec<BatchPut> = Vec::new();

        for (i, array) in batch_to_append.columns().iter().enumerate() {
            if i == append_row_id_idx {
                continue;
            }

            let field = append_schema.field(i);

            let field_id = field
                .metadata()
                .get(crate::store::FIELD_ID_META_KEY)
                .ok_or_else(|| Error::Internal("Missing field_id".into()))?
                .parse::<u64>()
                .map(LogicalFieldId::from)
                .map_err(|e| Error::Internal(format!("Invalid field_id: {}", e)))?;

            self.dtype_cache.insert(field_id, field.data_type().clone());

            let (array_clean, rids_clean) = if array.null_count() == 0 {
                (array.clone(), append_row_id_any.clone())
            } else {
                let keep =
                    BooleanArray::from_iter((0..array.len()).map(|j| Some(!array.is_null(j))));
                let a = compute::filter(array, &keep)?;
                let r = compute::filter(&append_row_id_any, &keep)?;
                (a, r)
            };

            if array_clean.is_empty() {
                continue;
            }

            let (descriptor_pk, rid_descriptor_pk, rid_fid) = {
                let mut catalog = self.catalog.write().unwrap();
                let pk1 = *catalog.map.entry(field_id).or_insert_with(|| {
                    catalog_dirty = true;
                    self.pager.alloc_many(1).unwrap()[0]
                });
                let r_fid = rowid_fid(field_id);
                let pk2 = *catalog.map.entry(r_fid).or_insert_with(|| {
                    catalog_dirty = true;
                    self.pager.alloc_many(1).unwrap()[0]
                });
                (pk1, pk2, r_fid)
            };

            let (mut data_descriptor, mut data_tail_page) =
                ColumnDescriptor::load_or_create(Arc::clone(&self.pager), descriptor_pk, field_id)
                    .map_err(|e| {
                        tracing::error!(
                            ?field_id,
                            descriptor_pk,
                            error = ?e,
                            "append: load_or_create failed for data descriptor"
                        );
                        e
                    })?;
            let (mut rid_descriptor, mut rid_tail_page) = ColumnDescriptor::load_or_create(
                Arc::clone(&self.pager),
                rid_descriptor_pk,
                rid_fid,
            )
            .map_err(|e| {
                tracing::error!(
                    ?rid_fid,
                    rid_descriptor_pk,
                    error = ?e,
                    "append: load_or_create failed for rid descriptor"
                );
                e
            })?;

            self.index_manager
                .stage_index_registration(&mut data_descriptor, IndexKind::Presence)?;

            let slices = split_to_target_bytes(
                &array_clean,
                TARGET_CHUNK_BYTES,
                self.cfg.varwidth_fallback_rows_per_slice,
            );
            let mut row_off = 0usize;

            for s in slices {
                let rows = s.len();
                let data_pk = self.pager.alloc_many(1)?[0];
                let s_norm = zero_offset(&s);
                let data_bytes = serialize_array(s_norm.as_ref())?;
                puts_appends.push(BatchPut::Raw {
                    key: data_pk,
                    bytes: data_bytes,
                });

                let rid_slice: ArrayRef = rids_clean.slice(row_off, rows);
                let rid_norm = zero_offset(&rid_slice);
                let rid_pk = self.pager.alloc_many(1)?[0];
                let rid_bytes = serialize_array(rid_norm.as_ref())?;
                puts_appends.push(BatchPut::Raw {
                    key: rid_pk,
                    bytes: rid_bytes,
                });

                let rids_for_meta = rid_norm.as_any().downcast_ref::<UInt64Array>().unwrap();
                let (min, max) = if !rids_for_meta.is_empty() {
                    let mut min_val = rids_for_meta.value(0);
                    let mut max_val = rids_for_meta.value(0);
                    for i in 1..rids_for_meta.len() {
                        let v = rids_for_meta.value(i);
                        if v < min_val {
                            min_val = v;
                        }
                        if v > max_val {
                            max_val = v;
                        }
                    }
                    (min_val, max_val)
                } else {
                    (0, 0)
                };

                let mut data_meta = ChunkMetadata {
                    chunk_pk: data_pk,
                    row_count: rows as u64,
                    serialized_bytes: s_norm.get_array_memory_size() as u64,
                    max_val_u64: u64::MAX,
                    ..Default::default()
                };
                let mut rid_meta = ChunkMetadata {
                    chunk_pk: rid_pk,
                    row_count: rows as u64,
                    serialized_bytes: rid_norm.get_array_memory_size() as u64,
                    min_val_u64: min,
                    max_val_u64: max,
                    ..Default::default()
                };

                self.index_manager.stage_updates_for_new_chunk(
                    field_id,
                    &data_descriptor,
                    &s_norm,
                    &rid_norm,
                    &mut data_meta,
                    &mut rid_meta,
                    &mut puts_appends,
                )?;

                self.append_meta_in_loop(
                    &mut data_descriptor,
                    &mut data_tail_page,
                    data_meta,
                    &mut puts_appends,
                )?;
                self.append_meta_in_loop(
                    &mut rid_descriptor,
                    &mut rid_tail_page,
                    rid_meta,
                    &mut puts_appends,
                )?;
                row_off += rows;
            }

            puts_appends.push(BatchPut::Raw {
                key: data_descriptor.tail_page_pk,
                bytes: data_tail_page,
            });
            puts_appends.push(BatchPut::Raw {
                key: descriptor_pk,
                bytes: data_descriptor.to_le_bytes(),
            });
            puts_appends.push(BatchPut::Raw {
                key: rid_descriptor.tail_page_pk,
                bytes: rid_tail_page,
            });
            puts_appends.push(BatchPut::Raw {
                key: rid_descriptor_pk,
                bytes: rid_descriptor.to_le_bytes(),
            });
        }

        if catalog_dirty {
            let catalog = self.catalog.read().unwrap();
            puts_appends.push(BatchPut::Raw {
                key: CATALOG_ROOT_PKEY,
                bytes: catalog.to_bytes(),
            });
        }

        if !puts_appends.is_empty() {
            self.pager.batch_put(&puts_appends)?;
        }
        tracing::trace!("ColumnStore::append END - success");
        Ok(())
    }

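    /// Applies last-writer-wins semantics for one column: existing chunks that
    /// contain incoming row ids are rewritten in place (upserts) or shrunk
    /// (null incoming values mean deletes), and the set of rewritten row ids is
    /// returned so the caller can exclude them from the fresh append.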
    fn lww_rewrite_for_field(
        &self,
        catalog: &mut ColumnCatalog,
        field_id: LogicalFieldId,
        incoming_ids_map: &FxHashMap<u64, usize>,
        incoming_data: &ArrayRef,
        incoming_row_ids: &ArrayRef,
        puts: &mut Vec<BatchPut>,
    ) -> Result<FxHashSet<u64>> {
        use crate::store::descriptor::DescriptorIterator;
        use crate::store::ingest::ChunkEdit;

        if incoming_ids_map.is_empty() {
            return Ok(FxHashSet::default());
        }
        let incoming_ids: FxHashSet<u64> = incoming_ids_map.keys().copied().collect();

        let desc_pk_data = match catalog.map.get(&field_id) {
            Some(pk) => *pk,
            None => return Ok(FxHashSet::default()),
        };
        let rid_fid = rowid_fid(field_id);
        let desc_pk_rid = match catalog.map.get(&rid_fid) {
            Some(pk) => *pk,
            None => return Ok(FxHashSet::default()),
        };

        let gets = vec![
            BatchGet::Raw { key: desc_pk_data },
            BatchGet::Raw { key: desc_pk_rid },
        ];
        let results = self.pager.batch_get(&gets)?;
        let mut blobs_by_pk = FxHashMap::default();
        for r in results {
            if let GetResult::Raw { key, bytes } = r {
                blobs_by_pk.insert(key, bytes);
            }
        }

        let desc_blob_data = blobs_by_pk.remove(&desc_pk_data).ok_or_else(|| {
            tracing::error!(
                ?field_id,
                desc_pk_data,
                "lww_rewrite: data descriptor blob not found in pager"
            );
            Error::NotFound
        })?;
        let mut descriptor_data = ColumnDescriptor::from_le_bytes(desc_blob_data.as_ref());

        let desc_blob_rid = blobs_by_pk.remove(&desc_pk_rid).ok_or_else(|| {
            tracing::error!(
                ?rid_fid,
                desc_pk_rid,
                "lww_rewrite: rid descriptor blob not found in pager"
            );
            Error::NotFound
        })?;
        let mut descriptor_rid = ColumnDescriptor::from_le_bytes(desc_blob_rid.as_ref());

        tracing::trace!(?field_id, "lww_rewrite: descriptors loaded successfully");

        let mut metas_data: Vec<ChunkMetadata> = Vec::new();
        let mut metas_rid: Vec<ChunkMetadata> = Vec::new();
        for m in DescriptorIterator::new(self.pager.as_ref(), descriptor_data.head_page_pk) {
            metas_data.push(m.map_err(|e| {
                tracing::error!(?field_id, error = ?e, "lww_rewrite: failed to iterate data descriptor");
                e
            })?);
        }
        for m in DescriptorIterator::new(self.pager.as_ref(), descriptor_rid.head_page_pk) {
            metas_rid.push(m.map_err(|e| {
                tracing::error!(?rid_fid, error = ?e, "lww_rewrite: failed to iterate rid descriptor");
                e
            })?);
        }

        tracing::trace!(
            ?field_id,
            data_chunks = metas_data.len(),
            rid_chunks = metas_rid.len(),
            "lww_rewrite: chunk metadata collected"
        );

        let rid_in = incoming_row_ids
            .as_any()
            .downcast_ref::<UInt64Array>()
            .ok_or_else(|| Error::Internal("row_id must be u64".into()))?;
        let mut ids_to_delete = FxHashSet::default();
        let mut ids_to_upsert = FxHashSet::default();
        for i in 0..rid_in.len() {
            let rid = rid_in.value(i);
            if incoming_data.is_null(i) {
                ids_to_delete.insert(rid);
            } else {
                ids_to_upsert.insert(rid);
            }
        }

        let mut rewritten_ids = FxHashSet::default();
        let mut hit_up: FxHashMap<usize, Vec<u64>> = FxHashMap::default();
        let mut hit_del: FxHashMap<usize, Vec<u64>> = FxHashMap::default();

        let n = metas_data.len().min(metas_rid.len());
        if n > 0 {
            let mut gets_rid = Vec::with_capacity(n);
            for rm in metas_rid.iter().take(n) {
                gets_rid.push(BatchGet::Raw { key: rm.chunk_pk });
            }
            let rid_results = self.pager.batch_get(&gets_rid)?;
            let mut rid_blobs: FxHashMap<PhysicalKey, EntryHandle> = FxHashMap::default();
            for r in rid_results {
                if let GetResult::Raw { key, bytes } = r {
                    rid_blobs.insert(key, bytes);
                }
            }

            for (i, meta_rid) in metas_rid.iter().enumerate().take(n) {
                if let Some(rid_blob) = rid_blobs.get(&meta_rid.chunk_pk) {
                    let rid_arr_any = deserialize_array(rid_blob.clone())?;
                    let rid_arr = rid_arr_any
                        .as_any()
                        .downcast_ref::<UInt64Array>()
                        .ok_or_else(|| Error::Internal("rid type mismatch".into()))?;
                    for j in 0..rid_arr.len() {
                        let rid = rid_arr.value(j);
                        if incoming_ids.contains(&rid) {
                            if ids_to_delete.contains(&rid) {
                                hit_del.entry(i).or_default().push(rid);
                            } else if ids_to_upsert.contains(&rid) {
                                hit_up.entry(i).or_default().push(rid);
                            }
                            rewritten_ids.insert(rid);
                        }
                    }
                }
            }
        }

        if hit_up.is_empty() && hit_del.is_empty() {
            return Ok(rewritten_ids);
        }

        let mut hit_set = FxHashSet::default();
        hit_set.extend(hit_up.keys().copied());
        hit_set.extend(hit_del.keys().copied());
        let hit_idxs: Vec<usize> = hit_set.into_iter().collect();

        let mut gets = Vec::with_capacity(hit_idxs.len() * 2);
        for &i in &hit_idxs {
            gets.push(BatchGet::Raw {
                key: metas_data[i].chunk_pk,
            });
            gets.push(BatchGet::Raw {
                key: metas_rid[i].chunk_pk,
            });
        }
        let results = self.pager.batch_get(&gets)?;
        let mut blob_map: FxHashMap<PhysicalKey, EntryHandle> = FxHashMap::default();
        for r in results {
            if let GetResult::Raw { key, bytes } = r {
                blob_map.insert(key, bytes);
            }
        }

        for i in hit_idxs {
            let old_data_arr =
                deserialize_array(blob_map.get(&metas_data[i].chunk_pk).unwrap().clone())?;
            let old_rid_arr_any =
                deserialize_array(blob_map.get(&metas_rid[i].chunk_pk).unwrap().clone())?;
            let old_rid_arr = old_rid_arr_any
                .as_any()
                .downcast_ref::<UInt64Array>()
                .unwrap();

            let up_vec = hit_up.remove(&i).unwrap_or_default();
            let del_vec = hit_del.remove(&i).unwrap_or_default();

            let edit = ChunkEdit::from_lww_upsert(
                old_rid_arr,
                &up_vec,
                &del_vec,
                incoming_data,
                incoming_row_ids,
                incoming_ids_map,
            )?;

            let (new_data_arr, new_rid_arr) =
                ChunkEdit::apply_edit_to_arrays(&old_data_arr, Some(&old_rid_arr_any), &edit)?;

            let data_bytes = serialize_array(&new_data_arr)?;
            puts.push(BatchPut::Raw {
                key: metas_data[i].chunk_pk,
                bytes: data_bytes,
            });
            metas_data[i].row_count = new_data_arr.len() as u64;
            metas_data[i].serialized_bytes = new_data_arr.get_array_memory_size() as u64;

            if let Some(rarr) = new_rid_arr {
                let rid_bytes = serialize_array(&rarr)?;
                puts.push(BatchPut::Raw {
                    key: metas_rid[i].chunk_pk,
                    bytes: rid_bytes,
                });
                metas_rid[i].row_count = rarr.len() as u64;
                metas_rid[i].serialized_bytes = rarr.get_array_memory_size() as u64;
            }

            if metas_data[i].value_order_perm_pk != 0 {
                let sort_col = SortColumn {
                    values: new_data_arr,
                    options: None,
                };
                let idx = lexsort_to_indices(&[sort_col], None)?;
                let perm_bytes = serialize_array(&idx)?;
                puts.push(BatchPut::Raw {
                    key: metas_data[i].value_order_perm_pk,
                    bytes: perm_bytes,
                });
            }
        }

        descriptor_data.rewrite_pages(
            Arc::clone(&self.pager),
            desc_pk_data,
            &mut metas_data,
            puts,
        )?;
        descriptor_rid.rewrite_pages(Arc::clone(&self.pager), desc_pk_rid, &mut metas_rid, puts)?;

        Ok(rewritten_ids)
    }

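    /// Stages chunk rewrites that drop the requested rows from one column,
    /// walking the chunk chain and accumulating the rewritten pages into
    /// `staged_puts`; `rows_to_delete` must be ascending and unique. Returns
    /// `true` if any chunk changed.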
    fn stage_delete_rows_for_field(
        &self,
        field_id: LogicalFieldId,
        rows_to_delete: &[RowId],
        staged_puts: &mut Vec<BatchPut>,
    ) -> Result<bool> {
        tracing::warn!(
            field_id = ?field_id,
            rows = rows_to_delete.len(),
            "delete_rows stage_delete_rows_for_field: start"
        );
        use crate::store::descriptor::DescriptorIterator;
        use crate::store::ingest::ChunkEdit;

        if rows_to_delete.is_empty() {
            return Ok(false);
        }

        let mut del_iter = rows_to_delete.iter().copied();
        let mut cur_del = del_iter.next();
        let mut last_seen: Option<u64> = cur_del;

        let catalog = self.catalog.read().unwrap();
        let desc_pk = match catalog.map.get(&field_id) {
            Some(pk) => *pk,
            None => {
                tracing::trace!(
                    field_id = ?field_id,
                    "delete_rows stage_delete_rows_for_field: data descriptor missing"
                );
                return Err(Error::NotFound);
            }
        };
        let rid_fid = rowid_fid(field_id);
        let desc_pk_rid = catalog.map.get(&rid_fid).copied();
        tracing::warn!(
            field_id = ?field_id,
            desc_pk,
            desc_pk_rid = ?desc_pk_rid,
            "delete_rows stage_delete_rows_for_field: descriptor keys"
        );

        let mut gets = vec![BatchGet::Raw { key: desc_pk }];
        if let Some(pk) = desc_pk_rid {
            gets.push(BatchGet::Raw { key: pk });
        }
        let results = match self.pager.batch_get(&gets) {
            Ok(res) => res,
            Err(err) => {
                tracing::trace!(
                    field_id = ?field_id,
                    error = ?err,
                    "delete_rows stage_delete_rows_for_field: descriptor batch_get failed"
                );
                return Err(err);
            }
        };
        let mut blobs_by_pk = FxHashMap::default();
        for res in results {
            if let GetResult::Raw { key, bytes } = res {
                blobs_by_pk.insert(key, bytes);
            }
        }

        tracing::warn!(
            field_id = ?field_id,
            desc_blob_found = blobs_by_pk.contains_key(&desc_pk),
            rid_blob_found = desc_pk_rid.map(|pk| blobs_by_pk.contains_key(&pk)),
            "delete_rows stage_delete_rows_for_field: descriptor fetch status"
        );

        let desc_blob = blobs_by_pk.remove(&desc_pk).ok_or_else(|| {
            tracing::trace!(
                field_id = ?field_id,
                desc_pk,
                "delete_rows stage_delete_rows_for_field: descriptor blob missing"
            );
            Error::Internal(format!(
                "descriptor pk={} missing during delete_rows for field {:?}",
                desc_pk, field_id
            ))
        })?;
        let mut descriptor = ColumnDescriptor::from_le_bytes(desc_blob.as_ref());

        let mut metas: Vec<ChunkMetadata> = Vec::new();
        for m in DescriptorIterator::new(self.pager.as_ref(), descriptor.head_page_pk) {
            metas.push(m?);
        }
        if metas.is_empty() {
            drop(catalog);
            return Ok(false);
        }

        let mut metas_rid: Vec<ChunkMetadata> = Vec::new();
        let mut descriptor_rid: Option<ColumnDescriptor> = None;
        tracing::warn!(
            field_id = ?field_id,
            metas_len = metas.len(),
            desc_pk_rid = ?desc_pk_rid,
            "delete_rows stage_delete_rows_for_field: data metas loaded"
        );
        if let Some(pk_rid) = desc_pk_rid
            && let Some(desc_blob_rid) = blobs_by_pk.remove(&pk_rid)
        {
            let d_rid = ColumnDescriptor::from_le_bytes(desc_blob_rid.as_ref());
            for m in DescriptorIterator::new(self.pager.as_ref(), d_rid.head_page_pk) {
                metas_rid.push(m?);
            }
            descriptor_rid = Some(d_rid);
        }

        tracing::warn!(
            field_id = ?field_id,
            metas_rid_len = metas_rid.len(),
            "delete_rows stage_delete_rows_for_field: rowid metas loaded"
        );

        let mut cum_rows = 0u64;
        let mut any_changed = false;

        for (i, meta) in metas.iter_mut().enumerate() {
            let start_u64 = cum_rows;
            let end_u64 = start_u64 + meta.row_count;

            while let Some(d) = cur_del {
                if d < start_u64
                    && let Some(prev) = last_seen
                {
                    if d < prev {
                        return Err(Error::Internal(
                            "rows_to_delete must be ascending/unique".into(),
                        ));
                    }

                    last_seen = Some(d);
                    cur_del = del_iter.next();
                } else {
                    break;
                }
            }

            let rows = meta.row_count as usize;
            let mut del_local: FxHashSet<usize> = FxHashSet::default();
            while let Some(d) = cur_del {
                if d >= end_u64 {
                    break;
                }
                del_local.insert((d - start_u64) as usize);
                last_seen = Some(d);
                cur_del = del_iter.next();
            }

            if del_local.is_empty() {
                cum_rows = end_u64;
                continue;
            }

            let mut chunk_gets = vec![BatchGet::Raw { key: meta.chunk_pk }];
            if let Some(rm) = metas_rid.get(i) {
                chunk_gets.push(BatchGet::Raw { key: rm.chunk_pk });
            }
            let chunk_results = match self.pager.batch_get(&chunk_gets) {
                Ok(res) => res,
                Err(err) => {
                    tracing::trace!(
                        field_id = ?field_id,
                        chunk_pk = meta.chunk_pk,
                        error = ?err,
                        "delete_rows stage_delete_rows_for_field: chunk batch_get failed"
                    );
                    return Err(err);
                }
            };
            let mut chunk_blobs = FxHashMap::default();
            for res in chunk_results {
                if let GetResult::Raw { key, bytes } = res {
                    chunk_blobs.insert(key, bytes);
                }
            }

            tracing::warn!(
                field_id = ?field_id,
                chunk_pk = meta.chunk_pk,
                rid_chunk_pk = metas_rid.get(i).map(|rm| rm.chunk_pk),
                data_found = chunk_blobs.contains_key(&meta.chunk_pk),
                rid_found = metas_rid
                    .get(i)
                    .map(|rm| chunk_blobs.contains_key(&rm.chunk_pk)),
                "delete_rows stage_delete_rows_for_field: chunk fetch status"
            );

            let data_blob = match chunk_blobs.remove(&meta.chunk_pk) {
                Some(bytes) => bytes,
                None => {
                    tracing::trace!(
                        field_id = ?field_id,
                        chunk_pk = meta.chunk_pk,
                        "delete_rows stage_delete_rows_for_field: chunk missing"
                    );
                    return Err(Error::NotFound);
                }
            };
            let data_arr = deserialize_array(data_blob)?;

            let rid_arr_any = if let Some(rm) = metas_rid.get(i) {
                let rid_blob = match chunk_blobs.remove(&rm.chunk_pk) {
                    Some(bytes) => bytes,
                    None => {
                        tracing::trace!(
                            field_id = ?field_id,
                            rowid_chunk_pk = rm.chunk_pk,
                            "delete_rows stage_delete_rows_for_field: rowid chunk missing"
                        );
                        return Err(Error::NotFound);
                    }
                };
                Some(deserialize_array(rid_blob)?)
            } else {
                None
            };

            let edit = ChunkEdit::from_delete_indices(rows, &del_local);

            let (new_data_arr, new_rid_arr) =
                ChunkEdit::apply_edit_to_arrays(&data_arr, rid_arr_any.as_ref(), &edit)?;

            let data_bytes = serialize_array(&new_data_arr)?;
            staged_puts.push(BatchPut::Raw {
                key: meta.chunk_pk,
                bytes: data_bytes,
            });
            meta.row_count = new_data_arr.len() as u64;
            meta.serialized_bytes = new_data_arr.get_array_memory_size() as u64;

            if let (Some(rm), Some(rids)) = (metas_rid.get_mut(i), new_rid_arr) {
                let rid_bytes = serialize_array(&rids)?;
                staged_puts.push(BatchPut::Raw {
                    key: rm.chunk_pk,
                    bytes: rid_bytes,
                });
                rm.row_count = rids.len() as u64;
                rm.serialized_bytes = rids.get_array_memory_size() as u64;
            }

            if meta.value_order_perm_pk != 0 {
                let sort_column = SortColumn {
                    values: new_data_arr,
                    options: None,
                };
                let indices = lexsort_to_indices(&[sort_column], None)?;
                let perm_bytes = serialize_array(&indices)?;
                staged_puts.push(BatchPut::Raw {
                    key: meta.value_order_perm_pk,
                    bytes: perm_bytes,
                });
            }

            cum_rows = end_u64;
            any_changed = true;
        }

        descriptor.rewrite_pages(Arc::clone(&self.pager), desc_pk, &mut metas, staged_puts)?;
        if let (Some(rid_pk), Some(mut rid_desc)) = (desc_pk_rid, descriptor_rid) {
            rid_desc.rewrite_pages(Arc::clone(&self.pager), rid_pk, &mut metas_rid, staged_puts)?;
        }
        drop(catalog);
        tracing::trace!(
            field_id = ?field_id,
            changed = any_changed,
            "delete_rows stage_delete_rows_for_field: finished stage"
        );
        Ok(any_changed)
    }

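    /// Deletes the given rows from each listed column (all fields must belong
    /// to the same table), then runs bounded compaction on every column that
    /// actually changed.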
    pub fn delete_rows(&self, fields: &[LogicalFieldId], rows_to_delete: &[RowId]) -> Result<()> {
        if fields.is_empty() || rows_to_delete.is_empty() {
            return Ok(());
        }

        let mut puts = Vec::new();
        let mut touched: FxHashSet<LogicalFieldId> = FxHashSet::default();
        let mut table_id: Option<TableId> = None;

        tracing::warn!(
            fields = fields.len(),
            rows = rows_to_delete.len(),
            "delete_rows begin"
        );
        for field_id in fields {
            tracing::warn!(field = ?field_id, "delete_rows iter field");
            if let Some(expected) = table_id {
                if field_id.table_id() != expected {
                    return Err(Error::InvalidArgumentError(
                        "delete_rows requires fields from the same table".into(),
                    ));
                }
            } else {
                table_id = Some(field_id.table_id());
            }

            if self.stage_delete_rows_for_field(*field_id, rows_to_delete, &mut puts)? {
                touched.insert(*field_id);
            }
        }

        if puts.is_empty() {
            return Ok(());
        }

        self.pager.batch_put(&puts)?;

        tracing::warn!(touched = touched.len(), "delete_rows apply writes");

        for field_id in touched {
            self.compact_field_bounded(field_id)?;
        }
        tracing::warn!("delete_rows complete");
        Ok(())
    }

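    /// Rewrites a descriptor's page chain so it contains exactly `new_metas`,
    /// reusing the existing head page when possible, freeing surplus pages, and
    /// updating the descriptor totals.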
    pub(crate) fn write_descriptor_chain(
        &self,
        descriptor_pk: PhysicalKey,
        descriptor: &mut ColumnDescriptor,
        new_metas: &[ChunkMetadata],
        puts: &mut Vec<BatchPut>,
        frees: &mut Vec<PhysicalKey>,
    ) -> Result<()> {
        let mut old_pages = Vec::new();
        let mut pk = descriptor.head_page_pk;
        while pk != 0 {
            let page_blob = self
                .pager
                .batch_get(&[BatchGet::Raw { key: pk }])?
                .pop()
                .and_then(|res| match res {
                    GetResult::Raw { bytes, .. } => Some(bytes),
                    _ => None,
                })
                .ok_or(Error::NotFound)?;
            let header = DescriptorPageHeader::from_le_bytes(
                &page_blob.as_ref()[..DescriptorPageHeader::DISK_SIZE],
            );
            old_pages.push(pk);
            pk = header.next_page_pk;
        }

        let per = DESCRIPTOR_ENTRIES_PER_PAGE;
        let need_pages = if new_metas.is_empty() {
            0
        } else {
            new_metas.len().div_ceil(per)
        };
        if need_pages == 0 {
            frees.extend(old_pages.iter().copied());

            descriptor.head_page_pk = 0;
            descriptor.tail_page_pk = 0;
            descriptor.total_row_count = 0;
            descriptor.total_chunk_count = 0;
            puts.push(BatchPut::Raw {
                key: descriptor_pk,
                bytes: descriptor.to_le_bytes(),
            });
            return Ok(());
        }

        let mut pages = Vec::with_capacity(need_pages);
        if !old_pages.is_empty() {
            pages.push(old_pages[0]);
        } else {
            pages.push(self.pager.alloc_many(1)?[0]);
            descriptor.head_page_pk = pages[0];
        }
        if need_pages > pages.len() {
            let extra = self.pager.alloc_many(need_pages - pages.len())?;
            pages.extend(extra);
        }

        if old_pages.len() > need_pages {
            frees.extend(old_pages[need_pages..].iter().copied());
        }

        let mut off = 0usize;
        for (i, page_pk) in pages.iter().copied().enumerate() {
            let remain = new_metas.len() - off;
            let count = remain.min(per);
            let next = if i + 1 < pages.len() { pages[i + 1] } else { 0 };
            let header = DescriptorPageHeader {
                next_page_pk: next,
                entry_count: count as u32,
                _padding: [0; 4],
            };
            let mut page_bytes = header.to_le_bytes().to_vec();
            for m in &new_metas[off..off + count] {
                page_bytes.extend_from_slice(&m.to_le_bytes());
            }
            puts.push(BatchPut::Raw {
                key: page_pk,
                bytes: page_bytes,
            });
            off += count;
        }

        descriptor.tail_page_pk = *pages.last().unwrap();
        descriptor.total_chunk_count = new_metas.len() as u64;
        descriptor.total_row_count = new_metas.iter().map(|m| m.row_count).sum();
        puts.push(BatchPut::Raw {
            key: descriptor_pk,
            bytes: descriptor.to_le_bytes(),
        });
        Ok(())
    }

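    /// Bounded compaction for one column: merges runs of undersized chunks
    /// (limited by `MAX_MERGE_RUN_BYTES`) into chunks near `TARGET_CHUNK_BYTES`,
    /// regenerating permutations where they existed, and drops the column from
    /// the catalog entirely when no rows remain.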
    fn compact_field_bounded(&self, field_id: LogicalFieldId) -> Result<()> {
        let mut catalog = self.catalog.write().unwrap();

        let desc_pk = match catalog.map.get(&field_id) {
            Some(&pk) => pk,
            None => return Ok(()),
        };
        let rid_fid = rowid_fid(field_id);
        let desc_pk_rid = match catalog.map.get(&rid_fid) {
            Some(&pk) => pk,
            None => return Ok(()),
        };

        let gets = vec![
            BatchGet::Raw { key: desc_pk },
            BatchGet::Raw { key: desc_pk_rid },
        ];
        let results = self.pager.batch_get(&gets)?;
        let mut blobs_by_pk = FxHashMap::default();
        for res in results {
            if let GetResult::Raw { key, bytes } = res {
                blobs_by_pk.insert(key, bytes);
            }
        }
        let desc_blob = blobs_by_pk.remove(&desc_pk).ok_or(Error::NotFound)?;
        let mut desc = ColumnDescriptor::from_le_bytes(desc_blob.as_ref());
        let desc_blob_rid = blobs_by_pk.remove(&desc_pk_rid).ok_or(Error::NotFound)?;
        let mut desc_rid = ColumnDescriptor::from_le_bytes(desc_blob_rid.as_ref());

        let mut metas = Vec::new();
        for m in DescriptorIterator::new(self.pager.as_ref(), desc.head_page_pk) {
            metas.push(m?);
        }
        let mut metas_rid = Vec::new();
        for m in DescriptorIterator::new(self.pager.as_ref(), desc_rid.head_page_pk) {
            metas_rid.push(m?);
        }
        if metas.is_empty() || metas_rid.is_empty() {
            return Ok(());
        }

        let mut puts: Vec<BatchPut> = Vec::new();
        let mut frees: Vec<PhysicalKey> = Vec::new();
        let mut new_metas: Vec<ChunkMetadata> = Vec::new();
        let mut new_rid_metas: Vec<ChunkMetadata> = Vec::new();

        let mut i = 0usize;
        while i < metas.len() {
            let sz = metas[i].serialized_bytes as usize;
            if sz >= MIN_CHUNK_BYTES {
                new_metas.push(metas[i]);
                new_rid_metas.push(metas_rid[i]);
                i += 1;
                continue;
            }

            let mut j = i;
            let mut run_bytes = 0usize;
            while j < metas.len() {
                let b = metas[j].serialized_bytes as usize;
                if b >= TARGET_CHUNK_BYTES {
                    break;
                }
                if run_bytes + b > MAX_MERGE_RUN_BYTES {
                    break;
                }
                run_bytes += b;
                j += 1;
            }
            if j == i + 1 && sz >= MIN_CHUNK_BYTES {
                new_metas.push(metas[i]);
                new_rid_metas.push(metas_rid[i]);
                i += 1;
                continue;
            }

            let mut gets = Vec::with_capacity((j - i) * 2);
            for k in i..j {
                gets.push(BatchGet::Raw {
                    key: metas[k].chunk_pk,
                });
                gets.push(BatchGet::Raw {
                    key: metas_rid[k].chunk_pk,
                });
            }
            let results = self.pager.batch_get(&gets)?;
            let mut by_pk: FxHashMap<PhysicalKey, EntryHandle> = FxHashMap::default();
            for r in results {
                match r {
                    GetResult::Raw { key, bytes } => {
                        by_pk.insert(key, bytes);
                    }
                    _ => return Err(Error::NotFound),
                }
            }
            let mut data_parts = Vec::with_capacity(j - i);
            let mut rid_parts = Vec::with_capacity(j - i);
            for k in i..j {
                let db = by_pk.get(&metas[k].chunk_pk).ok_or(Error::NotFound)?;
                data_parts.push(deserialize_array(db.clone())?);
                let rb = by_pk.get(&metas_rid[k].chunk_pk).ok_or(Error::NotFound)?;
                rid_parts.push(deserialize_array(rb.clone())?);
            }
            let merged_data = concat_many(data_parts.iter().collect())?;
            let merged_rid_any = concat_many(rid_parts.iter().collect())?;

            let slices = split_to_target_bytes(
                &merged_data,
                TARGET_CHUNK_BYTES,
                self.cfg.varwidth_fallback_rows_per_slice,
            );
            let mut rid_off = 0usize;
            let need_perms = metas[i..j].iter().any(|m| m.value_order_perm_pk != 0);

            for s in slices {
                let rows = s.len();
                let rid_ref: ArrayRef = merged_rid_any.slice(rid_off, rows);
                let rid_norm = zero_offset(&rid_ref);
                let rid_pk = self.pager.alloc_many(1)?[0];
                let rid_bytes = serialize_array(rid_norm.as_ref())?;

                let data_pk = self.pager.alloc_many(1)?[0];
                let s_norm = zero_offset(&s);
                let data_bytes = serialize_array(s_norm.as_ref())?;
                puts.push(BatchPut::Raw {
                    key: data_pk,
                    bytes: data_bytes,
                });
                puts.push(BatchPut::Raw {
                    key: rid_pk,
                    bytes: rid_bytes,
                });
                let mut meta = ChunkMetadata {
                    chunk_pk: data_pk,
                    row_count: rows as u64,
                    serialized_bytes: s_norm.get_array_memory_size() as u64,
                    max_val_u64: u64::MAX,
                    ..Default::default()
                };
                if need_perms {
                    let sort_col = SortColumn {
                        values: s.clone(),
                        options: None,
                    };
                    let idx = lexsort_to_indices(&[sort_col], None)?;
                    let perm_bytes = serialize_array(&idx)?;
                    let perm_pk = self.pager.alloc_many(1)?[0];
                    puts.push(BatchPut::Raw {
                        key: perm_pk,
                        bytes: perm_bytes,
                    });
                    meta.value_order_perm_pk = perm_pk;
                }

                let rid_any = rid_norm.clone();
                let rids = rid_any
                    .as_any()
                    .downcast_ref::<UInt64Array>()
                    .ok_or_else(|| Error::Internal("rid downcast".into()))?;
                let mut min = u64::MAX;
                let mut max = 0u64;
                let mut sorted_rids = true;
                let mut last_v = 0u64;
                for ii in 0..rids.len() {
                    let v = rids.value(ii);
                    if ii == 0 {
                        last_v = v;
                    } else if v < last_v {
                        sorted_rids = false;
                    } else {
                        last_v = v;
                    }
                    if v < min {
                        min = v;
                    }
                    if v > max {
                        max = v;
                    }
                }
                let mut rid_perm_pk = 0u64;
                if !sorted_rids {
                    let rid_sort_col = SortColumn {
                        values: rid_any,
                        options: None,
                    };
                    let rid_idx = lexsort_to_indices(&[rid_sort_col], None)?;
                    let rid_perm_bytes = serialize_array(&rid_idx)?;
                    rid_perm_pk = self.pager.alloc_many(1)?[0];
                    puts.push(BatchPut::Raw {
                        key: rid_perm_pk,
                        bytes: rid_perm_bytes,
                    });
                }
                let rid_meta = ChunkMetadata {
                    chunk_pk: rid_pk,
                    value_order_perm_pk: rid_perm_pk,
                    row_count: rows as u64,
                    serialized_bytes: rid_norm.get_array_memory_size() as u64,
                    min_val_u64: if rows > 0 { min } else { 0 },
                    max_val_u64: if rows > 0 { max } else { 0 },
                };
                new_metas.push(meta);
                new_rid_metas.push(rid_meta);
                rid_off += rows;
            }

            for k in i..j {
                frees.push(metas[k].chunk_pk);
                if metas[k].value_order_perm_pk != 0 {
                    frees.push(metas[k].value_order_perm_pk);
                }
                frees.push(metas_rid[k].chunk_pk);
                if metas_rid[k].value_order_perm_pk != 0 {
                    frees.push(metas_rid[k].value_order_perm_pk);
                }
            }

            i = j;
        }

        if new_metas.is_empty() {
            self.write_descriptor_chain(desc_pk, &mut desc, &[], &mut puts, &mut frees)?;
            self.write_descriptor_chain(desc_pk_rid, &mut desc_rid, &[], &mut puts, &mut frees)?;
            catalog.map.remove(&field_id);
            catalog.map.remove(&rid_fid);
            puts.push(BatchPut::Raw {
                key: CATALOG_ROOT_PKEY,
                bytes: catalog.to_bytes(),
            });
            if !puts.is_empty() {
                self.pager.batch_put(&puts)?;
            }
            if !frees.is_empty() {
                self.pager.free_many(&frees)?;
            }
            return Ok(());
        }

        self.write_descriptor_chain(desc_pk, &mut desc, &new_metas, &mut puts, &mut frees)?;
        self.write_descriptor_chain(
            desc_pk_rid,
            &mut desc_rid,
            &new_rid_metas,
            &mut puts,
            &mut frees,
        )?;
        if !puts.is_empty() {
            self.pager.batch_put(&puts)?;
        }
        if !frees.is_empty() {
            self.pager.free_many(&frees)?;
        }

        Ok(())
    }

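    /// Appends one chunk's metadata to the in-memory tail descriptor page,
    /// allocating and chaining a new tail page when the current one is full.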
    fn append_meta_in_loop(
        &self,
        descriptor: &mut ColumnDescriptor,
        tail_page_bytes: &mut Vec<u8>,
        meta: ChunkMetadata,
        puts: &mut Vec<BatchPut>,
    ) -> Result<()> {
        let mut header = DescriptorPageHeader::from_le_bytes(
            &tail_page_bytes[..DescriptorPageHeader::DISK_SIZE],
        );
        if tail_page_bytes.len() + ChunkMetadata::DISK_SIZE <= TARGET_DESCRIPTOR_PAGE_BYTES
            && (header.entry_count as usize) < DESCRIPTOR_ENTRIES_PER_PAGE
        {
            tail_page_bytes.extend_from_slice(&meta.to_le_bytes());
            header.entry_count += 1;
            tail_page_bytes[..DescriptorPageHeader::DISK_SIZE]
                .copy_from_slice(&header.to_le_bytes());
        } else {
            let new_tail_pk = self.pager.alloc_many(1)?[0];
            header.next_page_pk = new_tail_pk;
            tail_page_bytes[..DescriptorPageHeader::DISK_SIZE]
                .copy_from_slice(&header.to_le_bytes());
            let full_page_to_write = std::mem::take(tail_page_bytes);
            puts.push(BatchPut::Raw {
                key: descriptor.tail_page_pk,
                bytes: full_page_to_write,
            });
            let new_header = DescriptorPageHeader {
                next_page_pk: 0,
                entry_count: 1,
                _padding: [0; 4],
            };
            let mut new_page_bytes = new_header.to_le_bytes().to_vec();
            new_page_bytes.extend_from_slice(&meta.to_le_bytes());
            descriptor.tail_page_pk = new_tail_pk;
            *tail_page_bytes = new_page_bytes;
        }

        descriptor.total_row_count += meta.row_count;
        descriptor.total_chunk_count += 1;
        Ok(())
    }

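    /// Walks every descriptor chain and checks that the persisted row and chunk
    /// totals match what the chain actually contains.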
    pub fn verify_integrity(&self) -> Result<()> {
        let catalog = self.catalog.read().unwrap();
        for (&field_id, &descriptor_pk) in &catalog.map {
            let desc_blob = self
                .pager
                .batch_get(&[BatchGet::Raw { key: descriptor_pk }])?
                .pop()
                .and_then(|r| match r {
                    GetResult::Raw { bytes, .. } => Some(bytes),
                    _ => None,
                })
                .ok_or_else(|| {
                    Error::Internal(format!(
                        "Catalog points to missing descriptor pk={}",
                        descriptor_pk
                    ))
                })?;
            let descriptor = ColumnDescriptor::from_le_bytes(desc_blob.as_ref());
            if descriptor.field_id != field_id {
                return Err(Error::Internal(format!(
                    "Descriptor at pk={} has wrong field_id: expected {:?}, got {:?}",
                    descriptor_pk, field_id, descriptor.field_id
                )));
            }

            let mut actual_rows = 0;
            let mut actual_chunks = 0;
            let mut current_page_pk = descriptor.head_page_pk;
            while current_page_pk != 0 {
                let page_blob = self
                    .pager
                    .batch_get(&[BatchGet::Raw {
                        key: current_page_pk,
                    }])?
                    .pop()
                    .and_then(|r| match r {
                        GetResult::Raw { bytes, .. } => Some(bytes),
                        _ => None,
                    })
                    .ok_or_else(|| {
                        Error::Internal(format!(
                            "Descriptor page chain broken at pk={}",
                            current_page_pk
                        ))
                    })?;
                let header = DescriptorPageHeader::from_le_bytes(
                    &page_blob.as_ref()[..DescriptorPageHeader::DISK_SIZE],
                );
                for i in 0..(header.entry_count as usize) {
                    let off = DescriptorPageHeader::DISK_SIZE + i * ChunkMetadata::DISK_SIZE;
                    let end = off + ChunkMetadata::DISK_SIZE;
                    let meta = ChunkMetadata::from_le_bytes(&page_blob.as_ref()[off..end]);
                    actual_rows += meta.row_count;
                    actual_chunks += 1;
                }
                current_page_pk = header.next_page_pk;
            }

            if descriptor.total_row_count != actual_rows {
                return Err(Error::Internal(format!(
                    "Row count mismatch for field {:?}: descriptor says {}, actual is {}",
                    field_id, descriptor.total_row_count, actual_rows
                )));
            }
            if descriptor.total_chunk_count != actual_chunks {
                return Err(Error::Internal(format!(
                    "Chunk count mismatch for field {:?}: descriptor says {}, actual is {}",
                    field_id, descriptor.total_chunk_count, actual_chunks
                )));
            }
        }
        Ok(())
    }

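    /// Collects per-column layout statistics (rows, chunks, and descriptor page
    /// sizes) for every column in the catalog.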
    pub fn get_layout_stats(&self) -> Result<Vec<ColumnLayoutStats>> {
        let catalog = self.catalog.read().unwrap();
        let mut all_stats = Vec::new();

        for (&field_id, &descriptor_pk) in &catalog.map {
            let desc_blob = self
                .pager
                .batch_get(&[BatchGet::Raw { key: descriptor_pk }])?
                .pop()
                .and_then(|r| match r {
                    GetResult::Raw { bytes, .. } => Some(bytes),
                    _ => None,
                })
                .ok_or(Error::NotFound)?;
            let descriptor = ColumnDescriptor::from_le_bytes(desc_blob.as_ref());

            let mut page_stats = Vec::new();
            let mut current_page_pk = descriptor.head_page_pk;
            while current_page_pk != 0 {
                let page_blob = self
                    .pager
                    .batch_get(&[BatchGet::Raw {
                        key: current_page_pk,
                    }])?
                    .pop()
                    .and_then(|r| match r {
                        GetResult::Raw { bytes, .. } => Some(bytes),
                        _ => None,
                    })
                    .ok_or(Error::NotFound)?;
                let header = DescriptorPageHeader::from_le_bytes(
                    &page_blob.as_ref()[..DescriptorPageHeader::DISK_SIZE],
                );
                page_stats.push(DescriptorPageStats {
                    page_pk: current_page_pk,
                    entry_count: header.entry_count,
                    page_size_bytes: page_blob.as_ref().len(),
                });
                current_page_pk = header.next_page_pk;
            }

            all_stats.push(ColumnLayoutStats {
                field_id,
                total_rows: descriptor.total_row_count,
                total_chunks: descriptor.total_chunk_count,
                pages: page_stats,
            });
        }
        Ok(all_stats)
    }
}