1use super::{
7 Channel, ChannelCapabilities, ChannelMessage, ChannelStatus, ChannelType, ChannelUser,
8 MessageId, StreamingMode,
9};
10use crate::error::{ChannelError, RustantError};
11use async_trait::async_trait;
12use serde::{Deserialize, Serialize};
13
/// Authentication mechanism used when logging in to the mail servers.
///
/// Variants are (de)serialized in `snake_case`; `XOAuth2` is explicitly
/// renamed to `"xoauth2"`.
#[derive(Debug, Clone, PartialEq, Eq, Default, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum EmailAuthMethod {
    /// Plain username/password login. This is the default.
    #[default]
    Password,
    /// XOAUTH2 login; the configured password is used as the bearer access token.
    #[serde(rename = "xoauth2")]
    XOAuth2,
}
29
/// Connection, credential and filtering settings for an [`EmailChannel`].
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct EmailConfig {
    /// Host name of the IMAP server used to read mail.
    pub imap_host: String,
    /// TCP port of the IMAP server.
    pub imap_port: u16,
    /// Host name of the SMTP server used to send mail.
    pub smtp_host: String,
    /// TCP port of the SMTP server.
    pub smtp_port: u16,
    /// Login name used for both IMAP and SMTP.
    pub username: String,
    /// Inline password (or access token when `auth_method` is `XOAuth2`).
    /// Defaults to an empty string when absent from the serialized config.
    #[serde(default)]
    pub password: String,
    /// Optional name of an environment variable holding the password; when it
    /// is set and readable it takes precedence over `password`
    /// (see `resolve_password`). Omitted from serialized output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub password_env: Option<String>,
    /// Address used as the sender of outgoing mail.
    pub from_address: String,
    /// Senders whose incoming mail is accepted; an empty list accepts everyone.
    pub allowed_senders: Vec<String>,
    /// How to authenticate against the servers; defaults to `Password`.
    #[serde(default)]
    pub auth_method: EmailAuthMethod,
}
53
54impl EmailConfig {
55 pub fn resolve_password(&self) -> String {
57 if let Some(ref env_var) = self.password_env
58 && let Ok(val) = std::env::var(env_var)
59 {
60 return val;
61 }
62 self.password.clone()
63 }
64}
65
/// Abstraction over the outgoing-mail transport so [`EmailChannel`] can be
/// driven by a real SMTP client or by a test double.
#[async_trait]
pub trait SmtpSender: Send + Sync {
    /// Sends one message to `to` with the given `subject` and plain `body`.
    ///
    /// On success the returned string is used by `EmailChannel::send_message`
    /// as the message identifier; on failure the string describes the error.
    async fn send_email(&self, to: &str, subject: &str, body: &str) -> Result<String, String>;
}
71
/// Abstraction over the incoming-mail transport so [`EmailChannel`] can be
/// driven by a real IMAP client or by a test double.
#[async_trait]
pub trait ImapReader: Send + Sync {
    /// Returns the messages that are currently unseen, or an error description.
    async fn fetch_unseen(&self) -> Result<Vec<IncomingEmail>, String>;
    /// Checks that the mailbox can be reached; called by `EmailChannel::connect`.
    async fn connect(&self) -> Result<(), String>;
}
78
/// A received message as handed from an [`ImapReader`] to the channel.
#[derive(Debug, Clone)]
pub struct IncomingEmail {
    /// Identifier assigned by the reader (`RealImap` uses `imap-<sequence number>`).
    pub message_id: String,
    /// Sender as reported by the reader; used for allowlist filtering and as
    /// the channel id of the resulting message.
    pub from: String,
    /// Subject line; exposed as the `subject` metadata entry.
    pub subject: String,
    /// Message body text.
    pub body: String,
}
87
/// Channel implementation that sends through an [`SmtpSender`] and receives
/// through an [`ImapReader`].
pub struct EmailChannel {
    /// Settings, including credentials and the sender allowlist.
    config: EmailConfig,
    /// Connection state reported by `status()`; updated by `connect`/`disconnect`.
    status: ChannelStatus,
    /// Outgoing transport.
    smtp: Box<dyn SmtpSender>,
    /// Incoming transport.
    imap: Box<dyn ImapReader>,
    /// Channel name used in errors and returned by `name()`.
    name: String,
}
96
97impl EmailChannel {
98 pub fn new(config: EmailConfig, smtp: Box<dyn SmtpSender>, imap: Box<dyn ImapReader>) -> Self {
99 Self {
100 config,
101 status: ChannelStatus::Disconnected,
102 smtp,
103 imap,
104 name: "email".to_string(),
105 }
106 }
107
108 pub fn with_name(mut self, name: impl Into<String>) -> Self {
109 self.name = name.into();
110 self
111 }
112}
113
114#[async_trait]
115impl Channel for EmailChannel {
116 fn name(&self) -> &str {
117 &self.name
118 }
119
120 fn channel_type(&self) -> ChannelType {
121 ChannelType::Email
122 }
123
124 async fn connect(&mut self) -> Result<(), RustantError> {
125 if self.config.username.is_empty() {
126 return Err(RustantError::Channel(ChannelError::AuthFailed {
127 name: self.name.clone(),
128 }));
129 }
130 if self.config.resolve_password().is_empty() {
132 return Err(RustantError::Channel(ChannelError::AuthFailed {
133 name: self.name.clone(),
134 }));
135 }
136 self.imap.connect().await.map_err(|e| {
137 RustantError::Channel(ChannelError::ConnectionFailed {
138 name: self.name.clone(),
139 message: e,
140 })
141 })?;
142 self.status = ChannelStatus::Connected;
143 Ok(())
144 }
145
146 async fn disconnect(&mut self) -> Result<(), RustantError> {
147 self.status = ChannelStatus::Disconnected;
148 Ok(())
149 }
150
151 async fn send_message(&self, msg: ChannelMessage) -> Result<MessageId, RustantError> {
152 let text = msg.content.as_text().unwrap_or("");
153 let subject = msg
154 .metadata
155 .get("subject")
156 .map(|s| s.as_str())
157 .unwrap_or("Message from Rustant");
158
159 self.smtp
160 .send_email(&msg.channel_id, subject, text)
161 .await
162 .map(MessageId::new)
163 .map_err(|e| {
164 RustantError::Channel(ChannelError::SendFailed {
165 name: self.name.clone(),
166 message: e,
167 })
168 })
169 }
170
171 async fn receive_messages(&self) -> Result<Vec<ChannelMessage>, RustantError> {
172 let emails = self.imap.fetch_unseen().await.map_err(|e| {
173 RustantError::Channel(ChannelError::ConnectionFailed {
174 name: self.name.clone(),
175 message: e,
176 })
177 })?;
178
179 let messages = emails
180 .into_iter()
181 .filter(|e| {
182 self.config.allowed_senders.is_empty()
183 || self.config.allowed_senders.contains(&e.from)
184 })
185 .map(|e| {
186 let sender = ChannelUser::new(&e.from, ChannelType::Email);
187 ChannelMessage::text(ChannelType::Email, &e.from, sender, &e.body)
188 .with_metadata("subject", &e.subject)
189 })
190 .collect();
191
192 Ok(messages)
193 }
194
195 fn status(&self) -> ChannelStatus {
196 self.status
197 }
198
199 fn capabilities(&self) -> ChannelCapabilities {
200 ChannelCapabilities {
201 supports_threads: false,
202 supports_reactions: false,
203 supports_files: true,
204 supports_voice: false,
205 supports_video: false,
206 max_message_length: None,
207 supports_editing: false,
208 supports_deletion: false,
209 }
210 }
211
212 fn streaming_mode(&self) -> StreamingMode {
213 StreamingMode::Polling { interval_ms: 30000 }
214 }
215}
216
/// [`SmtpSender`] backed by the `lettre` async SMTP transport.
pub struct RealSmtp {
    /// SMTP server host name.
    host: String,
    /// SMTP server port.
    port: u16,
    /// Login name.
    username: String,
    /// Password, or access token when `auth_method` is `XOAuth2`.
    password: String,
    /// Address placed in the `From` header of outgoing mail.
    from_address: String,
    /// Authentication mechanism to use.
    pub auth_method: EmailAuthMethod,
}
227
228impl RealSmtp {
229 pub fn new(
230 host: String,
231 port: u16,
232 username: String,
233 password: String,
234 from_address: String,
235 auth_method: EmailAuthMethod,
236 ) -> Self {
237 Self {
238 host,
239 port,
240 username,
241 password,
242 from_address,
243 auth_method,
244 }
245 }
246}
247
248#[async_trait]
249impl SmtpSender for RealSmtp {
250 async fn send_email(&self, to: &str, subject: &str, body: &str) -> Result<String, String> {
251 let email = lettre::Message::builder()
252 .from(
253 self.from_address
254 .parse()
255 .map_err(|e| format!("Invalid from address: {e}"))?,
256 )
257 .to(to.parse().map_err(|e| format!("Invalid to address: {e}"))?)
258 .subject(subject)
259 .body(body.to_string())
260 .map_err(|e| format!("Failed to build email: {e}"))?;
261
262 let creds = lettre::transport::smtp::authentication::Credentials::new(
263 self.username.clone(),
264 self.password.clone(),
265 );
266
267 let mut builder =
268 lettre::AsyncSmtpTransport::<lettre::Tokio1Executor>::starttls_relay(&self.host)
269 .map_err(|e| format!("SMTP relay error: {e}"))?
270 .port(self.port)
271 .credentials(creds);
272
273 if self.auth_method == EmailAuthMethod::XOAuth2 {
276 use lettre::transport::smtp::authentication::Mechanism;
277 builder = builder.authentication(vec![Mechanism::Xoauth2]);
278 }
279
280 let mailer = builder.build();
281
282 use lettre::AsyncTransport;
283 let response = mailer
284 .send(email)
285 .await
286 .map_err(|e| format!("SMTP send error: {e}"))?;
287
288 Ok(format!("{}", response.code()))
289 }
290}
291
/// Holds the account name and access token needed for an XOAUTH2 login.
pub struct XOAuth2Authenticator {
    /// Account the token belongs to.
    user: String,
    /// OAuth access token presented as the bearer credential.
    access_token: String,
}

