use serde::{Deserialize, Serialize};
use std::collections::HashMap;
/// Kind of change recorded on a [`Snapshot`].
///
/// Serialized by serde as the lowercase variant name, which is the same
/// text the `Display` implementation for this type writes.
/// NOTE(review): the precise meaning of each operation is not defined in
/// this file; confirm against the table-format spec being implemented.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum Operation {
/// Serialized as `"append"`; the default chosen by `SnapshotBuilder::new`.
Append,
/// Serialized as `"overwrite"`.
Overwrite,
/// Serialized as `"delete"`.
Delete,
/// Serialized as `"replace"`.
Replace,
}
/// Writes the operation as its lowercase name (`append`, `overwrite`,
/// `delete`, `replace`). Width/alignment flags on the formatter are not
/// applied; the name is written verbatim.
impl std::fmt::Display for Operation {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Exhaustive on purpose: adding a variant must fail to compile here.
        let label = match *self {
            Operation::Append => "append",
            Operation::Overwrite => "overwrite",
            Operation::Delete => "delete",
            Operation::Replace => "replace",
        };
        f.write_str(label)
    }
}
/// A single table snapshot record.
///
/// (De)serialized with serde using kebab-case field names (for example
/// `snapshot_id` is written as `snapshot-id`).
/// NOTE(review): the field set resembles Apache Iceberg snapshot metadata;
/// confirm which spec version this is meant to follow.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
pub struct Snapshot {
/// Identifier of this snapshot.
pub snapshot_id: i64,
/// Identifier of the parent snapshot, if any. Omitted from serialized
/// output when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub parent_snapshot_id: Option<i64>,
/// Sequence number of this snapshot (`SnapshotBuilder::new` starts it at 0).
pub sequence_number: i64,
/// Timestamp in milliseconds; `SnapshotBuilder::new` fills it from
/// `chrono::Utc::now().timestamp_millis()`.
pub timestamp_ms: i64,
/// Operation recorded for this snapshot.
pub operation: Operation,
/// Path handed to `ManifestList::load` when the snapshot's files are resolved.
pub manifest_list: String,
/// String-to-string properties. Defaults to an empty map when the field is
/// absent during deserialization. The count accessors on `Snapshot` read
/// the keys `added-data-files`, `added-records`, `total-data-files` and
/// `total-records` from this map.
#[serde(default)]
pub summary: HashMap<String, String>,
/// Schema identifier (`SnapshotBuilder::new` starts it at 0).
/// NOTE(review): presumably the schema in effect when the snapshot was
/// written; confirm.
pub schema_id: i32,
}
impl Snapshot {
    /// Starts a [`SnapshotBuilder`] for the given snapshot id and
    /// manifest-list path.
    pub fn builder(snapshot_id: i64, manifest_list: impl Into<String>) -> SnapshotBuilder {
        SnapshotBuilder::new(snapshot_id, manifest_list)
    }

    /// Returns the summary value stored under `key`, if present.
    pub fn summary_property(&self, key: &str) -> Option<&str> {
        self.summary.get(key).map(String::as_str)
    }

    /// The `added-data-files` summary value parsed as `i64`.
    ///
    /// `None` when the key is missing or the value does not parse.
    pub fn added_files_count(&self) -> Option<i64> {
        self.summary_property("added-data-files")?.parse().ok()
    }

    /// The `added-records` summary value parsed as `i64`.
    ///
    /// `None` when the key is missing or the value does not parse.
    pub fn added_records_count(&self) -> Option<i64> {
        self.summary_property("added-records")?.parse().ok()
    }

    /// The `total-data-files` summary value parsed as `i64`.
    ///
    /// `None` when the key is missing or the value does not parse.
    pub fn total_files_count(&self) -> Option<i64> {
        self.summary_property("total-data-files")?.parse().ok()
    }

    /// The `total-records` summary value parsed as `i64`.
    ///
    /// `None` when the key is missing or the value does not parse.
    pub fn total_records_count(&self) -> Option<i64> {
        self.summary_property("total-records")?.parse().ok()
    }

