use chrono::Utc;
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, HashSet};
use uuid::Uuid;
use crate::DataFile;
use crate::manifest::{Operation, Snapshot};
use crate::partition::PartitionSpec;
use crate::schema::Schema;
/// Metadata format version that `TableMetadataBuilder::build` writes into
/// `TableMetadata::format_version` for newly created tables.
pub const FORMAT_VERSION: i32 = 1;
/// Serializable description of a table: identity, location, schemas, partition specs,
/// snapshots, snapshot history, properties and running metrics.
///
/// Field names are (de)serialized in kebab-case (e.g. `last-sequence-number`). Fields marked
/// `#[serde(default)]` may be absent from the serialized form and then take their `Default`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
pub struct TableMetadata {
/// Table identifier; the builder assigns a freshly generated v4 UUID.
pub table_uuid: Uuid,
/// Metadata format version; the builder writes `FORMAT_VERSION`.
pub format_version: i32,
/// Table root location (for example `s3://bucket/table`).
pub location: String,
/// Counter bumped by every mutation that calls `increment_sequence`; starts at 0.
pub last_sequence_number: i64,
/// UTC wall-clock time, in milliseconds since the Unix epoch, of the last sequence bump
/// (or of creation, for a freshly built table).
pub last_updated_ms: i64,
/// Highest field id recorded so far, taken from each added schema's `fields`.
pub last_column_id: i32,
/// `schema_id` of the entry in `schemas` that is currently active.
pub current_schema_id: i32,
/// Every schema the table has had.
#[serde(default)]
pub schemas: Vec<Schema>,
/// `spec_id` of the partition spec used by default.
#[serde(default)]
pub default_spec_id: i32,
/// Every partition spec the table has had; empty for an unpartitioned table.
#[serde(default)]
pub partition_specs: Vec<PartitionSpec>,
/// Id of the current snapshot; omitted from the output when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub current_snapshot_id: Option<i64>,
/// All snapshots ever added (rollback does not remove any).
#[serde(default)]
pub snapshots: Vec<Snapshot>,
/// Append-only history of which snapshot became current and when.
#[serde(default)]
pub snapshot_log: Vec<SnapshotLogEntry>,
/// Free-form string key/value table properties.
#[serde(default)]
pub properties: HashMap<String, String>,
/// Running totals maintained by `update_metrics`.
#[serde(default)]
pub metrics: TableMetrics,
}
/// Running totals over the table's data files, maintained by `TableMetadata::update_metrics`.
///
/// All counters start at zero (`Default`).
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
#[serde(rename_all = "kebab-case")]
pub struct TableMetrics {
/// Sum of `record_count` over added data files.
pub total_records: i64,
/// Number of data files added minus the number of paths reported as deleted.
pub total_files: i64,
/// Sum of `file_size_in_bytes` over added data files.
pub total_size_bytes: i64,
}
/// One entry in `TableMetadata::snapshot_log`, recording that a snapshot became current.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
pub struct SnapshotLogEntry {
/// Id of the snapshot that became current.
pub snapshot_id: i64,
/// UTC wall-clock time (ms since the Unix epoch) at which the entry was appended.
/// Taken from the clock, not from the snapshot itself.
pub timestamp_ms: i64,
/// The snapshot's own operation when it was added, or `Operation::Replace` for a rollback.
pub operation: Operation,
}
/// Assembles the initial [`TableMetadata`] for a newly created table.
///
/// Obtain one through [`TableMetadata::builder`] or [`TableMetadataBuilder::new`], chain the
/// `with_*` setters, then call [`TableMetadataBuilder::build`].
pub struct TableMetadataBuilder {
    /// Table root location, copied verbatim into the built metadata.
    location: String,
    /// Initial schema; it becomes both the only schema and the current schema.
    schema: Schema,
    /// Optional initial partition spec; `None` produces an empty spec list.
    partition_spec: Option<PartitionSpec>,
    /// Table properties; re-inserting a key overwrites the earlier value.
    properties: HashMap<String, String>,
}

impl TableMetadataBuilder {
    /// Starts a builder for a table rooted at `location` whose initial schema is `schema`.
    pub fn new(location: impl Into<String>, schema: Schema) -> Self {
        Self {
            location: location.into(),
            schema,
            partition_spec: None,
            properties: HashMap::new(),
        }
    }

    /// Sets the initial partition spec, replacing any spec set earlier.
    ///
    /// The spec's own `spec_id` is kept and becomes the table's default spec id.
    pub fn with_partition_spec(mut self, spec: PartitionSpec) -> Self {
        self.partition_spec = Some(spec);
        self
    }

