auxon_sdk/plugin_utils/
config.rs

1//! A one-stop shop for reflector plugin configuration.  Automatically
2//! handles configuration from a reflector config file (both running
3//! standalone and as a managed child of the reflector) and
4//! environment variables, via the
5//! [envy](https://docs.rs/envy/latest/envy/) crate.
6//!
7//! All environment variable settings take precedence over config file
8//! settings.
9//!
//! Standard environment variable overrides are automatically
//! processed as well:
12//!
13//! * `MODALITY_REFLECTOR_CONFIG` indicates the path to a toml
14//!   formatted reflector config file, which is read if given.
15//!
16//! * `MODALITY_AUTH_TOKEN` sets the authentication token to use
17//!   for the backend connection. If not given, it is read from
18//!   the user profile directory
19//!
20//! * `MODALITY_CLIENT_TIMEOUT` Backend connection timeout, in
21//!   seconds. Defaults to 1 second if not given.
22//!
23//! * `MODALITY_RUN_ID` is attached as the `timeline.run_id` attribute
24//!   to all timelines; a uuid is generated if not given.
25//!
//! * `MODALITY_TIME_DOMAIN` is attached as the `timeline.time_domain`
//!   attribute to all timelines, if given.
28//!
29//! * `MODALITY_INGEST_URL`: The modality-ingest connection url
30//!   where the client will try to connect. If not given, falls back to a url formed from
31//!   `MODALITY_HOST`, or else `modality-ingest://localhost`.
32//!
33//! * `MODALITY_HOST`: The name or ip of the host where modality is
34//!   running. `MODALITY_INGEST_URL` takes precedence over
35//!   this. Defaults to `localhost`. This will connect to localhost
36//!   via plaintext, but use TLS connections and ports when connecting
37//!   to any other host.
38//!
39//! * `ADDITIONAL_TIMELINE_ATTRIBUTES`: A
40//!   comma-separated list of attr=value pairs, which will be attached
41//!   to all timelines.
42//!
43//! * `OVERRIDE_TIMELINE_ATTRIBUTES`: A
44//!   comma-separated list of attr=value pairs, which will be attached
45//!   to all timelines, overriding any other attributes with the same
46//!   names.
47//!
48//! # Example
49//! ```no_run
50//! use auxon_sdk::{init_tracing, api::TimelineId, plugin_utils::config::Config};
51//! use serde::{Serialize, Deserialize};
52//!
53//! #[derive(Serialize, Deserialize, Clone, Debug, Default)]
54//! pub struct MyConfig {
55//!     // This can be set with the MY_PLUGIN_SETTING environment variable
56//!     pub setting: Option<u32>,
57//! }
58//!
59//! #[tokio::main]
60//! async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
61//!   init_tracing!();
62//!   let cfg = Config::<MyConfig>::load("MY_PLUGIN_")?;
63//!   let mut client = cfg.connect_and_authenticate_ingest().await?;
64//!
65//!   let timeline_id = TimelineId::allocate();
66//!   client.switch_timeline(timeline_id).await?;
67//!   client.send_timeline_attrs("tl", [
68//!     ("attr1", 42.into()),
69//!     ("attr2", "hello".into())
70//!   ]).await?;
71//!
72//!   client.send_event("ev", 0, [
73//!     ("attr1", 42.into()),
74//!     ("attr2", "hello".into())
75//!   ]).await?;
76//!
77//!   Ok(())
78//! }
79//! ```
80
81use crate::{
82    auth_token::AuthToken,
83    ingest_client::IngestClient,
84    reflector_config::{
85        AttrKeyEqValuePair, ConfigLoadError, SemanticErrorExplanation, TomlValue, TopLevelIngest,
86        TopLevelMutation, CONFIG_ENV_VAR,
87    },
88};
89use serde::{de::DeserializeOwned, Deserialize, Serialize};
90use std::{
91    collections::BTreeMap,
92    env,
93    path::{Path, PathBuf},
94    str::FromStr as _,
95    time::Duration,
96};
97use url::Url;
98
/// Plugin configuration structure; contains both common elements, and
/// plugin-specific elements, based on the type param `T`.
///
/// Values are produced by [Config::load], [Config::load_custom] or
/// [Config::load_common], which merge the reflector config file (if any)
/// with environment variable overrides.
pub struct Config<T> {
    /// Common ingest configuration; mostly connection-related.
    pub ingest: TopLevelIngest,

    /// Common mutation configuration; mostly connection-related.
    pub mutation: TopLevelMutation,

    /// The plugin-specific portion of the configuration.
    pub plugin: T,

    /// The client connection timeout. This is automatically used when
    /// you call [Config::connect_and_authenticate_ingest], which falls
    /// back to one second when this is `None`. Populated from the
    /// `MODALITY_CLIENT_TIMEOUT` environment variable.
    pub client_timeout: Option<Duration>,

    /// `timeline.run_id` will be set to this value for all created
    /// timelines. Taken from `MODALITY_RUN_ID`, or a freshly generated
    /// uuid when that variable is not set.
    pub run_id: String,

    /// If `Some(...)`, `timeline.time_domain` will be set to this value
    /// for all created timelines. Taken from `MODALITY_TIME_DOMAIN`.
    pub time_domain: Option<String>,
}
123
/// Common settings that are only available through the environment.
/// Deserialized with `envy::from_env`, so each field is read from the
/// environment variable matching its upper-cased name.
#[derive(Deserialize)]
struct EnvConfig {
    // MODALITY_CLIENT_TIMEOUT environment variable; a (possibly
    // fractional) number of seconds.
    modality_client_timeout: Option<f32>,

