Skip to main content

timeseries_table_core/transaction_log/
segments.rs

1//! Segment IO helpers.
2//!
3//! The canonical segment metadata model lives in [`crate::metadata::segments`]
4//! and contains **no storage IO**.
5//!
6//! This module is the IO boundary: it re-exports the pure types and provides
7//! constructors/validators that touch storage (for example, verifying Parquet
8//! magic bytes via a storage backend).
9
10use chrono::{DateTime, Utc};
11use snafu::{Backtrace, prelude::*};
12
13use crate::storage::{self, StorageError, TableLocation};
14
15// Re-export pure types for compatibility (`transaction_log::segments::*`).
16pub use crate::metadata::segments::{
17    FileFormat, SegmentId, SegmentMeta, SegmentMetaError, SegmentMetaResult, segment_id_v1,
18};
19
20/// IO-layer errors when constructing/validating segments.
21#[derive(Debug, Snafu)]
22pub enum SegmentIoError {
23    /// The file is missing or not a regular file.
24    #[snafu(display("Segment file missing or not a regular file: {path}"))]
25    MissingFile {
26        /// The path to the missing or invalid file.
27        path: String,
28        /// Backtrace for debugging.
29        backtrace: Backtrace,
30    },
31
32    /// Generic I/O error while validating the segment.
33    #[snafu(display("I/O error while validating segment at {path}: {source}"))]
34    Storage {
35        /// The path to the file that caused the I/O error.
36        path: String,
37        /// Underlying storage error that caused this I/O failure.
38        #[snafu(source, backtrace)]
39        source: StorageError,
40    },
41}
42
43/// Segment error at the IO boundary: either a storage failure or a pure metadata failure.
44#[derive(Debug, Snafu)]
45pub enum SegmentError {
46    /// Storage / backend error while accessing a segment.
47    #[snafu(transparent)]
48    Io {
49        /// The underlying IO-layer error.
50        source: SegmentIoError,
51    },
52
53    /// Pure metadata/decoding/validation error.
54    #[snafu(transparent)]
55    Meta {
56        /// The underlying pure metadata error.
57        source: SegmentMetaError,
58    },
59}
60
61/// Convenience alias for results returned by IO-layer segment operations.
62#[allow(clippy::result_large_err)]
63pub type SegmentResult<T> = Result<T, SegmentError>;
64
65/// Convert a lower-level `StorageError` into the corresponding `SegmentError`.
66///
67/// - `StorageError::NotFound` is mapped to `SegmentIoError::MissingFile`.
68/// - All other storage errors are wrapped in `SegmentIoError::Storage`,
69///   preserving the original `StorageError` as the source for diagnostics.
70pub fn map_storage_error(err: StorageError) -> SegmentError {
71    let (is_missing, path) = match &err {
72        StorageError::NotFound { path, .. } => (true, path.clone()),
73        StorageError::AlreadyExists { path, .. }
74        | StorageError::OtherIo { path, .. }
75        | StorageError::AlreadyExistsNoSource { path, .. } => (false, path.clone()),
76    };
77
78    if is_missing {
79        SegmentIoError::MissingFile {
80            path,
81            backtrace: Backtrace::capture(),
82        }
83        .into()
84    } else {
85        SegmentIoError::Storage { path, source: err }.into()
86    }
87}
88
89impl SegmentMeta {
90    /// Construct a validated Parquet SegmentMeta for a file.
91    ///
92    /// - `location` describes where the table lives (e.g. local root).
93    /// - `path` is the logical path stored in the log (e.g. "data/seg1.parquet"
94    ///   or an absolute path).
95    ///
96    /// This is a v0.1 local-filesystem helper: it relies on `storage::read_head_tail_4`
97    /// which currently only supports `TableLocation::Local`.
98    pub async fn for_parquet(
99        location: &TableLocation,
100        segment_id: SegmentId,
101        path: &str,
102        ts_min: DateTime<Utc>,
103        ts_max: DateTime<Utc>,
104        row_count: u64,
105    ) -> SegmentResult<Self> {
106        // Use storage layer to get len + first/last 4 bytes.
107        let probe = storage::read_head_tail_4(location.as_ref(), std::path::Path::new(path))
108            .await
109            .map_err(map_storage_error)?;
110
111        if probe.len < 8 {
112            return Err(SegmentMetaError::TooShort {
113                path: path.to_string(),
114            }
115            .into());
116        }
117
118        const PARQUET_MAGIC: &[u8; 4] = b"PAR1";
119
120        if &probe.head != PARQUET_MAGIC || &probe.tail != PARQUET_MAGIC {
121            return Err(SegmentMetaError::InvalidMagic {
122                path: path.to_string(),
123            }
124            .into());
125        }
126
127        Ok(SegmentMeta {
128            segment_id,
129            path: path.to_string(),
130            format: FileFormat::Parquet,
131            ts_min,
132            ts_max,
133            row_count,
134            file_size: Some(probe.len),
135            coverage_path: None,
136        })
137    }
138
139    /// Format-dispatching constructor that can grow in future versions.
140    ///
141    /// v0.1: only `FileFormat::Parquet` is supported and validated via
142    /// `for_parquet`.
143    pub async fn new_validated(
144        location: &TableLocation,
145        segment_id: SegmentId,
146        path: &str,
147        format: FileFormat,
148        ts_min: DateTime<Utc>,
149        ts_max: DateTime<Utc>,
150        row_count: u64,
151    ) -> SegmentResult<Self> {
152        match format {
153            FileFormat::Parquet => {
154                SegmentMeta::for_parquet(location, segment_id, path, ts_min, ts_max, row_count)
155                    .await
156            } // other => UnsupportedFormatSnafu { format: other }.fail(),
157        }
158    }
159}
160
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::{DateTime, TimeZone};
    use tempfile::TempDir;

    type TestResult = Result<(), Box<dyn std::error::Error>>;

    /// Build a UTC timestamp from calendar components, panicking when they do
    /// not resolve to a single instant.
    fn utc_datetime(
        year: i32,
        month: u32,
        day: u32,
        hour: u32,
        minute: u32,
        second: u32,
    ) -> DateTime<Utc> {
        let resolved = Utc.with_ymd_and_hms(year, month, day, hour, minute, second);
        resolved.single().expect("valid UTC timestamp")
    }

    /// The one-hour `(ts_min, ts_max)` window shared by all tests.
    fn sample_window() -> (DateTime<Utc>, DateTime<Utc>) {
        (
            utc_datetime(2025, 1, 1, 0, 0, 0),
            utc_datetime(2025, 1, 1, 1, 0, 0),
        )
    }

    /// A hand-built segment with neither `file_size` nor `coverage_path` set.
    fn sample_segment_meta() -> SegmentMeta {
        let (ts_min, ts_max) = sample_window();
        SegmentMeta {
            segment_id: SegmentId(String::from("seg-001")),
            path: String::from("data/seg-001.parquet"),
            format: FileFormat::Parquet,
            ts_min,
            ts_max,
            row_count: 123,
            file_size: None,
            coverage_path: None,
        }
    }

    /// Write `bytes` to `rel_path` below the temp root, creating any missing
    /// parent directories first.
    async fn stage_file(root: &TempDir, rel_path: &str, bytes: &[u8]) -> std::io::Result<()> {
        let target = root.path().join(rel_path);
        if let Some(dir) = target.parent() {
            tokio::fs::create_dir_all(dir).await?;
        }
        tokio::fs::write(&target, bytes).await
    }

    /// Run `for_parquet` over the shared time window with the given id, path
    /// and row count.
    async fn probe(
        location: &TableLocation,
        id: &str,
        rel_path: &str,
        row_count: u64,
    ) -> SegmentResult<SegmentMeta> {
        let (ts_min, ts_max) = sample_window();
        SegmentMeta::for_parquet(
            location,
            SegmentId(id.to_string()),
            rel_path,
            ts_min,
            ts_max,
            row_count,
        )
        .await
    }

    #[test]
    fn segment_meta_json_roundtrip_with_and_without_coverage_path() {
        /// Serialize to JSON and parse straight back.
        fn roundtrip(meta: &SegmentMeta) -> SegmentMeta {
            let encoded = serde_json::to_string(meta).unwrap();
            serde_json::from_str(&encoded).unwrap()
        }

        // Optional fields that are absent must stay absent.
        let bare = roundtrip(&sample_segment_meta());
        assert!(bare.coverage_path.is_none());
        assert!(bare.file_size.is_none());

        // Optional fields that are present must survive unchanged.
        let mut annotated = sample_segment_meta().with_coverage_path("_coverage/segments/a.roar");
        annotated.file_size = Some(42);
        let restored = roundtrip(&annotated);
        assert_eq!(
            restored.coverage_path.as_deref(),
            Some("_coverage/segments/a.roar")
        );
        assert_eq!(restored.file_size, Some(42));
    }

    #[tokio::test]
    async fn parquet_segment_validation_succeeds() -> TestResult {
        let rel_path = "data/valid.parquet";
        let tmp = TempDir::new()?;
        let location = TableLocation::local(tmp.path());
        // Smallest input the validator accepts: head marker followed by tail marker.
        stage_file(&tmp, rel_path, b"PAR1PAR1").await?;

        let (ts_min, ts_max) = sample_window();

        let meta = SegmentMeta::for_parquet(
            &location,
            SegmentId(String::from("seg-001")),
            rel_path,
            ts_min,
            ts_max,
            1_234,
        )
        .await
        .map_err(|err| Box::new(err) as Box<dyn std::error::Error>)?;

        assert_eq!(meta.segment_id, SegmentId(String::from("seg-001")));
        assert_eq!(meta.path, rel_path);
        assert_eq!(meta.format, FileFormat::Parquet);
        assert_eq!(meta.ts_min, ts_min);
        assert_eq!(meta.ts_max, ts_max);
        assert_eq!(meta.row_count, 1_234);
        assert_eq!(meta.file_size, Some(8));

        Ok(())
    }

    #[tokio::test]
    async fn parquet_segment_missing_file_returns_error() -> TestResult {
        let tmp = TempDir::new()?;
        let location = TableLocation::local(tmp.path());

        // Nothing is staged, so the path does not exist under the temp root.
        let outcome = probe(&location, "missing", "data/missing.parquet", 42).await;

        let reported_missing = matches!(
            outcome,
            Err(SegmentError::Io {
                source: SegmentIoError::MissingFile { .. }
            })
        );
        assert!(reported_missing);
        Ok(())
    }

    #[tokio::test]
    async fn parquet_segment_too_short_returns_error() -> TestResult {
        let rel_path = "data/short.parquet";
        let tmp = TempDir::new()?;
        let location = TableLocation::local(tmp.path());
        // Four bytes only: below the eight-byte minimum.
        stage_file(&tmp, rel_path, b"PAR1").await?;

        let outcome = probe(&location, "short", rel_path, 10).await;

        let reported_too_short = matches!(
            outcome,
            Err(SegmentError::Meta {
                source: SegmentMetaError::TooShort { .. }
            })
        );
        assert!(reported_too_short);
        Ok(())
    }

    #[tokio::test]
    async fn parquet_segment_invalid_magic_returns_error() -> TestResult {
        let rel_path = "data/invalid_magic.parquet";
        let tmp = TempDir::new()?;
        let location = TableLocation::local(tmp.path());
        // Long enough and with a valid head, but the tail marker is wrong.
        stage_file(&tmp, rel_path, b"PAR1NOPE").await?;

        let outcome = probe(&location, "bad", rel_path, 10).await;

        let reported_bad_magic = matches!(
            outcome,
            Err(SegmentError::Meta {
                source: SegmentMetaError::InvalidMagic { .. }
            })
        );
        assert!(reported_bad_magic);
        Ok(())
    }

    #[tokio::test]
    async fn new_validated_delegates_to_parquet_constructor() -> TestResult {
        let rel_path = "data/delegate.parquet";
        let tmp = TempDir::new()?;
        let location = TableLocation::local(tmp.path());
        stage_file(&tmp, rel_path, b"PAR1PAR1").await?;

        let (ts_min, ts_max) = sample_window();

        let meta = SegmentMeta::new_validated(
            &location,
            SegmentId(String::from("delegate")),
            rel_path,
            FileFormat::Parquet,
            ts_min,
            ts_max,
            5,
        )
        .await
        .map_err(|err| Box::new(err) as Box<dyn std::error::Error>)?;

        assert_eq!(meta.path, rel_path);
        assert_eq!(meta.format, FileFormat::Parquet);
        assert_eq!(meta.row_count, 5);
        Ok(())
    }
}
366}