use apache_avro::types::Value;
use bytes::Bytes;
use crate::provider::{DataFileEntry, IndexStatus, PartitionField, PartitionSpec, SnapshotId};
/// Avro schema for an Iceberg `manifest_entry` record, used by the manifest writers and
/// readers in this module.
///
/// The raw encoders (`write_manifest_file`, `write_equality_delete_manifest`) emit fields
/// positionally, so the field order here must match the order they write in.
///
/// The `partition` field is an empty placeholder record named `r102`.
/// `build_manifest_entry_schema` swaps in the spec-specific record by exact-text
/// replacement, so the placeholder text must not be reformatted.
///
/// `key_metadata` is used by `write_manifest_file` to carry a JSON-encoded
/// `AilakeEntryExt` payload.
///
/// NOTE(review): several `element-id` values on the map-typed fields reuse ids that other
/// fields also use (119, 121, 140, 128, 131). The Iceberg v3 spec also appears to assign
/// `first_row_id` field-id 142 (141 is `spec_id` there). Confirm before relying on
/// field-id based projection by other Iceberg readers.
const MANIFEST_ENTRY_SCHEMA_STR: &str = r#"{
"type": "record",
"name": "manifest_entry",
"fields": [
{"name": "status", "type": "int", "field-id": 0},
{"name": "snapshot_id", "type": ["null", "long"], "default": null, "field-id": 1},
{"name": "sequence_number", "type": ["null", "long"], "default": null, "field-id": 3},
{"name": "file_sequence_number","type": ["null", "long"], "default": null, "field-id": 4},
{"name": "data_file", "type": {
"type": "record", "name": "r2",
"fields": [
{"name": "content", "type": "int", "field-id": 134, "doc": "0=DATA"},
{"name": "file_path", "type": "string", "field-id": 100},
{"name": "file_format", "type": "string", "field-id": 101},
{"name": "partition", "type": {"type": "record", "name": "r102", "fields": []}, "field-id": 102},
{"name": "record_count", "type": "long", "field-id": 103},
{"name": "file_size_in_bytes","type": "long", "field-id": 104},
{"name": "column_sizes", "type": ["null", {"type": "array", "logicalType": "map", "items": {"type":"record","name":"k117_v118","fields":[{"name":"key","type":"int","field-id":117},{"name":"value","type":"long","field-id":118}]},"element-id":119}], "default": null, "field-id": 108},
{"name": "value_counts", "type": ["null", {"type": "array", "logicalType": "map", "items": {"type":"record","name":"k119_v120","fields":[{"name":"key","type":"int","field-id":119},{"name":"value","type":"long","field-id":120}]},"element-id":121}], "default": null, "field-id": 109},
{"name": "null_value_counts", "type": ["null", {"type": "array", "logicalType": "map", "items": {"type":"record","name":"k121_v122","fields":[{"name":"key","type":"int","field-id":121},{"name":"value","type":"long","field-id":122}]},"element-id":123}], "default": null, "field-id": 110},
{"name": "nan_value_counts", "type": ["null", {"type": "array", "logicalType": "map", "items": {"type":"record","name":"k138_v139","fields":[{"name":"key","type":"int","field-id":138},{"name":"value","type":"long","field-id":139}]},"element-id":140}], "default": null, "field-id": 137},
{"name": "lower_bounds", "type": ["null", {"type": "array", "logicalType": "map", "items": {"type":"record","name":"k126_v127","fields":[{"name":"key","type":"int","field-id":126},{"name":"value","type":"bytes","field-id":127}]},"element-id":128}], "default": null, "field-id": 125},
{"name": "upper_bounds", "type": ["null", {"type": "array", "logicalType": "map", "items": {"type":"record","name":"k129_v130","fields":[{"name":"key","type":"int","field-id":129},{"name":"value","type":"bytes","field-id":130}]},"element-id":131}], "default": null, "field-id": 128},
{"name": "key_metadata", "type": ["null", "bytes"], "default": null, "field-id": 131},
{"name": "split_offsets", "type": ["null", {"type": "array", "items": "long", "element-id": 133}], "default": null, "field-id": 132},
{"name": "equality_ids", "type": ["null", {"type": "array", "items": "int", "element-id": 136}], "default": null, "field-id": 135},
{"name": "sort_order_id", "type": ["null", "int"], "default": null, "field-id": 140},
{"name": "first_row_id", "type": ["null", "long"], "default": null, "field-id": 141}
]
}, "field-id": 2}
]
}"#;
/// Avro schema for an Iceberg manifest-list (`manifest_file`) record.
///
/// The manifest-list writers in this module encode each record positionally in exactly this
/// field order, ending with an empty `partitions` array. Reordering fields here requires
/// updating those encoders.
const MANIFEST_LIST_SCHEMA_STR: &str = r#"{
"type": "record",
"name": "manifest_file",
"fields": [
{"name": "manifest_path", "type": "string", "field-id": 500},
{"name": "manifest_length", "type": "long", "field-id": 501},
{"name": "partition_spec_id", "type": "int", "field-id": 502},
{"name": "content", "type": "int", "field-id": 517, "doc": "0=DATA"},
{"name": "sequence_number", "type": "long", "field-id": 515},
{"name": "min_sequence_number", "type": "long", "field-id": 516},
{"name": "added_snapshot_id", "type": "long", "field-id": 503},
{"name": "added_data_files_count", "type": "int", "field-id": 504},
{"name": "existing_data_files_count", "type": "int", "field-id": 505},
{"name": "deleted_data_files_count", "type": "int", "field-id": 506},
{"name": "added_rows_count", "type": "long", "field-id": 512},
{"name": "existing_rows_count", "type": "long", "field-id": 513},
{"name": "deleted_rows_count", "type": "long", "field-id": 514},
{"name": "partitions", "type": {
"type": "array",
"items": {
"type": "record", "name": "r508",
"fields": [
{"name": "contains_null", "type": "boolean", "field-id": 509},
{"name": "contains_nan", "type": ["null", "boolean"], "default": null, "field-id": 518},
{"name": "lower_bound", "type": ["null", "bytes"], "default": null, "field-id": 510},
{"name": "upper_bound", "type": ["null", "bytes"], "default": null, "field-id": 511}
]
},
"element-id": 508
}, "field-id": 507}
]
}"#;
fn build_partition_record_schema(spec: Option<&PartitionSpec>) -> String {
let fields: Vec<String> = spec
.map(|s| s.fields.as_slice())
.unwrap_or(&[])
.iter()
.map(|f| {
let avro_type = match f.source_type.as_str() {
"int" | "integer" => "int",
"long" => "long",
_ => "string", };
format!(
r#"{{"name":"{name}","type":["null","{avro_type}"],"default":null,"field-id":{fid}}}"#,
name = f.name,
avro_type = avro_type,
fid = f.field_id
)
})
.collect();
format!(
r#"{{"type":"record","name":"r102","fields":[{}]}}"#,
fields.join(",")
)
}
/// Returns the manifest-entry Avro schema specialised for `spec`.
///
/// Starts from `MANIFEST_ENTRY_SCHEMA_STR` and substitutes its empty `r102` partition
/// record with the record built by `build_partition_record_schema`. The substitution is an
/// exact text match: if the placeholder text ever changes, the base schema is returned
/// unchanged.
pub fn build_manifest_entry_schema(spec: Option<&PartitionSpec>) -> String {
    const EMPTY_PARTITION_RECORD: &str = r#"{"type": "record", "name": "r102", "fields": []}"#;
    let specialised = build_partition_record_schema(spec);
    MANIFEST_ENTRY_SCHEMA_STR.replace(EMPTY_PARTITION_RECORD, specialised.as_str())
}
/// Appends one nullable partition value (Avro union `["null", T]`) to `buf`.
///
/// - `None` is written as the null branch (index 0).
/// - For `int`/`integer` and `long` source types, the text is parsed as `i32`/`i64` and
///   written as a zig-zag varint on branch 1. Text that does not parse is written as null.
/// - Every other source type is written as an Avro string on branch 1.
///
/// The value is resolved before anything is written. This replaces the previous approach of
/// emitting branch 1 and then patching the last byte back to 0 when parsing failed.
fn encode_partition_value(field: &PartitionField, value: Option<&str>, buf: &mut Vec<u8>) {
    use crate::avro_raw::{encode_long, encode_string};

    /// Typed form of a non-null partition value.
    enum Typed<'a> {
        Long(i64),
        Str(&'a str),
    }

    let typed = value.and_then(|v| match field.source_type.as_str() {
        // Avro int and long share the same zig-zag varint encoding.
        "int" | "integer" => v.parse::<i32>().ok().map(|n| Typed::Long(i64::from(n))),
        "long" => v.parse::<i64>().ok().map(Typed::Long),
        _ => Some(Typed::Str(v)),
    });

    match typed {
        None => encode_long(0, buf),
        Some(Typed::Long(n)) => {
            encode_long(1, buf);
            encode_long(n, buf);
        }
        Some(Typed::Str(s)) => {
            encode_long(1, buf);
            encode_string(s, buf);
        }
    }
}
pub fn write_manifest_file(
files: &[DataFileEntry],
snapshot_id: SnapshotId,
sequence_number: i64,
table_schema_json: &str,
partition_spec_json: &str,
format_version: u8,
partition_spec: Option<&PartitionSpec>,
) -> Bytes {
use crate::avro_raw::{
encode_empty_array, encode_int, encode_long, encode_string, encode_union_bytes,
encode_union_long, encode_union_null, write_avro_container,
};
let mut records: Vec<Vec<u8>> = Vec::with_capacity(files.len());
for f in files {
let mut rec = Vec::new();
encode_int(1, &mut rec); encode_union_long(1, snapshot_id, &mut rec); encode_union_long(1, sequence_number, &mut rec); encode_union_long(1, sequence_number, &mut rec); encode_int(0, &mut rec); encode_string(&f.path, &mut rec); encode_string("PARQUET", &mut rec); if let Some(spec) = partition_spec {
if !spec.fields.is_empty() {
let raw = f.partition_value.as_deref().unwrap_or("");
let parts: Vec<&str> = raw.split('\x1f').collect();
for (i, field) in spec.fields.iter().enumerate() {
let val = parts.get(i).copied().filter(|s| !s.is_empty());
encode_partition_value(field, val, &mut rec);
}
}
}
encode_long(f.record_count as i64, &mut rec); encode_long(f.file_size_bytes as i64, &mut rec); encode_union_null(&mut rec); encode_union_null(&mut rec); encode_union_null(&mut rec); encode_union_null(&mut rec); encode_union_null(&mut rec); encode_union_null(&mut rec); let ext = AilakeEntryExt {
centroid_b64: f.centroid_b64.clone(),
radius: f.radius,
hnsw_offset: f.hnsw_offset,
hnsw_len: f.hnsw_len,
vector_column: f.vector_column.clone(),
vector_dim: f.vector_dim,
extra_vector_indexes: f.extra_vector_indexes.clone(),
index_status: f.index_status.clone(),
batch_id: f.batch_id.clone(),
embedding_model: f.embedding_model.clone(),
partition_value: f.partition_value.clone(),
deletion_vector: f.deletion_vector.clone(),
first_row_id: f.first_row_id,
};
match serde_json::to_vec(&ext) {
Ok(bytes) => encode_union_bytes(1, &bytes, &mut rec), Err(_) => encode_union_null(&mut rec), }
encode_union_null(&mut rec); encode_union_null(&mut rec); encode_union_null(&mut rec); match f.first_row_id {
Some(id) => encode_union_long(1, id, &mut rec),
None => encode_union_null(&mut rec),
} let _ = encode_empty_array; records.push(rec);
}
let fv_str: &[u8] = if format_version >= 3 { b"3" } else { b"2" };
let spec_id_str = partition_spec
.map(|s| s.spec_id.to_string())
.unwrap_or_else(|| "0".to_string());
let extra_meta: &[(&str, &[u8])] = &[
("schema", table_schema_json.as_bytes()),
("partition-spec", partition_spec_json.as_bytes()),
("partition-spec-id", spec_id_str.as_bytes()),
("format-version", fv_str),
("content", b"data"),
];
let schema_str = build_manifest_entry_schema(partition_spec);
Bytes::from(write_avro_container(&schema_str, extra_meta, &records))
}
/// Writes a manifest list containing a single DATA manifest.
///
/// Convenience wrapper around [`write_manifest_list_multi`]. `manifest_bytes` is the
/// manifest's length in bytes, and `added_rows` is reported on that (only) manifest.
pub fn write_manifest_list(
    manifest_path: &str,
    manifest_bytes: usize,
    snapshot_id: SnapshotId,
    sequence_number: i64,
    added_rows: i64,
) -> Bytes {
    let single = [(String::from(manifest_path), manifest_bytes as i64)];
    write_manifest_list_multi(&single, snapshot_id, sequence_number, added_rows)
}
pub fn write_manifest_list_multi(
manifests: &[(String, i64)],
snapshot_id: SnapshotId,
sequence_number: i64,
added_rows: i64,
) -> Bytes {
use crate::avro_raw::{
encode_empty_array, encode_int, encode_long, encode_string, write_avro_container,
};
let n = manifests.len();
let mut records: Vec<Vec<u8>> = Vec::with_capacity(n);
for (i, (path, len)) in manifests.iter().enumerate() {
let rows = if i + 1 == n { added_rows } else { 0i64 };
let mut rec = Vec::new();
encode_string(path, &mut rec); encode_long(*len, &mut rec); encode_int(0, &mut rec); encode_int(0, &mut rec); encode_long(sequence_number, &mut rec); encode_long(sequence_number, &mut rec); encode_long(snapshot_id, &mut rec); encode_int(1, &mut rec); encode_int(0, &mut rec); encode_int(0, &mut rec); encode_long(rows, &mut rec); encode_long(0, &mut rec); encode_long(0, &mut rec); encode_empty_array(&mut rec); records.push(rec);
}
Bytes::from(write_avro_container(
MANIFEST_LIST_SCHEMA_STR,
&[],
&records,
))
}
pub fn read_manifest_file(data: &[u8]) -> apache_avro::AvroResult<Vec<DataFileEntry>> {
let reader = apache_avro::Reader::new(data)?;
let mut results = Vec::new();
for value in reader {
let value = value?;
if let Value::Record(fields) = value {
let key_meta_bytes: Option<Vec<u8>> = fields
.iter()
.find(|(k, _)| k == "data_file")
.and_then(|(_, v)| {
if let Value::Record(df_fields) = v {
df_fields
.iter()
.find(|(k, _)| k == "key_metadata")
.and_then(|(_, v)| match v {
Value::Union(_, inner) => {
if let Value::Bytes(b) = inner.as_ref() {
Some(b.clone())
} else {
None
}
}
Value::Bytes(b) => Some(b.clone()),
_ => None,
})
} else {
None
}
});
let data_file = fields
.iter()
.find(|(k, _)| k == "data_file")
.map(|(_, v)| v);
if let Some(Value::Record(df_fields)) = data_file {
let path = df_fields
.iter()
.find(|(k, _)| k == "file_path")
.and_then(|(_, v)| {
if let Value::String(s) = v {
Some(s.clone())
} else {
None
}
});
let record_count = df_fields
.iter()
.find(|(k, _)| k == "record_count")
.and_then(|(_, v)| {
if let Value::Long(n) = v {
Some(*n as u64)
} else {
None
}
})
.unwrap_or(0);
let file_size_bytes = df_fields
.iter()
.find(|(k, _)| k == "file_size_in_bytes")
.and_then(|(_, v)| {
if let Value::Long(n) = v {
Some(*n as u64)
} else {
None
}
})
.unwrap_or(0);
if let Some(path) = path {
let ext: Option<AilakeEntryExt> = key_meta_bytes
.as_deref()
.and_then(|b| serde_json::from_slice(b).ok());
let native_dv = parse_v3_deletion_vector(df_fields);
let native_first_row_id = parse_v3_first_row_id(df_fields);
let native_partition_value = df_fields
.iter()
.find(|(k, _)| k == "partition")
.and_then(|(_, v)| {
if let Value::Record(parts) = v {
if parts.is_empty() {
return None;
}
let vals: Vec<String> = parts
.iter()
.filter_map(|(_, pv)| match pv {
Value::Union(idx, inner) if *idx > 0 => {
match inner.as_ref() {
Value::String(s) => Some(s.clone()),
Value::Int(n) => Some(n.to_string()),
Value::Long(n) => Some(n.to_string()),
_ => None,
}
}
Value::String(s) => Some(s.clone()),
Value::Int(n) => Some(n.to_string()),
Value::Long(n) => Some(n.to_string()),
_ => None,
})
.collect();
if vals.is_empty() {
None
} else if vals.len() == 1 {
Some(vals.into_iter().next().unwrap())
} else {
Some(vals.join("\x1f"))
}
} else {
None
}
});
results.push(DataFileEntry {
path,
record_count,
file_size_bytes,
centroid_b64: ext.as_ref().and_then(|e| e.centroid_b64.clone()),
radius: ext.as_ref().and_then(|e| e.radius),
hnsw_offset: ext.as_ref().and_then(|e| e.hnsw_offset),
hnsw_len: ext.as_ref().and_then(|e| e.hnsw_len),
vector_column: ext.as_ref().and_then(|e| e.vector_column.clone()),
vector_dim: ext.as_ref().and_then(|e| e.vector_dim),
extra_vector_indexes: ext
.as_ref()
.map(|e| e.extra_vector_indexes.clone())
.unwrap_or_default(),
index_status: ext
.as_ref()
.map(|e| e.index_status.clone())
.unwrap_or_default(),
batch_id: ext.as_ref().and_then(|e| e.batch_id.clone()),
embedding_model: ext.as_ref().and_then(|e| e.embedding_model.clone()),
partition_value: native_partition_value
.or_else(|| ext.as_ref().and_then(|e| e.partition_value.clone())),
deletion_vector: ext
.as_ref()
.and_then(|e| e.deletion_vector.clone())
.or(native_dv),
first_row_id: native_first_row_id
.or_else(|| ext.as_ref().and_then(|e| e.first_row_id)),
});
}
}
}
}
Ok(results)
}
/// Ailake-specific per-file metadata that is not stored in a native manifest field.
///
/// `write_manifest_file` serializes it as JSON into each `data_file.key_metadata`, and
/// `read_manifest_file` deserializes it back, ignoring payloads that fail to parse.
/// `None` fields annotated with `skip_serializing_if` are omitted from the JSON.
///
/// NOTE(review): Iceberg defines `key_metadata` for encryption key metadata. Confirm that
/// other engines reading these manifests tolerate arbitrary bytes there.
#[derive(serde::Serialize, serde::Deserialize)]
struct AilakeEntryExt {
    #[serde(skip_serializing_if = "Option::is_none")]
    pub centroid_b64: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub radius: Option<f32>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub hnsw_offset: Option<u64>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub hnsw_len: Option<u64>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub vector_column: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub vector_dim: Option<u32>,
    #[serde(default)]
    pub extra_vector_indexes: Vec<crate::provider::ExtraVectorIndex>,
    #[serde(default)]
    pub index_status: IndexStatus,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub batch_id: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub embedding_model: Option<String>,
    /// Copy of the partition value. On read it is used only when the native `partition`
    /// record yields no value.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub partition_value: Option<String>,
    /// On read this takes precedence over a native v3 `deletion_vector` field.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub deletion_vector: Option<crate::provider::DeletionVector>,
    /// Also written to the native `first_row_id` field, which wins on read.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub first_row_id: Option<i64>,
}
/// Extracts a native v3 `deletion_vector` record from a decoded `data_file`, if present.
///
/// The field may be a bare record or a union wrapping one. `path`, `offset` and `length`
/// are required (string, long, long); `cardinality` falls back to -1 when missing.
/// Returns `None` when the field is absent, null, or malformed.
fn parse_v3_deletion_vector(
    df_fields: &[(String, Value)],
) -> Option<crate::provider::DeletionVector> {
    let (_, raw) = df_fields.iter().find(|(name, _)| name == "deletion_vector")?;
    let unwrapped = match raw {
        Value::Union(_, boxed) => boxed.as_ref(),
        other => other,
    };
    let dv_fields = match unwrapped {
        Value::Record(fields) => fields,
        _ => return None,
    };

    let lookup = |wanted: &str| {
        dv_fields
            .iter()
            .find(|(name, _)| name == wanted)
            .map(|(_, v)| v)
    };
    let long_field = |wanted: &str| match lookup(wanted) {
        Some(Value::Long(n)) => Some(*n),
        _ => None,
    };

    let path = match lookup("path") {
        Some(Value::String(s)) => s.clone(),
        _ => return None,
    };
    Some(crate::provider::DeletionVector {
        path,
        offset: long_field("offset")? as u64,
        length: long_field("length")? as u64,
        cardinality: long_field("cardinality").unwrap_or(-1),
    })
}
/// Reads the v3 `first_row_id` from a decoded `data_file`, accepting either a bare `long`
/// or a union wrapping one. Returns `None` when it is absent or null.
fn parse_v3_first_row_id(df_fields: &[(String, Value)]) -> Option<i64> {
    let (_, raw) = df_fields.iter().find(|(name, _)| name == "first_row_id")?;
    let unwrapped = match raw {
        Value::Union(_, boxed) => boxed.as_ref(),
        other => other,
    };
    match unwrapped {
        Value::Long(id) => Some(*id),
        _ => None,
    }
}
/// Returns the manifest paths listed in a manifest list, in file order, discarding each
/// manifest's content type. See [`read_manifest_list_typed`].
pub fn read_manifest_list(data: &[u8]) -> apache_avro::AvroResult<Vec<String>> {
    let typed = read_manifest_list_typed(data)?;
    let paths: Vec<String> = typed.into_iter().map(|(path, _content)| path).collect();
    Ok(paths)
}
/// Returns `(manifest_path, content)` for every record in a manifest list.
///
/// `content` defaults to 0 (DATA) when the field is missing or not an `int`. Records
/// without a string `manifest_path` are skipped.
///
/// # Errors
/// Propagates Avro decoding errors.
pub fn read_manifest_list_typed(data: &[u8]) -> apache_avro::AvroResult<Vec<(String, i32)>> {
    let mut manifests = Vec::new();
    for decoded in apache_avro::Reader::new(data)? {
        let record = match decoded? {
            Value::Record(record) => record,
            _ => continue,
        };
        let lookup = |name: &str| {
            record
                .iter()
                .find(|(key, _)| key == name)
                .map(|(_, v)| v)
        };
        let content = match lookup("content") {
            Some(Value::Int(n)) => *n,
            _ => 0,
        };
        if let Some(Value::String(path)) = lookup("manifest_path") {
            manifests.push((path.clone(), content));
        }
    }
    Ok(manifests)
}
/// Writes a manifest list (Avro object container) with one record per
/// `(path, length_in_bytes, content)` triple.
///
/// Fields are encoded positionally against `MANIFEST_LIST_SCHEMA_STR`:
/// - `sequence_number` and `min_sequence_number` are both set to `sequence_number`;
/// - `added_snapshot_id` is `snapshot_id`;
/// - only the last manifest reports `added_rows`;
/// - the `partitions` summary is always an empty array.
///
/// NOTE(review): `partition_spec_id` is always 0 and `added_data_files_count` always 1, even
/// when a manifest uses another spec or lists several files. Confirm this is intended.
pub fn write_manifest_list_multi_typed(
    manifests: &[(String, i64, i32)],
    snapshot_id: SnapshotId,
    sequence_number: i64,
    added_rows: i64,
) -> Bytes {
    use crate::avro_raw::{
        encode_empty_array, encode_int, encode_long, encode_string, write_avro_container,
    };
    let last_index = manifests.len().checked_sub(1);
    let records: Vec<Vec<u8>> = manifests
        .iter()
        .enumerate()
        .map(|(idx, (path, len, content))| {
            let rows_added = if Some(idx) == last_index { added_rows } else { 0i64 };
            let mut out = Vec::new();
            encode_string(path, &mut out); // manifest_path
            encode_long(*len, &mut out); // manifest_length
            encode_int(0, &mut out); // partition_spec_id
            encode_int(*content, &mut out); // content
            encode_long(sequence_number, &mut out); // sequence_number
            encode_long(sequence_number, &mut out); // min_sequence_number
            encode_long(snapshot_id, &mut out); // added_snapshot_id
            encode_int(1, &mut out); // added_data_files_count
            encode_int(0, &mut out); // existing_data_files_count
            encode_int(0, &mut out); // deleted_data_files_count
            encode_long(rows_added, &mut out); // added_rows_count
            encode_long(0, &mut out); // existing_rows_count
            encode_long(0, &mut out); // deleted_rows_count
            encode_empty_array(&mut out); // partitions
            out
        })
        .collect();
    Bytes::from(write_avro_container(MANIFEST_LIST_SCHEMA_STR, &[], &records))
}
/// Writes a manifest (Avro object container) listing equality-delete files (`content = 2`).
///
/// Each entry is written with:
/// - status ADDED (1), the given snapshot id and sequence numbers;
/// - file format `AVRO` and an empty, unpartitioned `partition` record;
/// - the file's `equality_ids`;
/// - every statistics field set to null.
///
/// Fields are encoded positionally against `MANIFEST_ENTRY_SCHEMA_STR` with its empty
/// partition record. Container metadata declares an empty table schema, partition spec 0,
/// format version 2 and `content = deletes`.
pub fn write_equality_delete_manifest(
    deletes: &[crate::provider::EqualityDeleteFile],
    snapshot_id: SnapshotId,
    sequence_number: i64,
) -> Bytes {
    use crate::avro_raw::{
        encode_int, encode_long, encode_string, encode_union_long, encode_union_null,
        write_avro_container,
    };
    let mut records: Vec<Vec<u8>> = Vec::with_capacity(deletes.len());
    for d in deletes {
        let mut rec = Vec::new();

        // manifest_entry header
        encode_int(1, &mut rec); // status = ADDED
        encode_union_long(1, snapshot_id, &mut rec); // snapshot_id
        encode_union_long(1, sequence_number, &mut rec); // sequence_number
        encode_union_long(1, sequence_number, &mut rec); // file_sequence_number

        // data_file
        encode_int(2, &mut rec); // content = EQUALITY_DELETES
        encode_string(&d.path, &mut rec); // file_path
        encode_string("AVRO", &mut rec); // file_format
        // partition: empty record, nothing to encode.
        encode_long(d.record_count as i64, &mut rec); // record_count
        encode_long(d.file_size_bytes as i64, &mut rec); // file_size_in_bytes

        // column_sizes, value_counts, null_value_counts, nan_value_counts, lower_bounds,
        // upper_bounds, key_metadata, split_offsets
        for _ in 0..8 {
            encode_union_null(&mut rec);
        }

        // equality_ids: non-null union branch holding an Avro array. An array is a series
        // of (count, items) blocks ending with a zero-count block. An empty list must
        // therefore be the terminator alone; writing a zero-count block and then a second
        // terminator would leave a stray byte that shifts every following field.
        encode_long(1, &mut rec);
        if !d.equality_ids.is_empty() {
            encode_long(d.equality_ids.len() as i64, &mut rec);
            for &id in &d.equality_ids {
                encode_int(id, &mut rec);
            }
        }
        encode_long(0, &mut rec); // end of array

        encode_union_null(&mut rec); // sort_order_id
        encode_union_null(&mut rec); // first_row_id
        records.push(rec);
    }

    let schema_json = r#"{"schema-id":0,"type":"struct","fields":[]}"#;
    let partition_spec = r#"[{"spec-id":0,"fields":[]}]"#;
    let extra_meta: &[(&str, &[u8])] = &[
        ("schema", schema_json.as_bytes()),
        ("partition-spec", partition_spec.as_bytes()),
        ("partition-spec-id", b"0"),
        ("format-version", b"2"),
        ("content", b"deletes"),
    ];
    Bytes::from(write_avro_container(
        MANIFEST_ENTRY_SCHEMA_STR,
        extra_meta,
        &records,
    ))
}
/// Reads the equality-delete entries (`data_file.content == 2`) from a manifest file.
///
/// Entries with any other content type, or without a string `file_path`, are skipped.
/// `record_count` and `file_size_in_bytes` default to 0. `equality_ids` may be a bare array
/// or a union wrapping one, and is empty when absent or null.
///
/// # Errors
/// Propagates Avro decoding errors.
pub fn read_equality_delete_manifest(
    data: &[u8],
) -> apache_avro::AvroResult<Vec<crate::provider::EqualityDeleteFile>> {
    let mut deletes = Vec::new();
    for decoded in apache_avro::Reader::new(data)? {
        let entry = match decoded? {
            Value::Record(entry) => entry,
            _ => continue,
        };
        let df = match entry.iter().find(|(name, _)| name == "data_file") {
            Some((_, Value::Record(df))) => df,
            _ => continue,
        };

        let field = |wanted: &str| df.iter().find(|(name, _)| name == wanted).map(|(_, v)| v);
        let long_or_zero = |wanted: &str| match field(wanted) {
            Some(Value::Long(n)) => *n as u64,
            _ => 0,
        };

        let content = match field("content") {
            Some(Value::Int(n)) => *n,
            _ => 0,
        };
        if content != 2 {
            continue;
        }
        let path = match field("file_path") {
            Some(Value::String(s)) => s.clone(),
            _ => continue,
        };

        let ids_value = match field("equality_ids") {
            Some(Value::Union(_, inner)) => Some(inner.as_ref()),
            other => other,
        };
        let equality_ids: Vec<i32> = match ids_value {
            Some(Value::Array(items)) => items
                .iter()
                .filter_map(|item| match item {
                    Value::Int(n) => Some(*n),
                    _ => None,
                })
                .collect(),
            _ => Vec::new(),
        };

        deletes.push(crate::provider::EqualityDeleteFile {
            path,
            equality_ids,
            record_count: long_or_zero("record_count"),
            file_size_bytes: long_or_zero("file_size_in_bytes"),
        });
    }
    Ok(deletes)
}
/// Writes an equality-delete data file (Avro container) with a single column `col_name`
/// (Iceberg field id `field_id`) and one row per entry in `values`.
///
/// `iceberg_type` (whitespace-trimmed) selects the Avro column type: `int`/`integer`,
/// `long`, `float` and `double` map directly, and anything else is stored as `string`.
/// A value that does not parse as the column type is passed to the writer as a string;
/// presumably `append` then rejects it against the typed schema (NOTE(review): confirm).
///
/// # Errors
/// Returns Avro errors from schema parsing, appending a row, or finishing the container.
pub fn write_equality_delete_avro(
    col_name: &str,
    field_id: i32,
    iceberg_type: &str,
    values: &[&str],
) -> apache_avro::AvroResult<Bytes> {
    use apache_avro::types::Value as AV;

    let avro_type = match iceberg_type.trim() {
        "int" | "integer" => "int",
        "long" => "long",
        "float" => "float",
        "double" => "double",
        _ => "string",
    };
    let schema_str = format!(
        r#"{{"type":"record","name":"eq_delete_entry","fields":[{{"name":"{col_name}","type":"{avro_type}","field-id":{field_id}}}]}}"#
    );
    let schema = apache_avro::Schema::parse_str(&schema_str)?;

    // Converts one raw string into the Avro value for the column type.
    let to_avro = |raw: &str| -> AV {
        let fallback = || AV::String(raw.to_string());
        match avro_type {
            "int" => raw.parse::<i32>().map(AV::Int).unwrap_or_else(|_| fallback()),
            "long" => raw.parse::<i64>().map(AV::Long).unwrap_or_else(|_| fallback()),
            "float" => raw.parse::<f32>().map(AV::Float).unwrap_or_else(|_| fallback()),
            "double" => raw.parse::<f64>().map(AV::Double).unwrap_or_else(|_| fallback()),
            _ => fallback(),
        }
    };

    let mut writer = apache_avro::Writer::new(&schema, Vec::new());
    for raw in values.iter().copied() {
        let row = AV::Record(vec![(col_name.to_string(), to_avro(raw))]);
        writer.append(row)?;
    }
    Ok(Bytes::from(writer.into_inner()?))
}
/// Reads the rows of an equality-delete data file and flattens them into
/// `(column_name, value)` pairs, one per field per row, with each value rendered as a string.
///
/// Optional columns are encoded as `["null", T]` unions. They are unwrapped so their values
/// are returned instead of being silently dropped. Null values, and types other than
/// string/int/long/float/double/bytes, are skipped. Bytes are decoded as lossy UTF-8.
///
/// # Errors
/// Propagates Avro decoding errors.
pub fn read_equality_delete_values(data: &[u8]) -> apache_avro::AvroResult<Vec<(String, String)>> {
    let reader = apache_avro::Reader::new(data)?;
    let mut results = Vec::new();
    for value in reader {
        let value = value?;
        if let Value::Record(fields) = value {
            for (col, val) in fields {
                // Look through nullable unions; a null branch falls through to `continue`.
                let scalar = match &val {
                    Value::Union(_, inner) => inner.as_ref(),
                    other => other,
                };
                let s = match scalar {
                    Value::String(s) => s.clone(),
                    Value::Int(n) => n.to_string(),
                    Value::Long(n) => n.to_string(),
                    Value::Float(f) => f.to_string(),
                    Value::Double(d) => d.to_string(),
                    Value::Bytes(b) => String::from_utf8_lossy(b).into_owned(),
                    _ => continue,
                };
                results.push((col, s));
            }
        }
    }
    Ok(results)
}
/// Aggregates `data_files` per partition value and writes the totals as a Snappy-compressed
/// Parquet file.
///
/// The output has one row per distinct `partition_value`, ordered by that value. Each row
/// contains:
/// - a non-null `partition` struct holding one nullable UTF-8 column named after the spec's
///   first field (files without a partition value map to null);
/// - `record_count`, `file_count` and `total_size_bytes` as `Int64`.
///
/// NOTE(review): only the first spec field is used, so for multi-field specs the column
/// holds the whole `\x1f`-joined value. Confirm this is intended.
///
/// # Errors
/// Returns `AilakeError::Catalog` if the spec has no fields, or if building the batch or
/// writing the Parquet data fails.
pub fn write_partition_stats_parquet(
    partition_spec: &PartitionSpec,
    data_files: &[DataFileEntry],
) -> ailake_core::AilakeResult<bytes::Bytes> {
    use std::collections::BTreeMap;
    use std::sync::Arc;
    use arrow_array::{Int64Array, StringArray, StructArray};
    use arrow_schema::{DataType, Field, Fields, Schema};
    use parquet::arrow::ArrowWriter;
    use parquet::basic::Compression;
    use parquet::file::properties::WriterProperties;

    fn catalog_err<E: std::fmt::Display>(err: E) -> ailake_core::AilakeError {
        ailake_core::AilakeError::Catalog(err.to_string())
    }

    let part_field = partition_spec.fields.first().ok_or_else(|| {
        ailake_core::AilakeError::Catalog(
            "write_partition_stats_parquet called on empty spec".into(),
        )
    })?;

    // Totals per partition value: (record_count, file_count, total_size_bytes).
    // The BTreeMap keeps keys in byte-lexicographic order, which fixes the row order.
    let mut totals: BTreeMap<String, (i64, i64, i64)> = BTreeMap::new();
    for file in data_files {
        let key = file.partition_value.clone().unwrap_or_default();
        let (records, files, size) = totals.entry(key).or_insert((0, 0, 0));
        *records += file.record_count as i64;
        *files += 1;
        *size += file.file_size_bytes as i64;
    }

    let partition_values: Vec<Option<&str>> = totals
        .keys()
        .map(|key| if key.is_empty() { None } else { Some(key.as_str()) })
        .collect();
    let record_counts: Vec<i64> = totals.values().map(|t| t.0).collect();
    let file_counts: Vec<i64> = totals.values().map(|t| t.1).collect();
    let total_sizes: Vec<i64> = totals.values().map(|t| t.2).collect();

    let value_field = Field::new(&part_field.name, DataType::Utf8, true);
    let partition_column = Arc::new(StructArray::new(
        Fields::from(vec![value_field.clone()]),
        vec![Arc::new(StringArray::from(partition_values)) as Arc<dyn arrow_array::Array>],
        None,
    )) as Arc<dyn arrow_array::Array>;
    let schema = Arc::new(Schema::new(vec![
        Field::new(
            "partition",
            DataType::Struct(Fields::from(vec![value_field])),
            false,
        ),
        Field::new("record_count", DataType::Int64, false),
        Field::new("file_count", DataType::Int64, false),
        Field::new("total_size_bytes", DataType::Int64, false),
    ]));

    let batch = arrow_array::RecordBatch::try_new(
        Arc::clone(&schema),
        vec![
            partition_column,
            Arc::new(Int64Array::from(record_counts)),
            Arc::new(Int64Array::from(file_counts)),
            Arc::new(Int64Array::from(total_sizes)),
        ],
    )
    .map_err(catalog_err)?;

    let props = WriterProperties::builder()
        .set_compression(Compression::SNAPPY)
        .build();
    let mut out: Vec<u8> = Vec::new();
    let mut writer = ArrowWriter::try_new(&mut out, schema, Some(props)).map_err(catalog_err)?;
    writer.write(&batch).map_err(catalog_err)?;
    writer.close().map_err(catalog_err)?;
    Ok(bytes::Bytes::from(out))
}
#[cfg(test)]
mod tests {
use super::*;
use crate::provider::{DataFileEntry, IndexStatus};
#[test]
fn manifest_list_roundtrip() {
let bytes = write_manifest_list("warehouse/ns.db/t/metadata/m0.avro", 512, 42, 1, 10);
let paths = read_manifest_list(&bytes).expect("read_manifest_list failed");
assert_eq!(paths, vec!["warehouse/ns.db/t/metadata/m0.avro"]);
}
#[test]
fn manifest_file_roundtrip() {
let file = DataFileEntry {
path: "data/part-0.parquet".to_string(),
record_count: 5,
file_size_bytes: 1024,
centroid_b64: None,
radius: Some(0.1),
hnsw_offset: Some(100),
hnsw_len: Some(50),
vector_column: Some("emb".to_string()),
vector_dim: Some(4),
extra_vector_indexes: vec![],
index_status: IndexStatus::Ready,
batch_id: None,
embedding_model: None,
partition_value: None,
deletion_vector: None,
first_row_id: None,
};
let schema_json = r#"{"schema-id":0,"type":"struct","fields":[]}"#;
let partition_spec = r#"[{"spec-id":0,"fields":[]}]"#;
let bytes = write_manifest_file(&[file], 99, 1, schema_json, partition_spec, 2, None);
let entries = read_manifest_file(&bytes).expect("read_manifest_file failed");
assert_eq!(entries.len(), 1);
assert_eq!(entries[0].path, "data/part-0.parquet");
assert_eq!(entries[0].record_count, 5);
assert_eq!(entries[0].hnsw_offset, Some(100));
}
#[test]
fn batch_id_roundtrip() {
let file = DataFileEntry {
path: "data/part-1.parquet".to_string(),
record_count: 100,
file_size_bytes: 4096,
centroid_b64: None,
radius: None,
hnsw_offset: Some(200),
hnsw_len: Some(80),
vector_column: Some("embedding".to_string()),
vector_dim: Some(4),
extra_vector_indexes: vec![],
index_status: IndexStatus::Ready,
batch_id: Some("dag_run_2026-05-28_taskA".to_string()),
embedding_model: None,
partition_value: None,
deletion_vector: None,
first_row_id: None,
};
let schema_json = r#"{"schema-id":0,"type":"struct","fields":[]}"#;
let partition_spec = r#"[{"spec-id":0,"fields":[]}]"#;
let bytes = write_manifest_file(&[file], 42, 1, schema_json, partition_spec, 2, None);
let entries = read_manifest_file(&bytes).expect("read_manifest_file failed");
assert_eq!(
entries[0].batch_id.as_deref(),
Some("dag_run_2026-05-28_taskA")
);
}
#[test]
fn first_row_id_roundtrip_v3() {
let file = DataFileEntry {
path: "data/part-2.parquet".to_string(),
record_count: 200,
file_size_bytes: 8192,
centroid_b64: None,
radius: None,
hnsw_offset: Some(300),
hnsw_len: Some(100),
vector_column: Some("embedding".to_string()),
vector_dim: Some(4),
extra_vector_indexes: vec![],
index_status: IndexStatus::Ready,
batch_id: None,
embedding_model: None,
partition_value: None,
deletion_vector: None,
first_row_id: Some(5000),
};
let schema_json = r#"{"schema-id":0,"type":"struct","fields":[]}"#;
let partition_spec = r#"[{"spec-id":0,"fields":[]}]"#;
let bytes = write_manifest_file(&[file], 77, 1, schema_json, partition_spec, 3, None);
let entries = read_manifest_file(&bytes).expect("read_manifest_file failed");
assert_eq!(entries[0].first_row_id, Some(5000));
}
#[test]
fn first_row_id_none_for_v2() {
let file = DataFileEntry {
path: "data/part-3.parquet".to_string(),
record_count: 100,
file_size_bytes: 4096,
centroid_b64: None,
radius: None,
hnsw_offset: None,
hnsw_len: None,
vector_column: None,
vector_dim: None,
extra_vector_indexes: vec![],
index_status: IndexStatus::Ready,
batch_id: None,
embedding_model: None,
partition_value: None,
deletion_vector: None,
first_row_id: None,
};
let schema_json = r#"{"schema-id":0,"type":"struct","fields":[]}"#;
let partition_spec = r#"[{"spec-id":0,"fields":[]}]"#;
let bytes = write_manifest_file(&[file], 88, 1, schema_json, partition_spec, 2, None);
let entries = read_manifest_file(&bytes).expect("read_manifest_file failed");
assert_eq!(entries[0].first_row_id, None);
}
#[test]
fn equality_delete_manifest_roundtrip() {
use crate::provider::EqualityDeleteFile;
let del = EqualityDeleteFile {
path: "metadata/eq-del-001.avro".to_string(),
equality_ids: vec![5, 9],
record_count: 3,
file_size_bytes: 512,
};
let bytes = write_equality_delete_manifest(&[del], 42, 1);
let entries =
read_equality_delete_manifest(&bytes).expect("read_equality_delete_manifest failed");
assert_eq!(entries.len(), 1);
assert_eq!(entries[0].path, "metadata/eq-del-001.avro");
assert_eq!(entries[0].equality_ids, vec![5, 9]);
assert_eq!(entries[0].record_count, 3);
assert_eq!(entries[0].file_size_bytes, 512);
}
#[test]
fn read_manifest_list_typed_returns_content() {
let manifests = vec![
("metadata/m0.avro".to_string(), 1024i64, 0i32),
("metadata/m0-eq-del.avro".to_string(), 256i64, 1i32),
];
let bytes = write_manifest_list_multi_typed(&manifests, 99, 1, 10);
let typed = read_manifest_list_typed(&bytes).expect("read_manifest_list_typed failed");
assert_eq!(typed.len(), 2);
assert_eq!(typed[0].0, "metadata/m0.avro");
assert_eq!(typed[0].1, 0); assert_eq!(typed[1].0, "metadata/m0-eq-del.avro");
assert_eq!(typed[1].1, 1); }
#[test]
fn partition_spec_native_roundtrip() {
use crate::provider::{PartitionField, PartitionSpec};
let spec = PartitionSpec {
spec_id: 1,
fields: vec![PartitionField {
source_id: 1,
field_id: 1000,
name: "agent_id".to_string(),
transform: "identity".to_string(),
source_type: "string".to_string(),
}],
};
let file = DataFileEntry {
path: "data/part-agent-a.parquet".to_string(),
record_count: 50,
file_size_bytes: 2048,
centroid_b64: None,
radius: None,
hnsw_offset: None,
hnsw_len: None,
vector_column: Some("embedding".to_string()),
vector_dim: Some(4),
extra_vector_indexes: vec![],
index_status: IndexStatus::Ready,
batch_id: None,
embedding_model: None,
partition_value: Some("agent-abc-123".to_string()),
deletion_vector: None,
first_row_id: None,
};
let schema_json = r#"{"schema-id":0,"type":"struct","fields":[{"id":1,"name":"agent_id","required":false,"type":"string"}]}"#;
let partition_spec_json = r#"[{"spec-id":0,"fields":[]},{"spec-id":1,"fields":[{"name":"agent_id","transform":"identity","source-id":1,"field-id":1000}]}]"#;
let bytes = write_manifest_file(
&[file],
200,
1,
schema_json,
partition_spec_json,
2,
Some(&spec),
);
let entries = read_manifest_file(&bytes).expect("partition roundtrip failed");
assert_eq!(entries.len(), 1);
assert_eq!(entries[0].partition_value.as_deref(), Some("agent-abc-123"));
}
/// An `int`-typed identity partition value (carried in `DataFileEntry` as the
/// string "7") must come back unchanged after a manifest write/read cycle.
/// Unlike the string variant, the table metadata here lists only spec id 1.
#[test]
fn partition_spec_int_native_roundtrip() {
    use crate::provider::{PartitionField, PartitionSpec};
    let spec = PartitionSpec {
        spec_id: 1,
        fields: vec![PartitionField {
            source_id: 1,
            field_id: 1000,
            name: "shard_id".to_string(),
            transform: "identity".to_string(),
            source_type: "int".to_string(),
        }],
    };
    let file = DataFileEntry {
        path: "data/part-shard-7.parquet".to_string(),
        record_count: 10,
        file_size_bytes: 512,
        centroid_b64: None,
        radius: None,
        hnsw_offset: None,
        hnsw_len: None,
        vector_column: None,
        vector_dim: None,
        extra_vector_indexes: vec![],
        index_status: IndexStatus::Ready,
        batch_id: None,
        embedding_model: None,
        partition_value: Some("7".to_string()),
        deletion_vector: None,
        first_row_id: None,
    };
    let schema_json = r#"{"schema-id":0,"type":"struct","fields":[{"id":1,"name":"shard_id","required":false,"type":"int"}]}"#;
    let partition_spec_json = r#"[{"spec-id":1,"fields":[{"name":"shard_id","transform":"identity","source-id":1,"field-id":1000}]}]"#;
    let bytes = write_manifest_file(
        &[file],
        201,
        1,
        schema_json,
        partition_spec_json,
        2,
        Some(&spec),
    );
    let entries = read_manifest_file(&bytes).expect("int partition roundtrip failed");
    // Pin the entry count before indexing (as the sibling roundtrip tests do),
    // so a dropped entry fails with a clear assertion instead of an
    // index-out-of-bounds panic.
    assert_eq!(entries.len(), 1);
    assert_eq!(entries[0].partition_value.as_deref(), Some("7"));
}
/// A two-column partition spec (`agent_id` identity + `ts` `truncate[4]`).
/// The test encodes the per-column values as one compound string joined by
/// `\x1f`, and that string must round-trip through the manifest intact.
#[test]
fn multi_column_partition_roundtrip() {
    use crate::provider::{PartitionField, PartitionSpec};

    // Both partition columns are string-typed; only ids/name/transform vary.
    let string_field = |source_id, field_id, name: &str, transform: &str| PartitionField {
        source_id,
        field_id,
        name: name.to_string(),
        transform: transform.to_string(),
        source_type: "string".to_string(),
    };
    let spec = PartitionSpec {
        spec_id: 1,
        fields: vec![
            string_field(1, 1000, "agent_id", "identity"),
            string_field(2, 1001, "ts", "truncate[4]"),
        ],
    };

    let compound = "agt-007\x1f2025";
    let entry = DataFileEntry {
        path: String::from("data/part-multi.parquet"),
        record_count: 30,
        file_size_bytes: 1024,
        centroid_b64: None,
        radius: None,
        hnsw_offset: None,
        hnsw_len: None,
        vector_column: Some(String::from("embedding")),
        vector_dim: Some(4),
        extra_vector_indexes: Vec::new(),
        index_status: IndexStatus::Ready,
        batch_id: None,
        embedding_model: None,
        partition_value: Some(String::from(compound)),
        deletion_vector: None,
        first_row_id: None,
    };

    let schema_json = r#"{"schema-id":0,"type":"struct","fields":[{"id":1,"name":"agent_id","required":false,"type":"string"},{"id":2,"name":"ts","required":false,"type":"string"}]}"#;
    let partition_spec_json = r#"[{"spec-id":0,"fields":[]},{"spec-id":1,"fields":[{"name":"agent_id","transform":"identity","source-id":1,"field-id":1000},{"name":"ts","transform":"truncate[4]","source-id":2,"field-id":1001}]}]"#;

    let encoded = write_manifest_file(
        &[entry],
        202,
        1,
        schema_json,
        partition_spec_json,
        2,
        Some(&spec),
    );
    let decoded = read_manifest_file(&encoded).expect("multi-column roundtrip failed");
    assert_eq!(decoded.len(), 1);
    assert_eq!(decoded[0].partition_value.as_deref(), Some(compound));
}
/// With no partition spec, the generated schema still names the `r102`
/// partition record, contains an empty field list, and has no partition
/// column such as `tenant_id`.
#[test]
fn build_manifest_entry_schema_no_spec() {
    let schema = build_manifest_entry_schema(None);
    for needle in ["r102", r#""fields":[]"#] {
        assert!(schema.contains(needle));
    }
    assert!(!schema.contains("tenant_id"));
}
/// A single string identity partition field must show up in the generated
/// schema by name and field id, typed as the nullable union `"null","string"`.
#[test]
fn build_manifest_entry_schema_with_string_spec() {
    use crate::provider::{PartitionField, PartitionSpec};

    let tenant = PartitionField {
        source_id: 1,
        field_id: 1000,
        name: String::from("tenant_id"),
        transform: String::from("identity"),
        source_type: String::from("string"),
    };
    let spec = PartitionSpec {
        spec_id: 1,
        fields: vec![tenant],
    };

    let schema = build_manifest_entry_schema(Some(&spec));
    for needle in ["tenant_id", "1000", r#""null","string""#] {
        assert!(schema.contains(needle));
    }
}
/// Test helper: builds a minimal `DataFileEntry` at the fixed path
/// `data/part.parquet` with the given partition value, record count and byte
/// size. All vector/HNSW/batch metadata is `None`, and the index status is
/// `IndexStatus::Ready`.
fn make_file(partition_value: Option<&str>, record_count: u64, size: u64) -> DataFileEntry {
    let partition_value = partition_value.map(str::to_owned);
    DataFileEntry {
        path: String::from("data/part.parquet"),
        record_count,
        file_size_bytes: size,
        partition_value,
        index_status: IndexStatus::Ready,
        extra_vector_indexes: Vec::new(),
        centroid_b64: None,
        radius: None,
        hnsw_offset: None,
        hnsw_len: None,
        vector_column: None,
        vector_dim: None,
        batch_id: None,
        embedding_model: None,
        deletion_vector: None,
        first_row_id: None,
    }
}
/// Three files spread over two distinct `agent_id` values must produce a
/// non-empty, readable Parquet file with exactly one row per partition value.
#[test]
fn write_partition_stats_parquet_basic() {
    use crate::provider::{PartitionField, PartitionSpec};
    use parquet::file::reader::{FileReader, SerializedFileReader};

    let spec = PartitionSpec {
        spec_id: 1,
        fields: vec![PartitionField {
            source_id: 1,
            field_id: 1000,
            name: String::from("agent_id"),
            transform: String::from("identity"),
            source_type: String::from("string"),
        }],
    };

    // (partition value, record count, file size): two files share "agent-A".
    let files: Vec<DataFileEntry> = [
        ("agent-A", 100, 4096),
        ("agent-A", 200, 8192),
        ("agent-B", 50, 2048),
    ]
    .iter()
    .map(|&(agent, rows, size)| make_file(Some(agent), rows, size))
    .collect();

    let parquet_bytes = write_partition_stats_parquet(&spec, &files).expect("should not fail");
    assert!(!parquet_bytes.is_empty(), "output must be non-empty");

    let reader = SerializedFileReader::new(bytes::Bytes::from(parquet_bytes.to_vec()))
        .expect("valid parquet");
    let rows = reader.get_row_iter(None).expect("iter").count();
    assert_eq!(rows, 2, "expected one row per partition value");
}
/// With no data files, the writer must still emit a valid (non-empty) Parquet
/// file, and that file must contain zero rows.
#[test]
fn write_partition_stats_parquet_empty_files() {
    use crate::provider::{PartitionField, PartitionSpec};
    use parquet::file::reader::{FileReader, SerializedFileReader};

    let tenant = PartitionField {
        source_id: 1,
        field_id: 1000,
        name: "tenant_id".into(),
        transform: "identity".into(),
        source_type: "string".into(),
    };
    let spec = PartitionSpec {
        spec_id: 1,
        fields: vec![tenant],
    };

    let no_files: Vec<DataFileEntry> = Vec::new();
    let parquet_bytes = write_partition_stats_parquet(&spec, &no_files).expect("should not fail");
    assert!(!parquet_bytes.is_empty());

    let reader = SerializedFileReader::new(bytes::Bytes::from(parquet_bytes.to_vec()))
        .expect("valid parquet");
    assert_eq!(reader.get_row_iter(None).expect("iter").count(), 0);
}
/// Files that share a partition value must collapse into a single stats row
/// whose record count (column 1) is the sum of those files' counts:
/// us-east = 1000 + 2000 = 3000 and eu-west = 500.
#[test]
fn write_partition_stats_parquet_aggregates_correctly() {
    use crate::provider::{PartitionField, PartitionSpec};
    use parquet::file::reader::{FileReader, SerializedFileReader};
    use parquet::record::RowAccessor;

    let spec = PartitionSpec {
        spec_id: 1,
        fields: vec![PartitionField {
            source_id: 1,
            field_id: 1000,
            name: "region".to_string(),
            transform: "identity".to_string(),
            source_type: "string".to_string(),
        }],
    };
    let files = vec![
        make_file(Some("us-east"), 1000, 10000),
        make_file(Some("us-east"), 2000, 20000),
        make_file(Some("eu-west"), 500, 5000),
    ];
    let out = write_partition_stats_parquet(&spec, &files).expect("aggregation ok");
    let reader =
        SerializedFileReader::new(bytes::Bytes::from(out.to_vec())).expect("valid parquet");

    // Compare the complete, sorted list of record counts rather than probing
    // for individual values: extra or duplicated rows (e.g. [3000, 500, 500])
    // must fail the test too, not just missing totals.
    let mut record_counts: Vec<i64> = reader
        .get_row_iter(None)
        .expect("iter")
        .map(|row| row.expect("row").get_long(1).expect("record_count"))
        .collect();
    record_counts.sort_unstable();
    assert_eq!(
        record_counts,
        [500, 3000],
        "expected exactly one row per region: eu-west=500, us-east=1000+2000=3000"
    );
}
}