mod data_stream;
mod tls;
use std::future::Future;
#[cfg(not(feature = "async-secure"))]
use std::marker::PhantomData;
use std::net::SocketAddr;
use std::pin::Pin;
use std::string::String;
use std::time::Duration;
use async_std::io::prelude::BufReadExt;
use async_std::io::{BufReader, Read, Write, copy};
use async_std::net::{TcpListener, TcpStream, ToSocketAddrs};
use chrono::{NaiveDate, NaiveDateTime, NaiveTime};
pub use data_stream::DataStream;
use futures_lite::AsyncWriteExt;
#[cfg(feature = "async-secure")]
pub use tls::AsyncTlsConnector;
#[cfg(feature = "async-std-async-native-tls")]
pub use tls::{AsyncNativeTlsConnector, AsyncNativeTlsStream};
pub use tls::{AsyncNoTlsStream, AsyncStdTlsStream};
#[cfg(any(
feature = "async-std-rustls-aws-lc-rs",
feature = "async-std-rustls-ring"
))]
pub use tls::{AsyncRustlsConnector, AsyncRustlsStream};
use super::super::Status;
use super::super::regex::{EPSV_PORT_RE, MDTM_RE, SIZE_RE};
use super::super::types::{FileType, FtpError, FtpResult, Mode, Response};
use crate::FtpStream;
use crate::command::Command;
#[cfg(feature = "async-secure")]
use crate::command::ProtectionLevel;
use crate::types::Features;
/// Signature of the callback used to open the data connection in passive modes.
///
/// The callback receives the socket address to connect to and returns a boxed
/// future that resolves to the connected [`TcpStream`] or an [`FtpError`].
pub type AsyncStdPassiveStreamBuilder = dyn Fn(SocketAddr) -> Pin<Box<dyn Future<Output = FtpResult<TcpStream>> + Send + Sync>>
+ Send
+ Sync;
/// Asynchronous FTP client state: the control connection plus the settings
/// used to open data connections.
///
/// `T` is the TLS stream type used when the connection is secured.
pub struct ImplAsyncFtpStream<T>
where
T: AsyncStdTlsStream + Send,
{
/// Buffered control connection (plain TCP or TLS).
reader: BufReader<DataStream<T>>,
/// How data connections are established (active, passive, extended passive).
mode: Mode,
/// When set, `pasv` replaces the IP announced by the server with the
/// peer address of the control connection.
nat_workaround: bool,
/// First response received from the server after connecting, if it could
/// be decoded as a string.
welcome_msg: Option<String>,
/// Maximum time to wait for the server to connect back in active mode.
active_timeout: Duration,
/// Callback that opens the TCP data connection in passive modes.
passive_stream_builder: Box<AsyncStdPassiveStreamBuilder>,
/// Set while a data stream handed out by `data_command` has not been
/// finalized; used to refuse a second concurrent data connection.
data_connection_open: bool,
/// Keeps `T` in the type when no TLS support is compiled in.
#[cfg(not(feature = "async-secure"))]
marker: PhantomData<T>,
/// TLS connector used to secure data connections, once the control
/// connection has been secured.
#[cfg(feature = "async-secure")]
tls_ctx: Option<Box<dyn AsyncTlsConnector<Stream = T> + Send + Sync + 'static>>,
/// Domain passed to the TLS connector for data connections.
#[cfg(feature = "async-secure")]
domain: Option<String>,
}
impl<T> ImplAsyncFtpStream<T>
where
T: AsyncStdTlsStream + Send,
{
/// Opens a TCP connection to `addr` and waits for the server greeting.
///
/// # Errors
///
/// Returns [`FtpError::ConnectionError`] if the TCP connection cannot be
/// established, or whatever [`Self::connect_with_stream`] returns.
pub async fn connect<A: ToSocketAddrs>(addr: A) -> FtpResult<Self> {
    debug!("Connecting to server");
    let tcp = match TcpStream::connect(addr).await {
        Ok(tcp) => tcp,
        Err(err) => return Err(FtpError::ConnectionError(err)),
    };
    debug!("Established connection with server");
    Self::connect_with_stream(tcp).await
}
/// Like [`Self::connect`], but gives up on the TCP connection after `timeout`.
///
/// # Errors
///
/// Returns [`FtpError::ConnectionError`] if connecting fails or times out.
pub async fn connect_timeout(addr: SocketAddr, timeout: Duration) -> FtpResult<Self> {
    debug!("Connecting to server {addr}");
    let connecting = async move { TcpStream::connect(addr).await };
    let tcp = match async_std::io::timeout(timeout, connecting).await {
        Ok(tcp) => tcp,
        Err(err) => return Err(FtpError::ConnectionError(err)),
    };
    Self::connect_with_stream(tcp).await
}
pub async fn connect_with_stream(stream: TcpStream) -> FtpResult<Self> {
debug!("Established connection with server");
let mut ftp_stream = ImplAsyncFtpStream {
reader: BufReader::new(DataStream::Tcp(stream)),
#[cfg(not(feature = "async-secure"))]
marker: PhantomData {},
mode: Mode::Passive,
data_connection_open: false,
nat_workaround: false,
passive_stream_builder: Self::default_passive_stream_builder(),
welcome_msg: None,
#[cfg(feature = "async-secure")]
tls_ctx: None,
#[cfg(feature = "async-secure")]
domain: None,
active_timeout: Duration::from_secs(60),
};
debug!("Reading server response...");
match ftp_stream.read_response(Status::Ready).await {
Ok(response) => {
let welcome_msg = response.as_string().ok();
debug!("Server READY; response: {:?}", welcome_msg);
ftp_stream.welcome_msg = welcome_msg;
Ok(ftp_stream)
}
Err(err) => Err(err),
}
}
/// Upgrades the control connection to TLS (explicit mode).
///
/// Sends `AUTH`, wraps the control stream with `tls_connector`, then sends
/// `PBSZ 0` and `PROT P`. The connector and `domain` are kept so that later
/// data connections are secured as well.
///
/// # Errors
///
/// Returns [`FtpError::SecureError`] if the TLS handshake fails, or the error
/// of any rejected command.
#[cfg(feature = "async-secure")]
#[cfg_attr(docsrs, doc(cfg(feature = "async-secure")))]
pub async fn into_secure(
    mut self,
    tls_connector: impl AsyncTlsConnector<Stream = T> + Send + Sync + 'static,
    domain: &str,
) -> FtpResult<Self> {
    debug!("Initializing TLS auth");
    self.perform(Command::Auth).await?;
    self.read_response(Status::AuthOk).await?;
    debug!("TLS OK; initializing ssl stream");
    // `into_tcp_stream` already yields an owned stream; no clone is needed.
    let tcp_stream = self.reader.into_inner().into_tcp_stream();
    let stream = tls_connector
        .connect(domain, tcp_stream)
        .await
        .map_err(|e| FtpError::SecureError(format!("{e}")))?;
    let mut secured_ftp_stream = ImplAsyncFtpStream {
        reader: BufReader::new(DataStream::Ssl(Box::new(stream))),
        mode: self.mode,
        data_connection_open: self.data_connection_open,
        nat_workaround: self.nat_workaround,
        passive_stream_builder: self.passive_stream_builder,
        tls_ctx: Some(Box::new(tls_connector)),
        domain: Some(String::from(domain)),
        welcome_msg: self.welcome_msg,
        active_timeout: self.active_timeout,
    };
    // Set the protection buffer size, then require a private data channel.
    secured_ftp_stream.perform(Command::Pbsz(0)).await?;
    secured_ftp_stream.read_response(Status::CommandOk).await?;
    secured_ftp_stream
        .perform(Command::Prot(ProtectionLevel::Private))
        .await?;
    secured_ftp_stream.read_response(Status::CommandOk).await?;
    Ok(secured_ftp_stream)
}
/// Connects to `addr` and immediately performs the TLS handshake (implicit
/// mode), before reading the server greeting.
///
/// The connector and `domain` are kept so that later data connections are
/// secured as well.
///
/// # Errors
///
/// Returns [`FtpError::ConnectionError`] if the TCP connection fails,
/// [`FtpError::SecureError`] if the handshake fails, or an error if the
/// greeting is not a `Ready` status.
#[cfg(all(feature = "async-secure", feature = "deprecated"))]
#[cfg_attr(
    docsrs,
    doc(cfg(all(feature = "async-secure", feature = "deprecated")))
)]
pub async fn connect_secure_implicit<A: ToSocketAddrs>(
    addr: A,
    tls_connector: impl AsyncTlsConnector<Stream = T> + Send + Sync + 'static,
    domain: &str,
) -> FtpResult<Self> {
    debug!("Connecting to server (secure)");
    let tcp_stream = TcpStream::connect(addr)
        .await
        .map_err(FtpError::ConnectionError)?;
    debug!("Established connection with server");
    debug!("TLS OK; initializing ssl stream");
    let tls_stream = tls_connector
        .connect(domain, tcp_stream)
        .await
        .map_err(|e| FtpError::SecureError(format!("{e}")))?;
    debug!("TLS Stream OK");
    let mut stream = ImplAsyncFtpStream {
        reader: BufReader::new(DataStream::Ssl(Box::new(tls_stream))),
        mode: Mode::Passive,
        nat_workaround: false,
        data_connection_open: false,
        passive_stream_builder: Self::default_passive_stream_builder(),
        tls_ctx: Some(Box::new(tls_connector)),
        domain: Some(String::from(domain)),
        welcome_msg: None,
        active_timeout: Duration::from_secs(60),
    };
    debug!("Reading server response...");
    let response = stream.read_response(Status::Ready).await?;
    let welcome_msg = response.as_string().ok();
    debug!("Server READY; response: {:?}", welcome_msg);
    stream.welcome_msg = welcome_msg;
    Ok(stream)
}
pub fn active_mode(mut self, listener_timeout: Duration) -> Self {
self.mode = Mode::Active;
self.active_timeout = listener_timeout;
self
}
pub fn passive_stream_builder<F>(mut self, stream_builder: F) -> Self
where
F: Fn(SocketAddr) -> Pin<Box<dyn Future<Output = FtpResult<TcpStream>> + Send + Sync>>
+ Send
+ Sync
+ 'static,
{
self.passive_stream_builder = Box::new(stream_builder);
self
}
/// Returns the greeting received when the connection was established, if one
/// was stored.
pub fn get_welcome_msg(&self) -> Option<&str> {
    match &self.welcome_msg {
        Some(msg) => Some(msg.as_str()),
        None => None,
    }
}
/// Sets how subsequent data connections are established.
pub fn set_mode(&mut self, mode: Mode) {
    self.mode = mode;
    debug!("Changed mode to {:?}", self.mode);
}
/// Enables or disables the passive-mode NAT workaround.
///
/// When enabled, `pasv` connects to the control connection's peer address
/// (with the port announced by the server) instead of the announced address.
pub fn set_passive_nat_workaround(&mut self, nat_workaround: bool) {
self.nat_workaround = nat_workaround;
}
/// Returns a shared reference to the TCP stream underlying the control
/// connection.
pub fn get_ref(&self) -> &TcpStream {
self.reader.get_ref().get_ref()
}
/// Authenticates with `USER` and, only if the server asks for it, `PASS`.
///
/// # Errors
///
/// Fails if the server answers either command with an unexpected status.
pub async fn login<S: AsRef<str>>(&mut self, user: S, password: S) -> FtpResult<()> {
    let username = user.as_ref();
    debug!("Signin in with user '{}'", username);
    self.perform(Command::User(username.to_string())).await?;
    let user_reply = self
        .read_response_in(&[Status::LoggedIn, Status::NeedPassword])
        .await?;
    // Some servers log the user in without asking for a password.
    if user_reply.status == Status::NeedPassword {
        debug!("Password is required");
        let secret = password.as_ref().to_string();
        self.perform(Command::Pass(secret)).await?;
        self.read_response(Status::LoggedIn).await?;
    }
    debug!("Login OK");
    Ok(())
}
/// Sends `CCC` and, once accepted, continues the control connection over the
/// plain TCP stream.
///
/// The stored TLS connector is left untouched.
#[cfg(feature = "async-secure")]
#[cfg_attr(docsrs, doc(cfg(feature = "async-secure")))]
pub async fn clear_command_channel(mut self) -> FtpResult<Self> {
    debug!("performing clear command channel");
    self.perform(Command::ClearCommandChannel).await?;
    self.read_response(Status::CommandOk).await?;
    trace!("CCC OK");
    let plain = self.reader.into_inner().into_tcp_stream();
    self.reader = BufReader::new(DataStream::Tcp(plain));
    Ok(self)
}
/// Changes the working directory to `path`.
///
/// Accepts either a `CommandOk` or a `RequestedFileActionOk` reply.
pub async fn cwd<S: AsRef<str>>(&mut self, path: S) -> FtpResult<()> {
    let target = path.as_ref();
    debug!("Changing working directory to {}", target);
    self.perform(Command::Cwd(target.to_string())).await?;
    self.read_response_in(&[Status::CommandOk, Status::RequestedFileActionOk])
        .await?;
    Ok(())
}
/// Moves to the parent of the current working directory.
///
/// Accepts either a `CommandOk` or a `RequestedFileActionOk` reply.
pub async fn cdup(&mut self) -> FtpResult<()> {
    debug!("Going to parent directory");
    self.perform(Command::Cdup).await?;
    self.read_response_in(&[Status::CommandOk, Status::RequestedFileActionOk])
        .await?;
    Ok(())
}
/// Returns the current working directory.
///
/// The path is taken from between the first and the last double quote of the
/// reply.
///
/// # Errors
///
/// Returns [`FtpError::BadResponse`] if the reply is not a valid string and
/// [`FtpError::UnexpectedResponse`] if it contains no quoted path.
pub async fn pwd(&mut self) -> FtpResult<String> {
    debug!("Getting working directory");
    self.perform(Command::Pwd).await?;
    let reply = self.read_response(Status::PathCreated).await?;
    let text = reply.as_string().map_err(|_| FtpError::BadResponse)?;
    let quotes = text
        .find('"')
        .zip(text.rfind('"'))
        .filter(|(open, close)| open < close);
    match quotes {
        Some((open, close)) => Ok(text[open + 1..close].to_string()),
        None => Err(FtpError::UnexpectedResponse(Response::new(
            reply.status,
            reply.body,
        ))),
    }
}
/// Sends `NOOP` and expects a `CommandOk` reply.
pub async fn noop(&mut self) -> FtpResult<()> {
    debug!("Pinging server");
    self.perform(Command::Noop).await?;
    self.read_response(Status::CommandOk).await?;
    Ok(())
}
/// Sends `EPRT` with the given address and expects a `CommandOk` reply.
pub async fn eprt(&mut self, address: SocketAddr) -> FtpResult<()> {
    debug!("EPRT with address {address}");
    self.perform(Command::Eprt(address)).await?;
    self.read_response(Status::CommandOk).await?;
    Ok(())
}
/// Creates the directory `pathname` on the server.
pub async fn mkdir<S: AsRef<str>>(&mut self, pathname: S) -> FtpResult<()> {
    let dir = pathname.as_ref();
    debug!("Creating directory at {}", dir);
    self.perform(Command::Mkd(dir.to_string())).await?;
    self.read_response(Status::PathCreated).await?;
    Ok(())
}
/// Sets the transfer type (`TYPE`) used for subsequent transfers.
pub async fn transfer_type(&mut self, file_type: FileType) -> FtpResult<()> {
    debug!("Setting transfer type {}", file_type);
    self.perform(Command::Type(file_type)).await?;
    self.read_response(Status::CommandOk).await?;
    Ok(())
}
/// Sends `QUIT` and expects a `Closing` reply.
pub async fn quit(&mut self) -> FtpResult<()> {
    debug!("Quitting stream");
    self.perform(Command::Quit).await?;
    self.read_response(Status::Closing).await?;
    Ok(())
}
/// Renames `from_name` to `to_name` with a `RNFR` / `RNTO` pair.
///
/// # Errors
///
/// Fails if the server rejects either step; `RNTO` is not sent when `RNFR`
/// was rejected.
pub async fn rename<S: AsRef<str>>(&mut self, from_name: S, to_name: S) -> FtpResult<()> {
    let (source, destination) = (from_name.as_ref(), to_name.as_ref());
    debug!("Renaming '{}' to '{}'", source, destination);
    self.perform(Command::RenameFrom(source.to_string())).await?;
    self.read_response(Status::RequestFilePending).await?;
    self.perform(Command::RenameTo(destination.to_string()))
        .await?;
    self.read_response(Status::RequestedFileActionOk).await?;
    Ok(())
}
/// Retrieves `file_name` and hands the data stream to `reader`.
///
/// `reader` must return its result together with the stream, which is then
/// finalized before the result is returned.
///
/// # Errors
///
/// Propagates errors from opening the transfer, from `reader`, and from
/// finalizing the stream.
pub async fn retr<S, F, U>(&mut self, file_name: S, mut reader: F) -> FtpResult<U>
where
    F: FnMut(
        DataStream<T>,
    ) -> Pin<Box<dyn Future<Output = FtpResult<(U, DataStream<T>)>> + Send>>,
    S: AsRef<str>,
{
    let data_stream = self.retr_as_stream(file_name).await?;
    let (output, data_stream) = reader(data_stream).await?;
    self.finalize_retr_stream(data_stream).await?;
    Ok(output)
}
/// Starts retrieving `file_name` and returns the data stream to read from.
///
/// The caller must pass the stream to [`Self::finalize_retr_stream`] once
/// done.
///
/// # Errors
///
/// Fails if the data connection cannot be opened or the server does not
/// answer with `AboutToSend` / `AlreadyOpen`.
pub async fn retr_as_stream<S: AsRef<str>>(
    &mut self,
    file_name: S,
) -> FtpResult<DataStream<T>> {
    debug!("Retrieving '{}'", file_name.as_ref());
    let data_stream = self
        .data_command(Command::Retr(file_name.as_ref().to_string()))
        .await?;
    match self
        .read_response_in(&[Status::AboutToSend, Status::AlreadyOpen])
        .await
    {
        Ok(_) => Ok(data_stream),
        Err(err) => {
            // The transfer never started and the caller gets no stream to
            // finalize: close it here and clear the flag, otherwise every
            // later data command is refused as "already open".
            drop(data_stream);
            self.data_connection_open = false;
            Err(err)
        }
    }
}
/// Closes a retrieval data stream and reads the server's completion reply.
///
/// Dropping `stream` closes the data connection; the reply must be
/// `ClosingDataConnection` or `RequestedFileActionOk`.
pub async fn finalize_retr_stream(&mut self, stream: impl Read) -> FtpResult<()> {
    debug!("Finalizing retr stream");
    drop(stream);
    self.data_connection_open = false;
    trace!("dropped stream");
    let accepted = [Status::ClosingDataConnection, Status::RequestedFileActionOk];
    self.read_response_in(&accepted).await?;
    Ok(())
}
/// Removes the directory `pathname` from the server.
pub async fn rmdir<S: AsRef<str>>(&mut self, pathname: S) -> FtpResult<()> {
    let dir = pathname.as_ref();
    debug!("Removing directory {}", dir);
    self.perform(Command::Rmd(dir.to_string())).await?;
    self.read_response(Status::RequestedFileActionOk).await?;
    Ok(())
}
/// Deletes the file `filename` from the server.
pub async fn rm<S: AsRef<str>>(&mut self, filename: S) -> FtpResult<()> {
    let file = filename.as_ref();
    debug!("Removing file {}", file);
    self.perform(Command::Dele(file.to_string())).await?;
    self.read_response(Status::RequestedFileActionOk).await?;
    Ok(())
}
/// Uploads everything read from `r` to `filename` and returns the number of
/// bytes copied.
///
/// # Errors
///
/// Returns [`FtpError::ConnectionError`] if copying fails, or the error from
/// opening or finalizing the transfer.
pub async fn put_file<S, R>(&mut self, filename: S, r: &mut R) -> FtpResult<u64>
where
    R: Read + std::marker::Unpin,
    S: AsRef<str>,
{
    let mut upload = self.put_with_stream(filename).await?;
    let written = match copy(r, &mut upload).await {
        Ok(count) => count,
        Err(err) => return Err(FtpError::ConnectionError(err)),
    };
    self.finalize_put_stream(upload).await?;
    Ok(written)
}
/// Starts storing `filename` and returns the data stream to write to.
///
/// The caller must pass the stream to [`Self::finalize_put_stream`] once
/// done.
///
/// # Errors
///
/// Fails if the data connection cannot be opened or the server does not
/// answer with `AlreadyOpen` / `AboutToSend`.
pub async fn put_with_stream<S: AsRef<str>>(
    &mut self,
    filename: S,
) -> FtpResult<DataStream<T>> {
    debug!("Put file {}", filename.as_ref());
    let stream = self
        .data_command(Command::Store(filename.as_ref().to_string()))
        .await?;
    match self
        .read_response_in(&[Status::AlreadyOpen, Status::AboutToSend])
        .await
    {
        Ok(_) => Ok(stream),
        Err(err) => {
            // The upload was refused and the caller gets no stream to
            // finalize: close it here and clear the flag so later data
            // commands are not rejected as "already open".
            drop(stream);
            self.data_connection_open = false;
            Err(err)
        }
    }
}
/// Closes an upload data stream and reads the server's completion reply.
///
/// The stream is consumed whether or not closing succeeds, so the
/// data-connection flag is always cleared.
///
/// # Errors
///
/// Returns [`FtpError::ConnectionError`] if closing the stream fails (the
/// completion reply is not read in that case), or an error if the reply is
/// not `ClosingDataConnection` / `RequestedFileActionOk`.
pub async fn finalize_put_stream(&mut self, mut stream: impl Write + Unpin) -> FtpResult<()> {
    debug!("Finalizing put stream");
    let close_result = stream.close().await;
    drop(stream);
    // Clear the flag before propagating a close error: the stream is gone
    // either way, and a stale flag would block every later data command.
    self.data_connection_open = false;
    trace!("Stream dropped");
    close_result.map_err(FtpError::ConnectionError)?;
    self.read_response_in(&[Status::ClosingDataConnection, Status::RequestedFileActionOk])
        .await
        .map(|_| ())
}
/// Starts appending to `filename` and returns the data stream to write to.
///
/// The caller must pass the stream to [`Self::finalize_put_stream`] once
/// done.
///
/// # Errors
///
/// Fails if the data connection cannot be opened or the server does not
/// answer with `AlreadyOpen` / `AboutToSend`.
pub async fn append_with_stream<S: AsRef<str>>(
    &mut self,
    filename: S,
) -> FtpResult<DataStream<T>> {
    debug!("Appending to file {}", filename.as_ref());
    let stream = self
        .data_command(Command::Appe(filename.as_ref().to_string()))
        .await?;
    match self
        .read_response_in(&[Status::AlreadyOpen, Status::AboutToSend])
        .await
    {
        Ok(_) => Ok(stream),
        Err(err) => {
            // The append was refused and the caller gets no stream to
            // finalize: close it here and clear the flag so later data
            // commands are not rejected as "already open".
            drop(stream);
            self.data_connection_open = false;
            Err(err)
        }
    }
}
/// Appends everything read from `r` to `filename` and returns the number of
/// bytes copied.
///
/// # Errors
///
/// Returns [`FtpError::ConnectionError`] if copying fails, or the error from
/// opening or finalizing the transfer.
pub async fn append_file<R>(&mut self, filename: &str, r: &mut R) -> FtpResult<u64>
where
    R: Read + std::marker::Unpin,
{
    let mut data_stream = self.append_with_stream(filename).await?;
    let bytes = copy(r, &mut data_stream)
        .await
        .map_err(FtpError::ConnectionError)?;
    // Pass the stream directly, as `put_file` does; boxing it is unnecessary.
    self.finalize_put_stream(data_stream).await?;
    Ok(bytes)
}
/// Aborts the transfer running on `data_stream`.
///
/// Sends `ABOR`, drops the data stream, and reads the server's replies. When
/// the server first answers `TransferAborted`, a following
/// `ClosingDataConnection` reply is read as well.
pub async fn abort<R>(&mut self, data_stream: R) -> FtpResult<()>
where
    R: Read + std::marker::Unpin + 'static,
{
    debug!("Aborting active file transfer");
    self.perform(Command::Abor).await?;
    drop(data_stream);
    self.data_connection_open = false;
    trace!("dropped stream");
    let first_reply = self
        .read_response_in(&[Status::ClosingDataConnection, Status::TransferAborted])
        .await?;
    let needs_closing_reply = first_reply.status == Status::TransferAborted;
    if needs_closing_reply {
        self.read_response(Status::ClosingDataConnection).await?;
    }
    trace!("Transfer aborted");
    Ok(())
}
/// Asks the server (`REST`) to start the next transfer at `offset`.
pub async fn resume_transfer(&mut self, offset: usize) -> FtpResult<()> {
    debug!("Requesting to resume transfer at offset {}", offset);
    self.perform(Command::Rest(offset)).await?;
    let _pending = self.read_response(Status::RequestFilePending).await?;
    debug!("Resume transfer accepted");
    Ok(())
}
/// Runs `LIST` on `pathname` (or the working directory when `None`) and
/// returns the non-empty lines of the listing.
pub async fn list(&mut self, pathname: Option<&str>) -> FtpResult<Vec<String>> {
    debug!(
        "Reading {} directory content",
        pathname.unwrap_or("working")
    );
    let command = Command::List(pathname.map(String::from));
    self.stream_lines(command, Status::AboutToSend).await
}
/// Runs `NLST` on `pathname` (or the working directory when `None`) and
/// returns the non-empty lines of the listing.
pub async fn nlst(&mut self, pathname: Option<&str>) -> FtpResult<Vec<String>> {
    debug!(
        "Getting file names for {} directory",
        pathname.unwrap_or("working")
    );
    let command = Command::Nlst(pathname.map(String::from));
    self.stream_lines(command, Status::AboutToSend).await
}
/// Runs `MLSD` on `pathname` (or the working directory when `None`) and
/// returns the non-empty lines of the listing.
pub async fn mlsd(&mut self, pathname: Option<&str>) -> FtpResult<Vec<String>> {
    debug!(
        "Reading {} directory content",
        pathname.unwrap_or("working")
    );
    let command = Command::Mlsd(pathname.map(String::from));
    self.stream_lines(command, Status::AboutToSend).await
}
/// Runs `MLST` on `pathname` (or the working directory when `None`) and
/// returns the trimmed second line of the reply.
///
/// # Errors
///
/// Returns [`FtpError::BadResponse`] if the reply has no second line or that
/// line is empty.
pub async fn mlst(&mut self, pathname: Option<&str>) -> FtpResult<String> {
    debug!("Reading {} path information", pathname.unwrap_or("working"));
    self.perform(Command::Mlst(pathname.map(String::from)))
        .await?;
    let reply = self
        .read_response_in(&[Status::RequestedFileActionOk])
        .await?;
    let text = String::from_utf8_lossy(&reply.body).to_string();
    text.lines()
        .nth(1)
        .filter(|entry| !entry.is_empty())
        .map(|entry| entry.trim().to_string())
        .ok_or(FtpError::BadResponse)
}
/// Returns the modification time of `pathname` as reported by `MDTM`.
///
/// # Errors
///
/// Returns [`FtpError::BadResponse`] if the reply does not match the expected
/// timestamp pattern or does not describe a valid date and time.
pub async fn mdtm<S: AsRef<str>>(&mut self, pathname: S) -> FtpResult<NaiveDateTime> {
    debug!("Getting modification time for {}", pathname.as_ref());
    self.perform(Command::Mdtm(pathname.as_ref().to_string()))
        .await?;
    let reply: Response = self.read_response(Status::File).await?;
    let text = reply.as_string().map_err(|_| FtpError::BadResponse)?;
    let caps = MDTM_RE.captures(&text).ok_or(FtpError::BadResponse)?;
    // Capture groups 2..=6 are month, day, hour, minute, second.
    let number = |group: usize| {
        caps[group]
            .parse::<u32>()
            .map_err(|_| FtpError::BadResponse)
    };
    let year = caps[1].parse::<i32>().map_err(|_| FtpError::BadResponse)?;
    let (month, day) = (number(2)?, number(3)?);
    let (hour, minute, second) = (number(4)?, number(5)?, number(6)?);
    let date = NaiveDate::from_ymd_opt(year, month, day).ok_or(FtpError::BadResponse)?;
    let time = NaiveTime::from_hms_opt(hour, minute, second).ok_or(FtpError::BadResponse)?;
    Ok(NaiveDateTime::new(date, time))
}
/// Returns the size of `pathname` as reported by `SIZE`.
///
/// # Errors
///
/// Returns [`FtpError::BadResponse`] if the reply cannot be parsed.
pub async fn size<S: AsRef<str>>(&mut self, pathname: S) -> FtpResult<usize> {
    debug!("Getting file size for {}", pathname.as_ref());
    self.perform(Command::Size(pathname.as_ref().to_string()))
        .await?;
    let reply: Response = self.read_response(Status::File).await?;
    let text = reply.as_string().map_err(|_| FtpError::BadResponse)?;
    let caps = SIZE_RE.captures(&text).ok_or(FtpError::BadResponse)?;
    caps[1].parse().map_err(|_| FtpError::BadResponse)
}
/// Queries the features supported by the server (`FEAT`).
///
/// The first reply line is read through `read_response`; for a multi-line
/// reply the remaining lines are then read until the terminating line or end
/// of stream, and all lines are handed to the feature parser.
///
/// # Errors
///
/// Fails if the server does not answer with a `System` status, if reading a
/// line fails, or if the feature parser rejects the reply.
pub async fn feat(&mut self) -> FtpResult<Features> {
    debug!("Getting server supported features");
    self.perform(Command::Feat).await?;
    let response = self.read_response(Status::System).await?;
    let first_line = String::from_utf8_lossy(&response.body);
    debug!("FEAT response: {}", first_line);
    let mut feat_lines = vec![first_line.to_string()];
    // A reply starting with "<code><space>" is complete on its own line.
    // Waiting for more lines would block until the server sends something
    // unrelated or closes the connection.
    let single_line_reply = response.body.get(3) == Some(&b' ');
    if !single_line_reply {
        loop {
            let mut line = Vec::new();
            let bytes_read = self.read_line(&mut line).await?;
            if bytes_read == 0 {
                break;
            }
            let line = String::from_utf8_lossy(&line);
            trace!("FEAT IN: {:?}", line);
            feat_lines.push(line.to_string());
            if crate::command::feat::is_last_line(&line) {
                break;
            }
        }
    }
    crate::command::feat::parse_features(&feat_lines)
}
/// Sets a server option (`OPTS`), optionally with a value.
///
/// # Errors
///
/// Fails if the server does not answer with `CommandOk`.
pub async fn opts(
    &mut self,
    option: impl ToString,
    value: Option<impl ToString>,
) -> FtpResult<()> {
    let option = option.to_string();
    let value = value.map(|x| x.to_string());
    debug!("Setting option {} with value {:?}", option, value);
    self.perform(Command::Opts(option, value)).await?;
    self.read_response(Status::CommandOk).await?;
    Ok(())
}
/// Sends a `SITE` command and returns the server's `CommandOk` reply.
pub async fn site(&mut self, command: impl ToString) -> FtpResult<Response> {
    // Convert once so the logged text is exactly what is sent.
    let command = command.to_string();
    debug!("Sending SITE command: {}", command);
    self.perform(Command::Site(command)).await?;
    self.read_response(Status::CommandOk).await
}
/// Sends an arbitrary command on the control connection and returns the
/// reply, which must carry one of the `expected_code` statuses.
pub async fn custom_command(
    &mut self,
    command: impl ToString,
    expected_code: &[Status],
) -> FtpResult<Response> {
    let raw = command.to_string();
    debug!("Sending custom command: {}", raw);
    self.perform(Command::Custom(raw)).await?;
    let reply = self.read_response_in(expected_code).await?;
    Ok(reply)
}
/// Sends an arbitrary command that uses a data connection and returns the
/// reply together with the data stream.
///
/// The caller must finalize the stream, e.g. with
/// [`Self::close_data_connection`].
///
/// # Errors
///
/// Fails if the data connection cannot be opened or the reply status is not
/// in `expected_code`.
pub async fn custom_data_command(
    &mut self,
    command: impl ToString,
    expected_code: &[Status],
) -> FtpResult<(Response, DataStream<T>)> {
    let command = command.to_string();
    debug!("Sending custom data command: {}", command);
    let data_stream = self.data_command(Command::Custom(command)).await?;
    match self.read_response_in(expected_code).await {
        Ok(response) => Ok((response, data_stream)),
        Err(err) => {
            // The caller gets no stream to finalize: close it here and clear
            // the flag so later data commands are not rejected as "already
            // open".
            drop(data_stream);
            self.data_connection_open = false;
            Err(err)
        }
    }
}
/// Closes a data stream and reads the server's completion reply.
///
/// Dropping `stream` closes the data connection; the reply must be
/// `ClosingDataConnection` or `RequestedFileActionOk`.
pub async fn close_data_connection(&mut self, stream: impl Read) -> FtpResult<()> {
    debug!("closing data connection");
    drop(stream);
    self.data_connection_open = false;
    trace!("dropped stream");
    let accepted = [Status::ClosingDataConnection, Status::RequestedFileActionOk];
    self.read_response_in(&accepted).await?;
    Ok(())
}
/// Reads `data_stream` to the end and returns its non-empty lines.
///
/// Each line is decoded lossily as UTF-8 and stripped of one trailing `\n`
/// and one trailing `\r`.
///
/// # Errors
///
/// Returns [`FtpError::BadResponse`] if reading from the stream fails.
pub async fn get_lines_from_stream(
    data_stream: &mut BufReader<DataStream<T>>,
) -> FtpResult<Vec<String>> {
    let mut lines: Vec<String> = Vec::new();
    loop {
        let mut raw = vec![];
        let read = match data_stream.read_until(b'\n', &mut raw).await {
            Ok(read) => read,
            Err(err) => {
                error!("failed to get lines from stream: {err}");
                return Err(FtpError::BadResponse);
            }
        };
        if read == 0 {
            break;
        }
        let decoded = String::from_utf8_lossy(&raw[..read]).to_string();
        trace!("STREAM IN: {:?}", decoded);
        let without_lf = decoded.strip_suffix('\n').unwrap_or(&decoded);
        let content = without_lf.strip_suffix('\r').unwrap_or(without_lf);
        if !content.is_empty() {
            lines.push(content.to_string());
        }
    }
    trace!("Lines from stream {:?}", lines);
    Ok(lines)
}
/// Opens a data connection according to the current mode, sends `cmd`, and
/// returns the data stream.
///
/// In active mode a local listener is announced first and the server's
/// connection is awaited for at most `active_timeout`. In the passive modes
/// the address is negotiated first and the stream is opened through the
/// passive stream builder after the command is sent. When a TLS connector is
/// stored, the TCP stream is wrapped in TLS.
///
/// # Errors
///
/// Returns `DataConnectionAlreadyOpen` if a data stream is still open, and
/// connection, timeout, or TLS errors otherwise.
async fn data_command(&mut self, cmd: Command) -> FtpResult<DataStream<T>> {
    self.guard_multiple_data_connections()?;
    let tcp = match self.mode {
        Mode::Active => {
            let listener = self.active().await?;
            self.perform(cmd).await?;
            let accepted =
                async_std::future::timeout(self.active_timeout, listener.accept()).await;
            match accepted {
                Ok(Ok((tcp, peer))) => {
                    debug!("Connection received from {}", peer);
                    tcp
                }
                Ok(Err(io_err)) => return Err(FtpError::ConnectionError(io_err)),
                Err(elapsed) => {
                    let timed_out =
                        std::io::Error::new(std::io::ErrorKind::TimedOut, elapsed);
                    return Err(FtpError::ConnectionError(timed_out));
                }
            }
        }
        Mode::ExtendedPassive => {
            let target = self.epsv().await?;
            self.perform(cmd).await?;
            (self.passive_stream_builder)(target).await?
        }
        Mode::Passive => {
            let target = self.pasv().await?;
            self.perform(cmd).await?;
            (self.passive_stream_builder)(target).await?
        }
    };
    #[cfg(not(feature = "async-secure"))]
    let outcome = Ok(DataStream::Tcp(tcp));
    #[cfg(feature = "async-secure")]
    let outcome = match self.tls_ctx {
        Some(ref connector) => connector
            .connect(self.domain.as_ref().unwrap(), tcp)
            .await
            .map(|secured| DataStream::Ssl(Box::new(secured)))
            .map_err(|e| FtpError::SecureError(format!("{e}"))),
        None => Ok(DataStream::Tcp(tcp)),
    };
    // Only a stream actually handed to the caller counts as open.
    if outcome.is_ok() {
        self.data_connection_open = true;
    }
    outcome
}
/// Sends `EPSV` and returns the address to connect to: the control
/// connection's peer address with the port announced by the server.
///
/// # Errors
///
/// Returns [`FtpError::UnexpectedResponse`] if no port is found in the reply,
/// [`FtpError::BadResponse`] if the reply or port cannot be parsed, and
/// [`FtpError::ConnectionError`] if the peer address is unavailable.
async fn epsv(&mut self) -> FtpResult<SocketAddr> {
    debug!("EPSV command");
    self.perform(Command::Epsv).await?;
    let reply: Response = self.read_response(Status::ExtendedPassiveMode).await?;
    let text = reply.as_string().map_err(|_| FtpError::BadResponse)?;
    let caps = match EPSV_PORT_RE.captures(&text) {
        Some(caps) => caps,
        None => return Err(FtpError::UnexpectedResponse(reply.clone())),
    };
    let data_port = caps[1].parse::<u16>().map_err(|_| FtpError::BadResponse)?;
    trace!("Got port number from EPSV: {}", data_port);
    let control_stream = self.reader.get_ref().get_ref();
    let mut target = control_stream
        .peer_addr()
        .map_err(FtpError::ConnectionError)?;
    target.set_port(data_port);
    trace!("Remote address for extended passive mode is {}", target);
    Ok(target)
}
/// Sends `PASV` and returns the address to connect to.
///
/// With the NAT workaround enabled, the IP announced by the server is
/// replaced with the control connection's peer IP and only the announced port
/// is used.
async fn pasv(&mut self) -> FtpResult<SocketAddr> {
    debug!("PASV command");
    self.perform(Command::Pasv).await?;
    let reply: Response = self.read_response(Status::PassiveMode).await?;
    let announced = FtpStream::parse_passive_address_from_response(reply)?;
    trace!("Passive address: {}", announced);
    if !self.nat_workaround {
        return Ok(announced);
    }
    let control_stream = self.reader.get_ref().get_ref();
    let mut target = control_stream
        .peer_addr()
        .map_err(FtpError::ConnectionError)?;
    target.set_port(announced.port());
    trace!("Replacing site local address {} with {}", announced, target);
    Ok(target)
}
/// Starts a local listener for an active-mode data connection and announces
/// it to the server with `PORT` (IPv4) or `EPRT` (IPv6).
///
/// The announced IP is the local address of the control connection. The
/// listener is bound to the wildcard address of the same address family, so
/// an address announced via `EPRT` can actually be reached.
///
/// # Errors
///
/// Returns [`FtpError::ConnectionError`] if the listener cannot be created or
/// a local address cannot be read, or an error if the server rejects the
/// announcement.
async fn active(&mut self) -> FtpResult<TcpListener> {
    debug!("Starting local tcp listener...");
    let ip = match self.reader.get_mut() {
        DataStream::Tcp(stream) => stream.local_addr().map_err(FtpError::ConnectionError)?.ip(),
        DataStream::Ssl(stream) => stream
            .get_ref()
            .local_addr()
            .map_err(FtpError::ConnectionError)?
            .ip(),
    };
    // An IPv4 wildcard listener cannot accept the IPv6 connection that an
    // EPRT announcement asks for, so match the control connection's family.
    let wildcard = match ip {
        std::net::IpAddr::V4(_) => std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED),
        std::net::IpAddr::V6(_) => std::net::IpAddr::V6(std::net::Ipv6Addr::UNSPECIFIED),
    };
    let conn = TcpListener::bind(SocketAddr::new(wildcard, 0))
        .await
        .map_err(FtpError::ConnectionError)?;
    let addr = conn.local_addr().map_err(FtpError::ConnectionError)?;
    trace!("Local address is {}", addr);
    debug!("Active mode, listening on {}:{}", ip, addr.port());
    match ip {
        std::net::IpAddr::V4(_) => {
            // PORT takes "h1,h2,h3,h4,p1,p2" with the port split into bytes.
            let msb = addr.port() / 256;
            let lsb = addr.port() % 256;
            let ip_port = format!("{},{},{}", ip.to_string().replace('.', ","), msb, lsb);
            debug!("Running PORT command");
            self.perform(Command::Port(ip_port)).await?;
        }
        std::net::IpAddr::V6(_) => {
            debug!("Running EPRT command");
            self.perform(Command::Eprt(SocketAddr::new(ip, addr.port())))
                .await?;
        }
    }
    self.read_response(Status::CommandOk).await?;
    Ok(conn)
}
/// Writes `command` to the control connection.
///
/// # Errors
///
/// Returns [`FtpError::ConnectionError`] if the write fails.
async fn perform(&mut self, command: Command) -> FtpResult<()> {
    let wire = command.to_string();
    trace!("CC OUT: {}", wire.trim_end_matches("\r\n"));
    match self.reader.get_mut().write_all(wire.as_bytes()).await {
        Ok(()) => Ok(()),
        Err(err) => Err(FtpError::ConnectionError(err)),
    }
}
/// Reads the next response from the control connection and requires it to
/// carry `expected_code`; see [`Self::read_response_in`].
pub async fn read_response(&mut self, expected_code: Status) -> FtpResult<Response> {
self.read_response_in(&[expected_code]).await
}
/// Reads one (possibly multi-line) response from the control connection and
/// checks that its status is one of `expected_code`.
///
/// The status is parsed from the first three bytes of the first line. Lines
/// are then read and appended to the body until one starts with
/// `<code><space>`. When `Status::System` is among the expected statuses, a
/// line starting with `<code>-` also ends the response.
///
/// # Errors
///
/// Returns [`FtpError::BadResponse`] if the first line is shorter than five
/// bytes or has no numeric code, [`FtpError::ConnectionError`] if reading
/// fails or the stream ends mid-response, and
/// [`FtpError::UnexpectedResponse`] (carrying the response) if the status is
/// not expected.
pub async fn read_response_in(&mut self, expected_code: &[Status]) -> FtpResult<Response> {
let mut line = Vec::new();
let mut body: Vec<u8> = Vec::new();
self.read_line(&mut line).await?;
body.extend(line.iter());
trace!("CC IN: {:?}", line);
// Also rejects an empty read; guarantees the indexing below is in bounds.
if line.len() < 5 {
return Err(FtpError::BadResponse);
}
let code_word: u32 = self.code_from_buffer(&line, 3)?;
let code = Status::from(code_word);
trace!("Code parsed from response: {} ({})", code, code_word);
// Terminator of the response: the same code followed by a space.
let expected = [line[0], line[1], line[2], 0x20];
// For `System` replies the "<code>-" line is accepted as terminator too,
// leaving the remaining lines unread on the connection.
let alt_expected = if expected_code.contains(&Status::System) {
[line[0], line[1], line[2], b'-']
} else {
expected
};
trace!("CC IN: {:?}", line);
// Keep reading until a terminator line; lines under five bytes never match.
while line.len() < 5 || (line[0..4] != expected && line[0..4] != alt_expected) {
line.clear();
let bytes_read = self.read_line(&mut line).await?;
if bytes_read == 0 {
return Err(FtpError::ConnectionError(std::io::Error::new(
std::io::ErrorKind::UnexpectedEof,
"connection closed during multiline response",
)));
}
body.extend(line.iter());
trace!("CC IN: {:?}", line);
}
let response: Response = Response::new(code, body);
if expected_code.contains(&code) {
Ok(response)
} else {
Err(FtpError::UnexpectedResponse(response))
}
}
/// Appends bytes from the control connection to `line`, up to and including
/// the next `\n`, and returns the resulting length of `line`.
///
/// The returned value is the total buffer length, not the number of bytes
/// read by this call; the two are equal when `line` is empty on entry.
///
/// # Errors
///
/// Returns [`FtpError::ConnectionError`] if reading fails.
async fn read_line(&mut self, line: &mut Vec<u8>) -> FtpResult<usize> {
    match self.reader.read_until(b'\n', &mut *line).await {
        Ok(_) => Ok(line.len()),
        Err(err) => Err(FtpError::ConnectionError(err)),
    }
}
/// Parses the first `len` bytes of `buf` as an unsigned decimal number.
///
/// # Errors
///
/// Returns [`FtpError::BadResponse`] if `buf` is shorter than `len`, the
/// bytes are not valid UTF-8, or they do not parse as a `u32`.
fn code_from_buffer(&self, buf: &[u8], len: usize) -> Result<u32, FtpError> {
    let prefix = buf.get(..len).ok_or(FtpError::BadResponse)?;
    std::str::from_utf8(prefix)
        .ok()
        .and_then(|digits| digits.parse::<u32>().ok())
        .ok_or(FtpError::BadResponse)
}
/// Runs a data command whose output is line-oriented text and returns its
/// non-empty lines.
///
/// The data stream is always finalized once the transfer has started, even
/// when reading the lines failed. A finalization error takes precedence over
/// a line-reading error.
///
/// # Errors
///
/// Fails if the data connection cannot be opened, if the server does not
/// answer with `open_code` / `AlreadyOpen`, or if reading or finalizing
/// fails.
async fn stream_lines(&mut self, cmd: Command, open_code: Status) -> FtpResult<Vec<String>> {
    let mut data_stream = BufReader::new(self.data_command(cmd).await?);
    if let Err(err) = self
        .read_response_in(&[open_code, Status::AlreadyOpen])
        .await
    {
        // The transfer never started: close the stream and clear the flag so
        // later data commands are not rejected as "already open".
        drop(data_stream);
        self.data_connection_open = false;
        return Err(err);
    }
    let lines = Self::get_lines_from_stream(&mut data_stream).await;
    self.finalize_retr_stream(data_stream).await?;
    lines
}
/// Returns the default passive stream builder, which simply opens a TCP
/// connection to the given address.
fn default_passive_stream_builder() -> Box<AsyncStdPassiveStreamBuilder> {
    Box::new(|address| {
        Box::pin(async move {
            match TcpStream::connect(address).await {
                Ok(tcp) => Ok(tcp),
                Err(err) => Err(FtpError::ConnectionError(err)),
            }
        })
    })
}
/// Refuses to proceed while a data stream is still marked as open.
///
/// # Errors
///
/// Returns [`FtpError::DataConnectionAlreadyOpen`] when the flag is set.
fn guard_multiple_data_connections(&self) -> FtpResult<()> {
    match self.data_connection_open {
        true => Err(FtpError::DataConnectionAlreadyOpen),
        false => Ok(()),
    }
}
}
#[cfg(test)]
mod test {
use std::str::FromStr as _;
use std::sync::Arc;
use async_std::io::ReadExt;
#[cfg(feature = "async-secure")]
use pretty_assertions::assert_eq;
use rand::distr::Alphanumeric;
use rand::{Rng, rng};
use super::super::async_std::AsyncFtpStream;
use super::*;
use crate::test_container::SyncPureFtpRunner;
use crate::types::FormatControl;
use crate::{FtpError, Status};
// Smoke test: a stream can be set up and finalized.
// NOTE(review): `setup_stream` / `finalize_stream` are defined outside this view.
#[async_attributes::test]
async fn connect() {
crate::log_init();
let (stream, _container) = setup_stream().await;
finalize_stream(stream).await;
}
// `set_mode` replaces the mode of the stream returned by `setup_stream`.
#[async_attributes::test]
async fn should_change_mode() {
crate::log_init();
let (mut stream, _container) = setup_stream().await;
assert_eq!(stream.mode, Mode::Passive);
stream.set_mode(Mode::Active);
assert_eq!(stream.mode, Mode::Active);
}
// `connect_timeout` reaches the containerized server, login succeeds, and the
// stored welcome message contains a 220 line.
#[async_attributes::test]
async fn should_connect_with_timeout() {
crate::log_init();
let container = SyncPureFtpRunner::start();
let port = container.get_ftp_port();
let url = format!("127.0.0.1:{port}");
let addr: SocketAddr = url.parse().expect("invalid hostname");
let mut stream = AsyncFtpStream::connect_timeout(addr, Duration::from_secs(15))
.await
.unwrap();
assert!(stream.login("test", "test").await.is_ok());
assert!(stream.get_welcome_msg().unwrap().contains("220 "));
}
// The welcome message stored at connection time contains a 220 line.
#[async_attributes::test]
async fn welcome_message() {
crate::log_init();
let (stream, _container) = setup_stream().await;
assert!(stream.get_welcome_msg().unwrap().contains("220 "));
finalize_stream(stream).await;
}
// `set_passive_nat_workaround(true)` sets the corresponding field.
#[async_attributes::test]
async fn should_set_passive_nat_workaround() {
crate::log_init();
let (mut stream, _container) = setup_stream().await;
stream.set_passive_nat_workaround(true);
assert!(stream.nat_workaround);
finalize_stream(stream).await;
}
// `get_ref` exposes the underlying TCP stream: setting its TTL succeeds.
#[async_attributes::test]
async fn get_ref() {
let (stream, _container) = setup_stream().await;
assert!(stream.get_ref().set_ttl(255).is_ok());
finalize_stream(stream).await;
}
// `cwd("/")` is reflected by `pwd`, and the original directory can be restored.
#[async_attributes::test]
async fn change_wrkdir() {
let (mut stream, _container) = setup_stream().await;
let wrkdir: String = stream.pwd().await.unwrap();
assert!(stream.cwd("/").await.is_ok());
assert_eq!(stream.pwd().await.unwrap().as_str(), "/");
assert!(stream.cwd(wrkdir.as_str()).await.is_ok());
finalize_stream(stream).await;
}
// `cdup` moves to the parent directory.
// NOTE(review): assumes `setup_stream` starts in a direct child of
// /home/test — confirm against the helper.
#[async_attributes::test]
async fn cd_up() {
let (mut stream, _container) = setup_stream().await;
let wrkdir: String = stream.pwd().await.unwrap();
assert!(stream.cdup().await.is_ok());
assert_eq!(stream.pwd().await.unwrap().as_str(), "/home/test");
assert!(stream.cwd(wrkdir.as_str()).await.is_ok());
finalize_stream(stream).await;
}
// `noop` succeeds on a freshly set up stream.
#[async_attributes::test]
async fn noop() {
let (mut stream, _container) = setup_stream().await;
assert!(stream.noop().await.is_ok());
finalize_stream(stream).await;
}
// Creating a directory twice fails the second time with `FileUnavailable`;
// the directory can then be removed.
#[async_attributes::test]
async fn make_and_remove_dir() {
let (mut stream, _container) = setup_stream().await;
assert!(stream.mkdir("omar").await.is_ok());
match stream.mkdir("omar").await.err().unwrap() {
FtpError::UnexpectedResponse(Response { status, body: _ }) => {
assert_eq!(status, Status::FileUnavailable)
}
err => panic!("Expected UnexpectedResponse, got {}", err),
}
assert!(stream.rmdir("omar").await.is_ok());
finalize_stream(stream).await;
}
// The test server advertises UTF8 in its FEAT reply and accepts `OPTS UTF8 ON`.
#[async_attributes::test]
async fn should_get_feat_and_set_opts() {
let (mut stream, _container) = setup_stream().await;
let features = stream.feat().await.expect("failed to get features");
assert!(features.contains_key("UTF8"));
assert!(stream.opts("UTF8", Some("ON")).await.is_ok());
finalize_stream(stream).await;
}
// Both binary and ASCII transfer types are accepted by the server.
#[async_attributes::test]
async fn set_transfer_type() {
let (mut stream, _container) = setup_stream().await;
assert!(stream.transfer_type(FileType::Binary).await.is_ok());
assert!(
stream
.transfer_type(FileType::Ascii(FormatControl::Default))
.await
.is_ok()
);
finalize_stream(stream).await;
}
// Uploads a small file, then downloads it through the closure-based `retr`
// API and checks the content round-trips.
#[async_attributes::test]
async fn test_should_use_retr() {
use async_std::io::Cursor;
let (mut stream, _container) = setup_stream().await;
assert!(stream.transfer_type(FileType::Binary).await.is_ok());
let file_data = "test data\n";
let mut reader = Cursor::new(file_data.as_bytes());
assert!(stream.put_file("test.txt", &mut reader).await.is_ok());
let reader = stream
.retr("test.txt", |mut reader| {
Box::pin(async move {
let mut buf = Vec::new();
reader.read_to_end(&mut buf).await.expect("failed to read");
// The stream must be handed back so `retr` can finalize it.
Ok((buf, reader))
})
})
.await
.unwrap();
assert_eq!(reader, "test data\n".as_bytes());
}
// End-to-end file workflow: put, append, retrieve, size, list, nlst, mdtm,
// rm and rename. The steps depend on each other's server-side effects, so
// their order matters.
#[async_attributes::test]
async fn transfer_file() {
use async_std::io::Cursor;
let (mut stream, _container) = setup_stream().await;
assert!(stream.transfer_type(FileType::Binary).await.is_ok());
// Write the file, then append the same content again.
let file_data = "test data\n";
let mut reader = Cursor::new(file_data.as_bytes());
assert!(stream.put_file("test.txt", &mut reader).await.is_ok());
let mut reader = Cursor::new(file_data.as_bytes());
assert!(stream.append_file("test.txt", &mut reader).await.is_ok());
// Read it back and expect the content twice.
let mut reader = stream.retr_as_stream("test.txt").await.unwrap();
let mut buffer = Vec::new();
assert!(
async_std::io::ReadExt::read_to_end(&mut reader, &mut buffer)
.await
.is_ok()
);
assert_eq!(buffer.as_slice(), "test data\ntest data\n".as_bytes());
assert!(stream.finalize_retr_stream(reader).await.is_ok());
// Metadata queries.
assert_eq!(stream.size("test.txt").await.unwrap(), 20);
assert!(stream.size("omarone.txt").await.is_err());
assert_eq!(stream.list(None).await.unwrap().len(), 1);
assert_eq!(stream.nlst(None).await.unwrap().as_slice(), &["test.txt"]);
assert!(stream.mdtm("test.txt").await.is_ok());
// After removal the file can no longer be queried.
assert!(stream.rm("test.txt").await.is_ok());
assert!(stream.mdtm("test.txt").await.is_err());
// Re-create, rename, remove: the directory ends up empty.
let file_data = "test data\n";
let mut reader = Cursor::new(file_data.as_bytes());
assert!(stream.put_file("test.txt", &mut reader).await.is_ok());
assert!(stream.rename("test.txt", "toast.txt").await.is_ok());
assert!(stream.rm("toast.txt").await.is_ok());
assert_eq!(stream.list(None).await.unwrap().len(), 0);
finalize_stream(stream).await;
}
/// Verifies that an upload can be continued from a byte offset on a new connection.
///
/// The first connection sends five bytes to `test.bin` and is dropped without finalizing
/// the upload. A second connection to the same container returns to the original working
/// directory, calls `resume_transfer(5)` and sends six more bytes before finalizing.
#[async_attributes::test]
async fn should_resume_transfer() {
    let (mut stream, container) = setup_stream().await;
    assert!(stream.transfer_type(FileType::Binary).await.is_ok());
    // Remember where the partial file lives so the second session can return to it.
    let wrkdir = stream.pwd().await.unwrap();

    // First session: write five bytes, then drop both streams without finalizing.
    let mut transfer_stream = stream.put_with_stream("test.bin").await.unwrap();
    assert_eq!(
        transfer_stream
            .write(&[0x00, 0x01, 0x02, 0x03, 0x04])
            .await
            .unwrap(),
        5
    );
    drop(stream);
    drop(transfer_stream);

    // Second session: reconnect to the same container and log in again.
    let port = container.get_ftp_port();
    let url = format!("localhost:{port}");
    let mut stream = AsyncFtpStream::connect(url).await.unwrap();
    assert!(stream.login("test", "test").await.is_ok());
    let tempdir: String = generate_tempdir();
    assert!(stream.mkdir(tempdir.as_str()).await.is_ok());
    assert!(stream.cwd(tempdir.as_str()).await.is_ok());

    // Data connections must target the host port that the container maps the PASV port to.
    let container_t = container.clone();
    let mut stream = stream.passive_stream_builder(move |addr| {
        let container_t = container_t.clone();
        Box::pin(async move {
            // `SocketAddr` is `Copy`, so a mutable rebind replaces the former `clone()`.
            let mut addr = addr;
            let port = addr.port();
            let mapped = container_t.get_mapped_port(port);
            addr.set_port(mapped);
            info!("mapped port {port} to {mapped} for PASV");
            TcpStream::connect(addr)
                .await
                .map_err(FtpError::ConnectionError)
        })
    });

    // Go back to the directory of the partial file and resume after the first five bytes.
    assert!(stream.cwd(wrkdir).await.is_ok());
    assert!(stream.transfer_type(FileType::Binary).await.is_ok());
    assert!(stream.resume_transfer(5).await.is_ok());
    let mut transfer_stream = stream.put_with_stream("test.bin").await.unwrap();
    assert_eq!(
        transfer_stream
            .write(&[0x05, 0x06, 0x07, 0x08, 0x09, 0x0a])
            .await
            .unwrap(),
        6
    );
    assert!(stream.finalize_put_stream(transfer_stream).await.is_ok());
    assert!(stream.rm("test.bin").await.is_ok());
    finalize_stream(stream).await;
}
/// Uploads and removes a file after the stream has been switched to extended passive mode.
#[async_attributes::test]
async fn test_should_transfer_file_with_extended_passive_mode() {
    crate::log_init();
    use async_std::io::Cursor;

    let (mut stream, _container) = setup_stream().await;
    assert!(stream.transfer_type(FileType::Binary).await.is_ok());
    // The mode is changed before any data connection is requested.
    stream.set_mode(Mode::ExtendedPassive);

    let file_data = "test data\n";
    let mut payload = Cursor::new(file_data.as_bytes());
    let uploaded = stream.put_file("test.txt", &mut payload).await;
    assert!(uploaded.is_ok());
    let removed = stream.rm("test.txt").await;
    assert!(removed.is_ok());

    finalize_stream(stream).await;
}
/// Lists the container's `invalid-utf8` directory with `NLST` and `LIST`, expecting a single
/// entry each time, and checks that the `LIST` line still parses into a `File`.
#[async_attributes::test]
async fn test_should_list_files_with_non_utf8_names() {
    let (mut stream, container) = setup_stream().await;
    let directory = "/home/test/invalid-utf8/";

    let names = stream
        .nlst(Some(directory))
        .await
        .expect("Failed to list files");
    assert_eq!(names.len(), 1);

    let entries = stream
        .list(Some(directory))
        .await
        .expect("Failed to list files");
    assert_eq!(entries.len(), 1);
    let first_line = entries[0].as_str();
    crate::list::File::from_str(first_line).expect("Failed to parse file");

    finalize_stream(stream).await;
    drop(container);
}
/// Sends a raw `PWD` through `custom_command`, listing `PathCreated` as the accepted status.
#[async_attributes::test]
async fn test_should_perform_custom_command() {
    let (mut stream, _container) = setup_stream().await;
    let command = "PWD";
    let reply = stream
        .custom_command(command, &[Status::PathCreated])
        .await;
    assert!(reply.is_ok());
}
/// Issues `LIST` through `custom_data_command`, drains the data stream and closes it.
#[async_attributes::test]
async fn test_should_perform_custom_data_command() {
    let (mut stream, _container) = setup_stream().await;
    let command = "LIST";

    let (reply, data_channel) = stream
        .custom_data_command(command, &[Status::AboutToSend])
        .await
        .expect("Failed to perform custom data command");
    assert_eq!(reply.status, Status::AboutToSend);

    // Read every line before handing the reader back to close the data connection.
    let mut buffered = async_std::io::BufReader::new(data_channel);
    AsyncFtpStream::get_lines_from_stream(&mut buffered)
        .await
        .expect("Failed to get lines from stream");
    let closed = stream.close_data_connection(buffered).await;
    assert!(closed.is_ok());
}
/// Checks that, while one data connection is held open, every call that would open another
/// one fails with `FtpError::DataConnectionAlreadyOpen`.
#[async_attributes::test]
async fn test_should_prevent_multiple_data_connections() {
    let (mut stream, _container) = setup_stream().await;
    let command = "LIST";

    // Keep the first data connection bound for the whole test so later requests collide.
    let _open_connection = stream
        .custom_data_command(command, &[Status::AboutToSend])
        .await
        .expect("Failed to perform custom data command");

    assert!(
        matches!(
            stream
                .custom_data_command(command, &[Status::AboutToSend])
                .await,
            Err(FtpError::DataConnectionAlreadyOpen)
        ),
        "Expected DataConnectionAlreadyOpen error"
    );
    assert!(
        matches!(
            stream.retr_as_stream("somefile.txt").await,
            Err(FtpError::DataConnectionAlreadyOpen)
        ),
        "Expected DataConnectionAlreadyOpen error"
    );
    assert!(
        matches!(
            stream.put_with_stream("somefile.txt").await,
            Err(FtpError::DataConnectionAlreadyOpen)
        ),
        "Expected DataConnectionAlreadyOpen error"
    );
    assert!(
        matches!(
            stream.append_with_stream("somefile.txt").await,
            Err(FtpError::DataConnectionAlreadyOpen)
        ),
        "Expected DataConnectionAlreadyOpen error"
    );
}
/// Runs the same `LIST` data command twice in a row, closing the data connection after each
/// run, to check that a closed connection no longer blocks the next one.
#[async_attributes::test]
async fn test_should_free_data_connection_after_close() {
    let (mut stream, _container) = setup_stream().await;
    let command = "LIST";

    // Two identical rounds: open, drain, close.
    for _round in 0..2 {
        let (response, data_stream) = stream
            .custom_data_command(command, &[Status::AboutToSend])
            .await
            .expect("Failed to perform custom data command");
        assert_eq!(response.status, Status::AboutToSend);

        let mut reader = async_std::io::BufReader::new(data_stream);
        AsyncFtpStream::get_lines_from_stream(&mut reader)
            .await
            .expect("Failed to get lines from stream");
        assert!(stream.close_data_connection(reader).await.is_ok());
    }
}
/// Uploads a file, opens a download stream for it and aborts that transfer straight away.
#[async_attributes::test]
async fn test_abort_transfer() {
    crate::log_init();
    let (mut stream, _container) = setup_stream().await;
    assert!(stream.transfer_type(FileType::Binary).await.is_ok());

    let file_data = "test data for abort\n";
    let mut upload = async_std::io::Cursor::new(file_data.as_bytes());
    assert!(stream.put_file("abort_test.txt", &mut upload).await.is_ok());

    // Nothing is read from the download stream before it is passed to `abort`.
    let download = stream.retr_as_stream("abort_test.txt").await.unwrap();
    assert!(stream.abort(download).await.is_ok());

    drop(stream);
}
/// Appends to an existing file through `append_with_stream` and checks the combined content.
#[async_attributes::test]
async fn test_append_with_stream() {
    let (mut stream, _container) = setup_stream().await;
    assert!(stream.transfer_type(FileType::Binary).await.is_ok());

    // Seed the file with its first half using the one-shot upload.
    let mut first_half = async_std::io::Cursor::new("part1".as_bytes());
    let seeded = stream.put_file("append_stream.txt", &mut first_half).await;
    assert!(seeded.is_ok());

    // Write the second half through an append data stream and finalize it.
    let mut upload = stream
        .append_with_stream("append_stream.txt")
        .await
        .unwrap();
    async_std::io::WriteExt::write_all(&mut upload, b"part2")
        .await
        .unwrap();
    stream.finalize_put_stream(upload).await.unwrap();

    // Read the whole file back and compare against both halves.
    let mut download = stream.retr_as_stream("append_stream.txt").await.unwrap();
    let mut contents = Vec::new();
    async_std::io::ReadExt::read_to_end(&mut download, &mut contents)
        .await
        .unwrap();
    stream.finalize_retr_stream(download).await.unwrap();
    assert_eq!(contents, b"part1part2");

    assert!(stream.rm("append_stream.txt").await.is_ok());
    finalize_stream(stream).await;
}
/// Checks that `active_mode` sets the mode to `Mode::Active` and stores the given timeout.
///
/// Connects to the public host `test.rebex.net:21`.
#[async_attributes::test]
async fn test_active_mode_builder() {
    crate::log_init();
    let connected = AsyncFtpStream::connect("test.rebex.net:21").await.unwrap();
    let configured = connected.active_mode(Duration::from_secs(30));
    assert_eq!(configured.mode, Mode::Active);
    assert_eq!(configured.active_timeout, Duration::from_secs(30));
}
/// Checks that installing a custom passive stream builder leaves the mode at `Mode::Passive`.
///
/// Connects to the public host `test.rebex.net:21`.
#[async_attributes::test]
async fn test_passive_stream_builder() {
    crate::log_init();
    let configured = AsyncFtpStream::connect("test.rebex.net:21")
        .await
        .unwrap()
        .passive_stream_builder(|addr| {
            Box::pin(async move {
                TcpStream::connect(addr)
                    .await
                    .map_err(FtpError::ConnectionError)
            })
        });
    assert_eq!(configured.mode, Mode::Passive);
}
/// Sends `SITE HELP`; accepts either a `CommandOk` reply or an `UnexpectedResponse` error,
/// and fails on any other error.
#[async_attributes::test]
async fn test_site_command() {
    let (mut stream, _container) = setup_stream().await;
    match stream.site("HELP").await {
        // An unexpected status from the server is tolerated by this test.
        Err(FtpError::UnexpectedResponse(_)) => {}
        Err(err) => panic!("Unexpected error: {}", err),
        Ok(reply) => {
            assert_eq!(reply.status, Status::CommandOk);
        }
    }
    finalize_stream(stream).await;
}
/// Calls `eprt` with a loopback address; the outcome is discarded, so this only checks that
/// the call completes.
#[async_attributes::test]
async fn test_eprt_command() {
    let (mut stream, _container) = setup_stream().await;
    let loopback = "127.0.0.1:12345".parse::<SocketAddr>().unwrap();
    // The result is deliberately ignored.
    let _ = stream.eprt(loopback).await;
}
/// Requests the non-print ASCII and image transfer types, which must succeed, and then
/// `Local(8)`, whose outcome is ignored.
#[async_attributes::test]
async fn test_transfer_type_variants() {
    let (mut stream, _container) = setup_stream().await;

    let must_succeed = [
        FileType::Ascii(FormatControl::NonPrint),
        FileType::Image,
    ];
    for file_type in must_succeed {
        assert!(stream.transfer_type(file_type).await.is_ok());
    }
    // No assertion here: the result for `Local(8)` is discarded.
    let _ = stream.transfer_type(FileType::Local(8)).await;

    finalize_stream(stream).await;
}
/// Changing into a missing directory must fail with `UnexpectedResponse` carrying the
/// `FileUnavailable` status.
#[async_attributes::test]
async fn test_cwd_error() {
    let (mut stream, _container) = setup_stream().await;

    let Err(FtpError::UnexpectedResponse(response)) =
        stream.cwd("/nonexistent/directory/path").await
    else {
        panic!("Expected UnexpectedResponse for nonexistent directory");
    };
    assert_eq!(response.status, Status::FileUnavailable);

    finalize_stream(stream).await;
}
/// Removing a file that does not exist must fail with `FtpError::UnexpectedResponse`.
#[async_attributes::test]
async fn test_rm_nonexistent_file() {
    let (mut stream, _container) = setup_stream().await;
    assert!(
        matches!(
            stream.rm("nonexistent_file.txt").await,
            Err(FtpError::UnexpectedResponse(_))
        ),
        "Expected error when removing nonexistent file"
    );
    finalize_stream(stream).await;
}
/// Removing a directory that does not exist must fail with `FtpError::UnexpectedResponse`.
#[async_attributes::test]
async fn test_rmdir_nonexistent() {
    let (mut stream, _container) = setup_stream().await;
    assert!(
        matches!(
            stream.rmdir("nonexistent_dir").await,
            Err(FtpError::UnexpectedResponse(_))
        ),
        "Expected error when removing nonexistent directory"
    );
    finalize_stream(stream).await;
}
/// Renaming a file that does not exist must fail with `FtpError::UnexpectedResponse`.
#[async_attributes::test]
async fn test_rename_nonexistent() {
    let (mut stream, _container) = setup_stream().await;
    assert!(
        matches!(
            stream.rename("nonexistent.txt", "new_name.txt").await,
            Err(FtpError::UnexpectedResponse(_))
        ),
        "Expected error when renaming nonexistent file"
    );
    finalize_stream(stream).await;
}
/// Opening a download stream for a missing file must fail with
/// `FtpError::UnexpectedResponse`.
#[async_attributes::test]
async fn test_retr_nonexistent_file() {
    let (mut stream, _container) = setup_stream().await;
    assert!(
        matches!(
            stream.retr_as_stream("nonexistent_file.txt").await,
            Err(FtpError::UnexpectedResponse(_))
        ),
        "Expected error when retrieving nonexistent file"
    );
    finalize_stream(stream).await;
}
/// Lists a path that does not exist; the outcome is discarded, so this only checks that the
/// call completes.
#[async_attributes::test]
async fn test_list_nonexistent_path() {
    let (mut stream, _container) = setup_stream().await;
    let missing_path = "/nonexistent/path";
    // Success or failure is deliberately not asserted.
    let _ = stream.list(Some(missing_path)).await;
}
/// Uploads a file and checks that its name shows up in the `NLST` output of the current
/// directory.
#[async_attributes::test]
async fn test_nlst_with_path() {
    let (mut stream, _container) = setup_stream().await;

    let mut payload = async_std::io::Cursor::new("data".as_bytes());
    assert!(stream.put_file("nlst_test.txt", &mut payload).await.is_ok());

    let names = stream.nlst(None).await.unwrap();
    let uploaded_is_listed = names.iter().any(|name| name.as_str() == "nlst_test.txt");
    assert!(uploaded_is_listed);

    assert!(stream.rm("nlst_test.txt").await.is_ok());
    finalize_stream(stream).await;
}
/// Uploads a file and checks that `LIST .` returns at least one entry.
#[async_attributes::test]
async fn test_list_with_explicit_current_dir() {
    let (mut stream, _container) = setup_stream().await;

    let mut payload = async_std::io::Cursor::new("data".as_bytes());
    assert!(stream.put_file("list_test.txt", &mut payload).await.is_ok());

    let entries = stream.list(Some(".")).await.unwrap();
    let listing_is_empty = entries.is_empty();
    assert!(!listing_is_empty);

    assert!(stream.rm("list_test.txt").await.is_ok());
    finalize_stream(stream).await;
}
/// Compile-time probe: only accepts a value whose type implements `Send`.
fn is_send<T>(_send: T)
where
    T: Send,
{
}
/// Type-level check that an `AsyncFtpStream` with a custom passive stream builder is `Send`.
///
/// Marked `#[ignore]`: the point is that the `is_send` call type-checks.
#[async_attributes::test]
#[ignore = "just needs to compile"]
async fn test_ftp_stream_should_be_send() {
    crate::log_init();
    let connected = AsyncFtpStream::connect("test.rebex.net:21")
        .await
        .unwrap();
    let ftp_stream = connected.passive_stream_builder(|addr| {
        Box::pin(async move {
            println!("Connecting to {}", addr);
            TcpStream::connect(addr)
                .await
                .map_err(FtpError::ConnectionError)
        })
    });
    is_send::<AsyncFtpStream>(ftp_stream);
}
/// Compile-time probe: only accepts a value whose type implements `Sync`.
fn is_sync<T>(_send: T)
where
    T: Sync,
{
}
/// Type-level check that an `AsyncFtpStream` with a custom passive stream builder is `Sync`.
///
/// Marked `#[ignore]`: the point is that the `is_sync` call type-checks.
#[async_attributes::test]
#[ignore = "just needs to compile"]
async fn test_ftp_stream_should_be_sync() {
    crate::log_init();
    let connected = AsyncFtpStream::connect("test.rebex.net:21")
        .await
        .unwrap();
    let ftp_stream = connected.passive_stream_builder(|addr| {
        Box::pin(async move {
            println!("Connecting to {}", addr);
            TcpStream::connect(addr)
                .await
                .map_err(FtpError::ConnectionError)
        })
    });
    is_sync::<AsyncFtpStream>(ftp_stream);
}
/// Creates a logged-in FTP stream backed by a freshly started `SyncPureFtpRunner`.
///
/// Steps performed:
/// 1. start the runner and connect to `localhost` on its FTP port;
/// 2. log in as `test` / `test`;
/// 3. create a randomly named directory (see `generate_tempdir`) and change into it;
/// 4. install a passive stream builder that replaces the port announced by the server with
///    the port returned by `get_mapped_port` before connecting.
///
/// Returns the configured stream together with the shared runner handle, so callers can
/// keep the handle bound for as long as they use the stream.
///
/// # Panics
///
/// Panics if connecting, logging in, creating the directory or changing into it fails.
async fn setup_stream() -> (AsyncFtpStream, Arc<SyncPureFtpRunner>) {
    crate::log_init();
    let container = Arc::new(SyncPureFtpRunner::start());
    let port = container.get_ftp_port();
    let url = format!("localhost:{port}");
    let mut ftp_stream = ImplAsyncFtpStream::connect(url).await.unwrap();
    assert!(ftp_stream.login("test", "test").await.is_ok());
    let tempdir: String = generate_tempdir();
    assert!(ftp_stream.mkdir(tempdir.as_str()).await.is_ok());
    assert!(ftp_stream.cwd(tempdir.as_str()).await.is_ok());
    // The builder closure needs its own handle to the runner to look up mapped ports.
    let container_t = container.clone();
    let ftp_stream = ftp_stream.passive_stream_builder(move |addr| {
        let container_t = container_t.clone();
        Box::pin(async move {
            // `SocketAddr` is `Copy`, so a mutable rebind replaces the former `clone()`.
            let mut addr = addr;
            let port = addr.port();
            let mapped = container_t.get_mapped_port(port);
            addr.set_port(mapped);
            info!("mapped port {port} to {mapped} for PASV");
            TcpStream::connect(addr)
                .await
                .map_err(FtpError::ConnectionError)
        })
    });
    (ftp_stream, container)
}
/// Ends the session by calling `quit` and asserting that it succeeded.
async fn finalize_stream(mut stream: AsyncFtpStream) {
    let farewell = stream.quit().await;
    assert!(farewell.is_ok());
}
/// Builds a directory name of the form `temp_` followed by five random alphanumeric
/// characters.
fn generate_tempdir() -> String {
    let mut generator = rng();
    // Draw exactly five samples, one per character of the suffix.
    let suffix: String = (0..5)
        .map(|_| char::from(generator.sample(Alphanumeric)))
        .collect();
    format!("temp_{}", suffix)
}
}