1use super::data_stream::DataStream;
4use super::status;
5use super::types::{FileType, FtpError, Line, Result};
6use chrono::offset::TimeZone;
7use chrono::{DateTime, Utc};
8#[cfg(feature = "secure")]
9use openssl::ssl::{Ssl, SslContext};
10use regex::Regex;
11use std::borrow::Cow;
12use std::io::{copy, BufRead, BufReader, BufWriter, Cursor, Read, Write};
13use std::net::ToSocketAddrs;
14use std::net::{SocketAddr, TcpStream};
15use std::str::FromStr;
16use std::string::String;
17
18lazy_static! {
19 static ref PORT_RE: Regex = Regex::new(r"\((\d+),(\d+),(\d+),(\d+),(\d+),(\d+)\)").unwrap();
22
23 static ref MDTM_RE: Regex = Regex::new(r"\b(\d{4})(\d{2})(\d{2})(\d{2})(\d{2})(\d{2})\b").unwrap();
25
26 static ref SIZE_RE: Regex = Regex::new(r"\s+(\d+)\s*$").unwrap();
28}
29
30#[derive(Debug)]
32pub struct FtpStream {
33 reader: BufReader<DataStream>,
34 #[cfg(feature = "secure")]
35 ssl_cfg: Option<SslContext>,
36}
37
38impl FtpStream {
39 #[cfg(not(feature = "secure"))]
41 pub fn connect<A: ToSocketAddrs>(addr: A) -> Result<FtpStream> {
42 TcpStream::connect(addr)
43 .map_err(FtpError::ConnectionError)
44 .and_then(|stream| {
45 let mut ftp_stream = FtpStream {
46 reader: BufReader::new(DataStream::Tcp(stream)),
47 };
48 ftp_stream.read_response(status::READY).map(|_| ftp_stream)
49 })
50 }
51
/// Opens a plain TCP control connection to `addr` and waits for the server greeting.
///
/// The returned session carries no TLS context; use `into_secure` to upgrade it.
///
/// # Errors
///
/// Returns `FtpError::ConnectionError` when the TCP connection cannot be established, and
/// whatever `read_response` reports when the first reply is not `status::READY`.
#[cfg(feature = "secure")]
pub fn connect<A: ToSocketAddrs>(addr: A) -> Result<FtpStream> {
    let socket = TcpStream::connect(addr).map_err(FtpError::ConnectionError)?;
    let mut session = FtpStream {
        reader: BufReader::new(DataStream::Tcp(socket)),
        ssl_cfg: None,
    };
    // The server speaks first: consume its greeting before handing the session out.
    session.read_response(status::READY)?;
    Ok(session)
}
65
/// Upgrades this session to TLS and returns the secured session.
///
/// Sends `AUTH TLS`, performs the TLS handshake over the existing control socket using
/// `ssl_context`, then sends `PBSZ 0` and `PROT P`. The context is stored so that later
/// data connections can be wrapped with it as well.
///
/// # Errors
///
/// Returns `FtpError::SecureError` when the TLS session cannot be created or the handshake
/// fails, and the error of the failing command when the server rejects one of the commands.
/// The session is consumed in every case.
#[cfg(feature = "secure")]
pub fn into_secure(mut self, ssl_context: SslContext) -> Result<FtpStream> {
    self.write_str("AUTH TLS\r\n")?;
    self.read_response(status::AUTH_OK)?;

    let tls_session =
        Ssl::new(&ssl_context).map_err(|err| FtpError::SecureError(err.to_string()))?;
    let control_socket = self.reader.into_inner().into_tcp_stream();
    let tls_stream = tls_session
        .connect(control_socket)
        .map_err(|err| FtpError::SecureError(err.to_string()))?;

    let mut secured = FtpStream {
        reader: BufReader::new(DataStream::Ssl(tls_stream)),
        ssl_cfg: Some(ssl_context),
    };
    // Both follow-up commands are acknowledged with the same reply code.
    for command in &["PBSZ 0\r\n", "PROT P\r\n"] {
        secured.write_str(*command)?;
        secured.read_response(status::COMMAND_OK)?;
    }
    Ok(secured)
}
109
/// Drops TLS on the control connection and returns a plain session.
///
/// Sends `CCC`, waits for `status::COMMAND_OK`, then rebuilds the session around the
/// underlying TCP stream with no TLS context.
///
/// # Errors
///
/// Returns the error of `write_str` / `read_response` when the command cannot be sent or is
/// not acknowledged. The session is consumed in every case.
#[cfg(feature = "secure")]
pub fn into_insecure(mut self) -> Result<FtpStream> {
    self.write_str("CCC\r\n")?;
    self.read_response(status::COMMAND_OK)?;

    let control_socket = self.reader.into_inner().into_tcp_stream();
    Ok(FtpStream {
        reader: BufReader::new(DataStream::Tcp(control_socket)),
        ssl_cfg: None,
    })
}
144
145 #[cfg(not(feature = "secure"))]
147 fn data_command(&mut self, cmd: &str) -> Result<DataStream> {
148 self
149 .pasv()
150 .and_then(|addr| self.write_str(cmd).map(|_| addr))
151 .and_then(|addr| TcpStream::connect(addr).map_err(FtpError::ConnectionError))
152 .map(DataStream::Tcp)
153 }
154
/// Issues `cmd` on the control channel and opens the matching data connection.
///
/// The data address is requested with `pasv` first, then `cmd` is sent, and only then is the
/// TCP connection to that address opened. When the session holds a TLS context the data
/// connection is wrapped in TLS, otherwise it is returned as a plain TCP stream.
///
/// # Errors
///
/// Propagates failures of `pasv` and `write_str`; a failed TCP connection is reported as
/// `FtpError::ConnectionError`, and a failure to create the TLS session or to complete the
/// handshake as `FtpError::SecureError`.
#[cfg(feature = "secure")]
fn data_command(&mut self, cmd: &str) -> Result<DataStream> {
    let data_addr = self.pasv()?;
    self.write_str(cmd)?;
    let socket = TcpStream::connect(data_addr).map_err(FtpError::ConnectionError)?;

    match self.ssl_cfg {
        Some(ref context) => Ssl::new(context)
            // Report a failed session setup like `into_secure` does instead of panicking.
            .map_err(|e| FtpError::SecureError(e.to_string()))?
            .connect(socket)
            .map(DataStream::Ssl)
            .map_err(|e| FtpError::SecureError(e.to_string())),
        None => Ok(DataStream::Tcp(socket)),
    }
}
171
172 pub fn get_ref(&self) -> &TcpStream {
186 self.reader.get_ref().get_ref()
187 }
188
189 pub fn login(&mut self, user: &str, password: &str) -> Result<()> {
191 self.write_str(format!("USER {}\r\n", user))?;
192 self
193 .read_response_in(&[status::LOGGED_IN, status::NEED_PASSWORD])
194 .and_then(|Line(code, _)| {
195 if code == status::NEED_PASSWORD {
196 self.write_str(format!("PASS {}\r\n", password))?;
197 self.read_response(status::LOGGED_IN)?;
198 }
199 Ok(())
200 })
201 }
202
203 pub fn cwd(&mut self, path: &str) -> Result<()> {
205 self.write_str(format!("CWD {}\r\n", path))?;
206 self
207 .read_response(status::REQUESTED_FILE_ACTION_OK)
208 .map(|_| ())
209 }
210
211 pub fn cdup(&mut self) -> Result<()> {
213 self.write_str("CDUP\r\n")?;
214 self
215 .read_response_in(&[status::COMMAND_OK, status::REQUESTED_FILE_ACTION_OK])
216 .map(|_| ())
217 }
218
219 pub fn pwd(&mut self) -> Result<String> {
221 self.write_str("PWD\r\n")?;
222 self
223 .read_response(status::PATH_CREATED)
224 .and_then(
225 |Line(_, content)| match (content.find('"'), content.rfind('"')) {
226 (Some(begin), Some(end)) if begin < end => Ok(content[begin + 1..end].to_string()),
227 _ => {
228 let cause = format!("Invalid PWD Response: {}", content);
229 Err(FtpError::InvalidResponse(cause))
230 }
231 },
232 )
233 }
234
235 pub fn noop(&mut self) -> Result<()> {
237 self.write_str("NOOP\r\n")?;
238 self.read_response(status::COMMAND_OK).map(|_| ())
239 }
240
241 pub fn mkdir(&mut self, pathname: &str) -> Result<()> {
243 self.write_str(format!("MKD {}\r\n", pathname))?;
244 self.read_response(status::PATH_CREATED).map(|_| ())
245 }
246
247 fn pasv(&mut self) -> Result<SocketAddr> {
249 self.write_str("PASV\r\n")?;
250 let Line(_, line) = self.read_response(status::PASSIVE_MODE)?;
252 PORT_RE
253 .captures(&line)
254 .ok_or_else(|| FtpError::InvalidResponse(format!("Invalid PASV response: {}", line)))
255 .and_then(|caps| {
256 let (oct1, oct2, oct3, oct4) = (
258 caps[1].parse::<u8>().unwrap(),
259 caps[2].parse::<u8>().unwrap(),
260 caps[3].parse::<u8>().unwrap(),
261 caps[4].parse::<u8>().unwrap(),
262 );
263 let (msb, lsb) = (
264 caps[5].parse::<u8>().unwrap(),
265 caps[6].parse::<u8>().unwrap(),
266 );
267 let port = ((msb as u16) << 8) + lsb as u16;
268 let addr = format!("{}.{}.{}.{}:{}", oct1, oct2, oct3, oct4, port);
269 SocketAddr::from_str(&addr).map_err(FtpError::InvalidAddress)
270 })
271 }
272
273 pub fn transfer_type(&mut self, file_type: FileType) -> Result<()> {
276 let type_command = format!("TYPE {}\r\n", file_type.to_string());
277 self.write_str(&type_command)?;
278 self.read_response(status::COMMAND_OK).map(|_| ())
279 }
280
281 pub fn quit(&mut self) -> Result<()> {
283 self.write_str("QUIT\r\n")?;
284 self.read_response(status::CLOSING).map(|_| ())
285 }
286
287 pub fn get(&mut self, file_name: &str) -> Result<BufReader<DataStream>> {
292 let retr_command = format!("RETR {}\r\n", file_name);
293 let data_stream = BufReader::new(self.data_command(&retr_command)?);
294 self
295 .read_response(status::ABOUT_TO_SEND)
296 .map(|_| data_stream)
297 }
298
299 pub fn rename(&mut self, from_name: &str, to_name: &str) -> Result<()> {
301 self.write_str(format!("RNFR {}\r\n", from_name))?;
302 self
303 .read_response(status::REQUEST_FILE_PENDING)
304 .and_then(|_| {
305 self.write_str(format!("RNTO {}\r\n", to_name))?;
306 self
307 .read_response(status::REQUESTED_FILE_ACTION_OK)
308 .map(|_| ())
309 })
310 }
311
312 pub fn retr<F, T>(&mut self, filename: &str, reader: F) -> Result<T>
333 where
334 F: Fn(&mut dyn Read) -> Result<T>,
335 {
336 let retr_command = format!("RETR {}\r\n", filename);
337 {
338 let mut data_stream = BufReader::new(self.data_command(&retr_command)?);
339 self
340 .read_response_in(&[status::ABOUT_TO_SEND, status::ALREADY_OPEN])
341 .and_then(|_| reader(&mut data_stream))
342 }
343 .and_then(|res| {
344 self
345 .read_response_in(&[
346 status::CLOSING_DATA_CONNECTION,
347 status::REQUESTED_FILE_ACTION_OK,
348 ])
349 .map(|_| res)
350 })
351 }
352
353 pub fn simple_retr(&mut self, file_name: &str) -> Result<Cursor<Vec<u8>>> {
369 self
370 .retr(file_name, |reader| {
371 let mut buffer = Vec::new();
372 reader
373 .read_to_end(&mut buffer)
374 .map(|_| buffer)
375 .map_err(FtpError::ConnectionError)
376 })
377 .map(Cursor::new)
378 }
379
380 pub fn rmdir(&mut self, pathname: &str) -> Result<()> {
382 self.write_str(format!("RMD {}\r\n", pathname))?;
383 self
384 .read_response(status::REQUESTED_FILE_ACTION_OK)
385 .map(|_| ())
386 }
387
388 pub fn rm(&mut self, filename: &str) -> Result<()> {
390 self.write_str(format!("DELE {}\r\n", filename))?;
391 self
392 .read_response(status::REQUESTED_FILE_ACTION_OK)
393 .map(|_| ())
394 }
395
396 fn put_file<R: Read>(&mut self, filename: &str, r: &mut R) -> Result<()> {
397 let stor_command = format!("STOR {}\r\n", filename);
398 let mut data_stream = BufWriter::new(self.data_command(&stor_command)?);
399 self.read_response_in(&[status::ALREADY_OPEN, status::ABOUT_TO_SEND])?;
400 copy(r, &mut data_stream)
401 .map_err(FtpError::ConnectionError)
402 .map(|_| ())
403 }
404
405 pub fn put<R: Read>(&mut self, filename: &str, r: &mut R) -> Result<()> {
407 self.put_file(filename, r)?;
408 self
409 .read_response_in(&[
410 status::CLOSING_DATA_CONNECTION,
411 status::REQUESTED_FILE_ACTION_OK,
412 ])
413 .map(|_| ())
414 }
415
416 pub fn start_put_file(&mut self, filename: &str) -> Result<BufWriter<DataStream>> {
417 let stor_command = format!("STOR {}\r\n", filename);
418 let data_stream = BufWriter::new(self.data_command(&stor_command)?);
419 self.read_response_in(&[status::ALREADY_OPEN, status::ABOUT_TO_SEND])?;
420 Ok(data_stream)
421 }
422
423 pub fn finish_put_file(&mut self) -> Result<()> {
424 self
425 .read_response_in(&[
426 status::CLOSING_DATA_CONNECTION,
427 status::REQUESTED_FILE_ACTION_OK,
428 ])
429 .map(|_| ())
430 }
431
432 fn list_command(
434 &mut self,
435 cmd: Cow<'static, str>,
436 open_code: u32,
437 close_code: &[u32],
438 ) -> Result<Vec<String>> {
439 let mut lines: Vec<String> = Vec::new();
440 {
441 let mut data_stream = BufReader::new(self.data_command(&cmd)?);
442 self.read_response_in(&[open_code, status::ALREADY_OPEN])?;
443
444 let mut line = String::new();
445 loop {
446 match data_stream.read_to_string(&mut line) {
447 Ok(0) => break,
448 Ok(_) => lines.extend(
449 line
450 .split("\r\n")
451 .map(String::from)
452 .filter(|s| !s.is_empty()),
453 ),
454 Err(err) => return Err(FtpError::ConnectionError(err)),
455 };
456 }
457 }
458
459 self.read_response_in(close_code).map(|_| lines)
460 }
461
462 pub fn list(&mut self, pathname: Option<&str>) -> Result<Vec<String>> {
466 let command = pathname.map_or("LIST\r\n".into(), |path| {
467 format!("LIST {}\r\n", path).into()
468 });
469
470 self.list_command(
471 command,
472 status::ABOUT_TO_SEND,
473 &[
474 status::CLOSING_DATA_CONNECTION,
475 status::REQUESTED_FILE_ACTION_OK,
476 ],
477 )
478 }
479
480 pub fn nlst(&mut self, pathname: Option<&str>) -> Result<Vec<String>> {
484 let command = pathname.map_or("NLST\r\n".into(), |path| {
485 format!("NLST {}\r\n", path).into()
486 });
487
488 self.list_command(
489 command,
490 status::ABOUT_TO_SEND,
491 &[
492 status::CLOSING_DATA_CONNECTION,
493 status::REQUESTED_FILE_ACTION_OK,
494 ],
495 )
496 }
497
498 pub fn mdtm(&mut self, pathname: &str) -> Result<Option<DateTime<Utc>>> {
501 self.write_str(format!("MDTM {}\r\n", pathname))?;
502 let Line(_, content) = self.read_response(status::FILE)?;
503
504 match MDTM_RE.captures(&content) {
505 Some(caps) => {
506 let (year, month, day) = (
507 caps[1].parse::<i32>().unwrap(),
508 caps[2].parse::<u32>().unwrap(),
509 caps[3].parse::<u32>().unwrap(),
510 );
511 let (hour, minute, second) = (
512 caps[4].parse::<u32>().unwrap(),
513 caps[5].parse::<u32>().unwrap(),
514 caps[6].parse::<u32>().unwrap(),
515 );
516 Ok(Some(
517 Utc.ymd(year, month, day).and_hms(hour, minute, second),
518 ))
519 }
520 None => Ok(None),
521 }
522 }
523
524 pub fn size(&mut self, pathname: &str) -> Result<Option<usize>> {
527 self.write_str(format!("SIZE {}\r\n", pathname))?;
528 let Line(_, content) = self.read_response(status::FILE)?;
529
530 match SIZE_RE.captures(&content) {
531 Some(caps) => Ok(Some(caps[1].parse().unwrap())),
532 None => Ok(None),
533 }
534 }
535
536 fn write_str<S: AsRef<str>>(&mut self, command: S) -> Result<()> {
537 if cfg!(feature = "debug_print") {
538 print!("CMD {}", command.as_ref());
539 }
540
541 let stream = self.reader.get_mut();
542 stream
543 .write_all(command.as_ref().as_bytes())
544 .map_err(FtpError::ConnectionError)
545 }
546
547 pub fn read_response(&mut self, expected_code: u32) -> Result<Line> {
548 self.read_response_in(&[expected_code])
549 }
550
551 pub fn read_response_in(&mut self, expected_code: &[u32]) -> Result<Line> {
553 let mut line = String::new();
554 self
555 .reader
556 .read_line(&mut line)
557 .map_err(FtpError::ConnectionError)?;
558
559 if cfg!(feature = "debug_print") {
560 print!("FTP {}", line);
561 }
562
563 if line.len() < 5 {
564 return Err(FtpError::InvalidResponse(
565 "error: could not read reply code".to_owned(),
566 ));
567 }
568
569 let code: u32 = line[0..3].parse().map_err(|err| {
570 FtpError::InvalidResponse(format!("error: could not parse reply code: {}", err))
571 })?;
572
573 let expected = format!("{} ", &line[0..3]);
576 while line.len() < 5 || line[0..4] != expected {
577 line.clear();
578 if let Err(e) = self.reader.read_line(&mut line) {
579 return Err(FtpError::ConnectionError(e));
580 }
581
582 if cfg!(feature = "debug_print") {
583 print!("FTP {}", line);
584 }
585 }
586
587 if expected_code.iter().any(|ec| code == *ec) {
588 Ok(Line(code, line))
589 } else {
590 Err(FtpError::InvalidResponse(format!(
591 "Expected code {:?}, got response: {}",
592 expected_code, line
593 )))
594 }
595 }
596}