taskflow_rs/executor/handlers/
notification.rs

1use lettre::{
2    Transport,
3    message::{Mailbox, MessageBuilder},
4    transport::smtp::{SmtpTransport, authentication::Credentials, client::Tls},
5};
6use reqwest::Client;
7use tracing::{error, info};
8
9use crate::error::{Result, TaskFlowError};
10use crate::task::{Task, TaskHandler, TaskResult};
11
12pub struct NotificationTaskHandler {
13    client: Client,
14}
15
16impl NotificationTaskHandler {
17    pub fn new() -> Self {
18        Self {
19            client: Client::new(),
20        }
21    }
22}
23
24#[async_trait::async_trait]
25impl TaskHandler for NotificationTaskHandler {
26    fn task_type(&self) -> &'static str {
27        "notification"
28    }
29
30    async fn execute(&self, task: &Task) -> Result<TaskResult> {
31        let notification_type = task
32            .definition
33            .payload
34            .get("type")
35            .and_then(|v| v.as_str())
36            .ok_or_else(|| {
37                TaskFlowError::InvalidConfiguration("Missing notification type".to_string())
38            })?;
39
40        info!(
41            "Sending {} notification: {}",
42            notification_type, task.definition.id
43        );
44
45        match notification_type {
46            "webhook" => self.send_webhook(task).await,
47            "email" => self.send_email(task).await,
48            "slack" => self.send_slack(task).await,
49            "discord" => self.send_discord(task).await,
50            nt => Err(TaskFlowError::InvalidConfiguration(format!(
51                "Unknown notification type: {}",
52                nt
53            ))),
54        }
55    }
56}
57
58impl NotificationTaskHandler {
59    async fn send_webhook(&self, task: &Task) -> Result<TaskResult> {
60        let url = task
61            .definition
62            .payload
63            .get("url")
64            .and_then(|v| v.as_str())
65            .ok_or_else(|| {
66                TaskFlowError::InvalidConfiguration("Missing webhook URL".to_string())
67            })?;
68
69        let payload = task.definition.payload.get("payload").cloned();
70
71        let response = self
72            .client
73            .post(url)
74            .json(&payload.unwrap_or(serde_json::Value::Object(Default::default())))
75            .send()
76            .await
77            .map_err(|e| TaskFlowError::ExecutionError(e.to_string()))?;
78
79        if !response.status().is_success() {
80            return Err(TaskFlowError::ExecutionError(format!(
81                "Webhook failed with status: {}",
82                response.status()
83            )));
84        }
85
86        Ok(TaskResult {
87            success: true,
88            output: Some("Webhook sent successfully".to_string()),
89            error: None,
90            execution_time_ms: 0,
91            metadata: Default::default(),
92        })
93    }
94
95    async fn send_email(&self, task: &Task) -> Result<TaskResult> {
96        let smtp_server = task
97            .definition
98            .payload
99            .get("smtp_server")
100            .and_then(|v| v.as_str())
101            .ok_or_else(|| {
102                TaskFlowError::InvalidConfiguration("Missing SMTP server".to_string())
103            })?;
104
105        let from_email = task
106            .definition
107            .payload
108            .get("from")
109            .and_then(|v| v.as_str())
110            .ok_or_else(|| TaskFlowError::InvalidConfiguration("Missing from email".to_string()))?;
111
112        let to_email = task
113            .definition
114            .payload
115            .get("to")
116            .and_then(|v| v.as_str())
117            .ok_or_else(|| TaskFlowError::InvalidConfiguration("Missing to email".to_string()))?;
118
119        let subject = task
120            .definition
121            .payload
122            .get("subject")
123            .and_then(|v| v.as_str())
124            .unwrap_or("TaskFlow Notification");
125
126        let body = task
127            .definition
128            .payload
129            .get("body")
130            .and_then(|v| v.as_str())
131            .unwrap_or("Task completed successfully");
132
133        info!(
134            "Sending email notification for task: {}",
135            task.definition.id
136        );
137
138        // Build email message
139        let email = MessageBuilder::new()
140            .from(
141                from_email
142                    .parse::<Mailbox>()
143                    .map_err(|e| TaskFlowError::InvalidConfiguration(e.to_string()))?,
144            )
145            .to(to_email
146                .parse::<Mailbox>()
147                .map_err(|e| TaskFlowError::InvalidConfiguration(e.to_string()))?)
148            .subject(subject)
149            .body(body.to_string())
150            .map_err(|e| TaskFlowError::ExecutionError(e.to_string()))?;
151
152        // Configure SMTP transport
153        let mut mailer_builder = SmtpTransport::relay(smtp_server)
154            .map_err(|e| TaskFlowError::ExecutionError(e.to_string()))?;
155
156        // Optional authentication
157        if let (Some(username), Some(password)) = (
158            task.definition
159                .payload
160                .get("username")
161                .and_then(|v| v.as_str()),
162            task.definition
163                .payload
164                .get("password")
165                .and_then(|v| v.as_str()),
166        ) {
167            mailer_builder = mailer_builder
168                .credentials(Credentials::new(username.to_string(), password.to_string()));
169        }
170
171        // Optional TLS configuration
172        let tls_mode = task
173            .definition
174            .payload
175            .get("tls")
176            .and_then(|v| v.as_str())
177            .unwrap_or("required");
178
179        let mailer = match tls_mode {
180            "required" => mailer_builder.build(),
181            "none" => mailer_builder.tls(Tls::None).build(),
182            _ => mailer_builder.build(),
183        };
184
185        // Send email
186        match mailer.send(&email) {
187            Ok(_) => {
188                info!("Email sent successfully to: {}", to_email);
189                Ok(TaskResult {
190                    success: true,
191                    output: Some(format!("Email sent successfully to {}", to_email)),
192                    error: None,
193                    execution_time_ms: 0,
194                    metadata: Default::default(),
195                })
196            }
197            Err(e) => {
198                error!("Failed to send email: {}", e);
199                Ok(TaskResult {
200                    success: false,
201                    output: None,
202                    error: Some(format!("Email sending failed: {}", e)),
203                    execution_time_ms: 0,
204                    metadata: Default::default(),
205                })
206            }
207        }
208    }
209
210    async fn send_slack(&self, task: &Task) -> Result<TaskResult> {
211        let webhook_url = task
212            .definition
213            .payload
214            .get("webhook_url")
215            .and_then(|v| v.as_str())
216            .ok_or_else(|| {
217                TaskFlowError::InvalidConfiguration("Missing Slack webhook URL".to_string())
218            })?;
219
220        let message = task
221            .definition
222            .payload
223            .get("message")
224            .and_then(|v| v.as_str())
225            .unwrap_or("Task completed");
226
227        let payload = serde_json::json!({
228            "text": message,
229            "attachments": [{
230                "color": "#36a64f",
231                "fields": [
232                    {
233                        "title": "Task ID",
234                        "value": task.definition.id,
235                        "short": true
236                    },
237                    {
238                        "title": "Task Name",
239                        "value": task.definition.name,
240                        "short": true
241                    }
242                ]
243            }]
244        });
245
246        let response = self
247            .client
248            .post(webhook_url)
249            .json(&payload)
250            .send()
251            .await
252            .map_err(|e| TaskFlowError::ExecutionError(e.to_string()))?;
253
254        if !response.status().is_success() {
255            return Err(TaskFlowError::ExecutionError(format!(
256                "Slack notification failed: {}",
257                response.status()
258            )));
259        }
260
261        Ok(TaskResult {
262            success: true,
263            output: Some("Slack notification sent".to_string()),
264            error: None,
265            execution_time_ms: 0,
266            metadata: Default::default(),
267        })
268    }
269
270    async fn send_discord(&self, task: &Task) -> Result<TaskResult> {
271        let webhook_url = task
272            .definition
273            .payload
274            .get("webhook_url")
275            .and_then(|v| v.as_str())
276            .ok_or_else(|| {
277                TaskFlowError::InvalidConfiguration("Missing Discord webhook URL".to_string())
278            })?;
279
280        let message = task
281            .definition
282            .payload
283            .get("message")
284            .and_then(|v| v.as_str())
285            .unwrap_or("Task completed");
286
287        let payload = serde_json::json!({
288            "content": message,
289            "embeds": [{
290                "title": task.definition.name,
291                "description": format!("Task ID: {}", task.definition.id),
292                "color": 5814783,
293                "timestamp": chrono::Utc::now().to_rfc3339()
294            }]
295        });
296
297        let response = self
298            .client
299            .post(webhook_url)
300            .json(&payload)
301            .send()
302            .await
303            .map_err(|e| TaskFlowError::ExecutionError(e.to_string()))?;
304
305        if !response.status().is_success() {
306            return Err(TaskFlowError::ExecutionError(format!(
307                "Discord notification failed: {}",
308                response.status()
309            )));
310        }
311
312        Ok(TaskResult {
313            success: true,
314            output: Some("Discord notification sent".to_string()),
315            error: None,
316            execution_time_ms: 0,
317            metadata: Default::default(),
318        })
319    }
320}