    // MODALITY_RUN_ID environment variable
    modality_run_id: Option<String>,

    // MODALITY_TIME_DOMAIN environment variable
    modality_time_domain: Option<String>,
}
135
136impl Config<()> {
137    /// Load common config only. This is useful if you're writing a plugin
138    /// with no specific configuartion options, or if you're connecting
139    /// from a context other than a reflector plugin.
140    pub fn load_common() -> Result<Config<()>, Box<dyn std::error::Error + Send + Sync>> {
141        Self::load_custom("__NONE__", |_, _| Ok(None))
142    }
143}
144
145impl<T: Serialize + DeserializeOwned> Config<T> {
146    /// Load configuration from config file given in
147    /// `MODALITY_REFLECTOR_CONFIG` as well as from other environment
148    /// variables (see module documentation). The returned [Config]
149    /// structure represents the fully reconcicled configuration.
150    ///
151    /// * `env_prefix`: The prefix used for environment variable based
152    ///   settings for members of the configuration struct (type
153    ///   param `T`).
154    pub fn load(env_prefix: &str) -> Result<Config<T>, Box<dyn std::error::Error + Send + Sync>> {
155        Self::load_custom(env_prefix, |_, _| Ok(None))
156    }
157
158    /// Load configuration, like [Config::load], but allows passing a
159    /// `map_env_val` hook.  This can be used to implement
160    /// non-standard environment deserialization, for value types
161    /// which aren't correctly handled by the [envy](https://docs.rs/envy/latest/envy/) crate.
162    ///
163    /// * `map_env_val`: A function which will be called for every
164    ///   environment variable. If it returns `Ok(Some((key, toml_value)))`,
165    ///   a corresponding entry will be created in the
166    ///   `metadata` toml table, which is then deserialized to the
167    ///   custom config structure (type param `T`). This intermediate
168    ///   form is used as a basis for merging values from the config
169    ///   file and from the environment. Since this function returns
170    ///   environment-provided values, they take precedence over the
171    ///   config file.
172    ///
173    ///   For example:
174    ///   ```
175    ///   fn custom_map_val(env_key: &str, env_val: &str) -> Result<Option<(String, toml::Value)>, Box<dyn std::error::Error + Send + Sync>> {
176    ///     // look for MY_PLUGIN_PREFIX_STRONGLY_ENCRYPTED_PASSWORD env var
177    ///     if env_key == "STRONGLY_ENCRYPTED_PASSWORD" {
178    ///       Ok(Some(("password".to_string(), toml::Value::String(env_val.to_owned()))))
179    ///     } else {
180    ///       // All other env vars use default deserialization
181    ///       Ok(None)
182    ///     }
183    ///   }
184    ///   ```
185    pub fn load_custom(
186        env_prefix: &str,
187        map_env_val: impl Fn(
188            &str,
189            &str,
190        ) -> Result<
191            Option<(String, TomlValue)>,
192            Box<dyn std::error::Error + Send + Sync>,
193        >,
194    ) -> Result<Config<T>, Box<dyn std::error::Error + Send + Sync>> {
195        let mut cfg = None;
196
197        // load from MODALITY_REFLECTOR_CONFIG
198        if let Ok(env_path) = env::var(CONFIG_ENV_VAR) {
199            let path = Path::new(&env_path);
200
201            // Look at the file content to determine which section should be used.
202            let content = &std::fs::read_to_string(path)?;
203            let mut raw_toml: crate::reflector_config::raw_toml::Config =
204                toml::from_str(content).map_err(|e| ConfigLoadError::ConfigFileToml {
205                    path: path.to_owned(),
206                    error: e,
207                })?;
208
209            // The 'metadata' entry is set up by the reflector on behalf of whatever plugin it's running,
210            // so prefer it if it's present.
211            if raw_toml.metadata.is_empty() {
212                // if not, find the right plugin section and copy it to the top level
213                copy_relevant_plugin_section_to_top_level_metadata(&mut raw_toml)?;
214            }
215
216            let r: Result<crate::reflector_config::Config, SemanticErrorExplanation> =
217                raw_toml.try_into();
218            cfg = Some(r.map_err(|semantics| ConfigLoadError::DefinitionSemantics {
219                explanation: semantics.0,
220            })?);
221        }
222
223        let cfg = cfg.unwrap_or_default();
224
225        let mut ingest = cfg.ingest.clone().unwrap_or_default();
226        override_ingest_config_from_env(&mut ingest)?;
227
228        let mut mutation = cfg.mutation.clone().unwrap_or_default();
229        override_mutation_config_from_env(&mut mutation)?;
230
231        let env_config = envy::from_env::<EnvConfig>()?;
232
233        // Load plugin-specific config from the 'metdata' entry
234        let mut plugin_toml = cfg.metadata.clone();
235        merge_plugin_config_from_env::<T>(env_prefix, map_env_val, &mut plugin_toml)?;
236
237        // deserialize from merged toml values to the actual struct
238        let plugin: T = TomlValue::Table(plugin_toml.into_iter().collect()).try_into()?;
239
240        // syntheisze a uuid runid if none was given
241        let run_id = env_config
242            .modality_run_id
243            .unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
244
245        let client_timeout = env_config
246            .modality_client_timeout
247            .map(Duration::from_secs_f32);
248
249        Ok(Config {
250            ingest,
251            mutation,
252            plugin,
253            client_timeout,
254            run_id,
255            time_domain: env_config.modality_time_domain,
256        })
257    }
258
259    #[deprecated = "Prefer the more explicit 'connect_and_authenticate_ingest'"]
260    pub async fn connect_and_authenticate(
261        &self,
262    ) -> Result<super::ingest::Client, Box<dyn std::error::Error + Send + Sync>> {
263        self.connect_and_authenticate_ingest().await
264    }
265
266    /// Connect to the configured Modality backend for ingest,
267    /// authenticate, and return a high-level ingest client.
268    pub async fn connect_and_authenticate_ingest(
269        &self,
270    ) -> Result<super::ingest::Client, Box<dyn std::error::Error + Send + Sync>> {
271        let protocol_parent_url = if let Some(url) = &self.ingest.protocol_parent_url {
272            url.clone()
273        } else {
274            Url::parse("modality-ingest://127.0.0.1")?
275        };
276
277        // load from MODALITY_AUTH_TOKEN or from the user profile
278        let auth_token = AuthToken::load()?;
279
280        let client = IngestClient::connect_with_timeout(
281            &protocol_parent_url,
282            self.ingest.allow_insecure_tls,
283            self.client_timeout
284                .unwrap_or_else(|| Duration::from_secs(1)),
285        )
286        .await?
287        .authenticate(auth_token.into())
288        .await?;
289
290        Ok(super::ingest::Client::new(
291            client,
292            self.ingest.timeline_attributes.clone(),
293            Some(self.run_id.clone()),
294            self.time_domain.clone(),
295        )
296        .await?)
297    }
298
299    /// Connect to the configured Modality backend for mutation,
300    /// authenticate, and return a connected MutatorHost.
301    ///
302    /// This also creates an ingest connection, which is used
303    /// internally to log mutation-related events.
304    #[cfg(feature = "deviant")]
305    pub async fn connect_and_authenticate_mutation(
306        &self,
307    ) -> Result<super::mutation::MutatorHost, Box<dyn std::error::Error + Send + Sync>> {
308        let ingest = self.connect_and_authenticate_ingest().await?;
309
310        // TODO handle top level mutator_attributes configuration
311        let protocol_parent_url = if let Some(url) = &self.mutation.protocol_parent_url {
312            url.clone()
313        } else {
314            Url::parse("modality-mutation://127.0.0.1")?
315        };
316
317        // load from MODALITY_AUTH_TOKEN or from the user profile
318        let auth_token = AuthToken::load()?;
319
320        let client = super::mutation::MutatorHost::connect_and_authenticate(
321            &protocol_parent_url,
322            self.mutation.allow_insecure_tls,
323            auth_token,
324            Some(ingest),
325        )
326        .await?;
327
328        Ok(client)
329    }
330}
331
332/// We don't have a 'metadata' section, so we might be dealing with a reflector-style config file. Here we pull the confiruation from
333/// one of the 'plugin.*' sections based on sniffing the executable name, and put that data in the 'metadata' section, so the caller
334/// can find it all in that one place.
335fn copy_relevant_plugin_section_to_top_level_metadata(
336    raw_toml: &mut crate::reflector_config::raw_toml::Config,
337) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
338    if let Some(plugins) = &raw_toml.plugins {
339        let file_stem = AliasablePluginFileStem::for_current_process()?;
340
341        if let Some(ingest) = &plugins.ingest {
342            let plugins_ingest_member = if file_stem.looks_like_collector() {
343                ingest
344                    .find_collector_member_by_plugin_name(file_stem.as_str()) // toml section: plugins.ingest.collectors.<full bin filename>
345                    .or_else(|| ingest.find_collector_member_by_plugin_name(file_stem.alias()))
346            // toml section: plugins.ingest.collectors.<stem filename>
347            } else if file_stem.looks_like_importer() {
348                ingest
349                    .find_importer_member_by_plugin_name(file_stem.as_str()) // toml section: plugins.ingest.importers.<full bin filename>
350                    .or_else(|| ingest.find_importer_member_by_plugin_name(file_stem.alias()))
351            // toml section: plugins.ingest.importers.<stem filename>
352            } else {
353                None
354            };
355
356            if let Some(pim) = plugins_ingest_member {
357                // If we identified a named toml entry, merge it in to the top level as 'metadata'.
358                raw_toml.metadata = pim.metadata.clone();
359
360                if raw_toml.ingest.is_none() {
361                    raw_toml.ingest = Some(Default::default());
362                }
363                raw_toml.ingest.as_mut().unwrap().timeline_attributes =
364                    pim.timeline_attributes.clone();
365            }
366        } else if let Some(mutation) = plugins.mutation.as_ref() {
367            let mutations_ingest_member = if file_stem.looks_like_mutator() {
368                mutation
369                    .find_mutator_member_by_plugin_name(file_stem.as_str()) // toml section: plugins.mutation.mutators.<full bin filename>
370                    .or_else(|| mutation.find_mutator_member_by_plugin_name(file_stem.alias()))
371            // toml section: plugins.mutation.mutators.<stem filename>
372            } else {
373                None
374            };
375
376            if let Some(mim) = mutations_ingest_member {
377                // If we identified a named toml entry, merge it in to the top level as 'metadata'.
378                raw_toml.metadata = mim.metadata.clone();
379
380                if raw_toml.ingest.is_none() {
381                    raw_toml.ingest = Some(Default::default());
382                }
383            }
384        }
385    }
386
387    Ok(())
388}
389
390/// Merge plugin-specific configuration values from environment
391/// variables into the plugin_toml table from the config file (could
392/// be an empty table, if no config file was given).
393fn merge_plugin_config_from_env<T: Serialize + DeserializeOwned>(
394    env_prefix: &str,
395    map_env_val: impl Fn(
396        &str,
397        &str,
398    ) -> Result<
399        Option<(String, TomlValue)>,
400        Box<dyn std::error::Error + Send + Sync>,
401    >,
402    plugin_toml: &mut BTreeMap<String, TomlValue>,
403) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
404    let mut auto_vars = vec![];
405    for (k, v) in env::vars() {
406        let Some(k) = k.strip_prefix(env_prefix) else {
407            continue;
408        };
409
410        if let Some((k, toml_val)) = map_env_val(k, &v)? {
411            plugin_toml.insert(k.to_string(), toml_val);
412            continue;
413        } else {
414            auto_vars.push((k.to_string(), v));
415        }
416    }
417
418    let env_config = envy::from_iter::<_, T>(auto_vars.into_iter())?;
419    let env_config_as_toml_str = toml::to_string(&env_config)?;
420    let env_config_as_toml: BTreeMap<String, TomlValue> = toml::from_str(&env_config_as_toml_str)?;
421
422    plugin_toml.extend(env_config_as_toml);
423    Ok(())
424}
425
/// Ingest-related overrides read from the environment. Deserialized
/// with `envy::from_env`, so each field is read from the environment
/// variable matching its upper-cased name.
#[derive(Deserialize)]
struct IngestEnvOverrides {
    // MODALITY_INGEST_URL environment variable
    modality_ingest_url: Option<Url>,

