use std::borrow::{Borrow, Cow};
use std::collections::{HashMap, HashSet};
use std::fmt::format;
use std::net::{IpAddr, SocketAddr};
use std::string::String;
use chrono::offset::TimeZone;
use chrono::{DateTime, Utc};
use regex::Regex;
use tokio::io::{
copy, AsyncBufRead, AsyncBufReadExt, AsyncRead, AsyncReadExt, AsyncWriteExt, BufStream,
};
use tokio::net::{TcpStream, ToSocketAddrs};
#[cfg(feature = "ftps")]
use tokio_rustls::{client::TlsStream, rustls::ClientConfig, rustls::ServerName, TlsConnector};
use crate::ftp::connection::Connection;
use crate::ftp::{Command, ftp_reply, MODES, REPLY_CODE_LEN};
use crate::ftp::types::{FileType, FtpError, Result};
use crate::StringExt;
lazy_static::lazy_static! {
static ref PORT_RE: Regex = Regex::new(r"\((\d+),(\d+),(\d+),(\d+),(\d+),(\d+)\)").unwrap();
static ref MDTM_RE: Regex = Regex::new(r"\b(\d{4})(\d{2})(\d{2})(\d{2})(\d{2})(\d{2})\b").unwrap();
static ref SIZE_RE: Regex = Regex::new(r"\s+(\d+)\s*$").unwrap();
static ref PROT_COMMAND_VALUE: Vec<&'static str> = vec!["C","E","S","P"];
}
pub struct FtpClient {
stream: BufStream<Connection>,
welcome_msg: Option<String>,
_reply_code: u32,
_reply_string: Option<String>,
_reply_lines: Vec<String>,
#[cfg(feature = "ftps")]
ssl_cfg: Option<(ClientConfig, ServerName)>,
features_map: HashMap<String, Vec<String>>,
}
impl FtpClient {
fn new(stream: TcpStream) -> Self {
FtpClient {
stream: BufStream::new(Connection::Tcp(stream)),
#[cfg(feature = "ftps")]
ssl_cfg: None,
welcome_msg: None,
_reply_code: 0,
_reply_string: None,
features_map: HashMap::new(),
_reply_lines: vec![],
}
}
#[cfg(feature = "ftps")]
fn new_tls_client(stream: TlsStream<TcpStream>) -> Self {
FtpClient {
stream: BufStream::new(Connection::Ssl(stream)),
ssl_cfg: None,
welcome_msg: None,
_reply_code: 0,
_reply_string: None,
features_map: HashMap::new(),
_reply_lines: vec![],
}
}
pub fn init_default(&mut self) {}
pub async fn connect<A: ToSocketAddrs>(addr: A) -> Result<FtpClient> {
let stream = TcpStream::connect(addr)
.await
.map_err(FtpError::ConnectionError)?;
let mut ftp_client = FtpClient::new(stream);
ftp_client.read_reply().await?;
ftp_client.check_response(ftp_reply::READY)?;
ftp_client.welcome_msg = Some(ftp_client._reply_string.clone().unwrap());
Ok(ftp_client)
}
#[cfg(feature = "ftps")]
pub async fn into_secure(
mut self,
config: ClientConfig,
domain: ServerName,
) -> Result<FtpClient> {
self.exe_auth_tls().await?;
let connector: TlsConnector = std::sync::Arc::new(config.clone()).into();
let stream = connector
.connect(domain.clone(), self.stream.into_inner().into_tcp_stream())
.await
.map_err(|e| FtpError::SecureError(format!("{}", e)))?;
let mut ftps_client = FtpClient::new_tls_client(stream);
ftps_client.ssl_cfg = Some((config, domain));
ftps_client.send_command(Command::PBSZ, Some("0")).await?;
ftps_client.check_response(ftp_reply::COMMAND_OK)?;
ftps_client.send_command(Command::PROT, Some("P")).await?;
ftps_client.check_response(ftp_reply::COMMAND_OK)?;
Ok(ftps_client)
}
#[cfg(feature = "ftps")]
async fn exe_auth_tls(mut self) -> Result<()> {
self.send_command(Command::AUTH, Some("TLS")).await?;
self.check_response_in(&[ftp_reply::AUTH_OK, ftp_reply::SECURITY_MECHANISM_IS_OK])
}
#[cfg(feature = "ftps")]
pub async fn exe_auth(mut self, mechanism: &str) -> Result<u32> {
Ok(self.send_command(Command::AUTH, Some(mechanism)).await?)
}
#[cfg(feature = "ftps")]
pub async fn exec_pbsz(mut self, pbsz: u64) -> Result<()> {
if pbsz < 0 || 4294967295 < pbsz {
Err(FtpError::InvalidArgument(format!(
"Invalid pbsz value. The correct value should be: {} to {}",
0, "(2^32)-1"
)))
}
self.send_command(Command::PBSZ, Some(pbsz.to_string().as_str()))
.await?;
self.check_response_in(&[ftp_reply::COMMAND_OK])?;
Ok(())
}
#[cfg(feature = "ftps")]
pub async fn exec_adat(mut self, data: Option<&[u8]>) -> Result<u32> {
let mut args = None;
if data.is_some() {
args = Some(base64::encode(data).as_str());
}
Ok(self.send_command(Command::ADAT, args).await?)
}
#[cfg(feature = "ftps")]
pub async fn exec_conf(mut self, data: Option<&[u8]>) -> Result<u32> {
let mut args = None;
if data.is_some() {
args = Some(base64::encode(data).as_str());
}
Ok(self.send_command(Command::CONF, args).await?)
}
#[cfg(feature = "ftps")]
pub async fn exec_enc(mut self, data: Option<&[u8]>) -> Result<u32> {
let mut args = None;
if data.is_some() {
args = Some(base64::encode(data).as_str());
}
Ok(self.send_command(Command::ENC, args).await?)
}
#[cfg(feature = "ftps")]
pub async fn exec_mic(mut self, data: Option<&[u8]>) -> Result<u32> {
let mut args = None;
if data.is_some() {
args = Some(base64::encode(data).as_str());
}
Ok(self.send_command(Command::MIC, args).await?)
}
#[cfg(feature = "ftps")]
pub async fn exec_prot(mut self, prot: &mut str) -> Result<()> {
let mut p = prot.as_mut();
if p.is_empty() {
p = &mut "C";
}
if !PROT_COMMAND_VALUE.contains(&&*p) {
Err(FtpError::InvalidArgument(format!(
"Unsupported prot command value",
)))
}
self.send_command(Command::PROT, Some(p)).await?;
self.check_response_in(&[ftp_reply::COMMAND_OK])?;
Ok(())
}
#[cfg(feature = "ftps")]
pub async fn into_insecure(mut self) -> Result<FtpClient> {
self.send_command(Command::CCC, None).await?;
if self._reply_code == ftp_reply::COMMAND_OK {
Ok(FtpClient::new(self.stream.into_inner().into_tcp_stream()))
}
Err(FtpError::InvalidResponse(format!(
"Expected code {:?}, got response: {}",
ftp_reply::COMMAND_OK,
self._reply_string.unwrap()
)))
}
async fn data_command(&mut self, cmd: &str) -> Result<Connection> {
let addr = self.pasv().await?;
let stream = TcpStream::connect(addr)
.await
.map_err(FtpError::ConnectionError)?;
#[cfg(feature = "ftps")]
match &self.ssl_cfg {
Some((config, domain)) => {
let connector: TlsConnector = std::sync::Arc::new(config.clone()).into();
return connector
.connect(domain.to_owned(), stream)
.await
.map(|stream| Connection::Ssl(stream))
.map_err(|e| FtpError::SecureError(format!("{}", e)));
}
_ => {}
};
self.write_str(cmd).await?;
self.read_reply().await?;
Ok(Connection::Tcp(stream))
}
pub fn get_ref(&self) -> &TcpStream {
self.stream.get_ref().get_ref()
}
pub fn get_welcome_msg(&self) -> Option<&str> {
self.welcome_msg.as_deref()
}
pub async fn login(&mut self, user: &str, password: &str) -> Result<bool> {
self.send_command(Command::USER, Some(user)).await?;
if ftp_reply::is_positive_completion(self._reply_code) {
return Ok(true);
} else if !ftp_reply::is_positive_intermediate(self._reply_code) {
return Ok(false);
}
self.send_command(Command::PASS, Some(password)).await?;
Ok(ftp_reply::is_positive_completion(self._reply_code))
}
pub async fn cwd(&mut self, path: &str) -> Result<bool> {
self.send_command(Command::CWD, Some(path)).await?;
Ok(ftp_reply::is_positive_completion(self._reply_code))
}
pub async fn cdup(&mut self) -> Result<bool> {
self.send_command(Command::CDUP, None).await?;
Ok(ftp_reply::is_positive_completion(self._reply_code))
}
pub async fn pwd(&mut self) -> Result<String> {
self.send_command(Command::PWD, None).await?;
match &self._reply_string {
None => {
let cause = format!("Cannot get PWD Response from FTP server");
Err(FtpError::InvalidResponse(cause))
}
Some(content) => match (content.find('"'), content.rfind('"')) {
(Some(begin), Some(end)) if begin < end => Ok(content[begin + 1..end].to_string()),
_ => {
let cause = format!("Invalid PWD Response: {}", content);
Err(FtpError::InvalidResponse(cause))
}
},
}
}
pub async fn noop(&mut self) -> Result<bool> {
self.send_command(Command::NOOP, None).await?;
Ok(ftp_reply::is_positive_completion(self._reply_code))
}
pub async fn make_directory(&mut self, pathname: &str) -> Result<bool> {
match ftp_reply::is_positive_completion(self.mkd(pathname).await?) {
true => Ok(true),
false => {
return Err(FtpError::InvalidResponse(format!(
"Got error reply: {}",
self._reply_string.as_ref().unwrap()
)));
}
}
}
pub async fn mkd(&mut self, pathname: &str) -> Result<u32> {
Ok(self.send_command(Command::MKD, Some(pathname)).await?)
}
pub async fn acct(&mut self, account: &str) -> Result<u32> {
Ok(self.send_command(Command::ACCT, Some(account)).await?)
}
pub async fn abor(&mut self) -> Result<u32> {
Ok(self.send_command(Command::ABOR, None).await?)
}
pub async fn rein(&mut self) -> Result<u32> {
Ok(self.send_command(Command::REIN, None).await?)
}
pub async fn smnt(&mut self, dir: &str) -> Result<u32> {
Ok(self.send_command(Command::SMNT, Some(dir)).await?)
}
pub async fn epsv(&mut self) -> Result<u32> {
Ok(self.send_command(Command::EPSV, None).await?)
}
pub async fn type_cmd(&mut self, file_type: u32) -> Result<u32> {
let s = MODES.substring(file_type as usize, (file_type + 1) as usize);
Ok(self.send_command(Command::TYPE, Some(s)).await?)
}
pub async fn stru(&mut self, structure: u32) -> Result<u32> {
let s = MODES.substring(structure as usize, (structure + 1) as usize);
Ok(self.send_command(Command::STRU, Some(s)).await?)
}
pub async fn mode(&mut self, mode: u32) -> Result<u32> {
let s = MODES.substring(mode as usize, (mode + 1) as usize);
Ok(self.send_command(Command::MODE, Some(s)).await?)
}
pub async fn stou(&mut self) -> Result<u32> {
Ok(self.send_command(Command::STOU, None).await?)
}
pub async fn stou_pathname(&mut self, pathname: &str) -> Result<u32> {
Ok(self.send_command(Command::STOU, Some(pathname)).await?)
}
pub async fn appe(&mut self, pathname: &str) -> Result<u32> {
Ok(self.send_command(Command::APPE, Some(pathname)).await?)
}
pub async fn allo(&mut self, bytes: u32) -> Result<u32> {
Ok(self
.send_command(Command::ALLO, Some(bytes.to_string().as_str()))
.await?)
}
pub async fn allo_record_size(&mut self, bytes: u32, record_size: u32) -> Result<u32> {
let args = format!(
"{} R {}",
bytes.to_string().as_str(),
record_size.to_string().as_str()
);
Ok(self
.send_command(Command::ALLO, Some(args.as_str()))
.await?)
}
pub async fn port(&mut self, host: IpAddr, port: u16) -> Result<u32> {
let mut args = String::with_capacity(24);
args.push_str(host.to_string().replace('.', ",").as_str());
args.push_str(",");
args.push_str((port >> 8).to_string().as_str());
args.push_str(",");
args.push_str((port & 0xff).to_string().as_str());
Ok(self
.send_command(Command::PORT, Some(args.as_str()))
.await?)
}
pub async fn eprt(&mut self, host: IpAddr, port: u16) -> Result<u32> {
let mut args = String::new();
let mut h = host.to_string();
let n = h.find("%").unwrap_or(0);
if n > 0 {
h = h.substring(0, n).to_string();
}
args.push_str("|");
match host {
IpAddr::V4(addr) => args.push_str("1"),
IpAddr::V6(addr) => args.push_str("2"),
}
args.push_str("|");
args.push_str(h.as_str());
args.push_str("|");
args.push_str(port.to_string().as_str());
args.push_str("|");
Ok(self
.send_command(Command::EPRT, Some(args.as_str()))
.await?)
}
pub async fn mfmt(&mut self, pathname: &str, timeval: &str) -> Result<u32> {
Ok(self
.send_command(
Command::MFMT,
Some(format!("{} {}", timeval, pathname).as_str()),
)
.await?)
}
async fn pasv(&mut self) -> Result<SocketAddr> {
self.send_command(Command::PASV, None).await?;
self.check_response(ftp_reply::PASSIVE_MODE)?;
let reply_str = self._reply_string.clone().unwrap();
let reply_str = reply_str.as_str();
PORT_RE
.captures(reply_str)
.ok_or_else(|| {
FtpError::InvalidResponse(format!("Invalid PASV response: {}", reply_str))
})
.and_then(|caps| {
let (oct1, oct2, oct3, oct4) = (
caps[1].parse::<u8>().unwrap(),
caps[2].parse::<u8>().unwrap(),
caps[3].parse::<u8>().unwrap(),
caps[4].parse::<u8>().unwrap(),
);
let (msb, lsb) = (
caps[5].parse::<u8>().unwrap(),
caps[6].parse::<u8>().unwrap(),
);
let port = ((msb as u16) << 8) + lsb as u16;
use std::net::{IpAddr, Ipv4Addr};
let ip = if (oct1, oct2, oct3, oct4) == (0, 0, 0, 0) {
self.get_ref()
.peer_addr()
.map_err(FtpError::ConnectionError)?
.ip()
} else {
IpAddr::V4(Ipv4Addr::new(oct1, oct2, oct3, oct4))
};
Ok(SocketAddr::new(ip, port))
})
}
pub async fn transfer_type(&mut self, file_type: FileType) -> Result<bool> {
self.send_command(Command::TYPE, Some(file_type.to_string().as_str()))
.await?;
Ok(ftp_reply::is_positive_completion(self._reply_code))
}
pub async fn logout(&mut self) -> Result<bool> {
self.send_command(Command::QUIT, None).await?;
Ok(ftp_reply::is_positive_completion(self._reply_code))
}
pub async fn restart_from(&mut self, offset: u64) -> Result<bool> {
self.send_command(Command::REST, Some(offset.to_string().as_str()))
.await?;
Ok(ftp_reply::is_positive_intermediate(self._reply_code))
}
pub async fn get(&mut self, file_name: &str) -> Result<BufStream<Connection>> {
let retr_command = format!("RETR {}\r\n", file_name);
let data_stream = BufStream::new(self.data_command(&retr_command).await?);
self.check_response_in(&[ftp_reply::ABOUT_TO_SEND, ftp_reply::ALREADY_OPEN])?;
Ok(data_stream)
}
pub async fn rename(&mut self, from_name: &str, to_name: &str) -> Result<bool> {
self.send_command(Command::RNFR, Some(from_name)).await?;
if !ftp_reply::is_positive_intermediate(self._reply_code) {
return Ok(false);
}
self.send_command(Command::RNTO, Some(to_name)).await?;
Ok(ftp_reply::is_positive_completion(self._reply_code))
}
pub async fn retr<F, T, P, E>(&mut self, filename: &str, reader: F) -> std::result::Result<T, E>
where
F: Fn(BufStream<Connection>) -> P,
P: std::future::Future<Output = std::result::Result<T, E>>,
E: From<FtpError>,
{
let retr_command = format!("{} {}\r\n", Command::RETR.cmd_name(), filename);
let data_stream = BufStream::new(self.data_command(&retr_command).await?);
self.check_response_in(&[ftp_reply::ABOUT_TO_SEND, ftp_reply::ALREADY_OPEN])?;
let res = reader(data_stream).await?;
Ok(res)
}
pub async fn simple_retr(&mut self, file_name: &str) -> Result<std::io::Cursor<Vec<u8>>> {
async fn do_read(mut reader: BufStream<Connection>) -> Result<Vec<u8>> {
let mut buffer = Vec::new();
reader
.read_to_end(&mut buffer)
.await
.map_err(FtpError::ConnectionError)?;
Ok(buffer)
}
let buffer = self.retr(file_name, do_read).await?;
Ok(std::io::Cursor::new(buffer))
}
pub async fn remove_directory(&mut self, pathname: &str) -> Result<bool> {
Ok(ftp_reply::is_positive_completion(self.rmd(pathname).await?))
}
pub async fn rmd(&mut self, pathname: &str) -> Result<u32> {
Ok(self.send_command(Command::RMD, Some(pathname)).await?)
}
pub async fn delete_file(&mut self, filename: &str) -> Result<bool> {
Ok(ftp_reply::is_positive_completion(
self.dele(filename).await?,
))
}
pub async fn dele(&mut self, filename: &str) -> Result<u32> {
Ok(self.send_command(Command::DELE, Some(filename)).await?)
}
async fn put_file<R: AsyncRead + Unpin>(&mut self, filename: &str, r: &mut R) -> Result<()> {
let stor_command = format!("{} {}\r\n", Command::STOR, filename);
let mut data_stream = BufStream::new(self.data_command(&stor_command).await?);
self.check_response_in(&[ftp_reply::ALREADY_OPEN, ftp_reply::ABOUT_TO_SEND])?;
copy(r, &mut data_stream)
.await
.map_err(FtpError::ConnectionError)?;
Ok(())
}
pub async fn send_command(&mut self, cmd: Command, agrs: Option<&str>) -> Result<u32> {
let mut ftp_cmd = format!("{}\r\n", cmd.cmd_name());
if agrs.is_some() {
ftp_cmd = format!("{} {}\r\n", cmd.cmd_name(), agrs.unwrap());
}
self.write_str(ftp_cmd).await?;
self.read_reply().await?;
Ok(self._reply_code)
}
async fn read_reply(&mut self) -> Result<()> {
self._reply_lines.clear();
self._reply_string = None;
let mut line = String::new();
self.stream
.read_line(&mut line)
.await
.map_err(FtpError::ConnectionError)?;
if line.len() < REPLY_CODE_LEN {
return Err(FtpError::InvalidResponse(format!(
"Truncated server reply: {}",
line
)));
}
if line.len() < 5 {
return Err(FtpError::InvalidResponse(
"error: could not read reply code".to_owned(),
));
}
let reply_code: u32 = line[0..3].parse().map_err(|_err| {
FtpError::InvalidResponse(format!(
"Could not parse reply code. \n Server Reply: {}",
line
))
})?;
self._reply_code = reply_code;
self._reply_lines.push(line.as_str().to_string());
let expected = format!("{} ", &line[0..3]);
while line.len() < 5 || line[0..4] != expected {
line.clear();
if let Err(e) = self.stream.read_line(&mut line).await {
return Err(FtpError::ConnectionError(e));
}
self._reply_lines.push(line.as_str().to_string());
}
let mut s = String::new();
for x in self._reply_lines.iter() {
s.push_str(x.as_str())
}
self._reply_string = Some(s);
Ok(())
}
pub async fn put<R: AsyncRead + Unpin>(&mut self, filename: &str, r: &mut R) -> Result<()> {
self.put_file(filename, r).await?;
self.check_response_in(&[
ftp_reply::CLOSING_DATA_CONNECTION,
ftp_reply::REQUESTED_FILE_ACTION_OK,
])?;
Ok(())
}
async fn list_command(
&mut self,
cmd: Cow<'_, str>,
open_code: u32,
close_code: &[u32],
) -> Result<Vec<String>> {
let data_stream = BufStream::new(self.data_command(&cmd).await?);
self.check_response_in(&[open_code, ftp_reply::ALREADY_OPEN])?;
let lines = Self::get_lines_from_stream(data_stream).await?;
self.check_response_in(close_code)?;
Ok(lines)
}
async fn get_lines_from_stream<R>(data_stream: R) -> Result<Vec<String>>
where
R: AsyncBufRead + Unpin,
{
let mut lines: Vec<String> = Vec::new();
let mut lines_stream = data_stream.lines();
loop {
let line = lines_stream
.next_line()
.await
.map_err(FtpError::ConnectionError)?;
match line {
Some(line) => {
if line.is_empty() {
continue;
}
lines.push(line);
}
None => break Ok(lines),
}
}
}
pub async fn list(&mut self, pathname: Option<&str>) -> Result<Vec<String>> {
let command = pathname.map_or("LIST\r\n".into(), |path| {
format!("LIST {}\r\n", path).into()
});
self.list_command(
command,
ftp_reply::ABOUT_TO_SEND,
&[
ftp_reply::CLOSING_DATA_CONNECTION,
ftp_reply::REQUESTED_FILE_ACTION_OK,
],
)
.await
}
pub async fn nlst(&mut self, pathname: Option<&str>) -> Result<Vec<String>> {
let command = pathname.map_or("NLST\r\n".into(), |path| {
format!("NLST {}\r\n", path).into()
});
self.list_command(
command,
ftp_reply::ABOUT_TO_SEND,
&[
ftp_reply::CLOSING_DATA_CONNECTION,
ftp_reply::REQUESTED_FILE_ACTION_OK,
],
)
.await
}
pub async fn mdtm(&mut self, pathname: &str) -> Result<Option<DateTime<Utc>>> {
self.send_command(Command::MDTM, Some(pathname)).await?;
self.check_response(ftp_reply::FILE)?;
let reply_str = self._reply_string.clone().unwrap();
let reply_str = reply_str.as_str();
match MDTM_RE.captures(reply_str) {
Some(caps) => {
let (year, month, day) = (
caps[1].parse::<i32>().unwrap(),
caps[2].parse::<u32>().unwrap(),
caps[3].parse::<u32>().unwrap(),
);
let (hour, minute, second) = (
caps[4].parse::<u32>().unwrap(),
caps[5].parse::<u32>().unwrap(),
caps[6].parse::<u32>().unwrap(),
);
Ok(Some(
Utc.ymd(year, month, day).and_hms(hour, minute, second),
))
}
None => Ok(None),
}
}
pub async fn size(&mut self, pathname: &str) -> Result<Option<usize>> {
self.send_command(Command::SIZE, Some(pathname)).await?;
self.check_response(ftp_reply::FILE)?;
let reply_str = self._reply_string.clone().unwrap();
let reply_str = reply_str.as_str();
match SIZE_RE.captures(reply_str) {
Some(caps) => Ok(Some(caps[1].parse().unwrap())),
None => Ok(None),
}
}
pub async fn feat(&mut self) -> Result<u32> {
self._reply_lines.clear();
Ok(self.send_command(Command::FEAT, None).await?)
}
pub async fn features(&mut self, cmd: Command) -> Result<Option<Vec<String>>> {
let features = Vec::new();
if self.init_feature_map().await? {
let values = self.features_map.get(cmd.cmd_name());
if values.is_some() {
return Ok(Some(values.unwrap().clone()));
}
}
Ok(Some(features))
}
async fn init_feature_map(&mut self) -> Result<bool> {
if self.features_map.is_empty() {
let reply_code = self.feat().await?;
if reply_code == ftp_reply::NOT_LOGGED_IN.into() {
return Ok(false);
}
let success = ftp_reply::is_positive_completion(reply_code);
if !success {
return Ok(false);
}
for l in self._reply_lines.iter() {
if l.starts_with(" ") {
let mut key = "";
let mut value = "";
let s = &l[1..l.len() - 1];
let varsep = s.find(' ');
if varsep.is_some() {
key = &l[1..varsep.unwrap() + 1];
value = &l[varsep.unwrap() + 1..l.len()];
} else {
key = &l[1..l.len() - 1]
}
let entries = self.features_map.get_mut(key);
match entries {
None => {
let mut features = vec![];
features.push(String::from(value));
self.features_map.insert(key.to_string(), features);
}
Some(features) => {
features.push(value.to_string());
}
}
}
}
}
Ok(true)
}
async fn write_str<S: AsRef<str>>(&mut self, command: S) -> Result<()> {
let conn = self.stream.get_mut();
conn.write_all(command.as_ref().as_bytes())
.await
.map_err(FtpError::ConnectionError)
}
pub fn check_response(&mut self, expected_code: u32) -> Result<()> {
self.check_response_in(&[expected_code])
}
pub fn check_response_in(&mut self, expected_code: &[u32]) -> Result<()> {
let reply_string = self._reply_string.clone();
if expected_code.iter().any(|ec| self._reply_code == *ec) {
Ok(())
} else {
Err(FtpError::InvalidResponse(format!(
"Expected code {:?}, got response: {}",
expected_code,
reply_string.unwrap().as_str()
)))
}
}
}
#[cfg(test)]
mod tests {
use tokio_stream::once;
use tokio_util::io::StreamReader;
use super::FtpClient;
#[tokio::test]
async fn list_command_dos_newlines() {
let data_stream = StreamReader::new(once(Ok::<_, std::io::Error>(
b"Hello\r\nWorld\r\n\r\nBe\r\nHappy\r\n" as &[u8],
)));
assert_eq!(
FtpClient::get_lines_from_stream(data_stream).await.unwrap(),
["Hello", "World", "Be", "Happy"]
.iter()
.map(<&str>::to_string)
.collect::<Vec<_>>()
);
}
#[tokio::test]
async fn list_command_unix_newlines() {
let data_stream = StreamReader::new(once(Ok::<_, std::io::Error>(
b"Hello\nWorld\n\nBe\nHappy\n" as &[u8],
)));
assert_eq!(
FtpClient::get_lines_from_stream(data_stream).await.unwrap(),
["Hello", "World", "Be", "Happy"]
.iter()
.map(<&str>::to_string)
.collect::<Vec<_>>()
);
}
}