1use std::sync::atomic::{AtomicU32, Ordering};
2use std::sync::Arc;
3
4use ailake_catalog::{
5 encode_centroid_b64, make_data_file_entry, make_data_file_entry_indexing,
6 make_multi_column_data_file_entry, new_snapshot_id, CatalogProvider, DataFileEntry,
7 ExtraVectorIndex, IcebergSchemaUpdate, IndexStatus, NewSnapshot, SnapshotId, SnapshotOperation,
8 TableIdent, TableProperties, VectorIndexInfo,
9};
10use ailake_core::{AilakeResult, VectorStoragePolicy};
11use ailake_file::{AilakeFileReader, AilakeFileWriter, IndexType, VectorColumnBatch};
12use ailake_index::IvfPqConfig;
13use ailake_store::Store;
14use ailake_vec::compute_centroid_and_radius;
15use arrow_array::RecordBatch;
16use arrow_schema::SchemaRef;
17use bytes::Bytes;
18use serde_json;
19
/// One vector column's input for [`TableWriter::write_batch_multi`].
///
/// The first element of the slice passed to `write_batch_multi` is treated as
/// the primary vector column; the rest become extra vector indexes.
pub struct MultiVectorBatch<'a> {
    /// Storage policy (column name, dimension, metric, precision) for this column.
    pub policy: VectorStoragePolicy,
    /// Embeddings for this column — presumably one per row of the accompanying
    /// `RecordBatch`, in row order (TODO confirm with the file writer).
    pub embeddings: &'a [Vec<f32>],
}
25
/// Uploads data files for one table and publishes them as a single `Append`
/// snapshot on [`TableWriter::commit`].
///
/// Each `write_batch*` call uploads its file to `store` immediately; only the
/// catalog update is deferred until commit.
pub struct TableWriter {
    /// Catalog the snapshot is committed to.
    catalog: Arc<dyn CatalogProvider>,
    /// Object store that receives the data files.
    store: Arc<dyn Store>,
    /// Policy of the primary vector column.
    policy: VectorStoragePolicy,
    /// Table being written.
    table: TableIdent,
    /// Source of `data/part-NNNNN.parquet` numbers; starts at 0 for each new writer.
    part_counter: Arc<AtomicU32>,
    /// Catalog entries for files uploaded by this writer but not yet committed.
    pending_files: Vec<DataFileEntry>,
    /// Parent recorded on the committed snapshot, if set via `with_parent_snapshot`.
    parent_snapshot_id: Option<SnapshotId>,
    /// Arrow schema of the first batch written; converted to an Iceberg schema at commit.
    captured_schema: Option<SchemaRef>,
    /// Non-primary vector column policies, taken from the first multi-column write.
    extra_vec_policies: Vec<VectorStoragePolicy>,
}
40
41impl TableWriter {
42 pub fn new(
43 catalog: Arc<dyn CatalogProvider>,
44 store: Arc<dyn Store>,
45 policy: VectorStoragePolicy,
46 table: TableIdent,
47 ) -> Self {
48 Self {
49 catalog,
50 store,
51 policy,
52 table,
53 part_counter: Arc::new(AtomicU32::new(0)),
54 pending_files: Vec::new(),
55 parent_snapshot_id: None,
56 captured_schema: None,
57 extra_vec_policies: Vec::new(),
58 }
59 }
60
61 pub fn with_parent_snapshot(mut self, id: SnapshotId) -> Self {
62 self.parent_snapshot_id = Some(id);
63 self
64 }
65
66 pub async fn write_batch_deferred(
76 &mut self,
77 batch: &RecordBatch,
78 embeddings: &[Vec<f32>],
79 ) -> AilakeResult<()> {
80 if self.captured_schema.is_none() {
81 self.captured_schema = Some(batch.schema());
82 }
83 let part_num = self.part_counter.fetch_add(1, Ordering::SeqCst);
84 let file_path = format!("data/part-{:05}.parquet", part_num);
85
86 let file_writer = AilakeFileWriter::new(self.policy.clone());
88 let parquet_bytes = file_writer.write_parquet_only(batch, embeddings)?;
89 let file_size = parquet_bytes.len() as u64;
90 self.store.put(&file_path, parquet_bytes).await?;
91
92 let centroid = compute_centroid_and_radius(embeddings, self.policy.metric);
94 let entry = make_data_file_entry_indexing(
95 &file_path,
96 embeddings.len() as u64,
97 file_size,
98 ¢roid,
99 &self.policy.column_name,
100 self.policy.dim,
101 );
102 self.pending_files.push(entry);
103
104 let store = self.store.clone();
106 let catalog = self.catalog.clone();
107 let policy = self.policy.clone();
108 let table = self.table.clone();
109 let fp = file_path.clone();
110 tokio::spawn(async move {
111 if let Err(e) = build_and_patch_index(store, catalog, policy, table, fp).await {
112 eprintln!("[ailake] deferred HNSW build failed: {e}");
113 }
114 });
115
116 Ok(())
117 }
118
119 pub async fn write_batch(
121 &mut self,
122 batch: &RecordBatch,
123 embeddings: &[Vec<f32>],
124 ) -> AilakeResult<()> {
125 if self.captured_schema.is_none() {
126 self.captured_schema = Some(batch.schema());
127 }
128 let part_num = self.part_counter.fetch_add(1, Ordering::SeqCst);
129 let file_path = format!("data/part-{:05}.parquet", part_num);
130
131 let file_writer = AilakeFileWriter::new(self.policy.clone());
133 let file_bytes: Bytes = file_writer.write(batch, embeddings)?;
134 let file_size = file_bytes.len() as u64;
135
136 self.store.put(&file_path, file_bytes.clone()).await?;
138
139 let centroid = compute_centroid_and_radius(embeddings, self.policy.metric);
141
142 let reader = ailake_file::AilakeFileReader::new(
144 file_bytes,
145 &self.policy.column_name,
146 self.policy.dim,
147 );
148 let header = reader.read_header()?;
149 let ailk_start = reader.ailk_offset()?;
150 let hnsw_abs_offset = ailk_start + header.hnsw_offset;
151 let hnsw_len = header.hnsw_len;
152
153 let entry = make_data_file_entry(
154 &file_path,
155 embeddings.len() as u64,
156 file_size,
157 ¢roid,
158 VectorIndexInfo {
159 column: &self.policy.column_name,
160 dim: self.policy.dim,
161 hnsw_offset: hnsw_abs_offset,
162 hnsw_len,
163 },
164 );
165 self.pending_files.push(entry);
166 Ok(())
167 }
168
169 pub async fn write_batch_auto(
175 &mut self,
176 batch: &RecordBatch,
177 embeddings: &[Vec<f32>],
178 ) -> AilakeResult<()> {
179 let profile = ailake_index::HardwareProfile::detect();
180 if profile.recommend_ivf_pq(embeddings.len()) {
181 let ivf_config =
182 ailake_index::IvfPqConfig::for_dataset(self.policy.dim as usize, embeddings.len());
183 self.write_batch_ivf_pq(batch, embeddings, ivf_config).await
184 } else {
185 self.write_batch(batch, embeddings).await
186 }
187 }
188
189 pub async fn write_batch_ivf_pq(
193 &mut self,
194 batch: &RecordBatch,
195 embeddings: &[Vec<f32>],
196 ivf_config: IvfPqConfig,
197 ) -> AilakeResult<()> {
198 if self.captured_schema.is_none() {
199 self.captured_schema = Some(batch.schema());
200 }
201 let part_num = self.part_counter.fetch_add(1, Ordering::SeqCst);
202 let file_path = format!("data/part-{:05}.parquet", part_num);
203
204 let file_writer = AilakeFileWriter::new(self.policy.clone())
205 .with_index_type(IndexType::IvfPq(ivf_config));
206 let file_bytes: Bytes = file_writer.write(batch, embeddings)?;
207 let file_size = file_bytes.len() as u64;
208
209 self.store.put(&file_path, file_bytes.clone()).await?;
210
211 let centroid = compute_centroid_and_radius(embeddings, self.policy.metric);
212
213 let reader = ailake_file::AilakeFileReader::new(
214 file_bytes,
215 &self.policy.column_name,
216 self.policy.dim,
217 );
218 let header = reader.read_header()?;
219 let ailk_start = reader.ailk_offset()?;
220 let index_abs_offset = ailk_start + header.hnsw_offset;
221 let index_len = header.hnsw_len;
222
223 let entry = make_data_file_entry(
224 &file_path,
225 embeddings.len() as u64,
226 file_size,
227 ¢roid,
228 VectorIndexInfo {
229 column: &self.policy.column_name,
230 dim: self.policy.dim,
231 hnsw_offset: index_abs_offset,
232 hnsw_len: index_len,
233 },
234 );
235 self.pending_files.push(entry);
236 Ok(())
237 }
238
239 pub async fn write_batch_multi(
244 &mut self,
245 batch: &RecordBatch,
246 columns: &[MultiVectorBatch<'_>],
247 ) -> AilakeResult<()> {
248 use ailake_core::AilakeError;
249 if self.captured_schema.is_none() {
250 self.captured_schema = Some(batch.schema());
251 }
252 if self.extra_vec_policies.is_empty() && columns.len() > 1 {
253 self.extra_vec_policies = columns[1..].iter().map(|c| c.policy.clone()).collect();
254 }
255
256 if columns.is_empty() {
257 return Err(AilakeError::InvalidArgument(
258 "write_batch_multi requires at least one column".into(),
259 ));
260 }
261
262 let part_num = self.part_counter.fetch_add(1, Ordering::SeqCst);
263 let file_path = format!("data/part-{:05}.parquet", part_num);
264
265 let col_batches: Vec<VectorColumnBatch<'_>> = columns
266 .iter()
267 .map(|c| VectorColumnBatch {
268 policy: &c.policy,
269 embeddings: c.embeddings,
270 })
271 .collect();
272
273 let primary_policy = &columns[0].policy;
274 let file_writer = AilakeFileWriter::new(primary_policy.clone());
275 let file_bytes: Bytes = file_writer.write_multi(batch, &col_batches)?;
276 let file_size = file_bytes.len() as u64;
277
278 self.store.put(&file_path, file_bytes.clone()).await?;
279
280 let primary_centroid =
282 compute_centroid_and_radius(columns[0].embeddings, primary_policy.metric);
283
284 let reader = ailake_file::AilakeFileReader::new(
286 file_bytes.clone(),
287 &primary_policy.column_name,
288 primary_policy.dim,
289 );
290 let primary_ailk_start = reader.ailk_offset()?;
291 let primary_header = {
292 use ailake_file::HEADER_SIZE;
293 let start = primary_ailk_start as usize;
294 let hdr_bytes: &[u8; HEADER_SIZE] = file_bytes[start..start + HEADER_SIZE]
295 .try_into()
296 .map_err(|_| AilakeError::NotAnAilakeFile)?;
297 ailake_file::AilakeHeader::from_bytes(hdr_bytes)?
298 };
299 let primary_hnsw_abs = primary_ailk_start + primary_header.hnsw_offset;
300
301 let mut extra: Vec<ExtraVectorIndex> = Vec::new();
303 for col in columns.iter().skip(1) {
304 let col_ailk_start = reader.ailk_offset_for_column(&col.policy.column_name)?;
305 let col_header = {
306 use ailake_file::HEADER_SIZE;
307 let start = col_ailk_start as usize;
308 let hdr_bytes: &[u8; HEADER_SIZE] = file_bytes[start..start + HEADER_SIZE]
309 .try_into()
310 .map_err(|_| AilakeError::NotAnAilakeFile)?;
311 ailake_file::AilakeHeader::from_bytes(hdr_bytes)?
312 };
313 let col_centroid = compute_centroid_and_radius(col.embeddings, col.policy.metric);
314 extra.push(ExtraVectorIndex {
315 column: col.policy.column_name.clone(),
316 dim: col.policy.dim,
317 hnsw_offset: col_ailk_start + col_header.hnsw_offset,
318 hnsw_len: col_header.hnsw_len,
319 centroid_b64: Some(encode_centroid_b64(&col_centroid)),
320 radius: Some(col_centroid.radius),
321 });
322 }
323
324 let entry = make_multi_column_data_file_entry(
325 &file_path,
326 columns[0].embeddings.len() as u64,
327 file_size,
328 &primary_centroid,
329 VectorIndexInfo {
330 column: &primary_policy.column_name,
331 dim: primary_policy.dim,
332 hnsw_offset: primary_hnsw_abs,
333 hnsw_len: primary_header.hnsw_len,
334 },
335 &extra,
336 );
337 self.pending_files.push(entry);
338 Ok(())
339 }
340
341 pub async fn commit(mut self) -> AilakeResult<SnapshotId> {
343 let iceberg_schema = self
344 .captured_schema
345 .as_deref()
346 .map(|s| arrow_schema_to_iceberg_update(s, &self.policy, &self.extra_vec_policies));
347 let snapshot = NewSnapshot {
348 snapshot_id: new_snapshot_id(),
349 parent_snapshot_id: self.parent_snapshot_id,
350 files: std::mem::take(&mut self.pending_files),
351 operation: SnapshotOperation::Append,
352 iceberg_schema,
353 };
354 self.catalog.commit_snapshot(&self.table, snapshot).await
355 }
356
357 pub async fn create_or_open(
359 catalog: Arc<dyn CatalogProvider>,
360 store: Arc<dyn Store>,
361 policy: VectorStoragePolicy,
362 table: TableIdent,
363 ) -> AilakeResult<Self> {
364 if catalog.load_table(&table).await.is_err() {
366 catalog
367 .create_table(
368 &table,
369 &TableProperties {
370 policy: policy.clone(),
371 extra: std::collections::HashMap::new(),
372 },
373 )
374 .await?;
375 }
376 Ok(Self::new(catalog, store, policy, table))
377 }
378}
379
380fn arrow_schema_to_iceberg_update(
387 schema: &arrow_schema::Schema,
388 policy: &VectorStoragePolicy,
389 extra_vec_policies: &[VectorStoragePolicy],
390) -> IcebergSchemaUpdate {
391 let bytes_per_dim = policy.precision.bytes_per_element() as u32;
392 let vec_fixed_len = policy.dim * bytes_per_dim;
393
394 let has_primary_in_batch = schema
396 .fields()
397 .iter()
398 .any(|f| f.name() == &policy.column_name);
399 let vec_cols: Vec<(String, u32)> = {
400 let mut v = Vec::new();
401 if !has_primary_in_batch {
402 v.push((policy.column_name.clone(), vec_fixed_len));
403 }
404 for ep in extra_vec_policies {
405 let ep_fixed_len = ep.dim * ep.precision.bytes_per_element() as u32;
406 if !schema.fields().iter().any(|f| f.name() == &ep.column_name) {
407 v.push((ep.column_name.clone(), ep_fixed_len));
408 }
409 }
410 v
411 };
412
413 let top_level_count = schema.fields().len() + vec_cols.len();
415 let mut nested_id = top_level_count as i32;
417
418 let mut fields: Vec<serde_json::Value> = Vec::new();
419 let mut name_mapping: Vec<serde_json::Value> = Vec::new();
420
421 for (idx, field) in schema.fields().iter().enumerate() {
422 let field_id = (idx + 1) as i32;
423 let iceberg_type = arrow_type_to_iceberg(field.data_type(), &mut nested_id);
424 fields.push(serde_json::json!({
425 "id": field_id,
426 "name": field.name(),
427 "required": false,
428 "type": iceberg_type,
429 }));
430 name_mapping.push(serde_json::json!({
431 "field-id": field_id,
432 "names": [field.name()],
433 }));
434 }
435
436 for (i, (col_name, fixed_len)) in vec_cols.iter().enumerate() {
438 let field_id = (schema.fields().len() + 1 + i) as i32;
439 fields.push(serde_json::json!({
440 "id": field_id,
441 "name": col_name,
442 "required": false,
443 "type": format!("fixed[{fixed_len}]"),
444 }));
445 name_mapping.push(serde_json::json!({
446 "field-id": field_id,
447 "names": [col_name],
448 }));
449 }
450
451 let last_column_id = nested_id;
452 let name_mapping_json = serde_json::to_string(&name_mapping).unwrap_or_else(|_| "[]".into());
453
454 IcebergSchemaUpdate {
455 fields,
456 last_column_id,
457 name_mapping_json,
458 }
459}
460
461fn arrow_type_to_iceberg(dt: &arrow_schema::DataType, nested_id: &mut i32) -> serde_json::Value {
466 use arrow_schema::DataType;
467 match dt {
468 DataType::Boolean => serde_json::json!("boolean"),
469 DataType::Int8 | DataType::Int16 | DataType::Int32 | DataType::UInt8 | DataType::UInt16 => {
470 serde_json::json!("int")
471 }
472 DataType::Int64 | DataType::UInt32 | DataType::UInt64 => serde_json::json!("long"),
473 DataType::Float16 | DataType::Float32 => serde_json::json!("float"),
474 DataType::Float64 => serde_json::json!("double"),
475 DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View => serde_json::json!("string"),
476 DataType::Binary | DataType::LargeBinary | DataType::BinaryView => {
477 serde_json::json!("binary")
478 }
479 DataType::Date32 | DataType::Date64 => serde_json::json!("date"),
480 DataType::Timestamp(_, Some(_)) => serde_json::json!("timestamptz"),
482 DataType::Timestamp(_, None) => serde_json::json!("timestamp"),
483 DataType::Time32(_) | DataType::Time64(_) => serde_json::json!("time"),
484 DataType::FixedSizeBinary(n) => serde_json::json!(format!("fixed[{n}]")),
485 DataType::Decimal128(p, s) | DataType::Decimal256(p, s) => {
486 serde_json::json!(format!("decimal({p}, {s})"))
487 }
488 DataType::List(inner)
489 | DataType::LargeList(inner)
490 | DataType::ListView(inner)
491 | DataType::FixedSizeList(inner, _) => {
492 *nested_id += 1;
493 let element_id = *nested_id;
494 let element_type = arrow_type_to_iceberg(inner.data_type(), nested_id);
495 serde_json::json!({
496 "type": "list",
497 "element-id": element_id,
498 "element": element_type,
499 "element-required": !inner.is_nullable(),
500 })
501 }
502 DataType::Struct(arrow_fields) => {
503 let struct_fields: Vec<serde_json::Value> = arrow_fields
504 .iter()
505 .map(|f| {
506 *nested_id += 1;
507 let fid = *nested_id;
508 let ftype = arrow_type_to_iceberg(f.data_type(), nested_id);
509 serde_json::json!({
510 "id": fid,
511 "name": f.name(),
512 "required": !f.is_nullable(),
513 "type": ftype,
514 })
515 })
516 .collect();
517 serde_json::json!({ "type": "struct", "fields": struct_fields })
518 }
519 DataType::Map(entries, _) => {
520 *nested_id += 1;
522 let key_id = *nested_id;
523 *nested_id += 1;
524 let val_id = *nested_id;
525 if let DataType::Struct(kv_fields) = entries.data_type() {
526 let key_f = kv_fields
527 .iter()
528 .find(|f| f.name() == "key" || f.name() == "keys");
529 let val_f = kv_fields
530 .iter()
531 .find(|f| f.name() == "value" || f.name() == "values");
532 let key_type = key_f
533 .map(|f| arrow_type_to_iceberg(f.data_type(), nested_id))
534 .unwrap_or(serde_json::json!("binary"));
535 let val_type = val_f
536 .map(|f| arrow_type_to_iceberg(f.data_type(), nested_id))
537 .unwrap_or(serde_json::json!("binary"));
538 let val_required = val_f.map(|f| !f.is_nullable()).unwrap_or(false);
539 serde_json::json!({
540 "type": "map",
541 "key-id": key_id,
542 "key": key_type,
543 "value-id": val_id,
544 "value": val_type,
545 "value-required": val_required,
546 })
547 } else {
548 serde_json::json!("binary")
549 }
550 }
551 _ => serde_json::json!("binary"),
552 }
553}
554
555async fn build_and_patch_index(
557 store: Arc<dyn Store>,
558 catalog: Arc<dyn CatalogProvider>,
559 policy: VectorStoragePolicy,
560 table: TableIdent,
561 file_path: String,
562) -> AilakeResult<()> {
563 let parquet_bytes = store.get(&file_path).await?;
565 let reader = AilakeFileReader::new(parquet_bytes, &policy.column_name, policy.dim);
566 let (batch, embeddings) = reader.read_parquet()?;
567
568 let full_bytes = tokio::task::spawn_blocking({
571 let policy = policy.clone();
572 move || {
573 let file_writer = AilakeFileWriter::new(policy);
574 file_writer.write(&batch, &embeddings)
575 }
576 })
577 .await
578 .map_err(|e| ailake_core::AilakeError::Store(format!("spawn_blocking panic: {e}")))??;
579
580 let full_reader = AilakeFileReader::new(full_bytes.clone(), &policy.column_name, policy.dim);
582 let header = full_reader.read_header()?;
583 let ailk_start = full_reader.ailk_offset()?;
584 let hnsw_abs_offset = ailk_start + header.hnsw_offset;
585 let hnsw_len = header.hnsw_len;
586
587 store.put(&file_path, full_bytes).await?;
589
590 for _ in 0..120u32 {
593 match catalog.load_table(&table).await {
594 Ok(meta) if meta.current_snapshot_id.is_some() => break,
595 _ => tokio::time::sleep(std::time::Duration::from_millis(500)).await,
596 }
597 }
598
599 for attempt in 0..50u32 {
604 let table_meta = catalog.load_table(&table).await?;
605 let parent_snapshot_id = table_meta.current_snapshot_id;
606 let mut files = catalog.list_files(&table, None).await?;
607
608 if files
610 .iter()
611 .any(|f| f.path == file_path && f.index_status == IndexStatus::Ready)
612 {
613 break;
614 }
615
616 for f in &mut files {
617 if f.path == file_path {
618 f.hnsw_offset = Some(hnsw_abs_offset);
619 f.hnsw_len = Some(hnsw_len);
620 f.index_status = IndexStatus::Ready;
621 break;
622 }
623 }
624 catalog
625 .commit_snapshot(
626 &table,
627 NewSnapshot {
628 snapshot_id: new_snapshot_id(),
629 parent_snapshot_id,
630 files,
631 operation: SnapshotOperation::Replace,
632 iceberg_schema: None,
633 },
634 )
635 .await?;
636
637 tokio::time::sleep(std::time::Duration::from_millis(10 + attempt as u64 * 5)).await;
639
640 let verify = catalog.list_files(&table, None).await?;
641 if verify
642 .iter()
643 .any(|f| f.path == file_path && f.index_status == IndexStatus::Ready)
644 {
645 break;
646 }
647 }
649
650 eprintln!(
651 "[ailake] deferred HNSW built for {file_path} (offset={hnsw_abs_offset}, len={hnsw_len})"
652 );
653 Ok(())
654}
655
#[cfg(test)]
mod tests {
    use super::*;
    use ailake_core::{VectorMetric, VectorPrecision};
    use arrow_schema::{DataType, Field, Schema, TimeUnit};

    /// Cosine / F16 policy (2 bytes per element) for vector column `col`.
    fn policy(col: &str, dim: u32) -> VectorStoragePolicy {
        VectorStoragePolicy {
            column_name: col.to_string(),
            dim,
            metric: VectorMetric::Cosine,
            precision: VectorPrecision::F16,
            pq: None,
            keep_raw_for_reranking: false,
        }
    }

    /// Schema update with no extra vector columns.
    fn update_for(schema: &Schema, pol: &VectorStoragePolicy) -> IcebergSchemaUpdate {
        arrow_schema_to_iceberg_update(schema, pol, &[])
    }

    #[test]
    fn simple_schema_produces_correct_fields() {
        let schema = Schema::new(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("text", DataType::Utf8, false),
        ]);
        let pol = policy("embedding", 8);
        let upd = update_for(&schema, &pol);

        assert_eq!(upd.fields.len(), 3);
        assert_eq!(upd.fields[0]["id"], 1);
        assert_eq!(upd.fields[0]["type"], "int");
        assert_eq!(upd.fields[1]["id"], 2);
        assert_eq!(upd.fields[1]["type"], "string");
        assert_eq!(upd.fields[2]["id"], 3);
        // 8 dims x 2 bytes (F16) per element.
        assert_eq!(upd.fields[2]["type"], "fixed[16]");

        let nm: Vec<serde_json::Value> = serde_json::from_str(&upd.name_mapping_json).unwrap();
        assert_eq!(nm.len(), 3);
        assert_eq!(nm[2]["field-id"], 3);
        assert_eq!(nm[2]["names"][0], "embedding");
        assert_eq!(upd.last_column_id, 3);
    }

    #[test]
    fn timestamp_without_tz_maps_to_timestamp_not_timestamptz() {
        let schema = Schema::new(vec![
            Field::new(
                "created_at",
                DataType::Timestamp(TimeUnit::Microsecond, None),
                true,
            ),
            Field::new(
                "updated_at",
                DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
                true,
            ),
        ]);
        let pol = policy("vec", 4);
        let upd = update_for(&schema, &pol);

        assert_eq!(upd.fields[0]["type"], "timestamp");
        assert_eq!(upd.fields[1]["type"], "timestamptz");
    }

    #[test]
    fn list_type_produces_iceberg_list_object() {
        let schema = Schema::new(vec![Field::new(
            "tags",
            DataType::List(std::sync::Arc::new(Field::new(
                "item",
                DataType::Utf8,
                true,
            ))),
            true,
        )]);
        let pol = policy("vec", 4);
        let upd = update_for(&schema, &pol);

        let t = &upd.fields[0]["type"];
        assert_eq!(t["type"], "list");
        assert_eq!(t["element"], "string");
        assert_eq!(t["element-required"], false);
        // Top-level IDs are 1 (tags) and 2 (vec); the element takes the next one.
        assert_eq!(t["element-id"], 3);
        assert_eq!(upd.last_column_id, 3);
    }

    #[test]
    fn struct_type_produces_nested_fields() {
        let schema = Schema::new(vec![Field::new(
            "meta",
            DataType::Struct(
                vec![
                    Field::new("key", DataType::Utf8, false),
                    Field::new("val", DataType::Int64, false),
                ]
                .into(),
            ),
            true,
        )]);
        let pol = policy("vec", 4);
        let upd = update_for(&schema, &pol);

        let t = &upd.fields[0]["type"];
        assert_eq!(t["type"], "struct");
        let nested = t["fields"].as_array().unwrap();
        assert_eq!(nested.len(), 2);
        assert_eq!(nested[0]["name"], "key");
        assert_eq!(nested[0]["type"], "string");
        assert_eq!(nested[1]["name"], "val");
        assert_eq!(nested[1]["type"], "long");
        // Nested IDs follow the two top-level IDs (meta, vec).
        assert_eq!(nested[0]["id"], 3);
        assert_eq!(nested[1]["id"], 4);
        assert_eq!(upd.last_column_id, 4);
    }

    #[test]
    fn map_type_uses_arrow_rs_default_entry_names() {
        let entries = Field::new(
            "entries",
            DataType::Struct(
                vec![
                    Field::new("keys", DataType::Utf8, false),
                    Field::new("values", DataType::Int32, true),
                ]
                .into(),
            ),
            false,
        );
        let schema = Schema::new(vec![Field::new(
            "attrs",
            DataType::Map(std::sync::Arc::new(entries), false),
            true,
        )]);
        let upd = update_for(&schema, &policy("vec", 4));

        let t = &upd.fields[0]["type"];
        assert_eq!(t["type"], "map");
        assert_eq!(t["key-id"], 3);
        assert_eq!(t["key"], "string");
        assert_eq!(t["value-id"], 4);
        assert_eq!(t["value"], "int");
        assert_eq!(t["value-required"], false);
        assert_eq!(upd.last_column_id, 4);
    }

    #[test]
    fn no_duplicate_vec_column_when_already_in_batch() {
        let schema = Schema::new(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("embedding", DataType::FixedSizeBinary(16), false),
        ]);
        let pol = policy("embedding", 8);
        let upd = update_for(&schema, &pol);

        assert_eq!(upd.fields.len(), 2, "should not add embedding twice");
        let names: Vec<&str> = upd
            .fields
            .iter()
            .map(|f| f["name"].as_str().unwrap())
            .collect();
        assert_eq!(names.iter().filter(|&&n| n == "embedding").count(), 1);
    }

    #[test]
    fn multi_vec_policies_all_appended() {
        let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);
        let primary = policy("embedding", 4);
        let extra = vec![policy("context_embedding", 4)];
        let upd = arrow_schema_to_iceberg_update(&schema, &primary, &extra);

        assert_eq!(upd.fields.len(), 3);
        let names: Vec<&str> = upd
            .fields
            .iter()
            .map(|f| f["name"].as_str().unwrap())
            .collect();
        assert!(names.contains(&"embedding"));
        assert!(names.contains(&"context_embedding"));
        // Primary vector column first, then extras in order.
        assert_eq!(upd.fields[1]["id"], 2);
        assert_eq!(upd.fields[1]["name"], "embedding");
        assert_eq!(upd.fields[2]["id"], 3);
        assert_eq!(upd.fields[2]["name"], "context_embedding");
    }

    #[test]
    fn top_level_field_ids_match_parquet_stamp_sequence() {
        let schema = Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new(
                "tags",
                DataType::List(std::sync::Arc::new(Field::new(
                    "item",
                    DataType::Utf8,
                    true,
                ))),
                true,
            ),
        ]);
        let pol = policy("vec", 4);
        let upd = update_for(&schema, &pol);

        // Top-level IDs are contiguous: batch fields, then the vector column.
        assert_eq!(upd.fields[0]["id"], 1);
        assert_eq!(upd.fields[1]["id"], 2);
        assert_eq!(upd.fields[2]["id"], 3);

        // Nested IDs start only after every top-level ID.
        assert_eq!(upd.fields[1]["type"]["element-id"], 4);
        assert_eq!(upd.last_column_id, 4);
    }
}