cu29_runtime/
curuntime.rs

1//! CuRuntime is the heart of what copper is running on the robot.
2//! It is exposed to the user via the `copper_runtime` macro injecting it as a field in their application struct.
3//!
4
5use crate::config::{Cnx, CuConfig, NodeId};
6use crate::config::{ComponentConfig, Node};
7use crate::copperlist::{CopperList, CopperListState, CuListsManager};
8use crate::monitoring::CuMonitor;
9use cu29_clock::{ClockProvider, RobotClock};
10use cu29_log_runtime::LoggerRuntime;
11use cu29_traits::CopperListTuple;
12use cu29_traits::CuResult;
13use cu29_traits::WriteStream;
14use cu29_unifiedlog::UnifiedLoggerWrite;
15use std::sync::{Arc, Mutex};
16
17use petgraph::prelude::*;
18use std::fmt::Debug;
19
/// Just a simple struct to hold the various bits needed to run a Copper application.
pub struct CopperContext {
    /// Shared handle to the unified logger sink (wrapped for cross-thread access).
    pub unified_logger: Arc<Mutex<UnifiedLoggerWrite>>,
    /// Runtime side of the structured logging machinery.
    pub logger_runtime: LoggerRuntime,
    /// The clock every component of the application should use.
    pub clock: RobotClock,
}
26
/// This is the main structure that will be injected as a member of the Application struct.
/// CT is the tuple of all the tasks in order of execution.
/// P is the payload tuple of the copper list, representing the input/output messages for all the tasks.
pub struct CuRuntime<CT, P: CopperListTuple, M: CuMonitor, const NBCL: usize> {
    /// The tuple of all the tasks in order of execution.
    pub tasks: CT,

    /// Monitor observing the runtime, built from the config by the monitor instanciator.
    pub monitor: M,

    /// Copper lists hold in order all the input/output messages for all the tasks.
    /// NBCL is the capacity: the maximum number of in-flight copper lists.
    pub copper_lists_manager: CuListsManager<P, NBCL>,

    /// The base clock the runtime will be using to record time.
    pub clock: RobotClock, // TODO: remove public at some point

