timeseries_table_core/metadata/
segments.rs1use std::fmt;
9
10use arrow::error::ArrowError;
11use bytes::Bytes;
12use chrono::{DateTime, Utc};
13use parquet::errors::ParquetError;
14use serde::{Deserialize, Serialize};
15use snafu::{Backtrace, prelude::*};
16
17use crate::metadata::{logical_schema::LogicalSchemaError, time_column::TimeColumnError};
18
/// Identifier of a single segment, stored as a plain string.
///
/// `#[serde(transparent)]` makes the wrapper (de)serialize exactly like the
/// inner `String`, i.e. as a bare string value rather than a one-element
/// tuple. The inner string is public and is read directly via `.0` elsewhere
/// in this file.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, Hash)]
#[serde(transparent)]
pub struct SegmentId(pub String);
27
28impl fmt::Display for SegmentId {
29 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
30 self.0.fmt(f)
31 }
32}
33
/// On-disk format of a segment file.
///
/// Variant names are (de)serialized in lowercase because of
/// `rename_all = "lowercase"`, so `Parquet` is written as `"parquet"`.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum FileFormat {
    /// Parquet file. This is the only variant declared here and is also the
    /// value produced by `FileFormat::default()`.
    #[default]
    Parquet,
}
51
/// Metadata record describing one segment file.
///
/// The struct is serializable with serde; the two optional fields are omitted
/// from the serialized form when they are `None` and default to `None` when
/// absent on input.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct SegmentMeta {
    /// Identifier of the segment.
    pub segment_id: SegmentId,

    /// Location of the segment file, kept as a string.
    // NOTE(review): presumably relative to the table root — confirm with callers.
    pub path: String,

    /// File format of the segment.
    pub format: FileFormat,

    /// Lower timestamp bound of the segment, in UTC.
    // NOTE(review): presumably the minimum value of the time column — confirm.
    pub ts_min: DateTime<Utc>,

    /// Upper timestamp bound of the segment, in UTC.
    // NOTE(review): presumably the maximum value of the time column — confirm.
    pub ts_max: DateTime<Utc>,

    /// Number of rows recorded for the segment.
    pub row_count: u64,

    /// Optional file size; skipped on serialization when `None`.
    // NOTE(review): unit is presumably bytes — confirm.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub file_size: Option<u64>,

    /// Optional path to coverage data associated with the segment; skipped on
    /// serialization when `None`. Set via [`SegmentMeta::with_coverage_path`].
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub coverage_path: Option<String>,
}
83
84impl SegmentMeta {
85 pub fn with_coverage_path(mut self, path: impl Into<String>) -> Self {
87 self.coverage_path = Some(path.into());
88 self
89 }
90}
91
/// Errors reported while handling segment metadata.
///
/// Each variant's user-facing message comes from its `#[snafu(display(...))]`
/// attribute. Variants with a `source` field wrap an underlying error.
#[derive(Debug, Snafu)]
pub enum SegmentMetaError {
    /// The given file format is not supported.
    #[snafu(display("Unsupported file format: {format:?}"))]
    UnsupportedFormat {
        /// The format that was rejected.
        format: FileFormat,
    },

    /// The segment file is too short to be a valid Parquet file.
    #[snafu(display("Segment file too short to be valid Parquet: {path}"))]
    TooShort {
        /// Path of the offending segment file.
        path: String,
    },

    /// The segment file does not carry valid Parquet magic bytes.
    #[snafu(display("Invalid Parquet magic bytes in segment file: {path}"))]
    InvalidMagic {
        /// Path of the offending segment file.
        path: String,
    },

    /// Reading the Parquet metadata of the segment failed.
    #[snafu(display("Error reading Parquet metadata for segment at {path}: {source}"))]
    ParquetRead {
        /// Path of the segment being read.
        path: String,
        /// Underlying Parquet error.
        source: ParquetError,
        /// Backtrace captured by snafu for this error.
        backtrace: Backtrace,
    },

    /// An Arrow-level read of the segment failed.
    #[snafu(display("Arrow read error for segment at {path}: {source}"))]
    ArrowRead {
        /// Path of the segment being read.
        path: String,
        /// Underlying Arrow error.
        source: ArrowError,
        /// Backtrace captured by snafu for this error.
        backtrace: Backtrace,
    },

    /// A time-column error occurred for the segment.
    #[snafu(display("Time column error in segment at {path}: {source}"))]
    TimeColumn {
        /// Path of the segment being processed.
        path: String,
        /// Underlying time-column error.
        source: TimeColumnError,
    },

