ntrip_core/
client.rs

1//! NTRIP HTTP client for connecting to casters and streaming corrections.
2//!
3//! Supports both NTRIP v1 (legacy ICY protocol) and NTRIP v2 (HTTP/1.1 with
4//! chunked transfer encoding).
5//!
6//! Features:
7//! - Raw TCP socket connection (plain or TLS-encrypted)
8//! - NTRIP v1: HTTP/1.0, ICY 200 OK response
9//! - NTRIP v2: HTTP/1.1, chunked transfer encoding
10//! - Auto-detection of protocol version from server response
11//! - Basic Authentication
12//! - TLS/HTTPS support for secure connections
13//! - HTTP proxy support via CONNECT tunneling
14//! - GGA position reporting
15
16use std::time::Duration;
17use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader};
18use tokio::net::TcpStream;
19use tracing::{debug, error, info, warn};
20
21use base64::{engine::general_purpose::STANDARD as BASE64, Engine as _};
22
23use crate::config::{NtripConfig, NtripVersion, ProxyConfig};
24use crate::gga::GgaSentence;
25use crate::sourcetable::Sourcetable;
26use crate::stream::NtripStream;
27use crate::Error;
28
29/// User-Agent string sent in HTTP requests.
30const USER_AGENT: &str = concat!("NTRIP ntrip-core/", env!("CARGO_PKG_VERSION"));
31
/// Framing of the response body, decided once the caster has answered.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum DetectedProtocol {
    /// Body is an unframed byte stream (NTRIP v1 style).
    V1,
    /// Body is wrapped in HTTP chunked transfer encoding (NTRIP v2 style).
    V2Chunked,
}
40
41/// NTRIP client for streaming corrections from a caster.
42/// Supports both v1 (ICY) and v2 (HTTP/1.1) protocols.
43pub struct NtripClient {
44    /// Configuration
45    config: NtripConfig,
46    /// Active stream (plain TCP or TLS, if connected)
47    stream: Option<NtripStream>,
48    /// Detected protocol after connection
49    protocol: Option<DetectedProtocol>,
50    /// Buffer for raw over-read bytes from header parsing (read before socket)
51    stream_buffer: Vec<u8>,
52    /// Buffer for decoded chunk data that exceeded caller's buffer size (v2 chunked mode)
53    chunk_buffer: Vec<u8>,
54    /// Remaining bytes in current chunk (for v2 chunked mode)
55    chunk_remaining: usize,
56    /// Last GGA sent (for automatic resend after reconnection)
57    last_gga: Option<GgaSentence>,
58}
59
60impl NtripClient {
61    /// Create a new NTRIP client with the given configuration.
62    pub fn new(config: NtripConfig) -> Result<Self, Error> {
63        config.validate()?;
64        Ok(Self {
65            config,
66            stream: None,
67            protocol: None,
68            stream_buffer: Vec::new(),
69            chunk_buffer: Vec::new(),
70            chunk_remaining: 0,
71            last_gga: None,
72        })
73    }
74
75    /// Connect to the NTRIP caster and start streaming.
76    ///
77    /// For NTRIP v2, consider using `connect_with_gga()` to send an initial
78    /// position in the request headers for faster VRS/nearest-base selection.
79    pub async fn connect(&mut self) -> Result<(), Error> {
80        self.connect_with_gga(None).await
81    }
82
83    /// Establish a TCP connection, optionally through an HTTP proxy.
84    ///
85    /// If a proxy is configured, this connects to the proxy and uses HTTP CONNECT
86    /// to establish a tunnel to the target host:port.
87    async fn establish_tcp_connection(
88        target_host: &str,
89        target_port: u16,
90        proxy: Option<&ProxyConfig>,
91        timeout_secs: u32,
92    ) -> Result<TcpStream, Error> {
93        let timeout = Duration::from_secs(timeout_secs as u64);
94
95        match proxy {
96            Some(proxy_config) => {
97                let proxy_addr = format!("{}:{}", proxy_config.host, proxy_config.port);
98                info!(
99                    proxy = %proxy_addr,
100                    target = %format!("{}:{}", target_host, target_port),
101                    "Connecting via HTTP proxy"
102                );
103
104                // Connect to proxy
105                let mut stream =
106                    match tokio::time::timeout(timeout, TcpStream::connect(&proxy_addr)).await {
107                        Ok(Ok(s)) => s,
108                        Ok(Err(e)) => {
109                            return Err(Error::ProxyError {
110                                message: format!(
111                                    "Failed to connect to proxy {}:{}: {}",
112                                    proxy_config.host, proxy_config.port, e
113                                ),
114                            })
115                        }
116                        Err(_) => return Err(Error::Timeout { timeout_secs }),
117                    };
118
119                // Build HTTP CONNECT request
120                let mut connect_request = format!(
121                    "CONNECT {}:{} HTTP/1.1\r\nHost: {}:{}\r\n",
122                    target_host, target_port, target_host, target_port
123                );
124
125                // Add proxy authentication if configured
126                if let (Some(user), Some(pass)) = (&proxy_config.username, &proxy_config.password) {
127                    let credentials = format!("{}:{}", user, pass);
128                    let encoded = BASE64.encode(credentials);
129                    connect_request
130                        .push_str(&format!("Proxy-Authorization: Basic {}\r\n", encoded));
131                }
132
133                connect_request.push_str("\r\n");
134
135                debug!("Sending HTTP CONNECT request to proxy");
136
137                // Send CONNECT request
138                stream
139                    .write_all(connect_request.as_bytes())
140                    .await
141                    .map_err(|e| Error::ProxyError {
142                        message: format!("Failed to send CONNECT request: {}", e),
143                    })?;
144
145                // Read response (first line should be "HTTP/1.x 200 ...")
146                let mut reader = BufReader::new(&mut stream);
147                let mut response_line = String::new();
148
149                match tokio::time::timeout(timeout, reader.read_line(&mut response_line)).await {
150                    Ok(Ok(0)) => {
151                        return Err(Error::ProxyError {
152                            message: "Proxy closed connection without response".to_string(),
153                        })
154                    }
155                    Ok(Ok(_)) => {}
156                    Ok(Err(e)) => {
157                        return Err(Error::ProxyError {
158                            message: format!("Failed to read proxy response: {}", e),
159                        })
160                    }
161                    Err(_) => return Err(Error::Timeout { timeout_secs }),
162                }
163
164                // Check for 200 response
165                if !response_line.contains("200") {
166                    return Err(Error::ProxyError {
167                        message: format!("Proxy CONNECT failed: {}", response_line.trim()),
168                    });
169                }
170
171                debug!(response = %response_line.trim(), "Proxy tunnel established");
172
173                // Consume remaining headers until empty line
174                loop {
175                    let mut header_line = String::new();
176                    match tokio::time::timeout(timeout, reader.read_line(&mut header_line)).await {
177                        Ok(Ok(0)) => break,
178                        Ok(Ok(_)) => {
179                            if header_line.trim().is_empty() {
180                                break;
181                            }
182                        }
183                        Ok(Err(e)) => {
184                            return Err(Error::ProxyError {
185                                message: format!("Failed to read proxy headers: {}", e),
186                            })
187                        }
188                        Err(_) => return Err(Error::Timeout { timeout_secs }),
189                    }
190                }
191
192                info!("HTTP proxy tunnel established successfully");
193                Ok(stream)
194            }
195            None => {
196                // Direct connection (no proxy)
197                let addr = format!("{}:{}", target_host, target_port);
198                match tokio::time::timeout(timeout, TcpStream::connect(&addr)).await {
199                    Ok(Ok(s)) => Ok(s),
200                    Ok(Err(e)) => Err(Error::connection_failed(target_host, target_port, e)),
201                    Err(_) => Err(Error::Timeout { timeout_secs }),
202                }
203            }
204        }
205    }
206
207    /// Connect to the NTRIP caster with an optional initial GGA position.
208    ///
209    /// For NTRIP v2, the GGA sentence is sent in the `Ntrip-GGA` header,
210    /// allowing the caster to select the appropriate VRS or nearest base
211    /// immediately, without waiting for a post-connection GGA report.
212    ///
213    /// For NTRIP v1, the initial_gga is ignored (GGA must be sent post-connection).
214    pub async fn connect_with_gga(
215        &mut self,
216        initial_gga: Option<&GgaSentence>,
217    ) -> Result<(), Error> {
218        let host = &self.config.host;
219        let port = self.config.port;
220        let addr = format!("{}:{}", host, port);
221
222        info!(addr = %addr, "Connecting to NTRIP caster");
223
224        // 1. Establish TCP connection (directly or via proxy)
225        let tcp_stream = Self::establish_tcp_connection(
226            host,
227            port,
228            self.config.proxy.as_ref(),
229            self.config.connection.timeout_secs,
230        )
231        .await?;
232
233        // 2. Optionally upgrade to TLS
234        let mut stream: NtripStream = if self.config.use_tls {
235            if self.config.tls_skip_verify {
236                warn!("TLS certificate verification is disabled - connection is not fully secure");
237            }
238            NtripStream::connect_tls(tcp_stream, host, self.config.tls_skip_verify).await?
239        } else {
240            NtripStream::plain(tcp_stream)
241        };
242
243        // 3. Construct HTTP Request based on configured version
244        let mountpoint = &self.config.mountpoint;
245
246        let auth_header =
247            if let (Some(user), Some(pass)) = (&self.config.username, &self.config.password) {
248                let credentials = format!("{}:{}", user, pass);
249                let encoded = BASE64.encode(credentials);
250                format!("Authorization: Basic {}\r\n", encoded)
251            } else {
252                String::new()
253            };
254
255        // Build Ntrip-GGA header for v2 if initial position provided
256        let gga_header = match (&self.config.ntrip_version, initial_gga) {
257            (NtripVersion::V2, Some(gga)) | (NtripVersion::Auto, Some(gga)) => {
258                let nmea = gga.to_nmea();
259                let nmea_trimmed = nmea.trim();
260                debug!(gga = %nmea_trimmed, "Including initial GGA in request header");
261                format!("Ntrip-GGA: {}\r\n", nmea_trimmed)
262            }
263            _ => String::new(),
264        };
265
266        // Store initial GGA for potential reconnection resend (fulfills documented behavior)
267        if let Some(gga) = initial_gga {
268            self.last_gga = Some(gga.clone());
269        }
270
271        // Build request based on configured NTRIP version
272        let request = match self.config.ntrip_version {
273            NtripVersion::V1 => {
274                debug!("Using NTRIP v1 protocol");
275                format!(
276                    "GET /{} HTTP/1.0\r\n\
277                     User-Agent: {}\r\n\
278                     Host: {}:{}\r\n\
279                     Accept: */*\r\n\
280                     Connection: close\r\n\
281                     {}\r\n",
282                    mountpoint, USER_AGENT, host, port, auth_header
283                )
284            }
285            NtripVersion::V2 | NtripVersion::Auto => {
286                debug!("Using NTRIP v2 protocol request");
287                format!(
288                    "GET /{} HTTP/1.1\r\n\
289                     Host: {}:{}\r\n\
290                     User-Agent: {}\r\n\
291                     Ntrip-Version: Ntrip/2.0\r\n\
292                     {}\
293                     Accept: */*\r\n\
294                     Connection: close\r\n\
295                     {}\r\n",
296                    mountpoint, host, port, USER_AGENT, gga_header, auth_header
297                )
298            }
299        };
300
301        // Log request with Authorization header redacted for security
302        let redacted_request = if auth_header.is_empty() {
303            request.clone()
304        } else {
305            request.replace(&auth_header, "Authorization: Basic [REDACTED]\r\n")
306        };
307        debug!(request = %redacted_request, "Sending NTRIP request");
308
309        // 4. Send Request
310        if let Err(e) = stream.write_all(request.as_bytes()).await {
311            return Err(Error::NetworkError { source: e });
312        }
313
314        // 5. Read Response Headers (buffered for efficiency)
315        const MAX_HEADER_SIZE: usize = 4096;
316        let mut headers = Vec::with_capacity(512);
317        let mut buffer = [0u8; 512];
318
319        let header_end_found = loop {
320            if headers.len() >= MAX_HEADER_SIZE {
321                break false;
322            }
323
324            // Read a chunk of data
325            let bytes_to_read = std::cmp::min(buffer.len(), MAX_HEADER_SIZE - headers.len());
326            match stream.read(&mut buffer[..bytes_to_read]).await {
327                Ok(0) => break false, // Connection closed
328                Ok(n) => {
329                    headers.extend_from_slice(&buffer[..n]);
330
331                    // Check for standard HTTP header end
332                    if headers.len() >= 4 {
333                        if let Some(pos) = headers.windows(4).position(|w| w == b"\r\n\r\n") {
334                            // Preserve any over-read bytes (data after headers) in stream_buffer
335                            let body_start = pos + 4;
336                            if headers.len() > body_start {
337                                self.stream_buffer.extend_from_slice(&headers[body_start..]);
338                            }
339                            // Trim headers to just before the body
340                            headers.truncate(body_start);
341                            break true;
342                        }
343                    }
344
345                    // Check for legacy ICY response (NTRIP v1)
346                    if headers.len() >= 12 && headers.starts_with(b"ICY 200 OK\r\n") {
347                        debug!("Detected legacy ICY 200 OK response, starting stream");
348                        // For ICY, preserve any bytes after "ICY 200 OK\r\n" in stream_buffer
349                        let icy_header_len = 12; // b"ICY 200 OK\r\n".len()
350                        if headers.len() > icy_header_len {
351                            self.stream_buffer
352                                .extend_from_slice(&headers[icy_header_len..]);
353                        }
354                        headers.truncate(icy_header_len);
355                        break true;
356                    }
357                }
358                Err(e) => return Err(Error::NetworkError { source: e }),
359            }
360        };
361
362        if !header_end_found {
363            return Err(Error::NetworkError {
364                source: std::io::Error::new(
365                    std::io::ErrorKind::InvalidData,
366                    "Header too large or incomplete",
367                ),
368            });
369        }
370
371        // 6. Parse Status Line and detect protocol version
372        let response = String::from_utf8_lossy(&headers);
373        let status_line = response.lines().next().unwrap_or("");
374
375        debug!(status_line = %status_line, "Received NTRIP response");
376
377        if status_line.starts_with("ICY 200") || status_line.contains("200 OK") {
378            let detected = self.detect_protocol(&response, status_line);
379            info!(
380                mountpoint = %mountpoint,
381                protocol = ?detected,
382                "Connected to NTRIP caster"
383            );
384
385            self.stream = Some(stream);
386            self.protocol = Some(detected);
387            // Note: chunk_buffer may contain over-read bytes from header parsing - do not clear
388            self.chunk_remaining = 0;
389            Ok(())
390        } else if status_line.contains("401 Unauthorized") {
391            error!("NTRIP authentication failed");
392            Err(Error::AuthenticationFailed {
393                host: host.clone(),
394                username: self.config.username.clone().unwrap_or_default(),
395            })
396        } else if status_line.contains("404 Not Found") {
397            error!(mountpoint = %mountpoint, "Mountpoint not found");
398            Err(Error::MountpointNotFound {
399                host: host.clone(),
400                mountpoint: mountpoint.clone(),
401            })
402        } else {
403            error!(status = %status_line, "NTRIP caster error");
404            Err(Error::HttpError {
405                host: host.clone(),
406                status_code: Self::parse_status_code(status_line),
407                reason: status_line.to_string(),
408            })
409        }
410    }
411
412    /// Read a chunk of RTCM data from the stream with timeout.
413    /// Automatically handles chunked transfer encoding for NTRIP v2.
414    ///
415    /// # Reconnection Behavior
416    ///
417    /// If automatic reconnection is enabled (default: 3 attempts), this method
418    /// will attempt to reconnect on timeout or disconnect errors before returning
419    /// an error to the caller.
420    ///
421    /// **Important:** When reconnection is enabled, this method may block for up to
422    /// `(max_reconnect_attempts × reconnect_delay_ms) + connection_timeout` milliseconds
423    /// while attempting to restore the connection. During this time:
424    /// - The original error (timeout/disconnect) is not immediately returned
425    /// - If a previous GGA position was set (via `send_gga()` or `connect_with_gga()`),
426    ///   it will be automatically resent after successful reconnection
427    ///
428    /// To disable this behavior, use `config.without_reconnect()`.
429    pub async fn read_chunk(&mut self, buf: &mut [u8]) -> Result<usize, Error> {
430        let max_attempts = self.config.connection.max_reconnect_attempts;
431        let reconnect_delay = self.config.connection.reconnect_delay_ms;
432        let mut attempts = 0;
433
434        loop {
435            // Ensure we're connected
436            if self.stream.is_none() {
437                if attempts > 0 && attempts <= max_attempts {
438                    // Attempting reconnection
439                    warn!(
440                        attempt = attempts,
441                        max_attempts = max_attempts,
442                        "Attempting automatic reconnection"
443                    );
444
445                    // Delay before reconnect
446                    tokio::time::sleep(Duration::from_millis(reconnect_delay)).await;
447
448                    // Clone GGA before mutable borrow for reconnection
449                    let gga_clone = self.last_gga.clone();
450                    match self.connect_with_gga(gga_clone.as_ref()).await {
451                        Ok(()) => {
452                            info!(attempt = attempts, "Reconnection successful");
453                            // Resend last GGA if we have one (for v1 protocol)
454                            if let Some(ref gga) = gga_clone {
455                                if let Err(e) = self.send_gga_internal(gga).await {
456                                    warn!(error = %e, "Failed to resend GGA after reconnect");
457                                }
458                            }
459                        }
460                        Err(e) => {
461                            warn!(attempt = attempts, error = %e, "Reconnection attempt failed");
462                            attempts += 1;
463                            continue;
464                        }
465                    }
466                } else if attempts > max_attempts {
467                    // Exhausted all reconnection attempts
468                    return Err(Error::StreamDisconnected {
469                        reason: format!(
470                            "Connection lost after {} reconnection attempts",
471                            max_attempts
472                        ),
473                    });
474                } else {
475                    // First attempt, not connected yet
476                    return Err(Error::StreamDisconnected {
477                        reason: "Not connected".to_string(),
478                    });
479                }
480            }
481
482            let read_timeout_secs = self.config.connection.read_timeout_secs;
483
484            // If read_timeout_secs == 0, bypass timeout (as documented: 0 = no timeout)
485            let read_result = if read_timeout_secs == 0 {
486                self.read_raw_or_chunked(buf).await.map(Some)
487            } else {
488                let timeout_duration = Duration::from_secs(read_timeout_secs as u64);
489                match tokio::time::timeout(timeout_duration, self.read_raw_or_chunked(buf)).await {
490                    Ok(result) => result.map(Some),
491                    Err(_) => Ok(None), // Timeout occurred
492                }
493            };
494
495            match read_result {
496                Ok(Some(n)) => {
497                    debug!(bytes = n, "Received data");
498                    return Ok(n);
499                }
500                Ok(None) => {
501                    // Timeout - potentially recoverable
502                    self.stream = None;
503                    self.reset_chunk_state();
504                    if max_attempts > 0 {
505                        attempts += 1;
506                        continue;
507                    }
508                    return Err(Error::ReadTimeout {
509                        timeout_secs: read_timeout_secs,
510                    });
511                }
512                Err(e) => {
513                    // Check if this is a recoverable error
514                    if max_attempts > 0 && Self::is_recoverable_error(&e) {
515                        attempts += 1;
516                        self.stream = None;
517                        self.reset_chunk_state();
518                        continue;
519                    }
520                    return Err(e);
521                }
522            }
523        }
524    }
525
526    /// Check if an error is potentially recoverable via reconnection.
527    fn is_recoverable_error(error: &Error) -> bool {
528        matches!(
529            error,
530            Error::StreamDisconnected { .. }
531                | Error::NetworkError { .. }
532                | Error::ReadTimeout { .. }
533        )
534    }
535
536    /// Reset chunked decoding state (used after reconnection).
537    fn reset_chunk_state(&mut self) {
538        self.stream_buffer.clear();
539        self.chunk_buffer.clear();
540        self.chunk_remaining = 0;
541    }
542
543    /// Internal GGA send that doesn't update last_gga (used for reconnection resend).
544    async fn send_gga_internal(&mut self, gga: &GgaSentence) -> Result<(), Error> {
545        let stream = self.stream.as_mut().ok_or(Error::StreamDisconnected {
546            reason: "Not connected".into(),
547        })?;
548
549        let nmea = gga.to_nmea();
550        debug!(gga = %nmea.trim(), "Resending GGA after reconnection");
551
552        stream
553            .write_all(nmea.as_bytes())
554            .await
555            .map_err(|e| Error::NetworkError { source: e })?;
556
557        Ok(())
558    }
559
560    /// Check if connected.
561    pub fn is_connected(&self) -> bool {
562        self.stream.is_some()
563    }
564
565    /// Disconnect from the caster.
566    pub fn disconnect(&mut self) {
567        if self.stream.take().is_some() {
568            debug!("Disconnected from NTRIP caster");
569        }
570    }
571
572    /// Send a GGA position report to the caster on the existing stream.
573    pub async fn send_gga(&mut self, gga: &GgaSentence) -> Result<(), Error> {
574        let stream = self.stream.as_mut().ok_or(Error::StreamDisconnected {
575            reason: "Not connected".into(),
576        })?;
577
578        let nmea = gga.to_nmea();
579        debug!(gga = %nmea.trim(), "Sending GGA to caster");
580
581        stream
582            .write_all(nmea.as_bytes())
583            .await
584            .map_err(|e| Error::NetworkError { source: e })?;
585
586        // Store for potential reconnection
587        self.last_gga = Some(gga.clone());
588
589        Ok(())
590    }
591
592    /// Get the configuration.
593    pub fn config(&self) -> &NtripConfig {
594        &self.config
595    }
596
597    /// Fetch the sourcetable from the NTRIP caster.
598    ///
599    /// This is a one-shot operation that connects, retrieves the sourcetable,
600    /// and disconnects. It does not affect the current streaming connection.
601    pub async fn get_sourcetable(config: &NtripConfig) -> Result<Sourcetable, Error> {
602        config.validate()?;
603        let host = &config.host;
604        let port = config.port;
605        let addr = format!("{}:{}", host, port);
606
607        info!(addr = %addr, "Fetching sourcetable from NTRIP caster");
608
609        // 1. Establish TCP connection (directly or via proxy)
610        let tcp_stream = Self::establish_tcp_connection(
611            host,
612            port,
613            config.proxy.as_ref(),
614            config.connection.timeout_secs,
615        )
616        .await?;
617
618        // 2. Optionally upgrade to TLS
619        let mut stream: NtripStream = if config.use_tls {
620            NtripStream::connect_tls(tcp_stream, host, config.tls_skip_verify).await?
621        } else {
622            NtripStream::plain(tcp_stream)
623        };
624
625        // 3. Send sourcetable request
626        let auth_header = if let (Some(user), Some(pass)) = (&config.username, &config.password) {
627            let credentials = format!("{}:{}", user, pass);
628            let encoded = BASE64.encode(credentials);
629            format!("Authorization: Basic {}\r\n", encoded)
630        } else {
631            String::new()
632        };
633
634        let request = format!(
635            "GET / HTTP/1.0\r\n\
636             User-Agent: {}\r\n\
637             Host: {}:{}\r\n\
638             Accept: */*\r\n\
639             Connection: close\r\n\
640             {}\r\n",
641            USER_AGENT, host, port, auth_header
642        );
643
644        // Log request with Authorization header redacted for security
645        let redacted_request = if auth_header.is_empty() {
646            request.clone()
647        } else {
648            request.replace(&auth_header, "Authorization: Basic [REDACTED]\r\n")
649        };
650        debug!(request = %redacted_request, "Sending sourcetable request");
651
652        if let Err(e) = stream.write_all(request.as_bytes()).await {
653            return Err(Error::NetworkError { source: e });
654        }
655
656        // 4. Read entire response
657        let mut response = Vec::new();
658        let mut buf = [0u8; 4096];
659
660        loop {
661            match tokio::time::timeout(
662                Duration::from_secs(config.connection.timeout_secs as u64),
663                stream.read(&mut buf),
664            )
665            .await
666            {
667                Ok(Ok(0)) => break,
668                Ok(Ok(n)) => response.extend_from_slice(&buf[..n]),
669                Ok(Err(e)) => return Err(Error::NetworkError { source: e }),
670                Err(_) => {
671                    return Err(Error::Timeout {
672                        timeout_secs: config.connection.timeout_secs,
673                    })
674                }
675            }
676
677            if response.len() > 1_000_000 {
678                return Err(Error::SourcetableParseError {
679                    message: "Sourcetable too large (>1MB)".to_string(),
680                });
681            }
682        }
683
684        // 5. Parse response
685        let response_str = String::from_utf8_lossy(&response);
686
687        let first_line = response_str.lines().next().unwrap_or("");
688        if !first_line.contains("200") {
689            return Err(Error::HttpError {
690                host: host.clone(),
691                status_code: Self::parse_status_code(first_line),
692                reason: first_line.to_string(),
693            });
694        }
695
696        let body = if let Some(idx) = response_str.find("\r\n\r\n") {
697            &response_str[idx + 4..]
698        } else {
699            &response_str[..]
700        };
701
702        let table = Sourcetable::parse(body);
703
704        info!(
705            streams = table.streams.len(),
706            casters = table.casters.len(),
707            networks = table.networks.len(),
708            "Parsed sourcetable"
709        );
710
711        Ok(table)
712    }
713
714    /// Parse HTTP status code from status line (e.g., "HTTP/1.1 401 Unauthorized" -> 401).
715    fn parse_status_code(status_line: &str) -> u16 {
716        status_line
717            .split_whitespace()
718            .nth(1)
719            .and_then(|s| s.parse().ok())
720            .unwrap_or(0)
721    }
722
723    /// Detect protocol version from server response headers.
724    fn detect_protocol(&self, response: &str, status_line: &str) -> DetectedProtocol {
725        if status_line.starts_with("ICY") {
726            debug!("Detected NTRIP v1 (ICY response)");
727            return DetectedProtocol::V1;
728        }
729
730        let response_lower = response.to_lowercase();
731        if response_lower.contains("transfer-encoding: chunked") {
732            debug!("Detected NTRIP v2 (chunked transfer encoding)");
733            return DetectedProtocol::V2Chunked;
734        }
735
736        if response_lower.contains("ntrip-version:") {
737            debug!("Server indicates NTRIP v2 but response is not chunked; treating as raw stream");
738            return DetectedProtocol::V1;
739        }
740
741        debug!("Defaulting to NTRIP v1 protocol (raw stream)");
742        DetectedProtocol::V1
743    }
744
745    /// Read a chunk of data, handling chunked transfer encoding for v2.
746    async fn read_raw_or_chunked(&mut self, buf: &mut [u8]) -> Result<usize, Error> {
747        let protocol = self.protocol.unwrap_or(DetectedProtocol::V1);
748
749        match protocol {
750            DetectedProtocol::V1 => self.read_raw(buf).await,
751            DetectedProtocol::V2Chunked => self.read_chunked(buf).await,
752        }
753    }
754
755    /// Read raw data from stream (v1 protocol).
756    async fn read_raw(&mut self, buf: &mut [u8]) -> Result<usize, Error> {
757        // First drain any over-read bytes from header parsing
758        if !self.stream_buffer.is_empty() {
759            let to_copy = std::cmp::min(buf.len(), self.stream_buffer.len());
760            buf[..to_copy].copy_from_slice(&self.stream_buffer[..to_copy]);
761            self.stream_buffer.drain(..to_copy);
762            return Ok(to_copy);
763        }
764
765        let stream = self
766            .stream
767            .as_mut()
768            .ok_or_else(|| Error::StreamDisconnected {
769                reason: "Not connected".to_string(),
770            })?;
771
772        match stream.read(buf).await {
773            Ok(0) => {
774                self.stream = None;
775                Err(Error::StreamDisconnected {
776                    reason: "Server closed connection".to_string(),
777                })
778            }
779            Ok(n) => Ok(n),
780            Err(e) => {
781                self.stream = None;
782                Err(Error::NetworkError { source: e })
783            }
784        }
785    }
786
787    /// Read chunked transfer encoded data (v2 protocol).
788    async fn read_chunked(&mut self, buf: &mut [u8]) -> Result<usize, Error> {
789        // First return any already-decoded chunk data
790        if !self.chunk_buffer.is_empty() {
791            let to_copy = std::cmp::min(buf.len(), self.chunk_buffer.len());
792            buf[..to_copy].copy_from_slice(&self.chunk_buffer[..to_copy]);
793            self.chunk_buffer.drain(..to_copy);
794            return Ok(to_copy);
795        }
796
797        // If we have remaining bytes in current chunk, read them
798        if self.chunk_remaining > 0 {
799            let to_read = std::cmp::min(buf.len(), self.chunk_remaining);
800            let n = self.read_from_stream(&mut buf[..to_read]).await?;
801            if n == 0 {
802                self.stream = None;
803                return Err(Error::StreamDisconnected {
804                    reason: "Server closed connection".to_string(),
805                });
806            }
807            self.chunk_remaining -= n;
808            if self.chunk_remaining == 0 {
809                // Consume trailing CRLF after chunk data
810                self.read_and_validate_crlf().await?;
811            }
812            return Ok(n);
813        }
814
815        // Read chunk size line
816        let mut size_line = Vec::new();
817        loop {
818            let byte = self.read_one_byte().await?;
819            if byte == b'\n' {
820                break;
821            }
822            if byte != b'\r' {
823                size_line.push(byte);
824            }
825            if size_line.len() > 16 {
826                return Err(Error::NetworkError {
827                    source: std::io::Error::new(
828                        std::io::ErrorKind::InvalidData,
829                        "Chunk size line too long",
830                    ),
831                });
832            }
833        }
834
835        let size_str = String::from_utf8_lossy(&size_line);
836        // Handle chunk extensions (RFC 7230): parse only up to ';' and ignore extensions
837        let size_part = size_str.trim().split(';').next().unwrap_or("");
838        let chunk_size =
839            usize::from_str_radix(size_part.trim(), 16).map_err(|_| Error::NetworkError {
840                source: std::io::Error::new(
841                    std::io::ErrorKind::InvalidData,
842                    format!("Invalid chunk size: {}", size_str),
843                ),
844            })?;
845
846        if chunk_size == 0 {
847            self.stream = None;
848            return Err(Error::StreamDisconnected {
849                reason: "Chunked stream ended".to_string(),
850            });
851        }
852
853        debug!(chunk_size = chunk_size, "Reading chunked data");
854
855        self.chunk_remaining = chunk_size;
856        let to_read = std::cmp::min(buf.len(), self.chunk_remaining);
857
858        let n = self.read_from_stream(&mut buf[..to_read]).await?;
859        if n == 0 {
860            self.stream = None;
861            return Err(Error::StreamDisconnected {
862                reason: "Server closed connection".to_string(),
863            });
864        }
865        self.chunk_remaining -= n;
866        if self.chunk_remaining == 0 {
867            // Consume trailing CRLF after chunk data
868            self.read_and_validate_crlf().await?;
869        }
870        Ok(n)
871    }
872
873    /// Read bytes from stream_buffer first, then from actual stream.
874    async fn read_from_stream(&mut self, buf: &mut [u8]) -> Result<usize, Error> {
875        // First drain from stream_buffer (over-read bytes from header parsing)
876        if !self.stream_buffer.is_empty() {
877            let to_copy = std::cmp::min(buf.len(), self.stream_buffer.len());
878            buf[..to_copy].copy_from_slice(&self.stream_buffer[..to_copy]);
879            self.stream_buffer.drain(..to_copy);
880            return Ok(to_copy);
881        }
882
883        let stream = self
884            .stream
885            .as_mut()
886            .ok_or_else(|| Error::StreamDisconnected {
887                reason: "Not connected".to_string(),
888            })?;
889
890        stream.read(buf).await.map_err(|e| {
891            self.stream = None;
892            Error::NetworkError { source: e }
893        })
894    }
895
896    /// Read exactly one byte from stream_buffer or stream.
897    async fn read_one_byte(&mut self) -> Result<u8, Error> {
898        // First drain from stream_buffer
899        if !self.stream_buffer.is_empty() {
900            return Ok(self.stream_buffer.remove(0));
901        }
902
903        let stream = self
904            .stream
905            .as_mut()
906            .ok_or_else(|| Error::StreamDisconnected {
907                reason: "Not connected".to_string(),
908            })?;
909
910        let mut byte = [0u8; 1];
911        stream.read_exact(&mut byte).await.map_err(|e| {
912            self.stream = None;
913            Error::NetworkError { source: e }
914        })?;
915        Ok(byte[0])
916    }
917
918    /// Read and validate CRLF after chunk data.
919    async fn read_and_validate_crlf(&mut self) -> Result<(), Error> {
920        let cr = self.read_one_byte().await?;
921        let lf = self.read_one_byte().await?;
922        if cr != b'\r' || lf != b'\n' {
923            self.stream = None;
924            return Err(Error::NetworkError {
925                source: std::io::Error::new(
926                    std::io::ErrorKind::InvalidData,
927                    "Expected CRLF after chunk data",
928                ),
929            });
930        }
931        Ok(())
932    }
933}
934
// Run `disconnect()` automatically when an `NtripClient` is dropped, so callers
// do not have to call it explicitly before the client goes out of scope.
impl Drop for NtripClient {
    fn drop(&mut self) {
        // Called synchronously and its result (if any) is discarded.
        // NOTE(review): `disconnect` is defined outside this section —
        // presumably it releases the active stream; confirm there.
        self.disconnect();
    }
}