modality_reflector_config/
lib.rs

1#![deny(warnings, clippy::all)]
2pub mod resolve;
3
4pub use refined::*;
5use std::collections::BTreeMap;
6use std::path::{Path, PathBuf};
7use thiserror::Error;
8pub use toml::Value as TomlValue;
9
/// Name of the environment variable associated with the reflector configuration.
/// NOTE(review): the lookup site is not in this section — confirm whether it
/// carries a file path or inline config content.
pub const CONFIG_ENV_VAR: &str = "MODALITY_REFLECTOR_CONFIG";

/// Default port for the Modality storage service (non-TLS).
pub const MODALITY_STORAGE_SERVICE_PORT_DEFAULT: u16 = 14182;
/// Default port for the Modality storage service over TLS.
pub const MODALITY_STORAGE_SERVICE_TLS_PORT_DEFAULT: u16 = 14183;

/// Default reflector ingest-connect port (non-TLS).
pub const MODALITY_REFLECTOR_INGEST_CONNECT_PORT_DEFAULT: u16 = 14188;
/// Default reflector ingest-connect port over TLS.
pub const MODALITY_REFLECTOR_INGEST_CONNECT_TLS_PORT_DEFAULT: u16 = 14189;

/// Default mutation-connect port (non-TLS).
pub const MODALITY_MUTATION_CONNECT_PORT_DEFAULT: u16 = 14192;
/// Default mutation-connect port over TLS.
/// NOTE(review): every other TLS default here is its non-TLS port + 1, but this
/// one is + 2 (14192 -> 14194). Confirm this is intentional.
pub const MODALITY_MUTATION_CONNECT_TLS_PORT_DEFAULT: u16 = 14194;

