1use std::convert::TryFrom;
2use std::{collections::BTreeMap, fmt};
3
4use arrow_schema::DataType;
5use chrono::{DateTime, Utc};
6use serde::{Deserialize, Serialize};
7
8#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
12pub struct Metadata {
13 metadata_version: u16,
14 timestamp: uhlc::Timestamp,
15 pub type_info: ArrowTypeInfo,
16 pub parameters: MetadataParameters,
17}
18
19impl Metadata {
20 pub fn new(timestamp: uhlc::Timestamp, type_info: ArrowTypeInfo) -> Self {
21 Self::from_parameters(timestamp, type_info, Default::default())
22 }
23
24 pub fn from_parameters(
25 timestamp: uhlc::Timestamp,
26 type_info: ArrowTypeInfo,
27 parameters: MetadataParameters,
28 ) -> Self {
29 Self {
30 metadata_version: 0,
31 timestamp,
32 parameters,
33 type_info,
34 }
35 }
36
37 pub fn timestamp(&self) -> uhlc::Timestamp {
38 self.timestamp
39 }
40
41 pub fn get(&self, key: &str) -> Option<&Parameter> {
47 self.parameters.get(key)
48 }
49
50 pub fn get_or<'a, T>(&'a self, key: &str, default: T) -> T
52 where
53 T: TryFrom<&'a Parameter>,
54 {
55 self.parameters
56 .get(key)
57 .and_then(|p| T::try_from(p).ok())
58 .unwrap_or(default)
59 }
60
61 pub fn open_telemetry_context(&self) -> String {
62 self.get("open_telemetry_context")
63 .and_then(|p| String::try_from(p).ok())
64 .unwrap_or_default()
65 }
66}
67
68pub type MetadataParameters = BTreeMap<String, Parameter>;
70
71#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
72pub struct ArrowTypeInfo {
73 pub data_type: DataType,
74 pub len: usize,
75 pub null_count: usize,
76 pub validity: Option<Vec<u8>>,
77 pub offset: usize,
78 pub buffer_offsets: Vec<BufferOffset>,
79 pub child_data: Vec<ArrowTypeInfo>,
80}
81
/// Error returned when a parameter holds a different variant than the type
/// requested by a `TryFrom` conversion.
///
/// `PartialEq`/`Eq` are derived so that callers can compare errors (and
/// `Result`s carrying them) directly, e.g. with `assert_eq!`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TryFromParameterError {
    /// Name of the type the caller asked for (e.g. `"bool"`, `"&[i64]"`).
    pub expected: &'static str,
    /// Name of the variant that was actually stored (e.g. `"integer"`).
    pub found: &'static str,
}

impl fmt::Display for TryFromParameterError {
    /// Formats the error as `expected <expected>, found <found>`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "expected {}, found {}", self.expected, self.found)
    }
}