    /// Destination for serialized copper lists; `None` when task logging is disabled in the config.
    logger: Option<Box<dyn WriteStream<CopperList<P>>>>,
}
45
/// To be able to share the clock we make the runtime a clock provider.
impl<CT, P: CopperListTuple, M: CuMonitor, const NBCL: usize> ClockProvider
    for CuRuntime<CT, P, M, NBCL>
{
    /// Returns a clone of the runtime's base clock.
    fn get_clock(&self) -> RobotClock {
        self.clock.clone()
    }
}
54
55impl<CT, P: CopperListTuple + 'static, M: CuMonitor, const NBCL: usize> CuRuntime<CT, P, M, NBCL> {
56    pub fn new(
57        clock: RobotClock,
58        config: &CuConfig,
59        tasks_instanciator: impl Fn(Vec<Option<&ComponentConfig>>) -> CuResult<CT>,
60        monitor_instanciator: impl Fn(&CuConfig) -> M,
61        logger: impl WriteStream<CopperList<P>> + 'static,
62    ) -> CuResult<Self> {
63        let all_instances_configs: Vec<Option<&ComponentConfig>> = config
64            .get_all_nodes()
65            .iter()
66            .map(|(_, node)| node.get_instance_config())
67            .collect();
68        let tasks = tasks_instanciator(all_instances_configs)?;
69
70        let monitor = monitor_instanciator(config);
71
72        // Needed to declare type explicitly as `cargo check` was failing without it
73        let logger_: Option<Box<dyn WriteStream<CopperList<P>>>> =
74            if let Some(logging_config) = &config.logging {
75                if logging_config.enable_task_logging {
76                    Some(Box::new(logger))
77                } else {
78                    None
79                }
80            } else {
81                Some(Box::new(logger))
82            };
83
84        let runtime = Self {
85            tasks,
86            monitor,
87            copper_lists_manager: CuListsManager::new(), // placeholder
88            clock,
89            logger: logger_,
90        };
91
92        Ok(runtime)
93    }
94
95    pub fn available_copper_lists(&self) -> usize {
96        NBCL - self.copper_lists_manager.len()
97    }
98
99    pub fn end_of_processing(&mut self, culistid: u32) {
100        let mut is_top = true;
101        let mut nb_done = 0;
102        self.copper_lists_manager.iter_mut().for_each(|cl| {
103            if cl.id == culistid && cl.get_state() == CopperListState::Processing {
104                cl.change_state(CopperListState::DoneProcessing);
105            }
106            // if we have a series of copper lists that are done processing at the top of the circular buffer
107            // serialize them all and Free them.
108            if is_top && cl.get_state() == CopperListState::DoneProcessing {
109                if let Some(logger) = &mut self.logger {
110                    cl.change_state(CopperListState::BeingSerialized);
111                    logger.log(cl).unwrap();
112                }
113                cl.change_state(CopperListState::Free);
114                nb_done += 1;
115            } else {
116                is_top = false;
117            }
118        });
119        for _ in 0..nb_done {
120            let _ = self.copper_lists_manager.pop();
121        }
122    }
123}
124
/// Copper tasks can be of 3 types:
/// - Source: only producing output messages (usually used for drivers)
/// - Regular: processing input messages and producing output messages, more like compute nodes.
/// - Sink: only consuming input messages (usually used for actuators)
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum CuTaskType {
    /// No incoming connections in the config graph; only produces output.
    Source,
    /// Has both incoming and outgoing connections.
    Regular,
    /// No outgoing connections in the config graph; only consumes input.
    Sink,
}
135
/// This structure represents a step in the execution plan.
pub struct CuExecutionStep {
    /// Id of the task to execute.
    pub node_id: NodeId,
    /// Copy of the config node describing this task.
    pub node: Node,
    /// Whether this task is a Source, Regular, or Sink task.
    pub task_type: CuTaskType,

    /// the indices in the copper list of the input messages and their types
    pub input_msg_indices_types: Vec<(u32, String)>,

