1use std::net::SocketAddr;
9use std::pin::Pin;
10use std::task::{Context, Poll};
11
12use tokio::io::{
13 AsyncBufReadExt, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, BufReader, ReadBuf,
14};
15use tokio::io::{ReadHalf, WriteHalf};
16use tokio::net::TcpStream;
17
18use crate::error::Error;
19use crate::protocol::http::response::Response;
20
/// How TLS is applied to the FTP control connection.
///
/// As used by `FtpSession::connect_with_tls` in this file: `None` falls back
/// to a plain connection, `Implicit` performs the TLS handshake before the
/// server greeting is read, and `Explicit` reads the greeting in plaintext
/// and then negotiates TLS with an `AUTH` command.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FtpSslMode {
    /// No TLS on the control connection.
    None,
    /// Plaintext greeting followed by an `AUTH` upgrade.
    Explicit,
    /// TLS handshake immediately after the TCP connect.
    Implicit,
}
31
/// How strictly TLS is required when negotiating explicit FTPS.
///
/// In the visible code only `All` is tested for: with `All`, a rejected
/// `AUTH SSL` is retried as `AUTH TLS` and a second rejection is an error.
/// Any other value lets the session continue in plaintext after a rejected
/// `AUTH SSL`. `None` and `Try` are not distinguished in this part of the file.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum UseSsl {
    /// Default level.
    #[default]
    None,
    /// Presumably "use TLS when the server offers it" — confirm against callers.
    Try,
    /// TLS is mandatory; failure to negotiate it aborts the connection.
    All,
}
46
/// A control or data connection that is either plain TCP or TLS-wrapped TCP.
///
/// Implements `AsyncRead`/`AsyncWrite` by delegating to the wrapped stream so
/// the rest of the session code does not need to care which one is in use.
#[allow(clippy::large_enum_variant)]
pub(crate) enum FtpStream {
    /// Unencrypted TCP stream.
    Plain(TcpStream),
    /// TLS stream over TCP (only with the `rustls` feature).
    #[cfg(feature = "rustls")]
    Tls(tokio_rustls::client::TlsStream<TcpStream>),
}
58
59impl AsyncRead for FtpStream {
60 fn poll_read(
61 self: Pin<&mut Self>,
62 cx: &mut Context<'_>,
63 buf: &mut ReadBuf<'_>,
64 ) -> Poll<std::io::Result<()>> {
65 match self.get_mut() {
66 Self::Plain(s) => Pin::new(s).poll_read(cx, buf),
67 #[cfg(feature = "rustls")]
68 Self::Tls(s) => Pin::new(s).poll_read(cx, buf),
69 }
70 }
71}
72
73impl AsyncWrite for FtpStream {
74 fn poll_write(
75 self: Pin<&mut Self>,
76 cx: &mut Context<'_>,
77 buf: &[u8],
78 ) -> Poll<std::io::Result<usize>> {
79 match self.get_mut() {
80 Self::Plain(s) => Pin::new(s).poll_write(cx, buf),
81 #[cfg(feature = "rustls")]
82 Self::Tls(s) => Pin::new(s).poll_write(cx, buf),
83 }
84 }
85
86 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
87 match self.get_mut() {
88 Self::Plain(s) => Pin::new(s).poll_flush(cx),
89 #[cfg(feature = "rustls")]
90 Self::Tls(s) => Pin::new(s).poll_flush(cx),
91 }
92 }
93
94 fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
95 match self.get_mut() {
96 Self::Plain(s) => Pin::new(s).poll_shutdown(cx),
97 #[cfg(feature = "rustls")]
98 Self::Tls(s) => Pin::new(s).poll_shutdown(cx),
99 }
100 }
101}
102
/// One reply received on the FTP control connection.
#[derive(Debug, Clone)]
pub struct FtpResponse {
    /// Numeric reply code.
    pub code: u16,
    /// Reply text as produced by the response reader.
    pub message: String,
    /// The reply exactly as received, kept so it can be recorded verbatim.
    pub raw_bytes: Vec<u8>,
}

impl FtpResponse {
    /// `true` for a 1xx (positive preliminary) reply.
    #[must_use]
    pub const fn is_preliminary(&self) -> bool {
        matches!(self.code, 100..=199)
    }

    /// `true` for a 2xx (positive completion) reply.
    #[must_use]
    pub const fn is_complete(&self) -> bool {
        matches!(self.code, 200..=299)
    }

    /// `true` for a 3xx (positive intermediate) reply.
    #[must_use]
    pub const fn is_intermediate(&self) -> bool {
        matches!(self.code, 300..=399)
    }

    /// `true` for a 4xx (transient negative) reply.
    #[must_use]
    pub const fn is_negative_transient(&self) -> bool {
        matches!(self.code, 400..=499)
    }

