temporalio-sdk 0.3.0

Temporal Rust SDK
Documentation
#![allow(unreachable_pub)]
use std::time::Duration;
use temporalio_macros::{activities, workflow, workflow_methods};
use temporalio_sdk::{
    ActivityExecutionError, ActivityOptions, WorkflowContext, WorkflowResult, WorkflowTermination,
    activities::{ActivityContext, ActivityError},
};

#[derive(Debug)]
enum Booking {
    Hotel,
    Flight,
    Car,
}

#[workflow]
#[derive(Default)]
pub struct SagaWorkflow;

#[workflow_methods]
impl SagaWorkflow {
    #[run]
    pub async fn run(
        ctx: &mut WorkflowContext<Self>,
        trip_id: String,
    ) -> WorkflowResult<Vec<String>> {
        let mut compensations: Vec<(Booking, String)> = Vec::new();

        match Self::book_trip(ctx, trip_id, &mut compensations).await {
            Ok(()) => Ok(compensations.into_iter().map(|(_, id)| id).collect()),
            Err(e) => {
                Self::run_compensations(ctx, &compensations).await;
                Err(WorkflowTermination::failed(e))
            }
        }
    }

    async fn book_trip(
        ctx: &mut WorkflowContext<SagaWorkflow>,
        trip_id: String,
        compensations: &mut Vec<(Booking, String)>,
    ) -> Result<(), ActivityExecutionError> {
        let hotel = ctx
            .start_activity(
                BookingActivities::book_hotel,
                trip_id.clone(),
                activity_opts(),
            )
            .await?;
        compensations.push((Booking::Hotel, hotel));

        let flight = ctx
            .start_activity(
                BookingActivities::book_flight,
                trip_id.clone(),
                activity_opts(),
            )
            .await?;
        compensations.push((Booking::Flight, flight));

        let car = ctx
            .start_activity(
                BookingActivities::book_car,
                trip_id.clone(),
                activity_opts(),
            )
            .await?;
        compensations.push((Booking::Car, car));

        Ok(())
    }

    async fn run_compensations<W>(
        ctx: &mut WorkflowContext<W>,
        compensations: &[(Booking, String)],
    ) {
        for (service, booking_id) in compensations.iter().rev() {
            let result = match *service {
                Booking::Hotel => {
                    ctx.start_activity(
                        BookingActivities::cancel_hotel,
                        booking_id.clone(),
                        activity_opts(),
                    )
                    .await
                }
                Booking::Flight => {
                    ctx.start_activity(
                        BookingActivities::cancel_flight,
                        booking_id.clone(),
                        activity_opts(),
                    )
                    .await
                }
                Booking::Car => {
                    ctx.start_activity(
                        BookingActivities::cancel_car,
                        booking_id.clone(),
                        activity_opts(),
                    )
                    .await
                }
            };

            if let Err(e) = result {
                eprintln!("Compensation failed for {service:?} {booking_id}: {e}");
            }
        }
    }
}

pub struct BookingActivities;

#[activities]
impl BookingActivities {
    #[activity]
    pub async fn book_hotel(
        _ctx: ActivityContext,
        trip_id: String,
    ) -> Result<String, ActivityError> {
        Ok(format!("hotel-{trip_id}"))
    }

    #[activity]
    pub async fn book_flight(
        _ctx: ActivityContext,
        trip_id: String,
    ) -> Result<String, ActivityError> {
        Ok(format!("flight-{trip_id}"))
    }

    #[activity]
    pub async fn book_car(_ctx: ActivityContext, trip_id: String) -> Result<String, ActivityError> {
        if trip_id.contains("fail") {
            return Err(ActivityError::NonRetryable(
                anyhow::anyhow!("Car booking failed for trip {trip_id}").into(),
            ));
        }
        Ok(format!("car-{trip_id}"))
    }

    #[activity]
    pub async fn cancel_hotel(
        _ctx: ActivityContext,
        booking_id: String,
    ) -> Result<(), ActivityError> {
        println!("Cancelled hotel booking: {booking_id}");
        Ok(())
    }

    #[activity]
    pub async fn cancel_flight(
        _ctx: ActivityContext,
        booking_id: String,
    ) -> Result<(), ActivityError> {
        println!("Cancelled flight booking: {booking_id}");
        Ok(())
    }

    #[activity]
    pub async fn cancel_car(
        _ctx: ActivityContext,
        booking_id: String,
    ) -> Result<(), ActivityError> {
        println!("Cancelled car booking: {booking_id}");
        Ok(())
    }
}

fn activity_opts() -> ActivityOptions {
    ActivityOptions::start_to_close_timeout(Duration::from_secs(1))
}