use std::net::SocketAddr;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::io::{
AsyncBufReadExt, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, BufReader, ReadBuf,
};
use tokio::io::{ReadHalf, WriteHalf};
use tokio::net::TcpStream;
use crate::error::Error;
use crate::protocol::http::response::Response;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FtpSslMode {
None,
Explicit,
Implicit,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum UseSsl {
#[default]
None,
Try,
All,
}
#[allow(clippy::large_enum_variant)]
pub(crate) enum FtpStream {
Plain(TcpStream),
#[cfg(feature = "rustls")]
Tls(tokio_rustls::client::TlsStream<TcpStream>),
}
impl AsyncRead for FtpStream {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<std::io::Result<()>> {
match self.get_mut() {
Self::Plain(s) => Pin::new(s).poll_read(cx, buf),
#[cfg(feature = "rustls")]
Self::Tls(s) => Pin::new(s).poll_read(cx, buf),
}
}
}
impl AsyncWrite for FtpStream {
fn poll_write(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<std::io::Result<usize>> {
match self.get_mut() {
Self::Plain(s) => Pin::new(s).poll_write(cx, buf),
#[cfg(feature = "rustls")]
Self::Tls(s) => Pin::new(s).poll_write(cx, buf),
}
}
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
match self.get_mut() {
Self::Plain(s) => Pin::new(s).poll_flush(cx),
#[cfg(feature = "rustls")]
Self::Tls(s) => Pin::new(s).poll_flush(cx),
}
}
fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
match self.get_mut() {
Self::Plain(s) => Pin::new(s).poll_shutdown(cx),
#[cfg(feature = "rustls")]
Self::Tls(s) => Pin::new(s).poll_shutdown(cx),
}
}
}
#[derive(Debug, Clone)]
pub struct FtpResponse {
pub code: u16,
pub message: String,
pub raw_bytes: Vec<u8>,
}
impl FtpResponse {
#[must_use]
pub const fn is_preliminary(&self) -> bool {
self.code >= 100 && self.code < 200
}
#[must_use]
pub const fn is_complete(&self) -> bool {
self.code >= 200 && self.code < 300
}
#[must_use]
pub const fn is_intermediate(&self) -> bool {
self.code >= 300 && self.code < 400
}
#[must_use]
pub const fn is_negative_transient(&self) -> bool {
self.code >= 400 && self.code < 500
}
#[must_use]
pub const fn is_negative_permanent(&self) -> bool {
self.code >= 500 && self.code < 600
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum FtpMethod {
#[default]
MultiCwd,
SingleCwd,
NoCwd,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TransferType {
Ascii,
Binary,
}
#[derive(Debug, Clone, Default)]
#[allow(clippy::struct_excessive_bools)]
pub struct FtpFeatures {
pub epsv: bool,
pub mlst: bool,
pub rest_stream: bool,
pub size: bool,
pub utf8: bool,
pub auth_tls: bool,
pub raw: Vec<String>,
}
#[derive(Debug, Clone)]
pub enum FtpProxyConfig {
Socks4 {
host: String,
port: u16,
user_id: String,
socks4a: bool,
},
Socks5 {
host: String,
port: u16,
auth: Option<(String, String)>,
},
HttpConnect {
host: String,
port: u16,
user_agent: String,
},
}
async fn connect_via_proxy(
proxy: &FtpProxyConfig,
target_host: &str,
target_port: u16,
) -> Result<TcpStream, Error> {
let (proxy_host, proxy_port) = match proxy {
FtpProxyConfig::Socks4 { host, port, .. }
| FtpProxyConfig::Socks5 { host, port, .. }
| FtpProxyConfig::HttpConnect { host, port, .. } => (host.as_str(), *port),
};
let proxy_addr = format!("{proxy_host}:{proxy_port}");
let tcp = TcpStream::connect(&proxy_addr).await.map_err(Error::Connect)?;
match proxy {
FtpProxyConfig::Socks5 { auth, .. } => {
let auth_ref = auth.as_ref().map(|(u, p)| (u.as_str(), p.as_str()));
crate::proxy::socks::connect_socks5(tcp, target_host, target_port, auth_ref).await
}
FtpProxyConfig::Socks4 { user_id, .. } => {
crate::proxy::socks::connect_socks4(tcp, target_host, target_port, user_id).await
}
FtpProxyConfig::HttpConnect { user_agent, .. } => {
connect_http_tunnel(tcp, target_host, target_port, user_agent).await
}
}
}
async fn connect_http_tunnel(
mut stream: TcpStream,
target_host: &str,
target_port: u16,
user_agent: &str,
) -> Result<TcpStream, Error> {
use tokio::io::{AsyncBufReadExt as _, AsyncWriteExt as _};
let request = format!(
"CONNECT {target_host}:{target_port} HTTP/1.1\r\n\
Host: {target_host}:{target_port}\r\n\
User-Agent: {user_agent}\r\n\
Proxy-Connection: Keep-Alive\r\n\
\r\n"
);
stream.write_all(request.as_bytes()).await.map_err(Error::Io)?;
stream.flush().await.map_err(Error::Io)?;
let mut buf_reader = tokio::io::BufReader::new(&mut stream);
let mut status_line = String::new();
let _ = buf_reader.read_line(&mut status_line).await.map_err(Error::Io)?;
let status_code =
status_line.split_whitespace().nth(1).and_then(|s| s.parse::<u16>().ok()).unwrap_or(0);
loop {
let mut line = String::new();
let _ = buf_reader.read_line(&mut line).await.map_err(Error::Io)?;
if line.trim().is_empty() {
break;
}
}
if status_code == 200 {
Ok(stream)
} else {
Err(Error::Transfer {
code: 56,
message: format!(
"CONNECT tunnel to {target_host}:{target_port} failed with status {status_code}"
),
})
}
}
#[derive(Debug, Clone)]
#[allow(clippy::struct_excessive_bools)] pub struct FtpConfig {
pub use_epsv: bool,
pub use_eprt: bool,
pub skip_pasv_ip: bool,
pub account: Option<String>,
pub create_dirs: bool,
pub method: FtpMethod,
pub active_port: Option<String>,
pub use_ascii: bool,
pub append: bool,
pub crlf: bool,
pub list_only: bool,
pub nobody: bool,
pub pre_quote: Vec<String>,
pub post_pasv_quote: Vec<String>,
pub post_quote: Vec<String>,
pub time_condition: Option<(i64, bool)>,
pub range_end: Option<u64>,
pub range_from_end: Option<u64>,
pub ignore_content_length: bool,
pub max_filesize: Option<u64>,
pub use_pret: bool,
pub ssl_control: bool,
pub ssl_ccc: bool,
pub alternative_to_user: Option<String>,
}
impl Default for FtpConfig {
fn default() -> Self {
Self {
use_epsv: true,
use_eprt: true,
skip_pasv_ip: false,
account: None,
create_dirs: false,
method: FtpMethod::default(),
active_port: None,
use_ascii: false,
append: false,
crlf: false,
list_only: false,
nobody: false,
pre_quote: Vec::new(),
post_pasv_quote: Vec::new(),
post_quote: Vec::new(),
time_condition: None,
range_end: None,
range_from_end: None,
ignore_content_length: false,
max_filesize: None,
use_pret: false,
ssl_control: false,
ssl_ccc: false,
alternative_to_user: None,
}
}
}
#[allow(clippy::large_enum_variant)]
pub(crate) enum DataConnection {
Connected(FtpStream),
PendingActive {
listener: tokio::net::TcpListener,
use_tls: bool,
},
}
impl DataConnection {
async fn into_stream(
self,
session: &FtpSession,
timeout: Option<std::time::Duration>,
) -> Result<FtpStream, Error> {
match self {
Self::Connected(stream) => Ok(stream),
Self::PendingActive { listener, use_tls } => {
let accept_fut = listener.accept();
let (tcp, _) = if let Some(dur) = timeout {
tokio::time::timeout(dur, accept_fut).await.map_err(|_| Error::Transfer {
code: 10,
message: "FTP active mode accept timed out".to_string(),
})?
} else {
accept_fut.await
}
.map_err(|e| Error::Http(format!("FTP active mode accept failed: {e}")))?;
if use_tls {
session.maybe_wrap_data_tls(tcp).await
} else {
Ok(FtpStream::Plain(tcp))
}
}
}
}
}
pub struct FtpSession {
reader: BufReader<ReadHalf<FtpStream>>,
writer: WriteHalf<FtpStream>,
features: Option<FtpFeatures>,
hostname: String,
port: u16,
user: String,
local_addr: SocketAddr,
use_tls_data: bool,
active_port: Option<String>,
#[cfg(feature = "rustls")]
tls_connector: Option<crate::tls::TlsConnector>,
config: FtpConfig,
header_bytes: Vec<u8>,
current_dir: Vec<String>,
home_dir: Option<String>,
current_type: Option<TransferType>,
proxy_config: Option<FtpProxyConfig>,
connect_response_bytes: Vec<u8>,
}
impl FtpSession {
async fn read_and_record(&mut self) -> Result<FtpResponse, Error> {
let resp = read_response(&mut self.reader).await?;
self.header_bytes.extend_from_slice(&resp.raw_bytes);
Ok(resp)
}
pub async fn connect(
host: &str,
port: u16,
user: &str,
pass: &str,
config: FtpConfig,
) -> Result<Self, Error> {
Self::connect_maybe_proxy(host, port, user, pass, config, None).await
}
pub async fn connect_maybe_proxy(
host: &str,
port: u16,
user: &str,
pass: &str,
config: FtpConfig,
proxy: Option<FtpProxyConfig>,
) -> Result<Self, Error> {
let (tcp, connect_response_bytes) = if let Some(ref proxy_config) = proxy {
let stream = connect_via_proxy(proxy_config, host, port).await?;
let connect_bytes = if matches!(proxy_config, FtpProxyConfig::HttpConnect { .. }) {
b"HTTP/1.1 200 Connection established\r\n\r\n".to_vec()
} else {
Vec::new()
};
(stream, connect_bytes)
} else {
let addr = format!("{host}:{port}");
let stream = TcpStream::connect(&addr).await.map_err(Error::Connect)?;
(stream, Vec::new())
};
let local_addr = tcp.local_addr().map_err(Error::Connect)?;
let stream = FtpStream::Plain(tcp);
let (reader, writer) = tokio::io::split(stream);
let mut reader = BufReader::new(reader);
let greeting = read_response(&mut reader).await?;
if !greeting.is_complete() {
return Err(Error::Http(format!(
"FTP server rejected connection: {} {}",
greeting.code, greeting.message
)));
}
let skip_login = greeting.code == 230;
let active_port = config.active_port.clone();
let alt_to_user = config.alternative_to_user.clone();
let mut header_bytes = Vec::new();
header_bytes.extend_from_slice(&greeting.raw_bytes);
let mut session = Self {
reader,
writer,
features: None,
hostname: host.to_string(),
port,
user: user.to_string(),
local_addr,
use_tls_data: false,
active_port,
#[cfg(feature = "rustls")]
tls_connector: None,
config,
header_bytes,
current_dir: Vec::new(),
home_dir: None,
current_type: None,
proxy_config: proxy,
connect_response_bytes,
};
session.login(user, pass, skip_login, alt_to_user.as_deref()).await?;
if let Some(ref account) = session.config.account {
let acct_cmd = format!("ACCT {account}");
send_command(&mut session.writer, &acct_cmd).await?;
let acct_resp = session.read_and_record().await?;
if !acct_resp.is_complete() {
return Err(Error::Transfer {
code: 11,
message: format!("FTP ACCT failed: {} {}", acct_resp.code, acct_resp.message),
});
}
}
Ok(session)
}
#[cfg(feature = "rustls")]
#[allow(clippy::too_many_arguments)]
pub async fn connect_with_tls(
host: &str,
port: u16,
user: &str,
pass: &str,
ssl_mode: FtpSslMode,
use_ssl: UseSsl,
tls_config: &crate::tls::TlsConfig,
config: FtpConfig,
) -> Result<Self, Error> {
if ssl_mode == FtpSslMode::None {
return Self::connect(host, port, user, pass, config).await;
}
let tls_connector = crate::tls::TlsConnector::new_no_alpn(tls_config)?;
let addr = format!("{host}:{port}");
let tcp = TcpStream::connect(&addr).await.map_err(Error::Connect)?;
let local_addr = tcp.local_addr().map_err(Error::Connect)?;
let stream = match ssl_mode {
FtpSslMode::Implicit => {
let (tls_stream, _) = tls_connector.connect(tcp, host).await?;
FtpStream::Tls(tls_stream)
}
FtpSslMode::Explicit | FtpSslMode::None => {
FtpStream::Plain(tcp)
}
};
let (reader, writer) = tokio::io::split(stream);
let mut reader = BufReader::new(reader);
let greeting = read_response(&mut reader).await?;
if !greeting.is_complete() {
return Err(Error::Http(format!(
"FTP server rejected connection: {} {}",
greeting.code, greeting.message
)));
}
let skip_login = greeting.code == 230;
let active_port = config.active_port.clone();
let alt_to_user = config.alternative_to_user.clone();
let mut header_bytes = Vec::new();
header_bytes.extend_from_slice(&greeting.raw_bytes);
let mut session = Self {
reader,
writer,
features: None,
hostname: host.to_string(),
port,
user: user.to_string(),
local_addr,
use_tls_data: false,
active_port,
tls_connector: Some(tls_connector),
config,
header_bytes,
current_dir: Vec::new(),
home_dir: None,
current_type: None,
proxy_config: None,
connect_response_bytes: Vec::new(),
};
if ssl_mode == FtpSslMode::Explicit {
let (upgraded_session, auth_succeeded) =
session.auth_tls_with_fallback(use_ssl).await?;
session = upgraded_session;
if auth_succeeded {
session.setup_data_protection().await?;
}
session.login(user, pass, skip_login, alt_to_user.as_deref()).await?;
} else {
session.login(user, pass, skip_login, alt_to_user.as_deref()).await?;
session.setup_data_protection().await?;
}
if let Some(ref account) = session.config.account {
let acct_cmd = format!("ACCT {account}");
send_command(&mut session.writer, &acct_cmd).await?;
let acct_resp = session.read_and_record().await?;
if !acct_resp.is_complete() {
return Err(Error::Transfer {
code: 11,
message: format!("FTP ACCT failed: {} {}", acct_resp.code, acct_resp.message),
});
}
}
Ok(session)
}
#[cfg(feature = "rustls")]
async fn auth_tls_with_fallback(mut self, use_ssl: UseSsl) -> Result<(Self, bool), Error> {
send_command(&mut self.writer, "AUTH SSL").await?;
let resp = self.read_and_record().await?;
if resp.is_complete() {
return Ok((self.do_tls_upgrade().await?, true));
}
if use_ssl == UseSsl::All {
send_command(&mut self.writer, "AUTH TLS").await?;
let resp2 = self.read_and_record().await?;
if resp2.is_complete() {
return Ok((self.do_tls_upgrade().await?, true));
}
return Err(Error::Transfer {
code: 64,
message: "FTP AUTH SSL/TLS failed: server does not support TLS".to_string(),
});
}
if !self.reader.buffer().is_empty() {
return Err(Error::Protocol(8));
}
Ok((self, false))
}
#[cfg(feature = "rustls")]
async fn do_tls_upgrade(self) -> Result<Self, Error> {
let reader_inner = self.reader.into_inner();
let stream = reader_inner.unsplit(self.writer);
let tcp = match stream {
FtpStream::Plain(tcp) => tcp,
FtpStream::Tls(_) => {
return Err(Error::Http("AUTH TLS on already-encrypted connection".to_string()));
}
};
let connector = self
.tls_connector
.as_ref()
.ok_or_else(|| Error::Http("No TLS connector available for AUTH TLS".to_string()))?;
let (tls_stream, _) = connector.connect(tcp, &self.hostname).await?;
let ftp_stream = FtpStream::Tls(tls_stream);
let (reader, writer) = tokio::io::split(ftp_stream);
Ok(Self {
reader: BufReader::new(reader),
writer,
features: self.features,
hostname: self.hostname,
port: self.port,
user: self.user,
local_addr: self.local_addr,
use_tls_data: false,
active_port: self.active_port,
tls_connector: self.tls_connector,
config: self.config,
header_bytes: self.header_bytes,
current_dir: self.current_dir,
home_dir: self.home_dir,
current_type: self.current_type,
proxy_config: self.proxy_config,
connect_response_bytes: self.connect_response_bytes,
})
}
#[cfg(feature = "rustls")]
async fn setup_data_protection(&mut self) -> Result<(), Error> {
send_command(&mut self.writer, "PBSZ 0").await?;
let _pbsz_resp = self.read_and_record().await?;
let prot_cmd = if self.config.ssl_control { "PROT C" } else { "PROT P" };
send_command(&mut self.writer, prot_cmd).await?;
let prot_resp = self.read_and_record().await?;
if prot_resp.is_complete() {
self.use_tls_data = !self.config.ssl_control;
}
if self.config.ssl_ccc {
send_command(&mut self.writer, "CCC").await?;
let _ccc_resp = self.read_and_record().await?;
}
Ok(())
}
pub fn set_active_port(&mut self, addr: &str) {
self.active_port = Some(addr.to_string());
}
async fn login(
&mut self,
user: &str,
pass: &str,
skip_login: bool,
alternative_to_user: Option<&str>,
) -> Result<(), Error> {
if skip_login {
return Ok(());
}
send_command(&mut self.writer, &format!("USER {user}")).await?;
let user_resp = self.read_and_record().await?;
if user_resp.code == 331 {
send_command(&mut self.writer, &format!("PASS {pass}")).await?;
let pass_resp = self.read_and_record().await?;
if pass_resp.code == 332 {
if self.config.account.is_none() {
return Err(Error::Transfer {
code: 67,
message: format!("Access denied: {} {}", pass_resp.code, pass_resp.message),
});
}
} else if !pass_resp.is_complete() {
return Err(Error::Transfer {
code: 67,
message: format!("Access denied: {} {}", pass_resp.code, pass_resp.message),
});
}
} else if user_resp.is_complete() {
} else if alternative_to_user.is_some() {
let alt = alternative_to_user.unwrap_or_default();
send_command(&mut self.writer, alt).await?;
let alt_resp = self.read_and_record().await?;
if alt_resp.code == 331 {
send_command(&mut self.writer, &format!("PASS {pass}")).await?;
let pass_resp = self.read_and_record().await?;
if !pass_resp.is_complete() && pass_resp.code != 332 {
return Err(Error::Transfer {
code: 67,
message: format!("Access denied: {} {}", pass_resp.code, pass_resp.message),
});
}
} else if !alt_resp.is_complete() {
return Err(Error::Transfer {
code: 67,
message: format!("Access denied: {} {}", alt_resp.code, alt_resp.message),
});
}
} else {
return Err(Error::Transfer {
code: 67,
message: format!("Access denied: {} {}", user_resp.code, user_resp.message),
});
}
Ok(())
}
async fn pwd_safe(&mut self) -> Option<String> {
if send_command(&mut self.writer, "PWD").await.is_err() {
return None;
}
match self.read_and_record().await {
Ok(resp) if resp.is_complete() => {
if let Some(start) = resp.message.find('"') {
if let Some(end) = resp.message[start + 1..].find('"') {
return Some(resp.message[start + 1..start + 1 + end].to_string());
}
return None;
}
Some(resp.message)
}
_ => None,
}
}
pub async fn feat(&mut self) -> Result<&FtpFeatures, Error> {
send_command(&mut self.writer, "FEAT").await?;
let resp = self.read_and_record().await?;
let features = if resp.is_complete() {
parse_feat_response(&resp.message)
} else {
FtpFeatures::default()
};
self.features = Some(features);
Ok(self.features.get_or_insert_with(FtpFeatures::default))
}
pub async fn set_type(&mut self, transfer_type: TransferType) -> Result<(), Error> {
let type_cmd = match transfer_type {
TransferType::Ascii => "TYPE A",
TransferType::Binary => "TYPE I",
};
send_command(&mut self.writer, type_cmd).await?;
let resp = self.read_and_record().await?;
if !resp.is_complete() {
return Err(Error::Http(format!("FTP TYPE failed: {} {}", resp.code, resp.message)));
}
Ok(())
}
async fn open_data_connection(&mut self) -> Result<DataConnection, Error> {
if let Some(ref addr) = self.active_port {
let addr = addr.clone();
self.open_active_data_connection(&addr).await
} else {
let stream = self.open_passive_data_connection(None).await?;
Ok(DataConnection::Connected(stream))
}
}
async fn open_data_connection_with_pret(
&mut self,
pret_cmd: &str,
) -> Result<DataConnection, Error> {
if let Some(ref addr) = self.active_port {
let addr = addr.clone();
self.open_active_data_connection(&addr).await
} else {
let pret = if self.config.use_pret { Some(pret_cmd) } else { None };
let stream = self.open_passive_data_connection(pret).await?;
Ok(DataConnection::Connected(stream))
}
}
async fn open_passive_data_connection(
&mut self,
pret_cmd: Option<&str>,
) -> Result<FtpStream, Error> {
if let Some(cmd) = pret_cmd {
send_command(&mut self.writer, &format!("PRET {cmd}")).await?;
let pret_resp = self.read_and_record().await?;
if pret_resp.code >= 400 {
return Err(Error::Transfer {
code: 84,
message: format!(
"PRET command not accepted: {} {}",
pret_resp.code, pret_resp.message
),
});
}
}
let force_epsv = self.local_addr.is_ipv6();
if self.config.use_epsv || force_epsv {
send_command(&mut self.writer, "EPSV").await?;
let epsv_resp = self.read_and_record().await?;
if epsv_resp.code == 229 {
match parse_epsv_response(&epsv_resp.message) {
Ok(data_port) => {
let tcp = self.connect_data(&self.hostname.clone(), data_port).await;
match tcp {
Ok(tcp) => {
if matches!(
self.proxy_config,
Some(FtpProxyConfig::HttpConnect { .. })
) {
self.connect_response_bytes.extend_from_slice(
b"HTTP/1.1 200 Connection established\r\n\r\n",
);
}
return self.maybe_wrap_data_tls(tcp).await;
}
Err(e) => {
if force_epsv {
return Err(Error::Http(format!(
"FTP EPSV data connection failed: {e}"
)));
}
self.config.use_epsv = false;
}
}
}
Err(e) => {
return Err(e);
}
}
} else if force_epsv {
return Err(Error::Transfer {
code: 13,
message: format!(
"FTP EPSV failed: {} {} (PASV not available for IPv6)",
epsv_resp.code, epsv_resp.message
),
});
} else {
self.config.use_epsv = false;
}
}
send_command(&mut self.writer, "PASV").await?;
let pasv_resp = self.read_and_record().await?;
if pasv_resp.code != 227 {
return Err(Error::Transfer {
code: 13,
message: format!("FTP PASV failed: {} {}", pasv_resp.code, pasv_resp.message),
});
}
let (data_host, data_port) = parse_pasv_response(&pasv_resp.message)?;
let effective_host =
if self.config.skip_pasv_ip { self.hostname.clone() } else { data_host };
let tcp = self
.connect_data(&effective_host, data_port)
.await
.map_err(|e| Error::Http(format!("FTP data connection failed: {e}")))?;
self.maybe_wrap_data_tls(tcp).await
}
async fn connect_data(&self, host: &str, port: u16) -> Result<TcpStream, Error> {
if let Some(ref proxy) = self.proxy_config {
connect_via_proxy(proxy, host, port).await
} else {
let data_addr = format!("{host}:{port}");
TcpStream::connect(&data_addr).await.map_err(Error::Connect)
}
}
async fn open_active_data_connection(
&mut self,
bind_addr: &str,
) -> Result<DataConnection, Error> {
let advertise_ip: std::net::IpAddr = if bind_addr == "-" {
self.local_addr.ip()
} else {
bind_addr.parse().map_err(|e| {
Error::Http(format!("Invalid FTP active address '{bind_addr}': {e}"))
})?
};
let bind_ip = if bind_addr == "-" {
self.local_addr.ip()
} else if advertise_ip.is_ipv6() {
std::net::IpAddr::V6(std::net::Ipv6Addr::UNSPECIFIED)
} else {
std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED)
};
let bind = SocketAddr::new(bind_ip, 0);
let listener = tokio::net::TcpListener::bind(bind)
.await
.map_err(|e| Error::Http(format!("FTP active mode bind failed: {e}")))?;
let listen_addr = listener
.local_addr()
.map_err(|e| Error::Http(format!("FTP active mode local_addr failed: {e}")))?;
let advertise_addr = SocketAddr::new(advertise_ip, listen_addr.port());
let local_ip = advertise_ip;
let mut port_ok = false;
if local_ip.is_ipv6() || self.config.use_eprt {
let eprt_cmd = format_eprt_command(&advertise_addr);
send_command(&mut self.writer, &eprt_cmd).await?;
let resp = self.read_and_record().await?;
if resp.is_complete() {
port_ok = true;
} else if local_ip.is_ipv6() {
return Err(Error::Transfer {
code: 30,
message: format!("FTP EPRT failed: {} {}", resp.code, resp.message),
});
}
if !port_ok {
self.config.use_eprt = false;
}
}
if !port_ok && local_ip.is_ipv4() {
let port_cmd = format_port_command(&advertise_addr);
send_command(&mut self.writer, &port_cmd).await?;
let resp = self.read_and_record().await?;
if !resp.is_complete() {
return Err(Error::Transfer {
code: 30,
message: format!("FTP PORT failed: {} {}", resp.code, resp.message),
});
}
}
Ok(DataConnection::PendingActive { listener, use_tls: self.use_tls_data })
}
async fn maybe_wrap_data_tls(&self, tcp: TcpStream) -> Result<FtpStream, Error> {
#[cfg(feature = "rustls")]
if self.use_tls_data {
if let Some(ref connector) = self.tls_connector {
let (tls_stream, _) = connector.connect(tcp, &self.hostname).await?;
return Ok(FtpStream::Tls(tls_stream));
}
}
Ok(FtpStream::Plain(tcp))
}
pub async fn download(&mut self, path: &str) -> Result<Vec<u8>, Error> {
self.set_type(TransferType::Binary).await?;
let data_conn = self.open_data_connection().await?;
let mut data_stream = data_conn.into_stream(self, None).await?;
send_command(&mut self.writer, &format!("RETR {path}")).await?;
let retr_resp = self.read_and_record().await?;
if !retr_resp.is_preliminary() && !retr_resp.is_complete() {
return Err(Error::Http(format!(
"FTP RETR failed: {} {}",
retr_resp.code, retr_resp.message
)));
}
let mut data = Vec::new();
let _ = data_stream
.read_to_end(&mut data)
.await
.map_err(|e| Error::Http(format!("FTP data read error: {e}")))?;
drop(data_stream);
let complete_resp = self.read_and_record().await?;
if !complete_resp.is_complete() {
return Err(Error::Http(format!(
"FTP transfer failed: {} {}",
complete_resp.code, complete_resp.message
)));
}
Ok(data)
}
pub async fn download_resume(&mut self, path: &str, offset: u64) -> Result<Vec<u8>, Error> {
self.set_type(TransferType::Binary).await?;
let data_conn = self.open_data_connection().await?;
let mut data_stream = data_conn.into_stream(self, None).await?;
send_command(&mut self.writer, &format!("REST {offset}")).await?;
let rest_resp = self.read_and_record().await?;
if !rest_resp.is_intermediate() {
return Err(Error::Http(format!(
"FTP REST failed: {} {}",
rest_resp.code, rest_resp.message
)));
}
send_command(&mut self.writer, &format!("RETR {path}")).await?;
let retr_resp = self.read_and_record().await?;
if !retr_resp.is_preliminary() && !retr_resp.is_complete() {
return Err(Error::Http(format!(
"FTP RETR failed: {} {}",
retr_resp.code, retr_resp.message
)));
}
let mut data = Vec::new();
let _ = data_stream
.read_to_end(&mut data)
.await
.map_err(|e| Error::Http(format!("FTP data read error: {e}")))?;
drop(data_stream);
let complete_resp = self.read_and_record().await?;
if !complete_resp.is_complete() {
return Err(Error::Http(format!(
"FTP transfer failed: {} {}",
complete_resp.code, complete_resp.message
)));
}
Ok(data)
}
pub async fn upload(&mut self, path: &str, data: &[u8]) -> Result<(), Error> {
self.set_type(TransferType::Binary).await?;
let data_conn = self.open_data_connection().await?;
let mut data_stream = data_conn.into_stream(self, None).await?;
send_command(&mut self.writer, &format!("STOR {path}")).await?;
let stor_resp = self.read_and_record().await?;
if !stor_resp.is_preliminary() && !stor_resp.is_complete() {
return Err(Error::Http(format!(
"FTP STOR failed: {} {}",
stor_resp.code, stor_resp.message
)));
}
data_stream
.write_all(data)
.await
.map_err(|e| Error::Http(format!("FTP data write error: {e}")))?;
data_stream
.shutdown()
.await
.map_err(|e| Error::Http(format!("FTP data shutdown error: {e}")))?;
drop(data_stream);
let complete_resp = self.read_and_record().await?;
if !complete_resp.is_complete() {
return Err(Error::Http(format!(
"FTP upload failed: {} {}",
complete_resp.code, complete_resp.message
)));
}
Ok(())
}
pub async fn append(&mut self, path: &str, data: &[u8]) -> Result<(), Error> {
self.set_type(TransferType::Binary).await?;
let data_conn = self.open_data_connection().await?;
let mut data_stream = data_conn.into_stream(self, None).await?;
send_command(&mut self.writer, &format!("APPE {path}")).await?;
let appe_resp = self.read_and_record().await?;
if !appe_resp.is_preliminary() && !appe_resp.is_complete() {
return Err(Error::Http(format!(
"FTP APPE failed: {} {}",
appe_resp.code, appe_resp.message
)));
}
data_stream
.write_all(data)
.await
.map_err(|e| Error::Http(format!("FTP data write error: {e}")))?;
data_stream
.shutdown()
.await
.map_err(|e| Error::Http(format!("FTP data shutdown error: {e}")))?;
drop(data_stream);
let complete_resp = self.read_and_record().await?;
if !complete_resp.is_complete() {
return Err(Error::Http(format!(
"FTP append failed: {} {}",
complete_resp.code, complete_resp.message
)));
}
Ok(())
}
pub async fn list(&mut self, path: Option<&str>) -> Result<Vec<u8>, Error> {
if let Some(dir) = path {
if !dir.is_empty() && dir != "/" {
send_command(&mut self.writer, &format!("CWD {dir}")).await?;
let cwd_resp = self.read_and_record().await?;
if !cwd_resp.is_complete() {
return Err(Error::Http(format!(
"FTP CWD failed: {} {}",
cwd_resp.code, cwd_resp.message
)));
}
}
}
let data_conn = self.open_data_connection().await?;
let mut data_stream = data_conn.into_stream(self, None).await?;
send_command(&mut self.writer, "LIST").await?;
let list_resp = self.read_and_record().await?;
if !list_resp.is_preliminary() && !list_resp.is_complete() {
return Err(Error::Http(format!(
"FTP LIST failed: {} {}",
list_resp.code, list_resp.message
)));
}
let mut data = Vec::new();
let _ = data_stream
.read_to_end(&mut data)
.await
.map_err(|e| Error::Http(format!("FTP data read error: {e}")))?;
drop(data_stream);
let complete_resp = self.read_and_record().await?;
if !complete_resp.is_complete() {
return Err(Error::Http(format!(
"FTP transfer failed: {} {}",
complete_resp.code, complete_resp.message
)));
}
Ok(data)
}
pub async fn mlsd(&mut self, path: Option<&str>) -> Result<Vec<u8>, Error> {
let data_conn = self.open_data_connection().await?;
let mut data_stream = data_conn.into_stream(self, None).await?;
let cmd = path.map_or_else(|| "MLSD".to_string(), |p| format!("MLSD {p}"));
send_command(&mut self.writer, &cmd).await?;
let resp = self.read_and_record().await?;
if !resp.is_preliminary() && !resp.is_complete() {
return Err(Error::Http(format!("FTP MLSD failed: {} {}", resp.code, resp.message)));
}
let mut data = Vec::new();
let _ = data_stream
.read_to_end(&mut data)
.await
.map_err(|e| Error::Http(format!("FTP data read error: {e}")))?;
drop(data_stream);
let complete_resp = self.read_and_record().await?;
if !complete_resp.is_complete() {
return Err(Error::Http(format!(
"FTP MLSD transfer failed: {} {}",
complete_resp.code, complete_resp.message
)));
}
Ok(data)
}
pub async fn size(&mut self, path: &str) -> Result<u64, Error> {
send_command(&mut self.writer, &format!("SIZE {path}")).await?;
let resp = self.read_and_record().await?;
if !resp.is_complete() {
return Err(Error::Http(format!("FTP SIZE failed: {} {}", resp.code, resp.message)));
}
resp.message
.trim()
.parse::<u64>()
.map_err(|e| Error::Http(format!("FTP SIZE parse error: {e}")))
}
pub async fn mkdir(&mut self, path: &str) -> Result<(), Error> {
send_command(&mut self.writer, &format!("MKD {path}")).await?;
let resp = self.read_and_record().await?;
if !resp.is_complete() {
return Err(Error::Http(format!("FTP MKD failed: {} {}", resp.code, resp.message)));
}
Ok(())
}
pub async fn rmdir(&mut self, path: &str) -> Result<(), Error> {
send_command(&mut self.writer, &format!("RMD {path}")).await?;
let resp = self.read_and_record().await?;
if !resp.is_complete() {
return Err(Error::Http(format!("FTP RMD failed: {} {}", resp.code, resp.message)));
}
Ok(())
}
pub async fn delete(&mut self, path: &str) -> Result<(), Error> {
send_command(&mut self.writer, &format!("DELE {path}")).await?;
let resp = self.read_and_record().await?;
if !resp.is_complete() {
return Err(Error::Http(format!("FTP DELE failed: {} {}", resp.code, resp.message)));
}
Ok(())
}
pub async fn rename(&mut self, from: &str, to: &str) -> Result<(), Error> {
send_command(&mut self.writer, &format!("RNFR {from}")).await?;
let rnfr_resp = self.read_and_record().await?;
if !rnfr_resp.is_intermediate() {
return Err(Error::Http(format!(
"FTP RNFR failed: {} {}",
rnfr_resp.code, rnfr_resp.message
)));
}
send_command(&mut self.writer, &format!("RNTO {to}")).await?;
let rnto_resp = self.read_and_record().await?;
if !rnto_resp.is_complete() {
return Err(Error::Http(format!(
"FTP RNTO failed: {} {}",
rnto_resp.code, rnto_resp.message
)));
}
Ok(())
}
pub async fn site(&mut self, command: &str) -> Result<FtpResponse, Error> {
send_command(&mut self.writer, &format!("SITE {command}")).await?;
self.read_and_record().await
}
pub async fn pwd(&mut self) -> Result<String, Error> {
send_command(&mut self.writer, "PWD").await?;
let resp = self.read_and_record().await?;
if !resp.is_complete() {
return Err(Error::Http(format!("FTP PWD failed: {} {}", resp.code, resp.message)));
}
if let Some(start) = resp.message.find('"') {
if let Some(end) = resp.message[start + 1..].find('"') {
return Ok(resp.message[start + 1..start + 1 + end].to_string());
}
}
Ok(resp.message)
}
pub async fn cwd(&mut self, path: &str) -> Result<(), Error> {
send_command(&mut self.writer, &format!("CWD {path}")).await?;
let resp = self.read_and_record().await?;
if !resp.is_complete() {
return Err(Error::Http(format!("FTP CWD failed: {} {}", resp.code, resp.message)));
}
Ok(())
}
#[allow(dead_code)]
async fn navigate_to_path(&mut self, path: &str) -> Result<String, Error> {
match self.config.method {
FtpMethod::NoCwd => Ok(path.to_string()),
FtpMethod::SingleCwd => {
if let Some((dir, file)) = path.rsplit_once('/') {
if !dir.is_empty() {
self.cwd(dir).await?;
}
Ok(file.to_string())
} else {
Ok(path.to_string())
}
}
FtpMethod::MultiCwd => {
if let Some((dir, file)) = path.rsplit_once('/') {
for component in dir.split('/') {
if !component.is_empty() {
self.cwd(component).await?;
}
}
Ok(file.to_string())
} else {
Ok(path.to_string())
}
}
}
}
#[allow(dead_code)]
async fn create_dirs(&mut self, dir_path: &str) -> Result<(), Error> {
for component in dir_path.split('/') {
if component.is_empty() {
continue;
}
send_command(&mut self.writer, &format!("CWD {component}")).await?;
let cwd_resp = self.read_and_record().await?;
if cwd_resp.is_complete() {
continue;
}
send_command(&mut self.writer, &format!("MKD {component}")).await?;
let mkd_resp = self.read_and_record().await?;
if !mkd_resp.is_complete() {
return Err(Error::Http(format!(
"FTP MKD failed for '{}': {} {}",
component, mkd_resp.code, mkd_resp.message
)));
}
send_command(&mut self.writer, &format!("CWD {component}")).await?;
let retry_resp = self.read_and_record().await?;
if !retry_resp.is_complete() {
return Err(Error::Http(format!(
"FTP CWD failed after MKD for '{}': {} {}",
component, retry_resp.code, retry_resp.message
)));
}
}
let _ = self.cwd("/").await;
Ok(())
}
pub async fn quit(&mut self) -> Result<(), Error> {
let _ = send_command(&mut self.writer, "QUIT").await;
let _ = tokio::time::timeout(
std::time::Duration::from_millis(500),
read_response(&mut self.reader),
)
.await;
Ok(())
}
#[must_use]
pub fn can_reuse(&self, host: &str, port: u16, user: &str) -> bool {
self.hostname == host && self.port == port && self.user == user
}
pub fn take_connect_response_bytes(&mut self) -> Vec<u8> {
std::mem::take(&mut self.connect_response_bytes)
}
}
pub async fn read_response<S: AsyncRead + Unpin>(
stream: &mut BufReader<S>,
) -> Result<FtpResponse, Error> {
let mut full_message = String::new();
let mut final_code: Option<u16> = None;
let mut raw_bytes = Vec::new();
loop {
let mut line = String::new();
let bytes_read = stream
.read_line(&mut line)
.await
.map_err(|e| Error::Http(format!("FTP read error: {e}")))?;
if bytes_read == 0 {
return Err(Error::Http("FTP connection closed unexpectedly".to_string()));
}
if line.ends_with("\r\n") {
raw_bytes.extend_from_slice(line.as_bytes());
} else if line.ends_with('\n') {
raw_bytes.extend_from_slice(&line.as_bytes()[..line.len() - 1]);
raw_bytes.extend_from_slice(b"\r\n");
} else {
raw_bytes.extend_from_slice(line.as_bytes());
raw_bytes.extend_from_slice(b"\r\n");
}
let line = line.trim_end_matches('\n').trim_end_matches('\r');
if line.len() < 4 {
full_message.push_str(line);
full_message.push('\n');
continue;
}
let code_str = &line[..3];
let separator = line.as_bytes().get(3).copied();
if let Ok(code) = code_str.parse::<u16>() {
match separator {
Some(b' ') => {
let msg = &line[4..];
full_message.push_str(msg);
final_code = Some(code);
break;
}
Some(b'-') => {
let msg = &line[4..];
full_message.push_str(msg);
full_message.push('\n');
if final_code.is_none() {
final_code = Some(code);
}
}
_ => {
full_message.push_str(line);
full_message.push('\n');
}
}
} else {
full_message.push_str(line);
full_message.push('\n');
}
}
let code =
final_code.ok_or_else(|| Error::Http("FTP response has no status code".to_string()))?;
Ok(FtpResponse { code, message: full_message, raw_bytes })
}
pub async fn send_command<S: AsyncWrite + Unpin>(
stream: &mut S,
command: &str,
) -> Result<(), Error> {
let cmd = format!("{command}\r\n");
stream
.write_all(cmd.as_bytes())
.await
.map_err(|e| Error::Http(format!("FTP write error: {e}")))?;
stream.flush().await.map_err(|e| Error::Http(format!("FTP flush error: {e}")))?;
Ok(())
}
pub fn parse_pasv_response(message: &str) -> Result<(String, u16), Error> {
let start = message.find('(').ok_or_else(|| Error::Transfer {
code: 14,
message: "PASV response missing address".to_string(),
})?;
let end = message.find(')').ok_or_else(|| Error::Transfer {
code: 14,
message: "PASV response missing closing paren".to_string(),
})?;
let nums: Vec<u16> =
message[start + 1..end].split(',').filter_map(|s| s.trim().parse().ok()).collect();
if nums.len() != 6 {
return Err(Error::Transfer {
code: 14,
message: format!("PASV response has {} numbers, expected 6", nums.len()),
});
}
if nums[0] > 255 || nums[1] > 255 || nums[2] > 255 || nums[3] > 255 {
return Err(Error::Transfer {
code: 14,
message: format!(
"PASV response has invalid IP: {}.{}.{}.{}",
nums[0], nums[1], nums[2], nums[3]
),
});
}
if nums[4] > 255 || nums[5] > 255 {
return Err(Error::Transfer {
code: 14,
message: format!("PASV response has invalid port values: {},{}", nums[4], nums[5]),
});
}
let host = format!("{}.{}.{}.{}", nums[0], nums[1], nums[2], nums[3]);
let port = nums[4] * 256 + nums[5];
Ok((host, port))
}
pub fn parse_epsv_response(message: &str) -> Result<u16, Error> {
let start = message.find("|||").ok_or_else(|| Error::Transfer {
code: 13,
message: "EPSV response missing port delimiter".to_string(),
})?;
let rest = &message[start + 3..];
let end = rest.find('|').ok_or_else(|| Error::Transfer {
code: 13,
message: "EPSV response missing closing delimiter".to_string(),
})?;
let port_num: u32 = rest[..end].parse().map_err(|e| Error::Transfer {
code: 13,
message: format!("EPSV port parse error: {e}"),
})?;
if port_num == 0 || port_num > 65535 {
return Err(Error::Transfer {
code: 13,
message: format!("EPSV port out of range: {port_num}"),
});
}
#[allow(clippy::cast_possible_truncation)]
Ok(port_num as u16)
}
#[must_use]
pub fn parse_feat_response(message: &str) -> FtpFeatures {
let mut features = FtpFeatures::default();
for line in message.lines() {
let feature = line.trim().to_uppercase();
if feature.starts_with("EPSV") {
features.epsv = true;
} else if feature.starts_with("MLST") {
features.mlst = true;
} else if feature.starts_with("REST") && feature.contains("STREAM") {
features.rest_stream = true;
} else if feature.starts_with("SIZE") {
features.size = true;
} else if feature.starts_with("UTF8") {
features.utf8 = true;
} else if feature.starts_with("AUTH") && feature.contains("TLS") {
features.auth_tls = true;
}
if !feature.is_empty() {
features.raw.push(line.trim().to_string());
}
}
features
}
#[must_use]
pub fn format_port_command(addr: &SocketAddr) -> String {
match addr.ip() {
std::net::IpAddr::V4(ip) => {
let octets = ip.octets();
let port = addr.port();
format!(
"PORT {},{},{},{},{},{}",
octets[0],
octets[1],
octets[2],
octets[3],
port / 256,
port % 256
)
}
std::net::IpAddr::V6(_) => {
format_eprt_command(addr)
}
}
}
#[must_use]
pub fn format_eprt_command(addr: &SocketAddr) -> String {
let (proto, ip_str) = match addr.ip() {
std::net::IpAddr::V4(ip) => (1, ip.to_string()),
std::net::IpAddr::V6(ip) => (2, ip.to_string()),
};
format!("EPRT |{proto}|{ip_str}|{}|", addr.port())
}
#[allow(clippy::too_many_arguments)]
async fn connect_session(
host: &str,
port: u16,
user: &str,
pass: &str,
ssl_mode: FtpSslMode,
use_ssl: UseSsl,
tls_config: &crate::tls::TlsConfig,
config: FtpConfig,
proxy: Option<FtpProxyConfig>,
) -> Result<FtpSession, Error> {
match ssl_mode {
FtpSslMode::None => {
FtpSession::connect_maybe_proxy(host, port, user, pass, config, proxy).await
}
#[cfg(feature = "rustls")]
_ => {
if proxy.is_some() {
return Err(Error::Http("FTPS through proxy is not yet supported".to_string()));
}
FtpSession::connect_with_tls(
host, port, user, pass, ssl_mode, use_ssl, tls_config, config,
)
.await
}
#[cfg(not(feature = "rustls"))]
_ => {
let _ = (tls_config, config, use_ssl, proxy);
Err(Error::Http("FTPS requires the 'rustls' feature".to_string()))
}
}
}
#[allow(clippy::too_many_lines, clippy::too_many_arguments)]
pub async fn perform(
url: &crate::url::Url,
upload_data: Option<&[u8]>,
ssl_mode: FtpSslMode,
use_ssl: UseSsl,
tls_config: &crate::tls::TlsConfig,
resume_from: Option<u64>,
config: &FtpConfig,
credentials: Option<(&str, &str)>,
ftp_session: &mut Option<FtpSession>,
proxy: Option<FtpProxyConfig>,
) -> Result<Response, Error> {
let range_end = config.range_end;
let (host, port) = url.host_and_port()?;
let raw_path = url.path();
let decoded_path = percent_decode(raw_path);
let path = decoded_path.as_str();
let url_creds = url.credentials();
let decoded_user;
let decoded_pass;
#[allow(clippy::option_if_let_else)]
let (user, pass) = if let Some(creds) = credentials {
creds
} else if let Some((raw_user, raw_pass)) = url_creds {
decoded_user = percent_decode(raw_user);
decoded_pass = percent_decode(raw_pass);
(decoded_user.as_str(), decoded_pass.as_str())
} else {
("anonymous", "ftp@example.com")
};
let is_dir_list = path.ends_with('/') && upload_data.is_none();
let (effective_path, type_override) = parse_ftp_type(path);
let is_reuse = if let Some(existing) = ftp_session.take() {
if existing.can_reuse(&host, port, user) {
*ftp_session = Some(existing);
true
} else {
let mut old = existing;
let _ = old.quit().await;
drop(old);
false
}
} else {
false
};
if !is_reuse {
let new_session = connect_session(
&host,
port,
user,
pass,
ssl_mode,
use_ssl,
tls_config,
config.clone(),
proxy,
)
.await?;
*ftp_session = Some(new_session);
}
let Some(session) = ftp_session.as_mut() else {
return Err(Error::Http("internal: FTP session missing".to_string()));
};
let result = perform_inner(
session,
url,
upload_data,
resume_from,
config,
is_reuse,
effective_path,
type_override,
is_dir_list,
range_end,
)
.await;
if let Err(ref e) = result {
if is_connection_error(e) {
let _ = ftp_session.take();
}
}
if matches!(&result, Err(Error::UrlParse(_))) {
let _ = ftp_session.take();
}
if let Err(Error::Transfer { code, .. }) = &result {
if matches!(code, 14 | 28 | 84) {
let _ = ftp_session.take();
}
}
if let Ok(ref resp) = result {
if resp.body_error().is_some() {
let _ = ftp_session.take();
}
}
result
}
const fn is_connection_error(e: &Error) -> bool {
matches!(e, Error::Connect(_) | Error::Io(_))
}
async fn send_type_if_needed(
session: &mut FtpSession,
transfer_type: TransferType,
) -> Result<(), Error> {
if session.current_type == Some(transfer_type) {
return Ok(());
}
let cmd = match transfer_type {
TransferType::Ascii => "TYPE A",
TransferType::Binary => "TYPE I",
};
send_command(&mut session.writer, cmd).await?;
let resp = session.read_and_record().await?;
if !resp.is_complete() {
return Err(Error::Transfer {
code: 17,
message: format!("FTP TYPE failed: {} {}", resp.code, resp.message),
});
}
session.current_type = Some(transfer_type);
Ok(())
}
async fn execute_quote_commands(
session: &mut FtpSession,
commands: &[String],
) -> Result<(), Error> {
for raw_cmd in commands {
#[allow(clippy::option_if_let_else)]
let (ignore_fail, actual_cmd) = if let Some(stripped) = raw_cmd.strip_prefix('*') {
(true, stripped)
} else {
(false, raw_cmd.as_str())
};
send_command(&mut session.writer, actual_cmd).await?;
let resp = session.read_and_record().await?;
if !ignore_fail && !resp.is_complete() && !resp.is_preliminary() {
return Err(Error::Transfer {
code: 21,
message: format!(
"FTP quote command '{}' failed: {} {}",
actual_cmd, resp.code, resp.message
),
});
}
}
Ok(())
}
#[allow(clippy::too_many_lines, clippy::too_many_arguments)]
async fn perform_inner(
session: &mut FtpSession,
url: &crate::url::Url,
upload_data: Option<&[u8]>,
resume_from: Option<u64>,
config: &FtpConfig,
is_reuse: bool,
effective_path: &str,
type_override: Option<TransferType>,
is_dir_list: bool,
range_end: Option<u64>,
) -> Result<Response, Error> {
if !is_reuse {
let pwd_path = session.pwd_safe().await;
session.home_dir.clone_from(&pwd_path);
let pwd_not_slash = pwd_path.as_ref().is_some_and(|p| !p.starts_with('/'));
if pwd_not_slash {
send_command(&mut session.writer, "SYST").await?;
let syst_resp = session.read_and_record().await?;
if syst_resp.is_complete() && syst_resp.message.contains("OS/400") {
send_command(&mut session.writer, "SITE NAMEFMT 1").await?;
let _site_resp = session.read_and_record().await?;
let _pwd2 = session.pwd_safe().await;
}
}
}
if effective_path.contains('\0') {
return Err(Error::UrlParse("FTP path contains null byte".to_string()));
}
let (dir_components, filename) = if is_dir_list {
if config.method == FtpMethod::NoCwd {
(Vec::new(), String::new())
} else {
let trimmed = effective_path.trim_start_matches('/');
let trimmed = trimmed.trim_end_matches('/');
if trimmed.is_empty() {
if effective_path.starts_with("//") {
(vec!["/"], String::new())
} else {
(Vec::new(), String::new())
}
} else {
let components: Vec<&str> = trimmed.split('/').collect();
(components, String::new())
}
}
} else {
split_path_for_method(effective_path, config.method)
};
execute_quote_commands(session, &config.pre_quote).await?;
let target_dir: Vec<String> =
dir_components.iter().filter(|c| !c.is_empty()).map(ToString::to_string).collect();
let need_cwd = if is_reuse {
target_dir != session.current_dir
} else {
!target_dir.is_empty()
};
if need_cwd {
let did_reset_to_root = if is_reuse && !session.current_dir.is_empty() {
let reset_dir = session.home_dir.clone().unwrap_or_else(|| "/".to_string());
send_command(&mut session.writer, &format!("CWD {reset_dir}")).await?;
let cwd_resp = session.read_and_record().await?;
if !cwd_resp.is_complete() {
return Err(Error::Transfer {
code: 9,
message: format!(
"FTP CWD {reset_dir} failed: {} {}",
cwd_resp.code, cwd_resp.message
),
});
}
session.current_dir.clear();
true
} else {
false
};
for component in &dir_components {
if component.is_empty() {
continue;
}
if *component == "/" && did_reset_to_root {
continue;
}
send_command(&mut session.writer, &format!("CWD {component}")).await?;
let cwd_resp = session.read_and_record().await?;
if !cwd_resp.is_complete() {
if config.create_dirs {
send_command(&mut session.writer, &format!("MKD {component}")).await?;
let _mkd_resp = session.read_and_record().await?;
send_command(&mut session.writer, &format!("CWD {component}")).await?;
let retry_resp = session.read_and_record().await?;
if !retry_resp.is_complete() {
return Err(Error::Transfer {
code: 9,
message: format!(
"FTP CWD failed after MKD: {} {}",
retry_resp.code, retry_resp.message
),
});
}
} else if cwd_resp.code == 421 {
return Err(Error::Transfer {
code: 28,
message: format!(
"FTP server timeout: {} {}",
cwd_resp.code, cwd_resp.message
),
});
} else {
return Err(Error::Transfer {
code: 9,
message: format!("FTP CWD failed: {} {}", cwd_resp.code, cwd_resp.message),
});
}
}
}
session.current_dir = target_dir;
}
if config.nobody {
if is_dir_list {
let raw = std::mem::take(&mut session.header_bytes);
let headers = std::collections::HashMap::new();
let mut resp = Response::new(200, headers, Vec::new(), url.as_str().to_string());
resp.set_raw_headers(raw);
return Ok(resp);
}
let mut last_modified: Option<String> = None;
let mut content_length: Option<String> = None;
if !filename.is_empty() {
send_command(&mut session.writer, &format!("MDTM {filename}")).await?;
let mdtm_resp = session.read_and_record().await?;
if mdtm_resp.is_complete() {
let mdtm_str = mdtm_resp.message.trim();
if let Some(date) = format_mdtm_as_http_date(mdtm_str) {
last_modified = Some(date);
}
}
}
send_type_if_needed(session, TransferType::Binary).await?;
if !filename.is_empty() {
send_command(&mut session.writer, &format!("SIZE {filename}")).await?;
let size_resp = session.read_and_record().await?;
if size_resp.is_complete() {
content_length = Some(size_resp.message.trim().to_string());
}
}
send_command(&mut session.writer, "REST 0").await?;
let _rest_resp = session.read_and_record().await?;
let raw = std::mem::take(&mut session.header_bytes);
let mut body_text = String::new();
if let Some(ref lm) = last_modified {
body_text.push_str("Last-Modified: ");
body_text.push_str(lm);
body_text.push_str("\r\n");
}
if let Some(ref cl) = content_length {
body_text.push_str("Content-Length: ");
body_text.push_str(cl);
body_text.push_str("\r\n");
}
body_text.push_str("Accept-ranges: bytes\r\n");
let mut headers = std::collections::HashMap::new();
if let Some(ref cl) = content_length {
let _old = headers.insert("content-length".to_string(), cl.clone());
}
if let Some(ref lm) = last_modified {
let _old = headers.insert("last-modified".to_string(), lm.clone());
}
let mut resp =
Response::new(200, headers, body_text.into_bytes(), url.as_str().to_string());
resp.set_raw_headers(raw);
return Ok(resp);
}
if let Some(upload_bytes) = upload_data {
if let Some((cond_ts, negate)) = config.time_condition {
send_command(&mut session.writer, &format!("MDTM {filename}")).await?;
let mdtm_resp = session.read_and_record().await?;
if mdtm_resp.is_complete() {
let mdtm_str = mdtm_resp.message.trim();
if let Some(file_ts) = parse_mdtm_timestamp(mdtm_str) {
let should_skip = if negate { file_ts >= cond_ts } else { file_ts <= cond_ts };
if should_skip {
let raw = std::mem::take(&mut session.header_bytes);
let headers = std::collections::HashMap::new();
let mut resp =
Response::new(200, headers, Vec::new(), url.as_str().to_string());
resp.set_raw_headers(raw);
return Ok(resp);
}
}
}
}
let is_auto_resume = resume_from == Some(0);
let explicit_offset = resume_from.filter(|&o| o > 0);
let (mut effective_upload_data, mut use_appe) = if let Some(offset) = explicit_offset {
#[allow(clippy::cast_possible_truncation)]
let offset_usize = offset as usize;
if offset_usize >= upload_bytes.len() {
let _ = session.open_data_connection().await;
send_type_if_needed(session, TransferType::Binary).await?;
let raw = std::mem::take(&mut session.header_bytes);
let headers = std::collections::HashMap::new();
let mut resp = Response::new(200, headers, Vec::new(), url.as_str().to_string());
resp.set_raw_headers(raw);
return Ok(resp);
}
(&upload_bytes[offset_usize..], true)
} else {
(upload_bytes, config.append)
};
let pret_cmd =
if config.append { format!("APPE {filename}") } else { format!("STOR {filename}") };
let data_conn_result = session.open_data_connection_with_pret(&pret_cmd).await;
let data_conn = match data_conn_result {
Ok(s) => s,
Err(e) => {
return Err(e);
}
};
let mut data_stream = data_conn.into_stream(session, None).await?;
let upload_type = match type_override {
Some(TransferType::Ascii) => TransferType::Ascii,
_ => TransferType::Binary,
};
send_type_if_needed(session, upload_type).await?;
if is_auto_resume {
send_command(&mut session.writer, &format!("SIZE {filename}")).await?;
let size_resp = session.read_and_record().await?;
if size_resp.is_complete() {
if let Ok(remote_size) = size_resp.message.trim().parse::<u64>() {
if remote_size > 0 {
#[allow(clippy::cast_possible_truncation)]
let skip = remote_size as usize;
if skip >= upload_bytes.len() {
drop(data_stream);
let raw = std::mem::take(&mut session.header_bytes);
let headers = std::collections::HashMap::new();
let mut resp =
Response::new(200, headers, Vec::new(), url.as_str().to_string());
resp.set_raw_headers(raw);
return Ok(resp);
}
effective_upload_data = &upload_bytes[skip..];
use_appe = true;
}
}
}
}
let stor_cmd =
if use_appe { format!("APPE {filename}") } else { format!("STOR {filename}") };
send_command(&mut session.writer, &stor_cmd).await?;
let stor_resp = session.read_and_record().await?;
if !stor_resp.is_preliminary() && !stor_resp.is_complete() {
return Err(Error::Transfer {
code: 25,
message: format!("FTP STOR/APPE failed: {} {}", stor_resp.code, stor_resp.message),
});
}
let ascii_upload = config.crlf || type_override == Some(TransferType::Ascii);
if ascii_upload {
let converted = lf_to_crlf(effective_upload_data);
data_stream
.write_all(&converted)
.await
.map_err(|e| Error::Http(format!("FTP data write error: {e}")))?;
} else {
data_stream
.write_all(effective_upload_data)
.await
.map_err(|e| Error::Http(format!("FTP data write error: {e}")))?;
}
data_stream
.shutdown()
.await
.map_err(|e| Error::Http(format!("FTP data shutdown error: {e}")))?;
drop(data_stream);
let complete_resp = session.read_and_record().await?;
if !complete_resp.is_complete() {
let code = if complete_resp.code == 452 || complete_resp.code == 552 { 70 } else { 25 };
return Err(Error::Transfer {
code,
message: format!(
"FTP upload failed: {} {}",
complete_resp.code, complete_resp.message
),
});
}
execute_quote_commands(session, &config.post_quote).await?;
let raw = std::mem::take(&mut session.header_bytes);
let headers = std::collections::HashMap::new();
let mut resp = Response::new(200, headers, Vec::new(), url.as_str().to_string());
resp.set_raw_headers(raw);
return Ok(resp);
}
if is_dir_list {
let list_base = if config.list_only { "NLST" } else { "LIST" };
let data_conn_result = session.open_data_connection_with_pret(list_base).await;
let data_conn = match data_conn_result {
Ok(s) => s,
Err(e) => {
return Err(e);
}
};
let mut data_stream = data_conn.into_stream(session, None).await?;
send_type_if_needed(session, TransferType::Ascii).await?;
execute_quote_commands(session, &config.post_pasv_quote).await?;
let list_base = if config.list_only { "NLST" } else { "LIST" };
let list_cmd = if config.method == FtpMethod::NoCwd {
let path = effective_path.trim_end_matches('/');
let path =
if path.starts_with("//") { &path[1..] } else { path.trim_start_matches('/') };
if path.is_empty() {
format!("{list_base} /")
} else {
format!("{list_base} {path}")
}
} else {
list_base.to_string()
};
send_command(&mut session.writer, &list_cmd).await?;
let list_resp = session.read_and_record().await?;
if list_resp.is_negative_transient() {
drop(data_stream);
let raw = std::mem::take(&mut session.header_bytes);
let headers = std::collections::HashMap::new();
let mut resp = Response::new(200, headers, Vec::new(), url.as_str().to_string());
resp.set_raw_headers(raw);
return Ok(resp);
}
if !list_resp.is_preliminary() && !list_resp.is_complete() {
return Err(Error::Transfer {
code: 19,
message: format!("FTP LIST failed: {} {}", list_resp.code, list_resp.message),
});
}
let mut data = Vec::new();
let _ = data_stream
.read_to_end(&mut data)
.await
.map_err(|e| Error::Http(format!("FTP data read error: {e}")))?;
drop(data_stream);
if list_resp.is_preliminary() {
let complete_resp = session.read_and_record().await?;
if !complete_resp.is_complete() {
return Err(Error::Http(format!(
"FTP transfer failed: {} {}",
complete_resp.code, complete_resp.message
)));
}
}
execute_quote_commands(session, &config.post_quote).await?;
let raw = std::mem::take(&mut session.header_bytes);
let mut headers = std::collections::HashMap::new();
let _old = headers.insert("content-length".to_string(), data.len().to_string());
let mut resp = Response::new(200, headers, data, url.as_str().to_string());
resp.set_raw_headers(raw);
return Ok(resp);
}
let transfer_type = type_override.unwrap_or(if config.use_ascii {
TransferType::Ascii
} else {
TransferType::Binary
});
let use_ascii = transfer_type == TransferType::Ascii;
if let Some((cond_ts, negate)) = config.time_condition {
send_command(&mut session.writer, &format!("MDTM {filename}")).await?;
let mdtm_resp = session.read_and_record().await?;
if mdtm_resp.is_complete() {
let mdtm_str = mdtm_resp.message.trim();
if let Some(file_ts) = parse_mdtm_timestamp(mdtm_str) {
let should_skip = if negate {
file_ts >= cond_ts
} else {
file_ts <= cond_ts
};
if should_skip {
let raw = std::mem::take(&mut session.header_bytes);
let headers = std::collections::HashMap::new();
let mut resp =
Response::new(200, headers, Vec::new(), url.as_str().to_string());
resp.set_raw_headers(raw);
return Ok(resp);
}
}
}
}
let data_conn_result =
session.open_data_connection_with_pret(&format!("RETR {filename}")).await;
let data_conn = match data_conn_result {
Ok(s) => s,
Err(e) => {
return Err(e);
}
};
send_type_if_needed(session, transfer_type).await?;
execute_quote_commands(session, &config.post_pasv_quote).await?;
let mut remote_size: Option<u64> = None;
if !use_ascii && !config.ignore_content_length {
send_command(&mut session.writer, &format!("SIZE {filename}")).await?;
let size_resp = session.read_and_record().await?;
if size_resp.is_complete() {
if let Ok(sz) = size_resp.message.trim().parse::<u64>() {
remote_size = Some(sz);
}
}
}
let mut resume_from = resume_from;
let mut range_end = range_end;
if let Some(from_end) = config.range_from_end {
if let Some(sz) = remote_size {
let offset = sz.saturating_sub(from_end);
resume_from = Some(offset);
range_end = Some(sz.saturating_sub(1));
}
}
if let (Some(max_size), Some(sz)) = (config.max_filesize, remote_size) {
if sz > max_size {
drop(data_conn);
return Err(Error::Transfer {
code: 63,
message: format!("Maximum file size exceeded ({sz} > {max_size})"),
});
}
}
if let Some(offset) = resume_from {
if let Some(sz) = remote_size {
if offset > sz {
drop(data_conn);
return Err(Error::Transfer {
code: 36,
message: format!("Offset ({offset}) was beyond the end of the file ({sz})"),
});
}
if offset == sz {
drop(data_conn);
let raw = std::mem::take(&mut session.header_bytes);
let headers = std::collections::HashMap::new();
let mut resp = Response::new(200, headers, Vec::new(), url.as_str().to_string());
resp.set_raw_headers(raw);
return Ok(resp);
}
}
send_command(&mut session.writer, &format!("REST {offset}")).await?;
let rest_resp = session.read_and_record().await?;
if !rest_resp.is_intermediate() {
drop(data_conn);
return Err(Error::Transfer {
code: 36,
message: format!("FTP REST failed: {} {}", rest_resp.code, rest_resp.message),
});
}
}
send_command(&mut session.writer, &format!("RETR {filename}")).await?;
let retr_resp = session.read_and_record().await?;
if !retr_resp.is_preliminary() && !retr_resp.is_complete() {
drop(data_conn);
let code = if retr_resp.code == 425 || retr_resp.code == 421 {
10
} else if retr_resp.code == 550 {
78
} else {
19
};
return Err(Error::Transfer {
code,
message: format!("FTP RETR failed: {} {}", retr_resp.code, retr_resp.message),
});
}
let mut data_stream = match data_conn {
DataConnection::Connected(stream) => stream,
DataConnection::PendingActive { listener, use_tls } => {
let accept_fut = listener.accept();
tokio::select! {
accept_result = accept_fut => {
let (tcp, _) = accept_result
.map_err(|e| Error::Http(format!("FTP active mode accept failed: {e}")))?;
if use_tls {
session.maybe_wrap_data_tls(tcp).await?
} else {
FtpStream::Plain(tcp)
}
}
ctrl_result = read_response(&mut session.reader) => {
let ctrl_resp = ctrl_result?;
session.header_bytes.extend_from_slice(&ctrl_resp.raw_bytes);
let code = if ctrl_resp.code == 425 || ctrl_resp.code == 421 {
10
} else {
19
};
return Err(Error::Transfer {
code,
message: format!("FTP RETR failed: {} {}", ctrl_resp.code, ctrl_resp.message),
});
}
}
}
};
let mut data = Vec::new();
let start_offset = resume_from.unwrap_or(0);
if let Some(end) = range_end {
#[allow(clippy::cast_possible_truncation)]
let max_bytes = (end - start_offset + 1) as usize;
let mut limited = data_stream.take(max_bytes as u64);
let _ = limited
.read_to_end(&mut data)
.await
.map_err(|e| Error::Http(format!("FTP data read error: {e}")))?;
drop(limited);
send_command(&mut session.writer, "ABOR").await?;
let _ = session.read_and_record().await;
} else {
let _ = data_stream
.read_to_end(&mut data)
.await
.map_err(|e| Error::Http(format!("FTP data read error: {e}")))?;
drop(data_stream);
}
if range_end.is_none() {
if let Some(expected) = remote_size {
let actual = data.len() as u64 + resume_from.unwrap_or(0);
if actual < expected {
let mut headers = std::collections::HashMap::new();
let _old = headers.insert("content-length".to_string(), data.len().to_string());
let mut resp = Response::new(200, headers, data, url.as_str().to_string());
resp.set_raw_headers(std::mem::take(&mut session.header_bytes));
resp.set_body_error(Some("partial".to_string()));
return Ok(resp);
}
}
}
if retr_resp.is_preliminary() && range_end.is_none() {
let complete_resp = session.read_and_record().await?;
if !complete_resp.is_complete() {
return Err(Error::Http(format!(
"FTP transfer failed: {} {}",
complete_resp.code, complete_resp.message
)));
}
}
execute_quote_commands(session, &config.post_quote).await?;
let raw = std::mem::take(&mut session.header_bytes);
let mut headers = std::collections::HashMap::new();
let _old = headers.insert("content-length".to_string(), data.len().to_string());
let mut resp = Response::new(200, headers, data, url.as_str().to_string());
resp.set_raw_headers(raw);
Ok(resp)
}
fn percent_decode(s: &str) -> String {
let mut result = String::with_capacity(s.len());
let mut chars = s.bytes();
while let Some(b) = chars.next() {
if b == b'%' {
let hi = chars.next();
let lo = chars.next();
if let (Some(h), Some(l)) = (hi, lo) {
let hex = [h, l];
if let Ok(s) = std::str::from_utf8(&hex) {
if let Ok(val) = u8::from_str_radix(s, 16) {
result.push(val as char);
continue;
}
}
result.push('%');
result.push(h as char);
result.push(l as char);
} else {
result.push('%');
}
} else {
result.push(b as char);
}
}
result
}
fn parse_mdtm_timestamp(s: &str) -> Option<i64> {
if s.len() < 14 {
return None;
}
let year: i64 = s[0..4].parse().ok()?;
let month: i64 = s[4..6].parse().ok()?;
let day: i64 = s[6..8].parse().ok()?;
let hour: i64 = s[8..10].parse().ok()?;
let min: i64 = s[10..12].parse().ok()?;
let sec: i64 = s[12..14].parse().ok()?;
let days = days_from_date(year, month, day)?;
Some(days * 86400 + hour * 3600 + min * 60 + sec)
}
fn days_from_date(year: i64, month: i64, day: i64) -> Option<i64> {
if !(1..=12).contains(&month) || !(1..=31).contains(&day) {
return None;
}
let month_days: [i64; 12] = [0, 31, 59, 90, 120, 151, 181, 212, 243, 273, 304, 334];
#[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
let m = (month - 1) as usize;
let y = year - 1970;
let leap_years = if year > 1970 {
((year - 1) / 4 - (year - 1) / 100 + (year - 1) / 400)
- (1969 / 4 - 1969 / 100 + 1969 / 400)
} else {
0
};
let mut days = y * 365 + leap_years + month_days[m] + day - 1;
if month > 2 && (year % 4 == 0 && (year % 100 != 0 || year % 400 == 0)) {
days += 1;
}
Some(days)
}
fn format_mdtm_as_http_date(s: &str) -> Option<String> {
if s.len() < 14 {
return None;
}
let year: u32 = s[0..4].parse().ok()?;
let month: u32 = s[4..6].parse().ok()?;
let day: u32 = s[6..8].parse().ok()?;
let hour: u32 = s[8..10].parse().ok()?;
let min: u32 = s[10..12].parse().ok()?;
let sec: u32 = s[12..14].parse().ok()?;
let month_names =
["Jan", "Feb", "Mar", "Apr", "May", "Jun", "Jul", "Aug", "Sep", "Oct", "Nov", "Dec"];
#[allow(clippy::cast_sign_loss)]
let month_name = month_names.get((month - 1) as usize)?;
let ts = parse_mdtm_timestamp(s)?;
#[allow(clippy::cast_sign_loss)]
let day_of_week = ((ts / 86400 + 4) % 7) as usize; let day_names = ["Sun", "Mon", "Tue", "Wed", "Thu", "Fri", "Sat"];
let dow = day_names.get(day_of_week)?;
Some(format!("{dow}, {day:02} {month_name} {year} {hour:02}:{min:02}:{sec:02} GMT"))
}
fn parse_ftp_type(path: &str) -> (&str, Option<TransferType>) {
if let Some(pos) = path.rfind(";type=") {
let type_str = &path[pos + 6..];
let transfer_type = match type_str {
"A" | "a" => Some(TransferType::Ascii),
"I" | "i" => Some(TransferType::Binary),
_ => None,
};
if transfer_type.is_some() {
return (&path[..pos], transfer_type);
}
}
(path, None)
}
fn split_path_for_method(path: &str, method: FtpMethod) -> (Vec<&str>, String) {
let trimmed = path.trim_start_matches('/');
match method {
FtpMethod::NoCwd => {
let filename = if path.starts_with("//") { &path[1..] } else { trimmed };
(Vec::new(), filename.to_string())
}
FtpMethod::SingleCwd => {
if let Some((dir, file)) = trimmed.rsplit_once('/') {
if dir.is_empty() {
(vec!["/"], file.to_string())
} else {
(vec![dir], file.to_string())
}
} else if path.starts_with("//") {
(vec!["/"], trimmed.to_string())
} else {
(Vec::new(), trimmed.to_string())
}
}
FtpMethod::MultiCwd => {
if let Some((dir, file)) = trimmed.rsplit_once('/') {
let mut components = Vec::new();
if path.starts_with("//") {
components.push("/");
}
for component in dir.split('/') {
if !component.is_empty() {
components.push(component);
}
}
(components, file.to_string())
} else if path.starts_with("//") {
(vec!["/"], trimmed.to_string())
} else {
(Vec::new(), trimmed.to_string())
}
}
}
}
fn lf_to_crlf(data: &[u8]) -> Vec<u8> {
let mut result = Vec::with_capacity(data.len() + data.len() / 10);
let mut prev = 0u8;
for &byte in data {
if byte == b'\n' && prev != b'\r' {
result.push(b'\r');
}
result.push(byte);
prev = byte;
}
result
}
#[allow(clippy::too_many_lines)]
pub async fn download(
url: &crate::url::Url,
ssl_mode: FtpSslMode,
tls_config: &crate::tls::TlsConfig,
resume_from: Option<u64>,
config: &FtpConfig,
) -> Result<Response, Error> {
perform(
url,
None,
ssl_mode,
UseSsl::None,
tls_config,
resume_from,
config,
None,
&mut None,
None,
)
.await
}
#[allow(clippy::too_many_lines)]
pub async fn list(
url: &crate::url::Url,
ssl_mode: FtpSslMode,
tls_config: &crate::tls::TlsConfig,
config: &FtpConfig,
) -> Result<Response, Error> {
perform(url, None, ssl_mode, UseSsl::None, tls_config, None, config, None, &mut None, None)
.await
}
pub async fn upload(
url: &crate::url::Url,
data: &[u8],
ssl_mode: FtpSslMode,
tls_config: &crate::tls::TlsConfig,
config: &FtpConfig,
) -> Result<Response, Error> {
perform(
url,
Some(data),
ssl_mode,
UseSsl::None,
tls_config,
None,
config,
None,
&mut None,
None,
)
.await
}
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
use super::*;
#[tokio::test]
async fn read_simple_response() {
let data = b"220 Welcome to FTP\r\n";
let mut reader = BufReader::new(std::io::Cursor::new(data.to_vec()));
let resp = read_response(&mut reader).await.unwrap();
assert_eq!(resp.code, 220);
assert_eq!(resp.message, "Welcome to FTP");
}
#[tokio::test]
async fn read_multiline_response() {
let data = b"220-Welcome\r\n220-to the\r\n220 FTP server\r\n";
let mut reader = BufReader::new(std::io::Cursor::new(data.to_vec()));
let resp = read_response(&mut reader).await.unwrap();
assert_eq!(resp.code, 220);
assert!(resp.message.contains("Welcome"));
assert!(resp.message.contains("FTP server"));
}
#[tokio::test]
async fn read_response_connection_closed() {
let data = b"";
let mut reader = BufReader::new(std::io::Cursor::new(data.to_vec()));
let result = read_response(&mut reader).await;
assert!(result.is_err());
}
#[test]
fn parse_pasv_simple() {
let msg = "Entering Passive Mode (192,168,1,1,4,1)";
let (host, port) = parse_pasv_response(msg).unwrap();
assert_eq!(host, "192.168.1.1");
assert_eq!(port, 1025); }
#[test]
fn parse_pasv_high_port() {
let msg = "Entering Passive Mode (127,0,0,1,200,100)";
let (host, port) = parse_pasv_response(msg).unwrap();
assert_eq!(host, "127.0.0.1");
assert_eq!(port, 51300); }
#[test]
fn parse_epsv_simple() {
let msg = "Entering Extended Passive Mode (|||12345|)";
let port = parse_epsv_response(msg).unwrap();
assert_eq!(port, 12345);
}
#[test]
fn ftp_response_status_categories() {
let preliminary = FtpResponse { code: 150, message: String::new(), raw_bytes: Vec::new() };
assert!(preliminary.is_preliminary());
assert!(!preliminary.is_complete());
let complete = FtpResponse { code: 226, message: String::new(), raw_bytes: Vec::new() };
assert!(complete.is_complete());
assert!(!complete.is_intermediate());
let intermediate = FtpResponse { code: 331, message: String::new(), raw_bytes: Vec::new() };
assert!(intermediate.is_intermediate());
assert!(!intermediate.is_complete());
}
#[test]
fn parse_feat_response_full() {
let message = "Extensions supported:\n EPSV\n MLST size*;modify*;type*\n REST STREAM\n SIZE\n UTF8\n AUTH TLS";
let features = parse_feat_response(message);
assert!(features.epsv);
assert!(features.mlst);
assert!(features.rest_stream);
assert!(features.size);
assert!(features.utf8);
assert!(features.auth_tls);
}
#[test]
fn parse_feat_response_minimal() {
let message = "SIZE\nREST STREAM";
let features = parse_feat_response(message);
assert!(features.size);
assert!(features.rest_stream);
assert!(!features.epsv);
assert!(!features.mlst);
}
#[test]
fn parse_feat_response_empty() {
let features = parse_feat_response("");
assert!(!features.epsv);
assert!(!features.mlst);
assert!(!features.rest_stream);
assert!(!features.size);
assert!(!features.utf8);
assert!(!features.auth_tls);
assert!(features.raw.is_empty());
}
#[test]
fn parse_feat_response_auth_tls() {
let message = "AUTH TLS\nAUTH SSL";
let features = parse_feat_response(message);
assert!(features.auth_tls);
}
#[test]
fn transfer_type_equality() {
assert_eq!(TransferType::Ascii, TransferType::Ascii);
assert_eq!(TransferType::Binary, TransferType::Binary);
assert_ne!(TransferType::Ascii, TransferType::Binary);
}
#[test]
fn ftp_features_default() {
let features = FtpFeatures::default();
assert!(!features.epsv);
assert!(!features.mlst);
assert!(!features.rest_stream);
assert!(!features.size);
assert!(!features.utf8);
assert!(!features.auth_tls);
assert!(features.raw.is_empty());
}
#[test]
fn ftp_ssl_mode_equality() {
assert_eq!(FtpSslMode::None, FtpSslMode::None);
assert_eq!(FtpSslMode::Explicit, FtpSslMode::Explicit);
assert_eq!(FtpSslMode::Implicit, FtpSslMode::Implicit);
assert_ne!(FtpSslMode::None, FtpSslMode::Explicit);
assert_ne!(FtpSslMode::Explicit, FtpSslMode::Implicit);
}
#[test]
fn ftp_method_default() {
assert_eq!(FtpMethod::default(), FtpMethod::MultiCwd);
}
#[test]
fn ftp_method_equality() {
assert_eq!(FtpMethod::MultiCwd, FtpMethod::MultiCwd);
assert_eq!(FtpMethod::SingleCwd, FtpMethod::SingleCwd);
assert_eq!(FtpMethod::NoCwd, FtpMethod::NoCwd);
assert_ne!(FtpMethod::MultiCwd, FtpMethod::SingleCwd);
assert_ne!(FtpMethod::SingleCwd, FtpMethod::NoCwd);
}
#[test]
fn format_port_ipv4() {
let addr: SocketAddr = "192.168.1.100:12345".parse().unwrap();
let cmd = format_port_command(&addr);
assert_eq!(cmd, "PORT 192,168,1,100,48,57");
}
#[test]
fn format_port_low_port() {
let addr: SocketAddr = "10.0.0.1:21".parse().unwrap();
let cmd = format_port_command(&addr);
assert_eq!(cmd, "PORT 10,0,0,1,0,21");
}
#[test]
fn format_port_high_port() {
let addr: SocketAddr = "127.0.0.1:65535".parse().unwrap();
let cmd = format_port_command(&addr);
assert_eq!(cmd, "PORT 127,0,0,1,255,255");
}
#[test]
fn format_eprt_ipv4() {
let addr: SocketAddr = "192.168.1.100:12345".parse().unwrap();
let cmd = format_eprt_command(&addr);
assert_eq!(cmd, "EPRT |1|192.168.1.100|12345|");
}
#[test]
fn format_eprt_ipv6() {
let addr: SocketAddr = "[::1]:54321".parse().unwrap();
let cmd = format_eprt_command(&addr);
assert_eq!(cmd, "EPRT |2|::1|54321|");
}
#[test]
fn format_port_roundtrip() {
let addr: SocketAddr = "10.20.30.40:5000".parse().unwrap();
let cmd = format_port_command(&addr);
assert!(cmd.starts_with("PORT "));
let nums: Vec<&str> = cmd[5..].split(',').collect();
assert_eq!(nums.len(), 6);
let h1: u16 = nums[0].parse().unwrap();
let h2: u16 = nums[1].parse().unwrap();
let h3: u16 = nums[2].parse().unwrap();
let h4: u16 = nums[3].parse().unwrap();
let p1: u16 = nums[4].parse().unwrap();
let p2: u16 = nums[5].parse().unwrap();
assert_eq!(format!("{h1}.{h2}.{h3}.{h4}"), "10.20.30.40");
assert_eq!(p1 * 256 + p2, 5000);
}
#[tokio::test]
async fn send_command_format() {
let mut buf = Vec::new();
send_command(&mut buf, "USER test").await.unwrap();
assert_eq!(buf, b"USER test\r\n");
}
#[tokio::test]
async fn send_command_feat() {
let mut buf = Vec::new();
send_command(&mut buf, "FEAT").await.unwrap();
assert_eq!(buf, b"FEAT\r\n");
}
#[tokio::test]
async fn read_auth_tls_response() {
let data = b"234 AUTH TLS OK\r\n";
let mut reader = BufReader::new(std::io::Cursor::new(data.to_vec()));
let resp = read_response(&mut reader).await.unwrap();
assert_eq!(resp.code, 234);
assert!(resp.is_complete());
}
#[tokio::test]
async fn read_pbsz_response() {
let data = b"200 PBSZ=0\r\n";
let mut reader = BufReader::new(std::io::Cursor::new(data.to_vec()));
let resp = read_response(&mut reader).await.unwrap();
assert_eq!(resp.code, 200);
assert!(resp.is_complete());
}
#[tokio::test]
async fn read_prot_p_response() {
let data = b"200 Protection set to Private\r\n";
let mut reader = BufReader::new(std::io::Cursor::new(data.to_vec()));
let resp = read_response(&mut reader).await.unwrap();
assert_eq!(resp.code, 200);
assert!(resp.is_complete());
}
#[cfg(feature = "rustls")]
#[test]
fn tls_connector_no_alpn_creates_ok() {
let tls_config = crate::tls::TlsConfig::default();
let connector = crate::tls::TlsConnector::new_no_alpn(&tls_config);
assert!(connector.is_ok());
}
#[test]
fn ftp_config_default() {
let config = FtpConfig::default();
assert!(config.use_epsv);
assert!(config.use_eprt);
assert!(!config.skip_pasv_ip);
assert!(config.account.is_none());
assert!(!config.create_dirs);
assert_eq!(config.method, FtpMethod::MultiCwd);
assert!(config.active_port.is_none());
}
#[test]
fn ftp_config_clone() {
let config = FtpConfig {
use_epsv: false,
use_eprt: false,
skip_pasv_ip: true,
account: Some("myacct".to_string()),
create_dirs: true,
method: FtpMethod::NoCwd,
active_port: Some("-".to_string()),
..Default::default()
};
#[allow(clippy::redundant_clone)] let cloned = config.clone();
assert!(!cloned.use_epsv);
assert!(!cloned.use_eprt);
assert!(cloned.skip_pasv_ip);
assert_eq!(cloned.account.as_deref(), Some("myacct"));
assert!(cloned.create_dirs);
assert_eq!(cloned.method, FtpMethod::NoCwd);
assert_eq!(cloned.active_port.as_deref(), Some("-"));
}
}