    /// Adds (or overwrites) a single table property.
    pub fn with_property(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
        self.properties.insert(key.into(), value.into());
        self
    }

    /// Adds (or overwrites) every property yielded by `props`.
    pub fn with_properties(mut self, props: impl IntoIterator<Item = (String, String)>) -> Self {
        self.properties.extend(props);
        self
    }

    /// Produces the metadata for a brand-new table.
    ///
    /// The result has a fresh v4 UUID, sequence number 0, no snapshots, zeroed metrics, and
    /// `last_column_id` set to the highest field id in the schema (0 if it has no fields).
    pub fn build(self) -> TableMetadata {
        let last_column_id = self.schema.fields.iter().map(|f| f.id).max().unwrap_or(0);
        // The default spec id must name the spec actually stored. Hard-coding 0 made
        // `current_partition_spec()` return `None` for a spec whose id was not 0.
        let default_spec_id = self.partition_spec.as_ref().map_or(0, |spec| spec.spec_id);
        let partition_specs: Vec<PartitionSpec> = self.partition_spec.into_iter().collect();
        TableMetadata {
            table_uuid: Uuid::new_v4(),
            format_version: FORMAT_VERSION,
            location: self.location,
            last_sequence_number: 0,
            last_updated_ms: Utc::now().timestamp_millis(),
            last_column_id,
            current_schema_id: self.schema.schema_id,
            schemas: vec![self.schema],
            default_spec_id,
            partition_specs,
            current_snapshot_id: None,
            snapshots: Vec::new(),
            snapshot_log: Vec::new(),
            properties: self.properties,
            metrics: TableMetrics::default(),
        }
    }
}
impl TableMetadata {
    /// Returns a [`TableMetadataBuilder`] for a new table at `location` with `schema`.
    pub fn builder(location: impl Into<String>, schema: Schema) -> TableMetadataBuilder {
        TableMetadataBuilder::new(location, schema)
    }

    /// Returns the schema whose id equals `current_schema_id`.
    ///
    /// # Panics
    ///
    /// Panics if no entry in `schemas` carries `current_schema_id`, i.e. the metadata is
    /// internally inconsistent.
    pub fn current_schema(&self) -> &Schema {
        self.schema(self.current_schema_id)
            .expect("current schema must exist")
    }

    /// Returns the snapshot referenced by `current_snapshot_id`.
    ///
    /// Yields `None` both when no snapshot is current and when the id has no matching snapshot.
    pub fn current_snapshot(&self) -> Option<&Snapshot> {
        self.current_snapshot_id.and_then(|id| self.snapshot(id))
    }

    /// Looks up a snapshot by id, returning the first match in `snapshots`.
    pub fn snapshot(&self, snapshot_id: i64) -> Option<&Snapshot> {
        self.snapshots.iter().find(|s| s.snapshot_id == snapshot_id)
    }

    /// Returns the snapshot that was current at `timestamp_ms` (UTC, ms since epoch).
    ///
    /// Scans `snapshot_log` from newest to oldest and takes the first entry whose timestamp is
    /// `<= timestamp_ms`. This relies on the log having been appended in time order.
    pub fn snapshot_at(&self, timestamp_ms: i64) -> Option<&Snapshot> {
        let entry = self
            .snapshot_log
            .iter()
            .rev()
            .find(|e| e.timestamp_ms <= timestamp_ms)?;
        self.snapshot(entry.snapshot_id)
    }

    /// Looks up a schema by id, returning the first match in `schemas`.
    pub fn schema(&self, schema_id: i32) -> Option<&Schema> {
        self.schemas.iter().find(|s| s.schema_id == schema_id)
    }

    /// Returns the partition spec whose id equals `default_spec_id`.
    ///
    /// Returns `None` if there is no such spec, including when the table is unpartitioned.
    pub fn current_partition_spec(&self) -> Option<&PartitionSpec> {
        self.partition_specs
            .iter()
            .find(|s| s.spec_id == self.default_spec_id)
    }

    /// Registers `spec` as a new partition spec and makes it the default.
    ///
    /// The caller's `spec_id` is overwritten with the next free id: one past the current
    /// maximum, or 0 if there are no specs yet. Bumps the sequence number.
    pub fn add_partition_spec(&mut self, mut spec: PartitionSpec) {
        let next_id = self
            .partition_specs
            .iter()
            .map(|s| s.spec_id)
            .max()
            .unwrap_or(-1)
            + 1;
        spec.spec_id = next_id;
        self.default_spec_id = next_id;
        self.partition_specs.push(spec);
        self.increment_sequence();
    }

