#[cfg(all(test, feature = "tokio"))]
#[path = "../../tests/utils/sync.rs"]
mod tests;
use std::future::Future;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::time::Duration;
use cfg_if::cfg_if;
#[cfg(all(feature = "server", feature = "tokio"))]
use crossbeam::queue::ArrayQueue;
#[cfg(feature = "server")]
use log::debug;
cfg_if! {
if #[cfg(feature = "client")] {
use std::pin::Pin;
use futures::stream::{FuturesUnordered, StreamExt};
}
}
cfg_if! {
if #[cfg(feature = "tokio")] {
use crossbeam::queue::SegQueue;
use tokio::sync::Notify;
use tokio::time::sleep as tokio_sleep;
use tokio::runtime::{Handle, RuntimeFlavor};
pub use tokio::sync::{RwLock, Mutex};
} else if #[cfg(feature = "async-std")] {
pub use async_lock::{RwLock, Mutex};
use async_channel::{Receiver, Sender, bounded, unbounded};
use async_io::Timer;
}
}
/// Checks that the caller is running inside a multi-threaded tokio runtime.
///
/// # Errors
/// Returns a static message when no tokio runtime handle can be obtained for
/// the current context, or when the runtime flavor is anything other than
/// `RuntimeFlavor::MultiThread`.
#[cfg(feature = "tokio")]
pub fn assert_runtime() -> Result<(), &'static str> {
    let Ok(runtime) = Handle::try_current() else {
        return Err("no tokio runtime in scope");
    };
    match runtime.runtime_flavor() {
        RuntimeFlavor::MultiThread => Ok(()),
        _ => Err("TYPHOON requires a multi-threaded tokio runtime (use `#[tokio::main]` or `Builder::new_multi_thread()`)"),
    }
}
/// async-std counterpart of the runtime check.
///
/// This variant performs no inspection and always returns `Ok(())`; it exists
/// so callers can invoke `assert_runtime()` under either backend feature.
#[cfg(feature = "async-std")]
pub fn assert_runtime() -> Result<(), &'static str> {
    // Nothing to verify for this backend.
    Result::Ok(())
}
/// Abstraction over an async executor that can be cloned and shared across threads
/// (`Clone + Send + Sync`).
pub trait AsyncExecutor: Clone + Send + Sync {
/// Constructs an executor instance.
fn new() -> Self;
/// Hands a `Send + 'static` future with unit output over to the executor.
// NOTE(review): presumably the future runs in the background without blocking the caller —
// confirm against the implementors.
fn spawn<F: Future<Output = ()> + Send + 'static>(&self, future: F);
/// Runs a future with unit output through the executor; no `Send` or `'static` bound is required.
// NOTE(review): presumably blocks the calling thread until `future` completes — confirm
// against the implementors.
fn block_on<F: Future<Output = ()>>(&self, future: F);
}
/// State shared (through an `Arc`) between a `WatchSender` and its `WatchReceiver`s.
struct WatchState<T> {
/// Latest value that has been sent and not yet taken by a receiver; `None` otherwise.
value: std::sync::Mutex<Option<T>>,
/// Set to `true` when the `WatchSender` is dropped.
closed: AtomicBool,
/// Count of live receivers: starts at 1 in `create_watch`, incremented by `subscribe`,
/// decremented when a `WatchReceiver` is dropped.
receiver_count: AtomicUsize,
/// tokio backend: notification primitive that receivers wait on in `recv`.
#[cfg(feature = "tokio")]
notify: Notify,
/// async-std backend: one notification channel sender per receiver.
#[cfg(feature = "async-std")]
notifiers: std::sync::Mutex<Vec<Sender<()>>>,
}
/// Sending half of the watch channel; dropping it marks the channel as closed.
pub struct WatchSender<T: Send> {
/// Shared channel state.
state: Arc<WatchState<T>>,
}
/// Receiving half of the watch channel; `recv` takes the stored value out of the shared slot.
pub struct WatchReceiver<T> {
/// Shared channel state.
state: Arc<WatchState<T>>,
/// async-std backend: receiving end of this receiver's wake-up channel.
#[cfg(feature = "async-std")]
notify: Receiver<()>,
}
impl<T: Send> WatchSender<T> {
    /// Stores `value` in the shared slot (overwriting any value that has not
    /// been taken yet) and wakes the receivers.
    ///
    /// Returns `true` when the receiver counter is non-zero at the time it is
    /// read, `false` otherwise.
    ///
    /// # Panics
    /// Panics if one of the internal `std::sync::Mutex`es is poisoned.
    pub fn send(&self, value: T) -> bool {
        *self.state.value.lock().unwrap() = Some(value);
        #[cfg(feature = "tokio")]
        self.state.notify.notify_waiters();
        #[cfg(feature = "async-std")]
        {
            let mut notifiers = self.state.notifiers.lock().unwrap();
            // Notify every receiver and, in the same pass, forget the channels
            // whose receiver has been dropped. Without the pruning, each
            // `subscribe` would leave its sender in the list for the whole
            // lifetime of this `WatchSender`. A full channel is kept: it
            // already holds a pending wake-up.
            notifiers.retain(|tx| match tx.try_send(()) {
                Ok(()) => true,
                Err(err) => !err.is_closed(),
            });
        }
        self.state.receiver_count.load(Ordering::Relaxed) > 0
    }

