timeseries_table_core/coverage/io.rs
1//! Coverage sidecar file management.
2//!
3//! This module provides helpers for reading and writing coverage data to
4//! sidecar files in the table storage directory. It bridges the coverage
5//! module (serialization/deserialization) with the storage layer (disk I/O).
6//!
7//! # Overview
8//!
9//! Coverage sidecars are stored alongside table data and segments to track which
10//! time buckets have been observed. This module abstracts the I/O details:
11//!
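//! - Serializes [`Coverage`] instances to bytes using the RoaringBitmap format.
//! - Writes bytes to the table storage with atomic or new-only semantics.
//! - Reads sidecar bytes back and deserializes them into a [`Coverage`],
//!   reporting a missing file as [`CoverageError::NotFound`].
//! - Handles errors from layout validation, serialization, and storage layers.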
15//!
16//! # Atomic vs. New-Only Writes
17//!
18//! - **Atomic**: Writes using [`write_coverage_sidecar_atomic`] are safe for
19//! overwriting existing sidecars (e.g., updating a table snapshot).
20//! - **New-Only**: Writes using [`write_coverage_sidecar_new`] fail if the file
21//! already exists (e.g., creating per-segment coverage for the first time).
22
23use std::path::Path;
24
25use snafu::{ResultExt, Snafu};
26
27use crate::{
28 coverage::layout::CoverageLayoutError,
29 coverage::{
30 Coverage,
31 serde::{CoverageSerdeError, coverage_from_bytes, coverage_to_bytes},
32 },
33 storage::{self, StorageError, TableLocation},
34};
35
36/// Errors that can occur during coverage sidecar operations.
37///
38/// These errors propagate from lower layers: layout validation, serialization,
39/// storage, and file I/O. Callers should inspect the variant to determine
40/// the nature of the failure and how to recover.
41#[derive(Debug, Snafu)]
42pub enum CoverageError {
43 /// Layout validation error (e.g., invalid coverage ID or path).
44 #[snafu(display("{source}"))]
45 Layout {
46 /// The underlying layout error.
47 source: CoverageLayoutError,
48 },
49
50 /// Serialization or deserialization error.
51 #[snafu(display("{source}"))]
52 Serde {
53 /// The underlying serde error.
54 source: CoverageSerdeError,
55 },
56
57 /// Coverage sidecar file was not found at the expected path.
58 #[snafu(display("Coverage sidecar not found: {path}"))]
59 NotFound {
60 /// The path where the sidecar was expected.
61 path: String,
62 },
63
64 /// Storage I/O error (read, write, or metadata operations).
65 #[snafu(display("Storage error while reading/writing coverage sidecar: {source}"))]
66 Storage {
67 /// The underlying storage error.
68 #[snafu(source, backtrace)]
69 source: StorageError,
70 },
71}
72
73/// Write a coverage bitmap to a sidecar file using atomic semantics.
74///
75/// Atomically writes the given [`Coverage`] to a file at `rel_path` within the
76/// table storage. If the file already exists, it will be overwritten. This is
77/// suitable for updating table-level coverage snapshots or refreshing segment
78/// coverage metadata.
79///
80/// # Arguments
81///
82/// * `location` - The table storage location.
83/// * `rel_path` - The relative path within the table root where the sidecar should be written.
84/// * `cov` - The coverage bitmap to serialize and write.
85///
86/// # Returns
87///
88/// Returns `Ok(())` if the sidecar was written successfully, or an error if
89/// serialization or storage fails.
90///
91/// # Errors
92///
93/// Returns [`CoverageError`] if:
94/// - Serialization of the coverage fails ([`CoverageError::Serde`]).
95/// - Storage I/O fails ([`CoverageError::Storage`]).
96pub async fn write_coverage_sidecar_atomic(
97 location: &TableLocation,
98 rel_path: &Path,
99 cov: &Coverage,
100) -> Result<(), CoverageError> {
101 let bytes = coverage_to_bytes(cov).context(SerdeSnafu)?;
102 storage::write_atomic(location.as_ref(), rel_path, &bytes)
103 .await
104 .context(StorageSnafu)?;
105 Ok(())
106}
107
108/// Write a coverage bitmap to a sidecar file with exclusive creation.
109///
110/// Writes the given [`Coverage`] to a file at `rel_path` within the table storage,
111/// but only if the file does not already exist. This is suitable for creating
112/// per-segment coverage files for the first time, ensuring that accidental
113/// overwrites do not occur.
114///
115/// # Arguments
116///
117/// * `location` - The table storage location.
118/// * `rel_path` - The relative path within the table root where the sidecar should be written.
119/// * `cov` - The coverage bitmap to serialize and write.
120///
121/// # Returns
122///
123/// Returns `Ok(())` if the sidecar was created successfully, or an error if
124/// the file already exists or if serialization/storage fails.
125///
126/// # Errors
127///
128/// Returns [`CoverageError`] if:
129/// - Serialization of the coverage fails ([`CoverageError::Serde`]).
130/// - The file already exists (storage layer dependent).
131/// - Storage I/O fails for other reasons ([`CoverageError::Storage`]).
132pub async fn write_coverage_sidecar_new(
133 location: &TableLocation,
134 rel_path: &Path,
135 cov: &Coverage,
136) -> Result<(), CoverageError> {
137 let bytes = coverage_to_bytes(cov).context(SerdeSnafu)?;
138 storage::write_new(location.as_ref(), rel_path, &bytes)
139 .await
140 .context(StorageSnafu)?;
141 Ok(())
142}
143
144/// Write a coverage bitmap as bytes to a sidecar file with exclusive creation.
145///
146/// # Errors
147///
148/// Returns [`CoverageError::Storage`] when the storage layer rejects the write,
149/// including when the file already exists. Callers that perform idempotent
150/// writes may choose to treat an `AlreadyExists` storage error as non-fatal.
151pub async fn write_coverage_sidecar_new_bytes(
152 location: &TableLocation,
153 rel_path: &Path,
154 bytes: &[u8],
155) -> Result<(), CoverageError> {
156 storage::write_new(location.as_ref(), rel_path, bytes)
157 .await
158 .context(StorageSnafu)?;
159 Ok(())
160}
161
162/// Read a coverage bitmap from a sidecar file.
163///
164/// Reads and deserializes a [`Coverage`] instance from a sidecar file at `rel_path`
165/// within the table storage. If the file is not found, returns a [`CoverageError::NotFound`]
166/// error. If deserialization fails, returns a [`CoverageError::Serde`] error.
167///
168/// # Arguments
169///
170/// * `location` - The table storage location.
171/// * `rel_path` - The relative path within the table root where the sidecar is located.
172///
173/// # Returns
174///
175/// Returns `Ok(coverage)` if the sidecar was read and deserialized successfully,
176/// or an error if the file is not found or deserialization fails.
177///
178/// # Errors
179///
180/// Returns [`CoverageError`] if:
181/// - The file does not exist ([`CoverageError::NotFound`]).
182/// - Deserialization of the coverage fails ([`CoverageError::Serde`]).
183/// - Storage I/O fails for other reasons ([`CoverageError::Storage`]).
184pub async fn read_coverage_sidecar(
185 location: &TableLocation,
186 rel_path: &Path,
187) -> Result<Coverage, CoverageError> {
188 match storage::read_all_bytes(location.as_ref(), rel_path).await {
189 Ok(bytes) => coverage_from_bytes(&bytes).context(SerdeSnafu),
190 Err(StorageError::NotFound { path, .. }) => Err(CoverageError::NotFound { path }),
191 Err(e) => Err(CoverageError::Storage { source: e }),
192 }
193}
194
#[cfg(test)]
mod tests {
    //! Tests for the sidecar helpers, run against a temporary local table root.

    use super::*;
    use crate::{
        coverage::serde::{coverage_from_bytes, coverage_to_bytes},
        storage::StorageLocation,
    };
    use std::path::PathBuf;
    use tempfile::TempDir;

    /// Create a fresh temporary directory and a local table location rooted in it.
    ///
    /// The `TempDir` is returned so the caller keeps it alive for the duration
    /// of the test; the directory is removed when it is dropped.
    fn temp_location() -> (TempDir, TableLocation) {
        let tmp = TempDir::new().expect("tempdir");
        let loc = TableLocation::local(tmp.path());
        (tmp, loc)
    }

    /// Resolve `rel` against the on-disk root of `loc`, for direct filesystem access.
    fn abs_path(loc: &TableLocation, rel: &Path) -> PathBuf {
        let storage: &StorageLocation = loc.as_ref();
        match storage {
            StorageLocation::Local(root) => root.join(rel),
        }
    }

    /// Assert that `err` is a storage error caused by an already existing file.
    fn assert_already_exists(err: &CoverageError) {
        assert!(
            matches!(
                err,
                CoverageError::Storage {
                    source: StorageError::AlreadyExists { .. },
                    ..
                }
            ),
            "expected AlreadyExists storage error, got {err:?}"
        );
    }

    #[tokio::test]
    async fn write_atomic_overwrites_existing() {
        let (_tmp, loc) = temp_location();
        let rel = Path::new("_coverage/table/1.roar");

        let cov1 = Coverage::from_iter(vec![1u32, 2, 3]);
        write_coverage_sidecar_atomic(&loc, rel, &cov1)
            .await
            .expect("first write");

        // Overwrite with different coverage.
        let cov2 = Coverage::from_iter(vec![10u32, 11]);
        write_coverage_sidecar_atomic(&loc, rel, &cov2)
            .await
            .expect("overwrite");

        // Read the file directly and verify it holds the second write.
        let bytes = std::fs::read(abs_path(&loc, rel)).expect("read file");
        let restored = coverage_from_bytes(&bytes).expect("deserialize");
        assert_eq!(cov2.present(), restored.present());
    }

    #[tokio::test]
    async fn write_new_fails_if_exists() {
        let (_tmp, loc) = temp_location();
        let rel = Path::new("_coverage/segments/seg-1.roar");

        let cov = Coverage::from_iter(vec![5u32]);
        write_coverage_sidecar_new(&loc, rel, &cov)
            .await
            .expect("first write");

        let err = write_coverage_sidecar_new(&loc, rel, &cov)
            .await
            .expect_err("second write should fail");

        assert_already_exists(&err);
    }

    #[tokio::test]
    async fn write_new_bytes_creates_then_rejects_existing() {
        let (_tmp, loc) = temp_location();
        let rel = Path::new("_coverage/segments/seg-bytes.roar");

        let cov = Coverage::from_iter(vec![2u32, 4, 6]);
        let bytes = coverage_to_bytes(&cov).expect("serialize");

        write_coverage_sidecar_new_bytes(&loc, rel, &bytes)
            .await
            .expect("first write");

        // The stored bytes must decode back to the original coverage.
        let restored = read_coverage_sidecar(&loc, rel)
            .await
            .expect("read sidecar");
        assert_eq!(cov.present(), restored.present());

        // A second exclusive-create write to the same path must be refused.
        let err = write_coverage_sidecar_new_bytes(&loc, rel, &bytes)
            .await
            .expect_err("second write should fail");

        assert_already_exists(&err);
    }

    #[tokio::test]
    async fn read_sidecar_round_trip() {
        let (_tmp, loc) = temp_location();
        let rel = Path::new("_coverage/table/2.roar");

        let cov = Coverage::from_iter(vec![1u32, 3, 5, 7]);
        write_coverage_sidecar_atomic(&loc, rel, &cov)
            .await
            .expect("write sidecar");

        let restored = read_coverage_sidecar(&loc, rel)
            .await
            .expect("read sidecar");
        assert_eq!(cov.present(), restored.present());
    }

    #[tokio::test]
    async fn read_sidecar_missing_returns_not_found() {
        let (_tmp, loc) = temp_location();
        let rel = Path::new("_coverage/table/missing.roar");

        let err = read_coverage_sidecar(&loc, rel)
            .await
            .expect_err("should be missing");

        match err {
            CoverageError::NotFound { path } => {
                assert!(
                    path.contains("missing.roar"),
                    "unexpected path in NotFound error: {path}"
                );
            }
            other => panic!("expected NotFound error, got {other:?}"),
        }
    }

    #[tokio::test]
    async fn read_sidecar_corrupt_bytes_returns_serde_error() {
        // `_tmp` is a named binding, so the directory lives until the end of
        // the test without an explicit `drop`.
        let (_tmp, loc) = temp_location();
        let rel = Path::new("_coverage/table/corrupt.roar");

        // Write garbage bytes to the expected path, bypassing the helpers.
        let abs = abs_path(&loc, rel);
        std::fs::create_dir_all(abs.parent().expect("sidecar path has a parent"))
            .expect("create dirs");
        std::fs::write(&abs, b"not a bitmap").expect("write corrupt");

        let err = read_coverage_sidecar(&loc, rel)
            .await
            .expect_err("should fail to deserialize");

        assert!(
            matches!(err, CoverageError::Serde { .. }),
            "expected Serde error, got {err:?}"
        );
    }
}
311}