1use std::sync::atomic::{AtomicU32, Ordering};
3use std::sync::Arc;
4
5use ailake_catalog::{
6 encode_centroid_b64, make_data_file_entry, make_data_file_entry_indexing,
7 make_multi_column_data_file_entry, new_snapshot_id, CatalogProvider, DataFileEntry,
8 ExtraVectorIndex, IcebergSchemaUpdate, IndexStatus, NewSnapshot, SnapshotId, SnapshotOperation,
9 TableIdent, TableProperties, VectorIndexInfo,
10};
11use ailake_core::{AilakeError, AilakeResult, EmbeddingModelInfo, VectorStoragePolicy};
12use ailake_file::{AilakeFileReader, AilakeFileWriter, IndexType, VectorColumnBatch};
13use ailake_index::{IvfPqCodebook, IvfPqConfig};
14use ailake_store::Store;
15use ailake_vec::compute_centroid_and_radius;
16use arrow_array::RecordBatch;
17use arrow_schema::SchemaRef;
18use bytes::Bytes;
19use serde_json;
20use tracing::{error, info, warn};
21
/// One vector column passed to `TableWriter::write_batch_multi` or
/// `TableWriter::write_batch_multi_deferred`.
///
/// Those methods treat the first element of their `columns` slice as the
/// primary vector column; every following element is recorded as an extra
/// vector index on the same data file.
pub struct MultiVectorBatch<'a> {
    /// Storage policy (column name, dimension, metric, ...) of this vector column.
    pub policy: VectorStoragePolicy,
    /// Embeddings for this column. The writer uses the primary column's length as
    /// the file's record count, so presumably one vector per row of the
    /// accompanying `RecordBatch` — TODO confirm.
    pub embeddings: &'a [Vec<f32>],
}
27
/// Stages vector-indexed data files for one table and publishes them as a single
/// snapshot on `commit`.
pub struct TableWriter {
    /// Catalog used to list existing files and commit snapshots.
    catalog: Arc<dyn CatalogProvider>,
    /// Object store that receives the data files.
    store: Arc<dyn Store>,
    /// Storage policy of the table's primary vector column.
    policy: VectorStoragePolicy,
    /// Table being written.
    table: TableIdent,
    /// Source of the `NNNNN` in `data/part-NNNNN.parquet`; starts at 0 in `new`.
    /// NOTE(review): numbering restarts at 0 for every writer — confirm two writers
    /// on the same table cannot produce colliding paths.
    part_counter: Arc<AtomicU32>,
    /// Catalog entries for files uploaded but not yet committed.
    pending_files: Vec<DataFileEntry>,
    /// Parent snapshot recorded on the committed snapshot (see `with_parent_snapshot`).
    parent_snapshot_id: Option<SnapshotId>,
    /// Arrow schema of the first batch written; used to build the Iceberg schema at commit.
    captured_schema: Option<SchemaRef>,
    /// Policies of the non-primary vector columns seen in the first multi-column
    /// write; they feed the Iceberg schema and the per-column table properties.
    extra_vec_policies: Vec<VectorStoragePolicy>,
    /// IVF-PQ codebook trained on the first synchronous IVF-PQ batch and reused
    /// for later ones (later configs do not retrain it).
    cached_ivf_codebook: Option<Arc<IvfPqCodebook>>,
    /// Codebook shared with background IVF-PQ build tasks; the first task to
    /// reach it trains and stores it.
    deferred_ivf_codebook: Arc<tokio::sync::OnceCell<IvfPqCodebook>>,
}
48
49impl TableWriter {
50 pub fn new(
51 catalog: Arc<dyn CatalogProvider>,
52 store: Arc<dyn Store>,
53 policy: VectorStoragePolicy,
54 table: TableIdent,
55 ) -> Self {
56 Self {
57 catalog,
58 store,
59 policy,
60 table,
61 part_counter: Arc::new(AtomicU32::new(0)),
62 pending_files: Vec::new(),
63 parent_snapshot_id: None,
64 captured_schema: None,
65 extra_vec_policies: Vec::new(),
66 cached_ivf_codebook: None,
67 deferred_ivf_codebook: Arc::new(tokio::sync::OnceCell::new()),
68 }
69 }
70
71 pub fn with_parent_snapshot(mut self, id: SnapshotId) -> Self {
72 self.parent_snapshot_id = Some(id);
73 self
74 }
75
76 pub async fn write_batch_deferred(
86 &mut self,
87 batch: &RecordBatch,
88 embeddings: &[Vec<f32>],
89 ) -> AilakeResult<()> {
90 self.validate_embedding_dim(embeddings)?;
91 if self.captured_schema.is_none() {
92 self.captured_schema = Some(batch.schema());
93 }
94 let part_num = self.part_counter.fetch_add(1, Ordering::SeqCst);
95 let file_path = format!("data/part-{:05}.parquet", part_num);
96
97 let file_writer = AilakeFileWriter::new(self.policy.clone());
99 let parquet_bytes = file_writer.write_parquet_only(batch, embeddings)?;
100 let file_size = parquet_bytes.len() as u64;
101 self.store.put(&file_path, parquet_bytes).await?;
102
103 let centroid = compute_centroid_and_radius(embeddings, self.policy.metric);
105 let mut entry = make_data_file_entry_indexing(
106 &file_path,
107 embeddings.len() as u64,
108 file_size,
109 ¢roid,
110 &self.policy.column_name,
111 self.policy.dim,
112 );
113 entry.embedding_model = self
114 .policy
115 .embedding_model
116 .as_ref()
117 .map(|m| m.to_property_value());
118 self.pending_files.push(entry);
119
120 let store = self.store.clone();
122 let catalog = self.catalog.clone();
123 let policy = self.policy.clone();
124 let table = self.table.clone();
125 let fp = file_path.clone();
126 tokio::spawn(async move {
127 if let Err(e) = build_and_patch_index(store, catalog, policy, table, fp).await {
128 error!(
129 "ailake: deferred HNSW build failed — file is indexed as Parquet-only until \
130 next compaction rebuilds the index: {}",
131 e
132 );
133 }
134 });
135
136 Ok(())
137 }
138
139 pub async fn write_batch_ivf_pq_deferred(
145 &mut self,
146 batch: &RecordBatch,
147 embeddings: &[Vec<f32>],
148 ivf_config: IvfPqConfig,
149 ) -> AilakeResult<()> {
150 if self.captured_schema.is_none() {
151 self.captured_schema = Some(batch.schema());
152 }
153 let part_num = self.part_counter.fetch_add(1, Ordering::SeqCst);
154 let file_path = format!("data/part-{:05}.parquet", part_num);
155
156 let file_writer = AilakeFileWriter::new(self.policy.clone());
157 let parquet_bytes = file_writer.write_parquet_only(batch, embeddings)?;
158 let file_size = parquet_bytes.len() as u64;
159 self.store.put(&file_path, parquet_bytes).await?;
160
161 let centroid = compute_centroid_and_radius(embeddings, self.policy.metric);
162 let mut entry = make_data_file_entry_indexing(
163 &file_path,
164 embeddings.len() as u64,
165 file_size,
166 ¢roid,
167 &self.policy.column_name,
168 self.policy.dim,
169 );
170 entry.embedding_model = self
171 .policy
172 .embedding_model
173 .as_ref()
174 .map(|m| m.to_property_value());
175 self.pending_files.push(entry);
176
177 let store = self.store.clone();
178 let catalog = self.catalog.clone();
179 let policy = self.policy.clone();
180 let table = self.table.clone();
181 let fp = file_path.clone();
182 let codebook_cell = self.deferred_ivf_codebook.clone();
183 tokio::spawn(async move {
184 if let Err(e) = build_ivf_pq_and_patch_index(
185 store,
186 catalog,
187 policy,
188 table,
189 fp,
190 ivf_config,
191 codebook_cell,
192 )
193 .await
194 {
195 error!(
196 "ailake: deferred IVF-PQ build failed — file is indexed as Parquet-only until \
197 next compaction rebuilds the index: {}",
198 e
199 );
200 }
201 });
202
203 Ok(())
204 }
205
206 pub async fn write_batch_idempotent(
215 &mut self,
216 batch: &RecordBatch,
217 embeddings: &[Vec<f32>],
218 batch_id: &str,
219 ) -> AilakeResult<()> {
220 let existing = self.catalog.list_files(&self.table, None).await?;
221 if existing
222 .iter()
223 .any(|f| f.batch_id.as_deref() == Some(batch_id))
224 {
225 return Ok(());
226 }
227 self.write_batch_with_id(batch, embeddings, Some(batch_id.to_string()))
228 .await
229 }
230
231 fn validate_embedding_dim(&self, embeddings: &[Vec<f32>]) -> AilakeResult<()> {
236 if let Some(first) = embeddings.first() {
237 let actual = first.len() as u32;
238 if actual != self.policy.dim {
239 let table_model = self
240 .policy
241 .embedding_model
242 .as_ref()
243 .map(|m| m.to_property_value())
244 .unwrap_or_else(|| format!("dim={}", self.policy.dim));
245 return Err(AilakeError::ModelMismatch {
246 table_model,
247 table_dim: self.policy.dim,
248 batch_model: format!("dim={}", actual),
249 batch_dim: actual,
250 });
251 }
252 }
253 Ok(())
254 }
255
256 pub async fn write_batch(
257 &mut self,
258 batch: &RecordBatch,
259 embeddings: &[Vec<f32>],
260 ) -> AilakeResult<()> {
261 self.write_batch_with_id(batch, embeddings, None).await
262 }
263
264 async fn write_batch_with_id(
265 &mut self,
266 batch: &RecordBatch,
267 embeddings: &[Vec<f32>],
268 batch_id: Option<String>,
269 ) -> AilakeResult<()> {
270 self.validate_embedding_dim(embeddings)?;
271 if self.captured_schema.is_none() {
272 self.captured_schema = Some(batch.schema());
273 }
274 let part_num = self.part_counter.fetch_add(1, Ordering::SeqCst);
275 let file_path = format!("data/part-{:05}.parquet", part_num);
276
277 let file_writer = AilakeFileWriter::new(self.policy.clone());
279 let file_bytes: Bytes = file_writer.write(batch, embeddings)?;
280 let file_size = file_bytes.len() as u64;
281
282 self.store.put(&file_path, file_bytes.clone()).await?;
284
285 let centroid = compute_centroid_and_radius(embeddings, self.policy.metric);
287
288 let reader = ailake_file::AilakeFileReader::new(
290 file_bytes,
291 &self.policy.column_name,
292 self.policy.dim,
293 );
294 let header = reader.read_header()?;
295 let ailk_start = reader.ailk_offset()?;
296 let hnsw_abs_offset = ailk_start + header.hnsw_offset;
297 let hnsw_len = header.hnsw_len;
298
299 let mut entry = make_data_file_entry(
300 &file_path,
301 embeddings.len() as u64,
302 file_size,
303 ¢roid,
304 VectorIndexInfo {
305 column: &self.policy.column_name,
306 dim: self.policy.dim,
307 hnsw_offset: hnsw_abs_offset,
308 hnsw_len,
309 },
310 );
311 entry.batch_id = batch_id;
312 entry.embedding_model = self
313 .policy
314 .embedding_model
315 .as_ref()
316 .map(|m| m.to_property_value());
317 self.pending_files.push(entry);
318 Ok(())
319 }
320
321 pub async fn write_batch_auto(
327 &mut self,
328 batch: &RecordBatch,
329 embeddings: &[Vec<f32>],
330 ) -> AilakeResult<()> {
331 let profile = ailake_index::HardwareProfile::detect();
332 if profile.recommend_ivf_pq(embeddings.len()) {
333 let mut ivf_config =
334 ailake_index::IvfPqConfig::for_dataset(self.policy.dim as usize, embeddings.len());
335 if self.policy.ivf_residual {
336 ivf_config = ivf_config.with_residual();
337 }
338 self.write_batch_ivf_pq(batch, embeddings, ivf_config).await
339 } else {
340 self.write_batch(batch, embeddings).await
341 }
342 }
343
344 pub async fn write_batch_auto_deferred(
355 &mut self,
356 batch: &RecordBatch,
357 embeddings: &[Vec<f32>],
358 ) -> AilakeResult<()> {
359 let profile = ailake_index::HardwareProfile::detect();
360 if profile.recommend_ivf_pq(embeddings.len()) {
361 let mut ivf_config =
362 ailake_index::IvfPqConfig::for_dataset(self.policy.dim as usize, embeddings.len());
363 if self.policy.ivf_residual {
364 ivf_config = ivf_config.with_residual();
365 }
366 self.write_batch_ivf_pq_deferred(batch, embeddings, ivf_config)
367 .await
368 } else {
369 self.write_batch_deferred(batch, embeddings).await
370 }
371 }
372
373 pub async fn write_batch_ivf_pq(
377 &mut self,
378 batch: &RecordBatch,
379 embeddings: &[Vec<f32>],
380 ivf_config: IvfPqConfig,
381 ) -> AilakeResult<()> {
382 if self.captured_schema.is_none() {
383 self.captured_schema = Some(batch.schema());
384 }
385 let part_num = self.part_counter.fetch_add(1, Ordering::SeqCst);
386 let file_path = format!("data/part-{:05}.parquet", part_num);
387
388 if self.cached_ivf_codebook.is_none() {
392 let codebook = tokio::task::spawn_blocking({
393 let embeddings = embeddings.to_vec();
394 let metric = self.policy.metric;
395 let config = ivf_config.clone();
396 move || ailake_index::IvfPqIndex::train_codebook(&embeddings, metric, &config)
397 })
398 .await
399 .map_err(|e| ailake_core::AilakeError::Store(format!("spawn_blocking panic: {e}")))??;
400 self.cached_ivf_codebook = Some(Arc::new(codebook));
401 }
402 let codebook = self
404 .cached_ivf_codebook
405 .as_ref()
406 .expect("IVF-PQ codebook must be Some after training block")
407 .clone();
408
409 let file_writer = AilakeFileWriter::new(self.policy.clone())
410 .with_index_type(IndexType::IvfPq(ivf_config))
411 .with_shared_ivf_codebook(codebook);
412 let file_bytes: Bytes = file_writer.write(batch, embeddings)?;
413 let file_size = file_bytes.len() as u64;
414
415 self.store.put(&file_path, file_bytes.clone()).await?;
416
417 let centroid = compute_centroid_and_radius(embeddings, self.policy.metric);
418
419 let reader = ailake_file::AilakeFileReader::new(
420 file_bytes,
421 &self.policy.column_name,
422 self.policy.dim,
423 );
424 let header = reader.read_header()?;
425 let ailk_start = reader.ailk_offset()?;
426 let index_abs_offset = ailk_start + header.hnsw_offset;
427 let index_len = header.hnsw_len;
428
429 let mut entry = make_data_file_entry(
430 &file_path,
431 embeddings.len() as u64,
432 file_size,
433 ¢roid,
434 VectorIndexInfo {
435 column: &self.policy.column_name,
436 dim: self.policy.dim,
437 hnsw_offset: index_abs_offset,
438 hnsw_len: index_len,
439 },
440 );
441 entry.embedding_model = self
442 .policy
443 .embedding_model
444 .as_ref()
445 .map(|m| m.to_property_value());
446 self.pending_files.push(entry);
447 Ok(())
448 }
449
450 pub async fn write_batch_multi(
455 &mut self,
456 batch: &RecordBatch,
457 columns: &[MultiVectorBatch<'_>],
458 ) -> AilakeResult<()> {
459 use ailake_core::AilakeError;
460 if self.captured_schema.is_none() {
461 self.captured_schema = Some(batch.schema());
462 }
463 if self.extra_vec_policies.is_empty() && columns.len() > 1 {
464 self.extra_vec_policies = columns[1..].iter().map(|c| c.policy.clone()).collect();
465 }
466
467 if columns.is_empty() {
468 return Err(AilakeError::InvalidArgument(
469 "write_batch_multi requires at least one column".into(),
470 ));
471 }
472
473 let part_num = self.part_counter.fetch_add(1, Ordering::SeqCst);
474 let file_path = format!("data/part-{:05}.parquet", part_num);
475
476 let col_batches: Vec<VectorColumnBatch<'_>> = columns
477 .iter()
478 .map(|c| VectorColumnBatch {
479 policy: &c.policy,
480 embeddings: c.embeddings,
481 })
482 .collect();
483
484 let primary_policy = &columns[0].policy;
485 let file_writer = AilakeFileWriter::new(primary_policy.clone());
486 let file_bytes: Bytes = file_writer.write_multi(batch, &col_batches)?;
487 let file_size = file_bytes.len() as u64;
488
489 self.store.put(&file_path, file_bytes.clone()).await?;
490
491 let primary_centroid =
493 compute_centroid_and_radius(columns[0].embeddings, primary_policy.metric);
494
495 let reader = ailake_file::AilakeFileReader::new(
497 file_bytes.clone(),
498 &primary_policy.column_name,
499 primary_policy.dim,
500 );
501 let primary_ailk_start = reader.ailk_offset()?;
502 let primary_header = {
503 use ailake_file::HEADER_SIZE;
504 let start = primary_ailk_start as usize;
505 let hdr_bytes: &[u8; HEADER_SIZE] = file_bytes[start..start + HEADER_SIZE]
506 .try_into()
507 .map_err(|_| AilakeError::NotAnAilakeFile)?;
508 ailake_file::AilakeHeader::from_bytes(hdr_bytes)?
509 };
510 let primary_hnsw_abs = primary_ailk_start + primary_header.hnsw_offset;
511
512 let mut extra: Vec<ExtraVectorIndex> = Vec::new();
514 for col in columns.iter().skip(1) {
515 let col_ailk_start = reader.ailk_offset_for_column(&col.policy.column_name)?;
516 let col_header = {
517 use ailake_file::HEADER_SIZE;
518 let start = col_ailk_start as usize;
519 let hdr_bytes: &[u8; HEADER_SIZE] = file_bytes[start..start + HEADER_SIZE]
520 .try_into()
521 .map_err(|_| AilakeError::NotAnAilakeFile)?;
522 ailake_file::AilakeHeader::from_bytes(hdr_bytes)?
523 };
524 let col_centroid = compute_centroid_and_radius(col.embeddings, col.policy.metric);
525 extra.push(ExtraVectorIndex {
526 column: col.policy.column_name.clone(),
527 dim: col.policy.dim,
528 hnsw_offset: col_ailk_start + col_header.hnsw_offset,
529 hnsw_len: col_header.hnsw_len,
530 centroid_b64: Some(encode_centroid_b64(&col_centroid)),
531 radius: Some(col_centroid.radius),
532 });
533 }
534
535 let mut entry = make_multi_column_data_file_entry(
536 &file_path,
537 columns[0].embeddings.len() as u64,
538 file_size,
539 &primary_centroid,
540 VectorIndexInfo {
541 column: &primary_policy.column_name,
542 dim: primary_policy.dim,
543 hnsw_offset: primary_hnsw_abs,
544 hnsw_len: primary_header.hnsw_len,
545 },
546 &extra,
547 );
548 entry.embedding_model = self
549 .policy
550 .embedding_model
551 .as_ref()
552 .map(|m| m.to_property_value());
553 self.pending_files.push(entry);
554 Ok(())
555 }
556
557 pub async fn write_batch_multi_deferred(
570 &mut self,
571 batch: &RecordBatch,
572 columns: &[MultiVectorBatch<'_>],
573 ) -> AilakeResult<()> {
574 use ailake_core::AilakeError;
575 if columns.is_empty() {
576 return Err(AilakeError::InvalidArgument(
577 "write_batch_multi_deferred requires at least one column".into(),
578 ));
579 }
580 if self.captured_schema.is_none() {
581 self.captured_schema = Some(batch.schema());
582 }
583 if self.extra_vec_policies.is_empty() && columns.len() > 1 {
584 self.extra_vec_policies = columns[1..].iter().map(|c| c.policy.clone()).collect();
585 }
586
587 let part_num = self.part_counter.fetch_add(1, Ordering::SeqCst);
588 let file_path = format!("data/part-{:05}.parquet", part_num);
589
590 let primary_policy = &columns[0].policy;
592 let file_writer = AilakeFileWriter::new(primary_policy.clone());
593 let parquet_bytes = file_writer.write_parquet_only(batch, columns[0].embeddings)?;
594 let file_size = parquet_bytes.len() as u64;
595 self.store.put(&file_path, parquet_bytes).await?;
596
597 let primary_centroid =
599 compute_centroid_and_radius(columns[0].embeddings, primary_policy.metric);
600 let mut entry = make_data_file_entry_indexing(
601 &file_path,
602 columns[0].embeddings.len() as u64,
603 file_size,
604 &primary_centroid,
605 &primary_policy.column_name,
606 primary_policy.dim,
607 );
608 entry.extra_vector_indexes = columns[1..]
611 .iter()
612 .map(|c| {
613 let col_centroid = compute_centroid_and_radius(c.embeddings, c.policy.metric);
614 ExtraVectorIndex {
615 column: c.policy.column_name.clone(),
616 dim: c.policy.dim,
617 hnsw_offset: 0,
618 hnsw_len: 0,
619 centroid_b64: Some(encode_centroid_b64(&col_centroid)),
620 radius: Some(col_centroid.radius),
621 }
622 })
623 .collect();
624 entry.embedding_model = self
625 .policy
626 .embedding_model
627 .as_ref()
628 .map(|m| m.to_property_value());
629 self.pending_files.push(entry);
630
631 let all_policies: Vec<VectorStoragePolicy> =
633 columns.iter().map(|c| c.policy.clone()).collect();
634 let all_embeddings: Vec<Vec<Vec<f32>>> =
635 columns.iter().map(|c| c.embeddings.to_vec()).collect();
636 let store = self.store.clone();
637 let catalog = self.catalog.clone();
638 let table = self.table.clone();
639 let fp = file_path.clone();
640 tokio::spawn(async move {
641 if let Err(e) =
642 build_and_patch_multi_index(store, catalog, all_policies, table, fp, all_embeddings)
643 .await
644 {
645 error!(
646 "ailake: deferred multi-column HNSW build failed — shard stays in flat-scan \
647 mode until next compaction rebuilds the index: {}",
648 e
649 );
650 }
651 });
652
653 Ok(())
654 }
655
656 pub async fn commit(mut self) -> AilakeResult<SnapshotId> {
662 if self.pending_files.is_empty() {
663 let current = self
664 .catalog
665 .load_table(&self.table)
666 .await
667 .ok()
668 .and_then(|m| m.current_snapshot_id)
669 .unwrap_or(0);
670 return Ok(current);
671 }
672 let iceberg_schema = self
673 .captured_schema
674 .as_deref()
675 .map(|s| arrow_schema_to_iceberg_update(s, &self.policy, &self.extra_vec_policies));
676 let mut extra_properties = std::collections::HashMap::new();
679 for ep in &self.extra_vec_policies {
680 extra_properties.insert(format!("ailake.dim-{}", ep.column_name), ep.dim.to_string());
681 extra_properties.insert(
682 format!("ailake.metric-{}", ep.column_name),
683 ailake_parquet::schema::metric_str(ep.metric).to_string(),
684 );
685 }
686 let snapshot = NewSnapshot {
687 snapshot_id: new_snapshot_id(),
688 parent_snapshot_id: self.parent_snapshot_id,
689 files: std::mem::take(&mut self.pending_files),
690 operation: SnapshotOperation::Append,
691 iceberg_schema,
692 extra_properties,
693 };
694 self.catalog.commit_snapshot(&self.table, snapshot).await
695 }
696
697 pub async fn create_or_open(
699 catalog: Arc<dyn CatalogProvider>,
700 store: Arc<dyn Store>,
701 policy: VectorStoragePolicy,
702 table: TableIdent,
703 ) -> AilakeResult<Self> {
704 match catalog.load_table(&table).await {
706 Ok(existing_meta) => {
707 if let Some(incoming) = &policy.embedding_model {
711 if let Some(stored_val) = existing_meta
712 .properties
713 .get(EmbeddingModelInfo::property_key())
714 {
715 let stored = EmbeddingModelInfo::from_property_value(stored_val);
716 if stored.name != incoming.name {
717 warn!(
718 "ailake: embedding model name changed: table has '{}', writing with '{}' \
719 (dim={}). Vectors may be incompatible for similarity search.",
720 stored.name, incoming.name, policy.dim
721 );
722 }
723 }
724 }
725 }
726 Err(_) => {
727 catalog
728 .create_table(
729 &table,
730 &TableProperties {
731 policy: policy.clone(),
732 extra: std::collections::HashMap::new(),
733 },
734 )
735 .await?;
736 }
737 }
738 Ok(Self::new(catalog, store, policy, table))
739 }
740}
741
742fn arrow_schema_to_iceberg_update(
749 schema: &arrow_schema::Schema,
750 policy: &VectorStoragePolicy,
751 extra_vec_policies: &[VectorStoragePolicy],
752) -> IcebergSchemaUpdate {
753 let bytes_per_dim = policy.precision.bytes_per_element() as u32;
754 let vec_fixed_len = policy.dim * bytes_per_dim;
755
756 let has_primary_in_batch = schema
758 .fields()
759 .iter()
760 .any(|f| f.name() == &policy.column_name);
761 let vec_cols: Vec<(String, u32)> = {
762 let mut v = Vec::new();
763 if !has_primary_in_batch {
764 v.push((policy.column_name.clone(), vec_fixed_len));
765 }
766 for ep in extra_vec_policies {
767 let ep_fixed_len = ep.dim * ep.precision.bytes_per_element() as u32;
768 if !schema.fields().iter().any(|f| f.name() == &ep.column_name) {
769 v.push((ep.column_name.clone(), ep_fixed_len));
770 }
771 }
772 v
773 };
774
775 let top_level_count = schema.fields().len() + vec_cols.len();
777 let mut nested_id = top_level_count as i32;
779
780 let mut fields: Vec<serde_json::Value> = Vec::new();
781 let mut name_mapping: Vec<serde_json::Value> = Vec::new();
782
783 for (idx, field) in schema.fields().iter().enumerate() {
784 let field_id = (idx + 1) as i32;
785 let iceberg_type = arrow_type_to_iceberg(field.data_type(), &mut nested_id);
786 fields.push(serde_json::json!({
787 "id": field_id,
788 "name": field.name(),
789 "required": false,
790 "type": iceberg_type,
791 }));
792 name_mapping.push(serde_json::json!({
793 "field-id": field_id,
794 "names": [field.name()],
795 }));
796 }
797
798 for (i, (col_name, fixed_len)) in vec_cols.iter().enumerate() {
800 let field_id = (schema.fields().len() + 1 + i) as i32;
801 fields.push(serde_json::json!({
802 "id": field_id,
803 "name": col_name,
804 "required": false,
805 "type": format!("fixed[{fixed_len}]"),
806 }));
807 name_mapping.push(serde_json::json!({
808 "field-id": field_id,
809 "names": [col_name],
810 }));
811 }
812
813 let last_column_id = nested_id;
814 let name_mapping_json = serde_json::to_string(&name_mapping).unwrap_or_else(|_| "[]".into());
815
816 IcebergSchemaUpdate {
817 fields,
818 last_column_id,
819 name_mapping_json,
820 }
821}
822
823fn arrow_type_to_iceberg(dt: &arrow_schema::DataType, nested_id: &mut i32) -> serde_json::Value {
828 use arrow_schema::DataType;
829 match dt {
830 DataType::Boolean => serde_json::json!("boolean"),
831 DataType::Int8 | DataType::Int16 | DataType::Int32 | DataType::UInt8 | DataType::UInt16 => {
832 serde_json::json!("int")
833 }
834 DataType::Int64 | DataType::UInt32 | DataType::UInt64 => serde_json::json!("long"),
835 DataType::Float16 | DataType::Float32 => serde_json::json!("float"),
836 DataType::Float64 => serde_json::json!("double"),
837 DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View => serde_json::json!("string"),
838 DataType::Binary | DataType::LargeBinary | DataType::BinaryView => {
839 serde_json::json!("binary")
840 }
841 DataType::Date32 | DataType::Date64 => serde_json::json!("date"),
842 DataType::Timestamp(_, Some(_)) => serde_json::json!("timestamptz"),
844 DataType::Timestamp(_, None) => serde_json::json!("timestamp"),
845 DataType::Time32(_) | DataType::Time64(_) => serde_json::json!("time"),
846 DataType::FixedSizeBinary(n) => serde_json::json!(format!("fixed[{n}]")),
847 DataType::Decimal128(p, s) | DataType::Decimal256(p, s) => {
848 serde_json::json!(format!("decimal({p}, {s})"))
849 }
850 DataType::List(inner)
851 | DataType::LargeList(inner)
852 | DataType::ListView(inner)
853 | DataType::FixedSizeList(inner, _) => {
854 *nested_id += 1;
855 let element_id = *nested_id;
856 let element_type = arrow_type_to_iceberg(inner.data_type(), nested_id);
857 serde_json::json!({
858 "type": "list",
859 "element-id": element_id,
860 "element": element_type,
861 "element-required": !inner.is_nullable(),
862 })
863 }
864 DataType::Struct(arrow_fields) => {
865 let struct_fields: Vec<serde_json::Value> = arrow_fields
866 .iter()
867 .map(|f| {
868 *nested_id += 1;
869 let fid = *nested_id;
870 let ftype = arrow_type_to_iceberg(f.data_type(), nested_id);
871 serde_json::json!({
872 "id": fid,
873 "name": f.name(),
874 "required": !f.is_nullable(),
875 "type": ftype,
876 })
877 })
878 .collect();
879 serde_json::json!({ "type": "struct", "fields": struct_fields })
880 }
881 DataType::Map(entries, _) => {
882 *nested_id += 1;
884 let key_id = *nested_id;
885 *nested_id += 1;
886 let val_id = *nested_id;
887 if let DataType::Struct(kv_fields) = entries.data_type() {
888 let key_f = kv_fields
889 .iter()
890 .find(|f| f.name() == "key" || f.name() == "keys");
891 let val_f = kv_fields
892 .iter()
893 .find(|f| f.name() == "value" || f.name() == "values");
894 let key_type = key_f
895 .map(|f| arrow_type_to_iceberg(f.data_type(), nested_id))
896 .unwrap_or(serde_json::json!("binary"));
897 let val_type = val_f
898 .map(|f| arrow_type_to_iceberg(f.data_type(), nested_id))
899 .unwrap_or(serde_json::json!("binary"));
900 let val_required = val_f.map(|f| !f.is_nullable()).unwrap_or(false);
901 serde_json::json!({
902 "type": "map",
903 "key-id": key_id,
904 "key": key_type,
905 "value-id": val_id,
906 "value": val_type,
907 "value-required": val_required,
908 })
909 } else {
910 serde_json::json!("binary")
911 }
912 }
913 _ => serde_json::json!("binary"),
914 }
915}
916
917async fn build_and_patch_index(
919 store: Arc<dyn Store>,
920 catalog: Arc<dyn CatalogProvider>,
921 policy: VectorStoragePolicy,
922 table: TableIdent,
923 file_path: String,
924) -> AilakeResult<()> {
925 let parquet_bytes = store.get(&file_path).await?;
927 let reader = AilakeFileReader::new(parquet_bytes, &policy.column_name, policy.dim);
928 let (batch, embeddings) = reader.read_parquet()?;
929
930 let full_bytes = tokio::task::spawn_blocking({
933 let policy = policy.clone();
934 move || {
935 let file_writer = AilakeFileWriter::new(policy);
936 file_writer.write(&batch, &embeddings)
937 }
938 })
939 .await
940 .map_err(|e| ailake_core::AilakeError::Store(format!("spawn_blocking panic: {e}")))??;
941
942 let full_reader = AilakeFileReader::new(full_bytes.clone(), &policy.column_name, policy.dim);
944 let header = full_reader.read_header()?;
945 let ailk_start = full_reader.ailk_offset()?;
946 let hnsw_abs_offset = ailk_start + header.hnsw_offset;
947 let hnsw_len = header.hnsw_len;
948
949 store.put(&file_path, full_bytes).await?;
951
952 let mut committed = false;
955 for _ in 0..120u32 {
956 match catalog.load_table(&table).await {
957 Ok(meta) if meta.current_snapshot_id.is_some() => {
958 committed = true;
959 break;
960 }
961 _ => tokio::time::sleep(std::time::Duration::from_millis(500)).await,
962 }
963 }
964 if !committed {
965 return Err(ailake_core::AilakeError::Store(format!(
966 "deferred HNSW build: no snapshot committed for {file_path} after 60 s — \
967 did you call TableWriter::commit()?"
968 )));
969 }
970
971 for attempt in 0..50u32 {
976 let table_meta = catalog.load_table(&table).await?;
977 let parent_snapshot_id = table_meta.current_snapshot_id;
978 let mut files = catalog.list_files(&table, None).await?;
979
980 if files
982 .iter()
983 .any(|f| f.path == file_path && f.index_status == IndexStatus::Ready)
984 {
985 break;
986 }
987
988 for f in &mut files {
989 if f.path == file_path {
990 f.hnsw_offset = Some(hnsw_abs_offset);
991 f.hnsw_len = Some(hnsw_len);
992 f.index_status = IndexStatus::Ready;
993 break;
994 }
995 }
996 catalog
997 .commit_snapshot(
998 &table,
999 NewSnapshot {
1000 snapshot_id: new_snapshot_id(),
1001 parent_snapshot_id,
1002 files,
1003 operation: SnapshotOperation::Replace,
1004 iceberg_schema: None,
1005 extra_properties: std::collections::HashMap::new(),
1006 },
1007 )
1008 .await?;
1009
1010 tokio::time::sleep(std::time::Duration::from_millis(10 + attempt as u64 * 5)).await;
1012
1013 let verify = catalog.list_files(&table, None).await?;
1014 if verify
1015 .iter()
1016 .any(|f| f.path == file_path && f.index_status == IndexStatus::Ready)
1017 {
1018 break;
1019 }
1020 }
1022
1023 info!(
1024 "ailake: deferred HNSW index built for {} (offset={}, len={})",
1025 file_path, hnsw_abs_offset, hnsw_len
1026 );
1027 Ok(())
1028}
1029
1030async fn build_ivf_pq_and_patch_index(
1035 store: Arc<dyn Store>,
1036 catalog: Arc<dyn CatalogProvider>,
1037 policy: VectorStoragePolicy,
1038 table: TableIdent,
1039 file_path: String,
1040 ivf_config: IvfPqConfig,
1041 codebook_cell: Arc<tokio::sync::OnceCell<IvfPqCodebook>>,
1042) -> AilakeResult<()> {
1043 let parquet_bytes = store.get(&file_path).await?;
1044 let reader = AilakeFileReader::new(parquet_bytes, &policy.column_name, policy.dim);
1045 let (batch, embeddings) = reader.read_parquet()?;
1046
1047 let codebook = codebook_cell
1049 .get_or_try_init(|| async {
1050 let vecs = embeddings.clone();
1051 let metric = policy.metric;
1052 let cfg = ivf_config.clone();
1053 tokio::task::spawn_blocking(move || {
1054 ailake_index::IvfPqIndex::train_codebook(&vecs, metric, &cfg)
1055 })
1056 .await
1057 .map_err(|e| ailake_core::AilakeError::Store(format!("spawn_blocking panic: {e}")))?
1058 })
1059 .await?;
1060
1061 let full_bytes = tokio::task::spawn_blocking({
1062 let policy = policy.clone();
1063 let codebook = codebook.clone();
1064 move || {
1065 let file_writer = AilakeFileWriter::new(policy)
1066 .with_index_type(IndexType::IvfPq(ivf_config))
1067 .with_shared_ivf_codebook(Arc::new(codebook));
1068 file_writer.write(&batch, &embeddings)
1069 }
1070 })
1071 .await
1072 .map_err(|e| ailake_core::AilakeError::Store(format!("spawn_blocking panic: {e}")))??;
1073
1074 let full_reader = AilakeFileReader::new(full_bytes.clone(), &policy.column_name, policy.dim);
1075 let header = full_reader.read_header()?;
1076 let ailk_start = full_reader.ailk_offset()?;
1077 let hnsw_abs_offset = ailk_start + header.hnsw_offset;
1078 let hnsw_len = header.hnsw_len;
1079
1080 store.put(&file_path, full_bytes).await?;
1081
1082 let mut committed = false;
1084 for _ in 0..120u32 {
1085 match catalog.load_table(&table).await {
1086 Ok(meta) if meta.current_snapshot_id.is_some() => {
1087 committed = true;
1088 break;
1089 }
1090 _ => tokio::time::sleep(std::time::Duration::from_millis(500)).await,
1091 }
1092 }
1093 if !committed {
1094 return Err(ailake_core::AilakeError::Store(format!(
1095 "deferred IVF-PQ build: no snapshot committed for {file_path} after 60 s — \
1096 did you call TableWriter::commit()?"
1097 )));
1098 }
1099
1100 for attempt in 0..50u32 {
1101 let table_meta = catalog.load_table(&table).await?;
1102 let parent_snapshot_id = table_meta.current_snapshot_id;
1103 let mut files = catalog.list_files(&table, None).await?;
1104
1105 if files
1106 .iter()
1107 .any(|f| f.path == file_path && f.index_status == IndexStatus::Ready)
1108 {
1109 break;
1110 }
1111
1112 for f in &mut files {
1113 if f.path == file_path {
1114 f.hnsw_offset = Some(hnsw_abs_offset);
1115 f.hnsw_len = Some(hnsw_len);
1116 f.index_status = IndexStatus::Ready;
1117 break;
1118 }
1119 }
1120 catalog
1121 .commit_snapshot(
1122 &table,
1123 NewSnapshot {
1124 snapshot_id: new_snapshot_id(),
1125 parent_snapshot_id,
1126 files,
1127 operation: SnapshotOperation::Replace,
1128 iceberg_schema: None,
1129 extra_properties: std::collections::HashMap::new(),
1130 },
1131 )
1132 .await?;
1133
1134 tokio::time::sleep(std::time::Duration::from_millis(10 + attempt as u64 * 5)).await;
1135
1136 let verify = catalog.list_files(&table, None).await?;
1137 if verify
1138 .iter()
1139 .any(|f| f.path == file_path && f.index_status == IndexStatus::Ready)
1140 {
1141 break;
1142 }
1143 }
1144
1145 info!(
1146 "ailake: deferred IVF-PQ index built for {} (offset={}, len={})",
1147 file_path, hnsw_abs_offset, hnsw_len
1148 );
1149 Ok(())
1150}
1151
1152async fn build_and_patch_multi_index(
1158 store: Arc<dyn Store>,
1159 catalog: Arc<dyn CatalogProvider>,
1160 policies: Vec<VectorStoragePolicy>,
1161 table: TableIdent,
1162 file_path: String,
1163 all_embeddings: Vec<Vec<Vec<f32>>>,
1164) -> AilakeResult<()> {
1165 let parquet_bytes = store.get(&file_path).await?;
1167 let primary_reader =
1168 AilakeFileReader::new(parquet_bytes, &policies[0].column_name, policies[0].dim);
1169 let (batch, _) = primary_reader.read_parquet()?;
1170
1171 let full_bytes = tokio::task::spawn_blocking({
1173 let policies = policies.clone();
1174 let all_embeddings = all_embeddings.clone();
1175 move || {
1176 let col_batches: Vec<VectorColumnBatch<'_>> = policies
1177 .iter()
1178 .zip(all_embeddings.iter())
1179 .map(|(p, embs)| VectorColumnBatch {
1180 policy: p,
1181 embeddings: embs.as_slice(),
1182 })
1183 .collect();
1184 let file_writer = AilakeFileWriter::new(policies[0].clone());
1185 file_writer.write_multi(&batch, &col_batches)
1186 }
1187 })
1188 .await
1189 .map_err(|e| ailake_core::AilakeError::Store(format!("spawn_blocking panic: {e}")))??;
1190
1191 let primary_reader = AilakeFileReader::new(
1193 full_bytes.clone(),
1194 &policies[0].column_name,
1195 policies[0].dim,
1196 );
1197 let primary_header = primary_reader.read_header()?;
1198 let primary_ailk_start = primary_reader.ailk_offset()?;
1199 let primary_hnsw_abs = primary_ailk_start + primary_header.hnsw_offset;
1200 let primary_hnsw_len = primary_header.hnsw_len;
1201
1202 let mut extra_offsets: Vec<(u64, u64)> = Vec::with_capacity(policies.len().saturating_sub(1));
1207 for col_policy in policies.iter().skip(1) {
1208 let col_reader =
1209 AilakeFileReader::new(full_bytes.clone(), &col_policy.column_name, col_policy.dim);
1210 let col_ailk_start = col_reader.ailk_offset_for_column(&col_policy.column_name)?;
1211 let col_header = col_reader.read_header_for_column(&col_policy.column_name)?;
1212 extra_offsets.push((col_ailk_start + col_header.hnsw_offset, col_header.hnsw_len));
1213 }
1214
1215 store.put(&file_path, full_bytes).await?;
1217
1218 let mut committed = false;
1220 for _ in 0..120u32 {
1221 match catalog.load_table(&table).await {
1222 Ok(meta) if meta.current_snapshot_id.is_some() => {
1223 committed = true;
1224 break;
1225 }
1226 _ => tokio::time::sleep(std::time::Duration::from_millis(500)).await,
1227 }
1228 }
1229 if !committed {
1230 return Err(ailake_core::AilakeError::Store(format!(
1231 "deferred index build: no snapshot committed for {file_path} after 60 s — \
1232 did you call TableWriter::commit()?"
1233 )));
1234 }
1235
1236 for attempt in 0..50u32 {
1238 let table_meta = catalog.load_table(&table).await?;
1239 let parent_snapshot_id = table_meta.current_snapshot_id;
1240 let mut files = catalog.list_files(&table, None).await?;
1241
1242 if files
1243 .iter()
1244 .any(|f| f.path == file_path && f.index_status == IndexStatus::Ready)
1245 {
1246 break;
1247 }
1248
1249 for f in &mut files {
1250 if f.path == file_path {
1251 f.hnsw_offset = Some(primary_hnsw_abs);
1252 f.hnsw_len = Some(primary_hnsw_len);
1253 f.index_status = IndexStatus::Ready;
1254 for (i, &(off, len)) in extra_offsets.iter().enumerate() {
1255 if let Some(xi) = f.extra_vector_indexes.get_mut(i) {
1256 xi.hnsw_offset = off;
1257 xi.hnsw_len = len;
1258 }
1259 }
1260 break;
1261 }
1262 }
1263 catalog
1264 .commit_snapshot(
1265 &table,
1266 NewSnapshot {
1267 snapshot_id: new_snapshot_id(),
1268 parent_snapshot_id,
1269 files,
1270 operation: SnapshotOperation::Replace,
1271 iceberg_schema: None,
1272 extra_properties: std::collections::HashMap::new(),
1273 },
1274 )
1275 .await?;
1276
1277 tokio::time::sleep(std::time::Duration::from_millis(10 + attempt as u64 * 5)).await;
1278
1279 let verify = catalog.list_files(&table, None).await?;
1280 if verify
1281 .iter()
1282 .any(|f| f.path == file_path && f.index_status == IndexStatus::Ready)
1283 {
1284 break;
1285 }
1286 }
1287
1288 info!(
1289 "ailake: deferred multi-column HNSW built for {} ({} cols, primary offset={})",
1290 file_path,
1291 policies.len(),
1292 primary_hnsw_abs
1293 );
1294 Ok(())
1295}
1296
#[cfg(test)]
mod tests {
    use super::*;
    use ailake_core::{VectorMetric, VectorPrecision};
    use arrow_schema::{DataType, Field, Schema, TimeUnit};

    /// Cosine / F16 policy for `col` with `dim` dimensions. Raw vectors are kept for
    /// re-ranking, and every optional setting (PQ, HNSW tuning, model, modality) is unset.
    fn policy(col: &str, dim: u32) -> VectorStoragePolicy {
        VectorStoragePolicy {
            column_name: col.to_string(),
            dim,
            metric: VectorMetric::Cosine,
            precision: VectorPrecision::F16,
            pq: None,
            keep_raw_for_reranking: true,
            pre_normalize: false,
            hnsw_m: None,
            hnsw_ef_construction: None,
            ivf_residual: false,
            embedding_model: None,
            modality: None,
        }
    }

    /// Schema update for `schema` with `pol` as the only vector column.
    fn update_for(schema: &Schema, pol: &VectorStoragePolicy) -> IcebergSchemaUpdate {
        arrow_schema_to_iceberg_update(schema, pol, &[])
    }

    #[test]
    fn simple_schema_produces_correct_fields() {
        let schema = Schema::new(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("text", DataType::Utf8, false),
        ]);
        let pol = policy("embedding", 8);
        let upd = update_for(&schema, &pol);

        // Batch columns get ids 1 and 2; the appended vector column gets id 3.
        assert_eq!(upd.fields.len(), 3);
        assert_eq!(upd.fields[0]["id"], 1);
        assert_eq!(upd.fields[0]["type"], "int");
        assert_eq!(upd.fields[1]["id"], 2);
        assert_eq!(upd.fields[1]["type"], "string");
        assert_eq!(upd.fields[2]["id"], 3);
        // 8 dims at F16 (2 bytes each) -> 16-byte fixed.
        assert_eq!(upd.fields[2]["type"], "fixed[16]");

        // The name mapping must also cover the vector column.
        let nm: Vec<serde_json::Value> = serde_json::from_str(&upd.name_mapping_json).unwrap();
        assert_eq!(nm.len(), 3);
        assert_eq!(nm[2]["field-id"], 3);
        assert_eq!(nm[2]["names"][0], "embedding");
        assert_eq!(upd.last_column_id, 3);
    }

    #[test]
    fn timestamp_without_tz_maps_to_timestamp_not_timestamptz() {
        let schema = Schema::new(vec![
            Field::new(
                "created_at",
                DataType::Timestamp(TimeUnit::Microsecond, None),
                true,
            ),
            Field::new(
                "updated_at",
                DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
                true,
            ),
        ]);
        let pol = policy("vec", 4);
        let upd = update_for(&schema, &pol);

        assert_eq!(upd.fields[0]["type"], "timestamp");
        assert_eq!(upd.fields[1]["type"], "timestamptz");
    }

    #[test]
    fn list_type_produces_iceberg_list_object() {
        let schema = Schema::new(vec![Field::new(
            "tags",
            DataType::List(Arc::new(Field::new("item", DataType::Utf8, true))),
            true,
        )]);
        let pol = policy("vec", 4);
        let upd = update_for(&schema, &pol);

        let t = &upd.fields[0]["type"];
        assert_eq!(t["type"], "list");
        assert_eq!(t["element"], "string");
        // Nested ids are allocated after the top-level ids (tags = 1, vec = 2).
        assert!(t["element-id"].as_i64().unwrap() > 2);
    }

    #[test]
    fn struct_type_produces_nested_fields() {
        let schema = Schema::new(vec![Field::new(
            "meta",
            DataType::Struct(
                vec![
                    Field::new("key", DataType::Utf8, false),
                    Field::new("val", DataType::Int64, false),
                ]
                .into(),
            ),
            true,
        )]);
        let pol = policy("vec", 4);
        let upd = update_for(&schema, &pol);

        let t = &upd.fields[0]["type"];
        assert_eq!(t["type"], "struct");
        let nested = t["fields"].as_array().unwrap();
        assert_eq!(nested.len(), 2);
        assert_eq!(nested[0]["name"], "key");
        assert_eq!(nested[0]["type"], "string");
        assert_eq!(nested[1]["name"], "val");
        assert_eq!(nested[1]["type"], "long");
        // Nested ids are allocated after the top-level ids (meta = 1, vec = 2).
        assert!(nested[0]["id"].as_i64().unwrap() > 2);
    }

    #[test]
    fn no_duplicate_vec_column_when_already_in_batch() {
        // The batch already carries the encoded vector column.
        let schema = Schema::new(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("embedding", DataType::FixedSizeBinary(16), false),
        ]);
        let pol = policy("embedding", 8);
        let upd = update_for(&schema, &pol);

        assert_eq!(upd.fields.len(), 2, "should not add embedding twice");
        let names: Vec<&str> = upd
            .fields
            .iter()
            .map(|f| f["name"].as_str().unwrap())
            .collect();
        assert_eq!(names.iter().filter(|&&n| n == "embedding").count(), 1);
    }

    #[test]
    fn multi_vec_policies_all_appended() {
        let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);
        let primary = policy("embedding", 4);
        let extra = vec![policy("context_embedding", 4)];
        let upd = arrow_schema_to_iceberg_update(&schema, &primary, &extra);

        // id + primary vector + one extra vector.
        assert_eq!(upd.fields.len(), 3);
        let names: Vec<&str> = upd
            .fields
            .iter()
            .map(|f| f["name"].as_str().unwrap())
            .collect();
        assert!(names.contains(&"embedding"));
        assert!(names.contains(&"context_embedding"));
    }

    #[test]
    fn top_level_field_ids_match_parquet_stamp_sequence() {
        // A nested column between two top-level ones must not shift top-level ids.
        let schema = Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new(
                "tags",
                DataType::List(Arc::new(Field::new("item", DataType::Utf8, true))),
                true,
            ),
        ]);
        let pol = policy("vec", 4);
        let upd = update_for(&schema, &pol);

        // Top-level columns (id, tags, vec) take 1..=3 in order.
        assert_eq!(upd.fields[0]["id"], 1);
        assert_eq!(upd.fields[1]["id"], 2);
        assert_eq!(upd.fields[2]["id"], 3);

        // The list element id comes after every top-level id.
        assert!(upd.fields[1]["type"]["element-id"].as_i64().unwrap() > 3);
    }

    #[tokio::test]
    async fn write_batch_auto_deferred_stages_file() {
        use ailake_catalog::HadoopCatalog;
        use ailake_store::LocalStore;

        let dir = tempfile::tempdir().unwrap();
        let store: Arc<dyn Store> = Arc::new(LocalStore::new(dir.path().to_str().unwrap()));
        let catalog = Arc::new(HadoopCatalog::new(Arc::clone(&store), ""));
        let pol = policy("embedding", 4);
        let ident = TableIdent::new("default", "t");

        let mut writer = TableWriter::create_or_open(catalog, store, pol, ident)
            .await
            .unwrap();

        let schema = Arc::new(Schema::new(vec![Field::new("text", DataType::Utf8, false)]));
        let batch = RecordBatch::try_new(
            schema,
            vec![Arc::new(arrow_array::StringArray::from(vec!["hello"]))],
        )
        .unwrap();
        let embeddings = vec![vec![1.0f32, 0.0, 0.0, 0.0]];

        writer
            .write_batch_auto_deferred(&batch, &embeddings)
            .await
            .unwrap();

        // Staged only: nothing is committed until `commit()`.
        assert_eq!(writer.pending_files.len(), 1);
    }

    #[tokio::test]
    async fn write_batch_multi_deferred_stages_file_with_extra_indexes() {
        use ailake_catalog::HadoopCatalog;
        use ailake_store::LocalStore;

        let dir = tempfile::tempdir().unwrap();
        let store: Arc<dyn Store> = Arc::new(LocalStore::new(dir.path().to_str().unwrap()));
        let catalog = Arc::new(HadoopCatalog::new(Arc::clone(&store), ""));
        let primary_pol = policy("embedding", 4);
        let ident = TableIdent::new("default", "t");

        let mut writer = TableWriter::create_or_open(catalog, store, primary_pol, ident)
            .await
            .unwrap();

        let schema = Arc::new(Schema::new(vec![Field::new("text", DataType::Utf8, false)]));
        let batch = RecordBatch::try_new(
            schema,
            vec![Arc::new(arrow_array::StringArray::from(vec![
                "hello", "world",
            ]))],
        )
        .unwrap();

        let text_embs = vec![vec![1.0f32, 0.0, 0.0, 0.0], vec![0.0, 1.0, 0.0, 0.0]];
        let img_embs = vec![vec![1.0f32, 0.0], vec![0.0, 1.0]];

        let columns = vec![
            MultiVectorBatch {
                policy: policy("embedding", 4),
                embeddings: &text_embs,
            },
            MultiVectorBatch {
                policy: policy("img_embedding", 2),
                embeddings: &img_embs,
            },
        ];

        writer
            .write_batch_multi_deferred(&batch, &columns)
            .await
            .unwrap();

        assert_eq!(writer.pending_files.len(), 1);
        let entry = &writer.pending_files[0];
        // The index build is deferred, so the staged entry is still Indexing.
        assert_eq!(entry.index_status, IndexStatus::Indexing);
        assert!(entry.centroid_b64.is_some());

        // Only the non-primary column is recorded as an extra index.
        assert_eq!(entry.extra_vector_indexes.len(), 1);
        let xi = &entry.extra_vector_indexes[0];
        assert_eq!(xi.column, "img_embedding");
        assert_eq!(xi.dim, 2);
        // Offsets stay zero until the deferred build patches the catalog entry.
        assert_eq!(xi.hnsw_offset, 0);
        assert_eq!(xi.hnsw_len, 0);
        assert!(xi.centroid_b64.is_some());
    }
}