1pub mod resolve;
5
6pub use refined::*;
7use std::collections::BTreeMap;
8use std::path::{Path, PathBuf};
9use thiserror::Error;
10pub use toml::Value as TomlValue;
11
12pub const CONFIG_ENV_VAR: &str = "MODALITY_REFLECTOR_CONFIG";
13
14pub const MODALITY_STORAGE_SERVICE_PORT_DEFAULT: u16 = 14182;
15pub const MODALITY_STORAGE_SERVICE_TLS_PORT_DEFAULT: u16 = 14183;
16
17pub const MODALITY_REFLECTOR_INGEST_CONNECT_PORT_DEFAULT: u16 = 14188;
18pub const MODALITY_REFLECTOR_INGEST_CONNECT_TLS_PORT_DEFAULT: u16 = 14189;
19
20pub const MODALITY_MUTATION_CONNECT_PORT_DEFAULT: u16 = 14192;
21pub const MODALITY_MUTATION_CONNECT_TLS_PORT_DEFAULT: u16 = 14194;
22
23pub const MODALITY_REFLECTOR_MUTATION_CONNECT_PORT_DEFAULT: u16 = 14198;
24pub const MODALITY_REFLECTOR_MUTATION_CONNECT_TLS_PORT_DEFAULT: u16 = 14199;
25
26pub(crate) mod raw_toml {
28 use super::*;
29 use std::path::PathBuf;
30
31 #[derive(Debug, Clone, Default, PartialEq, serde::Serialize, serde::Deserialize)]
32 #[serde(rename_all = "kebab-case", default)]
33 pub(crate) struct Config {
34 #[serde(skip_serializing_if = "Option::is_none")]
35 pub(crate) ingest: Option<TopLevelIngest>,
36
37 #[serde(skip_serializing_if = "Option::is_none")]
38 pub(crate) mutation: Option<TopLevelMutation>,
39
40 #[serde(skip_serializing_if = "Option::is_none")]
41 pub(crate) plugins: Option<TopLevelPlugins>,
42
43 #[serde(skip_serializing_if = "BTreeMap::is_empty")]
44 pub(crate) metadata: BTreeMap<String, TomlValue>,
45 }
46
47 #[derive(Debug, Clone, Default, PartialEq, serde::Serialize, serde::Deserialize)]
48 #[serde(rename_all = "kebab-case", default)]
49 pub(crate) struct TopLevelIngest {
50 #[serde(skip_serializing_if = "Option::is_none")]
51 pub(crate) protocol_parent_url: Option<String>,
52
53 #[serde(skip_serializing_if = "std::ops::Not::not")]
54 pub(crate) allow_insecure_tls: bool,
55
56 #[serde(skip_serializing_if = "Option::is_none")]
57 pub(crate) max_write_batch_staleness_millis: Option<u64>,
58
59 #[serde(skip_serializing_if = "Option::is_none")]
60 pub(crate) protocol_child_port: Option<u16>,
61
62 #[serde(flatten)]
63 pub(crate) timeline_attributes: TimelineAttributes,
64
65 #[serde(skip_serializing_if = "Vec::is_empty", alias = "rollover-tracker")]
66 pub(crate) rollover_trackers: Vec<IngestRolloverTracker>,
67 }
68
69 #[derive(Debug, Clone, Default, PartialEq, serde::Serialize, serde::Deserialize)]
70 #[serde(rename_all = "kebab-case", default)]
71 pub(crate) struct TopLevelMutation {
72 #[serde(skip_serializing_if = "Option::is_none")]
73 pub(crate) protocol_parent_url: Option<String>,
74
75 #[serde(skip_serializing_if = "std::ops::Not::not")]
76 pub(crate) allow_insecure_tls: bool,
77
78 #[serde(skip_serializing_if = "Option::is_none")]
79 pub(crate) protocol_child_port: Option<u16>,
80
81 #[serde(skip_serializing_if = "Option::is_none")]
82 pub(crate) mutator_http_api_port: Option<u16>,
83
84 #[serde(flatten)]
85 pub(crate) mutator_attributes: MutatorAttributes,
86
87 #[serde(skip_serializing_if = "Vec::is_empty")]
88 pub(crate) external_mutator_urls: Vec<String>,
89 }
90
91 #[derive(Debug, Clone, Default, PartialEq, serde::Serialize, serde::Deserialize)]
92 #[serde(rename_all = "kebab-case", default)]
93 pub(crate) struct TopLevelPlugins {
94 #[serde(skip_serializing_if = "Option::is_none")]
95 pub(crate) available_ports: Option<AvailablePorts>,
96
97 #[serde(skip_serializing_if = "Option::is_none")]
98 pub(crate) plugins_dir: Option<PathBuf>,
99
100 #[serde(skip_serializing_if = "Option::is_none")]
101 pub(crate) ingest: Option<PluginsIngest>,
102
103 #[serde(skip_serializing_if = "Option::is_none")]
104 pub(crate) mutation: Option<PluginsMutation>,
105 }
106
107 #[derive(Debug, Clone, Default, PartialEq, serde::Serialize, serde::Deserialize)]
108 #[serde(rename_all = "kebab-case", default)]
109 pub(crate) struct AvailablePorts {
110 #[serde(skip_serializing_if = "Option::is_none")]
111 pub(crate) any_local: Option<bool>,
112
113 #[serde(skip_serializing_if = "Vec::is_empty")]
114 pub(crate) ranges: Vec<[u16; 2]>,
115 }
116
117 #[derive(Debug, Clone, Default, PartialEq, serde::Serialize, serde::Deserialize)]
118 #[serde(rename_all = "kebab-case", default)]
119 pub(crate) struct TimelineAttributes {
120 #[serde(skip_serializing_if = "Vec::is_empty")]
121 pub(crate) additional_timeline_attributes: Vec<String>,
122
123 #[serde(skip_serializing_if = "Vec::is_empty")]
124 pub(crate) override_timeline_attributes: Vec<String>,
125 }
126
127 #[derive(Debug, Clone, Default, PartialEq, serde::Serialize, serde::Deserialize)]
128 #[serde(rename_all = "kebab-case", default)]
129 pub(crate) struct MutatorAttributes {
130 #[serde(skip_serializing_if = "Vec::is_empty")]
131 pub(crate) additional_mutator_attributes: Vec<String>,
132
133 #[serde(skip_serializing_if = "Vec::is_empty")]
134 pub(crate) override_mutator_attributes: Vec<String>,
135 }
136
137 #[derive(Debug, Clone, Default, PartialEq, serde::Serialize, serde::Deserialize)]
138 #[serde(rename_all = "kebab-case", default)]
139 pub(crate) struct PluginsIngest {
140 #[serde(skip_serializing_if = "BTreeMap::is_empty")]
141 pub(crate) collectors: BTreeMap<String, PluginsIngestMember>,
142
143 #[serde(skip_serializing_if = "BTreeMap::is_empty")]
144 pub(crate) importers: BTreeMap<String, PluginsIngestMember>,
145 }
146
147 #[derive(Debug, Clone, Default, PartialEq, serde::Serialize, serde::Deserialize)]
148 #[serde(rename_all = "kebab-case", default)]
149 pub(crate) struct PluginsIngestMember {
150 pub(crate) plugin: Option<String>,
151
152 #[serde(flatten)]
153 pub(crate) timeline_attributes: TimelineAttributes,
154
155 #[serde(flatten)]
156 pub(crate) shutdown: PluginShutdown,
157
158 #[serde(skip_serializing_if = "Option::is_none")]
159 pub(crate) restart: Option<bool>,
160
161 #[serde(skip_serializing_if = "BTreeMap::is_empty")]
162 pub(crate) metadata: BTreeMap<String, TomlValue>,
163 }
164
165 #[derive(Debug, Clone, Default, PartialEq, serde::Serialize, serde::Deserialize)]
166 #[serde(rename_all = "kebab-case", default)]
167 pub(crate) struct PluginsMutation {
168 #[serde(skip_serializing_if = "BTreeMap::is_empty")]
169 pub(crate) mutators: BTreeMap<String, PluginsMutationMember>,
170 }
171
172 #[derive(Debug, Clone, Default, PartialEq, serde::Serialize, serde::Deserialize)]
173 #[serde(rename_all = "kebab-case", default)]
174 pub(crate) struct PluginsMutationMember {
175 pub(crate) plugin: Option<String>,
176
177 #[serde(flatten)]
178 pub(crate) mutator_attributes: MutatorAttributes,
179
180 #[serde(flatten)]
181 pub(crate) shutdown: PluginShutdown,
182
183 #[serde(skip_serializing_if = "Option::is_none")]
184 pub(crate) restart: Option<bool>,
185
186 #[serde(skip_serializing_if = "BTreeMap::is_empty")]
187 pub(crate) metadata: BTreeMap<String, TomlValue>,
188 }
189
190 #[derive(Debug, Clone, Default, PartialEq, serde::Serialize, serde::Deserialize)]
191 #[serde(rename_all = "kebab-case", default)]
192 pub(crate) struct PluginShutdown {
193 pub(crate) shutdown_signal: Option<String>,
194 pub(crate) shutdown_timeout_millis: Option<u64>,
195 }
196
197 #[derive(Debug, Clone, Default, PartialEq, serde::Serialize, serde::Deserialize)]
198 #[serde(rename_all = "kebab-case", default)]
199 pub(crate) struct IngestRolloverTracker {
200 pub(crate) timeout_millis: Option<u64>,
201 #[serde(skip_serializing_if = "Option::is_none")]
202 pub(crate) sender: Option<RolloverTrackerParticipant>,
203 #[serde(skip_serializing_if = "Vec::is_empty", alias = "receiver")]
204 pub(crate) receivers: Vec<RolloverTrackerParticipant>,
205 }
206
207 #[derive(Debug, Clone, Default, PartialEq, serde::Serialize, serde::Deserialize)]
208 #[serde(rename_all = "kebab-case", default)]
209 pub(crate) struct RolloverTrackerParticipant {
210 #[serde(skip_serializing_if = "Vec::is_empty")]
211 pub(crate) timeline_attributes: Vec<String>,
212 #[serde(skip_serializing_if = "Option::is_none")]
213 pub(crate) event_name: Option<String>,
214 #[serde(skip_serializing_if = "Option::is_none")]
215 pub(crate) event_attribute_key: Option<String>,
216 }
217
218 #[cfg(test)]
219 pub(crate) fn try_raw_to_string_pretty(config: &Config) -> Result<String, toml::ser::Error> {
220 let toml_value = toml::Value::try_from(config)?;
224 let content = toml::to_string_pretty(&toml_value)?;
225 Ok(content)
226 }
227
228 impl PluginMemberExt for PluginsIngestMember {
229 fn plugin(&self) -> Option<&str> {
230 self.plugin.as_deref()
231 }
232 }
233
234 impl PluginMemberExt for PluginsMutationMember {
235 fn plugin(&self) -> Option<&str> {
236 self.plugin.as_deref()
237 }
238 }
239
240 #[cfg(feature = "modality")]
241 impl PluginsIngest {
242 pub(crate) fn find_collector_member_by_plugin_name<S: AsRef<str>>(
243 &self,
244 plugin_name: S,
245 ) -> Option<&PluginsIngestMember> {
246 find_member_by_plugin_name(&self.collectors, plugin_name)
247 }
248
249 pub(crate) fn find_importer_member_by_plugin_name<S: AsRef<str>>(
250 &self,
251 plugin_name: S,
252 ) -> Option<&PluginsIngestMember> {
253 find_member_by_plugin_name(&self.importers, plugin_name)
254 }
255 }
256
257 #[cfg(feature = "modality")]
258 impl PluginsMutation {
259 pub(crate) fn find_mutator_member_by_plugin_name<S: AsRef<str>>(
260 &self,
261 plugin_name: S,
262 ) -> Option<&PluginsMutationMember> {
263 find_member_by_plugin_name(&self.mutators, plugin_name)
264 }
265 }
266
267 impl From<refined::Config> for Config {
268 fn from(value: refined::Config) -> Self {
269 Self {
270 ingest: value.ingest.map(Into::into),
271 mutation: value.mutation.map(Into::into),
272 plugins: value.plugins.map(Into::into),
273 metadata: value.metadata,
274 }
275 }
276 }
277
278 impl From<refined::TopLevelIngest> for TopLevelIngest {
279 fn from(value: refined::TopLevelIngest) -> Self {
280 Self {
281 protocol_parent_url: value.protocol_parent_url.map(Into::into),
282 allow_insecure_tls: value.allow_insecure_tls,
283 max_write_batch_staleness_millis: value.max_write_batch_staleness.map(|v| {
284 let millis = v.as_millis();
285 if millis >= u64::MAX as u128 {
286 u64::MAX
287 } else {
288 millis as u64
289 }
290 }),
291 protocol_child_port: value.protocol_child_port.map(Into::into),
292 timeline_attributes: value.timeline_attributes.into(),
293 rollover_trackers: value
294 .rollover_trackers
295 .into_iter()
296 .map(Into::into)
297 .collect(),
298 }
299 }
300 }
301 impl From<refined::TopLevelMutation> for TopLevelMutation {
302 fn from(value: refined::TopLevelMutation) -> Self {
303 Self {
304 protocol_parent_url: value.protocol_parent_url.map(Into::into),
305 allow_insecure_tls: value.allow_insecure_tls,
306 protocol_child_port: value.protocol_child_port.map(Into::into),
307 mutator_http_api_port: value.mutator_http_api_port.map(Into::into),
308 mutator_attributes: value.mutator_attributes.into(),
309 external_mutator_urls: value
310 .external_mutator_urls
311 .into_iter()
312 .map(Into::into)
313 .collect(),
314 }
315 }
316 }
317 impl From<refined::TopLevelPlugins> for TopLevelPlugins {
318 fn from(value: refined::TopLevelPlugins) -> Self {
319 Self {
320 available_ports: value.available_ports.map(Into::into),
321 plugins_dir: value.plugins_dir,
322 ingest: value.ingest.map(Into::into),
323 mutation: value.mutation.map(Into::into),
324 }
325 }
326 }
327 impl From<refined::TimelineAttributes> for TimelineAttributes {
328 fn from(value: refined::TimelineAttributes) -> Self {
329 Self {
330 additional_timeline_attributes: value
331 .additional_timeline_attributes
332 .into_iter()
333 .map(Into::into)
334 .collect(),
335 override_timeline_attributes: value
336 .override_timeline_attributes
337 .into_iter()
338 .map(Into::into)
339 .collect(),
340 }
341 }
342 }
343 impl From<refined::MutatorAttributes> for MutatorAttributes {
344 fn from(value: refined::MutatorAttributes) -> Self {
345 Self {
346 additional_mutator_attributes: value
347 .additional_mutator_attributes
348 .into_iter()
349 .map(Into::into)
350 .collect(),
351 override_mutator_attributes: value
352 .override_mutator_attributes
353 .into_iter()
354 .map(Into::into)
355 .collect(),
356 }
357 }
358 }
359 impl From<refined::PluginsIngest> for PluginsIngest {
360 fn from(value: refined::PluginsIngest) -> Self {
361 Self {
362 collectors: value
363 .collectors
364 .into_iter()
365 .map(|(k, v)| (k, v.into()))
366 .collect(),
367 importers: value
368 .importers
369 .into_iter()
370 .map(|(k, v)| (k, v.into()))
371 .collect(),
372 }
373 }
374 }
375 impl From<refined::PluginsMutation> for PluginsMutation {
376 fn from(value: refined::PluginsMutation) -> Self {
377 Self {
378 mutators: value
379 .mutators
380 .into_iter()
381 .map(|(k, v)| (k, v.into()))
382 .collect(),
383 }
384 }
385 }
386 impl From<refined::PluginsIngestMember> for PluginsIngestMember {
387 fn from(value: refined::PluginsIngestMember) -> Self {
388 Self {
389 plugin: value.plugin,
390 timeline_attributes: value.timeline_attributes.into(),
391 shutdown: value.shutdown.into(),
392 restart: value.restart,
393 metadata: value.metadata,
394 }
395 }
396 }
397 impl From<refined::PluginsMutationMember> for PluginsMutationMember {
398 fn from(value: refined::PluginsMutationMember) -> Self {
399 Self {
400 plugin: value.plugin,
401 mutator_attributes: value.mutator_attributes.into(),
402 shutdown: value.shutdown.into(),
403 restart: value.restart,
404 metadata: value.metadata,
405 }
406 }
407 }
408
409 impl From<refined::PluginShutdown> for PluginShutdown {
410 fn from(value: refined::PluginShutdown) -> Self {
411 Self {
412 shutdown_signal: value.shutdown_signal,
413 shutdown_timeout_millis: value.shutdown_timeout.map(|v| {
414 let millis = v.as_millis();
415 if millis >= u64::MAX as u128 {
416 u64::MAX
417 } else {
418 millis as u64
419 }
420 }),
421 }
422 }
423 }
424
425 impl From<refined::AvailablePorts> for AvailablePorts {
426 fn from(value: refined::AvailablePorts) -> Self {
427 Self {
428 any_local: value.any_local,
429 ranges: value
430 .ranges
431 .into_iter()
432 .map(|inclusive_range| [inclusive_range.start(), inclusive_range.end()])
433 .collect(),
434 }
435 }
436 }
437
438 impl From<refined::IngestRolloverTracker> for IngestRolloverTracker {
439 fn from(value: refined::IngestRolloverTracker) -> Self {
440 Self {
441 timeout_millis: value.timeout.map(|v| {
442 let millis = v.as_millis();
443 if millis >= u64::MAX as u128 {
444 u64::MAX
445 } else {
446 millis as u64
447 }
448 }),
449 sender: value.sender.map(Into::into),
450 receivers: value.receivers.into_iter().map(Into::into).collect(),
451 }
452 }
453 }
454
455 impl From<refined::RolloverTrackerParticipant> for RolloverTrackerParticipant {
456 fn from(value: refined::RolloverTrackerParticipant) -> Self {
457 Self {
458 timeline_attributes: value
459 .timeline_attributes
460 .into_iter()
461 .map(Into::into)
462 .collect(),
463 event_name: value.event_name,
464 event_attribute_key: value.event_attribute_key,
465 }
466 }
467 }
468}
469
470mod refined {
472 use super::TomlValue;
473 use crate::api::types::{AttrKey, AttrVal};
474 use lazy_static::lazy_static;
475 use regex::{Captures, Regex};
476 use std::collections::BTreeMap;
477 use std::env;
478 use std::fmt;
479 use std::path::PathBuf;
480 use std::str::FromStr;
481 use std::time::Duration;
482 use url::Url;
483
484 #[derive(Debug, Clone, Default, PartialEq)]
485 pub struct Config {
486 pub ingest: Option<TopLevelIngest>,
487 pub mutation: Option<TopLevelMutation>,
488 pub plugins: Option<TopLevelPlugins>,
489 pub metadata: BTreeMap<String, TomlValue>,
490 }
491
492 #[derive(Debug, Clone, Default, PartialEq, Eq)]
493 pub struct TopLevelIngest {
494 pub protocol_parent_url: Option<Url>,
495 pub allow_insecure_tls: bool,
496 pub protocol_child_port: Option<u16>,
497 pub timeline_attributes: TimelineAttributes,
498 pub max_write_batch_staleness: Option<Duration>,
499 pub rollover_trackers: Vec<IngestRolloverTracker>,
500 }
501
502 #[derive(Debug, Clone, Default, PartialEq, Eq)]
503 pub struct TopLevelMutation {
504 pub protocol_parent_url: Option<Url>,
505 pub allow_insecure_tls: bool,
506 pub protocol_child_port: Option<u16>,
507 pub mutator_http_api_port: Option<u16>,
508 pub mutator_attributes: MutatorAttributes,
509 pub external_mutator_urls: Vec<Url>,
510 }
511
512 #[derive(Debug, Clone, Default, PartialEq)]
513 pub struct TopLevelPlugins {
514 pub available_ports: Option<AvailablePorts>,
515 pub plugins_dir: Option<PathBuf>,
516 pub ingest: Option<PluginsIngest>,
517 pub mutation: Option<PluginsMutation>,
518 }
519
520 #[derive(Debug, Clone, Default, PartialEq, Eq)]
521 pub struct AvailablePorts {
522 pub any_local: Option<bool>,
523 pub ranges: Vec<InclusivePortRange>,
524 }
525
526 #[derive(Copy, Clone, Debug, Default, PartialEq, Eq, PartialOrd, Ord)]
527 pub struct InclusivePortRange {
528 start: u16,
529 end: u16,
530 }
531
532 impl InclusivePortRange {
533 pub fn new(start: u16, end: u16) -> Result<Self, SemanticErrorExplanation> {
534 if start > end {
535 Err(SemanticErrorExplanation(format!("Port range start must <= end, but provided start {start} was > provided end {end}")))
536 } else {
537 Ok(InclusivePortRange { start, end })
538 }
539 }
540 pub fn start(&self) -> u16 {
541 self.start
542 }
543 pub fn end(&self) -> u16 {
544 self.end
545 }
546 pub fn start_mut(&mut self) -> &mut u16 {
547 &mut self.start
548 }
549 pub fn end_mut(&mut self) -> &mut u16 {
550 &mut self.end
551 }
552 }
553 #[derive(Debug, Clone, Default, PartialEq, Eq)]
554 pub struct TimelineAttributes {
555 pub additional_timeline_attributes: Vec<AttrKeyEqValuePair>,
556 pub override_timeline_attributes: Vec<AttrKeyEqValuePair>,
557 }
558 #[derive(Debug, Clone, Default, PartialEq, Eq)]
559 pub struct MutatorAttributes {
560 pub additional_mutator_attributes: Vec<AttrKeyEqValuePair>,
561 pub override_mutator_attributes: Vec<AttrKeyEqValuePair>,
562 }
563
564 impl MutatorAttributes {
565 pub fn merge(
566 &mut self,
567 other: MutatorAttributes,
568 ) -> Result<(), MergeMutatorAttributesError> {
569 for AttrKeyEqValuePair(k, v) in other.additional_mutator_attributes.into_iter() {
570 if self
571 .additional_mutator_attributes
572 .iter()
573 .any(|kvp| kvp.0 == k)
574 {
575 return Err(MergeMutatorAttributesError::KeyConflict(k));
576 }
577
578 self.additional_mutator_attributes
579 .push(AttrKeyEqValuePair(k, v));
580 }
581
582 self.override_mutator_attributes
583 .extend(other.override_mutator_attributes);
584
585 Ok(())
586 }
587 }
588
589 #[derive(Clone, Debug, PartialEq, Eq, thiserror::Error)]
590 pub enum MergeMutatorAttributesError {
591 #[error("Conflicting settings for mutator attribute key {0}")]
592 KeyConflict(AttrKey),
593 }
594
595 #[derive(Debug, Clone, Default, PartialEq)]
596 pub struct PluginsIngest {
597 pub collectors: BTreeMap<String, PluginsIngestMember>,
598 pub importers: BTreeMap<String, PluginsIngestMember>,
599 }
600 #[derive(Debug, Clone, Default, PartialEq)]
601 pub struct PluginsIngestMember {
602 pub plugin: Option<String>,
603 pub timeline_attributes: TimelineAttributes,
604 pub shutdown: PluginShutdown,
605 pub restart: Option<bool>,
606 pub metadata: BTreeMap<String, TomlValue>,
607 }
608 #[derive(Debug, Clone, Default, PartialEq)]
609 pub struct PluginsMutation {
610 pub mutators: BTreeMap<String, PluginsMutationMember>,
611 }
612 #[derive(Debug, Clone, Default, PartialEq)]
613 pub struct PluginsMutationMember {
614 pub plugin: Option<String>,
615 pub mutator_attributes: MutatorAttributes,
616 pub shutdown: PluginShutdown,
617 pub restart: Option<bool>,
618 pub metadata: BTreeMap<String, TomlValue>,
619 }
620 #[derive(Debug, Clone, Default, PartialEq)]
621 pub struct PluginShutdown {
622 pub shutdown_signal: Option<String>,
623 pub shutdown_timeout: Option<Duration>,
624 }
625
626 #[derive(Debug, Clone, Default, PartialEq, Eq)]
627 pub struct IngestRolloverTracker {
628 pub timeout: Option<Duration>,
629 pub sender: Option<RolloverTrackerParticipant>,
630 pub receivers: Vec<RolloverTrackerParticipant>,
631 }
632
633 #[derive(Debug, Clone, Default, PartialEq, Eq)]
634 pub struct RolloverTrackerParticipant {
635 pub timeline_attributes: Vec<AttrKeyEqValuePair>,
636 pub event_name: Option<String>,
637 pub event_attribute_key: Option<String>,
638 }
639
640 #[derive(Clone, Debug, PartialEq, Eq, thiserror::Error)]
641 pub enum AttrKeyValuePairParseError {
642 #[error("'{0}' is not a valid attribute key=value string.")]
643 Format(String),
644
645 #[error("The key '{0}' starts with an invalid character.")]
646 InvalidKey(String),
647
648 #[error(transparent)]
649 EnvSub(#[from] EnvSubError),
650 }
651
652 #[derive(Clone, Debug, PartialEq, Eq, PartialOrd)]
661 pub struct AttrKeyEqValuePair(pub AttrKey, pub AttrVal);
662
663 impl From<(AttrKey, AttrVal)> for AttrKeyEqValuePair {
664 fn from((k, v): (AttrKey, AttrVal)) -> Self {
665 AttrKeyEqValuePair(k, v)
666 }
667 }
668
669 impl FromStr for AttrKeyEqValuePair {
670 type Err = AttrKeyValuePairParseError;
671
672 fn from_str(input: &str) -> Result<Self, Self::Err> {
673 let s = envsub(input)?;
675
676 let parts: Vec<&str> = s.trim().split('=').map(|p| p.trim()).collect();
677 if parts.len() != 2 || parts[0].is_empty() || parts[1].is_empty() {
678 return Err(AttrKeyValuePairParseError::Format(s.to_string()));
679 }
680
681 let key = parts[0];
682 let val_str = parts[1];
683
684 if key.starts_with('.') {
685 return Err(AttrKeyValuePairParseError::InvalidKey(key.to_string()));
686 }
687
688 let val: Result<_, std::convert::Infallible> = val_str.parse();
689 let val = val.unwrap();
690
691 Ok(AttrKeyEqValuePair(AttrKey::new(key.to_string()), val))
692 }
693 }
694
695 impl TryFrom<String> for AttrKeyEqValuePair {
696 type Error = AttrKeyValuePairParseError;
697
698 fn try_from(s: String) -> Result<Self, Self::Error> {
699 AttrKeyEqValuePair::from_str(&s)
700 }
701 }
702
703 impl From<AttrKeyEqValuePair> for String {
704 fn from(kv: AttrKeyEqValuePair) -> Self {
705 kv.to_string()
706 }
707 }
708
709 impl fmt::Display for AttrKeyEqValuePair {
710 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
711 let val_s = match &self.1 {
715 AttrVal::String(interned_string) => {
716 let mut s = String::new();
717 s.push('\"');
718 s.push_str(interned_string.as_ref());
719 s.push('\"');
720 s
721 }
722 AttrVal::TimelineId(timeline_id) => {
723 let mut s = String::new();
724 s.push('\"');
725 s.push_str(timeline_id.to_string().as_str());
726 s.push('\"');
727 s
728 }
729 v => v.to_string(),
730 };
731 write!(f, "{} = {}", self.0, val_s)
732 }
733 }
734
735 #[derive(Debug)]
736 pub struct SemanticErrorExplanation(pub String);
737
738 use crate::reflector_config::raw_toml;
739 impl TryFrom<raw_toml::Config> for Config {
740 type Error = SemanticErrorExplanation;
741
742 fn try_from(value: raw_toml::Config) -> Result<Self, Self::Error> {
743 Ok(Self {
744 ingest: if let Some(ingest) = value.ingest {
745 Some(ingest.try_into()?)
746 } else {
747 None
748 },
749 mutation: if let Some(mutation) = value.mutation {
750 Some(mutation.try_into()?)
751 } else {
752 None
753 },
754 plugins: if let Some(plugins) = value.plugins {
755 Some(plugins.try_into()?)
756 } else {
757 None
758 },
759 metadata: value.metadata,
760 })
761 }
762 }
763
764 impl TryFrom<raw_toml::TopLevelIngest> for TopLevelIngest {
765 type Error = SemanticErrorExplanation;
766
767 fn try_from(value: raw_toml::TopLevelIngest) -> Result<Self, Self::Error> {
768 Ok(Self {
769 protocol_parent_url: if let Some(u) = value.protocol_parent_url {
770 Some(url::Url::from_str(&u).map_err(|parse_err| {
771 SemanticErrorExplanation(format!(
772 "ingest.protocol-parent-url could not be parsed. {parse_err}"
773 ))
774 })?)
775 } else {
776 None
777 },
778 protocol_child_port: value.protocol_child_port,
779 timeline_attributes: value.timeline_attributes.try_into()?,
780 allow_insecure_tls: value.allow_insecure_tls,
781 max_write_batch_staleness: value
782 .max_write_batch_staleness_millis
783 .map(Duration::from_millis),
784 rollover_trackers: value
785 .rollover_trackers
786 .into_iter()
787 .map(IngestRolloverTracker::try_from)
788 .collect::<Result<Vec<_>, SemanticErrorExplanation>>()?,
789 })
790 }
791 }
792 impl TryFrom<raw_toml::TimelineAttributes> for TimelineAttributes {
793 type Error = SemanticErrorExplanation;
794
795 fn try_from(value: raw_toml::TimelineAttributes) -> Result<Self, Self::Error> {
796 Ok(Self {
797 additional_timeline_attributes: value
798 .additional_timeline_attributes
799 .into_iter()
800 .map(AttrKeyEqValuePair::try_from)
801 .collect::<Result<Vec<_>, AttrKeyValuePairParseError>>()
802 .map_err(|e| {
803 SemanticErrorExplanation(format!(
804 "Error in additional-timeline-attributes member. {e}"
805 ))
806 })?,
807 override_timeline_attributes: value
808 .override_timeline_attributes
809 .into_iter()
810 .map(AttrKeyEqValuePair::try_from)
811 .collect::<Result<Vec<_>, AttrKeyValuePairParseError>>()
812 .map_err(|e| {
813 SemanticErrorExplanation(format!(
814 "Error in override-timeline-attributes member. {e}"
815 ))
816 })?,
817 })
818 }
819 }
820 impl TryFrom<raw_toml::MutatorAttributes> for MutatorAttributes {
821 type Error = SemanticErrorExplanation;
822
823 fn try_from(value: raw_toml::MutatorAttributes) -> Result<Self, Self::Error> {
824 Ok(Self {
825 additional_mutator_attributes: value
826 .additional_mutator_attributes
827 .into_iter()
828 .map(AttrKeyEqValuePair::try_from)
829 .collect::<Result<Vec<_>, AttrKeyValuePairParseError>>()
830 .map_err(|e| {
831 SemanticErrorExplanation(format!(
832 "Error in additional-mutator-attributes member. {e}"
833 ))
834 })?,
835 override_mutator_attributes: value
836 .override_mutator_attributes
837 .into_iter()
838 .map(AttrKeyEqValuePair::try_from)
839 .collect::<Result<Vec<_>, AttrKeyValuePairParseError>>()
840 .map_err(|e| {
841 SemanticErrorExplanation(format!(
842 "Error in override-mutator-attributes member. {e}"
843 ))
844 })?,
845 })
846 }
847 }
848
849 impl TryFrom<raw_toml::TopLevelMutation> for TopLevelMutation {
850 type Error = SemanticErrorExplanation;
851
852 fn try_from(value: raw_toml::TopLevelMutation) -> Result<Self, Self::Error> {
853 Ok(Self {
854 protocol_parent_url: if let Some(u) = value.protocol_parent_url {
855 Some(url::Url::from_str(&u).map_err(|parse_err| SemanticErrorExplanation(format!("mutation.protocol-parent-url could not be parsed. {parse_err}")))?)
856 } else {
857 None
858 },
859 allow_insecure_tls: value.allow_insecure_tls,
860 protocol_child_port: value.protocol_child_port,
861 mutator_http_api_port: value.mutator_http_api_port,
862 mutator_attributes: value.mutator_attributes.try_into()?,
863 external_mutator_urls: value.external_mutator_urls.into_iter().map(|v| url::Url::from_str(&v).map_err(|parse_err|SemanticErrorExplanation(format!("mutation.external-mutator-urls member {v} could not be parsed. {parse_err}")))).collect::<Result<Vec<url::Url>, SemanticErrorExplanation>>()?,
864 })
865 }
866 }
867 impl TryFrom<raw_toml::TopLevelPlugins> for TopLevelPlugins {
868 type Error = SemanticErrorExplanation;
869
870 fn try_from(value: raw_toml::TopLevelPlugins) -> Result<Self, Self::Error> {
871 Ok(Self {
872 available_ports: if let Some(v) = value.available_ports {
873 Some(v.try_into()?)
874 } else {
875 None
876 },
877 plugins_dir: value.plugins_dir,
878 ingest: if let Some(v) = value.ingest {
879 Some(v.try_into()?)
880 } else {
881 None
882 },
883 mutation: if let Some(v) = value.mutation {
884 Some(v.try_into()?)
885 } else {
886 None
887 },
888 })
889 }
890 }
891
892 impl TryFrom<raw_toml::AvailablePorts> for AvailablePorts {
893 type Error = SemanticErrorExplanation;
894
895 fn try_from(value: raw_toml::AvailablePorts) -> Result<Self, Self::Error> {
896 Ok(Self {
897 any_local: value.any_local,
898 ranges: value
899 .ranges
900 .into_iter()
901 .map(|v| InclusivePortRange::new(v[0], v[1]))
902 .collect::<Result<Vec<InclusivePortRange>, SemanticErrorExplanation>>()?,
903 })
904 }
905 }
906 impl TryFrom<raw_toml::PluginsIngest> for PluginsIngest {
907 type Error = SemanticErrorExplanation;
908
909 fn try_from(value: raw_toml::PluginsIngest) -> Result<Self, Self::Error> {
910 Ok(
911 Self {
912 collectors:
913 value
914 .collectors
915 .into_iter()
916 .map(|(k, v)| v.try_into().map(|vv| (k, vv)))
917 .collect::<Result<
918 BTreeMap<String, PluginsIngestMember>,
919 SemanticErrorExplanation,
920 >>()?,
921 importers:
922 value
923 .importers
924 .into_iter()
925 .map(|(k, v)| v.try_into().map(|vv| (k, vv)))
926 .collect::<Result<
927 BTreeMap<String, PluginsIngestMember>,
928 SemanticErrorExplanation,
929 >>()?,
930 },
931 )
932 }
933 }
934 impl TryFrom<raw_toml::PluginsIngestMember> for PluginsIngestMember {
935 type Error = SemanticErrorExplanation;
936
937 fn try_from(value: raw_toml::PluginsIngestMember) -> Result<Self, Self::Error> {
938 Ok(Self {
939 plugin: value.plugin,
940 timeline_attributes: value.timeline_attributes.try_into()?,
941 shutdown: value.shutdown.into(),
942 restart: value.restart,
943 metadata: value.metadata,
944 })
945 }
946 }
947 impl TryFrom<raw_toml::PluginsMutation> for PluginsMutation {
948 type Error = SemanticErrorExplanation;
949
950 fn try_from(value: raw_toml::PluginsMutation) -> Result<Self, Self::Error> {
951 Ok(
952 Self {
953 mutators:
954 value
955 .mutators
956 .into_iter()
957 .map(|(k, v)| v.try_into().map(|vv| (k, vv)))
958 .collect::<Result<
959 BTreeMap<String, PluginsMutationMember>,
960 SemanticErrorExplanation,
961 >>()?,
962 },
963 )
964 }
965 }
966 impl TryFrom<raw_toml::PluginsMutationMember> for PluginsMutationMember {
967 type Error = SemanticErrorExplanation;
968
969 fn try_from(value: raw_toml::PluginsMutationMember) -> Result<Self, Self::Error> {
970 Ok(Self {
971 plugin: value.plugin,
972 mutator_attributes: value.mutator_attributes.try_into()?,
973 shutdown: value.shutdown.into(),
974 restart: value.restart,
975 metadata: value.metadata,
976 })
977 }
978 }
979
980 impl From<raw_toml::PluginShutdown> for PluginShutdown {
981 fn from(value: raw_toml::PluginShutdown) -> Self {
982 Self {
983 shutdown_signal: value.shutdown_signal,
984 shutdown_timeout: value.shutdown_timeout_millis.map(Duration::from_millis),
985 }
986 }
987 }
988
989 impl TryFrom<raw_toml::IngestRolloverTracker> for IngestRolloverTracker {
990 type Error = SemanticErrorExplanation;
991
992 fn try_from(value: raw_toml::IngestRolloverTracker) -> Result<Self, Self::Error> {
993 Ok(Self {
994 timeout: value.timeout_millis.map(Duration::from_millis),
995 sender: value.sender.map(TryInto::try_into).transpose()?,
996 receivers: value
997 .receivers
998 .into_iter()
999 .map(TryInto::try_into)
1000 .collect::<Result<Vec<_>, SemanticErrorExplanation>>()?,
1001 })
1002 }
1003 }
1004
1005 impl TryFrom<raw_toml::RolloverTrackerParticipant> for RolloverTrackerParticipant {
1006 type Error = SemanticErrorExplanation;
1007
1008 fn try_from(value: raw_toml::RolloverTrackerParticipant) -> Result<Self, Self::Error> {
1009 Ok(Self {
1010 timeline_attributes: value
1011 .timeline_attributes
1012 .into_iter()
1013 .map(AttrKeyEqValuePair::try_from)
1014 .collect::<Result<Vec<_>, AttrKeyValuePairParseError>>()
1015 .map_err(|e| {
1016 SemanticErrorExplanation(format!(
1017 "Error in rollover-tracker member timeline-attributes. {e}"
1018 ))
1019 })?,
1020 event_name: value.event_name,
1021 event_attribute_key: value.event_attribute_key,
1022 })
1023 }
1024 }
1025
1026 impl Config {
1027 pub fn is_empty(&self) -> bool {
1028 self.ingest.is_none()
1029 && self.mutation.is_none()
1030 && self.plugins.is_none()
1031 && self.metadata.is_empty()
1032 }
1033 }
1034
1035 impl PluginsIngest {
1036 pub fn find_collector_member_by_plugin_name<S: AsRef<str>>(
1037 &self,
1038 plugin_name: S,
1039 ) -> Option<&PluginsIngestMember> {
1040 find_member_by_plugin_name(&self.collectors, plugin_name)
1041 }
1042
1043 pub fn find_importer_member_by_plugin_name<S: AsRef<str>>(
1044 &self,
1045 plugin_name: S,
1046 ) -> Option<&PluginsIngestMember> {
1047 find_member_by_plugin_name(&self.importers, plugin_name)
1048 }
1049 }
1050
1051 impl PluginsMutation {
1052 pub fn find_mutator_member_by_plugin_name<S: AsRef<str>>(
1053 &self,
1054 plugin_name: S,
1055 ) -> Option<&PluginsMutationMember> {
1056 find_member_by_plugin_name(&self.mutators, plugin_name)
1057 }
1058 }
1059
1060 pub(crate) fn find_member_by_plugin_name<T: PluginMemberExt, N: AsRef<str>>(
1061 members: &BTreeMap<String, T>,
1062 plugin_name: N,
1063 ) -> Option<&T> {
1064 members.iter().find_map(|(k, m)| {
1065 if member_matches_plugin_name(plugin_name.as_ref(), k, m.plugin()) {
1066 Some(m)
1067 } else {
1068 None
1069 }
1070 })
1071 }
1072
1073 pub(crate) fn member_matches_plugin_name<N: AsRef<str>, K: AsRef<str>, P: AsRef<str>>(
1074 plugin_name: N,
1075 member_key: K,
1076 member_plugin: Option<P>,
1077 ) -> bool {
1078 if member_key.as_ref() == plugin_name.as_ref() {
1079 true
1081 } else if member_plugin
1082 .as_ref()
1083 .map(|p| p.as_ref() == plugin_name.as_ref())
1084 .unwrap_or(false)
1085 {
1086 true
1088 } else if member_key.as_ref().contains(plugin_name.as_ref()) {
1089 true
1091 } else if member_plugin
1092 .as_ref()
1093 .map(|p| p.as_ref().contains(plugin_name.as_ref()))
1094 .unwrap_or(false)
1095 {
1096 true
1098 } else {
1099 false
1100 }
1101 }
1102
1103 pub(crate) trait PluginMemberExt {
1104 fn plugin(&self) -> Option<&str>;
1105 }
1106
1107 impl PluginMemberExt for PluginsIngestMember {
1108 fn plugin(&self) -> Option<&str> {
1109 self.plugin.as_deref()
1110 }
1111 }
1112
1113 impl PluginMemberExt for PluginsMutationMember {
1114 fn plugin(&self) -> Option<&str> {
1115 self.plugin.as_deref()
1116 }
1117 }
1118
1119 #[derive(Clone, Debug, PartialEq, Eq, thiserror::Error)]
1120 pub enum EnvSubError {
1121 #[error("The environment variable '{0}' contains invalid unicode")]
1122 EnvVarNotUnicode(String),
1123
1124 #[error("The environment variable '{0}' is not set and no default value is specified")]
1125 EnvVarNotPresent(String),
1126 }
1127
1128 pub fn envsub(input: &str) -> Result<String, EnvSubError> {
1134 lazy_static! {
1135 static ref ENVSUB_RE: Regex =
1140 Regex::new(r"\$\{(?P<var>[a-zA-Z_][a-zA-Z0-9_]*)(:?-(?P<def>.*?))?\}")
1141 .expect("Could not construct envsub Regex");
1142 }
1143
1144 replace_all(&ENVSUB_RE, input, |caps: &Captures| {
1145 let env_var = &caps["var"];
1147 match env::var(env_var) {
1148 Ok(env_val_val) => Ok(env_val_val),
1149 Err(env::VarError::NotUnicode(_)) => {
1150 Err(EnvSubError::EnvVarNotUnicode(env_var.to_owned()))
1151 }
1152 Err(env::VarError::NotPresent) => {
1153 if let Some(def) = caps.name("def") {
1155 Ok(def.as_str().to_string())
1156 } else {
1157 Err(EnvSubError::EnvVarNotPresent(env_var.to_owned()))
1158 }
1159 }
1160 }
1161 })
1162 }
1163
1164 fn replace_all(
1166 re: &Regex,
1167 input: &str,
1168 replacement: impl Fn(&Captures) -> Result<String, EnvSubError>,
1169 ) -> Result<String, EnvSubError> {
1170 let mut new = String::with_capacity(input.len());
1171 let mut last_match = 0;
1172 for caps in re.captures_iter(input) {
1173 let m = caps.get(0).unwrap();
1174 new.push_str(&input[last_match..m.start()]);
1175 new.push_str(&replacement(&caps)?);
1176 last_match = m.end();
1177 }
1178 new.push_str(&input[last_match..]);
1179 Ok(new)
1180 }
1181}
1182
1183#[derive(Debug, Error)]
1184pub enum ConfigWriteError {
1185 #[error("TOML serialization error.")]
1186 Toml(#[from] toml::ser::Error),
1187
1188 #[error("IO error")]
1189 Io(#[from] std::io::Error),
1190}
1191
1192#[derive(Debug, Error)]
1193pub enum ConfigLoadError {
1194 #[error("Error in config file {} relating to TOML parsing. {error}", .path.display())]
1195 ConfigFileToml {
1196 path: PathBuf,
1197 #[source]
1198 error: toml::de::Error,
1199 },
1200 #[allow(unused)]
1201 #[error("Error in config content relating to TOML parsing. {error}")]
1202 ConfigToml {
1203 #[source]
1204 error: toml::de::Error,
1205 },
1206
1207 #[error("IO Error")]
1208 Io(#[from] std::io::Error),
1209
1210 #[error("Error in config content relating to semantics. {explanation}")]
1211 DefinitionSemantics { explanation: String },
1212}
1213
1214pub fn try_from_file(path: &Path) -> Result<refined::Config, ConfigLoadError> {
1215 let content = &std::fs::read_to_string(path)?;
1216 let partial: raw_toml::Config =
1217 toml::from_str(content).map_err(|e| ConfigLoadError::ConfigFileToml {
1218 path: path.to_owned(),
1219 error: e,
1220 })?;
1221 let r: Result<refined::Config, SemanticErrorExplanation> = partial.try_into();
1222 r.map_err(|semantics| ConfigLoadError::DefinitionSemantics {
1223 explanation: semantics.0,
1224 })
1225}
1226
1227pub fn try_from_str(content: &str) -> Result<refined::Config, ConfigLoadError> {
1228 let partial: raw_toml::Config =
1229 toml::from_str(content).map_err(|e| ConfigLoadError::ConfigToml { error: e })?;
1230 let r: Result<refined::Config, SemanticErrorExplanation> = partial.try_into();
1231 r.map_err(|semantics| ConfigLoadError::DefinitionSemantics {
1232 explanation: semantics.0,
1233 })
1234}
1235
1236pub fn try_to_file(config: &refined::Config, path: &Path) -> Result<(), ConfigWriteError> {
1237 let content = try_to_string(config)?;
1238 std::fs::write(path, content)?;
1239 Ok(())
1240}
1241
1242pub fn try_to_string(config: &refined::Config) -> Result<String, ConfigWriteError> {
1243 let raw: raw_toml::Config = config.clone().into();
1244 let toml_value = toml::Value::try_from(raw)?;
1248 let content = toml::to_string_pretty(&toml_value)?;
1249 Ok(content)
1250}
1251
1252#[cfg(test)]
1253mod tests {
1254 use super::*;
1255 use crate::api::AttrKey;
1256
1257 const FULLY_FILLED_IN_TOML: &str = r#"[ingest]
1262additional-timeline-attributes = [
1263 'a = 1',
1264 'b = "foo"',
1265]
1266override-timeline-attributes = ['c = true']
1267protocol-child-port = 9079
1268protocol-parent-url = 'modality-ingest://auxon.io:9077'
1269
1270[[ingest.rollover-trackers]]
1271timeout-millis = 1000
1272
1273[[ingest.rollover-trackers.receivers]]
1274event-attribute-key = 'event.seqnum'
1275event-name = 'rx'
1276timeline-attributes = ['timeline.name = "B"']
1277
1278[[ingest.rollover-trackers.receivers]]
1279event-attribute-key = 'event.seqnum'
1280event-name = 'rx'
1281timeline-attributes = ['timeline.name = "C"']
1282
1283[ingest.rollover-trackers.sender]
1284event-attribute-key = 'event.seqnum'
1285event-name = 'tx'
1286timeline-attributes = ['timeline.name = "A"']
1287
1288[metadata]
1289bag = 42
1290grab = 24
1291
1292[mutation]
1293additional-mutator-attributes = [
1294 'd = 100',
1295 'e = "oof"',
1296]
1297external-mutator-urls = ['http://some-other-process.com:8080/']
1298mutator-http-api-port = 9059
1299override-mutator-attributes = ['f = false']
1300protocol-child-port = 9080
1301protocol-parent-url = 'modality-ingest://localhost:9078'
1302
1303[plugins]
1304plugins-dir = 'path/to/custom/plugins/dir'
1305
1306[plugins.available-ports]
1307any-local = false
1308ranges = [
1309 [
1310 9081,
1311 9097,
1312],
1313 [
1314 10123,
1315 10123,
1316],
1317]
1318[plugins.ingest.collectors.foobar]
1319plugin = 'modality-socketcan-collector'
1320
1321[plugins.ingest.collectors.foobar.metadata]
1322all-the-custom = false
1323
1324[plugins.ingest.collectors.lttng-live]
1325additional-timeline-attributes = [
1326 'a = 2',
1327 'r = 3',
1328]
1329override-timeline-attributes = [
1330 'c = false',
1331 'q = 99',
1332]
1333restart = true
1334shutdown-signal = 'SIGINT'
1335shutdown-timeout-millis = 1000
1336
1337[plugins.ingest.collectors.lttng-live.metadata]
1338all-the-custom = true
1339bag = 41
1340[plugins.ingest.collectors.my-dlt-cfg.metadata]
1341foo = 10
1342[plugins.ingest.importers.csv-yolo]
1343additional-timeline-attributes = ['s = 4']
1344override-timeline-attributes = ['t = "five"']
1345
1346[plugins.ingest.importers.csv-yolo.metadata]
1347other-custom = 'yup'
1348[plugins.mutation.mutators.linux-network]
1349additional-mutator-attributes = ['u = "six"']
1350override-mutator-attributes = ['v = 7']
1351
1352[plugins.mutation.mutators.linux-network.metadata]
1353moar-custom = [
1354 'ynot',
1355 'structured',
1356 2,
1357]
1358"#;
1359
1360 #[test]
1361 fn raw_representation_round_trip() {
1362 let raw: raw_toml::Config = toml::from_str(FULLY_FILLED_IN_TOML).unwrap();
1363 let back_out = raw_toml::try_raw_to_string_pretty(&raw).unwrap();
1364 assert_eq!(FULLY_FILLED_IN_TOML, back_out.as_str());
1365 }
1366
1367 #[test]
1368 fn refined_representation_round_trip() {
1369 let refined: refined::Config = try_from_str(FULLY_FILLED_IN_TOML).unwrap();
1370 let back_out = try_to_string(&refined).unwrap();
1371 let refined_prime: refined::Config = try_from_str(&back_out).unwrap();
1372 assert_eq!(refined, refined_prime);
1373 assert_eq!(FULLY_FILLED_IN_TOML, back_out.as_str());
1374 }
1375
1376 #[test]
1377 fn everything_is_optional() {
1378 let empty = "";
1379 let refined: refined::Config = try_from_str(empty).unwrap();
1380 let back_out = try_to_string(&refined).unwrap();
1381 let refined_prime: refined::Config = try_from_str(&back_out).unwrap();
1382 assert_eq!(refined, refined_prime);
1383 assert_eq!(empty, back_out.as_str());
1384 }
1385
1386 #[test]
1387 fn attr_kv_envsub_defaults() {
1388 let toml = r#"
1389[ingest]
1390additional-timeline-attributes = [
1391 '${NOT_SET_KEY:-foo} = ${NOT_SET_VAL-1}',
1392 '${NOT_SET_KEY-bar} = "${NOT_SET_VAL:-foo}"',
1393 '${NOT_SET_KEY-abc} = ${NOT_SET_VAL:-true}',
1394]"#;
1395 let cfg: refined::Config = try_from_str(toml).unwrap();
1396 let attrs = cfg
1397 .ingest
1398 .map(|i| i.timeline_attributes.additional_timeline_attributes)
1399 .unwrap();
1400 assert_eq!(
1401 attrs,
1402 vec![
1403 AttrKeyEqValuePair(AttrKey::new("foo".to_string()), 1_i64.into()),
1404 AttrKeyEqValuePair(AttrKey::new("bar".to_string()), "foo".into()),
1405 AttrKeyEqValuePair(AttrKey::new("abc".to_string()), true.into()),
1406 ]
1407 );
1408 }
1409
1410 #[test]
1411 fn attr_kv_envsub() {
1412 let toml = r#"
1413[ingest]
1414additional-timeline-attributes = [
1415 '${CARGO_PKG_NAME} = "${CARGO_PKG_VERSION}"',
1416 'int_key = ${CARGO_PKG_VERSION_MINOR}',
1417]"#;
1418 let cfg: refined::Config = try_from_str(toml).unwrap();
1419 let attrs = cfg
1420 .ingest
1421 .map(|i| i.timeline_attributes.additional_timeline_attributes)
1422 .unwrap();
1423 assert_eq!(
1424 attrs,
1425 vec![
1426 AttrKeyEqValuePair(
1427 AttrKey::new(env!("CARGO_PKG_NAME").to_string()),
1428 env!("CARGO_PKG_VERSION").into()
1429 ),
1430 AttrKeyEqValuePair(
1431 AttrKey::new("int_key".to_string()),
1432 env!("CARGO_PKG_VERSION_MINOR")
1433 .parse::<i64>()
1434 .unwrap()
1435 .into()
1436 ),
1437 ]
1438 );
1439 }
1440
1441 #[test]
1442 fn attr_kv_envsub_errors() {
1443 let toml = r#"
1444[ingest]
1445additional-timeline-attributes = [
1446 '${NOT_SET_KEY} = 1',
1447]"#;
1448 match try_from_str(toml).unwrap_err() {
1449 ConfigLoadError::DefinitionSemantics { explanation } => {
1450 assert_eq!(explanation, "Error in additional-timeline-attributes member. The environment variable 'NOT_SET_KEY' is not set and no default value is specified".to_string())
1451 }
1452 _ => panic!(),
1453 }
1454 }
1455
1456 #[test]
1457 fn config_member_lookups() {
1458 let cfg: refined::Config = try_from_str(FULLY_FILLED_IN_TOML).unwrap();
1459 let ingest = cfg
1460 .plugins
1461 .as_ref()
1462 .and_then(|c| c.ingest.as_ref())
1463 .unwrap();
1464 let mutation = cfg
1465 .plugins
1466 .as_ref()
1467 .and_then(|c| c.mutation.as_ref())
1468 .unwrap();
1469 assert!(ingest
1470 .find_collector_member_by_plugin_name("lttng-live")
1471 .is_some());
1472 assert!(ingest
1473 .find_collector_member_by_plugin_name("socketcan")
1474 .is_some());
1475 assert!(ingest.find_collector_member_by_plugin_name("dlt").is_some());
1476 assert!(ingest
1477 .find_importer_member_by_plugin_name("csv-yolo")
1478 .is_some());
1479 assert!(mutation
1480 .find_mutator_member_by_plugin_name("linux-network")
1481 .is_some());
1482 }
1483}