timeseries_table_core/transaction_log/
table_state.rs1use std::collections::HashMap;
10
#[cfg(feature = "test-counters")]
thread_local! {
    /// Number of times `TransactionLogStore::rebuild_table_state` has been entered
    /// on the current thread since the last reset (test instrumentation only).
    static REBUILD_TABLE_STATE_COUNT: Cell<usize> = const { Cell::new(0) };
}

/// Returns how many times `rebuild_table_state` has been entered on this thread
/// since the last call to [`reset_rebuild_table_state_count`].
///
/// The counter is thread-local. The increment happens on whichever thread first
/// polls the `rebuild_table_state` future, so calls that start on another thread
/// are not reflected here.
#[cfg(feature = "test-counters")]
pub fn rebuild_table_state_count() -> usize {
    REBUILD_TABLE_STATE_COUNT.with(Cell::get)
}

/// Sets this thread's `rebuild_table_state` call counter back to zero.
#[cfg(feature = "test-counters")]
pub fn reset_rebuild_table_state_count() {
    REBUILD_TABLE_STATE_COUNT.with(|counter| counter.set(0));
}
30
31use crate::{metadata::segments::cmp_segment_meta_by_time, transaction_log::*};
32
33#[derive(Debug, Clone, PartialEq, Eq)]
35pub struct TableCoveragePointer {
36 pub bucket_spec: TimeBucket,
38 pub coverage_path: String,
40 pub version: u64,
42}
43
44#[derive(Debug, Clone, PartialEq, Eq)]
51pub struct TableState {
52 pub version: u64,
54 pub table_meta: TableMeta,
56 pub segments: HashMap<SegmentId, SegmentMeta>,
58
59 pub table_coverage: Option<TableCoveragePointer>,
61}
62
63impl TableState {
64 pub fn segments_sorted_by_time(&self) -> Vec<&SegmentMeta> {
69 let mut v: Vec<&SegmentMeta> = self.segments.values().collect();
70 v.sort_unstable_by(|a, b| cmp_segment_meta_by_time(a, b));
71 v
72 }
73}
74
75impl TransactionLogStore {
76 pub async fn rebuild_table_state(&self) -> Result<TableState, CommitError> {
83 #[cfg(feature = "test-counters")]
84 REBUILD_TABLE_STATE_COUNT.with(|c| c.set(c.get() + 1));
85
86 let current_version = self.load_current_version().await?;
87
88 if current_version == 0 {
89 return CorruptStateSnafu {
91 msg: "Cannot rebuild TableState: CURRENT is 0 (no commits)".to_string(),
92 }
93 .fail();
94 }
95
96 let mut table_meta: Option<TableMeta> = None;
97 let mut segments: HashMap<SegmentId, SegmentMeta> = HashMap::new();
98
99 let mut table_coverage: Option<TableCoveragePointer> = None;
100
101 for v in 1..=current_version {
103 let commit = self.load_commit(v).await?;
104
105 if commit.version != v {
107 return CorruptStateSnafu {
108 msg: format!(
109 "Commit version mismatch: expected {v}, found {} in payload",
110 commit.version
111 ),
112 }
113 .fail();
114 }
115
116 for action in commit.actions {
117 match action {
118 LogAction::AddSegment(meta) => {
119 segments.insert(meta.segment_id.clone(), meta);
121 }
122 LogAction::RemoveSegment { segment_id } => {
123 segments.remove(&segment_id);
124 }
125 LogAction::UpdateTableMeta(delta) => {
126 table_meta = Some(delta);
128 }
129 LogAction::UpdateTableCoverage {
130 bucket_spec,
131 coverage_path,
132 } => {
133 table_coverage = Some(TableCoveragePointer {
134 bucket_spec,
135 coverage_path,
136 version: v,
137 })
138 }
139 }
140 }
141 }
142
143 let table_meta = table_meta.context(CorruptStateSnafu {
144 msg: format!("No TableMeta found in commits up to version {current_version}",),
145 })?;
146
147 Ok(TableState {
148 version: current_version,
149 table_meta,
150 segments,
151 table_coverage,
152 })
153 }
154}
155
#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::layout;
    use crate::storage::{StorageError, TableLocation};
    use crate::transaction_log::{
        FileFormat, LogAction, SegmentId, SegmentMeta, TableKind, TableMeta, TimeBucket,
        TimeIndexSpec, TransactionLogStore,
    };
    use chrono::TimeZone;
    use tempfile::TempDir;

    type TestResult = Result<(), Box<dyn std::error::Error>>;

    /// Creates a log store rooted in a fresh temp dir. Keep the returned
    /// `TempDir` alive for the whole test; dropping it deletes the directory.
    fn create_test_log_store() -> (TempDir, TransactionLogStore) {
        let tmp = TempDir::new().expect("create temp dir");
        let location = TableLocation::local(tmp.path());
        let store = TransactionLogStore::new(location);
        (tmp, store)
    }

    fn sample_table_meta() -> TableMeta {
        TableMeta {
            kind: TableKind::TimeSeries(TimeIndexSpec {
                timestamp_column: "ts".to_string(),
                entity_columns: vec!["symbol".to_string()],
                bucket: TimeBucket::Minutes(1),
                timezone: None,
            }),
            logical_schema: None,
            created_at: chrono::Utc
                .with_ymd_and_hms(2025, 1, 1, 0, 0, 0)
                .single()
                .expect("valid sample table metadata timestamp"),
            format_version: 1,
            entity_identity: None,
        }
    }

    fn sample_segment(id: &str) -> SegmentMeta {
        SegmentMeta {
            segment_id: SegmentId(id.to_string()),
            path: format!("data/{id}.parquet"),
            format: FileFormat::Parquet,
            ts_min: chrono::Utc
                .with_ymd_and_hms(2025, 1, 1, 0, 0, 0)
                .single()
                .expect("valid sample segment ts_min"),
            ts_max: chrono::Utc
                .with_ymd_and_hms(2025, 1, 1, 1, 0, 0)
                .single()
                .expect("valid sample segment ts_max"),
            row_count: 42,
            file_size: None,
            coverage_path: None,
        }
    }

    fn segment_with_ts(id: &str, ts_min: i64, ts_max: i64) -> SegmentMeta {
        SegmentMeta {
            segment_id: SegmentId(id.to_string()),
            path: format!("data/{id}.parquet"),
            format: FileFormat::Parquet,
            ts_min: chrono::Utc.timestamp_opt(ts_min, 0).single().unwrap(),
            ts_max: chrono::Utc.timestamp_opt(ts_max, 0).single().unwrap(),
            row_count: 1,
            file_size: None,
            coverage_path: None,
        }
    }

    /// Builds a `TableState` whose segment map holds exactly `segs`.
    fn state_with_segments(segs: Vec<SegmentMeta>) -> TableState {
        let segments = segs
            .into_iter()
            .map(|seg| (seg.segment_id.clone(), seg))
            .collect();
        TableState {
            version: 3,
            table_meta: sample_table_meta(),
            segments,
            table_coverage: None,
        }
    }

    /// Projects `segments_sorted_by_time` onto `(ts_min, ts_max, id)` tuples.
    fn time_order(state: &TableState) -> Vec<(i64, i64, String)> {
        state
            .segments_sorted_by_time()
            .into_iter()
            .map(|seg| {
                (
                    seg.ts_min.timestamp(),
                    seg.ts_max.timestamp(),
                    seg.segment_id.0.clone(),
                )
            })
            .collect()
    }

    #[test]
    fn segments_sorted_by_time_orders_hashmap_deterministically() {
        let make_segments = || {
            vec![
                segment_with_ts("c", 10, 30),
                segment_with_ts("a", 10, 20),
                segment_with_ts("d", 5, 7),
                segment_with_ts("b", 10, 20),
            ]
        };

        let expected = vec![
            (5, 7, "d".to_string()),
            (10, 20, "a".to_string()),
            (10, 20, "b".to_string()),
            (10, 30, "c".to_string()),
        ];

        assert_eq!(time_order(&state_with_segments(make_segments())), expected);

        // A second map (own hash seed, reversed insertion order) must agree.
        let mut reversed = make_segments();
        reversed.reverse();
        assert_eq!(time_order(&state_with_segments(reversed)), expected);
    }

    #[tokio::test]
    async fn rebuild_table_state_happy_path() -> TestResult {
        let (_tmp, store) = create_test_log_store();
        let meta = sample_table_meta();
        let seg1 = sample_segment("seg1");
        let seg2 = sample_segment("seg2");

        let v1 = store
            .commit_with_expected_version(0, vec![LogAction::UpdateTableMeta(meta.clone())])
            .await?;
        let v2 = store
            .commit_with_expected_version(
                v1,
                vec![
                    LogAction::AddSegment(seg1.clone()),
                    LogAction::AddSegment(seg2.clone()),
                ],
            )
            .await?;
        let v3 = store
            .commit_with_expected_version(
                v2,
                vec![LogAction::RemoveSegment {
                    segment_id: seg1.segment_id.clone(),
                }],
            )
            .await?;

        let state = store.rebuild_table_state().await?;
        assert_eq!(state.version, v3);
        assert_eq!(state.table_meta, meta);
        assert_eq!(state.segments.len(), 1);
        assert_eq!(state.segments.get(&seg2.segment_id), Some(&seg2));
        assert!(!state.segments.contains_key(&seg1.segment_id));
        assert!(state.table_coverage.is_none());
        Ok(())
    }

    #[tokio::test]
    async fn rebuild_table_state_keeps_latest_table_coverage_pointer() -> TestResult {
        let (_tmp, store) = create_test_log_store();

        let v1 = store
            .commit_with_expected_version(0, vec![LogAction::UpdateTableMeta(sample_table_meta())])
            .await?;
        let v2 = store
            .commit_with_expected_version(
                v1,
                vec![LogAction::UpdateTableCoverage {
                    bucket_spec: TimeBucket::Minutes(1),
                    coverage_path: "coverage/first.roar".to_string(),
                }],
            )
            .await?;
        let v3 = store
            .commit_with_expected_version(
                v2,
                vec![LogAction::UpdateTableCoverage {
                    bucket_spec: TimeBucket::Minutes(1),
                    coverage_path: "coverage/second.roar".to_string(),
                }],
            )
            .await?;
        // A later commit without coverage must not disturb the pointer.
        let v4 = store
            .commit_with_expected_version(v3, vec![LogAction::AddSegment(sample_segment("seg"))])
            .await?;

        let state = store.rebuild_table_state().await?;
        assert_eq!(state.version, v4);
        assert_eq!(
            state.table_coverage,
            Some(TableCoveragePointer {
                bucket_spec: TimeBucket::Minutes(1),
                coverage_path: "coverage/second.roar".to_string(),
                version: v3,
            })
        );
        Ok(())
    }

    #[tokio::test]
    async fn rebuild_table_state_errors_when_current_zero() {
        let (_tmp, store) = create_test_log_store();

        let err = store
            .rebuild_table_state()
            .await
            .expect_err("expected error");
        assert!(matches!(err, CommitError::CorruptState { .. }));
    }

    #[tokio::test]
    async fn rebuild_table_state_errors_when_no_table_meta() -> TestResult {
        let (_tmp, store) = create_test_log_store();
        let seg = sample_segment("seg");

        store
            .commit_with_expected_version(0, vec![LogAction::AddSegment(seg.clone())])
            .await?;

        let err = store
            .rebuild_table_state()
            .await
            .expect_err("expected error");
        assert!(matches!(err, CommitError::CorruptState { .. }));
        Ok(())
    }

    #[tokio::test]
    async fn rebuild_table_state_fails_on_commit_version_mismatch() -> TestResult {
        let (tmp, store) = create_test_log_store();

        let v1 = store
            .commit_with_expected_version(0, vec![LogAction::UpdateTableMeta(sample_table_meta())])
            .await?;
        store
            .commit_with_expected_version(v1, vec![LogAction::AddSegment(sample_segment("seg"))])
            .await?;

        // Put commit 2's (valid) payload at commit 1's path, so the file loaded
        // as version 1 claims to be version 2.
        let commit_1 = tmp.path().join(layout::commit_rel_path(1));
        let commit_2 = tmp.path().join(layout::commit_rel_path(2));
        tokio::fs::copy(&commit_2, &commit_1).await?;

        let err = store
            .rebuild_table_state()
            .await
            .expect_err("expected error");
        assert!(matches!(err, CommitError::CorruptState { .. }));
        Ok(())
    }

    #[tokio::test]
    async fn rebuild_table_state_fails_on_corrupt_commit_payload() -> TestResult {
        let (tmp, store) = create_test_log_store();
        let meta = sample_table_meta();

        store
            .commit_with_expected_version(0, vec![LogAction::UpdateTableMeta(meta)])
            .await?;

        let commit_path = tmp.path().join(layout::commit_rel_path(1));
        tokio::fs::write(&commit_path, b"not-json").await?;

        let err = store
            .rebuild_table_state()
            .await
            .expect_err("expected error");
        assert!(matches!(err, CommitError::CorruptState { .. }));
        Ok(())
    }

    #[tokio::test]
    async fn rebuild_table_state_fails_when_commit_missing() -> TestResult {
        let (tmp, store) = create_test_log_store();
        let meta = sample_table_meta();

        store
            .commit_with_expected_version(0, vec![LogAction::UpdateTableMeta(meta)])
            .await?;

        let commit_path = tmp.path().join(layout::commit_rel_path(1));
        tokio::fs::remove_file(&commit_path).await?;

        let err = store
            .rebuild_table_state()
            .await
            .expect_err("expected error");
        match err {
            CommitError::Storage { source } => match source {
                StorageError::NotFound { .. } => {}
                other => panic!("unexpected storage error: {other:?}"),
            },
            other => panic!("expected storage error, got {other:?}"),
        }
        Ok(())
    }
}