    /// Loads the manifest list and every manifest it references, returning
    /// `(data_files, delete_files)`.
    ///
    /// Manifest entries whose status is `Deleted` are skipped. Files with
    /// `FileContent::Data` go into the first vector; position-delete and
    /// equality-delete files both go into the second. Manifests are loaded
    /// one after another, in manifest-list order.
    ///
    /// # Errors
    ///
    /// Returns the first error produced while loading the manifest list or
    /// any manifest file.
    pub async fn all_files(
        &self,
        storage: &crate::storage::Storage,
    ) -> anyhow::Result<(Vec<DataFile>, Vec<DataFile>)> {
        let list = ManifestList::load(&self.manifest_list, storage).await?;

        let mut live_data = Vec::new();
        let mut live_deletes = Vec::new();

        for listed in list.entries {
            let manifest = ManifestFile::load(&listed.manifest_path, storage).await?;

            let live_files = manifest
                .entries
                .into_iter()
                .filter(|entry| entry.status != ManifestEntryStatus::Deleted)
                .map(|entry| entry.data_file);

            for file in live_files {
                // Exhaustive match so a new `FileContent` variant forces a decision here.
                let bucket = match file.content {
                    FileContent::Data => &mut live_data,
                    FileContent::PositionDeletes | FileContent::EqualityDeletes => {
                        &mut live_deletes
                    }
                };
                bucket.push(file);
            }
        }

        Ok((live_data, live_deletes))
    }

