stormchaser_engine/handler/integrations/
lambda.rs1use anyhow::Result;
2use serde_json::Value;
3use sqlx::PgPool;
4
5#[cfg(feature = "aws-lambda")]
6use crate::handler::fetch_step_instance;
7#[cfg(feature = "aws-lambda")]
8use anyhow::Context;
9#[cfg(feature = "aws-lambda")]
10use chrono::Utc;
11#[cfg(feature = "aws-lambda")]
12use stormchaser_model::dsl::{self};
13#[cfg(feature = "aws-lambda")]
14use tracing::info;
15
16#[cfg(feature = "aws-lambda")]
17use aws_sdk_lambda::primitives::Blob;
18#[cfg(feature = "aws-lambda")]
19use aws_sdk_lambda::types::InvocationType;
20
/// Executes an AWS Lambda invocation step.
///
/// The step is moved out of the pending state via the step machine, the
/// function described by `spec` is invoked, and the response is handed to
/// `handle_lambda_response`, which records the outcome and publishes the
/// corresponding event.
///
/// # Arguments
///
/// * `run_id` - Run the step belongs to; used for logging, the default STS
///   session name and the emitted events.
/// * `step_id` - Step instance whose state is updated.
/// * `spec` - JSON form of a `LambdaInvokeSpec`.
/// * `pool` - Database pool used for step state and outputs.
/// * `nats_client` - NATS client used to publish the completion/failure event.
///
/// # Errors
///
/// Returns an error when `spec` cannot be deserialized, when the step cannot
/// be loaded or started, when the AWS client cannot be built, when the payload
/// cannot be serialized, when the invoke call fails, or when recording the
/// result fails.
#[cfg(feature = "aws-lambda")]
pub async fn handle_lambda_invoke(
    run_id: stormchaser_model::RunId,
    step_id: stormchaser_model::StepInstanceId,
    spec: Value,
    pool: PgPool,
    nats_client: async_nats::Client,
) -> Result<()> {
    let spec: stormchaser_model::dsl::LambdaInvokeSpec = serde_json::from_value(spec)?;

    info!(
        "Invoking Lambda function {} for run {}",
        spec.function_name, run_id
    );

    // Transition the step out of `Pending` before talking to AWS.
    let instance = fetch_step_instance(step_id, &pool).await?;
    let machine =
        crate::step_machine::StepMachine::<crate::step_machine::state::Pending>::from_instance(
            instance,
        );
    let _ = machine
        .start("aws-lambda".to_string(), &mut *pool.acquire().await?)
        .await?;

    // NOTE(review): if anything below fails before `handle_lambda_response`
    // runs, this function returns the error without moving the step to a
    // failed state — confirm the caller handles that.

    // Build the client while `spec` is still whole; its fields are moved out below.
    let client = build_lambda_client(&spec, run_id).await?;

    // A missing payload is sent as an empty body.
    let payload_bytes = match spec.payload {
        Some(payload) => serde_json::to_vec(&payload)?,
        None => Vec::new(),
    };

    let mut request = client
        .invoke()
        .function_name(spec.function_name)
        .payload(Blob::new(payload_bytes));

    if let Some(inv_type) = spec.invocation_type {
        let invocation_type = match inv_type.as_str() {
            "Event" => InvocationType::Event,
            "DryRun" => InvocationType::DryRun,
            "RequestResponse" => InvocationType::RequestResponse,
            other => {
                // Keep the historical fallback, but make the misconfiguration
                // visible instead of silently running synchronously.
                tracing::warn!(
                    "Unrecognized Lambda invocation type {:?} for run {}; falling back to RequestResponse",
                    other,
                    run_id
                );
                InvocationType::RequestResponse
            }
        };
        request = request.invocation_type(invocation_type);
    }

    if let Some(qualifier) = spec.qualifier {
        request = request.qualifier(qualifier);
    }

    let response = request.send().await?;

    handle_lambda_response(run_id, step_id, response, pool, nats_client).await
}
80
/// Creates the Lambda client for one invocation.
///
/// Configuration starts from the default AWS config loader, with the region
/// overridden when `spec.region` is set. When `spec.assume_role_arn` is set,
/// the role is assumed through STS and the returned client uses the resulting
/// temporary credentials; otherwise the client uses the loaded configuration
/// unchanged.
///
/// The STS session name is `spec.role_session_name` when provided, or
/// `stormchaser-run-<run_id>` otherwise.
///
/// # Errors
///
/// Returns an error when the `AssumeRole` call fails or its response carries
/// no credentials.
#[cfg(feature = "aws-lambda")]
async fn build_lambda_client(
    spec: &dsl::LambdaInvokeSpec,
    run_id: stormchaser_model::RunId,
) -> Result<aws_sdk_lambda::Client> {
    let default_loader = aws_config::defaults(aws_config::BehaviorVersion::v2026_01_12());
    let loader = match &spec.region {
        Some(region) => default_loader.region(aws_config::Region::new(region.clone())),
        None => default_loader,
    };
    let base_config = loader.load().await;

    // Without a role to assume, the ambient configuration is all that is needed.
    let Some(role_arn) = &spec.assume_role_arn else {
        return Ok(aws_sdk_lambda::Client::new(&base_config));
    };

    let sts = aws_sdk_sts::Client::new(&base_config);
    let session_name = match &spec.role_session_name {
        Some(name) => name.clone(),
        None => format!("stormchaser-run-{}", run_id),
    };

    let assumed = sts
        .assume_role()
        .role_arn(role_arn)
        .role_session_name(session_name)
        .send()
        .await?;

    let sts_credentials = assumed
        .credentials()
        .context("Missing credentials from assume_role")?;

    // Wrap the temporary credentials in a static provider; no expiry is passed
    // along, so they are used as-is for this client.
    let lambda_config = aws_sdk_lambda::config::Builder::from(&base_config)
        .credentials_provider(aws_sdk_lambda::config::SharedCredentialsProvider::new(
            aws_sdk_lambda::config::Credentials::new(
                sts_credentials.access_key_id(),
                sts_credentials.secret_access_key(),
                Some(sts_credentials.session_token().to_string()),
                None,
                "StsAssumedRole",
            ),
        ))
        .build();

    Ok(aws_sdk_lambda::Client::from_conf(lambda_config))
}
129
/// Records the outcome of a Lambda invocation and publishes the matching event.
///
/// The invocation is treated as successful only when the status code is 2xx
/// **and** the response carries no function error. Lambda reports errors
/// raised by the function code itself with a 200 status plus a function-error
/// marker, so checking the status code alone would record failing functions
/// as succeeded.
///
/// On success the step is moved to succeeded, the decoded payload is stored as
/// the `response` output and a `stormchaser.v1.step.completed` event is
/// published. Otherwise the step is moved to failed and a
/// `stormchaser.v1.step.failed` event is published.
///
/// The response payload is decoded as JSON when possible; non-JSON payloads
/// are kept as a (lossily UTF-8 decoded) JSON string, and a missing payload
/// becomes `null`.
///
/// # Errors
///
/// Returns an error when the step cannot be loaded or transitioned, when the
/// output cannot be stored, when the event cannot be serialized, or when
/// publishing fails.
#[cfg(feature = "aws-lambda")]
async fn handle_lambda_response(
    run_id: stormchaser_model::RunId,
    step_id: stormchaser_model::StepInstanceId,
    response: aws_sdk_lambda::operation::invoke::InvokeOutput,
    pool: PgPool,
    nats_client: async_nats::Client,
) -> Result<()> {
    let status_code = response.status_code();
    let function_error = response.function_error();

    let payload = match response.payload() {
        Some(blob) => {
            let text = String::from_utf8_lossy(blob.as_ref());
            let parsed = serde_json::from_str::<Value>(&text);
            // Fall back to the raw text only when it is not valid JSON.
            parsed.unwrap_or_else(|_| Value::String(text.into_owned()))
        }
        None => Value::Null,
    };

    let succeeded = (200..300).contains(&status_code) && function_error.is_none();

    let instance = fetch_step_instance(step_id, &pool).await?;
    let machine =
        crate::step_machine::StepMachine::<crate::step_machine::state::Running>::from_instance(
            instance,
        );
    let js = async_nats::jetstream::new(nats_client);

    if succeeded {
        let _ = machine.succeed(&mut *pool.acquire().await?).await?;

        crate::db::upsert_step_output(&pool, step_id, "response", &payload).await?;

        let outputs_map = std::collections::HashMap::from([("response".to_string(), payload)]);

        let event = stormchaser_model::events::StepCompletedEvent {
            run_id,
            step_id,
            event_type: "stormchaser.v1.step.completed".to_string(),
            outputs: Some(outputs_map),
            exit_code: Some(0),
            runner_id: None,
            storage_hashes: None,
            artifacts: None,
            test_reports: None,
            timestamp: Utc::now(),
        };
        stormchaser_model::nats::publish_cloudevent(
            &js,
            "stormchaser.v1.step.completed",
            "stormchaser.v1.step.completed",
            "/stormchaser",
            // Propagate serialization failures instead of panicking.
            serde_json::to_value(event)?,
            Some("1.0"),
            None,
        )
        .await?;
    } else {
        let error_msg = format!(
            "Lambda returned status code {}: {:?}",
            status_code, function_error
        );
        let _ = machine
            .fail(error_msg.clone(), None, &mut *pool.acquire().await?)
            .await?;

        // NOTE(review): `event_type` here is "step_failed" while the success
        // path uses the fully qualified "stormchaser.v1.step.completed" —
        // confirm consumers expect this before changing it.
        let event = serde_json::json!({
            "run_id": run_id,
            "step_id": step_id,
            "event_type": "step_failed",
            "error": error_msg,
            "timestamp": Utc::now(),
        });
        stormchaser_model::nats::publish_cloudevent(
            &js,
            "stormchaser.v1.step.failed",
            "stormchaser.v1.step.failed",
            "/stormchaser",
            event,
            Some("1.0"),
            None,
        )
        .await?;
    }

    Ok(())
}
222
223#[cfg(not(feature = "aws-lambda"))]
224pub async fn handle_lambda_invoke(
226 _run_id: stormchaser_model::RunId,
227 _step_id: stormchaser_model::StepInstanceId,
228 _spec: Value,
229 _pool: PgPool,
230 _nats_client: async_nats::Client,
231) -> Result<()> {
232 anyhow::bail!("AWS Lambda support is not enabled. Enable 'aws-lambda' feature.")
233}
234
#[cfg(test)]
mod tests {

    /// Placeholder for the feature-disabled build.
    ///
    /// Only the inputs that need no external services are constructed here;
    /// the handler itself is not called and nothing is asserted.
    // NOTE(review): presumably the handler is not invoked because it needs a
    // database pool and a NATS client — confirm, and add a real assertion once
    // those can be provided in tests.
    #[tokio::test]
    #[cfg(not(feature = "aws-lambda"))]
    async fn test_handle_lambda_invoke_not_enabled() {
        let _run_id = uuid::Uuid::new_v4();
        let _step_id = uuid::Uuid::new_v4();
        let _spec = serde_json::Value::Object(serde_json::Map::new());
    }
}