Skip to main content

ailake_query/
writer.rs

1use std::sync::atomic::{AtomicU32, Ordering};
2use std::sync::Arc;
3
4use ailake_catalog::{
5    encode_centroid_b64, make_data_file_entry, make_data_file_entry_indexing,
6    make_multi_column_data_file_entry, new_snapshot_id, CatalogProvider, DataFileEntry,
7    ExtraVectorIndex, IcebergSchemaUpdate, IndexStatus, NewSnapshot, SnapshotId, SnapshotOperation,
8    TableIdent, TableProperties, VectorIndexInfo,
9};
10use ailake_core::{AilakeResult, VectorStoragePolicy};
11use ailake_file::{AilakeFileReader, AilakeFileWriter, IndexType, VectorColumnBatch};
12use ailake_index::IvfPqConfig;
13use ailake_store::Store;
14use ailake_vec::compute_centroid_and_radius;
15use arrow_array::RecordBatch;
16use arrow_schema::SchemaRef;
17use bytes::Bytes;
18use serde_json;
19
/// One vector column for a multi-column write batch.
///
/// A slice of these is passed to `TableWriter::write_batch_multi`, which treats
/// the first element as the primary column.
pub struct MultiVectorBatch<'a> {
    /// Storage policy for this column; the writer reads its `column_name`,
    /// `dim` and `metric`.
    pub policy: VectorStoragePolicy,
    /// Borrowed embeddings for this column (presumably one `Vec<f32>` per row
    /// of the accompanying batch — confirm against `AilakeFileWriter`).
    pub embeddings: &'a [Vec<f32>],
}
25
/// Stages data files for one table and publishes them as a single snapshot.
///
/// Each `write_batch*` method stores one file and pushes a catalog entry onto
/// `pending_files`; `commit` sends those entries to the catalog.
pub struct TableWriter {
    /// Catalog used to load/create the table and to commit snapshots.
    catalog: Arc<dyn CatalogProvider>,
    /// Object store that receives the written files.
    store: Arc<dyn Store>,
    /// Policy of the primary vector column used by the single-column writers.
    policy: VectorStoragePolicy,
    /// Identifier of the table this writer targets.
    table: TableIdent,
    /// Source of the number in `data/part-{:05}.parquet`; starts at 0 in `new`
    /// and is bumped once per written file.
    part_counter: Arc<AtomicU32>,
    /// Catalog entries for files written but not yet committed.
    pending_files: Vec<DataFileEntry>,
    /// Parent snapshot id forwarded to the snapshot built by `commit`, if set.
    parent_snapshot_id: Option<SnapshotId>,
    /// Arrow schema captured from the first write call of any kind; used to
    /// populate Iceberg schema fields and schema.name-mapping.default on commit.
    captured_schema: Option<SchemaRef>,
    /// Extra vector column policies from write_batch_multi (columns beyond primary).
    /// Recorded only while this list is still empty.
    extra_vec_policies: Vec<VectorStoragePolicy>,
}
40
41impl TableWriter {
42    pub fn new(
43        catalog: Arc<dyn CatalogProvider>,
44        store: Arc<dyn Store>,
45        policy: VectorStoragePolicy,
46        table: TableIdent,
47    ) -> Self {
48        Self {
49            catalog,
50            store,
51            policy,
52            table,
53            part_counter: Arc::new(AtomicU32::new(0)),
54            pending_files: Vec::new(),
55            parent_snapshot_id: None,
56            captured_schema: None,
57            extra_vec_policies: Vec::new(),
58        }
59    }
60
61    pub fn with_parent_snapshot(mut self, id: SnapshotId) -> Self {
62        self.parent_snapshot_id = Some(id);
63        self
64    }
65
66    /// Write batch as Parquet-only immediately, build HNSW in background.
67    ///
68    /// Returns after the Parquet file is persisted (~LanceDB write speed).
69    /// A tokio task runs concurrently to build the HNSW index, rewrite the
70    /// file with the AILK section, and update the catalog entry.
71    ///
72    /// During the build window, `SearchSession` serves this shard via flat scan
73    /// (brute-force, exact) instead of HNSW. The transition is automatic once
74    /// the background task commits the updated manifest entry.
75    pub async fn write_batch_deferred(
76        &mut self,
77        batch: &RecordBatch,
78        embeddings: &[Vec<f32>],
79    ) -> AilakeResult<()> {
80        if self.captured_schema.is_none() {
81            self.captured_schema = Some(batch.schema());
82        }
83        let part_num = self.part_counter.fetch_add(1, Ordering::SeqCst);
84        let file_path = format!("data/part-{:05}.parquet", part_num);
85
86        // Fast path: persist Parquet without HNSW.
87        let file_writer = AilakeFileWriter::new(self.policy.clone());
88        let parquet_bytes = file_writer.write_parquet_only(batch, embeddings)?;
89        let file_size = parquet_bytes.len() as u64;
90        self.store.put(&file_path, parquet_bytes).await?;
91
92        // Centroid needed immediately for geometric pruning during the build window.
93        let centroid = compute_centroid_and_radius(embeddings, self.policy.metric);
94        let entry = make_data_file_entry_indexing(
95            &file_path,
96            embeddings.len() as u64,
97            file_size,
98            &centroid,
99            &self.policy.column_name,
100            self.policy.dim,
101        );
102        self.pending_files.push(entry);
103
104        // Spawn background HNSW build (fire-and-forget; errors are logged).
105        let store = self.store.clone();
106        let catalog = self.catalog.clone();
107        let policy = self.policy.clone();
108        let table = self.table.clone();
109        let fp = file_path.clone();
110        tokio::spawn(async move {
111            if let Err(e) = build_and_patch_index(store, catalog, policy, table, fp).await {
112                eprintln!("[ailake] deferred HNSW build failed: {e}");
113            }
114        });
115
116        Ok(())
117    }
118
119    /// Write a batch to a new AI-Lake file and stage it for commit.
120    pub async fn write_batch(
121        &mut self,
122        batch: &RecordBatch,
123        embeddings: &[Vec<f32>],
124    ) -> AilakeResult<()> {
125        if self.captured_schema.is_none() {
126            self.captured_schema = Some(batch.schema());
127        }
128        let part_num = self.part_counter.fetch_add(1, Ordering::SeqCst);
129        let file_path = format!("data/part-{:05}.parquet", part_num);
130
131        // Write AI-Lake file
132        let file_writer = AilakeFileWriter::new(self.policy.clone());
133        let file_bytes: Bytes = file_writer.write(batch, embeddings)?;
134        let file_size = file_bytes.len() as u64;
135
136        // Store the file
137        self.store.put(&file_path, file_bytes.clone()).await?;
138
139        // Compute centroid for catalog entry
140        let centroid = compute_centroid_and_radius(embeddings, self.policy.metric);
141
142        // Read back the HNSW offsets from the written file
143        let reader = ailake_file::AilakeFileReader::new(
144            file_bytes,
145            &self.policy.column_name,
146            self.policy.dim,
147        );
148        let header = reader.read_header()?;
149        let ailk_start = reader.ailk_offset()?;
150        let hnsw_abs_offset = ailk_start + header.hnsw_offset;
151        let hnsw_len = header.hnsw_len;
152
153        let entry = make_data_file_entry(
154            &file_path,
155            embeddings.len() as u64,
156            file_size,
157            &centroid,
158            VectorIndexInfo {
159                column: &self.policy.column_name,
160                dim: self.policy.dim,
161                hnsw_offset: hnsw_abs_offset,
162                hnsw_len,
163            },
164        );
165        self.pending_files.push(entry);
166        Ok(())
167    }
168
169    /// Write batch, auto-selecting the index based on detected hardware.
170    ///
171    /// Picks IVF-PQ when a CUDA GPU or ≥8 CPU cores are present AND the batch
172    /// has ≥5 000 vectors. Falls back to HNSW for weaker / local hardware.
173    /// Uses `IvfPqConfig::for_dataset` to scale nlist with dataset size.
174    pub async fn write_batch_auto(
175        &mut self,
176        batch: &RecordBatch,
177        embeddings: &[Vec<f32>],
178    ) -> AilakeResult<()> {
179        let profile = ailake_index::HardwareProfile::detect();
180        if profile.recommend_ivf_pq(embeddings.len()) {
181            let ivf_config =
182                ailake_index::IvfPqConfig::for_dataset(self.policy.dim as usize, embeddings.len());
183            self.write_batch_ivf_pq(batch, embeddings, ivf_config).await
184        } else {
185            self.write_batch(batch, embeddings).await
186        }
187    }
188
189    /// Write batch with IVF-PQ index built synchronously (no background task).
190    ///
191    /// Smaller index than HNSW; better for S3 sequential-scan workloads.
192    pub async fn write_batch_ivf_pq(
193        &mut self,
194        batch: &RecordBatch,
195        embeddings: &[Vec<f32>],
196        ivf_config: IvfPqConfig,
197    ) -> AilakeResult<()> {
198        if self.captured_schema.is_none() {
199            self.captured_schema = Some(batch.schema());
200        }
201        let part_num = self.part_counter.fetch_add(1, Ordering::SeqCst);
202        let file_path = format!("data/part-{:05}.parquet", part_num);
203
204        let file_writer = AilakeFileWriter::new(self.policy.clone())
205            .with_index_type(IndexType::IvfPq(ivf_config));
206        let file_bytes: Bytes = file_writer.write(batch, embeddings)?;
207        let file_size = file_bytes.len() as u64;
208
209        self.store.put(&file_path, file_bytes.clone()).await?;
210
211        let centroid = compute_centroid_and_radius(embeddings, self.policy.metric);
212
213        let reader = ailake_file::AilakeFileReader::new(
214            file_bytes,
215            &self.policy.column_name,
216            self.policy.dim,
217        );
218        let header = reader.read_header()?;
219        let ailk_start = reader.ailk_offset()?;
220        let index_abs_offset = ailk_start + header.hnsw_offset;
221        let index_len = header.hnsw_len;
222
223        let entry = make_data_file_entry(
224            &file_path,
225            embeddings.len() as u64,
226            file_size,
227            &centroid,
228            VectorIndexInfo {
229                column: &self.policy.column_name,
230                dim: self.policy.dim,
231                hnsw_offset: index_abs_offset,
232                hnsw_len: index_len,
233            },
234        );
235        self.pending_files.push(entry);
236        Ok(())
237    }
238
239    /// Write a batch with multiple vector columns into a single AI-Lake file.
240    ///
241    /// The first entry in `columns` is treated as the primary column (used for
242    /// geometric pruning). Additional columns each get their own HNSW section.
243    pub async fn write_batch_multi(
244        &mut self,
245        batch: &RecordBatch,
246        columns: &[MultiVectorBatch<'_>],
247    ) -> AilakeResult<()> {
248        use ailake_core::AilakeError;
249        if self.captured_schema.is_none() {
250            self.captured_schema = Some(batch.schema());
251        }
252        if self.extra_vec_policies.is_empty() && columns.len() > 1 {
253            self.extra_vec_policies = columns[1..].iter().map(|c| c.policy.clone()).collect();
254        }
255
256        if columns.is_empty() {
257            return Err(AilakeError::InvalidArgument(
258                "write_batch_multi requires at least one column".into(),
259            ));
260        }
261
262        let part_num = self.part_counter.fetch_add(1, Ordering::SeqCst);
263        let file_path = format!("data/part-{:05}.parquet", part_num);
264
265        let col_batches: Vec<VectorColumnBatch<'_>> = columns
266            .iter()
267            .map(|c| VectorColumnBatch {
268                policy: &c.policy,
269                embeddings: c.embeddings,
270            })
271            .collect();
272
273        let primary_policy = &columns[0].policy;
274        let file_writer = AilakeFileWriter::new(primary_policy.clone());
275        let file_bytes: Bytes = file_writer.write_multi(batch, &col_batches)?;
276        let file_size = file_bytes.len() as u64;
277
278        self.store.put(&file_path, file_bytes.clone()).await?;
279
280        // Primary centroid for pruning
281        let primary_centroid =
282            compute_centroid_and_radius(columns[0].embeddings, primary_policy.metric);
283
284        // Read primary AILK header for offsets
285        let reader = ailake_file::AilakeFileReader::new(
286            file_bytes.clone(),
287            &primary_policy.column_name,
288            primary_policy.dim,
289        );
290        let primary_ailk_start = reader.ailk_offset()?;
291        let primary_header = {
292            use ailake_file::HEADER_SIZE;
293            let start = primary_ailk_start as usize;
294            let hdr_bytes: &[u8; HEADER_SIZE] = file_bytes[start..start + HEADER_SIZE]
295                .try_into()
296                .map_err(|_| AilakeError::NotAnAilakeFile)?;
297            ailake_file::AilakeHeader::from_bytes(hdr_bytes)?
298        };
299        let primary_hnsw_abs = primary_ailk_start + primary_header.hnsw_offset;
300
301        // Extra column index metadata
302        let mut extra: Vec<ExtraVectorIndex> = Vec::new();
303        for col in columns.iter().skip(1) {
304            let col_ailk_start = reader.ailk_offset_for_column(&col.policy.column_name)?;
305            let col_header = {
306                use ailake_file::HEADER_SIZE;
307                let start = col_ailk_start as usize;
308                let hdr_bytes: &[u8; HEADER_SIZE] = file_bytes[start..start + HEADER_SIZE]
309                    .try_into()
310                    .map_err(|_| AilakeError::NotAnAilakeFile)?;
311                ailake_file::AilakeHeader::from_bytes(hdr_bytes)?
312            };
313            let col_centroid = compute_centroid_and_radius(col.embeddings, col.policy.metric);
314            extra.push(ExtraVectorIndex {
315                column: col.policy.column_name.clone(),
316                dim: col.policy.dim,
317                hnsw_offset: col_ailk_start + col_header.hnsw_offset,
318                hnsw_len: col_header.hnsw_len,
319                centroid_b64: Some(encode_centroid_b64(&col_centroid)),
320                radius: Some(col_centroid.radius),
321            });
322        }
323
324        let entry = make_multi_column_data_file_entry(
325            &file_path,
326            columns[0].embeddings.len() as u64,
327            file_size,
328            &primary_centroid,
329            VectorIndexInfo {
330                column: &primary_policy.column_name,
331                dim: primary_policy.dim,
332                hnsw_offset: primary_hnsw_abs,
333                hnsw_len: primary_header.hnsw_len,
334            },
335            &extra,
336        );
337        self.pending_files.push(entry);
338        Ok(())
339    }
340
341    /// Commit all staged files as a new Iceberg snapshot.
342    pub async fn commit(mut self) -> AilakeResult<SnapshotId> {
343        let iceberg_schema = self
344            .captured_schema
345            .as_deref()
346            .map(|s| arrow_schema_to_iceberg_update(s, &self.policy, &self.extra_vec_policies));
347        let snapshot = NewSnapshot {
348            snapshot_id: new_snapshot_id(),
349            parent_snapshot_id: self.parent_snapshot_id,
350            files: std::mem::take(&mut self.pending_files),
351            operation: SnapshotOperation::Append,
352            iceberg_schema,
353        };
354        self.catalog.commit_snapshot(&self.table, snapshot).await
355    }
356
357    /// Create a table if it doesn't exist, then return a writer for it.
358    pub async fn create_or_open(
359        catalog: Arc<dyn CatalogProvider>,
360        store: Arc<dyn Store>,
361        policy: VectorStoragePolicy,
362        table: TableIdent,
363    ) -> AilakeResult<Self> {
364        // Try to load; if not found, create
365        if catalog.load_table(&table).await.is_err() {
366            catalog
367                .create_table(
368                    &table,
369                    &TableProperties {
370                        policy: policy.clone(),
371                        extra: std::collections::HashMap::new(),
372                    },
373                )
374                .await?;
375        }
376        Ok(Self::new(catalog, store, policy, table))
377    }
378}
379
380/// Convert an Arrow schema to an Iceberg schema update for catalog commits.
381///
382/// Top-level field IDs are assigned sequentially (1-based) and match the
383/// `PARQUET:field_id` stamps written by `ParquetVectorWriter`. Nested element
384/// IDs (inside List/Struct/Map) are assigned after all top-level IDs are
385/// pre-reserved, so they never collide with Parquet column field IDs.
386fn arrow_schema_to_iceberg_update(
387    schema: &arrow_schema::Schema,
388    policy: &VectorStoragePolicy,
389    extra_vec_policies: &[VectorStoragePolicy],
390) -> IcebergSchemaUpdate {
391    let bytes_per_dim = policy.precision.bytes_per_element() as u32;
392    let vec_fixed_len = policy.dim * bytes_per_dim;
393
394    // Collect all vector column names that will appear in the final schema.
395    let has_primary_in_batch = schema
396        .fields()
397        .iter()
398        .any(|f| f.name() == &policy.column_name);
399    let vec_cols: Vec<(String, u32)> = {
400        let mut v = Vec::new();
401        if !has_primary_in_batch {
402            v.push((policy.column_name.clone(), vec_fixed_len));
403        }
404        for ep in extra_vec_policies {
405            let ep_fixed_len = ep.dim * ep.precision.bytes_per_element() as u32;
406            if !schema.fields().iter().any(|f| f.name() == &ep.column_name) {
407                v.push((ep.column_name.clone(), ep_fixed_len));
408            }
409        }
410        v
411    };
412
413    // Total top-level columns = batch fields + appended vec columns.
414    let top_level_count = schema.fields().len() + vec_cols.len();
415    // Nested element IDs start after all top-level IDs are pre-reserved.
416    let mut nested_id = top_level_count as i32;
417
418    let mut fields: Vec<serde_json::Value> = Vec::new();
419    let mut name_mapping: Vec<serde_json::Value> = Vec::new();
420
421    for (idx, field) in schema.fields().iter().enumerate() {
422        let field_id = (idx + 1) as i32;
423        let iceberg_type = arrow_type_to_iceberg(field.data_type(), &mut nested_id);
424        fields.push(serde_json::json!({
425            "id": field_id,
426            "name": field.name(),
427            "required": false,
428            "type": iceberg_type,
429        }));
430        name_mapping.push(serde_json::json!({
431            "field-id": field_id,
432            "names": [field.name()],
433        }));
434    }
435
436    // Append vector columns that live outside the RecordBatch schema.
437    for (i, (col_name, fixed_len)) in vec_cols.iter().enumerate() {
438        let field_id = (schema.fields().len() + 1 + i) as i32;
439        fields.push(serde_json::json!({
440            "id": field_id,
441            "name": col_name,
442            "required": false,
443            "type": format!("fixed[{fixed_len}]"),
444        }));
445        name_mapping.push(serde_json::json!({
446            "field-id": field_id,
447            "names": [col_name],
448        }));
449    }
450
451    let last_column_id = nested_id;
452    let name_mapping_json = serde_json::to_string(&name_mapping).unwrap_or_else(|_| "[]".into());
453
454    IcebergSchemaUpdate {
455        fields,
456        last_column_id,
457        name_mapping_json,
458    }
459}
460
461/// Map an Arrow DataType to an Iceberg schema type value (string or JSON object).
462///
463/// `nested_id` is a shared counter for generating unique element/field IDs inside
464/// List, Struct, and Map types. It must start beyond all pre-reserved top-level IDs.
465fn arrow_type_to_iceberg(dt: &arrow_schema::DataType, nested_id: &mut i32) -> serde_json::Value {
466    use arrow_schema::DataType;
467    match dt {
468        DataType::Boolean => serde_json::json!("boolean"),
469        DataType::Int8 | DataType::Int16 | DataType::Int32 | DataType::UInt8 | DataType::UInt16 => {
470            serde_json::json!("int")
471        }
472        DataType::Int64 | DataType::UInt32 | DataType::UInt64 => serde_json::json!("long"),
473        DataType::Float16 | DataType::Float32 => serde_json::json!("float"),
474        DataType::Float64 => serde_json::json!("double"),
475        DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View => serde_json::json!("string"),
476        DataType::Binary | DataType::LargeBinary | DataType::BinaryView => {
477            serde_json::json!("binary")
478        }
479        DataType::Date32 | DataType::Date64 => serde_json::json!("date"),
480        // Timestamp with timezone → timestamptz; without → timestamp.
481        DataType::Timestamp(_, Some(_)) => serde_json::json!("timestamptz"),
482        DataType::Timestamp(_, None) => serde_json::json!("timestamp"),
483        DataType::Time32(_) | DataType::Time64(_) => serde_json::json!("time"),
484        DataType::FixedSizeBinary(n) => serde_json::json!(format!("fixed[{n}]")),
485        DataType::Decimal128(p, s) | DataType::Decimal256(p, s) => {
486            serde_json::json!(format!("decimal({p}, {s})"))
487        }
488        DataType::List(inner)
489        | DataType::LargeList(inner)
490        | DataType::ListView(inner)
491        | DataType::FixedSizeList(inner, _) => {
492            *nested_id += 1;
493            let element_id = *nested_id;
494            let element_type = arrow_type_to_iceberg(inner.data_type(), nested_id);
495            serde_json::json!({
496                "type": "list",
497                "element-id": element_id,
498                "element": element_type,
499                "element-required": !inner.is_nullable(),
500            })
501        }
502        DataType::Struct(arrow_fields) => {
503            let struct_fields: Vec<serde_json::Value> = arrow_fields
504                .iter()
505                .map(|f| {
506                    *nested_id += 1;
507                    let fid = *nested_id;
508                    let ftype = arrow_type_to_iceberg(f.data_type(), nested_id);
509                    serde_json::json!({
510                        "id": fid,
511                        "name": f.name(),
512                        "required": !f.is_nullable(),
513                        "type": ftype,
514                    })
515                })
516                .collect();
517            serde_json::json!({ "type": "struct", "fields": struct_fields })
518        }
519        DataType::Map(entries, _) => {
520            // Arrow Map is List<Struct<key: K, value: V>>.
521            *nested_id += 1;
522            let key_id = *nested_id;
523            *nested_id += 1;
524            let val_id = *nested_id;
525            if let DataType::Struct(kv_fields) = entries.data_type() {
526                let key_f = kv_fields
527                    .iter()
528                    .find(|f| f.name() == "key" || f.name() == "keys");
529                let val_f = kv_fields
530                    .iter()
531                    .find(|f| f.name() == "value" || f.name() == "values");
532                let key_type = key_f
533                    .map(|f| arrow_type_to_iceberg(f.data_type(), nested_id))
534                    .unwrap_or(serde_json::json!("binary"));
535                let val_type = val_f
536                    .map(|f| arrow_type_to_iceberg(f.data_type(), nested_id))
537                    .unwrap_or(serde_json::json!("binary"));
538                let val_required = val_f.map(|f| !f.is_nullable()).unwrap_or(false);
539                serde_json::json!({
540                    "type": "map",
541                    "key-id": key_id,
542                    "key": key_type,
543                    "value-id": val_id,
544                    "value": val_type,
545                    "value-required": val_required,
546                })
547            } else {
548                serde_json::json!("binary")
549            }
550        }
551        _ => serde_json::json!("binary"),
552    }
553}
554
555/// Background task: reads a Parquet-only shard, builds full AILK file, patches catalog.
556async fn build_and_patch_index(
557    store: Arc<dyn Store>,
558    catalog: Arc<dyn CatalogProvider>,
559    policy: VectorStoragePolicy,
560    table: TableIdent,
561    file_path: String,
562) -> AilakeResult<()> {
563    // Read the Parquet-only bytes already stored.
564    let parquet_bytes = store.get(&file_path).await?;
565    let reader = AilakeFileReader::new(parquet_bytes, &policy.column_name, policy.dim);
566    let (batch, embeddings) = reader.read_parquet()?;
567
568    // Build the full AILK file (Parquet + HNSW) — CPU-intensive; run on blocking pool
569    // so the tokio async threads aren't starved when many shards build concurrently.
570    let full_bytes = tokio::task::spawn_blocking({
571        let policy = policy.clone();
572        move || {
573            let file_writer = AilakeFileWriter::new(policy);
574            file_writer.write(&batch, &embeddings)
575        }
576    })
577    .await
578    .map_err(|e| ailake_core::AilakeError::Store(format!("spawn_blocking panic: {e}")))??;
579
580    // Extract HNSW offsets from the newly written file.
581    let full_reader = AilakeFileReader::new(full_bytes.clone(), &policy.column_name, policy.dim);
582    let header = full_reader.read_header()?;
583    let ailk_start = full_reader.ailk_offset()?;
584    let hnsw_abs_offset = ailk_start + header.hnsw_offset;
585    let hnsw_len = header.hnsw_len;
586
587    // Overwrite the Parquet-only file with the full AILK version.
588    store.put(&file_path, full_bytes).await?;
589
590    // Wait for the initial writer commit to appear (HNSW builds can finish before
591    // the main write loop calls commit_snapshot, so the catalog has no snapshot yet).
592    for _ in 0..120u32 {
593        match catalog.load_table(&table).await {
594            Ok(meta) if meta.current_snapshot_id.is_some() => break,
595            _ => tokio::time::sleep(std::time::Duration::from_millis(500)).await,
596        }
597    }
598
599    // Update the catalog with CAS-like retry to handle concurrent background tasks.
600    // Multiple tasks can race on commit_snapshot(Replace): the last writer wins and
601    // may overwrite a sibling task's Ready status. Retry until we confirm our file
602    // is marked Ready in the current snapshot.
603    for attempt in 0..50u32 {
604        let table_meta = catalog.load_table(&table).await?;
605        let parent_snapshot_id = table_meta.current_snapshot_id;
606        let mut files = catalog.list_files(&table, None).await?;
607
608        // Already marked Ready by a previous successful attempt.
609        if files
610            .iter()
611            .any(|f| f.path == file_path && f.index_status == IndexStatus::Ready)
612        {
613            break;
614        }
615
616        for f in &mut files {
617            if f.path == file_path {
618                f.hnsw_offset = Some(hnsw_abs_offset);
619                f.hnsw_len = Some(hnsw_len);
620                f.index_status = IndexStatus::Ready;
621                break;
622            }
623        }
624        catalog
625            .commit_snapshot(
626                &table,
627                NewSnapshot {
628                    snapshot_id: new_snapshot_id(),
629                    parent_snapshot_id,
630                    files,
631                    operation: SnapshotOperation::Replace,
632                    iceberg_schema: None,
633                },
634            )
635            .await?;
636
637        // Brief yield so sibling tasks can commit, then verify our change survived.
638        tokio::time::sleep(std::time::Duration::from_millis(10 + attempt as u64 * 5)).await;
639
640        let verify = catalog.list_files(&table, None).await?;
641        if verify
642            .iter()
643            .any(|f| f.path == file_path && f.index_status == IndexStatus::Ready)
644        {
645            break;
646        }
647        // Another task overwrote us — retry.
648    }
649
650    eprintln!(
651        "[ailake] deferred HNSW built for {file_path} (offset={hnsw_abs_offset}, len={hnsw_len})"
652    );
653    Ok(())
654}
655
// Unit tests for the Arrow → Iceberg schema conversion
// (`arrow_schema_to_iceberg_update` and, through it, `arrow_type_to_iceberg`).
// They need neither a catalog nor a store.
#[cfg(test)]
mod tests {
    use super::*;
    use ailake_core::{VectorMetric, VectorPrecision};
    use arrow_schema::{DataType, Field, Schema, TimeUnit};

    // Test policy: cosine metric, F16 precision, no PQ, no raw copy kept.
    fn policy(col: &str, dim: u32) -> VectorStoragePolicy {
        VectorStoragePolicy {
            column_name: col.to_string(),
            dim,
            metric: VectorMetric::Cosine,
            precision: VectorPrecision::F16,
            pq: None,
            keep_raw_for_reranking: false,
        }
    }

    // Convert with a primary policy only (no extra vector columns).
    fn update_for(schema: &Schema, pol: &VectorStoragePolicy) -> IcebergSchemaUpdate {
        arrow_schema_to_iceberg_update(schema, pol, &[])
    }

    // Two scalar columns plus the appended vector column: IDs 1..=3, the
    // expected type names, a matching name mapping, and last_column_id == 3.
    #[test]
    fn simple_schema_produces_correct_fields() {
        let schema = Schema::new(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("text", DataType::Utf8, false),
        ]);
        let pol = policy("embedding", 8);
        let upd = update_for(&schema, &pol);

        assert_eq!(upd.fields.len(), 3);
        assert_eq!(upd.fields[0]["id"], 1);
        assert_eq!(upd.fields[0]["type"], "int");
        assert_eq!(upd.fields[1]["id"], 2);
        assert_eq!(upd.fields[1]["type"], "string");
        assert_eq!(upd.fields[2]["id"], 3);
        assert_eq!(upd.fields[2]["type"], "fixed[16]"); // dim=8, F16=2 bytes

        let nm: Vec<serde_json::Value> = serde_json::from_str(&upd.name_mapping_json).unwrap();
        assert_eq!(nm.len(), 3);
        assert_eq!(nm[2]["field-id"], 3);
        assert_eq!(nm[2]["names"][0], "embedding");
        assert_eq!(upd.last_column_id, 3);
    }

    // The presence of a timezone selects between `timestamp` and `timestamptz`.
    #[test]
    fn timestamp_without_tz_maps_to_timestamp_not_timestamptz() {
        let schema = Schema::new(vec![
            Field::new(
                "created_at",
                DataType::Timestamp(TimeUnit::Microsecond, None),
                true,
            ),
            Field::new(
                "updated_at",
                DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
                true,
            ),
        ]);
        let pol = policy("vec", 4);
        let upd = update_for(&schema, &pol);

        assert_eq!(upd.fields[0]["type"], "timestamp");
        assert_eq!(upd.fields[1]["type"], "timestamptz");
    }

    // A List column becomes a JSON list object with a non-colliding element ID.
    #[test]
    fn list_type_produces_iceberg_list_object() {
        let schema = Schema::new(vec![Field::new(
            "tags",
            DataType::List(std::sync::Arc::new(Field::new(
                "item",
                DataType::Utf8,
                true,
            ))),
            true,
        )]);
        let pol = policy("vec", 4);
        let upd = update_for(&schema, &pol);

        let t = &upd.fields[0]["type"];
        assert_eq!(t["type"], "list");
        assert_eq!(t["element"], "string");
        // element-id must be > top-level field count (2: tags + vec)
        assert!(t["element-id"].as_i64().unwrap() > 2);
    }

    // A Struct column becomes a JSON struct object with typed nested fields.
    #[test]
    fn struct_type_produces_nested_fields() {
        let schema = Schema::new(vec![Field::new(
            "meta",
            DataType::Struct(
                vec![
                    Field::new("key", DataType::Utf8, false),
                    Field::new("val", DataType::Int64, false),
                ]
                .into(),
            ),
            true,
        )]);
        let pol = policy("vec", 4);
        let upd = update_for(&schema, &pol);

        let t = &upd.fields[0]["type"];
        assert_eq!(t["type"], "struct");
        let nested = t["fields"].as_array().unwrap();
        assert_eq!(nested.len(), 2);
        assert_eq!(nested[0]["name"], "key");
        assert_eq!(nested[0]["type"], "string");
        assert_eq!(nested[1]["name"], "val");
        assert_eq!(nested[1]["type"], "long");
        // Nested IDs must be > top-level count (2: meta + vec)
        assert!(nested[0]["id"].as_i64().unwrap() > 2);
    }

    // The vector column is appended only when the batch schema lacks it.
    #[test]
    fn no_duplicate_vec_column_when_already_in_batch() {
        // If for some reason the vec column is in the batch schema, don't add it twice.
        let schema = Schema::new(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("embedding", DataType::FixedSizeBinary(16), false),
        ]);
        let pol = policy("embedding", 8);
        let upd = update_for(&schema, &pol);

        assert_eq!(upd.fields.len(), 2, "should not add embedding twice");
        let names: Vec<&str> = upd
            .fields
            .iter()
            .map(|f| f["name"].as_str().unwrap())
            .collect();
        assert_eq!(names.iter().filter(|&&n| n == "embedding").count(), 1);
    }

    // Primary and extra vector columns are both appended after the batch fields.
    #[test]
    fn multi_vec_policies_all_appended() {
        let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);
        let primary = policy("embedding", 4);
        let extra = vec![policy("context_embedding", 4)];
        let upd = arrow_schema_to_iceberg_update(&schema, &primary, &extra);

        assert_eq!(upd.fields.len(), 3); // id + embedding + context_embedding
        let names: Vec<&str> = upd
            .fields
            .iter()
            .map(|f| f["name"].as_str().unwrap())
            .collect();
        assert!(names.contains(&"embedding"));
        assert!(names.contains(&"context_embedding"));
    }

    // Nested element IDs must not disturb the 1..=N numbering of top-level columns.
    #[test]
    fn top_level_field_ids_match_parquet_stamp_sequence() {
        // Top-level IDs must be 1, 2, ..., N regardless of nested element IDs.
        let schema = Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new(
                "tags",
                DataType::List(std::sync::Arc::new(Field::new(
                    "item",
                    DataType::Utf8,
                    true,
                ))),
                true,
            ),
        ]);
        let pol = policy("vec", 4);
        let upd = update_for(&schema, &pol);

        // Top-level: id=1, tags=2, vec=3
        assert_eq!(upd.fields[0]["id"], 1);
        assert_eq!(upd.fields[1]["id"], 2);
        assert_eq!(upd.fields[2]["id"], 3);

        // Nested element ID must be > 3
        assert!(upd.fields[1]["type"]["element-id"].as_i64().unwrap() > 3);
    }
}
833}