timeseries_table_core/
table.rs1pub mod append;
14pub mod append_report;
16pub mod coverage;
17pub mod error;
18pub mod scan;
19
20#[cfg(test)]
21pub(crate) mod test_util;
22
23use std::pin::Pin;
24
25use arrow::array::RecordBatch;
26use futures::Stream;
27use snafu::prelude::*;
28
29use crate::table::error::{
30 AlreadyExistsSnafu, EmptyTableSnafu, NotTimeSeriesSnafu, TransactionLogSnafu,
31};
32
33use crate::{
34 storage::TableLocation,
35 transaction_log::{
36 LogAction, TableKind, TableMeta, TableState, TimeIndexSpec, TransactionLogStore,
37 },
38};
39
40pub use error::TableError;
41
/// A pinned, boxed, `Send` stream whose items are `Result<RecordBatch, TableError>`.
///
/// Each item is either an Arrow [`RecordBatch`] or the [`TableError`] that
/// interrupted production of the next batch.
///
/// NOTE(review): presumably this is what the `scan` module hands back to
/// callers — confirm against `scan.rs`.
pub type TimeSeriesScan = Pin<Box<dyn Stream<Item = Result<RecordBatch, TableError>> + Send>>;
44
/// Handle to a time-series table: a transaction log store plus a cached
/// snapshot of the table state and the time index spec taken from the table
/// metadata.
///
/// The cached `state` and `index` are only replaced by [`TimeSeriesTable::refresh`];
/// they are otherwise whatever was loaded (or supplied) at construction time.
#[derive(Debug)]
pub struct TimeSeriesTable {
    /// Transaction log store constructed from the table's location.
    log: TransactionLogStore,
    /// Cached table state (as supplied to `from_state`, or as rebuilt from the log).
    state: TableState,
    /// Time index spec cloned out of the `TableKind::TimeSeries` variant of the table metadata.
    index: TimeIndexSpec,
}
58
59impl TimeSeriesTable {
60 pub fn from_state(location: TableLocation, state: TableState) -> Result<Self, TableError> {
65 let index = match &state.table_meta.kind {
66 TableKind::TimeSeries(spec) => spec.clone(),
67 other => {
68 return NotTimeSeriesSnafu {
69 kind: other.clone(),
70 }
71 .fail();
72 }
73 };
74
75 let log = TransactionLogStore::new(location);
76 Ok(Self { log, state, index })
77 }
78
79 pub fn state(&self) -> &TableState {
81 &self.state
82 }
83
84 #[allow(dead_code)]
89 pub(crate) fn state_mut(&mut self) -> &mut TableState {
90 &mut self.state
91 }
92
93 pub fn index_spec(&self) -> &TimeIndexSpec {
95 &self.index
96 }
97
98 pub fn location(&self) -> &TableLocation {
100 self.log.location()
101 }
102
103 pub fn log_store(&self) -> &TransactionLogStore {
105 &self.log
106 }
107
108 pub async fn open(location: TableLocation) -> Result<Self, TableError> {
116 let log = TransactionLogStore::new(location.clone());
117
118 let current_version = log
121 .load_current_version()
122 .await
123 .context(TransactionLogSnafu)?;
124
125 if current_version == 0 {
126 return EmptyTableSnafu.fail();
127 }
128
129 let state = log
131 .rebuild_table_state()
132 .await
133 .context(TransactionLogSnafu)?;
134
135 let index = match &state.table_meta.kind {
137 TableKind::TimeSeries(spec) => spec.clone(),
138 other => {
139 return NotTimeSeriesSnafu {
140 kind: other.clone(),
141 }
142 .fail();
143 }
144 };
145
146 Ok(Self { log, state, index })
147 }
148
149 pub async fn create(
157 location: TableLocation,
158 table_meta: TableMeta,
159 ) -> Result<Self, TableError> {
160 let index = match &table_meta.kind {
163 TableKind::TimeSeries(spec) => spec.clone(),
164 other => {
165 return NotTimeSeriesSnafu {
166 kind: other.clone(),
167 }
168 .fail();
169 }
170 };
171
172 let log = TransactionLogStore::new(location.clone());
173
174 let current_version = log
177 .load_current_version()
178 .await
179 .context(TransactionLogSnafu)?;
180
181 if current_version != 0 {
182 return AlreadyExistsSnafu { current_version }.fail();
183 }
184
185 let actions = vec![LogAction::UpdateTableMeta(table_meta.clone())];
187
188 let new_version = log
189 .commit_with_expected_version(0, actions)
190 .await
191 .context(TransactionLogSnafu)?;
192
193 debug_assert_eq!(new_version, 1);
194
195 let state = log
198 .rebuild_table_state()
199 .await
200 .context(TransactionLogSnafu)?;
201 Ok(Self { log, state, index })
202 }
203
204 pub async fn current_version(&self) -> Result<u64, TableError> {
206 self.log
207 .load_current_version()
208 .await
209 .context(TransactionLogSnafu)
210 }
211
212 pub async fn load_latest_state(&self) -> Result<TableState, TableError> {
214 self.log
215 .rebuild_table_state()
216 .await
217 .context(TransactionLogSnafu)
218 }
219
220 pub async fn refresh(&mut self) -> Result<bool, TableError> {
222 let current = self
223 .log
224 .load_current_version()
225 .await
226 .context(TransactionLogSnafu)?;
227
228 if current == self.state.version {
229 return Ok(false);
230 }
231
232 let state = self
233 .log
234 .rebuild_table_state()
235 .await
236 .context(TransactionLogSnafu)?;
237
238 let index = match &state.table_meta.kind {
239 TableKind::TimeSeries(spec) => spec.clone(),
240 other => {
241 return NotTimeSeriesSnafu {
242 kind: other.clone(),
243 }
244 .fail();
245 }
246 };
247
248 self.state = state;
249 self.index = index;
250 Ok(true)
251 }
252}
253
#[cfg(test)]
mod tests {
    // Tests for creating, opening and refreshing a `TimeSeriesTable` rooted in
    // a temporary local directory.
    use super::*;

