Skip to main content

ailake_query/
writer.rs

1// SPDX-License-Identifier: MIT OR Apache-2.0
2use std::sync::atomic::{AtomicU32, Ordering};
3use std::sync::Arc;
4
5use ailake_catalog::{
6    encode_centroid_b64, make_data_file_entry, make_data_file_entry_indexing,
7    make_multi_column_data_file_entry, new_snapshot_id, CatalogProvider, DataFileEntry,
8    ExtraVectorIndex, IcebergSchemaUpdate, IndexStatus, NewSnapshot, SnapshotId, SnapshotOperation,
9    TableIdent, TableProperties, VectorIndexInfo,
10};
11use ailake_core::{AilakeResult, VectorStoragePolicy};
12use ailake_file::{AilakeFileReader, AilakeFileWriter, IndexType, VectorColumnBatch};
13use ailake_index::{IvfPqCodebook, IvfPqConfig};
14use ailake_store::Store;
15use ailake_vec::compute_centroid_and_radius;
16use arrow_array::RecordBatch;
17use arrow_schema::SchemaRef;
18use bytes::Bytes;
19use serde_json;
20use tracing::{error, info};
21
22/// One vector column for a multi-column write batch.
23pub struct MultiVectorBatch<'a> {
24    pub policy: VectorStoragePolicy,
25    pub embeddings: &'a [Vec<f32>],
26}
27
28pub struct TableWriter {
29    catalog: Arc<dyn CatalogProvider>,
30    store: Arc<dyn Store>,
31    policy: VectorStoragePolicy,
32    table: TableIdent,
33    part_counter: Arc<AtomicU32>,
34    pending_files: Vec<DataFileEntry>,
35    parent_snapshot_id: Option<SnapshotId>,
36    /// Arrow schema captured from the first write_batch call; used to populate
37    /// Iceberg schema fields and schema.name-mapping.default on commit.
38    captured_schema: Option<SchemaRef>,
39    /// Extra vector column policies from write_batch_multi (columns beyond primary).
40    extra_vec_policies: Vec<VectorStoragePolicy>,
41    /// IVF-PQ codebook trained on the first shard and reused for all subsequent shards.
42    /// Ensures cross-shard ADC distances are comparable — no reranking needed.
43    cached_ivf_codebook: Option<Arc<IvfPqCodebook>>,
44    /// Shared codebook cell for deferred IVF-PQ builds. Cloneable Arc so each
45    /// background task can access it; OnceCell guarantees training runs exactly once.
46    deferred_ivf_codebook: Arc<tokio::sync::OnceCell<IvfPqCodebook>>,
47}
48
49impl TableWriter {
50    pub fn new(
51        catalog: Arc<dyn CatalogProvider>,
52        store: Arc<dyn Store>,
53        policy: VectorStoragePolicy,
54        table: TableIdent,
55    ) -> Self {
56        Self {
57            catalog,
58            store,
59            policy,
60            table,
61            part_counter: Arc::new(AtomicU32::new(0)),
62            pending_files: Vec::new(),
63            parent_snapshot_id: None,
64            captured_schema: None,
65            extra_vec_policies: Vec::new(),
66            cached_ivf_codebook: None,
67            deferred_ivf_codebook: Arc::new(tokio::sync::OnceCell::new()),
68        }
69    }
70
71    pub fn with_parent_snapshot(mut self, id: SnapshotId) -> Self {
72        self.parent_snapshot_id = Some(id);
73        self
74    }
75
76    /// Write batch as Parquet-only immediately, build HNSW in background.
77    ///
78    /// Returns after the Parquet file is persisted (~LanceDB write speed).
79    /// A tokio task runs concurrently to build the HNSW index, rewrite the
80    /// file with the AILK section, and update the catalog entry.
81    ///
82    /// During the build window, `SearchSession` serves this shard via flat scan
83    /// (brute-force, exact) instead of HNSW. The transition is automatic once
84    /// the background task commits the updated manifest entry.
85    pub async fn write_batch_deferred(
86        &mut self,
87        batch: &RecordBatch,
88        embeddings: &[Vec<f32>],
89    ) -> AilakeResult<()> {
90        if self.captured_schema.is_none() {
91            self.captured_schema = Some(batch.schema());
92        }
93        let part_num = self.part_counter.fetch_add(1, Ordering::SeqCst);
94        let file_path = format!("data/part-{:05}.parquet", part_num);
95
96        // Fast path: persist Parquet without HNSW.
97        let file_writer = AilakeFileWriter::new(self.policy.clone());
98        let parquet_bytes = file_writer.write_parquet_only(batch, embeddings)?;
99        let file_size = parquet_bytes.len() as u64;
100        self.store.put(&file_path, parquet_bytes).await?;
101
102        // Centroid needed immediately for geometric pruning during the build window.
103        let centroid = compute_centroid_and_radius(embeddings, self.policy.metric);
104        let entry = make_data_file_entry_indexing(
105            &file_path,
106            embeddings.len() as u64,
107            file_size,
108            &centroid,
109            &self.policy.column_name,
110            self.policy.dim,
111        );
112        self.pending_files.push(entry);
113
114        // Spawn background HNSW build (fire-and-forget; errors are logged).
115        let store = self.store.clone();
116        let catalog = self.catalog.clone();
117        let policy = self.policy.clone();
118        let table = self.table.clone();
119        let fp = file_path.clone();
120        tokio::spawn(async move {
121            if let Err(e) = build_and_patch_index(store, catalog, policy, table, fp).await {
122                error!(
123                    "ailake: deferred HNSW build failed — file is indexed as Parquet-only until \
124                     next compaction rebuilds the index: {}",
125                    e
126                );
127            }
128        });
129
130        Ok(())
131    }
132
133    /// Write batch as Parquet-only immediately; train IVF-PQ index in background.
134    ///
135    /// The first shard trains the shared codebook (k-means). All subsequent shards
136    /// reuse it via `OnceCell` — build is O(n) assign+encode, not O(n×k) k-means.
137    /// Returns after Parquet is persisted. Index transitions Indexing → Ready async.
138    pub async fn write_batch_ivf_pq_deferred(
139        &mut self,
140        batch: &RecordBatch,
141        embeddings: &[Vec<f32>],
142        ivf_config: IvfPqConfig,
143    ) -> AilakeResult<()> {
144        if self.captured_schema.is_none() {
145            self.captured_schema = Some(batch.schema());
146        }
147        let part_num = self.part_counter.fetch_add(1, Ordering::SeqCst);
148        let file_path = format!("data/part-{:05}.parquet", part_num);
149
150        let file_writer = AilakeFileWriter::new(self.policy.clone());
151        let parquet_bytes = file_writer.write_parquet_only(batch, embeddings)?;
152        let file_size = parquet_bytes.len() as u64;
153        self.store.put(&file_path, parquet_bytes).await?;
154
155        let centroid = compute_centroid_and_radius(embeddings, self.policy.metric);
156        let entry = make_data_file_entry_indexing(
157            &file_path,
158            embeddings.len() as u64,
159            file_size,
160            &centroid,
161            &self.policy.column_name,
162            self.policy.dim,
163        );
164        self.pending_files.push(entry);
165
166        let store = self.store.clone();
167        let catalog = self.catalog.clone();
168        let policy = self.policy.clone();
169        let table = self.table.clone();
170        let fp = file_path.clone();
171        let codebook_cell = self.deferred_ivf_codebook.clone();
172        tokio::spawn(async move {
173            if let Err(e) = build_ivf_pq_and_patch_index(
174                store,
175                catalog,
176                policy,
177                table,
178                fp,
179                ivf_config,
180                codebook_cell,
181            )
182            .await
183            {
184                error!(
185                    "ailake: deferred IVF-PQ build failed — file is indexed as Parquet-only until \
186                     next compaction rebuilds the index: {}",
187                    e
188                );
189            }
190        });
191
192        Ok(())
193    }
194
195    /// Idempotent variant of `write_batch`.
196    ///
197    /// Before any I/O, checks if `batch_id` already appears in the current
198    /// snapshot. If it does, this is a no-op — safe for Airflow/Kestra retries.
199    /// If not found, writes the batch and tags the `DataFileEntry` with `batch_id`
200    /// so future retries can detect it.
201    ///
202    /// `commit()` is likewise a no-op when `pending_files` is empty.
203    pub async fn write_batch_idempotent(
204        &mut self,
205        batch: &RecordBatch,
206        embeddings: &[Vec<f32>],
207        batch_id: &str,
208    ) -> AilakeResult<()> {
209        let existing = self.catalog.list_files(&self.table, None).await?;
210        if existing
211            .iter()
212            .any(|f| f.batch_id.as_deref() == Some(batch_id))
213        {
214            return Ok(());
215        }
216        self.write_batch_with_id(batch, embeddings, Some(batch_id.to_string()))
217            .await
218    }
219
220    /// Write a batch to a new AI-Lake file and stage it for commit.
221    pub async fn write_batch(
222        &mut self,
223        batch: &RecordBatch,
224        embeddings: &[Vec<f32>],
225    ) -> AilakeResult<()> {
226        self.write_batch_with_id(batch, embeddings, None).await
227    }
228
229    async fn write_batch_with_id(
230        &mut self,
231        batch: &RecordBatch,
232        embeddings: &[Vec<f32>],
233        batch_id: Option<String>,
234    ) -> AilakeResult<()> {
235        if self.captured_schema.is_none() {
236            self.captured_schema = Some(batch.schema());
237        }
238        let part_num = self.part_counter.fetch_add(1, Ordering::SeqCst);
239        let file_path = format!("data/part-{:05}.parquet", part_num);
240
241        // Write AI-Lake file
242        let file_writer = AilakeFileWriter::new(self.policy.clone());
243        let file_bytes: Bytes = file_writer.write(batch, embeddings)?;
244        let file_size = file_bytes.len() as u64;
245
246        // Store the file
247        self.store.put(&file_path, file_bytes.clone()).await?;
248
249        // Compute centroid for catalog entry
250        let centroid = compute_centroid_and_radius(embeddings, self.policy.metric);
251
252        // Read back the HNSW offsets from the written file
253        let reader = ailake_file::AilakeFileReader::new(
254            file_bytes,
255            &self.policy.column_name,
256            self.policy.dim,
257        );
258        let header = reader.read_header()?;
259        let ailk_start = reader.ailk_offset()?;
260        let hnsw_abs_offset = ailk_start + header.hnsw_offset;
261        let hnsw_len = header.hnsw_len;
262
263        let mut entry = make_data_file_entry(
264            &file_path,
265            embeddings.len() as u64,
266            file_size,
267            &centroid,
268            VectorIndexInfo {
269                column: &self.policy.column_name,
270                dim: self.policy.dim,
271                hnsw_offset: hnsw_abs_offset,
272                hnsw_len,
273            },
274        );
275        entry.batch_id = batch_id;
276        self.pending_files.push(entry);
277        Ok(())
278    }
279
280    /// Write batch, auto-selecting the index based on detected hardware.
281    ///
282    /// Picks IVF-PQ when a CUDA GPU or ≥8 CPU cores are present AND the batch
283    /// has ≥5 000 vectors. Falls back to HNSW for weaker / local hardware.
284    /// Uses `IvfPqConfig::for_dataset` to scale nlist with dataset size.
285    pub async fn write_batch_auto(
286        &mut self,
287        batch: &RecordBatch,
288        embeddings: &[Vec<f32>],
289    ) -> AilakeResult<()> {
290        let profile = ailake_index::HardwareProfile::detect();
291        if profile.recommend_ivf_pq(embeddings.len()) {
292            let ivf_config =
293                ailake_index::IvfPqConfig::for_dataset(self.policy.dim as usize, embeddings.len());
294            self.write_batch_ivf_pq(batch, embeddings, ivf_config).await
295        } else {
296            self.write_batch(batch, embeddings).await
297        }
298    }
299
300    /// Write batch with IVF-PQ index built synchronously (no background task).
301    ///
302    /// Smaller index than HNSW; better for S3 sequential-scan workloads.
303    pub async fn write_batch_ivf_pq(
304        &mut self,
305        batch: &RecordBatch,
306        embeddings: &[Vec<f32>],
307        ivf_config: IvfPqConfig,
308    ) -> AilakeResult<()> {
309        if self.captured_schema.is_none() {
310            self.captured_schema = Some(batch.schema());
311        }
312        let part_num = self.part_counter.fetch_add(1, Ordering::SeqCst);
313        let file_path = format!("data/part-{:05}.parquet", part_num);
314
315        // Train codebook once on the first shard; all subsequent shards reuse it.
316        // This makes cross-shard ADC distances comparable, eliminating the need
317        // for exact reranking during multi-shard search.
318        if self.cached_ivf_codebook.is_none() {
319            let codebook = tokio::task::spawn_blocking({
320                let embeddings = embeddings.to_vec();
321                let metric = self.policy.metric;
322                let config = ivf_config.clone();
323                move || ailake_index::IvfPqIndex::train_codebook(&embeddings, metric, &config)
324            })
325            .await
326            .map_err(|e| ailake_core::AilakeError::Store(format!("spawn_blocking panic: {e}")))??;
327            self.cached_ivf_codebook = Some(Arc::new(codebook));
328        }
329        let codebook = self.cached_ivf_codebook.as_ref().unwrap().clone();
330
331        let file_writer = AilakeFileWriter::new(self.policy.clone())
332            .with_index_type(IndexType::IvfPq(ivf_config))
333            .with_shared_ivf_codebook(codebook);
334        let file_bytes: Bytes = file_writer.write(batch, embeddings)?;
335        let file_size = file_bytes.len() as u64;
336
337        self.store.put(&file_path, file_bytes.clone()).await?;
338
339        let centroid = compute_centroid_and_radius(embeddings, self.policy.metric);
340
341        let reader = ailake_file::AilakeFileReader::new(
342            file_bytes,
343            &self.policy.column_name,
344            self.policy.dim,
345        );
346        let header = reader.read_header()?;
347        let ailk_start = reader.ailk_offset()?;
348        let index_abs_offset = ailk_start + header.hnsw_offset;
349        let index_len = header.hnsw_len;
350
351        let entry = make_data_file_entry(
352            &file_path,
353            embeddings.len() as u64,
354            file_size,
355            &centroid,
356            VectorIndexInfo {
357                column: &self.policy.column_name,
358                dim: self.policy.dim,
359                hnsw_offset: index_abs_offset,
360                hnsw_len: index_len,
361            },
362        );
363        self.pending_files.push(entry);
364        Ok(())
365    }
366
367    /// Write a batch with multiple vector columns into a single AI-Lake file.
368    ///
369    /// The first entry in `columns` is treated as the primary column (used for
370    /// geometric pruning). Additional columns each get their own HNSW section.
371    pub async fn write_batch_multi(
372        &mut self,
373        batch: &RecordBatch,
374        columns: &[MultiVectorBatch<'_>],
375    ) -> AilakeResult<()> {
376        use ailake_core::AilakeError;
377        if self.captured_schema.is_none() {
378            self.captured_schema = Some(batch.schema());
379        }
380        if self.extra_vec_policies.is_empty() && columns.len() > 1 {
381            self.extra_vec_policies = columns[1..].iter().map(|c| c.policy.clone()).collect();
382        }
383
384        if columns.is_empty() {
385            return Err(AilakeError::InvalidArgument(
386                "write_batch_multi requires at least one column".into(),
387            ));
388        }
389
390        let part_num = self.part_counter.fetch_add(1, Ordering::SeqCst);
391        let file_path = format!("data/part-{:05}.parquet", part_num);
392
393        let col_batches: Vec<VectorColumnBatch<'_>> = columns
394            .iter()
395            .map(|c| VectorColumnBatch {
396                policy: &c.policy,
397                embeddings: c.embeddings,
398            })
399            .collect();
400
401        let primary_policy = &columns[0].policy;
402        let file_writer = AilakeFileWriter::new(primary_policy.clone());
403        let file_bytes: Bytes = file_writer.write_multi(batch, &col_batches)?;
404        let file_size = file_bytes.len() as u64;
405
406        self.store.put(&file_path, file_bytes.clone()).await?;
407
408        // Primary centroid for pruning
409        let primary_centroid =
410            compute_centroid_and_radius(columns[0].embeddings, primary_policy.metric);
411
412        // Read primary AILK header for offsets
413        let reader = ailake_file::AilakeFileReader::new(
414            file_bytes.clone(),
415            &primary_policy.column_name,
416            primary_policy.dim,
417        );
418        let primary_ailk_start = reader.ailk_offset()?;
419        let primary_header = {
420            use ailake_file::HEADER_SIZE;
421            let start = primary_ailk_start as usize;
422            let hdr_bytes: &[u8; HEADER_SIZE] = file_bytes[start..start + HEADER_SIZE]
423                .try_into()
424                .map_err(|_| AilakeError::NotAnAilakeFile)?;
425            ailake_file::AilakeHeader::from_bytes(hdr_bytes)?
426        };
427        let primary_hnsw_abs = primary_ailk_start + primary_header.hnsw_offset;
428
429        // Extra column index metadata
430        let mut extra: Vec<ExtraVectorIndex> = Vec::new();
431        for col in columns.iter().skip(1) {
432            let col_ailk_start = reader.ailk_offset_for_column(&col.policy.column_name)?;
433            let col_header = {
434                use ailake_file::HEADER_SIZE;
435                let start = col_ailk_start as usize;
436                let hdr_bytes: &[u8; HEADER_SIZE] = file_bytes[start..start + HEADER_SIZE]
437                    .try_into()
438                    .map_err(|_| AilakeError::NotAnAilakeFile)?;
439                ailake_file::AilakeHeader::from_bytes(hdr_bytes)?
440            };
441            let col_centroid = compute_centroid_and_radius(col.embeddings, col.policy.metric);
442            extra.push(ExtraVectorIndex {
443                column: col.policy.column_name.clone(),
444                dim: col.policy.dim,
445                hnsw_offset: col_ailk_start + col_header.hnsw_offset,
446                hnsw_len: col_header.hnsw_len,
447                centroid_b64: Some(encode_centroid_b64(&col_centroid)),
448                radius: Some(col_centroid.radius),
449            });
450        }
451
452        let entry = make_multi_column_data_file_entry(
453            &file_path,
454            columns[0].embeddings.len() as u64,
455            file_size,
456            &primary_centroid,
457            VectorIndexInfo {
458                column: &primary_policy.column_name,
459                dim: primary_policy.dim,
460                hnsw_offset: primary_hnsw_abs,
461                hnsw_len: primary_header.hnsw_len,
462            },
463            &extra,
464        );
465        self.pending_files.push(entry);
466        Ok(())
467    }
468
469    /// Commit all staged files as a new Iceberg snapshot.
470    ///
471    /// No-op when `pending_files` is empty (e.g., all `write_batch_idempotent`
472    /// calls were skipped because their `batch_id` was already committed).
473    /// Returns the current snapshot id in that case (or 0 if no snapshot exists yet).
474    pub async fn commit(mut self) -> AilakeResult<SnapshotId> {
475        if self.pending_files.is_empty() {
476            let current = self
477                .catalog
478                .load_table(&self.table)
479                .await
480                .ok()
481                .and_then(|m| m.current_snapshot_id)
482                .unwrap_or(0);
483            return Ok(current);
484        }
485        let iceberg_schema = self
486            .captured_schema
487            .as_deref()
488            .map(|s| arrow_schema_to_iceberg_update(s, &self.policy, &self.extra_vec_policies));
489        let snapshot = NewSnapshot {
490            snapshot_id: new_snapshot_id(),
491            parent_snapshot_id: self.parent_snapshot_id,
492            files: std::mem::take(&mut self.pending_files),
493            operation: SnapshotOperation::Append,
494            iceberg_schema,
495        };
496        self.catalog.commit_snapshot(&self.table, snapshot).await
497    }
498
499    /// Create a table if it doesn't exist, then return a writer for it.
500    pub async fn create_or_open(
501        catalog: Arc<dyn CatalogProvider>,
502        store: Arc<dyn Store>,
503        policy: VectorStoragePolicy,
504        table: TableIdent,
505    ) -> AilakeResult<Self> {
506        // Try to load; if not found, create
507        if catalog.load_table(&table).await.is_err() {
508            catalog
509                .create_table(
510                    &table,
511                    &TableProperties {
512                        policy: policy.clone(),
513                        extra: std::collections::HashMap::new(),
514                    },
515                )
516                .await?;
517        }
518        Ok(Self::new(catalog, store, policy, table))
519    }
520}
521
522/// Convert an Arrow schema to an Iceberg schema update for catalog commits.
523///
524/// Top-level field IDs are assigned sequentially (1-based) and match the
525/// `PARQUET:field_id` stamps written by `ParquetVectorWriter`. Nested element
526/// IDs (inside List/Struct/Map) are assigned after all top-level IDs are
527/// pre-reserved, so they never collide with Parquet column field IDs.
528fn arrow_schema_to_iceberg_update(
529    schema: &arrow_schema::Schema,
530    policy: &VectorStoragePolicy,
531    extra_vec_policies: &[VectorStoragePolicy],
532) -> IcebergSchemaUpdate {
533    let bytes_per_dim = policy.precision.bytes_per_element() as u32;
534    let vec_fixed_len = policy.dim * bytes_per_dim;
535
536    // Collect all vector column names that will appear in the final schema.
537    let has_primary_in_batch = schema
538        .fields()
539        .iter()
540        .any(|f| f.name() == &policy.column_name);
541    let vec_cols: Vec<(String, u32)> = {
542        let mut v = Vec::new();
543        if !has_primary_in_batch {
544            v.push((policy.column_name.clone(), vec_fixed_len));
545        }
546        for ep in extra_vec_policies {
547            let ep_fixed_len = ep.dim * ep.precision.bytes_per_element() as u32;
548            if !schema.fields().iter().any(|f| f.name() == &ep.column_name) {
549                v.push((ep.column_name.clone(), ep_fixed_len));
550            }
551        }
552        v
553    };
554
555    // Total top-level columns = batch fields + appended vec columns.
556    let top_level_count = schema.fields().len() + vec_cols.len();
557    // Nested element IDs start after all top-level IDs are pre-reserved.
558    let mut nested_id = top_level_count as i32;
559
560    let mut fields: Vec<serde_json::Value> = Vec::new();
561    let mut name_mapping: Vec<serde_json::Value> = Vec::new();
562
563    for (idx, field) in schema.fields().iter().enumerate() {
564        let field_id = (idx + 1) as i32;
565        let iceberg_type = arrow_type_to_iceberg(field.data_type(), &mut nested_id);
566        fields.push(serde_json::json!({
567            "id": field_id,
568            "name": field.name(),
569            "required": false,
570            "type": iceberg_type,
571        }));
572        name_mapping.push(serde_json::json!({
573            "field-id": field_id,
574            "names": [field.name()],
575        }));
576    }
577
578    // Append vector columns that live outside the RecordBatch schema.
579    for (i, (col_name, fixed_len)) in vec_cols.iter().enumerate() {
580        let field_id = (schema.fields().len() + 1 + i) as i32;
581        fields.push(serde_json::json!({
582            "id": field_id,
583            "name": col_name,
584            "required": false,
585            "type": format!("fixed[{fixed_len}]"),
586        }));
587        name_mapping.push(serde_json::json!({
588            "field-id": field_id,
589            "names": [col_name],
590        }));
591    }
592
593    let last_column_id = nested_id;
594    let name_mapping_json = serde_json::to_string(&name_mapping).unwrap_or_else(|_| "[]".into());
595
596    IcebergSchemaUpdate {
597        fields,
598        last_column_id,
599        name_mapping_json,
600    }
601}
602
603/// Map an Arrow DataType to an Iceberg schema type value (string or JSON object).
604///
605/// `nested_id` is a shared counter for generating unique element/field IDs inside
606/// List, Struct, and Map types. It must start beyond all pre-reserved top-level IDs.
607fn arrow_type_to_iceberg(dt: &arrow_schema::DataType, nested_id: &mut i32) -> serde_json::Value {
608    use arrow_schema::DataType;
609    match dt {
610        DataType::Boolean => serde_json::json!("boolean"),
611        DataType::Int8 | DataType::Int16 | DataType::Int32 | DataType::UInt8 | DataType::UInt16 => {
612            serde_json::json!("int")
613        }
614        DataType::Int64 | DataType::UInt32 | DataType::UInt64 => serde_json::json!("long"),
615        DataType::Float16 | DataType::Float32 => serde_json::json!("float"),
616        DataType::Float64 => serde_json::json!("double"),
617        DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View => serde_json::json!("string"),
618        DataType::Binary | DataType::LargeBinary | DataType::BinaryView => {
619            serde_json::json!("binary")
620        }
621        DataType::Date32 | DataType::Date64 => serde_json::json!("date"),
622        // Timestamp with timezone → timestamptz; without → timestamp.
623        DataType::Timestamp(_, Some(_)) => serde_json::json!("timestamptz"),
624        DataType::Timestamp(_, None) => serde_json::json!("timestamp"),
625        DataType::Time32(_) | DataType::Time64(_) => serde_json::json!("time"),
626        DataType::FixedSizeBinary(n) => serde_json::json!(format!("fixed[{n}]")),
627        DataType::Decimal128(p, s) | DataType::Decimal256(p, s) => {
628            serde_json::json!(format!("decimal({p}, {s})"))
629        }
630        DataType::List(inner)
631        | DataType::LargeList(inner)
632        | DataType::ListView(inner)
633        | DataType::FixedSizeList(inner, _) => {
634            *nested_id += 1;
635            let element_id = *nested_id;
636            let element_type = arrow_type_to_iceberg(inner.data_type(), nested_id);
637            serde_json::json!({
638                "type": "list",
639                "element-id": element_id,
640                "element": element_type,
641                "element-required": !inner.is_nullable(),
642            })
643        }
644        DataType::Struct(arrow_fields) => {
645            let struct_fields: Vec<serde_json::Value> = arrow_fields
646                .iter()
647                .map(|f| {
648                    *nested_id += 1;
649                    let fid = *nested_id;
650                    let ftype = arrow_type_to_iceberg(f.data_type(), nested_id);
651                    serde_json::json!({
652                        "id": fid,
653                        "name": f.name(),
654                        "required": !f.is_nullable(),
655                        "type": ftype,
656                    })
657                })
658                .collect();
659            serde_json::json!({ "type": "struct", "fields": struct_fields })
660        }
661        DataType::Map(entries, _) => {
662            // Arrow Map is List<Struct<key: K, value: V>>.
663            *nested_id += 1;
664            let key_id = *nested_id;
665            *nested_id += 1;
666            let val_id = *nested_id;
667            if let DataType::Struct(kv_fields) = entries.data_type() {
668                let key_f = kv_fields
669                    .iter()
670                    .find(|f| f.name() == "key" || f.name() == "keys");
671                let val_f = kv_fields
672                    .iter()
673                    .find(|f| f.name() == "value" || f.name() == "values");
674                let key_type = key_f
675                    .map(|f| arrow_type_to_iceberg(f.data_type(), nested_id))
676                    .unwrap_or(serde_json::json!("binary"));
677                let val_type = val_f
678                    .map(|f| arrow_type_to_iceberg(f.data_type(), nested_id))
679                    .unwrap_or(serde_json::json!("binary"));
680                let val_required = val_f.map(|f| !f.is_nullable()).unwrap_or(false);
681                serde_json::json!({
682                    "type": "map",
683                    "key-id": key_id,
684                    "key": key_type,
685                    "value-id": val_id,
686                    "value": val_type,
687                    "value-required": val_required,
688                })
689            } else {
690                serde_json::json!("binary")
691            }
692        }
693        _ => serde_json::json!("binary"),
694    }
695}
696
697/// Background task: reads a Parquet-only shard, builds full AILK file, patches catalog.
698async fn build_and_patch_index(
699    store: Arc<dyn Store>,
700    catalog: Arc<dyn CatalogProvider>,
701    policy: VectorStoragePolicy,
702    table: TableIdent,
703    file_path: String,
704) -> AilakeResult<()> {
705    // Read the Parquet-only bytes already stored.
706    let parquet_bytes = store.get(&file_path).await?;
707    let reader = AilakeFileReader::new(parquet_bytes, &policy.column_name, policy.dim);
708    let (batch, embeddings) = reader.read_parquet()?;
709
710    // Build the full AILK file (Parquet + HNSW) — CPU-intensive; run on blocking pool
711    // so the tokio async threads aren't starved when many shards build concurrently.
712    let full_bytes = tokio::task::spawn_blocking({
713        let policy = policy.clone();
714        move || {
715            let file_writer = AilakeFileWriter::new(policy);
716            file_writer.write(&batch, &embeddings)
717        }
718    })
719    .await
720    .map_err(|e| ailake_core::AilakeError::Store(format!("spawn_blocking panic: {e}")))??;
721
722    // Extract HNSW offsets from the newly written file.
723    let full_reader = AilakeFileReader::new(full_bytes.clone(), &policy.column_name, policy.dim);
724    let header = full_reader.read_header()?;
725    let ailk_start = full_reader.ailk_offset()?;
726    let hnsw_abs_offset = ailk_start + header.hnsw_offset;
727    let hnsw_len = header.hnsw_len;
728
729    // Overwrite the Parquet-only file with the full AILK version.
730    store.put(&file_path, full_bytes).await?;
731
732    // Wait for the initial writer commit to appear (HNSW builds can finish before
733    // the main write loop calls commit_snapshot, so the catalog has no snapshot yet).
734    for _ in 0..120u32 {
735        match catalog.load_table(&table).await {
736            Ok(meta) if meta.current_snapshot_id.is_some() => break,
737            _ => tokio::time::sleep(std::time::Duration::from_millis(500)).await,
738        }
739    }
740
741    // Update the catalog with CAS-like retry to handle concurrent background tasks.
742    // Multiple tasks can race on commit_snapshot(Replace): the last writer wins and
743    // may overwrite a sibling task's Ready status. Retry until we confirm our file
744    // is marked Ready in the current snapshot.
745    for attempt in 0..50u32 {
746        let table_meta = catalog.load_table(&table).await?;
747        let parent_snapshot_id = table_meta.current_snapshot_id;
748        let mut files = catalog.list_files(&table, None).await?;
749
750        // Already marked Ready by a previous successful attempt.
751        if files
752            .iter()
753            .any(|f| f.path == file_path && f.index_status == IndexStatus::Ready)
754        {
755            break;
756        }
757
758        for f in &mut files {
759            if f.path == file_path {
760                f.hnsw_offset = Some(hnsw_abs_offset);
761                f.hnsw_len = Some(hnsw_len);
762                f.index_status = IndexStatus::Ready;
763                break;
764            }
765        }
766        catalog
767            .commit_snapshot(
768                &table,
769                NewSnapshot {
770                    snapshot_id: new_snapshot_id(),
771                    parent_snapshot_id,
772                    files,
773                    operation: SnapshotOperation::Replace,
774                    iceberg_schema: None,
775                },
776            )
777            .await?;
778
779        // Brief yield so sibling tasks can commit, then verify our change survived.
780        tokio::time::sleep(std::time::Duration::from_millis(10 + attempt as u64 * 5)).await;
781
782        let verify = catalog.list_files(&table, None).await?;
783        if verify
784            .iter()
785            .any(|f| f.path == file_path && f.index_status == IndexStatus::Ready)
786        {
787            break;
788        }
789        // Another task overwrote us — retry.
790    }
791
792    info!(
793        "ailake: deferred HNSW index built for {} (offset={}, len={})",
794        file_path, hnsw_abs_offset, hnsw_len
795    );
796    Ok(())
797}
798
799/// Background task: train IVF-PQ (using shared codebook) and patch catalog entry.
800///
801/// The OnceCell guarantees that k-means training runs exactly once across all
802/// concurrent background tasks — subsequent tasks skip directly to assign+encode.
803async fn build_ivf_pq_and_patch_index(
804    store: Arc<dyn Store>,
805    catalog: Arc<dyn CatalogProvider>,
806    policy: VectorStoragePolicy,
807    table: TableIdent,
808    file_path: String,
809    ivf_config: IvfPqConfig,
810    codebook_cell: Arc<tokio::sync::OnceCell<IvfPqCodebook>>,
811) -> AilakeResult<()> {
812    let parquet_bytes = store.get(&file_path).await?;
813    let reader = AilakeFileReader::new(parquet_bytes, &policy.column_name, policy.dim);
814    let (batch, embeddings) = reader.read_parquet()?;
815
816    // Get or train the shared codebook. First task trains; all others await the result.
817    let codebook = codebook_cell
818        .get_or_try_init(|| async {
819            let vecs = embeddings.clone();
820            let metric = policy.metric;
821            let cfg = ivf_config.clone();
822            tokio::task::spawn_blocking(move || {
823                ailake_index::IvfPqIndex::train_codebook(&vecs, metric, &cfg)
824            })
825            .await
826            .map_err(|e| ailake_core::AilakeError::Store(format!("spawn_blocking panic: {e}")))?
827        })
828        .await?;
829
830    let full_bytes = tokio::task::spawn_blocking({
831        let policy = policy.clone();
832        let codebook = codebook.clone();
833        move || {
834            let file_writer = AilakeFileWriter::new(policy)
835                .with_index_type(IndexType::IvfPq(ivf_config))
836                .with_shared_ivf_codebook(Arc::new(codebook));
837            file_writer.write(&batch, &embeddings)
838        }
839    })
840    .await
841    .map_err(|e| ailake_core::AilakeError::Store(format!("spawn_blocking panic: {e}")))??;
842
843    let full_reader = AilakeFileReader::new(full_bytes.clone(), &policy.column_name, policy.dim);
844    let header = full_reader.read_header()?;
845    let ailk_start = full_reader.ailk_offset()?;
846    let hnsw_abs_offset = ailk_start + header.hnsw_offset;
847    let hnsw_len = header.hnsw_len;
848
849    store.put(&file_path, full_bytes).await?;
850
851    // Wait for initial commit to appear then patch IndexStatus::Ready (same CAS loop as HNSW).
852    for _ in 0..120u32 {
853        match catalog.load_table(&table).await {
854            Ok(meta) if meta.current_snapshot_id.is_some() => break,
855            _ => tokio::time::sleep(std::time::Duration::from_millis(500)).await,
856        }
857    }
858
859    for attempt in 0..50u32 {
860        let table_meta = catalog.load_table(&table).await?;
861        let parent_snapshot_id = table_meta.current_snapshot_id;
862        let mut files = catalog.list_files(&table, None).await?;
863
864        if files
865            .iter()
866            .any(|f| f.path == file_path && f.index_status == IndexStatus::Ready)
867        {
868            break;
869        }
870
871        for f in &mut files {
872            if f.path == file_path {
873                f.hnsw_offset = Some(hnsw_abs_offset);
874                f.hnsw_len = Some(hnsw_len);
875                f.index_status = IndexStatus::Ready;
876                break;
877            }
878        }
879        catalog
880            .commit_snapshot(
881                &table,
882                NewSnapshot {
883                    snapshot_id: new_snapshot_id(),
884                    parent_snapshot_id,
885                    files,
886                    operation: SnapshotOperation::Replace,
887                    iceberg_schema: None,
888                },
889            )
890            .await?;
891
892        tokio::time::sleep(std::time::Duration::from_millis(10 + attempt as u64 * 5)).await;
893
894        let verify = catalog.list_files(&table, None).await?;
895        if verify
896            .iter()
897            .any(|f| f.path == file_path && f.index_status == IndexStatus::Ready)
898        {
899            break;
900        }
901    }
902
903    info!(
904        "ailake: deferred IVF-PQ index built for {} (offset={}, len={})",
905        file_path, hnsw_abs_offset, hnsw_len
906    );
907    Ok(())
908}
909
#[cfg(test)]
mod tests {
    use super::*;
    use ailake_core::{VectorMetric, VectorPrecision};
    use arrow_schema::{DataType, Field, Schema, TimeUnit};

    /// Cosine / F16 policy for column `col` with `dim` dimensions.
    /// Every optional knob is off: PQ, RaBitQ, HNSW tuning, raw-vector
    /// retention and pre-normalisation.
    fn policy(col: &str, dim: u32) -> VectorStoragePolicy {
        VectorStoragePolicy {
            column_name: col.to_string(),
            dim,
            metric: VectorMetric::Cosine,
            precision: VectorPrecision::F16,
            keep_raw_for_reranking: false,
            pre_normalize: false,
            pq: None,
            rabitq: None,
            hnsw_m: None,
            hnsw_ef_construction: None,
        }
    }

    /// Iceberg schema update for `schema` with `pol` as the sole vector column.
    fn update_for(schema: &Schema, pol: &VectorStoragePolicy) -> IcebergSchemaUpdate {
        arrow_schema_to_iceberg_update(schema, pol, &[])
    }

    /// Top-level field names of `update`, in declaration order.
    fn field_names(update: &IcebergSchemaUpdate) -> Vec<&str> {
        update
            .fields
            .iter()
            .map(|field| field["name"].as_str().unwrap())
            .collect()
    }

    #[test]
    fn simple_schema_produces_correct_fields() {
        let schema = Schema::new(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("text", DataType::Utf8, false),
        ]);
        let update = update_for(&schema, &policy("embedding", 8));

        // The vector column is appended last: dim=8 × F16 (2 bytes) = fixed[16].
        let expected = [(1, "int"), (2, "string"), (3, "fixed[16]")];
        assert_eq!(update.fields.len(), expected.len());
        for (idx, &(id, ty)) in expected.iter().enumerate() {
            assert_eq!(update.fields[idx]["id"], id);
            assert_eq!(update.fields[idx]["type"], ty);
        }

        let mapping: Vec<serde_json::Value> =
            serde_json::from_str(&update.name_mapping_json).unwrap();
        assert_eq!(mapping.len(), 3);
        let vec_entry = &mapping[2];
        assert_eq!(vec_entry["field-id"], 3);
        assert_eq!(vec_entry["names"][0], "embedding");
        assert_eq!(update.last_column_id, 3);
    }

    #[test]
    fn timestamp_without_tz_maps_to_timestamp_not_timestamptz() {
        let naive = Field::new(
            "created_at",
            DataType::Timestamp(TimeUnit::Microsecond, None),
            true,
        );
        let zoned = Field::new(
            "updated_at",
            DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
            true,
        );
        let update = update_for(&Schema::new(vec![naive, zoned]), &policy("vec", 4));

        assert_eq!(update.fields[0]["type"], "timestamp");
        assert_eq!(update.fields[1]["type"], "timestamptz");
    }

    #[test]
    fn list_type_produces_iceberg_list_object() {
        let item = Field::new("item", DataType::Utf8, true);
        let tags = Field::new("tags", DataType::List(std::sync::Arc::new(item)), true);
        let update = update_for(&Schema::new(vec![tags]), &policy("vec", 4));

        let list_ty = &update.fields[0]["type"];
        assert_eq!(list_ty["type"], "list");
        assert_eq!(list_ty["element"], "string");
        // The element id must come after every top-level id (2 here: tags + vec).
        let element_id = list_ty["element-id"].as_i64().unwrap();
        assert!(element_id > 2);
    }

    #[test]
    fn struct_type_produces_nested_fields() {
        let members = vec![
            Field::new("key", DataType::Utf8, false),
            Field::new("val", DataType::Int64, false),
        ];
        let meta = Field::new("meta", DataType::Struct(members.into()), true);
        let update = update_for(&Schema::new(vec![meta]), &policy("vec", 4));

        let struct_ty = &update.fields[0]["type"];
        assert_eq!(struct_ty["type"], "struct");

        let nested = struct_ty["fields"].as_array().unwrap();
        assert_eq!(nested.len(), 2);
        let expected = [("key", "string"), ("val", "long")];
        for (idx, &(name, ty)) in expected.iter().enumerate() {
            assert_eq!(nested[idx]["name"], name);
            assert_eq!(nested[idx]["type"], ty);
        }
        // Nested ids must come after every top-level id (2 here: meta + vec).
        let first_nested_id = nested[0]["id"].as_i64().unwrap();
        assert!(first_nested_id > 2);
    }

    #[test]
    fn no_duplicate_vec_column_when_already_in_batch() {
        // If the vector column is already part of the batch schema, it must not be
        // appended a second time.
        let schema = Schema::new(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("embedding", DataType::FixedSizeBinary(16), false),
        ]);
        let update = update_for(&schema, &policy("embedding", 8));

        assert_eq!(update.fields.len(), 2, "should not add embedding twice");
        let occurrences = field_names(&update)
            .into_iter()
            .filter(|name| *name == "embedding")
            .count();
        assert_eq!(occurrences, 1);
    }

    #[test]
    fn multi_vec_policies_all_appended() {
        let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);
        let extras = vec![policy("context_embedding", 4)];
        let update = arrow_schema_to_iceberg_update(&schema, &policy("embedding", 4), &extras);

        // id + embedding + context_embedding
        assert_eq!(update.fields.len(), 3);
        let names = field_names(&update);
        assert!(names.contains(&"embedding"));
        assert!(names.contains(&"context_embedding"));
    }

    #[test]
    fn top_level_field_ids_match_parquet_stamp_sequence() {
        // Top-level ids must be 1, 2, ..., N regardless of nested element ids.
        let item = Field::new("item", DataType::Utf8, true);
        let schema = Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("tags", DataType::List(std::sync::Arc::new(item)), true),
        ]);
        let update = update_for(&schema, &policy("vec", 4));

        // Top-level: id=1, tags=2, vec=3
        for (idx, expected_id) in [1, 2, 3].iter().enumerate() {
            assert_eq!(update.fields[idx]["id"], *expected_id);
        }

        // The nested element id must be > 3.
        let element_id = update.fields[1]["type"]["element-id"].as_i64().unwrap();
        assert!(element_id > 3);
    }
}