    /// `true` for a 5xx (permanent negative) reply.
    #[must_use]
    pub const fn is_negative_permanent(&self) -> bool {
        matches!(self.code, 500..=599)
    }
}
145
/// Strategy for reaching the target directory before a transfer.
///
/// The variants are not consumed in the visible part of this file; the names
/// suggest one `CWD` per path component, a single `CWD` for the whole
/// directory, or no `CWD` at all — confirm against the code that uses them.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum FtpMethod {
    /// Default strategy.
    #[default]
    MultiCwd,
    /// See the type-level note.
    SingleCwd,
    /// See the type-level note.
    NoCwd,
}
160
/// Representation type requested with the FTP `TYPE` command.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TransferType {
    /// Sent as `TYPE A` by `FtpSession::set_type`.
    Ascii,
    /// Sent as `TYPE I` by `FtpSession::set_type`.
    Binary,
}
169
/// Server capabilities collected from a `FEAT` reply.
///
/// Populated by `parse_feat_response` (defined outside the visible part of
/// this file); all flags are `false` and `raw` is empty by default. The
/// per-field meanings below are inferred from the names — confirm against
/// the parser.
#[derive(Debug, Clone, Default)]
#[allow(clippy::struct_excessive_bools)]
pub struct FtpFeatures {
    /// Presumably: server advertises `EPSV`.
    pub epsv: bool,
    /// Presumably: server advertises `MLST`.
    pub mlst: bool,
    /// Presumably: server advertises `REST STREAM`.
    pub rest_stream: bool,
    /// Presumably: server advertises `SIZE`.
    pub size: bool,
    /// Presumably: server advertises `UTF8`.
    pub utf8: bool,
    /// Presumably: server advertises `AUTH TLS`.
    pub auth_tls: bool,
    /// Presumably the unparsed feature lines.
    pub raw: Vec<String>,
}
189
/// Proxy through which control and data connections are established.
///
/// In every variant `host`/`port` identify the proxy itself; the FTP target
/// is passed separately to `connect_via_proxy`.
#[derive(Debug, Clone)]
pub enum FtpProxyConfig {
    /// SOCKS4 proxy.
    Socks4 {
        /// Proxy host.
        host: String,
        /// Proxy port.
        port: u16,
        /// User id forwarded to the SOCKS4 handshake.
        user_id: String,
        /// Not read by `connect_via_proxy` in this file; presumably selects
        /// SOCKS4a (proxy-side name resolution) — confirm.
        socks4a: bool,
    },
    /// SOCKS5 proxy.
    Socks5 {
        /// Proxy host.
        host: String,
        /// Proxy port.
        port: u16,
        /// Optional `(username, password)` forwarded to the SOCKS5 handshake.
        auth: Option<(String, String)>,
    },
    /// HTTP proxy tunnelled with `CONNECT`.
    HttpConnect {
        /// Proxy host.
        host: String,
        /// Proxy port.
        port: u16,
        /// Value sent in the `User-Agent` header of the `CONNECT` request.
        user_agent: String,
    },
}
226
227async fn connect_via_proxy(
229 proxy: &FtpProxyConfig,
230 target_host: &str,
231 target_port: u16,
232) -> Result<TcpStream, Error> {
233 let (proxy_host, proxy_port) = match proxy {
234 FtpProxyConfig::Socks4 { host, port, .. }
235 | FtpProxyConfig::Socks5 { host, port, .. }
236 | FtpProxyConfig::HttpConnect { host, port, .. } => (host.as_str(), *port),
237 };
238 let proxy_addr = format!("{proxy_host}:{proxy_port}");
239 let tcp = TcpStream::connect(&proxy_addr).await.map_err(Error::Connect)?;
240
241 match proxy {
242 FtpProxyConfig::Socks5 { auth, .. } => {
243 let auth_ref = auth.as_ref().map(|(u, p)| (u.as_str(), p.as_str()));
244 crate::proxy::socks::connect_socks5(tcp, target_host, target_port, auth_ref).await
245 }
246 FtpProxyConfig::Socks4 { user_id, .. } => {
247 crate::proxy::socks::connect_socks4(tcp, target_host, target_port, user_id).await
248 }
249 FtpProxyConfig::HttpConnect { user_agent, .. } => {
250 connect_http_tunnel(tcp, target_host, target_port, user_agent).await
251 }
252 }
253}
254
255async fn connect_http_tunnel(
260 mut stream: TcpStream,
261 target_host: &str,
262 target_port: u16,
263 user_agent: &str,
264) -> Result<TcpStream, Error> {
265 use tokio::io::{AsyncBufReadExt as _, AsyncWriteExt as _};
266 let request = format!(
267 "CONNECT {target_host}:{target_port} HTTP/1.1\r\n\
268 Host: {target_host}:{target_port}\r\n\
269 User-Agent: {user_agent}\r\n\
270 Proxy-Connection: Keep-Alive\r\n\
271 \r\n"
272 );
273 stream.write_all(request.as_bytes()).await.map_err(Error::Io)?;
274 stream.flush().await.map_err(Error::Io)?;
275
276 let mut buf_reader = tokio::io::BufReader::new(&mut stream);
278 let mut status_line = String::new();
279 let _ = buf_reader.read_line(&mut status_line).await.map_err(Error::Io)?;
280
281 let status_code =
283 status_line.split_whitespace().nth(1).and_then(|s| s.parse::<u16>().ok()).unwrap_or(0);
284
285 loop {
287 let mut line = String::new();
288 let _ = buf_reader.read_line(&mut line).await.map_err(Error::Io)?;
289 if line.trim().is_empty() {
290 break;
291 }
292 }
293
294 if status_code == 200 {
295 Ok(stream)
296 } else {
297 Err(Error::Transfer {
298 code: 56,
299 message: format!(
300 "CONNECT tunnel to {target_host}:{target_port} failed with status {status_code}"
301 ),
302 })
303 }
304}
305
/// Per-session FTP options.
///
/// Only some fields are consumed in the visible part of this file; those are
/// documented from their use here. Fields marked "not referenced here" are
/// used elsewhere, and their meaning should be confirmed against that code.
#[derive(Debug, Clone)]
#[allow(clippy::struct_excessive_bools)]
pub struct FtpConfig {
    /// Try `EPSV` before `PASV`. The session clears this on its own copy
    /// after an `EPSV` attempt fails.
    pub use_epsv: bool,
    /// Try `EPRT` before `PORT` in active mode. The session clears this on
    /// its own copy after `EPRT` is rejected.
    pub use_eprt: bool,
    /// Ignore the address in the `PASV` reply and connect the data
    /// connection to the control host instead.
    pub skip_pasv_ip: bool,
    /// Account string sent with `ACCT` after login; also makes a `332` reply
    /// to `PASS` acceptable.
    pub account: Option<String>,
    /// Not referenced here.
    pub create_dirs: bool,
    /// Not referenced here.
    pub method: FtpMethod,
    /// Active-mode address. When set, data connections use the active path:
    /// `"-"` means the control connection's local address, anything else is
    /// parsed as an IP address.
    pub active_port: Option<String>,
    /// Not referenced here.
    pub use_ascii: bool,
    /// Not referenced here.
    pub append: bool,
    /// Not referenced here.
    pub crlf: bool,
    /// Not referenced here.
    pub list_only: bool,
    /// Not referenced here.
    pub nobody: bool,
    /// Not referenced here.
    pub pre_quote: Vec<String>,
    /// Not referenced here.
    pub post_pasv_quote: Vec<String>,
    /// Not referenced here.
    pub post_quote: Vec<String>,
    /// Not referenced here.
    pub time_condition: Option<(i64, bool)>,
    /// Not referenced here.
    pub range_end: Option<u64>,
    /// Not referenced here.
    pub range_from_end: Option<u64>,
    /// Not referenced here.
    pub ignore_content_length: bool,
    /// Not referenced here.
    pub max_filesize: Option<u64>,
    /// Send `PRET <command>` before opening a passive data connection
    /// (honoured by `open_data_connection_with_pret`).
    pub use_pret: bool,
    /// When `true`, `PROT C` is requested instead of `PROT P` and data
    /// connections are left unencrypted.
    pub ssl_control: bool,
    /// Send `CCC` after the `PROT` exchange.
    pub ssl_ccc: bool,
    /// Raw command sent as a replacement when the server rejects `USER`.
    pub alternative_to_user: Option<String>,
}
368
369impl Default for FtpConfig {
370 fn default() -> Self {
371 Self {
372 use_epsv: true,
373 use_eprt: true,
374 skip_pasv_ip: false,
375 account: None,
376 create_dirs: false,
377 method: FtpMethod::default(),
378 active_port: None,
379 use_ascii: false,
380 append: false,
381 crlf: false,
382 list_only: false,
383 nobody: false,
384 pre_quote: Vec::new(),
385 post_pasv_quote: Vec::new(),
386 post_quote: Vec::new(),
387 time_condition: None,
388 range_end: None,
389 range_from_end: None,
390 ignore_content_length: false,
391 max_filesize: None,
392 use_pret: false,
393 ssl_control: false,
394 ssl_ccc: false,
395 alternative_to_user: None,
396 }
397 }
398}
399
/// A data connection that is either already usable or still waiting for the
/// server to connect back (active mode).
#[allow(clippy::large_enum_variant)]
pub(crate) enum DataConnection {
    /// Stream that is already connected (produced by the passive path).
    Connected(FtpStream),
    /// Active mode: the listener has been announced to the server but the
    /// inbound connection has not been accepted yet.
    PendingActive {
        /// Listener the server is expected to connect to.
        listener: tokio::net::TcpListener,
        /// Whether the accepted socket should be passed through the
        /// session's data-TLS wrapping.
        use_tls: bool,
    },
}
417
418impl DataConnection {
419 async fn into_stream(
424 self,
425 session: &FtpSession,
426 timeout: Option<std::time::Duration>,
427 ) -> Result<FtpStream, Error> {
428 match self {
429 Self::Connected(stream) => Ok(stream),
430 Self::PendingActive { listener, use_tls } => {
431 let accept_fut = listener.accept();
432 let (tcp, _) = if let Some(dur) = timeout {
433 tokio::time::timeout(dur, accept_fut).await.map_err(|_| Error::Transfer {
434 code: 10,
435 message: "FTP active mode accept timed out".to_string(),
436 })?
437 } else {
438 accept_fut.await
439 }
440 .map_err(|e| Error::Http(format!("FTP active mode accept failed: {e}")))?;
441
442 if use_tls {
443 session.maybe_wrap_data_tls(tcp).await
444 } else {
445 Ok(FtpStream::Plain(tcp))
446 }
447 }
448 }
449 }
450}
451
/// An established FTP control connection together with its session state.
pub struct FtpSession {
    /// Buffered read half of the control connection.
    reader: BufReader<ReadHalf<FtpStream>>,
    /// Write half of the control connection.
    writer: WriteHalf<FtpStream>,
    /// Result of the most recent `FEAT` query, if one was made.
    features: Option<FtpFeatures>,
    /// Host given at connect time; also used as the TLS server name and as
    /// the data-connection host for `EPSV`.
    hostname: String,
    /// Control port given at connect time.
    port: u16,
    /// User name given at connect time.
    user: String,
    /// Local address of the control socket; decides whether `EPSV` is forced
    /// (IPv6) and is the active-mode address for `"-"`.
    local_addr: SocketAddr,
    /// Whether data connections are wrapped in TLS (set once `PROT P` was
    /// accepted).
    use_tls_data: bool,
    /// Active-mode address; `None` selects passive mode.
    active_port: Option<String>,
    /// TLS connector used for the `AUTH` upgrade and for data connections.
    #[cfg(feature = "rustls")]
    tls_connector: Option<crate::tls::TlsConnector>,
    /// Session options; some flags are cleared at runtime after a fallback.
    config: FtpConfig,
    /// Raw bytes of every control reply recorded so far.
    header_bytes: Vec<u8>,
    /// Not modified in the visible part of this file after initialisation.
    current_dir: Vec<String>,
    /// Not modified in the visible part of this file after initialisation.
    home_dir: Option<String>,
    /// Not modified in the visible part of this file after initialisation.
    current_type: Option<TransferType>,
    /// Proxy used for the control connection and reused for data connections.
    proxy_config: Option<FtpProxyConfig>,
    /// Synthetic `CONNECT` success responses recorded for HTTP-proxy tunnels.
    connect_response_bytes: Vec<u8>,
}
492
493impl FtpSession {
494 async fn read_and_record(&mut self) -> Result<FtpResponse, Error> {
496 let resp = read_response(&mut self.reader).await?;
497 self.header_bytes.extend_from_slice(&resp.raw_bytes);
498 Ok(resp)
499 }
500
501 pub async fn connect(
507 host: &str,
508 port: u16,
509 user: &str,
510 pass: &str,
511 config: FtpConfig,
512 ) -> Result<Self, Error> {
513 Self::connect_maybe_proxy(host, port, user, pass, config, None).await
514 }
515
    /// Connects to an FTP server, optionally through a proxy, and logs in.
    ///
    /// Steps, in order: open the TCP connection (directly or via
    /// `connect_via_proxy`), read the greeting, run the `USER`/`PASS` login
    /// unless the greeting was `230`, and finally send `ACCT` when an
    /// account is configured.
    ///
    /// # Errors
    ///
    /// * `Error::Connect` when the TCP connection or `local_addr` lookup fails.
    /// * `Error::Http` when the greeting is not a 2xx reply.
    /// * `Error::Transfer` with code 11 when `ACCT` is rejected.
    /// * Any error from the proxy handshake, the login or control I/O.
    pub async fn connect_maybe_proxy(
        host: &str,
        port: u16,
        user: &str,
        pass: &str,
        config: FtpConfig,
        proxy: Option<FtpProxyConfig>,
    ) -> Result<Self, Error> {
        let (tcp, connect_response_bytes) = if let Some(ref proxy_config) = proxy {
            let stream = connect_via_proxy(proxy_config, host, port).await?;
            // For an HTTP CONNECT proxy a fixed, synthetic success response
            // is recorded; the proxy's actual response text is not kept.
            let connect_bytes = if matches!(proxy_config, FtpProxyConfig::HttpConnect { .. }) {
                b"HTTP/1.1 200 Connection established\r\n\r\n".to_vec()
            } else {
                Vec::new()
            };
            (stream, connect_bytes)
        } else {
            let addr = format!("{host}:{port}");
            let stream = TcpStream::connect(&addr).await.map_err(Error::Connect)?;
            (stream, Vec::new())
        };
        let local_addr = tcp.local_addr().map_err(Error::Connect)?;
        let stream = FtpStream::Plain(tcp);
        let (reader, writer) = tokio::io::split(stream);
        let mut reader = BufReader::new(reader);

        let greeting = read_response(&mut reader).await?;
        if !greeting.is_complete() {
            return Err(Error::Http(format!(
                "FTP server rejected connection: {} {}",
                greeting.code, greeting.message
            )));
        }
        // A 230 greeting means the server considers the user logged in already.
        let skip_login = greeting.code == 230;

        let active_port = config.active_port.clone();
        let alt_to_user = config.alternative_to_user.clone();
        // The greeting is read before the session exists, so it is recorded
        // by hand here rather than through `read_and_record`.
        let mut header_bytes = Vec::new();
        header_bytes.extend_from_slice(&greeting.raw_bytes);
        let mut session = Self {
            reader,
            writer,
            features: None,
            hostname: host.to_string(),
            port,
            user: user.to_string(),
            local_addr,
            use_tls_data: false,
            active_port,
            #[cfg(feature = "rustls")]
            tls_connector: None,
            config,
            header_bytes,
            current_dir: Vec::new(),
            home_dir: None,
            current_type: None,
            proxy_config: proxy,
            connect_response_bytes,
        };

        session.login(user, pass, skip_login, alt_to_user.as_deref()).await?;

        if let Some(ref account) = session.config.account {
            let acct_cmd = format!("ACCT {account}");
            send_command(&mut session.writer, &acct_cmd).await?;
            let acct_resp = session.read_and_record().await?;
            if !acct_resp.is_complete() {
                return Err(Error::Transfer {
                    code: 11,
                    message: format!("FTP ACCT failed: {} {}", acct_resp.code, acct_resp.message),
                });
            }
        }

        Ok(session)
    }
605
    /// Connects to an FTP server with TLS and logs in.
    ///
    /// * `FtpSslMode::None` delegates to [`FtpSession::connect`].
    /// * `FtpSslMode::Implicit` performs the TLS handshake before the
    ///   greeting, logs in, then runs the `PBSZ`/`PROT` exchange.
    /// * `FtpSslMode::Explicit` reads the greeting in plaintext, negotiates
    ///   `AUTH` (see `auth_tls_with_fallback`), runs `PBSZ`/`PROT` only when
    ///   the upgrade succeeded, then logs in.
    ///
    /// `ACCT` is sent last when an account is configured. This path always
    /// connects directly; no proxy is used.
    ///
    /// # Errors
    ///
    /// * `Error::Connect` when the TCP connection or `local_addr` lookup fails.
    /// * `Error::Http` when the greeting is not a 2xx reply.
    /// * `Error::Transfer` with code 11 when `ACCT` is rejected.
    /// * Any error from TLS setup, `AUTH` negotiation, login or control I/O.
    #[cfg(feature = "rustls")]
    #[allow(clippy::too_many_arguments)]
    pub async fn connect_with_tls(
        host: &str,
        port: u16,
        user: &str,
        pass: &str,
        ssl_mode: FtpSslMode,
        use_ssl: UseSsl,
        tls_config: &crate::tls::TlsConfig,
        config: FtpConfig,
    ) -> Result<Self, Error> {
        if ssl_mode == FtpSslMode::None {
            return Self::connect(host, port, user, pass, config).await;
        }

        let tls_connector = crate::tls::TlsConnector::new_no_alpn(tls_config)?;

        let addr = format!("{host}:{port}");
        let tcp = TcpStream::connect(&addr).await.map_err(Error::Connect)?;
        let local_addr = tcp.local_addr().map_err(Error::Connect)?;

        let stream = match ssl_mode {
            FtpSslMode::Implicit => {
                // Implicit FTPS: TLS comes first, the greeting arrives encrypted.
                let (tls_stream, _) = tls_connector.connect(tcp, host).await?;
                FtpStream::Tls(tls_stream)
            }
            FtpSslMode::Explicit | FtpSslMode::None => {
                // `None` already returned above; it is listed only to keep
                // the match exhaustive.
                FtpStream::Plain(tcp)
            }
        };

        let (reader, writer) = tokio::io::split(stream);
        let mut reader = BufReader::new(reader);

        let greeting = read_response(&mut reader).await?;
        if !greeting.is_complete() {
            return Err(Error::Http(format!(
                "FTP server rejected connection: {} {}",
                greeting.code, greeting.message
            )));
        }
        // A 230 greeting means the server considers the user logged in already.
        let skip_login = greeting.code == 230;

        let active_port = config.active_port.clone();
        let alt_to_user = config.alternative_to_user.clone();
        let mut header_bytes = Vec::new();
        header_bytes.extend_from_slice(&greeting.raw_bytes);
        let mut session = Self {
            reader,
            writer,
            features: None,
            hostname: host.to_string(),
            port,
            user: user.to_string(),
            local_addr,
            use_tls_data: false,
            active_port,
            tls_connector: Some(tls_connector),
            config,
            header_bytes,
            current_dir: Vec::new(),
            home_dir: None,
            current_type: None,
            proxy_config: None,
            connect_response_bytes: Vec::new(),
        };

        if ssl_mode == FtpSslMode::Explicit {
            // The upgrade consumes the session because the control stream
            // has to be re-split after the TLS handshake.
            let (upgraded_session, auth_succeeded) =
                session.auth_tls_with_fallback(use_ssl).await?;
            session = upgraded_session;
            if auth_succeeded {
                session.setup_data_protection().await?;
            }
            session.login(user, pass, skip_login, alt_to_user.as_deref()).await?;
        } else {
            session.login(user, pass, skip_login, alt_to_user.as_deref()).await?;
            session.setup_data_protection().await?;
        }

        if let Some(ref account) = session.config.account {
            let acct_cmd = format!("ACCT {account}");
            send_command(&mut session.writer, &acct_cmd).await?;
            let acct_resp = session.read_and_record().await?;
            if !acct_resp.is_complete() {
                return Err(Error::Transfer {
                    code: 11,
                    message: format!("FTP ACCT failed: {} {}", acct_resp.code, acct_resp.message),
                });
            }
        }

        Ok(session)
    }
716
717 #[cfg(feature = "rustls")]
727 async fn auth_tls_with_fallback(mut self, use_ssl: UseSsl) -> Result<(Self, bool), Error> {
728 send_command(&mut self.writer, "AUTH SSL").await?;
730 let resp = self.read_and_record().await?;
731 if resp.is_complete() {
732 return Ok((self.do_tls_upgrade().await?, true));
734 }
735
736 if use_ssl == UseSsl::All {
737 send_command(&mut self.writer, "AUTH TLS").await?;
739 let resp2 = self.read_and_record().await?;
740 if resp2.is_complete() {
741 return Ok((self.do_tls_upgrade().await?, true));
743 }
744
745 return Err(Error::Transfer {
747 code: 64,
748 message: "FTP AUTH SSL/TLS failed: server does not support TLS".to_string(),
749 });
750 }
751
752 if !self.reader.buffer().is_empty() {
756 return Err(Error::Protocol(8));
757 }
758 Ok((self, false))
760 }
761
762 #[cfg(feature = "rustls")]
764 async fn do_tls_upgrade(self) -> Result<Self, Error> {
765 let reader_inner = self.reader.into_inner();
767 let stream = reader_inner.unsplit(self.writer);
768
769 let tcp = match stream {
771 FtpStream::Plain(tcp) => tcp,
772 FtpStream::Tls(_) => {
773 return Err(Error::Http("AUTH TLS on already-encrypted connection".to_string()));
774 }
775 };
776
777 let connector = self
779 .tls_connector
780 .as_ref()
781 .ok_or_else(|| Error::Http("No TLS connector available for AUTH TLS".to_string()))?;
782 let (tls_stream, _) = connector.connect(tcp, &self.hostname).await?;
783
784 let ftp_stream = FtpStream::Tls(tls_stream);
786 let (reader, writer) = tokio::io::split(ftp_stream);
787
788 Ok(Self {
789 reader: BufReader::new(reader),
790 writer,
791 features: self.features,
792 hostname: self.hostname,
793 port: self.port,
794 user: self.user,
795 local_addr: self.local_addr,
796 use_tls_data: false,
797 active_port: self.active_port,
798 tls_connector: self.tls_connector,
799 config: self.config,
800 header_bytes: self.header_bytes,
801 current_dir: self.current_dir,
802 home_dir: self.home_dir,
803 current_type: self.current_type,
804 proxy_config: self.proxy_config,
805 connect_response_bytes: self.connect_response_bytes,
806 })
807 }
808
809 #[cfg(feature = "rustls")]
815 async fn setup_data_protection(&mut self) -> Result<(), Error> {
816 send_command(&mut self.writer, "PBSZ 0").await?;
818 let _pbsz_resp = self.read_and_record().await?;
819 let prot_cmd = if self.config.ssl_control { "PROT C" } else { "PROT P" };
824 send_command(&mut self.writer, prot_cmd).await?;
825 let prot_resp = self.read_and_record().await?;
826 if prot_resp.is_complete() {
828 self.use_tls_data = !self.config.ssl_control;
830 }
831
832 if self.config.ssl_ccc {
836 send_command(&mut self.writer, "CCC").await?;
837 let _ccc_resp = self.read_and_record().await?;
838 }
840
841 Ok(())
842 }
843
844 pub fn set_active_port(&mut self, addr: &str) {
850 self.active_port = Some(addr.to_string());
851 }
852
853 async fn login(
861 &mut self,
862 user: &str,
863 pass: &str,
864 skip_login: bool,
865 alternative_to_user: Option<&str>,
866 ) -> Result<(), Error> {
867 if skip_login {
868 return Ok(());
869 }
870
871 send_command(&mut self.writer, &format!("USER {user}")).await?;
872 let user_resp = self.read_and_record().await?;
873
874 if user_resp.code == 331 {
875 send_command(&mut self.writer, &format!("PASS {pass}")).await?;
877 let pass_resp = self.read_and_record().await?;
878 if pass_resp.code == 332 {
879 if self.config.account.is_none() {
882 return Err(Error::Transfer {
883 code: 67,
884 message: format!("Access denied: {} {}", pass_resp.code, pass_resp.message),
885 });
886 }
887 } else if !pass_resp.is_complete() {
888 return Err(Error::Transfer {
889 code: 67,
890 message: format!("Access denied: {} {}", pass_resp.code, pass_resp.message),
891 });
892 }
893 } else if user_resp.is_complete() {
894 } else if alternative_to_user.is_some() {
896 let alt = alternative_to_user.unwrap_or_default();
898 send_command(&mut self.writer, alt).await?;
899 let alt_resp = self.read_and_record().await?;
900 if alt_resp.code == 331 {
901 send_command(&mut self.writer, &format!("PASS {pass}")).await?;
903 let pass_resp = self.read_and_record().await?;
904 if !pass_resp.is_complete() && pass_resp.code != 332 {
905 return Err(Error::Transfer {
906 code: 67,
907 message: format!("Access denied: {} {}", pass_resp.code, pass_resp.message),
908 });
909 }
910 } else if !alt_resp.is_complete() {
911 return Err(Error::Transfer {
912 code: 67,
913 message: format!("Access denied: {} {}", alt_resp.code, alt_resp.message),
914 });
915 }
916 } else {
917 return Err(Error::Transfer {
918 code: 67,
919 message: format!("Access denied: {} {}", user_resp.code, user_resp.message),
920 });
921 }
922
923 Ok(())
924 }
925
926 async fn pwd_safe(&mut self) -> Option<String> {
928 if send_command(&mut self.writer, "PWD").await.is_err() {
929 return None;
930 }
931 match self.read_and_record().await {
932 Ok(resp) if resp.is_complete() => {
933 if let Some(start) = resp.message.find('"') {
937 if let Some(end) = resp.message[start + 1..].find('"') {
938 return Some(resp.message[start + 1..start + 1 + end].to_string());
939 }
940 return None;
942 }
943 Some(resp.message)
944 }
945 _ => None,
946 }
947 }
948
949 pub async fn feat(&mut self) -> Result<&FtpFeatures, Error> {
956 send_command(&mut self.writer, "FEAT").await?;
957 let resp = self.read_and_record().await?;
958
959 let features = if resp.is_complete() {
960 parse_feat_response(&resp.message)
961 } else {
962 FtpFeatures::default()
964 };
965
966 self.features = Some(features);
967 Ok(self.features.get_or_insert_with(FtpFeatures::default))
969 }
970
971 pub async fn set_type(&mut self, transfer_type: TransferType) -> Result<(), Error> {
977 let type_cmd = match transfer_type {
978 TransferType::Ascii => "TYPE A",
979 TransferType::Binary => "TYPE I",
980 };
981 send_command(&mut self.writer, type_cmd).await?;
982 let resp = self.read_and_record().await?;
983 if !resp.is_complete() {
984 return Err(Error::Http(format!("FTP TYPE failed: {} {}", resp.code, resp.message)));
985 }
986 Ok(())
987 }
988
989 async fn open_data_connection(&mut self) -> Result<DataConnection, Error> {
994 if let Some(ref addr) = self.active_port {
995 let addr = addr.clone();
996 self.open_active_data_connection(&addr).await
997 } else {
998 let stream = self.open_passive_data_connection(None).await?;
999 Ok(DataConnection::Connected(stream))
1000 }
1001 }
1002
1003 async fn open_data_connection_with_pret(
1008 &mut self,
1009 pret_cmd: &str,
1010 ) -> Result<DataConnection, Error> {
1011 if let Some(ref addr) = self.active_port {
1012 let addr = addr.clone();
1013 self.open_active_data_connection(&addr).await
1014 } else {
1015 let pret = if self.config.use_pret { Some(pret_cmd) } else { None };
1016 let stream = self.open_passive_data_connection(pret).await?;
1017 Ok(DataConnection::Connected(stream))
1018 }
1019 }
1020
    /// Opens a passive-mode data connection and returns the connected
    /// (optionally TLS-wrapped) stream.
    ///
    /// Order of operations:
    /// 1. Optional `PRET <cmd>`; a reply code of 400 or above aborts.
    /// 2. `EPSV`, when enabled in the config or forced because the control
    ///    connection is IPv6. On success the data connection goes to the
    ///    control host on the announced port.
    /// 3. `PASV` as fallback when `EPSV` was rejected or its data connection
    ///    failed. The fallback is never taken when `EPSV` was forced. After
    ///    a fallback `config.use_epsv` is cleared so later transfers skip
    ///    `EPSV`.
    ///
    /// # Errors
    ///
    /// * `Error::Transfer` code 84 for a rejected `PRET`, code 13 for a
    ///   failed forced `EPSV` or a failed `PASV`.
    /// * `Error::Http` when the data connection cannot be established.
    /// * Any reply-parsing, TLS or control I/O error.
    async fn open_passive_data_connection(
        &mut self,
        pret_cmd: Option<&str>,
    ) -> Result<FtpStream, Error> {
        if let Some(cmd) = pret_cmd {
            send_command(&mut self.writer, &format!("PRET {cmd}")).await?;
            let pret_resp = self.read_and_record().await?;
            if pret_resp.code >= 400 {
                return Err(Error::Transfer {
                    code: 84,
                    message: format!(
                        "PRET command not accepted: {} {}",
                        pret_resp.code, pret_resp.message
                    ),
                });
            }
        }
        // On an IPv6 control connection EPSV is mandatory: the PASV fallback
        // below is never taken (see the error text further down).
        let force_epsv = self.local_addr.is_ipv6();

        if self.config.use_epsv || force_epsv {
            send_command(&mut self.writer, "EPSV").await?;
            let epsv_resp = self.read_and_record().await?;
            if epsv_resp.code == 229 {
                match parse_epsv_response(&epsv_resp.message) {
                    Ok(data_port) => {
                        // EPSV announces only a port; the host is the control host.
                        let tcp = self.connect_data(&self.hostname.clone(), data_port).await;
                        match tcp {
                            Ok(tcp) => {
                                // NOTE(review): the synthetic CONNECT response is
                                // recorded here but not on the PASV path below —
                                // confirm whether that asymmetry is intended.
                                if matches!(
                                    self.proxy_config,
                                    Some(FtpProxyConfig::HttpConnect { .. })
                                ) {
                                    self.connect_response_bytes.extend_from_slice(
                                        b"HTTP/1.1 200 Connection established\r\n\r\n",
                                    );
                                }
                                return self.maybe_wrap_data_tls(tcp).await;
                            }
                            Err(e) => {
                                if force_epsv {
                                    return Err(Error::Http(format!(
                                        "FTP EPSV data connection failed: {e}"
                                    )));
                                }
                                // Remember the failure and fall through to PASV.
                                self.config.use_epsv = false;
                            }
                        }
                    }
                    Err(e) => {
                        // A malformed 229 reply is fatal; PASV is not attempted.
                        return Err(e);
                    }
                }
            } else if force_epsv {
                return Err(Error::Transfer {
                    code: 13,
                    message: format!(
                        "FTP EPSV failed: {} {} (PASV not available for IPv6)",
                        epsv_resp.code, epsv_resp.message
                    ),
                });
            } else {
                // EPSV rejected: do not try it again on this session.
                self.config.use_epsv = false;
            }
        }

        send_command(&mut self.writer, "PASV").await?;
        let pasv_resp = self.read_and_record().await?;
        if pasv_resp.code != 227 {
            return Err(Error::Transfer {
                code: 13,
                message: format!("FTP PASV failed: {} {}", pasv_resp.code, pasv_resp.message),
            });
        }
        let (data_host, data_port) = parse_pasv_response(&pasv_resp.message)?;

        // Optionally distrust the address in the PASV reply and reuse the
        // control host instead.
        let effective_host =
            if self.config.skip_pasv_ip { self.hostname.clone() } else { data_host };

        let tcp = self
            .connect_data(&effective_host, data_port)
            .await
            .map_err(|e| Error::Http(format!("FTP data connection failed: {e}")))?;

        self.maybe_wrap_data_tls(tcp).await
    }
1130
1131 async fn connect_data(&self, host: &str, port: u16) -> Result<TcpStream, Error> {
1133 if let Some(ref proxy) = self.proxy_config {
1134 connect_via_proxy(proxy, host, port).await
1135 } else {
1136 let data_addr = format!("{host}:{port}");
1137 TcpStream::connect(&data_addr).await.map_err(Error::Connect)
1138 }
1139 }
1140
    /// Prepares an active-mode data connection: binds a listener on an
    /// ephemeral port and announces it to the server with `EPRT` and/or
    /// `PORT`.
    ///
    /// `bind_addr` is either `"-"` (use the control connection's local
    /// address for both binding and announcing) or an IP address to
    /// announce; in the latter case the listener binds to the unspecified
    /// address of the same family.
    ///
    /// The inbound connection is not accepted here; the returned
    /// `DataConnection::PendingActive` is resolved later via `into_stream`.
    ///
    /// # Errors
    ///
    /// * `Error::Http` for an unparsable `bind_addr` or a bind/`local_addr`
    ///   failure.
    /// * `Error::Transfer` code 30 when `EPRT` fails on IPv6 or `PORT` is
    ///   rejected.
    /// * Any control I/O error.
    async fn open_active_data_connection(
        &mut self,
        bind_addr: &str,
    ) -> Result<DataConnection, Error> {
        // Address that will be announced to the server.
        let advertise_ip: std::net::IpAddr = if bind_addr == "-" {
            self.local_addr.ip()
        } else {
            bind_addr.parse().map_err(|e| {
                Error::Http(format!("Invalid FTP active address '{bind_addr}': {e}"))
            })?
        };

        // Address the listener actually binds to.
        let bind_ip = if bind_addr == "-" {
            self.local_addr.ip()
        } else if advertise_ip.is_ipv6() {
            std::net::IpAddr::V6(std::net::Ipv6Addr::UNSPECIFIED)
        } else {
            std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED)
        };
        // Port 0 lets the operating system pick a free port.
        let bind = SocketAddr::new(bind_ip, 0);
        let listener = tokio::net::TcpListener::bind(bind)
            .await
            .map_err(|e| Error::Http(format!("FTP active mode bind failed: {e}")))?;
        let listen_addr = listener
            .local_addr()
            .map_err(|e| Error::Http(format!("FTP active mode local_addr failed: {e}")))?;
        let advertise_addr = SocketAddr::new(advertise_ip, listen_addr.port());
        let local_ip = advertise_ip;

        // Set once the server has accepted EPRT.
        let mut port_ok = false;

        if local_ip.is_ipv6() || self.config.use_eprt {
            let eprt_cmd = format_eprt_command(&advertise_addr);
            send_command(&mut self.writer, &eprt_cmd).await?;
            let resp = self.read_and_record().await?;
            if resp.is_complete() {
                port_ok = true;
            } else if local_ip.is_ipv6() {
                // No PORT fallback is attempted for IPv6.
                return Err(Error::Transfer {
                    code: 30,
                    message: format!("FTP EPRT failed: {} {}", resp.code, resp.message),
                });
            }
            if !port_ok {
                // EPRT rejected: do not try it again on this session.
                self.config.use_eprt = false;
            }
        }

        if !port_ok && local_ip.is_ipv4() {
            let port_cmd = format_port_command(&advertise_addr);
            send_command(&mut self.writer, &port_cmd).await?;
            let resp = self.read_and_record().await?;
            if !resp.is_complete() {
                return Err(Error::Transfer {
                    code: 30,
                    message: format!("FTP PORT failed: {} {}", resp.code, resp.message),
                });
            }
        }

        Ok(DataConnection::PendingActive { listener, use_tls: self.use_tls_data })
    }
