llkv_column_map/store/core.rs

use super::*;
use crate::store::catalog::ColumnCatalog;
use crate::store::descriptor::{
    ChunkMetadata, ColumnDescriptor, DescriptorIterator, DescriptorPageHeader,
};
use crate::store::scan::filter::FilterDispatch;
use crate::store::scan::{FilterPrimitive, FilterResult};
use crate::types::{LogicalFieldId, RowId, TableId};
use arrow::array::{Array, ArrayRef, BooleanArray, UInt32Array, UInt64Array};
use arrow::compute::{self, SortColumn, lexsort_to_indices};
use arrow::datatypes::DataType;
use arrow::record_batch::RecordBatch;
use llkv_expr::typed_predicate::Predicate;
use llkv_result::{Error, Result};
use llkv_storage::{
    constants::CATALOG_ROOT_PKEY,
    pager::{BatchGet, BatchPut, GetResult, Pager},
    serialization::{deserialize_array, serialize_array},
    types::PhysicalKey,
};

use rustc_hash::{FxHashMap, FxHashSet};
use simd_r_drive_entry_handle::EntryHandle;
use std::sync::{Arc, RwLock};

/// Columnar storage engine for managing Arrow-based data.
///
/// `ColumnStore` provides the primary interface for persisting and retrieving columnar
/// data using Apache Arrow [`RecordBatch`]es. It manages:
///
/// - Column descriptors and metadata (chunk locations, row counts, min/max values)
/// - Data type caching for efficient schema queries
/// - Index management (presence indexes, value indexes)
/// - Integration with the [`Pager`] for persistent storage
///
/// # Namespaces
///
/// Columns are identified by [`LogicalFieldId`], which combines a namespace, table ID,
/// and field ID. This prevents collisions between user data, row IDs, and MVCC metadata:
///
/// - `UserData`: Regular table columns
/// - `RowIdShadow`: Internal row ID tracking
/// - `TxnCreatedBy`: MVCC transaction creation timestamps
/// - `TxnDeletedBy`: MVCC transaction deletion timestamps
///
/// # Thread Safety
///
/// `ColumnStore` is `Send + Sync` and can be safely shared across threads via `Arc`.
/// Internal state (catalog, caches) uses `RwLock` for concurrent access.
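///
/// # Example
///
/// A minimal sketch of sharing one store across threads; `MemPager` stands in
/// for any concrete [`Pager`] implementation and is assumed here:
///
/// ```ignore
/// use std::sync::Arc;
/// use std::thread;
///
/// let store = Arc::new(ColumnStore::open(Arc::new(MemPager::default()))?);
/// let readers: Vec<_> = (0..4)
///     .map(|_| {
///         let store = Arc::clone(&store);
///         // Reads only take the internal catalog read lock.
///         thread::spawn(move || store.user_field_ids_for_table(1))
///     })
///     .collect();
/// for handle in readers {
///     handle.join().unwrap();
/// }
/// ```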
pub struct ColumnStore<P: Pager> {
    pub(crate) pager: Arc<P>,
    pub(crate) catalog: Arc<RwLock<ColumnCatalog>>,
    cfg: ColumnStoreConfig,
    dtype_cache: DTypeCache<P>,
    index_manager: IndexManager<P>,
}

impl<P> Clone for ColumnStore<P>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    fn clone(&self) -> Self {
        Self {
            pager: Arc::clone(&self.pager),
            catalog: Arc::clone(&self.catalog),
            cfg: self.cfg.clone(),
            dtype_cache: self.dtype_cache.clone(),
            index_manager: self.index_manager.clone(),
        }
    }
}

impl<P> ColumnStore<P>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    /// Opens or creates a `ColumnStore` using the provided pager.
    ///
    /// Loads the column catalog from the pager's root catalog key, or initializes
    /// an empty catalog if none exists. The catalog maps [`LogicalFieldId`] to the
    /// physical keys of column descriptors.
    ///
    /// # Errors
    ///
    /// Returns an error if the pager fails to load the catalog or if deserialization fails.
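    ///
    /// # Example
    ///
    /// A minimal sketch, assuming `MemPager` as an in-memory [`Pager`] whose
    /// blobs are `EntryHandle`s:
    ///
    /// ```ignore
    /// use std::sync::Arc;
    ///
    /// let pager = Arc::new(MemPager::default());
    /// let store = ColumnStore::open(Arc::clone(&pager))?;
    /// // A fresh pager has no persisted catalog yet, so the store starts empty.
    /// assert!(store.user_field_ids_for_table(1).is_empty());
    /// ```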
    pub fn open(pager: Arc<P>) -> Result<Self> {
        let cfg = ColumnStoreConfig::default();
        let catalog = match pager
            .batch_get(&[BatchGet::Raw {
                key: CATALOG_ROOT_PKEY,
            }])?
            .pop()
        {
            Some(GetResult::Raw { bytes, .. }) => ColumnCatalog::from_bytes(bytes.as_ref())?,
            _ => ColumnCatalog::default(),
        };
        let arc_catalog = Arc::new(RwLock::new(catalog));

        let index_manager = IndexManager::new(Arc::clone(&pager));

        Ok(Self {
            pager: Arc::clone(&pager),
            catalog: Arc::clone(&arc_catalog),
            cfg,
            dtype_cache: DTypeCache::new(Arc::clone(&pager), Arc::clone(&arc_catalog)),
            index_manager,
        })
    }

    /// Creates and persists an index for a column.
    ///
    /// Builds the specified index type for all existing data in the column and
    /// persists it atomically. The index will be maintained automatically on subsequent
    /// appends and updates.
    ///
    /// # Errors
    ///
    /// Returns an error if the column doesn't exist or if index creation fails.
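    ///
    /// # Example
    ///
    /// A minimal sketch; `field_id` names a column that already exists in the store:
    ///
    /// ```ignore
    /// store.register_index(field_id, IndexKind::Presence)?;
    /// assert!(store
    ///     .list_persisted_indexes(field_id)?
    ///     .contains(&IndexKind::Presence));
    /// ```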
    pub fn register_index(&self, field_id: LogicalFieldId, kind: IndexKind) -> Result<()> {
        self.index_manager.register_index(self, field_id, kind)
    }

    /// Checks if a logical field is registered in the catalog.
    pub fn has_field(&self, field_id: LogicalFieldId) -> bool {
        let catalog = self.catalog.read().unwrap();
        catalog.map.contains_key(&field_id)
    }

    /// Removes a persisted index from a column.
    ///
    /// Atomically removes the index and frees associated storage. The column data
    /// itself is not affected.
    ///
    /// # Errors
    ///
    /// Returns an error if the column or index doesn't exist.
    pub fn unregister_index(&self, field_id: LogicalFieldId, kind: IndexKind) -> Result<()> {
        self.index_manager.unregister_index(self, field_id, kind)
    }

    /// Returns the Arrow data type of a column.
    ///
    /// Returns the data type from cache if available, otherwise loads it from
    /// the column descriptor and caches it for future queries.
    ///
    /// # Errors
    ///
    /// Returns an error if the column doesn't exist or if the descriptor is corrupted.
    pub fn data_type(&self, field_id: LogicalFieldId) -> Result<DataType> {
        if let Some(dt) = self.dtype_cache.cached_data_type(field_id) {
            return Ok(dt);
        }
        self.dtype_cache.dtype_for_field(field_id)
    }

    /// Updates the data type of an existing column.
    ///
    /// This updates the descriptor's data type fingerprint and cache. Note that this
    /// does NOT migrate existing data - the caller must ensure data compatibility.
    ///
    /// # Errors
    ///
    /// Returns an error if the column doesn't exist or if the descriptor update fails.
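    ///
    /// # Example
    ///
    /// A minimal sketch; the caller has already ensured that any persisted chunks
    /// are compatible with the new type:
    ///
    /// ```ignore
    /// use arrow::datatypes::DataType;
    ///
    /// store.update_data_type(field_id, &DataType::Int64)?;
    /// assert_eq!(store.data_type(field_id)?, DataType::Int64);
    /// ```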
    pub fn update_data_type(
        &self,
        field_id: LogicalFieldId,
        new_data_type: &DataType,
    ) -> Result<()> {
        // Get the descriptor physical key from catalog
        let descriptor_pk = {
            let catalog = self.catalog.read().unwrap();
            *catalog.map.get(&field_id).ok_or_else(|| Error::NotFound)?
        };

        // Load the existing descriptor
        let mut descriptor = match self
            .pager
            .batch_get(&[BatchGet::Raw { key: descriptor_pk }])?
            .pop()
        {
            Some(GetResult::Raw { bytes, .. }) => ColumnDescriptor::from_le_bytes(bytes.as_ref()),
            _ => return Err(Error::NotFound),
        };

        // Update the data type fingerprint
        let new_fingerprint = DTypeCache::<P>::dtype_fingerprint(new_data_type);
        if new_fingerprint != 0 {
            DTypeCache::<P>::set_desc_dtype_fingerprint(&mut descriptor, new_fingerprint);
        }

        // Persist the updated descriptor
        self.pager.batch_put(&[BatchPut::Raw {
            key: descriptor_pk,
            bytes: descriptor.to_le_bytes(),
        }])?;

        // Update the cache
        self.dtype_cache.insert(field_id, new_data_type.clone());

        Ok(())
    }

    /// Ensures that catalog entries and descriptors exist for a logical column.
    ///
    /// Primarily used when creating empty tables so that subsequent
    /// operations (like `CREATE INDEX`) can resolve column metadata before any
    /// data has been appended.
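    ///
    /// # Example
    ///
    /// A minimal sketch of preparing an empty column so that an index can be
    /// registered before any rows arrive:
    ///
    /// ```ignore
    /// use arrow::datatypes::DataType;
    ///
    /// store.ensure_column_registered(field_id, &DataType::UInt64)?;
    /// store.register_index(field_id, IndexKind::Presence)?;
    /// assert_eq!(store.total_rows_for_field(field_id)?, 0);
    /// ```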
    pub fn ensure_column_registered(
        &self,
        field_id: LogicalFieldId,
        data_type: &DataType,
    ) -> Result<()> {
        let rid_field_id = rowid_fid(field_id);

        let mut catalog_dirty = false;
        let descriptor_pk;
        let rid_descriptor_pk;

        {
            let mut catalog = self.catalog.write().unwrap();
            descriptor_pk = if let Some(&pk) = catalog.map.get(&field_id) {
                pk
            } else {
                let pk = self.pager.alloc_many(1)?[0];
                catalog.map.insert(field_id, pk);
                catalog_dirty = true;
                pk
            };

            rid_descriptor_pk = if let Some(&pk) = catalog.map.get(&rid_field_id) {
                pk
            } else {
                let pk = self.pager.alloc_many(1)?[0];
                catalog.map.insert(rid_field_id, pk);
                catalog_dirty = true;
                pk
            };
        }

        let mut puts: Vec<BatchPut> = Vec::new();

        let data_descriptor_missing = self
            .pager
            .batch_get(&[BatchGet::Raw { key: descriptor_pk }])?
            .pop()
            .and_then(|r| match r {
                GetResult::Raw { bytes, .. } => Some(bytes),
                _ => None,
            })
            .is_none();

        if data_descriptor_missing {
            let (mut descriptor, tail_page) =
                ColumnDescriptor::load_or_create(Arc::clone(&self.pager), descriptor_pk, field_id)?;
            let fingerprint = DTypeCache::<P>::dtype_fingerprint(data_type);
            if fingerprint != 0 {
                DTypeCache::<P>::set_desc_dtype_fingerprint(&mut descriptor, fingerprint);
            }
            puts.push(BatchPut::Raw {
                key: descriptor.tail_page_pk,
                bytes: tail_page,
            });
            puts.push(BatchPut::Raw {
                key: descriptor_pk,
                bytes: descriptor.to_le_bytes(),
            });
        }

        let rid_descriptor_missing = self
            .pager
            .batch_get(&[BatchGet::Raw {
                key: rid_descriptor_pk,
            }])?
            .pop()
            .and_then(|r| match r {
                GetResult::Raw { bytes, .. } => Some(bytes),
                _ => None,
            })
            .is_none();

        if rid_descriptor_missing {
            let (mut rid_descriptor, tail_page) = ColumnDescriptor::load_or_create(
                Arc::clone(&self.pager),
                rid_descriptor_pk,
                rid_field_id,
            )?;
            let fingerprint = DTypeCache::<P>::dtype_fingerprint(&DataType::UInt64);
            if fingerprint != 0 {
                DTypeCache::<P>::set_desc_dtype_fingerprint(&mut rid_descriptor, fingerprint);
            }
            puts.push(BatchPut::Raw {
                key: rid_descriptor.tail_page_pk,
                bytes: tail_page,
            });
            puts.push(BatchPut::Raw {
                key: rid_descriptor_pk,
                bytes: rid_descriptor.to_le_bytes(),
            });
        }

        self.dtype_cache.insert(field_id, data_type.clone());

        if catalog_dirty {
            let catalog_bytes = {
                let catalog = self.catalog.read().unwrap();
                catalog.to_bytes()
            };
            puts.push(BatchPut::Raw {
                key: CATALOG_ROOT_PKEY,
                bytes: catalog_bytes,
            });
        }

        if !puts.is_empty() {
            self.pager.batch_put(&puts)?;
        }

        Ok(())
    }

    /// Find all row IDs where a column satisfies a predicate.
    ///
    /// This evaluates the predicate against the column's data and returns a vector
    /// of matching row IDs. Uses indexes and chunk metadata (min/max values) to
    /// skip irrelevant data when possible.
    ///
    /// # Errors
    ///
    /// Returns an error if the column doesn't exist or if chunk data is corrupted.
    pub fn filter_row_ids<T>(
        &self,
        field_id: LogicalFieldId,
        predicate: &Predicate<T::Value>,
    ) -> Result<Vec<u64>>
    where
        T: FilterDispatch,
    {
        tracing::trace!(field=?field_id, "filter_row_ids start");
        let res = T::run_filter(self, field_id, predicate);
        if let Err(ref err) = res {
            tracing::trace!(field=?field_id, error=?err, "filter_row_ids error");
        } else {
            tracing::trace!(field=?field_id, "filter_row_ids ok");
        }
        res
    }

    /// Evaluate a predicate against a column and return match metadata.
    ///
    /// This variant drives a primitive predicate over the column and returns a
    /// [`FilterResult`] describing contiguous match regions. Callers can use the
    /// result to build paginated scans or gather row identifiers.
    ///
    /// # Arguments
    /// - `field_id`: Logical column to filter.
    /// - `predicate`: Callable invoked on each value; should be cheap and free of
    ///   side effects.
    ///
    /// # Errors
    /// Returns an error if the column metadata cannot be loaded or if decoding a
    /// chunk fails.
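    ///
    /// # Example
    ///
    /// A minimal sketch; it assumes `u64` columns dispatch through a
    /// [`FilterPrimitive`] implementation selected by Arrow's `UInt64Type`:
    ///
    /// ```ignore
    /// use arrow::datatypes::UInt64Type;
    ///
    /// let matches = store.filter_matches::<UInt64Type, _>(field_id, |v| v >= 100)?;
    /// // `matches` describes the contiguous regions whose values passed the closure.
    /// ```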
    pub fn filter_matches<T, F>(
        &self,
        field_id: LogicalFieldId,
        predicate: F,
    ) -> Result<FilterResult>
    where
        T: FilterPrimitive,
        F: FnMut(T::Native) -> bool,
    {
        T::run_filter_with_result(self, field_id, predicate)
    }

    /// List all indexes registered for a column.
    ///
    /// Returns the types of indexes (e.g., presence, value) that are currently
    /// persisted for the specified column.
    ///
    /// # Errors
    ///
    /// Returns an error if the column doesn't exist or if the descriptor is corrupted.
    pub fn list_persisted_indexes(&self, field_id: LogicalFieldId) -> Result<Vec<IndexKind>> {
        let catalog = self.catalog.read().unwrap();
        let descriptor_pk = *catalog.map.get(&field_id).ok_or(Error::NotFound)?;

        let desc_blob = self
            .pager
            .batch_get(&[BatchGet::Raw { key: descriptor_pk }])?
            .pop()
            .and_then(|r| match r {
                GetResult::Raw { bytes, .. } => Some(bytes),
                _ => None,
            })
            .ok_or(Error::NotFound)?;
        let descriptor = ColumnDescriptor::from_le_bytes(desc_blob.as_ref());

        let kinds = descriptor.get_indexes()?;
        Ok(kinds)
    }

    /// Get the total number of rows in a column.
    ///
    /// Returns the persisted row count from the column's descriptor. This value is
    /// updated by append and delete operations.
    ///
    /// # Errors
    ///
    /// Returns an error if the column doesn't exist or if the descriptor is corrupted.
    pub fn total_rows_for_field(&self, field_id: LogicalFieldId) -> Result<u64> {
        let catalog = self.catalog.read().unwrap();
        let desc_pk = *catalog.map.get(&field_id).ok_or(Error::NotFound)?;
        drop(catalog);

        let desc_blob = self
            .pager
            .batch_get(&[BatchGet::Raw { key: desc_pk }])?
            .pop()
            .and_then(|r| match r {
                GetResult::Raw { bytes, .. } => Some(bytes),
                _ => None,
            })
            .ok_or(Error::NotFound)?;

        let desc = ColumnDescriptor::from_le_bytes(desc_blob.as_ref());
        Ok(desc.total_row_count)
    }

    /// Get the total number of rows in a table.
    ///
    /// This returns the maximum row count across all user-data columns in the table.
    /// If the table has no persisted columns, returns `0`.
    ///
    /// # Errors
    ///
    /// Returns an error if column descriptors cannot be loaded.
    pub fn total_rows_for_table(&self, table_id: TableId) -> Result<u64> {
        use crate::types::Namespace;
        // Acquire read lock on catalog and find any matching user-data field
        let catalog = self.catalog.read().unwrap();
        // Collect all user-data logical field ids for this table.
        let candidates: Vec<LogicalFieldId> = catalog
            .map
            .keys()
            .filter(|fid| fid.namespace() == Namespace::UserData && fid.table_id() == table_id)
            .copied()
            .collect();
        drop(catalog);

        if candidates.is_empty() {
            return Ok(0);
        }

        // Return the maximum total_row_count across all user columns for the table.
        let mut max_rows: u64 = 0;
        for field in candidates {
            let rows = self.total_rows_for_field(field)?;
            if rows > max_rows {
                max_rows = rows;
            }
        }
        Ok(max_rows)
    }

    /// Get all user-data column IDs for a table.
    ///
    /// This returns the [`LogicalFieldId`]s of all persisted user columns (namespace
    /// `UserData`) belonging to the specified table. MVCC and row ID columns are not
    /// included.
    pub fn user_field_ids_for_table(&self, table_id: TableId) -> Vec<LogicalFieldId> {
        use crate::types::Namespace;

        let catalog = self.catalog.read().unwrap();
        catalog
            .map
            .keys()
            .filter(|fid| fid.namespace() == Namespace::UserData && fid.table_id() == table_id)
            .copied()
            .collect()
    }

    /// Check whether a specific row ID exists in a column.
    ///
    /// This uses presence indexes and binary search when available for fast lookups.
    /// If no presence index exists, it scans chunks and uses min/max metadata to
    /// prune irrelevant data.
    ///
    /// # Errors
    ///
    /// Returns an error if the column doesn't exist or if chunk data is corrupted.
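    ///
    /// # Example
    ///
    /// A minimal sketch:
    ///
    /// ```ignore
    /// if store.has_row_id(field_id, 42)? {
    ///     // Row 42 holds a value for this column.
    /// }
    /// ```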
    pub fn has_row_id(&self, field_id: LogicalFieldId, row_id: RowId) -> Result<bool> {
        let rid_fid = rowid_fid(field_id);
        let catalog = self.catalog.read().unwrap();
        let rid_desc_pk = *catalog.map.get(&rid_fid).ok_or(Error::NotFound)?;
        let rid_desc_blob = self
            .pager
            .batch_get(&[BatchGet::Raw { key: rid_desc_pk }])?
            .pop()
            .and_then(|r| match r {
                GetResult::Raw { bytes, .. } => Some(bytes),
                _ => None,
            })
            .ok_or(Error::NotFound)?;
        let rid_desc = ColumnDescriptor::from_le_bytes(rid_desc_blob.as_ref());
        drop(catalog);

        // Walk metas; prune by min/max only when that metadata is populated.
        for m in DescriptorIterator::new(self.pager.as_ref(), rid_desc.head_page_pk) {
            let meta = m?;
            if meta.row_count == 0 {
                continue;
            }
            if (meta.min_val_u64 != 0 || meta.max_val_u64 != 0)
                && (row_id < meta.min_val_u64 || row_id > meta.max_val_u64)
            {
                continue;
            }
            // Fetch rid chunk and, if present, the presence perm
            let mut gets = vec![BatchGet::Raw { key: meta.chunk_pk }];
            if meta.value_order_perm_pk != 0 {
                gets.push(BatchGet::Raw {
                    key: meta.value_order_perm_pk,
                });
            }
            let results = self.pager.batch_get(&gets)?;
            let mut rid_blob: Option<EntryHandle> = None;
            let mut perm_blob: Option<EntryHandle> = None;
            for r in results {
                if let GetResult::Raw { key, bytes } = r {
                    if key == meta.chunk_pk {
                        rid_blob = Some(bytes);
                    } else if key == meta.value_order_perm_pk {
                        perm_blob = Some(bytes);
                    }
                }
            }
            // If the rid blob for this chunk is missing, treat as absent and continue
            let Some(rid_blob) = rid_blob else { continue };
            let rid_any = deserialize_array(rid_blob)?;
            let rids = rid_any
                .as_any()
                .downcast_ref::<UInt64Array>()
                .ok_or_else(|| Error::Internal("rid downcast".into()))?;
            if let Some(pblob) = perm_blob {
                let perm_any = deserialize_array(pblob)?;
                let perm = perm_any
                    .as_any()
                    .downcast_ref::<UInt32Array>()
                    .ok_or_else(|| Error::Internal("perm not u32".into()))?;
                // Binary search over sorted-by-perm view
                let mut lo: isize = 0;
                let mut hi: isize = (perm.len() as isize) - 1;
                while lo <= hi {
                    let mid = ((lo + hi) >> 1) as usize;
                    let rid = rids.value(perm.value(mid) as usize);
                    if rid == row_id {
                        return Ok(true);
                    } else if rid < row_id {
                        lo = mid as isize + 1;
                    } else {
                        hi = mid as isize - 1;
                    }
                }
            } else {
                // Assume the rid chunk is sorted ascending (the norm for appends and
                // compaction) and binary-search it directly.
                let mut lo: isize = 0;
                let mut hi: isize = (rids.len() as isize) - 1;
                while lo <= hi {
                    let mid = ((lo + hi) >> 1) as usize;
                    let rid = rids.value(mid);
                    if rid == row_id {
                        return Ok(true);
                    } else if rid < row_id {
                        lo = mid as isize + 1;
                    } else {
                        hi = mid as isize - 1;
                    }
                }
            }
        }
        Ok(false)
    }

    // NOTE: Row IDs must be provided explicitly today. Consider introducing an
    // opt-in auto-increment mode once table-level schema metadata can enforce it.
    /// Append a [`RecordBatch`] to the store.
    ///
    /// The batch must include a `rowid` column (type `UInt64`) that uniquely identifies
    /// each row. Each other column must have `field_id` metadata mapping it to a
    /// [`LogicalFieldId`].
    ///
    /// # Last-Write-Wins Updates
    ///
    /// If any row IDs in the batch already exist, they are updated in place (overwritten)
    /// rather than creating duplicates. This happens in a separate transaction before
    /// appending new rows.
    ///
    /// # Row ID Ordering
    ///
    /// The batch is automatically sorted by `rowid` if not already sorted. This ensures
    /// efficient metadata updates and naturally sorted shadow columns.
    ///
    /// # Table Separation
    ///
    /// Each batch should contain columns from only one table. To append to multiple
    /// tables, call `append` separately for each table's batch (these calls may be concurrent).
    ///
    /// # Errors
    ///
    /// Returns an error if:
    /// - The batch is missing the `rowid` column
    /// - Column metadata is missing or invalid
    /// - Storage operations fail
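    ///
    /// # Example
    ///
    /// A minimal sketch of building an appendable batch. The row ID column name and
    /// the field ID metadata key are taken from this crate's `ROW_ID_COLUMN_NAME`
    /// and `FIELD_ID_META_KEY` constants; the numeric field id is illustrative:
    ///
    /// ```ignore
    /// use std::collections::HashMap;
    /// use std::sync::Arc;
    /// use arrow::array::{ArrayRef, Int64Array, UInt64Array};
    /// use arrow::datatypes::{DataType, Field, Schema};
    /// use arrow::record_batch::RecordBatch;
    ///
    /// let row_id_field = Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false);
    /// let value_field = Field::new("score", DataType::Int64, false).with_metadata(
    ///     HashMap::from([(FIELD_ID_META_KEY.to_string(), "42".to_string())]),
    /// );
    /// let schema = Arc::new(Schema::new(vec![row_id_field, value_field]));
    ///
    /// let batch = RecordBatch::try_new(
    ///     schema,
    ///     vec![
    ///         Arc::new(UInt64Array::from(vec![1_u64, 2, 3])) as ArrayRef,
    ///         Arc::new(Int64Array::from(vec![10_i64, 20, 30])) as ArrayRef,
    ///     ],
    /// )?;
    ///
    /// // New row ids are appended; re-sent row ids are rewritten in place (LWW).
    /// store.append(&batch)?;
    /// ```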
    #[allow(unused_variables, unused_assignments)] // NOTE: Preserve variable hooks used during feature gating of presence indexes.
    pub fn append(&self, batch: &RecordBatch) -> Result<()> {
        tracing::trace!(
            num_columns = batch.num_columns(),
            num_rows = batch.num_rows(),
            "ColumnStore::append BEGIN"
        );
        // --- PHASE 1: PRE-PROCESSING THE INCOMING BATCH ---
        // The `append` logic relies on row IDs being processed in ascending order to handle
        // metadata updates efficiently and to ensure the shadow row_id chunks are naturally sorted.
        // This block checks if the incoming batch is already sorted by `row_id`. If not, it creates a
        // new, sorted `RecordBatch` to work with for the rest of the function.
        let working_batch: RecordBatch;
        let batch_ref = {
            let schema = batch.schema();
            let row_id_idx = schema
                .index_of(ROW_ID_COLUMN_NAME)
                .map_err(|_| Error::Internal("row_id column required".into()))?;
            let row_id_any = batch.column(row_id_idx).clone();
            let row_id_arr = row_id_any
                .as_any()
                .downcast_ref::<UInt64Array>()
                .ok_or_else(|| Error::Internal("row_id downcast failed".into()))?;

            // Manually check if the row_id column is sorted.
            let mut is_sorted = true;
            if !row_id_arr.is_empty() {
                let mut last = row_id_arr.value(0);
                for i in 1..row_id_arr.len() {
                    let current = row_id_arr.value(i);
                    if current < last {
                        is_sorted = false;
                        break;
                    }
                    last = current;
                }
            }

            // If sorted, we can use the original batch directly.
            // Otherwise, we compute a permutation and reorder all columns.
            if is_sorted {
                batch
            } else {
                let sort_col = SortColumn {
                    values: row_id_any,
                    options: None,
                };
                let idx = lexsort_to_indices(&[sort_col], None)?;
                let perm = idx
                    .as_any()
                    .downcast_ref::<UInt32Array>()
                    .ok_or_else(|| Error::Internal("perm not u32".into()))?;
                let mut cols: Vec<ArrayRef> = Vec::with_capacity(batch.num_columns());
                for i in 0..batch.num_columns() {
                    cols.push(compute::take(batch.column(i), perm, None)?);
                }
                working_batch = RecordBatch::try_new(schema.clone(), cols)
                    .map_err(|e| Error::Internal(format!("record batch rebuild: {e}")))?;
                &working_batch
            }
        };

        tracing::trace!("ColumnStore::append PHASE 1 complete - batch preprocessed");

        // --- PHASE 2: LAST-WRITER-WINS (LWW) REWRITE ---
        // This phase handles updates. It identifies any rows in the incoming batch that
        // already exist in the store and rewrites them in-place. This is a separate
        // transaction that happens before the main append of new rows.
        let schema = batch_ref.schema();
        let row_id_idx = schema
            .index_of(ROW_ID_COLUMN_NAME)
            .map_err(|_| Error::Internal("row_id column required".into()))?;

        // Create a quick lookup map of incoming row IDs to their positions in the batch.
        let row_id_arr = batch_ref
            .column(row_id_idx)
            .as_any()
            .downcast_ref::<UInt64Array>()
            .ok_or_else(|| Error::Internal("row_id downcast failed".into()))?;
        let mut incoming_ids_map = FxHashMap::default();
        incoming_ids_map.reserve(row_id_arr.len());
        for i in 0..row_id_arr.len() {
            incoming_ids_map.insert(row_id_arr.value(i), i);
        }

        // These variables will track the state of the LWW transaction.
        let mut catalog_dirty = false;
        let mut puts_rewrites: Vec<BatchPut> = Vec::new();
        let mut all_rewritten_ids = FxHashSet::default();

        // Iterate through each column in the batch (except row_id) to perform rewrites.
        let mut catalog_lock = self.catalog.write().unwrap();
        for i in 0..batch_ref.num_columns() {
            if i == row_id_idx {
                continue;
            }
            let field = schema.field(i);
            if let Some(field_id_str) = field.metadata().get(crate::store::FIELD_ID_META_KEY) {
                let field_id = field_id_str
                    .parse::<u64>()
                    .map(LogicalFieldId::from)
                    .map_err(|e| Error::Internal(format!("Invalid field_id: {}", e)))?;

                // `lww_rewrite_for_field` finds overlapping row IDs and rewrites the data chunks.
                // It returns the set of row IDs that were updated.
                let rewritten = self.lww_rewrite_for_field(
                    &mut catalog_lock,
                    field_id,
                    &incoming_ids_map,
                    batch_ref.column(i),
                    batch_ref.column(row_id_idx),
                    &mut puts_rewrites,
                )?;
                all_rewritten_ids.extend(rewritten);
            }
        }
        drop(catalog_lock);

        // Commit the LWW changes to the pager immediately.
        if !puts_rewrites.is_empty() {
            self.pager.batch_put(&puts_rewrites)?;
        }

        tracing::trace!("ColumnStore::append PHASE 2 complete - LWW rewrites done");

        // --- PHASE 3: FILTERING FOR NEW ROWS ---
        // After handling updates, we filter the incoming batch to remove the rows that were
        // just rewritten. The remaining rows are guaranteed to be new additions to the store.
        let batch_to_append = if !all_rewritten_ids.is_empty() {
            let keep_mask: Vec<bool> = (0..row_id_arr.len())
                .map(|i| !all_rewritten_ids.contains(&row_id_arr.value(i)))
                .collect();
            let keep_array = BooleanArray::from(keep_mask);
            compute::filter_record_batch(batch_ref, &keep_array)?
        } else {
            batch_ref.clone()
        };

        // If no new rows are left, we are done.
        if batch_to_append.num_rows() == 0 {
            tracing::trace!("ColumnStore::append early exit - no new rows to append");
            return Ok(());
        }

        tracing::trace!("ColumnStore::append PHASE 3 complete - filtered for new rows");

        // --- PHASE 4: APPENDING NEW DATA ---
        // This is the main append transaction. All writes generated in this phase will be
        // collected and committed atomically at the very end.
        let append_schema = batch_to_append.schema();
        let append_row_id_idx = append_schema.index_of(ROW_ID_COLUMN_NAME)?;
        let append_row_id_any: ArrayRef = Arc::clone(batch_to_append.column(append_row_id_idx));
        let mut puts_appends: Vec<BatchPut> = Vec::new();

        // Loop through each column of the filtered batch to append its data.
        for (i, array) in batch_to_append.columns().iter().enumerate() {
            if i == append_row_id_idx {
                continue;
            }

            let field = append_schema.field(i);

            let field_id = field
                .metadata()
                .get(crate::store::FIELD_ID_META_KEY)
                .ok_or_else(|| Error::Internal("Missing field_id".into()))?
                .parse::<u64>()
                .map(LogicalFieldId::from)
                .map_err(|e| Error::Internal(format!("Invalid field_id: {}", e)))?;

            // Populate the data type cache for this column. This is a performance optimization
            // to avoid reading a chunk from storage later just to determine its type.
            self.dtype_cache.insert(field_id, field.data_type().clone());

            // Null values are treated as deletions, so we filter them out. The `rids_clean`
            // array contains the row IDs corresponding to the non-null values.
            let (array_clean, rids_clean) = if array.null_count() == 0 {
                (array.clone(), append_row_id_any.clone())
            } else {
                let keep =
                    BooleanArray::from_iter((0..array.len()).map(|j| Some(!array.is_null(j))));
                let a = compute::filter(array, &keep)?;
                let r = compute::filter(&append_row_id_any, &keep)?;
                (a, r)
            };

            if array_clean.is_empty() {
                continue;
            }

            // Get or create the physical keys for the column's data descriptor and its
            // shadow row_id descriptor from the catalog.
            let (descriptor_pk, rid_descriptor_pk, rid_fid) = {
                let mut catalog = self.catalog.write().unwrap();
                let pk1 = *catalog.map.entry(field_id).or_insert_with(|| {
                    catalog_dirty = true;
                    self.pager.alloc_many(1).unwrap()[0]
                });
                let r_fid = rowid_fid(field_id);
                let pk2 = *catalog.map.entry(r_fid).or_insert_with(|| {
                    catalog_dirty = true;
                    self.pager.alloc_many(1).unwrap()[0]
                });
                (pk1, pk2, r_fid)
            };

            // Load the descriptors and their tail metadata pages into memory.
            // If they don't exist, `load_or_create` will initialize new ones.
            let (mut data_descriptor, mut data_tail_page) =
                ColumnDescriptor::load_or_create(Arc::clone(&self.pager), descriptor_pk, field_id)
                    .map_err(|e| {
                        tracing::error!(
                            ?field_id,
                            descriptor_pk,
                            error = ?e,
                            "append: load_or_create failed for data descriptor"
                        );
                        e
                    })?;
            let (mut rid_descriptor, mut rid_tail_page) = ColumnDescriptor::load_or_create(
                Arc::clone(&self.pager),
                rid_descriptor_pk,
                rid_fid,
            )
            .map_err(|e| {
                tracing::error!(
                    ?rid_fid,
                    rid_descriptor_pk,
                    error = ?e,
                    "append: load_or_create failed for rid descriptor"
                );
                e
            })?;

            // Logically register the Presence index on the main data descriptor. This ensures
            // that even if no physical index chunks are created (because data arrived sorted),
            // the system knows a Presence index is conceptually active for this column.
            self.index_manager
                .stage_index_registration(&mut data_descriptor, IndexKind::Presence)?;

            // Split the data to be appended into chunks of a target size.
            let slices = split_to_target_bytes(
                &array_clean,
                TARGET_CHUNK_BYTES,
                self.cfg.varwidth_fallback_rows_per_slice,
            );
            let mut row_off = 0usize;

            // Loop through each new slice to create and stage its chunks.
            for s in slices {
                let rows = s.len();
                // Create and stage the data chunk.
                let data_pk = self.pager.alloc_many(1)?[0];
                let s_norm = zero_offset(&s);
                let data_bytes = serialize_array(s_norm.as_ref())?;
                puts_appends.push(BatchPut::Raw {
                    key: data_pk,
                    bytes: data_bytes,
                });

                // Create and stage the corresponding row_id chunk.
                let rid_slice: ArrayRef = rids_clean.slice(row_off, rows);
                let rid_norm = zero_offset(&rid_slice);
                let rid_pk = self.pager.alloc_many(1)?[0];
                let rid_bytes = serialize_array(rid_norm.as_ref())?;
                puts_appends.push(BatchPut::Raw {
                    key: rid_pk,
                    bytes: rid_bytes,
                });

                // Compute min/max for the row_id chunk to enable pruning during scans.
                let rids_for_meta = rid_norm.as_any().downcast_ref::<UInt64Array>().unwrap();
                let (min, max) = if !rids_for_meta.is_empty() {
                    let mut min_val = rids_for_meta.value(0);
                    let mut max_val = rids_for_meta.value(0);
                    for i in 1..rids_for_meta.len() {
                        let v = rids_for_meta.value(i);
                        if v < min_val {
                            min_val = v;
                        }
                        if v > max_val {
                            max_val = v;
                        }
                    }
                    (min_val, max_val)
                } else {
                    (0, 0)
                };

                // Create the initial metadata for both chunks.
                // The `value_order_perm_pk` is initialized to 0 (None).
                let mut data_meta = ChunkMetadata {
                    chunk_pk: data_pk,
                    row_count: rows as u64,
                    serialized_bytes: s_norm.get_array_memory_size() as u64,
                    max_val_u64: u64::MAX,
                    ..Default::default()
                };
                let mut rid_meta = ChunkMetadata {
                    chunk_pk: rid_pk,
                    row_count: rows as u64,
                    serialized_bytes: rid_norm.get_array_memory_size() as u64,
                    min_val_u64: min,
                    max_val_u64: max,
                    ..Default::default()
                };

                // **GENERIC INDEX UPDATE DISPATCH**
                // This is the single, index-agnostic call. The IndexManager will look up all
                // active indexes for this column (e.g., Presence, Sort) and call their respective
                // `stage_update_for_new_chunk` methods. This is where the physical index data
                // (like permutation blobs) is created and staged.
                self.index_manager.stage_updates_for_new_chunk(
                    field_id,
                    &data_descriptor,
                    &s_norm,
                    &rid_norm,
                    &mut data_meta,
                    &mut rid_meta,
                    &mut puts_appends,
                )?;

                // Append the (potentially modified) metadata to their respective descriptor chains.
                self.append_meta_in_loop(
                    &mut data_descriptor,
                    &mut data_tail_page,
                    data_meta,
                    &mut puts_appends,
                )?;
                self.append_meta_in_loop(
                    &mut rid_descriptor,
                    &mut rid_tail_page,
                    rid_meta,
                    &mut puts_appends,
                )?;
                row_off += rows;
            }

            // After processing all slices, stage the final writes for the updated tail pages
            // and the root descriptor objects themselves.
            puts_appends.push(BatchPut::Raw {
                key: data_descriptor.tail_page_pk,
                bytes: data_tail_page,
            });
            puts_appends.push(BatchPut::Raw {
                key: descriptor_pk,
                bytes: data_descriptor.to_le_bytes(),
            });
            puts_appends.push(BatchPut::Raw {
                key: rid_descriptor.tail_page_pk,
                bytes: rid_tail_page,
            });
            puts_appends.push(BatchPut::Raw {
                key: rid_descriptor_pk,
                bytes: rid_descriptor.to_le_bytes(),
            });
        }

        // --- PHASE 5: FINAL ATOMIC COMMIT ---
        // If the catalog was modified (e.g., new columns were created), stage its write.
        if catalog_dirty {
            let catalog = self.catalog.read().unwrap();
            puts_appends.push(BatchPut::Raw {
                key: CATALOG_ROOT_PKEY,
                bytes: catalog.to_bytes(),
            });
        }

        // Commit all staged puts (new data chunks, new row_id chunks, new index permutations,
        // updated descriptor pages, updated root descriptors, and the updated catalog)
        // in a single atomic operation.
        if !puts_appends.is_empty() {
            self.pager.batch_put(&puts_appends)?;
        }
        tracing::trace!("ColumnStore::append END - success");
        Ok(())
    }

    /// Rewrites existing chunks in place for row IDs that also appear in an
    /// incoming batch (last-writer-wins). Incoming nulls mark deletions and
    /// non-null values are upserts; returns the set of row IDs that were
    /// rewritten so the caller can exclude them from the subsequent append.
    fn lww_rewrite_for_field(
        &self,
        catalog: &mut ColumnCatalog,
        field_id: LogicalFieldId,
        incoming_ids_map: &FxHashMap<u64, usize>,
        incoming_data: &ArrayRef,
        incoming_row_ids: &ArrayRef,
        puts: &mut Vec<BatchPut>,
    ) -> Result<FxHashSet<u64>> {
        use crate::store::descriptor::DescriptorIterator;
        use crate::store::ingest::ChunkEdit;

        // Fast exit if nothing to rewrite.
        if incoming_ids_map.is_empty() {
            return Ok(FxHashSet::default());
        }
        let incoming_ids: FxHashSet<u64> = incoming_ids_map.keys().copied().collect();

        // Resolve descriptors for data and row_id columns.
        let desc_pk_data = match catalog.map.get(&field_id) {
            Some(pk) => *pk,
            None => return Ok(FxHashSet::default()),
        };
        let rid_fid = rowid_fid(field_id);
        let desc_pk_rid = match catalog.map.get(&rid_fid) {
            Some(pk) => *pk,
            None => return Ok(FxHashSet::default()),
        };

        // Batch fetch both descriptors.
        let gets = vec![
            BatchGet::Raw { key: desc_pk_data },
            BatchGet::Raw { key: desc_pk_rid },
        ];
        let results = self.pager.batch_get(&gets)?;
        let mut blobs_by_pk = FxHashMap::default();
        for r in results {
            if let GetResult::Raw { key, bytes } = r {
                blobs_by_pk.insert(key, bytes);
            }
        }

        let desc_blob_data = blobs_by_pk.remove(&desc_pk_data).ok_or_else(|| {
            tracing::error!(
                ?field_id,
                desc_pk_data,
                "lww_rewrite: data descriptor blob not found in pager"
            );
            Error::NotFound
        })?;
        let mut descriptor_data = ColumnDescriptor::from_le_bytes(desc_blob_data.as_ref());

        let desc_blob_rid = blobs_by_pk.remove(&desc_pk_rid).ok_or_else(|| {
            tracing::error!(
                ?rid_fid,
                desc_pk_rid,
                "lww_rewrite: rid descriptor blob not found in pager"
            );
            Error::NotFound
        })?;
        let mut descriptor_rid = ColumnDescriptor::from_le_bytes(desc_blob_rid.as_ref());

        tracing::trace!(?field_id, "lww_rewrite: descriptors loaded successfully");

        // Collect chunk metadata.
        let mut metas_data: Vec<ChunkMetadata> = Vec::new();
        let mut metas_rid: Vec<ChunkMetadata> = Vec::new();
        for m in DescriptorIterator::new(self.pager.as_ref(), descriptor_data.head_page_pk) {
            metas_data.push(m.map_err(|e| {
                tracing::error!(?field_id, error = ?e, "lww_rewrite: failed to iterate data descriptor");
                e
            })?);
        }
        for m in DescriptorIterator::new(self.pager.as_ref(), descriptor_rid.head_page_pk) {
            metas_rid.push(m.map_err(|e| {
                tracing::error!(?rid_fid, error = ?e, "lww_rewrite: failed to iterate rid descriptor");
                e
            })?);
        }

        tracing::trace!(
            ?field_id,
            data_chunks = metas_data.len(),
            rid_chunks = metas_rid.len(),
            "lww_rewrite: chunk metadata collected"
        );

        // Classify incoming rows: delete vs upsert.
        let rid_in = incoming_row_ids
            .as_any()
            .downcast_ref::<UInt64Array>()
            .ok_or_else(|| Error::Internal("row_id must be u64".into()))?;
        let mut ids_to_delete = FxHashSet::default();
        let mut ids_to_upsert = FxHashSet::default();
        for i in 0..rid_in.len() {
            let rid = rid_in.value(i);
            if incoming_data.is_null(i) {
                ids_to_delete.insert(rid);
            } else {
                ids_to_upsert.insert(rid);
            }
        }

        // Scan row_id chunks to find hits, bucketed by chunk index.
        let mut rewritten_ids = FxHashSet::default();
        let mut hit_up: FxHashMap<usize, Vec<u64>> = FxHashMap::default();
        let mut hit_del: FxHashMap<usize, Vec<u64>> = FxHashMap::default();

        let n = metas_data.len().min(metas_rid.len());
        if n > 0 {
            // Fetch only row_id chunks to locate matches.
            let mut gets_rid = Vec::with_capacity(n);
            for rm in metas_rid.iter().take(n) {
                gets_rid.push(BatchGet::Raw { key: rm.chunk_pk });
            }
            let rid_results = self.pager.batch_get(&gets_rid)?;
            let mut rid_blobs: FxHashMap<PhysicalKey, EntryHandle> = FxHashMap::default();
            for r in rid_results {
                if let GetResult::Raw { key, bytes } = r {
                    rid_blobs.insert(key, bytes);
                }
            }

            for (i, meta_rid) in metas_rid.iter().enumerate().take(n) {
                if let Some(rid_blob) = rid_blobs.get(&meta_rid.chunk_pk) {
                    let rid_arr_any = deserialize_array(rid_blob.clone())?;
                    let rid_arr = rid_arr_any
                        .as_any()
                        .downcast_ref::<UInt64Array>()
                        .ok_or_else(|| Error::Internal("rid type mismatch".into()))?;
                    for j in 0..rid_arr.len() {
                        let rid = rid_arr.value(j);
                        if incoming_ids.contains(&rid) {
                            if ids_to_delete.contains(&rid) {
                                hit_del.entry(i).or_default().push(rid);
                            } else if ids_to_upsert.contains(&rid) {
                                hit_up.entry(i).or_default().push(rid);
                            }
                            rewritten_ids.insert(rid);
                        }
                    }
                }
            }
        }

        if hit_up.is_empty() && hit_del.is_empty() {
            return Ok(rewritten_ids);
        }

        // Batch fetch data+rid blobs for all chunks with hits.
        let mut hit_set = FxHashSet::default();
        hit_set.extend(hit_up.keys().copied());
        hit_set.extend(hit_del.keys().copied());
        let hit_idxs: Vec<usize> = hit_set.into_iter().collect();

        let mut gets = Vec::with_capacity(hit_idxs.len() * 2);
        for &i in &hit_idxs {
            gets.push(BatchGet::Raw {
                key: metas_data[i].chunk_pk,
            });
            gets.push(BatchGet::Raw {
                key: metas_rid[i].chunk_pk,
            });
        }
        let results = self.pager.batch_get(&gets)?;
        let mut blob_map: FxHashMap<PhysicalKey, EntryHandle> = FxHashMap::default();
        for r in results {
            if let GetResult::Raw { key, bytes } = r {
                blob_map.insert(key, bytes);
            }
        }

        // Apply per-chunk edits and stage writebacks.
        for i in hit_idxs {
            let old_data_arr =
                deserialize_array(blob_map.get(&metas_data[i].chunk_pk).unwrap().clone())?;
            let old_rid_arr_any =
                deserialize_array(blob_map.get(&metas_rid[i].chunk_pk).unwrap().clone())?;
            let old_rid_arr = old_rid_arr_any
                .as_any()
                .downcast_ref::<UInt64Array>()
                .unwrap();

            let up_vec = hit_up.remove(&i).unwrap_or_default();
            let del_vec = hit_del.remove(&i).unwrap_or_default();

            // Centralized LWW edit: builds the keep mask and injects the tails.
            let edit = ChunkEdit::from_lww_upsert(
                old_rid_arr,
                &up_vec,
                &del_vec,
                incoming_data,
                incoming_row_ids,
                incoming_ids_map,
            )?;

            let (new_data_arr, new_rid_arr) =
                ChunkEdit::apply_edit_to_arrays(&old_data_arr, Some(&old_rid_arr_any), &edit)?;

            // Stage data writeback.
            let data_bytes = serialize_array(&new_data_arr)?;
            puts.push(BatchPut::Raw {
                key: metas_data[i].chunk_pk,
                bytes: data_bytes,
            });
            metas_data[i].row_count = new_data_arr.len() as u64;
            metas_data[i].serialized_bytes = new_data_arr.get_array_memory_size() as u64;

            // Stage row_id writeback.
            if let Some(rarr) = new_rid_arr {
                let rid_bytes = serialize_array(&rarr)?;
                puts.push(BatchPut::Raw {
                    key: metas_rid[i].chunk_pk,
                    bytes: rid_bytes,
                });
                metas_rid[i].row_count = rarr.len() as u64;
                metas_rid[i].serialized_bytes = rarr.get_array_memory_size() as u64;
            }

            // Refresh permutation if present.
            if metas_data[i].value_order_perm_pk != 0 {
                let sort_col = SortColumn {
                    values: new_data_arr,
                    options: None,
                };
                let idx = lexsort_to_indices(&[sort_col], None)?;
                let perm_bytes = serialize_array(&idx)?;
                puts.push(BatchPut::Raw {
                    key: metas_data[i].value_order_perm_pk,
                    bytes: perm_bytes,
                });
            }
        }

        // Rewrite descriptor chains/totals for both columns.
        descriptor_data.rewrite_pages(
            Arc::clone(&self.pager),
            desc_pk_data,
            &mut metas_data,
            puts,
        )?;
        descriptor_rid.rewrite_pages(Arc::clone(&self.pager), desc_pk_rid, &mut metas_rid, puts)?;

        Ok(rewritten_ids)
    }
1236
1237    fn stage_delete_rows_for_field(
1238        &self,
1239        field_id: LogicalFieldId,
1240        rows_to_delete: &[RowId],
1241        staged_puts: &mut Vec<BatchPut>,
1242    ) -> Result<bool> {
1243        tracing::warn!(
1244            field_id = ?field_id,
1245            rows = rows_to_delete.len(),
1246            "delete_rows stage_delete_rows_for_field: start"
1247        );
1248        use crate::store::descriptor::DescriptorIterator;
1249        use crate::store::ingest::ChunkEdit;
1250
1251        if rows_to_delete.is_empty() {
1252            return Ok(false);
1253        }
1254
1255        // Stream and validate ascending, unique positions.
1256        let mut del_iter = rows_to_delete.iter().copied();
1257        let mut cur_del = del_iter.next();
1258        let mut last_seen: Option<u64> = cur_del;
1259
1260        // Lookup descriptors (data and optional row_id).
1261        let catalog = self.catalog.read().unwrap();
1262        let desc_pk = match catalog.map.get(&field_id) {
1263            Some(pk) => *pk,
1264            None => {
1265                tracing::trace!(
1266                    field_id = ?field_id,
1267                    "delete_rows stage_delete_rows_for_field: data descriptor missing"
1268                );
1269                return Err(Error::NotFound);
1270            }
1271        };
1272        let rid_fid = rowid_fid(field_id);
1273        let desc_pk_rid = catalog.map.get(&rid_fid).copied();
1274        tracing::warn!(
1275            field_id = ?field_id,
1276            desc_pk,
1277            desc_pk_rid = ?desc_pk_rid,
1278            "delete_rows stage_delete_rows_for_field: descriptor keys"
1279        );
1280
1281        // Batch fetch descriptor blobs up front.
1282        let mut gets = vec![BatchGet::Raw { key: desc_pk }];
1283        if let Some(pk) = desc_pk_rid {
1284            gets.push(BatchGet::Raw { key: pk });
1285        }
1286        let results = match self.pager.batch_get(&gets) {
1287            Ok(res) => res,
1288            Err(err) => {
1289                tracing::trace!(
1290                    field_id = ?field_id,
1291                    error = ?err,
1292                    "delete_rows stage_delete_rows_for_field: descriptor batch_get failed"
1293                );
1294                return Err(err);
1295            }
1296        };
1297        let mut blobs_by_pk = FxHashMap::default();
1298        for res in results {
1299            if let GetResult::Raw { key, bytes } = res {
1300                blobs_by_pk.insert(key, bytes);
1301            }
1302        }
1303
1304        tracing::warn!(
1305            field_id = ?field_id,
1306            desc_blob_found = blobs_by_pk.contains_key(&desc_pk),
1307            rid_blob_found = desc_pk_rid.map(|pk| blobs_by_pk.contains_key(&pk)),
1308            "delete_rows stage_delete_rows_for_field: descriptor fetch status"
1309        );
1310
1311        let desc_blob = blobs_by_pk.remove(&desc_pk).ok_or_else(|| {
1312            tracing::trace!(
1313                field_id = ?field_id,
1314                desc_pk,
1315                "delete_rows stage_delete_rows_for_field: descriptor blob missing"
1316            );
1317            Error::Internal(format!(
1318                "descriptor pk={} missing during delete_rows for field {:?}",
1319                desc_pk, field_id
1320            ))
1321        })?;
1322        let mut descriptor = ColumnDescriptor::from_le_bytes(desc_blob.as_ref());
1323
1324        // Build metas for data column.
1325        let mut metas: Vec<ChunkMetadata> = Vec::new();
1326        for m in DescriptorIterator::new(self.pager.as_ref(), descriptor.head_page_pk) {
1327            metas.push(m?);
1328        }
1329        if metas.is_empty() {
1330            drop(catalog);
1331            return Ok(false);
1332        }
1333
1334        // Optionally mirror metas for row_id column.
1335        let mut metas_rid: Vec<ChunkMetadata> = Vec::new();
1336        let mut descriptor_rid: Option<ColumnDescriptor> = None;
1337        tracing::warn!(
1338            field_id = ?field_id,
1339            metas_len = metas.len(),
1340            desc_pk_rid = ?desc_pk_rid,
1341            "delete_rows stage_delete_rows_for_field: data metas loaded"
1342        );
1343        if let Some(pk_rid) = desc_pk_rid
1344            && let Some(desc_blob_rid) = blobs_by_pk.remove(&pk_rid)
1345        {
1346            let d_rid = ColumnDescriptor::from_le_bytes(desc_blob_rid.as_ref());
1347            for m in DescriptorIterator::new(self.pager.as_ref(), d_rid.head_page_pk) {
1348                metas_rid.push(m?);
1349            }
1350            descriptor_rid = Some(d_rid);
1351        }
1352
1353        tracing::trace!(
1354            field_id = ?field_id,
1355            metas_rid_len = metas_rid.len(),
1356            "delete_rows stage_delete_rows_for_field: rowid metas loaded"
1357        );
1358
1359        let mut cum_rows = 0u64;
1360        let mut any_changed = false;
1361
1362        for (i, meta) in metas.iter_mut().enumerate() {
1363            let start_u64 = cum_rows;
1364            let end_u64 = start_u64 + meta.row_count;
1365
1366            // Advance deletes into this chunk window [start, end).
1367            while let Some(d) = cur_del {
1368                if d < start_u64
1369                    && let Some(prev) = last_seen
1370                {
1371                    if d < prev {
1372                        return Err(Error::Internal(
1373                            "rows_to_delete must be ascending/unique".into(),
1374                        ));
1375                    }
1376
1377                    last_seen = Some(d);
1378                    cur_del = del_iter.next();
1379                } else {
1380                    break;
1381                }
1382            }
1383
1384            // Collect local delete indices.
1385            let rows = meta.row_count as usize;
1386            let mut del_local: FxHashSet<usize> = FxHashSet::default();
1387            while let Some(d) = cur_del {
1388                if d >= end_u64 {
1389                    break;
1390                }
1391                del_local.insert((d - start_u64) as usize);
1392                last_seen = Some(d);
1393                cur_del = del_iter.next();
1394            }
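            // Worked example with illustrative numbers: for two chunks of 4 and 5
            // rows the windows are [0, 4) and [4, 9). With rows_to_delete = [1, 6, 7]:
            //   chunk 0: start=0, end=4 -> del_local = {1}
            //   chunk 1: start=4, end=9 -> del_local = {6 - 4, 7 - 4} = {2, 3}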
1395
1396            if del_local.is_empty() {
1397                cum_rows = end_u64;
1398                continue;
1399            }
1400
1401            // Batch get chunk blobs (data and optional row_id).
1402            let mut chunk_gets = vec![BatchGet::Raw { key: meta.chunk_pk }];
1403            if let Some(rm) = metas_rid.get(i) {
1404                chunk_gets.push(BatchGet::Raw { key: rm.chunk_pk });
1405            }
1406            let chunk_results = match self.pager.batch_get(&chunk_gets) {
1407                Ok(res) => res,
1408                Err(err) => {
1409                    tracing::trace!(
1410                        field_id = ?field_id,
1411                        chunk_pk = meta.chunk_pk,
1412                        error = ?err,
1413                        "delete_rows stage_delete_rows_for_field: chunk batch_get failed"
1414                    );
1415                    return Err(err);
1416                }
1417            };
1418            let mut chunk_blobs = FxHashMap::default();
1419            for res in chunk_results {
1420                if let GetResult::Raw { key, bytes } = res {
1421                    chunk_blobs.insert(key, bytes);
1422                }
1423            }
1424
1425            tracing::trace!(
1426                field_id = ?field_id,
1427                chunk_pk = meta.chunk_pk,
1428                rid_chunk_pk = metas_rid.get(i).map(|rm| rm.chunk_pk),
1429                data_found = chunk_blobs.contains_key(&meta.chunk_pk),
1430                rid_found = metas_rid
1431                    .get(i)
1432                    .map(|rm| chunk_blobs.contains_key(&rm.chunk_pk)),
1433                "delete_rows stage_delete_rows_for_field: chunk fetch status"
1434            );
1435
1436            let data_blob = match chunk_blobs.remove(&meta.chunk_pk) {
1437                Some(bytes) => bytes,
1438                None => {
1439                    tracing::trace!(
1440                        field_id = ?field_id,
1441                        chunk_pk = meta.chunk_pk,
1442                        "delete_rows stage_delete_rows_for_field: chunk missing"
1443                    );
1444                    return Err(Error::NotFound);
1445                }
1446            };
1447            let data_arr = deserialize_array(data_blob)?;
1448
1449            let rid_arr_any = if let Some(rm) = metas_rid.get(i) {
1450                let rid_blob = match chunk_blobs.remove(&rm.chunk_pk) {
1451                    Some(bytes) => bytes,
1452                    None => {
1453                        tracing::trace!(
1454                            field_id = ?field_id,
1455                            rowid_chunk_pk = rm.chunk_pk,
1456                            "delete_rows stage_delete_rows_for_field: rowid chunk missing"
1457                        );
1458                        return Err(Error::NotFound);
1459                    }
1460                };
1461                Some(deserialize_array(rid_blob)?)
1462            } else {
1463                None
1464            };
1465
1466            // Build the edit from the local delete indices via the ingest helper.
1467            let edit = ChunkEdit::from_delete_indices(rows, &del_local);
1468
1469            // Apply edit (pure array ops).
1470            let (new_data_arr, new_rid_arr) =
1471                ChunkEdit::apply_edit_to_arrays(&data_arr, rid_arr_any.as_ref(), &edit)?;
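            // Conceptually (illustrative): for rows [a, b, c, d] and del_local = {1},
            // the edit keeps positions {0, 2, 3}, yielding [a, c, d] for the data
            // array and, when present, for the row-id array as well.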
1472
1473            // Write back data.
1474            let data_bytes = serialize_array(&new_data_arr)?;
1475            staged_puts.push(BatchPut::Raw {
1476                key: meta.chunk_pk,
1477                bytes: data_bytes,
1478            });
1479            meta.row_count = new_data_arr.len() as u64;
1480            meta.serialized_bytes = new_data_arr.get_array_memory_size() as u64;
1481
1482            // Write back row_ids if present.
1483            if let (Some(rm), Some(rids)) = (metas_rid.get_mut(i), new_rid_arr) {
1485                let rid_bytes = serialize_array(&rids)?;
1486                staged_puts.push(BatchPut::Raw {
1487                    key: rm.chunk_pk,
1488                    bytes: rid_bytes,
1489                });
1490                rm.row_count = rids.len() as u64;
1491                rm.serialized_bytes = rids.get_array_memory_size() as u64;
1492            }
1493
1494            // Refresh permutation if this chunk has one.
1495            if meta.value_order_perm_pk != 0 {
1496                let sort_column = SortColumn {
1497                    values: new_data_arr,
1498                    options: None,
1499                };
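                // With default options `lexsort_to_indices` sorts ascending; e.g.
                // values [30, 10, 20] yield the permutation indices [1, 2, 0].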
1500                let indices = lexsort_to_indices(&[sort_column], None)?;
1501                let perm_bytes = serialize_array(&indices)?;
1502                staged_puts.push(BatchPut::Raw {
1503                    key: meta.value_order_perm_pk,
1504                    bytes: perm_bytes,
1505                });
1506            }
1507
1508            cum_rows = end_u64;
1509            any_changed = true;
1510        }
1511
1512        // Rewrite descriptor chains/totals; the caller commits the staged puts.
1513        descriptor.rewrite_pages(Arc::clone(&self.pager), desc_pk, &mut metas, staged_puts)?;
1514        if let (Some(rid_pk), Some(mut rid_desc)) = (desc_pk_rid, descriptor_rid) {
1515            rid_desc.rewrite_pages(Arc::clone(&self.pager), rid_pk, &mut metas_rid, staged_puts)?;
1516        }
1517        drop(catalog);
1518        tracing::trace!(
1519            field_id = ?field_id,
1520            changed = any_changed,
1521            "delete_rows stage_delete_rows_for_field: finished stage"
1522        );
1523        Ok(any_changed)
1524    }
1525
1526    /// Delete row positions for one or more logical fields in a single atomic batch.
1527    ///
1528    /// The same set of global row positions is applied to every field in
1529    /// `fields`; positions must be ascending and unique. All staged metadata
1530    /// and chunk updates are committed in a single pager batch.
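    ///
    /// # Example
    ///
    /// Illustrative sketch (not a doctest): `store` is assumed to be an open
    /// [`ColumnStore`] and `field_a`/`field_b` [`LogicalFieldId`]s from the same
    /// table.
    ///
    /// ```ignore
    /// // Delete global row positions 3, 7, and 8 from both columns atomically.
    /// store.delete_rows(&[field_a, field_b], &[3, 7, 8])?;
    /// ```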
1531    pub fn delete_rows(&self, fields: &[LogicalFieldId], rows_to_delete: &[RowId]) -> Result<()> {
1532        if fields.is_empty() || rows_to_delete.is_empty() {
1533            return Ok(());
1534        }
1535
1536        let mut puts = Vec::new();
1537        let mut touched: FxHashSet<LogicalFieldId> = FxHashSet::default();
1538        let mut table_id: Option<TableId> = None;
1539
1540        tracing::trace!(
1541            fields = fields.len(),
1542            rows = rows_to_delete.len(),
1543            "delete_rows begin"
1544        );
1545        for field_id in fields {
1546            tracing::trace!(field = ?field_id, "delete_rows iter field");
1547            if let Some(expected) = table_id {
1548                if field_id.table_id() != expected {
1549                    return Err(Error::InvalidArgumentError(
1550                        "delete_rows requires fields from the same table".into(),
1551                    ));
1552                }
1553            } else {
1554                table_id = Some(field_id.table_id());
1555            }
1556
1557            if self.stage_delete_rows_for_field(*field_id, rows_to_delete, &mut puts)? {
1558                touched.insert(*field_id);
1559            }
1560        }
1561
1562        if puts.is_empty() {
1563            return Ok(());
1564        }
1565
1566        self.pager.batch_put(&puts)?;
1567
1568        tracing::trace!(touched = touched.len(), "delete_rows apply writes");
1569
1570        for field_id in touched {
1571            self.compact_field_bounded(field_id)?;
1572        }
1573        tracing::trace!("delete_rows complete");
1574        Ok(())
1575    }
1576
1577    // TODO: Move to descriptor module?
1578    // NOTE: This helper mutates descriptor metadata in place because it needs
1579    // access to the pager and catalog locks owned by `ColumnStore`.
1580    /// Write a descriptor chain from a meta slice. Reuses first page when
1581    /// possible. Frees surplus pages via `frees`.
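    ///
    /// Illustrative page layout: each page holds up to
    /// `DESCRIPTOR_ENTRIES_PER_PAGE` entries and links to the next page via
    /// `next_page_pk`, with `0` terminating the chain:
    ///
    /// ```text
    /// head_page_pk -> [header | metas 0..k] -> [header | metas k..n] -> 0
    /// ```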
1582    pub(crate) fn write_descriptor_chain(
1583        &self,
1584        descriptor_pk: PhysicalKey,
1585        descriptor: &mut ColumnDescriptor,
1586        new_metas: &[ChunkMetadata],
1587        puts: &mut Vec<BatchPut>,
1588        frees: &mut Vec<PhysicalKey>,
1589    ) -> Result<()> {
1590        // Collect existing page chain.
1591        let mut old_pages = Vec::new();
1592        let mut pk = descriptor.head_page_pk;
1593        while pk != 0 {
1594            let page_blob = self
1595                .pager
1596                .batch_get(&[BatchGet::Raw { key: pk }])?
1597                .pop()
1598                .and_then(|res| match res {
1599                    GetResult::Raw { bytes, .. } => Some(bytes),
1600                    _ => None,
1601                })
1602                .ok_or(Error::NotFound)?;
1603            let header = DescriptorPageHeader::from_le_bytes(
1604                &page_blob.as_ref()[..DescriptorPageHeader::DISK_SIZE],
1605            );
1606            old_pages.push(pk);
1607            pk = header.next_page_pk;
1608        }
1609
1610        // Required pages for new metas.
1611        let per = DESCRIPTOR_ENTRIES_PER_PAGE;
1612        let need_pages = new_metas.len().div_ceil(per);
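        // Worked example (illustrative): if DESCRIPTOR_ENTRIES_PER_PAGE were 256,
        // then 600 metas -> 600.div_ceil(256) = 3 pages, and 0 metas -> 0 pages.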
1617        // If empty: free everything and clear counters.
1618        if need_pages == 0 {
1619            frees.extend(old_pages.iter().copied());
1620
1621            // Setting both page pointers to 0 indicates an empty descriptor state.
1622            // This signals that the descriptor needs reinitialization on next access,
1623            // as this pattern (head_page_pk == 0 && tail_page_pk == 0) is checked
1624            // in the `load_or_create` method to determine if allocation is needed.
1625            descriptor.head_page_pk = 0;
1626            descriptor.tail_page_pk = 0;
1627            descriptor.total_row_count = 0;
1628            descriptor.total_chunk_count = 0;
1629            puts.push(BatchPut::Raw {
1630                key: descriptor_pk,
1631                bytes: descriptor.to_le_bytes(),
1632            });
1633            return Ok(());
1634        }
1635
1636        // Reuse first page; alloc more if needed.
1637        let mut pages = Vec::with_capacity(need_pages);
1638        if !old_pages.is_empty() {
1639            pages.push(old_pages[0]);
1640        } else {
1641            pages.push(self.pager.alloc_many(1)?[0]);
1642            descriptor.head_page_pk = pages[0];
1643        }
1644        if need_pages > pages.len() {
1645            let extra = self.pager.alloc_many(need_pages - pages.len())?;
1646            pages.extend(extra);
1647        }
1648
1649        // Free surplus old pages.
1650        if old_pages.len() > need_pages {
1651            frees.extend(old_pages[need_pages..].iter().copied());
1652        }
1653
1654        // Write page contents.
1655        let mut off = 0usize;
1656        for (i, page_pk) in pages.iter().copied().enumerate() {
1657            let remain = new_metas.len() - off;
1658            let count = remain.min(per);
1659            let next = if i + 1 < pages.len() { pages[i + 1] } else { 0 };
1660            let header = DescriptorPageHeader {
1661                next_page_pk: next,
1662                entry_count: count as u32,
1663                _padding: [0; 4],
1664            };
1665            let mut page_bytes = header.to_le_bytes().to_vec();
1666            for m in &new_metas[off..off + count] {
1667                page_bytes.extend_from_slice(&m.to_le_bytes());
1668            }
1669            puts.push(BatchPut::Raw {
1670                key: page_pk,
1671                bytes: page_bytes,
1672            });
1673            off += count;
1674        }
1675
1676        descriptor.tail_page_pk = *pages.last().unwrap();
1677        descriptor.total_chunk_count = new_metas.len() as u64;
1678        descriptor.total_row_count = new_metas.iter().map(|m| m.row_count).sum();
1679        puts.push(BatchPut::Raw {
1680            key: descriptor_pk,
1681            bytes: descriptor.to_le_bytes(),
1682        });
1683        Ok(())
1684    }
1685
1686    /// Bounded, local field compaction. Merges adjacent small chunks into
1687    /// ~TARGET_CHUNK_BYTES; leaves large chunks intact. Recomputes perms if
1688    /// any source chunk in a run had one. Frees obsolete chunks/pages.
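    ///
    /// Illustrative example (hypothetical sizes, e.g. MIN at 64 KiB and TARGET at
    /// 1 MiB): chunk sizes [16 KiB, 16 KiB, 2 MiB, 8 KiB] become
    /// [32 KiB, 2 MiB, 8 KiB]; the two leading small chunks merge, the large chunk
    /// is left intact, and the trailing small chunk has no small neighbour to merge
    /// with.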
1689    fn compact_field_bounded(&self, field_id: LogicalFieldId) -> Result<()> {
1690        // We may rewrite descriptors; take a write lock.
1691        let mut catalog = self.catalog.write().unwrap();
1692
1693        let desc_pk = match catalog.map.get(&field_id) {
1694            Some(&pk) => pk,
1695            None => return Ok(()),
1696        };
1697        let rid_fid = rowid_fid(field_id);
1698        let desc_pk_rid = match catalog.map.get(&rid_fid) {
1699            Some(&pk) => pk,
1700            None => return Ok(()),
1701        };
1702
1703        // True batching for the two descriptor reads.
1704        let gets = vec![
1705            BatchGet::Raw { key: desc_pk },
1706            BatchGet::Raw { key: desc_pk_rid },
1707        ];
1708        let results = self.pager.batch_get(&gets)?;
1709        let mut blobs_by_pk = FxHashMap::default();
1710        for res in results {
1711            if let GetResult::Raw { key, bytes } = res {
1712                blobs_by_pk.insert(key, bytes);
1713            }
1714        }
1715        let desc_blob = blobs_by_pk.remove(&desc_pk).ok_or(Error::NotFound)?;
1716        let mut desc = ColumnDescriptor::from_le_bytes(desc_blob.as_ref());
1717        let desc_blob_rid = blobs_by_pk.remove(&desc_pk_rid).ok_or(Error::NotFound)?;
1718        let mut desc_rid = ColumnDescriptor::from_le_bytes(desc_blob_rid.as_ref());
1719
1720        // Load metas.
1721        let mut metas = Vec::new();
1722        for m in DescriptorIterator::new(self.pager.as_ref(), desc.head_page_pk) {
1723            metas.push(m?);
1724        }
1725        let mut metas_rid = Vec::new();
1726        for m in DescriptorIterator::new(self.pager.as_ref(), desc_rid.head_page_pk) {
1727            metas_rid.push(m?);
1728        }
1729        if metas.is_empty() || metas_rid.is_empty() {
1730            return Ok(());
1731        }
1732
1733        let mut puts: Vec<BatchPut> = Vec::new();
1734        let mut frees: Vec<PhysicalKey> = Vec::new();
1735        let mut new_metas: Vec<ChunkMetadata> = Vec::new();
1736        let mut new_rid_metas: Vec<ChunkMetadata> = Vec::new();
1737
1738        let mut i = 0usize;
1739        while i < metas.len() {
1740            let sz = metas[i].serialized_bytes as usize;
1741            // Keep large chunks as-is.
1742            if sz >= MIN_CHUNK_BYTES {
1743                new_metas.push(metas[i]);
1744                new_rid_metas.push(metas_rid[i]);
1745                i += 1;
1746                continue;
1747            }
1748
1749            // Build a small run [i, j) capped by MAX_MERGE_RUN_BYTES.
1750            let mut j = i;
1751            let mut run_bytes = 0usize;
1752            while j < metas.len() {
1753                let b = metas[j].serialized_bytes as usize;
1754                if b >= TARGET_CHUNK_BYTES {
1755                    break;
1756                }
1757                if run_bytes + b > MAX_MERGE_RUN_BYTES {
1758                    break;
1759                }
1760                run_bytes += b;
1761                j += 1;
1762            }
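            // The run grows while each chunk is under TARGET_CHUNK_BYTES and the
            // accumulated run_bytes stay within MAX_MERGE_RUN_BYTES. A run of length
            // one (j == i + 1) has no small neighbour to merge with and is kept
            // unchanged below.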
1763            if j == i + 1 {
1764                new_metas.push(metas[i]);
1765                new_rid_metas.push(metas_rid[i]);
1766                i += 1;
1767                continue;
1768            }
1769
1770            // Fetch and concatenate the run's data and row_ids.
1771            let mut gets = Vec::with_capacity((j - i) * 2);
1772            for k in i..j {
1773                gets.push(BatchGet::Raw {
1774                    key: metas[k].chunk_pk,
1775                });
1776                gets.push(BatchGet::Raw {
1777                    key: metas_rid[k].chunk_pk,
1778                });
1779            }
1780            let results = self.pager.batch_get(&gets)?;
1781            let mut by_pk: FxHashMap<PhysicalKey, EntryHandle> = FxHashMap::default();
1782            for r in results {
1783                match r {
1784                    GetResult::Raw { key, bytes } => {
1785                        by_pk.insert(key, bytes);
1786                    }
1787                    _ => return Err(Error::NotFound),
1788                }
1789            }
1790            let mut data_parts = Vec::with_capacity(j - i);
1791            let mut rid_parts = Vec::with_capacity(j - i);
1792            for k in i..j {
1793                let db = by_pk.get(&metas[k].chunk_pk).ok_or(Error::NotFound)?;
1794                data_parts.push(deserialize_array(db.clone())?);
1795                let rb = by_pk.get(&metas_rid[k].chunk_pk).ok_or(Error::NotFound)?;
1796                rid_parts.push(deserialize_array(rb.clone())?);
1797            }
1798            let merged_data = concat_many(data_parts.iter().collect())?;
1799            let merged_rid_any = concat_many(rid_parts.iter().collect())?;
1800
1801            // Split merged run into ~target-sized chunks.
1802            let slices = split_to_target_bytes(
1803                &merged_data,
1804                TARGET_CHUNK_BYTES,
1805                self.cfg.varwidth_fallback_rows_per_slice,
1806            );
1807            let mut rid_off = 0usize;
1808            let need_perms = metas[i..j].iter().any(|m| m.value_order_perm_pk != 0);
1809
1810            for s in slices {
1811                let rows = s.len();
1812                // Slice the merged row ids to line up with `s` (avoids wrapping in another Arc).
1813                let rid_ref: ArrayRef = merged_rid_any.slice(rid_off, rows);
1814                let rid_norm = zero_offset(&rid_ref);
1815                let rid_pk = self.pager.alloc_many(1)?[0];
1816                let rid_bytes = serialize_array(rid_norm.as_ref())?;
1817
1818                let data_pk = self.pager.alloc_many(1)?[0];
1819                let s_norm = zero_offset(&s);
1820                let data_bytes = serialize_array(s_norm.as_ref())?;
1821                puts.push(BatchPut::Raw {
1822                    key: data_pk,
1823                    bytes: data_bytes,
1824                });
1825                puts.push(BatchPut::Raw {
1826                    key: rid_pk,
1827                    bytes: rid_bytes,
1828                });
1829                let mut meta = ChunkMetadata {
1830                    chunk_pk: data_pk,
1831                    row_count: rows as u64,
1832                    serialized_bytes: s_norm.get_array_memory_size() as u64,
1833                    max_val_u64: u64::MAX,
1834                    ..Default::default()
1835                };
1836                // If any source chunk had a perm, recompute for this slice.
1837                if need_perms {
1838                    let sort_col = SortColumn {
1839                        values: s.clone(),
1840                        options: None,
1841                    };
1842                    let idx = lexsort_to_indices(&[sort_col], None)?;
1843                    let perm_bytes = serialize_array(&idx)?;
1844                    let perm_pk = self.pager.alloc_many(1)?[0];
1845                    puts.push(BatchPut::Raw {
1846                        key: perm_pk,
1847                        bytes: perm_bytes,
1848                    });
1849                    meta.value_order_perm_pk = perm_pk;
1850                }
1851
1852                // Scan the row ids for min/max and check whether they are already sorted.
1853                let rid_any = rid_norm.clone();
1854                let rids = rid_any
1855                    .as_any()
1856                    .downcast_ref::<UInt64Array>()
1857                    .ok_or_else(|| Error::Internal("rid downcast".into()))?;
1858                let mut min = u64::MAX;
1859                let mut max = 0u64;
1860                let mut sorted_rids = true;
1861                let mut last_v = 0u64;
1862                for ii in 0..rids.len() {
1863                    let v = rids.value(ii);
1864                    if ii == 0 {
1865                        last_v = v;
1866                    } else if v < last_v {
1867                        sorted_rids = false;
1868                    } else {
1869                        last_v = v;
1870                    }
1871                    if v < min {
1872                        min = v;
1873                    }
1874                    if v > max {
1875                        max = v;
1876                    }
1877                }
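                // Illustrative: row ids [5, 7, 6] give min = 5, max = 7, and
                // sorted_rids = false, so a row-id sort permutation is written below.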
1878                let mut rid_perm_pk = 0u64;
1879                if !sorted_rids {
1880                    let rid_sort_col = SortColumn {
1881                        values: rid_any,
1882                        options: None,
1883                    };
1884                    let rid_idx = lexsort_to_indices(&[rid_sort_col], None)?;
1885                    let rid_perm_bytes = serialize_array(&rid_idx)?;
1886                    rid_perm_pk = self.pager.alloc_many(1)?[0];
1887                    puts.push(BatchPut::Raw {
1888                        key: rid_perm_pk,
1889                        bytes: rid_perm_bytes,
1890                    });
1891                }
1892                let rid_meta = ChunkMetadata {
1893                    chunk_pk: rid_pk,
1894                    value_order_perm_pk: rid_perm_pk,
1895                    row_count: rows as u64,
1896                    serialized_bytes: rid_norm.get_array_memory_size() as u64,
1897                    min_val_u64: if rows > 0 { min } else { 0 },
1898                    max_val_u64: if rows > 0 { max } else { 0 },
1899                };
1900                new_metas.push(meta);
1901                new_rid_metas.push(rid_meta);
1902                rid_off += rows;
1903            }
1904
1905            // Free all old data/rid/perms in the merged run.
1906            for k in i..j {
1907                frees.push(metas[k].chunk_pk);
1908                if metas[k].value_order_perm_pk != 0 {
1909                    frees.push(metas[k].value_order_perm_pk);
1910                }
1911                frees.push(metas_rid[k].chunk_pk);
1912                if metas_rid[k].value_order_perm_pk != 0 {
1913                    frees.push(metas_rid[k].value_order_perm_pk);
1914                }
1915            }
1916
1917            i = j;
1918        }
1919
1920        // If everything was deleted, drop the field entirely.
1921        if new_metas.is_empty() {
1922            // Drop descriptors and catalog mapping.
1923            self.write_descriptor_chain(desc_pk, &mut desc, &[], &mut puts, &mut frees)?;
1924            self.write_descriptor_chain(desc_pk_rid, &mut desc_rid, &[], &mut puts, &mut frees)?;
1925            catalog.map.remove(&field_id);
1926            catalog.map.remove(&rid_fid);
1927            puts.push(BatchPut::Raw {
1928                key: CATALOG_ROOT_PKEY,
1929                bytes: catalog.to_bytes(),
1930            });
1931            if !puts.is_empty() {
1932                self.pager.batch_put(&puts)?;
1933            }
1934            if !frees.is_empty() {
1935                self.pager.free_many(&frees)?;
1936            }
1937            return Ok(());
1938        }
1939
1940        // Rewrite descriptor chains to match the new meta lists.
1941        self.write_descriptor_chain(desc_pk, &mut desc, &new_metas, &mut puts, &mut frees)?;
1942        self.write_descriptor_chain(
1943            desc_pk_rid,
1944            &mut desc_rid,
1945            &new_rid_metas,
1946            &mut puts,
1947            &mut frees,
1948        )?;
1949        // Persist new/updated blobs and free the old ones.
1950        if !puts.is_empty() {
1951            self.pager.batch_put(&puts)?;
1952        }
1953        if !frees.is_empty() {
1954            self.pager.free_many(&frees)?;
1955        }
1956
1957        Ok(())
1958    }
1959
1960    /// (Internal) Helper for batch appends. Appends metadata to the current
1961    /// in-memory tail page, creating a new one if necessary.
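    ///
    /// Decision rule, in sketch form: if the tail page still has byte and entry
    /// headroom, the new [`ChunkMetadata`] is appended in place; otherwise the full
    /// page is flushed to `puts`, a fresh tail page is allocated, and
    /// `descriptor.tail_page_pk` is advanced to it.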
1962    fn append_meta_in_loop(
1963        &self,
1964        descriptor: &mut ColumnDescriptor,
1965        tail_page_bytes: &mut Vec<u8>,
1966        meta: ChunkMetadata,
1967        puts: &mut Vec<BatchPut>,
1968    ) -> Result<()> {
1969        let mut header = DescriptorPageHeader::from_le_bytes(
1970            &tail_page_bytes[..DescriptorPageHeader::DISK_SIZE],
1971        );
1972        if tail_page_bytes.len() + ChunkMetadata::DISK_SIZE <= TARGET_DESCRIPTOR_PAGE_BYTES
1973            && (header.entry_count as usize) < DESCRIPTOR_ENTRIES_PER_PAGE
1974        {
1975            // Case 1: There's room in the current tail page.
1976            tail_page_bytes.extend_from_slice(&meta.to_le_bytes());
1977            header.entry_count += 1;
1978            tail_page_bytes[..DescriptorPageHeader::DISK_SIZE]
1979                .copy_from_slice(&header.to_le_bytes());
1980        } else {
1981            // Case 2: The tail page is full. Write it out and start a new one.
1982            let new_tail_pk = self.pager.alloc_many(1)?[0];
1983            header.next_page_pk = new_tail_pk;
1984            tail_page_bytes[..DescriptorPageHeader::DISK_SIZE]
1985                .copy_from_slice(&header.to_le_bytes());
1986            // Move the full page's bytes out instead of cloning them; the buffer
1987            // is refilled with the new tail page below.
1988            let full_page_to_write = std::mem::take(tail_page_bytes);
1989            puts.push(BatchPut::Raw {
1990                key: descriptor.tail_page_pk,
1991                bytes: full_page_to_write,
1992            });
1993            // Create the new tail page.
1994            let new_header = DescriptorPageHeader {
1995                next_page_pk: 0,
1996                entry_count: 1,
1997                _padding: [0; 4],
1998            };
1999            let mut new_page_bytes = new_header.to_le_bytes().to_vec();
2000            new_page_bytes.extend_from_slice(&meta.to_le_bytes());
2001            // Update our live state to point to the new tail.
2002            descriptor.tail_page_pk = new_tail_pk;
2003            *tail_page_bytes = new_page_bytes;
2004        }
2005
2006        descriptor.total_row_count += meta.row_count;
2007        descriptor.total_chunk_count += 1;
2008        Ok(())
2009    }
2010
2011    /// Verifies the integrity of the column store's metadata.
2012    ///
2013    /// This check is useful for tests and debugging. It verifies:
2014    /// 1. The catalog can be read.
2015    /// 2. All descriptor chains are walkable.
2016    /// 3. The row and chunk counts in the descriptors match the sum of the
2017    ///    chunk metadata.
2018    ///
2019    /// Returns `Ok(())` if consistent, otherwise returns an `Error`.
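    ///
    /// # Example
    ///
    /// Illustrative sketch (not a doctest), assuming `store` is an open
    /// [`ColumnStore`]:
    ///
    /// ```ignore
    /// store.verify_integrity().expect("column store metadata is consistent");
    /// ```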
2020    pub fn verify_integrity(&self) -> Result<()> {
2021        let catalog = self.catalog.read().unwrap();
2022        for (&field_id, &descriptor_pk) in &catalog.map {
2023            let desc_blob = self
2024                .pager
2025                .batch_get(&[BatchGet::Raw { key: descriptor_pk }])?
2026                .pop()
2027                .and_then(|r| match r {
2028                    GetResult::Raw { bytes, .. } => Some(bytes),
2029                    _ => None,
2030                })
2031                .ok_or_else(|| {
2032                    Error::Internal(format!(
2033                        "Catalog points to missing descriptor pk={}",
2034                        descriptor_pk
2035                    ))
2036                })?;
2037            let descriptor = ColumnDescriptor::from_le_bytes(desc_blob.as_ref());
2038            if descriptor.field_id != field_id {
2039                return Err(Error::Internal(format!(
2040                    "Descriptor at pk={} has wrong field_id: expected {:?}, \
2041                     got {:?}",
2042                    descriptor_pk, field_id, descriptor.field_id
2043                )));
2044            }
2045
2046            let mut actual_rows = 0;
2047            let mut actual_chunks = 0;
2048            let mut current_page_pk = descriptor.head_page_pk;
2049            while current_page_pk != 0 {
2050                let page_blob = self
2051                    .pager
2052                    .batch_get(&[BatchGet::Raw {
2053                        key: current_page_pk,
2054                    }])?
2055                    .pop()
2056                    .and_then(|r| match r {
2057                        GetResult::Raw { bytes, .. } => Some(bytes),
2058                        _ => None,
2059                    })
2060                    .ok_or_else(|| {
2061                        Error::Internal(format!(
2062                            "Descriptor page chain broken at pk={}",
2063                            current_page_pk
2064                        ))
2065                    })?;
2066                let header = DescriptorPageHeader::from_le_bytes(
2067                    &page_blob.as_ref()[..DescriptorPageHeader::DISK_SIZE],
2068                );
2069                for i in 0..(header.entry_count as usize) {
2070                    let off = DescriptorPageHeader::DISK_SIZE + i * ChunkMetadata::DISK_SIZE;
2071                    let end = off + ChunkMetadata::DISK_SIZE;
2072                    let meta = ChunkMetadata::from_le_bytes(&page_blob.as_ref()[off..end]);
2073                    actual_rows += meta.row_count;
2074                    actual_chunks += 1;
2075                }
2076                current_page_pk = header.next_page_pk;
2077            }
2078
2079            if descriptor.total_row_count != actual_rows {
2080                return Err(Error::Internal(format!(
2081                    "Row count mismatch for field {:?}: descriptor says {}, \
2082                     actual is {}",
2083                    field_id, descriptor.total_row_count, actual_rows
2084                )));
2085            }
2086            if descriptor.total_chunk_count != actual_chunks {
2087                return Err(Error::Internal(format!(
2088                    "Chunk count mismatch for field {:?}: descriptor says {}, \
2089                     actual is {}",
2090                    field_id, descriptor.total_chunk_count, actual_chunks
2091                )));
2092            }
2093        }
2094        Ok(())
2095    }
2096
2097    /// Gathers detailed statistics about the storage layout.
2098    ///
2099    /// This method is designed for low-level analysis and debugging, allowing
2100    /// you to check for under- or over-utilization of descriptor pages.
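    ///
    /// # Example
    ///
    /// Illustrative sketch (not a doctest), assuming `store` is an open
    /// [`ColumnStore`]:
    ///
    /// ```ignore
    /// for col in store.get_layout_stats()? {
    ///     println!(
    ///         "field {:?}: {} rows in {} chunks across {} descriptor pages",
    ///         col.field_id, col.total_rows, col.total_chunks, col.pages.len()
    ///     );
    /// }
    /// ```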
2101    pub fn get_layout_stats(&self) -> Result<Vec<ColumnLayoutStats>> {
2102        let catalog = self.catalog.read().unwrap();
2103        let mut all_stats = Vec::new();
2104
2105        for (&field_id, &descriptor_pk) in &catalog.map {
2106            let desc_blob = self
2107                .pager
2108                .batch_get(&[BatchGet::Raw { key: descriptor_pk }])?
2109                .pop()
2110                .and_then(|r| match r {
2111                    GetResult::Raw { bytes, .. } => Some(bytes),
2112                    _ => None,
2113                })
2114                .ok_or(Error::NotFound)?;
2115            let descriptor = ColumnDescriptor::from_le_bytes(desc_blob.as_ref());
2116
2117            let mut page_stats = Vec::new();
2118            let mut current_page_pk = descriptor.head_page_pk;
2119            while current_page_pk != 0 {
2120                let page_blob = self
2121                    .pager
2122                    .batch_get(&[BatchGet::Raw {
2123                        key: current_page_pk,
2124                    }])?
2125                    .pop()
2126                    .and_then(|r| match r {
2127                        GetResult::Raw { bytes, .. } => Some(bytes),
2128                        _ => None,
2129                    })
2130                    .ok_or(Error::NotFound)?;
2131                let header = DescriptorPageHeader::from_le_bytes(
2132                    &page_blob.as_ref()[..DescriptorPageHeader::DISK_SIZE],
2133                );
2134                page_stats.push(DescriptorPageStats {
2135                    page_pk: current_page_pk,
2136                    entry_count: header.entry_count,
2137                    page_size_bytes: page_blob.as_ref().len(),
2138                });
2139                current_page_pk = header.next_page_pk;
2140            }
2141
2142            all_stats.push(ColumnLayoutStats {
2143                field_id,
2144                total_rows: descriptor.total_row_count,
2145                total_chunks: descriptor.total_chunk_count,
2146                pages: page_stats,
2147            });
2148        }
2149        Ok(all_stats)
2150    }
2151}