    use crate::storage::{StorageLocation, layout};
    use crate::table::test_util::*;
    use crate::transaction_log::{TimeBucket, TransactionLogStore};

    use tempfile::TempDir;

    // `create` must leave the table at version 1 with no segments, and the
    // on-disk log directory and CURRENT pointer must reflect that.
    #[tokio::test]
    async fn create_initializes_log_and_state() -> TestResult {
        let tmp = TempDir::new()?;
        let location = TableLocation::local(tmp.path());

        let meta = make_basic_table_meta();
        let table = TimeSeriesTable::create(location.clone(), meta).await?;

        assert_eq!(table.state().version, 1);
        assert!(table.state().segments.is_empty());

        let root = match table.location().storage() {
            StorageLocation::Local(p) => p.clone(),
        };

        let log_dir = root.join(layout::log_rel_dir());
        assert!(log_dir.is_dir());

        let current_path = root.join(layout::current_rel_path());
        let current_contents = tokio::fs::read_to_string(&current_path).await?;
        assert_eq!(current_contents.trim(), "1");

        Ok(())
    }

    // Opening a freshly created table yields the same version and index spec.
    #[tokio::test]
    async fn open_round_trip_after_create() -> TestResult {
        let tmp = TempDir::new()?;
        let location = TableLocation::local(tmp.path());

        let meta = make_basic_table_meta();
        let created = TimeSeriesTable::create(location.clone(), meta).await?;

        let reopened = TimeSeriesTable::open(location.clone()).await?;

        assert_eq!(created.state().version, reopened.state().version);
        assert_eq!(created.index_spec(), reopened.index_spec());
        Ok(())
    }

    // Opening a directory with no commits reports `EmptyTable`.
    #[tokio::test]
    async fn open_empty_root_errors() -> TestResult {
        let tmp = TempDir::new()?;
        let location = TableLocation::local(tmp.path());

        let result = TimeSeriesTable::open(location).await;
        assert!(matches!(result, Err(TableError::EmptyTable)));
        Ok(())
    }

    // A second `create` at the same location reports `AlreadyExists`.
    #[tokio::test]
    async fn create_fails_if_table_already_exists() -> TestResult {
        let tmp = TempDir::new()?;
        let location = TableLocation::local(tmp.path());

        let meta = make_basic_table_meta();
        let _first = TimeSeriesTable::create(location.clone(), meta.clone()).await?;

        let result = TimeSeriesTable::create(location.clone(), meta).await;
        assert!(matches!(result, Err(TableError::AlreadyExists { .. })));
        Ok(())
    }

    // With no commits since creation, `refresh` is a no-op returning `false`.
    #[tokio::test]
    async fn refresh_returns_false_when_no_new_commits() -> TestResult {
        let tmp = TempDir::new()?;
        let location = TableLocation::local(tmp.path());

        let meta = make_basic_table_meta();
        let mut table = TimeSeriesTable::create(location.clone(), meta).await?;

        let refreshed = table.refresh().await?;
        assert!(!refreshed);
        assert_eq!(table.state().version, 1);
        Ok(())
    }

    // After an out-of-band metadata commit, `refresh` picks up both the new
    // state version and the new index spec.
    #[tokio::test]
    async fn refresh_updates_state_and_index_on_change() -> TestResult {
        let tmp = TempDir::new()?;
        let location = TableLocation::local(tmp.path());

        let meta = make_basic_table_meta();
        let mut table = TimeSeriesTable::create(location.clone(), meta.clone()).await?;

        let mut updated_meta = meta.clone();
        if let TableKind::TimeSeries(spec) = &mut updated_meta.kind {
            spec.bucket = TimeBucket::Minutes(5);
        }

        // Commit through a separate log store so the handle's cache goes stale.
        let log = TransactionLogStore::new(location.clone());
        let new_version = log
            .commit_with_expected_version(
                1,
                vec![LogAction::UpdateTableMeta(updated_meta.clone())],
            )
            .await?;
        assert_eq!(new_version, 2);

        let refreshed = table.refresh().await?;
        assert!(refreshed);
        assert_eq!(table.state().version, 2);

        match &table.state().table_meta.kind {
            TableKind::TimeSeries(spec) => assert_eq!(spec.bucket, TimeBucket::Minutes(5)),
            other => panic!("expected time series table kind, got {other:?}"),
        }
        assert_eq!(table.index_spec().bucket, TimeBucket::Minutes(5));
        Ok(())
    }
}