Skip to main content

dora_message/
metadata.rs

1use std::convert::TryFrom;
2use std::{collections::BTreeMap, fmt};
3
4use arrow_schema::DataType;
5use chrono::{DateTime, Utc};
6use serde::{Deserialize, Serialize};
7
8/// Additional data that is sent as part of output messages.
9///
10/// Includes a timestamp, type information, and additional user-provided parameters.
11#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
12pub struct Metadata {
13    metadata_version: u16,
14    timestamp: uhlc::Timestamp,
15    pub type_info: ArrowTypeInfo,
16    pub parameters: MetadataParameters,
17}
18
19impl Metadata {
20    pub fn new(timestamp: uhlc::Timestamp, type_info: ArrowTypeInfo) -> Self {
21        Self::from_parameters(timestamp, type_info, Default::default())
22    }
23
24    pub fn from_parameters(
25        timestamp: uhlc::Timestamp,
26        type_info: ArrowTypeInfo,
27        parameters: MetadataParameters,
28    ) -> Self {
29        Self {
30            metadata_version: 0,
31            timestamp,
32            parameters,
33            type_info,
34        }
35    }
36
37    pub fn timestamp(&self) -> uhlc::Timestamp {
38        self.timestamp
39    }
40
41    /// Returns a raw metadata parameter by key.
42    ///
43    /// This method only accesses user-provided metadata parameters.
44    /// It does **not** return fields such as the timestamp or type information.
45    /// For example, `get("timestamp")` is not equivalent to [`Metadata::timestamp`].
46    pub fn get(&self, key: &str) -> Option<&Parameter> {
47        self.parameters.get(key)
48    }
49
50    /// Returns the parameter for `key` converted to `T`, or `default` if missing or wrong type.
51    pub fn get_or<'a, T>(&'a self, key: &str, default: T) -> T
52    where
53        T: TryFrom<&'a Parameter>,
54    {
55        self.parameters
56            .get(key)
57            .and_then(|p| T::try_from(p).ok())
58            .unwrap_or(default)
59    }
60
61    pub fn open_telemetry_context(&self) -> String {
62        self.get("open_telemetry_context")
63            .and_then(|p| String::try_from(p).ok())
64            .unwrap_or_default()
65    }
66}
67
68/// Additional metadata that can be sent as part of output messages.
69pub type MetadataParameters = BTreeMap<String, Parameter>;
70
71#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
72pub struct ArrowTypeInfo {
73    pub data_type: DataType,
74    pub len: usize,
75    pub null_count: usize,
76    pub validity: Option<Vec<u8>>,
77    pub offset: usize,
78    pub buffer_offsets: Vec<BufferOffset>,
79    pub child_data: Vec<ArrowTypeInfo>,
80}
81
/// Error returned when a [`Parameter`] cannot be converted into the requested type.
///
/// Produced by the `TryFrom<&Parameter>` implementations in this module. Both
/// fields are static, human-readable type names meant for diagnostics.
///
/// The type is comparable (`PartialEq`/`Eq`) so that callers and tests can
/// match on a specific mismatch without going through the `Display` text.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TryFromParameterError {
    /// Name of the type the caller asked for (e.g. `"bool"`).
    pub expected: &'static str,
    /// Name of the parameter variant that was actually present (e.g. `"integer"`).
    pub found: &'static str,
}

impl fmt::Display for TryFromParameterError {
    /// Writes the error as `expected <expected>, found <found>`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self { expected, found } = self;
        write!(f, "expected {}, found {}", expected, found)
    }
}

