1use std::{collections::BTreeMap, str::FromStr};
8
9use arrow::datatypes::SchemaRef;
10use chrono::{DateTime, Utc};
11use serde::{Deserialize, Serialize};
12use snafu::prelude::*;
13
14use crate::metadata::logical_schema::{LogicalSchema, SchemaConvertError};
15
/// Version number written into [`TableMeta`]'s `format_version` field by the
/// `TableMeta::new_time_series*` constructors.
pub const TABLE_FORMAT_VERSION: u32 = 1;
20
/// The kind of table a [`TableMeta`] describes.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum TableKind {
    /// A time-series table, indexed as described by the contained [`TimeIndexSpec`].
    TimeSeries(TimeIndexSpec),

    /// A table with no time-index specification attached.
    Generic,
}
35
/// Serializable metadata describing a table: its kind, canonical logical schema,
/// creation time and metadata format version.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct TableMeta {
    /// Kind of table; for time series this also carries the time-index specification.
    pub(crate) kind: TableKind,

    /// Canonical logical schema of the table. `None` until a schema has been recorded;
    /// `TableMeta::arrow_schema_ref` fails with `MissingCanonicalSchema` in that case.
    pub(crate) logical_schema: Option<LogicalSchema>,

    /// When this metadata was created, in UTC. The constructors set it to `Utc::now()`.
    pub(crate) created_at: DateTime<Utc>,

    /// Metadata format version; the constructors set it to [`TABLE_FORMAT_VERSION`].
    pub(crate) format_version: u32,

    /// Optional key/value identity of the entity this table holds
    /// (e.g. `symbol -> AAPL`, `venue -> NASDAQ`).
    /// NOTE(review): keys presumably correspond to `TimeIndexSpec::entity_columns` — confirm.
    ///
    /// Omitted from serialized output when `None`, and defaults to `None` when the
    /// field is absent on deserialization, so older payloads still load.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub entity_identity: Option<BTreeMap<String, String>>,
}
64
/// Errors returned by `TableMeta::arrow_schema_ref`.
#[derive(Debug, Snafu)]
pub enum TableMetaSchemaError {
    /// The table has no canonical logical schema yet (`logical_schema` is `None`).
    #[snafu(display("table has no canonical logical schema yet (logical_schema is None)"))]
    MissingCanonicalSchema,

    /// Converting the logical schema to an Arrow schema failed. Because the variant is
    /// `transparent`, its display text and source come from the wrapped
    /// [`SchemaConvertError`].
    #[snafu(transparent)]
    Convert {
        /// The underlying schema conversion error.
        source: SchemaConvertError,
    },
}
79
80impl TableMeta {
81 pub fn kind(&self) -> &TableKind {
83 &self.kind
84 }
85
86 pub fn logical_schema(&self) -> Option<&LogicalSchema> {
88 self.logical_schema.as_ref()
89 }
90
91 pub fn created_at(&self) -> DateTime<Utc> {
93 self.created_at
94 }
95
96 pub fn format_version(&self) -> u32 {
98 self.format_version
99 }
100
101 pub fn new_time_series(index: TimeIndexSpec) -> Self {
108 TableMeta {
109 kind: TableKind::TimeSeries(index),
110 logical_schema: None,
111 created_at: Utc::now(),
112 format_version: TABLE_FORMAT_VERSION,
113 entity_identity: None,
114 }
115 }
116
117 pub fn new_time_series_with_schema(
119 index: TimeIndexSpec,
120 logical_schema: LogicalSchema,
121 ) -> Self {
122 TableMeta {
123 kind: TableKind::TimeSeries(index),
124 logical_schema: Some(logical_schema),
125 created_at: Utc::now(),
126 format_version: TABLE_FORMAT_VERSION,
127 entity_identity: None,
128 }
129 }
130
131 pub fn arrow_schema_ref(&self) -> Result<SchemaRef, TableMetaSchemaError> {
136 let logical = self
137 .logical_schema
138 .as_ref()
139 .ok_or(TableMetaSchemaError::MissingCanonicalSchema)?;
140
141 logical
142 .to_arrow_schema_ref()
143 .map_err(|source| TableMetaSchemaError::Convert { source })
144 }
145}
146
/// Alias for [`TableMeta`] used when a value represents a metadata delta.
///
/// It is the same type, so a delta carries every `TableMeta` field.
/// NOTE(review): how callers distinguish or merge a delta versus a full record is not
/// visible here — confirm at the use sites.
pub type TableMetaDelta = TableMeta;
153
/// Errors produced when parsing a [`TimeBucket`] from a string.
///
/// Every variant except `Empty` carries `spec`: the input with surrounding whitespace
/// trimmed, so error messages quote exactly what was parsed.
#[derive(Debug, Snafu, PartialEq, Eq)]
pub enum ParseTimeBucketError {
    /// The input was empty or contained only whitespace.
    #[snafu(display("time bucket spec is empty"))]
    Empty,

