// timeseries_table_core/table/coverage.rs
//! Coverage state helpers for `TimeSeriesTable`.
//!
//! This module reads and repairs table coverage bitmaps persisted alongside
//! the table. It is responsible for:
//! - Loading coverage snapshots via the transaction log pointer and enforcing
//!   bucket compatibility.
//! - Falling back to unioning segment coverage sidecars when the snapshot
//!   pointer is missing or unreadable (strict vs recovery modes).
//! - Optionally healing the snapshot file on disk after a successful recovery
//!   without touching the transaction log.

use std::path::Path;

use log::warn;

use crate::{
    coverage::Coverage,
    coverage::io::{read_coverage_sidecar, write_coverage_sidecar_atomic},
    transaction_log::table_state::TableCoveragePointer,
};

use super::{TimeSeriesTable, error::TableError};
23
24impl TimeSeriesTable {
25    /// Rebuild table coverage by reading each segment's coverage sidecar.
26    ///
27    /// This is used as a fallback when the table snapshot coverage is missing or
28    /// unreadable. Requires every segment to have a `coverage_path`.
29    pub(crate) async fn recover_table_coverage_from_segments(
30        &self,
31    ) -> Result<Coverage, TableError> {
32        let mut acc = Coverage::empty();
33
34        for seg in self.state().segments.values() {
35            let path = seg.coverage_path.as_ref().ok_or_else(|| {
36                TableError::ExistingSegmentMissingCoverage {
37                    segment_id: seg.segment_id.clone(),
38                }
39            })?;
40
41            let cov = read_coverage_sidecar(self.location(), Path::new(path))
42                .await
43                .map_err(|source| TableError::SegmentCoverageSidecarRead {
44                    segment_id: seg.segment_id.clone(),
45                    coverage_path: path.clone(),
46                    source: Box::new(source),
47                })?;
48
49            // Prefer an in-place union to avoid repeated allocations.
50            acc.union_inplace(&cov);
51        }
52
53        Ok(acc)
54    }
55
56    fn ensure_table_coverage_bucket_matches(
57        &self,
58        ptr: &TableCoveragePointer,
59    ) -> Result<(), TableError> {
60        let expected = self.index_spec().bucket.clone();
61        if ptr.bucket_spec != expected {
62            return Err(TableError::TableCoverageBucketMismatch {
63                expected,
64                actual: ptr.bucket_spec.clone(),
65                pointer_version: ptr.version,
66            });
67        }
68        Ok(())
69    }
70
71    /// Load table coverage using the snapshot pointer only.
72    ///
73    /// - If there is no snapshot pointer:
74    ///   - If table has zero segments: returns empty coverage.
75    ///   - Else: returns MissingTableCoveragePointer (strict mode).
76    /// - If snapshot exists but is missing/corrupt: returns the snapshot read error.
77    pub async fn load_table_coverage_snapshot_only(&self) -> Result<Coverage, TableError> {
78        match &self.state().table_coverage {
79            None => {
80                if self.state().segments.is_empty() {
81                    return Ok(Coverage::empty());
82                }
83                Err(TableError::MissingTableCoveragePointer)
84            }
85            Some(ptr) => {
86                self.ensure_table_coverage_bucket_matches(ptr)?;
87                read_coverage_sidecar(self.location(), Path::new(&ptr.coverage_path))
88                    .await
89                    .map_err(|source| TableError::CoverageSidecar { source })
90            }
91        }
92    }
93
94    /// Load table coverage for read paths (no writes).
95    ///
96    /// - If snapshot pointer is absent:
97    ///   - If table has zero segments: returns empty coverage.
98    ///   - Else: recovers by unioning segment sidecars.
99    /// - If snapshot pointer exists but snapshot is missing/corrupt:
100    ///   - Recovers by unioning segment sidecars.
101    pub(crate) async fn load_table_snapshot_coverage_readonly(
102        &self,
103    ) -> Result<Coverage, TableError> {
104        match &self.state().table_coverage {
105            None => {
106                if self.state().segments.is_empty() {
107                    return Ok(Coverage::empty());
108                }
109                self.recover_table_coverage_from_segments().await
110            }
111            Some(ptr) => {
112                self.ensure_table_coverage_bucket_matches(ptr)?;
113
114                match read_coverage_sidecar(self.location(), Path::new(&ptr.coverage_path)).await {
115                    Ok(cov) => Ok(cov),
116                    Err(snapshot_err) => {
117                        warn!(
118                            "Failed to read table coverage snapshot at {} (version {}): {:?}. \
119                             Attempting recovery from segment sidecars (readonly).",
120                            ptr.coverage_path, ptr.version, snapshot_err
121                        );
122                        self.recover_table_coverage_from_segments().await
123                    }
124                }
125            }
126        }
127    }
128
129    /// Load table coverage and (optionally) heal the snapshot best-effort.
130    ///
131    /// Same as readonly, but if recovery succeeds and a snapshot pointer exists,
132    /// attempts to overwrite the snapshot file with the recovered bitmap.
133    ///
134    /// IMPORTANT: This does NOT update the transaction log; it only rewrites the
135    /// referenced snapshot path best-effort.
136    pub(crate) async fn load_table_snapshot_coverage_with_heal(
137        &self,
138    ) -> Result<Coverage, TableError> {
139        match &self.state().table_coverage {
140            None => {
141                // If there are no segments, treat as empty (first append case).
142                // If there are segments, this is suspicious in v0.1: fail.
143                if self.state().segments.is_empty() {
144                    return Ok(Coverage::empty());
145                }
146
147                // No snapshot pointer, but segments exist -> recover from segments.
148                self.recover_table_coverage_from_segments().await
149            }
150
151            Some(ptr) => {
152                self.ensure_table_coverage_bucket_matches(ptr)?;
153
154                match read_coverage_sidecar(self.location(), Path::new(&ptr.coverage_path)).await {
155                    Ok(cov) => Ok(cov),
156
157                    Err(snapshot_err) => {
158                        warn!(
159                            "Failed to read table coverage snapshot at {} (version {}): {snapshot_err:?}. \
160                         Attempting recovery from segment sidecars.",
161                            ptr.coverage_path, ptr.version
162                        );
163
164                        // Try recovery from segments.
165                        let recovered = self.recover_table_coverage_from_segments().await?;
166
167                        // Optional: heal snapshot best-effort (do not fail open if this fails)
168                        let _ = write_coverage_sidecar_atomic(
169                            self.location(),
170                            Path::new(&ptr.coverage_path),
171                            &recovered,
172                        )
173                        .await;
174
175                        Ok(recovered)
176                    }
177                }
178            }
179        }
180    }
181}
// Coverage query APIs for TimeSeriesTable.
//
// These APIs:
// - derive an "expected" bucket domain from a timestamp range (half-open [start, end))
// - load table coverage (readonly recovery)
// - reuse crate::coverage APIs (coverage_ratio, max_gap_len, last_window_at_or_before)
use std::ops::RangeInclusive;

