stormchaser_engine/handler/integrations/email/
mod.rs1pub mod ses;
2pub mod smtp;
3pub mod test_report;
4
5pub use test_report::handle_test_report_email;
6
7use anyhow::Result;
8use serde_json::Value;
9use sqlx::PgPool;
10use std::sync::Arc;
11use stormchaser_tls::TlsReloader;
12
13#[cfg(feature = "email")]
14use crate::handler::{fetch_outputs, fetch_run_context, fetch_step_instance};
15#[cfg(feature = "email")]
16use chrono::Utc;
17#[cfg(feature = "email")]
18use stormchaser_model::dsl::{self, EmailBackend};
19#[cfg(feature = "email")]
20use stormchaser_model::workflow;
21#[cfg(feature = "email")]
22use tracing::{error, info};
23
24#[cfg(feature = "email")]
25pub async fn handle_email_send(
27 run_id: stormchaser_model::RunId,
28 step_id: stormchaser_model::StepInstanceId,
29 spec: Value,
30 pool: PgPool,
31 nats_client: async_nats::Client,
32 tls_reloader: Arc<TlsReloader>,
33) -> Result<()> {
34 let spec: stormchaser_model::dsl::EmailSpec = serde_json::from_value(spec)?;
35
36 info!(
37 "Sending email '{}' from {} for run {}",
38 spec.subject, spec.from, run_id
39 );
40
41 let instance = fetch_step_instance(step_id, &pool).await?;
43 let machine =
44 crate::step_machine::StepMachine::<crate::step_machine::state::Pending>::from_instance(
45 instance,
46 );
47 let _ = machine
48 .start("email".to_string(), &mut *pool.acquire().await?)
49 .await?;
50
51 let run_context: workflow::RunContext = fetch_run_context(run_id, &pool).await?;
53 let outputs: Value = fetch_outputs(run_id, &pool).await?;
54
55 let template_ctx = serde_json::json!({
56 "inputs": run_context.inputs,
57 "steps": outputs,
58 "run": {
59 "id": run_id.to_string(),
60 }
61 });
62
63 let env = minijinja::Environment::new();
65 let rendered_body = env
66 .render_str(&spec.body, template_ctx)
67 .map_err(|e| anyhow::anyhow!("Failed to render email body: {:?}", e))?;
68
69 let is_html = spec.html.unwrap_or(false);
70 let backend = spec.backend.clone().unwrap_or(EmailBackend::Smtp);
71
72 match backend {
73 EmailBackend::Ses => {
74 send_via_ses(
75 run_id,
76 step_id,
77 &spec,
78 rendered_body,
79 is_html,
80 pool,
81 nats_client,
82 )
83 .await
84 }
85 EmailBackend::Smtp => {
86 send_via_smtp(
87 run_id,
88 step_id,
89 &spec,
90 rendered_body,
91 is_html,
92 pool,
93 nats_client,
94 tls_reloader,
95 )
96 .await
97 }
98 }
99}
100
101#[cfg(feature = "email")]
102async fn send_via_ses(
103 run_id: stormchaser_model::RunId,
104 step_id: stormchaser_model::StepInstanceId,
105 spec: &dsl::EmailSpec,
106 rendered_body: String,
107 is_html: bool,
108 pool: PgPool,
109 nats_client: async_nats::Client,
110) -> Result<()> {
111 #[cfg(feature = "aws-ses")]
112 {
113 match self::ses::send_email_ses(
114 spec.from.clone(),
115 spec.to.clone(),
116 spec.cc.clone(),
117 spec.bcc.clone(),
118 spec.subject.clone(),
119 rendered_body,
120 is_html,
121 spec.ses_region.clone(),
122 spec.ses_role_arn.clone(),
123 spec.ses_configuration_set_name.clone(),
124 run_id,
125 )
126 .await
127 {
128 Ok(_) => {
129 info!("Email sent via SES successfully for step {}", step_id);
130 complete_email_step(run_id, step_id, pool, nats_client).await
131 }
132 Err(e) => {
133 let error_msg = format!("Failed to send email via SES: {:?}", e);
134 fail_email_step(run_id, step_id, error_msg, pool, nats_client).await
135 }
136 }
137 }
138 #[cfg(not(feature = "aws-ses"))]
139 {
140 let _ = (
141 run_id,
142 step_id,
143 spec,
144 rendered_body,
145 is_html,
146 pool,
147 nats_client,
148 );
149 anyhow::bail!("SES backend requested but 'aws-ses' feature is not enabled.");
150 }
151}
152
153#[cfg(feature = "email")]
154#[allow(clippy::too_many_arguments)]
155async fn send_via_smtp(
156 run_id: stormchaser_model::RunId,
157 step_id: stormchaser_model::StepInstanceId,
158 spec: &dsl::EmailSpec,
159 rendered_body: String,
160 is_html: bool,
161 pool: PgPool,
162 nats_client: async_nats::Client,
163 _tls_reloader: Arc<TlsReloader>,
164) -> Result<()> {
165 use lettre::message::header::ContentType;
166 use lettre::{Message, Transport};
167
168 let mut builder = Message::builder()
170 .from(spec.from.parse()?)
171 .subject(spec.subject.clone());
172
173 for to in &spec.to {
174 builder = builder.to(to.parse()?);
175 }
176
177 if let Some(ref ccs) = spec.cc {
178 for cc in ccs {
179 builder = builder.cc(cc.parse()?);
180 }
181 }
182
183 if let Some(ref bccs) = spec.bcc {
184 for bcc in bccs {
185 builder = builder.bcc(bcc.parse()?);
186 }
187 }
188 let message = if is_html {
189 builder.header(ContentType::TEXT_HTML).body(rendered_body)?
190 } else {
191 builder
192 .header(ContentType::TEXT_PLAIN)
193 .body(rendered_body)?
194 };
195
196 let smtp_params = self::smtp::SmtpParams {
198 server: spec.smtp_server.clone().unwrap_or_else(|| {
199 std::env::var("SMTP_SERVER").unwrap_or_else(|_| "localhost".to_string())
200 }),
201 port: spec.smtp_port.unwrap_or_else(|| {
202 std::env::var("SMTP_PORT")
203 .ok()
204 .and_then(|p| p.parse().ok())
205 .unwrap_or(25)
206 }),
207 username: spec
208 .smtp_username
209 .clone()
210 .or_else(|| std::env::var("SMTP_USERNAME").ok()),
211 password: spec
212 .smtp_password
213 .clone()
214 .or_else(|| std::env::var("SMTP_PASSWORD").ok()),
215 use_tls: spec
216 .smtp_use_tls
217 .unwrap_or_else(|| std::env::var("SMTP_USE_TLS").unwrap_or_default() == "true"),
218 use_mtls: spec
219 .smtp_use_mtls
220 .unwrap_or_else(|| std::env::var("SMTP_USE_MTLS").unwrap_or_default() == "true"),
221 };
222
223 let mailer = self::smtp::build_smtp_transport(smtp_params)?;
224
225 match mailer.send(&message) {
226 Ok(_) => {
227 info!("Email sent successfully for step {}", step_id);
228 complete_email_step(run_id, step_id, pool, nats_client).await
229 }
230 Err(e) => {
231 let error_msg = format!("Failed to send email: {:?}", e);
232 fail_email_step(run_id, step_id, error_msg, pool, nats_client).await
233 }
234 }
235}
236
237#[cfg(feature = "email")]
238pub(crate) async fn complete_email_step(
239 run_id: stormchaser_model::RunId,
240 step_id: stormchaser_model::StepInstanceId,
241 pool: PgPool,
242 nats_client: async_nats::Client,
243) -> Result<()> {
244 let instance = fetch_step_instance(step_id, &pool).await?;
245 let machine =
246 crate::step_machine::StepMachine::<crate::step_machine::state::Running>::from_instance(
247 instance,
248 );
249 let _ = machine.succeed(&mut *pool.acquire().await?).await?;
250
251 let event = serde_json::json!({
252 "run_id": run_id,
253 "step_id": step_id,
254 "event_type": "step_completed",
255 "timestamp": Utc::now(),
256 });
257 let js = async_nats::jetstream::new(nats_client);
258 js.publish("stormchaser.step.completed", event.to_string().into())
259 .await?;
260 Ok(())
261}
262
263#[cfg(feature = "email")]
264pub(crate) async fn fail_email_step(
265 run_id: stormchaser_model::RunId,
266 step_id: stormchaser_model::StepInstanceId,
267 error_msg: String,
268 pool: PgPool,
269 nats_client: async_nats::Client,
270) -> Result<()> {
271 error!("{}", error_msg);
272 let instance = fetch_step_instance(step_id, &pool).await?;
273 let machine =
274 crate::step_machine::StepMachine::<crate::step_machine::state::Running>::from_instance(
275 instance,
276 );
277 let _ = machine
278 .fail(error_msg.clone(), None, &mut *pool.acquire().await?)
279 .await?;
280
281 let event = serde_json::json!({
282 "run_id": run_id,
283 "step_id": step_id,
284 "event_type": "step_failed",
285 "error": error_msg,
286 "timestamp": Utc::now(),
287 });
288 let js = async_nats::jetstream::new(nats_client);
289 js.publish("stormchaser.step.failed", event.to_string().into())
290 .await?;
291 Ok(())
292}
293
294#[cfg(not(feature = "email"))]
295pub async fn handle_email_send(
297 _run_id: stormchaser_model::RunId,
298 _step_id: stormchaser_model::StepInstanceId,
299 _spec: Value,
300 _pool: PgPool,
301 _nats_client: async_nats::Client,
302 __tls_reloader: Arc<TlsReloader>,
303) -> Result<()> {
304 anyhow::bail!("Email support is not enabled. Enable 'email' feature.")
305}