Skip to main content

stormchaser_engine/handler/integrations/
lambda.rs

1use anyhow::Result;
2use serde_json::Value;
3use sqlx::PgPool;
4use uuid::Uuid;
5
6#[cfg(feature = "aws-lambda")]
7use crate::handler::fetch_step_instance;
8#[cfg(feature = "aws-lambda")]
9use anyhow::Context;
10#[cfg(feature = "aws-lambda")]
11use chrono::Utc;
12#[cfg(feature = "aws-lambda")]
13use stormchaser_model::dsl::{self};
14#[cfg(feature = "aws-lambda")]
15use tracing::info;
16
17#[cfg(feature = "aws-lambda")]
18use aws_sdk_lambda::primitives::Blob;
19#[cfg(feature = "aws-lambda")]
20use aws_sdk_lambda::types::InvocationType;
21
#[cfg(feature = "aws-lambda")]
/// Invoke an AWS Lambda function on behalf of a workflow step.
///
/// Deserializes `spec` into a `LambdaInvokeSpec`, starts the step through the
/// step machine, builds a Lambda client via `build_lambda_client`, sends the
/// invocation and hands the response to `handle_lambda_response`.
///
/// `spec.invocation_type` accepts `"RequestResponse"`, `"Event"` and `"DryRun"`.
/// Any other value still falls back to `RequestResponse`, but a warning is
/// logged so a misspelled type does not go unnoticed.
///
/// # Errors
///
/// Returns an error if the spec cannot be deserialized, the step instance
/// cannot be fetched or started, the client cannot be built, the payload cannot
/// be serialized, the invocation itself fails, or response handling fails.
pub async fn handle_lambda_invoke(
    run_id: Uuid,
    step_id: Uuid,
    spec: Value,
    pool: PgPool,
    nats_client: async_nats::Client,
) -> Result<()> {
    use stormchaser_model::dsl::LambdaInvokeSpec;

    let spec: LambdaInvokeSpec =
        serde_json::from_value(spec).context("Invalid Lambda invoke spec")?;

    info!(
        "Invoking Lambda function {} for run {}",
        spec.function_name, run_id
    );

    // 1. Mark as Running
    let instance = fetch_step_instance(step_id, &pool).await?;
    let machine =
        crate::step_machine::StepMachine::<crate::step_machine::state::Pending>::from_instance(
            instance,
        );
    let _ = machine
        .start("aws-lambda".to_string(), &mut *pool.acquire().await?)
        .await?;

    // 2. Build the client (region and optional assumed role come from the spec)
    let client = build_lambda_client(&spec, run_id).await?;

    // 3. Prepare Payload (an absent payload is sent as an empty body)
    let payload_bytes = match spec.payload {
        Some(payload) => serde_json::to_vec(&payload)?,
        None => Vec::new(),
    };

    // 4. Invoke
    let mut request = client
        .invoke()
        .function_name(spec.function_name)
        .payload(Blob::new(payload_bytes));

    if let Some(inv_type) = spec.invocation_type {
        let it = match inv_type.as_str() {
            "RequestResponse" => InvocationType::RequestResponse,
            "Event" => InvocationType::Event,
            "DryRun" => InvocationType::DryRun,
            other => {
                // Keep the historical fallback, but make the misconfiguration visible.
                tracing::warn!(
                    "Unrecognized Lambda invocation type {:?} for run {}; falling back to RequestResponse",
                    other,
                    run_id
                );
                InvocationType::RequestResponse
            }
        };
        request = request.invocation_type(it);
    }

    if let Some(qualifier) = spec.qualifier {
        request = request.qualifier(qualifier);
    }

    let response = request
        .send()
        .await
        .context("Failed to invoke Lambda function")?;

    // 5. Handle Response
    handle_lambda_response(run_id, step_id, response, pool, nats_client).await
}
83
#[cfg(feature = "aws-lambda")]
/// Build the Lambda client used for a single invocation.
///
/// Loads the default AWS configuration, overriding the region when
/// `spec.region` is set. When `spec.assume_role_arn` is set, the role is
/// assumed through STS (session name taken from `spec.role_session_name`, or
/// `stormchaser-run-<run_id>` when absent) and the returned client uses the
/// resulting credentials; otherwise the client uses the loaded configuration
/// as is.
///
/// # Errors
///
/// Returns an error if the STS `AssumeRole` call fails or its response carries
/// no credentials.
async fn build_lambda_client(
    spec: &dsl::LambdaInvokeSpec,
    run_id: Uuid,
) -> Result<aws_sdk_lambda::Client> {
    let default_loader = aws_config::defaults(aws_config::BehaviorVersion::v2026_01_12());
    let loader = match &spec.region {
        Some(region) => default_loader.region(aws_config::Region::new(region.clone())),
        None => default_loader,
    };
    let sdk_config = loader.load().await;

    // No role requested: the ambient configuration is all that is needed.
    let Some(role_arn) = &spec.assume_role_arn else {
        return Ok(aws_sdk_lambda::Client::new(&sdk_config));
    };

    let session_name = match &spec.role_session_name {
        Some(name) => name.clone(),
        None => format!("stormchaser-run-{}", run_id),
    };

    let sts = aws_sdk_sts::Client::new(&sdk_config);
    let assumed = sts
        .assume_role()
        .role_arn(role_arn)
        .role_session_name(session_name)
        .send()
        .await?;

    let sts_credentials = assumed
        .credentials()
        .context("Missing credentials from assume_role")?;

    // Wrap the temporary STS credentials in a static provider for the Lambda client.
    let provider = aws_sdk_lambda::config::SharedCredentialsProvider::new(
        aws_sdk_lambda::config::Credentials::new(
            sts_credentials.access_key_id(),
            sts_credentials.secret_access_key(),
            Some(sts_credentials.session_token().to_string()),
            None,
            "StsAssumedRole",
        ),
    );

    let lambda_config = aws_sdk_lambda::config::Builder::from(&sdk_config)
        .credentials_provider(provider)
        .build();

    Ok(aws_sdk_lambda::Client::from_conf(lambda_config))
}
132
#[cfg(feature = "aws-lambda")]
/// Record the outcome of a Lambda invocation and publish the matching event.
///
/// The invocation counts as successful only when the status code is 2xx AND
/// the response carries no `FunctionError`. Per the Lambda Invoke API, a
/// function that raised an error is reported through `FunctionError` while the
/// HTTP status stays 200, so the status code alone cannot signal success.
///
/// On success the step is transitioned via `succeed`, the decoded payload is
/// stored as the `response` output and a `step_completed` event is published
/// to `stormchaser.step.completed`. Otherwise the step is transitioned via
/// `fail` and a `step_failed` event is published to `stormchaser.step.failed`.
///
/// The response payload is decoded as JSON when possible; otherwise it is kept
/// as a (lossily decoded) UTF-8 string. A missing payload becomes `null`.
///
/// # Errors
///
/// Returns an error if the step instance cannot be fetched, the state
/// transition or output upsert fails, or the event cannot be published.
async fn handle_lambda_response(
    run_id: Uuid,
    step_id: Uuid,
    response: aws_sdk_lambda::operation::invoke::InvokeOutput,
    pool: PgPool,
    nats_client: async_nats::Client,
) -> Result<()> {
    let status_code = response.status_code();
    let function_error = response.function_error();

    let payload = match response.payload() {
        Some(blob) => {
            let text = String::from_utf8_lossy(blob.as_ref());
            match serde_json::from_str::<Value>(&text) {
                Ok(json) => json,
                // Not JSON: keep the raw text so nothing is lost.
                Err(_) => Value::String(text.into_owned()),
            }
        }
        None => Value::Null,
    };

    let succeeded = (200..300).contains(&status_code) && function_error.is_none();

    // Both outcomes start from the step's current (Running) state.
    let instance = fetch_step_instance(step_id, &pool).await?;
    let machine =
        crate::step_machine::StepMachine::<crate::step_machine::state::Running>::from_instance(
            instance,
        );

    let (subject, event) = if succeeded {
        let _ = machine.succeed(&mut *pool.acquire().await?).await?;

        // Save output
        crate::db::upsert_step_output(&pool, step_id, "response", &payload).await?;

        (
            "stormchaser.step.completed",
            serde_json::json!({
                "run_id": run_id,
                "step_id": step_id,
                "event_type": "step_completed",
                "outputs": {
                    "response": payload,
                },
                "timestamp": Utc::now(),
            }),
        )
    } else {
        let error_msg = match function_error {
            // The error details of a failed function live in the payload.
            Some(kind) => format!(
                "Lambda function error ({}) with status code {}: {}",
                kind, status_code, payload
            ),
            None => format!(
                "Lambda returned status code {}: {:?}",
                status_code, function_error
            ),
        };

        let _ = machine
            .fail(error_msg.clone(), None, &mut *pool.acquire().await?)
            .await?;

        (
            "stormchaser.step.failed",
            serde_json::json!({
                "run_id": run_id,
                "step_id": step_id,
                "event_type": "step_failed",
                "error": error_msg,
                "timestamp": Utc::now(),
            }),
        )
    };

    let js = async_nats::jetstream::new(nats_client);
    js.publish(subject, event.to_string().into()).await?;

    Ok(())
}
203
204#[cfg(not(feature = "aws-lambda"))]
205/// Handle lambda invoke.
206pub async fn handle_lambda_invoke(
207    _run_id: Uuid,
208    _step_id: Uuid,
209    _spec: Value,
210    _pool: PgPool,
211    _nats_client: async_nats::Client,
212) -> Result<()> {
213    anyhow::bail!("AWS Lambda support is not enabled. Enable 'aws-lambda' feature.")
214}