Skip to main content

sparrow/gateway/
email.rs

1//! Email gateway transport (§3.16).
2//!
3//! Outbound: SMTP via `lettre`. Inbound: IMAP polling of UNSEEN messages from
4//! allowed senders, surfaced as `GatewayMessage`s. Both are behind the `email`
5//! cargo feature. No fake success — misconfiguration returns real errors.
6
7use async_trait::async_trait;
8use tokio::sync::mpsc;
9
10use super::{GatewayMessage, GatewayResponse, GatewayTransport};
11
12#[cfg(feature = "email")]
13use lettre::{
14    AsyncSmtpTransport, AsyncTransport, Tokio1Executor,
15    message::{Mailbox, Message, header::ContentType},
16    transport::smtp::authentication::Credentials,
17};
18
19/// SMTP (outbound) + optional IMAP (inbound) email transport.
20pub struct EmailTransport {
21    pub from: String,
22    pub smtp_host: String,
23    pub smtp_port: u16,
24    pub username: String,
25    pub password: String,
26    pub allowed_to: Vec<String>,
27    /// IMAP server for inbound polling (None = outbound only).
28    pub imap_host: Option<String>,
29    pub imap_port: u16,
30}
31
32impl EmailTransport {
33    pub fn new(
34        from: String,
35        smtp_host: String,
36        smtp_port: u16,
37        username: String,
38        password: String,
39        allowed_to: Vec<String>,
40    ) -> Self {
41        Self {
42            from,
43            smtp_host,
44            smtp_port,
45            username,
46            password,
47            allowed_to,
48            imap_host: None,
49            imap_port: 993,
50        }
51    }
52
53    /// Enable inbound IMAP polling.
54    pub fn with_imap(mut self, host: String, port: u16) -> Self {
55        self.imap_host = Some(host);
56        self.imap_port = port;
57        self
58    }
59}
60
61#[cfg(feature = "email")]
62fn parse_email(raw: &str) -> (String, String, String) {
63    // Minimal RFC822 split: headers vs body, pull From/Subject.
64    let split = raw.find("\r\n\r\n").or_else(|| raw.find("\n\n"));
65    let (headers, body) = match split {
66        Some(i) => (&raw[..i], raw[i..].trim_start()),
67        None => (raw, ""),
68    };
69    let mut from = String::new();
70    let mut subject = String::new();
71    for line in headers.lines() {
72        let lower = line.to_ascii_lowercase();
73        if lower.starts_with("from:") {
74            from = line[5..].trim().to_string();
75        } else if lower.starts_with("subject:") {
76            subject = line[8..].trim().to_string();
77        }
78    }
79    // Extract bare address from "Name <addr>"
80    if let (Some(a), Some(b)) = (from.find('<'), from.find('>')) {
81        if a < b {
82            from = from[a + 1..b].to_string();
83        }
84    }
85    (from, subject, body.to_string())
86}
87
88#[async_trait]
89impl GatewayTransport for EmailTransport {
90    fn name(&self) -> &str {
91        "email"
92    }
93
94    #[cfg(feature = "email")]
95    async fn start(&self, tx: mpsc::UnboundedSender<GatewayMessage>) -> anyhow::Result<()> {
96        let Some(imap_host) = self.imap_host.clone() else {
97            tracing::info!(
98                "Email gateway: outbound only via SMTP {}:{}",
99                self.smtp_host,
100                self.smtp_port
101            );
102            return Ok(());
103        };
104
105        // IMAP polling runs on a dedicated blocking thread (imap crate is sync).
106        let port = self.imap_port;
107        let user = self.username.clone();
108        let pass = self.password.clone();
109        let allowed = self.allowed_to.clone();
110        tracing::info!("Email gateway: IMAP inbound polling {}:{}", imap_host, port);
111
112        std::thread::spawn(move || {
113            loop {
114                if let Err(e) = poll_imap_once(&imap_host, port, &user, &pass, &allowed, &tx) {
115                    tracing::warn!("IMAP poll error: {}", e);
116                }
117                std::thread::sleep(std::time::Duration::from_secs(30));
118            }
119        });
120        Ok(())
121    }
122
123    #[cfg(not(feature = "email"))]
124    async fn start(&self, _tx: mpsc::UnboundedSender<GatewayMessage>) -> anyhow::Result<()> {
125        anyhow::bail!("email feature not enabled — rebuild with `cargo build --features email`")
126    }
127
128    #[cfg(feature = "email")]
129    async fn send(&self, response: GatewayResponse) -> anyhow::Result<()> {
130        if !self.allowed_to.iter().any(|a| a == &response.chat_id) {
131            anyhow::bail!(
132                "email recipient {} not in allowed_to list",
133                response.chat_id
134            );
135        }
136        let from: Mailbox = self.from.parse()?;
137        let to: Mailbox = response.chat_id.parse()?;
138        let subject = response
139            .text
140            .lines()
141            .next()
142            .unwrap_or("Sparrow update")
143            .chars()
144            .take(120)
145            .collect::<String>();
146        let email = Message::builder()
147            .from(from)
148            .to(to)
149            .subject(subject)
150            .header(ContentType::TEXT_PLAIN)
151            .body(response.text)?;
152        let creds = Credentials::new(self.username.clone(), self.password.clone());
153        let mailer: AsyncSmtpTransport<Tokio1Executor> =
154            AsyncSmtpTransport::<Tokio1Executor>::relay(&self.smtp_host)?
155                .port(self.smtp_port)
156                .credentials(creds)
157                .build();
158        mailer.send(email).await?;
159        Ok(())
160    }
161
162    #[cfg(not(feature = "email"))]
163    async fn send(&self, _response: GatewayResponse) -> anyhow::Result<()> {
164        anyhow::bail!("email feature not enabled — rebuild with `cargo build --features email`")
165    }
166
167    async fn stop(&self) -> anyhow::Result<()> {
168        Ok(())
169    }
170}
171
172/// One IMAP poll: fetch UNSEEN from allowed senders, emit GatewayMessages, mark seen.
173#[cfg(feature = "email")]
174fn poll_imap_once(
175    host: &str,
176    port: u16,
177    user: &str,
178    pass: &str,
179    allowed: &[String],
180    tx: &mpsc::UnboundedSender<GatewayMessage>,
181) -> anyhow::Result<()> {
182    let tls = native_tls::TlsConnector::builder().build()?;
183    let client = imap::connect((host, port), host, &tls)?;
184    let mut session = client
185        .login(user, pass)
186        .map_err(|(e, _)| anyhow::anyhow!("IMAP login failed: {}", e))?;
187    session.select("INBOX")?;
188
189    let unseen = session.search("UNSEEN")?;
190    for uid in unseen {
191        let messages = session.fetch(uid.to_string(), "RFC822")?;
192        for msg in messages.iter() {
193            if let Some(body) = msg.body() {
194                let raw = String::from_utf8_lossy(body);
195                let (from, subject, text) = parse_email(&raw);
196                let allowed_ok =
197                    allowed.is_empty() || allowed.iter().any(|a| from.contains(a.as_str()));
198                if !allowed_ok {
199                    continue;
200                }
201                let combined = if subject.is_empty() {
202                    text
203                } else {
204                    format!("{}\n\n{}", subject, text)
205                };
206                let _ = tx.send(GatewayMessage {
207                    surface: "email".into(),
208                    user_id: from.clone(),
209                    chat_id: from,
210                    text: combined.trim().to_string(),
211                    message_id: Some(uid.to_string()),
212                });
213            }
214        }
215        // Mark as seen so we don't reprocess.
216        let _ = session.store(uid.to_string(), "+FLAGS (\\Seen)");
217    }
218
219    let _ = session.logout();
220    Ok(())
221}