use chrono::{DateTime, Duration, Utc};
use roaring::RoaringBitmap;
use snafu::ensure;

use crate::{
    coverage::Bucket,
    coverage::bucket::{bucket_id, bucket_range},
    table::error::{BucketDomainOverflowSnafu, InvalidRangeSnafu},
};
200impl TimeSeriesTable {
201    fn expected_bitmap_for_bucket_range_checked(
202        &self,
203        first: u64,
204        last: u64,
205    ) -> Result<RoaringBitmap, TableError> {
206        if first > u32::MAX as u64 {
207            return Err(TableError::BucketDomainOverflow {
208                last_bucket_id: first,
209                max: u32::MAX,
210            });
211        }
212        if last > u32::MAX as u64 {
213            return Err(TableError::BucketDomainOverflow {
214                last_bucket_id: last,
215                max: u32::MAX,
216            });
217        }
218        Ok(RoaringBitmap::from_iter(
219            (first..=last).map(|b| b as Bucket),
220        ))
221    }
222
223    /// Build an "expected" bitmap for `[start, end)` with validation.
224    ///
225    /// - Uses [`bucket_range`] to compute the inclusive set of bucket ids
226    ///   intersecting the half-open interval `[start, end)`.
227    /// - Returns [`TableError::InvalidRange`] if `start >= end`.
228    /// - Returns [`TableError::BucketDomainOverflow`] if any bucket id
229    ///   would exceed `u32::MAX` (our Roaring bitmap domain in v0.1).
230    fn expected_bitmap_for_time_range_checked(
231        &self,
232        start: DateTime<Utc>,
233        end: DateTime<Utc>,
234    ) -> Result<RoaringBitmap, TableError> {
235        ensure!(start < end, InvalidRangeSnafu { start, end });
236
237        let range = bucket_range(&self.index_spec().bucket, start, end);
238        let first = *range.start();
239        let last = *range.end();
240
241        self.expected_bitmap_for_bucket_range_checked(first, last)
242    }
243
244    fn end_bucket_for_half_open_end(&self, ts_end: DateTime<Utc>) -> Result<u64, TableError> {
245        // For half-open semantics [.., ts_end), subtract 1ns so we pick the
246        // last bucket that still intersects the interval.
247
248        let end_adj = ts_end.checked_sub_signed(Duration::nanoseconds(1)).ok_or(
249            TableError::InvalidRange {
250                start: ts_end,
251                end: ts_end,
252            },
253        )?;
254        Ok(bucket_id(&self.index_spec().bucket, end_adj))
255    }
256
257    // ---- public query APIs ----
258
259    /// Coverage ratio in [0.0, 1.0] for the half-open time range [start, end).
260    ///
261    /// Uses the table-level coverage snapshot (with readonly recovery from segments if needed).
262    ///
263    /// # Errors
264    /// - [`TableError::InvalidRange`] if `start >= end`.
265    /// - [`TableError::BucketDomainOverflow`] if the derived bucket ids exceed `u32::MAX`.
266    ///
267    /// # Examples
268    /// ```
269    /// use chrono::{TimeZone, Utc};
270    /// # use timeseries_table_core::{storage::TableLocation, table::TimeSeriesTable};
271    /// # async fn demo(table: &TimeSeriesTable) -> Result<(), timeseries_table_core::table::TableError> {
272    /// let start = Utc.timestamp_opt(0, 0).single().unwrap();
273    /// let end = Utc.timestamp_opt(120, 0).single().unwrap();
274    /// let ratio = table.coverage_ratio_for_range(start, end).await?;
275    /// # let _ = ratio;
276    /// # Ok(())
277    /// # }
278    /// ```
279    pub async fn coverage_ratio_for_range(
280        &self,
281        start: DateTime<Utc>,
282        end: DateTime<Utc>,
283    ) -> Result<f64, TableError> {
284        let expected = self.expected_bitmap_for_time_range_checked(start, end)?;
285        let cov = self.load_table_snapshot_coverage_readonly().await?;
286        Ok(cov.coverage_ratio(&expected))
287    }
288
289    /// Maximum contiguous missing run length (in buckets) for the half-open time range [start, end).
290    ///
291    /// # Errors
292    /// - [`TableError::InvalidRange`] if `start >= end`.
293    /// - [`TableError::BucketDomainOverflow`] if the derived bucket ids exceed `u32::MAX`.
294    ///
295    /// # Examples
296    /// ```
297    /// use chrono::{TimeZone, Utc};
298    /// # use timeseries_table_core::{storage::TableLocation, table::TimeSeriesTable};
299    /// # async fn demo(table: &TimeSeriesTable) -> Result<(), timeseries_table_core::table::TableError> {
300    /// let start = Utc.timestamp_opt(0, 0).single().unwrap();
301    /// let end = Utc.timestamp_opt(180, 0).single().unwrap();
302    /// let gap = table.max_gap_len_for_range(start, end).await?;
303    /// # let _ = gap;
304    /// # Ok(())
305    /// # }
306    /// ```
307    pub async fn max_gap_len_for_range(
308        &self,
309        start: DateTime<Utc>,
310        end: DateTime<Utc>,
311    ) -> Result<u64, TableError> {
312        let expected = self.expected_bitmap_for_time_range_checked(start, end)?;
313        let cov = self.load_table_snapshot_coverage_readonly().await?;
314        Ok(cov.max_gap_len(&expected))
315    }
316
317    /// Return the last fully covered contiguous window (in bucket space) of length >= window_len_buckets,
318    /// ending at or before ts_end.
319    ///
320    /// Notes:
321    /// - This returns a bucket-id RangeInclusive in the v0.1 bucket domain (u32).
322    /// - Returns `None` when `window_len_buckets == 0` or when no fully covered window is found.
323    ///
324    /// # Errors
325    /// - [`TableError::BucketDomainOverflow`] if `ts_end` maps beyond the u32 bucket domain.
326    ///
327    /// # Examples
328    /// ```
329    /// use chrono::{TimeZone, Utc};
330    /// # use timeseries_table_core::{storage::TableLocation, table::TimeSeriesTable};
331    /// # async fn demo(table: &TimeSeriesTable) -> Result<(), timeseries_table_core::table::TableError> {
332    /// let ts_end = Utc.timestamp_opt(360, 0).single().unwrap(); // end of bucket 5
333    /// let window = table.last_fully_covered_window(ts_end, 2).await?;
334    /// # let _ = window;
335    /// # Ok(())
336    /// # }
337    /// ```
338    pub async fn last_fully_covered_window(
339        &self,
340        ts_end: DateTime<Utc>,
341        window_len_buckets: u64,
342    ) -> Result<Option<RangeInclusive<Bucket>>, TableError> {
343        if window_len_buckets == 0 {
344            return Ok(None);
345        }
346
347        let cov = self.load_table_snapshot_coverage_readonly().await?;
348        let end_bucket_u64 = self.end_bucket_for_half_open_end(ts_end)?;
349
350        ensure!(
351            end_bucket_u64 <= u32::MAX as u64,
352            BucketDomainOverflowSnafu {
353                last_bucket_id: end_bucket_u64,
354                max: u32::MAX,
355            }
356        );
357
358        let end_bucket = end_bucket_u64 as Bucket;
359        Ok(cov.last_window_at_or_before(end_bucket, window_len_buckets))
360    }
361}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        metadata::table_metadata::TimeBucket,
        storage::TableLocation,
        table::test_util::{
            TestResult, TestRow, make_basic_table_meta, utc_datetime, write_test_parquet,
        },
    };
    use chrono::TimeZone;
    use tempfile::TempDir;

    type HelperResult<T> = Result<T, Box<dyn std::error::Error>>;

    // Convenience: build a UTC timestamp from whole seconds since the epoch.
    fn ts_from_secs(secs: i64) -> DateTime<Utc> {
        Utc.timestamp_opt(secs, 0)
            .single()
            .expect("valid timestamp")
    }

    // Create an empty table inside a fresh temp dir. The TempDir is returned
    // so it stays alive for the duration of each test.
    async fn make_table() -> HelperResult<(TempDir, TimeSeriesTable)> {
        let tmp = TempDir::new()?;
        let location = TableLocation::local(tmp.path());
        let table = TimeSeriesTable::create(location, make_basic_table_meta()).await?;
        Ok((tmp, table))
    }

    // Write a parquet fixture at `rel_path` under the temp dir and append it
    // to the table as a segment.
    async fn append_segment(
        table: &mut TimeSeriesTable,
        tmp: &TempDir,
        rel_path: &str,
        rows: &[TestRow],
    ) -> HelperResult<()> {
        let abs = tmp.path().join(rel_path);
        write_test_parquet(&abs, true, false, rows)?;
        table.append_parquet_segment(rel_path, "ts").await?;
        Ok(())
    }

    async fn table_with_sparse_coverage() -> HelperResult<(TempDir, TimeSeriesTable)> {
        // Buckets covered: 0, 1, 3 (gap at 2).
        let (tmp, mut table) = make_table().await?;
        append_segment(
            &mut table,
            &tmp,
            "data/sparse.parquet",
            &[
                TestRow {
                    ts_millis: 1_000,
                    symbol: "A",
                    price: 1.0,
                },
                TestRow {
                    ts_millis: 61_000,
                    symbol: "A",
                    price: 2.0,
                },
                TestRow {
                    ts_millis: 180_000,
                    symbol: "A",
                    price: 3.0,
                },
            ],
        )
        .await?;
        Ok((tmp, table))
    }

    async fn table_with_contiguous_run() -> HelperResult<(TempDir, TimeSeriesTable)> {
        // Buckets covered: 4 and 5 (contiguous run).
        let (tmp, mut table) = make_table().await?;
        append_segment(
            &mut table,
            &tmp,
            "data/window.parquet",
            &[
                TestRow {
                    ts_millis: 240_000,
                    symbol: "A",
                    price: 1.0,
                },
                TestRow {
                    ts_millis: 300_000,
                    symbol: "A",
                    price: 2.0,
                },
            ],
        )
        .await?;
        Ok((tmp, table))
    }

    #[tokio::test]
    async fn expected_bitmap_rejects_invalid_range() -> TestResult {
        let (_tmp, table) = make_table().await?;
        let ts = utc_datetime(2024, 1, 1, 0, 0, 0);

        // start == end violates the half-open [start, end) contract.
        let err = table
            .expected_bitmap_for_time_range_checked(ts, ts)
            .expect_err("start >= end should be invalid");
        assert!(matches!(err, TableError::InvalidRange { .. }));
        Ok(())
    }

    #[tokio::test]
    async fn expected_bitmap_errors_on_bucket_overflow() -> TestResult {
        let (_tmp, table) = make_table().await?;
        let start = ts_from_secs(0);
        // Choose an end far enough in the future that the bucket id exceeds u32::MAX.
        let end = ts_from_secs(((u32::MAX as i64) + 2) * 60);

        let err = table
            .expected_bitmap_for_time_range_checked(start, end)
            .expect_err("bucket domain overflow should error");

        match err {
            TableError::BucketDomainOverflow { last_bucket_id, .. } => {
                assert!(last_bucket_id > u32::MAX as u64);
            }
            other => panic!("unexpected error: {other:?}"),
        }
        Ok(())
    }

    #[tokio::test]
    async fn expected_bitmap_errors_on_first_bucket_overflow() -> TestResult {
        let (_tmp, table) = make_table().await?;

        // Overflow detection must also fire when only the *first* id overflows.
        let first = (u32::MAX as u64) + 1;
        let err = table
            .expected_bitmap_for_bucket_range_checked(first, first)
            .expect_err("first bucket overflow should error");

        match err {
            TableError::BucketDomainOverflow { last_bucket_id, .. } => {
                assert_eq!(last_bucket_id, first);
            }
            other => panic!("unexpected error: {other:?}"),
        }
        Ok(())
    }

    #[tokio::test]
    async fn expected_bitmap_covers_inclusive_bucket_range() -> TestResult {
        let (_tmp, table) = make_table().await?;
        let start = ts_from_secs(0);
        let end = ts_from_secs(180); // covers buckets 0,1,2 with 1-minute bucket spec

        let bitmap = table.expected_bitmap_for_time_range_checked(start, end)?;
        let first = bucket_id(&table.index_spec().bucket, start);
        // end is exclusive, so back off 1ns to find the last included bucket.
        let last = bucket_id(&table.index_spec().bucket, end - Duration::nanoseconds(1));
        assert_eq!(bitmap.len(), (last - first + 1) as u64);
        for b in first..=last {
            assert!(bitmap.contains(b as Bucket));
        }
        Ok(())
    }

    #[tokio::test]
    async fn coverage_ratio_uses_snapshot_when_present() -> TestResult {
        let (_tmp, table) = table_with_sparse_coverage().await?;
        let start = ts_from_secs(0);
        let end = ts_from_secs(240); // buckets 0,1,2,3 expected

        // 3 of the 4 expected buckets are covered.
        let ratio = table.coverage_ratio_for_range(start, end).await?;
        assert!((ratio - 0.75).abs() < 1e-12);
        Ok(())
    }

    #[tokio::test]
    async fn coverage_ratio_recovers_when_snapshot_missing() -> TestResult {
        let (_tmp, mut table) = table_with_sparse_coverage().await?;
        // Drop the snapshot pointer to force recovery from segment sidecars.
        table.state_mut().table_coverage = None;

        let ratio = table
            .coverage_ratio_for_range(ts_from_secs(0), ts_from_secs(240))
            .await?;
        assert!((ratio - 0.75).abs() < 1e-12);
        Ok(())
    }

    #[tokio::test]
    async fn coverage_ratio_errors_when_recovery_missing_segment_coverage_path() -> TestResult {
        let (_tmp, mut table) = table_with_sparse_coverage().await?;
        // Force recovery, then sabotage it by removing a segment's sidecar path.
        table.state_mut().table_coverage = None;
        let seg_id = table
            .state()
            .segments
            .keys()
            .next()
            .cloned()
            .expect("segment present");
        table
            .state_mut()
            .segments
            .get_mut(&seg_id)
            .expect("segment present")
            .coverage_path = None;

        let err = table
            .coverage_ratio_for_range(ts_from_secs(0), ts_from_secs(240))
            .await
            .expect_err("missing segment coverage_path should bubble up");
        assert!(matches!(
            err,
            TableError::ExistingSegmentMissingCoverage { segment_id } if segment_id == seg_id
        ));
        Ok(())
    }

    #[tokio::test]
    async fn coverage_ratio_errors_on_bucket_mismatch() -> TestResult {
        let (_tmp, mut table) = table_with_sparse_coverage().await?;
        // Rewrite the pointer with a bucket spec that disagrees with the table.
        let mut ptr = table
            .state()
            .table_coverage
            .clone()
            .expect("snapshot pointer present");
        ptr.bucket_spec = TimeBucket::Hours(1);
        table.state_mut().table_coverage = Some(ptr.clone());

        let err = table
            .coverage_ratio_for_range(ts_from_secs(0), ts_from_secs(240))
            .await
            .expect_err("mismatched bucket spec should error");

        match err {
            TableError::TableCoverageBucketMismatch {
                expected, actual, ..
            } => {
                assert_eq!(expected, table.index_spec().bucket.clone());
                assert_eq!(actual, ptr.bucket_spec);
            }
            other => panic!("unexpected error: {other:?}"),
        }
        Ok(())
    }

    #[tokio::test]
    async fn coverage_ratio_handles_empty_table() -> TestResult {
        // No segments and no pointer: coverage is empty, ratio is 0.
        let (_tmp, table) = make_table().await?;
        let ratio = table
            .coverage_ratio_for_range(ts_from_secs(0), ts_from_secs(60))
            .await?;
        assert_eq!(ratio, 0.0);
        Ok(())
    }

    #[tokio::test]
    async fn coverage_ratio_errors_when_bucket_domain_overflows() -> TestResult {
        let (_tmp, table) = make_table().await?;
        let start = ts_from_secs(0);
        let end = ts_from_secs(((u32::MAX as i64) + 3) * 60);

        let err = table
            .coverage_ratio_for_range(start, end)
            .await
            .expect_err("overflow should error");
        assert!(matches!(err, TableError::BucketDomainOverflow { .. }));
        Ok(())
    }

    #[tokio::test]
    async fn max_gap_len_reports_missing_run() -> TestResult {
        // Sparse fixture has a single missing bucket (bucket 2).
        let (_tmp, table) = table_with_sparse_coverage().await?;
        let gap = table
            .max_gap_len_for_range(ts_from_secs(0), ts_from_secs(240))
            .await?;
        assert_eq!(gap, 1);
        Ok(())
    }

    #[tokio::test]
    async fn last_window_returns_none_for_zero_length() -> TestResult {
        let (_tmp, table) = make_table().await?;
        let res = table.last_fully_covered_window(ts_from_secs(0), 0).await?;
        assert!(res.is_none());
        Ok(())
    }

    #[tokio::test]
    async fn last_window_errors_when_bucket_domain_overflows() -> TestResult {
        let (_tmp, table) = make_table().await?;
        // Pick an end timestamp that maps past the u32 bucket domain to force an error.
        let ts_end = ts_from_secs(((u32::MAX as i64) + 2) * 60);

        let err = table
            .last_fully_covered_window(ts_end, 1)
            .await
            .expect_err("overflow should error");
        assert!(matches!(err, TableError::BucketDomainOverflow { .. }));
        Ok(())
    }

    #[tokio::test]
    async fn last_window_respects_half_open_end_and_run_length() -> TestResult {
        let (_tmp, table) = table_with_contiguous_run().await?;
        let ts_end = ts_from_secs(360); // exactly at the start of bucket 6

        // Run of length 2 (buckets 4..=5) exists; length 3 does not.
        let win = table
            .last_fully_covered_window(ts_end, 2)
            .await?
            .expect("window should be present");
        assert_eq!(win, 4u32..=5u32);

        let none = table.last_fully_covered_window(ts_end, 3).await?;
        assert!(none.is_none());
        Ok(())
    }

    #[tokio::test]
    async fn last_window_errors_when_recovery_fails() -> TestResult {
        let (_tmp, mut table) = table_with_contiguous_run().await?;
        // Force recovery, then remove the segment's sidecar path so it fails.
        table.state_mut().table_coverage = None;
        let seg_id = table
            .state()
            .segments
            .keys()
            .next()
            .cloned()
            .expect("segment present");
        table
            .state_mut()
            .segments
            .get_mut(&seg_id)
            .expect("segment present")
            .coverage_path = None;

        let err = table
            .last_fully_covered_window(ts_from_secs(360), 1)
            .await
            .expect_err("missing coverage_path should bubble up");
        assert!(matches!(
            err,
            TableError::ExistingSegmentMissingCoverage { segment_id } if segment_id == seg_id
        ));
        Ok(())
    }
}