1#![deny(warnings, clippy::all)]
2pub mod resolve;
3
4pub use refined::*;
5use std::collections::BTreeMap;
6use std::path::{Path, PathBuf};
7use thiserror::Error;
8pub use toml::Value as TomlValue;
9
/// Name of an environment variable related to the reflector configuration.
// NOTE(review): presumably holds the path of the config file to load; the
// lookup itself is not in this file — confirm against callers.
pub const CONFIG_ENV_VAR: &str = "MODALITY_REFLECTOR_CONFIG";

/// Default port number for the Modality storage service.
pub const MODALITY_STORAGE_SERVICE_PORT_DEFAULT: u16 = 14182;
/// Default TLS port number for the Modality storage service.
pub const MODALITY_STORAGE_SERVICE_TLS_PORT_DEFAULT: u16 = 14183;

/// Default port number for reflector ingest connections.
pub const MODALITY_REFLECTOR_INGEST_CONNECT_PORT_DEFAULT: u16 = 14188;
/// Default TLS port number for reflector ingest connections.
pub const MODALITY_REFLECTOR_INGEST_CONNECT_TLS_PORT_DEFAULT: u16 = 14189;

/// Default port number for Modality mutation connections.
pub const MODALITY_MUTATION_CONNECT_PORT_DEFAULT: u16 = 14192;
/// Default TLS port number for Modality mutation connections.
pub const MODALITY_MUTATION_CONNECT_TLS_PORT_DEFAULT: u16 = 14194;

