1use cu29_traits::{CuError, CuResult};
7use petgraph::adj::NodeIndex;
8use petgraph::stable_graph::{EdgeIndex, StableDiGraph};
9use petgraph::visit::EdgeRef;
10use ron::extensions::Extensions;
11use ron::value::Value as RonValue;
12use ron::Options;
13use serde::{Deserialize, Deserializer, Serialize, Serializer};
14use std::collections::HashMap;
15use std::fmt;
16use std::fmt::Display;
17use std::fs::read_to_string;
18
19pub type NodeId = u32;
22
23#[derive(Serialize, Deserialize, Debug, Clone, Default)]
27pub struct ComponentConfig(pub HashMap<String, Value>);
28
29impl Display for ComponentConfig {
30 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
31 let mut first = true;
32 let ComponentConfig(config) = self;
33 write!(f, "{{")?;
34 for (key, value) in config.iter() {
35 if !first {
36 write!(f, ", ")?;
37 }
38 write!(f, "{key}: {value}")?;
39 first = false;
40 }
41 write!(f, "}}")
42 }
43}
44
45impl ComponentConfig {
47 #[allow(dead_code)]
48 pub fn new() -> Self {
49 ComponentConfig(HashMap::new())
50 }
51
52 #[allow(dead_code)]
53 pub fn get<T: From<Value>>(&self, key: &str) -> Option<T> {
54 let ComponentConfig(config) = self;
55 config.get(key).map(|v| T::from(v.clone()))
56 }
57
58 #[allow(dead_code)]
59 pub fn set<T: Into<Value>>(&mut self, key: &str, value: T) {
60 let ComponentConfig(config) = self;
61 config.insert(key.to_string(), value.into());
62 }
63}
64
65#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
74pub struct Value(RonValue);
75
76impl From<i32> for Value {
77 fn from(value: i32) -> Self {
78 Value(RonValue::Number(value.into()))
79 }
80}
81
82impl From<u32> for Value {
83 fn from(value: u32) -> Self {
84 Value(RonValue::Number((value as u64).into()))
85 }
86}
87
88impl From<u16> for Value {
89 fn from(value: u16) -> Self {
90 Value(RonValue::Number((value as u64).into()))
91 }
92}
93
94impl From<u8> for Value {
95 fn from(value: u8) -> Self {
96 Value(RonValue::Number((value as u64).into()))
97 }
98}
99
100impl From<f64> for Value {
101 fn from(value: f64) -> Self {
102 Value(RonValue::Number(value.into()))
103 }
104}
105
106impl From<Value> for bool {
107 fn from(value: Value) -> Self {
108 if let Value(RonValue::Bool(v)) = value {
109 v
110 } else {
111 panic!("Expected a Boolean variant but got {value:?}")
112 }
113 }
114}
115
116impl From<Value> for u8 {
117 fn from(value: Value) -> Self {
118 if let Value(RonValue::Number(num)) = value {
119 if let Some(i) = num.as_i64() {
120 i as u8
121 } else {
122 panic!("Expected an integer value but got {value:?}")
123 }
124 } else {
125 panic!("Expected a Number variant but got {value:?}")
126 }
127 }
128}
129
130impl From<Value> for u32 {
131 fn from(value: Value) -> Self {
132 if let Value(RonValue::Number(num)) = value {
133 if let Some(i) = num.as_i64() {
134 i as u32
135 } else {
136 panic!("Expected an integer value but got {value:?}")
137 }
138 } else {
139 panic!("Expected a Number variant but got {value:?}")
140 }
141 }
142}
143
144impl From<Value> for i32 {
145 fn from(value: Value) -> Self {
146 if let Value(RonValue::Number(num)) = value {
147 if let Some(i) = num.as_i64() {
148 i as i32
149 } else {
150 panic!("Expected an integer value but got {value:?}")
151 }
152 } else {
153 panic!("Expected a Number variant but got {value:?}")
154 }
155 }
156}
157
158impl From<Value> for f64 {
159 fn from(value: Value) -> Self {
160 if let Value(RonValue::Number(num)) = value {
161 if let Some(f) = num.as_f64() {
162 f
163 } else {
164 panic!("Expected a float value but got {value:?}")
165 }
166 } else {
167 panic!("Expected a Number variant but got {value:?}")
168 }
169 }
170}
171
172impl From<String> for Value {
173 fn from(value: String) -> Self {
174 Value(RonValue::String(value))
175 }
176}
177
178impl From<Value> for String {
179 fn from(value: Value) -> Self {
180 if let Value(RonValue::String(s)) = value {
181 s
182 } else {
183 panic!("Expected a String variant")
184 }
185 }
186}
187
188impl Display for Value {
189 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
190 let Value(value) = self;
191 match value {
192 RonValue::Number(n) => write!(f, "{}", n.as_i64().unwrap()),
193 RonValue::String(s) => write!(f, "{s}"),
194 RonValue::Bool(b) => write!(f, "{b}"),
195 RonValue::Map(m) => write!(f, "{m:?}"),
196 RonValue::Char(c) => write!(f, "{c:?}"),
197 RonValue::Unit => write!(f, "unit"),
198 RonValue::Option(o) => write!(f, "{o:?}"),
199 RonValue::Seq(s) => write!(f, "{s:?}"),
200 }
201 }
202}
203
204#[derive(Serialize, Deserialize, Debug, Clone)]
207pub struct Node {
208 id: String,
209 #[serde(rename = "type", skip_serializing_if = "Option::is_none")]
210 type_: Option<String>,
211 #[serde(skip_serializing_if = "Option::is_none")]
212 config: Option<ComponentConfig>,
213}
214
215impl Node {
216 #[allow(dead_code)]
217 pub fn new(id: &str, ptype: &str) -> Self {
218 Node {
219 id: id.to_string(),
220 type_: Some(ptype.to_string()),
221 config: None,
223 }
224 }
225
226 #[allow(dead_code)]
227 pub fn get_id(&self) -> String {
228 self.id.clone()
229 }
230
231 #[allow(dead_code)]
232 pub fn set_type(mut self, name: Option<String>) -> Self {
233 self.type_ = name;
234 self
235 }
236
237 pub fn get_type(&self) -> &str {
238 self.type_.as_ref().unwrap()
239 }
240
241 #[allow(dead_code)]
242 pub fn get_instance_config(&self) -> Option<&ComponentConfig> {
243 self.config.as_ref()
244 }
245
246 #[allow(dead_code)]
247 pub fn get_param<T: From<Value>>(&self, key: &str) -> Option<T> {
248 let pc = self.config.as_ref()?;
249 let ComponentConfig(pc) = pc;
250 let v = pc.get(key)?;
251 Some(T::from(v.clone()))
252 }
253
254 #[allow(dead_code)]
255 pub fn set_param<T: Into<Value>>(&mut self, key: &str, value: T) {
256 if self.config.is_none() {
257 self.config = Some(ComponentConfig(HashMap::new()));
258 }
259 let ComponentConfig(config) = self.config.as_mut().unwrap();
260 config.insert(key.to_string(), value.into());
261 }
262}
263
264#[derive(Serialize, Deserialize, Debug, Clone)]
266pub struct Cnx {
267 src: String,
269
270 dst: String,
272
273 pub msg: String,
275
276 pub batch: Option<u32>,
280
281 pub store: Option<bool>,
283}
284
285#[derive(Debug, Clone)]
288pub struct CuConfig {
289 pub graph: StableDiGraph<Node, Cnx, NodeId>,
291 pub monitor: Option<MonitorConfig>,
292 pub logging: Option<LoggingConfig>,
293}
294
295#[derive(Serialize, Deserialize, Default, Debug, Clone)]
296pub struct MonitorConfig {
297 #[serde(rename = "type")]
298 type_: String,
299 #[serde(skip_serializing_if = "Option::is_none")]
300 config: Option<ComponentConfig>,
301}
302
303impl MonitorConfig {
304 #[allow(dead_code)]
305 pub fn get_type(&self) -> &str {
306 &self.type_
307 }
308
309 #[allow(dead_code)]
310 pub fn get_config(&self) -> Option<&ComponentConfig> {
311 self.config.as_ref()
312 }
313}
314
315fn default_as_true() -> bool {
316 true
317}
318
319#[derive(Serialize, Deserialize, Default, Debug, Clone)]
320pub struct LoggingConfig {
321 #[serde(skip_serializing_if = "Option::is_none")]
322 pub slab_size_mib: Option<u64>,
323 #[serde(skip_serializing_if = "Option::is_none")]
324 pub section_size_mib: Option<u64>,
325 #[serde(default = "default_as_true", skip_serializing_if = "Clone::clone")]
326 pub enable_task_logging: bool,
327}
328
329#[derive(Serialize, Deserialize, Default)]
331struct CuConfigRepresentation {
332 tasks: Vec<Node>,
333 cnx: Vec<Cnx>,
334 monitor: Option<MonitorConfig>,
335 logging: Option<LoggingConfig>,
336}
337
338impl<'de> Deserialize<'de> for CuConfig {
339 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
341 where
342 D: Deserializer<'de>,
343 {
344 let representation =
345 CuConfigRepresentation::deserialize(deserializer).map_err(serde::de::Error::custom)?;
346
347 let mut cuconfig = CuConfig::default();
348 for task in representation.tasks {
349 cuconfig.add_node(task);
350 }
351
352 for c in representation.cnx {
353 let src = cuconfig
354 .graph
355 .node_indices()
356 .find(|i| cuconfig.graph[*i].id == c.src)
357 .expect("Source node not found");
358 let dst = cuconfig
359 .graph
360 .node_indices()
361 .find(|i| cuconfig.graph[*i].id == c.dst)
362 .unwrap_or_else(|| panic!("Destination {} node not found", c.dst));
363 cuconfig.connect_ext(
364 src.index() as NodeId,
365 dst.index() as NodeId,
366 &c.msg,
367 c.batch,
368 c.store,
369 );
370 }
371 cuconfig.monitor = representation.monitor;
372 cuconfig.logging = representation.logging;
373 Ok(cuconfig)
374 }
375}
376
377impl Serialize for CuConfig {
378 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
380 where
381 S: Serializer,
382 {
383 let tasks: Vec<Node> = self
384 .graph
385 .node_indices()
386 .map(|idx| self.graph[idx].clone())
387 .collect();
388
389 let cnx: Vec<Cnx> = self
390 .graph
391 .edge_indices()
392 .map(|edge| self.graph[edge].clone())
393 .collect();
394
395 CuConfigRepresentation {
396 tasks,
397 cnx,
398 monitor: self.monitor.clone(),
399 logging: self.logging.clone(),
400 }
401 .serialize(serializer)
402 }
403}
404
405impl Default for CuConfig {
406 fn default() -> Self {
407 CuConfig {
408 graph: StableDiGraph::new(),
409 monitor: None,
410 logging: None,
411 }
412 }
413}
414
415impl CuConfig {
418 pub fn add_node(&mut self, node: Node) -> NodeId {
420 self.graph.add_node(node).index() as NodeId
421 }
422
423 #[allow(dead_code)] pub fn get_node(&self, node_id: NodeId) -> Option<&Node> {
426 self.graph.node_weight(node_id.into())
427 }
428
429 #[allow(dead_code)] pub fn get_node_mut(&mut self, node_id: NodeId) -> Option<&mut Node> {
432 self.graph.node_weight_mut(node_id.into())
433 }
434
435 #[allow(dead_code)] pub fn get_node_output_msg_type(&self, node_id: &str) -> Option<String> {
438 self.graph.node_indices().find_map(|node_index| {
439 if let Some(node) = self.get_node(node_index.index() as u32) {
440 if node.id != node_id {
441 return None;
442 }
443 let edges = self.get_src_edges(node_index.index() as u32);
444 if edges.is_empty() {
445 panic!("A CuSrcTask is configured with no task connected to it.")
446 }
447 let cnx = self
448 .graph
449 .edge_weight(EdgeIndex::new(edges[0]))
450 .expect("Found an cnx id but could not retrieve it back");
451 return Some(cnx.msg.clone());
452 }
453 None
454 })
455 }
456
457 #[allow(dead_code)] pub fn get_node_input_msg_type(&self, node_id: &str) -> Option<String> {
460 self.graph.node_indices().find_map(|node_index| {
461 if let Some(node) = self.get_node(node_index.index() as u32) {
462 if node.id != node_id {
463 return None;
464 }
465 let edges = self.get_dst_edges(node_index.index() as u32);
466 if edges.is_empty() {
467 panic!("A CuSinkTask is configured with no task connected to it.")
468 }
469 let cnx = self
470 .graph
471 .edge_weight(EdgeIndex::new(edges[0]))
472 .expect("Found an cnx id but could not retrieve it back");
473 return Some(cnx.msg.clone());
474 }
475 None
476 })
477 }
478
479 pub fn get_src_edges(&self, node_id: NodeId) -> Vec<usize> {
481 self.graph
482 .edges_directed(node_id.into(), petgraph::Direction::Outgoing)
483 .map(|edge| edge.id().index())
484 .collect()
485 }
486
487 pub fn get_dst_edges(&self, node_id: NodeId) -> Vec<usize> {
489 self.graph
490 .edges_directed(node_id.into(), petgraph::Direction::Incoming)
491 .map(|edge| edge.id().index())
492 .collect()
493 }
494
495 #[allow(dead_code)]
496 pub fn get_edge_weight(&self, index: usize) -> Option<Cnx> {
497 self.graph.edge_weight(EdgeIndex::new(index)).cloned()
498 }
499
500 pub fn get_all_nodes(&self) -> Vec<(NodeIndex, &Node)> {
502 self.graph
503 .node_indices()
504 .map(|index| (index.index() as u32, &self.graph[index]))
505 .collect()
506 }
507
508 pub fn connect_ext(
513 &mut self,
514 source: NodeId,
515 target: NodeId,
516 msg_type: &str,
517 batch: Option<u32>,
518 store: Option<bool>,
519 ) {
520 self.graph.add_edge(
521 source.into(),
522 target.into(),
523 Cnx {
524 src: self
525 .get_node(source)
526 .expect("Source node not found")
527 .id
528 .clone(),
529 dst: self
530 .get_node(target)
531 .expect("Target node not found")
532 .id
533 .clone(),
534 msg: msg_type.to_string(),
535 batch,
536 store,
537 },
538 );
539 }
540
541 #[allow(dead_code)]
544 pub fn connect(&mut self, source: NodeId, target: NodeId, msg_type: &str) {
545 self.connect_ext(source, target, msg_type, None, None);
546 }
547
548 fn get_options() -> Options {
549 Options::default()
550 .with_default_extension(Extensions::IMPLICIT_SOME)
551 .with_default_extension(Extensions::UNWRAP_NEWTYPES)
552 .with_default_extension(Extensions::UNWRAP_VARIANT_NEWTYPES)
553 }
554
555 #[allow(dead_code)]
556 pub fn serialize_ron(&self) -> String {
557 let ron = Self::get_options();
558 let pretty = ron::ser::PrettyConfig::default();
559 ron.to_string_pretty(&self, pretty).unwrap()
560 }
561
562 pub fn deserialize_ron(ron: &str) -> Self {
563 match Self::get_options().from_str(ron) {
564 Ok(ron) => ron,
565 Err(e) => panic!(
566 "Syntax Error in config: {} at position {}",
567 e.code, e.position
568 ),
569 }
570 }
571
572 pub fn render(&self, output: &mut dyn std::io::Write) {
574 writeln!(output, "digraph G {{").unwrap();
575
576 for index in self.graph.node_indices() {
577 let node = &self.graph[index];
578 let config_str = match &node.config {
579 Some(config) => {
580 let config_str = config
581 .0
582 .iter()
583 .map(|(k, v)| format!("<B>{k}</B> = {v}<BR ALIGN=\"LEFT\"/>"))
584 .collect::<Vec<String>>()
585 .join("\n");
586 format!("<BR/>____________<BR ALIGN=\"LEFT\"/>{config_str}")
587 }
588 None => String::new(),
589 };
590 writeln!(output, "{} [", index.index()).unwrap();
591 writeln!(output, "shape=box,").unwrap();
592 writeln!(output, "style=\"rounded, filled\",").unwrap();
593 writeln!(output, "fontname=\"Noto Sans\"").unwrap();
594
595 let is_src = self.get_dst_edges(index.index() as NodeId).is_empty();
596 let is_sink = self.get_src_edges(index.index() as NodeId).is_empty();
597 if is_src {
598 writeln!(output, "fillcolor=lightgreen,").unwrap();
599 } else if is_sink {
600 writeln!(output, "fillcolor=lightblue,").unwrap();
601 } else {
602 writeln!(output, "fillcolor=lightgrey,").unwrap();
603 }
604 writeln!(output, "color=grey,").unwrap();
605
606 writeln!(output, "labeljust=l,").unwrap();
607 writeln!(
608 output,
609 "label=< <FONT COLOR=\"red\"><B>{}</B></FONT><BR ALIGN=\"LEFT\"/><BR ALIGN=\"RIGHT\"/><FONT COLOR=\"dimgray\">{}</FONT><BR ALIGN=\"LEFT\"/>{} >",
610 node.id,
611 node.get_type(),
612 config_str
613 )
614 .unwrap();
615
616 writeln!(output, "];").unwrap();
617 }
618 for edge in self.graph.edge_indices() {
619 let (src, dst) = self.graph.edge_endpoints(edge).unwrap();
620
621 let cnx = &self.graph[edge];
622 writeln!(
623 output,
624 "{} -> {} [label=< <B><FONT COLOR=\"gray\">{}/{}/{}</FONT></B> >];",
625 src.index(),
626 dst.index(),
627 cnx.msg,
628 cnx.batch.unwrap_or(1),
629 cnx.store.unwrap_or(false)
630 )
631 .unwrap();
632 }
633 writeln!(output, "}}").unwrap();
634 }
635
636 #[allow(dead_code)]
637 pub fn get_all_instances_configs(&self) -> Vec<Option<&ComponentConfig>> {
638 self.get_all_nodes()
639 .iter()
640 .map(|(_, node)| node.get_instance_config())
641 .collect()
642 }
643
644 #[allow(dead_code)]
645 pub fn get_monitor_config(&self) -> Option<&MonitorConfig> {
646 self.monitor.as_ref()
647 }
648
649 pub fn validate_logging_config(&self) -> CuResult<()> {
652 if let Some(logging) = &self.logging {
653 return logging.validate();
654 }
655 Ok(())
656 }
657}
658
659impl LoggingConfig {
660 pub fn validate(&self) -> CuResult<()> {
662 if let Some(section_size_mib) = self.section_size_mib {
663 if let Some(slab_size_mib) = self.slab_size_mib {
664 if section_size_mib > slab_size_mib {
665 return Err(CuError::from(format!("Section size ({} MiB) cannot be larger than slab size ({} MiB). Adjust the parameters accordingly.", section_size_mib, slab_size_mib)));
666 }
667 }
668 }
669
670 Ok(())
671 }
672}
673
674pub fn read_configuration(config_filename: &str) -> CuResult<CuConfig> {
676 let config_content = read_to_string(config_filename).map_err(|e| {
677 CuError::from(format!(
678 "Failed to read configuration file: {:?}",
679 &config_filename
680 ))
681 .add_cause(e.to_string().as_str())
682 })?;
683 read_configuration_str(config_content)
684}
685
686pub fn read_configuration_str(config_content: String) -> CuResult<CuConfig> {
688 let cuconfig = CuConfig::deserialize_ron(&config_content);
689 cuconfig.validate_logging_config()?;
690
691 Ok(cuconfig)
692}
693
694#[cfg(test)]
696mod tests {
697 use super::*;
698
699 #[test]
700 fn test_plain_serialize() {
701 let mut config = CuConfig::default();
702 let n1 = config.add_node(Node::new("test1", "package::Plugin1"));
703 let n2 = config.add_node(Node::new("test2", "package::Plugin2"));
704 config.connect(n1, n2, "msgpkg::MsgType");
705 let serialized = config.serialize_ron();
706 let deserialized = CuConfig::deserialize_ron(&serialized);
707 assert_eq!(config.graph.node_count(), deserialized.graph.node_count());
708 assert_eq!(config.graph.edge_count(), deserialized.graph.edge_count());
709 }
710
711 #[test]
712 fn test_serialize_with_params() {
713 let mut config = CuConfig::default();
714 let mut camera = Node::new("copper-camera", "camerapkg::Camera");
715 camera.set_param::<Value>("resolution-height", 1080.into());
716 config.add_node(camera);
717 let serialized = config.serialize_ron();
718 let deserialized = CuConfig::deserialize_ron(&serialized);
719 assert_eq!(
720 deserialized
721 .get_node(0)
722 .unwrap()
723 .get_param::<i32>("resolution-height")
724 .unwrap(),
725 1080
726 );
727 }
728
729 #[test]
730 #[should_panic(expected = "Syntax Error in config: Expected opening `[` at position 1:10")]
731 fn test_deserialization_error() {
732 let txt = r#"( tasks: (), cnx: [], monitor: (type: "ExampleMonitor", ) ) "#;
734 CuConfig::deserialize_ron(txt);
735 }
736
737 #[test]
738 fn test_monitor() {
739 let txt = r#"( tasks: [], cnx: [], monitor: (type: "ExampleMonitor", ) ) "#;
740 let config = CuConfig::deserialize_ron(txt);
741 assert_eq!(config.monitor.as_ref().unwrap().type_, "ExampleMonitor");
742
743 let txt =
744 r#"( tasks: [], cnx: [], monitor: (type: "ExampleMonitor", config: { "toto": 4, } )) "#;
745 let config = CuConfig::deserialize_ron(txt);
746 assert_eq!(
747 config.monitor.as_ref().unwrap().config.as_ref().unwrap().0["toto"],
748 4.into()
749 );
750 }
751
752 #[test]
753 fn test_logging_parameters() {
754 let txt = r#"( tasks: [], cnx: [], logging: ( slab_size_mib: 1024, section_size_mib: 100, enable_task_logging: false ),) "#;
756
757 let config = CuConfig::deserialize_ron(txt);
758 assert!(config.logging.is_some());
759 let logging_config = config.logging.unwrap();
760 assert_eq!(logging_config.slab_size_mib.unwrap(), 1024);
761 assert_eq!(logging_config.section_size_mib.unwrap(), 100);
762 assert!(!logging_config.enable_task_logging);
763
764 let txt =
766 r#"( tasks: [], cnx: [], logging: ( slab_size_mib: 1024, section_size_mib: 100, ),) "#;
767 let config = CuConfig::deserialize_ron(txt);
768 assert!(config.logging.is_some());
769 let logging_config = config.logging.unwrap();
770 assert_eq!(logging_config.slab_size_mib.unwrap(), 1024);
771 assert_eq!(logging_config.section_size_mib.unwrap(), 100);
772 assert!(logging_config.enable_task_logging);
773 }
774
775 #[test]
776 fn test_validate_logging_config() {
777 let txt =
779 r#"( tasks: [], cnx: [], logging: ( slab_size_mib: 1024, section_size_mib: 100 ) )"#;
780 let config = CuConfig::deserialize_ron(txt);
781 assert!(config.validate_logging_config().is_ok());
782
783 let txt =
785 r#"( tasks: [], cnx: [], logging: ( slab_size_mib: 100, section_size_mib: 1024 ) )"#;
786 let config = CuConfig::deserialize_ron(txt);
787 assert!(config.validate_logging_config().is_err());
788 }
789
790 #[test]
792 fn test_deserialization_edge_id_assignment() {
793 let txt = r#"(
796 tasks: [(id: "src1", type: "a"), (id: "src2", type: "b"), (id: "sink", type: "c")],
797 cnx: [(src: "src2", dst: "sink", msg: "msg1"), (src: "src1", dst: "sink", msg: "msg2")]
798 )"#;
799 let config = CuConfig::deserialize_ron(txt);
800 assert!(config.validate_logging_config().is_ok());
801
802 let src1_id = 0;
804 assert_eq!(config.get_node(src1_id).unwrap().id, "src1");
805 let src2_id = 1;
806 assert_eq!(config.get_node(src2_id).unwrap().id, "src2");
807
808 let src1_edge_id = *config.get_src_edges(src1_id).first().unwrap();
811 assert_eq!(src1_edge_id, 1);
812 let src2_edge_id = *config.get_src_edges(src2_id).first().unwrap();
813 assert_eq!(src2_edge_id, 0);
814 }
815}