    /// The input begins with a unit letter, with no number in front of it (e.g. `"h"`).
    #[snafu(display("time bucket spec '{spec}' is missing a numeric value"))]
    MissingNumber {
        /// The trimmed input.
        spec: String,
    },

    /// The input contains no ASCII letter, so no unit could be found (e.g. `"42"`).
    #[snafu(display("time bucket spec '{spec}' is missing a unit suffix (expected s|m|h|d)"))]
    MissingUnit {
        /// The trimmed input.
        spec: String,
    },

    /// The text before the unit is not a valid `u64`. Examples are `"1.5h"`,
    /// `"1_000s"`, and values that overflow `u64`.
    #[snafu(display("invalid bucket value in '{spec}': {source}"))]
    InvalidNumber {
        /// The trimmed input.
        spec: String,
        /// The integer parse failure.
        source: std::num::ParseIntError,
    },

    /// The numeric value was zero.
    #[snafu(display("bucket value must be > 0 (got {value}) in '{spec}'"))]
    NonPositive {
        /// The trimmed input.
        spec: String,
        /// The parsed value (always `0` when produced by the parser).
        value: u64,
    },

    /// The value parsed as a `u64` but does not fit in a `u32`.
    #[snafu(display("bucket value too large for u32 (got {value}) in '{spec}'"))]
    TooLarge {
        /// The trimmed input.
        spec: String,
        /// The out-of-range value.
        value: u64,
    },

    /// The unit suffix is not one of the accepted aliases.
    #[snafu(display("unknown time bucket unit '{unit}' in '{spec}' (expected s|m|h|d)"))]
    UnknownUnit {
        /// The trimmed input.
        spec: String,
        /// The unit text exactly as written in the input (original casing).
        unit: String,
    },
}
211
/// Width of a time bucket, expressed as a count of a single unit.
///
/// Parse it from strings such as `"5m"` or `"1 hour"` via [`FromStr`] or
/// `TimeBucket::parse`. Parsed values are always `> 0`. Constructing a variant
/// directly performs no such check.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum TimeBucket {
    /// A bucket of N seconds.
    Seconds(u32),
    /// A bucket of N minutes.
    Minutes(u32),
    /// A bucket of N hours.
    Hours(u32),
    /// A bucket of N days.
    Days(u32),
}
227
228impl FromStr for TimeBucket {
229 type Err = ParseTimeBucketError;
230
231 fn from_str(input: &str) -> Result<Self, Self::Err> {
232 let spec = input.trim();
233 if spec.is_empty() {
234 return Err(ParseTimeBucketError::Empty);
235 }
236
237 let unit_start = spec
239 .char_indices()
240 .find(|(_, c)| c.is_ascii_alphabetic())
241 .map(|(i, _)| i);
242
243 let Some(unit_start) = unit_start else {
244 return Err(ParseTimeBucketError::MissingUnit {
245 spec: spec.to_string(),
246 });
247 };
248
249 if unit_start == 0 {
250 return Err(ParseTimeBucketError::MissingNumber {
252 spec: spec.to_string(),
253 });
254 }
255
256 let (num_str, unit_str) = spec.split_at(unit_start);
257 let num_str = num_str.trim();
258 let unit_str = unit_str.trim();
259
260 if unit_str.is_empty() {
261 return Err(ParseTimeBucketError::MissingUnit {
262 spec: spec.to_string(),
263 });
264 }
265
266 let value: u64 = num_str
267 .parse()
268 .map_err(|source| ParseTimeBucketError::InvalidNumber {
269 spec: spec.to_string(),
270 source,
271 })?;
272
273 if value == 0 {
274 return Err(ParseTimeBucketError::NonPositive {
275 spec: spec.to_string(),
276 value,
277 });
278 }
279
280 if value > u32::MAX as u64 {
281 return Err(ParseTimeBucketError::TooLarge {
282 spec: spec.to_string(),
283 value,
284 });
285 }
286
287 let v = value as u32;
288 let unit = unit_str.to_ascii_lowercase();
289
290 match unit.as_str() {
291 "s" | "sec" | "secs" | "second" | "seconds" => Ok(TimeBucket::Seconds(v)),
292 "m" | "min" | "mins" | "minute" | "minutes" => Ok(TimeBucket::Minutes(v)),
293 "h" | "hr" | "hrs" | "hour" | "hours" => Ok(TimeBucket::Hours(v)),
294 "d" | "day" | "days" => Ok(TimeBucket::Days(v)),
295 _ => Err(ParseTimeBucketError::UnknownUnit {
296 spec: spec.to_string(),
297 unit: unit_str.to_string(),
298 }),
299 }
300 }
301}
302
303impl TimeBucket {
304 pub fn parse(spec: &str) -> Result<Self, ParseTimeBucketError> {
314 spec.parse()
315 }
316}
317
/// Describes how a time-series table is indexed: which column holds the timestamp,
/// which columns identify an entity, and the bucket width.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct TimeIndexSpec {
    /// Name of the timestamp column used as the time index.
    pub timestamp_column: String,

    /// Columns identifying the entity a row belongs to (e.g. `symbol`).
    ///
    /// Defaults to an empty list when the field is absent on deserialization.
    #[serde(default)]
    pub entity_columns: Vec<String>,

    /// Bucket width for the time index.
    pub bucket: TimeBucket,

    /// Optional timezone name.
    ///
    /// It is omitted from serialized output when `None` and defaults to `None` when absent.
    /// NOTE(review): the expected format (e.g. an IANA zone name) is not validated here —
    /// confirm with callers.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub timezone: Option<String>,
}
345
#[cfg(test)]
mod tests {
    //! Unit tests covering three areas:
    //!
    //! - `TableMeta` serde round-trips, constructors and Arrow schema access.
    //! - `TimeBucket` parsing.
    //! - `TimeIndexSpec` serde defaults.

