postcrate_core/smtp/
relay.rs1use std::time::Duration;
15
16use serde::{Deserialize, Serialize};
17use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
18use tokio::net::TcpStream;
19
20use crate::error::{Error, Result};
21
22#[derive(Debug, Clone, Serialize, Deserialize)]
23#[cfg_attr(feature = "specta", derive(specta::Type))]
24#[serde(rename_all = "camelCase")]
25pub struct RelayConfig {
26 pub host: String,
28 pub port: u16,
30 #[serde(default)]
32 pub timeout_seconds: Option<u32>,
33 #[serde(default)]
39 pub allowed_recipients: Option<Vec<String>>,
40}
41
42impl RelayConfig {
43 fn timeout(&self) -> Duration {
44 Duration::from_secs(u64::from(self.timeout_seconds.unwrap_or(30).max(1)))
45 }
46}
47
48pub async fn relay_message(
56 relay: &RelayConfig,
57 mail_from: &str,
58 rcpt_to: &[String],
59 raw: &[u8],
60) -> Result<()> {
61 if rcpt_to.is_empty() {
62 return Err(Error::Invalid("release requires at least one recipient".into()));
63 }
64 if let Some(allow) = relay.allowed_recipients.as_ref().filter(|v| !v.is_empty()) {
65 for rcpt in rcpt_to {
66 if !allow.iter().any(|pat| glob_match(pat, rcpt)) {
67 return Err(Error::Invalid(format!(
68 "recipient {rcpt:?} not in relay allowlist"
69 )));
70 }
71 }
72 }
73 let to = (relay.host.as_str(), relay.port);
74 let stream = tokio::time::timeout(relay.timeout(), TcpStream::connect(to))
75 .await
76 .map_err(|_| Error::Internal(format!("connect timeout to {}:{}", relay.host, relay.port)))?
77 .map_err(|e| Error::Internal(format!("connect {}:{}: {e}", relay.host, relay.port)))?;
78 let (r, mut w) = stream.into_split();
79 let mut br = BufReader::new(r);
80
81 expect_code(&mut br, "220").await?;
82 send_line(&mut w, &format!("EHLO postcrate.local\r\n")).await?;
83 drain_multi(&mut br, "250").await?;
84 send_line(&mut w, &format!("MAIL FROM:<{}>\r\n", mail_from)).await?;
85 expect_code(&mut br, "250").await?;
86 for rcpt in rcpt_to {
87 send_line(&mut w, &format!("RCPT TO:<{}>\r\n", rcpt)).await?;
88 expect_code(&mut br, "250").await?;
89 }
90 send_line(&mut w, "DATA\r\n").await?;
91 expect_code(&mut br, "354").await?;
92
93 let stuffed = dot_stuff(raw);
94 w.write_all(&stuffed).await?;
95 if !stuffed.ends_with(b"\r\n") {
96 w.write_all(b"\r\n").await?;
97 }
98 w.write_all(b".\r\n").await?;
99 w.flush().await?;
100 expect_code(&mut br, "250").await?;
101 let _ = send_line(&mut w, "QUIT\r\n").await;
102 let _ = expect_code(&mut br, "221").await;
103 Ok(())
104}
105
106async fn send_line<W: AsyncWriteExt + Unpin>(w: &mut W, s: &str) -> Result<()> {
107 w.write_all(s.as_bytes()).await?;
108 w.flush().await?;
109 Ok(())
110}
111
112async fn expect_code<R: tokio::io::AsyncRead + Unpin>(
113 br: &mut BufReader<R>,
114 code: &str,
115) -> Result<()> {
116 let mut line = String::new();
117 br.read_line(&mut line).await?;
118 if !line.starts_with(code) {
119 return Err(Error::Internal(format!(
120 "relay expected {code}, got {}",
121 line.trim_end()
122 )));
123 }
124 Ok(())
125}
126
127async fn drain_multi<R: tokio::io::AsyncRead + Unpin>(
128 br: &mut BufReader<R>,
129 code: &str,
130) -> Result<()> {
131 loop {
132 let mut line = String::new();
133 let n = br.read_line(&mut line).await?;
134 if n == 0 {
135 return Err(Error::Internal("relay closed mid-reply".into()));
136 }
137 if !line.starts_with(code) {
138 return Err(Error::Internal(format!(
139 "relay expected {code}, got {}",
140 line.trim_end()
141 )));
142 }
143 if line.len() >= 4 && line.as_bytes()[3] == b' ' {
145 return Ok(());
146 }
147 }
148}
149
150fn glob_match(pattern: &str, address: &str) -> bool {
154 let p = pattern.to_lowercase();
155 let a = address.to_lowercase();
156 glob_inner(&p, &a)
157}
158
/// Case-sensitive glob match of `a` against pattern `p`.
///
/// `*` matches any run of characters, including none; every other character
/// must match literally. The whole of `a` must be consumed.
///
/// Matching is iterative and remembers only the most recent `*`, so the
/// worst case is O(|p| * |a|) even for patterns with many wildcards.
fn glob_inner(p: &str, a: &str) -> bool {
    let pat: Vec<char> = p.chars().collect();
    let txt: Vec<char> = a.chars().collect();

    let mut pi = 0;
    let mut ti = 0;
    // Most recent `*`: (pattern index just past it, text index where the
    // characters it absorbs currently end).
    let mut last_star: Option<(usize, usize)> = None;

    while ti < txt.len() {
        match pat.get(pi).copied() {
            Some('*') => {
                pi += 1;
                last_star = Some((pi, ti));
            }
            Some(pc) if pc == txt[ti] => {
                pi += 1;
                ti += 1;
            }
            _ => match last_star {
                // Mismatch: let the last `*` absorb one more character and
                // retry the remainder of the pattern from there.
                Some((resume_pi, absorbed_to)) => {
                    pi = resume_pi;
                    ti = absorbed_to + 1;
                    last_star = Some((resume_pi, ti));
                }
                None => return false,
            },
        }
    }

    // Text is exhausted; whatever pattern is left may only be wildcards.
    pat[pi..].iter().all(|&c| c == '*')
}
191
/// Applies SMTP dot-stuffing to `body`.
///
/// Any line that begins with `.` gets one extra `.` prepended. A line starts
/// at the beginning of the input and after every `\n` byte. All other bytes
/// are copied unchanged.
fn dot_stuff(body: &[u8]) -> Vec<u8> {
    let mut stuffed = Vec::with_capacity(body.len());
    for line in body.split_inclusive(|&byte| byte == b'\n') {
        if line.first() == Some(&b'.') {
            stuffed.push(b'.');
        }
        stuffed.extend_from_slice(line);
    }
    stuffed
}