    /// the index in the copper list of the output message and its type
    pub output_msg_index_type: Option<(u32, String)>,
}
151
152impl Debug for CuExecutionStep {
153    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
154        f.write_str(format!("   CuExecutionStep: Node Id: {}\n", self.node_id).as_str())?;
155        f.write_str(format!("                  task_type: {:?}\n", self.node.get_type()).as_str())?;
156        f.write_str(format!("                       task: {:?}\n", self.task_type).as_str())?;
157        f.write_str(
158            format!(
159                "              input_msg_types: {:?}\n",
160                self.input_msg_indices_types
161            )
162            .as_str(),
163        )?;
164        f.write_str(
165            format!("       output_msg_type: {:?}\n", self.output_msg_index_type).as_str(),
166        )?;
167        Ok(())
168    }
169}
170
/// This structure represents a loop in the execution plan.
/// It is used to represent a sequence of Execution units (loop or steps) that are executed
/// multiple times.
/// if loop_count is None, the loop is infinite.
pub struct CuExecutionLoop {
    /// The units (steps or nested loops) executed in order on each iteration.
    pub steps: Vec<CuExecutionUnit>,
    /// Number of iterations; `None` means loop forever.
    pub loop_count: Option<u32>,
}
179
180impl Debug for CuExecutionLoop {
181    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
182        f.write_str("CuExecutionLoop:\n")?;
183        for step in &self.steps {
184            match step {
185                CuExecutionUnit::Step(step) => {
186                    step.fmt(f)?;
187                }
188                CuExecutionUnit::Loop(l) => {
189                    l.fmt(f)?;
190                }
191            }
192        }
193
194        f.write_str(format!("   count: {:?}", self.loop_count).as_str())?;
195        Ok(())
196    }
197}
198
/// A unit of the execution plan: either a single step or a nested loop of units.
#[derive(Debug)]
pub enum CuExecutionUnit {
    Step(CuExecutionStep),
    Loop(CuExecutionLoop),
}
205
206fn find_output_index_type_from_nodeid(
207    node_id: NodeId,
208    steps: &Vec<CuExecutionUnit>,
209) -> Option<(u32, String)> {
210    for step in steps {
211        match step {
212            CuExecutionUnit::Loop(loop_unit) => {
213                if let Some(index) = find_output_index_type_from_nodeid(node_id, &loop_unit.steps) {
214                    return Some(index);
215                }
216            }
217            CuExecutionUnit::Step(step) => {
218                if step.node_id == node_id {
219                    return step.output_msg_index_type.clone();
220                }
221            }
222        }
223    }
224    None
225}
226
227pub fn find_task_type_for_id(
228    graph: &StableDiGraph<Node, Cnx, NodeId>,
229    node_id: NodeId,
230) -> CuTaskType {
231    if graph.neighbors_directed(node_id.into(), Incoming).count() == 0 {
232        CuTaskType::Source
233    } else if graph.neighbors_directed(node_id.into(), Outgoing).count() == 0 {
234        CuTaskType::Sink
235    } else {
236        CuTaskType::Regular
237    }
238}
239
240/// This function gets the input node by using the input step plan id, to get the edge that
241/// connects the input to the output in the config graph
242fn find_edge_with_plan_input_id(
243    plan: &[CuExecutionUnit],
244    config: &CuConfig,
245    plan_id: u32,
246    output_node_id: NodeId,
247) -> usize {
248    let input_node = plan
249        .get(plan_id as usize)
250        .expect("Input step should've been added to plan before the step that receives the input");
251    let CuExecutionUnit::Step(input_step) = input_node else {
252        panic!("Expected input to be from a step, not a loop");
253    };
254    let input_node_id = input_step.node_id;
255
256    config
257        .graph
258        .edges_connecting(input_node_id.into(), output_node_id.into())
259        .map(|edge| edge.id().index())
260        .next()
261        .expect("An edge connecting the input to the output should exist")
262}
263
264/// The connection id used here is the index of the config graph edge that equates to the wanted
265/// connection
266fn sort_inputs_by_cnx_id(
267    input_msg_indices_types: &mut [(u32, String)],
268    plan: &[CuExecutionUnit],
269    config: &CuConfig,
270    curr_node_id: NodeId,
271) {
272    input_msg_indices_types.sort_by(|(a_index, _), (b_index, _)| {
273        let a_edge_id = find_edge_with_plan_input_id(plan, config, *a_index, curr_node_id);
274        let b_edge_id = find_edge_with_plan_input_id(plan, config, *b_index, curr_node_id);
275        a_edge_id.cmp(&b_edge_id)
276    });
277}
/// Explores a subbranch and build the partial plan out of it.
///
/// Walks the config graph breadth-first from `starting_point`, appending one
/// `CuExecutionStep` per reachable node to `plan`. `next_culist_output_index` is the
/// next free slot in the copper list; the updated value is returned so sibling
/// branches keep allocating distinct slots.
///
/// A node whose inputs are not all planned yet is skipped (early return) and will be
/// completed when another branch reaches it with the remaining inputs available.
fn plan_tasks_tree_branch(
    config: &CuConfig,
    mut next_culist_output_index: u32,
    starting_point: NodeId,
    plan: &mut Vec<CuExecutionUnit>,
) -> u32 {
    // prob not exactly what we want but to get us started
    let mut visitor = Bfs::new(&config.graph, starting_point.into());

    while let Some(node) = visitor.next(&config.graph) {
        let id = node.index() as NodeId;
        let node = config.get_node(id).unwrap();

        let mut input_msg_indices_types: Vec<(u32, String)> = Vec::new();
        let output_msg_index_type: Option<(u32, String)>;

        let task_type = find_task_type_for_id(&config.graph, id);

        match task_type {
            CuTaskType::Source => {
                // A source gets a fresh output slot; its type comes from the first
                // outgoing edge recorded in the config.
                output_msg_index_type = Some((
                    next_culist_output_index,
                    config
                        .graph
                        .edge_weight(EdgeIndex::new(config.get_src_edges(id)[0]))
                        .unwrap()
                        .msg
                        .clone(),
                ));
                next_culist_output_index += 1;
            }
            CuTaskType::Sink => {
                // Collect the output slots of all parents as this sink's inputs.
                let parents: Vec<NodeIndex> = config
                    .graph
                    .neighbors_directed(id.into(), Incoming)
                    .collect();

                for parent in parents {
                    let index_type =
                        find_output_index_type_from_nodeid(parent.index() as NodeId, plan);
                    if let Some(index_type) = index_type {
                        input_msg_indices_types.push(index_type);
                    } else {
                        // here do not add this node yet, wait for the other inputs to do it with all the inputs earliers in the copper list.
                        return next_culist_output_index;
                    }
                }
                // Here we create an artificial "end node" for this sink to record the metadata associated with it.
                output_msg_index_type = Some((
                    next_culist_output_index,
                    "()".to_string(), // empty type
                ));
                next_culist_output_index += 1;
            }
            CuTaskType::Regular => {
                // Same input gathering as a sink...
                let parents: Vec<NodeIndex> = config
                    .graph
                    .neighbors_directed(id.into(), Incoming)
                    .collect();

                for parent in parents {
                    let index_type =
                        find_output_index_type_from_nodeid(parent.index() as NodeId, plan);
                    if let Some(index_type) = index_type {
                        input_msg_indices_types.push(index_type);
                    } else {
                        // here do not add this node yet, wait for the other inputs to do it with all the inputs earliers in the copper list.
                        return next_culist_output_index;
                    }
                }
                // ...plus a real output slot typed from the first outgoing edge.
                output_msg_index_type = Some((
                    next_culist_output_index,
                    config
                        .graph
                        .edge_weight(EdgeIndex::new(config.get_src_edges(id)[0]))
                        .unwrap()
                        .msg
                        .clone(),
                ));
                next_culist_output_index += 1;
            }
        }

        // Sort the input messages by index
        // It means that the tuple presented as input to the merging task
        // depends on the order of *declaration* in the node section of the config file.
        sort_inputs_by_cnx_id(&mut input_msg_indices_types, plan, config, id);

        // Try to see if we did not already add this node to the plan
        if let Some(pos) = plan.iter().position(|step| {
            if let CuExecutionUnit::Step(ref s) = step {
                s.node_id == id
            } else {
                false
            }
        }) {
            // modify the existing step and put it back in the plan as the current step as it needs this subsequent output.
            let mut step = plan.remove(pos);
            if let CuExecutionUnit::Step(ref mut s) = step {
                s.input_msg_indices_types = input_msg_indices_types;
            }
            plan.push(step);
        } else {
            // ok this is just a new step
            let step = CuExecutionStep {
                node_id: id,
                node: node.clone(),
                task_type,
                input_msg_indices_types,
                output_msg_index_type,
            };
            plan.push(CuExecutionUnit::Step(step));
        }
    }
    next_culist_output_index
}
395
396/// This is the main heuristics to compute an execution plan at compilation time.
397/// TODO: Make that heuristic pluggable.
398pub fn compute_runtime_plan(config: &CuConfig) -> CuResult<CuExecutionLoop> {
399    // find all the sources.
400    let nodes_to_visit = config
401        .graph
402        .node_indices()
403        .filter(|node_id| {
404            let id = node_id.index() as NodeId;
405            let task_type = find_task_type_for_id(&config.graph, id);
406            task_type == CuTaskType::Source
407        })
408        .collect::<Vec<NodeIndex>>();
409
410    let mut next_culist_output_index = 0u32;
411    let mut plan: Vec<CuExecutionUnit> = Vec::new();
412
413    for node_index in &nodes_to_visit {
414        next_culist_output_index = plan_tasks_tree_branch(
415            config,
416            next_culist_output_index,
417            node_index.index() as NodeId,
418            &mut plan,
419        );
420    }
421
422    Ok(CuExecutionLoop {
423        steps: plan,
424        loop_count: None, // if in not unit testing, the main loop is infinite
425    })
426}
427
// Unit tests for the runtime lifecycle and execution-plan computation.
429#[cfg(test)]
430mod tests {
431    use super::*;
432    use crate::config::Node;
433    use crate::cutask::CuSinkTask;
434    use crate::cutask::{CuSrcTask, Freezable};
435    use crate::monitoring::NoMonitor;
436    use bincode::Encode;
437
    // Minimal source task used to exercise the runtime plumbing; emits unit messages.
    pub struct TestSource {}

    impl Freezable for TestSource {}

    impl CuSrcTask<'_> for TestSource {
        type Output = ();
        fn new(_config: Option<&ComponentConfig>) -> CuResult<Self>
        where
            Self: Sized,
        {
            Ok(Self {})
        }

        // No-op: the unit output carries no data.
        fn process(&mut self, _clock: &RobotClock, _empty_msg: Self::Output) -> CuResult<()> {
            Ok(())
        }
    }
