stormchaser_engine/handler/step/intrinsic/
lambda.rs1use anyhow::Result;
2use serde_json::Value;
3use sqlx::PgPool;
4use std::sync::Arc;
5use stormchaser_tls::TlsReloader;
6use uuid::Uuid;
7
8#[cfg(feature = "aws-lambda")]
9use crate::handler::fetch_step_instance;
10#[cfg(feature = "aws-lambda")]
11use crate::handler::handle_lambda_invoke;
12
13pub async fn try_dispatch(
15 run_id: Uuid,
16 step_instance_id: Uuid,
17 step_type: &str,
18 resolved_spec: &Value,
19 pool: PgPool,
20 nats_client: async_nats::Client,
21 _tls_reloader: Arc<TlsReloader>,
22) -> Result<bool> {
23 if step_type == "LambdaInvoke" {
24 #[cfg(feature = "aws-lambda")]
25 {
26 let pool = pool.clone();
27 let nats_client = nats_client.clone();
28 let spec = resolved_spec.clone();
29
30 tokio::spawn(async move {
31 if let Err(e) = handle_lambda_invoke(
32 run_id,
33 step_instance_id,
34 spec,
35 pool.clone(),
36 nats_client.clone(),
37 )
38 .await
39 {
40 if let Ok(instance) = fetch_step_instance(step_instance_id, &pool).await {
41 let machine = crate::step_machine::StepMachine::<
42 crate::step_machine::state::Pending,
43 >::from_instance(instance);
44 if let Ok(mut conn) = pool.acquire().await {
45 if let Ok(_machine) = machine
46 .start("error-recovery".to_string(), &mut *conn)
47 .await
48 {
49 if let Ok(instance) =
50 fetch_step_instance(step_instance_id, &pool).await
51 {
52 let machine = crate::step_machine::StepMachine::<
53 crate::step_machine::state::Running,
54 >::from_instance(
55 instance
56 );
57 let _ = machine
58 .fail(
59 format!("Lambda invoke failed: {:?}", e),
60 None,
61 &mut *conn,
62 )
63 .await;
64 }
65 }
66 }
67 }
68 }
69 });
70 return Ok(true);
71 }
72 #[cfg(not(feature = "aws-lambda"))]
73 {
74 let _ = run_id;
75 let _ = step_instance_id;
76 let _ = resolved_spec;
77 let _ = pool;
78 let _ = nats_client;
79 return Ok(true); }
81 }
82
83 Ok(false)
84}