impl std::error::Error for TryFromParameterError {}
95#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
97pub enum Parameter {
98 Bool(bool),
99 Integer(i64),
100 String(String),
101 ListInt(Vec<i64>),
102 Float(f64),
103 ListFloat(Vec<f64>),
104 ListString(Vec<String>),
105 Timestamp(DateTime<Utc>),
106}
107
108impl Parameter {
109 pub(crate) fn variant_name(&self) -> &'static str {
110 match self {
111 Parameter::Bool(_) => "bool",
112 Parameter::Integer(_) => "integer",
113 Parameter::String(_) => "string",
114 Parameter::ListInt(_) => "list<i64>",
115 Parameter::Float(_) => "float",
116 Parameter::ListFloat(_) => "list<f64>",
117 Parameter::ListString(_) => "list<string>",
118 Parameter::Timestamp(_) => "timestamp",
119 }
120 }
121}
122
123impl TryFrom<&Parameter> for bool {
124 type Error = TryFromParameterError;
125
126 fn try_from(value: &Parameter) -> Result<Self, Self::Error> {
127 match value {
128 Parameter::Bool(value) => Ok(*value),
129 other => Err(TryFromParameterError {
130 expected: "bool",
131 found: other.variant_name(),
132 }),
133 }
134 }
135}
136
137impl TryFrom<&Parameter> for String {
138 type Error = TryFromParameterError;
139
140 fn try_from(value: &Parameter) -> Result<Self, Self::Error> {
141 match value {
142 Parameter::String(val) => Ok(val.clone()),
143 other => Err(TryFromParameterError {
144 expected: "string",
145 found: other.variant_name(),
146 }),
147 }
148 }
149}
150
151impl<'a> TryFrom<&'a Parameter> for &'a str {
152 type Error = TryFromParameterError;
153
154 fn try_from(value: &'a Parameter) -> Result<Self, Self::Error> {
155 match value {
156 Parameter::String(v) => Ok(v.as_str()),
157 other => Err(TryFromParameterError {
158 expected: "&str",
159 found: other.variant_name(),
160 }),
161 }
162 }
163}
164
165impl TryFrom<&Parameter> for i64 {
166 type Error = TryFromParameterError;
167
168 fn try_from(value: &Parameter) -> Result<Self, Self::Error> {
169 match value {
170 Parameter::Integer(v) => Ok(*v),
171 other => Err(TryFromParameterError {
172 expected: "i64",
173 found: other.variant_name(),
174 }),
175 }
176 }
177}
178
179impl TryFrom<&Parameter> for f64 {
180 type Error = TryFromParameterError;
181
182 fn try_from(value: &Parameter) -> Result<Self, Self::Error> {
183 match value {
184 Parameter::Float(val) => Ok(*val),
185 other => Err(TryFromParameterError {
186 expected: "f64",
187 found: other.variant_name(),
188 }),
189 }
190 }
191}
192
193impl TryFrom<&Parameter> for Vec<i64> {
194 type Error = TryFromParameterError;
195
196 fn try_from(value: &Parameter) -> Result<Self, Self::Error> {
197 match value {
198 Parameter::ListInt(v) => Ok(v.clone()),
199 other => Err(TryFromParameterError {
200 expected: "list<i64>",
201 found: other.variant_name(),
202 }),
203 }
204 }
205}
206
207impl<'a> TryFrom<&'a Parameter> for &'a [i64] {
208 type Error = TryFromParameterError;
209
210 fn try_from(value: &'a Parameter) -> Result<Self, Self::Error> {
211 match value {
212 Parameter::ListInt(v) => Ok(v.as_slice()),
213 other => Err(TryFromParameterError {
214 expected: "&[i64]",
215 found: other.variant_name(),
216 }),
217 }
218 }
219}
220
221impl TryFrom<&Parameter> for Vec<f64> {
222 type Error = TryFromParameterError;
223
224 fn try_from(value: &Parameter) -> Result<Self, Self::Error> {
225 match value {
226 Parameter::ListFloat(val) => Ok(val.clone()),
227 other => Err(TryFromParameterError {
228 expected: "list<f64>",
229 found: other.variant_name(),
230 }),
231 }
232 }
233}
234
235impl<'a> TryFrom<&'a Parameter> for &'a [f64] {
236 type Error = TryFromParameterError;
237
238 fn try_from(value: &'a Parameter) -> Result<Self, Self::Error> {
239 match value {
240 Parameter::ListFloat(v) => Ok(v.as_slice()),
241 other => Err(TryFromParameterError {
242 expected: "&[f64]",
243 found: other.variant_name(),
244 }),
245 }
246 }
247}
248
249impl TryFrom<&Parameter> for Vec<String> {
250 type Error = TryFromParameterError;
251
252 fn try_from(value: &Parameter) -> Result<Self, Self::Error> {
253 match value {
254 Parameter::ListString(v) => Ok(v.clone()),
255 other => Err(TryFromParameterError {
256 expected: "list<string>",
257 found: other.variant_name(),
258 }),
259 }
260 }
261}
262
263impl<'a> TryFrom<&'a Parameter> for &'a [String] {
264 type Error = TryFromParameterError;
265
266 fn try_from(value: &'a Parameter) -> Result<Self, Self::Error> {
267 match value {
268 Parameter::ListString(v) => Ok(v.as_slice()),
269 other => Err(TryFromParameterError {
270 expected: "&[String]",
271 found: other.variant_name(),
272 }),
273 }
274 }
275}
276
277#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
278pub struct BufferOffset {
279 pub offset: usize,
280 pub len: usize,
281}
282
#[cfg(test)]
mod tests {
    use super::*;
    use std::convert::TryFrom;