455
    // Minimal sink task: accepts unit messages and discards them.
    pub struct TestSink {}

    impl Freezable for TestSink {}

    impl CuSinkTask<'_> for TestSink {
        type Input = ();

        fn new(_config: Option<&ComponentConfig>) -> CuResult<Self>
        where
            Self: Sized,
        {
            Ok(Self {})
        }

        // No-op: nothing to do with a unit input.
        fn process(&mut self, _clock: &RobotClock, _input: Self::Input) -> CuResult<()> {
            Ok(())
        }
    }
474
    // Those should be generated by the derive macro
    type Tasks = (TestSource, TestSink);
    type Msgs = ((),);

    // Builds the task tuple in execution order from each node's instance config.
    fn tasks_instanciator(all_instances_configs: Vec<Option<&ComponentConfig>>) -> CuResult<Tasks> {
        Ok((
            TestSource::new(all_instances_configs[0])?,
            TestSink::new(all_instances_configs[1])?,
        ))
    }
485
    // Provides the no-op monitor used by all tests here.
    fn monitor_instanciator(_config: &CuConfig) -> NoMonitor {
        NoMonitor {}
    }
489
    #[derive(Debug)]
    struct FakeWriter {}

    // Discards everything it is asked to log: lets tests run the serialization
    // path without any real I/O.
    impl<E: Encode> WriteStream<E> for FakeWriter {
        fn log(&mut self, _obj: &E) -> CuResult<()> {
            Ok(())
        }
    }
