1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
use std::{collections::HashMap, sync::Arc, time::Duration, vec::Vec};
use crate::{
error::{to_rclrs_result, RclReturnCode, RclrsError, ToResult},
log_error,
rcl_bindings::*,
Context, ContextHandle,
};
mod guard_condition;
pub use guard_condition::*;
mod rcl_primitive;
pub use rcl_primitive::*;
mod waitable;
pub use waitable::*;
mod wait_set_runner;
pub use wait_set_runner::*;
/// A struct for waiting on subscriptions and other waitable entities to become ready.
///
/// Entities are handed over with [`WaitSet::add`] and polled with [`WaitSet::wait`].
pub struct WaitSet {
/// The waitables owned by this wait set, grouped by the kind of rcl primitive
/// each one wraps (the key is obtained from `entity.primitive.kind()` in `add`).
primitives: HashMap<RclPrimitiveKind, Vec<Waitable>>,
/// Owner of the underlying `rcl_wait_set_t`. Struct fields are dropped in
/// declaration order, so `primitives` is dropped before this handle.
handle: WaitSetHandle,
}
// SAFETY: While the rcl_wait_set_t does have some interior mutability (because it has
// members of non-const pointer type), this interior mutability is hidden/not used by
// the WaitSet type. Therefore, sharing &WaitSet between threads does not risk data races.
// Of the methods defined in this file, the only one taking `&self` is `count`, which
// reads `primitives` alone; every method that touches the rcl_wait_set_t takes `&mut self`.
unsafe impl Sync for WaitSet {}
impl WaitSet {
/// Creates a new empty wait set.
///
/// The wait set keeps a shared reference to the context's handle so that the
/// context stays alive for as long as the wait set does.
///
/// # Errors
///
/// Returns any error reported while initializing the underlying rcl wait set.
///
/// # Panics
///
/// Panics if locking the context's `rcl_context` fails.
pub fn new(context: &Context) -> Result<Self, RclrsError> {
    // An empty count: the rcl wait set starts out with no entities to hold.
    // SAFETY: `initialize` is an unsafe method defined elsewhere; we give it exclusive
    // access to the locked rcl context for the duration of the call.
    // NOTE(review): confirm the exact preconditions of `WaitableCount::initialize`.
    let rcl_wait_set = unsafe {
        WaitableCount::new().initialize(&mut context.handle.rcl_context.lock().unwrap())?
    };

    let mut this = Self {
        primitives: HashMap::new(),
        handle: WaitSetHandle {
            rcl_wait_set,
            context_handle: Arc::clone(&context.handle),
        },
    };

    // Nothing is tracked yet, so this has no entities to register.
    this.register_rcl_primitives()?;
    Ok(this)
}
/// Take all the items out of `entities` and move them into this wait set.
///
/// After the entities have been stored, the rcl containers are resized to fit
/// everything the wait set now tracks and every tracked entity is registered
/// with the rcl wait set again.
///
/// # Errors
///
/// - [`RclrsError::AlreadyAddedToWaitSet`] if an entity reports that it is
///   already in a wait set. Entities that came before it in `entities` remain
///   in this wait set; the offending entity and everything after it are dropped.
///   The rcl containers are still resized and re-registered in this case so the
///   wait set stays consistent with what it tracks.
/// - Any error from resizing the rcl containers or registering the entities.
///   Such an error takes precedence over `AlreadyAddedToWaitSet`.
pub fn add(&mut self, entities: impl IntoIterator<Item = Waitable>) -> Result<(), RclrsError> {
    let mut outcome = Ok(());
    for entity in entities {
        if entity.in_wait_set() {
            // Remember the failure but do not return yet: entities accepted so far
            // are already stored, and the rcl containers must be brought in line
            // with them before we hand control back to the caller.
            outcome = Err(RclrsError::AlreadyAddedToWaitSet);
            break;
        }
        let kind = entity.primitive.kind();
        self.primitives.entry(kind).or_default().push(entity);
    }

    // Keep the rcl wait set in sync with `primitives`, whether or not every
    // entity was accepted.
    self.resize_rcl_containers()?;
    self.register_rcl_primitives()?;
    outcome
}
/// Removes all entities from the wait set.
///
/// This effectively resets the wait set to the state it was in after being created by
/// [`WaitSet::new`].
pub fn clear(&mut self) {
// Drop every waitable this wait set is tracking ...
self.primitives.clear();
// ... and then clear the entries held by the underlying rcl wait set.
self.rcl_clear();
}
/// Blocks until the wait set is ready, or until the timeout has been exceeded.
///
/// If the timeout is `None` then this function will block indefinitely until
/// something in the wait set is valid or it is interrupted.
///
/// If the timeout is [`Duration::ZERO`][1] then this function will be non-blocking; checking what's
/// ready now, but not waiting if nothing is ready yet.
///
/// If the timeout is greater than [`Duration::ZERO`][1] then this function will return after
/// that period of time has elapsed or the wait set becomes ready, which ever
/// comes first.
///
/// Once one or more items in the wait set are ready, `f` will be triggered
/// for each ready item. If `f` returns an error, the remaining ready items are
/// skipped for this call and that error is returned.
///
/// Once `rcl_wait` has been called, waitables that report they are no longer in
/// use are removed, and the remaining ones are registered with the rcl wait set
/// again before this function returns, so that the wait set can be waited on
/// again. This happens even when `rcl_wait` or `f` reports an error. Apart from
/// removing unused waitables, this function does not change the entities
/// registered in the wait set.
///
/// # Errors
///
/// - The timeout must not be so large so as to overflow an `i64` with its nanosecond
///   representation, or an `InvalidArgument` error will be returned.
/// - Any error returned by `f` is passed on to the caller.
/// - Error codes reported by `rcl_wait` are returned, with one exception:
///   `WaitSetEmpty` is treated as success, so waiting on a wait set with no
///   wait-able items returns `Ok(())`.
///   NOTE(review): an elapsed timeout is presumably reported through this path
///   as well; confirm against the rcl return codes.
///
/// A failure to re-register the entities after waiting is logged, not returned.
///
/// This list is not comprehensive, since further errors may occur in the `rmw` or `rcl` layers.
///
/// [1]: std::time::Duration::ZERO
pub fn wait(
    &mut self,
    timeout: Option<Duration>,
    mut f: impl FnMut(ReadyKind, &mut dyn RclPrimitive) -> Result<(), RclrsError>,
) -> Result<(), RclrsError> {
    // rcl_wait takes the timeout as signed nanoseconds; -1 is passed when the
    // caller asked to block indefinitely.
    let timeout_ns = match timeout.map(|d| d.as_nanos()) {
        None => -1,
        Some(ns) if ns <= i64::MAX as u128 => ns as i64,
        Some(_) => {
            return Err(RclrsError::RclError {
                code: RclReturnCode::InvalidArgument,
                msg: None,
            })
        }
    };

    // SAFETY: The comments in rcl mention "This function cannot operate on the same wait set
    // in multiple threads, and the wait sets may not share content."
    // * We have exclusive access to rcl_wait_set because this is a
    //   mutable borrow of WaitSet, which houses rcl_wait_set.
    // * We guarantee that the wait sets do not share content by funneling
    //   the waitable of each primitive to one (and only one) WaitSet when
    //   the primitive gets constructed. The waitables are never allowed to
    //   move between wait sets.
    let wait_result = match unsafe { rcl_wait(&mut self.handle.rcl_wait_set, timeout_ns) }.ok() {
        // An empty wait set simply has nothing that could become ready.
        Ok(_)
        | Err(RclrsError::RclError {
            code: RclReturnCode::WaitSetEmpty,
            ..
        }) => Ok(()),
        Err(error) => Err(error),
    };

    // Remove any waitables that are no longer being used
    for group in self.primitives.values_mut() {
        group.retain(|w| w.in_use());
    }

    // Do not check the readiness if an error was reported.
    let mut outcome = wait_result;
    if outcome.is_ok() {
        // For the remaining entities, check if they were activated and then run
        // the callback for those that were.
        for waiter in self.primitives.values_mut().flatten() {
            if let Some(ready) = waiter.is_ready(&self.handle.rcl_wait_set) {
                if let Err(err) = f(ready, &mut *waiter.primitive) {
                    // Stop at the first callback error, but fall through to the
                    // re-registration below instead of returning right away.
                    // Returning here would leave the rcl wait set holding only
                    // what rcl_wait left behind.
                    outcome = Err(err);
                    break;
                }
            }
        }
    }

    // Each time we call rcl_wait, the rcl_wait_set_t handle will have some
    // of its entities set to null, so we need to put them back in. We do
    // not need to resize the rcl_wait_set_t because no new entities could
    // have been added while we had the mutable borrow of the WaitSet. Some
    // entities could have been removed, but that does not require a resizing.
    // Note that self.rcl_clear() will not change the allocated size of each rcl
    // entity container, so we do not need to resize before re-registering
    // the rcl entities.
    self.rcl_clear();
    if let Err(err) = self.register_rcl_primitives() {
        log_error!(
            "rclrs.WaitSet.wait",
            "Error while registering rcl primitives: {err}",
        );
    }

    outcome
}
/// Get a count of the different kinds of entities in the wait set.
///
/// The result is built by feeding every (kind, waitables) group tracked by
/// this wait set into a fresh [`WaitableCount`].
pub fn count(&self) -> WaitableCount {
    self.primitives
        .iter()
        .fold(WaitableCount::new(), |mut tally, (kind, group)| {
            tally.add_group(kind, group);
            tally
        })
}
/// Resizes the containers of the rcl wait set to match the entities currently
/// tracked by this wait set (as reported by [`WaitSet::count`]).
///
/// # Errors
///
/// Returns any error reported by the resize operation.
fn resize_rcl_containers(&mut self) -> Result<(), RclrsError> {
    let required = self.count();
    // SAFETY: `resize` is an unsafe method defined elsewhere; it is given the wait
    // set owned by this struct through an exclusive borrow.
    // NOTE(review): confirm the exact preconditions of `WaitableCount::resize`.
    unsafe { required.resize(&mut self.handle.rcl_wait_set) }?;
    Ok(())
}
/// Clear only the rcl_wait_set, leaving `primitives` untouched.
///
/// This lets us safely repopulate the rcl_wait_set before performing another
/// wait. It does not affect the entities that we consider to still be in the
/// wait set.
fn rcl_clear(&mut self) {
    // SAFETY: No preconditions for this function (besides passing in a valid wait set).
    let status = unsafe { rcl_wait_set_clear(&mut self.handle.rcl_wait_set) };

    // rcl_wait_set_clear only checks that the input handle is valid, which it
    // always is in our case, so it cannot fail. That is why a debug assertion is
    // used here instead of returning a Result.
    debug_assert_eq!(status, 0);
}
/// Registers all the waitable entities with the rcl wait set.
///
/// # Errors
/// - If the number of subscriptions in the wait set is larger than the
/// allocated size [`WaitSetFull`][1] will be returned. If this happens
/// then there is a bug in rclrs.
///
/// [1]: crate::RclReturnCode
fn register_rcl_primitives(&mut self) -> Result<(), RclrsError> {
for entity in self.primitives.values_mut().flat_map(|c| c) {
entity.add_to_wait_set(&mut self.handle.rcl_wait_set)?;
}
Ok(())
}
}
/// Finalizes the rcl wait set when it goes out of scope.
///
/// # Panics
///
/// Panics if `rcl_wait_set_fini` reports an error.
impl Drop for rcl_wait_set_t {
    fn drop(&mut self) {
        // SAFETY: No preconditions for this function (besides passing in a valid wait set).
        let status = unsafe { rcl_wait_set_fini(self) };
        match to_rclrs_result(status) {
            Ok(_) => {}
            Err(e) => panic!("Unable to release WaitSet. {:?}", e),
        }
    }
}
// SAFETY: The functions accessing this type, including drop(), shouldn't care about the thread
// they are running in. Therefore, this type can be safely sent to another thread.
// NOTE(review): this relies on rcl not tying a wait set to the thread that created it;
// confirm against the rcl documentation.
unsafe impl Send for rcl_wait_set_t {}
/// Manage the lifecycle of an `rcl_wait_set_t`, including managing its dependency
/// on `rcl_context_t` by ensuring that this dependency is [dropped after][1] the
/// `rcl_wait_set_t`.
///
/// The field order below matters: struct fields are dropped in declaration
/// order, so `rcl_wait_set` must stay declared before `context_handle`.
///
/// [1]: <https://doc.rust-lang.org/reference/destructors.html>
struct WaitSetHandle {
// The raw rcl wait set. It is finalized by the `Drop` impl for `rcl_wait_set_t`.
pub(crate) rcl_wait_set: rcl_wait_set_t,
// Used to ensure the context is alive while the wait set is alive.
// The field is never read, only held, hence the `dead_code` allowance.
#[allow(dead_code)]
context_handle: Arc<ContextHandle>,
}
#[cfg(test)]
mod tests {
    use crate::*;
    use std::time::Duration;

