Skip to main content

mpl_lang/
query.rs

1//! The query structures
2use std::{
3    collections::{HashMap, HashSet},
4    num::TryFromIntError,
5};
6
7#[cfg(feature = "clock")]
8use chrono::Utc;
9use chrono::{DateTime, Duration, FixedOffset};
10use miette::SourceSpan;
11use pest::Parser as _;
12use strumbra::SharedString;
13
14use crate::{
15    ParseError,
16    enc_regex::EncodableRegex,
17    linker::{AlignFunction, ComputeFunction, GroupFunction, MapFunction},
18    parser::{self, MPLParser, ParseParamError, Rule},
19    tags::TagValue,
20    time::{Resolution, ResolutionError},
21    types::{BucketSpec, BucketType, Dataset, Metric, Parameterized},
22};
23
24mod fmt;
25#[cfg(test)]
26mod tests;
27
28/// Metric identifier
29#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
30#[cfg_attr(feature = "wasm", derive(tsify::Tsify))]
31#[cfg_attr(feature = "wasm", tsify(into_wasm_abi, from_wasm_abi))]
32pub struct MetricId {
33    /// The dataset identifier or param
34    pub dataset: Parameterized<Dataset>,
35    /// The metric identifier
36    pub metric: Metric,
37}
38
39/// Time unit
40#[derive(Debug, Clone, PartialEq, serde::Serialize, serde::Deserialize)]
41#[cfg_attr(feature = "wasm", derive(tsify::Tsify))]
42#[cfg_attr(feature = "wasm", tsify(into_wasm_abi, from_wasm_abi))]
43pub enum TimeUnit {
44    /// Millisecond
45    Millisecond,
46    /// Second
47    Second,
48    /// Minute
49    Minute,
50    /// Hour
51    Hour,
52    /// Day
53    Day,
54    /// Week
55    Week,
56    /// Month
57    Month,
58    /// Year
59    Year,
60}
61
62#[derive(Debug, Clone, PartialEq, serde::Serialize, serde::Deserialize)]
63/// Relative time (1h)
64#[cfg_attr(feature = "wasm", derive(tsify::Tsify))]
65#[cfg_attr(feature = "wasm", tsify(into_wasm_abi, from_wasm_abi))]
66pub struct RelativeTime {
67    /// Value
68    pub value: u64,
69    /// Unit
70    pub unit: TimeUnit,
71}
72
73/// A point in time
74#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
75#[cfg_attr(feature = "wasm", derive(tsify::Tsify))]
76#[cfg_attr(feature = "wasm", tsify(into_wasm_abi, from_wasm_abi))]
77pub enum Time {
78    /// A time relative to now
79    Relative(RelativeTime),
80    /// A timestamp
81    Timestamp(i64),
82    /// A RFC3339 timestamp
83    RFC3339(#[cfg_attr(feature = "wasm", tsify(type = "string"))] DateTime<FixedOffset>),
84    /// A time modifier
85    Modifier(String),
86}
87
88/// A timerange between two times
89#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
90#[cfg_attr(feature = "wasm", derive(tsify::Tsify))]
91#[cfg_attr(feature = "wasm", tsify(into_wasm_abi, from_wasm_abi))]
92pub struct TimeRange {
93    /// Start time of the range
94    pub start: Time,
95    /// End time of the range or None for 'now'
96    pub end: Option<Time>,
97}
98
99/// The source for a query
100#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
101#[cfg_attr(feature = "wasm", derive(tsify::Tsify))]
102#[cfg_attr(feature = "wasm", tsify(into_wasm_abi, from_wasm_abi))]
103pub struct Source {
104    /// The metric
105    pub metric_id: MetricId,
106    /// The time range
107    pub time: Option<TimeRange>,
108}
109
110///An error relkated to value parsing
111#[derive(Debug, thiserror::Error)]
112pub enum ValueError {
113    /// Invalid float value
114    #[error("Invalid Float")]
115    BadFloat,
116}
117
118/// A comparison operator for filtering based on a value
119#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq)]
120#[cfg_attr(feature = "wasm", derive(tsify::Tsify))]
121#[cfg_attr(feature = "wasm", tsify(into_wasm_abi, from_wasm_abi))]
122pub enum Cmp {
123    /// Equal to the given value
124    Eq(Parameterized<TagValue>),
125    /// Not equal to the given value
126    Ne(Parameterized<TagValue>),
127    /// Greater than the given value
128    Gt(Parameterized<TagValue>),
129    /// Greater than or equal to the given value
130    Ge(Parameterized<TagValue>),
131    /// Less than the given value
132    Lt(Parameterized<TagValue>),
133    /// Less than or equal to the given value
134    Le(Parameterized<TagValue>),
135    /// Matches the given regular expression
136    RegEx(Parameterized<EncodableRegex>),
137    /// Does not match the given regular expression
138    RegExNot(Parameterized<EncodableRegex>),
139    /// Is the given tag type
140    Is(TagType),
141}
142
143/// Rename the output as a new metric
144#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
145#[cfg_attr(feature = "wasm", derive(tsify::Tsify))]
146#[cfg_attr(feature = "wasm", tsify(into_wasm_abi, from_wasm_abi))]
147pub struct As {
148    /// The new name for the metric
149    pub name: Metric,
150}
151
152/// Filter the series
153#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq)]
154#[cfg_attr(feature = "wasm", derive(tsify::Tsify))]
155#[cfg_attr(feature = "wasm", tsify(into_wasm_abi, from_wasm_abi))]
156pub enum Filter {
157    /// Logical AND of the given filters
158    And(Vec<Filter>),
159    /// Logical OR of the given filters
160    Or(Vec<Filter>),
161    /// Logical NOT of the given filters
162    Not(Box<Filter>),
163    /// Filter based on a filed
164    Cmp {
165        /// The field to filter on
166        field: String,
167        /// The compaison to perform
168        rhs: Cmp,
169    },
170}
171
172/// A Mapping function
173#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
174#[cfg_attr(feature = "wasm", derive(tsify::Tsify))]
175#[cfg_attr(feature = "wasm", tsify(into_wasm_abi, from_wasm_abi))]
176pub struct Mapping {
177    /// The function to apply
178    pub function: MapFunction,
179    /// The optional argument to pass to the function
180    pub arg: Option<f64>,
181}
182
183/// An Alignment function
184#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
185#[cfg_attr(feature = "wasm", derive(tsify::Tsify))]
186#[cfg_attr(feature = "wasm", tsify(into_wasm_abi, from_wasm_abi))]
187pub struct Align {
188    /// The function to apply
189    pub function: AlignFunction,
190    /// The time to align to
191    pub time: Parameterized<RelativeTime>,
192}
193
194/// A Grouping function
195#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
196#[cfg_attr(feature = "wasm", derive(tsify::Tsify))]
197#[cfg_attr(feature = "wasm", tsify(into_wasm_abi, from_wasm_abi))]
198pub struct GroupBy {
199    /// The location of the group by clause
200    #[cfg_attr(feature = "wasm", tsify(type = "{ offset: number, length: number }"))]
201    pub span: SourceSpan,
202    /// The function to apply
203    pub function: GroupFunction,
204    /// The tags to group by
205    pub tags: Vec<String>,
206}
207
208/// A Bucketing function, applying both tag and time based aggregation
209#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
210#[cfg_attr(feature = "wasm", derive(tsify::Tsify))]
211#[cfg_attr(feature = "wasm", tsify(into_wasm_abi, from_wasm_abi))]
212pub struct BucketBy {
213    /// The location of the group by clause
214    #[cfg_attr(feature = "wasm", tsify(type = "{ offset: number, length: number }"))]
215    pub span: SourceSpan,
216    /// The function to apply
217    pub function: BucketType,
218    /// The time to align to
219    pub time: Parameterized<RelativeTime>,
220    /// The tags to group by
221    pub tags: Vec<String>,
222    /// The buckets to produce
223    pub spec: Vec<BucketSpec>,
224}
225
226/// Possible aggregate functions
227#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
228#[cfg_attr(feature = "wasm", derive(tsify::Tsify))]
229#[cfg_attr(feature = "wasm", tsify(into_wasm_abi, from_wasm_abi))]
230pub enum Aggregate {
231    /// Map a function over each value
232    Map(Mapping),
233    /// Align the data to a time interval
234    Align(Align),
235    /// Group the data by tags
236    GroupBy(GroupBy),
237    /// Bucket the data by time and tags
238    Bucket(BucketBy),
239    /// Rename the metric
240    As(As),
241}
242
243/// Values for directives
244#[cfg_attr(feature = "wasm", tsify::declare)]
245#[cfg_attr(feature = "bincode", derive(bincode::Encode, bincode::Decode))]
246#[derive(Clone, Debug, PartialEq, serde::Deserialize, serde::Serialize)]
247pub enum DirectiveValue {
248    /// Directive with a ident value
249    Ident(String),
250    /// Directive with a literal value
251    Int(i64),
252    /// Directive with a float value
253    Float(f64),
254    /// Directive with a string value
255    String(String),
256    /// Directive with a boolean value
257    Bool(bool),
258    /// Directive with no value
259    None,
260}
261
262impl DirectiveValue {
263    /// Ident value
264    #[must_use]
265    pub fn as_ident(&self) -> Option<&str> {
266        match self {
267            DirectiveValue::Ident(ident) => Some(ident),
268            _ => None,
269        }
270    }
271    /// Int value
272    #[must_use]
273    pub fn as_int(&self) -> Option<i64> {
274        match self {
275            DirectiveValue::Int(int) => Some(*int),
276            _ => None,
277        }
278    }
279    /// Float value
280    #[must_use]
281    pub fn as_float(&self) -> Option<f64> {
282        match self {
283            DirectiveValue::Float(float) => Some(*float),
284            _ => None,
285        }
286    }
287    /// String value
288    #[must_use]
289    pub fn as_string(&self) -> Option<&str> {
290        match self {
291            DirectiveValue::String(string) => Some(string),
292            _ => None,
293        }
294    }
295    /// Bool value
296    #[must_use]
297    pub fn as_bool(&self) -> Option<bool> {
298        match self {
299            DirectiveValue::Bool(bool) => Some(*bool),
300            _ => None,
301        }
302    }
303    /// Tests if value is None
304    #[must_use]
305    pub fn is_none(&self) -> bool {
306        matches!(self, DirectiveValue::None)
307    }
308    /// Tests if value is Some
309    #[must_use]
310    pub fn is_some(&self) -> bool {
311        !self.is_none()
312    }
313}
314
315/// Types for params.
316#[cfg_attr(feature = "wasm", tsify::declare)]
317#[derive(Clone, Copy, Debug, serde::Deserialize, serde::Serialize, PartialEq, Eq)]
318pub enum ParamType {
319    /// Duration (e.g. 25s)
320    Duration,
321    /// Dataset
322    Dataset,
323    /// Regex
324    Regex,
325    /// A tag value type
326    Tag(TagType),
327}
328impl std::fmt::Display for ParamType {
329    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
330        match self {
331            ParamType::Dataset => write!(f, "dataset"),
332            ParamType::Duration => write!(f, "duration"),
333            ParamType::Regex => write!(f, "regex"),
334            ParamType::Tag(t) => t.fmt(f),
335        }
336    }
337}
338
339/// Types for params.
340#[cfg_attr(feature = "wasm", tsify::declare)]
341#[cfg_attr(feature = "bincode", derive(bincode::Encode, bincode::Decode))]
342#[derive(Clone, Copy, Debug, serde::Deserialize, serde::Serialize, PartialEq, Eq)]
343pub enum TagType {
344    /// String
345    String,
346    /// Int
347    Int,
348    /// Float
349    Float,
350    /// Bool
351    Bool,
352    /// None / Null value
353    None,
354}
355impl std::fmt::Display for TagType {
356    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
357        write!(
358            f,
359            "{}",
360            match self {
361                TagType::String => "string",
362                TagType::Int => "int",
363                TagType::Float => "float",
364                TagType::Bool => "bool",
365                TagType::None => "null",
366            }
367        )
368    }
369}
370
371/// Directives given to adjust the behavior of the runtime
372#[cfg_attr(feature = "wasm", tsify::declare)]
373pub type Directives = HashMap<String, DirectiveValue>;
374
375/// A param.
376#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
377#[cfg_attr(feature = "wasm", derive(tsify::Tsify))]
378pub struct Param {
379    /// The location of the param
380    #[cfg_attr(feature = "wasm", tsify(type = "{ offset: number, length: number }"))]
381    pub span: SourceSpan,
382    /// The name of the param
383    pub name: String,
384    /// The type of the param
385    pub typ: ParamType,
386}
387
388/// A param value.
389#[derive(Debug, Clone, PartialEq)]
390pub enum ParamValue {
391    /// Dataset
392    Dataset(Dataset),
393    /// Duration
394    Duration(RelativeTime),
395    /// String
396    String(String),
397    /// Int
398    Int(i64),
399    /// Float
400    Float(f64),
401    /// Bool
402    Bool(bool),
403    /// Regex
404    Regex(EncodableRegex),
405}
406
407impl ParamValue {
408    /// Get the type of the param value.
409    #[must_use]
410    pub fn typ(&self) -> ParamType {
411        match self {
412            ParamValue::Dataset(_) => ParamType::Dataset,
413            ParamValue::Duration(_) => ParamType::Duration,
414            ParamValue::Regex(_) => ParamType::Regex,
415            ParamValue::String(_) => ParamType::Tag(TagType::String),
416            ParamValue::Int(_) => ParamType::Tag(TagType::Int),
417            ParamValue::Float(_) => ParamType::Tag(TagType::Float),
418            ParamValue::Bool(_) => ParamType::Tag(TagType::Bool),
419        }
420    }
421}
422
423/// The param provided to the query.
424#[derive(Debug, Clone, PartialEq)]
425pub struct ProvidedParam {
426    /// The name of the param.
427    pub name: String,
428    /// The value.
429    pub value: ParamValue,
430}
431
432impl ProvidedParam {
433    /// Create a new `ProvidedParam`.
434    pub fn new(name: impl Into<String>, value: ParamValue) -> Self {
435        Self {
436            name: name.into(),
437            value,
438        }
439    }
440}
441
442/// A smol wrapper around `Vec<ProvidedParam>` for easier use.
443#[derive(Debug, Clone, Default)]
444pub struct ProvidedParams {
445    inner: Vec<ProvidedParam>,
446}
447
448/// The error returned from `ProvidedParams::resolve`.
449#[derive(Debug, thiserror::Error)]
450pub enum ResolveError {
451    /// Param not provided
452    #[error("Param ${0} was not provided to the query")]
453    ParamNotProvided(String),
454    /// Invalid type
455    #[error(
456        "Param ${name} is defined as `{defined}`, but was used in a context that expected one of: {}",
457        expected.iter().map(ToString::to_string).collect::<Vec<_>>().join(", ")
458    )]
459    InvalidType {
460        /// Name of the param
461        name: String,
462        /// Type of the param
463        defined: ParamType,
464        /// The type that is valid in the context it was used
465        expected: Vec<ParamType>,
466    },
467    /// Shared string error
468    #[error("Shared string error: {0}")]
469    SharedString(#[from] strumbra::Error),
470}
471
472/// The error returned from `ProvidedParams::parse`.
473#[derive(Debug, thiserror::Error)]
474pub enum ParseProvidedParamsError {
475    /// Parse failed
476    #[error("Failed to parse the value for ${param_name} as {expected_type}: {err}")]
477    ParseParam {
478        /// Param name
479        param_name: String,
480        /// Expected t ype
481        expected_type: ParamType,
482        /// Parse param error
483        err: ParseParamError,
484    },
485    /// Params provided more than once
486    #[error("These params were provided more than once: {}", .0.join(", "))]
487    ParamsProvidedMoreThanOnce(Vec<String>),
488    /// Params declared but not provided
489    #[error("The following params were declared but not provided: {}", .0.join(", "))]
490    ParamsDeclaredButNotProvided(Vec<String>),
491    /// Too many params provided
492    #[error("The number of params provided exceeds the upper limit of {0}")]
493    TooManyParamsProvided(usize),
494}
495
496/// Warnings we want to surface to the user instead of failing the request.
497#[derive(Debug, Default)]
498pub struct Warnings {
499    inner: Vec<String>,
500}
501
502impl Warnings {
503    /// Create a new warnings structure.
504    #[must_use]
505    pub fn new() -> Self {
506        Self::default()
507    }
508
509    /// Add a new warning.
510    pub fn push(&mut self, warning: impl Into<String>) {
511        self.inner.push(warning.into());
512    }
513
514    /// Returns true if there are no warnings.
515    #[must_use]
516    pub fn is_empty(&self) -> bool {
517        self.inner.is_empty()
518    }
519
520    /// Get the warnings as slice.
521    #[must_use]
522    pub fn as_slice(&self) -> &[String] {
523        &self.inner
524    }
525
526    /// Turn into a vector.
527    #[must_use]
528    pub fn into_vec(self) -> Vec<String> {
529        self.inner
530    }
531}
532
533impl ProvidedParams {
534    /// Create a new `ProvidedParams` struct.
535    #[must_use]
536    pub fn new(inner: Vec<ProvidedParam>) -> Self {
537        Self { inner }
538    }
539
540    /// Parse params from a hashmap of query parameters.
541    /// This will only look at params that start with `param__` and it'll use
542    /// the parser definitions to extract the values.
543    pub fn parse_and_validate(
544        mpl_params: &Params,
545        query_params: &[(String, String)],
546    ) -> Result<(Self, Warnings), ParseProvidedParamsError> {
547        const PREFIX: &str = "param__";
548        const PARAM_COUNT_LIMIT: usize = 128;
549
550        let mut warnings = Warnings::new();
551        let mut defined_more_than_once = HashSet::new();
552        let mut provided_but_not_declared = HashSet::new();
553        let mut seen = HashSet::new();
554
555        let params = query_params
556            .iter()
557            .filter_map(|(name, value)| {
558                if !name.starts_with(PREFIX) {
559                    return None;
560                }
561                let name = name.trim_start_matches(PREFIX);
562                if name.is_empty() {
563                    return None;
564                }
565
566                Some((name, value))
567            })
568            .take(PARAM_COUNT_LIMIT + 1)
569            .collect::<Vec<(&str, &String)>>();
570
571        // we don't support unlimited params
572        if params.len() > PARAM_COUNT_LIMIT {
573            return Err(ParseProvidedParamsError::TooManyParamsProvided(
574                PARAM_COUNT_LIMIT,
575            ));
576        }
577
578        let mut provided_params = Vec::new();
579        for (name, value) in params {
580            if seen.contains(name) {
581                // uh oh, we've already seen this value
582                defined_more_than_once.insert(name);
583                continue;
584            }
585            seen.insert(name);
586
587            // is the param even declared?
588            let Some(mpl_param) = mpl_params.iter().find(|p| p.name == name) else {
589                provided_but_not_declared.insert(name);
590                continue;
591            };
592
593            // parse mpl
594            let parsed = MPLParser::parse(Rule::param_value, value).map_err(|err| {
595                ParseProvidedParamsError::ParseParam {
596                    param_name: name.to_string(),
597                    expected_type: mpl_param.typ,
598                    err: ParseParamError::Parse(ParseError::from(err)),
599                }
600            })?;
601
602            // parse as correct type
603            let value = parser::parse_param_value(mpl_param, parsed).map_err(|err| {
604                ParseProvidedParamsError::ParseParam {
605                    param_name: name.to_string(),
606                    expected_type: mpl_param.typ,
607                    err,
608                }
609            })?;
610
611            provided_params.push(ProvidedParam {
612                name: name.to_string(),
613                value,
614            });
615        }
616
617        if !provided_but_not_declared.is_empty() {
618            // sort for consistency
619            let mut items = provided_but_not_declared
620                .into_iter()
621                .map(|p| format!("${p}"))
622                .collect::<Vec<String>>();
623            items.sort();
624
625            // add to warnings, no need to error
626            warnings.push(format!(
627                "These params were provided but not declared: {}",
628                items.join(", ")
629            ));
630        }
631
632        if !defined_more_than_once.is_empty() {
633            // sort for consistency
634            let mut items = defined_more_than_once
635                .into_iter()
636                .map(String::from)
637                .collect::<Vec<String>>();
638            items.sort();
639
640            return Err(ParseProvidedParamsError::ParamsProvidedMoreThanOnce(items));
641        }
642
643        let declared_param_names = mpl_params
644            .iter()
645            .map(|p| p.name.as_str())
646            .collect::<HashSet<&str>>();
647        let declared_but_not_provided = declared_param_names
648            .difference(&seen)
649            .collect::<Vec<&&str>>();
650        if !declared_but_not_provided.is_empty() {
651            // sort for consistency
652            let mut items = declared_but_not_provided
653                .into_iter()
654                .map(|s| String::from(*s))
655                .collect::<Vec<String>>();
656            items.sort();
657
658            return Err(ParseProvidedParamsError::ParamsDeclaredButNotProvided(
659                items,
660            ));
661        }
662
663        Ok((ProvidedParams::new(provided_params), warnings))
664    }
665
666    /// Return a ref to the inner value.
667    #[must_use]
668    pub fn as_slice(&self) -> &[ProvidedParam] {
669        self.inner.as_slice()
670    }
671
672    fn get_param(&self, name: &str) -> Result<&ProvidedParam, ResolveError> {
673        self.inner
674            .iter()
675            .find(|p| p.name == name)
676            .ok_or(ResolveError::ParamNotProvided(name.to_string()))
677    }
678
679    /// Resolve a `TagValue`.
680    pub fn resolve_tag_value(&self, pv: Parameterized<TagValue>) -> Result<TagValue, ResolveError> {
681        let param = match pv {
682            Parameterized::Concrete(val) => return Ok(val), // no need to resolve
683            Parameterized::Param { span: _, param } => param,
684        };
685
686        let provided_param = self.get_param(&param.name)?;
687        match &provided_param.value {
688            ParamValue::String(val) => Ok(TagValue::String(SharedString::try_from(val)?)),
689            ParamValue::Int(val) => Ok(TagValue::Int(*val)),
690            ParamValue::Float(val) => Ok(TagValue::Float(*val)),
691            ParamValue::Bool(val) => Ok(TagValue::Bool(*val)),
692            val => Err(ResolveError::InvalidType {
693                name: param.name,
694                defined: val.typ(),
695                expected: vec![
696                    ParamType::Tag(TagType::String),
697                    ParamType::Tag(TagType::Int),
698                    ParamType::Tag(TagType::Float),
699                    ParamType::Tag(TagType::Bool),
700                ],
701            }),
702        }
703    }
704
705    /// Resolve a `Dataset`.
706    pub fn resolve_dataset(&self, pv: Parameterized<Dataset>) -> Result<Dataset, ResolveError> {
707        let param = match pv {
708            Parameterized::Concrete(val) => return Ok(val), // no need to resolve
709            Parameterized::Param { span: _, param } => param,
710        };
711
712        let provided_param = self.get_param(&param.name)?;
713        match &provided_param.value {
714            ParamValue::Dataset(dataset) => Ok(dataset.clone()),
715            val => Err(ResolveError::InvalidType {
716                name: param.name,
717                defined: val.typ(),
718                expected: vec![ParamType::Dataset],
719            }),
720        }
721    }
722
723    /// Resolve a `RelativeTime`, aka duration.
724    pub fn resolve_relative_time(
725        &self,
726        pv: Parameterized<RelativeTime>,
727    ) -> Result<RelativeTime, ResolveError> {
728        let param = match pv {
729            Parameterized::Concrete(val) => return Ok(val), // no need to resolve
730            Parameterized::Param { span: _, param } => param,
731        };
732
733        let provided_param = self.get_param(&param.name)?;
734        match &provided_param.value {
735            ParamValue::Duration(relative_time) => Ok(relative_time.clone()),
736            val => Err(ResolveError::InvalidType {
737                name: param.name,
738                defined: val.typ(),
739                expected: vec![ParamType::Duration],
740            }),
741        }
742    }
743
744    /// Resolve a regex.
745    pub fn resolve_regex(
746        &self,
747        pv: Parameterized<EncodableRegex>,
748    ) -> Result<EncodableRegex, ResolveError> {
749        let param = match pv {
750            Parameterized::Concrete(val) => return Ok(val), // no need to resolve
751            Parameterized::Param { span: _, param } => param,
752        };
753
754        let provided_param = self.get_param(&param.name)?;
755        match &provided_param.value {
756            ParamValue::Regex(re) => Ok(re.clone()),
757            val => Err(ResolveError::InvalidType {
758                name: param.name,
759                defined: val.typ(),
760                expected: vec![ParamType::Regex],
761            }),
762        }
763    }
764}
765
766/// Parameters that will be set externally.
767#[cfg_attr(feature = "wasm", tsify::declare)]
768pub type Params = Vec<Param>;
769
770/// A Query AST representing a query in the `MPL` language
771#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
772#[cfg_attr(feature = "wasm", derive(tsify::Tsify))]
773#[cfg_attr(feature = "wasm", tsify(into_wasm_abi, from_wasm_abi))]
774pub enum Query {
775    /// A simple query that will produce a result
776    Simple {
777        /// The source of the data
778        source: Source,
779        /// The filters to apply to the data
780        filters: Vec<Filter>,
781        /// The aggregates to apply to the data
782        aggregates: Vec<Aggregate>,
783        /// The directives
784        directives: Directives,
785        /// The params
786        params: Params,
787        /// How to sample series
788        sample: Option<f64>,
789    },
790    /// A compute query taking the input of two queries and producing a by computing combined values
791    Compute {
792        /// The left hand side query to compute
793        left: Box<Query>,
794        /// The right hand side query to compute
795        right: Box<Query>,
796        /// The name of the metric to produce
797        name: Metric,
798        /// The compute operation used to combine the left and right queries
799        op: ComputeFunction,
800        /// The aggregates to apply to the combined data
801        aggregates: Vec<Aggregate>,
802        /// The directives
803        directives: Directives,
804        /// The params
805        params: Params,
806    },
807}
808
809impl Query {
810    /// Get a ref to the params of the query.
811    #[must_use]
812    pub fn params(&self) -> &Params {
813        match self {
814            Query::Simple { params, .. } | Query::Compute { params, .. } => params,
815        }
816    }
817    /// Get a ref to the directives of the query.
818    #[must_use]
819    pub fn directives(&self) -> &Directives {
820        match self {
821            Query::Simple { directives, .. } | Query::Compute { directives, .. } => directives,
822        }
823    }
824}
825
826impl RelativeTime {
827    /// Converts a relative time to a `Duration`
828    pub fn to_duration(&self) -> Result<Duration, TimeError> {
829        let v = i64::try_from(self.value).map_err(TimeError::InvalidDuration)?;
830        Ok(match self.unit {
831            TimeUnit::Millisecond => Duration::milliseconds(v),
832            TimeUnit::Second => Duration::seconds(v),
833            TimeUnit::Minute => Duration::minutes(v),
834            TimeUnit::Hour => Duration::hours(v),
835            TimeUnit::Day => Duration::days(v),
836            TimeUnit::Week => Duration::weeks(v),
837            TimeUnit::Month => Duration::days(v.saturating_mul(30)),
838            TimeUnit::Year => Duration::days(v.saturating_mul(365)),
839        })
840    }
841
842    /// Converts a relative time to a `Resolution`
843    pub fn to_resolution(&self) -> Result<Resolution, ResolutionError> {
844        match self.unit {
845            TimeUnit::Millisecond => Resolution::secs(self.value / 1000),
846            TimeUnit::Second => Resolution::secs(self.value),
847            TimeUnit::Minute => Resolution::secs(self.value.saturating_mul(60)),
848            TimeUnit::Hour => Resolution::secs(self.value.saturating_mul(60 * 60)),
849            TimeUnit::Day => Resolution::secs(self.value.saturating_mul(60 * 60 * 24)),
850            TimeUnit::Week => Resolution::secs(self.value.saturating_mul(60 * 60 * 24 * 7)),
851            TimeUnit::Month => Resolution::secs(self.value.saturating_mul(60 * 60 * 24 * 30)),
852            TimeUnit::Year => Resolution::secs(self.value.saturating_mul(60 * 60 * 24 * 365)),
853        }
854    }
855}
856
857/// An error that can occur when converting a time value.
858#[derive(Debug, thiserror::Error)]
859pub enum TimeError {
860    /// Invalid timestamp could not be converted to a UTC datetime
861    #[error("Invalid timestamp {0}, could not be converted to a UTC datetime")]
862    InvalidTimestamp(i64),
863    /// Invalid duration could not be converted to Duration as it exceeds the maximum i64
864    #[error(
865        "Invalid duration {0}, could not be converted to Duration as it exceeds the maximum i64"
866    )]
867    InvalidDuration(TryFromIntError),
868}
869#[cfg(feature = "clock")]
870impl Time {
871    fn to_datetime(&self) -> Result<DateTime<Utc>, TimeError> {
872        Ok(match self {
873            Time::Relative(t) => Utc::now() - t.to_duration()?,
874            Time::Timestamp(ts) => {
875                DateTime::<Utc>::from_timestamp(*ts, 0).ok_or(TimeError::InvalidTimestamp(*ts))?
876            }
877            Time::RFC3339(t) => t.with_timezone(&Utc),
878            Time::Modifier(_) => todo!(),
879        })
880    }
881}
882
883#[cfg(feature = "clock")]
884impl TimeRange {
885    /// Converts a time range to a start and pair
886    pub fn to_start_end(&self) -> Result<(DateTime<Utc>, DateTime<Utc>), TimeError> {
887        let start = self.start.to_datetime()?;
888        let end = self
889            .end
890            .as_ref()
891            .map_or_else(|| Ok(Utc::now()), Time::to_datetime)?;
892        Ok((start, end))
893    }
894}