498
    #[test]
    fn test_runtime_instantiation() {
        // Minimal two-node graph: source "a" feeding sink "b" over a unit message.
        let mut config = CuConfig::default();
        config.add_node(Node::new("a", "TestSource"));
        config.add_node(Node::new("b", "TestSink"));
        config.connect(0, 1, "()");
        let runtime = CuRuntime::<Tasks, Msgs, NoMonitor, 2>::new(
            RobotClock::default(),
            &config,
            tasks_instanciator,
            monitor_instanciator,
            FakeWriter {},
        );
        assert!(runtime.is_ok());
    }
514
    #[test]
    fn test_copperlists_manager_lifecycle() {
        let mut config = CuConfig::default();
        config.add_node(Node::new("a", "TestSource"));
        config.add_node(Node::new("b", "TestSink"));
        config.connect(0, 1, "()");
        // NBCL = 2: the manager can hold at most two in-flight copper lists.
        let mut runtime = CuRuntime::<Tasks, Msgs, NoMonitor, 2>::new(
            RobotClock::default(),
            &config,
            tasks_instanciator,
            monitor_instanciator,
            FakeWriter {},
        )
        .unwrap();

        // Now emulates the generated runtime
        {
            let copperlists = &mut runtime.copper_lists_manager;
            let culist0 = copperlists
                .create()
                .expect("Ran out of space for copper lists");
            // FIXME: error handling.
            let id = culist0.id;
            assert_eq!(id, 0);
            culist0.change_state(CopperListState::Processing);
            assert_eq!(runtime.available_copper_lists(), 1);
        }

        {
            let copperlists = &mut runtime.copper_lists_manager;
            let culist1 = copperlists
                .create()
                .expect("Ran out of space for copper lists"); // FIXME: error handling.
            let id = culist1.id;
            assert_eq!(id, 1);
            culist1.change_state(CopperListState::Processing);
            assert_eq!(runtime.available_copper_lists(), 0);
        }

        {
            // The manager is full: a third create must fail.
            let copperlists = &mut runtime.copper_lists_manager;
            let culist2 = copperlists.create();
            assert!(culist2.is_none());
            assert_eq!(runtime.available_copper_lists(), 0);
        }

        // Free in order, should let the top of the stack be serialized and freed.
        runtime.end_of_processing(1);
        assert_eq!(runtime.available_copper_lists(), 1);

        // Readd a CL
        {
            let copperlists = &mut runtime.copper_lists_manager;
            let culist2 = copperlists
                .create()
                .expect("Ran out of space for copper lists"); // FIXME: error handling.
            let id = culist2.id;
            assert_eq!(id, 2);
            culist2.change_state(CopperListState::Processing);
            assert_eq!(runtime.available_copper_lists(), 0);
        }

        // Free out of order, the #0 first
        runtime.end_of_processing(0);
        // Should not free up the top of the stack
        assert_eq!(runtime.available_copper_lists(), 0);

        // Free up the top of the stack
        runtime.end_of_processing(2);
        // This should free up 2 CLs

        assert_eq!(runtime.available_copper_lists(), 2);
    }
