do_not_use_testing_rclrs/
wait.rs

1// Copyright 2020 DCS Corporation, All Rights Reserved.
2
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6
7//     http://www.apache.org/licenses/LICENSE-2.0
8
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15// DISTRIBUTION A. Approved for public release; distribution unlimited.
16// OPSEC #4584.
17
18use std::sync::{Arc, Mutex};
19use std::time::Duration;
20use std::vec::Vec;
21
22use crate::error::{to_rclrs_result, RclReturnCode, RclrsError, ToResult};
23use crate::rcl_bindings::*;
24use crate::{ClientBase, Context, Node, ServiceBase, SubscriptionBase};
25
26mod exclusivity_guard;
27mod guard_condition;
28use exclusivity_guard::*;
29pub use guard_condition::*;
30
31/// A struct for waiting on subscriptions and other waitable entities to become ready.
32pub struct WaitSet {
33    rcl_wait_set: rcl_wait_set_t,
34    // Used to ensure the context is alive while the wait set is alive.
35    _rcl_context_mtx: Arc<Mutex<rcl_context_t>>,
36    // The subscriptions that are currently registered in the wait set.
37    // This correspondence is an invariant that must be maintained by all functions,
38    // even in the error case.
39    subscriptions: Vec<ExclusivityGuard<Arc<dyn SubscriptionBase>>>,
40    clients: Vec<ExclusivityGuard<Arc<dyn ClientBase>>>,
41    // The guard conditions that are currently registered in the wait set.
42    guard_conditions: Vec<ExclusivityGuard<Arc<GuardCondition>>>,
43    services: Vec<ExclusivityGuard<Arc<dyn ServiceBase>>>,
44}
45
46/// A list of entities that are ready, returned by [`WaitSet::wait`].
47pub struct ReadyEntities {
48    /// A list of subscriptions that have potentially received messages.
49    pub subscriptions: Vec<Arc<dyn SubscriptionBase>>,
50    /// A list of clients that have potentially received responses.
51    pub clients: Vec<Arc<dyn ClientBase>>,
52    /// A list of guard conditions that have been triggered.
53    pub guard_conditions: Vec<Arc<GuardCondition>>,
54    /// A list of services that have potentially received requests.
55    pub services: Vec<Arc<dyn ServiceBase>>,
56}
57
58impl Drop for rcl_wait_set_t {
59    fn drop(&mut self) {
60        // SAFETY: No preconditions for this function (besides passing in a valid wait set).
61        let rc = unsafe { rcl_wait_set_fini(self) };
62        if let Err(e) = to_rclrs_result(rc) {
63            panic!("Unable to release WaitSet. {:?}", e)
64        }
65    }
66}
67
68// SAFETY: The functions accessing this type, including drop(), shouldn't care about the thread
69// they are running in. Therefore, this type can be safely sent to another thread.
70unsafe impl Send for rcl_wait_set_t {}
71
72// SAFETY: While the rcl_wait_set_t does have some interior mutability (because it has
73// members of non-const pointer type), this interior mutability is hidden/not used by
74// the WaitSet type. Therefore, sharing &WaitSet between threads does not risk data races.
75unsafe impl Sync for WaitSet {}
76
77impl WaitSet {
78    /// Creates a new wait set.
79    ///
80    /// The given number of subscriptions is a capacity, corresponding to how often
81    /// [`WaitSet::add_subscription`] may be called.
82    pub fn new(
83        number_of_subscriptions: usize,
84        number_of_guard_conditions: usize,
85        number_of_timers: usize,
86        number_of_clients: usize,
87        number_of_services: usize,
88        number_of_events: usize,
89        context: &Context,
90    ) -> Result<Self, RclrsError> {
91        let rcl_wait_set = unsafe {
92            // SAFETY: Getting a zero-initialized value is always safe
93            let mut rcl_wait_set = rcl_get_zero_initialized_wait_set();
94            // SAFETY: We're passing in a zero-initialized wait set and a valid context.
95            // There are no other preconditions.
96            rcl_wait_set_init(
97                &mut rcl_wait_set,
98                number_of_subscriptions,
99                number_of_guard_conditions,
100                number_of_timers,
101                number_of_clients,
102                number_of_services,
103                number_of_events,
104                &mut *context.rcl_context_mtx.lock().unwrap(),
105                rcutils_get_default_allocator(),
106            )
107            .ok()?;
108            rcl_wait_set
109        };
110        Ok(Self {
111            rcl_wait_set,
112            _rcl_context_mtx: context.rcl_context_mtx.clone(),
113            subscriptions: Vec::new(),
114            guard_conditions: Vec::new(),
115            clients: Vec::new(),
116            services: Vec::new(),
117        })
118    }
119
120    /// Creates a new wait set and adds all waitable entities in the node to it.
121    ///
122    /// The wait set is sized to fit the node exactly, so there is no capacity for adding other entities.
123    pub fn new_for_node(node: &Node) -> Result<Self, RclrsError> {
124        let live_subscriptions = node.live_subscriptions();
125        let live_clients = node.live_clients();
126        let live_guard_conditions = node.live_guard_conditions();
127        let live_services = node.live_services();
128        let ctx = Context {
129            rcl_context_mtx: node.rcl_context_mtx.clone(),
130        };
131        let mut wait_set = WaitSet::new(
132            live_subscriptions.len(),
133            live_guard_conditions.len(),
134            0,
135            live_clients.len(),
136            live_services.len(),
137            0,
138            &ctx,
139        )?;
140
141        for live_subscription in &live_subscriptions {
142            wait_set.add_subscription(live_subscription.clone())?;
143        }
144
145        for live_client in &live_clients {
146            wait_set.add_client(live_client.clone())?;
147        }
148
149        for live_guard_condition in &live_guard_conditions {
150            wait_set.add_guard_condition(live_guard_condition.clone())?;
151        }
152
153        for live_service in &live_services {
154            wait_set.add_service(live_service.clone())?;
155        }
156        Ok(wait_set)
157    }
158
159    /// Removes all entities from the wait set.
160    ///
161    /// This effectively resets the wait set to the state it was in after being created by
162    /// [`WaitSet::new`].
163    pub fn clear(&mut self) {
164        self.subscriptions.clear();
165        self.guard_conditions.clear();
166        self.clients.clear();
167        self.services.clear();
168        // This cannot fail – the rcl_wait_set_clear function only checks that the input handle is
169        // valid, which it always is in our case. Hence, only debug_assert instead of returning
170        // Result.
171        // SAFETY: No preconditions for this function (besides passing in a valid wait set).
172        let ret = unsafe { rcl_wait_set_clear(&mut self.rcl_wait_set) };
173        debug_assert_eq!(ret, 0);
174    }
175
176    /// Adds a subscription to the wait set.
177    ///
178    /// # Errors
179    /// - If the subscription was already added to this wait set or another one,
180    ///   [`AlreadyAddedToWaitSet`][1] will be returned
181    /// - If the number of subscriptions in the wait set is larger than the
182    ///   capacity set in [`WaitSet::new`], [`WaitSetFull`][2] will be returned
183    ///
184    /// [1]: crate::RclrsError
185    /// [2]: crate::RclReturnCode
186    pub fn add_subscription(
187        &mut self,
188        subscription: Arc<dyn SubscriptionBase>,
189    ) -> Result<(), RclrsError> {
190        let exclusive_subscription = ExclusivityGuard::new(
191            Arc::clone(&subscription),
192            Arc::clone(&subscription.handle().in_use_by_wait_set),
193        )?;
194        unsafe {
195            // SAFETY: I'm not sure if it's required, but the subscription pointer will remain valid
196            // for as long as the wait set exists, because it's stored in self.subscriptions.
197            // Passing in a null pointer for the third argument is explicitly allowed.
198            rcl_wait_set_add_subscription(
199                &mut self.rcl_wait_set,
200                &*subscription.handle().lock(),
201                std::ptr::null_mut(),
202            )
203        }
204        .ok()?;
205        self.subscriptions.push(exclusive_subscription);
206        Ok(())
207    }
208
209    /// Adds a guard condition to the wait set.
210    ///
211    /// # Errors
212    /// - If the guard condition was already added to this wait set or another one,
213    ///   [`AlreadyAddedToWaitSet`][1] will be returned
214    /// - If the number of guard conditions in the wait set is larger than the
215    ///   capacity set in [`WaitSet::new`], [`WaitSetFull`][2] will be returned
216    ///
217    /// [1]: crate::RclrsError
218    /// [2]: crate::RclReturnCode
219    pub fn add_guard_condition(
220        &mut self,
221        guard_condition: Arc<GuardCondition>,
222    ) -> Result<(), RclrsError> {
223        let exclusive_guard_condition = ExclusivityGuard::new(
224            Arc::clone(&guard_condition),
225            Arc::clone(&guard_condition.in_use_by_wait_set),
226        )?;
227
228        unsafe {
229            // SAFETY: Safe if the wait set and guard condition are initialized
230            rcl_wait_set_add_guard_condition(
231                &mut self.rcl_wait_set,
232                &*guard_condition.rcl_guard_condition.lock().unwrap(),
233                std::ptr::null_mut(),
234            )
235            .ok()?;
236        }
237        self.guard_conditions.push(exclusive_guard_condition);
238        Ok(())
239    }
240
241    /// Adds a client to the wait set.
242    ///
243    /// # Errors
244    /// - If the client was already added to this wait set or another one,
245    ///   [`AlreadyAddedToWaitSet`][1] will be returned
246    /// - If the number of clients in the wait set is larger than the
247    ///   capacity set in [`WaitSet::new`], [`WaitSetFull`][2] will be returned
248    ///
249    /// [1]: crate::RclrsError
250    /// [2]: crate::RclReturnCode
251    pub fn add_client(&mut self, client: Arc<dyn ClientBase>) -> Result<(), RclrsError> {
252        let exclusive_client = ExclusivityGuard::new(
253            Arc::clone(&client),
254            Arc::clone(&client.handle().in_use_by_wait_set),
255        )?;
256        unsafe {
257            // SAFETY: I'm not sure if it's required, but the client pointer will remain valid
258            // for as long as the wait set exists, because it's stored in self.clients.
259            // Passing in a null pointer for the third argument is explicitly allowed.
260            rcl_wait_set_add_client(
261                &mut self.rcl_wait_set,
262                &*client.handle().lock() as *const _,
263                core::ptr::null_mut(),
264            )
265        }
266        .ok()?;
267        self.clients.push(exclusive_client);
268        Ok(())
269    }
270
271    /// Adds a service to the wait set.
272    ///
273    /// # Errors
274    /// - If the service was already added to this wait set or another one,
275    ///   [`AlreadyAddedToWaitSet`][1] will be returned
276    /// - If the number of services in the wait set is larger than the
277    ///   capacity set in [`WaitSet::new`], [`WaitSetFull`][2] will be returned
278    ///
279    /// [1]: crate::RclrsError
280    /// [2]: crate::RclReturnCode
281    pub fn add_service(&mut self, service: Arc<dyn ServiceBase>) -> Result<(), RclrsError> {
282        let exclusive_service = ExclusivityGuard::new(
283            Arc::clone(&service),
284            Arc::clone(&service.handle().in_use_by_wait_set),
285        )?;
286        unsafe {
287            // SAFETY: I'm not sure if it's required, but the service pointer will remain valid
288            // for as long as the wait set exists, because it's stored in self.services.
289            // Passing in a null pointer for the third argument is explicitly allowed.
290            rcl_wait_set_add_service(
291                &mut self.rcl_wait_set,
292                &*service.handle().lock() as *const _,
293                core::ptr::null_mut(),
294            )
295        }
296        .ok()?;
297        self.services.push(exclusive_service);
298        Ok(())
299    }
300
301    /// Blocks until the wait set is ready, or until the timeout has been exceeded.
302    ///
303    /// If the timeout is `None` then this function will block indefinitely until
304    /// something in the wait set is valid or it is interrupted.
305    ///
306    /// If the timeout is [`Duration::ZERO`][1] then this function will be non-blocking; checking what's
307    /// ready now, but not waiting if nothing is ready yet.
308    ///
309    /// If the timeout is greater than [`Duration::ZERO`][1] then this function will return after
310    /// that period of time has elapsed or the wait set becomes ready, which ever
311    /// comes first.
312    ///
313    /// This function does not change the entities registered in the wait set.
314    ///
315    /// # Errors
316    ///
317    /// - Passing a wait set with no wait-able items in it will return an error.
318    /// - The timeout must not be so large so as to overflow an `i64` with its nanosecond
319    /// representation, or an error will occur.
320    ///
321    /// This list is not comprehensive, since further errors may occur in the `rmw` or `rcl` layers.
322    ///
323    /// [1]: std::time::Duration::ZERO
324    pub fn wait(mut self, timeout: Option<Duration>) -> Result<ReadyEntities, RclrsError> {
325        let timeout_ns = match timeout.map(|d| d.as_nanos()) {
326            None => -1,
327            Some(ns) if ns <= i64::MAX as u128 => ns as i64,
328            _ => {
329                return Err(RclrsError::RclError {
330                    code: RclReturnCode::InvalidArgument,
331                    msg: None,
332                })
333            }
334        };
335        // SAFETY: The comments in rcl mention "This function cannot operate on the same wait set
336        // in multiple threads, and the wait sets may not share content."
337        // We cannot currently guarantee that the wait sets may not share content, but it is
338        // mentioned in the doc comment for `add_subscription`.
339        // Also, the rcl_wait_set is obviously valid.
340        match unsafe { rcl_wait(&mut self.rcl_wait_set, timeout_ns) }.ok() {
341            Ok(_) => (),
342            Err(error) => match error {
343                RclrsError::RclError { code, msg } => match code {
344                    RclReturnCode::WaitSetEmpty => (),
345                    _ => return Err(RclrsError::RclError { code, msg }),
346                },
347                _ => return Err(error),
348            },
349        }
350        let mut ready_entities = ReadyEntities {
351            subscriptions: Vec::new(),
352            clients: Vec::new(),
353            guard_conditions: Vec::new(),
354            services: Vec::new(),
355        };
356        for (i, subscription) in self.subscriptions.iter().enumerate() {
357            // SAFETY: The `subscriptions` entry is an array of pointers, and this dereferencing is
358            // equivalent to
359            // https://github.com/ros2/rcl/blob/35a31b00a12f259d492bf53c0701003bd7f1745c/rcl/include/rcl/wait.h#L419
360            let wait_set_entry = unsafe { *self.rcl_wait_set.subscriptions.add(i) };
361            if !wait_set_entry.is_null() {
362                ready_entities
363                    .subscriptions
364                    .push(Arc::clone(&subscription.waitable));
365            }
366        }
367
368        for (i, client) in self.clients.iter().enumerate() {
369            // SAFETY: The `clients` entry is an array of pointers, and this dereferencing is
370            // equivalent to
371            // https://github.com/ros2/rcl/blob/35a31b00a12f259d492bf53c0701003bd7f1745c/rcl/include/rcl/wait.h#L419
372            let wait_set_entry = unsafe { *self.rcl_wait_set.clients.add(i) };
373            if !wait_set_entry.is_null() {
374                ready_entities.clients.push(Arc::clone(&client.waitable));
375            }
376        }
377
378        for (i, guard_condition) in self.guard_conditions.iter().enumerate() {
379            // SAFETY: The `clients` entry is an array of pointers, and this dereferencing is
380            // equivalent to
381            // https://github.com/ros2/rcl/blob/35a31b00a12f259d492bf53c0701003bd7f1745c/rcl/include/rcl/wait.h#L419
382            let wait_set_entry = unsafe { *self.rcl_wait_set.guard_conditions.add(i) };
383            if !wait_set_entry.is_null() {
384                ready_entities
385                    .guard_conditions
386                    .push(Arc::clone(&guard_condition.waitable));
387            }
388        }
389
390        for (i, service) in self.services.iter().enumerate() {
391            // SAFETY: The `services` entry is an array of pointers, and this dereferencing is
392            // equivalent to
393            // https://github.com/ros2/rcl/blob/35a31b00a12f259d492bf53c0701003bd7f1745c/rcl/include/rcl/wait.h#L419
394            let wait_set_entry = unsafe { *self.rcl_wait_set.services.add(i) };
395            if !wait_set_entry.is_null() {
396                ready_entities.services.push(Arc::clone(&service.waitable));
397            }
398        }
399        Ok(ready_entities)
400    }
401}
402
403#[cfg(test)]
404mod tests {
405    use super::*;
406
407    fn assert_send<T: Send>() {}
408    fn assert_sync<T: Sync>() {}
409
410    #[test]
411    fn wait_set_is_send_and_sync() {
412        assert_send::<WaitSet>();
413        assert_sync::<WaitSet>();
414    }
415
416    #[test]
417    fn guard_condition_in_wait_set_readies() -> Result<(), RclrsError> {
418        let context = Context::new([])?;
419
420        let guard_condition = Arc::new(GuardCondition::new(&context));
421
422        let mut wait_set = WaitSet::new(0, 1, 0, 0, 0, 0, &context)?;
423        wait_set.add_guard_condition(Arc::clone(&guard_condition))?;
424        guard_condition.trigger()?;
425
426        let readies = wait_set.wait(Some(std::time::Duration::from_millis(10)))?;
427        assert!(readies.guard_conditions.contains(&guard_condition));
428
429        Ok(())
430    }
431}