Skip to main content

supertable_core/
metadata.rs

1//! # SuperTable Metadata
2//!
3//! This module defines the core metadata structures for SuperTable, a next-generation
4//! open table format. The design is inspired by Apache Iceberg's metadata specification
5//! while adding modern enhancements for better concurrency control and auditability.
6//!
7//! ## Key Concepts
8//!
9//! - **TableMetadata**: The root metadata object containing all table state
10//! - **Snapshot**: An immutable point-in-time view of a table
11//! - **Schema**: The table's column definitions with evolution support
12//! - **PartitionSpec**: How the table is partitioned (planned)
13//!
14//! ## Concurrency Model
15//!
16//! SuperTable uses optimistic concurrency control (OCC) with compare-and-swap (CAS)
17//! semantics. Each metadata update increments the `last_sequence_number`, enabling
18//! conflict detection during concurrent writes.
19
20use chrono::Utc;
21use serde::{Deserialize, Serialize};
22use std::collections::{HashMap, HashSet};
23use uuid::Uuid;
24
25use crate::DataFile;
26use crate::manifest::{Operation, Snapshot};
27use crate::partition::PartitionSpec;
28use crate::schema::Schema;
29
/// The format version of SuperTable metadata.
///
/// [`TableMetadataBuilder::build`] stamps this value into
/// [`TableMetadata::format_version`] for every newly created table.
/// Increment this when making breaking changes to the metadata format.
pub const FORMAT_VERSION: i32 = 1;
33
/// The root metadata object for a SuperTable table.
///
/// This struct contains all information needed to understand the current state
/// of a table, including its schema history, partition specs, snapshots, and
/// properties.
///
/// # Serialization
///
/// Field names are serialized in kebab-case (for example `table-uuid` and
/// `last-sequence-number`). Fields marked `#[serde(default)]` may be absent
/// from the input and then fall back to their type's default value.
///
/// # Concurrency
///
/// The `last_sequence_number` field is used for optimistic concurrency control.
/// Each operation that modifies the table increments this counter, allowing
/// the catalog to detect and reject conflicting concurrent updates.
///
/// # Example
///
/// ```rust,ignore
/// use supercore::metadata::TableMetadata;
/// use supercore::schema::{Schema, Field, Type};
///
/// let schema = Schema {
///     schema_id: 0,
///     fields: vec![
///         Field { id: 1, name: "id".into(), required: true, field_type: Type::Long },
///         Field { id: 2, name: "data".into(), required: false, field_type: Type::String },
///     ],
/// };
///
/// let metadata = TableMetadata::builder("s3://bucket/my_table", schema)
///     .with_property("write.format.default", "parquet")
///     .build();
/// ```
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
pub struct TableMetadata {
    /// The unique identifier for this table (UUID v4).
    /// Generated once when the table metadata is built.
    pub table_uuid: Uuid,

    /// The format version of this metadata file.
    /// Used for forward/backward compatibility checks.
    pub format_version: i32,

    /// The base location of the table (e.g., `s3://bucket/table`).
    /// All data and metadata files are stored relative to this location.
    pub location: String,

    /// Monotonically increasing sequence number for optimistic concurrency.
    /// Starts at 0 for a freshly built table and is incremented by
    /// [`TableMetadata::increment_sequence`].
    pub last_sequence_number: i64,

    /// Timestamp of the last update in milliseconds since epoch (UTC).
    /// Refreshed together with `last_sequence_number`.
    pub last_updated_ms: i64,

    /// The highest assigned column ID across all schemas.
    /// Used to ensure new columns get unique IDs during schema evolution.
    pub last_column_id: i32,

    /// The ID of the current (active) schema.
    /// Must match the `schema_id` of an entry in `schemas`.
    pub current_schema_id: i32,

    /// List of all schemas, forming the schema history.
    /// Schemas are immutable once added.
    #[serde(default)]
    pub schemas: Vec<Schema>,

    /// The ID of the default partition spec.
    /// Falls back to 0 when absent from the serialized metadata.
    #[serde(default)]
    pub default_spec_id: i32,

    /// List of partition specs.
    /// Empty when the table was created without a partition spec.
    #[serde(default)]
    pub partition_specs: Vec<PartitionSpec>,

    /// The ID of the current snapshot, or `None` if the table is empty.
    /// Omitted from the serialized output while it is `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub current_snapshot_id: Option<i64>,

    /// List of all snapshots, forming the table's history.
    #[serde(default)]
    pub snapshots: Vec<Snapshot>,

