use crate::eval::Value;
use crate::diagnostics::{Error, Result};
use super::ConcurrencyError;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use tokio::time::{sleep, timeout};
use futures::future::{BoxFuture, FutureExt};
#[derive(Debug, Clone)]
pub struct Future {
inner: Arc<Mutex<FutureState>>,
}
enum FutureState {
Pending(BoxFuture<'static, Result<Value>>),
Resolved(Value),
Rejected(Box<Error>),
}
impl std::fmt::Debug for FutureState {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
FutureState::Pending(_) => f.debug_tuple("Pending").field(&"<future>").finish(),
FutureState::Resolved(value) => f.debug_tuple("Resolved").field(value).finish(),
FutureState::Rejected(error) => f.debug_tuple("Rejected").field(error).finish(),
}
}
}
impl Future {
pub fn new<F>(future: F) -> Self
where
F: std::future::Future<Output = Result<Value>> + Send + 'static,
{
Self {
inner: Arc::new(Mutex::new(FutureState::Pending(future.boxed()))),
}
}
pub fn resolved(value: Value) -> Self {
Self {
inner: Arc::new(Mutex::new(FutureState::Resolved(value))),
}
}
pub fn rejected(error: Error) -> Self {
Self {
inner: Arc::new(Mutex::new(FutureState::Rejected(Box::new(error)))),
}
}
pub fn from_promise(promise: Promise) -> Self {
promise.future
}
pub fn is_completed(&self) -> bool {
let state = self.inner.lock().unwrap();
matches!(*state, FutureState::Resolved(_) | FutureState::Rejected(_))
}
pub fn is_resolved(&self) -> bool {
let state = self.inner.lock().unwrap();
matches!(*state, FutureState::Resolved(_))
}
pub fn is_rejected(&self) -> bool {
let state = self.inner.lock().unwrap();
matches!(*state, FutureState::Rejected(_))
}
pub async fn await_result(&self) -> Result<Value> {
{
let mut state = self.inner.lock().unwrap();
match &mut *state {
FutureState::Resolved(value) => return Ok(value.clone()),
FutureState::Rejected(error) => return Err(error.clone()),
FutureState::Pending(_) => {}
}
}
let mut future_opt = None;
{
let mut state = self.inner.lock().unwrap();
if let FutureState::Pending(future) = &mut *state {
future_opt = Some(std::mem::replace(future, futures::future::pending().boxed()));
}
}
if let Some(future) = future_opt {
let result = future.await;
{
let mut state = self.inner.lock().unwrap();
*state = match &result {
Ok(value) => FutureState::Resolved(value.clone()),
Err(error) => FutureState::Rejected(error.clone()),
};
}
result
} else {
let state = self.inner.lock().unwrap();
match &*state {
FutureState::Resolved(value) => Ok(value.clone()),
FutureState::Rejected(error) => Err(error.clone()),
FutureState::Pending(_) => unreachable!(),
}
}
}
pub async fn await_timeout(&self, duration: Duration) -> Result<Value> {
match timeout(duration, self.await_result()).await {
Ok(result) => result,
Err(_) => Err(ConcurrencyError::Timeout.into()),
}
}
pub fn map<F>(self, f: F) -> Future
where
F: FnOnce(Value) -> Result<Value> + Send + 'static,
{
let future = async move {
let value = self.await_result().await?;
f(value)
};
Future::new(future)
}
pub fn flat_map<F>(self, f: F) -> Future
where
F: FnOnce(Value) -> Future + Send + 'static,
{
let future = async move {
let value = self.await_result().await?;
f(value).await_result().await
};
Future::new(future)
}
pub fn then<F>(self, f: F) -> Future
where
F: FnOnce(Result<Value>) -> Future + Send + 'static,
{
let future = async move {
let result = self.await_result().await;
f(result).await_result().await
};
Future::new(future)
}
pub fn catch<F>(self, f: F) -> Future
where
F: FnOnce(Error) -> Result<Value> + Send + 'static,
{
let future = async move {
match self.await_result().await {
Ok(value) => Ok(value),
Err(error) => f(*error),
}
};
Future::new(future)
}
}
#[derive(Debug)]
pub struct Promise {
sender: Option<tokio::sync::oneshot::Sender<Result<Value>>>,
future: Future,
}
impl Promise {
pub fn new() -> Self {
let (sender, receiver) = tokio::sync::oneshot::channel();
let future = Future::new(async move {
match receiver.await {
Ok(result) => result,
Err(_) => Err(ConcurrencyError::Cancelled.into()),
}
});
Self {
sender: Some(sender),
future,
}
}
pub fn future(&self) -> Future {
self.future.clone()
}
pub fn resolve(mut self, value: Value) -> Result<()> {
if let Some(sender) = self.sender.take() {
sender.send(Ok(value)).map_err(|_| ConcurrencyError::Cancelled.into())
} else {
Err(Error::runtime_error("Promise already completed".to_string(), None).into())
}
}
pub fn reject(mut self, error: Error) -> Result<()> {
if let Some(sender) = self.sender.take() {
sender.send(Err(error.into())).map_err(|_| ConcurrencyError::Cancelled.into())
} else {
Err(Error::runtime_error("Promise already completed".to_string(), None).into())
}
}
pub fn is_pending(&self) -> bool {
self.sender.is_some()
}
}
impl Default for Promise {
fn default() -> Self {
Self::new()
}
}
pub struct FutureOps;
impl FutureOps {
pub fn delay(duration: Duration) -> Future {
Future::new(async move {
sleep(duration).await;
Ok(Value::Unspecified)
})
}
pub fn delay_value(duration: Duration, value: Value) -> Future {
Future::new(async move {
sleep(duration).await;
Ok(value)
})
}
pub fn race(futures: Vec<Future>) -> Future {
if futures.is_empty() {
return Future::rejected(Error::runtime_error("No futures to race".to_string(), None));
}
Future::new(async move {
let futures: Vec<_> = futures.into_iter()
.map(|f| Box::pin(async move { f.await_result().await }))
.collect();
futures::future::select_all(futures).await.0
})
}
pub fn all(futures: Vec<Future>) -> Future {
Future::new(async move {
let mut results = Vec::new();
for future in futures {
results.push(future.await_result().await?);
}
let mut list = Value::Nil;
for value in results.into_iter().rev() {
list = Value::pair(value, list);
}
Ok(list)
})
}
pub fn all_settled(futures: Vec<Future>) -> Future {
Future::new(async move {
let mut results = Vec::new();
for future in futures {
match future.await_result().await {
Ok(value) => {
let result = vec![
Value::symbol_from_str("fulfilled"),
value,
];
results.push(Value::from_vec(result));
}
Err(error) => {
let result = vec![
Value::symbol_from_str("rejected"),
Value::string(error.to_string()),
];
results.push(Value::from_vec(result));
}
}
}
let mut list = Value::Nil;
for value in results.into_iter().rev() {
list = Value::pair(value, list);
}
Ok(list)
})
}
pub fn retry<F, Fut>(f: F, max_attempts: usize, initial_delay: Duration) -> Future
where
F: Fn() -> Fut + Send + 'static,
Fut: std::future::Future<Output = Result<Value>> + Send + 'static,
{
Future::new(async move {
let mut attempt = 0;
let mut delay = initial_delay;
loop {
attempt += 1;
match f().await {
Ok(value) => return Ok(value),
Err(error) => {
if attempt >= max_attempts {
return Err(error);
}
sleep(delay).await;
delay *= 2; }
}
}
})
}
}
pub trait ValueFutureExt {
fn to_future(self) -> Future;
}
impl ValueFutureExt for Value {
fn to_future(self) -> Future {
Future::resolved(self)
}
}
impl ValueFutureExt for Result<Value> {
fn to_future(self) -> Future {
match self {
Ok(value) => Future::resolved(value),
Err(error) => Future::rejected(*error),
}
}
}
pub trait IntoFuture<T> {
fn into_future(self) -> Future;
}
impl<F, Fut> IntoFuture<F> for F
where
F: FnOnce() -> Fut + Send + 'static,
Fut: std::future::Future<Output = Result<Value>> + Send + 'static,
{
fn into_future(self) -> Future {
Future::new(self())
}
}