    /// Like [`Snapshot::all_files`], but returns only the data files and
    /// discards the delete files.
    ///
    /// # Errors
    ///
    /// Propagates any error from [`Snapshot::all_files`].
    pub async fn all_data_files(
        &self,
        storage: &crate::storage::Storage,
    ) -> anyhow::Result<Vec<DataFile>> {
        let (data_only, _delete_files) = self.all_files(storage).await?;
        Ok(data_only)
    }
}
/// Builder for [`Snapshot`] values.
///
/// Created through `SnapshotBuilder::new` (or `Snapshot::builder`), adjusted
/// with the `with_*` methods and finished with `build`. Each field mirrors
/// the `Snapshot` field of the same name.
///
/// Derives `Debug` and `Clone` so a partially configured builder can be
/// logged or reused as a template, matching the other public types in this
/// file.
#[derive(Debug, Clone)]
pub struct SnapshotBuilder {
    snapshot_id: i64,
    manifest_list: String,
    parent_snapshot_id: Option<i64>,
    sequence_number: i64,
    timestamp_ms: i64,
    operation: Operation,
    summary: HashMap<String, String>,
    schema_id: i32,
}
impl SnapshotBuilder {
pub fn new(snapshot_id: i64, manifest_list: impl Into<String>) -> Self {
Self {
snapshot_id,
manifest_list: manifest_list.into(),
parent_snapshot_id: None,
sequence_number: 0,
timestamp_ms: chrono::Utc::now().timestamp_millis(),
operation: Operation::Append,
summary: HashMap::new(),
schema_id: 0,
}
}
pub fn with_parent(mut self, parent_id: i64) -> Self {
self.parent_snapshot_id = Some(parent_id);
self
}
pub fn with_sequence_number(mut self, seq: i64) -> Self {
self.sequence_number = seq;
self
}
pub fn with_operation(mut self, op: Operation) -> Self {
self.operation = op;
self
}
pub fn with_schema_id(mut self, schema_id: i32) -> Self {
self.schema_id = schema_id;
self
}
pub fn with_summary_property(
mut self,
key: impl Into<String>,
value: impl Into<String>,
) -> Self {
self.summary.insert(key.into(), value.into());
self
}
pub fn build(self) -> Snapshot {
Snapshot {
snapshot_id: self.snapshot_id,
parent_snapshot_id: self.parent_snapshot_id,
sequence_number: self.sequence_number,
timestamp_ms: self.timestamp_ms,
operation: self.operation,
manifest_list: self.manifest_list,
summary: self.summary,
schema_id: self.schema_id,
}
}
}
/// Ordered collection of manifest-file entries.
///
/// `ManifestList::load` and `ManifestList::save` read and write this type as
/// JSON via `serde_json`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
pub struct ManifestList {
/// The listed manifests, in insertion order (`add_entry` appends).
pub entries: Vec<ManifestFileEntry>,
}
impl ManifestList {
pub fn new() -> Self {
Self {
entries: Vec::new(),
}
}
pub fn add_entry(&mut self, entry: ManifestFileEntry) {
self.entries.push(entry);
}
pub fn total_data_files(&self) -> i64 {
self.entries
.iter()
.map(|e| e.added_files_count + e.existing_files_count)
.sum()
}
pub async fn load(path: &str, storage: &crate::storage::Storage) -> anyhow::Result<Self> {
let data = storage.read(path).await?;
Ok(serde_json::from_slice(&data)?)
}
pub async fn save(&self, path: &str, storage: &crate::storage::Storage) -> anyhow::Result<()> {
let data = serde_json::to_vec(self)?;
storage.write(path, data.into()).await?;
Ok(())
}
}
impl Default for ManifestList {
fn default() -> Self {
Self::new()
}
}
/// One row of a [`ManifestList`]: where a manifest file lives plus summary
/// counters about its contents.
///
/// (De)serialized with kebab-case field names. All fields are required when
/// deserializing (none carries a serde default).
/// NOTE(review): the counters are plain `i64` values that nothing in this
/// file validates; presumably they are non-negative. Confirm with the writer.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
pub struct ManifestFileEntry {
/// Path passed to `ManifestFile::load` to read the manifest.
pub manifest_path: String,
/// Length of the manifest. NOTE(review): presumably in bytes; confirm.
pub manifest_length: i64,
/// Identifier of the partition spec associated with the manifest.
pub partition_spec_id: i32,
/// Whether the manifest tracks data files or delete files.
pub content: ManifestContent,
/// Sequence number recorded for the manifest.
pub sequence_number: i64,
/// Minimum sequence number recorded for the manifest.
pub min_sequence_number: i64,
/// Identifier of the snapshot that added the manifest.
pub added_snapshot_id: i64,
/// Count of added files; read by `ManifestList::total_data_files`.
pub added_files_count: i64,
/// Count of existing files; read by `ManifestList::total_data_files`.
pub existing_files_count: i64,
/// Count of deleted files.
pub deleted_files_count: i64,
/// Count of rows in added files.
pub added_rows_count: i64,
/// Count of rows in existing files.
pub existing_rows_count: i64,
/// Count of rows in deleted files.
pub deleted_rows_count: i64,
}
/// What kind of files a manifest tracks.
///
/// Serialized by serde as the lowercase variant name.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum ManifestContent {
/// Serialized as `"data"`.
Data,
/// Serialized as `"deletes"`.
Deletes,
}
/// Contents of a single manifest: header information plus its entries.
///
/// `ManifestFile::load` and `ManifestFile::save` read and write this type as
/// JSON via `serde_json`, using kebab-case field names.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
pub struct ManifestFile {
/// Schema stored with the manifest (type defined in `crate::schema`).
pub schema: crate::schema::Schema,
/// Identifier of the partition spec associated with the manifest.
pub partition_spec_id: i32,
/// Whether the manifest tracks data files or delete files.
pub content: ManifestContent,
/// The file entries; `Snapshot::all_files` iterates these and skips the
/// ones whose status is `Deleted`.
pub entries: Vec<ManifestEntry>,
}
impl ManifestFile {
    /// Reads `path` from `storage` and decodes it as a JSON manifest.
    ///
    /// # Errors
    ///
    /// Returns an error if the read fails or the bytes are not valid JSON
    /// for this type.
    pub async fn load(path: &str, storage: &crate::storage::Storage) -> anyhow::Result<Self> {
        let bytes = storage.read(path).await?;
        let manifest: Self = serde_json::from_slice(&bytes)?;
        Ok(manifest)
    }

