use crate::net::NetStream;
use std::io::{BufRead, BufReader, Read, Write};
use crate::error::{Error, Result};
use crate::tls::{connect_over, TlsStream};
use crate::url::Url;
/// Upper bound, in bytes, on the body accumulated from one multi-line
/// response (64 MiB). `Session::read_multiline` fails once a response would
/// grow past this.
const MAX_RESPONSE_BYTES: usize = 64 * 1024 * 1024;
/// Fetches the resource named by a POP3 URL using the default network
/// configuration.
///
/// This is a convenience wrapper: it builds `NetConfig::default()` and hands
/// the work to `fetch_with`, returning whatever that returns.
pub fn fetch(url: &Url) -> Result<Vec<u8>> {
    let default_cfg = crate::net::NetConfig::default();
    fetch_with(url, &default_cfg)
}
pub(crate) fn fetch_with(url: &Url, cfg: &crate::net::NetConfig) -> Result<Vec<u8>> {
let userinfo = url
.userinfo
.as_deref()
.ok_or_else(|| Error::BadResponse("pop3: missing userinfo".into()))?;
let (user, pass) = split_userinfo(userinfo);
let action = parse_path(&url.path)
.ok_or_else(|| Error::InvalidUrl(format!("pop3 path: {}", url.path)))?;
let tcp = cfg.connect(&url.host, url.port)?;
if url.is_tls() {
let tls = connect_over(tcp, &url.host)?;
let mut session = Session::new(BufReader::new(IoAdapter::Tls(Box::new(tls))));
session.read_status()?; run(&mut session, user, pass, action)
} else {
let mut io = BufReader::new(IoAdapter::Plain(tcp));
read_status_buf(&mut io)?; let upgraded = try_stls(&mut io, &url.host)?;
if cfg.require_tls && !upgraded {
return Err(Error::BadResponse(
"pop3: TLS required (--ssl-reqd) but server did not offer STLS".into(),
));
}
let mut session = Session::new(io);
run(&mut session, user, pass, action)
}
}
fn read_status_buf<R: Read + Write>(io: &mut BufReader<R>) -> Result<String> {
let mut buf = Vec::new();
let n = io.read_until(b'\n', &mut buf)?;
if n == 0 {
return Err(Error::UnexpectedEof);
}
while matches!(buf.last(), Some(b'\n') | Some(b'\r')) {
buf.pop();
}
let line = String::from_utf8(buf)
.map_err(|_| Error::BadResponse("pop3: non-UTF8 status line".into()))?;
if let Some(rest) = line.strip_prefix("+OK") {
Ok(rest.strip_prefix(' ').unwrap_or(rest).to_string())
} else if let Some(rest) = line.strip_prefix("-ERR") {
let text = rest.strip_prefix(' ').unwrap_or(rest);
Err(Error::BadResponse(format!("pop3: {text}")))
} else {
Err(Error::BadResponse(format!(
"pop3: unexpected status line: {line}"
)))
}
}
/// Attempts to upgrade a plaintext connection to TLS via the STLS command.
///
/// Returns `Ok(false)` when the server declined STLS (the connection is left
/// as it was) and `Ok(true)` once the adapter inside `io` has been swapped
/// for a TLS stream negotiated for `host`.
///
/// # Errors
///
/// * Anything `stls_negotiate` reports.
/// * `Error::BadResponse` if the adapter is not in the `Plain` state; the
///   adapter is put back unchanged in that case.
/// * Anything `connect_over` reports. The adapter is left `Poisoned` then,
///   so later I/O on it fails instead of silently continuing in plaintext.
fn try_stls(io: &mut BufReader<IoAdapter>, host: &str) -> Result<bool> {
    let server_accepted = stls_negotiate(io)?;
    if !server_accepted {
        return Ok(false);
    }

    // Move the plaintext stream out of the reader, leaving a placeholder that
    // refuses all I/O until the TLS stream takes its place.
    let slot = io.get_mut();
    let taken = std::mem::replace(slot, IoAdapter::Poisoned);
    let socket = match taken {
        IoAdapter::Plain(socket) => socket,
        not_plain => {
            *slot = not_plain;
            return Err(Error::BadResponse(
                "pop3: STLS on non-plaintext connection".into(),
            ));
        }
    };

    let secured = connect_over(socket, host)?;
    *slot = IoAdapter::Tls(Box::new(secured));
    Ok(true)
}
fn stls_negotiate<R: Read + Write>(io: &mut BufReader<R>) -> Result<bool> {
{
let inner = io.get_mut();
inner.write_all(b"STLS\r\n")?;
inner.flush()?;
}
match read_status_buf(io) {
Ok(_) => {}
Err(Error::BadResponse(_)) => return Ok(false),
Err(e) => return Err(e),
}
if !io.buffer().is_empty() {
return Err(Error::BadResponse(
"pop3: server sent data after STLS before TLS (injection)".into(),
));
}
Ok(true)
}
/// The operation selected by the URL path (see `parse_path`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Action {
/// Issue `LIST` and return the multi-line listing as received.
List,
/// Issue `RETR n` for the given message number and return the message
/// with dot-stuffing removed.
Retr(u32),
}
/// Splits `user[:password]` at the first colon.
///
/// Everything after the first `:` is the password, so passwords may contain
/// colons themselves. Without a colon the password is the empty string.
fn split_userinfo(s: &str) -> (&str, &str) {
    s.split_once(':').unwrap_or((s, ""))
}
fn reject_ctl(s: &str, what: &str) -> Result<()> {
if let Some(b) = s.bytes().find(|b| *b < 0x20 || *b == 0x7f) {
return Err(Error::BadResponse(format!(
"pop3: {what} contains illegal control byte {b:#04x}"
)));
}
Ok(())
}
/// Maps a POP3 URL path to the action it requests.
///
/// * `""` or `"/"` selects `Action::List`.
/// * `"/N"`, where `N` consists only of ASCII digits and fits in a `u32`,
///   selects `Action::Retr(N)`.
/// * Anything else (extra segments, query or fragment characters, signs,
///   decimals, overflow) yields `None`.
fn parse_path(path: &str) -> Option<Action> {
    let trimmed = path.strip_prefix('/').unwrap_or(path);
    if trimmed.is_empty() {
        return Some(Action::List);
    }
    // `u32::from_str` tolerates a leading '+', which would let "/+1" through
    // as message 1. Require plain digits so only a bare number is accepted;
    // this also rules out '/', '?' and '#'.
    if !trimmed.bytes().all(|b| b.is_ascii_digit()) {
        return None;
    }
    // Still fallible: the number may not fit in a u32.
    trimmed.parse::<u32>().ok().map(Action::Retr)
}
/// Removes POP3 dot-stuffing from a multi-line response body.
///
/// The body is processed line by line (lines end at `\n`; a final line
/// without one counts too). One leading `.` is dropped from every line that
/// has one; all other bytes are copied through unchanged.
fn un_dot_stuff(body: &[u8]) -> Vec<u8> {
    let mut unstuffed = Vec::with_capacity(body.len());
    for line in body.split_inclusive(|&byte| byte == b'\n') {
        let payload = match line {
            [b'.', rest @ ..] => rest,
            whole => whole,
        };
        unstuffed.extend_from_slice(payload);
    }
    unstuffed
}
/// The transport underneath a POP3 connection. Held inside the `BufReader`
/// so `try_stls` can swap the plaintext stream for a TLS one in place.
enum IoAdapter {
/// Unencrypted network stream.
Plain(Box<dyn NetStream>),
/// TLS stream layered over the network stream.
Tls(Box<TlsStream<Box<dyn NetStream>>>),
/// Placeholder left behind while the plaintext stream is moved out for the
/// TLS handshake; every read, write and flush on it returns an error.
Poisoned,
}
impl Read for IoAdapter {
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
match self {
IoAdapter::Plain(s) => s.read(buf),
IoAdapter::Tls(s) => s.read(buf),
IoAdapter::Poisoned => Err(std::io::Error::other("pop3: stream poisoned")),
}
}
}
impl Write for IoAdapter {
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
match self {
IoAdapter::Plain(s) => s.write(buf),
IoAdapter::Tls(s) => s.write(buf),
IoAdapter::Poisoned => Err(std::io::Error::other("pop3: stream poisoned")),
}
}
fn flush(&mut self) -> std::io::Result<()> {
match self {
IoAdapter::Plain(s) => s.flush(),
IoAdapter::Tls(s) => s.flush(),
IoAdapter::Poisoned => Err(std::io::Error::other("pop3: stream poisoned")),
}
}
}
/// Line-oriented POP3 session over a buffered duplex stream.
///
/// Commands are written directly to the stream inside the `BufReader`;
/// replies are read through the reader's buffer.
struct Session<R: Read + Write> {
    io: BufReader<R>,
}

