use std::borrow::Cow;
use std::io::{Read, BufRead, BufReader, BufWriter, Cursor, Write, copy};
#[cfg(feature = "secure")]
use std::error::Error;
use std::net::{TcpStream, SocketAddr};
use std::string::String;
use std::str::FromStr;
use std::net::ToSocketAddrs;
use regex::Regex;
use chrono::{DateTime, UTC};
use chrono::offset::TimeZone;
#[cfg(feature = "secure")]
use openssl::ssl::{ SslContext, Ssl };
use super::data_stream::DataStream;
use super::status;
use super::types::{FileType, FtpError, Line, Result};
lazy_static! {
static ref PORT_RE: Regex = Regex::new(r"\((\d+),(\d+),(\d+),(\d+),(\d+),(\d+)\)").unwrap();
static ref MDTM_RE: Regex = Regex::new(r"\b(\d{4})(\d{2})(\d{2})(\d{2})(\d{2})(\d{2})\b").unwrap();
static ref SIZE_RE: Regex = Regex::new(r"\s+(\d+)\s*$").unwrap();
}
#[derive(Debug)]
pub struct FtpStream {
reader: BufReader<DataStream>,
#[cfg(feature = "secure")]
ssl_cfg: Option<SslContext>,
}
impl FtpStream {
#[cfg(not(feature = "secure"))]
pub fn connect<A: ToSocketAddrs>(addr: A) -> Result<FtpStream> {
TcpStream::connect(addr)
.map_err(|e| FtpError::ConnectionError(e))
.and_then(|stream| {
let mut ftp_stream = FtpStream {
reader: BufReader::new(DataStream::Tcp(stream)),
};
ftp_stream.read_response(status::READY)
.map(|_| ftp_stream)
})
}
#[cfg(feature = "secure")]
pub fn connect<A: ToSocketAddrs>(addr: A) -> Result<FtpStream> {
TcpStream::connect(addr)
.map_err(|e| FtpError::ConnectionError(e))
.and_then(|stream| {
let mut ftp_stream = FtpStream {
reader: BufReader::new(DataStream::Tcp(stream)),
ssl_cfg: None,
};
ftp_stream.read_response(status::READY)
.map(|_| ftp_stream)
})
}
#[cfg(feature = "secure")]
pub fn into_secure(mut self, ssl_context: SslContext) -> Result<FtpStream> {
try!(self.write_str("AUTH TLS\r\n"));
try!(self.read_response(status::AUTH_OK));
let ssl_cfg = try!(Ssl::new(&ssl_context).map_err(|e| FtpError::SecureError(e.description().to_owned())));
let stream = try!(ssl_cfg.connect(self.reader.into_inner().into_tcp_stream()).map_err(|e| FtpError::SecureError(e.description().to_owned())));
let mut secured_ftp_tream = FtpStream {
reader: BufReader::new(DataStream::Ssl(stream)),
ssl_cfg: Some(ssl_context)
};
try!(secured_ftp_tream.write_str("PBSZ 0\r\n"));
try!(secured_ftp_tream.read_response(status::COMMAND_OK));
try!(secured_ftp_tream.write_str("PROT P\r\n"));
try!(secured_ftp_tream.read_response(status::COMMAND_OK));
Ok(secured_ftp_tream)
}
#[cfg(feature = "secure")]
pub fn into_insecure(mut self) -> Result<FtpStream> {
try!(self.write_str("CCC\r\n"));
try!(self.read_response(status::COMMAND_OK));
let plain_ftp_stream = FtpStream {
reader: BufReader::new(DataStream::Tcp(self.reader.into_inner().into_tcp_stream())),
ssl_cfg: None,
};
Ok(plain_ftp_stream)
}
#[cfg(not(feature = "secure"))]
fn data_command(&mut self, cmd: &str) -> Result<DataStream> {
self.pasv()
.and_then(|addr| self.write_str(cmd).map(|_| addr))
.and_then(|addr| TcpStream::connect(addr)
.map_err(|e| FtpError::ConnectionError(e)))
.map(|stream| DataStream::Tcp(stream))
}
#[cfg(feature = "secure")]
fn data_command(&mut self, cmd: &str) -> Result<DataStream> {
self.pasv()
.and_then(|addr| self.write_str(cmd).map(|_| addr))
.and_then(|addr| TcpStream::connect(addr).map_err(|e| FtpError::ConnectionError(e)))
.and_then(|stream| {
match self.ssl_cfg {
Some(ref ssl) => {
Ssl::new(ssl).unwrap().connect(stream)
.map(|stream| DataStream::Ssl(stream))
.map_err(|e| FtpError::SecureError(e.description().to_owned()))
},
None => Ok(DataStream::Tcp(stream))
}
})
}
pub fn get_ref(&self) -> &TcpStream {
self.reader.get_ref().get_ref()
}
pub fn login(&mut self, user: &str, password: &str) -> Result<()> {
try!(self.write_str(format!("USER {}\r\n", user)));
self.read_response_in(&[status::LOGGED_IN, status::NEED_PASSWORD])
.and_then(|Line(code, _)| {
if code == status::NEED_PASSWORD {
try!(self.write_str(format!("PASS {}\r\n", password)));
try!(self.read_response(status::LOGGED_IN));
}
Ok(())
})
}
pub fn cwd(&mut self, path: &str) -> Result<()> {
try!(self.write_str(format!("CWD {}\r\n", path)));
self.read_response(status::REQUESTED_FILE_ACTION_OK).map(|_| ())
}
pub fn cdup(&mut self) -> Result<()> {
try!(self.write_str("CDUP\r\n"));
self.read_response_in(&[status::COMMAND_OK, status::REQUESTED_FILE_ACTION_OK]).map(|_| ())
}
pub fn pwd(&mut self) -> Result<String> {
try!(self.write_str("PWD\r\n"));
self.read_response(status::PATH_CREATED)
.and_then(|Line(_, content)| {
match (content.find('"'), content.rfind('"')) {
(Some(begin), Some(end)) if begin < end => {
Ok(content[begin + 1 .. end].to_string())
},
_ => {
let cause = format!("Invalid PWD Response: {}", content);
Err(FtpError::InvalidResponse(cause))
}
}
})
}
pub fn noop(&mut self) -> Result<()> {
try!(self.write_str("NOOP\r\n"));
self.read_response(status::COMMAND_OK).map(|_| ())
}
pub fn mkdir(&mut self, pathname: &str) -> Result<()> {
try!(self.write_str(format!("MKD {}\r\n", pathname)));
self.read_response(status::PATH_CREATED).map(|_| ())
}
fn pasv(&mut self) -> Result<SocketAddr> {
try!(self.write_str("PASV\r\n"));
let Line(_, line) = try!(self.read_response(status::PASSIVE_MODE));
PORT_RE.captures(&line)
.ok_or(FtpError::InvalidResponse(format!("Invalid PASV response: {}", line)))
.and_then(|caps| {
let (oct1, oct2, oct3, oct4) = (
caps[1].parse::<u8>().unwrap(),
caps[2].parse::<u8>().unwrap(),
caps[3].parse::<u8>().unwrap(),
caps[4].parse::<u8>().unwrap()
);
let (msb, lsb) = (
caps[5].parse::<u8>().unwrap(),
caps[6].parse::<u8>().unwrap()
);
let port = ((msb as u16) << 8) + lsb as u16;
let addr = format!("{}.{}.{}.{}:{}", oct1, oct2, oct3, oct4, port);
SocketAddr::from_str(&addr)
.map_err(|parse_err| FtpError::InvalidAddress(parse_err))
})
}
pub fn transfer_type(&mut self, file_type: FileType) -> Result<()> {
let type_command = format!("TYPE {}\r\n", file_type.to_string());
try!(self.write_str(&type_command));
self.read_response(status::COMMAND_OK).map(|_| ())
}
pub fn quit(&mut self) -> Result<()> {
try!(self.write_str("QUIT\r\n"));
self.read_response(status::CLOSING).map(|_| ())
}
pub fn get(&mut self, file_name: &str) -> Result<BufReader<DataStream>> {
let retr_command = format!("RETR {}\r\n", file_name);
let data_stream = BufReader::new(try!(self.data_command(&retr_command)));
self.read_response(status::ABOUT_TO_SEND).map(|_| data_stream)
}
pub fn rename(&mut self, from_name: &str, to_name: &str) -> Result<()> {
try!(self.write_str(format!("RNFR {}\r\n", from_name)));
self.read_response(status::REQUEST_FILE_PENDING)
.and_then(|_| {
try!(self.write_str(format!("RNTO {}\r\n", to_name)));
self.read_response(status::REQUESTED_FILE_ACTION_OK).map(|_| ())
})
}
pub fn retr<F, T>(&mut self, filename: &str, reader: F) -> Result<T>
where F: Fn(&mut Read) -> Result<T> {
let retr_command = format!("RETR {}\r\n", filename);
{
let mut data_stream = BufReader::new(try!(self.data_command(&retr_command)));
self.read_response_in(&[status::ABOUT_TO_SEND, status::ALREADY_OPEN])
.and_then(|_| reader(&mut data_stream))
}.and_then(|res|
self.read_response_in(&[status::CLOSING_DATA_CONNECTION,status::REQUESTED_FILE_ACTION_OK]).map(|_| res))
}
pub fn simple_retr(&mut self, file_name: &str) -> Result<Cursor<Vec<u8>>> {
self.retr(file_name, |reader| {
let mut buffer = Vec::new();
reader.read_to_end(&mut buffer).map(|_| buffer).map_err(|read_err| FtpError::ConnectionError(read_err))
}).map(|buffer| Cursor::new(buffer))
}
pub fn rmdir(&mut self, pathname: &str) -> Result<()> {
try!(self.write_str(format!("RMD {}\r\n", pathname)));
self.read_response(status::REQUESTED_FILE_ACTION_OK).map(|_| ())
}
pub fn rm(&mut self, filename: &str) -> Result<()> {
try!(self.write_str(format!("DELE {}\r\n", filename)));
self.read_response(status::REQUESTED_FILE_ACTION_OK).map(|_| ())
}
fn put_file<R: Read>(&mut self, filename: &str, r: &mut R) -> Result<()> {
let stor_command = format!("STOR {}\r\n", filename);
let mut data_stream = BufWriter::new(try!(self.data_command(&stor_command)));
try!(self.read_response_in(&[status::ALREADY_OPEN, status::ABOUT_TO_SEND]));
copy(r, &mut data_stream)
.map_err(|read_err| FtpError::ConnectionError(read_err))
.map(|_| ())
}
pub fn put<R: Read>(&mut self, filename: &str, r: &mut R) -> Result<()> {
try!(self.put_file(filename, r));
self.read_response_in(&[status::CLOSING_DATA_CONNECTION,status::REQUESTED_FILE_ACTION_OK])
.map(|_| ())
}
fn list_command(&mut self, cmd: Cow<'static, str>, open_code: u32, close_code: &[u32]) -> Result<Vec<String>> {
let mut lines: Vec<String> = Vec::new();
{
let mut data_stream = BufReader::new(try!(self.data_command(&cmd)));
try!(self.read_response_in(&[open_code, status::ALREADY_OPEN]));
let mut line = String::new();
loop {
match data_stream.read_to_string(&mut line) {
Ok(0) => break,
Ok(_) => lines.extend(line.split("\r\n").into_iter().map(|s| String::from(s)).filter(|s| s.len() > 0)),
Err(err) => return Err(FtpError::ConnectionError(err)),
};
}
}
self.read_response_in(close_code).map(|_| lines)
}
pub fn list(&mut self, pathname: Option<&str>) -> Result<Vec<String>> {
let command = pathname.map_or("LIST\r\n".into(), |path| format!("LIST {}\r\n", path).into());
self.list_command(command, status::ABOUT_TO_SEND, &[status::CLOSING_DATA_CONNECTION,status::REQUESTED_FILE_ACTION_OK])
}
pub fn nlst(&mut self, pathname: Option<&str>) -> Result<Vec<String>> {
let command = pathname.map_or("NLST\r\n".into(), |path| format!("NLST {}\r\n", path).into());
self.list_command(command, status::ABOUT_TO_SEND, &[status::CLOSING_DATA_CONNECTION,status::REQUESTED_FILE_ACTION_OK])
}
pub fn mdtm(&mut self, pathname: &str) -> Result<Option<DateTime<UTC>>> {
try!(self.write_str(format!("MDTM {}\r\n", pathname)));
let Line(_, content) = try!(self.read_response(status::FILE));
match MDTM_RE.captures(&content) {
Some(caps) => {
let (year, month, day) = (
caps[1].parse::<i32>().unwrap(),
caps[2].parse::<u32>().unwrap(),
caps[3].parse::<u32>().unwrap()
);
let (hour, minute, second) = (
caps[4].parse::<u32>().unwrap(),
caps[5].parse::<u32>().unwrap(),
caps[6].parse::<u32>().unwrap()
);
Ok(Some(UTC.ymd(year, month, day).and_hms(hour, minute, second)))
},
None => Ok(None)
}
}
pub fn size(&mut self, pathname: &str) -> Result<Option<usize>> {
try!(self.write_str(format!("SIZE {}\r\n", pathname)));
let Line(_, content) = try!(self.read_response(status::FILE));
match SIZE_RE.captures(&content) {
Some(caps) => Ok(Some(caps[1].parse().unwrap())),
None => Ok(None)
}
}
fn write_str<S: AsRef<str>>(&mut self, command: S) -> Result<()> {
if cfg!(feature = "debug_print") {
print!("CMD {}", command.as_ref());
}
let stream = self.reader.get_mut();
stream.write_all(command.as_ref().as_bytes())
.map_err(|send_err| FtpError::ConnectionError(send_err))
}
pub fn read_response(&mut self, expected_code: u32) -> Result<Line> {
self.read_response_in(&[expected_code])
}
pub fn read_response_in(&mut self, expected_code: &[u32]) -> Result<Line> {
let mut line = String::new();
try!(self.reader.read_line(&mut line)
.map_err(|read_err| FtpError::ConnectionError(read_err)));
if cfg!(feature = "debug_print") {
print!("FTP {}", line);
}
if line.len() < 5 {
return Err(FtpError::InvalidResponse("error: could not read reply code".to_owned()));
}
let code: u32 = try!(line[0..3].parse()
.map_err(|err| {
FtpError::InvalidResponse(format!("error: could not parse reply code: {}", err))
}));
let expected = format!("{} ", &line[0..3]);
while line.len() < 5 || line[0..4] != expected {
line.clear();
if let Err(e) = self.reader.read_line(&mut line) {
return Err(FtpError::ConnectionError(e));
}
if cfg!(feature = "debug_print") {
print!("FTP {}", line);
}
}
if expected_code.into_iter().any(|ec| code == *ec) {
Ok(Line(code, line))
} else {
Err(FtpError::InvalidResponse(format!("Expected code {:?}, got response: {}", expected_code, line)))
}
}
}