1use cu29_traits::{CuError, CuResult};
7use petgraph::adj::NodeIndex;
8use petgraph::stable_graph::{EdgeIndex, StableDiGraph};
9use petgraph::visit::EdgeRef;
10use ron::extensions::Extensions;
11use ron::value::Value as RonValue;
12use ron::Options;
13use serde::{Deserialize, Deserializer, Serialize, Serializer};
14use std::collections::HashMap;
15use std::fmt;
16use std::fmt::Display;
17use std::fs::read_to_string;
18
19pub type NodeId = u32;
22
23#[derive(Serialize, Deserialize, Debug, Clone, Default)]
27pub struct ComponentConfig(pub HashMap<String, Value>);
28
29impl Display for ComponentConfig {
30 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
31 let mut first = true;
32 write!(f, "{{")?;
33 for (key, value) in self.0.iter() {
34 if !first {
35 write!(f, ", ")?;
36 }
37 write!(f, "{key}: {value}")?;
38 first = false;
39 }
40 write!(f, "}}")
41 }
42}
43
44impl ComponentConfig {
46 #[allow(dead_code)]
47 pub fn new() -> Self {
48 ComponentConfig(HashMap::new())
49 }
50
51 #[allow(dead_code)]
52 pub fn get<T: From<Value>>(&self, key: &str) -> Option<T> {
53 self.0.get(key).map(|v| T::from(v.clone()))
54 }
55
56 #[allow(dead_code)]
57 pub fn set<T: Into<Value>>(&mut self, key: &str, value: T) {
58 self.0.insert(key.to_string(), value.into());
59 }
60}
61
62#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
71pub struct Value(RonValue);
72
73impl From<i32> for Value {
74 fn from(value: i32) -> Self {
75 Value(RonValue::Number(value.into()))
76 }
77}
78
79impl From<u32> for Value {
80 fn from(value: u32) -> Self {
81 Value(RonValue::Number((value as u64).into()))
82 }
83}
84
85impl From<u16> for Value {
86 fn from(value: u16) -> Self {
87 Value(RonValue::Number((value as u64).into()))
88 }
89}
90
91impl From<u8> for Value {
92 fn from(value: u8) -> Self {
93 Value(RonValue::Number((value as u64).into()))
94 }
95}
96
97impl From<f64> for Value {
98 fn from(value: f64) -> Self {
99 Value(RonValue::Number(value.into()))
100 }
101}
102
103impl From<Value> for bool {
104 fn from(value: Value) -> Self {
105 if let RonValue::Bool(v) = value.0 {
106 v
107 } else {
108 panic!("Expected a Boolean variant but got {value:?}")
109 }
110 }
111}
112
113impl From<Value> for u8 {
114 fn from(value: Value) -> Self {
115 if let RonValue::Number(num) = value.0 {
116 if let Some(i) = num.as_i64() {
117 i as u8
118 } else {
119 panic!("Expected an integer value but got {value:?}")
120 }
121 } else {
122 panic!("Expected a Number variant but got {value:?}")
123 }
124 }
125}
126
127impl From<Value> for u32 {
128 fn from(value: Value) -> Self {
129 if let RonValue::Number(num) = value.0 {
130 if let Some(i) = num.as_i64() {
131 i as u32
132 } else {
133 panic!("Expected an integer value but got {value:?}")
134 }
135 } else {
136 panic!("Expected a Number variant but got {value:?}")
137 }
138 }
139}
140
141impl From<Value> for i32 {
142 fn from(value: Value) -> Self {
143 if let RonValue::Number(num) = value.0 {
144 if let Some(i) = num.as_i64() {
145 i as i32
146 } else {
147 panic!("Expected an integer value but got {value:?}")
148 }
149 } else {
150 panic!("Expected a Number variant but got {value:?}")
151 }
152 }
153}
154
155impl From<Value> for f64 {
156 fn from(value: Value) -> Self {
157 if let RonValue::Number(num) = value.0 {
158 if let Some(f) = num.as_f64() {
159 f
160 } else {
161 panic!("Expected a float value but got {value:?}")
162 }
163 } else {
164 panic!("Expected a Number variant but got {value:?}")
165 }
166 }
167}
168
169impl From<String> for Value {
170 fn from(value: String) -> Self {
171 Value(RonValue::String(value))
172 }
173}
174
175impl From<Value> for String {
176 fn from(value: Value) -> Self {
177 if let RonValue::String(s) = value.0 {
178 s
179 } else {
180 panic!("Expected a String variant")
181 }
182 }
183}
184
185impl Display for Value {
186 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
187 match &self.0 {
188 RonValue::Number(n) => write!(f, "{}", n.as_i64().unwrap()),
189 RonValue::String(s) => write!(f, "{s}"),
190 RonValue::Bool(b) => write!(f, "{b}"),
191 RonValue::Map(m) => write!(f, "{m:?}"),
192 RonValue::Char(c) => write!(f, "{c:?}"),
193 RonValue::Unit => write!(f, "unit"),
194 RonValue::Option(o) => write!(f, "{o:?}"),
195 RonValue::Seq(s) => write!(f, "{s:?}"),
196 }
197 }
198}
199
200#[derive(Serialize, Deserialize, Debug, Clone)]
203pub struct Node {
204 id: String,
205 #[serde(rename = "type", skip_serializing_if = "Option::is_none")]
206 type_: Option<String>,
207 #[serde(skip_serializing_if = "Option::is_none")]
208 config: Option<ComponentConfig>,
209}
210
211impl Node {
212 #[allow(dead_code)]
213 pub fn new(id: &str, ptype: &str) -> Self {
214 Node {
215 id: id.to_string(),
216 type_: Some(ptype.to_string()),
217 config: None,
219 }
220 }
221
222 #[allow(dead_code)]
223 pub fn get_id(&self) -> String {
224 self.id.clone()
225 }
226
227 #[allow(dead_code)]
228 pub fn set_type(mut self, name: Option<String>) -> Self {
229 self.type_ = name;
230 self
231 }
232
233 pub fn get_type(&self) -> &str {
234 self.type_.as_ref().unwrap()
235 }
236
237 #[allow(dead_code)]
238 pub fn get_instance_config(&self) -> Option<&ComponentConfig> {
239 self.config.as_ref()
240 }
241
242 #[allow(dead_code)]
243 pub fn get_param<T: From<Value>>(&self, key: &str) -> Option<T> {
244 let pc = self.config.as_ref()?;
245 let v = pc.0.get(key)?;
246 Some(T::from(v.clone()))
247 }
248
249 #[allow(dead_code)]
250 pub fn set_param<T: Into<Value>>(&mut self, key: &str, value: T) {
251 if self.config.is_none() {
252 self.config = Some(ComponentConfig(HashMap::new()));
253 }
254 self.config
255 .as_mut()
256 .unwrap()
257 .0
258 .insert(key.to_string(), value.into());
259 }
260}
261
262#[derive(Serialize, Deserialize, Debug, Clone)]
264pub struct Cnx {
265 src: String,
267
268 dst: String,
270
271 pub msg: String,
273
274 pub batch: Option<u32>,
278
279 pub store: Option<bool>,
281}
282
283#[derive(Debug, Clone)]
286pub struct CuConfig {
287 pub graph: StableDiGraph<Node, Cnx, NodeId>,
289 pub monitor: Option<MonitorConfig>,
290 pub logging: Option<LoggingConfig>,
291}
292
293#[derive(Serialize, Deserialize, Default, Debug, Clone)]
294pub struct MonitorConfig {
295 #[serde(rename = "type")]
296 type_: String,
297 #[serde(skip_serializing_if = "Option::is_none")]
298 config: Option<ComponentConfig>,
299}
300
301impl MonitorConfig {
302 #[allow(dead_code)]
303 pub fn get_type(&self) -> &str {
304 &self.type_
305 }
306
307 #[allow(dead_code)]
308 pub fn get_config(&self) -> Option<&ComponentConfig> {
309 self.config.as_ref()
310 }
311}
312
313fn default_as_true() -> bool {
314 true
315}
316
317#[derive(Serialize, Deserialize, Default, Debug, Clone)]
318pub struct LoggingConfig {
319 #[serde(skip_serializing_if = "Option::is_none")]
320 pub slab_size_mib: Option<u64>,
321 #[serde(skip_serializing_if = "Option::is_none")]
322 pub section_size_mib: Option<u64>,
323 #[serde(default = "default_as_true", skip_serializing_if = "Clone::clone")]
324 pub enable_task_logging: bool,
325}
326
327#[derive(Serialize, Deserialize, Default)]
329struct CuConfigRepresentation {
330 tasks: Vec<Node>,
331 cnx: Vec<Cnx>,
332 monitor: Option<MonitorConfig>,
333 logging: Option<LoggingConfig>,
334}
335
336impl<'de> Deserialize<'de> for CuConfig {
337 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
339 where
340 D: Deserializer<'de>,
341 {
342 let representation =
343 CuConfigRepresentation::deserialize(deserializer).map_err(serde::de::Error::custom)?;
344
345 let mut cuconfig = CuConfig::default();
346 for task in representation.tasks {
347 cuconfig.add_node(task);
348 }
349
350 for c in representation.cnx {
351 let src = cuconfig
352 .graph
353 .node_indices()
354 .find(|i| cuconfig.graph[*i].id == c.src)
355 .expect("Source node not found");
356 let dst = cuconfig
357 .graph
358 .node_indices()
359 .find(|i| cuconfig.graph[*i].id == c.dst)
360 .unwrap_or_else(|| panic!("Destination {} node not found", c.dst));
361 cuconfig.connect_ext(
362 src.index() as NodeId,
363 dst.index() as NodeId,
364 &c.msg,
365 c.batch,
366 c.store,
367 );
368 }
369 cuconfig.monitor = representation.monitor;
370 cuconfig.logging = representation.logging;
371 Ok(cuconfig)
372 }
373}
374
375impl Serialize for CuConfig {
376 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
378 where
379 S: Serializer,
380 {
381 let tasks: Vec<Node> = self
382 .graph
383 .node_indices()
384 .map(|idx| self.graph[idx].clone())
385 .collect();
386
387 let cnx: Vec<Cnx> = self
388 .graph
389 .edge_indices()
390 .map(|edge| self.graph[edge].clone())
391 .collect();
392
393 CuConfigRepresentation {
394 tasks,
395 cnx,
396 monitor: self.monitor.clone(),
397 logging: self.logging.clone(),
398 }
399 .serialize(serializer)
400 }
401}
402
403impl Default for CuConfig {
404 fn default() -> Self {
405 CuConfig {
406 graph: StableDiGraph::new(),
407 monitor: None,
408 logging: None,
409 }
410 }
411}
412
413impl CuConfig {
416 pub fn add_node(&mut self, node: Node) -> NodeId {
418 self.graph.add_node(node).index() as NodeId
419 }
420
421 #[allow(dead_code)] pub fn get_node(&self, node_id: NodeId) -> Option<&Node> {
424 self.graph.node_weight(node_id.into())
425 }
426
427 #[allow(dead_code)] pub fn get_node_mut(&mut self, node_id: NodeId) -> Option<&mut Node> {
430 self.graph.node_weight_mut(node_id.into())
431 }
432
433 #[allow(dead_code)] pub fn get_node_output_msg_type(&self, node_id: &str) -> Option<String> {
436 self.graph.node_indices().find_map(|node_index| {
437 if let Some(node) = self.get_node(node_index.index() as u32) {
438 if node.id != node_id {
439 return None;
440 }
441 let edges = self.get_src_edges(node_index.index() as u32);
442 if edges.is_empty() {
443 panic!("A CuSrcTask is configured with no task connected to it.")
444 }
445 let cnx = self
446 .graph
447 .edge_weight(EdgeIndex::new(edges[0]))
448 .expect("Found an cnx id but could not retrieve it back");
449 return Some(cnx.msg.clone());
450 }
451 None
452 })
453 }
454
455 #[allow(dead_code)] pub fn get_node_input_msg_type(&self, node_id: &str) -> Option<String> {
458 self.graph.node_indices().find_map(|node_index| {
459 if let Some(node) = self.get_node(node_index.index() as u32) {
460 if node.id != node_id {
461 return None;
462 }
463 let edges = self.get_dst_edges(node_index.index() as u32);
464 if edges.is_empty() {
465 panic!("A CuSinkTask is configured with no task connected to it.")
466 }
467 let cnx = self
468 .graph
469 .edge_weight(EdgeIndex::new(edges[0]))
470 .expect("Found an cnx id but could not retrieve it back");
471 return Some(cnx.msg.clone());
472 }
473 None
474 })
475 }
476
477 pub fn get_src_edges(&self, node_id: NodeId) -> Vec<usize> {
479 self.graph
480 .edges_directed(node_id.into(), petgraph::Direction::Outgoing)
481 .map(|edge| edge.id().index())
482 .collect()
483 }
484
485 pub fn get_dst_edges(&self, node_id: NodeId) -> Vec<usize> {
487 self.graph
488 .edges_directed(node_id.into(), petgraph::Direction::Incoming)
489 .map(|edge| edge.id().index())
490 .collect()
491 }
492
493 #[allow(dead_code)]
494 pub fn get_edge_weight(&self, index: usize) -> Option<Cnx> {
495 self.graph.edge_weight(EdgeIndex::new(index)).cloned()
496 }
497
498 pub fn get_all_nodes(&self) -> Vec<(NodeIndex, &Node)> {
500 self.graph
501 .node_indices()
502 .map(|index| (index.index() as u32, &self.graph[index]))
503 .collect()
504 }
505
506 pub fn connect_ext(
511 &mut self,
512 source: NodeId,
513 target: NodeId,
514 msg_type: &str,
515 batch: Option<u32>,
516 store: Option<bool>,
517 ) {
518 self.graph.add_edge(
519 source.into(),
520 target.into(),
521 Cnx {
522 src: self
523 .get_node(source)
524 .expect("Source node not found")
525 .id
526 .clone(),
527 dst: self
528 .get_node(target)
529 .expect("Target node not found")
530 .id
531 .clone(),
532 msg: msg_type.to_string(),
533 batch,
534 store,
535 },
536 );
537 }
538
539 #[allow(dead_code)]
542 pub fn connect(&mut self, source: NodeId, target: NodeId, msg_type: &str) {
543 self.connect_ext(source, target, msg_type, None, None);
544 }
545
546 fn get_options() -> Options {
547 Options::default()
548 .with_default_extension(Extensions::IMPLICIT_SOME)
549 .with_default_extension(Extensions::UNWRAP_NEWTYPES)
550 .with_default_extension(Extensions::UNWRAP_VARIANT_NEWTYPES)
551 }
552
553 #[allow(dead_code)]
554 pub fn serialize_ron(&self) -> String {
555 let ron = Self::get_options();
556 let pretty = ron::ser::PrettyConfig::default();
557 ron.to_string_pretty(&self, pretty).unwrap()
558 }
559
560 pub fn deserialize_ron(ron: &str) -> Self {
561 match Self::get_options().from_str(ron) {
562 Ok(ron) => ron,
563 Err(e) => panic!(
564 "Syntax Error in config: {} at position {}",
565 e.code, e.position
566 ),
567 }
568 }
569
570 pub fn render(&self, output: &mut dyn std::io::Write) {
572 writeln!(output, "digraph G {{").unwrap();
573
574 for index in self.graph.node_indices() {
575 let node = &self.graph[index];
576 let config_str = match &node.config {
577 Some(config) => {
578 let config_str = config
579 .0
580 .iter()
581 .map(|(k, v)| format!("<B>{k}</B> = {v}<BR ALIGN=\"LEFT\"/>"))
582 .collect::<Vec<String>>()
583 .join("\n");
584 format!("<BR/>____________<BR ALIGN=\"LEFT\"/>{config_str}")
585 }
586 None => String::new(),
587 };
588 writeln!(output, "{} [", index.index()).unwrap();
589 writeln!(output, "shape=box,").unwrap();
590 writeln!(output, "style=\"rounded, filled\",").unwrap();
591 writeln!(output, "fontname=\"Noto Sans\"").unwrap();
592
593 let is_src = self.get_dst_edges(index.index() as NodeId).is_empty();
594 let is_sink = self.get_src_edges(index.index() as NodeId).is_empty();
595 if is_src {
596 writeln!(output, "fillcolor=lightgreen,").unwrap();
597 } else if is_sink {
598 writeln!(output, "fillcolor=lightblue,").unwrap();
599 } else {
600 writeln!(output, "fillcolor=lightgrey,").unwrap();
601 }
602 writeln!(output, "color=grey,").unwrap();
603
604 writeln!(output, "labeljust=l,").unwrap();
605 writeln!(
606 output,
607 "label=< <FONT COLOR=\"red\"><B>{}</B></FONT><BR ALIGN=\"LEFT\"/><BR ALIGN=\"RIGHT\"/><FONT COLOR=\"dimgray\">{}</FONT><BR ALIGN=\"LEFT\"/>{} >",
608 node.id,
609 node.get_type(),
610 config_str
611 )
612 .unwrap();
613
614 writeln!(output, "];").unwrap();
615 }
616 for edge in self.graph.edge_indices() {
617 let (src, dst) = self.graph.edge_endpoints(edge).unwrap();
618
619 let cnx = &self.graph[edge];
620 writeln!(
621 output,
622 "{} -> {} [label=< <B><FONT COLOR=\"gray\">{}/{}/{}</FONT></B> >];",
623 src.index(),
624 dst.index(),
625 cnx.msg,
626 cnx.batch.unwrap_or(1),
627 cnx.store.unwrap_or(false)
628 )
629 .unwrap();
630 }
631 writeln!(output, "}}").unwrap();
632 }
633
634 #[allow(dead_code)]
635 pub fn get_all_instances_configs(&self) -> Vec<Option<&ComponentConfig>> {
636 self.get_all_nodes()
637 .iter()
638 .map(|(_, node)| node.get_instance_config())
639 .collect()
640 }
641
642 #[allow(dead_code)]
643 pub fn get_monitor_config(&self) -> Option<&MonitorConfig> {
644 self.monitor.as_ref()
645 }
646
647 pub fn validate_logging_config(&self) -> CuResult<()> {
650 if let Some(logging) = &self.logging {
651 return logging.validate();
652 }
653 Ok(())
654 }
655}
656
657impl LoggingConfig {
658 pub fn validate(&self) -> CuResult<()> {
660 if let Some(section_size_mib) = self.section_size_mib {
661 if let Some(slab_size_mib) = self.slab_size_mib {
662 if section_size_mib > slab_size_mib {
663 return Err(CuError::from(format!("Section size ({} MiB) cannot be larger than slab size ({} MiB). Adjust the parameters accordingly.", section_size_mib, slab_size_mib)));
664 }
665 }
666 }
667
668 Ok(())
669 }
670}
671
672pub fn read_configuration(config_filename: &str) -> CuResult<CuConfig> {
674 let config_content = read_to_string(config_filename).map_err(|e| {
675 CuError::from(format!(
676 "Failed to read configuration file: {:?}",
677 &config_filename
678 ))
679 .add_cause(e.to_string().as_str())
680 })?;
681 read_configuration_str(config_content)
682}
683
684pub fn read_configuration_str(config_content: String) -> CuResult<CuConfig> {
686 let cuconfig = CuConfig::deserialize_ron(&config_content);
687 cuconfig.validate_logging_config()?;
688
689 Ok(cuconfig)
690}
691
692#[cfg(test)]
694mod tests {
695 use super::*;
696
697 #[test]
698 fn test_plain_serialize() {
699 let mut config = CuConfig::default();
700 let n1 = config.add_node(Node::new("test1", "package::Plugin1"));
701 let n2 = config.add_node(Node::new("test2", "package::Plugin2"));
702 config.connect(n1, n2, "msgpkg::MsgType");
703 let serialized = config.serialize_ron();
704 let deserialized = CuConfig::deserialize_ron(&serialized);
705 assert_eq!(config.graph.node_count(), deserialized.graph.node_count());
706 assert_eq!(config.graph.edge_count(), deserialized.graph.edge_count());
707 }
708
709 #[test]
710 fn test_serialize_with_params() {
711 let mut config = CuConfig::default();
712 let mut camera = Node::new("copper-camera", "camerapkg::Camera");
713 camera.set_param::<Value>("resolution-height", 1080.into());
714 config.add_node(camera);
715 let serialized = config.serialize_ron();
716 let deserialized = CuConfig::deserialize_ron(&serialized);
717 assert_eq!(
718 deserialized
719 .get_node(0)
720 .unwrap()
721 .get_param::<i32>("resolution-height")
722 .unwrap(),
723 1080
724 );
725 }
726
727 #[test]
728 #[should_panic(expected = "Syntax Error in config: Expected opening `[` at position 1:10")]
729 fn test_deserialization_error() {
730 let txt = r#"( tasks: (), cnx: [], monitor: (type: "ExampleMonitor", ) ) "#;
732 CuConfig::deserialize_ron(txt);
733 }
734
735 #[test]
736 fn test_monitor() {
737 let txt = r#"( tasks: [], cnx: [], monitor: (type: "ExampleMonitor", ) ) "#;
738 let config = CuConfig::deserialize_ron(txt);
739 assert_eq!(config.monitor.as_ref().unwrap().type_, "ExampleMonitor");
740
741 let txt =
742 r#"( tasks: [], cnx: [], monitor: (type: "ExampleMonitor", config: { "toto": 4, } )) "#;
743 let config = CuConfig::deserialize_ron(txt);
744 assert_eq!(
745 config.monitor.as_ref().unwrap().config.as_ref().unwrap().0["toto"],
746 4.into()
747 );
748 }
749
750 #[test]
751 fn test_logging_parameters() {
752 let txt = r#"( tasks: [], cnx: [], logging: ( slab_size_mib: 1024, section_size_mib: 100, enable_task_logging: false ),) "#;
754
755 let config = CuConfig::deserialize_ron(txt);
756 assert!(config.logging.is_some());
757 let logging_config = config.logging.unwrap();
758 assert_eq!(logging_config.slab_size_mib.unwrap(), 1024);
759 assert_eq!(logging_config.section_size_mib.unwrap(), 100);
760 assert!(!logging_config.enable_task_logging);
761
762 let txt =
764 r#"( tasks: [], cnx: [], logging: ( slab_size_mib: 1024, section_size_mib: 100, ),) "#;
765 let config = CuConfig::deserialize_ron(txt);
766 assert!(config.logging.is_some());
767 let logging_config = config.logging.unwrap();
768 assert_eq!(logging_config.slab_size_mib.unwrap(), 1024);
769 assert_eq!(logging_config.section_size_mib.unwrap(), 100);
770 assert!(logging_config.enable_task_logging);
771 }
772
773 #[test]
774 fn test_validate_logging_config() {
775 let txt =
777 r#"( tasks: [], cnx: [], logging: ( slab_size_mib: 1024, section_size_mib: 100 ) )"#;
778 let config = CuConfig::deserialize_ron(txt);
779 assert!(config.validate_logging_config().is_ok());
780
781 let txt =
783 r#"( tasks: [], cnx: [], logging: ( slab_size_mib: 100, section_size_mib: 1024 ) )"#;
784 let config = CuConfig::deserialize_ron(txt);
785 assert!(config.validate_logging_config().is_err());
786 }
787}