1use std::borrow::{Borrow, Cow};
3use std::collections::{HashMap, HashSet};
4use std::fmt::format;
5use std::net::{IpAddr, SocketAddr};
6use std::string::String;
7
8use chrono::offset::TimeZone;
9use chrono::{DateTime, Utc};
10use regex::Regex;
11use tokio::io::{
12 copy, AsyncBufRead, AsyncBufReadExt, AsyncRead, AsyncReadExt, AsyncWriteExt, BufStream,
13};
14use tokio::net::{TcpStream, ToSocketAddrs};
15#[cfg(feature = "ftps")]
16use tokio_rustls::{client::TlsStream, rustls::ClientConfig, rustls::ServerName, TlsConnector};
17
18use crate::cmd::Command;
19use crate::connection::Connection;
20use crate::types::{FileType, FtpError, Result};
21use crate::{cmd, ftp_reply, StringExt, MODES, REPLY_CODE_LEN};
22
23lazy_static::lazy_static! {
24 static ref PORT_RE: Regex = Regex::new(r"\((\d+),(\d+),(\d+),(\d+),(\d+),(\d+)\)").unwrap();
27
28 static ref MDTM_RE: Regex = Regex::new(r"\b(\d{4})(\d{2})(\d{2})(\d{2})(\d{2})(\d{2})\b").unwrap();
30
31 static ref SIZE_RE: Regex = Regex::new(r"\s+(\d+)\s*$").unwrap();
33 static ref PROT_COMMAND_VALUE: Vec<&'static str> = {vec!["C","E","S","P"]};
34}
35
36pub struct FtpClient {
37 stream: BufStream<Connection>,
38 welcome_msg: Option<String>,
39 _reply_code: u32,
40 _reply_string: Option<String>,
41 _reply_lines: Vec<String>,
42 #[cfg(feature = "ftps")]
43 ssl_cfg: Option<(ClientConfig, ServerName)>,
44 features_map: HashMap<String, Vec<String>>,
45}
46
47impl FtpClient {
48 fn new(stream: TcpStream) -> Self {
49 FtpClient {
50 stream: BufStream::new(Connection::Tcp(stream)),
51 #[cfg(feature = "ftps")]
52 ssl_cfg: None,
53 welcome_msg: None,
54 _reply_code: 0,
55 _reply_string: None,
56 features_map: HashMap::new(),
57 _reply_lines: vec![],
58 }
59 }
60
/// Builds a client around a TLS-wrapped control connection with empty reply
/// state. The TLS configuration is left unset; the caller stores it.
#[cfg(feature = "ftps")]
fn new_tls_client(stream: TlsStream<TcpStream>) -> Self {
    let control = BufStream::new(Connection::Ssl(stream));
    Self {
        stream: control,
        welcome_msg: None,
        _reply_code: 0,
        _reply_string: None,
        _reply_lines: Vec::new(),
        ssl_cfg: None,
        features_map: HashMap::new(),
    }
}
73
74 pub fn init_default(&mut self) {}
75
76 pub async fn connect<A: ToSocketAddrs>(addr: A) -> Result<FtpClient> {
78 let stream = TcpStream::connect(addr)
79 .await
80 .map_err(FtpError::ConnectionError)?;
81
82 let mut ftp_client = FtpClient::new(stream);
83 ftp_client.read_reply().await?;
84 ftp_client.check_response(ftp_reply::READY)?;
85 ftp_client.welcome_msg = Some(ftp_client._reply_string.clone().unwrap());
86 Ok(ftp_client)
87 }
88
/// Upgrades the control connection to TLS.
///
/// Sends `AUTH TLS`, performs the TLS handshake on the existing TCP stream,
/// then sends `PBSZ 0` and `PROT P`. The configuration and server name are
/// stored on the returned client so that data connections can be wrapped too.
///
/// # Errors
/// `FtpError::SecureError` if the handshake fails; `FtpError::InvalidResponse`
/// if the server rejects AUTH, PBSZ or PROT; I/O errors are propagated.
#[cfg(feature = "ftps")]
pub async fn into_secure(
    mut self,
    config: ClientConfig,
    domain: ServerName,
) -> Result<FtpClient> {
    // AUTH TLS is issued here through `&mut self` rather than through a helper
    // that takes the client by value: `self.stream` is still needed below.
    self.send_command(Command::AUTH, Some("TLS")).await?;
    self.check_response_in(&[ftp_reply::AUTH_OK, ftp_reply::SECURITY_MECHANISM_IS_OK])?;

    let connector: TlsConnector = std::sync::Arc::new(config.clone()).into();
    let stream = connector
        .connect(domain.clone(), self.stream.into_inner().into_tcp_stream())
        .await
        .map_err(|e| FtpError::SecureError(e.to_string()))?;

    let mut ftps_client = FtpClient::new_tls_client(stream);
    ftps_client.ssl_cfg = Some((config, domain));

    ftps_client.send_command(Command::PBSZ, Some("0")).await?;
    ftps_client.check_response(ftp_reply::COMMAND_OK)?;

    ftps_client.send_command(Command::PROT, Some("P")).await?;
    ftps_client.check_response(ftp_reply::COMMAND_OK)?;
    Ok(ftps_client)
}
138
/// Sends `AUTH TLS` and requires an `AUTH_OK` or `SECURITY_MECHANISM_IS_OK`
/// reply.
///
/// Borrows the client mutably instead of consuming it, so the caller can keep
/// using the control connection afterwards.
#[cfg(feature = "ftps")]
async fn exe_auth_tls(&mut self) -> Result<()> {
    self.send_command(Command::AUTH, Some("TLS")).await?;
    self.check_response_in(&[ftp_reply::AUTH_OK, ftp_reply::SECURITY_MECHANISM_IS_OK])
}
144
/// Sends `AUTH <mechanism>` and returns the reply code.
///
/// Takes `&mut self` like the other command methods; taking the client by
/// value would drop it, and the connection with it, once the reply is read.
#[cfg(feature = "ftps")]
pub async fn exe_auth(&mut self, mechanism: &str) -> Result<u32> {
    self.send_command(Command::AUTH, Some(mechanism)).await
}
150
/// Sends `PBSZ <pbsz>` and requires a `COMMAND_OK` reply.
///
/// # Errors
/// `FtpError::InvalidArgument` if `pbsz` exceeds `u32::MAX`;
/// `FtpError::InvalidResponse` if the server does not answer `COMMAND_OK`.
#[cfg(feature = "ftps")]
pub async fn exec_pbsz(&mut self, pbsz: u64) -> Result<()> {
    // `pbsz` is unsigned, so only the upper bound needs checking.
    if pbsz > u64::from(u32::MAX) {
        return Err(FtpError::InvalidArgument(format!(
            "Invalid pbsz value. The correct value should be: {} to {}",
            0, "(2^32)-1"
        )));
    }
    self.send_command(Command::PBSZ, Some(pbsz.to_string().as_str()))
        .await?;
    self.check_response_in(&[ftp_reply::COMMAND_OK])
}
166
/// Sends `ADAT`, with `data` base64-encoded as the argument when present, and
/// returns the reply code.
#[cfg(feature = "ftps")]
pub async fn exec_adat(&mut self, data: Option<&[u8]>) -> Result<u32> {
    // The encoded text is bound to a local so it outlives the borrowed argument.
    let encoded = data.map(|bytes| base64::encode(bytes));
    self.send_command(Command::ADAT, encoded.as_deref()).await
}
176
/// Sends `CONF`, with `data` base64-encoded as the argument when present, and
/// returns the reply code.
#[cfg(feature = "ftps")]
pub async fn exec_conf(&mut self, data: Option<&[u8]>) -> Result<u32> {
    // The encoded text is bound to a local so it outlives the borrowed argument.
    let encoded = data.map(|bytes| base64::encode(bytes));
    self.send_command(Command::CONF, encoded.as_deref()).await
}
186
/// Sends `ENC`, with `data` base64-encoded as the argument when present, and
/// returns the reply code.
#[cfg(feature = "ftps")]
pub async fn exec_enc(&mut self, data: Option<&[u8]>) -> Result<u32> {
    // The encoded text is bound to a local so it outlives the borrowed argument.
    let encoded = data.map(|bytes| base64::encode(bytes));
    self.send_command(Command::ENC, encoded.as_deref()).await
}
196
/// Sends `MIC`, with `data` base64-encoded as the argument when present, and
/// returns the reply code.
#[cfg(feature = "ftps")]
pub async fn exec_mic(&mut self, data: Option<&[u8]>) -> Result<u32> {
    // The encoded text is bound to a local so it outlives the borrowed argument.
    let encoded = data.map(|bytes| base64::encode(bytes));
    self.send_command(Command::MIC, encoded.as_deref()).await
}
206
/// Sends `PROT <prot>` and requires a `COMMAND_OK` reply. An empty `prot`
/// is replaced by `"C"`.
///
/// # Errors
/// `FtpError::InvalidArgument` if the value is not listed in
/// `PROT_COMMAND_VALUE`; `FtpError::InvalidResponse` if the server does not
/// answer `COMMAND_OK`.
#[cfg(feature = "ftps")]
pub async fn exec_prot(&mut self, prot: &mut str) -> Result<()> {
    let level: &str = if prot.is_empty() { "C" } else { &*prot };
    if !PROT_COMMAND_VALUE.iter().any(|allowed| *allowed == level) {
        return Err(FtpError::InvalidArgument(
            "Unsupported prot command value".to_string(),
        ));
    }
    self.send_command(Command::PROT, Some(level)).await?;
    self.check_response_in(&[ftp_reply::COMMAND_OK])
}
223
/// Sends `CCC` and, on a `COMMAND_OK` reply, returns a new client built on
/// the underlying TCP stream of the current control connection.
///
/// # Errors
/// `FtpError::InvalidResponse` if the reply code is not `COMMAND_OK`.
#[cfg(feature = "ftps")]
pub async fn into_insecure(mut self) -> Result<FtpClient> {
    self.send_command(Command::CCC, None).await?;
    // Reject first so that `self.stream` is moved out on the success path only.
    if self._reply_code != ftp_reply::COMMAND_OK {
        return Err(FtpError::InvalidResponse(format!(
            "Expected code {:?}, got response: {}",
            ftp_reply::COMMAND_OK,
            self._reply_string.as_deref().unwrap_or("")
        )));
    }
    Ok(FtpClient::new(self.stream.into_inner().into_tcp_stream()))
}
260
261 async fn data_command(&mut self, cmd: &str) -> Result<Connection> {
263 let addr = self.pasv().await?;
264 let stream = TcpStream::connect(addr)
265 .await
266 .map_err(FtpError::ConnectionError)?;
267
268 #[cfg(feature = "ftps")]
269 match &self.ssl_cfg {
270 Some((config, domain)) => {
271 let connector: TlsConnector = std::sync::Arc::new(config.clone()).into();
272 return connector
273 .connect(domain.to_owned(), stream)
274 .await
275 .map(|stream| Connection::Ssl(stream))
276 .map_err(|e| FtpError::SecureError(format!("{}", e)));
277 }
278 _ => {}
279 };
280 self.write_str(cmd).await?;
281 self.read_reply().await?;
282 Ok(Connection::Tcp(stream))
283 }
284
285 pub fn get_ref(&self) -> &TcpStream {
300 self.stream.get_ref().get_ref()
301 }
302
303 pub fn get_welcome_msg(&self) -> Option<&str> {
305 self.welcome_msg.as_deref()
306 }
307
308 pub async fn login(&mut self, user: &str, password: &str) -> Result<bool> {
310 self.send_command(Command::USER, Some(user)).await?;
311 if ftp_reply::is_positive_completion(self._reply_code) {
312 return Ok(true);
313 } else if !ftp_reply::is_positive_intermediate(self._reply_code) {
314 return Ok(false);
315 }
316 self.send_command(Command::PASS, Some(password)).await?;
317 Ok(ftp_reply::is_positive_completion(self._reply_code))
318 }
319
320 pub async fn cwd(&mut self, path: &str) -> Result<bool> {
322 self.send_command(Command::CWD, Some(path)).await?;
323 Ok(ftp_reply::is_positive_completion(self._reply_code))
324 }
325
326 pub async fn cdup(&mut self) -> Result<bool> {
328 self.send_command(Command::CDUP, None).await?;
329 Ok(ftp_reply::is_positive_completion(self._reply_code))
330 }
331
332 pub async fn pwd(&mut self) -> Result<String> {
334 self.send_command(Command::PWD, None).await?;
335 match &self._reply_string {
336 None => {
337 let cause = format!("Cannot get PWD Response from FTP server");
338 Err(FtpError::InvalidResponse(cause))
339 }
340 Some(content) => match (content.find('"'), content.rfind('"')) {
341 (Some(begin), Some(end)) if begin < end => Ok(content[begin + 1..end].to_string()),
342 _ => {
343 let cause = format!("Invalid PWD Response: {}", content);
344 Err(FtpError::InvalidResponse(cause))
345 }
346 },
347 }
348 }
349
350 pub async fn noop(&mut self) -> Result<bool> {
352 self.send_command(Command::NOOP, None).await?;
353 Ok(ftp_reply::is_positive_completion(self._reply_code))
354 }
355
356 pub async fn make_directory(&mut self, pathname: &str) -> Result<bool> {
358 match ftp_reply::is_positive_completion(self.mkd(pathname).await?) {
359 true => Ok(true),
360 false => {
361 return Err(FtpError::InvalidResponse(format!(
362 "Got error reply: {}",
363 self._reply_string.as_ref().unwrap()
364 )));
365 }
366 }
367 }
368
369 pub async fn mkd(&mut self, pathname: &str) -> Result<u32> {
371 Ok(self.send_command(Command::MKD, Some(pathname)).await?)
372 }
373
374 pub async fn acct(&mut self, account: &str) -> Result<u32> {
376 Ok(self.send_command(Command::ACCT, Some(account)).await?)
377 }
378
379 pub async fn abor(&mut self) -> Result<u32> {
381 Ok(self.send_command(Command::ABOR, None).await?)
382 }
383
384 pub async fn rein(&mut self) -> Result<u32> {
386 Ok(self.send_command(Command::REIN, None).await?)
387 }
388
389 pub async fn smnt(&mut self, dir: &str) -> Result<u32> {
391 Ok(self.send_command(Command::SMNT, Some(dir)).await?)
392 }
393
394 pub async fn epsv(&mut self) -> Result<u32> {
396 Ok(self.send_command(Command::EPSV, None).await?)
397 }
398
399 pub async fn type_cmd(&mut self, file_type: u32) -> Result<u32> {
401 let s = MODES.substring(file_type as usize, (file_type + 1) as usize);
402 Ok(self.send_command(Command::TYPE, Some(s)).await?)
403 }
404
405 pub async fn stru(&mut self, structure: u32) -> Result<u32> {
407 let s = MODES.substring(structure as usize, (structure + 1) as usize);
408 Ok(self.send_command(Command::STRU, Some(s)).await?)
409 }
410
411 pub async fn mode(&mut self, mode: u32) -> Result<u32> {
413 let s = MODES.substring(mode as usize, (mode + 1) as usize);
414 Ok(self.send_command(Command::MODE, Some(s)).await?)
415 }
416
417 pub async fn stou(&mut self) -> Result<u32> {
419 Ok(self.send_command(Command::STOU, None).await?)
420 }
421
422 pub async fn stou_pathname(&mut self, pathname: &str) -> Result<u32> {
424 Ok(self.send_command(Command::STOU, Some(pathname)).await?)
425 }
426
427 pub async fn appe(&mut self, pathname: &str) -> Result<u32> {
429 Ok(self.send_command(Command::APPE, Some(pathname)).await?)
430 }
431
432 pub async fn allo(&mut self, bytes: u32) -> Result<u32> {
434 Ok(self
435 .send_command(Command::ALLO, Some(bytes.to_string().as_str()))
436 .await?)
437 }
438
439 pub async fn allo_record_size(&mut self, bytes: u32, record_size: u32) -> Result<u32> {
441 let args = format!(
442 "{} R {}",
443 bytes.to_string().as_str(),
444 record_size.to_string().as_str()
445 );
446 Ok(self
447 .send_command(Command::ALLO, Some(args.as_str()))
448 .await?)
449 }
450
451 pub async fn port(&mut self, host: IpAddr, port: u16) -> Result<u32> {
453 let mut args = String::with_capacity(24);
454 args.push_str(host.to_string().replace('.', ",").as_str());
455 args.push_str(",");
456 args.push_str((port >> 8).to_string().as_str());
457 args.push_str(",");
458 args.push_str((port & 0xff).to_string().as_str());
459 Ok(self
460 .send_command(Command::PORT, Some(args.as_str()))
461 .await?)
462 }
463
464 pub async fn eprt(&mut self, host: IpAddr, port: u16) -> Result<u32> {
468 let mut args = String::new();
469 let mut h = host.to_string();
470 let n = h.find("%").unwrap_or(0);
471 if n > 0 {
472 h = h.substring(0, n).to_string();
473 }
474 args.push_str("|");
475 match host {
476 IpAddr::V4(addr) => args.push_str("1"),
477 IpAddr::V6(addr) => args.push_str("2"),
478 }
479 args.push_str("|");
480 args.push_str(h.as_str());
481 args.push_str("|");
482 args.push_str(port.to_string().as_str());
483 args.push_str("|");
484 Ok(self
485 .send_command(Command::EPRT, Some(args.as_str()))
486 .await?)
487 }
488
489 pub async fn mfmt(&mut self, pathname: &str, timeval: &str) -> Result<u32> {
491 Ok(self
492 .send_command(
493 Command::MFMT,
494 Some(format!("{} {}", timeval, pathname).as_str()),
495 )
496 .await?)
497 }
498
499 async fn pasv(&mut self) -> Result<SocketAddr> {
501 self.send_command(Command::PASV, None).await?;
502 self.check_response(ftp_reply::PASSIVE_MODE)?;
503 let reply_str = self._reply_string.clone().unwrap();
504 let reply_str = reply_str.as_str();
505 PORT_RE
506 .captures(reply_str)
507 .ok_or_else(|| {
508 FtpError::InvalidResponse(format!("Invalid PASV response: {}", reply_str))
509 })
510 .and_then(|caps| {
511 let (oct1, oct2, oct3, oct4) = (
513 caps[1].parse::<u8>().unwrap(),
514 caps[2].parse::<u8>().unwrap(),
515 caps[3].parse::<u8>().unwrap(),
516 caps[4].parse::<u8>().unwrap(),
517 );
518 let (msb, lsb) = (
519 caps[5].parse::<u8>().unwrap(),
520 caps[6].parse::<u8>().unwrap(),
521 );
522 let port = ((msb as u16) << 8) + lsb as u16;
523
524 use std::net::{IpAddr, Ipv4Addr};
525
526 let ip = if (oct1, oct2, oct3, oct4) == (0, 0, 0, 0) {
527 self.get_ref()
528 .peer_addr()
529 .map_err(FtpError::ConnectionError)?
530 .ip()
531 } else {
532 IpAddr::V4(Ipv4Addr::new(oct1, oct2, oct3, oct4))
533 };
534 Ok(SocketAddr::new(ip, port))
535 })
536 }
537
538 pub async fn transfer_type(&mut self, file_type: FileType) -> Result<bool> {
541 self.send_command(Command::TYPE, Some(file_type.to_string().as_str()))
544 .await?;
545 Ok(ftp_reply::is_positive_completion(self._reply_code))
546 }
547
548 pub async fn logout(&mut self) -> Result<bool> {
550 self.send_command(Command::QUIT, None).await?;
551 Ok(ftp_reply::is_positive_completion(self._reply_code))
552 }
553
554 pub async fn restart_from(&mut self, offset: u64) -> Result<bool> {
556 self.send_command(Command::REST, Some(offset.to_string().as_str()))
557 .await?;
558 Ok(ftp_reply::is_positive_intermediate(self._reply_code))
559 }
560
561 pub async fn get(&mut self, file_name: &str) -> Result<BufStream<Connection>> {
566 let retr_command = format!("RETR {}\r\n", file_name);
567 let data_stream = BufStream::new(self.data_command(&retr_command).await?);
568 self.check_response_in(&[ftp_reply::ABOUT_TO_SEND, ftp_reply::ALREADY_OPEN])?;
569 Ok(data_stream)
570 }
571
572 pub async fn rename(&mut self, from_name: &str, to_name: &str) -> Result<bool> {
574 self.send_command(Command::RNFR, Some(from_name)).await?;
575 if !ftp_reply::is_positive_intermediate(self._reply_code) {
576 return Ok(false);
577 }
578 self.send_command(Command::RNTO, Some(to_name)).await?;
579 Ok(ftp_reply::is_positive_completion(self._reply_code))
580 }
581
582 pub async fn retr<F, T, P, E>(&mut self, filename: &str, reader: F) -> std::result::Result<T, E>
611 where
612 F: Fn(BufStream<Connection>) -> P,
613 P: std::future::Future<Output = std::result::Result<T, E>>,
614 E: From<FtpError>,
615 {
616 let retr_command = format!("{} {}\r\n", cmd::Command::RETR.cmd_name(), filename);
617 let data_stream = BufStream::new(self.data_command(&retr_command).await?);
618 self.check_response_in(&[ftp_reply::ABOUT_TO_SEND, ftp_reply::ALREADY_OPEN])?;
619 let res = reader(data_stream).await?;
620 Ok(res)
621 }
622
623 pub async fn simple_retr(&mut self, file_name: &str) -> Result<std::io::Cursor<Vec<u8>>> {
643 async fn do_read(mut reader: BufStream<Connection>) -> Result<Vec<u8>> {
644 let mut buffer = Vec::new();
645 reader
646 .read_to_end(&mut buffer)
647 .await
648 .map_err(FtpError::ConnectionError)?;
649
650 Ok(buffer)
651 }
652
653 let buffer = self.retr(file_name, do_read).await?;
654 Ok(std::io::Cursor::new(buffer))
655 }
656
657 pub async fn remove_directory(&mut self, pathname: &str) -> Result<bool> {
658 Ok(ftp_reply::is_positive_completion(self.rmd(pathname).await?))
659 }
660
661 pub async fn rmd(&mut self, pathname: &str) -> Result<u32> {
663 Ok(self.send_command(Command::RMD, Some(pathname)).await?)
664 }
665
666 pub async fn delete_file(&mut self, filename: &str) -> Result<bool> {
667 Ok(ftp_reply::is_positive_completion(
668 self.dele(filename).await?,
669 ))
670 }
671
672 pub async fn dele(&mut self, filename: &str) -> Result<u32> {
674 Ok(self.send_command(Command::DELE, Some(filename)).await?)
675 }
676
677 async fn put_file<R: AsyncRead + Unpin>(&mut self, filename: &str, r: &mut R) -> Result<()> {
678 let stor_command = format!("{} {}\r\n", Command::STOR, filename);
679 let mut data_stream = BufStream::new(self.data_command(&stor_command).await?);
680 self.check_response_in(&[ftp_reply::ALREADY_OPEN, ftp_reply::ABOUT_TO_SEND])?;
681 copy(r, &mut data_stream)
682 .await
683 .map_err(FtpError::ConnectionError)?;
684 Ok(())
685 }
686
687 pub async fn send_command(&mut self, cmd: cmd::Command, agrs: Option<&str>) -> Result<u32> {
689 let mut ftp_cmd = format!("{}\r\n", cmd.cmd_name());
690 if agrs.is_some() {
691 ftp_cmd = format!("{} {}\r\n", cmd.cmd_name(), agrs.unwrap());
692 }
693 self.write_str(ftp_cmd).await?;
694 self.read_reply().await?;
695 Ok(self._reply_code)
696 }
697
698 async fn read_reply(&mut self) -> Result<()> {
699 self._reply_lines.clear();
700 self._reply_string = None;
701 let mut line = String::new();
702 self.stream
703 .read_line(&mut line)
704 .await
705 .map_err(FtpError::ConnectionError)?;
706
707 if line.len() < REPLY_CODE_LEN {
708 return Err(FtpError::InvalidResponse(format!(
709 "Truncated server reply: {}",
710 line
711 )));
712 }
713
714 if line.len() < 5 {
715 return Err(FtpError::InvalidResponse(
716 "error: could not read reply code".to_owned(),
717 ));
718 }
719
720 let reply_code: u32 = line[0..3].parse().map_err(|_err| {
721 FtpError::InvalidResponse(format!(
722 "Could not parse reply code. \n Server Reply: {}",
723 line
724 ))
725 })?;
726 self._reply_code = reply_code;
727 self._reply_lines.push(line.as_str().to_string());
728 let expected = format!("{} ", &line[0..3]);
729 while line.len() < 5 || line[0..4] != expected {
730 line.clear();
731 if let Err(e) = self.stream.read_line(&mut line).await {
732 return Err(FtpError::ConnectionError(e));
733 }
734 self._reply_lines.push(line.as_str().to_string());
735 }
736 let mut s = String::new();
737 for x in self._reply_lines.iter() {
738 s.push_str(x.as_str())
739 }
740 self._reply_string = Some(s);
741 Ok(())
742 }
743
744 pub async fn put<R: AsyncRead + Unpin>(&mut self, filename: &str, r: &mut R) -> Result<()> {
746 self.put_file(filename, r).await?;
747 self.check_response_in(&[
748 ftp_reply::CLOSING_DATA_CONNECTION,
749 ftp_reply::REQUESTED_FILE_ACTION_OK,
750 ])?;
751 Ok(())
752 }
753
754 async fn list_command(
756 &mut self,
757 cmd: Cow<'_, str>,
758 open_code: u32,
759 close_code: &[u32],
760 ) -> Result<Vec<String>> {
761 let data_stream = BufStream::new(self.data_command(&cmd).await?);
762 self.check_response_in(&[open_code, ftp_reply::ALREADY_OPEN])?;
763 let lines = Self::get_lines_from_stream(data_stream).await?;
764 self.check_response_in(close_code)?;
765 Ok(lines)
766 }
767
768 async fn get_lines_from_stream<R>(data_stream: R) -> Result<Vec<String>>
770 where
771 R: AsyncBufRead + Unpin,
772 {
773 let mut lines: Vec<String> = Vec::new();
774
775 let mut lines_stream = data_stream.lines();
776 loop {
777 let line = lines_stream
778 .next_line()
779 .await
780 .map_err(FtpError::ConnectionError)?;
781
782 match line {
783 Some(line) => {
784 if line.is_empty() {
785 continue;
786 }
787 lines.push(line);
788 }
789 None => break Ok(lines),
790 }
791 }
792 }
793
794 pub async fn list(&mut self, pathname: Option<&str>) -> Result<Vec<String>> {
798 let command = pathname.map_or("LIST\r\n".into(), |path| {
799 format!("LIST {}\r\n", path).into()
800 });
801
802 self.list_command(
803 command,
804 ftp_reply::ABOUT_TO_SEND,
805 &[
806 ftp_reply::CLOSING_DATA_CONNECTION,
807 ftp_reply::REQUESTED_FILE_ACTION_OK,
808 ],
809 )
810 .await
811 }
812
813 pub async fn nlst(&mut self, pathname: Option<&str>) -> Result<Vec<String>> {
817 let command = pathname.map_or("NLST\r\n".into(), |path| {
818 format!("NLST {}\r\n", path).into()
819 });
820
821 self.list_command(
822 command,
823 ftp_reply::ABOUT_TO_SEND,
824 &[
825 ftp_reply::CLOSING_DATA_CONNECTION,
826 ftp_reply::REQUESTED_FILE_ACTION_OK,
827 ],
828 )
829 .await
830 }
831
832 pub async fn mdtm(&mut self, pathname: &str) -> Result<Option<DateTime<Utc>>> {
835 self.send_command(Command::MDTM, Some(pathname)).await?;
836 self.check_response(ftp_reply::FILE)?;
837 let reply_str = self._reply_string.clone().unwrap();
838 let reply_str = reply_str.as_str();
839 match MDTM_RE.captures(reply_str) {
840 Some(caps) => {
841 let (year, month, day) = (
842 caps[1].parse::<i32>().unwrap(),
843 caps[2].parse::<u32>().unwrap(),
844 caps[3].parse::<u32>().unwrap(),
845 );
846 let (hour, minute, second) = (
847 caps[4].parse::<u32>().unwrap(),
848 caps[5].parse::<u32>().unwrap(),
849 caps[6].parse::<u32>().unwrap(),
850 );
851 Ok(Some(
852 Utc.ymd(year, month, day).and_hms(hour, minute, second),
853 ))
854 }
855 None => Ok(None),
856 }
857 }
858
859 pub async fn size(&mut self, pathname: &str) -> Result<Option<usize>> {
862 self.send_command(Command::SIZE, Some(pathname)).await?;
863 self.check_response(ftp_reply::FILE)?;
864 let reply_str = self._reply_string.clone().unwrap();
865 let reply_str = reply_str.as_str();
866 match SIZE_RE.captures(reply_str) {
867 Some(caps) => Ok(Some(caps[1].parse().unwrap())),
868 None => Ok(None),
869 }
870 }
871
872 pub async fn feat(&mut self) -> Result<u32> {
873 self._reply_lines.clear();
874 Ok(self.send_command(Command::FEAT, None).await?)
875 }
876
877 pub async fn features(&mut self, cmd: Command) -> Result<Option<Vec<String>>> {
878 let features = Vec::new();
879 if self.init_feature_map().await? {
880 let values = self.features_map.get(cmd.cmd_name());
881 if values.is_some() {
882 return Ok(Some(values.unwrap().clone()));
883 }
884 }
885 Ok(Some(features))
886 }
887
888 async fn init_feature_map(&mut self) -> Result<bool> {
889 if self.features_map.is_empty() {
890 let reply_code = self.feat().await?;
891 if reply_code == ftp_reply::NOT_LOGGED_IN.into() {
892 return Ok(false);
893 }
894 let success = ftp_reply::is_positive_completion(reply_code);
895 if !success {
896 return Ok(false);
897 }
898 for l in self._reply_lines.iter() {
899 if l.starts_with(" ") {
900 let mut key = "";
901 let mut value = "";
902 let s = &l[1..l.len() - 1];
903 let varsep = s.find(' ');
904 if varsep.is_some() {
905 key = &l[1..varsep.unwrap() + 1];
906 value = &l[varsep.unwrap() + 1..l.len()];
907 } else {
908 key = &l[1..l.len() - 1]
909 }
910 let entries = self.features_map.get_mut(key);
911 match entries {
912 None => {
913 let mut features = vec![];
914 features.push(String::from(value));
915 self.features_map.insert(key.to_string(), features);
916 }
917 Some(features) => {
918 features.push(value.to_string());
919 }
920 }
921 }
922 }
923 }
924 Ok(true)
925 }
926
927 async fn write_str<S: AsRef<str>>(&mut self, command: S) -> Result<()> {
928 let conn = self.stream.get_mut();
929 conn.write_all(command.as_ref().as_bytes())
930 .await
931 .map_err(FtpError::ConnectionError)
932 }
933
934 pub fn check_response(&mut self, expected_code: u32) -> Result<()> {
935 self.check_response_in(&[expected_code])
936 }
937
938 pub fn check_response_in(&mut self, expected_code: &[u32]) -> Result<()> {
940 let reply_string = self._reply_string.clone();
941 if expected_code.iter().any(|ec| self._reply_code == *ec) {
942 Ok(())
943 } else {
944 Err(FtpError::InvalidResponse(format!(
945 "Expected code {:?}, got response: {}",
946 expected_code,
947 reply_string.unwrap().as_str()
948 )))
949 }
950 }
951}
952
#[cfg(test)]
mod tests {
    use tokio_stream::once;
    use tokio_util::io::StreamReader;

    use super::FtpClient;

    /// Feeds `raw` to `get_lines_from_stream` as a single chunk.
    async fn collect_lines(raw: &'static [u8]) -> Vec<String> {
        let reader = StreamReader::new(once(Ok::<_, std::io::Error>(raw)));
        FtpClient::get_lines_from_stream(reader).await.unwrap()
    }

    /// Lines expected from both fixtures once empty lines are dropped.
    fn expected_lines() -> Vec<String> {
        vec![
            "Hello".to_string(),
            "World".to_string(),
            "Be".to_string(),
            "Happy".to_string(),
        ]
    }

    #[tokio::test]
    async fn list_command_dos_newlines() {
        let lines = collect_lines(b"Hello\r\nWorld\r\n\r\nBe\r\nHappy\r\n").await;
        assert_eq!(lines, expected_lines());
    }

    #[tokio::test]
    async fn list_command_unix_newlines() {
        let lines = collect_lines(b"Hello\nWorld\n\nBe\nHappy\n").await;
        assert_eq!(lines, expected_lines());
    }
}