zenoh_sync/condition.rs
1//
2// Copyright (c) 2023 ZettaScale Technology
3//
4// This program and the accompanying materials are made available under the
5// terms of the Eclipse Public License 2.0 which is available at
6// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
7// which is available at https://www.apache.org/licenses/LICENSE-2.0.
8//
9// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
10//
11// Contributors:
12// ZettaScale Zenoh Team, <zenoh@zettascale.tech>
13//
14use std::{pin::Pin, sync::MutexGuard};
15
16use event_listener::{Event, EventListener};
17use tokio::sync::MutexGuard as AsyncMutexGuard;
18
19pub type ConditionWaiter = Pin<Box<EventListener>>;
20/// This is a Condition Variable similar to that provided by POSIX.
21/// As for POSIX condition variables, this assumes that a mutex is
22/// properly used to coordinate behaviour. In other terms there should
23/// not be race condition on [notify_one](Condition::notify_one) or
24/// [notify_all](Condition::notify_all).
25///
26#[derive(Default)]
27pub struct Condition {
28 event: Event,
29}
30
31impl Condition {
32 /// Creates a new condition variable with a given capacity.
33 /// The capacity indicates the maximum number of tasks that
34 /// may be waiting on the condition.
35 pub fn new() -> Condition {
36 Condition::default()
37 }
38
39 /// Waits for the condition to be notified
40 #[inline]
41 pub async fn wait<T>(&self, guard: AsyncMutexGuard<'_, T>) {
42 let listener = self.event.listen();
43 drop(guard);
44 listener.await;
45 }
46
47 #[inline]
48 pub fn waiter<T>(&self, guard: MutexGuard<'_, T>) -> ConditionWaiter {
49 let listener = self.event.listen();
50 drop(guard);
51 Box::pin(listener)
52 }
53
54 /// Notifies one pending listener
55 #[inline]
56 pub fn notify_one(&self) {
57 self.event.notify_additional_relaxed(1);
58 }
59
60 /// Notifies all pending listeners
61 #[inline]
62 pub fn notify_all(&self) {
63 self.event.notify_additional_relaxed(usize::MAX);
64 }
65}