use std::fmt::Debug;
use std::sync::{Arc, Condvar, Mutex};
use std::time::Duration;
use crate::error::IonError;
use crate::value::Value;
pub trait TaskHandle: Send + Sync + Debug {
fn join(&self) -> Result<Value, IonError>;
fn join_timeout(&self, timeout: Duration) -> Option<Result<Value, IonError>>;
fn is_finished(&self) -> bool;
fn cancel(&self);
fn is_cancelled(&self) -> bool;
fn subscribe(&self, sub: Subscriber);
}
#[derive(Debug)]
pub struct Subscriber {
pub rendezvous: Arc<(Mutex<Option<usize>>, Condvar)>,
pub my_index: usize,
}
pub trait ChannelSender: Send + Sync + Debug {
fn send(&self, val: Value) -> Result<(), IonError>;
fn close(&self);
}
pub trait ChannelReceiver: Send + Sync + Debug {
fn recv(&self) -> Option<Value>;
fn try_recv(&self) -> Option<Value>;
fn recv_timeout(&self, timeout: Duration) -> Option<Value>;
}
#[derive(Debug, Clone)]
pub enum ChannelEnd {
Sender(Arc<dyn ChannelSender>),
Receiver(Arc<dyn ChannelReceiver>),
}
#[derive(Debug)]
pub struct Nursery {
tasks: Vec<Arc<dyn TaskHandle>>,
}
impl Default for Nursery {
fn default() -> Self {
Self::new()
}
}
impl Nursery {
pub fn new() -> Self {
Self { tasks: Vec::new() }
}
pub fn spawn(&mut self, handle: Arc<dyn TaskHandle>) {
self.tasks.push(handle);
}
pub fn join_all(&self) -> Result<Vec<Value>, IonError> {
let mut results = Vec::new();
for task in &self.tasks {
results.push(task.join()?);
}
Ok(results)
}
}
use std::sync::atomic::AtomicBool;
pub fn spawn_task<F>(f: F) -> Arc<dyn TaskHandle>
where
F: FnOnce() -> Result<Value, IonError> + Send + 'static,
{
let flag = Arc::new(AtomicBool::new(false));
spawn_task_with_cancel(flag, move |_| f())
}
pub fn spawn_task_with_cancel<F>(cancel: Arc<AtomicBool>, f: F) -> Arc<dyn TaskHandle>
where
F: FnOnce(Arc<AtomicBool>) -> Result<Value, IonError> + Send + 'static,
{
crate::async_rt_std::spawn_task_with_cancel(cancel, f)
}
pub fn sleep(duration: Duration) {
std::thread::sleep(duration);
}
pub fn wait_any(tasks: &[Arc<dyn TaskHandle>]) -> (usize, Result<Value, IonError>) {
let rendezvous: Arc<(Mutex<Option<usize>>, Condvar)> =
Arc::new((Mutex::new(None), Condvar::new()));
for (i, task) in tasks.iter().enumerate() {
task.subscribe(Subscriber {
rendezvous: rendezvous.clone(),
my_index: i,
});
}
let (mtx, cv) = &*rendezvous;
let mut guard = mtx.lock().unwrap();
while guard.is_none() {
guard = cv.wait(guard).unwrap();
}
let winner = guard.expect("rendezvous set but reads as None");
drop(guard);
(winner, tasks[winner].join())
}
pub fn create_channel(buffer: usize) -> (Value, Value) {
crate::async_rt_std::create_channel(buffer)
}