588
    #[test]
    fn test_runtime_task_input_order() {
        let mut config = CuConfig::default();
        let src1_id = config.add_node(Node::new("a", "Source1"));
        let src2_id = config.add_node(Node::new("b", "Source2"));
        let sink_id = config.add_node(Node::new("c", "Sink"));

        assert_eq!(src1_id, 0);
        assert_eq!(src2_id, 1);

        // note that the source2 connection is before the source1
        let src1_type = "src1_type";
        let src2_type = "src2_type";
        config.connect(src2_id, sink_id, src2_type);
        config.connect(src1_id, sink_id, src1_type);

        let src1_edge_id = *config.get_src_edges(src1_id).first().unwrap();
        let src2_edge_id = *config.get_src_edges(src2_id).first().unwrap();
        // the edge id depends on the order the connection is created, not
        // on the node id, and that is what determines the input order
        assert_eq!(src1_edge_id, 1);
        assert_eq!(src2_edge_id, 0);

        let runtime = compute_runtime_plan(&config).unwrap();
        // Locate the sink's step in the computed plan.
        let sink_step = runtime
            .steps
            .iter()
            .find_map(|step| match step {
                CuExecutionUnit::Step(step) if step.node_id == sink_id => Some(step),
                _ => None,
            })
            .unwrap();

        // since the src2 connection was added before src1 connection, the src2 type should be
        // first
        assert_eq!(sink_step.input_msg_indices_types[0].1, src2_type);
        assert_eq!(sink_step.input_msg_indices_types[1].1, src1_type);
    }
627}