Skip to main content

stormchaser_engine/handler/step/intrinsic/
lambda.rs

1use anyhow::Result;
2use serde_json::Value;
3use sqlx::PgPool;
4use std::sync::Arc;
5use stormchaser_model::RunId;
6use stormchaser_model::StepInstanceId;
7use stormchaser_tls::TlsReloader;
8
9#[cfg(feature = "aws-lambda")]
10use crate::handler::fetch_step_instance;
11#[cfg(feature = "aws-lambda")]
12use crate::handler::handle_lambda_invoke;
13
14/// Attempts to dispatch an AWS Lambda execution step instance.
15pub async fn try_dispatch(
16    run_id: RunId,
17    step_instance_id: StepInstanceId,
18    step_type: &str,
19    resolved_spec: &Value,
20    pool: PgPool,
21    nats_client: async_nats::Client,
22    _tls_reloader: Arc<TlsReloader>,
23) -> Result<bool> {
24    if step_type == "LambdaInvoke" {
25        #[cfg(feature = "aws-lambda")]
26        {
27            let pool = pool.clone();
28            let nats_client = nats_client.clone();
29            let spec = resolved_spec.clone();
30
31            tokio::spawn(async move {
32                if let Err(e) = handle_lambda_invoke(
33                    run_id,
34                    step_instance_id,
35                    spec,
36                    pool.clone(),
37                    nats_client.clone(),
38                )
39                .await
40                {
41                    if let Ok(instance) = fetch_step_instance(step_instance_id, &pool).await {
42                        let machine = crate::step_machine::StepMachine::<
43                            crate::step_machine::state::Pending,
44                        >::from_instance(instance);
45                        if let Ok(mut conn) = pool.acquire().await {
46                            if let Ok(_machine) = machine
47                                .start("error-recovery".to_string(), &mut *conn)
48                                .await
49                            {
50                                if let Ok(instance) =
51                                    fetch_step_instance(step_instance_id, &pool).await
52                                {
53                                    let machine = crate::step_machine::StepMachine::<
54                                        crate::step_machine::state::Running,
55                                    >::from_instance(
56                                        instance
57                                    );
58                                    let _ = machine
59                                        .fail(
60                                            format!("Lambda invoke failed: {:?}", e),
61                                            None,
62                                            &mut *conn,
63                                        )
64                                        .await;
65                                }
66                            }
67                        }
68                    }
69                }
70            });
71            return Ok(true);
72        }
73        #[cfg(not(feature = "aws-lambda"))]
74        {
75            let _ = run_id;
76            let _ = step_instance_id;
77            let _ = resolved_spec;
78            let _ = pool;
79            let _ = nats_client;
80            return Ok(true); // Ignore if feature is not enabled
81        }
82    }
83
84    Ok(false)
85}