Skip to main content

mpl_lang/
query.rs

1//! The query structures
2use std::{
3    collections::{HashMap, HashSet},
4    fmt::Display,
5    num::TryFromIntError,
6};
7
8#[cfg(feature = "clock")]
9use chrono::Utc;
10use chrono::{DateTime, Duration, FixedOffset};
11use miette::SourceSpan;
12use pest::Parser as _;
13use strumbra::SharedString;
14
15use crate::{
16    ParseError,
17    enc_regex::EncodableRegex,
18    linker::{AlignFunction, ComputeFunction, GroupFunction, MapFunction},
19    parser::{self, MPLParser, ParseParamError, Rule},
20    tags::TagValue,
21    time::{Resolution, ResolutionError},
22    types::{BucketSpec, BucketType, Dataset, Metric, Parameterized},
23};
24
25mod fmt;
26#[cfg(test)]
27mod tests;
28
29/// Metric identifier
30#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
31#[cfg_attr(feature = "wasm", derive(tsify::Tsify))]
32#[cfg_attr(feature = "wasm", tsify(into_wasm_abi, from_wasm_abi))]
33pub struct MetricId {
34    /// The dataset identifier or param
35    pub dataset: Parameterized<Dataset>,
36    /// The metric identifier
37    pub metric: Metric,
38}
39
40/// Time unit
41#[derive(Debug, Clone, PartialEq, serde::Serialize, serde::Deserialize)]
42#[cfg_attr(feature = "wasm", derive(tsify::Tsify))]
43#[cfg_attr(feature = "wasm", tsify(into_wasm_abi, from_wasm_abi))]
44pub enum TimeUnit {
45    /// Millisecond
46    Millisecond,
47    /// Second
48    Second,
49    /// Minute
50    Minute,
51    /// Hour
52    Hour,
53    /// Day
54    Day,
55    /// Week
56    Week,
57    /// Month
58    Month,
59    /// Year
60    Year,
61}
62
63#[derive(Debug, Clone, PartialEq, serde::Serialize, serde::Deserialize)]
64/// Relative time (1h)
65#[cfg_attr(feature = "wasm", derive(tsify::Tsify))]
66#[cfg_attr(feature = "wasm", tsify(into_wasm_abi, from_wasm_abi))]
67pub struct RelativeTime {
68    /// Value
69    pub value: u64,
70    /// Unit
71    pub unit: TimeUnit,
72}
73
74/// A point in time
75#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
76#[cfg_attr(feature = "wasm", derive(tsify::Tsify))]
77#[cfg_attr(feature = "wasm", tsify(into_wasm_abi, from_wasm_abi))]
78pub enum Time {
79    /// A time relative to now
80    Relative(RelativeTime),
81    /// A timestamp
82    Timestamp(i64),
83    /// A RFC3339 timestamp
84    RFC3339(#[cfg_attr(feature = "wasm", tsify(type = "string"))] DateTime<FixedOffset>),
85    /// A time modifier
86    Modifier(String),
87}
88
89/// A timerange between two times
90#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
91#[cfg_attr(feature = "wasm", derive(tsify::Tsify))]
92#[cfg_attr(feature = "wasm", tsify(into_wasm_abi, from_wasm_abi))]
93pub struct TimeRange {
94    /// Start time of the range
95    pub start: Time,
96    /// End time of the range or None for 'now'
97    pub end: Option<Time>,
98}
99
100/// The source for a query
101#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
102#[cfg_attr(feature = "wasm", derive(tsify::Tsify))]
103#[cfg_attr(feature = "wasm", tsify(into_wasm_abi, from_wasm_abi))]
104pub struct Source {
105    /// The metric
106    pub metric_id: MetricId,
107    /// The time range
108    pub time: Option<TimeRange>,
109}
110impl Source {
111    fn time(&self) -> Option<&TimeRange> {
112        self.time.as_ref()
113    }
114}
115
116/// An error related to value parsing
117#[derive(Debug, thiserror::Error)]
118pub enum ValueError {
119    /// Invalid float value
120    #[error("Invalid Float")]
121    BadFloat,
122}
123
124/// A comparison operator for filtering based on a value
125#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq)]
126#[cfg_attr(feature = "wasm", derive(tsify::Tsify))]
127#[cfg_attr(feature = "wasm", tsify(into_wasm_abi, from_wasm_abi))]
128pub enum Cmp {
129    /// Equal to the given value
130    Eq(Parameterized<TagValue>),
131    /// Not equal to the given value
132    Ne(Parameterized<TagValue>),
133    /// Greater than the given value
134    Gt(Parameterized<TagValue>),
135    /// Greater than or equal to the given value
136    Ge(Parameterized<TagValue>),
137    /// Less than the given value
138    Lt(Parameterized<TagValue>),
139    /// Less than or equal to the given value
140    Le(Parameterized<TagValue>),
141    /// Matches the given regular expression
142    RegEx(Parameterized<EncodableRegex>),
143    /// Does not match the given regular expression
144    RegExNot(Parameterized<EncodableRegex>),
145    /// Is the given tag type
146    Is(TagType),
147}
148
149/// Rename the output as a new metric
150#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
151#[cfg_attr(feature = "wasm", derive(tsify::Tsify))]
152#[cfg_attr(feature = "wasm", tsify(into_wasm_abi, from_wasm_abi))]
153pub struct As {
154    /// The new name for the metric
155    pub name: Metric,
156}
157
158/// Filter the series
159#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq)]
160#[cfg_attr(feature = "wasm", derive(tsify::Tsify))]
161#[cfg_attr(feature = "wasm", tsify(into_wasm_abi, from_wasm_abi))]
162pub enum Filter {
163    /// Logical AND of the given filters
164    And(Vec<Filter>),
165    /// Logical OR of the given filters
166    Or(Vec<Filter>),
167    /// Logical NOT of the given filters
168    Not(Box<Filter>),
169    /// Filter based on a field
170    Cmp {
171        /// The field to filter on
172        field: String,
173        /// The comparison to perform
174        rhs: Cmp,
175    },
176}
177
178/// Ifdef conditionally filters the series
179#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq)]
180#[cfg_attr(feature = "wasm", derive(tsify::Tsify))]
181#[cfg_attr(feature = "wasm", tsify(into_wasm_abi, from_wasm_abi))]
182pub enum FilterOrIfDef {
183    /// A plain filter
184    Filter(Filter),
185    /// ifdef based on a parameter declaration
186    Ifdef {
187        /// The name of the parameter
188        param: ParamDeclaration,
189        /// The filter
190        filter: Filter,
191    },
192}
193
194impl FilterOrIfDef {
195    #[cfg(test)]
196    pub(crate) fn filter(&self) -> &Filter {
197        match self {
198            FilterOrIfDef::Filter(filter) | FilterOrIfDef::Ifdef { filter, .. } => filter,
199        }
200    }
201}
202
203/// A Mapping function
204#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
205#[cfg_attr(feature = "wasm", derive(tsify::Tsify))]
206#[cfg_attr(feature = "wasm", tsify(into_wasm_abi, from_wasm_abi))]
207pub struct Mapping {
208    /// The function to apply
209    pub function: MapFunction,
210    /// The optional argument to pass to the function
211    pub arg: Option<f64>,
212}
213
214/// An Alignment function
215#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
216#[cfg_attr(feature = "wasm", derive(tsify::Tsify))]
217#[cfg_attr(feature = "wasm", tsify(into_wasm_abi, from_wasm_abi))]
218pub struct Align {
219    /// The function to apply
220    pub function: AlignFunction,
221    /// The time to align to
222    pub time: Option<Parameterized<RelativeTime>>,
223}
224
225/// A Grouping function
226#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
227#[cfg_attr(feature = "wasm", derive(tsify::Tsify))]
228#[cfg_attr(feature = "wasm", tsify(into_wasm_abi, from_wasm_abi))]
229pub struct GroupBy {
230    /// The location of the group by clause
231    #[cfg_attr(feature = "wasm", tsify(type = "{ offset: number, length: number }"))]
232    pub span: SourceSpan,
233    /// The function to apply
234    pub function: GroupFunction,
235    /// The tags to group by
236    pub tags: Vec<String>,
237}
238
239/// A Bucketing function, applying both tag and time based aggregation
240#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
241#[cfg_attr(feature = "wasm", derive(tsify::Tsify))]
242#[cfg_attr(feature = "wasm", tsify(into_wasm_abi, from_wasm_abi))]
243pub struct BucketBy {
244    /// The location of the group by clause
245    #[cfg_attr(feature = "wasm", tsify(type = "{ offset: number, length: number }"))]
246    pub span: SourceSpan,
247    /// The function to apply
248    pub function: BucketType,
249    /// The time to align to
250    pub time: Option<Parameterized<RelativeTime>>,
251    /// The tags to group by
252    pub tags: Vec<String>,
253    /// The buckets to produce
254    pub spec: Vec<BucketSpec>,
255}
256
257/// Possible aggregate functions
258#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
259#[cfg_attr(feature = "wasm", derive(tsify::Tsify))]
260#[cfg_attr(feature = "wasm", tsify(into_wasm_abi, from_wasm_abi))]
261pub enum Aggregate {
262    /// Map a function over each value
263    Map(Mapping),
264    /// Align the data to a time interval
265    Align(Align),
266    /// Group the data by tags
267    GroupBy(GroupBy),
268    /// Bucket the data by time and tags
269    Bucket(BucketBy),
270    /// Rename the metric
271    As(As),
272}
273
274/// Values for directives
275#[cfg_attr(feature = "wasm", tsify::declare)]
276#[cfg_attr(feature = "bincode", derive(bincode::Encode, bincode::Decode))]
277#[derive(Clone, Debug, PartialEq, serde::Deserialize, serde::Serialize)]
278pub enum DirectiveValue {
279    /// Directive with a ident value
280    Ident(String),
281    /// Directive with a literal value
282    Int(i64),
283    /// Directive with a float value
284    Float(f64),
285    /// Directive with a string value
286    String(String),
287    /// Directive with a boolean value
288    Bool(bool),
289    /// Directive with no value
290    None,
291}
292
293impl DirectiveValue {
294    /// Ident value
295    #[must_use]
296    pub fn as_ident(&self) -> Option<&str> {
297        match self {
298            DirectiveValue::Ident(ident) => Some(ident),
299            _ => None,
300        }
301    }
302    /// Int value
303    #[must_use]
304    pub fn as_int(&self) -> Option<i64> {
305        match self {
306            DirectiveValue::Int(int) => Some(*int),
307            _ => None,
308        }
309    }
310    /// Float value
311    #[must_use]
312    pub fn as_float(&self) -> Option<f64> {
313        match self {
314            DirectiveValue::Float(float) => Some(*float),
315            _ => None,
316        }
317    }
318    /// String value
319    #[must_use]
320    pub fn as_string(&self) -> Option<&str> {
321        match self {
322            DirectiveValue::String(string) => Some(string),
323            _ => None,
324        }
325    }
326    /// Bool value
327    #[must_use]
328    pub fn as_bool(&self) -> Option<bool> {
329        match self {
330            DirectiveValue::Bool(bool) => Some(*bool),
331            _ => None,
332        }
333    }
334    /// Tests if value is None
335    #[must_use]
336    pub fn is_none(&self) -> bool {
337        matches!(self, DirectiveValue::None)
338    }
339    /// Tests if value is Some
340    #[must_use]
341    pub fn is_some(&self) -> bool {
342        !self.is_none()
343    }
344}
345
346/// A parameter type, either Optional or Terminal.
347#[cfg_attr(feature = "wasm", tsify::declare)]
348#[derive(Clone, Copy, Debug, serde::Deserialize, serde::Serialize, PartialEq, Eq)]
349pub enum ParamType {
350    /// A type that's defined and present `param p: int`
351    Terminal(TerminalParamType),
352    /// A type that may or may not be present `param p: Option<int>`
353    Optional(TerminalParamType),
354}
355
356impl ParamType {
357    fn is_optional(self) -> bool {
358        matches!(self, ParamType::Optional(_))
359    }
360}
361
362impl std::fmt::Display for ParamType {
363    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
364        match self {
365            ParamType::Terminal(t) => t.fmt(f),
366            ParamType::Optional(t) => write!(f, "Option<{t}>"),
367        }
368    }
369}
370
371/// Terminal Types for params.
372#[cfg_attr(feature = "wasm", tsify::declare)]
373#[derive(Clone, Copy, Debug, serde::Deserialize, serde::Serialize, PartialEq, Eq)]
374pub enum TerminalParamType {
375    /// Duration (e.g. 25s)
376    Duration,
377    /// Dataset
378    Dataset,
379    /// Regex
380    Regex,
381    /// A tag value type
382    Tag(TagType),
383}
384impl std::fmt::Display for TerminalParamType {
385    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
386        match self {
387            TerminalParamType::Dataset => write!(f, "Dataset"),
388            TerminalParamType::Duration => write!(f, "Duration"),
389            TerminalParamType::Regex => write!(f, "Regex"),
390            TerminalParamType::Tag(t) => t.fmt(f),
391        }
392    }
393}
394
395/// Types for params.
396#[cfg_attr(feature = "wasm", tsify::declare)]
397#[cfg_attr(feature = "bincode", derive(bincode::Encode, bincode::Decode))]
398#[derive(Clone, Copy, Debug, Hash, serde::Deserialize, serde::Serialize, PartialEq, Eq)]
399pub enum TagType {
400    /// String
401    String,
402    /// Int
403    Int,
404    /// Float
405    Float,
406    /// Bool
407    Bool,
408    /// Null value
409    Null,
410}
411
412#[cfg(feature = "bincode")]
413#[test]
414fn test_renaming_none_to_null_has_no_bincode_side_effects() {
415    let enc = [4];
416    assert_eq!(
417        (TagType::Null, 1),
418        bincode::decode_from_slice(&enc, bincode::config::standard()).expect("it does ...")
419    );
420}
421
422impl std::fmt::Display for TagType {
423    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
424        write!(
425            f,
426            "{}",
427            match self {
428                TagType::String => "string",
429                TagType::Int => "int",
430                TagType::Float => "float",
431                TagType::Bool => "bool",
432                TagType::Null => "null",
433            }
434        )
435    }
436}
437
438/// Directives given to adjust the behavior of the runtime
439#[cfg_attr(feature = "wasm", tsify::declare)]
440pub type Directives = HashMap<String, DirectiveValue>;
441
442/// A param.
443#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
444#[cfg_attr(feature = "wasm", derive(tsify::Tsify))]
445pub struct ParamDeclaration {
446    /// The location of the param
447    #[cfg_attr(feature = "wasm", tsify(type = "{ offset: number, length: number }"))]
448    pub span: SourceSpan,
449    /// The name of the param
450    pub name: String,
451    /// The type of the param
452    pub typ: ParamType,
453}
454
455impl ParamDeclaration {
456    pub(crate) fn typ(&self) -> TerminalParamType {
457        match self.typ {
458            ParamType::Terminal(terminal_param_type) | ParamType::Optional(terminal_param_type) => {
459                terminal_param_type
460            }
461        }
462    }
463
464    pub(crate) fn is_optional(&self) -> bool {
465        self.typ.is_optional()
466    }
467}
468
469/// A param value.
470#[derive(Debug, Clone, PartialEq)]
471pub enum ParamValue {
472    /// Dataset
473    Dataset(Dataset),
474    /// Duration
475    Duration(RelativeTime),
476    /// String
477    String(String),
478    /// Int
479    Int(i64),
480    /// Float
481    Float(f64),
482    /// Bool
483    Bool(bool),
484    /// Regex
485    Regex(EncodableRegex),
486}
487
488impl ParamValue {
489    /// Get the type of the param value.
490    #[must_use]
491    pub fn typ(&self) -> TerminalParamType {
492        match self {
493            ParamValue::Dataset(_) => TerminalParamType::Dataset,
494            ParamValue::Duration(_) => TerminalParamType::Duration,
495            ParamValue::Regex(_) => TerminalParamType::Regex,
496            ParamValue::String(_) => TerminalParamType::Tag(TagType::String),
497            ParamValue::Int(_) => TerminalParamType::Tag(TagType::Int),
498            ParamValue::Float(_) => TerminalParamType::Tag(TagType::Float),
499            ParamValue::Bool(_) => TerminalParamType::Tag(TagType::Bool),
500        }
501    }
502}
503
504/// The param provided to the query.
505#[derive(Debug, Clone, PartialEq)]
506pub struct ProvidedParam {
507    /// The name of the param.
508    pub name: String,
509    /// The value.
510    pub value: ParamValue,
511}
512
513impl ProvidedParam {
514    /// Create a new `ProvidedParam`.
515    pub fn new(name: impl Into<String>, value: ParamValue) -> Self {
516        Self {
517            name: name.into(),
518            value,
519        }
520    }
521}
522
523/// A smol wrapper around `Vec<ProvidedParam>` for easier use.
524#[derive(Debug, Clone, Default)]
525pub struct ProvidedParams {
526    inner: Vec<ProvidedParam>,
527}
528
529/// The error returned from `ProvidedParams::resolve`.
530#[derive(Debug, thiserror::Error)]
531pub enum ResolveError {
532    /// Param not provided
533    #[error("Param ${0} was not provided to the query")]
534    ParamNotProvided(String),
535    /// Invalid type
536    #[error(
537        "Param ${name} is defined as `{defined}`, but was used in a context that expected one of: {}",
538        expected.iter().map(ToString::to_string).collect::<Vec<_>>().join(", ")
539    )]
540    InvalidType {
541        /// Name of the param
542        name: String,
543        /// Type of the param
544        defined: TerminalParamType,
545        /// The type that is valid in the context it was used
546        expected: Vec<TerminalParamType>,
547    },
548    /// Shared string error
549    #[error("Shared string error: {0}")]
550    SharedString(#[from] strumbra::Error),
551}
552
553/// The error returned from `ProvidedParams::parse`.
554#[derive(Debug, thiserror::Error)]
555pub enum ParseProvidedParamsError {
556    /// Parse failed
557    #[error("Failed to parse the value for ${param_name} as {expected_type}: {err}")]
558    ParseParam {
559        /// Param name
560        param_name: String,
561        /// Expected t ype
562        expected_type: ParamType,
563        /// Parse param error
564        err: ParseParamError,
565    },
566    /// Params provided more than once
567    #[error("These params were provided more than once: {}", .0.join(", "))]
568    ParamsProvidedMoreThanOnce(Vec<String>),
569    /// Params declared but not provided
570    #[error("The following params were declared but not provided: {}", .0.join(", "))]
571    ParamsDeclaredButNotProvided(Vec<String>),
572    /// Too many params provided
573    #[error("The number of params provided exceeds the upper limit of {0}")]
574    TooManyParamsProvided(usize),
575}
576/// List of warning reasons
577#[derive(Debug)]
578pub enum WarningReason {
579    /// Provided but not declared  param
580    ParamNotDeclared(Vec<String>),
581    /// System parameter declared
582    ParamUsingSystemPrefix {
583        /// The param
584        param: String,
585    },
586    /// lowercase duration
587    OldDuration,
588}
589
590impl Display for WarningReason {
591    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
592        match self {
593            WarningReason::ParamNotDeclared(items) => write!(
594                f,
595                "These params were provided but not declared: {}",
596                items.join(", ")
597            ),
598            WarningReason::OldDuration => {
599                write!(f, "`duration` is depricated, please ues `Duration`")
600            }
601            WarningReason::ParamUsingSystemPrefix { param } => {
602                write!(
603                    f,
604                    "The param ${param} uses the `__` prefix reserved for system params"
605                )
606            }
607        }
608    }
609}
610
611/// Warning we want to surface to the user instead of failing the request.
612#[derive(Debug)]
613pub struct Warning {
614    source: Option<SourceSpan>,
615    warning: WarningReason,
616}
617
618impl Warning {
619    /// The warning message
620    #[must_use]
621    pub fn warning(&self) -> &WarningReason {
622        &self.warning
623    }
624    /// The location of the warning (if any)
625    #[must_use]
626    pub fn source(&self) -> Option<SourceSpan> {
627        self.source
628    }
629}
630
631/// Warnings we want to surface to the user instead of failing the request.
632#[derive(Debug, Default)]
633pub struct Warnings {
634    inner: Vec<Warning>,
635}
636
637impl Warnings {
638    /// Create a new warnings structure.
639    #[must_use]
640    pub fn new() -> Self {
641        Self::default()
642    }
643
644    /// Add a new warning.
645    pub fn push(&mut self, warning: WarningReason) {
646        self.inner.push(Warning {
647            source: None,
648            warning,
649        });
650    }
651    /// Add a new warning.
652    pub fn push_span(&mut self, span: SourceSpan, warning: WarningReason) {
653        self.inner.push(Warning {
654            source: Some(span),
655            warning,
656        });
657    }
658
659    /// Returns true if there are no warnings.
660    #[must_use]
661    pub fn is_empty(&self) -> bool {
662        self.inner.is_empty()
663    }
664
665    /// Get the warnings as slice.
666    #[must_use]
667    pub fn as_slice(&self) -> &[Warning] {
668        &self.inner
669    }
670
671    /// Turn into a vector.
672    #[must_use]
673    pub fn into_vec(self) -> Vec<Warning> {
674        self.inner
675    }
676}
677
678impl ProvidedParams {
679    /// Create a new `ProvidedParams` struct.
680    #[must_use]
681    pub fn new(inner: Vec<ProvidedParam>) -> Self {
682        Self { inner }
683    }
684
685    /// Parse params from a hashmap of query parameters.
686    /// This will only look at params that start with `param__` and it'll use
687    /// the parser definitions to extract the values.
688    pub fn parse_and_validate(
689        mpl_params: &Params,
690        query_params: &[(String, String)],
691    ) -> Result<(Self, Warnings), ParseProvidedParamsError> {
692        const PREFIX: &str = "param__";
693        const PARAM_COUNT_LIMIT: usize = 128;
694
695        let mut warnings = Warnings::new();
696        let mut defined_more_than_once = HashSet::new();
697        let mut provided_but_not_declared = HashSet::new();
698        let mut seen = HashSet::new();
699
700        let params = query_params
701            .iter()
702            .filter_map(|(name, value)| {
703                if !name.starts_with(PREFIX) {
704                    return None;
705                }
706                let name = name.trim_start_matches(PREFIX);
707                if name.is_empty() {
708                    return None;
709                }
710
711                Some((name, value))
712            })
713            .take(PARAM_COUNT_LIMIT + 1)
714            .collect::<Vec<(&str, &String)>>();
715
716        // we don't support unlimited params
717        if params.len() > PARAM_COUNT_LIMIT {
718            return Err(ParseProvidedParamsError::TooManyParamsProvided(
719                PARAM_COUNT_LIMIT,
720            ));
721        }
722
723        let mut provided_params = Vec::new();
724        for (name, value) in params {
725            if seen.contains(name) {
726                // uh oh, we've already seen this value
727                defined_more_than_once.insert(name);
728                continue;
729            }
730            seen.insert(name);
731
732            // is the param even declared?
733            let Some(mpl_param) = mpl_params.iter().find(|p| p.name == name) else {
734                provided_but_not_declared.insert(name);
735                continue;
736            };
737
738            // parse mpl
739            let parsed = MPLParser::parse(Rule::param_value, value).map_err(|err| {
740                ParseProvidedParamsError::ParseParam {
741                    param_name: name.to_string(),
742                    expected_type: mpl_param.typ,
743                    err: ParseParamError::Parse(ParseError::from(err)),
744                }
745            })?;
746
747            // parse as correct type
748            let value = parser::parse_param_value(mpl_param, parsed).map_err(|err| {
749                ParseProvidedParamsError::ParseParam {
750                    param_name: name.to_string(),
751                    expected_type: mpl_param.typ,
752                    err,
753                }
754            })?;
755
756            provided_params.push(ProvidedParam {
757                name: name.to_string(),
758                value,
759            });
760        }
761
762        if !provided_but_not_declared.is_empty() {
763            // sort for consistency
764            let mut items = provided_but_not_declared
765                .into_iter()
766                .map(|p| format!("${p}"))
767                .collect::<Vec<String>>();
768            items.sort();
769
770            // add to warnings, no need to error
771            warnings.push(WarningReason::ParamNotDeclared(items));
772        }
773
774        if !defined_more_than_once.is_empty() {
775            // sort for consistency
776            let mut items = defined_more_than_once
777                .into_iter()
778                .map(String::from)
779                .collect::<Vec<String>>();
780            items.sort();
781
782            return Err(ParseProvidedParamsError::ParamsProvidedMoreThanOnce(items));
783        }
784
785        let declared_param_names = mpl_params
786            .iter()
787            .filter_map(|p| {
788                // Skip optional params since they don't need to be provided.
789                if p.typ.is_optional() {
790                    None
791                } else {
792                    Some(p.name.as_str())
793                }
794            })
795            .collect::<HashSet<&str>>();
796        let declared_but_not_provided = declared_param_names
797            .difference(&seen)
798            .collect::<Vec<&&str>>();
799        if !declared_but_not_provided.is_empty() {
800            // sort for consistency
801            let mut items = declared_but_not_provided
802                .into_iter()
803                .map(|s| String::from(*s))
804                .collect::<Vec<String>>();
805            items.sort();
806
807            return Err(ParseProvidedParamsError::ParamsDeclaredButNotProvided(
808                items,
809            ));
810        }
811
812        Ok((ProvidedParams::new(provided_params), warnings))
813    }
814
815    /// Return a ref to the inner value.
816    #[must_use]
817    pub fn as_slice(&self) -> &[ProvidedParam] {
818        self.inner.as_slice()
819    }
820
821    fn get_param(&self, name: &str) -> Result<&ProvidedParam, ResolveError> {
822        self.inner
823            .iter()
824            .find(|p| p.name == name)
825            .ok_or(ResolveError::ParamNotProvided(name.to_string()))
826    }
827
828    /// Resolve a `TagValue`.
829    pub fn resolve_tag_value(&self, pv: Parameterized<TagValue>) -> Result<TagValue, ResolveError> {
830        let param = match pv {
831            Parameterized::Concrete(val) => return Ok(val), // no need to resolve
832            Parameterized::Param { span: _, param } => param,
833        };
834
835        let provided_param = self.get_param(&param.name)?;
836        match &provided_param.value {
837            ParamValue::String(val) => Ok(TagValue::String(SharedString::try_from(val)?)),
838            ParamValue::Int(val) => Ok(TagValue::Int(*val)),
839            ParamValue::Float(val) => Ok(TagValue::Float(*val)),
840            ParamValue::Bool(val) => Ok(TagValue::Bool(*val)),
841            val => Err(ResolveError::InvalidType {
842                name: param.name,
843                defined: val.typ(),
844                expected: vec![
845                    TerminalParamType::Tag(TagType::String),
846                    TerminalParamType::Tag(TagType::Int),
847                    TerminalParamType::Tag(TagType::Float),
848                    TerminalParamType::Tag(TagType::Bool),
849                ],
850            }),
851        }
852    }
853
854    /// Resolve a `Dataset`.
855    pub fn resolve_dataset(&self, pv: Parameterized<Dataset>) -> Result<Dataset, ResolveError> {
856        let param = match pv {
857            Parameterized::Concrete(val) => return Ok(val), // no need to resolve
858            Parameterized::Param { span: _, param } => param,
859        };
860
861        let provided_param = self.get_param(&param.name)?;
862        match &provided_param.value {
863            ParamValue::Dataset(dataset) => Ok(dataset.clone()),
864            val => Err(ResolveError::InvalidType {
865                name: param.name,
866                defined: val.typ(),
867                expected: vec![TerminalParamType::Dataset],
868            }),
869        }
870    }
871
872    /// Resolve a `RelativeTime`, aka duration.
873    pub fn resolve_relative_time(
874        &self,
875        pv: Parameterized<RelativeTime>,
876    ) -> Result<RelativeTime, ResolveError> {
877        let param = match pv {
878            Parameterized::Concrete(val) => return Ok(val), // no need to resolve
879            Parameterized::Param { span: _, param } => param,
880        };
881
882        let provided_param = self.get_param(&param.name)?;
883        match &provided_param.value {
884            ParamValue::Duration(relative_time) => Ok(relative_time.clone()),
885            val => Err(ResolveError::InvalidType {
886                name: param.name,
887                defined: val.typ(),
888                expected: vec![TerminalParamType::Duration],
889            }),
890        }
891    }
892
893    /// Resolve a regex.
894    pub fn resolve_regex(
895        &self,
896        pv: Parameterized<EncodableRegex>,
897    ) -> Result<EncodableRegex, ResolveError> {
898        let param = match pv {
899            Parameterized::Concrete(val) => return Ok(val), // no need to resolve
900            Parameterized::Param { span: _, param } => param,
901        };
902
903        let provided_param = self.get_param(&param.name)?;
904        match &provided_param.value {
905            ParamValue::Regex(re) => Ok(re.clone()),
906            val => Err(ResolveError::InvalidType {
907                name: param.name,
908                defined: val.typ(),
909                expected: vec![TerminalParamType::Regex],
910            }),
911        }
912    }
913    /// Checks if a param was provided
914    #[must_use]
915    pub fn contains(&self, param: &str) -> bool {
916        self.get_param(param).is_ok()
917    }
918
919    /// Returns the filter when it should be applied for these params.
920    ///
921    /// Plain filters are always active. `ifdef` filters are active only when
922    /// their guarding optional param was provided by the caller.
923    #[must_use]
924    pub fn active_filter<'a>(&self, filter: &'a FilterOrIfDef) -> Option<&'a Filter> {
925        match filter {
926            FilterOrIfDef::Filter(filter) => Some(filter),
927            FilterOrIfDef::Ifdef { param, filter } if self.contains(&param.name) => Some(filter),
928            FilterOrIfDef::Ifdef { .. } => None,
929        }
930    }
931
932    /// Returns filters that should be applied for these params, preserving order.
933    #[must_use]
934    pub fn active_filters<'a>(&self, filters: &'a [FilterOrIfDef]) -> Vec<&'a Filter> {
935        filters
936            .iter()
937            .filter_map(|filter| self.active_filter(filter))
938            .collect()
939    }
940}
941
942/// Parameters that will be set externally.
943#[cfg_attr(feature = "wasm", tsify::declare)]
944pub type Params = Vec<ParamDeclaration>;
945
946/// A Query AST representing a query in the `MPL` language
947#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
948#[cfg_attr(feature = "wasm", derive(tsify::Tsify))]
949#[cfg_attr(feature = "wasm", tsify(into_wasm_abi, from_wasm_abi))]
950pub enum Query {
951    /// A simple query that will produce a result
952    Simple {
953        /// The source of the data
954        source: Source,
955        /// The filters to apply to the data
956        filters: Vec<FilterOrIfDef>,
957        /// The aggregates to apply to the data
958        aggregates: Vec<Aggregate>,
959        /// The directives
960        directives: Directives,
961        /// The params
962        params: Params,
963        /// How to sample series
964        sample: Option<f64>,
965    },
966    /// A compute query taking the input of two queries and producing a by computing combined values
967    Compute {
968        /// The left hand side query to compute
969        left: Box<Query>,
970        /// The right hand side query to compute
971        right: Box<Query>,
972        /// The name of the metric to produce
973        name: Metric,
974        /// The compute operation used to combine the left and right queries
975        op: ComputeFunction,
976        /// The aggregates to apply to the combined data
977        aggregates: Vec<Aggregate>,
978        /// The directives
979        directives: Directives,
980        /// The params
981        params: Params,
982    },
983}
984
985impl Query {
986    /// Gets the time range for the query
987    #[must_use]
988    pub fn time_range(&self) -> Option<&TimeRange> {
989        match self {
990            Query::Simple { source, .. } => source.time(),
991            Query::Compute { left, .. } => left.time_range(),
992        }
993    }
994    /// Get a ref to the params of the query.
995    #[must_use]
996    pub fn params(&self) -> &Params {
997        match self {
998            Query::Simple { params, .. } | Query::Compute { params, .. } => params,
999        }
1000    }
1001    /// Get a ref to the directives of the query.
1002    #[must_use]
1003    pub fn directives(&self) -> &Directives {
1004        match self {
1005            Query::Simple { directives, .. } | Query::Compute { directives, .. } => directives,
1006        }
1007    }
1008}
1009
1010impl RelativeTime {
1011    /// Converts a relative time to a `Duration`
1012    pub fn to_duration(&self) -> Result<Duration, TimeError> {
1013        let v = i64::try_from(self.value).map_err(TimeError::InvalidDuration)?;
1014        Ok(match self.unit {
1015            TimeUnit::Millisecond => Duration::milliseconds(v),
1016            TimeUnit::Second => Duration::seconds(v),
1017            TimeUnit::Minute => Duration::minutes(v),
1018            TimeUnit::Hour => Duration::hours(v),
1019            TimeUnit::Day => Duration::days(v),
1020            TimeUnit::Week => Duration::weeks(v),
1021            TimeUnit::Month => Duration::days(v.saturating_mul(30)),
1022            TimeUnit::Year => Duration::days(v.saturating_mul(365)),
1023        })
1024    }
1025
1026    /// Converts a relative time to a `Resolution`
1027    pub fn to_resolution(&self) -> Result<Resolution, ResolutionError> {
1028        match self.unit {
1029            TimeUnit::Millisecond => Resolution::secs(self.value / 1000),
1030            TimeUnit::Second => Resolution::secs(self.value),
1031            TimeUnit::Minute => Resolution::secs(self.value.saturating_mul(60)),
1032            TimeUnit::Hour => Resolution::secs(self.value.saturating_mul(60 * 60)),
1033            TimeUnit::Day => Resolution::secs(self.value.saturating_mul(60 * 60 * 24)),
1034            TimeUnit::Week => Resolution::secs(self.value.saturating_mul(60 * 60 * 24 * 7)),
1035            TimeUnit::Month => Resolution::secs(self.value.saturating_mul(60 * 60 * 24 * 30)),
1036            TimeUnit::Year => Resolution::secs(self.value.saturating_mul(60 * 60 * 24 * 365)),
1037        }
1038    }
1039}
1040
1041/// An error that can occur when converting a time value.
1042#[derive(Debug, thiserror::Error)]
1043pub enum TimeError {
1044    /// Invalid timestamp could not be converted to a UTC datetime
1045    #[error("Invalid timestamp {0}, could not be converted to a UTC datetime")]
1046    InvalidTimestamp(i64),
1047    /// Invalid duration could not be converted to Duration as it exceeds the maximum i64
1048    #[error(
1049        "Invalid duration {0}, could not be converted to Duration as it exceeds the maximum i64"
1050    )]
1051    InvalidDuration(TryFromIntError),
1052}
1053#[cfg(feature = "clock")]
1054impl Time {
1055    fn to_datetime(&self) -> Result<DateTime<Utc>, TimeError> {
1056        Ok(match self {
1057            Time::Relative(t) => Utc::now() - t.to_duration()?,
1058            Time::Timestamp(ts) => {
1059                DateTime::<Utc>::from_timestamp(*ts, 0).ok_or(TimeError::InvalidTimestamp(*ts))?
1060            }
1061            Time::RFC3339(t) => t.with_timezone(&Utc),
1062            Time::Modifier(_) => todo!(),
1063        })
1064    }
1065}
1066
1067#[cfg(feature = "clock")]
1068impl TimeRange {
1069    /// Converts a time range to a start and pair
1070    pub fn to_start_end(&self) -> Result<(DateTime<Utc>, DateTime<Utc>), TimeError> {
1071        let start = self.start.to_datetime()?;
1072        let end = self
1073            .end
1074            .as_ref()
1075            .map_or_else(|| Ok(Utc::now()), Time::to_datetime)?;
1076        Ok((start, end))
1077    }
1078}