1227
1228 async fn maybe_wrap_data_tls(&self, tcp: TcpStream) -> Result<FtpStream, Error> {
1233 #[cfg(feature = "rustls")]
1234 if self.use_tls_data {
1235 if let Some(ref connector) = self.tls_connector {
1236 let (tls_stream, _) = connector.connect(tcp, &self.hostname).await?;
1237 return Ok(FtpStream::Tls(tls_stream));
1238 }
1239 }
1240
1241 Ok(FtpStream::Plain(tcp))
1242 }
1243
1244 pub async fn download(&mut self, path: &str) -> Result<Vec<u8>, Error> {
1250 self.set_type(TransferType::Binary).await?;
1251 let data_conn = self.open_data_connection().await?;
1252 let mut data_stream = data_conn.into_stream(self, None).await?;
1253
1254 send_command(&mut self.writer, &format!("RETR {path}")).await?;
1255 let retr_resp = self.read_and_record().await?;
1256 if !retr_resp.is_preliminary() && !retr_resp.is_complete() {
1257 return Err(Error::Http(format!(
1258 "FTP RETR failed: {} {}",
1259 retr_resp.code, retr_resp.message
1260 )));
1261 }
1262
1263 let mut data = Vec::new();
1264 let _ = data_stream
1265 .read_to_end(&mut data)
1266 .await
1267 .map_err(|e| Error::Http(format!("FTP data read error: {e}")))?;
1268 drop(data_stream);
1269
1270 let complete_resp = self.read_and_record().await?;
1271 if !complete_resp.is_complete() {
1272 return Err(Error::Http(format!(
1273 "FTP transfer failed: {} {}",
1274 complete_resp.code, complete_resp.message
1275 )));
1276 }
1277
1278 Ok(data)
1279 }
1280
1281 pub async fn download_resume(&mut self, path: &str, offset: u64) -> Result<Vec<u8>, Error> {
1287 self.set_type(TransferType::Binary).await?;
1288 let data_conn = self.open_data_connection().await?;
1289 let mut data_stream = data_conn.into_stream(self, None).await?;
1290
1291 send_command(&mut self.writer, &format!("REST {offset}")).await?;
1293 let rest_resp = self.read_and_record().await?;
1294 if !rest_resp.is_intermediate() {
1295 return Err(Error::Http(format!(
1296 "FTP REST failed: {} {}",
1297 rest_resp.code, rest_resp.message
1298 )));
1299 }
1300
1301 send_command(&mut self.writer, &format!("RETR {path}")).await?;
1302 let retr_resp = self.read_and_record().await?;
1303 if !retr_resp.is_preliminary() && !retr_resp.is_complete() {
1304 return Err(Error::Http(format!(
1305 "FTP RETR failed: {} {}",
1306 retr_resp.code, retr_resp.message
1307 )));
1308 }
1309
1310 let mut data = Vec::new();
1311 let _ = data_stream
1312 .read_to_end(&mut data)
1313 .await
1314 .map_err(|e| Error::Http(format!("FTP data read error: {e}")))?;
1315 drop(data_stream);
1316
1317 let complete_resp = self.read_and_record().await?;
1318 if !complete_resp.is_complete() {
1319 return Err(Error::Http(format!(
1320 "FTP transfer failed: {} {}",
1321 complete_resp.code, complete_resp.message
1322 )));
1323 }
1324
1325 Ok(data)
1326 }
1327
1328 pub async fn upload(&mut self, path: &str, data: &[u8]) -> Result<(), Error> {
1334 self.set_type(TransferType::Binary).await?;
1335 let data_conn = self.open_data_connection().await?;
1336 let mut data_stream = data_conn.into_stream(self, None).await?;
1337
1338 send_command(&mut self.writer, &format!("STOR {path}")).await?;
1339 let stor_resp = self.read_and_record().await?;
1340 if !stor_resp.is_preliminary() && !stor_resp.is_complete() {
1341 return Err(Error::Http(format!(
1342 "FTP STOR failed: {} {}",
1343 stor_resp.code, stor_resp.message
1344 )));
1345 }
1346
1347 data_stream
1348 .write_all(data)
1349 .await
1350 .map_err(|e| Error::Http(format!("FTP data write error: {e}")))?;
1351 data_stream
1352 .shutdown()
1353 .await
1354 .map_err(|e| Error::Http(format!("FTP data shutdown error: {e}")))?;
1355 drop(data_stream);
1356
1357 let complete_resp = self.read_and_record().await?;
1358 if !complete_resp.is_complete() {
1359 return Err(Error::Http(format!(
1360 "FTP upload failed: {} {}",
1361 complete_resp.code, complete_resp.message
1362 )));
1363 }
1364
1365 Ok(())
1366 }
1367
1368 pub async fn append(&mut self, path: &str, data: &[u8]) -> Result<(), Error> {
1374 self.set_type(TransferType::Binary).await?;
1375 let data_conn = self.open_data_connection().await?;
1376 let mut data_stream = data_conn.into_stream(self, None).await?;
1377
1378 send_command(&mut self.writer, &format!("APPE {path}")).await?;
1379 let appe_resp = self.read_and_record().await?;
1380 if !appe_resp.is_preliminary() && !appe_resp.is_complete() {
1381 return Err(Error::Http(format!(
1382 "FTP APPE failed: {} {}",
1383 appe_resp.code, appe_resp.message
1384 )));
1385 }
1386
1387 data_stream
1388 .write_all(data)
1389 .await
1390 .map_err(|e| Error::Http(format!("FTP data write error: {e}")))?;
1391 data_stream
1392 .shutdown()
1393 .await
1394 .map_err(|e| Error::Http(format!("FTP data shutdown error: {e}")))?;
1395 drop(data_stream);
1396
1397 let complete_resp = self.read_and_record().await?;
1398 if !complete_resp.is_complete() {
1399 return Err(Error::Http(format!(
1400 "FTP append failed: {} {}",
1401 complete_resp.code, complete_resp.message
1402 )));
1403 }
1404
1405 Ok(())
1406 }
1407
1408 pub async fn list(&mut self, path: Option<&str>) -> Result<Vec<u8>, Error> {
1414 if let Some(dir) = path {
1415 if !dir.is_empty() && dir != "/" {
1416 send_command(&mut self.writer, &format!("CWD {dir}")).await?;
1417 let cwd_resp = self.read_and_record().await?;
1418 if !cwd_resp.is_complete() {
1419 return Err(Error::Http(format!(
1420 "FTP CWD failed: {} {}",
1421 cwd_resp.code, cwd_resp.message
1422 )));
1423 }
1424 }
1425 }
1426
1427 let data_conn = self.open_data_connection().await?;
1428 let mut data_stream = data_conn.into_stream(self, None).await?;
1429
1430 send_command(&mut self.writer, "LIST").await?;
1431 let list_resp = self.read_and_record().await?;
1432 if !list_resp.is_preliminary() && !list_resp.is_complete() {
1433 return Err(Error::Http(format!(
1434 "FTP LIST failed: {} {}",
1435 list_resp.code, list_resp.message
1436 )));
1437 }
1438
1439 let mut data = Vec::new();
1440 let _ = data_stream
1441 .read_to_end(&mut data)
1442 .await
1443 .map_err(|e| Error::Http(format!("FTP data read error: {e}")))?;
1444 drop(data_stream);
1445
1446 let complete_resp = self.read_and_record().await?;
1447 if !complete_resp.is_complete() {
1448 return Err(Error::Http(format!(
1449 "FTP transfer failed: {} {}",
1450 complete_resp.code, complete_resp.message
1451 )));
1452 }
1453
1454 Ok(data)
1455 }
1456
1457 pub async fn mlsd(&mut self, path: Option<&str>) -> Result<Vec<u8>, Error> {
1463 let data_conn = self.open_data_connection().await?;
1464 let mut data_stream = data_conn.into_stream(self, None).await?;
1465
1466 let cmd = path.map_or_else(|| "MLSD".to_string(), |p| format!("MLSD {p}"));
1467 send_command(&mut self.writer, &cmd).await?;
1468 let resp = self.read_and_record().await?;
1469 if !resp.is_preliminary() && !resp.is_complete() {
1470 return Err(Error::Http(format!("FTP MLSD failed: {} {}", resp.code, resp.message)));
1471 }
1472
1473 let mut data = Vec::new();
1474 let _ = data_stream
1475 .read_to_end(&mut data)
1476 .await
1477 .map_err(|e| Error::Http(format!("FTP data read error: {e}")))?;
1478 drop(data_stream);
1479
1480 let complete_resp = self.read_and_record().await?;
1481 if !complete_resp.is_complete() {
1482 return Err(Error::Http(format!(
1483 "FTP MLSD transfer failed: {} {}",
1484 complete_resp.code, complete_resp.message
1485 )));
1486 }
1487
1488 Ok(data)
1489 }
1490
1491 pub async fn size(&mut self, path: &str) -> Result<u64, Error> {
1497 send_command(&mut self.writer, &format!("SIZE {path}")).await?;
1498 let resp = self.read_and_record().await?;
1499 if !resp.is_complete() {
1500 return Err(Error::Http(format!("FTP SIZE failed: {} {}", resp.code, resp.message)));
1501 }
1502 resp.message
1503 .trim()
1504 .parse::<u64>()
1505 .map_err(|e| Error::Http(format!("FTP SIZE parse error: {e}")))
1506 }
1507
1508 pub async fn mkdir(&mut self, path: &str) -> Result<(), Error> {
1514 send_command(&mut self.writer, &format!("MKD {path}")).await?;
1515 let resp = self.read_and_record().await?;
1516 if !resp.is_complete() {
1517 return Err(Error::Http(format!("FTP MKD failed: {} {}", resp.code, resp.message)));
1518 }
1519 Ok(())
1520 }
1521
1522 pub async fn rmdir(&mut self, path: &str) -> Result<(), Error> {
1528 send_command(&mut self.writer, &format!("RMD {path}")).await?;
1529 let resp = self.read_and_record().await?;
1530 if !resp.is_complete() {
1531 return Err(Error::Http(format!("FTP RMD failed: {} {}", resp.code, resp.message)));
1532 }
1533 Ok(())
1534 }
1535
1536 pub async fn delete(&mut self, path: &str) -> Result<(), Error> {
1542 send_command(&mut self.writer, &format!("DELE {path}")).await?;
1543 let resp = self.read_and_record().await?;
1544 if !resp.is_complete() {
1545 return Err(Error::Http(format!("FTP DELE failed: {} {}", resp.code, resp.message)));
1546 }
1547 Ok(())
1548 }
1549
1550 pub async fn rename(&mut self, from: &str, to: &str) -> Result<(), Error> {
1556 send_command(&mut self.writer, &format!("RNFR {from}")).await?;
1557 let rnfr_resp = self.read_and_record().await?;
1558 if !rnfr_resp.is_intermediate() {
1559 return Err(Error::Http(format!(
1560 "FTP RNFR failed: {} {}",
1561 rnfr_resp.code, rnfr_resp.message
1562 )));
1563 }
1564
1565 send_command(&mut self.writer, &format!("RNTO {to}")).await?;
1566 let rnto_resp = self.read_and_record().await?;
1567 if !rnto_resp.is_complete() {
1568 return Err(Error::Http(format!(
1569 "FTP RNTO failed: {} {}",
1570 rnto_resp.code, rnto_resp.message
1571 )));
1572 }
1573 Ok(())
1574 }
1575
1576 pub async fn site(&mut self, command: &str) -> Result<FtpResponse, Error> {
1582 send_command(&mut self.writer, &format!("SITE {command}")).await?;
1583 self.read_and_record().await
1584 }
1585
1586 pub async fn pwd(&mut self) -> Result<String, Error> {
1592 send_command(&mut self.writer, "PWD").await?;
1593 let resp = self.read_and_record().await?;
1594 if !resp.is_complete() {
1595 return Err(Error::Http(format!("FTP PWD failed: {} {}", resp.code, resp.message)));
1596 }
1597 if let Some(start) = resp.message.find('"') {
1600 if let Some(end) = resp.message[start + 1..].find('"') {
1601 return Ok(resp.message[start + 1..start + 1 + end].to_string());
1602 }
1603 }
1604 Ok(resp.message)
1605 }
1606
1607 pub async fn cwd(&mut self, path: &str) -> Result<(), Error> {
1613 send_command(&mut self.writer, &format!("CWD {path}")).await?;
1614 let resp = self.read_and_record().await?;
1615 if !resp.is_complete() {
1616 return Err(Error::Http(format!("FTP CWD failed: {} {}", resp.code, resp.message)));
1617 }
1618 Ok(())
1619 }
1620
1621 #[allow(dead_code)]
1632 async fn navigate_to_path(&mut self, path: &str) -> Result<String, Error> {
1633 match self.config.method {
1634 FtpMethod::NoCwd => Ok(path.to_string()),
1635 FtpMethod::SingleCwd => {
1636 if let Some((dir, file)) = path.rsplit_once('/') {
1637 if !dir.is_empty() {
1638 self.cwd(dir).await?;
1639 }
1640 Ok(file.to_string())
1641 } else {
1642 Ok(path.to_string())
1643 }
1644 }
1645 FtpMethod::MultiCwd => {
1646 if let Some((dir, file)) = path.rsplit_once('/') {
1647 for component in dir.split('/') {
1648 if !component.is_empty() {
1649 self.cwd(component).await?;
1650 }
1651 }
1652 Ok(file.to_string())
1653 } else {
1654 Ok(path.to_string())
1655 }
1656 }
1657 }
1658 }
1659
1660 #[allow(dead_code)]
1671 async fn create_dirs(&mut self, dir_path: &str) -> Result<(), Error> {
1672 for component in dir_path.split('/') {
1673 if component.is_empty() {
1674 continue;
1675 }
1676 send_command(&mut self.writer, &format!("CWD {component}")).await?;
1678 let cwd_resp = self.read_and_record().await?;
1679 if cwd_resp.is_complete() {
1680 continue;
1681 }
1682 send_command(&mut self.writer, &format!("MKD {component}")).await?;
1684 let mkd_resp = self.read_and_record().await?;
1685 if !mkd_resp.is_complete() {
1686 return Err(Error::Http(format!(
1687 "FTP MKD failed for '{}': {} {}",
1688 component, mkd_resp.code, mkd_resp.message
1689 )));
1690 }
1691 send_command(&mut self.writer, &format!("CWD {component}")).await?;
1692 let retry_resp = self.read_and_record().await?;
1693 if !retry_resp.is_complete() {
1694 return Err(Error::Http(format!(
1695 "FTP CWD failed after MKD for '{}': {} {}",
1696 component, retry_resp.code, retry_resp.message
1697 )));
1698 }
1699 }
1700 let _ = self.cwd("/").await;
1702 Ok(())
1703 }
1704
1705 pub async fn quit(&mut self) -> Result<(), Error> {
1714 let _ = send_command(&mut self.writer, "QUIT").await;
1715 let _ = tokio::time::timeout(
1718 std::time::Duration::from_millis(500),
1719 read_response(&mut self.reader),
1720 )
1721 .await;
1722 Ok(())
1723 }
1724
1725 #[must_use]
1727 pub fn can_reuse(&self, host: &str, port: u16, user: &str) -> bool {
1728 self.hostname == host && self.port == port && self.user == user
1729 }
1730
1731 pub fn take_connect_response_bytes(&mut self) -> Vec<u8> {
1733 std::mem::take(&mut self.connect_response_bytes)
1734 }
1735}
1736
1737pub async fn read_response<S: AsyncRead + Unpin>(
1745 stream: &mut BufReader<S>,
1746) -> Result<FtpResponse, Error> {
1747 let mut full_message = String::new();
1748 let mut final_code: Option<u16> = None;
1749 let mut raw_bytes = Vec::new();
1750
1751 loop {
1752 let mut line = String::new();
1753 let bytes_read = stream
1754 .read_line(&mut line)
1755 .await
1756 .map_err(|e| Error::Http(format!("FTP read error: {e}")))?;
1757
1758 if bytes_read == 0 {
1759 return Err(Error::Http("FTP connection closed unexpectedly".to_string()));
1760 }
1761
1762 if line.ends_with("\r\n") {
1764 raw_bytes.extend_from_slice(line.as_bytes());
1765 } else if line.ends_with('\n') {
1766 raw_bytes.extend_from_slice(&line.as_bytes()[..line.len() - 1]);
1767 raw_bytes.extend_from_slice(b"\r\n");
1768 } else {
1769 raw_bytes.extend_from_slice(line.as_bytes());
1770 raw_bytes.extend_from_slice(b"\r\n");
1771 }
1772
1773 let line = line.trim_end_matches('\n').trim_end_matches('\r');
1774
1775 if line.len() < 4 {
1776 full_message.push_str(line);
1778 full_message.push('\n');
1779 continue;
1780 }
1781
1782 let code_str = &line[..3];
1783 let separator = line.as_bytes().get(3).copied();
1784
1785 if let Ok(code) = code_str.parse::<u16>() {
1786 match separator {
1787 Some(b' ') => {
1788 let msg = &line[4..];
1790 full_message.push_str(msg);
1791 final_code = Some(code);
1792 break;
1793 }
1794 Some(b'-') => {
1795 let msg = &line[4..];
1797 full_message.push_str(msg);
1798 full_message.push('\n');
1799 if final_code.is_none() {
1800 final_code = Some(code);
1801 }
1802 }
1803 _ => {
1804 full_message.push_str(line);
1806 full_message.push('\n');
1807 }
1808 }
1809 } else {
1810 full_message.push_str(line);
1812 full_message.push('\n');
1813 }
1814 }
1815
1816 let code =
1817 final_code.ok_or_else(|| Error::Http("FTP response has no status code".to_string()))?;
1818
1819 Ok(FtpResponse { code, message: full_message, raw_bytes })
1820}
1821
1822pub async fn send_command<S: AsyncWrite + Unpin>(
1828 stream: &mut S,
1829 command: &str,
1830) -> Result<(), Error> {
1831 let cmd = format!("{command}\r\n");
1832 stream
1833 .write_all(cmd.as_bytes())
1834 .await
1835 .map_err(|e| Error::Http(format!("FTP write error: {e}")))?;
1836 stream.flush().await.map_err(|e| Error::Http(format!("FTP flush error: {e}")))?;
1837 Ok(())
1838}
1839
1840pub fn parse_pasv_response(message: &str) -> Result<(String, u16), Error> {
1848 let start = message.find('(').ok_or_else(|| Error::Transfer {
1850 code: 14,
1851 message: "PASV response missing address".to_string(),
1852 })?;
1853 let end = message.find(')').ok_or_else(|| Error::Transfer {
1854 code: 14,
1855 message: "PASV response missing closing paren".to_string(),
1856 })?;
1857
1858 let nums: Vec<u16> =
1859 message[start + 1..end].split(',').filter_map(|s| s.trim().parse().ok()).collect();
1860
1861 if nums.len() != 6 {
1862 return Err(Error::Transfer {
1863 code: 14,
1864 message: format!("PASV response has {} numbers, expected 6", nums.len()),
1865 });
1866 }
1867
1868 if nums[0] > 255 || nums[1] > 255 || nums[2] > 255 || nums[3] > 255 {
1870 return Err(Error::Transfer {
1871 code: 14,
1872 message: format!(
1873 "PASV response has invalid IP: {}.{}.{}.{}",
1874 nums[0], nums[1], nums[2], nums[3]
1875 ),
1876 });
1877 }
1878
1879 if nums[4] > 255 || nums[5] > 255 {
1881 return Err(Error::Transfer {
1882 code: 14,
1883 message: format!("PASV response has invalid port values: {},{}", nums[4], nums[5]),
1884 });
1885 }
1886
1887 let host = format!("{}.{}.{}.{}", nums[0], nums[1], nums[2], nums[3]);
1888 let port = nums[4] * 256 + nums[5];
1889
1890 Ok((host, port))
1891}
1892
1893pub fn parse_epsv_response(message: &str) -> Result<u16, Error> {
1901 let start = message.find("|||").ok_or_else(|| Error::Transfer {
1903 code: 13,
1904 message: "EPSV response missing port delimiter".to_string(),
1905 })?;
1906 let rest = &message[start + 3..];
1907 let end = rest.find('|').ok_or_else(|| Error::Transfer {
1908 code: 13,
1909 message: "EPSV response missing closing delimiter".to_string(),
1910 })?;
1911
1912 let port_num: u32 = rest[..end].parse().map_err(|e| Error::Transfer {
1914 code: 13,
1915 message: format!("EPSV port parse error: {e}"),
1916 })?;
1917
1918 if port_num == 0 || port_num > 65535 {
1919 return Err(Error::Transfer {
1920 code: 13,
1921 message: format!("EPSV port out of range: {port_num}"),
1922 });
1923 }
1924
1925 #[allow(clippy::cast_possible_truncation)]
1926 Ok(port_num as u16)
1927}
1928
1929#[must_use]
1935pub fn parse_feat_response(message: &str) -> FtpFeatures {
1936 let mut features = FtpFeatures::default();
1937 for line in message.lines() {
1938 let feature = line.trim().to_uppercase();
1939 if feature.starts_with("EPSV") {
1940 features.epsv = true;
1941 } else if feature.starts_with("MLST") {
1942 features.mlst = true;
1943 } else if feature.starts_with("REST") && feature.contains("STREAM") {
1944 features.rest_stream = true;
1945 } else if feature.starts_with("SIZE") {
1946 features.size = true;
1947 } else if feature.starts_with("UTF8") {
1948 features.utf8 = true;
1949 } else if feature.starts_with("AUTH") && feature.contains("TLS") {
1950 features.auth_tls = true;
1951 }
1952 if !feature.is_empty() {
1953 features.raw.push(line.trim().to_string());
1954 }
1955 }
1956 features
1957}
1958
1959#[must_use]
1964pub fn format_port_command(addr: &SocketAddr) -> String {
1965 match addr.ip() {
1966 std::net::IpAddr::V4(ip) => {
1967 let octets = ip.octets();
1968 let port = addr.port();
1969 format!(
1970 "PORT {},{},{},{},{},{}",
1971 octets[0],
1972 octets[1],
1973 octets[2],
1974 octets[3],
1975 port / 256,
1976 port % 256
1977 )
1978 }
1979 std::net::IpAddr::V6(_) => {
1980 format_eprt_command(addr)
1982 }
1983 }
1984}
1985
/// Formats `EPRT |proto|address|port|` for `addr`.
///
/// `proto` is `1` for IPv4 and `2` for IPv6; the address is written in its
/// standard textual form.
#[must_use]
pub fn format_eprt_command(addr: &SocketAddr) -> String {
    let ip = addr.ip();
    let proto = if ip.is_ipv4() { 1 } else { 2 };
    format!("EPRT |{proto}|{ip}|{port}|", port = addr.port())
}
1997
1998#[allow(clippy::too_many_arguments)]
2003async fn connect_session(
2004 host: &str,
2005 port: u16,
2006 user: &str,
2007 pass: &str,
2008 ssl_mode: FtpSslMode,
2009 use_ssl: UseSsl,
2010 tls_config: &crate::tls::TlsConfig,
2011 config: FtpConfig,
2012 proxy: Option<FtpProxyConfig>,
2013) -> Result<FtpSession, Error> {
2014 match ssl_mode {
2015 FtpSslMode::None => {
2016 FtpSession::connect_maybe_proxy(host, port, user, pass, config, proxy).await
2017 }
2018 #[cfg(feature = "rustls")]
2019 _ => {
2020 if proxy.is_some() {
2022 return Err(Error::Http("FTPS through proxy is not yet supported".to_string()));
2023 }
2024 FtpSession::connect_with_tls(
2025 host, port, user, pass, ssl_mode, use_ssl, tls_config, config,
2026 )
2027 .await
2028 }
2029 #[cfg(not(feature = "rustls"))]
2030 _ => {
2031 let _ = (tls_config, config, use_ssl, proxy);
2032 Err(Error::Http("FTPS requires the 'rustls' feature".to_string()))
2033 }
2034 }
2035}
2036
2037#[allow(clippy::too_many_lines, clippy::too_many_arguments)]
2069pub async fn perform(
2070 url: &crate::url::Url,
2071 upload_data: Option<&[u8]>,
2072 ssl_mode: FtpSslMode,
2073 use_ssl: UseSsl,
2074 tls_config: &crate::tls::TlsConfig,
2075 resume_from: Option<u64>,
2076 config: &FtpConfig,
2077 credentials: Option<(&str, &str)>,
2078 ftp_session: &mut Option<FtpSession>,
2079 proxy: Option<FtpProxyConfig>,
2080) -> Result<Response, Error> {
2081 let range_end = config.range_end;
2082 let (host, port) = url.host_and_port()?;
2083 let raw_path = url.path();
2084
2085 let decoded_path = percent_decode(raw_path);
2087 let path = decoded_path.as_str();
2088
2089 let url_creds = url.credentials();
2092 let decoded_user;
2093 let decoded_pass;
2094 #[allow(clippy::option_if_let_else)]
2095 let (user, pass) = if let Some(creds) = credentials {
2096 creds
2097 } else if let Some((raw_user, raw_pass)) = url_creds {
2098 decoded_user = percent_decode(raw_user);
2099 decoded_pass = percent_decode(raw_pass);
2100 (decoded_user.as_str(), decoded_pass.as_str())
2101 } else {
2102 ("anonymous", "ftp@example.com")
2103 };
2104
2105 let is_dir_list = path.ends_with('/') && upload_data.is_none();
2107
2108 let (effective_path, type_override) = parse_ftp_type(path);
2110
2111 let is_reuse = if let Some(existing) = ftp_session.take() {
2113 if existing.can_reuse(&host, port, user) {
2114 *ftp_session = Some(existing);
2115 true
2116 } else {
2117 let mut old = existing;
2119 let _ = old.quit().await;
2120 drop(old);
2121 false
2122 }
2123 } else {
2124 false
2125 };
2126
2127 if !is_reuse {
2128 let new_session = connect_session(
2129 &host,
2130 port,
2131 user,
2132 pass,
2133 ssl_mode,
2134 use_ssl,
2135 tls_config,
2136 config.clone(),
2137 proxy,
2138 )
2139 .await?;
2140 *ftp_session = Some(new_session);
2141 }
2142
2143 let Some(session) = ftp_session.as_mut() else {
2145 return Err(Error::Http("internal: FTP session missing".to_string()));
2146 };
2147 let result = perform_inner(
2148 session,
2149 url,
2150 upload_data,
2151 resume_from,
2152 config,
2153 is_reuse,
2154 effective_path,
2155 type_override,
2156 is_dir_list,
2157 range_end,
2158 )
2159 .await;
2160
2161 if let Err(ref e) = result {
2163 if is_connection_error(e) {
2164 let _ = ftp_session.take();
2165 }
2166 }
2167 if matches!(&result, Err(Error::UrlParse(_))) {
2169 let _ = ftp_session.take();
2170 }
2171 if let Err(Error::Transfer { code, .. }) = &result {
2175 if matches!(code, 14 | 28 | 84) {
2176 let _ = ftp_session.take();
2177 }
2178 }
2179 if let Ok(ref resp) = result {
2183 if resp.body_error().is_some() {
2184 let _ = ftp_session.take();
2185 }
2186 }
2187
2188 result
2189}
2190
2191const fn is_connection_error(e: &Error) -> bool {
2193 matches!(e, Error::Connect(_) | Error::Io(_))
2194}
2195
2196async fn send_type_if_needed(
2200 session: &mut FtpSession,
2201 transfer_type: TransferType,
2202) -> Result<(), Error> {
2203 if session.current_type == Some(transfer_type) {
2204 return Ok(());
2205 }
2206 let cmd = match transfer_type {
2207 TransferType::Ascii => "TYPE A",
2208 TransferType::Binary => "TYPE I",
2209 };
2210 send_command(&mut session.writer, cmd).await?;
2211 let resp = session.read_and_record().await?;
2212 if !resp.is_complete() {
2213 return Err(Error::Transfer {
2214 code: 17,
2215 message: format!("FTP TYPE failed: {} {}", resp.code, resp.message),
2216 });
2217 }
2218 session.current_type = Some(transfer_type);
2219 Ok(())
2220}
2221
2222async fn execute_quote_commands(
2227 session: &mut FtpSession,
2228 commands: &[String],
2229) -> Result<(), Error> {
2230 for raw_cmd in commands {
2231 #[allow(clippy::option_if_let_else)]
2233 let (ignore_fail, actual_cmd) = if let Some(stripped) = raw_cmd.strip_prefix('*') {
2234 (true, stripped)
2235 } else {
2236 (false, raw_cmd.as_str())
2237 };
2238 send_command(&mut session.writer, actual_cmd).await?;
2239 let resp = session.read_and_record().await?;
2240 if !ignore_fail && !resp.is_complete() && !resp.is_preliminary() {
2241 return Err(Error::Transfer {
2242 code: 21,
2243 message: format!(
2244 "FTP quote command '{}' failed: {} {}",
2245 actual_cmd, resp.code, resp.message
2246 ),
2247 });
2248 }
2249 }
2250 Ok(())
2251}
2252
2253#[allow(clippy::too_many_lines, clippy::too_many_arguments)]
2258async fn perform_inner(
2259 session: &mut FtpSession,
2260 url: &crate::url::Url,
2261 upload_data: Option<&[u8]>,
2262 resume_from: Option<u64>,
2263 config: &FtpConfig,
2264 is_reuse: bool,
2265 effective_path: &str,
2266 type_override: Option<TransferType>,
2267 is_dir_list: bool,
2268 range_end: Option<u64>,
2269) -> Result<Response, Error> {
2270 if !is_reuse {
2271 let pwd_path = session.pwd_safe().await;
2273 session.home_dir.clone_from(&pwd_path);
2274
2275 let pwd_not_slash = pwd_path.as_ref().is_some_and(|p| !p.starts_with('/'));
2279 if pwd_not_slash {
2280 send_command(&mut session.writer, "SYST").await?;
2281 let syst_resp = session.read_and_record().await?;
2282 if syst_resp.is_complete() && syst_resp.message.contains("OS/400") {
2283 send_command(&mut session.writer, "SITE NAMEFMT 1").await?;
2285 let _site_resp = session.read_and_record().await?;
2286 let _pwd2 = session.pwd_safe().await;
2288 }
2289 }
2290 }
2291
2292 if effective_path.contains('\0') {
2294 return Err(Error::UrlParse("FTP path contains null byte".to_string()));
2295 }
2296
2297 let (dir_components, filename) = if is_dir_list {
2299 if config.method == FtpMethod::NoCwd {
2300 (Vec::new(), String::new())
2302 } else {
2303 let trimmed = effective_path.trim_start_matches('/');
2305 let trimmed = trimmed.trim_end_matches('/');
2306 if trimmed.is_empty() {
2307 if effective_path.starts_with("//") {
2311 (vec!["/"], String::new())
2312 } else {
2313 (Vec::new(), String::new())
2314 }
2315 } else {
2316 let components: Vec<&str> = trimmed.split('/').collect();
2317 (components, String::new())
2318 }
2319 }
2320 } else {
2321 split_path_for_method(effective_path, config.method)
2323 };
2324
2325 execute_quote_commands(session, &config.pre_quote).await?;
2327
2328 let target_dir: Vec<String> =
2332 dir_components.iter().filter(|c| !c.is_empty()).map(ToString::to_string).collect();
2333
2334 let need_cwd = if is_reuse {
2335 target_dir != session.current_dir
2336 } else {
2337 !target_dir.is_empty()
2339 };
2340
2341 if need_cwd {
2342 let did_reset_to_root = if is_reuse && !session.current_dir.is_empty() {
2345 let reset_dir = session.home_dir.clone().unwrap_or_else(|| "/".to_string());
2346 send_command(&mut session.writer, &format!("CWD {reset_dir}")).await?;
2347 let cwd_resp = session.read_and_record().await?;
2348 if !cwd_resp.is_complete() {
2349 return Err(Error::Transfer {
2350 code: 9,
2351 message: format!(
2352 "FTP CWD {reset_dir} failed: {} {}",
2353 cwd_resp.code, cwd_resp.message
2354 ),
2355 });
2356 }
2357 session.current_dir.clear();
2358 true
2359 } else {
2360 false
2361 };
2362
2363 for component in &dir_components {
2365 if component.is_empty() {
2366 continue;
2367 }
2368 if *component == "/" && did_reset_to_root {
2370 continue;
2371 }
2372 send_command(&mut session.writer, &format!("CWD {component}")).await?;
2373 let cwd_resp = session.read_and_record().await?;
2374 if !cwd_resp.is_complete() {
2375 if config.create_dirs {
2376 send_command(&mut session.writer, &format!("MKD {component}")).await?;
2378 let _mkd_resp = session.read_and_record().await?;
2379 send_command(&mut session.writer, &format!("CWD {component}")).await?;
2381 let retry_resp = session.read_and_record().await?;
2382 if !retry_resp.is_complete() {
2383 return Err(Error::Transfer {
2384 code: 9,
2385 message: format!(
2386 "FTP CWD failed after MKD: {} {}",
2387 retry_resp.code, retry_resp.message
2388 ),
2389 });
2390 }
2391 } else if cwd_resp.code == 421 {
2392 return Err(Error::Transfer {
2395 code: 28,
2396 message: format!(
2397 "FTP server timeout: {} {}",
2398 cwd_resp.code, cwd_resp.message
2399 ),
2400 });
2401 } else {
2402 return Err(Error::Transfer {
2403 code: 9,
2404 message: format!("FTP CWD failed: {} {}", cwd_resp.code, cwd_resp.message),
2405 });
2406 }
2407 }
2408 }
2409 session.current_dir = target_dir;
2410 }
2411
2412 if config.nobody {
2415 if is_dir_list {
2416 let raw = std::mem::take(&mut session.header_bytes);
2417 let headers = std::collections::HashMap::new();
2418 let mut resp = Response::new(200, headers, Vec::new(), url.as_str().to_string());
2419 resp.set_raw_headers(raw);
2420 return Ok(resp);
2421 }
2422 let mut last_modified: Option<String> = None;
2423 let mut content_length: Option<String> = None;
2424
2425 if !filename.is_empty() {
2427 send_command(&mut session.writer, &format!("MDTM {filename}")).await?;
2428 let mdtm_resp = session.read_and_record().await?;
2429 if mdtm_resp.is_complete() {
2430 let mdtm_str = mdtm_resp.message.trim();
2431 if let Some(date) = format_mdtm_as_http_date(mdtm_str) {
2432 last_modified = Some(date);
2433 }
2434 }
2435 }
2436
2437 send_type_if_needed(session, TransferType::Binary).await?;
2439
2440 if !filename.is_empty() {
2442 send_command(&mut session.writer, &format!("SIZE {filename}")).await?;
2443 let size_resp = session.read_and_record().await?;
2444 if size_resp.is_complete() {
2445 content_length = Some(size_resp.message.trim().to_string());
2446 }
2447 }
2448
2449 send_command(&mut session.writer, "REST 0").await?;
2451 let _rest_resp = session.read_and_record().await?;
2452
2453 let raw = std::mem::take(&mut session.header_bytes);
2454
2455 let mut body_text = String::new();
2457 if let Some(ref lm) = last_modified {
2458 body_text.push_str("Last-Modified: ");
2459 body_text.push_str(lm);
2460 body_text.push_str("\r\n");
2461 }
2462 if let Some(ref cl) = content_length {
2463 body_text.push_str("Content-Length: ");
2464 body_text.push_str(cl);
2465 body_text.push_str("\r\n");
2466 }
2467 body_text.push_str("Accept-ranges: bytes\r\n");
2468
2469 let mut headers = std::collections::HashMap::new();
2470 if let Some(ref cl) = content_length {
2471 let _old = headers.insert("content-length".to_string(), cl.clone());
2472 }
2473 if let Some(ref lm) = last_modified {
2474 let _old = headers.insert("last-modified".to_string(), lm.clone());
2475 }
2476 let mut resp =
2477 Response::new(200, headers, body_text.into_bytes(), url.as_str().to_string());
2478 resp.set_raw_headers(raw);
2479 return Ok(resp);
2480 }
2481
2482 if let Some(upload_bytes) = upload_data {
2484 if let Some((cond_ts, negate)) = config.time_condition {
2486 send_command(&mut session.writer, &format!("MDTM {filename}")).await?;
2487 let mdtm_resp = session.read_and_record().await?;
2488 if mdtm_resp.is_complete() {
2489 let mdtm_str = mdtm_resp.message.trim();
2490 if let Some(file_ts) = parse_mdtm_timestamp(mdtm_str) {
2491 let should_skip = if negate { file_ts >= cond_ts } else { file_ts <= cond_ts };
2492 if should_skip {
2493 let raw = std::mem::take(&mut session.header_bytes);
2494 let headers = std::collections::HashMap::new();
2495 let mut resp =
2496 Response::new(200, headers, Vec::new(), url.as_str().to_string());
2497 resp.set_raw_headers(raw);
2498 return Ok(resp);
2499 }
2500 }
2501 }
2502 }
2503 let is_auto_resume = resume_from == Some(0);
2508 let explicit_offset = resume_from.filter(|&o| o > 0);
2509
2510 let (mut effective_upload_data, mut use_appe) = if let Some(offset) = explicit_offset {
2513 #[allow(clippy::cast_possible_truncation)]
2514 let offset_usize = offset as usize;
2515 if offset_usize >= upload_bytes.len() {
2516 let _ = session.open_data_connection().await;
2519 send_type_if_needed(session, TransferType::Binary).await?;
2520 let raw = std::mem::take(&mut session.header_bytes);
2521 let headers = std::collections::HashMap::new();
2522 let mut resp = Response::new(200, headers, Vec::new(), url.as_str().to_string());
2523 resp.set_raw_headers(raw);
2524 return Ok(resp);
2525 }
2526 (&upload_bytes[offset_usize..], true)
2527 } else {
2528 (upload_bytes, config.append)
2529 };
2530
2531 let pret_cmd =
2533 if config.append { format!("APPE {filename}") } else { format!("STOR {filename}") };
2534 let data_conn_result = session.open_data_connection_with_pret(&pret_cmd).await;
2535 let data_conn = match data_conn_result {
2536 Ok(s) => s,
2537 Err(e) => {
2538 return Err(e);
2539 }
2540 };
2541 let mut data_stream = data_conn.into_stream(session, None).await?;
2542
2543 let upload_type = match type_override {
2545 Some(TransferType::Ascii) => TransferType::Ascii,
2546 _ => TransferType::Binary,
2547 };
2548 send_type_if_needed(session, upload_type).await?;
2549
2550 if is_auto_resume {
2553 send_command(&mut session.writer, &format!("SIZE {filename}")).await?;
2554 let size_resp = session.read_and_record().await?;
2555 if size_resp.is_complete() {
2556 if let Ok(remote_size) = size_resp.message.trim().parse::<u64>() {
2557 if remote_size > 0 {
2558 #[allow(clippy::cast_possible_truncation)]
2559 let skip = remote_size as usize;
2560 if skip >= upload_bytes.len() {
2561 drop(data_stream);
2563 let raw = std::mem::take(&mut session.header_bytes);
2564 let headers = std::collections::HashMap::new();
2565 let mut resp =
2566 Response::new(200, headers, Vec::new(), url.as_str().to_string());
2567 resp.set_raw_headers(raw);
2568 return Ok(resp);
2569 }
2570 effective_upload_data = &upload_bytes[skip..];
2571 use_appe = true;
2572 }
2573 }
2575 }
2576 }
2578
2579 let stor_cmd =
2581 if use_appe { format!("APPE {filename}") } else { format!("STOR {filename}") };
2582 send_command(&mut session.writer, &stor_cmd).await?;
2583 let stor_resp = session.read_and_record().await?;
2584 if !stor_resp.is_preliminary() && !stor_resp.is_complete() {
2585 return Err(Error::Transfer {
2586 code: 25,
2587 message: format!("FTP STOR/APPE failed: {} {}", stor_resp.code, stor_resp.message),
2588 });
2589 }
2590
2591 let ascii_upload = config.crlf || type_override == Some(TransferType::Ascii);
2593 if ascii_upload {
2594 let converted = lf_to_crlf(effective_upload_data);
2595 data_stream
2596 .write_all(&converted)
2597 .await
2598 .map_err(|e| Error::Http(format!("FTP data write error: {e}")))?;
2599 } else {
2600 data_stream
2601 .write_all(effective_upload_data)
2602 .await
2603 .map_err(|e| Error::Http(format!("FTP data write error: {e}")))?;
2604 }
2605 data_stream
2606 .shutdown()
2607 .await
2608 .map_err(|e| Error::Http(format!("FTP data shutdown error: {e}")))?;
2609 drop(data_stream);
2610
2611 let complete_resp = session.read_and_record().await?;
2612 if !complete_resp.is_complete() {
2613 let code = if complete_resp.code == 452 || complete_resp.code == 552 { 70 } else { 25 };
2615 return Err(Error::Transfer {
2616 code,
2617 message: format!(
2618 "FTP upload failed: {} {}",
2619 complete_resp.code, complete_resp.message
2620 ),
2621 });
2622 }
2623
2624 execute_quote_commands(session, &config.post_quote).await?;
2626
2627 let raw = std::mem::take(&mut session.header_bytes);
2628 let headers = std::collections::HashMap::new();
2629 let mut resp = Response::new(200, headers, Vec::new(), url.as_str().to_string());
2630 resp.set_raw_headers(raw);
2631 return Ok(resp);
2632 }
2633
2634 if is_dir_list {
2636 let list_base = if config.list_only { "NLST" } else { "LIST" };
2638 let data_conn_result = session.open_data_connection_with_pret(list_base).await;
2639 let data_conn = match data_conn_result {
2640 Ok(s) => s,
2641 Err(e) => {
2642 return Err(e);
2643 }
2644 };
2645 let mut data_stream = data_conn.into_stream(session, None).await?;
2646
2647 send_type_if_needed(session, TransferType::Ascii).await?;
2649
2650 execute_quote_commands(session, &config.post_pasv_quote).await?;
2652
2653 let list_base = if config.list_only { "NLST" } else { "LIST" };
2655 let list_cmd = if config.method == FtpMethod::NoCwd {
2656 let path = effective_path.trim_end_matches('/');
2657 let path =
2661 if path.starts_with("//") { &path[1..] } else { path.trim_start_matches('/') };
2662 if path.is_empty() {
2663 format!("{list_base} /")
2664 } else {
2665 format!("{list_base} {path}")
2666 }
2667 } else {
2668 list_base.to_string()
2669 };
2670 send_command(&mut session.writer, &list_cmd).await?;
2671 let list_resp = session.read_and_record().await?;
2672 if list_resp.is_negative_transient() {
2675 drop(data_stream);
2676 let raw = std::mem::take(&mut session.header_bytes);
2677 let headers = std::collections::HashMap::new();
2678 let mut resp = Response::new(200, headers, Vec::new(), url.as_str().to_string());
2679 resp.set_raw_headers(raw);
2680 return Ok(resp);
2681 }
2682 if !list_resp.is_preliminary() && !list_resp.is_complete() {
2683 return Err(Error::Transfer {
2684 code: 19,
2685 message: format!("FTP LIST failed: {} {}", list_resp.code, list_resp.message),
2686 });
2687 }
2688
2689 let mut data = Vec::new();
2690 let _ = data_stream
2691 .read_to_end(&mut data)
2692 .await
2693 .map_err(|e| Error::Http(format!("FTP data read error: {e}")))?;
2694 drop(data_stream);
2695
2696 if list_resp.is_preliminary() {
2698 let complete_resp = session.read_and_record().await?;
2699 if !complete_resp.is_complete() {
2700 return Err(Error::Http(format!(
2701 "FTP transfer failed: {} {}",
2702 complete_resp.code, complete_resp.message
2703 )));
2704 }
2705 }
2706
2707 execute_quote_commands(session, &config.post_quote).await?;
2709
2710 let raw = std::mem::take(&mut session.header_bytes);
2711 let mut headers = std::collections::HashMap::new();
2712 let _old = headers.insert("content-length".to_string(), data.len().to_string());
2713 let mut resp = Response::new(200, headers, data, url.as_str().to_string());
2714 resp.set_raw_headers(raw);
2715 return Ok(resp);
2716 }
2717
2718 let transfer_type = type_override.unwrap_or(if config.use_ascii {
2721 TransferType::Ascii
2722 } else {
2723 TransferType::Binary
2724 });
2725 let use_ascii = transfer_type == TransferType::Ascii;
2726
2727 if let Some((cond_ts, negate)) = config.time_condition {
2729 send_command(&mut session.writer, &format!("MDTM {filename}")).await?;
2730 let mdtm_resp = session.read_and_record().await?;
2731 if mdtm_resp.is_complete() {
2732 let mdtm_str = mdtm_resp.message.trim();
2734 if let Some(file_ts) = parse_mdtm_timestamp(mdtm_str) {
2735 let should_skip = if negate {
2736 file_ts >= cond_ts
2738 } else {
2739 file_ts <= cond_ts
2741 };
2742 if should_skip {
2743 let raw = std::mem::take(&mut session.header_bytes);
2744 let headers = std::collections::HashMap::new();
2745 let mut resp =
2746 Response::new(200, headers, Vec::new(), url.as_str().to_string());
2747 resp.set_raw_headers(raw);
2748 return Ok(resp);
2749 }
2750 }
2751 }
2752 }
2753
2754 let data_conn_result =
2760 session.open_data_connection_with_pret(&format!("RETR {filename}")).await;
2761 let data_conn = match data_conn_result {
2762 Ok(s) => s,
2763 Err(e) => {
2764 return Err(e);
2765 }
2766 };
2767
2768 send_type_if_needed(session, transfer_type).await?;
2770
2771 execute_quote_commands(session, &config.post_pasv_quote).await?;
2773
2774 let mut remote_size: Option<u64> = None;
2777 if !use_ascii && !config.ignore_content_length {
2778 send_command(&mut session.writer, &format!("SIZE {filename}")).await?;
2779 let size_resp = session.read_and_record().await?;
2780 if size_resp.is_complete() {
2781 if let Ok(sz) = size_resp.message.trim().parse::<u64>() {
2782 remote_size = Some(sz);
2783 }
2784 }
2785 }
2787
2788 let mut resume_from = resume_from;
2790 let mut range_end = range_end;
2791 if let Some(from_end) = config.range_from_end {
2792 if let Some(sz) = remote_size {
2793 let offset = sz.saturating_sub(from_end);
2794 resume_from = Some(offset);
2795 range_end = Some(sz.saturating_sub(1));
2797 }
2798 }
2799
2800 if let (Some(max_size), Some(sz)) = (config.max_filesize, remote_size) {
2802 if sz > max_size {
2803 drop(data_conn);
2804 return Err(Error::Transfer {
2805 code: 63,
2806 message: format!("Maximum file size exceeded ({sz} > {max_size})"),
2807 });
2808 }
2809 }
2810
2811 if let Some(offset) = resume_from {
2813 if let Some(sz) = remote_size {
2814 if offset > sz {
2815 drop(data_conn);
2816 return Err(Error::Transfer {
2817 code: 36,
2818 message: format!("Offset ({offset}) was beyond the end of the file ({sz})"),
2819 });
2820 }
2821 if offset == sz {
2822 drop(data_conn);
2824 let raw = std::mem::take(&mut session.header_bytes);
2825 let headers = std::collections::HashMap::new();
2826 let mut resp = Response::new(200, headers, Vec::new(), url.as_str().to_string());
2827 resp.set_raw_headers(raw);
2828 return Ok(resp);
2829 }
2830 }
2831
2832 send_command(&mut session.writer, &format!("REST {offset}")).await?;
2834 let rest_resp = session.read_and_record().await?;
2835 if !rest_resp.is_intermediate() {
2836 drop(data_conn);
2837 return Err(Error::Transfer {
2838 code: 36,
2839 message: format!("FTP REST failed: {} {}", rest_resp.code, rest_resp.message),
2840 });
2841 }
2842 }
2843
2844 send_command(&mut session.writer, &format!("RETR {filename}")).await?;
2846 let retr_resp = session.read_and_record().await?;
2847 if !retr_resp.is_preliminary() && !retr_resp.is_complete() {
2848 drop(data_conn);
2849 let code = if retr_resp.code == 425 || retr_resp.code == 421 {
2853 10
2854 } else if retr_resp.code == 550 {
2855 78
2856 } else {
2857 19
2858 };
2859 return Err(Error::Transfer {
2860 code,
2861 message: format!("FTP RETR failed: {} {}", retr_resp.code, retr_resp.message),
2862 });
2863 }
2864
2865 let mut data_stream = match data_conn {
2869 DataConnection::Connected(stream) => stream,
2870 DataConnection::PendingActive { listener, use_tls } => {
2871 let accept_fut = listener.accept();
2872 tokio::select! {
2874 accept_result = accept_fut => {
2875 let (tcp, _) = accept_result
2876 .map_err(|e| Error::Http(format!("FTP active mode accept failed: {e}")))?;
2877 if use_tls {
2878 session.maybe_wrap_data_tls(tcp).await?
2879 } else {
2880 FtpStream::Plain(tcp)
2881 }
2882 }
2883 ctrl_result = read_response(&mut session.reader) => {
2884 let ctrl_resp = ctrl_result?;
2886 session.header_bytes.extend_from_slice(&ctrl_resp.raw_bytes);
2887 let code = if ctrl_resp.code == 425 || ctrl_resp.code == 421 {
2888 10
2889 } else {
2890 19
2891 };
2892 return Err(Error::Transfer {
2893 code,
2894 message: format!("FTP RETR failed: {} {}", ctrl_resp.code, ctrl_resp.message),
2895 });
2896 }
2897 }
2898 }
2899 };
2900
2901 let mut data = Vec::new();
2902
2903 let start_offset = resume_from.unwrap_or(0);
2905 if let Some(end) = range_end {
2906 #[allow(clippy::cast_possible_truncation)]
2907 let max_bytes = (end - start_offset + 1) as usize;
2908 let mut limited = data_stream.take(max_bytes as u64);
2909 let _ = limited
2910 .read_to_end(&mut data)
2911 .await
2912 .map_err(|e| Error::Http(format!("FTP data read error: {e}")))?;
2913 drop(limited);
2914 send_command(&mut session.writer, "ABOR").await?;
2916 let _ = session.read_and_record().await;
2918 } else {
2919 let _ = data_stream
2920 .read_to_end(&mut data)
2921 .await
2922 .map_err(|e| Error::Http(format!("FTP data read error: {e}")))?;
2923 drop(data_stream);
2924 }
2925
2926 if range_end.is_none() {
2930 if let Some(expected) = remote_size {
2931 let actual = data.len() as u64 + resume_from.unwrap_or(0);
2932 if actual < expected {
2933 let mut headers = std::collections::HashMap::new();
2934 let _old = headers.insert("content-length".to_string(), data.len().to_string());
2935 let mut resp = Response::new(200, headers, data, url.as_str().to_string());
2936 resp.set_raw_headers(std::mem::take(&mut session.header_bytes));
2937 resp.set_body_error(Some("partial".to_string()));
2938 return Ok(resp);
2939 }
2940 }
2941 }
2942
2943 if retr_resp.is_preliminary() && range_end.is_none() {
2945 let complete_resp = session.read_and_record().await?;
2946 if !complete_resp.is_complete() {
2947 return Err(Error::Http(format!(
2948 "FTP transfer failed: {} {}",
2949 complete_resp.code, complete_resp.message
2950 )));
2951 }
2952 }
2953
2954 execute_quote_commands(session, &config.post_quote).await?;
2956
2957 let raw = std::mem::take(&mut session.header_bytes);
2958
2959 let mut headers = std::collections::HashMap::new();
2960 let _old = headers.insert("content-length".to_string(), data.len().to_string());
2961
2962 let mut resp = Response::new(200, headers, data, url.as_str().to_string());
2963 resp.set_raw_headers(raw);
2964 Ok(resp)
2965}
2966
/// Decodes `%XX` escape sequences in `s`.
///
/// Decoding works on raw bytes, so multi-byte UTF-8 characters survive intact
/// whether they appear literally or as a run of percent-encoded bytes.
///
/// A `%` that is not followed by exactly two ASCII hex digits is copied through
/// unchanged and decoding resumes at the very next byte; nothing is dropped and
/// a following valid escape (as in `%%41`) is still decoded.
///
/// If the decoded bytes are not valid UTF-8, every byte is mapped to the Unicode
/// code point with the same value (a Latin-1 interpretation), which is what this
/// function historically did for all input.
///
/// NOTE(review): the result can contain control characters such as CR/LF
/// (`%0D%0A`). Callers that splice it into protocol commands should validate it.
fn percent_decode(s: &str) -> String {
    /// Returns the value of a single ASCII hex digit, or `None` for anything else.
    fn hex_val(b: u8) -> Option<u8> {
        match b {
            b'0'..=b'9' => Some(b - b'0'),
            b'a'..=b'f' => Some(b - b'a' + 10),
            b'A'..=b'F' => Some(b - b'A' + 10),
            _ => None,
        }
    }

    let bytes = s.as_bytes();
    let mut decoded = Vec::with_capacity(bytes.len());
    let mut i = 0;
    while i < bytes.len() {
        if bytes[i] == b'%' {
            let hi = bytes.get(i + 1).copied().and_then(hex_val);
            let lo = bytes.get(i + 2).copied().and_then(hex_val);
            if let (Some(h), Some(l)) = (hi, lo) {
                decoded.push((h << 4) | l);
                i += 3;
                continue;
            }
            // Malformed escape: fall through and keep the '%' literally.
        }
        decoded.push(bytes[i]);
        i += 1;
    }

    match String::from_utf8(decoded) {
        Ok(text) => text,
        // Not valid UTF-8: keep the legacy byte-to-code-point mapping.
        Err(err) => err.into_bytes().into_iter().map(char::from).collect(),
    }
}
2996
2997fn parse_mdtm_timestamp(s: &str) -> Option<i64> {
2999 if s.len() < 14 {
3000 return None;
3001 }
3002 let year: i64 = s[0..4].parse().ok()?;
3003 let month: i64 = s[4..6].parse().ok()?;
3004 let day: i64 = s[6..8].parse().ok()?;
3005 let hour: i64 = s[8..10].parse().ok()?;
3006 let min: i64 = s[10..12].parse().ok()?;
3007 let sec: i64 = s[12..14].parse().ok()?;
3008
3009 let days = days_from_date(year, month, day)?;
3012 Some(days * 86400 + hour * 3600 + min * 60 + sec)
3013}
3014
/// Converts a proleptic Gregorian calendar date into the number of days since
/// 1970-01-01 (negative for earlier dates).
///
/// Returns `None` when `month` is outside `1..=12`, when `day` does not exist
/// in that month of that year (for example Feb 29 in a non-leap year or
/// Apr 31), or when the result would overflow `i64`.
fn days_from_date(year: i64, month: i64, day: i64) -> Option<i64> {
    if !(1..=12).contains(&month) {
        return None;
    }
    let is_leap = year % 4 == 0 && (year % 100 != 0 || year % 400 == 0);
    let days_in_month = match month {
        2 if is_leap => 29,
        2 => 28,
        4 | 6 | 9 | 11 => 30,
        _ => 31,
    };
    if !(1..=days_in_month).contains(&day) {
        return None;
    }

    // Count years from March so the leap day is the last day of the shifted
    // year; this makes the arithmetic uniform for dates before and after 1970.
    let shifted_year = if month <= 2 { year.checked_sub(1)? } else { year };
    let era = shifted_year.div_euclid(400);
    let year_of_era = shifted_year.rem_euclid(400);
    let month_from_march = (month + 9) % 12;
    let day_of_year = (153 * month_from_march + 2) / 5 + day - 1;
    let day_of_era = year_of_era * 365 + year_of_era / 4 - year_of_era / 100 + day_of_year;

    // 146_097 days per 400-year era; 719_468 days from 0000-03-01 to 1970-01-01.
    era.checked_mul(146_097)?.checked_add(day_of_era - 719_468)
}
3041
3042fn format_mdtm_as_http_date(s: &str) -> Option<String> {
3044 if s.len() < 14 {
3045 return None;
3046 }
3047 let year: u32 = s[0..4].parse().ok()?;
3048 let month: u32 = s[4..6].parse().ok()?;
3049 let day: u32 = s[6..8].parse().ok()?;
3050 let hour: u32 = s[8..10].parse().ok()?;
3051 let min: u32 = s[10..12].parse().ok()?;
3052 let sec: u32 = s[12..14].parse().ok()?;
3053
3054 let month_names =
3055 ["Jan", "Feb", "Mar", "Apr", "May", "Jun", "Jul", "Aug", "Sep", "Oct", "Nov", "Dec"];
3056 #[allow(clippy::cast_sign_loss)]
3057 let month_name = month_names.get((month - 1) as usize)?;
3058
3059 let ts = parse_mdtm_timestamp(s)?;
3061 #[allow(clippy::cast_sign_loss)]
3062 let day_of_week = ((ts / 86400 + 4) % 7) as usize; let day_names = ["Sun", "Mon", "Tue", "Wed", "Thu", "Fri", "Sat"];
3064 let dow = day_names.get(day_of_week)?;
3065
3066 Some(format!("{dow}, {day:02} {month_name} {year} {hour:02}:{min:02}:{sec:02} GMT"))
3067}
3068
3069fn parse_ftp_type(path: &str) -> (&str, Option<TransferType>) {
3073 if let Some(pos) = path.rfind(";type=") {
3074 let type_str = &path[pos + 6..];
3075 let transfer_type = match type_str {
3076 "A" | "a" => Some(TransferType::Ascii),
3077 "I" | "i" => Some(TransferType::Binary),
3078 _ => None,
3079 };
3080 if transfer_type.is_some() {
3081 return (&path[..pos], transfer_type);
3082 }
3083 }
3084 (path, None)
3085}
3086
3087fn split_path_for_method(path: &str, method: FtpMethod) -> (Vec<&str>, String) {
3091 let trimmed = path.trim_start_matches('/');
3092
3093 match method {
3094 FtpMethod::NoCwd => {
3095 let filename = if path.starts_with("//") { &path[1..] } else { trimmed };
3099 (Vec::new(), filename.to_string())
3100 }
3101 FtpMethod::SingleCwd => {
3102 if let Some((dir, file)) = trimmed.rsplit_once('/') {
3103 if dir.is_empty() {
3104 (vec!["/"], file.to_string())
3106 } else {
3107 (vec![dir], file.to_string())
3108 }
3109 } else if path.starts_with("//") {
3110 (vec!["/"], trimmed.to_string())
3112 } else {
3113 (Vec::new(), trimmed.to_string())
3114 }
3115 }
3116 FtpMethod::MultiCwd => {
3117 if let Some((dir, file)) = trimmed.rsplit_once('/') {
3118 let mut components = Vec::new();
3120 if path.starts_with("//") {
3121 components.push("/");
3123 }
3124 for component in dir.split('/') {
3125 if !component.is_empty() {
3126 components.push(component);
3127 }
3128 }
3129 (components, file.to_string())
3130 } else if path.starts_with("//") {
3131 (vec!["/"], trimmed.to_string())
3133 } else {
3134 (Vec::new(), trimmed.to_string())
3135 }
3136 }
3137 }
3138}
3139
/// Returns a copy of `data` in which every line feed that is not already
/// preceded by a carriage return gets one inserted in front of it.
///
/// Existing `\r\n` pairs and lone `\r` bytes are copied through unchanged.
fn lf_to_crlf(data: &[u8]) -> Vec<u8> {
    // Reserve roughly 10% extra room for the carriage returns to be inserted.
    let mut converted = Vec::with_capacity(data.len() + data.len() / 10);

    // Pair each byte with its predecessor; the first byte is paired with a
    // zero byte, which is not a carriage return.
    let predecessors = std::iter::once(0u8).chain(data.iter().copied());
    for (before, current) in predecessors.zip(data.iter().copied()) {
        let bare_line_feed = current == b'\n' && before != b'\r';
        if bare_line_feed {
            converted.push(b'\r');
        }
        converted.push(current);
    }

    converted
}
3153
3154#[allow(clippy::too_many_lines)]
3160pub async fn download(
3161 url: &crate::url::Url,
3162 ssl_mode: FtpSslMode,
3163 tls_config: &crate::tls::TlsConfig,
3164 resume_from: Option<u64>,
3165 config: &FtpConfig,
3166) -> Result<Response, Error> {
3167 perform(
3168 url,
3169 None,
3170 ssl_mode,
3171 UseSsl::None,
3172 tls_config,
3173 resume_from,
3174 config,
3175 None,
3176 &mut None,
3177 None,
3178 )
3179 .await
3180}
3181
3182#[allow(clippy::too_many_lines)]
3188pub async fn list(
3189 url: &crate::url::Url,
3190 ssl_mode: FtpSslMode,
3191 tls_config: &crate::tls::TlsConfig,
3192 config: &FtpConfig,
3193) -> Result<Response, Error> {
3194 perform(url, None, ssl_mode, UseSsl::None, tls_config, None, config, None, &mut None, None)
3195 .await
3196}
3197
3198pub async fn upload(
3204 url: &crate::url::Url,
3205 data: &[u8],
3206 ssl_mode: FtpSslMode,
3207 tls_config: &crate::tls::TlsConfig,
3208 config: &FtpConfig,
3209) -> Result<Response, Error> {
3210 perform(
3211 url,
3212 Some(data),
3213 ssl_mode,
3214 UseSsl::None,
3215 tls_config,
3216 None,
3217 config,
3218 None,
3219 &mut None,
3220 None,
3221 )
3222 .await
3223}
3224
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;

    /// Wraps raw bytes in the buffered in-memory reader that the reply-parsing
    /// tests hand to `read_response`.
    fn reader_over(wire: &[u8]) -> BufReader<std::io::Cursor<Vec<u8>>> {
        BufReader::new(std::io::Cursor::new(wire.to_vec()))
    }

    // ----- control-connection replies -----

    /// A single-line reply yields its code and message text.
    #[tokio::test]
    async fn read_simple_response() {
        let mut input = reader_over(b"220 Welcome to FTP\r\n");
        let reply = read_response(&mut input).await.unwrap();
        assert_eq!(reply.code, 220);
        assert_eq!(reply.message, "Welcome to FTP");
    }

    /// A multi-line reply keeps text from its first and last lines.
    #[tokio::test]
    async fn read_multiline_response() {
        let mut input = reader_over(b"220-Welcome\r\n220-to the\r\n220 FTP server\r\n");
        let reply = read_response(&mut input).await.unwrap();
        assert_eq!(reply.code, 220);
        assert!(reply.message.contains("Welcome"));
        assert!(reply.message.contains("FTP server"));
    }

    /// Reading from an exhausted stream is reported as an error.
    #[tokio::test]
    async fn read_response_connection_closed() {
        let mut input = reader_over(b"");
        assert!(read_response(&mut input).await.is_err());
    }

    // ----- passive-mode reply parsing -----

    /// PASV: the port is `4 * 256 + 1`.
    #[test]
    fn parse_pasv_simple() {
        let (ip, data_port) =
            parse_pasv_response("Entering Passive Mode (192,168,1,1,4,1)").unwrap();
        assert_eq!(ip, "192.168.1.1");
        assert_eq!(data_port, 1025);
    }

    /// PASV: the port is `200 * 256 + 100`.
    #[test]
    fn parse_pasv_high_port() {
        let (ip, data_port) =
            parse_pasv_response("Entering Passive Mode (127,0,0,1,200,100)").unwrap();
        assert_eq!(ip, "127.0.0.1");
        assert_eq!(data_port, 51300);
    }

    /// EPSV: the port sits between the delimiters.
    #[test]
    fn parse_epsv_simple() {
        let data_port = parse_epsv_response("Entering Extended Passive Mode (|||12345|)").unwrap();
        assert_eq!(data_port, 12345);
    }

    // ----- reply classification -----

    /// Codes 150, 226 and 331 land in the preliminary, complete and
    /// intermediate categories respectively.
    #[test]
    fn ftp_response_status_categories() {
        let reply = |code| FtpResponse { code, message: String::new(), raw_bytes: Vec::new() };

        let r150 = reply(150);
        assert!(r150.is_preliminary());
        assert!(!r150.is_complete());

        let r226 = reply(226);
        assert!(r226.is_complete());
        assert!(!r226.is_intermediate());

        let r331 = reply(331);
        assert!(r331.is_intermediate());
        assert!(!r331.is_complete());
    }

    // ----- FEAT parsing -----

    /// Every capability named in a full FEAT listing is detected.
    #[test]
    fn parse_feat_response_full() {
        let listing = "Extensions supported:\n EPSV\n MLST size*;modify*;type*\n REST STREAM\n SIZE\n UTF8\n AUTH TLS";
        let caps = parse_feat_response(listing);
        assert!(caps.epsv);
        assert!(caps.mlst);
        assert!(caps.rest_stream);
        assert!(caps.size);
        assert!(caps.utf8);
        assert!(caps.auth_tls);
    }

    /// Only the capabilities actually listed are switched on.
    #[test]
    fn parse_feat_response_minimal() {
        let caps = parse_feat_response("SIZE\nREST STREAM");
        assert!(caps.size);
        assert!(caps.rest_stream);
        assert!(!caps.epsv);
        assert!(!caps.mlst);
    }

    /// An empty listing switches nothing on and records no raw lines.
    #[test]
    fn parse_feat_response_empty() {
        let caps = parse_feat_response("");
        assert!(!caps.epsv);
        assert!(!caps.mlst);
        assert!(!caps.rest_stream);
        assert!(!caps.size);
        assert!(!caps.utf8);
        assert!(!caps.auth_tls);
        assert!(caps.raw.is_empty());
    }

    /// `AUTH TLS` is detected when listed next to `AUTH SSL`.
    #[test]
    fn parse_feat_response_auth_tls() {
        let caps = parse_feat_response("AUTH TLS\nAUTH SSL");
        assert!(caps.auth_tls);
    }

    // ----- plain value types -----

    /// Transfer types compare equal to themselves and unequal to each other.
    #[test]
    fn transfer_type_equality() {
        for kind in [TransferType::Ascii, TransferType::Binary] {
            assert_eq!(kind, kind);
        }
        assert_ne!(TransferType::Ascii, TransferType::Binary);
    }

    /// The default feature set has everything off and no raw lines.
    #[test]
    fn ftp_features_default() {
        let caps = FtpFeatures::default();
        assert!(!caps.epsv);
        assert!(!caps.mlst);
        assert!(!caps.rest_stream);
        assert!(!caps.size);
        assert!(!caps.utf8);
        assert!(!caps.auth_tls);
        assert!(caps.raw.is_empty());
    }

    /// SSL modes compare equal to themselves and unequal to their neighbours.
    #[test]
    fn ftp_ssl_mode_equality() {
        for mode in [FtpSslMode::None, FtpSslMode::Explicit, FtpSslMode::Implicit] {
            assert_eq!(mode, mode);
        }
        assert_ne!(FtpSslMode::None, FtpSslMode::Explicit);
        assert_ne!(FtpSslMode::Explicit, FtpSslMode::Implicit);
    }

    /// The default method is `MultiCwd`.
    #[test]
    fn ftp_method_default() {
        assert_eq!(FtpMethod::default(), FtpMethod::MultiCwd);
    }

    /// Methods compare equal to themselves and unequal to their neighbours.
    #[test]
    fn ftp_method_equality() {
        for method in [FtpMethod::MultiCwd, FtpMethod::SingleCwd, FtpMethod::NoCwd] {
            assert_eq!(method, method);
        }
        assert_ne!(FtpMethod::MultiCwd, FtpMethod::SingleCwd);
        assert_ne!(FtpMethod::SingleCwd, FtpMethod::NoCwd);
    }

    // ----- PORT / EPRT formatting -----

    /// Port 12345 is encoded as `48,57` (`48 * 256 + 57`).
    #[test]
    fn format_port_ipv4() {
        let local: SocketAddr = "192.168.1.100:12345".parse().unwrap();
        assert_eq!(format_port_command(&local), "PORT 192,168,1,100,48,57");
    }

    /// Port 21 is encoded as `0,21`.
    #[test]
    fn format_port_low_port() {
        let local: SocketAddr = "10.0.0.1:21".parse().unwrap();
        assert_eq!(format_port_command(&local), "PORT 10,0,0,1,0,21");
    }

    /// Port 65535 is encoded as `255,255`.
    #[test]
    fn format_port_high_port() {
        let local: SocketAddr = "127.0.0.1:65535".parse().unwrap();
        assert_eq!(format_port_command(&local), "PORT 127,0,0,1,255,255");
    }

    /// EPRT uses protocol number 1 for IPv4.
    #[test]
    fn format_eprt_ipv4() {
        let local: SocketAddr = "192.168.1.100:12345".parse().unwrap();
        assert_eq!(format_eprt_command(&local), "EPRT |1|192.168.1.100|12345|");
    }

    /// EPRT uses protocol number 2 for IPv6.
    #[test]
    fn format_eprt_ipv6() {
        let local: SocketAddr = "[::1]:54321".parse().unwrap();
        assert_eq!(format_eprt_command(&local), "EPRT |2|::1|54321|");
    }

    /// The six numbers of a PORT command reassemble into the original address and port.
    #[test]
    fn format_port_roundtrip() {
        let local: SocketAddr = "10.20.30.40:5000".parse().unwrap();
        let command = format_port_command(&local);
        assert!(command.starts_with("PORT "));

        let fields: Vec<u16> =
            command[5..].split(',').map(|field| field.parse().unwrap()).collect();
        assert_eq!(fields.len(), 6);
        assert_eq!(
            format!("{}.{}.{}.{}", fields[0], fields[1], fields[2], fields[3]),
            "10.20.30.40"
        );
        assert_eq!(fields[4] * 256 + fields[5], 5000);
    }

    // ----- command framing -----

    /// A command with an argument is terminated by CRLF.
    #[tokio::test]
    async fn send_command_format() {
        let mut wire = Vec::<u8>::new();
        send_command(&mut wire, "USER test").await.unwrap();
        assert_eq!(wire, b"USER test\r\n");
    }

    /// A bare command is terminated by CRLF.
    #[tokio::test]
    async fn send_command_feat() {
        let mut wire = Vec::<u8>::new();
        send_command(&mut wire, "FEAT").await.unwrap();
        assert_eq!(wire, b"FEAT\r\n");
    }

    // ----- replies seen during TLS negotiation -----

    /// A 234 reply is classified as complete.
    #[tokio::test]
    async fn read_auth_tls_response() {
        let mut input = reader_over(b"234 AUTH TLS OK\r\n");
        let reply = read_response(&mut input).await.unwrap();
        assert_eq!(reply.code, 234);
        assert!(reply.is_complete());
    }

    /// A 200 reply to PBSZ is classified as complete.
    #[tokio::test]
    async fn read_pbsz_response() {
        let mut input = reader_over(b"200 PBSZ=0\r\n");
        let reply = read_response(&mut input).await.unwrap();
        assert_eq!(reply.code, 200);
        assert!(reply.is_complete());
    }

    /// A 200 reply to PROT P is classified as complete.
    #[tokio::test]
    async fn read_prot_p_response() {
        let mut input = reader_over(b"200 Protection set to Private\r\n");
        let reply = read_response(&mut input).await.unwrap();
        assert_eq!(reply.code, 200);
        assert!(reply.is_complete());
    }

    /// A connector without ALPN can be built from the default TLS configuration.
    #[cfg(feature = "rustls")]
    #[test]
    fn tls_connector_no_alpn_creates_ok() {
        let defaults = crate::tls::TlsConfig::default();
        assert!(crate::tls::TlsConnector::new_no_alpn(&defaults).is_ok());
    }

    // ----- configuration -----

    /// Pins the default values of the fields checked below.
    #[test]
    fn ftp_config_default() {
        let defaults = FtpConfig::default();
        assert!(defaults.use_epsv);
        assert!(defaults.use_eprt);
        assert!(!defaults.skip_pasv_ip);
        assert!(defaults.account.is_none());
        assert!(!defaults.create_dirs);
        assert_eq!(defaults.method, FtpMethod::MultiCwd);
        assert!(defaults.active_port.is_none());
    }

    /// Cloning preserves every explicitly set field.
    #[test]
    fn ftp_config_clone() {
        let original = FtpConfig {
            use_epsv: false,
            use_eprt: false,
            skip_pasv_ip: true,
            account: Some("myacct".to_string()),
            create_dirs: true,
            method: FtpMethod::NoCwd,
            active_port: Some("-".to_string()),
            ..Default::default()
        };
        // The clone is the thing under test, so the lint is silenced on purpose.
        #[allow(clippy::redundant_clone)]
        let copy = original.clone();
        assert!(!copy.use_epsv);
        assert!(!copy.use_eprt);
        assert!(copy.skip_pasv_ip);
        assert_eq!(copy.account.as_deref(), Some("myacct"));
        assert!(copy.create_dirs);
        assert_eq!(copy.method, FtpMethod::NoCwd);
        assert_eq!(copy.active_port.as_deref(), Some("-"));
    }
}