Skip to main content

timeseries_table_core/metadata/
table_metadata.rs

1//! Table-level metadata structures recorded in the log.
2//!
3//! This module models the schema and configuration captured by
4//! `LogAction::UpdateTableMeta`, including table kind, logical schema, and the
5//! time index specification. Future evolutions can extend these types without
6//! touching the storage/reader code paths.
7use std::{collections::BTreeMap, str::FromStr};
8
9use arrow::datatypes::SchemaRef;
10use chrono::{DateTime, Utc};
11use serde::{Deserialize, Serialize};
12use snafu::prelude::*;
13
14use crate::metadata::logical_schema::{LogicalSchema, SchemaConvertError};
15
/// Current table metadata / log format version.
///
/// Bumped only when we make a breaking change to the on-disk JSON format.
/// The `TableMeta` convenience constructors (`new_time_series*`) write this
/// value into the `format_version` field of new metadata.
pub const TABLE_FORMAT_VERSION: u32 = 1;
20
/// The high-level "kind" of table.
///
/// v0.1 supports only `TimeSeries`, but a `Generic` kind is reserved so that
/// the log format can represent non-timeseries tables later without breaking
/// existing JSON.
///
/// No serde representation attributes are applied, so this uses serde's default
/// externally tagged form (`{"TimeSeries": {...}}` / `"Generic"`). Variant names
/// are therefore part of the on-disk format and must not be renamed.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum TableKind {
    /// A time-series table with an explicit time index specification.
    TimeSeries(TimeIndexSpec),

    /// Placeholder for future basic tables that do not have a time index.
    /// Not used in v0.1.
    Generic,
}
35
/// High-level table metadata stored in the log.
///
/// This describes the table kind, a logical schema (optional in v0.1), and
/// basic bookkeeping fields.
///
/// No serde renames are applied, so the field names below are the JSON keys
/// written to the log; renaming a field changes the on-disk format.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct TableMeta {
    /// Table kind: TimeSeries or Generic.
    pub(crate) kind: TableKind,

    /// Optional logical schema description.
    ///
    /// v0.1 can treat this as informational; enforcement is handled by
    /// higher layers. While this is `None`, `TableMeta::arrow_schema_ref`
    /// returns `TableMetaSchemaError::MissingCanonicalSchema`.
    pub(crate) logical_schema: Option<LogicalSchema>,

    /// Creation timestamp of the table, stored as RFC3339 UTC.
    ///
    /// The `new_time_series*` constructors fill this with `Utc::now()`.
    pub(crate) created_at: DateTime<Utc>,

    /// Format version for future evolution of the log/table format.
    ///
    /// The `new_time_series*` constructors set this to [`TABLE_FORMAT_VERSION`].
    pub(crate) format_version: u32,

    /// v0.1: If TimeIndexSpec.entity_columns is non-empty, we pin a single entity identity
    /// per table (map keyed by column name).
    ///
    /// Omitted from the serialized JSON when `None`, and deserialized as `None`
    /// when the key is absent. Unlike the other fields, this one is fully `pub`
    /// and has no accessor method.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub entity_identity: Option<BTreeMap<String, String>>,
}
64
/// Errors encountered while retrieving or converting a table's logical schema.
///
/// Returned by `TableMeta::arrow_schema_ref`.
#[derive(Debug, Snafu)]
pub enum TableMetaSchemaError {
    /// The table metadata has not yet recorded a canonical logical schema.
    #[snafu(display("table has no canonical logical schema yet (logical_schema is None)"))]
    MissingCanonicalSchema,

