use super::data_stream::DataStream;
use super::status;
use super::types::{FileType, FtpError, Line, Result};
use chrono::offset::TimeZone;
use chrono::{DateTime, Utc};
#[cfg(feature = "secure")]
use openssl::ssl::{Ssl, SslContext};
use regex::Regex;
use std::borrow::Cow;
use std::io::{copy, BufRead, BufReader, BufWriter, Cursor, Read, Write};
use std::net::ToSocketAddrs;
use std::net::{SocketAddr, TcpStream};
use std::str::FromStr;
use std::string::String;
// Regular expressions used to parse server replies; compiled once on first use.
lazy_static! {
// Six comma-separated decimal fields in parentheses, e.g. `(h1,h2,h3,h4,p1,p2)`,
// as parsed by `pasv` into an IPv4 address and a port.
static ref PORT_RE: Regex = Regex::new(r"\((\d+),(\d+),(\d+),(\d+),(\d+),(\d+)\)").unwrap();
// Fourteen consecutive digits split into groups of 4,2,2,2,2,2, which `mdtm`
// reads as year, month, day, hour, minute and second.
static ref MDTM_RE: Regex = Regex::new(r"\b(\d{4})(\d{2})(\d{2})(\d{2})(\d{2})(\d{2})\b").unwrap();
// A run of digits at the end of the line, preceded by whitespace; used by `size`.
static ref SIZE_RE: Regex = Regex::new(r"\s+(\d+)\s*$").unwrap();
}
/// Client-side handle on an FTP control connection.
///
/// Commands are written to, and replies read from, the wrapped `DataStream`.
#[derive(Debug)]
pub struct FtpStream {
// Buffered reader over the control connection; commands are written through
// `reader.get_mut()` (see `write_str`).
reader: BufReader<DataStream>,
// SSL context stored by `into_secure` and reused by `data_command` to wrap
// data connections; `None` while the control connection is plain TCP.
#[cfg(feature = "secure")]
ssl_cfg: Option<SslContext>,
}
impl FtpStream {
/// Opens a TCP control connection to `addr` and waits for the server's
/// `status::READY` reply before handing back the stream.
///
/// # Errors
/// Returns `FtpError::ConnectionError` when the TCP connection cannot be
/// established, or the error produced by `read_response` when the greeting is
/// missing or carries a different code.
#[cfg(not(feature = "secure"))]
pub fn connect<A: ToSocketAddrs>(addr: A) -> Result<FtpStream> {
    let socket = TcpStream::connect(addr).map_err(FtpError::ConnectionError)?;
    let mut client = FtpStream {
        reader: BufReader::new(DataStream::Tcp(socket)),
    };
    client.read_response(status::READY)?;
    Ok(client)
}
/// Opens a plain TCP control connection to `addr` (no SSL context attached yet)
/// and waits for the server's `status::READY` reply before handing back the stream.
///
/// # Errors
/// Returns `FtpError::ConnectionError` when the TCP connection cannot be
/// established, or the error produced by `read_response` when the greeting is
/// missing or carries a different code.
#[cfg(feature = "secure")]
pub fn connect<A: ToSocketAddrs>(addr: A) -> Result<FtpStream> {
    let socket = TcpStream::connect(addr).map_err(FtpError::ConnectionError)?;
    let mut client = FtpStream {
        reader: BufReader::new(DataStream::Tcp(socket)),
        ssl_cfg: None,
    };
    client.read_response(status::READY)?;
    Ok(client)
}
/// Upgrades the control connection to SSL, consuming the plain stream.
///
/// Sends `AUTH TLS`, waits for `status::AUTH_OK`, performs the SSL handshake
/// over the existing TCP stream with a session built from `ssl_context`, and
/// then issues `PBSZ 0` and `PROT P`, each of which must be answered with
/// `status::COMMAND_OK`. The context is kept so that data connections can be
/// wrapped as well.
///
/// # Errors
/// Returns `FtpError::SecureError` when the SSL session cannot be created or
/// the handshake fails, and propagates any error from writing a command or
/// reading its reply.
#[cfg(feature = "secure")]
pub fn into_secure(mut self, ssl_context: SslContext) -> Result<FtpStream> {
    self.write_str("AUTH TLS\r\n")?;
    self.read_response(status::AUTH_OK)?;

    let session = Ssl::new(&ssl_context).map_err(|e| FtpError::SecureError(e.to_string()))?;
    let tcp = self.reader.into_inner().into_tcp_stream();
    let tls = session
        .connect(tcp)
        .map_err(|e| FtpError::SecureError(e.to_string()))?;

    let mut secured = FtpStream {
        reader: BufReader::new(DataStream::Ssl(tls)),
        ssl_cfg: Some(ssl_context),
    };
    // Both follow-up commands expect the same positive reply.
    for command in ["PBSZ 0\r\n", "PROT P\r\n"].iter() {
        secured.write_str(*command)?;
        secured.read_response(status::COMMAND_OK)?;
    }
    Ok(secured)
}
/// Drops back to a plain control connection, consuming the current stream.
///
/// Sends `CCC`, waits for `status::COMMAND_OK`, then rebuilds the stream
/// around the underlying TCP socket with no SSL context.
///
/// # Errors
/// Propagates any error from writing the command or reading its reply.
#[cfg(feature = "secure")]
pub fn into_insecure(mut self) -> Result<FtpStream> {
    self.write_str("CCC\r\n")?;
    self.read_response(status::COMMAND_OK)?;

    let tcp = self.reader.into_inner().into_tcp_stream();
    Ok(FtpStream {
        reader: BufReader::new(DataStream::Tcp(tcp)),
        ssl_cfg: None,
    })
}
/// Requests a passive-mode address via `pasv`, sends `cmd` on the control
/// connection, then connects to that address and returns the plain TCP data
/// stream.
///
/// # Errors
/// Propagates errors from `pasv` and `write_str`; returns
/// `FtpError::ConnectionError` when the data connection cannot be opened.
#[cfg(not(feature = "secure"))]
fn data_command(&mut self, cmd: &str) -> Result<DataStream> {
    let addr = self.pasv()?;
    self.write_str(cmd)?;
    let socket = TcpStream::connect(addr).map_err(FtpError::ConnectionError)?;
    Ok(DataStream::Tcp(socket))
}
/// Requests a passive-mode address via `pasv`, sends `cmd` on the control
/// connection, then connects to that address and returns the data stream.
///
/// When an SSL context is stored (set by `into_secure`), the data connection
/// is wrapped in SSL; otherwise it is returned as plain TCP.
///
/// # Errors
/// Propagates errors from `pasv` and `write_str`; returns
/// `FtpError::ConnectionError` when the data connection cannot be opened and
/// `FtpError::SecureError` when the SSL session cannot be created or the
/// handshake fails.
#[cfg(feature = "secure")]
fn data_command(&mut self, cmd: &str) -> Result<DataStream> {
    let addr = self.pasv()?;
    self.write_str(cmd)?;
    let stream = TcpStream::connect(addr).map_err(FtpError::ConnectionError)?;
    match self.ssl_cfg {
        Some(ref context) => {
            // Report a failed session setup as an error instead of panicking,
            // the same way `into_secure` does.
            let session =
                Ssl::new(context).map_err(|e| FtpError::SecureError(e.to_string()))?;
            session
                .connect(stream)
                .map(DataStream::Ssl)
                .map_err(|e| FtpError::SecureError(e.to_string()))
        }
        None => Ok(DataStream::Tcp(stream)),
    }
}
pub fn get_ref(&self) -> &TcpStream {
self.reader.get_ref().get_ref()
}
/// Logs in with `user`, sending `password` only when the server asks for it.
///
/// The `USER` command may be answered with `status::LOGGED_IN` (done) or
/// `status::NEED_PASSWORD`, in which case `PASS` is sent and must be answered
/// with `status::LOGGED_IN`.
///
/// # Errors
/// Propagates any error from writing a command or reading its reply.
pub fn login(&mut self, user: &str, password: &str) -> Result<()> {
    self.write_str(format!("USER {}\r\n", user))?;
    let Line(code, _) = self.read_response_in(&[status::LOGGED_IN, status::NEED_PASSWORD])?;
    if code == status::NEED_PASSWORD {
        self.write_str(format!("PASS {}\r\n", password))?;
        self.read_response(status::LOGGED_IN)?;
    }
    Ok(())
}
/// Sends `CWD path` and expects `status::REQUESTED_FILE_ACTION_OK`.
///
/// # Errors
/// Propagates any error from writing the command or reading its reply.
pub fn cwd(&mut self, path: &str) -> Result<()> {
    self.write_str(format!("CWD {}\r\n", path))?;
    self.read_response(status::REQUESTED_FILE_ACTION_OK)?;
    Ok(())
}
/// Sends `CDUP` and accepts either `status::COMMAND_OK` or
/// `status::REQUESTED_FILE_ACTION_OK` as success.
///
/// # Errors
/// Propagates any error from writing the command or reading its reply.
pub fn cdup(&mut self) -> Result<()> {
    self.write_str("CDUP\r\n")?;
    self.read_response_in(&[status::COMMAND_OK, status::REQUESTED_FILE_ACTION_OK])?;
    Ok(())
}
/// Sends `PWD` and returns the text found between the first and the last
/// double quote of the `status::PATH_CREATED` reply.
///
/// # Errors
/// Returns `FtpError::InvalidResponse` when the reply does not contain two
/// distinct double quotes, and propagates any error from writing the command
/// or reading its reply.
pub fn pwd(&mut self) -> Result<String> {
    self.write_str("PWD\r\n")?;
    let Line(_, content) = self.read_response(status::PATH_CREATED)?;

    if let (Some(open), Some(close)) = (content.find('"'), content.rfind('"')) {
        // A single quote yields open == close and is rejected below.
        if open < close {
            return Ok(content[open + 1..close].to_string());
        }
    }
    let cause = format!("Invalid PWD Response: {}", content);
    Err(FtpError::InvalidResponse(cause))
}
/// Sends `NOOP` and expects `status::COMMAND_OK`.
///
/// # Errors
/// Propagates any error from writing the command or reading its reply.
pub fn noop(&mut self) -> Result<()> {
    self.write_str("NOOP\r\n")?;
    self.read_response(status::COMMAND_OK)?;
    Ok(())
}
/// Sends `MKD pathname` and expects `status::PATH_CREATED`.
///
/// # Errors
/// Propagates any error from writing the command or reading its reply.
pub fn mkdir(&mut self, pathname: &str) -> Result<()> {
    self.write_str(format!("MKD {}\r\n", pathname))?;
    self.read_response(status::PATH_CREATED)?;
    Ok(())
}
/// Sends `PASV` and parses the `status::PASSIVE_MODE` reply into the socket
/// address the server is listening on.
///
/// The reply must contain six comma-separated decimal fields in parentheses:
/// the first four are the IPv4 octets, the last two are the high and low bytes
/// of the port.
///
/// # Errors
/// Returns `FtpError::InvalidResponse` when the reply does not contain the
/// six-field group or when any field does not fit in a byte, and
/// `FtpError::InvalidAddress` when the assembled address fails to parse.
/// Errors from writing the command or reading its reply are propagated.
fn pasv(&mut self) -> Result<SocketAddr> {
    self.write_str("PASV\r\n")?;
    let Line(_, line) = self.read_response(status::PASSIVE_MODE)?;
    let invalid = || FtpError::InvalidResponse(format!("Invalid PASV response: {}", line));

    let caps = PORT_RE.captures(&line).ok_or_else(|| invalid())?;
    // The regex accepts any digit run, so a value above 255 must be rejected
    // here rather than unwrapped.
    let mut fields = [0u8; 6];
    for (group, field) in fields.iter_mut().enumerate() {
        *field = caps[group + 1].parse::<u8>().map_err(|_| invalid())?;
    }

    let port = (u16::from(fields[4]) << 8) | u16::from(fields[5]);
    let addr = format!(
        "{}.{}.{}.{}:{}",
        fields[0], fields[1], fields[2], fields[3], port
    );
    SocketAddr::from_str(&addr).map_err(FtpError::InvalidAddress)
}
/// Sends `TYPE` followed by the string form of `file_type` and expects
/// `status::COMMAND_OK`.
///
/// # Errors
/// Propagates any error from writing the command or reading its reply.
pub fn transfer_type(&mut self, file_type: FileType) -> Result<()> {
    let type_name = file_type.to_string();
    self.write_str(format!("TYPE {}\r\n", type_name))?;
    self.read_response(status::COMMAND_OK)?;
    Ok(())
}
/// Sends `QUIT` and expects `status::CLOSING`.
///
/// # Errors
/// Propagates any error from writing the command or reading its reply.
pub fn quit(&mut self) -> Result<()> {
    self.write_str("QUIT\r\n")?;
    self.read_response(status::CLOSING)?;
    Ok(())
}
/// Starts a `RETR file_name` transfer and returns a buffered reader over the
/// data connection.
///
/// Either `status::ABOUT_TO_SEND` or `status::ALREADY_OPEN` is accepted as the
/// opening reply, matching `retr` and `put_file`. This method does not read
/// the reply the server sends once the transfer finishes; the caller has to
/// consume it afterwards (presumably with `read_response_in`, as `retr` does).
///
/// # Errors
/// Propagates errors from opening the data connection and from reading the
/// opening reply.
pub fn get(&mut self, file_name: &str) -> Result<BufReader<DataStream>> {
    let retr_command = format!("RETR {}\r\n", file_name);
    let data_stream = BufReader::new(self.data_command(&retr_command)?);
    self.read_response_in(&[status::ABOUT_TO_SEND, status::ALREADY_OPEN])?;
    Ok(data_stream)
}
/// Renames `from_name` to `to_name` with an `RNFR` / `RNTO` pair.
///
/// `RNFR` must be answered with `status::REQUEST_FILE_PENDING` before `RNTO`
/// is sent; `RNTO` must be answered with `status::REQUESTED_FILE_ACTION_OK`.
///
/// # Errors
/// Propagates any error from writing a command or reading its reply.
pub fn rename(&mut self, from_name: &str, to_name: &str) -> Result<()> {
    self.write_str(format!("RNFR {}\r\n", from_name))?;
    self.read_response(status::REQUEST_FILE_PENDING)?;

    self.write_str(format!("RNTO {}\r\n", to_name))?;
    self.read_response(status::REQUESTED_FILE_ACTION_OK)?;
    Ok(())
}
/// Retrieves `filename` and hands the data connection to `reader`.
///
/// After `RETR` is sent and the opening reply (`status::ABOUT_TO_SEND` or
/// `status::ALREADY_OPEN`) is received, `reader` is called with the buffered
/// data stream. When it succeeds, the data stream is dropped and the closing
/// reply (`status::CLOSING_DATA_CONNECTION` or
/// `status::REQUESTED_FILE_ACTION_OK`) is read before the closure's value is
/// returned.
///
/// # Errors
/// Returns the first error encountered; in particular, when `reader` fails
/// the closing reply is not read.
pub fn retr<F, T>(&mut self, filename: &str, reader: F) -> Result<T>
where
    F: Fn(&mut dyn Read) -> Result<T>,
{
    let retr_command = format!("RETR {}\r\n", filename);
    let outcome = {
        // Scoped so the data connection is dropped before the closing reply
        // is read from the control connection.
        let mut data_stream = BufReader::new(self.data_command(&retr_command)?);
        self.read_response_in(&[status::ABOUT_TO_SEND, status::ALREADY_OPEN])?;
        reader(&mut data_stream)
    }?;
    self.read_response_in(&[
        status::CLOSING_DATA_CONNECTION,
        status::REQUESTED_FILE_ACTION_OK,
    ])?;
    Ok(outcome)
}
/// Retrieves `file_name` entirely into memory and returns it as a `Cursor`
/// over the downloaded bytes.
///
/// # Errors
/// Returns `FtpError::ConnectionError` when reading the data stream fails,
/// and propagates any error from `retr`.
pub fn simple_retr(&mut self, file_name: &str) -> Result<Cursor<Vec<u8>>> {
    let bytes = self.retr(file_name, |source| {
        let mut buffer = Vec::new();
        match source.read_to_end(&mut buffer) {
            Ok(_) => Ok(buffer),
            Err(err) => Err(FtpError::ConnectionError(err)),
        }
    })?;
    Ok(Cursor::new(bytes))
}
/// Sends `RMD pathname` and expects `status::REQUESTED_FILE_ACTION_OK`.
///
/// # Errors
/// Propagates any error from writing the command or reading its reply.
pub fn rmdir(&mut self, pathname: &str) -> Result<()> {
    self.write_str(format!("RMD {}\r\n", pathname))?;
    self.read_response(status::REQUESTED_FILE_ACTION_OK)?;
    Ok(())
}
/// Sends `DELE filename` and expects `status::REQUESTED_FILE_ACTION_OK`.
///
/// # Errors
/// Propagates any error from writing the command or reading its reply.
pub fn rm(&mut self, filename: &str) -> Result<()> {
    self.write_str(format!("DELE {}\r\n", filename))?;
    self.read_response(status::REQUESTED_FILE_ACTION_OK)?;
    Ok(())
}
fn put_file<R: Read>(&mut self, filename: &str, r: &mut R) -> Result<()> {
let stor_command = format!("STOR {}\r\n", filename);
let mut data_stream = BufWriter::new(self.data_command(&stor_command)?);
self.read_response_in(&[status::ALREADY_OPEN, status::ABOUT_TO_SEND])?;
copy(r, &mut data_stream)
.map_err(FtpError::ConnectionError)
.map(|_| ())
}
/// Uploads everything read from `r` as `filename`, then waits for the closing
/// reply (`status::CLOSING_DATA_CONNECTION` or
/// `status::REQUESTED_FILE_ACTION_OK`).
///
/// # Errors
/// Propagates any error from `put_file` or from reading the closing reply.
pub fn put<R: Read>(&mut self, filename: &str, r: &mut R) -> Result<()> {
    self.put_file(filename, r)?;
    let accepted = [
        status::CLOSING_DATA_CONNECTION,
        status::REQUESTED_FILE_ACTION_OK,
    ];
    self.read_response_in(&accepted)?;
    Ok(())
}
/// Starts a `STOR filename` transfer and returns a buffered writer over the
/// data connection once the opening reply (`status::ALREADY_OPEN` or
/// `status::ABOUT_TO_SEND`) has arrived.
///
/// The closing reply is not read here; see `finish_put_file`.
///
/// # Errors
/// Propagates errors from opening the data connection or reading the reply.
pub fn start_put_file(&mut self, filename: &str) -> Result<BufWriter<DataStream>> {
    let stor_command = format!("STOR {}\r\n", filename);
    let raw = self.data_command(&stor_command)?;
    let writer = BufWriter::new(raw);
    let accepted = [status::ALREADY_OPEN, status::ABOUT_TO_SEND];
    self.read_response_in(&accepted)?;
    Ok(writer)
}
/// Reads the closing reply of an upload, accepting
/// `status::CLOSING_DATA_CONNECTION` or `status::REQUESTED_FILE_ACTION_OK`.
///
/// # Errors
/// Propagates any error from reading the reply.
pub fn finish_put_file(&mut self) -> Result<()> {
    let accepted = [
        status::CLOSING_DATA_CONNECTION,
        status::REQUESTED_FILE_ACTION_OK,
    ];
    self.read_response_in(&accepted)?;
    Ok(())
}
/// Runs a listing-style command and returns the non-empty `\r\n`-separated
/// entries received on the data connection.
///
/// `cmd` is sent through `data_command`; the opening reply must be `open_code`
/// or `status::ALREADY_OPEN`. After the data connection has been read and
/// dropped, the closing reply must match one of `close_code`.
///
/// # Errors
/// Returns `FtpError::ConnectionError` when reading the data stream fails,
/// and propagates errors from opening the data connection or reading replies.
fn list_command(
    &mut self,
    cmd: Cow<'static, str>,
    open_code: u32,
    close_code: &[u32],
) -> Result<Vec<String>> {
    let mut lines: Vec<String> = Vec::new();
    {
        let mut data_stream = BufReader::new(self.data_command(&cmd)?);
        self.read_response_in(&[open_code, status::ALREADY_OPEN])?;

        let mut received = String::new();
        loop {
            let count = data_stream
                .read_to_string(&mut received)
                .map_err(FtpError::ConnectionError)?;
            if count == 0 {
                break;
            }
            for entry in received.split("\r\n") {
                if !entry.is_empty() {
                    lines.push(String::from(entry));
                }
            }
        }
    }
    self.read_response_in(close_code)?;
    Ok(lines)
}
pub fn list(&mut self, pathname: Option<&str>) -> Result<Vec<String>> {
let command = pathname.map_or("LIST\r\n".into(), |path| {
format!("LIST {}\r\n", path).into()
});
self.list_command(
command,
status::ABOUT_TO_SEND,
&[
status::CLOSING_DATA_CONNECTION,
status::REQUESTED_FILE_ACTION_OK,
],
)
}
pub fn nlst(&mut self, pathname: Option<&str>) -> Result<Vec<String>> {
let command = pathname.map_or("NLST\r\n".into(), |path| {
format!("NLST {}\r\n", path).into()
});
self.list_command(
command,
status::ABOUT_TO_SEND,
&[
status::CLOSING_DATA_CONNECTION,
status::REQUESTED_FILE_ACTION_OK,
],
)
}
pub fn mdtm(&mut self, pathname: &str) -> Result<Option<DateTime<Utc>>> {
self.write_str(format!("MDTM {}\r\n", pathname))?;
let Line(_, content) = self.read_response(status::FILE)?;
match MDTM_RE.captures(&content) {
Some(caps) => {
let (year, month, day) = (
caps[1].parse::<i32>().unwrap(),
caps[2].parse::<u32>().unwrap(),
caps[3].parse::<u32>().unwrap(),
);
let (hour, minute, second) = (
caps[4].parse::<u32>().unwrap(),
caps[5].parse::<u32>().unwrap(),
caps[6].parse::<u32>().unwrap(),
);
Ok(Some(
Utc.ymd(year, month, day).and_hms(hour, minute, second),
))
}
None => Ok(None),
}
}
pub fn size(&mut self, pathname: &str) -> Result<Option<usize>> {
self.write_str(format!("SIZE {}\r\n", pathname))?;
let Line(_, content) = self.read_response(status::FILE)?;
match SIZE_RE.captures(&content) {
Some(caps) => Ok(Some(caps[1].parse().unwrap())),
None => Ok(None),
}
}
/// Writes `command` verbatim to the control connection.
///
/// When the `debug_print` feature is enabled the command is also echoed to
/// standard output with a `CMD ` prefix.
///
/// # Errors
/// Returns `FtpError::ConnectionError` when the write fails.
fn write_str<S: AsRef<str>>(&mut self, command: S) -> Result<()> {
    let text = command.as_ref();
    if cfg!(feature = "debug_print") {
        print!("CMD {}", text);
    }
    self.reader
        .get_mut()
        .write_all(text.as_bytes())
        .map_err(FtpError::ConnectionError)
}
/// Reads one reply from the control connection and requires its code to be
/// `expected_code`; a convenience wrapper around `read_response_in`.
///
/// # Errors
/// Propagates any error from `read_response_in`.
pub fn read_response(&mut self, expected_code: u32) -> Result<Line> {
    let accepted = [expected_code];
    self.read_response_in(&accepted)
}
pub fn read_response_in(&mut self, expected_code: &[u32]) -> Result<Line> {
let mut line = String::new();
self
.reader
.read_line(&mut line)
.map_err(FtpError::ConnectionError)?;
if cfg!(feature = "debug_print") {
print!("FTP {}", line);
}
if line.len() < 5 {
return Err(FtpError::InvalidResponse(
"error: could not read reply code".to_owned(),
));
}
let code: u32 = line[0..3].parse().map_err(|err| {
FtpError::InvalidResponse(format!("error: could not parse reply code: {}", err))
})?;
let expected = format!("{} ", &line[0..3]);
while line.len() < 5 || line[0..4] != expected {
line.clear();
if let Err(e) = self.reader.read_line(&mut line) {
return Err(FtpError::ConnectionError(e));
}
if cfg!(feature = "debug_print") {
print!("FTP {}", line);
}
}
if expected_code.iter().any(|ec| code == *ec) {
Ok(Line(code, line))
} else {
Err(FtpError::InvalidResponse(format!(
"Expected code {:?}, got response: {}",
expected_code, line
)))
}
}
}