Skip to main content

timeseries_table_core/
table.rs

1//! High-level time-series table abstraction.
2//!
3//! This module is the canonical home for the user-facing [`TimeSeriesTable`]
4//! API. The `time_series_table` module exists as a thin compatibility facade
5//! and re-exports this API.
6//!
7//! In v0.1 this is intentionally read-heavy and write-light:
8//! - `open` reconstructs state from the transaction log,
9//! - `create` bootstraps a fresh table with an initial metadata commit,
10//! - append APIs handle schema enforcement, coverage sidecars, and OCC,
11//! - range scans stream filtered record batches.
12
13pub mod append;
14/// Append profiling report types used by CLI benchmarks.
15pub mod append_report;
16pub mod coverage;
17pub mod error;
18pub mod scan;
19
20#[cfg(test)]
21pub(crate) mod test_util;
22
23use std::pin::Pin;
24
25use arrow::array::RecordBatch;
26use futures::Stream;
27use snafu::prelude::*;
28
29use crate::table::error::{
30    AlreadyExistsSnafu, EmptyTableSnafu, NotTimeSeriesSnafu, TransactionLogSnafu,
31};
32
33use crate::{
34    storage::TableLocation,
35    transaction_log::{
36        LogAction, TableKind, TableMeta, TableState, TimeIndexSpec, TransactionLogStore,
37    },
38};
39
40pub use error::TableError;
41
/// Stream of Arrow [`RecordBatch`] values from a time-series scan.
///
/// The stream is boxed and pinned so callers can hold it without naming the
/// concrete stream type. Each item is a `Result`, so a failure is reported
/// in-band as a [`TableError`] rather than by ending the stream silently.
/// The `Send` bound is part of the alias.
pub type TimeSeriesScan = Pin<Box<dyn Stream<Item = Result<RecordBatch, TableError>> + Send>>;
44
/// High-level time-series table handle.
///
/// This is the main entry point for callers. It bundles:
/// - where the table is,
/// - how to talk to the transaction log,
/// - what the current committed state is,
/// - and the extracted time index spec.
///
/// All fields are private; read access goes through the accessor methods on
/// the `impl` (`state`, `index_spec`, `location`, `log_store`).
#[derive(Debug)]
pub struct TimeSeriesTable {
    /// Transaction log store built from the table location; also the source
    /// of the value returned by `location()`.
    log: TransactionLogStore,
    /// In-memory snapshot of the table state. It is only replaced by
    /// `refresh()`; the other methods read it or read the log directly.
    state: TableState,
    /// Time index spec cloned out of `state.table_meta.kind` when the handle
    /// is built or refreshed.
    index: TimeIndexSpec,
}
58
59impl TimeSeriesTable {
60    /// Construct a table handle from an existing snapshot.
61    ///
62    /// This does not replay the transaction log; callers must provide a state
63    /// derived from the same location.
64    pub fn from_state(location: TableLocation, state: TableState) -> Result<Self, TableError> {
65        let index = match &state.table_meta.kind {
66            TableKind::TimeSeries(spec) => spec.clone(),
67            other => {
68                return NotTimeSeriesSnafu {
69                    kind: other.clone(),
70                }
71                .fail();
72            }
73        };
74
75        let log = TransactionLogStore::new(location);
76        Ok(Self { log, state, index })
77    }
78
79    /// Return the current committed table state.
80    pub fn state(&self) -> &TableState {
81        &self.state
82    }
83
84    /// Return a mutable reference to the current committed table state (crate-internal).
85    ///
86    /// This exists to support internal helpers (for example, tests) without
87    /// exposing mutation to library callers.
88    #[allow(dead_code)]
89    pub(crate) fn state_mut(&mut self) -> &mut TableState {
90        &mut self.state
91    }
92
93    /// Return the time index specification for this table.
94    pub fn index_spec(&self) -> &TimeIndexSpec {
95        &self.index
96    }
97
98    /// Return the table location.
99    pub fn location(&self) -> &TableLocation {
100        self.log.location()
101    }
102
103    /// Return the transaction log store handle.
104    pub fn log_store(&self) -> &TransactionLogStore {
105        &self.log
106    }
107
108    /// Open an existing time-series table at the given location.
109    ///
110    /// Steps:
111    /// - Build a `TransactionLogStore` for the location.
112    /// - Rebuild `TableState` from the transaction log.
113    /// - Reject empty tables (version == 0).
114    /// - Require `TableKind::TimeSeries` and extract `TimeIndexSpec`.
115    pub async fn open(location: TableLocation) -> Result<Self, TableError> {
116        let log = TransactionLogStore::new(location.clone());
117
118        // Early return for tables with no commits so we surface TableError::EmptyTable
119        // instead of a lower-level corrupt state error.
120        let current_version = log
121            .load_current_version()
122            .await
123            .context(TransactionLogSnafu)?;
124
125        if current_version == 0 {
126            return EmptyTableSnafu.fail();
127        }
128
129        // Rebuild the snapshot of state from the log.
130        let state = log
131            .rebuild_table_state()
132            .await
133            .context(TransactionLogSnafu)?;
134
135        // Extract the time index spec from TableMeta.kind.
136        let index = match &state.table_meta.kind {
137            TableKind::TimeSeries(spec) => spec.clone(),
138            other => {
139                return NotTimeSeriesSnafu {
140                    kind: other.clone(),
141                }
142                .fail();
143            }
144        };
145
146        Ok(Self { log, state, index })
147    }
148
149    /// Create a new time-series table at the given location.
150    ///
151    /// This:
152    /// - Requires `table_meta.kind` to be `TableKind::TimeSeries`,
153    /// - Verifies that there are no existing commits (version must be 0),
154    /// - Writes an initial commit with `UpdateTableMeta(table_meta.clone())`,
155    /// - Returns a `TimeSeriesTable` with a fresh `TableState`.
156    pub async fn create(
157        location: TableLocation,
158        table_meta: TableMeta,
159    ) -> Result<Self, TableError> {
160        // 1) Extract the time index spec from the provided metadata
161        // and ensure this is actually a time-series table.
162        let index = match &table_meta.kind {
163            TableKind::TimeSeries(spec) => spec.clone(),
164            other => {
165                return NotTimeSeriesSnafu {
166                    kind: other.clone(),
167                }
168                .fail();
169            }
170        };
171
172        let log = TransactionLogStore::new(location.clone());
173
174        // 2) Check that there are no existing commits. This keeps `create`
175        // from silently appending to a pre-existing table.
176        let current_version = log
177            .load_current_version()
178            .await
179            .context(TransactionLogSnafu)?;
180
181        if current_version != 0 {
182            return AlreadyExistsSnafu { current_version }.fail();
183        }
184
185        // 3) Write the initial metadata commit at version 1.
186        let actions = vec![LogAction::UpdateTableMeta(table_meta.clone())];
187
188        let new_version = log
189            .commit_with_expected_version(0, actions)
190            .await
191            .context(TransactionLogSnafu)?;
192
193        debug_assert_eq!(new_version, 1);
194
195        // 4) Rebuild state from the log so that `state` is guaranteed to be
196        // consistent with what is on disk.
197        let state = log
198            .rebuild_table_state()
199            .await
200            .context(TransactionLogSnafu)?;
201        Ok(Self { log, state, index })
202    }
203
204    /// Load the current log version from disk without mutating in-memory state.
205    pub async fn current_version(&self) -> Result<u64, TableError> {
206        self.log
207            .load_current_version()
208            .await
209            .context(TransactionLogSnafu)
210    }
211
212    /// Rebuild and return the latest table state from the transaction log.
213    pub async fn load_latest_state(&self) -> Result<TableState, TableError> {
214        self.log
215            .rebuild_table_state()
216            .await
217            .context(TransactionLogSnafu)
218    }
219
220    /// Refresh in-memory state if the log has advanced; returns true if updated.
221    pub async fn refresh(&mut self) -> Result<bool, TableError> {
222        let current = self
223            .log
224            .load_current_version()
225            .await
226            .context(TransactionLogSnafu)?;
227
228        if current == self.state.version {
229            return Ok(false);
230        }
231
232        let state = self
233            .log
234            .rebuild_table_state()
235            .await
236            .context(TransactionLogSnafu)?;
237
238        let index = match &state.table_meta.kind {
239            TableKind::TimeSeries(spec) => spec.clone(),
240            other => {
241                return NotTimeSeriesSnafu {
242                    kind: other.clone(),
243                }
244                .fail();
245            }
246        };
247
248        self.state = state;
249        self.index = index;
250        Ok(true)
251    }
252}
253
#[cfg(test)]
mod tests {
    use super::*;