impl XOAuth2Authenticator {
    /// Creates an authenticator by copying `user` and `access_token`.
    pub fn new(user: &str, access_token: &str) -> Self {
        XOAuth2Authenticator {
            user: user.to_owned(),
            access_token: String::from(access_token),
        }
    }

    /// Returns the raw XOAUTH2 client response:
    /// `user=<user>^Aauth=Bearer <token>^A^A`, where `^A` is the byte `0x01`.
    ///
    /// The string is returned as-is; no encoding is applied here.
    pub fn response(&self) -> String {
        let Self { user, access_token } = self;
        format!("user={}\x01auth=Bearer {}\x01\x01", user, access_token)
    }
}
317
318impl async_imap::Authenticator for XOAuth2Authenticator {
319 type Response = String;
320
321 fn process(&mut self, _challenge: &[u8]) -> Self::Response {
322 self.response()
323 }
324}
325
/// [`ImapReader`] backed by `async_imap` over a `native_tls` connection.
pub struct RealImap {
    /// IMAP server host name; also used as the TLS server name.
    host: String,
    /// IMAP server port.
    port: u16,
    /// Login name.
    username: String,
    /// Password, or access token when `auth_method` is `XOAuth2`.
    password: String,
    /// Authentication mechanism to use.
    pub auth_method: EmailAuthMethod,
}
335
336impl RealImap {
337 pub fn new(
338 host: String,
339 port: u16,
340 username: String,
341 password: String,
342 auth_method: EmailAuthMethod,
343 ) -> Self {
344 Self {
345 host,
346 port,
347 username,
348 password,
349 auth_method,
350 }
351 }
352}
353
354#[async_trait]
355impl ImapReader for RealImap {
356 async fn fetch_unseen(&self) -> Result<Vec<IncomingEmail>, String> {
357 let tcp = tokio::net::TcpStream::connect((self.host.as_str(), self.port))
358 .await
359 .map_err(|e| format!("TCP connect error: {e}"))?;
360
361 let native_tls_connector =
362 native_tls::TlsConnector::new().map_err(|e| format!("TLS connector error: {e}"))?;
363 let tls_connector = tokio_native_tls::TlsConnector::from(native_tls_connector);
364 let tls_stream = tls_connector
365 .connect(&self.host, tcp)
366 .await
367 .map_err(|e| format!("TLS connect error: {e}"))?;
368
369 let mut client = async_imap::Client::new(tls_stream);
370
371 client
376 .read_response()
377 .await
378 .map_err(|e| format!("IMAP greeting read error: {e}"))?
379 .ok_or_else(|| "IMAP server closed connection before greeting".to_string())?;
380
381 let mut session = match self.auth_method {
382 EmailAuthMethod::XOAuth2 => {
383 let auth = XOAuth2Authenticator::new(&self.username, &self.password);
384 client
385 .authenticate("XOAUTH2", auth)
386 .await
387 .map_err(|e| format!("IMAP XOAUTH2 auth error: {}", e.0))?
388 }
389 EmailAuthMethod::Password => client
390 .login(&self.username, &self.password)
391 .await
392 .map_err(|e| format!("IMAP login error: {}", e.0))?,
393 };
394
395 session
396 .select("INBOX")
397 .await
398 .map_err(|e| format!("IMAP select error: {e}"))?;
399
400 let unseen = session
401 .search("UNSEEN")
402 .await
403 .map_err(|e| format!("IMAP search error: {e}"))?;
404
405 let mut emails = Vec::new();
406 if !unseen.is_empty() {
407 let seq_set: String = unseen
408 .iter()
409 .map(|s: &u32| s.to_string())
410 .collect::<Vec<_>>()
411 .join(",");
412
413 let fetch_stream = session
414 .fetch(&seq_set, "RFC822")
415 .await
416 .map_err(|e| format!("IMAP fetch error: {e}"))?;
417
418 use futures::TryStreamExt;
419 let messages: Vec<_> = fetch_stream
420 .try_collect()
421 .await
422 .map_err(|e| format!("IMAP stream error: {e}"))?;
423
424 for msg in &messages {
425 if let Some(body_bytes) = msg.body() {
426 let raw = String::from_utf8_lossy(body_bytes).to_string();
427 let from = raw
428 .lines()
429 .find(|l| l.starts_with("From:"))
430 .map(|l| l.trim_start_matches("From:").trim().to_string())
431 .unwrap_or_default();
432 let subject = raw
433 .lines()
434 .find(|l| l.starts_with("Subject:"))
435 .map(|l| l.trim_start_matches("Subject:").trim().to_string())
436 .unwrap_or_default();
437 let body_text = raw.split("\r\n\r\n").nth(1).unwrap_or("").to_string();
438
439 emails.push(IncomingEmail {
440 message_id: format!("imap-{}", msg.message),
441 from,
442 subject,
443 body: body_text,
444 });
445 }
446 }
447 }
448
449 let _ = session.logout().await;
450 Ok(emails)
451 }
452
453 async fn connect(&self) -> Result<(), String> {
454 let tcp = tokio::net::TcpStream::connect((self.host.as_str(), self.port))
455 .await
456 .map_err(|e| format!("TCP connect error: {e}"))?;
457
458 let native_tls_connector =
459 native_tls::TlsConnector::new().map_err(|e| format!("TLS connector error: {e}"))?;
460 let tls_connector = tokio_native_tls::TlsConnector::from(native_tls_connector);
461 let tls_stream = tls_connector
462 .connect(&self.host, tcp)
463 .await
464 .map_err(|e| format!("TLS connect error: {e}"))?;
465
466 let mut client = async_imap::Client::new(tls_stream);
467
468 client
470 .read_response()
471 .await
472 .map_err(|e| format!("IMAP greeting read error: {e}"))?
473 .ok_or_else(|| "IMAP server closed connection before greeting".to_string())?;
474
475 let mut session = match self.auth_method {
476 EmailAuthMethod::XOAuth2 => {
477 let auth = XOAuth2Authenticator::new(&self.username, &self.password);
478 client
479 .authenticate("XOAUTH2", auth)
480 .await
481 .map_err(|e| format!("IMAP XOAUTH2 auth error: {}", e.0))?
482 }
483 EmailAuthMethod::Password => client
484 .login(&self.username, &self.password)
485 .await
486 .map_err(|e| format!("IMAP login error: {}", e.0))?,
487 };
488
489 let _ = session.logout().await;
490 Ok(())
491 }
492}
493
494pub fn create_email_channel(config: EmailConfig) -> EmailChannel {
496 let resolved_password = config.resolve_password();
497 let smtp = RealSmtp::new(
498 config.smtp_host.clone(),
499 config.smtp_port,
500 config.username.clone(),
501 resolved_password.clone(),
502 config.from_address.clone(),
503 config.auth_method.clone(),
504 );
505 let imap = RealImap::new(
506 config.imap_host.clone(),
507 config.imap_port,
508 config.username.clone(),
509 resolved_password,
510 config.auth_method.clone(),
511 );
512 EmailChannel::new(config, Box::new(smtp), Box::new(imap))
513}
514
// Unit tests for the email channel. Network access is avoided by using the
// mock transports below; the `Real*` types are only constructed, never used.
#[cfg(test)]
mod tests {
    use super::*;

