mmids_core/
config.rs

1use crate::reactors::ReactorDefinition;
2use crate::workflows::definitions::{WorkflowDefinition, WorkflowStepDefinition, WorkflowStepType};
3use pest::iterators::{Pair, Pairs};
4use pest::Parser;
5use std::collections::HashMap;
6use std::time::Duration;
7use thiserror::Error;
8use tracing::warn;
9
10/// Configuration for a Mmids system.  Defines the settings and any workflows that should be active.
11pub struct MmidsConfig {
12    pub settings: HashMap<String, Option<String>>,
13    pub reactors: HashMap<String, ReactorDefinition>,
14    pub workflows: HashMap<String, WorkflowDefinition>,
15}
16
17/// Errors that can occur when parsing a configuration entry
18#[derive(Error, Debug)]
19pub enum ConfigParseError {
20    #[error("The config provided could not be parsed")]
21    InvalidConfig(#[from] pest::error::Error<Rule>),
22
23    #[error("Found unexpected rule '{rule:?}' in the {section} section")]
24    UnexpectedRule { rule: Rule, section: String },
25
26    #[error("Duplicate workflow name: '{name}'")]
27    DuplicateWorkflowName { name: String },
28
29    #[error("Invalid node name '{name}' on line {line}")]
30    InvalidNodeName { name: String, line: usize },
31
32    #[error("Arguments are not allowed on a settings node, but some were found on line {line}")]
33    ArgumentsSpecifiedOnSettingNode { line: usize },
34
35    #[error("More than 1 argument was provided for the setting on line {line}")]
36    TooManySettingArguments { line: usize },
37
38    #[error("The argument provided for the setting on line {line} is invalid. Equal signs are not allowed")]
39    InvalidSettingArgumentFormat { line: usize },
40
41    #[error(
42        "The `routed_by_reactor` argument on line {line} is invalid. Equal signs are not allowed"
43    )]
44    InvalidRoutedByReactorArgument { line: usize },
45
46    #[error("The workflow on line {line} did not have a name specified")]
47    NoNameOnWorkflow { line: usize },
48
49    #[error("Invalid workflow name of {name} on line {line}")]
50    InvalidWorkflowName { line: usize, name: String },
51
52    #[error("The reactor on line {line} did not have a name specified")]
53    NoNameOnReactor { line: usize },
54
55    #[error("Invalid workflow name of '{name}' on line {line}")]
56    InvalidReactorName { line: usize, name: String },
57
58    #[error("The reactor on line {line} has an invalid update_interval value of '{argument}'. This value must be a number")]
59    InvalidUpdateIntervalValue { line: usize, argument: String },
60
61    #[error(
62        "The reactor parameter's value on line {line} is invalid. Equal signs are not allowed"
63    )]
64    InvalidReactorParameterValueFormat { line: usize },
65
66    #[error("The reactor parameter on line {line} had multiple values. Only 1 is allowed")]
67    TooManyReactorParameterValues { line: usize },
68
69    #[error("Multiple reactors have the name of '{name}'. Each reactor must have a unique name")]
70    DuplicateReactorName { name: String },
71
72    #[error("The executor on line {line} did not have an executor specified")]
73    NoExecutorForReactor { line: usize },
74}
75
76#[derive(Parser)]
77#[grammar = "config.pest"]
78struct RawConfigParser;
79
80struct ChildNode {
81    name: String,
82    arguments: HashMap<String, Option<String>>,
83}
84
85/// Parses configuration from a text block.
86pub fn parse(content: &str) -> Result<MmidsConfig, ConfigParseError> {
87    let mut config = MmidsConfig {
88        settings: HashMap::new(),
89        reactors: HashMap::new(),
90        workflows: HashMap::new(),
91    };
92
93    let pairs = RawConfigParser::parse(Rule::content, content)?;
94    for pair in pairs {
95        let rule = pair.as_rule();
96        match &rule {
97            Rule::node_block => handle_node_block(&mut config, pair)?,
98            Rule::EOI => (),
99            x => {
100                return Err(ConfigParseError::UnexpectedRule {
101                    rule: x.clone(),
102                    section: "root".to_string(),
103                })
104            }
105        }
106    }
107
108    Ok(config)
109}
110
111fn handle_node_block(config: &mut MmidsConfig, pair: Pair<Rule>) -> Result<(), ConfigParseError> {
112    let mut rules = pair.into_inner();
113    let name_node = rules.next().unwrap(); // grammar requires a node name
114    let name = name_node.as_str().trim();
115
116    match name.to_lowercase().as_str() {
117        "settings" => read_settings(config, rules)?,
118        "workflow" => read_workflow(config, rules, name_node.as_span().start_pos().line_col().0)?,
119        "reactor" => read_reactor(config, rules, name_node.as_span().start_pos().line_col().0)?,
120        _ => {
121            return Err(ConfigParseError::InvalidNodeName {
122                name: name.to_string(),
123                line: name_node.as_span().start_pos().line_col().0,
124            })
125        }
126    }
127
128    Ok(())
129}
130
131fn read_settings(config: &mut MmidsConfig, pairs: Pairs<Rule>) -> Result<(), ConfigParseError> {
132    for pair in pairs {
133        match pair.as_rule() {
134            Rule::child_node => {
135                let child_node = read_child_node(pair.clone())?;
136                if child_node.arguments.len() > 1 {
137                    return Err(ConfigParseError::TooManySettingArguments {
138                        line: get_line_number(&pair),
139                    });
140                }
141
142                if let Some(key) = child_node.arguments.keys().nth(0) {
143                    if let Some(Some(_value)) = child_node.arguments.get(key) {
144                        return Err(ConfigParseError::InvalidSettingArgumentFormat {
145                            line: get_line_number(&pair),
146                        });
147                    }
148
149                    config.settings.insert(child_node.name, Some(key.clone()));
150                } else {
151                    config.settings.insert(child_node.name, None);
152                }
153            }
154
155            Rule::argument => {
156                return Err(ConfigParseError::ArgumentsSpecifiedOnSettingNode {
157                    line: get_line_number(&pair),
158                })
159            }
160
161            rule => {
162                return Err(ConfigParseError::UnexpectedRule {
163                    rule,
164                    section: "settings".to_string(),
165                })
166            }
167        }
168    }
169
170    Ok(())
171}
172
173fn read_workflow(
174    config: &mut MmidsConfig,
175    pairs: Pairs<Rule>,
176    starting_line: usize,
177) -> Result<(), ConfigParseError> {
178    let mut steps = Vec::new();
179    let mut workflow_name = None;
180    let mut routed_by_reactor = false;
181    for pair in pairs {
182        match pair.as_rule() {
183            Rule::child_node => {
184                let child_node = read_child_node(pair)?;
185                steps.push(WorkflowStepDefinition {
186                    step_type: WorkflowStepType(child_node.name),
187                    parameters: child_node.arguments,
188                });
189            }
190
191            Rule::argument => {
192                let (key, value) = read_argument(pair.clone())?;
193                if workflow_name.is_some() {
194                    if &key == "routed_by_reactor" {
195                        if value.is_some() {
196                            return Err(ConfigParseError::InvalidRoutedByReactorArgument {
197                                line: get_line_number(&pair),
198                            });
199                        }
200
201                        routed_by_reactor = true;
202                    } else {
203                        let line = get_line_number(&pair);
204                        warn!(
205                            workflow_name = %workflow_name.as_ref().unwrap(),
206                            line = %line,
207                            argument = %key,
208                            "Unknown argument '{}' for workflow {} on line {}",
209                            key, workflow_name.as_ref().unwrap(), line,
210                        );
211                    }
212                } else {
213                    if value.is_some() {
214                        return Err(ConfigParseError::InvalidWorkflowName {
215                            name: pair.as_str().to_string(),
216                            line: get_line_number(&pair),
217                        });
218                    }
219
220                    workflow_name = Some(key);
221                }
222            }
223
224            rule => {
225                return Err(ConfigParseError::UnexpectedRule {
226                    rule,
227                    section: "workflow".to_string(),
228                })
229            }
230        }
231    }
232
233    if let Some(name) = workflow_name {
234        if config.workflows.contains_key(&name) {
235            return Err(ConfigParseError::DuplicateWorkflowName { name });
236        }
237
238        config.workflows.insert(
239            name.to_string(),
240            WorkflowDefinition {
241                name,
242                steps,
243                routed_by_reactor,
244            },
245        );
246    } else {
247        return Err(ConfigParseError::NoNameOnWorkflow {
248            line: starting_line,
249        });
250    }
251
252    Ok(())
253}
254
255fn read_reactor(
256    config: &mut MmidsConfig,
257    pairs: Pairs<Rule>,
258    starting_line: usize,
259) -> Result<(), ConfigParseError> {
260    let mut name = None;
261    let mut parameters = HashMap::new();
262    let mut executor_name = None;
263    let mut update_interval = 0;
264
265    for pair in pairs {
266        match pair.as_rule() {
267            Rule::argument => {
268                let (key, value) = read_argument(pair.clone())?;
269                if name.is_none() {
270                    // Name must come first and only have a key, no pair
271                    if value.is_some() {
272                        return Err(ConfigParseError::InvalidReactorName {
273                            line: get_line_number(&pair),
274                            name: pair.as_str().to_string(),
275                        });
276                    }
277
278                    name = Some(key);
279                } else {
280                    if key == "executor" {
281                        if let Some(value) = value {
282                            executor_name = Some(value);
283                        }
284                    } else if key == "update_interval" {
285                        if let Some(value) = value {
286                            if let Ok(num) = value.parse() {
287                                update_interval = num;
288                            } else {
289                                return Err(ConfigParseError::InvalidUpdateIntervalValue {
290                                    line: get_line_number(&pair),
291                                    argument: value,
292                                });
293                            }
294                        } else {
295                            return Err(ConfigParseError::InvalidUpdateIntervalValue {
296                                line: get_line_number(&pair),
297                                argument: "".to_string(),
298                            });
299                        }
300                    } else {
301                        let line = get_line_number(&pair);
302                        warn!(
303                            line = %line,
304                            argument = %key,
305                            reactor_name = %name.as_ref().unwrap(),
306                            "Unknown argument '{}' for reactor {} on line {}",
307                            key, name.as_ref().unwrap(), line,
308                        );
309                    }
310                }
311            }
312
313            Rule::child_node => {
314                let line_number = pair.as_span().start_pos().line_col().0;
315                let child_node = read_child_node(pair)?;
316                if child_node.arguments.len() > 1 {
317                    return Err(ConfigParseError::TooManyReactorParameterValues {
318                        line: line_number,
319                    });
320                }
321
322                if let Some(key) = child_node.arguments.keys().nth(0) {
323                    if let Some(Some(_)) = child_node.arguments.get(key) {
324                        return Err(ConfigParseError::InvalidReactorParameterValueFormat {
325                            line: line_number,
326                        });
327                    }
328
329                    parameters.insert(child_node.name, Some(key.clone()));
330                } else {
331                    parameters.insert(child_node.name, None);
332                }
333            }
334
335            rule => {
336                return Err(ConfigParseError::UnexpectedRule {
337                    rule,
338                    section: "settings".to_string(),
339                })
340            }
341        }
342    }
343
344    if let Some(name) = name {
345        if config.reactors.contains_key(&name) {
346            return Err(ConfigParseError::DuplicateReactorName { name });
347        }
348
349        if let Some(executor) = executor_name {
350            config.reactors.insert(
351                name.to_string(),
352                ReactorDefinition {
353                    name,
354                    parameters,
355                    executor,
356                    update_interval: Duration::from_secs(update_interval),
357                },
358            );
359        } else {
360            return Err(ConfigParseError::NoExecutorForReactor {
361                line: starting_line,
362            });
363        }
364    } else {
365        return Err(ConfigParseError::NoNameOnReactor {
366            line: starting_line,
367        });
368    }
369
370    Ok(())
371}
372
373fn read_argument(pair: Pair<Rule>) -> Result<(String, Option<String>), ConfigParseError> {
374    let result;
375    // Each argument should have a single child rule based on grammar
376    let argument = pair.into_inner().nth(0).unwrap();
377    match argument.as_rule() {
378        Rule::argument_flag => {
379            result = (argument.as_str().to_string(), None);
380        }
381
382        Rule::quoted_string_value => {
383            result = (argument.as_str().to_string(), None);
384        }
385
386        Rule::key_value_pair => {
387            let mut key = "".to_string();
388            let mut value = "".to_string();
389            for inner in argument.into_inner() {
390                match inner.as_rule() {
391                    Rule::key => key = inner.as_str().to_string(),
392                    Rule::value => {
393                        // If this is a quotes string value, we need to unquote it, otherwise
394                        // use the value as-is
395                        value = inner
396                            .clone()
397                            .into_inner()
398                            .filter(|p| p.as_rule() == Rule::quoted_string_value)
399                            .map(|p| p.as_str().to_string())
400                            .nth(0)
401                            .unwrap_or(inner.as_str().to_string());
402                    }
403
404                    rule => {
405                        return Err(ConfigParseError::UnexpectedRule {
406                            rule,
407                            section: "argument".to_string(),
408                        })
409                    }
410                }
411            }
412
413            result = (key, Some(value));
414        }
415
416        _ => {
417            return Err(ConfigParseError::UnexpectedRule {
418                rule: argument.as_rule(),
419                section: "child_node argument".to_string(),
420            })
421        }
422    }
423
424    Ok(result)
425}
426
427fn read_child_node(child_node: Pair<Rule>) -> Result<ChildNode, ConfigParseError> {
428    let mut pairs = child_node.into_inner();
429    let name_node = pairs.next().unwrap(); // Grammar requires a node name first
430    let mut parsed_node = ChildNode {
431        name: name_node.as_str().to_string(),
432        arguments: HashMap::new(),
433    };
434
435    for pair in pairs {
436        match pair.as_rule() {
437            Rule::argument => {
438                let (key, value) = read_argument(pair)?;
439                parsed_node.arguments.insert(key, value);
440            }
441
442            rule => {
443                return Err(ConfigParseError::UnexpectedRule {
444                    rule,
445                    section: "child_node".to_string(),
446                })
447            }
448        }
449    }
450
451    Ok(parsed_node)
452}
453
454fn get_line_number(node: &Pair<Rule>) -> usize {
455    node.as_span().start_pos().line_col().0
456}
457
458#[cfg(test)]
459mod tests {
460    use super::*;
461
462    #[test]
463    fn can_parse_settings() {
464        let content = "
465settings {
466    first a
467    second \"C:\\program files\\ffmpeg\\bin\\ffmpeg.exe\"
468    flag
469
470}
471";
472
473        let config = parse(content).unwrap();
474        assert_eq!(config.settings.len(), 3, "Unexpected number of settings");
475        assert_eq!(
476            config.settings.get("first"),
477            Some(&Some("a".to_string())),
478            "Unexpected first value"
479        );
480        assert_eq!(
481            config.settings.get("second"),
482            Some(&Some(
483                "C:\\program files\\ffmpeg\\bin\\ffmpeg.exe".to_string()
484            )),
485            "Unexpected second value"
486        );
487        assert_eq!(
488            config.settings.get("flag"),
489            Some(&None),
490            "Unexpected flag value"
491        );
492    }
493
494    #[test]
495    fn can_read_single_workflow() {
496        let content = "
497workflow name {
498    rtmp_receive port=1935 app=receive stream_key=*
499    hls path=c:\\temp\\test.m3u8 segment_size=\"3\" size=640x480 flag
500}
501";
502        let config = parse(content).unwrap();
503        assert_eq!(config.workflows.len(), 1, "Unexpected number of workflows");
504        assert!(
505            config.workflows.contains_key("name"),
506            "workflow 'name' did not exist"
507        );
508
509        let workflow = config.workflows.get("name").unwrap();
510        assert_eq!(
511            workflow.name,
512            "name".to_string(),
513            "Unexpected workflow name"
514        );
515        assert_eq!(
516            workflow.steps.len(),
517            2,
518            "Unexpected number of workflow steps"
519        );
520        assert!(
521            !workflow.routed_by_reactor,
522            "Expected routed by reactor to be false"
523        );
524
525        let step1 = workflow.steps.get(0).unwrap();
526        assert_eq!(
527            step1.step_type.0,
528            "rtmp_receive".to_string(),
529            "Unexpected type of step 1"
530        );
531        assert_eq!(step1.parameters.len(), 3, "Unexpected number of parameters");
532        assert_eq!(
533            step1.parameters.get("port"),
534            Some(&Some("1935".to_string())),
535            "Unexpected step 1 port value"
536        );
537        assert_eq!(
538            step1.parameters.get("app"),
539            Some(&Some("receive".to_string())),
540            "Unexpected step 1 app value"
541        );
542        assert_eq!(
543            step1.parameters.get("stream_key"),
544            Some(&Some("*".to_string())),
545            "Unexpected step 1 stream_key value"
546        );
547
548        let step2 = workflow.steps.get(1).unwrap();
549        assert_eq!(
550            step2.step_type.0,
551            "hls".to_string(),
552            "Unexpected type of step 1"
553        );
554        assert_eq!(step2.parameters.len(), 4, "Unexpected number of parameters");
555        assert_eq!(
556            step2.parameters.get("path"),
557            Some(&Some("c:\\temp\\test.m3u8".to_string())),
558            "Unexpected step 2 path value"
559        );
560        assert_eq!(
561            step2.parameters.get("segment_size"),
562            Some(&Some("3".to_string())),
563            "Unexpected step 2 segment_size value"
564        );
565        assert_eq!(
566            step2.parameters.get("size"),
567            Some(&Some("640x480".to_string())),
568            "Unexpected step 2 size value"
569        );
570        assert_eq!(
571            step2.parameters.get("flag"),
572            Some(&None),
573            "Unexpected step 2 flag value"
574        );
575    }
576
577    #[test]
578    fn can_read_multiple_workflows() {
579        let content = "
580workflow name {
581    rtmp_receive port=1935 app=receive stream_key=*
582    hls path=c:\\temp\\test.m3u8 segment_size=\"3\" size=640x480 flag
583}
584
585workflow name2 {
586    another a
587}
588";
589        let config = parse(content).unwrap();
590
591        assert_eq!(config.workflows.len(), 2, "Unexpected number of workflows");
592        assert!(
593            config.workflows.contains_key("name"),
594            "Could not find a workflow named 'name'"
595        );
596        assert!(
597            config.workflows.contains_key("name2"),
598            "Could not find a workflow named 'name2'"
599        );
600    }
601
602    #[test]
603    fn can_read_single_reactor() {
604        let content = "
605reactor name executor=abc {
606    param1 value
607    param2 value2
608}
609";
610        let config = parse(content).unwrap();
611        assert_eq!(config.reactors.len(), 1, "Unexpected number of reactors");
612        assert!(
613            config.reactors.contains_key("name"),
614            "Reactor in config did not have the expected name"
615        );
616
617        let reactor = &config.reactors["name"];
618        assert_eq!(
619            reactor.name,
620            "name".to_string(),
621            "Unexpected name of reactor"
622        );
623        assert_eq!(reactor.executor, "abc".to_string(), "Unexpected executor");
624        assert_eq!(
625            reactor.parameters.len(),
626            2,
627            "Unexpected number of parameters"
628        );
629        assert_eq!(
630            reactor.parameters.get("param1"),
631            Some(&Some("value".to_string())),
632            "Unexpected param1 value"
633        );
634        assert_eq!(
635            reactor.parameters.get("param2"),
636            Some(&Some("value2".to_string())),
637            "Unexpected param2 value"
638        );
639    }
640
641    #[test]
642    fn duplicate_workflow_name_returns_error() {
643        let content = "
644workflow name {
645    rtmp_receive port=1935 app=receive stream_key=*
646    hls path=c:\\temp\\test.m3u8 segment_size=\"3\" size=640x480 flag
647}
648
649workflow name {
650    another a
651}
652";
653        match parse(content) {
654            Err(ConfigParseError::DuplicateWorkflowName { name }) => {
655                if name != "name".to_string() {
656                    panic!("Unexpected name in workflow: '{}'", name);
657                }
658            }
659            Err(e) => panic!(
660                "Expected duplicate workflow name error, instead got: {:?}",
661                e
662            ),
663            Ok(_) => panic!("Received successful parse, but an error was expected"),
664        }
665    }
666
667    #[test]
668    fn full_config_can_be_parsed() {
669        let content = "
670# comment
671settings {
672    first a # another comment
673    second \"C:\\program files\\ffmpeg\\bin\\ffmpeg.exe\"
674    flag
675
676}
677
678workflow name { #workflow comment
679    rtmp_receive port=1935 app=receive stream_key=* #step comment
680    hls path=c:\\temp\\test.m3u8 segment_size=\"3\" size=640x480 flag
681}
682
683workflow name2 {
684    another a
685}
686";
687        parse(content).unwrap();
688    }
689
690    #[test]
691    fn can_parse_routed_by_reactor_argument_on_workflow() {
692        let content = "
693workflow name routed_by_reactor {
694    rtmp_receive port=1935 app=receive stream_key=*
695}
696";
697
698        let config = parse(content).unwrap();
699        let workflow = config.workflows.get("name").unwrap();
700        assert!(
701            workflow.routed_by_reactor,
702            "Expected routed by workflow to be true"
703        );
704    }
705
706    #[test]
707    fn comments_can_have_greater_than_or_less_than_signs() {
708        let content = "
709settings {
710  # <test>
711}
712";
713        parse(content).unwrap();
714    }
715
716    #[test]
717    fn comments_can_have_back_ticks() {
718        let content = "\
719settings {
720    # `test `
721}
722";
723
724        parse(content).unwrap();
725    }
726}