    // MODALITY_HOST environment variable
    modality_host: Option<String>,

    // MODALITY_ALLOW_INSECURE_TLS environment variable
    modality_allow_insecure_tls: Option<bool>,

    // INGEST_PROTOCOL_CHILD_PORT environment variable (note: the field
    // has no `modality_` prefix).
    // NOTE(review): this comment used to name
    // MODALITY_REFLECTOR_PROTOCOL_CHILD_PORT, which this field does not
    // read — confirm which variable is intended.
    ingest_protocol_child_port: Option<u16>,

    // ADDITIONAL_TIMELINE_ATTRIBUTES environment variable
    // (comma-separated attr=value pairs)
    additional_timeline_attributes: Option<Vec<String>>,

    // OVERRIDE_TIMELINE_ATTRIBUTES environment variable
    // (comma-separated attr=value pairs)
    override_timeline_attributes: Option<Vec<String>>,
}
446
447fn override_ingest_config_from_env(
448    ingest: &mut TopLevelIngest,
449) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
450    let ingest_env_overrides = envy::from_env::<IngestEnvOverrides>()?;
451    if let Some(u) = ingest_env_overrides.modality_ingest_url {
452        ingest.protocol_parent_url = Some(u);
453    } else if ingest.protocol_parent_url.is_none() {
454        if let Some(host) = ingest_env_overrides.modality_host {
455            let scheme = if host == "localhost" {
456                "modality-ingest"
457            } else {
458                "modality-ingest-tls"
459            };
460            ingest.protocol_parent_url =
461                Some(url::Url::parse(&format!("{scheme}://{host}")).map_err(|e| e.to_string())?);
462        }
463    }
464    if let Some(b) = ingest_env_overrides.modality_allow_insecure_tls {
465        ingest.allow_insecure_tls = b;
466    }
467    if let Some(p) = ingest_env_overrides.ingest_protocol_child_port {
468        ingest.protocol_child_port = Some(p);
469    }
470
471    if let Some(strs) = ingest_env_overrides.additional_timeline_attributes {
472        for s in strs {
473            let kvp = AttrKeyEqValuePair::from_str(&s)?;
474            ingest
475                .timeline_attributes
476                .additional_timeline_attributes
477                .push(kvp);
478        }
479    }
480
481    if let Some(strs) = ingest_env_overrides.override_timeline_attributes {
482        for s in strs {
483            let kvp = AttrKeyEqValuePair::from_str(&s)?;
484            ingest
485                .timeline_attributes
486                .override_timeline_attributes
487                .push(kvp);
488        }
489    }
490
491    Ok(())
492}
493
/// Mutation-related overrides read from the environment. Deserialized
/// with `envy::from_env`, so each field is read from the environment
/// variable matching its upper-cased name.
#[derive(Deserialize)]
struct MutationEnvOverrides {
    // MODALITY_INGEST_URL environment variable; used to derive the
    // mutation url when MODALITY_MUTATION_URL is not given.
    modality_ingest_url: Option<Url>,