    /// Encodes this manifest as JSON and writes it to `path` in `storage`.
    ///
    /// # Errors
    ///
    /// Returns an error if encoding or the write fails.
    pub async fn save(&self, path: &str, storage: &crate::storage::Storage) -> anyhow::Result<()> {
        let encoded = serde_json::to_vec(self)?;
        storage.write(path, encoded.into()).await?;
        Ok(())
    }
}
/// One entry of a [`ManifestFile`]: a file plus its tracking status.
///
/// (De)serialized with kebab-case field names.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
pub struct ManifestEntry {
/// Tracking status; entries marked `Deleted` are ignored by
/// `Snapshot::all_files`.
pub status: ManifestEntryStatus,
/// Snapshot id associated with the entry, if recorded. Omitted from
/// serialized output when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub snapshot_id: Option<i64>,
/// Sequence number associated with the entry, if recorded. Omitted from
/// serialized output when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub sequence_number: Option<i64>,
/// The file this entry describes.
pub data_file: DataFile,
}
/// Tracking status of a [`ManifestEntry`].
///
/// Serialized by serde as the lowercase variant name.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum ManifestEntryStatus {
/// Serialized as `"existing"`.
Existing,
/// Serialized as `"added"`.
Added,
/// Serialized as `"deleted"`; such entries are skipped by
/// `Snapshot::all_files`.
Deleted,
}
/// Storage format of a [`DataFile`].
///
/// Serialized by serde as the lowercase variant name.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum FileFormat {
/// Serialized as `"parquet"`.
Parquet,
/// Serialized as `"orc"`.
Orc,
/// Serialized as `"avro"`.
Avro,
}
/// What a [`DataFile`] contains.
///
/// Serialized by serde as the kebab-case variant name. `Snapshot::all_files`
/// returns `Data` files separately from the two delete kinds, which it
/// groups together.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
pub enum FileContent {
/// Serialized as `"data"`; the content assigned by `DataFile::new`.
Data,
/// Serialized as `"position-deletes"`.
PositionDeletes,
/// Serialized as `"equality-deletes"`.
EqualityDeletes,
}
/// Description of one file tracked by a manifest, with optional per-column
/// metrics.
///
/// (De)serialized with kebab-case field names. The metric maps default to
/// empty when absent and are omitted from serialized output when empty.
/// NOTE(review): the `i32` map keys are presumably column/field ids
/// (`with_column_stats` names its key `column_id`); confirm.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
pub struct DataFile {
/// Location of the file.
pub file_path: String,
/// Storage format of the file.
pub file_format: FileFormat,
/// Whether the file holds data or deletes.
pub content: FileContent,
/// Partition values keyed by name; defaults to an empty map when absent.
/// Unlike the metric maps, it is always serialized, even when empty.
#[serde(default)]
pub partition: HashMap<String, serde_json::Value>,
/// Number of records in the file.
pub record_count: i64,
/// Size of the file in bytes.
pub file_size_in_bytes: i64,
/// Per-column size, set by `with_column_stats`.
#[serde(default, skip_serializing_if = "HashMap::is_empty")]
pub column_sizes: HashMap<i32, i64>,
/// Per-column value count, set by `with_column_stats`.
#[serde(default, skip_serializing_if = "HashMap::is_empty")]
pub value_counts: HashMap<i32, i64>,
/// Per-column null count, set by `with_column_stats`.
#[serde(default, skip_serializing_if = "HashMap::is_empty")]
pub null_value_counts: HashMap<i32, i64>,
/// Per-column NaN count.
#[serde(default, skip_serializing_if = "HashMap::is_empty")]
pub nan_value_counts: HashMap<i32, i64>,
/// Per-column lower bound as raw bytes.
/// NOTE(review): the byte encoding is not defined in this file; confirm.
#[serde(default, skip_serializing_if = "HashMap::is_empty")]
pub lower_bounds: HashMap<i32, Vec<u8>>,
/// Per-column upper bound as raw bytes.
/// NOTE(review): the byte encoding is not defined in this file; confirm.
#[serde(default, skip_serializing_if = "HashMap::is_empty")]
pub upper_bounds: HashMap<i32, Vec<u8>>,
/// Per-column statistics (type defined in `crate::statistics`).
#[serde(default, skip_serializing_if = "HashMap::is_empty")]
pub statistics: HashMap<i32, crate::statistics::ColumnStats>,
/// Sort order identifier, if any. Omitted from serialized output when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub sort_order_id: Option<i32>,
}
impl DataFile {
    /// Creates a `DataFile` with content `FileContent::Data`, no partition
    /// values, no metrics and no sort order.
    pub fn new(
        file_path: impl Into<String>,
        file_format: FileFormat,
        record_count: i64,
        file_size_in_bytes: i64,
    ) -> Self {
        Self {
            // Caller-supplied values.
            file_path: file_path.into(),
            file_format,
            record_count,
            file_size_in_bytes,
            // Fixed defaults.
            content: FileContent::Data,
            sort_order_id: None,
            partition: HashMap::new(),
            column_sizes: HashMap::new(),
            value_counts: HashMap::new(),
            null_value_counts: HashMap::new(),
            nan_value_counts: HashMap::new(),
            lower_bounds: HashMap::new(),
            upper_bounds: HashMap::new(),
            statistics: HashMap::new(),
        }
    }

