commons_net/ftp/
ftp_client.rs

1//! FTP module.
2use std::borrow::{Borrow, Cow};
3use std::collections::{HashMap, HashSet};
4use std::fmt::format;
5use std::net::{IpAddr, SocketAddr};
6use std::string::String;
7
8use chrono::offset::TimeZone;
9use chrono::{DateTime, Utc};
10use regex::Regex;
11use tokio::io::{
12    copy, AsyncBufRead, AsyncBufReadExt, AsyncRead, AsyncReadExt, AsyncWriteExt, BufStream,
13};
14use tokio::net::{TcpStream, ToSocketAddrs};
15#[cfg(feature = "ftps")]
16use tokio_rustls::{client::TlsStream, rustls::ClientConfig, rustls::ServerName, TlsConnector};
17
18use crate::ftp::connection::Connection;
19use crate::ftp::{Command, ftp_reply, MODES, REPLY_CODE_LEN};
20use crate::ftp::types::{FileType, FtpError, Result};
21use crate::StringExt;
22
lazy_static::lazy_static! {
    // This regex extracts IP and Port details from PASV command response.
    // The regex looks for the pattern (h1,h2,h3,h4,p1,p2).
    // Each group is `\d+`, so a matched group may still exceed 255.
    static ref PORT_RE: Regex = Regex::new(r"\((\d+),(\d+),(\d+),(\d+),(\d+),(\d+)\)").unwrap();

    // This regex extracts modification time from MDTM command response.
    // It matches a standalone 14-digit run split into groups of 4,2,2,2,2,2 digits
    // (presumably year, month, day, hour, minute, second — confirm against the MDTM caller).
    static ref MDTM_RE: Regex = Regex::new(r"\b(\d{4})(\d{2})(\d{2})(\d{2})(\d{2})(\d{2})\b").unwrap();

    // This regex extracts File size from SIZE command response:
    // the last whitespace-preceded run of digits at the end of the text.
    static ref SIZE_RE: Regex = Regex::new(r"\s+(\d+)\s*$").unwrap();
    // Argument values that `exec_prot` accepts for the PROT command.
    static ref PROT_COMMAND_VALUE: Vec<&'static str> = vec!["C","E","S","P"];
}
35
/// Client state for one FTP control connection.
pub struct FtpClient {
    /// Buffered control connection (plain TCP or TLS, see `Connection`).
    stream: BufStream<Connection>,
    /// Text of the first server reply, stored by `connect`.
    welcome_msg: Option<String>,
    /// Numeric code of the most recent reply, set by `read_reply`.
    _reply_code: u32,
    /// All lines of the most recent reply concatenated, set by `read_reply`.
    _reply_string: Option<String>,
    /// Individual lines of the most recent reply, in arrival order.
    _reply_lines: Vec<String>,
    /// TLS configuration and server name stored by `into_secure`; when set,
    /// `data_command` also wraps data connections in TLS.
    #[cfg(feature = "ftps")]
    ssl_cfg: Option<(ClientConfig, ServerName)>,
    /// Starts empty; not populated by any code visible in this part of the file.
    features_map: HashMap<String, Vec<String>>,
}
46
47impl FtpClient {
48    fn new(stream: TcpStream) -> Self {
49        FtpClient {
50            stream: BufStream::new(Connection::Tcp(stream)),
51            #[cfg(feature = "ftps")]
52            ssl_cfg: None,
53            welcome_msg: None,
54            _reply_code: 0,
55            _reply_string: None,
56            features_map: HashMap::new(),
57            _reply_lines: vec![],
58        }
59    }
60
61    #[cfg(feature = "ftps")]
62    fn new_tls_client(stream: TlsStream<TcpStream>) -> Self {
63        FtpClient {
64            stream: BufStream::new(Connection::Ssl(stream)),
65            ssl_cfg: None,
66            welcome_msg: None,
67            _reply_code: 0,
68            _reply_string: None,
69            features_map: HashMap::new(),
70            _reply_lines: vec![],
71        }
72    }
73
    /// Currently a no-op: the body is empty and no state is changed.
    pub fn init_default(&mut self) {}
75
76    /// Creates an FTP Client.
77    pub async fn connect<A: ToSocketAddrs>(addr: A) -> Result<FtpClient> {
78        let stream = TcpStream::connect(addr)
79            .await
80            .map_err(FtpError::ConnectionError)?;
81
82        let mut ftp_client = FtpClient::new(stream);
83        ftp_client.read_reply().await?;
84        ftp_client.check_response(ftp_reply::READY)?;
85        ftp_client.welcome_msg = Some(ftp_client._reply_string.clone().unwrap());
86        Ok(ftp_client)
87    }
88
89    /// Switch to a secure mode if possible, using a provided SSL configuration.
90    /// This method does nothing if the connect is already secured.
91    ///
92    /// ## Panics
93    ///
94    /// Panics if the plain TCP connection cannot be switched to TLS mode.
95    ///
96    /// ## Example
97    ///
98    /// ```rust,no_run
99    /// use std::convert::TryFrom;
100    /// use std::path::Path;
101    /// use tokio_rustls::rustls::{ClientConfig, RootCertStore, ServerName};
102    /// use commons_net::ftp::ftp_client::FtpClient;
103    ///
104    /// let mut root_store = RootCertStore::empty();
105    /// // root_store.add_pem_file(...);
106    /// let conf = ClientConfig::builder().with_safe_defaults().with_root_certificates(root_store).with_no_client_auth();
107    /// let domain = ServerName::try_from("www.cert-domain.com").expect("invalid DNS name");
108    /// async {
109    ///   let mut ftp_client = FtpClient::connect("192.168.32.204:21").await.unwrap();
110    ///   let mut ftp_client = ftp_client.into_secure(conf, domain).await.unwrap();
111    /// };
112    /// ```
113    #[cfg(feature = "ftps")]
114    pub async fn into_secure(
115        mut self,
116        config: ClientConfig,
117        domain: ServerName,
118    ) -> Result<FtpClient> {
119        self.exe_auth_tls().await?;
120
121        let connector: TlsConnector = std::sync::Arc::new(config.clone()).into();
122        let stream = connector
123            .connect(domain.clone(), self.stream.into_inner().into_tcp_stream())
124            .await
125            .map_err(|e| FtpError::SecureError(format!("{}", e)))?;
126
127        let mut ftps_client = FtpClient::new_tls_client(stream);
128        ftps_client.ssl_cfg = Some((config, domain));
129
130        // Set protection buffer size
131        ftps_client.send_command(Command::PBSZ, Some("0")).await?;
132        ftps_client.check_response(ftp_reply::COMMAND_OK)?;
133
134        ftps_client.send_command(Command::PROT, Some("P")).await?;
135        ftps_client.check_response(ftp_reply::COMMAND_OK)?;
136        Ok(ftps_client)
137    }
138
139    #[cfg(feature = "ftps")]
140    async fn exe_auth_tls(mut self) -> Result<()> {
141        self.send_command(Command::AUTH, Some("TLS")).await?;
142        self.check_response_in(&[ftp_reply::AUTH_OK, ftp_reply::SECURITY_MECHANISM_IS_OK])
143    }
144
145    #[cfg(feature = "ftps")]
146    /// Send the AUTH command with the specified mechanism.
147    pub async fn exe_auth(mut self, mechanism: &str) -> Result<u32> {
148        Ok(self.send_command(Command::AUTH, Some(mechanism)).await?)
149    }
150
151    #[cfg(feature = "ftps")]
152    /// PBSZ command. pbsz value: 0 to (2^32)-1 decimal integer.
153    /// * pbsz: Protection Buffer Size
154    pub async fn exec_pbsz(mut self, pbsz: u64) -> Result<()> {
155        if pbsz < 0 || 4294967295 < pbsz {
156            Err(FtpError::InvalidArgument(format!(
157                "Invalid pbsz value. The correct value should be: {} to {}",
158                0, "(2^32)-1"
159            )))
160        }
161        self.send_command(Command::PBSZ, Some(pbsz.to_string().as_str()))
162            .await?;
163        self.check_response_in(&[ftp_reply::COMMAND_OK])?;
164        Ok(())
165    }
166
167    #[cfg(feature = "ftps")]
168    /// Send the ADAT command with the specified authentication data.
169    pub async fn exec_adat(mut self, data: Option<&[u8]>) -> Result<u32> {
170        let mut args = None;
171        if data.is_some() {
172            args = Some(base64::encode(data).as_str());
173        }
174        Ok(self.send_command(Command::ADAT, args).await?)
175    }
176
177    #[cfg(feature = "ftps")]
178    /// Send the CONF command with the specified authentication data.
179    pub async fn exec_conf(mut self, data: Option<&[u8]>) -> Result<u32> {
180        let mut args = None;
181        if data.is_some() {
182            args = Some(base64::encode(data).as_str());
183        }
184        Ok(self.send_command(Command::CONF, args).await?)
185    }
186
187    #[cfg(feature = "ftps")]
188    /// Send the ENC command with the specified authentication data.
189    pub async fn exec_enc(mut self, data: Option<&[u8]>) -> Result<u32> {
190        let mut args = None;
191        if data.is_some() {
192            args = Some(base64::encode(data).as_str());
193        }
194        Ok(self.send_command(Command::ENC, args).await?)
195    }
196
197    #[cfg(feature = "ftps")]
198    /// Send the MIC command with the specified authentication data.
199    pub async fn exec_mic(mut self, data: Option<&[u8]>) -> Result<u32> {
200        let mut args = None;
201        if data.is_some() {
202            args = Some(base64::encode(data).as_str());
203        }
204        Ok(self.send_command(Command::MIC, args).await?)
205    }
206
207    #[cfg(feature = "ftps")]
208    /// Send the MIC command with the specified authentication data.
209    pub async fn exec_prot(mut self, prot: &mut str) -> Result<()> {
210        let mut p = prot.as_mut();
211        if p.is_empty() {
212            p = &mut "C";
213        }
214        if !PROT_COMMAND_VALUE.contains(&&*p) {
215            Err(FtpError::InvalidArgument(format!(
216                "Unsupported prot command value",
217            )))
218        }
219        self.send_command(Command::PROT, Some(p)).await?;
220        self.check_response_in(&[ftp_reply::COMMAND_OK])?;
221        Ok(())
222    }
223
224    /// Switch to insecure mode. If the connection is already
225    /// insecure does nothing.
226    ///
227    /// ## Example
228    ///
229    /// ```rust,no_run
230    /// use std::convert::TryFrom;
231    /// use std::path::Path;
232    /// use tokio_rustls::rustls::{ClientConfig, RootCertStore, ServerName};
233    /// use commons_net::ftp::ftp_client::FtpClient;
234    ///
235    /// let mut root_store = RootCertStore::empty();
236    /// // root_store.add_pem_file(...);
237    /// let conf = ClientConfig::builder().with_safe_defaults().with_root_certificates(root_store).with_no_client_auth();
238    /// let domain = ServerName::try_from("www.cert-domain.com").expect("invalid DNS name");
239    /// async {
240    ///   let mut ftp_client = FtpClient::connect("192.168.32.204:21").await.unwrap();
241    ///   let mut ftp_client = ftp_client.into_secure(conf, domain).await.unwrap();
242    ///   // Switch back to the insecure mode
243    ///   let mut ftp_client = ftp_client.into_insecure().await.unwrap();
244    ///   // Do all public things
245    ///   let _ = ftp_client.quit();
246    /// };
247    /// ```
248    #[cfg(feature = "ftps")]
249    pub async fn into_insecure(mut self) -> Result<FtpClient> {
250        self.send_command(Command::CCC, None).await?;
251        if self._reply_code == ftp_reply::COMMAND_OK {
252            Ok(FtpClient::new(self.stream.into_inner().into_tcp_stream()))
253        }
254        Err(FtpError::InvalidResponse(format!(
255            "Expected code {:?}, got response: {}",
256            ftp_reply::COMMAND_OK,
257            self._reply_string.unwrap()
258        )))
259    }
260
261    /// Execute command which send data back in a separate stream
262    async fn data_command(&mut self, cmd: &str) -> Result<Connection> {
263        let addr = self.pasv().await?;
264        let stream = TcpStream::connect(addr)
265            .await
266            .map_err(FtpError::ConnectionError)?;
267
268        #[cfg(feature = "ftps")]
269        match &self.ssl_cfg {
270            Some((config, domain)) => {
271                let connector: TlsConnector = std::sync::Arc::new(config.clone()).into();
272                return connector
273                    .connect(domain.to_owned(), stream)
274                    .await
275                    .map(|stream| Connection::Ssl(stream))
276                    .map_err(|e| FtpError::SecureError(format!("{}", e)));
277            }
278            _ => {}
279        };
280        self.write_str(cmd).await?;
281        self.read_reply().await?;
282        Ok(Connection::Tcp(stream))
283    }
284
285    /// Returns a reference to the underlying TcpStream.
286    ///
287    /// Example:
288    /// ```no_run
289    /// use tokio::net::TcpStream;
290    /// use std::time::Duration;
291    /// use commons_net::ftp::ftp_client::FtpClient;
292    ///
293    /// async {
294    ///   let client = FtpClient::connect("192.168.32.204:21").await
295    ///                          .expect("Couldn't connect to the server...");
296    ///   let s: &TcpStream = client.get_ref();
297    /// };
298    /// ```
299    pub fn get_ref(&self) -> &TcpStream {
300        self.stream.get_ref().get_ref()
301    }
302
303    /// Get welcome message from the server on connect.
304    pub fn get_welcome_msg(&self) -> Option<&str> {
305        self.welcome_msg.as_deref()
306    }
307
308    /// Log in to the FTP server.
309    pub async fn login(&mut self, user: &str, password: &str) -> Result<bool> {
310        self.send_command(Command::USER, Some(user)).await?;
311        if ftp_reply::is_positive_completion(self._reply_code) {
312            return Ok(true);
313        } else if !ftp_reply::is_positive_intermediate(self._reply_code) {
314            return Ok(false);
315        }
316        self.send_command(Command::PASS, Some(password)).await?;
317        Ok(ftp_reply::is_positive_completion(self._reply_code))
318    }
319
320    /// Change the current Directory to the path specified.
321    pub async fn cwd(&mut self, path: &str) -> Result<bool> {
322        self.send_command(Command::CWD, Some(path)).await?;
323        Ok(ftp_reply::is_positive_completion(self._reply_code))
324    }
325
326    /// Move the current Directory to the parent Directory.
327    pub async fn cdup(&mut self) -> Result<bool> {
328        self.send_command(Command::CDUP, None).await?;
329        Ok(ftp_reply::is_positive_completion(self._reply_code))
330    }
331
332    /// Gets the current Directory
333    pub async fn pwd(&mut self) -> Result<String> {
334        self.send_command(Command::PWD, None).await?;
335        match &self._reply_string {
336            None => {
337                let cause = format!("Cannot get PWD Response from FTP server");
338                Err(FtpError::InvalidResponse(cause))
339            }
340            Some(content) => match (content.find('"'), content.rfind('"')) {
341                (Some(begin), Some(end)) if begin < end => Ok(content[begin + 1..end].to_string()),
342                _ => {
343                    let cause = format!("Invalid PWD Response: {}", content);
344                    Err(FtpError::InvalidResponse(cause))
345                }
346            },
347        }
348    }
349
350    /// This does nothing. This is usually just used to keep the connection open.
351    pub async fn noop(&mut self) -> Result<bool> {
352        self.send_command(Command::NOOP, None).await?;
353        Ok(ftp_reply::is_positive_completion(self._reply_code))
354    }
355
356    /// This creates a new Directory on the server.
357    pub async fn make_directory(&mut self, pathname: &str) -> Result<bool> {
358        match ftp_reply::is_positive_completion(self.mkd(pathname).await?) {
359            true => Ok(true),
360            false => {
361                return Err(FtpError::InvalidResponse(format!(
362                    "Got error reply: {}",
363                    self._reply_string.as_ref().unwrap()
364                )));
365            }
366        }
367    }
368
369    /// A convenience method to send the FTP MKD command to the server, receive the reply, and return the reply code.
370    pub async fn mkd(&mut self, pathname: &str) -> Result<u32> {
371        Ok(self.send_command(Command::MKD, Some(pathname)).await?)
372    }
373
374    /// A convenience method to send the FTP ACCT command to the server, receive the reply, and return the reply code.
375    pub async fn acct(&mut self, account: &str) -> Result<u32> {
376        Ok(self.send_command(Command::ACCT, Some(account)).await?)
377    }
378
379    /// A convenience method to send the FTP ABOR command to the server, receive the reply, and return the reply code.
380    pub async fn abor(&mut self) -> Result<u32> {
381        Ok(self.send_command(Command::ABOR, None).await?)
382    }
383
384    /// A convenience method to send the FTP REIN command to the server, receive the reply, and return the reply code.
385    pub async fn rein(&mut self) -> Result<u32> {
386        Ok(self.send_command(Command::REIN, None).await?)
387    }
388
389    /// A convenience method to send the FTP SMNT command to the server, receive the reply, and return the reply code.
390    pub async fn smnt(&mut self, dir: &str) -> Result<u32> {
391        Ok(self.send_command(Command::SMNT, Some(dir)).await?)
392    }
393
394    /// A convenience method to send the FTP EPSV command to the server, receive the reply, and return the reply code.
395    pub async fn epsv(&mut self) -> Result<u32> {
396        Ok(self.send_command(Command::EPSV, None).await?)
397    }
398
399    /// A convenience method to send the FTP TYPE command to the server, receive the reply, and return the reply code.
400    pub async fn type_cmd(&mut self, file_type: u32) -> Result<u32> {
401        let s = MODES.substring(file_type as usize, (file_type + 1) as usize);
402        Ok(self.send_command(Command::TYPE, Some(s)).await?)
403    }
404
405    /// A convenience method to send the FTP STRU command to the server, receive the reply, and return the reply code.
406    pub async fn stru(&mut self, structure: u32) -> Result<u32> {
407        let s = MODES.substring(structure as usize, (structure + 1) as usize);
408        Ok(self.send_command(Command::STRU, Some(s)).await?)
409    }
410
411    /// A convenience method to send the FTP MODE command to the server, receive the reply, and return the reply code.
412    pub async fn mode(&mut self, mode: u32) -> Result<u32> {
413        let s = MODES.substring(mode as usize, (mode + 1) as usize);
414        Ok(self.send_command(Command::MODE, Some(s)).await?)
415    }
416
417    /// A convenience method to send the FTP STOU command to the server, receive the reply, and return the reply code.
418    pub async fn stou(&mut self) -> Result<u32> {
419        Ok(self.send_command(Command::STOU, None).await?)
420    }
421
422    /// A convenience method to send the FTP STOU command to the server, receive the reply, and return the reply code.
423    pub async fn stou_pathname(&mut self, pathname: &str) -> Result<u32> {
424        Ok(self.send_command(Command::STOU, Some(pathname)).await?)
425    }
426
427    /// A convenience method to send the FTP APPE command to the server, receive the reply, and return the reply code.
428    pub async fn appe(&mut self, pathname: &str) -> Result<u32> {
429        Ok(self.send_command(Command::APPE, Some(pathname)).await?)
430    }
431
432    /// A convenience method to send the FTP ALLO command to the server, receive the reply, and return the reply code.
433    pub async fn allo(&mut self, bytes: u32) -> Result<u32> {
434        Ok(self
435            .send_command(Command::ALLO, Some(bytes.to_string().as_str()))
436            .await?)
437    }
438
439    /// A convenience method to send the FTP ALLO command to the server, receive the reply, and return the reply code.
440    pub async fn allo_record_size(&mut self, bytes: u32, record_size: u32) -> Result<u32> {
441        let args = format!(
442            "{} R {}",
443            bytes.to_string().as_str(),
444            record_size.to_string().as_str()
445        );
446        Ok(self
447            .send_command(Command::ALLO, Some(args.as_str()))
448            .await?)
449    }
450
451    /// A convenience method to send the FTP PORT command to the server, receive the reply, and return the reply code.
452    pub async fn port(&mut self, host: IpAddr, port: u16) -> Result<u32> {
453        let mut args = String::with_capacity(24);
454        args.push_str(host.to_string().replace('.', ",").as_str());
455        args.push_str(",");
456        args.push_str((port >> 8).to_string().as_str());
457        args.push_str(",");
458        args.push_str((port & 0xff).to_string().as_str());
459        Ok(self
460            .send_command(Command::PORT, Some(args.as_str()))
461            .await?)
462    }
463
464    /// A convenience method to send the FTP EPRT command to the server, receive the reply, and return the reply code.
465    /// * EPRT |1|132.235.1.2|6275|
466    /// * EPRT |2|1080::8:800:200C:417A|5282|
467    pub async fn eprt(&mut self, host: IpAddr, port: u16) -> Result<u32> {
468        let mut args = String::new();
469        let mut h = host.to_string();
470        let n = h.find("%").unwrap_or(0);
471        if n > 0 {
472            h = h.substring(0, n).to_string();
473        }
474        args.push_str("|");
475        match host {
476            IpAddr::V4(addr) => args.push_str("1"),
477            IpAddr::V6(addr) => args.push_str("2"),
478        }
479        args.push_str("|");
480        args.push_str(h.as_str());
481        args.push_str("|");
482        args.push_str(port.to_string().as_str());
483        args.push_str("|");
484        Ok(self
485            .send_command(Command::EPRT, Some(args.as_str()))
486            .await?)
487    }
488
489    /// A convenience method to send the FTP MFMT command to the server, receive the reply, and return the reply code.
490    pub async fn mfmt(&mut self, pathname: &str, timeval: &str) -> Result<u32> {
491        Ok(self
492            .send_command(
493                Command::MFMT,
494                Some(format!("{} {}", timeval, pathname).as_str()),
495            )
496            .await?)
497    }
498
499    /// Runs the PASV command.
500    async fn pasv(&mut self) -> Result<SocketAddr> {
501        self.send_command(Command::PASV, None).await?;
502        self.check_response(ftp_reply::PASSIVE_MODE)?;
503        let reply_str = self._reply_string.clone().unwrap();
504        let reply_str = reply_str.as_str();
505        PORT_RE
506            .captures(reply_str)
507            .ok_or_else(|| {
508                FtpError::InvalidResponse(format!("Invalid PASV response: {}", reply_str))
509            })
510            .and_then(|caps| {
511                // If the regex matches we can be sure groups contains numbers
512                let (oct1, oct2, oct3, oct4) = (
513                    caps[1].parse::<u8>().unwrap(),
514                    caps[2].parse::<u8>().unwrap(),
515                    caps[3].parse::<u8>().unwrap(),
516                    caps[4].parse::<u8>().unwrap(),
517                );
518                let (msb, lsb) = (
519                    caps[5].parse::<u8>().unwrap(),
520                    caps[6].parse::<u8>().unwrap(),
521                );
522                let port = ((msb as u16) << 8) + lsb as u16;
523
524                use std::net::{IpAddr, Ipv4Addr};
525
526                let ip = if (oct1, oct2, oct3, oct4) == (0, 0, 0, 0) {
527                    self.get_ref()
528                        .peer_addr()
529                        .map_err(FtpError::ConnectionError)?
530                        .ip()
531                } else {
532                    IpAddr::V4(Ipv4Addr::new(oct1, oct2, oct3, oct4))
533                };
534                Ok(SocketAddr::new(ip, port))
535            })
536    }
537
538    /// Sets the type of File to be transferred. That is the implementation
539    /// of `TYPE` command.
540    pub async fn transfer_type(&mut self, file_type: FileType) -> Result<bool> {
541        // let type_command = format!("TYPE {}\r\n", file_type.to_string());
542        // self.write_str(&type_command).await?;
543        self.send_command(Command::TYPE, Some(file_type.to_string().as_str()))
544            .await?;
545        Ok(ftp_reply::is_positive_completion(self._reply_code))
546    }
547
548    /// Quits the current FTP session.
549    pub async fn logout(&mut self) -> Result<bool> {
550        self.send_command(Command::QUIT, None).await?;
551        Ok(ftp_reply::is_positive_completion(self._reply_code))
552    }
553
554    /// Sets the byte from which the transfer is to be restarted.
555    pub async fn restart_from(&mut self, offset: u64) -> Result<bool> {
556        self.send_command(Command::REST, Some(offset.to_string().as_str()))
557            .await?;
558        Ok(ftp_reply::is_positive_intermediate(self._reply_code))
559    }
560
561    /// Retrieves the File name specified from the server.
562    /// This method is a more complicated way to retrieve a File.
563    /// The reader returned should be dropped.
564    /// Also you will have to read the response to make sure it has the correct value.
565    pub async fn get(&mut self, file_name: &str) -> Result<BufStream<Connection>> {
566        let retr_command = format!("RETR {}\r\n", file_name);
567        let data_stream = BufStream::new(self.data_command(&retr_command).await?);
568        self.check_response_in(&[ftp_reply::ABOUT_TO_SEND, ftp_reply::ALREADY_OPEN])?;
569        Ok(data_stream)
570    }
571
572    /// Renames the File from_name to to_name
573    pub async fn rename(&mut self, from_name: &str, to_name: &str) -> Result<bool> {
574        self.send_command(Command::RNFR, Some(from_name)).await?;
575        if !ftp_reply::is_positive_intermediate(self._reply_code) {
576            return Ok(false);
577        }
578        self.send_command(Command::RNTO, Some(to_name)).await?;
579        Ok(ftp_reply::is_positive_completion(self._reply_code))
580    }
581
582    /// The implementation of `RETR` command where `filename` is the name of the File
583    /// to download from FTP and `reader` is the function which operates with the
584    /// data stream opened.
585    ///
586    /// ```
587    /// use tokio::io::{AsyncReadExt, BufStream};
588    /// use std::io::Cursor;
589    /// use commons_net::ftp::connection::Connection;
590    /// use commons_net::ftp::ftp_client::FtpClient;
591    /// use commons_net::ftp::types::FtpError;
592    /// async {
593    ///   let mut conn = FtpClient::connect("192.168.32.204:21").await.unwrap();
594    ///   conn.login("Doe", "mumble").await.unwrap();
595    ///   let mut reader = Cursor::new("hello, world!".as_bytes());
596    ///   conn.put("retr.txt", &mut reader).await.unwrap();
597    ///
598    ///   async fn lambda(mut reader: BufStream<Connection>) -> Result<Vec<u8>, FtpError> {
599    ///     let mut buffer = Vec::new();
600    ///     reader
601    ///         .read_to_end(&mut buffer)
602    ///         .await
603    ///         .map_err(FtpError::ConnectionError)?;
604    ///     assert_eq!(buffer, "hello, world!".as_bytes());
605    ///     Ok(buffer)
606    ///   };
607    ///
608    ///   assert!(conn.retr("retr.txt", lambda).await.is_ok());
609    ///   assert!(conn.rm("retr.txt").await.is_ok());
610    /// };
611    /// ```
612    pub async fn retr<F, T, P, E>(&mut self, filename: &str, reader: F) -> std::result::Result<T, E>
613    where
614        F: Fn(BufStream<Connection>) -> P,
615        P: std::future::Future<Output = std::result::Result<T, E>>,
616        E: From<FtpError>,
617    {
618        let retr_command = format!("{} {}\r\n", Command::RETR.cmd_name(), filename);
619        let data_stream = BufStream::new(self.data_command(&retr_command).await?);
620        self.check_response_in(&[ftp_reply::ABOUT_TO_SEND, ftp_reply::ALREADY_OPEN])?;
621        let res = reader(data_stream).await?;
622        Ok(res)
623    }
624
625    /// Simple way to retr a File from the server. This stores the File in memory.
626    ///
627    /// ```
628    /// use std::io::Cursor;
629    /// use commons_net::ftp::ftp_client::FtpClient;
630    /// use commons_net::ftp::types::FtpError;
631    /// async {
632    ///     let mut conn = FtpClient::connect("192.168.32.204:21").await?;
633    ///     conn.login("Doe", "mumble").await?;
634    ///     let mut reader = Cursor::new("hello, world!".as_bytes());
635    ///     conn.put("simple_retr.txt", &mut reader).await?;
636    ///
637    ///     let cursor = conn.simple_retr("simple_retr.txt").await?;
638    ///
639    ///     assert_eq!(cursor.into_inner(), "hello, world!".as_bytes());
640    ///     assert!(conn.rm("simple_retr.txt").await.is_ok());
641    ///
642    ///     Ok::<(), FtpError>(())
643    /// };
644    /// ```
645    pub async fn simple_retr(&mut self, file_name: &str) -> Result<std::io::Cursor<Vec<u8>>> {
646        async fn do_read(mut reader: BufStream<Connection>) -> Result<Vec<u8>> {
647            let mut buffer = Vec::new();
648            reader
649                .read_to_end(&mut buffer)
650                .await
651                .map_err(FtpError::ConnectionError)?;
652
653            Ok(buffer)
654        }
655
656        let buffer = self.retr(file_name, do_read).await?;
657        Ok(std::io::Cursor::new(buffer))
658    }
659
660    pub async fn remove_directory(&mut self, pathname: &str) -> Result<bool> {
661        Ok(ftp_reply::is_positive_completion(self.rmd(pathname).await?))
662    }
663
664    /// Removes the remote pathname from the server.
665    pub async fn rmd(&mut self, pathname: &str) -> Result<u32> {
666        Ok(self.send_command(Command::RMD, Some(pathname)).await?)
667    }
668
669    pub async fn delete_file(&mut self, filename: &str) -> Result<bool> {
670        Ok(ftp_reply::is_positive_completion(
671            self.dele(filename).await?,
672        ))
673    }
674
675    /// Remove the remote File from the server.
676    pub async fn dele(&mut self, filename: &str) -> Result<u32> {
677        Ok(self.send_command(Command::DELE, Some(filename)).await?)
678    }
679
680    async fn put_file<R: AsyncRead + Unpin>(&mut self, filename: &str, r: &mut R) -> Result<()> {
681        let stor_command = format!("{} {}\r\n", Command::STOR, filename);
682        let mut data_stream = BufStream::new(self.data_command(&stor_command).await?);
683        self.check_response_in(&[ftp_reply::ALREADY_OPEN, ftp_reply::ABOUT_TO_SEND])?;
684        copy(r, &mut data_stream)
685            .await
686            .map_err(FtpError::ConnectionError)?;
687        Ok(())
688    }
689
690    /// Sends an FTP command to the server, waits for a reply and returns the numerical response code.
691    pub async fn send_command(&mut self, cmd: Command, agrs: Option<&str>) -> Result<u32> {
692        let mut ftp_cmd = format!("{}\r\n", cmd.cmd_name());
693        if agrs.is_some() {
694            ftp_cmd = format!("{} {}\r\n", cmd.cmd_name(), agrs.unwrap());
695        }
696        self.write_str(ftp_cmd).await?;
697        self.read_reply().await?;
698        Ok(self._reply_code)
699    }
700
701    async fn read_reply(&mut self) -> Result<()> {
702        self._reply_lines.clear();
703        self._reply_string = None;
704        let mut line = String::new();
705        self.stream
706            .read_line(&mut line)
707            .await
708            .map_err(FtpError::ConnectionError)?;
709
710        if line.len() < REPLY_CODE_LEN {
711            return Err(FtpError::InvalidResponse(format!(
712                "Truncated server reply: {}",
713                line
714            )));
715        }
716
717        if line.len() < 5 {
718            return Err(FtpError::InvalidResponse(
719                "error: could not read reply code".to_owned(),
720            ));
721        }
722
723        let reply_code: u32 = line[0..3].parse().map_err(|_err| {
724            FtpError::InvalidResponse(format!(
725                "Could not parse reply code. \n Server Reply: {}",
726                line
727            ))
728        })?;
729        self._reply_code = reply_code;
730        self._reply_lines.push(line.as_str().to_string());
731        let expected = format!("{} ", &line[0..3]);
732        while line.len() < 5 || line[0..4] != expected {
733            line.clear();
734            if let Err(e) = self.stream.read_line(&mut line).await {
735                return Err(FtpError::ConnectionError(e));
736            }
737            self._reply_lines.push(line.as_str().to_string());
738        }
739        let mut s = String::new();
740        for x in self._reply_lines.iter() {
741            s.push_str(x.as_str())
742        }
743        self._reply_string = Some(s);
744        Ok(())
745    }
746
747    /// This stores a File on the server.
748    pub async fn put<R: AsyncRead + Unpin>(&mut self, filename: &str, r: &mut R) -> Result<()> {
749        self.put_file(filename, r).await?;
750        self.check_response_in(&[
751            ftp_reply::CLOSING_DATA_CONNECTION,
752            ftp_reply::REQUESTED_FILE_ACTION_OK,
753        ])?;
754        Ok(())
755    }
756
757    /// Execute a command which returns list of strings in a separate stream
758    async fn list_command(
759        &mut self,
760        cmd: Cow<'_, str>,
761        open_code: u32,
762        close_code: &[u32],
763    ) -> Result<Vec<String>> {
764        let data_stream = BufStream::new(self.data_command(&cmd).await?);
765        self.check_response_in(&[open_code, ftp_reply::ALREADY_OPEN])?;
766        let lines = Self::get_lines_from_stream(data_stream).await?;
767        self.check_response_in(close_code)?;
768        Ok(lines)
769    }
770
771    /// Consume a stream and return a vector of lines
772    async fn get_lines_from_stream<R>(data_stream: R) -> Result<Vec<String>>
773    where
774        R: AsyncBufRead + Unpin,
775    {
776        let mut lines: Vec<String> = Vec::new();
777
778        let mut lines_stream = data_stream.lines();
779        loop {
780            let line = lines_stream
781                .next_line()
782                .await
783                .map_err(FtpError::ConnectionError)?;
784
785            match line {
786                Some(line) => {
787                    if line.is_empty() {
788                        continue;
789                    }
790                    lines.push(line);
791                }
792                None => break Ok(lines),
793            }
794        }
795    }
796
797    /// Execute `LIST` command which returns the detailed File listing in human readable format.
798    /// If `pathname` is omited then the list of files in the current Directory will be
799    /// returned otherwise it will the list of files on `pathname`.
800    pub async fn list(&mut self, pathname: Option<&str>) -> Result<Vec<String>> {
801        let command = pathname.map_or("LIST\r\n".into(), |path| {
802            format!("LIST {}\r\n", path).into()
803        });
804
805        self.list_command(
806            command,
807            ftp_reply::ABOUT_TO_SEND,
808            &[
809                ftp_reply::CLOSING_DATA_CONNECTION,
810                ftp_reply::REQUESTED_FILE_ACTION_OK,
811            ],
812        )
813        .await
814    }
815
816    /// Execute `NLST` command which returns the list of File names only.
817    /// If `pathname` is omited then the list of files in the current Directory will be
818    /// returned otherwise it will the list of files on `pathname`.
819    pub async fn nlst(&mut self, pathname: Option<&str>) -> Result<Vec<String>> {
820        let command = pathname.map_or("NLST\r\n".into(), |path| {
821            format!("NLST {}\r\n", path).into()
822        });
823
824        self.list_command(
825            command,
826            ftp_reply::ABOUT_TO_SEND,
827            &[
828                ftp_reply::CLOSING_DATA_CONNECTION,
829                ftp_reply::REQUESTED_FILE_ACTION_OK,
830            ],
831        )
832        .await
833    }
834
835    /// Retrieves the modification time of the File at `pathname` if it exists.
836    /// In case the File does not exist `None` is returned.
837    pub async fn mdtm(&mut self, pathname: &str) -> Result<Option<DateTime<Utc>>> {
838        self.send_command(Command::MDTM, Some(pathname)).await?;
839        self.check_response(ftp_reply::FILE)?;
840        let reply_str = self._reply_string.clone().unwrap();
841        let reply_str = reply_str.as_str();
842        match MDTM_RE.captures(reply_str) {
843            Some(caps) => {
844                let (year, month, day) = (
845                    caps[1].parse::<i32>().unwrap(),
846                    caps[2].parse::<u32>().unwrap(),
847                    caps[3].parse::<u32>().unwrap(),
848                );
849                let (hour, minute, second) = (
850                    caps[4].parse::<u32>().unwrap(),
851                    caps[5].parse::<u32>().unwrap(),
852                    caps[6].parse::<u32>().unwrap(),
853                );
854                Ok(Some(
855                    Utc.ymd(year, month, day).and_hms(hour, minute, second),
856                ))
857            }
858            None => Ok(None),
859        }
860    }
861
862    /// Retrieves the size of the File in bytes at `pathname` if it exists.
863    /// In case the File does not exist `None` is returned.
864    pub async fn size(&mut self, pathname: &str) -> Result<Option<usize>> {
865        self.send_command(Command::SIZE, Some(pathname)).await?;
866        self.check_response(ftp_reply::FILE)?;
867        let reply_str = self._reply_string.clone().unwrap();
868        let reply_str = reply_str.as_str();
869        match SIZE_RE.captures(reply_str) {
870            Some(caps) => Ok(Some(caps[1].parse().unwrap())),
871            None => Ok(None),
872        }
873    }
874
875    pub async fn feat(&mut self) -> Result<u32> {
876        self._reply_lines.clear();
877        Ok(self.send_command(Command::FEAT, None).await?)
878    }
879
880    pub async fn features(&mut self, cmd: Command) -> Result<Option<Vec<String>>> {
881        let features = Vec::new();
882        if self.init_feature_map().await? {
883            let values = self.features_map.get(cmd.cmd_name());
884            if values.is_some() {
885                return Ok(Some(values.unwrap().clone()));
886            }
887        }
888        Ok(Some(features))
889    }
890
891    async fn init_feature_map(&mut self) -> Result<bool> {
892        if self.features_map.is_empty() {
893            let reply_code = self.feat().await?;
894            if reply_code == ftp_reply::NOT_LOGGED_IN.into() {
895                return Ok(false);
896            }
897            let success = ftp_reply::is_positive_completion(reply_code);
898            if !success {
899                return Ok(false);
900            }
901            for l in self._reply_lines.iter() {
902                if l.starts_with(" ") {
903                    let mut key = "";
904                    let mut value = "";
905                    let s = &l[1..l.len() - 1];
906                    let varsep = s.find(' ');
907                    if varsep.is_some() {
908                        key = &l[1..varsep.unwrap() + 1];
909                        value = &l[varsep.unwrap() + 1..l.len()];
910                    } else {
911                        key = &l[1..l.len() - 1]
912                    }
913                    let entries = self.features_map.get_mut(key);
914                    match entries {
915                        None => {
916                            let mut features = vec![];
917                            features.push(String::from(value));
918                            self.features_map.insert(key.to_string(), features);
919                        }
920                        Some(features) => {
921                            features.push(value.to_string());
922                        }
923                    }
924                }
925            }
926        }
927        Ok(true)
928    }
929
930    async fn write_str<S: AsRef<str>>(&mut self, command: S) -> Result<()> {
931        let conn = self.stream.get_mut();
932        conn.write_all(command.as_ref().as_bytes())
933            .await
934            .map_err(FtpError::ConnectionError)
935    }
936
937    pub fn check_response(&mut self, expected_code: u32) -> Result<()> {
938        self.check_response_in(&[expected_code])
939    }
940
941    /// Retrieve single line response
942    pub fn check_response_in(&mut self, expected_code: &[u32]) -> Result<()> {
943        let reply_string = self._reply_string.clone();
944        if expected_code.iter().any(|ec| self._reply_code == *ec) {
945            Ok(())
946        } else {
947            Err(FtpError::InvalidResponse(format!(
948                "Expected code {:?}, got response: {}",
949                expected_code,
950                reply_string.unwrap().as_str()
951            )))
952        }
953    }
954}
955
#[cfg(test)]
mod tests {
    use tokio_stream::once;
    use tokio_util::io::StreamReader;

    use super::FtpClient;

    /// Feeds `raw` through `FtpClient::get_lines_from_stream` as a single-chunk stream
    /// and returns the resulting lines.
    async fn collect_lines(raw: &'static [u8]) -> Vec<String> {
        let data_stream = StreamReader::new(once(Ok::<_, std::io::Error>(raw)));
        FtpClient::get_lines_from_stream(data_stream).await.unwrap()
    }

    /// CRLF-terminated input: terminators are removed and the blank line is skipped.
    #[tokio::test]
    async fn list_command_dos_newlines() {
        let lines = collect_lines(b"Hello\r\nWorld\r\n\r\nBe\r\nHappy\r\n").await;
        assert_eq!(lines, ["Hello", "World", "Be", "Happy"]);
    }

    /// LF-terminated input: terminators are removed and the blank line is skipped.
    #[tokio::test]
    async fn list_command_unix_newlines() {
        let lines = collect_lines(b"Hello\nWorld\n\nBe\nHappy\n").await;
        assert_eq!(lines, ["Hello", "World", "Be", "Happy"]);
    }
}