plexus_comms/activations/email/
imap.rs1use super::storage::ImapAccountConfig;
2use async_trait::async_trait;
3use tokio_util::compat::TokioAsyncReadCompatExt;
4use futures::StreamExt;
5
6#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, schemars::JsonSchema)]
7pub struct EmailMessage {
8 pub uid: u32,
9 pub subject: String,
10 pub from: String,
11 pub to: Vec<String>,
12 pub date: Option<String>,
13 pub body_text: Option<String>,
14 pub body_html: Option<String>,
15 pub is_seen: bool,
16}
17
18#[async_trait]
19pub trait ImapProvider: Send + Sync {
20 async fn fetch_messages(&self, limit: Option<u32>) -> Result<Vec<EmailMessage>, String>;
21 async fn search_messages(&self, query: &str) -> Result<Vec<EmailMessage>, String>;
22 async fn mark_seen(&self, uid: u32) -> Result<(), String>;
23 async fn mark_unseen(&self, uid: u32) -> Result<(), String>;
24}
25
/// [`ImapProvider`] implementation backed by the `async_imap` crate.
///
/// Only available when the `email-imap` feature is enabled.
#[cfg(feature = "email-imap")]
pub struct AsyncImapProvider {
    /// Account settings (host, port, username, password) used for each connection.
    config: ImapAccountConfig,
}
30
#[cfg(feature = "email-imap")]
impl AsyncImapProvider {
    /// Creates a provider for the given account.
    ///
    /// Nothing is connected here; `connect` opens a session on demand.
    pub fn new(config: ImapAccountConfig) -> Self {
        AsyncImapProvider { config }
    }

    /// Opens an authenticated IMAP session.
    ///
    /// Connects over TCP to the configured host and port, wraps the socket in
    /// TLS (using the configured host as the TLS domain) and logs in with the
    /// configured username and password.
    ///
    /// # Errors
    ///
    /// Returns a descriptive message if the TCP connection, the TLS handshake
    /// or the login fails.
    async fn connect(&self) -> Result<async_imap::Session<async_native_tls::TlsStream<tokio_util::compat::Compat<tokio::net::TcpStream>>>, String> {
        let endpoint = (self.config.host.as_str(), self.config.port);
        let socket = tokio::net::TcpStream::connect(endpoint)
            .await
            .map_err(|e| format!("Failed to connect to TCP: {}", e))?;

        // async_native_tls works on futures-style I/O, hence the compat wrapper.
        let secured = async_native_tls::TlsConnector::new()
            .connect(&self.config.host, socket.compat())
            .await
            .map_err(|e| format!("Failed to establish TLS: {}", e))?;

        // A failed login yields a tuple whose first element is the error.
        async_imap::Client::new(secured)
            .login(&self.config.username, &self.config.password)
            .await
            .map_err(|e| format!("Failed to login to IMAP: {:?}", e.0))
    }

    /// Interprets an optional byte field as UTF-8, yielding `None` when the
    /// field is absent or is not valid UTF-8.
    fn utf8_field(bytes: Option<&[u8]>) -> Option<&str> {
        bytes.and_then(|raw| std::str::from_utf8(raw).ok())
    }

    /// Formats an address as `mailbox@host`; `None` unless both parts are
    /// present and valid UTF-8.
    fn mailbox_at_host(mailbox: Option<&[u8]>, host: Option<&[u8]>) -> Option<String> {
        let mailbox = Self::utf8_field(mailbox)?;
        let host = Self::utf8_field(host)?;
        Some(format!("{}@{}", mailbox, host))
    }

