use parking_lot::Mutex;
use serde::de::DeserializeOwned;
use serde::Serialize;
use std::collections::HashMap;
use std::future::{Future, IntoFuture};
use std::marker::PhantomData;
use std::pin::Pin;
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;
use std::time::Duration;
use crate::codec::deserialize_error;
use crate::durable::{Durable, ExecutionEnv};
use crate::effects::Effects;
use crate::error::{Error, Result};
use crate::futures::{DetachedHandle, DurableFuture, RemoteFuture};
use crate::info::Info;
use crate::sequencing::{CreationSequencer, CreationState};
use crate::types::{
DurableKind, Outcome, PromiseCreateReq, PromiseRecord, PromiseState, TaskData, Value,
};
pub type TargetResolver = Arc<dyn Fn(Option<&str>) -> String + Send + Sync>;
type TaskFuture<'ctx, T> = Pin<Box<dyn Future<Output = Result<T>> + Send + 'ctx>>;
pub(crate) struct SpawnedHandle {
pub id: String,
pub handle: tokio::task::JoinHandle<Outcome<()>>,
}
impl Drop for SpawnedHandle {
fn drop(&mut self) {
self.handle.abort();
}
}
pub struct Context {
id: String,
origin_id: String,
branch_id: String,
parent_id: String,
func_name: String,
timeout_at: i64,
seq: AtomicU32,
effects: Arc<Effects>,
target_resolver: TargetResolver,
remote_todos: Arc<Mutex<Vec<String>>>,
spawned_handles: Arc<Mutex<Vec<SpawnedHandle>>>,
sequencer: CreationSequencer,
deps: Arc<crate::DependencyMap>,
}
impl Context {
pub(crate) fn root(
id: String,
timeout_at: i64,
func_name: String,
effects: Effects,
target_resolver: TargetResolver,
deps: Arc<crate::DependencyMap>,
) -> Self {
Self {
origin_id: id.clone(),
branch_id: id.clone(),
parent_id: String::new(),
id,
func_name,
timeout_at,
seq: AtomicU32::new(0),
effects: Arc::new(effects),
target_resolver,
remote_todos: Arc::new(Mutex::new(Vec::new())),
spawned_handles: Arc::new(Mutex::new(Vec::new())),
sequencer: CreationSequencer::new(),
deps,
}
}
fn child(&self, id: &str, func_name: &str, timeout_at: i64) -> Context {
self.child_seed().context(id, func_name, timeout_at)
}
fn child_info(&self, id: &str, func_name: &str, timeout_at: i64) -> Info {
self.child_seed().info(id, func_name, timeout_at)
}
pub fn get_dependency<T: Send + Sync + 'static>(&self) -> Arc<T> {
self.deps.get::<T>()
}
fn next_id(&self) -> String {
let seq = self.seq.fetch_add(1, Ordering::Relaxed);
format!("{}.{}", self.id, seq)
}
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(86_400);
fn child_timeout(&self, requested: Option<Duration>) -> i64 {
let timeout = requested.unwrap_or(Self::DEFAULT_TIMEOUT);
let now = now_ms();
let child_deadline = now.saturating_add(timeout.as_millis() as i64);
std::cmp::min(child_deadline, self.timeout_at)
}
pub fn info(&self) -> Info {
Info::new(
self.id.clone(),
self.parent_id.clone(),
self.origin_id.clone(),
self.branch_id.clone(),
self.timeout_at,
self.func_name.clone(),
HashMap::new(),
self.deps.clone(),
)
}
pub fn id(&self) -> &str {
&self.id
}
pub fn parent_id(&self) -> &str {
&self.parent_id
}
pub fn origin_id(&self) -> &str {
&self.origin_id
}
pub fn timeout_at(&self) -> i64 {
self.timeout_at
}
pub fn func_name(&self) -> &str {
&self.func_name
}
fn child_tags(&self, scope: &str, branch: &str) -> HashMap<String, String> {
HashMap::from([
("resonate:scope".to_string(), scope.to_string()),
("resonate:branch".to_string(), branch.to_string()),
("resonate:parent".to_string(), self.id.clone()),
("resonate:origin".to_string(), self.origin_id.clone()),
])
}
fn local_create_req(
&self,
id: &str,
args: &impl Serialize,
timeout: Option<Duration>,
) -> Result<PromiseCreateReq> {
let tags = self.child_tags("local", &self.branch_id);
Ok(PromiseCreateReq {
id: id.to_string(),
timeout_at: self.child_timeout(timeout),
param: Value::from_serializable(args)?,
tags,
})
}
fn remote_create_req(
&self,
id: &str,
func_name: &str,
args: &impl Serialize,
timeout: Option<Duration>,
target_override: Option<&str>,
) -> Result<PromiseCreateReq> {
let target = (self.target_resolver)(target_override);
let mut tags = self.child_tags("global", id);
tags.insert("resonate:target".to_string(), target);
Ok(PromiseCreateReq {
id: id.to_string(),
timeout_at: self.child_timeout(timeout),
param: TaskData::into_value(func_name, args)?,
tags,
})
}
pub fn run<D, Args, T>(&self, func: D, args: Args) -> RunTask<'_, D, Args, T>
where
D: Durable<Args, T>,
Args: Serialize,
{
let child_id = self.next_id();
RunTask {
child_id,
ctx: self,
func,
args,
timeout_override: None,
_phantom: PhantomData,
}
}
fn promise_create_req(&self, id: &str, timeout: Option<Duration>) -> PromiseCreateReq {
PromiseCreateReq {
id: id.to_string(),
timeout_at: self.child_timeout(timeout),
param: Value::default(),
tags: self.child_tags("global", id),
}
}
fn sleep_create_req(&self, id: &str, duration: Duration) -> PromiseCreateReq {
let mut tags = self.child_tags("global", id);
tags.insert("resonate:timer".to_string(), "true".to_string());
PromiseCreateReq {
id: id.to_string(),
timeout_at: self.child_timeout(Some(duration)),
param: Value::default(),
tags,
}
}
pub fn sleep(&self, duration: Duration) -> SleepTask<'_> {
let child_id = self.next_id();
let req = self.sleep_create_req(&child_id, duration);
SleepTask {
child_id,
ctx: self,
req,
}
}
pub fn promise<T>(&self) -> PromiseTask<'_, T> {
let child_id = self.next_id();
let req = self.promise_create_req(&child_id, None);
PromiseTask {
child_id,
ctx: self,
req,
_phantom: PhantomData,
}
}
pub fn rpc<T>(&self, func: &str, args: impl Serialize) -> RpcTask<'_, T> {
let child_id = self.next_id();
let (req, serialization_error) =
match self.remote_create_req(&child_id, func, &args, None, None) {
Ok(req) => (req, None),
Err(e) => (
PromiseCreateReq::default_with_id(&child_id),
Some(e.to_string()),
),
};
RpcTask {
child_id,
ctx: self,
req,
serialization_error,
_phantom: PhantomData,
}
}
pub fn detached(&self, func: &str, args: impl Serialize) -> DetachedTask<'_> {
let raw = self.next_id();
let child_id = format!("{}.{}", self.origin_id, hash_id(&raw));
let (req, serialization_error) =
match self.remote_create_req(&child_id, func, &args, None, None) {
Ok(req) => (req, None),
Err(e) => (
PromiseCreateReq::default_with_id(&child_id),
Some(e.to_string()),
),
};
DetachedTask {
child_id,
ctx: self,
req,
serialization_error,
}
}
pub(crate) fn take_remote_todos(&self) -> Vec<String> {
let mut todos = self.remote_todos.lock();
std::mem::take(&mut *todos)
}
fn check_serialization_error(err: &Option<String>) -> Result<()> {
if let Some(ref e) = err {
return Err(Error::EncodingError(format!(
"failed to serialize args: {}",
e
)));
}
Ok(())
}
pub(crate) async fn flush_local_work(&self) -> Result<Vec<String>> {
let handles = {
let mut handles = self.spawned_handles.lock();
std::mem::take(&mut *handles)
};
let mut remote_todos = Vec::new();
let mut first_err: Option<Error> = None;
for mut task in handles {
match (&mut task.handle).await {
Ok(Outcome::Done(Ok(_))) => {
}
Ok(Outcome::Done(Err(e))) => {
tracing::error!(task_id = %task.id, error = %e, "spawned task failed");
if first_err.is_none() {
first_err = Some(e);
}
}
Ok(Outcome::Suspended {
remote_todos: child_remote,
}) => {
remote_todos.extend(child_remote);
}
Err(e) => {
tracing::error!(task_id = %task.id, error = %e, "spawned task panicked");
}
}
}
match first_err {
Some(e) => Err(e),
None => Ok(remote_todos),
}
}
pub(crate) async fn collect_remote_todos(&self) -> Result<Vec<String>> {
let spawned_todos = self.flush_local_work().await;
let mut todos = self.take_remote_todos();
todos.extend(spawned_todos?);
Ok(todos)
}
fn child_seed(&self) -> ChildSeed {
ChildSeed {
parent_id: self.id.clone(),
origin_id: self.origin_id.clone(),
effects: Arc::clone(&self.effects),
target_resolver: self.target_resolver.clone(),
deps: Arc::clone(&self.deps),
}
}
}
struct ChildSeed {
parent_id: String,
origin_id: String,
effects: Arc<Effects>,
target_resolver: TargetResolver,
deps: Arc<crate::DependencyMap>,
}
impl ChildSeed {
fn context(&self, id: &str, func_name: &str, timeout_at: i64) -> Context {
Context {
id: id.to_string(),
origin_id: self.origin_id.clone(),
branch_id: id.to_string(),
parent_id: self.parent_id.clone(),
func_name: func_name.to_string(),
timeout_at,
seq: AtomicU32::new(0),
effects: Arc::clone(&self.effects),
target_resolver: self.target_resolver.clone(),
remote_todos: Arc::new(Mutex::new(Vec::new())),
spawned_handles: Arc::new(Mutex::new(Vec::new())),
sequencer: CreationSequencer::new(),
deps: Arc::clone(&self.deps),
}
}
fn info(&self, id: &str, func_name: &str, timeout_at: i64) -> Info {
Info::new(
id.to_string(),
self.parent_id.clone(),
self.origin_id.clone(),
id.to_string(),
timeout_at,
func_name.to_string(),
HashMap::new(),
self.deps.clone(),
)
}
}
fn duplicate_error(e: &Error) -> Error {
Error::Application {
message: e.to_string(),
}
}
impl PromiseRecord {
fn as_result<T: DeserializeOwned>(&self) -> Option<Result<T>> {
match self.state {
PromiseState::Resolved => Some(self.value.decode::<T>()),
PromiseState::Rejected
| PromiseState::RejectedCanceled
| PromiseState::RejectedTimedout => {
Some(Err(deserialize_error(self.value.data_or_null())))
}
PromiseState::Pending => None,
}
}
}
pub struct RunTask<'ctx, D, Args, T> {
child_id: String,
ctx: &'ctx Context,
func: D,
args: Args,
timeout_override: Option<Duration>,
_phantom: PhantomData<fn() -> T>,
}
impl<'ctx, D, Args, T> RunTask<'ctx, D, Args, T>
where
Args: Serialize,
{
pub fn timeout(mut self, timeout: Duration) -> Self {
self.timeout_override = Some(timeout);
self
}
pub fn spawn(self) -> Result<DurableFuture<T>>
where
D: Durable<Args, T> + Send + 'static,
Args: Serialize + DeserializeOwned + Send + 'static,
T: Serialize + DeserializeOwned + Send + Sync + 'static,
{
let RunTask {
child_id,
ctx,
func,
args,
timeout_override,
..
} = self;
let req = ctx.local_create_req(&child_id, &args, timeout_override)?;
let effects = Arc::clone(&ctx.effects);
let seed = ctx.child_seed();
let task_id = child_id.clone();
tracing::info!(
target: "resonate::validation",
promise_id = %child_id,
"promise_execution_spawn"
);
let (rx, created_rx) = spawn_sequenced(ctx, child_id.clone(), req, move |record, tx| {
async move {
match record.state {
PromiseState::Resolved => {
return match record.value.into_decoded::<T>() {
Ok(val) => {
let _ = tx.send(Ok(val));
Outcome::Done(Ok(()))
}
Err(e) => {
let _ = tx.send(Err(duplicate_error(&e)));
Outcome::Done(Err(e))
}
};
}
PromiseState::Rejected
| PromiseState::RejectedCanceled
| PromiseState::RejectedTimedout => {
let _ = tx.send(Err(deserialize_error(record.value.into_data_or_null())));
return Outcome::Done(Ok(()));
}
PromiseState::Pending => {}
}
let info = seed.info(&task_id, D::NAME, record.timeout_at);
let child_ctx = seed.context(&task_id, D::NAME, record.timeout_at);
let env = match D::KIND {
DurableKind::Function => ExecutionEnv::Function(&info),
DurableKind::Workflow => ExecutionEnv::Workflow(&child_ctx),
};
let result = func.execute(env, args).await;
let mut child_remote = Vec::new();
if D::KIND == DurableKind::Workflow {
match child_ctx.collect_remote_todos().await {
Ok(todos) => child_remote = todos,
Err(e) => {
let _ = tx.send(Err(duplicate_error(&e)));
return Outcome::Done(Err(e));
}
}
}
if matches!(&result, Err(Error::Suspended)) {
debug_assert!(
!child_remote.is_empty(),
"Suspended error but no remote todos — this is a bug"
);
let _ = tx.send(Err(Error::Suspended));
return Outcome::Suspended {
remote_todos: child_remote,
};
}
if child_remote.is_empty() {
let _ = effects.settle_promise(&task_id, &result).await;
let _ = tx.send(result);
Outcome::Done(Ok(()))
} else {
let _ = tx.send(Err(Error::Suspended));
Outcome::Suspended {
remote_todos: child_remote,
}
}
}
});
Ok(DurableFuture::pending(child_id, rx, created_rx))
}
}
impl<'ctx, D, Args, T> IntoFuture for RunTask<'ctx, D, Args, T>
where
D: Durable<Args, T>,
Args: Serialize + DeserializeOwned + Send + 'static,
T: Serialize + DeserializeOwned + Send + Sync + 'static,
{
type Output = Result<T>;
type IntoFuture = TaskFuture<'ctx, T>;
fn into_future(self) -> Self::IntoFuture {
let RunTask {
child_id,
ctx,
func,
args,
timeout_override,
..
} = self;
let req = match ctx.local_create_req(&child_id, &args, timeout_override) {
Ok(req) => req,
Err(e) => return Box::pin(async move { Err(e) }),
};
let slot = ctx.sequencer.claim_slot();
Box::pin(async move {
let promise_record = slot.create(&ctx.effects, req).await?;
if let Some(result) = promise_record.as_result::<T>() {
return result;
}
tracing::info!(
target: "resonate::validation",
promise_id = %child_id,
"promise_execution_spawn"
);
let timeout_at = promise_record.timeout_at;
let info;
let child_ctx;
let env = match D::KIND {
DurableKind::Function => {
info = ctx.child_info(&child_id, D::NAME, timeout_at);
ExecutionEnv::Function(&info)
}
DurableKind::Workflow => {
child_ctx = ctx.child(&child_id, D::NAME, timeout_at);
ExecutionEnv::Workflow(&child_ctx)
}
};
tracing::info!(
target: "resonate::validation",
promise_id = %child_id,
"promise_execution_await"
);
let result = func.execute(env, args).await;
let child_remotes = env.collect_remote_todos().await?;
if child_remotes.is_empty() {
debug_assert!(!matches!(&result, Err(Error::Suspended)));
ctx.effects.settle_promise(&child_id, &result).await?;
} else {
ctx.remote_todos.lock().extend(child_remotes);
}
result
})
}
}
pub struct RpcTask<'ctx, T> {
child_id: String,
ctx: &'ctx Context,
req: PromiseCreateReq,
serialization_error: Option<String>,
_phantom: PhantomData<T>,
}
impl<'ctx, T> RpcTask<'ctx, T> {
pub fn timeout(mut self, timeout: Duration) -> Self {
self.req.timeout_at = self.ctx.child_timeout(Some(timeout));
self
}
pub fn target(mut self, target: &str) -> Self {
let resolved = (self.ctx.target_resolver)(Some(target));
self.req
.tags
.insert("resonate:target".to_string(), resolved);
self
}
pub fn spawn(self) -> Result<RemoteFuture<T>>
where
T: DeserializeOwned + Send + 'static,
{
Context::check_serialization_error(&self.serialization_error)?;
let RpcTask {
child_id, ctx, req, ..
} = self;
Ok(spawn_remote(ctx, child_id, req))
}
}
impl<'ctx, T> IntoFuture for RpcTask<'ctx, T>
where
T: DeserializeOwned + Send + 'static,
{
type Output = Result<T>;
type IntoFuture = TaskFuture<'ctx, T>;
fn into_future(self) -> Self::IntoFuture {
let RpcTask {
child_id,
ctx,
req,
serialization_error,
..
} = self;
if let Err(e) = Context::check_serialization_error(&serialization_error) {
return Box::pin(async move { Err(e) });
}
block_on_remote(ctx, child_id, req)
}
}
fn spawn_sequenced<T, F, Fut>(
ctx: &Context,
child_id: String,
req: PromiseCreateReq,
on_created: F,
) -> (
tokio::sync::oneshot::Receiver<Result<T>>,
tokio::sync::watch::Receiver<CreationState>,
)
where
T: Send + 'static,
F: FnOnce(PromiseRecord, tokio::sync::oneshot::Sender<Result<T>>) -> Fut + Send + 'static,
Fut: Future<Output = Outcome<()>> + Send,
{
let slot = ctx.sequencer.claim_slot();
let created_rx = slot.subscribe();
let (tx, rx) = tokio::sync::oneshot::channel();
let effects = Arc::clone(&ctx.effects);
let handle = tokio::spawn(async move {
match slot.create(&effects, req).await {
Ok(record) => on_created(record, tx).await,
Err(e) => {
let _ = tx.send(Err(duplicate_error(&e)));
Outcome::Done(Err(e))
}
}
});
ctx.spawned_handles.lock().push(SpawnedHandle {
id: child_id,
handle,
});
(rx, created_rx)
}
fn spawn_remote<T>(ctx: &Context, child_id: String, req: PromiseCreateReq) -> RemoteFuture<T>
where
T: DeserializeOwned + Send + 'static,
{
let task_id = child_id.clone();
let (rx, created_rx) = spawn_sequenced(ctx, child_id.clone(), req, move |record, tx| {
async move {
match record.as_result::<T>() {
Some(result) => {
let _ = tx.send(result);
Outcome::Done(Ok(()))
}
None => {
tracing::info!(
target: "resonate::validation",
promise_id = %task_id,
"promise_execution_block"
);
let _ = tx.send(Err(Error::Suspended));
Outcome::Suspended {
remote_todos: vec![task_id],
}
}
}
}
});
RemoteFuture::pending(child_id, rx, created_rx)
}
fn block_on_remote<'ctx, T>(
ctx: &'ctx Context,
child_id: String,
req: PromiseCreateReq,
) -> TaskFuture<'ctx, T>
where
T: DeserializeOwned + Send + 'ctx,
{
let slot = ctx.sequencer.claim_slot();
Box::pin(async move {
let record = slot.create(&ctx.effects, req).await?;
if let Some(result) = record.as_result::<T>() {
return result;
}
tracing::info!(
target: "resonate::validation",
promise_id = %child_id,
"promise_execution_block"
);
ctx.remote_todos.lock().push(child_id);
Err(Error::Suspended)
})
}
pub struct PromiseTask<'ctx, T> {
child_id: String,
ctx: &'ctx Context,
req: PromiseCreateReq,
_phantom: PhantomData<T>,
}
impl<'ctx, T> PromiseTask<'ctx, T> {
pub fn timeout(mut self, timeout: Duration) -> Self {
self.req.timeout_at = self.ctx.child_timeout(Some(timeout));
self
}
pub fn data(mut self, data: &impl Serialize) -> Result<Self> {
self.req.param = Value::from_serializable(data)?;
Ok(self)
}
pub fn create(self) -> Result<RemoteFuture<T>>
where
T: DeserializeOwned + Send + 'static,
{
let PromiseTask {
child_id, ctx, req, ..
} = self;
Ok(spawn_remote(ctx, child_id, req))
}
}
impl<'ctx, T> IntoFuture for PromiseTask<'ctx, T>
where
T: DeserializeOwned + Send + 'static,
{
type Output = Result<T>;
type IntoFuture = TaskFuture<'ctx, T>;
fn into_future(self) -> Self::IntoFuture {
let PromiseTask {
child_id, ctx, req, ..
} = self;
block_on_remote(ctx, child_id, req)
}
}
pub struct SleepTask<'ctx> {
child_id: String,
ctx: &'ctx Context,
req: PromiseCreateReq,
}
impl<'ctx> SleepTask<'ctx> {
pub fn spawn(self) -> Result<RemoteFuture<()>> {
let SleepTask {
child_id, ctx, req, ..
} = self;
Ok(spawn_remote(ctx, child_id, req))
}
}
impl<'ctx> IntoFuture for SleepTask<'ctx> {
type Output = Result<()>;
type IntoFuture = TaskFuture<'ctx, ()>;
fn into_future(self) -> Self::IntoFuture {
let SleepTask {
child_id, ctx, req, ..
} = self;
block_on_remote(ctx, child_id, req)
}
}
use crate::now_ms;
fn hash_id(s: &str) -> String {
use std::hash::Hasher;
let mut h = seahash::SeaHasher::new();
h.write(s.as_bytes());
format!("{:016x}", h.finish())
}
pub struct DetachedTask<'ctx> {
child_id: String,
ctx: &'ctx Context,
req: PromiseCreateReq,
serialization_error: Option<String>,
}
impl<'ctx> DetachedTask<'ctx> {
pub fn timeout(mut self, timeout: Duration) -> Self {
self.req.timeout_at = self.ctx.child_timeout(Some(timeout));
self
}
pub fn target(mut self, target: &str) -> Self {
let resolved = (self.ctx.target_resolver)(Some(target));
self.req
.tags
.insert("resonate:target".to_string(), resolved);
self
}
pub fn spawn(self) -> Result<DetachedHandle> {
Context::check_serialization_error(&self.serialization_error)?;
let DetachedTask {
child_id, ctx, req, ..
} = self;
let task_id = child_id.clone();
let (_rx, created_rx) = spawn_sequenced::<(), _, _>(
ctx,
child_id.clone(),
req,
move |_record, _tx| async move {
tracing::info!(
target: "resonate::validation",
promise_id = %task_id,
"promise_detached_spawn"
);
Outcome::Done(Ok(()))
},
);
Ok(DetachedHandle::pending(child_id, created_rx))
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::durable::{Durable, ExecutionEnv};
#[allow(unused_imports)]
use crate::futures::RemoteFuture;
use crate::test_utils::*;
use crate::types::{DurableKind, Outcome};
struct Bar;
impl Durable<(), i32> for Bar {
const NAME: &'static str = "bar";
const KIND: DurableKind = DurableKind::Function;
async fn execute(&self, _env: ExecutionEnv<'_>, _args: ()) -> crate::error::Result<i32> {
Ok(42)
}
}
struct Baz;
impl Durable<(), i32> for Baz {
const NAME: &'static str = "baz";
const KIND: DurableKind = DurableKind::Function;
async fn execute(&self, _env: ExecutionEnv<'_>, _args: ()) -> crate::error::Result<i32> {
Ok(31416)
}
}
struct Add;
impl Durable<(i32, i32), i32> for Add {
const NAME: &'static str = "add";
const KIND: DurableKind = DurableKind::Function;
async fn execute(
&self,
_env: ExecutionEnv<'_>,
args: (i32, i32),
) -> crate::error::Result<i32> {
Ok(args.0 + args.1)
}
}
struct Double;
impl Durable<i32, i32> for Double {
const NAME: &'static str = "double";
const KIND: DurableKind = DurableKind::Function;
async fn execute(&self, _env: ExecutionEnv<'_>, args: i32) -> crate::error::Result<i32> {
Ok(args * 2)
}
}
struct Square;
impl Durable<i32, i32> for Square {
const NAME: &'static str = "square";
const KIND: DurableKind = DurableKind::Function;
async fn execute(&self, _env: ExecutionEnv<'_>, args: i32) -> crate::error::Result<i32> {
Ok(args * args)
}
}
struct Multiply;
impl Durable<(i32, i32), i32> for Multiply {
const NAME: &'static str = "multiply";
const KIND: DurableKind = DurableKind::Function;
async fn execute(
&self,
_env: ExecutionEnv<'_>,
args: (i32, i32),
) -> crate::error::Result<i32> {
Ok(args.0 * args.1)
}
}
struct Failing;
impl Durable<(), i32> for Failing {
const NAME: &'static str = "failing";
const KIND: DurableKind = DurableKind::Function;
async fn execute(&self, _env: ExecutionEnv<'_>, _args: ()) -> crate::error::Result<i32> {
Err(Error::Application {
message: "boom".to_string(),
})
}
}
struct Noop;
impl Durable<(), ()> for Noop {
const NAME: &'static str = "noop";
const KIND: DurableKind = DurableKind::Function;
async fn execute(&self, _env: ExecutionEnv<'_>, _args: ()) -> crate::error::Result<()> {
Ok(())
}
}
struct Concat;
impl Durable<(String, String, String), String> for Concat {
const NAME: &'static str = "concat";
const KIND: DurableKind = DurableKind::Function;
async fn execute(
&self,
_env: ExecutionEnv<'_>,
args: (String, String, String),
) -> crate::error::Result<String> {
Ok(format!("{}-{}-{}", args.0, args.1, args.2))
}
}
struct Slow;
impl Durable<(), i32> for Slow {
const NAME: &'static str = "slow";
const KIND: DurableKind = DurableKind::Function;
async fn execute(&self, _env: ExecutionEnv<'_>, _args: ()) -> crate::error::Result<i32> {
tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
Ok(1)
}
}
struct Fast;
impl Durable<(), i32> for Fast {
const NAME: &'static str = "fast";
const KIND: DurableKind = DurableKind::Function;
async fn execute(&self, _env: ExecutionEnv<'_>, _args: ()) -> crate::error::Result<i32> {
Ok(2)
}
}
use std::sync::atomic::AtomicI32;
static CALL_COUNT: AtomicI32 = AtomicI32::new(0);
struct Counter;
impl Durable<(), i32> for Counter {
const NAME: &'static str = "counter";
const KIND: DurableKind = DurableKind::Function;
async fn execute(&self, _env: ExecutionEnv<'_>, _args: ()) -> crate::error::Result<i32> {
let val = CALL_COUNT.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
Ok(val + 1)
}
}
struct ChildWorkflow;
impl Durable<(), i32> for ChildWorkflow {
const NAME: &'static str = "child";
const KIND: DurableKind = DurableKind::Workflow;
async fn execute(&self, env: ExecutionEnv<'_>, _args: ()) -> crate::error::Result<i32> {
let ctx = env.into_context();
let v: i32 = ctx.rpc("remoteFunc", ()).await?;
Ok(v * 2)
}
}
struct ChildWithLeaves;
impl Durable<(), i32> for ChildWithLeaves {
const NAME: &'static str = "child_with_leaves";
const KIND: DurableKind = DurableKind::Workflow;
async fn execute(&self, env: ExecutionEnv<'_>, _args: ()) -> crate::error::Result<i32> {
let ctx = env.into_context();
let a: i32 = ctx.run(Bar, ()).await?;
let b: i32 = ctx.run(Bar, ()).await?;
Ok(a + b)
}
}
struct InnerFailing;
impl Durable<(), i32> for InnerFailing {
const NAME: &'static str = "inner_failing";
const KIND: DurableKind = DurableKind::Workflow;
async fn execute(&self, env: ExecutionEnv<'_>, _args: ()) -> crate::error::Result<i32> {
let ctx = env.into_context();
ctx.run(Failing, ()).await
}
}
#[tokio::test]
async fn workflow_calling_leaf_completes_with_done() {
let harness = TestHarness::new();
let effects = harness.build_effects(vec![]);
let ctx = test_context("root", effects);
let val: i32 = ctx.run(Bar, ()).await.unwrap();
let outcome = finalize_context(&ctx, Ok(val)).await;
match outcome {
Outcome::Done(Ok(v)) => assert_eq!(v, 42),
other => panic!("expected Done(Ok(42)), got {:?}", other),
}
}
#[tokio::test]
async fn workflow_calling_multiple_leaves_completes_with_final_value() {
let harness = TestHarness::new();
let effects = harness.build_effects(vec![]);
let ctx = test_context("root", effects);
let a: i32 = ctx.run(Bar, ()).await.unwrap();
let b: i32 = ctx.run(Baz, ()).await.unwrap();
let outcome = finalize_context(&ctx, Ok(a + b)).await;
match outcome {
Outcome::Done(Ok(v)) => assert_eq!(v, 31458),
other => panic!("expected Done(Ok(31458)), got {:?}", other),
}
}
#[tokio::test]
async fn workflow_with_remote_suspends_then_completes_after_settlement() {
let harness = TestHarness::new();
let effects = harness.build_effects(vec![]);
let ctx = test_context("root", effects);
let local_future = ctx.run(Bar, ()).spawn().unwrap();
let _remote_future: RemoteFuture<i32> = ctx.rpc::<i32>("bar", &()).spawn().unwrap();
let local_val: i32 = local_future.await.unwrap();
let outcome = finalize_context(&ctx, Ok(local_val)).await;
let remote_id = match &outcome {
Outcome::Suspended { remote_todos } => {
assert_eq!(remote_todos.len(), 1);
remote_todos[0].clone()
}
other => panic!("expected Suspended, got {:?}", other),
};
harness.settle_promise_in_stub(&remote_id, 100_i32).await;
let effects2 = harness.build_effects(vec![resolved_promise(&remote_id, 100_i32)]);
let ctx2 = test_context("root", effects2);
let local_future2 = ctx2.run(Bar, ()).spawn().unwrap();
let remote_future2: RemoteFuture<i32> = ctx2.rpc::<i32>("bar", &()).spawn().unwrap();
let local_val2: i32 = local_future2.await.unwrap();
let remote_val2: i32 = remote_future2.await.unwrap();
let outcome2 = finalize_context(&ctx2, Ok(local_val2 + remote_val2)).await;
match outcome2 {
Outcome::Done(Ok(v)) => assert_eq!(v, 142),
other => panic!("expected Done after settlement, got {:?}", other),
}
}
#[tokio::test]
async fn structured_concurrency_multiple_remotes_require_multiple_settle_cycles() {
let harness = TestHarness::new();
let effects = harness.build_effects(vec![]);
let ctx = test_context("root", effects);
let _r1 = ctx.rpc::<i32>("bar", &()).spawn().unwrap();
let _r2 = ctx.rpc::<i32>("bar", &()).spawn().unwrap();
let outcome = finalize_context(&ctx, Ok(99)).await;
let todos = match &outcome {
Outcome::Suspended { remote_todos } => {
assert_eq!(remote_todos.len(), 2);
remote_todos.clone()
}
other => panic!("expected Suspended with 2 todos, got {:?}", other),
};
harness.settle_promise_in_stub(&todos[0], 10_i32).await;
let effects2 = harness.build_effects(vec![resolved_promise(&todos[0], 10_i32)]);
let ctx2 = test_context("root", effects2);
let _r1 = ctx2.rpc::<i32>("bar", &()).spawn().unwrap();
let _r2 = ctx2.rpc::<i32>("bar", &()).spawn().unwrap();
let outcome2 = finalize_context(&ctx2, Ok(99)).await;
match &outcome2 {
Outcome::Suspended { remote_todos } => {
assert_eq!(remote_todos.len(), 1);
}
other => panic!("expected Suspended with 1 todo, got {:?}", other),
}
harness.settle_promise_in_stub(&todos[1], 20_i32).await;
let effects3 = harness.build_effects(vec![
resolved_promise(&todos[0], 10_i32),
resolved_promise(&todos[1], 20_i32),
]);
let ctx3 = test_context("root", effects3);
let _r1 = ctx3.rpc::<i32>("bar", &()).spawn().unwrap();
let _r2 = ctx3.rpc::<i32>("bar", &()).spawn().unwrap();
let outcome3 = finalize_context(&ctx3, Ok(99)).await;
match outcome3 {
Outcome::Done(Ok(v)) => assert_eq!(v, 99),
other => panic!("expected Done(99), got {:?}", other),
}
}
#[tokio::test]
async fn fire_and_forget_local_leaves_flushed_at_return() {
let harness = TestHarness::new();
let effects = harness.build_effects(vec![]);
let ctx = test_context("root", effects);
let _f1 = ctx.run(Bar, ()).spawn().unwrap();
let _f2 = ctx.run(Baz, ()).spawn().unwrap();
let outcome = finalize_context(&ctx, Ok(99)).await;
match outcome {
Outcome::Done(Ok(v)) => assert_eq!(v, 99),
other => panic!("expected Done(99), got {:?}", other),
}
}
#[tokio::test]
async fn mixed_local_fire_and_forget_plus_remote_suspends_on_remote() {
let harness = TestHarness::new();
let effects = harness.build_effects(vec![]);
let ctx = test_context("root", effects);
let _local = ctx.run(Bar, ()).spawn().unwrap();
let _remote = ctx.rpc::<i32>("someRemote", &()).spawn().unwrap();
let outcome = finalize_context(&ctx, Ok(77)).await;
match &outcome {
Outcome::Suspended { remote_todos } => {
assert_eq!(remote_todos.len(), 1);
}
other => panic!("expected Suspended, got {:?}", other),
}
}
#[tokio::test]
async fn local_function_error_surfaces_at_await_time() {
let harness = TestHarness::new();
let effects = harness.build_effects(vec![]);
let ctx = test_context("root", effects);
let result: crate::error::Result<i32> = ctx.run(Failing, ()).await;
let msg = match result {
Err(e) => format!("caught: {}", e),
Ok(_) => "should not happen".to_string(),
};
let outcome = finalize_context(&ctx, Ok(msg)).await;
match outcome {
Outcome::Done(Ok(v)) => {
assert!(v.contains("caught:"), "got: {}", v);
assert!(v.contains("boom"), "got: {}", v);
}
other => panic!("expected Done(Ok(caught: boom)), got {:?}", other),
}
}
#[tokio::test]
async fn multiple_local_functions_run_concurrently() {
let harness = TestHarness::new();
let effects = harness.build_effects(vec![]);
let ctx = test_context("root", effects);
let f_slow = ctx.run(Slow, ()).spawn().unwrap();
let f_fast = ctx.run(Fast, ()).spawn().unwrap();
let a: i32 = f_slow.await.unwrap();
let b: i32 = f_fast.await.unwrap();
let outcome = finalize_context(&ctx, Ok(a + b)).await;
match outcome {
Outcome::Done(Ok(v)) => assert_eq!(v, 3),
other => panic!("expected Done(3), got {:?}", other),
}
}
#[tokio::test]
async fn child_workflow_suspends_on_remote_parent_suspends_too() {
let harness = TestHarness::new();
let effects = harness.build_effects(vec![]);
let ctx = test_context("foo", effects);
let result: crate::error::Result<i32> = ctx.run(ChildWorkflow, ()).await;
let workflow_result = match result {
Ok(v) => Ok(v + 1),
Err(Error::Suspended) => Err(Error::Suspended),
Err(e) => Err(e),
};
let outcome = finalize_context(&ctx, workflow_result).await;
let remote_id = match &outcome {
Outcome::Suspended { remote_todos } => {
assert!(!remote_todos.is_empty(), "expected at least 1 remote todo");
remote_todos[0].clone()
}
other => panic!("expected Suspended, got {:?}", other),
};
harness.settle_promise_in_stub(&remote_id, 21_i32).await;
let effects2 = harness.build_effects(vec![resolved_promise(&remote_id, 21_i32)]);
let ctx2 = test_context("foo", effects2);
let result2: crate::error::Result<i32> = ctx2.run(ChildWorkflow, ()).await;
let workflow_result2 = match result2 {
Ok(v) => Ok(v + 1),
Err(e) => Err(e),
};
let outcome2 = finalize_context(&ctx2, workflow_result2).await;
match outcome2 {
Outcome::Done(Ok(v)) => assert_eq!(v, 43),
other => panic!("expected Done(43), got {:?}", other),
}
}
#[tokio::test]
async fn awaiting_a_durable_future_returns_correct_value() {
let harness = TestHarness::new();
let effects = harness.build_effects(vec![]);
let ctx = test_context("root", effects);
let future = ctx.run(Bar, ()).spawn().unwrap();
let v: i32 = future.await.unwrap();
let outcome = finalize_context(&ctx, Ok(v)).await;
match outcome {
Outcome::Done(Ok(v)) => assert_eq!(v, 42),
other => panic!("expected Done(42), got {:?}", other),
}
}
#[tokio::test]
async fn single_run_with_leaf_resolves_correctly() {
let harness = TestHarness::new();
let effects = harness.build_effects(vec![]);
let ctx = test_context("main", effects);
let result: i32 = ctx.run(Add, (3, 4)).await.unwrap();
let outcome = finalize_context(&ctx, Ok(result)).await;
match outcome {
Outcome::Done(Ok(v)) => assert_eq!(v, 7),
other => panic!("expected Done(7), got {:?}", other),
}
}
#[tokio::test]
async fn multiple_run_calls_with_leaves_complete_with_final_value() {
let harness = TestHarness::new();
let effects = harness.build_effects(vec![]);
let ctx = test_context("main", effects);
let a: i32 = ctx.run(Double, 5).await.unwrap();
let b: i32 = ctx.run(Square, 3).await.unwrap();
let outcome = finalize_context(&ctx, Ok(a + b)).await;
match outcome {
Outcome::Done(Ok(v)) => assert_eq!(v, 19),
other => panic!("expected Done(19), got {:?}", other),
}
}
#[tokio::test]
async fn single_rpc_suspends_with_awaited_id() {
let harness = TestHarness::new();
let effects = harness.build_effects(vec![]);
let ctx = test_context("main", effects);
let result: crate::error::Result<i32> = ctx.rpc("remoteFunc", ()).await;
let outcome = finalize_context(&ctx, result).await;
match outcome {
Outcome::Suspended { remote_todos } => {
assert_eq!(remote_todos.len(), 1);
}
other => panic!("expected Suspended with 1 entry, got {:?}", other),
}
}
#[tokio::test]
async fn multiple_rpc_spawn_suspends_with_multiple_awaited_ids() {
let harness = TestHarness::new();
let effects = harness.build_effects(vec![]);
let ctx = test_context("main", effects);
let _a = ctx.rpc::<i32>("a", &()).spawn().unwrap();
let _b = ctx.rpc::<i32>("b", &()).spawn().unwrap();
let outcome = finalize_context(&ctx, Ok(0)).await;
match outcome {
Outcome::Suspended { remote_todos } => {
assert_eq!(remote_todos.len(), 2);
}
other => panic!("expected Suspended with 2 entries, got {:?}", other),
}
}
#[tokio::test]
async fn local_todo_processed_first_then_suspends_on_remote() {
let harness = TestHarness::new();
let effects = harness.build_effects(vec![]);
let ctx = test_context("main", effects);
let local_val: i32 = ctx.run(Add, (1, 2)).await.unwrap();
let remote_result: crate::error::Result<i32> = ctx.rpc("remoteFunc", ()).await;
let workflow_result = match remote_result {
Ok(v) => Ok(local_val + v),
Err(Error::Suspended) => Err(Error::Suspended),
Err(e) => Err(e),
};
let outcome = finalize_context(&ctx, workflow_result).await;
match outcome {
Outcome::Suspended { remote_todos } => {
assert_eq!(remote_todos.len(), 1);
}
other => panic!("expected Suspended, got {:?}", other),
}
}
#[tokio::test]
async fn spawn_local_and_rpc_remote_in_parallel() {
let harness = TestHarness::new();
let effects = harness.build_effects(vec![]);
let ctx = test_context("main", effects);
let local_future = ctx.run(Multiply, (3, 7)).spawn().unwrap();
let _remote_future = ctx.rpc::<i32>("remote", &()).spawn().unwrap();
let local_val: i32 = local_future.await.unwrap();
let outcome = finalize_context(&ctx, Ok(local_val)).await;
match outcome {
Outcome::Suspended { remote_todos } => {
assert_eq!(remote_todos.len(), 1);
}
other => panic!("expected Suspended, got {:?}", other),
}
}
#[tokio::test]
async fn regular_function_resolves_with_returned_value() {
let harness = TestHarness::new();
let effects = harness.build_effects(vec![]);
let ctx = test_context("main", effects);
let v: i32 = ctx.run(Add, (3, 4)).await.unwrap();
let outcome = finalize_context(&ctx, Ok(v)).await;
match outcome {
Outcome::Done(Ok(v)) => assert_eq!(v, 7),
other => panic!("expected Done(7), got {:?}", other),
}
}
#[tokio::test]
async fn regular_function_rejects_when_function_throws() {
let harness = TestHarness::new();
let effects = harness.build_effects(vec![]);
let ctx = test_context("main", effects);
let result: crate::error::Result<i32> = ctx.run(Failing, ()).await;
let outcome = finalize_context(&ctx, result).await;
match outcome {
Outcome::Done(Err(e)) => {
assert!(e.to_string().contains("boom"), "got: {}", e);
}
other => panic!("expected Done(Err(boom)), got {:?}", other),
}
}
#[tokio::test]
async fn regular_function_resolves_with_unit_when_nothing_returned() {
let harness = TestHarness::new();
let effects = harness.build_effects(vec![]);
let ctx = test_context("main", effects);
let _: () = ctx.run(Noop, ()).await.unwrap();
let outcome = finalize_context(&ctx, Ok(())).await;
match outcome {
Outcome::Done(Ok(())) => {}
other => panic!("expected Done(()), got {:?}", other),
}
}
#[tokio::test]
async fn regular_function_passes_arguments_correctly() {
let harness = TestHarness::new();
let effects = harness.build_effects(vec![]);
let ctx = test_context("main", effects);
let v: String = ctx
.run(Concat, ("x".to_string(), "y".to_string(), "z".to_string()))
.await
.unwrap();
let outcome = finalize_context(&ctx, Ok(v)).await;
match outcome {
Outcome::Done(Ok(v)) => assert_eq!(v, "x-y-z"),
other => panic!("expected Done(x-y-z), got {:?}", other),
}
}
#[tokio::test]
async fn local_function_that_throws_results_in_rejected() {
let harness = TestHarness::new();
let effects = harness.build_effects(vec![]);
let ctx = test_context("main", effects);
let result: crate::error::Result<i32> = ctx.run(Failing, ()).await;
let outcome = finalize_context(&ctx, result).await;
match outcome {
Outcome::Done(Err(_)) => {} other => panic!("expected Done(Err), got {:?}", other),
}
}
#[tokio::test]
async fn local_function_executes_exactly_once() {
CALL_COUNT.store(0, std::sync::atomic::Ordering::SeqCst);
let harness = TestHarness::new();
let effects = harness.build_effects(vec![]);
let ctx = test_context("main", effects);
let v: i32 = ctx.run(Counter, ()).await.unwrap();
let outcome = finalize_context(&ctx, Ok(v)).await;
match outcome {
Outcome::Done(Ok(v)) => assert_eq!(v, 1),
other => panic!("expected Done(1), got {:?}", other),
}
assert_eq!(
CALL_COUNT.load(std::sync::atomic::Ordering::SeqCst),
1,
"counter should have been called exactly once"
);
}
#[tokio::test]
async fn local_call_sets_correct_tags() {
let harness = TestHarness::new();
let effects = harness.build_effects(vec![]);
let ctx = test_context("root", effects);
let _: i32 = ctx.run(Bar, ()).await.unwrap();
let requests = harness.sent_requests_json().await;
let create_req = requests.iter().find(|r| r["kind"] == "promise.create");
assert!(create_req.is_some(), "should have sent a PromiseCreate");
let create = create_req.unwrap();
assert_eq!(create["tags"]["resonate:scope"].as_str().unwrap(), "local");
assert_eq!(create["tags"]["resonate:parent"].as_str().unwrap(), "root");
assert_eq!(create["tags"]["resonate:origin"].as_str().unwrap(), "root");
assert!(create["tags"].get("resonate:branch").is_some());
}
#[tokio::test]
async fn remote_call_sets_correct_tags() {
let harness = TestHarness::new();
let effects = harness.build_effects(vec![]);
let ctx = test_context("root", effects);
let _: crate::error::Result<i32> = ctx.rpc("remote", ()).await;
let requests = harness.sent_requests_json().await;
let create_req = requests.iter().find(|r| r["kind"] == "promise.create");
assert!(create_req.is_some());
let create = create_req.unwrap();
assert_eq!(create["tags"]["resonate:scope"].as_str().unwrap(), "global");
assert_eq!(
create["tags"]["resonate:target"].as_str().unwrap(),
"default"
);
assert_eq!(create["tags"]["resonate:parent"].as_str().unwrap(), "root");
assert_eq!(create["tags"]["resonate:origin"].as_str().unwrap(), "root");
assert!(create["tags"].get("resonate:branch").is_some());
}
#[tokio::test]
async fn origin_matches_root_for_all_nested_calls() {
let harness = TestHarness::new();
let effects = harness.build_effects(vec![]);
let ctx = test_context("root", effects);
let _: i32 = ctx.run(ChildWithLeaves, ()).await.unwrap();
let _: i32 = ctx.run(Bar, ()).await.unwrap();
let requests = harness.sent_requests_json().await;
let creates: Vec<_> = requests
.iter()
.filter(|r| r["kind"] == "promise.create")
.collect();
for create in &creates {
assert_eq!(
create["tags"]["resonate:origin"].as_str().unwrap(),
"root",
"promise {} should have origin 'root'",
create["id"].as_str().unwrap(),
);
}
let ids: Vec<&str> = creates.iter().map(|c| c["id"].as_str().unwrap()).collect();
assert!(ids.contains(&"root.0"), "should have root.0");
assert!(ids.contains(&"root.0.0"), "should have root.0.0");
assert!(ids.contains(&"root.0.1"), "should have root.0.1");
assert!(ids.contains(&"root.1"), "should have root.1");
}
#[tokio::test]
async fn rpc_target_is_resolved_through_target_resolver() {
let harness = TestHarness::new();
let effects = harness.build_effects(vec![]);
let target_resolver: crate::context::TargetResolver =
std::sync::Arc::new(|target: Option<&str>| {
format!("local://any@{}", target.unwrap_or("default"))
});
let ctx = test_context_with_match("root", effects, target_resolver);
let _: crate::error::Result<i32> = ctx.rpc("hello", ()).await;
let requests = harness.sent_requests_json().await;
let create = requests
.iter()
.find(|r| r["kind"] == "promise.create")
.expect("should have sent promise.create");
assert_eq!(
create["tags"]["resonate:target"].as_str().unwrap(),
"local://any@default"
);
}
#[tokio::test]
async fn rpc_target_uses_custom_prefix_from_target_resolver() {
let harness = TestHarness::new();
let effects = harness.build_effects(vec![]);
let target_resolver: crate::context::TargetResolver =
std::sync::Arc::new(|target: Option<&str>| {
format!("http://server:8001/workers/{}", target.unwrap_or("default"))
});
let ctx = test_context_with_match("root", effects, target_resolver);
let _: crate::error::Result<String> = ctx.rpc("my_func", 42i32).await;
let requests = harness.sent_requests_json().await;
let create = requests
.iter()
.find(|r| r["kind"] == "promise.create")
.expect("should have sent promise.create");
assert_eq!(
create["tags"]["resonate:target"].as_str().unwrap(),
"http://server:8001/workers/default"
);
}
#[tokio::test]
async fn rpc_spawn_target_is_resolved_through_target_resolver() {
let harness = TestHarness::new();
let effects = harness.build_effects(vec![]);
let target_resolver: crate::context::TargetResolver =
std::sync::Arc::new(|target: Option<&str>| {
format!("remote://group/{}", target.unwrap_or("default"))
});
let ctx = test_context_with_match("root", effects, target_resolver);
let _ = ctx.rpc::<i32>("greet", &"world").spawn().unwrap().await;
let requests = harness.sent_requests_json().await;
let create = requests
.iter()
.find(|r| r["kind"] == "promise.create")
.expect("should have sent promise.create");
assert_eq!(
create["tags"]["resonate:target"].as_str().unwrap(),
"remote://group/default"
);
}
#[tokio::test]
async fn identity_target_resolver_passes_target_through_unchanged() {
let harness = TestHarness::new();
let effects = harness.build_effects(vec![]);
let target_resolver: crate::context::TargetResolver =
std::sync::Arc::new(|target: Option<&str>| target.unwrap_or("default").to_string());
let ctx = test_context_with_match("root", effects, target_resolver);
let _: crate::error::Result<i32> = ctx.rpc("bare_name", ()).await;
let requests = harness.sent_requests_json().await;
let create = requests
.iter()
.find(|r| r["kind"] == "promise.create")
.expect("should have sent promise.create");
assert_eq!(
create["tags"]["resonate:target"].as_str().unwrap(),
"default"
);
}
#[tokio::test]
async fn target_resolver_propagates_through_multiple_rpc_calls() {
let harness = TestHarness::new();
let effects = harness.build_effects(vec![]);
let target_resolver: crate::context::TargetResolver =
std::sync::Arc::new(|target: Option<&str>| {
format!("custom://{}", target.unwrap_or("default"))
});
let ctx = test_context_with_match("root", effects, target_resolver);
let _: crate::error::Result<i32> = ctx.rpc("func_a", ()).await;
let _: crate::error::Result<i32> = ctx.rpc("func_b", ()).await;
let requests = harness.sent_requests_json().await;
let creates: Vec<_> = requests
.iter()
.filter(|r| r["kind"] == "promise.create")
.collect();
assert_eq!(creates.len(), 2);
assert_eq!(
creates[0]["tags"]["resonate:target"].as_str().unwrap(),
"custom://default"
);
assert_eq!(
creates[1]["tags"]["resonate:target"].as_str().unwrap(),
"custom://default"
);
}
#[tokio::test]
async fn rpc_target_override_with_url_passes_through_unchanged() {
let harness = TestHarness::new();
let effects = harness.build_effects(vec![]);
let target_resolver: crate::context::TargetResolver =
std::sync::Arc::new(|target: Option<&str>| {
let t = target.unwrap_or("default");
if t.contains("://") {
t.to_string()
} else {
format!("local://any@{}", t)
}
});
let ctx = test_context_with_match("root", effects, target_resolver);
let _: crate::error::Result<i32> = ctx
.rpc("some_func", ())
.target("http://other-host:8001/workers/hello")
.await;
let requests = harness.sent_requests_json().await;
let create = requests
.iter()
.find(|r| r["kind"] == "promise.create")
.expect("should have sent promise.create");
assert_eq!(
create["tags"]["resonate:target"].as_str().unwrap(),
"http://other-host:8001/workers/hello",
"URL target should pass through unchanged"
);
}
#[tokio::test]
async fn rpc_target_override_bare_name_is_resolved_url_passes_through() {
let harness = TestHarness::new();
let effects = harness.build_effects(vec![]);
let target_resolver: crate::context::TargetResolver =
std::sync::Arc::new(|target: Option<&str>| {
let t = target.unwrap_or("default");
if t.contains("://") {
t.to_string()
} else {
format!("local://any@{}", t)
}
});
let ctx = test_context_with_match("root", effects, target_resolver);
let _: crate::error::Result<i32> = ctx.rpc("func_a", ()).target("workers").await;
let _: crate::error::Result<i32> = ctx
.rpc("func_b", ())
.target("https://remote.example.com/workers/greet")
.await;
let requests = harness.sent_requests_json().await;
let creates: Vec<_> = requests
.iter()
.filter(|r| r["kind"] == "promise.create")
.collect();
assert_eq!(creates.len(), 2);
assert_eq!(
creates[0]["tags"]["resonate:target"].as_str().unwrap(),
"local://any@workers",
"bare name target override should be rewritten by resolver"
);
assert_eq!(
creates[1]["tags"]["resonate:target"].as_str().unwrap(),
"https://remote.example.com/workers/greet",
"URL target override should pass through unchanged"
);
}
#[tokio::test]
async fn local_run_does_not_set_target_tag() {
let harness = TestHarness::new();
let effects = harness.build_effects(vec![]);
let target_resolver: crate::context::TargetResolver =
std::sync::Arc::new(|target: Option<&str>| {
format!("SHOULD_NOT_APPEAR://{}", target.unwrap_or("default"))
});
let ctx = test_context_with_match("root", effects, target_resolver);
let _: i32 = ctx.run(Bar, ()).await.unwrap();
let requests = harness.sent_requests_json().await;
let create = requests
.iter()
.find(|r| r["kind"] == "promise.create")
.expect("should have sent promise.create");
assert_eq!(create["tags"]["resonate:scope"].as_str().unwrap(), "local");
assert!(
create["tags"].get("resonate:target").is_none(),
"local run should not set resonate:target"
);
}
#[tokio::test]
async fn rpc_with_target_builder_overrides_func_name() {
let harness = TestHarness::new();
let effects = harness.build_effects(vec![]);
let target_resolver: crate::context::TargetResolver =
std::sync::Arc::new(|target: Option<&str>| {
format!("local://any@{}", target.unwrap_or("default"))
});
let ctx = test_context_with_match("root", effects, target_resolver);
let _: crate::error::Result<i32> =
ctx.rpc::<i32>("my_func", &()).target("custom-target").await;
let requests = harness.sent_requests_json().await;
let create = requests
.iter()
.find(|r| r["kind"] == "promise.create")
.expect("should have sent promise.create");
assert_eq!(
create["tags"]["resonate:target"].as_str().unwrap(),
"local://any@custom-target",
"custom target should override func_name in target_resolver"
);
}
#[tokio::test]
async fn rpc_default_target_uses_group_name() {
let harness = TestHarness::new();
let effects = harness.build_effects(vec![]);
let target_resolver: crate::context::TargetResolver =
std::sync::Arc::new(|target: Option<&str>| {
format!("local://any@{}", target.unwrap_or("default"))
});
let ctx = test_context_with_match("root", effects, target_resolver);
let _: crate::error::Result<i32> = ctx.rpc::<i32>("my_func", &()).await;
let requests = harness.sent_requests_json().await;
let create = requests
.iter()
.find(|r| r["kind"] == "promise.create")
.expect("should have sent promise.create");
assert_eq!(
create["tags"]["resonate:target"].as_str().unwrap(),
"local://any@default",
"default target should use group name, not func_name"
);
}
#[tokio::test]
async fn rpc_with_url_target_passes_through() {
let harness = TestHarness::new();
let effects = harness.build_effects(vec![]);
let target_resolver: crate::context::TargetResolver =
std::sync::Arc::new(|target: Option<&str>| {
let t = target.unwrap_or("default");
if t.contains("://") {
t.to_string()
} else {
format!("local://any@{}", t)
}
});
let ctx = test_context_with_match("root", effects, target_resolver);
let _: crate::error::Result<i32> = ctx
.rpc::<i32>("my_func", &())
.target("https://remote:9000/workers/foo")
.await;
let requests = harness.sent_requests_json().await;
let create = requests
.iter()
.find(|r| r["kind"] == "promise.create")
.expect("should have sent promise.create");
assert_eq!(
create["tags"]["resonate:target"].as_str().unwrap(),
"https://remote:9000/workers/foo",
"URL target should pass through unchanged"
);
}
#[tokio::test]
async fn rpc_spawn_with_target_builder() {
let harness = TestHarness::new();
let effects = harness.build_effects(vec![]);
let target_resolver: crate::context::TargetResolver =
std::sync::Arc::new(|target: Option<&str>| {
format!("remote://{}", target.unwrap_or("default"))
});
let ctx = test_context_with_match("root", effects, target_resolver);
let _ = ctx
.rpc::<i32>("my_func", &())
.target("override-target")
.spawn()
.unwrap()
.await;
let requests = harness.sent_requests_json().await;
let create = requests
.iter()
.find(|r| r["kind"] == "promise.create")
.expect("should have sent promise.create");
assert_eq!(
create["tags"]["resonate:target"].as_str().unwrap(),
"remote://override-target",
);
}
#[tokio::test]
async fn sequential_calls_produce_deterministic_ids() {
let harness = TestHarness::new();
let effects = harness.build_effects(vec![]);
let ctx = test_context("root", effects);
let _: i32 = ctx.run(Bar, ()).await.unwrap();
let _: i32 = ctx.run(Baz, ()).await.unwrap();
let _: i32 = ctx.run(Bar, ()).await.unwrap();
let requests = harness.sent_requests_json().await;
let create_ids: Vec<String> = requests
.iter()
.filter(|r| r["kind"] == "promise.create")
.map(|r| r["id"].as_str().unwrap().to_string())
.collect();
assert_eq!(create_ids[0], "root.0");
assert_eq!(create_ids[1], "root.1");
assert_eq!(create_ids[2], "root.2");
}
#[tokio::test]
async fn nested_calls_produce_hierarchical_ids() {
let harness = TestHarness::new();
let effects = harness.build_effects(vec![]);
let ctx = test_context("root", effects);
let _: i32 = ctx.run(ChildWithLeaves, ()).await.unwrap();
let requests = harness.sent_requests_json().await;
let create_ids: Vec<String> = requests
.iter()
.filter(|r| r["kind"] == "promise.create")
.map(|r| r["id"].as_str().unwrap().to_string())
.collect();
assert!(create_ids.contains(&"root.0".to_string()));
assert!(create_ids.contains(&"root.0.0".to_string()));
assert!(create_ids.contains(&"root.0.1".to_string()));
}
#[tokio::test]
async fn concurrent_execution_spawn_is_actually_concurrent() {
let harness = TestHarness::new();
let effects = harness.build_effects(vec![]);
let ctx = test_context("root", effects);
let start = tokio::time::Instant::now();
let f1 = ctx.run(Slow, ()).spawn().unwrap();
let f2 = ctx.run(Fast, ()).spawn().unwrap();
let a: i32 = f1.await.unwrap();
let b: i32 = f2.await.unwrap();
let elapsed = start.elapsed();
let outcome = finalize_context(&ctx, Ok(a + b)).await;
match outcome {
Outcome::Done(Ok(v)) => assert_eq!(v, 3),
other => panic!("expected Done(3), got {:?}", other),
}
assert!(elapsed.as_millis() < 200, "took too long: {:?}", elapsed);
}
#[tokio::test]
async fn sequential_execution_run_is_actually_sequential() {
let harness = TestHarness::new();
let effects = harness.build_effects(vec![]);
let ctx = test_context("root", effects);
let a: i32 = ctx.run(Bar, ()).await.unwrap();
let b: i32 = ctx.run(Baz, ()).await.unwrap();
let outcome = finalize_context(&ctx, Ok(a + b)).await;
match outcome {
Outcome::Done(Ok(v)) => assert_eq!(v, 31458),
other => panic!("expected Done(31458), got {:?}", other),
}
}
#[tokio::test]
async fn leaf_throwing_error_propagates_to_workflow_result() {
let harness = TestHarness::new();
let effects = harness.build_effects(vec![]);
let ctx = test_context("root", effects);
let result: crate::error::Result<i32> = ctx.run(Failing, ()).await;
let outcome = finalize_context(&ctx, result).await;
match outcome {
Outcome::Done(Err(e)) => {
assert!(e.to_string().contains("boom"), "got: {}", e);
}
other => panic!("expected Done(Err), got {:?}", other),
}
}
#[tokio::test]
async fn nested_workflow_error_propagates_upward() {
let harness = TestHarness::new();
let effects = harness.build_effects(vec![]);
let ctx = test_context("root", effects);
let result: crate::error::Result<i32> = ctx.run(InnerFailing, ()).await;
let outcome = finalize_context(&ctx, result).await;
match outcome {
Outcome::Done(Err(e)) => {
assert!(e.to_string().contains("boom"), "got: {}", e);
}
other => panic!("expected Done(Err), got {:?}", other),
}
}
#[tokio::test]
async fn child_timeout_capped_to_parent_for_local_run() {
let harness = TestHarness::new();
let effects = harness.build_effects(vec![]);
let now = super::now_ms();
let parent_timeout = now + 5_000;
let ctx = test_context_with_timeout("root", parent_timeout, effects);
let _: i32 = ctx.run(Bar, ()).await.unwrap();
let requests = harness.sent_requests_json().await;
let create = requests
.iter()
.find(|r| r["kind"] == "promise.create")
.expect("should have sent promise.create");
assert!(
create["timeoutAt"].as_i64().unwrap() <= parent_timeout,
"child timeout_at ({}) should be <= parent timeout_at ({})",
create["timeoutAt"].as_i64().unwrap(),
parent_timeout
);
}
#[tokio::test]
async fn child_timeout_capped_to_parent_for_remote_rpc() {
let harness = TestHarness::new();
let effects = harness.build_effects(vec![]);
let now = super::now_ms();
let parent_timeout = now + 5_000;
let ctx = test_context_with_timeout("root", parent_timeout, effects);
let _: crate::error::Result<i32> = ctx.rpc("remote_func", ()).await;
let requests = harness.sent_requests_json().await;
let create = requests
.iter()
.find(|r| r["kind"] == "promise.create")
.expect("should have sent promise.create");
assert!(
create["timeoutAt"].as_i64().unwrap() <= parent_timeout,
"child timeout_at ({}) should be <= parent timeout_at ({})",
create["timeoutAt"].as_i64().unwrap(),
parent_timeout
);
}
#[tokio::test]
async fn child_timeout_capped_to_parent_for_run_spawn() {
let harness = TestHarness::new();
let effects = harness.build_effects(vec![]);
let now = super::now_ms();
let parent_timeout = now + 5_000;
let ctx = test_context_with_timeout("root", parent_timeout, effects);
let _: i32 = ctx.run(Bar, ()).spawn().unwrap().await.unwrap();
let requests = harness.sent_requests_json().await;
let create = requests
.iter()
.find(|r| r["kind"] == "promise.create")
.expect("should have sent promise.create");
assert!(
create["timeoutAt"].as_i64().unwrap() <= parent_timeout,
"child timeout_at ({}) should be <= parent timeout_at ({})",
create["timeoutAt"].as_i64().unwrap(),
parent_timeout
);
}
#[tokio::test]
async fn child_timeout_capped_to_parent_for_rpc_spawn() {
let harness = TestHarness::new();
let effects = harness.build_effects(vec![]);
let now = super::now_ms();
let parent_timeout = now + 5_000;
let ctx = test_context_with_timeout("root", parent_timeout, effects);
let _ = ctx.rpc::<i32>("remote", &()).spawn().unwrap().await;
let requests = harness.sent_requests_json().await;
let create = requests
.iter()
.find(|r| r["kind"] == "promise.create")
.expect("should have sent promise.create");
assert!(
create["timeoutAt"].as_i64().unwrap() <= parent_timeout,
"child timeout_at ({}) should be <= parent timeout_at ({})",
create["timeoutAt"].as_i64().unwrap(),
parent_timeout
);
}
#[tokio::test]
async fn explicit_child_timeout_smaller_than_parent_is_respected() {
let harness = TestHarness::new();
let effects = harness.build_effects(vec![]);
let now = super::now_ms();
let parent_timeout = now + 60_000; let ctx = test_context_with_timeout("root", parent_timeout, effects);
let child_timeout = std::time::Duration::from_secs(10);
let _: i32 = ctx.run(Bar, ()).timeout(child_timeout).await.unwrap();
let requests = harness.sent_requests_json().await;
let create = requests
.iter()
.find(|r| r["kind"] == "promise.create")
.expect("should have sent promise.create");
let expected_approx = now + 10_000;
let tolerance = 1_000; assert!(
create["timeoutAt"].as_i64().unwrap() >= expected_approx - tolerance
&& create["timeoutAt"].as_i64().unwrap() <= expected_approx + tolerance,
"child timeout_at ({}) should be ~{} (now + 10s), not parent timeout_at ({})",
create["timeoutAt"].as_i64().unwrap(),
expected_approx,
parent_timeout
);
}
#[tokio::test]
async fn explicit_child_timeout_exceeding_parent_is_clamped() {
let harness = TestHarness::new();
let effects = harness.build_effects(vec![]);
let now = super::now_ms();
let parent_timeout = now + 5_000; let ctx = test_context_with_timeout("root", parent_timeout, effects);
let child_timeout = std::time::Duration::from_secs(60);
let _: i32 = ctx.run(Bar, ()).timeout(child_timeout).await.unwrap();
let requests = harness.sent_requests_json().await;
let create = requests
.iter()
.find(|r| r["kind"] == "promise.create")
.expect("should have sent promise.create");
assert_eq!(
create["timeoutAt"].as_i64().unwrap(),
parent_timeout,
"child timeout_at should be clamped to parent timeout_at"
);
}
#[tokio::test]
async fn rpc_with_timeout_builder_smaller_than_parent_is_respected() {
let harness = TestHarness::new();
let effects = harness.build_effects(vec![]);
let now = super::now_ms();
let parent_timeout = now + 60_000;
let ctx = test_context_with_timeout("root", parent_timeout, effects);
let child_timeout = std::time::Duration::from_secs(10);
let _: crate::error::Result<i32> =
ctx.rpc::<i32>("remote", &()).timeout(child_timeout).await;
let requests = harness.sent_requests_json().await;
let create = requests
.iter()
.find(|r| r["kind"] == "promise.create")
.expect("should have sent promise.create");
let expected_approx = now + 10_000;
let tolerance = 1_000;
assert!(
create["timeoutAt"].as_i64().unwrap() >= expected_approx - tolerance
&& create["timeoutAt"].as_i64().unwrap() <= expected_approx + tolerance,
"child timeout_at ({}) should be ~{} (now + 10s)",
create["timeoutAt"].as_i64().unwrap(),
expected_approx
);
}
#[tokio::test]
async fn rpc_spawn_with_timeout_exceeding_parent_is_clamped() {
let harness = TestHarness::new();
let effects = harness.build_effects(vec![]);
let now = super::now_ms();
let parent_timeout = now + 5_000;
let ctx = test_context_with_timeout("root", parent_timeout, effects);
let child_timeout = std::time::Duration::from_secs(60);
let _ = ctx
.rpc::<i32>("remote", &())
.timeout(child_timeout)
.spawn()
.unwrap()
.await;
let requests = harness.sent_requests_json().await;
let create = requests
.iter()
.find(|r| r["kind"] == "promise.create")
.expect("should have sent promise.create");
assert_eq!(
create["timeoutAt"].as_i64().unwrap(),
parent_timeout,
"child timeout_at should be clamped to parent timeout_at"
);
}
#[tokio::test]
async fn default_child_timeout_with_large_parent_uses_24h() {
let harness = TestHarness::new();
let effects = harness.build_effects(vec![]);
let ctx = test_context("root", effects);
let _: i32 = ctx.run(Bar, ()).await.unwrap();
let requests = harness.sent_requests_json().await;
let create = requests
.iter()
.find(|r| r["kind"] == "promise.create")
.expect("should have sent promise.create");
let now = super::now_ms();
let expected_24h = now + 86_400_000; let tolerance = 2_000;
assert!(
create["timeoutAt"].as_i64().unwrap() >= expected_24h - tolerance
&& create["timeoutAt"].as_i64().unwrap() <= expected_24h + tolerance,
"child timeout_at ({}) should be ~{} (now + 24h), got diff={}ms",
create["timeoutAt"].as_i64().unwrap(),
expected_24h,
(create["timeoutAt"].as_i64().unwrap() - expected_24h).abs()
);
}
#[tokio::test]
async fn sleep_suspends_on_pending() {
let harness = TestHarness::new();
let effects = harness.build_effects(vec![]);
let ctx = test_context("root", effects);
let result = ctx.sleep(Duration::from_secs(60)).await;
assert!(matches!(result, Err(Error::Suspended)));
let outcome = finalize_context::<()>(&ctx, Err(Error::Suspended)).await;
match &outcome {
Outcome::Suspended { remote_todos } => {
assert_eq!(remote_todos.len(), 1);
}
other => panic!("expected Suspended, got {:?}", other),
}
}
#[tokio::test]
async fn sleep_creates_promise_with_timer_tags() {
let harness = TestHarness::new();
let effects = harness.build_effects(vec![]);
let ctx = test_context("root", effects);
let _ = ctx.sleep(Duration::from_secs(60)).await;
let requests = harness.sent_requests_json().await;
let create = requests
.iter()
.find(|r| r["kind"] == "promise.create")
.expect("should have sent promise.create");
let tags = create["tags"]
.as_object()
.expect("tags should be an object");
assert_eq!(tags["resonate:scope"].as_str().unwrap(), "global");
assert_eq!(tags["resonate:timer"].as_str().unwrap(), "true");
assert_eq!(tags["resonate:parent"].as_str().unwrap(), "root");
assert_eq!(tags["resonate:origin"].as_str().unwrap(), "root");
assert!(tags.contains_key("resonate:branch"));
assert!(!tags.contains_key("resonate:target"));
}
#[tokio::test]
async fn sleep_timeout_uses_duration() {
let harness = TestHarness::new();
let effects = harness.build_effects(vec![]);
let now = super::now_ms();
let parent_timeout = now + 120_000; let ctx = test_context_with_timeout("root", parent_timeout, effects);
let _ = ctx.sleep(Duration::from_secs(60)).await;
let requests = harness.sent_requests_json().await;
let create = requests
.iter()
.find(|r| r["kind"] == "promise.create")
.expect("should have sent promise.create");
let expected_approx = now + 60_000;
let tolerance = 1_000;
assert!(
create["timeoutAt"].as_i64().unwrap() >= expected_approx - tolerance
&& create["timeoutAt"].as_i64().unwrap() <= expected_approx + tolerance,
"sleep timeout_at ({}) should be ~{} (now + 60s)",
create["timeoutAt"].as_i64().unwrap(),
expected_approx
);
}
#[tokio::test]
async fn sleep_timeout_capped_to_parent() {
let harness = TestHarness::new();
let effects = harness.build_effects(vec![]);
let now = super::now_ms();
let parent_timeout = now + 5_000; let ctx = test_context_with_timeout("root", parent_timeout, effects);
let _ = ctx.sleep(Duration::from_secs(60)).await;
let requests = harness.sent_requests_json().await;
let create = requests
.iter()
.find(|r| r["kind"] == "promise.create")
.expect("should have sent promise.create");
assert_eq!(
create["timeoutAt"].as_i64().unwrap(),
parent_timeout,
"sleep timeout_at should be clamped to parent timeout_at"
);
}
#[tokio::test]
async fn sleep_returns_ok_when_already_resolved() {
let harness = TestHarness::new();
let sleep_id = "root.0";
harness.settle_promise_in_stub(sleep_id, ()).await;
let effects = harness.build_effects(vec![resolved_promise(sleep_id, ())]);
let ctx = test_context("root", effects);
let result = ctx.sleep(Duration::from_secs(60)).await;
assert!(result.is_ok(), "sleep should return Ok(()) when resolved");
}
#[tokio::test]
async fn sleep_spawn_returns_remote_future_pending() {
let harness = TestHarness::new();
let effects = harness.build_effects(vec![]);
let ctx = test_context("root", effects);
let _handle = ctx.sleep(Duration::from_secs(30)).spawn().unwrap();
let outcome = finalize_context(&ctx, Ok("done")).await;
match &outcome {
Outcome::Suspended { remote_todos } => {
assert_eq!(remote_todos.len(), 1);
}
other => panic!("expected Suspended, got {:?}", other),
}
}
#[tokio::test]
async fn sleep_spawn_resolved_returns_ok() {
let harness = TestHarness::new();
let sleep_id = "root.0";
harness.settle_promise_in_stub(sleep_id, ()).await;
let effects = harness.build_effects(vec![resolved_promise(sleep_id, ())]);
let ctx = test_context("root", effects);
let handle = ctx.sleep(Duration::from_secs(30)).spawn().unwrap();
let result = handle.await;
assert!(result.is_ok(), "sleep spawn should resolve to Ok(())");
}
#[tokio::test]
async fn sleep_has_empty_param() {
let harness = TestHarness::new();
let effects = harness.build_effects(vec![]);
let ctx = test_context("root", effects);
let _ = ctx.sleep(Duration::from_secs(10)).await;
let requests = harness.sent_requests_json().await;
let create = requests
.iter()
.find(|r| r["kind"] == "promise.create")
.expect("should have sent promise.create");
let param = &create["param"];
let data = ¶m["data"];
assert!(
data.is_null() || data.as_str().is_some_and(|s| s.is_empty()),
"sleep param data should be null or empty, got {:?}",
data
);
}
#[tokio::test]
async fn workflow_with_sleep_suspends_then_completes_after_settlement() {
let harness = TestHarness::new();
let effects = harness.build_effects(vec![]);
let ctx = test_context("root", effects);
let result = ctx.sleep(Duration::from_secs(30)).await;
let outcome = finalize_context(&ctx, result).await;
let sleep_id = match &outcome {
Outcome::Suspended { remote_todos } => {
assert_eq!(remote_todos.len(), 1);
remote_todos[0].clone()
}
other => panic!("expected Suspended, got {:?}", other),
};
harness.settle_promise_in_stub(&sleep_id, ()).await;
let effects2 = harness.build_effects(vec![resolved_promise(&sleep_id, ())]);
let ctx2 = test_context("root", effects2);
let result2 = ctx2.sleep(Duration::from_secs(30)).await;
assert!(result2.is_ok());
let outcome2 = finalize_context(&ctx2, Ok(())).await;
match outcome2 {
Outcome::Done(Ok(())) => {}
other => panic!("expected Done(\"awake\"), got {:?}", other),
}
}
#[tokio::test]
async fn promise_suspends_on_pending() {
let harness = TestHarness::new();
let effects = harness.build_effects(vec![]);
let ctx = test_context("root", effects);
let result = ctx.promise::<String>().await;
assert!(matches!(result, Err(Error::Suspended)));
let outcome = finalize_context::<()>(&ctx, Err(Error::Suspended)).await;
match &outcome {
Outcome::Suspended { remote_todos } => {
assert_eq!(remote_todos.len(), 1);
}
other => panic!("expected Suspended, got {:?}", other),
}
}
#[tokio::test]
async fn promise_creates_with_correct_tags() {
let harness = TestHarness::new();
let effects = harness.build_effects(vec![]);
let ctx = test_context("root", effects);
let _ = ctx.promise::<String>().await;
let requests = harness.sent_requests_json().await;
let create = requests
.iter()
.find(|r| r["kind"] == "promise.create")
.expect("should have sent promise.create");
let tags = create["tags"]
.as_object()
.expect("tags should be an object");
assert_eq!(tags["resonate:scope"].as_str().unwrap(), "global");
assert_eq!(tags["resonate:parent"].as_str().unwrap(), "root");
assert_eq!(tags["resonate:origin"].as_str().unwrap(), "root");
assert!(tags.contains_key("resonate:branch"));
assert!(!tags.contains_key("resonate:target"));
assert!(!tags.contains_key("resonate:timer"));
}
#[tokio::test]
async fn promise_has_empty_param() {
let harness = TestHarness::new();
let effects = harness.build_effects(vec![]);
let ctx = test_context("root", effects);
let _ = ctx.promise::<String>().await;
let requests = harness.sent_requests_json().await;
let create = requests
.iter()
.find(|r| r["kind"] == "promise.create")
.expect("should have sent promise.create");
let param = &create["param"];
let data = ¶m["data"];
assert!(
data.is_null() || data.as_str().is_some_and(|s| s.is_empty()),
"promise param data should be null or empty, got {:?}",
data
);
}
#[tokio::test]
async fn promise_with_timeout() {
let harness = TestHarness::new();
let effects = harness.build_effects(vec![]);
let now = super::now_ms();
let parent_timeout = now + 300_000; let ctx = test_context_with_timeout("root", parent_timeout, effects);
let _ = ctx
.promise::<String>()
.timeout(Duration::from_secs(120))
.await;
let requests = harness.sent_requests_json().await;
let create = requests
.iter()
.find(|r| r["kind"] == "promise.create")
.expect("should have sent promise.create");
let expected_approx = now + 120_000;
let tolerance = 1_000;
assert!(
create["timeoutAt"].as_i64().unwrap() >= expected_approx - tolerance
&& create["timeoutAt"].as_i64().unwrap() <= expected_approx + tolerance,
"promise timeout_at ({}) should be ~{} (now + 120s)",
create["timeoutAt"].as_i64().unwrap(),
expected_approx
);
}
#[tokio::test]
async fn promise_create_pending_returns_remote_future() {
let harness = TestHarness::new();
let effects = harness.build_effects(vec![]);
let ctx = test_context("root", effects);
let handle = ctx.promise::<String>().create().unwrap();
let err = handle.await.unwrap_err();
assert!(matches!(err, Error::Suspended));
}
#[tokio::test]
async fn promise_resolved_returns_value() {
let harness = TestHarness::new();
let promise_id = "root.0";
harness
.settle_promise_in_stub(promise_id, "hello".to_string())
.await;
let effects =
harness.build_effects(vec![resolved_promise(promise_id, "hello".to_string())]);
let ctx = test_context("root", effects);
let result: String = ctx.promise().await.unwrap();
assert_eq!(result, "hello");
}
async fn create_ids_in_order(harness: &TestHarness) -> Vec<String> {
harness
.sent_requests_json()
.await
.iter()
.filter(|r| r["kind"] == "promise.create")
.map(|r| r["id"].as_str().unwrap().to_string())
.collect()
}
#[tokio::test]
async fn sequenced_spawns_create_promises_in_call_order() {
let harness = TestHarness::new();
let effects = harness.build_effects(vec![]);
let ctx = test_context("root", effects);
for i in 0..8 {
let _ = ctx.rpc::<i32>(&format!("f{}", i), &()).spawn().unwrap();
}
let _ = finalize_context(&ctx, Ok(0)).await;
let ids = create_ids_in_order(&harness).await;
let expected: Vec<String> = (0..8).map(|i| format!("root.{}", i)).collect();
assert_eq!(
ids, expected,
"creations must reach the server in call order"
);
}
#[tokio::test]
async fn mixed_spawn_and_sequential_ops_create_in_call_order() {
let harness = TestHarness::new();
let effects = harness.build_effects(vec![]);
let ctx = test_context("root", effects);
let _h0 = ctx.run(Bar, ()).spawn().unwrap(); let _h1 = ctx.rpc::<i32>("remote", &()).spawn().unwrap(); let _: i32 = ctx.run(Baz, ()).await.unwrap(); let _h3 = ctx.sleep(Duration::from_secs(30)).spawn().unwrap(); let _ = finalize_context(&ctx, Ok(0)).await;
let ids = create_ids_in_order(&harness).await;
let root_children: Vec<&String> = ids
.iter()
.filter(|id| id.matches('.').count() == 1)
.collect();
assert_eq!(root_children, ["root.0", "root.1", "root.2", "root.3"]);
}
#[tokio::test]
async fn failed_creation_aborts_all_successors() {
let harness = TestHarness::new();
harness.set_fail_promise_create(true).await;
let effects = harness.build_effects(vec![]);
let ctx = test_context("root", effects);
let h0 = ctx.rpc::<i32>("a", &()).spawn().unwrap();
let h1 = ctx.rpc::<i32>("b", &()).spawn().unwrap();
let h2 = ctx.rpc::<i32>("c", &()).spawn().unwrap();
let e0 = h0.await.unwrap_err();
assert!(
!matches!(e0, Error::Suspended),
"first handle should fail, got: {e0}"
);
let e1 = h1.await.unwrap_err();
assert!(matches!(e1, Error::Application { .. }), "got: {e1}");
let e2 = h2.await.unwrap_err();
assert!(matches!(e2, Error::Application { .. }), "got: {e2}");
let result = try_finalize_context(&ctx, Ok(0)).await;
assert!(result.is_err(), "flush should surface the creation failure");
let ids = create_ids_in_order(&harness).await;
assert_eq!(ids, ["root.0"], "successors must not issue promise.create");
}
#[tokio::test]
async fn fire_and_forget_failed_creation_fails_at_flush() {
let harness = TestHarness::new();
harness.set_fail_promise_create(true).await;
{
let ctx = test_context("r1", harness.build_effects(vec![]));
let _ = ctx.rpc::<i32>("f", &()).spawn().unwrap();
assert!(try_finalize_context(&ctx, Ok(0)).await.is_err(), "rpc");
}
{
let ctx = test_context("r2", harness.build_effects(vec![]));
let _ = ctx.run(Bar, ()).spawn().unwrap();
assert!(try_finalize_context(&ctx, Ok(0)).await.is_err(), "run");
}
{
let ctx = test_context("r3", harness.build_effects(vec![]));
let _ = ctx.sleep(Duration::from_secs(5)).spawn().unwrap();
assert!(try_finalize_context(&ctx, Ok(0)).await.is_err(), "sleep");
}
{
let ctx = test_context("r4", harness.build_effects(vec![]));
let _ = ctx.detached("f", ()).spawn().unwrap();
assert!(try_finalize_context(&ctx, Ok(0)).await.is_err(), "detached");
}
}
#[tokio::test]
async fn rpc_spawn_handle_awaits_suspended_then_flush_collects_todo() {
let harness = TestHarness::new();
let effects = harness.build_effects(vec![]);
let ctx = test_context("root", effects);
let handle = ctx.rpc::<i32>("remote", &()).spawn().unwrap();
let err = handle.await.unwrap_err();
assert!(matches!(err, Error::Suspended), "got: {err}");
let outcome = finalize_context(&ctx, Err::<i32, _>(Error::Suspended)).await;
match &outcome {
Outcome::Suspended { remote_todos } => {
assert_eq!(remote_todos, &["root.0".to_string()]);
}
other => panic!("expected Suspended, got {:?}", other),
}
}
#[tokio::test]
async fn rpc_spawn_preloaded_resolved_returns_value_via_handle() {
let harness = TestHarness::new();
let effects = harness.build_effects(vec![resolved_promise("root.0", 99_i32)]);
let ctx = test_context("root", effects);
let handle = ctx.rpc::<i32>("remote", &()).spawn().unwrap();
let v: i32 = handle.await.unwrap();
assert_eq!(v, 99);
}
#[tokio::test]
async fn detached_spawn_handle_yields_id_after_creation() {
let harness = TestHarness::new();
let effects = harness.build_effects(vec![]);
let ctx = test_context("root", effects);
let handle = ctx.detached("audit", ()).spawn().unwrap();
let id = handle.id().await.unwrap();
assert!(id.starts_with("root."), "id = {id}");
let ids = create_ids_in_order(&harness).await;
assert_eq!(ids, [id]);
let outcome = finalize_context(&ctx, Ok(0)).await;
assert!(matches!(outcome, Outcome::Done(_)));
}
#[tokio::test]
async fn handle_id_returns_only_after_creation_reached_server() {
let harness = TestHarness::new();
let effects = harness.build_effects(vec![]);
let ctx = test_context("root", effects);
let handle = ctx.rpc::<i32>("remote", &()).spawn().unwrap();
let id = handle.id().await.unwrap();
assert_eq!(id, "root.0");
let ids = create_ids_in_order(&harness).await;
assert_eq!(ids, ["root.0"]);
let _ = finalize_context(&ctx, Ok(0)).await;
}
#[tokio::test]
async fn handle_id_fails_when_creation_fails() {
let harness = TestHarness::new();
harness.set_fail_promise_create(true).await;
let effects = harness.build_effects(vec![]);
let ctx = test_context("root", effects);
let h0 = ctx.rpc::<i32>("a", &()).spawn().unwrap();
let h1 = ctx.rpc::<i32>("b", &()).spawn().unwrap();
let e0 = h0.id().await.unwrap_err();
assert!(matches!(e0, Error::PromiseCreation(_)), "got: {e0}");
let e1 = h1.id().await.unwrap_err();
assert!(matches!(e1, Error::PromiseCreation(_)), "got: {e1}");
let _ = try_finalize_context(&ctx, Ok(0)).await;
}
#[tokio::test]
async fn dropping_spawned_handle_aborts_inflight_task() {
use std::sync::atomic::{AtomicBool, Ordering::SeqCst};
struct Probe(Arc<AtomicBool>);
impl Drop for Probe {
fn drop(&mut self) {
self.0.store(true, SeqCst);
}
}
let cancelled = Arc::new(AtomicBool::new(false));
let probe = Probe(cancelled.clone());
let (started_tx, started_rx) = tokio::sync::oneshot::channel();
let handle = tokio::spawn(async move {
let _probe = probe;
started_tx.send(()).unwrap();
std::future::pending::<()>().await; Outcome::Done(Ok(()))
});
let sh = SpawnedHandle {
id: "child".to_string(),
handle,
};
started_rx.await.unwrap();
drop(sh); tokio::task::yield_now().await;
assert!(
cancelled.load(SeqCst),
"parked task should have been aborted when its SpawnedHandle dropped"
);
}
}