roboplc/
supervisor.rs

1use std::collections::{btree_map, BTreeMap};
2use std::{mem, thread};
3
4use serde::Serialize;
5
6use crate::thread_rt::{Builder, ScopedTask, Task};
7use crate::time::Interval;
8use crate::{Error, Result};
9
/// The supervisor prelude
///
/// Re-exports `Supervisor` from the parent module together with `Builder` and `Scheduling` from
/// `crate::thread_rt`.
pub mod prelude {
    pub use super::Supervisor;
    pub use crate::thread_rt::{Builder, Scheduling};
}
15
/// A supervisor object used to manage tasks spawned with [`Builder`]
///
/// Spawned tasks are kept in an internal registry keyed by task name. The type derives
/// `Serialize`, so the registry contents can be serialized.
#[derive(Serialize)]
pub struct Supervisor<T> {
    /// Registered tasks, keyed by task name
    tasks: BTreeMap<String, Task<T>>,
}
21
22impl<T> Default for Supervisor<T> {
23    fn default() -> Self {
24        Self {
25            tasks: <_>::default(),
26        }
27    }
28}
29
/// Resolves the vacant registry entry for the task a builder is about to spawn.
///
/// Must be expanded inside a function returning `Result<_>`. Evaluates to the
/// `btree_map::VacantEntry` for the builder's name in `$self.tasks`, or returns early from the
/// enclosing function with:
///
/// * `Error::SupervisorNameNotSpecified` if the builder has no name;
/// * `Error::SupervisorDuplicateTask` (carrying the name) if the name is already registered.
///
/// The name is cloned out of the builder once and moved into the entry lookup; it is cloned a
/// second time only on the duplicate-name error path.
macro_rules! vacant_entry {
    ($self:ident, $builder:ident) => {{
        let Some(name) = $builder.name.clone() else {
            return Err(Error::SupervisorNameNotSpecified);
        };
        match $self.tasks.entry(name) {
            btree_map::Entry::Vacant(entry) => entry,
            btree_map::Entry::Occupied(entry) => {
                return Err(Error::SupervisorDuplicateTask(entry.key().clone()));
            }
        }
    }};
}
41
42impl<T> Supervisor<T> {
43    /// Creates a new supervisor object
44    pub fn new() -> Self {
45        Self::default()
46    }
47    /// Spawns a new task using a [`Builder`] object and registers it. The task name MUST be unique
48    /// and SHOULD be 15 characters or less to set a proper thread name
49    pub fn spawn<F, B>(&mut self, builder: B, f: F) -> Result<&Task<T>>
50    where
51        B: Into<Builder>,
52        F: FnOnce() -> T + Send + 'static,
53        T: Send + 'static,
54    {
55        let builder = builder.into();
56        let entry = vacant_entry!(self, builder);
57        let task = builder.spawn(f)?;
58        Ok(entry.insert(task))
59    }
60    /// Spawns a new periodic task using a [`Builder`] object and registers it. The task name MUST
61    /// be unique and SHOULD be 15 characters or less to set a proper thread name
62    pub fn spawn_periodic<F, B>(&mut self, builder: B, f: F, interval: Interval) -> Result<&Task<T>>
63    where
64        F: Fn() -> T + Send + 'static,
65        T: Send + 'static,
66        B: Into<Builder>,
67    {
68        let builder = builder.into();
69        let entry = vacant_entry!(self, builder);
70        let task = builder.spawn_periodic(f, interval)?;
71        Ok(entry.insert(task))
72    }
73    /// Gets a task by its name
74    pub fn get_task(&self, name: &str) -> Option<&Task<T>> {
75        self.tasks.get(name)
76    }
77    /// Gets a task by its name as a mutable object
78    pub fn get_task_mut(&mut self, name: &str) -> Option<&mut Task<T>> {
79        self.tasks.get_mut(name)
80    }
81    /// Takes a task by its name and removes it from the internal registry
82    pub fn take_task(&mut self, name: &str) -> Option<Task<T>> {
83        self.tasks.remove(name)
84    }
85    /// Removes a task from the internal registry
86    pub fn forget_task(&mut self, name: &str) -> Result<()> {
87        if self.tasks.remove(name).is_some() {
88            Ok(())
89        } else {
90            Err(Error::SupervisorTaskNotFound)
91        }
92    }
93    /// Removes all finished tasks from the internal registry
94    pub fn purge(&mut self) {
95        self.tasks.retain(|_, task| !task.is_finished());
96    }
97    /// Joins all tasks in the internal registry and returns a map with their results. After the
98    /// operation the registry is cleared
99    pub fn join_all(&mut self) -> BTreeMap<String, thread::Result<T>> {
100        let mut result = BTreeMap::new();
101        for (name, task) in mem::take(&mut self.tasks) {
102            if !task.is_blocking() {
103                result.insert(name, task.join());
104            }
105        }
106        result
107    }
108}
109
/// A scoped supervisor object
///
/// Keeps scoped tasks in an internal registry keyed by task name, together with the
/// [`thread::Scope`] the tasks are spawned on. The scope reference is excluded from
/// serialization.
// NOTE(review): presumably allowed because the type name repeats the module name
// (`supervisor`) — confirm.
#[allow(clippy::module_name_repetitions)]
#[derive(Serialize)]
pub struct ScopedSupervisor<'a, 'env: 'a, T> {
    /// Registered scoped tasks, keyed by task name
    tasks: BTreeMap<String, ScopedTask<'a, T>>,
    /// Scope passed to the builder when spawning tasks; skipped when serializing
    #[serde(skip_serializing)]
    scope: &'a thread::Scope<'a, 'env>,
}
118
119impl<'a, 'env, T> ScopedSupervisor<'a, 'env, T> {
120    /// Creates a new scoped supervisor object
121    pub fn new(scope: &'a thread::Scope<'a, 'env>) -> Self {
122        Self {
123            tasks: <_>::default(),
124            scope,
125        }
126    }
127    /// Spawns a new task using a [`Builder`] object and registers it. The task name MUST be unique
128    /// and SHOULD be 15 characters or less to set a proper thread name
129    pub fn spawn<F, B>(&mut self, builder: B, f: F) -> Result<&ScopedTask<T>>
130    where
131        B: Into<Builder>,
132        F: FnOnce() -> T + Send + 'a,
133        T: Send + 'a,
134    {
135        let builder = builder.into();
136        let entry = vacant_entry!(self, builder);
137        let task = builder.spawn_scoped(self.scope, f)?;
138        Ok(entry.insert(task))
139    }
140    /// Spawns a new periodic task using a [`Builder`] object and registers it. The task name MUST
141    /// be unique and SHOULD be 15 characters or less to set a proper thread name
142    pub fn spawn_periodic<F, B>(
143        &mut self,
144        builder: B,
145        f: F,
146        interval: Interval,
147    ) -> Result<&ScopedTask<T>>
148    where
149        F: Fn() -> T + Send + 'a,
150        T: Send + 'a,
151        B: Into<Builder>,
152    {
153        let builder = builder.into();
154        let entry = vacant_entry!(self, builder);
155        let task = builder.spawn_scoped_periodic(self.scope, f, interval)?;
156        Ok(entry.insert(task))
157    }
158    /// Gets a task by its name
159    pub fn get_task(&self, name: &str) -> Option<&ScopedTask<T>> {
160        self.tasks.get(name)
161    }
162    /// Gets a task by its name as a mutable object
163    pub fn get_task_mut(&mut self, name: &str) -> Option<&mut ScopedTask<'a, T>> {
164        self.tasks.get_mut(name)
165    }
166    /// Takes a task by its name and removes it from the internal registry
167    pub fn take_task(&mut self, name: &str) -> Option<ScopedTask<T>> {
168        self.tasks.remove(name)
169    }
170    /// Removes a task from the internal registry
171    pub fn forget_task(&mut self, name: &str) -> Result<()> {
172        if self.tasks.remove(name).is_some() {
173            Ok(())
174        } else {
175            Err(Error::SupervisorTaskNotFound)
176        }
177    }
178    /// Removes all finished tasks from the internal registry
179    pub fn purge(&mut self) {
180        self.tasks.retain(|_, task| !task.is_finished());
181    }
182    /// Joins all tasks in the internal registry and returns a map with their results. After the
183    /// operation the registry is cleared
184    pub fn join_all(&mut self) -> BTreeMap<String, thread::Result<T>> {
185        let mut result = BTreeMap::new();
186        for (name, task) in mem::take(&mut self.tasks) {
187            if !task.is_blocking() {
188                result.insert(name, task.join());
189            }
190        }
191        result
192    }
193}