1use chrono::{DateTime, Utc};
11use snafu::{Backtrace, prelude::*};
12
13use crate::storage::{self, StorageError, TableLocation};
14
15pub use crate::metadata::segments::{
17 FileFormat, SegmentId, SegmentMeta, SegmentMetaError, SegmentMetaResult, segment_id_v1,
18};
19
/// Storage-level failures encountered while reading a segment file for validation.
#[derive(Debug, Snafu)]
pub enum SegmentIoError {
    /// The segment file could not be found at `path`.
    ///
    /// In this module it is produced by [`map_storage_error`] for
    /// `StorageError::NotFound`; the underlying storage error is not retained.
    // NOTE(review): the message also says "not a regular file", but the visible
    // producer only maps `NotFound` here — confirm whether other call sites use this
    // variant for non-regular files, or narrow the wording.
    #[snafu(display("Segment file missing or not a regular file: {path}"))]
    MissingFile {
        /// Path of the segment file as reported by the storage layer.
        path: String,
        /// Backtrace for where this error was created (`map_storage_error` fills it
        /// with `Backtrace::capture()`).
        backtrace: Backtrace,
    },

    /// Any storage failure other than "not found" while reading the segment file.
    #[snafu(display("I/O error while validating segment at {path}: {source}"))]
    Storage {
        /// Path of the segment file as reported by the storage layer.
        path: String,
        /// The underlying storage error. `backtrace` here tells snafu to delegate
        /// backtrace lookup to this source instead of storing a separate one.
        #[snafu(source, backtrace)]
        source: StorageError,
    },
}
42
/// Top-level error for segment construction and validation (see [`SegmentResult`]).
///
/// Both variants are `#[snafu(transparent)]`: they add no message of their own,
/// forward `Display` and `source()` to the wrapped error, and get a generated
/// `From` impl, which is what lets `.into()` / `?` lift either inner error into
/// this type.
#[derive(Debug, Snafu)]
pub enum SegmentError {
    /// Storage-level failure: the file is missing or could not be read.
    #[snafu(transparent)]
    Io {
        source: SegmentIoError,
    },