impl<R: Read + Write> Session<R> {
    /// Wraps an already-connected buffered stream.
    fn new(io: BufReader<R>) -> Self {
        Self { io }
    }

    /// Sends one command line, appending CRLF and flushing.
    ///
    /// # Errors
    ///
    /// `Error::BadResponse` if `cmd` contains CR, LF or NUL (nothing is
    /// written in that case), otherwise any I/O error from the stream.
    fn send(&mut self, cmd: &str) -> Result<()> {
        // A command with an embedded line break would smuggle a second
        // command onto the wire.
        if cmd.bytes().any(|b| matches!(b, b'\r' | b'\n' | 0)) {
            return Err(Error::BadResponse(
                "pop3: refusing to send command line with embedded CR/LF/NUL".into(),
            ));
        }
        let inner = self.io.get_mut();
        inner.write_all(cmd.as_bytes())?;
        inner.write_all(b"\r\n")?;
        inner.flush()?;
        Ok(())
    }

    /// Reads one reply line and returns it without its trailing CR/LF bytes.
    ///
    /// # Errors
    ///
    /// * `Error::UnexpectedEof` if the stream ends before any byte is read.
    /// * `Error::BadResponse` if the line exceeds the internal cap or is not
    ///   valid UTF-8.
    /// * I/O errors from the stream.
    fn read_line(&mut self) -> Result<String> {
        // RFC 1939 limits a response line to 512 octets. Allow slack, but do
        // not let a server that never sends '\n' exhaust memory.
        const MAX_LINE_BYTES: usize = 4096;

        let mut buf = Vec::new();
        let n = (&mut self.io)
            .take(MAX_LINE_BYTES as u64 + 1)
            .read_until(b'\n', &mut buf)?;
        if n == 0 {
            return Err(Error::UnexpectedEof);
        }
        if buf.len() > MAX_LINE_BYTES {
            return Err(Error::BadResponse(format!(
                "pop3: status line exceeds maximum {MAX_LINE_BYTES} bytes"
            )));
        }
        while matches!(buf.last(), Some(b'\n') | Some(b'\r')) {
            buf.pop();
        }
        String::from_utf8(buf).map_err(|_| Error::BadResponse("pop3: non-UTF8 status line".into()))
    }

