1use super::imap::{create_imap_provider, EmailMessage};
2use super::smtp::{create_provider as create_smtp_provider, EmailProvider};
3use super::storage::{EmailAccount, EmailStorage, EmailStorageConfig, ImapAccountConfig, SmtpAccountConfig};
4use super::types::*;
5use async_stream::stream;
6use futures::Stream;
7use std::collections::HashMap;
8use std::sync::Arc;
9
10use plexus_core::plexus;
12use plexus_core::serde_helpers;
13
14#[derive(Clone)]
15pub struct Email {
16 storage: Arc<EmailStorage>,
17 templates: Arc<tokio::sync::RwLock<HashMap<String, EmailTemplate>>>,
18}
19
20impl Email {
21 pub async fn new() -> Result<Self, String> {
22 let storage = EmailStorage::new(EmailStorageConfig::default()).await?;
23
24 Ok(Self {
25 storage: Arc::new(storage),
26 templates: Arc::new(tokio::sync::RwLock::new(HashMap::new())),
27 })
28 }
29
30 pub async fn with_config(config: EmailStorageConfig) -> Result<Self, String> {
31 let storage = EmailStorage::new(config).await?;
32
33 Ok(Self {
34 storage: Arc::new(storage),
35 templates: Arc::new(tokio::sync::RwLock::new(HashMap::new())),
36 })
37 }
38}
39
40#[plexus_macros::hub_methods(
41 namespace = "email",
42 version = "2.0.0",
43 description = "Multi-account email with IMAP reading and SMTP sending"
44)]
45impl Email {
46 #[plexus_macros::hub_method(
49 description = "Register a new email account with SMTP and/or IMAP credentials",
50 params(
51 name = "Account name (typically the email address)",
52 smtp = "SMTP configuration for sending (optional)",
53 imap = "IMAP configuration for reading (optional)"
54 )
55 )]
56 async fn register_account(
57 &self,
58 name: String,
59 smtp: Option<SmtpAccountConfig>,
60 imap: Option<ImapAccountConfig>,
61 ) -> impl Stream<Item = RegisterAccountEvent> + Send + 'static {
62 let storage = self.storage.clone();
63
64 stream! {
65 let now = chrono::Utc::now().timestamp();
66 let account = EmailAccount {
67 name: name.clone(),
68 smtp: smtp.clone(),
69 imap: imap.clone(),
70 created_at: now,
71 updated_at: now,
72 };
73
74 match storage.register_account(account).await {
75 Ok(_) => yield RegisterAccountEvent::Registered {
76 account_name: name,
77 has_smtp: smtp.is_some(),
78 has_imap: imap.is_some(),
79 },
80 Err(e) => yield RegisterAccountEvent::Error { message: e },
81 }
82 }
83 }
84
85 #[plexus_macros::hub_method(
86 streaming,
87 description = "List all registered email accounts"
88 )]
89 async fn list_accounts(&self) -> impl Stream<Item = ListAccountsEvent> + Send + 'static {
90 let storage = self.storage.clone();
91
92 stream! {
93 match storage.list_accounts().await {
94 Ok(accounts) => {
95 let total = accounts.len();
96 for account in accounts {
97 yield ListAccountsEvent::Account {
98 name: account.name,
99 has_smtp: account.smtp.is_some(),
100 has_imap: account.imap.is_some(),
101 created_at: account.created_at,
102 };
103 }
104 yield ListAccountsEvent::Complete { total };
105 }
106 Err(e) => {
107 tracing::error!("Failed to list accounts: {}", e);
109 yield ListAccountsEvent::Complete { total: 0 };
110 }
111 }
112 }
113 }
114
115 #[plexus_macros::hub_method(
116 description = "Remove an email account",
117 params(name = "Account name to remove")
118 )]
119 async fn remove_account(
120 &self,
121 name: String,
122 ) -> impl Stream<Item = RemoveAccountEvent> + Send + 'static {
123 let storage = self.storage.clone();
124
125 stream! {
126 match storage.remove_account(&name).await {
127 Ok(true) => yield RemoveAccountEvent::Removed { account_name: name },
128 Ok(false) => yield RemoveAccountEvent::NotFound { account_name: name },
129 Err(e) => yield RemoveAccountEvent::Error { message: e },
130 }
131 }
132 }
133
134 #[plexus_macros::hub_method(
137 description = "Send an email from a registered account",
138 params(
139 account = "Account name to send from",
140 to = "Recipients",
141 cc = "CC recipients (optional)",
142 bcc = "BCC recipients (optional)",
143 subject = "Email subject",
144 body = "Email body (text, HTML, or both)",
145 attachments = "File attachments (optional)",
146 reply_to = "Reply-to address (optional)"
147 )
148 )]
149 async fn send_from(
150 &self,
151 account: String,
152 to: Vec<String>,
153 cc: Option<Vec<String>>,
154 bcc: Option<Vec<String>>,
155 subject: String,
156 body: EmailBody,
157 attachments: Option<Vec<Attachment>>,
158 reply_to: Option<String>,
159 ) -> impl Stream<Item = SendEmailEvent> + Send + 'static {
160 let storage = self.storage.clone();
161
162 stream! {
163 let account_config = match storage.get_account(&account).await {
165 Ok(Some(acc)) => acc,
166 Ok(None) => {
167 yield SendEmailEvent::Error {
168 message: format!("Account '{}' not found", account),
169 code: Some("ACCOUNT_NOT_FOUND".to_string()),
170 };
171 return;
172 }
173 Err(e) => {
174 yield SendEmailEvent::Error {
175 message: format!("Failed to load account: {}", e),
176 code: None,
177 };
178 return;
179 }
180 };
181
182 let smtp_config = match account_config.smtp {
184 Some(config) => config,
185 None => {
186 yield SendEmailEvent::Error {
187 message: format!("Account '{}' has no SMTP configuration", account),
188 code: Some("NO_SMTP_CONFIG".to_string()),
189 };
190 return;
191 }
192 };
193
194 let email_config = crate::config::EmailConfig {
196 provider: crate::config::EmailProvider::Smtp,
197 credentials: crate::config::EmailCredentials::Smtp {
198 smtp_host: smtp_config.host,
199 smtp_port: smtp_config.port,
200 smtp_username: smtp_config.username,
201 smtp_password: smtp_config.password,
202 smtp_from: smtp_config.from_email,
203 },
204 };
205
206 let provider = match create_smtp_provider(&email_config) {
208 Ok(p) => p,
209 Err(e) => {
210 yield SendEmailEvent::Error {
211 message: format!("Failed to create SMTP provider: {}", e),
212 code: None,
213 };
214 return;
215 }
216 };
217
218 let params = SendEmailParams {
219 to,
220 cc,
221 bcc,
222 subject,
223 body,
224 attachments,
225 reply_to,
226 };
227
228 match provider.send(params).await {
229 Ok(event) => yield event,
230 Err(e) => yield SendEmailEvent::Error {
231 message: e,
232 code: None,
233 },
234 }
235 }
236 }
237
238 #[plexus_macros::hub_method(
239 streaming,
240 description = "Send multiple emails with progress tracking from a registered account",
241 params(
242 account = "Account name to send from",
243 emails = "List of emails to send"
244 )
245 )]
246 async fn send_batch_from(
247 &self,
248 account: String,
249 emails: Vec<SendEmailParams>,
250 ) -> impl Stream<Item = BatchSendEvent> + Send + 'static {
251 let storage = self.storage.clone();
252 let total = emails.len();
253
254 stream! {
255 let account_config = match storage.get_account(&account).await {
257 Ok(Some(acc)) => acc,
258 Ok(None) => {
259 yield BatchSendEvent::Complete {
260 total_sent: 0,
261 total_failed: total,
262 };
263 return;
264 }
265 Err(_) => {
266 yield BatchSendEvent::Complete {
267 total_sent: 0,
268 total_failed: total,
269 };
270 return;
271 }
272 };
273
274 let smtp_config = match account_config.smtp {
276 Some(config) => config,
277 None => {
278 yield BatchSendEvent::Complete {
279 total_sent: 0,
280 total_failed: total,
281 };
282 return;
283 }
284 };
285
286 let email_config = crate::config::EmailConfig {
288 provider: crate::config::EmailProvider::Smtp,
289 credentials: crate::config::EmailCredentials::Smtp {
290 smtp_host: smtp_config.host,
291 smtp_port: smtp_config.port,
292 smtp_username: smtp_config.username,
293 smtp_password: smtp_config.password,
294 smtp_from: smtp_config.from_email,
295 },
296 };
297
298 let provider = match create_smtp_provider(&email_config) {
299 Ok(p) => p,
300 Err(_) => {
301 yield BatchSendEvent::Complete {
302 total_sent: 0,
303 total_failed: total,
304 };
305 return;
306 }
307 };
308
309 let mut sent = 0;
310 let mut failed = 0;
311
312 for (index, email) in emails.into_iter().enumerate() {
313 match provider.send(email).await {
314 Ok(SendEmailEvent::Sent { message_id, .. }) |
315 Ok(SendEmailEvent::Queued { message_id }) => {
316 sent += 1;
317 yield BatchSendEvent::EmailSent { index, message_id };
318 }
319 Ok(SendEmailEvent::Error { message, .. }) | Err(message) => {
320 failed += 1;
321 yield BatchSendEvent::EmailFailed { index, error: message };
322 }
323 }
324
325 if (index + 1) % 10 == 0 || index + 1 == total {
326 yield BatchSendEvent::Progress {
327 sent,
328 total,
329 percentage: ((sent + failed) as f32 / total as f32) * 100.0,
330 };
331 }
332 }
333
334 yield BatchSendEvent::Complete {
335 total_sent: sent,
336 total_failed: failed,
337 };
338 }
339 }
340
341 #[plexus_macros::hub_method(
344 streaming,
345 description = "Read inbox messages from a registered account",
346 params(
347 account = "Account name to read from",
348 limit = "Maximum number of messages to fetch (optional)"
349 )
350 )]
351 async fn read_inbox(
352 &self,
353 account: String,
354 limit: Option<u32>,
355 ) -> impl Stream<Item = ReadInboxEvent> + Send + 'static {
356 let storage = self.storage.clone();
357
358 stream! {
359 let account_config = match storage.get_account(&account).await {
361 Ok(Some(acc)) => acc,
362 Ok(None) => {
363 yield ReadInboxEvent::Error {
364 message: format!("Account '{}' not found", account),
365 };
366 return;
367 }
368 Err(e) => {
369 yield ReadInboxEvent::Error {
370 message: format!("Failed to load account: {}", e),
371 };
372 return;
373 }
374 };
375
376 let imap_config = match account_config.imap {
378 Some(config) => config,
379 None => {
380 yield ReadInboxEvent::Error {
381 message: format!("Account '{}' has no IMAP configuration", account),
382 };
383 return;
384 }
385 };
386
387 #[cfg(feature = "email-imap")]
389 {
390 let provider = create_imap_provider(imap_config);
391
392 match provider.fetch_messages(limit).await {
393 Ok(messages) => {
394 let total = messages.len();
395 for message in messages {
396 yield ReadInboxEvent::Message { message };
397 }
398 yield ReadInboxEvent::Complete { total };
399 }
400 Err(e) => {
401 yield ReadInboxEvent::Error { message: e };
402 }
403 }
404 }
405
406 #[cfg(not(feature = "email-imap"))]
407 {
408 yield ReadInboxEvent::Error {
409 message: "IMAP support not enabled. Enable 'email-imap' feature.".to_string(),
410 };
411 }
412 }
413 }
414
415 #[plexus_macros::hub_method(
416 streaming,
417 description = "Search messages in a registered account",
418 params(
419 account = "Account name to search in",
420 query = "IMAP search query (e.g., 'FROM github', 'SUBJECT deploy')"
421 )
422 )]
423 async fn search_messages(
424 &self,
425 account: String,
426 query: String,
427 ) -> impl Stream<Item = SearchMessagesEvent> + Send + 'static {
428 let storage = self.storage.clone();
429
430 stream! {
431 let account_config = match storage.get_account(&account).await {
433 Ok(Some(acc)) => acc,
434 Ok(None) => {
435 yield SearchMessagesEvent::Error {
436 message: format!("Account '{}' not found", account),
437 };
438 return;
439 }
440 Err(e) => {
441 yield SearchMessagesEvent::Error {
442 message: format!("Failed to load account: {}", e),
443 };
444 return;
445 }
446 };
447
448 let imap_config = match account_config.imap {
450 Some(config) => config,
451 None => {
452 yield SearchMessagesEvent::Error {
453 message: format!("Account '{}' has no IMAP configuration", account),
454 };
455 return;
456 }
457 };
458
459 #[cfg(feature = "email-imap")]
461 {
462 let provider = create_imap_provider(imap_config);
463
464 match provider.search_messages(&query).await {
465 Ok(messages) => {
466 let total = messages.len();
467 for message in messages {
468 yield SearchMessagesEvent::Message { message };
469 }
470 yield SearchMessagesEvent::Complete { total };
471 }
472 Err(e) => {
473 yield SearchMessagesEvent::Error { message: e };
474 }
475 }
476 }
477
478 #[cfg(not(feature = "email-imap"))]
479 {
480 yield SearchMessagesEvent::Error {
481 message: "IMAP support not enabled. Enable 'email-imap' feature.".to_string(),
482 };
483 }
484 }
485 }
486
487 #[plexus_macros::hub_method(
488 description = "Mark a message as read",
489 params(
490 account = "Account name",
491 uid = "Message UID"
492 )
493 )]
494 async fn mark_read(
495 &self,
496 account: String,
497 uid: u32,
498 ) -> impl Stream<Item = MarkMessageEvent> + Send + 'static {
499 let storage = self.storage.clone();
500
501 stream! {
502 let account_config = match storage.get_account(&account).await {
503 Ok(Some(acc)) => acc,
504 Ok(None) => {
505 yield MarkMessageEvent::Error {
506 message: format!("Account '{}' not found", account),
507 };
508 return;
509 }
510 Err(e) => {
511 yield MarkMessageEvent::Error {
512 message: format!("Failed to load account: {}", e),
513 };
514 return;
515 }
516 };
517
518 let imap_config = match account_config.imap {
519 Some(config) => config,
520 None => {
521 yield MarkMessageEvent::Error {
522 message: format!("Account '{}' has no IMAP configuration", account),
523 };
524 return;
525 }
526 };
527
528 #[cfg(feature = "email-imap")]
529 {
530 let provider = create_imap_provider(imap_config);
531
532 match provider.mark_seen(uid).await {
533 Ok(_) => yield MarkMessageEvent::Marked {
534 uid,
535 status: "read".to_string(),
536 },
537 Err(e) => yield MarkMessageEvent::Error { message: e },
538 }
539 }
540
541 #[cfg(not(feature = "email-imap"))]
542 {
543 yield MarkMessageEvent::Error {
544 message: "IMAP support not enabled.".to_string(),
545 };
546 }
547 }
548 }
549
550 #[plexus_macros::hub_method(
551 description = "Mark a message as unread",
552 params(
553 account = "Account name",
554 uid = "Message UID"
555 )
556 )]
557 async fn mark_unread(
558 &self,
559 account: String,
560 uid: u32,
561 ) -> impl Stream<Item = MarkMessageEvent> + Send + 'static {
562 let storage = self.storage.clone();
563
564 stream! {
565 let account_config = match storage.get_account(&account).await {
566 Ok(Some(acc)) => acc,
567 Ok(None) => {
568 yield MarkMessageEvent::Error {
569 message: format!("Account '{}' not found", account),
570 };
571 return;
572 }
573 Err(e) => {
574 yield MarkMessageEvent::Error {
575 message: format!("Failed to load account: {}", e),
576 };
577 return;
578 }
579 };
580
581 let imap_config = match account_config.imap {
582 Some(config) => config,
583 None => {
584 yield MarkMessageEvent::Error {
585 message: format!("Account '{}' has no IMAP configuration", account),
586 };
587 return;
588 }
589 };
590
591 #[cfg(feature = "email-imap")]
592 {
593 let provider = create_imap_provider(imap_config);
594
595 match provider.mark_unseen(uid).await {
596 Ok(_) => yield MarkMessageEvent::Marked {
597 uid,
598 status: "unread".to_string(),
599 },
600 Err(e) => yield MarkMessageEvent::Error { message: e },
601 }
602 }
603
604 #[cfg(not(feature = "email-imap"))]
605 {
606 yield MarkMessageEvent::Error {
607 message: "IMAP support not enabled.".to_string(),
608 };
609 }
610 }
611 }
612
613 #[plexus_macros::hub_method(
616 description = "Validate an email address",
617 params(email = "Email address to validate")
618 )]
619 async fn validate_address(
620 &self,
621 email: String,
622 ) -> impl Stream<Item = ValidateAddressEvent> + Send + 'static {
623 stream! {
624 if email.contains('@') && email.contains('.') {
626 yield ValidateAddressEvent::Valid { email };
627 } else {
628 yield ValidateAddressEvent::Invalid {
629 email,
630 reason: "Invalid email format".to_string(),
631 };
632 }
633 }
634 }
635}