Skip to main content

supertable_core/
manifest.rs

//! # SuperTable Manifest and Snapshot Management
//!
//! This module defines the structures for tracking table state and file inventory.
//! The design follows Apache Iceberg's snapshot-based model, where each snapshot
//! represents an immutable, point-in-time view of the table.
//!
//! ## Key Concepts
//!
//! - **Snapshot**: A complete view of the table at a point in time
//! - **Manifest List**: A file listing all manifest files for a snapshot
//! - **Manifest File**: A file listing data or delete files with their metadata
//! - **Data File**: An actual file containing table data (currently Parquet)
//!
//! ## Immutability
//!
//! Once committed, snapshots are immutable. Updates create new snapshots that
//! reference both new and existing data files. This enables:
//! - Time travel (querying historical states)
//! - Atomic commits (all-or-nothing updates)
//! - Concurrent reads (readers see consistent snapshots)
21
22use serde::{Deserialize, Serialize};
23use std::collections::HashMap;
24
25/// The type of operation that created a snapshot.
26#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
27#[serde(rename_all = "lowercase")]
28pub enum Operation {
29    /// New data was appended to the table.
30    Append,
31    /// Existing data was replaced (partition overwrite).
32    Overwrite,
33    /// Some data was deleted.
34    Delete,
35    /// An existing snapshot was restored.
36    Replace,
37}
38
39impl std::fmt::Display for Operation {
40    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
41        match self {
42            Operation::Append => write!(f, "append"),
43            Operation::Overwrite => write!(f, "overwrite"),
44            Operation::Delete => write!(f, "delete"),
45            Operation::Replace => write!(f, "replace"),
46        }
47    }
48}
49
50/// A snapshot represents an immutable view of the table at a point in time.
51///
52/// Each snapshot contains a reference to a manifest list, which in turn
53/// references all the data files that make up the table at that point.
54///
55/// # Lineage
56///
57/// Snapshots form a tree structure through parent references, allowing
58/// tracking of table lineage and enabling features like rollback.
59#[derive(Debug, Clone, Serialize, Deserialize)]
60#[serde(rename_all = "kebab-case")]
61pub struct Snapshot {
62    /// Unique identifier for this snapshot.
63    pub snapshot_id: i64,
64
65    /// The ID of the parent snapshot, if any.
66    /// The first snapshot has no parent (None).
67    #[serde(skip_serializing_if = "Option::is_none")]
68    pub parent_snapshot_id: Option<i64>,
69
70    /// Monotonically increasing sequence number.
71    /// Used to order operations across snapshots.
72    pub sequence_number: i64,
73
74    /// Timestamp when this snapshot was created (ms since epoch).
75    pub timestamp_ms: i64,
76
77    /// The operation that created this snapshot.
78    pub operation: Operation,
79
80    /// Path to the manifest list file for this snapshot.
81    pub manifest_list: String,
82
83    /// Summary statistics and operation metadata.
84    #[serde(default)]
85    pub summary: HashMap<String, String>,
86
87    /// The schema ID used for this snapshot's data.
88    pub schema_id: i32,
89}
90
91impl Snapshot {
92    /// Creates a new snapshot builder.
93    pub fn builder(snapshot_id: i64, manifest_list: impl Into<String>) -> SnapshotBuilder {
94        SnapshotBuilder::new(snapshot_id, manifest_list)
95    }
96
97    /// Returns the value of a summary property.
98    pub fn summary_property(&self, key: &str) -> Option<&str> {
99        self.summary.get(key).map(|s| s.as_str())
100    }
101
102    /// Returns the added files count from the summary.
103    pub fn added_files_count(&self) -> Option<i64> {
104        self.summary_property("added-data-files")
105            .and_then(|s| s.parse().ok())
106    }
107
108    /// Returns the added records count from the summary.
109    pub fn added_records_count(&self) -> Option<i64> {
110        self.summary_property("added-records")
111            .and_then(|s| s.parse().ok())
112    }
113
114    /// Returns the total files count from the summary.
115    pub fn total_files_count(&self) -> Option<i64> {
116        self.summary_property("total-data-files")
117            .and_then(|s| s.parse().ok())
118    }
119
120    /// Returns the total records count from the summary.
121    pub fn total_records_count(&self) -> Option<i64> {
122        self.summary_property("total-records")
123            .and_then(|s| s.parse().ok())
124    }
125
126    /// Loads all data and delete files for this snapshot.
127    ///
128    /// Returns a tuple of (data_files, delete_files).
129    pub async fn all_files(
130        &self,
131        storage: &crate::storage::Storage,
132    ) -> anyhow::Result<(Vec<DataFile>, Vec<DataFile>)> {
133        let manifest_list = ManifestList::load(&self.manifest_list, storage).await?;
134        let mut data_files = Vec::new();
135        let mut delete_files = Vec::new();
136
137        for entry in manifest_list.entries {
138            let manifest = ManifestFile::load(&entry.manifest_path, storage).await?;
139            for m_entry in manifest.entries {
140                if m_entry.status != ManifestEntryStatus::Deleted {
141                    match m_entry.data_file.content {
142                        FileContent::Data => data_files.push(m_entry.data_file),
143                        FileContent::PositionDeletes | FileContent::EqualityDeletes => {
144                            delete_files.push(m_entry.data_file)
145                        }
146                    }
147                }
148            }
149        }
150
151        Ok((data_files, delete_files))
152    }
153
154    /// Loads all data files for this snapshot.
155    pub async fn all_data_files(
156        &self,
157        storage: &crate::storage::Storage,
158    ) -> anyhow::Result<Vec<DataFile>> {
159        let (data_files, _) = self.all_files(storage).await?;
160        Ok(data_files)
161    }
162}
163
164/// Builder for constructing `Snapshot` instances.
165pub struct SnapshotBuilder {
166    snapshot_id: i64,
167    manifest_list: String,
168    parent_snapshot_id: Option<i64>,
169    sequence_number: i64,
170    timestamp_ms: i64,
171    operation: Operation,
172    summary: HashMap<String, String>,
173    schema_id: i32,
174}
175
176impl SnapshotBuilder {
177    /// Creates a new builder with required fields.
178    pub fn new(snapshot_id: i64, manifest_list: impl Into<String>) -> Self {
179        Self {
180            snapshot_id,
181            manifest_list: manifest_list.into(),
182            parent_snapshot_id: None,
183            sequence_number: 0,
184            timestamp_ms: chrono::Utc::now().timestamp_millis(),
185            operation: Operation::Append,
186            summary: HashMap::new(),
187            schema_id: 0,
188        }
189    }
190
191    /// Sets the parent snapshot ID.
192    pub fn with_parent(mut self, parent_id: i64) -> Self {
193        self.parent_snapshot_id = Some(parent_id);
194        self
195    }
196
197    /// Sets the sequence number.
198    pub fn with_sequence_number(mut self, seq: i64) -> Self {
199        self.sequence_number = seq;
200        self
201    }
202
203    /// Sets the operation type.
204    pub fn with_operation(mut self, op: Operation) -> Self {
205        self.operation = op;
206        self
207    }
208
209    /// Sets the schema ID.
210    pub fn with_schema_id(mut self, schema_id: i32) -> Self {
211        self.schema_id = schema_id;
212        self
213    }
214
215    /// Adds a summary property.
216    pub fn with_summary_property(
217        mut self,
218        key: impl Into<String>,
219        value: impl Into<String>,
220    ) -> Self {
221        self.summary.insert(key.into(), value.into());
222        self
223    }
224
225    /// Builds the snapshot.
226    pub fn build(self) -> Snapshot {
227        Snapshot {
228            snapshot_id: self.snapshot_id,
229            parent_snapshot_id: self.parent_snapshot_id,
230            sequence_number: self.sequence_number,
231            timestamp_ms: self.timestamp_ms,
232            operation: self.operation,
233            manifest_list: self.manifest_list,
234            summary: self.summary,
235            schema_id: self.schema_id,
236        }
237    }
238}
239
240/// A manifest list contains references to all manifest files for a snapshot.
241#[derive(Debug, Clone, Serialize, Deserialize)]
242#[serde(rename_all = "kebab-case")]
243pub struct ManifestList {
244    /// The list of manifest file entries.
245    pub entries: Vec<ManifestFileEntry>,
246}
247
248impl ManifestList {
249    /// Creates a new empty manifest list.
250    pub fn new() -> Self {
251        Self {
252            entries: Vec::new(),
253        }
254    }
255
256    /// Adds a manifest file entry.
257    pub fn add_entry(&mut self, entry: ManifestFileEntry) {
258        self.entries.push(entry);
259    }
260
261    /// Returns the total number of data files across all manifests.
262    pub fn total_data_files(&self) -> i64 {
263        self.entries
264            .iter()
265            .map(|e| e.added_files_count + e.existing_files_count)
266            .sum()
267    }
268
269    /// Loads a manifest list from storage.
270    pub async fn load(path: &str, storage: &crate::storage::Storage) -> anyhow::Result<Self> {
271        let data = storage.read(path).await?;
272        Ok(serde_json::from_slice(&data)?)
273    }
274
275    /// Saves a manifest list to storage.
276    pub async fn save(&self, path: &str, storage: &crate::storage::Storage) -> anyhow::Result<()> {
277        let data = serde_json::to_vec(self)?;
278        storage.write(path, data.into()).await?;
279        Ok(())
280    }
281}
282
283impl Default for ManifestList {
284    fn default() -> Self {
285        Self::new()
286    }
287}
288
289/// An entry in a manifest list, referencing a single manifest file.
290#[derive(Debug, Clone, Serialize, Deserialize)]
291#[serde(rename_all = "kebab-case")]
292pub struct ManifestFileEntry {
293    /// Path to the manifest file.
294    pub manifest_path: String,
295
296    /// Length of the manifest file in bytes.
297    pub manifest_length: i64,
298
299    /// The partition spec ID used to write the manifest.
300    pub partition_spec_id: i32,
301
302    /// The content type of files in this manifest.
303    pub content: ManifestContent,
304
305    /// Sequence number when this manifest was added.
306    pub sequence_number: i64,
307
308    /// Minimum sequence number of any file in this manifest.
309    pub min_sequence_number: i64,
310
311    /// The snapshot ID that added this manifest.
312    pub added_snapshot_id: i64,
313
314    /// Number of files added in this manifest.
315    pub added_files_count: i64,
316
317    /// Number of existing (unchanged) files in this manifest.
318    pub existing_files_count: i64,
319
320    /// Number of files deleted in this manifest.
321    pub deleted_files_count: i64,
322
323    /// Number of rows added across all files in this manifest.
324    pub added_rows_count: i64,
325
326    /// Number of existing rows in this manifest.
327    pub existing_rows_count: i64,
328
329    /// Number of rows deleted in this manifest.
330    pub deleted_rows_count: i64,
331}
332
333/// The type of content in a manifest file.
334#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
335#[serde(rename_all = "lowercase")]
336pub enum ManifestContent {
337    /// Data files containing table records.
338    Data,
339    /// Delete files containing deleted record identifiers.
340    Deletes,
341}
342
343/// A manifest file contains a list of data files.
344#[derive(Debug, Clone, Serialize, Deserialize)]
345#[serde(rename_all = "kebab-case")]
346pub struct ManifestFile {
347    /// The schema used to write this manifest.
348    pub schema: crate::schema::Schema,
349
350    /// The partition spec ID.
351    pub partition_spec_id: i32,
352
353    /// The content type.
354    pub content: ManifestContent,
355
356    /// The entries in this manifest.
357    pub entries: Vec<ManifestEntry>,
358}
359
360impl ManifestFile {
361    /// Loads a manifest file from storage.
362    pub async fn load(path: &str, storage: &crate::storage::Storage) -> anyhow::Result<Self> {
363        let data = storage.read(path).await?;
364        Ok(serde_json::from_slice(&data)?)
365    }
366
367    /// Saves a manifest file to storage.
368    pub async fn save(&self, path: &str, storage: &crate::storage::Storage) -> anyhow::Result<()> {
369        let data = serde_json::to_vec(self)?;
370        storage.write(path, data.into()).await?;
371        Ok(())
372    }
373}
374
375/// An entry in a manifest file, representing a single data file.
376#[derive(Debug, Clone, Serialize, Deserialize)]
377#[serde(rename_all = "kebab-case")]
378pub struct ManifestEntry {
379    /// The status of this entry.
380    pub status: ManifestEntryStatus,
381
382    /// The snapshot ID that added this file.
383    #[serde(skip_serializing_if = "Option::is_none")]
384    pub snapshot_id: Option<i64>,
385
386    /// The sequence number when this file was added.
387    #[serde(skip_serializing_if = "Option::is_none")]
388    pub sequence_number: Option<i64>,
389
390    /// The data file metadata.
391    pub data_file: DataFile,
392}
393
394/// The status of a manifest entry.
395#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
396#[serde(rename_all = "lowercase")]
397pub enum ManifestEntryStatus {
398    /// The file exists in this snapshot.
399    Existing,
400    /// The file was added in this snapshot.
401    Added,
402    /// The file was deleted in this snapshot.
403    Deleted,
404}
405
406/// Supported data file formats.
407#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
408#[serde(rename_all = "lowercase")]
409pub enum FileFormat {
410    /// Apache Parquet format.
411    Parquet,
412    /// Apache ORC format (planned).
413    Orc,
414    /// Apache Avro format (planned).
415    Avro,
416}
417
418/// Type of content stored in a file.
419#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
420#[serde(rename_all = "kebab-case")]
421pub enum FileContent {
422    /// Data files containing table records.
423    Data,
424    /// Position delete files containing file paths and row positions.
425    PositionDeletes,
426    /// Equality delete files containing column values for deleted rows.
427    EqualityDeletes,
428}
429
430/// Metadata about a data or delete file.
431#[derive(Debug, Clone, Serialize, Deserialize)]
432#[serde(rename_all = "kebab-case")]
433pub struct DataFile {
434    /// Path to the file.
435    pub file_path: String,
436
437    /// The file format.
438    pub file_format: FileFormat,
439
440    /// The type of content in this file.
441    pub content: FileContent,
442
443    /// The partition data for this file.
444    #[serde(default)]
445    pub partition: HashMap<String, serde_json::Value>,
446
447    /// Number of records in this file.
448    pub record_count: i64,
449
450    /// File size in bytes.
451    pub file_size_in_bytes: i64,
452
453    /// Column sizes (column_id -> size in bytes).
454    #[serde(default, skip_serializing_if = "HashMap::is_empty")]
455    pub column_sizes: HashMap<i32, i64>,
456
457    /// Value counts (column_id -> non-null value count).
458    #[serde(default, skip_serializing_if = "HashMap::is_empty")]
459    pub value_counts: HashMap<i32, i64>,
460
461    /// Null value counts (column_id -> null value count).
462    #[serde(default, skip_serializing_if = "HashMap::is_empty")]
463    pub null_value_counts: HashMap<i32, i64>,
464
465    /// NaN value counts (column_id -> NaN count).
466    #[serde(default, skip_serializing_if = "HashMap::is_empty")]
467    pub nan_value_counts: HashMap<i32, i64>,
468
469    /// Lower bounds (column_id -> serialized lower bound).
470    #[serde(default, skip_serializing_if = "HashMap::is_empty")]
471    pub lower_bounds: HashMap<i32, Vec<u8>>,
472
473    /// Upper bounds (column_id -> serialized upper bound).
474    #[serde(default, skip_serializing_if = "HashMap::is_empty")]
475    pub upper_bounds: HashMap<i32, Vec<u8>>,
476
477    /// Column-level statistics.
478    #[serde(default, skip_serializing_if = "HashMap::is_empty")]
479    pub statistics: HashMap<i32, crate::statistics::ColumnStats>,
480
481    /// Sort order ID, if the file is sorted.
482    #[serde(skip_serializing_if = "Option::is_none")]
483    pub sort_order_id: Option<i32>,
484}
485
486impl DataFile {
487    /// Creates a new data file with minimal required fields.
488    pub fn new(
489        file_path: impl Into<String>,
490        file_format: FileFormat,
491        record_count: i64,
492        file_size_in_bytes: i64,
493    ) -> Self {
494        Self {
495            file_path: file_path.into(),
496            file_format,
497            content: FileContent::Data,
498            partition: HashMap::new(),
499            record_count,
500            file_size_in_bytes,
501            column_sizes: HashMap::new(),
502            value_counts: HashMap::new(),
503            null_value_counts: HashMap::new(),
504            nan_value_counts: HashMap::new(),
505            lower_bounds: HashMap::new(),
506            upper_bounds: HashMap::new(),
507            statistics: HashMap::new(),
508            sort_order_id: None,
509        }
510    }
511
512    /// Sets the partition data for this file.
513    pub fn with_partition(mut self, partition: HashMap<String, serde_json::Value>) -> Self {
514        self.partition = partition;
515        self
516    }
517
518    /// Adds column statistics.
519    pub fn with_column_stats(
520        mut self,
521        column_id: i32,
522        size: i64,
523        value_count: i64,
524        null_count: i64,
525    ) -> Self {
526        self.column_sizes.insert(column_id, size);
527        self.value_counts.insert(column_id, value_count);
528        self.null_value_counts.insert(column_id, null_count);
529        self
530    }
531}
532
#[cfg(test)]
mod tests {
    use super::*;

