// timeseries_table_core/transaction_log.rs
1//! Append-only metadata log and table state.
2//!
3//! This module implements the Delta-inspired metadata layer for
4//! `timeseries-table-format` and defines the logical metadata model
5//! written to and read from the `_timeseries_log/` directory.
6//!
7//! - A simple append-only commit log stored as JSON files under a
8//!   `_timeseries_log/` directory (for example, `_timeseries_log/0000000000.json`).
9//! - A `CURRENT` pointer that tracks the latest committed table version.
10//! - Strongly-typed metadata structures such as `TableMeta`,
11//!   `TableKind`, `TimeIndexSpec`, `SegmentMeta`, and `LogAction`.
12//! - An optimistic concurrency model based on version guards, so that
13//!   commits fail cleanly with a conflict error when the expected
14//!   version does not match the current version.
15//! - A `TableState` representation materialized from the log, which
16//!   describes the current table version, metadata, and active segments.
17//!
18//! The log is designed to be:
19//!
20//! - **Append-only**: commits never mutate existing files.
21//! - **Monotonically versioned**: versions are `u64` values that only
22//!   increase, enforced by the commit API.
23//! - **Human-inspectable**: JSON commits and a small set of actions
24//!   make it easy to debug with basic tools.
25//!
26//! ## On-disk layout (high level)
27//!
28//! ```text
29//! table_root/
30//!   _timeseries_log/
31//!     CURRENT                  # latest committed version (e.g. "3\n")
32//!     0000000001.json          # Commit version 1
33//!     0000000002.json          # Commit version 2
34//!     0000000003.json          # Commit version 3
35//!   data/                      # Parquet segments live here (convention for now)
36//! ```
37//!
38//! Each `*.json` file contains a single [`Commit`] value, encoded as JSON. For
39//! example:
40//!
41//! ```json
42//! {
43//!   "version": 1,
44//!   "base_version": 0,
45//!   "timestamp": "2025-01-01T00:00:00Z",
46//!   "actions": [
47//!     {
48//!       "AddSegment": {
49//!         "segment_id": "seg-0001",
50//!         "path": "data/nvda_1h_0001.parquet",
51//!         "ts_min": "2020-01-01T00:00:00Z",
52//!         "ts_max": "2020-01-02T00:00:00Z",
53//!         "row_count": 1024,
54//!         "format": "parquet"
55//!       }
56//!     }
57//!   ]
58//! }
59//! ```
60//!
61//! In v0.1 the log is strictly append-only, and table state is reconstructed by
62//! replaying every commit up to the version referenced by `CURRENT`. This module
63//! does not know about query engines; it only provides the persisted metadata
64//! and an API for committing changes safely.
65pub mod actions;
66pub mod log_store;
67pub mod segments;
68pub mod table_state;
69
70pub use crate::metadata::table_metadata::{
71    TableKind, TableMeta, TableMetaDelta, TimeBucket, TimeIndexSpec,
72};
73pub use actions::{Commit, LogAction};
74pub use log_store::TransactionLogStore;
75pub use segments::{FileFormat, SegmentId, SegmentMeta};
76pub use table_state::TableState;
77
78use snafu::{Backtrace, prelude::*};
79
80use crate::storage::StorageError;
81
/// Errors that can occur while reading or writing the commit log.
///
/// Returned by the transaction-log APIs in this module (presumably the
/// commit/read paths of [`TransactionLogStore`] — confirm against that
/// implementation). Callers can match on the variant to distinguish a
/// retryable optimistic-concurrency conflict from storage failures and
/// corruption.
#[derive(Debug, Snafu)]
pub enum CommitError {
    /// The caller's expected_version does not match the CURRENT pointer.
    ///
    /// This is the optimistic-concurrency failure mode described in the
    /// module docs: another writer committed first. Callers may re-read
    /// table state and retry the commit against the new version.
    #[snafu(display("Commit conflict: expected version {expected}, but CURRENT is {found}"))]
    Conflict {
        /// The version the caller expected to be current.
        expected: u64,
        /// The actual current version found.
        found: u64,
        /// Backtrace for debugging.
        backtrace: Backtrace,
    },

    /// Underlying storage error while working with the log or CURRENT file.
    ///
    /// Backtraces are delegated to the inner StorageError.
    #[snafu(display("Storage error while accessing commit log: {source}"))]
    Storage {
        /// Underlying storage error returned by the storage backend.
        #[snafu(backtrace)]
        source: StorageError,
    },

