Skip to main content

stormchaser_engine/handler/step/intrinsic/
lambda.rs

1use anyhow::Result;
2use serde_json::Value;
3use sqlx::PgPool;
4use std::sync::Arc;
5use stormchaser_tls::TlsReloader;
6use uuid::Uuid;
7
8#[cfg(feature = "aws-lambda")]
9use crate::handler::fetch_step_instance;
10#[cfg(feature = "aws-lambda")]
11use crate::handler::handle_lambda_invoke;
12
13/// Attempts to dispatch an AWS Lambda execution step instance.
14pub async fn try_dispatch(
15    run_id: Uuid,
16    step_instance_id: Uuid,
17    step_type: &str,
18    resolved_spec: &Value,
19    pool: PgPool,
20    nats_client: async_nats::Client,
21    _tls_reloader: Arc<TlsReloader>,
22) -> Result<bool> {
23    if step_type == "LambdaInvoke" {
24        #[cfg(feature = "aws-lambda")]
25        {
26            let pool = pool.clone();
27            let nats_client = nats_client.clone();
28            let spec = resolved_spec.clone();
29
30            tokio::spawn(async move {
31                if let Err(e) = handle_lambda_invoke(
32                    run_id,
33                    step_instance_id,
34                    spec,
35                    pool.clone(),
36                    nats_client.clone(),
37                )
38                .await
39                {
40                    if let Ok(instance) = fetch_step_instance(step_instance_id, &pool).await {
41                        let machine = crate::step_machine::StepMachine::<
42                            crate::step_machine::state::Pending,
43                        >::from_instance(instance);
44                        if let Ok(mut conn) = pool.acquire().await {
45                            if let Ok(_machine) = machine
46                                .start("error-recovery".to_string(), &mut *conn)
47                                .await
48                            {
49                                if let Ok(instance) =
50                                    fetch_step_instance(step_instance_id, &pool).await
51                                {
52                                    let machine = crate::step_machine::StepMachine::<
53                                        crate::step_machine::state::Running,
54                                    >::from_instance(
55                                        instance
56                                    );
57                                    let _ = machine
58                                        .fail(
59                                            format!("Lambda invoke failed: {:?}", e),
60                                            None,
61                                            &mut *conn,
62                                        )
63                                        .await;
64                                }
65                            }
66                        }
67                    }
68                }
69            });
70            return Ok(true);
71        }
72        #[cfg(not(feature = "aws-lambda"))]
73        {
74            let _ = run_id;
75            let _ = step_instance_id;
76            let _ = resolved_spec;
77            let _ = pool;
78            let _ = nats_client;
79            return Ok(true); // Ignore if feature is not enabled
80        }
81    }
82
83    Ok(false)
84}