    use crate::storage::{StorageLocation, layout};
    use crate::table::test_util::*;
    use crate::transaction_log::{TimeBucket, TransactionLogStore};

    use tempfile::TempDir;

    /// Create a scratch directory plus a local table location rooted in it.
    ///
    /// The `TempDir` guard is handed back so each test keeps the directory
    /// alive until the end of its scope.
    fn scratch_location() -> std::io::Result<(TempDir, TableLocation)> {
        let dir = TempDir::new()?;
        let location = TableLocation::local(dir.path());
        Ok((dir, location))
    }

    /// `create` must leave a version-1 state in memory and a matching log on disk.
    #[tokio::test]
    async fn create_initializes_log_and_state() -> TestResult {
        let (_scratch, location) = scratch_location()?;
        let table = TimeSeriesTable::create(location, make_basic_table_meta()).await?;

        // In-memory view: version 1, no segments yet.
        let state = table.state();
        assert_eq!(state.version, 1);
        assert!(state.segments.is_empty());

        // On-disk view: the log directory exists and CURRENT points at 1.
        let StorageLocation::Local(root) = table.location().storage();
        assert!(root.join(layout::log_rel_dir()).is_dir());

        let current_file = root.join(layout::current_rel_path());
        let current_text = tokio::fs::read_to_string(&current_file).await?;
        assert_eq!(current_text.trim(), "1");

        Ok(())
    }

