auxon_sdk/reflector_config/
mod.rs

1//! A format definition and parser for the `modality-reflector`, to be used
2//! by custom reflector plugins that can be hosted within the reflector itself.
3
4pub mod resolve;
5
6pub use refined::*;
7use std::collections::BTreeMap;
8use std::path::{Path, PathBuf};
9use thiserror::Error;
10pub use toml::Value as TomlValue;
11
12pub const CONFIG_ENV_VAR: &str = "MODALITY_REFLECTOR_CONFIG";
13
14pub const MODALITY_STORAGE_SERVICE_PORT_DEFAULT: u16 = 14182;
15pub const MODALITY_STORAGE_SERVICE_TLS_PORT_DEFAULT: u16 = 14183;
16
17pub const MODALITY_REFLECTOR_INGEST_CONNECT_PORT_DEFAULT: u16 = 14188;
18pub const MODALITY_REFLECTOR_INGEST_CONNECT_TLS_PORT_DEFAULT: u16 = 14189;
19
20pub const MODALITY_MUTATION_CONNECT_PORT_DEFAULT: u16 = 14192;
21pub const MODALITY_MUTATION_CONNECT_TLS_PORT_DEFAULT: u16 = 14194;
22
23pub const MODALITY_REFLECTOR_MUTATION_CONNECT_PORT_DEFAULT: u16 = 14198;
24pub const MODALITY_REFLECTOR_MUTATION_CONNECT_TLS_PORT_DEFAULT: u16 = 14199;
25
26/// Private, internal, raw representation of the TOML content
27pub(crate) mod raw_toml {
28    use super::*;
29    use std::path::PathBuf;
30
31    #[derive(Debug, Clone, Default, PartialEq, serde::Serialize, serde::Deserialize)]
32    #[serde(rename_all = "kebab-case", default)]
33    pub(crate) struct Config {
34        #[serde(skip_serializing_if = "Option::is_none")]
35        pub(crate) ingest: Option<TopLevelIngest>,
36
37        #[serde(skip_serializing_if = "Option::is_none")]
38        pub(crate) mutation: Option<TopLevelMutation>,
39
40        #[serde(skip_serializing_if = "Option::is_none")]
41        pub(crate) plugins: Option<TopLevelPlugins>,
42
43        #[serde(skip_serializing_if = "BTreeMap::is_empty")]
44        pub(crate) metadata: BTreeMap<String, TomlValue>,
45    }
46
47    #[derive(Debug, Clone, Default, PartialEq, serde::Serialize, serde::Deserialize)]
48    #[serde(rename_all = "kebab-case", default)]
49    pub(crate) struct TopLevelIngest {
50        #[serde(skip_serializing_if = "Option::is_none")]
51        pub(crate) protocol_parent_url: Option<String>,
52
53        #[serde(skip_serializing_if = "std::ops::Not::not")]
54        pub(crate) allow_insecure_tls: bool,
55
56        #[serde(skip_serializing_if = "Option::is_none")]
57        pub(crate) max_write_batch_staleness_millis: Option<u64>,
58
59        #[serde(skip_serializing_if = "Option::is_none")]
60        pub(crate) protocol_child_port: Option<u16>,
61
62        #[serde(flatten)]
63        pub(crate) timeline_attributes: TimelineAttributes,
64
65        #[serde(skip_serializing_if = "Vec::is_empty", alias = "rollover-tracker")]
66        pub(crate) rollover_trackers: Vec<IngestRolloverTracker>,
67    }
68
69    #[derive(Debug, Clone, Default, PartialEq, serde::Serialize, serde::Deserialize)]
70    #[serde(rename_all = "kebab-case", default)]
71    pub(crate) struct TopLevelMutation {
72        #[serde(skip_serializing_if = "Option::is_none")]
73        pub(crate) protocol_parent_url: Option<String>,
74
75        #[serde(skip_serializing_if = "std::ops::Not::not")]
76        pub(crate) allow_insecure_tls: bool,
77
78        #[serde(skip_serializing_if = "Option::is_none")]
79        pub(crate) protocol_child_port: Option<u16>,
80
81        #[serde(skip_serializing_if = "Option::is_none")]
82        pub(crate) mutator_http_api_port: Option<u16>,
83
84        #[serde(flatten)]
85        pub(crate) mutator_attributes: MutatorAttributes,
86
87        #[serde(skip_serializing_if = "Vec::is_empty")]
88        pub(crate) external_mutator_urls: Vec<String>,
89    }
90
91    #[derive(Debug, Clone, Default, PartialEq, serde::Serialize, serde::Deserialize)]
92    #[serde(rename_all = "kebab-case", default)]
93    pub(crate) struct TopLevelPlugins {
94        #[serde(skip_serializing_if = "Option::is_none")]
95        pub(crate) available_ports: Option<AvailablePorts>,
96
97        #[serde(skip_serializing_if = "Option::is_none")]
98        pub(crate) plugins_dir: Option<PathBuf>,
99
100        #[serde(skip_serializing_if = "Option::is_none")]
101        pub(crate) ingest: Option<PluginsIngest>,
102
103        #[serde(skip_serializing_if = "Option::is_none")]
104        pub(crate) mutation: Option<PluginsMutation>,
105    }
106
107    #[derive(Debug, Clone, Default, PartialEq, serde::Serialize, serde::Deserialize)]
108    #[serde(rename_all = "kebab-case", default)]
109    pub(crate) struct AvailablePorts {
110        #[serde(skip_serializing_if = "Option::is_none")]
111        pub(crate) any_local: Option<bool>,
112
113        #[serde(skip_serializing_if = "Vec::is_empty")]
114        pub(crate) ranges: Vec<[u16; 2]>,
115    }
116
117    #[derive(Debug, Clone, Default, PartialEq, serde::Serialize, serde::Deserialize)]
118    #[serde(rename_all = "kebab-case", default)]
119    pub(crate) struct TimelineAttributes {
120        #[serde(skip_serializing_if = "Vec::is_empty")]
121        pub(crate) additional_timeline_attributes: Vec<String>,
122
123        #[serde(skip_serializing_if = "Vec::is_empty")]
124        pub(crate) override_timeline_attributes: Vec<String>,
125    }
126
127    #[derive(Debug, Clone, Default, PartialEq, serde::Serialize, serde::Deserialize)]
128    #[serde(rename_all = "kebab-case", default)]
129    pub(crate) struct MutatorAttributes {
130        #[serde(skip_serializing_if = "Vec::is_empty")]
131        pub(crate) additional_mutator_attributes: Vec<String>,
132
133        #[serde(skip_serializing_if = "Vec::is_empty")]
134        pub(crate) override_mutator_attributes: Vec<String>,
135    }
136
137    #[derive(Debug, Clone, Default, PartialEq, serde::Serialize, serde::Deserialize)]
138    #[serde(rename_all = "kebab-case", default)]
139    pub(crate) struct PluginsIngest {
140        #[serde(skip_serializing_if = "BTreeMap::is_empty")]
141        pub(crate) collectors: BTreeMap<String, PluginsIngestMember>,
142
143        #[serde(skip_serializing_if = "BTreeMap::is_empty")]
144        pub(crate) importers: BTreeMap<String, PluginsIngestMember>,
145    }
146
147    #[derive(Debug, Clone, Default, PartialEq, serde::Serialize, serde::Deserialize)]
148    #[serde(rename_all = "kebab-case", default)]
149    pub(crate) struct PluginsIngestMember {
150        pub(crate) plugin: Option<String>,
151
152        #[serde(flatten)]
153        pub(crate) timeline_attributes: TimelineAttributes,
154
155        #[serde(flatten)]
156        pub(crate) shutdown: PluginShutdown,
157
158        #[serde(skip_serializing_if = "Option::is_none")]
159        pub(crate) restart: Option<bool>,
160
161        #[serde(skip_serializing_if = "BTreeMap::is_empty")]
162        pub(crate) metadata: BTreeMap<String, TomlValue>,
163    }
164
165    #[derive(Debug, Clone, Default, PartialEq, serde::Serialize, serde::Deserialize)]
166    #[serde(rename_all = "kebab-case", default)]
167    pub(crate) struct PluginsMutation {
168        #[serde(skip_serializing_if = "BTreeMap::is_empty")]
169        pub(crate) mutators: BTreeMap<String, PluginsMutationMember>,
170    }
171
172    #[derive(Debug, Clone, Default, PartialEq, serde::Serialize, serde::Deserialize)]
173    #[serde(rename_all = "kebab-case", default)]
174    pub(crate) struct PluginsMutationMember {
175        pub(crate) plugin: Option<String>,
176
177        #[serde(flatten)]
178        pub(crate) mutator_attributes: MutatorAttributes,
179
180        #[serde(flatten)]
181        pub(crate) shutdown: PluginShutdown,
182
183        #[serde(skip_serializing_if = "Option::is_none")]
184        pub(crate) restart: Option<bool>,
185
186        #[serde(skip_serializing_if = "BTreeMap::is_empty")]
187        pub(crate) metadata: BTreeMap<String, TomlValue>,
188    }
189
190    #[derive(Debug, Clone, Default, PartialEq, serde::Serialize, serde::Deserialize)]
191    #[serde(rename_all = "kebab-case", default)]
192    pub(crate) struct PluginShutdown {
193        pub(crate) shutdown_signal: Option<String>,
194        pub(crate) shutdown_timeout_millis: Option<u64>,
195    }
196
197    #[derive(Debug, Clone, Default, PartialEq, serde::Serialize, serde::Deserialize)]
198    #[serde(rename_all = "kebab-case", default)]
199    pub(crate) struct IngestRolloverTracker {
200        pub(crate) timeout_millis: Option<u64>,
201        #[serde(skip_serializing_if = "Option::is_none")]
202        pub(crate) sender: Option<RolloverTrackerParticipant>,
203        #[serde(skip_serializing_if = "Vec::is_empty", alias = "receiver")]
204        pub(crate) receivers: Vec<RolloverTrackerParticipant>,
205    }
206
207    #[derive(Debug, Clone, Default, PartialEq, serde::Serialize, serde::Deserialize)]
208    #[serde(rename_all = "kebab-case", default)]
209    pub(crate) struct RolloverTrackerParticipant {
210        #[serde(skip_serializing_if = "Vec::is_empty")]
211        pub(crate) timeline_attributes: Vec<String>,
212        #[serde(skip_serializing_if = "Option::is_none")]
213        pub(crate) event_name: Option<String>,
214        #[serde(skip_serializing_if = "Option::is_none")]
215        pub(crate) event_attribute_key: Option<String>,
216    }
217
218    #[cfg(test)]
219    pub(crate) fn try_raw_to_string_pretty(config: &Config) -> Result<String, toml::ser::Error> {
220        // Slightly unexpected detour through toml::Value to work around some
221        // of the toml crate's touchy handling of the order of serialization of
222        // fields.
223        let toml_value = toml::Value::try_from(config)?;
224        let content = toml::to_string_pretty(&toml_value)?;
225        Ok(content)
226    }
227
228    impl PluginMemberExt for PluginsIngestMember {
229        fn plugin(&self) -> Option<&str> {
230            self.plugin.as_deref()
231        }
232    }
233
234    impl PluginMemberExt for PluginsMutationMember {
235        fn plugin(&self) -> Option<&str> {
236            self.plugin.as_deref()
237        }
238    }
239
240    #[cfg(feature = "modality")]
241    impl PluginsIngest {
242        pub(crate) fn find_collector_member_by_plugin_name<S: AsRef<str>>(
243            &self,
244            plugin_name: S,
245        ) -> Option<&PluginsIngestMember> {
246            find_member_by_plugin_name(&self.collectors, plugin_name)
247        }
248
249        pub(crate) fn find_importer_member_by_plugin_name<S: AsRef<str>>(
250            &self,
251            plugin_name: S,
252        ) -> Option<&PluginsIngestMember> {
253            find_member_by_plugin_name(&self.importers, plugin_name)
254        }
255    }
256
257    #[cfg(feature = "modality")]
258    impl PluginsMutation {
259        pub(crate) fn find_mutator_member_by_plugin_name<S: AsRef<str>>(
260            &self,
261            plugin_name: S,
262        ) -> Option<&PluginsMutationMember> {
263            find_member_by_plugin_name(&self.mutators, plugin_name)
264        }
265    }
266
267    impl From<refined::Config> for Config {
268        fn from(value: refined::Config) -> Self {
269            Self {
270                ingest: value.ingest.map(Into::into),
271                mutation: value.mutation.map(Into::into),
272                plugins: value.plugins.map(Into::into),
273                metadata: value.metadata,
274            }
275        }
276    }
277
278    impl From<refined::TopLevelIngest> for TopLevelIngest {
279        fn from(value: refined::TopLevelIngest) -> Self {
280            Self {
281                protocol_parent_url: value.protocol_parent_url.map(Into::into),
282                allow_insecure_tls: value.allow_insecure_tls,
283                max_write_batch_staleness_millis: value.max_write_batch_staleness.map(|v| {
284                    let millis = v.as_millis();
285                    if millis >= u64::MAX as u128 {
286                        u64::MAX
287                    } else {
288                        millis as u64
289                    }
290                }),
291                protocol_child_port: value.protocol_child_port.map(Into::into),
292                timeline_attributes: value.timeline_attributes.into(),
293                rollover_trackers: value
294                    .rollover_trackers
295                    .into_iter()
296                    .map(Into::into)
297                    .collect(),
298            }
299        }
300    }
301    impl From<refined::TopLevelMutation> for TopLevelMutation {
302        fn from(value: refined::TopLevelMutation) -> Self {
303            Self {
304                protocol_parent_url: value.protocol_parent_url.map(Into::into),
305                allow_insecure_tls: value.allow_insecure_tls,
306                protocol_child_port: value.protocol_child_port.map(Into::into),
307                mutator_http_api_port: value.mutator_http_api_port.map(Into::into),
308                mutator_attributes: value.mutator_attributes.into(),
309                external_mutator_urls: value
310                    .external_mutator_urls
311                    .into_iter()
312                    .map(Into::into)
313                    .collect(),
314            }
315        }
316    }
317    impl From<refined::TopLevelPlugins> for TopLevelPlugins {
318        fn from(value: refined::TopLevelPlugins) -> Self {
319            Self {
320                available_ports: value.available_ports.map(Into::into),
321                plugins_dir: value.plugins_dir,
322                ingest: value.ingest.map(Into::into),
323                mutation: value.mutation.map(Into::into),
324            }
325        }
326    }
327    impl From<refined::TimelineAttributes> for TimelineAttributes {
328        fn from(value: refined::TimelineAttributes) -> Self {
329            Self {
330                additional_timeline_attributes: value
331                    .additional_timeline_attributes
332                    .into_iter()
333                    .map(Into::into)
334                    .collect(),
335                override_timeline_attributes: value
336                    .override_timeline_attributes
337                    .into_iter()
338                    .map(Into::into)
339                    .collect(),
340            }
341        }
342    }
343    impl From<refined::MutatorAttributes> for MutatorAttributes {
344        fn from(value: refined::MutatorAttributes) -> Self {
345            Self {
346                additional_mutator_attributes: value
347                    .additional_mutator_attributes
348                    .into_iter()
349                    .map(Into::into)
350                    .collect(),
351                override_mutator_attributes: value
352                    .override_mutator_attributes
353                    .into_iter()
354                    .map(Into::into)
355                    .collect(),
356            }
357        }
358    }
359    impl From<refined::PluginsIngest> for PluginsIngest {
360        fn from(value: refined::PluginsIngest) -> Self {
361            Self {
362                collectors: value
363                    .collectors
364                    .into_iter()
365                    .map(|(k, v)| (k, v.into()))
366                    .collect(),
367                importers: value
368                    .importers
369                    .into_iter()
370                    .map(|(k, v)| (k, v.into()))
371                    .collect(),
372            }
373        }
374    }
375    impl From<refined::PluginsMutation> for PluginsMutation {
376        fn from(value: refined::PluginsMutation) -> Self {
377            Self {
378                mutators: value
379                    .mutators
380                    .into_iter()
381                    .map(|(k, v)| (k, v.into()))
382                    .collect(),
383            }
384        }
385    }
386    impl From<refined::PluginsIngestMember> for PluginsIngestMember {
387        fn from(value: refined::PluginsIngestMember) -> Self {
388            Self {
389                plugin: value.plugin,
390                timeline_attributes: value.timeline_attributes.into(),
391                shutdown: value.shutdown.into(),
392                restart: value.restart,
393                metadata: value.metadata,
394            }
395        }
396    }
397    impl From<refined::PluginsMutationMember> for PluginsMutationMember {
398        fn from(value: refined::PluginsMutationMember) -> Self {
399            Self {
400                plugin: value.plugin,
401                mutator_attributes: value.mutator_attributes.into(),
402                shutdown: value.shutdown.into(),
403                restart: value.restart,
404                metadata: value.metadata,
405            }
406        }
407    }
408
409    impl From<refined::PluginShutdown> for PluginShutdown {
410        fn from(value: refined::PluginShutdown) -> Self {
411            Self {
412                shutdown_signal: value.shutdown_signal,
413                shutdown_timeout_millis: value.shutdown_timeout.map(|v| {
414                    let millis = v.as_millis();
415                    if millis >= u64::MAX as u128 {
416                        u64::MAX
417                    } else {
418                        millis as u64
419                    }
420                }),
421            }
422        }
423    }
424
425    impl From<refined::AvailablePorts> for AvailablePorts {
426        fn from(value: refined::AvailablePorts) -> Self {
427            Self {
428                any_local: value.any_local,
429                ranges: value
430                    .ranges
431                    .into_iter()
432                    .map(|inclusive_range| [inclusive_range.start(), inclusive_range.end()])
433                    .collect(),
434            }
435        }
436    }
437
438    impl From<refined::IngestRolloverTracker> for IngestRolloverTracker {
439        fn from(value: refined::IngestRolloverTracker) -> Self {
440            Self {
441                timeout_millis: value.timeout.map(|v| {
442                    let millis = v.as_millis();
443                    if millis >= u64::MAX as u128 {
444                        u64::MAX
445                    } else {
446                        millis as u64
447                    }
448                }),
449                sender: value.sender.map(Into::into),
450                receivers: value.receivers.into_iter().map(Into::into).collect(),
451            }
452        }
453    }
454
455    impl From<refined::RolloverTrackerParticipant> for RolloverTrackerParticipant {
456        fn from(value: refined::RolloverTrackerParticipant) -> Self {
457            Self {
458                timeline_attributes: value
459                    .timeline_attributes
460                    .into_iter()
461                    .map(Into::into)
462                    .collect(),
463                event_name: value.event_name,
464                event_attribute_key: value.event_attribute_key,
465            }
466        }
467    }
468}
469
470/// Public-facing, more-semantically-enriched configuration types
471mod refined {
472    use super::TomlValue;
473    use crate::api::types::{AttrKey, AttrVal};
474    use lazy_static::lazy_static;
475    use regex::{Captures, Regex};
476    use std::collections::BTreeMap;
477    use std::env;
478    use std::fmt;
479    use std::path::PathBuf;
480    use std::str::FromStr;
481    use std::time::Duration;
482    use url::Url;
483
484    #[derive(Debug, Clone, Default, PartialEq)]
485    pub struct Config {
486        pub ingest: Option<TopLevelIngest>,
487        pub mutation: Option<TopLevelMutation>,
488        pub plugins: Option<TopLevelPlugins>,
489        pub metadata: BTreeMap<String, TomlValue>,
490    }
491
492    #[derive(Debug, Clone, Default, PartialEq, Eq)]
493    pub struct TopLevelIngest {
494        pub protocol_parent_url: Option<Url>,
495        pub allow_insecure_tls: bool,
496        pub protocol_child_port: Option<u16>,
497        pub timeline_attributes: TimelineAttributes,
498        pub max_write_batch_staleness: Option<Duration>,
499        pub rollover_trackers: Vec<IngestRolloverTracker>,
500    }
501
502    #[derive(Debug, Clone, Default, PartialEq, Eq)]
503    pub struct TopLevelMutation {
504        pub protocol_parent_url: Option<Url>,
505        pub allow_insecure_tls: bool,
506        pub protocol_child_port: Option<u16>,
507        pub mutator_http_api_port: Option<u16>,
508        pub mutator_attributes: MutatorAttributes,
509        pub external_mutator_urls: Vec<Url>,
510    }
511
512    #[derive(Debug, Clone, Default, PartialEq)]
513    pub struct TopLevelPlugins {
514        pub available_ports: Option<AvailablePorts>,
515        pub plugins_dir: Option<PathBuf>,
516        pub ingest: Option<PluginsIngest>,
517        pub mutation: Option<PluginsMutation>,
518    }
519
520    #[derive(Debug, Clone, Default, PartialEq, Eq)]
521    pub struct AvailablePorts {
522        pub any_local: Option<bool>,
523        pub ranges: Vec<InclusivePortRange>,
524    }
525
526    #[derive(Copy, Clone, Debug, Default, PartialEq, Eq, PartialOrd, Ord)]
527    pub struct InclusivePortRange {
528        start: u16,
529        end: u16,
530    }
531
532    impl InclusivePortRange {
533        pub fn new(start: u16, end: u16) -> Result<Self, SemanticErrorExplanation> {
534            if start > end {
535                Err(SemanticErrorExplanation(format!("Port range start must <= end, but provided start {start} was > provided end {end}")))
536            } else {
537                Ok(InclusivePortRange { start, end })
538            }
539        }
540        pub fn start(&self) -> u16 {
541            self.start
542        }
543        pub fn end(&self) -> u16 {
544            self.end
545        }
546        pub fn start_mut(&mut self) -> &mut u16 {
547            &mut self.start
548        }
549        pub fn end_mut(&mut self) -> &mut u16 {
550            &mut self.end
551        }
552    }
553    #[derive(Debug, Clone, Default, PartialEq, Eq)]
554    pub struct TimelineAttributes {
555        pub additional_timeline_attributes: Vec<AttrKeyEqValuePair>,
556        pub override_timeline_attributes: Vec<AttrKeyEqValuePair>,
557    }
558    #[derive(Debug, Clone, Default, PartialEq, Eq)]
559    pub struct MutatorAttributes {
560        pub additional_mutator_attributes: Vec<AttrKeyEqValuePair>,
561        pub override_mutator_attributes: Vec<AttrKeyEqValuePair>,
562    }
563
564    impl MutatorAttributes {
565        pub fn merge(
566            &mut self,
567            other: MutatorAttributes,
568        ) -> Result<(), MergeMutatorAttributesError> {
569            for AttrKeyEqValuePair(k, v) in other.additional_mutator_attributes.into_iter() {
570                if self
571                    .additional_mutator_attributes
572                    .iter()
573                    .any(|kvp| kvp.0 == k)
574                {
575                    return Err(MergeMutatorAttributesError::KeyConflict(k));
576                }
577
578                self.additional_mutator_attributes
579                    .push(AttrKeyEqValuePair(k, v));
580            }
581
582            self.override_mutator_attributes
583                .extend(other.override_mutator_attributes);
584
585            Ok(())
586        }
587    }
588
589    #[derive(Clone, Debug, PartialEq, Eq, thiserror::Error)]
590    pub enum MergeMutatorAttributesError {
591        #[error("Conflicting settings for mutator attribute key {0}")]
592        KeyConflict(AttrKey),
593    }
594
595    #[derive(Debug, Clone, Default, PartialEq)]
596    pub struct PluginsIngest {
597        pub collectors: BTreeMap<String, PluginsIngestMember>,
598        pub importers: BTreeMap<String, PluginsIngestMember>,
599    }
600    #[derive(Debug, Clone, Default, PartialEq)]
601    pub struct PluginsIngestMember {
602        pub plugin: Option<String>,
603        pub timeline_attributes: TimelineAttributes,
604        pub shutdown: PluginShutdown,
605        pub restart: Option<bool>,
606        pub metadata: BTreeMap<String, TomlValue>,
607    }
608    #[derive(Debug, Clone, Default, PartialEq)]
609    pub struct PluginsMutation {
610        pub mutators: BTreeMap<String, PluginsMutationMember>,
611    }
612    #[derive(Debug, Clone, Default, PartialEq)]
613    pub struct PluginsMutationMember {
614        pub plugin: Option<String>,
615        pub mutator_attributes: MutatorAttributes,
616        pub shutdown: PluginShutdown,
617        pub restart: Option<bool>,
618        pub metadata: BTreeMap<String, TomlValue>,
619    }
620    #[derive(Debug, Clone, Default, PartialEq)]
621    pub struct PluginShutdown {
622        pub shutdown_signal: Option<String>,
623        pub shutdown_timeout: Option<Duration>,
624    }
625
626    #[derive(Debug, Clone, Default, PartialEq, Eq)]
627    pub struct IngestRolloverTracker {
628        pub timeout: Option<Duration>,
629        pub sender: Option<RolloverTrackerParticipant>,
630        pub receivers: Vec<RolloverTrackerParticipant>,
631    }
632
633    #[derive(Debug, Clone, Default, PartialEq, Eq)]
634    pub struct RolloverTrackerParticipant {
635        pub timeline_attributes: Vec<AttrKeyEqValuePair>,
636        pub event_name: Option<String>,
637        pub event_attribute_key: Option<String>,
638    }
639
640    #[derive(Clone, Debug, PartialEq, Eq, thiserror::Error)]
641    pub enum AttrKeyValuePairParseError {
642        #[error("'{0}' is not a valid attribute key=value string.")]
643        Format(String),
644
645        #[error("The key '{0}' starts with an invalid character.")]
646        InvalidKey(String),
647
648        #[error(transparent)]
649        EnvSub(#[from] EnvSubError),
650    }
651
652    /// Parsing and representation for 'foo = "bar"' or "baz = true" or "whatever.anything = 42"
653    /// type key value attribute pairs.
654    ///
655    /// The [`AttrKeyEqValuePair::from_str`] implementation supports the following
656    /// environment variable substitution expressions:
657    /// * `${NAME}`
658    /// * `${NAME-default}`
659    /// * `${NAME:-default}`
660    #[derive(Clone, Debug, PartialEq, Eq, PartialOrd)]
661    pub struct AttrKeyEqValuePair(pub AttrKey, pub AttrVal);
662
663    impl From<(AttrKey, AttrVal)> for AttrKeyEqValuePair {
664        fn from((k, v): (AttrKey, AttrVal)) -> Self {
665            AttrKeyEqValuePair(k, v)
666        }
667    }
668
669    impl FromStr for AttrKeyEqValuePair {
670        type Err = AttrKeyValuePairParseError;
671
672        fn from_str(input: &str) -> Result<Self, Self::Err> {
673            // Do environment substitution first
674            let s = envsub(input)?;
675
676            let parts: Vec<&str> = s.trim().split('=').map(|p| p.trim()).collect();
677            if parts.len() != 2 || parts[0].is_empty() || parts[1].is_empty() {
678                return Err(AttrKeyValuePairParseError::Format(s.to_string()));
679            }
680
681            let key = parts[0];
682            let val_str = parts[1];
683
684            if key.starts_with('.') {
685                return Err(AttrKeyValuePairParseError::InvalidKey(key.to_string()));
686            }
687
688            let val: Result<_, std::convert::Infallible> = val_str.parse();
689            let val = val.unwrap();
690
691            Ok(AttrKeyEqValuePair(AttrKey::new(key.to_string()), val))
692        }
693    }
694
695    impl TryFrom<String> for AttrKeyEqValuePair {
696        type Error = AttrKeyValuePairParseError;
697
698        fn try_from(s: String) -> Result<Self, Self::Error> {
699            AttrKeyEqValuePair::from_str(&s)
700        }
701    }
702
703    impl From<AttrKeyEqValuePair> for String {
704        fn from(kv: AttrKeyEqValuePair) -> Self {
705            kv.to_string()
706        }
707    }
708
709    impl fmt::Display for AttrKeyEqValuePair {
710        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
711            // N.B. When we standardize literal notation for more variants, will have to add here
712            // or delegate to some shared serialization code
713            // TODO - more standardized escaping?
714            let val_s = match &self.1 {
715                AttrVal::String(interned_string) => {
716                    let mut s = String::new();
717                    s.push('\"');
718                    s.push_str(interned_string.as_ref());
719                    s.push('\"');
720                    s
721                }
722                AttrVal::TimelineId(timeline_id) => {
723                    let mut s = String::new();
724                    s.push('\"');
725                    s.push_str(timeline_id.to_string().as_str());
726                    s.push('\"');
727                    s
728                }
729                v => v.to_string(),
730            };
731            write!(f, "{} = {}", self.0, val_s)
732        }
733    }
734
735    #[derive(Debug)]
736    pub struct SemanticErrorExplanation(pub String);
737
738    use crate::reflector_config::raw_toml;
739    impl TryFrom<raw_toml::Config> for Config {
740        type Error = SemanticErrorExplanation;
741
742        fn try_from(value: raw_toml::Config) -> Result<Self, Self::Error> {
743            Ok(Self {
744                ingest: if let Some(ingest) = value.ingest {
745                    Some(ingest.try_into()?)
746                } else {
747                    None
748                },
749                mutation: if let Some(mutation) = value.mutation {
750                    Some(mutation.try_into()?)
751                } else {
752                    None
753                },
754                plugins: if let Some(plugins) = value.plugins {
755                    Some(plugins.try_into()?)
756                } else {
757                    None
758                },
759                metadata: value.metadata,
760            })
761        }
762    }
763
764    impl TryFrom<raw_toml::TopLevelIngest> for TopLevelIngest {
765        type Error = SemanticErrorExplanation;
766
767        fn try_from(value: raw_toml::TopLevelIngest) -> Result<Self, Self::Error> {
768            Ok(Self {
769                protocol_parent_url: if let Some(u) = value.protocol_parent_url {
770                    Some(url::Url::from_str(&u).map_err(|parse_err| {
771                        SemanticErrorExplanation(format!(
772                            "ingest.protocol-parent-url could not be parsed. {parse_err}"
773                        ))
774                    })?)
775                } else {
776                    None
777                },
778                protocol_child_port: value.protocol_child_port,
779                timeline_attributes: value.timeline_attributes.try_into()?,
780                allow_insecure_tls: value.allow_insecure_tls,
781                max_write_batch_staleness: value
782                    .max_write_batch_staleness_millis
783                    .map(Duration::from_millis),
784                rollover_trackers: value
785                    .rollover_trackers
786                    .into_iter()
787                    .map(IngestRolloverTracker::try_from)
788                    .collect::<Result<Vec<_>, SemanticErrorExplanation>>()?,
789            })
790        }
791    }
792    impl TryFrom<raw_toml::TimelineAttributes> for TimelineAttributes {
793        type Error = SemanticErrorExplanation;
794
795        fn try_from(value: raw_toml::TimelineAttributes) -> Result<Self, Self::Error> {
796            Ok(Self {
797                additional_timeline_attributes: value
798                    .additional_timeline_attributes
799                    .into_iter()
800                    .map(AttrKeyEqValuePair::try_from)
801                    .collect::<Result<Vec<_>, AttrKeyValuePairParseError>>()
802                    .map_err(|e| {
803                        SemanticErrorExplanation(format!(
804                            "Error in additional-timeline-attributes member. {e}"
805                        ))
806                    })?,
807                override_timeline_attributes: value
808                    .override_timeline_attributes
809                    .into_iter()
810                    .map(AttrKeyEqValuePair::try_from)
811                    .collect::<Result<Vec<_>, AttrKeyValuePairParseError>>()
812                    .map_err(|e| {
813                        SemanticErrorExplanation(format!(
814                            "Error in override-timeline-attributes member. {e}"
815                        ))
816                    })?,
817            })
818        }
819    }
820    impl TryFrom<raw_toml::MutatorAttributes> for MutatorAttributes {
821        type Error = SemanticErrorExplanation;
822
823        fn try_from(value: raw_toml::MutatorAttributes) -> Result<Self, Self::Error> {
824            Ok(Self {
825                additional_mutator_attributes: value
826                    .additional_mutator_attributes
827                    .into_iter()
828                    .map(AttrKeyEqValuePair::try_from)
829                    .collect::<Result<Vec<_>, AttrKeyValuePairParseError>>()
830                    .map_err(|e| {
831                        SemanticErrorExplanation(format!(
832                            "Error in additional-mutator-attributes member. {e}"
833                        ))
834                    })?,
835                override_mutator_attributes: value
836                    .override_mutator_attributes
837                    .into_iter()
838                    .map(AttrKeyEqValuePair::try_from)
839                    .collect::<Result<Vec<_>, AttrKeyValuePairParseError>>()
840                    .map_err(|e| {
841                        SemanticErrorExplanation(format!(
842                            "Error in override-mutator-attributes member. {e}"
843                        ))
844                    })?,
845            })
846        }
847    }
848
849    impl TryFrom<raw_toml::TopLevelMutation> for TopLevelMutation {
850        type Error = SemanticErrorExplanation;
851
852        fn try_from(value: raw_toml::TopLevelMutation) -> Result<Self, Self::Error> {
853            Ok(Self {
854                protocol_parent_url: if let Some(u) = value.protocol_parent_url {
855                    Some(url::Url::from_str(&u).map_err(|parse_err| SemanticErrorExplanation(format!("mutation.protocol-parent-url could not be parsed. {parse_err}")))?)
856                } else {
857                    None
858                },
859                allow_insecure_tls: value.allow_insecure_tls,
860                protocol_child_port: value.protocol_child_port,
861                mutator_http_api_port: value.mutator_http_api_port,
862                mutator_attributes: value.mutator_attributes.try_into()?,
863                external_mutator_urls: value.external_mutator_urls.into_iter().map(|v| url::Url::from_str(&v).map_err(|parse_err|SemanticErrorExplanation(format!("mutation.external-mutator-urls member {v} could not be parsed. {parse_err}")))).collect::<Result<Vec<url::Url>, SemanticErrorExplanation>>()?,
864            })
865        }
866    }
867    impl TryFrom<raw_toml::TopLevelPlugins> for TopLevelPlugins {
868        type Error = SemanticErrorExplanation;
869
870        fn try_from(value: raw_toml::TopLevelPlugins) -> Result<Self, Self::Error> {
871            Ok(Self {
872                available_ports: if let Some(v) = value.available_ports {
873                    Some(v.try_into()?)
874                } else {
875                    None
876                },
877                plugins_dir: value.plugins_dir,
878                ingest: if let Some(v) = value.ingest {
879                    Some(v.try_into()?)
880                } else {
881                    None
882                },
883                mutation: if let Some(v) = value.mutation {
884                    Some(v.try_into()?)
885                } else {
886                    None
887                },
888            })
889        }
890    }
891
892    impl TryFrom<raw_toml::AvailablePorts> for AvailablePorts {
893        type Error = SemanticErrorExplanation;
894
895        fn try_from(value: raw_toml::AvailablePorts) -> Result<Self, Self::Error> {
896            Ok(Self {
897                any_local: value.any_local,
898                ranges: value
899                    .ranges
900                    .into_iter()
901                    .map(|v| InclusivePortRange::new(v[0], v[1]))
902                    .collect::<Result<Vec<InclusivePortRange>, SemanticErrorExplanation>>()?,
903            })
904        }
905    }
906    impl TryFrom<raw_toml::PluginsIngest> for PluginsIngest {
907        type Error = SemanticErrorExplanation;
908
909        fn try_from(value: raw_toml::PluginsIngest) -> Result<Self, Self::Error> {
910            Ok(
911                Self {
912                    collectors:
913                        value
914                            .collectors
915                            .into_iter()
916                            .map(|(k, v)| v.try_into().map(|vv| (k, vv)))
917                            .collect::<Result<
918                                BTreeMap<String, PluginsIngestMember>,
919                                SemanticErrorExplanation,
920                            >>()?,
921                    importers:
922                        value
923                            .importers
924                            .into_iter()
925                            .map(|(k, v)| v.try_into().map(|vv| (k, vv)))
926                            .collect::<Result<
927                                BTreeMap<String, PluginsIngestMember>,
928                                SemanticErrorExplanation,
929                            >>()?,
930                },
931            )
932        }
933    }
934    impl TryFrom<raw_toml::PluginsIngestMember> for PluginsIngestMember {
935        type Error = SemanticErrorExplanation;
936
937        fn try_from(value: raw_toml::PluginsIngestMember) -> Result<Self, Self::Error> {
938            Ok(Self {
939                plugin: value.plugin,
940                timeline_attributes: value.timeline_attributes.try_into()?,
941                shutdown: value.shutdown.into(),
942                restart: value.restart,
943                metadata: value.metadata,
944            })
945        }
946    }
947    impl TryFrom<raw_toml::PluginsMutation> for PluginsMutation {
948        type Error = SemanticErrorExplanation;
949
950        fn try_from(value: raw_toml::PluginsMutation) -> Result<Self, Self::Error> {
951            Ok(
952                Self {
953                    mutators:
954                        value
955                            .mutators
956                            .into_iter()
957                            .map(|(k, v)| v.try_into().map(|vv| (k, vv)))
958                            .collect::<Result<
959                                BTreeMap<String, PluginsMutationMember>,
960                                SemanticErrorExplanation,
961                            >>()?,
962                },
963            )
964        }
965    }
966    impl TryFrom<raw_toml::PluginsMutationMember> for PluginsMutationMember {
967        type Error = SemanticErrorExplanation;
968
969        fn try_from(value: raw_toml::PluginsMutationMember) -> Result<Self, Self::Error> {
970            Ok(Self {
971                plugin: value.plugin,
972                mutator_attributes: value.mutator_attributes.try_into()?,
973                shutdown: value.shutdown.into(),
974                restart: value.restart,
975                metadata: value.metadata,
976            })
977        }
978    }
979
980    impl From<raw_toml::PluginShutdown> for PluginShutdown {
981        fn from(value: raw_toml::PluginShutdown) -> Self {
982            Self {
983                shutdown_signal: value.shutdown_signal,
984                shutdown_timeout: value.shutdown_timeout_millis.map(Duration::from_millis),
985            }
986        }
987    }
988
989    impl TryFrom<raw_toml::IngestRolloverTracker> for IngestRolloverTracker {
990        type Error = SemanticErrorExplanation;
991
992        fn try_from(value: raw_toml::IngestRolloverTracker) -> Result<Self, Self::Error> {
993            Ok(Self {
994                timeout: value.timeout_millis.map(Duration::from_millis),
995                sender: value.sender.map(TryInto::try_into).transpose()?,
996                receivers: value
997                    .receivers
998                    .into_iter()
999                    .map(TryInto::try_into)
1000                    .collect::<Result<Vec<_>, SemanticErrorExplanation>>()?,
1001            })
1002        }
1003    }
1004
1005    impl TryFrom<raw_toml::RolloverTrackerParticipant> for RolloverTrackerParticipant {
1006        type Error = SemanticErrorExplanation;
1007
1008        fn try_from(value: raw_toml::RolloverTrackerParticipant) -> Result<Self, Self::Error> {
1009            Ok(Self {
1010                timeline_attributes: value
1011                    .timeline_attributes
1012                    .into_iter()
1013                    .map(AttrKeyEqValuePair::try_from)
1014                    .collect::<Result<Vec<_>, AttrKeyValuePairParseError>>()
1015                    .map_err(|e| {
1016                        SemanticErrorExplanation(format!(
1017                            "Error in rollover-tracker member timeline-attributes. {e}"
1018                        ))
1019                    })?,
1020                event_name: value.event_name,
1021                event_attribute_key: value.event_attribute_key,
1022            })
1023        }
1024    }
1025
1026    impl Config {
1027        pub fn is_empty(&self) -> bool {
1028            self.ingest.is_none()
1029                && self.mutation.is_none()
1030                && self.plugins.is_none()
1031                && self.metadata.is_empty()
1032        }
1033    }
1034
1035    impl PluginsIngest {
1036        pub fn find_collector_member_by_plugin_name<S: AsRef<str>>(
1037            &self,
1038            plugin_name: S,
1039        ) -> Option<&PluginsIngestMember> {
1040            find_member_by_plugin_name(&self.collectors, plugin_name)
1041        }
1042
1043        pub fn find_importer_member_by_plugin_name<S: AsRef<str>>(
1044            &self,
1045            plugin_name: S,
1046        ) -> Option<&PluginsIngestMember> {
1047            find_member_by_plugin_name(&self.importers, plugin_name)
1048        }
1049    }
1050
1051    impl PluginsMutation {
1052        pub fn find_mutator_member_by_plugin_name<S: AsRef<str>>(
1053            &self,
1054            plugin_name: S,
1055        ) -> Option<&PluginsMutationMember> {
1056            find_member_by_plugin_name(&self.mutators, plugin_name)
1057        }
1058    }
1059
1060    pub(crate) fn find_member_by_plugin_name<T: PluginMemberExt, N: AsRef<str>>(
1061        members: &BTreeMap<String, T>,
1062        plugin_name: N,
1063    ) -> Option<&T> {
1064        members.iter().find_map(|(k, m)| {
1065            if member_matches_plugin_name(plugin_name.as_ref(), k, m.plugin()) {
1066                Some(m)
1067            } else {
1068                None
1069            }
1070        })
1071    }
1072
1073    pub(crate) fn member_matches_plugin_name<N: AsRef<str>, K: AsRef<str>, P: AsRef<str>>(
1074        plugin_name: N,
1075        member_key: K,
1076        member_plugin: Option<P>,
1077    ) -> bool {
1078        if member_key.as_ref() == plugin_name.as_ref() {
1079            // Exact match on the key
1080            true
1081        } else if member_plugin
1082            .as_ref()
1083            .map(|p| p.as_ref() == plugin_name.as_ref())
1084            .unwrap_or(false)
1085        {
1086            // Exact match on the explicit plugin field
1087            true
1088        } else if member_key.as_ref().contains(plugin_name.as_ref()) {
1089            // Matched on the key (i.e. look for 'socketcan' in 'my-socketcan-entry')
1090            true
1091        } else if member_plugin
1092            .as_ref()
1093            .map(|p| p.as_ref().contains(plugin_name.as_ref()))
1094            .unwrap_or(false)
1095        {
1096            // Matched on the explicit plugin field (i.e. look for 'socketcan' in 'modality-socketcan-collector')
1097            true
1098        } else {
1099            false
1100        }
1101    }
1102
1103    pub(crate) trait PluginMemberExt {
1104        fn plugin(&self) -> Option<&str>;
1105    }
1106
1107    impl PluginMemberExt for PluginsIngestMember {
1108        fn plugin(&self) -> Option<&str> {
1109            self.plugin.as_deref()
1110        }
1111    }
1112
1113    impl PluginMemberExt for PluginsMutationMember {
1114        fn plugin(&self) -> Option<&str> {
1115            self.plugin.as_deref()
1116        }
1117    }
1118
1119    #[derive(Clone, Debug, PartialEq, Eq, thiserror::Error)]
1120    pub enum EnvSubError {
1121        #[error("The environment variable '{0}' contains invalid unicode")]
1122        EnvVarNotUnicode(String),
1123
1124        #[error("The environment variable '{0}' is not set and no default value is specified")]
1125        EnvVarNotPresent(String),
1126    }
1127
1128    /// Substitute the values of environment variables.
1129    /// Supports the following substitution style expressions:
1130    /// * `${NAME}`
1131    /// * `${NAME-default}`
1132    /// * `${NAME:-default}`
1133    pub fn envsub(input: &str) -> Result<String, EnvSubError> {
1134        lazy_static! {
1135            // Matches the following patterns with named capture groups:
1136            // * '${NAME}' : var = 'NAME'
1137            // * '${NAME-default}' : var = 'NAME', def = 'default'
1138            // * '${NAME:-default}' : var = 'NAME', def = 'default'
1139            static ref ENVSUB_RE: Regex =
1140                Regex::new(r"\$\{(?P<var>[a-zA-Z_][a-zA-Z0-9_]*)(:?-(?P<def>.*?))?\}")
1141                    .expect("Could not construct envsub Regex");
1142        }
1143
1144        replace_all(&ENVSUB_RE, input, |caps: &Captures| {
1145            // SAFETY: the regex requires a match for capture group 'var'
1146            let env_var = &caps["var"];
1147            match env::var(env_var) {
1148                Ok(env_val_val) => Ok(env_val_val),
1149                Err(env::VarError::NotUnicode(_)) => {
1150                    Err(EnvSubError::EnvVarNotUnicode(env_var.to_owned()))
1151                }
1152                Err(env::VarError::NotPresent) => {
1153                    // Use the default value if one was provided
1154                    if let Some(def) = caps.name("def") {
1155                        Ok(def.as_str().to_string())
1156                    } else {
1157                        Err(EnvSubError::EnvVarNotPresent(env_var.to_owned()))
1158                    }
1159                }
1160            }
1161        })
1162    }
1163
1164    // This is essentially a fallible version of Regex::replace_all
1165    fn replace_all(
1166        re: &Regex,
1167        input: &str,
1168        replacement: impl Fn(&Captures) -> Result<String, EnvSubError>,
1169    ) -> Result<String, EnvSubError> {
1170        let mut new = String::with_capacity(input.len());
1171        let mut last_match = 0;
1172        for caps in re.captures_iter(input) {
1173            let m = caps.get(0).unwrap();
1174            new.push_str(&input[last_match..m.start()]);
1175            new.push_str(&replacement(&caps)?);
1176            last_match = m.end();
1177        }
1178        new.push_str(&input[last_match..]);
1179        Ok(new)
1180    }
1181}
1182
1183#[derive(Debug, Error)]
1184pub enum ConfigWriteError {
1185    #[error("TOML serialization error.")]
1186    Toml(#[from] toml::ser::Error),
1187
1188    #[error("IO error")]
1189    Io(#[from] std::io::Error),
1190}
1191
1192#[derive(Debug, Error)]
1193pub enum ConfigLoadError {
1194    #[error("Error in config file {} relating to TOML parsing. {error}", .path.display())]
1195    ConfigFileToml {
1196        path: PathBuf,
1197        #[source]
1198        error: toml::de::Error,
1199    },
1200    #[allow(unused)]
1201    #[error("Error in config content relating to TOML parsing. {error}")]
1202    ConfigToml {
1203        #[source]
1204        error: toml::de::Error,
1205    },
1206
1207    #[error("IO Error")]
1208    Io(#[from] std::io::Error),
1209
1210    #[error("Error in config content relating to semantics. {explanation}")]
1211    DefinitionSemantics { explanation: String },
1212}
1213
1214pub fn try_from_file(path: &Path) -> Result<refined::Config, ConfigLoadError> {
1215    let content = &std::fs::read_to_string(path)?;
1216    let partial: raw_toml::Config =
1217        toml::from_str(content).map_err(|e| ConfigLoadError::ConfigFileToml {
1218            path: path.to_owned(),
1219            error: e,
1220        })?;
1221    let r: Result<refined::Config, SemanticErrorExplanation> = partial.try_into();
1222    r.map_err(|semantics| ConfigLoadError::DefinitionSemantics {
1223        explanation: semantics.0,
1224    })
1225}
1226
1227pub fn try_from_str(content: &str) -> Result<refined::Config, ConfigLoadError> {
1228    let partial: raw_toml::Config =
1229        toml::from_str(content).map_err(|e| ConfigLoadError::ConfigToml { error: e })?;
1230    let r: Result<refined::Config, SemanticErrorExplanation> = partial.try_into();
1231    r.map_err(|semantics| ConfigLoadError::DefinitionSemantics {
1232        explanation: semantics.0,
1233    })
1234}
1235
1236pub fn try_to_file(config: &refined::Config, path: &Path) -> Result<(), ConfigWriteError> {
1237    let content = try_to_string(config)?;
1238    std::fs::write(path, content)?;
1239    Ok(())
1240}
1241
1242pub fn try_to_string(config: &refined::Config) -> Result<String, ConfigWriteError> {
1243    let raw: raw_toml::Config = config.clone().into();
1244    // Slightly unexpected detour through toml::Value to work around some
1245    // of the toml crate's touchy handling of the order of serialization of
1246    // fields.
1247    let toml_value = toml::Value::try_from(raw)?;
1248    let content = toml::to_string_pretty(&toml_value)?;
1249    Ok(content)
1250}
1251
1252#[cfg(test)]
1253mod tests {
1254    use super::*;
1255    use crate::api::AttrKey;
1256
1257    /// Note that this toml example is not nearly as compact as it could be
1258    /// with shorthand choices that will still parse equivalently.
1259    /// The current shape is meant to appease the toml pretty-printer for
1260    /// round-trip completeness testing.
1261    const FULLY_FILLED_IN_TOML: &str = r#"[ingest]
1262additional-timeline-attributes = [
1263    'a = 1',
1264    'b = "foo"',
1265]
1266override-timeline-attributes = ['c = true']
1267protocol-child-port = 9079
1268protocol-parent-url = 'modality-ingest://auxon.io:9077'
1269
1270[[ingest.rollover-trackers]]
1271timeout-millis = 1000
1272
1273[[ingest.rollover-trackers.receivers]]
1274event-attribute-key = 'event.seqnum'
1275event-name = 'rx'
1276timeline-attributes = ['timeline.name = "B"']
1277
1278[[ingest.rollover-trackers.receivers]]
1279event-attribute-key = 'event.seqnum'
1280event-name = 'rx'
1281timeline-attributes = ['timeline.name = "C"']
1282
1283[ingest.rollover-trackers.sender]
1284event-attribute-key = 'event.seqnum'
1285event-name = 'tx'
1286timeline-attributes = ['timeline.name = "A"']
1287
1288[metadata]
1289bag = 42
1290grab = 24
1291
1292[mutation]
1293additional-mutator-attributes = [
1294    'd = 100',
1295    'e = "oof"',
1296]
1297external-mutator-urls = ['http://some-other-process.com:8080/']
1298mutator-http-api-port = 9059
1299override-mutator-attributes = ['f = false']
1300protocol-child-port = 9080
1301protocol-parent-url = 'modality-ingest://localhost:9078'
1302
1303[plugins]
1304plugins-dir = 'path/to/custom/plugins/dir'
1305
1306[plugins.available-ports]
1307any-local = false
1308ranges = [
1309    [
1310    9081,
1311    9097,
1312],
1313    [
1314    10123,
1315    10123,
1316],
1317]
1318[plugins.ingest.collectors.foobar]
1319plugin = 'modality-socketcan-collector'
1320
1321[plugins.ingest.collectors.foobar.metadata]
1322all-the-custom = false
1323
1324[plugins.ingest.collectors.lttng-live]
1325additional-timeline-attributes = [
1326    'a = 2',
1327    'r = 3',
1328]
1329override-timeline-attributes = [
1330    'c = false',
1331    'q = 99',
1332]
1333restart = true
1334shutdown-signal = 'SIGINT'
1335shutdown-timeout-millis = 1000
1336
1337[plugins.ingest.collectors.lttng-live.metadata]
1338all-the-custom = true
1339bag = 41
1340[plugins.ingest.collectors.my-dlt-cfg.metadata]
1341foo = 10
1342[plugins.ingest.importers.csv-yolo]
1343additional-timeline-attributes = ['s = 4']
1344override-timeline-attributes = ['t = "five"']
1345
1346[plugins.ingest.importers.csv-yolo.metadata]
1347other-custom = 'yup'
1348[plugins.mutation.mutators.linux-network]
1349additional-mutator-attributes = ['u = "six"']
1350override-mutator-attributes = ['v = 7']
1351
1352[plugins.mutation.mutators.linux-network.metadata]
1353moar-custom = [
1354    'ynot',
1355    'structured',
1356    2,
1357]
1358"#;
1359
1360    #[test]
1361    fn raw_representation_round_trip() {
1362        let raw: raw_toml::Config = toml::from_str(FULLY_FILLED_IN_TOML).unwrap();
1363        let back_out = raw_toml::try_raw_to_string_pretty(&raw).unwrap();
1364        assert_eq!(FULLY_FILLED_IN_TOML, back_out.as_str());
1365    }
1366
1367    #[test]
1368    fn refined_representation_round_trip() {
1369        let refined: refined::Config = try_from_str(FULLY_FILLED_IN_TOML).unwrap();
1370        let back_out = try_to_string(&refined).unwrap();
1371        let refined_prime: refined::Config = try_from_str(&back_out).unwrap();
1372        assert_eq!(refined, refined_prime);
1373        assert_eq!(FULLY_FILLED_IN_TOML, back_out.as_str());
1374    }
1375
1376    #[test]
1377    fn everything_is_optional() {
1378        let empty = "";
1379        let refined: refined::Config = try_from_str(empty).unwrap();
1380        let back_out = try_to_string(&refined).unwrap();
1381        let refined_prime: refined::Config = try_from_str(&back_out).unwrap();
1382        assert_eq!(refined, refined_prime);
1383        assert_eq!(empty, back_out.as_str());
1384    }
1385
1386    #[test]
1387    fn attr_kv_envsub_defaults() {
1388        let toml = r#"
1389[ingest]
1390additional-timeline-attributes = [
1391    '${NOT_SET_KEY:-foo} = ${NOT_SET_VAL-1}',
1392    '${NOT_SET_KEY-bar} = "${NOT_SET_VAL:-foo}"',
1393    '${NOT_SET_KEY-abc} = ${NOT_SET_VAL:-true}',
1394]"#;
1395        let cfg: refined::Config = try_from_str(toml).unwrap();
1396        let attrs = cfg
1397            .ingest
1398            .map(|i| i.timeline_attributes.additional_timeline_attributes)
1399            .unwrap();
1400        assert_eq!(
1401            attrs,
1402            vec![
1403                AttrKeyEqValuePair(AttrKey::new("foo".to_string()), 1_i64.into()),
1404                AttrKeyEqValuePair(AttrKey::new("bar".to_string()), "foo".into()),
1405                AttrKeyEqValuePair(AttrKey::new("abc".to_string()), true.into()),
1406            ]
1407        );
1408    }
1409
1410    #[test]
1411    fn attr_kv_envsub() {
1412        let toml = r#"
1413[ingest]
1414additional-timeline-attributes = [
1415    '${CARGO_PKG_NAME} = "${CARGO_PKG_VERSION}"',
1416    'int_key = ${CARGO_PKG_VERSION_MINOR}',
1417]"#;
1418        let cfg: refined::Config = try_from_str(toml).unwrap();
1419        let attrs = cfg
1420            .ingest
1421            .map(|i| i.timeline_attributes.additional_timeline_attributes)
1422            .unwrap();
1423        assert_eq!(
1424            attrs,
1425            vec![
1426                AttrKeyEqValuePair(
1427                    AttrKey::new(env!("CARGO_PKG_NAME").to_string()),
1428                    env!("CARGO_PKG_VERSION").into()
1429                ),
1430                AttrKeyEqValuePair(
1431                    AttrKey::new("int_key".to_string()),
1432                    env!("CARGO_PKG_VERSION_MINOR")
1433                        .parse::<i64>()
1434                        .unwrap()
1435                        .into()
1436                ),
1437            ]
1438        );
1439    }
1440
1441    #[test]
1442    fn attr_kv_envsub_errors() {
1443        let toml = r#"
1444[ingest]
1445additional-timeline-attributes = [
1446    '${NOT_SET_KEY} = 1',
1447]"#;
1448        match try_from_str(toml).unwrap_err() {
1449            ConfigLoadError::DefinitionSemantics { explanation } => {
1450                assert_eq!(explanation, "Error in additional-timeline-attributes member. The environment variable 'NOT_SET_KEY' is not set and no default value is specified".to_string())
1451            }
1452            _ => panic!(),
1453        }
1454    }
1455
1456    #[test]
1457    fn config_member_lookups() {
1458        let cfg: refined::Config = try_from_str(FULLY_FILLED_IN_TOML).unwrap();
1459        let ingest = cfg
1460            .plugins
1461            .as_ref()
1462            .and_then(|c| c.ingest.as_ref())
1463            .unwrap();
1464        let mutation = cfg
1465            .plugins
1466            .as_ref()
1467            .and_then(|c| c.mutation.as_ref())
1468            .unwrap();
1469        assert!(ingest
1470            .find_collector_member_by_plugin_name("lttng-live")
1471            .is_some());
1472        assert!(ingest
1473            .find_collector_member_by_plugin_name("socketcan")
1474            .is_some());
1475        assert!(ingest.find_collector_member_by_plugin_name("dlt").is_some());
1476        assert!(ingest
1477            .find_importer_member_by_plugin_name("csv-yolo")
1478            .is_some());
1479        assert!(mutation
1480            .find_mutator_member_by_plugin_name("linux-network")
1481            .is_some());
1482    }
1483}