    /// The log or CURRENT file is in an unexpected / malformed state.
    ///
    /// For example, an unparsable CURRENT pointer or a commit file that
    /// does not decode to a valid [`Commit`] — exact triggers depend on
    /// the log-store implementation, which is outside this view.
    #[snafu(display("Corrupt log state: {msg}"))]
    CorruptState {
        /// A description of the corrupt state.
        msg: String,
        /// Backtrace for debugging.
        backtrace: Backtrace,
    },
}
115
#[cfg(test)]
mod tests {
    use crate::metadata::logical_schema::{
        LogicalDataType, LogicalField, LogicalSchema, LogicalSchemaError, LogicalTimestampUnit,
    };
    use crate::transaction_log::*;

    use chrono::{DateTime, TimeZone, Utc};

    // NOTE: `serde_json` is referenced by full path below; a bare
    // `use serde_json;` is redundant in edition 2018+ (Clippy
    // `single_component_path_imports`) and has been removed.

    // ==================== Serialization tests ====================

    /// Builds a `DateTime<Utc>` from calendar components.
    ///
    /// Panics on invalid or ambiguous inputs, which is acceptable for the
    /// fixed fixtures used in these tests.
    fn utc_datetime(
        year: i32,
        month: u32,
        day: u32,
        hour: u32,
        minute: u32,
        second: u32,
    ) -> DateTime<Utc> {
        Utc.with_ymd_and_hms(year, month, day, hour, minute, second)
            .single()
            .expect("valid UTC timestamp")
    }

    /// A full `Commit` carrying both a metadata update and a segment
    /// addition must survive a JSON round-trip unchanged.
    #[test]
    fn commit_json_roundtrip() {
        let ts0 = utc_datetime(2025, 1, 1, 0, 0, 0);
        let ts1 = utc_datetime(2025, 1, 1, 1, 0, 0);

        let time_index = TimeIndexSpec {
            timestamp_column: "ts".to_string(),
            entity_columns: vec!["symbol".to_string()],
            bucket: TimeBucket::Minutes(60),
            timezone: Some("UTC".to_string()),
        };

        let table_meta = TableMeta {
            kind: TableKind::TimeSeries(time_index),
            logical_schema: Some(
                LogicalSchema::new(vec![
                    LogicalField {
                        name: "ts".to_string(),
                        data_type: LogicalDataType::Timestamp {
                            unit: LogicalTimestampUnit::Micros,
                            timezone: None,
                        },
                        nullable: false,
                    },
                    LogicalField {
                        name: "symbol".to_string(),
                        data_type: LogicalDataType::Utf8,
                        nullable: false,
                    },
                ])
                .expect("valid logical schema"),
            ),
            created_at: ts0,
            format_version: 1,
            entity_identity: None,
        };

        let seg_meta = SegmentMeta {
            segment_id: SegmentId("seg-0001".to_string()),
            path: "data/nvda_1h_0001.parquet".to_string(),
            format: FileFormat::Parquet,
            ts_min: ts0,
            ts_max: ts1,
            row_count: 1024,
            file_size: None,
            coverage_path: None,
        };

        let commit = Commit {
            version: 1,
            base_version: 0,
            timestamp: ts1,
            actions: vec![
                LogAction::UpdateTableMeta(table_meta),
                LogAction::AddSegment(seg_meta),
            ],
        };

        // Serialize to JSON.
        let json = serde_json::to_string_pretty(&commit).expect("serialize commit");

        // Deserialize back.
        let decoded: Commit = serde_json::from_str(&json).expect("deserialize commit");

        // Round-trip equality.
        assert_eq!(commit, decoded);
    }

    /// `LogicalSchema::new` must reject two fields with the same name and
    /// report which column was duplicated.
    #[test]
    fn logical_schema_rejects_duplicate_columns() {
        let dup = LogicalSchema::new(vec![
            LogicalField {
                name: "ts".to_string(),
                data_type: LogicalDataType::Timestamp {
                    unit: LogicalTimestampUnit::Micros,
                    timezone: None,
                },
                nullable: false,
            },
            LogicalField {
                name: "ts".to_string(),
                data_type: LogicalDataType::Timestamp {
                    unit: LogicalTimestampUnit::Micros,
                    timezone: None,
                },
                nullable: false,
            },
        ]);

        let err = dup.expect_err("duplicate columns should be rejected");
        assert!(matches!(err, LogicalSchemaError::DuplicateColumn { column } if column == "ts"));
    }

    /// Optional `TimeIndexSpec` fields (`entity_columns`, `timezone`) may
    /// be omitted from JSON and fall back to their serde defaults.
    #[test]
    fn time_index_spec_defaults() {
        // JSON with optional fields omitted.
        let json = r#"{
            "timestamp_column": "ts",
            "bucket": { "Hours": 1 }
        }"#;

