Skip to main content

liburlx/protocol/
ftp.rs

1//! FTP protocol handler.
2//!
3//! Implements the File Transfer Protocol (RFC 959) for downloading and
4//! uploading files, directory listing, and file management.
5//! Supports explicit FTPS (AUTH TLS, RFC 4217) and implicit FTPS (port 990).
6//! Supports active mode (PORT/EPRT) and passive mode (PASV).
7
8use std::net::SocketAddr;
9use std::pin::Pin;
10use std::task::{Context, Poll};
11
12use tokio::io::{
13    AsyncBufReadExt, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, BufReader, ReadBuf,
14};
15use tokio::io::{ReadHalf, WriteHalf};
16use tokio::net::TcpStream;
17
18use crate::error::Error;
19use crate::protocol::http::response::Response;
20
21/// FTPS mode for FTP connections.
22#[derive(Debug, Clone, Copy, PartialEq, Eq)]
23pub enum FtpSslMode {
24    /// No TLS — plain FTP.
25    None,
26    /// Explicit FTPS: connect plain, then upgrade with AUTH TLS (RFC 4217).
27    Explicit,
28    /// Implicit FTPS: connect directly over TLS (port 990).
29    Implicit,
30}
31
32/// SSL/TLS usage level for protocols supporting STARTTLS.
33///
34/// Maps to curl's `CURLUSESSL` values: controls whether and how
35/// STARTTLS upgrades are performed on plain-text connections.
36#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
37pub enum UseSsl {
38    /// No SSL/TLS — use plain protocol.
39    #[default]
40    None,
41    /// Try STARTTLS but continue without TLS if not available (curl `--ssl`).
42    Try,
43    /// Require SSL/TLS — fail if not available (curl `--ssl-reqd`).
44    All,
45}
46
47/// A stream that can be either plain TCP or TLS-wrapped.
48///
49/// Used for both FTP control and data connections.
50#[allow(clippy::large_enum_variant)]
51pub(crate) enum FtpStream {
52    /// Plain TCP connection.
53    Plain(TcpStream),
54    /// TLS-wrapped connection.
55    #[cfg(feature = "rustls")]
56    Tls(tokio_rustls::client::TlsStream<TcpStream>),
57}
58
59impl AsyncRead for FtpStream {
60    fn poll_read(
61        self: Pin<&mut Self>,
62        cx: &mut Context<'_>,
63        buf: &mut ReadBuf<'_>,
64    ) -> Poll<std::io::Result<()>> {
65        match self.get_mut() {
66            Self::Plain(s) => Pin::new(s).poll_read(cx, buf),
67            #[cfg(feature = "rustls")]
68            Self::Tls(s) => Pin::new(s).poll_read(cx, buf),
69        }
70    }
71}
72
73impl AsyncWrite for FtpStream {
74    fn poll_write(
75        self: Pin<&mut Self>,
76        cx: &mut Context<'_>,
77        buf: &[u8],
78    ) -> Poll<std::io::Result<usize>> {
79        match self.get_mut() {
80            Self::Plain(s) => Pin::new(s).poll_write(cx, buf),
81            #[cfg(feature = "rustls")]
82            Self::Tls(s) => Pin::new(s).poll_write(cx, buf),
83        }
84    }
85
86    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
87        match self.get_mut() {
88            Self::Plain(s) => Pin::new(s).poll_flush(cx),
89            #[cfg(feature = "rustls")]
90            Self::Tls(s) => Pin::new(s).poll_flush(cx),
91        }
92    }
93
94    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
95        match self.get_mut() {
96            Self::Plain(s) => Pin::new(s).poll_shutdown(cx),
97            #[cfg(feature = "rustls")]
98            Self::Tls(s) => Pin::new(s).poll_shutdown(cx),
99        }
100    }
101}
102
103/// An FTP response from the server.
104#[derive(Debug, Clone)]
105pub struct FtpResponse {
106    /// The 3-digit status code.
107    pub code: u16,
108    /// The response text (may be multi-line).
109    pub message: String,
110    /// The raw wire-format bytes of the complete response (including code prefixes and CRLF).
111    pub raw_bytes: Vec<u8>,
112}
113
114impl FtpResponse {
115    /// Check if this is a positive preliminary response (1xx).
116    #[must_use]
117    pub const fn is_preliminary(&self) -> bool {
118        self.code >= 100 && self.code < 200
119    }
120
121    /// Check if this is a positive completion response (2xx).
122    #[must_use]
123    pub const fn is_complete(&self) -> bool {
124        self.code >= 200 && self.code < 300
125    }
126
127    /// Check if this is a positive intermediate response (3xx).
128    #[must_use]
129    pub const fn is_intermediate(&self) -> bool {
130        self.code >= 300 && self.code < 400
131    }
132
133    /// Check if this is a negative transient response (4xx).
134    #[must_use]
135    pub const fn is_negative_transient(&self) -> bool {
136        self.code >= 400 && self.code < 500
137    }
138
139    /// Check if this is a negative permanent response (5xx).
140    #[must_use]
141    pub const fn is_negative_permanent(&self) -> bool {
142        self.code >= 500 && self.code < 600
143    }
144}
145
146/// FTP method for traversing directories.
147///
148/// Controls how curl traverses the FTP path to reach the target file.
149/// Equivalent to `CURLOPT_FTP_FILEMETHOD`.
150#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
151pub enum FtpMethod {
152    /// Default multi-CWD: change directory one level at a time.
153    #[default]
154    MultiCwd,
155    /// Single CWD: use one CWD with the full path.
156    SingleCwd,
157    /// No CWD: use SIZE/RETR on the full path without changing directory.
158    NoCwd,
159}
160
161/// Transfer mode for FTP data connections.
162#[derive(Debug, Clone, Copy, PartialEq, Eq)]
163pub enum TransferType {
164    /// ASCII text mode (TYPE A).
165    Ascii,
166    /// Binary/image mode (TYPE I).
167    Binary,
168}
169
170/// Server capabilities discovered via FEAT.
171#[derive(Debug, Clone, Default)]
172#[allow(clippy::struct_excessive_bools)]
173pub struct FtpFeatures {
174    /// Whether the server supports EPSV (Extended Passive Mode).
175    pub epsv: bool,
176    /// Whether the server supports MLST/MLSD (RFC 3659).
177    pub mlst: bool,
178    /// Whether the server supports REST STREAM (resume).
179    pub rest_stream: bool,
180    /// Whether the server supports SIZE.
181    pub size: bool,
182    /// Whether the server supports UTF8.
183    pub utf8: bool,
184    /// Whether the server supports AUTH TLS.
185    pub auth_tls: bool,
186    /// Raw feature list.
187    pub raw: Vec<String>,
188}
189
190/// Proxy configuration for routing FTP connections through a proxy.
191///
192/// When present, both FTP control and data connections are routed
193/// through the specified proxy (curl compat: tests 706, 707, 712-715).
194#[derive(Debug, Clone)]
195pub enum FtpProxyConfig {
196    /// SOCKS4/4a proxy.
197    Socks4 {
198        /// Proxy host.
199        host: String,
200        /// Proxy port.
201        port: u16,
202        /// SOCKS4 user ID.
203        user_id: String,
204        /// Use `SOCKS4a` (domain-based resolution).
205        socks4a: bool,
206    },
207    /// SOCKS5/5h proxy.
208    Socks5 {
209        /// Proxy host.
210        host: String,
211        /// Proxy port.
212        port: u16,
213        /// Optional username/password authentication.
214        auth: Option<(String, String)>,
215    },
216    /// HTTP CONNECT tunnel proxy.
217    HttpConnect {
218        /// Proxy host.
219        host: String,
220        /// Proxy port.
221        port: u16,
222        /// User-Agent string for CONNECT request.
223        user_agent: String,
224    },
225}
226
227/// Create a TCP connection to `target_host:target_port` through the given proxy.
228async fn connect_via_proxy(
229    proxy: &FtpProxyConfig,
230    target_host: &str,
231    target_port: u16,
232) -> Result<TcpStream, Error> {
233    let (proxy_host, proxy_port) = match proxy {
234        FtpProxyConfig::Socks4 { host, port, .. }
235        | FtpProxyConfig::Socks5 { host, port, .. }
236        | FtpProxyConfig::HttpConnect { host, port, .. } => (host.as_str(), *port),
237    };
238    let proxy_addr = format!("{proxy_host}:{proxy_port}");
239    let tcp = TcpStream::connect(&proxy_addr).await.map_err(Error::Connect)?;
240
241    match proxy {
242        FtpProxyConfig::Socks5 { auth, .. } => {
243            let auth_ref = auth.as_ref().map(|(u, p)| (u.as_str(), p.as_str()));
244            crate::proxy::socks::connect_socks5(tcp, target_host, target_port, auth_ref).await
245        }
246        FtpProxyConfig::Socks4 { user_id, .. } => {
247            crate::proxy::socks::connect_socks4(tcp, target_host, target_port, user_id).await
248        }
249        FtpProxyConfig::HttpConnect { user_agent, .. } => {
250            connect_http_tunnel(tcp, target_host, target_port, user_agent).await
251        }
252    }
253}
254
255/// Establish a simple HTTP CONNECT tunnel for FTP data connections.
256///
257/// Sends `CONNECT host:port HTTP/1.1` and expects a 200 response.
258/// Returns the tunneled stream on success, or an error on failure.
259async fn connect_http_tunnel(
260    mut stream: TcpStream,
261    target_host: &str,
262    target_port: u16,
263    user_agent: &str,
264) -> Result<TcpStream, Error> {
265    use tokio::io::{AsyncBufReadExt as _, AsyncWriteExt as _};
266    let request = format!(
267        "CONNECT {target_host}:{target_port} HTTP/1.1\r\n\
268         Host: {target_host}:{target_port}\r\n\
269         User-Agent: {user_agent}\r\n\
270         Proxy-Connection: Keep-Alive\r\n\
271         \r\n"
272    );
273    stream.write_all(request.as_bytes()).await.map_err(Error::Io)?;
274    stream.flush().await.map_err(Error::Io)?;
275
276    // Read response status line and headers
277    let mut buf_reader = tokio::io::BufReader::new(&mut stream);
278    let mut status_line = String::new();
279    let _ = buf_reader.read_line(&mut status_line).await.map_err(Error::Io)?;
280
281    // Parse status code from "HTTP/1.x NNN ..."
282    let status_code =
283        status_line.split_whitespace().nth(1).and_then(|s| s.parse::<u16>().ok()).unwrap_or(0);
284
285    // Read remaining headers until empty line
286    loop {
287        let mut line = String::new();
288        let _ = buf_reader.read_line(&mut line).await.map_err(Error::Io)?;
289        if line.trim().is_empty() {
290            break;
291        }
292    }
293
294    if status_code == 200 {
295        Ok(stream)
296    } else {
297        Err(Error::Transfer {
298            code: 56,
299            message: format!(
300                "CONNECT tunnel to {target_host}:{target_port} failed with status {status_code}"
301            ),
302        })
303    }
304}
305
306/// Configuration options for FTP transfers.
307///
308/// Controls passive/active mode selection, directory creation,
309/// CWD strategy, and account handling.
310#[derive(Debug, Clone)]
311#[allow(clippy::struct_excessive_bools)] // These are independent FTP options, not state flags
312pub struct FtpConfig {
313    /// Use EPSV (extended passive) instead of PASV (default: true).
314    pub use_epsv: bool,
315    /// Use EPRT (extended active) instead of PORT (default: true).
316    pub use_eprt: bool,
317    /// Skip the IP from the PASV response, use control connection IP.
318    pub skip_pasv_ip: bool,
319    /// FTP account string (sent via ACCT after login).
320    pub account: Option<String>,
321    /// Create missing directories on server during upload.
322    pub create_dirs: bool,
323    /// Directory traversal method.
324    pub method: FtpMethod,
325    /// Active mode address (None = passive mode).
326    pub active_port: Option<String>,
327    /// Use ASCII transfer mode (`--use-ascii` / `-B`).
328    pub use_ascii: bool,
329    /// Append to remote file instead of overwriting (`--append` / `-a`).
330    pub append: bool,
331    /// Convert LF to CRLF on upload (`--crlf`).
332    pub crlf: bool,
333    /// List only (NLST instead of LIST; `-l` / `--list-only`).
334    pub list_only: bool,
335    /// HEAD request — only get file info, no data transfer (`-I` / `--head`).
336    pub nobody: bool,
337    /// Pre-transfer FTP quote commands (from `-Q "CMD"`), sent after CWD, before PASV.
338    pub pre_quote: Vec<String>,
339    /// Post-PASV / pre-RETR quote commands (from `-Q "+CMD"`), sent after TYPE, before SIZE/RETR.
340    pub post_pasv_quote: Vec<String>,
341    /// Post-transfer FTP quote commands (from `-Q "-CMD"`).
342    pub post_quote: Vec<String>,
343    /// Time condition for conditional download (-z).
344    /// `Some((timestamp, negate))` where `negate=false` means download if newer,
345    /// `negate=true` means download if older.
346    pub time_condition: Option<(i64, bool)>,
347    /// End byte for range download (e.g., `-r 4-16` → `range_end = Some(16)`).
348    /// When set, ABOR is sent after reading `range_end - start + 1` bytes.
349    pub range_end: Option<u64>,
350    /// Negative range: last N bytes of the file (e.g., `-r -12` → `range_from_end = Some(12)`).
351    /// Resolved to a REST offset after SIZE response.
352    pub range_from_end: Option<u64>,
353    /// Skip SIZE command (`--ignore-content-length`).
354    pub ignore_content_length: bool,
355    /// Maximum file size allowed for download (`--max-filesize`).
356    /// If SIZE response exceeds this, QUIT before RETR with error code 63.
357    pub max_filesize: Option<u64>,
358    /// Send PRET command before PASV/EPSV (`--ftp-pret`).
359    pub use_pret: bool,
360    /// Use TLS only for control connection, not data (curl `--ftp-ssl-control`).
361    /// When true, PROT C (Clear) is sent instead of PROT P (Private).
362    pub ssl_control: bool,
363    /// Send CCC (Clear Command Channel) after PROT (curl `--ftp-ssl-ccc`).
364    pub ssl_ccc: bool,
365    /// Alternative USER command when initial USER fails (curl `--ftp-alternative-to-user`).
366    pub alternative_to_user: Option<String>,
367}
368
369impl Default for FtpConfig {
370    fn default() -> Self {
371        Self {
372            use_epsv: true,
373            use_eprt: true,
374            skip_pasv_ip: false,
375            account: None,
376            create_dirs: false,
377            method: FtpMethod::default(),
378            active_port: None,
379            use_ascii: false,
380            append: false,
381            crlf: false,
382            list_only: false,
383            nobody: false,
384            pre_quote: Vec::new(),
385            post_pasv_quote: Vec::new(),
386            post_quote: Vec::new(),
387            time_condition: None,
388            range_end: None,
389            range_from_end: None,
390            ignore_content_length: false,
391            max_filesize: None,
392            use_pret: false,
393            ssl_control: false,
394            ssl_ccc: false,
395            alternative_to_user: None,
396        }
397    }
398}
399
400/// A data connection that may be fully connected (passive) or pending accept (active).
401///
402/// In passive mode, the connection is established immediately.
403/// In active mode, the listener is ready but the server hasn't connected yet —
404/// `accept()` must be called after sending RETR/LIST/STOR to complete the connection.
405#[allow(clippy::large_enum_variant)]
406pub(crate) enum DataConnection {
407    /// Fully established data connection (passive mode).
408    Connected(FtpStream),
409    /// Pending active mode: listener waiting for server to connect.
410    PendingActive {
411        /// TCP listener waiting for the server's data connection.
412        listener: tokio::net::TcpListener,
413        /// Whether to wrap the accepted connection with TLS.
414        use_tls: bool,
415    },
416}
417
418impl DataConnection {
419    /// Get the connected stream, accepting the active mode connection if needed.
420    ///
421    /// For passive mode, returns the stream immediately.
422    /// For active mode, waits for the server to connect (with optional timeout).
423    async fn into_stream(
424        self,
425        session: &FtpSession,
426        timeout: Option<std::time::Duration>,
427    ) -> Result<FtpStream, Error> {
428        match self {
429            Self::Connected(stream) => Ok(stream),
430            Self::PendingActive { listener, use_tls } => {
431                let accept_fut = listener.accept();
432                let (tcp, _) = if let Some(dur) = timeout {
433                    tokio::time::timeout(dur, accept_fut).await.map_err(|_| Error::Transfer {
434                        code: 10,
435                        message: "FTP active mode accept timed out".to_string(),
436                    })?
437                } else {
438                    accept_fut.await
439                }
440                .map_err(|e| Error::Http(format!("FTP active mode accept failed: {e}")))?;
441
442                if use_tls {
443                    session.maybe_wrap_data_tls(tcp).await
444                } else {
445                    Ok(FtpStream::Plain(tcp))
446                }
447            }
448        }
449    }
450}
451
452/// An active FTP session with an established control connection.
453///
454/// Handles login, passive/active mode, data transfer operations,
455/// and optional TLS encryption (FTPS).
456pub struct FtpSession {
457    reader: BufReader<ReadHalf<FtpStream>>,
458    writer: WriteHalf<FtpStream>,
459    features: Option<FtpFeatures>,
460    /// Server hostname for TLS SNI.
461    hostname: String,
462    /// Port used for this control connection.
463    port: u16,
464    /// Username used for login.
465    user: String,
466    /// Local address of the control connection (for active mode PORT commands).
467    local_addr: SocketAddr,
468    /// Whether data connections should use TLS (set after PROT P).
469    use_tls_data: bool,
470    /// Address for active mode data connections (`None` = use passive mode).
471    active_port: Option<String>,
472    /// TLS connector for wrapping data connections.
473    #[cfg(feature = "rustls")]
474    tls_connector: Option<crate::tls::TlsConnector>,
475    /// FTP transfer configuration.
476    config: FtpConfig,
477    /// Accumulated raw FTP response bytes for dump-header support.
478    header_bytes: Vec<u8>,
479    /// Current working directory components (for connection reuse).
480    /// Empty means "initial state after login" (home directory).
481    current_dir: Vec<String>,
482    /// Home directory from PWD response (for resetting on connection reuse).
483    /// curl CWDs to this path instead of "/" when reusing connections.
484    home_dir: Option<String>,
485    /// Current TYPE setting (for skipping redundant TYPE commands on reuse).
486    current_type: Option<TransferType>,
487    /// Proxy configuration for routing data connections through a proxy.
488    proxy_config: Option<FtpProxyConfig>,
489    /// Raw CONNECT response bytes for HTTP CONNECT tunnel output.
490    connect_response_bytes: Vec<u8>,
491}
492
493impl FtpSession {
494    /// Read an FTP response and record its raw bytes in `header_bytes` for dump-header.
495    async fn read_and_record(&mut self) -> Result<FtpResponse, Error> {
496        let resp = read_response(&mut self.reader).await?;
497        self.header_bytes.extend_from_slice(&resp.raw_bytes);
498        Ok(resp)
499    }
500
501    /// Connect to an FTP server and log in (plain FTP, no TLS).
502    ///
503    /// # Errors
504    ///
505    /// Returns an error if connection, login, or greeting fails.
506    pub async fn connect(
507        host: &str,
508        port: u16,
509        user: &str,
510        pass: &str,
511        config: FtpConfig,
512    ) -> Result<Self, Error> {
513        Self::connect_maybe_proxy(host, port, user, pass, config, None).await
514    }
515
516    /// Connect to an FTP server and log in, optionally through a proxy.
517    ///
518    /// When `proxy` is `Some`, the control connection is routed through the
519    /// proxy and proxy info is stored for routing data connections too.
520    ///
521    /// # Errors
522    ///
523    /// Returns an error if connection, login, or greeting fails.
524    pub async fn connect_maybe_proxy(
525        host: &str,
526        port: u16,
527        user: &str,
528        pass: &str,
529        config: FtpConfig,
530        proxy: Option<FtpProxyConfig>,
531    ) -> Result<Self, Error> {
532        let (tcp, connect_response_bytes) = if let Some(ref proxy_config) = proxy {
533            let stream = connect_via_proxy(proxy_config, host, port).await?;
534            // Capture CONNECT response for HTTP tunnel output (curl compat: test 714)
535            let connect_bytes = if matches!(proxy_config, FtpProxyConfig::HttpConnect { .. }) {
536                b"HTTP/1.1 200 Connection established\r\n\r\n".to_vec()
537            } else {
538                Vec::new()
539            };
540            (stream, connect_bytes)
541        } else {
542            let addr = format!("{host}:{port}");
543            let stream = TcpStream::connect(&addr).await.map_err(Error::Connect)?;
544            (stream, Vec::new())
545        };
546        let local_addr = tcp.local_addr().map_err(Error::Connect)?;
547        let stream = FtpStream::Plain(tcp);
548        let (reader, writer) = tokio::io::split(stream);
549        let mut reader = BufReader::new(reader);
550
551        // Read server greeting
552        let greeting = read_response(&mut reader).await?;
553        if !greeting.is_complete() {
554            return Err(Error::Http(format!(
555                "FTP server rejected connection: {} {}",
556                greeting.code, greeting.message
557            )));
558        }
559        // 230 greeting means already authenticated (curl compat: test 1219)
560        let skip_login = greeting.code == 230;
561
562        let active_port = config.active_port.clone();
563        let alt_to_user = config.alternative_to_user.clone();
564        let mut header_bytes = Vec::new();
565        header_bytes.extend_from_slice(&greeting.raw_bytes);
566        let mut session = Self {
567            reader,
568            writer,
569            features: None,
570            hostname: host.to_string(),
571            port,
572            user: user.to_string(),
573            local_addr,
574            use_tls_data: false,
575            active_port,
576            #[cfg(feature = "rustls")]
577            tls_connector: None,
578            config,
579            header_bytes,
580            current_dir: Vec::new(),
581            home_dir: None,
582            current_type: None,
583            proxy_config: proxy,
584            connect_response_bytes,
585        };
586
587        // Login (skip if server sent 230 in greeting)
588        session.login(user, pass, skip_login, alt_to_user.as_deref()).await?;
589
590        // Send ACCT command if configured
591        if let Some(ref account) = session.config.account {
592            let acct_cmd = format!("ACCT {account}");
593            send_command(&mut session.writer, &acct_cmd).await?;
594            let acct_resp = session.read_and_record().await?;
595            if !acct_resp.is_complete() {
596                return Err(Error::Transfer {
597                    code: 11,
598                    message: format!("FTP ACCT failed: {} {}", acct_resp.code, acct_resp.message),
599                });
600            }
601        }
602
603        Ok(session)
604    }
605
606    /// Connect to an FTP server with TLS support.
607    ///
608    /// For `FtpSslMode::Explicit`, connects plain, then upgrades with AUTH TLS.
609    /// For `FtpSslMode::Implicit`, connects directly over TLS (port 990).
610    /// For `FtpSslMode::None`, behaves like `connect()`.
611    ///
612    /// # Errors
613    ///
614    /// Returns an error if connection, TLS negotiation, or login fails.
615    #[cfg(feature = "rustls")]
616    #[allow(clippy::too_many_arguments)]
617    pub async fn connect_with_tls(
618        host: &str,
619        port: u16,
620        user: &str,
621        pass: &str,
622        ssl_mode: FtpSslMode,
623        use_ssl: UseSsl,
624        tls_config: &crate::tls::TlsConfig,
625        config: FtpConfig,
626    ) -> Result<Self, Error> {
627        if ssl_mode == FtpSslMode::None {
628            return Self::connect(host, port, user, pass, config).await;
629        }
630
631        let tls_connector = crate::tls::TlsConnector::new_no_alpn(tls_config)?;
632
633        let addr = format!("{host}:{port}");
634        let tcp = TcpStream::connect(&addr).await.map_err(Error::Connect)?;
635        let local_addr = tcp.local_addr().map_err(Error::Connect)?;
636
637        let stream = match ssl_mode {
638            FtpSslMode::Implicit => {
639                // Implicit FTPS: wrap immediately with TLS
640                let (tls_stream, _) = tls_connector.connect(tcp, host).await?;
641                FtpStream::Tls(tls_stream)
642            }
643            FtpSslMode::Explicit | FtpSslMode::None => {
644                // Explicit: start plain, upgrade after greeting
645                FtpStream::Plain(tcp)
646            }
647        };
648
649        let (reader, writer) = tokio::io::split(stream);
650        let mut reader = BufReader::new(reader);
651
652        // Read server greeting
653        let greeting = read_response(&mut reader).await?;
654        if !greeting.is_complete() {
655            return Err(Error::Http(format!(
656                "FTP server rejected connection: {} {}",
657                greeting.code, greeting.message
658            )));
659        }
660        let skip_login = greeting.code == 230;
661
662        let active_port = config.active_port.clone();
663        let alt_to_user = config.alternative_to_user.clone();
664        let mut header_bytes = Vec::new();
665        header_bytes.extend_from_slice(&greeting.raw_bytes);
666        let mut session = Self {
667            reader,
668            writer,
669            features: None,
670            hostname: host.to_string(),
671            port,
672            user: user.to_string(),
673            local_addr,
674            use_tls_data: false,
675            active_port,
676            tls_connector: Some(tls_connector),
677            config,
678            header_bytes,
679            current_dir: Vec::new(),
680            home_dir: None,
681            current_type: None,
682            proxy_config: None,
683            connect_response_bytes: Vec::new(),
684        };
685
686        // For explicit FTPS, upgrade the control connection to TLS
687        if ssl_mode == FtpSslMode::Explicit {
688            let (upgraded_session, auth_succeeded) =
689                session.auth_tls_with_fallback(use_ssl).await?;
690            session = upgraded_session;
691            if auth_succeeded {
692                // AUTH succeeded: PBSZ/PROT before login
693                session.setup_data_protection().await?;
694            }
695            session.login(user, pass, skip_login, alt_to_user.as_deref()).await?;
696        } else {
697            session.login(user, pass, skip_login, alt_to_user.as_deref()).await?;
698            session.setup_data_protection().await?;
699        }
700
701        // Send ACCT command if configured
702        if let Some(ref account) = session.config.account {
703            let acct_cmd = format!("ACCT {account}");
704            send_command(&mut session.writer, &acct_cmd).await?;
705            let acct_resp = session.read_and_record().await?;
706            if !acct_resp.is_complete() {
707                return Err(Error::Transfer {
708                    code: 11,
709                    message: format!("FTP ACCT failed: {} {}", acct_resp.code, acct_resp.message),
710                });
711            }
712        }
713
714        Ok(session)
715    }
716
717    /// Upgrade the control connection to TLS using AUTH SSL/TLS (RFC 4217).
718    ///
719    /// Tries AUTH SSL first, then AUTH TLS (matching curl's behavior).
720    /// If both fail, returns error 64 (for Required/Control) or
721    /// continues to error 8 on weird server replies (for Try).
722    /// Try AUTH SSL/TLS to upgrade to FTPS.
723    ///
724    /// Returns `(session, true)` if AUTH succeeded and TLS is now active,
725    /// `(session, false)` if Try mode fell through without TLS.
726    #[cfg(feature = "rustls")]
727    async fn auth_tls_with_fallback(mut self, use_ssl: UseSsl) -> Result<(Self, bool), Error> {
728        // Try AUTH SSL first (curl's behavior)
729        send_command(&mut self.writer, "AUTH SSL").await?;
730        let resp = self.read_and_record().await?;
731        if resp.is_complete() {
732            // AUTH SSL succeeded — perform TLS handshake
733            return Ok((self.do_tls_upgrade().await?, true));
734        }
735
736        if use_ssl == UseSsl::All {
737            // Required mode: try AUTH TLS as fallback
738            send_command(&mut self.writer, "AUTH TLS").await?;
739            let resp2 = self.read_and_record().await?;
740            if resp2.is_complete() {
741                // AUTH TLS succeeded — perform TLS handshake
742                return Ok((self.do_tls_upgrade().await?, true));
743            }
744
745            // Both AUTH commands failed — error 64 (CURLE_USE_SSL_FAILED)
746            return Err(Error::Transfer {
747                code: 64,
748                message: "FTP AUTH SSL/TLS failed: server does not support TLS".to_string(),
749            });
750        }
751
752        // Try mode: AUTH SSL failed.
753        // Check for pipelined data in the buffer — if the server sent extra
754        // data alongside the AUTH response, that's a weird server reply (error 8).
755        if !self.reader.buffer().is_empty() {
756            return Err(Error::Protocol(8));
757        }
758        // No pipelined data: continue without TLS
759        Ok((self, false))
760    }
761
762    /// Perform the actual TLS upgrade after a successful AUTH command.
763    #[cfg(feature = "rustls")]
764    async fn do_tls_upgrade(self) -> Result<Self, Error> {
765        // Reassemble the FtpStream from the split reader/writer halves
766        let reader_inner = self.reader.into_inner();
767        let stream = reader_inner.unsplit(self.writer);
768
769        // Extract TcpStream from the plain stream
770        let tcp = match stream {
771            FtpStream::Plain(tcp) => tcp,
772            FtpStream::Tls(_) => {
773                return Err(Error::Http("AUTH TLS on already-encrypted connection".to_string()));
774            }
775        };
776
777        // Wrap with TLS
778        let connector = self
779            .tls_connector
780            .as_ref()
781            .ok_or_else(|| Error::Http("No TLS connector available for AUTH TLS".to_string()))?;
782        let (tls_stream, _) = connector.connect(tcp, &self.hostname).await?;
783
784        // Re-split the TLS-wrapped stream
785        let ftp_stream = FtpStream::Tls(tls_stream);
786        let (reader, writer) = tokio::io::split(ftp_stream);
787
788        Ok(Self {
789            reader: BufReader::new(reader),
790            writer,
791            features: self.features,
792            hostname: self.hostname,
793            port: self.port,
794            user: self.user,
795            local_addr: self.local_addr,
796            use_tls_data: false,
797            active_port: self.active_port,
798            tls_connector: self.tls_connector,
799            config: self.config,
800            header_bytes: self.header_bytes,
801            current_dir: self.current_dir,
802            home_dir: self.home_dir,
803            current_type: self.current_type,
804            proxy_config: self.proxy_config,
805            connect_response_bytes: self.connect_response_bytes,
806        })
807    }
808
809    /// Set up data channel protection with PBSZ 0 and PROT P or PROT C.
810    ///
811    /// Called after TLS is established on the control connection.
812    /// Uses PROT C (clear) when `ssl_control` is true (--ftp-ssl-control),
813    /// otherwise uses PROT P (private) to encrypt data connections.
814    #[cfg(feature = "rustls")]
815    async fn setup_data_protection(&mut self) -> Result<(), Error> {
816        // PBSZ 0 (Protection Buffer Size — always 0 for TLS)
817        send_command(&mut self.writer, "PBSZ 0").await?;
818        let _pbsz_resp = self.read_and_record().await?;
819        // Ignore PBSZ failure — stunnel-wrapped servers don't support it
820        // but curl still sends it and continues.
821
822        // PROT C (Clear) for --ftp-ssl-control, PROT P (Private) otherwise
823        let prot_cmd = if self.config.ssl_control { "PROT C" } else { "PROT P" };
824        send_command(&mut self.writer, prot_cmd).await?;
825        let prot_resp = self.read_and_record().await?;
826        // Ignore PROT failure for the same reason.
827        if prot_resp.is_complete() {
828            // Only encrypt data connections with PROT P
829            self.use_tls_data = !self.config.ssl_control;
830        }
831
832        // CCC (Clear Command Channel) — downgrade control connection from TLS to plain
833        // curl sends this after PROT when --ftp-ssl-ccc is used.
834        // The server may reject it (e.g., stunnel-based servers), which is fine.
835        if self.config.ssl_ccc {
836            send_command(&mut self.writer, "CCC").await?;
837            let _ccc_resp = self.read_and_record().await?;
838            // Ignore CCC response — server may not support it
839        }
840
841        Ok(())
842    }
843
844    /// Set the address for active mode data connections.
845    ///
846    /// When set, PORT/EPRT commands are used instead of PASV.
847    /// The address can be an IP address or `"-"` to use the control
848    /// connection's local address.
849    pub fn set_active_port(&mut self, addr: &str) {
850        self.active_port = Some(addr.to_string());
851    }
852
853    /// Login with USER/PASS sequence.
854    ///
855    /// When the server sends a 230 in the greeting, `skip_login` should be true
856    /// to avoid sending USER/PASS. When USER fails and `alternative_to_user` is
857    /// set, sends that command before continuing with PASS.
858    ///
859    /// Returns `Error::Transfer { code: 67, .. }` on login failure (`CURLE_LOGIN_DENIED`).
860    async fn login(
861        &mut self,
862        user: &str,
863        pass: &str,
864        skip_login: bool,
865        alternative_to_user: Option<&str>,
866    ) -> Result<(), Error> {
867        if skip_login {
868            return Ok(());
869        }
870
871        send_command(&mut self.writer, &format!("USER {user}")).await?;
872        let user_resp = self.read_and_record().await?;
873
874        if user_resp.code == 331 {
875            // 331 = User name OK, need password
876            send_command(&mut self.writer, &format!("PASS {pass}")).await?;
877            let pass_resp = self.read_and_record().await?;
878            if pass_resp.code == 332 {
879                // 332 = Need account for login — ACCT will be sent by caller if configured.
880                // If no account is configured, fail immediately (curl compat: test 295).
881                if self.config.account.is_none() {
882                    return Err(Error::Transfer {
883                        code: 67,
884                        message: format!("Access denied: {} {}", pass_resp.code, pass_resp.message),
885                    });
886                }
887            } else if !pass_resp.is_complete() {
888                return Err(Error::Transfer {
889                    code: 67,
890                    message: format!("Access denied: {} {}", pass_resp.code, pass_resp.message),
891                });
892            }
893        } else if user_resp.is_complete() {
894            // 230 = Logged in without needing password
895        } else if alternative_to_user.is_some() {
896            // USER failed — try the alternative command (curl compat: test 280)
897            let alt = alternative_to_user.unwrap_or_default();
898            send_command(&mut self.writer, alt).await?;
899            let alt_resp = self.read_and_record().await?;
900            if alt_resp.code == 331 {
901                // Alt USER accepted, now send PASS
902                send_command(&mut self.writer, &format!("PASS {pass}")).await?;
903                let pass_resp = self.read_and_record().await?;
904                if !pass_resp.is_complete() && pass_resp.code != 332 {
905                    return Err(Error::Transfer {
906                        code: 67,
907                        message: format!("Access denied: {} {}", pass_resp.code, pass_resp.message),
908                    });
909                }
910            } else if !alt_resp.is_complete() {
911                return Err(Error::Transfer {
912                    code: 67,
913                    message: format!("Access denied: {} {}", alt_resp.code, alt_resp.message),
914                });
915            }
916        } else {
917            return Err(Error::Transfer {
918                code: 67,
919                message: format!("Access denied: {} {}", user_resp.code, user_resp.message),
920            });
921        }
922
923        Ok(())
924    }
925
926    /// Send PWD and ignore errors (curl always tries PWD but continues on failure).
927    async fn pwd_safe(&mut self) -> Option<String> {
928        if send_command(&mut self.writer, "PWD").await.is_err() {
929            return None;
930        }
931        match self.read_and_record().await {
932            Ok(resp) if resp.is_complete() => {
933                // Parse path from 257 "/path"
934                // Only extract if both opening and closing quotes are found.
935                // Unmatched quotes → treat as "could not get path" (curl compat: test 1152).
936                if let Some(start) = resp.message.find('"') {
937                    if let Some(end) = resp.message[start + 1..].find('"') {
938                        return Some(resp.message[start + 1..start + 1 + end].to_string());
939                    }
940                    // Unmatched quote: path extraction failed
941                    return None;
942                }
943                Some(resp.message)
944            }
945            _ => None,
946        }
947    }
948
949    /// Send FEAT command and parse server capabilities.
950    ///
951    /// # Errors
952    ///
953    /// Returns an error on communication failure. If the server doesn't
954    /// support FEAT, returns default (empty) features without error.
955    pub async fn feat(&mut self) -> Result<&FtpFeatures, Error> {
956        send_command(&mut self.writer, "FEAT").await?;
957        let resp = self.read_and_record().await?;
958
959        let features = if resp.is_complete() {
960            parse_feat_response(&resp.message)
961        } else {
962            // If FEAT returns 5xx (not supported), use empty defaults
963            FtpFeatures::default()
964        };
965
966        self.features = Some(features);
967        // features was just inserted, so get_or_insert_with won't allocate
968        Ok(self.features.get_or_insert_with(FtpFeatures::default))
969    }
970
971    /// Set the transfer type (ASCII or Binary).
972    ///
973    /// # Errors
974    ///
975    /// Returns an error if the TYPE command fails.
976    pub async fn set_type(&mut self, transfer_type: TransferType) -> Result<(), Error> {
977        let type_cmd = match transfer_type {
978            TransferType::Ascii => "TYPE A",
979            TransferType::Binary => "TYPE I",
980        };
981        send_command(&mut self.writer, type_cmd).await?;
982        let resp = self.read_and_record().await?;
983        if !resp.is_complete() {
984            return Err(Error::Http(format!("FTP TYPE failed: {} {}", resp.code, resp.message)));
985        }
986        Ok(())
987    }
988
989    /// Open a data connection, choosing passive or active mode.
990    ///
991    /// If `active_port` is set, uses PORT/EPRT (active mode).
992    /// Otherwise, uses PASV (passive mode).
993    async fn open_data_connection(&mut self) -> Result<DataConnection, Error> {
994        if let Some(ref addr) = self.active_port {
995            let addr = addr.clone();
996            self.open_active_data_connection(&addr).await
997        } else {
998            let stream = self.open_passive_data_connection(None).await?;
999            Ok(DataConnection::Connected(stream))
1000        }
1001    }
1002
1003    /// Open a data connection with an optional PRET command for `--ftp-pret`.
1004    ///
1005    /// If `pret_cmd` is provided and `config.use_pret` is true, sends
1006    /// `PRET <pret_cmd>` before entering passive mode (curl compat: test 1107).
1007    async fn open_data_connection_with_pret(
1008        &mut self,
1009        pret_cmd: &str,
1010    ) -> Result<DataConnection, Error> {
1011        if let Some(ref addr) = self.active_port {
1012            let addr = addr.clone();
1013            self.open_active_data_connection(&addr).await
1014        } else {
1015            let pret = if self.config.use_pret { Some(pret_cmd) } else { None };
1016            let stream = self.open_passive_data_connection(pret).await?;
1017            Ok(DataConnection::Connected(stream))
1018        }
1019    }
1020
1021    /// Enter passive mode and open a data connection (EPSV or PASV).
1022    ///
1023    /// If `config.use_epsv` is true, tries EPSV first and falls back to PASV.
1024    /// If `config.skip_pasv_ip` is true, uses the control connection host
1025    /// instead of the IP from the PASV response.
1026    /// If `pret_cmd` is provided, sends `PRET <cmd>` before EPSV/PASV
1027    /// (curl compat: test 1107, 1108).
1028    ///
1029    /// Returns `Error::Transfer { code: 13, .. }` if both EPSV and PASV fail.
1030    async fn open_passive_data_connection(
1031        &mut self,
1032        pret_cmd: Option<&str>,
1033    ) -> Result<FtpStream, Error> {
1034        // Send PRET before EPSV/PASV if configured (curl compat: test 1107)
1035        if let Some(cmd) = pret_cmd {
1036            send_command(&mut self.writer, &format!("PRET {cmd}")).await?;
1037            let pret_resp = self.read_and_record().await?;
1038            if pret_resp.code >= 400 {
1039                // PRET rejected — CURLE_FTP_PRET_FAILED (84)
1040                return Err(Error::Transfer {
1041                    code: 84,
1042                    message: format!(
1043                        "PRET command not accepted: {} {}",
1044                        pret_resp.code, pret_resp.message
1045                    ),
1046                });
1047            }
1048        }
1049        // For IPv6, EPSV is always required (PASV doesn't support IPv6 addresses).
1050        // curl compat: --disable-epsv only disables EPSV for IPv4.
1051        let force_epsv = self.local_addr.is_ipv6();
1052
1053        // Try EPSV first if enabled, or always for IPv6
1054        if self.config.use_epsv || force_epsv {
1055            send_command(&mut self.writer, "EPSV").await?;
1056            let epsv_resp = self.read_and_record().await?;
1057            if epsv_resp.code == 229 {
1058                match parse_epsv_response(&epsv_resp.message) {
1059                    Ok(data_port) => {
1060                        let tcp = self.connect_data(&self.hostname.clone(), data_port).await;
1061                        match tcp {
1062                            Ok(tcp) => {
1063                                // Capture CONNECT response for data tunnel (curl compat: test 714)
1064                                if matches!(
1065                                    self.proxy_config,
1066                                    Some(FtpProxyConfig::HttpConnect { .. })
1067                                ) {
1068                                    self.connect_response_bytes.extend_from_slice(
1069                                        b"HTTP/1.1 200 Connection established\r\n\r\n",
1070                                    );
1071                                }
1072                                return self.maybe_wrap_data_tls(tcp).await;
1073                            }
1074                            Err(e) => {
1075                                if force_epsv {
1076                                    // IPv6 has no PASV fallback
1077                                    return Err(Error::Http(format!(
1078                                        "FTP EPSV data connection failed: {e}"
1079                                    )));
1080                                }
1081                                // Data connection failed — fall through to PASV
1082                                // (curl compat: test 1233)
1083                                self.config.use_epsv = false;
1084                            }
1085                        }
1086                    }
1087                    Err(e) => {
1088                        // Bad EPSV response (e.g. port > 65535) — return error
1089                        // (curl compat: test 238)
1090                        return Err(e);
1091                    }
1092                }
1093            } else if force_epsv {
1094                // IPv6 has no PASV fallback — EPSV is required
1095                return Err(Error::Transfer {
1096                    code: 13,
1097                    message: format!(
1098                        "FTP EPSV failed: {} {} (PASV not available for IPv6)",
1099                        epsv_resp.code, epsv_resp.message
1100                    ),
1101                });
1102            } else {
1103                // EPSV failed (e.g. 500/502), remember and fall through to PASV
1104                self.config.use_epsv = false;
1105            }
1106        }
1107
1108        send_command(&mut self.writer, "PASV").await?;
1109        let pasv_resp = self.read_and_record().await?;
1110        if pasv_resp.code != 227 {
1111            return Err(Error::Transfer {
1112                code: 13,
1113                message: format!("FTP PASV failed: {} {}", pasv_resp.code, pasv_resp.message),
1114            });
1115        }
1116        let (data_host, data_port) = parse_pasv_response(&pasv_resp.message)?;
1117
1118        // If skip_pasv_ip is set, use the control connection host instead of
1119        // the IP address returned in the PASV response.
1120        let effective_host =
1121            if self.config.skip_pasv_ip { self.hostname.clone() } else { data_host };
1122
1123        let tcp = self
1124            .connect_data(&effective_host, data_port)
1125            .await
1126            .map_err(|e| Error::Http(format!("FTP data connection failed: {e}")))?;
1127
1128        self.maybe_wrap_data_tls(tcp).await
1129    }
1130
1131    /// Create a data connection, routing through proxy if configured.
1132    async fn connect_data(&self, host: &str, port: u16) -> Result<TcpStream, Error> {
1133        if let Some(ref proxy) = self.proxy_config {
1134            connect_via_proxy(proxy, host, port).await
1135        } else {
1136            let data_addr = format!("{host}:{port}");
1137            TcpStream::connect(&data_addr).await.map_err(Error::Connect)
1138        }
1139    }
1140
1141    /// Open a data connection in active mode (PORT/EPRT).
1142    ///
1143    /// Binds a listener on a local port, sends PORT/EPRT to the server,
1144    /// and waits for the server to connect.
1145    ///
1146    /// When `config.use_eprt` is true and the address is IPv4, tries EPRT first
1147    /// and falls back to PORT on failure (curl behavior for IPv6-capable builds).
1148    ///
1149    /// Returns `Error::Transfer { code: 30, .. }` if both EPRT and PORT fail.
1150    async fn open_active_data_connection(
1151        &mut self,
1152        bind_addr: &str,
1153    ) -> Result<DataConnection, Error> {
1154        // Determine the IP to advertise in PORT/EPRT.
1155        // `-` means use the control connection's local address.
1156        // An explicit IP means advertise that IP (but bind locally).
1157        let advertise_ip: std::net::IpAddr = if bind_addr == "-" {
1158            self.local_addr.ip()
1159        } else {
1160            bind_addr.parse().map_err(|e| {
1161                Error::Http(format!("Invalid FTP active address '{bind_addr}': {e}"))
1162            })?
1163        };
1164
1165        // Bind to local address: match address family of the advertised IP.
1166        // If bind_addr is "-", use the control connection's local address.
1167        // Otherwise use the unspecified address matching the family (curl compat: test 1050).
1168        let bind_ip = if bind_addr == "-" {
1169            self.local_addr.ip()
1170        } else if advertise_ip.is_ipv6() {
1171            std::net::IpAddr::V6(std::net::Ipv6Addr::UNSPECIFIED)
1172        } else {
1173            std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED)
1174        };
1175        let bind = SocketAddr::new(bind_ip, 0);
1176        let listener = tokio::net::TcpListener::bind(bind)
1177            .await
1178            .map_err(|e| Error::Http(format!("FTP active mode bind failed: {e}")))?;
1179        let listen_addr = listener
1180            .local_addr()
1181            .map_err(|e| Error::Http(format!("FTP active mode local_addr failed: {e}")))?;
1182        // Use the advertised IP with the locally assigned port
1183        let advertise_addr = SocketAddr::new(advertise_ip, listen_addr.port());
1184        let local_ip = advertise_ip;
1185
1186        // Send PORT or EPRT depending on address family and config.
1187        // For IPv6, EPRT is always required (PORT doesn't support IPv6).
1188        // For IPv4, try EPRT first if use_eprt is true, fall back to PORT.
1189        let mut port_ok = false;
1190
1191        if local_ip.is_ipv6() || self.config.use_eprt {
1192            let eprt_cmd = format_eprt_command(&advertise_addr);
1193            send_command(&mut self.writer, &eprt_cmd).await?;
1194            let resp = self.read_and_record().await?;
1195            if resp.is_complete() {
1196                port_ok = true;
1197            } else if local_ip.is_ipv6() {
1198                // IPv6 has no PORT fallback
1199                return Err(Error::Transfer {
1200                    code: 30,
1201                    message: format!("FTP EPRT failed: {} {}", resp.code, resp.message),
1202                });
1203            }
1204            // IPv4 EPRT failed, remember and fall through to PORT
1205            if !port_ok {
1206                self.config.use_eprt = false;
1207            }
1208        }
1209
1210        if !port_ok && local_ip.is_ipv4() {
1211            let port_cmd = format_port_command(&advertise_addr);
1212            send_command(&mut self.writer, &port_cmd).await?;
1213            let resp = self.read_and_record().await?;
1214            if !resp.is_complete() {
1215                return Err(Error::Transfer {
1216                    code: 30,
1217                    message: format!("FTP PORT failed: {} {}", resp.code, resp.message),
1218                });
1219            }
1220        }
1221
1222        // Return the listener for deferred accept (after RETR/STOR/LIST is sent)
1223        // This allows detecting server error responses (425/421) before blocking on accept
1224        // (curl compat: tests 1206, 1207, 1208)
1225        Ok(DataConnection::PendingActive { listener, use_tls: self.use_tls_data })
1226    }
1227
1228    /// Optionally wrap a data connection TCP stream with TLS.
1229    ///
1230    /// If `use_tls_data` is true and a TLS connector is available,
1231    /// wraps the stream. Otherwise, returns it as plain.
1232    async fn maybe_wrap_data_tls(&self, tcp: TcpStream) -> Result<FtpStream, Error> {
1233        #[cfg(feature = "rustls")]
1234        if self.use_tls_data {
1235            if let Some(ref connector) = self.tls_connector {
1236                let (tls_stream, _) = connector.connect(tcp, &self.hostname).await?;
1237                return Ok(FtpStream::Tls(tls_stream));
1238            }
1239        }
1240
1241        Ok(FtpStream::Plain(tcp))
1242    }
1243
1244    /// Download a file from the server.
1245    ///
1246    /// # Errors
1247    ///
1248    /// Returns an error if the transfer fails.
1249    pub async fn download(&mut self, path: &str) -> Result<Vec<u8>, Error> {
1250        self.set_type(TransferType::Binary).await?;
1251        let data_conn = self.open_data_connection().await?;
1252        let mut data_stream = data_conn.into_stream(self, None).await?;
1253
1254        send_command(&mut self.writer, &format!("RETR {path}")).await?;
1255        let retr_resp = self.read_and_record().await?;
1256        if !retr_resp.is_preliminary() && !retr_resp.is_complete() {
1257            return Err(Error::Http(format!(
1258                "FTP RETR failed: {} {}",
1259                retr_resp.code, retr_resp.message
1260            )));
1261        }
1262
1263        let mut data = Vec::new();
1264        let _ = data_stream
1265            .read_to_end(&mut data)
1266            .await
1267            .map_err(|e| Error::Http(format!("FTP data read error: {e}")))?;
1268        drop(data_stream);
1269
1270        let complete_resp = self.read_and_record().await?;
1271        if !complete_resp.is_complete() {
1272            return Err(Error::Http(format!(
1273                "FTP transfer failed: {} {}",
1274                complete_resp.code, complete_resp.message
1275            )));
1276        }
1277
1278        Ok(data)
1279    }
1280
1281    /// Download a file with resume from a byte offset (REST + RETR).
1282    ///
1283    /// # Errors
1284    ///
1285    /// Returns an error if the server doesn't support REST or the transfer fails.
1286    pub async fn download_resume(&mut self, path: &str, offset: u64) -> Result<Vec<u8>, Error> {
1287        self.set_type(TransferType::Binary).await?;
1288        let data_conn = self.open_data_connection().await?;
1289        let mut data_stream = data_conn.into_stream(self, None).await?;
1290
1291        // Send REST to set the starting offset
1292        send_command(&mut self.writer, &format!("REST {offset}")).await?;
1293        let rest_resp = self.read_and_record().await?;
1294        if !rest_resp.is_intermediate() {
1295            return Err(Error::Http(format!(
1296                "FTP REST failed: {} {}",
1297                rest_resp.code, rest_resp.message
1298            )));
1299        }
1300
1301        send_command(&mut self.writer, &format!("RETR {path}")).await?;
1302        let retr_resp = self.read_and_record().await?;
1303        if !retr_resp.is_preliminary() && !retr_resp.is_complete() {
1304            return Err(Error::Http(format!(
1305                "FTP RETR failed: {} {}",
1306                retr_resp.code, retr_resp.message
1307            )));
1308        }
1309
1310        let mut data = Vec::new();
1311        let _ = data_stream
1312            .read_to_end(&mut data)
1313            .await
1314            .map_err(|e| Error::Http(format!("FTP data read error: {e}")))?;
1315        drop(data_stream);
1316
1317        let complete_resp = self.read_and_record().await?;
1318        if !complete_resp.is_complete() {
1319            return Err(Error::Http(format!(
1320                "FTP transfer failed: {} {}",
1321                complete_resp.code, complete_resp.message
1322            )));
1323        }
1324
1325        Ok(data)
1326    }
1327
1328    /// Upload a file to the server (STOR).
1329    ///
1330    /// # Errors
1331    ///
1332    /// Returns an error if the transfer fails.
1333    pub async fn upload(&mut self, path: &str, data: &[u8]) -> Result<(), Error> {
1334        self.set_type(TransferType::Binary).await?;
1335        let data_conn = self.open_data_connection().await?;
1336        let mut data_stream = data_conn.into_stream(self, None).await?;
1337
1338        send_command(&mut self.writer, &format!("STOR {path}")).await?;
1339        let stor_resp = self.read_and_record().await?;
1340        if !stor_resp.is_preliminary() && !stor_resp.is_complete() {
1341            return Err(Error::Http(format!(
1342                "FTP STOR failed: {} {}",
1343                stor_resp.code, stor_resp.message
1344            )));
1345        }
1346
1347        data_stream
1348            .write_all(data)
1349            .await
1350            .map_err(|e| Error::Http(format!("FTP data write error: {e}")))?;
1351        data_stream
1352            .shutdown()
1353            .await
1354            .map_err(|e| Error::Http(format!("FTP data shutdown error: {e}")))?;
1355        drop(data_stream);
1356
1357        let complete_resp = self.read_and_record().await?;
1358        if !complete_resp.is_complete() {
1359            return Err(Error::Http(format!(
1360                "FTP upload failed: {} {}",
1361                complete_resp.code, complete_resp.message
1362            )));
1363        }
1364
1365        Ok(())
1366    }
1367
1368    /// Append data to a file on the server (APPE).
1369    ///
1370    /// # Errors
1371    ///
1372    /// Returns an error if the transfer fails.
1373    pub async fn append(&mut self, path: &str, data: &[u8]) -> Result<(), Error> {
1374        self.set_type(TransferType::Binary).await?;
1375        let data_conn = self.open_data_connection().await?;
1376        let mut data_stream = data_conn.into_stream(self, None).await?;
1377
1378        send_command(&mut self.writer, &format!("APPE {path}")).await?;
1379        let appe_resp = self.read_and_record().await?;
1380        if !appe_resp.is_preliminary() && !appe_resp.is_complete() {
1381            return Err(Error::Http(format!(
1382                "FTP APPE failed: {} {}",
1383                appe_resp.code, appe_resp.message
1384            )));
1385        }
1386
1387        data_stream
1388            .write_all(data)
1389            .await
1390            .map_err(|e| Error::Http(format!("FTP data write error: {e}")))?;
1391        data_stream
1392            .shutdown()
1393            .await
1394            .map_err(|e| Error::Http(format!("FTP data shutdown error: {e}")))?;
1395        drop(data_stream);
1396
1397        let complete_resp = self.read_and_record().await?;
1398        if !complete_resp.is_complete() {
1399            return Err(Error::Http(format!(
1400                "FTP append failed: {} {}",
1401                complete_resp.code, complete_resp.message
1402            )));
1403        }
1404
1405        Ok(())
1406    }
1407
1408    /// List directory contents (LIST).
1409    ///
1410    /// # Errors
1411    ///
1412    /// Returns an error if the listing fails.
1413    pub async fn list(&mut self, path: Option<&str>) -> Result<Vec<u8>, Error> {
1414        if let Some(dir) = path {
1415            if !dir.is_empty() && dir != "/" {
1416                send_command(&mut self.writer, &format!("CWD {dir}")).await?;
1417                let cwd_resp = self.read_and_record().await?;
1418                if !cwd_resp.is_complete() {
1419                    return Err(Error::Http(format!(
1420                        "FTP CWD failed: {} {}",
1421                        cwd_resp.code, cwd_resp.message
1422                    )));
1423                }
1424            }
1425        }
1426
1427        let data_conn = self.open_data_connection().await?;
1428        let mut data_stream = data_conn.into_stream(self, None).await?;
1429
1430        send_command(&mut self.writer, "LIST").await?;
1431        let list_resp = self.read_and_record().await?;
1432        if !list_resp.is_preliminary() && !list_resp.is_complete() {
1433            return Err(Error::Http(format!(
1434                "FTP LIST failed: {} {}",
1435                list_resp.code, list_resp.message
1436            )));
1437        }
1438
1439        let mut data = Vec::new();
1440        let _ = data_stream
1441            .read_to_end(&mut data)
1442            .await
1443            .map_err(|e| Error::Http(format!("FTP data read error: {e}")))?;
1444        drop(data_stream);
1445
1446        let complete_resp = self.read_and_record().await?;
1447        if !complete_resp.is_complete() {
1448            return Err(Error::Http(format!(
1449                "FTP transfer failed: {} {}",
1450                complete_resp.code, complete_resp.message
1451            )));
1452        }
1453
1454        Ok(data)
1455    }
1456
1457    /// Machine-readable listing (MLSD, RFC 3659).
1458    ///
1459    /// # Errors
1460    ///
1461    /// Returns an error if MLSD is not supported or fails.
1462    pub async fn mlsd(&mut self, path: Option<&str>) -> Result<Vec<u8>, Error> {
1463        let data_conn = self.open_data_connection().await?;
1464        let mut data_stream = data_conn.into_stream(self, None).await?;
1465
1466        let cmd = path.map_or_else(|| "MLSD".to_string(), |p| format!("MLSD {p}"));
1467        send_command(&mut self.writer, &cmd).await?;
1468        let resp = self.read_and_record().await?;
1469        if !resp.is_preliminary() && !resp.is_complete() {
1470            return Err(Error::Http(format!("FTP MLSD failed: {} {}", resp.code, resp.message)));
1471        }
1472
1473        let mut data = Vec::new();
1474        let _ = data_stream
1475            .read_to_end(&mut data)
1476            .await
1477            .map_err(|e| Error::Http(format!("FTP data read error: {e}")))?;
1478        drop(data_stream);
1479
1480        let complete_resp = self.read_and_record().await?;
1481        if !complete_resp.is_complete() {
1482            return Err(Error::Http(format!(
1483                "FTP MLSD transfer failed: {} {}",
1484                complete_resp.code, complete_resp.message
1485            )));
1486        }
1487
1488        Ok(data)
1489    }
1490
1491    /// Get file size (SIZE command).
1492    ///
1493    /// # Errors
1494    ///
1495    /// Returns an error if SIZE is not supported or fails.
1496    pub async fn size(&mut self, path: &str) -> Result<u64, Error> {
1497        send_command(&mut self.writer, &format!("SIZE {path}")).await?;
1498        let resp = self.read_and_record().await?;
1499        if !resp.is_complete() {
1500            return Err(Error::Http(format!("FTP SIZE failed: {} {}", resp.code, resp.message)));
1501        }
1502        resp.message
1503            .trim()
1504            .parse::<u64>()
1505            .map_err(|e| Error::Http(format!("FTP SIZE parse error: {e}")))
1506    }
1507
1508    /// Create a directory (MKD).
1509    ///
1510    /// # Errors
1511    ///
1512    /// Returns an error if the directory cannot be created.
1513    pub async fn mkdir(&mut self, path: &str) -> Result<(), Error> {
1514        send_command(&mut self.writer, &format!("MKD {path}")).await?;
1515        let resp = self.read_and_record().await?;
1516        if !resp.is_complete() {
1517            return Err(Error::Http(format!("FTP MKD failed: {} {}", resp.code, resp.message)));
1518        }
1519        Ok(())
1520    }
1521
1522    /// Remove a directory (RMD).
1523    ///
1524    /// # Errors
1525    ///
1526    /// Returns an error if the directory cannot be removed.
1527    pub async fn rmdir(&mut self, path: &str) -> Result<(), Error> {
1528        send_command(&mut self.writer, &format!("RMD {path}")).await?;
1529        let resp = self.read_and_record().await?;
1530        if !resp.is_complete() {
1531            return Err(Error::Http(format!("FTP RMD failed: {} {}", resp.code, resp.message)));
1532        }
1533        Ok(())
1534    }
1535
1536    /// Delete a file (DELE).
1537    ///
1538    /// # Errors
1539    ///
1540    /// Returns an error if the file cannot be deleted.
1541    pub async fn delete(&mut self, path: &str) -> Result<(), Error> {
1542        send_command(&mut self.writer, &format!("DELE {path}")).await?;
1543        let resp = self.read_and_record().await?;
1544        if !resp.is_complete() {
1545            return Err(Error::Http(format!("FTP DELE failed: {} {}", resp.code, resp.message)));
1546        }
1547        Ok(())
1548    }
1549
1550    /// Rename a file or directory (RNFR + RNTO).
1551    ///
1552    /// # Errors
1553    ///
1554    /// Returns an error if the rename fails.
1555    pub async fn rename(&mut self, from: &str, to: &str) -> Result<(), Error> {
1556        send_command(&mut self.writer, &format!("RNFR {from}")).await?;
1557        let rnfr_resp = self.read_and_record().await?;
1558        if !rnfr_resp.is_intermediate() {
1559            return Err(Error::Http(format!(
1560                "FTP RNFR failed: {} {}",
1561                rnfr_resp.code, rnfr_resp.message
1562            )));
1563        }
1564
1565        send_command(&mut self.writer, &format!("RNTO {to}")).await?;
1566        let rnto_resp = self.read_and_record().await?;
1567        if !rnto_resp.is_complete() {
1568            return Err(Error::Http(format!(
1569                "FTP RNTO failed: {} {}",
1570                rnto_resp.code, rnto_resp.message
1571            )));
1572        }
1573        Ok(())
1574    }
1575
1576    /// Send a SITE command.
1577    ///
1578    /// # Errors
1579    ///
1580    /// Returns an error if the SITE command fails.
1581    pub async fn site(&mut self, command: &str) -> Result<FtpResponse, Error> {
1582        send_command(&mut self.writer, &format!("SITE {command}")).await?;
1583        self.read_and_record().await
1584    }
1585
1586    /// Print the current working directory (PWD).
1587    ///
1588    /// # Errors
1589    ///
1590    /// Returns an error if the PWD command fails.
1591    pub async fn pwd(&mut self) -> Result<String, Error> {
1592        send_command(&mut self.writer, "PWD").await?;
1593        let resp = self.read_and_record().await?;
1594        if !resp.is_complete() {
1595            return Err(Error::Http(format!("FTP PWD failed: {} {}", resp.code, resp.message)));
1596        }
1597        // PWD returns: 257 "/current/dir"
1598        // Extract the path from between quotes
1599        if let Some(start) = resp.message.find('"') {
1600            if let Some(end) = resp.message[start + 1..].find('"') {
1601                return Ok(resp.message[start + 1..start + 1 + end].to_string());
1602            }
1603        }
1604        Ok(resp.message)
1605    }
1606
1607    /// Change working directory (CWD).
1608    ///
1609    /// # Errors
1610    ///
1611    /// Returns an error if the directory change fails.
1612    pub async fn cwd(&mut self, path: &str) -> Result<(), Error> {
1613        send_command(&mut self.writer, &format!("CWD {path}")).await?;
1614        let resp = self.read_and_record().await?;
1615        if !resp.is_complete() {
1616            return Err(Error::Http(format!("FTP CWD failed: {} {}", resp.code, resp.message)));
1617        }
1618        Ok(())
1619    }
1620
1621    /// Navigate to the directory containing a file and return the effective
1622    /// filename for RETR/STOR, according to the configured `FtpMethod`.
1623    ///
1624    /// - `NoCwd`: returns the full path unchanged (no CWD commands).
1625    /// - `SingleCwd`: issues one CWD to the directory portion, returns the filename.
1626    /// - `MultiCwd`: issues CWD for each path component, returns the filename.
1627    ///
1628    /// # Errors
1629    ///
1630    /// Returns an error if any CWD command fails.
1631    #[allow(dead_code)]
1632    async fn navigate_to_path(&mut self, path: &str) -> Result<String, Error> {
1633        match self.config.method {
1634            FtpMethod::NoCwd => Ok(path.to_string()),
1635            FtpMethod::SingleCwd => {
1636                if let Some((dir, file)) = path.rsplit_once('/') {
1637                    if !dir.is_empty() {
1638                        self.cwd(dir).await?;
1639                    }
1640                    Ok(file.to_string())
1641                } else {
1642                    Ok(path.to_string())
1643                }
1644            }
1645            FtpMethod::MultiCwd => {
1646                if let Some((dir, file)) = path.rsplit_once('/') {
1647                    for component in dir.split('/') {
1648                        if !component.is_empty() {
1649                            self.cwd(component).await?;
1650                        }
1651                    }
1652                    Ok(file.to_string())
1653                } else {
1654                    Ok(path.to_string())
1655                }
1656            }
1657        }
1658    }
1659
1660    /// Create missing directories on the server for the given path.
1661    ///
1662    /// Tries MKD for each component. If CWD succeeds, the directory
1663    /// already exists. If CWD fails, MKD is attempted before retrying CWD.
1664    /// After creating directories, CWDs back to `/` so subsequent
1665    /// commands use absolute paths.
1666    ///
1667    /// # Errors
1668    ///
1669    /// Returns an error if a directory cannot be created.
1670    #[allow(dead_code)]
1671    async fn create_dirs(&mut self, dir_path: &str) -> Result<(), Error> {
1672        for component in dir_path.split('/') {
1673            if component.is_empty() {
1674                continue;
1675            }
1676            // Try CWD first — the directory may already exist
1677            send_command(&mut self.writer, &format!("CWD {component}")).await?;
1678            let cwd_resp = self.read_and_record().await?;
1679            if cwd_resp.is_complete() {
1680                continue;
1681            }
1682            // CWD failed — try MKD then CWD again
1683            send_command(&mut self.writer, &format!("MKD {component}")).await?;
1684            let mkd_resp = self.read_and_record().await?;
1685            if !mkd_resp.is_complete() {
1686                return Err(Error::Http(format!(
1687                    "FTP MKD failed for '{}': {} {}",
1688                    component, mkd_resp.code, mkd_resp.message
1689                )));
1690            }
1691            send_command(&mut self.writer, &format!("CWD {component}")).await?;
1692            let retry_resp = self.read_and_record().await?;
1693            if !retry_resp.is_complete() {
1694                return Err(Error::Http(format!(
1695                    "FTP CWD failed after MKD for '{}': {} {}",
1696                    component, retry_resp.code, retry_resp.message
1697                )));
1698            }
1699        }
1700        // CWD back to root so we don't affect subsequent absolute path commands
1701        let _ = self.cwd("/").await;
1702        Ok(())
1703    }
1704
1705    /// Close the FTP session (QUIT).
1706    ///
1707    /// Sends QUIT and reads the response. Errors are ignored since
1708    /// we're closing the connection anyway.
1709    ///
1710    /// # Errors
1711    ///
1712    /// Returns an error if sending the QUIT command fails.
1713    pub async fn quit(&mut self) -> Result<(), Error> {
1714        let _ = send_command(&mut self.writer, "QUIT").await;
1715        // Read the QUIT response with a short timeout (server may close
1716        // connection or exit before responding). Ignore all errors.
1717        let _ = tokio::time::timeout(
1718            std::time::Duration::from_millis(500),
1719            read_response(&mut self.reader),
1720        )
1721        .await;
1722        Ok(())
1723    }
1724
1725    /// Check if this session can be reused for the given host, port, and user.
1726    #[must_use]
1727    pub fn can_reuse(&self, host: &str, port: u16, user: &str) -> bool {
1728        self.hostname == host && self.port == port && self.user == user
1729    }
1730
1731    /// Take the accumulated CONNECT response bytes (for HTTP tunnel output).
1732    pub fn take_connect_response_bytes(&mut self) -> Vec<u8> {
1733        std::mem::take(&mut self.connect_response_bytes)
1734    }
1735}
1736
1737/// Read an FTP response (potentially multi-line) from the control connection.
1738///
1739/// Multi-line responses start with `code-` and end with `code ` (space).
1740///
1741/// # Errors
1742///
1743/// Returns an error if the response is malformed or the connection drops.
1744pub async fn read_response<S: AsyncRead + Unpin>(
1745    stream: &mut BufReader<S>,
1746) -> Result<FtpResponse, Error> {
1747    let mut full_message = String::new();
1748    let mut final_code: Option<u16> = None;
1749    let mut raw_bytes = Vec::new();
1750
1751    loop {
1752        let mut line = String::new();
1753        let bytes_read = stream
1754            .read_line(&mut line)
1755            .await
1756            .map_err(|e| Error::Http(format!("FTP read error: {e}")))?;
1757
1758        if bytes_read == 0 {
1759            return Err(Error::Http("FTP connection closed unexpectedly".to_string()));
1760        }
1761
1762        // Capture the raw line, normalizing to CRLF for dump-header output.
1763        if line.ends_with("\r\n") {
1764            raw_bytes.extend_from_slice(line.as_bytes());
1765        } else if line.ends_with('\n') {
1766            raw_bytes.extend_from_slice(&line.as_bytes()[..line.len() - 1]);
1767            raw_bytes.extend_from_slice(b"\r\n");
1768        } else {
1769            raw_bytes.extend_from_slice(line.as_bytes());
1770            raw_bytes.extend_from_slice(b"\r\n");
1771        }
1772
1773        let line = line.trim_end_matches('\n').trim_end_matches('\r');
1774
1775        if line.len() < 4 {
1776            // Lines shorter than "NNN " aren't valid FTP responses
1777            full_message.push_str(line);
1778            full_message.push('\n');
1779            continue;
1780        }
1781
1782        let code_str = &line[..3];
1783        let separator = line.as_bytes().get(3).copied();
1784
1785        if let Ok(code) = code_str.parse::<u16>() {
1786            match separator {
1787                Some(b' ') => {
1788                    // Final line of response
1789                    let msg = &line[4..];
1790                    full_message.push_str(msg);
1791                    final_code = Some(code);
1792                    break;
1793                }
1794                Some(b'-') => {
1795                    // Multi-line response continues
1796                    let msg = &line[4..];
1797                    full_message.push_str(msg);
1798                    full_message.push('\n');
1799                    if final_code.is_none() {
1800                        final_code = Some(code);
1801                    }
1802                }
1803                _ => {
1804                    // Not a code line, just accumulate
1805                    full_message.push_str(line);
1806                    full_message.push('\n');
1807                }
1808            }
1809        } else {
1810            // Not a code line, just accumulate
1811            full_message.push_str(line);
1812            full_message.push('\n');
1813        }
1814    }
1815
1816    let code =
1817        final_code.ok_or_else(|| Error::Http("FTP response has no status code".to_string()))?;
1818
1819    Ok(FtpResponse { code, message: full_message, raw_bytes })
1820}
1821
1822/// Send an FTP command on the control connection.
1823///
1824/// # Errors
1825///
1826/// Returns an error if the write fails.
1827pub async fn send_command<S: AsyncWrite + Unpin>(
1828    stream: &mut S,
1829    command: &str,
1830) -> Result<(), Error> {
1831    let cmd = format!("{command}\r\n");
1832    stream
1833        .write_all(cmd.as_bytes())
1834        .await
1835        .map_err(|e| Error::Http(format!("FTP write error: {e}")))?;
1836    stream.flush().await.map_err(|e| Error::Http(format!("FTP flush error: {e}")))?;
1837    Ok(())
1838}
1839
1840/// Parse PASV response to extract IP and port.
1841///
1842/// PASV response format: `227 Entering Passive Mode (h1,h2,h3,h4,p1,p2)`
1843///
1844/// # Errors
1845///
1846/// Returns an error if the response cannot be parsed.
1847pub fn parse_pasv_response(message: &str) -> Result<(String, u16), Error> {
1848    // Find the parenthesized address
1849    let start = message.find('(').ok_or_else(|| Error::Transfer {
1850        code: 14,
1851        message: "PASV response missing address".to_string(),
1852    })?;
1853    let end = message.find(')').ok_or_else(|| Error::Transfer {
1854        code: 14,
1855        message: "PASV response missing closing paren".to_string(),
1856    })?;
1857
1858    let nums: Vec<u16> =
1859        message[start + 1..end].split(',').filter_map(|s| s.trim().parse().ok()).collect();
1860
1861    if nums.len() != 6 {
1862        return Err(Error::Transfer {
1863            code: 14,
1864            message: format!("PASV response has {} numbers, expected 6", nums.len()),
1865        });
1866    }
1867
1868    // Validate IP octets are in range 0-255 (curl compat: test 237)
1869    if nums[0] > 255 || nums[1] > 255 || nums[2] > 255 || nums[3] > 255 {
1870        return Err(Error::Transfer {
1871            code: 14,
1872            message: format!(
1873                "PASV response has invalid IP: {}.{}.{}.{}",
1874                nums[0], nums[1], nums[2], nums[3]
1875            ),
1876        });
1877    }
1878
1879    // Validate port octets are in range 0-255
1880    if nums[4] > 255 || nums[5] > 255 {
1881        return Err(Error::Transfer {
1882            code: 14,
1883            message: format!("PASV response has invalid port values: {},{}", nums[4], nums[5]),
1884        });
1885    }
1886
1887    let host = format!("{}.{}.{}.{}", nums[0], nums[1], nums[2], nums[3]);
1888    let port = nums[4] * 256 + nums[5];
1889
1890    Ok((host, port))
1891}
1892
1893/// Parse EPSV response to extract port.
1894///
1895/// EPSV response format: `229 Entering Extended Passive Mode (|||port|)`
1896///
1897/// # Errors
1898///
1899/// Returns an error if the response cannot be parsed.
1900pub fn parse_epsv_response(message: &str) -> Result<u16, Error> {
1901    // Find the port between ||| and |
1902    let start = message.find("|||").ok_or_else(|| Error::Transfer {
1903        code: 13,
1904        message: "EPSV response missing port delimiter".to_string(),
1905    })?;
1906    let rest = &message[start + 3..];
1907    let end = rest.find('|').ok_or_else(|| Error::Transfer {
1908        code: 13,
1909        message: "EPSV response missing closing delimiter".to_string(),
1910    })?;
1911
1912    // Parse as u32 first to detect out-of-range ports (curl compat: test 238)
1913    let port_num: u32 = rest[..end].parse().map_err(|e| Error::Transfer {
1914        code: 13,
1915        message: format!("EPSV port parse error: {e}"),
1916    })?;
1917
1918    if port_num == 0 || port_num > 65535 {
1919        return Err(Error::Transfer {
1920            code: 13,
1921            message: format!("EPSV port out of range: {port_num}"),
1922        });
1923    }
1924
1925    #[allow(clippy::cast_possible_truncation)]
1926    Ok(port_num as u16)
1927}
1928
1929/// Parse FEAT response into feature list.
1930///
1931/// # Errors
1932///
1933/// Returns an error if parsing fails.
1934#[must_use]
1935pub fn parse_feat_response(message: &str) -> FtpFeatures {
1936    let mut features = FtpFeatures::default();
1937    for line in message.lines() {
1938        let feature = line.trim().to_uppercase();
1939        if feature.starts_with("EPSV") {
1940            features.epsv = true;
1941        } else if feature.starts_with("MLST") {
1942            features.mlst = true;
1943        } else if feature.starts_with("REST") && feature.contains("STREAM") {
1944            features.rest_stream = true;
1945        } else if feature.starts_with("SIZE") {
1946            features.size = true;
1947        } else if feature.starts_with("UTF8") {
1948            features.utf8 = true;
1949        } else if feature.starts_with("AUTH") && feature.contains("TLS") {
1950            features.auth_tls = true;
1951        }
1952        if !feature.is_empty() {
1953            features.raw.push(line.trim().to_string());
1954        }
1955    }
1956    features
1957}
1958
1959/// Format a PORT command for active mode FTP (IPv4).
1960///
1961/// PORT h1,h2,h3,h4,p1,p2 where h1-h4 are IP octets and
1962/// p1=port/256, p2=port%256.
1963#[must_use]
1964pub fn format_port_command(addr: &SocketAddr) -> String {
1965    match addr.ip() {
1966        std::net::IpAddr::V4(ip) => {
1967            let octets = ip.octets();
1968            let port = addr.port();
1969            format!(
1970                "PORT {},{},{},{},{},{}",
1971                octets[0],
1972                octets[1],
1973                octets[2],
1974                octets[3],
1975                port / 256,
1976                port % 256
1977            )
1978        }
1979        std::net::IpAddr::V6(_) => {
1980            // PORT doesn't support IPv6; use EPRT instead
1981            format_eprt_command(addr)
1982        }
1983    }
1984}
1985
1986/// Format an EPRT command for active mode FTP (IPv4 and IPv6).
1987///
1988/// EPRT |net-prt|net-addr|tcp-port| where net-prt is 1 (IPv4) or 2 (IPv6).
1989#[must_use]
1990pub fn format_eprt_command(addr: &SocketAddr) -> String {
1991    let (proto, ip_str) = match addr.ip() {
1992        std::net::IpAddr::V4(ip) => (1, ip.to_string()),
1993        std::net::IpAddr::V6(ip) => (2, ip.to_string()),
1994    };
1995    format!("EPRT |{proto}|{ip_str}|{}|", addr.port())
1996}
1997
1998/// Connect an FTP session with the appropriate TLS mode.
1999///
2000/// Helper that dispatches to `FtpSession::connect` or `connect_with_tls`
2001/// based on the SSL mode.
2002#[allow(clippy::too_many_arguments)]
2003async fn connect_session(
2004    host: &str,
2005    port: u16,
2006    user: &str,
2007    pass: &str,
2008    ssl_mode: FtpSslMode,
2009    use_ssl: UseSsl,
2010    tls_config: &crate::tls::TlsConfig,
2011    config: FtpConfig,
2012    proxy: Option<FtpProxyConfig>,
2013) -> Result<FtpSession, Error> {
2014    match ssl_mode {
2015        FtpSslMode::None => {
2016            FtpSession::connect_maybe_proxy(host, port, user, pass, config, proxy).await
2017        }
2018        #[cfg(feature = "rustls")]
2019        _ => {
2020            // TODO: FTPS through proxy not yet supported
2021            if proxy.is_some() {
2022                return Err(Error::Http("FTPS through proxy is not yet supported".to_string()));
2023            }
2024            FtpSession::connect_with_tls(
2025                host, port, user, pass, ssl_mode, use_ssl, tls_config, config,
2026            )
2027            .await
2028        }
2029        #[cfg(not(feature = "rustls"))]
2030        _ => {
2031            let _ = (tls_config, config, use_ssl, proxy);
2032            Err(Error::Http("FTPS requires the 'rustls' feature".to_string()))
2033        }
2034    }
2035}
2036
2037/// Perform an FTP transfer (download, listing, upload, or HEAD) and return a Response.
2038///
2039/// This is the unified entry point for all FTP operations. The operation is
2040/// determined from the URL path (trailing `/` = listing) and config flags
2041/// (`nobody` = HEAD, upload data = STOR/APPE).
2042///
2043/// The command sequence matches curl's behavior:
2044/// 1. Connect + greeting
2045/// 2. USER / PASS
2046/// 3. PWD
2047/// 4. CWD (per path components, according to `FtpMethod`)
2048/// 5. Pre-quote commands
2049/// 6. EPSV / PASV (or PORT/EPRT for active mode)
2050/// 7. TYPE A or TYPE I
2051/// 8. SIZE (for downloads)
2052/// 9. REST (for resume)
2053/// 10. RETR / LIST / STOR / APPE
2054/// 11. Post-quote commands
2055/// 12. QUIT
2056///
2057/// # Errors
2058///
2059/// Returns errors with specific `Transfer` codes matching curl's exit codes:
2060/// - 9: `CURLE_REMOTE_ACCESS_DENIED` (CWD failed)
2061/// - 13: `CURLE_FTP_WEIRD_PASV_REPLY` (PASV/EPSV failed)
2062/// - 17: `CURLE_FTP_COULDNT_SET_TYPE` (TYPE failed)
2063/// - 19: `CURLE_FTP_COULDNT_RETR_FILE` (RETR/SIZE failed)
2064/// - 25: `CURLE_UPLOAD_FAILED` (STOR/APPE failed)
2065/// - 30: `CURLE_FTP_PORT_FAILED` (PORT/EPRT failed)
2066/// - 36: `CURLE_BAD_DOWNLOAD_RESUME` (resume offset beyond file size)
2067/// - 67: `CURLE_LOGIN_DENIED` (USER/PASS rejected)
2068#[allow(clippy::too_many_lines, clippy::too_many_arguments)]
2069pub async fn perform(
2070    url: &crate::url::Url,
2071    upload_data: Option<&[u8]>,
2072    ssl_mode: FtpSslMode,
2073    use_ssl: UseSsl,
2074    tls_config: &crate::tls::TlsConfig,
2075    resume_from: Option<u64>,
2076    config: &FtpConfig,
2077    credentials: Option<(&str, &str)>,
2078    ftp_session: &mut Option<FtpSession>,
2079    proxy: Option<FtpProxyConfig>,
2080) -> Result<Response, Error> {
2081    let range_end = config.range_end;
2082    let (host, port) = url.host_and_port()?;
2083    let raw_path = url.path();
2084
2085    // Percent-decode the path for FTP
2086    let decoded_path = percent_decode(raw_path);
2087    let path = decoded_path.as_str();
2088
2089    // Use provided credentials, URL credentials, or anonymous with curl-compatible password.
2090    // URL credentials are percent-decoded (test 191: ftp://use%3fr:pass%3fword@host/).
2091    let url_creds = url.credentials();
2092    let decoded_user;
2093    let decoded_pass;
2094    #[allow(clippy::option_if_let_else)]
2095    let (user, pass) = if let Some(creds) = credentials {
2096        creds
2097    } else if let Some((raw_user, raw_pass)) = url_creds {
2098        decoded_user = percent_decode(raw_user);
2099        decoded_pass = percent_decode(raw_pass);
2100        (decoded_user.as_str(), decoded_pass.as_str())
2101    } else {
2102        ("anonymous", "ftp@example.com")
2103    };
2104
2105    // Determine if this is a directory listing (path ends with '/')
2106    let is_dir_list = path.ends_with('/') && upload_data.is_none();
2107
2108    // Parse ;type=A or ;type=I from path (RFC 1738 FTP URL type)
2109    let (effective_path, type_override) = parse_ftp_type(path);
2110
2111    // Reuse existing session if compatible, otherwise create a new one.
2112    let is_reuse = if let Some(existing) = ftp_session.take() {
2113        if existing.can_reuse(&host, port, user) {
2114            *ftp_session = Some(existing);
2115            true
2116        } else {
2117            // Different host/port/user — quit old session
2118            let mut old = existing;
2119            let _ = old.quit().await;
2120            drop(old);
2121            false
2122        }
2123    } else {
2124        false
2125    };
2126
2127    if !is_reuse {
2128        let new_session = connect_session(
2129            &host,
2130            port,
2131            user,
2132            pass,
2133            ssl_mode,
2134            use_ssl,
2135            tls_config,
2136            config.clone(),
2137            proxy,
2138        )
2139        .await?;
2140        *ftp_session = Some(new_session);
2141    }
2142
2143    // At this point ftp_session is always Some (set in the block above).
2144    let Some(session) = ftp_session.as_mut() else {
2145        return Err(Error::Http("internal: FTP session missing".to_string()));
2146    };
2147    let result = perform_inner(
2148        session,
2149        url,
2150        upload_data,
2151        resume_from,
2152        config,
2153        is_reuse,
2154        effective_path,
2155        type_override,
2156        is_dir_list,
2157        range_end,
2158    )
2159    .await;
2160
2161    // On fatal I/O errors (connection lost), discard the session.
2162    if let Err(ref e) = result {
2163        if is_connection_error(e) {
2164            let _ = ftp_session.take();
2165        }
2166    }
2167    // On URL parse errors (e.g., null byte), discard session without QUIT (curl compat: test 340)
2168    if matches!(&result, Err(Error::UrlParse(_))) {
2169        let _ = ftp_session.take();
2170    }
2171    // On certain transfer errors, discard without QUIT (curl compat):
2172    // - 14 (CURLE_FTP_WEIRD_227_FORMAT): bad PASV response (test 237)
2173    // - 28 (CURLE_OPERATION_TIMEDOUT): server timeout (test 1120)
2174    if let Err(Error::Transfer { code, .. }) = &result {
2175        if matches!(code, 14 | 28 | 84) {
2176            let _ = ftp_session.take();
2177        }
2178    }
2179    // On partial file (body_error set), discard the session without QUIT
2180    // because the control connection may be in an indeterminate state
2181    // (curl compat: test 161 — no QUIT after premature data end).
2182    if let Ok(ref resp) = result {
2183        if resp.body_error().is_some() {
2184            let _ = ftp_session.take();
2185        }
2186    }
2187
2188    result
2189}
2190
2191/// Check if an error indicates the FTP control connection is dead.
2192const fn is_connection_error(e: &Error) -> bool {
2193    matches!(e, Error::Connect(_) | Error::Io(_))
2194}
2195
2196/// Send TYPE command only if the session's current type differs from the requested type.
2197///
2198/// Avoids redundant TYPE commands when reusing a session (curl compat: tests 210, 215, 216).
2199async fn send_type_if_needed(
2200    session: &mut FtpSession,
2201    transfer_type: TransferType,
2202) -> Result<(), Error> {
2203    if session.current_type == Some(transfer_type) {
2204        return Ok(());
2205    }
2206    let cmd = match transfer_type {
2207        TransferType::Ascii => "TYPE A",
2208        TransferType::Binary => "TYPE I",
2209    };
2210    send_command(&mut session.writer, cmd).await?;
2211    let resp = session.read_and_record().await?;
2212    if !resp.is_complete() {
2213        return Err(Error::Transfer {
2214            code: 17,
2215            message: format!("FTP TYPE failed: {} {}", resp.code, resp.message),
2216        });
2217    }
2218    session.current_type = Some(transfer_type);
2219    Ok(())
2220}
2221
2222/// Execute a list of FTP quote commands on the session.
2223///
2224/// Commands prefixed with `*` have their failure ignored ("best effort").
2225/// Other commands fail the transfer with `CURLE_QUOTE_ERROR` (21) on non-2xx response.
2226async fn execute_quote_commands(
2227    session: &mut FtpSession,
2228    commands: &[String],
2229) -> Result<(), Error> {
2230    for raw_cmd in commands {
2231        // Strip `*` prefix: means "ignore failure" (curl compat: test 227)
2232        #[allow(clippy::option_if_let_else)]
2233        let (ignore_fail, actual_cmd) = if let Some(stripped) = raw_cmd.strip_prefix('*') {
2234            (true, stripped)
2235        } else {
2236            (false, raw_cmd.as_str())
2237        };
2238        send_command(&mut session.writer, actual_cmd).await?;
2239        let resp = session.read_and_record().await?;
2240        if !ignore_fail && !resp.is_complete() && !resp.is_preliminary() {
2241            return Err(Error::Transfer {
2242                code: 21,
2243                message: format!(
2244                    "FTP quote command '{}' failed: {} {}",
2245                    actual_cmd, resp.code, resp.message
2246                ),
2247            });
2248        }
2249    }
2250    Ok(())
2251}
2252
2253/// Inner FTP transfer logic, operating on a borrowed session.
2254///
2255/// The session is guaranteed to be logged in. On reuse, CWD navigation
2256/// is optimized to avoid redundant commands.
2257#[allow(clippy::too_many_lines, clippy::too_many_arguments)]
2258async fn perform_inner(
2259    session: &mut FtpSession,
2260    url: &crate::url::Url,
2261    upload_data: Option<&[u8]>,
2262    resume_from: Option<u64>,
2263    config: &FtpConfig,
2264    is_reuse: bool,
2265    effective_path: &str,
2266    type_override: Option<TransferType>,
2267    is_dir_list: bool,
2268    range_end: Option<u64>,
2269) -> Result<Response, Error> {
2270    if !is_reuse {
2271        // PWD after login (curl always sends this)
2272        let pwd_path = session.pwd_safe().await;
2273        session.home_dir.clone_from(&pwd_path);
2274
2275        // SYST: detect OS/400 and send SITE NAMEFMT 1.
2276        // curl only sends SYST when PWD succeeds AND the path does NOT start with '/'
2277        // (curl compat: tests 1102, 1103; no SYST on PWD failure: test 124).
2278        let pwd_not_slash = pwd_path.as_ref().is_some_and(|p| !p.starts_with('/'));
2279        if pwd_not_slash {
2280            send_command(&mut session.writer, "SYST").await?;
2281            let syst_resp = session.read_and_record().await?;
2282            if syst_resp.is_complete() && syst_resp.message.contains("OS/400") {
2283                // OS/400: switch to Unix-style naming format
2284                send_command(&mut session.writer, "SITE NAMEFMT 1").await?;
2285                let _site_resp = session.read_and_record().await?;
2286                // Re-issue PWD after format change
2287                let _pwd2 = session.pwd_safe().await;
2288            }
2289        }
2290    }
2291
2292    // Reject null bytes in decoded FTP path (curl compat: test 340)
2293    if effective_path.contains('\0') {
2294        return Err(Error::UrlParse("FTP path contains null byte".to_string()));
2295    }
2296
2297    // Navigate to directory via CWD commands
2298    let (dir_components, filename) = if is_dir_list {
2299        if config.method == FtpMethod::NoCwd {
2300            // NoCwd: no CWD at all, path goes into the LIST command
2301            (Vec::new(), String::new())
2302        } else {
2303            // For listings, the entire path is the directory
2304            let trimmed = effective_path.trim_start_matches('/');
2305            let trimmed = trimmed.trim_end_matches('/');
2306            if trimmed.is_empty() {
2307                // Root directory listing.
2308                // ftp://host/ (single slash) = relative root, no CWD needed (test 101)
2309                // ftp://host// (double slash) = absolute path to /, CWD / needed (tests 350, 352)
2310                if effective_path.starts_with("//") {
2311                    (vec!["/"], String::new())
2312                } else {
2313                    (Vec::new(), String::new())
2314                }
2315            } else {
2316                let components: Vec<&str> = trimmed.split('/').collect();
2317                (components, String::new())
2318            }
2319        }
2320    } else {
2321        // For file operations, split directory from filename
2322        split_path_for_method(effective_path, config.method)
2323    };
2324
2325    // Pre-quote commands (sent before CWD; curl compat: test 754)
2326    execute_quote_commands(session, &config.pre_quote).await?;
2327
2328    // For connection reuse: check if we need to change directories.
2329    // If target dir matches current dir, skip all CWD commands.
2330    // If different, CWD / to reset then navigate to target.
2331    let target_dir: Vec<String> =
2332        dir_components.iter().filter(|c| !c.is_empty()).map(ToString::to_string).collect();
2333
2334    let need_cwd = if is_reuse {
2335        target_dir != session.current_dir
2336    } else {
2337        // Fresh connection: always navigate (unless dir is empty)
2338        !target_dir.is_empty()
2339    };
2340
2341    if need_cwd {
2342        // On reuse: reset to home directory first, then navigate
2343        // curl CWDs to the PWD path (home dir) instead of "/" (test 1217)
2344        let did_reset_to_root = if is_reuse && !session.current_dir.is_empty() {
2345            let reset_dir = session.home_dir.clone().unwrap_or_else(|| "/".to_string());
2346            send_command(&mut session.writer, &format!("CWD {reset_dir}")).await?;
2347            let cwd_resp = session.read_and_record().await?;
2348            if !cwd_resp.is_complete() {
2349                return Err(Error::Transfer {
2350                    code: 9,
2351                    message: format!(
2352                        "FTP CWD {reset_dir} failed: {} {}",
2353                        cwd_resp.code, cwd_resp.message
2354                    ),
2355                });
2356            }
2357            session.current_dir.clear();
2358            true
2359        } else {
2360            false
2361        };
2362
2363        // Perform CWD navigation
2364        for component in &dir_components {
2365            if component.is_empty() {
2366                continue;
2367            }
2368            // Skip "/" component if we already reset to root (avoid duplicate CWD /)
2369            if *component == "/" && did_reset_to_root {
2370                continue;
2371            }
2372            send_command(&mut session.writer, &format!("CWD {component}")).await?;
2373            let cwd_resp = session.read_and_record().await?;
2374            if !cwd_resp.is_complete() {
2375                if config.create_dirs {
2376                    // --ftp-create-dirs: try MKD then retry CWD
2377                    send_command(&mut session.writer, &format!("MKD {component}")).await?;
2378                    let _mkd_resp = session.read_and_record().await?;
2379                    // Always retry CWD after MKD, even if MKD failed (curl compat)
2380                    send_command(&mut session.writer, &format!("CWD {component}")).await?;
2381                    let retry_resp = session.read_and_record().await?;
2382                    if !retry_resp.is_complete() {
2383                        return Err(Error::Transfer {
2384                            code: 9,
2385                            message: format!(
2386                                "FTP CWD failed after MKD: {} {}",
2387                                retry_resp.code, retry_resp.message
2388                            ),
2389                        });
2390                    }
2391                } else if cwd_resp.code == 421 {
2392                    // 421 = Service not available / timeout. Don't send QUIT —
2393                    // the server is closing the connection (curl compat: test 1120).
2394                    return Err(Error::Transfer {
2395                        code: 28,
2396                        message: format!(
2397                            "FTP server timeout: {} {}",
2398                            cwd_resp.code, cwd_resp.message
2399                        ),
2400                    });
2401                } else {
2402                    return Err(Error::Transfer {
2403                        code: 9,
2404                        message: format!("FTP CWD failed: {} {}", cwd_resp.code, cwd_resp.message),
2405                    });
2406                }
2407            }
2408        }
2409        session.current_dir = target_dir;
2410    }
2411
2412    // HEAD/nobody mode: only get file metadata, no data transfer.
2413    // For directory listings (-I on a directory), just return after CWD (curl compat: test 1000).
2414    if config.nobody {
2415        if is_dir_list {
2416            let raw = std::mem::take(&mut session.header_bytes);
2417            let headers = std::collections::HashMap::new();
2418            let mut resp = Response::new(200, headers, Vec::new(), url.as_str().to_string());
2419            resp.set_raw_headers(raw);
2420            return Ok(resp);
2421        }
2422        let mut last_modified: Option<String> = None;
2423        let mut content_length: Option<String> = None;
2424
2425        // MDTM (modification time)
2426        if !filename.is_empty() {
2427            send_command(&mut session.writer, &format!("MDTM {filename}")).await?;
2428            let mdtm_resp = session.read_and_record().await?;
2429            if mdtm_resp.is_complete() {
2430                let mdtm_str = mdtm_resp.message.trim();
2431                if let Some(date) = format_mdtm_as_http_date(mdtm_str) {
2432                    last_modified = Some(date);
2433                }
2434            }
2435        }
2436
2437        // TYPE I for SIZE
2438        send_type_if_needed(session, TransferType::Binary).await?;
2439
2440        // SIZE
2441        if !filename.is_empty() {
2442            send_command(&mut session.writer, &format!("SIZE {filename}")).await?;
2443            let size_resp = session.read_and_record().await?;
2444            if size_resp.is_complete() {
2445                content_length = Some(size_resp.message.trim().to_string());
2446            }
2447        }
2448
2449        // REST 0 (curl sends this in HEAD mode)
2450        send_command(&mut session.writer, "REST 0").await?;
2451        let _rest_resp = session.read_and_record().await?;
2452
2453        let raw = std::mem::take(&mut session.header_bytes);
2454
2455        // Build FTP HEAD output as pseudo-HTTP headers (curl compat)
2456        let mut body_text = String::new();
2457        if let Some(ref lm) = last_modified {
2458            body_text.push_str("Last-Modified: ");
2459            body_text.push_str(lm);
2460            body_text.push_str("\r\n");
2461        }
2462        if let Some(ref cl) = content_length {
2463            body_text.push_str("Content-Length: ");
2464            body_text.push_str(cl);
2465            body_text.push_str("\r\n");
2466        }
2467        body_text.push_str("Accept-ranges: bytes\r\n");
2468
2469        let mut headers = std::collections::HashMap::new();
2470        if let Some(ref cl) = content_length {
2471            let _old = headers.insert("content-length".to_string(), cl.clone());
2472        }
2473        if let Some(ref lm) = last_modified {
2474            let _old = headers.insert("last-modified".to_string(), lm.clone());
2475        }
2476        let mut resp =
2477            Response::new(200, headers, body_text.into_bytes(), url.as_str().to_string());
2478        resp.set_raw_headers(raw);
2479        return Ok(resp);
2480    }
2481
2482    // For uploads
2483    if let Some(upload_bytes) = upload_data {
2484        // FTP upload time condition (-z): check MDTM before uploading (curl compat: tests 247, 248)
2485        if let Some((cond_ts, negate)) = config.time_condition {
2486            send_command(&mut session.writer, &format!("MDTM {filename}")).await?;
2487            let mdtm_resp = session.read_and_record().await?;
2488            if mdtm_resp.is_complete() {
2489                let mdtm_str = mdtm_resp.message.trim();
2490                if let Some(file_ts) = parse_mdtm_timestamp(mdtm_str) {
2491                    let should_skip = if negate { file_ts >= cond_ts } else { file_ts <= cond_ts };
2492                    if should_skip {
2493                        let raw = std::mem::take(&mut session.header_bytes);
2494                        let headers = std::collections::HashMap::new();
2495                        let mut resp =
2496                            Response::new(200, headers, Vec::new(), url.as_str().to_string());
2497                        resp.set_raw_headers(raw);
2498                        return Ok(resp);
2499                    }
2500                }
2501            }
2502        }
2503        // Determine upload resume behavior:
2504        // - resume_from == Some(0): auto-resume (-C -), need SIZE to discover offset
2505        // - resume_from == Some(N), N > 0: explicit offset (-C N), skip N bytes, APPE
2506        // - resume_from == None: no resume, plain STOR (or APPE if --append)
2507        let is_auto_resume = resume_from == Some(0);
2508        let explicit_offset = resume_from.filter(|&o| o > 0);
2509
2510        // For explicit offset: compute upload data and APPE flag immediately (no SIZE needed)
2511        // For auto-resume: defer until after SIZE
2512        let (mut effective_upload_data, mut use_appe) = if let Some(offset) = explicit_offset {
2513            #[allow(clippy::cast_possible_truncation)]
2514            let offset_usize = offset as usize;
2515            if offset_usize >= upload_bytes.len() {
2516                // Upload resume beyond file size: send EPSV + TYPE I then return
2517                // (curl sends these commands even when nothing to upload)
2518                let _ = session.open_data_connection().await;
2519                send_type_if_needed(session, TransferType::Binary).await?;
2520                let raw = std::mem::take(&mut session.header_bytes);
2521                let headers = std::collections::HashMap::new();
2522                let mut resp = Response::new(200, headers, Vec::new(), url.as_str().to_string());
2523                resp.set_raw_headers(raw);
2524                return Ok(resp);
2525            }
2526            (&upload_bytes[offset_usize..], true)
2527        } else {
2528            (upload_bytes, config.append)
2529        };
2530
2531        // Open data connection (with PRET for --ftp-pret)
2532        let pret_cmd =
2533            if config.append { format!("APPE {filename}") } else { format!("STOR {filename}") };
2534        let data_conn_result = session.open_data_connection_with_pret(&pret_cmd).await;
2535        let data_conn = match data_conn_result {
2536            Ok(s) => s,
2537            Err(e) => {
2538                return Err(e);
2539            }
2540        };
2541        let mut data_stream = data_conn.into_stream(session, None).await?;
2542
2543        // TYPE command for upload: respect ;type=a URL suffix (curl compat: tests 475, 476)
2544        let upload_type = match type_override {
2545            Some(TransferType::Ascii) => TransferType::Ascii,
2546            _ => TransferType::Binary,
2547        };
2548        send_type_if_needed(session, upload_type).await?;
2549
2550        // SIZE for auto-resume (-C -): determine remote file size to compute offset
2551        // (curl compat: tests 1038, 1039). Skip SIZE for explicit offset (test 112).
2552        if is_auto_resume {
2553            send_command(&mut session.writer, &format!("SIZE {filename}")).await?;
2554            let size_resp = session.read_and_record().await?;
2555            if size_resp.is_complete() {
2556                if let Ok(remote_size) = size_resp.message.trim().parse::<u64>() {
2557                    if remote_size > 0 {
2558                        #[allow(clippy::cast_possible_truncation)]
2559                        let skip = remote_size as usize;
2560                        if skip >= upload_bytes.len() {
2561                            // Remote file is same size or larger — nothing to upload
2562                            drop(data_stream);
2563                            let raw = std::mem::take(&mut session.header_bytes);
2564                            let headers = std::collections::HashMap::new();
2565                            let mut resp =
2566                                Response::new(200, headers, Vec::new(), url.as_str().to_string());
2567                            resp.set_raw_headers(raw);
2568                            return Ok(resp);
2569                        }
2570                        effective_upload_data = &upload_bytes[skip..];
2571                        use_appe = true;
2572                    }
2573                    // remote_size == 0: use STOR with full data (use_appe stays false)
2574                }
2575            }
2576            // SIZE failed: file doesn't exist, use STOR with full data
2577        }
2578
2579        // STOR or APPE
2580        let stor_cmd =
2581            if use_appe { format!("APPE {filename}") } else { format!("STOR {filename}") };
2582        send_command(&mut session.writer, &stor_cmd).await?;
2583        let stor_resp = session.read_and_record().await?;
2584        if !stor_resp.is_preliminary() && !stor_resp.is_complete() {
2585            return Err(Error::Transfer {
2586                code: 25,
2587                message: format!("FTP STOR/APPE failed: {} {}", stor_resp.code, stor_resp.message),
2588            });
2589        }
2590
2591        // Write data, converting LF to CRLF for --crlf or ;type=a (ASCII mode)
2592        let ascii_upload = config.crlf || type_override == Some(TransferType::Ascii);
2593        if ascii_upload {
2594            let converted = lf_to_crlf(effective_upload_data);
2595            data_stream
2596                .write_all(&converted)
2597                .await
2598                .map_err(|e| Error::Http(format!("FTP data write error: {e}")))?;
2599        } else {
2600            data_stream
2601                .write_all(effective_upload_data)
2602                .await
2603                .map_err(|e| Error::Http(format!("FTP data write error: {e}")))?;
2604        }
2605        data_stream
2606            .shutdown()
2607            .await
2608            .map_err(|e| Error::Http(format!("FTP data shutdown error: {e}")))?;
2609        drop(data_stream);
2610
2611        let complete_resp = session.read_and_record().await?;
2612        if !complete_resp.is_complete() {
2613            // 452/552 = disk full (curl returns CURLE_REMOTE_DISK_FULL = 70)
2614            let code = if complete_resp.code == 452 || complete_resp.code == 552 { 70 } else { 25 };
2615            return Err(Error::Transfer {
2616                code,
2617                message: format!(
2618                    "FTP upload failed: {} {}",
2619                    complete_resp.code, complete_resp.message
2620                ),
2621            });
2622        }
2623
2624        // Post-quote commands
2625        execute_quote_commands(session, &config.post_quote).await?;
2626
2627        let raw = std::mem::take(&mut session.header_bytes);
2628        let headers = std::collections::HashMap::new();
2629        let mut resp = Response::new(200, headers, Vec::new(), url.as_str().to_string());
2630        resp.set_raw_headers(raw);
2631        return Ok(resp);
2632    }
2633
2634    // Directory listing
2635    if is_dir_list {
2636        // Open data connection (with PRET for --ftp-pret; curl compat: test 1107)
2637        let list_base = if config.list_only { "NLST" } else { "LIST" };
2638        let data_conn_result = session.open_data_connection_with_pret(list_base).await;
2639        let data_conn = match data_conn_result {
2640            Ok(s) => s,
2641            Err(e) => {
2642                return Err(e);
2643            }
2644        };
2645        let mut data_stream = data_conn.into_stream(session, None).await?;
2646
2647        // TYPE A for directory listings (skip if already set)
2648        send_type_if_needed(session, TransferType::Ascii).await?;
2649
2650        // Post-PASV quote commands (after TYPE, before LIST; curl compat: test 754)
2651        execute_quote_commands(session, &config.post_pasv_quote).await?;
2652
2653        // LIST or NLST — for NoCwd, include path in the command (test 351)
2654        let list_base = if config.list_only { "NLST" } else { "LIST" };
2655        let list_cmd = if config.method == FtpMethod::NoCwd {
2656            let path = effective_path.trim_end_matches('/');
2657            // FTP URL path conventions:
2658            //   /path → relative to home (strip leading /, curl compat: test 1149)
2659            //   //path → absolute path (strip one /, keep one, curl compat: test 1010)
2660            let path =
2661                if path.starts_with("//") { &path[1..] } else { path.trim_start_matches('/') };
2662            if path.is_empty() {
2663                format!("{list_base} /")
2664            } else {
2665                format!("{list_base} {path}")
2666            }
2667        } else {
2668            list_base.to_string()
2669        };
2670        send_command(&mut session.writer, &list_cmd).await?;
2671        let list_resp = session.read_and_record().await?;
2672        // 4xx (transient) on NLST/LIST means "no files found" — treat as empty listing,
2673        // not error (curl compat: test 144). 5xx (permanent) is still an error (test 145).
2674        if list_resp.is_negative_transient() {
2675            drop(data_stream);
2676            let raw = std::mem::take(&mut session.header_bytes);
2677            let headers = std::collections::HashMap::new();
2678            let mut resp = Response::new(200, headers, Vec::new(), url.as_str().to_string());
2679            resp.set_raw_headers(raw);
2680            return Ok(resp);
2681        }
2682        if !list_resp.is_preliminary() && !list_resp.is_complete() {
2683            return Err(Error::Transfer {
2684                code: 19,
2685                message: format!("FTP LIST failed: {} {}", list_resp.code, list_resp.message),
2686            });
2687        }
2688
2689        let mut data = Vec::new();
2690        let _ = data_stream
2691            .read_to_end(&mut data)
2692            .await
2693            .map_err(|e| Error::Http(format!("FTP data read error: {e}")))?;
2694        drop(data_stream);
2695
2696        // Read 226 Transfer Complete
2697        if list_resp.is_preliminary() {
2698            let complete_resp = session.read_and_record().await?;
2699            if !complete_resp.is_complete() {
2700                return Err(Error::Http(format!(
2701                    "FTP transfer failed: {} {}",
2702                    complete_resp.code, complete_resp.message
2703                )));
2704            }
2705        }
2706
2707        // Post-quote commands
2708        execute_quote_commands(session, &config.post_quote).await?;
2709
2710        let raw = std::mem::take(&mut session.header_bytes);
2711        let mut headers = std::collections::HashMap::new();
2712        let _old = headers.insert("content-length".to_string(), data.len().to_string());
2713        let mut resp = Response::new(200, headers, data, url.as_str().to_string());
2714        resp.set_raw_headers(raw);
2715        return Ok(resp);
2716    }
2717
2718    // File download (RETR)
2719    // Determine transfer type
2720    let transfer_type = type_override.unwrap_or(if config.use_ascii {
2721        TransferType::Ascii
2722    } else {
2723        TransferType::Binary
2724    });
2725    let use_ascii = transfer_type == TransferType::Ascii;
2726
2727    // FTP -z: send MDTM before download to check file modification time
2728    if let Some((cond_ts, negate)) = config.time_condition {
2729        send_command(&mut session.writer, &format!("MDTM {filename}")).await?;
2730        let mdtm_resp = session.read_and_record().await?;
2731        if mdtm_resp.is_complete() {
2732            // Parse MDTM response: "YYYYMMDDHHMMSS"
2733            let mdtm_str = mdtm_resp.message.trim();
2734            if let Some(file_ts) = parse_mdtm_timestamp(mdtm_str) {
2735                let should_skip = if negate {
2736                    // -z -date: download if file is older than date
2737                    file_ts >= cond_ts
2738                } else {
2739                    // -z date: download if file is newer than date
2740                    file_ts <= cond_ts
2741                };
2742                if should_skip {
2743                    let raw = std::mem::take(&mut session.header_bytes);
2744                    let headers = std::collections::HashMap::new();
2745                    let mut resp =
2746                        Response::new(200, headers, Vec::new(), url.as_str().to_string());
2747                    resp.set_raw_headers(raw);
2748                    return Ok(resp);
2749                }
2750            }
2751        }
2752    }
2753
2754    // Open data connection BEFORE TYPE/SIZE (curl sends EPSV/PASV before TYPE)
2755    // Send PRET before EPSV if --ftp-pret is enabled (curl compat: test 1107)
2756    // For active mode, this returns a PendingActive with a listener that hasn't
2757    // accepted yet — accept is deferred until after RETR to detect server errors
2758    // like 425/421 (curl compat: tests 1206, 1207, 1208).
2759    let data_conn_result =
2760        session.open_data_connection_with_pret(&format!("RETR {filename}")).await;
2761    let data_conn = match data_conn_result {
2762        Ok(s) => s,
2763        Err(e) => {
2764            return Err(e);
2765        }
2766    };
2767
2768    // TYPE (skip if already set on this session)
2769    send_type_if_needed(session, transfer_type).await?;
2770
2771    // Post-PASV quote commands (sent after TYPE, before SIZE/RETR; curl compat: test 227)
2772    execute_quote_commands(session, &config.post_pasv_quote).await?;
2773
2774    // SIZE (curl always tries SIZE before RETR for non-ASCII transfers)
2775    // Skip SIZE when --ignore-content-length is set (curl compat: test 1137)
2776    let mut remote_size: Option<u64> = None;
2777    if !use_ascii && !config.ignore_content_length {
2778        send_command(&mut session.writer, &format!("SIZE {filename}")).await?;
2779        let size_resp = session.read_and_record().await?;
2780        if size_resp.is_complete() {
2781            if let Ok(sz) = size_resp.message.trim().parse::<u64>() {
2782                remote_size = Some(sz);
2783            }
2784        }
2785        // SIZE failure is not fatal for download (may fail with 500)
2786    }
2787
2788    // Resolve negative range (-N = last N bytes) against SIZE (curl compat: test 1057)
2789    let mut resume_from = resume_from;
2790    let mut range_end = range_end;
2791    if let Some(from_end) = config.range_from_end {
2792        if let Some(sz) = remote_size {
2793            let offset = sz.saturating_sub(from_end);
2794            resume_from = Some(offset);
2795            // Set range_end so ABOR is sent after reading the last N bytes
2796            range_end = Some(sz.saturating_sub(1));
2797        }
2798    }
2799
2800    // --max-filesize: check SIZE response before RETR (curl compat: test 290)
2801    if let (Some(max_size), Some(sz)) = (config.max_filesize, remote_size) {
2802        if sz > max_size {
2803            drop(data_conn);
2804            return Err(Error::Transfer {
2805                code: 63,
2806                message: format!("Maximum file size exceeded ({sz} > {max_size})"),
2807            });
2808        }
2809    }
2810
2811    // Resume check: if resume offset >= file size, it's an error
2812    if let Some(offset) = resume_from {
2813        if let Some(sz) = remote_size {
2814            if offset > sz {
2815                drop(data_conn);
2816                return Err(Error::Transfer {
2817                    code: 36,
2818                    message: format!("Offset ({offset}) was beyond the end of the file ({sz})"),
2819                });
2820            }
2821            if offset == sz {
2822                // File already fully downloaded
2823                drop(data_conn);
2824                let raw = std::mem::take(&mut session.header_bytes);
2825                let headers = std::collections::HashMap::new();
2826                let mut resp = Response::new(200, headers, Vec::new(), url.as_str().to_string());
2827                resp.set_raw_headers(raw);
2828                return Ok(resp);
2829            }
2830        }
2831
2832        // REST
2833        send_command(&mut session.writer, &format!("REST {offset}")).await?;
2834        let rest_resp = session.read_and_record().await?;
2835        if !rest_resp.is_intermediate() {
2836            drop(data_conn);
2837            return Err(Error::Transfer {
2838                code: 36,
2839                message: format!("FTP REST failed: {} {}", rest_resp.code, rest_resp.message),
2840            });
2841        }
2842    }
2843
2844    // RETR
2845    send_command(&mut session.writer, &format!("RETR {filename}")).await?;
2846    let retr_resp = session.read_and_record().await?;
2847    if !retr_resp.is_preliminary() && !retr_resp.is_complete() {
2848        drop(data_conn);
2849        // 425 = Can't open data connection (active mode) → CURLE_FTP_ACCEPT_FAILED (10)
2850        // 550 = file not found → CURLE_REMOTE_FILE_NOT_FOUND (78)
2851        // Other 5xx → CURLE_FTP_COULDNT_RETR_FILE (19)
2852        let code = if retr_resp.code == 425 || retr_resp.code == 421 {
2853            10
2854        } else if retr_resp.code == 550 {
2855            78
2856        } else {
2857            19
2858        };
2859        return Err(Error::Transfer {
2860            code,
2861            message: format!("FTP RETR failed: {} {}", retr_resp.code, retr_resp.message),
2862        });
2863    }
2864
2865    // Now accept the data connection for active mode (passive mode already connected)
2866    // For active mode, race accept against control channel (server may send 425/421
2867    // instead of connecting — curl compat: tests 1206, 1207).
2868    let mut data_stream = match data_conn {
2869        DataConnection::Connected(stream) => stream,
2870        DataConnection::PendingActive { listener, use_tls } => {
2871            let accept_fut = listener.accept();
2872            // Race: accept data connection vs read control channel error
2873            tokio::select! {
2874                accept_result = accept_fut => {
2875                    let (tcp, _) = accept_result
2876                        .map_err(|e| Error::Http(format!("FTP active mode accept failed: {e}")))?;
2877                    if use_tls {
2878                        session.maybe_wrap_data_tls(tcp).await?
2879                    } else {
2880                        FtpStream::Plain(tcp)
2881                    }
2882                }
2883                ctrl_result = read_response(&mut session.reader) => {
2884                    // Server sent a response instead of connecting — likely 425/421
2885                    let ctrl_resp = ctrl_result?;
2886                    session.header_bytes.extend_from_slice(&ctrl_resp.raw_bytes);
2887                    let code = if ctrl_resp.code == 425 || ctrl_resp.code == 421 {
2888                        10
2889                    } else {
2890                        19
2891                    };
2892                    return Err(Error::Transfer {
2893                        code,
2894                        message: format!("FTP RETR failed: {} {}", ctrl_resp.code, ctrl_resp.message),
2895                    });
2896                }
2897            }
2898        }
2899    };
2900
2901    let mut data = Vec::new();
2902
2903    // If range_end is set, read only (end - start + 1) bytes, then ABOR
2904    let start_offset = resume_from.unwrap_or(0);
2905    if let Some(end) = range_end {
2906        #[allow(clippy::cast_possible_truncation)]
2907        let max_bytes = (end - start_offset + 1) as usize;
2908        let mut limited = data_stream.take(max_bytes as u64);
2909        let _ = limited
2910            .read_to_end(&mut data)
2911            .await
2912            .map_err(|e| Error::Http(format!("FTP data read error: {e}")))?;
2913        drop(limited);
2914        // Send ABOR to terminate the transfer early
2915        send_command(&mut session.writer, "ABOR").await?;
2916        // Ignore response (may be 426 or 226)
2917        let _ = session.read_and_record().await;
2918    } else {
2919        let _ = data_stream
2920            .read_to_end(&mut data)
2921            .await
2922            .map_err(|e| Error::Http(format!("FTP data read error: {e}")))?;
2923        drop(data_stream);
2924    }
2925
2926    // Check for partial file: if we know the expected size and got less data,
2927    // return CURLE_PARTIAL_FILE (18) (curl compat: test 161).
2928    // Return the partial data so the CLI can still output it.
2929    if range_end.is_none() {
2930        if let Some(expected) = remote_size {
2931            let actual = data.len() as u64 + resume_from.unwrap_or(0);
2932            if actual < expected {
2933                let mut headers = std::collections::HashMap::new();
2934                let _old = headers.insert("content-length".to_string(), data.len().to_string());
2935                let mut resp = Response::new(200, headers, data, url.as_str().to_string());
2936                resp.set_raw_headers(std::mem::take(&mut session.header_bytes));
2937                resp.set_body_error(Some("partial".to_string()));
2938                return Ok(resp);
2939            }
2940        }
2941    }
2942
2943    // Read 226 Transfer Complete
2944    if retr_resp.is_preliminary() && range_end.is_none() {
2945        let complete_resp = session.read_and_record().await?;
2946        if !complete_resp.is_complete() {
2947            return Err(Error::Http(format!(
2948                "FTP transfer failed: {} {}",
2949                complete_resp.code, complete_resp.message
2950            )));
2951        }
2952    }
2953
2954    // Post-quote commands
2955    execute_quote_commands(session, &config.post_quote).await?;
2956
2957    let raw = std::mem::take(&mut session.header_bytes);
2958
2959    let mut headers = std::collections::HashMap::new();
2960    let _old = headers.insert("content-length".to_string(), data.len().to_string());
2961
2962    let mut resp = Response::new(200, headers, data, url.as_str().to_string());
2963    resp.set_raw_headers(raw);
2964    Ok(resp)
2965}
2966
2967/// Percent-decode a URL path component.
2968fn percent_decode(s: &str) -> String {
2969    let mut result = String::with_capacity(s.len());
2970    let mut chars = s.bytes();
2971    while let Some(b) = chars.next() {
2972        if b == b'%' {
2973            let hi = chars.next();
2974            let lo = chars.next();
2975            if let (Some(h), Some(l)) = (hi, lo) {
2976                let hex = [h, l];
2977                if let Ok(s) = std::str::from_utf8(&hex) {
2978                    if let Ok(val) = u8::from_str_radix(s, 16) {
2979                        result.push(val as char);
2980                        continue;
2981                    }
2982                }
2983                // Not valid hex, keep literal
2984                result.push('%');
2985                result.push(h as char);
2986                result.push(l as char);
2987            } else {
2988                result.push('%');
2989            }
2990        } else {
2991            result.push(b as char);
2992        }
2993    }
2994    result
2995}
2996
2997/// Parse an MDTM timestamp "YYYYMMDDHHMMSS" into a Unix timestamp (seconds since epoch).
2998fn parse_mdtm_timestamp(s: &str) -> Option<i64> {
2999    if s.len() < 14 {
3000        return None;
3001    }
3002    let year: i64 = s[0..4].parse().ok()?;
3003    let month: i64 = s[4..6].parse().ok()?;
3004    let day: i64 = s[6..8].parse().ok()?;
3005    let hour: i64 = s[8..10].parse().ok()?;
3006    let min: i64 = s[10..12].parse().ok()?;
3007    let sec: i64 = s[12..14].parse().ok()?;
3008
3009    // Simplified conversion to Unix timestamp (good enough for date comparison)
3010    // This doesn't account for leap seconds but is sufficient for -z comparisons.
3011    let days = days_from_date(year, month, day)?;
3012    Some(days * 86400 + hour * 3600 + min * 60 + sec)
3013}
3014
3015/// Calculate days since Unix epoch from a date.
3016fn days_from_date(year: i64, month: i64, day: i64) -> Option<i64> {
3017    if !(1..=12).contains(&month) || !(1..=31).contains(&day) {
3018        return None;
3019    }
3020    // Months to days (non-leap year)
3021    let month_days: [i64; 12] = [0, 31, 59, 90, 120, 151, 181, 212, 243, 273, 304, 334];
3022    #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
3023    let m = (month - 1) as usize;
3024
3025    let y = year - 1970;
3026    let leap_years = if year > 1970 {
3027        ((year - 1) / 4 - (year - 1) / 100 + (year - 1) / 400)
3028            - (1969 / 4 - 1969 / 100 + 1969 / 400)
3029    } else {
3030        0
3031    };
3032    let mut days = y * 365 + leap_years + month_days[m] + day - 1;
3033
3034    // Add leap day for current year if applicable
3035    if month > 2 && (year % 4 == 0 && (year % 100 != 0 || year % 400 == 0)) {
3036        days += 1;
3037    }
3038
3039    Some(days)
3040}
3041
3042/// Format an MDTM timestamp as an HTTP-style "Last-Modified" date string.
3043fn format_mdtm_as_http_date(s: &str) -> Option<String> {
3044    if s.len() < 14 {
3045        return None;
3046    }
3047    let year: u32 = s[0..4].parse().ok()?;
3048    let month: u32 = s[4..6].parse().ok()?;
3049    let day: u32 = s[6..8].parse().ok()?;
3050    let hour: u32 = s[8..10].parse().ok()?;
3051    let min: u32 = s[10..12].parse().ok()?;
3052    let sec: u32 = s[12..14].parse().ok()?;
3053
3054    let month_names =
3055        ["Jan", "Feb", "Mar", "Apr", "May", "Jun", "Jul", "Aug", "Sep", "Oct", "Nov", "Dec"];
3056    #[allow(clippy::cast_sign_loss)]
3057    let month_name = month_names.get((month - 1) as usize)?;
3058
3059    // Calculate day of week using Zeller-like formula
3060    let ts = parse_mdtm_timestamp(s)?;
3061    #[allow(clippy::cast_sign_loss)]
3062    let day_of_week = ((ts / 86400 + 4) % 7) as usize; // Jan 1, 1970 was Thursday (4)
3063    let day_names = ["Sun", "Mon", "Tue", "Wed", "Thu", "Fri", "Sat"];
3064    let dow = day_names.get(day_of_week)?;
3065
3066    Some(format!("{dow}, {day:02} {month_name} {year} {hour:02}:{min:02}:{sec:02} GMT"))
3067}
3068
3069/// Parse `;type=A` or `;type=I` suffix from FTP URL path (RFC 1738).
3070///
3071/// Returns the path without the type suffix and the parsed transfer type.
3072fn parse_ftp_type(path: &str) -> (&str, Option<TransferType>) {
3073    if let Some(pos) = path.rfind(";type=") {
3074        let type_str = &path[pos + 6..];
3075        let transfer_type = match type_str {
3076            "A" | "a" => Some(TransferType::Ascii),
3077            "I" | "i" => Some(TransferType::Binary),
3078            _ => None,
3079        };
3080        if transfer_type.is_some() {
3081            return (&path[..pos], transfer_type);
3082        }
3083    }
3084    (path, None)
3085}
3086
3087/// Split a path into directory components and filename based on `FtpMethod`.
3088///
3089/// Returns `(dir_components, filename)`.
3090fn split_path_for_method(path: &str, method: FtpMethod) -> (Vec<&str>, String) {
3091    let trimmed = path.trim_start_matches('/');
3092
3093    match method {
3094        FtpMethod::NoCwd => {
3095            // For NoCwd, preserve absolute paths:
3096            //   ftp://host/file  → path="/file"  → filename="file" (relative)
3097            //   ftp://host//file → path="//file"  → filename="/file" (absolute, curl compat: test 1227)
3098            let filename = if path.starts_with("//") { &path[1..] } else { trimmed };
3099            (Vec::new(), filename.to_string())
3100        }
3101        FtpMethod::SingleCwd => {
3102            if let Some((dir, file)) = trimmed.rsplit_once('/') {
3103                if dir.is_empty() {
3104                    // Path like "/filename" — CWD /
3105                    (vec!["/"], file.to_string())
3106                } else {
3107                    (vec![dir], file.to_string())
3108                }
3109            } else if path.starts_with("//") {
3110                // Root-relative file: //filename → CWD /, RETR filename (tests 1224, 1226)
3111                (vec!["/"], trimmed.to_string())
3112            } else {
3113                (Vec::new(), trimmed.to_string())
3114            }
3115        }
3116        FtpMethod::MultiCwd => {
3117            if let Some((dir, file)) = trimmed.rsplit_once('/') {
3118                // Split the leading slash: if path starts with "/", first CWD should be "/"
3119                let mut components = Vec::new();
3120                if path.starts_with("//") {
3121                    // Absolute path like //path/to/file — first CWD is "/"
3122                    components.push("/");
3123                }
3124                for component in dir.split('/') {
3125                    if !component.is_empty() {
3126                        components.push(component);
3127                    }
3128                }
3129                (components, file.to_string())
3130            } else if path.starts_with("//") {
3131                // Root-relative file: //filename → CWD /, RETR filename (test 1224)
3132                (vec!["/"], trimmed.to_string())
3133            } else {
3134                (Vec::new(), trimmed.to_string())
3135            }
3136        }
3137    }
3138}
3139
3140/// Convert LF line endings to CRLF.
3141fn lf_to_crlf(data: &[u8]) -> Vec<u8> {
3142    let mut result = Vec::with_capacity(data.len() + data.len() / 10);
3143    let mut prev = 0u8;
3144    for &byte in data {
3145        if byte == b'\n' && prev != b'\r' {
3146            result.push(b'\r');
3147        }
3148        result.push(byte);
3149        prev = byte;
3150    }
3151    result
3152}
3153
3154/// Perform an FTP download and return the file contents as a Response.
3155///
3156/// # Errors
3157///
3158/// Returns an error if login fails, passive mode fails, or the file cannot be retrieved.
3159#[allow(clippy::too_many_lines)]
3160pub async fn download(
3161    url: &crate::url::Url,
3162    ssl_mode: FtpSslMode,
3163    tls_config: &crate::tls::TlsConfig,
3164    resume_from: Option<u64>,
3165    config: &FtpConfig,
3166) -> Result<Response, Error> {
3167    perform(
3168        url,
3169        None,
3170        ssl_mode,
3171        UseSsl::None,
3172        tls_config,
3173        resume_from,
3174        config,
3175        None,
3176        &mut None,
3177        None,
3178    )
3179    .await
3180}
3181
3182/// Perform an FTP directory listing and return it as a Response.
3183///
3184/// # Errors
3185///
3186/// Returns an error if login fails, passive mode fails, or listing fails.
3187#[allow(clippy::too_many_lines)]
3188pub async fn list(
3189    url: &crate::url::Url,
3190    ssl_mode: FtpSslMode,
3191    tls_config: &crate::tls::TlsConfig,
3192    config: &FtpConfig,
3193) -> Result<Response, Error> {
3194    perform(url, None, ssl_mode, UseSsl::None, tls_config, None, config, None, &mut None, None)
3195        .await
3196}
3197
3198/// Perform an FTP upload.
3199///
3200/// # Errors
3201///
3202/// Returns an error if login fails, passive mode fails, or the upload fails.
3203pub async fn upload(
3204    url: &crate::url::Url,
3205    data: &[u8],
3206    ssl_mode: FtpSslMode,
3207    tls_config: &crate::tls::TlsConfig,
3208    config: &FtpConfig,
3209) -> Result<Response, Error> {
3210    perform(
3211        url,
3212        Some(data),
3213        ssl_mode,
3214        UseSsl::None,
3215        tls_config,
3216        None,
3217        config,
3218        None,
3219        &mut None,
3220        None,
3221    )
3222    .await
3223}
3224
3225#[cfg(test)]
3226#[allow(clippy::unwrap_used)]
3227mod tests {
3228    use super::*;
3229
3230    #[tokio::test]
3231    async fn read_simple_response() {
3232        let data = b"220 Welcome to FTP\r\n";
3233        let mut reader = BufReader::new(std::io::Cursor::new(data.to_vec()));
3234        let resp = read_response(&mut reader).await.unwrap();
3235        assert_eq!(resp.code, 220);
3236        assert_eq!(resp.message, "Welcome to FTP");
3237    }
3238
3239    #[tokio::test]
3240    async fn read_multiline_response() {
3241        let data = b"220-Welcome\r\n220-to the\r\n220 FTP server\r\n";
3242        let mut reader = BufReader::new(std::io::Cursor::new(data.to_vec()));
3243        let resp = read_response(&mut reader).await.unwrap();
3244        assert_eq!(resp.code, 220);
3245        assert!(resp.message.contains("Welcome"));
3246        assert!(resp.message.contains("FTP server"));
3247    }
3248
3249    #[tokio::test]
3250    async fn read_response_connection_closed() {
3251        let data = b"";
3252        let mut reader = BufReader::new(std::io::Cursor::new(data.to_vec()));
3253        let result = read_response(&mut reader).await;
3254        assert!(result.is_err());
3255    }
3256
3257    #[test]
3258    fn parse_pasv_simple() {
3259        let msg = "Entering Passive Mode (192,168,1,1,4,1)";
3260        let (host, port) = parse_pasv_response(msg).unwrap();
3261        assert_eq!(host, "192.168.1.1");
3262        assert_eq!(port, 1025); // 4*256 + 1
3263    }
3264
3265    #[test]
3266    fn parse_pasv_high_port() {
3267        let msg = "Entering Passive Mode (127,0,0,1,200,100)";
3268        let (host, port) = parse_pasv_response(msg).unwrap();
3269        assert_eq!(host, "127.0.0.1");
3270        assert_eq!(port, 51300); // 200*256 + 100
3271    }
3272
3273    #[test]
3274    fn parse_epsv_simple() {
3275        let msg = "Entering Extended Passive Mode (|||12345|)";
3276        let port = parse_epsv_response(msg).unwrap();
3277        assert_eq!(port, 12345);
3278    }
3279
3280    #[test]
3281    fn ftp_response_status_categories() {
3282        let preliminary = FtpResponse { code: 150, message: String::new(), raw_bytes: Vec::new() };
3283        assert!(preliminary.is_preliminary());
3284        assert!(!preliminary.is_complete());
3285
3286        let complete = FtpResponse { code: 226, message: String::new(), raw_bytes: Vec::new() };
3287        assert!(complete.is_complete());
3288        assert!(!complete.is_intermediate());
3289
3290        let intermediate = FtpResponse { code: 331, message: String::new(), raw_bytes: Vec::new() };
3291        assert!(intermediate.is_intermediate());
3292        assert!(!intermediate.is_complete());
3293    }
3294
3295    #[test]
3296    fn parse_feat_response_full() {
3297        let message = "Extensions supported:\n EPSV\n MLST size*;modify*;type*\n REST STREAM\n SIZE\n UTF8\n AUTH TLS";
3298        let features = parse_feat_response(message);
3299        assert!(features.epsv);
3300        assert!(features.mlst);
3301        assert!(features.rest_stream);
3302        assert!(features.size);
3303        assert!(features.utf8);
3304        assert!(features.auth_tls);
3305    }
3306
3307    #[test]
3308    fn parse_feat_response_minimal() {
3309        let message = "SIZE\nREST STREAM";
3310        let features = parse_feat_response(message);
3311        assert!(features.size);
3312        assert!(features.rest_stream);
3313        assert!(!features.epsv);
3314        assert!(!features.mlst);
3315    }
3316
3317    #[test]
3318    fn parse_feat_response_empty() {
3319        let features = parse_feat_response("");
3320        assert!(!features.epsv);
3321        assert!(!features.mlst);
3322        assert!(!features.rest_stream);
3323        assert!(!features.size);
3324        assert!(!features.utf8);
3325        assert!(!features.auth_tls);
3326        assert!(features.raw.is_empty());
3327    }
3328
3329    #[test]
3330    fn parse_feat_response_auth_tls() {
3331        let message = "AUTH TLS\nAUTH SSL";
3332        let features = parse_feat_response(message);
3333        assert!(features.auth_tls);
3334    }
3335
3336    #[test]
3337    fn transfer_type_equality() {
3338        assert_eq!(TransferType::Ascii, TransferType::Ascii);
3339        assert_eq!(TransferType::Binary, TransferType::Binary);
3340        assert_ne!(TransferType::Ascii, TransferType::Binary);
3341    }
3342
3343    #[test]
3344    fn ftp_features_default() {
3345        let features = FtpFeatures::default();
3346        assert!(!features.epsv);
3347        assert!(!features.mlst);
3348        assert!(!features.rest_stream);
3349        assert!(!features.size);
3350        assert!(!features.utf8);
3351        assert!(!features.auth_tls);
3352        assert!(features.raw.is_empty());
3353    }
3354
3355    #[test]
3356    fn ftp_ssl_mode_equality() {
3357        assert_eq!(FtpSslMode::None, FtpSslMode::None);
3358        assert_eq!(FtpSslMode::Explicit, FtpSslMode::Explicit);
3359        assert_eq!(FtpSslMode::Implicit, FtpSslMode::Implicit);
3360        assert_ne!(FtpSslMode::None, FtpSslMode::Explicit);
3361        assert_ne!(FtpSslMode::Explicit, FtpSslMode::Implicit);
3362    }
3363
3364    #[test]
3365    fn ftp_method_default() {
3366        assert_eq!(FtpMethod::default(), FtpMethod::MultiCwd);
3367    }
3368
3369    #[test]
3370    fn ftp_method_equality() {
3371        assert_eq!(FtpMethod::MultiCwd, FtpMethod::MultiCwd);
3372        assert_eq!(FtpMethod::SingleCwd, FtpMethod::SingleCwd);
3373        assert_eq!(FtpMethod::NoCwd, FtpMethod::NoCwd);
3374        assert_ne!(FtpMethod::MultiCwd, FtpMethod::SingleCwd);
3375        assert_ne!(FtpMethod::SingleCwd, FtpMethod::NoCwd);
3376    }
3377
3378    #[test]
3379    fn format_port_ipv4() {
3380        let addr: SocketAddr = "192.168.1.100:12345".parse().unwrap();
3381        let cmd = format_port_command(&addr);
3382        // 12345 = 48*256 + 57
3383        assert_eq!(cmd, "PORT 192,168,1,100,48,57");
3384    }
3385
3386    #[test]
3387    fn format_port_low_port() {
3388        let addr: SocketAddr = "10.0.0.1:21".parse().unwrap();
3389        let cmd = format_port_command(&addr);
3390        // 21 = 0*256 + 21
3391        assert_eq!(cmd, "PORT 10,0,0,1,0,21");
3392    }
3393
3394    #[test]
3395    fn format_port_high_port() {
3396        let addr: SocketAddr = "127.0.0.1:65535".parse().unwrap();
3397        let cmd = format_port_command(&addr);
3398        // 65535 = 255*256 + 255
3399        assert_eq!(cmd, "PORT 127,0,0,1,255,255");
3400    }
3401
3402    #[test]
3403    fn format_eprt_ipv4() {
3404        let addr: SocketAddr = "192.168.1.100:12345".parse().unwrap();
3405        let cmd = format_eprt_command(&addr);
3406        assert_eq!(cmd, "EPRT |1|192.168.1.100|12345|");
3407    }
3408
3409    #[test]
3410    fn format_eprt_ipv6() {
3411        let addr: SocketAddr = "[::1]:54321".parse().unwrap();
3412        let cmd = format_eprt_command(&addr);
3413        assert_eq!(cmd, "EPRT |2|::1|54321|");
3414    }
3415
3416    #[test]
3417    fn format_port_roundtrip() {
3418        // Generate a PORT command and verify it can be parsed back
3419        let addr: SocketAddr = "10.20.30.40:5000".parse().unwrap();
3420        let cmd = format_port_command(&addr);
3421        // PORT 10,20,30,40,19,136  (5000 = 19*256 + 136)
3422        assert!(cmd.starts_with("PORT "));
3423        let nums: Vec<&str> = cmd[5..].split(',').collect();
3424        assert_eq!(nums.len(), 6);
3425        let h1: u16 = nums[0].parse().unwrap();
3426        let h2: u16 = nums[1].parse().unwrap();
3427        let h3: u16 = nums[2].parse().unwrap();
3428        let h4: u16 = nums[3].parse().unwrap();
3429        let p1: u16 = nums[4].parse().unwrap();
3430        let p2: u16 = nums[5].parse().unwrap();
3431        assert_eq!(format!("{h1}.{h2}.{h3}.{h4}"), "10.20.30.40");
3432        assert_eq!(p1 * 256 + p2, 5000);
3433    }
3434
3435    #[tokio::test]
3436    async fn send_command_format() {
3437        let mut buf = Vec::new();
3438        send_command(&mut buf, "USER test").await.unwrap();
3439        assert_eq!(buf, b"USER test\r\n");
3440    }
3441
3442    #[tokio::test]
3443    async fn send_command_feat() {
3444        let mut buf = Vec::new();
3445        send_command(&mut buf, "FEAT").await.unwrap();
3446        assert_eq!(buf, b"FEAT\r\n");
3447    }
3448
3449    #[tokio::test]
3450    async fn read_auth_tls_response() {
3451        let data = b"234 AUTH TLS OK\r\n";
3452        let mut reader = BufReader::new(std::io::Cursor::new(data.to_vec()));
3453        let resp = read_response(&mut reader).await.unwrap();
3454        assert_eq!(resp.code, 234);
3455        assert!(resp.is_complete());
3456    }
3457
3458    #[tokio::test]
3459    async fn read_pbsz_response() {
3460        let data = b"200 PBSZ=0\r\n";
3461        let mut reader = BufReader::new(std::io::Cursor::new(data.to_vec()));
3462        let resp = read_response(&mut reader).await.unwrap();
3463        assert_eq!(resp.code, 200);
3464        assert!(resp.is_complete());
3465    }
3466
3467    #[tokio::test]
3468    async fn read_prot_p_response() {
3469        let data = b"200 Protection set to Private\r\n";
3470        let mut reader = BufReader::new(std::io::Cursor::new(data.to_vec()));
3471        let resp = read_response(&mut reader).await.unwrap();
3472        assert_eq!(resp.code, 200);
3473        assert!(resp.is_complete());
3474    }
3475
3476    #[cfg(feature = "rustls")]
3477    #[test]
3478    fn tls_connector_no_alpn_creates_ok() {
3479        let tls_config = crate::tls::TlsConfig::default();
3480        let connector = crate::tls::TlsConnector::new_no_alpn(&tls_config);
3481        assert!(connector.is_ok());
3482    }
3483
3484    #[test]
3485    fn ftp_config_default() {
3486        let config = FtpConfig::default();
3487        assert!(config.use_epsv);
3488        assert!(config.use_eprt);
3489        assert!(!config.skip_pasv_ip);
3490        assert!(config.account.is_none());
3491        assert!(!config.create_dirs);
3492        assert_eq!(config.method, FtpMethod::MultiCwd);
3493        assert!(config.active_port.is_none());
3494    }
3495
3496    #[test]
3497    fn ftp_config_clone() {
3498        let config = FtpConfig {
3499            use_epsv: false,
3500            use_eprt: false,
3501            skip_pasv_ip: true,
3502            account: Some("myacct".to_string()),
3503            create_dirs: true,
3504            method: FtpMethod::NoCwd,
3505            active_port: Some("-".to_string()),
3506            ..Default::default()
3507        };
3508        #[allow(clippy::redundant_clone)] // Testing Clone impl
3509        let cloned = config.clone();
3510        assert!(!cloned.use_epsv);
3511        assert!(!cloned.use_eprt);
3512        assert!(cloned.skip_pasv_ip);
3513        assert_eq!(cloned.account.as_deref(), Some("myacct"));
3514        assert!(cloned.create_dirs);
3515        assert_eq!(cloned.method, FtpMethod::NoCwd);
3516        assert_eq!(cloned.active_port.as_deref(), Some("-"));
3517    }
3518}