llkv_column_map/store/core.rs

use super::*;
use crate::store::catalog::ColumnCatalog;
use crate::store::descriptor::{
    ChunkMetadata, ColumnDescriptor, DescriptorIterator, DescriptorPageHeader,
};
use crate::store::scan::filter::FilterDispatch;
use crate::store::scan::{FilterPrimitive, FilterResult};
use crate::types::{LogicalFieldId, RowId, TableId};
use arrow::array::{Array, ArrayRef, BooleanArray, UInt32Array, UInt64Array};
use arrow::compute::{self, SortColumn, lexsort_to_indices};
use arrow::datatypes::DataType;
use arrow::record_batch::RecordBatch;
use llkv_expr::typed_predicate::Predicate;
use llkv_result::{Error, Result};
use llkv_storage::{
    constants::CATALOG_ROOT_PKEY,
    pager::{BatchGet, BatchPut, GetResult, Pager},
    serialization::{deserialize_array, serialize_array},
    types::PhysicalKey,
};

use rustc_hash::{FxHashMap, FxHashSet};
use simd_r_drive_entry_handle::EntryHandle;
use std::sync::{Arc, RwLock};

/// Columnar storage engine for managing Arrow-based data.
///
/// `ColumnStore` provides the primary interface for persisting and retrieving columnar
/// data using Apache Arrow [`RecordBatch`]es. It manages:
///
/// - Column descriptors and metadata (chunk locations, row counts, min/max values)
/// - Data type caching for efficient schema queries
/// - Index management (presence indexes, value indexes)
/// - Integration with the [`Pager`] for persistent storage
///
/// # Namespaces
///
/// Columns are identified by [`LogicalFieldId`], which combines a namespace, table ID,
/// and field ID. This prevents collisions between user data, row IDs, and MVCC metadata:
///
/// - `UserData`: Regular table columns
/// - `RowIdShadow`: Internal row ID tracking
/// - `TxnCreatedBy`: MVCC transaction creation timestamps
/// - `TxnDeletedBy`: MVCC transaction deletion timestamps
///
/// # Thread Safety
///
/// `ColumnStore` is `Send + Sync` and can be safely shared across threads via `Arc`.
/// Internal state (catalog, caches) uses `RwLock` for concurrent access.
pub struct ColumnStore<P: Pager> {
    pub(crate) pager: Arc<P>,
    pub(crate) catalog: Arc<RwLock<ColumnCatalog>>,
    cfg: ColumnStoreConfig,
    dtype_cache: DTypeCache<P>,
    index_manager: IndexManager<P>,
}

impl<P> ColumnStore<P>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    /// Open or create a `ColumnStore` using the provided pager.
    ///
    /// This loads the column catalog from the pager's root catalog key, or initializes
    /// an empty catalog if none exists. The catalog maps [`LogicalFieldId`] to the
    /// physical keys of column descriptors.
    ///
    /// # Errors
    ///
    /// Returns an error if the pager fails to load the catalog or if deserialization fails.
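    ///
    /// # Example
    ///
    /// A minimal sketch of opening a store; `MemPager` is a hypothetical stand-in
    /// for any pager implementing [`Pager<Blob = EntryHandle>`] and is not defined
    /// in this module.
    ///
    /// ```ignore
    /// use std::sync::Arc;
    ///
    /// let pager = Arc::new(MemPager::default());
    /// let store = ColumnStore::open(Arc::clone(&pager))?;
    /// ```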
    pub fn open(pager: Arc<P>) -> Result<Self> {
        let cfg = ColumnStoreConfig::default();
        let catalog = match pager
            .batch_get(&[BatchGet::Raw {
                key: CATALOG_ROOT_PKEY,
            }])?
            .pop()
        {
            Some(GetResult::Raw { bytes, .. }) => ColumnCatalog::from_bytes(bytes.as_ref())?,
            _ => ColumnCatalog::default(),
        };
        let arc_catalog = Arc::new(RwLock::new(catalog));

        let index_manager = IndexManager::new(Arc::clone(&pager));

        Ok(Self {
            pager: Arc::clone(&pager),
            catalog: Arc::clone(&arc_catalog),
            cfg,
            dtype_cache: DTypeCache::new(Arc::clone(&pager), Arc::clone(&arc_catalog)),
            index_manager,
        })
    }

    /// Create and persist an index for a column.
    ///
    /// This builds the specified index type for all existing data in the column and
    /// persists it atomically. The index will be maintained automatically on subsequent
    /// appends and updates.
    ///
    /// # Errors
    ///
    /// Returns an error if the column doesn't exist or if index creation fails.
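    ///
    /// # Example
    ///
    /// A sketch of registering the presence index for a column; the `field_id`
    /// value is illustrative.
    ///
    /// ```ignore
    /// let field_id = LogicalFieldId::from(42u64);
    /// store.register_index(field_id, IndexKind::Presence)?;
    /// ```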
    pub fn register_index(&self, field_id: LogicalFieldId, kind: IndexKind) -> Result<()> {
        self.index_manager.register_index(self, field_id, kind)
    }

    /// Remove a persisted index from a column.
    ///
    /// This atomically removes the index and frees associated storage. The column data
    /// itself is not affected.
    ///
    /// # Errors
    ///
    /// Returns an error if the column or index doesn't exist.
    pub fn unregister_index(&self, field_id: LogicalFieldId, kind: IndexKind) -> Result<()> {
        self.index_manager.unregister_index(self, field_id, kind)
    }

    /// Get the Arrow data type of a column.
    ///
    /// This returns the data type from cache if available, otherwise loads it from
    /// the column descriptor and caches it for future queries.
    ///
    /// # Errors
    ///
    /// Returns an error if the column doesn't exist or if the descriptor is corrupted.
    pub fn data_type(&self, field_id: LogicalFieldId) -> Result<DataType> {
        if let Some(dt) = self.dtype_cache.cached_data_type(field_id) {
            return Ok(dt);
        }
        self.dtype_cache.dtype_for_field(field_id)
    }

    /// Find all row IDs where a column satisfies a predicate.
    ///
    /// This evaluates the predicate against the column's data and returns a vector
    /// of matching row IDs. Uses indexes and chunk metadata (min/max values) to
    /// skip irrelevant data when possible.
    ///
    /// # Errors
    ///
    /// Returns an error if the column doesn't exist or if chunk data is corrupted.
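    ///
    /// # Example
    ///
    /// A sketch only; the type parameter and the predicate constructor shown here
    /// are hypothetical and depend on the [`FilterDispatch`] impls and the
    /// [`Predicate`] API in `llkv_expr`.
    ///
    /// ```ignore
    /// // Hypothetical predicate: match values greater than 100.
    /// let predicate = Predicate::greater_than(100u64);
    /// let row_ids = store.filter_row_ids::<u64>(field_id, &predicate)?;
    /// ```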
    pub fn filter_row_ids<T>(
        &self,
        field_id: LogicalFieldId,
        predicate: &Predicate<T::Value>,
    ) -> Result<Vec<u64>>
    where
        T: FilterDispatch,
    {
        tracing::trace!(field=?field_id, "filter_row_ids start");
        let res = T::run_filter(self, field_id, predicate);
        if let Err(ref err) = res {
            tracing::trace!(field=?field_id, error=?err, "filter_row_ids error");
        } else {
            tracing::trace!(field=?field_id, "filter_row_ids ok");
        }
        res
    }

    /// Evaluate an arbitrary predicate closure against a primitive column.
    ///
    /// Unlike [`Self::filter_row_ids`], which takes a typed [`Predicate`], this
    /// accepts any closure over the column's native value type and returns a
    /// [`FilterResult`] describing the matching rows.
    ///
    /// # Errors
    ///
    /// Returns an error if the column doesn't exist or if chunk data is corrupted.
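    ///
    /// # Example
    ///
    /// A sketch only; whether a given native type implements [`FilterPrimitive`]
    /// depends on this crate's trait impls.
    ///
    /// ```ignore
    /// let result = store.filter_matches::<u64, _>(field_id, |v| v > 100)?;
    /// ```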
    pub fn filter_matches<T, F>(
        &self,
        field_id: LogicalFieldId,
        predicate: F,
    ) -> Result<FilterResult>
    where
        T: FilterPrimitive,
        F: FnMut(T::Native) -> bool,
    {
        T::run_filter_with_result(self, field_id, predicate)
    }

    /// List all indexes registered for a column.
    ///
    /// Returns the types of indexes (e.g., presence, value) that are currently
    /// persisted for the specified column.
    ///
    /// # Errors
    ///
    /// Returns an error if the column doesn't exist or if the descriptor is corrupted.
    pub fn list_persisted_indexes(&self, field_id: LogicalFieldId) -> Result<Vec<IndexKind>> {
        let catalog = self.catalog.read().unwrap();
        let descriptor_pk = *catalog.map.get(&field_id).ok_or(Error::NotFound)?;

        let desc_blob = self
            .pager
            .batch_get(&[BatchGet::Raw { key: descriptor_pk }])?
            .pop()
            .and_then(|r| match r {
                GetResult::Raw { bytes, .. } => Some(bytes),
                _ => None,
            })
            .ok_or(Error::NotFound)?;
        let descriptor = ColumnDescriptor::from_le_bytes(desc_blob.as_ref());

        let kinds = descriptor.get_indexes()?;
        Ok(kinds)
    }

    /// Get the total number of rows in a column.
    ///
    /// Returns the persisted row count from the column's descriptor. This value is
    /// updated by append and delete operations.
    ///
    /// # Errors
    ///
    /// Returns an error if the column doesn't exist or if the descriptor is corrupted.
    pub fn total_rows_for_field(&self, field_id: LogicalFieldId) -> Result<u64> {
        let catalog = self.catalog.read().unwrap();
        let desc_pk = *catalog.map.get(&field_id).ok_or(Error::NotFound)?;
        drop(catalog);

        let desc_blob = self
            .pager
            .batch_get(&[BatchGet::Raw { key: desc_pk }])?
            .pop()
            .and_then(|r| match r {
                GetResult::Raw { bytes, .. } => Some(bytes),
                _ => None,
            })
            .ok_or(Error::NotFound)?;

        let desc = ColumnDescriptor::from_le_bytes(desc_blob.as_ref());
        Ok(desc.total_row_count)
    }

    /// Get the total number of rows in a table.
    ///
    /// This returns the maximum row count across all user-data columns in the table.
    /// If the table has no persisted columns, returns `0`.
    ///
    /// # Errors
    ///
    /// Returns an error if column descriptors cannot be loaded.
    pub fn total_rows_for_table(&self, table_id: crate::types::TableId) -> Result<u64> {
        use crate::types::Namespace;
        // Acquire read lock on catalog and find any matching user-data field
        let catalog = self.catalog.read().unwrap();
        // Collect all user-data logical field ids for this table.
        let candidates: Vec<LogicalFieldId> = catalog
            .map
            .keys()
            .filter(|fid| fid.namespace() == Namespace::UserData && fid.table_id() == table_id)
            .copied()
            .collect();
        drop(catalog);

        if candidates.is_empty() {
            return Ok(0);
        }

        // Return the maximum total_row_count across all user columns for the table.
        let mut max_rows: u64 = 0;
        for field in candidates {
            let rows = self.total_rows_for_field(field)?;
            if rows > max_rows {
                max_rows = rows;
            }
        }
        Ok(max_rows)
    }

    /// Get all user-data column IDs for a table.
    ///
    /// This returns the [`LogicalFieldId`]s of all persisted user columns (namespace
    /// `UserData`) belonging to the specified table. MVCC and row ID columns are not
    /// included.
    pub fn user_field_ids_for_table(&self, table_id: crate::types::TableId) -> Vec<LogicalFieldId> {
        use crate::types::Namespace;

        let catalog = self.catalog.read().unwrap();
        catalog
            .map
            .keys()
            .filter(|fid| fid.namespace() == Namespace::UserData && fid.table_id() == table_id)
            .copied()
            .collect()
    }

    /// Check whether a specific row ID exists in a column.
    ///
    /// This uses presence indexes and binary search when available for fast lookups.
    /// If no presence index exists, it scans chunks and uses min/max metadata to
    /// prune irrelevant data.
    ///
    /// # Errors
    ///
    /// Returns an error if the column doesn't exist or if chunk data is corrupted.
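    ///
    /// # Example
    ///
    /// A sketch using an illustrative row ID.
    ///
    /// ```ignore
    /// if store.has_row_id(field_id, 42)? {
    ///     // Row 42 is present in this column.
    /// }
    /// ```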
    pub fn has_row_id(&self, field_id: LogicalFieldId, row_id: RowId) -> Result<bool> {
        let rid_fid = rowid_fid(field_id);
        let catalog = self.catalog.read().unwrap();
        let rid_desc_pk = *catalog.map.get(&rid_fid).ok_or(Error::NotFound)?;
        let rid_desc_blob = self
            .pager
            .batch_get(&[BatchGet::Raw { key: rid_desc_pk }])?
            .pop()
            .and_then(|r| match r {
                GetResult::Raw { bytes, .. } => Some(bytes),
                _ => None,
            })
            .ok_or(Error::NotFound)?;
        let rid_desc = ColumnDescriptor::from_le_bytes(rid_desc_blob.as_ref());
        drop(catalog);

        // Walk metas; prune by min/max when available.
        for m in DescriptorIterator::new(self.pager.as_ref(), rid_desc.head_page_pk) {
            let meta = m?;
            if meta.row_count == 0 {
                continue;
            }
            // Only prune when min/max stats are populated; apply the guard to both bounds.
            if (meta.min_val_u64 != 0 || meta.max_val_u64 != 0)
                && (row_id < meta.min_val_u64 || row_id > meta.max_val_u64)
            {
                continue;
            }
            // Fetch rid chunk and, if present, the presence perm
            let mut gets = vec![BatchGet::Raw { key: meta.chunk_pk }];
            if meta.value_order_perm_pk != 0 {
                gets.push(BatchGet::Raw {
                    key: meta.value_order_perm_pk,
                });
            }
            let results = self.pager.batch_get(&gets)?;
            let mut rid_blob: Option<EntryHandle> = None;
            let mut perm_blob: Option<EntryHandle> = None;
            for r in results {
                if let GetResult::Raw { key, bytes } = r {
                    if key == meta.chunk_pk {
                        rid_blob = Some(bytes);
                    } else if key == meta.value_order_perm_pk {
                        perm_blob = Some(bytes);
                    }
                }
            }
            // If the rid blob for this chunk is missing, treat as absent and continue
            let Some(rid_blob) = rid_blob else { continue };
            let rid_any = deserialize_array(rid_blob)?;
            let rids = rid_any
                .as_any()
                .downcast_ref::<UInt64Array>()
                .ok_or_else(|| Error::Internal("rid downcast".into()))?;
            if let Some(pblob) = perm_blob {
                let perm_any = deserialize_array(pblob)?;
                let perm = perm_any
                    .as_any()
                    .downcast_ref::<UInt32Array>()
                    .ok_or_else(|| Error::Internal("perm not u32".into()))?;
                // Binary search over sorted-by-perm view
                let mut lo: isize = 0;
                let mut hi: isize = (perm.len() as isize) - 1;
                while lo <= hi {
                    let mid = ((lo + hi) >> 1) as usize;
                    let rid = rids.value(perm.value(mid) as usize);
                    if rid == row_id {
                        return Ok(true);
                    } else if rid < row_id {
                        lo = mid as isize + 1;
                    } else {
                        hi = mid as isize - 1;
                    }
                }
            } else {
                // Assume rid chunk is sorted ascending (common for appends/compaction) and binary search
                let mut lo: isize = 0;
                let mut hi: isize = (rids.len() as isize) - 1;
                while lo <= hi {
                    let mid = ((lo + hi) >> 1) as usize;
                    let rid = rids.value(mid);
                    if rid == row_id {
                        return Ok(true);
                    } else if rid < row_id {
                        lo = mid as isize + 1;
                    } else {
                        hi = mid as isize - 1;
                    }
                }
            }
        }
        Ok(false)
    }

    // TODO: Set up a way to optionally auto-increment a row ID column if not provided.
    /// Append a [`RecordBatch`] to the store.
    ///
    /// The batch must include a `rowid` column (type `UInt64`) that uniquely identifies
    /// each row. Each other column must have `field_id` metadata mapping it to a
    /// [`LogicalFieldId`].
    ///
    /// ## Last-Write-Wins Updates
    ///
    /// If any row IDs in the batch already exist, they are updated in-place (overwritten)
    /// rather than creating duplicates. This happens in a separate transaction before
    /// appending new rows.
    ///
    /// ## Row ID Ordering
    ///
    /// The batch is automatically sorted by `rowid` if not already sorted. This ensures
    /// efficient metadata updates and naturally sorted shadow columns.
    ///
    /// ## Table Separation
    ///
    /// Each batch should contain columns from only one table. To append to multiple
    /// tables, call `append` separately for each table's batch (calls may be concurrent).
    ///
    /// # Errors
    ///
    /// Returns an error if:
    /// - The batch is missing the `rowid` column
    /// - Column metadata is missing or invalid
    /// - Storage operations fail
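    ///
    /// # Example
    ///
    /// A sketch of building a batch with the required `rowid` column and one user
    /// column tagged with `field_id` metadata. The metadata key string, field name,
    /// and field ID are illustrative; see [`ROW_ID_COLUMN_NAME`] and
    /// [`crate::store::FIELD_ID_META_KEY`] for the actual constants.
    ///
    /// ```ignore
    /// use std::collections::HashMap;
    /// use std::sync::Arc;
    /// use arrow::array::{ArrayRef, Int32Array, UInt64Array};
    /// use arrow::datatypes::{DataType, Field, Schema};
    /// use arrow::record_batch::RecordBatch;
    ///
    /// let rowid_field = Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false);
    /// let value_field = Field::new("score", DataType::Int32, true).with_metadata(
    ///     HashMap::from([(crate::store::FIELD_ID_META_KEY.to_string(), "42".to_string())]),
    /// );
    /// let schema = Arc::new(Schema::new(vec![rowid_field, value_field]));
    /// let batch = RecordBatch::try_new(
    ///     schema,
    ///     vec![
    ///         Arc::new(UInt64Array::from(vec![0u64, 1, 2])) as ArrayRef,
    ///         Arc::new(Int32Array::from(vec![10, 20, 30])) as ArrayRef,
    ///     ],
    /// )?;
    /// store.append(&batch)?;
    /// ```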
    #[allow(unused_variables, unused_assignments)] // TODO: Keep `presence_index_created`?
    pub fn append(&self, batch: &RecordBatch) -> Result<()> {
        // --- PHASE 1: PRE-PROCESSING THE INCOMING BATCH ---
        // The `append` logic relies on row IDs being processed in ascending order to handle
        // metadata updates efficiently and to ensure the shadow row_id chunks are naturally sorted.
        // This block checks if the incoming batch is already sorted by `row_id`. If not, it creates a
        // new, sorted `RecordBatch` to work with for the rest of the function.
        let working_batch: RecordBatch;
        let batch_ref = {
            let schema = batch.schema();
            let row_id_idx = schema
                .index_of(ROW_ID_COLUMN_NAME)
                .map_err(|_| Error::Internal("row_id column required".into()))?;
            let row_id_any = batch.column(row_id_idx).clone();
            let row_id_arr = row_id_any
                .as_any()
                .downcast_ref::<UInt64Array>()
                .ok_or_else(|| Error::Internal("row_id downcast failed".into()))?;

            // Manually check if the row_id column is sorted.
            let mut is_sorted = true;
            if !row_id_arr.is_empty() {
                let mut last = row_id_arr.value(0);
                for i in 1..row_id_arr.len() {
                    let current = row_id_arr.value(i);
                    if current < last {
                        is_sorted = false;
                        break;
                    }
                    last = current;
                }
            }

            // If sorted, we can use the original batch directly.
            // Otherwise, we compute a permutation and reorder all columns.
            if is_sorted {
                batch
            } else {
                let sort_col = SortColumn {
                    values: row_id_any,
                    options: None,
                };
                let idx = lexsort_to_indices(&[sort_col], None)?;
                let perm = idx
                    .as_any()
                    .downcast_ref::<UInt32Array>()
                    .ok_or_else(|| Error::Internal("perm not u32".into()))?;
                let mut cols: Vec<ArrayRef> = Vec::with_capacity(batch.num_columns());
                for i in 0..batch.num_columns() {
                    cols.push(compute::take(batch.column(i), perm, None)?);
                }
                working_batch = RecordBatch::try_new(schema.clone(), cols)
                    .map_err(|e| Error::Internal(format!("record batch rebuild: {e}")))?;
                &working_batch
            }
        };

        // --- PHASE 2: LAST-WRITER-WINS (LWW) REWRITE ---
        // This phase handles updates. It identifies any rows in the incoming batch that
        // already exist in the store and rewrites them in-place. This is a separate
        // transaction that happens before the main append of new rows.
        let schema = batch_ref.schema();
        let row_id_idx = schema
            .index_of(ROW_ID_COLUMN_NAME)
            .map_err(|_| Error::Internal("row_id column required".into()))?;

        // Create a quick lookup map of incoming row IDs to their positions in the batch.
        let row_id_arr = batch_ref
            .column(row_id_idx)
            .as_any()
            .downcast_ref::<UInt64Array>()
            .ok_or_else(|| Error::Internal("row_id downcast failed".into()))?;
        let mut incoming_ids_map = FxHashMap::default();
        incoming_ids_map.reserve(row_id_arr.len());
        for i in 0..row_id_arr.len() {
            incoming_ids_map.insert(row_id_arr.value(i), i);
        }

        // These variables will track the state of the LWW transaction.
        let mut catalog_dirty = false;
        let mut puts_rewrites: Vec<BatchPut> = Vec::new();
        let mut all_rewritten_ids = FxHashSet::default();

        // Iterate through each column in the batch (except row_id) to perform rewrites.
        let mut catalog_lock = self.catalog.write().unwrap();
        for i in 0..batch_ref.num_columns() {
            if i == row_id_idx {
                continue;
            }
            let field = schema.field(i);
            if let Some(field_id_str) = field.metadata().get(crate::store::FIELD_ID_META_KEY) {
                let field_id = field_id_str
                    .parse::<u64>()
                    .map(LogicalFieldId::from)
                    .map_err(|e| Error::Internal(format!("Invalid field_id: {}", e)))?;

                // `lww_rewrite_for_field` finds overlapping row IDs and rewrites the data chunks.
                // It returns the set of row IDs that were updated.
                let rewritten = self.lww_rewrite_for_field(
                    &mut catalog_lock,
                    field_id,
                    &incoming_ids_map,
                    batch_ref.column(i),
                    batch_ref.column(row_id_idx),
                    &mut puts_rewrites,
                )?;
                all_rewritten_ids.extend(rewritten);
            }
        }
        drop(catalog_lock);

        // Commit the LWW changes to the pager immediately.
        if !puts_rewrites.is_empty() {
            self.pager.batch_put(&puts_rewrites)?;
        }

        // --- PHASE 3: FILTERING FOR NEW ROWS ---
        // After handling updates, we filter the incoming batch to remove the rows that were
        // just rewritten. The remaining rows are guaranteed to be new additions to the store.
        let batch_to_append = if !all_rewritten_ids.is_empty() {
            let keep_mask: Vec<bool> = (0..row_id_arr.len())
                .map(|i| !all_rewritten_ids.contains(&row_id_arr.value(i)))
                .collect();
            let keep_array = BooleanArray::from(keep_mask);
            compute::filter_record_batch(batch_ref, &keep_array)?
        } else {
            batch_ref.clone()
        };

        // If no new rows are left, we are done.
        if batch_to_append.num_rows() == 0 {
            return Ok(());
        }

        // --- PHASE 4: APPENDING NEW DATA ---
        // This is the main append transaction. All writes generated in this phase will be
        // collected and committed atomically at the very end.
        let append_schema = batch_to_append.schema();
        let append_row_id_idx = append_schema.index_of(ROW_ID_COLUMN_NAME)?;
        let append_row_id_any: ArrayRef = Arc::clone(batch_to_append.column(append_row_id_idx));
        let mut puts_appends: Vec<BatchPut> = Vec::new();

        // Loop through each column of the filtered batch to append its data.
        for (i, array) in batch_to_append.columns().iter().enumerate() {
            if i == append_row_id_idx {
                continue;
            }

            let field = append_schema.field(i);

            let field_id = field
                .metadata()
                .get(crate::store::FIELD_ID_META_KEY)
                .ok_or_else(|| Error::Internal("Missing field_id".into()))?
                .parse::<u64>()
                .map(LogicalFieldId::from)
                .map_err(|e| Error::Internal(format!("Invalid field_id: {}", e)))?;

            // Populate the data type cache for this column. This is a performance optimization
            // to avoid reading a chunk from storage later just to determine its type.
            self.dtype_cache.insert(field_id, field.data_type().clone());

            // Null values are treated as deletions, so we filter them out. The `rids_clean`
            // array contains the row IDs corresponding to the non-null values.
            let (array_clean, rids_clean) = if array.null_count() == 0 {
                (array.clone(), append_row_id_any.clone())
            } else {
                let keep =
                    BooleanArray::from_iter((0..array.len()).map(|j| Some(!array.is_null(j))));
                let a = compute::filter(array, &keep)?;
                let r = compute::filter(&append_row_id_any, &keep)?;
                (a, r)
            };

            if array_clean.is_empty() {
                continue;
            }

            // Get or create the physical keys for the column's data descriptor and its
            // shadow row_id descriptor from the catalog.
            let (descriptor_pk, rid_descriptor_pk, rid_fid) = {
                let mut catalog = self.catalog.write().unwrap();
                let pk1 = *catalog.map.entry(field_id).or_insert_with(|| {
                    catalog_dirty = true;
                    self.pager.alloc_many(1).unwrap()[0]
                });
                let r_fid = rowid_fid(field_id);
                let pk2 = *catalog.map.entry(r_fid).or_insert_with(|| {
                    catalog_dirty = true;
                    self.pager.alloc_many(1).unwrap()[0]
                });
                (pk1, pk2, r_fid)
            };

            // Load the descriptors and their tail metadata pages into memory.
            // If they don't exist, `load_or_create` will initialize new ones.
            let (mut data_descriptor, mut data_tail_page) =
                ColumnDescriptor::load_or_create(Arc::clone(&self.pager), descriptor_pk, field_id)?;
            let (mut rid_descriptor, mut rid_tail_page) = ColumnDescriptor::load_or_create(
                Arc::clone(&self.pager),
                rid_descriptor_pk,
                rid_fid,
            )?;

            // Logically register the Presence index on the main data descriptor. This ensures
            // that even if no physical index chunks are created (because data arrived sorted),
            // the system knows a Presence index is conceptually active for this column.
            self.index_manager
                .stage_index_registration(&mut data_descriptor, IndexKind::Presence)?;

            // Split the data to be appended into chunks of a target size.
            let slices = split_to_target_bytes(
                &array_clean,
                TARGET_CHUNK_BYTES,
                self.cfg.varwidth_fallback_rows_per_slice,
            );
            let mut row_off = 0usize;

            // Loop through each new slice to create and stage its chunks.
            for s in slices {
                let rows = s.len();
                // Create and stage the data chunk.
                let data_pk = self.pager.alloc_many(1)?[0];
                let s_norm = zero_offset(&s);
                let data_bytes = serialize_array(s_norm.as_ref())?;
                puts_appends.push(BatchPut::Raw {
                    key: data_pk,
                    bytes: data_bytes,
                });

                // Create and stage the corresponding row_id chunk.
                let rid_slice: ArrayRef = rids_clean.slice(row_off, rows);
                let rid_norm = zero_offset(&rid_slice);
                let rid_pk = self.pager.alloc_many(1)?[0];
                let rid_bytes = serialize_array(rid_norm.as_ref())?;
                puts_appends.push(BatchPut::Raw {
                    key: rid_pk,
                    bytes: rid_bytes,
                });

                // Compute min/max for the row_id chunk to enable pruning during scans.
                let rids_for_meta = rid_norm.as_any().downcast_ref::<UInt64Array>().unwrap();
                let (min, max) = if !rids_for_meta.is_empty() {
                    let mut min_val = rids_for_meta.value(0);
                    let mut max_val = rids_for_meta.value(0);
                    for i in 1..rids_for_meta.len() {
                        let v = rids_for_meta.value(i);
                        if v < min_val {
                            min_val = v;
                        }
                        if v > max_val {
                            max_val = v;
                        }
                    }
                    (min_val, max_val)
                } else {
                    (0, 0)
                };

                // Create the initial metadata for both chunks.
                // The `value_order_perm_pk` is initialized to 0 (None).
                let mut data_meta = ChunkMetadata {
                    chunk_pk: data_pk,
                    row_count: rows as u64,
                    serialized_bytes: s_norm.get_array_memory_size() as u64,
                    max_val_u64: u64::MAX,
                    ..Default::default()
                };
                let mut rid_meta = ChunkMetadata {
                    chunk_pk: rid_pk,
                    row_count: rows as u64,
                    serialized_bytes: rid_norm.get_array_memory_size() as u64,
                    min_val_u64: min,
                    max_val_u64: max,
                    ..Default::default()
                };

                // **GENERIC INDEX UPDATE DISPATCH**
                // This is the single, index-agnostic call. The IndexManager will look up all
                // active indexes for this column (e.g., Presence, Sort) and call their respective
                // `stage_update_for_new_chunk` methods. This is where the physical index data
                // (like permutation blobs) is created and staged.
                self.index_manager.stage_updates_for_new_chunk(
                    field_id,
                    &data_descriptor,
                    &s_norm,
                    &rid_norm,
                    &mut data_meta,
                    &mut rid_meta,
                    &mut puts_appends,
                )?;

                // Append the (potentially modified) metadata to their respective descriptor chains.
                self.append_meta_in_loop(
                    &mut data_descriptor,
                    &mut data_tail_page,
                    data_meta,
                    &mut puts_appends,
                )?;
                self.append_meta_in_loop(
                    &mut rid_descriptor,
                    &mut rid_tail_page,
                    rid_meta,
                    &mut puts_appends,
                )?;
                row_off += rows;
            }

            // After processing all slices, stage the final writes for the updated tail pages
            // and the root descriptor objects themselves.
            puts_appends.push(BatchPut::Raw {
                key: data_descriptor.tail_page_pk,
                bytes: data_tail_page,
            });
            puts_appends.push(BatchPut::Raw {
                key: descriptor_pk,
                bytes: data_descriptor.to_le_bytes(),
            });
            puts_appends.push(BatchPut::Raw {
                key: rid_descriptor.tail_page_pk,
                bytes: rid_tail_page,
            });
            puts_appends.push(BatchPut::Raw {
                key: rid_descriptor_pk,
                bytes: rid_descriptor.to_le_bytes(),
            });
        }

        // --- PHASE 5: FINAL ATOMIC COMMIT ---
        // If the catalog was modified (e.g., new columns were created), stage its write.
        if catalog_dirty {
            let catalog = self.catalog.read().unwrap();
            puts_appends.push(BatchPut::Raw {
                key: CATALOG_ROOT_PKEY,
                bytes: catalog.to_bytes(),
            });
        }

        // Commit all staged puts (new data chunks, new row_id chunks, new index permutations,
        // updated descriptor pages, updated root descriptors, and the updated catalog)
        // in a single atomic operation.
        if !puts_appends.is_empty() {
            self.pager.batch_put(&puts_appends)?;
        }
        Ok(())
    }

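    /// Rewrite existing chunks of `field_id` whose row IDs collide with the
    /// incoming batch (last-writer-wins).
    ///
    /// Incoming rows with non-null values overwrite the stored values in place;
    /// incoming rows with null values delete the stored rows. All writes are
    /// staged into `puts`, and the set of row IDs that were rewritten (and must
    /// therefore be excluded from the subsequent append) is returned.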
    fn lww_rewrite_for_field(
        &self,
        catalog: &mut ColumnCatalog,
        field_id: LogicalFieldId,
        incoming_ids_map: &FxHashMap<u64, usize>,
        incoming_data: &ArrayRef,
        incoming_row_ids: &ArrayRef,
        puts: &mut Vec<BatchPut>,
    ) -> Result<FxHashSet<u64>> {
        use crate::store::descriptor::DescriptorIterator;
        use crate::store::ingest::ChunkEdit;

        // Fast exit if nothing to rewrite.
        if incoming_ids_map.is_empty() {
            return Ok(FxHashSet::default());
        }
        let incoming_ids: FxHashSet<u64> = incoming_ids_map.keys().copied().collect();

        // Resolve descriptors for data and row_id columns.
        let desc_pk_data = match catalog.map.get(&field_id) {
            Some(pk) => *pk,
            None => return Ok(FxHashSet::default()),
        };
        let rid_fid = rowid_fid(field_id);
        let desc_pk_rid = match catalog.map.get(&rid_fid) {
            Some(pk) => *pk,
            None => return Ok(FxHashSet::default()),
        };

        // Batch fetch both descriptors.
        let gets = vec![
            BatchGet::Raw { key: desc_pk_data },
            BatchGet::Raw { key: desc_pk_rid },
        ];
        let results = self.pager.batch_get(&gets)?;
        let mut blobs_by_pk = FxHashMap::default();
        for r in results {
            if let GetResult::Raw { key, bytes } = r {
                blobs_by_pk.insert(key, bytes);
            }
        }

        let desc_blob_data = blobs_by_pk.remove(&desc_pk_data).ok_or(Error::NotFound)?;
        let mut descriptor_data = ColumnDescriptor::from_le_bytes(desc_blob_data.as_ref());

        let desc_blob_rid = blobs_by_pk.remove(&desc_pk_rid).ok_or(Error::NotFound)?;
        let mut descriptor_rid = ColumnDescriptor::from_le_bytes(desc_blob_rid.as_ref());

        // Collect chunk metadata.
        let mut metas_data: Vec<ChunkMetadata> = Vec::new();
        let mut metas_rid: Vec<ChunkMetadata> = Vec::new();
        for m in DescriptorIterator::new(self.pager.as_ref(), descriptor_data.head_page_pk) {
            metas_data.push(m?);
        }
        for m in DescriptorIterator::new(self.pager.as_ref(), descriptor_rid.head_page_pk) {
            metas_rid.push(m?);
        }

        // Classify incoming rows: delete vs upsert.
        let rid_in = incoming_row_ids
            .as_any()
            .downcast_ref::<UInt64Array>()
            .ok_or_else(|| Error::Internal("row_id must be u64".into()))?;
        let mut ids_to_delete = FxHashSet::default();
        let mut ids_to_upsert = FxHashSet::default();
        for i in 0..rid_in.len() {
            let rid = rid_in.value(i);
            if incoming_data.is_null(i) {
                ids_to_delete.insert(rid);
            } else {
                ids_to_upsert.insert(rid);
            }
        }

        // Scan row_id chunks to find hits, bucketed by chunk index.
        let mut rewritten_ids = FxHashSet::default();
        let mut hit_up: FxHashMap<usize, Vec<u64>> = FxHashMap::default();
        let mut hit_del: FxHashMap<usize, Vec<u64>> = FxHashMap::default();

        let n = metas_data.len().min(metas_rid.len());
        if n > 0 {
            // Fetch only row_id chunks to locate matches.
            let mut gets_rid = Vec::with_capacity(n);
            for rm in metas_rid.iter().take(n) {
                gets_rid.push(BatchGet::Raw { key: rm.chunk_pk });
            }
            let rid_results = self.pager.batch_get(&gets_rid)?;
            let mut rid_blobs: FxHashMap<PhysicalKey, EntryHandle> = FxHashMap::default();
            for r in rid_results {
                if let GetResult::Raw { key, bytes } = r {
                    rid_blobs.insert(key, bytes);
                }
            }

            for (i, meta_rid) in metas_rid.iter().enumerate().take(n) {
                if let Some(rid_blob) = rid_blobs.get(&meta_rid.chunk_pk) {
                    let rid_arr_any = deserialize_array(rid_blob.clone())?;
                    let rid_arr = rid_arr_any
                        .as_any()
                        .downcast_ref::<UInt64Array>()
                        .ok_or_else(|| Error::Internal("rid type mismatch".into()))?;
                    for j in 0..rid_arr.len() {
                        let rid = rid_arr.value(j);
                        if incoming_ids.contains(&rid) {
                            if ids_to_delete.contains(&rid) {
                                hit_del.entry(i).or_default().push(rid);
                            } else if ids_to_upsert.contains(&rid) {
                                hit_up.entry(i).or_default().push(rid);
                            }
                            rewritten_ids.insert(rid);
                        }
                    }
                }
            }
        }

        if hit_up.is_empty() && hit_del.is_empty() {
            return Ok(rewritten_ids);
        }

        // Batch fetch data+rid blobs for all chunks with hits.
        let mut hit_set = FxHashSet::default();
        hit_set.extend(hit_up.keys().copied());
        hit_set.extend(hit_del.keys().copied());
        let hit_idxs: Vec<usize> = hit_set.into_iter().collect();

        let mut gets = Vec::with_capacity(hit_idxs.len() * 2);
        for &i in &hit_idxs {
            gets.push(BatchGet::Raw {
                key: metas_data[i].chunk_pk,
            });
            gets.push(BatchGet::Raw {
                key: metas_rid[i].chunk_pk,
            });
        }
        let results = self.pager.batch_get(&gets)?;
        let mut blob_map: FxHashMap<PhysicalKey, EntryHandle> = FxHashMap::default();
        for r in results {
            if let GetResult::Raw { key, bytes } = r {
                blob_map.insert(key, bytes);
            }
        }

        // Apply per-chunk edits and stage writebacks.
        for i in hit_idxs {
            let old_data_arr =
                deserialize_array(blob_map.get(&metas_data[i].chunk_pk).unwrap().clone())?;
            let old_rid_arr_any =
                deserialize_array(blob_map.get(&metas_rid[i].chunk_pk).unwrap().clone())?;
            let old_rid_arr = old_rid_arr_any
                .as_any()
                .downcast_ref::<UInt64Array>()
                .unwrap();

            let up_vec = hit_up.remove(&i).unwrap_or_default();
            let del_vec = hit_del.remove(&i).unwrap_or_default();

            // Centralized LWW edit: builds the keep mask and the tails to inject.
            let edit = ChunkEdit::from_lww_upsert(
                old_rid_arr,
                &up_vec,
                &del_vec,
                incoming_data,
                incoming_row_ids,
                incoming_ids_map,
            )?;

            let (new_data_arr, new_rid_arr) =
                ChunkEdit::apply_edit_to_arrays(&old_data_arr, Some(&old_rid_arr_any), &edit)?;

            // Stage data writeback.
            let data_bytes = serialize_array(&new_data_arr)?;
            puts.push(BatchPut::Raw {
                key: metas_data[i].chunk_pk,
                bytes: data_bytes,
            });
            metas_data[i].row_count = new_data_arr.len() as u64;
            metas_data[i].serialized_bytes = new_data_arr.get_array_memory_size() as u64;

            // Stage row_id writeback.
            if let Some(rarr) = new_rid_arr {
                let rid_bytes = serialize_array(&rarr)?;
                puts.push(BatchPut::Raw {
                    key: metas_rid[i].chunk_pk,
                    bytes: rid_bytes,
                });
                metas_rid[i].row_count = rarr.len() as u64;
                metas_rid[i].serialized_bytes = rarr.get_array_memory_size() as u64;
            }

            // Refresh permutation if present.
            if metas_data[i].value_order_perm_pk != 0 {
                let sort_col = SortColumn {
                    values: new_data_arr,
                    options: None,
                };
                let idx = lexsort_to_indices(&[sort_col], None)?;
                let perm_bytes = serialize_array(&idx)?;
                puts.push(BatchPut::Raw {
                    key: metas_data[i].value_order_perm_pk,
                    bytes: perm_bytes,
                });
            }
        }

        // Rewrite descriptor chains/totals for both columns.
        descriptor_data.rewrite_pages(
            Arc::clone(&self.pager),
            desc_pk_data,
            &mut metas_data,
            puts,
        )?;
        descriptor_rid.rewrite_pages(Arc::clone(&self.pager), desc_pk_rid, &mut metas_rid, puts)?;

        Ok(rewritten_ids)
    }

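    /// Stage chunk and descriptor rewrites that remove the given global row
    /// positions from `field_id`.
    ///
    /// `rows_to_delete` must be ascending and unique. Writes are only staged
    /// into `staged_puts`; the caller commits them. Returns `true` if any chunk
    /// was modified.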
    fn stage_delete_rows_for_field(
        &self,
        field_id: LogicalFieldId,
        rows_to_delete: &[RowId],
        staged_puts: &mut Vec<BatchPut>,
    ) -> Result<bool> {
        tracing::warn!(
            field_id = ?field_id,
            rows = rows_to_delete.len(),
            "delete_rows stage_delete_rows_for_field: start"
        );
        use crate::store::descriptor::DescriptorIterator;
        use crate::store::ingest::ChunkEdit;

        if rows_to_delete.is_empty() {
            return Ok(false);
        }

        // Stream and validate ascending, unique positions.
        let mut del_iter = rows_to_delete.iter().copied();
        let mut cur_del = del_iter.next();
        let mut last_seen: Option<u64> = cur_del;

        // Lookup descriptors (data and optional row_id).
        let catalog = self.catalog.read().unwrap();
        let desc_pk = match catalog.map.get(&field_id) {
            Some(pk) => *pk,
            None => {
                tracing::trace!(
                    field_id = ?field_id,
                    "delete_rows stage_delete_rows_for_field: data descriptor missing"
                );
                return Err(Error::NotFound);
            }
        };
        let rid_fid = rowid_fid(field_id);
        let desc_pk_rid = catalog.map.get(&rid_fid).copied();
        tracing::warn!(
            field_id = ?field_id,
            desc_pk,
            desc_pk_rid = ?desc_pk_rid,
            "delete_rows stage_delete_rows_for_field: descriptor keys"
        );

        // Batch fetch descriptor blobs up front.
        let mut gets = vec![BatchGet::Raw { key: desc_pk }];
        if let Some(pk) = desc_pk_rid {
            gets.push(BatchGet::Raw { key: pk });
        }
        let results = match self.pager.batch_get(&gets) {
            Ok(res) => res,
            Err(err) => {
                tracing::trace!(
                    field_id = ?field_id,
                    error = ?err,
                    "delete_rows stage_delete_rows_for_field: descriptor batch_get failed"
                );
                return Err(err);
            }
        };
        let mut blobs_by_pk = FxHashMap::default();
        for res in results {
            if let GetResult::Raw { key, bytes } = res {
                blobs_by_pk.insert(key, bytes);
            }
        }

        tracing::warn!(
            field_id = ?field_id,
            desc_blob_found = blobs_by_pk.contains_key(&desc_pk),
            rid_blob_found = desc_pk_rid.map(|pk| blobs_by_pk.contains_key(&pk)),
            "delete_rows stage_delete_rows_for_field: descriptor fetch status"
        );

        let desc_blob = blobs_by_pk.remove(&desc_pk).ok_or_else(|| {
            tracing::trace!(
                field_id = ?field_id,
                desc_pk,
                "delete_rows stage_delete_rows_for_field: descriptor blob missing"
            );
            Error::Internal(format!(
                "descriptor pk={} missing during delete_rows for field {:?}",
                desc_pk, field_id
            ))
        })?;
        let mut descriptor = ColumnDescriptor::from_le_bytes(desc_blob.as_ref());

        // Build metas for data column.
        let mut metas: Vec<ChunkMetadata> = Vec::new();
        for m in DescriptorIterator::new(self.pager.as_ref(), descriptor.head_page_pk) {
            metas.push(m?);
        }
        if metas.is_empty() {
            drop(catalog);
            return Ok(false);
        }

        // Optionally mirror metas for row_id column.
        let mut metas_rid: Vec<ChunkMetadata> = Vec::new();
        let mut descriptor_rid: Option<ColumnDescriptor> = None;
        tracing::warn!(
            field_id = ?field_id,
            metas_len = metas.len(),
            desc_pk_rid = ?desc_pk_rid,
            "delete_rows stage_delete_rows_for_field: data metas loaded"
        );
        if let Some(pk_rid) = desc_pk_rid
            && let Some(desc_blob_rid) = blobs_by_pk.remove(&pk_rid)
        {
            let d_rid = ColumnDescriptor::from_le_bytes(desc_blob_rid.as_ref());
            for m in DescriptorIterator::new(self.pager.as_ref(), d_rid.head_page_pk) {
                metas_rid.push(m?);
            }
            descriptor_rid = Some(d_rid);
        }

        tracing::warn!(
            field_id = ?field_id,
            metas_rid_len = metas_rid.len(),
            "delete_rows stage_delete_rows_for_field: rowid metas loaded"
        );

        let mut cum_rows = 0u64;
        let mut any_changed = false;

        for (i, meta) in metas.iter_mut().enumerate() {
            let start_u64 = cum_rows;
            let end_u64 = start_u64 + meta.row_count;

            // Advance deletes into this chunk window [start, end).
            while let Some(d) = cur_del {
                if d < start_u64
                    && let Some(prev) = last_seen
                {
                    if d < prev {
                        return Err(Error::Internal(
                            "rows_to_delete must be ascending/unique".into(),
                        ));
                    }

                    last_seen = Some(d);
                    cur_del = del_iter.next();
                } else {
                    break;
                }
            }

            // Collect local delete indices.
            let rows = meta.row_count as usize;
            let mut del_local: FxHashSet<usize> = FxHashSet::default();
            while let Some(d) = cur_del {
                if d >= end_u64 {
                    break;
                }
                del_local.insert((d - start_u64) as usize);
                last_seen = Some(d);
                cur_del = del_iter.next();
            }

            if del_local.is_empty() {
                cum_rows = end_u64;
                continue;
            }

            // Batch get chunk blobs (data and optional row_id).
            let mut chunk_gets = vec![BatchGet::Raw { key: meta.chunk_pk }];
            if let Some(rm) = metas_rid.get(i) {
                chunk_gets.push(BatchGet::Raw { key: rm.chunk_pk });
            }
            let chunk_results = match self.pager.batch_get(&chunk_gets) {
                Ok(res) => res,
                Err(err) => {
                    tracing::trace!(
                        field_id = ?field_id,
                        chunk_pk = meta.chunk_pk,
                        error = ?err,
                        "delete_rows stage_delete_rows_for_field: chunk batch_get failed"
                    );
                    return Err(err);
                }
            };
            let mut chunk_blobs = FxHashMap::default();
            for res in chunk_results {
                if let GetResult::Raw { key, bytes } = res {
                    chunk_blobs.insert(key, bytes);
                }
            }

            tracing::warn!(
                field_id = ?field_id,
                chunk_pk = meta.chunk_pk,
                rid_chunk_pk = metas_rid.get(i).map(|rm| rm.chunk_pk),
                data_found = chunk_blobs.contains_key(&meta.chunk_pk),
                rid_found = metas_rid
                    .get(i)
                    .map(|rm| chunk_blobs.contains_key(&rm.chunk_pk)),
                "delete_rows stage_delete_rows_for_field: chunk fetch status"
            );

            let data_blob = match chunk_blobs.remove(&meta.chunk_pk) {
                Some(bytes) => bytes,
                None => {
                    tracing::trace!(
                        field_id = ?field_id,
                        chunk_pk = meta.chunk_pk,
                        "delete_rows stage_delete_rows_for_field: chunk missing"
                    );
                    return Err(Error::NotFound);
                }
            };
            let data_arr = deserialize_array(data_blob)?;

            let rid_arr_any = if let Some(rm) = metas_rid.get(i) {
                let rid_blob = match chunk_blobs.remove(&rm.chunk_pk) {
                    Some(bytes) => bytes,
                    None => {
                        tracing::trace!(
                            field_id = ?field_id,
                            rowid_chunk_pk = rm.chunk_pk,
                            "delete_rows stage_delete_rows_for_field: rowid chunk missing"
                        );
                        return Err(Error::NotFound);
                    }
                };
                Some(deserialize_array(rid_blob)?)
            } else {
                None
            };

            // *** Wired: build edit via ingest helper.
            let edit = ChunkEdit::from_delete_indices(rows, &del_local);

            // Apply edit (pure array ops).
            let (new_data_arr, new_rid_arr) =
                ChunkEdit::apply_edit_to_arrays(&data_arr, rid_arr_any.as_ref(), &edit)?;

            // Write back data.
            let data_bytes = serialize_array(&new_data_arr)?;
            staged_puts.push(BatchPut::Raw {
                key: meta.chunk_pk,
                bytes: data_bytes,
            });
            meta.row_count = new_data_arr.len() as u64;
            meta.serialized_bytes = new_data_arr.get_array_memory_size() as u64;

            // Write back row_ids if present.
            if let (Some(_), Some(rids)) = (metas_rid.get_mut(i), new_rid_arr) {
                let rm = metas_rid.get_mut(i).unwrap();
                let rid_bytes = serialize_array(&rids)?;
                staged_puts.push(BatchPut::Raw {
                    key: rm.chunk_pk,
                    bytes: rid_bytes,
                });
                rm.row_count = rids.len() as u64;
                rm.serialized_bytes = rids.get_array_memory_size() as u64;
            }

            // Refresh permutation if this chunk has one.
            if meta.value_order_perm_pk != 0 {
                let sort_column = SortColumn {
                    values: new_data_arr,
                    options: None,
                };
                let indices = lexsort_to_indices(&[sort_column], None)?;
                let perm_bytes = serialize_array(&indices)?;
                staged_puts.push(BatchPut::Raw {
                    key: meta.value_order_perm_pk,
                    bytes: perm_bytes,
                });
            }

            cum_rows = end_u64;
            any_changed = true;
        }

        // Rewrite descriptor chains/totals and commit.
        descriptor.rewrite_pages(Arc::clone(&self.pager), desc_pk, &mut metas, staged_puts)?;
        if let (Some(rid_pk), Some(mut rid_desc)) = (desc_pk_rid, descriptor_rid) {
            rid_desc.rewrite_pages(Arc::clone(&self.pager), rid_pk, &mut metas_rid, staged_puts)?;
        }
        drop(catalog);
        tracing::trace!(
            field_id = ?field_id,
            changed = any_changed,
            "delete_rows stage_delete_rows_for_field: finished stage"
        );
        Ok(any_changed)
    }

    /// Delete row positions for one or more logical fields in a single atomic batch.
    ///
    /// The same set of global row positions is applied to every field in
    /// `fields`. All staged metadata and chunk updates are committed in a
    /// single pager batch.
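    ///
    /// # Example
    ///
    /// A sketch; row positions must be ascending and unique, and all fields must
    /// belong to the same table.
    ///
    /// ```ignore
    /// let fields = store.user_field_ids_for_table(table_id);
    /// store.delete_rows(&fields, &[3, 7, 9])?;
    /// ```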
    pub fn delete_rows(&self, fields: &[LogicalFieldId], rows_to_delete: &[RowId]) -> Result<()> {
        if fields.is_empty() || rows_to_delete.is_empty() {
            return Ok(());
        }

        let mut puts = Vec::new();
        let mut touched: FxHashSet<LogicalFieldId> = FxHashSet::default();
        let mut table_id: Option<TableId> = None;

        tracing::warn!(
            fields = fields.len(),
            rows = rows_to_delete.len(),
            "delete_rows begin"
        );
        for field_id in fields {
            tracing::warn!(field = ?field_id, "delete_rows iter field");
            if let Some(expected) = table_id {
                if field_id.table_id() != expected {
                    return Err(Error::InvalidArgumentError(
                        "delete_rows requires fields from the same table".into(),
                    ));
                }
            } else {
                table_id = Some(field_id.table_id());
            }

            if self.stage_delete_rows_for_field(*field_id, rows_to_delete, &mut puts)? {
                touched.insert(*field_id);
            }
        }

        if puts.is_empty() {
            return Ok(());
        }

        self.pager.batch_put(&puts)?;

        tracing::warn!(touched = touched.len(), "delete_rows apply writes");

        for field_id in touched {
            self.compact_field_bounded(field_id)?;
        }
        tracing::warn!("delete_rows complete");
        Ok(())
    }

    // TODO: Move to descriptor module?
    /// Write a descriptor chain from a meta slice. Reuses first page when
    /// possible. Frees surplus pages via `frees`.
    pub(crate) fn write_descriptor_chain(
        &self,
        descriptor_pk: PhysicalKey,
        descriptor: &mut ColumnDescriptor,
        new_metas: &[ChunkMetadata],
        puts: &mut Vec<BatchPut>,
        frees: &mut Vec<PhysicalKey>,
    ) -> Result<()> {
        // Collect existing page chain.
        let mut old_pages = Vec::new();
        let mut pk = descriptor.head_page_pk;
        while pk != 0 {
            let page_blob = self
                .pager
                .batch_get(&[BatchGet::Raw { key: pk }])?
                .pop()
                .and_then(|res| match res {
                    GetResult::Raw { bytes, .. } => Some(bytes),
                    _ => None,
                })
                .ok_or(Error::NotFound)?;
            let header = DescriptorPageHeader::from_le_bytes(
                &page_blob.as_ref()[..DescriptorPageHeader::DISK_SIZE],
            );
            old_pages.push(pk);
            pk = header.next_page_pk;
        }

        // Required pages for new metas.
        let per = DESCRIPTOR_ENTRIES_PER_PAGE;
        let need_pages = if new_metas.is_empty() {
            0
        } else {
            new_metas.len().div_ceil(per)
        };
        // If empty: free everything and clear counters.
        if need_pages == 0 {
            frees.extend(old_pages.iter().copied());
            descriptor.total_row_count = 0;
            descriptor.total_chunk_count = 0;
            puts.push(BatchPut::Raw {
                key: descriptor_pk,
                bytes: descriptor.to_le_bytes(),
            });
            return Ok(());
        }

        // Reuse first page; alloc more if needed.
        let mut pages = Vec::with_capacity(need_pages);
        if !old_pages.is_empty() {
            pages.push(old_pages[0]);
        } else {
            pages.push(self.pager.alloc_many(1)?[0]);
            descriptor.head_page_pk = pages[0];
        }
        if need_pages > pages.len() {
            let extra = self.pager.alloc_many(need_pages - pages.len())?;
            pages.extend(extra);
        }

        // Free surplus old pages.
        if old_pages.len() > need_pages {
            frees.extend(old_pages[need_pages..].iter().copied());
        }

        // Write page contents.
        let mut off = 0usize;
        for (i, page_pk) in pages.iter().copied().enumerate() {
            let remain = new_metas.len() - off;
            let count = remain.min(per);
            let next = if i + 1 < pages.len() { pages[i + 1] } else { 0 };
            let header = DescriptorPageHeader {
                next_page_pk: next,
                entry_count: count as u32,
                _padding: [0; 4],
            };
            let mut page_bytes = header.to_le_bytes().to_vec();
            for m in &new_metas[off..off + count] {
                page_bytes.extend_from_slice(&m.to_le_bytes());
            }
            puts.push(BatchPut::Raw {
                key: page_pk,
                bytes: page_bytes,
            });
            off += count;
        }

        descriptor.tail_page_pk = *pages.last().unwrap();
        descriptor.total_chunk_count = new_metas.len() as u64;
        descriptor.total_row_count = new_metas.iter().map(|m| m.row_count).sum();
1410        puts.push(BatchPut::Raw {
1411            key: descriptor_pk,
1412            bytes: descriptor.to_le_bytes(),
1413        });
1414        Ok(())
1415    }
1416
1417    /// Bounded, local field compaction. Merges adjacent small chunks into
1418    /// ~TARGET_CHUNK_BYTES; leaves large chunks intact. Recomputes perms if
1419    /// any source chunk in a run had one. Frees obsolete chunks/pages.
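    ///
    /// Worked example (illustrative sizes, not the real thresholds): with chunk
    /// sizes `[64 KiB, 96 KiB, 8 MiB, 32 KiB]` and a 1 MiB small-chunk threshold,
    /// the first two chunks form one merge run and are rewritten as a single
    /// ~160 KiB chunk, the 8 MiB chunk is carried over untouched, and the
    /// trailing 32 KiB chunk has no small neighbor to merge with.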
1420    fn compact_field_bounded(&self, field_id: LogicalFieldId) -> Result<()> {
1421        // We may rewrite descriptors; take a write lock.
1422        let mut catalog = self.catalog.write().unwrap();
1423
1424        let desc_pk = match catalog.map.get(&field_id) {
1425            Some(&pk) => pk,
1426            None => return Ok(()),
1427        };
1428        let rid_fid = rowid_fid(field_id);
1429        let desc_pk_rid = match catalog.map.get(&rid_fid) {
1430            Some(&pk) => pk,
1431            None => return Ok(()),
1432        };
1433
1434        // Fetch both descriptor blobs in a single pager batch.
1435        let gets = vec![
1436            BatchGet::Raw { key: desc_pk },
1437            BatchGet::Raw { key: desc_pk_rid },
1438        ];
1439        let results = self.pager.batch_get(&gets)?;
1440        let mut blobs_by_pk = FxHashMap::default();
1441        for res in results {
1442            if let GetResult::Raw { key, bytes } = res {
1443                blobs_by_pk.insert(key, bytes);
1444            }
1445        }
1446        let desc_blob = blobs_by_pk.remove(&desc_pk).ok_or(Error::NotFound)?;
1447        let mut desc = ColumnDescriptor::from_le_bytes(desc_blob.as_ref());
1448        let desc_blob_rid = blobs_by_pk.remove(&desc_pk_rid).ok_or(Error::NotFound)?;
1449        let mut desc_rid = ColumnDescriptor::from_le_bytes(desc_blob_rid.as_ref());
1450
1451        // Load metas.
1452        let mut metas = Vec::new();
1453        for m in DescriptorIterator::new(self.pager.as_ref(), desc.head_page_pk) {
1454            metas.push(m?);
1455        }
1456        let mut metas_rid = Vec::new();
1457        for m in DescriptorIterator::new(self.pager.as_ref(), desc_rid.head_page_pk) {
1458            metas_rid.push(m?);
1459        }
1460        if metas.is_empty() || metas_rid.is_empty() {
1461            return Ok(());
1462        }
1463
1464        let mut puts: Vec<BatchPut> = Vec::new();
1465        let mut frees: Vec<PhysicalKey> = Vec::new();
1466        let mut new_metas: Vec<ChunkMetadata> = Vec::new();
1467        let mut new_rid_metas: Vec<ChunkMetadata> = Vec::new();
1468
1469        let mut i = 0usize;
1470        while i < metas.len() {
1471            let sz = metas[i].serialized_bytes as usize;
1472            // Keep large chunks as-is.
1473            if sz >= MIN_CHUNK_BYTES {
1474                new_metas.push(metas[i]);
1475                new_rid_metas.push(metas_rid[i]);
1476                i += 1;
1477                continue;
1478            }
1479
1480            // Build a small run [i, j) capped by MAX_MERGE_RUN_BYTES.
1481            let mut j = i;
1482            let mut run_bytes = 0usize;
1483            while j < metas.len() {
1484                let b = metas[j].serialized_bytes as usize;
1485                if b >= TARGET_CHUNK_BYTES {
1486                    break;
1487                }
1488                if run_bytes + b > MAX_MERGE_RUN_BYTES {
1489                    break;
1490                }
1491                run_bytes += b;
1492                j += 1;
1493            }
1494            if j == i + 1 { // lone small run: nothing to merge with, keep as-is
1495                new_metas.push(metas[i]);
1496                new_rid_metas.push(metas_rid[i]);
1497                i += 1;
1498                continue;
1499            }
1500
1501            // Fetch and concatenate the run's data and row_ids.
1502            let mut gets = Vec::with_capacity((j - i) * 2);
1503            for k in i..j {
1504                gets.push(BatchGet::Raw {
1505                    key: metas[k].chunk_pk,
1506                });
1507                gets.push(BatchGet::Raw {
1508                    key: metas_rid[k].chunk_pk,
1509                });
1510            }
1511            let results = self.pager.batch_get(&gets)?;
1512            let mut by_pk: FxHashMap<PhysicalKey, EntryHandle> = FxHashMap::default();
1513            for r in results {
1514                match r {
1515                    GetResult::Raw { key, bytes } => {
1516                        by_pk.insert(key, bytes);
1517                    }
1518                    _ => return Err(Error::NotFound),
1519                }
1520            }
1521            let mut data_parts = Vec::with_capacity(j - i);
1522            let mut rid_parts = Vec::with_capacity(j - i);
1523            for k in i..j {
1524                let db = by_pk.get(&metas[k].chunk_pk).ok_or(Error::NotFound)?;
1525                data_parts.push(deserialize_array(db.clone())?);
1526                let rb = by_pk.get(&metas_rid[k].chunk_pk).ok_or(Error::NotFound)?;
1527                rid_parts.push(deserialize_array(rb.clone())?);
1528            }
1529            let merged_data = concat_many(data_parts.iter().collect())?;
1530            let merged_rid_any = concat_many(rid_parts.iter().collect())?;
1531
1532            // Split merged run into ~target-sized chunks.
1533            let slices = split_to_target_bytes(
1534                &merged_data,
1535                TARGET_CHUNK_BYTES,
1536                self.cfg.varwidth_fallback_rows_per_slice,
1537            );
1538            let mut rid_off = 0usize;
1539            let need_perms = metas[i..j].iter().any(|m| m.value_order_perm_pk != 0);
1540
1541            for s in slices {
1542                let rows = s.len();
1543                // Slice the merged row ids to align with this data slice (already an ArrayRef).
1544                let rid_ref: ArrayRef = merged_rid_any.slice(rid_off, rows);
1545                let rid_norm = zero_offset(&rid_ref);
1546                let rid_pk = self.pager.alloc_many(1)?[0];
1547                let rid_bytes = serialize_array(rid_norm.as_ref())?;
1548
1549                let data_pk = self.pager.alloc_many(1)?[0];
1550                let s_norm = zero_offset(&s);
1551                let data_bytes = serialize_array(s_norm.as_ref())?;
1552                puts.push(BatchPut::Raw {
1553                    key: data_pk,
1554                    bytes: data_bytes,
1555                });
1556                puts.push(BatchPut::Raw {
1557                    key: rid_pk,
1558                    bytes: rid_bytes,
1559                });
1560                let mut meta = ChunkMetadata {
1561                    chunk_pk: data_pk,
1562                    row_count: rows as u64,
1563                    serialized_bytes: s_norm.get_array_memory_size() as u64,
1564                    max_val_u64: u64::MAX,
1565                    ..Default::default()
1566                };
1567                // If any source chunk had a perm, recompute for this slice.
1568                if need_perms {
1569                    let sort_col = SortColumn {
1570                        values: s.clone(),
1571                        options: None,
1572                    };
1573                    let idx = lexsort_to_indices(&[sort_col], None)?;
1574                    let perm_bytes = serialize_array(&idx)?;
1575                    let perm_pk = self.pager.alloc_many(1)?[0];
1576                    puts.push(BatchPut::Raw {
1577                        key: perm_pk,
1578                        bytes: perm_bytes,
1579                    });
1580                    meta.value_order_perm_pk = perm_pk;
1581                }
1582
1583                // Track min/max for the row-id slice (presence metadata) and whether it is sorted.
1584                let rid_any = rid_norm.clone();
1585                let rids = rid_any
1586                    .as_any()
1587                    .downcast_ref::<UInt64Array>()
1588                    .ok_or_else(|| Error::Internal("rid downcast".into()))?;
1589                let mut min = u64::MAX;
1590                let mut max = 0u64;
1591                let mut sorted_rids = true;
1592                let mut last_v = 0u64;
1593                for ii in 0..rids.len() {
1594                    let v = rids.value(ii);
1595                    if ii == 0 {
1596                        last_v = v;
1597                    } else if v < last_v {
1598                        sorted_rids = false;
1599                    } else {
1600                        last_v = v;
1601                    }
1602                    if v < min {
1603                        min = v;
1604                    }
1605                    if v > max {
1606                        max = v;
1607                    }
1608                }
1609                let mut rid_perm_pk = 0u64;
1610                if !sorted_rids {
1611                    let rid_sort_col = SortColumn {
1612                        values: rid_any,
1613                        options: None,
1614                    };
1615                    let rid_idx = lexsort_to_indices(&[rid_sort_col], None)?;
1616                    let rid_perm_bytes = serialize_array(&rid_idx)?;
1617                    rid_perm_pk = self.pager.alloc_many(1)?[0];
1618                    puts.push(BatchPut::Raw {
1619                        key: rid_perm_pk,
1620                        bytes: rid_perm_bytes,
1621                    });
1622                }
1623                let rid_meta = ChunkMetadata {
1624                    chunk_pk: rid_pk,
1625                    value_order_perm_pk: rid_perm_pk,
1626                    row_count: rows as u64,
1627                    serialized_bytes: rid_norm.get_array_memory_size() as u64,
1628                    min_val_u64: if rows > 0 { min } else { 0 },
1629                    max_val_u64: if rows > 0 { max } else { 0 },
1630                };
1631                new_metas.push(meta);
1632                new_rid_metas.push(rid_meta);
1633                rid_off += rows;
1634            }
1635
1636            // Free all old data/rid/perms in the merged run.
1637            for k in i..j {
1638                frees.push(metas[k].chunk_pk);
1639                if metas[k].value_order_perm_pk != 0 {
1640                    frees.push(metas[k].value_order_perm_pk);
1641                }
1642                frees.push(metas_rid[k].chunk_pk);
1643                if metas_rid[k].value_order_perm_pk != 0 {
1644                    frees.push(metas_rid[k].value_order_perm_pk);
1645                }
1646            }
1647
1648            i = j;
1649        }
1650
1651        // If everything was deleted, drop the field entirely.
1652        if new_metas.is_empty() {
1653            // Drop descriptors and catalog mapping.
1654            self.write_descriptor_chain(desc_pk, &mut desc, &[], &mut puts, &mut frees)?;
1655            self.write_descriptor_chain(desc_pk_rid, &mut desc_rid, &[], &mut puts, &mut frees)?;
1656            catalog.map.remove(&field_id);
1657            catalog.map.remove(&rid_fid);
1658            puts.push(BatchPut::Raw {
1659                key: CATALOG_ROOT_PKEY,
1660                bytes: catalog.to_bytes(),
1661            });
1662            if !puts.is_empty() {
1663                self.pager.batch_put(&puts)?;
1664            }
1665            if !frees.is_empty() {
1666                self.pager.free_many(&frees)?;
1667            }
1668            return Ok(());
1669        }
1670
1671        // Rewrite descriptor chains to match the new meta lists.
1672        self.write_descriptor_chain(desc_pk, &mut desc, &new_metas, &mut puts, &mut frees)?;
1673        self.write_descriptor_chain(
1674            desc_pk_rid,
1675            &mut desc_rid,
1676            &new_rid_metas,
1677            &mut puts,
1678            &mut frees,
1679        )?;
1680        // Persist new/updated blobs and free the old ones.
1681        if !puts.is_empty() {
1682            self.pager.batch_put(&puts)?;
1683        }
1684        if !frees.is_empty() {
1685            self.pager.free_many(&frees)?;
1686        }
1687
1688        Ok(())
1689    }
1690
1691    /// (Internal) Helper for batch appends. Appends metadata to the current
1692    /// in-memory tail page, creating a new one if necessary.
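    ///
    /// Capacity sketch (illustrative sizes, not the real `DISK_SIZE` constants):
    /// with a 4 KiB page budget, a 16-byte page header, and 64-byte metadata
    /// entries, roughly `(4096 - 16) / 64 = 63` entries fit before the tail page
    /// is flushed into `puts` and a fresh tail page is started.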
1693    fn append_meta_in_loop(
1694        &self,
1695        descriptor: &mut ColumnDescriptor,
1696        tail_page_bytes: &mut Vec<u8>,
1697        meta: ChunkMetadata,
1698        puts: &mut Vec<BatchPut>,
1699    ) -> Result<()> {
1700        let mut header = DescriptorPageHeader::from_le_bytes(
1701            &tail_page_bytes[..DescriptorPageHeader::DISK_SIZE],
1702        );
1703        if tail_page_bytes.len() + ChunkMetadata::DISK_SIZE <= TARGET_DESCRIPTOR_PAGE_BYTES
1704            && (header.entry_count as usize) < DESCRIPTOR_ENTRIES_PER_PAGE
1705        {
1706            // Case 1: There's room in the current tail page.
1707            tail_page_bytes.extend_from_slice(&meta.to_le_bytes());
1708            header.entry_count += 1;
1709            tail_page_bytes[..DescriptorPageHeader::DISK_SIZE]
1710                .copy_from_slice(&header.to_le_bytes());
1711        } else {
1712            // Case 2: The tail page is full. Write it out and start a new one.
1713            let new_tail_pk = self.pager.alloc_many(1)?[0];
1714            header.next_page_pk = new_tail_pk;
1715            tail_page_bytes[..DescriptorPageHeader::DISK_SIZE]
1716                .copy_from_slice(&header.to_le_bytes());
1717            // Move the full page's bytes out instead of cloning them; the
1718            // buffer is repopulated with the new tail page below.
1719            let full_page_to_write = std::mem::take(tail_page_bytes);
1720            puts.push(BatchPut::Raw {
1721                key: descriptor.tail_page_pk,
1722                bytes: full_page_to_write,
1723            });
1724            // Create the new tail page.
1725            let new_header = DescriptorPageHeader {
1726                next_page_pk: 0,
1727                entry_count: 1,
1728                _padding: [0; 4],
1729            };
1730            let mut new_page_bytes = new_header.to_le_bytes().to_vec();
1731            new_page_bytes.extend_from_slice(&meta.to_le_bytes());
1732            // Update our live state to point to the new tail.
1733            descriptor.tail_page_pk = new_tail_pk;
1734            *tail_page_bytes = new_page_bytes;
1735        }
1736
1737        descriptor.total_row_count += meta.row_count;
1738        descriptor.total_chunk_count += 1;
1739        Ok(())
1740    }
1741
1742    /// Verifies the integrity of the column store's metadata.
1743    ///
1744    /// This check is useful for tests and debugging. It verifies:
1745    /// 1. The catalog can be read.
1746    /// 2. All descriptor chains are walkable.
1747    /// 3. The row and chunk counts in the descriptors match the sum of the
1748    ///    chunk metadata.
1749    ///
1750    /// Returns `Ok(())` if consistent, otherwise returns an `Error`.
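    ///
    /// # Example
    ///
    /// A minimal sketch (not compiled as a doctest):
    ///
    /// ```ignore
    /// // After any sequence of appends, deletes, or compactions:
    /// store.verify_integrity()?;
    /// ```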
1751    pub fn verify_integrity(&self) -> Result<()> {
1752        let catalog = self.catalog.read().unwrap();
1753        for (&field_id, &descriptor_pk) in &catalog.map {
1754            let desc_blob = self
1755                .pager
1756                .batch_get(&[BatchGet::Raw { key: descriptor_pk }])?
1757                .pop()
1758                .and_then(|r| match r {
1759                    GetResult::Raw { bytes, .. } => Some(bytes),
1760                    _ => None,
1761                })
1762                .ok_or_else(|| {
1763                    Error::Internal(format!(
1764                        "Catalog points to missing descriptor pk={}",
1765                        descriptor_pk
1766                    ))
1767                })?;
1768            let descriptor = ColumnDescriptor::from_le_bytes(desc_blob.as_ref());
1769            if descriptor.field_id != field_id {
1770                return Err(Error::Internal(format!(
1771                    "Descriptor at pk={} has wrong field_id: expected {:?}, \
1772                     got {:?}",
1773                    descriptor_pk, field_id, descriptor.field_id
1774                )));
1775            }
1776
1777            let mut actual_rows = 0;
1778            let mut actual_chunks = 0;
1779            let mut current_page_pk = descriptor.head_page_pk;
1780            while current_page_pk != 0 {
1781                let page_blob = self
1782                    .pager
1783                    .batch_get(&[BatchGet::Raw {
1784                        key: current_page_pk,
1785                    }])?
1786                    .pop()
1787                    .and_then(|r| match r {
1788                        GetResult::Raw { bytes, .. } => Some(bytes),
1789                        _ => None,
1790                    })
1791                    .ok_or_else(|| {
1792                        Error::Internal(format!(
1793                            "Descriptor page chain broken at pk={}",
1794                            current_page_pk
1795                        ))
1796                    })?;
1797                let header = DescriptorPageHeader::from_le_bytes(
1798                    &page_blob.as_ref()[..DescriptorPageHeader::DISK_SIZE],
1799                );
1800                for i in 0..(header.entry_count as usize) {
1801                    let off = DescriptorPageHeader::DISK_SIZE + i * ChunkMetadata::DISK_SIZE;
1802                    let end = off + ChunkMetadata::DISK_SIZE;
1803                    let meta = ChunkMetadata::from_le_bytes(&page_blob.as_ref()[off..end]);
1804                    actual_rows += meta.row_count;
1805                    actual_chunks += 1;
1806                }
1807                current_page_pk = header.next_page_pk;
1808            }
1809
1810            if descriptor.total_row_count != actual_rows {
1811                return Err(Error::Internal(format!(
1812                    "Row count mismatch for field {:?}: descriptor says {}, \
1813                     actual is {}",
1814                    field_id, descriptor.total_row_count, actual_rows
1815                )));
1816            }
1817            if descriptor.total_chunk_count != actual_chunks {
1818                return Err(Error::Internal(format!(
1819                    "Chunk count mismatch for field {:?}: descriptor says {}, \
1820                     actual is {}",
1821                    field_id, descriptor.total_chunk_count, actual_chunks
1822                )));
1823            }
1824        }
1825        Ok(())
1826    }
1827
1828    /// Gathers detailed statistics about the storage layout.
1829    ///
1830    /// This method is designed for low-level analysis and debugging, allowing
1831    /// you to check for under- or over-utilization of descriptor pages.
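    ///
    /// # Example
    ///
    /// A minimal sketch (not compiled as a doctest) that prints page utilization
    /// per column:
    ///
    /// ```ignore
    /// for col in store.get_layout_stats()? {
    ///     println!(
    ///         "{:?}: {} rows, {} chunks, {} descriptor pages",
    ///         col.field_id, col.total_rows, col.total_chunks, col.pages.len()
    ///     );
    /// }
    /// ```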
1832    pub fn get_layout_stats(&self) -> Result<Vec<ColumnLayoutStats>> {
1833        let catalog = self.catalog.read().unwrap();
1834        let mut all_stats = Vec::new();
1835
1836        for (&field_id, &descriptor_pk) in &catalog.map {
1837            let desc_blob = self
1838                .pager
1839                .batch_get(&[BatchGet::Raw { key: descriptor_pk }])?
1840                .pop()
1841                .and_then(|r| match r {
1842                    GetResult::Raw { bytes, .. } => Some(bytes),
1843                    _ => None,
1844                })
1845                .ok_or(Error::NotFound)?;
1846            let descriptor = ColumnDescriptor::from_le_bytes(desc_blob.as_ref());
1847
1848            let mut page_stats = Vec::new();
1849            let mut current_page_pk = descriptor.head_page_pk;
1850            while current_page_pk != 0 {
1851                let page_blob = self
1852                    .pager
1853                    .batch_get(&[BatchGet::Raw {
1854                        key: current_page_pk,
1855                    }])?
1856                    .pop()
1857                    .and_then(|r| match r {
1858                        GetResult::Raw { bytes, .. } => Some(bytes),
1859                        _ => None,
1860                    })
1861                    .ok_or(Error::NotFound)?;
1862                let header = DescriptorPageHeader::from_le_bytes(
1863                    &page_blob.as_ref()[..DescriptorPageHeader::DISK_SIZE],
1864                );
1865                page_stats.push(DescriptorPageStats {
1866                    page_pk: current_page_pk,
1867                    entry_count: header.entry_count,
1868                    page_size_bytes: page_blob.as_ref().len(),
1869                });
1870                current_page_pk = header.next_page_pk;
1871            }
1872
1873            all_stats.push(ColumnLayoutStats {
1874                field_id,
1875                total_rows: descriptor.total_row_count,
1876                total_chunks: descriptor.total_chunk_count,
1877                pages: page_stats,
1878            });
1879        }
1880        Ok(all_stats)
1881    }
1882}