    /// Log of snapshot changes for auditing and time travel.
    /// Entries are appended in the order the changes were applied.
    #[serde(default)]
    pub snapshot_log: Vec<SnapshotLogEntry>,

    /// User-defined table properties.
    /// Common properties include write format, compression, etc.
    #[serde(default)]
    pub properties: HashMap<String, String>,

    /// Aggregate table metrics.
    /// All counters are zero when absent from the serialized metadata.
    #[serde(default)]
    pub metrics: TableMetrics,
}
126
127/// Aggregate metrics for a table.
128#[derive(Debug, Clone, Serialize, Deserialize, Default)]
129#[serde(rename_all = "kebab-case")]
130pub struct TableMetrics {
131    pub total_records: i64,
132    pub total_files: i64,
133    pub total_size_bytes: i64,
134}
135
/// A log entry recording a snapshot change.
///
/// This provides an audit trail of all operations that modified the table,
/// enabling debugging, compliance, and time-travel queries.
///
/// Entries are appended both when a new snapshot is added
/// ([`TableMetadata::add_snapshot`]) and when an existing snapshot is made
/// current again ([`TableMetadata::rollback_to`]), so the same snapshot ID may
/// appear more than once in the log.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
pub struct SnapshotLogEntry {
    /// The ID of the snapshot that became current.
    pub snapshot_id: i64,

    /// Timestamp when this snapshot became current (ms since epoch).
    pub timestamp_ms: i64,