    /// Replaces the partition map wholesale with `partition`.
    pub fn with_partition(self, partition: HashMap<String, serde_json::Value>) -> Self {
        Self { partition, ..self }
    }

    /// Records size, value count and null count for `column_id`,
    /// overwriting any values previously stored for that column.
    pub fn with_column_stats(
        mut self,
        column_id: i32,
        size: i64,
        value_count: i64,
        null_count: i64,
    ) -> Self {
        let per_column = [
            (&mut self.column_sizes, size),
            (&mut self.value_counts, value_count),
            (&mut self.null_value_counts, null_count),
        ];
        for (metric, value) in per_column {
            metric.insert(column_id, value);
        }
        self
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builder setters land on the snapshot and the summary accessors parse
    /// the stored strings.
    #[test]
    fn test_snapshot_builder() {
        let built = Snapshot::builder(1, "/manifest-list.avro")
            .with_operation(Operation::Append)
            .with_sequence_number(5)
            .with_summary_property("added-data-files", "10")
            .with_summary_property("added-records", "1000")
            .build();

        assert_eq!(built.snapshot_id, 1);
        assert_eq!(built.operation, Operation::Append);
        assert_eq!(built.sequence_number, 5);
        assert_eq!(built.added_files_count(), Some(10));
        assert_eq!(built.added_records_count(), Some(1000));
    }

    /// A single data manifest with five added files yields a total of five.
    #[test]
    fn test_manifest_list() {
        let data_manifest = ManifestFileEntry {
            manifest_path: "/manifest-1.avro".into(),
            manifest_length: 1024,
            partition_spec_id: 0,
            content: ManifestContent::Data,
            sequence_number: 1,
            min_sequence_number: 1,
            added_snapshot_id: 1,
            added_files_count: 5,
            existing_files_count: 0,
            deleted_files_count: 0,
            added_rows_count: 500,
            existing_rows_count: 0,
            deleted_rows_count: 0,
        };

        let mut manifests = ManifestList::new();
        manifests.add_entry(data_manifest);

        assert_eq!(manifests.total_data_files(), 5);
    }

    /// `with_column_stats` stores the size under the given column id.
    #[test]
    fn test_data_file() {
        let one_mib = 1024 * 1024;
        let half_mib = 512 * 1024;
        let described = DataFile::new("/data/file.parquet", FileFormat::Parquet, 1000, one_mib)
            .with_column_stats(1, half_mib, 1000, 0);

        assert_eq!(described.record_count, 1000);
        assert_eq!(described.column_sizes.get(&1).copied(), Some(half_mib));
    }
}