        let spec: TimeIndexSpec = serde_json::from_str(json).expect("deserialize");

        assert_eq!(spec.timestamp_column, "ts");
        assert_eq!(spec.entity_columns, Vec::<String>::new()); // default
        assert_eq!(spec.bucket, TimeBucket::Hours(1));
        assert_eq!(spec.timezone, None); // default
    }

    /// A `None` timezone must be omitted entirely from serialized output
    /// (skip_serializing_if behavior), not written as `null`.
    #[test]
    fn time_index_spec_skips_none_timezone_on_serialize() {
        let spec = TimeIndexSpec {
            timestamp_column: "ts".to_string(),
            entity_columns: vec![],
            bucket: TimeBucket::Seconds(30),
            timezone: None,
        };

        let json = serde_json::to_string(&spec).expect("serialize");

        // "timezone" key should be absent.
        assert!(!json.contains("timezone"));
    }

    /// `nullable` has no serde default: omitting it from JSON is a hard
    /// deserialization error, forcing writers to be explicit.
    #[test]
    fn logical_column_nullable_requires_explicit_value() {
        let json = r#"{ "name": "price", "data_type": "Float64" }"#;

        let err = serde_json::from_str::<LogicalField>(json).unwrap_err();
        assert!(
            err.to_string().contains("missing field `nullable`"),
            "unexpected error: {err}"
        );
    }

    /// The unit variant `TableKind::Generic` serializes as a bare string
    /// and round-trips intact.
    #[test]
    fn table_kind_generic_roundtrip() {
        let kind = TableKind::Generic;
        let json = serde_json::to_string(&kind).expect("serialize");
        let decoded: TableKind = serde_json::from_str(&json).expect("deserialize");

        assert_eq!(kind, decoded);
        assert_eq!(json, r#""Generic""#);
    }

    /// Every `TimeBucket` variant must survive a JSON round-trip.
    #[test]
    fn all_time_bucket_variants_roundtrip() {
        let buckets = vec![
            TimeBucket::Seconds(15),
            TimeBucket::Minutes(5),
            TimeBucket::Hours(24),
            TimeBucket::Days(7),
        ];

        for bucket in buckets {
            let json = serde_json::to_string(&bucket).expect("serialize");
            let decoded: TimeBucket = serde_json::from_str(&json).expect("deserialize");
            assert_eq!(bucket, decoded);
        }
    }

    /// `FileFormat` serializes lowercase (matching the on-disk format
    /// shown in the module docs) and round-trips.
    #[test]
    fn file_format_serializes_lowercase() {
        let format = FileFormat::Parquet;
        let json = serde_json::to_string(&format).expect("serialize");

        assert_eq!(json, r#""parquet""#);

        // Also verify round-trip.
        let decoded: FileFormat = serde_json::from_str(&json).expect("deserialize");
        assert_eq!(format, decoded);
    }

    /// Parquet is the default file format for segments.
    #[test]
    fn file_format_default_is_parquet() {
        assert_eq!(FileFormat::default(), FileFormat::Parquet);
    }

    /// `RemoveSegment` (a struct-like variant with only a segment id)
    /// round-trips through JSON.
    #[test]
    fn remove_segment_action_roundtrip() {
        let action = LogAction::RemoveSegment {
            segment_id: SegmentId("seg-to-remove".to_string()),
        };

        let json = serde_json::to_string(&action).expect("serialize");
        let decoded: LogAction = serde_json::from_str(&json).expect("deserialize");

        assert_eq!(action, decoded);
    }

    /// A commit with an empty action list is valid and round-trips.
    #[test]
    fn commit_with_empty_actions() {
        let ts = utc_datetime(2025, 6, 15, 12, 0, 0);

        let commit = Commit {
            version: 1,
            base_version: 0,
            timestamp: ts,
            actions: vec![],
        };

        let json = serde_json::to_string(&commit).expect("serialize");
        let decoded: Commit = serde_json::from_str(&json).expect("deserialize");

        assert_eq!(commit, decoded);
        assert!(decoded.actions.is_empty());
    }

    /// The `SegmentId` newtype serializes transparently as its inner
    /// string, not as a wrapper object.
    #[test]
    fn segment_id_transparent_serialization() {
        let id = SegmentId("my-segment".to_string());
        let json = serde_json::to_string(&id).expect("serialize");

        // Should be a plain string, not {"0": "my-segment"}.
        assert_eq!(json, r#""my-segment""#);

        let decoded: SegmentId = serde_json::from_str(&json).expect("deserialize");
        assert_eq!(id, decoded);
    }
}
361}