Skip to main content

plexus_comms/activations/email/
imap.rs

1use super::storage::ImapAccountConfig;
2use async_trait::async_trait;
3use tokio_util::compat::TokioAsyncReadCompatExt;
4use futures::StreamExt;
5
/// A single e-mail message as returned by an [`ImapProvider`].
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, schemars::JsonSchema)]
pub struct EmailMessage {
    /// IMAP UID of the message; the IMAP provider in this file stores 0 when
    /// the server response carried no UID.
    pub uid: u32,
    /// Subject taken from the message envelope; empty when the envelope has none.
    pub subject: String,
    /// First sender address formatted as `mailbox@host`; empty when unavailable.
    pub from: String,
    /// Recipient addresses, each formatted as `mailbox@host`.
    pub to: Vec<String>,
    /// Date string exactly as it appears in the envelope (not parsed).
    pub date: Option<String>,
    /// Text body of the message, when one was fetched and could be decoded.
    pub body_text: Option<String>,
    /// HTML body of the message; the IMAP provider in this file always leaves it `None`.
    pub body_html: Option<String>,
    /// Whether the message carried the `\Seen` flag when it was fetched.
    pub is_seen: bool,
}
17
/// Read/flag access to a mailbox.
///
/// Every method reports failure as a human-readable `String`.
#[async_trait]
pub trait ImapProvider: Send + Sync {
    /// Fetches messages from the mailbox, optionally bounded by `limit`.
    async fn fetch_messages(&self, limit: Option<u32>) -> Result<Vec<EmailMessage>, String>;
    /// Returns the messages matching `query`.
    ///
    /// The provider in this file hands `query` to the server as IMAP SEARCH criteria.
    async fn search_messages(&self, query: &str) -> Result<Vec<EmailMessage>, String>;
    /// Adds the `\Seen` flag to the message identified by `uid`.
    async fn mark_seen(&self, uid: u32) -> Result<(), String>;
    /// Removes the `\Seen` flag from the message identified by `uid`.
    async fn mark_unseen(&self, uid: u32) -> Result<(), String>;
}
25
/// [`ImapProvider`] backed by the `async_imap` crate.
///
/// Only compiled when the `email-imap` feature is enabled.
#[cfg(feature = "email-imap")]
pub struct AsyncImapProvider {
    /// Account settings (host, port, username, password) used to connect.
    config: ImapAccountConfig,
}
30
#[cfg(feature = "email-imap")]
impl AsyncImapProvider {
    /// Creates a provider for the account described by `config`.
    ///
    /// Nothing is connected here; the configuration is only stored.
    pub fn new(config: ImapAccountConfig) -> Self {
        Self { config }
    }

    /// Opens an authenticated IMAP session for the configured account.
    ///
    /// TLS is negotiated immediately after the TCP connect (there is no
    /// STARTTLS step), using the configured host name for the handshake.
    ///
    /// # Errors
    ///
    /// Returns a descriptive message when the TCP connect, the TLS handshake
    /// or the login fails.
    async fn connect(&self) -> Result<async_imap::Session<async_native_tls::TlsStream<tokio_util::compat::Compat<tokio::net::TcpStream>>>, String> {
        let address = (self.config.host.as_str(), self.config.port);
        let tcp_stream = tokio::net::TcpStream::connect(address)
            .await
            .map_err(|e| format!("Failed to connect to TCP: {}", e))?;

        // async_native_tls speaks the `futures` I/O traits, so the tokio
        // stream has to be wrapped in the compatibility adapter first.
        let tcp_stream_compat = tcp_stream.compat();

        let tls_stream = async_native_tls::TlsConnector::new()
            .connect(&self.config.host, tcp_stream_compat)
            .await
            .map_err(|e| format!("Failed to establish TLS: {}", e))?;

        // A failed login hands back `(error, client)`; only the error is reported.
        let session = async_imap::Client::new(tls_stream)
            .login(&self.config.username, &self.config.password)
            .await
            .map_err(|e| format!("Failed to login to IMAP: {:?}", e.0))?;

        Ok(session)
    }

    /// Decodes human-readable message bytes, replacing invalid UTF-8
    /// sequences with U+FFFD instead of discarding the whole value.
    fn decode_lossy(bytes: &[u8]) -> String {
        String::from_utf8_lossy(bytes).into_owned()
    }

    /// Formats an envelope address as `mailbox@host`.
    ///
    /// Returns `None` when either part is missing or is not valid UTF-8;
    /// addresses are identifiers, so a partially decoded one is not emitted.
    fn format_address(mailbox: Option<&[u8]>, host: Option<&[u8]>) -> Option<String> {
        let mailbox = std::str::from_utf8(mailbox?).ok()?;
        let host = std::str::from_utf8(host?).ok()?;
        Some(format!("{}@{}", mailbox, host))
    }

