stormchaser_engine/handler/step/intrinsic/
lambda.rs1use anyhow::Result;
2use serde_json::Value;
3use sqlx::PgPool;
4use std::sync::Arc;
5use stormchaser_model::RunId;
6use stormchaser_model::StepInstanceId;
7use stormchaser_tls::TlsReloader;
8
9#[cfg(feature = "aws-lambda")]
10use crate::handler::fetch_step_instance;
11#[cfg(feature = "aws-lambda")]
12use crate::handler::handle_lambda_invoke;
13
14pub async fn try_dispatch(
16 run_id: RunId,
17 step_instance_id: StepInstanceId,
18 step_type: &str,
19 resolved_spec: &Value,
20 pool: PgPool,
21 nats_client: async_nats::Client,
22 _tls_reloader: Arc<TlsReloader>,
23) -> Result<bool> {
24 if step_type == "LambdaInvoke" {
25 #[cfg(feature = "aws-lambda")]
26 {
27 let pool = pool.clone();
28 let nats_client = nats_client.clone();
29 let spec = resolved_spec.clone();
30
31 tokio::spawn(async move {
32 if let Err(e) = handle_lambda_invoke(
33 run_id,
34 step_instance_id,
35 spec,
36 pool.clone(),
37 nats_client.clone(),
38 )
39 .await
40 {
41 if let Ok(instance) = fetch_step_instance(step_instance_id, &pool).await {
42 let machine = crate::step_machine::StepMachine::<
43 crate::step_machine::state::Pending,
44 >::from_instance(instance);
45 if let Ok(mut conn) = pool.acquire().await {
46 if let Ok(_machine) = machine
47 .start("error-recovery".to_string(), &mut *conn)
48 .await
49 {
50 if let Ok(instance) =
51 fetch_step_instance(step_instance_id, &pool).await
52 {
53 let machine = crate::step_machine::StepMachine::<
54 crate::step_machine::state::Running,
55 >::from_instance(
56 instance
57 );
58 let _ = machine
59 .fail(
60 format!("Lambda invoke failed: {:?}", e),
61 None,
62 &mut *conn,
63 )
64 .await;
65 }
66 }
67 }
68 }
69 }
70 });
71 return Ok(true);
72 }
73 #[cfg(not(feature = "aws-lambda"))]
74 {
75 let _ = run_id;
76 let _ = step_instance_id;
77 let _ = resolved_spec;
78 let _ = pool;
79 let _ = nats_client;
80 return Ok(true); }
82 }
83
84 Ok(false)
85}