use std::future::IntoFuture;
use std::pin::Pin;
use crate::error::{Error, Result};
#[cfg(test)]
use crate::sequencing::creation_channel;
use crate::sequencing::{await_created_id, CreationState};
struct Handle<T> {
id: String,
created: tokio::sync::watch::Receiver<CreationState>,
receiver: tokio::sync::oneshot::Receiver<Result<T>>,
}
impl<T> Handle<T> {
async fn id(&self) -> Result<String> {
await_created_id(&self.id, &self.created).await
}
async fn recv(self) -> Result<T> {
self.receiver
.await
.map_err(|_| Error::JoinError(format!("task {} was dropped", self.id)))?
}
}
pub struct DurableFuture<T>(Handle<T>);
impl<T> DurableFuture<T> {
pub(crate) fn pending(
id: String,
receiver: tokio::sync::oneshot::Receiver<Result<T>>,
created: tokio::sync::watch::Receiver<CreationState>,
) -> Self {
Self(Handle {
id,
created,
receiver,
})
}
pub async fn id(&self) -> Result<String> {
self.0.id().await
}
}
impl<T: Send + 'static> IntoFuture for DurableFuture<T> {
type Output = Result<T>;
type IntoFuture = Pin<Box<dyn std::future::Future<Output = Result<T>> + Send>>;
fn into_future(self) -> Self::IntoFuture {
Box::pin(async move {
tracing::info!(
target: "resonate::validation",
promise_id = %self.0.id,
"promise_execution_await"
);
self.0.recv().await
})
}
}
pub struct RemoteFuture<T>(Handle<T>);
impl<T> RemoteFuture<T> {
pub(crate) fn pending(
id: String,
receiver: tokio::sync::oneshot::Receiver<Result<T>>,
created: tokio::sync::watch::Receiver<CreationState>,
) -> Self {
Self(Handle {
id,
created,
receiver,
})
}
pub async fn id(&self) -> Result<String> {
self.0.id().await
}
}
impl<T: Send + 'static> IntoFuture for RemoteFuture<T> {
type Output = Result<T>;
type IntoFuture = Pin<Box<dyn std::future::Future<Output = Result<T>> + Send>>;
fn into_future(self) -> Self::IntoFuture {
Box::pin(async move { self.0.recv().await })
}
}
pub struct DetachedHandle {
id: String,
created: tokio::sync::watch::Receiver<CreationState>,
}
impl DetachedHandle {
pub(crate) fn pending(
id: String,
created: tokio::sync::watch::Receiver<CreationState>,
) -> Self {
Self { id, created }
}
pub async fn id(&self) -> Result<String> {
await_created_id(&self.id, &self.created).await
}
}
#[cfg(test)]
mod tests {
use super::*;
fn settled_creation() -> tokio::sync::watch::Receiver<CreationState> {
let (tx, rx) = creation_channel();
let _ = tx.send(CreationState::Created);
rx
}
#[tokio::test]
async fn durable_future_pending_resolves_via_await() {
let (tx, rx) = tokio::sync::oneshot::channel();
let future: DurableFuture<String> =
DurableFuture::pending("test-id".into(), rx, settled_creation());
tx.send(Ok("hello".to_string())).unwrap();
let result: String = future.await.unwrap();
assert_eq!(result, "hello");
}
#[tokio::test]
async fn durable_future_pending_error_via_await() {
let (tx, rx) = tokio::sync::oneshot::channel();
let future: DurableFuture<i32> =
DurableFuture::pending("test-id".into(), rx, settled_creation());
tx.send(Err(Error::Application {
message: "task failed".into(),
}))
.unwrap();
let err = future.await.unwrap_err();
assert!(matches!(err, Error::Application { .. }));
}
#[tokio::test]
async fn durable_future_pending_suspended_via_await() {
let (tx, rx) = tokio::sync::oneshot::channel();
let future: DurableFuture<i32> =
DurableFuture::pending("test-id".into(), rx, settled_creation());
tx.send(Err(Error::Suspended)).unwrap();
let err = future.await.unwrap_err();
assert!(matches!(err, Error::Suspended));
}
#[tokio::test]
async fn remote_future_completed_via_await() {
let (tx, rx) = tokio::sync::oneshot::channel();
let future: RemoteFuture<String> =
RemoteFuture::pending("test-id".into(), rx, settled_creation());
tx.send(Ok("remote-value".to_string())).unwrap();
let result: String = future.await.unwrap();
assert_eq!(result, "remote-value");
}
#[tokio::test]
async fn remote_future_failed_via_await() {
let (tx, rx) = tokio::sync::oneshot::channel();
let future: RemoteFuture<i32> =
RemoteFuture::pending("test-id".into(), rx, settled_creation());
tx.send(Err(Error::Application {
message: "remote error".into(),
}))
.unwrap();
let err = future.await.unwrap_err();
assert!(matches!(err, Error::Application { .. }));
}
#[tokio::test]
async fn remote_future_pending_returns_suspended_via_await() {
let (tx, rx) = tokio::sync::oneshot::channel();
let future: RemoteFuture<i32> =
RemoteFuture::pending("test-id".into(), rx, settled_creation());
tx.send(Err(Error::Suspended)).unwrap();
let err = future.await.unwrap_err();
assert!(matches!(err, Error::Suspended));
}
#[tokio::test]
async fn id_returns_after_creation_succeeds() {
let (created_tx, created_rx) = creation_channel();
let (_result_tx, result_rx) = tokio::sync::oneshot::channel::<Result<i32>>();
let future: RemoteFuture<i32> = RemoteFuture::pending("p.1".into(), result_rx, created_rx);
created_tx.send(CreationState::Created).unwrap();
assert_eq!(future.id().await.unwrap(), "p.1");
assert_eq!(future.id().await.unwrap(), "p.1");
}
#[tokio::test]
async fn id_fails_when_creation_failed() {
let (created_tx, created_rx) = creation_channel();
let (_result_tx, result_rx) = tokio::sync::oneshot::channel::<Result<i32>>();
let future: RemoteFuture<i32> = RemoteFuture::pending("p.1".into(), result_rx, created_rx);
created_tx
.send(CreationState::Failed("boom".into()))
.unwrap();
let err = future.id().await.unwrap_err();
assert!(matches!(err, Error::PromiseCreation(_)), "got: {err}");
}
}