Skip to main content

plexus_comms/activations/email/
activation.rs

1use super::imap::{create_imap_provider, EmailMessage};
2use super::smtp::{create_provider as create_smtp_provider, EmailProvider};
3use super::storage::{EmailAccount, EmailStorage, EmailStorageConfig, ImapAccountConfig, SmtpAccountConfig};
4use super::types::*;
5use async_stream::stream;
6use futures::Stream;
7use std::collections::HashMap;
8use std::sync::Arc;
9
10// Required for macro-generated code
11use plexus_core::plexus;
12use plexus_core::serde_helpers;
13
14#[derive(Clone)]
15pub struct Email {
16    storage: Arc<EmailStorage>,
17    templates: Arc<tokio::sync::RwLock<HashMap<String, EmailTemplate>>>,
18}
19
20impl Email {
21    pub async fn new() -> Result<Self, String> {
22        let storage = EmailStorage::new(EmailStorageConfig::default()).await?;
23
24        Ok(Self {
25            storage: Arc::new(storage),
26            templates: Arc::new(tokio::sync::RwLock::new(HashMap::new())),
27        })
28    }
29
30    pub async fn with_config(config: EmailStorageConfig) -> Result<Self, String> {
31        let storage = EmailStorage::new(config).await?;
32
33        Ok(Self {
34            storage: Arc::new(storage),
35            templates: Arc::new(tokio::sync::RwLock::new(HashMap::new())),
36        })
37    }
38}
39
40#[plexus_macros::hub_methods(
41    namespace = "email",
42    version = "2.0.0",
43    description = "Multi-account email with IMAP reading and SMTP sending"
44)]
45impl Email {
46    // ==================== Account Management ====================
47
48    #[plexus_macros::hub_method(
49        description = "Register a new email account with SMTP and/or IMAP credentials",
50        params(
51            name = "Account name (typically the email address)",
52            smtp = "SMTP configuration for sending (optional)",
53            imap = "IMAP configuration for reading (optional)"
54        )
55    )]
56    async fn register_account(
57        &self,
58        name: String,
59        smtp: Option<SmtpAccountConfig>,
60        imap: Option<ImapAccountConfig>,
61    ) -> impl Stream<Item = RegisterAccountEvent> + Send + 'static {
62        let storage = self.storage.clone();
63
64        stream! {
65            let now = chrono::Utc::now().timestamp();
66            let account = EmailAccount {
67                name: name.clone(),
68                smtp: smtp.clone(),
69                imap: imap.clone(),
70                created_at: now,
71                updated_at: now,
72            };
73
74            match storage.register_account(account).await {
75                Ok(_) => yield RegisterAccountEvent::Registered {
76                    account_name: name,
77                    has_smtp: smtp.is_some(),
78                    has_imap: imap.is_some(),
79                },
80                Err(e) => yield RegisterAccountEvent::Error { message: e },
81            }
82        }
83    }
84
85    #[plexus_macros::hub_method(
86        streaming,
87        description = "List all registered email accounts"
88    )]
89    async fn list_accounts(&self) -> impl Stream<Item = ListAccountsEvent> + Send + 'static {
90        let storage = self.storage.clone();
91
92        stream! {
93            match storage.list_accounts().await {
94                Ok(accounts) => {
95                    let total = accounts.len();
96                    for account in accounts {
97                        yield ListAccountsEvent::Account {
98                            name: account.name,
99                            has_smtp: account.smtp.is_some(),
100                            has_imap: account.imap.is_some(),
101                            created_at: account.created_at,
102                        };
103                    }
104                    yield ListAccountsEvent::Complete { total };
105                }
106                Err(e) => {
107                    // Return error as a complete event with 0 total
108                    tracing::error!("Failed to list accounts: {}", e);
109                    yield ListAccountsEvent::Complete { total: 0 };
110                }
111            }
112        }
113    }
114
115    #[plexus_macros::hub_method(
116        description = "Remove an email account",
117        params(name = "Account name to remove")
118    )]
119    async fn remove_account(
120        &self,
121        name: String,
122    ) -> impl Stream<Item = RemoveAccountEvent> + Send + 'static {
123        let storage = self.storage.clone();
124
125        stream! {
126            match storage.remove_account(&name).await {
127                Ok(true) => yield RemoveAccountEvent::Removed { account_name: name },
128                Ok(false) => yield RemoveAccountEvent::NotFound { account_name: name },
129                Err(e) => yield RemoveAccountEvent::Error { message: e },
130            }
131        }
132    }
133
134    // ==================== SMTP Sending ====================
135
136    #[plexus_macros::hub_method(
137        description = "Send an email from a registered account",
138        params(
139            account = "Account name to send from",
140            to = "Recipients",
141            cc = "CC recipients (optional)",
142            bcc = "BCC recipients (optional)",
143            subject = "Email subject",
144            body = "Email body (text, HTML, or both)",
145            attachments = "File attachments (optional)",
146            reply_to = "Reply-to address (optional)"
147        )
148    )]
149    async fn send_from(
150        &self,
151        account: String,
152        to: Vec<String>,
153        cc: Option<Vec<String>>,
154        bcc: Option<Vec<String>>,
155        subject: String,
156        body: EmailBody,
157        attachments: Option<Vec<Attachment>>,
158        reply_to: Option<String>,
159    ) -> impl Stream<Item = SendEmailEvent> + Send + 'static {
160        let storage = self.storage.clone();
161
162        stream! {
163            // Get account
164            let account_config = match storage.get_account(&account).await {
165                Ok(Some(acc)) => acc,
166                Ok(None) => {
167                    yield SendEmailEvent::Error {
168                        message: format!("Account '{}' not found", account),
169                        code: Some("ACCOUNT_NOT_FOUND".to_string()),
170                    };
171                    return;
172                }
173                Err(e) => {
174                    yield SendEmailEvent::Error {
175                        message: format!("Failed to load account: {}", e),
176                        code: None,
177                    };
178                    return;
179                }
180            };
181
182            // Check if SMTP is configured
183            let smtp_config = match account_config.smtp {
184                Some(config) => config,
185                None => {
186                    yield SendEmailEvent::Error {
187                        message: format!("Account '{}' has no SMTP configuration", account),
188                        code: Some("NO_SMTP_CONFIG".to_string()),
189                    };
190                    return;
191                }
192            };
193
194            // Convert to EmailConfig format for provider
195            let email_config = crate::config::EmailConfig {
196                provider: crate::config::EmailProvider::Smtp,
197                credentials: crate::config::EmailCredentials::Smtp {
198                    smtp_host: smtp_config.host,
199                    smtp_port: smtp_config.port,
200                    smtp_username: smtp_config.username,
201                    smtp_password: smtp_config.password,
202                    smtp_from: smtp_config.from_email,
203                },
204            };
205
206            // Create provider and send
207            let provider = match create_smtp_provider(&email_config) {
208                Ok(p) => p,
209                Err(e) => {
210                    yield SendEmailEvent::Error {
211                        message: format!("Failed to create SMTP provider: {}", e),
212                        code: None,
213                    };
214                    return;
215                }
216            };
217
218            let params = SendEmailParams {
219                to,
220                cc,
221                bcc,
222                subject,
223                body,
224                attachments,
225                reply_to,
226            };
227
228            match provider.send(params).await {
229                Ok(event) => yield event,
230                Err(e) => yield SendEmailEvent::Error {
231                    message: e,
232                    code: None,
233                },
234            }
235        }
236    }
237
238    #[plexus_macros::hub_method(
239        streaming,
240        description = "Send multiple emails with progress tracking from a registered account",
241        params(
242            account = "Account name to send from",
243            emails = "List of emails to send"
244        )
245    )]
246    async fn send_batch_from(
247        &self,
248        account: String,
249        emails: Vec<SendEmailParams>,
250    ) -> impl Stream<Item = BatchSendEvent> + Send + 'static {
251        let storage = self.storage.clone();
252        let total = emails.len();
253
254        stream! {
255            // Get account
256            let account_config = match storage.get_account(&account).await {
257                Ok(Some(acc)) => acc,
258                Ok(None) => {
259                    yield BatchSendEvent::Complete {
260                        total_sent: 0,
261                        total_failed: total,
262                    };
263                    return;
264                }
265                Err(_) => {
266                    yield BatchSendEvent::Complete {
267                        total_sent: 0,
268                        total_failed: total,
269                    };
270                    return;
271                }
272            };
273
274            // Check SMTP config
275            let smtp_config = match account_config.smtp {
276                Some(config) => config,
277                None => {
278                    yield BatchSendEvent::Complete {
279                        total_sent: 0,
280                        total_failed: total,
281                    };
282                    return;
283                }
284            };
285
286            // Create provider
287            let email_config = crate::config::EmailConfig {
288                provider: crate::config::EmailProvider::Smtp,
289                credentials: crate::config::EmailCredentials::Smtp {
290                    smtp_host: smtp_config.host,
291                    smtp_port: smtp_config.port,
292                    smtp_username: smtp_config.username,
293                    smtp_password: smtp_config.password,
294                    smtp_from: smtp_config.from_email,
295                },
296            };
297
298            let provider = match create_smtp_provider(&email_config) {
299                Ok(p) => p,
300                Err(_) => {
301                    yield BatchSendEvent::Complete {
302                        total_sent: 0,
303                        total_failed: total,
304                    };
305                    return;
306                }
307            };
308
309            let mut sent = 0;
310            let mut failed = 0;
311
312            for (index, email) in emails.into_iter().enumerate() {
313                match provider.send(email).await {
314                    Ok(SendEmailEvent::Sent { message_id, .. }) |
315                    Ok(SendEmailEvent::Queued { message_id }) => {
316                        sent += 1;
317                        yield BatchSendEvent::EmailSent { index, message_id };
318                    }
319                    Ok(SendEmailEvent::Error { message, .. }) | Err(message) => {
320                        failed += 1;
321                        yield BatchSendEvent::EmailFailed { index, error: message };
322                    }
323                }
324
325                if (index + 1) % 10 == 0 || index + 1 == total {
326                    yield BatchSendEvent::Progress {
327                        sent,
328                        total,
329                        percentage: ((sent + failed) as f32 / total as f32) * 100.0,
330                    };
331                }
332            }
333
334            yield BatchSendEvent::Complete {
335                total_sent: sent,
336                total_failed: failed,
337            };
338        }
339    }
340
341    // ==================== IMAP Reading ====================
342
343    #[plexus_macros::hub_method(
344        streaming,
345        description = "Read inbox messages from a registered account",
346        params(
347            account = "Account name to read from",
348            limit = "Maximum number of messages to fetch (optional)"
349        )
350    )]
351    async fn read_inbox(
352        &self,
353        account: String,
354        limit: Option<u32>,
355    ) -> impl Stream<Item = ReadInboxEvent> + Send + 'static {
356        let storage = self.storage.clone();
357
358        stream! {
359            // Get account
360            let account_config = match storage.get_account(&account).await {
361                Ok(Some(acc)) => acc,
362                Ok(None) => {
363                    yield ReadInboxEvent::Error {
364                        message: format!("Account '{}' not found", account),
365                    };
366                    return;
367                }
368                Err(e) => {
369                    yield ReadInboxEvent::Error {
370                        message: format!("Failed to load account: {}", e),
371                    };
372                    return;
373                }
374            };
375
376            // Check IMAP config
377            let imap_config = match account_config.imap {
378                Some(config) => config,
379                None => {
380                    yield ReadInboxEvent::Error {
381                        message: format!("Account '{}' has no IMAP configuration", account),
382                    };
383                    return;
384                }
385            };
386
387            // Create IMAP provider
388            #[cfg(feature = "email-imap")]
389            {
390                let provider = create_imap_provider(imap_config);
391
392                match provider.fetch_messages(limit).await {
393                    Ok(messages) => {
394                        let total = messages.len();
395                        for message in messages {
396                            yield ReadInboxEvent::Message { message };
397                        }
398                        yield ReadInboxEvent::Complete { total };
399                    }
400                    Err(e) => {
401                        yield ReadInboxEvent::Error { message: e };
402                    }
403                }
404            }
405
406            #[cfg(not(feature = "email-imap"))]
407            {
408                yield ReadInboxEvent::Error {
409                    message: "IMAP support not enabled. Enable 'email-imap' feature.".to_string(),
410                };
411            }
412        }
413    }
414
415    #[plexus_macros::hub_method(
416        streaming,
417        description = "Search messages in a registered account",
418        params(
419            account = "Account name to search in",
420            query = "IMAP search query (e.g., 'FROM github', 'SUBJECT deploy')"
421        )
422    )]
423    async fn search_messages(
424        &self,
425        account: String,
426        query: String,
427    ) -> impl Stream<Item = SearchMessagesEvent> + Send + 'static {
428        let storage = self.storage.clone();
429
430        stream! {
431            // Get account
432            let account_config = match storage.get_account(&account).await {
433                Ok(Some(acc)) => acc,
434                Ok(None) => {
435                    yield SearchMessagesEvent::Error {
436                        message: format!("Account '{}' not found", account),
437                    };
438                    return;
439                }
440                Err(e) => {
441                    yield SearchMessagesEvent::Error {
442                        message: format!("Failed to load account: {}", e),
443                    };
444                    return;
445                }
446            };
447
448            // Check IMAP config
449            let imap_config = match account_config.imap {
450                Some(config) => config,
451                None => {
452                    yield SearchMessagesEvent::Error {
453                        message: format!("Account '{}' has no IMAP configuration", account),
454                    };
455                    return;
456                }
457            };
458
459            // Create IMAP provider
460            #[cfg(feature = "email-imap")]
461            {
462                let provider = create_imap_provider(imap_config);
463
464                match provider.search_messages(&query).await {
465                    Ok(messages) => {
466                        let total = messages.len();
467                        for message in messages {
468                            yield SearchMessagesEvent::Message { message };
469                        }
470                        yield SearchMessagesEvent::Complete { total };
471                    }
472                    Err(e) => {
473                        yield SearchMessagesEvent::Error { message: e };
474                    }
475                }
476            }
477
478            #[cfg(not(feature = "email-imap"))]
479            {
480                yield SearchMessagesEvent::Error {
481                    message: "IMAP support not enabled. Enable 'email-imap' feature.".to_string(),
482                };
483            }
484        }
485    }
486
487    #[plexus_macros::hub_method(
488        description = "Mark a message as read",
489        params(
490            account = "Account name",
491            uid = "Message UID"
492        )
493    )]
494    async fn mark_read(
495        &self,
496        account: String,
497        uid: u32,
498    ) -> impl Stream<Item = MarkMessageEvent> + Send + 'static {
499        let storage = self.storage.clone();
500
501        stream! {
502            let account_config = match storage.get_account(&account).await {
503                Ok(Some(acc)) => acc,
504                Ok(None) => {
505                    yield MarkMessageEvent::Error {
506                        message: format!("Account '{}' not found", account),
507                    };
508                    return;
509                }
510                Err(e) => {
511                    yield MarkMessageEvent::Error {
512                        message: format!("Failed to load account: {}", e),
513                    };
514                    return;
515                }
516            };
517
518            let imap_config = match account_config.imap {
519                Some(config) => config,
520                None => {
521                    yield MarkMessageEvent::Error {
522                        message: format!("Account '{}' has no IMAP configuration", account),
523                    };
524                    return;
525                }
526            };
527
528            #[cfg(feature = "email-imap")]
529            {
530                let provider = create_imap_provider(imap_config);
531
532                match provider.mark_seen(uid).await {
533                    Ok(_) => yield MarkMessageEvent::Marked {
534                        uid,
535                        status: "read".to_string(),
536                    },
537                    Err(e) => yield MarkMessageEvent::Error { message: e },
538                }
539            }
540
541            #[cfg(not(feature = "email-imap"))]
542            {
543                yield MarkMessageEvent::Error {
544                    message: "IMAP support not enabled.".to_string(),
545                };
546            }
547        }
548    }
549
550    #[plexus_macros::hub_method(
551        description = "Mark a message as unread",
552        params(
553            account = "Account name",
554            uid = "Message UID"
555        )
556    )]
557    async fn mark_unread(
558        &self,
559        account: String,
560        uid: u32,
561    ) -> impl Stream<Item = MarkMessageEvent> + Send + 'static {
562        let storage = self.storage.clone();
563
564        stream! {
565            let account_config = match storage.get_account(&account).await {
566                Ok(Some(acc)) => acc,
567                Ok(None) => {
568                    yield MarkMessageEvent::Error {
569                        message: format!("Account '{}' not found", account),
570                    };
571                    return;
572                }
573                Err(e) => {
574                    yield MarkMessageEvent::Error {
575                        message: format!("Failed to load account: {}", e),
576                    };
577                    return;
578                }
579            };
580
581            let imap_config = match account_config.imap {
582                Some(config) => config,
583                None => {
584                    yield MarkMessageEvent::Error {
585                        message: format!("Account '{}' has no IMAP configuration", account),
586                    };
587                    return;
588                }
589            };
590
591            #[cfg(feature = "email-imap")]
592            {
593                let provider = create_imap_provider(imap_config);
594
595                match provider.mark_unseen(uid).await {
596                    Ok(_) => yield MarkMessageEvent::Marked {
597                        uid,
598                        status: "unread".to_string(),
599                    },
600                    Err(e) => yield MarkMessageEvent::Error { message: e },
601                }
602            }
603
604            #[cfg(not(feature = "email-imap"))]
605            {
606                yield MarkMessageEvent::Error {
607                    message: "IMAP support not enabled.".to_string(),
608                };
609            }
610        }
611    }
612
    // ==================== Address Validation ====================
614
615    #[plexus_macros::hub_method(
616        description = "Validate an email address",
617        params(email = "Email address to validate")
618    )]
619    async fn validate_address(
620        &self,
621        email: String,
622    ) -> impl Stream<Item = ValidateAddressEvent> + Send + 'static {
623        stream! {
624            // Basic validation
625            if email.contains('@') && email.contains('.') {
626                yield ValidateAddressEvent::Valid { email };
627            } else {
628                yield ValidateAddressEvent::Invalid {
629                    email,
630                    reason: "Invalid email format".to_string(),
631                };
632            }
633        }
634    }
635}