    /// SMTP double that accepts every message and returns a fixed id.
    struct MockSmtp;

    #[async_trait]
    impl SmtpSender for MockSmtp {
        async fn send_email(
            &self,
            _to: &str,
            _subject: &str,
            _body: &str,
        ) -> Result<String, String> {
            Ok("email-id-1".to_string())
        }
    }

    /// IMAP double that always connects and yields one fixed message.
    struct MockImap;

    #[async_trait]
    impl ImapReader for MockImap {
        async fn fetch_unseen(&self) -> Result<Vec<IncomingEmail>, String> {
            Ok(vec![IncomingEmail {
                message_id: "msg1".into(),
                from: "alice@example.com".into(),
                subject: "Test".into(),
                body: "hello email".into(),
            }])
        }
        async fn connect(&self) -> Result<(), String> {
            Ok(())
        }
    }

    // Connecting with a username and password marks the channel connected.
    #[tokio::test]
    async fn test_email_connect() {
        let config = EmailConfig {
            username: "bot@example.com".into(),
            password: "pass".into(),
            ..Default::default()
        };
        let mut ch = EmailChannel::new(config, Box::new(MockSmtp), Box::new(MockImap));
        ch.connect().await.unwrap();
        assert!(ch.is_connected());
    }

    // The id returned by the SMTP transport becomes the message id.
    #[tokio::test]
    async fn test_email_send() {
        let config = EmailConfig {
            username: "bot@example.com".into(),
            password: "pass".into(),
            ..Default::default()
        };
        let mut ch = EmailChannel::new(config, Box::new(MockSmtp), Box::new(MockImap));
        ch.connect().await.unwrap();

        let sender = ChannelUser::new("bot@ex.com", ChannelType::Email);
        let msg = ChannelMessage::text(ChannelType::Email, "alice@ex.com", sender, "hi email")
            .with_metadata("subject", "Greetings");
        let id = ch.send_message(msg).await.unwrap();
        assert_eq!(id.0, "email-id-1");
    }

