use crate::{
ActContext, ActivityError, ActivityOptions, ChildWorkflowOptions, WfContext, WfExitValue,
};
use anyhow::anyhow;
use futures::FutureExt;
use futures::future::BoxFuture;
use std::time::Duration;
use std::{fmt::Debug, future::Future};
use squads_temporal_sdk_core_protos::coresdk::{
AsJsonPayloadExt, FromJsonPayloadExt, activity_result::activity_resolution,
child_workflow::child_workflow_result,
};
/// Abstraction over two-argument callables whose return value is a future.
///
/// The supertrait bound ties the callable's return type to
/// [`AsyncFn::OutputFuture`], which lets generic code name both the future and
/// the value it resolves to.
pub trait AsyncFn<Arg0, Arg1>: Fn(Arg0, Arg1) -> Self::OutputFuture {
/// Value produced once [`AsyncFn::OutputFuture`] completes.
type Output;
/// Future returned by invoking the callable; it must be `Send + 'static`
/// and resolve to [`AsyncFn::Output`].
// The fully qualified path is deliberate: the `Fn` supertrait family also
// exposes an associated type named `Output`.
type OutputFuture: Future<Output = <Self as AsyncFn<Arg0, Arg1>>::Output> + Send + 'static;
}
/// Blanket implementation of [`AsyncFn`] for every two-argument callable
/// (sized or not) whose return type is a `Send + 'static` future.
///
/// The returned future type becomes `OutputFuture`, and the value that future
/// resolves to becomes `Output`.
impl<Func: ?Sized, Ret, Arg0, Arg1> AsyncFn<Arg0, Arg1> for Func
where
    Ret: Future + Send + 'static,
    Func: Fn(Arg0, Arg1) -> Ret,
{
    type OutputFuture = Ret;
    type Output = <Ret as Future>::Output;
}
pub async fn execute_activity<A, F, R>(
ctx: &WfContext,
options: ActivityOptions,
_f: F,
a: A,
) -> Result<R, anyhow::Error>
where
F: AsyncFn<ActContext, A, Output = Result<R, ActivityError>> + Send + Sync + 'static,
A: AsJsonPayloadExt + Debug,
R: FromJsonPayloadExt + Debug,
{
let input = A::as_json_payload(&a).expect("serializes fine");
let activity_type = if options.activity_type.is_empty() {
std::any::type_name::<F>().to_string()
} else {
options.activity_type
};
let options = ActivityOptions {
activity_type,
input,
..options
};
let activity_resolution = ctx.activity(options).await;
match activity_resolution.status {
Some(status) => match status {
activity_resolution::Status::Completed(success) => {
Ok(R::from_json_payload(&success.result.unwrap()).unwrap())
}
activity_resolution::Status::Failed(failure) => Err(anyhow::anyhow!("{:?}", failure)),
activity_resolution::Status::Cancelled(reason) => Err(anyhow::anyhow!("{:?}", reason)),
activity_resolution::Status::Backoff(reason) => Err(anyhow::anyhow!("{:?}", reason)),
},
None => panic!("activity task failed {activity_resolution:?}"),
}
}
pub fn into_workflow<A, F, R, O>(
f: F,
) -> impl Fn(WfContext) -> BoxFuture<'static, Result<WfExitValue<O>, anyhow::Error>> + Send + Sync
where
A: FromJsonPayloadExt + Send,
F: AsyncFn<WfContext, A, Output = Result<R, anyhow::Error>> + Send + Sync + 'static,
R: Into<WfExitValue<O>>,
O: AsJsonPayloadExt + Debug,
{
move |ctx: WfContext| match A::from_json_payload(&ctx.get_args()[0]) {
Ok(a) => (f)(ctx, a).map(|r| r.map(|r| r.into())).boxed(),
Err(e) => async move { Err(e.into()) }.boxed(),
}
}
/// Starts a child workflow from inside a workflow and waits for its typed result.
///
/// `a` is serialized to a JSON payload and passed as the child's only input.
/// When `options.workflow_type` is empty, the type name of `F` (as reported by
/// [`std::any::type_name`]) is used as the workflow type; otherwise the
/// caller-provided type is kept. `_f` is never invoked here; it only drives
/// type inference for `F` and `R`.
///
/// # Errors
///
/// Returns an error (instead of panicking) when:
/// - the input cannot be serialized to a JSON payload,
/// - the child workflow does not reach the started state,
/// - the child workflow resolves as failed or cancelled,
/// - the child completes without a result payload,
/// - the result payload cannot be deserialized into `R`,
/// - the result carries no status at all.
pub async fn execute_child_workflow<A, F, R>(
    ctx: &WfContext,
    options: ChildWorkflowOptions,
    _f: F,
    a: A,
) -> Result<R, anyhow::Error>
where
    F: AsyncFn<WfContext, A, Output = Result<R, anyhow::Error>> + Send + Sync + 'static,
    A: AsJsonPayloadExt + Debug,
    R: FromJsonPayloadExt + Debug,
{
    let input = A::as_json_payload(&a)
        .map_err(|e| anyhow!("failed to serialize child workflow input: {:?}", e))?;
    let workflow_type = if options.workflow_type.is_empty() {
        std::any::type_name::<F>().to_string()
    } else {
        options.workflow_type
    };
    let child = ctx.child_workflow(ChildWorkflowOptions {
        // Cloned so the type name is still available for the start-failure message.
        workflow_type: workflow_type.clone(),
        input: vec![input],
        ..options
    });
    // Failing to start is a runtime condition, so it is reported through the
    // `Result` rather than by panicking.
    let started = child
        .start(ctx)
        .await
        .into_started()
        .ok_or_else(|| anyhow!("child workflow `{}` failed to start", workflow_type))?;
    match started.result().await.status {
        Some(child_workflow_result::Status::Completed(success)) => {
            let payload = success
                .result
                .ok_or_else(|| anyhow!("child workflow completed without a result payload"))?;
            R::from_json_payload(&payload)
                .map_err(|e| anyhow!("failed to deserialize child workflow result: {:?}", e))
        }
        Some(child_workflow_result::Status::Failed(failure)) => Err(anyhow!("{:?}", failure)),
        Some(child_workflow_result::Status::Cancelled(reason)) => Err(anyhow!("{:?}", reason)),
        None => Err(anyhow!("Unexpected child WF status")),
    }
}
/// Suspends the calling workflow for `duration` by awaiting a timer created
/// on `ctx`.
///
/// The value the timer resolves to is discarded.
pub async fn sleep(ctx: &WfContext, duration: Duration) {
    let timer = ctx.timer(duration);
    timer.await;
}