1use std::sync::atomic::{AtomicU32, Ordering};
3use std::sync::Arc;
4
5use ailake_catalog::{
6 encode_centroid_b64, make_data_file_entry, make_data_file_entry_indexing,
7 make_multi_column_data_file_entry, new_snapshot_id, CatalogProvider, DataFileEntry,
8 ExtraVectorIndex, IcebergSchemaUpdate, IndexStatus, NewSnapshot, SnapshotId, SnapshotOperation,
9 TableIdent, TableProperties, VectorIndexInfo,
10};
11use ailake_core::{AilakeError, AilakeResult, EmbeddingModelInfo, VectorStoragePolicy};
12use ailake_file::{AilakeFileReader, AilakeFileWriter, IndexType, VectorColumnBatch};
13use ailake_index::{IvfPqCodebook, IvfPqConfig};
14use ailake_store::Store;
15use ailake_vec::compute_centroid_and_radius;
16use arrow_array::Array;
17use arrow_array::RecordBatch;
18use arrow_schema::SchemaRef;
19use bytes::Bytes;
20use serde_json;
21use tracing::{error, info, warn};
22
23fn apply_partition_transforms(policy: &VectorStoragePolicy, raw: Option<&str>) -> Option<String> {
28 let raw = raw?;
29 if policy.partition_fields.is_empty() {
30 return Some(raw.to_string());
31 }
32 let parts: Vec<&str> = raw.split('\x1f').collect();
33 let transformed: Vec<String> = policy
34 .partition_fields
35 .iter()
36 .enumerate()
37 .map(|(i, pf)| {
38 let v = parts.get(i).copied().unwrap_or("");
39 pf.apply(v)
40 })
41 .collect();
42 Some(transformed.join("\x1f"))
43}
44
45pub struct MultiVectorBatch<'a> {
47 pub policy: VectorStoragePolicy,
48 pub embeddings: &'a [Vec<f32>],
49}
50
51pub struct TableWriter {
52 catalog: Arc<dyn CatalogProvider>,
53 store: Arc<dyn Store>,
54 policy: VectorStoragePolicy,
55 table: TableIdent,
56 part_counter: Arc<AtomicU32>,
57 pending_files: Vec<DataFileEntry>,
58 parent_snapshot_id: Option<SnapshotId>,
59 captured_schema: Option<SchemaRef>,
62 extra_vec_policies: Vec<VectorStoragePolicy>,
64 cached_ivf_codebook: Option<Arc<IvfPqCodebook>>,
67 deferred_ivf_codebook: Arc<tokio::sync::OnceCell<IvfPqCodebook>>,
70 bm25_text_column: Option<String>,
74 pending_blooms: Vec<(String, Vec<u8>)>,
77}
78
79impl TableWriter {
80 pub fn new(
81 catalog: Arc<dyn CatalogProvider>,
82 store: Arc<dyn Store>,
83 policy: VectorStoragePolicy,
84 table: TableIdent,
85 ) -> Self {
86 Self {
87 catalog,
88 store,
89 policy,
90 table,
91 part_counter: Arc::new(AtomicU32::new(0)),
92 pending_files: Vec::new(),
93 parent_snapshot_id: None,
94 captured_schema: None,
95 extra_vec_policies: Vec::new(),
96 cached_ivf_codebook: None,
97 deferred_ivf_codebook: Arc::new(tokio::sync::OnceCell::new()),
98 bm25_text_column: None,
99 pending_blooms: Vec::new(),
100 }
101 }
102
103 pub fn with_bm25(mut self, text_column: impl Into<String>) -> Self {
111 self.bm25_text_column = Some(text_column.into());
112 self
113 }
114
115 pub fn with_parent_snapshot(mut self, id: SnapshotId) -> Self {
116 self.parent_snapshot_id = Some(id);
117 self
118 }
119
120 pub async fn write_batch_deferred(
130 &mut self,
131 batch: &RecordBatch,
132 embeddings: &[Vec<f32>],
133 ) -> AilakeResult<()> {
134 self.validate_embedding_dim(embeddings)?;
135 if self.captured_schema.is_none() {
136 self.captured_schema = Some(batch.schema());
137 }
138 let part_num = self.part_counter.fetch_add(1, Ordering::SeqCst);
139 let file_path = format!("data/part-{:05}.parquet", part_num);
140
141 let file_writer = AilakeFileWriter::new(self.policy.clone());
143 let parquet_bytes = file_writer.write_parquet_only(batch, embeddings)?;
144 let file_size = parquet_bytes.len() as u64;
145 self.store.put(&file_path, parquet_bytes).await?;
146
147 let centroid = compute_centroid_and_radius(embeddings, self.policy.metric);
149 let mut entry = make_data_file_entry_indexing(
150 &file_path,
151 embeddings.len() as u64,
152 file_size,
153 ¢roid,
154 &self.policy.column_name,
155 self.policy.dim,
156 );
157 entry.embedding_model = self
158 .policy
159 .embedding_model
160 .as_ref()
161 .map(|m| m.to_property_value());
162 entry.partition_value =
163 apply_partition_transforms(&self.policy, self.policy.partition_value.as_deref());
164 self.pending_files.push(entry);
165
166 let store = self.store.clone();
168 let catalog = self.catalog.clone();
169 let policy = self.policy.clone();
170 let table = self.table.clone();
171 let fp = file_path.clone();
172 tokio::spawn(async move {
173 if let Err(e) = build_and_patch_index(store, catalog, policy, table, fp).await {
174 error!(
175 "ailake: deferred HNSW build failed — file is indexed as Parquet-only until \
176 next compaction rebuilds the index: {}",
177 e
178 );
179 }
180 });
181
182 if self.bm25_text_column.is_some() {
184 self.update_bm25_stats_from_batch(batch).await?;
185 self.build_bloom_for_file(batch, &file_path);
186 }
187
188 Ok(())
189 }
190
191 pub async fn write_batch_ivf_pq_deferred(
197 &mut self,
198 batch: &RecordBatch,
199 embeddings: &[Vec<f32>],
200 ivf_config: IvfPqConfig,
201 ) -> AilakeResult<()> {
202 if self.captured_schema.is_none() {
203 self.captured_schema = Some(batch.schema());
204 }
205 let part_num = self.part_counter.fetch_add(1, Ordering::SeqCst);
206 let file_path = format!("data/part-{:05}.parquet", part_num);
207
208 let file_writer = AilakeFileWriter::new(self.policy.clone());
209 let parquet_bytes = file_writer.write_parquet_only(batch, embeddings)?;
210 let file_size = parquet_bytes.len() as u64;
211 self.store.put(&file_path, parquet_bytes).await?;
212
213 let centroid = compute_centroid_and_radius(embeddings, self.policy.metric);
214 let mut entry = make_data_file_entry_indexing(
215 &file_path,
216 embeddings.len() as u64,
217 file_size,
218 ¢roid,
219 &self.policy.column_name,
220 self.policy.dim,
221 );
222 entry.embedding_model = self
223 .policy
224 .embedding_model
225 .as_ref()
226 .map(|m| m.to_property_value());
227 entry.partition_value =
228 apply_partition_transforms(&self.policy, self.policy.partition_value.as_deref());
229 self.pending_files.push(entry);
230
231 let store = self.store.clone();
232 let catalog = self.catalog.clone();
233 let policy = self.policy.clone();
234 let table = self.table.clone();
235 let fp = file_path.clone();
236 let codebook_cell = self.deferred_ivf_codebook.clone();
237 tokio::spawn(async move {
238 if let Err(e) = build_ivf_pq_and_patch_index(
239 store,
240 catalog,
241 policy,
242 table,
243 fp,
244 ivf_config,
245 codebook_cell,
246 )
247 .await
248 {
249 error!(
250 "ailake: deferred IVF-PQ build failed — file is indexed as Parquet-only until \
251 next compaction rebuilds the index: {}",
252 e
253 );
254 }
255 });
256
257 Ok(())
258 }
259
260 pub async fn write_batch_idempotent(
269 &mut self,
270 batch: &RecordBatch,
271 embeddings: &[Vec<f32>],
272 batch_id: &str,
273 ) -> AilakeResult<()> {
274 let existing = self.catalog.list_files(&self.table, None).await?;
275 if existing
276 .iter()
277 .any(|f| f.batch_id.as_deref() == Some(batch_id))
278 {
279 return Ok(());
280 }
281 self.write_batch_with_id(batch, embeddings, Some(batch_id.to_string()))
282 .await
283 }
284
285 fn validate_embedding_dim(&self, embeddings: &[Vec<f32>]) -> AilakeResult<()> {
290 if let Some(first) = embeddings.first() {
291 let actual = first.len() as u32;
292 if actual != self.policy.dim {
293 let table_model = self
294 .policy
295 .embedding_model
296 .as_ref()
297 .map(|m| m.to_property_value())
298 .unwrap_or_else(|| format!("dim={}", self.policy.dim));
299 return Err(AilakeError::ModelMismatch {
300 table_model,
301 table_dim: self.policy.dim,
302 batch_model: format!("dim={}", actual),
303 batch_dim: actual,
304 });
305 }
306 }
307 Ok(())
308 }
309
310 pub async fn write_batch(
311 &mut self,
312 batch: &RecordBatch,
313 embeddings: &[Vec<f32>],
314 ) -> AilakeResult<()> {
315 self.write_batch_with_id(batch, embeddings, None).await
316 }
317
318 async fn write_batch_with_id(
319 &mut self,
320 batch: &RecordBatch,
321 embeddings: &[Vec<f32>],
322 batch_id: Option<String>,
323 ) -> AilakeResult<()> {
324 self.validate_embedding_dim(embeddings)?;
325 if self.captured_schema.is_none() {
326 self.captured_schema = Some(batch.schema());
327 }
328 let part_num = self.part_counter.fetch_add(1, Ordering::SeqCst);
329 let file_path = format!("data/part-{:05}.parquet", part_num);
330
331 let file_writer = AilakeFileWriter::new(self.policy.clone());
333 let file_bytes: Bytes = file_writer.write(batch, embeddings)?;
334 let file_size = file_bytes.len() as u64;
335
336 self.store.put(&file_path, file_bytes.clone()).await?;
338
339 let centroid = compute_centroid_and_radius(embeddings, self.policy.metric);
341
342 let reader = ailake_file::AilakeFileReader::new(
344 file_bytes,
345 &self.policy.column_name,
346 self.policy.dim,
347 );
348 let header = reader.read_header()?;
349 let ailk_start = reader.ailk_offset()?;
350 let hnsw_abs_offset = ailk_start + header.hnsw_offset;
351 let hnsw_len = header.hnsw_len;
352
353 let mut entry = make_data_file_entry(
354 &file_path,
355 embeddings.len() as u64,
356 file_size,
357 ¢roid,
358 VectorIndexInfo {
359 column: &self.policy.column_name,
360 dim: self.policy.dim,
361 hnsw_offset: hnsw_abs_offset,
362 hnsw_len,
363 },
364 );
365 entry.batch_id = batch_id;
366 entry.embedding_model = self
367 .policy
368 .embedding_model
369 .as_ref()
370 .map(|m| m.to_property_value());
371 entry.partition_value =
372 apply_partition_transforms(&self.policy, self.policy.partition_value.as_deref());
373 self.pending_files.push(entry);
374
375 if self.bm25_text_column.is_some() {
377 self.update_bm25_stats_from_batch(batch).await?;
378 self.build_bloom_for_file(batch, &file_path);
379 }
380 Ok(())
381 }
382
383 pub async fn write_batch_auto(
389 &mut self,
390 batch: &RecordBatch,
391 embeddings: &[Vec<f32>],
392 ) -> AilakeResult<()> {
393 let profile = ailake_index::HardwareProfile::detect();
394 if profile.recommend_ivf_pq(embeddings.len()) {
395 let mut ivf_config =
396 ailake_index::IvfPqConfig::for_dataset(self.policy.dim as usize, embeddings.len());
397 if self.policy.ivf_residual {
398 ivf_config = ivf_config.with_residual();
399 }
400 self.write_batch_ivf_pq(batch, embeddings, ivf_config).await
401 } else {
402 self.write_batch(batch, embeddings).await
403 }
404 }
405
406 pub async fn write_batch_auto_deferred(
417 &mut self,
418 batch: &RecordBatch,
419 embeddings: &[Vec<f32>],
420 ) -> AilakeResult<()> {
421 let profile = ailake_index::HardwareProfile::detect();
422 if profile.recommend_ivf_pq(embeddings.len()) {
423 let mut ivf_config =
424 ailake_index::IvfPqConfig::for_dataset(self.policy.dim as usize, embeddings.len());
425 if self.policy.ivf_residual {
426 ivf_config = ivf_config.with_residual();
427 }
428 self.write_batch_ivf_pq_deferred(batch, embeddings, ivf_config)
429 .await
430 } else {
431 self.write_batch_deferred(batch, embeddings).await
432 }
433 }
434
435 pub async fn write_batch_ivf_pq(
439 &mut self,
440 batch: &RecordBatch,
441 embeddings: &[Vec<f32>],
442 ivf_config: IvfPqConfig,
443 ) -> AilakeResult<()> {
444 if self.captured_schema.is_none() {
445 self.captured_schema = Some(batch.schema());
446 }
447 let part_num = self.part_counter.fetch_add(1, Ordering::SeqCst);
448 let file_path = format!("data/part-{:05}.parquet", part_num);
449
450 if self.cached_ivf_codebook.is_none() {
454 let codebook = tokio::task::spawn_blocking({
455 let embeddings = embeddings.to_vec();
456 let metric = self.policy.metric;
457 let config = ivf_config.clone();
458 move || ailake_index::IvfPqIndex::train_codebook(&embeddings, metric, &config)
459 })
460 .await
461 .map_err(|e| ailake_core::AilakeError::Store(format!("spawn_blocking panic: {e}")))??;
462 self.cached_ivf_codebook = Some(Arc::new(codebook));
463 }
464 let codebook = self
466 .cached_ivf_codebook
467 .as_ref()
468 .expect("IVF-PQ codebook must be Some after training block")
469 .clone();
470
471 let file_writer = AilakeFileWriter::new(self.policy.clone())
472 .with_index_type(IndexType::IvfPq(ivf_config))
473 .with_shared_ivf_codebook(codebook);
474 let file_bytes: Bytes = file_writer.write(batch, embeddings)?;
475 let file_size = file_bytes.len() as u64;
476
477 self.store.put(&file_path, file_bytes.clone()).await?;
478
479 let centroid = compute_centroid_and_radius(embeddings, self.policy.metric);
480
481 let reader = ailake_file::AilakeFileReader::new(
482 file_bytes,
483 &self.policy.column_name,
484 self.policy.dim,
485 );
486 let header = reader.read_header()?;
487 let ailk_start = reader.ailk_offset()?;
488 let index_abs_offset = ailk_start + header.hnsw_offset;
489 let index_len = header.hnsw_len;
490
491 let mut entry = make_data_file_entry(
492 &file_path,
493 embeddings.len() as u64,
494 file_size,
495 ¢roid,
496 VectorIndexInfo {
497 column: &self.policy.column_name,
498 dim: self.policy.dim,
499 hnsw_offset: index_abs_offset,
500 hnsw_len: index_len,
501 },
502 );
503 entry.embedding_model = self
504 .policy
505 .embedding_model
506 .as_ref()
507 .map(|m| m.to_property_value());
508 entry.partition_value =
509 apply_partition_transforms(&self.policy, self.policy.partition_value.as_deref());
510 self.pending_files.push(entry);
511 Ok(())
512 }
513
514 pub async fn write_batch_multi(
519 &mut self,
520 batch: &RecordBatch,
521 columns: &[MultiVectorBatch<'_>],
522 ) -> AilakeResult<()> {
523 use ailake_core::AilakeError;
524 if self.captured_schema.is_none() {
525 self.captured_schema = Some(batch.schema());
526 }
527 if self.extra_vec_policies.is_empty() && columns.len() > 1 {
528 self.extra_vec_policies = columns[1..].iter().map(|c| c.policy.clone()).collect();
529 }
530
531 if columns.is_empty() {
532 return Err(AilakeError::InvalidArgument(
533 "write_batch_multi requires at least one column".into(),
534 ));
535 }
536
537 let part_num = self.part_counter.fetch_add(1, Ordering::SeqCst);
538 let file_path = format!("data/part-{:05}.parquet", part_num);
539
540 let col_batches: Vec<VectorColumnBatch<'_>> = columns
541 .iter()
542 .map(|c| VectorColumnBatch {
543 policy: &c.policy,
544 embeddings: c.embeddings,
545 })
546 .collect();
547
548 let primary_policy = &columns[0].policy;
549 let file_writer = AilakeFileWriter::new(primary_policy.clone());
550 let file_bytes: Bytes = file_writer.write_multi(batch, &col_batches)?;
551 let file_size = file_bytes.len() as u64;
552
553 self.store.put(&file_path, file_bytes.clone()).await?;
554
555 let primary_centroid =
557 compute_centroid_and_radius(columns[0].embeddings, primary_policy.metric);
558
559 let reader = ailake_file::AilakeFileReader::new(
561 file_bytes.clone(),
562 &primary_policy.column_name,
563 primary_policy.dim,
564 );
565 let primary_ailk_start = reader.ailk_offset()?;
566 let primary_header = {
567 use ailake_file::HEADER_SIZE;
568 let start = primary_ailk_start as usize;
569 let hdr_bytes: &[u8; HEADER_SIZE] = file_bytes[start..start + HEADER_SIZE]
570 .try_into()
571 .map_err(|_| AilakeError::NotAnAilakeFile)?;
572 ailake_file::AilakeHeader::from_bytes(hdr_bytes)?
573 };
574 let primary_hnsw_abs = primary_ailk_start + primary_header.hnsw_offset;
575
576 let mut extra: Vec<ExtraVectorIndex> = Vec::new();
578 for col in columns.iter().skip(1) {
579 let col_ailk_start = reader.ailk_offset_for_column(&col.policy.column_name)?;
580 let col_header = {
581 use ailake_file::HEADER_SIZE;
582 let start = col_ailk_start as usize;
583 let hdr_bytes: &[u8; HEADER_SIZE] = file_bytes[start..start + HEADER_SIZE]
584 .try_into()
585 .map_err(|_| AilakeError::NotAnAilakeFile)?;
586 ailake_file::AilakeHeader::from_bytes(hdr_bytes)?
587 };
588 let col_centroid = compute_centroid_and_radius(col.embeddings, col.policy.metric);
589 extra.push(ExtraVectorIndex {
590 column: col.policy.column_name.clone(),
591 dim: col.policy.dim,
592 hnsw_offset: col_ailk_start + col_header.hnsw_offset,
593 hnsw_len: col_header.hnsw_len,
594 centroid_b64: Some(encode_centroid_b64(&col_centroid)),
595 radius: Some(col_centroid.radius),
596 });
597 }
598
599 let mut entry = make_multi_column_data_file_entry(
600 &file_path,
601 columns[0].embeddings.len() as u64,
602 file_size,
603 &primary_centroid,
604 VectorIndexInfo {
605 column: &primary_policy.column_name,
606 dim: primary_policy.dim,
607 hnsw_offset: primary_hnsw_abs,
608 hnsw_len: primary_header.hnsw_len,
609 },
610 &extra,
611 );
612 entry.embedding_model = self
613 .policy
614 .embedding_model
615 .as_ref()
616 .map(|m| m.to_property_value());
617 entry.partition_value =
618 apply_partition_transforms(&self.policy, self.policy.partition_value.as_deref());
619 self.pending_files.push(entry);
620 Ok(())
621 }
622
623 pub async fn write_batch_multi_deferred(
636 &mut self,
637 batch: &RecordBatch,
638 columns: &[MultiVectorBatch<'_>],
639 ) -> AilakeResult<()> {
640 use ailake_core::AilakeError;
641 if columns.is_empty() {
642 return Err(AilakeError::InvalidArgument(
643 "write_batch_multi_deferred requires at least one column".into(),
644 ));
645 }
646 if self.captured_schema.is_none() {
647 self.captured_schema = Some(batch.schema());
648 }
649 if self.extra_vec_policies.is_empty() && columns.len() > 1 {
650 self.extra_vec_policies = columns[1..].iter().map(|c| c.policy.clone()).collect();
651 }
652
653 let part_num = self.part_counter.fetch_add(1, Ordering::SeqCst);
654 let file_path = format!("data/part-{:05}.parquet", part_num);
655
656 let primary_policy = &columns[0].policy;
658 let file_writer = AilakeFileWriter::new(primary_policy.clone());
659 let parquet_bytes = file_writer.write_parquet_only(batch, columns[0].embeddings)?;
660 let file_size = parquet_bytes.len() as u64;
661 self.store.put(&file_path, parquet_bytes).await?;
662
663 let primary_centroid =
665 compute_centroid_and_radius(columns[0].embeddings, primary_policy.metric);
666 let mut entry = make_data_file_entry_indexing(
667 &file_path,
668 columns[0].embeddings.len() as u64,
669 file_size,
670 &primary_centroid,
671 &primary_policy.column_name,
672 primary_policy.dim,
673 );
674 entry.extra_vector_indexes = columns[1..]
677 .iter()
678 .map(|c| {
679 let col_centroid = compute_centroid_and_radius(c.embeddings, c.policy.metric);
680 ExtraVectorIndex {
681 column: c.policy.column_name.clone(),
682 dim: c.policy.dim,
683 hnsw_offset: 0,
684 hnsw_len: 0,
685 centroid_b64: Some(encode_centroid_b64(&col_centroid)),
686 radius: Some(col_centroid.radius),
687 }
688 })
689 .collect();
690 entry.embedding_model = self
691 .policy
692 .embedding_model
693 .as_ref()
694 .map(|m| m.to_property_value());
695 entry.partition_value =
696 apply_partition_transforms(&self.policy, self.policy.partition_value.as_deref());
697 self.pending_files.push(entry);
698
699 let all_policies: Vec<VectorStoragePolicy> =
701 columns.iter().map(|c| c.policy.clone()).collect();
702 let all_embeddings: Vec<Vec<Vec<f32>>> =
703 columns.iter().map(|c| c.embeddings.to_vec()).collect();
704 let store = self.store.clone();
705 let catalog = self.catalog.clone();
706 let table = self.table.clone();
707 let fp = file_path.clone();
708 tokio::spawn(async move {
709 if let Err(e) =
710 build_and_patch_multi_index(store, catalog, all_policies, table, fp, all_embeddings)
711 .await
712 {
713 error!(
714 "ailake: deferred multi-column HNSW build failed — shard stays in flat-scan \
715 mode until next compaction rebuilds the index: {}",
716 e
717 );
718 }
719 });
720
721 Ok(())
722 }
723
724 fn build_bloom_for_file(&mut self, batch: &RecordBatch, file_path: &str) {
733 use arrow_array::cast::AsArray;
734 let col_name = match &self.bm25_text_column {
735 Some(c) => c.clone(),
736 None => return,
737 };
738 let col = match batch.column_by_name(&col_name) {
739 Some(c) => c,
740 None => return,
741 };
742 let str_arr = match col.as_string_opt::<i32>() {
743 Some(a) => a,
744 None => return,
745 };
746 let cap = (batch.num_rows() * 10).max(128);
748 let mut bloom = crate::bloom::BloomFilter::with_capacity(cap, 0.01);
749 for i in 0..str_arr.len() {
750 if str_arr.is_valid(i) {
751 for term in crate::bm25::tokenize(str_arr.value(i)) {
752 bloom.insert(&term);
753 }
754 }
755 }
756 self.pending_blooms
757 .push((file_path.to_string(), bloom.to_bytes()));
758 }
759
760 async fn update_bm25_stats_from_batch(&self, batch: &RecordBatch) -> AilakeResult<()> {
766 use arrow_array::cast::AsArray;
767
768 let col_name = match &self.bm25_text_column {
769 Some(c) => c.as_str(),
770 None => return Ok(()),
771 };
772 let col = match batch.column_by_name(col_name) {
773 Some(c) => c,
774 None => {
775 tracing::warn!(
776 "ailake: BM25 text column '{}' not found in batch — skipping IDF update",
777 col_name
778 );
779 return Ok(());
780 }
781 };
782 let str_arr = match col.as_string_opt::<i32>() {
783 Some(a) => a,
784 None => {
785 tracing::warn!(
786 "ailake: BM25 text column '{}' is not a Utf8 column — skipping",
787 col_name
788 );
789 return Ok(());
790 }
791 };
792
793 let texts: Vec<&str> = (0..str_arr.len())
794 .filter(|&i| str_arr.is_valid(i))
795 .map(|i| str_arr.value(i))
796 .collect();
797
798 let stats_path = crate::bm25::BM25_STATS_FILE;
800 let mut stats: crate::bm25::IdfStats = match self.store.get(stats_path).await {
801 Ok(bytes) => crate::bm25::IdfStats::from_bytes(&bytes).unwrap_or_default(),
802 Err(_) => crate::bm25::IdfStats::default(),
803 };
804
805 stats.merge_batch(&texts);
806
807 let bytes = stats.to_bytes()?;
808 self.store
809 .put(stats_path, bytes::Bytes::from(bytes))
810 .await?;
811 Ok(())
812 }
813
814 pub async fn commit(mut self) -> AilakeResult<SnapshotId> {
815 if self.pending_files.is_empty() {
816 let current = self
817 .catalog
818 .load_table(&self.table)
819 .await
820 .ok()
821 .and_then(|m| m.current_snapshot_id)
822 .unwrap_or(0);
823 return Ok(current);
824 }
825 let iceberg_schema = self
826 .captured_schema
827 .as_deref()
828 .map(|s| arrow_schema_to_iceberg_update(s, &self.policy, &self.extra_vec_policies));
829 let mut extra_properties = std::collections::HashMap::new();
832 for ep in &self.extra_vec_policies {
833 extra_properties.insert(format!("ailake.dim-{}", ep.column_name), ep.dim.to_string());
834 extra_properties.insert(
835 format!("ailake.metric-{}", ep.column_name),
836 ailake_parquet::schema::metric_str(ep.metric).to_string(),
837 );
838 if let Some(modality) = ep.modality {
839 extra_properties.insert(
840 format!("ailake.modality-{}", ep.column_name),
841 modality.as_str().to_string(),
842 );
843 }
844 }
845 let snapshot = NewSnapshot {
846 snapshot_id: new_snapshot_id(),
847 parent_snapshot_id: self.parent_snapshot_id,
848 files: std::mem::take(&mut self.pending_files),
849 operation: SnapshotOperation::Append,
850 iceberg_schema,
851 extra_properties,
852 bloom_filters: std::mem::take(&mut self.pending_blooms),
853 equality_delete_files: vec![],
854 };
855 self.catalog.commit_snapshot(&self.table, snapshot).await
856 }
857
858 pub async fn create_or_open(
860 catalog: Arc<dyn CatalogProvider>,
861 store: Arc<dyn Store>,
862 policy: VectorStoragePolicy,
863 table: TableIdent,
864 format_version: u8,
865 ) -> AilakeResult<Self> {
866 let existing_file_count: u32;
869
870 match catalog.load_table(&table).await {
871 Ok(existing_meta) => {
872 if let Some(stored_dim_str) = existing_meta.properties.get("ailake.vector-dim") {
876 if let Ok(stored_dim) = stored_dim_str.parse::<u32>() {
877 if stored_dim != policy.dim {
878 let table_model = policy
879 .embedding_model
880 .as_ref()
881 .map(|m| m.to_property_value())
882 .unwrap_or_else(|| format!("dim={}", stored_dim));
883 return Err(AilakeError::ModelMismatch {
884 table_model,
885 table_dim: stored_dim,
886 batch_model: format!("dim={}", policy.dim),
887 batch_dim: policy.dim,
888 });
889 }
890 }
891 }
892 if let Some(incoming) = &policy.embedding_model {
896 if let Some(stored_val) = existing_meta
897 .properties
898 .get(EmbeddingModelInfo::property_key())
899 {
900 let stored = EmbeddingModelInfo::from_property_value(stored_val);
901 if stored.name != incoming.name {
902 warn!(
903 "ailake: embedding model name changed: table has '{}', writing with '{}' \
904 (dim={}). Vectors may be incompatible for similarity search.",
905 stored.name, incoming.name, policy.dim
906 );
907 }
908 }
909 }
910 existing_file_count = catalog
911 .list_files(&table, None)
912 .await
913 .unwrap_or_default()
914 .len() as u32;
915 }
916 Err(_) => {
917 catalog
918 .create_table(
919 &table,
920 &TableProperties {
921 partition_column_type: policy.partition_column_type.clone(),
922 policy: policy.clone(),
923 extra: std::collections::HashMap::new(),
924 format_version,
925 },
926 )
927 .await?;
928 existing_file_count = 0;
929 }
930 }
931 let mut writer = Self::new(catalog, store, policy, table);
932 writer.part_counter = Arc::new(AtomicU32::new(existing_file_count));
933 Ok(writer)
934 }
935}
936
937fn arrow_schema_to_iceberg_update(
944 schema: &arrow_schema::Schema,
945 policy: &VectorStoragePolicy,
946 extra_vec_policies: &[VectorStoragePolicy],
947) -> IcebergSchemaUpdate {
948 let bytes_per_dim = policy.precision.bytes_per_element() as u32;
949 let vec_fixed_len = policy.dim * bytes_per_dim;
950
951 let has_primary_in_batch = schema
953 .fields()
954 .iter()
955 .any(|f| f.name() == &policy.column_name);
956 let vec_cols: Vec<(String, u32)> = {
957 let mut v = Vec::new();
958 if !has_primary_in_batch {
959 v.push((policy.column_name.clone(), vec_fixed_len));
960 }
961 for ep in extra_vec_policies {
962 let ep_fixed_len = ep.dim * ep.precision.bytes_per_element() as u32;
963 if !schema.fields().iter().any(|f| f.name() == &ep.column_name) {
964 v.push((ep.column_name.clone(), ep_fixed_len));
965 }
966 }
967 v
968 };
969
970 let top_level_count = schema.fields().len() + vec_cols.len();
972 let mut nested_id = top_level_count as i32;
974
975 let mut fields: Vec<serde_json::Value> = Vec::new();
976 let mut name_mapping: Vec<serde_json::Value> = Vec::new();
977
978 for (idx, field) in schema.fields().iter().enumerate() {
979 let field_id = (idx + 1) as i32;
980 let iceberg_type = arrow_type_to_iceberg(field.data_type(), &mut nested_id);
981 fields.push(serde_json::json!({
982 "id": field_id,
983 "name": field.name(),
984 "required": false,
985 "type": iceberg_type,
986 }));
987 name_mapping.push(serde_json::json!({
988 "field-id": field_id,
989 "names": [field.name()],
990 }));
991 }
992
993 for (i, (col_name, fixed_len)) in vec_cols.iter().enumerate() {
995 let field_id = (schema.fields().len() + 1 + i) as i32;
996 fields.push(serde_json::json!({
997 "id": field_id,
998 "name": col_name,
999 "required": false,
1000 "type": format!("fixed[{fixed_len}]"),
1001 }));
1002 name_mapping.push(serde_json::json!({
1003 "field-id": field_id,
1004 "names": [col_name],
1005 }));
1006 }
1007
1008 let last_column_id = nested_id;
1009 let name_mapping_json = serde_json::to_string(&name_mapping).unwrap_or_else(|_| "[]".into());
1010
1011 IcebergSchemaUpdate {
1012 fields,
1013 last_column_id,
1014 name_mapping_json,
1015 }
1016}
1017
1018fn arrow_type_to_iceberg(dt: &arrow_schema::DataType, nested_id: &mut i32) -> serde_json::Value {
1023 use arrow_schema::DataType;
1024 match dt {
1025 DataType::Boolean => serde_json::json!("boolean"),
1026 DataType::Int8 | DataType::Int16 | DataType::Int32 | DataType::UInt8 | DataType::UInt16 => {
1027 serde_json::json!("int")
1028 }
1029 DataType::Int64 | DataType::UInt32 | DataType::UInt64 => serde_json::json!("long"),
1030 DataType::Float16 | DataType::Float32 => serde_json::json!("float"),
1031 DataType::Float64 => serde_json::json!("double"),
1032 DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View => serde_json::json!("string"),
1033 DataType::Binary | DataType::LargeBinary | DataType::BinaryView => {
1034 serde_json::json!("binary")
1035 }
1036 DataType::Date32 | DataType::Date64 => serde_json::json!("date"),
1037 DataType::Timestamp(_, Some(_)) => serde_json::json!("timestamptz"),
1039 DataType::Timestamp(_, None) => serde_json::json!("timestamp"),
1040 DataType::Time32(_) | DataType::Time64(_) => serde_json::json!("time"),
1041 DataType::FixedSizeBinary(n) => serde_json::json!(format!("fixed[{n}]")),
1042 DataType::Decimal128(p, s) | DataType::Decimal256(p, s) => {
1043 serde_json::json!(format!("decimal({p}, {s})"))
1044 }
1045 DataType::List(inner)
1046 | DataType::LargeList(inner)
1047 | DataType::ListView(inner)
1048 | DataType::FixedSizeList(inner, _) => {
1049 *nested_id += 1;
1050 let element_id = *nested_id;
1051 let element_type = arrow_type_to_iceberg(inner.data_type(), nested_id);
1052 serde_json::json!({
1053 "type": "list",
1054 "element-id": element_id,
1055 "element": element_type,
1056 "element-required": !inner.is_nullable(),
1057 })
1058 }
1059 DataType::Struct(arrow_fields) => {
1060 let struct_fields: Vec<serde_json::Value> = arrow_fields
1061 .iter()
1062 .map(|f| {
1063 *nested_id += 1;
1064 let fid = *nested_id;
1065 let ftype = arrow_type_to_iceberg(f.data_type(), nested_id);
1066 serde_json::json!({
1067 "id": fid,
1068 "name": f.name(),
1069 "required": !f.is_nullable(),
1070 "type": ftype,
1071 })
1072 })
1073 .collect();
1074 serde_json::json!({ "type": "struct", "fields": struct_fields })
1075 }
1076 DataType::Map(entries, _) => {
1077 *nested_id += 1;
1079 let key_id = *nested_id;
1080 *nested_id += 1;
1081 let val_id = *nested_id;
1082 if let DataType::Struct(kv_fields) = entries.data_type() {
1083 let key_f = kv_fields
1084 .iter()
1085 .find(|f| f.name() == "key" || f.name() == "keys");
1086 let val_f = kv_fields
1087 .iter()
1088 .find(|f| f.name() == "value" || f.name() == "values");
1089 let key_type = key_f
1090 .map(|f| arrow_type_to_iceberg(f.data_type(), nested_id))
1091 .unwrap_or(serde_json::json!("binary"));
1092 let val_type = val_f
1093 .map(|f| arrow_type_to_iceberg(f.data_type(), nested_id))
1094 .unwrap_or(serde_json::json!("binary"));
1095 let val_required = val_f.map(|f| !f.is_nullable()).unwrap_or(false);
1096 serde_json::json!({
1097 "type": "map",
1098 "key-id": key_id,
1099 "key": key_type,
1100 "value-id": val_id,
1101 "value": val_type,
1102 "value-required": val_required,
1103 })
1104 } else {
1105 serde_json::json!("binary")
1106 }
1107 }
1108 _ => serde_json::json!("binary"),
1109 }
1110}
1111
1112pub(crate) async fn build_and_patch_index(
1114 store: Arc<dyn Store>,
1115 catalog: Arc<dyn CatalogProvider>,
1116 policy: VectorStoragePolicy,
1117 table: TableIdent,
1118 file_path: String,
1119) -> AilakeResult<()> {
1120 let parquet_bytes = store.get(&file_path).await?;
1122 let reader = AilakeFileReader::new(parquet_bytes, &policy.column_name, policy.dim);
1123 let (batch, embeddings) = reader.read_parquet()?;
1124
1125 let full_bytes = tokio::task::spawn_blocking({
1128 let policy = policy.clone();
1129 move || {
1130 let file_writer = AilakeFileWriter::new(policy);
1131 file_writer.write(&batch, &embeddings)
1132 }
1133 })
1134 .await
1135 .map_err(|e| ailake_core::AilakeError::Store(format!("spawn_blocking panic: {e}")))??;
1136
1137 let full_reader = AilakeFileReader::new(full_bytes.clone(), &policy.column_name, policy.dim);
1139 let header = full_reader.read_header()?;
1140 let ailk_start = full_reader.ailk_offset()?;
1141 let hnsw_abs_offset = ailk_start + header.hnsw_offset;
1142 let hnsw_len = header.hnsw_len;
1143
1144 store.put(&file_path, full_bytes).await?;
1146
1147 let mut committed = false;
1150 for _ in 0..120u32 {
1151 match catalog.load_table(&table).await {
1152 Ok(meta) if meta.current_snapshot_id.is_some() => {
1153 committed = true;
1154 break;
1155 }
1156 _ => tokio::time::sleep(std::time::Duration::from_millis(500)).await,
1157 }
1158 }
1159 if !committed {
1160 return Err(ailake_core::AilakeError::Store(format!(
1161 "deferred HNSW build: no snapshot committed for {file_path} after 60 s — \
1162 did you call TableWriter::commit()?"
1163 )));
1164 }
1165
1166 for attempt in 0..50u32 {
1171 let table_meta = catalog.load_table(&table).await?;
1172 let parent_snapshot_id = table_meta.current_snapshot_id;
1173 let mut files = catalog.list_files(&table, None).await?;
1174
1175 if files
1177 .iter()
1178 .any(|f| f.path == file_path && f.index_status == IndexStatus::Ready)
1179 {
1180 break;
1181 }
1182
1183 for f in &mut files {
1184 if f.path == file_path {
1185 f.hnsw_offset = Some(hnsw_abs_offset);
1186 f.hnsw_len = Some(hnsw_len);
1187 f.index_status = IndexStatus::Ready;
1188 break;
1189 }
1190 }
1191 catalog
1192 .commit_snapshot(
1193 &table,
1194 NewSnapshot {
1195 snapshot_id: new_snapshot_id(),
1196 parent_snapshot_id,
1197 files,
1198 operation: SnapshotOperation::Replace,
1199 iceberg_schema: None,
1200 extra_properties: std::collections::HashMap::new(),
1201 bloom_filters: vec![],
1202 equality_delete_files: vec![],
1203 },
1204 )
1205 .await?;
1206
1207 tokio::time::sleep(std::time::Duration::from_millis(10 + attempt as u64 * 5)).await;
1209
1210 let verify = catalog.list_files(&table, None).await?;
1211 if verify
1212 .iter()
1213 .any(|f| f.path == file_path && f.index_status == IndexStatus::Ready)
1214 {
1215 break;
1216 }
1217 }
1219
1220 info!(
1221 "ailake: deferred HNSW index built for {} (offset={}, len={})",
1222 file_path, hnsw_abs_offset, hnsw_len
1223 );
1224 Ok(())
1225}
1226
1227async fn build_ivf_pq_and_patch_index(
1232 store: Arc<dyn Store>,
1233 catalog: Arc<dyn CatalogProvider>,
1234 policy: VectorStoragePolicy,
1235 table: TableIdent,
1236 file_path: String,
1237 ivf_config: IvfPqConfig,
1238 codebook_cell: Arc<tokio::sync::OnceCell<IvfPqCodebook>>,
1239) -> AilakeResult<()> {
1240 let parquet_bytes = store.get(&file_path).await?;
1241 let reader = AilakeFileReader::new(parquet_bytes, &policy.column_name, policy.dim);
1242 let (batch, embeddings) = reader.read_parquet()?;
1243
1244 let codebook = codebook_cell
1246 .get_or_try_init(|| async {
1247 let vecs = embeddings.clone();
1248 let metric = policy.metric;
1249 let cfg = ivf_config.clone();
1250 tokio::task::spawn_blocking(move || {
1251 ailake_index::IvfPqIndex::train_codebook(&vecs, metric, &cfg)
1252 })
1253 .await
1254 .map_err(|e| ailake_core::AilakeError::Store(format!("spawn_blocking panic: {e}")))?
1255 })
1256 .await?;
1257
1258 let full_bytes = tokio::task::spawn_blocking({
1259 let policy = policy.clone();
1260 let codebook = codebook.clone();
1261 move || {
1262 let file_writer = AilakeFileWriter::new(policy)
1263 .with_index_type(IndexType::IvfPq(ivf_config))
1264 .with_shared_ivf_codebook(Arc::new(codebook));
1265 file_writer.write(&batch, &embeddings)
1266 }
1267 })
1268 .await
1269 .map_err(|e| ailake_core::AilakeError::Store(format!("spawn_blocking panic: {e}")))??;
1270
1271 let full_reader = AilakeFileReader::new(full_bytes.clone(), &policy.column_name, policy.dim);
1272 let header = full_reader.read_header()?;
1273 let ailk_start = full_reader.ailk_offset()?;
1274 let hnsw_abs_offset = ailk_start + header.hnsw_offset;
1275 let hnsw_len = header.hnsw_len;
1276
1277 store.put(&file_path, full_bytes).await?;
1278
1279 let mut committed = false;
1281 for _ in 0..120u32 {
1282 match catalog.load_table(&table).await {
1283 Ok(meta) if meta.current_snapshot_id.is_some() => {
1284 committed = true;
1285 break;
1286 }
1287 _ => tokio::time::sleep(std::time::Duration::from_millis(500)).await,
1288 }
1289 }
1290 if !committed {
1291 return Err(ailake_core::AilakeError::Store(format!(
1292 "deferred IVF-PQ build: no snapshot committed for {file_path} after 60 s — \
1293 did you call TableWriter::commit()?"
1294 )));
1295 }
1296
1297 for attempt in 0..50u32 {
1298 let table_meta = catalog.load_table(&table).await?;
1299 let parent_snapshot_id = table_meta.current_snapshot_id;
1300 let mut files = catalog.list_files(&table, None).await?;
1301
1302 if files
1303 .iter()
1304 .any(|f| f.path == file_path && f.index_status == IndexStatus::Ready)
1305 {
1306 break;
1307 }
1308
1309 for f in &mut files {
1310 if f.path == file_path {
1311 f.hnsw_offset = Some(hnsw_abs_offset);
1312 f.hnsw_len = Some(hnsw_len);
1313 f.index_status = IndexStatus::Ready;
1314 break;
1315 }
1316 }
1317 catalog
1318 .commit_snapshot(
1319 &table,
1320 NewSnapshot {
1321 snapshot_id: new_snapshot_id(),
1322 parent_snapshot_id,
1323 files,
1324 operation: SnapshotOperation::Replace,
1325 iceberg_schema: None,
1326 extra_properties: std::collections::HashMap::new(),
1327 bloom_filters: vec![],
1328 equality_delete_files: vec![],
1329 },
1330 )
1331 .await?;
1332
1333 tokio::time::sleep(std::time::Duration::from_millis(10 + attempt as u64 * 5)).await;
1334
1335 let verify = catalog.list_files(&table, None).await?;
1336 if verify
1337 .iter()
1338 .any(|f| f.path == file_path && f.index_status == IndexStatus::Ready)
1339 {
1340 break;
1341 }
1342 }
1343
1344 info!(
1345 "ailake: deferred IVF-PQ index built for {} (offset={}, len={})",
1346 file_path, hnsw_abs_offset, hnsw_len
1347 );
1348 Ok(())
1349}
1350
1351async fn build_and_patch_multi_index(
1357 store: Arc<dyn Store>,
1358 catalog: Arc<dyn CatalogProvider>,
1359 policies: Vec<VectorStoragePolicy>,
1360 table: TableIdent,
1361 file_path: String,
1362 all_embeddings: Vec<Vec<Vec<f32>>>,
1363) -> AilakeResult<()> {
1364 let parquet_bytes = store.get(&file_path).await?;
1366 let primary_reader =
1367 AilakeFileReader::new(parquet_bytes, &policies[0].column_name, policies[0].dim);
1368 let (batch, _) = primary_reader.read_parquet()?;
1369
1370 let full_bytes = tokio::task::spawn_blocking({
1372 let policies = policies.clone();
1373 let all_embeddings = all_embeddings.clone();
1374 move || {
1375 let col_batches: Vec<VectorColumnBatch<'_>> = policies
1376 .iter()
1377 .zip(all_embeddings.iter())
1378 .map(|(p, embs)| VectorColumnBatch {
1379 policy: p,
1380 embeddings: embs.as_slice(),
1381 })
1382 .collect();
1383 let file_writer = AilakeFileWriter::new(policies[0].clone());
1384 file_writer.write_multi(&batch, &col_batches)
1385 }
1386 })
1387 .await
1388 .map_err(|e| ailake_core::AilakeError::Store(format!("spawn_blocking panic: {e}")))??;
1389
1390 let primary_reader = AilakeFileReader::new(
1392 full_bytes.clone(),
1393 &policies[0].column_name,
1394 policies[0].dim,
1395 );
1396 let primary_header = primary_reader.read_header()?;
1397 let primary_ailk_start = primary_reader.ailk_offset()?;
1398 let primary_hnsw_abs = primary_ailk_start + primary_header.hnsw_offset;
1399 let primary_hnsw_len = primary_header.hnsw_len;
1400
1401 let mut extra_offsets: Vec<(u64, u64)> = Vec::with_capacity(policies.len().saturating_sub(1));
1406 for col_policy in policies.iter().skip(1) {
1407 let col_reader =
1408 AilakeFileReader::new(full_bytes.clone(), &col_policy.column_name, col_policy.dim);
1409 let col_ailk_start = col_reader.ailk_offset_for_column(&col_policy.column_name)?;
1410 let col_header = col_reader.read_header_for_column(&col_policy.column_name)?;
1411 extra_offsets.push((col_ailk_start + col_header.hnsw_offset, col_header.hnsw_len));
1412 }
1413
1414 store.put(&file_path, full_bytes).await?;
1416
1417 let mut committed = false;
1419 for _ in 0..120u32 {
1420 match catalog.load_table(&table).await {
1421 Ok(meta) if meta.current_snapshot_id.is_some() => {
1422 committed = true;
1423 break;
1424 }
1425 _ => tokio::time::sleep(std::time::Duration::from_millis(500)).await,
1426 }
1427 }
1428 if !committed {
1429 return Err(ailake_core::AilakeError::Store(format!(
1430 "deferred index build: no snapshot committed for {file_path} after 60 s — \
1431 did you call TableWriter::commit()?"
1432 )));
1433 }
1434
1435 for attempt in 0..50u32 {
1437 let table_meta = catalog.load_table(&table).await?;
1438 let parent_snapshot_id = table_meta.current_snapshot_id;
1439 let mut files = catalog.list_files(&table, None).await?;
1440
1441 if files
1442 .iter()
1443 .any(|f| f.path == file_path && f.index_status == IndexStatus::Ready)
1444 {
1445 break;
1446 }
1447
1448 for f in &mut files {
1449 if f.path == file_path {
1450 f.hnsw_offset = Some(primary_hnsw_abs);
1451 f.hnsw_len = Some(primary_hnsw_len);
1452 f.index_status = IndexStatus::Ready;
1453 for (i, &(off, len)) in extra_offsets.iter().enumerate() {
1454 if let Some(xi) = f.extra_vector_indexes.get_mut(i) {
1455 xi.hnsw_offset = off;
1456 xi.hnsw_len = len;
1457 }
1458 }
1459 break;
1460 }
1461 }
1462 catalog
1463 .commit_snapshot(
1464 &table,
1465 NewSnapshot {
1466 snapshot_id: new_snapshot_id(),
1467 parent_snapshot_id,
1468 files,
1469 operation: SnapshotOperation::Replace,
1470 iceberg_schema: None,
1471 extra_properties: std::collections::HashMap::new(),
1472 bloom_filters: vec![],
1473 equality_delete_files: vec![],
1474 },
1475 )
1476 .await?;
1477
1478 tokio::time::sleep(std::time::Duration::from_millis(10 + attempt as u64 * 5)).await;
1479
1480 let verify = catalog.list_files(&table, None).await?;
1481 if verify
1482 .iter()
1483 .any(|f| f.path == file_path && f.index_status == IndexStatus::Ready)
1484 {
1485 break;
1486 }
1487 }
1488
1489 info!(
1490 "ailake: deferred multi-column HNSW built for {} ({} cols, primary offset={})",
1491 file_path,
1492 policies.len(),
1493 primary_hnsw_abs
1494 );
1495 Ok(())
1496}
1497
#[cfg(test)]
mod tests {
    // Schema-translation checks for `arrow_schema_to_iceberg_update`, plus two
    // smoke tests showing that the deferred write paths stage exactly one file.
    use super::*;
    use std::sync::Arc;

    use ailake_catalog::HadoopCatalog;
    use ailake_core::{VectorMetric, VectorPrecision};
    use ailake_store::LocalStore;
    use arrow_schema::{DataType, Field, Schema, TimeUnit};

    /// Cosine / F16 policy for `col`, keeping raw vectors and leaving every
    /// optional knob unset.
    fn policy(col: &str, dim: u32) -> VectorStoragePolicy {
        VectorStoragePolicy {
            column_name: col.to_string(),
            dim,
            metric: VectorMetric::Cosine,
            precision: VectorPrecision::F16,
            pq: None,
            keep_raw_for_reranking: true,
            pre_normalize: false,
            hnsw_m: None,
            hnsw_ef_construction: None,
            ivf_residual: false,
            embedding_model: None,
            modality: None,
            partition_by: None,
            partition_value: None,
            partition_column_type: None,
            partition_fields: Vec::new(),
        }
    }

    /// Schema update for `schema` with `pol` as the only vector column.
    fn update_for(schema: &Schema, pol: &VectorStoragePolicy) -> IcebergSchemaUpdate {
        arrow_schema_to_iceberg_update(schema, pol, &[])
    }

    /// Top-level Iceberg field names, in emission order.
    fn field_names(upd: &IcebergSchemaUpdate) -> Vec<&str> {
        upd.fields
            .iter()
            .map(|f| f["name"].as_str().unwrap())
            .collect()
    }

    /// Nullable `List<Utf8>` column called `name`.
    fn utf8_list(name: &str) -> Field {
        let item = Field::new("item", DataType::Utf8, true);
        Field::new(name, DataType::List(Arc::new(item)), true)
    }

    /// Batch with a single non-nullable `text: Utf8` column holding `rows`.
    fn text_batch(rows: &[&str]) -> RecordBatch {
        let schema = Arc::new(Schema::new(vec![Field::new("text", DataType::Utf8, false)]));
        RecordBatch::try_new(
            schema,
            vec![Arc::new(arrow_array::StringArray::from(rows.to_vec()))],
        )
        .unwrap()
    }

    /// `TableWriter` for `default.t` over a fresh temp-dir `LocalStore` and a
    /// Hadoop catalog. Keep the returned `TempDir` alive for the whole test.
    async fn temp_writer(pol: VectorStoragePolicy) -> (tempfile::TempDir, TableWriter) {
        let dir = tempfile::tempdir().unwrap();
        let store: Arc<dyn ailake_store::Store> =
            Arc::new(LocalStore::new(dir.path().to_str().unwrap()));
        let catalog = Arc::new(HadoopCatalog::new(Arc::clone(&store), ""));
        let writer = TableWriter::create_or_open(
            catalog,
            store,
            pol,
            TableIdent::new("default", "t"),
            2,
        )
        .await
        .unwrap();
        (dir, writer)
    }

    #[test]
    fn simple_schema_produces_correct_fields() {
        let schema = Schema::new(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("text", DataType::Utf8, false),
        ]);
        let upd = update_for(&schema, &policy("embedding", 8));

        // id, text, then the appended vector column.
        assert_eq!(upd.fields.len(), 3);
        let expected = [(1, "int"), (2, "string"), (3, "fixed[16]")];
        for (field, (id, ty)) in upd.fields.iter().zip(expected) {
            assert_eq!(field["id"], id);
            assert_eq!(field["type"], ty);
        }

        let mapping: Vec<serde_json::Value> =
            serde_json::from_str(&upd.name_mapping_json).unwrap();
        assert_eq!(mapping.len(), 3);
        let vec_entry = &mapping[2];
        assert_eq!(vec_entry["field-id"], 3);
        assert_eq!(vec_entry["names"][0], "embedding");
        assert_eq!(upd.last_column_id, 3);
    }

    #[test]
    fn timestamp_without_tz_maps_to_timestamp_not_timestamptz() {
        let micros = |name: &str, tz: Option<&str>| {
            Field::new(
                name,
                DataType::Timestamp(TimeUnit::Microsecond, tz.map(Into::into)),
                true,
            )
        };
        let schema = Schema::new(vec![
            micros("created_at", None),
            micros("updated_at", Some("UTC")),
        ]);
        let upd = update_for(&schema, &policy("vec", 4));

        assert_eq!(upd.fields[0]["type"], "timestamp");
        assert_eq!(upd.fields[1]["type"], "timestamptz");
    }

    #[test]
    fn list_type_produces_iceberg_list_object() {
        let schema = Schema::new(vec![utf8_list("tags")]);
        let upd = update_for(&schema, &policy("vec", 4));

        let list_ty = &upd.fields[0]["type"];
        assert_eq!(list_ty["type"], "list");
        assert_eq!(list_ty["element"], "string");
        // The element id must not collide with the top-level ids 1 and 2.
        assert!(list_ty["element-id"].as_i64().unwrap() > 2);
    }

    #[test]
    fn struct_type_produces_nested_fields() {
        let members = vec![
            Field::new("key", DataType::Utf8, false),
            Field::new("val", DataType::Int64, false),
        ];
        let schema = Schema::new(vec![Field::new(
            "meta",
            DataType::Struct(members.into()),
            true,
        )]);
        let upd = update_for(&schema, &policy("vec", 4));

        let struct_ty = &upd.fields[0]["type"];
        assert_eq!(struct_ty["type"], "struct");
        let nested = struct_ty["fields"].as_array().unwrap();
        assert_eq!(nested.len(), 2);
        for (child, (name, ty)) in nested.iter().zip([("key", "string"), ("val", "long")]) {
            assert_eq!(child["name"], name);
            assert_eq!(child["type"], ty);
        }
        assert!(nested[0]["id"].as_i64().unwrap() > 2);
    }

    #[test]
    fn no_duplicate_vec_column_when_already_in_batch() {
        let schema = Schema::new(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("embedding", DataType::FixedSizeBinary(16), false),
        ]);
        let upd = update_for(&schema, &policy("embedding", 8));

        assert_eq!(upd.fields.len(), 2, "should not add embedding twice");
        let hits = field_names(&upd)
            .into_iter()
            .filter(|n| *n == "embedding")
            .count();
        assert_eq!(hits, 1);
    }

    #[test]
    fn multi_vec_policies_all_appended() {
        let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);
        let upd = arrow_schema_to_iceberg_update(
            &schema,
            &policy("embedding", 4),
            &[policy("context_embedding", 4)],
        );

        assert_eq!(upd.fields.len(), 3);
        let names = field_names(&upd);
        for expected in ["embedding", "context_embedding"] {
            assert!(names.contains(&expected));
        }
    }

    #[test]
    fn top_level_field_ids_match_parquet_stamp_sequence() {
        let schema = Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            utf8_list("tags"),
        ]);
        let upd = update_for(&schema, &policy("vec", 4));

        // Ids 1..=3 go to id, tags and vec; nested ids come only afterwards.
        for idx in 0..3 {
            assert_eq!(upd.fields[idx]["id"], idx as i64 + 1);
        }
        assert!(upd.fields[1]["type"]["element-id"].as_i64().unwrap() > 3);
    }

    #[tokio::test]
    async fn write_batch_auto_deferred_stages_file() {
        let (_dir, mut writer) = temp_writer(policy("embedding", 4)).await;
        let batch = text_batch(&["hello"]);
        let embeddings = vec![vec![1.0f32, 0.0, 0.0, 0.0]];

        writer
            .write_batch_auto_deferred(&batch, &embeddings)
            .await
            .unwrap();

        assert_eq!(writer.pending_files.len(), 1);
    }

    #[tokio::test]
    async fn write_batch_multi_deferred_stages_file_with_extra_indexes() {
        let (_dir, mut writer) = temp_writer(policy("embedding", 4)).await;
        let batch = text_batch(&["hello", "world"]);

        let text_embs = vec![vec![1.0f32, 0.0, 0.0, 0.0], vec![0.0, 1.0, 0.0, 0.0]];
        let img_embs = vec![vec![1.0f32, 0.0], vec![0.0, 1.0]];
        let columns = vec![
            MultiVectorBatch {
                policy: policy("embedding", 4),
                embeddings: &text_embs,
            },
            MultiVectorBatch {
                policy: policy("img_embedding", 2),
                embeddings: &img_embs,
            },
        ];

        writer
            .write_batch_multi_deferred(&batch, &columns)
            .await
            .unwrap();

        assert_eq!(writer.pending_files.len(), 1);
        let entry = &writer.pending_files[0];
        assert_eq!(entry.index_status, IndexStatus::Indexing);
        assert!(entry.centroid_b64.is_some());

        // The secondary column is staged with a zeroed index location until the
        // deferred build patches it.
        assert_eq!(entry.extra_vector_indexes.len(), 1);
        let xi = &entry.extra_vector_indexes[0];
        assert_eq!(xi.column, "img_embedding");
        assert_eq!(xi.dim, 2);
        assert_eq!(xi.hnsw_offset, 0);
        assert_eq!(xi.hnsw_len, 0);
        assert!(xi.centroid_b64.is_some());
    }
}