1use crate::config::{Cnx, CuConfig, NodeId};
6use crate::config::{ComponentConfig, Node};
7use crate::copperlist::{CopperList, CopperListState, CuListsManager};
8use crate::monitoring::CuMonitor;
9use cu29_clock::{ClockProvider, RobotClock};
10use cu29_log_runtime::LoggerRuntime;
11use cu29_traits::CopperListTuple;
12use cu29_traits::CuResult;
13use cu29_traits::WriteStream;
14use cu29_unifiedlog::UnifiedLoggerWrite;
15use std::sync::{Arc, Mutex};
16
17use petgraph::prelude::*;
18use std::fmt::Debug;
19
20pub struct CopperContext {
22 pub unified_logger: Arc<Mutex<UnifiedLoggerWrite>>,
23 pub logger_runtime: LoggerRuntime,
24 pub clock: RobotClock,
25}
26
27pub struct CuRuntime<CT, P: CopperListTuple, M: CuMonitor, const NBCL: usize> {
31 pub tasks: CT,
33
34 pub monitor: M,
35
36 pub copper_lists_manager: CuListsManager<P, NBCL>,
38
39 pub clock: RobotClock, logger: Option<Box<dyn WriteStream<CopperList<P>>>>,
44}
45
46impl<CT, P: CopperListTuple, M: CuMonitor, const NBCL: usize> ClockProvider
48 for CuRuntime<CT, P, M, NBCL>
49{
50 fn get_clock(&self) -> RobotClock {
51 self.clock.clone()
52 }
53}
54
55impl<CT, P: CopperListTuple + 'static, M: CuMonitor, const NBCL: usize> CuRuntime<CT, P, M, NBCL> {
56 pub fn new(
57 clock: RobotClock,
58 config: &CuConfig,
59 tasks_instanciator: impl Fn(Vec<Option<&ComponentConfig>>) -> CuResult<CT>,
60 monitor_instanciator: impl Fn(&CuConfig) -> M,
61 logger: impl WriteStream<CopperList<P>> + 'static,
62 ) -> CuResult<Self> {
63 let all_instances_configs: Vec<Option<&ComponentConfig>> = config
64 .get_all_nodes()
65 .iter()
66 .map(|(_, node)| node.get_instance_config())
67 .collect();
68 let tasks = tasks_instanciator(all_instances_configs)?;
69
70 let monitor = monitor_instanciator(config);
71
72 let logger_: Option<Box<dyn WriteStream<CopperList<P>>>> =
74 if let Some(logging_config) = &config.logging {
75 if logging_config.enable_task_logging {
76 Some(Box::new(logger))
77 } else {
78 None
79 }
80 } else {
81 Some(Box::new(logger))
82 };
83
84 let runtime = Self {
85 tasks,
86 monitor,
87 copper_lists_manager: CuListsManager::new(), clock,
89 logger: logger_,
90 };
91
92 Ok(runtime)
93 }
94
95 pub fn available_copper_lists(&self) -> usize {
96 NBCL - self.copper_lists_manager.len()
97 }
98
99 pub fn end_of_processing(&mut self, culistid: u32) {
100 let mut is_top = true;
101 let mut nb_done = 0;
102 self.copper_lists_manager.iter_mut().for_each(|cl| {
103 if cl.id == culistid && cl.get_state() == CopperListState::Processing {
104 cl.change_state(CopperListState::DoneProcessing);
105 }
106 if is_top && cl.get_state() == CopperListState::DoneProcessing {
109 if let Some(logger) = &mut self.logger {
110 cl.change_state(CopperListState::BeingSerialized);
111 logger.log(cl).unwrap();
112 }
113 cl.change_state(CopperListState::Free);
114 nb_done += 1;
115 } else {
116 is_top = false;
117 }
118 });
119 for _ in 0..nb_done {
120 let _ = self.copper_lists_manager.pop();
121 }
122 }
123}
124
125#[derive(Debug, PartialEq, Eq, Clone, Copy)]
130pub enum CuTaskType {
131 Source,
132 Regular,
133 Sink,
134}
135
136pub struct CuExecutionStep {
138 pub node_id: NodeId,
140 pub node: Node,
142 pub task_type: CuTaskType,
144
145 pub input_msg_indices_types: Vec<(u32, String)>,
147
148 pub output_msg_index_type: Option<(u32, String)>,
150}
151
152impl Debug for CuExecutionStep {
153 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
154 f.write_str(format!(" CuExecutionStep: Node Id: {}\n", self.node_id).as_str())?;
155 f.write_str(format!(" task_type: {:?}\n", self.node.get_type()).as_str())?;
156 f.write_str(format!(" task: {:?}\n", self.task_type).as_str())?;
157 f.write_str(
158 format!(
159 " input_msg_types: {:?}\n",
160 self.input_msg_indices_types
161 )
162 .as_str(),
163 )?;
164 f.write_str(
165 format!(" output_msg_type: {:?}\n", self.output_msg_index_type).as_str(),
166 )?;
167 Ok(())
168 }
169}
170
171pub struct CuExecutionLoop {
176 pub steps: Vec<CuExecutionUnit>,
177 pub loop_count: Option<u32>,
178}
179
180impl Debug for CuExecutionLoop {
181 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
182 f.write_str("CuExecutionLoop:\n")?;
183 for step in &self.steps {
184 match step {
185 CuExecutionUnit::Step(step) => {
186 step.fmt(f)?;
187 }
188 CuExecutionUnit::Loop(l) => {
189 l.fmt(f)?;
190 }
191 }
192 }
193
194 f.write_str(format!(" count: {:?}", self.loop_count).as_str())?;
195 Ok(())
196 }
197}
198
199#[derive(Debug)]
201pub enum CuExecutionUnit {
202 Step(CuExecutionStep),
203 Loop(CuExecutionLoop),
204}
205
206fn find_output_index_type_from_nodeid(
207 node_id: NodeId,
208 steps: &Vec<CuExecutionUnit>,
209) -> Option<(u32, String)> {
210 for step in steps {
211 match step {
212 CuExecutionUnit::Loop(loop_unit) => {
213 if let Some(index) = find_output_index_type_from_nodeid(node_id, &loop_unit.steps) {
214 return Some(index);
215 }
216 }
217 CuExecutionUnit::Step(step) => {
218 if step.node_id == node_id {
219 return step.output_msg_index_type.clone();
220 }
221 }
222 }
223 }
224 None
225}
226
227pub fn find_task_type_for_id(
228 graph: &StableDiGraph<Node, Cnx, NodeId>,
229 node_id: NodeId,
230) -> CuTaskType {
231 if graph.neighbors_directed(node_id.into(), Incoming).count() == 0 {
232 CuTaskType::Source
233 } else if graph.neighbors_directed(node_id.into(), Outgoing).count() == 0 {
234 CuTaskType::Sink
235 } else {
236 CuTaskType::Regular
237 }
238}
239
240fn find_edge_with_plan_input_id(
243 plan: &[CuExecutionUnit],
244 config: &CuConfig,
245 plan_id: u32,
246 output_node_id: NodeId,
247) -> usize {
248 let input_node = plan
249 .get(plan_id as usize)
250 .expect("Input step should've been added to plan before the step that receives the input");
251 let CuExecutionUnit::Step(input_step) = input_node else {
252 panic!("Expected input to be from a step, not a loop");
253 };
254 let input_node_id = input_step.node_id;
255
256 config
257 .graph
258 .edges_connecting(input_node_id.into(), output_node_id.into())
259 .map(|edge| edge.id().index())
260 .next()
261 .expect("An edge connecting the input to the output should exist")
262}
263
264fn sort_inputs_by_cnx_id(
267 input_msg_indices_types: &mut [(u32, String)],
268 plan: &[CuExecutionUnit],
269 config: &CuConfig,
270 curr_node_id: NodeId,
271) {
272 input_msg_indices_types.sort_by(|(a_index, _), (b_index, _)| {
273 let a_edge_id = find_edge_with_plan_input_id(plan, config, *a_index, curr_node_id);
274 let b_edge_id = find_edge_with_plan_input_id(plan, config, *b_index, curr_node_id);
275 a_edge_id.cmp(&b_edge_id)
276 });
277}
278fn plan_tasks_tree_branch(
280 config: &CuConfig,
281 mut next_culist_output_index: u32,
282 starting_point: NodeId,
283 plan: &mut Vec<CuExecutionUnit>,
284) -> u32 {
285 let mut visitor = Bfs::new(&config.graph, starting_point.into());
287
288 while let Some(node) = visitor.next(&config.graph) {
289 let id = node.index() as NodeId;
290 let node = config.get_node(id).unwrap();
291
292 let mut input_msg_indices_types: Vec<(u32, String)> = Vec::new();
293 let output_msg_index_type: Option<(u32, String)>;
294
295 let task_type = find_task_type_for_id(&config.graph, id);
296
297 match task_type {
298 CuTaskType::Source => {
299 output_msg_index_type = Some((
300 next_culist_output_index,
301 config
302 .graph
303 .edge_weight(EdgeIndex::new(config.get_src_edges(id)[0]))
304 .unwrap()
305 .msg
306 .clone(),
307 ));
308 next_culist_output_index += 1;
309 }
310 CuTaskType::Sink => {
311 let parents: Vec<NodeIndex> = config
312 .graph
313 .neighbors_directed(id.into(), Incoming)
314 .collect();
315
316 for parent in parents {
317 let index_type =
318 find_output_index_type_from_nodeid(parent.index() as NodeId, plan);
319 if let Some(index_type) = index_type {
320 input_msg_indices_types.push(index_type);
321 } else {
322 return next_culist_output_index;
324 }
325 }
326 output_msg_index_type = Some((
328 next_culist_output_index,
329 "()".to_string(), ));
331 next_culist_output_index += 1;
332 }
333 CuTaskType::Regular => {
334 let parents: Vec<NodeIndex> = config
335 .graph
336 .neighbors_directed(id.into(), Incoming)
337 .collect();
338
339 for parent in parents {
340 let index_type =
341 find_output_index_type_from_nodeid(parent.index() as NodeId, plan);
342 if let Some(index_type) = index_type {
343 input_msg_indices_types.push(index_type);
344 } else {
345 return next_culist_output_index;
347 }
348 }
349 output_msg_index_type = Some((
350 next_culist_output_index,
351 config
352 .graph
353 .edge_weight(EdgeIndex::new(config.get_src_edges(id)[0]))
354 .unwrap()
355 .msg
356 .clone(),
357 ));
358 next_culist_output_index += 1;
359 }
360 }
361
362 sort_inputs_by_cnx_id(&mut input_msg_indices_types, plan, config, id);
366
367 if let Some(pos) = plan.iter().position(|step| {
369 if let CuExecutionUnit::Step(ref s) = step {
370 s.node_id == id
371 } else {
372 false
373 }
374 }) {
375 let mut step = plan.remove(pos);
377 if let CuExecutionUnit::Step(ref mut s) = step {
378 s.input_msg_indices_types = input_msg_indices_types;
379 }
380 plan.push(step);
381 } else {
382 let step = CuExecutionStep {
384 node_id: id,
385 node: node.clone(),
386 task_type,
387 input_msg_indices_types,
388 output_msg_index_type,
389 };
390 plan.push(CuExecutionUnit::Step(step));
391 }
392 }
393 next_culist_output_index
394}
395
396pub fn compute_runtime_plan(config: &CuConfig) -> CuResult<CuExecutionLoop> {
399 let nodes_to_visit = config
401 .graph
402 .node_indices()
403 .filter(|node_id| {
404 let id = node_id.index() as NodeId;
405 let task_type = find_task_type_for_id(&config.graph, id);
406 task_type == CuTaskType::Source
407 })
408 .collect::<Vec<NodeIndex>>();
409
410 let mut next_culist_output_index = 0u32;
411 let mut plan: Vec<CuExecutionUnit> = Vec::new();
412
413 for node_index in &nodes_to_visit {
414 next_culist_output_index = plan_tasks_tree_branch(
415 config,
416 next_culist_output_index,
417 node_index.index() as NodeId,
418 &mut plan,
419 );
420 }
421
422 Ok(CuExecutionLoop {
423 steps: plan,
424 loop_count: None, })
426}
427
428#[cfg(test)]
430mod tests {
431 use super::*;
432 use crate::config::Node;
433 use crate::cutask::CuSinkTask;
434 use crate::cutask::{CuSrcTask, Freezable};
435 use crate::monitoring::NoMonitor;
436 use bincode::Encode;
437
438 pub struct TestSource {}
439
440 impl Freezable for TestSource {}
441
442 impl CuSrcTask<'_> for TestSource {
443 type Output = ();
444 fn new(_config: Option<&ComponentConfig>) -> CuResult<Self>
445 where
446 Self: Sized,
447 {
448 Ok(Self {})
449 }
450
451 fn process(&mut self, _clock: &RobotClock, _empty_msg: Self::Output) -> CuResult<()> {
452 Ok(())
453 }
454 }
455
456 pub struct TestSink {}
457
458 impl Freezable for TestSink {}
459
460 impl CuSinkTask<'_> for TestSink {
461 type Input = ();
462
463 fn new(_config: Option<&ComponentConfig>) -> CuResult<Self>
464 where
465 Self: Sized,
466 {
467 Ok(Self {})
468 }
469
470 fn process(&mut self, _clock: &RobotClock, _input: Self::Input) -> CuResult<()> {
471 Ok(())
472 }
473 }
474
475 type Tasks = (TestSource, TestSink);
477 type Msgs = ((),);
478
479 fn tasks_instanciator(all_instances_configs: Vec<Option<&ComponentConfig>>) -> CuResult<Tasks> {
480 Ok((
481 TestSource::new(all_instances_configs[0])?,
482 TestSink::new(all_instances_configs[1])?,
483 ))
484 }
485
486 fn monitor_instanciator(_config: &CuConfig) -> NoMonitor {
487 NoMonitor {}
488 }
489
490 #[derive(Debug)]
491 struct FakeWriter {}
492
493 impl<E: Encode> WriteStream<E> for FakeWriter {
494 fn log(&mut self, _obj: &E) -> CuResult<()> {
495 Ok(())
496 }
497 }
498
499 #[test]
500 fn test_runtime_instantiation() {
501 let mut config = CuConfig::default();
502 config.add_node(Node::new("a", "TestSource"));
503 config.add_node(Node::new("b", "TestSink"));
504 config.connect(0, 1, "()");
505 let runtime = CuRuntime::<Tasks, Msgs, NoMonitor, 2>::new(
506 RobotClock::default(),
507 &config,
508 tasks_instanciator,
509 monitor_instanciator,
510 FakeWriter {},
511 );
512 assert!(runtime.is_ok());
513 }
514
515 #[test]
516 fn test_copperlists_manager_lifecycle() {
517 let mut config = CuConfig::default();
518 config.add_node(Node::new("a", "TestSource"));
519 config.add_node(Node::new("b", "TestSink"));
520 config.connect(0, 1, "()");
521 let mut runtime = CuRuntime::<Tasks, Msgs, NoMonitor, 2>::new(
522 RobotClock::default(),
523 &config,
524 tasks_instanciator,
525 monitor_instanciator,
526 FakeWriter {},
527 )
528 .unwrap();
529
530 {
532 let copperlists = &mut runtime.copper_lists_manager;
533 let culist0 = copperlists
534 .create()
535 .expect("Ran out of space for copper lists");
536 let id = culist0.id;
538 assert_eq!(id, 0);
539 culist0.change_state(CopperListState::Processing);
540 assert_eq!(runtime.available_copper_lists(), 1);
541 }
542
543 {
544 let copperlists = &mut runtime.copper_lists_manager;
545 let culist1 = copperlists
546 .create()
547 .expect("Ran out of space for copper lists"); let id = culist1.id;
549 assert_eq!(id, 1);
550 culist1.change_state(CopperListState::Processing);
551 assert_eq!(runtime.available_copper_lists(), 0);
552 }
553
554 {
555 let copperlists = &mut runtime.copper_lists_manager;
556 let culist2 = copperlists.create();
557 assert!(culist2.is_none());
558 assert_eq!(runtime.available_copper_lists(), 0);
559 }
560
561 runtime.end_of_processing(1);
563 assert_eq!(runtime.available_copper_lists(), 1);
564
565 {
567 let copperlists = &mut runtime.copper_lists_manager;
568 let culist2 = copperlists
569 .create()
570 .expect("Ran out of space for copper lists"); let id = culist2.id;
572 assert_eq!(id, 2);
573 culist2.change_state(CopperListState::Processing);
574 assert_eq!(runtime.available_copper_lists(), 0);
575 }
576
577 runtime.end_of_processing(0);
579 assert_eq!(runtime.available_copper_lists(), 0);
581
582 runtime.end_of_processing(2);
584 assert_eq!(runtime.available_copper_lists(), 2);
587 }
588
589 #[test]
590 fn test_runtime_task_input_order() {
591 let mut config = CuConfig::default();
592 let src1_id = config.add_node(Node::new("a", "Source1"));
593 let src2_id = config.add_node(Node::new("b", "Source2"));
594 let sink_id = config.add_node(Node::new("c", "Sink"));
595
596 assert_eq!(src1_id, 0);
597 assert_eq!(src2_id, 1);
598
599 let src1_type = "src1_type";
601 let src2_type = "src2_type";
602 config.connect(src2_id, sink_id, src2_type);
603 config.connect(src1_id, sink_id, src1_type);
604
605 let src1_edge_id = *config.get_src_edges(src1_id).first().unwrap();
606 let src2_edge_id = *config.get_src_edges(src2_id).first().unwrap();
607 assert_eq!(src1_edge_id, 1);
610 assert_eq!(src2_edge_id, 0);
611
612 let runtime = compute_runtime_plan(&config).unwrap();
613 let sink_step = runtime
614 .steps
615 .iter()
616 .find_map(|step| match step {
617 CuExecutionUnit::Step(step) if step.node_id == sink_id => Some(step),
618 _ => None,
619 })
620 .unwrap();
621
622 assert_eq!(sink_step.input_msg_indices_types[0].1, src2_type);
625 assert_eq!(sink_step.input_msg_indices_types[1].1, src1_type);
626 }
627}