Skip to main content

ailake_query/
writer.rs

1// SPDX-License-Identifier: MIT OR Apache-2.0
2use std::sync::atomic::{AtomicU32, Ordering};
3use std::sync::Arc;
4
5use ailake_catalog::{
6    encode_centroid_b64, make_data_file_entry, make_data_file_entry_indexing,
7    make_multi_column_data_file_entry, new_snapshot_id, CatalogProvider, DataFileEntry,
8    ExtraVectorIndex, IcebergSchemaUpdate, IndexStatus, NewSnapshot, SnapshotId, SnapshotOperation,
9    TableIdent, TableProperties, VectorIndexInfo,
10};
11use ailake_core::{AilakeError, AilakeResult, EmbeddingModelInfo, VectorStoragePolicy};
12use ailake_file::{AilakeFileReader, AilakeFileWriter, IndexType, VectorColumnBatch};
13use ailake_index::{IvfPqCodebook, IvfPqConfig};
14use ailake_store::Store;
15use ailake_vec::compute_centroid_and_radius;
16use arrow_array::RecordBatch;
17use arrow_schema::SchemaRef;
18use bytes::Bytes;
19use serde_json;
20use tracing::{error, info, warn};
21
22/// One vector column for a multi-column write batch.
23pub struct MultiVectorBatch<'a> {
24    pub policy: VectorStoragePolicy,
25    pub embeddings: &'a [Vec<f32>],
26}
27
28pub struct TableWriter {
29    catalog: Arc<dyn CatalogProvider>,
30    store: Arc<dyn Store>,
31    policy: VectorStoragePolicy,
32    table: TableIdent,
33    part_counter: Arc<AtomicU32>,
34    pending_files: Vec<DataFileEntry>,
35    parent_snapshot_id: Option<SnapshotId>,
36    /// Arrow schema captured from the first write_batch call; used to populate
37    /// Iceberg schema fields and schema.name-mapping.default on commit.
38    captured_schema: Option<SchemaRef>,
39    /// Extra vector column policies from write_batch_multi (columns beyond primary).
40    extra_vec_policies: Vec<VectorStoragePolicy>,
41    /// IVF-PQ codebook trained on the first shard and reused for all subsequent shards.
42    /// Ensures cross-shard ADC distances are comparable — no reranking needed.
43    cached_ivf_codebook: Option<Arc<IvfPqCodebook>>,
44    /// Shared codebook cell for deferred IVF-PQ builds. Cloneable Arc so each
45    /// background task can access it; OnceCell guarantees training runs exactly once.
46    deferred_ivf_codebook: Arc<tokio::sync::OnceCell<IvfPqCodebook>>,
47}
48
49impl TableWriter {
50    pub fn new(
51        catalog: Arc<dyn CatalogProvider>,
52        store: Arc<dyn Store>,
53        policy: VectorStoragePolicy,
54        table: TableIdent,
55    ) -> Self {
56        Self {
57            catalog,
58            store,
59            policy,
60            table,
61            part_counter: Arc::new(AtomicU32::new(0)),
62            pending_files: Vec::new(),
63            parent_snapshot_id: None,
64            captured_schema: None,
65            extra_vec_policies: Vec::new(),
66            cached_ivf_codebook: None,
67            deferred_ivf_codebook: Arc::new(tokio::sync::OnceCell::new()),
68        }
69    }
70
71    pub fn with_parent_snapshot(mut self, id: SnapshotId) -> Self {
72        self.parent_snapshot_id = Some(id);
73        self
74    }
75
76    /// Write batch as Parquet-only immediately, build HNSW in background.
77    ///
78    /// Returns after the Parquet file is persisted (~LanceDB write speed).
79    /// A tokio task runs concurrently to build the HNSW index, rewrite the
80    /// file with the AILK section, and update the catalog entry.
81    ///
82    /// During the build window, `SearchSession` serves this shard via flat scan
83    /// (brute-force, exact) instead of HNSW. The transition is automatic once
84    /// the background task commits the updated manifest entry.
85    pub async fn write_batch_deferred(
86        &mut self,
87        batch: &RecordBatch,
88        embeddings: &[Vec<f32>],
89    ) -> AilakeResult<()> {
90        self.validate_embedding_dim(embeddings)?;
91        if self.captured_schema.is_none() {
92            self.captured_schema = Some(batch.schema());
93        }
94        let part_num = self.part_counter.fetch_add(1, Ordering::SeqCst);
95        let file_path = format!("data/part-{:05}.parquet", part_num);
96
97        // Fast path: persist Parquet without HNSW.
98        let file_writer = AilakeFileWriter::new(self.policy.clone());
99        let parquet_bytes = file_writer.write_parquet_only(batch, embeddings)?;
100        let file_size = parquet_bytes.len() as u64;
101        self.store.put(&file_path, parquet_bytes).await?;
102
103        // Centroid needed immediately for geometric pruning during the build window.
104        let centroid = compute_centroid_and_radius(embeddings, self.policy.metric);
105        let mut entry = make_data_file_entry_indexing(
106            &file_path,
107            embeddings.len() as u64,
108            file_size,
109            &centroid,
110            &self.policy.column_name,
111            self.policy.dim,
112        );
113        entry.embedding_model = self
114            .policy
115            .embedding_model
116            .as_ref()
117            .map(|m| m.to_property_value());
118        self.pending_files.push(entry);
119
120        // Spawn background HNSW build (fire-and-forget; errors are logged).
121        let store = self.store.clone();
122        let catalog = self.catalog.clone();
123        let policy = self.policy.clone();
124        let table = self.table.clone();
125        let fp = file_path.clone();
126        tokio::spawn(async move {
127            if let Err(e) = build_and_patch_index(store, catalog, policy, table, fp).await {
128                error!(
129                    "ailake: deferred HNSW build failed — file is indexed as Parquet-only until \
130                     next compaction rebuilds the index: {}",
131                    e
132                );
133            }
134        });
135
136        Ok(())
137    }
138
139    /// Write batch as Parquet-only immediately; train IVF-PQ index in background.
140    ///
141    /// The first shard trains the shared codebook (k-means). All subsequent shards
142    /// reuse it via `OnceCell` — build is O(n) assign+encode, not O(n×k) k-means.
143    /// Returns after Parquet is persisted. Index transitions Indexing → Ready async.
144    pub async fn write_batch_ivf_pq_deferred(
145        &mut self,
146        batch: &RecordBatch,
147        embeddings: &[Vec<f32>],
148        ivf_config: IvfPqConfig,
149    ) -> AilakeResult<()> {
150        if self.captured_schema.is_none() {
151            self.captured_schema = Some(batch.schema());
152        }
153        let part_num = self.part_counter.fetch_add(1, Ordering::SeqCst);
154        let file_path = format!("data/part-{:05}.parquet", part_num);
155
156        let file_writer = AilakeFileWriter::new(self.policy.clone());
157        let parquet_bytes = file_writer.write_parquet_only(batch, embeddings)?;
158        let file_size = parquet_bytes.len() as u64;
159        self.store.put(&file_path, parquet_bytes).await?;
160
161        let centroid = compute_centroid_and_radius(embeddings, self.policy.metric);
162        let mut entry = make_data_file_entry_indexing(
163            &file_path,
164            embeddings.len() as u64,
165            file_size,
166            &centroid,
167            &self.policy.column_name,
168            self.policy.dim,
169        );
170        entry.embedding_model = self
171            .policy
172            .embedding_model
173            .as_ref()
174            .map(|m| m.to_property_value());
175        self.pending_files.push(entry);
176
177        let store = self.store.clone();
178        let catalog = self.catalog.clone();
179        let policy = self.policy.clone();
180        let table = self.table.clone();
181        let fp = file_path.clone();
182        let codebook_cell = self.deferred_ivf_codebook.clone();
183        tokio::spawn(async move {
184            if let Err(e) = build_ivf_pq_and_patch_index(
185                store,
186                catalog,
187                policy,
188                table,
189                fp,
190                ivf_config,
191                codebook_cell,
192            )
193            .await
194            {
195                error!(
196                    "ailake: deferred IVF-PQ build failed — file is indexed as Parquet-only until \
197                     next compaction rebuilds the index: {}",
198                    e
199                );
200            }
201        });
202
203        Ok(())
204    }
205
206    /// Idempotent variant of `write_batch`.
207    ///
208    /// Before any I/O, checks if `batch_id` already appears in the current
209    /// snapshot. If it does, this is a no-op — safe for Airflow/Kestra retries.
210    /// If not found, writes the batch and tags the `DataFileEntry` with `batch_id`
211    /// so future retries can detect it.
212    ///
213    /// `commit()` is likewise a no-op when `pending_files` is empty.
214    pub async fn write_batch_idempotent(
215        &mut self,
216        batch: &RecordBatch,
217        embeddings: &[Vec<f32>],
218        batch_id: &str,
219    ) -> AilakeResult<()> {
220        let existing = self.catalog.list_files(&self.table, None).await?;
221        if existing
222            .iter()
223            .any(|f| f.batch_id.as_deref() == Some(batch_id))
224        {
225            return Ok(());
226        }
227        self.write_batch_with_id(batch, embeddings, Some(batch_id.to_string()))
228            .await
229    }
230
231    /// Write a batch to a new AI-Lake file and stage it for commit.
232    /// Validates that provided embeddings match the table's configured dimension.
233    /// Returns `ModelMismatch` error when dim differs — prevents silently mixing
234    /// incompatible vectors (same error type used across write paths for consistency).
235    fn validate_embedding_dim(&self, embeddings: &[Vec<f32>]) -> AilakeResult<()> {
236        if let Some(first) = embeddings.first() {
237            let actual = first.len() as u32;
238            if actual != self.policy.dim {
239                let table_model = self
240                    .policy
241                    .embedding_model
242                    .as_ref()
243                    .map(|m| m.to_property_value())
244                    .unwrap_or_else(|| format!("dim={}", self.policy.dim));
245                return Err(AilakeError::ModelMismatch {
246                    table_model,
247                    table_dim: self.policy.dim,
248                    batch_model: format!("dim={}", actual),
249                    batch_dim: actual,
250                });
251            }
252        }
253        Ok(())
254    }
255
256    pub async fn write_batch(
257        &mut self,
258        batch: &RecordBatch,
259        embeddings: &[Vec<f32>],
260    ) -> AilakeResult<()> {
261        self.write_batch_with_id(batch, embeddings, None).await
262    }
263
264    async fn write_batch_with_id(
265        &mut self,
266        batch: &RecordBatch,
267        embeddings: &[Vec<f32>],
268        batch_id: Option<String>,
269    ) -> AilakeResult<()> {
270        self.validate_embedding_dim(embeddings)?;
271        if self.captured_schema.is_none() {
272            self.captured_schema = Some(batch.schema());
273        }
274        let part_num = self.part_counter.fetch_add(1, Ordering::SeqCst);
275        let file_path = format!("data/part-{:05}.parquet", part_num);
276
277        // Write AI-Lake file
278        let file_writer = AilakeFileWriter::new(self.policy.clone());
279        let file_bytes: Bytes = file_writer.write(batch, embeddings)?;
280        let file_size = file_bytes.len() as u64;
281
282        // Store the file
283        self.store.put(&file_path, file_bytes.clone()).await?;
284
285        // Compute centroid for catalog entry
286        let centroid = compute_centroid_and_radius(embeddings, self.policy.metric);
287
288        // Read back the HNSW offsets from the written file
289        let reader = ailake_file::AilakeFileReader::new(
290            file_bytes,
291            &self.policy.column_name,
292            self.policy.dim,
293        );
294        let header = reader.read_header()?;
295        let ailk_start = reader.ailk_offset()?;
296        let hnsw_abs_offset = ailk_start + header.hnsw_offset;
297        let hnsw_len = header.hnsw_len;
298
299        let mut entry = make_data_file_entry(
300            &file_path,
301            embeddings.len() as u64,
302            file_size,
303            &centroid,
304            VectorIndexInfo {
305                column: &self.policy.column_name,
306                dim: self.policy.dim,
307                hnsw_offset: hnsw_abs_offset,
308                hnsw_len,
309            },
310        );
311        entry.batch_id = batch_id;
312        entry.embedding_model = self
313            .policy
314            .embedding_model
315            .as_ref()
316            .map(|m| m.to_property_value());
317        self.pending_files.push(entry);
318        Ok(())
319    }
320
321    /// Write batch, auto-selecting the index based on detected hardware.
322    ///
323    /// Picks IVF-PQ when a CUDA GPU or ≥8 CPU cores are present AND the batch
324    /// has ≥5 000 vectors. Falls back to HNSW for weaker / local hardware.
325    /// Uses `IvfPqConfig::for_dataset` to scale nlist with dataset size.
326    pub async fn write_batch_auto(
327        &mut self,
328        batch: &RecordBatch,
329        embeddings: &[Vec<f32>],
330    ) -> AilakeResult<()> {
331        let profile = ailake_index::HardwareProfile::detect();
332        if profile.recommend_ivf_pq(embeddings.len()) {
333            let mut ivf_config =
334                ailake_index::IvfPqConfig::for_dataset(self.policy.dim as usize, embeddings.len());
335            if self.policy.ivf_residual {
336                ivf_config = ivf_config.with_residual();
337            }
338            self.write_batch_ivf_pq(batch, embeddings, ivf_config).await
339        } else {
340            self.write_batch(batch, embeddings).await
341        }
342    }
343
344    /// Write batch, auto-selecting the index based on detected hardware — deferred variant.
345    ///
346    /// Same hardware detection as `write_batch_auto`: picks IVF-PQ when a CUDA GPU or
347    /// ≥8 CPU cores are present AND the batch has ≥5 000 vectors; falls back to HNSW.
348    ///
349    /// Unlike `write_batch_auto`, the index is built in a background tokio task:
350    /// - Parquet is persisted immediately (~200k vec/s, same as write_parquet_only).
351    /// - HNSW or IVF-PQ index built asynchronously; shard served via flat scan meanwhile.
352    ///
353    /// Use this when ingest throughput matters more than immediate searchability.
354    pub async fn write_batch_auto_deferred(
355        &mut self,
356        batch: &RecordBatch,
357        embeddings: &[Vec<f32>],
358    ) -> AilakeResult<()> {
359        let profile = ailake_index::HardwareProfile::detect();
360        if profile.recommend_ivf_pq(embeddings.len()) {
361            let mut ivf_config =
362                ailake_index::IvfPqConfig::for_dataset(self.policy.dim as usize, embeddings.len());
363            if self.policy.ivf_residual {
364                ivf_config = ivf_config.with_residual();
365            }
366            self.write_batch_ivf_pq_deferred(batch, embeddings, ivf_config)
367                .await
368        } else {
369            self.write_batch_deferred(batch, embeddings).await
370        }
371    }
372
373    /// Write batch with IVF-PQ index built synchronously (no background task).
374    ///
375    /// Smaller index than HNSW; better for S3 sequential-scan workloads.
376    pub async fn write_batch_ivf_pq(
377        &mut self,
378        batch: &RecordBatch,
379        embeddings: &[Vec<f32>],
380        ivf_config: IvfPqConfig,
381    ) -> AilakeResult<()> {
382        if self.captured_schema.is_none() {
383            self.captured_schema = Some(batch.schema());
384        }
385        let part_num = self.part_counter.fetch_add(1, Ordering::SeqCst);
386        let file_path = format!("data/part-{:05}.parquet", part_num);
387
388        // Train codebook once on the first shard; all subsequent shards reuse it.
389        // This makes cross-shard ADC distances comparable, eliminating the need
390        // for exact reranking during multi-shard search.
391        if self.cached_ivf_codebook.is_none() {
392            let codebook = tokio::task::spawn_blocking({
393                let embeddings = embeddings.to_vec();
394                let metric = self.policy.metric;
395                let config = ivf_config.clone();
396                move || ailake_index::IvfPqIndex::train_codebook(&embeddings, metric, &config)
397            })
398            .await
399            .map_err(|e| ailake_core::AilakeError::Store(format!("spawn_blocking panic: {e}")))??;
400            self.cached_ivf_codebook = Some(Arc::new(codebook));
401        }
402        // SAFETY: set to Some in the block above (either pre-existing or just trained).
403        let codebook = self
404            .cached_ivf_codebook
405            .as_ref()
406            .expect("IVF-PQ codebook must be Some after training block")
407            .clone();
408
409        let file_writer = AilakeFileWriter::new(self.policy.clone())
410            .with_index_type(IndexType::IvfPq(ivf_config))
411            .with_shared_ivf_codebook(codebook);
412        let file_bytes: Bytes = file_writer.write(batch, embeddings)?;
413        let file_size = file_bytes.len() as u64;
414
415        self.store.put(&file_path, file_bytes.clone()).await?;
416
417        let centroid = compute_centroid_and_radius(embeddings, self.policy.metric);
418
419        let reader = ailake_file::AilakeFileReader::new(
420            file_bytes,
421            &self.policy.column_name,
422            self.policy.dim,
423        );
424        let header = reader.read_header()?;
425        let ailk_start = reader.ailk_offset()?;
426        let index_abs_offset = ailk_start + header.hnsw_offset;
427        let index_len = header.hnsw_len;
428
429        let mut entry = make_data_file_entry(
430            &file_path,
431            embeddings.len() as u64,
432            file_size,
433            &centroid,
434            VectorIndexInfo {
435                column: &self.policy.column_name,
436                dim: self.policy.dim,
437                hnsw_offset: index_abs_offset,
438                hnsw_len: index_len,
439            },
440        );
441        entry.embedding_model = self
442            .policy
443            .embedding_model
444            .as_ref()
445            .map(|m| m.to_property_value());
446        self.pending_files.push(entry);
447        Ok(())
448    }
449
450    /// Write a batch with multiple vector columns into a single AI-Lake file.
451    ///
452    /// The first entry in `columns` is treated as the primary column (used for
453    /// geometric pruning). Additional columns each get their own HNSW section.
454    pub async fn write_batch_multi(
455        &mut self,
456        batch: &RecordBatch,
457        columns: &[MultiVectorBatch<'_>],
458    ) -> AilakeResult<()> {
459        use ailake_core::AilakeError;
460        if self.captured_schema.is_none() {
461            self.captured_schema = Some(batch.schema());
462        }
463        if self.extra_vec_policies.is_empty() && columns.len() > 1 {
464            self.extra_vec_policies = columns[1..].iter().map(|c| c.policy.clone()).collect();
465        }
466
467        if columns.is_empty() {
468            return Err(AilakeError::InvalidArgument(
469                "write_batch_multi requires at least one column".into(),
470            ));
471        }
472
473        let part_num = self.part_counter.fetch_add(1, Ordering::SeqCst);
474        let file_path = format!("data/part-{:05}.parquet", part_num);
475
476        let col_batches: Vec<VectorColumnBatch<'_>> = columns
477            .iter()
478            .map(|c| VectorColumnBatch {
479                policy: &c.policy,
480                embeddings: c.embeddings,
481            })
482            .collect();
483
484        let primary_policy = &columns[0].policy;
485        let file_writer = AilakeFileWriter::new(primary_policy.clone());
486        let file_bytes: Bytes = file_writer.write_multi(batch, &col_batches)?;
487        let file_size = file_bytes.len() as u64;
488
489        self.store.put(&file_path, file_bytes.clone()).await?;
490
491        // Primary centroid for pruning
492        let primary_centroid =
493            compute_centroid_and_radius(columns[0].embeddings, primary_policy.metric);
494
495        // Read primary AILK header for offsets
496        let reader = ailake_file::AilakeFileReader::new(
497            file_bytes.clone(),
498            &primary_policy.column_name,
499            primary_policy.dim,
500        );
501        let primary_ailk_start = reader.ailk_offset()?;
502        let primary_header = {
503            use ailake_file::HEADER_SIZE;
504            let start = primary_ailk_start as usize;
505            let hdr_bytes: &[u8; HEADER_SIZE] = file_bytes[start..start + HEADER_SIZE]
506                .try_into()
507                .map_err(|_| AilakeError::NotAnAilakeFile)?;
508            ailake_file::AilakeHeader::from_bytes(hdr_bytes)?
509        };
510        let primary_hnsw_abs = primary_ailk_start + primary_header.hnsw_offset;
511
512        // Extra column index metadata
513        let mut extra: Vec<ExtraVectorIndex> = Vec::new();
514        for col in columns.iter().skip(1) {
515            let col_ailk_start = reader.ailk_offset_for_column(&col.policy.column_name)?;
516            let col_header = {
517                use ailake_file::HEADER_SIZE;
518                let start = col_ailk_start as usize;
519                let hdr_bytes: &[u8; HEADER_SIZE] = file_bytes[start..start + HEADER_SIZE]
520                    .try_into()
521                    .map_err(|_| AilakeError::NotAnAilakeFile)?;
522                ailake_file::AilakeHeader::from_bytes(hdr_bytes)?
523            };
524            let col_centroid = compute_centroid_and_radius(col.embeddings, col.policy.metric);
525            extra.push(ExtraVectorIndex {
526                column: col.policy.column_name.clone(),
527                dim: col.policy.dim,
528                hnsw_offset: col_ailk_start + col_header.hnsw_offset,
529                hnsw_len: col_header.hnsw_len,
530                centroid_b64: Some(encode_centroid_b64(&col_centroid)),
531                radius: Some(col_centroid.radius),
532            });
533        }
534
535        let mut entry = make_multi_column_data_file_entry(
536            &file_path,
537            columns[0].embeddings.len() as u64,
538            file_size,
539            &primary_centroid,
540            VectorIndexInfo {
541                column: &primary_policy.column_name,
542                dim: primary_policy.dim,
543                hnsw_offset: primary_hnsw_abs,
544                hnsw_len: primary_header.hnsw_len,
545            },
546            &extra,
547        );
548        entry.embedding_model = self
549            .policy
550            .embedding_model
551            .as_ref()
552            .map(|m| m.to_property_value());
553        self.pending_files.push(entry);
554        Ok(())
555    }
556
557    /// Write a multi-column batch as Parquet-only immediately; build all N column
558    /// HNSW indexes in a single background task.
559    ///
560    /// Same semantics as `write_batch_deferred` but for N vector columns:
561    /// - Parquet (primary column bytes) is persisted immediately (~200k vec/s).
562    /// - A background tokio task rebuilds the full AILK file via `write_multi` and
563    ///   patches the catalog entry with primary + extra column offsets once ready.
564    /// - During the build window, `SearchSession` serves this shard via GPU/CPU flat
565    ///   scan. Transition to HNSW-indexed search is automatic on `IndexStatus::Ready`.
566    ///
567    /// All N column embeddings are cloned into the background task; choose batch size
568    /// so that N×rows×dim×4 bytes fits comfortably in RAM while the task runs.
569    pub async fn write_batch_multi_deferred(
570        &mut self,
571        batch: &RecordBatch,
572        columns: &[MultiVectorBatch<'_>],
573    ) -> AilakeResult<()> {
574        use ailake_core::AilakeError;
575        if columns.is_empty() {
576            return Err(AilakeError::InvalidArgument(
577                "write_batch_multi_deferred requires at least one column".into(),
578            ));
579        }
580        if self.captured_schema.is_none() {
581            self.captured_schema = Some(batch.schema());
582        }
583        if self.extra_vec_policies.is_empty() && columns.len() > 1 {
584            self.extra_vec_policies = columns[1..].iter().map(|c| c.policy.clone()).collect();
585        }
586
587        let part_num = self.part_counter.fetch_add(1, Ordering::SeqCst);
588        let file_path = format!("data/part-{:05}.parquet", part_num);
589
590        // Immediate path: write Parquet with primary column only (no AILK sections yet).
591        let primary_policy = &columns[0].policy;
592        let file_writer = AilakeFileWriter::new(primary_policy.clone());
593        let parquet_bytes = file_writer.write_parquet_only(batch, columns[0].embeddings)?;
594        let file_size = parquet_bytes.len() as u64;
595        self.store.put(&file_path, parquet_bytes).await?;
596
597        // Primary centroid enables geometric pruning during the build window.
598        let primary_centroid =
599            compute_centroid_and_radius(columns[0].embeddings, primary_policy.metric);
600        let mut entry = make_data_file_entry_indexing(
601            &file_path,
602            columns[0].embeddings.len() as u64,
603            file_size,
604            &primary_centroid,
605            &primary_policy.column_name,
606            primary_policy.dim,
607        );
608        // Populate extra_vector_indexes with centroids/radii for pruning.
609        // hnsw_offset/len are 0 until the background task patches them to non-zero.
610        entry.extra_vector_indexes = columns[1..]
611            .iter()
612            .map(|c| {
613                let col_centroid = compute_centroid_and_radius(c.embeddings, c.policy.metric);
614                ExtraVectorIndex {
615                    column: c.policy.column_name.clone(),
616                    dim: c.policy.dim,
617                    hnsw_offset: 0,
618                    hnsw_len: 0,
619                    centroid_b64: Some(encode_centroid_b64(&col_centroid)),
620                    radius: Some(col_centroid.radius),
621                }
622            })
623            .collect();
624        entry.embedding_model = self
625            .policy
626            .embedding_model
627            .as_ref()
628            .map(|m| m.to_property_value());
629        self.pending_files.push(entry);
630
631        // Clone all column data for the background task.
632        let all_policies: Vec<VectorStoragePolicy> =
633            columns.iter().map(|c| c.policy.clone()).collect();
634        let all_embeddings: Vec<Vec<Vec<f32>>> =
635            columns.iter().map(|c| c.embeddings.to_vec()).collect();
636        let store = self.store.clone();
637        let catalog = self.catalog.clone();
638        let table = self.table.clone();
639        let fp = file_path.clone();
640        tokio::spawn(async move {
641            if let Err(e) =
642                build_and_patch_multi_index(store, catalog, all_policies, table, fp, all_embeddings)
643                    .await
644            {
645                error!(
646                    "ailake: deferred multi-column HNSW build failed — shard stays in flat-scan \
647                     mode until next compaction rebuilds the index: {}",
648                    e
649                );
650            }
651        });
652
653        Ok(())
654    }
655
656    /// Commit all staged files as a new Iceberg snapshot.
657    ///
658    /// No-op when `pending_files` is empty (e.g., all `write_batch_idempotent`
659    /// calls were skipped because their `batch_id` was already committed).
660    /// Returns the current snapshot id in that case (or 0 if no snapshot exists yet).
661    pub async fn commit(mut self) -> AilakeResult<SnapshotId> {
662        if self.pending_files.is_empty() {
663            let current = self
664                .catalog
665                .load_table(&self.table)
666                .await
667                .ok()
668                .and_then(|m| m.current_snapshot_id)
669                .unwrap_or(0);
670            return Ok(current);
671        }
672        let iceberg_schema = self
673            .captured_schema
674            .as_deref()
675            .map(|s| arrow_schema_to_iceberg_update(s, &self.policy, &self.extra_vec_policies));
676        // Store secondary column dims/metrics as table-level properties so
677        // search_multimodal can discover them without reading Parquet files.
678        let mut extra_properties = std::collections::HashMap::new();
679        for ep in &self.extra_vec_policies {
680            extra_properties.insert(format!("ailake.dim-{}", ep.column_name), ep.dim.to_string());
681            extra_properties.insert(
682                format!("ailake.metric-{}", ep.column_name),
683                ailake_parquet::schema::metric_str(ep.metric).to_string(),
684            );
685        }
686        let snapshot = NewSnapshot {
687            snapshot_id: new_snapshot_id(),
688            parent_snapshot_id: self.parent_snapshot_id,
689            files: std::mem::take(&mut self.pending_files),
690            operation: SnapshotOperation::Append,
691            iceberg_schema,
692            extra_properties,
693        };
694        self.catalog.commit_snapshot(&self.table, snapshot).await
695    }
696
697    /// Create a table if it doesn't exist, then return a writer for it.
698    pub async fn create_or_open(
699        catalog: Arc<dyn CatalogProvider>,
700        store: Arc<dyn Store>,
701        policy: VectorStoragePolicy,
702        table: TableIdent,
703    ) -> AilakeResult<Self> {
704        // Try to load; if not found, create
705        match catalog.load_table(&table).await {
706            Ok(existing_meta) => {
707                // Warn when writing with a different model name into an existing table.
708                // Dim mismatch is a hard error caught at write_batch time; name divergence
709                // is softer — same dim, different model (e.g. fine-tune vs base) — warn only.
710                if let Some(incoming) = &policy.embedding_model {
711                    if let Some(stored_val) = existing_meta
712                        .properties
713                        .get(EmbeddingModelInfo::property_key())
714                    {
715                        let stored = EmbeddingModelInfo::from_property_value(stored_val);
716                        if stored.name != incoming.name {
717                            warn!(
718                                "ailake: embedding model name changed: table has '{}', writing with '{}' \
719                                 (dim={}). Vectors may be incompatible for similarity search.",
720                                stored.name, incoming.name, policy.dim
721                            );
722                        }
723                    }
724                }
725            }
726            Err(_) => {
727                catalog
728                    .create_table(
729                        &table,
730                        &TableProperties {
731                            policy: policy.clone(),
732                            extra: std::collections::HashMap::new(),
733                        },
734                    )
735                    .await?;
736            }
737        }
738        Ok(Self::new(catalog, store, policy, table))
739    }
740}
741
742/// Convert an Arrow schema to an Iceberg schema update for catalog commits.
743///
744/// Top-level field IDs are assigned sequentially (1-based) and match the
745/// `PARQUET:field_id` stamps written by `ParquetVectorWriter`. Nested element
746/// IDs (inside List/Struct/Map) are assigned after all top-level IDs are
747/// pre-reserved, so they never collide with Parquet column field IDs.
748fn arrow_schema_to_iceberg_update(
749    schema: &arrow_schema::Schema,
750    policy: &VectorStoragePolicy,
751    extra_vec_policies: &[VectorStoragePolicy],
752) -> IcebergSchemaUpdate {
753    let bytes_per_dim = policy.precision.bytes_per_element() as u32;
754    let vec_fixed_len = policy.dim * bytes_per_dim;
755
756    // Collect all vector column names that will appear in the final schema.
757    let has_primary_in_batch = schema
758        .fields()
759        .iter()
760        .any(|f| f.name() == &policy.column_name);
761    let vec_cols: Vec<(String, u32)> = {
762        let mut v = Vec::new();
763        if !has_primary_in_batch {
764            v.push((policy.column_name.clone(), vec_fixed_len));
765        }
766        for ep in extra_vec_policies {
767            let ep_fixed_len = ep.dim * ep.precision.bytes_per_element() as u32;
768            if !schema.fields().iter().any(|f| f.name() == &ep.column_name) {
769                v.push((ep.column_name.clone(), ep_fixed_len));
770            }
771        }
772        v
773    };
774
775    // Total top-level columns = batch fields + appended vec columns.
776    let top_level_count = schema.fields().len() + vec_cols.len();
777    // Nested element IDs start after all top-level IDs are pre-reserved.
778    let mut nested_id = top_level_count as i32;
779
780    let mut fields: Vec<serde_json::Value> = Vec::new();
781    let mut name_mapping: Vec<serde_json::Value> = Vec::new();
782
783    for (idx, field) in schema.fields().iter().enumerate() {
784        let field_id = (idx + 1) as i32;
785        let iceberg_type = arrow_type_to_iceberg(field.data_type(), &mut nested_id);
786        fields.push(serde_json::json!({
787            "id": field_id,
788            "name": field.name(),
789            "required": false,
790            "type": iceberg_type,
791        }));
792        name_mapping.push(serde_json::json!({
793            "field-id": field_id,
794            "names": [field.name()],
795        }));
796    }
797
798    // Append vector columns that live outside the RecordBatch schema.
799    for (i, (col_name, fixed_len)) in vec_cols.iter().enumerate() {
800        let field_id = (schema.fields().len() + 1 + i) as i32;
801        fields.push(serde_json::json!({
802            "id": field_id,
803            "name": col_name,
804            "required": false,
805            "type": format!("fixed[{fixed_len}]"),
806        }));
807        name_mapping.push(serde_json::json!({
808            "field-id": field_id,
809            "names": [col_name],
810        }));
811    }
812
813    let last_column_id = nested_id;
814    let name_mapping_json = serde_json::to_string(&name_mapping).unwrap_or_else(|_| "[]".into());
815
816    IcebergSchemaUpdate {
817        fields,
818        last_column_id,
819        name_mapping_json,
820    }
821}
822
823/// Map an Arrow DataType to an Iceberg schema type value (string or JSON object).
824///
825/// `nested_id` is a shared counter for generating unique element/field IDs inside
826/// List, Struct, and Map types. It must start beyond all pre-reserved top-level IDs.
827fn arrow_type_to_iceberg(dt: &arrow_schema::DataType, nested_id: &mut i32) -> serde_json::Value {
828    use arrow_schema::DataType;
829    match dt {
830        DataType::Boolean => serde_json::json!("boolean"),
831        DataType::Int8 | DataType::Int16 | DataType::Int32 | DataType::UInt8 | DataType::UInt16 => {
832            serde_json::json!("int")
833        }
834        DataType::Int64 | DataType::UInt32 | DataType::UInt64 => serde_json::json!("long"),
835        DataType::Float16 | DataType::Float32 => serde_json::json!("float"),
836        DataType::Float64 => serde_json::json!("double"),
837        DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View => serde_json::json!("string"),
838        DataType::Binary | DataType::LargeBinary | DataType::BinaryView => {
839            serde_json::json!("binary")
840        }
841        DataType::Date32 | DataType::Date64 => serde_json::json!("date"),
842        // Timestamp with timezone → timestamptz; without → timestamp.
843        DataType::Timestamp(_, Some(_)) => serde_json::json!("timestamptz"),
844        DataType::Timestamp(_, None) => serde_json::json!("timestamp"),
845        DataType::Time32(_) | DataType::Time64(_) => serde_json::json!("time"),
846        DataType::FixedSizeBinary(n) => serde_json::json!(format!("fixed[{n}]")),
847        DataType::Decimal128(p, s) | DataType::Decimal256(p, s) => {
848            serde_json::json!(format!("decimal({p}, {s})"))
849        }
850        DataType::List(inner)
851        | DataType::LargeList(inner)
852        | DataType::ListView(inner)
853        | DataType::FixedSizeList(inner, _) => {
854            *nested_id += 1;
855            let element_id = *nested_id;
856            let element_type = arrow_type_to_iceberg(inner.data_type(), nested_id);
857            serde_json::json!({
858                "type": "list",
859                "element-id": element_id,
860                "element": element_type,
861                "element-required": !inner.is_nullable(),
862            })
863        }
864        DataType::Struct(arrow_fields) => {
865            let struct_fields: Vec<serde_json::Value> = arrow_fields
866                .iter()
867                .map(|f| {
868                    *nested_id += 1;
869                    let fid = *nested_id;
870                    let ftype = arrow_type_to_iceberg(f.data_type(), nested_id);
871                    serde_json::json!({
872                        "id": fid,
873                        "name": f.name(),
874                        "required": !f.is_nullable(),
875                        "type": ftype,
876                    })
877                })
878                .collect();
879            serde_json::json!({ "type": "struct", "fields": struct_fields })
880        }
881        DataType::Map(entries, _) => {
882            // Arrow Map is List<Struct<key: K, value: V>>.
883            *nested_id += 1;
884            let key_id = *nested_id;
885            *nested_id += 1;
886            let val_id = *nested_id;
887            if let DataType::Struct(kv_fields) = entries.data_type() {
888                let key_f = kv_fields
889                    .iter()
890                    .find(|f| f.name() == "key" || f.name() == "keys");
891                let val_f = kv_fields
892                    .iter()
893                    .find(|f| f.name() == "value" || f.name() == "values");
894                let key_type = key_f
895                    .map(|f| arrow_type_to_iceberg(f.data_type(), nested_id))
896                    .unwrap_or(serde_json::json!("binary"));
897                let val_type = val_f
898                    .map(|f| arrow_type_to_iceberg(f.data_type(), nested_id))
899                    .unwrap_or(serde_json::json!("binary"));
900                let val_required = val_f.map(|f| !f.is_nullable()).unwrap_or(false);
901                serde_json::json!({
902                    "type": "map",
903                    "key-id": key_id,
904                    "key": key_type,
905                    "value-id": val_id,
906                    "value": val_type,
907                    "value-required": val_required,
908                })
909            } else {
910                serde_json::json!("binary")
911            }
912        }
913        _ => serde_json::json!("binary"),
914    }
915}
916
917/// Background task: reads a Parquet-only shard, builds full AILK file, patches catalog.
918async fn build_and_patch_index(
919    store: Arc<dyn Store>,
920    catalog: Arc<dyn CatalogProvider>,
921    policy: VectorStoragePolicy,
922    table: TableIdent,
923    file_path: String,
924) -> AilakeResult<()> {
925    // Read the Parquet-only bytes already stored.
926    let parquet_bytes = store.get(&file_path).await?;
927    let reader = AilakeFileReader::new(parquet_bytes, &policy.column_name, policy.dim);
928    let (batch, embeddings) = reader.read_parquet()?;
929
930    // Build the full AILK file (Parquet + HNSW) — CPU-intensive; run on blocking pool
931    // so the tokio async threads aren't starved when many shards build concurrently.
932    let full_bytes = tokio::task::spawn_blocking({
933        let policy = policy.clone();
934        move || {
935            let file_writer = AilakeFileWriter::new(policy);
936            file_writer.write(&batch, &embeddings)
937        }
938    })
939    .await
940    .map_err(|e| ailake_core::AilakeError::Store(format!("spawn_blocking panic: {e}")))??;
941
942    // Extract HNSW offsets from the newly written file.
943    let full_reader = AilakeFileReader::new(full_bytes.clone(), &policy.column_name, policy.dim);
944    let header = full_reader.read_header()?;
945    let ailk_start = full_reader.ailk_offset()?;
946    let hnsw_abs_offset = ailk_start + header.hnsw_offset;
947    let hnsw_len = header.hnsw_len;
948
949    // Overwrite the Parquet-only file with the full AILK version.
950    store.put(&file_path, full_bytes).await?;
951
952    // Wait for the initial writer commit to appear (max 60 s).
953    // HNSW builds can finish before the main write loop calls commit_snapshot.
954    let mut committed = false;
955    for _ in 0..120u32 {
956        match catalog.load_table(&table).await {
957            Ok(meta) if meta.current_snapshot_id.is_some() => {
958                committed = true;
959                break;
960            }
961            _ => tokio::time::sleep(std::time::Duration::from_millis(500)).await,
962        }
963    }
964    if !committed {
965        return Err(ailake_core::AilakeError::Store(format!(
966            "deferred HNSW build: no snapshot committed for {file_path} after 60 s — \
967             did you call TableWriter::commit()?"
968        )));
969    }
970
971    // Update the catalog with CAS-like retry to handle concurrent background tasks.
972    // Multiple tasks can race on commit_snapshot(Replace): the last writer wins and
973    // may overwrite a sibling task's Ready status. Retry until we confirm our file
974    // is marked Ready in the current snapshot.
975    for attempt in 0..50u32 {
976        let table_meta = catalog.load_table(&table).await?;
977        let parent_snapshot_id = table_meta.current_snapshot_id;
978        let mut files = catalog.list_files(&table, None).await?;
979
980        // Already marked Ready by a previous successful attempt.
981        if files
982            .iter()
983            .any(|f| f.path == file_path && f.index_status == IndexStatus::Ready)
984        {
985            break;
986        }
987
988        for f in &mut files {
989            if f.path == file_path {
990                f.hnsw_offset = Some(hnsw_abs_offset);
991                f.hnsw_len = Some(hnsw_len);
992                f.index_status = IndexStatus::Ready;
993                break;
994            }
995        }
996        catalog
997            .commit_snapshot(
998                &table,
999                NewSnapshot {
1000                    snapshot_id: new_snapshot_id(),
1001                    parent_snapshot_id,
1002                    files,
1003                    operation: SnapshotOperation::Replace,
1004                    iceberg_schema: None,
1005                    extra_properties: std::collections::HashMap::new(),
1006                },
1007            )
1008            .await?;
1009
1010        // Brief yield so sibling tasks can commit, then verify our change survived.
1011        tokio::time::sleep(std::time::Duration::from_millis(10 + attempt as u64 * 5)).await;
1012
1013        let verify = catalog.list_files(&table, None).await?;
1014        if verify
1015            .iter()
1016            .any(|f| f.path == file_path && f.index_status == IndexStatus::Ready)
1017        {
1018            break;
1019        }
1020        // Another task overwrote us — retry.
1021    }
1022
1023    info!(
1024        "ailake: deferred HNSW index built for {} (offset={}, len={})",
1025        file_path, hnsw_abs_offset, hnsw_len
1026    );
1027    Ok(())
1028}
1029
1030/// Background task: train IVF-PQ (using shared codebook) and patch catalog entry.
1031///
1032/// The OnceCell guarantees that k-means training runs exactly once across all
1033/// concurrent background tasks — subsequent tasks skip directly to assign+encode.
1034async fn build_ivf_pq_and_patch_index(
1035    store: Arc<dyn Store>,
1036    catalog: Arc<dyn CatalogProvider>,
1037    policy: VectorStoragePolicy,
1038    table: TableIdent,
1039    file_path: String,
1040    ivf_config: IvfPqConfig,
1041    codebook_cell: Arc<tokio::sync::OnceCell<IvfPqCodebook>>,
1042) -> AilakeResult<()> {
1043    let parquet_bytes = store.get(&file_path).await?;
1044    let reader = AilakeFileReader::new(parquet_bytes, &policy.column_name, policy.dim);
1045    let (batch, embeddings) = reader.read_parquet()?;
1046
1047    // Get or train the shared codebook. First task trains; all others await the result.
1048    let codebook = codebook_cell
1049        .get_or_try_init(|| async {
1050            let vecs = embeddings.clone();
1051            let metric = policy.metric;
1052            let cfg = ivf_config.clone();
1053            tokio::task::spawn_blocking(move || {
1054                ailake_index::IvfPqIndex::train_codebook(&vecs, metric, &cfg)
1055            })
1056            .await
1057            .map_err(|e| ailake_core::AilakeError::Store(format!("spawn_blocking panic: {e}")))?
1058        })
1059        .await?;
1060
1061    let full_bytes = tokio::task::spawn_blocking({
1062        let policy = policy.clone();
1063        let codebook = codebook.clone();
1064        move || {
1065            let file_writer = AilakeFileWriter::new(policy)
1066                .with_index_type(IndexType::IvfPq(ivf_config))
1067                .with_shared_ivf_codebook(Arc::new(codebook));
1068            file_writer.write(&batch, &embeddings)
1069        }
1070    })
1071    .await
1072    .map_err(|e| ailake_core::AilakeError::Store(format!("spawn_blocking panic: {e}")))??;
1073
1074    let full_reader = AilakeFileReader::new(full_bytes.clone(), &policy.column_name, policy.dim);
1075    let header = full_reader.read_header()?;
1076    let ailk_start = full_reader.ailk_offset()?;
1077    let hnsw_abs_offset = ailk_start + header.hnsw_offset;
1078    let hnsw_len = header.hnsw_len;
1079
1080    store.put(&file_path, full_bytes).await?;
1081
1082    // Wait for initial commit to appear then patch IndexStatus::Ready (max 60 s).
1083    let mut committed = false;
1084    for _ in 0..120u32 {
1085        match catalog.load_table(&table).await {
1086            Ok(meta) if meta.current_snapshot_id.is_some() => {
1087                committed = true;
1088                break;
1089            }
1090            _ => tokio::time::sleep(std::time::Duration::from_millis(500)).await,
1091        }
1092    }
1093    if !committed {
1094        return Err(ailake_core::AilakeError::Store(format!(
1095            "deferred IVF-PQ build: no snapshot committed for {file_path} after 60 s — \
1096             did you call TableWriter::commit()?"
1097        )));
1098    }
1099
1100    for attempt in 0..50u32 {
1101        let table_meta = catalog.load_table(&table).await?;
1102        let parent_snapshot_id = table_meta.current_snapshot_id;
1103        let mut files = catalog.list_files(&table, None).await?;
1104
1105        if files
1106            .iter()
1107            .any(|f| f.path == file_path && f.index_status == IndexStatus::Ready)
1108        {
1109            break;
1110        }
1111
1112        for f in &mut files {
1113            if f.path == file_path {
1114                f.hnsw_offset = Some(hnsw_abs_offset);
1115                f.hnsw_len = Some(hnsw_len);
1116                f.index_status = IndexStatus::Ready;
1117                break;
1118            }
1119        }
1120        catalog
1121            .commit_snapshot(
1122                &table,
1123                NewSnapshot {
1124                    snapshot_id: new_snapshot_id(),
1125                    parent_snapshot_id,
1126                    files,
1127                    operation: SnapshotOperation::Replace,
1128                    iceberg_schema: None,
1129                    extra_properties: std::collections::HashMap::new(),
1130                },
1131            )
1132            .await?;
1133
1134        tokio::time::sleep(std::time::Duration::from_millis(10 + attempt as u64 * 5)).await;
1135
1136        let verify = catalog.list_files(&table, None).await?;
1137        if verify
1138            .iter()
1139            .any(|f| f.path == file_path && f.index_status == IndexStatus::Ready)
1140        {
1141            break;
1142        }
1143    }
1144
1145    info!(
1146        "ailake: deferred IVF-PQ index built for {} (offset={}, len={})",
1147        file_path, hnsw_abs_offset, hnsw_len
1148    );
1149    Ok(())
1150}
1151
1152/// Background task: rebuild full multi-column AILK file and patch all column offsets.
1153///
1154/// Reads the Parquet-only shard, calls `write_multi` with all N column embeddings
1155/// (cloned from the caller), extracts per-column HNSW offsets, overwrites the file,
1156/// then applies the same CAS retry loop used by single-column deferred tasks.
1157async fn build_and_patch_multi_index(
1158    store: Arc<dyn Store>,
1159    catalog: Arc<dyn CatalogProvider>,
1160    policies: Vec<VectorStoragePolicy>,
1161    table: TableIdent,
1162    file_path: String,
1163    all_embeddings: Vec<Vec<Vec<f32>>>,
1164) -> AilakeResult<()> {
1165    // Read the Parquet-only shard (primary column only).
1166    let parquet_bytes = store.get(&file_path).await?;
1167    let primary_reader =
1168        AilakeFileReader::new(parquet_bytes, &policies[0].column_name, policies[0].dim);
1169    let (batch, _) = primary_reader.read_parquet()?;
1170
1171    // Build full AILK file with all N column HNSW sections on the blocking pool.
1172    let full_bytes = tokio::task::spawn_blocking({
1173        let policies = policies.clone();
1174        let all_embeddings = all_embeddings.clone();
1175        move || {
1176            let col_batches: Vec<VectorColumnBatch<'_>> = policies
1177                .iter()
1178                .zip(all_embeddings.iter())
1179                .map(|(p, embs)| VectorColumnBatch {
1180                    policy: p,
1181                    embeddings: embs.as_slice(),
1182                })
1183                .collect();
1184            let file_writer = AilakeFileWriter::new(policies[0].clone());
1185            file_writer.write_multi(&batch, &col_batches)
1186        }
1187    })
1188    .await
1189    .map_err(|e| ailake_core::AilakeError::Store(format!("spawn_blocking panic: {e}")))??;
1190
1191    // Extract primary HNSW offsets.
1192    let primary_reader = AilakeFileReader::new(
1193        full_bytes.clone(),
1194        &policies[0].column_name,
1195        policies[0].dim,
1196    );
1197    let primary_header = primary_reader.read_header()?;
1198    let primary_ailk_start = primary_reader.ailk_offset()?;
1199    let primary_hnsw_abs = primary_ailk_start + primary_header.hnsw_offset;
1200    let primary_hnsw_len = primary_header.hnsw_len;
1201
1202    // Extract extra column HNSW offsets (one reader per column).
1203    // Must use ailk_offset_for_column / read_header_for_column so each column's
1204    // own `ailake.{col}.footer_offset` is used — ailk_offset() always returns the
1205    // primary column offset, which is wrong for extra columns.
1206    let mut extra_offsets: Vec<(u64, u64)> = Vec::with_capacity(policies.len().saturating_sub(1));
1207    for col_policy in policies.iter().skip(1) {
1208        let col_reader =
1209            AilakeFileReader::new(full_bytes.clone(), &col_policy.column_name, col_policy.dim);
1210        let col_ailk_start = col_reader.ailk_offset_for_column(&col_policy.column_name)?;
1211        let col_header = col_reader.read_header_for_column(&col_policy.column_name)?;
1212        extra_offsets.push((col_ailk_start + col_header.hnsw_offset, col_header.hnsw_len));
1213    }
1214
1215    // Overwrite the Parquet-only shard with the full AILK file.
1216    store.put(&file_path, full_bytes).await?;
1217
1218    // Wait for the initial writer commit to appear (max 60 s).
1219    let mut committed = false;
1220    for _ in 0..120u32 {
1221        match catalog.load_table(&table).await {
1222            Ok(meta) if meta.current_snapshot_id.is_some() => {
1223                committed = true;
1224                break;
1225            }
1226            _ => tokio::time::sleep(std::time::Duration::from_millis(500)).await,
1227        }
1228    }
1229    if !committed {
1230        return Err(ailake_core::AilakeError::Store(format!(
1231            "deferred index build: no snapshot committed for {file_path} after 60 s — \
1232             did you call TableWriter::commit()?"
1233        )));
1234    }
1235
1236    // CAS retry loop: patch primary offsets + extra_vector_indexes + IndexStatus::Ready.
1237    for attempt in 0..50u32 {
1238        let table_meta = catalog.load_table(&table).await?;
1239        let parent_snapshot_id = table_meta.current_snapshot_id;
1240        let mut files = catalog.list_files(&table, None).await?;
1241
1242        if files
1243            .iter()
1244            .any(|f| f.path == file_path && f.index_status == IndexStatus::Ready)
1245        {
1246            break;
1247        }
1248
1249        for f in &mut files {
1250            if f.path == file_path {
1251                f.hnsw_offset = Some(primary_hnsw_abs);
1252                f.hnsw_len = Some(primary_hnsw_len);
1253                f.index_status = IndexStatus::Ready;
1254                for (i, &(off, len)) in extra_offsets.iter().enumerate() {
1255                    if let Some(xi) = f.extra_vector_indexes.get_mut(i) {
1256                        xi.hnsw_offset = off;
1257                        xi.hnsw_len = len;
1258                    }
1259                }
1260                break;
1261            }
1262        }
1263        catalog
1264            .commit_snapshot(
1265                &table,
1266                NewSnapshot {
1267                    snapshot_id: new_snapshot_id(),
1268                    parent_snapshot_id,
1269                    files,
1270                    operation: SnapshotOperation::Replace,
1271                    iceberg_schema: None,
1272                    extra_properties: std::collections::HashMap::new(),
1273                },
1274            )
1275            .await?;
1276
1277        tokio::time::sleep(std::time::Duration::from_millis(10 + attempt as u64 * 5)).await;
1278
1279        let verify = catalog.list_files(&table, None).await?;
1280        if verify
1281            .iter()
1282            .any(|f| f.path == file_path && f.index_status == IndexStatus::Ready)
1283        {
1284            break;
1285        }
1286    }
1287
1288    info!(
1289        "ailake: deferred multi-column HNSW built for {} ({} cols, primary offset={})",
1290        file_path,
1291        policies.len(),
1292        primary_hnsw_abs
1293    );
1294    Ok(())
1295}
1296
1297#[cfg(test)]
1298mod tests {
1299    use super::*;
1300    use ailake_core::{VectorMetric, VectorPrecision};
1301    use arrow_schema::{DataType, Field, Schema, TimeUnit};
1302
1303    fn policy(col: &str, dim: u32) -> VectorStoragePolicy {
1304        VectorStoragePolicy {
1305            column_name: col.to_string(),
1306            dim,
1307            metric: VectorMetric::Cosine,
1308            precision: VectorPrecision::F16,
1309            pq: None,
1310            keep_raw_for_reranking: true,
1311            pre_normalize: false,
1312            hnsw_m: None,
1313            hnsw_ef_construction: None,
1314            ivf_residual: false,
1315            embedding_model: None,
1316            modality: None,
1317        }
1318    }
1319
1320    fn update_for(schema: &Schema, pol: &VectorStoragePolicy) -> IcebergSchemaUpdate {
1321        arrow_schema_to_iceberg_update(schema, pol, &[])
1322    }
1323
1324    #[test]
1325    fn simple_schema_produces_correct_fields() {
1326        let schema = Schema::new(vec![
1327            Field::new("id", DataType::Int32, false),
1328            Field::new("text", DataType::Utf8, false),
1329        ]);
1330        let pol = policy("embedding", 8);
1331        let upd = update_for(&schema, &pol);
1332
1333        assert_eq!(upd.fields.len(), 3);
1334        assert_eq!(upd.fields[0]["id"], 1);
1335        assert_eq!(upd.fields[0]["type"], "int");
1336        assert_eq!(upd.fields[1]["id"], 2);
1337        assert_eq!(upd.fields[1]["type"], "string");
1338        assert_eq!(upd.fields[2]["id"], 3);
1339        assert_eq!(upd.fields[2]["type"], "fixed[16]"); // dim=8, F16=2 bytes
1340
1341        let nm: Vec<serde_json::Value> = serde_json::from_str(&upd.name_mapping_json).unwrap();
1342        assert_eq!(nm.len(), 3);
1343        assert_eq!(nm[2]["field-id"], 3);
1344        assert_eq!(nm[2]["names"][0], "embedding");
1345        assert_eq!(upd.last_column_id, 3);
1346    }
1347
1348    #[test]
1349    fn timestamp_without_tz_maps_to_timestamp_not_timestamptz() {
1350        let schema = Schema::new(vec![
1351            Field::new(
1352                "created_at",
1353                DataType::Timestamp(TimeUnit::Microsecond, None),
1354                true,
1355            ),
1356            Field::new(
1357                "updated_at",
1358                DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
1359                true,
1360            ),
1361        ]);
1362        let pol = policy("vec", 4);
1363        let upd = update_for(&schema, &pol);
1364
1365        assert_eq!(upd.fields[0]["type"], "timestamp");
1366        assert_eq!(upd.fields[1]["type"], "timestamptz");
1367    }
1368
1369    #[test]
1370    fn list_type_produces_iceberg_list_object() {
1371        let schema = Schema::new(vec![Field::new(
1372            "tags",
1373            DataType::List(std::sync::Arc::new(Field::new(
1374                "item",
1375                DataType::Utf8,
1376                true,
1377            ))),
1378            true,
1379        )]);
1380        let pol = policy("vec", 4);
1381        let upd = update_for(&schema, &pol);
1382
1383        let t = &upd.fields[0]["type"];
1384        assert_eq!(t["type"], "list");
1385        assert_eq!(t["element"], "string");
1386        // element-id must be > top-level field count (2: tags + vec)
1387        assert!(t["element-id"].as_i64().unwrap() > 2);
1388    }
1389
1390    #[test]
1391    fn struct_type_produces_nested_fields() {
1392        let schema = Schema::new(vec![Field::new(
1393            "meta",
1394            DataType::Struct(
1395                vec![
1396                    Field::new("key", DataType::Utf8, false),
1397                    Field::new("val", DataType::Int64, false),
1398                ]
1399                .into(),
1400            ),
1401            true,
1402        )]);
1403        let pol = policy("vec", 4);
1404        let upd = update_for(&schema, &pol);
1405
1406        let t = &upd.fields[0]["type"];
1407        assert_eq!(t["type"], "struct");
1408        let nested = t["fields"].as_array().unwrap();
1409        assert_eq!(nested.len(), 2);
1410        assert_eq!(nested[0]["name"], "key");
1411        assert_eq!(nested[0]["type"], "string");
1412        assert_eq!(nested[1]["name"], "val");
1413        assert_eq!(nested[1]["type"], "long");
1414        // Nested IDs must be > top-level count (2: meta + vec)
1415        assert!(nested[0]["id"].as_i64().unwrap() > 2);
1416    }
1417
1418    #[test]
1419    fn no_duplicate_vec_column_when_already_in_batch() {
1420        // If for some reason the vec column is in the batch schema, don't add it twice.
1421        let schema = Schema::new(vec![
1422            Field::new("id", DataType::Int32, false),
1423            Field::new("embedding", DataType::FixedSizeBinary(16), false),
1424        ]);
1425        let pol = policy("embedding", 8);
1426        let upd = update_for(&schema, &pol);
1427
1428        assert_eq!(upd.fields.len(), 2, "should not add embedding twice");
1429        let names: Vec<&str> = upd
1430            .fields
1431            .iter()
1432            .map(|f| f["name"].as_str().unwrap())
1433            .collect();
1434        assert_eq!(names.iter().filter(|&&n| n == "embedding").count(), 1);
1435    }
1436
1437    #[test]
1438    fn multi_vec_policies_all_appended() {
1439        let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);
1440        let primary = policy("embedding", 4);
1441        let extra = vec![policy("context_embedding", 4)];
1442        let upd = arrow_schema_to_iceberg_update(&schema, &primary, &extra);
1443
1444        assert_eq!(upd.fields.len(), 3); // id + embedding + context_embedding
1445        let names: Vec<&str> = upd
1446            .fields
1447            .iter()
1448            .map(|f| f["name"].as_str().unwrap())
1449            .collect();
1450        assert!(names.contains(&"embedding"));
1451        assert!(names.contains(&"context_embedding"));
1452    }
1453
1454    #[test]
1455    fn top_level_field_ids_match_parquet_stamp_sequence() {
1456        // Top-level IDs must be 1, 2, ..., N regardless of nested element IDs.
1457        let schema = Schema::new(vec![
1458            Field::new("id", DataType::Int64, false),
1459            Field::new(
1460                "tags",
1461                DataType::List(std::sync::Arc::new(Field::new(
1462                    "item",
1463                    DataType::Utf8,
1464                    true,
1465                ))),
1466                true,
1467            ),
1468        ]);
1469        let pol = policy("vec", 4);
1470        let upd = update_for(&schema, &pol);
1471
1472        // Top-level: id=1, tags=2, vec=3
1473        assert_eq!(upd.fields[0]["id"], 1);
1474        assert_eq!(upd.fields[1]["id"], 2);
1475        assert_eq!(upd.fields[2]["id"], 3);
1476
1477        // Nested element ID must be > 3
1478        assert!(upd.fields[1]["type"]["element-id"].as_i64().unwrap() > 3);
1479    }
1480
1481    /// Smoke-test write_batch_auto_deferred: verifies that it completes without error
1482    /// and stages a pending file entry (index built asynchronously in background).
1483    #[tokio::test]
1484    async fn write_batch_auto_deferred_stages_file() {
1485        use ailake_catalog::{HadoopCatalog, TableIdent};
1486        use ailake_store::LocalStore;
1487        use arrow_schema::{DataType, Field, Schema};
1488
1489        let dir = tempfile::tempdir().unwrap();
1490        let store: std::sync::Arc<dyn ailake_store::Store> =
1491            std::sync::Arc::new(LocalStore::new(dir.path().to_str().unwrap()));
1492        let catalog = std::sync::Arc::new(HadoopCatalog::new(std::sync::Arc::clone(&store), ""));
1493        let pol = policy("embedding", 4);
1494        let ident = TableIdent::new("default", "t");
1495
1496        let mut writer = TableWriter::create_or_open(catalog, store, pol, ident)
1497            .await
1498            .unwrap();
1499
1500        let schema =
1501            std::sync::Arc::new(Schema::new(vec![Field::new("text", DataType::Utf8, false)]));
1502        let batch = arrow_array::RecordBatch::try_new(
1503            schema,
1504            vec![std::sync::Arc::new(arrow_array::StringArray::from(vec![
1505                "hello",
1506            ]))],
1507        )
1508        .unwrap();
1509        let embeddings = vec![vec![1.0f32, 0.0, 0.0, 0.0]];
1510
1511        writer
1512            .write_batch_auto_deferred(&batch, &embeddings)
1513            .await
1514            .unwrap();
1515
1516        // One pending file should be staged even before commit.
1517        assert_eq!(writer.pending_files.len(), 1);
1518    }
1519
1520    /// Smoke-test write_batch_multi_deferred: verifies Parquet staged immediately,
1521    /// placeholder extra_vector_indexes populated, and background task spawned.
1522    #[tokio::test]
1523    async fn write_batch_multi_deferred_stages_file_with_extra_indexes() {
1524        use ailake_catalog::{HadoopCatalog, IndexStatus, TableIdent};
1525        use ailake_store::LocalStore;
1526        use arrow_schema::{DataType, Field, Schema};
1527
1528        let dir = tempfile::tempdir().unwrap();
1529        let store: std::sync::Arc<dyn ailake_store::Store> =
1530            std::sync::Arc::new(LocalStore::new(dir.path().to_str().unwrap()));
1531        let catalog = std::sync::Arc::new(HadoopCatalog::new(std::sync::Arc::clone(&store), ""));
1532        let primary_pol = policy("embedding", 4);
1533        let ident = TableIdent::new("default", "t");
1534
1535        let mut writer = TableWriter::create_or_open(catalog, store, primary_pol, ident)
1536            .await
1537            .unwrap();
1538
1539        let schema =
1540            std::sync::Arc::new(Schema::new(vec![Field::new("text", DataType::Utf8, false)]));
1541        let batch = arrow_array::RecordBatch::try_new(
1542            schema,
1543            vec![std::sync::Arc::new(arrow_array::StringArray::from(vec![
1544                "hello", "world",
1545            ]))],
1546        )
1547        .unwrap();
1548
1549        let text_embs = vec![vec![1.0f32, 0.0, 0.0, 0.0], vec![0.0, 1.0, 0.0, 0.0]];
1550        let img_embs = vec![vec![1.0f32, 0.0], vec![0.0, 1.0]];
1551
1552        let columns = vec![
1553            MultiVectorBatch {
1554                policy: policy("embedding", 4),
1555                embeddings: &text_embs,
1556            },
1557            MultiVectorBatch {
1558                policy: policy("img_embedding", 2),
1559                embeddings: &img_embs,
1560            },
1561        ];
1562
1563        writer
1564            .write_batch_multi_deferred(&batch, &columns)
1565            .await
1566            .unwrap();
1567
1568        assert_eq!(writer.pending_files.len(), 1);
1569        let entry = &writer.pending_files[0];
1570        // IndexStatus::Indexing — index build is async
1571        assert_eq!(entry.index_status, IndexStatus::Indexing);
1572        // Primary centroid populated for pruning during build window
1573        assert!(entry.centroid_b64.is_some());
1574        // Placeholder extra column entry (centroid present, offsets zero)
1575        assert_eq!(entry.extra_vector_indexes.len(), 1);
1576        let xi = &entry.extra_vector_indexes[0];
1577        assert_eq!(xi.column, "img_embedding");
1578        assert_eq!(xi.dim, 2);
1579        assert_eq!(xi.hnsw_offset, 0); // not yet built
1580        assert_eq!(xi.hnsw_len, 0); // not yet built
1581        assert!(xi.centroid_b64.is_some());
1582    }
1583}