1use chrono::Utc;
21use serde::{Deserialize, Serialize};
22use std::collections::{HashMap, HashSet};
23use uuid::Uuid;
24
25use crate::DataFile;
26use crate::manifest::{Operation, Snapshot};
27use crate::partition::PartitionSpec;
28use crate::schema::Schema;
29
/// Metadata format version that [`TableMetadataBuilder::build`] writes into
/// [`TableMetadata::format_version`] for newly created tables.
pub const FORMAT_VERSION: i32 = 1;
33
34#[derive(Debug, Clone, Serialize, Deserialize)]
65#[serde(rename_all = "kebab-case")]
66pub struct TableMetadata {
67 pub table_uuid: Uuid,
69
70 pub format_version: i32,
73
74 pub location: String,
77
78 pub last_sequence_number: i64,
81
82 pub last_updated_ms: i64,
84
85 pub last_column_id: i32,
88
89 pub current_schema_id: i32,
91
92 #[serde(default)]
95 pub schemas: Vec<Schema>,
96
97 #[serde(default)]
99 pub default_spec_id: i32,
100
101 #[serde(default)]
103 pub partition_specs: Vec<PartitionSpec>,
104
105 #[serde(skip_serializing_if = "Option::is_none")]
107 pub current_snapshot_id: Option<i64>,
108
109 #[serde(default)]
111 pub snapshots: Vec<Snapshot>,
112
113 #[serde(default)]
115 pub snapshot_log: Vec<SnapshotLogEntry>,
116
117 #[serde(default)]
120 pub properties: HashMap<String, String>,
121
122 #[serde(default)]
124 pub metrics: TableMetrics,
125}
126
127#[derive(Debug, Clone, Serialize, Deserialize, Default)]
129#[serde(rename_all = "kebab-case")]
130pub struct TableMetrics {
131 pub total_records: i64,
132 pub total_files: i64,
133 pub total_size_bytes: i64,
134}
135
136#[derive(Debug, Clone, Serialize, Deserialize)]
141#[serde(rename_all = "kebab-case")]
142pub struct SnapshotLogEntry {
143 pub snapshot_id: i64,
145
146 pub timestamp_ms: i64,
148
149 pub operation: Operation,
151}
152
153pub struct TableMetadataBuilder {
157 location: String,
158 schema: Schema,
159 partition_spec: Option<PartitionSpec>,
160 properties: HashMap<String, String>,
161}
162
163impl TableMetadataBuilder {
164 pub fn new(location: impl Into<String>, schema: Schema) -> Self {
166 Self {
167 location: location.into(),
168 schema,
169 partition_spec: None,
170 properties: HashMap::new(),
171 }
172 }
173
174 pub fn with_partition_spec(mut self, spec: PartitionSpec) -> Self {
176 self.partition_spec = Some(spec);
177 self
178 }
179
180 pub fn with_property(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
182 self.properties.insert(key.into(), value.into());
183 self
184 }
185
186 pub fn with_properties(mut self, props: impl IntoIterator<Item = (String, String)>) -> Self {
188 self.properties.extend(props);
189 self
190 }
191
192 pub fn build(self) -> TableMetadata {
194 let now = Utc::now();
195 let last_column_id = self.schema.fields.iter().map(|f| f.id).max().unwrap_or(0);
196
197 let mut partition_specs = Vec::new();
198 if let Some(spec) = self.partition_spec {
199 partition_specs.push(spec);
200 }
201
202 TableMetadata {
203 table_uuid: Uuid::new_v4(),
204 format_version: FORMAT_VERSION,
205 location: self.location,
206 last_sequence_number: 0,
207 last_updated_ms: now.timestamp_millis(),
208 last_column_id,
209 current_schema_id: self.schema.schema_id,
210 schemas: vec![self.schema],
211 default_spec_id: 0,
212 partition_specs,
213 current_snapshot_id: None,
214 snapshots: Vec::new(),
215 snapshot_log: Vec::new(),
216 properties: self.properties,
217 metrics: TableMetrics::default(),
218 }
219 }
220}
221
222impl TableMetadata {
223 pub fn builder(location: impl Into<String>, schema: Schema) -> TableMetadataBuilder {
225 TableMetadataBuilder::new(location, schema)
226 }
227
228 pub fn current_schema(&self) -> &Schema {
235 self.schemas
236 .iter()
237 .find(|s| s.schema_id == self.current_schema_id)
238 .expect("current schema must exist")
239 }
240
241 pub fn current_snapshot(&self) -> Option<&Snapshot> {
243 self.current_snapshot_id
244 .and_then(|id| self.snapshots.iter().find(|s| s.snapshot_id == id))
245 }
246
247 pub fn snapshot(&self, snapshot_id: i64) -> Option<&Snapshot> {
249 self.snapshots.iter().find(|s| s.snapshot_id == snapshot_id)
250 }
251
252 pub fn snapshot_at(&self, timestamp_ms: i64) -> Option<&Snapshot> {
257 let entry = self
259 .snapshot_log
260 .iter()
261 .rev()
262 .find(|e| e.timestamp_ms <= timestamp_ms)?;
263
264 self.snapshot(entry.snapshot_id)
265 }
266
267 pub fn schema(&self, schema_id: i32) -> Option<&Schema> {
269 self.schemas.iter().find(|s| s.schema_id == schema_id)
270 }
271
272 pub fn current_partition_spec(&self) -> Option<&PartitionSpec> {
274 if self.partition_specs.is_empty() {
275 return None;
276 }
277 self.partition_specs
278 .iter()
279 .find(|s| s.spec_id == self.default_spec_id)
280 }
281
282 pub fn add_partition_spec(&mut self, mut spec: PartitionSpec) {
284 let next_id = self
286 .partition_specs
287 .iter()
288 .map(|s| s.spec_id)
289 .max()
290 .unwrap_or(-1)
291 + 1;
292 spec.spec_id = next_id;
293
294 self.default_spec_id = next_id;
295 self.partition_specs.push(spec);
296 self.increment_sequence();
297 }
298
299 pub fn increment_sequence(&mut self) {
303 self.last_sequence_number += 1;
304 self.last_updated_ms = Utc::now().timestamp_millis();
305 }
306
307 pub fn add_snapshot(&mut self, snapshot: Snapshot) {
311 let timestamp_ms = Utc::now().timestamp_millis();
312 let snapshot_id = snapshot.snapshot_id;
313 let operation = snapshot.operation;
314
315 self.snapshots.push(snapshot);
316 self.current_snapshot_id = Some(snapshot_id);
317 self.snapshot_log.push(SnapshotLogEntry {
318 snapshot_id,
319 timestamp_ms,
320 operation,
321 });
322 self.increment_sequence();
323 }
324
325 pub fn add_schema(&mut self, schema: Schema) {
329 if let Some(max_id) = schema.fields.iter().map(|f| f.id).max() {
331 self.last_column_id = self.last_column_id.max(max_id);
332 }
333 self.schemas.push(schema);
334 }
335
336 pub fn next_schema_id(&self) -> i32 {
338 self.schemas.iter().map(|s| s.schema_id).max().unwrap_or(-1) + 1
339 }
340
341 pub fn next_column_id(&self) -> i32 {
343 self.last_column_id + 1
344 }
345
346 pub fn next_snapshot_id(&self) -> i64 {
348 self.snapshots
349 .iter()
350 .map(|s| s.snapshot_id)
351 .max()
352 .unwrap_or(0)
353 + 1
354 }
355
356 pub fn set_current_schema(&mut self, schema_id: i32) -> Result<(), MetadataError> {
362 if self.schemas.iter().any(|s| s.schema_id == schema_id) {
363 self.current_schema_id = schema_id;
364 self.increment_sequence();
365 Ok(())
366 } else {
367 Err(MetadataError::SchemaNotFound(schema_id))
368 }
369 }
370
371 pub fn rollback_to(&mut self, snapshot_id: i64) -> Result<(), MetadataError> {
380 if self.snapshots.iter().any(|s| s.snapshot_id == snapshot_id) {
381 self.current_snapshot_id = Some(snapshot_id);
382 self.snapshot_log.push(SnapshotLogEntry {
383 snapshot_id,
384 timestamp_ms: Utc::now().timestamp_millis(),
385 operation: Operation::Replace,
386 });
387 self.increment_sequence();
388 Ok(())
389 } else {
390 Err(MetadataError::SnapshotNotFound(snapshot_id))
391 }
392 }
393
394 pub fn update_metrics(&mut self, added: &[DataFile], deleted_paths: &HashSet<String>) {
396 for file in added {
399 self.metrics.total_records += file.record_count;
400 self.metrics.total_files += 1;
401 self.metrics.total_size_bytes += file.file_size_in_bytes;
402 }
403
404 self.metrics.total_files -= deleted_paths.len() as i64;
409 }
410}
411
412#[derive(Debug, Clone, thiserror::Error)]
414pub enum MetadataError {
415 #[error("schema not found: {0}")]
417 SchemaNotFound(i32),
418
419 #[error("snapshot not found: {0}")]
421 SnapshotNotFound(i64),
422
423 #[error("conflict: expected sequence {expected}, found {actual}")]
425 ConflictError { expected: i64, actual: i64 },
426}
427
#[cfg(test)]
mod tests {
    use super::*;
    use crate::schema::Type;