    /// `WaitSet` has to be usable across threads: it must be `Send` and `Sync`.
    #[test]
    fn traits() {
        use crate::test_helpers::*;

        assert_sync::<WaitSet>();
        assert_send::<WaitSet>();
    }

    /// Waking the wait sets from another thread must end a `spin_once` long
    /// before its timeout runs out.
    #[test]
    fn guard_condition_in_wait_set_readies() -> Result<(), RclrsError> {
        let mut executor = Context::default().create_basic_executor();

        // Shortly after this point a background thread wakes up the wait sets.
        // TODO(@mxgrey): When we have timers, change this to use a one-shot timer instead.
        let waker = executor.commands().clone();
        std::thread::spawn(move || {
            std::thread::sleep(Duration::from_millis(1));
            waker.wake_all_wait_sets();
        });

        let start = std::time::Instant::now();

        // The spin is allowed up to ten seconds, but the wake-up issued by the
        // background thread should make it return almost immediately.
        let options = SpinOptions::spin_once().timeout(Duration::from_secs(10));
        executor.spin(options).first_error()?;

        // Taking more than a second means the wake-up most likely never ended
        // the spin.
        //
        // In theory this can be flaky on a machine with very unusual CPU
        // scheduling. A custom test executor offering more introspection would
        // be needed for a test that is guaranteed to be stable.
        assert!(std::time::Instant::now() - start < Duration::from_secs(1));
        Ok(())
    }
}