Skip to main content

postcrate_core/smtp/
relay.rs

1//! Minimal outbound SMTP client used by `Service::release_email` to
2//! forward a captured message to a real address through a real relay
3//! (e.g. a developer's transactional provider).
4//!
5//! Deliberately tiny:
6//!   - plain TCP only (no STARTTLS, no AUTH yet);
7//!   - dot-stuffs the body on the way out;
8//!   - drops the connection on the first error.
9//!
10//! Per PROD.md §9.3 the "release" action is opt-in and audit-logged;
11//! callers are responsible for not pointing it at production relays
12//! unintentionally.
13
14use std::time::Duration;
15
16use serde::{Deserialize, Serialize};
17use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
18use tokio::net::TcpStream;
19
20use crate::error::{Error, Result};
21
22#[derive(Debug, Clone, Serialize, Deserialize)]
23#[cfg_attr(feature = "specta", derive(specta::Type))]
24#[serde(rename_all = "camelCase")]
25pub struct RelayConfig {
26    /// Relay host, e.g. `smtp.resend.com` or `127.0.0.1`.
27    pub host: String,
28    /// Relay port. 25 for legacy, 587 for submission, 1025 for local.
29    pub port: u16,
30    /// Connect+IO timeout (defaults to 30s).
31    #[serde(default)]
32    pub timeout_seconds: Option<u32>,
33    /// Glob-pattern allowlist of recipient addresses the relay is
34    /// allowed to deliver to. `["alice@example.com", "*@test.local"]`.
35    /// `None` or empty means "any recipient" (no restriction). Used
36    /// by `Service::release_email` to prevent accidentally pointing
37    /// the relay at production recipients.
38    #[serde(default)]
39    pub allowed_recipients: Option<Vec<String>>,
40}
41
42impl RelayConfig {
43    fn timeout(&self) -> Duration {
44        Duration::from_secs(u64::from(self.timeout_seconds.unwrap_or(30).max(1)))
45    }
46}
47
48/// Forward `raw` to `relay` with the given envelope. The raw bytes
49/// are sent unchanged (only dot-stuffed for transport).
50///
51/// Recipients are filtered against `relay.allowed_recipients` (glob
52/// matching) before any network call. Any recipient that fails the
53/// allowlist makes the whole release fail — we'd rather error than
54/// silently drop a recipient.
55pub async fn relay_message(
56    relay: &RelayConfig,
57    mail_from: &str,
58    rcpt_to: &[String],
59    raw: &[u8],
60) -> Result<()> {
61    if rcpt_to.is_empty() {
62        return Err(Error::Invalid("release requires at least one recipient".into()));
63    }
64    if let Some(allow) = relay.allowed_recipients.as_ref().filter(|v| !v.is_empty()) {
65        for rcpt in rcpt_to {
66            if !allow.iter().any(|pat| glob_match(pat, rcpt)) {
67                return Err(Error::Invalid(format!(
68                    "recipient {rcpt:?} not in relay allowlist"
69                )));
70            }
71        }
72    }
73    let to = (relay.host.as_str(), relay.port);
74    let stream = tokio::time::timeout(relay.timeout(), TcpStream::connect(to))
75        .await
76        .map_err(|_| Error::Internal(format!("connect timeout to {}:{}", relay.host, relay.port)))?
77        .map_err(|e| Error::Internal(format!("connect {}:{}: {e}", relay.host, relay.port)))?;
78    let (r, mut w) = stream.into_split();
79    let mut br = BufReader::new(r);
80
81    expect_code(&mut br, "220").await?;
82    send_line(&mut w, &format!("EHLO postcrate.local\r\n")).await?;
83    drain_multi(&mut br, "250").await?;
84    send_line(&mut w, &format!("MAIL FROM:<{}>\r\n", mail_from)).await?;
85    expect_code(&mut br, "250").await?;
86    for rcpt in rcpt_to {
87        send_line(&mut w, &format!("RCPT TO:<{}>\r\n", rcpt)).await?;
88        expect_code(&mut br, "250").await?;
89    }
90    send_line(&mut w, "DATA\r\n").await?;
91    expect_code(&mut br, "354").await?;
92
93    let stuffed = dot_stuff(raw);
94    w.write_all(&stuffed).await?;
95    if !stuffed.ends_with(b"\r\n") {
96        w.write_all(b"\r\n").await?;
97    }
98    w.write_all(b".\r\n").await?;
99    w.flush().await?;
100    expect_code(&mut br, "250").await?;
101    let _ = send_line(&mut w, "QUIT\r\n").await;
102    let _ = expect_code(&mut br, "221").await;
103    Ok(())
104}
105
106async fn send_line<W: AsyncWriteExt + Unpin>(w: &mut W, s: &str) -> Result<()> {
107    w.write_all(s.as_bytes()).await?;
108    w.flush().await?;
109    Ok(())
110}
111
112async fn expect_code<R: tokio::io::AsyncRead + Unpin>(
113    br: &mut BufReader<R>,
114    code: &str,
115) -> Result<()> {
116    let mut line = String::new();
117    br.read_line(&mut line).await?;
118    if !line.starts_with(code) {
119        return Err(Error::Internal(format!(
120            "relay expected {code}, got {}",
121            line.trim_end()
122        )));
123    }
124    Ok(())
125}
126
127async fn drain_multi<R: tokio::io::AsyncRead + Unpin>(
128    br: &mut BufReader<R>,
129    code: &str,
130) -> Result<()> {
131    loop {
132        let mut line = String::new();
133        let n = br.read_line(&mut line).await?;
134        if n == 0 {
135            return Err(Error::Internal("relay closed mid-reply".into()));
136        }
137        if !line.starts_with(code) {
138            return Err(Error::Internal(format!(
139                "relay expected {code}, got {}",
140                line.trim_end()
141            )));
142        }
143        // Final line of a multi-line reply has a space at index 3.
144        if line.len() >= 4 && line.as_bytes()[3] == b' ' {
145            return Ok(());
146        }
147    }
148}
149
150/// Match `address` against a glob pattern. Same semantics as the
151/// bounce-rule matcher: `*` is the only wildcard, comparison is
152/// case-insensitive.
153fn glob_match(pattern: &str, address: &str) -> bool {
154    let p = pattern.to_lowercase();
155    let a = address.to_lowercase();
156    glob_inner(&p, &a)
157}
158
/// Case-sensitive glob match of `a` against pattern `p`, where `*`
/// matches any (possibly empty) run of characters and every other
/// character matches only itself.
///
/// Uses iterative matching that remembers only the most recent `*`,
/// so the worst case is O(len(p) * len(a)) with no allocation,
/// instead of recursing (and copying strings) once per `*`.
///
/// Matching is done on bytes. That is equivalent to matching on
/// characters here: `*` is ASCII, so every literal run in the pattern
/// is itself valid UTF-8 and can only line up with the address on
/// character boundaries.
fn glob_inner(p: &str, a: &str) -> bool {
    let (p, a) = (p.as_bytes(), a.as_bytes());
    let (mut pi, mut ai) = (0usize, 0usize);
    // (pattern index just past the last `*`, address index that `*` currently ends at)
    let mut resume: Option<(usize, usize)> = None;

    while ai < a.len() {
        match p.get(pi).copied() {
            Some(b'*') => {
                // Start by letting the star match nothing.
                pi += 1;
                resume = Some((pi, ai));
            }
            Some(pc) if pc == a[ai] => {
                pi += 1;
                ai += 1;
            }
            _ => match resume {
                // Mismatch: let the last star swallow one more byte and retry.
                Some((after_star, star_end)) => {
                    pi = after_star;
                    ai = star_end + 1;
                    resume = Some((after_star, ai));
                }
                None => return false,
            },
        }
    }
    // Address exhausted: only trailing stars may remain in the pattern.
    p[pi..].iter().all(|&b| b == b'*')
}
191
/// Return a copy of `body` in which every line that begins with `.`
/// has one extra `.` prepended.
///
/// A line begins at the start of the input and after every `\n`
/// byte; all other bytes are copied through unchanged.
fn dot_stuff(body: &[u8]) -> Vec<u8> {
    let mut stuffed = Vec::with_capacity(body.len());
    for line in body.split_inclusive(|&byte| byte == b'\n') {
        if line.first() == Some(&b'.') {
            stuffed.push(b'.');
        }
        stuffed.extend_from_slice(line);
    }
    stuffed
}