do_not_use_testing_rclrs/
executor.rs

1use crate::rcl_bindings::rcl_context_is_valid;
2use crate::{Node, RclReturnCode, RclrsError, WaitSet};
3use std::sync::{Arc, Mutex, Weak};
4use std::time::Duration;
5
6/// Single-threaded executor implementation.
7pub struct SingleThreadedExecutor {
8    nodes_mtx: Mutex<Vec<Weak<Node>>>,
9}
10
11impl Default for SingleThreadedExecutor {
12    fn default() -> Self {
13        Self::new()
14    }
15}
16
17impl SingleThreadedExecutor {
18    /// Creates a new executor.
19    pub fn new() -> Self {
20        SingleThreadedExecutor {
21            nodes_mtx: Mutex::new(Vec::new()),
22        }
23    }
24
25    /// Add a node to the executor.
26    pub fn add_node(&self, node: &Arc<Node>) -> Result<(), RclrsError> {
27        { self.nodes_mtx.lock().unwrap() }.push(Arc::downgrade(node));
28        Ok(())
29    }
30
31    /// Remove a node from the executor.
32    pub fn remove_node(&self, node: Arc<Node>) -> Result<(), RclrsError> {
33        { self.nodes_mtx.lock().unwrap() }
34            .retain(|n| !n.upgrade().map(|n| Arc::ptr_eq(&n, &node)).unwrap_or(false));
35        Ok(())
36    }
37
38    /// Polls the nodes for new messages and executes the corresponding callbacks.
39    ///
40    /// This function additionally checks that the context is still valid.
41    pub fn spin_once(&self, timeout: Option<Duration>) -> Result<(), RclrsError> {
42        for node in { self.nodes_mtx.lock().unwrap() }
43            .iter()
44            .filter_map(Weak::upgrade)
45            .filter(|node| unsafe { rcl_context_is_valid(&*node.rcl_context_mtx.lock().unwrap()) })
46        {
47            let wait_set = WaitSet::new_for_node(&node)?;
48            let ready_entities = wait_set.wait(timeout)?;
49
50            for ready_subscription in ready_entities.subscriptions {
51                ready_subscription.execute()?;
52            }
53
54            for ready_client in ready_entities.clients {
55                ready_client.execute()?;
56            }
57
58            for ready_service in ready_entities.services {
59                ready_service.execute()?;
60            }
61        }
62
63        Ok(())
64    }
65
66    /// Convenience function for calling [`SingleThreadedExecutor::spin_once`] in a loop.
67    pub fn spin(&self) -> Result<(), RclrsError> {
68        while !{ self.nodes_mtx.lock().unwrap() }.is_empty() {
69            match self.spin_once(None) {
70                Ok(_)
71                | Err(RclrsError::RclError {
72                    code: RclReturnCode::Timeout,
73                    ..
74                }) => std::thread::yield_now(),
75                error => return error,
76            }
77        }
78
79        Ok(())
80    }
81}