impl std::error::Error for TryFromParameterError {}
95/// A metadata parameter that can be sent as part of output messages.
96#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
97pub enum Parameter {
98    Bool(bool),
99    Integer(i64),
100    String(String),
101    ListInt(Vec<i64>),
102    Float(f64),
103    ListFloat(Vec<f64>),
104    ListString(Vec<String>),
105    Timestamp(DateTime<Utc>),
106}
107
108impl Parameter {
109    pub(crate) fn variant_name(&self) -> &'static str {
110        match self {
111            Parameter::Bool(_) => "bool",
112            Parameter::Integer(_) => "integer",
113            Parameter::String(_) => "string",
114            Parameter::ListInt(_) => "list<i64>",
115            Parameter::Float(_) => "float",
116            Parameter::ListFloat(_) => "list<f64>",
117            Parameter::ListString(_) => "list<string>",
118            Parameter::Timestamp(_) => "timestamp",
119        }
120    }
121}
122
123impl TryFrom<&Parameter> for bool {
124    type Error = TryFromParameterError;
125
126    fn try_from(value: &Parameter) -> Result<Self, Self::Error> {
127        match value {
128            Parameter::Bool(value) => Ok(*value),
129            other => Err(TryFromParameterError {
130                expected: "bool",
131                found: other.variant_name(),
132            }),
133        }
134    }
135}
136
137impl TryFrom<&Parameter> for String {
138    type Error = TryFromParameterError;
139
140    fn try_from(value: &Parameter) -> Result<Self, Self::Error> {
141        match value {
142            Parameter::String(val) => Ok(val.clone()),
143            other => Err(TryFromParameterError {
144                expected: "string",
145                found: other.variant_name(),
146            }),
147        }
148    }
149}
150
151impl<'a> TryFrom<&'a Parameter> for &'a str {
152    type Error = TryFromParameterError;
153
154    fn try_from(value: &'a Parameter) -> Result<Self, Self::Error> {
155        match value {
156            Parameter::String(v) => Ok(v.as_str()),
157            other => Err(TryFromParameterError {
158                expected: "&str",
159                found: other.variant_name(),
160            }),
161        }
162    }
163}
164
165impl TryFrom<&Parameter> for i64 {
166    type Error = TryFromParameterError;
167
168    fn try_from(value: &Parameter) -> Result<Self, Self::Error> {
169        match value {
170            Parameter::Integer(v) => Ok(*v),
171            other => Err(TryFromParameterError {
172                expected: "i64",
173                found: other.variant_name(),
174            }),
175        }
176    }
177}
178
179impl TryFrom<&Parameter> for f64 {
180    type Error = TryFromParameterError;
181
182    fn try_from(value: &Parameter) -> Result<Self, Self::Error> {
183        match value {
184            Parameter::Float(val) => Ok(*val),
185            other => Err(TryFromParameterError {
186                expected: "f64",
187                found: other.variant_name(),
188            }),
189        }
190    }
191}
192
193impl TryFrom<&Parameter> for Vec<i64> {
194    type Error = TryFromParameterError;
195
196    fn try_from(value: &Parameter) -> Result<Self, Self::Error> {
197        match value {
198            Parameter::ListInt(v) => Ok(v.clone()),
199            other => Err(TryFromParameterError {
200                expected: "list<i64>",
201                found: other.variant_name(),
202            }),
203        }
204    }
205}
206
207impl<'a> TryFrom<&'a Parameter> for &'a [i64] {
208    type Error = TryFromParameterError;
209
210    fn try_from(value: &'a Parameter) -> Result<Self, Self::Error> {
211        match value {
212            Parameter::ListInt(v) => Ok(v.as_slice()),
213            other => Err(TryFromParameterError {
214                expected: "&[i64]",
215                found: other.variant_name(),
216            }),
217        }
218    }
219}
220
221impl TryFrom<&Parameter> for Vec<f64> {
222    type Error = TryFromParameterError;
223
224    fn try_from(value: &Parameter) -> Result<Self, Self::Error> {
225        match value {
226            Parameter::ListFloat(val) => Ok(val.clone()),
227            other => Err(TryFromParameterError {
228                expected: "list<f64>",
229                found: other.variant_name(),
230            }),
231        }
232    }
233}
234
235impl<'a> TryFrom<&'a Parameter> for &'a [f64] {
236    type Error = TryFromParameterError;
237
238    fn try_from(value: &'a Parameter) -> Result<Self, Self::Error> {
239        match value {
240            Parameter::ListFloat(v) => Ok(v.as_slice()),
241            other => Err(TryFromParameterError {
242                expected: "&[f64]",
243                found: other.variant_name(),
244            }),
245        }
246    }
247}
248
249impl TryFrom<&Parameter> for Vec<String> {
250    type Error = TryFromParameterError;
251
252    fn try_from(value: &Parameter) -> Result<Self, Self::Error> {
253        match value {
254            Parameter::ListString(v) => Ok(v.clone()),
255            other => Err(TryFromParameterError {
256                expected: "list<string>",
257                found: other.variant_name(),
258            }),
259        }
260    }
261}
262
263impl<'a> TryFrom<&'a Parameter> for &'a [String] {
264    type Error = TryFromParameterError;
265
266    fn try_from(value: &'a Parameter) -> Result<Self, Self::Error> {
267        match value {
268            Parameter::ListString(v) => Ok(v.as_slice()),
269            other => Err(TryFromParameterError {
270                expected: "&[String]",
271                found: other.variant_name(),
272            }),
273        }
274    }
275}
276
277#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
278pub struct BufferOffset {
279    pub offset: usize,
280    pub len: usize,
281}
282
/// Unit tests for the `TryFrom<&Parameter>` conversions and `Metadata::get_or`.
#[cfg(test)]
mod tests {
    use super::*;
    use std::convert::TryFrom;

