zenoh_sync/condition.rs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72
//
// Copyright (c) 2023 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <zenoh@zettascale.tech>
//
use std::{pin::Pin, sync::MutexGuard};
use event_listener::{Event, EventListener};
use tokio::sync::MutexGuard as AsyncMutexGuard;
pub type ConditionWaiter = Pin<Box<EventListener>>;
/// This is a Condition Variable similar to that provided by POSIX.
/// As for POSIX condition variables, this assumes that a mutex is
/// properly used to coordinate behaviour. In other terms there should
/// not be race condition on [notify_one](Condition::notify_one) or
/// [notify_all](Condition::notify_all).
///
pub struct Condition {
event: Event,
}
impl Default for Condition {
fn default() -> Condition {
Condition {
event: Event::new(),
}
}
}
impl Condition {
/// Creates a new condition variable with a given capacity.
/// The capacity indicates the maximum number of tasks that
/// may be waiting on the condition.
pub fn new() -> Condition {
Condition::default()
}
/// Waits for the condition to be notified
#[inline]
pub async fn wait<T>(&self, guard: AsyncMutexGuard<'_, T>) {
let listener = self.event.listen();
drop(guard);
listener.await;
}
#[inline]
pub fn waiter<T>(&self, guard: MutexGuard<'_, T>) -> ConditionWaiter {
let listener = self.event.listen();
drop(guard);
Box::pin(listener)
}
/// Notifies one pending listener
#[inline]
pub fn notify_one(&self) {
self.event.notify_additional_relaxed(1);
}
/// Notifies all pending listeners
#[inline]
pub fn notify_all(&self) {
self.event.notify_additional_relaxed(usize::MAX);
}
}