use icepick::catalog::Catalog;
use icepick::io::FileIO;
use icepick::spec::{
    DataFile, NamespaceIdent, NestedField, PrimitiveType, Schema, TableCreation, TableIdent,
    TableMetadata, Type,
};
use icepick::table::Table;
use opendal::Operator;
use std::collections::HashMap;
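
/// Minimal in-memory `Catalog` stub: it approves every namespace and
/// metadata-swap request, which is all the commit path below needs.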
struct TestCatalog {
    file_io: FileIO,
}

impl TestCatalog {
    fn new(file_io: FileIO) -> Self {
        Self { file_io }
    }
}
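
// Every trait method is a stub: existence checks succeed, listings are
// empty, and `update_table_metadata` accepts any swap without checking the
// old location, so commits always succeed.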
#[async_trait::async_trait]
impl Catalog for TestCatalog {
    fn file_io(&self) -> &FileIO {
        &self.file_io
    }

    async fn create_namespace(
        &self,
        _namespace: &NamespaceIdent,
        _properties: HashMap<String, String>,
    ) -> icepick::error::Result<()> {
        Ok(())
    }

    async fn namespace_exists(&self, _namespace: &NamespaceIdent) -> icepick::error::Result<bool> {
        Ok(true)
    }

    async fn list_tables(
        &self,
        _namespace: &NamespaceIdent,
    ) -> icepick::error::Result<Vec<TableIdent>> {
        Ok(vec![])
    }

    async fn table_exists(&self, _identifier: &TableIdent) -> icepick::error::Result<bool> {
        Ok(true)
    }

    async fn create_table(
        &self,
        _namespace: &NamespaceIdent,
        _creation: TableCreation,
    ) -> icepick::error::Result<Table> {
        unimplemented!("TestCatalog::create_table")
    }

    async fn load_table(&self, _identifier: &TableIdent) -> icepick::error::Result<Table> {
        unimplemented!("TestCatalog::load_table")
    }

    async fn drop_table(&self, _identifier: &TableIdent) -> icepick::error::Result<()> {
        Ok(())
    }

    async fn update_table_metadata(
        &self,
        _identifier: &TableIdent,
        _old_metadata_location: &str,
        _new_metadata_location: &str,
    ) -> icepick::error::Result<()> {
        Ok(())
    }
}
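
/// End-to-end append: build a schema, table metadata, and a data file with
/// column stats; commit through the stub catalog; then verify that a
/// manifest, a manifest list, and v1 metadata were written and that the
/// snapshot summary records the append.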
#[tokio::test]
async fn test_end_to_end_commit_with_stats() {
    let op = Operator::via_iter(opendal::Scheme::Memory, []).unwrap();
    let file_io = FileIO::new(op.clone());
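
    // Two-column schema: required `id` (long) and optional `name` (string).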
    let schema = Schema::builder()
        .with_fields(vec![
            NestedField::required_field(1, "id".to_string(), Type::Primitive(PrimitiveType::Long)),
            NestedField::optional_field(
                2,
                "name".to_string(),
                Type::Primitive(PrimitiveType::String),
            ),
        ])
        .build()
        .unwrap();
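
    // Fresh table metadata rooted in the in-memory store; the table starts
    // with no snapshots.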
    let metadata = TableMetadata::builder()
        .with_location("memory://warehouse/db/table")
        .with_current_schema(schema)
        .build()
        .unwrap();
    let ident = TableIdent::new(
        NamespaceIdent::new(vec!["db".to_string()]),
        "table".to_string(),
    );
    let table = Table::new(
        ident,
        metadata,
        "memory://warehouse/db/table/metadata/v0.metadata.json".to_string(),
        file_io.clone(),
    );
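
    // Per-column stats for the file: field 1 (`id`) has 1000 values; field 2
    // (`name`) has 950 values and 50 nulls.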
    let mut value_counts = HashMap::new();
    value_counts.insert(1, 1000);
    value_counts.insert(2, 950);
    let mut null_counts = HashMap::new();
    null_counts.insert(2, 50);
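
    // Describe a 1000-row Parquet data file carrying those stats.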
    let data_file = DataFile::builder()
        .with_file_path("memory://warehouse/db/table/data/file1.parquet")
        .with_file_format("PARQUET")
        .with_record_count(1000)
        .with_file_size_in_bytes(50_000)
        .with_value_counts(value_counts)
        .with_null_value_counts(null_counts)
        .build()
        .unwrap();
    let catalog = TestCatalog::new(file_io.clone());
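
    // Commit the append in a single transaction at an arbitrary fixed
    // timestamp (milliseconds); the stub catalog accepts the metadata swap.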
    let timestamp_ms = 1234567890;
    table
        .transaction()
        .append(vec![data_file])
        .commit(&catalog, timestamp_ms)
        .await
        .unwrap();
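
    // The commit should have written three artifacts under metadata/:
    // a manifest (*-m0.avro), a manifest list (snap-*), and v1 metadata.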
    let entries = op
        .list("memory://warehouse/db/table/metadata/")
        .await
        .unwrap();
    let manifest_exists = entries
        .iter()
        .any(|entry| entry.path().contains("-m0.avro"));
    assert!(manifest_exists, "Manifest file should exist");
    let manifest_list_exists = entries
        .iter()
        .any(|entry| entry.path().contains("snap-") && entry.path().contains("-1-"));
    assert!(manifest_list_exists, "Manifest list should exist");
    let metadata_exists = op
        .exists("memory://warehouse/db/table/metadata/v1.metadata.json")
        .await
        .unwrap();
    assert!(metadata_exists, "New metadata file should exist");
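
    // Read the committed metadata back and inspect the new snapshot.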
    let metadata_bytes = op
        .read("memory://warehouse/db/table/metadata/v1.metadata.json")
        .await
        .unwrap();
    let new_metadata: TableMetadata = serde_json::from_slice(&metadata_bytes.to_vec()).unwrap();
    let snapshot_id = new_metadata
        .current_snapshot_id()
        .expect("Should have current snapshot");
    assert!(snapshot_id > 0, "Snapshot ID should be positive");
    assert_eq!(new_metadata.snapshots().len(), 1);
    let snapshot = new_metadata.current_snapshot().unwrap();
    assert_eq!(snapshot.snapshot_id(), snapshot_id);
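
    // The snapshot summary should record the operation and its counts.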
    assert_eq!(
        snapshot.summary().get("operation"),
        Some(&"append".to_string())
    );
    assert_eq!(
        snapshot.summary().get("added-data-files"),
        Some(&"1".to_string())
    );
    assert_eq!(
        snapshot.summary().get("added-records"),
        Some(&"1000".to_string())
    );
}
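
/// Two sequential commits: the second must chain to the first via
/// `parent_snapshot_id`, advance the metadata to v2, and leave both data
/// files visible when the table is reloaded.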
#[tokio::test]
async fn test_multiple_sequential_commits() {
    let op = Operator::via_iter(opendal::Scheme::Memory, []).unwrap();
    let file_io = FileIO::new(op.clone());
    let schema = Schema::builder()
        .with_fields(vec![NestedField::required_field(
            1,
            "id".to_string(),
            Type::Primitive(PrimitiveType::Long),
        )])
        .build()
        .unwrap();
    let metadata = TableMetadata::builder()
        .with_location("memory://warehouse/test/multi")
        .with_current_schema(schema)
        .build()
        .unwrap();
    let ident = TableIdent::new(
        NamespaceIdent::new(vec!["test".to_string()]),
        "multi".to_string(),
    );
    let mut table = Table::new(
        ident.clone(),
        metadata,
        "memory://warehouse/test/multi/metadata/v0.metadata.json".to_string(),
        file_io.clone(),
    );
    let catalog = TestCatalog::new(file_io.clone());
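
    // First commit: a 100-row file.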
    let data_file1 = DataFile::builder()
        .with_file_path("memory://warehouse/test/multi/data/file1.parquet")
        .with_file_format("PARQUET")
        .with_record_count(100)
        .with_file_size_in_bytes(5000)
        .build()
        .unwrap();
    let timestamp_ms_1 = 1234567890;
    table
        .transaction()
        .append(vec![data_file1])
        .commit(&catalog, timestamp_ms_1)
        .await
        .unwrap();
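
    // Re-create the table from the v1 metadata written by the first commit,
    // so the second commit builds on the first snapshot.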
    let metadata_bytes = op
        .read("memory://warehouse/test/multi/metadata/v1.metadata.json")
        .await
        .unwrap();
    let updated_metadata: TableMetadata = serde_json::from_slice(&metadata_bytes.to_vec()).unwrap();
    table = Table::new(
        ident.clone(),
        updated_metadata,
        "memory://warehouse/test/multi/metadata/v1.metadata.json".to_string(),
        file_io.clone(),
    );
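
    // Second commit: append a 200-row file on top of the first snapshot.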
    let data_file2 = DataFile::builder()
        .with_file_path("memory://warehouse/test/multi/data/file2.parquet")
        .with_file_format("PARQUET")
        .with_record_count(200)
        .with_file_size_in_bytes(10000)
        .build()
        .unwrap();
    let timestamp_ms_2 = 1234567900;
    table
        .transaction()
        .append(vec![data_file2])
        .commit(&catalog, timestamp_ms_2)
        .await
        .unwrap();
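
    // After the second commit the metadata should be at v2 with two
    // snapshots, the newer chained to the older.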
    let final_metadata_bytes = op
        .read("memory://warehouse/test/multi/metadata/v2.metadata.json")
        .await
        .unwrap();
    let final_metadata: TableMetadata =
        serde_json::from_slice(&final_metadata_bytes.to_vec()).unwrap();
    assert_eq!(final_metadata.snapshots().len(), 2);
    let current_snapshot_id = final_metadata
        .current_snapshot_id()
        .expect("Should have current snapshot");
    assert!(
        current_snapshot_id > 0,
        "Current snapshot ID should be positive"
    );
    let snapshots = final_metadata.snapshots();
    assert_eq!(snapshots[1].snapshot_id(), current_snapshot_id);
    let first_snapshot_id = snapshots[0].snapshot_id();
    assert_eq!(snapshots[1].parent_snapshot_id(), Some(first_snapshot_id));
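
    // Reload the table at v2 and confirm both data files are reachable
    // through the snapshot chain.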
    let reloaded_table = Table::new(
        ident,
        final_metadata,
        "memory://warehouse/test/multi/metadata/v2.metadata.json".to_string(),
        file_io,
    );
    let files = reloaded_table.files().await.unwrap();
    assert_eq!(files.len(), 2, "Should have 2 data files from both commits");
    let file_paths: Vec<String> = files.iter().map(|f| f.file_path.clone()).collect();
    assert!(
        file_paths.contains(&"memory://warehouse/test/multi/data/file1.parquet".to_string()),
        "Should include file1 from first commit"
    );
    assert!(
        file_paths.contains(&"memory://warehouse/test/multi/data/file2.parquet".to_string()),
        "Should include file2 from second commit"
    );
    let total_records: i64 = files.iter().map(|f| f.record_count).sum();
    assert_eq!(
        total_records, 300,
        "Should have 100 + 200 = 300 total records"
    );
}