1use std::sync::atomic::{AtomicU32, Ordering};
3use std::sync::Arc;
4
5use ailake_catalog::{
6 encode_centroid_b64, make_data_file_entry, make_data_file_entry_indexing,
7 make_multi_column_data_file_entry, new_snapshot_id, CatalogProvider, DataFileEntry,
8 ExtraVectorIndex, IcebergSchemaUpdate, IndexStatus, NewSnapshot, SnapshotId, SnapshotOperation,
9 TableIdent, TableProperties, VectorIndexInfo,
10};
11use ailake_core::{AilakeResult, VectorStoragePolicy};
12use ailake_file::{AilakeFileReader, AilakeFileWriter, IndexType, VectorColumnBatch};
13use ailake_index::{IvfPqCodebook, IvfPqConfig};
14use ailake_store::Store;
15use ailake_vec::compute_centroid_and_radius;
16use arrow_array::RecordBatch;
17use arrow_schema::SchemaRef;
18use bytes::Bytes;
19use serde_json;
20use tracing::{error, info};
21
22pub struct MultiVectorBatch<'a> {
24 pub policy: VectorStoragePolicy,
25 pub embeddings: &'a [Vec<f32>],
26}
27
28pub struct TableWriter {
29 catalog: Arc<dyn CatalogProvider>,
30 store: Arc<dyn Store>,
31 policy: VectorStoragePolicy,
32 table: TableIdent,
33 part_counter: Arc<AtomicU32>,
34 pending_files: Vec<DataFileEntry>,
35 parent_snapshot_id: Option<SnapshotId>,
36 captured_schema: Option<SchemaRef>,
39 extra_vec_policies: Vec<VectorStoragePolicy>,
41 cached_ivf_codebook: Option<Arc<IvfPqCodebook>>,
44 deferred_ivf_codebook: Arc<tokio::sync::OnceCell<IvfPqCodebook>>,
47}
48
49impl TableWriter {
50 pub fn new(
51 catalog: Arc<dyn CatalogProvider>,
52 store: Arc<dyn Store>,
53 policy: VectorStoragePolicy,
54 table: TableIdent,
55 ) -> Self {
56 Self {
57 catalog,
58 store,
59 policy,
60 table,
61 part_counter: Arc::new(AtomicU32::new(0)),
62 pending_files: Vec::new(),
63 parent_snapshot_id: None,
64 captured_schema: None,
65 extra_vec_policies: Vec::new(),
66 cached_ivf_codebook: None,
67 deferred_ivf_codebook: Arc::new(tokio::sync::OnceCell::new()),
68 }
69 }
70
71 pub fn with_parent_snapshot(mut self, id: SnapshotId) -> Self {
72 self.parent_snapshot_id = Some(id);
73 self
74 }
75
76 pub async fn write_batch_deferred(
86 &mut self,
87 batch: &RecordBatch,
88 embeddings: &[Vec<f32>],
89 ) -> AilakeResult<()> {
90 if self.captured_schema.is_none() {
91 self.captured_schema = Some(batch.schema());
92 }
93 let part_num = self.part_counter.fetch_add(1, Ordering::SeqCst);
94 let file_path = format!("data/part-{:05}.parquet", part_num);
95
96 let file_writer = AilakeFileWriter::new(self.policy.clone());
98 let parquet_bytes = file_writer.write_parquet_only(batch, embeddings)?;
99 let file_size = parquet_bytes.len() as u64;
100 self.store.put(&file_path, parquet_bytes).await?;
101
102 let centroid = compute_centroid_and_radius(embeddings, self.policy.metric);
104 let entry = make_data_file_entry_indexing(
105 &file_path,
106 embeddings.len() as u64,
107 file_size,
108 ¢roid,
109 &self.policy.column_name,
110 self.policy.dim,
111 );
112 self.pending_files.push(entry);
113
114 let store = self.store.clone();
116 let catalog = self.catalog.clone();
117 let policy = self.policy.clone();
118 let table = self.table.clone();
119 let fp = file_path.clone();
120 tokio::spawn(async move {
121 if let Err(e) = build_and_patch_index(store, catalog, policy, table, fp).await {
122 error!(
123 "ailake: deferred HNSW build failed — file is indexed as Parquet-only until \
124 next compaction rebuilds the index: {}",
125 e
126 );
127 }
128 });
129
130 Ok(())
131 }
132
133 pub async fn write_batch_ivf_pq_deferred(
139 &mut self,
140 batch: &RecordBatch,
141 embeddings: &[Vec<f32>],
142 ivf_config: IvfPqConfig,
143 ) -> AilakeResult<()> {
144 if self.captured_schema.is_none() {
145 self.captured_schema = Some(batch.schema());
146 }
147 let part_num = self.part_counter.fetch_add(1, Ordering::SeqCst);
148 let file_path = format!("data/part-{:05}.parquet", part_num);
149
150 let file_writer = AilakeFileWriter::new(self.policy.clone());
151 let parquet_bytes = file_writer.write_parquet_only(batch, embeddings)?;
152 let file_size = parquet_bytes.len() as u64;
153 self.store.put(&file_path, parquet_bytes).await?;
154
155 let centroid = compute_centroid_and_radius(embeddings, self.policy.metric);
156 let entry = make_data_file_entry_indexing(
157 &file_path,
158 embeddings.len() as u64,
159 file_size,
160 ¢roid,
161 &self.policy.column_name,
162 self.policy.dim,
163 );
164 self.pending_files.push(entry);
165
166 let store = self.store.clone();
167 let catalog = self.catalog.clone();
168 let policy = self.policy.clone();
169 let table = self.table.clone();
170 let fp = file_path.clone();
171 let codebook_cell = self.deferred_ivf_codebook.clone();
172 tokio::spawn(async move {
173 if let Err(e) = build_ivf_pq_and_patch_index(
174 store,
175 catalog,
176 policy,
177 table,
178 fp,
179 ivf_config,
180 codebook_cell,
181 )
182 .await
183 {
184 error!(
185 "ailake: deferred IVF-PQ build failed — file is indexed as Parquet-only until \
186 next compaction rebuilds the index: {}",
187 e
188 );
189 }
190 });
191
192 Ok(())
193 }
194
195 pub async fn write_batch_idempotent(
204 &mut self,
205 batch: &RecordBatch,
206 embeddings: &[Vec<f32>],
207 batch_id: &str,
208 ) -> AilakeResult<()> {
209 let existing = self.catalog.list_files(&self.table, None).await?;
210 if existing
211 .iter()
212 .any(|f| f.batch_id.as_deref() == Some(batch_id))
213 {
214 return Ok(());
215 }
216 self.write_batch_with_id(batch, embeddings, Some(batch_id.to_string()))
217 .await
218 }
219
220 pub async fn write_batch(
222 &mut self,
223 batch: &RecordBatch,
224 embeddings: &[Vec<f32>],
225 ) -> AilakeResult<()> {
226 self.write_batch_with_id(batch, embeddings, None).await
227 }
228
229 async fn write_batch_with_id(
230 &mut self,
231 batch: &RecordBatch,
232 embeddings: &[Vec<f32>],
233 batch_id: Option<String>,
234 ) -> AilakeResult<()> {
235 if self.captured_schema.is_none() {
236 self.captured_schema = Some(batch.schema());
237 }
238 let part_num = self.part_counter.fetch_add(1, Ordering::SeqCst);
239 let file_path = format!("data/part-{:05}.parquet", part_num);
240
241 let file_writer = AilakeFileWriter::new(self.policy.clone());
243 let file_bytes: Bytes = file_writer.write(batch, embeddings)?;
244 let file_size = file_bytes.len() as u64;
245
246 self.store.put(&file_path, file_bytes.clone()).await?;
248
249 let centroid = compute_centroid_and_radius(embeddings, self.policy.metric);
251
252 let reader = ailake_file::AilakeFileReader::new(
254 file_bytes,
255 &self.policy.column_name,
256 self.policy.dim,
257 );
258 let header = reader.read_header()?;
259 let ailk_start = reader.ailk_offset()?;
260 let hnsw_abs_offset = ailk_start + header.hnsw_offset;
261 let hnsw_len = header.hnsw_len;
262
263 let mut entry = make_data_file_entry(
264 &file_path,
265 embeddings.len() as u64,
266 file_size,
267 ¢roid,
268 VectorIndexInfo {
269 column: &self.policy.column_name,
270 dim: self.policy.dim,
271 hnsw_offset: hnsw_abs_offset,
272 hnsw_len,
273 },
274 );
275 entry.batch_id = batch_id;
276 self.pending_files.push(entry);
277 Ok(())
278 }
279
280 pub async fn write_batch_auto(
286 &mut self,
287 batch: &RecordBatch,
288 embeddings: &[Vec<f32>],
289 ) -> AilakeResult<()> {
290 let profile = ailake_index::HardwareProfile::detect();
291 if profile.recommend_ivf_pq(embeddings.len()) {
292 let ivf_config =
293 ailake_index::IvfPqConfig::for_dataset(self.policy.dim as usize, embeddings.len());
294 self.write_batch_ivf_pq(batch, embeddings, ivf_config).await
295 } else {
296 self.write_batch(batch, embeddings).await
297 }
298 }
299
300 pub async fn write_batch_ivf_pq(
304 &mut self,
305 batch: &RecordBatch,
306 embeddings: &[Vec<f32>],
307 ivf_config: IvfPqConfig,
308 ) -> AilakeResult<()> {
309 if self.captured_schema.is_none() {
310 self.captured_schema = Some(batch.schema());
311 }
312 let part_num = self.part_counter.fetch_add(1, Ordering::SeqCst);
313 let file_path = format!("data/part-{:05}.parquet", part_num);
314
315 if self.cached_ivf_codebook.is_none() {
319 let codebook = tokio::task::spawn_blocking({
320 let embeddings = embeddings.to_vec();
321 let metric = self.policy.metric;
322 let config = ivf_config.clone();
323 move || ailake_index::IvfPqIndex::train_codebook(&embeddings, metric, &config)
324 })
325 .await
326 .map_err(|e| ailake_core::AilakeError::Store(format!("spawn_blocking panic: {e}")))??;
327 self.cached_ivf_codebook = Some(Arc::new(codebook));
328 }
329 let codebook = self.cached_ivf_codebook.as_ref().unwrap().clone();
330
331 let file_writer = AilakeFileWriter::new(self.policy.clone())
332 .with_index_type(IndexType::IvfPq(ivf_config))
333 .with_shared_ivf_codebook(codebook);
334 let file_bytes: Bytes = file_writer.write(batch, embeddings)?;
335 let file_size = file_bytes.len() as u64;
336
337 self.store.put(&file_path, file_bytes.clone()).await?;
338
339 let centroid = compute_centroid_and_radius(embeddings, self.policy.metric);
340
341 let reader = ailake_file::AilakeFileReader::new(
342 file_bytes,
343 &self.policy.column_name,
344 self.policy.dim,
345 );
346 let header = reader.read_header()?;
347 let ailk_start = reader.ailk_offset()?;
348 let index_abs_offset = ailk_start + header.hnsw_offset;
349 let index_len = header.hnsw_len;
350
351 let entry = make_data_file_entry(
352 &file_path,
353 embeddings.len() as u64,
354 file_size,
355 ¢roid,
356 VectorIndexInfo {
357 column: &self.policy.column_name,
358 dim: self.policy.dim,
359 hnsw_offset: index_abs_offset,
360 hnsw_len: index_len,
361 },
362 );
363 self.pending_files.push(entry);
364 Ok(())
365 }
366
367 pub async fn write_batch_multi(
372 &mut self,
373 batch: &RecordBatch,
374 columns: &[MultiVectorBatch<'_>],
375 ) -> AilakeResult<()> {
376 use ailake_core::AilakeError;
377 if self.captured_schema.is_none() {
378 self.captured_schema = Some(batch.schema());
379 }
380 if self.extra_vec_policies.is_empty() && columns.len() > 1 {
381 self.extra_vec_policies = columns[1..].iter().map(|c| c.policy.clone()).collect();
382 }
383
384 if columns.is_empty() {
385 return Err(AilakeError::InvalidArgument(
386 "write_batch_multi requires at least one column".into(),
387 ));
388 }
389
390 let part_num = self.part_counter.fetch_add(1, Ordering::SeqCst);
391 let file_path = format!("data/part-{:05}.parquet", part_num);
392
393 let col_batches: Vec<VectorColumnBatch<'_>> = columns
394 .iter()
395 .map(|c| VectorColumnBatch {
396 policy: &c.policy,
397 embeddings: c.embeddings,
398 })
399 .collect();
400
401 let primary_policy = &columns[0].policy;
402 let file_writer = AilakeFileWriter::new(primary_policy.clone());
403 let file_bytes: Bytes = file_writer.write_multi(batch, &col_batches)?;
404 let file_size = file_bytes.len() as u64;
405
406 self.store.put(&file_path, file_bytes.clone()).await?;
407
408 let primary_centroid =
410 compute_centroid_and_radius(columns[0].embeddings, primary_policy.metric);
411
412 let reader = ailake_file::AilakeFileReader::new(
414 file_bytes.clone(),
415 &primary_policy.column_name,
416 primary_policy.dim,
417 );
418 let primary_ailk_start = reader.ailk_offset()?;
419 let primary_header = {
420 use ailake_file::HEADER_SIZE;
421 let start = primary_ailk_start as usize;
422 let hdr_bytes: &[u8; HEADER_SIZE] = file_bytes[start..start + HEADER_SIZE]
423 .try_into()
424 .map_err(|_| AilakeError::NotAnAilakeFile)?;
425 ailake_file::AilakeHeader::from_bytes(hdr_bytes)?
426 };
427 let primary_hnsw_abs = primary_ailk_start + primary_header.hnsw_offset;
428
429 let mut extra: Vec<ExtraVectorIndex> = Vec::new();
431 for col in columns.iter().skip(1) {
432 let col_ailk_start = reader.ailk_offset_for_column(&col.policy.column_name)?;
433 let col_header = {
434 use ailake_file::HEADER_SIZE;
435 let start = col_ailk_start as usize;
436 let hdr_bytes: &[u8; HEADER_SIZE] = file_bytes[start..start + HEADER_SIZE]
437 .try_into()
438 .map_err(|_| AilakeError::NotAnAilakeFile)?;
439 ailake_file::AilakeHeader::from_bytes(hdr_bytes)?
440 };
441 let col_centroid = compute_centroid_and_radius(col.embeddings, col.policy.metric);
442 extra.push(ExtraVectorIndex {
443 column: col.policy.column_name.clone(),
444 dim: col.policy.dim,
445 hnsw_offset: col_ailk_start + col_header.hnsw_offset,
446 hnsw_len: col_header.hnsw_len,
447 centroid_b64: Some(encode_centroid_b64(&col_centroid)),
448 radius: Some(col_centroid.radius),
449 });
450 }
451
452 let entry = make_multi_column_data_file_entry(
453 &file_path,
454 columns[0].embeddings.len() as u64,
455 file_size,
456 &primary_centroid,
457 VectorIndexInfo {
458 column: &primary_policy.column_name,
459 dim: primary_policy.dim,
460 hnsw_offset: primary_hnsw_abs,
461 hnsw_len: primary_header.hnsw_len,
462 },
463 &extra,
464 );
465 self.pending_files.push(entry);
466 Ok(())
467 }
468
469 pub async fn commit(mut self) -> AilakeResult<SnapshotId> {
475 if self.pending_files.is_empty() {
476 let current = self
477 .catalog
478 .load_table(&self.table)
479 .await
480 .ok()
481 .and_then(|m| m.current_snapshot_id)
482 .unwrap_or(0);
483 return Ok(current);
484 }
485 let iceberg_schema = self
486 .captured_schema
487 .as_deref()
488 .map(|s| arrow_schema_to_iceberg_update(s, &self.policy, &self.extra_vec_policies));
489 let snapshot = NewSnapshot {
490 snapshot_id: new_snapshot_id(),
491 parent_snapshot_id: self.parent_snapshot_id,
492 files: std::mem::take(&mut self.pending_files),
493 operation: SnapshotOperation::Append,
494 iceberg_schema,
495 };
496 self.catalog.commit_snapshot(&self.table, snapshot).await
497 }
498
499 pub async fn create_or_open(
501 catalog: Arc<dyn CatalogProvider>,
502 store: Arc<dyn Store>,
503 policy: VectorStoragePolicy,
504 table: TableIdent,
505 ) -> AilakeResult<Self> {
506 if catalog.load_table(&table).await.is_err() {
508 catalog
509 .create_table(
510 &table,
511 &TableProperties {
512 policy: policy.clone(),
513 extra: std::collections::HashMap::new(),
514 },
515 )
516 .await?;
517 }
518 Ok(Self::new(catalog, store, policy, table))
519 }
520}
521
522fn arrow_schema_to_iceberg_update(
529 schema: &arrow_schema::Schema,
530 policy: &VectorStoragePolicy,
531 extra_vec_policies: &[VectorStoragePolicy],
532) -> IcebergSchemaUpdate {
533 let bytes_per_dim = policy.precision.bytes_per_element() as u32;
534 let vec_fixed_len = policy.dim * bytes_per_dim;
535
536 let has_primary_in_batch = schema
538 .fields()
539 .iter()
540 .any(|f| f.name() == &policy.column_name);
541 let vec_cols: Vec<(String, u32)> = {
542 let mut v = Vec::new();
543 if !has_primary_in_batch {
544 v.push((policy.column_name.clone(), vec_fixed_len));
545 }
546 for ep in extra_vec_policies {
547 let ep_fixed_len = ep.dim * ep.precision.bytes_per_element() as u32;
548 if !schema.fields().iter().any(|f| f.name() == &ep.column_name) {
549 v.push((ep.column_name.clone(), ep_fixed_len));
550 }
551 }
552 v
553 };
554
555 let top_level_count = schema.fields().len() + vec_cols.len();
557 let mut nested_id = top_level_count as i32;
559
560 let mut fields: Vec<serde_json::Value> = Vec::new();
561 let mut name_mapping: Vec<serde_json::Value> = Vec::new();
562
563 for (idx, field) in schema.fields().iter().enumerate() {
564 let field_id = (idx + 1) as i32;
565 let iceberg_type = arrow_type_to_iceberg(field.data_type(), &mut nested_id);
566 fields.push(serde_json::json!({
567 "id": field_id,
568 "name": field.name(),
569 "required": false,
570 "type": iceberg_type,
571 }));
572 name_mapping.push(serde_json::json!({
573 "field-id": field_id,
574 "names": [field.name()],
575 }));
576 }
577
578 for (i, (col_name, fixed_len)) in vec_cols.iter().enumerate() {
580 let field_id = (schema.fields().len() + 1 + i) as i32;
581 fields.push(serde_json::json!({
582 "id": field_id,
583 "name": col_name,
584 "required": false,
585 "type": format!("fixed[{fixed_len}]"),
586 }));
587 name_mapping.push(serde_json::json!({
588 "field-id": field_id,
589 "names": [col_name],
590 }));
591 }
592
593 let last_column_id = nested_id;
594 let name_mapping_json = serde_json::to_string(&name_mapping).unwrap_or_else(|_| "[]".into());
595
596 IcebergSchemaUpdate {
597 fields,
598 last_column_id,
599 name_mapping_json,
600 }
601}
602
603fn arrow_type_to_iceberg(dt: &arrow_schema::DataType, nested_id: &mut i32) -> serde_json::Value {
608 use arrow_schema::DataType;
609 match dt {
610 DataType::Boolean => serde_json::json!("boolean"),
611 DataType::Int8 | DataType::Int16 | DataType::Int32 | DataType::UInt8 | DataType::UInt16 => {
612 serde_json::json!("int")
613 }
614 DataType::Int64 | DataType::UInt32 | DataType::UInt64 => serde_json::json!("long"),
615 DataType::Float16 | DataType::Float32 => serde_json::json!("float"),
616 DataType::Float64 => serde_json::json!("double"),
617 DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View => serde_json::json!("string"),
618 DataType::Binary | DataType::LargeBinary | DataType::BinaryView => {
619 serde_json::json!("binary")
620 }
621 DataType::Date32 | DataType::Date64 => serde_json::json!("date"),
622 DataType::Timestamp(_, Some(_)) => serde_json::json!("timestamptz"),
624 DataType::Timestamp(_, None) => serde_json::json!("timestamp"),
625 DataType::Time32(_) | DataType::Time64(_) => serde_json::json!("time"),
626 DataType::FixedSizeBinary(n) => serde_json::json!(format!("fixed[{n}]")),
627 DataType::Decimal128(p, s) | DataType::Decimal256(p, s) => {
628 serde_json::json!(format!("decimal({p}, {s})"))
629 }
630 DataType::List(inner)
631 | DataType::LargeList(inner)
632 | DataType::ListView(inner)
633 | DataType::FixedSizeList(inner, _) => {
634 *nested_id += 1;
635 let element_id = *nested_id;
636 let element_type = arrow_type_to_iceberg(inner.data_type(), nested_id);
637 serde_json::json!({
638 "type": "list",
639 "element-id": element_id,
640 "element": element_type,
641 "element-required": !inner.is_nullable(),
642 })
643 }
644 DataType::Struct(arrow_fields) => {
645 let struct_fields: Vec<serde_json::Value> = arrow_fields
646 .iter()
647 .map(|f| {
648 *nested_id += 1;
649 let fid = *nested_id;
650 let ftype = arrow_type_to_iceberg(f.data_type(), nested_id);
651 serde_json::json!({
652 "id": fid,
653 "name": f.name(),
654 "required": !f.is_nullable(),
655 "type": ftype,
656 })
657 })
658 .collect();
659 serde_json::json!({ "type": "struct", "fields": struct_fields })
660 }
661 DataType::Map(entries, _) => {
662 *nested_id += 1;
664 let key_id = *nested_id;
665 *nested_id += 1;
666 let val_id = *nested_id;
667 if let DataType::Struct(kv_fields) = entries.data_type() {
668 let key_f = kv_fields
669 .iter()
670 .find(|f| f.name() == "key" || f.name() == "keys");
671 let val_f = kv_fields
672 .iter()
673 .find(|f| f.name() == "value" || f.name() == "values");
674 let key_type = key_f
675 .map(|f| arrow_type_to_iceberg(f.data_type(), nested_id))
676 .unwrap_or(serde_json::json!("binary"));
677 let val_type = val_f
678 .map(|f| arrow_type_to_iceberg(f.data_type(), nested_id))
679 .unwrap_or(serde_json::json!("binary"));
680 let val_required = val_f.map(|f| !f.is_nullable()).unwrap_or(false);
681 serde_json::json!({
682 "type": "map",
683 "key-id": key_id,
684 "key": key_type,
685 "value-id": val_id,
686 "value": val_type,
687 "value-required": val_required,
688 })
689 } else {
690 serde_json::json!("binary")
691 }
692 }
693 _ => serde_json::json!("binary"),
694 }
695}
696
697async fn build_and_patch_index(
699 store: Arc<dyn Store>,
700 catalog: Arc<dyn CatalogProvider>,
701 policy: VectorStoragePolicy,
702 table: TableIdent,
703 file_path: String,
704) -> AilakeResult<()> {
705 let parquet_bytes = store.get(&file_path).await?;
707 let reader = AilakeFileReader::new(parquet_bytes, &policy.column_name, policy.dim);
708 let (batch, embeddings) = reader.read_parquet()?;
709
710 let full_bytes = tokio::task::spawn_blocking({
713 let policy = policy.clone();
714 move || {
715 let file_writer = AilakeFileWriter::new(policy);
716 file_writer.write(&batch, &embeddings)
717 }
718 })
719 .await
720 .map_err(|e| ailake_core::AilakeError::Store(format!("spawn_blocking panic: {e}")))??;
721
722 let full_reader = AilakeFileReader::new(full_bytes.clone(), &policy.column_name, policy.dim);
724 let header = full_reader.read_header()?;
725 let ailk_start = full_reader.ailk_offset()?;
726 let hnsw_abs_offset = ailk_start + header.hnsw_offset;
727 let hnsw_len = header.hnsw_len;
728
729 store.put(&file_path, full_bytes).await?;
731
732 for _ in 0..120u32 {
735 match catalog.load_table(&table).await {
736 Ok(meta) if meta.current_snapshot_id.is_some() => break,
737 _ => tokio::time::sleep(std::time::Duration::from_millis(500)).await,
738 }
739 }
740
741 for attempt in 0..50u32 {
746 let table_meta = catalog.load_table(&table).await?;
747 let parent_snapshot_id = table_meta.current_snapshot_id;
748 let mut files = catalog.list_files(&table, None).await?;
749
750 if files
752 .iter()
753 .any(|f| f.path == file_path && f.index_status == IndexStatus::Ready)
754 {
755 break;
756 }
757
758 for f in &mut files {
759 if f.path == file_path {
760 f.hnsw_offset = Some(hnsw_abs_offset);
761 f.hnsw_len = Some(hnsw_len);
762 f.index_status = IndexStatus::Ready;
763 break;
764 }
765 }
766 catalog
767 .commit_snapshot(
768 &table,
769 NewSnapshot {
770 snapshot_id: new_snapshot_id(),
771 parent_snapshot_id,
772 files,
773 operation: SnapshotOperation::Replace,
774 iceberg_schema: None,
775 },
776 )
777 .await?;
778
779 tokio::time::sleep(std::time::Duration::from_millis(10 + attempt as u64 * 5)).await;
781
782 let verify = catalog.list_files(&table, None).await?;
783 if verify
784 .iter()
785 .any(|f| f.path == file_path && f.index_status == IndexStatus::Ready)
786 {
787 break;
788 }
789 }
791
792 info!(
793 "ailake: deferred HNSW index built for {} (offset={}, len={})",
794 file_path, hnsw_abs_offset, hnsw_len
795 );
796 Ok(())
797}
798
799async fn build_ivf_pq_and_patch_index(
804 store: Arc<dyn Store>,
805 catalog: Arc<dyn CatalogProvider>,
806 policy: VectorStoragePolicy,
807 table: TableIdent,
808 file_path: String,
809 ivf_config: IvfPqConfig,
810 codebook_cell: Arc<tokio::sync::OnceCell<IvfPqCodebook>>,
811) -> AilakeResult<()> {
812 let parquet_bytes = store.get(&file_path).await?;
813 let reader = AilakeFileReader::new(parquet_bytes, &policy.column_name, policy.dim);
814 let (batch, embeddings) = reader.read_parquet()?;
815
816 let codebook = codebook_cell
818 .get_or_try_init(|| async {
819 let vecs = embeddings.clone();
820 let metric = policy.metric;
821 let cfg = ivf_config.clone();
822 tokio::task::spawn_blocking(move || {
823 ailake_index::IvfPqIndex::train_codebook(&vecs, metric, &cfg)
824 })
825 .await
826 .map_err(|e| ailake_core::AilakeError::Store(format!("spawn_blocking panic: {e}")))?
827 })
828 .await?;
829
830 let full_bytes = tokio::task::spawn_blocking({
831 let policy = policy.clone();
832 let codebook = codebook.clone();
833 move || {
834 let file_writer = AilakeFileWriter::new(policy)
835 .with_index_type(IndexType::IvfPq(ivf_config))
836 .with_shared_ivf_codebook(Arc::new(codebook));
837 file_writer.write(&batch, &embeddings)
838 }
839 })
840 .await
841 .map_err(|e| ailake_core::AilakeError::Store(format!("spawn_blocking panic: {e}")))??;
842
843 let full_reader = AilakeFileReader::new(full_bytes.clone(), &policy.column_name, policy.dim);
844 let header = full_reader.read_header()?;
845 let ailk_start = full_reader.ailk_offset()?;
846 let hnsw_abs_offset = ailk_start + header.hnsw_offset;
847 let hnsw_len = header.hnsw_len;
848
849 store.put(&file_path, full_bytes).await?;
850
851 for _ in 0..120u32 {
853 match catalog.load_table(&table).await {
854 Ok(meta) if meta.current_snapshot_id.is_some() => break,
855 _ => tokio::time::sleep(std::time::Duration::from_millis(500)).await,
856 }
857 }
858
859 for attempt in 0..50u32 {
860 let table_meta = catalog.load_table(&table).await?;
861 let parent_snapshot_id = table_meta.current_snapshot_id;
862 let mut files = catalog.list_files(&table, None).await?;
863
864 if files
865 .iter()
866 .any(|f| f.path == file_path && f.index_status == IndexStatus::Ready)
867 {
868 break;
869 }
870
871 for f in &mut files {
872 if f.path == file_path {
873 f.hnsw_offset = Some(hnsw_abs_offset);
874 f.hnsw_len = Some(hnsw_len);
875 f.index_status = IndexStatus::Ready;
876 break;
877 }
878 }
879 catalog
880 .commit_snapshot(
881 &table,
882 NewSnapshot {
883 snapshot_id: new_snapshot_id(),
884 parent_snapshot_id,
885 files,
886 operation: SnapshotOperation::Replace,
887 iceberg_schema: None,
888 },
889 )
890 .await?;
891
892 tokio::time::sleep(std::time::Duration::from_millis(10 + attempt as u64 * 5)).await;
893
894 let verify = catalog.list_files(&table, None).await?;
895 if verify
896 .iter()
897 .any(|f| f.path == file_path && f.index_status == IndexStatus::Ready)
898 {
899 break;
900 }
901 }
902
903 info!(
904 "ailake: deferred IVF-PQ index built for {} (offset={}, len={})",
905 file_path, hnsw_abs_offset, hnsw_len
906 );
907 Ok(())
908}
909
#[cfg(test)]
mod tests {
    use super::*;
    use ailake_core::{VectorMetric, VectorPrecision};
    use arrow_schema::{DataType, Field, Schema, TimeUnit};

    /// F16 / cosine policy for column `col` with `dim` dimensions; every
    /// optional index setting is left unset.
    fn policy(col: &str, dim: u32) -> VectorStoragePolicy {
        VectorStoragePolicy {
            column_name: col.to_string(),
            dim,
            metric: VectorMetric::Cosine,
            precision: VectorPrecision::F16,
            pq: None,
            keep_raw_for_reranking: false,
            pre_normalize: false,
            hnsw_m: None,
            hnsw_ef_construction: None,
            rabitq: None,
        }
    }

    /// Converts `schema` with `pol` as the only vector column.
    fn update_for(schema: &Schema, pol: &VectorStoragePolicy) -> IcebergSchemaUpdate {
        let no_extra_columns: &[VectorStoragePolicy] = &[];
        arrow_schema_to_iceberg_update(schema, pol, no_extra_columns)
    }

    /// Top-level field names of `upd`, in declaration order.
    fn field_names(upd: &IcebergSchemaUpdate) -> Vec<&str> {
        upd.fields
            .iter()
            .map(|f| f["name"].as_str().unwrap())
            .collect()
    }

    /// Nullable `List<Utf8>` column called `name` with a nullable `item` element.
    fn nullable_string_list(name: &str) -> Field {
        let element = Field::new("item", DataType::Utf8, true);
        Field::new(name, DataType::List(std::sync::Arc::new(element)), true)
    }

    #[test]
    fn simple_schema_produces_correct_fields() {
        let schema = Schema::new(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("text", DataType::Utf8, false),
        ]);
        let upd = update_for(&schema, &policy("embedding", 8));

        // Two batch columns, then the vector column: 8 dims x 2 bytes (F16) = 16.
        assert_eq!(upd.fields.len(), 3);
        let expected = [(1, "int"), (2, "string"), (3, "fixed[16]")];
        for (field, (id, ty)) in upd.fields.iter().zip(expected) {
            assert_eq!(field["id"], id);
            assert_eq!(field["type"], ty);
        }

        let mapping: Vec<serde_json::Value> =
            serde_json::from_str(&upd.name_mapping_json).unwrap();
        assert_eq!(mapping.len(), 3);
        let vector_entry = &mapping[2];
        assert_eq!(vector_entry["field-id"], 3);
        assert_eq!(vector_entry["names"][0], "embedding");
        assert_eq!(upd.last_column_id, 3);
    }

    #[test]
    fn timestamp_without_tz_maps_to_timestamp_not_timestamptz() {
        let naive = Field::new(
            "created_at",
            DataType::Timestamp(TimeUnit::Microsecond, None),
            true,
        );
        let zoned = Field::new(
            "updated_at",
            DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
            true,
        );
        let upd = update_for(&Schema::new(vec![naive, zoned]), &policy("vec", 4));

        assert_eq!(upd.fields[0]["type"], "timestamp");
        assert_eq!(upd.fields[1]["type"], "timestamptz");
    }

    #[test]
    fn list_type_produces_iceberg_list_object() {
        let schema = Schema::new(vec![nullable_string_list("tags")]);
        let upd = update_for(&schema, &policy("vec", 4));

        let list_type = &upd.fields[0]["type"];
        assert_eq!(list_type["type"], "list");
        assert_eq!(list_type["element"], "string");
        // Ids 1 and 2 belong to the top-level `tags` and `vec` columns.
        assert!(list_type["element-id"].as_i64().unwrap() > 2);
    }

    #[test]
    fn struct_type_produces_nested_fields() {
        let children = vec![
            Field::new("key", DataType::Utf8, false),
            Field::new("val", DataType::Int64, false),
        ];
        let schema = Schema::new(vec![Field::new(
            "meta",
            DataType::Struct(children.into()),
            true,
        )]);
        let upd = update_for(&schema, &policy("vec", 4));

        let struct_type = &upd.fields[0]["type"];
        assert_eq!(struct_type["type"], "struct");
        let nested = struct_type["fields"].as_array().unwrap();
        assert_eq!(nested.len(), 2);
        assert_eq!(nested[0]["name"], "key");
        assert_eq!(nested[0]["type"], "string");
        assert_eq!(nested[1]["name"], "val");
        assert_eq!(nested[1]["type"], "long");
        // Nested ids start after the two top-level ids.
        assert!(nested[0]["id"].as_i64().unwrap() > 2);
    }

    #[test]
    fn no_duplicate_vec_column_when_already_in_batch() {
        // The batch already carries the vector column as 16-byte fixed binary.
        let schema = Schema::new(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("embedding", DataType::FixedSizeBinary(16), false),
        ]);
        let upd = update_for(&schema, &policy("embedding", 8));

        assert_eq!(upd.fields.len(), 2, "should not add embedding twice");
        let embedding_count = field_names(&upd)
            .into_iter()
            .filter(|&n| n == "embedding")
            .count();
        assert_eq!(embedding_count, 1);
    }

    #[test]
    fn multi_vec_policies_all_appended() {
        let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);
        let extras = [policy("context_embedding", 4)];
        let upd = arrow_schema_to_iceberg_update(&schema, &policy("embedding", 4), &extras);

        assert_eq!(upd.fields.len(), 3);
        let names = field_names(&upd);
        assert!(names.contains(&"embedding"));
        assert!(names.contains(&"context_embedding"));
    }

    #[test]
    fn top_level_field_ids_match_parquet_stamp_sequence() {
        let schema = Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            nullable_string_list("tags"),
        ]);
        let upd = update_for(&schema, &policy("vec", 4));

        // `id`, `tags`, then the appended `vec` column.
        assert_eq!(upd.fields[0]["id"], 1);
        assert_eq!(upd.fields[1]["id"], 2);
        assert_eq!(upd.fields[2]["id"], 3);

        // The list element id is allocated after all three top-level ids.
        let element_id = upd.fields[1]["type"]["element-id"].as_i64().unwrap();
        assert!(element_id > 3);
    }
}