    /// The operation that created this snapshot change.
    /// Rollbacks are recorded as `Operation::Replace`.
    pub operation: Operation,
}
152
153/// Builder for constructing `TableMetadata` instances.
154///
155/// Provides a fluent API for creating new tables with sensible defaults.
156pub struct TableMetadataBuilder {
157    location: String,
158    schema: Schema,
159    partition_spec: Option<PartitionSpec>,
160    properties: HashMap<String, String>,
161}
162
163impl TableMetadataBuilder {
164    /// Creates a new builder with the required location and schema.
165    pub fn new(location: impl Into<String>, schema: Schema) -> Self {
166        Self {
167            location: location.into(),
168            schema,
169            partition_spec: None,
170            properties: HashMap::new(),
171        }
172    }
173
174    /// Sets the partition spec.
175    pub fn with_partition_spec(mut self, spec: PartitionSpec) -> Self {
176        self.partition_spec = Some(spec);
177        self
178    }
179
180    /// Adds a property to the table.
181    pub fn with_property(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
182        self.properties.insert(key.into(), value.into());
183        self
184    }
185
186    /// Adds multiple properties to the table.
187    pub fn with_properties(mut self, props: impl IntoIterator<Item = (String, String)>) -> Self {
188        self.properties.extend(props);
189        self
190    }
191
192    /// Builds the `TableMetadata` instance.
193    pub fn build(self) -> TableMetadata {
194        let now = Utc::now();
195        let last_column_id = self.schema.fields.iter().map(|f| f.id).max().unwrap_or(0);
196
197        let mut partition_specs = Vec::new();
198        if let Some(spec) = self.partition_spec {
199            partition_specs.push(spec);
200        }
201
202        TableMetadata {
203            table_uuid: Uuid::new_v4(),
204            format_version: FORMAT_VERSION,
205            location: self.location,
206            last_sequence_number: 0,
207            last_updated_ms: now.timestamp_millis(),
208            last_column_id,
209            current_schema_id: self.schema.schema_id,
210            schemas: vec![self.schema],
211            default_spec_id: 0,
212            partition_specs,
213            current_snapshot_id: None,
214            snapshots: Vec::new(),
215            snapshot_log: Vec::new(),
216            properties: self.properties,
217            metrics: TableMetrics::default(),
218        }
219    }
220}
221
222impl TableMetadata {
223    /// Creates a new `TableMetadataBuilder`.
224    pub fn builder(location: impl Into<String>, schema: Schema) -> TableMetadataBuilder {
225        TableMetadataBuilder::new(location, schema)
226    }
227
228    /// Returns the current schema.
229    ///
230    /// # Panics
231    ///
232    /// Panics if the schema list is empty or the current schema ID is invalid.
233    /// This should never happen with properly constructed metadata.
234    pub fn current_schema(&self) -> &Schema {
235        self.schemas
236            .iter()
237            .find(|s| s.schema_id == self.current_schema_id)
238            .expect("current schema must exist")
239    }
240
241    /// Returns the current snapshot, if any.
242    pub fn current_snapshot(&self) -> Option<&Snapshot> {
243        self.current_snapshot_id
244            .and_then(|id| self.snapshots.iter().find(|s| s.snapshot_id == id))
245    }
246
247    /// Returns a snapshot by its ID.
248    pub fn snapshot(&self, snapshot_id: i64) -> Option<&Snapshot> {
249        self.snapshots.iter().find(|s| s.snapshot_id == snapshot_id)
250    }
251
252    /// Returns the snapshot that was current at the given timestamp.
253    ///
254    /// This enables time-travel queries by finding the most recent snapshot
255    /// that was committed before the specified timestamp.
256    pub fn snapshot_at(&self, timestamp_ms: i64) -> Option<&Snapshot> {
257        // Find the most recent snapshot log entry at or before the timestamp
258        let entry = self
259            .snapshot_log
260            .iter()
261            .rev()
262            .find(|e| e.timestamp_ms <= timestamp_ms)?;
263
264        self.snapshot(entry.snapshot_id)
265    }
266
267    /// Returns the schema with the given ID.
268    pub fn schema(&self, schema_id: i32) -> Option<&Schema> {
269        self.schemas.iter().find(|s| s.schema_id == schema_id)
270    }
271
272    /// Returns the current partition spec.
273    pub fn current_partition_spec(&self) -> Option<&PartitionSpec> {
274        if self.partition_specs.is_empty() {
275            return None;
276        }
277        self.partition_specs
278            .iter()
279            .find(|s| s.spec_id == self.default_spec_id)
280    }
281
282    /// Adds a new partition spec and sets it as the default.
283    pub fn add_partition_spec(&mut self, mut spec: PartitionSpec) {
284        // Find next spec ID
285        let next_id = self
286            .partition_specs
287            .iter()
288            .map(|s| s.spec_id)
289            .max()
290            .unwrap_or(-1)
291            + 1;
292        spec.spec_id = next_id;
293
294        self.default_spec_id = next_id;
295        self.partition_specs.push(spec);
296        self.increment_sequence();
297    }
298
299    /// Increments the sequence number and updates the timestamp.
300    ///
301    /// This should be called before committing any metadata update.
302    pub fn increment_sequence(&mut self) {
303        self.last_sequence_number += 1;
304        self.last_updated_ms = Utc::now().timestamp_millis();
305    }
306
307    /// Adds a new snapshot to the table and makes it current.
308    ///
309    /// This also creates a snapshot log entry for auditing.
310    pub fn add_snapshot(&mut self, snapshot: Snapshot) {
311        let timestamp_ms = Utc::now().timestamp_millis();
312        let snapshot_id = snapshot.snapshot_id;
313        let operation = snapshot.operation;
314
315        self.snapshots.push(snapshot);
316        self.current_snapshot_id = Some(snapshot_id);
317        self.snapshot_log.push(SnapshotLogEntry {
318            snapshot_id,
319            timestamp_ms,
320            operation,
321        });
322        self.increment_sequence();
323    }
324
325    /// Adds a new schema to the table.
326    ///
327    /// The schema ID must be unique. Use `next_schema_id()` to generate one.
328    pub fn add_schema(&mut self, schema: Schema) {
329        // Update last_column_id if needed
330        if let Some(max_id) = schema.fields.iter().map(|f| f.id).max() {
331            self.last_column_id = self.last_column_id.max(max_id);
332        }
333        self.schemas.push(schema);
334    }
335
336    /// Returns the next available schema ID.
337    pub fn next_schema_id(&self) -> i32 {
338        self.schemas.iter().map(|s| s.schema_id).max().unwrap_or(-1) + 1
339    }
340
341    /// Returns the next available column ID.
342    pub fn next_column_id(&self) -> i32 {
343        self.last_column_id + 1
344    }
345
346    /// Returns the next available snapshot ID.
347    pub fn next_snapshot_id(&self) -> i64 {
348        self.snapshots
349            .iter()
350            .map(|s| s.snapshot_id)
351            .max()
352            .unwrap_or(0)
353            + 1
354    }
355
356    /// Sets the current schema by ID.
357    ///
358    /// # Errors
359    ///
360    /// Returns an error if the schema ID doesn't exist.
361    pub fn set_current_schema(&mut self, schema_id: i32) -> Result<(), MetadataError> {
362        if self.schemas.iter().any(|s| s.schema_id == schema_id) {
363            self.current_schema_id = schema_id;
364            self.increment_sequence();
365            Ok(())
366        } else {
367            Err(MetadataError::SchemaNotFound(schema_id))
368        }
369    }
370
371    /// Rolls back the table to a previous snapshot.
372    ///
373    /// This creates a new snapshot log entry pointing to the old snapshot,
374    /// effectively making it current again without removing history.
375    ///
376    /// # Errors
377    ///
378    /// Returns an error if the snapshot ID doesn't exist.
379    pub fn rollback_to(&mut self, snapshot_id: i64) -> Result<(), MetadataError> {
380        if self.snapshots.iter().any(|s| s.snapshot_id == snapshot_id) {
381            self.current_snapshot_id = Some(snapshot_id);
382            self.snapshot_log.push(SnapshotLogEntry {
383                snapshot_id,
384                timestamp_ms: Utc::now().timestamp_millis(),
385                operation: Operation::Replace,
386            });
387            self.increment_sequence();
388            Ok(())
389        } else {
390            Err(MetadataError::SnapshotNotFound(snapshot_id))
391        }
392    }
393
394    /// Updates the aggregate metrics based on added and deleted files.
395    pub fn update_metrics(&mut self, added: &[DataFile], deleted_paths: &HashSet<String>) {
396        // This is a simplified incremental update.
397        // In a real system, we might want to re-scan periodically to ensure accuracy.
398        for file in added {
399            self.metrics.total_records += file.record_count;
400            self.metrics.total_files += 1;
401            self.metrics.total_size_bytes += file.file_size_in_bytes;
402        }
403
404        // Deletion metrics are harder because we'd need to know the size/count of the deleted files.
405        // For this prototype, we'll assume the caller doesn't need perfect deletion tracking
406        // or we'd need to look up the old file metadata.
407        // Let's just decrement file count at minimum.
408        self.metrics.total_files -= deleted_paths.len() as i64;
409    }
410}
411
412/// Errors that can occur during metadata operations.
413#[derive(Debug, Clone, thiserror::Error)]
414pub enum MetadataError {
415    /// The requested schema was not found.
416    #[error("schema not found: {0}")]
417    SchemaNotFound(i32),
418
419    /// The requested snapshot was not found.
420    #[error("snapshot not found: {0}")]
421    SnapshotNotFound(i64),
422
423    /// A conflict occurred during a concurrent update.
424    #[error("conflict: expected sequence {expected}, found {actual}")]
425    ConflictError { expected: i64, actual: i64 },
426}
427
#[cfg(test)]
mod tests {
    use super::*;
    use crate::schema::Type;