    use crate::metadata::logical_schema::{LogicalDataType, LogicalField};

    use super::*;
    use chrono::TimeZone;
    use serde_json::Value;

    /// Builds a UTC timestamp, panicking on an invalid/ambiguous date (test-only helper).
    fn utc_datetime(
        year: i32,
        month: u32,
        day: u32,
        hour: u32,
        minute: u32,
        second: u32,
    ) -> DateTime<Utc> {
        Utc.with_ymd_and_hms(year, month, day, hour, minute, second)
            .single()
            .expect("valid UTC timestamp")
    }

    /// A minimal time index: `ts` timestamp, `symbol` entity, one-minute buckets.
    fn sample_time_index_spec() -> TimeIndexSpec {
        TimeIndexSpec {
            timestamp_column: "ts".to_string(),
            entity_columns: vec!["symbol".to_string()],
            bucket: TimeBucket::Minutes(1),
            timezone: None,
        }
    }

    /// A structurally valid logical schema whose only column is INT96. It is used where
    /// a schema is needed; converting it to Arrow fails with `Int96Unsupported`.
    fn legacy_int96_schema() -> LogicalSchema {
        LogicalSchema::new(vec![LogicalField {
            name: "legacy_ts".to_string(),
            data_type: LogicalDataType::Int96,
            nullable: false,
        }])
        .expect("valid schema structure")
    }

    #[test]
    fn table_meta_json_roundtrip_with_entity_identity_none() {
        let meta = TableMeta {
            kind: TableKind::TimeSeries(sample_time_index_spec()),
            logical_schema: None,
            created_at: utc_datetime(2025, 1, 1, 0, 0, 0),
            format_version: TABLE_FORMAT_VERSION,
            entity_identity: None,
        };

        let json = serde_json::to_string(&meta).unwrap();
        let value: Value = serde_json::from_str(&json).unwrap();
        assert!(value.get("entity_identity").is_none());

        let back: TableMeta = serde_json::from_str(&json).unwrap();
        assert_eq!(back.entity_identity, None);
        assert_eq!(back, meta);
    }

    #[test]
    fn table_meta_json_roundtrip_with_entity_identity_some() {
        let entity_identity = BTreeMap::from([
            ("symbol".to_string(), "AAPL".to_string()),
            ("venue".to_string(), "NASDAQ".to_string()),
        ]);
        let meta = TableMeta {
            kind: TableKind::TimeSeries(sample_time_index_spec()),
            logical_schema: None,
            created_at: utc_datetime(2025, 1, 1, 0, 0, 0),
            format_version: TABLE_FORMAT_VERSION,
            entity_identity: Some(entity_identity.clone()),
        };

        let json = serde_json::to_string(&meta).unwrap();
        let value: Value = serde_json::from_str(&json).unwrap();
        assert!(value.get("entity_identity").is_some());

        let back: TableMeta = serde_json::from_str(&json).unwrap();
        assert_eq!(back.entity_identity, Some(entity_identity));
        assert_eq!(back, meta);
    }

