// cu29_runtime/curuntime.rs

//! CuRuntime is the heart of what Copper runs on the robot.
//! It is exposed to the user via the `copper_runtime` macro, which injects it as a field in the application struct.

use crate::config::{Cnx, CuConfig, NodeId};
use crate::config::{ComponentConfig, Node};
use crate::copperlist::{CopperList, CopperListState, CuListsManager};
use crate::monitoring::CuMonitor;
use cu29_clock::{ClockProvider, RobotClock};
use cu29_log_runtime::LoggerRuntime;
use cu29_traits::CopperListTuple;
use cu29_traits::CuResult;
use cu29_traits::WriteStream;
use cu29_unifiedlog::UnifiedLoggerWrite;
use std::sync::{Arc, Mutex};

use petgraph::prelude::*;
use std::fmt::Debug;

/// Just a simple struct to hold the various bits needed to run a Copper application.
pub struct CopperContext {
    pub unified_logger: Arc<Mutex<UnifiedLoggerWrite>>,
    pub logger_runtime: LoggerRuntime,
    pub clock: RobotClock,
}

/// This is the main structure that will be injected as a member of the application struct.
/// CT is the tuple of all the tasks in order of execution.
/// P is the copper list payload type, representing the input/output messages for all the tasks.
pub struct CuRuntime<CT, P: CopperListTuple, M: CuMonitor, const NBCL: usize> {
    /// The tuple of all the tasks in order of execution.
    pub tasks: CT,

    pub monitor: M,

    /// Copper lists hold, in order, all the input/output messages for all the tasks.
    pub copper_lists_manager: CuListsManager<P, NBCL>,

    /// The base clock the runtime will be using to record time.
    pub clock: RobotClock, // TODO: remove public at some point

    /// Logger
    logger: Option<Box<dyn WriteStream<CopperList<P>>>>,
}
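
// For illustration only: a hedged sketch of how the `copper_runtime` macro might
// tie the generic parameters together (the task and payload types below are
// hypothetical, not the macro's actual output):
//
//     type Tasks = (MyDriver, MyFilter, MyActuator);      // CT: tasks in execution order
//     type Payloads = (SensorMsg, FilteredMsg);           // P: the copper list payload
//     type Runtime = CuRuntime<Tasks, Payloads, NoMonitor, 4>; // NBCL = 4 in-flight copper lists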

/// To be able to share the clock, we make the runtime a clock provider.
impl<CT, P: CopperListTuple, M: CuMonitor, const NBCL: usize> ClockProvider
    for CuRuntime<CT, P, M, NBCL>
{
    fn get_clock(&self) -> RobotClock {
        self.clock.clone()
    }
}
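
// Cloning the clock hands out a handle to the same underlying time source, so
// tasks and the runtime observe a consistent timeline. A minimal usage sketch
// (assuming a `runtime` value like the one built in the tests below):
//
//     let clock: RobotClock = runtime.get_clock();
//     let now = clock.now();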

impl<CT, P: CopperListTuple + 'static, M: CuMonitor, const NBCL: usize> CuRuntime<CT, P, M, NBCL> {
    pub fn new(
        clock: RobotClock,
        config: &CuConfig,
        tasks_instanciator: impl Fn(Vec<Option<&ComponentConfig>>) -> CuResult<CT>,
        monitor_instanciator: impl Fn(&CuConfig) -> M,
        logger: impl WriteStream<CopperList<P>> + 'static,
    ) -> CuResult<Self> {
        let all_instances_configs: Vec<Option<&ComponentConfig>> = config
            .get_all_nodes()
            .iter()
            .map(|(_, node)| node.get_instance_config())
            .collect();
        let tasks = tasks_instanciator(all_instances_configs)?;

        let monitor = monitor_instanciator(config);

        // Needed to declare the type explicitly as `cargo check` was failing without it.
        let logger_: Option<Box<dyn WriteStream<CopperList<P>>>> =
            if let Some(logging_config) = &config.logging {
                if logging_config.enable_task_logging {
                    Some(Box::new(logger))
                } else {
                    None
                }
            } else {
                Some(Box::new(logger))
            };

        let runtime = Self {
            tasks,
            monitor,
            copper_lists_manager: CuListsManager::new(), // placeholder
            clock,
            logger: logger_,
        };

        Ok(runtime)
    }

    pub fn available_copper_lists(&self) -> usize {
        NBCL - self.copper_lists_manager.len()
    }

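    /// Marks the copper list with id `culistid` as `DoneProcessing`, then serializes
    /// (if task logging is enabled) and frees the contiguous run of `DoneProcessing`
    /// lists at the top of the circular buffer. Lists finished out of order stay
    /// allocated until every list above them is done; see the lifecycle test below.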
    pub fn end_of_processing(&mut self, culistid: u32) {
        let mut is_top = true;
        let mut nb_done = 0;
        self.copper_lists_manager.iter_mut().for_each(|cl| {
            if cl.id == culistid && cl.get_state() == CopperListState::Processing {
                cl.change_state(CopperListState::DoneProcessing);
            }
            // If we have a series of copper lists that are done processing at the top of
            // the circular buffer, serialize them all and free them.
            if is_top && cl.get_state() == CopperListState::DoneProcessing {
                if let Some(logger) = &mut self.logger {
                    cl.change_state(CopperListState::BeingSerialized);
                    logger.log(cl).unwrap();
                }
                cl.change_state(CopperListState::Free);
                nb_done += 1;
            } else {
                is_top = false;
            }
        });
        for _ in 0..nb_done {
            let _ = self.copper_lists_manager.pop();
        }
    }
}

/// Copper tasks can be of three types:
/// - Source: only produces output messages (usually used for drivers).
/// - Regular: processes input messages and produces output messages; these are the compute nodes.
/// - Sink: only consumes input messages (usually used for actuators).
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum CuTaskType {
    Source,
    Regular,
    Sink,
}
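
// For example (an illustrative pipeline, not part of this crate): a camera
// driver would be a Source, an image filter a Regular task, and a motor
// controller a Sink.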

/// This structure represents a step in the execution plan.
pub struct CuExecutionStep {
    /// Node id of the task to execute.
    pub node_id: NodeId,
    /// The node instance.
    pub node: Node,
    /// The type of the task (Source, Regular or Sink).
    pub task_type: CuTaskType,

    /// The indices in the copper list of the input messages and their types.
    pub input_msg_indices_types: Vec<(u32, String)>,

    /// The index in the copper list of the output message and its type.
    pub output_msg_index_type: Option<(u32, String)>,
}
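
// A worked example of the indices (hypothetical two-task plan): for a pipeline
// `src -> sink`, the planner assigns copper list slot 0 to the source's output,
// so the source step gets output_msg_index_type == Some((0, "SensorMsg")) and
// the sink step gets input_msg_indices_types == vec![(0, "SensorMsg")], plus an
// artificial "()" output slot at index 1 to record the sink's metadata.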

impl Debug for CuExecutionStep {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(format!("   CuExecutionStep: Node Id: {}\n", self.node_id).as_str())?;
        f.write_str(format!("                  node_type: {:?}\n", self.node.get_type()).as_str())?;
        f.write_str(format!("                  task_type: {:?}\n", self.task_type).as_str())?;
        f.write_str(
            format!(
                "            input_msg_types: {:?}\n",
                self.input_msg_indices_types
            )
            .as_str(),
        )?;
        f.write_str(
            format!("            output_msg_type: {:?}\n", self.output_msg_index_type).as_str(),
        )?;
        Ok(())
    }
}

/// This structure represents a loop in the execution plan.
/// It is used to represent a sequence of execution units (steps or nested loops)
/// that are executed multiple times.
/// If `loop_count` is None, the loop is infinite.
pub struct CuExecutionLoop {
    pub steps: Vec<CuExecutionUnit>,
    pub loop_count: Option<u32>,
}

impl Debug for CuExecutionLoop {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("CuExecutionLoop:\n")?;
        for step in &self.steps {
            match step {
                CuExecutionUnit::Step(step) => {
                    step.fmt(f)?;
                }
                CuExecutionUnit::Loop(l) => {
                    l.fmt(f)?;
                }
            }
        }

        f.write_str(format!("   count: {:?}", self.loop_count).as_str())?;
        Ok(())
    }
}

/// A unit of execution in the plan: either a single step or a nested loop.
#[derive(Debug)]
pub enum CuExecutionUnit {
    Step(CuExecutionStep),
    Loop(CuExecutionLoop),
}

fn find_output_index_type_from_nodeid(
    node_id: NodeId,
    steps: &[CuExecutionUnit],
) -> Option<(u32, String)> {
    for step in steps {
        match step {
            CuExecutionUnit::Loop(loop_unit) => {
                if let Some(index) = find_output_index_type_from_nodeid(node_id, &loop_unit.steps) {
                    return Some(index);
                }
            }
            CuExecutionUnit::Step(step) => {
                if step.node_id == node_id {
                    return step.output_msg_index_type.clone();
                }
            }
        }
    }
    None
}

pub fn find_task_type_for_id(
    graph: &StableDiGraph<Node, Cnx, NodeId>,
    node_id: NodeId,
) -> CuTaskType {
    if graph.neighbors_directed(node_id.into(), Incoming).count() == 0 {
        CuTaskType::Source
    } else if graph.neighbors_directed(node_id.into(), Outgoing).count() == 0 {
        CuTaskType::Sink
    } else {
        CuTaskType::Regular
    }
}
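
// The classification is purely degree-based: no incoming edges makes a task a
// Source, no outgoing edges makes it a Sink, anything else is Regular. For
// example, in a hypothetical graph a -> b -> c:
//
//     assert_eq!(find_task_type_for_id(&config.graph, a_id), CuTaskType::Source);
//     assert_eq!(find_task_type_for_id(&config.graph, b_id), CuTaskType::Regular);
//     assert_eq!(find_task_type_for_id(&config.graph, c_id), CuTaskType::Sink);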

/// Explores a sub-branch and builds the partial plan out of it.
fn plan_tasks_tree_branch(
    config: &CuConfig,
    mut next_culist_output_index: u32,
    starting_point: NodeId,
    plan: &mut Vec<CuExecutionUnit>,
) -> u32 {
    // Probably not exactly what we want, but enough to get us started.
    let mut visitor = Bfs::new(&config.graph, starting_point.into());

    while let Some(node) = visitor.next(&config.graph) {
        let id = node.index() as NodeId;
        let node = config.get_node(id).unwrap();

        let mut input_msg_indices_types: Vec<(u32, String)> = Vec::new();
        let output_msg_index_type: Option<(u32, String)>;

        let task_type = find_task_type_for_id(&config.graph, id);

        match task_type {
            CuTaskType::Source => {
                output_msg_index_type = Some((
                    next_culist_output_index,
                    config
                        .graph
                        .edge_weight(EdgeIndex::new(config.get_src_edges(id)[0]))
                        .unwrap()
                        .msg
                        .clone(),
                ));
                next_culist_output_index += 1;
            }
            CuTaskType::Sink => {
                let parents: Vec<NodeIndex> = config
                    .graph
                    .neighbors_directed(id.into(), Incoming)
                    .collect();

                for parent in parents {
                    let index_type =
                        find_output_index_type_from_nodeid(parent.index() as NodeId, plan);
                    if let Some(index_type) = index_type {
                        input_msg_indices_types.push(index_type);
                    } else {
                        // Do not add this node yet; wait until all of its inputs
                        // appear earlier in the copper list.
                        return next_culist_output_index;
                    }
                }
                // Here we create an artificial "end node" for this sink to record the metadata associated with it.
                output_msg_index_type = Some((
                    next_culist_output_index,
                    "()".to_string(), // empty type
                ));
                next_culist_output_index += 1;
            }
            CuTaskType::Regular => {
                let parents: Vec<NodeIndex> = config
                    .graph
                    .neighbors_directed(id.into(), Incoming)
                    .collect();

                for parent in parents {
                    let index_type =
                        find_output_index_type_from_nodeid(parent.index() as NodeId, plan);
                    if let Some(index_type) = index_type {
                        input_msg_indices_types.push(index_type);
                    } else {
                        // Do not add this node yet; wait until all of its inputs
                        // appear earlier in the copper list.
                        return next_culist_output_index;
                    }
                }
                output_msg_index_type = Some((
                    next_culist_output_index,
                    config
                        .graph
                        .edge_weight(EdgeIndex::new(config.get_src_edges(id)[0]))
                        .unwrap()
                        .msg
                        .clone(),
                ));
                next_culist_output_index += 1;
            }
        }

        // Sort the input messages by index.
        // This means that the tuple presented as input to a merging task
        // depends on the order of *declaration* in the node section of the config file.
        input_msg_indices_types.sort_by(|a, b| a.0.cmp(&b.0));
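
        // For example (hypothetical): if nodes `b` and `c` both feed task `d`,
        // and `b`'s output landed at copper list index 1 while `c`'s landed at
        // index 2, then `d` sees its inputs as the tuple (b_msg, c_msg),
        // regardless of the order in which this BFS discovered the parents.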

        // Check whether we already added this node to the plan.
        if let Some(pos) = plan.iter().position(|step| {
            if let CuExecutionUnit::Step(ref s) = step {
                s.node_id == id
            } else {
                false
            }
        }) {
            // Modify the existing step and push it back at the end of the plan,
            // as it needs an output produced later in the plan.
            let mut step = plan.remove(pos);
            if let CuExecutionUnit::Step(ref mut s) = step {
                s.input_msg_indices_types = input_msg_indices_types;
            }
            plan.push(step);
        } else {
            // This is just a new step.
            let step = CuExecutionStep {
                node_id: id,
                node: node.clone(),
                task_type,
                input_msg_indices_types,
                output_msg_index_type,
            };
            plan.push(CuExecutionUnit::Step(step));
        }
    }
    next_culist_output_index
}

/// This is the main heuristic used to compute an execution plan at compilation time.
/// TODO: Make this heuristic pluggable.
pub fn compute_runtime_plan(config: &CuConfig) -> CuResult<CuExecutionLoop> {
    // Find all the sources.
    let nodes_to_visit = config
        .graph
        .node_indices()
        .filter(|node_id| {
            let id = node_id.index() as NodeId;
            let task_type = find_task_type_for_id(&config.graph, id);
            task_type == CuTaskType::Source
        })
        .collect::<Vec<NodeIndex>>();

    let mut next_culist_output_index = 0u32;
    let mut plan: Vec<CuExecutionUnit> = Vec::new();

    for node_index in &nodes_to_visit {
        next_culist_output_index = plan_tasks_tree_branch(
            config,
            next_culist_output_index,
            node_index.index() as NodeId,
            &mut plan,
        );
    }

    Ok(CuExecutionLoop {
        steps: plan,
        loop_count: None, // outside of unit tests, the main loop is infinite
    })
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::Node;
    use crate::cutask::CuSinkTask;
    use crate::cutask::{CuSrcTask, Freezable};
    use crate::monitoring::NoMonitor;
    use bincode::Encode;

    pub struct TestSource {}

    impl Freezable for TestSource {}

    impl CuSrcTask<'_> for TestSource {
        type Output = ();
        fn new(_config: Option<&ComponentConfig>) -> CuResult<Self>
        where
            Self: Sized,
        {
            Ok(Self {})
        }

        fn process(&mut self, _clock: &RobotClock, _empty_msg: Self::Output) -> CuResult<()> {
            Ok(())
        }
    }

    pub struct TestSink {}

    impl Freezable for TestSink {}

    impl CuSinkTask<'_> for TestSink {
        type Input = ();

        fn new(_config: Option<&ComponentConfig>) -> CuResult<Self>
        where
            Self: Sized,
        {
            Ok(Self {})
        }

        fn process(&mut self, _clock: &RobotClock, _input: Self::Input) -> CuResult<()> {
            Ok(())
        }
    }

    // These should be generated by the derive macro.
    type Tasks = (TestSource, TestSink);
    type Msgs = ((),);

    fn tasks_instanciator(all_instances_configs: Vec<Option<&ComponentConfig>>) -> CuResult<Tasks> {
        Ok((
            TestSource::new(all_instances_configs[0])?,
            TestSink::new(all_instances_configs[1])?,
        ))
    }

    fn monitor_instanciator(_config: &CuConfig) -> NoMonitor {
        NoMonitor {}
    }

    #[derive(Debug)]
    struct FakeWriter {}

    impl<E: Encode> WriteStream<E> for FakeWriter {
        fn log(&mut self, _obj: &E) -> CuResult<()> {
            Ok(())
        }
    }

    #[test]
    fn test_runtime_instantiation() {
        let mut config = CuConfig::default();
        config.add_node(Node::new("a", "TestSource"));
        config.add_node(Node::new("b", "TestSink"));
        config.connect(0, 1, "()");
        let runtime = CuRuntime::<Tasks, Msgs, NoMonitor, 2>::new(
            RobotClock::default(),
            &config,
            tasks_instanciator,
            monitor_instanciator,
            FakeWriter {},
        );
        assert!(runtime.is_ok());
    }
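
    // A sanity-check sketch (not part of the original test suite): exercises
    // compute_runtime_plan on the same two-node config used above and checks
    // the copper list slot assignment described in plan_tasks_tree_branch.
    #[test]
    fn test_compute_runtime_plan_simple_pipeline() {
        let mut config = CuConfig::default();
        config.add_node(Node::new("a", "TestSource"));
        config.add_node(Node::new("b", "TestSink"));
        config.connect(0, 1, "()");
        let plan = compute_runtime_plan(&config).expect("could not compute a plan");
        assert_eq!(plan.steps.len(), 2);
        // The source comes first and writes to copper list slot 0.
        if let CuExecutionUnit::Step(step) = &plan.steps[0] {
            assert_eq!(step.task_type, CuTaskType::Source);
            assert_eq!(step.output_msg_index_type, Some((0, "()".to_string())));
        } else {
            panic!("expected a step, not a loop");
        }
        // The sink reads slot 0 and gets an artificial "()" output slot for its metadata.
        if let CuExecutionUnit::Step(step) = &plan.steps[1] {
            assert_eq!(step.task_type, CuTaskType::Sink);
            assert_eq!(step.input_msg_indices_types, vec![(0, "()".to_string())]);
            assert_eq!(step.output_msg_index_type, Some((1, "()".to_string())));
        } else {
            panic!("expected a step, not a loop");
        }
    }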

    #[test]
    fn test_copperlists_manager_lifecycle() {
        let mut config = CuConfig::default();
        config.add_node(Node::new("a", "TestSource"));
        config.add_node(Node::new("b", "TestSink"));
        config.connect(0, 1, "()");
        let mut runtime = CuRuntime::<Tasks, Msgs, NoMonitor, 2>::new(
            RobotClock::default(),
            &config,
            tasks_instanciator,
            monitor_instanciator,
            FakeWriter {},
        )
        .unwrap();

        // Now emulate the generated runtime.
        {
            let copperlists = &mut runtime.copper_lists_manager;
            let culist0 = copperlists
                .create()
                .expect("Ran out of space for copper lists"); // FIXME: error handling.
            let id = culist0.id;
            assert_eq!(id, 0);
            culist0.change_state(CopperListState::Processing);
            assert_eq!(runtime.available_copper_lists(), 1);
        }

        {
            let copperlists = &mut runtime.copper_lists_manager;
            let culist1 = copperlists
                .create()
                .expect("Ran out of space for copper lists"); // FIXME: error handling.
            let id = culist1.id;
            assert_eq!(id, 1);
            culist1.change_state(CopperListState::Processing);
            assert_eq!(runtime.available_copper_lists(), 0);
        }

        {
            let copperlists = &mut runtime.copper_lists_manager;
            let culist2 = copperlists.create();
            assert!(culist2.is_none());
            assert_eq!(runtime.available_copper_lists(), 0);
        }

        // Free in order; this should let the top of the stack be serialized and freed.
        runtime.end_of_processing(1);
        assert_eq!(runtime.available_copper_lists(), 1);

        // Re-add a copper list.
        {
            let copperlists = &mut runtime.copper_lists_manager;
            let culist2 = copperlists
                .create()
                .expect("Ran out of space for copper lists"); // FIXME: error handling.
            let id = culist2.id;
            assert_eq!(id, 2);
            culist2.change_state(CopperListState::Processing);
            assert_eq!(runtime.available_copper_lists(), 0);
        }

        // Free out of order, the #0 first.
        runtime.end_of_processing(0);
        // This should not free up the top of the stack.
        assert_eq!(runtime.available_copper_lists(), 0);

        // Free up the top of the stack.
        runtime.end_of_processing(2);
        // This should free up 2 CLs.
        assert_eq!(runtime.available_copper_lists(), 2);
    }
}