/// Default port number for reflector mutation connections.
pub const MODALITY_REFLECTOR_MUTATION_CONNECT_PORT_DEFAULT: u16 = 14198;
/// Default TLS port number for reflector mutation connections.
pub const MODALITY_REFLECTOR_MUTATION_CONNECT_TLS_PORT_DEFAULT: u16 = 14199;
23
24mod raw_toml {
26 use super::*;
27 use std::path::PathBuf;
28
    /// String-typed, serde-facing form of the whole configuration document.
    ///
    /// Every section is optional (`default` fills in whatever is missing) and
    /// keys are spelled in kebab-case. Absent sections and an empty `metadata`
    /// table are left out when serializing.
    #[derive(Debug, Clone, Default, PartialEq, serde::Serialize, serde::Deserialize)]
    #[serde(rename_all = "kebab-case", default)]
    pub(crate) struct Config {
        #[serde(skip_serializing_if = "Option::is_none")]
        pub(crate) ingest: Option<TopLevelIngest>,
        #[serde(skip_serializing_if = "Option::is_none")]
        pub(crate) mutation: Option<TopLevelMutation>,
        #[serde(skip_serializing_if = "Option::is_none")]
        pub(crate) plugins: Option<TopLevelPlugins>,
        // Free-form table of arbitrary TOML values, carried through untouched.
        #[serde(skip_serializing_if = "BTreeMap::is_empty")]
        pub(crate) metadata: BTreeMap<String, TomlValue>,
    }
    /// Raw form of the top-level `ingest` table.
    ///
    /// The URL and the attribute pairs are still plain strings at this stage,
    /// and the write-batch staleness bound is an integer millisecond count.
    #[derive(Debug, Clone, Default, PartialEq, serde::Serialize, serde::Deserialize)]
    #[serde(rename_all = "kebab-case", default)]
    pub(crate) struct TopLevelIngest {
        #[serde(skip_serializing_if = "Option::is_none")]
        pub(crate) protocol_parent_url: Option<String>,
        // `Not::not` is true for `false`, so the key is only written when set to `true`.
        #[serde(skip_serializing_if = "std::ops::Not::not")]
        pub(crate) allow_insecure_tls: bool,
        #[serde(skip_serializing_if = "Option::is_none")]
        pub(crate) max_write_batch_staleness_millis: Option<u64>,
        #[serde(skip_serializing_if = "Option::is_none")]
        pub(crate) protocol_child_port: Option<u16>,
        // Flattened: the attribute lists are keys of this table, not a nested table.
        #[serde(flatten)]
        pub(crate) timeline_attributes: TimelineAttributes,
    }
    /// Raw form of the top-level `mutation` table.
    ///
    /// URLs and mutator attribute pairs are plain strings here; they are parsed
    /// when converting to the refined representation.
    #[derive(Debug, Clone, Default, PartialEq, serde::Serialize, serde::Deserialize)]
    #[serde(rename_all = "kebab-case", default)]
    pub(crate) struct TopLevelMutation {
        #[serde(skip_serializing_if = "Option::is_none")]
        pub(crate) protocol_parent_url: Option<String>,
        // `Not::not` is true for `false`, so the key is only written when set to `true`.
        #[serde(skip_serializing_if = "std::ops::Not::not")]
        pub(crate) allow_insecure_tls: bool,
        #[serde(skip_serializing_if = "Option::is_none")]
        pub(crate) protocol_child_port: Option<u16>,
        #[serde(skip_serializing_if = "Option::is_none")]
        pub(crate) mutator_http_api_port: Option<u16>,
        // Flattened: the attribute lists are keys of this table, not a nested table.
        #[serde(flatten)]
        pub(crate) mutator_attributes: MutatorAttributes,
        #[serde(skip_serializing_if = "Vec::is_empty")]
        pub(crate) external_mutator_urls: Vec<String>,
    }
    /// Raw form of the top-level `plugins` table; every member is optional and
    /// omitted from the output when absent.
    #[derive(Debug, Clone, Default, PartialEq, serde::Serialize, serde::Deserialize)]
    #[serde(rename_all = "kebab-case", default)]
    pub(crate) struct TopLevelPlugins {
        #[serde(skip_serializing_if = "Option::is_none")]
        pub(crate) available_ports: Option<AvailablePorts>,
        #[serde(skip_serializing_if = "Option::is_none")]
        pub(crate) plugins_dir: Option<PathBuf>,
        #[serde(skip_serializing_if = "Option::is_none")]
        pub(crate) ingest: Option<PluginsIngest>,
        #[serde(skip_serializing_if = "Option::is_none")]
        pub(crate) mutation: Option<PluginsMutation>,
    }
    /// Raw form of the `plugins.available-ports` table.
    #[derive(Debug, Clone, Default, PartialEq, serde::Serialize, serde::Deserialize)]
    #[serde(rename_all = "kebab-case", default)]
    pub(crate) struct AvailablePorts {
        #[serde(skip_serializing_if = "Option::is_none")]
        pub(crate) any_local: Option<bool>,
        // Each entry is a two-element `[start, end]` array; the ordering of the
        // two ends is only checked when converting to the refined form.
        #[serde(skip_serializing_if = "Vec::is_empty")]
        pub(crate) ranges: Vec<[u16; 2]>,
    }
    /// Unparsed timeline attribute lists; each entry is a `key = value` string.
    /// Empty lists are omitted when serializing.
    #[derive(Debug, Clone, Default, PartialEq, serde::Serialize, serde::Deserialize)]
    #[serde(rename_all = "kebab-case", default)]
    pub(crate) struct TimelineAttributes {
        #[serde(skip_serializing_if = "Vec::is_empty")]
        pub(crate) additional_timeline_attributes: Vec<String>,
        #[serde(skip_serializing_if = "Vec::is_empty")]
        pub(crate) override_timeline_attributes: Vec<String>,
    }
    /// Unparsed mutator attribute lists; each entry is a `key = value` string.
    /// Empty lists are omitted when serializing.
    #[derive(Debug, Clone, Default, PartialEq, serde::Serialize, serde::Deserialize)]
    #[serde(rename_all = "kebab-case", default)]
    pub(crate) struct MutatorAttributes {
        #[serde(skip_serializing_if = "Vec::is_empty")]
        pub(crate) additional_mutator_attributes: Vec<String>,
        #[serde(skip_serializing_if = "Vec::is_empty")]
        pub(crate) override_mutator_attributes: Vec<String>,
    }
    /// Raw form of the `plugins.ingest` table: per-plugin settings keyed by
    /// plugin name, split into collectors and importers.
    #[derive(Debug, Clone, Default, PartialEq, serde::Serialize, serde::Deserialize)]
    #[serde(rename_all = "kebab-case", default)]
    pub(crate) struct PluginsIngest {
        #[serde(skip_serializing_if = "BTreeMap::is_empty")]
        pub(crate) collectors: BTreeMap<String, PluginsIngestMember>,
        #[serde(skip_serializing_if = "BTreeMap::is_empty")]
        pub(crate) importers: BTreeMap<String, PluginsIngestMember>,
    }
    /// Raw settings of a single ingest plugin (collector or importer).
    #[derive(Debug, Clone, Default, PartialEq, serde::Serialize, serde::Deserialize)]
    #[serde(rename_all = "kebab-case", default)]
    pub(crate) struct PluginsIngestMember {
        // Both flattened members contribute their keys directly to the plugin's table.
        #[serde(flatten)]
        pub(crate) timeline_attributes: TimelineAttributes,
        #[serde(flatten)]
        pub(crate) shutdown: PluginShutdown,
        // Free-form, plugin-specific TOML values, carried through untouched.
        #[serde(skip_serializing_if = "BTreeMap::is_empty")]
        pub(crate) metadata: BTreeMap<String, TomlValue>,
    }
    /// Raw form of the `plugins.mutation` table: mutator plugin settings keyed
    /// by plugin name.
    #[derive(Debug, Clone, Default, PartialEq, serde::Serialize, serde::Deserialize)]
    #[serde(rename_all = "kebab-case", default)]
    pub(crate) struct PluginsMutation {
        #[serde(skip_serializing_if = "BTreeMap::is_empty")]
        pub(crate) mutators: BTreeMap<String, PluginsMutationMember>,
    }
    /// Raw settings of a single mutator plugin.
    #[derive(Debug, Clone, Default, PartialEq, serde::Serialize, serde::Deserialize)]
    #[serde(rename_all = "kebab-case", default)]
    pub(crate) struct PluginsMutationMember {
        // Both flattened members contribute their keys directly to the plugin's table.
        #[serde(flatten)]
        pub(crate) mutator_attributes: MutatorAttributes,
        #[serde(flatten)]
        pub(crate) shutdown: PluginShutdown,
        // Free-form, plugin-specific TOML values, carried through untouched.
        #[serde(skip_serializing_if = "BTreeMap::is_empty")]
        pub(crate) metadata: BTreeMap<String, TomlValue>,
    }
141 #[derive(Debug, Clone, Default, PartialEq, serde::Serialize, serde::Deserialize)]
142 #[serde(rename_all = "kebab-case", default)]
143 pub(crate) struct PluginShutdown {
144 pub(crate) shutdown_signal: Option<String>,
145 pub(crate) shutdown_timeout_millis: Option<u64>,
146 }
147
148 #[cfg(test)]
149 pub(crate) fn try_raw_to_string_pretty(config: &Config) -> Result<String, toml::ser::Error> {
150 let toml_value = toml::Value::try_from(config)?;
154 let content = toml::to_string_pretty(&toml_value)?;
155 Ok(content)
156 }
157
158 impl From<refined::Config> for Config {
159 fn from(value: refined::Config) -> Self {
160 Self {
161 ingest: value.ingest.map(Into::into),
162 mutation: value.mutation.map(Into::into),
163 plugins: value.plugins.map(Into::into),
164 metadata: value.metadata,
165 }
166 }
167 }
168
169 impl From<refined::TopLevelIngest> for TopLevelIngest {
170 fn from(value: refined::TopLevelIngest) -> Self {
171 Self {
172 protocol_parent_url: value.protocol_parent_url.map(Into::into),
173 allow_insecure_tls: value.allow_insecure_tls,
174 max_write_batch_staleness_millis: value.max_write_batch_staleness.map(|v| {
175 let millis = v.as_millis();
176 if millis >= u64::MAX as u128 {
177 u64::MAX
178 } else {
179 millis as u64
180 }
181 }),
182 protocol_child_port: value.protocol_child_port.map(Into::into),
183 timeline_attributes: value.timeline_attributes.into(),
184 }
185 }
186 }
187 impl From<refined::TopLevelMutation> for TopLevelMutation {
188 fn from(value: refined::TopLevelMutation) -> Self {
189 Self {
190 protocol_parent_url: value.protocol_parent_url.map(Into::into),
191 allow_insecure_tls: value.allow_insecure_tls,
192 protocol_child_port: value.protocol_child_port.map(Into::into),
193 mutator_http_api_port: value.mutator_http_api_port.map(Into::into),
194 mutator_attributes: value.mutator_attributes.into(),
195 external_mutator_urls: value
196 .external_mutator_urls
197 .into_iter()
198 .map(Into::into)
199 .collect(),
200 }
201 }
202 }
203 impl From<refined::TopLevelPlugins> for TopLevelPlugins {
204 fn from(value: refined::TopLevelPlugins) -> Self {
205 Self {
206 available_ports: value.available_ports.map(Into::into),
207 plugins_dir: value.plugins_dir,
208 ingest: value.ingest.map(Into::into),
209 mutation: value.mutation.map(Into::into),
210 }
211 }
212 }
213 impl From<refined::TimelineAttributes> for TimelineAttributes {
214 fn from(value: refined::TimelineAttributes) -> Self {
215 Self {
216 additional_timeline_attributes: value
217 .additional_timeline_attributes
218 .into_iter()
219 .map(Into::into)
220 .collect(),
221 override_timeline_attributes: value
222 .override_timeline_attributes
223 .into_iter()
224 .map(Into::into)
225 .collect(),
226 }
227 }
228 }
229 impl From<refined::MutatorAttributes> for MutatorAttributes {
230 fn from(value: refined::MutatorAttributes) -> Self {
231 Self {
232 additional_mutator_attributes: value
233 .additional_mutator_attributes
234 .into_iter()
235 .map(Into::into)
236 .collect(),
237 override_mutator_attributes: value
238 .override_mutator_attributes
239 .into_iter()
240 .map(Into::into)
241 .collect(),
242 }
243 }
244 }
245 impl From<refined::PluginsIngest> for PluginsIngest {
246 fn from(value: refined::PluginsIngest) -> Self {
247 Self {
248 collectors: value
249 .collectors
250 .into_iter()
251 .map(|(k, v)| (k, v.into()))
252 .collect(),
253 importers: value
254 .importers
255 .into_iter()
256 .map(|(k, v)| (k, v.into()))
257 .collect(),
258 }
259 }
260 }
261 impl From<refined::PluginsMutation> for PluginsMutation {
262 fn from(value: refined::PluginsMutation) -> Self {
263 Self {
264 mutators: value
265 .mutators
266 .into_iter()
267 .map(|(k, v)| (k, v.into()))
268 .collect(),
269 }
270 }
271 }
272 impl From<refined::PluginsIngestMember> for PluginsIngestMember {
273 fn from(value: refined::PluginsIngestMember) -> Self {
274 Self {
275 timeline_attributes: value.timeline_attributes.into(),
276 shutdown: value.shutdown.into(),
277 metadata: value.metadata,
278 }
279 }
280 }
281 impl From<refined::PluginsMutationMember> for PluginsMutationMember {
282 fn from(value: refined::PluginsMutationMember) -> Self {
283 Self {
284 mutator_attributes: value.mutator_attributes.into(),
285 shutdown: value.shutdown.into(),
286 metadata: value.metadata,
287 }
288 }
289 }
290
291 impl From<refined::PluginShutdown> for PluginShutdown {
292 fn from(value: refined::PluginShutdown) -> Self {
293 Self {
294 shutdown_signal: value.shutdown_signal,
295 shutdown_timeout_millis: value.shutdown_timeout.map(|v| {
296 let millis = v.as_millis();
297 if millis >= u64::MAX as u128 {
298 u64::MAX
299 } else {
300 millis as u64
301 }
302 }),
303 }
304 }
305 }
306
307 impl From<refined::AvailablePorts> for AvailablePorts {
308 fn from(value: refined::AvailablePorts) -> Self {
309 Self {
310 any_local: value.any_local,
311 ranges: value
312 .ranges
313 .into_iter()
314 .map(|inclusive_range| [inclusive_range.start(), inclusive_range.end()])
315 .collect(),
316 }
317 }
318 }
319}
320
321mod refined {
323 use super::TomlValue;
324 use lazy_static::lazy_static;
325 pub use modality_api::types::{AttrKey, AttrVal};
326 use regex::{Captures, Regex};
327 use std::collections::BTreeMap;
328 use std::env;
329 use std::fmt;
330 use std::path::PathBuf;
331 use std::str::FromStr;
332 use std::time::Duration;
333 use url::Url;
334
    /// Validated configuration document, produced from `raw_toml::Config`.
    ///
    /// All sections are optional; `metadata` holds arbitrary TOML values that
    /// are carried through without interpretation.
    #[derive(Debug, Clone, Default, PartialEq)]
    pub struct Config {
        pub ingest: Option<TopLevelIngest>,
        pub mutation: Option<TopLevelMutation>,
        pub plugins: Option<TopLevelPlugins>,
        pub metadata: BTreeMap<String, TomlValue>,
    }
    /// Validated top-level ingest settings: the parent URL is a parsed [`Url`]
    /// and the write-batch staleness bound is a [`Duration`].
    #[derive(Debug, Clone, Default, PartialEq, Eq)]
    pub struct TopLevelIngest {
        pub protocol_parent_url: Option<Url>,
        pub allow_insecure_tls: bool,
        pub protocol_child_port: Option<u16>,
        pub timeline_attributes: TimelineAttributes,
        pub max_write_batch_staleness: Option<Duration>,
    }
    /// Validated top-level mutation settings; all URLs are parsed [`Url`] values.
    #[derive(Debug, Clone, Default, PartialEq, Eq)]
    pub struct TopLevelMutation {
        pub protocol_parent_url: Option<Url>,
        pub allow_insecure_tls: bool,
        pub protocol_child_port: Option<u16>,
        pub mutator_http_api_port: Option<u16>,
        pub mutator_attributes: MutatorAttributes,
        pub external_mutator_urls: Vec<Url>,
    }
    /// Validated top-level plugin settings.
    #[derive(Debug, Clone, Default, PartialEq)]
    pub struct TopLevelPlugins {
        pub available_ports: Option<AvailablePorts>,
        pub plugins_dir: Option<PathBuf>,
        pub ingest: Option<PluginsIngest>,
        pub mutation: Option<PluginsMutation>,
    }
    /// Validated port availability settings for plugins.
    // NOTE(review): how `any_local` and `ranges` interact is decided by the
    // consumer of this type, which is not in this file — confirm there.
    #[derive(Debug, Clone, Default, PartialEq, Eq)]
    pub struct AvailablePorts {
        pub any_local: Option<bool>,
        pub ranges: Vec<InclusivePortRange>,
    }
371
    /// A port range that includes both of its ends.
    ///
    /// The fields are private; `InclusivePortRange::new` rejects `start > end`.
    /// The derived `Default` is the range `0..=0`.
    #[derive(Copy, Clone, Debug, Default, PartialEq, Eq, PartialOrd, Ord)]
    pub struct InclusivePortRange {
        start: u16,
        end: u16,
    }
377
378 impl InclusivePortRange {
379 pub fn new(start: u16, end: u16) -> Result<Self, SemanticErrorExplanation> {
380 if start > end {
381 Err(SemanticErrorExplanation(format!("Port range start must <= end, but provided start {start} was > provided end {end}")))
382 } else {
383 Ok(InclusivePortRange { start, end })
384 }
385 }
386 pub fn start(&self) -> u16 {
387 self.start
388 }
389 pub fn end(&self) -> u16 {
390 self.end
391 }
392 pub fn start_mut(&mut self) -> &mut u16 {
393 &mut self.start
394 }
395 pub fn end_mut(&mut self) -> &mut u16 {
396 &mut self.end
397 }
398 }
    /// Parsed timeline attribute lists, one key/value pair per entry.
    #[derive(Debug, Clone, Default, PartialEq, Eq)]
    pub struct TimelineAttributes {
        pub additional_timeline_attributes: Vec<AttrKeyEqValuePair>,
        pub override_timeline_attributes: Vec<AttrKeyEqValuePair>,
    }
    /// Parsed mutator attribute lists, one key/value pair per entry.
    #[derive(Debug, Clone, Default, PartialEq, Eq)]
    pub struct MutatorAttributes {
        pub additional_mutator_attributes: Vec<AttrKeyEqValuePair>,
        pub override_mutator_attributes: Vec<AttrKeyEqValuePair>,
    }
409
410 impl MutatorAttributes {
411 pub fn merge(
412 &mut self,
413 other: MutatorAttributes,
414 ) -> Result<(), MergeMutatorAttributesError> {
415 for AttrKeyEqValuePair(k, v) in other.additional_mutator_attributes.into_iter() {
416 if self
417 .additional_mutator_attributes
418 .iter()
419 .any(|kvp| kvp.0 == k)
420 {
421 return Err(MergeMutatorAttributesError::KeyConflict(k));
422 }
423
424 self.additional_mutator_attributes
425 .push(AttrKeyEqValuePair(k, v));
426 }
427
428 self.override_mutator_attributes
429 .extend(other.override_mutator_attributes);
430
431 Ok(())
432 }
433 }
434
    /// Failure reported by `MutatorAttributes::merge`.
    #[derive(Clone, Debug, PartialEq, Eq, thiserror::Error)]
    pub enum MergeMutatorAttributesError {
        /// The same additional-attribute key was supplied more than once; the
        /// payload is the offending key.
        #[error("Conflicting settings for mutator attribute key {0}")]
        KeyConflict(AttrKey),
    }
440
    /// Validated ingest plugin settings, keyed by plugin name and split into
    /// collectors and importers.
    #[derive(Debug, Clone, Default, PartialEq)]
    pub struct PluginsIngest {
        pub collectors: BTreeMap<String, PluginsIngestMember>,
        pub importers: BTreeMap<String, PluginsIngestMember>,
    }
    /// Validated settings of a single ingest plugin; `metadata` holds arbitrary
    /// plugin-specific TOML values.
    #[derive(Debug, Clone, Default, PartialEq)]
    pub struct PluginsIngestMember {
        pub timeline_attributes: TimelineAttributes,
        pub shutdown: PluginShutdown,
        pub metadata: BTreeMap<String, TomlValue>,
    }
    /// Validated mutator plugin settings, keyed by plugin name.
    #[derive(Debug, Clone, Default, PartialEq)]
    pub struct PluginsMutation {
        pub mutators: BTreeMap<String, PluginsMutationMember>,
    }
    /// Validated settings of a single mutator plugin; `metadata` holds
    /// arbitrary plugin-specific TOML values.
    #[derive(Debug, Clone, Default, PartialEq)]
    pub struct PluginsMutationMember {
        pub mutator_attributes: MutatorAttributes,
        pub shutdown: PluginShutdown,
        pub metadata: BTreeMap<String, TomlValue>,
    }
    /// Per-plugin shutdown settings with the timeout expressed as a [`Duration`].
    // NOTE(review): `shutdown_signal` is kept as the string from the config
    // (the test fixture uses 'SIGINT'); it is not validated in this file.
    #[derive(Debug, Clone, Default, PartialEq)]
    pub struct PluginShutdown {
        pub shutdown_signal: Option<String>,
        pub shutdown_timeout: Option<Duration>,
    }
467
    /// Reasons a `key = value` attribute string can fail to parse.
    #[derive(Clone, Debug, PartialEq, Eq, thiserror::Error)]
    pub enum AttrKeyValuePairParseError {
        /// The string (after environment substitution) is not of the form
        /// `key = value` with a non-empty key and value.
        #[error("'{0}' is not a valid attribute key=value string.")]
        Format(String),

        /// The key begins with `.`, which the parser rejects.
        #[error("The key '{0}' starts with an invalid character.")]
        InvalidKey(String),

        /// Environment-variable substitution failed before parsing.
        #[error(transparent)]
        EnvSub(#[from] EnvSubError),
    }
479
    /// An attribute key together with its value.
    ///
    /// Parsed from, and displayed as, `key = value` text by the `FromStr` and
    /// `Display` implementations in this module.
    #[derive(Clone, Debug, PartialEq, Eq, PartialOrd)]
    pub struct AttrKeyEqValuePair(pub AttrKey, pub AttrVal);
490
491 impl From<(AttrKey, AttrVal)> for AttrKeyEqValuePair {
492 fn from((k, v): (AttrKey, AttrVal)) -> Self {
493 AttrKeyEqValuePair(k, v)
494 }
495 }
496
497 impl FromStr for AttrKeyEqValuePair {
498 type Err = AttrKeyValuePairParseError;
499
500 fn from_str(input: &str) -> Result<Self, Self::Err> {
501 let s = envsub(input)?;
503
504 let parts: Vec<&str> = s.trim().split('=').map(|p| p.trim()).collect();
505 if parts.len() != 2 || parts[0].is_empty() || parts[1].is_empty() {
506 return Err(AttrKeyValuePairParseError::Format(s.to_string()));
507 }
508
509 let key = parts[0];
510 let val_str = parts[1];
511
512 if key.starts_with('.') {
513 return Err(AttrKeyValuePairParseError::InvalidKey(key.to_string()));
514 }
515
516 let val: Result<_, std::convert::Infallible> = val_str.parse();
517 let val = val.unwrap();
518
519 Ok(AttrKeyEqValuePair(AttrKey::new(key.to_string()), val))
520 }
521 }
522
523 impl TryFrom<String> for AttrKeyEqValuePair {
524 type Error = AttrKeyValuePairParseError;
525
526 fn try_from(s: String) -> Result<Self, Self::Error> {
527 AttrKeyEqValuePair::from_str(&s)
528 }
529 }
530
531 impl From<AttrKeyEqValuePair> for String {
532 fn from(kv: AttrKeyEqValuePair) -> Self {
533 kv.to_string()
534 }
535 }
536
537 impl fmt::Display for AttrKeyEqValuePair {
538 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
539 let val_s = match &self.1 {
543 AttrVal::String(interned_string) => {
544 let mut s = String::new();
545 s.push('\"');
546 s.push_str(interned_string.as_ref());
547 s.push('\"');
548 s
549 }
550 AttrVal::TimelineId(timeline_id) => {
551 let mut s = String::new();
552 s.push('\"');
553 s.push_str(timeline_id.to_string().as_str());
554 s.push('\"');
555 s
556 }
557 v => v.to_string(),
558 };
559 write!(f, "{} = {}", self.0, val_s)
560 }
561 }
562
    /// Human-readable explanation of why a syntactically valid config was
    /// rejected while converting the raw form to the refined form.
    ///
    /// The loaders in this crate place the inner string into
    /// `ConfigLoadError::DefinitionSemantics`.
    #[derive(Debug)]
    pub struct SemanticErrorExplanation(pub String);
565
566 use crate::raw_toml;
567 impl TryFrom<raw_toml::Config> for Config {
568 type Error = SemanticErrorExplanation;
569
570 fn try_from(value: raw_toml::Config) -> Result<Self, Self::Error> {
571 Ok(Self {
572 ingest: if let Some(ingest) = value.ingest {
573 Some(ingest.try_into()?)
574 } else {
575 None
576 },
577 mutation: if let Some(mutation) = value.mutation {
578 Some(mutation.try_into()?)
579 } else {
580 None
581 },
582 plugins: if let Some(plugins) = value.plugins {
583 Some(plugins.try_into()?)
584 } else {
585 None
586 },
587 metadata: value.metadata,
588 })
589 }
590 }
591
592 impl TryFrom<raw_toml::TopLevelIngest> for TopLevelIngest {
593 type Error = SemanticErrorExplanation;
594
595 fn try_from(value: raw_toml::TopLevelIngest) -> Result<Self, Self::Error> {
596 Ok(Self {
597 protocol_parent_url: if let Some(u) = value.protocol_parent_url {
598 Some(url::Url::from_str(&u).map_err(|parse_err| {
599 SemanticErrorExplanation(format!(
600 "ingest.protocol-parent-url could not be parsed. {parse_err}"
601 ))
602 })?)
603 } else {
604 None
605 },
606 protocol_child_port: value.protocol_child_port,
607 timeline_attributes: value.timeline_attributes.try_into()?,
608 allow_insecure_tls: value.allow_insecure_tls,
609 max_write_batch_staleness: value
610 .max_write_batch_staleness_millis
611 .map(Duration::from_millis),
612 })
613 }
614 }
615 impl TryFrom<raw_toml::TimelineAttributes> for TimelineAttributes {
616 type Error = SemanticErrorExplanation;
617
618 fn try_from(value: raw_toml::TimelineAttributes) -> Result<Self, Self::Error> {
619 Ok(Self {
620 additional_timeline_attributes: value
621 .additional_timeline_attributes
622 .into_iter()
623 .map(AttrKeyEqValuePair::try_from)
624 .collect::<Result<Vec<_>, AttrKeyValuePairParseError>>()
625 .map_err(|e| {
626 SemanticErrorExplanation(format!(
627 "Error in additional-timeline-attributes member. {e}"
628 ))
629 })?,
630 override_timeline_attributes: value
631 .override_timeline_attributes
632 .into_iter()
633 .map(AttrKeyEqValuePair::try_from)
634 .collect::<Result<Vec<_>, AttrKeyValuePairParseError>>()
635 .map_err(|e| {
636 SemanticErrorExplanation(format!(
637 "Error in override-timeline-attributes member. {e}"
638 ))
639 })?,
640 })
641 }
642 }
643 impl TryFrom<raw_toml::MutatorAttributes> for MutatorAttributes {
644 type Error = SemanticErrorExplanation;
645
646 fn try_from(value: raw_toml::MutatorAttributes) -> Result<Self, Self::Error> {
647 Ok(Self {
648 additional_mutator_attributes: value
649 .additional_mutator_attributes
650 .into_iter()
651 .map(AttrKeyEqValuePair::try_from)
652 .collect::<Result<Vec<_>, AttrKeyValuePairParseError>>()
653 .map_err(|e| {
654 SemanticErrorExplanation(format!(
655 "Error in additional-mutator-attributes member. {e}"
656 ))
657 })?,
658 override_mutator_attributes: value
659 .override_mutator_attributes
660 .into_iter()
661 .map(AttrKeyEqValuePair::try_from)
662 .collect::<Result<Vec<_>, AttrKeyValuePairParseError>>()
663 .map_err(|e| {
664 SemanticErrorExplanation(format!(
665 "Error in override-mutator-attributes member. {e}"
666 ))
667 })?,
668 })
669 }
670 }
671
672 impl TryFrom<raw_toml::TopLevelMutation> for TopLevelMutation {
673 type Error = SemanticErrorExplanation;
674
675 fn try_from(value: raw_toml::TopLevelMutation) -> Result<Self, Self::Error> {
676 Ok(Self {
677 protocol_parent_url: if let Some(u) = value.protocol_parent_url {
678 Some(url::Url::from_str(&u).map_err(|parse_err| SemanticErrorExplanation(format!("mutation.protocol-parent-url could not be parsed. {parse_err}")))?)
679 } else {
680 None
681 },
682 allow_insecure_tls: value.allow_insecure_tls,
683 protocol_child_port: value.protocol_child_port,
684 mutator_http_api_port: value.mutator_http_api_port,
685 mutator_attributes: value.mutator_attributes.try_into()?,
686 external_mutator_urls: value.external_mutator_urls.into_iter().map(|v| url::Url::from_str(&v).map_err(|parse_err|SemanticErrorExplanation(format!("mutation.external-mutator-urls member {v} could not be parsed. {parse_err}")))).collect::<Result<Vec<url::Url>, SemanticErrorExplanation>>()?,
687 })
688 }
689 }
690 impl TryFrom<raw_toml::TopLevelPlugins> for TopLevelPlugins {
691 type Error = SemanticErrorExplanation;
692
693 fn try_from(value: raw_toml::TopLevelPlugins) -> Result<Self, Self::Error> {
694 Ok(Self {
695 available_ports: if let Some(v) = value.available_ports {
696 Some(v.try_into()?)
697 } else {
698 None
699 },
700 plugins_dir: value.plugins_dir,
701 ingest: if let Some(v) = value.ingest {
702 Some(v.try_into()?)
703 } else {
704 None
705 },
706 mutation: if let Some(v) = value.mutation {
707 Some(v.try_into()?)
708 } else {
709 None
710 },
711 })
712 }
713 }
714
715 impl TryFrom<raw_toml::AvailablePorts> for AvailablePorts {
716 type Error = SemanticErrorExplanation;
717
718 fn try_from(value: raw_toml::AvailablePorts) -> Result<Self, Self::Error> {
719 Ok(Self {
720 any_local: value.any_local,
721 ranges: value
722 .ranges
723 .into_iter()
724 .map(|v| InclusivePortRange::new(v[0], v[1]))
725 .collect::<Result<Vec<InclusivePortRange>, SemanticErrorExplanation>>()?,
726 })
727 }
728 }
729 impl TryFrom<raw_toml::PluginsIngest> for PluginsIngest {
730 type Error = SemanticErrorExplanation;
731
732 fn try_from(value: raw_toml::PluginsIngest) -> Result<Self, Self::Error> {
733 Ok(
734 Self {
735 collectors:
736 value
737 .collectors
738 .into_iter()
739 .map(|(k, v)| v.try_into().map(|vv| (k, vv)))
740 .collect::<Result<
741 BTreeMap<String, PluginsIngestMember>,
742 SemanticErrorExplanation,
743 >>()?,
744 importers:
745 value
746 .importers
747 .into_iter()
748 .map(|(k, v)| v.try_into().map(|vv| (k, vv)))
749 .collect::<Result<
750 BTreeMap<String, PluginsIngestMember>,
751 SemanticErrorExplanation,
752 >>()?,
753 },
754 )
755 }
756 }
757 impl TryFrom<raw_toml::PluginsIngestMember> for PluginsIngestMember {
758 type Error = SemanticErrorExplanation;
759
760 fn try_from(value: raw_toml::PluginsIngestMember) -> Result<Self, Self::Error> {
761 Ok(Self {
762 timeline_attributes: value.timeline_attributes.try_into()?,
763 shutdown: value.shutdown.into(),
764 metadata: value.metadata,
765 })
766 }
767 }
768 impl TryFrom<raw_toml::PluginsMutation> for PluginsMutation {
769 type Error = SemanticErrorExplanation;
770
771 fn try_from(value: raw_toml::PluginsMutation) -> Result<Self, Self::Error> {
772 Ok(
773 Self {
774 mutators:
775 value
776 .mutators
777 .into_iter()
778 .map(|(k, v)| v.try_into().map(|vv| (k, vv)))
779 .collect::<Result<
780 BTreeMap<String, PluginsMutationMember>,
781 SemanticErrorExplanation,
782 >>()?,
783 },
784 )
785 }
786 }
787 impl TryFrom<raw_toml::PluginsMutationMember> for PluginsMutationMember {
788 type Error = SemanticErrorExplanation;
789
790 fn try_from(value: raw_toml::PluginsMutationMember) -> Result<Self, Self::Error> {
791 Ok(Self {
792 mutator_attributes: value.mutator_attributes.try_into()?,
793 shutdown: value.shutdown.into(),
794 metadata: value.metadata,
795 })
796 }
797 }
798
799 impl From<raw_toml::PluginShutdown> for PluginShutdown {
800 fn from(value: raw_toml::PluginShutdown) -> Self {
801 Self {
802 shutdown_signal: value.shutdown_signal,
803 shutdown_timeout: value.shutdown_timeout_millis.map(Duration::from_millis),
804 }
805 }
806 }
807
808 impl Config {
809 pub fn is_empty(&self) -> bool {
810 self.ingest.is_none()
811 && self.mutation.is_none()
812 && self.plugins.is_none()
813 && self.metadata.is_empty()
814 }
815 }
816
    /// Failures of environment-variable substitution; the payload of each
    /// variant is the name of the variable involved.
    #[derive(Clone, Debug, PartialEq, Eq, thiserror::Error)]
    pub enum EnvSubError {
        /// The variable is set but its value is not valid unicode.
        #[error("The environment variable '{0}' contains invalid unicode")]
        EnvVarNotUnicode(String),

        /// The variable is not set and the reference carried no default.
        #[error("The environment variable '{0}' is not set and no default value is specified")]
        EnvVarNotPresent(String),
    }
825
826 fn envsub(input: &str) -> Result<String, EnvSubError> {
832 lazy_static! {
833 static ref ENVSUB_RE: Regex =
838 Regex::new(r"\$\{(?P<var>[a-zA-Z_][a-zA-Z0-9_]*)(:?-(?P<def>.*?))?\}")
839 .expect("Could not construct envsub Regex");
840 }
841
842 replace_all(&ENVSUB_RE, input, |caps: &Captures| {
843 let env_var = &caps["var"];
845 match env::var(env_var) {
846 Ok(env_val_val) => Ok(env_val_val),
847 Err(env::VarError::NotUnicode(_)) => {
848 Err(EnvSubError::EnvVarNotUnicode(env_var.to_owned()))
849 }
850 Err(env::VarError::NotPresent) => {
851 if let Some(def) = caps.name("def") {
853 Ok(def.as_str().to_string())
854 } else {
855 Err(EnvSubError::EnvVarNotPresent(env_var.to_owned()))
856 }
857 }
858 }
859 })
860 }
861
862 fn replace_all(
864 re: &Regex,
865 input: &str,
866 replacement: impl Fn(&Captures) -> Result<String, EnvSubError>,
867 ) -> Result<String, EnvSubError> {
868 let mut new = String::with_capacity(input.len());
869 let mut last_match = 0;
870 for caps in re.captures_iter(input) {
871 let m = caps.get(0).unwrap();
872 new.push_str(&input[last_match..m.start()]);
873 new.push_str(&replacement(&caps)?);
874 last_match = m.end();
875 }
876 new.push_str(&input[last_match..]);
877 Ok(new)
878 }
879}
880
/// Errors that can occur while rendering a config to TOML or writing it to disk.
#[derive(Debug, Error)]
pub enum ConfigWriteError {
    /// The config could not be serialized to TOML.
    #[error("TOML serialization error.")]
    Toml(#[from] toml::ser::Error),

    /// Writing the rendered text failed.
    #[error("IO error")]
    Io(#[from] std::io::Error),
}
889
/// Errors that can occur while loading a config from a file or a string.
#[derive(Debug, Error)]
pub enum ConfigLoadError {
    /// The file at `path` was read but is not valid TOML for this schema.
    #[error("Error in config file {} relating to TOML parsing. {error}", .path.display())]
    ConfigFileToml {
        path: PathBuf,
        #[source]
        error: toml::de::Error,
    },
    /// In-memory config text is not valid TOML for this schema.
    // Only constructed by the test-only `try_from_str`, hence the `allow`.
    #[allow(unused)]
    #[error("Error in config content relating to TOML parsing. {error}")]
    ConfigToml {
        #[source]
        error: toml::de::Error,
    },

    /// Reading the config file failed.
    #[error("IO Error")]
    Io(#[from] std::io::Error),

    /// The TOML parsed, but a value failed validation (unparsable URL,
    /// malformed attribute pair, inverted port range, ...).
    #[error("Error in config content relating to semantics. {explanation}")]
    DefinitionSemantics { explanation: String },
}
911
912pub fn try_from_file(path: &Path) -> Result<refined::Config, ConfigLoadError> {
913 let content = &std::fs::read_to_string(path)?;
914 let partial: raw_toml::Config =
915 toml::from_str(content).map_err(|e| ConfigLoadError::ConfigFileToml {
916 path: path.to_owned(),
917 error: e,
918 })?;
919 let r: Result<refined::Config, SemanticErrorExplanation> = partial.try_into();
920 r.map_err(|semantics| ConfigLoadError::DefinitionSemantics {
921 explanation: semantics.0,
922 })
923}
/// Test-only loader: parses `content` as TOML and validates the result.
///
/// # Errors
///
/// * `ConfigToml` when the text is not valid TOML for the config schema.
/// * `DefinitionSemantics` when a value fails validation.
#[cfg(test)]
pub fn try_from_str(content: &str) -> Result<refined::Config, ConfigLoadError> {
    let raw = toml::from_str::<raw_toml::Config>(content)
        .map_err(|error| ConfigLoadError::ConfigToml { error })?;
    refined::Config::try_from(raw).map_err(|SemanticErrorExplanation(explanation)| {
        ConfigLoadError::DefinitionSemantics { explanation }
    })
}
933
934pub fn try_to_file(config: &refined::Config, path: &Path) -> Result<(), ConfigWriteError> {
935 let content = try_to_string(config)?;
936 std::fs::write(path, content)?;
937 Ok(())
938}
939
940pub fn try_to_string(config: &refined::Config) -> Result<String, ConfigWriteError> {
941 let raw: raw_toml::Config = config.clone().into();
942 let toml_value = toml::Value::try_from(raw)?;
946 let content = toml::to_string_pretty(&toml_value)?;
947 Ok(content)
948}
949
950#[cfg(test)]
951mod tests {
952 use crate::{try_from_str, try_to_string, AttrKeyEqValuePair, ConfigLoadError};
953 use modality_api::types::AttrKey;
954
955 const FULLY_FILLED_IN_TOML: &str = r#"[ingest]
960additional-timeline-attributes = [
961 'a = 1',
962 'b = "foo"',
963]
964override-timeline-attributes = ['c = true']
965protocol-child-port = 9079
966protocol-parent-url = 'modality-ingest://auxon.io:9077'
967
968[metadata]
969bag = 42
970grab = 24
971
972[mutation]
973additional-mutator-attributes = [
974 'd = 100',
975 'e = "oof"',
976]
977external-mutator-urls = ['http://some-other-process.com:8080/']
978mutator-http-api-port = 9059
979override-mutator-attributes = ['f = false']
980protocol-child-port = 9080
981protocol-parent-url = 'modality-ingest://localhost:9078'
982
983[plugins]
984plugins-dir = 'path/to/custom/plugins/dir'
985
986[plugins.available-ports]
987any-local = false
988ranges = [
989 [
990 9081,
991 9097,
992],
993 [
994 10123,
995 10123,
996],
997]
998[plugins.ingest.collectors.lttng-live]
999additional-timeline-attributes = [
1000 'a = 2',
1001 'r = 3',
1002]
1003override-timeline-attributes = [
1004 'c = false',
1005 'q = 99',
1006]
1007shutdown-signal = 'SIGINT'
1008shutdown-timeout-millis = 1000
1009
1010[plugins.ingest.collectors.lttng-live.metadata]
1011all-the-custom = true
1012bag = 41
1013[plugins.ingest.importers.csv-yolo]
1014additional-timeline-attributes = ['s = 4']
1015override-timeline-attributes = ['t = "five"']
1016
1017[plugins.ingest.importers.csv-yolo.metadata]
1018other-custom = 'yup'
1019[plugins.mutation.mutators.linux-network]
1020additional-mutator-attributes = ['u = "six"']
1021override-mutator-attributes = ['v = 7']
1022
1023[plugins.mutation.mutators.linux-network.metadata]
1024moar-custom = [
1025 'ynot',
1026 'structured',
1027 2,
1028]
1029"#;
1030 #[test]
1031 fn raw_representation_round_trip() {
1032 let raw: crate::raw_toml::Config = toml::from_str(FULLY_FILLED_IN_TOML).unwrap();
1033 let back_out = crate::raw_toml::try_raw_to_string_pretty(&raw).unwrap();
1034 assert_eq!(FULLY_FILLED_IN_TOML, back_out.as_str());
1035 }
1036
1037 #[test]
1038 fn refined_representation_round_trip() {
1039 let refined: crate::refined::Config = try_from_str(FULLY_FILLED_IN_TOML).unwrap();
1040 let back_out = try_to_string(&refined).unwrap();
1041 let refined_prime: crate::refined::Config = try_from_str(&back_out).unwrap();
1042 assert_eq!(refined, refined_prime);
1043 assert_eq!(FULLY_FILLED_IN_TOML, back_out.as_str());
1044 }
1045
1046 #[test]
1047 fn everything_is_optional() {
1048 let empty = "";
1049 let refined: crate::refined::Config = try_from_str(empty).unwrap();
1050 let back_out = try_to_string(&refined).unwrap();
1051 let refined_prime: crate::refined::Config = try_from_str(&back_out).unwrap();
1052 assert_eq!(refined, refined_prime);
1053 assert_eq!(empty, back_out.as_str());
1054 }
1055
    /// Unset variables fall back to their defaults, for both the `${VAR-def}`
    /// and `${VAR:-def}` spellings, in keys as well as in values.
    // Relies on NOT_SET_KEY and NOT_SET_VAL being absent from the test environment.
    #[test]
    fn attr_kv_envsub_defaults() {
        let toml = r#"
[ingest]
additional-timeline-attributes = [
 '${NOT_SET_KEY:-foo} = ${NOT_SET_VAL-1}',
 '${NOT_SET_KEY-bar} = "${NOT_SET_VAL:-foo}"',
 '${NOT_SET_KEY-abc} = ${NOT_SET_VAL:-true}',
]"#;
        let cfg: crate::refined::Config = try_from_str(toml).unwrap();
        let attrs = cfg
            .ingest
            .map(|i| i.timeline_attributes.additional_timeline_attributes)
            .unwrap();
        assert_eq!(
            attrs,
            vec![
                AttrKeyEqValuePair(AttrKey::new("foo".to_string()), 1_i64.into()),
                AttrKeyEqValuePair(AttrKey::new("bar".to_string()), "foo".into()),
                AttrKeyEqValuePair(AttrKey::new("abc".to_string()), true.into()),
            ]
        );
    }
1079
    /// Set variables are substituted into keys and values.
    // The expected values come from compile-time `env!` of the same CARGO_PKG_*
    // variables, so the test assumes they are also set when the test runs.
    #[test]
    fn attr_kv_envsub() {
        let toml = r#"
[ingest]
additional-timeline-attributes = [
 '${CARGO_PKG_NAME} = "${CARGO_PKG_VERSION}"',
 'int_key = ${CARGO_PKG_VERSION_MINOR}',
]"#;
        let cfg: crate::refined::Config = try_from_str(toml).unwrap();
        let attrs = cfg
            .ingest
            .map(|i| i.timeline_attributes.additional_timeline_attributes)
            .unwrap();
        assert_eq!(
            attrs,
            vec![
                AttrKeyEqValuePair(
                    AttrKey::new(env!("CARGO_PKG_NAME").to_string()),
                    env!("CARGO_PKG_VERSION").into()
                ),
                AttrKeyEqValuePair(
                    AttrKey::new("int_key".to_string()),
                    env!("CARGO_PKG_VERSION_MINOR")
                        .parse::<i64>()
                        .unwrap()
                        .into()
                ),
            ]
        );
    }
1110
    /// An unset variable without a default is reported as a semantic error
    /// that names the variable.
    // Relies on NOT_SET_KEY being absent from the test environment.
    #[test]
    fn attr_kv_envsub_errors() {
        let toml = r#"
[ingest]
additional-timeline-attributes = [
 '${NOT_SET_KEY} = 1',
]"#;
        match try_from_str(toml).unwrap_err() {
            ConfigLoadError::DefinitionSemantics { explanation } => {
                assert_eq!(explanation, "Error in additional-timeline-attributes member. The environment variable 'NOT_SET_KEY' is not set and no default value is specified".to_string())
            }
            // Any other error kind means the failure came from the wrong stage.
            _ => panic!(),
        }
    }
1125}