    // MODALITY_MUTATION_URL environment variable
    modality_mutation_url: Option<Url>,

    // MODALITY_HOST environment variable
    modality_host: Option<String>,

    // MODALITY_ALLOW_INSECURE_TLS environment variable
    modality_allow_insecure_tls: Option<bool>,

    // ADDITIONAL_MUTATOR_ATTRIBUTES environment variable
    // (list of attr=value pairs)
    additional_mutator_attributes: Option<Vec<String>>,
}
511
512fn override_mutation_config_from_env(
513    mutation: &mut TopLevelMutation,
514) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
515    let mutation_env_overrides = envy::from_env::<MutationEnvOverrides>()?;
516    if let Some(u) = mutation_env_overrides.modality_mutation_url {
517        mutation.protocol_parent_url = Some(u);
518    } else if let Some(u) = mutation_env_overrides.modality_ingest_url {
519        let scheme = if u.scheme() == "modality-ingest-tls" {
520            "modality-mutation-tls"
521        } else {
522            "modality-mutation"
523        };
524        let host = u
525            .host()
526            .ok_or_else(|| "Ingest url must have a host component".to_string())?;
527        mutation.protocol_parent_url =
528            Some(url::Url::parse(&format!("{scheme}://{host}")).map_err(|e| e.to_string())?);
529    } else if mutation.protocol_parent_url.is_none() {
530        if let Some(host) = mutation_env_overrides.modality_host {
531            let scheme = if host == "localhost" {
532                "modality-mutation"
533            } else {
534                "modality-mutation-tls"
535            };
536            mutation.protocol_parent_url =
537                Some(url::Url::parse(&format!("{scheme}://{host}")).map_err(|e| e.to_string())?);
538        }
539    }
540    if let Some(b) = mutation_env_overrides.modality_allow_insecure_tls {
541        mutation.allow_insecure_tls = b;
542    }
543
544    if let Some(strs) = mutation_env_overrides.additional_mutator_attributes {
545        for s in strs {
546            let kvp = AttrKeyEqValuePair::from_str(&s)?;
547            mutation
548                .mutator_attributes
549                .additional_mutator_attributes
550                .push(kvp);
551        }
552    }
553
554    Ok(())
555}
556
/// Plugin file name wrapper to allow aliasing (i.e. modality-foo-importer can be referred to with
/// the alias foo).
/// Supports our three plugin kind postfixes in a few different variants:
///   - collector\[s\]
///   - importer\[s\] | import
///   - mutator\[s\]
///
/// Also supports no postfix at all, since kind is implied by the plugin directory it lives in.
#[derive(Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Debug)]
struct AliasablePluginFileStem {
    /// Final component of `path`, lossily converted to UTF-8.
    filename: String,
    /// The path the wrapper was created from.
    path: PathBuf,
}