    // With an empty allowlist the mock message is delivered with its body
    // as text and its subject as metadata.
    #[tokio::test]
    async fn test_email_receive() {
        let config = EmailConfig {
            username: "bot@example.com".into(),
            password: "pass".into(),
            ..Default::default()
        };
        let mut ch = EmailChannel::new(config, Box::new(MockSmtp), Box::new(MockImap));
        ch.connect().await.unwrap();

        let msgs = ch.receive_messages().await.unwrap();
        assert_eq!(msgs.len(), 1);
        assert_eq!(msgs[0].content.as_text(), Some("hello email"));
        assert_eq!(
            msgs[0].metadata.get("subject").map(|s| s.as_str()),
            Some("Test")
        );
    }

    // Email advertises file support, no threads, and no length limit.
    #[test]
    fn test_email_capabilities() {
        let ch = EmailChannel::new(
            EmailConfig::default(),
            Box::new(MockSmtp),
            Box::new(MockImap),
        );
        let caps = ch.capabilities();
        assert!(!caps.supports_threads);
        assert!(caps.supports_files);
        assert!(caps.max_message_length.is_none());
    }

    // The channel polls every 30 seconds.
    #[test]
    fn test_email_streaming_mode() {
        let ch = EmailChannel::new(
            EmailConfig::default(),
            Box::new(MockSmtp),
            Box::new(MockImap),
        );
        assert_eq!(
            ch.streaming_mode(),
            StreamingMode::Polling { interval_ms: 30000 }
        );
    }

