Skip to main content

mpl_lang/
query.rs

1//! The query structures
2use std::{
3    collections::{HashMap, HashSet},
4    num::TryFromIntError,
5};
6
7#[cfg(feature = "clock")]
8use chrono::Utc;
9use chrono::{DateTime, Duration, FixedOffset};
10use miette::SourceSpan;
11use pest::Parser as _;
12use strumbra::SharedString;
13
14use crate::{
15    ParseError,
16    enc_regex::EncodableRegex,
17    linker::{AlignFunction, ComputeFunction, GroupFunction, MapFunction},
18    parser::{self, MPLParser, ParseParamError, Rule},
19    tags::TagValue,
20    time::{Resolution, ResolutionError},
21    types::{BucketSpec, BucketType, Dataset, Metric, Parameterized},
22};
23
24mod fmt;
25#[cfg(test)]
26mod tests;
27
28/// Metric identifier
29#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
30#[cfg_attr(feature = "wasm", derive(tsify::Tsify))]
31#[cfg_attr(feature = "wasm", tsify(into_wasm_abi, from_wasm_abi))]
32pub struct MetricId {
33    /// The dataset identifier or param
34    pub dataset: Parameterized<Dataset>,
35    /// The metric identifier
36    pub metric: Metric,
37}
38
39/// Time unit
40#[derive(Debug, Clone, PartialEq, serde::Serialize, serde::Deserialize)]
41#[cfg_attr(feature = "wasm", derive(tsify::Tsify))]
42#[cfg_attr(feature = "wasm", tsify(into_wasm_abi, from_wasm_abi))]
43pub enum TimeUnit {
44    /// Millisecond
45    Millisecond,
46    /// Second
47    Second,
48    /// Minute
49    Minute,
50    /// Hour
51    Hour,
52    /// Day
53    Day,
54    /// Week
55    Week,
56    /// Month
57    Month,
58    /// Year
59    Year,
60}
61
62#[derive(Debug, Clone, PartialEq, serde::Serialize, serde::Deserialize)]
63/// Relative time (1h)
64#[cfg_attr(feature = "wasm", derive(tsify::Tsify))]
65#[cfg_attr(feature = "wasm", tsify(into_wasm_abi, from_wasm_abi))]
66pub struct RelativeTime {
67    /// Value
68    pub value: u64,
69    /// Unit
70    pub unit: TimeUnit,
71}
72
73/// A point in time
74#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
75#[cfg_attr(feature = "wasm", derive(tsify::Tsify))]
76#[cfg_attr(feature = "wasm", tsify(into_wasm_abi, from_wasm_abi))]
77pub enum Time {
78    /// A time relative to now
79    Relative(RelativeTime),
80    /// A timestamp
81    Timestamp(i64),
82    /// A RFC3339 timestamp
83    RFC3339(#[cfg_attr(feature = "wasm", tsify(type = "string"))] DateTime<FixedOffset>),
84    /// A time modifier
85    Modifier(String),
86}
87
88/// A timerange between two times
89#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
90#[cfg_attr(feature = "wasm", derive(tsify::Tsify))]
91#[cfg_attr(feature = "wasm", tsify(into_wasm_abi, from_wasm_abi))]
92pub struct TimeRange {
93    /// Start time of the range
94    pub start: Time,
95    /// End time of the range or None for 'now'
96    pub end: Option<Time>,
97}
98
99/// The source for a query
100#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
101#[cfg_attr(feature = "wasm", derive(tsify::Tsify))]
102#[cfg_attr(feature = "wasm", tsify(into_wasm_abi, from_wasm_abi))]
103pub struct Source {
104    /// The metric
105    pub metric_id: MetricId,
106    /// The time range
107    pub time: Option<TimeRange>,
108}
109
110/// An error related to value parsing
111#[derive(Debug, thiserror::Error)]
112pub enum ValueError {
113    /// Invalid float value
114    #[error("Invalid Float")]
115    BadFloat,
116}
117
118/// A comparison operator for filtering based on a value
119#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq)]
120#[cfg_attr(feature = "wasm", derive(tsify::Tsify))]
121#[cfg_attr(feature = "wasm", tsify(into_wasm_abi, from_wasm_abi))]
122pub enum Cmp {
123    /// Equal to the given value
124    Eq(Parameterized<TagValue>),
125    /// Not equal to the given value
126    Ne(Parameterized<TagValue>),
127    /// Greater than the given value
128    Gt(Parameterized<TagValue>),
129    /// Greater than or equal to the given value
130    Ge(Parameterized<TagValue>),
131    /// Less than the given value
132    Lt(Parameterized<TagValue>),
133    /// Less than or equal to the given value
134    Le(Parameterized<TagValue>),
135    /// Matches the given regular expression
136    RegEx(Parameterized<EncodableRegex>),
137    /// Does not match the given regular expression
138    RegExNot(Parameterized<EncodableRegex>),
139    /// Is the given tag type
140    Is(TagType),
141}
142
143/// Rename the output as a new metric
144#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
145#[cfg_attr(feature = "wasm", derive(tsify::Tsify))]
146#[cfg_attr(feature = "wasm", tsify(into_wasm_abi, from_wasm_abi))]
147pub struct As {
148    /// The new name for the metric
149    pub name: Metric,
150}
151
152/// Filter the series
153#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq)]
154#[cfg_attr(feature = "wasm", derive(tsify::Tsify))]
155#[cfg_attr(feature = "wasm", tsify(into_wasm_abi, from_wasm_abi))]
156pub enum Filter {
157    /// Logical AND of the given filters
158    And(Vec<Filter>),
159    /// Logical OR of the given filters
160    Or(Vec<Filter>),
161    /// Logical NOT of the given filters
162    Not(Box<Filter>),
163    /// Filter based on a field
164    Cmp {
165        /// The field to filter on
166        field: String,
167        /// The comparison to perform
168        rhs: Cmp,
169    },
170}
171
172/// Ifdef conditionally filters the series
173#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq)]
174#[cfg_attr(feature = "wasm", derive(tsify::Tsify))]
175#[cfg_attr(feature = "wasm", tsify(into_wasm_abi, from_wasm_abi))]
176pub enum FilterOrIfDef {
177    /// A plain filter
178    Filter(Filter),
179    /// ifdef based on a parameter declaration
180    Ifdef {
181        /// The name of the parameter
182        param: ParamDeclaration,
183        /// The filter
184        filter: Filter,
185    },
186}
187
188impl FilterOrIfDef {
189    #[cfg(test)]
190    pub(crate) fn filter(&self) -> &Filter {
191        match self {
192            FilterOrIfDef::Filter(filter) | FilterOrIfDef::Ifdef { filter, .. } => filter,
193        }
194    }
195}
196
197/// A Mapping function
198#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
199#[cfg_attr(feature = "wasm", derive(tsify::Tsify))]
200#[cfg_attr(feature = "wasm", tsify(into_wasm_abi, from_wasm_abi))]
201pub struct Mapping {
202    /// The function to apply
203    pub function: MapFunction,
204    /// The optional argument to pass to the function
205    pub arg: Option<f64>,
206}
207
208/// An Alignment function
209#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
210#[cfg_attr(feature = "wasm", derive(tsify::Tsify))]
211#[cfg_attr(feature = "wasm", tsify(into_wasm_abi, from_wasm_abi))]
212pub struct Align {
213    /// The function to apply
214    pub function: AlignFunction,
215    /// The time to align to
216    pub time: Parameterized<RelativeTime>,
217}
218
219/// A Grouping function
220#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
221#[cfg_attr(feature = "wasm", derive(tsify::Tsify))]
222#[cfg_attr(feature = "wasm", tsify(into_wasm_abi, from_wasm_abi))]
223pub struct GroupBy {
224    /// The location of the group by clause
225    #[cfg_attr(feature = "wasm", tsify(type = "{ offset: number, length: number }"))]
226    pub span: SourceSpan,
227    /// The function to apply
228    pub function: GroupFunction,
229    /// The tags to group by
230    pub tags: Vec<String>,
231}
232
233/// A Bucketing function, applying both tag and time based aggregation
234#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
235#[cfg_attr(feature = "wasm", derive(tsify::Tsify))]
236#[cfg_attr(feature = "wasm", tsify(into_wasm_abi, from_wasm_abi))]
237pub struct BucketBy {
238    /// The location of the group by clause
239    #[cfg_attr(feature = "wasm", tsify(type = "{ offset: number, length: number }"))]
240    pub span: SourceSpan,
241    /// The function to apply
242    pub function: BucketType,
243    /// The time to align to
244    pub time: Parameterized<RelativeTime>,
245    /// The tags to group by
246    pub tags: Vec<String>,
247    /// The buckets to produce
248    pub spec: Vec<BucketSpec>,
249}
250
251/// Possible aggregate functions
252#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
253#[cfg_attr(feature = "wasm", derive(tsify::Tsify))]
254#[cfg_attr(feature = "wasm", tsify(into_wasm_abi, from_wasm_abi))]
255pub enum Aggregate {
256    /// Map a function over each value
257    Map(Mapping),
258    /// Align the data to a time interval
259    Align(Align),
260    /// Group the data by tags
261    GroupBy(GroupBy),
262    /// Bucket the data by time and tags
263    Bucket(BucketBy),
264    /// Rename the metric
265    As(As),
266}
267
268/// Values for directives
269#[cfg_attr(feature = "wasm", tsify::declare)]
270#[cfg_attr(feature = "bincode", derive(bincode::Encode, bincode::Decode))]
271#[derive(Clone, Debug, PartialEq, serde::Deserialize, serde::Serialize)]
272pub enum DirectiveValue {
273    /// Directive with a ident value
274    Ident(String),
275    /// Directive with a literal value
276    Int(i64),
277    /// Directive with a float value
278    Float(f64),
279    /// Directive with a string value
280    String(String),
281    /// Directive with a boolean value
282    Bool(bool),
283    /// Directive with no value
284    None,
285}
286
287impl DirectiveValue {
288    /// Ident value
289    #[must_use]
290    pub fn as_ident(&self) -> Option<&str> {
291        match self {
292            DirectiveValue::Ident(ident) => Some(ident),
293            _ => None,
294        }
295    }
296    /// Int value
297    #[must_use]
298    pub fn as_int(&self) -> Option<i64> {
299        match self {
300            DirectiveValue::Int(int) => Some(*int),
301            _ => None,
302        }
303    }
304    /// Float value
305    #[must_use]
306    pub fn as_float(&self) -> Option<f64> {
307        match self {
308            DirectiveValue::Float(float) => Some(*float),
309            _ => None,
310        }
311    }
312    /// String value
313    #[must_use]
314    pub fn as_string(&self) -> Option<&str> {
315        match self {
316            DirectiveValue::String(string) => Some(string),
317            _ => None,
318        }
319    }
320    /// Bool value
321    #[must_use]
322    pub fn as_bool(&self) -> Option<bool> {
323        match self {
324            DirectiveValue::Bool(bool) => Some(*bool),
325            _ => None,
326        }
327    }
328    /// Tests if value is None
329    #[must_use]
330    pub fn is_none(&self) -> bool {
331        matches!(self, DirectiveValue::None)
332    }
333    /// Tests if value is Some
334    #[must_use]
335    pub fn is_some(&self) -> bool {
336        !self.is_none()
337    }
338}
339
340/// A parameter type, either Optional or Terminal.
341#[cfg_attr(feature = "wasm", tsify::declare)]
342#[derive(Clone, Copy, Debug, serde::Deserialize, serde::Serialize, PartialEq, Eq)]
343pub enum ParamType {
344    /// A type that's defined and present `param p: int`
345    Terminal(TerminalParamType),
346    /// A type that may or may not be present `param p: Option<int>`
347    Optional(TerminalParamType),
348}
349
350impl ParamType {
351    fn is_optional(self) -> bool {
352        matches!(self, ParamType::Optional(_))
353    }
354}
355
356impl std::fmt::Display for ParamType {
357    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
358        match self {
359            ParamType::Terminal(t) => t.fmt(f),
360            ParamType::Optional(t) => write!(f, "Option<{t}>"),
361        }
362    }
363}
364
365/// Terminal Types for params.
366#[cfg_attr(feature = "wasm", tsify::declare)]
367#[derive(Clone, Copy, Debug, serde::Deserialize, serde::Serialize, PartialEq, Eq)]
368pub enum TerminalParamType {
369    /// Duration (e.g. 25s)
370    Duration,
371    /// Dataset
372    Dataset,
373    /// Regex
374    Regex,
375    /// A tag value type
376    Tag(TagType),
377}
378impl std::fmt::Display for TerminalParamType {
379    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
380        match self {
381            TerminalParamType::Dataset => write!(f, "Dataset"),
382            TerminalParamType::Duration => write!(f, "Duration"),
383            TerminalParamType::Regex => write!(f, "Regex"),
384            TerminalParamType::Tag(t) => t.fmt(f),
385        }
386    }
387}
388
389/// Types for params.
390#[cfg_attr(feature = "wasm", tsify::declare)]
391#[cfg_attr(feature = "bincode", derive(bincode::Encode, bincode::Decode))]
392#[derive(Clone, Copy, Debug, Hash, serde::Deserialize, serde::Serialize, PartialEq, Eq)]
393pub enum TagType {
394    /// String
395    String,
396    /// Int
397    Int,
398    /// Float
399    Float,
400    /// Bool
401    Bool,
402    /// Null value
403    Null,
404}
405
// Pins the bincode encoding of `TagType::Null`: the single byte `4` must decode
// to it and consume exactly one byte.
#[cfg(feature = "bincode")]
#[test]
fn test_renaming_none_to_null_has_no_bincode_side_effects() {
    let encoded: [u8; 1] = [4];
    let decoded: (TagType, usize) =
        bincode::decode_from_slice(&encoded, bincode::config::standard()).expect("it does ...");
    assert_eq!((TagType::Null, 1), decoded);
}
415
416impl std::fmt::Display for TagType {
417    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
418        write!(
419            f,
420            "{}",
421            match self {
422                TagType::String => "string",
423                TagType::Int => "int",
424                TagType::Float => "float",
425                TagType::Bool => "bool",
426                TagType::Null => "null",
427            }
428        )
429    }
430}
431
432/// Directives given to adjust the behavior of the runtime
433#[cfg_attr(feature = "wasm", tsify::declare)]
434pub type Directives = HashMap<String, DirectiveValue>;
435
436/// A param.
437#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
438#[cfg_attr(feature = "wasm", derive(tsify::Tsify))]
439pub struct ParamDeclaration {
440    /// The location of the param
441    #[cfg_attr(feature = "wasm", tsify(type = "{ offset: number, length: number }"))]
442    pub span: SourceSpan,
443    /// The name of the param
444    pub name: String,
445    /// The type of the param
446    pub typ: ParamType,
447}
448
449impl ParamDeclaration {
450    pub(crate) fn typ(&self) -> TerminalParamType {
451        match self.typ {
452            ParamType::Terminal(terminal_param_type) | ParamType::Optional(terminal_param_type) => {
453                terminal_param_type
454            }
455        }
456    }
457
458    pub(crate) fn is_optional(&self) -> bool {
459        self.typ.is_optional()
460    }
461}
462
463/// A param value.
464#[derive(Debug, Clone, PartialEq)]
465pub enum ParamValue {
466    /// Dataset
467    Dataset(Dataset),
468    /// Duration
469    Duration(RelativeTime),
470    /// String
471    String(String),
472    /// Int
473    Int(i64),
474    /// Float
475    Float(f64),
476    /// Bool
477    Bool(bool),
478    /// Regex
479    Regex(EncodableRegex),
480}
481
482impl ParamValue {
483    /// Get the type of the param value.
484    #[must_use]
485    pub fn typ(&self) -> TerminalParamType {
486        match self {
487            ParamValue::Dataset(_) => TerminalParamType::Dataset,
488            ParamValue::Duration(_) => TerminalParamType::Duration,
489            ParamValue::Regex(_) => TerminalParamType::Regex,
490            ParamValue::String(_) => TerminalParamType::Tag(TagType::String),
491            ParamValue::Int(_) => TerminalParamType::Tag(TagType::Int),
492            ParamValue::Float(_) => TerminalParamType::Tag(TagType::Float),
493            ParamValue::Bool(_) => TerminalParamType::Tag(TagType::Bool),
494        }
495    }
496}
497
498/// The param provided to the query.
499#[derive(Debug, Clone, PartialEq)]
500pub struct ProvidedParam {
501    /// The name of the param.
502    pub name: String,
503    /// The value.
504    pub value: ParamValue,
505}
506
507impl ProvidedParam {
508    /// Create a new `ProvidedParam`.
509    pub fn new(name: impl Into<String>, value: ParamValue) -> Self {
510        Self {
511            name: name.into(),
512            value,
513        }
514    }
515}
516
517/// A smol wrapper around `Vec<ProvidedParam>` for easier use.
518#[derive(Debug, Clone, Default)]
519pub struct ProvidedParams {
520    inner: Vec<ProvidedParam>,
521}
522
523/// The error returned from `ProvidedParams::resolve`.
524#[derive(Debug, thiserror::Error)]
525pub enum ResolveError {
526    /// Param not provided
527    #[error("Param ${0} was not provided to the query")]
528    ParamNotProvided(String),
529    /// Invalid type
530    #[error(
531        "Param ${name} is defined as `{defined}`, but was used in a context that expected one of: {}",
532        expected.iter().map(ToString::to_string).collect::<Vec<_>>().join(", ")
533    )]
534    InvalidType {
535        /// Name of the param
536        name: String,
537        /// Type of the param
538        defined: TerminalParamType,
539        /// The type that is valid in the context it was used
540        expected: Vec<TerminalParamType>,
541    },
542    /// Shared string error
543    #[error("Shared string error: {0}")]
544    SharedString(#[from] strumbra::Error),
545}
546
547/// The error returned from `ProvidedParams::parse`.
548#[derive(Debug, thiserror::Error)]
549pub enum ParseProvidedParamsError {
550    /// Parse failed
551    #[error("Failed to parse the value for ${param_name} as {expected_type}: {err}")]
552    ParseParam {
553        /// Param name
554        param_name: String,
555        /// Expected t ype
556        expected_type: ParamType,
557        /// Parse param error
558        err: ParseParamError,
559    },
560    /// Params provided more than once
561    #[error("These params were provided more than once: {}", .0.join(", "))]
562    ParamsProvidedMoreThanOnce(Vec<String>),
563    /// Params declared but not provided
564    #[error("The following params were declared but not provided: {}", .0.join(", "))]
565    ParamsDeclaredButNotProvided(Vec<String>),
566    /// Too many params provided
567    #[error("The number of params provided exceeds the upper limit of {0}")]
568    TooManyParamsProvided(usize),
569}
570
/// Messages that should be surfaced to the user without failing the request.
#[derive(Debug, Default)]
pub struct Warnings {
    inner: Vec<String>,
}