    /// Table location shared by every test in this module.
    const LOCATION: &str = "s3://bucket/table";

    /// Builds the two-field schema (`id`, `data`) with schema ID 0 used by the tests.
    fn sample_schema() -> Schema {
        Schema::builder(0)
            .with_field(1, "id", Type::Long, true)
            .with_field(2, "data", Type::String, false)
            .build()
    }

    /// Builds metadata for the sample schema with no properties set.
    fn bare_metadata() -> TableMetadata {
        TableMetadata::builder(LOCATION, sample_schema()).build()
    }

    /// A freshly built table carries the builder inputs and pristine counters.
    #[test]
    fn test_builder_creates_valid_metadata() {
        let built = TableMetadata::builder(LOCATION, sample_schema())
            .with_property("owner", "test")
            .build();

        assert_eq!(built.format_version, FORMAT_VERSION);
        assert_eq!(built.location, LOCATION);
        assert_eq!(built.last_sequence_number, 0);
        assert_eq!(built.current_schema_id, 0);
        assert_eq!(
            built.properties.get("owner").map(String::as_str),
            Some("test")
        );
        assert!(built.current_snapshot_id.is_none());
    }

    /// Each call to `increment_sequence` advances the counter by exactly one.
    #[test]
    fn test_increment_sequence() {
        let mut table = bare_metadata();
        let before = table.last_sequence_number;

        table.increment_sequence();

        assert_eq!(table.last_sequence_number, before + 1);
    }

    /// Rolling back to an unknown snapshot reports that snapshot's ID.
    #[test]
    fn test_rollback_to_nonexistent_snapshot() {
        let mut table = bare_metadata();

        let outcome = table.rollback_to(999);

        assert!(matches!(
            outcome,
            Err(MetadataError::SnapshotNotFound(999))
        ));
    }
}