impl AliasablePluginFileStem {
    /// Build the wrapper for the currently running executable.
    #[cfg(not(test))]
    pub fn for_current_process() -> Result<Self, Box<dyn std::error::Error + Send + Sync>> {
        let exe = std::env::current_exe()?;
        Self::for_path(exe)
    }

    /// Test-build variant: the `TEST_CURRENT_EXE_PATH` environment variable, when set,
    /// stands in for the path of the running executable.
    #[cfg(test)]
    pub fn for_current_process() -> Result<Self, Box<dyn std::error::Error + Send + Sync>> {
        match std::env::var("TEST_CURRENT_EXE_PATH") {
            Ok(overridden) => Self::for_path(overridden),
            Err(_) => Self::for_path(std::env::current_exe()?),
        }
    }

    /// Build the wrapper for an arbitrary path. Fails if the path has no final file name
    /// component (for example `/` or a path ending in `..`).
    pub fn for_path(p: impl AsRef<Path>) -> Result<Self, Box<dyn std::error::Error + Send + Sync>> {
        let path = p.as_ref().to_path_buf();
        let Some(name) = path.file_name() else {
            return Err("Plugin does not refer to a file".into());
        };
        let filename = name.to_string_lossy().into_owned();
        Ok(Self { filename, path })
    }

    /// The short name of the plugin: the file name with every leading `modality-` removed,
    /// then each plugin-kind postfix trimmed from the end, in the order listed below.
    pub fn alias(&self) -> &str {
        let mut alias = self.filename.trim_start_matches("modality-");
        for postfix in [
            "-import",
            "-importer",
            "-importers",
            "-collector",
            "-collectors",
            "-mutator",
            "-mutators",
        ] {
            alias = alias.trim_end_matches(postfix);
        }
        alias
    }

    /// True if the file name carries an importer postfix, or the file sits directly in a
    /// directory named `importers`.
    pub fn looks_like_importer(&self) -> bool {
        self.has_kind(&["-import", "-importer", "-importers"], "importers")
    }

    /// True if the file name carries a collector postfix, or the file sits directly in a
    /// directory named `collectors`.
    pub fn looks_like_collector(&self) -> bool {
        self.has_kind(&["-collector", "-collectors"], "collectors")
    }

    /// True if the file name carries a mutator postfix, or the file sits directly in a
    /// directory named `mutators`.
    #[allow(unused)]
    pub fn looks_like_mutator(&self) -> bool {
        self.has_kind(&["-mutator", "-mutators"], "mutators")
    }

    /// The complete file name, without any aliasing applied.
    pub fn as_str(&self) -> &str {
        self.filename.as_ref()
    }

