ftp_rs/
ftp_client.rs

1//! FTP module.
2use std::borrow::{Borrow, Cow};
3use std::collections::{HashMap, HashSet};
4use std::fmt::format;
5use std::net::{IpAddr, SocketAddr};
6use std::string::String;
7
8use chrono::offset::TimeZone;
9use chrono::{DateTime, Utc};
10use regex::Regex;
11use tokio::io::{
12    copy, AsyncBufRead, AsyncBufReadExt, AsyncRead, AsyncReadExt, AsyncWriteExt, BufStream,
13};
14use tokio::net::{TcpStream, ToSocketAddrs};
15#[cfg(feature = "ftps")]
16use tokio_rustls::{client::TlsStream, rustls::ClientConfig, rustls::ServerName, TlsConnector};
17
18use crate::cmd::Command;
19use crate::connection::Connection;
20use crate::types::{FileType, FtpError, Result};
21use crate::{cmd, ftp_reply, StringExt, MODES, REPLY_CODE_LEN};
22
23lazy_static::lazy_static! {
24    // This regex extracts IP and Port details from PASV command response.
25    // The regex looks for the pattern (h1,h2,h3,h4,p1,p2).
26    static ref PORT_RE: Regex = Regex::new(r"\((\d+),(\d+),(\d+),(\d+),(\d+),(\d+)\)").unwrap();
27
28    // This regex extracts modification time from MDTM command response.
29    static ref MDTM_RE: Regex = Regex::new(r"\b(\d{4})(\d{2})(\d{2})(\d{2})(\d{2})(\d{2})\b").unwrap();
30
31    // This regex extracts File size from SIZE command response.
32    static ref SIZE_RE: Regex = Regex::new(r"\s+(\d+)\s*$").unwrap();
33    static ref PROT_COMMAND_VALUE: Vec<&'static str> = {vec!["C","E","S","P"]};
34}
35
/// Client side of an FTP control connection.
///
/// Holds the buffered control stream together with the state of the most
/// recently read server reply.
pub struct FtpClient {
    /// Buffered control connection to the server.
    stream: BufStream<Connection>,
    /// Reply text stored by `connect` right after the connection is established.
    welcome_msg: Option<String>,
    /// Numeric code of the last reply parsed by `read_reply`.
    _reply_code: u32,
    /// All lines of the last reply concatenated; `None` until a reply has been read in full.
    _reply_string: Option<String>,
    /// Individual lines of the last reply, in arrival order.
    _reply_lines: Vec<String>,
    /// TLS configuration and server name saved by `into_secure`; `data_command`
    /// reuses them to wrap data connections.
    #[cfg(feature = "ftps")]
    ssl_cfg: Option<(ClientConfig, ServerName)>,
    /// NOTE(review): never populated in the visible part of this file; presumably
    /// filled from a FEAT reply — confirm.
    features_map: HashMap<String, Vec<String>>,
}
46
47impl FtpClient {
48    fn new(stream: TcpStream) -> Self {
49        FtpClient {
50            stream: BufStream::new(Connection::Tcp(stream)),
51            #[cfg(feature = "ftps")]
52            ssl_cfg: None,
53            welcome_msg: None,
54            _reply_code: 0,
55            _reply_string: None,
56            features_map: HashMap::new(),
57            _reply_lines: vec![],
58        }
59    }
60
61    #[cfg(feature = "ftps")]
62    fn new_tls_client(stream: TlsStream<TcpStream>) -> Self {
63        FtpClient {
64            stream: BufStream::new(Connection::Ssl(stream)),
65            ssl_cfg: None,
66            welcome_msg: None,
67            _reply_code: 0,
68            _reply_string: None,
69            features_map: HashMap::new(),
70            _reply_lines: vec![],
71        }
72    }
73
    /// Currently a no-op: the body is empty.
    pub fn init_default(&mut self) {}
75
76    /// Creates an FTP Client.
77    pub async fn connect<A: ToSocketAddrs>(addr: A) -> Result<FtpClient> {
78        let stream = TcpStream::connect(addr)
79            .await
80            .map_err(FtpError::ConnectionError)?;
81
82        let mut ftp_client = FtpClient::new(stream);
83        ftp_client.read_reply().await?;
84        ftp_client.check_response(ftp_reply::READY)?;
85        ftp_client.welcome_msg = Some(ftp_client._reply_string.clone().unwrap());
86        Ok(ftp_client)
87    }
88
89    /// Switch to a secure mode if possible, using a provided SSL configuration.
90    /// This method does nothing if the connect is already secured.
91    ///
92    /// ## Panics
93    ///
94    /// Panics if the plain TCP connection cannot be switched to TLS mode.
95    ///
96    /// ## Example
97    ///
98    /// ```rust,no_run
99    /// use std::convert::TryFrom;
100    /// use std::path::Path;
101    /// use ftp_rs::FtpClient;
102    /// use tokio_rustls::rustls::{ClientConfig, RootCertStore, ServerName};
103    ///
104    /// let mut root_store = RootCertStore::empty();
105    /// // root_store.add_pem_file(...);
106    /// let conf = ClientConfig::builder().with_safe_defaults().with_root_certificates(root_store).with_no_client_auth();
107    /// let domain = ServerName::try_from("www.cert-domain.com").expect("invalid DNS name");
108    /// async {
109    ///   let mut ftp_client = FtpClient::connect("192.168.32.204:21").await.unwrap();
110    ///   let mut ftp_client = ftp_client.into_secure(conf, domain).await.unwrap();
111    /// };
112    /// ```
113    #[cfg(feature = "ftps")]
114    pub async fn into_secure(
115        mut self,
116        config: ClientConfig,
117        domain: ServerName,
118    ) -> Result<FtpClient> {
119        self.exe_auth_tls().await?;
120
121        let connector: TlsConnector = std::sync::Arc::new(config.clone()).into();
122        let stream = connector
123            .connect(domain.clone(), self.stream.into_inner().into_tcp_stream())
124            .await
125            .map_err(|e| FtpError::SecureError(format!("{}", e)))?;
126
127        let mut ftps_client = FtpClient::new_tls_client(stream);
128        ftps_client.ssl_cfg = Some((config, domain));
129
130        // Set protection buffer size
131        ftps_client.send_command(Command::PBSZ, Some("0")).await?;
132        ftps_client.check_response(ftp_reply::COMMAND_OK)?;
133
134        ftps_client.send_command(Command::PROT, Some("P")).await?;
135        ftps_client.check_response(ftp_reply::COMMAND_OK)?;
136        Ok(ftps_client)
137    }
138
139    #[cfg(feature = "ftps")]
140    async fn exe_auth_tls(mut self) -> Result<()> {
141        self.send_command(Command::AUTH, Some("TLS")).await?;
142        self.check_response_in(&[ftp_reply::AUTH_OK, ftp_reply::SECURITY_MECHANISM_IS_OK])
143    }
144
145    #[cfg(feature = "ftps")]
146    /// Send the AUTH command with the specified mechanism.
147    pub async fn exe_auth(mut self, mechanism: &str) -> Result<u32> {
148        Ok(self.send_command(Command::AUTH, Some(mechanism)).await?)
149    }
150
151    #[cfg(feature = "ftps")]
152    /// PBSZ command. pbsz value: 0 to (2^32)-1 decimal integer.
153    /// * pbsz: Protection Buffer Size
154    pub async fn exec_pbsz(mut self, pbsz: u64) -> Result<()> {
155        if pbsz < 0 || 4294967295 < pbsz {
156            Err(FtpError::InvalidArgument(format!(
157                "Invalid pbsz value. The correct value should be: {} to {}",
158                0, "(2^32)-1"
159            )))
160        }
161        self.send_command(Command::PBSZ, Some(pbsz.to_string().as_str()))
162            .await?;
163        self.check_response_in(&[ftp_reply::COMMAND_OK])?;
164        Ok(())
165    }
166
167    #[cfg(feature = "ftps")]
168    /// Send the ADAT command with the specified authentication data.
169    pub async fn exec_adat(mut self, data: Option<&[u8]>) -> Result<u32> {
170        let mut args = None;
171        if data.is_some() {
172            args = Some(base64::encode(data).as_str());
173        }
174        Ok(self.send_command(Command::ADAT, args).await?)
175    }
176
177    #[cfg(feature = "ftps")]
178    /// Send the CONF command with the specified authentication data.
179    pub async fn exec_conf(mut self, data: Option<&[u8]>) -> Result<u32> {
180        let mut args = None;
181        if data.is_some() {
182            args = Some(base64::encode(data).as_str());
183        }
184        Ok(self.send_command(Command::CONF, args).await?)
185    }
186
187    #[cfg(feature = "ftps")]
188    /// Send the ENC command with the specified authentication data.
189    pub async fn exec_enc(mut self, data: Option<&[u8]>) -> Result<u32> {
190        let mut args = None;
191        if data.is_some() {
192            args = Some(base64::encode(data).as_str());
193        }
194        Ok(self.send_command(Command::ENC, args).await?)
195    }
196
197    #[cfg(feature = "ftps")]
198    /// Send the MIC command with the specified authentication data.
199    pub async fn exec_mic(mut self, data: Option<&[u8]>) -> Result<u32> {
200        let mut args = None;
201        if data.is_some() {
202            args = Some(base64::encode(data).as_str());
203        }
204        Ok(self.send_command(Command::MIC, args).await?)
205    }
206
207    #[cfg(feature = "ftps")]
208    /// Send the MIC command with the specified authentication data.
209    pub async fn exec_prot(mut self, prot: &mut str) -> Result<()> {
210        let mut p = prot.as_mut();
211        if p.is_empty() {
212            p = &mut "C";
213        }
214        if !PROT_COMMAND_VALUE.contains(&&*p) {
215            Err(FtpError::InvalidArgument(format!(
216                "Unsupported prot command value",
217            )))
218        }
219        self.send_command(Command::PROT, Some(p)).await?;
220        self.check_response_in(&[ftp_reply::COMMAND_OK])?;
221        Ok(())
222    }
223
224    /// Switch to insecure mode. If the connection is already
225    /// insecure does nothing.
226    ///
227    /// ## Example
228    ///
229    /// ```rust,no_run
230    /// use std::convert::TryFrom;
231    /// use std::path::Path;
232    /// use ftp_rs::FtpClient;
233    /// use tokio_rustls::rustls::{ClientConfig, RootCertStore, ServerName};
234    ///
235    /// let mut root_store = RootCertStore::empty();
236    /// // root_store.add_pem_file(...);
237    /// let conf = ClientConfig::builder().with_safe_defaults().with_root_certificates(root_store).with_no_client_auth();
238    /// let domain = ServerName::try_from("www.cert-domain.com").expect("invalid DNS name");
239    /// async {
240    ///   let mut ftp_client = FtpClient::connect("192.168.32.204:21").await.unwrap();
241    ///   let mut ftp_client = ftp_client.into_secure(conf, domain).await.unwrap();
242    ///   // Switch back to the insecure mode
243    ///   let mut ftp_client = ftp_client.into_insecure().await.unwrap();
244    ///   // Do all public things
245    ///   let _ = ftp_client.quit();
246    /// };
247    /// ```
248    #[cfg(feature = "ftps")]
249    pub async fn into_insecure(mut self) -> Result<FtpClient> {
250        self.send_command(Command::CCC, None).await?;
251        if self._reply_code == ftp_reply::COMMAND_OK {
252            Ok(FtpClient::new(self.stream.into_inner().into_tcp_stream()))
253        }
254        Err(FtpError::InvalidResponse(format!(
255            "Expected code {:?}, got response: {}",
256            ftp_reply::COMMAND_OK,
257            self._reply_string.unwrap()
258        )))
259    }
260
261    /// Execute command which send data back in a separate stream
262    async fn data_command(&mut self, cmd: &str) -> Result<Connection> {
263        let addr = self.pasv().await?;
264        let stream = TcpStream::connect(addr)
265            .await
266            .map_err(FtpError::ConnectionError)?;
267
268        #[cfg(feature = "ftps")]
269        match &self.ssl_cfg {
270            Some((config, domain)) => {
271                let connector: TlsConnector = std::sync::Arc::new(config.clone()).into();
272                return connector
273                    .connect(domain.to_owned(), stream)
274                    .await
275                    .map(|stream| Connection::Ssl(stream))
276                    .map_err(|e| FtpError::SecureError(format!("{}", e)));
277            }
278            _ => {}
279        };
280        self.write_str(cmd).await?;
281        self.read_reply().await?;
282        Ok(Connection::Tcp(stream))
283    }
284
285    /// Returns a reference to the underlying TcpStream.
286    ///
287    /// Example:
288    /// ```no_run
289    /// use tokio::net::TcpStream;
290    /// use std::time::Duration;
291    /// use ftp_rs::FtpClient;
292    ///
293    /// async {
294    ///   let client = FtpClient::connect("192.168.32.204:21").await
295    ///                          .expect("Couldn't connect to the server...");
296    ///   let s: &TcpStream = client.get_ref();
297    /// };
298    /// ```
299    pub fn get_ref(&self) -> &TcpStream {
300        self.stream.get_ref().get_ref()
301    }
302
303    /// Get welcome message from the server on connect.
304    pub fn get_welcome_msg(&self) -> Option<&str> {
305        self.welcome_msg.as_deref()
306    }
307
308    /// Log in to the FTP server.
309    pub async fn login(&mut self, user: &str, password: &str) -> Result<bool> {
310        self.send_command(Command::USER, Some(user)).await?;
311        if ftp_reply::is_positive_completion(self._reply_code) {
312            return Ok(true);
313        } else if !ftp_reply::is_positive_intermediate(self._reply_code) {
314            return Ok(false);
315        }
316        self.send_command(Command::PASS, Some(password)).await?;
317        Ok(ftp_reply::is_positive_completion(self._reply_code))
318    }
319
320    /// Change the current Directory to the path specified.
321    pub async fn cwd(&mut self, path: &str) -> Result<bool> {
322        self.send_command(Command::CWD, Some(path)).await?;
323        Ok(ftp_reply::is_positive_completion(self._reply_code))
324    }
325
326    /// Move the current Directory to the parent Directory.
327    pub async fn cdup(&mut self) -> Result<bool> {
328        self.send_command(Command::CDUP, None).await?;
329        Ok(ftp_reply::is_positive_completion(self._reply_code))
330    }
331
332    /// Gets the current Directory
333    pub async fn pwd(&mut self) -> Result<String> {
334        self.send_command(Command::PWD, None).await?;
335        match &self._reply_string {
336            None => {
337                let cause = format!("Cannot get PWD Response from FTP server");
338                Err(FtpError::InvalidResponse(cause))
339            }
340            Some(content) => match (content.find('"'), content.rfind('"')) {
341                (Some(begin), Some(end)) if begin < end => Ok(content[begin + 1..end].to_string()),
342                _ => {
343                    let cause = format!("Invalid PWD Response: {}", content);
344                    Err(FtpError::InvalidResponse(cause))
345                }
346            },
347        }
348    }
349
350    /// This does nothing. This is usually just used to keep the connection open.
351    pub async fn noop(&mut self) -> Result<bool> {
352        self.send_command(Command::NOOP, None).await?;
353        Ok(ftp_reply::is_positive_completion(self._reply_code))
354    }
355
356    /// This creates a new Directory on the server.
357    pub async fn make_directory(&mut self, pathname: &str) -> Result<bool> {
358        match ftp_reply::is_positive_completion(self.mkd(pathname).await?) {
359            true => Ok(true),
360            false => {
361                return Err(FtpError::InvalidResponse(format!(
362                    "Got error reply: {}",
363                    self._reply_string.as_ref().unwrap()
364                )));
365            }
366        }
367    }
368
369    /// A convenience method to send the FTP MKD command to the server, receive the reply, and return the reply code.
370    pub async fn mkd(&mut self, pathname: &str) -> Result<u32> {
371        Ok(self.send_command(Command::MKD, Some(pathname)).await?)
372    }
373
374    /// A convenience method to send the FTP ACCT command to the server, receive the reply, and return the reply code.
375    pub async fn acct(&mut self, account: &str) -> Result<u32> {
376        Ok(self.send_command(Command::ACCT, Some(account)).await?)
377    }
378
379    /// A convenience method to send the FTP ABOR command to the server, receive the reply, and return the reply code.
380    pub async fn abor(&mut self) -> Result<u32> {
381        Ok(self.send_command(Command::ABOR, None).await?)
382    }
383
384    /// A convenience method to send the FTP REIN command to the server, receive the reply, and return the reply code.
385    pub async fn rein(&mut self) -> Result<u32> {
386        Ok(self.send_command(Command::REIN, None).await?)
387    }
388
389    /// A convenience method to send the FTP SMNT command to the server, receive the reply, and return the reply code.
390    pub async fn smnt(&mut self, dir: &str) -> Result<u32> {
391        Ok(self.send_command(Command::SMNT, Some(dir)).await?)
392    }
393
394    /// A convenience method to send the FTP EPSV command to the server, receive the reply, and return the reply code.
395    pub async fn epsv(&mut self) -> Result<u32> {
396        Ok(self.send_command(Command::EPSV, None).await?)
397    }
398
399    /// A convenience method to send the FTP TYPE command to the server, receive the reply, and return the reply code.
400    pub async fn type_cmd(&mut self, file_type: u32) -> Result<u32> {
401        let s = MODES.substring(file_type as usize, (file_type + 1) as usize);
402        Ok(self.send_command(Command::TYPE, Some(s)).await?)
403    }
404
405    /// A convenience method to send the FTP STRU command to the server, receive the reply, and return the reply code.
406    pub async fn stru(&mut self, structure: u32) -> Result<u32> {
407        let s = MODES.substring(structure as usize, (structure + 1) as usize);
408        Ok(self.send_command(Command::STRU, Some(s)).await?)
409    }
410
411    /// A convenience method to send the FTP MODE command to the server, receive the reply, and return the reply code.
412    pub async fn mode(&mut self, mode: u32) -> Result<u32> {
413        let s = MODES.substring(mode as usize, (mode + 1) as usize);
414        Ok(self.send_command(Command::MODE, Some(s)).await?)
415    }
416
417    /// A convenience method to send the FTP STOU command to the server, receive the reply, and return the reply code.
418    pub async fn stou(&mut self) -> Result<u32> {
419        Ok(self.send_command(Command::STOU, None).await?)
420    }
421
422    /// A convenience method to send the FTP STOU command to the server, receive the reply, and return the reply code.
423    pub async fn stou_pathname(&mut self, pathname: &str) -> Result<u32> {
424        Ok(self.send_command(Command::STOU, Some(pathname)).await?)
425    }
426
427    /// A convenience method to send the FTP APPE command to the server, receive the reply, and return the reply code.
428    pub async fn appe(&mut self, pathname: &str) -> Result<u32> {
429        Ok(self.send_command(Command::APPE, Some(pathname)).await?)
430    }
431
432    /// A convenience method to send the FTP ALLO command to the server, receive the reply, and return the reply code.
433    pub async fn allo(&mut self, bytes: u32) -> Result<u32> {
434        Ok(self
435            .send_command(Command::ALLO, Some(bytes.to_string().as_str()))
436            .await?)
437    }
438
439    /// A convenience method to send the FTP ALLO command to the server, receive the reply, and return the reply code.
440    pub async fn allo_record_size(&mut self, bytes: u32, record_size: u32) -> Result<u32> {
441        let args = format!(
442            "{} R {}",
443            bytes.to_string().as_str(),
444            record_size.to_string().as_str()
445        );
446        Ok(self
447            .send_command(Command::ALLO, Some(args.as_str()))
448            .await?)
449    }
450
451    /// A convenience method to send the FTP PORT command to the server, receive the reply, and return the reply code.
452    pub async fn port(&mut self, host: IpAddr, port: u16) -> Result<u32> {
453        let mut args = String::with_capacity(24);
454        args.push_str(host.to_string().replace('.', ",").as_str());
455        args.push_str(",");
456        args.push_str((port >> 8).to_string().as_str());
457        args.push_str(",");
458        args.push_str((port & 0xff).to_string().as_str());
459        Ok(self
460            .send_command(Command::PORT, Some(args.as_str()))
461            .await?)
462    }
463
464    /// A convenience method to send the FTP EPRT command to the server, receive the reply, and return the reply code.
465    /// * EPRT |1|132.235.1.2|6275|
466    /// * EPRT |2|1080::8:800:200C:417A|5282|
467    pub async fn eprt(&mut self, host: IpAddr, port: u16) -> Result<u32> {
468        let mut args = String::new();
469        let mut h = host.to_string();
470        let n = h.find("%").unwrap_or(0);
471        if n > 0 {
472            h = h.substring(0, n).to_string();
473        }
474        args.push_str("|");
475        match host {
476            IpAddr::V4(addr) => args.push_str("1"),
477            IpAddr::V6(addr) => args.push_str("2"),
478        }
479        args.push_str("|");
480        args.push_str(h.as_str());
481        args.push_str("|");
482        args.push_str(port.to_string().as_str());
483        args.push_str("|");
484        Ok(self
485            .send_command(Command::EPRT, Some(args.as_str()))
486            .await?)
487    }
488
489    /// A convenience method to send the FTP MFMT command to the server, receive the reply, and return the reply code.
490    pub async fn mfmt(&mut self, pathname: &str, timeval: &str) -> Result<u32> {
491        Ok(self
492            .send_command(
493                Command::MFMT,
494                Some(format!("{} {}", timeval, pathname).as_str()),
495            )
496            .await?)
497    }
498
499    /// Runs the PASV command.
500    async fn pasv(&mut self) -> Result<SocketAddr> {
501        self.send_command(Command::PASV, None).await?;
502        self.check_response(ftp_reply::PASSIVE_MODE)?;
503        let reply_str = self._reply_string.clone().unwrap();
504        let reply_str = reply_str.as_str();
505        PORT_RE
506            .captures(reply_str)
507            .ok_or_else(|| {
508                FtpError::InvalidResponse(format!("Invalid PASV response: {}", reply_str))
509            })
510            .and_then(|caps| {
511                // If the regex matches we can be sure groups contains numbers
512                let (oct1, oct2, oct3, oct4) = (
513                    caps[1].parse::<u8>().unwrap(),
514                    caps[2].parse::<u8>().unwrap(),
515                    caps[3].parse::<u8>().unwrap(),
516                    caps[4].parse::<u8>().unwrap(),
517                );
518                let (msb, lsb) = (
519                    caps[5].parse::<u8>().unwrap(),
520                    caps[6].parse::<u8>().unwrap(),
521                );
522                let port = ((msb as u16) << 8) + lsb as u16;
523
524                use std::net::{IpAddr, Ipv4Addr};
525
526                let ip = if (oct1, oct2, oct3, oct4) == (0, 0, 0, 0) {
527                    self.get_ref()
528                        .peer_addr()
529                        .map_err(FtpError::ConnectionError)?
530                        .ip()
531                } else {
532                    IpAddr::V4(Ipv4Addr::new(oct1, oct2, oct3, oct4))
533                };
534                Ok(SocketAddr::new(ip, port))
535            })
536    }
537
538    /// Sets the type of File to be transferred. That is the implementation
539    /// of `TYPE` command.
540    pub async fn transfer_type(&mut self, file_type: FileType) -> Result<bool> {
541        // let type_command = format!("TYPE {}\r\n", file_type.to_string());
542        // self.write_str(&type_command).await?;
543        self.send_command(Command::TYPE, Some(file_type.to_string().as_str()))
544            .await?;
545        Ok(ftp_reply::is_positive_completion(self._reply_code))
546    }
547
548    /// Quits the current FTP session.
549    pub async fn logout(&mut self) -> Result<bool> {
550        self.send_command(Command::QUIT, None).await?;
551        Ok(ftp_reply::is_positive_completion(self._reply_code))
552    }
553
554    /// Sets the byte from which the transfer is to be restarted.
555    pub async fn restart_from(&mut self, offset: u64) -> Result<bool> {
556        self.send_command(Command::REST, Some(offset.to_string().as_str()))
557            .await?;
558        Ok(ftp_reply::is_positive_intermediate(self._reply_code))
559    }
560
561    /// Retrieves the File name specified from the server.
562    /// This method is a more complicated way to retrieve a File.
563    /// The reader returned should be dropped.
564    /// Also you will have to read the response to make sure it has the correct value.
565    pub async fn get(&mut self, file_name: &str) -> Result<BufStream<Connection>> {
566        let retr_command = format!("RETR {}\r\n", file_name);
567        let data_stream = BufStream::new(self.data_command(&retr_command).await?);
568        self.check_response_in(&[ftp_reply::ABOUT_TO_SEND, ftp_reply::ALREADY_OPEN])?;
569        Ok(data_stream)
570    }
571
572    /// Renames the File from_name to to_name
573    pub async fn rename(&mut self, from_name: &str, to_name: &str) -> Result<bool> {
574        self.send_command(Command::RNFR, Some(from_name)).await?;
575        if !ftp_reply::is_positive_intermediate(self._reply_code) {
576            return Ok(false);
577        }
578        self.send_command(Command::RNTO, Some(to_name)).await?;
579        Ok(ftp_reply::is_positive_completion(self._reply_code))
580    }
581
582    /// The implementation of `RETR` command where `filename` is the name of the File
583    /// to download from FTP and `reader` is the function which operates with the
584    /// data stream opened.
585    ///
586    /// ```
587    /// use ftp_rs::{FtpClient, Connection, FtpError};
588    /// use tokio::io::{AsyncReadExt, BufStream};
589    /// use std::io::Cursor;
590    /// async {
591    ///   let mut conn = FtpClient::connect("192.168.32.204:21").await.unwrap();
592    ///   conn.login("Doe", "mumble").await.unwrap();
593    ///   let mut reader = Cursor::new("hello, world!".as_bytes());
594    ///   conn.put("retr.txt", &mut reader).await.unwrap();
595    ///
596    ///   async fn lambda(mut reader: BufStream<Connection>) -> Result<Vec<u8>, FtpError> {
597    ///     let mut buffer = Vec::new();
598    ///     reader
599    ///         .read_to_end(&mut buffer)
600    ///         .await
601    ///         .map_err(FtpError::ConnectionError)?;
602    ///     assert_eq!(buffer, "hello, world!".as_bytes());
603    ///     Ok(buffer)
604    ///   };
605    ///
606    ///   assert!(conn.retr("retr.txt", lambda).await.is_ok());
607    ///   assert!(conn.rm("retr.txt").await.is_ok());
608    /// };
609    /// ```
610    pub async fn retr<F, T, P, E>(&mut self, filename: &str, reader: F) -> std::result::Result<T, E>
611    where
612        F: Fn(BufStream<Connection>) -> P,
613        P: std::future::Future<Output = std::result::Result<T, E>>,
614        E: From<FtpError>,
615    {
616        let retr_command = format!("{} {}\r\n", cmd::Command::RETR.cmd_name(), filename);
617        let data_stream = BufStream::new(self.data_command(&retr_command).await?);
618        self.check_response_in(&[ftp_reply::ABOUT_TO_SEND, ftp_reply::ALREADY_OPEN])?;
619        let res = reader(data_stream).await?;
620        Ok(res)
621    }
622
623    /// Simple way to retr a File from the server. This stores the File in memory.
624    ///
625    /// ```
626    /// use ftp_rs::{FtpClient, FtpError};
627    /// use std::io::Cursor;
628    /// async {
629    ///     let mut conn = FtpClient::connect("192.168.32.204:21").await?;
630    ///     conn.login("Doe", "mumble").await?;
631    ///     let mut reader = Cursor::new("hello, world!".as_bytes());
632    ///     conn.put("simple_retr.txt", &mut reader).await?;
633    ///
634    ///     let cursor = conn.simple_retr("simple_retr.txt").await?;
635    ///
636    ///     assert_eq!(cursor.into_inner(), "hello, world!".as_bytes());
637    ///     assert!(conn.rm("simple_retr.txt").await.is_ok());
638    ///
639    ///     Ok::<(), FtpError>(())
640    /// };
641    /// ```
642    pub async fn simple_retr(&mut self, file_name: &str) -> Result<std::io::Cursor<Vec<u8>>> {
643        async fn do_read(mut reader: BufStream<Connection>) -> Result<Vec<u8>> {
644            let mut buffer = Vec::new();
645            reader
646                .read_to_end(&mut buffer)
647                .await
648                .map_err(FtpError::ConnectionError)?;
649
650            Ok(buffer)
651        }
652
653        let buffer = self.retr(file_name, do_read).await?;
654        Ok(std::io::Cursor::new(buffer))
655    }
656
657    pub async fn remove_directory(&mut self, pathname: &str) -> Result<bool> {
658        Ok(ftp_reply::is_positive_completion(self.rmd(pathname).await?))
659    }
660
661    /// Removes the remote pathname from the server.
662    pub async fn rmd(&mut self, pathname: &str) -> Result<u32> {
663        Ok(self.send_command(Command::RMD, Some(pathname)).await?)
664    }
665
666    pub async fn delete_file(&mut self, filename: &str) -> Result<bool> {
667        Ok(ftp_reply::is_positive_completion(
668            self.dele(filename).await?,
669        ))
670    }
671
672    /// Remove the remote File from the server.
673    pub async fn dele(&mut self, filename: &str) -> Result<u32> {
674        Ok(self.send_command(Command::DELE, Some(filename)).await?)
675    }
676
677    async fn put_file<R: AsyncRead + Unpin>(&mut self, filename: &str, r: &mut R) -> Result<()> {
678        let stor_command = format!("{} {}\r\n", Command::STOR, filename);
679        let mut data_stream = BufStream::new(self.data_command(&stor_command).await?);
680        self.check_response_in(&[ftp_reply::ALREADY_OPEN, ftp_reply::ABOUT_TO_SEND])?;
681        copy(r, &mut data_stream)
682            .await
683            .map_err(FtpError::ConnectionError)?;
684        Ok(())
685    }
686
687    /// Sends an FTP command to the server, waits for a reply and returns the numerical response code.
688    pub async fn send_command(&mut self, cmd: cmd::Command, agrs: Option<&str>) -> Result<u32> {
689        let mut ftp_cmd = format!("{}\r\n", cmd.cmd_name());
690        if agrs.is_some() {
691            ftp_cmd = format!("{} {}\r\n", cmd.cmd_name(), agrs.unwrap());
692        }
693        self.write_str(ftp_cmd).await?;
694        self.read_reply().await?;
695        Ok(self._reply_code)
696    }
697
698    async fn read_reply(&mut self) -> Result<()> {
699        self._reply_lines.clear();
700        self._reply_string = None;
701        let mut line = String::new();
702        self.stream
703            .read_line(&mut line)
704            .await
705            .map_err(FtpError::ConnectionError)?;
706
707        if line.len() < REPLY_CODE_LEN {
708            return Err(FtpError::InvalidResponse(format!(
709                "Truncated server reply: {}",
710                line
711            )));
712        }
713
714        if line.len() < 5 {
715            return Err(FtpError::InvalidResponse(
716                "error: could not read reply code".to_owned(),
717            ));
718        }
719
720        let reply_code: u32 = line[0..3].parse().map_err(|_err| {
721            FtpError::InvalidResponse(format!(
722                "Could not parse reply code. \n Server Reply: {}",
723                line
724            ))
725        })?;
726        self._reply_code = reply_code;
727        self._reply_lines.push(line.as_str().to_string());
728        let expected = format!("{} ", &line[0..3]);
729        while line.len() < 5 || line[0..4] != expected {
730            line.clear();
731            if let Err(e) = self.stream.read_line(&mut line).await {
732                return Err(FtpError::ConnectionError(e));
733            }
734            self._reply_lines.push(line.as_str().to_string());
735        }
736        let mut s = String::new();
737        for x in self._reply_lines.iter() {
738            s.push_str(x.as_str())
739        }
740        self._reply_string = Some(s);
741        Ok(())
742    }
743
744    /// This stores a File on the server.
745    pub async fn put<R: AsyncRead + Unpin>(&mut self, filename: &str, r: &mut R) -> Result<()> {
746        self.put_file(filename, r).await?;
747        self.check_response_in(&[
748            ftp_reply::CLOSING_DATA_CONNECTION,
749            ftp_reply::REQUESTED_FILE_ACTION_OK,
750        ])?;
751        Ok(())
752    }
753
754    /// Execute a command which returns list of strings in a separate stream
755    async fn list_command(
756        &mut self,
757        cmd: Cow<'_, str>,
758        open_code: u32,
759        close_code: &[u32],
760    ) -> Result<Vec<String>> {
761        let data_stream = BufStream::new(self.data_command(&cmd).await?);
762        self.check_response_in(&[open_code, ftp_reply::ALREADY_OPEN])?;
763        let lines = Self::get_lines_from_stream(data_stream).await?;
764        self.check_response_in(close_code)?;
765        Ok(lines)
766    }
767
768    /// Consume a stream and return a vector of lines
769    async fn get_lines_from_stream<R>(data_stream: R) -> Result<Vec<String>>
770    where
771        R: AsyncBufRead + Unpin,
772    {
773        let mut lines: Vec<String> = Vec::new();
774
775        let mut lines_stream = data_stream.lines();
776        loop {
777            let line = lines_stream
778                .next_line()
779                .await
780                .map_err(FtpError::ConnectionError)?;
781
782            match line {
783                Some(line) => {
784                    if line.is_empty() {
785                        continue;
786                    }
787                    lines.push(line);
788                }
789                None => break Ok(lines),
790            }
791        }
792    }
793
794    /// Execute `LIST` command which returns the detailed File listing in human readable format.
795    /// If `pathname` is omited then the list of files in the current Directory will be
796    /// returned otherwise it will the list of files on `pathname`.
797    pub async fn list(&mut self, pathname: Option<&str>) -> Result<Vec<String>> {
798        let command = pathname.map_or("LIST\r\n".into(), |path| {
799            format!("LIST {}\r\n", path).into()
800        });
801
802        self.list_command(
803            command,
804            ftp_reply::ABOUT_TO_SEND,
805            &[
806                ftp_reply::CLOSING_DATA_CONNECTION,
807                ftp_reply::REQUESTED_FILE_ACTION_OK,
808            ],
809        )
810        .await
811    }
812
813    /// Execute `NLST` command which returns the list of File names only.
814    /// If `pathname` is omited then the list of files in the current Directory will be
815    /// returned otherwise it will the list of files on `pathname`.
816    pub async fn nlst(&mut self, pathname: Option<&str>) -> Result<Vec<String>> {
817        let command = pathname.map_or("NLST\r\n".into(), |path| {
818            format!("NLST {}\r\n", path).into()
819        });
820
821        self.list_command(
822            command,
823            ftp_reply::ABOUT_TO_SEND,
824            &[
825                ftp_reply::CLOSING_DATA_CONNECTION,
826                ftp_reply::REQUESTED_FILE_ACTION_OK,
827            ],
828        )
829        .await
830    }
831
832    /// Retrieves the modification time of the File at `pathname` if it exists.
833    /// In case the File does not exist `None` is returned.
834    pub async fn mdtm(&mut self, pathname: &str) -> Result<Option<DateTime<Utc>>> {
835        self.send_command(Command::MDTM, Some(pathname)).await?;
836        self.check_response(ftp_reply::FILE)?;
837        let reply_str = self._reply_string.clone().unwrap();
838        let reply_str = reply_str.as_str();
839        match MDTM_RE.captures(reply_str) {
840            Some(caps) => {
841                let (year, month, day) = (
842                    caps[1].parse::<i32>().unwrap(),
843                    caps[2].parse::<u32>().unwrap(),
844                    caps[3].parse::<u32>().unwrap(),
845                );
846                let (hour, minute, second) = (
847                    caps[4].parse::<u32>().unwrap(),
848                    caps[5].parse::<u32>().unwrap(),
849                    caps[6].parse::<u32>().unwrap(),
850                );
851                Ok(Some(
852                    Utc.ymd(year, month, day).and_hms(hour, minute, second),
853                ))
854            }
855            None => Ok(None),
856        }
857    }
858
859    /// Retrieves the size of the File in bytes at `pathname` if it exists.
860    /// In case the File does not exist `None` is returned.
861    pub async fn size(&mut self, pathname: &str) -> Result<Option<usize>> {
862        self.send_command(Command::SIZE, Some(pathname)).await?;
863        self.check_response(ftp_reply::FILE)?;
864        let reply_str = self._reply_string.clone().unwrap();
865        let reply_str = reply_str.as_str();
866        match SIZE_RE.captures(reply_str) {
867            Some(caps) => Ok(Some(caps[1].parse().unwrap())),
868            None => Ok(None),
869        }
870    }
871
872    pub async fn feat(&mut self) -> Result<u32> {
873        self._reply_lines.clear();
874        Ok(self.send_command(Command::FEAT, None).await?)
875    }
876
877    pub async fn features(&mut self, cmd: Command) -> Result<Option<Vec<String>>> {
878        let features = Vec::new();
879        if self.init_feature_map().await? {
880            let values = self.features_map.get(cmd.cmd_name());
881            if values.is_some() {
882                return Ok(Some(values.unwrap().clone()));
883            }
884        }
885        Ok(Some(features))
886    }
887
888    async fn init_feature_map(&mut self) -> Result<bool> {
889        if self.features_map.is_empty() {
890            let reply_code = self.feat().await?;
891            if reply_code == ftp_reply::NOT_LOGGED_IN.into() {
892                return Ok(false);
893            }
894            let success = ftp_reply::is_positive_completion(reply_code);
895            if !success {
896                return Ok(false);
897            }
898            for l in self._reply_lines.iter() {
899                if l.starts_with(" ") {
900                    let mut key = "";
901                    let mut value = "";
902                    let s = &l[1..l.len() - 1];
903                    let varsep = s.find(' ');
904                    if varsep.is_some() {
905                        key = &l[1..varsep.unwrap() + 1];
906                        value = &l[varsep.unwrap() + 1..l.len()];
907                    } else {
908                        key = &l[1..l.len() - 1]
909                    }
910                    let entries = self.features_map.get_mut(key);
911                    match entries {
912                        None => {
913                            let mut features = vec![];
914                            features.push(String::from(value));
915                            self.features_map.insert(key.to_string(), features);
916                        }
917                        Some(features) => {
918                            features.push(value.to_string());
919                        }
920                    }
921                }
922            }
923        }
924        Ok(true)
925    }
926
927    async fn write_str<S: AsRef<str>>(&mut self, command: S) -> Result<()> {
928        let conn = self.stream.get_mut();
929        conn.write_all(command.as_ref().as_bytes())
930            .await
931            .map_err(FtpError::ConnectionError)
932    }
933
934    pub fn check_response(&mut self, expected_code: u32) -> Result<()> {
935        self.check_response_in(&[expected_code])
936    }
937
938    /// Retrieve single line response
939    pub fn check_response_in(&mut self, expected_code: &[u32]) -> Result<()> {
940        let reply_string = self._reply_string.clone();
941        if expected_code.iter().any(|ec| self._reply_code == *ec) {
942            Ok(())
943        } else {
944            Err(FtpError::InvalidResponse(format!(
945                "Expected code {:?}, got response: {}",
946                expected_code,
947                reply_string.unwrap().as_str()
948            )))
949        }
950    }
951}
952
#[cfg(test)]
mod tests {
    use tokio_stream::once;
    use tokio_util::io::StreamReader;

    use super::FtpClient;

    /// Lines both fixtures are expected to produce once empty lines are dropped.
    fn expected_lines() -> Vec<String> {
        vec!["Hello", "World", "Be", "Happy"]
            .into_iter()
            .map(String::from)
            .collect()
    }

    /// Feeds `raw` to `get_lines_from_stream` as a single chunk and returns its lines.
    async fn collect_lines(raw: &'static [u8]) -> Vec<String> {
        let reader = StreamReader::new(once(Ok::<_, std::io::Error>(raw)));
        FtpClient::get_lines_from_stream(reader).await.unwrap()
    }

    #[tokio::test]
    async fn list_command_dos_newlines() {
        let lines = collect_lines(b"Hello\r\nWorld\r\n\r\nBe\r\nHappy\r\n").await;

        assert_eq!(lines, expected_lines());
    }

    #[tokio::test]
    async fn list_command_unix_newlines() {
        let lines = collect_lines(b"Hello\nWorld\n\nBe\nHappy\n").await;

        assert_eq!(lines, expected_lines());
    }
}