    /// Builds a placeholder `Null`-typed [`ArrowTypeInfo`].
    ///
    /// The `get_or` tests only exercise the parameter map, so the type
    /// information just needs to be some valid value.
    fn null_type_info() -> ArrowTypeInfo {
        ArrowTypeInfo {
            data_type: arrow_schema::DataType::Null,
            len: 0,
            null_count: 0,
            validity: None,
            offset: 0,
            buffer_offsets: vec![],
            child_data: vec![],
        }
    }

    /// Builds a [`Metadata`] with a fresh timestamp, placeholder type
    /// information, and the given parameters.
    fn metadata_with(params: MetadataParameters) -> Metadata {
        let ts = uhlc::HLC::default().new_timestamp();
        Metadata::from_parameters(ts, null_type_info(), params)
    }

    #[test]
    fn try_from_bool_ok() {
        let p = Parameter::Bool(true);
        let v = bool::try_from(&p).unwrap();
        assert!(v);
    }

    #[test]
    fn try_from_bool_type_mismatch() {
        let p = Parameter::Integer(1);
        let err = bool::try_from(&p).unwrap_err();
        assert!(err.to_string().contains("expected bool"));
    }

    #[test]
    fn try_from_i64_ok() {
        let p = Parameter::Integer(42);
        let v = i64::try_from(&p).unwrap();
        assert_eq!(v, 42);
    }

    #[test]
    fn try_from_i64_type_mismatch() {
        let p = Parameter::Float(1.0);
        let err = i64::try_from(&p).unwrap_err();
        assert!(err.to_string().contains("expected i64"));
    }

    #[test]
    fn try_from_f64_ok() {
        let p = Parameter::Float(1.0);
        let val = f64::try_from(&p).unwrap();
        assert_eq!(val, 1.0);
    }

    #[test]
    fn try_from_f64_type_mismatch() {
        let p = Parameter::Integer(50);
        let err = f64::try_from(&p).unwrap_err();
        assert!(err.to_string().contains("expected f64"));
    }

    #[test]
    fn try_from_string_ok() {
        let p = Parameter::String(String::from("welcome"));
        let val = String::try_from(&p).unwrap();
        assert_eq!(val, String::from("welcome"));
    }

    #[test]
    fn try_from_string_type_mismatch() {
        let p = Parameter::Integer(5);
        let err = String::try_from(&p).unwrap_err();
        assert!(err.to_string().contains("expected string"));
    }

    #[test]
    fn try_from_str_ok() {
        let p = Parameter::String("welcome".into());
        let v: &str = <&str>::try_from(&p).unwrap();
        assert_eq!(v, "welcome");
    }

    #[test]
    fn try_from_str_type_mismatch() {
        let p = Parameter::Integer(5);
        let err = <&str>::try_from(&p).unwrap_err();
        assert!(err.to_string().contains("&str"));
    }