    /// Failed to convert the logical schema to Arrow types.
    ///
    /// Marked `transparent`, so the displayed message is that of the wrapped
    /// [`SchemaConvertError`] rather than a message of its own.
    #[snafu(transparent)]
    Convert {
        /// Underlying conversion error.
        source: SchemaConvertError,
    },
}
79
80impl TableMeta {
81    /// Returns the table kind (e.g. time series or generic).
82    pub fn kind(&self) -> &TableKind {
83        &self.kind
84    }
85
86    /// Returns the optional logical schema if it has been set.
87    pub fn logical_schema(&self) -> Option<&LogicalSchema> {
88        self.logical_schema.as_ref()
89    }
90
91    /// Returns the UTC timestamp when the table was created.
92    pub fn created_at(&self) -> DateTime<Utc> {
93        self.created_at
94    }
95
96    /// Returns the on-disk table metadata format version.
97    pub fn format_version(&self) -> u32 {
98        self.format_version
99    }
100
101    /// Convenience constructor for a time-series table.
102    ///
103    /// - Fills `created_at` with `Utc::now()`.
104    /// - Fills `format_version` with `TABLE_FORMAT_VERSION`.
105    /// - Leaves `logical_schema` as `None`; it will be adopted from the
106    ///   first appended segment in v0.1.
107    pub fn new_time_series(index: TimeIndexSpec) -> Self {
108        TableMeta {
109            kind: TableKind::TimeSeries(index),
110            logical_schema: None,
111            created_at: Utc::now(),
112            format_version: TABLE_FORMAT_VERSION,
113            entity_identity: None,
114        }
115    }
116
117    /// Variant that lets you explicitly pass a logical schema up front.
118    pub fn new_time_series_with_schema(
119        index: TimeIndexSpec,
120        logical_schema: LogicalSchema,
121    ) -> Self {
122        TableMeta {
123            kind: TableKind::TimeSeries(index),
124            logical_schema: Some(logical_schema),
125            created_at: Utc::now(),
126            format_version: TABLE_FORMAT_VERSION,
127            entity_identity: None,
128        }
129    }
130
131    /// Convert the table's logical schema to a shared Arrow [`SchemaRef`].
132    ///
133    /// Returns [`TableMetaSchemaError::MissingCanonicalSchema`] if the schema has
134    /// not yet been established for the table.
135    pub fn arrow_schema_ref(&self) -> Result<SchemaRef, TableMetaSchemaError> {
136        let logical = self
137            .logical_schema
138            .as_ref()
139            .ok_or(TableMetaSchemaError::MissingCanonicalSchema)?;
140
141        logical
142            .to_arrow_schema_ref()
143            .map_err(|source| TableMetaSchemaError::Convert { source })
144    }
145}
146
/// For v0.1, a `TableMetaDelta` is just a full replacement of [`TableMeta`].
///
/// This alias keeps the wire format simple (the JSON is the same as `TableMeta`)
/// while leaving room to evolve to more granular metadata updates in future
/// versions (for example, partial updates or additive fields).
///
/// Being a type alias, it is the very same type as `TableMeta` and the two are
/// interchangeable in code.
pub type TableMetaDelta = TableMeta;
153
/// Errors produced when parsing a human-friendly time bucket spec (e.g. `1h`).
///
/// Returned by the `FromStr` implementation for [`TimeBucket`] and by
/// [`TimeBucket::parse`]. Every variant except `Empty` carries the trimmed spec.
#[derive(Debug, Snafu, PartialEq, Eq)]
pub enum ParseTimeBucketError {
    /// The spec string was empty or only whitespace.
    #[snafu(display("time bucket spec is empty"))]
    Empty,

    /// The spec did not include a numeric value.
    #[snafu(display("time bucket spec '{spec}' is missing a numeric value"))]
    MissingNumber {
        /// The original spec string.
        spec: String,
    },

    /// The spec did not include a required unit suffix.
    #[snafu(display("time bucket spec '{spec}' is missing a unit suffix (expected s|m|h|d)"))]
    MissingUnit {
        /// The original spec string.
        spec: String,
    },

    /// The numeric portion of the spec failed to parse.
    ///
    /// Because the number is parsed as a `u64`, this also covers values larger
    /// than `u64::MAX` (which never reach [`ParseTimeBucketError::TooLarge`]).
    #[snafu(display("invalid bucket value in '{spec}': {source}"))]
    InvalidNumber {
        /// The original spec string.
        spec: String,
        /// The parse error returned by `u64::from_str`.
        source: std::num::ParseIntError,
    },

    /// The parsed numeric value was zero.
    #[snafu(display("bucket value must be > 0 (got {value}) in '{spec}'"))]
    NonPositive {
        /// The original spec string.
        spec: String,
        /// The parsed numeric value (always `0`, since it is unsigned).
        value: u64,
    },