    /// Converts one fetch response into an [`EmailMessage`].
    ///
    /// Fields that are missing or not valid UTF-8 fall back to empty values
    /// (`""`, an empty list, or `None`). `body_html` is never populated, and
    /// `uid` is 0 when the response carries no UID.
    ///
    /// # Errors
    ///
    /// Fails only when the response contains no envelope.
    fn parse_message(fetch: &async_imap::types::Fetch) -> Result<EmailMessage, String> {
        let envelope = match fetch.envelope() {
            Some(envelope) => envelope,
            None => return Err("No envelope in message".to_string()),
        };

        let subject = Self::utf8_field(envelope.subject.as_deref())
            .unwrap_or("")
            .to_string();

        // Only the first sender is reported.
        let from = envelope
            .from
            .as_ref()
            .and_then(|senders| senders.first())
            .and_then(|sender| {
                Self::mailbox_at_host(sender.mailbox.as_deref(), sender.host.as_deref())
            })
            .unwrap_or_default();

        // Recipients that cannot be formatted are skipped rather than failing the message.
        let to: Vec<String> = envelope
            .to
            .iter()
            .flatten()
            .filter_map(|recipient| {
                Self::mailbox_at_host(recipient.mailbox.as_deref(), recipient.host.as_deref())
            })
            .collect();

        let date = Self::utf8_field(envelope.date.as_deref()).map(str::to_string);
        let body_text = Self::utf8_field(fetch.text()).map(str::to_string);
        let is_seen = fetch
            .flags()
            .any(|flag| matches!(flag, async_imap::types::Flag::Seen));

        Ok(EmailMessage {
            uid: fetch.uid.unwrap_or_default(),
            subject,
            from,
            to,
            date,
            body_text,
            body_html: None,
            is_seen,
        })
    }
}
121
#[cfg(feature = "email-imap")]
impl AsyncImapProvider {
    /// Data items requested for every listed message.
    ///
    /// * `UID` makes the server report each message's UID, so `EmailMessage::uid`
    ///   can be handed back to `mark_seen` / `mark_unseen`.
    /// * `BODY.PEEK[TEXT]` returns the text body without the implicit `\Seen`
    ///   update that plain `BODY[TEXT]` triggers, so listing mail does not mark
    ///   it as read.
    const FETCH_ITEMS: &'static str = "(UID ENVELOPE FLAGS BODY.PEEK[TEXT])";

    /// Converts collected fetch responses into messages.
    ///
    /// Responses that failed to arrive are logged to stderr and skipped;
    /// responses that cannot be parsed are skipped silently.
    fn parse_fetches<E: std::fmt::Display>(
        fetches: Vec<Result<async_imap::types::Fetch, E>>,
    ) -> Vec<EmailMessage> {
        let mut messages = Vec::with_capacity(fetches.len());
        for fetch_result in fetches {
            match fetch_result {
                Ok(fetch) => {
                    if let Ok(message) = Self::parse_message(&fetch) {
                        messages.push(message);
                    }
                }
                Err(e) => eprintln!("Error fetching message: {}", e),
            }
        }
        messages
    }

    /// Applies `flag_change` (e.g. `+FLAGS (\Seen)`) to the INBOX message with
    /// the given UID. `state` is only used to word error messages.
    async fn update_seen_flag(
        &self,
        uid: u32,
        flag_change: &str,
        state: &str,
    ) -> Result<(), String> {
        let mut session = self.connect().await?;

        session
            .select("INBOX")
            .await
            .map_err(|e| format!("Failed to select INBOX: {}", e))?;

        // UID STORE, not STORE: callers pass a UID, and plain STORE would
        // interpret the number as a sequence number and may hit another message.
        // The response stream is drained so the server's reply is fully read
        // before the next command is issued.
        let updates = session
            .uid_store(uid.to_string(), flag_change)
            .await
            .map_err(|e| format!("Failed to mark as {}: {}", state, e))?
            .collect::<Vec<_>>()
            .await;

        // Reduce to a plain String before awaiting again.
        let failure = updates
            .into_iter()
            .find_map(Result::err)
            .map(|e| format!("Failed to mark as {}: {}", state, e));
        if let Some(message) = failure {
            // Best-effort logout; the store failure is the error worth reporting.
            session.logout().await.ok();
            return Err(message);
        }

        session
            .logout()
            .await
            .map_err(|e| format!("Failed to logout: {}", e))?;

        Ok(())
    }
}

