zenoh_sync/condition.rs
1//
2// Copyright (c) 2023 ZettaScale Technology
3//
4// This program and the accompanying materials are made available under the
5// terms of the Eclipse Public License 2.0 which is available at
6// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
7// which is available at https://www.apache.org/licenses/LICENSE-2.0.
8//
9// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
10//
11// Contributors:
12// ZettaScale Zenoh Team, <zenoh@zettascale.tech>
13//
14use std::{pin::Pin, sync::MutexGuard};
15
16use event_listener::{Event, EventListener};
17use tokio::sync::MutexGuard as AsyncMutexGuard;
18
19pub type ConditionWaiter = Pin<Box<EventListener>>;
20/// This is a Condition Variable similar to that provided by POSIX.
21/// As for POSIX condition variables, this assumes that a mutex is
22/// properly used to coordinate behaviour. In other terms there should
23/// not be race condition on [notify_one](Condition::notify_one) or
24/// [notify_all](Condition::notify_all).
25///
26pub struct Condition {
27 event: Event,
28}
29
30impl Default for Condition {
31 fn default() -> Condition {
32 Condition {
33 event: Event::new(),
34 }
35 }
36}
37
38impl Condition {
39 /// Creates a new condition variable with a given capacity.
40 /// The capacity indicates the maximum number of tasks that
41 /// may be waiting on the condition.
42 pub fn new() -> Condition {
43 Condition::default()
44 }
45
46 /// Waits for the condition to be notified
47 #[inline]
48 pub async fn wait<T>(&self, guard: AsyncMutexGuard<'_, T>) {
49 let listener = self.event.listen();
50 drop(guard);
51 listener.await;
52 }
53
54 #[inline]
55 pub fn waiter<T>(&self, guard: MutexGuard<'_, T>) -> ConditionWaiter {
56 let listener = self.event.listen();
57 drop(guard);
58 Box::pin(listener)
59 }
60
61 /// Notifies one pending listener
62 #[inline]
63 pub fn notify_one(&self) {
64 self.event.notify_additional_relaxed(1);
65 }
66
67 /// Notifies all pending listeners
68 #[inline]
69 pub fn notify_all(&self) {
70 self.event.notify_additional_relaxed(usize::MAX);
71 }
72}