    /// A table opened right after creation sees the same version and index spec.
    #[tokio::test]
    async fn open_round_trip_after_create() -> TestResult {
        let (_scratch, location) = scratch_location()?;

        let created = TimeSeriesTable::create(location.clone(), make_basic_table_meta()).await?;
        let reopened = TimeSeriesTable::open(location).await?;

        assert_eq!(created.state().version, reopened.state().version);
        assert_eq!(created.index_spec(), reopened.index_spec());
        Ok(())
    }

    /// Opening a directory with no CURRENT file and no commits reports an empty table.
    #[tokio::test]
    async fn open_empty_root_errors() -> TestResult {
        let (_scratch, location) = scratch_location()?;

        let outcome = TimeSeriesTable::open(location).await;
        assert!(matches!(outcome, Err(TableError::EmptyTable)));
        Ok(())
    }

    /// A second `create` on the same location must be rejected.
    #[tokio::test]
    async fn create_fails_if_table_already_exists() -> TestResult {
        let (_scratch, location) = scratch_location()?;
        let meta = make_basic_table_meta();

        let _first = TimeSeriesTable::create(location.clone(), meta.clone()).await?;

        // The log already holds a commit, so this attempt has to fail.
        let second = TimeSeriesTable::create(location, meta).await;
        assert!(matches!(second, Err(TableError::AlreadyExists { .. })));
        Ok(())
    }

    /// With no new commits, `refresh` is a no-op that reports `false`.
    #[tokio::test]
    async fn refresh_returns_false_when_no_new_commits() -> TestResult {
        let (_scratch, location) = scratch_location()?;
        let mut table = TimeSeriesTable::create(location, make_basic_table_meta()).await?;

        let changed = table.refresh().await?;
        assert!(!changed);
        assert_eq!(table.state().version, 1);
        Ok(())
    }

    /// A commit written through a separate log handle is picked up by `refresh`,
    /// which updates both the state and the cached index spec.
    #[tokio::test]
    async fn refresh_updates_state_and_index_on_change() -> TestResult {
        let (_scratch, location) = scratch_location()?;
        let base_meta = make_basic_table_meta();
        let mut table = TimeSeriesTable::create(location.clone(), base_meta.clone()).await?;

        // Derive metadata that differs from the original only in its bucket.
        let mut changed_meta = base_meta;
        if let TableKind::TimeSeries(spec) = &mut changed_meta.kind {
            spec.bucket = TimeBucket::Minutes(5);
        }

        // Commit behind the table handle's back.
        let writer = TransactionLogStore::new(location);
        let committed = writer
            .commit_with_expected_version(1, vec![LogAction::UpdateTableMeta(changed_meta)])
            .await?;
        assert_eq!(committed, 2);

        let changed = table.refresh().await?;
        assert!(changed);
        assert_eq!(table.state().version, 2);

        match &table.state().table_meta.kind {
            TableKind::TimeSeries(spec) => assert_eq!(spec.bucket, TimeBucket::Minutes(5)),
            other => panic!("expected time series table kind, got {other:?}"),
        }
        assert_eq!(table.index_spec().bucket, TimeBucket::Minutes(5));
        Ok(())
    }
}