    /// Creates an additional receiver attached to the same shared state and
    /// increments the receiver counter.
    ///
    /// # Panics
    /// On the async-std backend, panics if the notifier list mutex is poisoned.
    #[cfg(feature = "client")]
    pub fn subscribe(&self) -> WatchReceiver<T> {
        self.state.receiver_count.fetch_add(1, Ordering::Relaxed);
        #[cfg(feature = "tokio")]
        return WatchReceiver {
            state: Arc::clone(&self.state),
        };
        #[cfg(feature = "async-std")]
        {
            // Each receiver gets its own capacity-1 wake-up channel.
            let (tx, rx) = bounded(1);
            self.state.notifiers.lock().unwrap().push(tx);
            WatchReceiver {
                state: Arc::clone(&self.state),
                notify: rx,
            }
        }
    }
}
impl<T: Send> Drop for WatchSender<T> {
    /// Flags the channel as closed and wakes the receivers so they can observe it.
    fn drop(&mut self) {
        let shared = &self.state;
        shared.closed.store(true, Ordering::Release);
        #[cfg(feature = "tokio")]
        shared.notify.notify_waiters();
        #[cfg(feature = "async-std")]
        {
            // Send one last wake-up on every notifier channel, then drop the senders.
            let mut registered = shared.notifiers.lock().unwrap();
            registered.drain(..).for_each(|tx| {
                let _ = tx.try_send(());
            });
        }
    }
}
impl<T> Drop for WatchReceiver<T> {
    /// Removes this receiver from the shared receiver counter.
    fn drop(&mut self) {
        let live_receivers = &self.state.receiver_count;
        live_receivers.fetch_sub(1, Ordering::Release);
    }
}
impl<T: Send> WatchReceiver<T> {
/// Waits until a value is available and takes it out of the shared slot.
///
/// Returns `Some(value)` when a value was stored, or `None` once the slot is empty and the
/// channel has been marked closed. A stored value is returned even if the channel is
/// already closed, because the slot is inspected before the `closed` flag.
///
/// # Panics
/// Panics if the value mutex is poisoned.
pub async fn recv(&mut self) -> Option<T> {
loop {
// tokio: create and enable the notification future BEFORE inspecting the state, so that a
// `notify_waiters()` call landing between the check below and the `.await` is not missed.
#[cfg(feature = "tokio")]
let mut notified = std::pin::pin!(self.state.notify.notified());
#[cfg(feature = "tokio")]
notified.as_mut().enable();
{
// The guard is confined to this block so the mutex is released before awaiting.
let mut guard = self.state.value.lock().unwrap();
if let Some(v) = guard.take() {
return Some(v);
}
if self.state.closed.load(Ordering::Acquire) {
return None;
}
}
#[cfg(feature = "tokio")]
notified.await;
#[cfg(feature = "async-std")]
{
// A receive error is ignored on purpose: the next loop iteration re-checks the slot and
// the `closed` flag.
self.notify.recv().await.ok();
}
}
}
}
/// Builds a connected watch sender/receiver pair (tokio backend).
///
/// The channel starts empty, open, and with the receiver counter at 1 for the
/// returned receiver.
#[cfg(feature = "tokio")]
pub fn create_watch<T: Send>() -> (WatchSender<T>, WatchReceiver<T>) {
    let shared = Arc::new(WatchState {
        value: std::sync::Mutex::new(None),
        closed: AtomicBool::new(false),
        receiver_count: AtomicUsize::new(1),
        notify: Notify::new(),
    });
    let sender = WatchSender {
        state: Arc::clone(&shared),
    };
    let receiver = WatchReceiver { state: shared };
    (sender, receiver)
}
/// Builds a connected watch sender/receiver pair (async-std backend).
///
/// The channel starts empty, open, and with the receiver counter at 1. The
/// returned receiver owns the receiving end of a capacity-1 wake-up channel
/// whose sending end is registered in the shared notifier list.
#[cfg(feature = "async-std")]
pub fn create_watch<T: Send>() -> (WatchSender<T>, WatchReceiver<T>) {
    let (wake_tx, wake_rx) = bounded(1);
    let shared = Arc::new(WatchState {
        value: std::sync::Mutex::new(None),
        closed: AtomicBool::new(false),
        receiver_count: AtomicUsize::new(1),
        notifiers: std::sync::Mutex::new(vec![wake_tx]),
    });
    let sender = WatchSender {
        state: Arc::clone(&shared),
    };
    let receiver = WatchReceiver {
        state: shared,
        notify: wake_rx,
    };
    (sender, receiver)
}
cfg_if! {
if #[cfg(feature = "tokio")] {
/// State shared between the two halves of the tokio-backed unbounded notify queue.
struct NotifyQueueState<T> {
/// Queue holding the pushed items.
queue: SegQueue<T>,
/// Wakes the receiver when an item is pushed or the sender is dropped.
notify: Notify,
/// Set to `true` when the sender is dropped.
closed: AtomicBool,
}
/// Producer half of the unbounded notify queue (tokio backend); wraps the shared state.
pub struct NotifyQueueSender<T: Send>(Arc<NotifyQueueState<T>>);
/// Consumer half of the unbounded notify queue (tokio backend); wraps the shared state.
pub struct NotifyQueueReceiver<T: Send>(Arc<NotifyQueueState<T>>);
impl<T: Send> NotifyQueueSender<T> {
    /// Appends `item` to the queue, then signals the receiver with `notify_one`.
    pub fn push(&self, item: T) {
        let shared = &*self.0;
        shared.queue.push(item);
        shared.notify.notify_one();
    }
}
impl<T: Send> Drop for NotifyQueueSender<T> {
    /// Marks the queue as closed and wakes any task currently waiting on it.
    fn drop(&mut self) {
        let shared = &*self.0;
        shared.closed.store(true, Ordering::Release);
        shared.notify.notify_waiters();
    }
}
impl<T: Send> NotifyQueueReceiver<T> {
    /// Waits for the next queued item.
    ///
    /// Returns `Some(item)` while items are available and `None` once the
    /// sender has been dropped and the queue has been drained.
    pub async fn recv(&mut self) -> Option<T> {
        loop {
            // Fast path: an item is already queued, no waiter registration needed.
            if let Some(item) = self.0.queue.pop() {
                return Some(item);
            }
            // Register for wake-ups BEFORE looking at `closed`. The sender's
            // drop uses `notify_waiters()`, which stores no permit: if the
            // flag were checked before this future exists, a drop landing in
            // between would never wake us and `recv` would wait forever.
            let mut notified = std::pin::pin!(self.0.notify.notified());
            notified.as_mut().enable();
            if let Some(item) = self.0.queue.pop() {
                return Some(item);
            }
            if self.0.closed.load(Ordering::Acquire) {
                // Every push happened before the close, so one final pop
                // either yields a late item or confirms the queue is drained.
                return self.0.queue.pop();
            }
            notified.await;
        }
    }
}
/// Creates the two halves of an empty, open, unbounded notify queue (tokio backend).
pub fn create_notify_queue<T: Send>() -> (NotifyQueueSender<T>, NotifyQueueReceiver<T>) {
    let shared = Arc::new(NotifyQueueState {
        queue: SegQueue::new(),
        notify: Notify::new(),
        closed: AtomicBool::new(false),
    });
    let producer = NotifyQueueSender(Arc::clone(&shared));
    let consumer = NotifyQueueReceiver(shared);
    (producer, consumer)
}
/// State shared between the two halves of the tokio-backed bounded notify queue.
#[cfg(feature = "server")]
struct BoundedNotifyQueueState<T> {
/// Fixed-capacity queue holding the pushed items.
queue: ArrayQueue<T>,
/// Wakes the receiver when an item is pushed or the sender is dropped.
notify: Notify,
/// Set to `true` when the sender is dropped.
closed: AtomicBool,
}
/// Producer half of the bounded notify queue (tokio backend); wraps the shared state.
#[cfg(feature = "server")]
pub struct BoundedNotifyQueueSender<T: Send>(Arc<BoundedNotifyQueueState<T>>);
/// Consumer half of the bounded notify queue (tokio backend); wraps the shared state.
#[cfg(feature = "server")]
pub struct BoundedNotifyQueueReceiver<T: Send>(Arc<BoundedNotifyQueueState<T>>);
#[cfg(feature = "server")]
impl<T: Send> BoundedNotifyQueueSender<T> {
    /// Tries to enqueue `item`.
    ///
    /// When the queue is full the item is dropped and a debug message is
    /// logged; otherwise the receiver is signalled with `notify_one`.
    pub fn push(&self, item: T) {
        match self.0.queue.push(item) {
            Ok(()) => self.0.notify.notify_one(),
            Err(rejected) => {
                // Release the rejected item before logging.
                drop(rejected);
                debug!("BoundedNotifyQueue: queue full, dropping item");
            }
        }
    }
}
#[cfg(feature = "server")]
impl<T: Send> Drop for BoundedNotifyQueueSender<T> {
    /// Marks the queue as closed and wakes any task currently waiting on it.
    fn drop(&mut self) {
        let shared = &*self.0;
        shared.closed.store(true, Ordering::Release);
        shared.notify.notify_waiters();
    }
}
#[cfg(feature = "server")]
impl<T: Send> BoundedNotifyQueueReceiver<T> {
    /// Waits for the next queued item.
    ///
    /// Returns `Some(item)` while items are available and `None` once the
    /// sender has been dropped and the queue has been drained.
    pub async fn recv(&mut self) -> Option<T> {
        loop {
            // Fast path: an item is already queued, no waiter registration needed.
            if let Some(item) = self.0.queue.pop() {
                return Some(item);
            }
            // Register for wake-ups BEFORE looking at `closed`. The sender's
            // drop uses `notify_waiters()`, which stores no permit: if the
            // flag were checked before this future exists, a drop landing in
            // between would never wake us and `recv` would wait forever.
            let mut notified = std::pin::pin!(self.0.notify.notified());
            notified.as_mut().enable();
            if let Some(item) = self.0.queue.pop() {
                return Some(item);
            }
            if self.0.closed.load(Ordering::Acquire) {
                // Every push happened before the close, so one final pop
                // either yields a late item or confirms the queue is drained.
                return self.0.queue.pop();
            }
            notified.await;
        }
    }
}
/// Creates the two halves of an empty, open notify queue that holds at most
/// `cap` items (tokio backend).
#[cfg(feature = "server")]
pub fn create_bounded_notify_queue<T: Send>(cap: usize) -> (BoundedNotifyQueueSender<T>, BoundedNotifyQueueReceiver<T>) {
    let shared = Arc::new(BoundedNotifyQueueState {
        queue: ArrayQueue::new(cap),
        notify: Notify::new(),
        closed: AtomicBool::new(false),
    });
    let producer = BoundedNotifyQueueSender(Arc::clone(&shared));
    let consumer = BoundedNotifyQueueReceiver(shared);
    (producer, consumer)
}
} else if #[cfg(feature = "async-std")] {
/// Producer half of the unbounded notify queue (async-std backend); wraps a channel `Sender`.
pub struct NotifyQueueSender<T: Send>(Sender<T>);
/// Consumer half of the unbounded notify queue (async-std backend); wraps a channel `Receiver`.
pub struct NotifyQueueReceiver<T: Send>(Receiver<T>);
impl<T: Send> NotifyQueueSender<T> {
    /// Attempts to enqueue `item` without waiting; a failed send is silently
    /// ignored and the item is dropped.
    pub fn push(&self, item: T) {
        let outcome = self.0.try_send(item);
        // Best effort: discard the result (and any returned item) right away.
        drop(outcome);
    }
}
impl<T: Send> NotifyQueueReceiver<T> {
    /// Waits for the next item; a receive error from the channel is mapped to `None`.
    pub async fn recv(&mut self) -> Option<T> {
        match self.0.recv().await {
            Ok(item) => Some(item),
            Err(_) => None,
        }
    }
}
/// Creates the two halves of an unbounded notify queue (async-std backend).
pub fn create_notify_queue<T: Send>() -> (NotifyQueueSender<T>, NotifyQueueReceiver<T>) {
    let (channel_tx, channel_rx) = unbounded();
    let producer = NotifyQueueSender(channel_tx);
    let consumer = NotifyQueueReceiver(channel_rx);
    (producer, consumer)
}
/// Producer half of the bounded notify queue (async-std backend); wraps a channel `Sender`.
#[cfg(feature = "server")]
pub struct BoundedNotifyQueueSender<T: Send>(Sender<T>);
/// Consumer half of the bounded notify queue (async-std backend); wraps a channel `Receiver`.
#[cfg(feature = "server")]
pub struct BoundedNotifyQueueReceiver<T: Send>(Receiver<T>);
#[cfg(feature = "server")]
impl<T: Send> BoundedNotifyQueueSender<T> {
    /// Attempts to enqueue `item` without waiting; on any send failure the
    /// item is dropped and a debug message is logged.
    pub fn push(&self, item: T) {
        let rejected = self.0.try_send(item).is_err();
        if rejected {
            debug!("BoundedNotifyQueue: queue full, dropping item");
        }
    }
}
#[cfg(feature = "server")]
impl<T: Send> BoundedNotifyQueueReceiver<T> {
    /// Waits for the next item; a receive error from the channel is mapped to `None`.
    pub async fn recv(&mut self) -> Option<T> {
        match self.0.recv().await {
            Ok(item) => Some(item),
            Err(_) => None,
        }
    }
}
/// Creates the two halves of a notify queue backed by a channel of capacity
/// `cap` (async-std backend).
#[cfg(feature = "server")]
pub fn create_bounded_notify_queue<T: Send>(cap: usize) -> (BoundedNotifyQueueSender<T>, BoundedNotifyQueueReceiver<T>) {
    let (channel_tx, channel_rx) = bounded(cap);
    let producer = BoundedNotifyQueueSender(channel_tx);
    let consumer = BoundedNotifyQueueReceiver(channel_rx);
    (producer, consumer)
}
}
}
/// Collection of boxed, pinned `Send` futures (bounded by lifetime `'f`) that all yield `T`,
/// stored in a `FuturesUnordered`.
#[cfg(feature = "client")]
pub struct FuturePool<'f, T> {
/// The futures currently held by the pool.
tasks: FuturesUnordered<Pin<Box<dyn Future<Output = T> + Send + 'f>>>,
}
#[cfg(feature = "client")]
impl<'f, T> FuturePool<'f, T> {
    /// Creates a pool that holds no futures.
    pub fn new() -> Self {
        let tasks = FuturesUnordered::new();
        Self { tasks }
    }

    /// Boxes and pins `future`, then adds it to the pool.
    pub fn add<F: Future<Output = T> + Send + 'f>(&mut self, future: F) {
        let boxed: Pin<Box<dyn Future<Output = T> + Send + 'f>> = Box::pin(future);
        self.tasks.push(boxed);
    }

    /// Awaits the next item of the underlying `FuturesUnordered` stream and
    /// returns it (`None` when the stream yields nothing more).
    pub async fn next(&mut self) -> Option<T> {
        StreamExt::next(&mut self.tasks).await
    }
}
/// Suspends the current task for `duration` using tokio's timer.
#[cfg(feature = "tokio")]
pub async fn sleep(duration: Duration) {
    let delay = tokio_sleep(duration);
    delay.await;
}
/// Suspends the current task for `duration` using an `async_io::Timer`.
#[cfg(feature = "async-std")]
pub async fn sleep(duration: Duration) {
    // The value the timer resolves to is not needed.
    let _ = Timer::after(duration).await;
}