ntrip_core/
client.rs

1//! NTRIP HTTP client for connecting to casters and streaming corrections.
2//!
3//! Supports both NTRIP v1 (legacy ICY protocol) and NTRIP v2 (HTTP/1.1 with
4//! chunked transfer encoding).
5//!
6//! Features:
7//! - Raw TCP socket connection (plain or TLS-encrypted)
8//! - NTRIP v1: HTTP/1.0, ICY 200 OK response
9//! - NTRIP v2: HTTP/1.1, chunked transfer encoding
10//! - Auto-detection of protocol version from server response
11//! - Basic Authentication
12//! - TLS/HTTPS support for secure connections
13//! - GGA position reporting
14
15use std::time::Duration;
16use tokio::io::{AsyncReadExt, AsyncWriteExt};
17use tokio::net::TcpStream;
18use tracing::{debug, error, info, warn};
19
20use base64::{engine::general_purpose::STANDARD as BASE64, Engine as _};
21
22use crate::config::{NtripConfig, NtripVersion};
23use crate::gga::GgaSentence;
24use crate::sourcetable::Sourcetable;
25use crate::stream::NtripStream;
26use crate::Error;
27
/// User-Agent header value sent with every request this module builds.
///
/// Assembled at compile time as `NTRIP ntrip-core/<crate version>`.
/// NOTE(review): the leading `NTRIP ` token is presumably what casters use to
/// recognise NTRIP clients — confirm against the casters you target.
const USER_AGENT: &str = concat!("NTRIP ntrip-core/", env!("CARGO_PKG_VERSION"));
30
/// Payload framing chosen for the session once the response headers are parsed.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum DetectedProtocol {
    /// Unframed byte stream: bytes from the socket are handed to the caller as-is.
    V1,
    /// HTTP/1.1 chunked transfer encoding: chunk framing is stripped before
    /// bytes reach the caller.
    V2Chunked,
}
39
/// NTRIP client for streaming corrections from a caster.
///
/// Supports both v1 (ICY) and v2 (HTTP/1.1) protocols. A client starts out
/// disconnected (`stream` is `None`); `connect`/`connect_with_gga` open the
/// session and `read_chunk` pulls payload bytes from it.
pub struct NtripClient {
    /// Caster address, credentials, protocol selection and timeout settings.
    config: NtripConfig,
    /// Active transport (plain TCP or TLS); `None` while disconnected.
    stream: Option<NtripStream>,
    /// Payload framing detected from the response headers; `None` until a
    /// connection succeeds (readers fall back to the raw/v1 framing then).
    protocol: Option<DetectedProtocol>,
    /// Raw bytes that were read from the socket while parsing the response
    /// headers but belong to the payload. All read helpers drain this buffer
    /// before touching the socket.
    stream_buffer: Vec<u8>,
    /// Already-decoded chunk payload waiting to be handed to the caller (v2
    /// chunked mode); drained first by the chunked reader.
    /// NOTE(review): no code visible in this file ever appends to it.
    chunk_buffer: Vec<u8>,
    /// Payload bytes still to be read from the current chunk (v2 chunked
    /// mode); `0` means the next bytes on the wire are a chunk-size line.
    chunk_remaining: usize,
    /// Most recent GGA position handed to the client, kept so it can be sent
    /// again after an automatic reconnection.
    last_gga: Option<GgaSentence>,
}
58
59impl NtripClient {
60    /// Create a new NTRIP client with the given configuration.
61    pub fn new(config: NtripConfig) -> Result<Self, Error> {
62        config.validate()?;
63        Ok(Self {
64            config,
65            stream: None,
66            protocol: None,
67            stream_buffer: Vec::new(),
68            chunk_buffer: Vec::new(),
69            chunk_remaining: 0,
70            last_gga: None,
71        })
72    }
73
74    /// Connect to the NTRIP caster and start streaming.
75    ///
76    /// For NTRIP v2, consider using `connect_with_gga()` to send an initial
77    /// position in the request headers for faster VRS/nearest-base selection.
78    pub async fn connect(&mut self) -> Result<(), Error> {
79        self.connect_with_gga(None).await
80    }
81
82    /// Connect to the NTRIP caster with an optional initial GGA position.
83    ///
84    /// For NTRIP v2, the GGA sentence is sent in the `Ntrip-GGA` header,
85    /// allowing the caster to select the appropriate VRS or nearest base
86    /// immediately, without waiting for a post-connection GGA report.
87    ///
88    /// For NTRIP v1, the initial_gga is ignored (GGA must be sent post-connection).
89    pub async fn connect_with_gga(
90        &mut self,
91        initial_gga: Option<&GgaSentence>,
92    ) -> Result<(), Error> {
93        let host = &self.config.host;
94        let port = self.config.port;
95        let addr = format!("{}:{}", host, port);
96
97        info!(addr = %addr, "Connecting to NTRIP caster");
98
99        // 1. Establish TCP connection
100        let tcp_stream = match tokio::time::timeout(
101            Duration::from_secs(self.config.connection.timeout_secs as u64),
102            TcpStream::connect(&addr),
103        )
104        .await
105        {
106            Ok(Ok(s)) => s,
107            Ok(Err(e)) => return Err(Error::connection_failed(host, port, e)),
108            Err(_) => {
109                return Err(Error::Timeout {
110                    timeout_secs: self.config.connection.timeout_secs,
111                })
112            }
113        };
114
115        // 2. Optionally upgrade to TLS
116        let mut stream: NtripStream = if self.config.use_tls {
117            if self.config.tls_skip_verify {
118                warn!("TLS certificate verification is disabled - connection is not fully secure");
119            }
120            NtripStream::connect_tls(tcp_stream, host, self.config.tls_skip_verify).await?
121        } else {
122            NtripStream::plain(tcp_stream)
123        };
124
125        // 3. Construct HTTP Request based on configured version
126        let mountpoint = &self.config.mountpoint;
127
128        let auth_header =
129            if let (Some(user), Some(pass)) = (&self.config.username, &self.config.password) {
130                let credentials = format!("{}:{}", user, pass);
131                let encoded = BASE64.encode(credentials);
132                format!("Authorization: Basic {}\r\n", encoded)
133            } else {
134                String::new()
135            };
136
137        // Build Ntrip-GGA header for v2 if initial position provided
138        let gga_header = match (&self.config.ntrip_version, initial_gga) {
139            (NtripVersion::V2, Some(gga)) | (NtripVersion::Auto, Some(gga)) => {
140                let nmea = gga.to_nmea();
141                let nmea_trimmed = nmea.trim();
142                debug!(gga = %nmea_trimmed, "Including initial GGA in request header");
143                format!("Ntrip-GGA: {}\r\n", nmea_trimmed)
144            }
145            _ => String::new(),
146        };
147
148        // Store initial GGA for potential reconnection resend (fulfills documented behavior)
149        if let Some(gga) = initial_gga {
150            self.last_gga = Some(gga.clone());
151        }
152
153        // Build request based on configured NTRIP version
154        let request = match self.config.ntrip_version {
155            NtripVersion::V1 => {
156                debug!("Using NTRIP v1 protocol");
157                format!(
158                    "GET /{} HTTP/1.0\r\n\
159                     User-Agent: {}\r\n\
160                     Host: {}:{}\r\n\
161                     Accept: */*\r\n\
162                     Connection: close\r\n\
163                     {}\r\n",
164                    mountpoint, USER_AGENT, host, port, auth_header
165                )
166            }
167            NtripVersion::V2 | NtripVersion::Auto => {
168                debug!("Using NTRIP v2 protocol request");
169                format!(
170                    "GET /{} HTTP/1.1\r\n\
171                     Host: {}:{}\r\n\
172                     User-Agent: {}\r\n\
173                     Ntrip-Version: Ntrip/2.0\r\n\
174                     {}\
175                     Accept: */*\r\n\
176                     Connection: close\r\n\
177                     {}\r\n",
178                    mountpoint, host, port, USER_AGENT, gga_header, auth_header
179                )
180            }
181        };
182
183        // Log request with Authorization header redacted for security
184        let redacted_request = if auth_header.is_empty() {
185            request.clone()
186        } else {
187            request.replace(&auth_header, "Authorization: Basic [REDACTED]\r\n")
188        };
189        debug!(request = %redacted_request, "Sending NTRIP request");
190
191        // 4. Send Request
192        if let Err(e) = stream.write_all(request.as_bytes()).await {
193            return Err(Error::NetworkError { source: e });
194        }
195
196        // 5. Read Response Headers (buffered for efficiency)
197        const MAX_HEADER_SIZE: usize = 4096;
198        let mut headers = Vec::with_capacity(512);
199        let mut buffer = [0u8; 512];
200
201        let header_end_found = loop {
202            if headers.len() >= MAX_HEADER_SIZE {
203                break false;
204            }
205
206            // Read a chunk of data
207            let bytes_to_read = std::cmp::min(buffer.len(), MAX_HEADER_SIZE - headers.len());
208            match stream.read(&mut buffer[..bytes_to_read]).await {
209                Ok(0) => break false, // Connection closed
210                Ok(n) => {
211                    headers.extend_from_slice(&buffer[..n]);
212
213                    // Check for standard HTTP header end
214                    if headers.len() >= 4 {
215                        if let Some(pos) = headers.windows(4).position(|w| w == b"\r\n\r\n") {
216                            // Preserve any over-read bytes (data after headers) in stream_buffer
217                            let body_start = pos + 4;
218                            if headers.len() > body_start {
219                                self.stream_buffer.extend_from_slice(&headers[body_start..]);
220                            }
221                            // Trim headers to just before the body
222                            headers.truncate(body_start);
223                            break true;
224                        }
225                    }
226
227                    // Check for legacy ICY response (NTRIP v1)
228                    if headers.len() >= 12 && headers.starts_with(b"ICY 200 OK\r\n") {
229                        debug!("Detected legacy ICY 200 OK response, starting stream");
230                        // For ICY, preserve any bytes after "ICY 200 OK\r\n" in stream_buffer
231                        let icy_header_len = 12; // b"ICY 200 OK\r\n".len()
232                        if headers.len() > icy_header_len {
233                            self.stream_buffer
234                                .extend_from_slice(&headers[icy_header_len..]);
235                        }
236                        headers.truncate(icy_header_len);
237                        break true;
238                    }
239                }
240                Err(e) => return Err(Error::NetworkError { source: e }),
241            }
242        };
243
244        if !header_end_found {
245            return Err(Error::NetworkError {
246                source: std::io::Error::new(
247                    std::io::ErrorKind::InvalidData,
248                    "Header too large or incomplete",
249                ),
250            });
251        }
252
253        // 6. Parse Status Line and detect protocol version
254        let response = String::from_utf8_lossy(&headers);
255        let status_line = response.lines().next().unwrap_or("");
256
257        debug!(status_line = %status_line, "Received NTRIP response");
258
259        if status_line.starts_with("ICY 200") || status_line.contains("200 OK") {
260            let detected = self.detect_protocol(&response, status_line);
261            info!(
262                mountpoint = %mountpoint,
263                protocol = ?detected,
264                "Connected to NTRIP caster"
265            );
266
267            self.stream = Some(stream);
268            self.protocol = Some(detected);
269            // Note: chunk_buffer may contain over-read bytes from header parsing - do not clear
270            self.chunk_remaining = 0;
271            Ok(())
272        } else if status_line.contains("401 Unauthorized") {
273            error!("NTRIP authentication failed");
274            Err(Error::AuthenticationFailed {
275                host: host.clone(),
276                username: self.config.username.clone().unwrap_or_default(),
277            })
278        } else if status_line.contains("404 Not Found") {
279            error!(mountpoint = %mountpoint, "Mountpoint not found");
280            Err(Error::MountpointNotFound {
281                host: host.clone(),
282                mountpoint: mountpoint.clone(),
283            })
284        } else {
285            error!(status = %status_line, "NTRIP caster error");
286            Err(Error::HttpError {
287                host: host.clone(),
288                status_code: Self::parse_status_code(status_line),
289                reason: status_line.to_string(),
290            })
291        }
292    }
293
294    /// Read a chunk of RTCM data from the stream with timeout.
295    /// Automatically handles chunked transfer encoding for NTRIP v2.
296    ///
297    /// # Reconnection Behavior
298    ///
299    /// If automatic reconnection is enabled (default: 3 attempts), this method
300    /// will attempt to reconnect on timeout or disconnect errors before returning
301    /// an error to the caller.
302    ///
303    /// **Important:** When reconnection is enabled, this method may block for up to
304    /// `(max_reconnect_attempts × reconnect_delay_ms) + connection_timeout` milliseconds
305    /// while attempting to restore the connection. During this time:
306    /// - The original error (timeout/disconnect) is not immediately returned
307    /// - If a previous GGA position was set (via `send_gga()` or `connect_with_gga()`),
308    ///   it will be automatically resent after successful reconnection
309    ///
310    /// To disable this behavior, use `config.without_reconnect()`.
311    pub async fn read_chunk(&mut self, buf: &mut [u8]) -> Result<usize, Error> {
312        let max_attempts = self.config.connection.max_reconnect_attempts;
313        let reconnect_delay = self.config.connection.reconnect_delay_ms;
314        let mut attempts = 0;
315
316        loop {
317            // Ensure we're connected
318            if self.stream.is_none() {
319                if attempts > 0 && attempts <= max_attempts {
320                    // Attempting reconnection
321                    warn!(
322                        attempt = attempts,
323                        max_attempts = max_attempts,
324                        "Attempting automatic reconnection"
325                    );
326
327                    // Delay before reconnect
328                    tokio::time::sleep(Duration::from_millis(reconnect_delay)).await;
329
330                    // Clone GGA before mutable borrow for reconnection
331                    let gga_clone = self.last_gga.clone();
332                    match self.connect_with_gga(gga_clone.as_ref()).await {
333                        Ok(()) => {
334                            info!(attempt = attempts, "Reconnection successful");
335                            // Resend last GGA if we have one (for v1 protocol)
336                            if let Some(ref gga) = gga_clone {
337                                if let Err(e) = self.send_gga_internal(gga).await {
338                                    warn!(error = %e, "Failed to resend GGA after reconnect");
339                                }
340                            }
341                        }
342                        Err(e) => {
343                            warn!(attempt = attempts, error = %e, "Reconnection attempt failed");
344                            attempts += 1;
345                            continue;
346                        }
347                    }
348                } else if attempts > max_attempts {
349                    // Exhausted all reconnection attempts
350                    return Err(Error::StreamDisconnected {
351                        reason: format!(
352                            "Connection lost after {} reconnection attempts",
353                            max_attempts
354                        ),
355                    });
356                } else {
357                    // First attempt, not connected yet
358                    return Err(Error::StreamDisconnected {
359                        reason: "Not connected".to_string(),
360                    });
361                }
362            }
363
364            let read_timeout_secs = self.config.connection.read_timeout_secs;
365
366            // If read_timeout_secs == 0, bypass timeout (as documented: 0 = no timeout)
367            let read_result = if read_timeout_secs == 0 {
368                self.read_raw_or_chunked(buf).await.map(Some)
369            } else {
370                let timeout_duration = Duration::from_secs(read_timeout_secs as u64);
371                match tokio::time::timeout(timeout_duration, self.read_raw_or_chunked(buf)).await {
372                    Ok(result) => result.map(Some),
373                    Err(_) => Ok(None), // Timeout occurred
374                }
375            };
376
377            match read_result {
378                Ok(Some(n)) => {
379                    debug!(bytes = n, "Received data");
380                    return Ok(n);
381                }
382                Ok(None) => {
383                    // Timeout - potentially recoverable
384                    self.stream = None;
385                    self.reset_chunk_state();
386                    if max_attempts > 0 {
387                        attempts += 1;
388                        continue;
389                    }
390                    return Err(Error::ReadTimeout {
391                        timeout_secs: read_timeout_secs,
392                    });
393                }
394                Err(e) => {
395                    // Check if this is a recoverable error
396                    if max_attempts > 0 && Self::is_recoverable_error(&e) {
397                        attempts += 1;
398                        self.stream = None;
399                        self.reset_chunk_state();
400                        continue;
401                    }
402                    return Err(e);
403                }
404            }
405        }
406    }
407
408    /// Check if an error is potentially recoverable via reconnection.
409    fn is_recoverable_error(error: &Error) -> bool {
410        matches!(
411            error,
412            Error::StreamDisconnected { .. }
413                | Error::NetworkError { .. }
414                | Error::ReadTimeout { .. }
415        )
416    }
417
418    /// Reset chunked decoding state (used after reconnection).
419    fn reset_chunk_state(&mut self) {
420        self.stream_buffer.clear();
421        self.chunk_buffer.clear();
422        self.chunk_remaining = 0;
423    }
424
425    /// Internal GGA send that doesn't update last_gga (used for reconnection resend).
426    async fn send_gga_internal(&mut self, gga: &GgaSentence) -> Result<(), Error> {
427        let stream = self.stream.as_mut().ok_or(Error::StreamDisconnected {
428            reason: "Not connected".into(),
429        })?;
430
431        let nmea = gga.to_nmea();
432        debug!(gga = %nmea.trim(), "Resending GGA after reconnection");
433
434        stream
435            .write_all(nmea.as_bytes())
436            .await
437            .map_err(|e| Error::NetworkError { source: e })?;
438
439        Ok(())
440    }
441
442    /// Check if connected.
443    pub fn is_connected(&self) -> bool {
444        self.stream.is_some()
445    }
446
447    /// Disconnect from the caster.
448    pub fn disconnect(&mut self) {
449        if self.stream.take().is_some() {
450            debug!("Disconnected from NTRIP caster");
451        }
452    }
453
454    /// Send a GGA position report to the caster on the existing stream.
455    pub async fn send_gga(&mut self, gga: &GgaSentence) -> Result<(), Error> {
456        let stream = self.stream.as_mut().ok_or(Error::StreamDisconnected {
457            reason: "Not connected".into(),
458        })?;
459
460        let nmea = gga.to_nmea();
461        debug!(gga = %nmea.trim(), "Sending GGA to caster");
462
463        stream
464            .write_all(nmea.as_bytes())
465            .await
466            .map_err(|e| Error::NetworkError { source: e })?;
467
468        // Store for potential reconnection
469        self.last_gga = Some(gga.clone());
470
471        Ok(())
472    }
473
474    /// Get the configuration.
475    pub fn config(&self) -> &NtripConfig {
476        &self.config
477    }
478
479    /// Fetch the sourcetable from the NTRIP caster.
480    ///
481    /// This is a one-shot operation that connects, retrieves the sourcetable,
482    /// and disconnects. It does not affect the current streaming connection.
483    pub async fn get_sourcetable(config: &NtripConfig) -> Result<Sourcetable, Error> {
484        config.validate()?;
485        let host = &config.host;
486        let port = config.port;
487        let addr = format!("{}:{}", host, port);
488
489        info!(addr = %addr, "Fetching sourcetable from NTRIP caster");
490
491        // 1. Establish TCP connection
492        let tcp_stream = match tokio::time::timeout(
493            Duration::from_secs(config.connection.timeout_secs as u64),
494            TcpStream::connect(&addr),
495        )
496        .await
497        {
498            Ok(Ok(s)) => s,
499            Ok(Err(e)) => return Err(Error::connection_failed(host, port, e)),
500            Err(_) => {
501                return Err(Error::Timeout {
502                    timeout_secs: config.connection.timeout_secs,
503                })
504            }
505        };
506
507        // 2. Optionally upgrade to TLS
508        let mut stream: NtripStream = if config.use_tls {
509            NtripStream::connect_tls(tcp_stream, host, config.tls_skip_verify).await?
510        } else {
511            NtripStream::plain(tcp_stream)
512        };
513
514        // 3. Send sourcetable request
515        let auth_header = if let (Some(user), Some(pass)) = (&config.username, &config.password) {
516            let credentials = format!("{}:{}", user, pass);
517            let encoded = BASE64.encode(credentials);
518            format!("Authorization: Basic {}\r\n", encoded)
519        } else {
520            String::new()
521        };
522
523        let request = format!(
524            "GET / HTTP/1.0\r\n\
525             User-Agent: {}\r\n\
526             Host: {}:{}\r\n\
527             Accept: */*\r\n\
528             Connection: close\r\n\
529             {}\r\n",
530            USER_AGENT, host, port, auth_header
531        );
532
533        // Log request with Authorization header redacted for security
534        let redacted_request = if auth_header.is_empty() {
535            request.clone()
536        } else {
537            request.replace(&auth_header, "Authorization: Basic [REDACTED]\r\n")
538        };
539        debug!(request = %redacted_request, "Sending sourcetable request");
540
541        if let Err(e) = stream.write_all(request.as_bytes()).await {
542            return Err(Error::NetworkError { source: e });
543        }
544
545        // 4. Read entire response
546        let mut response = Vec::new();
547        let mut buf = [0u8; 4096];
548
549        loop {
550            match tokio::time::timeout(
551                Duration::from_secs(config.connection.timeout_secs as u64),
552                stream.read(&mut buf),
553            )
554            .await
555            {
556                Ok(Ok(0)) => break,
557                Ok(Ok(n)) => response.extend_from_slice(&buf[..n]),
558                Ok(Err(e)) => return Err(Error::NetworkError { source: e }),
559                Err(_) => {
560                    return Err(Error::Timeout {
561                        timeout_secs: config.connection.timeout_secs,
562                    })
563                }
564            }
565
566            if response.len() > 1_000_000 {
567                return Err(Error::SourcetableParseError {
568                    message: "Sourcetable too large (>1MB)".to_string(),
569                });
570            }
571        }
572
573        // 5. Parse response
574        let response_str = String::from_utf8_lossy(&response);
575
576        let first_line = response_str.lines().next().unwrap_or("");
577        if !first_line.contains("200") {
578            return Err(Error::HttpError {
579                host: host.clone(),
580                status_code: Self::parse_status_code(first_line),
581                reason: first_line.to_string(),
582            });
583        }
584
585        let body = if let Some(idx) = response_str.find("\r\n\r\n") {
586            &response_str[idx + 4..]
587        } else {
588            &response_str[..]
589        };
590
591        let table = Sourcetable::parse(body);
592
593        info!(
594            streams = table.streams.len(),
595            casters = table.casters.len(),
596            networks = table.networks.len(),
597            "Parsed sourcetable"
598        );
599
600        Ok(table)
601    }
602
603    /// Parse HTTP status code from status line (e.g., "HTTP/1.1 401 Unauthorized" -> 401).
604    fn parse_status_code(status_line: &str) -> u16 {
605        status_line
606            .split_whitespace()
607            .nth(1)
608            .and_then(|s| s.parse().ok())
609            .unwrap_or(0)
610    }
611
612    /// Detect protocol version from server response headers.
613    fn detect_protocol(&self, response: &str, status_line: &str) -> DetectedProtocol {
614        if status_line.starts_with("ICY") {
615            debug!("Detected NTRIP v1 (ICY response)");
616            return DetectedProtocol::V1;
617        }
618
619        let response_lower = response.to_lowercase();
620        if response_lower.contains("transfer-encoding: chunked") {
621            debug!("Detected NTRIP v2 (chunked transfer encoding)");
622            return DetectedProtocol::V2Chunked;
623        }
624
625        if response_lower.contains("ntrip-version:") {
626            debug!("Server indicates NTRIP v2 but response is not chunked; treating as raw stream");
627            return DetectedProtocol::V1;
628        }
629
630        debug!("Defaulting to NTRIP v1 protocol (raw stream)");
631        DetectedProtocol::V1
632    }
633
634    /// Read a chunk of data, handling chunked transfer encoding for v2.
635    async fn read_raw_or_chunked(&mut self, buf: &mut [u8]) -> Result<usize, Error> {
636        let protocol = self.protocol.unwrap_or(DetectedProtocol::V1);
637
638        match protocol {
639            DetectedProtocol::V1 => self.read_raw(buf).await,
640            DetectedProtocol::V2Chunked => self.read_chunked(buf).await,
641        }
642    }
643
644    /// Read raw data from stream (v1 protocol).
645    async fn read_raw(&mut self, buf: &mut [u8]) -> Result<usize, Error> {
646        // First drain any over-read bytes from header parsing
647        if !self.stream_buffer.is_empty() {
648            let to_copy = std::cmp::min(buf.len(), self.stream_buffer.len());
649            buf[..to_copy].copy_from_slice(&self.stream_buffer[..to_copy]);
650            self.stream_buffer.drain(..to_copy);
651            return Ok(to_copy);
652        }
653
654        let stream = self
655            .stream
656            .as_mut()
657            .ok_or_else(|| Error::StreamDisconnected {
658                reason: "Not connected".to_string(),
659            })?;
660
661        match stream.read(buf).await {
662            Ok(0) => {
663                self.stream = None;
664                Err(Error::StreamDisconnected {
665                    reason: "Server closed connection".to_string(),
666                })
667            }
668            Ok(n) => Ok(n),
669            Err(e) => {
670                self.stream = None;
671                Err(Error::NetworkError { source: e })
672            }
673        }
674    }
675
676    /// Read chunked transfer encoded data (v2 protocol).
677    async fn read_chunked(&mut self, buf: &mut [u8]) -> Result<usize, Error> {
678        // First return any already-decoded chunk data
679        if !self.chunk_buffer.is_empty() {
680            let to_copy = std::cmp::min(buf.len(), self.chunk_buffer.len());
681            buf[..to_copy].copy_from_slice(&self.chunk_buffer[..to_copy]);
682            self.chunk_buffer.drain(..to_copy);
683            return Ok(to_copy);
684        }
685
686        // If we have remaining bytes in current chunk, read them
687        if self.chunk_remaining > 0 {
688            let to_read = std::cmp::min(buf.len(), self.chunk_remaining);
689            let n = self.read_from_stream(&mut buf[..to_read]).await?;
690            if n == 0 {
691                self.stream = None;
692                return Err(Error::StreamDisconnected {
693                    reason: "Server closed connection".to_string(),
694                });
695            }
696            self.chunk_remaining -= n;
697            if self.chunk_remaining == 0 {
698                // Consume trailing CRLF after chunk data
699                self.read_and_validate_crlf().await?;
700            }
701            return Ok(n);
702        }
703
704        // Read chunk size line
705        let mut size_line = Vec::new();
706        loop {
707            let byte = self.read_one_byte().await?;
708            if byte == b'\n' {
709                break;
710            }
711            if byte != b'\r' {
712                size_line.push(byte);
713            }
714            if size_line.len() > 16 {
715                return Err(Error::NetworkError {
716                    source: std::io::Error::new(
717                        std::io::ErrorKind::InvalidData,
718                        "Chunk size line too long",
719                    ),
720                });
721            }
722        }
723
724        let size_str = String::from_utf8_lossy(&size_line);
725        // Handle chunk extensions (RFC 7230): parse only up to ';' and ignore extensions
726        let size_part = size_str.trim().split(';').next().unwrap_or("");
727        let chunk_size =
728            usize::from_str_radix(size_part.trim(), 16).map_err(|_| Error::NetworkError {
729                source: std::io::Error::new(
730                    std::io::ErrorKind::InvalidData,
731                    format!("Invalid chunk size: {}", size_str),
732                ),
733            })?;
734
735        if chunk_size == 0 {
736            self.stream = None;
737            return Err(Error::StreamDisconnected {
738                reason: "Chunked stream ended".to_string(),
739            });
740        }
741
742        debug!(chunk_size = chunk_size, "Reading chunked data");
743
744        self.chunk_remaining = chunk_size;
745        let to_read = std::cmp::min(buf.len(), self.chunk_remaining);
746
747        let n = self.read_from_stream(&mut buf[..to_read]).await?;
748        if n == 0 {
749            self.stream = None;
750            return Err(Error::StreamDisconnected {
751                reason: "Server closed connection".to_string(),
752            });
753        }
754        self.chunk_remaining -= n;
755        if self.chunk_remaining == 0 {
756            // Consume trailing CRLF after chunk data
757            self.read_and_validate_crlf().await?;
758        }
759        Ok(n)
760    }
761
762    /// Read bytes from stream_buffer first, then from actual stream.
763    async fn read_from_stream(&mut self, buf: &mut [u8]) -> Result<usize, Error> {
764        // First drain from stream_buffer (over-read bytes from header parsing)
765        if !self.stream_buffer.is_empty() {
766            let to_copy = std::cmp::min(buf.len(), self.stream_buffer.len());
767            buf[..to_copy].copy_from_slice(&self.stream_buffer[..to_copy]);
768            self.stream_buffer.drain(..to_copy);
769            return Ok(to_copy);
770        }
771
772        let stream = self
773            .stream
774            .as_mut()
775            .ok_or_else(|| Error::StreamDisconnected {
776                reason: "Not connected".to_string(),
777            })?;
778
779        stream.read(buf).await.map_err(|e| {
780            self.stream = None;
781            Error::NetworkError { source: e }
782        })
783    }
784
785    /// Read exactly one byte from stream_buffer or stream.
786    async fn read_one_byte(&mut self) -> Result<u8, Error> {
787        // First drain from stream_buffer
788        if !self.stream_buffer.is_empty() {
789            return Ok(self.stream_buffer.remove(0));
790        }
791
792        let stream = self
793            .stream
794            .as_mut()
795            .ok_or_else(|| Error::StreamDisconnected {
796                reason: "Not connected".to_string(),
797            })?;
798
799        let mut byte = [0u8; 1];
800        stream.read_exact(&mut byte).await.map_err(|e| {
801            self.stream = None;
802            Error::NetworkError { source: e }
803        })?;
804        Ok(byte[0])
805    }
806
807    /// Read and validate CRLF after chunk data.
808    async fn read_and_validate_crlf(&mut self) -> Result<(), Error> {
809        let cr = self.read_one_byte().await?;
810        let lf = self.read_one_byte().await?;
811        if cr != b'\r' || lf != b'\n' {
812            self.stream = None;
813            return Err(Error::NetworkError {
814                source: std::io::Error::new(
815                    std::io::ErrorKind::InvalidData,
816                    "Expected CRLF after chunk data",
817                ),
818            });
819        }
820        Ok(())
821    }
822}
823
824impl Drop for NtripClient {
825    fn drop(&mut self) {
826        self.disconnect();
827    }
828}