    // Choosing XOAuth2 does not change the channel-level connect flow.
    #[tokio::test]
    async fn test_email_xoauth2_connect() {
        let config = EmailConfig {
            username: "user@gmail.com".into(),
            password: "ya29.oauth-access-token".into(),
            auth_method: EmailAuthMethod::XOAuth2,
            ..Default::default()
        };
        let mut ch = EmailChannel::new(config, Box::new(MockSmtp), Box::new(MockImap));
        ch.connect().await.unwrap();
        assert!(ch.is_connected());
    }

    // A default config authenticates with a password.
    #[test]
    fn test_email_auth_method_default() {
        let config = EmailConfig::default();
        assert_eq!(config.auth_method, EmailAuthMethod::Password);
    }

    // XOAuth2 serializes as "xoauth2" and survives a JSON round trip.
    #[test]
    fn test_email_auth_method_serde() {
        let config = EmailConfig {
            username: "user@gmail.com".into(),
            password: "token".into(),
            auth_method: EmailAuthMethod::XOAuth2,
            ..Default::default()
        };
        let json = serde_json::to_string(&config).unwrap();
        assert!(json.contains("\"xoauth2\""));
        let parsed: EmailConfig = serde_json::from_str(&json).unwrap();
        assert_eq!(parsed.auth_method, EmailAuthMethod::XOAuth2);
    }