    #[test]
    fn try_from_vec_i64_ok() {
        let p = Parameter::ListInt(vec![1, 2, 3]);
        let v = Vec::<i64>::try_from(&p).unwrap();
        assert_eq!(v, vec![1, 2, 3]);
    }

    #[test]
    fn try_from_vec_i64_type_mismatch() {
        let p = Parameter::ListFloat(vec![1.0]);
        let err = Vec::<i64>::try_from(&p).unwrap_err();
        assert!(err.to_string().contains("list<i64>"));
    }

    #[test]
    fn try_from_vec_f64_ok() {
        let p = Parameter::ListFloat(vec![1.0, 2.0]);
        let v = Vec::<f64>::try_from(&p).unwrap();
        assert_eq!(v, vec![1.0, 2.0]);
    }

    #[test]
    fn try_from_vec_f64_type_mismatch() {
        let p = Parameter::ListInt(vec![1, 2]);
        let err = Vec::<f64>::try_from(&p).unwrap_err();
        assert!(err.to_string().contains("list<f64>"));
    }

    #[test]
    fn try_from_vec_string_ok() {
        let p = Parameter::ListString(vec!["a".into(), "b".into()]);
        let v = Vec::<String>::try_from(&p).unwrap();
        assert_eq!(v, vec!["a", "b"]);
    }

    #[test]
    fn try_from_vec_string_type_mismatch() {
        let p = Parameter::String("x".into());
        let err = Vec::<String>::try_from(&p).unwrap_err();
        assert!(err.to_string().contains("list<string>"));
    }

    #[test]
    fn try_from_slice_i64_ok() {
        let p = Parameter::ListInt(vec![1, 2, 3]);
        let v: &[i64] = <&[i64]>::try_from(&p).unwrap();
        assert_eq!(v, &[1, 2, 3]);
    }

    #[test]
    fn try_from_slice_i64_type_mismatch() {
        let p = Parameter::ListFloat(vec![1.0]);
        let err = <&[i64]>::try_from(&p).unwrap_err();
        assert!(err.to_string().contains("&[i64]"));
    }

    #[test]
    fn try_from_slice_f64_ok() {
        let p = Parameter::ListFloat(vec![1.0, 2.0]);
        let v: &[f64] = <&[f64]>::try_from(&p).unwrap();
        assert_eq!(v, &[1.0, 2.0]);
    }

    #[test]
    fn try_from_slice_f64_type_mismatch() {
        let p = Parameter::ListInt(vec![1, 2]);
        let err = <&[f64]>::try_from(&p).unwrap_err();
        assert!(err.to_string().contains("&[f64]"));
    }

    #[test]
    fn try_from_slice_string_ok() {
        let p = Parameter::ListString(vec!["a".into(), "b".into()]);
        let v: &[String] = <&[String]>::try_from(&p).unwrap();
        assert_eq!(v, &["a", "b"]);
    }

    #[test]
    fn try_from_slice_string_type_mismatch() {
        let p = Parameter::String("x".into());
        let err = <&[String]>::try_from(&p).unwrap_err();
        assert!(err.to_string().contains("&[String]"));
    }

    #[test]
    fn get_or_existing_key() {
        let mut params = MetadataParameters::new();
        params.insert("wait".into(), Parameter::Bool(false));
        let m = metadata_with(params);
        // The stored `false` must win over the `true` default.
        assert!(!m.get_or("wait", true));
    }

    #[test]
    fn get_or_missing_key_returns_default() {
        // Goes through `Metadata::new` on purpose to cover the empty-parameters constructor.
        let ts = uhlc::HLC::default().new_timestamp();
        let m = Metadata::new(ts, null_type_info());
        assert_eq!(m.get_or("timeout", 42_i64), 42);
    }

    #[test]
    fn get_or_type_mismatch_returns_default() {
        let mut params = MetadataParameters::new();
        params.insert("count".into(), Parameter::Integer(5));
        let m = metadata_with(params);
        // An integer parameter cannot be read as `bool`, so the default is returned.
        assert!(m.get_or("count", true));
    }
}