Skip to main content

stormchaser_engine/handler/integrations/email/
mod.rs

1pub mod ses;
2pub mod smtp;
3pub mod test_report;
4
5pub use test_report::handle_test_report_email;
6
7use anyhow::Result;
8use serde_json::Value;
9use sqlx::PgPool;
10use std::sync::Arc;
11use stormchaser_tls::TlsReloader;
12
13#[cfg(feature = "email")]
14use crate::handler::{fetch_outputs, fetch_run_context, fetch_step_instance};
15#[cfg(feature = "email")]
16use chrono::Utc;
17#[cfg(feature = "email")]
18use stormchaser_model::dsl::{self, EmailBackend};
19#[cfg(feature = "email")]
20use stormchaser_model::workflow;
21#[cfg(feature = "email")]
22use tracing::{error, info};
23
#[cfg(feature = "email")]
/// Sends the email described by a workflow step.
///
/// The JSON `spec` is deserialized into an `EmailSpec`, the step instance is moved from
/// `Pending` to `Running`, the body is rendered as a minijinja template, and the result is
/// handed to the backend selected by `spec.backend` (SMTP when no backend is given).
///
/// The template context exposes three keys: `inputs` (the run context's inputs), `steps`
/// (the outputs fetched for the run) and `run.id` (the run id as a string).
///
/// # Errors
///
/// Returns an error when the spec cannot be deserialized, a database call fails, the step
/// cannot be started, the body fails to render, or the chosen backend returns an error.
///
/// NOTE(review): errors raised after the step has been started (context fetch, rendering)
/// are propagated without calling `fail_email_step`; presumably the caller is responsible
/// for the step state in that case — confirm.
pub async fn handle_email_send(
    run_id: stormchaser_model::RunId,
    step_id: stormchaser_model::StepInstanceId,
    spec: Value,
    pool: PgPool,
    nats_client: async_nats::Client,
    tls_reloader: Arc<TlsReloader>,
) -> Result<()> {
    let email: dsl::EmailSpec = serde_json::from_value(spec)?;

    info!(
        "Sending email '{}' from {} for run {}",
        email.subject, email.from, run_id
    );

    // Step state: Pending -> Running, recorded before any rendering or sending happens.
    let pending =
        crate::step_machine::StepMachine::<crate::step_machine::state::Pending>::from_instance(
            fetch_step_instance(step_id, &pool).await?,
        );
    {
        // Scoped so the pooled connection is released as soon as the transition is stored.
        let mut conn = pool.acquire().await?;
        let _ = pending.start("email".to_string(), &mut *conn).await?;
    }

    // Data made available to the body template.
    let run_context: workflow::RunContext = fetch_run_context(run_id, &pool).await?;
    let step_outputs: Value = fetch_outputs(run_id, &pool).await?;
    let template_ctx = serde_json::json!({
        "inputs": run_context.inputs,
        "steps": step_outputs,
        "run": {
            "id": run_id.to_string(),
        }
    });

    // Only the body is treated as a template; the subject is used verbatim by the backends.
    let rendered_body = minijinja::Environment::new()
        .render_str(&email.body, template_ctx)
        .map_err(|e| anyhow::anyhow!("Failed to render email body: {:?}", e))?;

    let is_html = email.html.unwrap_or_default();

    // Dispatch to the requested backend, defaulting to SMTP.
    match email.backend.clone().unwrap_or(EmailBackend::Smtp) {
        EmailBackend::Smtp => {
            send_via_smtp(
                run_id,
                step_id,
                &email,
                rendered_body,
                is_html,
                pool,
                nats_client,
                tls_reloader,
            )
            .await
        }
        EmailBackend::Ses => {
            send_via_ses(
                run_id,
                step_id,
                &email,
                rendered_body,
                is_html,
                pool,
                nats_client,
            )
            .await
        }
    }
}
100
#[cfg(feature = "email")]
/// Sends an already-rendered email through the SES backend and records the outcome.
///
/// With the `aws-ses` feature enabled, the message is passed to `ses::send_email_ses`; on
/// success the step is completed via `complete_email_step`, on failure it is failed via
/// `fail_email_step` with the error's debug representation.
///
/// Without the `aws-ses` feature the function always returns an error and leaves the step
/// untouched.
async fn send_via_ses(
    run_id: stormchaser_model::RunId,
    step_id: stormchaser_model::StepInstanceId,
    spec: &dsl::EmailSpec,
    rendered_body: String,
    is_html: bool,
    pool: PgPool,
    nats_client: async_nats::Client,
) -> Result<()> {
    #[cfg(feature = "aws-ses")]
    {
        let outcome = self::ses::send_email_ses(
            spec.from.clone(),
            spec.to.clone(),
            spec.cc.clone(),
            spec.bcc.clone(),
            spec.subject.clone(),
            rendered_body,
            is_html,
            spec.ses_region.clone(),
            spec.ses_role_arn.clone(),
            spec.ses_configuration_set_name.clone(),
            run_id,
        )
        .await;

        match outcome {
            Err(cause) => {
                let error_msg = format!("Failed to send email via SES: {:?}", cause);
                fail_email_step(run_id, step_id, error_msg, pool, nats_client).await
            }
            Ok(_) => {
                info!("Email sent via SES successfully for step {}", step_id);
                complete_email_step(run_id, step_id, pool, nats_client).await
            }
        }
    }
    #[cfg(not(feature = "aws-ses"))]
    {
        // Every argument is consumed here so that builds without `aws-ses` do not emit
        // unused-variable warnings.
        let _ = (
            run_id,
            step_id,
            spec,
            rendered_body,
            is_html,
            pool,
            nats_client,
        );
        anyhow::bail!("SES backend requested but 'aws-ses' feature is not enabled.");
    }
}
152
#[cfg(feature = "email")]
/// Sends an already-rendered email over SMTP and records the outcome on the step.
///
/// The message is built from `spec` (`from`, `to`, optional `cc`/`bcc`, `subject`) with
/// `rendered_body` as its body, typed as HTML or plain text according to `is_html`.
///
/// Connection settings come from `spec` first and fall back to the environment:
/// `SMTP_SERVER` (default `localhost`), `SMTP_PORT` (default 25), `SMTP_USERNAME`,
/// `SMTP_PASSWORD`, `SMTP_USE_TLS` and `SMTP_USE_MTLS` (both enabled only by the exact
/// value `true`).
///
/// If the transport reports a send failure the step is failed via `fail_email_step`;
/// otherwise it is completed via `complete_email_step`. `_tls_reloader` is currently unused.
///
/// # Errors
///
/// Returns an error when an address cannot be parsed (the message names the offending field
/// and value), the message or transport cannot be built, or recording the step outcome fails.
// The parameter list mirrors `handle_email_send`'s inputs plus the rendered body.
#[allow(clippy::too_many_arguments)]
async fn send_via_smtp(
    run_id: stormchaser_model::RunId,
    step_id: stormchaser_model::StepInstanceId,
    spec: &dsl::EmailSpec,
    rendered_body: String,
    is_html: bool,
    pool: PgPool,
    nats_client: async_nats::Client,
    _tls_reloader: Arc<TlsReloader>,
) -> Result<()> {
    use lettre::message::header::ContentType;
    use lettre::message::Mailbox;
    use lettre::{Message, Transport};

    /// Parses one mailbox, naming the offending field and value when it is invalid.
    fn parse_mailbox(field: &str, raw: &str) -> Result<Mailbox> {
        raw.parse::<Mailbox>()
            .map_err(|e| anyhow::anyhow!("Invalid '{}' address '{}': {:?}", field, raw, e))
    }

    // 4. Build Email
    let mut builder = Message::builder()
        .from(parse_mailbox("from", &spec.from)?)
        .subject(spec.subject.clone());

    for to in &spec.to {
        builder = builder.to(parse_mailbox("to", to)?);
    }
    for cc in spec.cc.iter().flatten() {
        builder = builder.cc(parse_mailbox("cc", cc)?);
    }
    for bcc in spec.bcc.iter().flatten() {
        builder = builder.bcc(parse_mailbox("bcc", bcc)?);
    }

    let content_type = if is_html {
        ContentType::TEXT_HTML
    } else {
        ContentType::TEXT_PLAIN
    };
    let message = builder.header(content_type).body(rendered_body)?;

    // 5. Send Email
    let smtp_params = self::smtp::SmtpParams {
        server: spec.smtp_server.clone().unwrap_or_else(|| {
            std::env::var("SMTP_SERVER").unwrap_or_else(|_| "localhost".to_string())
        }),
        port: spec.smtp_port.unwrap_or_else(|| {
            std::env::var("SMTP_PORT")
                .ok()
                .and_then(|raw| match raw.parse() {
                    Ok(port) => Some(port),
                    Err(_) => {
                        // Still fall back to the default, but make the misconfiguration
                        // visible instead of silently ignoring it.
                        tracing::warn!(
                            "Ignoring invalid SMTP_PORT value '{}'; falling back to port 25",
                            raw
                        );
                        None
                    }
                })
                .unwrap_or(25)
        }),
        username: spec
            .smtp_username
            .clone()
            .or_else(|| std::env::var("SMTP_USERNAME").ok()),
        password: spec
            .smtp_password
            .clone()
            .or_else(|| std::env::var("SMTP_PASSWORD").ok()),
        use_tls: spec
            .smtp_use_tls
            .unwrap_or_else(|| std::env::var("SMTP_USE_TLS").unwrap_or_default() == "true"),
        use_mtls: spec
            .smtp_use_mtls
            .unwrap_or_else(|| std::env::var("SMTP_USE_MTLS").unwrap_or_default() == "true"),
    };

    let mailer = self::smtp::build_smtp_transport(smtp_params)?;

    // NOTE(review): `Transport::send` is lettre's synchronous API, so this call occupies the
    // async executor thread for the whole SMTP exchange. Consider moving it onto a blocking
    // task or switching to lettre's async transport.
    match mailer.send(&message) {
        Ok(_) => {
            info!("Email sent successfully for step {}", step_id);
            complete_email_step(run_id, step_id, pool, nats_client).await
        }
        Err(e) => {
            let error_msg = format!("Failed to send email: {:?}", e);
            fail_email_step(run_id, step_id, error_msg, pool, nats_client).await
        }
    }
}
236
#[cfg(feature = "email")]
/// Marks an email step as succeeded and publishes a `step_completed` event.
///
/// The step instance is re-fetched, treated as `Running`, and transitioned with `succeed`.
/// A JSON event carrying `run_id`, `step_id`, `event_type` and a UTC `timestamp` is then
/// published on the `stormchaser.step.completed` subject through JetStream.
///
/// # Errors
///
/// Returns an error when the step cannot be fetched or transitioned, a pooled connection
/// cannot be acquired, or the publish call fails.
pub(crate) async fn complete_email_step(
    run_id: stormchaser_model::RunId,
    step_id: stormchaser_model::StepInstanceId,
    pool: PgPool,
    nats_client: async_nats::Client,
) -> Result<()> {
    let running =
        crate::step_machine::StepMachine::<crate::step_machine::state::Running>::from_instance(
            fetch_step_instance(step_id, &pool).await?,
        );
    {
        // Scoped so the pooled connection is returned before the event is published.
        let mut conn = pool.acquire().await?;
        let _ = running.succeed(&mut *conn).await?;
    }

    let payload = serde_json::json!({
        "run_id": run_id,
        "step_id": step_id,
        "event_type": "step_completed",
        "timestamp": Utc::now(),
    })
    .to_string();

    // NOTE(review): only the publish call itself is awaited; if the async-nats version in
    // use returns an acknowledgement future from `publish`, the server ack is not awaited
    // here — confirm that is intended.
    async_nats::jetstream::new(nats_client)
        .publish("stormchaser.step.completed", payload.into())
        .await?;
    Ok(())
}
262
#[cfg(feature = "email")]
/// Marks an email step as failed and publishes a `step_failed` event.
///
/// `error_msg` is logged at error level, stored on the step via `fail`, and included in the
/// JSON event (alongside `run_id`, `step_id`, `event_type` and a UTC `timestamp`) published
/// on the `stormchaser.step.failed` subject through JetStream.
///
/// # Errors
///
/// Returns an error when the step cannot be fetched or transitioned, a pooled connection
/// cannot be acquired, or the publish call fails. A successfully recorded failure yields
/// `Ok(())`.
pub(crate) async fn fail_email_step(
    run_id: stormchaser_model::RunId,
    step_id: stormchaser_model::StepInstanceId,
    error_msg: String,
    pool: PgPool,
    nats_client: async_nats::Client,
) -> Result<()> {
    error!("{}", error_msg);

    let running =
        crate::step_machine::StepMachine::<crate::step_machine::state::Running>::from_instance(
            fetch_step_instance(step_id, &pool).await?,
        );
    {
        // Scoped so the pooled connection is returned before the event is published.
        let mut conn = pool.acquire().await?;
        // The message is cloned because it is also embedded in the event below.
        let _ = running.fail(error_msg.clone(), None, &mut *conn).await?;
    }

    let payload = serde_json::json!({
        "run_id": run_id,
        "step_id": step_id,
        "event_type": "step_failed",
        "error": error_msg,
        "timestamp": Utc::now(),
    })
    .to_string();

    // NOTE(review): only the publish call itself is awaited; if the async-nats version in
    // use returns an acknowledgement future from `publish`, the server ack is not awaited
    // here — confirm that is intended.
    async_nats::jetstream::new(nats_client)
        .publish("stormchaser.step.failed", payload.into())
        .await?;
    Ok(())
}
293
294#[cfg(not(feature = "email"))]
295/// Handle email send.
296pub async fn handle_email_send(
297    _run_id: stormchaser_model::RunId,
298    _step_id: stormchaser_model::StepInstanceId,
299    _spec: Value,
300    _pool: PgPool,
301    _nats_client: async_nats::Client,
302    __tls_reloader: Arc<TlsReloader>,
303) -> Result<()> {
304    anyhow::bail!("Email support is not enabled. Enable 'email' feature.")
305}