stormchaser_engine/handler/integrations/
lambda.rs1use anyhow::Result;
2use serde_json::Value;
3use sqlx::PgPool;
4
5#[cfg(feature = "aws-lambda")]
6use crate::handler::fetch_step_instance;
7#[cfg(feature = "aws-lambda")]
8use anyhow::Context;
9#[cfg(feature = "aws-lambda")]
10use chrono::Utc;
11#[cfg(feature = "aws-lambda")]
12use stormchaser_model::dsl::{self};
13#[cfg(feature = "aws-lambda")]
14use stormchaser_model::events::{
15 EventSource, EventType, SchemaVersion, StepEventType, StepFailedEvent,
16};
17#[cfg(feature = "aws-lambda")]
18use stormchaser_model::nats::NatsSubject;
19#[cfg(feature = "aws-lambda")]
20use tracing::info;
21
22#[cfg(feature = "aws-lambda")]
23use aws_sdk_lambda::primitives::Blob;
24#[cfg(feature = "aws-lambda")]
25use aws_sdk_lambda::types::InvocationType;
26
/// Executes a Lambda-invoke step for a workflow run.
///
/// The raw `spec` is deserialized into a `LambdaInvokeSpec`, the step's state machine is
/// started (from its `Pending` state) with the runner label `"aws-lambda"`, a Lambda client
/// is built from the spec, and the function is invoked. The invoke response is then handed
/// to `handle_lambda_response`, which records the outcome.
///
/// # Errors
/// Any failure along the way (spec deserialization, database access, the state transition,
/// client construction, payload serialization, or the invoke call itself) is propagated to
/// the caller.
// NOTE(review): an error raised after `start` succeeds is returned without failing the
// step here — confirm the caller takes care of that.
#[cfg(feature = "aws-lambda")]
pub async fn handle_lambda_invoke(
    run_id: stormchaser_model::RunId,
    step_id: stormchaser_model::StepInstanceId,
    spec: Value,
    pool: PgPool,
    nats_client: async_nats::Client,
) -> Result<()> {
    let spec = serde_json::from_value::<stormchaser_model::dsl::LambdaInvokeSpec>(spec)?;

    info!(
        "Invoking Lambda function {} for run {}",
        spec.function_name, run_id
    );

    // Start the step before any AWS call is made.
    let pending =
        crate::step_machine::StepMachine::<crate::step_machine::state::Pending>::from_instance(
            fetch_step_instance(step_id, &pool).await?,
        );
    {
        // Scoped so the pooled connection is released before the Lambda call.
        let mut conn = pool.acquire().await?;
        let _ = pending.start("aws-lambda".to_string(), &mut *conn).await?;
    }

    let client = build_lambda_client(&spec, run_id).await?;

    // A missing payload is sent as an empty body.
    let body = match spec.payload {
        Some(payload) => serde_json::to_vec(&payload)?,
        None => Vec::new(),
    };

    let request = client
        .invoke()
        .function_name(spec.function_name)
        .payload(Blob::new(body));

    // Anything other than "Event" / "DryRun" is sent as RequestResponse.
    let request = match spec.invocation_type {
        Some(requested) => request.invocation_type(match requested.as_str() {
            "Event" => InvocationType::Event,
            "DryRun" => InvocationType::DryRun,
            _ => InvocationType::RequestResponse,
        }),
        None => request,
    };

    let request = match spec.qualifier {
        Some(qualifier) => request.qualifier(qualifier),
        None => request,
    };

    let response = request.send().await?;

    handle_lambda_response(run_id, step_id, response, pool, nats_client).await
}
86
/// Builds the Lambda client used for a single invocation.
///
/// Shared AWS configuration is loaded through `aws_config::defaults`, with the region
/// overridden when `spec.region` is set. When `spec.assume_role_arn` is set, that role is
/// assumed through STS — using `spec.role_session_name`, or `stormchaser-run-<run_id>` when
/// no session name is given — and the returned client is configured with the credentials
/// from the STS response. Otherwise the client is built directly from the loaded
/// configuration.
///
/// # Errors
/// Returns an error if the `assume_role` call fails or its response carries no credentials.
#[cfg(feature = "aws-lambda")]
async fn build_lambda_client(
    spec: &dsl::LambdaInvokeSpec,
    run_id: stormchaser_model::RunId,
) -> Result<aws_sdk_lambda::Client> {
    let loader = aws_config::defaults(aws_config::BehaviorVersion::v2026_01_12());
    let loader = match &spec.region {
        Some(region) => loader.region(aws_config::Region::new(region.clone())),
        None => loader,
    };
    let base_config = loader.load().await;

    // Without a role to assume, the loaded configuration is used unchanged.
    let Some(role_arn) = &spec.assume_role_arn else {
        return Ok(aws_sdk_lambda::Client::new(&base_config));
    };

    let session_name = match &spec.role_session_name {
        Some(name) => name.clone(),
        None => format!("stormchaser-run-{}", run_id),
    };

    let sts = aws_sdk_sts::Client::new(&base_config);
    let assumed = sts
        .assume_role()
        .role_arn(role_arn)
        .role_session_name(session_name)
        .send()
        .await?;

    let sts_credentials = assumed
        .credentials()
        .context("Missing credentials from assume_role")?;

    // Wrap the temporary STS credentials in a provider for the Lambda client.
    let provider = aws_sdk_lambda::config::SharedCredentialsProvider::new(
        aws_sdk_lambda::config::Credentials::new(
            sts_credentials.access_key_id(),
            sts_credentials.secret_access_key(),
            Some(sts_credentials.session_token().to_string()),
            None,
            "StsAssumedRole",
        ),
    );

    let lambda_config = aws_sdk_lambda::config::Builder::from(&base_config)
        .credentials_provider(provider)
        .build();

    Ok(aws_sdk_lambda::Client::from_conf(lambda_config))
}
135
/// Records the outcome of a Lambda invocation on the step and publishes the matching event.
///
/// The invocation counts as successful only when the status code is in `200..300` AND the
/// response carries no `FunctionError`. Lambda reports an error raised by the function
/// itself with a 2xx status plus `FunctionError`, so checking the status code alone would
/// record a failed function as a success.
///
/// On success the step machine is moved to its succeeded state, the response payload is
/// stored as the `response` step output, and a `StepCompletedEvent` is published. On
/// failure the step machine is failed with a message containing the status code and the
/// function error, and a `StepFailedEvent` is published.
///
/// The payload is parsed as JSON; if it is not valid JSON it is kept as a (lossily decoded)
/// UTF-8 string. A response without a payload yields `Value::Null`.
///
/// # Errors
/// Propagates database, state-transition, event-serialization and NATS publish errors.
#[cfg(feature = "aws-lambda")]
async fn handle_lambda_response(
    run_id: stormchaser_model::RunId,
    step_id: stormchaser_model::StepInstanceId,
    response: aws_sdk_lambda::operation::invoke::InvokeOutput,
    pool: PgPool,
    nats_client: async_nats::Client,
) -> Result<()> {
    let status_code = response.status_code();
    let function_error = response.function_error();

    let payload = match response.payload() {
        Some(blob) => {
            let text = String::from_utf8_lossy(blob.as_ref());
            // Fall back to the raw text when the payload is not JSON.
            serde_json::from_str::<Value>(&text)
                .unwrap_or_else(|_| Value::String(text.into_owned()))
        }
        None => Value::Null,
    };

    // A 2xx status alone is not enough: function-level errors also come back as 2xx.
    let succeeded = (200..300).contains(&status_code) && function_error.is_none();

    if succeeded {
        let instance = fetch_step_instance(step_id, &pool).await?;
        let machine =
            crate::step_machine::StepMachine::<crate::step_machine::state::Running>::from_instance(
                instance,
            );
        let _ = machine.succeed(&mut *pool.acquire().await?).await?;

        crate::db::upsert_step_output(&pool, step_id, "response", &payload).await?;

        // `payload` is not needed afterwards, so it is moved into the outputs map.
        let outputs_map = std::collections::HashMap::from([("response".to_string(), payload)]);

        let event = stormchaser_model::events::StepCompletedEvent {
            run_id,
            step_id,
            event_type: EventType::Step(StepEventType::Completed),
            outputs: Some(outputs_map),
            exit_code: Some(0),
            runner_id: None,
            storage_hashes: None,
            artifacts: None,
            test_reports: None,
            timestamp: Utc::now(),
        };
        let js = async_nats::jetstream::new(nats_client);
        stormchaser_model::nats::publish_cloudevent(
            &js,
            NatsSubject::StepCompleted,
            EventType::Step(StepEventType::Completed),
            EventSource::System,
            // Propagate serialization failures instead of panicking.
            serde_json::to_value(event)?,
            Some(SchemaVersion::new("1.0".to_string())),
            None,
        )
        .await?;
    } else {
        let error_msg = format!(
            "Lambda returned status code {}: {:?}",
            status_code, function_error
        );
        let instance = fetch_step_instance(step_id, &pool).await?;
        let machine =
            crate::step_machine::StepMachine::<crate::step_machine::state::Running>::from_instance(
                instance,
            );
        let _ = machine
            .fail(error_msg.clone(), None, &mut *pool.acquire().await?)
            .await?;

        let event = StepFailedEvent {
            run_id,
            step_id,
            event_type: EventType::Step(StepEventType::Failed),
            error: error_msg,
            runner_id: None,
            exit_code: None,
            storage_hashes: None,
            artifacts: None,
            test_reports: None,
            outputs: None,
            timestamp: Utc::now(),
        };
        let js = async_nats::jetstream::new(nats_client);
        stormchaser_model::nats::publish_cloudevent(
            &js,
            NatsSubject::StepFailed,
            EventType::Step(StepEventType::Failed),
            EventSource::System,
            // Propagate serialization failures instead of panicking.
            serde_json::to_value(event)?,
            Some(SchemaVersion::new("1.0".to_string())),
            None,
        )
        .await?;
    }

    Ok(())
}
234
235#[cfg(not(feature = "aws-lambda"))]
236pub async fn handle_lambda_invoke(
238 _run_id: stormchaser_model::RunId,
239 _step_id: stormchaser_model::StepInstanceId,
240 _spec: Value,
241 _pool: PgPool,
242 _nats_client: async_nats::Client,
243) -> Result<()> {
244 anyhow::bail!("AWS Lambda support is not enabled. Enable 'aws-lambda' feature.")
245}
246
#[cfg(test)]
mod tests {
    /// Placeholder for builds without the `aws-lambda` feature.
    ///
    /// It only constructs throwaway identifiers and an empty spec; it does not call
    /// `handle_lambda_invoke`.
    #[tokio::test]
    #[cfg(not(feature = "aws-lambda"))]
    async fn test_handle_lambda_invoke_not_enabled() {
        let _run_id = uuid::Uuid::new_v4();
        let _step_id = uuid::Uuid::new_v4();
        let _spec = serde_json::json!({});
    }
}