    /// Two-field schema (`id`, `data`) with schema id 0, shared by the tests.
    fn sample_schema() -> Schema {
        Schema::builder(0)
            .with_field(1, "id", Type::Long, true)
            .with_field(2, "data", Type::String, false)
            .build()
    }

    /// Fresh metadata built from `sample_schema` with no spec or properties.
    fn sample_metadata() -> TableMetadata {
        TableMetadata::builder("s3://bucket/table", sample_schema()).build()
    }

    #[test]
    fn test_builder_creates_valid_metadata() {
        let metadata = TableMetadata::builder("s3://bucket/table", sample_schema())
            .with_property("owner", "test")
            .build();

        assert_eq!(metadata.format_version, FORMAT_VERSION);
        assert_eq!(metadata.location, "s3://bucket/table");
        assert_eq!(metadata.last_sequence_number, 0);
        assert_eq!(metadata.current_schema_id, 0);
        assert_eq!(metadata.properties.get("owner"), Some(&"test".to_string()));
        assert!(metadata.current_snapshot_id.is_none());
    }

    #[test]
    fn test_increment_sequence() {
        let mut metadata = sample_metadata();

        let before = metadata.last_sequence_number;
        metadata.increment_sequence();

        assert_eq!(metadata.last_sequence_number, before + 1);
    }

