// llkv_column_map/store/core.rs

1use super::*;
2use crate::store::catalog::ColumnCatalog;
3use crate::store::descriptor::{
4    ChunkMetadata, ColumnDescriptor, DescriptorIterator, DescriptorPageHeader,
5};
6use crate::store::scan::filter::FilterDispatch;
7use crate::store::scan::{FilterPrimitive, FilterResult};
8use crate::types::{LogicalFieldId, RowId, TableId};
9use arrow::array::{Array, ArrayRef, BooleanArray, UInt32Array, UInt64Array};
10use arrow::compute::{self, SortColumn, lexsort_to_indices};
11use arrow::datatypes::DataType;
12use arrow::record_batch::RecordBatch;
13use llkv_expr::typed_predicate::Predicate;
14use llkv_result::{Error, Result};
15use llkv_storage::{
16    constants::CATALOG_ROOT_PKEY,
17    pager::{BatchGet, BatchPut, GetResult, Pager},
18    serialization::{deserialize_array, serialize_array},
19    types::PhysicalKey,
20};
21
22use rustc_hash::{FxHashMap, FxHashSet};
23use simd_r_drive_entry_handle::EntryHandle;
24use std::sync::{Arc, RwLock};
25
26/// Columnar storage engine for managing Arrow-based data.
27///
28/// `ColumnStore` provides the primary interface for persisting and retrieving columnar
29/// data using Apache Arrow [`RecordBatch`]es. It manages:
30///
31/// - Column descriptors and metadata (chunk locations, row counts, min/max values)
32/// - Data type caching for efficient schema queries
33/// - Index management (presence indexes, value indexes)
34/// - Integration with the [`Pager`] for persistent storage
35///
36/// # Namespaces
37///
38/// Columns are identified by [`LogicalFieldId`], which combines a namespace, table ID,
39/// and field ID. This prevents collisions between user data, row IDs, and MVCC metadata:
40///
41/// - `UserData`: Regular table columns
42/// - `RowIdShadow`: Internal row ID tracking
43/// - `TxnCreatedBy`: MVCC transaction creation timestamps
44/// - `TxnDeletedBy`: MVCC transaction deletion timestamps
45///
46/// # Thread Safety
47///
48/// `ColumnStore` is `Send + Sync` and can be safely shared across threads via `Arc`.
49/// Internal state (catalog, caches) uses `RwLock` for concurrent access.
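///
/// # Example
///
/// A minimal usage sketch (marked `ignore`): `MemPager` stands in for any
/// `Pager<Blob = EntryHandle>` implementation, and `batch` / `table_id` are
/// assumed to come from the table layer (see [`ColumnStore::append`] for the
/// batch requirements).
///
/// ```ignore
/// use std::sync::Arc;
///
/// // Placeholder pager; substitute a real `Pager` implementation.
/// let pager = Arc::new(MemPager::default());
/// let store = ColumnStore::open(Arc::clone(&pager))?;
///
/// store.append(&batch)?;
/// let row_count = store.total_rows_for_table(table_id)?;
/// ```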
50pub struct ColumnStore<P: Pager> {
51    pub(crate) pager: Arc<P>,
52    pub(crate) catalog: Arc<RwLock<ColumnCatalog>>,
53    cfg: ColumnStoreConfig,
54    dtype_cache: DTypeCache<P>,
55    index_manager: IndexManager<P>,
56}
57
58impl<P> Clone for ColumnStore<P>
59where
60    P: Pager<Blob = EntryHandle> + Send + Sync,
61{
62    fn clone(&self) -> Self {
63        Self {
64            pager: Arc::clone(&self.pager),
65            catalog: Arc::clone(&self.catalog),
66            cfg: self.cfg.clone(),
67            dtype_cache: self.dtype_cache.clone(),
68            index_manager: self.index_manager.clone(),
69        }
70    }
71}
72
73impl<P> ColumnStore<P>
74where
75    P: Pager<Blob = EntryHandle> + Send + Sync,
76{
77    /// Opens or creates a `ColumnStore` using the provided pager.
78    ///
79    /// Loads the column catalog from the pager's root catalog key, or initializes
80    /// an empty catalog if none exists. The catalog maps [`LogicalFieldId`] to the
81    /// physical keys of column descriptors.
82    ///
83    /// # Errors
84    ///
85    /// Returns an error if the pager fails to load the catalog or if deserialization fails.
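    ///
    /// # Example
    ///
    /// A sketch assuming a hypothetical in-memory pager type `MemPager`:
    ///
    /// ```ignore
    /// use std::sync::Arc;
    ///
    /// let store = ColumnStore::open(Arc::new(MemPager::default()))?;
    /// ```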
86    pub fn open(pager: Arc<P>) -> Result<Self> {
87        let cfg = ColumnStoreConfig::default();
88        let catalog = match pager
89            .batch_get(&[BatchGet::Raw {
90                key: CATALOG_ROOT_PKEY,
91            }])?
92            .pop()
93        {
94            Some(GetResult::Raw { bytes, .. }) => ColumnCatalog::from_bytes(bytes.as_ref())?,
95            _ => ColumnCatalog::default(),
96        };
97        let arc_catalog = Arc::new(RwLock::new(catalog));
98
99        let index_manager = IndexManager::new(Arc::clone(&pager));
100
101        Ok(Self {
102            pager: Arc::clone(&pager),
103            catalog: Arc::clone(&arc_catalog),
104            cfg,
105            dtype_cache: DTypeCache::new(Arc::clone(&pager), Arc::clone(&arc_catalog)),
106            index_manager,
107        })
108    }
109
110    /// Creates and persists an index for a column.
111    ///
112    /// Builds the specified index type for all existing data in the column and
113    /// persists it atomically. The index will be maintained automatically on subsequent
114    /// appends and updates.
115    ///
116    /// # Errors
117    ///
118    /// Returns an error if the column doesn't exist or if index creation fails.
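    ///
    /// # Example
    ///
    /// A sketch, assuming `field_id` names a column that already exists in the store:
    ///
    /// ```ignore
    /// store.register_index(field_id, IndexKind::Presence)?;
    /// let kinds = store.list_persisted_indexes(field_id)?; // now includes `Presence`
    /// ```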
119    pub fn register_index(&self, field_id: LogicalFieldId, kind: IndexKind) -> Result<()> {
120        self.index_manager.register_index(self, field_id, kind)
121    }
122
123    /// Checks if a logical field is registered in the catalog.
124    pub fn has_field(&self, field_id: LogicalFieldId) -> bool {
125        let catalog = self.catalog.read().unwrap();
126        catalog.map.contains_key(&field_id)
127    }
128
129    /// Removes a persisted index from a column.
130    ///
131    /// Atomically removes the index and frees associated storage. The column data
132    /// itself is not affected.
133    ///
134    /// # Errors
135    ///
136    /// Returns an error if the column or index doesn't exist.
137    pub fn unregister_index(&self, field_id: LogicalFieldId, kind: IndexKind) -> Result<()> {
138        self.index_manager.unregister_index(self, field_id, kind)
139    }
140
141    /// Returns the Arrow data type of a column.
142    ///
143    /// Returns the data type from cache if available, otherwise loads it from
144    /// the column descriptor and caches it for future queries.
145    ///
146    /// # Errors
147    ///
148    /// Returns an error if the column doesn't exist or if the descriptor is corrupted.
149    pub fn data_type(&self, field_id: LogicalFieldId) -> Result<DataType> {
150        if let Some(dt) = self.dtype_cache.cached_data_type(field_id) {
151            return Ok(dt);
152        }
153        self.dtype_cache.dtype_for_field(field_id)
154    }
155
156    /// Ensures that catalog entries and descriptors exist for a logical column.
157    ///
158    /// Primarily used when creating empty tables so that subsequent
159    /// operations (like `CREATE INDEX`) can resolve column metadata before any
160    /// data has been appended.
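    ///
    /// # Example
    ///
    /// A sketch for preparing an empty column before a `CREATE INDEX`; how
    /// `field_id` is derived from the table schema is elided here:
    ///
    /// ```ignore
    /// use arrow::datatypes::DataType;
    ///
    /// store.ensure_column_registered(field_id, &DataType::Int64)?;
    /// store.register_index(field_id, IndexKind::Presence)?;
    /// ```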
161    pub fn ensure_column_registered(
162        &self,
163        field_id: LogicalFieldId,
164        data_type: &DataType,
165    ) -> Result<()> {
166        let rid_field_id = rowid_fid(field_id);
167
168        let mut catalog_dirty = false;
169        let descriptor_pk;
170        let rid_descriptor_pk;
171
172        {
173            let mut catalog = self.catalog.write().unwrap();
174            descriptor_pk = if let Some(&pk) = catalog.map.get(&field_id) {
175                pk
176            } else {
177                let pk = self.pager.alloc_many(1)?[0];
178                catalog.map.insert(field_id, pk);
179                catalog_dirty = true;
180                pk
181            };
182
183            rid_descriptor_pk = if let Some(&pk) = catalog.map.get(&rid_field_id) {
184                pk
185            } else {
186                let pk = self.pager.alloc_many(1)?[0];
187                catalog.map.insert(rid_field_id, pk);
188                catalog_dirty = true;
189                pk
190            };
191        }
192
193        let mut puts: Vec<BatchPut> = Vec::new();
194
195        let data_descriptor_missing = self
196            .pager
197            .batch_get(&[BatchGet::Raw { key: descriptor_pk }])?
198            .pop()
199            .and_then(|r| match r {
200                GetResult::Raw { bytes, .. } => Some(bytes),
201                _ => None,
202            })
203            .is_none();
204
205        if data_descriptor_missing {
206            let (mut descriptor, tail_page) =
207                ColumnDescriptor::load_or_create(Arc::clone(&self.pager), descriptor_pk, field_id)?;
208            let fingerprint = DTypeCache::<P>::dtype_fingerprint(data_type);
209            if fingerprint != 0 {
210                DTypeCache::<P>::set_desc_dtype_fingerprint(&mut descriptor, fingerprint);
211            }
212            puts.push(BatchPut::Raw {
213                key: descriptor.tail_page_pk,
214                bytes: tail_page,
215            });
216            puts.push(BatchPut::Raw {
217                key: descriptor_pk,
218                bytes: descriptor.to_le_bytes(),
219            });
220        }
221
222        let rid_descriptor_missing = self
223            .pager
224            .batch_get(&[BatchGet::Raw {
225                key: rid_descriptor_pk,
226            }])?
227            .pop()
228            .and_then(|r| match r {
229                GetResult::Raw { bytes, .. } => Some(bytes),
230                _ => None,
231            })
232            .is_none();
233
234        if rid_descriptor_missing {
235            let (mut rid_descriptor, tail_page) = ColumnDescriptor::load_or_create(
236                Arc::clone(&self.pager),
237                rid_descriptor_pk,
238                rid_field_id,
239            )?;
240            let fingerprint = DTypeCache::<P>::dtype_fingerprint(&DataType::UInt64);
241            if fingerprint != 0 {
242                DTypeCache::<P>::set_desc_dtype_fingerprint(&mut rid_descriptor, fingerprint);
243            }
244            puts.push(BatchPut::Raw {
245                key: rid_descriptor.tail_page_pk,
246                bytes: tail_page,
247            });
248            puts.push(BatchPut::Raw {
249                key: rid_descriptor_pk,
250                bytes: rid_descriptor.to_le_bytes(),
251            });
252        }
253
254        self.dtype_cache.insert(field_id, data_type.clone());
255
256        if catalog_dirty {
257            let catalog_bytes = {
258                let catalog = self.catalog.read().unwrap();
259                catalog.to_bytes()
260            };
261            puts.push(BatchPut::Raw {
262                key: CATALOG_ROOT_PKEY,
263                bytes: catalog_bytes,
264            });
265        }
266
267        if !puts.is_empty() {
268            self.pager.batch_put(&puts)?;
269        }
270
271        Ok(())
272    }
273
    /// Finds all row IDs where a column satisfies a predicate.
275    ///
276    /// This evaluates the predicate against the column's data and returns a vector
277    /// of matching row IDs. Uses indexes and chunk metadata (min/max values) to
278    /// skip irrelevant data when possible.
279    ///
280    /// # Errors
281    ///
282    /// Returns an error if the column doesn't exist or if chunk data is corrupted.
283    pub fn filter_row_ids<T>(
284        &self,
285        field_id: LogicalFieldId,
286        predicate: &Predicate<T::Value>,
287    ) -> Result<Vec<u64>>
288    where
289        T: FilterDispatch,
290    {
291        tracing::trace!(field=?field_id, "filter_row_ids start");
292        let res = T::run_filter(self, field_id, predicate);
293        if let Err(ref err) = res {
294            tracing::trace!(field=?field_id, error=?err, "filter_row_ids error");
295        } else {
296            tracing::trace!(field=?field_id, "filter_row_ids ok");
297        }
298        res
299    }
300
    /// Evaluates a predicate against a column and returns match metadata.
302    ///
303    /// This variant drives a primitive predicate over the column and returns a
304    /// [`FilterResult`] describing contiguous match regions. Callers can use the
305    /// result to build paginated scans or gather row identifiers.
306    ///
307    /// # Arguments
308    /// - `field_id`: Logical column to filter.
309    /// - `predicate`: Callable invoked on each value; should be cheap and free of
310    ///   side effects.
311    ///
312    /// # Errors
313    /// Returns an error if the column metadata cannot be loaded or if decoding a
314    /// chunk fails.
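    ///
    /// # Example
    ///
    /// A sketch; it assumes the Arrow `UInt64Type` marker is the
    /// [`FilterPrimitive`] implementor for `u64` columns, which may differ in
    /// practice:
    ///
    /// ```ignore
    /// use arrow::datatypes::UInt64Type;
    ///
    /// let matches = store.filter_matches::<UInt64Type, _>(field_id, |v| v >= 100)?;
    /// ```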
315    pub fn filter_matches<T, F>(
316        &self,
317        field_id: LogicalFieldId,
318        predicate: F,
319    ) -> Result<FilterResult>
320    where
321        T: FilterPrimitive,
322        F: FnMut(T::Native) -> bool,
323    {
324        T::run_filter_with_result(self, field_id, predicate)
325    }
326
    /// Lists all indexes registered for a column.
328    ///
329    /// Returns the types of indexes (e.g., presence, value) that are currently
330    /// persisted for the specified column.
331    ///
332    /// # Errors
333    ///
334    /// Returns an error if the column doesn't exist or if the descriptor is corrupted.
335    pub fn list_persisted_indexes(&self, field_id: LogicalFieldId) -> Result<Vec<IndexKind>> {
336        let catalog = self.catalog.read().unwrap();
337        let descriptor_pk = *catalog.map.get(&field_id).ok_or(Error::NotFound)?;
338
339        let desc_blob = self
340            .pager
341            .batch_get(&[BatchGet::Raw { key: descriptor_pk }])?
342            .pop()
343            .and_then(|r| match r {
344                GetResult::Raw { bytes, .. } => Some(bytes),
345                _ => None,
346            })
347            .ok_or(Error::NotFound)?;
348        let descriptor = ColumnDescriptor::from_le_bytes(desc_blob.as_ref());
349
350        let kinds = descriptor.get_indexes()?;
351        Ok(kinds)
352    }
353
    /// Returns the total number of rows in a column.
355    ///
356    /// Returns the persisted row count from the column's descriptor. This value is
357    /// updated by append and delete operations.
358    ///
359    /// # Errors
360    ///
361    /// Returns an error if the column doesn't exist or if the descriptor is corrupted.
362    pub fn total_rows_for_field(&self, field_id: LogicalFieldId) -> Result<u64> {
363        let catalog = self.catalog.read().unwrap();
364        let desc_pk = *catalog.map.get(&field_id).ok_or(Error::NotFound)?;
365        drop(catalog);
366
367        let desc_blob = self
368            .pager
369            .batch_get(&[BatchGet::Raw { key: desc_pk }])?
370            .pop()
371            .and_then(|r| match r {
372                GetResult::Raw { bytes, .. } => Some(bytes),
373                _ => None,
374            })
375            .ok_or(Error::NotFound)?;
376
377        let desc = ColumnDescriptor::from_le_bytes(desc_blob.as_ref());
378        Ok(desc.total_row_count)
379    }
380
    /// Returns the total number of rows in a table.
382    ///
383    /// This returns the maximum row count across all user-data columns in the table.
384    /// If the table has no persisted columns, returns `0`.
385    ///
386    /// # Errors
387    ///
388    /// Returns an error if column descriptors cannot be loaded.
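    ///
    /// # Example
    ///
    /// A sketch; `table_id` is assumed to come from the table catalog:
    ///
    /// ```ignore
    /// let rows = store.total_rows_for_table(table_id)?;
    /// println!("table holds {rows} rows");
    /// ```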
389    pub fn total_rows_for_table(&self, table_id: crate::types::TableId) -> Result<u64> {
390        use crate::types::Namespace;
        // Acquire a read lock on the catalog and collect all user-data logical
        // field ids for this table.
        let catalog = self.catalog.read().unwrap();
394        let candidates: Vec<LogicalFieldId> = catalog
395            .map
396            .keys()
397            .filter(|fid| fid.namespace() == Namespace::UserData && fid.table_id() == table_id)
398            .copied()
399            .collect();
400        drop(catalog);
401
402        if candidates.is_empty() {
403            return Ok(0);
404        }
405
406        // Return the maximum total_row_count across all user columns for the table.
407        let mut max_rows: u64 = 0;
408        for field in candidates {
409            let rows = self.total_rows_for_field(field)?;
410            if rows > max_rows {
411                max_rows = rows;
412            }
413        }
414        Ok(max_rows)
415    }
416
    /// Returns all user-data column IDs for a table.
418    ///
419    /// This returns the [`LogicalFieldId`]s of all persisted user columns (namespace
420    /// `UserData`) belonging to the specified table. MVCC and row ID columns are not
421    /// included.
422    pub fn user_field_ids_for_table(&self, table_id: crate::types::TableId) -> Vec<LogicalFieldId> {
423        use crate::types::Namespace;
424
425        let catalog = self.catalog.read().unwrap();
426        catalog
427            .map
428            .keys()
429            .filter(|fid| fid.namespace() == Namespace::UserData && fid.table_id() == table_id)
430            .copied()
431            .collect()
432    }
433
    /// Checks whether a specific row ID exists in a column.
435    ///
436    /// This uses presence indexes and binary search when available for fast lookups.
437    /// If no presence index exists, it scans chunks and uses min/max metadata to
438    /// prune irrelevant data.
439    ///
440    /// # Errors
441    ///
442    /// Returns an error if the column doesn't exist or if chunk data is corrupted.
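    ///
    /// # Example
    ///
    /// A sketch checking for a single row id:
    ///
    /// ```ignore
    /// if store.has_row_id(field_id, 42)? {
    ///     // row 42 has a value persisted for this column
    /// }
    /// ```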
443    pub fn has_row_id(&self, field_id: LogicalFieldId, row_id: RowId) -> Result<bool> {
444        let rid_fid = rowid_fid(field_id);
445        let catalog = self.catalog.read().unwrap();
446        let rid_desc_pk = *catalog.map.get(&rid_fid).ok_or(Error::NotFound)?;
447        let rid_desc_blob = self
448            .pager
449            .batch_get(&[BatchGet::Raw { key: rid_desc_pk }])?
450            .pop()
451            .and_then(|r| match r {
452                GetResult::Raw { bytes, .. } => Some(bytes),
453                _ => None,
454            })
455            .ok_or(Error::NotFound)?;
456        let rid_desc = ColumnDescriptor::from_le_bytes(rid_desc_blob.as_ref());
457        drop(catalog);
458
459        // Walk metas; prune by min/max when available.
460        for m in DescriptorIterator::new(self.pager.as_ref(), rid_desc.head_page_pk) {
461            let meta = m?;
462            if meta.row_count == 0 {
463                continue;
464            }
            if (meta.min_val_u64 != 0 || meta.max_val_u64 != 0)
                && (row_id < meta.min_val_u64 || row_id > meta.max_val_u64)
            {
468                continue;
469            }
470            // Fetch rid chunk and, if present, the presence perm
471            let mut gets = vec![BatchGet::Raw { key: meta.chunk_pk }];
472            if meta.value_order_perm_pk != 0 {
473                gets.push(BatchGet::Raw {
474                    key: meta.value_order_perm_pk,
475                });
476            }
477            let results = self.pager.batch_get(&gets)?;
478            let mut rid_blob: Option<EntryHandle> = None;
479            let mut perm_blob: Option<EntryHandle> = None;
480            for r in results {
481                if let GetResult::Raw { key, bytes } = r {
482                    if key == meta.chunk_pk {
483                        rid_blob = Some(bytes);
484                    } else if key == meta.value_order_perm_pk {
485                        perm_blob = Some(bytes);
486                    }
487                }
488            }
489            // If the rid blob for this chunk is missing, treat as absent and continue
490            let Some(rid_blob) = rid_blob else { continue };
491            let rid_any = deserialize_array(rid_blob)?;
492            let rids = rid_any
493                .as_any()
494                .downcast_ref::<UInt64Array>()
495                .ok_or_else(|| Error::Internal("rid downcast".into()))?;
496            if let Some(pblob) = perm_blob {
497                let perm_any = deserialize_array(pblob)?;
498                let perm = perm_any
499                    .as_any()
500                    .downcast_ref::<UInt32Array>()
501                    .ok_or_else(|| Error::Internal("perm not u32".into()))?;
502                // Binary search over sorted-by-perm view
503                let mut lo: isize = 0;
504                let mut hi: isize = (perm.len() as isize) - 1;
505                while lo <= hi {
506                    let mid = ((lo + hi) >> 1) as usize;
507                    let rid = rids.value(perm.value(mid) as usize);
508                    if rid == row_id {
509                        return Ok(true);
510                    } else if rid < row_id {
511                        lo = mid as isize + 1;
512                    } else {
513                        hi = mid as isize - 1;
514                    }
515                }
516            } else {
                // Assume the rid chunk is sorted ascending (the common case for
                // appends and compaction) and use binary search.
518                let mut lo: isize = 0;
519                let mut hi: isize = (rids.len() as isize) - 1;
520                while lo <= hi {
521                    let mid = ((lo + hi) >> 1) as usize;
522                    let rid = rids.value(mid);
523                    if rid == row_id {
524                        return Ok(true);
525                    } else if rid < row_id {
526                        lo = mid as isize + 1;
527                    } else {
528                        hi = mid as isize - 1;
529                    }
530                }
531            }
532        }
533        Ok(false)
534    }
535
536    // NOTE: Row IDs must be provided explicitly today. Consider introducing an
537    // opt-in auto-increment mode once table-level schema metadata can enforce it.
    /// Appends a [`RecordBatch`] to the store.
    ///
    /// The batch must include a row-id column (named by `ROW_ID_COLUMN_NAME`, type
    /// `UInt64`) that uniquely identifies each row. Every other column must carry
    /// `field_id` metadata (keyed by `FIELD_ID_META_KEY`) mapping it to a [`LogicalFieldId`].
543    ///
544    /// # Last-Write-Wins Updates
545    ///
546    /// If any row IDs in the batch already exist, they are updated in-place (overwritten)
547    /// rather than creating duplicates. This happens in a separate transaction before
548    /// appending new rows.
549    ///
550    /// # Row ID Ordering
551    ///
    /// The batch is automatically sorted by row id if it is not already sorted. This keeps
    /// metadata updates efficient and the shadow row-id columns naturally sorted.
554    ///
555    /// # Table Separation
556    ///
    /// Each batch should contain columns from only one table. To append to multiple
    /// tables, call `append` separately for each table's batch; these calls may run
    /// concurrently.
559    ///
560    /// # Errors
561    ///
562    /// Returns an error if:
    /// - The batch is missing the row-id column
564    /// - Column metadata is missing or invalid
565    /// - Storage operations fail
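    ///
    /// # Example
    ///
    /// A sketch of an insert followed by a last-write-wins update. The column name
    /// `"score"` and the `field_id` value `"42"` are illustrative; the raw `u64`
    /// encoding of a [`LogicalFieldId`] depends on the namespace/table layout.
    ///
    /// ```ignore
    /// use std::{collections::HashMap, sync::Arc};
    /// use arrow::array::{ArrayRef, Int64Array, UInt64Array};
    /// use arrow::datatypes::{DataType, Field, Schema};
    /// use arrow::record_batch::RecordBatch;
    ///
    /// let schema = Arc::new(Schema::new(vec![
    ///     Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
    ///     Field::new("score", DataType::Int64, true).with_metadata(HashMap::from([(
    ///         FIELD_ID_META_KEY.to_string(),
    ///         "42".to_string(), // raw u64 encoding of the column's LogicalFieldId
    ///     )])),
    /// ]));
    ///
    /// // First append inserts rows 1 and 2.
    /// let row_ids: ArrayRef = Arc::new(UInt64Array::from(vec![1u64, 2]));
    /// let scores: ArrayRef = Arc::new(Int64Array::from(vec![10i64, 20]));
    /// store.append(&RecordBatch::try_new(Arc::clone(&schema), vec![row_ids, scores])?)?;
    ///
    /// // Re-using row id 2 overwrites its value in place instead of duplicating it.
    /// let row_ids: ArrayRef = Arc::new(UInt64Array::from(vec![2u64]));
    /// let scores: ArrayRef = Arc::new(Int64Array::from(vec![99i64]));
    /// store.append(&RecordBatch::try_new(schema, vec![row_ids, scores])?)?;
    /// ```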
566    #[allow(unused_variables, unused_assignments)] // NOTE: Preserve variable hooks used during feature gating of presence indexes.
567    pub fn append(&self, batch: &RecordBatch) -> Result<()> {
568        tracing::trace!(
569            num_columns = batch.num_columns(),
570            num_rows = batch.num_rows(),
571            "ColumnStore::append BEGIN"
572        );
573        // --- PHASE 1: PRE-PROCESSING THE INCOMING BATCH ---
574        // The `append` logic relies on row IDs being processed in ascending order to handle
575        // metadata updates efficiently and to ensure the shadow row_id chunks are naturally sorted.
576        // This block checks if the incoming batch is already sorted by `row_id`. If not, it creates a
577        // new, sorted `RecordBatch` to work with for the rest of the function.
578        let working_batch: RecordBatch;
579        let batch_ref = {
580            let schema = batch.schema();
581            let row_id_idx = schema
582                .index_of(ROW_ID_COLUMN_NAME)
583                .map_err(|_| Error::Internal("row_id column required".into()))?;
584            let row_id_any = batch.column(row_id_idx).clone();
585            let row_id_arr = row_id_any
586                .as_any()
587                .downcast_ref::<UInt64Array>()
588                .ok_or_else(|| Error::Internal("row_id downcast failed".into()))?;
589
590            // Manually check if the row_id column is sorted.
591            let mut is_sorted = true;
592            if !row_id_arr.is_empty() {
593                let mut last = row_id_arr.value(0);
594                for i in 1..row_id_arr.len() {
595                    let current = row_id_arr.value(i);
596                    if current < last {
597                        is_sorted = false;
598                        break;
599                    }
600                    last = current;
601                }
602            }
603
604            // If sorted, we can use the original batch directly.
605            // Otherwise, we compute a permutation and reorder all columns.
606            if is_sorted {
607                batch
608            } else {
609                let sort_col = SortColumn {
610                    values: row_id_any,
611                    options: None,
612                };
613                let idx = lexsort_to_indices(&[sort_col], None)?;
614                let perm = idx
615                    .as_any()
616                    .downcast_ref::<UInt32Array>()
617                    .ok_or_else(|| Error::Internal("perm not u32".into()))?;
618                let mut cols: Vec<ArrayRef> = Vec::with_capacity(batch.num_columns());
619                for i in 0..batch.num_columns() {
620                    cols.push(compute::take(batch.column(i), perm, None)?);
621                }
622                working_batch = RecordBatch::try_new(schema.clone(), cols)
623                    .map_err(|e| Error::Internal(format!("record batch rebuild: {e}")))?;
624                &working_batch
625            }
626        };
627
628        tracing::trace!("ColumnStore::append PHASE 1 complete - batch preprocessed");
629
630        // --- PHASE 2: LAST-WRITER-WINS (LWW) REWRITE ---
631        // This phase handles updates. It identifies any rows in the incoming batch that
632        // already exist in the store and rewrites them in-place. This is a separate
633        // transaction that happens before the main append of new rows.
634        let schema = batch_ref.schema();
635        let row_id_idx = schema
636            .index_of(ROW_ID_COLUMN_NAME)
637            .map_err(|_| Error::Internal("row_id column required".into()))?;
638
639        // Create a quick lookup map of incoming row IDs to their positions in the batch.
640        let row_id_arr = batch_ref
641            .column(row_id_idx)
642            .as_any()
643            .downcast_ref::<UInt64Array>()
644            .ok_or_else(|| Error::Internal("row_id downcast failed".into()))?;
645        let mut incoming_ids_map = FxHashMap::default();
646        incoming_ids_map.reserve(row_id_arr.len());
647        for i in 0..row_id_arr.len() {
648            incoming_ids_map.insert(row_id_arr.value(i), i);
649        }
650
651        // These variables will track the state of the LWW transaction.
652        let mut catalog_dirty = false;
653        let mut puts_rewrites: Vec<BatchPut> = Vec::new();
654        let mut all_rewritten_ids = FxHashSet::default();
655
656        // Iterate through each column in the batch (except row_id) to perform rewrites.
657        let mut catalog_lock = self.catalog.write().unwrap();
658        for i in 0..batch_ref.num_columns() {
659            if i == row_id_idx {
660                continue;
661            }
662            let field = schema.field(i);
663            if let Some(field_id_str) = field.metadata().get(crate::store::FIELD_ID_META_KEY) {
664                let field_id = field_id_str
665                    .parse::<u64>()
666                    .map(LogicalFieldId::from)
667                    .map_err(|e| Error::Internal(format!("Invalid field_id: {}", e)))?;
668
669                // `lww_rewrite_for_field` finds overlapping row IDs and rewrites the data chunks.
670                // It returns the set of row IDs that were updated.
671                let rewritten = self.lww_rewrite_for_field(
672                    &mut catalog_lock,
673                    field_id,
674                    &incoming_ids_map,
675                    batch_ref.column(i),
676                    batch_ref.column(row_id_idx),
677                    &mut puts_rewrites,
678                )?;
679                all_rewritten_ids.extend(rewritten);
680            }
681        }
682        drop(catalog_lock);
683
684        // Commit the LWW changes to the pager immediately.
685        if !puts_rewrites.is_empty() {
686            self.pager.batch_put(&puts_rewrites)?;
687        }
688
689        tracing::trace!("ColumnStore::append PHASE 2 complete - LWW rewrites done");
690
691        // --- PHASE 3: FILTERING FOR NEW ROWS ---
692        // After handling updates, we filter the incoming batch to remove the rows that were
693        // just rewritten. The remaining rows are guaranteed to be new additions to the store.
694        let batch_to_append = if !all_rewritten_ids.is_empty() {
695            let keep_mask: Vec<bool> = (0..row_id_arr.len())
696                .map(|i| !all_rewritten_ids.contains(&row_id_arr.value(i)))
697                .collect();
698            let keep_array = BooleanArray::from(keep_mask);
699            compute::filter_record_batch(batch_ref, &keep_array)?
700        } else {
701            batch_ref.clone()
702        };
703
704        // If no new rows are left, we are done.
705        if batch_to_append.num_rows() == 0 {
706            tracing::trace!("ColumnStore::append early exit - no new rows to append");
707            return Ok(());
708        }
709
710        tracing::trace!("ColumnStore::append PHASE 3 complete - filtered for new rows");
711
712        // --- PHASE 4: APPENDING NEW DATA ---
713        // This is the main append transaction. All writes generated in this phase will be
714        // collected and committed atomically at the very end.
715        let append_schema = batch_to_append.schema();
716        let append_row_id_idx = append_schema.index_of(ROW_ID_COLUMN_NAME)?;
717        let append_row_id_any: ArrayRef = Arc::clone(batch_to_append.column(append_row_id_idx));
718        let mut puts_appends: Vec<BatchPut> = Vec::new();
719
720        // Loop through each column of the filtered batch to append its data.
721        for (i, array) in batch_to_append.columns().iter().enumerate() {
722            if i == append_row_id_idx {
723                continue;
724            }
725
726            let field = append_schema.field(i);
727
728            let field_id = field
729                .metadata()
730                .get(crate::store::FIELD_ID_META_KEY)
731                .ok_or_else(|| Error::Internal("Missing field_id".into()))?
732                .parse::<u64>()
733                .map(LogicalFieldId::from)
734                .map_err(|e| Error::Internal(format!("Invalid field_id: {}", e)))?;
735
736            // Populate the data type cache for this column. This is a performance optimization
737            // to avoid reading a chunk from storage later just to determine its type.
738            self.dtype_cache.insert(field_id, field.data_type().clone());
739
740            // Null values are treated as deletions, so we filter them out. The `rids_clean`
741            // array contains the row IDs corresponding to the non-null values.
742            let (array_clean, rids_clean) = if array.null_count() == 0 {
743                (array.clone(), append_row_id_any.clone())
744            } else {
745                let keep =
746                    BooleanArray::from_iter((0..array.len()).map(|j| Some(!array.is_null(j))));
747                let a = compute::filter(array, &keep)?;
748                let r = compute::filter(&append_row_id_any, &keep)?;
749                (a, r)
750            };
751
752            if array_clean.is_empty() {
753                continue;
754            }
755
756            // Get or create the physical keys for the column's data descriptor and its
757            // shadow row_id descriptor from the catalog.
758            let (descriptor_pk, rid_descriptor_pk, rid_fid) = {
759                let mut catalog = self.catalog.write().unwrap();
760                let pk1 = *catalog.map.entry(field_id).or_insert_with(|| {
761                    catalog_dirty = true;
762                    self.pager.alloc_many(1).unwrap()[0]
763                });
764                let r_fid = rowid_fid(field_id);
765                let pk2 = *catalog.map.entry(r_fid).or_insert_with(|| {
766                    catalog_dirty = true;
767                    self.pager.alloc_many(1).unwrap()[0]
768                });
769                (pk1, pk2, r_fid)
770            };
771
772            // Load the descriptors and their tail metadata pages into memory.
773            // If they don't exist, `load_or_create` will initialize new ones.
774            let (mut data_descriptor, mut data_tail_page) =
775                ColumnDescriptor::load_or_create(Arc::clone(&self.pager), descriptor_pk, field_id)
776                    .map_err(|e| {
777                        tracing::error!(
778                            ?field_id,
779                            descriptor_pk,
780                            error = ?e,
781                            "append: load_or_create failed for data descriptor"
782                        );
783                        e
784                    })?;
785            let (mut rid_descriptor, mut rid_tail_page) = ColumnDescriptor::load_or_create(
786                Arc::clone(&self.pager),
787                rid_descriptor_pk,
788                rid_fid,
789            )
790            .map_err(|e| {
791                tracing::error!(
792                    ?rid_fid,
793                    rid_descriptor_pk,
794                    error = ?e,
795                    "append: load_or_create failed for rid descriptor"
796                );
797                e
798            })?;
799
800            // Logically register the Presence index on the main data descriptor. This ensures
801            // that even if no physical index chunks are created (because data arrived sorted),
802            // the system knows a Presence index is conceptually active for this column.
803            self.index_manager
804                .stage_index_registration(&mut data_descriptor, IndexKind::Presence)?;
805
806            // Split the data to be appended into chunks of a target size.
807            let slices = split_to_target_bytes(
808                &array_clean,
809                TARGET_CHUNK_BYTES,
810                self.cfg.varwidth_fallback_rows_per_slice,
811            );
812            let mut row_off = 0usize;
813
814            // Loop through each new slice to create and stage its chunks.
815            for s in slices {
816                let rows = s.len();
817                // Create and stage the data chunk.
818                let data_pk = self.pager.alloc_many(1)?[0];
819                let s_norm = zero_offset(&s);
820                let data_bytes = serialize_array(s_norm.as_ref())?;
821                puts_appends.push(BatchPut::Raw {
822                    key: data_pk,
823                    bytes: data_bytes,
824                });
825
826                // Create and stage the corresponding row_id chunk.
827                let rid_slice: ArrayRef = rids_clean.slice(row_off, rows);
828                let rid_norm = zero_offset(&rid_slice);
829                let rid_pk = self.pager.alloc_many(1)?[0];
830                let rid_bytes = serialize_array(rid_norm.as_ref())?;
831                puts_appends.push(BatchPut::Raw {
832                    key: rid_pk,
833                    bytes: rid_bytes,
834                });
835
836                // Compute min/max for the row_id chunk to enable pruning during scans.
837                let rids_for_meta = rid_norm.as_any().downcast_ref::<UInt64Array>().unwrap();
838                let (min, max) = if !rids_for_meta.is_empty() {
839                    let mut min_val = rids_for_meta.value(0);
840                    let mut max_val = rids_for_meta.value(0);
841                    for i in 1..rids_for_meta.len() {
842                        let v = rids_for_meta.value(i);
843                        if v < min_val {
844                            min_val = v;
845                        }
846                        if v > max_val {
847                            max_val = v;
848                        }
849                    }
850                    (min_val, max_val)
851                } else {
852                    (0, 0)
853                };
854
855                // Create the initial metadata for both chunks.
856                // The `value_order_perm_pk` is initialized to 0 (None).
857                let mut data_meta = ChunkMetadata {
858                    chunk_pk: data_pk,
859                    row_count: rows as u64,
860                    serialized_bytes: s_norm.get_array_memory_size() as u64,
861                    max_val_u64: u64::MAX,
862                    ..Default::default()
863                };
864                let mut rid_meta = ChunkMetadata {
865                    chunk_pk: rid_pk,
866                    row_count: rows as u64,
867                    serialized_bytes: rid_norm.get_array_memory_size() as u64,
868                    min_val_u64: min,
869                    max_val_u64: max,
870                    ..Default::default()
871                };
872
873                // **GENERIC INDEX UPDATE DISPATCH**
874                // This is the single, index-agnostic call. The IndexManager will look up all
875                // active indexes for this column (e.g., Presence, Sort) and call their respective
876                // `stage_update_for_new_chunk` methods. This is where the physical index data
877                // (like permutation blobs) is created and staged.
878                self.index_manager.stage_updates_for_new_chunk(
879                    field_id,
880                    &data_descriptor,
881                    &s_norm,
882                    &rid_norm,
883                    &mut data_meta,
884                    &mut rid_meta,
885                    &mut puts_appends,
886                )?;
887
888                // Append the (potentially modified) metadata to their respective descriptor chains.
889                self.append_meta_in_loop(
890                    &mut data_descriptor,
891                    &mut data_tail_page,
892                    data_meta,
893                    &mut puts_appends,
894                )?;
895                self.append_meta_in_loop(
896                    &mut rid_descriptor,
897                    &mut rid_tail_page,
898                    rid_meta,
899                    &mut puts_appends,
900                )?;
901                row_off += rows;
902            }
903
904            // After processing all slices, stage the final writes for the updated tail pages
905            // and the root descriptor objects themselves.
906            puts_appends.push(BatchPut::Raw {
907                key: data_descriptor.tail_page_pk,
908                bytes: data_tail_page,
909            });
910            puts_appends.push(BatchPut::Raw {
911                key: descriptor_pk,
912                bytes: data_descriptor.to_le_bytes(),
913            });
914            puts_appends.push(BatchPut::Raw {
915                key: rid_descriptor.tail_page_pk,
916                bytes: rid_tail_page,
917            });
918            puts_appends.push(BatchPut::Raw {
919                key: rid_descriptor_pk,
920                bytes: rid_descriptor.to_le_bytes(),
921            });
922        }
923
924        // --- PHASE 5: FINAL ATOMIC COMMIT ---
925        // If the catalog was modified (e.g., new columns were created), stage its write.
926        if catalog_dirty {
927            let catalog = self.catalog.read().unwrap();
928            puts_appends.push(BatchPut::Raw {
929                key: CATALOG_ROOT_PKEY,
930                bytes: catalog.to_bytes(),
931            });
932        }
933
934        // Commit all staged puts (new data chunks, new row_id chunks, new index permutations,
935        // updated descriptor pages, updated root descriptors, and the updated catalog)
936        // in a single atomic operation.
937        if !puts_appends.is_empty() {
938            self.pager.batch_put(&puts_appends)?;
939        }
940        tracing::trace!("ColumnStore::append END - success");
941        Ok(())
942    }
943
944    fn lww_rewrite_for_field(
945        &self,
946        catalog: &mut ColumnCatalog,
947        field_id: LogicalFieldId,
948        incoming_ids_map: &FxHashMap<u64, usize>,
949        incoming_data: &ArrayRef,
950        incoming_row_ids: &ArrayRef,
951        puts: &mut Vec<BatchPut>,
952    ) -> Result<FxHashSet<u64>> {
953        use crate::store::descriptor::DescriptorIterator;
954        use crate::store::ingest::ChunkEdit;
955
956        // Fast exit if nothing to rewrite.
957        if incoming_ids_map.is_empty() {
958            return Ok(FxHashSet::default());
959        }
960        let incoming_ids: FxHashSet<u64> = incoming_ids_map.keys().copied().collect();
961
962        // Resolve descriptors for data and row_id columns.
963        let desc_pk_data = match catalog.map.get(&field_id) {
964            Some(pk) => *pk,
965            None => return Ok(FxHashSet::default()),
966        };
967        let rid_fid = rowid_fid(field_id);
968        let desc_pk_rid = match catalog.map.get(&rid_fid) {
969            Some(pk) => *pk,
970            None => return Ok(FxHashSet::default()),
971        };
972
973        // Batch fetch both descriptors.
974        let gets = vec![
975            BatchGet::Raw { key: desc_pk_data },
976            BatchGet::Raw { key: desc_pk_rid },
977        ];
978        let results = self.pager.batch_get(&gets)?;
979        let mut blobs_by_pk = FxHashMap::default();
980        for r in results {
981            if let GetResult::Raw { key, bytes } = r {
982                blobs_by_pk.insert(key, bytes);
983            }
984        }
985
986        let desc_blob_data = blobs_by_pk.remove(&desc_pk_data).ok_or_else(|| {
987            tracing::error!(
988                ?field_id,
989                desc_pk_data,
990                "lww_rewrite: data descriptor blob not found in pager"
991            );
992            Error::NotFound
993        })?;
994        let mut descriptor_data = ColumnDescriptor::from_le_bytes(desc_blob_data.as_ref());
995
996        let desc_blob_rid = blobs_by_pk.remove(&desc_pk_rid).ok_or_else(|| {
997            tracing::error!(
998                ?rid_fid,
999                desc_pk_rid,
1000                "lww_rewrite: rid descriptor blob not found in pager"
1001            );
1002            Error::NotFound
1003        })?;
1004        let mut descriptor_rid = ColumnDescriptor::from_le_bytes(desc_blob_rid.as_ref());
1005
1006        tracing::trace!(?field_id, "lww_rewrite: descriptors loaded successfully");
1007
1008        // Collect chunk metadata.
1009        let mut metas_data: Vec<ChunkMetadata> = Vec::new();
1010        let mut metas_rid: Vec<ChunkMetadata> = Vec::new();
1011        for m in DescriptorIterator::new(self.pager.as_ref(), descriptor_data.head_page_pk) {
1012            metas_data.push(m.map_err(|e| {
1013                tracing::error!(?field_id, error = ?e, "lww_rewrite: failed to iterate data descriptor");
1014                e
1015            })?);
1016        }
1017        for m in DescriptorIterator::new(self.pager.as_ref(), descriptor_rid.head_page_pk) {
1018            metas_rid.push(m.map_err(|e| {
1019                tracing::error!(?rid_fid, error = ?e, "lww_rewrite: failed to iterate rid descriptor");
1020                e
1021            })?);
1022        }
1023
1024        tracing::trace!(
1025            ?field_id,
1026            data_chunks = metas_data.len(),
1027            rid_chunks = metas_rid.len(),
1028            "lww_rewrite: chunk metadata collected"
1029        );
1030
1031        // Classify incoming rows: delete vs upsert.
1032        let rid_in = incoming_row_ids
1033            .as_any()
1034            .downcast_ref::<UInt64Array>()
1035            .ok_or_else(|| Error::Internal("row_id must be u64".into()))?;
1036        let mut ids_to_delete = FxHashSet::default();
1037        let mut ids_to_upsert = FxHashSet::default();
1038        for i in 0..rid_in.len() {
1039            let rid = rid_in.value(i);
1040            if incoming_data.is_null(i) {
1041                ids_to_delete.insert(rid);
1042            } else {
1043                ids_to_upsert.insert(rid);
1044            }
1045        }
1046
1047        // Scan row_id chunks to find hits, bucketed by chunk index.
1048        let mut rewritten_ids = FxHashSet::default();
1049        let mut hit_up: FxHashMap<usize, Vec<u64>> = FxHashMap::default();
1050        let mut hit_del: FxHashMap<usize, Vec<u64>> = FxHashMap::default();
1051
1052        let n = metas_data.len().min(metas_rid.len());
1053        if n > 0 {
1054            // Fetch only row_id chunks to locate matches.
1055            let mut gets_rid = Vec::with_capacity(n);
1056            for rm in metas_rid.iter().take(n) {
1057                gets_rid.push(BatchGet::Raw { key: rm.chunk_pk });
1058            }
1059            let rid_results = self.pager.batch_get(&gets_rid)?;
1060            let mut rid_blobs: FxHashMap<PhysicalKey, EntryHandle> = FxHashMap::default();
1061            for r in rid_results {
1062                if let GetResult::Raw { key, bytes } = r {
1063                    rid_blobs.insert(key, bytes);
1064                }
1065            }
1066
1067            for (i, meta_rid) in metas_rid.iter().enumerate().take(n) {
1068                if let Some(rid_blob) = rid_blobs.get(&meta_rid.chunk_pk) {
1069                    let rid_arr_any = deserialize_array(rid_blob.clone())?;
1070                    let rid_arr = rid_arr_any
1071                        .as_any()
1072                        .downcast_ref::<UInt64Array>()
1073                        .ok_or_else(|| Error::Internal("rid type mismatch".into()))?;
1074                    for j in 0..rid_arr.len() {
1075                        let rid = rid_arr.value(j);
1076                        if incoming_ids.contains(&rid) {
1077                            if ids_to_delete.contains(&rid) {
1078                                hit_del.entry(i).or_default().push(rid);
1079                            } else if ids_to_upsert.contains(&rid) {
1080                                hit_up.entry(i).or_default().push(rid);
1081                            }
1082                            rewritten_ids.insert(rid);
1083                        }
1084                    }
1085                }
1086            }
1087        }
1088
1089        if hit_up.is_empty() && hit_del.is_empty() {
1090            return Ok(rewritten_ids);
1091        }
1092
1093        // Batch fetch data+rid blobs for all chunks with hits.
1094        let mut hit_set = FxHashSet::default();
1095        hit_set.extend(hit_up.keys().copied());
1096        hit_set.extend(hit_del.keys().copied());
1097        let hit_idxs: Vec<usize> = hit_set.into_iter().collect();
1098
1099        let mut gets = Vec::with_capacity(hit_idxs.len() * 2);
1100        for &i in &hit_idxs {
1101            gets.push(BatchGet::Raw {
1102                key: metas_data[i].chunk_pk,
1103            });
1104            gets.push(BatchGet::Raw {
1105                key: metas_rid[i].chunk_pk,
1106            });
1107        }
1108        let results = self.pager.batch_get(&gets)?;
1109        let mut blob_map: FxHashMap<PhysicalKey, EntryHandle> = FxHashMap::default();
1110        for r in results {
1111            if let GetResult::Raw { key, bytes } = r {
1112                blob_map.insert(key, bytes);
1113            }
1114        }
1115
1116        // Apply per-chunk edits and stage writebacks.
1117        for i in hit_idxs {
1118            let old_data_arr =
1119                deserialize_array(blob_map.get(&metas_data[i].chunk_pk).unwrap().clone())?;
1120            let old_rid_arr_any =
1121                deserialize_array(blob_map.get(&metas_rid[i].chunk_pk).unwrap().clone())?;
1122            let old_rid_arr = old_rid_arr_any
1123                .as_any()
1124                .downcast_ref::<UInt64Array>()
1125                .unwrap();
1126
1127            let up_vec = hit_up.remove(&i).unwrap_or_default();
1128            let del_vec = hit_del.remove(&i).unwrap_or_default();
1129
            // Centralized LWW edit: builds the keep mask and injects the replacement tails.
1131            let edit = ChunkEdit::from_lww_upsert(
1132                old_rid_arr,
1133                &up_vec,
1134                &del_vec,
1135                incoming_data,
1136                incoming_row_ids,
1137                incoming_ids_map,
1138            )?;
1139
1140            let (new_data_arr, new_rid_arr) =
1141                ChunkEdit::apply_edit_to_arrays(&old_data_arr, Some(&old_rid_arr_any), &edit)?;
1142
1143            // Stage data writeback.
1144            let data_bytes = serialize_array(&new_data_arr)?;
1145            puts.push(BatchPut::Raw {
1146                key: metas_data[i].chunk_pk,
1147                bytes: data_bytes,
1148            });
1149            metas_data[i].row_count = new_data_arr.len() as u64;
1150            metas_data[i].serialized_bytes = new_data_arr.get_array_memory_size() as u64;
1151
1152            // Stage row_id writeback.
1153            if let Some(rarr) = new_rid_arr {
1154                let rid_bytes = serialize_array(&rarr)?;
1155                puts.push(BatchPut::Raw {
1156                    key: metas_rid[i].chunk_pk,
1157                    bytes: rid_bytes,
1158                });
1159                metas_rid[i].row_count = rarr.len() as u64;
1160                metas_rid[i].serialized_bytes = rarr.get_array_memory_size() as u64;
1161            }
1162
1163            // Refresh permutation if present.
1164            if metas_data[i].value_order_perm_pk != 0 {
1165                let sort_col = SortColumn {
1166                    values: new_data_arr,
1167                    options: None,
1168                };
1169                let idx = lexsort_to_indices(&[sort_col], None)?;
1170                let perm_bytes = serialize_array(&idx)?;
1171                puts.push(BatchPut::Raw {
1172                    key: metas_data[i].value_order_perm_pk,
1173                    bytes: perm_bytes,
1174                });
1175            }
1176        }
1177
1178        // Rewrite descriptor chains/totals for both columns.
1179        descriptor_data.rewrite_pages(
1180            Arc::clone(&self.pager),
1181            desc_pk_data,
1182            &mut metas_data,
1183            puts,
1184        )?;
1185        descriptor_rid.rewrite_pages(Arc::clone(&self.pager), desc_pk_rid, &mut metas_rid, puts)?;
1186
1187        Ok(rewritten_ids)
1188    }
1189
1190    fn stage_delete_rows_for_field(
1191        &self,
1192        field_id: LogicalFieldId,
1193        rows_to_delete: &[RowId],
1194        staged_puts: &mut Vec<BatchPut>,
1195    ) -> Result<bool> {
1196        tracing::warn!(
1197            field_id = ?field_id,
1198            rows = rows_to_delete.len(),
1199            "delete_rows stage_delete_rows_for_field: start"
1200        );
1201        use crate::store::descriptor::DescriptorIterator;
1202        use crate::store::ingest::ChunkEdit;
1203
1204        if rows_to_delete.is_empty() {
1205            return Ok(false);
1206        }
1207
1208        // Stream and validate ascending, unique positions.
1209        let mut del_iter = rows_to_delete.iter().copied();
1210        let mut cur_del = del_iter.next();
1211        let mut last_seen: Option<u64> = cur_del;
1212
1213        // Lookup descriptors (data and optional row_id).
1214        let catalog = self.catalog.read().unwrap();
1215        let desc_pk = match catalog.map.get(&field_id) {
1216            Some(pk) => *pk,
1217            None => {
1218                tracing::trace!(
1219                    field_id = ?field_id,
1220                    "delete_rows stage_delete_rows_for_field: data descriptor missing"
1221                );
1222                return Err(Error::NotFound);
1223            }
1224        };
1225        let rid_fid = rowid_fid(field_id);
1226        let desc_pk_rid = catalog.map.get(&rid_fid).copied();
1227        tracing::warn!(
1228            field_id = ?field_id,
1229            desc_pk,
1230            desc_pk_rid = ?desc_pk_rid,
1231            "delete_rows stage_delete_rows_for_field: descriptor keys"
1232        );
1233
1234        // Batch fetch descriptor blobs up front.
1235        let mut gets = vec![BatchGet::Raw { key: desc_pk }];
1236        if let Some(pk) = desc_pk_rid {
1237            gets.push(BatchGet::Raw { key: pk });
1238        }
1239        let results = match self.pager.batch_get(&gets) {
1240            Ok(res) => res,
1241            Err(err) => {
1242                tracing::trace!(
1243                    field_id = ?field_id,
1244                    error = ?err,
1245                    "delete_rows stage_delete_rows_for_field: descriptor batch_get failed"
1246                );
1247                return Err(err);
1248            }
1249        };
1250        let mut blobs_by_pk = FxHashMap::default();
1251        for res in results {
1252            if let GetResult::Raw { key, bytes } = res {
1253                blobs_by_pk.insert(key, bytes);
1254            }
1255        }
1256
1257        tracing::warn!(
1258            field_id = ?field_id,
1259            desc_blob_found = blobs_by_pk.contains_key(&desc_pk),
1260            rid_blob_found = desc_pk_rid.map(|pk| blobs_by_pk.contains_key(&pk)),
1261            "delete_rows stage_delete_rows_for_field: descriptor fetch status"
1262        );
1263
1264        let desc_blob = blobs_by_pk.remove(&desc_pk).ok_or_else(|| {
1265            tracing::trace!(
1266                field_id = ?field_id,
1267                desc_pk,
1268                "delete_rows stage_delete_rows_for_field: descriptor blob missing"
1269            );
1270            Error::Internal(format!(
1271                "descriptor pk={} missing during delete_rows for field {:?}",
1272                desc_pk, field_id
1273            ))
1274        })?;
1275        let mut descriptor = ColumnDescriptor::from_le_bytes(desc_blob.as_ref());
1276
1277        // Build metas for data column.
1278        let mut metas: Vec<ChunkMetadata> = Vec::new();
1279        for m in DescriptorIterator::new(self.pager.as_ref(), descriptor.head_page_pk) {
1280            metas.push(m?);
1281        }
1282        if metas.is_empty() {
1283            drop(catalog);
1284            return Ok(false);
1285        }
1286
1287        // Optionally mirror metas for row_id column.
1288        let mut metas_rid: Vec<ChunkMetadata> = Vec::new();
1289        let mut descriptor_rid: Option<ColumnDescriptor> = None;
1290        tracing::warn!(
1291            field_id = ?field_id,
1292            metas_len = metas.len(),
1293            desc_pk_rid = ?desc_pk_rid,
1294            "delete_rows stage_delete_rows_for_field: data metas loaded"
1295        );
1296        if let Some(pk_rid) = desc_pk_rid
1297            && let Some(desc_blob_rid) = blobs_by_pk.remove(&pk_rid)
1298        {
1299            let d_rid = ColumnDescriptor::from_le_bytes(desc_blob_rid.as_ref());
1300            for m in DescriptorIterator::new(self.pager.as_ref(), d_rid.head_page_pk) {
1301                metas_rid.push(m?);
1302            }
1303            descriptor_rid = Some(d_rid);
1304        }
1305
1306        tracing::warn!(
1307            field_id = ?field_id,
1308            metas_rid_len = metas_rid.len(),
1309            "delete_rows stage_delete_rows_for_field: rowid metas loaded"
1310        );
1311
1312        let mut cum_rows = 0u64;
1313        let mut any_changed = false;
1314
1315        for (i, meta) in metas.iter_mut().enumerate() {
1316            let start_u64 = cum_rows;
1317            let end_u64 = start_u64 + meta.row_count;
1318
1319            // Advance deletes into this chunk window [start, end).
1320            while let Some(d) = cur_del {
1321                if d < start_u64
1322                    && let Some(prev) = last_seen
1323                {
1324                    if d < prev {
1325                        return Err(Error::Internal(
1326                            "rows_to_delete must be ascending/unique".into(),
1327                        ));
1328                    }
1329
1330                    last_seen = Some(d);
1331                    cur_del = del_iter.next();
1332                } else {
1333                    break;
1334                }
1335            }
1336
1337            // Collect local delete indices.
1338            let rows = meta.row_count as usize;
1339            let mut del_local: FxHashSet<usize> = FxHashSet::default();
1340            while let Some(d) = cur_del {
1341                if d >= end_u64 {
1342                    break;
1343                }
1344                del_local.insert((d - start_u64) as usize);
1345                last_seen = Some(d);
1346                cur_del = del_iter.next();
1347            }
1348
1349            if del_local.is_empty() {
1350                cum_rows = end_u64;
1351                continue;
1352            }
1353
1354            // Batch get chunk blobs (data and optional row_id).
1355            let mut chunk_gets = vec![BatchGet::Raw { key: meta.chunk_pk }];
1356            if let Some(rm) = metas_rid.get(i) {
1357                chunk_gets.push(BatchGet::Raw { key: rm.chunk_pk });
1358            }
1359            let chunk_results = match self.pager.batch_get(&chunk_gets) {
1360                Ok(res) => res,
1361                Err(err) => {
1362                    tracing::trace!(
1363                        field_id = ?field_id,
1364                        chunk_pk = meta.chunk_pk,
1365                        error = ?err,
1366                        "delete_rows stage_delete_rows_for_field: chunk batch_get failed"
1367                    );
1368                    return Err(err);
1369                }
1370            };
1371            let mut chunk_blobs = FxHashMap::default();
1372            for res in chunk_results {
1373                if let GetResult::Raw { key, bytes } = res {
1374                    chunk_blobs.insert(key, bytes);
1375                }
1376            }
1377
1378            tracing::trace!(
1379                field_id = ?field_id,
1380                chunk_pk = meta.chunk_pk,
1381                rid_chunk_pk = metas_rid.get(i).map(|rm| rm.chunk_pk),
1382                data_found = chunk_blobs.contains_key(&meta.chunk_pk),
1383                rid_found = metas_rid
1384                    .get(i)
1385                    .map(|rm| chunk_blobs.contains_key(&rm.chunk_pk)),
1386                "delete_rows stage_delete_rows_for_field: chunk fetch status"
1387            );
1388
1389            let data_blob = match chunk_blobs.remove(&meta.chunk_pk) {
1390                Some(bytes) => bytes,
1391                None => {
1392                    tracing::trace!(
1393                        field_id = ?field_id,
1394                        chunk_pk = meta.chunk_pk,
1395                        "delete_rows stage_delete_rows_for_field: chunk missing"
1396                    );
1397                    return Err(Error::NotFound);
1398                }
1399            };
1400            let data_arr = deserialize_array(data_blob)?;
1401
1402            let rid_arr_any = if let Some(rm) = metas_rid.get(i) {
1403                let rid_blob = match chunk_blobs.remove(&rm.chunk_pk) {
1404                    Some(bytes) => bytes,
1405                    None => {
1406                        tracing::trace!(
1407                            field_id = ?field_id,
1408                            rowid_chunk_pk = rm.chunk_pk,
1409                            "delete_rows stage_delete_rows_for_field: rowid chunk missing"
1410                        );
1411                        return Err(Error::NotFound);
1412                    }
1413                };
1414                Some(deserialize_array(rid_blob)?)
1415            } else {
1416                None
1417            };
1418
1419            // Build the chunk edit (which local rows to drop) via the ingest helper.
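            // For example, rows = 5 with del_local = {1, 3} yields an edit that
            // keeps positions {0, 2, 4} (values illustrative).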
1420            let edit = ChunkEdit::from_delete_indices(rows, &del_local);
1421
1422            // Apply edit (pure array ops).
1423            let (new_data_arr, new_rid_arr) =
1424                ChunkEdit::apply_edit_to_arrays(&data_arr, rid_arr_any.as_ref(), &edit)?;
1425
1426            // Write back data.
1427            let data_bytes = serialize_array(&new_data_arr)?;
1428            staged_puts.push(BatchPut::Raw {
1429                key: meta.chunk_pk,
1430                bytes: data_bytes,
1431            });
1432            meta.row_count = new_data_arr.len() as u64;
1433            meta.serialized_bytes = new_data_arr.get_array_memory_size() as u64;
1434
1435            // Write back row_ids if present.
1436            if let (Some(rm), Some(rids)) = (metas_rid.get_mut(i), new_rid_arr) {
1438                let rid_bytes = serialize_array(&rids)?;
1439                staged_puts.push(BatchPut::Raw {
1440                    key: rm.chunk_pk,
1441                    bytes: rid_bytes,
1442                });
1443                rm.row_count = rids.len() as u64;
1444                rm.serialized_bytes = rids.get_array_memory_size() as u64;
1445            }
1446
1447            // Refresh permutation if this chunk has one.
1448            if meta.value_order_perm_pk != 0 {
1449                let sort_column = SortColumn {
1450                    values: new_data_arr,
1451                    options: None,
1452                };
1453                let indices = lexsort_to_indices(&[sort_column], None)?;
1454                let perm_bytes = serialize_array(&indices)?;
1455                staged_puts.push(BatchPut::Raw {
1456                    key: meta.value_order_perm_pk,
1457                    bytes: perm_bytes,
1458                });
1459            }
1460
1461            cum_rows = end_u64;
1462            any_changed = true;
1463        }
1464
1465        // Rewrite descriptor chains/totals and commit.
1466        descriptor.rewrite_pages(Arc::clone(&self.pager), desc_pk, &mut metas, staged_puts)?;
1467        if let (Some(rid_pk), Some(mut rid_desc)) = (desc_pk_rid, descriptor_rid) {
1468            rid_desc.rewrite_pages(Arc::clone(&self.pager), rid_pk, &mut metas_rid, staged_puts)?;
1469        }
1470        drop(catalog);
1471        tracing::trace!(
1472            field_id = ?field_id,
1473            changed = any_changed,
1474            "delete_rows stage_delete_rows_for_field: finished stage"
1475        );
1476        Ok(any_changed)
1477    }
1478
1479    /// Delete row positions for one or more logical fields in a single atomic batch.
1480    ///
1481    /// The same set of global row positions is applied to every field in
1482    /// `fields`. All staged metadata and chunk updates are committed in a
1483    /// single pager batch.
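    ///
    /// # Example
    ///
    /// A minimal sketch (not compiled as a doctest); `store`, `col_a`, and
    /// `col_b` are assumed to be an open [`ColumnStore`] and two existing
    /// user-data columns of the same table:
    ///
    /// ```ignore
    /// // Remove global row positions 3 and 7 from both columns atomically.
    /// store.delete_rows(&[col_a, col_b], &[3, 7])?;
    /// ```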
1484    pub fn delete_rows(&self, fields: &[LogicalFieldId], rows_to_delete: &[RowId]) -> Result<()> {
1485        if fields.is_empty() || rows_to_delete.is_empty() {
1486            return Ok(());
1487        }
1488
1489        let mut puts = Vec::new();
1490        let mut touched: FxHashSet<LogicalFieldId> = FxHashSet::default();
1491        let mut table_id: Option<TableId> = None;
1492
1493        tracing::trace!(
1494            fields = fields.len(),
1495            rows = rows_to_delete.len(),
1496            "delete_rows begin"
1497        );
1498        for field_id in fields {
1499            tracing::trace!(field = ?field_id, "delete_rows iter field");
1500            if let Some(expected) = table_id {
1501                if field_id.table_id() != expected {
1502                    return Err(Error::InvalidArgumentError(
1503                        "delete_rows requires fields from the same table".into(),
1504                    ));
1505                }
1506            } else {
1507                table_id = Some(field_id.table_id());
1508            }
1509
1510            if self.stage_delete_rows_for_field(*field_id, rows_to_delete, &mut puts)? {
1511                touched.insert(*field_id);
1512            }
1513        }
1514
1515        if puts.is_empty() {
1516            return Ok(());
1517        }
1518
1519        self.pager.batch_put(&puts)?;
1520
1521        tracing::trace!(touched = touched.len(), "delete_rows apply writes");
1522
1523        for field_id in touched {
1524            self.compact_field_bounded(field_id)?;
1525        }
1526        tracing::trace!("delete_rows complete");
1527        Ok(())
1528    }
1529
1530    // TODO: Move to descriptor module?
1531    // NOTE: This helper mutates descriptor metadata in place because it needs
1532    // access to the pager and catalog locks owned by `ColumnStore`.
1533    /// Write a descriptor chain from a meta slice. Reuses first page when
1534    /// possible. Frees surplus pages via `frees`.
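    ///
    /// A minimal call sketch (not compiled as a doctest), assuming the caller
    /// already holds the descriptor and the staging buffers:
    ///
    /// ```ignore
    /// // Rewrite the chain so it holds exactly `new_metas`; page writes are
    /// // staged into `puts` and surplus page keys into `frees`, which the
    /// // caller later persists via `batch_put` / `free_many`.
    /// store.write_descriptor_chain(desc_pk, &mut descriptor, &new_metas, &mut puts, &mut frees)?;
    /// ```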
1535    pub(crate) fn write_descriptor_chain(
1536        &self,
1537        descriptor_pk: PhysicalKey,
1538        descriptor: &mut ColumnDescriptor,
1539        new_metas: &[ChunkMetadata],
1540        puts: &mut Vec<BatchPut>,
1541        frees: &mut Vec<PhysicalKey>,
1542    ) -> Result<()> {
1543        // Collect existing page chain.
1544        let mut old_pages = Vec::new();
1545        let mut pk = descriptor.head_page_pk;
1546        while pk != 0 {
1547            let page_blob = self
1548                .pager
1549                .batch_get(&[BatchGet::Raw { key: pk }])?
1550                .pop()
1551                .and_then(|res| match res {
1552                    GetResult::Raw { bytes, .. } => Some(bytes),
1553                    _ => None,
1554                })
1555                .ok_or(Error::NotFound)?;
1556            let header = DescriptorPageHeader::from_le_bytes(
1557                &page_blob.as_ref()[..DescriptorPageHeader::DISK_SIZE],
1558            );
1559            old_pages.push(pk);
1560            pk = header.next_page_pk;
1561        }
1562
1563        // Required pages for new metas.
1564        let per = DESCRIPTOR_ENTRIES_PER_PAGE;
1565        let need_pages = if new_metas.is_empty() {
1566            0
1567        } else {
1568            new_metas.len().div_ceil(per)
1569        };
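        // For example, with `per` = 4 entries per page (illustrative), 9 metas
        // need ceil(9 / 4) = 3 pages.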
1570        // If empty: free everything and clear counters.
1571        if need_pages == 0 {
1572            frees.extend(old_pages.iter().copied());
1573
1574            // Setting both page pointers to 0 indicates an empty descriptor state.
1575            // This signals that the descriptor needs reinitialization on next access,
1576            // as this pattern (head_page_pk == 0 && tail_page_pk == 0) is checked
1577            // in the `load_or_create` method to determine if allocation is needed.
1578            descriptor.head_page_pk = 0;
1579            descriptor.tail_page_pk = 0;
1580            descriptor.total_row_count = 0;
1581            descriptor.total_chunk_count = 0;
1582            puts.push(BatchPut::Raw {
1583                key: descriptor_pk,
1584                bytes: descriptor.to_le_bytes(),
1585            });
1586            return Ok(());
1587        }
1588
1589        // Reuse first page; alloc more if needed.
1590        let mut pages = Vec::with_capacity(need_pages);
1591        if !old_pages.is_empty() {
1592            pages.push(old_pages[0]);
1593        } else {
1594            pages.push(self.pager.alloc_many(1)?[0]);
1595            descriptor.head_page_pk = pages[0];
1596        }
1597        if need_pages > pages.len() {
1598            let extra = self.pager.alloc_many(need_pages - pages.len())?;
1599            pages.extend(extra);
1600        }
1601
1602        // Free surplus old pages.
1603        if old_pages.len() > need_pages {
1604            frees.extend(old_pages[need_pages..].iter().copied());
1605        }
1606
1607        // Write page contents.
1608        let mut off = 0usize;
1609        for (i, page_pk) in pages.iter().copied().enumerate() {
1610            let remain = new_metas.len() - off;
1611            let count = remain.min(per);
1612            let next = if i + 1 < pages.len() { pages[i + 1] } else { 0 };
1613            let header = DescriptorPageHeader {
1614                next_page_pk: next,
1615                entry_count: count as u32,
1616                _padding: [0; 4],
1617            };
1618            let mut page_bytes = header.to_le_bytes().to_vec();
1619            for m in &new_metas[off..off + count] {
1620                page_bytes.extend_from_slice(&m.to_le_bytes());
1621            }
1622            puts.push(BatchPut::Raw {
1623                key: page_pk,
1624                bytes: page_bytes,
1625            });
1626            off += count;
1627        }
1628
1629        descriptor.tail_page_pk = *pages.last().unwrap();
1630        descriptor.total_chunk_count = new_metas.len() as u64;
1631        descriptor.total_row_count = new_metas.iter().map(|m| m.row_count).sum();
1632        puts.push(BatchPut::Raw {
1633            key: descriptor_pk,
1634            bytes: descriptor.to_le_bytes(),
1635        });
1636        Ok(())
1637    }
1638
1639    /// Bounded, local field compaction. Merges adjacent small chunks into
    /// chunks of roughly `TARGET_CHUNK_BYTES`; leaves large chunks intact.
    /// Recomputes sort permutations if any source chunk in a run had one.
    /// Frees obsolete chunks/pages.
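    ///
    /// For example (sizes illustrative): adjacent chunks of 2 KiB, 3 KiB, and
    /// 40 KiB become one merged ~5 KiB chunk plus the untouched 40 KiB chunk.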
1642    fn compact_field_bounded(&self, field_id: LogicalFieldId) -> Result<()> {
1643        // We may rewrite descriptors; take a write lock.
1644        let mut catalog = self.catalog.write().unwrap();
1645
1646        let desc_pk = match catalog.map.get(&field_id) {
1647            Some(&pk) => pk,
1648            None => return Ok(()),
1649        };
1650        let rid_fid = rowid_fid(field_id);
1651        let desc_pk_rid = match catalog.map.get(&rid_fid) {
1652            Some(&pk) => pk,
1653            None => return Ok(()),
1654        };
1655
1656        // True batching for the two descriptor reads.
1657        let gets = vec![
1658            BatchGet::Raw { key: desc_pk },
1659            BatchGet::Raw { key: desc_pk_rid },
1660        ];
1661        let results = self.pager.batch_get(&gets)?;
1662        let mut blobs_by_pk = FxHashMap::default();
1663        for res in results {
1664            if let GetResult::Raw { key, bytes } = res {
1665                blobs_by_pk.insert(key, bytes);
1666            }
1667        }
1668        let desc_blob = blobs_by_pk.remove(&desc_pk).ok_or(Error::NotFound)?;
1669        let mut desc = ColumnDescriptor::from_le_bytes(desc_blob.as_ref());
1670        let desc_blob_rid = blobs_by_pk.remove(&desc_pk_rid).ok_or(Error::NotFound)?;
1671        let mut desc_rid = ColumnDescriptor::from_le_bytes(desc_blob_rid.as_ref());
1672
1673        // Load metas.
1674        let mut metas = Vec::new();
1675        for m in DescriptorIterator::new(self.pager.as_ref(), desc.head_page_pk) {
1676            metas.push(m?);
1677        }
1678        let mut metas_rid = Vec::new();
1679        for m in DescriptorIterator::new(self.pager.as_ref(), desc_rid.head_page_pk) {
1680            metas_rid.push(m?);
1681        }
1682        if metas.is_empty() || metas_rid.is_empty() {
1683            return Ok(());
1684        }
1685
1686        let mut puts: Vec<BatchPut> = Vec::new();
1687        let mut frees: Vec<PhysicalKey> = Vec::new();
1688        let mut new_metas: Vec<ChunkMetadata> = Vec::new();
1689        let mut new_rid_metas: Vec<ChunkMetadata> = Vec::new();
1690
1691        let mut i = 0usize;
1692        while i < metas.len() {
1693            let sz = metas[i].serialized_bytes as usize;
1694            // Keep large chunks as-is.
1695            if sz >= MIN_CHUNK_BYTES {
1696                new_metas.push(metas[i]);
1697                new_rid_metas.push(metas_rid[i]);
1698                i += 1;
1699                continue;
1700            }
1701
1702            // Build a small run [i, j) capped by MAX_MERGE_RUN_BYTES.
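            // For example (sizes illustrative), with chunk sizes
            // [2 KiB, 3 KiB, 40 KiB] and a 40 KiB target, the run is
            // [i, i + 2): it stops at the already-large third chunk.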
1703            let mut j = i;
1704            let mut run_bytes = 0usize;
1705            while j < metas.len() {
1706                let b = metas[j].serialized_bytes as usize;
1707                if b >= TARGET_CHUNK_BYTES {
1708                    break;
1709                }
1710                if run_bytes + b > MAX_MERGE_RUN_BYTES {
1711                    break;
1712                }
1713                run_bytes += b;
1714                j += 1;
1715            }
1716            // A lone small chunk has nothing adjacent to merge; keep it as-is
            // (`sz` is already known to be below MIN_CHUNK_BYTES here).
            if j == i + 1 {
1717                new_metas.push(metas[i]);
1718                new_rid_metas.push(metas_rid[i]);
1719                i += 1;
1720                continue;
1721            }
1722
1723            // Fetch and concatenate the run's data and row_ids.
1724            let mut gets = Vec::with_capacity((j - i) * 2);
1725            for k in i..j {
1726                gets.push(BatchGet::Raw {
1727                    key: metas[k].chunk_pk,
1728                });
1729                gets.push(BatchGet::Raw {
1730                    key: metas_rid[k].chunk_pk,
1731                });
1732            }
1733            let results = self.pager.batch_get(&gets)?;
1734            let mut by_pk: FxHashMap<PhysicalKey, EntryHandle> = FxHashMap::default();
1735            for r in results {
1736                match r {
1737                    GetResult::Raw { key, bytes } => {
1738                        by_pk.insert(key, bytes);
1739                    }
1740                    _ => return Err(Error::NotFound),
1741                }
1742            }
1743            let mut data_parts = Vec::with_capacity(j - i);
1744            let mut rid_parts = Vec::with_capacity(j - i);
1745            for k in i..j {
1746                let db = by_pk.get(&metas[k].chunk_pk).ok_or(Error::NotFound)?;
1747                data_parts.push(deserialize_array(db.clone())?);
1748                let rb = by_pk.get(&metas_rid[k].chunk_pk).ok_or(Error::NotFound)?;
1749                rid_parts.push(deserialize_array(rb.clone())?);
1750            }
1751            let merged_data = concat_many(data_parts.iter().collect())?;
1752            let merged_rid_any = concat_many(rid_parts.iter().collect())?;
1753
1754            // Split merged run into ~target-sized chunks.
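            // For example, a ~24 KiB merged array with an 8 KiB target splits
            // into roughly three ~8 KiB slices (sizes illustrative).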
1755            let slices = split_to_target_bytes(
1756                &merged_data,
1757                TARGET_CHUNK_BYTES,
1758                self.cfg.varwidth_fallback_rows_per_slice,
1759            );
1760            let mut rid_off = 0usize;
1761            let need_perms = metas[i..j].iter().any(|m| m.value_order_perm_pk != 0);
1762
1763            for s in slices {
1764                let rows = s.len();
1765                // Slice the merged row ids to line up with `s` (slicing the
                // ArrayRef directly avoids wrapping a second Arc).
1766                let rid_ref: ArrayRef = merged_rid_any.slice(rid_off, rows);
1767                let rid_norm = zero_offset(&rid_ref);
1768                let rid_pk = self.pager.alloc_many(1)?[0];
1769                let rid_bytes = serialize_array(rid_norm.as_ref())?;
1770
1771                let data_pk = self.pager.alloc_many(1)?[0];
1772                let s_norm = zero_offset(&s);
1773                let data_bytes = serialize_array(s_norm.as_ref())?;
1774                puts.push(BatchPut::Raw {
1775                    key: data_pk,
1776                    bytes: data_bytes,
1777                });
1778                puts.push(BatchPut::Raw {
1779                    key: rid_pk,
1780                    bytes: rid_bytes,
1781                });
1782                let mut meta = ChunkMetadata {
1783                    chunk_pk: data_pk,
1784                    row_count: rows as u64,
1785                    serialized_bytes: s_norm.get_array_memory_size() as u64,
1786                    max_val_u64: u64::MAX,
1787                    ..Default::default()
1788                };
1789                // If any source chunk had a perm, recompute for this slice.
1790                if need_perms {
1791                    let sort_col = SortColumn {
1792                        values: s.clone(),
1793                        options: None,
1794                    };
1795                    let idx = lexsort_to_indices(&[sort_col], None)?;
1796                    let perm_bytes = serialize_array(&idx)?;
1797                    let perm_pk = self.pager.alloc_many(1)?[0];
1798                    puts.push(BatchPut::Raw {
1799                        key: perm_pk,
1800                        bytes: perm_bytes,
1801                    });
1802                    meta.value_order_perm_pk = perm_pk;
1803                }
1804
1805                // Build presence metadata for the row ids: track min/max and
                // detect whether they are already sorted; unsorted chunks get a
                // sort permutation below.
1806                let rid_any = rid_norm.clone();
1807                let rids = rid_any
1808                    .as_any()
1809                    .downcast_ref::<UInt64Array>()
1810                    .ok_or_else(|| Error::Internal("rid downcast".into()))?;
1811                let mut min = u64::MAX;
1812                let mut max = 0u64;
1813                let mut sorted_rids = true;
1814                let mut last_v = 0u64;
1815                for ii in 0..rids.len() {
1816                    let v = rids.value(ii);
1817                    if ii == 0 {
1818                        last_v = v;
1819                    } else if v < last_v {
1820                        sorted_rids = false;
1821                    } else {
1822                        last_v = v;
1823                    }
1824                    if v < min {
1825                        min = v;
1826                    }
1827                    if v > max {
1828                        max = v;
1829                    }
1830                }
1831                let mut rid_perm_pk = 0u64;
1832                if !sorted_rids {
1833                    let rid_sort_col = SortColumn {
1834                        values: rid_any,
1835                        options: None,
1836                    };
1837                    let rid_idx = lexsort_to_indices(&[rid_sort_col], None)?;
1838                    let rid_perm_bytes = serialize_array(&rid_idx)?;
1839                    rid_perm_pk = self.pager.alloc_many(1)?[0];
1840                    puts.push(BatchPut::Raw {
1841                        key: rid_perm_pk,
1842                        bytes: rid_perm_bytes,
1843                    });
1844                }
1845                let rid_meta = ChunkMetadata {
1846                    chunk_pk: rid_pk,
1847                    value_order_perm_pk: rid_perm_pk,
1848                    row_count: rows as u64,
1849                    serialized_bytes: rid_norm.get_array_memory_size() as u64,
1850                    min_val_u64: if rows > 0 { min } else { 0 },
1851                    max_val_u64: if rows > 0 { max } else { 0 },
1852                };
1853                new_metas.push(meta);
1854                new_rid_metas.push(rid_meta);
1855                rid_off += rows;
1856            }
1857
1858            // Free all old data/rid/perms in the merged run.
1859            for k in i..j {
1860                frees.push(metas[k].chunk_pk);
1861                if metas[k].value_order_perm_pk != 0 {
1862                    frees.push(metas[k].value_order_perm_pk);
1863                }
1864                frees.push(metas_rid[k].chunk_pk);
1865                if metas_rid[k].value_order_perm_pk != 0 {
1866                    frees.push(metas_rid[k].value_order_perm_pk);
1867                }
1868            }
1869
1870            i = j;
1871        }
1872
1873        // If everything was deleted, drop the field entirely.
1874        if new_metas.is_empty() {
1875            // Drop descriptors and catalog mapping.
1876            self.write_descriptor_chain(desc_pk, &mut desc, &[], &mut puts, &mut frees)?;
1877            self.write_descriptor_chain(desc_pk_rid, &mut desc_rid, &[], &mut puts, &mut frees)?;
1878            catalog.map.remove(&field_id);
1879            catalog.map.remove(&rid_fid);
1880            puts.push(BatchPut::Raw {
1881                key: CATALOG_ROOT_PKEY,
1882                bytes: catalog.to_bytes(),
1883            });
1884            if !puts.is_empty() {
1885                self.pager.batch_put(&puts)?;
1886            }
1887            if !frees.is_empty() {
1888                self.pager.free_many(&frees)?;
1889            }
1890            return Ok(());
1891        }
1892
1893        // Rewrite descriptor chains to match the new meta lists.
1894        self.write_descriptor_chain(desc_pk, &mut desc, &new_metas, &mut puts, &mut frees)?;
1895        self.write_descriptor_chain(
1896            desc_pk_rid,
1897            &mut desc_rid,
1898            &new_rid_metas,
1899            &mut puts,
1900            &mut frees,
1901        )?;
1902        // Persist new/updated blobs and free the old ones.
1903        if !puts.is_empty() {
1904            self.pager.batch_put(&puts)?;
1905        }
1906        if !frees.is_empty() {
1907            self.pager.free_many(&frees)?;
1908        }
1909
1910        Ok(())
1911    }
1912
1913    /// (Internal) Helper for batch appends. Appends metadata to the current
1914    /// in-memory tail page, creating a new one if necessary.
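    ///
    /// For example (counts illustrative): a tail page holding 3 of 4 possible
    /// entries absorbs the new entry in place; a full tail page is flushed to
    /// `puts` and a fresh tail page is started with the new entry.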
1915    fn append_meta_in_loop(
1916        &self,
1917        descriptor: &mut ColumnDescriptor,
1918        tail_page_bytes: &mut Vec<u8>,
1919        meta: ChunkMetadata,
1920        puts: &mut Vec<BatchPut>,
1921    ) -> Result<()> {
1922        let mut header = DescriptorPageHeader::from_le_bytes(
1923            &tail_page_bytes[..DescriptorPageHeader::DISK_SIZE],
1924        );
1925        if tail_page_bytes.len() + ChunkMetadata::DISK_SIZE <= TARGET_DESCRIPTOR_PAGE_BYTES
1926            && (header.entry_count as usize) < DESCRIPTOR_ENTRIES_PER_PAGE
1927        {
1928            // Case 1: There's room in the current tail page.
1929            tail_page_bytes.extend_from_slice(&meta.to_le_bytes());
1930            header.entry_count += 1;
1931            tail_page_bytes[..DescriptorPageHeader::DISK_SIZE]
1932                .copy_from_slice(&header.to_le_bytes());
1933        } else {
1934            // Case 2: The tail page is full. Write it out and start a new one.
1935            let new_tail_pk = self.pager.alloc_many(1)?[0];
1936            header.next_page_pk = new_tail_pk;
1937            tail_page_bytes[..DescriptorPageHeader::DISK_SIZE]
1938                .copy_from_slice(&header.to_le_bytes());
1939            // Move the full page's bytes into the put instead of cloning them.
1941            let full_page_to_write = std::mem::take(tail_page_bytes);
1942            puts.push(BatchPut::Raw {
1943                key: descriptor.tail_page_pk,
1944                bytes: full_page_to_write,
1945            });
1946            // Create the new tail page.
1947            let new_header = DescriptorPageHeader {
1948                next_page_pk: 0,
1949                entry_count: 1,
1950                _padding: [0; 4],
1951            };
1952            let mut new_page_bytes = new_header.to_le_bytes().to_vec();
1953            new_page_bytes.extend_from_slice(&meta.to_le_bytes());
1954            // Update our live state to point to the new tail.
1955            descriptor.tail_page_pk = new_tail_pk;
1956            *tail_page_bytes = new_page_bytes;
1957        }
1958
1959        descriptor.total_row_count += meta.row_count;
1960        descriptor.total_chunk_count += 1;
1961        Ok(())
1962    }
1963
1964    /// Verifies the integrity of the column store's metadata.
1965    ///
1966    /// This check is useful for tests and debugging. It verifies:
1967    /// 1. The catalog can be read.
1968    /// 2. All descriptor chains are walkable.
1969    /// 3. The row and chunk counts in the descriptors match the sum of the
1970    ///    chunk metadata.
1971    ///
1972    /// Returns `Ok(())` if consistent, otherwise returns an `Error`.
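    ///
    /// # Example
    ///
    /// A minimal sketch (not compiled as a doctest), assuming `store` is an
    /// open [`ColumnStore`]:
    ///
    /// ```ignore
    /// store.verify_integrity()?;
    /// ```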
1973    pub fn verify_integrity(&self) -> Result<()> {
1974        let catalog = self.catalog.read().unwrap();
1975        for (&field_id, &descriptor_pk) in &catalog.map {
1976            let desc_blob = self
1977                .pager
1978                .batch_get(&[BatchGet::Raw { key: descriptor_pk }])?
1979                .pop()
1980                .and_then(|r| match r {
1981                    GetResult::Raw { bytes, .. } => Some(bytes),
1982                    _ => None,
1983                })
1984                .ok_or_else(|| {
1985                    Error::Internal(format!(
1986                        "Catalog points to missing descriptor pk={}",
1987                        descriptor_pk
1988                    ))
1989                })?;
1990            let descriptor = ColumnDescriptor::from_le_bytes(desc_blob.as_ref());
1991            if descriptor.field_id != field_id {
1992                return Err(Error::Internal(format!(
1993                    "Descriptor at pk={} has wrong field_id: expected {:?}, \
1994                     got {:?}",
1995                    descriptor_pk, field_id, descriptor.field_id
1996                )));
1997            }
1998
1999            let mut actual_rows = 0;
2000            let mut actual_chunks = 0;
2001            let mut current_page_pk = descriptor.head_page_pk;
2002            while current_page_pk != 0 {
2003                let page_blob = self
2004                    .pager
2005                    .batch_get(&[BatchGet::Raw {
2006                        key: current_page_pk,
2007                    }])?
2008                    .pop()
2009                    .and_then(|r| match r {
2010                        GetResult::Raw { bytes, .. } => Some(bytes),
2011                        _ => None,
2012                    })
2013                    .ok_or_else(|| {
2014                        Error::Internal(format!(
2015                            "Descriptor page chain broken at pk={}",
2016                            current_page_pk
2017                        ))
2018                    })?;
2019                let header = DescriptorPageHeader::from_le_bytes(
2020                    &page_blob.as_ref()[..DescriptorPageHeader::DISK_SIZE],
2021                );
2022                for i in 0..(header.entry_count as usize) {
2023                    let off = DescriptorPageHeader::DISK_SIZE + i * ChunkMetadata::DISK_SIZE;
2024                    let end = off + ChunkMetadata::DISK_SIZE;
2025                    let meta = ChunkMetadata::from_le_bytes(&page_blob.as_ref()[off..end]);
2026                    actual_rows += meta.row_count;
2027                    actual_chunks += 1;
2028                }
2029                current_page_pk = header.next_page_pk;
2030            }
2031
2032            if descriptor.total_row_count != actual_rows {
2033                return Err(Error::Internal(format!(
2034                    "Row count mismatch for field {:?}: descriptor says {}, \
2035                     actual is {}",
2036                    field_id, descriptor.total_row_count, actual_rows
2037                )));
2038            }
2039            if descriptor.total_chunk_count != actual_chunks {
2040                return Err(Error::Internal(format!(
2041                    "Chunk count mismatch for field {:?}: descriptor says {}, \
2042                     actual is {}",
2043                    field_id, descriptor.total_chunk_count, actual_chunks
2044                )));
2045            }
2046        }
2047        Ok(())
2048    }
2049
2050    /// Gathers detailed statistics about the storage layout.
2051    ///
2052    /// This method is designed for low-level analysis and debugging, allowing
2053    /// you to check for under- or over-utilization of descriptor pages.
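    ///
    /// # Example
    ///
    /// A minimal sketch (not compiled as a doctest), assuming `store` is an
    /// open [`ColumnStore`]:
    ///
    /// ```ignore
    /// for col in store.get_layout_stats()? {
    ///     println!(
    ///         "field {:?}: {} rows in {} chunks across {} descriptor pages",
    ///         col.field_id, col.total_rows, col.total_chunks, col.pages.len()
    ///     );
    /// }
    /// ```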
2054    pub fn get_layout_stats(&self) -> Result<Vec<ColumnLayoutStats>> {
2055        let catalog = self.catalog.read().unwrap();
2056        let mut all_stats = Vec::new();
2057
2058        for (&field_id, &descriptor_pk) in &catalog.map {
2059            let desc_blob = self
2060                .pager
2061                .batch_get(&[BatchGet::Raw { key: descriptor_pk }])?
2062                .pop()
2063                .and_then(|r| match r {
2064                    GetResult::Raw { bytes, .. } => Some(bytes),
2065                    _ => None,
2066                })
2067                .ok_or(Error::NotFound)?;
2068            let descriptor = ColumnDescriptor::from_le_bytes(desc_blob.as_ref());
2069
2070            let mut page_stats = Vec::new();
2071            let mut current_page_pk = descriptor.head_page_pk;
2072            while current_page_pk != 0 {
2073                let page_blob = self
2074                    .pager
2075                    .batch_get(&[BatchGet::Raw {
2076                        key: current_page_pk,
2077                    }])?
2078                    .pop()
2079                    .and_then(|r| match r {
2080                        GetResult::Raw { bytes, .. } => Some(bytes),
2081                        _ => None,
2082                    })
2083                    .ok_or(Error::NotFound)?;
2084                let header = DescriptorPageHeader::from_le_bytes(
2085                    &page_blob.as_ref()[..DescriptorPageHeader::DISK_SIZE],
2086                );
2087                page_stats.push(DescriptorPageStats {
2088                    page_pk: current_page_pk,
2089                    entry_count: header.entry_count,
2090                    page_size_bytes: page_blob.as_ref().len(),
2091                });
2092                current_page_pk = header.next_page_pk;
2093            }
2094
2095            all_stats.push(ColumnLayoutStats {
2096                field_id,
2097                total_rows: descriptor.total_row_count,
2098                total_chunks: descriptor.total_chunk_count,
2099                pages: page_stats,
2100            });
2101        }
2102        Ok(all_stats)
2103    }
2104}