    /// Converts one FETCH response into an [`EmailMessage`].
    ///
    /// The subject and text body are decoded lossily so that a single
    /// non-UTF-8 byte does not erase them. `uid` falls back to 0 when the
    /// response carries no UID, and `body_html` is never populated.
    ///
    /// # Errors
    ///
    /// Returns an error when the response contains no envelope.
    fn parse_message(fetch: &async_imap::types::Fetch) -> Result<EmailMessage, String> {
        let envelope = fetch
            .envelope()
            .ok_or_else(|| "No envelope in message".to_string())?;

        let subject = envelope
            .subject
            .as_ref()
            .map(|raw| Self::decode_lossy(raw))
            .unwrap_or_default();

        // Only the first sender is reported.
        let from = envelope
            .from
            .as_ref()
            .and_then(|addrs| addrs.first())
            .and_then(|addr| Self::format_address(addr.mailbox.as_deref(), addr.host.as_deref()))
            .unwrap_or_default();

        // Recipients that cannot be formatted are skipped rather than failing the message.
        let to: Vec<String> = envelope
            .to
            .as_ref()
            .map(|addrs| {
                addrs
                    .iter()
                    .filter_map(|addr| {
                        Self::format_address(addr.mailbox.as_deref(), addr.host.as_deref())
                    })
                    .collect()
            })
            .unwrap_or_default();

        let date = envelope
            .date
            .as_ref()
            .and_then(|raw| std::str::from_utf8(raw).ok())
            .map(|s| s.to_string());

        let body_text = fetch.text().map(Self::decode_lossy);

        let is_seen = fetch
            .flags()
            .any(|flag| matches!(flag, async_imap::types::Flag::Seen));

        Ok(EmailMessage {
            uid: fetch.uid.unwrap_or(0),
            subject,
            from,
            to,
            date,
            body_text,
            body_html: None,
            is_seen,
        })
    }
}
121
// Private helpers shared by the `ImapProvider` implementation below.
#[cfg(feature = "email-imap")]
impl AsyncImapProvider {
    /// Data items requested for every message.
    ///
    /// `UID` is requested so that `EmailMessage::uid` is populated, and
    /// `BODY.PEEK[TEXT]` is used instead of `BODY[TEXT]` so that reading a
    /// message does not implicitly set its `\Seen` flag.
    const FETCH_ITEMS: &'static str = "(UID ENVELOPE FLAGS BODY.PEEK[TEXT])";

    /// Parses collected FETCH responses, keeping the ones that succeed.
    ///
    /// Responses that failed to arrive or to parse are logged to stderr and
    /// skipped, so one bad message does not fail the whole listing.
    fn collect_messages<E: std::fmt::Display>(
        fetches: Vec<Result<async_imap::types::Fetch, E>>,
    ) -> Vec<EmailMessage> {
        let mut messages = Vec::with_capacity(fetches.len());
        for fetch_result in fetches {
            match fetch_result {
                Ok(fetch) => match Self::parse_message(&fetch) {
                    Ok(message) => messages.push(message),
                    Err(e) => eprintln!("Skipping message: {}", e),
                },
                Err(e) => eprintln!("Error fetching message: {}", e),
            }
        }
        messages
    }

    /// Applies `flag_change` (for example `+FLAGS (\Seen)`) to the message
    /// with the given UID in INBOX.
    ///
    /// `action` is only used to word the error message ("seen" / "unseen").
    async fn set_seen_flag(&self, uid: u32, flag_change: &str, action: &str) -> Result<(), String> {
        let mut session = self.connect().await?;

        session
            .select("INBOX")
            .await
            .map_err(|e| format!("Failed to select INBOX: {}", e))?;

        // UID STORE, not STORE: callers pass a UID, not a sequence number.
        // The response stream is drained so nothing is left unread on the
        // connection before LOGOUT is sent.
        let responses: Vec<_> = session
            .uid_store(uid.to_string(), flag_change)
            .await
            .map_err(|e| format!("Failed to mark as {}: {}", action, e))?
            .collect()
            .await;

        // Turn the first failure into a message before awaiting again, so no
        // library error value is held across an await point.
        let failure = responses
            .into_iter()
            .find_map(Result::err)
            .map(|e| format!("Failed to mark as {}: {}", action, e));
        if let Some(message) = failure {
            // Best effort: the store error is the one worth reporting.
            session.logout().await.ok();
            return Err(message);
        }

        session
            .logout()
            .await
            .map_err(|e| format!("Failed to logout: {}", e))?;

        Ok(())
    }
}

