Skip to main content

timeseries_table_core/metadata/
segments.rs

1//! Segment identifiers, formats, and per-file metadata recorded in table metadata.
2//!
3//! This module contains **pure** data types + non-IO validation/decoding errors.
4//! Any functions that touch storage backends (filesystem, object store, etc.)
5//! must live outside `metadata/` (for example under `transaction_log` or
6//! format-specific helpers).
7
8use std::fmt;
9
10use arrow::error::ArrowError;
11use bytes::Bytes;
12use chrono::{DateTime, Utc};
13use parquet::errors::ParquetError;
14use serde::{Deserialize, Serialize};
15use snafu::{Backtrace, prelude::*};
16
17use crate::metadata::{logical_schema::LogicalSchemaError, time_column::TimeColumnError};
18
19/// Identifier for a physical segment (e.g. a Parquet file or group).
20///
21/// This is a logical ID used by the metadata; the actual file path is stored
22/// separately in [`SegmentMeta`]. Using a newtype makes it harder to mix
23/// up segment IDs with other stringly-typed fields.
24#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, Hash)]
25#[serde(transparent)]
26pub struct SegmentId(pub String);
27
28impl fmt::Display for SegmentId {
29    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
30        self.0.fmt(f)
31    }
32}
33
34/// Supported on-disk file formats for segments.
35///
36/// In v0.1, only `Parquet` is implemented, but the enum keeps the metadata model
37/// open to other formats in future versions.
38///
39/// JSON layout example: `"format": "parquet"`
40#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
41#[serde(rename_all = "lowercase")]
42pub enum FileFormat {
43    /// Apache Parquet columnar format.
44    #[default]
45    Parquet,
46    // Future:
47    // Orc,
48    // Avro,
49    // Csv,
50}
51
52/// Metadata about a single physical segment.
53///
54/// In v0.1, a "segment" corresponds to a single data file on disk.
55#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
56pub struct SegmentMeta {
57    /// Logical identifier for this segment.
58    pub segment_id: SegmentId,
59
60    /// File path relative to the table root (for example, `"data/nvda_1h_0001.parquet"`).
61    pub path: String,
62
63    /// File format for this segment.
64    pub format: FileFormat,
65
66    /// Minimum timestamp contained in this segment (inclusive), in RFC3339 UTC.
67    pub ts_min: DateTime<Utc>,
68
69    /// Maximum timestamp contained in this segment (inclusive), in RFC3339 UTC.
70    pub ts_max: DateTime<Utc>,
71
72    /// Number of rows in this segment.
73    pub row_count: u64,
74
75    /// Optional file size in bytes at the time metadata was captured.
76    #[serde(default, skip_serializing_if = "Option::is_none")]
77    pub file_size: Option<u64>,
78
79    /// Coverage sidecar pointer.
80    #[serde(default, skip_serializing_if = "Option::is_none")]
81    pub coverage_path: Option<String>,
82}
83
84impl SegmentMeta {
85    /// Set the coverage sidecar path for this segment metadata.
86    pub fn with_coverage_path(mut self, path: impl Into<String>) -> Self {
87        self.coverage_path = Some(path.into());
88        self
89    }
90}
91
92/// Errors that can occur while validating or decoding segment metadata.
93///
94/// This enum intentionally contains **no storage backend errors**. IO-related
95/// errors should be wrapped at the IO boundary (for example, in
96/// `transaction_log::segments::SegmentError`).
97#[derive(Debug, Snafu)]
98pub enum SegmentMetaError {
99    /// File format is not supported for v0.1.
100    #[snafu(display("Unsupported file format: {format:?}"))]
101    UnsupportedFormat {
102        /// The offending file format.
103        format: FileFormat,
104    },
105
106    /// The file is too short to be a valid Parquet file.
107    #[snafu(display("Segment file too short to be valid Parquet: {path}"))]
108    TooShort {
109        /// The path to the file that was too short.
110        path: String,
111    },
112
113    /// Magic bytes at the start / end of file don't match the Parquet spec.
114    #[snafu(display("Invalid Parquet magic bytes in segment file: {path}"))]
115    InvalidMagic {
116        /// The path to the file with invalid magic bytes.
117        path: String,
118    },
119
120    /// Parquet reader / metadata failure.
121    #[snafu(display("Error reading Parquet metadata for segment at {path}: {source}"))]
122    ParquetRead {
123        /// The path to the file that caused the Parquet read failure.
124        path: String,
125        /// Underlying parquet error that caused this failure.
126        source: ParquetError,
127        /// Diagnostic backtrace for this error.
128        backtrace: Backtrace,
129    },
130
131    /// Arrow decode failure while reading Parquet data.
132    #[snafu(display("Arrow read error for segment at {path}: {source}"))]
133    ArrowRead {
134        /// The path to the file that caused the Arrow read failure.
135        path: String,
136        /// Underlying Arrow error that caused this failure.
137        source: ArrowError,
138        /// Diagnostic backtrace for this error.
139        backtrace: Backtrace,
140    },
141
142    /// Time column validation or metadata error.
143    #[snafu(display("Time column error in segment at {path}: {source}"))]
144    TimeColumn {
145        /// The path to the segment file with a time column error.
146        path: String,
147        /// The underlying time column error.
148        source: TimeColumnError,
149    },
150
151    /// Statistics exist but are not well-shaped (wrong length / unexpected type).
152    #[snafu(display(
153        "Parquet statistics shape invalid for {column} in segment at {path}: {detail}"
154    ))]
155    ParquetStatsShape {
156        /// The path to the file with malformed Parquet statistics.
157        path: String,
158        /// The column whose statistics are malformed.
159        column: String,
160        /// Details about how the statistics are malformed.
161        detail: String,
162    },
163
164    /// No usable statistics for the time column; v0.1 may fall back to a scan.
165    #[snafu(display("Parquet statistics missing for {column} in segment at {path}"))]
166    ParquetStatsMissing {
167        /// The path to the file missing statistics for the column.
168        path: String,
169        /// The column missing statistics.
170        column: String,
171    },
172
173    /// Failed to derive a valid LogicalSchema from the Parquet file.
174    #[snafu(display("Invalid logical schema derived from Parquet at {path}: {source}"))]
175    LogicalSchemaInvalid {
176        /// The path to the file without a valid LogicalSchema.
177        path: String,
178        /// Underlying logical schema error that triggered this failure.
179        #[snafu(source)]
180        source: LogicalSchemaError,
181    },
182}
183
184/// Derive a deterministic segment id for an append entry.
185///
186/// This is content-addressable: it hashes both the relative path and the bytes
187/// so retries with the same input stay stable while same bytes at different
188/// paths diverge. The returned id uses the `seg-` prefix followed by 32 hex
189/// chars of the BLAKE3 digest, keeping ids bounded and safe for idempotent
190/// appends.
191pub fn segment_id_v1(relative_path: &str, data: &Bytes) -> SegmentId {
192    let mut h = blake3::Hasher::new();
193    h.update(b"segment-id-v1");
194    h.update(b"\0");
195    h.update(relative_path.as_bytes());
196    h.update(b"\0");
197    h.update(data.as_ref());
198    let hex = h.finalize().to_hex();
199    SegmentId(format!("seg-{}", &hex[..32]))
200}
201
202/// Result type for pure (non-IO) segment metadata operations.
203#[allow(clippy::result_large_err)]
204pub type SegmentMetaResult<T> = Result<T, SegmentMetaError>;
205
206/// Deterministic ordering for segments by time.
207///
208/// Ordering is by `ts_min`, then `ts_max`, and finally `segment_id` as a stable
209/// tie-breaker.
210pub(crate) fn cmp_segment_meta_by_time(a: &SegmentMeta, b: &SegmentMeta) -> std::cmp::Ordering {
211    a.ts_min
212        .cmp(&b.ts_min)
213        .then_with(|| a.ts_max.cmp(&b.ts_max))
214        .then_with(|| a.segment_id.0.cmp(&b.segment_id.0))
215}
216
217#[cfg(test)]
218mod tests {
219    use super::*;
220    use chrono::{TimeZone, Utc};
221
222    fn seg(id: &str, ts_min: i64, ts_max: i64) -> SegmentMeta {
223        SegmentMeta {
224            segment_id: SegmentId(id.to_string()),
225            path: format!("data/{id}.parquet"),
226            format: FileFormat::Parquet,
227            ts_min: Utc.timestamp_opt(ts_min, 0).single().unwrap(),
228            ts_max: Utc.timestamp_opt(ts_max, 0).single().unwrap(),
229            row_count: 1,
230            file_size: None,
231            coverage_path: None,
232        }
233    }
234
235    #[test]
236    fn ordering_is_deterministic_with_tie_breakers() {
237        let mut v = vec![
238            seg("c", 10, 20),
239            seg("b", 10, 20),
240            seg("a", 10, 30),
241            seg("d", 5, 7),
242        ];
243
244        v.sort_unstable_by(cmp_segment_meta_by_time);
245
246        let ids: Vec<String> = v.into_iter().map(|s| s.segment_id.0).collect();
247        assert_eq!(ids, vec!["d", "b", "c", "a"]);
248    }
249
250    #[test]
251    fn ordering_is_equal_for_identical_segments() {
252        let a = seg("same", 10, 20);
253        let b = seg("same", 10, 20);
254        assert_eq!(cmp_segment_meta_by_time(&a, &b), std::cmp::Ordering::Equal);
255        assert_eq!(cmp_segment_meta_by_time(&b, &a), std::cmp::Ordering::Equal);
256    }
257
258    #[test]
259    fn ordering_primary_key_ts_min_dominates() {
260        let mut v = vec![seg("z", 20, 30), seg("a", 10, 50), seg("m", 15, 10)];
261
262        v.sort_unstable_by(cmp_segment_meta_by_time);
263
264        let ids: Vec<String> = v.into_iter().map(|s| s.segment_id.0).collect();
265        assert_eq!(ids, vec!["a", "m", "z"]);
266    }
267
268    #[test]
269    fn ordering_uses_segment_id_as_final_tie_breaker() {
270        let mut v = vec![seg("b", 10, 20), seg("a", 10, 20), seg("c", 10, 20)];
271
272        v.sort_unstable_by(cmp_segment_meta_by_time);
273
274        let ids: Vec<String> = v.into_iter().map(|s| s.segment_id.0).collect();
275        assert_eq!(ids, vec!["a", "b", "c"]);
276    }
277}