impl Warnings {
    /// Creates an empty collection of warnings.
    #[must_use]
    pub fn new() -> Self {
        Self { inner: Vec::new() }
    }

    /// Appends a warning to the collection.
    pub fn push(&mut self, warning: impl Into<String>) {
        let message: String = warning.into();
        self.inner.push(message);
    }

    /// Returns `true` when no warning has been recorded.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.as_slice().is_empty()
    }

    /// Borrows the recorded warnings in insertion order.
    #[must_use]
    pub fn as_slice(&self) -> &[String] {
        self.inner.as_slice()
    }

    /// Consumes the collection and returns the recorded warnings.
    #[must_use]
    pub fn into_vec(self) -> Vec<String> {
        let Self { inner } = self;
        inner
    }
}
607
608impl ProvidedParams {
609    /// Create a new `ProvidedParams` struct.
610    #[must_use]
611    pub fn new(inner: Vec<ProvidedParam>) -> Self {
612        Self { inner }
613    }
614
615    /// Parse params from a hashmap of query parameters.
616    /// This will only look at params that start with `param__` and it'll use
617    /// the parser definitions to extract the values.
618    pub fn parse_and_validate(
619        mpl_params: &Params,
620        query_params: &[(String, String)],
621    ) -> Result<(Self, Warnings), ParseProvidedParamsError> {
622        const PREFIX: &str = "param__";
623        const PARAM_COUNT_LIMIT: usize = 128;
624
625        let mut warnings = Warnings::new();
626        let mut defined_more_than_once = HashSet::new();
627        let mut provided_but_not_declared = HashSet::new();
628        let mut seen = HashSet::new();
629
630        let params = query_params
631            .iter()
632            .filter_map(|(name, value)| {
633                if !name.starts_with(PREFIX) {
634                    return None;
635                }
636                let name = name.trim_start_matches(PREFIX);
637                if name.is_empty() {
638                    return None;
639                }
640
641                Some((name, value))
642            })
643            .take(PARAM_COUNT_LIMIT + 1)
644            .collect::<Vec<(&str, &String)>>();
645
646        // we don't support unlimited params
647        if params.len() > PARAM_COUNT_LIMIT {
648            return Err(ParseProvidedParamsError::TooManyParamsProvided(
649                PARAM_COUNT_LIMIT,
650            ));
651        }
652
653        let mut provided_params = Vec::new();
654        for (name, value) in params {
655            if seen.contains(name) {
656                // uh oh, we've already seen this value
657                defined_more_than_once.insert(name);
658                continue;
659            }
660            seen.insert(name);
661
662            // is the param even declared?
663            let Some(mpl_param) = mpl_params.iter().find(|p| p.name == name) else {
664                provided_but_not_declared.insert(name);
665                continue;
666            };
667
668            // parse mpl
669            let parsed = MPLParser::parse(Rule::param_value, value).map_err(|err| {
670                ParseProvidedParamsError::ParseParam {
671                    param_name: name.to_string(),
672                    expected_type: mpl_param.typ,
673                    err: ParseParamError::Parse(ParseError::from(err)),
674                }
675            })?;
676
677            // parse as correct type
678            let value = parser::parse_param_value(mpl_param, parsed).map_err(|err| {
679                ParseProvidedParamsError::ParseParam {
680                    param_name: name.to_string(),
681                    expected_type: mpl_param.typ,
682                    err,
683                }
684            })?;
685
686            provided_params.push(ProvidedParam {
687                name: name.to_string(),
688                value,
689            });
690        }
691
692        if !provided_but_not_declared.is_empty() {
693            // sort for consistency
694            let mut items = provided_but_not_declared
695                .into_iter()
696                .map(|p| format!("${p}"))
697                .collect::<Vec<String>>();
698            items.sort();
699
700            // add to warnings, no need to error
701            warnings.push(format!(
702                "These params were provided but not declared: {}",
703                items.join(", ")
704            ));
705        }
706
707        if !defined_more_than_once.is_empty() {
708            // sort for consistency
709            let mut items = defined_more_than_once
710                .into_iter()
711                .map(String::from)
712                .collect::<Vec<String>>();
713            items.sort();
714
715            return Err(ParseProvidedParamsError::ParamsProvidedMoreThanOnce(items));
716        }
717
718        let declared_param_names = mpl_params
719            .iter()
720            .filter_map(|p| {
721                // Skip optional params since they don't need to be provided.
722                if p.typ.is_optional() {
723                    None
724                } else {
725                    Some(p.name.as_str())
726                }
727            })
728            .collect::<HashSet<&str>>();
729        let declared_but_not_provided = declared_param_names
730            .difference(&seen)
731            .collect::<Vec<&&str>>();
732        if !declared_but_not_provided.is_empty() {
733            // sort for consistency
734            let mut items = declared_but_not_provided
735                .into_iter()
736                .map(|s| String::from(*s))
737                .collect::<Vec<String>>();
738            items.sort();
739
740            return Err(ParseProvidedParamsError::ParamsDeclaredButNotProvided(
741                items,
742            ));
743        }
744
745        Ok((ProvidedParams::new(provided_params), warnings))
746    }
747
748    /// Return a ref to the inner value.
749    #[must_use]
750    pub fn as_slice(&self) -> &[ProvidedParam] {
751        self.inner.as_slice()
752    }
753
754    fn get_param(&self, name: &str) -> Result<&ProvidedParam, ResolveError> {
755        self.inner
756            .iter()
757            .find(|p| p.name == name)
758            .ok_or(ResolveError::ParamNotProvided(name.to_string()))
759    }
760
761    /// Resolve a `TagValue`.
762    pub fn resolve_tag_value(&self, pv: Parameterized<TagValue>) -> Result<TagValue, ResolveError> {
763        let param = match pv {
764            Parameterized::Concrete(val) => return Ok(val), // no need to resolve
765            Parameterized::Param { span: _, param } => param,
766        };
767
768        let provided_param = self.get_param(&param.name)?;
769        match &provided_param.value {
770            ParamValue::String(val) => Ok(TagValue::String(SharedString::try_from(val)?)),
771            ParamValue::Int(val) => Ok(TagValue::Int(*val)),
772            ParamValue::Float(val) => Ok(TagValue::Float(*val)),
773            ParamValue::Bool(val) => Ok(TagValue::Bool(*val)),
774            val => Err(ResolveError::InvalidType {
775                name: param.name,
776                defined: val.typ(),
777                expected: vec![
778                    TerminalParamType::Tag(TagType::String),
779                    TerminalParamType::Tag(TagType::Int),
780                    TerminalParamType::Tag(TagType::Float),
781                    TerminalParamType::Tag(TagType::Bool),
782                ],
783            }),
784        }
785    }
786
787    /// Resolve a `Dataset`.
788    pub fn resolve_dataset(&self, pv: Parameterized<Dataset>) -> Result<Dataset, ResolveError> {
789        let param = match pv {
790            Parameterized::Concrete(val) => return Ok(val), // no need to resolve
791            Parameterized::Param { span: _, param } => param,
792        };
793
794        let provided_param = self.get_param(&param.name)?;
795        match &provided_param.value {
796            ParamValue::Dataset(dataset) => Ok(dataset.clone()),
797            val => Err(ResolveError::InvalidType {
798                name: param.name,
799                defined: val.typ(),
800                expected: vec![TerminalParamType::Dataset],
801            }),
802        }
803    }
804
805    /// Resolve a `RelativeTime`, aka duration.
806    pub fn resolve_relative_time(
807        &self,
808        pv: Parameterized<RelativeTime>,
809    ) -> Result<RelativeTime, ResolveError> {
810        let param = match pv {
811            Parameterized::Concrete(val) => return Ok(val), // no need to resolve
812            Parameterized::Param { span: _, param } => param,
813        };
814
815        let provided_param = self.get_param(&param.name)?;
816        match &provided_param.value {
817            ParamValue::Duration(relative_time) => Ok(relative_time.clone()),
818            val => Err(ResolveError::InvalidType {
819                name: param.name,
820                defined: val.typ(),
821                expected: vec![TerminalParamType::Duration],
822            }),
823        }
824    }
825
826    /// Resolve a regex.
827    pub fn resolve_regex(
828        &self,
829        pv: Parameterized<EncodableRegex>,
830    ) -> Result<EncodableRegex, ResolveError> {
831        let param = match pv {
832            Parameterized::Concrete(val) => return Ok(val), // no need to resolve
833            Parameterized::Param { span: _, param } => param,
834        };
835
836        let provided_param = self.get_param(&param.name)?;
837        match &provided_param.value {
838            ParamValue::Regex(re) => Ok(re.clone()),
839            val => Err(ResolveError::InvalidType {
840                name: param.name,
841                defined: val.typ(),
842                expected: vec![TerminalParamType::Regex],
843            }),
844        }
845    }
846    /// Checks if a param was provided
847    #[must_use]
848    pub fn contains(&self, param: &str) -> bool {
849        self.get_param(param).is_ok()
850    }
851
852    /// Returns the filter when it should be applied for these params.
853    ///
854    /// Plain filters are always active. `ifdef` filters are active only when
855    /// their guarding optional param was provided by the caller.
856    #[must_use]
857    pub fn active_filter<'a>(&self, filter: &'a FilterOrIfDef) -> Option<&'a Filter> {
858        match filter {
859            FilterOrIfDef::Filter(filter) => Some(filter),
860            FilterOrIfDef::Ifdef { param, filter } if self.contains(&param.name) => Some(filter),
861            FilterOrIfDef::Ifdef { .. } => None,
862        }
863    }
864
865    /// Returns filters that should be applied for these params, preserving order.
866    #[must_use]
867    pub fn active_filters<'a>(&self, filters: &'a [FilterOrIfDef]) -> Vec<&'a Filter> {
868        filters
869            .iter()
870            .filter_map(|filter| self.active_filter(filter))
871            .collect()
872    }
873}
874
875/// Parameters that will be set externally.
876#[cfg_attr(feature = "wasm", tsify::declare)]
877pub type Params = Vec<ParamDeclaration>;
878
879/// A Query AST representing a query in the `MPL` language
880#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
881#[cfg_attr(feature = "wasm", derive(tsify::Tsify))]
882#[cfg_attr(feature = "wasm", tsify(into_wasm_abi, from_wasm_abi))]
883pub enum Query {
884    /// A simple query that will produce a result
885    Simple {
886        /// The source of the data
887        source: Source,
888        /// The filters to apply to the data
889        filters: Vec<FilterOrIfDef>,
890        /// The aggregates to apply to the data
891        aggregates: Vec<Aggregate>,
892        /// The directives
893        directives: Directives,
894        /// The params
895        params: Params,
896        /// How to sample series
897        sample: Option<f64>,
898    },
899    /// A compute query taking the input of two queries and producing a by computing combined values
900    Compute {
901        /// The left hand side query to compute
902        left: Box<Query>,
903        /// The right hand side query to compute
904        right: Box<Query>,
905        /// The name of the metric to produce
906        name: Metric,
907        /// The compute operation used to combine the left and right queries
908        op: ComputeFunction,
909        /// The aggregates to apply to the combined data
910        aggregates: Vec<Aggregate>,
911        /// The directives
912        directives: Directives,
913        /// The params
914        params: Params,
915    },
916}
917
918impl Query {
919    /// Get a ref to the params of the query.
920    #[must_use]
921    pub fn params(&self) -> &Params {
922        match self {
923            Query::Simple { params, .. } | Query::Compute { params, .. } => params,
924        }
925    }
926    /// Get a ref to the directives of the query.
927    #[must_use]
928    pub fn directives(&self) -> &Directives {
929        match self {
930            Query::Simple { directives, .. } | Query::Compute { directives, .. } => directives,
931        }
932    }
933}
934
935impl RelativeTime {
936    /// Converts a relative time to a `Duration`
937    pub fn to_duration(&self) -> Result<Duration, TimeError> {
938        let v = i64::try_from(self.value).map_err(TimeError::InvalidDuration)?;
939        Ok(match self.unit {
940            TimeUnit::Millisecond => Duration::milliseconds(v),
941            TimeUnit::Second => Duration::seconds(v),
942            TimeUnit::Minute => Duration::minutes(v),
943            TimeUnit::Hour => Duration::hours(v),
944            TimeUnit::Day => Duration::days(v),
945            TimeUnit::Week => Duration::weeks(v),
946            TimeUnit::Month => Duration::days(v.saturating_mul(30)),
947            TimeUnit::Year => Duration::days(v.saturating_mul(365)),
948        })
949    }
950
951    /// Converts a relative time to a `Resolution`
952    pub fn to_resolution(&self) -> Result<Resolution, ResolutionError> {
953        match self.unit {
954            TimeUnit::Millisecond => Resolution::secs(self.value / 1000),
955            TimeUnit::Second => Resolution::secs(self.value),
956            TimeUnit::Minute => Resolution::secs(self.value.saturating_mul(60)),
957            TimeUnit::Hour => Resolution::secs(self.value.saturating_mul(60 * 60)),
958            TimeUnit::Day => Resolution::secs(self.value.saturating_mul(60 * 60 * 24)),
959            TimeUnit::Week => Resolution::secs(self.value.saturating_mul(60 * 60 * 24 * 7)),
960            TimeUnit::Month => Resolution::secs(self.value.saturating_mul(60 * 60 * 24 * 30)),
961            TimeUnit::Year => Resolution::secs(self.value.saturating_mul(60 * 60 * 24 * 365)),
962        }
963    }
964}
965
966/// An error that can occur when converting a time value.
967#[derive(Debug, thiserror::Error)]
968pub enum TimeError {
969    /// Invalid timestamp could not be converted to a UTC datetime
970    #[error("Invalid timestamp {0}, could not be converted to a UTC datetime")]
971    InvalidTimestamp(i64),
972    /// Invalid duration could not be converted to Duration as it exceeds the maximum i64
973    #[error(
974        "Invalid duration {0}, could not be converted to Duration as it exceeds the maximum i64"
975    )]
976    InvalidDuration(TryFromIntError),
977}
#[cfg(feature = "clock")]
impl Time {
    /// Converts this time into a UTC datetime.
    ///
    /// Relative times are subtracted from the current time, timestamps are
    /// read as whole seconds, and RFC 3339 values are shifted to UTC.
    ///
    /// # Errors
    ///
    /// Returns a `TimeError` when a relative time cannot be turned into a
    /// duration or a timestamp cannot be represented as a datetime.
    ///
    /// # Panics
    ///
    /// `Time::Modifier` is not implemented and hits `todo!()`.
    fn to_datetime(&self) -> Result<DateTime<Utc>, TimeError> {
        match self {
            Time::Relative(offset) => Ok(Utc::now() - offset.to_duration()?),
            Time::Timestamp(secs) => {
                DateTime::<Utc>::from_timestamp(*secs, 0).ok_or(TimeError::InvalidTimestamp(*secs))
            }
            Time::RFC3339(stamp) => Ok(stamp.with_timezone(&Utc)),
            Time::Modifier(_) => todo!(),
        }
    }
}
991
#[cfg(feature = "clock")]
impl TimeRange {
    /// Converts the range into a `(start, end)` pair of UTC datetimes.
    ///
    /// A missing end is replaced by the current time.
    ///
    /// # Errors
    ///
    /// Returns a `TimeError` when either bound cannot be converted.
    pub fn to_start_end(&self) -> Result<(DateTime<Utc>, DateTime<Utc>), TimeError> {
        let start = self.start.to_datetime()?;
        let end = match &self.end {
            Some(end) => end.to_datetime()?,
            None => Utc::now(),
        };
        Ok((start, end))
    }
}
1003}