    /// The file was readable but failed metadata validation
    /// (e.g. it is too short or lacks the expected magic bytes).
    #[snafu(transparent)]
    Meta {
        source: SegmentMetaError,
    },
}
60
/// Convenience alias for results whose error type is [`SegmentError`].
// NOTE(review): clippy evaluates `result_large_err` at each function signature that
// returns a `Result`, so an `allow` placed on this alias probably does not silence
// the lint for functions returning `SegmentResult`. Confirm; if clippy still warns,
// move the allow onto those functions or box the large error payload.
#[allow(clippy::result_large_err)]
pub type SegmentResult<T> = Result<T, SegmentError>;
64
65pub fn map_storage_error(err: StorageError) -> SegmentError {
71 let (is_missing, path) = match &err {
72 StorageError::NotFound { path, .. } => (true, path.clone()),
73 StorageError::AlreadyExists { path, .. }
74 | StorageError::OtherIo { path, .. }
75 | StorageError::AlreadyExistsNoSource { path, .. } => (false, path.clone()),
76 };
77
78 if is_missing {
79 SegmentIoError::MissingFile {
80 path,
81 backtrace: Backtrace::capture(),
82 }
83 .into()
84 } else {
85 SegmentIoError::Storage { path, source: err }.into()
86 }
87}
88
89impl SegmentMeta {
90 pub async fn for_parquet(
99 location: &TableLocation,
100 segment_id: SegmentId,
101 path: &str,
102 ts_min: DateTime<Utc>,
103 ts_max: DateTime<Utc>,
104 row_count: u64,
105 ) -> SegmentResult<Self> {
106 let probe = storage::read_head_tail_4(location.as_ref(), std::path::Path::new(path))
108 .await
109 .map_err(map_storage_error)?;
110
111 if probe.len < 8 {
112 return Err(SegmentMetaError::TooShort {
113 path: path.to_string(),
114 }
115 .into());
116 }
117
118 const PARQUET_MAGIC: &[u8; 4] = b"PAR1";
119
120 if &probe.head != PARQUET_MAGIC || &probe.tail != PARQUET_MAGIC {
121 return Err(SegmentMetaError::InvalidMagic {
122 path: path.to_string(),
123 }
124 .into());
125 }
126
127 Ok(SegmentMeta {
128 segment_id,
129 path: path.to_string(),
130 format: FileFormat::Parquet,
131 ts_min,
132 ts_max,
133 row_count,
134 file_size: Some(probe.len),
135 coverage_path: None,
136 })
137 }
138
139 pub async fn new_validated(
144 location: &TableLocation,
145 segment_id: SegmentId,
146 path: &str,
147 format: FileFormat,
148 ts_min: DateTime<Utc>,
149 ts_max: DateTime<Utc>,
150 row_count: u64,
151 ) -> SegmentResult<Self> {
152 match format {
153 FileFormat::Parquet => {
154 SegmentMeta::for_parquet(location, segment_id, path, ts_min, ts_max, row_count)
155 .await
156 } }
158 }
159}
160
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::{DateTime, TimeZone};
    use tempfile::TempDir;

    type TestResult = Result<(), Box<dyn std::error::Error>>;

    /// Builds an unambiguous UTC timestamp. Panics on an invalid date, since that
    /// would be a bug in the test itself.
    fn utc_datetime(
        year: i32,
        month: u32,
        day: u32,
        hour: u32,
        minute: u32,
        second: u32,
    ) -> DateTime<Utc> {
        Utc.with_ymd_and_hms(year, month, day, hour, minute, second)
            .single()
            .expect("valid UTC timestamp")
    }

    /// Fixed Parquet segment descriptor with neither `file_size` nor
    /// `coverage_path` set.
    fn sample_segment_meta() -> SegmentMeta {
        SegmentMeta {
            segment_id: SegmentId("seg-001".to_string()),
            path: "data/seg-001.parquet".to_string(),
            format: FileFormat::Parquet,
            ts_min: utc_datetime(2025, 1, 1, 0, 0, 0),
            ts_max: utc_datetime(2025, 1, 1, 1, 0, 0),
            row_count: 123,
            file_size: None,
            coverage_path: None,
        }
    }

    #[test]
    fn segment_meta_json_roundtrip_with_and_without_coverage_path() -> TestResult {
        // Optional fields absent: every field must survive the round trip.
        let seg = sample_segment_meta();
        let json = serde_json::to_string(&seg)?;
        let back: SegmentMeta = serde_json::from_str(&json)?;
        assert_eq!(back.segment_id, seg.segment_id);
        assert_eq!(back.path, seg.path);
        assert_eq!(back.format, seg.format);
        assert_eq!(back.ts_min, seg.ts_min);
        assert_eq!(back.ts_max, seg.ts_max);
        assert_eq!(back.row_count, seg.row_count);
        assert_eq!(back.coverage_path, None);
        assert_eq!(back.file_size, None);

        // Optional fields present.
        let mut seg2 = sample_segment_meta().with_coverage_path("_coverage/segments/a.roar");
        seg2.file_size = Some(42);
        let json2 = serde_json::to_string(&seg2)?;
        let back2: SegmentMeta = serde_json::from_str(&json2)?;
        assert_eq!(
            back2.coverage_path.as_deref(),
            Some("_coverage/segments/a.roar")
        );
        assert_eq!(back2.file_size, Some(42));
        Ok(())
    }

    /// Writes `bytes` to `path`, creating parent directories as needed.
    async fn write_bytes(path: &std::path::Path, bytes: &[u8]) -> Result<(), std::io::Error> {
        if let Some(parent) = path.parent() {
            tokio::fs::create_dir_all(parent).await?;
        }
        tokio::fs::write(path, bytes).await
    }

    /// Boxes a `SegmentError` so it can be propagated with `?` from a `TestResult`.
    fn into_boxed(err: SegmentError) -> Box<dyn std::error::Error> {
        Box::new(err)
    }

    #[tokio::test]
    async fn parquet_segment_validation_succeeds() -> TestResult {
        let tmp = TempDir::new()?;
        let location = TableLocation::local(tmp.path());
        let rel_path = "data/valid.parquet";
        let abs_path = tmp.path().join(rel_path);
        write_bytes(&abs_path, b"PAR1PAR1").await?;

        let ts_min = utc_datetime(2025, 1, 1, 0, 0, 0);
        let ts_max = utc_datetime(2025, 1, 1, 1, 0, 0);

        let meta = SegmentMeta::for_parquet(
            &location,
            SegmentId("seg-001".to_string()),
            rel_path,
            ts_min,
            ts_max,
            1_234,
        )
        .await
        .map_err(into_boxed)?;

        assert_eq!(meta.path, rel_path);
        assert_eq!(meta.segment_id, SegmentId("seg-001".to_string()));
        assert_eq!(meta.format, FileFormat::Parquet);
        assert_eq!(meta.ts_min, ts_min);
        assert_eq!(meta.ts_max, ts_max);
        assert_eq!(meta.row_count, 1_234);
        assert_eq!(meta.file_size, Some(8));
        assert_eq!(meta.coverage_path, None);

        Ok(())
    }

    #[tokio::test]
    async fn parquet_segment_file_size_reflects_actual_length() -> TestResult {
        let tmp = TempDir::new()?;
        let location = TableLocation::local(tmp.path());
        let rel_path = "data/twelve.parquet";
        let abs_path = tmp.path().join(rel_path);
        // Valid magic at both ends with a 4-byte body, 12 bytes in total.
        write_bytes(&abs_path, b"PAR1\0\0\0\0PAR1").await?;

        let meta = SegmentMeta::for_parquet(
            &location,
            SegmentId("twelve".to_string()),
            rel_path,
            utc_datetime(2025, 1, 1, 0, 0, 0),
            utc_datetime(2025, 1, 1, 1, 0, 0),
            7,
        )
        .await
        .map_err(into_boxed)?;

        assert_eq!(meta.file_size, Some(12));
        Ok(())
    }

    #[tokio::test]
    async fn parquet_segment_missing_file_returns_error() -> TestResult {
        let tmp = TempDir::new()?;
        let location = TableLocation::local(tmp.path());

        let result = SegmentMeta::for_parquet(
            &location,
            SegmentId("missing".to_string()),
            "data/missing.parquet",
            utc_datetime(2025, 1, 1, 0, 0, 0),
            utc_datetime(2025, 1, 1, 1, 0, 0),
            42,
        )
        .await;

        assert!(
            matches!(
                result,
                Err(SegmentError::Io {
                    source: SegmentIoError::MissingFile { .. }
                })
            ),
            "expected MissingFile, got error: {:?}",
            result.as_ref().err()
        );
        Ok(())
    }

    #[tokio::test]
    async fn parquet_segment_too_short_returns_error() -> TestResult {
        let tmp = TempDir::new()?;
        let location = TableLocation::local(tmp.path());
        let rel_path = "data/short.parquet";
        let abs_path = tmp.path().join(rel_path);
        write_bytes(&abs_path, b"PAR1").await?;

        let result = SegmentMeta::for_parquet(
            &location,
            SegmentId("short".to_string()),
            rel_path,
            utc_datetime(2025, 1, 1, 0, 0, 0),
            utc_datetime(2025, 1, 1, 1, 0, 0),
            10,
        )
        .await;

        assert!(
            matches!(
                result,
                Err(SegmentError::Meta {
                    source: SegmentMetaError::TooShort { .. }
                })
            ),
            "expected TooShort, got error: {:?}",
            result.as_ref().err()
        );
        Ok(())
    }

    #[tokio::test]
    async fn parquet_segment_invalid_magic_returns_error() -> TestResult {
        let tmp = TempDir::new()?;
        let location = TableLocation::local(tmp.path());
        let rel_path = "data/invalid_magic.parquet";
        let abs_path = tmp.path().join(rel_path);
        // Valid header, bad trailer.
        write_bytes(&abs_path, b"PAR1NOPE").await?;

        let result = SegmentMeta::for_parquet(
            &location,
            SegmentId("bad".to_string()),
            rel_path,
            utc_datetime(2025, 1, 1, 0, 0, 0),
            utc_datetime(2025, 1, 1, 1, 0, 0),
            10,
        )
        .await;

        assert!(
            matches!(
                result,
                Err(SegmentError::Meta {
                    source: SegmentMetaError::InvalidMagic { .. }
                })
            ),
            "expected InvalidMagic, got error: {:?}",
            result.as_ref().err()
        );
        Ok(())
    }

    #[tokio::test]
    async fn parquet_segment_invalid_header_magic_returns_error() -> TestResult {
        let tmp = TempDir::new()?;
        let location = TableLocation::local(tmp.path());
        let rel_path = "data/invalid_header.parquet";
        let abs_path = tmp.path().join(rel_path);
        // Bad header, valid trailer: covers the `head` half of the magic check.
        write_bytes(&abs_path, b"NOPEPAR1").await?;

        let result = SegmentMeta::for_parquet(
            &location,
            SegmentId("bad-head".to_string()),
            rel_path,
            utc_datetime(2025, 1, 1, 0, 0, 0),
            utc_datetime(2025, 1, 1, 1, 0, 0),
            10,
        )
        .await;

        assert!(
            matches!(
                result,
                Err(SegmentError::Meta {
                    source: SegmentMetaError::InvalidMagic { .. }
                })
            ),
            "expected InvalidMagic, got error: {:?}",
            result.as_ref().err()
        );
        Ok(())
    }

    #[tokio::test]
    async fn new_validated_delegates_to_parquet_constructor() -> TestResult {
        let tmp = TempDir::new()?;
        let location = TableLocation::local(tmp.path());
        let rel_path = "data/delegate.parquet";
        let abs_path = tmp.path().join(rel_path);
        write_bytes(&abs_path, b"PAR1PAR1").await?;

        let ts_min = utc_datetime(2025, 1, 1, 0, 0, 0);
        let ts_max = utc_datetime(2025, 1, 1, 1, 0, 0);

        let meta = SegmentMeta::new_validated(
            &location,
            SegmentId("delegate".to_string()),
            rel_path,
            FileFormat::Parquet,
            ts_min,
            ts_max,
            5,
        )
        .await
        .map_err(into_boxed)?;

        assert_eq!(meta.path, rel_path);
        assert_eq!(meta.format, FileFormat::Parquet);
        assert_eq!(meta.row_count, 5);
        Ok(())
    }

    #[tokio::test]
    async fn new_validated_propagates_validation_errors() -> TestResult {
        let tmp = TempDir::new()?;
        let location = TableLocation::local(tmp.path());
        let rel_path = "data/delegate_short.parquet";
        let abs_path = tmp.path().join(rel_path);
        write_bytes(&abs_path, b"PAR1").await?;

        let ts = utc_datetime(2025, 1, 1, 0, 0, 0);

        let result = SegmentMeta::new_validated(
            &location,
            SegmentId("delegate-short".to_string()),
            rel_path,
            FileFormat::Parquet,
            ts,
            ts,
            1,
        )
        .await;

        assert!(
            matches!(
                result,
                Err(SegmentError::Meta {
                    source: SegmentMetaError::TooShort { .. }
                })
            ),
            "expected TooShort, got error: {:?}",
            result.as_ref().err()
        );
        Ok(())
    }
}