1use std::borrow::{Borrow, Cow};
3use std::collections::{HashMap, HashSet};
4use std::fmt::format;
5use std::net::{IpAddr, SocketAddr};
6use std::string::String;
7
8use chrono::offset::TimeZone;
9use chrono::{DateTime, Utc};
10use regex::Regex;
11use tokio::io::{
12 copy, AsyncBufRead, AsyncBufReadExt, AsyncRead, AsyncReadExt, AsyncWriteExt, BufStream,
13};
14use tokio::net::{TcpStream, ToSocketAddrs};
15#[cfg(feature = "ftps")]
16use tokio_rustls::{client::TlsStream, rustls::ClientConfig, rustls::ServerName, TlsConnector};
17
18use crate::ftp::connection::Connection;
19use crate::ftp::{Command, ftp_reply, MODES, REPLY_CODE_LEN};
20use crate::ftp::types::{FileType, FtpError, Result};
21use crate::StringExt;
22
23lazy_static::lazy_static! {
24 static ref PORT_RE: Regex = Regex::new(r"\((\d+),(\d+),(\d+),(\d+),(\d+),(\d+)\)").unwrap();
27
28 static ref MDTM_RE: Regex = Regex::new(r"\b(\d{4})(\d{2})(\d{2})(\d{2})(\d{2})(\d{2})\b").unwrap();
30
31 static ref SIZE_RE: Regex = Regex::new(r"\s+(\d+)\s*$").unwrap();
33 static ref PROT_COMMAND_VALUE: Vec<&'static str> = vec!["C","E","S","P"];
34}
35
36pub struct FtpClient {
37 stream: BufStream<Connection>,
38 welcome_msg: Option<String>,
39 _reply_code: u32,
40 _reply_string: Option<String>,
41 _reply_lines: Vec<String>,
42 #[cfg(feature = "ftps")]
43 ssl_cfg: Option<(ClientConfig, ServerName)>,
44 features_map: HashMap<String, Vec<String>>,
45}
46
47impl FtpClient {
48 fn new(stream: TcpStream) -> Self {
49 FtpClient {
50 stream: BufStream::new(Connection::Tcp(stream)),
51 #[cfg(feature = "ftps")]
52 ssl_cfg: None,
53 welcome_msg: None,
54 _reply_code: 0,
55 _reply_string: None,
56 features_map: HashMap::new(),
57 _reply_lines: vec![],
58 }
59 }
60
/// Builds a client around a control stream that has already completed its TLS handshake.
///
/// `ssl_cfg` is left unset here; the caller is expected to fill it in afterwards.
#[cfg(feature = "ftps")]
fn new_tls_client(stream: TlsStream<TcpStream>) -> Self {
    let control = BufStream::new(Connection::Ssl(stream));
    Self {
        stream: control,
        welcome_msg: None,
        _reply_code: 0,
        _reply_string: None,
        _reply_lines: Vec::new(),
        ssl_cfg: None,
        features_map: HashMap::new(),
    }
}
73
74 pub fn init_default(&mut self) {}
75
76 pub async fn connect<A: ToSocketAddrs>(addr: A) -> Result<FtpClient> {
78 let stream = TcpStream::connect(addr)
79 .await
80 .map_err(FtpError::ConnectionError)?;
81
82 let mut ftp_client = FtpClient::new(stream);
83 ftp_client.read_reply().await?;
84 ftp_client.check_response(ftp_reply::READY)?;
85 ftp_client.welcome_msg = Some(ftp_client._reply_string.clone().unwrap());
86 Ok(ftp_client)
87 }
88
/// Upgrades the control connection to TLS and returns the secured client.
///
/// Sends `AUTH TLS`, performs the TLS handshake on the existing TCP stream, then issues
/// `PBSZ 0` and `PROT P`. `config` and `domain` are remembered so that data connections
/// can be wrapped in TLS as well.
///
/// # Errors
/// * `FtpError::InvalidResponse` when the server rejects AUTH, PBSZ or PROT.
/// * `FtpError::SecureError` when the TLS handshake fails.
/// * `FtpError::ConnectionError` on I/O failures of the control connection.
#[cfg(feature = "ftps")]
pub async fn into_secure(
    mut self,
    config: ClientConfig,
    domain: ServerName,
) -> Result<FtpClient> {
    // The AUTH exchange is issued through `&mut self` so that `self.stream` is still
    // owned by this function when it is handed to the TLS connector below.
    self.send_command(Command::AUTH, Some("TLS")).await?;
    self.check_response_in(&[ftp_reply::AUTH_OK, ftp_reply::SECURITY_MECHANISM_IS_OK])?;

    let connector: TlsConnector = std::sync::Arc::new(config.clone()).into();
    let stream = connector
        .connect(domain.clone(), self.stream.into_inner().into_tcp_stream())
        .await
        .map_err(|e| FtpError::SecureError(e.to_string()))?;

    let mut ftps_client = FtpClient::new_tls_client(stream);
    ftps_client.ssl_cfg = Some((config, domain));

    ftps_client.send_command(Command::PBSZ, Some("0")).await?;
    ftps_client.check_response(ftp_reply::COMMAND_OK)?;

    ftps_client.send_command(Command::PROT, Some("P")).await?;
    ftps_client.check_response(ftp_reply::COMMAND_OK)?;
    Ok(ftps_client)
}
138
/// Sends `AUTH TLS` and verifies that the server accepted the security mechanism.
///
/// Takes `&mut self` so the client (and its control stream) stays usable afterwards.
///
/// # Errors
/// `FtpError::InvalidResponse` when the reply is neither `ftp_reply::AUTH_OK` nor
/// `ftp_reply::SECURITY_MECHANISM_IS_OK`; I/O errors from sending the command.
#[cfg(feature = "ftps")]
async fn exe_auth_tls(&mut self) -> Result<()> {
    self.send_command(Command::AUTH, Some("TLS")).await?;
    self.check_response_in(&[ftp_reply::AUTH_OK, ftp_reply::SECURITY_MECHANISM_IS_OK])
}
144
/// Sends `AUTH <mechanism>` and returns the server's reply code without checking it.
///
/// Borrows the client mutably, like the other command methods, so the connection is
/// not dropped once the reply has been read.
#[cfg(feature = "ftps")]
pub async fn exe_auth(&mut self, mechanism: &str) -> Result<u32> {
    self.send_command(Command::AUTH, Some(mechanism)).await
}
150
/// Sends `PBSZ <pbsz>` and requires a `ftp_reply::COMMAND_OK` reply.
///
/// # Errors
/// * `FtpError::InvalidArgument` when `pbsz` is larger than `(2^32)-1`; nothing is sent
///   in that case.
/// * `FtpError::InvalidResponse` when the server does not answer with `COMMAND_OK`.
/// * I/O errors from the control connection.
#[cfg(feature = "ftps")]
pub async fn exec_pbsz(&mut self, pbsz: u64) -> Result<()> {
    // `pbsz` is unsigned, so only the upper bound needs checking.
    if pbsz > u64::from(u32::MAX) {
        return Err(FtpError::InvalidArgument(format!(
            "Invalid pbsz value. The correct value should be: {} to {}",
            0, "(2^32)-1"
        )));
    }
    self.send_command(Command::PBSZ, Some(pbsz.to_string().as_str()))
        .await?;
    self.check_response_in(&[ftp_reply::COMMAND_OK])
}
166
/// Sends `ADAT`, with `data` base64-encoded as its argument when present, and returns
/// the reply code unchecked.
#[cfg(feature = "ftps")]
pub async fn exec_adat(&mut self, data: Option<&[u8]>) -> Result<u32> {
    // Keep the encoded text in a local so the borrowed argument outlives the call.
    let encoded = data.map(base64::encode);
    self.send_command(Command::ADAT, encoded.as_deref()).await
}
176
/// Sends `CONF`, with `data` base64-encoded as its argument when present, and returns
/// the reply code unchecked.
#[cfg(feature = "ftps")]
pub async fn exec_conf(&mut self, data: Option<&[u8]>) -> Result<u32> {
    // Keep the encoded text in a local so the borrowed argument outlives the call.
    let encoded = data.map(base64::encode);
    self.send_command(Command::CONF, encoded.as_deref()).await
}
186
/// Sends `ENC`, with `data` base64-encoded as its argument when present, and returns
/// the reply code unchecked.
#[cfg(feature = "ftps")]
pub async fn exec_enc(&mut self, data: Option<&[u8]>) -> Result<u32> {
    // Keep the encoded text in a local so the borrowed argument outlives the call.
    let encoded = data.map(base64::encode);
    self.send_command(Command::ENC, encoded.as_deref()).await
}
196
/// Sends `MIC`, with `data` base64-encoded as its argument when present, and returns
/// the reply code unchecked.
#[cfg(feature = "ftps")]
pub async fn exec_mic(&mut self, data: Option<&[u8]>) -> Result<u32> {
    // Keep the encoded text in a local so the borrowed argument outlives the call.
    let encoded = data.map(base64::encode);
    self.send_command(Command::MIC, encoded.as_deref()).await
}
206
/// Sends `PROT <level>` and requires a `ftp_reply::COMMAND_OK` reply.
///
/// An empty `prot` is treated as `"C"`. The argument is only read, never modified.
///
/// # Errors
/// * `FtpError::InvalidArgument` when the level is not one of `PROT_COMMAND_VALUE`;
///   nothing is sent in that case.
/// * `FtpError::InvalidResponse` when the server does not answer with `COMMAND_OK`.
/// * I/O errors from the control connection.
#[cfg(feature = "ftps")]
pub async fn exec_prot(&mut self, prot: &mut str) -> Result<()> {
    let level: &str = if prot.is_empty() { "C" } else { &*prot };
    if !PROT_COMMAND_VALUE.iter().any(|allowed| *allowed == level) {
        return Err(FtpError::InvalidArgument(
            "Unsupported prot command value".to_string(),
        ));
    }
    self.send_command(Command::PROT, Some(level)).await?;
    self.check_response_in(&[ftp_reply::COMMAND_OK])
}
223
/// Sends `CCC` and, when the server answers `ftp_reply::COMMAND_OK`, returns a fresh
/// plain-TCP client built on the underlying TCP stream.
///
/// # Errors
/// `FtpError::InvalidResponse` when the reply code is not `COMMAND_OK`; I/O errors from
/// the control connection.
#[cfg(feature = "ftps")]
pub async fn into_insecure(mut self) -> Result<FtpClient> {
    self.send_command(Command::CCC, None).await?;
    if self._reply_code != ftp_reply::COMMAND_OK {
        return Err(FtpError::InvalidResponse(format!(
            "Expected code {:?}, got response: {}",
            ftp_reply::COMMAND_OK,
            self._reply_string.as_deref().unwrap_or_default()
        )));
    }
    Ok(FtpClient::new(self.stream.into_inner().into_tcp_stream()))
}
260
261 async fn data_command(&mut self, cmd: &str) -> Result<Connection> {
263 let addr = self.pasv().await?;
264 let stream = TcpStream::connect(addr)
265 .await
266 .map_err(FtpError::ConnectionError)?;
267
268 #[cfg(feature = "ftps")]
269 match &self.ssl_cfg {
270 Some((config, domain)) => {
271 let connector: TlsConnector = std::sync::Arc::new(config.clone()).into();
272 return connector
273 .connect(domain.to_owned(), stream)
274 .await
275 .map(|stream| Connection::Ssl(stream))
276 .map_err(|e| FtpError::SecureError(format!("{}", e)));
277 }
278 _ => {}
279 };
280 self.write_str(cmd).await?;
281 self.read_reply().await?;
282 Ok(Connection::Tcp(stream))
283 }
284
285 pub fn get_ref(&self) -> &TcpStream {
300 self.stream.get_ref().get_ref()
301 }
302
303 pub fn get_welcome_msg(&self) -> Option<&str> {
305 self.welcome_msg.as_deref()
306 }
307
308 pub async fn login(&mut self, user: &str, password: &str) -> Result<bool> {
310 self.send_command(Command::USER, Some(user)).await?;
311 if ftp_reply::is_positive_completion(self._reply_code) {
312 return Ok(true);
313 } else if !ftp_reply::is_positive_intermediate(self._reply_code) {
314 return Ok(false);
315 }
316 self.send_command(Command::PASS, Some(password)).await?;
317 Ok(ftp_reply::is_positive_completion(self._reply_code))
318 }
319
320 pub async fn cwd(&mut self, path: &str) -> Result<bool> {
322 self.send_command(Command::CWD, Some(path)).await?;
323 Ok(ftp_reply::is_positive_completion(self._reply_code))
324 }
325
326 pub async fn cdup(&mut self) -> Result<bool> {
328 self.send_command(Command::CDUP, None).await?;
329 Ok(ftp_reply::is_positive_completion(self._reply_code))
330 }
331
332 pub async fn pwd(&mut self) -> Result<String> {
334 self.send_command(Command::PWD, None).await?;
335 match &self._reply_string {
336 None => {
337 let cause = format!("Cannot get PWD Response from FTP server");
338 Err(FtpError::InvalidResponse(cause))
339 }
340 Some(content) => match (content.find('"'), content.rfind('"')) {
341 (Some(begin), Some(end)) if begin < end => Ok(content[begin + 1..end].to_string()),
342 _ => {
343 let cause = format!("Invalid PWD Response: {}", content);
344 Err(FtpError::InvalidResponse(cause))
345 }
346 },
347 }
348 }
349
350 pub async fn noop(&mut self) -> Result<bool> {
352 self.send_command(Command::NOOP, None).await?;
353 Ok(ftp_reply::is_positive_completion(self._reply_code))
354 }
355
356 pub async fn make_directory(&mut self, pathname: &str) -> Result<bool> {
358 match ftp_reply::is_positive_completion(self.mkd(pathname).await?) {
359 true => Ok(true),
360 false => {
361 return Err(FtpError::InvalidResponse(format!(
362 "Got error reply: {}",
363 self._reply_string.as_ref().unwrap()
364 )));
365 }
366 }
367 }
368
369 pub async fn mkd(&mut self, pathname: &str) -> Result<u32> {
371 Ok(self.send_command(Command::MKD, Some(pathname)).await?)
372 }
373
374 pub async fn acct(&mut self, account: &str) -> Result<u32> {
376 Ok(self.send_command(Command::ACCT, Some(account)).await?)
377 }
378
379 pub async fn abor(&mut self) -> Result<u32> {
381 Ok(self.send_command(Command::ABOR, None).await?)
382 }
383
384 pub async fn rein(&mut self) -> Result<u32> {
386 Ok(self.send_command(Command::REIN, None).await?)
387 }
388
389 pub async fn smnt(&mut self, dir: &str) -> Result<u32> {
391 Ok(self.send_command(Command::SMNT, Some(dir)).await?)
392 }
393
394 pub async fn epsv(&mut self) -> Result<u32> {
396 Ok(self.send_command(Command::EPSV, None).await?)
397 }
398
399 pub async fn type_cmd(&mut self, file_type: u32) -> Result<u32> {
401 let s = MODES.substring(file_type as usize, (file_type + 1) as usize);
402 Ok(self.send_command(Command::TYPE, Some(s)).await?)
403 }
404
405 pub async fn stru(&mut self, structure: u32) -> Result<u32> {
407 let s = MODES.substring(structure as usize, (structure + 1) as usize);
408 Ok(self.send_command(Command::STRU, Some(s)).await?)
409 }
410
411 pub async fn mode(&mut self, mode: u32) -> Result<u32> {
413 let s = MODES.substring(mode as usize, (mode + 1) as usize);
414 Ok(self.send_command(Command::MODE, Some(s)).await?)
415 }
416
417 pub async fn stou(&mut self) -> Result<u32> {
419 Ok(self.send_command(Command::STOU, None).await?)
420 }
421
422 pub async fn stou_pathname(&mut self, pathname: &str) -> Result<u32> {
424 Ok(self.send_command(Command::STOU, Some(pathname)).await?)
425 }
426
427 pub async fn appe(&mut self, pathname: &str) -> Result<u32> {
429 Ok(self.send_command(Command::APPE, Some(pathname)).await?)
430 }
431
432 pub async fn allo(&mut self, bytes: u32) -> Result<u32> {
434 Ok(self
435 .send_command(Command::ALLO, Some(bytes.to_string().as_str()))
436 .await?)
437 }
438
439 pub async fn allo_record_size(&mut self, bytes: u32, record_size: u32) -> Result<u32> {
441 let args = format!(
442 "{} R {}",
443 bytes.to_string().as_str(),
444 record_size.to_string().as_str()
445 );
446 Ok(self
447 .send_command(Command::ALLO, Some(args.as_str()))
448 .await?)
449 }
450
451 pub async fn port(&mut self, host: IpAddr, port: u16) -> Result<u32> {
453 let mut args = String::with_capacity(24);
454 args.push_str(host.to_string().replace('.', ",").as_str());
455 args.push_str(",");
456 args.push_str((port >> 8).to_string().as_str());
457 args.push_str(",");
458 args.push_str((port & 0xff).to_string().as_str());
459 Ok(self
460 .send_command(Command::PORT, Some(args.as_str()))
461 .await?)
462 }
463
464 pub async fn eprt(&mut self, host: IpAddr, port: u16) -> Result<u32> {
468 let mut args = String::new();
469 let mut h = host.to_string();
470 let n = h.find("%").unwrap_or(0);
471 if n > 0 {
472 h = h.substring(0, n).to_string();
473 }
474 args.push_str("|");
475 match host {
476 IpAddr::V4(addr) => args.push_str("1"),
477 IpAddr::V6(addr) => args.push_str("2"),
478 }
479 args.push_str("|");
480 args.push_str(h.as_str());
481 args.push_str("|");
482 args.push_str(port.to_string().as_str());
483 args.push_str("|");
484 Ok(self
485 .send_command(Command::EPRT, Some(args.as_str()))
486 .await?)
487 }
488
489 pub async fn mfmt(&mut self, pathname: &str, timeval: &str) -> Result<u32> {
491 Ok(self
492 .send_command(
493 Command::MFMT,
494 Some(format!("{} {}", timeval, pathname).as_str()),
495 )
496 .await?)
497 }
498
499 async fn pasv(&mut self) -> Result<SocketAddr> {
501 self.send_command(Command::PASV, None).await?;
502 self.check_response(ftp_reply::PASSIVE_MODE)?;
503 let reply_str = self._reply_string.clone().unwrap();
504 let reply_str = reply_str.as_str();
505 PORT_RE
506 .captures(reply_str)
507 .ok_or_else(|| {
508 FtpError::InvalidResponse(format!("Invalid PASV response: {}", reply_str))
509 })
510 .and_then(|caps| {
511 let (oct1, oct2, oct3, oct4) = (
513 caps[1].parse::<u8>().unwrap(),
514 caps[2].parse::<u8>().unwrap(),
515 caps[3].parse::<u8>().unwrap(),
516 caps[4].parse::<u8>().unwrap(),
517 );
518 let (msb, lsb) = (
519 caps[5].parse::<u8>().unwrap(),
520 caps[6].parse::<u8>().unwrap(),
521 );
522 let port = ((msb as u16) << 8) + lsb as u16;
523
524 use std::net::{IpAddr, Ipv4Addr};
525
526 let ip = if (oct1, oct2, oct3, oct4) == (0, 0, 0, 0) {
527 self.get_ref()
528 .peer_addr()
529 .map_err(FtpError::ConnectionError)?
530 .ip()
531 } else {
532 IpAddr::V4(Ipv4Addr::new(oct1, oct2, oct3, oct4))
533 };
534 Ok(SocketAddr::new(ip, port))
535 })
536 }
537
538 pub async fn transfer_type(&mut self, file_type: FileType) -> Result<bool> {
541 self.send_command(Command::TYPE, Some(file_type.to_string().as_str()))
544 .await?;
545 Ok(ftp_reply::is_positive_completion(self._reply_code))
546 }
547
548 pub async fn logout(&mut self) -> Result<bool> {
550 self.send_command(Command::QUIT, None).await?;
551 Ok(ftp_reply::is_positive_completion(self._reply_code))
552 }
553
554 pub async fn restart_from(&mut self, offset: u64) -> Result<bool> {
556 self.send_command(Command::REST, Some(offset.to_string().as_str()))
557 .await?;
558 Ok(ftp_reply::is_positive_intermediate(self._reply_code))
559 }
560
561 pub async fn get(&mut self, file_name: &str) -> Result<BufStream<Connection>> {
566 let retr_command = format!("RETR {}\r\n", file_name);
567 let data_stream = BufStream::new(self.data_command(&retr_command).await?);
568 self.check_response_in(&[ftp_reply::ABOUT_TO_SEND, ftp_reply::ALREADY_OPEN])?;
569 Ok(data_stream)
570 }
571
572 pub async fn rename(&mut self, from_name: &str, to_name: &str) -> Result<bool> {
574 self.send_command(Command::RNFR, Some(from_name)).await?;
575 if !ftp_reply::is_positive_intermediate(self._reply_code) {
576 return Ok(false);
577 }
578 self.send_command(Command::RNTO, Some(to_name)).await?;
579 Ok(ftp_reply::is_positive_completion(self._reply_code))
580 }
581
582 pub async fn retr<F, T, P, E>(&mut self, filename: &str, reader: F) -> std::result::Result<T, E>
613 where
614 F: Fn(BufStream<Connection>) -> P,
615 P: std::future::Future<Output = std::result::Result<T, E>>,
616 E: From<FtpError>,
617 {
618 let retr_command = format!("{} {}\r\n", Command::RETR.cmd_name(), filename);
619 let data_stream = BufStream::new(self.data_command(&retr_command).await?);
620 self.check_response_in(&[ftp_reply::ABOUT_TO_SEND, ftp_reply::ALREADY_OPEN])?;
621 let res = reader(data_stream).await?;
622 Ok(res)
623 }
624
625 pub async fn simple_retr(&mut self, file_name: &str) -> Result<std::io::Cursor<Vec<u8>>> {
646 async fn do_read(mut reader: BufStream<Connection>) -> Result<Vec<u8>> {
647 let mut buffer = Vec::new();
648 reader
649 .read_to_end(&mut buffer)
650 .await
651 .map_err(FtpError::ConnectionError)?;
652
653 Ok(buffer)
654 }
655
656 let buffer = self.retr(file_name, do_read).await?;
657 Ok(std::io::Cursor::new(buffer))
658 }
659
660 pub async fn remove_directory(&mut self, pathname: &str) -> Result<bool> {
661 Ok(ftp_reply::is_positive_completion(self.rmd(pathname).await?))
662 }
663
664 pub async fn rmd(&mut self, pathname: &str) -> Result<u32> {
666 Ok(self.send_command(Command::RMD, Some(pathname)).await?)
667 }
668
669 pub async fn delete_file(&mut self, filename: &str) -> Result<bool> {
670 Ok(ftp_reply::is_positive_completion(
671 self.dele(filename).await?,
672 ))
673 }
674
675 pub async fn dele(&mut self, filename: &str) -> Result<u32> {
677 Ok(self.send_command(Command::DELE, Some(filename)).await?)
678 }
679
680 async fn put_file<R: AsyncRead + Unpin>(&mut self, filename: &str, r: &mut R) -> Result<()> {
681 let stor_command = format!("{} {}\r\n", Command::STOR, filename);
682 let mut data_stream = BufStream::new(self.data_command(&stor_command).await?);
683 self.check_response_in(&[ftp_reply::ALREADY_OPEN, ftp_reply::ABOUT_TO_SEND])?;
684 copy(r, &mut data_stream)
685 .await
686 .map_err(FtpError::ConnectionError)?;
687 Ok(())
688 }
689
690 pub async fn send_command(&mut self, cmd: Command, agrs: Option<&str>) -> Result<u32> {
692 let mut ftp_cmd = format!("{}\r\n", cmd.cmd_name());
693 if agrs.is_some() {
694 ftp_cmd = format!("{} {}\r\n", cmd.cmd_name(), agrs.unwrap());
695 }
696 self.write_str(ftp_cmd).await?;
697 self.read_reply().await?;
698 Ok(self._reply_code)
699 }
700
701 async fn read_reply(&mut self) -> Result<()> {
702 self._reply_lines.clear();
703 self._reply_string = None;
704 let mut line = String::new();
705 self.stream
706 .read_line(&mut line)
707 .await
708 .map_err(FtpError::ConnectionError)?;
709
710 if line.len() < REPLY_CODE_LEN {
711 return Err(FtpError::InvalidResponse(format!(
712 "Truncated server reply: {}",
713 line
714 )));
715 }
716
717 if line.len() < 5 {
718 return Err(FtpError::InvalidResponse(
719 "error: could not read reply code".to_owned(),
720 ));
721 }
722
723 let reply_code: u32 = line[0..3].parse().map_err(|_err| {
724 FtpError::InvalidResponse(format!(
725 "Could not parse reply code. \n Server Reply: {}",
726 line
727 ))
728 })?;
729 self._reply_code = reply_code;
730 self._reply_lines.push(line.as_str().to_string());
731 let expected = format!("{} ", &line[0..3]);
732 while line.len() < 5 || line[0..4] != expected {
733 line.clear();
734 if let Err(e) = self.stream.read_line(&mut line).await {
735 return Err(FtpError::ConnectionError(e));
736 }
737 self._reply_lines.push(line.as_str().to_string());
738 }
739 let mut s = String::new();
740 for x in self._reply_lines.iter() {
741 s.push_str(x.as_str())
742 }
743 self._reply_string = Some(s);
744 Ok(())
745 }
746
747 pub async fn put<R: AsyncRead + Unpin>(&mut self, filename: &str, r: &mut R) -> Result<()> {
749 self.put_file(filename, r).await?;
750 self.check_response_in(&[
751 ftp_reply::CLOSING_DATA_CONNECTION,
752 ftp_reply::REQUESTED_FILE_ACTION_OK,
753 ])?;
754 Ok(())
755 }
756
757 async fn list_command(
759 &mut self,
760 cmd: Cow<'_, str>,
761 open_code: u32,
762 close_code: &[u32],
763 ) -> Result<Vec<String>> {
764 let data_stream = BufStream::new(self.data_command(&cmd).await?);
765 self.check_response_in(&[open_code, ftp_reply::ALREADY_OPEN])?;
766 let lines = Self::get_lines_from_stream(data_stream).await?;
767 self.check_response_in(close_code)?;
768 Ok(lines)
769 }
770
771 async fn get_lines_from_stream<R>(data_stream: R) -> Result<Vec<String>>
773 where
774 R: AsyncBufRead + Unpin,
775 {
776 let mut lines: Vec<String> = Vec::new();
777
778 let mut lines_stream = data_stream.lines();
779 loop {
780 let line = lines_stream
781 .next_line()
782 .await
783 .map_err(FtpError::ConnectionError)?;
784
785 match line {
786 Some(line) => {
787 if line.is_empty() {
788 continue;
789 }
790 lines.push(line);
791 }
792 None => break Ok(lines),
793 }
794 }
795 }
796
797 pub async fn list(&mut self, pathname: Option<&str>) -> Result<Vec<String>> {
801 let command = pathname.map_or("LIST\r\n".into(), |path| {
802 format!("LIST {}\r\n", path).into()
803 });
804
805 self.list_command(
806 command,
807 ftp_reply::ABOUT_TO_SEND,
808 &[
809 ftp_reply::CLOSING_DATA_CONNECTION,
810 ftp_reply::REQUESTED_FILE_ACTION_OK,
811 ],
812 )
813 .await
814 }
815
816 pub async fn nlst(&mut self, pathname: Option<&str>) -> Result<Vec<String>> {
820 let command = pathname.map_or("NLST\r\n".into(), |path| {
821 format!("NLST {}\r\n", path).into()
822 });
823
824 self.list_command(
825 command,
826 ftp_reply::ABOUT_TO_SEND,
827 &[
828 ftp_reply::CLOSING_DATA_CONNECTION,
829 ftp_reply::REQUESTED_FILE_ACTION_OK,
830 ],
831 )
832 .await
833 }
834
835 pub async fn mdtm(&mut self, pathname: &str) -> Result<Option<DateTime<Utc>>> {
838 self.send_command(Command::MDTM, Some(pathname)).await?;
839 self.check_response(ftp_reply::FILE)?;
840 let reply_str = self._reply_string.clone().unwrap();
841 let reply_str = reply_str.as_str();
842 match MDTM_RE.captures(reply_str) {
843 Some(caps) => {
844 let (year, month, day) = (
845 caps[1].parse::<i32>().unwrap(),
846 caps[2].parse::<u32>().unwrap(),
847 caps[3].parse::<u32>().unwrap(),
848 );
849 let (hour, minute, second) = (
850 caps[4].parse::<u32>().unwrap(),
851 caps[5].parse::<u32>().unwrap(),
852 caps[6].parse::<u32>().unwrap(),
853 );
854 Ok(Some(
855 Utc.ymd(year, month, day).and_hms(hour, minute, second),
856 ))
857 }
858 None => Ok(None),
859 }
860 }
861
862 pub async fn size(&mut self, pathname: &str) -> Result<Option<usize>> {
865 self.send_command(Command::SIZE, Some(pathname)).await?;
866 self.check_response(ftp_reply::FILE)?;
867 let reply_str = self._reply_string.clone().unwrap();
868 let reply_str = reply_str.as_str();
869 match SIZE_RE.captures(reply_str) {
870 Some(caps) => Ok(Some(caps[1].parse().unwrap())),
871 None => Ok(None),
872 }
873 }
874
875 pub async fn feat(&mut self) -> Result<u32> {
876 self._reply_lines.clear();
877 Ok(self.send_command(Command::FEAT, None).await?)
878 }
879
880 pub async fn features(&mut self, cmd: Command) -> Result<Option<Vec<String>>> {
881 let features = Vec::new();
882 if self.init_feature_map().await? {
883 let values = self.features_map.get(cmd.cmd_name());
884 if values.is_some() {
885 return Ok(Some(values.unwrap().clone()));
886 }
887 }
888 Ok(Some(features))
889 }
890
891 async fn init_feature_map(&mut self) -> Result<bool> {
892 if self.features_map.is_empty() {
893 let reply_code = self.feat().await?;
894 if reply_code == ftp_reply::NOT_LOGGED_IN.into() {
895 return Ok(false);
896 }
897 let success = ftp_reply::is_positive_completion(reply_code);
898 if !success {
899 return Ok(false);
900 }
901 for l in self._reply_lines.iter() {
902 if l.starts_with(" ") {
903 let mut key = "";
904 let mut value = "";
905 let s = &l[1..l.len() - 1];
906 let varsep = s.find(' ');
907 if varsep.is_some() {
908 key = &l[1..varsep.unwrap() + 1];
909 value = &l[varsep.unwrap() + 1..l.len()];
910 } else {
911 key = &l[1..l.len() - 1]
912 }
913 let entries = self.features_map.get_mut(key);
914 match entries {
915 None => {
916 let mut features = vec![];
917 features.push(String::from(value));
918 self.features_map.insert(key.to_string(), features);
919 }
920 Some(features) => {
921 features.push(value.to_string());
922 }
923 }
924 }
925 }
926 }
927 Ok(true)
928 }
929
930 async fn write_str<S: AsRef<str>>(&mut self, command: S) -> Result<()> {
931 let conn = self.stream.get_mut();
932 conn.write_all(command.as_ref().as_bytes())
933 .await
934 .map_err(FtpError::ConnectionError)
935 }
936
937 pub fn check_response(&mut self, expected_code: u32) -> Result<()> {
938 self.check_response_in(&[expected_code])
939 }
940
941 pub fn check_response_in(&mut self, expected_code: &[u32]) -> Result<()> {
943 let reply_string = self._reply_string.clone();
944 if expected_code.iter().any(|ec| self._reply_code == *ec) {
945 Ok(())
946 } else {
947 Err(FtpError::InvalidResponse(format!(
948 "Expected code {:?}, got response: {}",
949 expected_code,
950 reply_string.unwrap().as_str()
951 )))
952 }
953 }
954}
955
#[cfg(test)]
mod tests {
    use tokio_stream::once;
    use tokio_util::io::StreamReader;

    use super::FtpClient;

    /// Feeds `raw` as a single chunk through `FtpClient::get_lines_from_stream`.
    async fn split_lines(raw: &'static [u8]) -> Vec<String> {
        let reader = StreamReader::new(once(Ok::<_, std::io::Error>(raw)));
        FtpClient::get_lines_from_stream(reader).await.unwrap()
    }

    /// The lines both newline flavours are expected to produce.
    fn expected_lines() -> Vec<String> {
        vec![
            "Hello".to_string(),
            "World".to_string(),
            "Be".to_string(),
            "Happy".to_string(),
        ]
    }

    /// CRLF-terminated input: terminators are stripped and the empty line is skipped.
    #[tokio::test]
    async fn list_command_dos_newlines() {
        let lines = split_lines(b"Hello\r\nWorld\r\n\r\nBe\r\nHappy\r\n").await;
        assert_eq!(lines, expected_lines());
    }

    /// LF-terminated input: terminators are stripped and the empty line is skipped.
    #[tokio::test]
    async fn list_command_unix_newlines() {
        let lines = split_lines(b"Hello\nWorld\n\nBe\nHappy\n").await;
        assert_eq!(lines, expected_lines());
    }
}