    #[test]
    fn test_rollback_to_nonexistent_snapshot() {
        let mut metadata = sample_metadata();

        let result = metadata.rollback_to(999);

        assert!(matches!(result, Err(MetadataError::SnapshotNotFound(999))));
        // A failed rollback must not touch the metadata.
        assert_eq!(metadata.last_sequence_number, 0);
        assert!(metadata.current_snapshot_id.is_none());
        assert!(metadata.snapshot_log.is_empty());
    }

    #[test]
    fn test_set_current_schema_rejects_unknown_id() {
        let mut metadata = sample_metadata();

        let result = metadata.set_current_schema(42);

        assert!(matches!(result, Err(MetadataError::SchemaNotFound(42))));
        assert_eq!(metadata.current_schema_id, 0);
        assert_eq!(metadata.last_sequence_number, 0);
    }

    #[test]
    fn test_set_current_schema_accepts_known_id() {
        let mut metadata = sample_metadata();

        assert!(metadata.set_current_schema(0).is_ok());

        assert_eq!(metadata.current_schema_id, 0);
        assert_eq!(metadata.current_schema().schema_id, 0);
        assert_eq!(metadata.last_sequence_number, 1);
    }

    #[test]
    fn test_fresh_metadata_lookups_and_id_allocation() {
        let metadata = sample_metadata();

        assert!(metadata.current_snapshot().is_none());
        assert!(metadata.snapshot(1).is_none());
        assert!(metadata.snapshot_at(i64::MAX).is_none());
        assert!(metadata.current_partition_spec().is_none());
        assert!(metadata.schema(0).is_some());
        assert!(metadata.schema(7).is_none());

        assert_eq!(metadata.next_snapshot_id(), 1);
        assert_eq!(metadata.next_schema_id(), 1);
        assert_eq!(metadata.next_column_id(), metadata.last_column_id + 1);
    }
}