    /// Builder setters reach the snapshot, and the summary-backed counters
    /// parse the values stored under their keys.
    #[test]
    fn test_snapshot_builder() {
        let builder = Snapshot::builder(1, "/manifest-list.avro")
            .with_operation(Operation::Append)
            .with_sequence_number(5)
            .with_summary_property("added-data-files", "10")
            .with_summary_property("added-records", "1000");
        let snapshot = builder.build();

        assert_eq!(snapshot.snapshot_id, 1);
        assert_eq!(snapshot.operation, Operation::Append);
        assert_eq!(snapshot.sequence_number, 5);
        assert_eq!(snapshot.added_files_count(), Some(10));
        assert_eq!(snapshot.added_records_count(), Some(1000));
    }

    /// A single data manifest with five added files yields a total of five.
    #[test]
    fn test_manifest_list() {
        let entry = ManifestFileEntry {
            manifest_path: "/manifest-1.avro".into(),
            manifest_length: 1024,
            partition_spec_id: 0,
            content: ManifestContent::Data,
            sequence_number: 1,
            min_sequence_number: 1,
            added_snapshot_id: 1,
            added_files_count: 5,
            existing_files_count: 0,
            deleted_files_count: 0,
            added_rows_count: 500,
            existing_rows_count: 0,
            deleted_rows_count: 0,
        };

        let mut list = ManifestList::new();
        list.add_entry(entry);

        assert_eq!(list.total_data_files(), 5);
    }

    /// Constructor arguments and column statistics land in the expected fields.
    #[test]
    fn test_data_file() {
        let base = DataFile::new("/data/file.parquet", FileFormat::Parquet, 1000, 1024 * 1024);
        let file = base.with_column_stats(1, 512 * 1024, 1000, 0);

        assert_eq!(file.record_count, 1000);
        assert_eq!(file.column_sizes.get(&1).copied(), Some(512 * 1024));
    }
}