    /// Shared kind check: the file name ends with one of `postfixes`, or the last component
    /// of the parent directory is exactly `kind_dir`.
    fn has_kind(&self, postfixes: &[&str], kind_dir: &str) -> bool {
        if postfixes
            .iter()
            .any(|postfix| self.filename.ends_with(*postfix))
        {
            return true;
        }

        match self.path.parent().and_then(Path::file_name) {
            Some(dir_name) => dir_name == kind_dir,
            None => false,
        }
    }
}
647
648#[cfg(test)]
649mod tests {
650    use super::*;
651    use crate::api::{AttrKey, AttrVal};
652    use std::io::Write;
653
    /// Test shorthand: build an `AliasablePluginFileStem` for `p`,
    /// panicking if the path has no file name.
    fn apfs(p: impl AsRef<Path>) -> AliasablePluginFileStem {
        AliasablePluginFileStem::for_path(p).unwrap()
    }
657
    /// Assert that the plugin at `path` has the alias `expected`.
    /// `#[track_caller]` makes a failure point at the calling line.
    #[track_caller]
    fn check_alias(path: &str, expected: &str) {
        assert_eq!(expected, apfs(path).alias());
    }
662
663    #[test]
664    fn plugin_alias() {
665        check_alias("/modality-foo", "foo");
666        check_alias("/dir/modality-foo", "foo");
667        check_alias("/dir/foo-import", "foo");
668        check_alias("/dir/foo-importer", "foo");
669        check_alias("/dir/foo-importers", "foo");
670        check_alias("/dir/foo-collector", "foo");
671        check_alias("/dir/foo-collectors", "foo");
672        check_alias("/dir/foo-mutator", "foo");
673        check_alias("/dir/foo-mutators", "foo");
674        check_alias("/dir/foo", "foo");
675    }
676
677    #[test]
678    fn type_heuristics() {
679        assert!(apfs("/dir/foo-import").looks_like_importer());
680        assert!(apfs("/dir/foo-importer").looks_like_importer());
681        assert!(apfs("/dir/foo-importers").looks_like_importer());
682        assert!(apfs("/dir/importers/foo").looks_like_importer());
683        assert!(!apfs("/dir/collectors/foo").looks_like_importer());
684        assert!(!apfs("/dir/mutators/foo").looks_like_importer());
685        assert!(!apfs("/dir/foo-collector").looks_like_importer());
686        assert!(!apfs("/dir/foo-mutator").looks_like_importer());
687
688        assert!(apfs("/dir/foo-collector").looks_like_collector());
689        assert!(apfs("/dir/foo-collectors").looks_like_collector());
690        assert!(apfs("/dir/collectors/foo").looks_like_collector());
691        assert!(!apfs("/dir/foo").looks_like_collector());
692        assert!(!apfs("/dir/foo-importer").looks_like_collector());
693        assert!(!apfs("/dir/foo-mutator").looks_like_collector());
694        assert!(!apfs("/dir/importers/foo").looks_like_collector());
695        assert!(!apfs("/dir/mutators/foo").looks_like_collector());
696
697        assert!(apfs("/dir/foo-mutator").looks_like_mutator());
698        assert!(apfs("/dir/foo-mutators").looks_like_mutator());
699        assert!(apfs("/dir/mutators/foo").looks_like_mutator());
700        assert!(!apfs("/dir/foo").looks_like_mutator());
701        assert!(!apfs("/dir/foo-collector").looks_like_mutator());
702        assert!(!apfs("/dir/foo-importer").looks_like_mutator());
703        assert!(!apfs("/dir/collectors/foo").looks_like_mutator());
704        assert!(!apfs("/dir/importers/foo").looks_like_mutator());
705    }
706
    /// Minimal plugin-specific config used by the tests below; with the
    /// `TEST_` prefix, `val` is populated from the `TEST_VAL` variable.
    #[derive(Serialize, Deserialize)]
    struct CustomConfig {
        val: Option<u32>,
    }
711
712    fn clear_relevant_env_vars() {
713        env::remove_var("MODALITY_REFLECTOR_CONFIG");
714        env::remove_var("MODALITY_CLIENT_TIMEOUT");
715        env::remove_var("MODALITY_RUN_ID");
716        env::remove_var("MODALITY_HOST");
717        env::remove_var("MODALITY_INGEST_URL");
718        env::remove_var("MODALITY_MUTATION_URL");
719        env::remove_var("ADDITIONAL_TIMELINE_ATTRIBUTES");
720        env::remove_var("OVERRIDE_TIMELINE_ATTRIBUTES");
721        env::remove_var("MODALITY_ALLOW_INSECURE_TLS");
722    }
723
724    #[test]
725    #[serial_test::serial]
726    fn load_config_from_env() {
727        // clear all the relevant env vars
728        env::remove_var("TEST_VAL");
729        clear_relevant_env_vars();
730
731        // With no env shenanigans going on, we should get the default config
732        let cfg = Config::<CustomConfig>::load("TEST_").unwrap();
733        assert_eq!(cfg.ingest, TopLevelIngest::default());
734        assert!(cfg.client_timeout.is_none());
735        assert!(cfg.time_domain.is_none());
736        assert!(cfg.plugin.val.is_none());
737
738        // Load custom val from the environment
739        env::set_var("TEST_VAL", "42");
740        let cfg = Config::<CustomConfig>::load("TEST_").unwrap();
741        assert_eq!(cfg.plugin.val, Some(42));
742        env::remove_var("TEST_VAL");
743
744        // Load client timeout from the environment
745        env::set_var("MODALITY_CLIENT_TIMEOUT", "42");
746        let cfg = Config::<CustomConfig>::load("TEST_").unwrap();
747        assert_eq!(cfg.client_timeout, Some(Duration::from_secs(42)));
748        env::remove_var("MODALITY_CLIENT_TIMEOUT");
749
750        // Load run id from the environment
751        env::set_var("MODALITY_RUN_ID", "42");
752        let cfg = Config::<CustomConfig>::load("TEST_").unwrap();
753        assert_eq!(cfg.run_id, "42");
754        env::remove_var("MODALITY_RUN_ID");
755
756        // Load time domain from the environment
757        env::set_var("MODALITY_TIME_DOMAIN", "42");
758        let cfg = Config::<CustomConfig>::load("TEST_").unwrap();
759        assert_eq!(cfg.time_domain.unwrap(), "42");
760        env::remove_var("MODALITY_TIME_DOMAIN");
761
762        // Load reflector protocol parent url from the environment
763        env::set_var("MODALITY_INGEST_URL", "modality-ingest://foo");
764        let cfg = Config::<CustomConfig>::load("TEST_").unwrap();
765        assert_eq!(
766            cfg.ingest.protocol_parent_url,
767            Url::parse("modality-ingest://foo").ok()
768        );
769        assert_eq!(
770            cfg.mutation.protocol_parent_url,
771            Url::parse("modality-mutation://foo").ok()
772        );
773        env::remove_var("MODALITY_INGEST_URL");
774
775        // When it's TLS, mutation url should follow
776        env::set_var("MODALITY_INGEST_URL", "modality-ingest-tls://foo");
777        let cfg = Config::<CustomConfig>::load("TEST_").unwrap();
778        assert_eq!(
779            cfg.ingest.protocol_parent_url,
780            Url::parse("modality-ingest-tls://foo").ok()
781        );
782        assert_eq!(
783            cfg.mutation.protocol_parent_url,
784            Url::parse("modality-mutation-tls://foo").ok()
785        );
786        env::remove_var("MODALITY_INGEST_URL");
787
788        // Load host from environment
789        env::set_var("MODALITY_HOST", "foo");
790        let cfg = Config::<CustomConfig>::load("TEST_").unwrap();
791        assert_eq!(
792            cfg.ingest.protocol_parent_url,
793            Url::parse("modality-ingest-tls://foo").ok()
794        );
795        assert_eq!(
796            cfg.mutation.protocol_parent_url,
797            Url::parse("modality-mutation-tls://foo").ok()
798        );
799        env::remove_var("MODALITY_HOST");
800
801        // Load host from environment
802        env::set_var("MODALITY_HOST", "localhost");
803        let cfg = Config::<CustomConfig>::load("TEST_").unwrap();
804        assert_eq!(
805            cfg.ingest.protocol_parent_url,
806            Url::parse("modality-ingest://localhost").ok()
807        );
808        assert_eq!(
809            cfg.mutation.protocol_parent_url,
810            Url::parse("modality-mutation://localhost").ok()
811        );
812        env::remove_var("MODALITY_HOST");
813
814        // reflector protocol parent url takes precedence over host
815        env::set_var("MODALITY_INGEST_URL", "modality-ingest://foo");
816        env::set_var("MODALITY_HOST", "bar");
817        let cfg = Config::<CustomConfig>::load("TEST_").unwrap();
818        assert_eq!(
819            cfg.ingest.protocol_parent_url,
820            Url::parse("modality-ingest://foo").ok()
821        );
822        env::remove_var("MODALITY_HOST");
823        env::remove_var("MODALITY_INGEST_URL");
824
825        // Load additional timeline attrs from environment
826        env::set_var("ADDITIONAL_TIMELINE_ATTRIBUTES", "foo=42,bar='yo'");
827        env::set_var("OVERRIDE_TIMELINE_ATTRIBUTES", "foo=42,bar='yo'");
828        let cfg = Config::<CustomConfig>::load("TEST_").unwrap();
829        assert_eq!(
830            cfg.ingest
831                .timeline_attributes
832                .additional_timeline_attributes,
833            vec![
834                (AttrKey::from("foo"), AttrVal::from(42)).into(),
835                (AttrKey::from("bar"), AttrVal::from("yo")).into(),
836            ]
837        );
838        assert_eq!(
839            cfg.ingest.timeline_attributes.override_timeline_attributes,
840            vec![
841                (AttrKey::from("foo"), AttrVal::from(42)).into(),
842                (AttrKey::from("bar"), AttrVal::from("yo")).into(),
843            ]
844        );
845        env::remove_var("ADDITIONAL_TIMELINE_ATTRIBUTES");
846        env::remove_var("OVERRIDE_TIMELINE_ATTRIBUTES");
847
848        // TODO test mutator section overrides
849
850        clear_relevant_env_vars();
851    }
852
853    #[test]
854    #[serial_test::serial]
855    fn load_config_from_file() {
856        clear_relevant_env_vars();
857
858        let content = "
859[ingest]
860additional-timeline-attributes = ['a = 1']
861override-timeline-attributes = ['c = true']
862protocol-parent-url = 'modality-ingest-tls://auxon.io:9077'
863allow-insecure-tls = true
864
865[mutation]
866additional-mutator-attributes = ['a = 1']
867override-mutator-attributes = ['c = true']
868protocol-parent-url = 'modality-mutation://auxon.io'
869allow-insecure-tls = true
870
871[metadata]
872val = 42
873";
874        let mut tmpfile = tempfile::NamedTempFile::new().unwrap();
875        write!(tmpfile, "{content}").unwrap();
876
877        env::set_var("MODALITY_REFLECTOR_CONFIG", tmpfile.path());
878        let cfg = Config::<CustomConfig>::load("TEST_").unwrap();
879
880        assert_eq!(
881            cfg.ingest
882                .timeline_attributes
883                .additional_timeline_attributes,
884            vec![(AttrKey::from("a"), AttrVal::from(1)).into(),]
885        );
886        assert_eq!(
887            cfg.ingest.timeline_attributes.override_timeline_attributes,
888            vec![(AttrKey::from("c"), AttrVal::from(true)).into(),]
889        );
890        assert_eq!(
891            cfg.ingest.protocol_parent_url,
892            Url::parse("modality-ingest-tls://auxon.io:9077").ok()
893        );
894        assert!(cfg.ingest.allow_insecure_tls);
895        assert_eq!(cfg.plugin.val, Some(42));
896
897        assert_eq!(
898            cfg.mutation
899                .mutator_attributes
900                .additional_mutator_attributes,
901            vec![(AttrKey::from("a"), AttrVal::from(1)).into(),]
902        );
903        assert_eq!(
904            cfg.mutation.mutator_attributes.override_mutator_attributes,
905            vec![(AttrKey::from("c"), AttrVal::from(true)).into(),]
906        );
907        assert_eq!(
908            cfg.mutation.protocol_parent_url,
909            Url::parse("modality-mutation://auxon.io").ok()
910        );
911        assert!(cfg.mutation.allow_insecure_tls);
912        assert_eq!(cfg.plugin.val, Some(42));
913
914        env::remove_var("MODALITY_REFLECTOR_CONFIG");
915        clear_relevant_env_vars();
916    }
917
918    #[test]
919    #[serial_test::serial]
920    fn named_ingest_metadata_section_from_config_file() {
921        clear_relevant_env_vars();
922
923        let content = "
924[plugins.ingest.collectors.test.metadata]
925val = 42
926";
927        let mut tmpfile = tempfile::NamedTempFile::new().unwrap();
928        write!(tmpfile, "{content}").unwrap();
929
930        env::set_var("TEST_CURRENT_EXE_PATH", "/dir/test-collector");
931        env::set_var("MODALITY_REFLECTOR_CONFIG", tmpfile.path());
932
933        let cfg = Config::<CustomConfig>::load("TEST_").unwrap();
934        assert_eq!(cfg.plugin.val, Some(42));
935
936        env::remove_var("MODALITY_REFLECTOR_CONFIG");
937        env::remove_var("TEST_CURRENT_EXE_PATH");
938
939        clear_relevant_env_vars();
940    }
941
942    #[test]
943    #[serial_test::serial]
944    fn env_overrides_config_file() {
945        env::remove_var("TEST_VAL");
946        clear_relevant_env_vars();
947
948        let content = "
949[ingest]
950additional-timeline-attributes = ['a = 1']
951override-timeline-attributes = ['c = true']
952protocol-parent-url = 'modality-ingest-tls://auxon.io:9077'
953allow-insecure-tls = true
954
955[mutation]
956protocol-parent-url = 'modality-mutation://auxon.io'
957allow-insecure-tls = true
958
959
960[metadata]
961val = 42
962";
963        let mut tmpfile = tempfile::NamedTempFile::new().unwrap();
964        write!(tmpfile, "{content}").unwrap();
965        env::set_var("MODALITY_REFLECTOR_CONFIG", tmpfile.path());
966
967        // Now set environment variables to override EVERYTHING
968        env::set_var("ADDITIONAL_TIMELINE_ATTRIBUTES", "foo=42,bar='yo'");
969        env::set_var("OVERRIDE_TIMELINE_ATTRIBUTES", "foo=42,bar='yo'");
970        env::set_var("MODALITY_INGEST_URL", "modality-ingest://foo");
971        env::set_var("MODALITY_ALLOW_INSECURE_TLS", "false");
972        env::set_var("MODALITY_MUTATION_URL", "modality-mutation://foo");
973        env::set_var("TEST_VAL", "99");
974
975        let cfg = Config::<CustomConfig>::load("TEST_").unwrap();
976
977        assert_eq!(
978            cfg.ingest
979                .timeline_attributes
980                .additional_timeline_attributes,
981            vec![
982                (AttrKey::from("a"), AttrVal::from(1)).into(),
983                (AttrKey::from("foo"), AttrVal::from(42)).into(),
984                (AttrKey::from("bar"), AttrVal::from("yo")).into(),
985            ]
986        );
987        assert_eq!(
988            cfg.ingest.timeline_attributes.override_timeline_attributes,
989            vec![
990                (AttrKey::from("c"), AttrVal::from(true)).into(),
991                (AttrKey::from("foo"), AttrVal::from(42)).into(),
992                (AttrKey::from("bar"), AttrVal::from("yo")).into(),
993            ]
994        );
995        assert_eq!(
996            cfg.ingest.protocol_parent_url,
997            Url::parse("modality-ingest://foo").ok()
998        );
999        assert!(!cfg.ingest.allow_insecure_tls);
1000
1001        assert_eq!(
1002            cfg.mutation.protocol_parent_url,
1003            Url::parse("modality-mutation://foo").ok()
1004        );
1005        assert!(!cfg.mutation.allow_insecure_tls);
1006
1007        assert_eq!(cfg.plugin.val, Some(99));
1008
1009        env::remove_var("TEST_VAL");
1010        clear_relevant_env_vars();
1011    }
1012}