/// Default reflector mutation-connect port (non-TLS).
pub const MODALITY_REFLECTOR_MUTATION_CONNECT_PORT_DEFAULT: u16 = 14198;
/// Default reflector mutation-connect port over TLS.
pub const MODALITY_REFLECTOR_MUTATION_CONNECT_TLS_PORT_DEFAULT: u16 = 14199;
23
24/// Private, internal, raw representation of the TOML content
25mod raw_toml {
26    use super::*;
27    use std::path::PathBuf;
28
    /// Root of the raw, serde-facing TOML document.
    ///
    /// Keys are kebab-case (`rename_all`), and any missing key takes its
    /// `Default` value (`default`). `None` / empty fields are skipped when
    /// serializing.
    #[derive(Debug, Clone, Default, PartialEq, serde::Serialize, serde::Deserialize)]
    #[serde(rename_all = "kebab-case", default)]
    pub(crate) struct Config {
        /// Top-level `ingest` table.
        #[serde(skip_serializing_if = "Option::is_none")]
        pub(crate) ingest: Option<TopLevelIngest>,
        /// Top-level `mutation` table.
        #[serde(skip_serializing_if = "Option::is_none")]
        pub(crate) mutation: Option<TopLevelMutation>,
        /// Top-level `plugins` table.
        #[serde(skip_serializing_if = "Option::is_none")]
        pub(crate) plugins: Option<TopLevelPlugins>,
        /// Free-form `metadata` table; values are kept as raw TOML.
        #[serde(skip_serializing_if = "BTreeMap::is_empty")]
        pub(crate) metadata: BTreeMap<String, TomlValue>,
    }
    /// Raw `ingest` table.
    #[derive(Debug, Clone, Default, PartialEq, serde::Serialize, serde::Deserialize)]
    #[serde(rename_all = "kebab-case", default)]
    pub(crate) struct TopLevelIngest {
        /// Unvalidated URL string; parsed into a `Url` during refinement.
        #[serde(skip_serializing_if = "Option::is_none")]
        pub(crate) protocol_parent_url: Option<String>,
        /// Omitted from serialized output when `false`.
        #[serde(skip_serializing_if = "std::ops::Not::not")]
        pub(crate) allow_insecure_tls: bool,
        /// Staleness bound in whole milliseconds.
        #[serde(skip_serializing_if = "Option::is_none")]
        pub(crate) max_write_batch_staleness_millis: Option<u64>,
        #[serde(skip_serializing_if = "Option::is_none")]
        pub(crate) protocol_child_port: Option<u16>,
        /// Flattened: the attribute-list keys sit directly in the `ingest` table.
        #[serde(flatten)]
        pub(crate) timeline_attributes: TimelineAttributes,
    }
    /// Raw `mutation` table.
    #[derive(Debug, Clone, Default, PartialEq, serde::Serialize, serde::Deserialize)]
    #[serde(rename_all = "kebab-case", default)]
    pub(crate) struct TopLevelMutation {
        /// Unvalidated URL string; parsed into a `Url` during refinement.
        #[serde(skip_serializing_if = "Option::is_none")]
        pub(crate) protocol_parent_url: Option<String>,
        /// Omitted from serialized output when `false`.
        #[serde(skip_serializing_if = "std::ops::Not::not")]
        pub(crate) allow_insecure_tls: bool,
        #[serde(skip_serializing_if = "Option::is_none")]
        pub(crate) protocol_child_port: Option<u16>,
        #[serde(skip_serializing_if = "Option::is_none")]
        pub(crate) mutator_http_api_port: Option<u16>,
        /// Flattened: the attribute-list keys sit directly in the `mutation` table.
        #[serde(flatten)]
        pub(crate) mutator_attributes: MutatorAttributes,
        /// Unvalidated URL strings; each is parsed into a `Url` during refinement.
        #[serde(skip_serializing_if = "Vec::is_empty")]
        pub(crate) external_mutator_urls: Vec<String>,
    }
    /// Raw `plugins` table.
    #[derive(Debug, Clone, Default, PartialEq, serde::Serialize, serde::Deserialize)]
    #[serde(rename_all = "kebab-case", default)]
    pub(crate) struct TopLevelPlugins {
        #[serde(skip_serializing_if = "Option::is_none")]
        pub(crate) available_ports: Option<AvailablePorts>,
        #[serde(skip_serializing_if = "Option::is_none")]
        pub(crate) plugins_dir: Option<PathBuf>,
        /// `plugins.ingest` sub-table.
        #[serde(skip_serializing_if = "Option::is_none")]
        pub(crate) ingest: Option<PluginsIngest>,
        /// `plugins.mutation` sub-table.
        #[serde(skip_serializing_if = "Option::is_none")]
        pub(crate) mutation: Option<PluginsMutation>,
    }
    /// Raw `plugins.available-ports` table.
    #[derive(Debug, Clone, Default, PartialEq, serde::Serialize, serde::Deserialize)]
    #[serde(rename_all = "kebab-case", default)]
    pub(crate) struct AvailablePorts {
        #[serde(skip_serializing_if = "Option::is_none")]
        pub(crate) any_local: Option<bool>,
        /// `[start, end]` pairs; `start <= end` is only enforced during refinement.
        #[serde(skip_serializing_if = "Vec::is_empty")]
        pub(crate) ranges: Vec<[u16; 2]>,
    }
    /// Raw timeline attribute lists. Each entry is a `key = value` string that is
    /// parsed into an `AttrKeyEqValuePair` during refinement.
    #[derive(Debug, Clone, Default, PartialEq, serde::Serialize, serde::Deserialize)]
    #[serde(rename_all = "kebab-case", default)]
    pub(crate) struct TimelineAttributes {
        #[serde(skip_serializing_if = "Vec::is_empty")]
        pub(crate) additional_timeline_attributes: Vec<String>,
        #[serde(skip_serializing_if = "Vec::is_empty")]
        pub(crate) override_timeline_attributes: Vec<String>,
    }
    /// Raw mutator attribute lists. Each entry is a `key = value` string that is
    /// parsed into an `AttrKeyEqValuePair` during refinement.
    #[derive(Debug, Clone, Default, PartialEq, serde::Serialize, serde::Deserialize)]
    #[serde(rename_all = "kebab-case", default)]
    pub(crate) struct MutatorAttributes {
        #[serde(skip_serializing_if = "Vec::is_empty")]
        pub(crate) additional_mutator_attributes: Vec<String>,
        #[serde(skip_serializing_if = "Vec::is_empty")]
        pub(crate) override_mutator_attributes: Vec<String>,
    }
    /// Raw `plugins.ingest` table: member tables keyed by their TOML table name.
    #[derive(Debug, Clone, Default, PartialEq, serde::Serialize, serde::Deserialize)]
    #[serde(rename_all = "kebab-case", default)]
    pub(crate) struct PluginsIngest {
        #[serde(skip_serializing_if = "BTreeMap::is_empty")]
        pub(crate) collectors: BTreeMap<String, PluginsIngestMember>,
        #[serde(skip_serializing_if = "BTreeMap::is_empty")]
        pub(crate) importers: BTreeMap<String, PluginsIngestMember>,
    }
    /// Raw settings for one ingest plugin (collector or importer).
    #[derive(Debug, Clone, Default, PartialEq, serde::Serialize, serde::Deserialize)]
    #[serde(rename_all = "kebab-case", default)]
    pub(crate) struct PluginsIngestMember {
        /// Flattened into the member table.
        #[serde(flatten)]
        pub(crate) timeline_attributes: TimelineAttributes,
        /// Flattened into the member table.
        #[serde(flatten)]
        pub(crate) shutdown: PluginShutdown,
        /// Free-form, plugin-specific `metadata` table kept as raw TOML.
        #[serde(skip_serializing_if = "BTreeMap::is_empty")]
        pub(crate) metadata: BTreeMap<String, TomlValue>,
    }
    /// Raw `plugins.mutation` table: mutator member tables keyed by table name.
    #[derive(Debug, Clone, Default, PartialEq, serde::Serialize, serde::Deserialize)]
    #[serde(rename_all = "kebab-case", default)]
    pub(crate) struct PluginsMutation {
        #[serde(skip_serializing_if = "BTreeMap::is_empty")]
        pub(crate) mutators: BTreeMap<String, PluginsMutationMember>,
    }
    /// Raw settings for one mutator plugin.
    #[derive(Debug, Clone, Default, PartialEq, serde::Serialize, serde::Deserialize)]
    #[serde(rename_all = "kebab-case", default)]
    pub(crate) struct PluginsMutationMember {
        /// Flattened into the member table.
        #[serde(flatten)]
        pub(crate) mutator_attributes: MutatorAttributes,
        /// Flattened into the member table.
        #[serde(flatten)]
        pub(crate) shutdown: PluginShutdown,
        /// Free-form, plugin-specific `metadata` table kept as raw TOML.
        #[serde(skip_serializing_if = "BTreeMap::is_empty")]
        pub(crate) metadata: BTreeMap<String, TomlValue>,
    }
141    #[derive(Debug, Clone, Default, PartialEq, serde::Serialize, serde::Deserialize)]
142    #[serde(rename_all = "kebab-case", default)]
143    pub(crate) struct PluginShutdown {
144        pub(crate) shutdown_signal: Option<String>,
145        pub(crate) shutdown_timeout_millis: Option<u64>,
146    }
147
148    #[cfg(test)]
149    pub(crate) fn try_raw_to_string_pretty(config: &Config) -> Result<String, toml::ser::Error> {
150        // Slightly unexpected detour through toml::Value to work around some
151        // of the toml crate's touchy handling of the order of serialization of
152        // fields.
153        let toml_value = toml::Value::try_from(config)?;
154        let content = toml::to_string_pretty(&toml_value)?;
155        Ok(content)
156    }
157
158    impl From<refined::Config> for Config {
159        fn from(value: refined::Config) -> Self {
160            Self {
161                ingest: value.ingest.map(Into::into),
162                mutation: value.mutation.map(Into::into),
163                plugins: value.plugins.map(Into::into),
164                metadata: value.metadata,
165            }
166        }
167    }
168
169    impl From<refined::TopLevelIngest> for TopLevelIngest {
170        fn from(value: refined::TopLevelIngest) -> Self {
171            Self {
172                protocol_parent_url: value.protocol_parent_url.map(Into::into),
173                allow_insecure_tls: value.allow_insecure_tls,
174                max_write_batch_staleness_millis: value.max_write_batch_staleness.map(|v| {
175                    let millis = v.as_millis();
176                    if millis >= u64::MAX as u128 {
177                        u64::MAX
178                    } else {
179                        millis as u64
180                    }
181                }),
182                protocol_child_port: value.protocol_child_port.map(Into::into),
183                timeline_attributes: value.timeline_attributes.into(),
184            }
185        }
186    }
187    impl From<refined::TopLevelMutation> for TopLevelMutation {
188        fn from(value: refined::TopLevelMutation) -> Self {
189            Self {
190                protocol_parent_url: value.protocol_parent_url.map(Into::into),
191                allow_insecure_tls: value.allow_insecure_tls,
192                protocol_child_port: value.protocol_child_port.map(Into::into),
193                mutator_http_api_port: value.mutator_http_api_port.map(Into::into),
194                mutator_attributes: value.mutator_attributes.into(),
195                external_mutator_urls: value
196                    .external_mutator_urls
197                    .into_iter()
198                    .map(Into::into)
199                    .collect(),
200            }
201        }
202    }
203    impl From<refined::TopLevelPlugins> for TopLevelPlugins {
204        fn from(value: refined::TopLevelPlugins) -> Self {
205            Self {
206                available_ports: value.available_ports.map(Into::into),
207                plugins_dir: value.plugins_dir,
208                ingest: value.ingest.map(Into::into),
209                mutation: value.mutation.map(Into::into),
210            }
211        }
212    }
213    impl From<refined::TimelineAttributes> for TimelineAttributes {
214        fn from(value: refined::TimelineAttributes) -> Self {
215            Self {
216                additional_timeline_attributes: value
217                    .additional_timeline_attributes
218                    .into_iter()
219                    .map(Into::into)
220                    .collect(),
221                override_timeline_attributes: value
222                    .override_timeline_attributes
223                    .into_iter()
224                    .map(Into::into)
225                    .collect(),
226            }
227        }
228    }
229    impl From<refined::MutatorAttributes> for MutatorAttributes {
230        fn from(value: refined::MutatorAttributes) -> Self {
231            Self {
232                additional_mutator_attributes: value
233                    .additional_mutator_attributes
234                    .into_iter()
235                    .map(Into::into)
236                    .collect(),
237                override_mutator_attributes: value
238                    .override_mutator_attributes
239                    .into_iter()
240                    .map(Into::into)
241                    .collect(),
242            }
243        }
244    }
245    impl From<refined::PluginsIngest> for PluginsIngest {
246        fn from(value: refined::PluginsIngest) -> Self {
247            Self {
248                collectors: value
249                    .collectors
250                    .into_iter()
251                    .map(|(k, v)| (k, v.into()))
252                    .collect(),
253                importers: value
254                    .importers
255                    .into_iter()
256                    .map(|(k, v)| (k, v.into()))
257                    .collect(),
258            }
259        }
260    }
261    impl From<refined::PluginsMutation> for PluginsMutation {
262        fn from(value: refined::PluginsMutation) -> Self {
263            Self {
264                mutators: value
265                    .mutators
266                    .into_iter()
267                    .map(|(k, v)| (k, v.into()))
268                    .collect(),
269            }
270        }
271    }
272    impl From<refined::PluginsIngestMember> for PluginsIngestMember {
273        fn from(value: refined::PluginsIngestMember) -> Self {
274            Self {
275                timeline_attributes: value.timeline_attributes.into(),
276                shutdown: value.shutdown.into(),
277                metadata: value.metadata,
278            }
279        }
280    }
281    impl From<refined::PluginsMutationMember> for PluginsMutationMember {
282        fn from(value: refined::PluginsMutationMember) -> Self {
283            Self {
284                mutator_attributes: value.mutator_attributes.into(),
285                shutdown: value.shutdown.into(),
286                metadata: value.metadata,
287            }
288        }
289    }
290
291    impl From<refined::PluginShutdown> for PluginShutdown {
292        fn from(value: refined::PluginShutdown) -> Self {
293            Self {
294                shutdown_signal: value.shutdown_signal,
295                shutdown_timeout_millis: value.shutdown_timeout.map(|v| {
296                    let millis = v.as_millis();
297                    if millis >= u64::MAX as u128 {
298                        u64::MAX
299                    } else {
300                        millis as u64
301                    }
302                }),
303            }
304        }
305    }
306
307    impl From<refined::AvailablePorts> for AvailablePorts {
308        fn from(value: refined::AvailablePorts) -> Self {
309            Self {
310                any_local: value.any_local,
311                ranges: value
312                    .ranges
313                    .into_iter()
314                    .map(|inclusive_range| [inclusive_range.start(), inclusive_range.end()])
315                    .collect(),
316            }
317        }
318    }
319}
320
321/// Public-facing, more-semantically-enriched configuration types
322mod refined {
323    use super::TomlValue;
324    use lazy_static::lazy_static;
325    pub use modality_api::types::{AttrKey, AttrVal};
326    use regex::{Captures, Regex};
327    use std::collections::BTreeMap;
328    use std::env;
329    use std::fmt;
330    use std::path::PathBuf;
331    use std::str::FromStr;
332    use std::time::Duration;
333    use url::Url;
334
    /// Validated reflector configuration, built from the raw TOML form via
    /// `TryFrom<raw_toml::Config>`.
    #[derive(Debug, Clone, Default, PartialEq)]
    pub struct Config {
        pub ingest: Option<TopLevelIngest>,
        pub mutation: Option<TopLevelMutation>,
        pub plugins: Option<TopLevelPlugins>,
        /// Free-form metadata, passed through from the TOML unchanged.
        pub metadata: BTreeMap<String, TomlValue>,
    }
    /// Validated `ingest` settings.
    #[derive(Debug, Clone, Default, PartialEq, Eq)]
    pub struct TopLevelIngest {
        pub protocol_parent_url: Option<Url>,
        pub allow_insecure_tls: bool,
        pub protocol_child_port: Option<u16>,
        pub timeline_attributes: TimelineAttributes,
        /// Read from `max-write-batch-staleness-millis`; written back as whole
        /// milliseconds.
        pub max_write_batch_staleness: Option<Duration>,
    }
    /// Validated `mutation` settings.
    #[derive(Debug, Clone, Default, PartialEq, Eq)]
    pub struct TopLevelMutation {
        pub protocol_parent_url: Option<Url>,
        pub allow_insecure_tls: bool,
        pub protocol_child_port: Option<u16>,
        pub mutator_http_api_port: Option<u16>,
        pub mutator_attributes: MutatorAttributes,
        pub external_mutator_urls: Vec<Url>,
    }
    /// Validated `plugins` settings.
    #[derive(Debug, Clone, Default, PartialEq)]
    pub struct TopLevelPlugins {
        pub available_ports: Option<AvailablePorts>,
        pub plugins_dir: Option<PathBuf>,
        pub ingest: Option<PluginsIngest>,
        pub mutation: Option<PluginsMutation>,
    }
    /// Validated `plugins.available-ports` settings.
    #[derive(Debug, Clone, Default, PartialEq, Eq)]
    pub struct AvailablePorts {
        pub any_local: Option<bool>,
        /// Each range was checked for `start <= end` when built from raw TOML.
        pub ranges: Vec<InclusivePortRange>,
    }
371
372    #[derive(Copy, Clone, Debug, Default, PartialEq, Eq, PartialOrd, Ord)]
373    pub struct InclusivePortRange {
374        start: u16,
375        end: u16,
376    }
377
378    impl InclusivePortRange {
379        pub fn new(start: u16, end: u16) -> Result<Self, SemanticErrorExplanation> {
380            if start > end {
381                Err(SemanticErrorExplanation(format!("Port range start must <= end, but provided start {start} was > provided end {end}")))
382            } else {
383                Ok(InclusivePortRange { start, end })
384            }
385        }
386        pub fn start(&self) -> u16 {
387            self.start
388        }
389        pub fn end(&self) -> u16 {
390            self.end
391        }
392        pub fn start_mut(&mut self) -> &mut u16 {
393            &mut self.start
394        }
395        pub fn end_mut(&mut self) -> &mut u16 {
396            &mut self.end
397        }
398    }
    /// Parsed timeline attribute lists.
    ///
    /// NOTE(review): how "additional" versus "override" entries are applied is
    /// decided by consumers outside this file section — confirm semantics there.
    #[derive(Debug, Clone, Default, PartialEq, Eq)]
    pub struct TimelineAttributes {
        pub additional_timeline_attributes: Vec<AttrKeyEqValuePair>,
        pub override_timeline_attributes: Vec<AttrKeyEqValuePair>,
    }
    /// Parsed mutator attribute lists; see [`MutatorAttributes::merge`] for how
    /// two sets are combined.
    #[derive(Debug, Clone, Default, PartialEq, Eq)]
    pub struct MutatorAttributes {
        pub additional_mutator_attributes: Vec<AttrKeyEqValuePair>,
        pub override_mutator_attributes: Vec<AttrKeyEqValuePair>,
    }
409
410    impl MutatorAttributes {
411        pub fn merge(
412            &mut self,
413            other: MutatorAttributes,
414        ) -> Result<(), MergeMutatorAttributesError> {
415            for AttrKeyEqValuePair(k, v) in other.additional_mutator_attributes.into_iter() {
416                if self
417                    .additional_mutator_attributes
418                    .iter()
419                    .any(|kvp| kvp.0 == k)
420                {
421                    return Err(MergeMutatorAttributesError::KeyConflict(k));
422                }
423
424                self.additional_mutator_attributes
425                    .push(AttrKeyEqValuePair(k, v));
426            }
427
428            self.override_mutator_attributes
429                .extend(other.override_mutator_attributes);
430
431            Ok(())
432        }
433    }
434
    /// Errors from [`MutatorAttributes::merge`].
    #[derive(Clone, Debug, PartialEq, Eq, thiserror::Error)]
    pub enum MergeMutatorAttributesError {
        /// An additional-attribute key was supplied more than once.
        #[error("Conflicting settings for mutator attribute key {0}")]
        KeyConflict(AttrKey),
    }
440
    /// Validated `plugins.ingest` settings. Members are keyed by their TOML table
    /// name (presumably the plugin name — confirm with callers).
    #[derive(Debug, Clone, Default, PartialEq)]
    pub struct PluginsIngest {
        pub collectors: BTreeMap<String, PluginsIngestMember>,
        pub importers: BTreeMap<String, PluginsIngestMember>,
    }
    /// Settings for one ingest plugin (collector or importer).
    #[derive(Debug, Clone, Default, PartialEq)]
    pub struct PluginsIngestMember {
        pub timeline_attributes: TimelineAttributes,
        pub shutdown: PluginShutdown,
        /// Free-form, plugin-specific metadata passed through as raw TOML.
        pub metadata: BTreeMap<String, TomlValue>,
    }
    /// Validated `plugins.mutation` settings, keyed by TOML table name.
    #[derive(Debug, Clone, Default, PartialEq)]
    pub struct PluginsMutation {
        pub mutators: BTreeMap<String, PluginsMutationMember>,
    }
    /// Settings for one mutator plugin.
    #[derive(Debug, Clone, Default, PartialEq)]
    pub struct PluginsMutationMember {
        pub mutator_attributes: MutatorAttributes,
        pub shutdown: PluginShutdown,
        /// Free-form, plugin-specific metadata passed through as raw TOML.
        pub metadata: BTreeMap<String, TomlValue>,
    }
    /// Plugin shutdown settings.
    #[derive(Debug, Clone, Default, PartialEq)]
    pub struct PluginShutdown {
        /// Shutdown signal name, carried as an unvalidated string.
        pub shutdown_signal: Option<String>,
        /// Stored in TOML as whole milliseconds (`shutdown-timeout-millis`).
        pub shutdown_timeout: Option<Duration>,
    }
467
    /// Errors from parsing a `key = value` attribute string.
    #[derive(Clone, Debug, PartialEq, Eq, thiserror::Error)]
    pub enum AttrKeyValuePairParseError {
        /// The string could not be split into a non-empty key and a non-empty value.
        #[error("'{0}' is not a valid attribute key=value string.")]
        Format(String),

        /// The key starts with a disallowed character (the parser rejects `.`).
        #[error("The key '{0}' starts with an invalid character.")]
        InvalidKey(String),

        /// Environment-variable substitution failed before parsing began.
        #[error(transparent)]
        EnvSub(#[from] EnvSubError),
    }
479
    /// A single `key = value` attribute pair, e.g. `foo = "bar"`, `baz = true`, or
    /// `whatever.anything = 42`.
    ///
    /// The [`AttrKeyEqValuePair::from_str`] implementation supports the following
    /// environment variable substitution expressions:
    /// * `${NAME}`
    /// * `${NAME-default}`
    /// * `${NAME:-default}`
    ///
    /// The `Display` form is `key = value`, with string and timeline-id values
    /// wrapped in double quotes. NOTE(review): quotes or backslashes inside string
    /// values are not escaped by `Display`.
    #[derive(Clone, Debug, PartialEq, Eq, PartialOrd)]
    pub struct AttrKeyEqValuePair(pub AttrKey, pub AttrVal);
490
491    impl From<(AttrKey, AttrVal)> for AttrKeyEqValuePair {
492        fn from((k, v): (AttrKey, AttrVal)) -> Self {
493            AttrKeyEqValuePair(k, v)
494        }
495    }
496
497    impl FromStr for AttrKeyEqValuePair {
498        type Err = AttrKeyValuePairParseError;
499
500        fn from_str(input: &str) -> Result<Self, Self::Err> {
501            // Do environment substitution first
502            let s = envsub(input)?;
503
504            let parts: Vec<&str> = s.trim().split('=').map(|p| p.trim()).collect();
505            if parts.len() != 2 || parts[0].is_empty() || parts[1].is_empty() {
506                return Err(AttrKeyValuePairParseError::Format(s.to_string()));
507            }
508
509            let key = parts[0];
510            let val_str = parts[1];
511
512            if key.starts_with('.') {
513                return Err(AttrKeyValuePairParseError::InvalidKey(key.to_string()));
514            }
515
516            let val: Result<_, std::convert::Infallible> = val_str.parse();
517            let val = val.unwrap();
518
519            Ok(AttrKeyEqValuePair(AttrKey::new(key.to_string()), val))
520        }
521    }
522
523    impl TryFrom<String> for AttrKeyEqValuePair {
524        type Error = AttrKeyValuePairParseError;
525
526        fn try_from(s: String) -> Result<Self, Self::Error> {
527            AttrKeyEqValuePair::from_str(&s)
528        }
529    }
530
531    impl From<AttrKeyEqValuePair> for String {
532        fn from(kv: AttrKeyEqValuePair) -> Self {
533            kv.to_string()
534        }
535    }
536
537    impl fmt::Display for AttrKeyEqValuePair {
538        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
539            // N.B. When we standardize literal notation for more variants, will have to add here
540            // or delegate to some shared serialization code
541            // TODO - more standardized escaping?
542            let val_s = match &self.1 {
543                AttrVal::String(interned_string) => {
544                    let mut s = String::new();
545                    s.push('\"');
546                    s.push_str(interned_string.as_ref());
547                    s.push('\"');
548                    s
549                }
550                AttrVal::TimelineId(timeline_id) => {
551                    let mut s = String::new();
552                    s.push('\"');
553                    s.push_str(timeline_id.to_string().as_str());
554                    s.push('\"');
555                    s
556                }
557                v => v.to_string(),
558            };
559            write!(f, "{} = {}", self.0, val_s)
560        }
561    }
562
    /// Human-readable explanation of why a raw config could not be converted into
    /// its refined form, e.g. an unparseable URL or an inverted port range.
    ///
    /// NOTE(review): this type derives only `Debug`; no `Display` or
    /// `std::error::Error` impl is visible in this file section.
    #[derive(Debug)]
    pub struct SemanticErrorExplanation(pub String);

    use crate::raw_toml;
567    impl TryFrom<raw_toml::Config> for Config {
568        type Error = SemanticErrorExplanation;
569
570        fn try_from(value: raw_toml::Config) -> Result<Self, Self::Error> {
571            Ok(Self {
572                ingest: if let Some(ingest) = value.ingest {
573                    Some(ingest.try_into()?)
574                } else {
575                    None
576                },
577                mutation: if let Some(mutation) = value.mutation {
578                    Some(mutation.try_into()?)
579                } else {
580                    None
581                },
582                plugins: if let Some(plugins) = value.plugins {
583                    Some(plugins.try_into()?)
584                } else {
585                    None
586                },
587                metadata: value.metadata,
588            })
589        }
590    }
591
592    impl TryFrom<raw_toml::TopLevelIngest> for TopLevelIngest {
593        type Error = SemanticErrorExplanation;
594
595        fn try_from(value: raw_toml::TopLevelIngest) -> Result<Self, Self::Error> {
596            Ok(Self {
597                protocol_parent_url: if let Some(u) = value.protocol_parent_url {
598                    Some(url::Url::from_str(&u).map_err(|parse_err| {
599                        SemanticErrorExplanation(format!(
600                            "ingest.protocol-parent-url could not be parsed. {parse_err}"
601                        ))
602                    })?)
603                } else {
604                    None
605                },
606                protocol_child_port: value.protocol_child_port,
607                timeline_attributes: value.timeline_attributes.try_into()?,
608                allow_insecure_tls: value.allow_insecure_tls,
609                max_write_batch_staleness: value
610                    .max_write_batch_staleness_millis
611                    .map(Duration::from_millis),
612            })
613        }
614    }
615    impl TryFrom<raw_toml::TimelineAttributes> for TimelineAttributes {
616        type Error = SemanticErrorExplanation;
617
618        fn try_from(value: raw_toml::TimelineAttributes) -> Result<Self, Self::Error> {
619            Ok(Self {
620                additional_timeline_attributes: value
621                    .additional_timeline_attributes
622                    .into_iter()
623                    .map(AttrKeyEqValuePair::try_from)
624                    .collect::<Result<Vec<_>, AttrKeyValuePairParseError>>()
625                    .map_err(|e| {
626                        SemanticErrorExplanation(format!(
627                            "Error in additional-timeline-attributes member. {e}"
628                        ))
629                    })?,
630                override_timeline_attributes: value
631                    .override_timeline_attributes
632                    .into_iter()
633                    .map(AttrKeyEqValuePair::try_from)
634                    .collect::<Result<Vec<_>, AttrKeyValuePairParseError>>()
635                    .map_err(|e| {
636                        SemanticErrorExplanation(format!(
637                            "Error in override-timeline-attributes member. {e}"
638                        ))
639                    })?,
640            })
641        }
642    }
643    impl TryFrom<raw_toml::MutatorAttributes> for MutatorAttributes {
644        type Error = SemanticErrorExplanation;
645
646        fn try_from(value: raw_toml::MutatorAttributes) -> Result<Self, Self::Error> {
647            Ok(Self {
648                additional_mutator_attributes: value
649                    .additional_mutator_attributes
650                    .into_iter()
651                    .map(AttrKeyEqValuePair::try_from)
652                    .collect::<Result<Vec<_>, AttrKeyValuePairParseError>>()
653                    .map_err(|e| {
654                        SemanticErrorExplanation(format!(
655                            "Error in additional-mutator-attributes member. {e}"
656                        ))
657                    })?,
658                override_mutator_attributes: value
659                    .override_mutator_attributes
660                    .into_iter()
661                    .map(AttrKeyEqValuePair::try_from)
662                    .collect::<Result<Vec<_>, AttrKeyValuePairParseError>>()
663                    .map_err(|e| {
664                        SemanticErrorExplanation(format!(
665                            "Error in override-mutator-attributes member. {e}"
666                        ))
667                    })?,
668            })
669        }
670    }
671
672    impl TryFrom<raw_toml::TopLevelMutation> for TopLevelMutation {
673        type Error = SemanticErrorExplanation;
674
675        fn try_from(value: raw_toml::TopLevelMutation) -> Result<Self, Self::Error> {
676            Ok(Self {
677                protocol_parent_url: if let Some(u) = value.protocol_parent_url {
678                    Some(url::Url::from_str(&u).map_err(|parse_err| SemanticErrorExplanation(format!("mutation.protocol-parent-url could not be parsed. {parse_err}")))?)
679                } else {
680                    None
681                },
682                allow_insecure_tls: value.allow_insecure_tls,
683                protocol_child_port: value.protocol_child_port,
684                mutator_http_api_port: value.mutator_http_api_port,
685                mutator_attributes: value.mutator_attributes.try_into()?,
686                external_mutator_urls: value.external_mutator_urls.into_iter().map(|v| url::Url::from_str(&v).map_err(|parse_err|SemanticErrorExplanation(format!("mutation.external-mutator-urls member {v} could not be parsed. {parse_err}")))).collect::<Result<Vec<url::Url>, SemanticErrorExplanation>>()?,
687            })
688        }
689    }
690    impl TryFrom<raw_toml::TopLevelPlugins> for TopLevelPlugins {
691        type Error = SemanticErrorExplanation;
692
693        fn try_from(value: raw_toml::TopLevelPlugins) -> Result<Self, Self::Error> {
694            Ok(Self {
695                available_ports: if let Some(v) = value.available_ports {
696                    Some(v.try_into()?)
697                } else {
698                    None
699                },
700                plugins_dir: value.plugins_dir,
701                ingest: if let Some(v) = value.ingest {
702                    Some(v.try_into()?)
703                } else {
704                    None
705                },
706                mutation: if let Some(v) = value.mutation {
707                    Some(v.try_into()?)
708                } else {
709                    None
710                },
711            })
712        }
713    }
714
715    impl TryFrom<raw_toml::AvailablePorts> for AvailablePorts {
716        type Error = SemanticErrorExplanation;
717
718        fn try_from(value: raw_toml::AvailablePorts) -> Result<Self, Self::Error> {
719            Ok(Self {
720                any_local: value.any_local,
721                ranges: value
722                    .ranges
723                    .into_iter()
724                    .map(|v| InclusivePortRange::new(v[0], v[1]))
725                    .collect::<Result<Vec<InclusivePortRange>, SemanticErrorExplanation>>()?,
726            })
727        }
728    }
729    impl TryFrom<raw_toml::PluginsIngest> for PluginsIngest {
730        type Error = SemanticErrorExplanation;
731
732        fn try_from(value: raw_toml::PluginsIngest) -> Result<Self, Self::Error> {
733            Ok(
734                Self {
735                    collectors:
736                        value
737                            .collectors
738                            .into_iter()
739                            .map(|(k, v)| v.try_into().map(|vv| (k, vv)))
740                            .collect::<Result<
741                                BTreeMap<String, PluginsIngestMember>,
742                                SemanticErrorExplanation,
743                            >>()?,
744                    importers:
745                        value
746                            .importers
747                            .into_iter()
748                            .map(|(k, v)| v.try_into().map(|vv| (k, vv)))
749                            .collect::<Result<
750                                BTreeMap<String, PluginsIngestMember>,
751                                SemanticErrorExplanation,
752                            >>()?,
753                },
754            )
755        }
756    }
757    impl TryFrom<raw_toml::PluginsIngestMember> for PluginsIngestMember {
758        type Error = SemanticErrorExplanation;
759
760        fn try_from(value: raw_toml::PluginsIngestMember) -> Result<Self, Self::Error> {
761            Ok(Self {
762                timeline_attributes: value.timeline_attributes.try_into()?,
763                shutdown: value.shutdown.into(),
764                metadata: value.metadata,
765            })
766        }
767    }
768    impl TryFrom<raw_toml::PluginsMutation> for PluginsMutation {
769        type Error = SemanticErrorExplanation;
770
771        fn try_from(value: raw_toml::PluginsMutation) -> Result<Self, Self::Error> {
772            Ok(
773                Self {
774                    mutators:
775                        value
776                            .mutators
777                            .into_iter()
778                            .map(|(k, v)| v.try_into().map(|vv| (k, vv)))
779                            .collect::<Result<
780                                BTreeMap<String, PluginsMutationMember>,
781                                SemanticErrorExplanation,
782                            >>()?,
783                },
784            )
785        }
786    }
787    impl TryFrom<raw_toml::PluginsMutationMember> for PluginsMutationMember {
788        type Error = SemanticErrorExplanation;
789
790        fn try_from(value: raw_toml::PluginsMutationMember) -> Result<Self, Self::Error> {
791            Ok(Self {
792                mutator_attributes: value.mutator_attributes.try_into()?,
793                shutdown: value.shutdown.into(),
794                metadata: value.metadata,
795            })
796        }
797    }
798
799    impl From<raw_toml::PluginShutdown> for PluginShutdown {
800        fn from(value: raw_toml::PluginShutdown) -> Self {
801            Self {
802                shutdown_signal: value.shutdown_signal,
803                shutdown_timeout: value.shutdown_timeout_millis.map(Duration::from_millis),
804            }
805        }
806    }
807
808    impl Config {
809        pub fn is_empty(&self) -> bool {
810            self.ingest.is_none()
811                && self.mutation.is_none()
812                && self.plugins.is_none()
813                && self.metadata.is_empty()
814        }
815    }
816
817    #[derive(Clone, Debug, PartialEq, Eq, thiserror::Error)]
818    pub enum EnvSubError {
819        #[error("The environment variable '{0}' contains invalid unicode")]
820        EnvVarNotUnicode(String),
821
822        #[error("The environment variable '{0}' is not set and no default value is specified")]
823        EnvVarNotPresent(String),
824    }
825
826    /// Substitute the values of environment variables.
827    /// Supports the following substitution style expressions:
828    /// * `${NAME}`
829    /// * `${NAME-default}`
830    /// * `${NAME:-default}`
831    fn envsub(input: &str) -> Result<String, EnvSubError> {
832        lazy_static! {
833            // Matches the following patterns with named capture groups:
834            // * '${NAME}' : var = 'NAME'
835            // * '${NAME-default}' : var = 'NAME', def = 'default'
836            // * '${NAME:-default}' : var = 'NAME', def = 'default'
837            static ref ENVSUB_RE: Regex =
838                Regex::new(r"\$\{(?P<var>[a-zA-Z_][a-zA-Z0-9_]*)(:?-(?P<def>.*?))?\}")
839                    .expect("Could not construct envsub Regex");
840        }
841
842        replace_all(&ENVSUB_RE, input, |caps: &Captures| {
843            // SAFETY: the regex requires a match for capture group 'var'
844            let env_var = &caps["var"];
845            match env::var(env_var) {
846                Ok(env_val_val) => Ok(env_val_val),
847                Err(env::VarError::NotUnicode(_)) => {
848                    Err(EnvSubError::EnvVarNotUnicode(env_var.to_owned()))
849                }
850                Err(env::VarError::NotPresent) => {
851                    // Use the default value if one was provided
852                    if let Some(def) = caps.name("def") {
853                        Ok(def.as_str().to_string())
854                    } else {
855                        Err(EnvSubError::EnvVarNotPresent(env_var.to_owned()))
856                    }
857                }
858            }
859        })
860    }
861
862    // This is essentially a fallible version of Regex::replace_all
863    fn replace_all(
864        re: &Regex,
865        input: &str,
866        replacement: impl Fn(&Captures) -> Result<String, EnvSubError>,
867    ) -> Result<String, EnvSubError> {
868        let mut new = String::with_capacity(input.len());
869        let mut last_match = 0;
870        for caps in re.captures_iter(input) {
871            let m = caps.get(0).unwrap();
872            new.push_str(&input[last_match..m.start()]);
873            new.push_str(&replacement(&caps)?);
874            last_match = m.end();
875        }
876        new.push_str(&input[last_match..]);
877        Ok(new)
878    }
879}
880
881#[derive(Debug, Error)]
882pub enum ConfigWriteError {
883    #[error("TOML serialization error.")]
884    Toml(#[from] toml::ser::Error),
885
886    #[error("IO error")]
887    Io(#[from] std::io::Error),
888}
889
890#[derive(Debug, Error)]
891pub enum ConfigLoadError {
892    #[error("Error in config file {} relating to TOML parsing. {error}", .path.display())]
893    ConfigFileToml {
894        path: PathBuf,
895        #[source]
896        error: toml::de::Error,
897    },
898    #[allow(unused)]
899    #[error("Error in config content relating to TOML parsing. {error}")]
900    ConfigToml {
901        #[source]
902        error: toml::de::Error,
903    },
904
905    #[error("IO Error")]
906    Io(#[from] std::io::Error),
907
908    #[error("Error in config content relating to semantics. {explanation}")]
909    DefinitionSemantics { explanation: String },
910}
911
912pub fn try_from_file(path: &Path) -> Result<refined::Config, ConfigLoadError> {
913    let content = &std::fs::read_to_string(path)?;
914    let partial: raw_toml::Config =
915        toml::from_str(content).map_err(|e| ConfigLoadError::ConfigFileToml {
916            path: path.to_owned(),
917            error: e,
918        })?;
919    let r: Result<refined::Config, SemanticErrorExplanation> = partial.try_into();
920    r.map_err(|semantics| ConfigLoadError::DefinitionSemantics {
921        explanation: semantics.0,
922    })
923}
/// Parse and validate a reflector configuration held in memory (test builds only).
///
/// Behaves like `try_from_file`, except that TOML failures are reported as
/// `ConfigLoadError::ConfigToml`, since there is no file path to attach.
#[cfg(test)]
pub fn try_from_str(content: &str) -> Result<refined::Config, ConfigLoadError> {
    let raw: raw_toml::Config =
        toml::from_str(content).map_err(|error| ConfigLoadError::ConfigToml { error })?;
    let outcome: Result<refined::Config, SemanticErrorExplanation> = raw.try_into();
    outcome.map_err(|e| ConfigLoadError::DefinitionSemantics { explanation: e.0 })
}
933
934pub fn try_to_file(config: &refined::Config, path: &Path) -> Result<(), ConfigWriteError> {
935    let content = try_to_string(config)?;
936    std::fs::write(path, content)?;
937    Ok(())
938}
939
940pub fn try_to_string(config: &refined::Config) -> Result<String, ConfigWriteError> {
941    let raw: raw_toml::Config = config.clone().into();
942    // Slightly unexpected detour through toml::Value to work around some
943    // of the toml crate's touchy handling of the order of serialization of
944    // fields.
945    let toml_value = toml::Value::try_from(raw)?;
946    let content = toml::to_string_pretty(&toml_value)?;
947    Ok(content)
948}
949
#[cfg(test)]
mod tests {
    use crate::{try_from_str, try_to_string, AttrKeyEqValuePair, ConfigLoadError};
    use modality_api::types::AttrKey;

    /// Note that this toml example is not nearly as compact as it could be
    /// with shorthand choices that will still parse equivalently.
    /// The current shape is meant to appease the toml pretty-printer for
    /// round-trip completeness testing.
    const FULLY_FILLED_IN_TOML: &str = r#"[ingest]
additional-timeline-attributes = [
    'a = 1',
    'b = "foo"',
]
override-timeline-attributes = ['c = true']
protocol-child-port = 9079
protocol-parent-url = 'modality-ingest://auxon.io:9077'

[metadata]
bag = 42
grab = 24

[mutation]
additional-mutator-attributes = [
    'd = 100',
    'e = "oof"',
]
external-mutator-urls = ['http://some-other-process.com:8080/']
mutator-http-api-port = 9059
override-mutator-attributes = ['f = false']
protocol-child-port = 9080
protocol-parent-url = 'modality-ingest://localhost:9078'

[plugins]
plugins-dir = 'path/to/custom/plugins/dir'

[plugins.available-ports]
any-local = false
ranges = [
    [
    9081,
    9097,
],
    [
    10123,
    10123,
],
]
[plugins.ingest.collectors.lttng-live]
additional-timeline-attributes = [
    'a = 2',
    'r = 3',
]
override-timeline-attributes = [
    'c = false',
    'q = 99',
]
shutdown-signal = 'SIGINT'
shutdown-timeout-millis = 1000

[plugins.ingest.collectors.lttng-live.metadata]
all-the-custom = true
bag = 41
[plugins.ingest.importers.csv-yolo]
additional-timeline-attributes = ['s = 4']
override-timeline-attributes = ['t = "five"']

[plugins.ingest.importers.csv-yolo.metadata]
other-custom = 'yup'
[plugins.mutation.mutators.linux-network]
additional-mutator-attributes = ['u = "six"']
override-mutator-attributes = ['v = 7']

[plugins.mutation.mutators.linux-network.metadata]
moar-custom = [
    'ynot',
    'structured',
    2,
]
"#;
    /// Parsing into the raw representation and pretty-printing it again must
    /// reproduce the input text exactly.
    #[test]
    fn raw_representation_round_trip() {
        let raw: crate::raw_toml::Config = toml::from_str(FULLY_FILLED_IN_TOML).unwrap();
        let back_out = crate::raw_toml::try_raw_to_string_pretty(&raw).unwrap();
        assert_eq!(FULLY_FILLED_IN_TOML, back_out.as_str());
    }

    /// Refining, serializing and re-refining must yield an equal config, and the
    /// serialized text must match the input exactly.
    #[test]
    fn refined_representation_round_trip() {
        let refined: crate::refined::Config = try_from_str(FULLY_FILLED_IN_TOML).unwrap();
        let back_out = try_to_string(&refined).unwrap();
        let refined_prime: crate::refined::Config = try_from_str(&back_out).unwrap();
        assert_eq!(refined, refined_prime);
        assert_eq!(FULLY_FILLED_IN_TOML, back_out.as_str());
    }

    /// An empty document is a valid config and serializes back to empty text.
    #[test]
    fn everything_is_optional() {
        let empty = "";
        let refined: crate::refined::Config = try_from_str(empty).unwrap();
        let back_out = try_to_string(&refined).unwrap();
        let refined_prime: crate::refined::Config = try_from_str(&back_out).unwrap();
        assert_eq!(refined, refined_prime);
        assert_eq!(empty, back_out.as_str());
    }

    /// Both `-` and `:-` defaults are substituted for unset variables before the
    /// attribute key/value text is parsed.
    // Assumes the NOT_SET_* variables are not set in the test environment.
    #[test]
    fn attr_kv_envsub_defaults() {
        let toml = r#"
[ingest]
additional-timeline-attributes = [
    '${NOT_SET_KEY:-foo} = ${NOT_SET_VAL-1}',
    '${NOT_SET_KEY-bar} = "${NOT_SET_VAL:-foo}"',
    '${NOT_SET_KEY-abc} = ${NOT_SET_VAL:-true}',
]"#;
        let cfg: crate::refined::Config = try_from_str(toml).unwrap();
        let attrs = cfg
            .ingest
            .map(|i| i.timeline_attributes.additional_timeline_attributes)
            .unwrap();
        assert_eq!(
            attrs,
            vec![
                AttrKeyEqValuePair(AttrKey::new("foo".to_string()), 1_i64.into()),
                AttrKeyEqValuePair(AttrKey::new("bar".to_string()), "foo".into()),
                AttrKeyEqValuePair(AttrKey::new("abc".to_string()), true.into()),
            ]
        );
    }

    /// Set variables are substituted. This relies on cargo exposing the
    /// CARGO_PKG_* variables to the test process at runtime, matching the
    /// compile-time `env!` values.
    #[test]
    fn attr_kv_envsub() {
        let toml = r#"
[ingest]
additional-timeline-attributes = [
    '${CARGO_PKG_NAME} = "${CARGO_PKG_VERSION}"',
    'int_key = ${CARGO_PKG_VERSION_MINOR}',
]"#;
        let cfg: crate::refined::Config = try_from_str(toml).unwrap();
        let attrs = cfg
            .ingest
            .map(|i| i.timeline_attributes.additional_timeline_attributes)
            .unwrap();
        assert_eq!(
            attrs,
            vec![
                AttrKeyEqValuePair(
                    AttrKey::new(env!("CARGO_PKG_NAME").to_string()),
                    env!("CARGO_PKG_VERSION").into()
                ),
                AttrKeyEqValuePair(
                    AttrKey::new("int_key".to_string()),
                    env!("CARGO_PKG_VERSION_MINOR")
                        .parse::<i64>()
                        .unwrap()
                        .into()
                ),
            ]
        );
    }

    /// An unset variable without a default surfaces as a `DefinitionSemantics`
    /// error that names the variable.
    #[test]
    fn attr_kv_envsub_errors() {
        let toml = r#"
[ingest]
additional-timeline-attributes = [
    '${NOT_SET_KEY} = 1',
]"#;
        match try_from_str(toml).unwrap_err() {
            ConfigLoadError::DefinitionSemantics { explanation } => {
                assert_eq!(explanation, "Error in additional-timeline-attributes member. The environment variable 'NOT_SET_KEY' is not set and no default value is specified".to_string())
            }
            // Report what actually came back instead of an anonymous panic.
            other => panic!("expected ConfigLoadError::DefinitionSemantics, got {other:?}"),
        }
    }
}