/// IMAP implementation of [`ImapProvider`].
///
/// Every call opens its own connection, works on INBOX and logs out again.
#[cfg(feature = "email-imap")]
#[async_trait]
impl ImapProvider for AsyncImapProvider {
    /// Fetches messages from INBOX.
    ///
    /// With `limit`, the sequence range `1:limit` is fetched, clamped to the
    /// number of messages in the mailbox; without it, `1:*`. An empty mailbox
    /// or a limit of 0 yields an empty list without issuing a FETCH.
    async fn fetch_messages(&self, limit: Option<u32>) -> Result<Vec<EmailMessage>, String> {
        let mut session = self.connect().await?;

        let mailbox = session
            .select("INBOX")
            .await
            .map_err(|e| format!("Failed to select INBOX: {}", e))?;

        // `1:0` is not a valid sequence set, and a FETCH on an empty mailbox
        // has nothing to address, so answer those cases locally.
        if mailbox.exists == 0 || limit == Some(0) {
            session.logout().await.ok();
            return Ok(Vec::new());
        }

        // NOTE(review): sequence numbers start at the oldest message, so a
        // limit selects the oldest messages; confirm this is what callers expect.
        let sequence = match limit {
            // Clamp so the range never names a sequence number past the end.
            Some(limit) => format!("1:{}", limit.min(mailbox.exists)),
            None => "1:*".to_string(),
        };

        let messages_stream = session
            .fetch(sequence, Self::FETCH_ITEMS)
            .await
            .map_err(|e| format!("Failed to fetch messages: {}", e))?;
        let fetches: Vec<_> = messages_stream.collect::<Vec<_>>().await;

        let result = Self::collect_messages(fetches);

        session
            .logout()
            .await
            .map_err(|e| format!("Failed to logout: {}", e))?;

        Ok(result)
    }

    /// Runs an IMAP search in INBOX and fetches every match.
    ///
    /// `query` is sent as the SEARCH criteria verbatim, so it must not
    /// contain line breaks (they would terminate the command early).
    async fn search_messages(&self, query: &str) -> Result<Vec<EmailMessage>, String> {
        if query.contains(|c: char| c == '\r' || c == '\n') {
            return Err("Search query must not contain line breaks".to_string());
        }

        let mut session = self.connect().await?;

        session
            .select("INBOX")
            .await
            .map_err(|e| format!("Failed to select INBOX: {}", e))?;

        // UID SEARCH / UID FETCH keep the identifiers stable and match what
        // `mark_seen` / `mark_unseen` expect.
        let found = session
            .uid_search(query)
            .await
            .map_err(|e| format!("Failed to search: {}", e))?;

        if found.is_empty() {
            session.logout().await.ok();
            return Ok(Vec::new());
        }

        // The search result is an unordered set; sort for a deterministic request.
        let mut uids: Vec<u32> = found.into_iter().collect();
        uids.sort_unstable();
        let uid_set = uids
            .iter()
            .map(|uid| uid.to_string())
            .collect::<Vec<_>>()
            .join(",");

        let messages_stream = session
            .uid_fetch(uid_set, Self::FETCH_ITEMS)
            .await
            .map_err(|e| format!("Failed to fetch messages: {}", e))?;
        let fetches: Vec<_> = messages_stream.collect::<Vec<_>>().await;

        let result = Self::collect_messages(fetches);

        session
            .logout()
            .await
            .map_err(|e| format!("Failed to logout: {}", e))?;

        Ok(result)
    }

    /// Adds the `\Seen` flag to the INBOX message with the given UID.
    async fn mark_seen(&self, uid: u32) -> Result<(), String> {
        self.set_seen_flag(uid, "+FLAGS (\\Seen)", "seen").await
    }

    /// Removes the `\Seen` flag from the INBOX message with the given UID.
    async fn mark_unseen(&self, uid: u32) -> Result<(), String> {
        self.set_seen_flag(uid, "-FLAGS (\\Seen)", "unseen").await
    }
}
267
/// Builds the IMAP-backed [`ImapProvider`] for `config`.
///
/// This variant is compiled when the `email-imap` feature is enabled.
#[cfg(feature = "email-imap")]
pub fn create_imap_provider(config: ImapAccountConfig) -> Box<dyn ImapProvider> {
    let provider = AsyncImapProvider::new(config);
    Box::new(provider)
}
272
/// Stand-in used when the crate is built without the `email-imap` feature.
///
/// # Panics
///
/// Always panics: no IMAP implementation is compiled in, so there is no
/// provider to return.
#[cfg(not(feature = "email-imap"))]
pub fn create_imap_provider(_config: ImapAccountConfig) -> Box<dyn ImapProvider> {
    panic!("IMAP support not enabled. Enable 'email-imap' feature.");
}