use crate::error::Result;
use crate::io::FileIO;
use crate::manifest::avro::data_file_to_avro;
use crate::manifest::schema::{manifest_entry_schema_v2, manifest_list_schema_v2};
use crate::spec::DataFile;
use apache_avro::types::Value;
use apache_avro::Writer;
/// Status of a data file within a manifest; encoded as the Avro `status` int of each
/// manifest entry.
///
/// The explicit discriminants (0, 1, 2) are the on-disk integer values.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(i32)]
pub enum ManifestEntryStatus {
    /// Encoded as `0`.
    Existing = 0,
    /// Encoded as `1`.
    Added = 1,
    /// Encoded as `2`.
    Deleted = 2,
}

impl From<ManifestEntryStatus> for i32 {
    /// Returns the integer code written to the manifest's `status` field.
    fn from(status: ManifestEntryStatus) -> Self {
        // Exhaustive match so adding a variant forces its code to be chosen here.
        match status {
            ManifestEntryStatus::Existing => 0,
            ManifestEntryStatus::Added => 1,
            ManifestEntryStatus::Deleted => 2,
        }
    }
}
/// One row of a manifest file: a data file paired with its status in that manifest.
///
/// Snapshot id and sequence numbers are not stored here; they are supplied by the
/// manifest writer when the entry is serialized.
#[derive(Clone, Debug)]
pub struct ManifestEntry {
    /// The data file this entry describes (serialized as the `data_file` record).
    pub data_file: DataFile,
    /// Written as the integer `status` field of the entry.
    pub status: ManifestEntryStatus,
}
/// One row of a manifest list: summary information about a single manifest file.
///
/// Each field is written verbatim to the Avro field of the same name by the manifest-list
/// writer. The type derives `PartialEq`/`Eq` because it is plain data, which lets callers
/// and tests compare entries directly.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ManifestListEntry {
    /// Location of the manifest file.
    pub manifest_path: String,
    /// Size of the manifest file in bytes (e.g. the value returned by the manifest writer).
    pub manifest_length: i64,
    /// Id of the partition spec the manifest was written with.
    pub partition_spec_id: i32,
    /// Raw `content` code of the manifest.
    /// NOTE(review): presumably 0 = data, 1 = deletes per the Iceberg spec — confirm.
    pub content: i32,
    /// Sequence number recorded for this manifest.
    pub sequence_number: i64,
    /// Minimum sequence number recorded for this manifest.
    pub min_sequence_number: i64,
    /// Id of the snapshot that added this manifest.
    pub added_snapshot_id: i64,
    /// Number of entries with status `Added`.
    pub added_files_count: i32,
    /// Number of entries with status `Existing`.
    pub existing_files_count: i32,
    /// Number of entries with status `Deleted`.
    pub deleted_files_count: i32,
    /// Row count across added files.
    pub added_rows_count: i64,
    /// Row count across existing files.
    pub existing_rows_count: i64,
    /// Row count across deleted files.
    pub deleted_rows_count: i64,
}
/// Writes a v2 manifest to `path` in which every file in `data_files` is recorded with
/// status [`ManifestEntryStatus::Added`].
///
/// Each `DataFile` is cloned into a [`ManifestEntry`], and the work is delegated to
/// [`write_manifest_with_entries`] with the same `snapshot_id` and `sequence_number`.
///
/// # Returns
/// The number of bytes written, as reported by [`write_manifest_with_entries`].
///
/// # Errors
/// Propagates any error from [`write_manifest_with_entries`].
pub async fn write_manifest(
    file_io: &FileIO,
    path: &str,
    data_files: &[DataFile],
    snapshot_id: i64,
    sequence_number: i64,
) -> Result<i64> {
    let added: Vec<ManifestEntry> = data_files
        .iter()
        .cloned()
        .map(|data_file| ManifestEntry {
            data_file,
            status: ManifestEntryStatus::Added,
        })
        .collect();

    write_manifest_with_entries(file_io, path, &added, snapshot_id, sequence_number).await
}
pub async fn write_manifest_with_entries(
file_io: &FileIO,
path: &str,
entries: &[ManifestEntry],
snapshot_id: i64,
sequence_number: i64,
) -> Result<i64> {
let schema = manifest_entry_schema_v2()
.map_err(|e| crate::error::Error::InvalidInput(format!("Invalid Avro schema: {}", e)))?;
let mut writer = Writer::new(&schema, Vec::new());
for entry in entries {
let data_file_value = data_file_to_avro(&entry.data_file)?;
let status_value: i32 = entry.status.into();
let avro_entry = Value::Record(vec![
("status".to_string(), Value::Int(status_value)),
(
"snapshot_id".to_string(),
Value::Union(1, Box::new(Value::Long(snapshot_id))),
),
(
"sequence_number".to_string(),
Value::Union(1, Box::new(Value::Long(sequence_number))),
),
(
"file_sequence_number".to_string(),
Value::Union(1, Box::new(Value::Long(sequence_number))),
),
("data_file".to_string(), data_file_value),
]);
writer.append(avro_entry).map_err(|e| {
crate::error::Error::InvalidInput(format!("Failed to append to Avro writer: {}", e))
})?;
}
let avro_bytes = writer.into_inner().map_err(|e| {
crate::error::Error::InvalidInput(format!("Failed to finalize Avro writer: {}", e))
})?;
let bytes_written = avro_bytes.len() as i64;
file_io.write(path, avro_bytes).await?;
Ok(bytes_written)
}
pub async fn write_manifest_list(
file_io: &FileIO,
path: &str,
entries: Vec<ManifestListEntry>,
) -> Result<()> {
let schema = manifest_list_schema_v2()
.map_err(|e| crate::error::Error::InvalidInput(format!("Invalid Avro schema: {}", e)))?;
let mut writer = Writer::new(&schema, Vec::new());
for entry in entries {
let avro_entry = Value::Record(vec![
(
"manifest_path".to_string(),
Value::String(entry.manifest_path),
),
(
"manifest_length".to_string(),
Value::Long(entry.manifest_length),
),
(
"partition_spec_id".to_string(),
Value::Int(entry.partition_spec_id),
),
("content".to_string(), Value::Int(entry.content)),
(
"sequence_number".to_string(),
Value::Long(entry.sequence_number),
),
(
"min_sequence_number".to_string(),
Value::Long(entry.min_sequence_number),
),
(
"added_snapshot_id".to_string(),
Value::Long(entry.added_snapshot_id),
),
(
"added_files_count".to_string(),
Value::Int(entry.added_files_count),
),
(
"existing_files_count".to_string(),
Value::Int(entry.existing_files_count),
),
(
"deleted_files_count".to_string(),
Value::Int(entry.deleted_files_count),
),
(
"added_rows_count".to_string(),
Value::Long(entry.added_rows_count),
),
(
"existing_rows_count".to_string(),
Value::Long(entry.existing_rows_count),
),
(
"deleted_rows_count".to_string(),
Value::Long(entry.deleted_rows_count),
),
(
"partitions".to_string(),
Value::Union(1, Box::new(Value::Array(vec![]))),
),
(
"key_metadata".to_string(),
Value::Union(0, Box::new(Value::Null)),
),
]);
writer.append(avro_entry).map_err(|e| {
crate::error::Error::InvalidInput(format!("Failed to append to Avro writer: {}", e))
})?;
}
let avro_bytes = writer.into_inner().map_err(|e| {
crate::error::Error::InvalidInput(format!("Failed to finalize Avro writer: {}", e))
})?;
file_io.write(path, avro_bytes).await?;
Ok(())
}