llkv_column_map/store/core.rs

use super::*;
use crate::store::catalog::ColumnCatalog;
use crate::store::descriptor::{
    ChunkMetadata, ColumnDescriptor, DescriptorIterator, DescriptorPageHeader,
};
use crate::store::scan::filter::FilterDispatch;
use crate::store::scan::{FilterPrimitive, FilterResult};
use crate::types::{LogicalFieldId, RowId, TableId};
use arrow::array::{Array, ArrayRef, BooleanArray, UInt32Array, UInt64Array};
use arrow::compute::{self, SortColumn, lexsort_to_indices};
use arrow::datatypes::DataType;
use arrow::record_batch::RecordBatch;
use llkv_expr::typed_predicate::Predicate;
use llkv_result::{Error, Result};
use llkv_storage::{
    constants::CATALOG_ROOT_PKEY,
    pager::{BatchGet, BatchPut, GetResult, Pager},
    serialization::{deserialize_array, serialize_array},
    types::PhysicalKey,
};

use rustc_hash::{FxHashMap, FxHashSet};
use simd_r_drive_entry_handle::EntryHandle;
use std::sync::{Arc, RwLock};
25
26/// Columnar storage engine for managing Arrow-based data.
27///
28/// `ColumnStore` provides the primary interface for persisting and retrieving columnar
29/// data using Apache Arrow [`RecordBatch`]es. It manages:
30///
31/// - Column descriptors and metadata (chunk locations, row counts, min/max values)
32/// - Data type caching for efficient schema queries
33/// - Index management (presence indexes, value indexes)
34/// - Integration with the [`Pager`] for persistent storage
35///
36/// # Namespaces
37///
38/// Columns are identified by [`LogicalFieldId`], which combines a namespace, table ID,
39/// and field ID. This prevents collisions between user data, row IDs, and MVCC metadata:
40///
41/// - `UserData`: Regular table columns
42/// - `RowIdShadow`: Internal row ID tracking
43/// - `TxnCreatedBy`: MVCC transaction creation timestamps
44/// - `TxnDeletedBy`: MVCC transaction deletion timestamps
45///
46/// # Thread Safety
47///
48/// `ColumnStore` is `Send + Sync` and can be safely shared across threads via `Arc`.
49/// Internal state (catalog, caches) uses `RwLock` for concurrent access.
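///
/// # Example
///
/// A minimal sketch of sharing one store across threads. `MemPager` is a
/// placeholder for any pager implementing `Pager<Blob = EntryHandle>`:
///
/// ```ignore
/// let store = Arc::new(ColumnStore::open(Arc::new(MemPager::default()))?);
/// let worker = {
///     let store = Arc::clone(&store);
///     std::thread::spawn(move || store.has_field(LogicalFieldId::from(42u64)))
/// };
/// assert!(!worker.join().unwrap());
/// ```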
50pub struct ColumnStore<P: Pager> {
51    pub(crate) pager: Arc<P>,
52    pub(crate) catalog: Arc<RwLock<ColumnCatalog>>,
53    cfg: ColumnStoreConfig,
54    dtype_cache: DTypeCache<P>,
55    index_manager: IndexManager<P>,
56}
57
58impl<P> Clone for ColumnStore<P>
59where
60    P: Pager<Blob = EntryHandle> + Send + Sync,
61{
62    fn clone(&self) -> Self {
63        Self {
64            pager: Arc::clone(&self.pager),
65            catalog: Arc::clone(&self.catalog),
66            cfg: self.cfg.clone(),
67            dtype_cache: self.dtype_cache.clone(),
68            index_manager: self.index_manager.clone(),
69        }
70    }
71}
72
73impl<P> ColumnStore<P>
74where
75    P: Pager<Blob = EntryHandle> + Send + Sync,
76{
77    /// Open or create a `ColumnStore` using the provided pager.
78    ///
79    /// This loads the column catalog from the pager's root catalog key, or initializes
80    /// an empty catalog if none exists. The catalog maps [`LogicalFieldId`] to the
81    /// physical keys of column descriptors.
82    ///
83    /// # Errors
84    ///
85    /// Returns an error if the pager fails to load the catalog or if deserialization fails.
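    ///
    /// # Example
    ///
    /// A minimal sketch; `MemPager` stands in for any pager implementation:
    ///
    /// ```ignore
    /// let pager = Arc::new(MemPager::default());
    /// // The first open initializes an empty catalog; reopening the same pager
    /// // later reloads the catalog persisted under `CATALOG_ROOT_PKEY`.
    /// let store = ColumnStore::open(Arc::clone(&pager))?;
    /// ```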
86    pub fn open(pager: Arc<P>) -> Result<Self> {
87        let cfg = ColumnStoreConfig::default();
88        let catalog = match pager
89            .batch_get(&[BatchGet::Raw {
90                key: CATALOG_ROOT_PKEY,
91            }])?
92            .pop()
93        {
94            Some(GetResult::Raw { bytes, .. }) => ColumnCatalog::from_bytes(bytes.as_ref())?,
95            _ => ColumnCatalog::default(),
96        };
97        let arc_catalog = Arc::new(RwLock::new(catalog));
98
99        let index_manager = IndexManager::new(Arc::clone(&pager));
100
101        Ok(Self {
102            pager: Arc::clone(&pager),
103            catalog: Arc::clone(&arc_catalog),
104            cfg,
105            dtype_cache: DTypeCache::new(Arc::clone(&pager), Arc::clone(&arc_catalog)),
106            index_manager,
107        })
108    }
109
110    /// Create and persist an index for a column.
111    ///
112    /// This builds the specified index type for all existing data in the column and
113    /// persists it atomically. The index will be maintained automatically on subsequent
114    /// appends and updates.
115    ///
116    /// # Errors
117    ///
118    /// Returns an error if the column doesn't exist or if index creation fails.
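    ///
    /// # Example
    ///
    /// A sketch of the index lifecycle for one column (the field id is a
    /// placeholder and its column is assumed to be registered already):
    ///
    /// ```ignore
    /// let field_id = LogicalFieldId::from(42u64);
    /// store.register_index(field_id, IndexKind::Presence)?;
    /// let kinds = store.list_persisted_indexes(field_id)?;
    /// // ... later, drop the index again without touching the column data:
    /// store.unregister_index(field_id, IndexKind::Presence)?;
    /// ```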
119    pub fn register_index(&self, field_id: LogicalFieldId, kind: IndexKind) -> Result<()> {
120        self.index_manager.register_index(self, field_id, kind)
121    }
122
123    /// Check if a logical field is registered in the catalog.
124    pub fn has_field(&self, field_id: LogicalFieldId) -> bool {
125        let catalog = self.catalog.read().unwrap();
126        catalog.map.contains_key(&field_id)
127    }
128
129    /// Remove a persisted index from a column.
130    ///
131    /// This atomically removes the index and frees associated storage. The column data
132    /// itself is not affected.
133    ///
134    /// # Errors
135    ///
136    /// Returns an error if the column or index doesn't exist.
137    pub fn unregister_index(&self, field_id: LogicalFieldId, kind: IndexKind) -> Result<()> {
138        self.index_manager.unregister_index(self, field_id, kind)
139    }
140
141    /// Get the Arrow data type of a column.
142    ///
143    /// This returns the data type from cache if available, otherwise loads it from
144    /// the column descriptor and caches it for future queries.
145    ///
146    /// # Errors
147    ///
148    /// Returns an error if the column doesn't exist or if the descriptor is corrupted.
149    pub fn data_type(&self, field_id: LogicalFieldId) -> Result<DataType> {
150        if let Some(dt) = self.dtype_cache.cached_data_type(field_id) {
151            return Ok(dt);
152        }
153        self.dtype_cache.dtype_for_field(field_id)
154    }
155
156    /// Ensure that catalog entries and descriptors exist for a logical column.
157    ///
158    /// This is primarily used when creating empty tables so that subsequent
159    /// operations (like `CREATE INDEX`) can resolve column metadata before any
160    /// data has been appended.
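    ///
    /// # Example
    ///
    /// A sketch of preparing an empty column so that `CREATE INDEX` can run
    /// before any rows exist (the field id is a placeholder):
    ///
    /// ```ignore
    /// let field_id = LogicalFieldId::from(42u64);
    /// store.ensure_column_registered(field_id, &DataType::UInt64)?;
    /// store.register_index(field_id, IndexKind::Presence)?;
    /// ```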
161    pub fn ensure_column_registered(
162        &self,
163        field_id: LogicalFieldId,
164        data_type: &DataType,
165    ) -> Result<()> {
166        let rid_field_id = rowid_fid(field_id);
167
168        let mut catalog_dirty = false;
169        let descriptor_pk;
170        let rid_descriptor_pk;
171
172        {
173            let mut catalog = self.catalog.write().unwrap();
174            descriptor_pk = if let Some(&pk) = catalog.map.get(&field_id) {
175                pk
176            } else {
177                let pk = self.pager.alloc_many(1)?[0];
178                catalog.map.insert(field_id, pk);
179                catalog_dirty = true;
180                pk
181            };
182
183            rid_descriptor_pk = if let Some(&pk) = catalog.map.get(&rid_field_id) {
184                pk
185            } else {
186                let pk = self.pager.alloc_many(1)?[0];
187                catalog.map.insert(rid_field_id, pk);
188                catalog_dirty = true;
189                pk
190            };
191        }
192
193        let mut puts: Vec<BatchPut> = Vec::new();
194
195        let data_descriptor_missing = self
196            .pager
197            .batch_get(&[BatchGet::Raw { key: descriptor_pk }])?
198            .pop()
199            .and_then(|r| match r {
200                GetResult::Raw { bytes, .. } => Some(bytes),
201                _ => None,
202            })
203            .is_none();
204
205        if data_descriptor_missing {
206            let (mut descriptor, tail_page) =
207                ColumnDescriptor::load_or_create(Arc::clone(&self.pager), descriptor_pk, field_id)?;
208            let fingerprint = DTypeCache::<P>::dtype_fingerprint(data_type);
209            if fingerprint != 0 {
210                DTypeCache::<P>::set_desc_dtype_fingerprint(&mut descriptor, fingerprint);
211            }
212            puts.push(BatchPut::Raw {
213                key: descriptor.tail_page_pk,
214                bytes: tail_page,
215            });
216            puts.push(BatchPut::Raw {
217                key: descriptor_pk,
218                bytes: descriptor.to_le_bytes(),
219            });
220        }
221
222        let rid_descriptor_missing = self
223            .pager
224            .batch_get(&[BatchGet::Raw {
225                key: rid_descriptor_pk,
226            }])?
227            .pop()
228            .and_then(|r| match r {
229                GetResult::Raw { bytes, .. } => Some(bytes),
230                _ => None,
231            })
232            .is_none();
233
234        if rid_descriptor_missing {
235            let (mut rid_descriptor, tail_page) = ColumnDescriptor::load_or_create(
236                Arc::clone(&self.pager),
237                rid_descriptor_pk,
238                rid_field_id,
239            )?;
240            let fingerprint = DTypeCache::<P>::dtype_fingerprint(&DataType::UInt64);
241            if fingerprint != 0 {
242                DTypeCache::<P>::set_desc_dtype_fingerprint(&mut rid_descriptor, fingerprint);
243            }
244            puts.push(BatchPut::Raw {
245                key: rid_descriptor.tail_page_pk,
246                bytes: tail_page,
247            });
248            puts.push(BatchPut::Raw {
249                key: rid_descriptor_pk,
250                bytes: rid_descriptor.to_le_bytes(),
251            });
252        }
253
254        self.dtype_cache.insert(field_id, data_type.clone());
255
256        if catalog_dirty {
257            let catalog_bytes = {
258                let catalog = self.catalog.read().unwrap();
259                catalog.to_bytes()
260            };
261            puts.push(BatchPut::Raw {
262                key: CATALOG_ROOT_PKEY,
263                bytes: catalog_bytes,
264            });
265        }
266
267        if !puts.is_empty() {
268            self.pager.batch_put(&puts)?;
269        }
270
271        Ok(())
272    }
273
274    /// Find all row IDs where a column satisfies a predicate.
275    ///
276    /// This evaluates the predicate against the column's data and returns a vector
277    /// of matching row IDs. Uses indexes and chunk metadata (min/max values) to
278    /// skip irrelevant data when possible.
279    ///
280    /// # Errors
281    ///
282    /// Returns an error if the column doesn't exist or if chunk data is corrupted.
283    pub fn filter_row_ids<T>(
284        &self,
285        field_id: LogicalFieldId,
286        predicate: &Predicate<T::Value>,
287    ) -> Result<Vec<u64>>
288    where
289        T: FilterDispatch,
290    {
291        tracing::trace!(field=?field_id, "filter_row_ids start");
292        let res = T::run_filter(self, field_id, predicate);
293        if let Err(ref err) = res {
294            tracing::trace!(field=?field_id, error=?err, "filter_row_ids error");
295        } else {
296            tracing::trace!(field=?field_id, "filter_row_ids ok");
297        }
298        res
299    }
300
    /// Evaluate a caller-supplied predicate closure against a primitive column.
    ///
    /// Unlike [`Self::filter_row_ids`], which takes a typed [`Predicate`], this
    /// accepts any `FnMut(T::Native) -> bool` closure and returns a
    /// [`FilterResult`] describing the matching rows.
302    pub fn filter_matches<T, F>(
303        &self,
304        field_id: LogicalFieldId,
305        predicate: F,
306    ) -> Result<FilterResult>
307    where
308        T: FilterPrimitive,
309        F: FnMut(T::Native) -> bool,
310    {
311        T::run_filter_with_result(self, field_id, predicate)
312    }
313
314    /// List all indexes registered for a column.
315    ///
316    /// Returns the types of indexes (e.g., presence, value) that are currently
317    /// persisted for the specified column.
318    ///
319    /// # Errors
320    ///
321    /// Returns an error if the column doesn't exist or if the descriptor is corrupted.
322    pub fn list_persisted_indexes(&self, field_id: LogicalFieldId) -> Result<Vec<IndexKind>> {
323        let catalog = self.catalog.read().unwrap();
324        let descriptor_pk = *catalog.map.get(&field_id).ok_or(Error::NotFound)?;
325
326        let desc_blob = self
327            .pager
328            .batch_get(&[BatchGet::Raw { key: descriptor_pk }])?
329            .pop()
330            .and_then(|r| match r {
331                GetResult::Raw { bytes, .. } => Some(bytes),
332                _ => None,
333            })
334            .ok_or(Error::NotFound)?;
335        let descriptor = ColumnDescriptor::from_le_bytes(desc_blob.as_ref());
336
337        let kinds = descriptor.get_indexes()?;
338        Ok(kinds)
339    }
340
341    /// Get the total number of rows in a column.
342    ///
343    /// Returns the persisted row count from the column's descriptor. This value is
344    /// updated by append and delete operations.
345    ///
346    /// # Errors
347    ///
348    /// Returns an error if the column doesn't exist or if the descriptor is corrupted.
349    pub fn total_rows_for_field(&self, field_id: LogicalFieldId) -> Result<u64> {
350        let catalog = self.catalog.read().unwrap();
351        let desc_pk = *catalog.map.get(&field_id).ok_or(Error::NotFound)?;
352        drop(catalog);
353
354        let desc_blob = self
355            .pager
356            .batch_get(&[BatchGet::Raw { key: desc_pk }])?
357            .pop()
358            .and_then(|r| match r {
359                GetResult::Raw { bytes, .. } => Some(bytes),
360                _ => None,
361            })
362            .ok_or(Error::NotFound)?;
363
364        let desc = ColumnDescriptor::from_le_bytes(desc_blob.as_ref());
365        Ok(desc.total_row_count)
366    }
367
368    /// Get the total number of rows in a table.
369    ///
370    /// This returns the maximum row count across all user-data columns in the table.
371    /// If the table has no persisted columns, returns `0`.
372    ///
373    /// # Errors
374    ///
375    /// Returns an error if column descriptors cannot be loaded.
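    ///
    /// # Example
    ///
    /// A sketch relating the per-table count to the per-field counts (assumes a
    /// `table_id` is already in scope):
    ///
    /// ```ignore
    /// let table_total = store.total_rows_for_table(table_id)?;
    /// for field_id in store.user_field_ids_for_table(table_id) {
    ///     assert!(store.total_rows_for_field(field_id)? <= table_total);
    /// }
    /// ```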
376    pub fn total_rows_for_table(&self, table_id: crate::types::TableId) -> Result<u64> {
377        use crate::types::Namespace;
        // Acquire a read lock on the catalog and collect all user-data logical
        // field ids for this table.
        let catalog = self.catalog.read().unwrap();
381        let candidates: Vec<LogicalFieldId> = catalog
382            .map
383            .keys()
384            .filter(|fid| fid.namespace() == Namespace::UserData && fid.table_id() == table_id)
385            .copied()
386            .collect();
387        drop(catalog);
388
389        if candidates.is_empty() {
390            return Ok(0);
391        }
392
393        // Return the maximum total_row_count across all user columns for the table.
394        let mut max_rows: u64 = 0;
395        for field in candidates {
396            let rows = self.total_rows_for_field(field)?;
397            if rows > max_rows {
398                max_rows = rows;
399            }
400        }
401        Ok(max_rows)
402    }
403
404    /// Get all user-data column IDs for a table.
405    ///
406    /// This returns the [`LogicalFieldId`]s of all persisted user columns (namespace
407    /// `UserData`) belonging to the specified table. MVCC and row ID columns are not
408    /// included.
409    pub fn user_field_ids_for_table(&self, table_id: crate::types::TableId) -> Vec<LogicalFieldId> {
410        use crate::types::Namespace;
411
412        let catalog = self.catalog.read().unwrap();
413        catalog
414            .map
415            .keys()
416            .filter(|fid| fid.namespace() == Namespace::UserData && fid.table_id() == table_id)
417            .copied()
418            .collect()
419    }
420
421    /// Check whether a specific row ID exists in a column.
422    ///
423    /// This uses presence indexes and binary search when available for fast lookups.
424    /// If no presence index exists, it scans chunks and uses min/max metadata to
425    /// prune irrelevant data.
426    ///
427    /// # Errors
428    ///
429    /// Returns an error if the column doesn't exist or if chunk data is corrupted.
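    ///
    /// # Example
    ///
    /// A sketch (the field id and row id are placeholders; row ids are the plain
    /// `u64` identifiers used throughout this module):
    ///
    /// ```ignore
    /// let field_id = LogicalFieldId::from(42u64);
    /// if store.has_row_id(field_id, 7)? {
    ///     // Row id 7 has a value persisted in this column.
    /// }
    /// ```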
430    pub fn has_row_id(&self, field_id: LogicalFieldId, row_id: RowId) -> Result<bool> {
431        let rid_fid = rowid_fid(field_id);
432        let catalog = self.catalog.read().unwrap();
433        let rid_desc_pk = *catalog.map.get(&rid_fid).ok_or(Error::NotFound)?;
434        let rid_desc_blob = self
435            .pager
436            .batch_get(&[BatchGet::Raw { key: rid_desc_pk }])?
437            .pop()
438            .and_then(|r| match r {
439                GetResult::Raw { bytes, .. } => Some(bytes),
440                _ => None,
441            })
442            .ok_or(Error::NotFound)?;
443        let rid_desc = ColumnDescriptor::from_le_bytes(rid_desc_blob.as_ref());
444        drop(catalog);
445
446        // Walk metas; prune by min/max when available.
447        for m in DescriptorIterator::new(self.pager.as_ref(), rid_desc.head_page_pk) {
448            let meta = m?;
449            if meta.row_count == 0 {
450                continue;
451            }
            if (meta.min_val_u64 != 0 || meta.max_val_u64 != 0)
                && (row_id < meta.min_val_u64 || row_id > meta.max_val_u64)
            {
                continue;
            }
457            // Fetch rid chunk and, if present, the presence perm
458            let mut gets = vec![BatchGet::Raw { key: meta.chunk_pk }];
459            if meta.value_order_perm_pk != 0 {
460                gets.push(BatchGet::Raw {
461                    key: meta.value_order_perm_pk,
462                });
463            }
464            let results = self.pager.batch_get(&gets)?;
465            let mut rid_blob: Option<EntryHandle> = None;
466            let mut perm_blob: Option<EntryHandle> = None;
467            for r in results {
468                if let GetResult::Raw { key, bytes } = r {
469                    if key == meta.chunk_pk {
470                        rid_blob = Some(bytes);
471                    } else if key == meta.value_order_perm_pk {
472                        perm_blob = Some(bytes);
473                    }
474                }
475            }
476            // If the rid blob for this chunk is missing, treat as absent and continue
477            let Some(rid_blob) = rid_blob else { continue };
478            let rid_any = deserialize_array(rid_blob)?;
479            let rids = rid_any
480                .as_any()
481                .downcast_ref::<UInt64Array>()
482                .ok_or_else(|| Error::Internal("rid downcast".into()))?;
483            if let Some(pblob) = perm_blob {
484                let perm_any = deserialize_array(pblob)?;
485                let perm = perm_any
486                    .as_any()
487                    .downcast_ref::<UInt32Array>()
488                    .ok_or_else(|| Error::Internal("perm not u32".into()))?;
489                // Binary search over sorted-by-perm view
490                let mut lo: isize = 0;
491                let mut hi: isize = (perm.len() as isize) - 1;
492                while lo <= hi {
493                    let mid = ((lo + hi) >> 1) as usize;
494                    let rid = rids.value(perm.value(mid) as usize);
495                    if rid == row_id {
496                        return Ok(true);
497                    } else if rid < row_id {
498                        lo = mid as isize + 1;
499                    } else {
500                        hi = mid as isize - 1;
501                    }
502                }
503            } else {
                // Assume the rid chunk is sorted ascending (the common case for
                // appends/compaction) and binary-search it directly.
505                let mut lo: isize = 0;
506                let mut hi: isize = (rids.len() as isize) - 1;
507                while lo <= hi {
508                    let mid = ((lo + hi) >> 1) as usize;
509                    let rid = rids.value(mid);
510                    if rid == row_id {
511                        return Ok(true);
512                    } else if rid < row_id {
513                        lo = mid as isize + 1;
514                    } else {
515                        hi = mid as isize - 1;
516                    }
517                }
518            }
519        }
520        Ok(false)
521    }
522
523    // TODO: Set up a way to optionally auto-increment a row ID column if not provided.
524    /// Append a [`RecordBatch`] to the store.
525    ///
526    /// The batch must include a `rowid` column (type `UInt64`) that uniquely identifies
527    /// each row. Each other column must have `field_id` metadata mapping it to a
528    /// [`LogicalFieldId`].
529    ///
530    /// ## Last-Write-Wins Updates
531    ///
532    /// If any row IDs in the batch already exist, they are updated in-place (overwritten)
533    /// rather than creating duplicates. This happens in a separate transaction before
534    /// appending new rows.
535    ///
536    /// ## Row ID Ordering
537    ///
538    /// The batch is automatically sorted by `rowid` if not already sorted. This ensures
539    /// efficient metadata updates and naturally sorted shadow columns.
540    ///
541    /// ## Table Separation
542    ///
    /// Each batch should contain columns from only one table. To append to multiple
    /// tables, call `append` separately for each table's batch; those calls may run
    /// concurrently.
545    ///
546    /// # Errors
547    ///
548    /// Returns an error if:
549    /// - The batch is missing the `rowid` column
550    /// - Column metadata is missing or invalid
551    /// - Storage operations fail
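    ///
    /// # Example
    ///
    /// A minimal sketch of building a conforming batch. The data column name and
    /// numeric field id are placeholders; only the [`ROW_ID_COLUMN_NAME`] column
    /// and the `field_id` metadata key are required by this method:
    ///
    /// ```ignore
    /// use std::collections::HashMap;
    /// use std::sync::Arc;
    /// use arrow::array::{ArrayRef, UInt64Array};
    /// use arrow::datatypes::{DataType, Field, Schema};
    /// use arrow::record_batch::RecordBatch;
    ///
    /// let schema = Arc::new(Schema::new(vec![
    ///     Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
    ///     Field::new("score", DataType::UInt64, true).with_metadata(HashMap::from([(
    ///         crate::store::FIELD_ID_META_KEY.to_string(),
    ///         42u64.to_string(), // placeholder field id
    ///     )])),
    /// ]));
    /// let batch = RecordBatch::try_new(
    ///     schema,
    ///     vec![
    ///         Arc::new(UInt64Array::from(vec![1_u64, 2, 3])) as ArrayRef,
    ///         Arc::new(UInt64Array::from(vec![10_u64, 20, 30])) as ArrayRef,
    ///     ],
    /// )?;
    /// store.append(&batch)?;
    /// ```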
552    #[allow(unused_variables, unused_assignments)] // TODO: Keep `presence_index_created`?
553    pub fn append(&self, batch: &RecordBatch) -> Result<()> {
554        tracing::trace!(
555            num_columns = batch.num_columns(),
556            num_rows = batch.num_rows(),
557            "ColumnStore::append BEGIN"
558        );
559        // --- PHASE 1: PRE-PROCESSING THE INCOMING BATCH ---
560        // The `append` logic relies on row IDs being processed in ascending order to handle
561        // metadata updates efficiently and to ensure the shadow row_id chunks are naturally sorted.
562        // This block checks if the incoming batch is already sorted by `row_id`. If not, it creates a
563        // new, sorted `RecordBatch` to work with for the rest of the function.
564        let working_batch: RecordBatch;
565        let batch_ref = {
566            let schema = batch.schema();
567            let row_id_idx = schema
568                .index_of(ROW_ID_COLUMN_NAME)
569                .map_err(|_| Error::Internal("row_id column required".into()))?;
570            let row_id_any = batch.column(row_id_idx).clone();
571            let row_id_arr = row_id_any
572                .as_any()
573                .downcast_ref::<UInt64Array>()
574                .ok_or_else(|| Error::Internal("row_id downcast failed".into()))?;
575
576            // Manually check if the row_id column is sorted.
577            let mut is_sorted = true;
578            if !row_id_arr.is_empty() {
579                let mut last = row_id_arr.value(0);
580                for i in 1..row_id_arr.len() {
581                    let current = row_id_arr.value(i);
582                    if current < last {
583                        is_sorted = false;
584                        break;
585                    }
586                    last = current;
587                }
588            }
589
590            // If sorted, we can use the original batch directly.
591            // Otherwise, we compute a permutation and reorder all columns.
592            if is_sorted {
593                batch
594            } else {
595                let sort_col = SortColumn {
596                    values: row_id_any,
597                    options: None,
598                };
599                let idx = lexsort_to_indices(&[sort_col], None)?;
600                let perm = idx
601                    .as_any()
602                    .downcast_ref::<UInt32Array>()
603                    .ok_or_else(|| Error::Internal("perm not u32".into()))?;
604                let mut cols: Vec<ArrayRef> = Vec::with_capacity(batch.num_columns());
605                for i in 0..batch.num_columns() {
606                    cols.push(compute::take(batch.column(i), perm, None)?);
607                }
608                working_batch = RecordBatch::try_new(schema.clone(), cols)
609                    .map_err(|e| Error::Internal(format!("record batch rebuild: {e}")))?;
610                &working_batch
611            }
612        };
613
614        tracing::trace!("ColumnStore::append PHASE 1 complete - batch preprocessed");
615
616        // --- PHASE 2: LAST-WRITER-WINS (LWW) REWRITE ---
617        // This phase handles updates. It identifies any rows in the incoming batch that
618        // already exist in the store and rewrites them in-place. This is a separate
619        // transaction that happens before the main append of new rows.
620        let schema = batch_ref.schema();
621        let row_id_idx = schema
622            .index_of(ROW_ID_COLUMN_NAME)
623            .map_err(|_| Error::Internal("row_id column required".into()))?;
624
625        // Create a quick lookup map of incoming row IDs to their positions in the batch.
626        let row_id_arr = batch_ref
627            .column(row_id_idx)
628            .as_any()
629            .downcast_ref::<UInt64Array>()
630            .ok_or_else(|| Error::Internal("row_id downcast failed".into()))?;
631        let mut incoming_ids_map = FxHashMap::default();
632        incoming_ids_map.reserve(row_id_arr.len());
633        for i in 0..row_id_arr.len() {
634            incoming_ids_map.insert(row_id_arr.value(i), i);
635        }
636
637        // These variables will track the state of the LWW transaction.
638        let mut catalog_dirty = false;
639        let mut puts_rewrites: Vec<BatchPut> = Vec::new();
640        let mut all_rewritten_ids = FxHashSet::default();
641
642        // Iterate through each column in the batch (except row_id) to perform rewrites.
643        let mut catalog_lock = self.catalog.write().unwrap();
644        for i in 0..batch_ref.num_columns() {
645            if i == row_id_idx {
646                continue;
647            }
648            let field = schema.field(i);
649            if let Some(field_id_str) = field.metadata().get(crate::store::FIELD_ID_META_KEY) {
650                let field_id = field_id_str
651                    .parse::<u64>()
652                    .map(LogicalFieldId::from)
653                    .map_err(|e| Error::Internal(format!("Invalid field_id: {}", e)))?;
654
655                // `lww_rewrite_for_field` finds overlapping row IDs and rewrites the data chunks.
656                // It returns the set of row IDs that were updated.
657                let rewritten = self.lww_rewrite_for_field(
658                    &mut catalog_lock,
659                    field_id,
660                    &incoming_ids_map,
661                    batch_ref.column(i),
662                    batch_ref.column(row_id_idx),
663                    &mut puts_rewrites,
664                )?;
665                all_rewritten_ids.extend(rewritten);
666            }
667        }
668        drop(catalog_lock);
669
670        // Commit the LWW changes to the pager immediately.
671        if !puts_rewrites.is_empty() {
672            self.pager.batch_put(&puts_rewrites)?;
673        }
674
675        tracing::trace!("ColumnStore::append PHASE 2 complete - LWW rewrites done");
676
677        // --- PHASE 3: FILTERING FOR NEW ROWS ---
678        // After handling updates, we filter the incoming batch to remove the rows that were
679        // just rewritten. The remaining rows are guaranteed to be new additions to the store.
680        let batch_to_append = if !all_rewritten_ids.is_empty() {
681            let keep_mask: Vec<bool> = (0..row_id_arr.len())
682                .map(|i| !all_rewritten_ids.contains(&row_id_arr.value(i)))
683                .collect();
684            let keep_array = BooleanArray::from(keep_mask);
685            compute::filter_record_batch(batch_ref, &keep_array)?
686        } else {
687            batch_ref.clone()
688        };
689
690        // If no new rows are left, we are done.
691        if batch_to_append.num_rows() == 0 {
692            tracing::trace!("ColumnStore::append early exit - no new rows to append");
693            return Ok(());
694        }
695
696        tracing::trace!("ColumnStore::append PHASE 3 complete - filtered for new rows");
697
698        // --- PHASE 4: APPENDING NEW DATA ---
699        // This is the main append transaction. All writes generated in this phase will be
700        // collected and committed atomically at the very end.
701        let append_schema = batch_to_append.schema();
702        let append_row_id_idx = append_schema.index_of(ROW_ID_COLUMN_NAME)?;
703        let append_row_id_any: ArrayRef = Arc::clone(batch_to_append.column(append_row_id_idx));
704        let mut puts_appends: Vec<BatchPut> = Vec::new();
705
706        // Loop through each column of the filtered batch to append its data.
707        for (i, array) in batch_to_append.columns().iter().enumerate() {
708            if i == append_row_id_idx {
709                continue;
710            }
711
712            let field = append_schema.field(i);
713
714            let field_id = field
715                .metadata()
716                .get(crate::store::FIELD_ID_META_KEY)
717                .ok_or_else(|| Error::Internal("Missing field_id".into()))?
718                .parse::<u64>()
719                .map(LogicalFieldId::from)
720                .map_err(|e| Error::Internal(format!("Invalid field_id: {}", e)))?;
721
722            // Populate the data type cache for this column. This is a performance optimization
723            // to avoid reading a chunk from storage later just to determine its type.
724            self.dtype_cache.insert(field_id, field.data_type().clone());
725
726            // Null values are treated as deletions, so we filter them out. The `rids_clean`
727            // array contains the row IDs corresponding to the non-null values.
728            let (array_clean, rids_clean) = if array.null_count() == 0 {
729                (array.clone(), append_row_id_any.clone())
730            } else {
731                let keep =
732                    BooleanArray::from_iter((0..array.len()).map(|j| Some(!array.is_null(j))));
733                let a = compute::filter(array, &keep)?;
734                let r = compute::filter(&append_row_id_any, &keep)?;
735                (a, r)
736            };
737
738            if array_clean.is_empty() {
739                continue;
740            }
741
742            // Get or create the physical keys for the column's data descriptor and its
743            // shadow row_id descriptor from the catalog.
744            let (descriptor_pk, rid_descriptor_pk, rid_fid) = {
745                let mut catalog = self.catalog.write().unwrap();
746                let pk1 = *catalog.map.entry(field_id).or_insert_with(|| {
747                    catalog_dirty = true;
748                    self.pager.alloc_many(1).unwrap()[0]
749                });
750                let r_fid = rowid_fid(field_id);
751                let pk2 = *catalog.map.entry(r_fid).or_insert_with(|| {
752                    catalog_dirty = true;
753                    self.pager.alloc_many(1).unwrap()[0]
754                });
755                (pk1, pk2, r_fid)
756            };
757
758            // Load the descriptors and their tail metadata pages into memory.
759            // If they don't exist, `load_or_create` will initialize new ones.
760            let (mut data_descriptor, mut data_tail_page) =
761                ColumnDescriptor::load_or_create(Arc::clone(&self.pager), descriptor_pk, field_id)
762                    .map_err(|e| {
763                        tracing::error!(
764                            ?field_id,
765                            descriptor_pk,
766                            error = ?e,
767                            "append: load_or_create failed for data descriptor"
768                        );
769                        e
770                    })?;
771            let (mut rid_descriptor, mut rid_tail_page) = ColumnDescriptor::load_or_create(
772                Arc::clone(&self.pager),
773                rid_descriptor_pk,
774                rid_fid,
775            )
776            .map_err(|e| {
777                tracing::error!(
778                    ?rid_fid,
779                    rid_descriptor_pk,
780                    error = ?e,
781                    "append: load_or_create failed for rid descriptor"
782                );
783                e
784            })?;
785
786            // Logically register the Presence index on the main data descriptor. This ensures
787            // that even if no physical index chunks are created (because data arrived sorted),
788            // the system knows a Presence index is conceptually active for this column.
789            self.index_manager
790                .stage_index_registration(&mut data_descriptor, IndexKind::Presence)?;
791
792            // Split the data to be appended into chunks of a target size.
793            let slices = split_to_target_bytes(
794                &array_clean,
795                TARGET_CHUNK_BYTES,
796                self.cfg.varwidth_fallback_rows_per_slice,
797            );
798            let mut row_off = 0usize;
799
800            // Loop through each new slice to create and stage its chunks.
801            for s in slices {
802                let rows = s.len();
803                // Create and stage the data chunk.
804                let data_pk = self.pager.alloc_many(1)?[0];
805                let s_norm = zero_offset(&s);
806                let data_bytes = serialize_array(s_norm.as_ref())?;
807                puts_appends.push(BatchPut::Raw {
808                    key: data_pk,
809                    bytes: data_bytes,
810                });
811
812                // Create and stage the corresponding row_id chunk.
813                let rid_slice: ArrayRef = rids_clean.slice(row_off, rows);
814                let rid_norm = zero_offset(&rid_slice);
815                let rid_pk = self.pager.alloc_many(1)?[0];
816                let rid_bytes = serialize_array(rid_norm.as_ref())?;
817                puts_appends.push(BatchPut::Raw {
818                    key: rid_pk,
819                    bytes: rid_bytes,
820                });
821
822                // Compute min/max for the row_id chunk to enable pruning during scans.
823                let rids_for_meta = rid_norm.as_any().downcast_ref::<UInt64Array>().unwrap();
824                let (min, max) = if !rids_for_meta.is_empty() {
825                    let mut min_val = rids_for_meta.value(0);
826                    let mut max_val = rids_for_meta.value(0);
827                    for i in 1..rids_for_meta.len() {
828                        let v = rids_for_meta.value(i);
829                        if v < min_val {
830                            min_val = v;
831                        }
832                        if v > max_val {
833                            max_val = v;
834                        }
835                    }
836                    (min_val, max_val)
837                } else {
838                    (0, 0)
839                };
840
841                // Create the initial metadata for both chunks.
842                // The `value_order_perm_pk` is initialized to 0 (None).
843                let mut data_meta = ChunkMetadata {
844                    chunk_pk: data_pk,
845                    row_count: rows as u64,
846                    serialized_bytes: s_norm.get_array_memory_size() as u64,
847                    max_val_u64: u64::MAX,
848                    ..Default::default()
849                };
850                let mut rid_meta = ChunkMetadata {
851                    chunk_pk: rid_pk,
852                    row_count: rows as u64,
853                    serialized_bytes: rid_norm.get_array_memory_size() as u64,
854                    min_val_u64: min,
855                    max_val_u64: max,
856                    ..Default::default()
857                };
858
859                // **GENERIC INDEX UPDATE DISPATCH**
860                // This is the single, index-agnostic call. The IndexManager will look up all
861                // active indexes for this column (e.g., Presence, Sort) and call their respective
862                // `stage_update_for_new_chunk` methods. This is where the physical index data
863                // (like permutation blobs) is created and staged.
864                self.index_manager.stage_updates_for_new_chunk(
865                    field_id,
866                    &data_descriptor,
867                    &s_norm,
868                    &rid_norm,
869                    &mut data_meta,
870                    &mut rid_meta,
871                    &mut puts_appends,
872                )?;
873
874                // Append the (potentially modified) metadata to their respective descriptor chains.
875                self.append_meta_in_loop(
876                    &mut data_descriptor,
877                    &mut data_tail_page,
878                    data_meta,
879                    &mut puts_appends,
880                )?;
881                self.append_meta_in_loop(
882                    &mut rid_descriptor,
883                    &mut rid_tail_page,
884                    rid_meta,
885                    &mut puts_appends,
886                )?;
887                row_off += rows;
888            }
889
890            // After processing all slices, stage the final writes for the updated tail pages
891            // and the root descriptor objects themselves.
892            puts_appends.push(BatchPut::Raw {
893                key: data_descriptor.tail_page_pk,
894                bytes: data_tail_page,
895            });
896            puts_appends.push(BatchPut::Raw {
897                key: descriptor_pk,
898                bytes: data_descriptor.to_le_bytes(),
899            });
900            puts_appends.push(BatchPut::Raw {
901                key: rid_descriptor.tail_page_pk,
902                bytes: rid_tail_page,
903            });
904            puts_appends.push(BatchPut::Raw {
905                key: rid_descriptor_pk,
906                bytes: rid_descriptor.to_le_bytes(),
907            });
908        }
909
910        // --- PHASE 5: FINAL ATOMIC COMMIT ---
911        // If the catalog was modified (e.g., new columns were created), stage its write.
912        if catalog_dirty {
913            let catalog = self.catalog.read().unwrap();
914            puts_appends.push(BatchPut::Raw {
915                key: CATALOG_ROOT_PKEY,
916                bytes: catalog.to_bytes(),
917            });
918        }
919
920        // Commit all staged puts (new data chunks, new row_id chunks, new index permutations,
921        // updated descriptor pages, updated root descriptors, and the updated catalog)
922        // in a single atomic operation.
923        if !puts_appends.is_empty() {
924            self.pager.batch_put(&puts_appends)?;
925        }
926        tracing::trace!("ColumnStore::append END - success");
927        Ok(())
928    }
929
930    fn lww_rewrite_for_field(
931        &self,
932        catalog: &mut ColumnCatalog,
933        field_id: LogicalFieldId,
934        incoming_ids_map: &FxHashMap<u64, usize>,
935        incoming_data: &ArrayRef,
936        incoming_row_ids: &ArrayRef,
937        puts: &mut Vec<BatchPut>,
938    ) -> Result<FxHashSet<u64>> {
939        use crate::store::descriptor::DescriptorIterator;
940        use crate::store::ingest::ChunkEdit;
941
942        // Fast exit if nothing to rewrite.
943        if incoming_ids_map.is_empty() {
944            return Ok(FxHashSet::default());
945        }
946        let incoming_ids: FxHashSet<u64> = incoming_ids_map.keys().copied().collect();
947
948        // Resolve descriptors for data and row_id columns.
949        let desc_pk_data = match catalog.map.get(&field_id) {
950            Some(pk) => *pk,
951            None => return Ok(FxHashSet::default()),
952        };
953        let rid_fid = rowid_fid(field_id);
954        let desc_pk_rid = match catalog.map.get(&rid_fid) {
955            Some(pk) => *pk,
956            None => return Ok(FxHashSet::default()),
957        };
958
959        // Batch fetch both descriptors.
960        let gets = vec![
961            BatchGet::Raw { key: desc_pk_data },
962            BatchGet::Raw { key: desc_pk_rid },
963        ];
964        let results = self.pager.batch_get(&gets)?;
965        let mut blobs_by_pk = FxHashMap::default();
966        for r in results {
967            if let GetResult::Raw { key, bytes } = r {
968                blobs_by_pk.insert(key, bytes);
969            }
970        }
971
972        let desc_blob_data = blobs_by_pk.remove(&desc_pk_data).ok_or_else(|| {
973            tracing::error!(
974                ?field_id,
975                desc_pk_data,
976                "lww_rewrite: data descriptor blob not found in pager"
977            );
978            Error::NotFound
979        })?;
980        let mut descriptor_data = ColumnDescriptor::from_le_bytes(desc_blob_data.as_ref());
981
982        let desc_blob_rid = blobs_by_pk.remove(&desc_pk_rid).ok_or_else(|| {
983            tracing::error!(
984                ?rid_fid,
985                desc_pk_rid,
986                "lww_rewrite: rid descriptor blob not found in pager"
987            );
988            Error::NotFound
989        })?;
990        let mut descriptor_rid = ColumnDescriptor::from_le_bytes(desc_blob_rid.as_ref());
991
992        tracing::trace!(?field_id, "lww_rewrite: descriptors loaded successfully");
993
994        // Collect chunk metadata.
995        let mut metas_data: Vec<ChunkMetadata> = Vec::new();
996        let mut metas_rid: Vec<ChunkMetadata> = Vec::new();
997        for m in DescriptorIterator::new(self.pager.as_ref(), descriptor_data.head_page_pk) {
998            metas_data.push(m.map_err(|e| {
999                tracing::error!(?field_id, error = ?e, "lww_rewrite: failed to iterate data descriptor");
1000                e
1001            })?);
1002        }
1003        for m in DescriptorIterator::new(self.pager.as_ref(), descriptor_rid.head_page_pk) {
1004            metas_rid.push(m.map_err(|e| {
1005                tracing::error!(?rid_fid, error = ?e, "lww_rewrite: failed to iterate rid descriptor");
1006                e
1007            })?);
1008        }
1009
1010        tracing::trace!(
1011            ?field_id,
1012            data_chunks = metas_data.len(),
1013            rid_chunks = metas_rid.len(),
1014            "lww_rewrite: chunk metadata collected"
1015        );
1016
1017        // Classify incoming rows: delete vs upsert.
1018        let rid_in = incoming_row_ids
1019            .as_any()
1020            .downcast_ref::<UInt64Array>()
1021            .ok_or_else(|| Error::Internal("row_id must be u64".into()))?;
1022        let mut ids_to_delete = FxHashSet::default();
1023        let mut ids_to_upsert = FxHashSet::default();
1024        for i in 0..rid_in.len() {
1025            let rid = rid_in.value(i);
1026            if incoming_data.is_null(i) {
1027                ids_to_delete.insert(rid);
1028            } else {
1029                ids_to_upsert.insert(rid);
1030            }
1031        }
1032
1033        // Scan row_id chunks to find hits, bucketed by chunk index.
1034        let mut rewritten_ids = FxHashSet::default();
1035        let mut hit_up: FxHashMap<usize, Vec<u64>> = FxHashMap::default();
1036        let mut hit_del: FxHashMap<usize, Vec<u64>> = FxHashMap::default();
1037
1038        let n = metas_data.len().min(metas_rid.len());
1039        if n > 0 {
1040            // Fetch only row_id chunks to locate matches.
1041            let mut gets_rid = Vec::with_capacity(n);
1042            for rm in metas_rid.iter().take(n) {
1043                gets_rid.push(BatchGet::Raw { key: rm.chunk_pk });
1044            }
1045            let rid_results = self.pager.batch_get(&gets_rid)?;
1046            let mut rid_blobs: FxHashMap<PhysicalKey, EntryHandle> = FxHashMap::default();
1047            for r in rid_results {
1048                if let GetResult::Raw { key, bytes } = r {
1049                    rid_blobs.insert(key, bytes);
1050                }
1051            }
1052
1053            for (i, meta_rid) in metas_rid.iter().enumerate().take(n) {
1054                if let Some(rid_blob) = rid_blobs.get(&meta_rid.chunk_pk) {
1055                    let rid_arr_any = deserialize_array(rid_blob.clone())?;
1056                    let rid_arr = rid_arr_any
1057                        .as_any()
1058                        .downcast_ref::<UInt64Array>()
1059                        .ok_or_else(|| Error::Internal("rid type mismatch".into()))?;
1060                    for j in 0..rid_arr.len() {
1061                        let rid = rid_arr.value(j);
1062                        if incoming_ids.contains(&rid) {
1063                            if ids_to_delete.contains(&rid) {
1064                                hit_del.entry(i).or_default().push(rid);
1065                            } else if ids_to_upsert.contains(&rid) {
1066                                hit_up.entry(i).or_default().push(rid);
1067                            }
1068                            rewritten_ids.insert(rid);
1069                        }
1070                    }
1071                }
1072            }
1073        }
1074
1075        if hit_up.is_empty() && hit_del.is_empty() {
1076            return Ok(rewritten_ids);
1077        }
1078
1079        // Batch fetch data+rid blobs for all chunks with hits.
1080        let mut hit_set = FxHashSet::default();
1081        hit_set.extend(hit_up.keys().copied());
1082        hit_set.extend(hit_del.keys().copied());
1083        let hit_idxs: Vec<usize> = hit_set.into_iter().collect();
1084
1085        let mut gets = Vec::with_capacity(hit_idxs.len() * 2);
1086        for &i in &hit_idxs {
1087            gets.push(BatchGet::Raw {
1088                key: metas_data[i].chunk_pk,
1089            });
1090            gets.push(BatchGet::Raw {
1091                key: metas_rid[i].chunk_pk,
1092            });
1093        }
1094        let results = self.pager.batch_get(&gets)?;
1095        let mut blob_map: FxHashMap<PhysicalKey, EntryHandle> = FxHashMap::default();
1096        for r in results {
1097            if let GetResult::Raw { key, bytes } = r {
1098                blob_map.insert(key, bytes);
1099            }
1100        }
1101
1102        // Apply per-chunk edits and stage writebacks.
1103        for i in hit_idxs {
1104            let old_data_arr =
1105                deserialize_array(blob_map.get(&metas_data[i].chunk_pk).unwrap().clone())?;
1106            let old_rid_arr_any =
1107                deserialize_array(blob_map.get(&metas_rid[i].chunk_pk).unwrap().clone())?;
1108            let old_rid_arr = old_rid_arr_any
1109                .as_any()
1110                .downcast_ref::<UInt64Array>()
1111                .unwrap();
1112
1113            let up_vec = hit_up.remove(&i).unwrap_or_default();
1114            let del_vec = hit_del.remove(&i).unwrap_or_default();
1115
            // Centralized LWW edit: builds the keep mask and injects the tails.
1117            let edit = ChunkEdit::from_lww_upsert(
1118                old_rid_arr,
1119                &up_vec,
1120                &del_vec,
1121                incoming_data,
1122                incoming_row_ids,
1123                incoming_ids_map,
1124            )?;
1125
1126            let (new_data_arr, new_rid_arr) =
1127                ChunkEdit::apply_edit_to_arrays(&old_data_arr, Some(&old_rid_arr_any), &edit)?;
1128
1129            // Stage data writeback.
1130            let data_bytes = serialize_array(&new_data_arr)?;
1131            puts.push(BatchPut::Raw {
1132                key: metas_data[i].chunk_pk,
1133                bytes: data_bytes,
1134            });
1135            metas_data[i].row_count = new_data_arr.len() as u64;
1136            metas_data[i].serialized_bytes = new_data_arr.get_array_memory_size() as u64;
1137
1138            // Stage row_id writeback.
1139            if let Some(rarr) = new_rid_arr {
1140                let rid_bytes = serialize_array(&rarr)?;
1141                puts.push(BatchPut::Raw {
1142                    key: metas_rid[i].chunk_pk,
1143                    bytes: rid_bytes,
1144                });
1145                metas_rid[i].row_count = rarr.len() as u64;
1146                metas_rid[i].serialized_bytes = rarr.get_array_memory_size() as u64;
1147            }
1148
1149            // Refresh permutation if present.
1150            if metas_data[i].value_order_perm_pk != 0 {
1151                let sort_col = SortColumn {
1152                    values: new_data_arr,
1153                    options: None,
1154                };
1155                let idx = lexsort_to_indices(&[sort_col], None)?;
1156                let perm_bytes = serialize_array(&idx)?;
1157                puts.push(BatchPut::Raw {
1158                    key: metas_data[i].value_order_perm_pk,
1159                    bytes: perm_bytes,
1160                });
1161            }
1162        }
1163
1164        // Rewrite descriptor chains/totals for both columns.
1165        descriptor_data.rewrite_pages(
1166            Arc::clone(&self.pager),
1167            desc_pk_data,
1168            &mut metas_data,
1169            puts,
1170        )?;
1171        descriptor_rid.rewrite_pages(Arc::clone(&self.pager), desc_pk_rid, &mut metas_rid, puts)?;
1172
1173        Ok(rewritten_ids)
1174    }
1175
1176    fn stage_delete_rows_for_field(
1177        &self,
1178        field_id: LogicalFieldId,
1179        rows_to_delete: &[RowId],
1180        staged_puts: &mut Vec<BatchPut>,
1181    ) -> Result<bool> {
1182        tracing::warn!(
1183            field_id = ?field_id,
1184            rows = rows_to_delete.len(),
1185            "delete_rows stage_delete_rows_for_field: start"
1186        );
1187        use crate::store::descriptor::DescriptorIterator;
1188        use crate::store::ingest::ChunkEdit;
1189
1190        if rows_to_delete.is_empty() {
1191            return Ok(false);
1192        }
1193
1194        // Stream and validate ascending, unique positions.
1195        let mut del_iter = rows_to_delete.iter().copied();
1196        let mut cur_del = del_iter.next();
1197        let mut last_seen: Option<u64> = cur_del;
1198
1199        // Lookup descriptors (data and optional row_id).
1200        let catalog = self.catalog.read().unwrap();
1201        let desc_pk = match catalog.map.get(&field_id) {
1202            Some(pk) => *pk,
1203            None => {
1204                tracing::trace!(
1205                    field_id = ?field_id,
1206                    "delete_rows stage_delete_rows_for_field: data descriptor missing"
1207                );
1208                return Err(Error::NotFound);
1209            }
1210        };
1211        let rid_fid = rowid_fid(field_id);
1212        let desc_pk_rid = catalog.map.get(&rid_fid).copied();
1213        tracing::warn!(
1214            field_id = ?field_id,
1215            desc_pk,
1216            desc_pk_rid = ?desc_pk_rid,
1217            "delete_rows stage_delete_rows_for_field: descriptor keys"
1218        );
1219
1220        // Batch fetch descriptor blobs up front.
1221        let mut gets = vec![BatchGet::Raw { key: desc_pk }];
1222        if let Some(pk) = desc_pk_rid {
1223            gets.push(BatchGet::Raw { key: pk });
1224        }
1225        let results = match self.pager.batch_get(&gets) {
1226            Ok(res) => res,
1227            Err(err) => {
1228                tracing::trace!(
1229                    field_id = ?field_id,
1230                    error = ?err,
1231                    "delete_rows stage_delete_rows_for_field: descriptor batch_get failed"
1232                );
1233                return Err(err);
1234            }
1235        };
1236        let mut blobs_by_pk = FxHashMap::default();
1237        for res in results {
1238            if let GetResult::Raw { key, bytes } = res {
1239                blobs_by_pk.insert(key, bytes);
1240            }
1241        }
1242
1243        tracing::warn!(
1244            field_id = ?field_id,
1245            desc_blob_found = blobs_by_pk.contains_key(&desc_pk),
1246            rid_blob_found = desc_pk_rid.map(|pk| blobs_by_pk.contains_key(&pk)),
1247            "delete_rows stage_delete_rows_for_field: descriptor fetch status"
1248        );
1249
1250        let desc_blob = blobs_by_pk.remove(&desc_pk).ok_or_else(|| {
1251            tracing::trace!(
1252                field_id = ?field_id,
1253                desc_pk,
1254                "delete_rows stage_delete_rows_for_field: descriptor blob missing"
1255            );
1256            Error::Internal(format!(
1257                "descriptor pk={} missing during delete_rows for field {:?}",
1258                desc_pk, field_id
1259            ))
1260        })?;
1261        let mut descriptor = ColumnDescriptor::from_le_bytes(desc_blob.as_ref());
1262
1263        // Build metas for data column.
1264        let mut metas: Vec<ChunkMetadata> = Vec::new();
1265        for m in DescriptorIterator::new(self.pager.as_ref(), descriptor.head_page_pk) {
1266            metas.push(m?);
1267        }
1268        if metas.is_empty() {
1269            drop(catalog);
1270            return Ok(false);
1271        }
1272
1273        // Optionally mirror metas for row_id column.
1274        let mut metas_rid: Vec<ChunkMetadata> = Vec::new();
1275        let mut descriptor_rid: Option<ColumnDescriptor> = None;
1276        tracing::warn!(
1277            field_id = ?field_id,
1278            metas_len = metas.len(),
1279            desc_pk_rid = ?desc_pk_rid,
1280            "delete_rows stage_delete_rows_for_field: data metas loaded"
1281        );
1282        if let Some(pk_rid) = desc_pk_rid
1283            && let Some(desc_blob_rid) = blobs_by_pk.remove(&pk_rid)
1284        {
1285            let d_rid = ColumnDescriptor::from_le_bytes(desc_blob_rid.as_ref());
1286            for m in DescriptorIterator::new(self.pager.as_ref(), d_rid.head_page_pk) {
1287                metas_rid.push(m?);
1288            }
1289            descriptor_rid = Some(d_rid);
1290        }
1291
1292        tracing::warn!(
1293            field_id = ?field_id,
1294            metas_rid_len = metas_rid.len(),
1295            "delete_rows stage_delete_rows_for_field: rowid metas loaded"
1296        );
1297
1298        let mut cum_rows = 0u64;
1299        let mut any_changed = false;
1300
1301        for (i, meta) in metas.iter_mut().enumerate() {
1302            let start_u64 = cum_rows;
1303            let end_u64 = start_u64 + meta.row_count;
1304
1305            // Advance deletes into this chunk window [start, end).
1306            while let Some(d) = cur_del {
1307                if d < start_u64
1308                    && let Some(prev) = last_seen
1309                {
1310                    if d < prev {
1311                        return Err(Error::Internal(
1312                            "rows_to_delete must be ascending/unique".into(),
1313                        ));
1314                    }
1315
1316                    last_seen = Some(d);
1317                    cur_del = del_iter.next();
1318                } else {
1319                    break;
1320                }
1321            }
1322
1323            // Collect local delete indices.
1324            let rows = meta.row_count as usize;
1325            let mut del_local: FxHashSet<usize> = FxHashSet::default();
1326            while let Some(d) = cur_del {
1327                if d >= end_u64 {
1328                    break;
1329                }
1330                del_local.insert((d - start_u64) as usize);
1331                last_seen = Some(d);
1332                cur_del = del_iter.next();
1333            }
1334
1335            if del_local.is_empty() {
1336                cum_rows = end_u64;
1337                continue;
1338            }
1339
1340            // Batch get chunk blobs (data and optional row_id).
1341            let mut chunk_gets = vec![BatchGet::Raw { key: meta.chunk_pk }];
1342            if let Some(rm) = metas_rid.get(i) {
1343                chunk_gets.push(BatchGet::Raw { key: rm.chunk_pk });
1344            }
1345            let chunk_results = match self.pager.batch_get(&chunk_gets) {
1346                Ok(res) => res,
1347                Err(err) => {
1348                    tracing::trace!(
1349                        field_id = ?field_id,
1350                        chunk_pk = meta.chunk_pk,
1351                        error = ?err,
1352                        "delete_rows stage_delete_rows_for_field: chunk batch_get failed"
1353                    );
1354                    return Err(err);
1355                }
1356            };
1357            let mut chunk_blobs = FxHashMap::default();
1358            for res in chunk_results {
1359                if let GetResult::Raw { key, bytes } = res {
1360                    chunk_blobs.insert(key, bytes);
1361                }
1362            }
1363
1364            tracing::trace!(
1365                field_id = ?field_id,
1366                chunk_pk = meta.chunk_pk,
1367                rid_chunk_pk = metas_rid.get(i).map(|rm| rm.chunk_pk),
1368                data_found = chunk_blobs.contains_key(&meta.chunk_pk),
1369                rid_found = metas_rid
1370                    .get(i)
1371                    .map(|rm| chunk_blobs.contains_key(&rm.chunk_pk)),
1372                "delete_rows stage_delete_rows_for_field: chunk fetch status"
1373            );
1374
1375            let data_blob = match chunk_blobs.remove(&meta.chunk_pk) {
1376                Some(bytes) => bytes,
1377                None => {
1378                    tracing::trace!(
1379                        field_id = ?field_id,
1380                        chunk_pk = meta.chunk_pk,
1381                        "delete_rows stage_delete_rows_for_field: chunk missing"
1382                    );
1383                    return Err(Error::NotFound);
1384                }
1385            };
1386            let data_arr = deserialize_array(data_blob)?;
1387
1388            let rid_arr_any = if let Some(rm) = metas_rid.get(i) {
1389                let rid_blob = match chunk_blobs.remove(&rm.chunk_pk) {
1390                    Some(bytes) => bytes,
1391                    None => {
1392                        tracing::trace!(
1393                            field_id = ?field_id,
1394                            rowid_chunk_pk = rm.chunk_pk,
1395                            "delete_rows stage_delete_rows_for_field: rowid chunk missing"
1396                        );
1397                        return Err(Error::NotFound);
1398                    }
1399                };
1400                Some(deserialize_array(rid_blob)?)
1401            } else {
1402                None
1403            };
1404
1405            // Build the chunk edit from the collected local delete indices.
1406            let edit = ChunkEdit::from_delete_indices(rows, &del_local);
1407
1408            // Apply edit (pure array ops).
1409            let (new_data_arr, new_rid_arr) =
1410                ChunkEdit::apply_edit_to_arrays(&data_arr, rid_arr_any.as_ref(), &edit)?;
1411
1412            // Write back data.
1413            let data_bytes = serialize_array(&new_data_arr)?;
1414            staged_puts.push(BatchPut::Raw {
1415                key: meta.chunk_pk,
1416                bytes: data_bytes,
1417            });
1418            meta.row_count = new_data_arr.len() as u64;
1419            meta.serialized_bytes = new_data_arr.get_array_memory_size() as u64;
1420
1421            // Write back row_ids if present.
1422            if let (Some(rm), Some(rids)) = (metas_rid.get_mut(i), new_rid_arr) {
1424                let rid_bytes = serialize_array(&rids)?;
1425                staged_puts.push(BatchPut::Raw {
1426                    key: rm.chunk_pk,
1427                    bytes: rid_bytes,
1428                });
1429                rm.row_count = rids.len() as u64;
1430                rm.serialized_bytes = rids.get_array_memory_size() as u64;
1431            }
1432
1433            // Refresh permutation if this chunk has one.
1434            if meta.value_order_perm_pk != 0 {
1435                let sort_column = SortColumn {
1436                    values: new_data_arr,
1437                    options: None,
1438                };
1439                let indices = lexsort_to_indices(&[sort_column], None)?;
1440                let perm_bytes = serialize_array(&indices)?;
1441                staged_puts.push(BatchPut::Raw {
1442                    key: meta.value_order_perm_pk,
1443                    bytes: perm_bytes,
1444                });
1445            }
1446
1447            cum_rows = end_u64;
1448            any_changed = true;
1449        }
1450
1451        // Rewrite descriptor chains/totals and commit.
1452        descriptor.rewrite_pages(Arc::clone(&self.pager), desc_pk, &mut metas, staged_puts)?;
1453        if let (Some(rid_pk), Some(mut rid_desc)) = (desc_pk_rid, descriptor_rid) {
1454            rid_desc.rewrite_pages(Arc::clone(&self.pager), rid_pk, &mut metas_rid, staged_puts)?;
1455        }
1456        drop(catalog);
1457        tracing::trace!(
1458            field_id = ?field_id,
1459            changed = any_changed,
1460            "delete_rows stage_delete_rows_for_field: finished stage"
1461        );
1462        Ok(any_changed)
1463    }
1464
1465    /// Delete row positions for one or more logical fields in a single atomic batch.
1466    ///
1467    /// The same set of global row positions is applied to every field in
1468    /// `fields`. All staged metadata and chunk updates are committed in a
1469    /// single pager batch.
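    ///
    /// Row positions must be ascending and unique, and every field must belong
    /// to the same table.
    ///
    /// # Example
    ///
    /// A minimal usage sketch. `pager` is assumed to be some available `Pager`
    /// implementation and `field_a` / `field_b` are `LogicalFieldId`s for
    /// columns that already hold data:
    ///
    /// ```ignore
    /// let store = ColumnStore::open(Arc::clone(&pager))?;
    /// // Remove global row positions 3, 7 and 9 from both columns in one atomic batch.
    /// store.delete_rows(&[field_a, field_b], &[3, 7, 9])?;
    /// ```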
1470    pub fn delete_rows(&self, fields: &[LogicalFieldId], rows_to_delete: &[RowId]) -> Result<()> {
1471        if fields.is_empty() || rows_to_delete.is_empty() {
1472            return Ok(());
1473        }
1474
1475        let mut puts = Vec::new();
1476        let mut touched: FxHashSet<LogicalFieldId> = FxHashSet::default();
1477        let mut table_id: Option<TableId> = None;
1478
1479        tracing::trace!(
1480            fields = fields.len(),
1481            rows = rows_to_delete.len(),
1482            "delete_rows begin"
1483        );
1484        for field_id in fields {
1485            tracing::trace!(field = ?field_id, "delete_rows iter field");
1486            if let Some(expected) = table_id {
1487                if field_id.table_id() != expected {
1488                    return Err(Error::InvalidArgumentError(
1489                        "delete_rows requires fields from the same table".into(),
1490                    ));
1491                }
1492            } else {
1493                table_id = Some(field_id.table_id());
1494            }
1495
1496            if self.stage_delete_rows_for_field(*field_id, rows_to_delete, &mut puts)? {
1497                touched.insert(*field_id);
1498            }
1499        }
1500
1501        if puts.is_empty() {
1502            return Ok(());
1503        }
1504
1505        self.pager.batch_put(&puts)?;
1506
1507        tracing::trace!(touched = touched.len(), "delete_rows apply writes");
1508
1509        for field_id in touched {
1510            self.compact_field_bounded(field_id)?;
1511        }
1512        tracing::trace!("delete_rows complete");
1513        Ok(())
1514    }
1515
1516    // TODO: Move to descriptor module?
1517    /// Write a descriptor chain from a meta slice. Reuses first page when
1518    /// possible. Frees surplus pages via `frees`.
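    ///
    /// Each descriptor page holds at most `DESCRIPTOR_ENTRIES_PER_PAGE` chunk
    /// entries, so the required page count is the ceiling division of the meta
    /// count. A small sketch of that sizing rule (the entry count shown is
    /// illustrative, not the real constant):
    ///
    /// ```ignore
    /// let per = DESCRIPTOR_ENTRIES_PER_PAGE;          // e.g. 128 entries per page
    /// let need_pages = new_metas.len().div_ceil(per); // ceiling division
    /// assert_eq!(0usize.div_ceil(per), 0);            // empty metas => whole chain is freed
    /// ```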
1519    pub(crate) fn write_descriptor_chain(
1520        &self,
1521        descriptor_pk: PhysicalKey,
1522        descriptor: &mut ColumnDescriptor,
1523        new_metas: &[ChunkMetadata],
1524        puts: &mut Vec<BatchPut>,
1525        frees: &mut Vec<PhysicalKey>,
1526    ) -> Result<()> {
1527        // Collect existing page chain.
1528        let mut old_pages = Vec::new();
1529        let mut pk = descriptor.head_page_pk;
1530        while pk != 0 {
1531            let page_blob = self
1532                .pager
1533                .batch_get(&[BatchGet::Raw { key: pk }])?
1534                .pop()
1535                .and_then(|res| match res {
1536                    GetResult::Raw { bytes, .. } => Some(bytes),
1537                    _ => None,
1538                })
1539                .ok_or(Error::NotFound)?;
1540            let header = DescriptorPageHeader::from_le_bytes(
1541                &page_blob.as_ref()[..DescriptorPageHeader::DISK_SIZE],
1542            );
1543            old_pages.push(pk);
1544            pk = header.next_page_pk;
1545        }
1546
1547        // Required pages for new metas.
1548        let per = DESCRIPTOR_ENTRIES_PER_PAGE;
1549        let need_pages = if new_metas.is_empty() {
1550            0
1551        } else {
1552            new_metas.len().div_ceil(per)
1553        };
1554        // If empty: free everything and clear counters.
1555        if need_pages == 0 {
1556            frees.extend(old_pages.iter().copied());
1557
1558            // Setting both page pointers to 0 indicates an empty descriptor state.
1559            // This signals that the descriptor needs reinitialization on next access,
1560            // as this pattern (head_page_pk == 0 && tail_page_pk == 0) is checked
1561            // in the `load_or_create` method to determine if allocation is needed.
1562            descriptor.head_page_pk = 0;
1563            descriptor.tail_page_pk = 0;
1564            descriptor.total_row_count = 0;
1565            descriptor.total_chunk_count = 0;
1566            puts.push(BatchPut::Raw {
1567                key: descriptor_pk,
1568                bytes: descriptor.to_le_bytes(),
1569            });
1570            return Ok(());
1571        }
1572
1573        // Reuse first page; alloc more if needed.
1574        let mut pages = Vec::with_capacity(need_pages);
1575        if !old_pages.is_empty() {
1576            pages.push(old_pages[0]);
1577        } else {
1578            pages.push(self.pager.alloc_many(1)?[0]);
1579            descriptor.head_page_pk = pages[0];
1580        }
1581        if need_pages > pages.len() {
1582            let extra = self.pager.alloc_many(need_pages - pages.len())?;
1583            pages.extend(extra);
1584        }
1585
1586        // Free all surplus old pages (only the first old page is reused; the rest become unreachable once the chain is rewritten).
1587        if old_pages.len() > 1 {
1588            frees.extend(old_pages[1..].iter().copied());
1589        }
1590
1591        // Write page contents.
1592        let mut off = 0usize;
1593        for (i, page_pk) in pages.iter().copied().enumerate() {
1594            let remain = new_metas.len() - off;
1595            let count = remain.min(per);
1596            let next = if i + 1 < pages.len() { pages[i + 1] } else { 0 };
1597            let header = DescriptorPageHeader {
1598                next_page_pk: next,
1599                entry_count: count as u32,
1600                _padding: [0; 4],
1601            };
1602            let mut page_bytes = header.to_le_bytes().to_vec();
1603            for m in &new_metas[off..off + count] {
1604                page_bytes.extend_from_slice(&m.to_le_bytes());
1605            }
1606            puts.push(BatchPut::Raw {
1607                key: page_pk,
1608                bytes: page_bytes,
1609            });
1610            off += count;
1611        }
1612
1613        descriptor.tail_page_pk = *pages.last().unwrap();
1614        descriptor.total_chunk_count = new_metas.len() as u64;
1615        descriptor.total_row_count = new_metas.iter().map(|m| m.row_count).sum();
1616        puts.push(BatchPut::Raw {
1617            key: descriptor_pk,
1618            bytes: descriptor.to_le_bytes(),
1619        });
1620        Ok(())
1621    }
1622
1623    /// Bounded, local field compaction. Merges adjacent small chunks into
1624    /// ~TARGET_CHUNK_BYTES; leaves large chunks intact. Recomputes perms if
1625    /// any source chunk in a run had one. Frees obsolete chunks/pages.
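    ///
    /// The merge policy scans chunks left to right: anything at or above
    /// `MIN_CHUNK_BYTES` is kept, while consecutive smaller chunks are grouped
    /// into a run capped at `MAX_MERGE_RUN_BYTES`, concatenated, and re-split
    /// near `TARGET_CHUNK_BYTES`. A standalone sketch of the grouping step
    /// (sizes in bytes; thresholds are parameters here, not the real constants):
    ///
    /// ```ignore
    /// fn build_run(sizes: &[usize], i: usize, target: usize, cap: usize) -> usize {
    ///     let mut j = i;
    ///     let mut run = 0usize;
    ///     while j < sizes.len() && sizes[j] < target && run + sizes[j] <= cap {
    ///         run += sizes[j];
    ///         j += 1;
    ///     }
    ///     j // the run covers chunk indices [i, j)
    /// }
    /// ```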
1626    fn compact_field_bounded(&self, field_id: LogicalFieldId) -> Result<()> {
1627        // We may rewrite descriptors; take a write lock.
1628        let mut catalog = self.catalog.write().unwrap();
1629
1630        let desc_pk = match catalog.map.get(&field_id) {
1631            Some(&pk) => pk,
1632            None => return Ok(()),
1633        };
1634        let rid_fid = rowid_fid(field_id);
1635        let desc_pk_rid = match catalog.map.get(&rid_fid) {
1636            Some(&pk) => pk,
1637            None => return Ok(()),
1638        };
1639
1640        // Fetch both descriptors in a single batched read.
1641        let gets = vec![
1642            BatchGet::Raw { key: desc_pk },
1643            BatchGet::Raw { key: desc_pk_rid },
1644        ];
1645        let results = self.pager.batch_get(&gets)?;
1646        let mut blobs_by_pk = FxHashMap::default();
1647        for res in results {
1648            if let GetResult::Raw { key, bytes } = res {
1649                blobs_by_pk.insert(key, bytes);
1650            }
1651        }
1652        let desc_blob = blobs_by_pk.remove(&desc_pk).ok_or(Error::NotFound)?;
1653        let mut desc = ColumnDescriptor::from_le_bytes(desc_blob.as_ref());
1654        let desc_blob_rid = blobs_by_pk.remove(&desc_pk_rid).ok_or(Error::NotFound)?;
1655        let mut desc_rid = ColumnDescriptor::from_le_bytes(desc_blob_rid.as_ref());
1656
1657        // Load metas.
1658        let mut metas = Vec::new();
1659        for m in DescriptorIterator::new(self.pager.as_ref(), desc.head_page_pk) {
1660            metas.push(m?);
1661        }
1662        let mut metas_rid = Vec::new();
1663        for m in DescriptorIterator::new(self.pager.as_ref(), desc_rid.head_page_pk) {
1664            metas_rid.push(m?);
1665        }
1666        if metas.is_empty() || metas_rid.is_empty() {
1667            return Ok(());
1668        }
1669
1670        let mut puts: Vec<BatchPut> = Vec::new();
1671        let mut frees: Vec<PhysicalKey> = Vec::new();
1672        let mut new_metas: Vec<ChunkMetadata> = Vec::new();
1673        let mut new_rid_metas: Vec<ChunkMetadata> = Vec::new();
1674
1675        let mut i = 0usize;
1676        while i < metas.len() {
1677            let sz = metas[i].serialized_bytes as usize;
1678            // Chunks already at or above MIN_CHUNK_BYTES are kept as-is.
1679            if sz >= MIN_CHUNK_BYTES {
1680                new_metas.push(metas[i]);
1681                new_rid_metas.push(metas_rid[i]);
1682                i += 1;
1683                continue;
1684            }
1685
1686            // Build a small run [i, j) capped by MAX_MERGE_RUN_BYTES.
1687            let mut j = i;
1688            let mut run_bytes = 0usize;
1689            while j < metas.len() {
1690                let b = metas[j].serialized_bytes as usize;
1691                if b >= TARGET_CHUNK_BYTES {
1692                    break;
1693                }
1694                if run_bytes + b > MAX_MERGE_RUN_BYTES {
1695                    break;
1696                }
1697                run_bytes += b;
1698                j += 1;
1699            }
1700            if j == i + 1 {
1701                new_metas.push(metas[i]);
1702                new_rid_metas.push(metas_rid[i]);
1703                i += 1;
1704                continue;
1705            }
1706
1707            // Fetch and concatenate the run's data and row_ids.
1708            let mut gets = Vec::with_capacity((j - i) * 2);
1709            for k in i..j {
1710                gets.push(BatchGet::Raw {
1711                    key: metas[k].chunk_pk,
1712                });
1713                gets.push(BatchGet::Raw {
1714                    key: metas_rid[k].chunk_pk,
1715                });
1716            }
1717            let results = self.pager.batch_get(&gets)?;
1718            let mut by_pk: FxHashMap<PhysicalKey, EntryHandle> = FxHashMap::default();
1719            for r in results {
1720                match r {
1721                    GetResult::Raw { key, bytes } => {
1722                        by_pk.insert(key, bytes);
1723                    }
1724                    _ => return Err(Error::NotFound),
1725                }
1726            }
1727            let mut data_parts = Vec::with_capacity(j - i);
1728            let mut rid_parts = Vec::with_capacity(j - i);
1729            for k in i..j {
1730                let db = by_pk.get(&metas[k].chunk_pk).ok_or(Error::NotFound)?;
1731                data_parts.push(deserialize_array(db.clone())?);
1732                let rb = by_pk.get(&metas_rid[k].chunk_pk).ok_or(Error::NotFound)?;
1733                rid_parts.push(deserialize_array(rb.clone())?);
1734            }
1735            let merged_data = concat_many(data_parts.iter().collect())?;
1736            let merged_rid_any = concat_many(rid_parts.iter().collect())?;
1737
1738            // Split merged run into ~target-sized chunks.
1739            let slices = split_to_target_bytes(
1740                &merged_data,
1741                TARGET_CHUNK_BYTES,
1742                self.cfg.varwidth_fallback_rows_per_slice,
1743            );
1744            let mut rid_off = 0usize;
1745            let need_perms = metas[i..j].iter().any(|m| m.value_order_perm_pk != 0);
1746
1747            for s in slices {
1748                let rows = s.len();
1749                // Slice rid to match s (avoid double Arc).
1750                let rid_ref: ArrayRef = merged_rid_any.slice(rid_off, rows);
1751                let rid_norm = zero_offset(&rid_ref);
1752                let rid_pk = self.pager.alloc_many(1)?[0];
1753                let rid_bytes = serialize_array(rid_norm.as_ref())?;
1754
1755                let data_pk = self.pager.alloc_many(1)?[0];
1756                let s_norm = zero_offset(&s);
1757                let data_bytes = serialize_array(s_norm.as_ref())?;
1758                puts.push(BatchPut::Raw {
1759                    key: data_pk,
1760                    bytes: data_bytes,
1761                });
1762                puts.push(BatchPut::Raw {
1763                    key: rid_pk,
1764                    bytes: rid_bytes,
1765                });
1766                let mut meta = ChunkMetadata {
1767                    chunk_pk: data_pk,
1768                    row_count: rows as u64,
1769                    serialized_bytes: s_norm.get_array_memory_size() as u64,
1770                    max_val_u64: u64::MAX,
1771                    ..Default::default()
1772                };
1773                // If any source chunk had a perm, recompute for this slice.
1774                if need_perms {
1775                    let sort_col = SortColumn {
1776                        values: s.clone(),
1777                        options: None,
1778                    };
1779                    let idx = lexsort_to_indices(&[sort_col], None)?;
1780                    let perm_bytes = serialize_array(&idx)?;
1781                    let perm_pk = self.pager.alloc_many(1)?[0];
1782                    puts.push(BatchPut::Raw {
1783                        key: perm_pk,
1784                        bytes: perm_bytes,
1785                    });
1786                    meta.value_order_perm_pk = perm_pk;
1787                }
1788
1789                // Track row-id min/max and, if the rids are unsorted, build a sort permutation (presence index).
1790                let rid_any = rid_norm.clone();
1791                let rids = rid_any
1792                    .as_any()
1793                    .downcast_ref::<UInt64Array>()
1794                    .ok_or_else(|| Error::Internal("rid downcast".into()))?;
1795                let mut min = u64::MAX;
1796                let mut max = 0u64;
1797                let mut sorted_rids = true;
1798                let mut last_v = 0u64;
1799                for ii in 0..rids.len() {
1800                    let v = rids.value(ii);
1801                    if ii == 0 {
1802                        last_v = v;
1803                    } else if v < last_v {
1804                        sorted_rids = false;
1805                    } else {
1806                        last_v = v;
1807                    }
1808                    if v < min {
1809                        min = v;
1810                    }
1811                    if v > max {
1812                        max = v;
1813                    }
1814                }
1815                let mut rid_perm_pk = 0u64;
1816                if !sorted_rids {
1817                    let rid_sort_col = SortColumn {
1818                        values: rid_any,
1819                        options: None,
1820                    };
1821                    let rid_idx = lexsort_to_indices(&[rid_sort_col], None)?;
1822                    let rid_perm_bytes = serialize_array(&rid_idx)?;
1823                    rid_perm_pk = self.pager.alloc_many(1)?[0];
1824                    puts.push(BatchPut::Raw {
1825                        key: rid_perm_pk,
1826                        bytes: rid_perm_bytes,
1827                    });
1828                }
1829                let rid_meta = ChunkMetadata {
1830                    chunk_pk: rid_pk,
1831                    value_order_perm_pk: rid_perm_pk,
1832                    row_count: rows as u64,
1833                    serialized_bytes: rid_norm.get_array_memory_size() as u64,
1834                    min_val_u64: if rows > 0 { min } else { 0 },
1835                    max_val_u64: if rows > 0 { max } else { 0 },
1836                };
1837                new_metas.push(meta);
1838                new_rid_metas.push(rid_meta);
1839                rid_off += rows;
1840            }
1841
1842            // Free all old data/rid/perms in the merged run.
1843            for k in i..j {
1844                frees.push(metas[k].chunk_pk);
1845                if metas[k].value_order_perm_pk != 0 {
1846                    frees.push(metas[k].value_order_perm_pk);
1847                }
1848                frees.push(metas_rid[k].chunk_pk);
1849                if metas_rid[k].value_order_perm_pk != 0 {
1850                    frees.push(metas_rid[k].value_order_perm_pk);
1851                }
1852            }
1853
1854            i = j;
1855        }
1856
1857        // If everything was deleted, drop the field entirely.
1858        if new_metas.is_empty() {
1859            // Drop descriptors and catalog mapping.
1860            self.write_descriptor_chain(desc_pk, &mut desc, &[], &mut puts, &mut frees)?;
1861            self.write_descriptor_chain(desc_pk_rid, &mut desc_rid, &[], &mut puts, &mut frees)?;
1862            catalog.map.remove(&field_id);
1863            catalog.map.remove(&rid_fid);
1864            puts.push(BatchPut::Raw {
1865                key: CATALOG_ROOT_PKEY,
1866                bytes: catalog.to_bytes(),
1867            });
1868            if !puts.is_empty() {
1869                self.pager.batch_put(&puts)?;
1870            }
1871            if !frees.is_empty() {
1872                self.pager.free_many(&frees)?;
1873            }
1874            return Ok(());
1875        }
1876
1877        // Rewrite descriptor chains to match the new meta lists.
1878        self.write_descriptor_chain(desc_pk, &mut desc, &new_metas, &mut puts, &mut frees)?;
1879        self.write_descriptor_chain(
1880            desc_pk_rid,
1881            &mut desc_rid,
1882            &new_rid_metas,
1883            &mut puts,
1884            &mut frees,
1885        )?;
1886        // Persist new/updated blobs and free the old ones.
1887        if !puts.is_empty() {
1888            self.pager.batch_put(&puts)?;
1889        }
1890        if !frees.is_empty() {
1891            self.pager.free_many(&frees)?;
1892        }
1893
1894        Ok(())
1895    }
1896
1897    /// (Internal) Helper for batch appends. Appends metadata to the current
1898    /// in-memory tail page, creating a new one if necessary.
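    ///
    /// A new tail page is started once either limit is hit: the page byte budget
    /// (`TARGET_DESCRIPTOR_PAGE_BYTES`) or the per-page entry cap
    /// (`DESCRIPTOR_ENTRIES_PER_PAGE`). Sketch of the capacity test, mirroring
    /// the check in the body:
    ///
    /// ```ignore
    /// let fits = tail_page_bytes.len() + ChunkMetadata::DISK_SIZE <= TARGET_DESCRIPTOR_PAGE_BYTES
    ///     && (header.entry_count as usize) < DESCRIPTOR_ENTRIES_PER_PAGE;
    /// ```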
1899    fn append_meta_in_loop(
1900        &self,
1901        descriptor: &mut ColumnDescriptor,
1902        tail_page_bytes: &mut Vec<u8>,
1903        meta: ChunkMetadata,
1904        puts: &mut Vec<BatchPut>,
1905    ) -> Result<()> {
1906        let mut header = DescriptorPageHeader::from_le_bytes(
1907            &tail_page_bytes[..DescriptorPageHeader::DISK_SIZE],
1908        );
1909        if tail_page_bytes.len() + ChunkMetadata::DISK_SIZE <= TARGET_DESCRIPTOR_PAGE_BYTES
1910            && (header.entry_count as usize) < DESCRIPTOR_ENTRIES_PER_PAGE
1911        {
1912            // Case 1: There's room in the current tail page.
1913            tail_page_bytes.extend_from_slice(&meta.to_le_bytes());
1914            header.entry_count += 1;
1915            tail_page_bytes[..DescriptorPageHeader::DISK_SIZE]
1916                .copy_from_slice(&header.to_le_bytes());
1917        } else {
1918            // Case 2: The tail page is full. Write it out and start a new one.
1919            let new_tail_pk = self.pager.alloc_many(1)?[0];
1920            header.next_page_pk = new_tail_pk;
1921            tail_page_bytes[..DescriptorPageHeader::DISK_SIZE]
1922                .copy_from_slice(&header.to_le_bytes());
1923            // Move the full page's bytes out instead of cloning them.
1925            let full_page_to_write = std::mem::take(tail_page_bytes);
1926            puts.push(BatchPut::Raw {
1927                key: descriptor.tail_page_pk,
1928                bytes: full_page_to_write,
1929            });
1930            // Create the new tail page.
1931            let new_header = DescriptorPageHeader {
1932                next_page_pk: 0,
1933                entry_count: 1,
1934                _padding: [0; 4],
1935            };
1936            let mut new_page_bytes = new_header.to_le_bytes().to_vec();
1937            new_page_bytes.extend_from_slice(&meta.to_le_bytes());
1938            // Update our live state to point to the new tail.
1939            descriptor.tail_page_pk = new_tail_pk;
1940            *tail_page_bytes = new_page_bytes;
1941        }
1942
1943        descriptor.total_row_count += meta.row_count;
1944        descriptor.total_chunk_count += 1;
1945        Ok(())
1946    }
1947
1948    /// Verifies the integrity of the column store's metadata.
1949    ///
1950    /// This check is useful for tests and debugging. It verifies:
1951    /// 1. The catalog can be read.
1952    /// 2. All descriptor chains are walkable.
1953    /// 3. The row and chunk counts in the descriptors match the sum of the
1954    ///    chunk metadata.
1955    ///
1956    /// Returns `Ok(())` if consistent, otherwise returns an `Error`.
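    ///
    /// # Example
    ///
    /// A minimal sketch; `pager` is assumed to be whatever `Pager` implementation
    /// the surrounding tests use:
    ///
    /// ```ignore
    /// let store = ColumnStore::open(Arc::clone(&pager))?;
    /// // After a batch of appends/deletes, assert the descriptors still add up.
    /// store.verify_integrity()?;
    /// ```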
1957    pub fn verify_integrity(&self) -> Result<()> {
1958        let catalog = self.catalog.read().unwrap();
1959        for (&field_id, &descriptor_pk) in &catalog.map {
1960            let desc_blob = self
1961                .pager
1962                .batch_get(&[BatchGet::Raw { key: descriptor_pk }])?
1963                .pop()
1964                .and_then(|r| match r {
1965                    GetResult::Raw { bytes, .. } => Some(bytes),
1966                    _ => None,
1967                })
1968                .ok_or_else(|| {
1969                    Error::Internal(format!(
1970                        "Catalog points to missing descriptor pk={}",
1971                        descriptor_pk
1972                    ))
1973                })?;
1974            let descriptor = ColumnDescriptor::from_le_bytes(desc_blob.as_ref());
1975            if descriptor.field_id != field_id {
1976                return Err(Error::Internal(format!(
1977                    "Descriptor at pk={} has wrong field_id: expected {:?}, \
1978                     got {:?}",
1979                    descriptor_pk, field_id, descriptor.field_id
1980                )));
1981            }
1982
1983            let mut actual_rows = 0;
1984            let mut actual_chunks = 0;
1985            let mut current_page_pk = descriptor.head_page_pk;
1986            while current_page_pk != 0 {
1987                let page_blob = self
1988                    .pager
1989                    .batch_get(&[BatchGet::Raw {
1990                        key: current_page_pk,
1991                    }])?
1992                    .pop()
1993                    .and_then(|r| match r {
1994                        GetResult::Raw { bytes, .. } => Some(bytes),
1995                        _ => None,
1996                    })
1997                    .ok_or_else(|| {
1998                        Error::Internal(format!(
1999                            "Descriptor page chain broken at pk={}",
2000                            current_page_pk
2001                        ))
2002                    })?;
2003                let header = DescriptorPageHeader::from_le_bytes(
2004                    &page_blob.as_ref()[..DescriptorPageHeader::DISK_SIZE],
2005                );
2006                for i in 0..(header.entry_count as usize) {
2007                    let off = DescriptorPageHeader::DISK_SIZE + i * ChunkMetadata::DISK_SIZE;
2008                    let end = off + ChunkMetadata::DISK_SIZE;
2009                    let meta = ChunkMetadata::from_le_bytes(&page_blob.as_ref()[off..end]);
2010                    actual_rows += meta.row_count;
2011                    actual_chunks += 1;
2012                }
2013                current_page_pk = header.next_page_pk;
2014            }
2015
2016            if descriptor.total_row_count != actual_rows {
2017                return Err(Error::Internal(format!(
2018                    "Row count mismatch for field {:?}: descriptor says {}, \
2019                     actual is {}",
2020                    field_id, descriptor.total_row_count, actual_rows
2021                )));
2022            }
2023            if descriptor.total_chunk_count != actual_chunks {
2024                return Err(Error::Internal(format!(
2025                    "Chunk count mismatch for field {:?}: descriptor says {}, \
2026                     actual is {}",
2027                    field_id, descriptor.total_chunk_count, actual_chunks
2028                )));
2029            }
2030        }
2031        Ok(())
2032    }
2033
2034    /// Gathers detailed statistics about the storage layout.
2035    ///
2036    /// This method is designed for low-level analysis and debugging, allowing
2037    /// you to check for under- or over-utilization of descriptor pages.
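    ///
    /// # Example
    ///
    /// A minimal sketch that prints per-column utilization; the fields used here
    /// are those of `ColumnLayoutStats` returned by this method:
    ///
    /// ```ignore
    /// for col in store.get_layout_stats()? {
    ///     println!(
    ///         "field {:?}: {} rows in {} chunks across {} descriptor pages",
    ///         col.field_id, col.total_rows, col.total_chunks, col.pages.len()
    ///     );
    /// }
    /// ```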
2038    pub fn get_layout_stats(&self) -> Result<Vec<ColumnLayoutStats>> {
2039        let catalog = self.catalog.read().unwrap();
2040        let mut all_stats = Vec::new();
2041
2042        for (&field_id, &descriptor_pk) in &catalog.map {
2043            let desc_blob = self
2044                .pager
2045                .batch_get(&[BatchGet::Raw { key: descriptor_pk }])?
2046                .pop()
2047                .and_then(|r| match r {
2048                    GetResult::Raw { bytes, .. } => Some(bytes),
2049                    _ => None,
2050                })
2051                .ok_or(Error::NotFound)?;
2052            let descriptor = ColumnDescriptor::from_le_bytes(desc_blob.as_ref());
2053
2054            let mut page_stats = Vec::new();
2055            let mut current_page_pk = descriptor.head_page_pk;
2056            while current_page_pk != 0 {
2057                let page_blob = self
2058                    .pager
2059                    .batch_get(&[BatchGet::Raw {
2060                        key: current_page_pk,
2061                    }])?
2062                    .pop()
2063                    .and_then(|r| match r {
2064                        GetResult::Raw { bytes, .. } => Some(bytes),
2065                        _ => None,
2066                    })
2067                    .ok_or(Error::NotFound)?;
2068                let header = DescriptorPageHeader::from_le_bytes(
2069                    &page_blob.as_ref()[..DescriptorPageHeader::DISK_SIZE],
2070                );
2071                page_stats.push(DescriptorPageStats {
2072                    page_pk: current_page_pk,
2073                    entry_count: header.entry_count,
2074                    page_size_bytes: page_blob.as_ref().len(),
2075                });
2076                current_page_pk = header.next_page_pk;
2077            }
2078
2079            all_stats.push(ColumnLayoutStats {
2080                field_id,
2081                total_rows: descriptor.total_row_count,
2082                total_chunks: descriptor.total_chunk_count,
2083                pages: page_stats,
2084            });
2085        }
2086        Ok(all_stats)
2087    }
2088}