zenoh_sync/
condition.rs

1//
2// Copyright (c) 2023 ZettaScale Technology
3//
4// This program and the accompanying materials are made available under the
5// terms of the Eclipse Public License 2.0 which is available at
6// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
7// which is available at https://www.apache.org/licenses/LICENSE-2.0.
8//
9// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
10//
11// Contributors:
12//   ZettaScale Zenoh Team, <zenoh@zettascale.tech>
13//
14use std::{pin::Pin, sync::MutexGuard};
15
16use event_listener::{Event, EventListener};
17use tokio::sync::MutexGuard as AsyncMutexGuard;
18
19pub type ConditionWaiter = Pin<Box<EventListener>>;
20/// This is a Condition Variable similar to that provided by POSIX.
21/// As for POSIX condition variables, this assumes that a mutex is
22/// properly used to coordinate behaviour. In other terms there should
23/// not be race condition on [notify_one](Condition::notify_one) or
24/// [notify_all](Condition::notify_all).
25///
26pub struct Condition {
27    event: Event,
28}
29
30impl Default for Condition {
31    fn default() -> Condition {
32        Condition {
33            event: Event::new(),
34        }
35    }
36}
37
38impl Condition {
39    /// Creates a new condition variable with a given capacity.
40    /// The capacity indicates the maximum number of tasks that
41    /// may be waiting on the condition.
42    pub fn new() -> Condition {
43        Condition::default()
44    }
45
46    /// Waits for the condition to be notified
47    #[inline]
48    pub async fn wait<T>(&self, guard: AsyncMutexGuard<'_, T>) {
49        let listener = self.event.listen();
50        drop(guard);
51        listener.await;
52    }
53
54    #[inline]
55    pub fn waiter<T>(&self, guard: MutexGuard<'_, T>) -> ConditionWaiter {
56        let listener = self.event.listen();
57        drop(guard);
58        Box::pin(listener)
59    }
60
61    /// Notifies one pending listener
62    #[inline]
63    pub fn notify_one(&self) {
64        self.event.notify_additional_relaxed(1);
65    }
66
67    /// Notifies all pending listeners
68    #[inline]
69    pub fn notify_all(&self) {
70        self.event.notify_additional_relaxed(usize::MAX);
71    }
72}