    #[test]
    fn table_meta_constructors_set_defaults() {
        let meta = TableMeta::new_time_series(sample_time_index_spec());
        assert_eq!(meta.kind(), &TableKind::TimeSeries(sample_time_index_spec()));
        assert!(meta.logical_schema().is_none());
        assert_eq!(meta.format_version(), TABLE_FORMAT_VERSION);
        assert_eq!(meta.entity_identity, None);

        let logical = legacy_int96_schema();
        let with_schema =
            TableMeta::new_time_series_with_schema(sample_time_index_spec(), logical.clone());
        assert_eq!(
            with_schema.kind(),
            &TableKind::TimeSeries(sample_time_index_spec())
        );
        assert_eq!(with_schema.logical_schema(), Some(&logical));
        assert_eq!(with_schema.format_version(), TABLE_FORMAT_VERSION);
        assert_eq!(with_schema.entity_identity, None);
    }

    #[test]
    fn table_meta_arrow_schema_ref_requires_logical_schema() {
        let meta = TableMeta::new_time_series(sample_time_index_spec());
        let err = meta.arrow_schema_ref().unwrap_err();
        assert!(matches!(err, TableMetaSchemaError::MissingCanonicalSchema));
        assert_eq!(
            err.to_string(),
            "table has no canonical logical schema yet (logical_schema is None)"
        );
    }

    #[test]
    fn table_meta_arrow_schema_ref_propagates_convert_error() {
        let meta =
            TableMeta::new_time_series_with_schema(sample_time_index_spec(), legacy_int96_schema());

        let err = meta.arrow_schema_ref().unwrap_err();
        assert!(
            matches!(
                &err,
                TableMetaSchemaError::Convert {
                    source: SchemaConvertError::Int96Unsupported { column }
                } if column == "legacy_ts"
            ),
            "unexpected error: {err:?}"
        );
    }

    #[test]
    fn time_bucket_parse_accepts_basic_units() {
        let cases = [
            ("1s", TimeBucket::Seconds(1)),
            ("2m", TimeBucket::Minutes(2)),
            ("3h", TimeBucket::Hours(3)),
            ("4d", TimeBucket::Days(4)),
        ];

        for (input, expected) in cases {
            assert_eq!(input.parse::<TimeBucket>().unwrap(), expected);
        }
    }

    #[test]
    fn time_bucket_parse_accepts_aliases_case_and_whitespace() {
        let cases = [
            ("1sec", TimeBucket::Seconds(1)),
            ("1secs", TimeBucket::Seconds(1)),
            ("1second", TimeBucket::Seconds(1)),
            ("1seconds", TimeBucket::Seconds(1)),
            ("1min", TimeBucket::Minutes(1)),
            ("1mins", TimeBucket::Minutes(1)),
            ("1minute", TimeBucket::Minutes(1)),
            ("1minutes", TimeBucket::Minutes(1)),
            ("1hr", TimeBucket::Hours(1)),
            ("1hrs", TimeBucket::Hours(1)),
            ("1hour", TimeBucket::Hours(1)),
            ("1hours", TimeBucket::Hours(1)),
            ("1day", TimeBucket::Days(1)),
            ("1days", TimeBucket::Days(1)),
            ("1H", TimeBucket::Hours(1)),
            ("1MiN", TimeBucket::Minutes(1)),
            (" 2h", TimeBucket::Hours(2)),
            ("3d ", TimeBucket::Days(3)),
            (" 4m ", TimeBucket::Minutes(4)),
            ("1 h", TimeBucket::Hours(1)),
        ];

        for (input, expected) in cases {
            assert_eq!(input.parse::<TimeBucket>().unwrap(), expected);
        }
    }

    #[test]
    fn time_bucket_parse_rejects_empty_or_whitespace() {
        let cases = ["", " ", "\n\t"];
        for input in cases {
            let err = input.parse::<TimeBucket>().unwrap_err();
            assert!(matches!(err, ParseTimeBucketError::Empty));
        }
    }

    #[test]
    fn time_bucket_parse_rejects_missing_number() {
        let cases = ["h", " hr", "day", "abcmin"];
        for input in cases {
            let err = input.parse::<TimeBucket>().unwrap_err();
            assert!(
                matches!(err, ParseTimeBucketError::MissingNumber { .. }),
                "expected MissingNumber for {input:?}, got {err:?}"
            );
        }
    }