    /// The parsed numeric value did not fit in a `u32`.
    #[snafu(display("bucket value too large for u32 (got {value}) in '{spec}'"))]
    TooLarge {
        /// The original spec string.
        spec: String,
        /// The parsed numeric value (greater than `u32::MAX`).
        value: u64,
    },

    /// The spec used an unsupported unit suffix.
    #[snafu(display("unknown time bucket unit '{unit}' in '{spec}' (expected s|m|h|d)"))]
    UnknownUnit {
        /// The original spec string.
        spec: String,
        /// The unrecognized unit suffix.
        unit: String,
    },
}
211
212/// Granularity for time buckets used by coverage/bitmap logic.
213///
214/// This does not affect physical storage directly, but describes how the time
215/// axis is discretized when building coverage bitmaps and computing gaps.
216#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
217pub enum TimeBucket {
218    /// A bucket spanning a fixed number of seconds.
219    Seconds(u32),
220    /// A bucket spanning a fixed number of minutes.
221    Minutes(u32),
222    /// A bucket spanning a fixed number of hours.
223    Hours(u32),
224    /// A bucket spanning a fixed number of days.
225    Days(u32),
226}
227
228impl FromStr for TimeBucket {
229    type Err = ParseTimeBucketError;
230
231    fn from_str(input: &str) -> Result<Self, Self::Err> {
232        let spec = input.trim();
233        if spec.is_empty() {
234            return Err(ParseTimeBucketError::Empty);
235        }
236
237        // Split into numeric prefix + unit suffix (unit starts at first alphabetic char).
238        let unit_start = spec
239            .char_indices()
240            .find(|(_, c)| c.is_ascii_alphabetic())
241            .map(|(i, _)| i);
242
243        let Some(unit_start) = unit_start else {
244            return Err(ParseTimeBucketError::MissingUnit {
245                spec: spec.to_string(),
246            });
247        };
248
249        if unit_start == 0 {
250            // No leading digits (e.g. "h")
251            return Err(ParseTimeBucketError::MissingNumber {
252                spec: spec.to_string(),
253            });
254        }
255
256        let (num_str, unit_str) = spec.split_at(unit_start);
257        let num_str = num_str.trim();
258        let unit_str = unit_str.trim();
259
260        if unit_str.is_empty() {
261            return Err(ParseTimeBucketError::MissingUnit {
262                spec: spec.to_string(),
263            });
264        }
265
266        let value: u64 = num_str
267            .parse()
268            .map_err(|source| ParseTimeBucketError::InvalidNumber {
269                spec: spec.to_string(),
270                source,
271            })?;
272
273        if value == 0 {
274            return Err(ParseTimeBucketError::NonPositive {
275                spec: spec.to_string(),
276                value,
277            });
278        }
279
280        if value > u32::MAX as u64 {
281            return Err(ParseTimeBucketError::TooLarge {
282                spec: spec.to_string(),
283                value,
284            });
285        }
286
287        let v = value as u32;
288        let unit = unit_str.to_ascii_lowercase();
289
290        match unit.as_str() {
291            "s" | "sec" | "secs" | "second" | "seconds" => Ok(TimeBucket::Seconds(v)),
292            "m" | "min" | "mins" | "minute" | "minutes" => Ok(TimeBucket::Minutes(v)),
293            "h" | "hr" | "hrs" | "hour" | "hours" => Ok(TimeBucket::Hours(v)),
294            "d" | "day" | "days" => Ok(TimeBucket::Days(v)),
295            _ => Err(ParseTimeBucketError::UnknownUnit {
296                spec: spec.to_string(),
297                unit: unit_str.to_string(),
298            }),
299        }
300    }
301}
302
303impl TimeBucket {
304    /// Parse a human-friendly time bucket spec (e.g. `1h`, `15m`, `30s`, `2d`).
305    ///
306    /// This is a convenience wrapper around `str::parse` for `TimeBucket`, and
307    /// accepts common unit aliases (e.g. `sec`, `min`, `hr`, `day`).
308    ///
309    /// # Errors
310    /// Returns [`ParseTimeBucketError`] if the spec is empty, missing a unit,
311    /// has an invalid or non-positive number, overflows `u32`, or uses an
312    /// unsupported unit.
313    pub fn parse(spec: &str) -> Result<Self, ParseTimeBucketError> {
314        spec.parse()
315    }
316}
317
/// Configuration for the time index of a time-series table.
///
/// In v0.1 this is assumed to exist for all "time-series" tables; a future
/// `TableKind::Generic` may omit it.
///
/// No serde renames are applied, so the field names below are the JSON keys.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct TimeIndexSpec {
    /// Name of the timestamp column (for example, `"ts"` or `"timestamp"`).
    pub timestamp_column: String,

    /// Optional entity/symbol columns that help partition the time axis
    /// (for example, `["symbol"]` or `["symbol", "venue"]`).
    ///
    /// This is metadata only; enforcement and partitioning are handled by
    /// higher layers. A missing key deserializes as an empty list.
    #[serde(default)]
    pub entity_columns: Vec<String>,

    /// Logical bucket size used by coverage bitmaps (for example, 1 minute, 1 hour).
    pub bucket: TimeBucket,

    /// Optional IANA timezone identifier (for example, `"America/New_York"`).
    ///
    /// For v0.1 this is primarily reserved for future use; timestamps are
    /// generally expected to be stored in UTC. Omitted from the JSON when
    /// `None`, and deserialized as `None` when the key is absent.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub timezone: Option<String>,
}
345
#[cfg(test)]
mod tests {
    use crate::metadata::logical_schema::{LogicalDataType, LogicalField};

