Skip to main content

ailake_query/
writer.rs

1// SPDX-License-Identifier: MIT OR Apache-2.0
2use std::sync::atomic::{AtomicU32, Ordering};
3use std::sync::Arc;
4
5use ailake_catalog::{
6    encode_centroid_b64, make_data_file_entry, make_data_file_entry_indexing,
7    make_multi_column_data_file_entry, new_snapshot_id, CatalogProvider, DataFileEntry,
8    ExtraVectorIndex, IcebergSchemaUpdate, IndexStatus, NewSnapshot, SnapshotId, SnapshotOperation,
9    TableIdent, TableProperties, VectorIndexInfo,
10};
11use ailake_core::{AilakeError, AilakeResult, EmbeddingModelInfo, VectorStoragePolicy};
12use ailake_file::{AilakeFileReader, AilakeFileWriter, IndexType, VectorColumnBatch};
13use ailake_index::{IvfPqCodebook, IvfPqConfig};
14use ailake_store::Store;
15use ailake_vec::compute_centroid_and_radius;
16use arrow_array::Array;
17use arrow_array::RecordBatch;
18use arrow_schema::SchemaRef;
19use bytes::Bytes;
20use serde_json;
21use tracing::{error, info, warn};
22
23/// Apply partition transforms and return the final stored value.
24/// For multi-column specs, raw must be \x1f-separated; each part is transformed
25/// independently and the result is rejoined with \x1f.
26/// For single-column (partition_by path), raw is returned as-is (identity only).
27fn apply_partition_transforms(policy: &VectorStoragePolicy, raw: Option<&str>) -> Option<String> {
28    let raw = raw?;
29    if policy.partition_fields.is_empty() {
30        return Some(raw.to_string());
31    }
32    let parts: Vec<&str> = raw.split('\x1f').collect();
33    let transformed: Vec<String> = policy
34        .partition_fields
35        .iter()
36        .enumerate()
37        .map(|(i, pf)| {
38            let v = parts.get(i).copied().unwrap_or("");
39            pf.apply(v)
40        })
41        .collect();
42    Some(transformed.join("\x1f"))
43}
44
/// One vector column for a multi-column write batch.
///
/// A slice of these is passed to `TableWriter::write_batch_multi` and
/// `TableWriter::write_batch_multi_deferred`; the first element is treated as
/// the primary column.
pub struct MultiVectorBatch<'a> {
    /// Storage policy for this column. The multi-column write paths read its
    /// `column_name`, `dim` and `metric`.
    pub policy: VectorStoragePolicy,
    /// Embeddings for this column, borrowed from the caller for the duration
    /// of the write call.
    /// NOTE(review): presumably one vector per row of the accompanying
    /// `RecordBatch` — confirm against `AilakeFileWriter`.
    pub embeddings: &'a [Vec<f32>],
}
50
/// Stages AI-Lake data files for one table.
///
/// Each `write_batch*` call writes one `data/part-NNNNN.parquet` object to the
/// store and pushes a matching `DataFileEntry` onto `pending_files`.
pub struct TableWriter {
    /// Catalog handle: queried for already-written batch ids and handed to the
    /// background index-build tasks.
    catalog: Arc<dyn CatalogProvider>,
    /// Object store that receives every written data file.
    store: Arc<dyn Store>,
    /// Policy of the primary vector column (column name, dim, metric,
    /// embedding model, partitioning).
    policy: VectorStoragePolicy,
    /// Identity of the table this writer targets.
    table: TableIdent,
    /// Source of the part number used in `data/part-{:05}.parquet` file names;
    /// incremented once per written file.
    part_counter: Arc<AtomicU32>,
    /// Catalog entries for files written so far and staged for commit.
    pending_files: Vec<DataFileEntry>,
    /// Parent snapshot id, set through `with_parent_snapshot`.
    parent_snapshot_id: Option<SnapshotId>,
    /// Arrow schema captured from the first write_batch call; used to populate
    /// Iceberg schema fields and schema.name-mapping.default on commit.
    captured_schema: Option<SchemaRef>,
    /// Extra vector column policies from write_batch_multi (columns beyond primary).
    extra_vec_policies: Vec<VectorStoragePolicy>,
    /// IVF-PQ codebook trained on the first shard and reused for all subsequent shards.
    /// Ensures cross-shard ADC distances are comparable — no reranking needed.
    cached_ivf_codebook: Option<Arc<IvfPqCodebook>>,
    /// Shared codebook cell for deferred IVF-PQ builds. Cloneable Arc so each
    /// background task can access it; OnceCell guarantees training runs exactly once.
    deferred_ivf_codebook: Arc<tokio::sync::OnceCell<IvfPqCodebook>>,
    /// When set, BM25 IDF stats are accumulated from this Parquet column on each
    /// write_batch call and persisted to `metadata/ailake_bm25_stats.bin`.
    /// Enables hybrid vector+BM25 search via `SearchConfig::hybrid`.
    bm25_text_column: Option<String>,
    /// Per-file Bloom filters built during write_batch when bm25_text_column is set.
    /// Flushed to NewSnapshot::bloom_filters on commit (Phase F Puffin stats).
    pending_blooms: Vec<(String, Vec<u8>)>,
}
78
79impl TableWriter {
80    pub fn new(
81        catalog: Arc<dyn CatalogProvider>,
82        store: Arc<dyn Store>,
83        policy: VectorStoragePolicy,
84        table: TableIdent,
85    ) -> Self {
86        Self {
87            catalog,
88            store,
89            policy,
90            table,
91            part_counter: Arc::new(AtomicU32::new(0)),
92            pending_files: Vec::new(),
93            parent_snapshot_id: None,
94            captured_schema: None,
95            extra_vec_policies: Vec::new(),
96            cached_ivf_codebook: None,
97            deferred_ivf_codebook: Arc::new(tokio::sync::OnceCell::new()),
98            bm25_text_column: None,
99            pending_blooms: Vec::new(),
100        }
101    }
102
103    /// Enable BM25 hybrid search by accumulating IDF stats from `column` on each write.
104    ///
105    /// After calling this, every `write_batch*` call will tokenize the specified column,
106    /// update the corpus IDF stats, and persist them to `metadata/ailake_bm25_stats.bin`.
107    /// This file is then loaded automatically by `SearchConfig::hybrid` at query time.
108    ///
109    /// Typical usage: `TableWriter::new(...).with_bm25("chunk_text")`.
110    pub fn with_bm25(mut self, text_column: impl Into<String>) -> Self {
111        self.bm25_text_column = Some(text_column.into());
112        self
113    }
114
115    pub fn with_parent_snapshot(mut self, id: SnapshotId) -> Self {
116        self.parent_snapshot_id = Some(id);
117        self
118    }
119
120    /// Write batch as Parquet-only immediately, build HNSW in background.
121    ///
122    /// Returns after the Parquet file is persisted (~LanceDB write speed).
123    /// A tokio task runs concurrently to build the HNSW index, rewrite the
124    /// file with the AILK section, and update the catalog entry.
125    ///
126    /// During the build window, `SearchSession` serves this shard via flat scan
127    /// (brute-force, exact) instead of HNSW. The transition is automatic once
128    /// the background task commits the updated manifest entry.
129    pub async fn write_batch_deferred(
130        &mut self,
131        batch: &RecordBatch,
132        embeddings: &[Vec<f32>],
133    ) -> AilakeResult<()> {
134        self.validate_embedding_dim(embeddings)?;
135        if self.captured_schema.is_none() {
136            self.captured_schema = Some(batch.schema());
137        }
138        let part_num = self.part_counter.fetch_add(1, Ordering::SeqCst);
139        let file_path = format!("data/part-{:05}.parquet", part_num);
140
141        // Fast path: persist Parquet without HNSW.
142        let file_writer = AilakeFileWriter::new(self.policy.clone());
143        let parquet_bytes = file_writer.write_parquet_only(batch, embeddings)?;
144        let file_size = parquet_bytes.len() as u64;
145        self.store.put(&file_path, parquet_bytes).await?;
146
147        // Centroid needed immediately for geometric pruning during the build window.
148        let centroid = compute_centroid_and_radius(embeddings, self.policy.metric);
149        let mut entry = make_data_file_entry_indexing(
150            &file_path,
151            embeddings.len() as u64,
152            file_size,
153            &centroid,
154            &self.policy.column_name,
155            self.policy.dim,
156        );
157        entry.embedding_model = self
158            .policy
159            .embedding_model
160            .as_ref()
161            .map(|m| m.to_property_value());
162        entry.partition_value =
163            apply_partition_transforms(&self.policy, self.policy.partition_value.as_deref());
164        self.pending_files.push(entry);
165
166        // Spawn background HNSW build (fire-and-forget; errors are logged).
167        let store = self.store.clone();
168        let catalog = self.catalog.clone();
169        let policy = self.policy.clone();
170        let table = self.table.clone();
171        let fp = file_path.clone();
172        tokio::spawn(async move {
173            if let Err(e) = build_and_patch_index(store, catalog, policy, table, fp).await {
174                error!(
175                    "ailake: deferred HNSW build failed — file is indexed as Parquet-only until \
176                     next compaction rebuilds the index: {}",
177                    e
178                );
179            }
180        });
181
182        // Update BM25 IDF stats + build Bloom filter (Phase F) for the new file.
183        if self.bm25_text_column.is_some() {
184            self.update_bm25_stats_from_batch(batch).await?;
185            self.build_bloom_for_file(batch, &file_path);
186        }
187
188        Ok(())
189    }
190
191    /// Write batch as Parquet-only immediately; train IVF-PQ index in background.
192    ///
193    /// The first shard trains the shared codebook (k-means). All subsequent shards
194    /// reuse it via `OnceCell` — build is O(n) assign+encode, not O(n×k) k-means.
195    /// Returns after Parquet is persisted. Index transitions Indexing → Ready async.
196    pub async fn write_batch_ivf_pq_deferred(
197        &mut self,
198        batch: &RecordBatch,
199        embeddings: &[Vec<f32>],
200        ivf_config: IvfPqConfig,
201    ) -> AilakeResult<()> {
202        if self.captured_schema.is_none() {
203            self.captured_schema = Some(batch.schema());
204        }
205        let part_num = self.part_counter.fetch_add(1, Ordering::SeqCst);
206        let file_path = format!("data/part-{:05}.parquet", part_num);
207
208        let file_writer = AilakeFileWriter::new(self.policy.clone());
209        let parquet_bytes = file_writer.write_parquet_only(batch, embeddings)?;
210        let file_size = parquet_bytes.len() as u64;
211        self.store.put(&file_path, parquet_bytes).await?;
212
213        let centroid = compute_centroid_and_radius(embeddings, self.policy.metric);
214        let mut entry = make_data_file_entry_indexing(
215            &file_path,
216            embeddings.len() as u64,
217            file_size,
218            &centroid,
219            &self.policy.column_name,
220            self.policy.dim,
221        );
222        entry.embedding_model = self
223            .policy
224            .embedding_model
225            .as_ref()
226            .map(|m| m.to_property_value());
227        entry.partition_value =
228            apply_partition_transforms(&self.policy, self.policy.partition_value.as_deref());
229        self.pending_files.push(entry);
230
231        let store = self.store.clone();
232        let catalog = self.catalog.clone();
233        let policy = self.policy.clone();
234        let table = self.table.clone();
235        let fp = file_path.clone();
236        let codebook_cell = self.deferred_ivf_codebook.clone();
237        tokio::spawn(async move {
238            if let Err(e) = build_ivf_pq_and_patch_index(
239                store,
240                catalog,
241                policy,
242                table,
243                fp,
244                ivf_config,
245                codebook_cell,
246            )
247            .await
248            {
249                error!(
250                    "ailake: deferred IVF-PQ build failed — file is indexed as Parquet-only until \
251                     next compaction rebuilds the index: {}",
252                    e
253                );
254            }
255        });
256
257        Ok(())
258    }
259
260    /// Idempotent variant of `write_batch`.
261    ///
262    /// Before any I/O, checks if `batch_id` already appears in the current
263    /// snapshot. If it does, this is a no-op — safe for Airflow/Kestra retries.
264    /// If not found, writes the batch and tags the `DataFileEntry` with `batch_id`
265    /// so future retries can detect it.
266    ///
267    /// `commit()` is likewise a no-op when `pending_files` is empty.
268    pub async fn write_batch_idempotent(
269        &mut self,
270        batch: &RecordBatch,
271        embeddings: &[Vec<f32>],
272        batch_id: &str,
273    ) -> AilakeResult<()> {
274        let existing = self.catalog.list_files(&self.table, None).await?;
275        if existing
276            .iter()
277            .any(|f| f.batch_id.as_deref() == Some(batch_id))
278        {
279            return Ok(());
280        }
281        self.write_batch_with_id(batch, embeddings, Some(batch_id.to_string()))
282            .await
283    }
284
285    /// Write a batch to a new AI-Lake file and stage it for commit.
286    /// Validates that provided embeddings match the table's configured dimension.
287    /// Returns `ModelMismatch` error when dim differs — prevents silently mixing
288    /// incompatible vectors (same error type used across write paths for consistency).
289    fn validate_embedding_dim(&self, embeddings: &[Vec<f32>]) -> AilakeResult<()> {
290        if let Some(first) = embeddings.first() {
291            let actual = first.len() as u32;
292            if actual != self.policy.dim {
293                let table_model = self
294                    .policy
295                    .embedding_model
296                    .as_ref()
297                    .map(|m| m.to_property_value())
298                    .unwrap_or_else(|| format!("dim={}", self.policy.dim));
299                return Err(AilakeError::ModelMismatch {
300                    table_model,
301                    table_dim: self.policy.dim,
302                    batch_model: format!("dim={}", actual),
303                    batch_dim: actual,
304                });
305            }
306        }
307        Ok(())
308    }
309
    /// Write a batch to a new AI-Lake file and stage it for commit.
    ///
    /// Thin wrapper over `write_batch_with_id` that passes no `batch_id`, so
    /// the staged entry is not tagged for idempotent-retry detection.
    pub async fn write_batch(
        &mut self,
        batch: &RecordBatch,
        embeddings: &[Vec<f32>],
    ) -> AilakeResult<()> {
        self.write_batch_with_id(batch, embeddings, None).await
    }
317
318    async fn write_batch_with_id(
319        &mut self,
320        batch: &RecordBatch,
321        embeddings: &[Vec<f32>],
322        batch_id: Option<String>,
323    ) -> AilakeResult<()> {
324        self.validate_embedding_dim(embeddings)?;
325        if self.captured_schema.is_none() {
326            self.captured_schema = Some(batch.schema());
327        }
328        let part_num = self.part_counter.fetch_add(1, Ordering::SeqCst);
329        let file_path = format!("data/part-{:05}.parquet", part_num);
330
331        // Write AI-Lake file
332        let file_writer = AilakeFileWriter::new(self.policy.clone());
333        let file_bytes: Bytes = file_writer.write(batch, embeddings)?;
334        let file_size = file_bytes.len() as u64;
335
336        // Store the file
337        self.store.put(&file_path, file_bytes.clone()).await?;
338
339        // Compute centroid for catalog entry
340        let centroid = compute_centroid_and_radius(embeddings, self.policy.metric);
341
342        // Read back the HNSW offsets from the written file
343        let reader = ailake_file::AilakeFileReader::new(
344            file_bytes,
345            &self.policy.column_name,
346            self.policy.dim,
347        );
348        let header = reader.read_header()?;
349        let ailk_start = reader.ailk_offset()?;
350        let hnsw_abs_offset = ailk_start + header.hnsw_offset;
351        let hnsw_len = header.hnsw_len;
352
353        let mut entry = make_data_file_entry(
354            &file_path,
355            embeddings.len() as u64,
356            file_size,
357            &centroid,
358            VectorIndexInfo {
359                column: &self.policy.column_name,
360                dim: self.policy.dim,
361                hnsw_offset: hnsw_abs_offset,
362                hnsw_len,
363            },
364        );
365        entry.batch_id = batch_id;
366        entry.embedding_model = self
367            .policy
368            .embedding_model
369            .as_ref()
370            .map(|m| m.to_property_value());
371        entry.partition_value =
372            apply_partition_transforms(&self.policy, self.policy.partition_value.as_deref());
373        self.pending_files.push(entry);
374
375        // Update BM25 IDF stats + build Bloom filter (Phase F).
376        if self.bm25_text_column.is_some() {
377            self.update_bm25_stats_from_batch(batch).await?;
378            self.build_bloom_for_file(batch, &file_path);
379        }
380        Ok(())
381    }
382
383    /// Write batch, auto-selecting the index based on detected hardware.
384    ///
385    /// Picks IVF-PQ when a CUDA GPU or ≥8 CPU cores are present AND the batch
386    /// has ≥5 000 vectors. Falls back to HNSW for weaker / local hardware.
387    /// Uses `IvfPqConfig::for_dataset` to scale nlist with dataset size.
388    pub async fn write_batch_auto(
389        &mut self,
390        batch: &RecordBatch,
391        embeddings: &[Vec<f32>],
392    ) -> AilakeResult<()> {
393        let profile = ailake_index::HardwareProfile::detect();
394        if profile.recommend_ivf_pq(embeddings.len()) {
395            let mut ivf_config =
396                ailake_index::IvfPqConfig::for_dataset(self.policy.dim as usize, embeddings.len());
397            if self.policy.ivf_residual {
398                ivf_config = ivf_config.with_residual();
399            }
400            self.write_batch_ivf_pq(batch, embeddings, ivf_config).await
401        } else {
402            self.write_batch(batch, embeddings).await
403        }
404    }
405
406    /// Write batch, auto-selecting the index based on detected hardware — deferred variant.
407    ///
408    /// Same hardware detection as `write_batch_auto`: picks IVF-PQ when a CUDA GPU or
409    /// ≥8 CPU cores are present AND the batch has ≥5 000 vectors; falls back to HNSW.
410    ///
411    /// Unlike `write_batch_auto`, the index is built in a background tokio task:
412    /// - Parquet is persisted immediately (~200k vec/s, same as write_parquet_only).
413    /// - HNSW or IVF-PQ index built asynchronously; shard served via flat scan meanwhile.
414    ///
415    /// Use this when ingest throughput matters more than immediate searchability.
416    pub async fn write_batch_auto_deferred(
417        &mut self,
418        batch: &RecordBatch,
419        embeddings: &[Vec<f32>],
420    ) -> AilakeResult<()> {
421        let profile = ailake_index::HardwareProfile::detect();
422        if profile.recommend_ivf_pq(embeddings.len()) {
423            let mut ivf_config =
424                ailake_index::IvfPqConfig::for_dataset(self.policy.dim as usize, embeddings.len());
425            if self.policy.ivf_residual {
426                ivf_config = ivf_config.with_residual();
427            }
428            self.write_batch_ivf_pq_deferred(batch, embeddings, ivf_config)
429                .await
430        } else {
431            self.write_batch_deferred(batch, embeddings).await
432        }
433    }
434
435    /// Write batch with IVF-PQ index built synchronously (no background task).
436    ///
437    /// Smaller index than HNSW; better for S3 sequential-scan workloads.
438    pub async fn write_batch_ivf_pq(
439        &mut self,
440        batch: &RecordBatch,
441        embeddings: &[Vec<f32>],
442        ivf_config: IvfPqConfig,
443    ) -> AilakeResult<()> {
444        if self.captured_schema.is_none() {
445            self.captured_schema = Some(batch.schema());
446        }
447        let part_num = self.part_counter.fetch_add(1, Ordering::SeqCst);
448        let file_path = format!("data/part-{:05}.parquet", part_num);
449
450        // Train codebook once on the first shard; all subsequent shards reuse it.
451        // This makes cross-shard ADC distances comparable, eliminating the need
452        // for exact reranking during multi-shard search.
453        if self.cached_ivf_codebook.is_none() {
454            let codebook = tokio::task::spawn_blocking({
455                let embeddings = embeddings.to_vec();
456                let metric = self.policy.metric;
457                let config = ivf_config.clone();
458                move || ailake_index::IvfPqIndex::train_codebook(&embeddings, metric, &config)
459            })
460            .await
461            .map_err(|e| ailake_core::AilakeError::Store(format!("spawn_blocking panic: {e}")))??;
462            self.cached_ivf_codebook = Some(Arc::new(codebook));
463        }
464        // SAFETY: set to Some in the block above (either pre-existing or just trained).
465        let codebook = self
466            .cached_ivf_codebook
467            .as_ref()
468            .expect("IVF-PQ codebook must be Some after training block")
469            .clone();
470
471        let file_writer = AilakeFileWriter::new(self.policy.clone())
472            .with_index_type(IndexType::IvfPq(ivf_config))
473            .with_shared_ivf_codebook(codebook);
474        let file_bytes: Bytes = file_writer.write(batch, embeddings)?;
475        let file_size = file_bytes.len() as u64;
476
477        self.store.put(&file_path, file_bytes.clone()).await?;
478
479        let centroid = compute_centroid_and_radius(embeddings, self.policy.metric);
480
481        let reader = ailake_file::AilakeFileReader::new(
482            file_bytes,
483            &self.policy.column_name,
484            self.policy.dim,
485        );
486        let header = reader.read_header()?;
487        let ailk_start = reader.ailk_offset()?;
488        let index_abs_offset = ailk_start + header.hnsw_offset;
489        let index_len = header.hnsw_len;
490
491        let mut entry = make_data_file_entry(
492            &file_path,
493            embeddings.len() as u64,
494            file_size,
495            &centroid,
496            VectorIndexInfo {
497                column: &self.policy.column_name,
498                dim: self.policy.dim,
499                hnsw_offset: index_abs_offset,
500                hnsw_len: index_len,
501            },
502        );
503        entry.embedding_model = self
504            .policy
505            .embedding_model
506            .as_ref()
507            .map(|m| m.to_property_value());
508        entry.partition_value =
509            apply_partition_transforms(&self.policy, self.policy.partition_value.as_deref());
510        self.pending_files.push(entry);
511        Ok(())
512    }
513
514    /// Write a batch with multiple vector columns into a single AI-Lake file.
515    ///
516    /// The first entry in `columns` is treated as the primary column (used for
517    /// geometric pruning). Additional columns each get their own HNSW section.
518    pub async fn write_batch_multi(
519        &mut self,
520        batch: &RecordBatch,
521        columns: &[MultiVectorBatch<'_>],
522    ) -> AilakeResult<()> {
523        use ailake_core::AilakeError;
524        if self.captured_schema.is_none() {
525            self.captured_schema = Some(batch.schema());
526        }
527        if self.extra_vec_policies.is_empty() && columns.len() > 1 {
528            self.extra_vec_policies = columns[1..].iter().map(|c| c.policy.clone()).collect();
529        }
530
531        if columns.is_empty() {
532            return Err(AilakeError::InvalidArgument(
533                "write_batch_multi requires at least one column".into(),
534            ));
535        }
536
537        let part_num = self.part_counter.fetch_add(1, Ordering::SeqCst);
538        let file_path = format!("data/part-{:05}.parquet", part_num);
539
540        let col_batches: Vec<VectorColumnBatch<'_>> = columns
541            .iter()
542            .map(|c| VectorColumnBatch {
543                policy: &c.policy,
544                embeddings: c.embeddings,
545            })
546            .collect();
547
548        let primary_policy = &columns[0].policy;
549        let file_writer = AilakeFileWriter::new(primary_policy.clone());
550        let file_bytes: Bytes = file_writer.write_multi(batch, &col_batches)?;
551        let file_size = file_bytes.len() as u64;
552
553        self.store.put(&file_path, file_bytes.clone()).await?;
554
555        // Primary centroid for pruning
556        let primary_centroid =
557            compute_centroid_and_radius(columns[0].embeddings, primary_policy.metric);
558
559        // Read primary AILK header for offsets
560        let reader = ailake_file::AilakeFileReader::new(
561            file_bytes.clone(),
562            &primary_policy.column_name,
563            primary_policy.dim,
564        );
565        let primary_ailk_start = reader.ailk_offset()?;
566        let primary_header = {
567            use ailake_file::HEADER_SIZE;
568            let start = primary_ailk_start as usize;
569            let hdr_bytes: &[u8; HEADER_SIZE] = file_bytes[start..start + HEADER_SIZE]
570                .try_into()
571                .map_err(|_| AilakeError::NotAnAilakeFile)?;
572            ailake_file::AilakeHeader::from_bytes(hdr_bytes)?
573        };
574        let primary_hnsw_abs = primary_ailk_start + primary_header.hnsw_offset;
575
576        // Extra column index metadata
577        let mut extra: Vec<ExtraVectorIndex> = Vec::new();
578        for col in columns.iter().skip(1) {
579            let col_ailk_start = reader.ailk_offset_for_column(&col.policy.column_name)?;
580            let col_header = {
581                use ailake_file::HEADER_SIZE;
582                let start = col_ailk_start as usize;
583                let hdr_bytes: &[u8; HEADER_SIZE] = file_bytes[start..start + HEADER_SIZE]
584                    .try_into()
585                    .map_err(|_| AilakeError::NotAnAilakeFile)?;
586                ailake_file::AilakeHeader::from_bytes(hdr_bytes)?
587            };
588            let col_centroid = compute_centroid_and_radius(col.embeddings, col.policy.metric);
589            extra.push(ExtraVectorIndex {
590                column: col.policy.column_name.clone(),
591                dim: col.policy.dim,
592                hnsw_offset: col_ailk_start + col_header.hnsw_offset,
593                hnsw_len: col_header.hnsw_len,
594                centroid_b64: Some(encode_centroid_b64(&col_centroid)),
595                radius: Some(col_centroid.radius),
596            });
597        }
598
599        let mut entry = make_multi_column_data_file_entry(
600            &file_path,
601            columns[0].embeddings.len() as u64,
602            file_size,
603            &primary_centroid,
604            VectorIndexInfo {
605                column: &primary_policy.column_name,
606                dim: primary_policy.dim,
607                hnsw_offset: primary_hnsw_abs,
608                hnsw_len: primary_header.hnsw_len,
609            },
610            &extra,
611        );
612        entry.embedding_model = self
613            .policy
614            .embedding_model
615            .as_ref()
616            .map(|m| m.to_property_value());
617        entry.partition_value =
618            apply_partition_transforms(&self.policy, self.policy.partition_value.as_deref());
619        self.pending_files.push(entry);
620        Ok(())
621    }
622
623    /// Write a multi-column batch as Parquet-only immediately; build all N column
624    /// HNSW indexes in a single background task.
625    ///
626    /// Same semantics as `write_batch_deferred` but for N vector columns:
627    /// - Parquet (primary column bytes) is persisted immediately (~200k vec/s).
628    /// - A background tokio task rebuilds the full AILK file via `write_multi` and
629    ///   patches the catalog entry with primary + extra column offsets once ready.
630    /// - During the build window, `SearchSession` serves this shard via GPU/CPU flat
631    ///   scan. Transition to HNSW-indexed search is automatic on `IndexStatus::Ready`.
632    ///
633    /// All N column embeddings are cloned into the background task; choose batch size
634    /// so that N×rows×dim×4 bytes fits comfortably in RAM while the task runs.
635    pub async fn write_batch_multi_deferred(
636        &mut self,
637        batch: &RecordBatch,
638        columns: &[MultiVectorBatch<'_>],
639    ) -> AilakeResult<()> {
640        use ailake_core::AilakeError;
641        if columns.is_empty() {
642            return Err(AilakeError::InvalidArgument(
643                "write_batch_multi_deferred requires at least one column".into(),
644            ));
645        }
646        if self.captured_schema.is_none() {
647            self.captured_schema = Some(batch.schema());
648        }
649        if self.extra_vec_policies.is_empty() && columns.len() > 1 {
650            self.extra_vec_policies = columns[1..].iter().map(|c| c.policy.clone()).collect();
651        }
652
653        let part_num = self.part_counter.fetch_add(1, Ordering::SeqCst);
654        let file_path = format!("data/part-{:05}.parquet", part_num);
655
656        // Immediate path: write Parquet with primary column only (no AILK sections yet).
657        let primary_policy = &columns[0].policy;
658        let file_writer = AilakeFileWriter::new(primary_policy.clone());
659        let parquet_bytes = file_writer.write_parquet_only(batch, columns[0].embeddings)?;
660        let file_size = parquet_bytes.len() as u64;
661        self.store.put(&file_path, parquet_bytes).await?;
662
663        // Primary centroid enables geometric pruning during the build window.
664        let primary_centroid =
665            compute_centroid_and_radius(columns[0].embeddings, primary_policy.metric);
666        let mut entry = make_data_file_entry_indexing(
667            &file_path,
668            columns[0].embeddings.len() as u64,
669            file_size,
670            &primary_centroid,
671            &primary_policy.column_name,
672            primary_policy.dim,
673        );
674        // Populate extra_vector_indexes with centroids/radii for pruning.
675        // hnsw_offset/len are 0 until the background task patches them to non-zero.
676        entry.extra_vector_indexes = columns[1..]
677            .iter()
678            .map(|c| {
679                let col_centroid = compute_centroid_and_radius(c.embeddings, c.policy.metric);
680                ExtraVectorIndex {
681                    column: c.policy.column_name.clone(),
682                    dim: c.policy.dim,
683                    hnsw_offset: 0,
684                    hnsw_len: 0,
685                    centroid_b64: Some(encode_centroid_b64(&col_centroid)),
686                    radius: Some(col_centroid.radius),
687                }
688            })
689            .collect();
690        entry.embedding_model = self
691            .policy
692            .embedding_model
693            .as_ref()
694            .map(|m| m.to_property_value());
695        entry.partition_value =
696            apply_partition_transforms(&self.policy, self.policy.partition_value.as_deref());
697        self.pending_files.push(entry);
698
699        // Clone all column data for the background task.
700        let all_policies: Vec<VectorStoragePolicy> =
701            columns.iter().map(|c| c.policy.clone()).collect();
702        let all_embeddings: Vec<Vec<Vec<f32>>> =
703            columns.iter().map(|c| c.embeddings.to_vec()).collect();
704        let store = self.store.clone();
705        let catalog = self.catalog.clone();
706        let table = self.table.clone();
707        let fp = file_path.clone();
708        tokio::spawn(async move {
709            if let Err(e) =
710                build_and_patch_multi_index(store, catalog, all_policies, table, fp, all_embeddings)
711                    .await
712            {
713                error!(
714                    "ailake: deferred multi-column HNSW build failed — shard stays in flat-scan \
715                     mode until next compaction rebuilds the index: {}",
716                    e
717                );
718            }
719        });
720
721        Ok(())
722    }
723
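    // NOTE(review): the paragraph below describes `commit()`, not the Bloom-filter
    // helper that follows. It is kept as a plain `//` comment so rustdoc does not
    // attach it to the wrong item; move it onto `commit()` if that method is not
    // already documented.
    //
    // Commit all staged files as a new Iceberg snapshot.
    //
    // No-op when `pending_files` is empty (e.g., all `write_batch_idempotent`
    // calls were skipped because their `batch_id` was already committed).
    // Returns the current snapshot id in that case (or 0 if no snapshot exists yet).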
729    /// Build a Bloom filter from the BM25 text column and store it for the given file.
730    /// Called alongside `update_bm25_stats_from_batch` for every write_batch. The filter
731    /// is flushed to the Puffin stats file at commit time (Phase F).
732    fn build_bloom_for_file(&mut self, batch: &RecordBatch, file_path: &str) {
733        use arrow_array::cast::AsArray;
734        let col_name = match &self.bm25_text_column {
735            Some(c) => c.clone(),
736            None => return,
737        };
738        let col = match batch.column_by_name(&col_name) {
739            Some(c) => c,
740            None => return,
741        };
742        let str_arr = match col.as_string_opt::<i32>() {
743            Some(a) => a,
744            None => return,
745        };
746        // Size the filter for ~10× unique terms per row at 1% FPR.
747        let cap = (batch.num_rows() * 10).max(128);
748        let mut bloom = crate::bloom::BloomFilter::with_capacity(cap, 0.01);
749        for i in 0..str_arr.len() {
750            if str_arr.is_valid(i) {
751                for term in crate::bm25::tokenize(str_arr.value(i)) {
752                    bloom.insert(&term);
753                }
754            }
755        }
756        self.pending_blooms
757            .push((file_path.to_string(), bloom.to_bytes()));
758    }
759
760    /// Update BM25 IDF stats from a batch's text column and persist to storage.
761    ///
762    /// Read-modify-write: loads existing stats (if any), merges new DF counts,
763    /// writes back. Concurrent writers may lose some DF deltas; acceptable for
764    /// approximate BM25 (same as Iceberg without OCC). Compaction rebuilds accurately.
765    async fn update_bm25_stats_from_batch(&self, batch: &RecordBatch) -> AilakeResult<()> {
766        use arrow_array::cast::AsArray;
767
768        let col_name = match &self.bm25_text_column {
769            Some(c) => c.as_str(),
770            None => return Ok(()),
771        };
772        let col = match batch.column_by_name(col_name) {
773            Some(c) => c,
774            None => {
775                tracing::warn!(
776                    "ailake: BM25 text column '{}' not found in batch — skipping IDF update",
777                    col_name
778                );
779                return Ok(());
780            }
781        };
782        let str_arr = match col.as_string_opt::<i32>() {
783            Some(a) => a,
784            None => {
785                tracing::warn!(
786                    "ailake: BM25 text column '{}' is not a Utf8 column — skipping",
787                    col_name
788                );
789                return Ok(());
790            }
791        };
792
793        let texts: Vec<&str> = (0..str_arr.len())
794            .filter(|&i| str_arr.is_valid(i))
795            .map(|i| str_arr.value(i))
796            .collect();
797
798        // Load existing stats
799        let stats_path = crate::bm25::BM25_STATS_FILE;
800        let mut stats: crate::bm25::IdfStats = match self.store.get(stats_path).await {
801            Ok(bytes) => crate::bm25::IdfStats::from_bytes(&bytes).unwrap_or_default(),
802            Err(_) => crate::bm25::IdfStats::default(),
803        };
804
805        stats.merge_batch(&texts);
806
807        let bytes = stats.to_bytes()?;
808        self.store
809            .put(stats_path, bytes::Bytes::from(bytes))
810            .await?;
811        Ok(())
812    }
813
814    pub async fn commit(mut self) -> AilakeResult<SnapshotId> {
815        if self.pending_files.is_empty() {
816            let current = self
817                .catalog
818                .load_table(&self.table)
819                .await
820                .ok()
821                .and_then(|m| m.current_snapshot_id)
822                .unwrap_or(0);
823            return Ok(current);
824        }
825        let iceberg_schema = self
826            .captured_schema
827            .as_deref()
828            .map(|s| arrow_schema_to_iceberg_update(s, &self.policy, &self.extra_vec_policies));
829        // Store secondary column dims/metrics as table-level properties so
830        // search_multimodal can discover them without reading Parquet files.
831        let mut extra_properties = std::collections::HashMap::new();
832        for ep in &self.extra_vec_policies {
833            extra_properties.insert(format!("ailake.dim-{}", ep.column_name), ep.dim.to_string());
834            extra_properties.insert(
835                format!("ailake.metric-{}", ep.column_name),
836                ailake_parquet::schema::metric_str(ep.metric).to_string(),
837            );
838            if let Some(modality) = ep.modality {
839                extra_properties.insert(
840                    format!("ailake.modality-{}", ep.column_name),
841                    modality.as_str().to_string(),
842                );
843            }
844        }
845        let snapshot = NewSnapshot {
846            snapshot_id: new_snapshot_id(),
847            parent_snapshot_id: self.parent_snapshot_id,
848            files: std::mem::take(&mut self.pending_files),
849            operation: SnapshotOperation::Append,
850            iceberg_schema,
851            extra_properties,
852            bloom_filters: std::mem::take(&mut self.pending_blooms),
853            equality_delete_files: vec![],
854        };
855        self.catalog.commit_snapshot(&self.table, snapshot).await
856    }
857
858    /// Create a table if it doesn't exist, then return a writer for it.
859    pub async fn create_or_open(
860        catalog: Arc<dyn CatalogProvider>,
861        store: Arc<dyn Store>,
862        policy: VectorStoragePolicy,
863        table: TableIdent,
864        format_version: u8,
865    ) -> AilakeResult<Self> {
866        // Track existing file count so new writers start their part counter past
867        // any already-committed files, preventing name collisions on sequential writes.
868        let existing_file_count: u32;
869
870        match catalog.load_table(&table).await {
871            Ok(existing_meta) => {
872                // Hard error: dim stored in table metadata must match the policy dim.
873                // validate_embedding_dim() only checks vectors vs policy.dim; without this
874                // check a caller can open with dim=16 on a dim=8 table and silently corrupt it.
875                if let Some(stored_dim_str) = existing_meta.properties.get("ailake.vector-dim") {
876                    if let Ok(stored_dim) = stored_dim_str.parse::<u32>() {
877                        if stored_dim != policy.dim {
878                            let table_model = policy
879                                .embedding_model
880                                .as_ref()
881                                .map(|m| m.to_property_value())
882                                .unwrap_or_else(|| format!("dim={}", stored_dim));
883                            return Err(AilakeError::ModelMismatch {
884                                table_model,
885                                table_dim: stored_dim,
886                                batch_model: format!("dim={}", policy.dim),
887                                batch_dim: policy.dim,
888                            });
889                        }
890                    }
891                }
892                // Warn when writing with a different model name into an existing table.
893                // Name divergence is softer — same dim, different model (e.g. fine-tune vs
894                // base) — warn only.
895                if let Some(incoming) = &policy.embedding_model {
896                    if let Some(stored_val) = existing_meta
897                        .properties
898                        .get(EmbeddingModelInfo::property_key())
899                    {
900                        let stored = EmbeddingModelInfo::from_property_value(stored_val);
901                        if stored.name != incoming.name {
902                            warn!(
903                                "ailake: embedding model name changed: table has '{}', writing with '{}' \
904                                 (dim={}). Vectors may be incompatible for similarity search.",
905                                stored.name, incoming.name, policy.dim
906                            );
907                        }
908                    }
909                }
910                existing_file_count = catalog
911                    .list_files(&table, None)
912                    .await
913                    .unwrap_or_default()
914                    .len() as u32;
915            }
916            Err(_) => {
917                catalog
918                    .create_table(
919                        &table,
920                        &TableProperties {
921                            partition_column_type: policy.partition_column_type.clone(),
922                            policy: policy.clone(),
923                            extra: std::collections::HashMap::new(),
924                            format_version,
925                        },
926                    )
927                    .await?;
928                existing_file_count = 0;
929            }
930        }
931        let mut writer = Self::new(catalog, store, policy, table);
932        writer.part_counter = Arc::new(AtomicU32::new(existing_file_count));
933        Ok(writer)
934    }
935}
936
937/// Convert an Arrow schema to an Iceberg schema update for catalog commits.
938///
939/// Top-level field IDs are assigned sequentially (1-based) and match the
940/// `PARQUET:field_id` stamps written by `ParquetVectorWriter`. Nested element
941/// IDs (inside List/Struct/Map) are assigned after all top-level IDs are
942/// pre-reserved, so they never collide with Parquet column field IDs.
943fn arrow_schema_to_iceberg_update(
944    schema: &arrow_schema::Schema,
945    policy: &VectorStoragePolicy,
946    extra_vec_policies: &[VectorStoragePolicy],
947) -> IcebergSchemaUpdate {
948    let bytes_per_dim = policy.precision.bytes_per_element() as u32;
949    let vec_fixed_len = policy.dim * bytes_per_dim;
950
951    // Collect all vector column names that will appear in the final schema.
952    let has_primary_in_batch = schema
953        .fields()
954        .iter()
955        .any(|f| f.name() == &policy.column_name);
956    let vec_cols: Vec<(String, u32)> = {
957        let mut v = Vec::new();
958        if !has_primary_in_batch {
959            v.push((policy.column_name.clone(), vec_fixed_len));
960        }
961        for ep in extra_vec_policies {
962            let ep_fixed_len = ep.dim * ep.precision.bytes_per_element() as u32;
963            if !schema.fields().iter().any(|f| f.name() == &ep.column_name) {
964                v.push((ep.column_name.clone(), ep_fixed_len));
965            }
966        }
967        v
968    };
969
970    // Total top-level columns = batch fields + appended vec columns.
971    let top_level_count = schema.fields().len() + vec_cols.len();
972    // Nested element IDs start after all top-level IDs are pre-reserved.
973    let mut nested_id = top_level_count as i32;
974
975    let mut fields: Vec<serde_json::Value> = Vec::new();
976    let mut name_mapping: Vec<serde_json::Value> = Vec::new();
977
978    for (idx, field) in schema.fields().iter().enumerate() {
979        let field_id = (idx + 1) as i32;
980        let iceberg_type = arrow_type_to_iceberg(field.data_type(), &mut nested_id);
981        fields.push(serde_json::json!({
982            "id": field_id,
983            "name": field.name(),
984            "required": false,
985            "type": iceberg_type,
986        }));
987        name_mapping.push(serde_json::json!({
988            "field-id": field_id,
989            "names": [field.name()],
990        }));
991    }
992
993    // Append vector columns that live outside the RecordBatch schema.
994    for (i, (col_name, fixed_len)) in vec_cols.iter().enumerate() {
995        let field_id = (schema.fields().len() + 1 + i) as i32;
996        fields.push(serde_json::json!({
997            "id": field_id,
998            "name": col_name,
999            "required": false,
1000            "type": format!("fixed[{fixed_len}]"),
1001        }));
1002        name_mapping.push(serde_json::json!({
1003            "field-id": field_id,
1004            "names": [col_name],
1005        }));
1006    }
1007
1008    let last_column_id = nested_id;
1009    let name_mapping_json = serde_json::to_string(&name_mapping).unwrap_or_else(|_| "[]".into());
1010
1011    IcebergSchemaUpdate {
1012        fields,
1013        last_column_id,
1014        name_mapping_json,
1015    }
1016}
1017
1018/// Map an Arrow DataType to an Iceberg schema type value (string or JSON object).
1019///
1020/// `nested_id` is a shared counter for generating unique element/field IDs inside
1021/// List, Struct, and Map types. It must start beyond all pre-reserved top-level IDs.
1022fn arrow_type_to_iceberg(dt: &arrow_schema::DataType, nested_id: &mut i32) -> serde_json::Value {
1023    use arrow_schema::DataType;
1024    match dt {
1025        DataType::Boolean => serde_json::json!("boolean"),
1026        DataType::Int8 | DataType::Int16 | DataType::Int32 | DataType::UInt8 | DataType::UInt16 => {
1027            serde_json::json!("int")
1028        }
1029        DataType::Int64 | DataType::UInt32 | DataType::UInt64 => serde_json::json!("long"),
1030        DataType::Float16 | DataType::Float32 => serde_json::json!("float"),
1031        DataType::Float64 => serde_json::json!("double"),
1032        DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View => serde_json::json!("string"),
1033        DataType::Binary | DataType::LargeBinary | DataType::BinaryView => {
1034            serde_json::json!("binary")
1035        }
1036        DataType::Date32 | DataType::Date64 => serde_json::json!("date"),
1037        // Timestamp with timezone → timestamptz; without → timestamp.
1038        DataType::Timestamp(_, Some(_)) => serde_json::json!("timestamptz"),
1039        DataType::Timestamp(_, None) => serde_json::json!("timestamp"),
1040        DataType::Time32(_) | DataType::Time64(_) => serde_json::json!("time"),
1041        DataType::FixedSizeBinary(n) => serde_json::json!(format!("fixed[{n}]")),
1042        DataType::Decimal128(p, s) | DataType::Decimal256(p, s) => {
1043            serde_json::json!(format!("decimal({p}, {s})"))
1044        }
1045        DataType::List(inner)
1046        | DataType::LargeList(inner)
1047        | DataType::ListView(inner)
1048        | DataType::FixedSizeList(inner, _) => {
1049            *nested_id += 1;
1050            let element_id = *nested_id;
1051            let element_type = arrow_type_to_iceberg(inner.data_type(), nested_id);
1052            serde_json::json!({
1053                "type": "list",
1054                "element-id": element_id,
1055                "element": element_type,
1056                "element-required": !inner.is_nullable(),
1057            })
1058        }
1059        DataType::Struct(arrow_fields) => {
1060            let struct_fields: Vec<serde_json::Value> = arrow_fields
1061                .iter()
1062                .map(|f| {
1063                    *nested_id += 1;
1064                    let fid = *nested_id;
1065                    let ftype = arrow_type_to_iceberg(f.data_type(), nested_id);
1066                    serde_json::json!({
1067                        "id": fid,
1068                        "name": f.name(),
1069                        "required": !f.is_nullable(),
1070                        "type": ftype,
1071                    })
1072                })
1073                .collect();
1074            serde_json::json!({ "type": "struct", "fields": struct_fields })
1075        }
1076        DataType::Map(entries, _) => {
1077            // Arrow Map is List<Struct<key: K, value: V>>.
1078            *nested_id += 1;
1079            let key_id = *nested_id;
1080            *nested_id += 1;
1081            let val_id = *nested_id;
1082            if let DataType::Struct(kv_fields) = entries.data_type() {
1083                let key_f = kv_fields
1084                    .iter()
1085                    .find(|f| f.name() == "key" || f.name() == "keys");
1086                let val_f = kv_fields
1087                    .iter()
1088                    .find(|f| f.name() == "value" || f.name() == "values");
1089                let key_type = key_f
1090                    .map(|f| arrow_type_to_iceberg(f.data_type(), nested_id))
1091                    .unwrap_or(serde_json::json!("binary"));
1092                let val_type = val_f
1093                    .map(|f| arrow_type_to_iceberg(f.data_type(), nested_id))
1094                    .unwrap_or(serde_json::json!("binary"));
1095                let val_required = val_f.map(|f| !f.is_nullable()).unwrap_or(false);
1096                serde_json::json!({
1097                    "type": "map",
1098                    "key-id": key_id,
1099                    "key": key_type,
1100                    "value-id": val_id,
1101                    "value": val_type,
1102                    "value-required": val_required,
1103                })
1104            } else {
1105                serde_json::json!("binary")
1106            }
1107        }
1108        _ => serde_json::json!("binary"),
1109    }
1110}
1111
1112/// Background task: reads a Parquet-only shard, builds full AILK file, patches catalog.
1113pub(crate) async fn build_and_patch_index(
1114    store: Arc<dyn Store>,
1115    catalog: Arc<dyn CatalogProvider>,
1116    policy: VectorStoragePolicy,
1117    table: TableIdent,
1118    file_path: String,
1119) -> AilakeResult<()> {
1120    // Read the Parquet-only bytes already stored.
1121    let parquet_bytes = store.get(&file_path).await?;
1122    let reader = AilakeFileReader::new(parquet_bytes, &policy.column_name, policy.dim);
1123    let (batch, embeddings) = reader.read_parquet()?;
1124
1125    // Build the full AILK file (Parquet + HNSW) — CPU-intensive; run on blocking pool
1126    // so the tokio async threads aren't starved when many shards build concurrently.
1127    let full_bytes = tokio::task::spawn_blocking({
1128        let policy = policy.clone();
1129        move || {
1130            let file_writer = AilakeFileWriter::new(policy);
1131            file_writer.write(&batch, &embeddings)
1132        }
1133    })
1134    .await
1135    .map_err(|e| ailake_core::AilakeError::Store(format!("spawn_blocking panic: {e}")))??;
1136
1137    // Extract HNSW offsets from the newly written file.
1138    let full_reader = AilakeFileReader::new(full_bytes.clone(), &policy.column_name, policy.dim);
1139    let header = full_reader.read_header()?;
1140    let ailk_start = full_reader.ailk_offset()?;
1141    let hnsw_abs_offset = ailk_start + header.hnsw_offset;
1142    let hnsw_len = header.hnsw_len;
1143
1144    // Overwrite the Parquet-only file with the full AILK version.
1145    store.put(&file_path, full_bytes).await?;
1146
1147    // Wait for the initial writer commit to appear (max 60 s).
1148    // HNSW builds can finish before the main write loop calls commit_snapshot.
1149    let mut committed = false;
1150    for _ in 0..120u32 {
1151        match catalog.load_table(&table).await {
1152            Ok(meta) if meta.current_snapshot_id.is_some() => {
1153                committed = true;
1154                break;
1155            }
1156            _ => tokio::time::sleep(std::time::Duration::from_millis(500)).await,
1157        }
1158    }
1159    if !committed {
1160        return Err(ailake_core::AilakeError::Store(format!(
1161            "deferred HNSW build: no snapshot committed for {file_path} after 60 s — \
1162             did you call TableWriter::commit()?"
1163        )));
1164    }
1165
1166    // Update the catalog with CAS-like retry to handle concurrent background tasks.
1167    // Multiple tasks can race on commit_snapshot(Replace): the last writer wins and
1168    // may overwrite a sibling task's Ready status. Retry until we confirm our file
1169    // is marked Ready in the current snapshot.
1170    for attempt in 0..50u32 {
1171        let table_meta = catalog.load_table(&table).await?;
1172        let parent_snapshot_id = table_meta.current_snapshot_id;
1173        let mut files = catalog.list_files(&table, None).await?;
1174
1175        // Already marked Ready by a previous successful attempt.
1176        if files
1177            .iter()
1178            .any(|f| f.path == file_path && f.index_status == IndexStatus::Ready)
1179        {
1180            break;
1181        }
1182
1183        for f in &mut files {
1184            if f.path == file_path {
1185                f.hnsw_offset = Some(hnsw_abs_offset);
1186                f.hnsw_len = Some(hnsw_len);
1187                f.index_status = IndexStatus::Ready;
1188                break;
1189            }
1190        }
1191        catalog
1192            .commit_snapshot(
1193                &table,
1194                NewSnapshot {
1195                    snapshot_id: new_snapshot_id(),
1196                    parent_snapshot_id,
1197                    files,
1198                    operation: SnapshotOperation::Replace,
1199                    iceberg_schema: None,
1200                    extra_properties: std::collections::HashMap::new(),
1201                    bloom_filters: vec![],
1202                    equality_delete_files: vec![],
1203                },
1204            )
1205            .await?;
1206
1207        // Brief yield so sibling tasks can commit, then verify our change survived.
1208        tokio::time::sleep(std::time::Duration::from_millis(10 + attempt as u64 * 5)).await;
1209
1210        let verify = catalog.list_files(&table, None).await?;
1211        if verify
1212            .iter()
1213            .any(|f| f.path == file_path && f.index_status == IndexStatus::Ready)
1214        {
1215            break;
1216        }
1217        // Another task overwrote us — retry.
1218    }
1219
1220    info!(
1221        "ailake: deferred HNSW index built for {} (offset={}, len={})",
1222        file_path, hnsw_abs_offset, hnsw_len
1223    );
1224    Ok(())
1225}
1226
1227/// Background task: train IVF-PQ (using shared codebook) and patch catalog entry.
1228///
1229/// The OnceCell guarantees that k-means training runs exactly once across all
1230/// concurrent background tasks — subsequent tasks skip directly to assign+encode.
1231async fn build_ivf_pq_and_patch_index(
1232    store: Arc<dyn Store>,
1233    catalog: Arc<dyn CatalogProvider>,
1234    policy: VectorStoragePolicy,
1235    table: TableIdent,
1236    file_path: String,
1237    ivf_config: IvfPqConfig,
1238    codebook_cell: Arc<tokio::sync::OnceCell<IvfPqCodebook>>,
1239) -> AilakeResult<()> {
1240    let parquet_bytes = store.get(&file_path).await?;
1241    let reader = AilakeFileReader::new(parquet_bytes, &policy.column_name, policy.dim);
1242    let (batch, embeddings) = reader.read_parquet()?;
1243
1244    // Get or train the shared codebook. First task trains; all others await the result.
1245    let codebook = codebook_cell
1246        .get_or_try_init(|| async {
1247            let vecs = embeddings.clone();
1248            let metric = policy.metric;
1249            let cfg = ivf_config.clone();
1250            tokio::task::spawn_blocking(move || {
1251                ailake_index::IvfPqIndex::train_codebook(&vecs, metric, &cfg)
1252            })
1253            .await
1254            .map_err(|e| ailake_core::AilakeError::Store(format!("spawn_blocking panic: {e}")))?
1255        })
1256        .await?;
1257
1258    let full_bytes = tokio::task::spawn_blocking({
1259        let policy = policy.clone();
1260        let codebook = codebook.clone();
1261        move || {
1262            let file_writer = AilakeFileWriter::new(policy)
1263                .with_index_type(IndexType::IvfPq(ivf_config))
1264                .with_shared_ivf_codebook(Arc::new(codebook));
1265            file_writer.write(&batch, &embeddings)
1266        }
1267    })
1268    .await
1269    .map_err(|e| ailake_core::AilakeError::Store(format!("spawn_blocking panic: {e}")))??;
1270
1271    let full_reader = AilakeFileReader::new(full_bytes.clone(), &policy.column_name, policy.dim);
1272    let header = full_reader.read_header()?;
1273    let ailk_start = full_reader.ailk_offset()?;
1274    let hnsw_abs_offset = ailk_start + header.hnsw_offset;
1275    let hnsw_len = header.hnsw_len;
1276
1277    store.put(&file_path, full_bytes).await?;
1278
1279    // Wait for initial commit to appear then patch IndexStatus::Ready (max 60 s).
1280    let mut committed = false;
1281    for _ in 0..120u32 {
1282        match catalog.load_table(&table).await {
1283            Ok(meta) if meta.current_snapshot_id.is_some() => {
1284                committed = true;
1285                break;
1286            }
1287            _ => tokio::time::sleep(std::time::Duration::from_millis(500)).await,
1288        }
1289    }
1290    if !committed {
1291        return Err(ailake_core::AilakeError::Store(format!(
1292            "deferred IVF-PQ build: no snapshot committed for {file_path} after 60 s — \
1293             did you call TableWriter::commit()?"
1294        )));
1295    }
1296
1297    for attempt in 0..50u32 {
1298        let table_meta = catalog.load_table(&table).await?;
1299        let parent_snapshot_id = table_meta.current_snapshot_id;
1300        let mut files = catalog.list_files(&table, None).await?;
1301
1302        if files
1303            .iter()
1304            .any(|f| f.path == file_path && f.index_status == IndexStatus::Ready)
1305        {
1306            break;
1307        }
1308
1309        for f in &mut files {
1310            if f.path == file_path {
1311                f.hnsw_offset = Some(hnsw_abs_offset);
1312                f.hnsw_len = Some(hnsw_len);
1313                f.index_status = IndexStatus::Ready;
1314                break;
1315            }
1316        }
1317        catalog
1318            .commit_snapshot(
1319                &table,
1320                NewSnapshot {
1321                    snapshot_id: new_snapshot_id(),
1322                    parent_snapshot_id,
1323                    files,
1324                    operation: SnapshotOperation::Replace,
1325                    iceberg_schema: None,
1326                    extra_properties: std::collections::HashMap::new(),
1327                    bloom_filters: vec![],
1328                    equality_delete_files: vec![],
1329                },
1330            )
1331            .await?;
1332
1333        tokio::time::sleep(std::time::Duration::from_millis(10 + attempt as u64 * 5)).await;
1334
1335        let verify = catalog.list_files(&table, None).await?;
1336        if verify
1337            .iter()
1338            .any(|f| f.path == file_path && f.index_status == IndexStatus::Ready)
1339        {
1340            break;
1341        }
1342    }
1343
1344    info!(
1345        "ailake: deferred IVF-PQ index built for {} (offset={}, len={})",
1346        file_path, hnsw_abs_offset, hnsw_len
1347    );
1348    Ok(())
1349}
1350
1351/// Background task: rebuild full multi-column AILK file and patch all column offsets.
1352///
1353/// Reads the Parquet-only shard, calls `write_multi` with all N column embeddings
1354/// (cloned from the caller), extracts per-column HNSW offsets, overwrites the file,
1355/// then applies the same CAS retry loop used by single-column deferred tasks.
1356async fn build_and_patch_multi_index(
1357    store: Arc<dyn Store>,
1358    catalog: Arc<dyn CatalogProvider>,
1359    policies: Vec<VectorStoragePolicy>,
1360    table: TableIdent,
1361    file_path: String,
1362    all_embeddings: Vec<Vec<Vec<f32>>>,
1363) -> AilakeResult<()> {
1364    // Read the Parquet-only shard (primary column only).
1365    let parquet_bytes = store.get(&file_path).await?;
1366    let primary_reader =
1367        AilakeFileReader::new(parquet_bytes, &policies[0].column_name, policies[0].dim);
1368    let (batch, _) = primary_reader.read_parquet()?;
1369
1370    // Build full AILK file with all N column HNSW sections on the blocking pool.
1371    let full_bytes = tokio::task::spawn_blocking({
1372        let policies = policies.clone();
1373        let all_embeddings = all_embeddings.clone();
1374        move || {
1375            let col_batches: Vec<VectorColumnBatch<'_>> = policies
1376                .iter()
1377                .zip(all_embeddings.iter())
1378                .map(|(p, embs)| VectorColumnBatch {
1379                    policy: p,
1380                    embeddings: embs.as_slice(),
1381                })
1382                .collect();
1383            let file_writer = AilakeFileWriter::new(policies[0].clone());
1384            file_writer.write_multi(&batch, &col_batches)
1385        }
1386    })
1387    .await
1388    .map_err(|e| ailake_core::AilakeError::Store(format!("spawn_blocking panic: {e}")))??;
1389
1390    // Extract primary HNSW offsets.
1391    let primary_reader = AilakeFileReader::new(
1392        full_bytes.clone(),
1393        &policies[0].column_name,
1394        policies[0].dim,
1395    );
1396    let primary_header = primary_reader.read_header()?;
1397    let primary_ailk_start = primary_reader.ailk_offset()?;
1398    let primary_hnsw_abs = primary_ailk_start + primary_header.hnsw_offset;
1399    let primary_hnsw_len = primary_header.hnsw_len;
1400
1401    // Extract extra column HNSW offsets (one reader per column).
1402    // Must use ailk_offset_for_column / read_header_for_column so each column's
1403    // own `ailake.{col}.footer_offset` is used — ailk_offset() always returns the
1404    // primary column offset, which is wrong for extra columns.
1405    let mut extra_offsets: Vec<(u64, u64)> = Vec::with_capacity(policies.len().saturating_sub(1));
1406    for col_policy in policies.iter().skip(1) {
1407        let col_reader =
1408            AilakeFileReader::new(full_bytes.clone(), &col_policy.column_name, col_policy.dim);
1409        let col_ailk_start = col_reader.ailk_offset_for_column(&col_policy.column_name)?;
1410        let col_header = col_reader.read_header_for_column(&col_policy.column_name)?;
1411        extra_offsets.push((col_ailk_start + col_header.hnsw_offset, col_header.hnsw_len));
1412    }
1413
1414    // Overwrite the Parquet-only shard with the full AILK file.
1415    store.put(&file_path, full_bytes).await?;
1416
1417    // Wait for the initial writer commit to appear (max 60 s).
1418    let mut committed = false;
1419    for _ in 0..120u32 {
1420        match catalog.load_table(&table).await {
1421            Ok(meta) if meta.current_snapshot_id.is_some() => {
1422                committed = true;
1423                break;
1424            }
1425            _ => tokio::time::sleep(std::time::Duration::from_millis(500)).await,
1426        }
1427    }
1428    if !committed {
1429        return Err(ailake_core::AilakeError::Store(format!(
1430            "deferred index build: no snapshot committed for {file_path} after 60 s — \
1431             did you call TableWriter::commit()?"
1432        )));
1433    }
1434
1435    // CAS retry loop: patch primary offsets + extra_vector_indexes + IndexStatus::Ready.
1436    for attempt in 0..50u32 {
1437        let table_meta = catalog.load_table(&table).await?;
1438        let parent_snapshot_id = table_meta.current_snapshot_id;
1439        let mut files = catalog.list_files(&table, None).await?;
1440
1441        if files
1442            .iter()
1443            .any(|f| f.path == file_path && f.index_status == IndexStatus::Ready)
1444        {
1445            break;
1446        }
1447
1448        for f in &mut files {
1449            if f.path == file_path {
1450                f.hnsw_offset = Some(primary_hnsw_abs);
1451                f.hnsw_len = Some(primary_hnsw_len);
1452                f.index_status = IndexStatus::Ready;
1453                for (i, &(off, len)) in extra_offsets.iter().enumerate() {
1454                    if let Some(xi) = f.extra_vector_indexes.get_mut(i) {
1455                        xi.hnsw_offset = off;
1456                        xi.hnsw_len = len;
1457                    }
1458                }
1459                break;
1460            }
1461        }
1462        catalog
1463            .commit_snapshot(
1464                &table,
1465                NewSnapshot {
1466                    snapshot_id: new_snapshot_id(),
1467                    parent_snapshot_id,
1468                    files,
1469                    operation: SnapshotOperation::Replace,
1470                    iceberg_schema: None,
1471                    extra_properties: std::collections::HashMap::new(),
1472                    bloom_filters: vec![],
1473                    equality_delete_files: vec![],
1474                },
1475            )
1476            .await?;
1477
1478        tokio::time::sleep(std::time::Duration::from_millis(10 + attempt as u64 * 5)).await;
1479
1480        let verify = catalog.list_files(&table, None).await?;
1481        if verify
1482            .iter()
1483            .any(|f| f.path == file_path && f.index_status == IndexStatus::Ready)
1484        {
1485            break;
1486        }
1487    }
1488
1489    info!(
1490        "ailake: deferred multi-column HNSW built for {} ({} cols, primary offset={})",
1491        file_path,
1492        policies.len(),
1493        primary_hnsw_abs
1494    );
1495    Ok(())
1496}
1497
#[cfg(test)]
mod tests {
    use super::*;
    use ailake_catalog::HadoopCatalog;
    use ailake_core::{VectorMetric, VectorPrecision};
    use ailake_store::LocalStore;
    use arrow_array::StringArray;
    use arrow_schema::{DataType, Field, Schema, TimeUnit};

    /// Builds the storage policy shared by every test: cosine metric, F16
    /// precision, raw vectors kept for reranking, and every optional knob
    /// left unset. Only the column name and dimension vary per test.
    fn policy(column: &str, dim: u32) -> VectorStoragePolicy {
        VectorStoragePolicy {
            column_name: column.to_owned(),
            dim,
            metric: VectorMetric::Cosine,
            precision: VectorPrecision::F16,
            keep_raw_for_reranking: true,
            pre_normalize: false,
            ivf_residual: false,
            pq: None,
            hnsw_m: None,
            hnsw_ef_construction: None,
            embedding_model: None,
            modality: None,
            partition_by: None,
            partition_value: None,
            partition_column_type: None,
            partition_fields: Vec::new(),
        }
    }

    /// Runs the Arrow -> Iceberg schema conversion for a single (primary)
    /// vector column, i.e. with no extra vector policies.
    fn update_for(schema: &Schema, pol: &VectorStoragePolicy) -> IcebergSchemaUpdate {
        let no_extra_policies = &[];
        arrow_schema_to_iceberg_update(schema, pol, no_extra_policies)
    }

    /// Arrow `List<Utf8>` type with a nullable element field named "item".
    fn string_list() -> DataType {
        let element = Field::new("item", DataType::Utf8, true);
        DataType::List(Arc::new(element))
    }

    /// Collects the "name" entry of every top-level field in the update.
    fn field_names(upd: &IcebergSchemaUpdate) -> Vec<&str> {
        upd.fields
            .iter()
            .map(|f| f["name"].as_str().unwrap())
            .collect()
    }

    /// Single-column record batch: one non-null Utf8 column called "text".
    fn text_batch(rows: Vec<&str>) -> RecordBatch {
        let schema = Arc::new(Schema::new(vec![Field::new("text", DataType::Utf8, false)]));
        RecordBatch::try_new(schema, vec![Arc::new(StringArray::from(rows))]).unwrap()
    }

    #[test]
    fn simple_schema_produces_correct_fields() {
        let schema = Schema::new(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("text", DataType::Utf8, false),
        ]);
        let upd = update_for(&schema, &policy("embedding", 8));

        assert_eq!(upd.fields.len(), 3);
        // Expected (id, type) per top-level field; the vector column is
        // fixed[16] because dim=8 and F16 takes 2 bytes per component.
        let expected = [(1, "int"), (2, "string"), (3, "fixed[16]")];
        for (field, (id, ty)) in upd.fields.iter().zip(expected) {
            assert_eq!(field["id"], id);
            assert_eq!(field["type"], ty);
        }

        let mapping: Vec<serde_json::Value> =
            serde_json::from_str(&upd.name_mapping_json).unwrap();
        assert_eq!(mapping.len(), 3);
        let vector_entry = &mapping[2];
        assert_eq!(vector_entry["field-id"], 3);
        assert_eq!(vector_entry["names"][0], "embedding");
        assert_eq!(upd.last_column_id, 3);
    }

    #[test]
    fn timestamp_without_tz_maps_to_timestamp_not_timestamptz() {
        let naive = Field::new(
            "created_at",
            DataType::Timestamp(TimeUnit::Microsecond, None),
            true,
        );
        let zoned = Field::new(
            "updated_at",
            DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
            true,
        );
        let schema = Schema::new(vec![naive, zoned]);
        let upd = update_for(&schema, &policy("vec", 4));

        assert_eq!(upd.fields[0]["type"], "timestamp");
        assert_eq!(upd.fields[1]["type"], "timestamptz");
    }

    #[test]
    fn list_type_produces_iceberg_list_object() {
        let schema = Schema::new(vec![Field::new("tags", string_list(), true)]);
        let upd = update_for(&schema, &policy("vec", 4));

        let list_ty = &upd.fields[0]["type"];
        assert_eq!(list_ty["type"], "list");
        assert_eq!(list_ty["element"], "string");
        // The element id has to sit above the two top-level fields (tags + vec).
        let element_id = list_ty["element-id"].as_i64().unwrap();
        assert!(element_id > 2);
    }

    #[test]
    fn struct_type_produces_nested_fields() {
        let members = vec![
            Field::new("key", DataType::Utf8, false),
            Field::new("val", DataType::Int64, false),
        ];
        let schema = Schema::new(vec![Field::new(
            "meta",
            DataType::Struct(members.into()),
            true,
        )]);
        let upd = update_for(&schema, &policy("vec", 4));

        let struct_ty = &upd.fields[0]["type"];
        assert_eq!(struct_ty["type"], "struct");
        let nested = struct_ty["fields"].as_array().unwrap();
        assert_eq!(nested.len(), 2);
        for (child, (name, ty)) in nested.iter().zip([("key", "string"), ("val", "long")]) {
            assert_eq!(child["name"], name);
            assert_eq!(child["type"], ty);
        }
        // Nested ids have to sit above the two top-level fields (meta + vec).
        let first_nested_id = nested[0]["id"].as_i64().unwrap();
        assert!(first_nested_id > 2);
    }

    #[test]
    fn no_duplicate_vec_column_when_already_in_batch() {
        // The batch schema already carries the vector column; the conversion
        // must not append a second copy of it.
        let schema = Schema::new(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("embedding", DataType::FixedSizeBinary(16), false),
        ]);
        let upd = update_for(&schema, &policy("embedding", 8));

        assert_eq!(upd.fields.len(), 2, "should not add embedding twice");
        let occurrences = field_names(&upd)
            .into_iter()
            .filter(|name| *name == "embedding")
            .count();
        assert_eq!(occurrences, 1);
    }

    #[test]
    fn multi_vec_policies_all_appended() {
        let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);
        let primary = policy("embedding", 4);
        let extras = vec![policy("context_embedding", 4)];
        let upd = arrow_schema_to_iceberg_update(&schema, &primary, &extras);

        // id + embedding + context_embedding
        assert_eq!(upd.fields.len(), 3);
        let names = field_names(&upd);
        for wanted in ["embedding", "context_embedding"] {
            assert!(names.contains(&wanted));
        }
    }

    #[test]
    fn top_level_field_ids_match_parquet_stamp_sequence() {
        // Top-level ids are expected to run 1, 2, ..., N no matter which ids
        // get handed to nested elements.
        let schema = Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("tags", string_list(), true),
        ]);
        let upd = update_for(&schema, &policy("vec", 4));

        // Top-level: id=1, tags=2, vec=3
        for (field, id) in upd.fields.iter().zip([1, 2, 3]) {
            assert_eq!(field["id"], id);
        }

        // The list element id must come after the last top-level id.
        let element_id = upd.fields[1]["type"]["element-id"].as_i64().unwrap();
        assert!(element_id > 3);
    }

    /// Smoke test for `write_batch_auto_deferred`: the call must succeed and
    /// leave exactly one staged file entry on the writer before any commit.
    #[tokio::test]
    async fn write_batch_auto_deferred_stages_file() {
        let dir = tempfile::tempdir().unwrap();
        let root = dir.path().to_str().unwrap();
        let store: Arc<dyn Store> = Arc::new(LocalStore::new(root));
        let catalog = Arc::new(HadoopCatalog::new(Arc::clone(&store), ""));
        let table = TableIdent::new("default", "t");

        let mut writer =
            TableWriter::create_or_open(catalog, store, policy("embedding", 4), table, 2)
                .await
                .unwrap();

        let batch = text_batch(vec!["hello"]);
        let embeddings = vec![vec![1.0f32, 0.0, 0.0, 0.0]];

        writer
            .write_batch_auto_deferred(&batch, &embeddings)
            .await
            .unwrap();

        // One pending file should be staged even before commit.
        assert_eq!(writer.pending_files.len(), 1);
    }

    /// Smoke test for `write_batch_multi_deferred`: the call must stage one
    /// file marked as still indexing, with a primary centroid and a
    /// placeholder entry (zero offsets, centroid present) for the extra column.
    #[tokio::test]
    async fn write_batch_multi_deferred_stages_file_with_extra_indexes() {
        let dir = tempfile::tempdir().unwrap();
        let root = dir.path().to_str().unwrap();
        let store: Arc<dyn Store> = Arc::new(LocalStore::new(root));
        let catalog = Arc::new(HadoopCatalog::new(Arc::clone(&store), ""));
        let table = TableIdent::new("default", "t");

        let mut writer =
            TableWriter::create_or_open(catalog, store, policy("embedding", 4), table, 2)
                .await
                .unwrap();

        let batch = text_batch(vec!["hello", "world"]);

        let text_vectors = vec![vec![1.0f32, 0.0, 0.0, 0.0], vec![0.0, 1.0, 0.0, 0.0]];
        let image_vectors = vec![vec![1.0f32, 0.0], vec![0.0, 1.0]];

        let text_column = MultiVectorBatch {
            policy: policy("embedding", 4),
            embeddings: &text_vectors,
        };
        let image_column = MultiVectorBatch {
            policy: policy("img_embedding", 2),
            embeddings: &image_vectors,
        };
        let columns = vec![text_column, image_column];

        writer
            .write_batch_multi_deferred(&batch, &columns)
            .await
            .unwrap();

        assert_eq!(writer.pending_files.len(), 1);
        let staged = &writer.pending_files[0];
        // The index has not been built yet, so the entry is still `Indexing`.
        assert_eq!(staged.index_status, IndexStatus::Indexing);
        // The primary centroid is already available on the staged entry.
        assert!(staged.centroid_b64.is_some());
        // Exactly one placeholder for the extra column.
        assert_eq!(staged.extra_vector_indexes.len(), 1);
        let placeholder = &staged.extra_vector_indexes[0];
        assert_eq!(placeholder.column, "img_embedding");
        assert_eq!(placeholder.dim, 2);
        assert_eq!(placeholder.hnsw_offset, 0); // not yet built
        assert_eq!(placeholder.hnsw_len, 0); // not yet built
        assert!(placeholder.centroid_b64.is_some());
    }
}