// timeseries_table_core/transaction_log/table_state.rs

1//! Reconstructing the current table state by replaying log commits.
2//!
3//! `TableState` materializes the metadata stored in `_timeseries_log/` and the
4//! [`LogStore::rebuild_table_state`] helper walks all commits from version 1 up
5//! to the `CURRENT` pointer, applying their actions in order. This keeps read
6//! logic isolated from the append-only write path and documents the invariant
7//! that table readers must see a state consistent with the latest committed
8//! version.
9use std::collections::HashMap;
10
11#[cfg(feature = "test-counters")]
12use std::cell::Cell;
13
#[cfg(feature = "test-counters")]
thread_local! {
    // Per-thread counter of rebuild_table_state() invocations (test-only).
    // Thread-local so parallel tests on different threads do not interfere.
    static REBUILD_TABLE_STATE_COUNT: Cell<usize> = const { Cell::new(0) };
}
18
#[cfg(feature = "test-counters")]
/// Return how many times `rebuild_table_state` has run on this thread (test-only).
pub fn rebuild_table_state_count() -> usize {
    REBUILD_TABLE_STATE_COUNT.with(Cell::get)
}
24
#[cfg(feature = "test-counters")]
/// Reset this thread's rebuild counter back to zero (test-only).
pub fn reset_rebuild_table_state_count() {
    // `replace` returns the previous count, which we deliberately discard.
    REBUILD_TABLE_STATE_COUNT.with(|counter| counter.replace(0));
}
30
31use crate::{metadata::segments::cmp_segment_meta_by_time, transaction_log::*};
32
/// Pointer to table coverage metadata including bucket specification, path, and version.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TableCoveragePointer {
    /// Time bucket specification for the coverage metadata.
    pub bucket_spec: TimeBucket,
    /// Path to the coverage metadata file.
    pub coverage_path: String,
    /// Version number associated with this coverage pointer.
    ///
    /// This is the log version of the commit whose `UpdateTableCoverage`
    /// action produced the pointer during replay (see `rebuild_table_state`).
    pub version: u64,
}
43
/// In-memory view of table metadata and live segments, reconstructed from the log.
///
/// Invariant:
/// - `version` matches the CURRENT pointer.
/// - `table_meta` and `segments` are the result of applying all commits from
///   version 1 through `version` in order.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TableState {
    /// Latest committed version recorded in CURRENT.
    pub version: u64,
    /// Table-level metadata reconstructed from the log.
    pub table_meta: TableMeta,
    /// Current live segments keyed by SegmentId.
    pub segments: HashMap<SegmentId, SegmentMeta>,

    /// Optional pointer to the latest table coverage metadata.
    ///
    /// `None` when no `UpdateTableCoverage` action appears in any replayed
    /// commit; otherwise reflects the last such action up to `version`.
    pub table_coverage: Option<TableCoveragePointer>,
}
62
63impl TableState {
64    /// Return live segments sorted deterministically by time.
65    ///
66    /// Ordering is by `ts_min`, then `ts_max`, and finally `segment_id` as a
67    /// stable tie-breaker.
68    pub fn segments_sorted_by_time(&self) -> Vec<&SegmentMeta> {
69        let mut v: Vec<&SegmentMeta> = self.segments.values().collect();
70        v.sort_unstable_by(|a, b| cmp_segment_meta_by_time(a, b));
71        v
72    }
73}
74
75impl TransactionLogStore {
76    /// Rebuild the current TableState by replaying all commits up to CURRENT.
77    ///
78    /// v0.1 behavior:
79    /// - If CURRENT == 0 (no commits), this returns CommitError::CorruptState.
80    /// - The first commit must include at least one UpdateTableMeta action
81    ///   to bootstrap TableMeta; the last UpdateTableMeta wins.
82    pub async fn rebuild_table_state(&self) -> Result<TableState, CommitError> {
83        #[cfg(feature = "test-counters")]
84        REBUILD_TABLE_STATE_COUNT.with(|c| c.set(c.get() + 1));
85
86        let current_version = self.load_current_version().await?;
87
88        if current_version == 0 {
89            // v0.1: treat "no commits" as an uninitialized / corrupt table.
90            return CorruptStateSnafu {
91                msg: "Cannot rebuild TableState: CURRENT is 0 (no commits)".to_string(),
92            }
93            .fail();
94        }
95
96        let mut table_meta: Option<TableMeta> = None;
97        let mut segments: HashMap<SegmentId, SegmentMeta> = HashMap::new();
98
99        let mut table_coverage: Option<TableCoveragePointer> = None;
100
101        // Replay all commits from 1..=current_version in order
102        for v in 1..=current_version {
103            let commit = self.load_commit(v).await?;
104
105            // Defensive: file name version should match payload
106            if commit.version != v {
107                return CorruptStateSnafu {
108                    msg: format!(
109                        "Commit version mismatch: expected {v}, found {} in payload",
110                        commit.version
111                    ),
112                }
113                .fail();
114            }
115
116            for action in commit.actions {
117                match action {
118                    LogAction::AddSegment(meta) => {
119                        // Insert or replace the segment; latest info wins.
120                        segments.insert(meta.segment_id.clone(), meta);
121                    }
122                    LogAction::RemoveSegment { segment_id } => {
123                        segments.remove(&segment_id);
124                    }
125                    LogAction::UpdateTableMeta(delta) => {
126                        // v0.1: full replacement of TableMeta
127                        table_meta = Some(delta);
128                    }
129                    LogAction::UpdateTableCoverage {
130                        bucket_spec,
131                        coverage_path,
132                    } => {
133                        table_coverage = Some(TableCoveragePointer {
134                            bucket_spec,
135                            coverage_path,
136                            version: v,
137                        })
138                    }
139                }
140            }
141        }
142
143        let table_meta = table_meta.context(CorruptStateSnafu {
144            msg: format!("No TableMeta found in commits up to version {current_version}",),
145        })?;
146
147        Ok(TableState {
148            version: current_version,
149            table_meta,
150            segments,
151            table_coverage,
152        })
153    }
154}
155
#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::layout;
    use crate::storage::{StorageError, TableLocation};
    use crate::transaction_log::{
        FileFormat, LogAction, SegmentId, SegmentMeta, TableKind, TableMeta, TimeBucket,
        TimeIndexSpec, TransactionLogStore,
    };
    use chrono::TimeZone;
    use tempfile::TempDir;

    type TestResult = Result<(), Box<dyn std::error::Error>>;

    /// Build a `TransactionLogStore` rooted in a fresh temporary directory.
    ///
    /// The `TempDir` is returned alongside the store so it outlives the test
    /// body; dropping it removes the directory.
    fn create_test_log_store() -> (TempDir, TransactionLogStore) {
        let tmp = TempDir::new().expect("create temp dir");
        let location = TableLocation::local(tmp.path());
        let store = TransactionLogStore::new(location);
        (tmp, store)
    }

    /// Minimal valid `TableMeta` for a 1-minute-bucketed time-series table.
    fn sample_table_meta() -> TableMeta {
        TableMeta {
            kind: TableKind::TimeSeries(TimeIndexSpec {
                timestamp_column: "ts".to_string(),
                entity_columns: vec!["symbol".to_string()],
                bucket: TimeBucket::Minutes(1),
                timezone: None,
            }),
            logical_schema: None,
            created_at: chrono::Utc
                .with_ymd_and_hms(2025, 1, 1, 0, 0, 0)
                .single()
                .expect("valid sample table metadata timestamp"),
            format_version: 1,
            entity_identity: None,
        }
    }

    /// Parquet segment covering a fixed one-hour window on 2025-01-01.
    fn sample_segment(id: &str) -> SegmentMeta {
        SegmentMeta {
            segment_id: SegmentId(id.to_string()),
            path: format!("data/{id}.parquet"),
            format: FileFormat::Parquet,
            ts_min: chrono::Utc
                .with_ymd_and_hms(2025, 1, 1, 0, 0, 0)
                .single()
                .expect("valid sample segment ts_min"),
            ts_max: chrono::Utc
                .with_ymd_and_hms(2025, 1, 1, 1, 0, 0)
                .single()
                .expect("valid sample segment ts_max"),
            row_count: 42,
            file_size: None,
            coverage_path: None,
        }
    }

    /// Segment with explicit unix-second bounds, used by the ordering test.
    fn segment_with_ts(id: &str, ts_min: i64, ts_max: i64) -> SegmentMeta {
        SegmentMeta {
            segment_id: SegmentId(id.to_string()),
            path: format!("data/{id}.parquet"),
            format: FileFormat::Parquet,
            ts_min: chrono::Utc.timestamp_opt(ts_min, 0).single().unwrap(),
            ts_max: chrono::Utc.timestamp_opt(ts_max, 0).single().unwrap(),
            row_count: 1,
            file_size: None,
            coverage_path: None,
        }
    }

    #[test]
    fn segments_sorted_by_time_orders_hashmap_deterministically() {
        let mut segments = HashMap::new();
        // Insertion order is deliberately scrambled; HashMap iteration order
        // is arbitrary, so the sort must impose the deterministic ordering.
        let seg_c = segment_with_ts("c", 10, 30);
        let seg_a = segment_with_ts("a", 10, 20);
        let seg_d = segment_with_ts("d", 5, 7);
        let seg_b = segment_with_ts("b", 10, 20);

        segments.insert(seg_c.segment_id.clone(), seg_c);
        segments.insert(seg_a.segment_id.clone(), seg_a);
        segments.insert(seg_d.segment_id.clone(), seg_d);
        segments.insert(seg_b.segment_id.clone(), seg_b);

        let state = TableState {
            version: 3,
            table_meta: sample_table_meta(),
            segments,
            table_coverage: None,
        };

        let ordered: Vec<(i64, i64, String)> = state
            .segments_sorted_by_time()
            .iter()
            .map(|seg| {
                (
                    seg.ts_min.timestamp(),
                    seg.ts_max.timestamp(),
                    seg.segment_id.0.clone(),
                )
            })
            .collect();

        // Lexicographic tuple sort == (ts_min, ts_max, segment_id), which is
        // exactly the documented ordering contract.
        let mut expected = ordered.clone();
        expected.sort();
        assert_eq!(ordered, expected);
    }

    #[tokio::test]
    async fn rebuild_table_state_happy_path() -> TestResult {
        let (_tmp, store) = create_test_log_store();
        let meta = sample_table_meta();
        let seg1 = sample_segment("seg1");
        let seg2 = sample_segment("seg2");

        // Three commits: bootstrap meta, add two segments, remove one.
        let v1 = store
            .commit_with_expected_version(0, vec![LogAction::UpdateTableMeta(meta.clone())])
            .await?;
        let v2 = store
            .commit_with_expected_version(
                v1,
                vec![
                    LogAction::AddSegment(seg1.clone()),
                    LogAction::AddSegment(seg2.clone()),
                ],
            )
            .await?;
        let v3 = store
            .commit_with_expected_version(
                v2,
                vec![LogAction::RemoveSegment {
                    segment_id: seg1.segment_id.clone(),
                }],
            )
            .await?;

        // Replay must reflect the net effect: seg2 live, seg1 removed.
        let state = store.rebuild_table_state().await?;
        assert_eq!(state.version, v3);
        assert_eq!(state.table_meta, meta);
        assert!(state.segments.contains_key(&seg2.segment_id));
        assert!(!state.segments.contains_key(&seg1.segment_id));
        Ok(())
    }

    #[tokio::test]
    async fn rebuild_table_state_errors_when_current_zero() {
        // A store with no commits has CURRENT == 0 and must be reported as
        // corrupt/uninitialized rather than returning an empty state.
        let (_tmp, store) = create_test_log_store();

        let err = store
            .rebuild_table_state()
            .await
            .expect_err("expected error");
        assert!(matches!(err, CommitError::CorruptState { .. }));
    }

    #[tokio::test]
    async fn rebuild_table_state_errors_when_no_table_meta() -> TestResult {
        let (_tmp, store) = create_test_log_store();
        let seg = sample_segment("seg");

        // First commit adds a segment but never bootstraps TableMeta.
        store
            .commit_with_expected_version(0, vec![LogAction::AddSegment(seg.clone())])
            .await?;

        let err = store
            .rebuild_table_state()
            .await
            .expect_err("expected error");
        assert!(matches!(err, CommitError::CorruptState { .. }));
        Ok(())
    }

    #[tokio::test]
    async fn rebuild_table_state_fails_on_corrupt_commit_payload() -> TestResult {
        let (tmp, store) = create_test_log_store();
        let meta = sample_table_meta();

        store
            .commit_with_expected_version(0, vec![LogAction::UpdateTableMeta(meta)])
            .await?;

        // Overwrite commit 1 with garbage so deserialization fails on replay.
        let commit_path = tmp.path().join(layout::commit_rel_path(1));
        tokio::fs::write(&commit_path, b"not-json").await?;

        let err = store
            .rebuild_table_state()
            .await
            .expect_err("expected error");
        assert!(matches!(err, CommitError::CorruptState { .. }));
        Ok(())
    }

    #[tokio::test]
    async fn rebuild_table_state_fails_when_commit_missing() -> TestResult {
        let (tmp, store) = create_test_log_store();
        let meta = sample_table_meta();

        store
            .commit_with_expected_version(0, vec![LogAction::UpdateTableMeta(meta)])
            .await?;

        // Delete commit 1 while CURRENT still points past it; replay must
        // surface the storage-layer NotFound rather than a corrupt-state error.
        let commit_path = tmp.path().join(layout::commit_rel_path(1));
        tokio::fs::remove_file(&commit_path).await?;

        let err = store
            .rebuild_table_state()
            .await
            .expect_err("expected error");
        match err {
            CommitError::Storage { source } => match source {
                StorageError::NotFound { .. } => {}
                other => panic!("unexpected storage error: {other:?}"),
            },
            other => panic!("expected storage error, got {other:?}"),
        }
        Ok(())
    }
}
373}