1use crate::reactors::ReactorDefinition;
2use crate::workflows::definitions::{WorkflowDefinition, WorkflowStepDefinition, WorkflowStepType};
3use pest::iterators::{Pair, Pairs};
4use pest::Parser;
5use std::collections::HashMap;
6use std::time::Duration;
7use thiserror::Error;
8use tracing::warn;
9
/// The parsed contents of a configuration file, as produced by [`parse`].
pub struct MmidsConfig {
    /// Entries of the `settings` block, keyed by setting name.  The value is `None` when the
    /// setting was given without an argument.
    pub settings: HashMap<String, Option<String>>,

    /// Every `reactor` block that was defined, keyed by the reactor's name.
    pub reactors: HashMap<String, ReactorDefinition>,

    /// Every `workflow` block that was defined, keyed by the workflow's name.
    pub workflows: HashMap<String, WorkflowDefinition>,
}
16
17#[derive(Error, Debug)]
19pub enum ConfigParseError {
20 #[error("The config provided could not be parsed")]
21 InvalidConfig(#[from] pest::error::Error<Rule>),
22
23 #[error("Found unexpected rule '{rule:?}' in the {section} section")]
24 UnexpectedRule { rule: Rule, section: String },
25
26 #[error("Duplicate workflow name: '{name}'")]
27 DuplicateWorkflowName { name: String },
28
29 #[error("Invalid node name '{name}' on line {line}")]
30 InvalidNodeName { name: String, line: usize },
31
32 #[error("Arguments are not allowed on a settings node, but some were found on line {line}")]
33 ArgumentsSpecifiedOnSettingNode { line: usize },
34
35 #[error("More than 1 argument was provided for the setting on line {line}")]
36 TooManySettingArguments { line: usize },
37
38 #[error("The argument provided for the setting on line {line} is invalid. Equal signs are not allowed")]
39 InvalidSettingArgumentFormat { line: usize },
40
41 #[error(
42 "The `routed_by_reactor` argument on line {line} is invalid. Equal signs are not allowed"
43 )]
44 InvalidRoutedByReactorArgument { line: usize },
45
46 #[error("The workflow on line {line} did not have a name specified")]
47 NoNameOnWorkflow { line: usize },
48
49 #[error("Invalid workflow name of {name} on line {line}")]
50 InvalidWorkflowName { line: usize, name: String },
51
52 #[error("The reactor on line {line} did not have a name specified")]
53 NoNameOnReactor { line: usize },
54
55 #[error("Invalid workflow name of '{name}' on line {line}")]
56 InvalidReactorName { line: usize, name: String },
57
58 #[error("The reactor on line {line} has an invalid update_interval value of '{argument}'. This value must be a number")]
59 InvalidUpdateIntervalValue { line: usize, argument: String },
60
61 #[error(
62 "The reactor parameter's value on line {line} is invalid. Equal signs are not allowed"
63 )]
64 InvalidReactorParameterValueFormat { line: usize },
65
66 #[error("The reactor parameter on line {line} had multiple values. Only 1 is allowed")]
67 TooManyReactorParameterValues { line: usize },
68
69 #[error("Multiple reactors have the name of '{name}'. Each reactor must have a unique name")]
70 DuplicateReactorName { name: String },
71
72 #[error("The executor on line {line} did not have an executor specified")]
73 NoExecutorForReactor { line: usize },
74}
75
/// Parser generated by pest from the `config.pest` grammar file.  Deriving `Parser` also
/// generates the `Rule` enum that the rest of this file matches on.
#[derive(Parser)]
#[grammar = "config.pest"]
struct RawConfigParser;
79
/// A single entry found inside a block: its name plus the arguments that followed it.
struct ChildNode {
    /// Text of the entry's first token.
    name: String,

