1use std::{
12 net::{IpAddr, SocketAddr},
13 time::Duration,
14};
15
16use smtp_proto::{response::parser::ResponseReceiver, Response};
17use tokio::{
18 io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt},
19 net::{TcpSocket, TcpStream},
20};
21
22use crate::SmtpClient;
23
24impl<T: AsyncRead + AsyncWrite + Unpin> SmtpClient<T> {
25 pub async fn read(&mut self) -> crate::Result<Response<String>> {
26 let mut buf = vec![0u8; 1024];
27 let mut parser = ResponseReceiver::default();
28
29 loop {
30 let br = self.stream.read(&mut buf).await?;
31
32 if br > 0 {
33 match parser.parse(&mut buf[..br].iter()) {
34 Ok(reply) => return Ok(reply),
35 Err(err) => match err {
36 smtp_proto::Error::NeedsMoreData { .. } => (),
37 _ => {
38 return Err(crate::Error::UnparseableReply);
39 }
40 },
41 }
42 } else {
43 return Err(crate::Error::UnparseableReply);
44 }
45 }
46 }
47
48 pub async fn read_many(&mut self, num: usize) -> crate::Result<Vec<Response<String>>> {
49 let mut buf = vec![0u8; 1024];
50 let mut response = Vec::with_capacity(num);
51 let mut parser = ResponseReceiver::default();
52
53 'outer: loop {
54 let br = self.stream.read(&mut buf).await?;
55
56 if br > 0 {
57 let mut iter = buf[..br].iter();
58
59 loop {
60 match parser.parse(&mut iter) {
61 Ok(reply) => {
62 response.push(reply);
63 if response.len() != num {
64 parser.reset();
65 } else {
66 break 'outer;
67 }
68 }
69 Err(err) => match err {
70 smtp_proto::Error::NeedsMoreData { .. } => break,
71 _ => {
72 return Err(crate::Error::UnparseableReply);
73 }
74 },
75 }
76 }
77 } else {
78 return Err(crate::Error::UnparseableReply);
79 }
80 }
81
82 Ok(response)
83 }
84
85 pub async fn cmd(&mut self, cmd: impl AsRef<[u8]>) -> crate::Result<Response<String>> {
87 tokio::time::timeout(self.timeout, async {
88 self.stream.write_all(cmd.as_ref()).await?;
89 self.stream.flush().await?;
90 self.read().await
91 })
92 .await
93 .map_err(|_| crate::Error::Timeout)?
94 }
95
96 pub async fn cmds(
98 &mut self,
99 cmds: impl IntoIterator<Item = impl AsRef<[u8]>>,
100 ) -> crate::Result<Vec<Response<String>>> {
101 tokio::time::timeout(self.timeout, async {
102 let mut num_replies = 0;
103 for cmd in cmds {
104 self.stream.write_all(cmd.as_ref()).await?;
105 num_replies += 1;
106 }
107 self.stream.flush().await?;
108 self.read_many(num_replies).await
109 })
110 .await
111 .map_err(|_| crate::Error::Timeout)?
112 }
113}
114
115impl SmtpClient<TcpStream> {
116 pub async fn connect(remote_addr: SocketAddr, timeout: Duration) -> crate::Result<Self> {
118 tokio::time::timeout(timeout, async {
119 Ok(SmtpClient {
120 stream: TcpStream::connect(remote_addr).await?,
121 timeout,
122 })
123 })
124 .await
125 .map_err(|_| crate::Error::Timeout)?
126 }
127
128 pub async fn connect_using(
130 local_ip: IpAddr,
131 remote_addr: SocketAddr,
132 timeout: Duration,
133 ) -> crate::Result<Self> {
134 tokio::time::timeout(timeout, async {
135 let socket = if local_ip.is_ipv4() {
136 TcpSocket::new_v4()?
137 } else {
138 TcpSocket::new_v6()?
139 };
140 socket.bind(SocketAddr::new(local_ip, 0))?;
141
142 Ok(SmtpClient {
143 stream: socket.connect(remote_addr).await?,
144 timeout,
145 })
146 })
147 .await
148 .map_err(|_| crate::Error::Timeout)?
149 }
150}
151
152#[cfg(test)]
153mod test {
154 use std::time::Duration;
155
156 use tokio::io::{AsyncRead, AsyncWrite};
157
158 use crate::{SmtpClient, SmtpClientBuilder};
159
160 #[tokio::test]
161 async fn smtp_basic() {
162 env_logger::init();
164 let client = SmtpClientBuilder::new("mail.smtp2go.com", 2525)
165 .implicit_tls(false)
166 .connect()
167 .await
168 .unwrap();
169 client.quit().await.unwrap();
170 let client = SmtpClientBuilder::new("mail.smtp2go.com", 2525)
171 .allow_invalid_certs()
172 .implicit_tls(false)
173 .connect()
174 .await
175 .unwrap();
176 client.quit().await.unwrap();
177
178 let client = SmtpClientBuilder::new("smtp.gmail.com", 465)
180 .connect()
181 .await
182 .unwrap();
183 client.quit().await.unwrap();
184
185 let client = SmtpClientBuilder::new("smtp.gmail.com", 465)
187 .allow_invalid_certs()
188 .connect()
189 .await
190 .unwrap();
191 client.quit().await.unwrap();
192 }
193
194 #[derive(Default)]
195 struct AsyncBufWriter {
196 buf: Vec<u8>,
197 }
198
199 impl AsyncRead for AsyncBufWriter {
200 fn poll_read(
201 self: std::pin::Pin<&mut Self>,
202 _cx: &mut std::task::Context<'_>,
203 _buf: &mut tokio::io::ReadBuf<'_>,
204 ) -> std::task::Poll<std::io::Result<()>> {
205 unreachable!()
206 }
207 }
208
209 impl AsyncWrite for AsyncBufWriter {
210 fn poll_write(
211 mut self: std::pin::Pin<&mut Self>,
212 _cx: &mut std::task::Context<'_>,
213 buf: &[u8],
214 ) -> std::task::Poll<Result<usize, std::io::Error>> {
215 self.buf.extend_from_slice(buf);
216 std::task::Poll::Ready(Ok(buf.len()))
217 }
218
219 fn poll_flush(
220 self: std::pin::Pin<&mut Self>,
221 _cx: &mut std::task::Context<'_>,
222 ) -> std::task::Poll<Result<(), std::io::Error>> {
223 std::task::Poll::Ready(Ok(()))
224 }
225
226 fn poll_shutdown(
227 self: std::pin::Pin<&mut Self>,
228 _cx: &mut std::task::Context<'_>,
229 ) -> std::task::Poll<Result<(), std::io::Error>> {
230 std::task::Poll::Ready(Ok(()))
231 }
232 }
233
234 #[tokio::test]
235 async fn transparency_procedure() {
236 const SMUGGLER: &str = r#"From: Joe SixPack <john@foobar.net>
237To: Suzie Q <suzie@foobar.org>
238Subject: Is dinner ready?
239
240Hi.
241
242We lost the game. Are you hungry yet?
243
244Joe.
245
246<SEP>.
247MAIL FROM:<admin@foobar.net>
248RCPT TO:<ok@foobar.org>
249DATA
250From: Joe SixPack <admin@foobar.net>
251To: Suzie Q <suzie@foobar.org>
252Subject: smuggled message
253
254This is a smuggled message
255"#;
256
257 for (test, result) in [
258 (
259 "A: b\r\n.\r\n".to_string(),
260 "A: b\r\n..\r\n\r\n.\r\n".to_string(),
261 ),
262 ("A: b\r\n.".to_string(), "A: b\r\n..\r\n.\r\n".to_string()),
263 (
264 "A: b\r\n..\r\n".to_string(),
265 "A: b\r\n...\r\n\r\n.\r\n".to_string(),
266 ),
267 ("A: ...b".to_string(), "A: ...b\r\n.\r\n".to_string()),
268 (
269 "A: \n.\r\nMAIL FROM:<>".to_string(),
270 "A: \n..\r\nMAIL FROM:<>\r\n.\r\n".to_string(),
271 ),
272 (
273 "A: \r.\r\nMAIL FROM:<>".to_string(),
274 "A: \r..\r\nMAIL FROM:<>\r\n.\r\n".to_string(),
275 ),
276 (
277 SMUGGLER
278 .replace('\r', "")
279 .replace('\n', "\r\n")
280 .replace("<SEP>", "\r"),
281 SMUGGLER
282 .replace('\r', "")
283 .replace('\n', "\r\n")
284 .replace("<SEP>", "\r.")
285 + "\r\n.\r\n",
286 ),
287 (
288 SMUGGLER
289 .replace('\r', "")
290 .replace('\n', "\r\n")
291 .replace("<SEP>", "\n"),
292 SMUGGLER
293 .replace('\r', "")
294 .replace('\n', "\r\n")
295 .replace("<SEP>", "\n.")
296 + "\r\n.\r\n",
297 ),
298 ] {
299 let mut client = SmtpClient {
300 stream: AsyncBufWriter::default(),
301 timeout: Duration::from_secs(30),
302 };
303 client.write_message(test.as_bytes()).await.unwrap();
304 assert_eq!(String::from_utf8(client.stream.buf).unwrap(), result);
305 }
306 }
307}