    /// Minimal type info (null data type, no buffers, no children) shared by
    /// the `Metadata` tests, which only exercise the parameter map.
    fn empty_type_info() -> ArrowTypeInfo {
        ArrowTypeInfo {
            data_type: arrow_schema::DataType::Null,
            len: 0,
            null_count: 0,
            validity: None,
            offset: 0,
            buffer_offsets: vec![],
            child_data: vec![],
        }
    }

    /// Builds a `Metadata` carrying `params`, with a fresh timestamp.
    fn metadata_with(params: MetadataParameters) -> Metadata {
        let ts = uhlc::HLC::default().new_timestamp();
        Metadata::from_parameters(ts, empty_type_info(), params)
    }

    /// Checks both halves of a conversion error as well as its rendered message.
    ///
    /// Checking `expected` and `found` separately (rather than a substring of
    /// the message) catches the two fields being swapped.
    #[track_caller]
    fn assert_mismatch(err: &TryFromParameterError, expected: &str, found: &str) {
        assert_eq!(err.expected, expected);
        assert_eq!(err.found, found);
        assert_eq!(
            err.to_string(),
            format!("expected {}, found {}", expected, found)
        );
    }

    #[test]
    fn try_from_bool_ok() {
        let p = Parameter::Bool(true);
        let v = bool::try_from(&p).unwrap();
        assert!(v);
    }

    #[test]
    fn try_from_bool_type_mismatch() {
        let p = Parameter::Integer(1);
        let err = bool::try_from(&p).unwrap_err();
        assert_mismatch(&err, "bool", "integer");
    }

    #[test]
    fn try_from_i64_ok() {
        let p = Parameter::Integer(42);
        let v = i64::try_from(&p).unwrap();
        assert_eq!(v, 42);
    }

    #[test]
    fn try_from_i64_type_mismatch() {
        let p = Parameter::Float(1.0);
        let err = i64::try_from(&p).unwrap_err();
        assert_mismatch(&err, "i64", "float");
    }

    #[test]
    fn try_from_f64_ok() {
        let p = Parameter::Float(1.0);
        let val = f64::try_from(&p).unwrap();
        assert_eq!(val, 1.0);
    }

    #[test]
    fn try_from_f64_type_mismatch() {
        let p = Parameter::Integer(50);
        let err = f64::try_from(&p).unwrap_err();
        assert_mismatch(&err, "f64", "integer");
    }

    #[test]
    fn try_from_string_ok() {
        let p = Parameter::String(String::from("welcome"));
        let val = String::try_from(&p).unwrap();
        assert_eq!(val, String::from("welcome"));
    }

    #[test]
    fn try_from_string_type_mismatch() {
        let p = Parameter::Integer(5);
        let err = String::try_from(&p).unwrap_err();
        assert_mismatch(&err, "string", "integer");
    }

    #[test]
    fn try_from_str_ok() {
        let p = Parameter::String("welcome".into());
        let v: &str = <&str>::try_from(&p).unwrap();
        assert_eq!(v, "welcome");
    }

    #[test]
    fn try_from_str_type_mismatch() {
        let p = Parameter::Integer(5);
        let err = <&str>::try_from(&p).unwrap_err();
        assert_mismatch(&err, "&str", "integer");
    }

    #[test]
    fn try_from_vec_i64_ok() {
        let p = Parameter::ListInt(vec![1, 2, 3]);
        let v = Vec::<i64>::try_from(&p).unwrap();
        assert_eq!(v, vec![1, 2, 3]);
    }