    /// Arguments keyed by argument text.  Flags and quoted strings are stored with a `None`
    /// value, while `key=value` pairs are stored as `key -> Some(value)`.
    arguments: HashMap<String, Option<String>>,
}
84
85pub fn parse(content: &str) -> Result<MmidsConfig, ConfigParseError> {
87 let mut config = MmidsConfig {
88 settings: HashMap::new(),
89 reactors: HashMap::new(),
90 workflows: HashMap::new(),
91 };
92
93 let pairs = RawConfigParser::parse(Rule::content, content)?;
94 for pair in pairs {
95 let rule = pair.as_rule();
96 match &rule {
97 Rule::node_block => handle_node_block(&mut config, pair)?,
98 Rule::EOI => (),
99 x => {
100 return Err(ConfigParseError::UnexpectedRule {
101 rule: x.clone(),
102 section: "root".to_string(),
103 })
104 }
105 }
106 }
107
108 Ok(config)
109}
110
111fn handle_node_block(config: &mut MmidsConfig, pair: Pair<Rule>) -> Result<(), ConfigParseError> {
112 let mut rules = pair.into_inner();
113 let name_node = rules.next().unwrap(); let name = name_node.as_str().trim();
115
116 match name.to_lowercase().as_str() {
117 "settings" => read_settings(config, rules)?,
118 "workflow" => read_workflow(config, rules, name_node.as_span().start_pos().line_col().0)?,
119 "reactor" => read_reactor(config, rules, name_node.as_span().start_pos().line_col().0)?,
120 _ => {
121 return Err(ConfigParseError::InvalidNodeName {
122 name: name.to_string(),
123 line: name_node.as_span().start_pos().line_col().0,
124 })
125 }
126 }
127
128 Ok(())
129}
130
131fn read_settings(config: &mut MmidsConfig, pairs: Pairs<Rule>) -> Result<(), ConfigParseError> {
132 for pair in pairs {
133 match pair.as_rule() {
134 Rule::child_node => {
135 let child_node = read_child_node(pair.clone())?;
136 if child_node.arguments.len() > 1 {
137 return Err(ConfigParseError::TooManySettingArguments {
138 line: get_line_number(&pair),
139 });
140 }
141
142 if let Some(key) = child_node.arguments.keys().nth(0) {
143 if let Some(Some(_value)) = child_node.arguments.get(key) {
144 return Err(ConfigParseError::InvalidSettingArgumentFormat {
145 line: get_line_number(&pair),
146 });
147 }
148
149 config.settings.insert(child_node.name, Some(key.clone()));
150 } else {
151 config.settings.insert(child_node.name, None);
152 }
153 }
154
155 Rule::argument => {
156 return Err(ConfigParseError::ArgumentsSpecifiedOnSettingNode {
157 line: get_line_number(&pair),
158 })
159 }
160
161 rule => {
162 return Err(ConfigParseError::UnexpectedRule {
163 rule,
164 section: "settings".to_string(),
165 })
166 }
167 }
168 }
169
170 Ok(())
171}
172
173fn read_workflow(
174 config: &mut MmidsConfig,
175 pairs: Pairs<Rule>,
176 starting_line: usize,
177) -> Result<(), ConfigParseError> {
178 let mut steps = Vec::new();
179 let mut workflow_name = None;
180 let mut routed_by_reactor = false;
181 for pair in pairs {
182 match pair.as_rule() {
183 Rule::child_node => {
184 let child_node = read_child_node(pair)?;
185 steps.push(WorkflowStepDefinition {
186 step_type: WorkflowStepType(child_node.name),
187 parameters: child_node.arguments,
188 });
189 }
190
191 Rule::argument => {
192 let (key, value) = read_argument(pair.clone())?;
193 if workflow_name.is_some() {
194 if &key == "routed_by_reactor" {
195 if value.is_some() {
196 return Err(ConfigParseError::InvalidRoutedByReactorArgument {
197 line: get_line_number(&pair),
198 });
199 }
200
201 routed_by_reactor = true;
202 } else {
203 let line = get_line_number(&pair);
204 warn!(
205 workflow_name = %workflow_name.as_ref().unwrap(),
206 line = %line,
207 argument = %key,
208 "Unknown argument '{}' for workflow {} on line {}",
209 key, workflow_name.as_ref().unwrap(), line,
210 );
211 }
212 } else {
213 if value.is_some() {
214 return Err(ConfigParseError::InvalidWorkflowName {
215 name: pair.as_str().to_string(),
216 line: get_line_number(&pair),
217 });
218 }
219
220 workflow_name = Some(key);
221 }
222 }
223
224 rule => {
225 return Err(ConfigParseError::UnexpectedRule {
226 rule,
227 section: "workflow".to_string(),
228 })
229 }
230 }
231 }
232
233 if let Some(name) = workflow_name {
234 if config.workflows.contains_key(&name) {
235 return Err(ConfigParseError::DuplicateWorkflowName { name });
236 }
237
238 config.workflows.insert(
239 name.to_string(),
240 WorkflowDefinition {
241 name,
242 steps,
243 routed_by_reactor,
244 },
245 );
246 } else {
247 return Err(ConfigParseError::NoNameOnWorkflow {
248 line: starting_line,
249 });
250 }
251
252 Ok(())
253}
254
255fn read_reactor(
256 config: &mut MmidsConfig,
257 pairs: Pairs<Rule>,
258 starting_line: usize,
259) -> Result<(), ConfigParseError> {
260 let mut name = None;
261 let mut parameters = HashMap::new();
262 let mut executor_name = None;
263 let mut update_interval = 0;
264
265 for pair in pairs {
266 match pair.as_rule() {
267 Rule::argument => {
268 let (key, value) = read_argument(pair.clone())?;
269 if name.is_none() {
270 if value.is_some() {
272 return Err(ConfigParseError::InvalidReactorName {
273 line: get_line_number(&pair),
274 name: pair.as_str().to_string(),
275 });
276 }
277
278 name = Some(key);
279 } else {
280 if key == "executor" {
281 if let Some(value) = value {
282 executor_name = Some(value);
283 }
284 } else if key == "update_interval" {
285 if let Some(value) = value {
286 if let Ok(num) = value.parse() {
287 update_interval = num;
288 } else {
289 return Err(ConfigParseError::InvalidUpdateIntervalValue {
290 line: get_line_number(&pair),
291 argument: value,
292 });
293 }
294 } else {
295 return Err(ConfigParseError::InvalidUpdateIntervalValue {
296 line: get_line_number(&pair),
297 argument: "".to_string(),
298 });
299 }
300 } else {
301 let line = get_line_number(&pair);
302 warn!(
303 line = %line,
304 argument = %key,
305 reactor_name = %name.as_ref().unwrap(),
306 "Unknown argument '{}' for reactor {} on line {}",
307 key, name.as_ref().unwrap(), line,
308 );
309 }
310 }
311 }
312
313 Rule::child_node => {
314 let line_number = pair.as_span().start_pos().line_col().0;
315 let child_node = read_child_node(pair)?;
316 if child_node.arguments.len() > 1 {
317 return Err(ConfigParseError::TooManyReactorParameterValues {
318 line: line_number,
319 });
320 }
321
322 if let Some(key) = child_node.arguments.keys().nth(0) {
323 if let Some(Some(_)) = child_node.arguments.get(key) {
324 return Err(ConfigParseError::InvalidReactorParameterValueFormat {
325 line: line_number,
326 });
327 }
328
329 parameters.insert(child_node.name, Some(key.clone()));
330 } else {
331 parameters.insert(child_node.name, None);
332 }
333 }
334
335 rule => {
336 return Err(ConfigParseError::UnexpectedRule {
337 rule,
338 section: "settings".to_string(),
339 })
340 }
341 }
342 }
343
344 if let Some(name) = name {
345 if config.reactors.contains_key(&name) {
346 return Err(ConfigParseError::DuplicateReactorName { name });
347 }
348
349 if let Some(executor) = executor_name {
350 config.reactors.insert(
351 name.to_string(),
352 ReactorDefinition {
353 name,
354 parameters,
355 executor,
356 update_interval: Duration::from_secs(update_interval),
357 },
358 );
359 } else {
360 return Err(ConfigParseError::NoExecutorForReactor {
361 line: starting_line,
362 });
363 }
364 } else {
365 return Err(ConfigParseError::NoNameOnReactor {
366 line: starting_line,
367 });
368 }
369
370 Ok(())
371}
372
373fn read_argument(pair: Pair<Rule>) -> Result<(String, Option<String>), ConfigParseError> {
374 let result;
375 let argument = pair.into_inner().nth(0).unwrap();
377 match argument.as_rule() {
378 Rule::argument_flag => {
379 result = (argument.as_str().to_string(), None);
380 }
381
382 Rule::quoted_string_value => {
383 result = (argument.as_str().to_string(), None);
384 }
385
386 Rule::key_value_pair => {
387 let mut key = "".to_string();
388 let mut value = "".to_string();
389 for inner in argument.into_inner() {
390 match inner.as_rule() {
391 Rule::key => key = inner.as_str().to_string(),
392 Rule::value => {
393 value = inner
396 .clone()
397 .into_inner()
398 .filter(|p| p.as_rule() == Rule::quoted_string_value)
399 .map(|p| p.as_str().to_string())
400 .nth(0)
401 .unwrap_or(inner.as_str().to_string());
402 }
403
404 rule => {
405 return Err(ConfigParseError::UnexpectedRule {
406 rule,
407 section: "argument".to_string(),
408 })
409 }
410 }
411 }
412
413 result = (key, Some(value));
414 }
415
416 _ => {
417 return Err(ConfigParseError::UnexpectedRule {
418 rule: argument.as_rule(),
419 section: "child_node argument".to_string(),
420 })
421 }
422 }
423
424 Ok(result)
425}
426
427fn read_child_node(child_node: Pair<Rule>) -> Result<ChildNode, ConfigParseError> {
428 let mut pairs = child_node.into_inner();
429 let name_node = pairs.next().unwrap(); let mut parsed_node = ChildNode {
431 name: name_node.as_str().to_string(),
432 arguments: HashMap::new(),
433 };
434
435 for pair in pairs {
436 match pair.as_rule() {
437 Rule::argument => {
438 let (key, value) = read_argument(pair)?;
439 parsed_node.arguments.insert(key, value);
440 }
441
442 rule => {
443 return Err(ConfigParseError::UnexpectedRule {
444 rule,
445 section: "child_node".to_string(),
446 })
447 }
448 }
449 }
450
451 Ok(parsed_node)
452}
453
454fn get_line_number(node: &Pair<Rule>) -> usize {
455 node.as_span().start_pos().line_col().0
456}
457
458#[cfg(test)]
459mod tests {
460 use super::*;
461
462 #[test]
463 fn can_parse_settings() {
464 let content = "
465settings {
466 first a
467 second \"C:\\program files\\ffmpeg\\bin\\ffmpeg.exe\"
468 flag
469
470}
471";
472
473 let config = parse(content).unwrap();
474 assert_eq!(config.settings.len(), 3, "Unexpected number of settings");
475 assert_eq!(
476 config.settings.get("first"),
477 Some(&Some("a".to_string())),
478 "Unexpected first value"
479 );
480 assert_eq!(
481 config.settings.get("second"),
482 Some(&Some(
483 "C:\\program files\\ffmpeg\\bin\\ffmpeg.exe".to_string()
484 )),
485 "Unexpected second value"
486 );
487 assert_eq!(
488 config.settings.get("flag"),
489 Some(&None),
490 "Unexpected flag value"
491 );
492 }
493
494 #[test]
495 fn can_read_single_workflow() {
496 let content = "
497workflow name {
498 rtmp_receive port=1935 app=receive stream_key=*
499 hls path=c:\\temp\\test.m3u8 segment_size=\"3\" size=640x480 flag
500}
501";
502 let config = parse(content).unwrap();
503 assert_eq!(config.workflows.len(), 1, "Unexpected number of workflows");
504 assert!(
505 config.workflows.contains_key("name"),
506 "workflow 'name' did not exist"
507 );
508
509 let workflow = config.workflows.get("name").unwrap();
510 assert_eq!(
511 workflow.name,
512 "name".to_string(),
513 "Unexpected workflow name"
514 );
515 assert_eq!(
516 workflow.steps.len(),
517 2,
518 "Unexpected number of workflow steps"
519 );
520 assert!(
521 !workflow.routed_by_reactor,
522 "Expected routed by reactor to be false"
523 );
524
525 let step1 = workflow.steps.get(0).unwrap();
526 assert_eq!(
527 step1.step_type.0,
528 "rtmp_receive".to_string(),
529 "Unexpected type of step 1"
530 );
531 assert_eq!(step1.parameters.len(), 3, "Unexpected number of parameters");
532 assert_eq!(
533 step1.parameters.get("port"),
534 Some(&Some("1935".to_string())),
535 "Unexpected step 1 port value"
536 );
537 assert_eq!(
538 step1.parameters.get("app"),
539 Some(&Some("receive".to_string())),
540 "Unexpected step 1 app value"
541 );
542 assert_eq!(
543 step1.parameters.get("stream_key"),
544 Some(&Some("*".to_string())),
545 "Unexpected step 1 stream_key value"
546 );
547
548 let step2 = workflow.steps.get(1).unwrap();
549 assert_eq!(
550 step2.step_type.0,
551 "hls".to_string(),
552 "Unexpected type of step 1"
553 );
554 assert_eq!(step2.parameters.len(), 4, "Unexpected number of parameters");
555 assert_eq!(
556 step2.parameters.get("path"),
557 Some(&Some("c:\\temp\\test.m3u8".to_string())),
558 "Unexpected step 2 path value"
559 );
560 assert_eq!(
561 step2.parameters.get("segment_size"),
562 Some(&Some("3".to_string())),
563 "Unexpected step 2 segment_size value"
564 );
565 assert_eq!(
566 step2.parameters.get("size"),
567 Some(&Some("640x480".to_string())),
568 "Unexpected step 2 size value"
569 );
570 assert_eq!(
571 step2.parameters.get("flag"),
572 Some(&None),
573 "Unexpected step 2 flag value"
574 );
575 }
576
577 #[test]
578 fn can_read_multiple_workflows() {
579 let content = "
580workflow name {
581 rtmp_receive port=1935 app=receive stream_key=*
582 hls path=c:\\temp\\test.m3u8 segment_size=\"3\" size=640x480 flag
583}
584
585workflow name2 {
586 another a
587}
588";
589 let config = parse(content).unwrap();
590
591 assert_eq!(config.workflows.len(), 2, "Unexpected number of workflows");
592 assert!(
593 config.workflows.contains_key("name"),
594 "Could not find a workflow named 'name'"
595 );
596 assert!(
597 config.workflows.contains_key("name2"),
598 "Could not find a workflow named 'name2'"
599 );
600 }
601
602 #[test]
603 fn can_read_single_reactor() {
604 let content = "
605reactor name executor=abc {
606 param1 value
607 param2 value2
608}
609";
610 let config = parse(content).unwrap();
611 assert_eq!(config.reactors.len(), 1, "Unexpected number of reactors");
612 assert!(
613 config.reactors.contains_key("name"),
614 "Reactor in config did not have the expected name"
615 );
616
617 let reactor = &config.reactors["name"];
618 assert_eq!(
619 reactor.name,
620 "name".to_string(),
621 "Unexpected name of reactor"
622 );
623 assert_eq!(reactor.executor, "abc".to_string(), "Unexpected executor");
624 assert_eq!(
625 reactor.parameters.len(),
626 2,
627 "Unexpected number of parameters"
628 );
629 assert_eq!(
630 reactor.parameters.get("param1"),
631 Some(&Some("value".to_string())),
632 "Unexpected param1 value"
633 );
634 assert_eq!(
635 reactor.parameters.get("param2"),
636 Some(&Some("value2".to_string())),
637 "Unexpected param2 value"
638 );
639 }
640
641 #[test]
642 fn duplicate_workflow_name_returns_error() {
643 let content = "
644workflow name {
645 rtmp_receive port=1935 app=receive stream_key=*
646 hls path=c:\\temp\\test.m3u8 segment_size=\"3\" size=640x480 flag
647}
648
649workflow name {
650 another a
651}
652";
653 match parse(content) {
654 Err(ConfigParseError::DuplicateWorkflowName { name }) => {
655 if name != "name".to_string() {
656 panic!("Unexpected name in workflow: '{}'", name);
657 }
658 }
659 Err(e) => panic!(
660 "Expected duplicate workflow name error, instead got: {:?}",
661 e
662 ),
663 Ok(_) => panic!("Received successful parse, but an error was expected"),
664 }
665 }
666
667 #[test]
668 fn full_config_can_be_parsed() {
669 let content = "
670# comment
671settings {
672 first a # another comment
673 second \"C:\\program files\\ffmpeg\\bin\\ffmpeg.exe\"
674 flag
675
676}
677
678workflow name { #workflow comment
679 rtmp_receive port=1935 app=receive stream_key=* #step comment
680 hls path=c:\\temp\\test.m3u8 segment_size=\"3\" size=640x480 flag
681}
682
683workflow name2 {
684 another a
685}
686";
687 parse(content).unwrap();
688 }
689
690 #[test]
691 fn can_parse_routed_by_reactor_argument_on_workflow() {
692 let content = "
693workflow name routed_by_reactor {
694 rtmp_receive port=1935 app=receive stream_key=*
695}
696";
697
698 let config = parse(content).unwrap();
699 let workflow = config.workflows.get("name").unwrap();
700 assert!(
701 workflow.routed_by_reactor,
702 "Expected routed by workflow to be true"
703 );
704 }
705
706 #[test]
707 fn comments_can_have_greater_than_or_less_than_signs() {
708 let content = "
709settings {
710 # <test>
711}
712";
713 parse(content).unwrap();
714 }
715
716 #[test]
717 fn comments_can_have_back_ticks() {
718 let content = "\
719settings {
720 # `test `
721}
722";
723
724 parse(content).unwrap();
725 }
726}