    /// Bumps `last_sequence_number` by one and stamps `last_updated_ms` with the current time.
    pub fn increment_sequence(&mut self) {
        self.last_sequence_number += 1;
        self.last_updated_ms = Utc::now().timestamp_millis();
    }

    /// Appends `snapshot`, makes it current, logs it, and bumps the sequence number.
    ///
    /// The log entry uses the current wall-clock time and the snapshot's own operation.
    /// Duplicate snapshot ids are not rejected.
    pub fn add_snapshot(&mut self, snapshot: Snapshot) {
        let timestamp_ms = Utc::now().timestamp_millis();
        let snapshot_id = snapshot.snapshot_id;
        let operation = snapshot.operation;
        self.snapshots.push(snapshot);
        self.current_snapshot_id = Some(snapshot_id);
        self.snapshot_log.push(SnapshotLogEntry {
            snapshot_id,
            timestamp_ms,
            operation,
        });
        self.increment_sequence();
    }

    /// Appends `schema` and raises `last_column_id` to its highest field id, if that is larger.
    ///
    /// The schema is not made current and the sequence number is not bumped; call
    /// [`TableMetadata::set_current_schema`] for that. Duplicate schema ids are not rejected.
    pub fn add_schema(&mut self, schema: Schema) {
        if let Some(max_id) = schema.fields.iter().map(|f| f.id).max() {
            self.last_column_id = self.last_column_id.max(max_id);
        }
        self.schemas.push(schema);
    }

    /// Returns one past the highest existing schema id (0 when there are no schemas).
    pub fn next_schema_id(&self) -> i32 {
        self.schemas.iter().map(|s| s.schema_id).max().unwrap_or(-1) + 1
    }

    /// Returns the id to use for the next new column (`last_column_id + 1`).
    pub fn next_column_id(&self) -> i32 {
        self.last_column_id + 1
    }

    /// Returns one past the highest existing snapshot id (1 when there are no snapshots).
    pub fn next_snapshot_id(&self) -> i64 {
        self.snapshots
            .iter()
            .map(|s| s.snapshot_id)
            .max()
            .unwrap_or(0)
            + 1
    }

    /// Makes the schema with `schema_id` current and bumps the sequence number.
    ///
    /// # Errors
    ///
    /// Returns [`MetadataError::SchemaNotFound`] if no such schema exists. The metadata is
    /// left unchanged in that case.
    pub fn set_current_schema(&mut self, schema_id: i32) -> Result<(), MetadataError> {
        if self.schema(schema_id).is_none() {
            return Err(MetadataError::SchemaNotFound(schema_id));
        }
        self.current_schema_id = schema_id;
        self.increment_sequence();
        Ok(())
    }

    /// Makes an existing snapshot current again, logs the switch as `Operation::Replace`,
    /// and bumps the sequence number. No snapshots are removed.
    ///
    /// # Errors
    ///
    /// Returns [`MetadataError::SnapshotNotFound`] if no such snapshot exists. The metadata is
    /// left unchanged in that case.
    pub fn rollback_to(&mut self, snapshot_id: i64) -> Result<(), MetadataError> {
        if self.snapshot(snapshot_id).is_none() {
            return Err(MetadataError::SnapshotNotFound(snapshot_id));
        }
        self.current_snapshot_id = Some(snapshot_id);
        self.snapshot_log.push(SnapshotLogEntry {
            snapshot_id,
            timestamp_ms: Utc::now().timestamp_millis(),
            operation: Operation::Replace,
        });
        self.increment_sequence();
        Ok(())
    }

    /// Folds a commit's file changes into the running [`TableMetrics`].
    ///
    /// Every added file contributes its record count, its size, and one to the file count.
    /// Deletions only reduce `total_files`: `deleted_paths` carries no record counts or sizes,
    /// so `total_records` and `total_size_bytes` are not decreased here.
    pub fn update_metrics(&mut self, added: &[DataFile], deleted_paths: &HashSet<String>) {
        for file in added {
            self.metrics.total_records += file.record_count;
            self.metrics.total_files += 1;
            self.metrics.total_size_bytes += file.file_size_in_bytes;
        }
        // Lossless: a collection's `len()` never exceeds `isize::MAX`, which fits in `i64`.
        let removed = deleted_paths.len() as i64;
        // A file count can never be negative. Clamp instead of drifting below zero when
        // `deleted_paths` names files that were never counted (e.g. a repeated delete).
        self.metrics.total_files = self.metrics.total_files.saturating_sub(removed).max(0);
    }
}
/// Errors returned by fallible `TableMetadata` operations.
#[derive(Debug, Clone, thiserror::Error)]
pub enum MetadataError {
/// No schema with the given id exists (returned by `set_current_schema`).
#[error("schema not found: {0}")]
SchemaNotFound(i32),
/// No snapshot with the given id exists (returned by `rollback_to`).
#[error("snapshot not found: {0}")]
SnapshotNotFound(i64),
/// Sequence-number mismatch between the expected and the observed metadata.
/// NOTE(review): not produced anywhere in this file; presumably raised by a commit path
/// elsewhere. Confirm.
#[error("conflict: expected sequence {expected}, found {actual}")]
ConflictError { expected: i64, actual: i64 },
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::schema::Type;

