taskflow_rs/executor/handlers/
notification.rs1use lettre::{
2 Transport,
3 message::{Mailbox, MessageBuilder},
4 transport::smtp::{SmtpTransport, authentication::Credentials, client::Tls},
5};
6use reqwest::Client;
7use tracing::{error, info};
8
9use crate::error::{Result, TaskFlowError};
10use crate::task::{Task, TaskHandler, TaskResult};
11
12pub struct NotificationTaskHandler {
13 client: Client,
14}
15
16impl NotificationTaskHandler {
17 pub fn new() -> Self {
18 Self {
19 client: Client::new(),
20 }
21 }
22}
23
24#[async_trait::async_trait]
25impl TaskHandler for NotificationTaskHandler {
26 fn task_type(&self) -> &'static str {
27 "notification"
28 }
29
30 async fn execute(&self, task: &Task) -> Result<TaskResult> {
31 let notification_type = task
32 .definition
33 .payload
34 .get("type")
35 .and_then(|v| v.as_str())
36 .ok_or_else(|| {
37 TaskFlowError::InvalidConfiguration("Missing notification type".to_string())
38 })?;
39
40 info!(
41 "Sending {} notification: {}",
42 notification_type, task.definition.id
43 );
44
45 match notification_type {
46 "webhook" => self.send_webhook(task).await,
47 "email" => self.send_email(task).await,
48 "slack" => self.send_slack(task).await,
49 "discord" => self.send_discord(task).await,
50 nt => Err(TaskFlowError::InvalidConfiguration(format!(
51 "Unknown notification type: {}",
52 nt
53 ))),
54 }
55 }
56}
57
58impl NotificationTaskHandler {
59 async fn send_webhook(&self, task: &Task) -> Result<TaskResult> {
60 let url = task
61 .definition
62 .payload
63 .get("url")
64 .and_then(|v| v.as_str())
65 .ok_or_else(|| {
66 TaskFlowError::InvalidConfiguration("Missing webhook URL".to_string())
67 })?;
68
69 let payload = task.definition.payload.get("payload").cloned();
70
71 let response = self
72 .client
73 .post(url)
74 .json(&payload.unwrap_or(serde_json::Value::Object(Default::default())))
75 .send()
76 .await
77 .map_err(|e| TaskFlowError::ExecutionError(e.to_string()))?;
78
79 if !response.status().is_success() {
80 return Err(TaskFlowError::ExecutionError(format!(
81 "Webhook failed with status: {}",
82 response.status()
83 )));
84 }
85
86 Ok(TaskResult {
87 success: true,
88 output: Some("Webhook sent successfully".to_string()),
89 error: None,
90 execution_time_ms: 0,
91 metadata: Default::default(),
92 })
93 }
94
95 async fn send_email(&self, task: &Task) -> Result<TaskResult> {
96 let smtp_server = task
97 .definition
98 .payload
99 .get("smtp_server")
100 .and_then(|v| v.as_str())
101 .ok_or_else(|| {
102 TaskFlowError::InvalidConfiguration("Missing SMTP server".to_string())
103 })?;
104
105 let from_email = task
106 .definition
107 .payload
108 .get("from")
109 .and_then(|v| v.as_str())
110 .ok_or_else(|| TaskFlowError::InvalidConfiguration("Missing from email".to_string()))?;
111
112 let to_email = task
113 .definition
114 .payload
115 .get("to")
116 .and_then(|v| v.as_str())
117 .ok_or_else(|| TaskFlowError::InvalidConfiguration("Missing to email".to_string()))?;
118
119 let subject = task
120 .definition
121 .payload
122 .get("subject")
123 .and_then(|v| v.as_str())
124 .unwrap_or("TaskFlow Notification");
125
126 let body = task
127 .definition
128 .payload
129 .get("body")
130 .and_then(|v| v.as_str())
131 .unwrap_or("Task completed successfully");
132
133 info!(
134 "Sending email notification for task: {}",
135 task.definition.id
136 );
137
138 let email = MessageBuilder::new()
140 .from(
141 from_email
142 .parse::<Mailbox>()
143 .map_err(|e| TaskFlowError::InvalidConfiguration(e.to_string()))?,
144 )
145 .to(to_email
146 .parse::<Mailbox>()
147 .map_err(|e| TaskFlowError::InvalidConfiguration(e.to_string()))?)
148 .subject(subject)
149 .body(body.to_string())
150 .map_err(|e| TaskFlowError::ExecutionError(e.to_string()))?;
151
152 let mut mailer_builder = SmtpTransport::relay(smtp_server)
154 .map_err(|e| TaskFlowError::ExecutionError(e.to_string()))?;
155
156 if let (Some(username), Some(password)) = (
158 task.definition
159 .payload
160 .get("username")
161 .and_then(|v| v.as_str()),
162 task.definition
163 .payload
164 .get("password")
165 .and_then(|v| v.as_str()),
166 ) {
167 mailer_builder = mailer_builder
168 .credentials(Credentials::new(username.to_string(), password.to_string()));
169 }
170
171 let tls_mode = task
173 .definition
174 .payload
175 .get("tls")
176 .and_then(|v| v.as_str())
177 .unwrap_or("required");
178
179 let mailer = match tls_mode {
180 "required" => mailer_builder.build(),
181 "none" => mailer_builder.tls(Tls::None).build(),
182 _ => mailer_builder.build(),
183 };
184
185 match mailer.send(&email) {
187 Ok(_) => {
188 info!("Email sent successfully to: {}", to_email);
189 Ok(TaskResult {
190 success: true,
191 output: Some(format!("Email sent successfully to {}", to_email)),
192 error: None,
193 execution_time_ms: 0,
194 metadata: Default::default(),
195 })
196 }
197 Err(e) => {
198 error!("Failed to send email: {}", e);
199 Ok(TaskResult {
200 success: false,
201 output: None,
202 error: Some(format!("Email sending failed: {}", e)),
203 execution_time_ms: 0,
204 metadata: Default::default(),
205 })
206 }
207 }
208 }
209
210 async fn send_slack(&self, task: &Task) -> Result<TaskResult> {
211 let webhook_url = task
212 .definition
213 .payload
214 .get("webhook_url")
215 .and_then(|v| v.as_str())
216 .ok_or_else(|| {
217 TaskFlowError::InvalidConfiguration("Missing Slack webhook URL".to_string())
218 })?;
219
220 let message = task
221 .definition
222 .payload
223 .get("message")
224 .and_then(|v| v.as_str())
225 .unwrap_or("Task completed");
226
227 let payload = serde_json::json!({
228 "text": message,
229 "attachments": [{
230 "color": "#36a64f",
231 "fields": [
232 {
233 "title": "Task ID",
234 "value": task.definition.id,
235 "short": true
236 },
237 {
238 "title": "Task Name",
239 "value": task.definition.name,
240 "short": true
241 }
242 ]
243 }]
244 });
245
246 let response = self
247 .client
248 .post(webhook_url)
249 .json(&payload)
250 .send()
251 .await
252 .map_err(|e| TaskFlowError::ExecutionError(e.to_string()))?;
253
254 if !response.status().is_success() {
255 return Err(TaskFlowError::ExecutionError(format!(
256 "Slack notification failed: {}",
257 response.status()
258 )));
259 }
260
261 Ok(TaskResult {
262 success: true,
263 output: Some("Slack notification sent".to_string()),
264 error: None,
265 execution_time_ms: 0,
266 metadata: Default::default(),
267 })
268 }
269
270 async fn send_discord(&self, task: &Task) -> Result<TaskResult> {
271 let webhook_url = task
272 .definition
273 .payload
274 .get("webhook_url")
275 .and_then(|v| v.as_str())
276 .ok_or_else(|| {
277 TaskFlowError::InvalidConfiguration("Missing Discord webhook URL".to_string())
278 })?;
279
280 let message = task
281 .definition
282 .payload
283 .get("message")
284 .and_then(|v| v.as_str())
285 .unwrap_or("Task completed");
286
287 let payload = serde_json::json!({
288 "content": message,
289 "embeds": [{
290 "title": task.definition.name,
291 "description": format!("Task ID: {}", task.definition.id),
292 "color": 5814783,
293 "timestamp": chrono::Utc::now().to_rfc3339()
294 }]
295 });
296
297 let response = self
298 .client
299 .post(webhook_url)
300 .json(&payload)
301 .send()
302 .await
303 .map_err(|e| TaskFlowError::ExecutionError(e.to_string()))?;
304
305 if !response.status().is_success() {
306 return Err(TaskFlowError::ExecutionError(format!(
307 "Discord notification failed: {}",
308 response.status()
309 )));
310 }
311
312 Ok(TaskResult {
313 success: true,
314 output: Some("Discord notification sent".to_string()),
315 error: None,
316 execution_time_ms: 0,
317 metadata: Default::default(),
318 })
319 }
320}