mod data_stream;
mod tls;
use std::io::{BufRead, BufReader, Cursor, Read, Write, copy};
#[cfg(not(feature = "secure"))]
use std::marker::PhantomData;
use std::net::{Ipv4Addr, SocketAddr, TcpListener, TcpStream, ToSocketAddrs};
use std::time::{Duration, Instant};
use chrono::{NaiveDate, NaiveDateTime, NaiveTime};
pub use data_stream::DataStream;
#[cfg(feature = "secure")]
pub use tls::TlsConnector;
#[cfg(feature = "native-tls")]
pub use tls::{NativeTlsConnector, NativeTlsStream};
pub use tls::{NoTlsStream, TlsStream};
#[cfg(any(feature = "rustls-aws-lc-rs", feature = "rustls-ring"))]
pub use tls::{RustlsConnector, RustlsStream};
use super::Status;
use super::regex::{EPSV_PORT_RE, MDTM_RE, PASV_PORT_RE, SIZE_RE};
use super::types::{FileType, FtpError, FtpResult, Mode, Response};
use crate::command::Command;
#[cfg(feature = "secure")]
use crate::command::ProtectionLevel;
use crate::types::Features;
/// Signature of the callback used to open the TCP data connection in passive and extended
/// passive mode: it receives the address to connect to and returns the connected stream.
pub type PassiveStreamBuilder = dyn Fn(SocketAddr) -> FtpResult<TcpStream> + Send + Sync;
/// Synchronous FTP client, generic over the TLS stream type `T` used when the connection is
/// secured.
pub struct ImplFtpStream<T>
where
T: TlsStream,
{
/// Buffered control connection; commands are written to and replies read from it.
reader: BufReader<DataStream<T>>,
/// How data connections are opened (active, passive or extended passive).
mode: Mode,
/// When set, `pasv` replaces the IP announced by the server with the control connection's peer IP.
nat_workaround: bool,
/// Greeting returned by the server when the connection was established, if it was valid UTF-8.
welcome_msg: Option<String>,
/// Set while a data connection handed out by a data command has not been finalized yet.
data_connection_open: bool,
/// Maximum time to wait for the server to connect back in active mode.
active_timeout: Duration,
/// Callback used to open the data connection in passive modes.
passive_stream_builder: Box<PassiveStreamBuilder>,
/// Keeps `T` in use when TLS support is compiled out.
#[cfg(not(feature = "secure"))]
marker: PhantomData<T>,
/// TLS connector used to secure data connections; `None` while the session is plain.
#[cfg(feature = "secure")]
tls_ctx: Option<Box<dyn TlsConnector<Stream = T> + Send + Sync + 'static>>,
/// Domain name passed to the TLS connector; set together with `tls_ctx`.
#[cfg(feature = "secure")]
domain: Option<String>,
}
impl<T> ImplFtpStream<T>
where
T: TlsStream,
{
/// Opens a TCP connection to `addr` and waits for the server greeting.
///
/// # Errors
/// Returns `FtpError::ConnectionError` if the TCP connection cannot be established, or any
/// error produced by [`Self::connect_with_stream`].
pub fn connect<A: ToSocketAddrs>(addr: A) -> FtpResult<Self> {
    debug!("Connecting to server");
    let socket = TcpStream::connect(addr).map_err(FtpError::ConnectionError)?;
    Self::connect_with_stream(socket)
}
/// Like [`Self::connect`], but gives up on the TCP connection after `timeout`.
///
/// # Errors
/// Returns `FtpError::ConnectionError` if the TCP connection fails or times out, or any error
/// produced by [`Self::connect_with_stream`].
pub fn connect_timeout(addr: std::net::SocketAddr, timeout: Duration) -> FtpResult<Self> {
    debug!("Connecting to server {addr}");
    let socket = TcpStream::connect_timeout(&addr, timeout).map_err(FtpError::ConnectionError)?;
    Self::connect_with_stream(socket)
}
/// Wraps an already connected TCP stream and reads the server greeting from it.
///
/// The client starts in passive mode, without the NAT workaround, with a 60 second active-mode
/// accept timeout and the default passive stream builder.
///
/// # Errors
/// Returns the error from reading the greeting if the server does not answer with
/// `Status::Ready`.
pub fn connect_with_stream(stream: TcpStream) -> FtpResult<Self> {
    debug!("Established connection with server");
    let mut client = Self {
        reader: BufReader::new(DataStream::Tcp(stream)),
        mode: Mode::Passive,
        nat_workaround: false,
        welcome_msg: None,
        data_connection_open: false,
        active_timeout: Duration::from_secs(60),
        passive_stream_builder: Self::default_passive_stream_builder(),
        #[cfg(feature = "secure")]
        tls_ctx: None,
        #[cfg(feature = "secure")]
        domain: None,
        #[cfg(not(feature = "secure"))]
        marker: PhantomData {},
    };
    debug!("Reading server response...");
    let greeting = client.read_response(Status::Ready)?;
    // A greeting that is not valid UTF-8 is simply not stored.
    let welcome_msg = greeting.as_string().ok();
    debug!("Server READY; response: {:?}", welcome_msg);
    client.welcome_msg = welcome_msg;
    Ok(client)
}
pub fn active_mode(mut self, accept_timeout: Duration) -> Self {
self.mode = Mode::Active;
self.active_timeout = accept_timeout;
self
}
pub fn passive_stream_builder<F>(mut self, stream_builder: F) -> Self
where
F: Fn(SocketAddr) -> FtpResult<TcpStream> + Send + Sync + 'static,
{
self.passive_stream_builder = Box::new(stream_builder);
self
}
/// Selects how subsequent data connections are opened.
pub fn set_mode(&mut self, mode: Mode) {
    self.mode = mode;
    debug!("Changed mode to {:?}", self.mode);
}
/// Enables or disables the passive-mode NAT workaround: when enabled, `pasv` connects to the
/// control connection's peer IP instead of the IP announced by the server.
pub fn set_passive_nat_workaround(&mut self, nat_workaround: bool) {
self.nat_workaround = nat_workaround;
}
/// Upgrades the plain control connection to TLS (explicit mode).
///
/// Sends `AUTH`, performs the TLS handshake for `domain` over the existing TCP stream, then
/// sends `PBSZ 0` and `PROT P`. The connector and domain are kept so that later data
/// connections are secured as well.
///
/// # Errors
/// Returns an error if the server rejects any of the commands, or
/// `FtpError::SecureError` if the TLS handshake fails.
#[cfg(feature = "secure")]
#[cfg_attr(docsrs, doc(cfg(feature = "secure")))]
pub fn into_secure(
    mut self,
    tls_connector: impl TlsConnector<Stream = T> + Send + Sync + 'static,
    domain: &str,
) -> FtpResult<Self> {
    debug!("Initializing TLS auth");
    self.perform(Command::Auth)?;
    self.read_response(Status::AuthOk)?;
    debug!("TLS OK; initializing ssl stream");
    let stream = tls_connector
        .connect(domain, self.reader.into_inner().into_tcp_stream())
        .map_err(|e| FtpError::SecureError(format!("{e}")))?;
    debug!("TLS Stream OK");
    // Carry every setting over; only the transport and the TLS context change.
    let mut secured_ftp_stream = Self {
        reader: BufReader::new(DataStream::Ssl(Box::new(stream))),
        mode: self.mode,
        data_connection_open: self.data_connection_open,
        nat_workaround: self.nat_workaround,
        passive_stream_builder: self.passive_stream_builder,
        tls_ctx: Some(Box::new(tls_connector)),
        domain: Some(String::from(domain)),
        welcome_msg: self.welcome_msg,
        active_timeout: self.active_timeout,
    };
    // Protection buffer size 0, then private (encrypted) data channel.
    secured_ftp_stream.perform(Command::Pbsz(0))?;
    secured_ftp_stream.read_response(Status::CommandOk)?;
    secured_ftp_stream.perform(Command::Prot(ProtectionLevel::Private))?;
    secured_ftp_stream.read_response(Status::CommandOk)?;
    Ok(secured_ftp_stream)
}
/// Connects to `addr` using implicit TLS: the TLS handshake for `domain` happens right after
/// the TCP connection is established, before the server greeting is read.
///
/// # Errors
/// Returns `FtpError::ConnectionError` if the TCP connection fails,
/// `FtpError::SecureError` if the TLS handshake fails, or the error from reading the greeting.
#[cfg(all(feature = "secure", feature = "deprecated"))]
#[cfg_attr(docsrs, doc(cfg(all(feature = "secure", feature = "deprecated"))))]
pub fn connect_secure_implicit<A: ToSocketAddrs>(
    addr: A,
    tls_connector: impl TlsConnector<Stream = T> + Send + Sync + 'static,
    domain: &str,
) -> FtpResult<Self> {
    debug!("Connecting to server (secure)");
    // The raw TCP stream goes straight to the TLS connector; there is no need to build a
    // temporary client around it first.
    let tcp_stream = TcpStream::connect(addr).map_err(FtpError::ConnectionError)?;
    debug!("Established connection with server");
    debug!("TLS OK; initializing ssl stream");
    let tls_stream = tls_connector
        .connect(domain, tcp_stream)
        .map_err(|e| FtpError::SecureError(format!("{e}")))?;
    debug!("TLS Stream OK");
    let mut stream = Self {
        reader: BufReader::new(DataStream::Ssl(Box::new(tls_stream))),
        mode: Mode::Passive,
        nat_workaround: false,
        data_connection_open: false,
        tls_ctx: Some(Box::new(tls_connector)),
        passive_stream_builder: Self::default_passive_stream_builder(),
        domain: Some(String::from(domain)),
        welcome_msg: None,
        active_timeout: Duration::from_secs(60),
    };
    debug!("Reading server response...");
    let response = stream.read_response(Status::Ready)?;
    let welcome_msg = response.as_string().ok();
    debug!("Server READY; response: {:?}", welcome_msg);
    stream.welcome_msg = welcome_msg;
    Ok(stream)
}
/// Returns the greeting stored when the connection was established, if any.
pub fn get_welcome_msg(&self) -> Option<&str> {
self.welcome_msg.as_deref()
}
/// Returns a reference to the TCP stream underlying the control connection.
pub fn get_ref(&self) -> &TcpStream {
self.reader.get_ref().get_ref()
}
/// Authenticates with `USER` and, only if the server asks for it, `PASS`.
///
/// # Errors
/// Returns an error if a command cannot be sent or the server answers with an unexpected
/// status.
pub fn login<S: AsRef<str>>(&mut self, user: S, password: S) -> FtpResult<()> {
    let username = user.as_ref();
    debug!("Signin in with user '{}'", username);
    self.perform(Command::User(username.to_string()))?;
    let reply = self.read_response_in(&[Status::LoggedIn, Status::NeedPassword])?;
    if reply.status == Status::NeedPassword {
        debug!("Password is required");
        self.perform(Command::Pass(password.as_ref().to_string()))?;
        self.read_response(Status::LoggedIn)?;
    }
    debug!("Login OK");
    Ok(())
}
/// Sends the clear-command-channel command and, once accepted, continues the control
/// connection over the plain TCP stream.
///
/// # Errors
/// Returns an error if the command cannot be sent or the server does not answer with
/// `Status::CommandOk`.
#[cfg(feature = "secure")]
#[cfg_attr(docsrs, doc(cfg(feature = "secure")))]
pub fn clear_command_channel(mut self) -> FtpResult<Self> {
    debug!("performing clear command channel");
    self.perform(Command::ClearCommandChannel)?;
    self.read_response(Status::CommandOk)?;
    trace!("CCC OK");
    let plain_stream = self.reader.into_inner().into_tcp_stream();
    self.reader = BufReader::new(DataStream::Tcp(plain_stream));
    Ok(self)
}
/// Changes the working directory to `path`.
///
/// # Errors
/// Returns an error if the command cannot be sent or the server answers with neither
/// `Status::CommandOk` nor `Status::RequestedFileActionOk`.
pub fn cwd<S: AsRef<str>>(&mut self, path: S) -> FtpResult<()> {
    let target = path.as_ref();
    debug!("Changing working directory to {}", target);
    self.perform(Command::Cwd(target.to_string()))?;
    self.read_response_in(&[Status::CommandOk, Status::RequestedFileActionOk])?;
    Ok(())
}
/// Moves the working directory to its parent.
///
/// # Errors
/// Returns an error if the command cannot be sent or the server answers with neither
/// `Status::CommandOk` nor `Status::RequestedFileActionOk`.
pub fn cdup(&mut self) -> FtpResult<()> {
    debug!("Going to parent directory");
    self.perform(Command::Cdup)?;
    self.read_response_in(&[Status::CommandOk, Status::RequestedFileActionOk])?;
    Ok(())
}
/// Returns the current working directory.
///
/// The path is taken from between the first and the last double quote of the reply.
///
/// # Errors
/// Returns `FtpError::BadResponse` if the reply is not valid UTF-8 and
/// `FtpError::UnexpectedResponse` if it does not contain a quoted path.
pub fn pwd(&mut self) -> FtpResult<String> {
    debug!("Getting working directory");
    self.perform(Command::Pwd)?;
    let reply = self.read_response(Status::PathCreated)?;
    let text = reply.as_string().map_err(|_| FtpError::BadResponse)?;
    match (text.find('"'), text.rfind('"')) {
        (Some(open), Some(close)) if open < close => Ok(text[open + 1..close].to_string()),
        _ => Err(FtpError::UnexpectedResponse(Response::new(
            reply.status,
            reply.body,
        ))),
    }
}
/// Sends `NOOP` to check that the server is still responding.
///
/// # Errors
/// Returns an error if the command cannot be sent or the reply is not `Status::CommandOk`.
pub fn noop(&mut self) -> FtpResult<()> {
    debug!("Pinging server");
    self.perform(Command::Noop)?;
    self.read_response(Status::CommandOk)?;
    Ok(())
}
/// Sends `EPRT` announcing `address` as the endpoint for the next data connection.
///
/// # Errors
/// Returns an error if the command cannot be sent or the reply is not `Status::CommandOk`.
pub fn eprt(&mut self, address: SocketAddr) -> FtpResult<()> {
    debug!("EPRT with address {address}");
    self.perform(Command::Eprt(address))?;
    self.read_response(Status::CommandOk)?;
    Ok(())
}
/// Creates the directory `pathname` on the server.
///
/// # Errors
/// Returns an error if the command cannot be sent or the reply is not `Status::PathCreated`.
pub fn mkdir<S: AsRef<str>>(&mut self, pathname: S) -> FtpResult<()> {
    let dir = pathname.as_ref();
    debug!("Creating directory at {}", dir);
    self.perform(Command::Mkd(dir.to_string()))?;
    self.read_response(Status::PathCreated)?;
    Ok(())
}
/// Sets the representation type used for subsequent transfers.
///
/// # Errors
/// Returns an error if the command cannot be sent or the reply is not `Status::CommandOk`.
pub fn transfer_type(&mut self, file_type: FileType) -> FtpResult<()> {
    debug!("Setting transfer type {}", file_type);
    self.perform(Command::Type(file_type))?;
    self.read_response(Status::CommandOk)?;
    Ok(())
}
/// Sends `QUIT` and waits for the server's closing reply.
///
/// # Errors
/// Returns an error if the command cannot be sent or the reply is not `Status::Closing`.
pub fn quit(&mut self) -> FtpResult<()> {
    debug!("Quitting stream");
    self.perform(Command::Quit)?;
    self.read_response(Status::Closing)?;
    Ok(())
}
/// Renames `from_name` to `to_name` using the rename-from / rename-to command pair.
///
/// # Errors
/// Returns an error if either command cannot be sent or is answered with an unexpected
/// status; the second command is only sent once the first has been accepted.
pub fn rename<S: AsRef<str>>(&mut self, from_name: S, to_name: S) -> FtpResult<()> {
    let (source, destination) = (from_name.as_ref(), to_name.as_ref());
    debug!("Renaming '{}' to '{}'", source, destination);
    self.perform(Command::RenameFrom(source.to_string()))?;
    self.read_response(Status::RequestFilePending)?;
    self.perform(Command::RenameTo(destination.to_string()))?;
    self.read_response(Status::RequestedFileActionOk)?;
    Ok(())
}
/// Retrieves `file_name` and hands the data stream to `reader`, returning whatever the
/// closure produces.
///
/// The data connection is finalized on both the success and the failure path of `reader`, so
/// the client never stays stuck in the "data connection open" state.
///
/// # Errors
/// Returns the error from opening the transfer, the error returned by `reader`, or — when
/// `reader` succeeded — the error from reading the server's completion reply.
pub fn retr<F, D>(&mut self, file_name: &str, mut reader: F) -> FtpResult<D>
where
    F: FnMut(&mut dyn Read) -> FtpResult<D>,
{
    let mut stream = self.retr_as_stream(file_name)?;
    match reader(&mut stream) {
        Ok(result) => {
            self.finalize_retr_stream(stream)?;
            Ok(result)
        }
        Err(err) => {
            // Close the data connection, clear the open flag and consume the server's
            // transfer reply on a best-effort basis; the caller's error takes precedence.
            let _ = self.finalize_retr_stream(stream);
            Err(err)
        }
    }
}
/// Retrieves `file_name` entirely into memory and returns it wrapped in a `Cursor`.
///
/// # Errors
/// Returns `FtpError::ConnectionError` if reading the data stream fails, or any error from
/// [`Self::retr`].
pub fn retr_as_buffer(&mut self, file_name: &str) -> FtpResult<Cursor<Vec<u8>>> {
    let contents = self.retr(file_name, |reader| {
        let mut buffer = Vec::new();
        reader
            .read_to_end(&mut buffer)
            .map_err(FtpError::ConnectionError)?;
        Ok(buffer)
    })?;
    Ok(Cursor::new(contents))
}
/// Starts retrieving `file_name` and returns the data stream to read it from.
///
/// The caller must pass the stream to [`Self::finalize_retr_stream`] when done.
///
/// # Errors
/// Returns an error if the data connection cannot be opened or the server refuses the
/// transfer. In the latter case the just-opened data connection is closed and no longer
/// counted as open, so later data commands are not blocked.
pub fn retr_as_stream<S: AsRef<str>>(&mut self, file_name: S) -> FtpResult<DataStream<T>> {
    debug!("Retrieving '{}'", file_name.as_ref());
    let data_stream = self.data_command(Command::Retr(file_name.as_ref().to_string()))?;
    match self.read_response_in(&[Status::AboutToSend, Status::AlreadyOpen]) {
        Ok(_) => Ok(data_stream),
        Err(err) => {
            // The stream is never handed to the caller, so release it and clear the flag.
            drop(data_stream);
            self.data_connection_open = false;
            Err(err)
        }
    }
}
/// Closes a download data stream and waits for the server's completion reply.
///
/// The stream is dropped and the data connection marked as closed before the reply is read.
///
/// # Errors
/// Returns an error if the reply is neither `Status::ClosingDataConnection` nor
/// `Status::RequestedFileActionOk`.
pub fn finalize_retr_stream(&mut self, stream: impl Read) -> FtpResult<()> {
    debug!("Finalizing retr stream");
    drop(stream);
    self.data_connection_open = false;
    trace!("dropped stream");
    let accepted = [Status::ClosingDataConnection, Status::RequestedFileActionOk];
    self.read_response_in(&accepted)?;
    Ok(())
}
/// Removes the directory `pathname` from the server.
///
/// # Errors
/// Returns an error if the command cannot be sent or the reply is not
/// `Status::RequestedFileActionOk`.
pub fn rmdir<S: AsRef<str>>(&mut self, pathname: S) -> FtpResult<()> {
    let dir = pathname.as_ref();
    debug!("Removing directory {}", dir);
    self.perform(Command::Rmd(dir.to_string()))?;
    self.read_response(Status::RequestedFileActionOk)?;
    Ok(())
}
/// Deletes the file `filename` from the server.
///
/// # Errors
/// Returns an error if the command cannot be sent or the reply is not
/// `Status::RequestedFileActionOk`.
pub fn rm<S: AsRef<str>>(&mut self, filename: S) -> FtpResult<()> {
    let file = filename.as_ref();
    debug!("Removing file {}", file);
    self.perform(Command::Dele(file.to_string()))?;
    self.read_response(Status::RequestedFileActionOk)?;
    Ok(())
}
/// Uploads everything read from `r` to `filename` and returns the number of bytes copied.
///
/// The data connection is finalized whether or not the copy succeeds, so a failed upload
/// does not leave the client stuck in the "data connection open" state.
///
/// # Errors
/// Returns the error from opening the transfer, `FtpError::ConnectionError` if copying
/// fails, or — when the copy succeeded — the error from reading the completion reply.
pub fn put_file<S: AsRef<str>, R: Read>(&mut self, filename: S, r: &mut R) -> FtpResult<u64> {
    let mut data_stream = self.put_with_stream(filename.as_ref())?;
    match copy(r, &mut data_stream) {
        Ok(bytes) => {
            self.finalize_put_stream(data_stream)?;
            Ok(bytes)
        }
        Err(err) => {
            // Best-effort cleanup: close the data connection and consume the server's
            // reply; the copy error is what the caller needs to see.
            let _ = self.finalize_put_stream(data_stream);
            Err(FtpError::ConnectionError(err))
        }
    }
}
/// Starts an upload to `filename` and returns the data stream to write to.
///
/// The caller must pass the stream to [`Self::finalize_put_stream`] when done.
///
/// # Errors
/// Returns an error if the data connection cannot be opened or the server refuses the
/// transfer. In the latter case the just-opened data connection is closed and no longer
/// counted as open.
pub fn put_with_stream<S: AsRef<str>>(&mut self, filename: S) -> FtpResult<DataStream<T>> {
    debug!("Put file {}", filename.as_ref());
    let stream = self.data_command(Command::Store(filename.as_ref().to_string()))?;
    match self.read_response_in(&[Status::AlreadyOpen, Status::AboutToSend]) {
        Ok(_) => Ok(stream),
        Err(err) => {
            // The stream is never handed to the caller, so release it and clear the flag.
            drop(stream);
            self.data_connection_open = false;
            Err(err)
        }
    }
}
/// Closes an upload data stream and waits for the server's completion reply.
///
/// The stream is dropped and the data connection marked as closed before the reply is read.
///
/// # Errors
/// Returns an error if the reply is neither `Status::ClosingDataConnection` nor
/// `Status::RequestedFileActionOk`.
pub fn finalize_put_stream(&mut self, stream: impl Write) -> FtpResult<()> {
    debug!("Finalizing put stream");
    drop(stream);
    self.data_connection_open = false;
    trace!("Stream dropped");
    let accepted = [Status::ClosingDataConnection, Status::RequestedFileActionOk];
    self.read_response_in(&accepted)?;
    Ok(())
}
/// Starts appending to `filename` and returns the data stream to write to.
///
/// The caller must pass the stream to [`Self::finalize_put_stream`] when done.
///
/// # Errors
/// Returns an error if the data connection cannot be opened or the server refuses the
/// transfer. In the latter case the just-opened data connection is closed and no longer
/// counted as open.
pub fn append_with_stream<S: AsRef<str>>(&mut self, filename: S) -> FtpResult<DataStream<T>> {
    debug!("Appending to file {}", filename.as_ref());
    let stream = self.data_command(Command::Appe(filename.as_ref().to_string()))?;
    match self.read_response_in(&[Status::AlreadyOpen, Status::AboutToSend]) {
        Ok(_) => Ok(stream),
        Err(err) => {
            // The stream is never handed to the caller, so release it and clear the flag.
            drop(stream);
            self.data_connection_open = false;
            Err(err)
        }
    }
}
/// Appends everything read from `r` to `filename` and returns the number of bytes copied.
///
/// The data connection is finalized whether or not the copy succeeds.
///
/// # Errors
/// Returns the error from opening the transfer, `FtpError::ConnectionError` if copying
/// fails, or — when the copy succeeded — the error from reading the completion reply.
pub fn append_file<R: Read>(&mut self, filename: &str, r: &mut R) -> FtpResult<u64> {
    let mut data_stream = self.append_with_stream(filename)?;
    match copy(r, &mut data_stream) {
        Ok(bytes) => {
            self.finalize_put_stream(data_stream)?;
            Ok(bytes)
        }
        Err(err) => {
            // Best-effort cleanup: close the data connection and consume the server's
            // reply; the copy error is what the caller needs to see.
            let _ = self.finalize_put_stream(data_stream);
            Err(FtpError::ConnectionError(err))
        }
    }
}
/// Aborts the transfer running on `data_stream`.
///
/// Sends `ABOR`, drops the data stream, and then reads the server's reply; when the first
/// reply is `Status::TransferAborted` a second `Status::ClosingDataConnection` reply is
/// expected.
///
/// # Errors
/// Returns an error if the command cannot be sent or an unexpected reply is received.
pub fn abort(&mut self, data_stream: impl Read + 'static) -> FtpResult<()> {
    debug!("Aborting active file transfer");
    self.perform(Command::Abor)?;
    drop(data_stream);
    self.data_connection_open = false;
    trace!("dropped stream");
    let first_reply =
        self.read_response_in(&[Status::ClosingDataConnection, Status::TransferAborted])?;
    let needs_closing_reply = first_reply.status == Status::TransferAborted;
    if needs_closing_reply {
        self.read_response(Status::ClosingDataConnection)?;
    }
    debug!("Transfer aborted");
    Ok(())
}
/// Asks the server to start the next transfer at byte `offset`.
///
/// # Errors
/// Returns an error if the command cannot be sent or the reply is not
/// `Status::RequestFilePending`.
pub fn resume_transfer(&mut self, offset: usize) -> FtpResult<()> {
    debug!("Requesting to resume transfer at offset {}", offset);
    self.perform(Command::Rest(offset))?;
    let outcome = self.read_response(Status::RequestFilePending).map(|_| ());
    if outcome.is_ok() {
        debug!("Resume transfer accepted");
    }
    outcome
}
/// Runs `LIST` on `pathname` (or the working directory when `None`) and returns the
/// non-empty lines received on the data connection.
pub fn list(&mut self, pathname: Option<&str>) -> FtpResult<Vec<String>> {
    debug!(
        "Reading {} directory content",
        pathname.unwrap_or("working")
    );
    let command = Command::List(pathname.map(str::to_string));
    self.stream_lines(command, Status::AboutToSend)
}
/// Runs `NLST` on `pathname` (or the working directory when `None`) and returns the
/// non-empty lines received on the data connection.
pub fn nlst(&mut self, pathname: Option<&str>) -> FtpResult<Vec<String>> {
    debug!(
        "Getting file names for {} directory",
        pathname.unwrap_or("working")
    );
    let command = Command::Nlst(pathname.map(str::to_string));
    self.stream_lines(command, Status::AboutToSend)
}
/// Runs `MLSD` on `pathname` (or the working directory when `None`) and returns the
/// non-empty lines received on the data connection.
pub fn mlsd(&mut self, pathname: Option<&str>) -> FtpResult<Vec<String>> {
    debug!(
        "Reading {} directory content",
        pathname.unwrap_or("working")
    );
    let command = Command::Mlsd(pathname.map(str::to_string));
    self.stream_lines(command, Status::AboutToSend)
}
/// Runs `MLST` on `pathname` (or the working directory when `None`) and returns the trimmed
/// second line of the reply, which carries the entry's facts.
///
/// # Errors
/// Returns `FtpError::BadResponse` if the reply has no second line or that line is empty.
pub fn mlst(&mut self, pathname: Option<&str>) -> FtpResult<String> {
    debug!("Reading {} path information", pathname.unwrap_or("working"));
    self.perform(Command::Mlst(pathname.map(str::to_string)))?;
    let reply = self.read_response_in(&[Status::RequestedFileActionOk])?;
    let text = String::from_utf8_lossy(&reply.body);
    match text.lines().nth(1) {
        None | Some("") => Err(FtpError::BadResponse),
        Some(facts) => Ok(facts.trim().to_string()),
    }
}
/// Returns the modification time of `pathname` as reported by `MDTM`.
///
/// # Errors
/// Returns `FtpError::BadResponse` if the reply is not valid UTF-8, does not match the
/// expected timestamp pattern, or encodes an invalid date or time.
pub fn mdtm<S: AsRef<str>>(&mut self, pathname: S) -> FtpResult<NaiveDateTime> {
    debug!("Getting modification time for {}", pathname.as_ref());
    self.perform(Command::Mdtm(pathname.as_ref().to_string()))?;
    let reply: Response = self.read_response(Status::File)?;
    let body = reply.as_string().map_err(|_| FtpError::BadResponse)?;
    let caps = MDTM_RE.captures(&body).ok_or(FtpError::BadResponse)?;
    // Capture groups 2..=6 are unsigned fields; group 1 (the year) is signed for chrono.
    let field = |group: usize| {
        caps[group]
            .parse::<u32>()
            .map_err(|_| FtpError::BadResponse)
    };
    let year = caps[1].parse::<i32>().map_err(|_| FtpError::BadResponse)?;
    let (month, day) = (field(2)?, field(3)?);
    let (hour, minute, second) = (field(4)?, field(5)?, field(6)?);
    let date = NaiveDate::from_ymd_opt(year, month, day).ok_or(FtpError::BadResponse)?;
    let time = NaiveTime::from_hms_opt(hour, minute, second).ok_or(FtpError::BadResponse)?;
    Ok(NaiveDateTime::new(date, time))
}
/// Returns the size of `pathname` as reported by `SIZE`.
///
/// # Errors
/// Returns `FtpError::BadResponse` if the reply is not valid UTF-8, does not match the
/// expected pattern, or the number does not fit in `usize`.
pub fn size<S: AsRef<str>>(&mut self, pathname: S) -> FtpResult<usize> {
    debug!("Getting file size for {}", pathname.as_ref());
    self.perform(Command::Size(pathname.as_ref().to_string()))?;
    let reply: Response = self.read_response(Status::File)?;
    let body = reply.as_string().map_err(|_| FtpError::BadResponse)?;
    let caps = SIZE_RE.captures(&body).ok_or(FtpError::BadResponse)?;
    caps[1].parse::<usize>().map_err(|_| FtpError::BadResponse)
}
/// Queries the features supported by the server.
///
/// The first reply line is read through the normal response path; the remaining lines are
/// read one by one until the terminating line (or end of stream) and then parsed together.
///
/// # Errors
/// Returns an error if the command cannot be sent, the reply status is unexpected, a line
/// cannot be read, or the feature list cannot be parsed.
pub fn feat(&mut self) -> FtpResult<Features> {
    debug!("Getting server supported features");
    self.perform(Command::Feat)?;
    let response = self.read_response(Status::System)?;
    let mut feat_lines = vec![String::from_utf8_lossy(&response.body).to_string()];
    let mut raw = Vec::new();
    loop {
        raw.clear();
        if self.read_line(&mut raw)? == 0 {
            break;
        }
        let text = String::from_utf8_lossy(&raw);
        trace!("FEAT IN: {:?}", text);
        feat_lines.push(text.to_string());
        if crate::command::feat::is_last_line(&text) {
            break;
        }
    }
    crate::command::feat::parse_features(&feat_lines)
}
/// Sends `OPTS` to set `option`, optionally with a `value`.
///
/// # Errors
/// Returns an error if the command cannot be sent or the reply is not `Status::CommandOk`.
pub fn opts(&mut self, option: impl ToString, value: Option<impl ToString>) -> FtpResult<()> {
    let option = option.to_string();
    let value = value.map(|x| x.to_string());
    // Log what is actually being done (the previous message was copied from `feat`).
    debug!("Setting option '{}' with value {:?}", option, value);
    self.perform(Command::Opts(option, value))?;
    self.read_response(Status::CommandOk)?;
    Ok(())
}
/// Sends a `SITE` command and returns the server's reply.
///
/// # Errors
/// Returns an error if the command cannot be sent or the reply is not `Status::CommandOk`.
pub fn site(&mut self, command: impl ToString) -> FtpResult<Response> {
    let site_command = command.to_string();
    debug!("Sending SITE command: {}", site_command);
    self.perform(Command::Site(site_command))?;
    self.read_response(Status::CommandOk)
}
/// Sends an arbitrary command on the control connection and returns the reply if its status
/// is one of `expected_code`.
///
/// # Errors
/// Returns an error if the command cannot be sent or the reply status is not expected.
pub fn custom_command(
    &mut self,
    command: impl ToString,
    expected_code: &[Status],
) -> FtpResult<Response> {
    let raw_command = command.to_string();
    debug!("Sending custom command: {}", raw_command);
    self.perform(Command::Custom(raw_command))?;
    self.read_response_in(expected_code)
}
/// Sends an arbitrary command that uses a data connection and returns the reply together
/// with the data stream.
///
/// The caller must pass the stream to [`Self::close_data_connection`] when done.
///
/// # Errors
/// Returns an error if the data connection cannot be opened or the reply status is not one
/// of `expected_code`. In the latter case the just-opened data connection is closed and no
/// longer counted as open.
pub fn custom_data_command(
    &mut self,
    command: impl ToString,
    expected_code: &[Status],
) -> FtpResult<(Response, DataStream<T>)> {
    let command = command.to_string();
    debug!("Sending custom data command: {}", command);
    let data_stream = self.data_command(Command::Custom(command))?;
    match self.read_response_in(expected_code) {
        Ok(response) => Ok((response, data_stream)),
        Err(err) => {
            // The stream is never handed to the caller, so release it and clear the flag.
            drop(data_stream);
            self.data_connection_open = false;
            Err(err)
        }
    }
}
/// Closes a data stream obtained from a custom data command and waits for the server's
/// completion reply.
///
/// # Errors
/// Returns an error if the reply is neither `Status::ClosingDataConnection` nor
/// `Status::RequestedFileActionOk`.
pub fn close_data_connection(&mut self, stream: impl Read) -> FtpResult<()> {
    debug!("closing data connection");
    drop(stream);
    self.data_connection_open = false;
    trace!("dropped stream");
    let accepted = [Status::ClosingDataConnection, Status::RequestedFileActionOk];
    self.read_response_in(&accepted)?;
    Ok(())
}
/// Reads `data_stream` to its end and returns its non-empty lines.
///
/// Lines are decoded lossily as UTF-8 and stripped of a trailing `\n` and `\r`.
///
/// # Errors
/// Returns `FtpError::BadResponse` if reading from the stream fails.
pub fn get_lines_from_stream(
    data_stream: &mut BufReader<DataStream<T>>,
) -> FtpResult<Vec<String>> {
    let mut lines: Vec<String> = Vec::new();
    let mut raw = Vec::new();
    loop {
        raw.clear();
        let read = data_stream.read_until(b'\n', &mut raw).map_err(|err| {
            error!("failed to get lines from stream: {err}");
            FtpError::BadResponse
        })?;
        if read == 0 {
            break;
        }
        let mut line = String::from_utf8_lossy(&raw).to_string();
        trace!("STREAM IN: {:?}", line);
        if line.ends_with('\n') {
            line.pop();
        }
        if line.ends_with('\r') {
            line.pop();
        }
        if !line.is_empty() {
            lines.push(line);
        }
    }
    trace!("Lines from stream {:?}", lines);
    Ok(lines)
}
/// Reads one reply from the control connection and requires its status to be `expected_code`.
fn read_response(&mut self, expected_code: Status) -> FtpResult<Response> {
self.read_response_in(&[expected_code])
}
/// Reads one (possibly multi-line) reply from the control connection.
///
/// Every line read is appended to the returned response body. Returns the response when its
/// status is one of `expected_code`, `FtpError::UnexpectedResponse` otherwise,
/// `FtpError::BadResponse` when the first line is shorter than 5 bytes or does not start
/// with a numeric code, and `FtpError::ConnectionError` when the stream ends in the middle
/// of a multi-line reply.
fn read_response_in(&mut self, expected_code: &[Status]) -> FtpResult<Response> {
let mut line = Vec::new();
let mut body: Vec<u8> = Vec::new();
self.read_line(&mut line)?;
body.extend(line.iter());
trace!("CC IN: {:?}", line);
if line.len() < 5 {
return Err(FtpError::BadResponse);
}
// The status code is the first three bytes of the first line.
let code_word: u32 = self.code_from_buffer(&line, 3)?;
let code = Status::from(code_word);
trace!("Code parsed from response: {} ({})", code, code_word);
// The reply ends with a line starting with the same code followed by a space.
let expected = [line[0], line[1], line[2], 0x20];
// When `Status::System` is among the expected codes, a line starting with the code followed
// by '-' is accepted as the end as well, so only the first line is consumed here.
let alt_expected = if expected_code.contains(&Status::System) {
[line[0], line[1], line[2], b'-']
} else {
expected
};
trace!("CC IN: {:?}", line);
// Keep reading continuation lines until a terminating line is seen.
while line.len() < 5 || (line[0..4] != expected && line[0..4] != alt_expected) {
line.clear();
let bytes_read = self.read_line(&mut line)?;
if bytes_read == 0 {
return Err(FtpError::ConnectionError(std::io::Error::new(
std::io::ErrorKind::UnexpectedEof,
"connection closed during multiline response",
)));
}
body.extend(line.iter());
trace!("CC IN: {:?}", line);
}
let response: Response = Response::new(code, body);
if expected_code.contains(&code) {
Ok(response)
} else {
Err(FtpError::UnexpectedResponse(response))
}
}
/// Appends one line (up to and including `\n`) from the control connection to `line` and
/// returns the resulting length of `line`.
///
/// # Errors
/// Returns `FtpError::ConnectionError` if reading fails.
fn read_line(&mut self, line: &mut Vec<u8>) -> FtpResult<usize> {
    match self.reader.read_until(b'\n', line) {
        Ok(_) => Ok(line.len()),
        Err(err) => Err(FtpError::ConnectionError(err)),
    }
}
/// Parses the first `len` bytes of `buf` as a decimal number.
///
/// # Errors
/// Returns `FtpError::BadResponse` if `buf` is shorter than `len`, the bytes are not valid
/// UTF-8, or they do not form a `u32`.
fn code_from_buffer(&self, buf: &[u8], len: usize) -> Result<u32, FtpError> {
    let digits = buf.get(..len).ok_or(FtpError::BadResponse)?;
    let text = std::str::from_utf8(digits).map_err(|_| FtpError::BadResponse)?;
    text.parse::<u32>().map_err(|_| FtpError::BadResponse)
}
/// Serializes `command` and writes it to the control connection.
///
/// # Errors
/// Returns `FtpError::ConnectionError` if the write fails.
fn perform(&mut self, command: Command) -> FtpResult<()> {
    let wire = command.to_string();
    trace!("CC OUT: {}", wire.trim_end_matches("\r\n"));
    self.reader
        .get_mut()
        .write_all(wire.as_bytes())
        .map_err(FtpError::ConnectionError)
}
/// Sends `cmd` and opens the data connection it needs, according to the current mode.
///
/// * Active: a local listener is announced to the server, the command is sent, and the
///   server's connection is awaited for at most `active_timeout`.
/// * Passive / extended passive: the server's address is requested, the command is sent,
///   and the passive stream builder connects to it.
///
/// When a TLS context is present the data connection is wrapped in TLS. On success the
/// client is marked as having an open data connection.
///
/// # Errors
/// Returns `FtpError::DataConnectionAlreadyOpen` if a previous data connection has not been
/// finalized, `FtpError::ConnectionError` on I/O failures (including the active-mode accept
/// timeout), and `FtpError::SecureError` if the TLS handshake fails.
fn data_command(&mut self, cmd: Command) -> FtpResult<DataStream<T>> {
    self.guard_multiple_data_connections()?;
    let stream = match self.mode {
        Mode::Active => {
            let listener = self.active()?;
            self.perform(cmd)?;
            let start = Instant::now();
            // The listener is non-blocking, so poll it until the server connects or the
            // timeout elapses.
            let stream = loop {
                match listener.accept() {
                    Ok((stream, _)) => break stream,
                    Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
                        if start.elapsed() > self.active_timeout {
                            return Err(FtpError::ConnectionError(
                                std::io::ErrorKind::WouldBlock.into(),
                            ));
                        }
                        std::thread::sleep(Duration::from_millis(100));
                    }
                    Err(e) => return Err(FtpError::ConnectionError(e)),
                }
            };
            // On some platforms an accepted socket inherits the listener's non-blocking
            // mode; callers expect a blocking data stream.
            stream
                .set_nonblocking(false)
                .map_err(FtpError::ConnectionError)?;
            stream
        }
        Mode::ExtendedPassive => {
            let addr = self.epsv()?;
            self.perform(cmd)?;
            (self.passive_stream_builder)(addr)?
        }
        Mode::Passive => {
            let addr = self.pasv()?;
            self.perform(cmd)?;
            (self.passive_stream_builder)(addr)?
        }
    };
    #[cfg(not(feature = "secure"))]
    let result = Ok(DataStream::Tcp(stream));
    #[cfg(feature = "secure")]
    let result = match self.tls_ctx {
        // `domain` is always set together with `tls_ctx`, so the unwrap cannot fail.
        Some(ref tls_ctx) => tls_ctx
            .connect(self.domain.as_ref().unwrap(), stream)
            .map(|x| DataStream::Ssl(Box::new(x)))
            .map_err(|e| FtpError::SecureError(format!("{e}"))),
        None => Ok(DataStream::Tcp(stream)),
    };
    if result.is_ok() {
        self.data_connection_open = true;
    }
    result
}
fn active(&mut self) -> FtpResult<TcpListener> {
debug!("Starting local tcp listener...");
let conn = TcpListener::bind("0.0.0.0:0").map_err(FtpError::ConnectionError)?;
conn.set_nonblocking(true)
.map_err(FtpError::ConnectionError)?;
let addr = conn.local_addr().map_err(FtpError::ConnectionError)?;
trace!("Local address is {}", addr);
let ip = match self.reader.get_mut() {
DataStream::Tcp(stream) => stream.local_addr().map_err(FtpError::ConnectionError)?.ip(),
DataStream::Ssl(stream) => stream
.get_ref()
.local_addr()
.map_err(FtpError::ConnectionError)?
.ip(),
};
debug!("Active mode, listening on {}:{}", ip, addr.port());
match ip {
std::net::IpAddr::V4(_) => {
let msb = addr.port() / 256;
let lsb = addr.port() % 256;
let ip_port = format!("{},{},{}", ip.to_string().replace('.', ","), msb, lsb);
debug!("Running PORT command");
self.perform(Command::Port(ip_port))?;
}
std::net::IpAddr::V6(_) => {
debug!("Running EPRT command");
self.perform(Command::Eprt(SocketAddr::new(ip, addr.port())))?;
}
}
self.read_response(Status::CommandOk)?;
Ok(conn)
}
/// Sends `EPSV` and returns the address to connect to: the control connection's peer IP
/// combined with the port announced by the server.
///
/// # Errors
/// Returns `FtpError::BadResponse` if the reply is not valid UTF-8 or the port is not a
/// `u16`, `FtpError::UnexpectedResponse` if the reply does not match the expected pattern,
/// and `FtpError::ConnectionError` if the peer address cannot be determined.
fn epsv(&mut self) -> FtpResult<SocketAddr> {
    debug!("EPSV command");
    self.perform(Command::Epsv)?;
    let reply: Response = self.read_response(Status::ExtendedPassiveMode)?;
    let text = reply.as_string().map_err(|_| FtpError::BadResponse)?;
    let port = match EPSV_PORT_RE.captures(&text) {
        Some(caps) => caps[1].parse::<u16>().map_err(|_| FtpError::BadResponse)?,
        None => return Err(FtpError::UnexpectedResponse(reply.clone())),
    };
    trace!("Got port number from EPSV: {}", port);
    let control_stream = self.reader.get_ref().get_ref();
    let mut remote = control_stream
        .peer_addr()
        .map_err(FtpError::ConnectionError)?;
    remote.set_port(port);
    trace!("Remote address for extended passive mode is {}", remote);
    Ok(remote)
}
/// Sends `PASV` and returns the address to connect to.
///
/// With the NAT workaround enabled, the IP announced by the server is replaced by the
/// control connection's peer IP and only the announced port is kept.
///
/// # Errors
/// Returns an error if the reply is unexpected or cannot be parsed, or
/// `FtpError::ConnectionError` if the peer address cannot be determined.
fn pasv(&mut self) -> FtpResult<SocketAddr> {
    debug!("PASV command");
    self.perform(Command::Pasv)?;
    let reply = self.read_response(Status::PassiveMode)?;
    let addr = Self::parse_passive_address_from_response(reply)?;
    trace!("Passive address: {addr}",);
    if !self.nat_workaround {
        return Ok(addr);
    }
    let control_stream = self.reader.get_ref().get_ref();
    let mut remote = control_stream
        .peer_addr()
        .map_err(FtpError::ConnectionError)?;
    remote.set_port(addr.port());
    trace!("Replacing site local address {} with {}", addr, remote);
    Ok(remote)
}
/// Extracts the IPv4 address and port from a `PASV` reply.
///
/// The six captured numbers are the four address octets followed by the high and low byte
/// of the port.
///
/// # Errors
/// Returns `FtpError::BadResponse` if the reply is not valid UTF-8 or any captured number
/// does not fit in a byte (the reply comes from the server and must not be able to cause a
/// panic), and `FtpError::UnexpectedResponse` if the reply does not match the expected
/// pattern.
pub(crate) fn parse_passive_address_from_response(response: Response) -> FtpResult<SocketAddr> {
    let response_str = response.as_string().map_err(|_| FtpError::BadResponse)?;
    trace!("PASV response: {response_str}",);
    let caps = PASV_PORT_RE
        .captures(&response_str)
        .ok_or_else(|| FtpError::UnexpectedResponse(response.clone()))?;
    let byte_at = |group: usize| {
        caps[group]
            .parse::<u8>()
            .map_err(|_| FtpError::BadResponse)
    };
    let ip = Ipv4Addr::new(byte_at(1)?, byte_at(2)?, byte_at(3)?, byte_at(4)?);
    let (msb, lsb) = (byte_at(5)?, byte_at(6)?);
    let port = (u16::from(msb) << 8) | u16::from(lsb);
    Ok(SocketAddr::new(ip.into(), port))
}
/// Runs a data command whose payload is text and returns its non-empty lines.
///
/// The data connection is finalized even when reading the lines fails; in that case the
/// finalization error, if any, takes precedence over the read error.
///
/// # Errors
/// Returns an error if the data connection cannot be opened, the server refuses the command
/// (the just-opened data connection is then closed and no longer counted as open), the
/// lines cannot be read, or the completion reply is unexpected.
fn stream_lines(&mut self, cmd: Command, open_code: Status) -> FtpResult<Vec<String>> {
    let mut data_stream = BufReader::new(self.data_command(cmd)?);
    if let Err(err) = self.read_response_in(&[open_code, Status::AlreadyOpen]) {
        // No transfer will take place: release the connection and clear the flag.
        drop(data_stream);
        self.data_connection_open = false;
        return Err(err);
    }
    let lines = Self::get_lines_from_stream(&mut data_stream);
    self.finalize_retr_stream(data_stream)?;
    lines
}
fn default_passive_stream_builder() -> Box<PassiveStreamBuilder> {
Box::new(|addr| TcpStream::connect(addr).map_err(FtpError::ConnectionError))
}
/// Fails with `FtpError::DataConnectionAlreadyOpen` while a previous data connection has not
/// been finalized.
fn guard_multiple_data_connections(&self) -> FtpResult<()> {
    if !self.data_connection_open {
        return Ok(());
    }
    Err(FtpError::DataConnectionAlreadyOpen)
}
}
#[cfg(test)]
mod test {
use std::net::IpAddr;
use std::str::FromStr;
use std::sync::Arc;
#[cfg(feature = "secure")]
use pretty_assertions::assert_eq;
use rand::distr::Alphanumeric;
use rand::{Rng, rng};
use super::*;
use crate::FtpStream;
use crate::test_container::SyncPureFtpRunner;
use crate::types::FormatControl;
/// Smoke test: obtaining a test stream must succeed; the closure itself checks nothing.
#[test]
fn connect() {
crate::log_init();
with_test_ftp_stream(|_stream| {});
}
/// Parses two `PASV` replies (with and without a trailing dot) and checks the extracted
/// IP address and port.
#[test]
fn test_should_parse_passive_address_from_response() {
    let loopback_reply = Response::new(
        Status::PassiveMode,
        b"227 Entering Passive Mode (127,0,0,1,117,56)\r\n".to_vec(),
    );
    let address = FtpStream::parse_passive_address_from_response(loopback_reply)
        .expect("Failed to parse passive address");
    assert_eq!(
        address.ip(),
        IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
        "IP address is not correct"
    );
    assert_eq!(address.port(), 30008, "Port is not correct");

    let public_reply = Response::new(
        Status::PassiveMode,
        b"227 Entering Passive Mode (58,247,92,122,146,239).\r\n".to_vec(),
    );
    let address = FtpStream::parse_passive_address_from_response(public_reply)
        .expect("Failed to parse passive address");
    assert_eq!(
        address.ip(),
        IpAddr::V4(Ipv4Addr::new(58, 247, 92, 122)),
        "IP address is not correct"
    );
    assert_eq!(address.port(), 37615, "Port is not correct");
}
/// A fresh stream starts in passive mode and `set_mode` switches it.
#[test]
fn should_change_mode() {
with_test_ftp_stream(|stream| {
assert_eq!(stream.mode, Mode::Passive);
stream.set_mode(Mode::Active);
assert_eq!(stream.mode, Mode::Active);
})
}
/// Connects with an explicit timeout, logs in and checks the stored greeting.
#[test]
fn should_connect_with_timeout() {
    crate::log_init();
    let container = SyncPureFtpRunner::start();
    let addr: SocketAddr = format!("127.0.0.1:{}", container.get_ftp_port())
        .parse()
        .expect("invalid hostname");
    let mut ftp = FtpStream::connect_timeout(addr, Duration::from_secs(15)).unwrap();
    assert!(ftp.login("test", "test").is_ok());
    let greeting = ftp.get_welcome_msg().unwrap();
    assert!(greeting.contains("220 Welcome Alpine ftp server"));
}
/// The greeting read at connection time is available through `get_welcome_msg`.
#[test]
fn welcome_message() {
    crate::log_init();
    with_test_ftp_stream(|stream| {
        let greeting = stream.get_welcome_msg().unwrap();
        assert!(greeting.contains("220 Welcome Alpine ftp server"));
    });
}
/// `set_passive_nat_workaround` stores the flag on the stream.
#[test]
fn should_set_passive_nat_workaround() {
with_test_ftp_stream(|stream| {
stream.set_passive_nat_workaround(true);
assert!(stream.nat_workaround);
});
}
/// The TCP stream returned by `get_ref` is usable, e.g. to set a read timeout.
#[test]
fn get_ref() {
    use std::time::Duration;
    with_test_ftp_stream(|stream| {
        let timeout = Some(Duration::from_secs(10));
        let outcome = stream.get_ref().set_read_timeout(timeout);
        assert!(outcome.is_ok());
    });
}
/// Changes to `/`, verifies it with `pwd`, then returns to the initial directory.
#[test]
fn change_wrkdir() {
    with_test_ftp_stream(|stream| {
        let initial_dir: String = stream.pwd().unwrap();
        assert!(stream.cwd("/").is_ok());
        let current_dir = stream.pwd().unwrap();
        assert_eq!(current_dir.as_str(), "/");
        assert!(stream.cwd(initial_dir.as_str()).is_ok());
    })
}
/// `cdup` moves to the parent directory (`/home/test`), then the test returns to the
/// initial directory.
#[test]
fn cd_up() {
    with_test_ftp_stream(|stream| {
        let initial_dir: String = stream.pwd().unwrap();
        assert!(stream.cdup().is_ok());
        let parent_dir = stream.pwd().unwrap();
        assert_eq!(parent_dir.as_str(), "/home/test");
        assert!(stream.cwd(initial_dir.as_str()).is_ok());
    })
}
/// `noop` succeeds against the test server.
#[test]
fn noop() {
with_test_ftp_stream(|stream| {
assert!(stream.noop().is_ok());
})
}
/// Creating a directory twice fails the second time with `Status::FileUnavailable`;
/// removing it afterwards succeeds.
#[test]
fn make_and_remove_dir() {
    with_test_ftp_stream(|stream| {
        assert!(stream.mkdir("omar").is_ok());
        let duplicate_error = stream.mkdir("omar").err().unwrap();
        match duplicate_error {
            FtpError::UnexpectedResponse(Response { status, body: _ }) => {
                assert_eq!(status, Status::FileUnavailable)
            }
            err => panic!("Expected UnexpectedResponse, got {}", err),
        }
        assert!(stream.rmdir("omar").is_ok());
    })
}
/// Both binary and ASCII transfer types are accepted by the server.
#[test]
fn set_transfer_type() {
    with_test_ftp_stream(|stream| {
        let binary = stream.transfer_type(FileType::Binary);
        assert!(binary.is_ok());
        let ascii = stream.transfer_type(FileType::Ascii(FormatControl::Default));
        assert!(ascii.is_ok());
    })
}
/// Listing a directory containing a file with a non-UTF-8 name works with both `NLST` and
/// `LIST`, and the `LIST` line can still be parsed.
#[test]
fn test_should_list_files_with_non_utf8_names() {
    with_test_ftp_stream(|stream| {
        let dir = Some("/home/test/invalid-utf8/");
        let names = stream.nlst(dir).expect("Failed to list files");
        assert_eq!(names.len(), 1);
        let entries = stream.list(dir).expect("Failed to list files");
        assert_eq!(entries.len(), 1);
        crate::list::File::from_str(entries[0].as_str()).expect("Failed to parse file");
    });
}
/// End-to-end transfer scenario: upload, download, size, listings, modification time,
/// delete, append, streamed download and rename. The steps depend on each other's effects
/// on the working directory.
#[test]
fn should_transfer_file() {
with_test_ftp_stream(|stream| {
assert!(stream.transfer_type(FileType::Binary).is_ok());
// Upload a small file and read it back.
let file_data = "test data\n";
let mut reader = Cursor::new(file_data.as_bytes());
assert!(stream.put_file("test.txt", &mut reader).is_ok());
assert_eq!(
stream
.retr_as_buffer("test.txt")
.map(|bytes| bytes.into_inner())
.unwrap(),
file_data.as_bytes()
);
// Metadata and listings for the uploaded file; a missing file must fail.
assert_eq!(stream.size("test.txt").unwrap(), 10);
assert!(stream.size("omarone.txt").is_err());
assert_eq!(stream.list(None).unwrap().len(), 1);
assert_eq!(stream.nlst(None).unwrap().as_slice(), &["test.txt"]);
assert!(stream.mdtm("test.txt").is_ok());
assert!(stream.rm("test.txt").is_ok());
assert!(stream.mdtm("test.txt").is_err());
// Upload again, then append the same content.
let file_data = "test data\n";
let mut reader = Cursor::new(file_data.as_bytes());
assert!(stream.put_file("test.txt", &mut reader).is_ok());
let mut reader = Cursor::new(file_data.as_bytes());
assert!(stream.append_file("test.txt", &mut reader).is_ok());
// Download through the streaming API and check both chunks are present.
let mut reader = stream.retr_as_stream("test.txt").unwrap();
let mut buffer = Vec::new();
assert!(reader.read_to_end(&mut buffer).is_ok());
assert!(stream.finalize_retr_stream(Box::new(reader)).is_ok());
assert_eq!(buffer.as_slice(), "test data\ntest data\n".as_bytes());
// Rename, remove, and verify the directory is empty again.
assert!(stream.rename("test.txt", "toast.txt").is_ok());
assert!(stream.rm("toast.txt").is_ok());
assert_eq!(stream.list(None).unwrap().len(), 0);
})
}
/// The server advertises `UTF8` in its feature list and accepts turning it on via `OPTS`.
#[test]
fn should_get_feat_and_set_opts() {
    with_test_ftp_stream(|stream| {
        let supported = stream.feat().expect("Failed to get features");
        assert!(supported.contains_key("UTF8"));
        let outcome = stream.opts("UTF8", Some("ON"));
        assert!(outcome.is_ok());
    })
}
/// Starts an upload, drops the connection without finalizing it, then reconnects and
/// resumes the upload at the offset already written.
#[test]
fn should_resume_transfer() {
crate::log_init();
let container = Arc::new(SyncPureFtpRunner::start());
let port = container.get_ftp_port();
let url = format!("localhost:{port}");
// `setup_stream` is defined outside this view; it presumably returns a logged-in stream.
let mut stream: FtpStream = setup_stream(&url, &container);
assert!(stream.transfer_type(FileType::Binary).is_ok());
let wrkdir = stream.pwd().unwrap();
// Write the first 5 bytes, then drop both connections without finalizing the upload.
let mut transfer_stream = stream.put_with_stream("test.bin").unwrap();
assert_eq!(
transfer_stream
.write(&[0x00, 0x01, 0x02, 0x03, 0x04])
.unwrap(),
5
);
drop(stream);
drop(transfer_stream);
// Reconnect, go back to the same directory and resume at offset 5.
let mut stream = setup_stream(&url, &container);
assert!(stream.cwd(wrkdir).is_ok());
assert!(stream.transfer_type(FileType::Binary).is_ok());
assert!(stream.resume_transfer(5).is_ok());
let mut transfer_stream = stream.put_with_stream("test.bin").unwrap();
assert_eq!(
transfer_stream
.write(&[0x05, 0x06, 0x07, 0x08, 0x09, 0x0a])
.unwrap(),
6
);
assert!(stream.finalize_put_stream(transfer_stream).is_ok());
assert!(stream.rm("test.bin").is_ok());
finalize_stream(stream);
}
/// Uploads and removes a small file after switching the session to
/// `Mode::ExtendedPassive`.
#[test]
fn should_transfer_with_extended_passive_mode() {
    with_test_ftp_stream(|ftp| {
        assert!(ftp.transfer_type(FileType::Binary).is_ok());
        ftp.set_mode(Mode::ExtendedPassive);

        let contents = "test data\n";
        let mut payload = Cursor::new(contents.as_bytes());
        let upload = ftp.put_file("test.txt", &mut payload);
        assert!(upload.is_ok());

        // Clean up the uploaded file.
        assert!(ftp.rm("test.txt").is_ok());
    })
}
/// Sends `PWD` through `custom_command` and expects a `Status::PathCreated`
/// reply to be accepted.
#[test]
fn test_should_perform_custom_command() {
    with_test_ftp_stream(|ftp| {
        let outcome = ftp.custom_command("PWD", &[Status::PathCreated]);
        assert!(outcome.is_ok());
    });
}
/// Issues `LIST` through `custom_data_command`, reads the returned data
/// stream line by line and closes the data connection.
#[test]
fn test_should_perform_custom_data_command() {
    with_test_ftp_stream(|ftp| {
        let (reply, data) = ftp
            .custom_data_command("LIST", &[Status::AboutToSend])
            .expect("Failed to perform custom data command");
        assert_eq!(reply.status, Status::AboutToSend);

        // Read the listing before handing the reader back to the control stream.
        let mut data_reader = BufReader::new(data);
        FtpStream::get_lines_from_stream(&mut data_reader)
            .expect("Failed to get lines from stream");

        let closed = ftp.close_data_connection(data_reader);
        assert!(closed.is_ok());
    });
}
/// Uploads a file, opens a retrieval stream for it and aborts that transfer.
///
/// The session is dropped afterwards without calling `quit`, so this test
/// manages its own runner instead of using `with_test_ftp_stream`.
#[test]
fn test_abort_transfer() {
    crate::log_init();
    let runner = Arc::new(SyncPureFtpRunner::start());
    let port = runner.get_ftp_port();
    let url = format!("localhost:{port}");
    let mut ftp: FtpStream = setup_stream(&url, &runner);
    assert!(ftp.transfer_type(FileType::Binary).is_ok());

    // Put a file on the server so there is something to retrieve.
    let contents = "test data for abort\n";
    let mut upload = Cursor::new(contents.as_bytes());
    assert!(ftp.put_file("abort_test.txt", &mut upload).is_ok());

    // Open the download and abort it without reading any data.
    let in_flight = ftp.retr_as_stream("abort_test.txt").unwrap();
    let aborted = ftp.abort(in_flight);
    assert!(aborted.is_ok());

    drop(ftp);
}
/// Uploads a file and downloads it through `retr`, whose callback drains the
/// supplied reader into a `Vec<u8>`.
#[test]
fn test_retr_with_callback() {
    with_test_ftp_stream(|ftp| {
        assert!(ftp.transfer_type(FileType::Binary).is_ok());

        let contents = "hello callback";
        let mut upload = Cursor::new(contents.as_bytes());
        assert!(ftp.put_file("callback.txt", &mut upload).is_ok());

        let fetched = ftp
            .retr("callback.txt", |source| {
                let mut collected = Vec::new();
                // I/O failures are reported as `FtpError::ConnectionError`.
                match source.read_to_end(&mut collected) {
                    Ok(_) => Ok(collected),
                    Err(io_err) => Err(FtpError::ConnectionError(io_err)),
                }
            })
            .unwrap();
        assert_eq!(fetched, contents.as_bytes());

        assert!(ftp.rm("callback.txt").is_ok());
    })
}
/// Uploads `part1`, appends `part2` through the stream returned by
/// `append_with_stream`, and checks that the file reads back as `part1part2`.
#[test]
fn test_append_with_stream() {
    with_test_ftp_stream(|ftp| {
        assert!(ftp.transfer_type(FileType::Binary).is_ok());

        let mut first_part = Cursor::new("part1".as_bytes());
        assert!(ftp.put_file("append_stream.txt", &mut first_part).is_ok());

        // Append the second part through the streaming API.
        let mut appender = ftp.append_with_stream("append_stream.txt").unwrap();
        appender.write_all(b"part2").unwrap();
        ftp.finalize_put_stream(appender).unwrap();

        let downloaded = ftp.retr_as_buffer("append_stream.txt").unwrap();
        let content = downloaded.into_inner();
        assert_eq!(content, b"part1part2");

        assert!(ftp.rm("append_stream.txt").is_ok());
    })
}
/// Checks that the `active_mode` builder stores `Mode::Active` and the given
/// timeout. Connects to `test.rebex.net:21`, so it needs network access.
#[test]
fn test_active_mode_builder() {
    crate::log_init();
    let timeout = Duration::from_secs(30);
    let ftp = FtpStream::connect("test.rebex.net:21")
        .unwrap()
        .active_mode(timeout);
    assert_eq!(ftp.mode, Mode::Active);
    assert_eq!(ftp.active_timeout, timeout);
}
/// Installs a custom passive stream builder and checks that the session mode
/// is `Mode::Passive` afterwards. Connects to `test.rebex.net:21`, so it
/// needs network access.
#[test]
fn test_passive_stream_builder() {
    crate::log_init();
    let ftp = FtpStream::connect("test.rebex.net:21")
        .unwrap()
        .passive_stream_builder(|addr| {
            TcpStream::connect(addr).map_err(FtpError::ConnectionError)
        });
    assert_eq!(ftp.mode, Mode::Passive);
}
/// `parse_passive_address_from_response` must reject responses that carry no
/// usable passive-mode address. Needs no server connection.
#[test]
fn test_parse_passive_address_bad_response() {
    // Body made of bytes that are not valid UTF-8.
    let not_utf8 = Response::new(Status::PassiveMode, vec![0xff, 0xfe]);
    let parsed = FtpStream::parse_passive_address_from_response(not_utf8);
    assert!(parsed.is_err());

    // Readable body that contains no address tuple.
    let body = "227 No passive mode info here".as_bytes().to_vec();
    let no_address = Response::new(Status::PassiveMode, body);
    let parsed = FtpStream::parse_passive_address_from_response(no_address);
    assert!(parsed.is_err());
}
/// While one data connection is open, every API that would open another one
/// must fail with `FtpError::DataConnectionAlreadyOpen`.
#[test]
fn test_should_prevent_multiple_data_connections() {
    with_test_ftp_stream(|ftp| {
        let command = "LIST";
        let (_, data) = ftp
            .custom_data_command(command, &[Status::AboutToSend])
            .expect("Failed to perform custom data command");
        // Keep the first data connection alive for the whole set of checks.
        let open_reader = BufReader::new(data);

        assert!(
            matches!(
                ftp.custom_data_command(command, &[Status::AboutToSend]),
                Err(FtpError::DataConnectionAlreadyOpen)
            ),
            "Expected DataConnectionAlreadyOpen error"
        );
        assert!(
            matches!(
                ftp.retr_as_stream("somefile.txt"),
                Err(FtpError::DataConnectionAlreadyOpen)
            ),
            "Expected DataConnectionAlreadyOpen error"
        );
        assert!(
            matches!(
                ftp.put_with_stream("somefile.txt"),
                Err(FtpError::DataConnectionAlreadyOpen)
            ),
            "Expected DataConnectionAlreadyOpen error"
        );
        assert!(
            matches!(
                ftp.append_with_stream("somefile.txt"),
                Err(FtpError::DataConnectionAlreadyOpen)
            ),
            "Expected DataConnectionAlreadyOpen error"
        );

        // Closing the original connection must still succeed.
        assert!(ftp.close_data_connection(open_reader).is_ok());
    });
}
/// After `close_data_connection`, a new data command must be accepted again:
/// the same `LIST` exchange is performed twice in a row on one session.
#[test]
fn test_should_free_data_connection_after_close() {
    with_test_ftp_stream(|ftp| {
        let command = "LIST";
        // The second round only succeeds if the first one released the
        // data connection.
        for _round in 0..2 {
            let (reply, data) = ftp
                .custom_data_command(command, &[Status::AboutToSend])
                .expect("Failed to perform custom data command");
            assert_eq!(reply.status, Status::AboutToSend);

            let mut data_reader = BufReader::new(data);
            FtpStream::get_lines_from_stream(&mut data_reader)
                .expect("Failed to get lines from stream");
            assert!(ftp.close_data_connection(data_reader).is_ok());
        }
    });
}
/// Sends `SITE HELP`. A `CommandOk` reply and an `UnexpectedResponse` error
/// are both tolerated; any other error fails the test.
#[test]
fn test_site_command() {
    with_test_ftp_stream(|ftp| match ftp.site("HELP") {
        Ok(reply) => assert_eq!(reply.status, Status::CommandOk),
        // Deliberately accepted: the server answered with a different status.
        Err(FtpError::UnexpectedResponse(_)) => {}
        Err(err) => panic!("Unexpected error: {}", err),
    })
}
/// Smoke test for `eprt`: the call is made with a loopback address and its
/// result is deliberately ignored.
#[test]
fn test_eprt_command() {
    with_test_ftp_stream(|ftp| {
        let target = "127.0.0.1:12345".parse::<SocketAddr>().unwrap();
        let _ = ftp.eprt(target);
    })
}
/// Exercises several `FileType` variants with `transfer_type`. ASCII
/// (non-print) and Image must be accepted; the result for `Local(8)` is
/// ignored.
#[test]
fn test_transfer_type_variants() {
    with_test_ftp_stream(|ftp| {
        let ascii = ftp.transfer_type(FileType::Ascii(FormatControl::NonPrint));
        assert!(ascii.is_ok());

        let image = ftp.transfer_type(FileType::Image);
        assert!(image.is_ok());

        // Outcome intentionally not asserted.
        let _ = ftp.transfer_type(FileType::Local(8));
    })
}
/// Changing into a nonexistent directory must yield an `UnexpectedResponse`
/// whose status is `FileUnavailable`.
#[test]
fn test_cwd_error() {
    with_test_ftp_stream(|ftp| {
        let outcome = ftp.cwd("/nonexistent/directory/path");
        if let Err(FtpError::UnexpectedResponse(reply)) = outcome {
            assert_eq!(reply.status, Status::FileUnavailable);
        } else {
            panic!("Expected UnexpectedResponse for nonexistent directory");
        }
    })
}
/// Removing a file that does not exist must fail with `UnexpectedResponse`.
#[test]
fn test_rm_nonexistent_file() {
    with_test_ftp_stream(|ftp| {
        let outcome = ftp.rm("nonexistent_file.txt");
        if !matches!(outcome, Err(FtpError::UnexpectedResponse(_))) {
            panic!("Expected error when removing nonexistent file");
        }
    })
}
/// Removing a directory that does not exist must fail with
/// `UnexpectedResponse`.
#[test]
fn test_rmdir_nonexistent() {
    with_test_ftp_stream(|ftp| {
        assert!(
            matches!(
                ftp.rmdir("nonexistent_dir"),
                Err(FtpError::UnexpectedResponse(_))
            ),
            "Expected error when removing nonexistent directory"
        );
    })
}
/// Renaming a file that does not exist must fail with `UnexpectedResponse`.
#[test]
fn test_rename_nonexistent() {
    with_test_ftp_stream(|ftp| {
        let outcome = ftp.rename("nonexistent.txt", "new_name.txt");
        if let Err(FtpError::UnexpectedResponse(_)) = outcome {
            // Expected failure.
        } else {
            panic!("Expected error when renaming nonexistent file");
        }
    })
}
/// Retrieving a file that does not exist must fail with `UnexpectedResponse`.
#[test]
fn test_retr_nonexistent_file() {
    with_test_ftp_stream(|ftp| {
        let outcome = ftp.retr_as_buffer("nonexistent_file.txt");
        match outcome {
            Err(FtpError::UnexpectedResponse(_)) => {}
            _ => panic!("Expected error when retrieving nonexistent file"),
        }
    })
}
/// Smoke test: listing a nonexistent path must not panic. The result itself
/// is deliberately ignored.
#[test]
fn test_list_nonexistent_path() {
    with_test_ftp_stream(|ftp| {
        let missing_path = "/nonexistent/path";
        let _ = ftp.list(Some(missing_path));
    })
}
/// Uploads a file and checks that its name is reported by `nlst` for the
/// current directory.
///
/// NOTE(review): despite the test name, `nlst` is called with `None`, not
/// with an explicit path — confirm whether a `Some(..)` case was intended.
#[test]
fn test_nlst_with_path() {
    with_test_ftp_stream(|ftp| {
        let mut payload = Cursor::new("data".as_bytes());
        assert!(ftp.put_file("nlst_test.txt", &mut payload).is_ok());

        let names = ftp.nlst(None).unwrap();
        assert!(names.iter().any(|name| name == "nlst_test.txt"));

        assert!(ftp.rm("nlst_test.txt").is_ok());
    })
}
/// Uploads a file and checks that `list(Some("."))` returns at least one
/// entry.
#[test]
fn test_list_with_explicit_current_dir() {
    with_test_ftp_stream(|ftp| {
        let mut payload = Cursor::new("data".as_bytes());
        assert!(ftp.put_file("list_test.txt", &mut payload).is_ok());

        let entries = ftp.list(Some(".")).unwrap();
        assert!(!entries.is_empty());

        assert!(ftp.rm("list_test.txt").is_ok());
    })
}
/// Runs `f` against a freshly prepared FTP session.
///
/// Initialises logging, starts a `SyncPureFtpRunner`, connects and logs in
/// through `setup_stream`, invokes `f`, then quits the session via
/// `finalize_stream` and finally drops the runner.
fn with_test_ftp_stream<F: FnOnce(&mut FtpStream)>(f: F) {
    crate::log_init();
    let runner = Arc::new(SyncPureFtpRunner::start());
    let port = runner.get_ftp_port();
    let url = format!("localhost:{port}");
    let mut ftp: FtpStream = setup_stream(&url, &runner);
    f(&mut ftp);
    finalize_stream(ftp);
    // The runner is released only after the session has been closed.
    drop(runner);
}
/// Connects to `url`, logs in as `test`/`test`, creates a randomly named
/// working directory and changes into it.
///
/// The returned stream carries a passive stream builder that rewrites the
/// port of each data-connection address with
/// `container.get_mapped_port(port)` before connecting.
///
/// # Panics
///
/// Panics if the connection, login, `mkdir` or `cwd` step fails.
fn setup_stream(url: &str, container: &Arc<SyncPureFtpRunner>) -> FtpStream {
    let mut ftp_stream = FtpStream::connect(url).unwrap();
    assert!(ftp_stream.login("test", "test").is_ok());
    // Each session works in its own directory so tests do not see each
    // other's files.
    let tempdir: String = generate_tempdir();
    assert!(ftp_stream.mkdir(tempdir.as_str()).is_ok());
    assert!(ftp_stream.cwd(tempdir.as_str()).is_ok());
    // The builder must own its handle to the runner; `Arc::clone` makes the
    // cheap refcount bump explicit.
    let container_t = Arc::clone(container);
    // `SocketAddr` is `Copy`, so the argument is bound mutably instead of
    // being cloned first.
    ftp_stream.passive_stream_builder(move |mut addr| {
        let port = addr.port();
        let mapped = container_t.get_mapped_port(port);
        addr.set_port(mapped);
        info!("mapped port {port} to {mapped} for PASV");
        TcpStream::connect(addr).map_err(FtpError::ConnectionError)
    })
}
/// Consumes the session and asserts that `quit` succeeds.
fn finalize_stream(mut stream: FtpStream) {
    let goodbye = stream.quit();
    assert!(goodbye.is_ok());
}
/// Builds a directory name of the form `temp_XXXXX`, where `XXXXX` are five
/// characters sampled from `Alphanumeric`.
fn generate_tempdir() -> String {
    let mut generator = rng();
    let suffix: String = (0..5)
        .map(|_| {
            let byte: u8 = generator.sample(Alphanumeric);
            char::from(byte)
        })
        .collect();
    format!("temp_{}", suffix)
}
/// Compile-time probe: a call only type-checks when `T` implements `Send`.
fn is_send<T>(_send: T)
where
    T: Send,
{
}
/// Compile-time probe: a call only type-checks when `T` implements `Sync`.
fn is_sync<T>(_sync: T)
where
    T: Sync,
{
}
/// Compile-time check that `FtpStream` is `Send`, including after a custom
/// passive stream builder has been installed. Ignored at run time because it
/// would connect to `test.rebex.net:21`.
#[test]
#[ignore = "just needs to compile"]
fn test_ftp_stream_should_be_send() {
    crate::log_init();
    let connected = FtpStream::connect("test.rebex.net:21").unwrap();
    let ftp_stream = connected.passive_stream_builder(|addr| {
        println!("Connecting to {}", addr);
        TcpStream::connect(addr).map_err(FtpError::ConnectionError)
    });
    is_send::<FtpStream>(ftp_stream);
}
/// Compile-time check that `FtpStream` is `Sync`, including after a custom
/// passive stream builder has been installed. Ignored at run time because it
/// would connect to `test.rebex.net:21`.
#[test]
#[ignore = "just needs to compile"]
fn test_ftp_stream_should_be_sync() {
    crate::log_init();
    let connected = FtpStream::connect("test.rebex.net:21").unwrap();
    let ftp_stream = connected.passive_stream_builder(|addr| {
        println!("Connecting to {}", addr);
        TcpStream::connect(addr).map_err(FtpError::ConnectionError)
    });
    is_sync::<FtpStream>(ftp_stream);
}
}