llkv_column_map/store/
core.rs

use super::*;
use crate::store::catalog::ColumnCatalog;
use crate::store::descriptor::{
    ChunkMetadata, ColumnDescriptor, DescriptorIterator, DescriptorPageHeader,
};
use crate::store::scan::filter::FilterDispatch;
use crate::store::scan::{FilterPrimitive, FilterResult};
use crate::types::{LogicalFieldId, RowId, TableId};
use arrow::array::{Array, ArrayRef, BooleanArray, UInt32Array, UInt64Array};
use arrow::compute::{self, SortColumn, lexsort_to_indices};
use arrow::datatypes::DataType;
use arrow::record_batch::RecordBatch;
use llkv_expr::typed_predicate::Predicate;
use llkv_result::{Error, Result};
use llkv_storage::{
    constants::CATALOG_ROOT_PKEY,
    pager::{BatchGet, BatchPut, GetResult, Pager},
    serialization::{deserialize_array, serialize_array},
    types::PhysicalKey,
};

use rustc_hash::{FxHashMap, FxHashSet};
use simd_r_drive_entry_handle::EntryHandle;
use std::sync::{Arc, RwLock};

/// Columnar storage engine for managing Arrow-based data.
///
/// `ColumnStore` provides the primary interface for persisting and retrieving columnar
/// data using Apache Arrow [`RecordBatch`]es. It manages:
///
/// - Column descriptors and metadata (chunk locations, row counts, min/max values)
/// - Data type caching for efficient schema queries
/// - Index management (presence indexes, value indexes)
/// - Integration with the [`Pager`] for persistent storage
///
/// # Namespaces
///
/// Columns are identified by [`LogicalFieldId`], which combines a namespace, table ID,
/// and field ID. This prevents collisions between user data, row IDs, and MVCC metadata:
///
/// - `UserData`: Regular table columns
/// - `RowIdShadow`: Internal row ID tracking
/// - `TxnCreatedBy`: MVCC transaction creation timestamps
/// - `TxnDeletedBy`: MVCC transaction deletion timestamps
///
/// # Thread Safety
///
/// `ColumnStore` is `Send + Sync` and can be safely shared across threads via `Arc`.
/// Internal state (catalog, caches) uses `RwLock` for concurrent access.
///
/// # Test Harness Integration
///
/// - **SQLite `sqllogictest`**: Every upstream case exercises the column store, providing
///   a compatibility baseline but not full parity with SQLite yet.
/// - **DuckDB suites**: Early dialect-specific tests stress MVCC and typed casts, informing
///   future work rather than proving comprehensive DuckDB coverage.
/// - **Hardening mandate**: Failures uncovered by the suites result in storage fixes, not
///   filtered tests, to preserve confidence in OLAP scenarios built atop this crate.
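///
/// # Example
///
/// A minimal sketch of sharing a store across threads. The in-memory `MemPager`
/// name below is illustrative, not a guaranteed export of this workspace:
///
/// ```ignore
/// use std::sync::Arc;
///
/// // Open the store once and clone the handle per worker thread.
/// let pager = Arc::new(MemPager::default());
/// let store = Arc::new(ColumnStore::open(pager)?);
///
/// let handles: Vec<_> = (0..4)
///     .map(|_| {
///         let store = Arc::clone(&store);
///         std::thread::spawn(move || {
///             // Concurrent readers only take catalog read locks.
///             store.has_field(LogicalFieldId::from(42u64))
///         })
///     })
///     .collect();
/// for h in handles {
///     let _ = h.join();
/// }
/// ```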
pub struct ColumnStore<P: Pager> {
    pub(crate) pager: Arc<P>,
    pub(crate) catalog: Arc<RwLock<ColumnCatalog>>,
    cfg: ColumnStoreConfig,
    dtype_cache: DTypeCache<P>,
    index_manager: IndexManager<P>,
}

impl<P> Clone for ColumnStore<P>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    fn clone(&self) -> Self {
        Self {
            pager: Arc::clone(&self.pager),
            catalog: Arc::clone(&self.catalog),
            cfg: self.cfg.clone(),
            dtype_cache: self.dtype_cache.clone(),
            index_manager: self.index_manager.clone(),
        }
    }
}

impl<P> ColumnStore<P>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    /// Opens or creates a `ColumnStore` using the provided pager.
    ///
    /// Loads the column catalog from the pager's root catalog key, or initializes
    /// an empty catalog if none exists. The catalog maps [`LogicalFieldId`] to the
    /// physical keys of column descriptors.
    ///
    /// # Errors
    ///
    /// Returns an error if the pager fails to load the catalog or if deserialization fails.
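    ///
    /// # Example
    ///
    /// A minimal sketch, assuming an in-memory pager implementation is available
    /// (the `MemPager` name is illustrative):
    ///
    /// ```ignore
    /// use std::sync::Arc;
    ///
    /// let pager = Arc::new(MemPager::default());
    /// // The first call initializes an empty catalog; later opens reload it.
    /// let store = ColumnStore::open(Arc::clone(&pager))?;
    /// ```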
    pub fn open(pager: Arc<P>) -> Result<Self> {
        let cfg = ColumnStoreConfig::default();
        let catalog = match pager
            .batch_get(&[BatchGet::Raw {
                key: CATALOG_ROOT_PKEY,
            }])?
            .pop()
        {
            Some(GetResult::Raw { bytes, .. }) => ColumnCatalog::from_bytes(bytes.as_ref())?,
            _ => ColumnCatalog::default(),
        };
        let arc_catalog = Arc::new(RwLock::new(catalog));

        let index_manager = IndexManager::new(Arc::clone(&pager));

        Ok(Self {
            pager: Arc::clone(&pager),
            catalog: Arc::clone(&arc_catalog),
            cfg,
            dtype_cache: DTypeCache::new(Arc::clone(&pager), Arc::clone(&arc_catalog)),
            index_manager,
        })
    }

    /// Creates and persists an index for a column.
    ///
    /// Builds the specified index type for all existing data in the column and
    /// persists it atomically. The index will be maintained automatically on subsequent
    /// appends and updates.
    ///
    /// # Errors
    ///
    /// Returns an error if the column doesn't exist or if index creation fails.
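    ///
    /// # Example
    ///
    /// A sketch of registering and later dropping a presence index; it assumes
    /// `store` already holds data for `field_id`:
    ///
    /// ```ignore
    /// store.register_index(field_id, IndexKind::Presence)?;
    /// assert!(store.list_persisted_indexes(field_id)?.contains(&IndexKind::Presence));
    /// store.unregister_index(field_id, IndexKind::Presence)?;
    /// ```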
    pub fn register_index(&self, field_id: LogicalFieldId, kind: IndexKind) -> Result<()> {
        self.index_manager.register_index(self, field_id, kind)
    }

    /// Checks if a logical field is registered in the catalog.
    pub fn has_field(&self, field_id: LogicalFieldId) -> bool {
        let catalog = self.catalog.read().unwrap();
        catalog.map.contains_key(&field_id)
    }

    /// Removes a persisted index from a column.
    ///
    /// Atomically removes the index and frees associated storage. The column data
    /// itself is not affected.
    ///
    /// # Errors
    ///
    /// Returns an error if the column or index doesn't exist.
    pub fn unregister_index(&self, field_id: LogicalFieldId, kind: IndexKind) -> Result<()> {
        self.index_manager.unregister_index(self, field_id, kind)
    }

    /// Returns the Arrow data type of a column.
    ///
    /// Returns the data type from cache if available, otherwise loads it from
    /// the column descriptor and caches it for future queries.
    ///
    /// # Errors
    ///
    /// Returns an error if the column doesn't exist or if the descriptor is corrupted.
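    ///
    /// # Example
    ///
    /// A short sketch; it assumes the column was appended with `Int64` data:
    ///
    /// ```ignore
    /// use arrow::datatypes::DataType;
    ///
    /// let dt = store.data_type(field_id)?;
    /// assert_eq!(dt, DataType::Int64);
    /// ```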
    pub fn data_type(&self, field_id: LogicalFieldId) -> Result<DataType> {
        if let Some(dt) = self.dtype_cache.cached_data_type(field_id) {
            return Ok(dt);
        }
        self.dtype_cache.dtype_for_field(field_id)
    }

    /// Updates the data type of an existing column.
    ///
    /// This updates the descriptor's data type fingerprint and cache. Note that this
    /// does NOT migrate existing data; the caller must ensure data compatibility.
    ///
    /// # Errors
    ///
    /// Returns an error if the column doesn't exist or if the descriptor update fails.
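    ///
    /// # Example
    ///
    /// A sketch of a cast-style schema change; the caller remains responsible for
    /// rewriting any existing chunks to match the new type:
    ///
    /// ```ignore
    /// use arrow::datatypes::DataType;
    ///
    /// // Only the descriptor fingerprint and the cache are updated here.
    /// store.update_data_type(field_id, &DataType::Int64)?;
    /// ```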
    pub fn update_data_type(
        &self,
        field_id: LogicalFieldId,
        new_data_type: &DataType,
    ) -> Result<()> {
        // Get the descriptor physical key from catalog
        let descriptor_pk = {
            let catalog = self.catalog.read().unwrap();
            *catalog.map.get(&field_id).ok_or_else(|| Error::NotFound)?
        };

        // Load the existing descriptor
        let mut descriptor = match self
            .pager
            .batch_get(&[BatchGet::Raw { key: descriptor_pk }])?
            .pop()
        {
            Some(GetResult::Raw { bytes, .. }) => ColumnDescriptor::from_le_bytes(bytes.as_ref()),
            _ => return Err(Error::NotFound),
        };

        // Update the data type fingerprint
        let new_fingerprint = DTypeCache::<P>::dtype_fingerprint(new_data_type);
        if new_fingerprint != 0 {
            DTypeCache::<P>::set_desc_dtype_fingerprint(&mut descriptor, new_fingerprint);
        }

        // Persist the updated descriptor
        self.pager.batch_put(&[BatchPut::Raw {
            key: descriptor_pk,
            bytes: descriptor.to_le_bytes(),
        }])?;

        // Update the cache
        self.dtype_cache.insert(field_id, new_data_type.clone());

        Ok(())
    }

    /// Ensures that catalog entries and descriptors exist for a logical column.
    ///
    /// Primarily used when creating empty tables so that subsequent
    /// operations (like `CREATE INDEX`) can resolve column metadata before any
    /// data has been appended.
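    ///
    /// # Example
    ///
    /// A sketch of preparing an empty table so `CREATE INDEX` can resolve the
    /// column before any rows exist:
    ///
    /// ```ignore
    /// use arrow::datatypes::DataType;
    ///
    /// store.ensure_column_registered(field_id, &DataType::Utf8)?;
    /// store.register_index(field_id, IndexKind::Presence)?;
    /// ```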
    pub fn ensure_column_registered(
        &self,
        field_id: LogicalFieldId,
        data_type: &DataType,
    ) -> Result<()> {
        let rid_field_id = rowid_fid(field_id);

        let mut catalog_dirty = false;
        let descriptor_pk;
        let rid_descriptor_pk;

        {
            let mut catalog = self.catalog.write().unwrap();
            descriptor_pk = if let Some(&pk) = catalog.map.get(&field_id) {
                pk
            } else {
                let pk = self.pager.alloc_many(1)?[0];
                catalog.map.insert(field_id, pk);
                catalog_dirty = true;
                pk
            };

            rid_descriptor_pk = if let Some(&pk) = catalog.map.get(&rid_field_id) {
                pk
            } else {
                let pk = self.pager.alloc_many(1)?[0];
                catalog.map.insert(rid_field_id, pk);
                catalog_dirty = true;
                pk
            };
        }

        let mut puts: Vec<BatchPut> = Vec::new();

        let data_descriptor_missing = self
            .pager
            .batch_get(&[BatchGet::Raw { key: descriptor_pk }])?
            .pop()
            .and_then(|r| match r {
                GetResult::Raw { bytes, .. } => Some(bytes),
                _ => None,
            })
            .is_none();

        if data_descriptor_missing {
            let (mut descriptor, tail_page) =
                ColumnDescriptor::load_or_create(Arc::clone(&self.pager), descriptor_pk, field_id)?;
            let fingerprint = DTypeCache::<P>::dtype_fingerprint(data_type);
            if fingerprint != 0 {
                DTypeCache::<P>::set_desc_dtype_fingerprint(&mut descriptor, fingerprint);
            }
            puts.push(BatchPut::Raw {
                key: descriptor.tail_page_pk,
                bytes: tail_page,
            });
            puts.push(BatchPut::Raw {
                key: descriptor_pk,
                bytes: descriptor.to_le_bytes(),
            });
        }

        let rid_descriptor_missing = self
            .pager
            .batch_get(&[BatchGet::Raw {
                key: rid_descriptor_pk,
            }])?
            .pop()
            .and_then(|r| match r {
                GetResult::Raw { bytes, .. } => Some(bytes),
                _ => None,
            })
            .is_none();

        if rid_descriptor_missing {
            let (mut rid_descriptor, tail_page) = ColumnDescriptor::load_or_create(
                Arc::clone(&self.pager),
                rid_descriptor_pk,
                rid_field_id,
            )?;
            let fingerprint = DTypeCache::<P>::dtype_fingerprint(&DataType::UInt64);
            if fingerprint != 0 {
                DTypeCache::<P>::set_desc_dtype_fingerprint(&mut rid_descriptor, fingerprint);
            }
            puts.push(BatchPut::Raw {
                key: rid_descriptor.tail_page_pk,
                bytes: tail_page,
            });
            puts.push(BatchPut::Raw {
                key: rid_descriptor_pk,
                bytes: rid_descriptor.to_le_bytes(),
            });
        }

        self.dtype_cache.insert(field_id, data_type.clone());

        if catalog_dirty {
            let catalog_bytes = {
                let catalog = self.catalog.read().unwrap();
                catalog.to_bytes()
            };
            puts.push(BatchPut::Raw {
                key: CATALOG_ROOT_PKEY,
                bytes: catalog_bytes,
            });
        }

        if !puts.is_empty() {
            self.pager.batch_put(&puts)?;
        }

        Ok(())
    }

    /// Find all row IDs where a column satisfies a predicate.
    ///
    /// This evaluates the predicate against the column's data and returns a vector
    /// of matching row IDs. Uses indexes and chunk metadata (min/max values) to
    /// skip irrelevant data when possible.
    ///
    /// # Errors
    ///
    /// Returns an error if the column doesn't exist or if chunk data is corrupted.
    pub fn filter_row_ids<T>(
        &self,
        field_id: LogicalFieldId,
        predicate: &Predicate<T::Value>,
    ) -> Result<Vec<u64>>
    where
        T: FilterDispatch,
    {
        tracing::trace!(field=?field_id, "filter_row_ids start");
        let res = T::run_filter(self, field_id, predicate);
        if let Err(ref err) = res {
            tracing::trace!(field=?field_id, error=?err, "filter_row_ids error");
        } else {
            tracing::trace!(field=?field_id, "filter_row_ids ok");
        }
        res
    }

    /// Evaluate a predicate against a column and return match metadata.
    ///
    /// This variant drives a primitive predicate over the column and returns a
    /// [`FilterResult`] describing contiguous match regions. Callers can use the
    /// result to build paginated scans or gather row identifiers.
    ///
    /// # Arguments
    /// - `field_id`: Logical column to filter.
    /// - `predicate`: Callable invoked on each value; should be cheap and free of
    ///   side effects.
    ///
    /// # Errors
    /// Returns an error if the column metadata cannot be loaded or if decoding a
    /// chunk fails.
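    ///
    /// # Example
    ///
    /// A sketch, assuming `FilterPrimitive` is implemented for the Arrow primitive
    /// type of the column (`UInt64Type` below is an assumption, not a confirmed
    /// implementor):
    ///
    /// ```ignore
    /// use arrow::datatypes::UInt64Type;
    ///
    /// // Collect match metadata for all values greater than 10.
    /// let result = store.filter_matches::<UInt64Type, _>(field_id, |v| v > 10)?;
    /// ```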
    pub fn filter_matches<T, F>(
        &self,
        field_id: LogicalFieldId,
        predicate: F,
    ) -> Result<FilterResult>
    where
        T: FilterPrimitive,
        F: FnMut(T::Native) -> bool,
    {
        T::run_filter_with_result(self, field_id, predicate)
    }

    /// List all indexes registered for a column.
    ///
    /// Returns the types of indexes (e.g., presence, value) that are currently
    /// persisted for the specified column.
    ///
    /// # Errors
    ///
    /// Returns an error if the column doesn't exist or if the descriptor is corrupted.
    pub fn list_persisted_indexes(&self, field_id: LogicalFieldId) -> Result<Vec<IndexKind>> {
        let catalog = self.catalog.read().unwrap();
        let descriptor_pk = *catalog.map.get(&field_id).ok_or(Error::NotFound)?;

        let desc_blob = self
            .pager
            .batch_get(&[BatchGet::Raw { key: descriptor_pk }])?
            .pop()
            .and_then(|r| match r {
                GetResult::Raw { bytes, .. } => Some(bytes),
                _ => None,
            })
            .ok_or(Error::NotFound)?;
        let descriptor = ColumnDescriptor::from_le_bytes(desc_blob.as_ref());

        let kinds = descriptor.get_indexes()?;
        Ok(kinds)
    }

    /// Get the total number of rows in a column.
    ///
    /// Returns the persisted row count from the column's descriptor. This value is
    /// updated by append and delete operations.
    ///
    /// # Errors
    ///
    /// Returns an error if the column doesn't exist or if the descriptor is corrupted.
    pub fn total_rows_for_field(&self, field_id: LogicalFieldId) -> Result<u64> {
        let catalog = self.catalog.read().unwrap();
        let desc_pk = *catalog.map.get(&field_id).ok_or(Error::NotFound)?;
        drop(catalog);

        let desc_blob = self
            .pager
            .batch_get(&[BatchGet::Raw { key: desc_pk }])?
            .pop()
            .and_then(|r| match r {
                GetResult::Raw { bytes, .. } => Some(bytes),
                _ => None,
            })
            .ok_or(Error::NotFound)?;

        let desc = ColumnDescriptor::from_le_bytes(desc_blob.as_ref());
        Ok(desc.total_row_count)
    }

    /// Get the total number of rows in a table.
    ///
    /// This returns the maximum row count across all user-data columns in the table.
    /// If the table has no persisted columns, returns `0`.
    ///
    /// # Errors
    ///
    /// Returns an error if column descriptors cannot be loaded.
    pub fn total_rows_for_table(&self, table_id: crate::types::TableId) -> Result<u64> {
        use crate::types::Namespace;
        // Acquire read lock on catalog and find any matching user-data field
        let catalog = self.catalog.read().unwrap();
        // Collect all user-data logical field ids for this table.
        let candidates: Vec<LogicalFieldId> = catalog
            .map
            .keys()
            .filter(|fid| fid.namespace() == Namespace::UserData && fid.table_id() == table_id)
            .copied()
            .collect();
        drop(catalog);

        if candidates.is_empty() {
            return Ok(0);
        }

        // Return the maximum total_row_count across all user columns for the table.
        let mut max_rows: u64 = 0;
        for field in candidates {
            let rows = self.total_rows_for_field(field)?;
            if rows > max_rows {
                max_rows = rows;
            }
        }
        Ok(max_rows)
    }

    /// Get all user-data column IDs for a table.
    ///
    /// This returns the [`LogicalFieldId`]s of all persisted user columns (namespace
    /// `UserData`) belonging to the specified table. MVCC and row ID columns are not
    /// included.
    pub fn user_field_ids_for_table(&self, table_id: crate::types::TableId) -> Vec<LogicalFieldId> {
        use crate::types::Namespace;

        let catalog = self.catalog.read().unwrap();
        catalog
            .map
            .keys()
            .filter(|fid| fid.namespace() == Namespace::UserData && fid.table_id() == table_id)
            .copied()
            .collect()
    }

    /// Check whether a specific row ID exists in a column.
    ///
    /// This uses presence indexes and binary search when available for fast lookups.
    /// If no presence index exists, it scans chunks and uses min/max metadata to
    /// prune irrelevant data.
    ///
    /// # Errors
    ///
    /// Returns an error if the column doesn't exist or if chunk data is corrupted.
    pub fn has_row_id(&self, field_id: LogicalFieldId, row_id: RowId) -> Result<bool> {
        let rid_fid = rowid_fid(field_id);
        let catalog = self.catalog.read().unwrap();
        let rid_desc_pk = *catalog.map.get(&rid_fid).ok_or(Error::NotFound)?;
        let rid_desc_blob = self
            .pager
            .batch_get(&[BatchGet::Raw { key: rid_desc_pk }])?
            .pop()
            .and_then(|r| match r {
                GetResult::Raw { bytes, .. } => Some(bytes),
                _ => None,
            })
            .ok_or(Error::NotFound)?;
        let rid_desc = ColumnDescriptor::from_le_bytes(rid_desc_blob.as_ref());
        drop(catalog);

        // Walk metas; prune by min/max when available.
        for m in DescriptorIterator::new(self.pager.as_ref(), rid_desc.head_page_pk) {
            let meta = m?;
            if meta.row_count == 0 {
                continue;
            }
            // Only prune when min/max metadata is actually populated.
            if (meta.min_val_u64 != 0 || meta.max_val_u64 != 0)
                && (row_id < meta.min_val_u64 || row_id > meta.max_val_u64)
            {
                continue;
            }
            // Fetch rid chunk and, if present, the presence perm
            let mut gets = vec![BatchGet::Raw { key: meta.chunk_pk }];
            if meta.value_order_perm_pk != 0 {
                gets.push(BatchGet::Raw {
                    key: meta.value_order_perm_pk,
                });
            }
            let results = self.pager.batch_get(&gets)?;
            let mut rid_blob: Option<EntryHandle> = None;
            let mut perm_blob: Option<EntryHandle> = None;
            for r in results {
                if let GetResult::Raw { key, bytes } = r {
                    if key == meta.chunk_pk {
                        rid_blob = Some(bytes);
                    } else if key == meta.value_order_perm_pk {
                        perm_blob = Some(bytes);
                    }
                }
            }
            // If the rid blob for this chunk is missing, treat as absent and continue
            let Some(rid_blob) = rid_blob else { continue };
            let rid_any = deserialize_array(rid_blob)?;
            let rids = rid_any
                .as_any()
                .downcast_ref::<UInt64Array>()
                .ok_or_else(|| Error::Internal("rid downcast".into()))?;
            if let Some(pblob) = perm_blob {
                let perm_any = deserialize_array(pblob)?;
                let perm = perm_any
                    .as_any()
                    .downcast_ref::<UInt32Array>()
                    .ok_or_else(|| Error::Internal("perm not u32".into()))?;
                // Binary search over sorted-by-perm view
                let mut lo: isize = 0;
                let mut hi: isize = (perm.len() as isize) - 1;
                while lo <= hi {
                    let mid = ((lo + hi) >> 1) as usize;
                    let rid = rids.value(perm.value(mid) as usize);
                    if rid == row_id {
                        return Ok(true);
                    } else if rid < row_id {
                        lo = mid as isize + 1;
                    } else {
                        hi = mid as isize - 1;
                    }
                }
            } else {
                // Assume rid chunk is sorted ascending (common for appends/compaction) and binary search
                let mut lo: isize = 0;
                let mut hi: isize = (rids.len() as isize) - 1;
                while lo <= hi {
                    let mid = ((lo + hi) >> 1) as usize;
                    let rid = rids.value(mid);
                    if rid == row_id {
                        return Ok(true);
                    } else if rid < row_id {
                        lo = mid as isize + 1;
                    } else {
                        hi = mid as isize - 1;
                    }
                }
            }
        }
        Ok(false)
    }

    // NOTE: Row IDs must be provided explicitly today. Consider introducing an
    // opt-in auto-increment mode once table-level schema metadata can enforce it.
    /// Append a [`RecordBatch`] to the store.
    ///
    /// The batch must include a `rowid` column (type `UInt64`) that uniquely identifies
    /// each row. Each other column must have `field_id` metadata mapping it to a
    /// [`LogicalFieldId`].
    ///
    /// # Last-Write-Wins Updates
    ///
    /// If any row IDs in the batch already exist, they are updated in-place (overwritten)
    /// rather than creating duplicates. This happens in a separate transaction before
    /// appending new rows.
    ///
    /// # Row ID Ordering
    ///
    /// The batch is automatically sorted by `rowid` if not already sorted. This ensures
    /// efficient metadata updates and naturally sorted shadow columns.
    ///
    /// # Table Separation
    ///
    /// Each batch should contain columns from only one table. To append to multiple
    /// tables, call `append` separately for each table's batch; these calls may
    /// run concurrently.
    ///
    /// # Errors
    ///
    /// Returns an error if:
    /// - The batch is missing the `rowid` column
    /// - Column metadata is missing or invalid
    /// - Storage operations fail
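    ///
    /// # Example
    ///
    /// A sketch of building a one-column batch. The `"rowid"` column name and the
    /// `FIELD_ID_META_KEY` metadata key follow the conventions described above;
    /// the field id value `42` is arbitrary and the import path for the metadata
    /// key constant is assumed:
    ///
    /// ```ignore
    /// use std::collections::HashMap;
    /// use std::sync::Arc;
    /// use arrow::array::{ArrayRef, Int64Array, UInt64Array};
    /// use arrow::datatypes::{DataType, Field, Schema};
    /// use arrow::record_batch::RecordBatch;
    ///
    /// // Tag the data column with its logical field id.
    /// let data_field = Field::new("score", DataType::Int64, true).with_metadata(
    ///     HashMap::from([(FIELD_ID_META_KEY.to_string(), "42".to_string())]),
    /// );
    /// let schema = Arc::new(Schema::new(vec![
    ///     Field::new("rowid", DataType::UInt64, false),
    ///     data_field,
    /// ]));
    /// let batch = RecordBatch::try_new(
    ///     schema,
    ///     vec![
    ///         Arc::new(UInt64Array::from(vec![1u64, 2, 3])) as ArrayRef,
    ///         Arc::new(Int64Array::from(vec![10i64, 20, 30])),
    ///     ],
    /// )?;
    /// store.append(&batch)?;
    /// ```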
    #[allow(unused_variables, unused_assignments)] // NOTE: Preserve variable hooks used during feature gating of presence indexes.
    pub fn append(&self, batch: &RecordBatch) -> Result<()> {
        tracing::trace!(
            num_columns = batch.num_columns(),
            num_rows = batch.num_rows(),
            "ColumnStore::append BEGIN"
        );
        // --- PHASE 1: PRE-PROCESSING THE INCOMING BATCH ---
        // The `append` logic relies on row IDs being processed in ascending order to handle
        // metadata updates efficiently and to ensure the shadow row_id chunks are naturally sorted.
        // This block checks if the incoming batch is already sorted by `row_id`. If not, it creates a
        // new, sorted `RecordBatch` to work with for the rest of the function.
        let working_batch: RecordBatch;
        let batch_ref = {
            let schema = batch.schema();
            let row_id_idx = schema
                .index_of(ROW_ID_COLUMN_NAME)
                .map_err(|_| Error::Internal("row_id column required".into()))?;
            let row_id_any = batch.column(row_id_idx).clone();
            let row_id_arr = row_id_any
                .as_any()
                .downcast_ref::<UInt64Array>()
                .ok_or_else(|| Error::Internal("row_id downcast failed".into()))?;

            // Manually check if the row_id column is sorted.
            let mut is_sorted = true;
            if !row_id_arr.is_empty() {
                let mut last = row_id_arr.value(0);
                for i in 1..row_id_arr.len() {
                    let current = row_id_arr.value(i);
                    if current < last {
                        is_sorted = false;
                        break;
                    }
                    last = current;
                }
            }

            // If sorted, we can use the original batch directly.
            // Otherwise, we compute a permutation and reorder all columns.
            if is_sorted {
                batch
            } else {
                let sort_col = SortColumn {
                    values: row_id_any,
                    options: None,
                };
                let idx = lexsort_to_indices(&[sort_col], None)?;
                let perm = idx
                    .as_any()
                    .downcast_ref::<UInt32Array>()
                    .ok_or_else(|| Error::Internal("perm not u32".into()))?;
                let mut cols: Vec<ArrayRef> = Vec::with_capacity(batch.num_columns());
                for i in 0..batch.num_columns() {
                    cols.push(compute::take(batch.column(i), perm, None)?);
                }
                working_batch = RecordBatch::try_new(schema.clone(), cols)
                    .map_err(|e| Error::Internal(format!("record batch rebuild: {e}")))?;
                &working_batch
            }
        };

        tracing::trace!("ColumnStore::append PHASE 1 complete - batch preprocessed");

        // --- PHASE 2: LAST-WRITER-WINS (LWW) REWRITE ---
        // This phase handles updates. It identifies any rows in the incoming batch that
        // already exist in the store and rewrites them in-place. This is a separate
        // transaction that happens before the main append of new rows.
        let schema = batch_ref.schema();
        let row_id_idx = schema
            .index_of(ROW_ID_COLUMN_NAME)
            .map_err(|_| Error::Internal("row_id column required".into()))?;

        // Create a quick lookup map of incoming row IDs to their positions in the batch.
        let row_id_arr = batch_ref
            .column(row_id_idx)
            .as_any()
            .downcast_ref::<UInt64Array>()
            .ok_or_else(|| Error::Internal("row_id downcast failed".into()))?;
        let mut incoming_ids_map = FxHashMap::default();
        incoming_ids_map.reserve(row_id_arr.len());
        for i in 0..row_id_arr.len() {
            incoming_ids_map.insert(row_id_arr.value(i), i);
        }

        // These variables will track the state of the LWW transaction.
        let mut catalog_dirty = false;
        let mut puts_rewrites: Vec<BatchPut> = Vec::new();
        let mut all_rewritten_ids = FxHashSet::default();

        // Iterate through each column in the batch (except row_id) to perform rewrites.
        let mut catalog_lock = self.catalog.write().unwrap();
        for i in 0..batch_ref.num_columns() {
            if i == row_id_idx {
                continue;
            }
            let field = schema.field(i);
            if let Some(field_id_str) = field.metadata().get(crate::store::FIELD_ID_META_KEY) {
                let field_id = field_id_str
                    .parse::<u64>()
                    .map(LogicalFieldId::from)
                    .map_err(|e| Error::Internal(format!("Invalid field_id: {}", e)))?;

                // `lww_rewrite_for_field` finds overlapping row IDs and rewrites the data chunks.
                // It returns the set of row IDs that were updated.
                let rewritten = self.lww_rewrite_for_field(
                    &mut catalog_lock,
                    field_id,
                    &incoming_ids_map,
                    batch_ref.column(i),
                    batch_ref.column(row_id_idx),
                    &mut puts_rewrites,
                )?;
                all_rewritten_ids.extend(rewritten);
            }
        }
        drop(catalog_lock);

        // Commit the LWW changes to the pager immediately.
        if !puts_rewrites.is_empty() {
            self.pager.batch_put(&puts_rewrites)?;
        }

        tracing::trace!("ColumnStore::append PHASE 2 complete - LWW rewrites done");

        // --- PHASE 3: FILTERING FOR NEW ROWS ---
        // After handling updates, we filter the incoming batch to remove the rows that were
        // just rewritten. The remaining rows are guaranteed to be new additions to the store.
        let batch_to_append = if !all_rewritten_ids.is_empty() {
            let keep_mask: Vec<bool> = (0..row_id_arr.len())
                .map(|i| !all_rewritten_ids.contains(&row_id_arr.value(i)))
                .collect();
            let keep_array = BooleanArray::from(keep_mask);
            compute::filter_record_batch(batch_ref, &keep_array)?
        } else {
            batch_ref.clone()
        };

        // If no new rows are left, we are done.
        if batch_to_append.num_rows() == 0 {
            tracing::trace!("ColumnStore::append early exit - no new rows to append");
            return Ok(());
        }

        tracing::trace!("ColumnStore::append PHASE 3 complete - filtered for new rows");

        // --- PHASE 4: APPENDING NEW DATA ---
        // This is the main append transaction. All writes generated in this phase will be
        // collected and committed atomically at the very end.
        let append_schema = batch_to_append.schema();
        let append_row_id_idx = append_schema.index_of(ROW_ID_COLUMN_NAME)?;
        let append_row_id_any: ArrayRef = Arc::clone(batch_to_append.column(append_row_id_idx));
        let mut puts_appends: Vec<BatchPut> = Vec::new();

        // Loop through each column of the filtered batch to append its data.
        for (i, array) in batch_to_append.columns().iter().enumerate() {
            if i == append_row_id_idx {
                continue;
            }

            let field = append_schema.field(i);

            let field_id = field
                .metadata()
                .get(crate::store::FIELD_ID_META_KEY)
                .ok_or_else(|| Error::Internal("Missing field_id".into()))?
                .parse::<u64>()
                .map(LogicalFieldId::from)
                .map_err(|e| Error::Internal(format!("Invalid field_id: {}", e)))?;

            // Populate the data type cache for this column. This is a performance optimization
            // to avoid reading a chunk from storage later just to determine its type.
            self.dtype_cache.insert(field_id, field.data_type().clone());

            // Null values are treated as deletions, so we filter them out. The `rids_clean`
            // array contains the row IDs corresponding to the non-null values.
            let (array_clean, rids_clean) = if array.null_count() == 0 {
                (array.clone(), append_row_id_any.clone())
            } else {
                let keep =
                    BooleanArray::from_iter((0..array.len()).map(|j| Some(!array.is_null(j))));
                let a = compute::filter(array, &keep)?;
                let r = compute::filter(&append_row_id_any, &keep)?;
                (a, r)
            };

            if array_clean.is_empty() {
                continue;
            }

            // Get or create the physical keys for the column's data descriptor and its
            // shadow row_id descriptor from the catalog.
            let (descriptor_pk, rid_descriptor_pk, rid_fid) = {
                let mut catalog = self.catalog.write().unwrap();
                let pk1 = *catalog.map.entry(field_id).or_insert_with(|| {
                    catalog_dirty = true;
                    self.pager.alloc_many(1).unwrap()[0]
                });
                let r_fid = rowid_fid(field_id);
                let pk2 = *catalog.map.entry(r_fid).or_insert_with(|| {
                    catalog_dirty = true;
                    self.pager.alloc_many(1).unwrap()[0]
                });
                (pk1, pk2, r_fid)
            };

            // Load the descriptors and their tail metadata pages into memory.
            // If they don't exist, `load_or_create` will initialize new ones.
            let (mut data_descriptor, mut data_tail_page) =
                ColumnDescriptor::load_or_create(Arc::clone(&self.pager), descriptor_pk, field_id)
                    .map_err(|e| {
                        tracing::error!(
                            ?field_id,
                            descriptor_pk,
                            error = ?e,
                            "append: load_or_create failed for data descriptor"
                        );
                        e
                    })?;
            let (mut rid_descriptor, mut rid_tail_page) = ColumnDescriptor::load_or_create(
                Arc::clone(&self.pager),
                rid_descriptor_pk,
                rid_fid,
            )
            .map_err(|e| {
                tracing::error!(
                    ?rid_fid,
                    rid_descriptor_pk,
                    error = ?e,
                    "append: load_or_create failed for rid descriptor"
                );
                e
            })?;

            // Logically register the Presence index on the main data descriptor. This ensures
            // that even if no physical index chunks are created (because data arrived sorted),
            // the system knows a Presence index is conceptually active for this column.
            self.index_manager
                .stage_index_registration(&mut data_descriptor, IndexKind::Presence)?;

            // Split the data to be appended into chunks of a target size.
            let slices = split_to_target_bytes(
                &array_clean,
                TARGET_CHUNK_BYTES,
                self.cfg.varwidth_fallback_rows_per_slice,
            );
            let mut row_off = 0usize;

            // Loop through each new slice to create and stage its chunks.
            for s in slices {
                let rows = s.len();
                // Create and stage the data chunk.
                let data_pk = self.pager.alloc_many(1)?[0];
                let s_norm = zero_offset(&s);
                let data_bytes = serialize_array(s_norm.as_ref())?;
                puts_appends.push(BatchPut::Raw {
                    key: data_pk,
                    bytes: data_bytes,
                });

                // Create and stage the corresponding row_id chunk.
                let rid_slice: ArrayRef = rids_clean.slice(row_off, rows);
                let rid_norm = zero_offset(&rid_slice);
                let rid_pk = self.pager.alloc_many(1)?[0];
                let rid_bytes = serialize_array(rid_norm.as_ref())?;
                puts_appends.push(BatchPut::Raw {
                    key: rid_pk,
                    bytes: rid_bytes,
                });

                // Compute min/max for the row_id chunk to enable pruning during scans.
                let rids_for_meta = rid_norm.as_any().downcast_ref::<UInt64Array>().unwrap();
                let (min, max) = if !rids_for_meta.is_empty() {
                    let mut min_val = rids_for_meta.value(0);
                    let mut max_val = rids_for_meta.value(0);
                    for i in 1..rids_for_meta.len() {
                        let v = rids_for_meta.value(i);
                        if v < min_val {
                            min_val = v;
                        }
                        if v > max_val {
                            max_val = v;
                        }
                    }
                    (min_val, max_val)
                } else {
                    (0, 0)
                };

                // Create the initial metadata for both chunks.
                // The `value_order_perm_pk` is initialized to 0 (None).
                let mut data_meta = ChunkMetadata {
                    chunk_pk: data_pk,
                    row_count: rows as u64,
                    serialized_bytes: s_norm.get_array_memory_size() as u64,
                    max_val_u64: u64::MAX,
                    ..Default::default()
                };
                let mut rid_meta = ChunkMetadata {
                    chunk_pk: rid_pk,
                    row_count: rows as u64,
                    serialized_bytes: rid_norm.get_array_memory_size() as u64,
                    min_val_u64: min,
                    max_val_u64: max,
                    ..Default::default()
                };

                // **GENERIC INDEX UPDATE DISPATCH**
                // This is the single, index-agnostic call. The IndexManager will look up all
                // active indexes for this column (e.g., Presence, Sort) and call their respective
                // `stage_update_for_new_chunk` methods. This is where the physical index data
                // (like permutation blobs) is created and staged.
                self.index_manager.stage_updates_for_new_chunk(
                    field_id,
                    &data_descriptor,
                    &s_norm,
                    &rid_norm,
                    &mut data_meta,
                    &mut rid_meta,
                    &mut puts_appends,
                )?;

                // Append the (potentially modified) metadata to their respective descriptor chains.
                self.append_meta_in_loop(
                    &mut data_descriptor,
                    &mut data_tail_page,
                    data_meta,
                    &mut puts_appends,
                )?;
                self.append_meta_in_loop(
                    &mut rid_descriptor,
                    &mut rid_tail_page,
                    rid_meta,
                    &mut puts_appends,
                )?;
                row_off += rows;
            }

            // After processing all slices, stage the final writes for the updated tail pages
            // and the root descriptor objects themselves.
            puts_appends.push(BatchPut::Raw {
                key: data_descriptor.tail_page_pk,
                bytes: data_tail_page,
            });
            puts_appends.push(BatchPut::Raw {
                key: descriptor_pk,
                bytes: data_descriptor.to_le_bytes(),
            });
            puts_appends.push(BatchPut::Raw {
                key: rid_descriptor.tail_page_pk,
                bytes: rid_tail_page,
            });
            puts_appends.push(BatchPut::Raw {
                key: rid_descriptor_pk,
                bytes: rid_descriptor.to_le_bytes(),
            });
        }

        // --- PHASE 5: FINAL ATOMIC COMMIT ---
        // If the catalog was modified (e.g., new columns were created), stage its write.
        if catalog_dirty {
            let catalog = self.catalog.read().unwrap();
            puts_appends.push(BatchPut::Raw {
                key: CATALOG_ROOT_PKEY,
                bytes: catalog.to_bytes(),
            });
        }

        // Commit all staged puts (new data chunks, new row_id chunks, new index permutations,
        // updated descriptor pages, updated root descriptors, and the updated catalog)
        // in a single atomic operation.
        if !puts_appends.is_empty() {
            self.pager.batch_put(&puts_appends)?;
        }
        tracing::trace!("ColumnStore::append END - success");
        Ok(())
    }

    fn lww_rewrite_for_field(
        &self,
        catalog: &mut ColumnCatalog,
        field_id: LogicalFieldId,
        incoming_ids_map: &FxHashMap<u64, usize>,
        incoming_data: &ArrayRef,
        incoming_row_ids: &ArrayRef,
        puts: &mut Vec<BatchPut>,
    ) -> Result<FxHashSet<u64>> {
        use crate::store::descriptor::DescriptorIterator;
        use crate::store::ingest::ChunkEdit;

        // Fast exit if nothing to rewrite.
        if incoming_ids_map.is_empty() {
            return Ok(FxHashSet::default());
        }
        let incoming_ids: FxHashSet<u64> = incoming_ids_map.keys().copied().collect();

        // Resolve descriptors for data and row_id columns.
        let desc_pk_data = match catalog.map.get(&field_id) {
            Some(pk) => *pk,
            None => return Ok(FxHashSet::default()),
        };
        let rid_fid = rowid_fid(field_id);
        let desc_pk_rid = match catalog.map.get(&rid_fid) {
            Some(pk) => *pk,
            None => return Ok(FxHashSet::default()),
        };

        // Batch fetch both descriptors.
        let gets = vec![
            BatchGet::Raw { key: desc_pk_data },
            BatchGet::Raw { key: desc_pk_rid },
        ];
        let results = self.pager.batch_get(&gets)?;
        let mut blobs_by_pk = FxHashMap::default();
        for r in results {
            if let GetResult::Raw { key, bytes } = r {
                blobs_by_pk.insert(key, bytes);
            }
        }

        let desc_blob_data = blobs_by_pk.remove(&desc_pk_data).ok_or_else(|| {
            tracing::error!(
                ?field_id,
                desc_pk_data,
                "lww_rewrite: data descriptor blob not found in pager"
            );
            Error::NotFound
        })?;
        let mut descriptor_data = ColumnDescriptor::from_le_bytes(desc_blob_data.as_ref());

        let desc_blob_rid = blobs_by_pk.remove(&desc_pk_rid).ok_or_else(|| {
            tracing::error!(
                ?rid_fid,
                desc_pk_rid,
                "lww_rewrite: rid descriptor blob not found in pager"
            );
            Error::NotFound
        })?;
        let mut descriptor_rid = ColumnDescriptor::from_le_bytes(desc_blob_rid.as_ref());

        tracing::trace!(?field_id, "lww_rewrite: descriptors loaded successfully");

        // Collect chunk metadata.
        let mut metas_data: Vec<ChunkMetadata> = Vec::new();
        let mut metas_rid: Vec<ChunkMetadata> = Vec::new();
        for m in DescriptorIterator::new(self.pager.as_ref(), descriptor_data.head_page_pk) {
            metas_data.push(m.map_err(|e| {
                tracing::error!(?field_id, error = ?e, "lww_rewrite: failed to iterate data descriptor");
                e
            })?);
        }
        for m in DescriptorIterator::new(self.pager.as_ref(), descriptor_rid.head_page_pk) {
            metas_rid.push(m.map_err(|e| {
                tracing::error!(?rid_fid, error = ?e, "lww_rewrite: failed to iterate rid descriptor");
                e
            })?);
        }

        tracing::trace!(
            ?field_id,
            data_chunks = metas_data.len(),
            rid_chunks = metas_rid.len(),
            "lww_rewrite: chunk metadata collected"
        );

        // Classify incoming rows: delete vs upsert.
        let rid_in = incoming_row_ids
            .as_any()
            .downcast_ref::<UInt64Array>()
            .ok_or_else(|| Error::Internal("row_id must be u64".into()))?;
        let mut ids_to_delete = FxHashSet::default();
        let mut ids_to_upsert = FxHashSet::default();
        for i in 0..rid_in.len() {
            let rid = rid_in.value(i);
            if incoming_data.is_null(i) {
                ids_to_delete.insert(rid);
            } else {
                ids_to_upsert.insert(rid);
            }
        }

        // Scan row_id chunks to find hits, bucketed by chunk index.
        let mut rewritten_ids = FxHashSet::default();
        let mut hit_up: FxHashMap<usize, Vec<u64>> = FxHashMap::default();
        let mut hit_del: FxHashMap<usize, Vec<u64>> = FxHashMap::default();

        let n = metas_data.len().min(metas_rid.len());
        if n > 0 {
            // Fetch only row_id chunks to locate matches.
            let mut gets_rid = Vec::with_capacity(n);
            for rm in metas_rid.iter().take(n) {
                gets_rid.push(BatchGet::Raw { key: rm.chunk_pk });
            }
            let rid_results = self.pager.batch_get(&gets_rid)?;
            let mut rid_blobs: FxHashMap<PhysicalKey, EntryHandle> = FxHashMap::default();
            for r in rid_results {
                if let GetResult::Raw { key, bytes } = r {
                    rid_blobs.insert(key, bytes);
                }
            }

            for (i, meta_rid) in metas_rid.iter().enumerate().take(n) {
                if let Some(rid_blob) = rid_blobs.get(&meta_rid.chunk_pk) {
                    let rid_arr_any = deserialize_array(rid_blob.clone())?;
                    let rid_arr = rid_arr_any
                        .as_any()
                        .downcast_ref::<UInt64Array>()
                        .ok_or_else(|| Error::Internal("rid type mismatch".into()))?;
                    for j in 0..rid_arr.len() {
                        let rid = rid_arr.value(j);
                        if incoming_ids.contains(&rid) {
                            if ids_to_delete.contains(&rid) {
                                hit_del.entry(i).or_default().push(rid);
                            } else if ids_to_upsert.contains(&rid) {
                                hit_up.entry(i).or_default().push(rid);
                            }
                            rewritten_ids.insert(rid);
                        }
                    }
                }
            }
        }

        if hit_up.is_empty() && hit_del.is_empty() {
            return Ok(rewritten_ids);
        }

        // Batch fetch data+rid blobs for all chunks with hits.
        let mut hit_set = FxHashSet::default();
        hit_set.extend(hit_up.keys().copied());
        hit_set.extend(hit_del.keys().copied());
        let hit_idxs: Vec<usize> = hit_set.into_iter().collect();

        let mut gets = Vec::with_capacity(hit_idxs.len() * 2);
        for &i in &hit_idxs {
            gets.push(BatchGet::Raw {
                key: metas_data[i].chunk_pk,
            });
            gets.push(BatchGet::Raw {
                key: metas_rid[i].chunk_pk,
            });
        }
        let results = self.pager.batch_get(&gets)?;
        let mut blob_map: FxHashMap<PhysicalKey, EntryHandle> = FxHashMap::default();
        for r in results {
            if let GetResult::Raw { key, bytes } = r {
                blob_map.insert(key, bytes);
            }
        }

        // Apply per-chunk edits and stage writebacks.
        for i in hit_idxs {
            let old_data_arr =
                deserialize_array(blob_map.get(&metas_data[i].chunk_pk).unwrap().clone())?;
            let old_rid_arr_any =
                deserialize_array(blob_map.get(&metas_rid[i].chunk_pk).unwrap().clone())?;
            let old_rid_arr = old_rid_arr_any
                .as_any()
                .downcast_ref::<UInt64Array>()
                .unwrap();

            let up_vec = hit_up.remove(&i).unwrap_or_default();
            let del_vec = hit_del.remove(&i).unwrap_or_default();

            // Centralized LWW edit: builds the keep mask and injects the tails.
            let edit = ChunkEdit::from_lww_upsert(
                old_rid_arr,
                &up_vec,
                &del_vec,
                incoming_data,
                incoming_row_ids,
                incoming_ids_map,
            )?;

            let (new_data_arr, new_rid_arr) =
                ChunkEdit::apply_edit_to_arrays(&old_data_arr, Some(&old_rid_arr_any), &edit)?;

            // Stage data writeback.
            let data_bytes = serialize_array(&new_data_arr)?;
            puts.push(BatchPut::Raw {
                key: metas_data[i].chunk_pk,
                bytes: data_bytes,
            });
            metas_data[i].row_count = new_data_arr.len() as u64;
            metas_data[i].serialized_bytes = new_data_arr.get_array_memory_size() as u64;

            // Stage row_id writeback.
            if let Some(rarr) = new_rid_arr {
                let rid_bytes = serialize_array(&rarr)?;
                puts.push(BatchPut::Raw {
                    key: metas_rid[i].chunk_pk,
                    bytes: rid_bytes,
                });
                metas_rid[i].row_count = rarr.len() as u64;
                metas_rid[i].serialized_bytes = rarr.get_array_memory_size() as u64;
            }

            // Refresh permutation if present.
            if metas_data[i].value_order_perm_pk != 0 {
                let sort_col = SortColumn {
                    values: new_data_arr,
                    options: None,
                };
                let idx = lexsort_to_indices(&[sort_col], None)?;
                let perm_bytes = serialize_array(&idx)?;
                puts.push(BatchPut::Raw {
                    key: metas_data[i].value_order_perm_pk,
                    bytes: perm_bytes,
                });
            }
        }

        // Rewrite descriptor chains/totals for both columns.
        descriptor_data.rewrite_pages(
            Arc::clone(&self.pager),
            desc_pk_data,
            &mut metas_data,
            puts,
        )?;
        descriptor_rid.rewrite_pages(Arc::clone(&self.pager), desc_pk_rid, &mut metas_rid, puts)?;

        Ok(rewritten_ids)
    }

1246    fn stage_delete_rows_for_field(
1247        &self,
1248        field_id: LogicalFieldId,
1249        rows_to_delete: &[RowId],
1250        staged_puts: &mut Vec<BatchPut>,
1251    ) -> Result<bool> {
1252        tracing::warn!(
1253            field_id = ?field_id,
1254            rows = rows_to_delete.len(),
1255            "delete_rows stage_delete_rows_for_field: start"
1256        );
1257        use crate::store::descriptor::DescriptorIterator;
1258        use crate::store::ingest::ChunkEdit;
1259
1260        if rows_to_delete.is_empty() {
1261            return Ok(false);
1262        }
1263
1264        // Stream and validate ascending, unique positions.
1265        let mut del_iter = rows_to_delete.iter().copied();
1266        let mut cur_del = del_iter.next();
1267        let mut last_seen: Option<u64> = cur_del;
1268
1269        // Lookup descriptors (data and optional row_id).
1270        let catalog = self.catalog.read().unwrap();
1271        let desc_pk = match catalog.map.get(&field_id) {
1272            Some(pk) => *pk,
1273            None => {
1274                tracing::trace!(
1275                    field_id = ?field_id,
1276                    "delete_rows stage_delete_rows_for_field: data descriptor missing"
1277                );
1278                return Err(Error::NotFound);
1279            }
1280        };
1281        let rid_fid = rowid_fid(field_id);
1282        let desc_pk_rid = catalog.map.get(&rid_fid).copied();
1283        tracing::warn!(
1284            field_id = ?field_id,
1285            desc_pk,
1286            desc_pk_rid = ?desc_pk_rid,
1287            "delete_rows stage_delete_rows_for_field: descriptor keys"
1288        );
1289
1290        // Batch fetch descriptor blobs up front.
1291        let mut gets = vec![BatchGet::Raw { key: desc_pk }];
1292        if let Some(pk) = desc_pk_rid {
1293            gets.push(BatchGet::Raw { key: pk });
1294        }
1295        let results = match self.pager.batch_get(&gets) {
1296            Ok(res) => res,
1297            Err(err) => {
1298                tracing::trace!(
1299                    field_id = ?field_id,
1300                    error = ?err,
1301                    "delete_rows stage_delete_rows_for_field: descriptor batch_get failed"
1302                );
1303                return Err(err);
1304            }
1305        };
1306        let mut blobs_by_pk = FxHashMap::default();
1307        for res in results {
1308            if let GetResult::Raw { key, bytes } = res {
1309                blobs_by_pk.insert(key, bytes);
1310            }
1311        }
1312
1313        tracing::warn!(
1314            field_id = ?field_id,
1315            desc_blob_found = blobs_by_pk.contains_key(&desc_pk),
1316            rid_blob_found = desc_pk_rid.map(|pk| blobs_by_pk.contains_key(&pk)),
1317            "delete_rows stage_delete_rows_for_field: descriptor fetch status"
1318        );
1319
1320        let desc_blob = blobs_by_pk.remove(&desc_pk).ok_or_else(|| {
1321            tracing::trace!(
1322                field_id = ?field_id,
1323                desc_pk,
1324                "delete_rows stage_delete_rows_for_field: descriptor blob missing"
1325            );
1326            Error::Internal(format!(
1327                "descriptor pk={} missing during delete_rows for field {:?}",
1328                desc_pk, field_id
1329            ))
1330        })?;
1331        let mut descriptor = ColumnDescriptor::from_le_bytes(desc_blob.as_ref());
1332
1333        // Build metas for data column.
1334        let mut metas: Vec<ChunkMetadata> = Vec::new();
1335        for m in DescriptorIterator::new(self.pager.as_ref(), descriptor.head_page_pk) {
1336            metas.push(m?);
1337        }
1338        if metas.is_empty() {
1339            drop(catalog);
1340            return Ok(false);
1341        }
1342
1343        // Optionally mirror metas for row_id column.
1344        let mut metas_rid: Vec<ChunkMetadata> = Vec::new();
1345        let mut descriptor_rid: Option<ColumnDescriptor> = None;
1346        tracing::trace!(
1347            field_id = ?field_id,
1348            metas_len = metas.len(),
1349            desc_pk_rid = ?desc_pk_rid,
1350            "delete_rows stage_delete_rows_for_field: data metas loaded"
1351        );
1352        if let Some(pk_rid) = desc_pk_rid
1353            && let Some(desc_blob_rid) = blobs_by_pk.remove(&pk_rid)
1354        {
1355            let d_rid = ColumnDescriptor::from_le_bytes(desc_blob_rid.as_ref());
1356            for m in DescriptorIterator::new(self.pager.as_ref(), d_rid.head_page_pk) {
1357                metas_rid.push(m?);
1358            }
1359            descriptor_rid = Some(d_rid);
1360        }
1361
1362        tracing::trace!(
1363            field_id = ?field_id,
1364            metas_rid_len = metas_rid.len(),
1365            "delete_rows stage_delete_rows_for_field: rowid metas loaded"
1366        );
1367
1368        let mut cum_rows = 0u64;
1369        let mut any_changed = false;
1370
1371        for (i, meta) in metas.iter_mut().enumerate() {
1372            let start_u64 = cum_rows;
1373            let end_u64 = start_u64 + meta.row_count;
1374
1375            // Advance the delete cursor up to this chunk window [start, end),
            // enforcing ascending/unique input along the way.
1376            while let Some(d) = cur_del {
1377                if d < start_u64
1378                    && let Some(prev) = last_seen
1379                {
1380                    if d < prev {
1381                        return Err(Error::Internal(
1382                            "rows_to_delete must be ascending/unique".into(),
1383                        ));
1384                    }
1385
1386                    last_seen = Some(d);
1387                    cur_del = del_iter.next();
1388                } else {
1389                    break;
1390                }
1391            }
1392
1393            // Collect local delete indices.
1394            let rows = meta.row_count as usize;
1395            let mut del_local: FxHashSet<usize> = FxHashSet::default();
1396            while let Some(d) = cur_del {
1397                if d >= end_u64 {
1398                    break;
1399                }
1400                del_local.insert((d - start_u64) as usize);
1401                last_seen = Some(d);
1402                cur_del = del_iter.next();
1403            }
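            // Worked example: a chunk covering global positions [100, 150)
            // turns delete positions 103 and 149 into local indices 3 and 49.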
1404
1405            if del_local.is_empty() {
1406                cum_rows = end_u64;
1407                continue;
1408            }
1409
1410            // Batch get chunk blobs (data and optional row_id).
1411            let mut chunk_gets = vec![BatchGet::Raw { key: meta.chunk_pk }];
1412            if let Some(rm) = metas_rid.get(i) {
1413                chunk_gets.push(BatchGet::Raw { key: rm.chunk_pk });
1414            }
1415            let chunk_results = match self.pager.batch_get(&chunk_gets) {
1416                Ok(res) => res,
1417                Err(err) => {
1418                    tracing::trace!(
1419                        field_id = ?field_id,
1420                        chunk_pk = meta.chunk_pk,
1421                        error = ?err,
1422                        "delete_rows stage_delete_rows_for_field: chunk batch_get failed"
1423                    );
1424                    return Err(err);
1425                }
1426            };
1427            let mut chunk_blobs = FxHashMap::default();
1428            for res in chunk_results {
1429                if let GetResult::Raw { key, bytes } = res {
1430                    chunk_blobs.insert(key, bytes);
1431                }
1432            }
1433
1434            tracing::trace!(
1435                field_id = ?field_id,
1436                chunk_pk = meta.chunk_pk,
1437                rid_chunk_pk = metas_rid.get(i).map(|rm| rm.chunk_pk),
1438                data_found = chunk_blobs.contains_key(&meta.chunk_pk),
1439                rid_found = metas_rid
1440                    .get(i)
1441                    .map(|rm| chunk_blobs.contains_key(&rm.chunk_pk)),
1442                "delete_rows stage_delete_rows_for_field: chunk fetch status"
1443            );
1444
1445            let data_blob = match chunk_blobs.remove(&meta.chunk_pk) {
1446                Some(bytes) => bytes,
1447                None => {
1448                    tracing::trace!(
1449                        field_id = ?field_id,
1450                        chunk_pk = meta.chunk_pk,
1451                        "delete_rows stage_delete_rows_for_field: chunk missing"
1452                    );
1453                    return Err(Error::NotFound);
1454                }
1455            };
1456            let data_arr = deserialize_array(data_blob)?;
1457
1458            let rid_arr_any = if let Some(rm) = metas_rid.get(i) {
1459                let rid_blob = match chunk_blobs.remove(&rm.chunk_pk) {
1460                    Some(bytes) => bytes,
1461                    None => {
1462                        tracing::trace!(
1463                            field_id = ?field_id,
1464                            rowid_chunk_pk = rm.chunk_pk,
1465                            "delete_rows stage_delete_rows_for_field: rowid chunk missing"
1466                        );
1467                        return Err(Error::NotFound);
1468                    }
1469                };
1470                Some(deserialize_array(rid_blob)?)
1471            } else {
1472                None
1473            };
1474
1475            // Build the chunk edit from the local delete indices via the ingest helper.
1476            let edit = ChunkEdit::from_delete_indices(rows, &del_local);
1477
1478            // Apply edit (pure array ops).
1479            let (new_data_arr, new_rid_arr) =
1480                ChunkEdit::apply_edit_to_arrays(&data_arr, rid_arr_any.as_ref(), &edit)?;
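            // Worked example (illustrative): with rows == 5 and del_local ==
            // {1, 3}, the edit is expected to keep local indices [0, 2, 4],
            // shrinking the data array and the mirrored row-id array to three
            // entries each.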
1481
1482            // Write back data.
1483            let data_bytes = serialize_array(&new_data_arr)?;
1484            staged_puts.push(BatchPut::Raw {
1485                key: meta.chunk_pk,
1486                bytes: data_bytes,
1487            });
1488            meta.row_count = new_data_arr.len() as u64;
1489            meta.serialized_bytes = new_data_arr.get_array_memory_size() as u64;
1490
1491            // Write back row_ids if present.
1492            if let (Some(rm), Some(rids)) = (metas_rid.get_mut(i), new_rid_arr) {
1494                let rid_bytes = serialize_array(&rids)?;
1495                staged_puts.push(BatchPut::Raw {
1496                    key: rm.chunk_pk,
1497                    bytes: rid_bytes,
1498                });
1499                rm.row_count = rids.len() as u64;
1500                rm.serialized_bytes = rids.get_array_memory_size() as u64;
1501            }
1502
1503            // Refresh permutation if this chunk has one.
1504            if meta.value_order_perm_pk != 0 {
1505                let sort_column = SortColumn {
1506                    values: new_data_arr,
1507                    options: None,
1508                };
1509                let indices = lexsort_to_indices(&[sort_column], None)?;
1510                let perm_bytes = serialize_array(&indices)?;
1511                staged_puts.push(BatchPut::Raw {
1512                    key: meta.value_order_perm_pk,
1513                    bytes: perm_bytes,
1514                });
1515            }
1516
1517            cum_rows = end_u64;
1518            any_changed = true;
1519        }
1520
1521        // Rewrite descriptor chains/totals and commit.
1522        descriptor.rewrite_pages(Arc::clone(&self.pager), desc_pk, &mut metas, staged_puts)?;
1523        if let (Some(rid_pk), Some(mut rid_desc)) = (desc_pk_rid, descriptor_rid) {
1524            rid_desc.rewrite_pages(Arc::clone(&self.pager), rid_pk, &mut metas_rid, staged_puts)?;
1525        }
1526        drop(catalog);
1527        tracing::trace!(
1528            field_id = ?field_id,
1529            changed = any_changed,
1530            "delete_rows stage_delete_rows_for_field: finished stage"
1531        );
1532        Ok(any_changed)
1533    }
1534
1535    /// Delete row positions for one or more logical fields in a single atomic batch.
1536    ///
1537    /// The same set of global row positions is applied to every field in
1538    /// `fields`. All staged metadata and chunk updates are committed in a
1539    /// single pager batch.
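    ///
    /// A minimal call sketch (assumes an open store and two field ids that
    /// belong to the same table; construction of the ids is elided):
    ///
    /// ```ignore
    /// // Remove global row positions 3 and 7 from both columns atomically.
    /// store.delete_rows(&[col_a, col_b], &[3, 7])?;
    /// ```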
1540    pub fn delete_rows(&self, fields: &[LogicalFieldId], rows_to_delete: &[RowId]) -> Result<()> {
1541        if fields.is_empty() || rows_to_delete.is_empty() {
1542            return Ok(());
1543        }
1544
1545        let mut puts = Vec::new();
1546        let mut touched: FxHashSet<LogicalFieldId> = FxHashSet::default();
1547        let mut table_id: Option<TableId> = None;
1548
1549        tracing::trace!(
1550            fields = fields.len(),
1551            rows = rows_to_delete.len(),
1552            "delete_rows begin"
1553        );
1554        for field_id in fields {
1555            tracing::trace!(field = ?field_id, "delete_rows iter field");
1556            if let Some(expected) = table_id {
1557                if field_id.table_id() != expected {
1558                    return Err(Error::InvalidArgumentError(
1559                        "delete_rows requires fields from the same table".into(),
1560                    ));
1561                }
1562            } else {
1563                table_id = Some(field_id.table_id());
1564            }
1565
1566            if self.stage_delete_rows_for_field(*field_id, rows_to_delete, &mut puts)? {
1567                touched.insert(*field_id);
1568            }
1569        }
1570
1571        if puts.is_empty() {
1572            return Ok(());
1573        }
1574
1575        self.pager.batch_put(&puts)?;
1576
1577        tracing::trace!(touched = touched.len(), "delete_rows apply writes");
1578
1579        for field_id in touched {
1580            self.compact_field_bounded(field_id)?;
1581        }
1582        tracing::trace!("delete_rows complete");
1583        Ok(())
1584    }
1585
1586    // TODO: Move to descriptor module?
1587    // NOTE: This helper mutates the descriptor in place; it lives on
1588    // `ColumnStore` because it needs the pager owned by the store.
1589    /// Writes a descriptor page chain from `new_metas`. Reuses the existing
1590    /// first page when possible; surplus pages are recorded in `frees`.
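    ///
    /// Layout sketch (illustrative numbers only): if `DESCRIPTOR_ENTRIES_PER_PAGE`
    /// were 128, then 300 metas would need `300.div_ceil(128) == 3` pages; the
    /// existing head page is reused, two more pages are allocated, and any
    /// remaining old pages are pushed onto `frees`.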
1591    pub(crate) fn write_descriptor_chain(
1592        &self,
1593        descriptor_pk: PhysicalKey,
1594        descriptor: &mut ColumnDescriptor,
1595        new_metas: &[ChunkMetadata],
1596        puts: &mut Vec<BatchPut>,
1597        frees: &mut Vec<PhysicalKey>,
1598    ) -> Result<()> {
1599        // Collect existing page chain.
1600        let mut old_pages = Vec::new();
1601        let mut pk = descriptor.head_page_pk;
1602        while pk != 0 {
1603            let page_blob = self
1604                .pager
1605                .batch_get(&[BatchGet::Raw { key: pk }])?
1606                .pop()
1607                .and_then(|res| match res {
1608                    GetResult::Raw { bytes, .. } => Some(bytes),
1609                    _ => None,
1610                })
1611                .ok_or(Error::NotFound)?;
1612            let header = DescriptorPageHeader::from_le_bytes(
1613                &page_blob.as_ref()[..DescriptorPageHeader::DISK_SIZE],
1614            );
1615            old_pages.push(pk);
1616            pk = header.next_page_pk;
1617        }
1618
1619        // Required pages for new metas.
1620        let per = DESCRIPTOR_ENTRIES_PER_PAGE;
1621        let need_pages = if new_metas.is_empty() {
1622            0
1623        } else {
1624            new_metas.len().div_ceil(per)
1625        };
1626        // If empty: free everything and clear counters.
1627        if need_pages == 0 {
1628            frees.extend(old_pages.iter().copied());
1629
1630            // Setting both page pointers to 0 marks the descriptor as empty;
1631            // `load_or_create` checks for head_page_pk == 0 && tail_page_pk == 0
1632            // to decide whether pages must be allocated on the next access.
1634            descriptor.head_page_pk = 0;
1635            descriptor.tail_page_pk = 0;
1636            descriptor.total_row_count = 0;
1637            descriptor.total_chunk_count = 0;
1638            puts.push(BatchPut::Raw {
1639                key: descriptor_pk,
1640                bytes: descriptor.to_le_bytes(),
1641            });
1642            return Ok(());
1643        }
1644
1645        // Reuse first page; alloc more if needed.
1646        let mut pages = Vec::with_capacity(need_pages);
1647        if !old_pages.is_empty() {
1648            pages.push(old_pages[0]);
1649        } else {
1650            pages.push(self.pager.alloc_many(1)?[0]);
1651            descriptor.head_page_pk = pages[0];
1652        }
1653        if need_pages > pages.len() {
1654            let extra = self.pager.alloc_many(need_pages - pages.len())?;
1655            pages.extend(extra);
1656        }
1657
1658        // Free surplus old pages.
1659        if old_pages.len() > need_pages {
1660            frees.extend(old_pages[need_pages..].iter().copied());
1661        }
1662
1663        // Write page contents.
1664        let mut off = 0usize;
1665        for (i, page_pk) in pages.iter().copied().enumerate() {
1666            let remain = new_metas.len() - off;
1667            let count = remain.min(per);
1668            let next = if i + 1 < pages.len() { pages[i + 1] } else { 0 };
1669            let header = DescriptorPageHeader {
1670                next_page_pk: next,
1671                entry_count: count as u32,
1672                _padding: [0; 4],
1673            };
1674            let mut page_bytes = header.to_le_bytes().to_vec();
1675            for m in &new_metas[off..off + count] {
1676                page_bytes.extend_from_slice(&m.to_le_bytes());
1677            }
1678            puts.push(BatchPut::Raw {
1679                key: page_pk,
1680                bytes: page_bytes,
1681            });
1682            off += count;
1683        }
1684
1685        descriptor.tail_page_pk = *pages.last().unwrap();
1686        descriptor.total_chunk_count = new_metas.len() as u64;
1687        descriptor.total_row_count = new_metas.iter().map(|m| m.row_count).sum();
1688        puts.push(BatchPut::Raw {
1689            key: descriptor_pk,
1690            bytes: descriptor.to_le_bytes(),
1691        });
1692        Ok(())
1693    }
1694
1695    /// Bounded, local field compaction. Merges adjacent small chunks into
1696    /// ~TARGET_CHUNK_BYTES; leaves large chunks intact. Recomputes perms if
1697    /// any source chunk in a run had one. Frees obsolete chunks/pages.
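    ///
    /// Merge sketch (sizes illustrative): a run of four small adjacent chunks
    /// is concatenated and re-split toward `TARGET_CHUNK_BYTES`, while a
    /// following chunk already at or above `MIN_CHUNK_BYTES` is carried over
    /// untouched.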
1698    fn compact_field_bounded(&self, field_id: LogicalFieldId) -> Result<()> {
1699        // We may rewrite descriptors; take a write lock.
1700        let mut catalog = self.catalog.write().unwrap();
1701
1702        let desc_pk = match catalog.map.get(&field_id) {
1703            Some(&pk) => pk,
1704            None => return Ok(()),
1705        };
1706        let rid_fid = rowid_fid(field_id);
1707        let desc_pk_rid = match catalog.map.get(&rid_fid) {
1708            Some(&pk) => pk,
1709            None => return Ok(()),
1710        };
1711
1712        // True batching for the two descriptor reads.
1713        let gets = vec![
1714            BatchGet::Raw { key: desc_pk },
1715            BatchGet::Raw { key: desc_pk_rid },
1716        ];
1717        let results = self.pager.batch_get(&gets)?;
1718        let mut blobs_by_pk = FxHashMap::default();
1719        for res in results {
1720            if let GetResult::Raw { key, bytes } = res {
1721                blobs_by_pk.insert(key, bytes);
1722            }
1723        }
1724        let desc_blob = blobs_by_pk.remove(&desc_pk).ok_or(Error::NotFound)?;
1725        let mut desc = ColumnDescriptor::from_le_bytes(desc_blob.as_ref());
1726        let desc_blob_rid = blobs_by_pk.remove(&desc_pk_rid).ok_or(Error::NotFound)?;
1727        let mut desc_rid = ColumnDescriptor::from_le_bytes(desc_blob_rid.as_ref());
1728
1729        // Load metas.
1730        let mut metas = Vec::new();
1731        for m in DescriptorIterator::new(self.pager.as_ref(), desc.head_page_pk) {
1732            metas.push(m?);
1733        }
1734        let mut metas_rid = Vec::new();
1735        for m in DescriptorIterator::new(self.pager.as_ref(), desc_rid.head_page_pk) {
1736            metas_rid.push(m?);
1737        }
1738        if metas.is_empty() || metas_rid.is_empty() {
1739            return Ok(());
1740        }
1741
1742        let mut puts: Vec<BatchPut> = Vec::new();
1743        let mut frees: Vec<PhysicalKey> = Vec::new();
1744        let mut new_metas: Vec<ChunkMetadata> = Vec::new();
1745        let mut new_rid_metas: Vec<ChunkMetadata> = Vec::new();
1746
1747        let mut i = 0usize;
1748        while i < metas.len() {
1749            let sz = metas[i].serialized_bytes as usize;
1750            // Chunks already at or above the minimum size are kept as-is.
1751            if sz >= MIN_CHUNK_BYTES {
1752                new_metas.push(metas[i]);
1753                new_rid_metas.push(metas_rid[i]);
1754                i += 1;
1755                continue;
1756            }
1757
1758            // Build a small run [i, j) capped by MAX_MERGE_RUN_BYTES.
1759            let mut j = i;
1760            let mut run_bytes = 0usize;
1761            while j < metas.len() {
1762                let b = metas[j].serialized_bytes as usize;
1763                if b >= TARGET_CHUNK_BYTES {
1764                    break;
1765                }
1766                if run_bytes + b > MAX_MERGE_RUN_BYTES {
1767                    break;
1768                }
1769                run_bytes += b;
1770                j += 1;
1771            }
1772            // Note: `sz < MIN_CHUNK_BYTES` is guaranteed here, so even a run of a
1773            // single small chunk is rewritten below like any other run.
1778
1779            // Fetch and concatenate the run's data and row_ids.
1780            let mut gets = Vec::with_capacity((j - i) * 2);
1781            for k in i..j {
1782                gets.push(BatchGet::Raw {
1783                    key: metas[k].chunk_pk,
1784                });
1785                gets.push(BatchGet::Raw {
1786                    key: metas_rid[k].chunk_pk,
1787                });
1788            }
1789            let results = self.pager.batch_get(&gets)?;
1790            let mut by_pk: FxHashMap<PhysicalKey, EntryHandle> = FxHashMap::default();
1791            for r in results {
1792                match r {
1793                    GetResult::Raw { key, bytes } => {
1794                        by_pk.insert(key, bytes);
1795                    }
1796                    _ => return Err(Error::NotFound),
1797                }
1798            }
1799            let mut data_parts = Vec::with_capacity(j - i);
1800            let mut rid_parts = Vec::with_capacity(j - i);
1801            for k in i..j {
1802                let db = by_pk.get(&metas[k].chunk_pk).ok_or(Error::NotFound)?;
1803                data_parts.push(deserialize_array(db.clone())?);
1804                let rb = by_pk.get(&metas_rid[k].chunk_pk).ok_or(Error::NotFound)?;
1805                rid_parts.push(deserialize_array(rb.clone())?);
1806            }
1807            let merged_data = concat_many(data_parts.iter().collect())?;
1808            let merged_rid_any = concat_many(rid_parts.iter().collect())?;
1809
1810            // Split merged run into ~target-sized chunks.
1811            let slices = split_to_target_bytes(
1812                &merged_data,
1813                TARGET_CHUNK_BYTES,
1814                self.cfg.varwidth_fallback_rows_per_slice,
1815            );
1816            let mut rid_off = 0usize;
1817            let need_perms = metas[i..j].iter().any(|m| m.value_order_perm_pk != 0);
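            // Example (illustrative sizes): a ~1 MiB merged run split toward a
            // 256 KiB target yields about four slices; `rid_off` advances by
            // each slice's row count so the row-id column stays aligned with
            // the data column.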
1818
1819            for s in slices {
1820                let rows = s.len();
1821                // Slice the merged row-id array to the rows covered by this data slice.
1822                let rid_ref: ArrayRef = merged_rid_any.slice(rid_off, rows);
1823                let rid_norm = zero_offset(&rid_ref);
1824                let rid_pk = self.pager.alloc_many(1)?[0];
1825                let rid_bytes = serialize_array(rid_norm.as_ref())?;
1826
1827                let data_pk = self.pager.alloc_many(1)?[0];
1828                let s_norm = zero_offset(&s);
1829                let data_bytes = serialize_array(s_norm.as_ref())?;
1830                puts.push(BatchPut::Raw {
1831                    key: data_pk,
1832                    bytes: data_bytes,
1833                });
1834                puts.push(BatchPut::Raw {
1835                    key: rid_pk,
1836                    bytes: rid_bytes,
1837                });
1838                let mut meta = ChunkMetadata {
1839                    chunk_pk: data_pk,
1840                    row_count: rows as u64,
1841                    serialized_bytes: s_norm.get_array_memory_size() as u64,
1842                    max_val_u64: u64::MAX,
1843                    ..Default::default()
1844                };
1845                // If any source chunk had a perm, recompute for this slice.
1846                if need_perms {
1847                    let sort_col = SortColumn {
1848                        values: s.clone(),
1849                        options: None,
1850                    };
1851                    let idx = lexsort_to_indices(&[sort_col], None)?;
1852                    let perm_bytes = serialize_array(&idx)?;
1853                    let perm_pk = self.pager.alloc_many(1)?[0];
1854                    puts.push(BatchPut::Raw {
1855                        key: perm_pk,
1856                        bytes: perm_bytes,
1857                    });
1858                    meta.value_order_perm_pk = perm_pk;
1859                }
1860
1861                // Track row-id min/max and sortedness; unsorted ids get a sort permutation below.
1862                let rid_any = rid_norm.clone();
1863                let rids = rid_any
1864                    .as_any()
1865                    .downcast_ref::<UInt64Array>()
1866                    .ok_or_else(|| Error::Internal("rid downcast".into()))?;
1867                let mut min = u64::MAX;
1868                let mut max = 0u64;
1869                let mut sorted_rids = true;
1870                let mut last_v = 0u64;
1871                for ii in 0..rids.len() {
1872                    let v = rids.value(ii);
1873                    if ii == 0 {
1874                        last_v = v;
1875                    } else if v < last_v {
1876                        sorted_rids = false;
1877                    } else {
1878                        last_v = v;
1879                    }
1880                    if v < min {
1881                        min = v;
1882                    }
1883                    if v > max {
1884                        max = v;
1885                    }
1886                }
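                // Example: row ids [5, 3, 9] yield min == 3, max == 9, and
                // sorted_rids == false, so a sort permutation is built below.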
1887                let mut rid_perm_pk = 0u64;
1888                if !sorted_rids {
1889                    let rid_sort_col = SortColumn {
1890                        values: rid_any,
1891                        options: None,
1892                    };
1893                    let rid_idx = lexsort_to_indices(&[rid_sort_col], None)?;
1894                    let rid_perm_bytes = serialize_array(&rid_idx)?;
1895                    rid_perm_pk = self.pager.alloc_many(1)?[0];
1896                    puts.push(BatchPut::Raw {
1897                        key: rid_perm_pk,
1898                        bytes: rid_perm_bytes,
1899                    });
1900                }
1901                let rid_meta = ChunkMetadata {
1902                    chunk_pk: rid_pk,
1903                    value_order_perm_pk: rid_perm_pk,
1904                    row_count: rows as u64,
1905                    serialized_bytes: rid_norm.get_array_memory_size() as u64,
1906                    min_val_u64: if rows > 0 { min } else { 0 },
1907                    max_val_u64: if rows > 0 { max } else { 0 },
1908                };
1909                new_metas.push(meta);
1910                new_rid_metas.push(rid_meta);
1911                rid_off += rows;
1912            }
1913
1914            // Free all old data/rid/perms in the merged run.
1915            for k in i..j {
1916                frees.push(metas[k].chunk_pk);
1917                if metas[k].value_order_perm_pk != 0 {
1918                    frees.push(metas[k].value_order_perm_pk);
1919                }
1920                frees.push(metas_rid[k].chunk_pk);
1921                if metas_rid[k].value_order_perm_pk != 0 {
1922                    frees.push(metas_rid[k].value_order_perm_pk);
1923                }
1924            }
1925
1926            i = j;
1927        }
1928
1929        // If everything was deleted, drop the field entirely.
1930        if new_metas.is_empty() {
1931            // Drop descriptors and catalog mapping.
1932            self.write_descriptor_chain(desc_pk, &mut desc, &[], &mut puts, &mut frees)?;
1933            self.write_descriptor_chain(desc_pk_rid, &mut desc_rid, &[], &mut puts, &mut frees)?;
1934            catalog.map.remove(&field_id);
1935            catalog.map.remove(&rid_fid);
1936            puts.push(BatchPut::Raw {
1937                key: CATALOG_ROOT_PKEY,
1938                bytes: catalog.to_bytes(),
1939            });
1940            if !puts.is_empty() {
1941                self.pager.batch_put(&puts)?;
1942            }
1943            if !frees.is_empty() {
1944                self.pager.free_many(&frees)?;
1945            }
1946            return Ok(());
1947        }
1948
1949        // Rewrite descriptor chains to match the new meta lists.
1950        self.write_descriptor_chain(desc_pk, &mut desc, &new_metas, &mut puts, &mut frees)?;
1951        self.write_descriptor_chain(
1952            desc_pk_rid,
1953            &mut desc_rid,
1954            &new_rid_metas,
1955            &mut puts,
1956            &mut frees,
1957        )?;
1958        // Persist new/updated blobs and free the old ones.
1959        if !puts.is_empty() {
1960            self.pager.batch_put(&puts)?;
1961        }
1962        if !frees.is_empty() {
1963            self.pager.free_many(&frees)?;
1964        }
1965
1966        Ok(())
1967    }
1968
1969    /// (Internal) Helper for batch appends. Appends metadata to the current
1970    /// in-memory tail page, creating a new one if necessary.
1971    fn append_meta_in_loop(
1972        &self,
1973        descriptor: &mut ColumnDescriptor,
1974        tail_page_bytes: &mut Vec<u8>,
1975        meta: ChunkMetadata,
1976        puts: &mut Vec<BatchPut>,
1977    ) -> Result<()> {
1978        let mut header = DescriptorPageHeader::from_le_bytes(
1979            &tail_page_bytes[..DescriptorPageHeader::DISK_SIZE],
1980        );
1981        if tail_page_bytes.len() + ChunkMetadata::DISK_SIZE <= TARGET_DESCRIPTOR_PAGE_BYTES
1982            && (header.entry_count as usize) < DESCRIPTOR_ENTRIES_PER_PAGE
1983        {
1984            // Case 1: There's room in the current tail page.
1985            tail_page_bytes.extend_from_slice(&meta.to_le_bytes());
1986            header.entry_count += 1;
1987            tail_page_bytes[..DescriptorPageHeader::DISK_SIZE]
1988                .copy_from_slice(&header.to_le_bytes());
1989        } else {
1990            // Case 2: The tail page is full. Write it out and start a new one.
1991            let new_tail_pk = self.pager.alloc_many(1)?[0];
1992            header.next_page_pk = new_tail_pk;
1993            tail_page_bytes[..DescriptorPageHeader::DISK_SIZE]
1994                .copy_from_slice(&header.to_le_bytes());
1995            // Move the full page's bytes into the put instead of cloning them.
1997            let full_page_to_write = std::mem::take(tail_page_bytes);
1998            puts.push(BatchPut::Raw {
1999                key: descriptor.tail_page_pk,
2000                bytes: full_page_to_write,
2001            });
2002            // Create the new tail page.
2003            let new_header = DescriptorPageHeader {
2004                next_page_pk: 0,
2005                entry_count: 1,
2006                _padding: [0; 4],
2007            };
2008            let mut new_page_bytes = new_header.to_le_bytes().to_vec();
2009            new_page_bytes.extend_from_slice(&meta.to_le_bytes());
2010            // Update our live state to point to the new tail.
2011            descriptor.tail_page_pk = new_tail_pk;
2012            *tail_page_bytes = new_page_bytes;
2013        }
2014
2015        descriptor.total_row_count += meta.row_count;
2016        descriptor.total_chunk_count += 1;
2017        Ok(())
2018    }
2019
2020    /// Verifies the integrity of the column store's metadata.
2021    ///
2022    /// This check is useful for tests and debugging. It verifies:
2023    /// 1. The catalog can be read.
2024    /// 2. All descriptor chains are walkable.
2025    /// 3. The row and chunk counts in the descriptors match the sum of the
2026    ///    chunk metadata.
2027    ///
2028    /// Returns `Ok(())` if consistent, otherwise returns an `Error`.
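    ///
    /// Call sketch (assumes an open store; typically run from tests after a
    /// batch of writes):
    ///
    /// ```ignore
    /// store.verify_integrity()?; // the error message names the first mismatch
    /// ```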
2029    pub fn verify_integrity(&self) -> Result<()> {
2030        let catalog = self.catalog.read().unwrap();
2031        for (&field_id, &descriptor_pk) in &catalog.map {
2032            let desc_blob = self
2033                .pager
2034                .batch_get(&[BatchGet::Raw { key: descriptor_pk }])?
2035                .pop()
2036                .and_then(|r| match r {
2037                    GetResult::Raw { bytes, .. } => Some(bytes),
2038                    _ => None,
2039                })
2040                .ok_or_else(|| {
2041                    Error::Internal(format!(
2042                        "Catalog points to missing descriptor pk={}",
2043                        descriptor_pk
2044                    ))
2045                })?;
2046            let descriptor = ColumnDescriptor::from_le_bytes(desc_blob.as_ref());
2047            if descriptor.field_id != field_id {
2048                return Err(Error::Internal(format!(
2049                    "Descriptor at pk={} has wrong field_id: expected {:?}, \
2050                     got {:?}",
2051                    descriptor_pk, field_id, descriptor.field_id
2052                )));
2053            }
2054
2055            let mut actual_rows = 0;
2056            let mut actual_chunks = 0;
2057            let mut current_page_pk = descriptor.head_page_pk;
2058            while current_page_pk != 0 {
2059                let page_blob = self
2060                    .pager
2061                    .batch_get(&[BatchGet::Raw {
2062                        key: current_page_pk,
2063                    }])?
2064                    .pop()
2065                    .and_then(|r| match r {
2066                        GetResult::Raw { bytes, .. } => Some(bytes),
2067                        _ => None,
2068                    })
2069                    .ok_or_else(|| {
2070                        Error::Internal(format!(
2071                            "Descriptor page chain broken at pk={}",
2072                            current_page_pk
2073                        ))
2074                    })?;
2075                let header = DescriptorPageHeader::from_le_bytes(
2076                    &page_blob.as_ref()[..DescriptorPageHeader::DISK_SIZE],
2077                );
2078                for i in 0..(header.entry_count as usize) {
2079                    let off = DescriptorPageHeader::DISK_SIZE + i * ChunkMetadata::DISK_SIZE;
2080                    let end = off + ChunkMetadata::DISK_SIZE;
2081                    let meta = ChunkMetadata::from_le_bytes(&page_blob.as_ref()[off..end]);
2082                    actual_rows += meta.row_count;
2083                    actual_chunks += 1;
2084                }
2085                current_page_pk = header.next_page_pk;
2086            }
2087
2088            if descriptor.total_row_count != actual_rows {
2089                return Err(Error::Internal(format!(
2090                    "Row count mismatch for field {:?}: descriptor says {}, \
2091                     actual is {}",
2092                    field_id, descriptor.total_row_count, actual_rows
2093                )));
2094            }
2095            if descriptor.total_chunk_count != actual_chunks {
2096                return Err(Error::Internal(format!(
2097                    "Chunk count mismatch for field {:?}: descriptor says {}, \
2098                     actual is {}",
2099                    field_id, descriptor.total_chunk_count, actual_chunks
2100                )));
2101            }
2102        }
2103        Ok(())
2104    }
2105
2106    /// Gathers detailed statistics about the storage layout.
2107    ///
2108    /// This method is designed for low-level analysis and debugging, allowing
2109    /// you to check for under- or over-utilization of descriptor pages.
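    ///
    /// Inspection sketch (assumes an open store; the field names match the
    /// stats structs populated by this method):
    ///
    /// ```ignore
    /// for col in store.get_layout_stats()? {
    ///     println!("{:?}: {} rows in {} chunks", col.field_id, col.total_rows, col.total_chunks);
    ///     for page in &col.pages {
    ///         println!("  page {}: {} entries, {} bytes", page.page_pk, page.entry_count, page.page_size_bytes);
    ///     }
    /// }
    /// ```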
2110    pub fn get_layout_stats(&self) -> Result<Vec<ColumnLayoutStats>> {
2111        let catalog = self.catalog.read().unwrap();
2112        let mut all_stats = Vec::new();
2113
2114        for (&field_id, &descriptor_pk) in &catalog.map {
2115            let desc_blob = self
2116                .pager
2117                .batch_get(&[BatchGet::Raw { key: descriptor_pk }])?
2118                .pop()
2119                .and_then(|r| match r {
2120                    GetResult::Raw { bytes, .. } => Some(bytes),
2121                    _ => None,
2122                })
2123                .ok_or(Error::NotFound)?;
2124            let descriptor = ColumnDescriptor::from_le_bytes(desc_blob.as_ref());
2125
2126            let mut page_stats = Vec::new();
2127            let mut current_page_pk = descriptor.head_page_pk;
2128            while current_page_pk != 0 {
2129                let page_blob = self
2130                    .pager
2131                    .batch_get(&[BatchGet::Raw {
2132                        key: current_page_pk,
2133                    }])?
2134                    .pop()
2135                    .and_then(|r| match r {
2136                        GetResult::Raw { bytes, .. } => Some(bytes),
2137                        _ => None,
2138                    })
2139                    .ok_or(Error::NotFound)?;
2140                let header = DescriptorPageHeader::from_le_bytes(
2141                    &page_blob.as_ref()[..DescriptorPageHeader::DISK_SIZE],
2142                );
2143                page_stats.push(DescriptorPageStats {
2144                    page_pk: current_page_pk,
2145                    entry_count: header.entry_count,
2146                    page_size_bytes: page_blob.as_ref().len(),
2147                });
2148                current_page_pk = header.next_page_pk;
2149            }
2150
2151            all_stats.push(ColumnLayoutStats {
2152                field_id,
2153                total_rows: descriptor.total_row_count,
2154                total_chunks: descriptor.total_chunk_count,
2155                pages: page_stats,
2156            });
2157        }
2158        Ok(all_stats)
2159    }
2160}