    /// Parquet statistics for a column were present but had an invalid shape.
    #[snafu(display(
        "Parquet statistics shape invalid for {column} in segment at {path}: {detail}"
    ))]
    ParquetStatsShape {
        /// Path of the segment being processed.
        path: String,
        /// Name of the column whose statistics were rejected.
        column: String,
        /// Free-form description of what was wrong.
        detail: String,
    },

    /// Parquet statistics for a column were missing.
    #[snafu(display("Parquet statistics missing for {column} in segment at {path}"))]
    ParquetStatsMissing {
        /// Path of the segment being processed.
        path: String,
        /// Name of the column without statistics.
        column: String,
    },

    /// The logical schema derived from the Parquet file is invalid.
    #[snafu(display("Invalid logical schema derived from Parquet at {path}: {source}"))]
    LogicalSchemaInvalid {
        /// Path of the segment being processed.
        path: String,
        /// Underlying logical-schema error (explicitly marked as the source).
        #[snafu(source)]
        source: LogicalSchemaError,
    },
}
183
184pub fn segment_id_v1(relative_path: &str, data: &Bytes) -> SegmentId {
192 let mut h = blake3::Hasher::new();
193 h.update(b"segment-id-v1");
194 h.update(b"\0");
195 h.update(relative_path.as_bytes());
196 h.update(b"\0");
197 h.update(data.as_ref());
198 let hex = h.finalize().to_hex();
199 SegmentId(format!("seg-{}", &hex[..32]))
200}
201
/// Shorthand for a `Result` whose error type is [`SegmentMetaError`].
// NOTE(review): the allow below presumably silences clippy's
// `result_large_err` for `SegmentMetaError`; confirm it has the intended
// effect on a type alias rather than on the functions that return it.
#[allow(clippy::result_large_err)]
pub type SegmentMetaResult<T> = Result<T, SegmentMetaError>;
205
206pub(crate) fn cmp_segment_meta_by_time(a: &SegmentMeta, b: &SegmentMeta) -> std::cmp::Ordering {
211 a.ts_min
212 .cmp(&b.ts_min)
213 .then_with(|| a.ts_max.cmp(&b.ts_max))
214 .then_with(|| a.segment_id.0.cmp(&b.segment_id.0))
215}
216
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::{TimeZone, Utc};

    /// Builds a one-row Parquet segment with the given id and time bounds
    /// (whole seconds since the Unix epoch).
    fn seg(id: &str, ts_min: i64, ts_max: i64) -> SegmentMeta {
        let at = |secs: i64| Utc.timestamp_opt(secs, 0).single().unwrap();

        SegmentMeta {
            segment_id: SegmentId(id.to_string()),
            path: format!("data/{id}.parquet"),
            format: FileFormat::Parquet,
            ts_min: at(ts_min),
            ts_max: at(ts_max),
            row_count: 1,
            file_size: None,
            coverage_path: None,
        }
    }

    /// Sorts `segments` with the comparator under test and returns the
    /// resulting sequence of segment id strings.
    fn sorted_ids(mut segments: Vec<SegmentMeta>) -> Vec<String> {
        segments.sort_unstable_by(cmp_segment_meta_by_time);
        segments
            .into_iter()
            .map(|meta| meta.segment_id.0)
            .collect()
    }

    #[test]
    fn ordering_is_deterministic_with_tie_breakers() {
        let ids = sorted_ids(vec![
            seg("c", 10, 20),
            seg("b", 10, 20),
            seg("a", 10, 30),
            seg("d", 5, 7),
        ]);

        assert_eq!(ids, vec!["d", "b", "c", "a"]);
    }

    #[test]
    fn ordering_is_equal_for_identical_segments() {
        use std::cmp::Ordering;

        let first = seg("same", 10, 20);
        let second = seg("same", 10, 20);

        // Equality must hold in both argument orders.
        assert_eq!(cmp_segment_meta_by_time(&first, &second), Ordering::Equal);
        assert_eq!(cmp_segment_meta_by_time(&second, &first), Ordering::Equal);
    }

    #[test]
    fn ordering_primary_key_ts_min_dominates() {
        let ids = sorted_ids(vec![seg("z", 20, 30), seg("a", 10, 50), seg("m", 15, 10)]);

        assert_eq!(ids, vec!["a", "m", "z"]);
    }

    #[test]
    fn ordering_uses_segment_id_as_final_tie_breaker() {
        let ids = sorted_ids(vec![seg("b", 10, 20), seg("a", 10, 20), seg("c", 10, 20)]);

        assert_eq!(ids, vec!["a", "b", "c"]);
    }
}