do_not_use_testing_rclrs/
executor.rs1use crate::rcl_bindings::rcl_context_is_valid;
2use crate::{Node, RclReturnCode, RclrsError, WaitSet};
3use std::sync::{Arc, Mutex, Weak};
4use std::time::Duration;
5
6pub struct SingleThreadedExecutor {
8 nodes_mtx: Mutex<Vec<Weak<Node>>>,
9}
10
11impl Default for SingleThreadedExecutor {
12 fn default() -> Self {
13 Self::new()
14 }
15}
16
17impl SingleThreadedExecutor {
18 pub fn new() -> Self {
20 SingleThreadedExecutor {
21 nodes_mtx: Mutex::new(Vec::new()),
22 }
23 }
24
25 pub fn add_node(&self, node: &Arc<Node>) -> Result<(), RclrsError> {
27 { self.nodes_mtx.lock().unwrap() }.push(Arc::downgrade(node));
28 Ok(())
29 }
30
31 pub fn remove_node(&self, node: Arc<Node>) -> Result<(), RclrsError> {
33 { self.nodes_mtx.lock().unwrap() }
34 .retain(|n| !n.upgrade().map(|n| Arc::ptr_eq(&n, &node)).unwrap_or(false));
35 Ok(())
36 }
37
38 pub fn spin_once(&self, timeout: Option<Duration>) -> Result<(), RclrsError> {
42 for node in { self.nodes_mtx.lock().unwrap() }
43 .iter()
44 .filter_map(Weak::upgrade)
45 .filter(|node| unsafe { rcl_context_is_valid(&*node.rcl_context_mtx.lock().unwrap()) })
46 {
47 let wait_set = WaitSet::new_for_node(&node)?;
48 let ready_entities = wait_set.wait(timeout)?;
49
50 for ready_subscription in ready_entities.subscriptions {
51 ready_subscription.execute()?;
52 }
53
54 for ready_client in ready_entities.clients {
55 ready_client.execute()?;
56 }
57
58 for ready_service in ready_entities.services {
59 ready_service.execute()?;
60 }
61 }
62
63 Ok(())
64 }
65
66 pub fn spin(&self) -> Result<(), RclrsError> {
68 while !{ self.nodes_mtx.lock().unwrap() }.is_empty() {
69 match self.spin_once(None) {
70 Ok(_)
71 | Err(RclrsError::RclError {
72 code: RclReturnCode::Timeout,
73 ..
74 }) => std::thread::yield_now(),
75 error => return error,
76 }
77 }
78
79 Ok(())
80 }
81}