    /// Schema id 0 with two fields (ids 1 and 2).
    fn sample_schema() -> Schema {
        Schema::builder(0)
            .with_field(1, "id", Type::Long, true)
            .with_field(2, "data", Type::String, false)
            .build()
    }

    /// Schema id 1: the fields of `sample_schema` plus a third column with id 3.
    fn evolved_schema() -> Schema {
        Schema::builder(1)
            .with_field(1, "id", Type::Long, true)
            .with_field(2, "data", Type::String, false)
            .with_field(3, "extra", Type::String, false)
            .build()
    }

    #[test]
    fn test_builder_creates_valid_metadata() {
        let schema = sample_schema();
        let metadata = TableMetadata::builder("s3://bucket/table", schema)
            .with_property("owner", "test")
            .build();
        assert_eq!(metadata.format_version, FORMAT_VERSION);
        assert_eq!(metadata.location, "s3://bucket/table");
        assert_eq!(metadata.last_sequence_number, 0);
        assert_eq!(metadata.current_schema_id, 0);
        assert_eq!(metadata.current_schema().schema_id, 0);
        assert_eq!(metadata.last_column_id, 2);
        assert_eq!(metadata.properties.get("owner"), Some(&"test".to_string()));
        assert!(metadata.current_snapshot_id.is_none());
        assert!(metadata.snapshots.is_empty());
        assert!(metadata.current_partition_spec().is_none());
    }

    #[test]
    fn test_increment_sequence() {
        let schema = sample_schema();
        let mut metadata = TableMetadata::builder("s3://bucket/table", schema).build();
        let initial_seq = metadata.last_sequence_number;
        metadata.increment_sequence();
        assert_eq!(metadata.last_sequence_number, initial_seq + 1);
    }

    #[test]
    fn test_rollback_to_nonexistent_snapshot() {
        let schema = sample_schema();
        let mut metadata = TableMetadata::builder("s3://bucket/table", schema).build();
        let result = metadata.rollback_to(999);
        assert!(matches!(result, Err(MetadataError::SnapshotNotFound(999))));
        // A failed rollback must not touch the metadata.
        assert!(metadata.current_snapshot_id.is_none());
        assert!(metadata.snapshot_log.is_empty());
        assert_eq!(metadata.last_sequence_number, 0);
    }

    #[test]
    fn test_set_current_schema_unknown_id() {
        let mut metadata = TableMetadata::builder("s3://bucket/table", sample_schema()).build();
        let result = metadata.set_current_schema(42);
        assert!(matches!(result, Err(MetadataError::SchemaNotFound(42))));
        assert_eq!(metadata.current_schema_id, 0);
        assert_eq!(metadata.last_sequence_number, 0);
    }

    #[test]
    fn test_add_schema_tracks_columns_and_can_become_current() {
        let mut metadata = TableMetadata::builder("s3://bucket/table", sample_schema()).build();
        assert_eq!(metadata.next_schema_id(), 1);

        metadata.add_schema(evolved_schema());
        assert_eq!(metadata.last_column_id, 3);
        assert_eq!(metadata.next_column_id(), 4);
        assert_eq!(metadata.next_schema_id(), 2);
        // Adding a schema alone does not switch to it.
        assert_eq!(metadata.current_schema_id, 0);

        assert!(metadata.set_current_schema(1).is_ok());
        assert_eq!(metadata.current_schema().schema_id, 1);
        assert_eq!(metadata.last_sequence_number, 1);
    }

    #[test]
    fn test_snapshot_queries_on_empty_table() {
        let metadata = TableMetadata::builder("s3://bucket/table", sample_schema()).build();
        assert!(metadata.current_snapshot().is_none());
        assert!(metadata.snapshot(1).is_none());
        assert!(metadata.snapshot_at(i64::MAX).is_none());
        assert_eq!(metadata.next_snapshot_id(), 1);
    }
}