1use crate::config::{Cnx, CuConfig, NodeId};
6use crate::config::{ComponentConfig, Node};
7use crate::copperlist::{CopperList, CopperListState, CuListsManager};
8use crate::monitoring::CuMonitor;
9use cu29_clock::{ClockProvider, RobotClock};
10use cu29_log_runtime::LoggerRuntime;
11use cu29_traits::CopperListTuple;
12use cu29_traits::CuResult;
13use cu29_traits::WriteStream;
14use cu29_unifiedlog::UnifiedLoggerWrite;
15use std::sync::{Arc, Mutex};
16
17use petgraph::prelude::*;
18use std::fmt::Debug;
19
20pub struct CopperContext {
22 pub unified_logger: Arc<Mutex<UnifiedLoggerWrite>>,
23 pub logger_runtime: LoggerRuntime,
24 pub clock: RobotClock,
25}
26
27pub struct CuRuntime<CT, P: CopperListTuple, M: CuMonitor, const NBCL: usize> {
31 pub tasks: CT,
33
34 pub monitor: M,
35
36 pub copper_lists_manager: CuListsManager<P, NBCL>,
38
39 pub clock: RobotClock, logger: Option<Box<dyn WriteStream<CopperList<P>>>>,
44}
45
46impl<CT, P: CopperListTuple, M: CuMonitor, const NBCL: usize> ClockProvider
48 for CuRuntime<CT, P, M, NBCL>
49{
50 fn get_clock(&self) -> RobotClock {
51 self.clock.clone()
52 }
53}
54
55impl<CT, P: CopperListTuple + 'static, M: CuMonitor, const NBCL: usize> CuRuntime<CT, P, M, NBCL> {
56 pub fn new(
57 clock: RobotClock,
58 config: &CuConfig,
59 tasks_instanciator: impl Fn(Vec<Option<&ComponentConfig>>) -> CuResult<CT>,
60 monitor_instanciator: impl Fn(&CuConfig) -> M,
61 logger: impl WriteStream<CopperList<P>> + 'static,
62 ) -> CuResult<Self> {
63 let all_instances_configs: Vec<Option<&ComponentConfig>> = config
64 .get_all_nodes()
65 .iter()
66 .map(|(_, node)| node.get_instance_config())
67 .collect();
68 let tasks = tasks_instanciator(all_instances_configs)?;
69
70 let monitor = monitor_instanciator(config);
71
72 let logger_: Option<Box<dyn WriteStream<CopperList<P>>>> =
74 if let Some(logging_config) = &config.logging {
75 if logging_config.enable_task_logging {
76 Some(Box::new(logger))
77 } else {
78 None
79 }
80 } else {
81 Some(Box::new(logger))
82 };
83
84 let runtime = Self {
85 tasks,
86 monitor,
87 copper_lists_manager: CuListsManager::new(), clock,
89 logger: logger_,
90 };
91
92 Ok(runtime)
93 }
94
95 pub fn available_copper_lists(&self) -> usize {
96 NBCL - self.copper_lists_manager.len()
97 }
98
99 pub fn end_of_processing(&mut self, culistid: u32) {
100 let mut is_top = true;
101 let mut nb_done = 0;
102 self.copper_lists_manager.iter_mut().for_each(|cl| {
103 if cl.id == culistid && cl.get_state() == CopperListState::Processing {
104 cl.change_state(CopperListState::DoneProcessing);
105 }
106 if is_top && cl.get_state() == CopperListState::DoneProcessing {
109 if let Some(logger) = &mut self.logger {
110 cl.change_state(CopperListState::BeingSerialized);
111 logger.log(cl).unwrap();
112 }
113 cl.change_state(CopperListState::Free);
114 nb_done += 1;
115 } else {
116 is_top = false;
117 }
118 });
119 for _ in 0..nb_done {
120 let _ = self.copper_lists_manager.pop();
121 }
122 }
123}
124
125#[derive(Debug, PartialEq, Eq, Clone, Copy)]
130pub enum CuTaskType {
131 Source,
132 Regular,
133 Sink,
134}
135
136pub struct CuExecutionStep {
138 pub node_id: NodeId,
140 pub node: Node,
142 pub task_type: CuTaskType,
144
145 pub input_msg_indices_types: Vec<(u32, String)>,
147
148 pub output_msg_index_type: Option<(u32, String)>,
150}
151
152impl Debug for CuExecutionStep {
153 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
154 f.write_str(format!(" CuExecutionStep: Node Id: {}\n", self.node_id).as_str())?;
155 f.write_str(format!(" task_type: {:?}\n", self.node.get_type()).as_str())?;
156 f.write_str(format!(" task: {:?}\n", self.task_type).as_str())?;
157 f.write_str(
158 format!(
159 " input_msg_types: {:?}\n",
160 self.input_msg_indices_types
161 )
162 .as_str(),
163 )?;
164 f.write_str(
165 format!(" output_msg_type: {:?}\n", self.output_msg_index_type).as_str(),
166 )?;
167 Ok(())
168 }
169}
170
171pub struct CuExecutionLoop {
176 pub steps: Vec<CuExecutionUnit>,
177 pub loop_count: Option<u32>,
178}
179
180impl Debug for CuExecutionLoop {
181 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
182 f.write_str("CuExecutionLoop:\n")?;
183 for step in &self.steps {
184 match step {
185 CuExecutionUnit::Step(step) => {
186 step.fmt(f)?;
187 }
188 CuExecutionUnit::Loop(l) => {
189 l.fmt(f)?;
190 }
191 }
192 }
193
194 f.write_str(format!(" count: {:?}", self.loop_count).as_str())?;
195 Ok(())
196 }
197}
198
199#[derive(Debug)]
201pub enum CuExecutionUnit {
202 Step(CuExecutionStep),
203 Loop(CuExecutionLoop),
204}
205
206fn find_output_index_type_from_nodeid(
207 node_id: NodeId,
208 steps: &Vec<CuExecutionUnit>,
209) -> Option<(u32, String)> {
210 for step in steps {
211 match step {
212 CuExecutionUnit::Loop(loop_unit) => {
213 if let Some(index) = find_output_index_type_from_nodeid(node_id, &loop_unit.steps) {
214 return Some(index);
215 }
216 }
217 CuExecutionUnit::Step(step) => {
218 if step.node_id == node_id {
219 return step.output_msg_index_type.clone();
220 }
221 }
222 }
223 }
224 None
225}
226
227pub fn find_task_type_for_id(
228 graph: &StableDiGraph<Node, Cnx, NodeId>,
229 node_id: NodeId,
230) -> CuTaskType {
231 if graph.neighbors_directed(node_id.into(), Incoming).count() == 0 {
232 CuTaskType::Source
233 } else if graph.neighbors_directed(node_id.into(), Outgoing).count() == 0 {
234 CuTaskType::Sink
235 } else {
236 CuTaskType::Regular
237 }
238}
239
240fn plan_tasks_tree_branch(
242 config: &CuConfig,
243 mut next_culist_output_index: u32,
244 starting_point: NodeId,
245 plan: &mut Vec<CuExecutionUnit>,
246) -> u32 {
247 let mut visitor = Bfs::new(&config.graph, starting_point.into());
249
250 while let Some(node) = visitor.next(&config.graph) {
251 let id = node.index() as NodeId;
252 let node = config.get_node(id).unwrap();
253
254 let mut input_msg_indices_types: Vec<(u32, String)> = Vec::new();
255 let output_msg_index_type: Option<(u32, String)>;
256
257 let task_type = find_task_type_for_id(&config.graph, id);
258
259 match task_type {
260 CuTaskType::Source => {
261 output_msg_index_type = Some((
262 next_culist_output_index,
263 config
264 .graph
265 .edge_weight(EdgeIndex::new(config.get_src_edges(id)[0]))
266 .unwrap()
267 .msg
268 .clone(),
269 ));
270 next_culist_output_index += 1;
271 }
272 CuTaskType::Sink => {
273 let parents: Vec<NodeIndex> = config
274 .graph
275 .neighbors_directed(id.into(), Incoming)
276 .collect();
277
278 for parent in parents {
279 let index_type =
280 find_output_index_type_from_nodeid(parent.index() as NodeId, plan);
281 if let Some(index_type) = index_type {
282 input_msg_indices_types.push(index_type);
283 } else {
284 return next_culist_output_index;
286 }
287 }
288 output_msg_index_type = Some((
290 next_culist_output_index,
291 "()".to_string(), ));
293 next_culist_output_index += 1;
294 }
295 CuTaskType::Regular => {
296 let parents: Vec<NodeIndex> = config
297 .graph
298 .neighbors_directed(id.into(), Incoming)
299 .collect();
300
301 for parent in parents {
302 let index_type =
303 find_output_index_type_from_nodeid(parent.index() as NodeId, plan);
304 if let Some(index_type) = index_type {
305 input_msg_indices_types.push(index_type);
306 } else {
307 return next_culist_output_index;
309 }
310 }
311 output_msg_index_type = Some((
312 next_culist_output_index,
313 config
314 .graph
315 .edge_weight(EdgeIndex::new(config.get_src_edges(id)[0]))
316 .unwrap()
317 .msg
318 .clone(),
319 ));
320 next_culist_output_index += 1;
321 }
322 }
323
324 input_msg_indices_types.sort_by(|a, b| a.0.cmp(&b.0));
328
329 if let Some(pos) = plan.iter().position(|step| {
331 if let CuExecutionUnit::Step(ref s) = step {
332 s.node_id == id
333 } else {
334 false
335 }
336 }) {
337 let mut step = plan.remove(pos);
339 if let CuExecutionUnit::Step(ref mut s) = step {
340 s.input_msg_indices_types = input_msg_indices_types;
341 }
342 plan.push(step);
343 } else {
344 let step = CuExecutionStep {
346 node_id: id,
347 node: node.clone(),
348 task_type,
349 input_msg_indices_types,
350 output_msg_index_type,
351 };
352 plan.push(CuExecutionUnit::Step(step));
353 }
354 }
355 next_culist_output_index
356}
357
358pub fn compute_runtime_plan(config: &CuConfig) -> CuResult<CuExecutionLoop> {
361 let nodes_to_visit = config
363 .graph
364 .node_indices()
365 .filter(|node_id| {
366 let id = node_id.index() as NodeId;
367 let task_type = find_task_type_for_id(&config.graph, id);
368 task_type == CuTaskType::Source
369 })
370 .collect::<Vec<NodeIndex>>();
371
372 let mut next_culist_output_index = 0u32;
373 let mut plan: Vec<CuExecutionUnit> = Vec::new();
374
375 for node_index in &nodes_to_visit {
376 next_culist_output_index = plan_tasks_tree_branch(
377 config,
378 next_culist_output_index,
379 node_index.index() as NodeId,
380 &mut plan,
381 );
382 }
383
384 Ok(CuExecutionLoop {
385 steps: plan,
386 loop_count: None, })
388}
389
390#[cfg(test)]
392mod tests {
393 use super::*;
394 use crate::config::Node;
395 use crate::cutask::CuSinkTask;
396 use crate::cutask::{CuSrcTask, Freezable};
397 use crate::monitoring::NoMonitor;
398 use bincode::Encode;
399
400 pub struct TestSource {}
401
402 impl Freezable for TestSource {}
403
404 impl CuSrcTask<'_> for TestSource {
405 type Output = ();
406 fn new(_config: Option<&ComponentConfig>) -> CuResult<Self>
407 where
408 Self: Sized,
409 {
410 Ok(Self {})
411 }
412
413 fn process(&mut self, _clock: &RobotClock, _empty_msg: Self::Output) -> CuResult<()> {
414 Ok(())
415 }
416 }
417
418 pub struct TestSink {}
419
420 impl Freezable for TestSink {}
421
422 impl CuSinkTask<'_> for TestSink {
423 type Input = ();
424
425 fn new(_config: Option<&ComponentConfig>) -> CuResult<Self>
426 where
427 Self: Sized,
428 {
429 Ok(Self {})
430 }
431
432 fn process(&mut self, _clock: &RobotClock, _input: Self::Input) -> CuResult<()> {
433 Ok(())
434 }
435 }
436
437 type Tasks = (TestSource, TestSink);
439 type Msgs = ((),);
440
441 fn tasks_instanciator(all_instances_configs: Vec<Option<&ComponentConfig>>) -> CuResult<Tasks> {
442 Ok((
443 TestSource::new(all_instances_configs[0])?,
444 TestSink::new(all_instances_configs[1])?,
445 ))
446 }
447
448 fn monitor_instanciator(_config: &CuConfig) -> NoMonitor {
449 NoMonitor {}
450 }
451
452 #[derive(Debug)]
453 struct FakeWriter {}
454
455 impl<E: Encode> WriteStream<E> for FakeWriter {
456 fn log(&mut self, _obj: &E) -> CuResult<()> {
457 Ok(())
458 }
459 }
460
461 #[test]
462 fn test_runtime_instantiation() {
463 let mut config = CuConfig::default();
464 config.add_node(Node::new("a", "TestSource"));
465 config.add_node(Node::new("b", "TestSink"));
466 config.connect(0, 1, "()");
467 let runtime = CuRuntime::<Tasks, Msgs, NoMonitor, 2>::new(
468 RobotClock::default(),
469 &config,
470 tasks_instanciator,
471 monitor_instanciator,
472 FakeWriter {},
473 );
474 assert!(runtime.is_ok());
475 }
476
477 #[test]
478 fn test_copperlists_manager_lifecycle() {
479 let mut config = CuConfig::default();
480 config.add_node(Node::new("a", "TestSource"));
481 config.add_node(Node::new("b", "TestSink"));
482 config.connect(0, 1, "()");
483 let mut runtime = CuRuntime::<Tasks, Msgs, NoMonitor, 2>::new(
484 RobotClock::default(),
485 &config,
486 tasks_instanciator,
487 monitor_instanciator,
488 FakeWriter {},
489 )
490 .unwrap();
491
492 {
494 let copperlists = &mut runtime.copper_lists_manager;
495 let culist0 = copperlists
496 .create()
497 .expect("Ran out of space for copper lists");
498 let id = culist0.id;
500 assert_eq!(id, 0);
501 culist0.change_state(CopperListState::Processing);
502 assert_eq!(runtime.available_copper_lists(), 1);
503 }
504
505 {
506 let copperlists = &mut runtime.copper_lists_manager;
507 let culist1 = copperlists
508 .create()
509 .expect("Ran out of space for copper lists"); let id = culist1.id;
511 assert_eq!(id, 1);
512 culist1.change_state(CopperListState::Processing);
513 assert_eq!(runtime.available_copper_lists(), 0);
514 }
515
516 {
517 let copperlists = &mut runtime.copper_lists_manager;
518 let culist2 = copperlists.create();
519 assert!(culist2.is_none());
520 assert_eq!(runtime.available_copper_lists(), 0);
521 }
522
523 runtime.end_of_processing(1);
525 assert_eq!(runtime.available_copper_lists(), 1);
526
527 {
529 let copperlists = &mut runtime.copper_lists_manager;
530 let culist2 = copperlists
531 .create()
532 .expect("Ran out of space for copper lists"); let id = culist2.id;
534 assert_eq!(id, 2);
535 culist2.change_state(CopperListState::Processing);
536 assert_eq!(runtime.available_copper_lists(), 0);
537 }
538
539 runtime.end_of_processing(0);
541 assert_eq!(runtime.available_copper_lists(), 0);
543
544 runtime.end_of_processing(2);
546 assert_eq!(runtime.available_copper_lists(), 2);
549 }
550}