#![allow(unreachable_pub)]
use futures_util::future::LocalBoxFuture;
use std::time::Duration;
use temporalio_common::ActivityDefinition;
use temporalio_macros::{activities, workflow, workflow_methods};
use temporalio_sdk::{
ActivityExecutionError, ActivityOptions, ApplicationFailure, WorkflowContext, WorkflowResult,
activities::{ActivityContext, ActivityError},
};
#[workflow]
#[derive(Default)]
pub struct SagaWorkflow;
#[workflow_methods]
impl SagaWorkflow {
#[run]
pub async fn run(
ctx: &mut WorkflowContext<Self>,
trip_id: String,
) -> WorkflowResult<Vec<String>> {
let mut saga = Saga::new(ctx, activity_opts());
match Self::book_trip(&mut saga, trip_id).await {
Ok(ids) => Ok(ids),
Err(e) => {
saga.compensate().await;
Err(e.into())
}
}
}
async fn book_trip(
saga: &mut Saga,
trip_id: String,
) -> Result<Vec<String>, ActivityExecutionError> {
let hotel = saga
.step(
BookingActivities::book_hotel,
trip_id.clone(),
BookingActivities::cancel_hotel,
)
.await?;
let flight = saga
.step(
BookingActivities::book_flight,
trip_id.clone(),
BookingActivities::cancel_flight,
)
.await?;
let car = saga
.step(
BookingActivities::book_car,
trip_id.clone(),
BookingActivities::cancel_car,
)
.await?;
Ok(vec![hotel, flight, car])
}
}
struct Saga {
ctx: WorkflowContext<SagaWorkflow>,
opts: ActivityOptions,
compensations: Vec<LocalBoxFuture<'static, ()>>,
}
impl Saga {
fn new(ctx: &WorkflowContext<SagaWorkflow>, opts: ActivityOptions) -> Self {
Self {
ctx: ctx.clone(),
opts,
compensations: Vec::new(),
}
}
async fn step<Step, Compensation>(
&mut self,
forward: Step,
input: impl Into<Step::Input>,
compensate: Compensation,
) -> Result<Step::Output, ActivityExecutionError>
where
Step: ActivityDefinition,
Step::Output: Clone + Into<Compensation::Input>,
Compensation: ActivityDefinition<Output = ()> + 'static,
{
let out = self
.ctx
.start_activity(forward, input, self.opts.clone())
.await?;
let cmp_input: Compensation::Input = out.clone().into();
let ctx = self.ctx.clone();
let opts = self.opts.clone();
self.compensations.push(Box::pin(async move {
if let Err(e) = ctx.start_activity(compensate, cmp_input, opts).await {
eprintln!("Compensation {} failed: {e}", Compensation::name());
}
}));
Ok(out)
}
async fn compensate(self) {
for c in self.compensations.into_iter().rev() {
c.await;
}
}
}
pub struct BookingActivities;
#[activities]
impl BookingActivities {
#[activity]
pub async fn book_hotel(
_ctx: ActivityContext,
trip_id: String,
) -> Result<String, ActivityError> {
Ok(format!("hotel-{trip_id}"))
}
#[activity]
pub async fn book_flight(
_ctx: ActivityContext,
trip_id: String,
) -> Result<String, ActivityError> {
Ok(format!("flight-{trip_id}"))
}
#[activity]
pub async fn book_car(_ctx: ActivityContext, trip_id: String) -> Result<String, ActivityError> {
if trip_id.contains("fail") {
return Err(ActivityError::application(
ApplicationFailure::non_retryable("Car booking failed for trip {trip_id}"),
));
}
Ok(format!("car-{trip_id}"))
}
#[activity]
pub async fn cancel_hotel(
_ctx: ActivityContext,
booking_id: String,
) -> Result<(), ActivityError> {
println!("Cancelled hotel booking: {booking_id}");
Ok(())
}
#[activity]
pub async fn cancel_flight(
_ctx: ActivityContext,
booking_id: String,
) -> Result<(), ActivityError> {
println!("Cancelled flight booking: {booking_id}");
Ok(())
}
#[activity]
pub async fn cancel_car(
_ctx: ActivityContext,
booking_id: String,
) -> Result<(), ActivityError> {
println!("Cancelled car booking: {booking_id}");
Ok(())
}
}
fn activity_opts() -> ActivityOptions {
ActivityOptions::start_to_close_timeout(Duration::from_secs(1))
}