    // Checks the token built by `crate::oauth::build_xoauth2_token`, not the
    // authenticator defined in this module.
    #[test]
    fn test_email_xoauth2_token_format() {
        use crate::oauth::build_xoauth2_token;
        let token = build_xoauth2_token("user@gmail.com", "ya29.access-token");
        assert!(token.starts_with("user=user@gmail.com\x01"));
        assert!(token.contains("auth=Bearer ya29.access-token"));
        assert!(token.ends_with("\x01\x01"));
    }

    // `response()` yields the exact XOAUTH2 payload.
    #[test]
    fn test_xoauth2_authenticator_response_format() {
        let auth = XOAuth2Authenticator::new("user@gmail.com", "ya29.test-token");
        let response = auth.response();
        assert_eq!(
            response,
            "user=user@gmail.com\x01auth=Bearer ya29.test-token\x01\x01"
        );
    }

    // The `Authenticator` impl returns the same payload for an empty challenge.
    #[test]
    fn test_xoauth2_authenticator_process() {
        let mut auth = XOAuth2Authenticator::new("user@gmail.com", "ya29.test-token");
        let response = async_imap::Authenticator::process(&mut auth, b"");
        assert_eq!(
            response,
            "user=user@gmail.com\x01auth=Bearer ya29.test-token\x01\x01"
        );
    }