    #[test]
    fn time_bucket_parse_rejects_missing_unit() {
        let cases = ["1", " 42 "];
        for input in cases {
            let err = input.parse::<TimeBucket>().unwrap_err();
            assert!(
                matches!(err, ParseTimeBucketError::MissingUnit { .. }),
                "expected MissingUnit for {input:?}, got {err:?}"
            );
        }
    }

    #[test]
    fn time_bucket_parse_rejects_invalid_number() {
        let cases = ["1.5h", "1_000s"];
        for input in cases {
            let err = input.parse::<TimeBucket>().unwrap_err();
            assert!(
                matches!(err, ParseTimeBucketError::InvalidNumber { .. }),
                "expected InvalidNumber for {input:?}, got {err:?}"
            );
        }
    }

    #[test]
    fn time_bucket_parse_reports_u64_overflow_as_invalid_number() {
        // Ten times u64::MAX: too large for `u64` itself, so it cannot become `TooLarge`.
        let input = format!("{}0h", u64::MAX);
        let err = input.parse::<TimeBucket>().unwrap_err();
        assert!(
            matches!(err, ParseTimeBucketError::InvalidNumber { .. }),
            "expected InvalidNumber for {input:?}, got {err:?}"
        );
    }

    #[test]
    fn time_bucket_parse_rejects_non_positive() {
        let cases = ["0s", "0m"];
        for input in cases {
            let err = input.parse::<TimeBucket>().unwrap_err();
            assert!(
                matches!(err, ParseTimeBucketError::NonPositive { value: 0, .. }),
                "expected NonPositive for {input:?}, got {err:?}"
            );
        }
    }

    #[test]
    fn time_bucket_parse_rejects_too_large() {
        let too_large = (u32::MAX as u64 + 1).to_string();
        let input = format!("{too_large}h");
        let err = input.parse::<TimeBucket>().unwrap_err();
        assert!(
            matches!(err, ParseTimeBucketError::TooLarge { value, .. } if value == u32::MAX as u64 + 1),
            "expected TooLarge for {input:?}, got {err:?}"
        );
    }

    #[test]
    fn time_bucket_parse_rejects_unknown_units() {
        let cases = ["1w", "1ms", "1mo", "10msec"];
        for input in cases {
            let err = input.parse::<TimeBucket>().unwrap_err();
            assert!(
                matches!(err, ParseTimeBucketError::UnknownUnit { .. }),
                "expected UnknownUnit for {input:?}, got {err:?}"
            );
        }
    }

    #[test]
    fn time_bucket_parse_errors_carry_trimmed_spec_and_original_unit() {
        assert_eq!(
            " 0s ".parse::<TimeBucket>(),
            Err(ParseTimeBucketError::NonPositive {
                spec: "0s".to_string(),
                value: 0,
            })
        );
        assert_eq!(
            "\t42\n".parse::<TimeBucket>(),
            Err(ParseTimeBucketError::MissingUnit {
                spec: "42".to_string(),
            })
        );
        // The unit is matched case-insensitively but reported as written.
        assert_eq!(
            "1W".parse::<TimeBucket>(),
            Err(ParseTimeBucketError::UnknownUnit {
                spec: "1W".to_string(),
                unit: "W".to_string(),
            })
        );
    }

    #[test]
    fn time_bucket_parse_matches_from_str() {
        let via_method = TimeBucket::parse("5m").unwrap();
        let via_trait: TimeBucket = "5m".parse().unwrap();
        assert_eq!(via_method, via_trait);
    }

    #[test]
    fn time_index_spec_json_defaults_optional_fields() {
        let spec = sample_time_index_spec();
        let mut value = serde_json::to_value(&spec).unwrap();
        // `timezone: None` is skipped on serialization.
        assert!(value.get("timezone").is_none());

        // `entity_columns` is `#[serde(default)]`, so payloads without it still load.
        let removed = value
            .as_object_mut()
            .expect("TimeIndexSpec serializes as a JSON object")
            .remove("entity_columns");
        assert!(removed.is_some());
        let back: TimeIndexSpec = serde_json::from_value(value).unwrap();
        assert!(back.entity_columns.is_empty());
        assert_eq!(back.timestamp_column, "ts");
        assert_eq!(back.bucket, TimeBucket::Minutes(1));
        assert_eq!(back.timezone, None);

        let with_tz = TimeIndexSpec {
            timezone: Some("America/New_York".to_string()),
            ..sample_time_index_spec()
        };
        let json = serde_json::to_string(&with_tz).unwrap();
        let back: TimeIndexSpec = serde_json::from_str(&json).unwrap();
        assert_eq!(back, with_tz);
    }
}