    use super::*;
    use chrono::TimeZone;
    use serde_json::Value;

    /// Build a UTC timestamp, panicking on invalid/ambiguous components.
    fn utc_datetime(
        year: i32,
        month: u32,
        day: u32,
        hour: u32,
        minute: u32,
        second: u32,
    ) -> DateTime<Utc> {
        Utc.with_ymd_and_hms(year, month, day, hour, minute, second)
            .single()
            .expect("valid UTC timestamp")
    }

    /// A representative time index: `ts` column, keyed by `symbol`, 1-minute buckets.
    fn sample_time_index_spec() -> TimeIndexSpec {
        TimeIndexSpec {
            timestamp_column: "ts".to_string(),
            entity_columns: vec!["symbol".to_string()],
            bucket: TimeBucket::Minutes(1),
            timezone: None,
        }
    }

    #[test]
    fn table_meta_json_roundtrip_with_entity_identity_none() {
        let meta = TableMeta {
            kind: TableKind::TimeSeries(sample_time_index_spec()),
            logical_schema: None,
            created_at: utc_datetime(2025, 1, 1, 0, 0, 0),
            format_version: TABLE_FORMAT_VERSION,
            entity_identity: None,
        };

        let json = serde_json::to_string(&meta).unwrap();
        let value: Value = serde_json::from_str(&json).unwrap();
        assert!(value.get("entity_identity").is_none());

        let back: TableMeta = serde_json::from_str(&json).unwrap();
        assert_eq!(back.entity_identity, None);
        assert_eq!(back, meta);
    }

    #[test]
    fn table_meta_json_roundtrip_with_entity_identity_some() {
        let entity_identity = BTreeMap::from([
            ("symbol".to_string(), "AAPL".to_string()),
            ("venue".to_string(), "NASDAQ".to_string()),
        ]);
        let meta = TableMeta {
            kind: TableKind::TimeSeries(sample_time_index_spec()),
            logical_schema: None,
            created_at: utc_datetime(2025, 1, 1, 0, 0, 0),
            format_version: TABLE_FORMAT_VERSION,
            entity_identity: Some(entity_identity.clone()),
        };

        let json = serde_json::to_string(&meta).unwrap();
        let value: Value = serde_json::from_str(&json).unwrap();
        assert!(value.get("entity_identity").is_some());

        let back: TableMeta = serde_json::from_str(&json).unwrap();
        assert_eq!(back.entity_identity, Some(entity_identity));
        assert_eq!(back, meta);
    }

