temporalio-sdk-core 0.4.0

Library for building new Temporal SDKs
Documentation
//! Shared tests that are meant to be run against both local dev server and cloud

use crate::common::{CoreWfStarter, activity_functions::StdActivities};
use std::{
    sync::{
        Arc,
        atomic::{AtomicBool, Ordering::Relaxed},
    },
    time::Duration,
};
use temporalio_client::{
    WorkflowFetchHistoryOptions, WorkflowStartOptions, WorkflowTerminateOptions,
};
use temporalio_common::{
    UntypedWorkflow,
    protos::temporal::api::{
        enums::v1::{EventType, WorkflowTaskFailedCause::GrpcMessageTooLarge},
        history::v1::history_event::Attributes::{
            WorkflowExecutionTerminatedEventAttributes, WorkflowTaskFailedEventAttributes,
        },
    },
    worker::WorkerTaskTypes,
};
use temporalio_macros::{workflow, workflow_methods};
use temporalio_sdk::{ActivityOptions, WorkflowContext, WorkflowResult, WorkflowTermination};

pub(crate) mod priority;

#[workflow]
struct OversizeGrpcMessageWf {
    run_flag: Arc<AtomicBool>,
}

#[workflow_methods(factory_only)]
impl OversizeGrpcMessageWf {
    #[run]
    async fn run(ctx: &mut WorkflowContext<Self>) -> WorkflowResult<Vec<u8>> {
        if ctx.state(|wf| wf.run_flag.load(Relaxed)) {
            Ok(vec![])
        } else {
            ctx.state(|wf| wf.run_flag.store(true, Relaxed));
            let result: Vec<u8> = vec![0; 5000000];
            Ok(result)
        }
    }
}

pub(crate) async fn grpc_message_too_large() {
    let run_flag = Arc::new(AtomicBool::new(false));
    let run_flag_clone = run_flag.clone();

    let wf_name = "oversize_grpc_message";
    let mut starter = CoreWfStarter::new_cloud_or_local(wf_name, "")
        .await
        .unwrap();
    starter.sdk_config.task_types = WorkerTaskTypes::workflow_only();
    starter
        .sdk_config
        .register_workflow_with_factory(move || OversizeGrpcMessageWf {
            run_flag: run_flag_clone.clone(),
        });

    let mut sdk = starter.worker().await;
    sdk.submit_workflow(
        OversizeGrpcMessageWf::run,
        (),
        starter.workflow_options.clone(),
    )
    .await
    .unwrap();
    sdk.run_until_done().await.unwrap();

    let events = starter.get_history().await.events;
    // Depending on the version of server, it may terminate the workflow, or simply be a task
    // failure
    assert!(
        events.iter().any(is_oversize_grpc_event),
        "Expected workflow task failure or termination b/c grpc message too large: {events:?}",
    );
}

pub(crate) fn is_oversize_grpc_event(
    e: &temporalio_common::protos::temporal::api::history::v1::HistoryEvent,
) -> bool {
    // Task failure
    e.event_type == EventType::WorkflowTaskFailed as i32
        && if let WorkflowTaskFailedEventAttributes(attr) = e.attributes.as_ref().unwrap() {
            attr.cause == GrpcMessageTooLarge as i32
                && attr.failure.as_ref().unwrap().message == "GRPC Message too large"
        } else {
            false
        }
    // Workflow terminated
    ||
    e.event_type == EventType::WorkflowExecutionTerminated as i32
        && if let WorkflowExecutionTerminatedEventAttributes(attr) = e.attributes.as_ref().unwrap() {
            attr.reason == "GrpcMessageTooLarge"
        } else {
            false
        }
}

#[workflow]
#[derive(Default)]
struct ShutdownTimerActivityLoopWf;

#[workflow_methods]
impl ShutdownTimerActivityLoopWf {
    #[run]
    async fn run(ctx: &mut WorkflowContext<Self>) -> WorkflowResult<()> {
        loop {
            ctx.timer(Duration::from_millis(10)).await;
            ctx.start_activity(
                StdActivities::no_op,
                (),
                ActivityOptions::start_to_close_timeout(Duration::from_secs(10)),
            )
            .await
            .map_err(|e| WorkflowTermination::from(anyhow::Error::from(e)))?;
        }
    }
}

/// Starts 10 workflows that each run a tight timer+activity loop, then shuts down the worker
/// and verifies:
///   1. Shutdown completes rapidly (< 5s)
///   2. No workflow task failures or timeouts appear in any workflow's history
pub(crate) async fn shutdown_during_active_timer_activity_workflows() {
    let wf_name = "shutdown_during_active_timer_activity_workflows";
    let num_workflows = 10;

    let mut starter =
        if let Some(wfs) = CoreWfStarter::new_cloud_or_local(wf_name, ">=1.6.3-serverless").await {
            wfs
        } else {
            return;
        };
    starter.sdk_config.register_activities(StdActivities);
    let mut worker = starter.worker().await;
    worker.register_workflow::<ShutdownTimerActivityLoopWf>();

    let core = worker.core_worker();
    core.validate().await.unwrap();
    assert!(
        core.get_namespace_capabilities().graceful_poll_shutdown(),
        "Server must support graceful poll shutdown for this test"
    );

    let task_queue = starter.get_task_queue().to_owned();
    let mut wf_ids = Vec::with_capacity(num_workflows);
    for i in 0..num_workflows {
        let wf_id = format!("{task_queue}-{i}");
        worker
            .submit_workflow(
                ShutdownTimerActivityLoopWf::run,
                (),
                WorkflowStartOptions::new(task_queue.clone(), wf_id.clone()).build(),
            )
            .await
            .unwrap();
        wf_ids.push(wf_id);
    }
    // Don't wait for workflow completion — these loop forever
    worker.fetch_results = false;

    let shutdown_handle = worker.inner_mut().shutdown_handle();
    let run_fut = async { worker.run_until_done().await.unwrap() };

    let shutdown_fut = async {
        // Let workflows run a few iterations
        tokio::time::sleep(Duration::from_secs(2)).await;
        shutdown_handle();
    };

    let shutdown_start = std::time::Instant::now();
    tokio::join!(run_fut, shutdown_fut);
    let shutdown_elapsed = shutdown_start.elapsed();

    assert!(
        shutdown_elapsed < Duration::from_secs(5),
        "Worker shutdown took {shutdown_elapsed:?}, expected < 5s"
    );

    let client = starter.get_client().await;
    for wf_id in &wf_ids {
        client
            .get_workflow_handle::<UntypedWorkflow>(wf_id)
            .terminate(WorkflowTerminateOptions::default())
            .await
            .unwrap();

        let history = client
            .get_workflow_handle::<UntypedWorkflow>(wf_id)
            .fetch_history(WorkflowFetchHistoryOptions::default())
            .await
            .unwrap();
        let bad_events: Vec<_> = history
            .events()
            .iter()
            .filter(|e| {
                e.event_type() == EventType::WorkflowTaskFailed
                    || e.event_type() == EventType::WorkflowTaskTimedOut
            })
            .collect();
        assert!(
            bad_events.is_empty(),
            "Workflow {wf_id} had unexpected WFT failures/timeouts: {bad_events:?}"
        );
    }
}