#[cfg(feature = "email-imap")]
#[async_trait]
impl ImapProvider for AsyncImapProvider {
    /// Fetches messages from INBOX without changing their `\Seen` state.
    ///
    /// With `Some(limit)` the sequence range `1:limit` is requested, clamped to
    /// the number of messages in the mailbox; with `None` the whole mailbox is
    /// requested. An empty mailbox or a limit of 0 yields an empty list without
    /// issuing a FETCH.
    ///
    /// NOTE(review): `1:limit` selects the lowest sequence numbers, which are
    /// normally the oldest messages. Confirm that callers do not expect the
    /// newest ones.
    ///
    /// # Errors
    ///
    /// Returns a message if connecting, selecting INBOX, issuing the FETCH or
    /// logging out fails.
    async fn fetch_messages(&self, limit: Option<u32>) -> Result<Vec<EmailMessage>, String> {
        let mut session = self.connect().await?;

        let exists = session
            .select("INBOX")
            .await
            .map_err(|e| format!("Failed to select INBOX: {}", e))?
            .exists;

        // `1:0` is not a valid sequence set and `1:*` on an empty mailbox is
        // rejected by some servers, so answer these cases locally.
        if exists == 0 || limit == Some(0) {
            session.logout().await.ok();
            return Ok(Vec::new());
        }

        let sequence = match limit {
            // Clamp so the range never names messages that do not exist.
            Some(limit) => format!("1:{}", limit.min(exists)),
            None => "1:*".to_string(),
        };

        let fetches = session
            .fetch(sequence, Self::FETCH_ITEMS)
            .await
            .map_err(|e| format!("Failed to fetch messages: {}", e))?
            .collect::<Vec<_>>()
            .await;
        let result = Self::parse_fetches(fetches);

        session
            .logout()
            .await
            .map_err(|e| format!("Failed to logout: {}", e))?;

        Ok(result)
    }

    /// Returns the INBOX messages matching `query`, which is sent verbatim as
    /// IMAP search criteria.
    ///
    /// Search and fetch are both UID-based so the returned `uid` values stay
    /// valid for `mark_seen` / `mark_unseen`. Matching messages are fetched
    /// without changing their `\Seen` state.
    ///
    /// # Errors
    ///
    /// Returns a message if connecting, selecting INBOX, searching, issuing the
    /// FETCH or logging out fails.
    async fn search_messages(&self, query: &str) -> Result<Vec<EmailMessage>, String> {
        let mut session = self.connect().await?;

        session
            .select("INBOX")
            .await
            .map_err(|e| format!("Failed to select INBOX: {}", e))?;

        let mut uids: Vec<_> = session
            .uid_search(query)
            .await
            .map_err(|e| format!("Failed to search: {}", e))?
            .into_iter()
            .collect();

        if uids.is_empty() {
            session.logout().await.ok();
            return Ok(Vec::new());
        }

        // The search result is an unordered set; sort for a deterministic request.
        uids.sort_unstable();
        let uid_set = uids
            .iter()
            .map(|uid| uid.to_string())
            .collect::<Vec<_>>()
            .join(",");

        let fetches = session
            .uid_fetch(uid_set, Self::FETCH_ITEMS)
            .await
            .map_err(|e| format!("Failed to fetch messages: {}", e))?
            .collect::<Vec<_>>()
            .await;
        let result = Self::parse_fetches(fetches);

        session
            .logout()
            .await
            .map_err(|e| format!("Failed to logout: {}", e))?;

        Ok(result)
    }

    /// Adds the `\Seen` flag to the INBOX message with the given UID.
    ///
    /// # Errors
    ///
    /// Returns a message if connecting, selecting INBOX, the flag update or
    /// logging out fails.
    async fn mark_seen(&self, uid: u32) -> Result<(), String> {
        self.update_seen_flag(uid, "+FLAGS (\\Seen)", "seen").await
    }

    /// Removes the `\Seen` flag from the INBOX message with the given UID.
    ///
    /// # Errors
    ///
    /// Returns a message if connecting, selecting INBOX, the flag update or
    /// logging out fails.
    async fn mark_unseen(&self, uid: u32) -> Result<(), String> {
        self.update_seen_flag(uid, "-FLAGS (\\Seen)", "unseen").await
    }
}
267
/// Builds the IMAP provider for `config`.
///
/// With the `email-imap` feature enabled this is an [`AsyncImapProvider`].
#[cfg(feature = "email-imap")]
pub fn create_imap_provider(config: ImapAccountConfig) -> Box<dyn ImapProvider> {
    let provider = AsyncImapProvider::new(config);
    Box::new(provider)
}
272
273#[cfg(not(feature = "email-imap"))]
274pub fn create_imap_provider(_config: ImapAccountConfig) -> Box<dyn ImapProvider> {
275 panic!("IMAP support not enabled. Enable 'email-imap' feature.");
276}