    /// Reads one status line and interprets its indicator.
    ///
    /// Returns the text after `+OK`, minus one separating space if present.
    ///
    /// # Errors
    ///
    /// Everything `read_line` reports, plus `Error::BadResponse` for `-ERR`
    /// replies (carrying the server text) and for lines that start with
    /// neither indicator.
    fn read_status(&mut self) -> Result<String> {
        let line = self.read_line()?;
        if let Some(rest) = line.strip_prefix("+OK") {
            Ok(rest.strip_prefix(' ').unwrap_or(rest).to_string())
        } else if let Some(rest) = line.strip_prefix("-ERR") {
            let text = rest.strip_prefix(' ').unwrap_or(rest);
            Err(Error::BadResponse(format!("pop3: {text}")))
        } else {
            Err(Error::BadResponse(format!(
                "pop3: unexpected status line: {line}"
            )))
        }
    }

    /// Reads a multi-line response up to, and not including, the terminator
    /// line (`.` followed by CRLF or LF).
    ///
    /// The returned bytes are exactly as received: line endings are kept and
    /// dot-stuffing is not removed.
    ///
    /// # Errors
    ///
    /// * `Error::UnexpectedEof` if the stream ends before the terminator.
    /// * `Error::BadResponse` if the body would exceed
    ///   `MAX_RESPONSE_BYTES`.
    /// * I/O errors from the stream.
    fn read_multiline(&mut self) -> Result<Vec<u8>> {
        // Longest possible terminator line: ".\r\n".
        const TERMINATOR_MAX_BYTES: usize = 3;

        let mut out = Vec::new();
        let mut line = Vec::new();
        loop {
            line.clear();
            // Invariant: out.len() <= MAX_RESPONSE_BYTES, so this cannot
            // underflow.
            let line_cap = MAX_RESPONSE_BYTES - out.len();
            // Read one byte past the remaining budget so an oversized line is
            // detectable, but always allow enough to recognise the
            // terminator. Otherwise a body that ends within two bytes of the
            // cap would be rejected although it fits.
            let read_limit = (line_cap + 1).max(TERMINATOR_MAX_BYTES);
            let n = (&mut self.io)
                .take(read_limit as u64)
                .read_until(b'\n', &mut line)?;
            if n == 0 {
                return Err(Error::UnexpectedEof);
            }
            if matches!(line.as_slice(), b".\r\n" | b".\n") {
                return Ok(out);
            }
            if line.len() > line_cap {
                return Err(Error::BadResponse(format!(
                    "pop3: response exceeds maximum {MAX_RESPONSE_BYTES} bytes"
                )));
            }
            out.extend_from_slice(&line);
        }
    }
}
/// Logs in, performs `action`, and returns its payload.
///
/// The sequence is `USER`, `PASS`, then either `LIST` (listing returned as
/// received) or `RETR n` (message returned with dot-stuffing removed), and
/// finally a best-effort `QUIT`.
///
/// # Errors
///
/// * `Error::BadResponse` if `user` or `pass` contains control bytes; this is
///   checked before anything is sent.
/// * Any error from sending a command or from a failing server reply.
///   Failures during `QUIT` are deliberately ignored because the payload is
///   already in hand.
fn run<R: Read + Write>(
    session: &mut Session<R>,
    user: &str,
    pass: &str,
    action: Action,
) -> Result<Vec<u8>> {
    /// Sends one command and waits for its status line.
    fn exchange<S: Read + Write>(session: &mut Session<S>, line: &str) -> Result<String> {
        session.send(line)?;
        session.read_status()
    }

    reject_ctl(user, "pop3 user")?;
    reject_ctl(pass, "pop3 password")?;

    exchange(session, &format!("USER {user}"))?;
    exchange(session, &format!("PASS {pass}"))?;

    let payload = match action {
        Action::List => {
            exchange(session, "LIST")?;
            session.read_multiline()?
        }
        Action::Retr(n) => {
            exchange(session, &format!("RETR {n}"))?;
            let stuffed = session.read_multiline()?;
            un_dot_stuff(&stuffed)
        }
    };

    // Best effort: the reply is still awaited even if sending QUIT failed,
    // and neither outcome affects the result.
    let _ = session.send("QUIT");
    let _ = session.read_status();

    Ok(payload)
}
// Unit tests for the pure helpers (dot-unstuffing, path and userinfo
// parsing, control-byte rejection) and for the session/STLS logic, driven by
// in-memory streams instead of real sockets.
#[cfg(test)]
mod tests {
use super::*;
// One leading dot is removed from every line that has one.
#[test]
fn un_dot_stuff_strips_leading_dot_on_each_line() {
let input = b"hello\r\n..dotted\r\n.line\r\nplain\r\n";
let got = un_dot_stuff(input);
assert_eq!(got, b"hello\r\n.dotted\r\nline\r\nplain\r\n");
}
// An empty body stays empty.
#[test]
fn un_dot_stuff_handles_empty_body() {
assert_eq!(un_dot_stuff(b""), b"");
}
// The very first byte of the body counts as a line start.
#[test]
fn un_dot_stuff_handles_first_line_dot() {
let input = b"..first\r\nbody\r\n";
assert_eq!(un_dot_stuff(input), b".first\r\nbody\r\n");
}
// Dots that are not at a line start are left alone.
#[test]
fn un_dot_stuff_does_not_consume_dot_in_middle() {
let input = b"a.b\r\n.x\r\n";
assert_eq!(un_dot_stuff(input), b"a.b\r\nx\r\n");
}
// A final line without a newline is still unstuffed.
#[test]
fn un_dot_stuff_handles_trailing_partial_line() {
let input = b".end";
assert_eq!(un_dot_stuff(input), b"end");
}
// An empty path, with or without the leading slash, selects LIST.
#[test]
fn parse_path_root_means_list() {
assert_eq!(parse_path("/"), Some(Action::List));
assert_eq!(parse_path(""), Some(Action::List));
}
// A numeric path selects RETR with that message number (0 included).
#[test]
fn parse_path_numeric_means_retr() {
assert_eq!(parse_path("/1"), Some(Action::Retr(1)));
assert_eq!(parse_path("/42"), Some(Action::Retr(42)));
assert_eq!(parse_path("/0"), Some(Action::Retr(0)));
}
// Non-numeric paths, extra segments, queries, signs and decimals are refused.
#[test]
fn parse_path_rejects_garbage() {
assert_eq!(parse_path("/abc"), None);
assert_eq!(parse_path("/1/2"), None);
assert_eq!(parse_path("/1?x=1"), None);
assert_eq!(parse_path("/-1"), None);
assert_eq!(parse_path("/1.0"), None);
}
// Only the first colon separates user from password.
#[test]
fn split_userinfo_splits_on_first_colon() {
assert_eq!(split_userinfo("alice:secret"), ("alice", "secret"));
assert_eq!(split_userinfo("alice"), ("alice", ""));
assert_eq!(split_userinfo("alice:s:e:c"), ("alice", "s:e:c"));
assert_eq!(split_userinfo(":pass"), ("", "pass"));
}
// CR, LF, NUL and DEL are refused; ordinary punctuation is fine.
#[test]
fn reject_ctl_flags_control_bytes() {
assert!(reject_ctl("alice", "pop3 user").is_ok());
assert!(reject_ctl("p@ss:word", "pop3 password").is_ok());
assert!(reject_ctl("alice\r\nDELE 1", "pop3 user").is_err());
assert!(reject_ctl("alice\npass", "pop3 user").is_err());
assert!(reject_ctl("alice\0", "pop3 user").is_err());
assert!(reject_ctl("alice\x7f", "pop3 user").is_err());
}
// Write-capturing stream whose reads always report end of stream.
struct MockIo {
written: Vec<u8>,
}
impl Read for MockIo {
fn read(&mut self, _buf: &mut [u8]) -> std::io::Result<usize> {
Ok(0)
}
}
impl Write for MockIo {
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
self.written.extend_from_slice(buf);
Ok(buf.len())
}
fn flush(&mut self) -> std::io::Result<()> {
Ok(())
}
}
// Stream that replays canned bytes on read and discards all writes.
struct ReplayIo {
data: std::io::Cursor<Vec<u8>>,
}
impl Read for ReplayIo {
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
self.data.read(buf)
}
}
impl Write for ReplayIo {
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
Ok(buf.len())
}
fn flush(&mut self) -> std::io::Result<()> {
Ok(())
}
}
// Builds a session whose server side is the given canned byte script.
fn session_replaying(data: Vec<u8>) -> Session<ReplayIo> {
Session::new(BufReader::new(ReplayIo {
data: std::io::Cursor::new(data),
}))
}
// The body is everything before the lone-dot terminator line.
#[test]
fn read_multiline_returns_body_on_terminator() {
let mut s = session_replaying(b"line one\r\nline two\r\n.\r\n".to_vec());
let body = s.read_multiline().unwrap();
assert_eq!(body, b"line one\r\nline two\r\n");
}
// Many well-formed lines that together exceed the cap are refused.
#[test]
fn read_multiline_aborts_past_aggregate_cap() {
let line = {
let mut l = vec![b'a'; 1022];
l.extend_from_slice(b"\r\n");
l
};
let n_lines = MAX_RESPONSE_BYTES / line.len() + 2;
let mut data = Vec::with_capacity(n_lines * line.len());
for _ in 0..n_lines {
data.extend_from_slice(&line);
}
let mut s = session_replaying(data);
match s.read_multiline() {
Err(Error::BadResponse(m)) => assert!(m.contains("maximum"), "got {m}"),
other => panic!("expected BadResponse(maximum), got {other:?}"),
}
}
// A single line with no newline that exceeds the cap is refused as well.
#[test]
fn read_multiline_aborts_on_unbounded_single_line() {
let data = vec![b'x'; MAX_RESPONSE_BYTES + 1024];
let mut s = session_replaying(data);
match s.read_multiline() {
Err(Error::BadResponse(m)) => assert!(m.contains("maximum"), "got {m}"),
other => panic!("expected BadResponse(maximum), got {other:?}"),
}
}
// Commands with embedded CR, LF or NUL are refused; a clean command is
// written with a CRLF appended.
#[test]
fn send_rejects_embedded_crlf() {
let mut s = Session::new(BufReader::new(MockIo {
written: Vec::new(),
}));
assert!(matches!(
s.send("USER alice\r\nPASS x"),
Err(Error::BadResponse(_))
));
assert!(matches!(s.send("USER a\nb"), Err(Error::BadResponse(_))));
assert!(matches!(s.send("USER a\0b"), Err(Error::BadResponse(_))));
s.send("USER alice").unwrap();
assert_eq!(s.io.get_ref().written, b"USER alice\r\n");
}
// Stream that replays a canned server script on read and records every
// byte written to it.
struct DuplexIo {
to_read: std::io::Cursor<Vec<u8>>,
written: Vec<u8>,
}
impl DuplexIo {
fn new(script: &[u8]) -> Self {
Self {
to_read: std::io::Cursor::new(script.to_vec()),
written: Vec::new(),
}
}
}
impl Read for DuplexIo {
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
self.to_read.read(buf)
}
}
impl Write for DuplexIo {
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
self.written.extend_from_slice(buf);
Ok(buf.len())
}
fn flush(&mut self) -> std::io::Result<()> {
Ok(())
}
}
// A `+OK` reply means "upgrade", and exactly `STLS\r\n` was sent.
#[test]
fn stls_negotiate_issues_command_and_accepts_ok() {
let mut io = BufReader::new(DuplexIo::new(b"+OK begin TLS\r\n"));
let upgrade = stls_negotiate(&mut io).expect("stls negotiate");
assert!(upgrade, "expected upgrade decision on +OK");
assert_eq!(io.get_ref().written, b"STLS\r\n");
}
// A `-ERR` reply means "no upgrade" rather than an error.
#[test]
fn stls_negotiate_declines_on_err() {
let mut io = BufReader::new(DuplexIo::new(b"-ERR unknown command\r\n"));
let upgrade = stls_negotiate(&mut io).expect("stls negotiate");
assert!(!upgrade, "expected no upgrade on -ERR");
assert_eq!(io.get_ref().written, b"STLS\r\n");
}
// Plaintext bytes already buffered after the `+OK` are treated as an
// injection attempt.
#[test]
fn stls_negotiate_rejects_pipelined_plaintext_injection() {
let mut io = BufReader::new(DuplexIo::new(b"+OK begin TLS\r\n+OK 1 messages\r\n.\r\n"));
match stls_negotiate(&mut io) {
Err(Error::BadResponse(m)) => assert!(m.contains("injection"), "got {m}"),
other => panic!("expected BadResponse(injection), got {other:?}"),
}
}
// With nothing pipelined, the read buffer is empty after the `+OK`.
#[test]
fn stls_negotiate_clear_when_server_waits() {
let mut io = BufReader::new(DuplexIo::new(b"+OK begin TLS\r\n"));
assert!(stls_negotiate(&mut io).expect("ok"));
assert!(io.buffer().is_empty(), "reader must be clear after +OK");
}
// When STLS is declined, only `STLS` has been written: no credentials.
// NOTE(review): the "TLS required" error is rebuilt locally here rather
// than produced by `fetch_with`, so this pins the negotiation result and the
// bytes written, not the production error path.
#[test]
fn require_tls_errors_before_credentials_when_no_stls() {
let greeting_and_stls = b"+OK POP3 ready\r\n-ERR unknown command\r\n";
let mut io = BufReader::new(DuplexIo::new(greeting_and_stls));
read_status_buf(&mut io).expect("greeting");
let upgraded = stls_negotiate(&mut io).expect("stls");
assert!(!upgraded);
let err: Result<()> = if !upgraded {
Err(Error::BadResponse(
"pop3: TLS required (--ssl-reqd) but server did not offer STLS".into(),
))
} else {
Ok(())
};
match err {
Err(Error::BadResponse(m)) => assert!(m.contains("TLS required"), "got {m}"),
other => panic!("expected TLS required error, got {other:?}"),
}
assert_eq!(io.get_ref().written, b"STLS\r\n");
}
}