Skip to main content

timeseries_table_core/coverage/
io.rs

//! Coverage sidecar file management.
//!
//! This module provides helpers for reading and writing coverage data to
//! sidecar files in the table storage directory. It bridges the coverage
//! module (serialization/deserialization) with the storage layer (disk I/O).
//!
//! # Overview
//!
//! Coverage sidecars are stored alongside table data and segments to track which
//! time buckets have been observed. This module abstracts the I/O details:
//!
//! - Serializes [`Coverage`] instances to bytes using the RoaringBitmap format.
//! - Writes bytes to the table storage with atomic or new-only semantics.
//! - Reads sidecar bytes back from the table storage and deserializes them
//!   into [`Coverage`] instances.
//! - Handles errors from layout validation, serialization, and storage layers.
//!
//! # Atomic vs. New-Only Writes
//!
//! - **Atomic**: Writes using [`write_coverage_sidecar_atomic`] are safe for
//!   overwriting existing sidecars (e.g., updating a table snapshot).
//! - **New-Only**: Writes using [`write_coverage_sidecar_new`] fail if the file
//!   already exists (e.g., creating per-segment coverage for the first time).
22
23use std::path::Path;
24
25use snafu::{ResultExt, Snafu};
26
27use crate::{
28    coverage::layout::CoverageLayoutError,
29    coverage::{
30        Coverage,
31        serde::{CoverageSerdeError, coverage_from_bytes, coverage_to_bytes},
32    },
33    storage::{self, StorageError, TableLocation},
34};
35
36/// Errors that can occur during coverage sidecar operations.
37///
38/// These errors propagate from lower layers: layout validation, serialization,
39/// storage, and file I/O. Callers should inspect the variant to determine
40/// the nature of the failure and how to recover.
41#[derive(Debug, Snafu)]
42pub enum CoverageError {
43    /// Layout validation error (e.g., invalid coverage ID or path).
44    #[snafu(display("{source}"))]
45    Layout {
46        /// The underlying layout error.
47        source: CoverageLayoutError,
48    },
49
50    /// Serialization or deserialization error.
51    #[snafu(display("{source}"))]
52    Serde {
53        /// The underlying serde error.
54        source: CoverageSerdeError,
55    },
56
57    /// Coverage sidecar file was not found at the expected path.
58    #[snafu(display("Coverage sidecar not found: {path}"))]
59    NotFound {
60        /// The path where the sidecar was expected.
61        path: String,
62    },
63
64    /// Storage I/O error (read, write, or metadata operations).
65    #[snafu(display("Storage error while reading/writing coverage sidecar: {source}"))]
66    Storage {
67        /// The underlying storage error.
68        #[snafu(source, backtrace)]
69        source: StorageError,
70    },
71}
72
73/// Write a coverage bitmap to a sidecar file using atomic semantics.
74///
75/// Atomically writes the given [`Coverage`] to a file at `rel_path` within the
76/// table storage. If the file already exists, it will be overwritten. This is
77/// suitable for updating table-level coverage snapshots or refreshing segment
78/// coverage metadata.
79///
80/// # Arguments
81///
82/// * `location` - The table storage location.
83/// * `rel_path` - The relative path within the table root where the sidecar should be written.
84/// * `cov` - The coverage bitmap to serialize and write.
85///
86/// # Returns
87///
88/// Returns `Ok(())` if the sidecar was written successfully, or an error if
89/// serialization or storage fails.
90///
91/// # Errors
92///
93/// Returns [`CoverageError`] if:
94/// - Serialization of the coverage fails ([`CoverageError::Serde`]).
95/// - Storage I/O fails ([`CoverageError::Storage`]).
96pub async fn write_coverage_sidecar_atomic(
97    location: &TableLocation,
98    rel_path: &Path,
99    cov: &Coverage,
100) -> Result<(), CoverageError> {
101    let bytes = coverage_to_bytes(cov).context(SerdeSnafu)?;
102    storage::write_atomic(location.as_ref(), rel_path, &bytes)
103        .await
104        .context(StorageSnafu)?;
105    Ok(())
106}
107
108/// Write a coverage bitmap to a sidecar file with exclusive creation.
109///
110/// Writes the given [`Coverage`] to a file at `rel_path` within the table storage,
111/// but only if the file does not already exist. This is suitable for creating
112/// per-segment coverage files for the first time, ensuring that accidental
113/// overwrites do not occur.
114///
115/// # Arguments
116///
117/// * `location` - The table storage location.
118/// * `rel_path` - The relative path within the table root where the sidecar should be written.
119/// * `cov` - The coverage bitmap to serialize and write.
120///
121/// # Returns
122///
123/// Returns `Ok(())` if the sidecar was created successfully, or an error if
124/// the file already exists or if serialization/storage fails.
125///
126/// # Errors
127///
128/// Returns [`CoverageError`] if:
129/// - Serialization of the coverage fails ([`CoverageError::Serde`]).
130/// - The file already exists (storage layer dependent).
131/// - Storage I/O fails for other reasons ([`CoverageError::Storage`]).
132pub async fn write_coverage_sidecar_new(
133    location: &TableLocation,
134    rel_path: &Path,
135    cov: &Coverage,
136) -> Result<(), CoverageError> {
137    let bytes = coverage_to_bytes(cov).context(SerdeSnafu)?;
138    storage::write_new(location.as_ref(), rel_path, &bytes)
139        .await
140        .context(StorageSnafu)?;
141    Ok(())
142}
143
144/// Write a coverage bitmap as bytes to a sidecar file with exclusive creation.
145///
146/// # Errors
147///
148/// Returns [`CoverageError::Storage`] when the storage layer rejects the write,
149/// including when the file already exists. Callers that perform idempotent
150/// writes may choose to treat an `AlreadyExists` storage error as non-fatal.
151pub async fn write_coverage_sidecar_new_bytes(
152    location: &TableLocation,
153    rel_path: &Path,
154    bytes: &[u8],
155) -> Result<(), CoverageError> {
156    storage::write_new(location.as_ref(), rel_path, bytes)
157        .await
158        .context(StorageSnafu)?;
159    Ok(())
160}
161
162/// Read a coverage bitmap from a sidecar file.
163///
164/// Reads and deserializes a [`Coverage`] instance from a sidecar file at `rel_path`
165/// within the table storage. If the file is not found, returns a [`CoverageError::NotFound`]
166/// error. If deserialization fails, returns a [`CoverageError::Serde`] error.
167///
168/// # Arguments
169///
170/// * `location` - The table storage location.
171/// * `rel_path` - The relative path within the table root where the sidecar is located.
172///
173/// # Returns
174///
175/// Returns `Ok(coverage)` if the sidecar was read and deserialized successfully,
176/// or an error if the file is not found or deserialization fails.
177///
178/// # Errors
179///
180/// Returns [`CoverageError`] if:
181/// - The file does not exist ([`CoverageError::NotFound`]).
182/// - Deserialization of the coverage fails ([`CoverageError::Serde`]).
183/// - Storage I/O fails for other reasons ([`CoverageError::Storage`]).
184pub async fn read_coverage_sidecar(
185    location: &TableLocation,
186    rel_path: &Path,
187) -> Result<Coverage, CoverageError> {
188    match storage::read_all_bytes(location.as_ref(), rel_path).await {
189        Ok(bytes) => coverage_from_bytes(&bytes).context(SerdeSnafu),
190        Err(StorageError::NotFound { path, .. }) => Err(CoverageError::NotFound { path }),
191        Err(e) => Err(CoverageError::Storage { source: e }),
192    }
193}
194
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{coverage::serde::coverage_from_bytes, storage::StorageLocation};
    use tempfile::TempDir;

    /// Build a local table location rooted in a fresh temporary directory.
    ///
    /// The `TempDir` guard is returned alongside the location so the caller
    /// controls how long the directory stays on disk.
    fn temp_location() -> (TempDir, TableLocation) {
        let dir = TempDir::new().expect("tempdir");
        let location = TableLocation::local(dir.path());
        (dir, location)
    }

    /// Resolve `rel` against the filesystem root of a local table location.
    fn absolute_path(loc: &TableLocation, rel: &Path) -> std::path::PathBuf {
        match loc.as_ref() {
            StorageLocation::Local(root) => root.join(rel),
        }
    }

    /// A second atomic write to the same path replaces the first payload.
    #[tokio::test]
    async fn write_atomic_overwrites_existing() {
        let (_guard, loc) = temp_location();
        let rel = Path::new("_coverage/table/1.roar");

        let first = Coverage::from_iter(vec![1u32, 2, 3]);
        write_coverage_sidecar_atomic(&loc, rel, &first)
            .await
            .expect("first write");

        let second = Coverage::from_iter(vec![10u32, 11]);
        write_coverage_sidecar_atomic(&loc, rel, &second)
            .await
            .expect("overwrite");

        // Inspect the file directly so this test only exercises the write path.
        let on_disk = std::fs::read(absolute_path(&loc, rel)).expect("read file");
        let restored = coverage_from_bytes(&on_disk).expect("deserialize");
        assert_eq!(second.present(), restored.present());
    }

    /// A new-only write to an existing path surfaces `AlreadyExists`.
    #[tokio::test]
    async fn write_new_fails_if_exists() {
        let (_guard, loc) = temp_location();
        let rel = Path::new("_coverage/segments/seg-1.roar");
        let cov = Coverage::from_iter(vec![5u32]);

        write_coverage_sidecar_new(&loc, rel, &cov)
            .await
            .expect("first write");

        let err = write_coverage_sidecar_new(&loc, rel, &cov)
            .await
            .expect_err("second write should fail");

        assert!(
            matches!(
                err,
                CoverageError::Storage {
                    source: StorageError::AlreadyExists { .. },
                    ..
                }
            ),
            "expected AlreadyExists storage error"
        );
    }

    /// Coverage written atomically is read back unchanged.
    #[tokio::test]
    async fn read_sidecar_round_trip() {
        let (_guard, loc) = temp_location();
        let rel = Path::new("_coverage/table/2.roar");

        let original = Coverage::from_iter(vec![1u32, 3, 5, 7]);
        write_coverage_sidecar_atomic(&loc, rel, &original)
            .await
            .expect("write sidecar");

        let restored = read_coverage_sidecar(&loc, rel)
            .await
            .expect("read sidecar");
        assert_eq!(original.present(), restored.present());
    }

    /// Reading a path that was never written yields `NotFound` with that path.
    #[tokio::test]
    async fn read_sidecar_missing_returns_not_found() {
        let (_guard, loc) = temp_location();
        let rel = Path::new("_coverage/table/missing.roar");

        let err = read_coverage_sidecar(&loc, rel)
            .await
            .expect_err("should be missing");

        let CoverageError::NotFound { path } = err else {
            panic!("expected NotFound error");
        };
        assert!(path.contains("missing.roar"));
    }

    /// Bytes that are not a valid bitmap are reported as a serde failure.
    #[tokio::test]
    async fn read_sidecar_corrupt_bytes_returns_serde_error() {
        let (guard, loc) = temp_location();
        let rel = Path::new("_coverage/table/corrupt.roar");

        // Plant garbage at the sidecar path, bypassing the write helpers.
        let target = absolute_path(&loc, rel);
        std::fs::create_dir_all(target.parent().unwrap()).expect("create dirs");
        std::fs::write(&target, b"not a bitmap").expect("write corrupt");

        let err = read_coverage_sidecar(&loc, rel)
            .await
            .expect_err("should fail to deserialize");

        assert!(
            matches!(err, CoverageError::Serde { .. }),
            "expected Serde error"
        );

        // Release the temporary directory only after every assertion has run.
        drop(guard);
    }
}
311}