    // The challenge contents do not influence the response.
    #[test]
    fn test_xoauth2_authenticator_ignores_challenge() {
        let mut auth = XOAuth2Authenticator::new("alice@example.com", "token123");
        let r1 = async_imap::Authenticator::process(&mut auth, b"");
        let r2 = async_imap::Authenticator::process(&mut auth, b"some challenge data");
        assert_eq!(r1, r2);
    }

    // `RealImap::new` keeps the requested auth method (password).
    #[test]
    fn test_real_imap_stores_auth_method_password() {
        let imap = RealImap::new(
            "imap.gmail.com".into(),
            993,
            "user@gmail.com".into(),
            "password123".into(),
            EmailAuthMethod::Password,
        );
        assert_eq!(imap.auth_method, EmailAuthMethod::Password);
    }

    // `RealImap::new` keeps the requested auth method (XOAuth2).
    #[test]
    fn test_real_imap_stores_auth_method_xoauth2() {
        let imap = RealImap::new(
            "imap.gmail.com".into(),
            993,
            "user@gmail.com".into(),
            "ya29.token".into(),
            EmailAuthMethod::XOAuth2,
        );
        assert_eq!(imap.auth_method, EmailAuthMethod::XOAuth2);
    }

    // `RealSmtp::new` keeps the requested auth method (password).
    #[test]
    fn test_real_smtp_stores_auth_method_password() {
        let smtp = RealSmtp::new(
            "smtp.gmail.com".into(),
            587,
            "user@gmail.com".into(),
            "password123".into(),
            "user@gmail.com".into(),
            EmailAuthMethod::Password,
        );
        assert_eq!(smtp.auth_method, EmailAuthMethod::Password);
    }

    // `RealSmtp::new` keeps the requested auth method (XOAuth2).
    #[test]
    fn test_real_smtp_stores_auth_method_xoauth2() {
        let smtp = RealSmtp::new(
            "smtp.gmail.com".into(),
            587,
            "user@gmail.com".into(),
            "ya29.token".into(),
            "user@gmail.com".into(),
            EmailAuthMethod::XOAuth2,
        );
        assert_eq!(smtp.auth_method, EmailAuthMethod::XOAuth2);
    }

    // Smoke test: the factory builds a channel for password auth. Nothing is
    // asserted because the transports are boxed behind trait objects.
    #[test]
    fn test_create_email_channel_passes_auth_method_password() {
        let config = EmailConfig {
            imap_host: "imap.gmail.com".into(),
            imap_port: 993,
            smtp_host: "smtp.gmail.com".into(),
            smtp_port: 587,
            username: "user@gmail.com".into(),
            password: "pass".into(),
            from_address: "user@gmail.com".into(),
            auth_method: EmailAuthMethod::Password,
            ..Default::default()
        };
        let _ch = create_email_channel(config);
    }

    // Smoke test: the factory builds a channel for XOAuth2 auth. Nothing is
    // asserted because the transports are boxed behind trait objects.
    #[test]
    fn test_create_email_channel_passes_auth_method_xoauth2() {
        let config = EmailConfig {
            imap_host: "imap.gmail.com".into(),
            imap_port: 993,
            smtp_host: "smtp.gmail.com".into(),
            smtp_port: 587,
            username: "user@gmail.com".into(),
            password: "ya29.token".into(),
            from_address: "user@gmail.com".into(),
            auth_method: EmailAuthMethod::XOAuth2,
            ..Default::default()
        };
        let _ch = create_email_channel(config);
    }
}