    #[test]
    fn table_meta_new_time_series_fills_bookkeeping_fields() {
        let meta = TableMeta::new_time_series(sample_time_index_spec());
        assert_eq!(meta.format_version(), TABLE_FORMAT_VERSION);
        assert!(meta.logical_schema().is_none());
        assert!(meta.entity_identity.is_none());
        assert_eq!(
            meta.kind(),
            &TableKind::TimeSeries(sample_time_index_spec())
        );
    }

    #[test]
    fn table_meta_arrow_schema_ref_requires_logical_schema() {
        let meta = TableMeta::new_time_series(sample_time_index_spec());
        let err = meta.arrow_schema_ref().unwrap_err();
        assert!(matches!(err, TableMetaSchemaError::MissingCanonicalSchema));
        assert_eq!(
            err.to_string(),
            "table has no canonical logical schema yet (logical_schema is None)"
        );
    }

    #[test]
    fn table_meta_arrow_schema_ref_propagates_convert_error() {
        let logical = LogicalSchema::new(vec![LogicalField {
            name: "legacy_ts".to_string(),
            data_type: LogicalDataType::Int96,
            nullable: false,
        }])
        .expect("valid schema structure");
        let meta = TableMeta::new_time_series_with_schema(sample_time_index_spec(), logical);

        let err = meta.arrow_schema_ref().unwrap_err();
        assert!(
            matches!(
                &err,
                TableMetaSchemaError::Convert {
                    source: SchemaConvertError::Int96Unsupported { column }
                } if column == "legacy_ts"
            ),
            "unexpected error: {err:?}"
        );
    }

