1use std::path::Path;
13
14use log::warn;
15
16use crate::{
17 coverage::Coverage,
18 coverage::io::{read_coverage_sidecar, write_coverage_sidecar_atomic},
19 transaction_log::table_state::TableCoveragePointer,
20};
21
22use super::{TimeSeriesTable, error::TableError};
23
24impl TimeSeriesTable {
25 pub(crate) async fn recover_table_coverage_from_segments(
30 &self,
31 ) -> Result<Coverage, TableError> {
32 let mut acc = Coverage::empty();
33
34 for seg in self.state().segments.values() {
35 let path = seg.coverage_path.as_ref().ok_or_else(|| {
36 TableError::ExistingSegmentMissingCoverage {
37 segment_id: seg.segment_id.clone(),
38 }
39 })?;
40
41 let cov = read_coverage_sidecar(self.location(), Path::new(path))
42 .await
43 .map_err(|source| TableError::SegmentCoverageSidecarRead {
44 segment_id: seg.segment_id.clone(),
45 coverage_path: path.clone(),
46 source: Box::new(source),
47 })?;
48
49 acc.union_inplace(&cov);
51 }
52
53 Ok(acc)
54 }
55
56 fn ensure_table_coverage_bucket_matches(
57 &self,
58 ptr: &TableCoveragePointer,
59 ) -> Result<(), TableError> {
60 let expected = self.index_spec().bucket.clone();
61 if ptr.bucket_spec != expected {
62 return Err(TableError::TableCoverageBucketMismatch {
63 expected,
64 actual: ptr.bucket_spec.clone(),
65 pointer_version: ptr.version,
66 });
67 }
68 Ok(())
69 }
70
71 pub async fn load_table_coverage_snapshot_only(&self) -> Result<Coverage, TableError> {
78 match &self.state().table_coverage {
79 None => {
80 if self.state().segments.is_empty() {
81 return Ok(Coverage::empty());
82 }
83 Err(TableError::MissingTableCoveragePointer)
84 }
85 Some(ptr) => {
86 self.ensure_table_coverage_bucket_matches(ptr)?;
87 read_coverage_sidecar(self.location(), Path::new(&ptr.coverage_path))
88 .await
89 .map_err(|source| TableError::CoverageSidecar { source })
90 }
91 }
92 }
93
94 pub(crate) async fn load_table_snapshot_coverage_readonly(
102 &self,
103 ) -> Result<Coverage, TableError> {
104 match &self.state().table_coverage {
105 None => {
106 if self.state().segments.is_empty() {
107 return Ok(Coverage::empty());
108 }
109 self.recover_table_coverage_from_segments().await
110 }
111 Some(ptr) => {
112 self.ensure_table_coverage_bucket_matches(ptr)?;
113
114 match read_coverage_sidecar(self.location(), Path::new(&ptr.coverage_path)).await {
115 Ok(cov) => Ok(cov),
116 Err(snapshot_err) => {
117 warn!(
118 "Failed to read table coverage snapshot at {} (version {}): {:?}. \
119 Attempting recovery from segment sidecars (readonly).",
120 ptr.coverage_path, ptr.version, snapshot_err
121 );
122 self.recover_table_coverage_from_segments().await
123 }
124 }
125 }
126 }
127 }
128
129 pub(crate) async fn load_table_snapshot_coverage_with_heal(
137 &self,
138 ) -> Result<Coverage, TableError> {
139 match &self.state().table_coverage {
140 None => {
141 if self.state().segments.is_empty() {
144 return Ok(Coverage::empty());
145 }
146
147 self.recover_table_coverage_from_segments().await
149 }
150
151 Some(ptr) => {
152 self.ensure_table_coverage_bucket_matches(ptr)?;
153
154 match read_coverage_sidecar(self.location(), Path::new(&ptr.coverage_path)).await {
155 Ok(cov) => Ok(cov),
156
157 Err(snapshot_err) => {
158 warn!(
159 "Failed to read table coverage snapshot at {} (version {}): {snapshot_err:?}. \
160 Attempting recovery from segment sidecars.",
161 ptr.coverage_path, ptr.version
162 );
163
164 let recovered = self.recover_table_coverage_from_segments().await?;
166
167 let _ = write_coverage_sidecar_atomic(
169 self.location(),
170 Path::new(&ptr.coverage_path),
171 &recovered,
172 )
173 .await;
174
175 Ok(recovered)
176 }
177 }
178 }
179 }
180 }
181}
182use std::ops::RangeInclusive;
189
190use chrono::{DateTime, Duration, Utc};
191use roaring::RoaringBitmap;
192use snafu::ensure;
193
194use crate::{
195 coverage::Bucket,
196 coverage::bucket::{bucket_id, bucket_range},
197 table::error::{BucketDomainOverflowSnafu, InvalidRangeSnafu},
198};
199
200impl TimeSeriesTable {
201 fn expected_bitmap_for_bucket_range_checked(
202 &self,
203 first: u64,
204 last: u64,
205 ) -> Result<RoaringBitmap, TableError> {
206 if first > u32::MAX as u64 {
207 return Err(TableError::BucketDomainOverflow {
208 last_bucket_id: first,
209 max: u32::MAX,
210 });
211 }
212 if last > u32::MAX as u64 {
213 return Err(TableError::BucketDomainOverflow {
214 last_bucket_id: last,
215 max: u32::MAX,
216 });
217 }
218 Ok(RoaringBitmap::from_iter(
219 (first..=last).map(|b| b as Bucket),
220 ))
221 }
222
223 fn expected_bitmap_for_time_range_checked(
231 &self,
232 start: DateTime<Utc>,
233 end: DateTime<Utc>,
234 ) -> Result<RoaringBitmap, TableError> {
235 ensure!(start < end, InvalidRangeSnafu { start, end });
236
237 let range = bucket_range(&self.index_spec().bucket, start, end);
238 let first = *range.start();
239 let last = *range.end();
240
241 self.expected_bitmap_for_bucket_range_checked(first, last)
242 }
243
244 fn end_bucket_for_half_open_end(&self, ts_end: DateTime<Utc>) -> Result<u64, TableError> {
245 let end_adj = ts_end.checked_sub_signed(Duration::nanoseconds(1)).ok_or(
249 TableError::InvalidRange {
250 start: ts_end,
251 end: ts_end,
252 },
253 )?;
254 Ok(bucket_id(&self.index_spec().bucket, end_adj))
255 }
256
257 pub async fn coverage_ratio_for_range(
280 &self,
281 start: DateTime<Utc>,
282 end: DateTime<Utc>,
283 ) -> Result<f64, TableError> {
284 let expected = self.expected_bitmap_for_time_range_checked(start, end)?;
285 let cov = self.load_table_snapshot_coverage_readonly().await?;
286 Ok(cov.coverage_ratio(&expected))
287 }
288
289 pub async fn max_gap_len_for_range(
308 &self,
309 start: DateTime<Utc>,
310 end: DateTime<Utc>,
311 ) -> Result<u64, TableError> {
312 let expected = self.expected_bitmap_for_time_range_checked(start, end)?;
313 let cov = self.load_table_snapshot_coverage_readonly().await?;
314 Ok(cov.max_gap_len(&expected))
315 }
316
317 pub async fn last_fully_covered_window(
339 &self,
340 ts_end: DateTime<Utc>,
341 window_len_buckets: u64,
342 ) -> Result<Option<RangeInclusive<Bucket>>, TableError> {
343 if window_len_buckets == 0 {
344 return Ok(None);
345 }
346
347 let cov = self.load_table_snapshot_coverage_readonly().await?;
348 let end_bucket_u64 = self.end_bucket_for_half_open_end(ts_end)?;
349
350 ensure!(
351 end_bucket_u64 <= u32::MAX as u64,
352 BucketDomainOverflowSnafu {
353 last_bucket_id: end_bucket_u64,
354 max: u32::MAX,
355 }
356 );
357
358 let end_bucket = end_bucket_u64 as Bucket;
359 Ok(cov.last_window_at_or_before(end_bucket, window_len_buckets))
360 }
361}
362
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        metadata::table_metadata::TimeBucket,
        storage::TableLocation,
        table::test_util::{
            TestResult, TestRow, make_basic_table_meta, utc_datetime, write_test_parquet,
        },
    };
    use chrono::TimeZone;
    use tempfile::TempDir;

    type HelperResult<T> = Result<T, Box<dyn std::error::Error>>;

    /// Builds a UTC timestamp from whole seconds since the Unix epoch.
    fn ts_from_secs(secs: i64) -> DateTime<Utc> {
        Utc.timestamp_opt(secs, 0)
            .single()
            .expect("valid timestamp")
    }

    /// Creates an empty table rooted in a fresh temp directory. The `TempDir`
    /// is returned so the directory outlives the table.
    async fn make_table() -> HelperResult<(TempDir, TimeSeriesTable)> {
        let tmp = TempDir::new()?;
        let location = TableLocation::local(tmp.path());
        let table = TimeSeriesTable::create(location, make_basic_table_meta()).await?;
        Ok((tmp, table))
    }

    /// Writes `rows` to `rel_path` under the table root and appends it as a
    /// segment keyed on the `ts` column.
    async fn append_segment(
        table: &mut TimeSeriesTable,
        tmp: &TempDir,
        rel_path: &str,
        rows: &[TestRow],
    ) -> HelperResult<()> {
        let abs = tmp.path().join(rel_path);
        write_test_parquet(&abs, true, false, rows)?;
        table.append_parquet_segment(rel_path, "ts").await?;
        Ok(())
    }

    /// Table with rows at 1s, 61s and 180s: three of the four buckets in
    /// `[0s, 240s)` are covered (per the 0.75 ratio asserted below).
    async fn table_with_sparse_coverage() -> HelperResult<(TempDir, TimeSeriesTable)> {
        let (tmp, mut table) = make_table().await?;
        append_segment(
            &mut table,
            &tmp,
            "data/sparse.parquet",
            &[
                TestRow {
                    ts_millis: 1_000,
                    symbol: "A",
                    price: 1.0,
                },
                TestRow {
                    ts_millis: 61_000,
                    symbol: "A",
                    price: 2.0,
                },
                TestRow {
                    ts_millis: 180_000,
                    symbol: "A",
                    price: 3.0,
                },
            ],
        )
        .await?;
        Ok((tmp, table))
    }

    /// Table with rows at 240s and 300s, forming a two-bucket contiguous run.
    async fn table_with_contiguous_run() -> HelperResult<(TempDir, TimeSeriesTable)> {
        let (tmp, mut table) = make_table().await?;
        append_segment(
            &mut table,
            &tmp,
            "data/window.parquet",
            &[
                TestRow {
                    ts_millis: 240_000,
                    symbol: "A",
                    price: 1.0,
                },
                TestRow {
                    ts_millis: 300_000,
                    symbol: "A",
                    price: 2.0,
                },
            ],
        )
        .await?;
        Ok((tmp, table))
    }

    #[tokio::test]
    async fn expected_bitmap_rejects_invalid_range() -> TestResult {
        let (_tmp, table) = make_table().await?;
        let ts = utc_datetime(2024, 1, 1, 0, 0, 0);

        let err = table
            .expected_bitmap_for_time_range_checked(ts, ts)
            .expect_err("start >= end should be invalid");
        assert!(matches!(err, TableError::InvalidRange { .. }));
        Ok(())
    }

    #[tokio::test]
    async fn expected_bitmap_errors_on_bucket_overflow() -> TestResult {
        let (_tmp, table) = make_table().await?;
        let start = ts_from_secs(0);
        let end = ts_from_secs(((u32::MAX as i64) + 2) * 60);

        let err = table
            .expected_bitmap_for_time_range_checked(start, end)
            .expect_err("bucket domain overflow should error");

        match err {
            TableError::BucketDomainOverflow { last_bucket_id, .. } => {
                assert!(last_bucket_id > u32::MAX as u64);
            }
            other => panic!("unexpected error: {other:?}"),
        }
        Ok(())
    }

    #[tokio::test]
    async fn expected_bitmap_errors_on_first_bucket_overflow() -> TestResult {
        let (_tmp, table) = make_table().await?;

        let first = (u32::MAX as u64) + 1;
        let err = table
            .expected_bitmap_for_bucket_range_checked(first, first)
            .expect_err("first bucket overflow should error");

        match err {
            TableError::BucketDomainOverflow { last_bucket_id, .. } => {
                assert_eq!(last_bucket_id, first);
            }
            other => panic!("unexpected error: {other:?}"),
        }
        Ok(())
    }

    #[tokio::test]
    async fn expected_bitmap_covers_inclusive_bucket_range() -> TestResult {
        let (_tmp, table) = make_table().await?;
        let start = ts_from_secs(0);
        let end = ts_from_secs(180);

        let bitmap = table.expected_bitmap_for_time_range_checked(start, end)?;
        let first = bucket_id(&table.index_spec().bucket, start);
        let last = bucket_id(&table.index_spec().bucket, end - Duration::nanoseconds(1));
        assert_eq!(bitmap.len(), (last - first + 1) as u64);
        for b in first..=last {
            assert!(bitmap.contains(b as Bucket));
        }
        Ok(())
    }

    #[tokio::test]
    async fn coverage_ratio_uses_snapshot_when_present() -> TestResult {
        let (_tmp, table) = table_with_sparse_coverage().await?;
        let start = ts_from_secs(0);
        let end = ts_from_secs(240);

        let ratio = table.coverage_ratio_for_range(start, end).await?;
        assert!((ratio - 0.75).abs() < 1e-12);
        Ok(())
    }

    #[tokio::test]
    async fn coverage_ratio_recovers_when_snapshot_missing() -> TestResult {
        let (_tmp, mut table) = table_with_sparse_coverage().await?;
        table.state_mut().table_coverage = None;

        let ratio = table
            .coverage_ratio_for_range(ts_from_secs(0), ts_from_secs(240))
            .await?;
        assert!((ratio - 0.75).abs() < 1e-12);
        Ok(())
    }

    #[tokio::test]
    async fn coverage_ratio_errors_when_recovery_missing_segment_coverage_path() -> TestResult {
        let (_tmp, mut table) = table_with_sparse_coverage().await?;
        table.state_mut().table_coverage = None;
        let seg_id = table
            .state()
            .segments
            .keys()
            .next()
            .cloned()
            .expect("segment present");
        table
            .state_mut()
            .segments
            .get_mut(&seg_id)
            .expect("segment present")
            .coverage_path = None;

        let err = table
            .coverage_ratio_for_range(ts_from_secs(0), ts_from_secs(240))
            .await
            .expect_err("missing segment coverage_path should bubble up");
        assert!(matches!(
            err,
            TableError::ExistingSegmentMissingCoverage { segment_id } if segment_id == seg_id
        ));
        Ok(())
    }

    #[tokio::test]
    async fn coverage_ratio_errors_on_bucket_mismatch() -> TestResult {
        let (_tmp, mut table) = table_with_sparse_coverage().await?;
        let mut ptr = table
            .state()
            .table_coverage
            .clone()
            .expect("snapshot pointer present");
        ptr.bucket_spec = TimeBucket::Hours(1);
        table.state_mut().table_coverage = Some(ptr.clone());

        let err = table
            .coverage_ratio_for_range(ts_from_secs(0), ts_from_secs(240))
            .await
            .expect_err("mismatched bucket spec should error");

        match err {
            TableError::TableCoverageBucketMismatch {
                expected, actual, ..
            } => {
                assert_eq!(expected, table.index_spec().bucket.clone());
                assert_eq!(actual, ptr.bucket_spec);
            }
            other => panic!("unexpected error: {other:?}"),
        }
        Ok(())
    }

    #[tokio::test]
    async fn coverage_ratio_handles_empty_table() -> TestResult {
        let (_tmp, table) = make_table().await?;
        let ratio = table
            .coverage_ratio_for_range(ts_from_secs(0), ts_from_secs(60))
            .await?;
        assert_eq!(ratio, 0.0);
        Ok(())
    }

    #[tokio::test]
    async fn coverage_ratio_errors_when_bucket_domain_overflows() -> TestResult {
        let (_tmp, table) = make_table().await?;
        let start = ts_from_secs(0);
        let end = ts_from_secs(((u32::MAX as i64) + 3) * 60);

        let err = table
            .coverage_ratio_for_range(start, end)
            .await
            .expect_err("overflow should error");
        assert!(matches!(err, TableError::BucketDomainOverflow { .. }));
        Ok(())
    }

    #[tokio::test]
    async fn snapshot_only_errors_when_pointer_missing_with_segments() -> TestResult {
        let (_tmp, mut table) = table_with_sparse_coverage().await?;
        table.state_mut().table_coverage = None;

        // Use `.err()` rather than `expect_err` so `Coverage` need not be `Debug`.
        let err = table.load_table_coverage_snapshot_only().await.err();
        assert!(matches!(
            err,
            Some(TableError::MissingTableCoveragePointer)
        ));
        Ok(())
    }

    #[tokio::test]
    async fn heal_rewrites_missing_snapshot_from_segments() -> TestResult {
        let (tmp, table) = table_with_sparse_coverage().await?;
        let ptr = table
            .state()
            .table_coverage
            .clone()
            .expect("snapshot pointer present");
        std::fs::remove_file(tmp.path().join(&ptr.coverage_path))?;

        // With the snapshot file gone, the strict loader must fail.
        let err = table.load_table_coverage_snapshot_only().await.err();
        assert!(matches!(err, Some(TableError::CoverageSidecar { .. })));

        // The healing loader rebuilds coverage from segment sidecars.
        let expected =
            table.expected_bitmap_for_time_range_checked(ts_from_secs(0), ts_from_secs(240))?;
        let healed = table.load_table_snapshot_coverage_with_heal().await?;
        assert!((healed.coverage_ratio(&expected) - 0.75).abs() < 1e-12);

        // The heal must have rewritten the snapshot, so strict loading works again.
        let reloaded = table.load_table_coverage_snapshot_only().await?;
        assert!((reloaded.coverage_ratio(&expected) - 0.75).abs() < 1e-12);
        Ok(())
    }

    #[tokio::test]
    async fn max_gap_len_reports_missing_run() -> TestResult {
        let (_tmp, table) = table_with_sparse_coverage().await?;
        let gap = table
            .max_gap_len_for_range(ts_from_secs(0), ts_from_secs(240))
            .await?;
        assert_eq!(gap, 1);
        Ok(())
    }

    #[tokio::test]
    async fn last_window_returns_none_for_zero_length() -> TestResult {
        let (_tmp, table) = make_table().await?;
        let res = table.last_fully_covered_window(ts_from_secs(0), 0).await?;
        assert!(res.is_none());
        Ok(())
    }

    #[tokio::test]
    async fn last_window_errors_when_bucket_domain_overflows() -> TestResult {
        let (_tmp, table) = make_table().await?;
        let ts_end = ts_from_secs(((u32::MAX as i64) + 2) * 60);

        let err = table
            .last_fully_covered_window(ts_end, 1)
            .await
            .expect_err("overflow should error");
        assert!(matches!(err, TableError::BucketDomainOverflow { .. }));
        Ok(())
    }

    #[tokio::test]
    async fn last_window_respects_half_open_end_and_run_length() -> TestResult {
        let (_tmp, table) = table_with_contiguous_run().await?;
        let ts_end = ts_from_secs(360);

        let win = table
            .last_fully_covered_window(ts_end, 2)
            .await?
            .expect("window should be present");
        assert_eq!(win, 4u32..=5u32);

        let none = table.last_fully_covered_window(ts_end, 3).await?;
        assert!(none.is_none());
        Ok(())
    }

    #[tokio::test]
    async fn last_window_errors_when_recovery_fails() -> TestResult {
        let (_tmp, mut table) = table_with_contiguous_run().await?;
        table.state_mut().table_coverage = None;
        let seg_id = table
            .state()
            .segments
            .keys()
            .next()
            .cloned()
            .expect("segment present");
        table
            .state_mut()
            .segments
            .get_mut(&seg_id)
            .expect("segment present")
            .coverage_path = None;

        let err = table
            .last_fully_covered_window(ts_from_secs(360), 1)
            .await
            .expect_err("missing coverage_path should bubble up");
        assert!(matches!(
            err,
            TableError::ExistingSegmentMissingCoverage { segment_id } if segment_id == seg_id
        ));
        Ok(())
    }
}