rart_rs/common/
executor.rs

1use core::task::Context;
2use core::task::Poll;
3use crate::common::arc::Arc;
4use crate::common::blocking_channel::BlockingChannel;
5use crate::common::task::TaskId;
6use crate::common::waker::{rart_waker_into_waker, RARTWaker};
7use crate::common::task::Task;
8use heapless::Vec;
9use crate::common::result::RARTError;
10use crate::safe_log;
11#[cfg(not(feature = "std"))]
12use crate::no_std::log_fn;
13#[cfg(not(feature = "std"))]
14use const_format::formatcp;
15
16pub struct Executor<const N: usize> {
17    tasks: [&'static Task; N],
18    active_tasks: BlockingChannel<TaskId>,
19}
20
21impl<const N: usize> Executor<N> {
22    pub fn new(tasks: Vec<&'static Task, N>) -> Self {
23        if let Ok(tasks) = tasks.as_slice().try_into() {
24            Executor {
25                tasks,
26                active_tasks: BlockingChannel::new(),
27            }
28        } else {
29            panic!("Mismatch the number of tasks")
30        }
31    }
32
33    fn task_by_id(&self, id: TaskId) -> Option<&'static Task> {
34        if let Some(task) = self.tasks.iter().find(|&&x| x.id() == id) {
35            Some(*task)
36        } else {
37            None
38        }
39    }
40
41    pub fn run(&self) -> Result<(), RARTError> {
42        {
43            let guard = self.active_tasks.new_sender();
44            let temp_sender = guard.lock()?;
45            for task in self.tasks.iter() {
46                temp_sender.send(task.id())?;
47            }
48        }
49
50        let guard = self.active_tasks.new_receiver();
51        let receiver = guard.lock()?;
52        let mut end_task = 0;
53
54        'main_loop: loop {
55            if end_task == self.tasks.len() {
56                break 'main_loop;
57            }
58
59            let task_id = receiver.recv()?;
60
61            if let Some(task) = self.task_by_id(task_id) {
62
63                // TODO Explain why this is safe
64                unsafe {
65                    let future_slot = &mut *task.future().get();
66
67                    if let Some(mut future) = future_slot.take() {
68                        let rart_waker = Arc::new(
69                            RARTWaker::new(
70                                task.id(),
71                                self.active_tasks.new_sender(),
72                            )
73                        );
74                        let waker = rart_waker_into_waker(Arc::into_raw(rart_waker));
75                        let context = &mut Context::from_waker(&waker);
76
77                        match future.as_mut().poll(context) {
78                            Poll::Pending => *future_slot = Some(future),
79                            Poll::Ready(res) => {
80                                end_task += 1;
81                                match res {
82                                    Ok(_) => safe_log!("Task end with success"),
83                                    Err(_) => safe_log!("Task end with error"),
84                                }
85                            }
86                        }
87                    }
88                }
89            }
90        }
91
92        Ok(())
93    }
94}