    /// Pins the on-disk JSON shape of `TableKind::Generic` (a bare string).
    #[test]
    fn table_kind_generic_json_wire_format_is_stable() {
        assert_eq!(
            serde_json::to_string(&TableKind::Generic).unwrap(),
            r#""Generic""#
        );
        let back: TableKind = serde_json::from_str(r#""Generic""#).unwrap();
        assert_eq!(back, TableKind::Generic);
    }

    /// Pins the externally tagged JSON shape used for `TimeBucket` in the log.
    #[test]
    fn time_bucket_json_wire_format_is_stable() {
        assert_eq!(
            serde_json::to_string(&TimeBucket::Hours(2)).unwrap(),
            r#"{"Hours":2}"#
        );
        let back: TimeBucket = serde_json::from_str(r#"{"Days":3}"#).unwrap();
        assert_eq!(back, TimeBucket::Days(3));
    }

    /// `entity_columns` and `timezone` are optional in the JSON and default
    /// to empty / `None`; a `None` timezone is omitted on serialization.
    #[test]
    fn time_index_spec_json_defaults_optional_fields() {
        let spec: TimeIndexSpec =
            serde_json::from_str(r#"{"timestamp_column":"ts","bucket":{"Minutes":1}}"#).unwrap();
        assert_eq!(spec.timestamp_column, "ts");
        assert!(spec.entity_columns.is_empty());
        assert_eq!(spec.timezone, None);
        assert_eq!(spec.bucket, TimeBucket::Minutes(1));

        let json = serde_json::to_value(&spec).unwrap();
        assert!(json.get("timezone").is_none());
        assert!(json.get("entity_columns").is_some());
    }

    #[test]
    fn time_bucket_parse_accepts_basic_units() {
        let cases = [
            ("1s", TimeBucket::Seconds(1)),
            ("2m", TimeBucket::Minutes(2)),
            ("3h", TimeBucket::Hours(3)),
            ("4d", TimeBucket::Days(4)),
        ];

        for (input, expected) in cases {
            assert_eq!(input.parse::<TimeBucket>().unwrap(), expected);
        }
    }

    #[test]
    fn time_bucket_parse_accepts_aliases_case_and_whitespace() {
        let cases = [
            ("1sec", TimeBucket::Seconds(1)),
            ("1secs", TimeBucket::Seconds(1)),
            ("1second", TimeBucket::Seconds(1)),
            ("1seconds", TimeBucket::Seconds(1)),
            ("1min", TimeBucket::Minutes(1)),
            ("1mins", TimeBucket::Minutes(1)),
            ("1minute", TimeBucket::Minutes(1)),
            ("1minutes", TimeBucket::Minutes(1)),
            ("1hr", TimeBucket::Hours(1)),
            ("1hrs", TimeBucket::Hours(1)),
            ("1hour", TimeBucket::Hours(1)),
            ("1hours", TimeBucket::Hours(1)),
            ("1day", TimeBucket::Days(1)),
            ("1days", TimeBucket::Days(1)),
            ("1H", TimeBucket::Hours(1)),
            ("1MiN", TimeBucket::Minutes(1)),
            ("  2h", TimeBucket::Hours(2)),
            ("3d  ", TimeBucket::Days(3)),
            ("  4m  ", TimeBucket::Minutes(4)),
            ("1 h", TimeBucket::Hours(1)),
        ];

        for (input, expected) in cases {
            assert_eq!(input.parse::<TimeBucket>().unwrap(), expected);
        }
    }

    #[test]
    fn time_bucket_parse_rejects_empty_or_whitespace() {
        let cases = ["", "   ", "\n\t"];
        for input in cases {
            let err = input.parse::<TimeBucket>().unwrap_err();
            assert!(matches!(err, ParseTimeBucketError::Empty));
        }
    }

    #[test]
    fn time_bucket_parse_rejects_missing_number() {
        let cases = ["h", " hr", "day", "abcmin"];
        for input in cases {
            let err = input.parse::<TimeBucket>().unwrap_err();
            assert!(
                matches!(err, ParseTimeBucketError::MissingNumber { .. }),
                "expected MissingNumber for {input:?}, got {err:?}"
            );
        }
    }

    #[test]
    fn time_bucket_parse_rejects_missing_unit() {
        let cases = ["1", "  42  "];
        for input in cases {
            let err = input.parse::<TimeBucket>().unwrap_err();
            assert!(
                matches!(err, ParseTimeBucketError::MissingUnit { .. }),
                "expected MissingUnit for {input:?}, got {err:?}"
            );
        }
    }

    #[test]
    fn time_bucket_parse_rejects_invalid_number() {
        let cases = ["1.5h", "1_000s"];
        for input in cases {
            let err = input.parse::<TimeBucket>().unwrap_err();
            assert!(
                matches!(err, ParseTimeBucketError::InvalidNumber { .. }),
                "expected InvalidNumber for {input:?}, got {err:?}"
            );
        }
    }

    #[test]
    fn time_bucket_parse_rejects_non_positive() {
        let cases = ["0s", "0m"];
        for input in cases {
            let err = input.parse::<TimeBucket>().unwrap_err();
            assert!(
                matches!(err, ParseTimeBucketError::NonPositive { value: 0, .. }),
                "expected NonPositive for {input:?}, got {err:?}"
            );
        }
    }

    #[test]
    fn time_bucket_parse_rejects_too_large() {
        let too_large = (u32::MAX as u64 + 1).to_string();
        let input = format!("{too_large}h");
        let err = input.parse::<TimeBucket>().unwrap_err();
        assert!(
            matches!(err, ParseTimeBucketError::TooLarge { value, .. } if value == u32::MAX as u64 + 1),
            "expected TooLarge for {input:?}, got {err:?}"
        );
    }

    #[test]
    fn time_bucket_parse_rejects_unknown_units() {
        let cases = ["1w", "1ms", "1mo", "10msec"];
        for input in cases {
            let err = input.parse::<TimeBucket>().unwrap_err();
            assert!(
                matches!(err, ParseTimeBucketError::UnknownUnit { .. }),
                "expected UnknownUnit for {input:?}, got {err:?}"
            );
        }
    }

    /// Error messages are user-facing (CLI/config); pin their wording.
    #[test]
    fn time_bucket_parse_error_display_includes_spec() {
        let err = "5w".parse::<TimeBucket>().unwrap_err();
        assert_eq!(
            err.to_string(),
            "unknown time bucket unit 'w' in '5w' (expected s|m|h|d)"
        );
        assert_eq!(
            ParseTimeBucketError::Empty.to_string(),
            "time bucket spec is empty"
        );
    }

    #[test]
    fn time_bucket_parse_matches_from_str() {
        let via_method = TimeBucket::parse("5m").unwrap();
        let via_trait: TimeBucket = "5m".parse().unwrap();
        assert_eq!(via_method, via_trait);
    }
}