llkv_column_map/store/core.rs

1use super::*;
2use crate::serialization::{deserialize_array, serialize_array};
3use crate::store::catalog::ColumnCatalog;
4use crate::store::descriptor::{
5    ChunkMetadata, ColumnDescriptor, DescriptorIterator, DescriptorPageHeader,
6};
7use crate::store::scan::filter::FilterDispatch;
8use crate::store::scan::{FilterPrimitive, FilterResult};
9use arrow::array::{Array, ArrayRef, BooleanArray, UInt32Array, UInt64Array};
10use arrow::compute::{self, SortColumn, lexsort_to_indices};
11use arrow::datatypes::DataType;
12use arrow::record_batch::RecordBatch;
13use llkv_expr::typed_predicate::Predicate;
14use llkv_result::{Error, Result};
15use llkv_storage::{
16    constants::CATALOG_ROOT_PKEY,
17    pager::{BatchGet, BatchPut, GetResult, Pager},
18    types::PhysicalKey,
19};
20use llkv_types::ids::{LogicalFieldId, RowId, TableId};
21
22use rustc_hash::{FxHashMap, FxHashSet};
23use simd_r_drive_entry_handle::EntryHandle;
24use std::sync::{Arc, RwLock};
25
26/// Columnar storage engine for managing Arrow-based data.
27///
28/// `ColumnStore` provides the primary interface for persisting and retrieving columnar
29/// data using Apache Arrow [`RecordBatch`]es. It manages:
30///
31/// - Column descriptors and metadata (chunk locations, row counts, min/max values)
32/// - Data type caching for efficient schema queries
33/// - Index management (presence indexes, value indexes)
34/// - Integration with the [`Pager`] for persistent storage
35///
36/// # Namespaces
37///
38/// Columns are identified by [`LogicalFieldId`], which combines a namespace, table ID,
39/// and field ID. This prevents collisions between user data, row IDs, and MVCC metadata:
40///
41/// - `UserData`: Regular table columns
42/// - `RowIdShadow`: Internal row ID tracking
43/// - `TxnCreatedBy`: MVCC transaction creation timestamps
44/// - `TxnDeletedBy`: MVCC transaction deletion timestamps
45///
46/// # Thread Safety
47///
48/// `ColumnStore` is `Send + Sync` and can be safely shared across threads via `Arc`.
49/// Internal state (catalog, caches) uses `RwLock` for concurrent access.
50///
51/// # Test Harness Integration
52///
53/// - **SQLite `sqllogictest`**: Every upstream case exercises the column store, providing
54///   a compatibility baseline but not full parity with SQLite yet.
55/// - **DuckDB suites**: Early dialect-specific tests stress MVCC and typed casts, informing
56///   future work rather than proving comprehensive DuckDB coverage.
57/// - **Hardening mandate**: Failures uncovered by the suites result in storage fixes, not
58///   filtered tests, to preserve confidence in OLAP scenarios built atop this crate.
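///
/// # Example (sketch)
///
/// A minimal usage sketch. `MemPager` is a hypothetical in-memory [`Pager`]
/// implementation standing in for whatever pager the deployment provides; error
/// handling is elided.
///
/// ```ignore
/// use std::sync::Arc;
///
/// // Open (or create) the store on top of a pager and share it across threads.
/// let pager = Arc::new(MemPager::default());
/// let store = Arc::new(ColumnStore::open(Arc::clone(&pager))?);
///
/// let reader = {
///     let store = Arc::clone(&store);
///     std::thread::spawn(move || {
///         // `ColumnStore` is Send + Sync; readers share the catalog behind RwLock.
///         let _hints = store.write_hints();
///     })
/// };
/// reader.join().unwrap();
/// ```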
59pub struct ColumnStore<P: Pager> {
60    pub(crate) pager: Arc<P>,
61    pub(crate) catalog: Arc<RwLock<ColumnCatalog>>,
62    cfg: ColumnStoreConfig,
63    dtype_cache: DTypeCache<P>,
64    index_manager: IndexManager<P>,
65}
66
67impl<P> Clone for ColumnStore<P>
68where
69    P: Pager<Blob = EntryHandle> + Send + Sync,
70{
71    fn clone(&self) -> Self {
72        Self {
73            pager: Arc::clone(&self.pager),
74            catalog: Arc::clone(&self.catalog),
75            cfg: self.cfg.clone(),
76            dtype_cache: self.dtype_cache.clone(),
77            index_manager: self.index_manager.clone(),
78        }
79    }
80}
81
82impl<P> ColumnStore<P>
83where
84    P: Pager<Blob = EntryHandle> + Send + Sync,
85{
86    /// Opens or creates a `ColumnStore` using the provided pager.
87    ///
88    /// Loads the column catalog from the pager's root catalog key, or initializes
89    /// an empty catalog if none exists. The catalog maps [`LogicalFieldId`] to the
90    /// physical keys of column descriptors.
91    ///
92    /// # Errors
93    ///
94    /// Returns an error if the pager fails to load the catalog or if deserialization fails.
95    pub fn open(pager: Arc<P>) -> Result<Self> {
96        let cfg = ColumnStoreConfig::default();
97        let catalog = match pager
98            .batch_get(&[BatchGet::Raw {
99                key: CATALOG_ROOT_PKEY,
100            }])?
101            .pop()
102        {
103            Some(GetResult::Raw { bytes, .. }) => ColumnCatalog::from_bytes(bytes.as_ref())?,
104            _ => ColumnCatalog::default(),
105        };
106        let arc_catalog = Arc::new(RwLock::new(catalog));
107
108        let index_manager = IndexManager::new(Arc::clone(&pager));
109
110        Ok(Self {
111            pager: Arc::clone(&pager),
112            catalog: Arc::clone(&arc_catalog),
113            cfg,
114            dtype_cache: DTypeCache::new(Arc::clone(&pager), Arc::clone(&arc_catalog)),
115            index_manager,
116        })
117    }
118
119    /// Return heuristics that guide upstream writers when sizing batches.
120    pub fn write_hints(&self) -> ColumnStoreWriteHints {
121        ColumnStoreWriteHints::from_config(&self.cfg)
122    }
123
124    /// Creates and persists an index for a column.
125    ///
126    /// Builds the specified index type for all existing data in the column and
127    /// persists it atomically. The index will be maintained automatically on subsequent
128    /// appends and updates.
129    ///
130    /// # Errors
131    ///
132    /// Returns an error if the column doesn't exist or if index creation fails.
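    ///
    /// # Example (sketch)
    ///
    /// Assumes `store` and a persisted `field_id` already exist; setup is elided.
    ///
    /// ```ignore
    /// // Build a presence index so row-id membership checks can use it.
    /// store.register_index(field_id, IndexKind::Presence)?;
    /// // ...and later drop it again; the column data itself is unaffected.
    /// store.unregister_index(field_id, IndexKind::Presence)?;
    /// ```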
133    pub fn register_index(&self, field_id: LogicalFieldId, kind: IndexKind) -> Result<()> {
134        self.index_manager.register_index(self, field_id, kind)
135    }
136
137    /// Checks if a logical field is registered in the catalog.
138    pub fn has_field(&self, field_id: LogicalFieldId) -> bool {
139        let catalog = self.catalog.read().unwrap();
140        catalog.map.contains_key(&field_id)
141    }
142
143    /// Removes a persisted index from a column.
144    ///
145    /// Atomically removes the index and frees associated storage. The column data
146    /// itself is not affected.
147    ///
148    /// # Errors
149    ///
150    /// Returns an error if the column or index doesn't exist.
151    pub fn unregister_index(&self, field_id: LogicalFieldId, kind: IndexKind) -> Result<()> {
152        self.index_manager.unregister_index(self, field_id, kind)
153    }
154
155    /// Returns the Arrow data type of a column.
156    ///
157    /// Returns the data type from cache if available, otherwise loads it from
158    /// the column descriptor and caches it for future queries.
159    ///
160    /// # Errors
161    ///
162    /// Returns an error if the column doesn't exist or if the descriptor is corrupted.
163    pub fn data_type(&self, field_id: LogicalFieldId) -> Result<DataType> {
164        if let Some(dt) = self.dtype_cache.cached_data_type(field_id) {
165            return Ok(dt);
166        }
167        self.dtype_cache.dtype_for_field(field_id)
168    }
169
170    /// Updates the data type of an existing column.
171    ///
172    /// This updates the descriptor's data type fingerprint and cache. Note that this
173    /// does NOT migrate existing data - the caller must ensure data compatibility.
174    ///
175    /// # Errors
176    ///
177    /// Returns an error if the column doesn't exist or if the descriptor update fails.
178    pub fn update_data_type(
179        &self,
180        field_id: LogicalFieldId,
181        new_data_type: &DataType,
182    ) -> Result<()> {
183        // Get the descriptor physical key from catalog
184        let descriptor_pk = {
185            let catalog = self.catalog.read().unwrap();
186            *catalog.map.get(&field_id).ok_or(Error::NotFound)?
187        };
188
189        // Load the existing descriptor
190        let mut descriptor = match self
191            .pager
192            .batch_get(&[BatchGet::Raw { key: descriptor_pk }])?
193            .pop()
194        {
195            Some(GetResult::Raw { bytes, .. }) => ColumnDescriptor::from_le_bytes(bytes.as_ref()),
196            _ => return Err(Error::NotFound),
197        };
198
199        // Update the data type fingerprint
200        let new_fingerprint = DTypeCache::<P>::dtype_fingerprint(new_data_type);
201        if new_fingerprint != 0 {
202            DTypeCache::<P>::set_desc_dtype_fingerprint(&mut descriptor, new_fingerprint);
203        }
204
205        // Persist the updated descriptor
206        self.pager.batch_put(&[BatchPut::Raw {
207            key: descriptor_pk,
208            bytes: descriptor.to_le_bytes(),
209        }])?;
210
211        // Update the cache
212        self.dtype_cache.insert(field_id, new_data_type.clone());
213
214        Ok(())
215    }
216
217    /// Ensures that catalog entries and descriptors exist for a logical column.
218    ///
219    /// Primarily used when creating empty tables so that subsequent
220    /// operations (like `CREATE INDEX`) can resolve column metadata before any
221    /// data has been appended.
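    ///
    /// # Example (sketch)
    ///
    /// A hypothetical flow for an empty table; `raw_fid` stands in for however the
    /// caller derives the column's logical field id.
    ///
    /// ```ignore
    /// use arrow::datatypes::DataType;
    ///
    /// let field_id = LogicalFieldId::from(raw_fid);
    /// // Register descriptors up front so `CREATE INDEX` and `data_type()` work
    /// // before any rows have been appended.
    /// store.ensure_column_registered(field_id, &DataType::Int64)?;
    /// assert_eq!(store.data_type(field_id)?, DataType::Int64);
    /// ```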
222    pub fn ensure_column_registered(
223        &self,
224        field_id: LogicalFieldId,
225        data_type: &DataType,
226    ) -> Result<()> {
227        let rid_field_id = rowid_fid(field_id);
228
229        let mut catalog_dirty = false;
230        let descriptor_pk;
231        let rid_descriptor_pk;
232
233        {
234            let mut catalog = self.catalog.write().unwrap();
235            descriptor_pk = if let Some(&pk) = catalog.map.get(&field_id) {
236                pk
237            } else {
238                let pk = self.pager.alloc_many(1)?[0];
239                catalog.map.insert(field_id, pk);
240                catalog_dirty = true;
241                pk
242            };
243
244            rid_descriptor_pk = if let Some(&pk) = catalog.map.get(&rid_field_id) {
245                pk
246            } else {
247                let pk = self.pager.alloc_many(1)?[0];
248                catalog.map.insert(rid_field_id, pk);
249                catalog_dirty = true;
250                pk
251            };
252        }
253
254        let mut puts: Vec<BatchPut> = Vec::new();
255
256        let data_descriptor_missing = self
257            .pager
258            .batch_get(&[BatchGet::Raw { key: descriptor_pk }])?
259            .pop()
260            .and_then(|r| match r {
261                GetResult::Raw { bytes, .. } => Some(bytes),
262                _ => None,
263            })
264            .is_none();
265
266        if data_descriptor_missing {
267            let (mut descriptor, tail_page) =
268                ColumnDescriptor::load_or_create(Arc::clone(&self.pager), descriptor_pk, field_id)?;
269            let fingerprint = DTypeCache::<P>::dtype_fingerprint(data_type);
270            if fingerprint != 0 {
271                DTypeCache::<P>::set_desc_dtype_fingerprint(&mut descriptor, fingerprint);
272            }
273            puts.push(BatchPut::Raw {
274                key: descriptor.tail_page_pk,
275                bytes: tail_page,
276            });
277            puts.push(BatchPut::Raw {
278                key: descriptor_pk,
279                bytes: descriptor.to_le_bytes(),
280            });
281        }
282
283        let rid_descriptor_missing = self
284            .pager
285            .batch_get(&[BatchGet::Raw {
286                key: rid_descriptor_pk,
287            }])?
288            .pop()
289            .and_then(|r| match r {
290                GetResult::Raw { bytes, .. } => Some(bytes),
291                _ => None,
292            })
293            .is_none();
294
295        if rid_descriptor_missing {
296            let (mut rid_descriptor, tail_page) = ColumnDescriptor::load_or_create(
297                Arc::clone(&self.pager),
298                rid_descriptor_pk,
299                rid_field_id,
300            )?;
301            let fingerprint = DTypeCache::<P>::dtype_fingerprint(&DataType::UInt64);
302            if fingerprint != 0 {
303                DTypeCache::<P>::set_desc_dtype_fingerprint(&mut rid_descriptor, fingerprint);
304            }
305            puts.push(BatchPut::Raw {
306                key: rid_descriptor.tail_page_pk,
307                bytes: tail_page,
308            });
309            puts.push(BatchPut::Raw {
310                key: rid_descriptor_pk,
311                bytes: rid_descriptor.to_le_bytes(),
312            });
313        }
314
315        self.dtype_cache.insert(field_id, data_type.clone());
316
317        if catalog_dirty {
318            let catalog_bytes = {
319                let catalog = self.catalog.read().unwrap();
320                catalog.to_bytes()
321            };
322            puts.push(BatchPut::Raw {
323                key: CATALOG_ROOT_PKEY,
324                bytes: catalog_bytes,
325            });
326        }
327
328        if !puts.is_empty() {
329            self.pager.batch_put(&puts)?;
330        }
331
332        Ok(())
333    }
334
335    /// Find all row IDs where a column satisfies a predicate.
336    ///
337    /// This evaluates the predicate against the column's data and returns a vector
338    /// of matching row IDs. Uses indexes and chunk metadata (min/max values) to
339    /// skip irrelevant data when possible.
340    ///
341    /// # Errors
342    ///
343    /// Returns an error if the column doesn't exist or if chunk data is corrupted.
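    ///
    /// # Example (sketch)
    ///
    /// `U64Dispatch` and the `Predicate` constructor below are placeholders: the
    /// real [`FilterDispatch`] implementations and predicate builders live in the
    /// scan module and `llkv_expr::typed_predicate`, not in this file.
    ///
    /// ```ignore
    /// // Hypothetical: keep rows whose u64 value exceeds 10.
    /// let predicate = Predicate::greater_than(10u64);
    /// let row_ids = store.filter_row_ids::<U64Dispatch>(field_id, &predicate)?;
    /// ```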
344    pub fn filter_row_ids<T>(
345        &self,
346        field_id: LogicalFieldId,
347        predicate: &Predicate<T::Value>,
348    ) -> Result<Vec<u64>>
349    where
350        T: FilterDispatch,
351    {
352        tracing::trace!(field=?field_id, "filter_row_ids start");
353        let res = T::run_filter(self, field_id, predicate);
354        if let Err(ref err) = res {
355            tracing::trace!(field=?field_id, error=?err, "filter_row_ids error");
356        } else {
357            tracing::trace!(field=?field_id, "filter_row_ids ok");
358        }
359        res
360    }
361
362    /// Evaluate a predicate against a column and return match metadata.
363    ///
364    /// This variant drives a primitive predicate over the column and returns a
365    /// [`FilterResult`] describing contiguous match regions. Callers can use the
366    /// result to build paginated scans or gather row identifiers.
367    ///
368    /// # Arguments
369    /// - `field_id`: Logical column to filter.
370    /// - `predicate`: Callable invoked on each value; should be cheap and free of
371    ///   side effects.
372    ///
373    /// # Errors
374    /// Returns an error if the column metadata cannot be loaded or if decoding a
375    /// chunk fails.
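    ///
    /// # Example (sketch)
    ///
    /// Assumes a [`FilterPrimitive`] implementation exists for the column's native
    /// type (written here as `u64`); the exact impl is not shown in this file.
    ///
    /// ```ignore
    /// // Collect match regions for values above a threshold.
    /// let result = store.filter_matches::<u64, _>(field_id, |v| v > 1_000)?;
    /// ```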
376    pub fn filter_matches<T, F>(
377        &self,
378        field_id: LogicalFieldId,
379        predicate: F,
380    ) -> Result<FilterResult>
381    where
382        T: FilterPrimitive,
383        F: FnMut(T::Native) -> bool,
384    {
385        T::run_filter_with_result(self, field_id, predicate)
386    }
387
388    /// List all indexes registered for a column.
389    ///
390    /// Returns the types of indexes (e.g., presence, value) that are currently
391    /// persisted for the specified column.
392    ///
393    /// # Errors
394    ///
395    /// Returns an error if the column doesn't exist or if the descriptor is corrupted.
396    pub fn list_persisted_indexes(&self, field_id: LogicalFieldId) -> Result<Vec<IndexKind>> {
397        let catalog = self.catalog.read().unwrap();
398        let descriptor_pk = *catalog.map.get(&field_id).ok_or(Error::NotFound)?;
399
400        let desc_blob = self
401            .pager
402            .batch_get(&[BatchGet::Raw { key: descriptor_pk }])?
403            .pop()
404            .and_then(|r| match r {
405                GetResult::Raw { bytes, .. } => Some(bytes),
406                _ => None,
407            })
408            .ok_or(Error::NotFound)?;
409        let descriptor = ColumnDescriptor::from_le_bytes(desc_blob.as_ref());
410
411        let kinds = descriptor.get_indexes()?;
412        Ok(kinds)
413    }
414
415    /// Get the total number of rows in a column.
416    ///
417    /// Returns the persisted row count from the column's descriptor. This value is
418    /// updated by append and delete operations.
419    ///
420    /// # Errors
421    ///
422    /// Returns an error if the column doesn't exist or if the descriptor is corrupted.
423    pub fn total_rows_for_field(&self, field_id: LogicalFieldId) -> Result<u64> {
424        let catalog = self.catalog.read().unwrap();
425        let desc_pk = *catalog.map.get(&field_id).ok_or(Error::NotFound)?;
426        drop(catalog);
427
428        let desc_blob = self
429            .pager
430            .batch_get(&[BatchGet::Raw { key: desc_pk }])?
431            .pop()
432            .and_then(|r| match r {
433                GetResult::Raw { bytes, .. } => Some(bytes),
434                _ => None,
435            })
436            .ok_or(Error::NotFound)?;
437
438        let desc = ColumnDescriptor::from_le_bytes(desc_blob.as_ref());
439        Ok(desc.total_row_count)
440    }
441
442    /// Get the total number of rows in a table.
443    ///
444    /// This returns the maximum row count across all user-data columns in the table.
445    /// If the table has no persisted columns, returns `0`.
446    ///
447    /// # Errors
448    ///
449    /// Returns an error if column descriptors cannot be loaded.
450    pub fn total_rows_for_table(&self, table_id: TableId) -> Result<u64> {
451        use llkv_types::ids::LogicalStorageNamespace;
452        // Acquire read lock on catalog and find any matching user-data field
453        let catalog = self.catalog.read().unwrap();
454        // Collect all user-data logical field ids for this table.
455        let candidates: Vec<LogicalFieldId> = catalog
456            .map
457            .keys()
458            .filter(|fid| {
459                fid.namespace() == LogicalStorageNamespace::UserData && fid.table_id() == table_id
460            })
461            .copied()
462            .collect();
463        drop(catalog);
464
465        if candidates.is_empty() {
466            return Ok(0);
467        }
468
469        // Return the maximum total_row_count across all user columns for the table.
470        let mut max_rows: u64 = 0;
471        for field in candidates {
472            let rows = self.total_rows_for_field(field)?;
473            if rows > max_rows {
474                max_rows = rows;
475            }
476        }
477        Ok(max_rows)
478    }
479
480    /// Get all user-data column IDs for a table.
481    ///
482    /// This returns the [`LogicalFieldId`]s of all persisted user columns (namespace
483    /// `UserData`) belonging to the specified table. MVCC and row ID columns are not
484    /// included.
485    pub fn user_field_ids_for_table(
486        &self,
487        table_id: TableId,
488    ) -> Vec<LogicalFieldId> {
489        use llkv_types::ids::LogicalStorageNamespace;
490
491        let catalog = self.catalog.read().unwrap();
492        catalog
493            .map
494            .keys()
495            .filter(|fid| {
496                fid.namespace() == LogicalStorageNamespace::UserData && fid.table_id() == table_id
497            })
498            .copied()
499            .collect()
500    }
501
502    /// Remove a column from the column store catalog.
503    ///
504    /// This removes the catalog entries for the column and its row-id shadow, persists
505    /// the updated catalog, and frees the column's descriptor pages and data chunks back
506    /// to the pager. If freeing fails, the catalog entries are restored and the error is returned.
507    ///
508    /// # Arguments
509    ///
510    /// * `field_id` - The logical field ID of the column to remove.
511    ///
512    /// # Errors
513    ///
514    /// Returns an error if the column doesn't exist or if catalog persistence fails.
515    pub fn remove_column(&self, field_id: LogicalFieldId) -> Result<()> {
516        let rowid_field = rowid_fid(field_id);
517        let (descriptor_pk, rowid_descriptor_pk) = {
518            let catalog = self.catalog.read().unwrap();
519            let descriptor_pk = *catalog.map.get(&field_id).ok_or(Error::NotFound)?;
520            let rowid_descriptor_pk = catalog.map.get(&rowid_field).copied();
521            (descriptor_pk, rowid_descriptor_pk)
522        };
523
524        let mut free_keys: FxHashSet<PhysicalKey> = FxHashSet::default();
525        self.collect_descriptor_allocations(descriptor_pk, &mut free_keys)?;
526        if let Some(rid_pk) = rowid_descriptor_pk {
527            self.collect_descriptor_allocations(rid_pk, &mut free_keys)?;
528        }
529
530        // Remove catalog entries for both the value and row-id columns.
531        let mut catalog = self.catalog.write().unwrap();
532        let removed_value = catalog.map.remove(&field_id).is_some();
533        let removed_rowid = catalog.map.remove(&rowid_field);
534        drop(catalog);
535
536        if !removed_value {
537            return Err(Error::NotFound);
538        }
539
540        // Persist updated catalog
541        let catalog_bytes = {
542            let catalog = self.catalog.read().unwrap();
543            catalog.to_bytes()
544        };
545
546        self.pager.batch_put(&[BatchPut::Raw {
547            key: CATALOG_ROOT_PKEY,
548            bytes: catalog_bytes,
549        }])?;
550
551        if !free_keys.is_empty() {
552            let mut frees: Vec<PhysicalKey> = free_keys.into_iter().collect();
553            frees.sort_unstable();
554            if let Err(err) = self.pager.free_many(&frees) {
555                // Attempt to restore catalog entries before bubbling the error up.
556                let mut catalog = self.catalog.write().unwrap();
557                catalog.map.insert(field_id, descriptor_pk);
558                if let Some(rid_pk) = rowid_descriptor_pk.or(removed_rowid) {
559                    catalog.map.insert(rowid_field, rid_pk);
560                }
561                drop(catalog);
562
563                let restore_bytes = {
564                    let catalog = self.catalog.read().unwrap();
565                    catalog.to_bytes()
566                };
567
568                let _ = self.pager.batch_put(&[BatchPut::Raw {
569                    key: CATALOG_ROOT_PKEY,
570                    bytes: restore_bytes,
571                }]);
572
573                return Err(err);
574            }
575        }
576
577        Ok(())
578    }
579
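    /// Walks a column's descriptor chain and records every physical key it owns (the
    /// root descriptor, each descriptor page, each data chunk, and any value-order
    /// permutation blobs) into `keys` so the caller can free them in a single batch.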
580    fn collect_descriptor_allocations(
581        &self,
582        descriptor_pk: PhysicalKey,
583        keys: &mut FxHashSet<PhysicalKey>,
584    ) -> Result<()> {
585        if descriptor_pk == 0 {
586            return Ok(());
587        }
588
589        let Some(GetResult::Raw { bytes, .. }) = self
590            .pager
591            .batch_get(&[BatchGet::Raw { key: descriptor_pk }])?
592            .pop()
593        else {
594            return Ok(());
595        };
596
597        keys.insert(descriptor_pk);
598
599        let descriptor = ColumnDescriptor::from_le_bytes(bytes.as_ref());
600        let mut page_pk = descriptor.head_page_pk;
601
602        while page_pk != 0 {
603            let page_blob = self
604                .pager
605                .batch_get(&[BatchGet::Raw { key: page_pk }])?
606                .pop()
607                .and_then(|r| match r {
608                    GetResult::Raw { bytes, .. } => Some(bytes),
609                    _ => None,
610                })
611                .ok_or(Error::NotFound)?;
612            let page_bytes = page_blob.as_ref();
613            if page_bytes.len() < DescriptorPageHeader::DISK_SIZE {
614                return Err(Error::Internal("descriptor page truncated".into()));
615            }
616            let header =
617                DescriptorPageHeader::from_le_bytes(&page_bytes[..DescriptorPageHeader::DISK_SIZE]);
618            keys.insert(page_pk);
619
620            let mut offset = DescriptorPageHeader::DISK_SIZE;
621            for _ in 0..header.entry_count as usize {
622                let end = offset + ChunkMetadata::DISK_SIZE;
623                if end > page_bytes.len() {
624                    break;
625                }
626                let meta = ChunkMetadata::from_le_bytes(&page_bytes[offset..end]);
627                if meta.chunk_pk != 0 {
628                    keys.insert(meta.chunk_pk);
629                }
630                if meta.value_order_perm_pk != 0 {
631                    keys.insert(meta.value_order_perm_pk);
632                }
633                offset = end;
634            }
635
636            page_pk = header.next_page_pk;
637        }
638
639        Ok(())
640    }
641
642    /// Check whether a specific row ID exists in a column.
643    ///
644    /// Prunes row-id chunks using their min/max metadata, then binary searches each
645    /// candidate chunk: via the persisted sort permutation when one exists, otherwise
646    /// directly over the chunk, which appends and compaction keep sorted ascending.
647    ///
648    /// # Errors
649    ///
650    /// Returns an error if the column doesn't exist or if chunk data is corrupted.
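    ///
    /// # Example (sketch)
    ///
    /// Store and column setup are elided.
    ///
    /// ```ignore
    /// if store.has_row_id(field_id, 42)? {
    ///     // Row 42 is present; a follow-up scan can gather its values.
    /// }
    /// ```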
651    pub fn has_row_id(&self, field_id: LogicalFieldId, row_id: RowId) -> Result<bool> {
652        let rid_fid = rowid_fid(field_id);
653        let catalog = self.catalog.read().unwrap();
654        let rid_desc_pk = *catalog.map.get(&rid_fid).ok_or(Error::NotFound)?;
655        let rid_desc_blob = self
656            .pager
657            .batch_get(&[BatchGet::Raw { key: rid_desc_pk }])?
658            .pop()
659            .and_then(|r| match r {
660                GetResult::Raw { bytes, .. } => Some(bytes),
661                _ => None,
662            })
663            .ok_or(Error::NotFound)?;
664        let rid_desc = ColumnDescriptor::from_le_bytes(rid_desc_blob.as_ref());
665        drop(catalog);
666
667        // Walk metas; prune by min/max when available.
668        for m in DescriptorIterator::new(self.pager.as_ref(), rid_desc.head_page_pk) {
669            let meta = m?;
670            if meta.row_count == 0 {
671                continue;
672            }
673            if (meta.min_val_u64 != 0 || meta.max_val_u64 != 0)
674                && (row_id < meta.min_val_u64 || row_id > meta.max_val_u64)
675            {
676                continue;
677            }
678            // Fetch rid chunk and, if present, the presence perm
679            let mut gets = vec![BatchGet::Raw { key: meta.chunk_pk }];
680            if meta.value_order_perm_pk != 0 {
681                gets.push(BatchGet::Raw {
682                    key: meta.value_order_perm_pk,
683                });
684            }
685            let results = self.pager.batch_get(&gets)?;
686            let mut rid_blob: Option<EntryHandle> = None;
687            let mut perm_blob: Option<EntryHandle> = None;
688            for r in results {
689                if let GetResult::Raw { key, bytes } = r {
690                    if key == meta.chunk_pk {
691                        rid_blob = Some(bytes);
692                    } else if key == meta.value_order_perm_pk {
693                        perm_blob = Some(bytes);
694                    }
695                }
696            }
697            // If the rid blob for this chunk is missing, treat as absent and continue
698            let Some(rid_blob) = rid_blob else { continue };
699            let rid_any = deserialize_array(rid_blob)?;
700            let rids = rid_any
701                .as_any()
702                .downcast_ref::<UInt64Array>()
703                .ok_or_else(|| Error::Internal("rid downcast".into()))?;
704            if let Some(pblob) = perm_blob {
705                let perm_any = deserialize_array(pblob)?;
706                let perm = perm_any
707                    .as_any()
708                    .downcast_ref::<UInt32Array>()
709                    .ok_or_else(|| Error::Internal("perm not u32".into()))?;
710                // Binary search over sorted-by-perm view
711                let mut lo: isize = 0;
712                let mut hi: isize = (perm.len() as isize) - 1;
713                while lo <= hi {
714                    let mid = ((lo + hi) >> 1) as usize;
715                    let rid = rids.value(perm.value(mid) as usize);
716                    if rid == row_id {
717                        return Ok(true);
718                    } else if rid < row_id {
719                        lo = mid as isize + 1;
720                    } else {
721                        hi = mid as isize - 1;
722                    }
723                }
724            } else {
725                // Without a permutation, assume the rid chunk is sorted ascending (appends and compaction keep it so) and binary search directly.
726                let mut lo: isize = 0;
727                let mut hi: isize = (rids.len() as isize) - 1;
728                while lo <= hi {
729                    let mid = ((lo + hi) >> 1) as usize;
730                    let rid = rids.value(mid);
731                    if rid == row_id {
732                        return Ok(true);
733                    } else if rid < row_id {
734                        lo = mid as isize + 1;
735                    } else {
736                        hi = mid as isize - 1;
737                    }
738                }
739            }
740        }
741        Ok(false)
742    }
743
744    // NOTE: Row IDs must be provided explicitly today. Consider introducing an
745    // opt-in auto-increment mode once table-level schema metadata can enforce it.
746    /// Append a [`RecordBatch`] to the store.
747    ///
748    /// The batch must include a `rowid` column (type `UInt64`) that uniquely identifies
749    /// each row. Each other column must have `field_id` metadata mapping it to a
750    /// [`LogicalFieldId`].
751    ///
752    /// # Last-Write-Wins Updates
753    ///
754    /// If any row IDs in the batch already exist, those rows are rewritten in place
755    /// (overwritten) rather than duplicated. Rewrites are staged per column before the
756    /// remaining rows are appended, and everything commits in a single atomic batch.
757    ///
758    /// # Row ID Ordering
759    ///
760    /// The batch is automatically sorted by `rowid` if not already sorted. This ensures
761    /// efficient metadata updates and naturally sorted shadow columns.
762    ///
763    /// # Table Separation
764    ///
765    /// Each batch should contain columns from only one table. To append to multiple
766    /// tables, call `append` separately for each table's batch; those calls may run concurrently.
767    ///
768    /// # Errors
769    ///
770    /// Returns an error if:
771    /// - The batch is missing the `rowid` column
772    /// - Column metadata is missing or invalid
773    /// - Storage operations fail
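    ///
    /// # Example (sketch)
    ///
    /// A minimal single-column batch. The row-id column name and metadata key come
    /// from this crate's constants; the field-id value `4242` and the `score` column
    /// are arbitrary placeholders, and error handling is elided.
    ///
    /// ```ignore
    /// use std::collections::HashMap;
    /// use std::sync::Arc;
    /// use arrow::array::{ArrayRef, Int64Array, UInt64Array};
    /// use arrow::datatypes::{DataType, Field, Schema};
    /// use arrow::record_batch::RecordBatch;
    ///
    /// let schema = Arc::new(Schema::new(vec![
    ///     Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
    ///     Field::new("score", DataType::Int64, true).with_metadata(HashMap::from([(
    ///         crate::store::FIELD_ID_META_KEY.to_string(),
    ///         "4242".to_string(),
    ///     )])),
    /// ]));
    /// let batch = RecordBatch::try_new(
    ///     schema,
    ///     vec![
    ///         Arc::new(UInt64Array::from(vec![1u64, 2, 3])) as ArrayRef,
    ///         Arc::new(Int64Array::from(vec![Some(10), None, Some(30)])) as ArrayRef,
    ///     ],
    /// )?;
    /// store.append(&batch)?;
    /// // Appending row id 2 again later with a non-null value overwrites it (LWW).
    /// ```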
774    #[allow(unused_variables, unused_assignments)] // NOTE: Preserve variable hooks used during feature gating of presence indexes.
775    pub fn append(&self, batch: &RecordBatch) -> Result<()> {
776        tracing::trace!(
777            num_columns = batch.num_columns(),
778            num_rows = batch.num_rows(),
779            "ColumnStore::append BEGIN"
780        );
781
782        // --- PHASE 1: PRE-PROCESSING THE INCOMING BATCH ---
783        // The `append` logic relies on row IDs being processed in ascending order to handle
784        // metadata updates efficiently and to ensure the shadow row_id chunks are naturally sorted.
785        // This block checks if the incoming batch is already sorted by `row_id`. If not, it creates a
786        // new, sorted `RecordBatch` to work with for the rest of the function.
787        let working_batch: RecordBatch;
788        let batch_ref = {
789            let schema = batch.schema();
790            let row_id_idx = schema
791                .index_of(ROW_ID_COLUMN_NAME)
792                .map_err(|_| Error::Internal("row_id column required".into()))?;
793            let row_id_any = batch.column(row_id_idx).clone();
794            let row_id_arr = row_id_any
795                .as_any()
796                .downcast_ref::<UInt64Array>()
797                .ok_or_else(|| Error::Internal("row_id downcast failed".into()))?;
798
799            // Manually check if the row_id column is sorted.
800            let mut is_sorted = true;
801            if !row_id_arr.is_empty() {
802                let mut last = row_id_arr.value(0);
803                for i in 1..row_id_arr.len() {
804                    let current = row_id_arr.value(i);
805                    if current < last {
806                        is_sorted = false;
807                        break;
808                    }
809                    last = current;
810                }
811            }
812
813            // If sorted, we can use the original batch directly.
814            // Otherwise, we compute a permutation and reorder all columns.
815            if is_sorted {
816                batch
817            } else {
818                let sort_col = SortColumn {
819                    values: row_id_any,
820                    options: None,
821                };
822                let idx = lexsort_to_indices(&[sort_col], None)?;
823                let perm = idx
824                    .as_any()
825                    .downcast_ref::<UInt32Array>()
826                    .ok_or_else(|| Error::Internal("perm not u32".into()))?;
827                let mut cols: Vec<ArrayRef> = Vec::with_capacity(batch.num_columns());
828                for i in 0..batch.num_columns() {
829                    cols.push(compute::take(batch.column(i), perm, None)?);
830                }
831                working_batch = RecordBatch::try_new(schema.clone(), cols)
832                    .map_err(|e| Error::Internal(format!("record batch rebuild: {e}")))?;
833                &working_batch
834            }
835        };
836
837        tracing::trace!("ColumnStore::append PHASE 1 complete - batch preprocessed");
838
839        // --- PHASE 2: LWW REWRITE AND APPEND PER COLUMN ---
840        // We iterate through each column to perform rewrites and appends.
841        // This handles sparse updates correctly: if a row is rewritten in one column
842        // but not another, it will be appended to the column where it was missing.
843
844        let schema = batch_ref.schema();
845        let row_id_idx = schema
846            .index_of(ROW_ID_COLUMN_NAME)
847            .map_err(|_| Error::Internal("row_id column required".into()))?;
848
849        // Create a quick lookup map of incoming row IDs to their positions in the batch.
850        let row_id_arr = batch_ref
851            .column(row_id_idx)
852            .as_any()
853            .downcast_ref::<UInt64Array>()
854            .ok_or_else(|| Error::Internal("row_id downcast failed".into()))?;
855        let mut incoming_ids_map = FxHashMap::default();
856        incoming_ids_map.reserve(row_id_arr.len());
857        for i in 0..row_id_arr.len() {
858            incoming_ids_map.insert(row_id_arr.value(i), i);
859        }
860
861        let mut catalog_dirty = false;
862        let mut all_puts: Vec<BatchPut> = Vec::new();
863
864        // Iterate through each column in the batch (except row_id).
865        for i in 0..batch_ref.num_columns() {
866            if i == row_id_idx {
867                continue;
868            }
869            let field = schema.field(i);
870            let field_id =
871                if let Some(field_id_str) = field.metadata().get(crate::store::FIELD_ID_META_KEY) {
872                    field_id_str
873                        .parse::<u64>()
874                        .map(LogicalFieldId::from)
875                        .map_err(|e| Error::Internal(format!("Invalid field_id: {}", e)))?
876                } else {
877                    continue;
878                };
879
880            // 1. LWW Rewrite
881            let mut catalog_lock = self.catalog.write().unwrap();
882            let rewritten_ids = self.lww_rewrite_for_field(
883                &mut catalog_lock,
884                field_id,
885                &incoming_ids_map,
886                batch_ref.column(i),
887                batch_ref.column(row_id_idx),
888                &mut all_puts,
889            )?;
890
891            // Ensure catalog entries exist for append
892            let (descriptor_pk, rid_descriptor_pk, rid_fid) = {
893                let pk1 = *catalog_lock.map.entry(field_id).or_insert_with(|| {
894                    catalog_dirty = true;
895                    self.pager.alloc_many(1).unwrap()[0]
896                });
897                let r_fid = rowid_fid(field_id);
898                let pk2 = *catalog_lock.map.entry(r_fid).or_insert_with(|| {
899                    catalog_dirty = true;
900                    self.pager.alloc_many(1).unwrap()[0]
901                });
902                (pk1, pk2, r_fid)
903            };
904            drop(catalog_lock);
905
906            // 2. Filter for Append
907            let array = batch_ref.column(i);
908            let (array_clean, rids_clean) = if rewritten_ids.is_empty() {
909                // No rewrites, append everything (filtering nulls)
910                if array.null_count() == 0 {
911                    (array.clone(), batch_ref.column(row_id_idx).clone())
912                } else {
913                    let keep =
914                        BooleanArray::from_iter((0..array.len()).map(|j| Some(!array.is_null(j))));
915                    (
916                        compute::filter(array, &keep)?,
917                        compute::filter(batch_ref.column(row_id_idx), &keep)?,
918                    )
919                }
920            } else {
921                // Filter out rewritten rows AND nulls
922                let keep = BooleanArray::from_iter((0..array.len()).map(|j| {
923                    let rid = row_id_arr.value(j);
924                    Some(!rewritten_ids.contains(&rid) && !array.is_null(j))
925                }));
926                (
927                    compute::filter(array, &keep)?,
928                    compute::filter(batch_ref.column(row_id_idx), &keep)?,
929                )
930            };
931
932            if array_clean.is_empty() {
933                // Even if we have no data to append, we must ensure the column exists
934                // so that subsequent schema queries (e.g. data_type()) succeed.
935                self.ensure_column_registered(field_id, field.data_type())?;
936                continue;
937            }
938
939            // 3. Append the remaining (non-rewritten, non-null) rows as new chunks.
940            self.dtype_cache.insert(field_id, field.data_type().clone());
941
942            // Load the descriptors and their tail metadata pages into memory.
943            // If they don't exist, `load_or_create` will initialize new ones.
944            let (mut data_descriptor, mut data_tail_page) =
945                ColumnDescriptor::load_or_create(Arc::clone(&self.pager), descriptor_pk, field_id)
946                    .map_err(|e| {
947                        tracing::error!(?field_id, descriptor_pk, error = ?e, "append: load_or_create failed for data descriptor");
948                        e
949                    })?;
950            let (mut rid_descriptor, mut rid_tail_page) = ColumnDescriptor::load_or_create(
951                Arc::clone(&self.pager),
952                rid_descriptor_pk,
953                rid_fid,
954            )
955            .map_err(|e| {
956                tracing::error!(?rid_fid, rid_descriptor_pk, error = ?e, "append: load_or_create failed for rid descriptor");
957                e
958            })?;
959
960            // Logically register the Presence index on the main data descriptor. This ensures
961            // that even if no physical index chunks are created (because data arrived sorted),
962            // the system knows a Presence index is conceptually active for this column.
963            self.index_manager
964                .stage_index_registration(&mut data_descriptor, IndexKind::Presence)?;
965
966            // Split the data to be appended into chunks of a target size.
967            let slices = split_to_target_bytes(
968                &array_clean,
969                TARGET_CHUNK_BYTES,
970                self.cfg.varwidth_fallback_rows_per_slice,
971            );
972            let mut row_off = 0usize;
973
974            // Loop through each new slice to create and stage its chunks.
975            for s in slices {
976                let rows = s.len();
977                // Create and stage the data chunk.
978                let data_pk = self.pager.alloc_many(1)?[0];
979                let s_norm = zero_offset(&s);
980                let data_bytes = serialize_array(s_norm.as_ref())?;
981                all_puts.push(BatchPut::Raw {
982                    key: data_pk,
983                    bytes: data_bytes,
984                });
985
986                // Create and stage the corresponding row_id chunk.
987                let rid_slice: ArrayRef = rids_clean.slice(row_off, rows);
988                let rid_norm = zero_offset(&rid_slice);
989                let rid_pk = self.pager.alloc_many(1)?[0];
990                let rid_bytes = serialize_array(rid_norm.as_ref())?;
991                all_puts.push(BatchPut::Raw {
992                    key: rid_pk,
993                    bytes: rid_bytes,
994                });
995
996                // Compute min/max for the row_id chunk to enable pruning during scans.
997                let rids_for_meta = rid_norm.as_any().downcast_ref::<UInt64Array>().unwrap();
998                let (min, max) = if !rids_for_meta.is_empty() {
999                    let mut min_val = rids_for_meta.value(0);
1000                    let mut max_val = rids_for_meta.value(0);
1001                    for i in 1..rids_for_meta.len() {
1002                        let v = rids_for_meta.value(i);
1003                        if v < min_val {
1004                            min_val = v;
1005                        }
1006                        if v > max_val {
1007                            max_val = v;
1008                        }
1009                    }
1010                    (min_val, max_val)
1011                } else {
1012                    (0, 0)
1013                };
1014
1015                // Create the initial metadata for both chunks.
1016                // The `value_order_perm_pk` is initialized to 0 (None).
1017                let mut data_meta = ChunkMetadata {
1018                    chunk_pk: data_pk,
1019                    row_count: rows as u64,
1020                    serialized_bytes: s_norm.get_array_memory_size() as u64,
1021                    max_val_u64: u64::MAX,
1022                    ..Default::default()
1023                };
1024                let mut rid_meta = ChunkMetadata {
1025                    chunk_pk: rid_pk,
1026                    row_count: rows as u64,
1027                    serialized_bytes: rid_norm.get_array_memory_size() as u64,
1028                    min_val_u64: min,
1029                    max_val_u64: max,
1030                    ..Default::default()
1031                };
1032
1033                // **GENERIC INDEX UPDATE DISPATCH**
1034                // This is the single, index-agnostic call. The IndexManager will look up all
1035                // active indexes for this column (e.g., Presence, Sort) and call their respective
1036                // `stage_update_for_new_chunk` methods. This is where the physical index data
1037                // (like permutation blobs) is created and staged.
1038                self.index_manager.stage_updates_for_new_chunk(
1039                    field_id,
1040                    &data_descriptor,
1041                    &s_norm,
1042                    &rid_norm,
1043                    &mut data_meta,
1044                    &mut rid_meta,
1045                    &mut all_puts,
1046                )?;
1047
1048                // Append the (potentially modified) metadata to their respective descriptor chains.
1049                self.append_meta_in_loop(
1050                    &mut data_descriptor,
1051                    &mut data_tail_page,
1052                    data_meta,
1053                    &mut all_puts,
1054                )?;
1055                self.append_meta_in_loop(
1056                    &mut rid_descriptor,
1057                    &mut rid_tail_page,
1058                    rid_meta,
1059                    &mut all_puts,
1060                )?;
1061                row_off += rows;
1062            }
1063
1064            // After processing all slices, stage the final writes for the updated tail pages
1065            // and the root descriptor objects themselves.
1066            all_puts.push(BatchPut::Raw {
1067                key: data_descriptor.tail_page_pk,
1068                bytes: data_tail_page,
1069            });
1070            all_puts.push(BatchPut::Raw {
1071                key: descriptor_pk,
1072                bytes: data_descriptor.to_le_bytes(),
1073            });
1074            all_puts.push(BatchPut::Raw {
1075                key: rid_descriptor.tail_page_pk,
1076                bytes: rid_tail_page,
1077            });
1078            all_puts.push(BatchPut::Raw {
1079                key: rid_descriptor_pk,
1080                bytes: rid_descriptor.to_le_bytes(),
1081            });
1082        }
1083
1084        // --- PHASE 3: FINAL ATOMIC COMMIT ---
1085        if catalog_dirty {
1086            let catalog = self.catalog.read().unwrap();
1087            all_puts.push(BatchPut::Raw {
1088                key: CATALOG_ROOT_PKEY,
1089                bytes: catalog.to_bytes(),
1090            });
1091        }
1092
1093        // Commit all staged puts (new data chunks, new row_id chunks, new index permutations,
1094        // updated descriptor pages, updated root descriptors, and the updated catalog)
1095        // in a single atomic operation.
1096        if !all_puts.is_empty() {
1097            self.pager.batch_put(&all_puts)?;
1098        }
1099        tracing::trace!("ColumnStore::append END - success");
1100        Ok(())
1101    }
1102
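    /// Last-write-wins rewrite pass for a single column.
    ///
    /// Scans existing row-id chunks whose [min, max] range overlaps the incoming row
    /// ids, rewrites chunks that contain matches (upserting rows with non-null incoming
    /// values, dropping rows whose incoming value is null), stages the updated chunks
    /// and descriptor pages into `puts`, and returns the set of row ids handled here so
    /// `append` can exclude them from the fresh-append path.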
1103    fn lww_rewrite_for_field(
1104        &self,
1105        catalog: &mut ColumnCatalog,
1106        field_id: LogicalFieldId,
1107        incoming_ids_map: &FxHashMap<u64, usize>,
1108        incoming_data: &ArrayRef,
1109        incoming_row_ids: &ArrayRef,
1110        puts: &mut Vec<BatchPut>,
1111    ) -> Result<FxHashSet<u64>> {
1112        use crate::store::descriptor::DescriptorIterator;
1113        use crate::store::ingest::ChunkEdit;
1114
1115        // Fast exit if nothing to rewrite.
1116        if incoming_ids_map.is_empty() {
1117            return Ok(FxHashSet::default());
1118        }
1119        let incoming_ids: FxHashSet<u64> = incoming_ids_map.keys().copied().collect();
1120
1121        // Resolve descriptors for data and row_id columns.
1122        let desc_pk_data = match catalog.map.get(&field_id) {
1123            Some(pk) => *pk,
1124            None => return Ok(FxHashSet::default()),
1125        };
1126        let rid_fid = rowid_fid(field_id);
1127        let desc_pk_rid = match catalog.map.get(&rid_fid) {
1128            Some(pk) => *pk,
1129            None => return Ok(FxHashSet::default()),
1130        };
1131
1132        // Batch fetch both descriptors.
1133        let gets = vec![
1134            BatchGet::Raw { key: desc_pk_data },
1135            BatchGet::Raw { key: desc_pk_rid },
1136        ];
1137        let results = self.pager.batch_get(&gets)?;
1138        let mut blobs_by_pk = FxHashMap::default();
1139        for r in results {
1140            if let GetResult::Raw { key, bytes } = r {
1141                blobs_by_pk.insert(key, bytes);
1142            }
1143        }
1144
1145        let desc_blob_data = blobs_by_pk.remove(&desc_pk_data).ok_or_else(|| {
1146            tracing::error!(
1147                ?field_id,
1148                desc_pk_data,
1149                "lww_rewrite: data descriptor blob not found in pager"
1150            );
1151            Error::NotFound
1152        })?;
1153        let mut descriptor_data = ColumnDescriptor::from_le_bytes(desc_blob_data.as_ref());
1154
1155        let desc_blob_rid = blobs_by_pk.remove(&desc_pk_rid).ok_or_else(|| {
1156            tracing::error!(
1157                ?rid_fid,
1158                desc_pk_rid,
1159                "lww_rewrite: rid descriptor blob not found in pager"
1160            );
1161            Error::NotFound
1162        })?;
1163        let mut descriptor_rid = ColumnDescriptor::from_le_bytes(desc_blob_rid.as_ref());
1164
1165        tracing::trace!(?field_id, "lww_rewrite: descriptors loaded successfully");
1166
1167        // Collect chunk metadata.
1168        let mut metas_data: Vec<ChunkMetadata> = Vec::new();
1169        let mut metas_rid: Vec<ChunkMetadata> = Vec::new();
1170        for m in DescriptorIterator::new(self.pager.as_ref(), descriptor_data.head_page_pk) {
1171            metas_data.push(m.map_err(|e| {
1172                tracing::error!(?field_id, error = ?e, "lww_rewrite: failed to iterate data descriptor");
1173                e
1174            })?);
1175        }
1176        for m in DescriptorIterator::new(self.pager.as_ref(), descriptor_rid.head_page_pk) {
1177            metas_rid.push(m.map_err(|e| {
1178                tracing::error!(?rid_fid, error = ?e, "lww_rewrite: failed to iterate rid descriptor");
1179                e
1180            })?);
1181        }
1182
1183        tracing::trace!(
1184            ?field_id,
1185            data_chunks = metas_data.len(),
1186            rid_chunks = metas_rid.len(),
1187            "lww_rewrite: chunk metadata collected"
1188        );
1189
1190        // Classify incoming rows: delete vs upsert.
1191        let rid_in = incoming_row_ids
1192            .as_any()
1193            .downcast_ref::<UInt64Array>()
1194            .ok_or_else(|| Error::Internal("row_id must be u64".into()))?;
1195        let mut ids_to_delete = FxHashSet::default();
1196        let mut ids_to_upsert = FxHashSet::default();
1197        for i in 0..rid_in.len() {
1198            let rid = rid_in.value(i);
1199            if incoming_data.is_null(i) {
1200                ids_to_delete.insert(rid);
1201            } else {
1202                ids_to_upsert.insert(rid);
1203            }
1204        }
1205
1206        // Compute incoming row_id range once for optimization checks
1207        let (incoming_min, incoming_max) = if !rid_in.is_empty() {
1208            let mut min_val = rid_in.value(0);
1209            let mut max_val = rid_in.value(0);
1210            for i in 1..rid_in.len() {
1211                let rid = rid_in.value(i);
1212                if rid < min_val {
1213                    min_val = rid;
1214                }
1215                if rid > max_val {
1216                    max_val = rid;
1217                }
1218            }
1219            (min_val, max_val)
1220        } else {
1221            (0, 0)
1222        };
1223
1224        // OPTIMIZATION: Early exit if incoming row_id range doesn't overlap with existing chunks.
1225        // This makes append-only workloads O(1) instead of O(n).
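        //
        // Worked example: an existing rid chunk covers [100, 199] and the incoming
        // batch covers [200, 250]. Overlap would require 200 <= 199 AND 100 <= 250;
        // the first test fails, so the chunk fetch and scan are skipped entirely.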
1226        let n = metas_data.len().min(metas_rid.len());
1227        if n > 0 && !rid_in.is_empty() {
1228            // Check if incoming range overlaps with ANY existing chunk
1229            let mut has_overlap = false;
1230            for meta_rid in metas_rid.iter().take(n) {
1231                // Check for range overlap: [a, b] overlaps [c, d] if a <= d AND c <= b
1232                if incoming_min <= meta_rid.max_val_u64 && meta_rid.min_val_u64 <= incoming_max {
1233                    has_overlap = true;
1234                    break;
1235                }
1236            }
1237
1238            if !has_overlap {
1239                // No overlap means these are all new rows - skip expensive chunk scanning
1240                tracing::trace!(
1241                    ?field_id,
1242                    incoming_min,
1243                    incoming_max,
1244                    "lww_rewrite: no overlap detected, skipping chunk scan (O(1) fast path)"
1245                );
1246                return Ok(FxHashSet::default());
1247            }
1248        }
1249
1250        // Scan row_id chunks to find hits, bucketed by chunk index.
1251        let mut rewritten_ids = FxHashSet::default();
1252        let mut hit_up: FxHashMap<usize, Vec<u64>> = FxHashMap::default();
1253        let mut hit_del: FxHashMap<usize, Vec<u64>> = FxHashMap::default();
1254
1255        if n > 0 {
1256            // Filter chunks by row_id range overlap before fetching
1257            let mut chunks_to_fetch = Vec::new();
1258            for (idx, meta_rid) in metas_rid.iter().enumerate().take(n) {
1259                if incoming_min <= meta_rid.max_val_u64 && meta_rid.min_val_u64 <= incoming_max {
1260                    chunks_to_fetch.push(idx);
1261                }
1262            }
1263
1264            if chunks_to_fetch.is_empty() {
1265                return Ok(FxHashSet::default());
1266            }
1267
1268            // Fetch only overlapping row_id chunks to locate matches.
1269            let mut gets_rid = Vec::with_capacity(chunks_to_fetch.len());
1270            for &idx in &chunks_to_fetch {
1271                gets_rid.push(BatchGet::Raw {
1272                    key: metas_rid[idx].chunk_pk,
1273                });
1274            }
1275            let rid_results = self.pager.batch_get(&gets_rid)?;
1276            let mut rid_blobs: FxHashMap<PhysicalKey, EntryHandle> = FxHashMap::default();
1277            for r in rid_results {
1278                if let GetResult::Raw { key, bytes } = r {
1279                    rid_blobs.insert(key, bytes);
1280                }
1281            }
1282
1283            for &idx in &chunks_to_fetch {
1284                let meta_rid = &metas_rid[idx];
1285                if let Some(rid_blob) = rid_blobs.get(&meta_rid.chunk_pk) {
1286                    let rid_arr_any = deserialize_array(rid_blob.clone())?;
1287                    let rid_arr = rid_arr_any
1288                        .as_any()
1289                        .downcast_ref::<UInt64Array>()
1290                        .ok_or_else(|| Error::Internal("rid type mismatch".into()))?;
1291                    for j in 0..rid_arr.len() {
1292                        let rid = rid_arr.value(j);
1293                        if incoming_ids.contains(&rid) {
1294                            if ids_to_delete.contains(&rid) {
1295                                hit_del.entry(idx).or_default().push(rid);
1296                            } else if ids_to_upsert.contains(&rid) {
1297                                hit_up.entry(idx).or_default().push(rid);
1298                            }
1299                            rewritten_ids.insert(rid);
1300                        }
1301                    }
1302                }
1303            }
1304        }
1305
1306        if hit_up.is_empty() && hit_del.is_empty() {
1307            return Ok(rewritten_ids);
1308        }
1309
1310        // Batch fetch data+rid blobs for all chunks with hits.
1311        let mut hit_set = FxHashSet::default();
1312        hit_set.extend(hit_up.keys().copied());
1313        hit_set.extend(hit_del.keys().copied());
1314        let hit_idxs: Vec<usize> = hit_set.into_iter().collect();
1315
1316        let mut gets = Vec::with_capacity(hit_idxs.len() * 2);
1317        for &i in &hit_idxs {
1318            gets.push(BatchGet::Raw {
1319                key: metas_data[i].chunk_pk,
1320            });
1321            gets.push(BatchGet::Raw {
1322                key: metas_rid[i].chunk_pk,
1323            });
1324        }
1325        let results = self.pager.batch_get(&gets)?;
1326        let mut blob_map: FxHashMap<PhysicalKey, EntryHandle> = FxHashMap::default();
1327        for r in results {
1328            if let GetResult::Raw { key, bytes } = r {
1329                blob_map.insert(key, bytes);
1330            }
1331        }
1332
1333        // Apply per-chunk edits and stage writebacks.
1334        for i in hit_idxs {
1335            let old_data_arr =
1336                deserialize_array(blob_map.get(&metas_data[i].chunk_pk).unwrap().clone())?;
1337            let old_rid_arr_any =
1338                deserialize_array(blob_map.get(&metas_rid[i].chunk_pk).unwrap().clone())?;
1339            let old_rid_arr = old_rid_arr_any
1340                .as_any()
1341                .downcast_ref::<UInt64Array>()
1342                .unwrap();
1343
1344            let up_vec = hit_up.remove(&i).unwrap_or_default();
1345            let del_vec = hit_del.remove(&i).unwrap_or_default();
1346
1347            // Centralized LWW edit: builds the keep mask and injects the replacement tails.
1348            let edit = ChunkEdit::from_lww_upsert(
1349                old_rid_arr,
1350                &up_vec,
1351                &del_vec,
1352                incoming_data,
1353                incoming_row_ids,
1354                incoming_ids_map,
1355            )?;
1356
1357            let (new_data_arr, new_rid_arr) =
1358                ChunkEdit::apply_edit_to_arrays(&old_data_arr, Some(&old_rid_arr_any), &edit)?;
1359
1360            // Stage data writeback.
1361            let data_bytes = serialize_array(&new_data_arr)?;
1362            puts.push(BatchPut::Raw {
1363                key: metas_data[i].chunk_pk,
1364                bytes: data_bytes,
1365            });
1366            metas_data[i].row_count = new_data_arr.len() as u64;
1367            metas_data[i].serialized_bytes = new_data_arr.get_array_memory_size() as u64;
1368
1369            // Stage row_id writeback.
1370            if let Some(rarr) = new_rid_arr {
1371                let rid_bytes = serialize_array(&rarr)?;
1372                puts.push(BatchPut::Raw {
1373                    key: metas_rid[i].chunk_pk,
1374                    bytes: rid_bytes,
1375                });
1376                metas_rid[i].row_count = rarr.len() as u64;
1377                metas_rid[i].serialized_bytes = rarr.get_array_memory_size() as u64;
1378            }
1379
1380            // Refresh permutation if present.
1381            if metas_data[i].value_order_perm_pk != 0 {
1382                let sort_col = SortColumn {
1383                    values: new_data_arr,
1384                    options: None,
1385                };
1386                let idx = lexsort_to_indices(&[sort_col], None)?;
1387                let perm_bytes = serialize_array(&idx)?;
1388                puts.push(BatchPut::Raw {
1389                    key: metas_data[i].value_order_perm_pk,
1390                    bytes: perm_bytes,
1391                });
1392            }
1393        }
1394
1395        // Rewrite descriptor chains/totals for both columns.
1396        descriptor_data.rewrite_pages(
1397            Arc::clone(&self.pager),
1398            desc_pk_data,
1399            &mut metas_data,
1400            puts,
1401        )?;
1402        descriptor_rid.rewrite_pages(Arc::clone(&self.pager), desc_pk_rid, &mut metas_rid, puts)?;
1403
1404        Ok(rewritten_ids)
1405    }
1406
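    /// Stages chunk rewrites that delete the given global row positions from one
    /// field's data (and, when present, row-id) chunks, pushing the updated blobs
    /// and descriptor pages into `staged_puts` without committing them. Returns
    /// `true` if any chunk was modified.
    ///
    /// Each chunk covers a half-open window of global positions; deletes are
    /// translated into chunk-local indices and applied as a keep mask. The real
    /// array surgery is delegated to `ChunkEdit`; the snippet below is only a
    /// simplified sketch of the keep-mask idea using plain Arrow calls.
    ///
    /// ```
    /// use arrow::array::{Array, BooleanArray, UInt64Array};
    /// use arrow::compute::filter;
    ///
    /// // One chunk covering global positions [100, 104).
    /// let chunk = UInt64Array::from(vec![10_u64, 11, 12, 13]);
    /// let chunk_start = 100_u64;
    ///
    /// // Global delete positions that fall inside this chunk's window.
    /// let deletes = [101_u64, 103];
    ///
    /// // Translate to local indices, build a keep mask, and filter.
    /// let keep: Vec<bool> = (0..chunk.len())
    ///     .map(|i| !deletes.contains(&(chunk_start + i as u64)))
    ///     .collect();
    /// let kept = filter(&chunk, &BooleanArray::from(keep)).unwrap();
    /// assert_eq!(kept.len(), 2);
    /// ```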
1407    fn stage_delete_rows_for_field(
1408        &self,
1409        field_id: LogicalFieldId,
1410        rows_to_delete: &[RowId],
1411        staged_puts: &mut Vec<BatchPut>,
1412    ) -> Result<bool> {
1413        tracing::trace!(
1414            field_id = ?field_id,
1415            rows = rows_to_delete.len(),
1416            "delete_rows stage_delete_rows_for_field: start"
1417        );
1418        use crate::store::descriptor::DescriptorIterator;
1419        use crate::store::ingest::ChunkEdit;
1420
1421        if rows_to_delete.is_empty() {
1422            return Ok(false);
1423        }
1424
1425        // Stream the delete positions; out-of-order input is rejected below.
1426        let mut del_iter = rows_to_delete.iter().copied();
1427        let mut cur_del = del_iter.next();
1428        let mut last_seen: Option<u64> = cur_del;
1429
1430        // Lookup descriptors (data and optional row_id).
1431        let catalog = self.catalog.read().unwrap();
1432        let desc_pk = match catalog.map.get(&field_id) {
1433            Some(pk) => *pk,
1434            None => {
1435                tracing::trace!(
1436                    field_id = ?field_id,
1437                    "delete_rows stage_delete_rows_for_field: data descriptor missing"
1438                );
1439                return Err(Error::NotFound);
1440            }
1441        };
1442        let rid_fid = rowid_fid(field_id);
1443        let desc_pk_rid = catalog.map.get(&rid_fid).copied();
1444        tracing::trace!(
1445            field_id = ?field_id,
1446            desc_pk,
1447            desc_pk_rid = ?desc_pk_rid,
1448            "delete_rows stage_delete_rows_for_field: descriptor keys"
1449        );
1450
1451        // Batch fetch descriptor blobs up front.
1452        let mut gets = vec![BatchGet::Raw { key: desc_pk }];
1453        if let Some(pk) = desc_pk_rid {
1454            gets.push(BatchGet::Raw { key: pk });
1455        }
1456        let results = match self.pager.batch_get(&gets) {
1457            Ok(res) => res,
1458            Err(err) => {
1459                tracing::trace!(
1460                    field_id = ?field_id,
1461                    error = ?err,
1462                    "delete_rows stage_delete_rows_for_field: descriptor batch_get failed"
1463                );
1464                return Err(err);
1465            }
1466        };
1467        let mut blobs_by_pk = FxHashMap::default();
1468        for res in results {
1469            if let GetResult::Raw { key, bytes } = res {
1470                blobs_by_pk.insert(key, bytes);
1471            }
1472        }
1473
1474        tracing::trace!(
1475            field_id = ?field_id,
1476            desc_blob_found = blobs_by_pk.contains_key(&desc_pk),
1477            rid_blob_found = desc_pk_rid.map(|pk| blobs_by_pk.contains_key(&pk)),
1478            "delete_rows stage_delete_rows_for_field: descriptor fetch status"
1479        );
1480
1481        let desc_blob = blobs_by_pk.remove(&desc_pk).ok_or_else(|| {
1482            tracing::trace!(
1483                field_id = ?field_id,
1484                desc_pk,
1485                "delete_rows stage_delete_rows_for_field: descriptor blob missing"
1486            );
1487            Error::Internal(format!(
1488                "descriptor pk={} missing during delete_rows for field {:?}",
1489                desc_pk, field_id
1490            ))
1491        })?;
1492        let mut descriptor = ColumnDescriptor::from_le_bytes(desc_blob.as_ref());
1493
1494        // Build metas for data column.
1495        let mut metas: Vec<ChunkMetadata> = Vec::new();
1496        for m in DescriptorIterator::new(self.pager.as_ref(), descriptor.head_page_pk) {
1497            metas.push(m?);
1498        }
1499        if metas.is_empty() {
1500            drop(catalog);
1501            return Ok(false);
1502        }
1503
1504        // Optionally mirror metas for row_id column.
1505        let mut metas_rid: Vec<ChunkMetadata> = Vec::new();
1506        let mut descriptor_rid: Option<ColumnDescriptor> = None;
1507        tracing::trace!(
1508            field_id = ?field_id,
1509            metas_len = metas.len(),
1510            desc_pk_rid = ?desc_pk_rid,
1511            "delete_rows stage_delete_rows_for_field: data metas loaded"
1512        );
1513        if let Some(pk_rid) = desc_pk_rid
1514            && let Some(desc_blob_rid) = blobs_by_pk.remove(&pk_rid)
1515        {
1516            let d_rid = ColumnDescriptor::from_le_bytes(desc_blob_rid.as_ref());
1517            for m in DescriptorIterator::new(self.pager.as_ref(), d_rid.head_page_pk) {
1518                metas_rid.push(m?);
1519            }
1520            descriptor_rid = Some(d_rid);
1521        }
1522
1523        tracing::trace!(
1524            field_id = ?field_id,
1525            metas_rid_len = metas_rid.len(),
1526            "delete_rows stage_delete_rows_for_field: rowid metas loaded"
1527        );
1528
1529        let mut cum_rows = 0u64;
1530        let mut any_changed = false;
1531
1532        for (i, meta) in metas.iter_mut().enumerate() {
1533            let start_u64 = cum_rows;
1534            let end_u64 = start_u64 + meta.row_count;
1535
1536            // Skip (and order-check) deletes that fall before this chunk's window.
1537            while let Some(d) = cur_del {
1538                if d < start_u64
1539                    && let Some(prev) = last_seen
1540                {
1541                    if d < prev {
1542                        return Err(Error::Internal(
1543                            "rows_to_delete must be ascending/unique".into(),
1544                        ));
1545                    }
1546
1547                    last_seen = Some(d);
1548                    cur_del = del_iter.next();
1549                } else {
1550                    break;
1551                }
1552            }
1553
1554            // Collect local delete indices.
1555            let rows = meta.row_count as usize;
1556            let mut del_local: FxHashSet<usize> = FxHashSet::default();
1557            while let Some(d) = cur_del {
1558                if d >= end_u64 {
1559                    break;
1560                }
1561                del_local.insert((d - start_u64) as usize);
1562                last_seen = Some(d);
1563                cur_del = del_iter.next();
1564            }
1565
1566            if del_local.is_empty() {
1567                cum_rows = end_u64;
1568                continue;
1569            }
1570
1571            // Batch get chunk blobs (data and optional row_id).
1572            let mut chunk_gets = vec![BatchGet::Raw { key: meta.chunk_pk }];
1573            if let Some(rm) = metas_rid.get(i) {
1574                chunk_gets.push(BatchGet::Raw { key: rm.chunk_pk });
1575            }
1576            let chunk_results = match self.pager.batch_get(&chunk_gets) {
1577                Ok(res) => res,
1578                Err(err) => {
1579                    tracing::trace!(
1580                        field_id = ?field_id,
1581                        chunk_pk = meta.chunk_pk,
1582                        error = ?err,
1583                        "delete_rows stage_delete_rows_for_field: chunk batch_get failed"
1584                    );
1585                    return Err(err);
1586                }
1587            };
1588            let mut chunk_blobs = FxHashMap::default();
1589            for res in chunk_results {
1590                if let GetResult::Raw { key, bytes } = res {
1591                    chunk_blobs.insert(key, bytes);
1592                }
1593            }
1594
1595            tracing::trace!(
1596                field_id = ?field_id,
1597                chunk_pk = meta.chunk_pk,
1598                rid_chunk_pk = metas_rid.get(i).map(|rm| rm.chunk_pk),
1599                data_found = chunk_blobs.contains_key(&meta.chunk_pk),
1600                rid_found = metas_rid
1601                    .get(i)
1602                    .map(|rm| chunk_blobs.contains_key(&rm.chunk_pk)),
1603                "delete_rows stage_delete_rows_for_field: chunk fetch status"
1604            );
1605
1606            let data_blob = match chunk_blobs.remove(&meta.chunk_pk) {
1607                Some(bytes) => bytes,
1608                None => {
1609                    tracing::trace!(
1610                        field_id = ?field_id,
1611                        chunk_pk = meta.chunk_pk,
1612                        "delete_rows stage_delete_rows_for_field: chunk missing"
1613                    );
1614                    return Err(Error::NotFound);
1615                }
1616            };
1617            let data_arr = deserialize_array(data_blob)?;
1618
1619            let rid_arr_any = if let Some(rm) = metas_rid.get(i) {
1620                let rid_blob = match chunk_blobs.remove(&rm.chunk_pk) {
1621                    Some(bytes) => bytes,
1622                    None => {
1623                        tracing::trace!(
1624                            field_id = ?field_id,
1625                            rowid_chunk_pk = rm.chunk_pk,
1626                            "delete_rows stage_delete_rows_for_field: rowid chunk missing"
1627                        );
1628                        return Err(Error::NotFound);
1629                    }
1630                };
1631                Some(deserialize_array(rid_blob)?)
1632            } else {
1633                None
1634            };
1635
1636            // Build the chunk edit (delete keep mask) via the ingest helper.
1637            let edit = ChunkEdit::from_delete_indices(rows, &del_local);
1638
1639            // Apply edit (pure array ops).
1640            let (new_data_arr, new_rid_arr) =
1641                ChunkEdit::apply_edit_to_arrays(&data_arr, rid_arr_any.as_ref(), &edit)?;
1642
1643            // Write back data.
1644            let data_bytes = serialize_array(&new_data_arr)?;
1645            staged_puts.push(BatchPut::Raw {
1646                key: meta.chunk_pk,
1647                bytes: data_bytes,
1648            });
1649            meta.row_count = new_data_arr.len() as u64;
1650            meta.serialized_bytes = new_data_arr.get_array_memory_size() as u64;
1651
1652            // Write back row_ids if present.
            if let (Some(rm), Some(rids)) = (metas_rid.get_mut(i), new_rid_arr) {
1655                let rid_bytes = serialize_array(&rids)?;
1656                staged_puts.push(BatchPut::Raw {
1657                    key: rm.chunk_pk,
1658                    bytes: rid_bytes,
1659                });
1660                rm.row_count = rids.len() as u64;
1661                rm.serialized_bytes = rids.get_array_memory_size() as u64;
1662            }
1663
1664            // Refresh permutation if this chunk has one.
1665            if meta.value_order_perm_pk != 0 {
1666                let sort_column = SortColumn {
1667                    values: new_data_arr,
1668                    options: None,
1669                };
1670                let indices = lexsort_to_indices(&[sort_column], None)?;
1671                let perm_bytes = serialize_array(&indices)?;
1672                staged_puts.push(BatchPut::Raw {
1673                    key: meta.value_order_perm_pk,
1674                    bytes: perm_bytes,
1675                });
1676            }
1677
1678            cum_rows = end_u64;
1679            any_changed = true;
1680        }
1681
1682        // Rewrite descriptor chains/totals and commit.
1683        descriptor.rewrite_pages(Arc::clone(&self.pager), desc_pk, &mut metas, staged_puts)?;
1684        if let (Some(rid_pk), Some(mut rid_desc)) = (desc_pk_rid, descriptor_rid) {
1685            rid_desc.rewrite_pages(Arc::clone(&self.pager), rid_pk, &mut metas_rid, staged_puts)?;
1686        }
1687        drop(catalog);
1688        tracing::trace!(
1689            field_id = ?field_id,
1690            changed = any_changed,
1691            "delete_rows stage_delete_rows_for_field: finished stage"
1692        );
1693        Ok(any_changed)
1694    }
1695
1696    /// Delete row positions for one or more logical fields in a single atomic batch.
1697    ///
1698    /// The same set of global row positions is applied to every field in
1699    /// `fields`. All staged metadata and chunk updates are committed in a
1700    /// single pager batch.
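    ///
    /// A minimal illustration of the call shape (obtaining the store and the
    /// field ids depends on surrounding crate APIs, so this is not a runnable
    /// doctest):
    ///
    /// ```ignore
    /// // Positions are global row positions, ascending and unique, and every
    /// // field must belong to the same table.
    /// store.delete_rows(&[col_a, col_b], &[3, 7, 42])?;
    /// ```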
1701    pub fn delete_rows(&self, fields: &[LogicalFieldId], rows_to_delete: &[RowId]) -> Result<()> {
1702        if fields.is_empty() || rows_to_delete.is_empty() {
1703            return Ok(());
1704        }
1705
1706        let mut puts = Vec::new();
1707        let mut touched: FxHashSet<LogicalFieldId> = FxHashSet::default();
1708        let mut table_id: Option<TableId> = None;
1709
1710        tracing::trace!(
1711            fields = fields.len(),
1712            rows = rows_to_delete.len(),
1713            "delete_rows begin"
1714        );
1715        for field_id in fields {
1716            tracing::trace!(field = ?field_id, "delete_rows iter field");
1717            if let Some(expected) = table_id {
1718                if field_id.table_id() != expected {
1719                    return Err(Error::InvalidArgumentError(
1720                        "delete_rows requires fields from the same table".into(),
1721                    ));
1722                }
1723            } else {
1724                table_id = Some(field_id.table_id());
1725            }
1726
1727            if self.stage_delete_rows_for_field(*field_id, rows_to_delete, &mut puts)? {
1728                touched.insert(*field_id);
1729            }
1730        }
1731
1732        if puts.is_empty() {
1733            return Ok(());
1734        }
1735
1736        self.pager.batch_put(&puts)?;
1737
1738        tracing::trace!(touched = touched.len(), "delete_rows apply writes");
1739
1740        for field_id in touched {
1741            self.compact_field_bounded(field_id)?;
1742        }
1743        tracing::trace!("delete_rows complete");
1744        Ok(())
1745    }
1746
1747    // TODO: Move to descriptor module?
1748    // NOTE: This helper mutates descriptor metadata in place because it needs
1749    // access to the pager and catalog locks owned by `ColumnStore`.
1750    /// Write a descriptor chain from a meta slice. Reuses first page when
1751    /// possible. Frees surplus pages via `frees`.
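    ///
    /// The page count is a ceiling division of the meta count by
    /// `DESCRIPTOR_ENTRIES_PER_PAGE`. A worked sketch with an illustrative
    /// capacity (the real constant is defined elsewhere in this crate):
    ///
    /// ```
    /// let entries_per_page = 4_usize; // illustrative value only
    /// assert_eq!(0_usize.div_ceil(entries_per_page), 0); // empty: free every page
    /// assert_eq!(4_usize.div_ceil(entries_per_page), 1);
    /// assert_eq!(5_usize.div_ceil(entries_per_page), 2);
    /// ```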
1752    pub(crate) fn write_descriptor_chain(
1753        &self,
1754        descriptor_pk: PhysicalKey,
1755        descriptor: &mut ColumnDescriptor,
1756        new_metas: &[ChunkMetadata],
1757        puts: &mut Vec<BatchPut>,
1758        frees: &mut Vec<PhysicalKey>,
1759    ) -> Result<()> {
1760        // Collect existing page chain.
1761        let mut old_pages = Vec::new();
1762        let mut pk = descriptor.head_page_pk;
1763        while pk != 0 {
1764            let page_blob = self
1765                .pager
1766                .batch_get(&[BatchGet::Raw { key: pk }])?
1767                .pop()
1768                .and_then(|res| match res {
1769                    GetResult::Raw { bytes, .. } => Some(bytes),
1770                    _ => None,
1771                })
1772                .ok_or(Error::NotFound)?;
1773            let header = DescriptorPageHeader::from_le_bytes(
1774                &page_blob.as_ref()[..DescriptorPageHeader::DISK_SIZE],
1775            );
1776            old_pages.push(pk);
1777            pk = header.next_page_pk;
1778        }
1779
1780        // Required pages for new metas.
1781        let per = DESCRIPTOR_ENTRIES_PER_PAGE;
        let need_pages = new_metas.len().div_ceil(per);
1787        // If empty: free everything and clear counters.
1788        if need_pages == 0 {
1789            frees.extend(old_pages.iter().copied());
1790
            // head_page_pk == 0 && tail_page_pk == 0 marks an empty descriptor;
            // `load_or_create` checks for this pattern to decide whether pages must
            // be allocated again on the next access.
1795            descriptor.head_page_pk = 0;
1796            descriptor.tail_page_pk = 0;
1797            descriptor.total_row_count = 0;
1798            descriptor.total_chunk_count = 0;
1799            puts.push(BatchPut::Raw {
1800                key: descriptor_pk,
1801                bytes: descriptor.to_le_bytes(),
1802            });
1803            return Ok(());
1804        }
1805
1806        // Reuse first page; alloc more if needed.
1807        let mut pages = Vec::with_capacity(need_pages);
1808        if !old_pages.is_empty() {
1809            pages.push(old_pages[0]);
1810        } else {
1811            pages.push(self.pager.alloc_many(1)?[0]);
1812            descriptor.head_page_pk = pages[0];
1813        }
1814        if need_pages > pages.len() {
1815            let extra = self.pager.alloc_many(need_pages - pages.len())?;
1816            pages.extend(extra);
1817        }
1818
1819        // Free surplus old pages.
1820        if old_pages.len() > need_pages {
1821            frees.extend(old_pages[need_pages..].iter().copied());
1822        }
1823
1824        // Write page contents.
1825        let mut off = 0usize;
1826        for (i, page_pk) in pages.iter().copied().enumerate() {
1827            let remain = new_metas.len() - off;
1828            let count = remain.min(per);
1829            let next = if i + 1 < pages.len() { pages[i + 1] } else { 0 };
1830            let header = DescriptorPageHeader {
1831                next_page_pk: next,
1832                entry_count: count as u32,
1833                _padding: [0; 4],
1834            };
1835            let mut page_bytes = header.to_le_bytes().to_vec();
1836            for m in &new_metas[off..off + count] {
1837                page_bytes.extend_from_slice(&m.to_le_bytes());
1838            }
1839            puts.push(BatchPut::Raw {
1840                key: page_pk,
1841                bytes: page_bytes,
1842            });
1843            off += count;
1844        }
1845
1846        descriptor.tail_page_pk = *pages.last().unwrap();
1847        descriptor.total_chunk_count = new_metas.len() as u64;
1848        descriptor.total_row_count = new_metas.iter().map(|m| m.row_count).sum();
1849        puts.push(BatchPut::Raw {
1850            key: descriptor_pk,
1851            bytes: descriptor.to_le_bytes(),
1852        });
1853        Ok(())
1854    }
1855
1856    /// Bounded, local field compaction. Merges adjacent small chunks into
1857    /// ~TARGET_CHUNK_BYTES; leaves large chunks intact. Recomputes perms if
1858    /// any source chunk in a run had one. Frees obsolete chunks/pages.
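    ///
    /// A minimal sketch of the run-building policy, using illustrative byte
    /// thresholds in place of the crate's `MIN_CHUNK_BYTES`, `TARGET_CHUNK_BYTES`
    /// and `MAX_MERGE_RUN_BYTES` constants:
    ///
    /// ```
    /// let (min, target, max_run) = (64_usize, 128_usize, 256_usize);
    /// let sizes = [16_usize, 16, 200, 32, 32, 32]; // serialized chunk sizes
    /// let (mut runs, mut i) = (Vec::new(), 0);
    /// while i < sizes.len() {
    ///     if sizes[i] >= min {
    ///         runs.push(i..i + 1); // big enough: keep the chunk as-is
    ///         i += 1;
    ///         continue;
    ///     }
    ///     let (mut j, mut bytes) = (i, 0);
    ///     while j < sizes.len() && sizes[j] < target && bytes + sizes[j] <= max_run {
    ///         bytes += sizes[j];
    ///         j += 1;
    ///     }
    ///     runs.push(i..j); // merge candidates [i, j); a run of one is kept as-is
    ///     i = j;
    /// }
    /// assert_eq!(runs, vec![0..2, 2..3, 3..6]);
    /// ```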
1859    fn compact_field_bounded(&self, field_id: LogicalFieldId) -> Result<()> {
1860        // We may rewrite descriptors; take a write lock.
1861        let mut catalog = self.catalog.write().unwrap();
1862
1863        let desc_pk = match catalog.map.get(&field_id) {
1864            Some(&pk) => pk,
1865            None => return Ok(()),
1866        };
1867        let rid_fid = rowid_fid(field_id);
1868        let desc_pk_rid = match catalog.map.get(&rid_fid) {
1869            Some(&pk) => pk,
1870            None => return Ok(()),
1871        };
1872
1873        // True batching for the two descriptor reads.
1874        let gets = vec![
1875            BatchGet::Raw { key: desc_pk },
1876            BatchGet::Raw { key: desc_pk_rid },
1877        ];
1878        let results = self.pager.batch_get(&gets)?;
1879        let mut blobs_by_pk = FxHashMap::default();
1880        for res in results {
1881            if let GetResult::Raw { key, bytes } = res {
1882                blobs_by_pk.insert(key, bytes);
1883            }
1884        }
1885        let desc_blob = blobs_by_pk.remove(&desc_pk).ok_or(Error::NotFound)?;
1886        let mut desc = ColumnDescriptor::from_le_bytes(desc_blob.as_ref());
1887        let desc_blob_rid = blobs_by_pk.remove(&desc_pk_rid).ok_or(Error::NotFound)?;
1888        let mut desc_rid = ColumnDescriptor::from_le_bytes(desc_blob_rid.as_ref());
1889
1890        // Load metas.
1891        let mut metas = Vec::new();
1892        for m in DescriptorIterator::new(self.pager.as_ref(), desc.head_page_pk) {
1893            metas.push(m?);
1894        }
1895        let mut metas_rid = Vec::new();
1896        for m in DescriptorIterator::new(self.pager.as_ref(), desc_rid.head_page_pk) {
1897            metas_rid.push(m?);
1898        }
1899        if metas.is_empty() || metas_rid.is_empty() {
1900            return Ok(());
1901        }
1902
1903        let mut puts: Vec<BatchPut> = Vec::new();
1904        let mut frees: Vec<PhysicalKey> = Vec::new();
1905        let mut new_metas: Vec<ChunkMetadata> = Vec::new();
1906        let mut new_rid_metas: Vec<ChunkMetadata> = Vec::new();
1907
1908        let mut i = 0usize;
1909        while i < metas.len() {
1910            let sz = metas[i].serialized_bytes as usize;
1911            // Keep large chunks as-is.
1912            if sz >= MIN_CHUNK_BYTES {
1913                new_metas.push(metas[i]);
1914                new_rid_metas.push(metas_rid[i]);
1915                i += 1;
1916                continue;
1917            }
1918
1919            // Build a small run [i, j) capped by MAX_MERGE_RUN_BYTES.
1920            let mut j = i;
1921            let mut run_bytes = 0usize;
1922            while j < metas.len() {
1923                let b = metas[j].serialized_bytes as usize;
1924                if b >= TARGET_CHUNK_BYTES {
1925                    break;
1926                }
1927                if run_bytes + b > MAX_MERGE_RUN_BYTES {
1928                    break;
1929                }
1930                run_bytes += b;
1931                j += 1;
1932            }
            // A run containing a single chunk has nothing to merge with; keep it unchanged.
            if j <= i + 1 {
1934                new_metas.push(metas[i]);
1935                new_rid_metas.push(metas_rid[i]);
1936                i += 1;
1937                continue;
1938            }
1939
1940            // Fetch and concatenate the run's data and row_ids.
1941            let mut gets = Vec::with_capacity((j - i) * 2);
1942            for k in i..j {
1943                gets.push(BatchGet::Raw {
1944                    key: metas[k].chunk_pk,
1945                });
1946                gets.push(BatchGet::Raw {
1947                    key: metas_rid[k].chunk_pk,
1948                });
1949            }
1950            let results = self.pager.batch_get(&gets)?;
1951            let mut by_pk: FxHashMap<PhysicalKey, EntryHandle> = FxHashMap::default();
1952            for r in results {
1953                match r {
1954                    GetResult::Raw { key, bytes } => {
1955                        by_pk.insert(key, bytes);
1956                    }
1957                    _ => return Err(Error::NotFound),
1958                }
1959            }
1960            let mut data_parts = Vec::with_capacity(j - i);
1961            let mut rid_parts = Vec::with_capacity(j - i);
1962            for k in i..j {
1963                let db = by_pk.get(&metas[k].chunk_pk).ok_or(Error::NotFound)?;
1964                data_parts.push(deserialize_array(db.clone())?);
1965                let rb = by_pk.get(&metas_rid[k].chunk_pk).ok_or(Error::NotFound)?;
1966                rid_parts.push(deserialize_array(rb.clone())?);
1967            }
1968            let merged_data = concat_many(data_parts.iter().collect())?;
1969            let merged_rid_any = concat_many(rid_parts.iter().collect())?;
1970
1971            // Split merged run into ~target-sized chunks.
1972            let slices = split_to_target_bytes(
1973                &merged_data,
1974                TARGET_CHUNK_BYTES,
1975                self.cfg.varwidth_fallback_rows_per_slice,
1976            );
1977            let mut rid_off = 0usize;
1978            let need_perms = metas[i..j].iter().any(|m| m.value_order_perm_pk != 0);
1979
1980            for s in slices {
1981                let rows = s.len();
1982                // Slice rid to match s (avoid double Arc).
1983                let rid_ref: ArrayRef = merged_rid_any.slice(rid_off, rows);
1984                let rid_norm = zero_offset(&rid_ref);
1985                let rid_pk = self.pager.alloc_many(1)?[0];
1986                let rid_bytes = serialize_array(rid_norm.as_ref())?;
1987
1988                let data_pk = self.pager.alloc_many(1)?[0];
1989                let s_norm = zero_offset(&s);
1990                let data_bytes = serialize_array(s_norm.as_ref())?;
1991                puts.push(BatchPut::Raw {
1992                    key: data_pk,
1993                    bytes: data_bytes,
1994                });
1995                puts.push(BatchPut::Raw {
1996                    key: rid_pk,
1997                    bytes: rid_bytes,
1998                });
1999                let mut meta = ChunkMetadata {
2000                    chunk_pk: data_pk,
2001                    row_count: rows as u64,
2002                    serialized_bytes: s_norm.get_array_memory_size() as u64,
2003                    max_val_u64: u64::MAX,
2004                    ..Default::default()
2005                };
2006                // If any source chunk had a perm, recompute for this slice.
2007                if need_perms {
2008                    let sort_col = SortColumn {
2009                        values: s.clone(),
2010                        options: None,
2011                    };
2012                    let idx = lexsort_to_indices(&[sort_col], None)?;
2013                    let perm_bytes = serialize_array(&idx)?;
2014                    let perm_pk = self.pager.alloc_many(1)?[0];
2015                    puts.push(BatchPut::Raw {
2016                        key: perm_pk,
2017                        bytes: perm_bytes,
2018                    });
2019                    meta.value_order_perm_pk = perm_pk;
2020                }
2021
2022                // Track row-id min/max; build a rid sort permutation only if the ids are unsorted.
2023                let rid_any = rid_norm.clone();
2024                let rids = rid_any
2025                    .as_any()
2026                    .downcast_ref::<UInt64Array>()
2027                    .ok_or_else(|| Error::Internal("rid downcast".into()))?;
2028                let mut min = u64::MAX;
2029                let mut max = 0u64;
2030                let mut sorted_rids = true;
2031                let mut last_v = 0u64;
2032                for ii in 0..rids.len() {
2033                    let v = rids.value(ii);
2034                    if ii == 0 {
2035                        last_v = v;
2036                    } else if v < last_v {
2037                        sorted_rids = false;
2038                    } else {
2039                        last_v = v;
2040                    }
2041                    if v < min {
2042                        min = v;
2043                    }
2044                    if v > max {
2045                        max = v;
2046                    }
2047                }
2048                let mut rid_perm_pk = 0u64;
2049                if !sorted_rids {
2050                    let rid_sort_col = SortColumn {
2051                        values: rid_any,
2052                        options: None,
2053                    };
2054                    let rid_idx = lexsort_to_indices(&[rid_sort_col], None)?;
2055                    let rid_perm_bytes = serialize_array(&rid_idx)?;
2056                    rid_perm_pk = self.pager.alloc_many(1)?[0];
2057                    puts.push(BatchPut::Raw {
2058                        key: rid_perm_pk,
2059                        bytes: rid_perm_bytes,
2060                    });
2061                }
2062                let rid_meta = ChunkMetadata {
2063                    chunk_pk: rid_pk,
2064                    value_order_perm_pk: rid_perm_pk,
2065                    row_count: rows as u64,
2066                    serialized_bytes: rid_norm.get_array_memory_size() as u64,
2067                    min_val_u64: if rows > 0 { min } else { 0 },
2068                    max_val_u64: if rows > 0 { max } else { 0 },
2069                };
2070                new_metas.push(meta);
2071                new_rid_metas.push(rid_meta);
2072                rid_off += rows;
2073            }
2074
2075            // Free all old data/rid/perms in the merged run.
2076            for k in i..j {
2077                frees.push(metas[k].chunk_pk);
2078                if metas[k].value_order_perm_pk != 0 {
2079                    frees.push(metas[k].value_order_perm_pk);
2080                }
2081                frees.push(metas_rid[k].chunk_pk);
2082                if metas_rid[k].value_order_perm_pk != 0 {
2083                    frees.push(metas_rid[k].value_order_perm_pk);
2084                }
2085            }
2086
2087            i = j;
2088        }
2089
2090        // If everything was deleted, drop the field entirely.
2091        if new_metas.is_empty() {
2092            // Drop descriptors and catalog mapping.
2093            self.write_descriptor_chain(desc_pk, &mut desc, &[], &mut puts, &mut frees)?;
2094            self.write_descriptor_chain(desc_pk_rid, &mut desc_rid, &[], &mut puts, &mut frees)?;
2095            catalog.map.remove(&field_id);
2096            catalog.map.remove(&rid_fid);
2097            puts.push(BatchPut::Raw {
2098                key: CATALOG_ROOT_PKEY,
2099                bytes: catalog.to_bytes(),
2100            });
2101            if !puts.is_empty() {
2102                self.pager.batch_put(&puts)?;
2103            }
2104            if !frees.is_empty() {
2105                self.pager.free_many(&frees)?;
2106            }
2107            return Ok(());
2108        }
2109
2110        // Rewrite descriptor chains to match the new meta lists.
2111        self.write_descriptor_chain(desc_pk, &mut desc, &new_metas, &mut puts, &mut frees)?;
2112        self.write_descriptor_chain(
2113            desc_pk_rid,
2114            &mut desc_rid,
2115            &new_rid_metas,
2116            &mut puts,
2117            &mut frees,
2118        )?;
2119        // Persist new/updated blobs and free the old ones.
2120        if !puts.is_empty() {
2121            self.pager.batch_put(&puts)?;
2122        }
2123        if !frees.is_empty() {
2124            self.pager.free_many(&frees)?;
2125        }
2126
2127        Ok(())
2128    }
2129
2130    /// (Internal) Helper for batch appends. Appends metadata to the current
2131    /// in-memory tail page, creating a new one if necessary.
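    ///
    /// A sketch of the "does it still fit?" decision with illustrative sizes
    /// (the real `TARGET_DESCRIPTOR_PAGE_BYTES`, header and entry sizes are
    /// crate constants):
    ///
    /// ```
    /// let (page_budget, entry_size) = (4096_usize, 64_usize);
    /// let current_len = 4080_usize; // bytes already in the tail page
    /// let fits = current_len + entry_size <= page_budget;
    /// assert!(!fits); // flush the full tail page and start a new, linked one
    /// ```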
2132    fn append_meta_in_loop(
2133        &self,
2134        descriptor: &mut ColumnDescriptor,
2135        tail_page_bytes: &mut Vec<u8>,
2136        meta: ChunkMetadata,
2137        puts: &mut Vec<BatchPut>,
2138    ) -> Result<()> {
2139        let mut header = DescriptorPageHeader::from_le_bytes(
2140            &tail_page_bytes[..DescriptorPageHeader::DISK_SIZE],
2141        );
2142        if tail_page_bytes.len() + ChunkMetadata::DISK_SIZE <= TARGET_DESCRIPTOR_PAGE_BYTES
2143            && (header.entry_count as usize) < DESCRIPTOR_ENTRIES_PER_PAGE
2144        {
2145            // Case 1: There's room in the current tail page.
2146            tail_page_bytes.extend_from_slice(&meta.to_le_bytes());
2147            header.entry_count += 1;
2148            tail_page_bytes[..DescriptorPageHeader::DISK_SIZE]
2149                .copy_from_slice(&header.to_le_bytes());
2150        } else {
2151            // Case 2: The tail page is full. Write it out and start a new one.
2152            let new_tail_pk = self.pager.alloc_many(1)?[0];
2153            header.next_page_pk = new_tail_pk;
2154            tail_page_bytes[..DescriptorPageHeader::DISK_SIZE]
2155                .copy_from_slice(&header.to_le_bytes());
2156            // Move the full page's bytes instead of cloning them, avoiding a
2157            // page-sized copy.
2158            let full_page_to_write = std::mem::take(tail_page_bytes);
2159            puts.push(BatchPut::Raw {
2160                key: descriptor.tail_page_pk,
2161                bytes: full_page_to_write,
2162            });
2163            // Create the new tail page.
2164            let new_header = DescriptorPageHeader {
2165                next_page_pk: 0,
2166                entry_count: 1,
2167                _padding: [0; 4],
2168            };
2169            let mut new_page_bytes = new_header.to_le_bytes().to_vec();
2170            new_page_bytes.extend_from_slice(&meta.to_le_bytes());
2171            // Update our live state to point to the new tail.
2172            descriptor.tail_page_pk = new_tail_pk;
2173            *tail_page_bytes = new_page_bytes;
2174        }
2175
2176        descriptor.total_row_count += meta.row_count;
2177        descriptor.total_chunk_count += 1;
2178        Ok(())
2179    }
2180
2181    /// Verifies the integrity of the column store's metadata.
2182    ///
2183    /// This check is useful for tests and debugging. It verifies:
2184    /// 1. The catalog can be read.
2185    /// 2. All descriptor chains are walkable.
2186    /// 3. The row and chunk counts in the descriptors match the sum of the
2187    ///    chunk metadata.
2188    ///
2189    /// Returns `Ok(())` if consistent, otherwise returns an `Error`.
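    ///
    /// Chunk metadata entries are decoded at fixed offsets inside each descriptor
    /// page; a worked sketch of that offset arithmetic with illustrative sizes
    /// (the real header and entry sizes are crate constants):
    ///
    /// ```
    /// let (header_size, entry_size, entry_count) = (16_usize, 64_usize, 3_usize);
    /// let ranges: Vec<(usize, usize)> = (0..entry_count)
    ///     .map(|i| {
    ///         let off = header_size + i * entry_size;
    ///         (off, off + entry_size)
    ///     })
    ///     .collect();
    /// assert_eq!(ranges, vec![(16, 80), (80, 144), (144, 208)]);
    /// ```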
2190    pub fn verify_integrity(&self) -> Result<()> {
2191        let catalog = self.catalog.read().unwrap();
2192        for (&field_id, &descriptor_pk) in &catalog.map {
2193            let desc_blob = self
2194                .pager
2195                .batch_get(&[BatchGet::Raw { key: descriptor_pk }])?
2196                .pop()
2197                .and_then(|r| match r {
2198                    GetResult::Raw { bytes, .. } => Some(bytes),
2199                    _ => None,
2200                })
2201                .ok_or_else(|| {
2202                    Error::Internal(format!(
2203                        "Catalog points to missing descriptor pk={}",
2204                        descriptor_pk
2205                    ))
2206                })?;
2207            let descriptor = ColumnDescriptor::from_le_bytes(desc_blob.as_ref());
2208            if descriptor.field_id != field_id {
2209                return Err(Error::Internal(format!(
2210                    "Descriptor at pk={} has wrong field_id: expected {:?}, \
2211                     got {:?}",
2212                    descriptor_pk, field_id, descriptor.field_id
2213                )));
2214            }
2215
2216            let mut actual_rows = 0;
2217            let mut actual_chunks = 0;
2218            let mut current_page_pk = descriptor.head_page_pk;
2219            while current_page_pk != 0 {
2220                let page_blob = self
2221                    .pager
2222                    .batch_get(&[BatchGet::Raw {
2223                        key: current_page_pk,
2224                    }])?
2225                    .pop()
2226                    .and_then(|r| match r {
2227                        GetResult::Raw { bytes, .. } => Some(bytes),
2228                        _ => None,
2229                    })
2230                    .ok_or_else(|| {
2231                        Error::Internal(format!(
2232                            "Descriptor page chain broken at pk={}",
2233                            current_page_pk
2234                        ))
2235                    })?;
2236                let header = DescriptorPageHeader::from_le_bytes(
2237                    &page_blob.as_ref()[..DescriptorPageHeader::DISK_SIZE],
2238                );
2239                for i in 0..(header.entry_count as usize) {
2240                    let off = DescriptorPageHeader::DISK_SIZE + i * ChunkMetadata::DISK_SIZE;
2241                    let end = off + ChunkMetadata::DISK_SIZE;
2242                    let meta = ChunkMetadata::from_le_bytes(&page_blob.as_ref()[off..end]);
2243                    actual_rows += meta.row_count;
2244                    actual_chunks += 1;
2245                }
2246                current_page_pk = header.next_page_pk;
2247            }
2248
2249            if descriptor.total_row_count != actual_rows {
2250                return Err(Error::Internal(format!(
2251                    "Row count mismatch for field {:?}: descriptor says {}, \
2252                     actual is {}",
2253                    field_id, descriptor.total_row_count, actual_rows
2254                )));
2255            }
2256            if descriptor.total_chunk_count != actual_chunks {
2257                return Err(Error::Internal(format!(
2258                    "Chunk count mismatch for field {:?}: descriptor says {}, \
2259                     actual is {}",
2260                    field_id, descriptor.total_chunk_count, actual_chunks
2261                )));
2262            }
2263        }
2264        Ok(())
2265    }
2266
2267    /// Gathers detailed statistics about the storage layout.
2268    ///
2269    /// This method is designed for low-level analysis and debugging, allowing
2270    /// you to check for under- or over-utilization of descriptor pages.
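    ///
    /// A minimal consumer of the returned stats (shown as `ignore` because the
    /// import paths for the stats types depend on how this crate re-exports them):
    ///
    /// ```ignore
    /// for col in store.get_layout_stats()? {
    ///     let entries: u32 = col.pages.iter().map(|p| p.entry_count).sum();
    ///     println!(
    ///         "field {:?}: {} rows, {} chunks, {} pages, {} descriptor entries",
    ///         col.field_id, col.total_rows, col.total_chunks, col.pages.len(), entries
    ///     );
    /// }
    /// ```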
2271    pub fn get_layout_stats(&self) -> Result<Vec<ColumnLayoutStats>> {
2272        let catalog = self.catalog.read().unwrap();
2273        let mut all_stats = Vec::new();
2274
2275        for (&field_id, &descriptor_pk) in &catalog.map {
2276            let desc_blob = self
2277                .pager
2278                .batch_get(&[BatchGet::Raw { key: descriptor_pk }])?
2279                .pop()
2280                .and_then(|r| match r {
2281                    GetResult::Raw { bytes, .. } => Some(bytes),
2282                    _ => None,
2283                })
2284                .ok_or(Error::NotFound)?;
2285            let descriptor = ColumnDescriptor::from_le_bytes(desc_blob.as_ref());
2286
2287            let mut page_stats = Vec::new();
2288            let mut current_page_pk = descriptor.head_page_pk;
2289            while current_page_pk != 0 {
2290                let page_blob = self
2291                    .pager
2292                    .batch_get(&[BatchGet::Raw {
2293                        key: current_page_pk,
2294                    }])?
2295                    .pop()
2296                    .and_then(|r| match r {
2297                        GetResult::Raw { bytes, .. } => Some(bytes),
2298                        _ => None,
2299                    })
2300                    .ok_or(Error::NotFound)?;
2301                let header = DescriptorPageHeader::from_le_bytes(
2302                    &page_blob.as_ref()[..DescriptorPageHeader::DISK_SIZE],
2303                );
2304                page_stats.push(DescriptorPageStats {
2305                    page_pk: current_page_pk,
2306                    entry_count: header.entry_count,
2307                    page_size_bytes: page_blob.as_ref().len(),
2308                });
2309                current_page_pk = header.next_page_pk;
2310            }
2311
2312            all_stats.push(ColumnLayoutStats {
2313                field_id,
2314                total_rows: descriptor.total_row_count,
2315                total_chunks: descriptor.total_chunk_count,
2316                pages: page_stats,
2317            });
2318        }
2319        Ok(all_stats)
2320    }
2321}