llkv_column_map/store/core.rs

use super::*;
use crate::store::catalog::ColumnCatalog;
use crate::store::descriptor::{
    ChunkMetadata, ColumnDescriptor, DescriptorIterator, DescriptorPageHeader,
};
use crate::store::scan::filter::FilterDispatch;
use crate::store::scan::{FilterPrimitive, FilterResult};
use crate::types::LogicalFieldId;
use arrow::array::{Array, ArrayRef, BooleanArray, UInt32Array, UInt64Array};
use arrow::compute::{self, SortColumn, lexsort_to_indices};
use arrow::datatypes::DataType;
use arrow::record_batch::RecordBatch;
use llkv_expr::typed_predicate::Predicate;
use llkv_result::{Error, Result};
use llkv_storage::{
    constants::CATALOG_ROOT_PKEY,
    pager::{BatchGet, BatchPut, GetResult, Pager},
    serialization::{deserialize_array, serialize_array},
    types::PhysicalKey,
};

use rustc_hash::{FxHashMap, FxHashSet};
use simd_r_drive_entry_handle::EntryHandle;
use std::sync::{Arc, RwLock};

pub struct ColumnStore<P: Pager> {
    pub(crate) pager: Arc<P>,
    pub(crate) catalog: Arc<RwLock<ColumnCatalog>>,
    cfg: ColumnStoreConfig,
    dtype_cache: DTypeCache<P>,
    index_manager: IndexManager<P>,
}

impl<P> ColumnStore<P>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    pub fn open(pager: Arc<P>) -> Result<Self> {
        let cfg = ColumnStoreConfig::default();
        let catalog = match pager
            .batch_get(&[BatchGet::Raw {
                key: CATALOG_ROOT_PKEY,
            }])?
            .pop()
        {
            Some(GetResult::Raw { bytes, .. }) => ColumnCatalog::from_bytes(bytes.as_ref())?,
            _ => ColumnCatalog::default(),
        };
        let arc_catalog = Arc::new(RwLock::new(catalog));

        let index_manager = IndexManager::new(Arc::clone(&pager));

        Ok(Self {
            pager: Arc::clone(&pager),
            catalog: Arc::clone(&arc_catalog),
            cfg,
            dtype_cache: DTypeCache::new(Arc::clone(&pager), Arc::clone(&arc_catalog)),
            index_manager,
        })
    }

    /// Registers an index for a given column, building it for existing data atomically and with low memory usage.
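    ///
    /// A minimal usage sketch; `store` and `field_id` are assumed to be in
    /// scope, and `IndexKind::Presence` is used because it is the kind this
    /// store also registers automatically on append.
    ///
    /// ```ignore
    /// // Build (or rebuild) a presence index over the column's existing data.
    /// store.register_index(field_id, IndexKind::Presence)?;
    /// ```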
    pub fn register_index(&self, field_id: LogicalFieldId, kind: IndexKind) -> Result<()> {
        self.index_manager.register_index(self, field_id, kind)
    }

    /// Unregisters a persisted index from a given column atomically.
    pub fn unregister_index(&self, field_id: LogicalFieldId, kind: IndexKind) -> Result<()> {
        self.index_manager.unregister_index(self, field_id, kind)
    }

    /// Returns the Arrow data type of the given field, loading it if not cached.
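    ///
    /// A minimal sketch; `store` and `field_id` are assumed to be in scope.
    ///
    /// ```ignore
    /// let dt = store.data_type(field_id)?;
    /// ```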
    pub fn data_type(&self, field_id: LogicalFieldId) -> Result<DataType> {
        if let Some(dt) = self.dtype_cache.cached_data_type(field_id) {
            return Ok(dt);
        }
        self.dtype_cache.dtype_for_field(field_id)
    }

    /// Collects the row ids whose values satisfy the provided predicate.
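    ///
    /// A minimal sketch; the dispatch type `U64Dispatch` and the way
    /// `predicate` is built are hypothetical placeholders for the real
    /// `FilterDispatch` implementations and the `llkv_expr` predicate API.
    ///
    /// ```ignore
    /// let hits: Vec<u64> = store.filter_row_ids::<U64Dispatch>(field_id, &predicate)?;
    /// ```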
    pub fn filter_row_ids<T>(
        &self,
        field_id: LogicalFieldId,
        predicate: &Predicate<T::Value>,
    ) -> Result<Vec<u64>>
    where
        T: FilterDispatch,
    {
        T::run_filter(self, field_id, predicate)
    }

    /// Evaluates `predicate` against each value of the column and returns the
    /// matches as a `FilterResult`.
    pub fn filter_matches<T, F>(
        &self,
        field_id: LogicalFieldId,
        predicate: F,
    ) -> Result<FilterResult>
    where
        T: FilterPrimitive,
        F: FnMut(T::Native) -> bool,
    {
        T::run_filter_with_result(self, field_id, predicate)
    }

    /// Lists the kinds of all persisted indexes for a given column.
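    ///
    /// A minimal sketch; assumes `IndexKind` implements `Debug`.
    ///
    /// ```ignore
    /// for kind in store.list_persisted_indexes(field_id)? {
    ///     println!("persisted index: {kind:?}");
    /// }
    /// ```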
    pub fn list_persisted_indexes(&self, field_id: LogicalFieldId) -> Result<Vec<IndexKind>> {
        let catalog = self.catalog.read().unwrap();
        let descriptor_pk = *catalog.map.get(&field_id).ok_or(Error::NotFound)?;

        let desc_blob = self
            .pager
            .batch_get(&[BatchGet::Raw { key: descriptor_pk }])?
            .pop()
            .and_then(|r| match r {
                GetResult::Raw { bytes, .. } => Some(bytes),
                _ => None,
            })
            .ok_or(Error::NotFound)?;
        let descriptor = ColumnDescriptor::from_le_bytes(desc_blob.as_ref());

        let kinds = descriptor.get_indexes()?;
        Ok(kinds)
    }

    /// Return the total number of rows recorded for the given logical field.
    ///
    /// This reads the `ColumnDescriptor::total_row_count` persisted in the
    /// descriptor referenced by the catalog and returns it as a `u64`. The value
    /// is maintained by the append and delete code paths and represents the
    /// persisted row count for that column.
    pub fn total_rows_for_field(&self, field_id: LogicalFieldId) -> Result<u64> {
        let catalog = self.catalog.read().unwrap();
        let desc_pk = *catalog.map.get(&field_id).ok_or(Error::NotFound)?;
        drop(catalog);

        let desc_blob = self
            .pager
            .batch_get(&[BatchGet::Raw { key: desc_pk }])?
            .pop()
            .and_then(|r| match r {
                GetResult::Raw { bytes, .. } => Some(bytes),
                _ => None,
            })
            .ok_or(Error::NotFound)?;

        let desc = ColumnDescriptor::from_le_bytes(desc_blob.as_ref());
        Ok(desc.total_row_count)
    }

    /// Return the total number of rows for a table by inspecting the persisted
    /// user-data column descriptors belonging to `table_id`.
    ///
    /// This method inspects every user-data column found in the in-memory
    /// catalog for the table and returns the maximum `total_row_count` across
    /// their descriptors. If the table has no persisted columns, `Ok(0)` is
    /// returned.
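    ///
    /// A minimal sketch; `table_id` is supplied by the caller and
    /// `some_field_in_table` stands for any user column of that table.
    ///
    /// ```ignore
    /// let rows = store.total_rows_for_table(table_id)?;
    /// assert!(rows >= store.total_rows_for_field(some_field_in_table)?);
    /// ```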
    pub fn total_rows_for_table(&self, table_id: crate::types::TableId) -> Result<u64> {
        use crate::types::Namespace;
        // Acquire a read lock on the catalog and collect the matching user-data fields.
        let catalog = self.catalog.read().unwrap();
        // Collect all user-data logical field ids for this table.
        let candidates: Vec<LogicalFieldId> = catalog
            .map
            .keys()
            .filter(|fid| fid.namespace() == Namespace::UserData && fid.table_id() == table_id)
            .copied()
            .collect();
        drop(catalog);

        if candidates.is_empty() {
            return Ok(0);
        }

        // Return the maximum total_row_count across all user columns for the table.
        let mut max_rows: u64 = 0;
        for field in candidates {
            let rows = self.total_rows_for_field(field)?;
            if rows > max_rows {
                max_rows = rows;
            }
        }
        Ok(max_rows)
    }

    /// Return the logical field identifiers for all user columns in `table_id`.
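    ///
    /// A minimal sketch; assumes `LogicalFieldId` implements `Debug`.
    ///
    /// ```ignore
    /// for fid in store.user_field_ids_for_table(table_id) {
    ///     println!("user column: {fid:?}");
    /// }
    /// ```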
    pub fn user_field_ids_for_table(&self, table_id: crate::types::TableId) -> Vec<LogicalFieldId> {
        use crate::types::Namespace;

        let catalog = self.catalog.read().unwrap();
        catalog
            .map
            .keys()
            .filter(|fid| fid.namespace() == Namespace::UserData && fid.table_id() == table_id)
            .copied()
            .collect()
    }

    /// Fast presence check using the presence index (row-id permutation) if available.
    /// Returns true if `row_id` exists in the column; false otherwise.
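    ///
    /// A minimal sketch; assumes row id 42 was previously appended to the
    /// column and row id 9_999_999 was not.
    ///
    /// ```ignore
    /// assert!(store.has_row_id(field_id, 42)?);
    /// assert!(!store.has_row_id(field_id, 9_999_999)?);
    /// ```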
    pub fn has_row_id(&self, field_id: LogicalFieldId, row_id: u64) -> Result<bool> {
        let rid_fid = rowid_fid(field_id);
        let catalog = self.catalog.read().unwrap();
        let rid_desc_pk = *catalog.map.get(&rid_fid).ok_or(Error::NotFound)?;
        let rid_desc_blob = self
            .pager
            .batch_get(&[BatchGet::Raw { key: rid_desc_pk }])?
            .pop()
            .and_then(|r| match r {
                GetResult::Raw { bytes, .. } => Some(bytes),
                _ => None,
            })
            .ok_or(Error::NotFound)?;
        let rid_desc = ColumnDescriptor::from_le_bytes(rid_desc_blob.as_ref());
        drop(catalog);

        // Walk metas; prune by min/max when available.
        for m in DescriptorIterator::new(self.pager.as_ref(), rid_desc.head_page_pk) {
            let meta = m?;
            if meta.row_count == 0 {
                continue;
            }
            if (meta.min_val_u64 != 0 || meta.max_val_u64 != 0)
                && (row_id < meta.min_val_u64 || row_id > meta.max_val_u64)
            {
                continue;
            }
            // Fetch the rid chunk and, if present, the presence permutation.
            let mut gets = vec![BatchGet::Raw { key: meta.chunk_pk }];
            if meta.value_order_perm_pk != 0 {
                gets.push(BatchGet::Raw {
                    key: meta.value_order_perm_pk,
                });
            }
            let results = self.pager.batch_get(&gets)?;
            let mut rid_blob: Option<EntryHandle> = None;
            let mut perm_blob: Option<EntryHandle> = None;
            for r in results {
                if let GetResult::Raw { key, bytes } = r {
                    if key == meta.chunk_pk {
                        rid_blob = Some(bytes);
                    } else if key == meta.value_order_perm_pk {
                        perm_blob = Some(bytes);
                    }
                }
            }
            // If the rid blob for this chunk is missing, treat it as absent and continue.
            let Some(rid_blob) = rid_blob else { continue };
            let rid_any = deserialize_array(rid_blob)?;
            let rids = rid_any
                .as_any()
                .downcast_ref::<UInt64Array>()
                .ok_or_else(|| Error::Internal("rid downcast".into()))?;
            if let Some(pblob) = perm_blob {
                let perm_any = deserialize_array(pblob)?;
                let perm = perm_any
                    .as_any()
                    .downcast_ref::<UInt32Array>()
                    .ok_or_else(|| Error::Internal("perm not u32".into()))?;
                // Binary search over the sorted-by-perm view.
                let mut lo: isize = 0;
                let mut hi: isize = (perm.len() as isize) - 1;
                while lo <= hi {
                    let mid = ((lo + hi) >> 1) as usize;
                    let rid = rids.value(perm.value(mid) as usize);
                    if rid == row_id {
                        return Ok(true);
                    } else if rid < row_id {
                        lo = mid as isize + 1;
                    } else {
                        hi = mid as isize - 1;
                    }
                }
            } else {
                // Assume the rid chunk is sorted ascending (the common case for
                // appends/compaction) and binary search.
                let mut lo: isize = 0;
                let mut hi: isize = (rids.len() as isize) - 1;
                while lo <= hi {
                    let mid = ((lo + hi) >> 1) as usize;
                    let rid = rids.value(mid);
                    if rid == row_id {
                        return Ok(true);
                    } else if rid < row_id {
                        lo = mid as isize + 1;
                    } else {
                        hi = mid as isize - 1;
                    }
                }
            }
        }
        Ok(false)
    }

    // TODO: Set up a way to optionally auto-increment a row ID column if not provided.

    /// NOTE (logical table separation):
    ///
    /// The ColumnStore stores data per logical field (a LogicalFieldId) rather
    /// than per 'table' directly. A LogicalFieldId encodes a namespace and a
    /// table id together (see [crate::types::LogicalFieldId]). This design lets
    /// multiple tables share the same underlying storage layer without
    /// collisions because every persisted column descriptor, chunk chain, and
    /// presence/permutation index is keyed by the full LogicalFieldId.
    ///
    /// Concretely:
    /// - `catalog.map` maps each LogicalFieldId -> descriptor physical key.
    /// - Each ColumnDescriptor contains chunk metadata and a persisted
    ///   `total_row_count` for that logical field only.
    /// - Row-id shadow columns (presence/permutation indices) are created per
    ///   logical field (they are derived from the LogicalFieldId) and are used
    ///   for fast existence checks and indexed gathers.
    ///
    /// The `append` implementation below expects incoming RecordBatches to be
    /// namespaced: each field's metadata contains a "field_id" that, when
    /// combined with the current table id, maps to the LogicalFieldId the
    /// ColumnStore will append into. This ensures that appends for different
    /// tables never overwrite each other's descriptors or chunks because the
    /// LogicalFieldId namespace keeps them separate.
    ///
    /// Constraints and recommended usage (see the sketch after this list):
    /// - A single `RecordBatch` carries exactly one `ROW_ID` column which is
    ///   shared by all other columns in that batch. Because of this, a batch
    ///   is implicitly tied to a single table's row-id space and should not
    ///   mix columns from different tables.
    /// - To append data for multiple tables, create separate `RecordBatch`
    ///   instances (one per table) and call `append` for each. These calls
    ///   may be executed concurrently for throughput; the ColumnStore persists
    ///   data per-logical-field so physical writes won't collide.
    /// - If you need an atomic multi-table append, the current API does not
    ///   provide that; you'd need a higher-level coordinator that issues the
    ///   per-table appends within a single transaction boundary (or extend the
    ///   API to accept per-column row-id arrays). Such a change is more
    ///   invasive and requires reworking LWW/commit logic.
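    ///
    /// A minimal single-table sketch. The literal column name `"row_id"`
    /// (standing in for `ROW_ID_COLUMN_NAME`) and `user_field_id_u64` (the
    /// numeric form of the target `LogicalFieldId`) are assumptions supplied
    /// by the caller.
    ///
    /// ```ignore
    /// use std::collections::HashMap;
    /// use std::sync::Arc;
    /// use arrow::array::{Int64Array, UInt64Array};
    /// use arrow::datatypes::{DataType, Field, Schema};
    /// use arrow::record_batch::RecordBatch;
    ///
    /// // One shared row-id column for the whole batch.
    /// let row_id_field = Field::new("row_id", DataType::UInt64, false);
    ///
    /// // Each user column carries its field id (as a string) under the
    /// // "field_id" metadata key so `append` can route it to the right column.
    /// let mut meta = HashMap::new();
    /// meta.insert("field_id".to_string(), user_field_id_u64.to_string());
    /// let value_field = Field::new("value", DataType::Int64, true).with_metadata(meta);
    ///
    /// let schema = Arc::new(Schema::new(vec![row_id_field, value_field]));
    /// let batch = RecordBatch::try_new(
    ///     schema,
    ///     vec![
    ///         Arc::new(UInt64Array::from(vec![0u64, 1, 2])),
    ///         // A null value is treated as a delete for that row id.
    ///         Arc::new(Int64Array::from(vec![Some(10i64), None, Some(30)])),
    ///     ],
    /// )?;
    /// store.append(&batch)?;
    /// ```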
    #[allow(unused_variables, unused_assignments)] // TODO: Keep `presence_index_created`?
    pub fn append(&self, batch: &RecordBatch) -> Result<()> {
        // --- PHASE 1: PRE-PROCESSING THE INCOMING BATCH ---
        // The `append` logic relies on row IDs being processed in ascending order to handle
        // metadata updates efficiently and to ensure the shadow row_id chunks are naturally sorted.
        // This block checks if the incoming batch is already sorted by `row_id`. If not, it creates a
        // new, sorted `RecordBatch` to work with for the rest of the function.
        let working_batch: RecordBatch;
        let batch_ref = {
            let schema = batch.schema();
            let row_id_idx = schema
                .index_of(ROW_ID_COLUMN_NAME)
                .map_err(|_| Error::Internal("row_id column required".into()))?;
            let row_id_any = batch.column(row_id_idx).clone();
            let row_id_arr = row_id_any
                .as_any()
                .downcast_ref::<UInt64Array>()
                .ok_or_else(|| Error::Internal("row_id downcast failed".into()))?;

            // Manually check if the row_id column is sorted.
            let mut is_sorted = true;
            if !row_id_arr.is_empty() {
                let mut last = row_id_arr.value(0);
                for i in 1..row_id_arr.len() {
                    let current = row_id_arr.value(i);
                    if current < last {
                        is_sorted = false;
                        break;
                    }
                    last = current;
                }
            }

            // If sorted, we can use the original batch directly.
            // Otherwise, we compute a permutation and reorder all columns.
            if is_sorted {
                batch
            } else {
                let sort_col = SortColumn {
                    values: row_id_any,
                    options: None,
                };
                let idx = lexsort_to_indices(&[sort_col], None)?;
                let perm = idx
                    .as_any()
                    .downcast_ref::<UInt32Array>()
                    .ok_or_else(|| Error::Internal("perm not u32".into()))?;
                let mut cols: Vec<ArrayRef> = Vec::with_capacity(batch.num_columns());
                for i in 0..batch.num_columns() {
                    cols.push(compute::take(batch.column(i), perm, None)?);
                }
                working_batch = RecordBatch::try_new(schema.clone(), cols)
                    .map_err(|e| Error::Internal(format!("record batch rebuild: {e}")))?;
                &working_batch
            }
        };

        // --- PHASE 2: LAST-WRITER-WINS (LWW) REWRITE ---
        // This phase handles updates. It identifies any rows in the incoming batch that
        // already exist in the store and rewrites them in-place. This is a separate
        // transaction that happens before the main append of new rows.
        let schema = batch_ref.schema();
        let row_id_idx = schema
            .index_of(ROW_ID_COLUMN_NAME)
            .map_err(|_| Error::Internal("row_id column required".into()))?;

        // Create a quick lookup map of incoming row IDs to their positions in the batch.
        let row_id_arr = batch_ref
            .column(row_id_idx)
            .as_any()
            .downcast_ref::<UInt64Array>()
            .ok_or_else(|| Error::Internal("row_id downcast failed".into()))?;
        let mut incoming_ids_map = FxHashMap::default();
        incoming_ids_map.reserve(row_id_arr.len());
        for i in 0..row_id_arr.len() {
            incoming_ids_map.insert(row_id_arr.value(i), i);
        }

        // These variables will track the state of the LWW transaction.
        let mut catalog_dirty = false;
        let mut puts_rewrites: Vec<BatchPut> = Vec::new();
        let mut all_rewritten_ids = FxHashSet::default();

        // Iterate through each column in the batch (except row_id) to perform rewrites.
        let mut catalog_lock = self.catalog.write().unwrap();
        for i in 0..batch_ref.num_columns() {
            if i == row_id_idx {
                continue;
            }
            let field = schema.field(i);
            if let Some(field_id_str) = field.metadata().get("field_id") {
                let field_id = field_id_str
                    .parse::<u64>()
                    .map(LogicalFieldId::from)
                    .map_err(|e| Error::Internal(format!("Invalid field_id: {}", e)))?;

                // `lww_rewrite_for_field` finds overlapping row IDs and rewrites the data chunks.
                // It returns the set of row IDs that were updated.
                let rewritten = self.lww_rewrite_for_field(
                    &mut catalog_lock,
                    field_id,
                    &incoming_ids_map,
                    batch_ref.column(i),
                    batch_ref.column(row_id_idx),
                    &mut puts_rewrites,
                )?;
                all_rewritten_ids.extend(rewritten);
            }
        }
        drop(catalog_lock);

        // Commit the LWW changes to the pager immediately.
        if !puts_rewrites.is_empty() {
            self.pager.batch_put(&puts_rewrites)?;
        }

        // --- PHASE 3: FILTERING FOR NEW ROWS ---
        // After handling updates, we filter the incoming batch to remove the rows that were
        // just rewritten. The remaining rows are guaranteed to be new additions to the store.
        let batch_to_append = if !all_rewritten_ids.is_empty() {
            let keep_mask: Vec<bool> = (0..row_id_arr.len())
                .map(|i| !all_rewritten_ids.contains(&row_id_arr.value(i)))
                .collect();
            let keep_array = BooleanArray::from(keep_mask);
            compute::filter_record_batch(batch_ref, &keep_array)?
        } else {
            batch_ref.clone()
        };

        // If no new rows are left, we are done.
        if batch_to_append.num_rows() == 0 {
            return Ok(());
        }

        // --- PHASE 4: APPENDING NEW DATA ---
        // This is the main append transaction. All writes generated in this phase will be
        // collected and committed atomically at the very end.
        let append_schema = batch_to_append.schema();
        let append_row_id_idx = append_schema.index_of(ROW_ID_COLUMN_NAME)?;
        let append_row_id_any: ArrayRef = Arc::clone(batch_to_append.column(append_row_id_idx));
        let mut puts_appends: Vec<BatchPut> = Vec::new();

        // Loop through each column of the filtered batch to append its data.
        for (i, array) in batch_to_append.columns().iter().enumerate() {
            if i == append_row_id_idx {
                continue;
            }

            let field = append_schema.field(i);
            let field_id = field
                .metadata()
                .get("field_id")
                .ok_or_else(|| Error::Internal("Missing field_id".into()))?
                .parse::<u64>()
                .map(LogicalFieldId::from)
                .map_err(|e| Error::Internal(format!("Invalid field_id: {}", e)))?;

            // Populate the data type cache for this column. This is a performance optimization
            // to avoid reading a chunk from storage later just to determine its type.
            self.dtype_cache.insert(field_id, field.data_type().clone());

            // Null values are treated as deletions, so we filter them out. The `rids_clean`
            // array contains the row IDs corresponding to the non-null values.
            let (array_clean, rids_clean) = if array.null_count() == 0 {
                (array.clone(), append_row_id_any.clone())
            } else {
                let keep =
                    BooleanArray::from_iter((0..array.len()).map(|j| Some(!array.is_null(j))));
                let a = compute::filter(array, &keep)?;
                let r = compute::filter(&append_row_id_any, &keep)?;
                (a, r)
            };

            if array_clean.is_empty() {
                continue;
            }

            // Get or create the physical keys for the column's data descriptor and its
            // shadow row_id descriptor from the catalog.
            let (descriptor_pk, rid_descriptor_pk, rid_fid) = {
                let mut catalog = self.catalog.write().unwrap();
                let pk1 = *catalog.map.entry(field_id).or_insert_with(|| {
                    catalog_dirty = true;
                    self.pager.alloc_many(1).unwrap()[0]
                });
                let r_fid = rowid_fid(field_id);
                let pk2 = *catalog.map.entry(r_fid).or_insert_with(|| {
                    catalog_dirty = true;
                    self.pager.alloc_many(1).unwrap()[0]
                });
                (pk1, pk2, r_fid)
            };

            // Load the descriptors and their tail metadata pages into memory.
            // If they don't exist, `load_or_create` will initialize new ones.
            let (mut data_descriptor, mut data_tail_page) =
                ColumnDescriptor::load_or_create(Arc::clone(&self.pager), descriptor_pk, field_id)?;
            let (mut rid_descriptor, mut rid_tail_page) = ColumnDescriptor::load_or_create(
                Arc::clone(&self.pager),
                rid_descriptor_pk,
                rid_fid,
            )?;

            // Logically register the Presence index on the main data descriptor. This ensures
            // that even if no physical index chunks are created (because data arrived sorted),
            // the system knows a Presence index is conceptually active for this column.
            self.index_manager
                .stage_index_registration(&mut data_descriptor, IndexKind::Presence)?;

            // Split the data to be appended into chunks of a target size.
            let slices = split_to_target_bytes(
                &array_clean,
                TARGET_CHUNK_BYTES,
                self.cfg.varwidth_fallback_rows_per_slice,
            );
            let mut row_off = 0usize;

            // Loop through each new slice to create and stage its chunks.
            for s in slices {
                let rows = s.len();
                // Create and stage the data chunk.
                let data_pk = self.pager.alloc_many(1)?[0];
                let s_norm = zero_offset(&s);
                let data_bytes = serialize_array(s_norm.as_ref())?;
                puts_appends.push(BatchPut::Raw {
                    key: data_pk,
                    bytes: data_bytes,
                });

                // Create and stage the corresponding row_id chunk.
                let rid_slice: ArrayRef = rids_clean.slice(row_off, rows);
                let rid_norm = zero_offset(&rid_slice);
                let rid_pk = self.pager.alloc_many(1)?[0];
                let rid_bytes = serialize_array(rid_norm.as_ref())?;
                puts_appends.push(BatchPut::Raw {
                    key: rid_pk,
                    bytes: rid_bytes,
                });

                // Compute min/max for the row_id chunk to enable pruning during scans.
                let rids_for_meta = rid_norm.as_any().downcast_ref::<UInt64Array>().unwrap();
                let (min, max) = if !rids_for_meta.is_empty() {
                    let mut min_val = rids_for_meta.value(0);
                    let mut max_val = rids_for_meta.value(0);
                    for i in 1..rids_for_meta.len() {
                        let v = rids_for_meta.value(i);
                        if v < min_val {
                            min_val = v;
                        }
                        if v > max_val {
                            max_val = v;
                        }
                    }
                    (min_val, max_val)
                } else {
                    (0, 0)
                };

                // Create the initial metadata for both chunks.
                // The `value_order_perm_pk` is initialized to 0 (None).
                let mut data_meta = ChunkMetadata {
                    chunk_pk: data_pk,
                    row_count: rows as u64,
                    serialized_bytes: s_norm.get_array_memory_size() as u64,
                    max_val_u64: u64::MAX,
                    ..Default::default()
                };
                let mut rid_meta = ChunkMetadata {
                    chunk_pk: rid_pk,
                    row_count: rows as u64,
                    serialized_bytes: rid_norm.get_array_memory_size() as u64,
                    min_val_u64: min,
                    max_val_u64: max,
                    ..Default::default()
                };

                // **GENERIC INDEX UPDATE DISPATCH**
                // This is the single, index-agnostic call. The IndexManager will look up all
                // active indexes for this column (e.g., Presence, Sort) and call their respective
                // `stage_update_for_new_chunk` methods. This is where the physical index data
                // (like permutation blobs) is created and staged.
                self.index_manager.stage_updates_for_new_chunk(
                    field_id,
                    &data_descriptor,
                    &s_norm,
                    &rid_norm,
                    &mut data_meta,
                    &mut rid_meta,
                    &mut puts_appends,
                )?;

                // Append the (potentially modified) metadata to their respective descriptor chains.
                self.append_meta_in_loop(
                    &mut data_descriptor,
                    &mut data_tail_page,
                    data_meta,
                    &mut puts_appends,
                )?;
                self.append_meta_in_loop(
                    &mut rid_descriptor,
                    &mut rid_tail_page,
                    rid_meta,
                    &mut puts_appends,
                )?;
                row_off += rows;
            }

            // After processing all slices, stage the final writes for the updated tail pages
            // and the root descriptor objects themselves.
            puts_appends.push(BatchPut::Raw {
                key: data_descriptor.tail_page_pk,
                bytes: data_tail_page,
            });
            puts_appends.push(BatchPut::Raw {
                key: descriptor_pk,
                bytes: data_descriptor.to_le_bytes(),
            });
            puts_appends.push(BatchPut::Raw {
                key: rid_descriptor.tail_page_pk,
                bytes: rid_tail_page,
            });
            puts_appends.push(BatchPut::Raw {
                key: rid_descriptor_pk,
                bytes: rid_descriptor.to_le_bytes(),
            });
        }

        // --- PHASE 5: FINAL ATOMIC COMMIT ---
        // If the catalog was modified (e.g., new columns were created), stage its write.
        if catalog_dirty {
            let catalog = self.catalog.read().unwrap();
            puts_appends.push(BatchPut::Raw {
                key: CATALOG_ROOT_PKEY,
                bytes: catalog.to_bytes(),
            });
        }

        // Commit all staged puts (new data chunks, new row_id chunks, new index permutations,
        // updated descriptor pages, updated root descriptors, and the updated catalog)
        // in a single atomic operation.
        if !puts_appends.is_empty() {
            self.pager.batch_put(&puts_appends)?;
        }
        Ok(())
    }

    fn lww_rewrite_for_field(
        &self,
        catalog: &mut ColumnCatalog,
        field_id: LogicalFieldId,
        incoming_ids_map: &FxHashMap<u64, usize>,
        incoming_data: &ArrayRef,
        incoming_row_ids: &ArrayRef,
        puts: &mut Vec<BatchPut>,
    ) -> Result<FxHashSet<u64>> {
        use crate::store::descriptor::DescriptorIterator;
        use crate::store::ingest::ChunkEdit;

        // Fast exit if nothing to rewrite.
        if incoming_ids_map.is_empty() {
            return Ok(FxHashSet::default());
        }
        let incoming_ids: FxHashSet<u64> = incoming_ids_map.keys().copied().collect();

        // Resolve descriptors for data and row_id columns.
        let desc_pk_data = match catalog.map.get(&field_id) {
            Some(pk) => *pk,
            None => return Ok(FxHashSet::default()),
        };
        let rid_fid = rowid_fid(field_id);
        let desc_pk_rid = match catalog.map.get(&rid_fid) {
            Some(pk) => *pk,
            None => return Ok(FxHashSet::default()),
        };

        // Batch fetch both descriptors.
        let gets = vec![
            BatchGet::Raw { key: desc_pk_data },
            BatchGet::Raw { key: desc_pk_rid },
        ];
        let results = self.pager.batch_get(&gets)?;
        let mut blobs_by_pk = FxHashMap::default();
        for r in results {
            if let GetResult::Raw { key, bytes } = r {
                blobs_by_pk.insert(key, bytes);
            }
        }

        let desc_blob_data = blobs_by_pk.remove(&desc_pk_data).ok_or(Error::NotFound)?;
        let mut descriptor_data = ColumnDescriptor::from_le_bytes(desc_blob_data.as_ref());

        let desc_blob_rid = blobs_by_pk.remove(&desc_pk_rid).ok_or(Error::NotFound)?;
        let mut descriptor_rid = ColumnDescriptor::from_le_bytes(desc_blob_rid.as_ref());

        // Collect chunk metadata.
        let mut metas_data: Vec<ChunkMetadata> = Vec::new();
        let mut metas_rid: Vec<ChunkMetadata> = Vec::new();
        for m in DescriptorIterator::new(self.pager.as_ref(), descriptor_data.head_page_pk) {
            metas_data.push(m?);
        }
        for m in DescriptorIterator::new(self.pager.as_ref(), descriptor_rid.head_page_pk) {
            metas_rid.push(m?);
        }

        // Classify incoming rows: delete vs upsert.
        let rid_in = incoming_row_ids
            .as_any()
            .downcast_ref::<UInt64Array>()
            .ok_or_else(|| Error::Internal("row_id must be u64".into()))?;
        let mut ids_to_delete = FxHashSet::default();
        let mut ids_to_upsert = FxHashSet::default();
        for i in 0..rid_in.len() {
            let rid = rid_in.value(i);
            if incoming_data.is_null(i) {
                ids_to_delete.insert(rid);
            } else {
                ids_to_upsert.insert(rid);
            }
        }

        // Scan row_id chunks to find hits, bucketed by chunk index.
        let mut rewritten_ids = FxHashSet::default();
        let mut hit_up: FxHashMap<usize, Vec<u64>> = FxHashMap::default();
        let mut hit_del: FxHashMap<usize, Vec<u64>> = FxHashMap::default();

        let n = metas_data.len().min(metas_rid.len());
        if n > 0 {
            // Fetch only row_id chunks to locate matches.
            let mut gets_rid = Vec::with_capacity(n);
            for rm in metas_rid.iter().take(n) {
                gets_rid.push(BatchGet::Raw { key: rm.chunk_pk });
            }
            let rid_results = self.pager.batch_get(&gets_rid)?;
            let mut rid_blobs: FxHashMap<PhysicalKey, EntryHandle> = FxHashMap::default();
            for r in rid_results {
                if let GetResult::Raw { key, bytes } = r {
                    rid_blobs.insert(key, bytes);
                }
            }

            for (i, meta_rid) in metas_rid.iter().enumerate().take(n) {
                if let Some(rid_blob) = rid_blobs.get(&meta_rid.chunk_pk) {
                    let rid_arr_any = deserialize_array(rid_blob.clone())?;
                    let rid_arr = rid_arr_any
                        .as_any()
                        .downcast_ref::<UInt64Array>()
                        .ok_or_else(|| Error::Internal("rid type mismatch".into()))?;
                    for j in 0..rid_arr.len() {
                        let rid = rid_arr.value(j);
                        if incoming_ids.contains(&rid) {
                            if ids_to_delete.contains(&rid) {
                                hit_del.entry(i).or_default().push(rid);
                            } else if ids_to_upsert.contains(&rid) {
                                hit_up.entry(i).or_default().push(rid);
                            }
                            rewritten_ids.insert(rid);
                        }
                    }
                }
            }
        }

        if hit_up.is_empty() && hit_del.is_empty() {
            return Ok(rewritten_ids);
        }

        // Batch fetch data+rid blobs for all chunks with hits.
        let mut hit_set = FxHashSet::default();
        hit_set.extend(hit_up.keys().copied());
        hit_set.extend(hit_del.keys().copied());
        let hit_idxs: Vec<usize> = hit_set.into_iter().collect();

        let mut gets = Vec::with_capacity(hit_idxs.len() * 2);
        for &i in &hit_idxs {
            gets.push(BatchGet::Raw {
                key: metas_data[i].chunk_pk,
            });
            gets.push(BatchGet::Raw {
                key: metas_rid[i].chunk_pk,
            });
        }
        let results = self.pager.batch_get(&gets)?;
        let mut blob_map: FxHashMap<PhysicalKey, EntryHandle> = FxHashMap::default();
        for r in results {
            if let GetResult::Raw { key, bytes } = r {
                blob_map.insert(key, bytes);
            }
        }

        // Apply per-chunk edits and stage writebacks.
        for i in hit_idxs {
            let old_data_arr =
                deserialize_array(blob_map.get(&metas_data[i].chunk_pk).unwrap().clone())?;
            let old_rid_arr_any =
                deserialize_array(blob_map.get(&metas_rid[i].chunk_pk).unwrap().clone())?;
            let old_rid_arr = old_rid_arr_any
                .as_any()
                .downcast_ref::<UInt64Array>()
                .unwrap();

            let up_vec = hit_up.remove(&i).unwrap_or_default();
            let del_vec = hit_del.remove(&i).unwrap_or_default();

            // Centralized LWW edit: builds the keep mask and injects the tails.
            let edit = ChunkEdit::from_lww_upsert(
                old_rid_arr,
                &up_vec,
                &del_vec,
                incoming_data,
                incoming_row_ids,
                incoming_ids_map,
            )?;

            let (new_data_arr, new_rid_arr) =
                ChunkEdit::apply_edit_to_arrays(&old_data_arr, Some(&old_rid_arr_any), &edit)?;

            // Stage data writeback.
            let data_bytes = serialize_array(&new_data_arr)?;
            puts.push(BatchPut::Raw {
                key: metas_data[i].chunk_pk,
                bytes: data_bytes,
            });
            metas_data[i].row_count = new_data_arr.len() as u64;
            metas_data[i].serialized_bytes = new_data_arr.get_array_memory_size() as u64;

            // Stage row_id writeback.
            if let Some(rarr) = new_rid_arr {
                let rid_bytes = serialize_array(&rarr)?;
                puts.push(BatchPut::Raw {
                    key: metas_rid[i].chunk_pk,
                    bytes: rid_bytes,
                });
                metas_rid[i].row_count = rarr.len() as u64;
                metas_rid[i].serialized_bytes = rarr.get_array_memory_size() as u64;
            }

            // Refresh permutation if present.
            if metas_data[i].value_order_perm_pk != 0 {
                let sort_col = SortColumn {
                    values: new_data_arr,
                    options: None,
                };
                let idx = lexsort_to_indices(&[sort_col], None)?;
                let perm_bytes = serialize_array(&idx)?;
                puts.push(BatchPut::Raw {
                    key: metas_data[i].value_order_perm_pk,
                    bytes: perm_bytes,
                });
            }
        }

        // Rewrite descriptor chains/totals for both columns.
        descriptor_data.rewrite_pages(
            Arc::clone(&self.pager),
            desc_pk_data,
            &mut metas_data,
            puts,
        )?;
        descriptor_rid.rewrite_pages(Arc::clone(&self.pager), desc_pk_rid, &mut metas_rid, puts)?;

        Ok(rewritten_ids)
    }

    // TODO: Remove pre-condition; accept vector of row ids instead
    /// Explicit delete by global row **positions** (0-based, ascending,
    /// unique) -> in-place chunk rewrite.
    ///
    /// Precondition: `rows_to_delete` yields strictly increasing positions.
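    ///
    /// A minimal sketch; positions are global 0-based row positions within the
    /// column and must be strictly increasing.
    ///
    /// ```ignore
    /// // Remove the first row plus the rows at positions 5 and 6.
    /// store.delete_rows(field_id, [0u64, 5, 6])?;
    /// ```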
    pub fn delete_rows<I>(&self, field_id: LogicalFieldId, rows_to_delete: I) -> Result<()>
    where
        I: IntoIterator<Item = u64>,
    {
        use crate::store::descriptor::DescriptorIterator;
        use crate::store::ingest::ChunkEdit;

        // Stream and validate ascending, unique positions.
        let mut del_iter = rows_to_delete.into_iter();
        let mut cur_del = del_iter.next();
        let mut last_seen: Option<u64> = None;
        if let Some(v) = cur_del {
            last_seen = Some(v);
        }

        // Lookup descriptors (data and optional row_id).
        let catalog = self.catalog.read().unwrap();
        let desc_pk = *catalog.map.get(&field_id).ok_or(Error::NotFound)?;
        let rid_fid = rowid_fid(field_id);
        let desc_pk_rid = catalog.map.get(&rid_fid).copied();

        // Batch fetch descriptor blobs up front.
        let mut gets = vec![BatchGet::Raw { key: desc_pk }];
        if let Some(pk) = desc_pk_rid {
            gets.push(BatchGet::Raw { key: pk });
        }
        let results = self.pager.batch_get(&gets)?;
        let mut blobs_by_pk = FxHashMap::default();
        for res in results {
            if let GetResult::Raw { key, bytes } = res {
                blobs_by_pk.insert(key, bytes);
            }
        }

        let desc_blob = blobs_by_pk.remove(&desc_pk).ok_or(Error::NotFound)?;
        let mut descriptor = ColumnDescriptor::from_le_bytes(desc_blob.as_ref());

        // Build metas for data column.
        let mut metas: Vec<ChunkMetadata> = Vec::new();
        for m in DescriptorIterator::new(self.pager.as_ref(), descriptor.head_page_pk) {
            metas.push(m?);
        }
        if metas.is_empty() {
            drop(catalog);
            self.compact_field_bounded(field_id)?;
            return Ok(());
        }

        // Optionally mirror metas for row_id column.
        let mut metas_rid: Vec<ChunkMetadata> = Vec::new();
        let mut descriptor_rid: Option<ColumnDescriptor> = None;
        if let Some(pk_rid) = desc_pk_rid
            && let Some(desc_blob_rid) = blobs_by_pk.remove(&pk_rid)
        {
            let d_rid = ColumnDescriptor::from_le_bytes(desc_blob_rid.as_ref());
            for m in DescriptorIterator::new(self.pager.as_ref(), d_rid.head_page_pk) {
                metas_rid.push(m?);
            }
            descriptor_rid = Some(d_rid);
        }

        let mut puts = Vec::new();
        let mut cum_rows = 0u64;

        for (i, meta) in metas.iter_mut().enumerate() {
            let start_u64 = cum_rows;
            let end_u64 = start_u64 + meta.row_count;

            // Advance deletes into this chunk window [start, end).
            while let Some(d) = cur_del {
                if d < start_u64
                    && let Some(prev) = last_seen
                {
                    if d < prev {
                        return Err(Error::Internal(
                            "rows_to_delete must be ascending/unique".into(),
                        ));
                    }

                    last_seen = Some(d);
                    cur_del = del_iter.next();
                } else {
                    break;
                }
            }

            // Collect local delete indices.
            let rows = meta.row_count as usize;
            let mut del_local: FxHashSet<usize> = FxHashSet::default();
            while let Some(d) = cur_del {
                if d >= end_u64 {
                    break;
                }
                del_local.insert((d - start_u64) as usize);
                last_seen = Some(d);
                cur_del = del_iter.next();
            }

            if del_local.is_empty() {
                cum_rows = end_u64;
                continue;
            }

            // Batch get chunk blobs (data and optional row_id).
            let mut chunk_gets = vec![BatchGet::Raw { key: meta.chunk_pk }];
            if let Some(rm) = metas_rid.get(i) {
                chunk_gets.push(BatchGet::Raw { key: rm.chunk_pk });
            }
            let chunk_results = self.pager.batch_get(&chunk_gets)?;
            let mut chunk_blobs = FxHashMap::default();
            for res in chunk_results {
                if let GetResult::Raw { key, bytes } = res {
                    chunk_blobs.insert(key, bytes);
                }
            }

            let data_blob = chunk_blobs.remove(&meta.chunk_pk).ok_or(Error::NotFound)?;
            let data_arr = deserialize_array(data_blob)?;

            let rid_arr_any = if let Some(rm) = metas_rid.get(i) {
                let rid_blob = chunk_blobs.remove(&rm.chunk_pk).ok_or(Error::NotFound)?;
                Some(deserialize_array(rid_blob)?)
            } else {
                None
            };

            // Build the edit via the ingest helper.
            let edit = ChunkEdit::from_delete_indices(rows, &del_local);

            // Apply edit (pure array ops).
            let (new_data_arr, new_rid_arr) =
                ChunkEdit::apply_edit_to_arrays(&data_arr, rid_arr_any.as_ref(), &edit)?;

            // Write back data.
            let data_bytes = serialize_array(&new_data_arr)?;
            puts.push(BatchPut::Raw {
                key: meta.chunk_pk,
                bytes: data_bytes,
            });
            meta.row_count = new_data_arr.len() as u64;
            meta.serialized_bytes = new_data_arr.get_array_memory_size() as u64;

            // Write back row_ids if present.
            if let (Some(_), Some(rids)) = (metas_rid.get_mut(i), new_rid_arr) {
                let rm = metas_rid.get_mut(i).unwrap();
                let rid_bytes = serialize_array(&rids)?;
                puts.push(BatchPut::Raw {
                    key: rm.chunk_pk,
                    bytes: rid_bytes,
                });
                rm.row_count = rids.len() as u64;
                rm.serialized_bytes = rids.get_array_memory_size() as u64;
            }

            // Refresh permutation if this chunk has one.
            if meta.value_order_perm_pk != 0 {
                let sort_column = SortColumn {
                    values: new_data_arr,
                    options: None,
                };
                let indices = lexsort_to_indices(&[sort_column], None)?;
                let perm_bytes = serialize_array(&indices)?;
                puts.push(BatchPut::Raw {
                    key: meta.value_order_perm_pk,
                    bytes: perm_bytes,
                });
            }

            cum_rows = end_u64;
        }

        // Rewrite descriptor chains/totals and commit.
        descriptor.rewrite_pages(Arc::clone(&self.pager), desc_pk, &mut metas, &mut puts)?;
        if let (Some(rid_pk), Some(mut rid_desc)) = (desc_pk_rid, descriptor_rid) {
            rid_desc.rewrite_pages(Arc::clone(&self.pager), rid_pk, &mut metas_rid, &mut puts)?;
        }
        if !puts.is_empty() {
            self.pager.batch_put(&puts)?;
        }

        drop(catalog);
        self.compact_field_bounded(field_id)
    }

    // TODO: Move to descriptor module?
    /// Write a descriptor chain from a meta slice. Reuses the first page when
    /// possible. Frees surplus pages via `frees`.
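    ///
    /// A minimal sketch of how a caller inside this crate drives it; the way
    /// `puts` and `frees` are ultimately handed to the pager is an assumption.
    ///
    /// ```ignore
    /// let mut puts = Vec::new();
    /// let mut frees = Vec::new();
    /// store.write_descriptor_chain(descriptor_pk, &mut descriptor, &metas, &mut puts, &mut frees)?;
    /// // `puts` is then committed via `pager.batch_put(&puts)?` and `frees`
    /// // released through the pager's free path.
    /// ```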
1086    pub(crate) fn write_descriptor_chain(
1087        &self,
1088        descriptor_pk: PhysicalKey,
1089        descriptor: &mut ColumnDescriptor,
1090        new_metas: &[ChunkMetadata],
1091        puts: &mut Vec<BatchPut>,
1092        frees: &mut Vec<PhysicalKey>,
1093    ) -> Result<()> {
1094        // Collect existing page chain.
1095        let mut old_pages = Vec::new();
1096        let mut pk = descriptor.head_page_pk;
1097        while pk != 0 {
1098            let page_blob = self
1099                .pager
1100                .batch_get(&[BatchGet::Raw { key: pk }])?
1101                .pop()
1102                .and_then(|res| match res {
1103                    GetResult::Raw { bytes, .. } => Some(bytes),
1104                    _ => None,
1105                })
1106                .ok_or(Error::NotFound)?;
1107            let header = DescriptorPageHeader::from_le_bytes(
1108                &page_blob.as_ref()[..DescriptorPageHeader::DISK_SIZE],
1109            );
1110            old_pages.push(pk);
1111            pk = header.next_page_pk;
1112        }
1113
1114        // Required pages for new metas.
1115        let per = DESCRIPTOR_ENTRIES_PER_PAGE;
1116        let need_pages = if new_metas.is_empty() {
1117            0
1118        } else {
1119            new_metas.len().div_ceil(per)
1120        };
1121        // If empty: free everything and clear counters.
1122        if need_pages == 0 {
1123            frees.extend(old_pages.iter().copied());
1124            descriptor.total_row_count = 0;
1125            descriptor.total_chunk_count = 0;
1126            puts.push(BatchPut::Raw {
1127                key: descriptor_pk,
1128                bytes: descriptor.to_le_bytes(),
1129            });
1130            return Ok(());
1131        }
1132
1133        // Reuse first page; alloc more if needed.
1134        let mut pages = Vec::with_capacity(need_pages);
1135        if !old_pages.is_empty() {
1136            pages.push(old_pages[0]);
1137        } else {
1138            pages.push(self.pager.alloc_many(1)?[0]);
1139            descriptor.head_page_pk = pages[0];
1140        }
1141        if need_pages > pages.len() {
1142            let extra = self.pager.alloc_many(need_pages - pages.len())?;
1143            pages.extend(extra);
1144        }
1145
1146        // Free surplus old pages.
1147        if old_pages.len() > need_pages {
1148            frees.extend(old_pages[need_pages..].iter().copied());
1149        }
1150
1151        // Write page contents.
1152        let mut off = 0usize;
1153        for (i, page_pk) in pages.iter().copied().enumerate() {
1154            let remain = new_metas.len() - off;
1155            let count = remain.min(per);
1156            let next = if i + 1 < pages.len() { pages[i + 1] } else { 0 };
1157            let header = DescriptorPageHeader {
1158                next_page_pk: next,
1159                entry_count: count as u32,
1160                _padding: [0; 4],
1161            };
1162            let mut page_bytes = header.to_le_bytes().to_vec();
1163            for m in &new_metas[off..off + count] {
1164                page_bytes.extend_from_slice(&m.to_le_bytes());
1165            }
1166            puts.push(BatchPut::Raw {
1167                key: page_pk,
1168                bytes: page_bytes,
1169            });
1170            off += count;
1171        }
1172
1173        descriptor.tail_page_pk = *pages.last().unwrap();
1174        descriptor.total_chunk_count = new_metas.len() as u64;
1175        descriptor.total_row_count = new_metas.iter().map(|m| m.row_count).sum();
1176        puts.push(BatchPut::Raw {
1177            key: descriptor_pk,
1178            bytes: descriptor.to_le_bytes(),
1179        });
1180        Ok(())
1181    }
1182
1183    /// Bounded, local field compaction. Merges adjacent small chunks into
1184    /// ~TARGET_CHUNK_BYTES; leaves large chunks intact. Recomputes perms if
1185    /// any source chunk in a run had one. Frees obsolete chunks/pages.
1186    fn compact_field_bounded(&self, field_id: LogicalFieldId) -> Result<()> {
1187        // We may rewrite descriptors; take a write lock.
1188        let mut catalog = self.catalog.write().unwrap();
1189
1190        let desc_pk = match catalog.map.get(&field_id) {
1191            Some(&pk) => pk,
1192            None => return Ok(()),
1193        };
1194        let rid_fid = rowid_fid(field_id);
1195        let desc_pk_rid = match catalog.map.get(&rid_fid) {
1196            Some(&pk) => pk,
1197            None => return Ok(()),
1198        };
1199
1200        // True batching for the two descriptor reads.
1201        let gets = vec![
1202            BatchGet::Raw { key: desc_pk },
1203            BatchGet::Raw { key: desc_pk_rid },
1204        ];
1205        let results = self.pager.batch_get(&gets)?;
1206        let mut blobs_by_pk = FxHashMap::default();
1207        for res in results {
1208            if let GetResult::Raw { key, bytes } = res {
1209                blobs_by_pk.insert(key, bytes);
1210            }
1211        }
1212        let desc_blob = blobs_by_pk.remove(&desc_pk).ok_or(Error::NotFound)?;
1213        let mut desc = ColumnDescriptor::from_le_bytes(desc_blob.as_ref());
1214        let desc_blob_rid = blobs_by_pk.remove(&desc_pk_rid).ok_or(Error::NotFound)?;
1215        let mut desc_rid = ColumnDescriptor::from_le_bytes(desc_blob_rid.as_ref());
1216
1217        // Load metas.
1218        let mut metas = Vec::new();
1219        for m in DescriptorIterator::new(self.pager.as_ref(), desc.head_page_pk) {
1220            metas.push(m?);
1221        }
1222        let mut metas_rid = Vec::new();
1223        for m in DescriptorIterator::new(self.pager.as_ref(), desc_rid.head_page_pk) {
1224            metas_rid.push(m?);
1225        }
1226        if metas.is_empty() || metas_rid.is_empty() {
1227            return Ok(());
1228        }
1229
1230        let mut puts: Vec<BatchPut> = Vec::new();
1231        let mut frees: Vec<PhysicalKey> = Vec::new();
1232        let mut new_metas: Vec<ChunkMetadata> = Vec::new();
1233        let mut new_rid_metas: Vec<ChunkMetadata> = Vec::new();
1234
1235        let mut i = 0usize;
1236        while i < metas.len() {
1237            let sz = metas[i].serialized_bytes as usize;
1238            // Keep large chunks as-is.
1239            if sz >= MIN_CHUNK_BYTES {
1240                new_metas.push(metas[i]);
1241                new_rid_metas.push(metas_rid[i]);
1242                i += 1;
1243                continue;
1244            }
1245
1246            // Build a small run [i, j) capped by MAX_MERGE_RUN_BYTES.
1247            let mut j = i;
1248            let mut run_bytes = 0usize;
1249            while j < metas.len() {
1250                let b = metas[j].serialized_bytes as usize;
1251                if b >= TARGET_CHUNK_BYTES {
1252                    break;
1253                }
1254                if run_bytes + b > MAX_MERGE_RUN_BYTES {
1255                    break;
1256                }
1257                run_bytes += b;
1258                j += 1;
1259            }
            // A lone small chunk with no mergeable neighbor: keep it as-is
            // rather than rewriting it in place for no layout benefit. (The
            // `sz >= MIN_CHUNK_BYTES` check above already guarantees the
            // chunk at `i` is small here.)
            if j == i + 1 {
                new_metas.push(metas[i]);
                new_rid_metas.push(metas_rid[i]);
                i += 1;
                continue;
            }

            // Fetch and concatenate the run's data and row_ids.
            let mut gets = Vec::with_capacity((j - i) * 2);
            for k in i..j {
                gets.push(BatchGet::Raw {
                    key: metas[k].chunk_pk,
                });
                gets.push(BatchGet::Raw {
                    key: metas_rid[k].chunk_pk,
                });
            }
            let results = self.pager.batch_get(&gets)?;
            let mut by_pk: FxHashMap<PhysicalKey, EntryHandle> = FxHashMap::default();
            for r in results {
                match r {
                    GetResult::Raw { key, bytes } => {
                        by_pk.insert(key, bytes);
                    }
                    _ => return Err(Error::NotFound),
                }
            }
            let mut data_parts = Vec::with_capacity(j - i);
            let mut rid_parts = Vec::with_capacity(j - i);
            for k in i..j {
                let db = by_pk.get(&metas[k].chunk_pk).ok_or(Error::NotFound)?;
                data_parts.push(deserialize_array(db.clone())?);
                let rb = by_pk.get(&metas_rid[k].chunk_pk).ok_or(Error::NotFound)?;
                rid_parts.push(deserialize_array(rb.clone())?);
            }
            let merged_data = concat_many(data_parts.iter().collect())?;
            let merged_rid_any = concat_many(rid_parts.iter().collect())?;

            // Split merged run into ~target-sized chunks.
            let slices = split_to_target_bytes(
                &merged_data,
                TARGET_CHUNK_BYTES,
                self.cfg.varwidth_fallback_rows_per_slice,
            );
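            // For example (sizes illustrative only), a ~3 MiB merged run with
            // a 1 MiB target comes back as roughly three slices; `rid_off`
            // below advances through the merged row-id array in lockstep so
            // each new data chunk is written with its matching row ids.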
            let mut rid_off = 0usize;
            let need_perms = metas[i..j].iter().any(|m| m.value_order_perm_pk != 0);

            for s in slices {
                let rows = s.len();
                // Slice the merged row ids to match `s`; `slice` already
                // yields an `ArrayRef`, so no extra Arc wrapping is needed.
                let rid_ref: ArrayRef = merged_rid_any.slice(rid_off, rows);
                let rid_norm = zero_offset(&rid_ref);
                let rid_pk = self.pager.alloc_many(1)?[0];
                let rid_bytes = serialize_array(rid_norm.as_ref())?;

                let data_pk = self.pager.alloc_many(1)?[0];
                let s_norm = zero_offset(&s);
                let data_bytes = serialize_array(s_norm.as_ref())?;
                puts.push(BatchPut::Raw {
                    key: data_pk,
                    bytes: data_bytes,
                });
                puts.push(BatchPut::Raw {
                    key: rid_pk,
                    bytes: rid_bytes,
                });
                // Note: `serialized_bytes` is taken from the in-memory Arrow
                // footprint of the normalized slice.
                let mut meta = ChunkMetadata {
                    chunk_pk: data_pk,
                    row_count: rows as u64,
                    serialized_bytes: s_norm.get_array_memory_size() as u64,
                    max_val_u64: u64::MAX,
                    ..Default::default()
                };
                // If any source chunk carried a value-order permutation,
                // recompute it for this slice.
                if need_perms {
                    let sort_col = SortColumn {
                        values: s.clone(),
                        options: None,
                    };
                    let idx = lexsort_to_indices(&[sort_col], None)?;
                    let perm_bytes = serialize_array(&idx)?;
                    let perm_pk = self.pager.alloc_many(1)?[0];
                    puts.push(BatchPut::Raw {
                        key: perm_pk,
                        bytes: perm_bytes,
                    });
                    meta.value_order_perm_pk = perm_pk;
                }

                // Compute row-id min/max and, if the row ids are unsorted,
                // the sort permutation that serves as their presence index.
                let rid_any = rid_norm.clone();
                let rids = rid_any
                    .as_any()
                    .downcast_ref::<UInt64Array>()
                    .ok_or_else(|| Error::Internal("rid downcast".into()))?;
                let mut min = u64::MAX;
                let mut max = 0u64;
                let mut sorted_rids = true;
                let mut last_v = 0u64;
                for ii in 0..rids.len() {
                    let v = rids.value(ii);
                    if ii == 0 {
                        last_v = v;
                    } else if v < last_v {
                        sorted_rids = false;
                    } else {
                        last_v = v;
                    }
                    if v < min {
                        min = v;
                    }
                    if v > max {
                        max = v;
                    }
                }
                let mut rid_perm_pk = 0u64;
                if !sorted_rids {
                    let rid_sort_col = SortColumn {
                        values: rid_any,
                        options: None,
                    };
                    let rid_idx = lexsort_to_indices(&[rid_sort_col], None)?;
                    let rid_perm_bytes = serialize_array(&rid_idx)?;
                    rid_perm_pk = self.pager.alloc_many(1)?[0];
                    puts.push(BatchPut::Raw {
                        key: rid_perm_pk,
                        bytes: rid_perm_bytes,
                    });
                }
                let rid_meta = ChunkMetadata {
                    chunk_pk: rid_pk,
                    value_order_perm_pk: rid_perm_pk,
                    row_count: rows as u64,
                    serialized_bytes: rid_norm.get_array_memory_size() as u64,
                    min_val_u64: if rows > 0 { min } else { 0 },
                    max_val_u64: if rows > 0 { max } else { 0 },
                };
                new_metas.push(meta);
                new_rid_metas.push(rid_meta);
                rid_off += rows;
            }

            // Free all old data/rid/perms in the merged run.
            for k in i..j {
                frees.push(metas[k].chunk_pk);
                if metas[k].value_order_perm_pk != 0 {
                    frees.push(metas[k].value_order_perm_pk);
                }
                frees.push(metas_rid[k].chunk_pk);
                if metas_rid[k].value_order_perm_pk != 0 {
                    frees.push(metas_rid[k].value_order_perm_pk);
                }
            }

            i = j;
        }

        // If everything was deleted, drop the field entirely.
        if new_metas.is_empty() {
            // Drop descriptors and catalog mapping.
            self.write_descriptor_chain(desc_pk, &mut desc, &[], &mut puts, &mut frees)?;
            self.write_descriptor_chain(desc_pk_rid, &mut desc_rid, &[], &mut puts, &mut frees)?;
            catalog.map.remove(&field_id);
            catalog.map.remove(&rid_fid);
            puts.push(BatchPut::Raw {
                key: CATALOG_ROOT_PKEY,
                bytes: catalog.to_bytes(),
            });
            if !puts.is_empty() {
                self.pager.batch_put(&puts)?;
            }
            if !frees.is_empty() {
                self.pager.free_many(&frees)?;
            }
            return Ok(());
        }

        // Rewrite descriptor chains to match the new meta lists.
        self.write_descriptor_chain(desc_pk, &mut desc, &new_metas, &mut puts, &mut frees)?;
        self.write_descriptor_chain(
            desc_pk_rid,
            &mut desc_rid,
            &new_rid_metas,
            &mut puts,
            &mut frees,
        )?;
        // Persist new/updated blobs and free the old ones.
        if !puts.is_empty() {
            self.pager.batch_put(&puts)?;
        }
        if !frees.is_empty() {
            self.pager.free_many(&frees)?;
        }

        Ok(())
    }

    /// (Internal) Helper for batch appends: appends chunk metadata to the
    /// current in-memory tail page, starting a new page when it is full.
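    ///
    /// A descriptor page is a flat byte buffer: a `DescriptorPageHeader`
    /// followed by `entry_count` packed `ChunkMetadata` entries. Entry `i`
    /// sits at the offset used by `verify_integrity` when reading pages back:
    ///
    /// ```ignore
    /// // `page_bytes` stands for a page's raw bytes (hypothetical name).
    /// let off = DescriptorPageHeader::DISK_SIZE + i * ChunkMetadata::DISK_SIZE;
    /// let end = off + ChunkMetadata::DISK_SIZE;
    /// let meta = ChunkMetadata::from_le_bytes(&page_bytes[off..end]);
    /// ```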
    fn append_meta_in_loop(
        &self,
        descriptor: &mut ColumnDescriptor,
        tail_page_bytes: &mut Vec<u8>,
        meta: ChunkMetadata,
        puts: &mut Vec<BatchPut>,
    ) -> Result<()> {
        let mut header = DescriptorPageHeader::from_le_bytes(
            &tail_page_bytes[..DescriptorPageHeader::DISK_SIZE],
        );
        if tail_page_bytes.len() + ChunkMetadata::DISK_SIZE <= TARGET_DESCRIPTOR_PAGE_BYTES
            && (header.entry_count as usize) < DESCRIPTOR_ENTRIES_PER_PAGE
        {
            // Case 1: There's room in the current tail page.
            tail_page_bytes.extend_from_slice(&meta.to_le_bytes());
            header.entry_count += 1;
            tail_page_bytes[..DescriptorPageHeader::DISK_SIZE]
                .copy_from_slice(&header.to_le_bytes());
        } else {
            // Case 2: The tail page is full. Write it out and start a new one.
            let new_tail_pk = self.pager.alloc_many(1)?[0];
            header.next_page_pk = new_tail_pk;
            tail_page_bytes[..DescriptorPageHeader::DISK_SIZE]
                .copy_from_slice(&header.to_le_bytes());
            // Move the full page's bytes into the put instead of cloning them.
            let full_page_to_write = std::mem::take(tail_page_bytes);
            puts.push(BatchPut::Raw {
                key: descriptor.tail_page_pk,
                bytes: full_page_to_write,
            });
            // Create the new tail page.
            let new_header = DescriptorPageHeader {
                next_page_pk: 0,
                entry_count: 1,
                _padding: [0; 4],
            };
            let mut new_page_bytes = new_header.to_le_bytes().to_vec();
            new_page_bytes.extend_from_slice(&meta.to_le_bytes());
            // Update our live state to point to the new tail.
            descriptor.tail_page_pk = new_tail_pk;
            *tail_page_bytes = new_page_bytes;
        }

        descriptor.total_row_count += meta.row_count;
        descriptor.total_chunk_count += 1;
        Ok(())
    }

    /// Verifies the integrity of the column store's metadata.
    ///
    /// This check is useful for tests and debugging. It verifies:
    /// 1. The catalog can be read.
    /// 2. All descriptor chains are walkable.
    /// 3. The row and chunk counts in the descriptors match the sum of the
    ///    chunk metadata.
    ///
    /// Returns `Ok(())` if consistent, otherwise returns an `Error`.
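    ///
    /// # Example
    ///
    /// A minimal sketch; the pager value and its construction are
    /// illustrative, not part of this crate's documented API surface:
    ///
    /// ```ignore
    /// use std::sync::Arc;
    ///
    /// let store = ColumnStore::open(Arc::new(pager))?;
    /// // ... append column data through the normal write path ...
    /// store.verify_integrity()?;
    /// ```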
    pub fn verify_integrity(&self) -> Result<()> {
        let catalog = self.catalog.read().unwrap();
        for (&field_id, &descriptor_pk) in &catalog.map {
            let desc_blob = self
                .pager
                .batch_get(&[BatchGet::Raw { key: descriptor_pk }])?
                .pop()
                .and_then(|r| match r {
                    GetResult::Raw { bytes, .. } => Some(bytes),
                    _ => None,
                })
                .ok_or_else(|| {
                    Error::Internal(format!(
                        "Catalog points to missing descriptor pk={}",
                        descriptor_pk
                    ))
                })?;
            let descriptor = ColumnDescriptor::from_le_bytes(desc_blob.as_ref());
            if descriptor.field_id != field_id {
                return Err(Error::Internal(format!(
                    "Descriptor at pk={} has wrong field_id: expected {:?}, \
                     got {:?}",
                    descriptor_pk, field_id, descriptor.field_id
                )));
            }

            let mut actual_rows = 0;
            let mut actual_chunks = 0;
            let mut current_page_pk = descriptor.head_page_pk;
            while current_page_pk != 0 {
                let page_blob = self
                    .pager
                    .batch_get(&[BatchGet::Raw {
                        key: current_page_pk,
                    }])?
                    .pop()
                    .and_then(|r| match r {
                        GetResult::Raw { bytes, .. } => Some(bytes),
                        _ => None,
                    })
                    .ok_or_else(|| {
                        Error::Internal(format!(
                            "Descriptor page chain broken at pk={}",
                            current_page_pk
                        ))
                    })?;
                let header = DescriptorPageHeader::from_le_bytes(
                    &page_blob.as_ref()[..DescriptorPageHeader::DISK_SIZE],
                );
                for i in 0..(header.entry_count as usize) {
                    let off = DescriptorPageHeader::DISK_SIZE + i * ChunkMetadata::DISK_SIZE;
                    let end = off + ChunkMetadata::DISK_SIZE;
                    let meta = ChunkMetadata::from_le_bytes(&page_blob.as_ref()[off..end]);
                    actual_rows += meta.row_count;
                    actual_chunks += 1;
                }
                current_page_pk = header.next_page_pk;
            }

            if descriptor.total_row_count != actual_rows {
                return Err(Error::Internal(format!(
                    "Row count mismatch for field {:?}: descriptor says {}, \
                     actual is {}",
                    field_id, descriptor.total_row_count, actual_rows
                )));
            }
            if descriptor.total_chunk_count != actual_chunks {
                return Err(Error::Internal(format!(
                    "Chunk count mismatch for field {:?}: descriptor says {}, \
                     actual is {}",
                    field_id, descriptor.total_chunk_count, actual_chunks
                )));
            }
        }
        Ok(())
    }

    /// Gathers detailed statistics about the storage layout.
    ///
    /// This method is designed for low-level analysis and debugging, allowing
    /// you to check for under- or over-utilization of descriptor pages.
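    ///
    /// # Example
    ///
    /// A minimal sketch of inspecting the returned stats (store construction
    /// elided; the field names match the structs populated below):
    ///
    /// ```ignore
    /// for col in store.get_layout_stats()? {
    ///     println!(
    ///         "field {:?}: {} rows in {} chunks across {} descriptor pages",
    ///         col.field_id, col.total_rows, col.total_chunks, col.pages.len()
    ///     );
    /// }
    /// ```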
    pub fn get_layout_stats(&self) -> Result<Vec<ColumnLayoutStats>> {
        let catalog = self.catalog.read().unwrap();
        let mut all_stats = Vec::new();

        for (&field_id, &descriptor_pk) in &catalog.map {
            let desc_blob = self
                .pager
                .batch_get(&[BatchGet::Raw { key: descriptor_pk }])?
                .pop()
                .and_then(|r| match r {
                    GetResult::Raw { bytes, .. } => Some(bytes),
                    _ => None,
                })
                .ok_or(Error::NotFound)?;
            let descriptor = ColumnDescriptor::from_le_bytes(desc_blob.as_ref());

            let mut page_stats = Vec::new();
            let mut current_page_pk = descriptor.head_page_pk;
            while current_page_pk != 0 {
                let page_blob = self
                    .pager
                    .batch_get(&[BatchGet::Raw {
                        key: current_page_pk,
                    }])?
                    .pop()
                    .and_then(|r| match r {
                        GetResult::Raw { bytes, .. } => Some(bytes),
                        _ => None,
                    })
                    .ok_or(Error::NotFound)?;
                let header = DescriptorPageHeader::from_le_bytes(
                    &page_blob.as_ref()[..DescriptorPageHeader::DISK_SIZE],
                );
                page_stats.push(DescriptorPageStats {
                    page_pk: current_page_pk,
                    entry_count: header.entry_count,
                    page_size_bytes: page_blob.as_ref().len(),
                });
                current_page_pk = header.next_page_pk;
            }

            all_stats.push(ColumnLayoutStats {
                field_id,
                total_rows: descriptor.total_row_count,
                total_chunks: descriptor.total_chunk_count,
                pages: page_stats,
            });
        }
        Ok(all_stats)
    }
}