    #[test]
    fn try_from_vec_i64_type_mismatch() {
        let p = Parameter::ListFloat(vec![1.0]);
        let err = Vec::<i64>::try_from(&p).unwrap_err();
        assert_mismatch(&err, "list<i64>", "list<f64>");
    }

    #[test]
    fn try_from_vec_f64_ok() {
        let p = Parameter::ListFloat(vec![1.0, 2.0]);
        let v = Vec::<f64>::try_from(&p).unwrap();
        assert_eq!(v, vec![1.0, 2.0]);
    }

    #[test]
    fn try_from_vec_f64_type_mismatch() {
        let p = Parameter::ListInt(vec![1, 2]);
        let err = Vec::<f64>::try_from(&p).unwrap_err();
        assert_mismatch(&err, "list<f64>", "list<i64>");
    }

    #[test]
    fn try_from_vec_string_ok() {
        let p = Parameter::ListString(vec!["a".into(), "b".into()]);
        let v = Vec::<String>::try_from(&p).unwrap();
        assert_eq!(v, vec!["a", "b"]);
    }

    #[test]
    fn try_from_vec_string_type_mismatch() {
        let p = Parameter::String("x".into());
        let err = Vec::<String>::try_from(&p).unwrap_err();
        assert_mismatch(&err, "list<string>", "string");
    }

    #[test]
    fn try_from_slice_i64_ok() {
        let p = Parameter::ListInt(vec![1, 2, 3]);
        let v: &[i64] = <&[i64]>::try_from(&p).unwrap();
        assert_eq!(v, &[1, 2, 3]);
    }

    #[test]
    fn try_from_slice_i64_type_mismatch() {
        let p = Parameter::ListFloat(vec![1.0]);
        let err = <&[i64]>::try_from(&p).unwrap_err();
        assert_mismatch(&err, "&[i64]", "list<f64>");
    }

    #[test]
    fn try_from_slice_f64_ok() {
        let p = Parameter::ListFloat(vec![1.0, 2.0]);
        let v: &[f64] = <&[f64]>::try_from(&p).unwrap();
        assert_eq!(v, &[1.0, 2.0]);
    }

    #[test]
    fn try_from_slice_f64_type_mismatch() {
        let p = Parameter::ListInt(vec![1, 2]);
        let err = <&[f64]>::try_from(&p).unwrap_err();
        assert_mismatch(&err, "&[f64]", "list<i64>");
    }

    #[test]
    fn try_from_slice_string_ok() {
        let p = Parameter::ListString(vec!["a".into(), "b".into()]);
        let v: &[String] = <&[String]>::try_from(&p).unwrap();
        assert_eq!(v, &["a", "b"]);
    }

    #[test]
    fn try_from_slice_string_type_mismatch() {
        let p = Parameter::String("x".into());
        let err = <&[String]>::try_from(&p).unwrap_err();
        assert_mismatch(&err, "&[String]", "string");
    }

    #[test]
    fn get_or_existing_key() {
        let mut params = MetadataParameters::new();
        params.insert("wait".into(), Parameter::Bool(false));
        let m = metadata_with(params);
        // The stored `false` must win over the `true` default.
        assert!(!m.get_or("wait", true));
    }

    #[test]
    fn get_or_missing_key_returns_default() {
        let ts = uhlc::HLC::default().new_timestamp();
        // Uses `Metadata::new` on purpose so the empty-parameter constructor is covered.
        let m = Metadata::new(ts, empty_type_info());
        assert_eq!(m.get_or("timeout", 42_i64), 42);
    }

    #[test]
    fn get_or_type_mismatch_returns_default() {
        let mut params = MetadataParameters::new();
        params.insert("count".into(), Parameter::Integer(5));
        let m = metadata_with(params);
        // An integer cannot be read as `bool`, so the default is returned.
        assert!(m.get_or("count", true));
    }
}