stormchaser_engine/handler/integrations/
lambda.rs1use anyhow::Result;
2use serde_json::Value;
3use sqlx::PgPool;
4use uuid::Uuid;
5
6#[cfg(feature = "aws-lambda")]
7use crate::handler::fetch_step_instance;
8#[cfg(feature = "aws-lambda")]
9use anyhow::Context;
10#[cfg(feature = "aws-lambda")]
11use chrono::Utc;
12#[cfg(feature = "aws-lambda")]
13use stormchaser_model::dsl::{self};
14#[cfg(feature = "aws-lambda")]
15use tracing::info;
16
17#[cfg(feature = "aws-lambda")]
18use aws_sdk_lambda::primitives::Blob;
19#[cfg(feature = "aws-lambda")]
20use aws_sdk_lambda::types::InvocationType;
21
/// Executes a Lambda-invoke step for a workflow run.
///
/// Deserializes `spec` into a `LambdaInvokeSpec`, moves the step from `Pending` to
/// running via the step machine (runner tag `"aws-lambda"`), builds a Lambda client
/// (optionally with an assumed role), sends the invoke request, and hands the
/// response to `handle_lambda_response`.
///
/// # Errors
///
/// Returns an error if the spec cannot be deserialized, the step instance cannot be
/// loaded or started, the client cannot be built, the payload cannot be serialized,
/// the invoke call fails, or response handling fails.
#[cfg(feature = "aws-lambda")]
pub async fn handle_lambda_invoke(
    run_id: Uuid,
    step_id: Uuid,
    spec: Value,
    pool: PgPool,
    nats_client: async_nats::Client,
) -> Result<()> {
    use crate::step_machine::{state::Pending, StepMachine};
    use stormchaser_model::dsl::LambdaInvokeSpec;

    let spec: LambdaInvokeSpec = serde_json::from_value(spec)?;

    info!(
        "Invoking Lambda function {} for run {}",
        spec.function_name, run_id
    );

    // Transition the step out of `Pending` before talking to AWS. The pooled
    // connection is a temporary of this statement, so it goes back to the pool
    // before the (potentially slow) Lambda call below.
    let pending = StepMachine::<Pending>::from_instance(fetch_step_instance(step_id, &pool).await?);
    let _ = pending
        .start("aws-lambda".to_string(), &mut *pool.acquire().await?)
        .await?;

    let lambda = build_lambda_client(&spec, run_id).await?;

    // A missing payload is sent as an empty body.
    let body = match spec.payload {
        Some(value) => serde_json::to_vec(&value)?,
        None => Vec::new(),
    };

    // Any value other than "Event" or "DryRun" maps to a synchronous invocation.
    let invocation_type = spec.invocation_type.map(|kind| match kind.as_str() {
        "Event" => InvocationType::Event,
        "DryRun" => InvocationType::DryRun,
        _ => InvocationType::RequestResponse,
    });

    let mut invoke = lambda
        .invoke()
        .function_name(spec.function_name)
        .payload(Blob::new(body));
    if let Some(mode) = invocation_type {
        invoke = invoke.invocation_type(mode);
    }
    if let Some(alias) = spec.qualifier {
        invoke = invoke.qualifier(alias);
    }

    let output = invoke.send().await?;

    handle_lambda_response(run_id, step_id, output, pool, nats_client).await
}
83
/// Builds the Lambda client used to invoke the function described by `spec`.
///
/// Starts from the default AWS configuration, overriding the region when
/// `spec.region` is set. When `spec.assume_role_arn` is set, the role is assumed
/// through STS and the returned temporary credentials back the client; the session
/// name is `spec.role_session_name`, or `stormchaser-run-<run_id>` when absent.
///
/// # Errors
///
/// Returns an error if the STS `AssumeRole` call fails or its response contains no
/// credentials.
#[cfg(feature = "aws-lambda")]
async fn build_lambda_client(
    spec: &dsl::LambdaInvokeSpec,
    run_id: Uuid,
) -> Result<aws_sdk_lambda::Client> {
    use aws_sdk_lambda::config::{Builder, Credentials, SharedCredentialsProvider};

    let base_loader = aws_config::defaults(aws_config::BehaviorVersion::v2026_01_12());
    let loader = match &spec.region {
        Some(region) => base_loader.region(aws_config::Region::new(region.clone())),
        None => base_loader,
    };
    let sdk_config = loader.load().await;

    // Without a role to assume, the ambient credentials are used directly.
    let Some(role_arn) = &spec.assume_role_arn else {
        return Ok(aws_sdk_lambda::Client::new(&sdk_config));
    };

    let session_name = match &spec.role_session_name {
        Some(name) => name.clone(),
        None => format!("stormchaser-run-{}", run_id),
    };

    let assumed = aws_sdk_sts::Client::new(&sdk_config)
        .assume_role()
        .role_arn(role_arn)
        .role_session_name(session_name)
        .send()
        .await?;

    let sts_creds = assumed
        .credentials()
        .context("Missing credentials from assume_role")?;

    // The temporary credentials are handed over as fixed values with no expiry
    // supplied (the `None` argument).
    let static_creds = Credentials::new(
        sts_creds.access_key_id(),
        sts_creds.secret_access_key(),
        Some(sts_creds.session_token().to_string()),
        None,
        "StsAssumedRole",
    );

    let lambda_config = Builder::from(&sdk_config)
        .credentials_provider(SharedCredentialsProvider::new(static_creds))
        .build();

    Ok(aws_sdk_lambda::Client::from_conf(lambda_config))
}
132
/// Records the outcome of a Lambda invocation on the step and publishes it to NATS.
///
/// The invocation is treated as successful only when the HTTP status is 2xx **and**
/// the response carries no function error. Lambda reports errors raised by the
/// function itself with a 200 status and the `FunctionError` field set, so checking
/// the status code alone would mark failed functions as succeeded.
///
/// On success the step is moved to succeeded, the response payload is stored as the
/// `"response"` step output, and a `step_completed` event is published to
/// `stormchaser.step.completed`. Otherwise the step is failed with a descriptive
/// message and a `step_failed` event is published to `stormchaser.step.failed`.
///
/// The payload is parsed as JSON when possible; otherwise it is kept as a (lossily
/// UTF-8 decoded) string. A response without payload yields `Value::Null`.
///
/// # Errors
///
/// Returns an error if the step instance cannot be loaded, a state transition or
/// output upsert fails, or the event cannot be published.
#[cfg(feature = "aws-lambda")]
async fn handle_lambda_response(
    run_id: Uuid,
    step_id: Uuid,
    response: aws_sdk_lambda::operation::invoke::InvokeOutput,
    pool: PgPool,
    nats_client: async_nats::Client,
) -> Result<()> {
    use crate::step_machine::{state::Running, StepMachine};

    let status_code = response.status_code();
    let function_error = response.function_error();

    let payload = match response.payload() {
        Some(blob) => {
            let text = String::from_utf8_lossy(blob.as_ref());
            // Only allocate the string fallback when the payload is not valid JSON.
            serde_json::from_str::<Value>(&text)
                .unwrap_or_else(|_| Value::String(text.into_owned()))
        }
        None => Value::Null,
    };

    let instance = fetch_step_instance(step_id, &pool).await?;
    let machine = StepMachine::<Running>::from_instance(instance);
    let js = async_nats::jetstream::new(nats_client);

    // A 2xx status with `FunctionError` set means the function ran and failed.
    let succeeded = (200..300).contains(&status_code) && function_error.is_none();

    if succeeded {
        let _ = machine.succeed(&mut *pool.acquire().await?).await?;

        crate::db::upsert_step_output(&pool, step_id, "response", &payload).await?;

        let event = serde_json::json!({
            "run_id": run_id,
            "step_id": step_id,
            "event_type": "step_completed",
            "outputs": {
                "response": payload,
            },
            "timestamp": Utc::now(),
        });
        js.publish("stormchaser.step.completed", event.to_string().into())
            .await?;
    } else {
        let error_msg = match function_error {
            // For function errors the payload carries the error details, so it is
            // included in the message.
            Some(kind) => format!(
                "Lambda function error ({kind}), status code {status_code}: {payload}"
            ),
            None => format!(
                "Lambda returned status code {}: {:?}",
                status_code, function_error
            ),
        };

        let _ = machine
            .fail(error_msg.clone(), None, &mut *pool.acquire().await?)
            .await?;

        let event = serde_json::json!({
            "run_id": run_id,
            "step_id": step_id,
            "event_type": "step_failed",
            "error": error_msg,
            "timestamp": Utc::now(),
        });
        js.publish("stormchaser.step.failed", event.to_string().into())
            .await?;
    }

    Ok(())
}
203
204#[cfg(not(feature = "aws-lambda"))]
205pub async fn handle_lambda_invoke(
207 _run_id: Uuid,
208 _step_id: Uuid,
209 _spec: Value,
210 _pool: PgPool,
211 _nats_client: async_nats::Client,
212) -> Result<()> {
213 anyhow::bail!("AWS Lambda support is not enabled. Enable 'aws-lambda' feature.")
214}