1use std::time::Duration;
17use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader};
18use tokio::net::TcpStream;
19use tracing::{debug, error, info, warn};
20
21use base64::{engine::general_purpose::STANDARD as BASE64, Engine as _};
22
23use crate::config::{NtripConfig, NtripVersion, ProxyConfig};
24use crate::gga::GgaSentence;
25use crate::sourcetable::Sourcetable;
26use crate::stream::NtripStream;
27use crate::Error;
28
/// `User-Agent` header value placed in the caster requests built in this file;
/// the crate version is appended at compile time from `CARGO_PKG_VERSION`.
const USER_AGENT: &str = concat!("NTRIP ntrip-core/", env!("CARGO_PKG_VERSION"));
31
/// Framing of the data stream, as decided by `NtripClient::detect_protocol`
/// from the caster's response.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum DetectedProtocol {
    /// Payload is read straight from the connection with no extra framing.
    V1,
    /// Payload arrives in HTTP chunked transfer encoding and is de-chunked
    /// before being handed to the caller.
    V2Chunked,
}
40
/// Client for a single NTRIP caster connection.
///
/// Holds the configuration, the live stream (if any) and the small amount of
/// parsing state needed to hand payload bytes to the caller.
pub struct NtripClient {
    /// Configuration the client was created with.
    config: NtripConfig,
    /// Active connection; `None` while disconnected.
    stream: Option<NtripStream>,
    /// Framing detected on the last successful connect.
    protocol: Option<DetectedProtocol>,
    /// Bytes that were read together with the response headers but belong to
    /// the body; they are consumed before the socket is read again.
    stream_buffer: Vec<u8>,
    /// Served first by `read_chunked` when non-empty.
    // NOTE(review): no visible code appends to this buffer — confirm whether
    // it is still needed.
    chunk_buffer: Vec<u8>,
    /// Payload bytes still to be read from the current chunk.
    chunk_remaining: usize,
    /// Most recent GGA sentence; replayed on automatic reconnection.
    last_gga: Option<GgaSentence>,
}
59
60impl NtripClient {
61 pub fn new(config: NtripConfig) -> Result<Self, Error> {
63 config.validate()?;
64 Ok(Self {
65 config,
66 stream: None,
67 protocol: None,
68 stream_buffer: Vec::new(),
69 chunk_buffer: Vec::new(),
70 chunk_remaining: 0,
71 last_gga: None,
72 })
73 }
74
75 pub async fn connect(&mut self) -> Result<(), Error> {
80 self.connect_with_gga(None).await
81 }
82
83 async fn establish_tcp_connection(
88 target_host: &str,
89 target_port: u16,
90 proxy: Option<&ProxyConfig>,
91 timeout_secs: u32,
92 ) -> Result<TcpStream, Error> {
93 let timeout = Duration::from_secs(timeout_secs as u64);
94
95 match proxy {
96 Some(proxy_config) => {
97 let proxy_addr = format!("{}:{}", proxy_config.host, proxy_config.port);
98 info!(
99 proxy = %proxy_addr,
100 target = %format!("{}:{}", target_host, target_port),
101 "Connecting via HTTP proxy"
102 );
103
104 let mut stream =
106 match tokio::time::timeout(timeout, TcpStream::connect(&proxy_addr)).await {
107 Ok(Ok(s)) => s,
108 Ok(Err(e)) => {
109 return Err(Error::ProxyError {
110 message: format!(
111 "Failed to connect to proxy {}:{}: {}",
112 proxy_config.host, proxy_config.port, e
113 ),
114 })
115 }
116 Err(_) => return Err(Error::Timeout { timeout_secs }),
117 };
118
119 let mut connect_request = format!(
121 "CONNECT {}:{} HTTP/1.1\r\nHost: {}:{}\r\n",
122 target_host, target_port, target_host, target_port
123 );
124
125 if let (Some(user), Some(pass)) = (&proxy_config.username, &proxy_config.password) {
127 let credentials = format!("{}:{}", user, pass);
128 let encoded = BASE64.encode(credentials);
129 connect_request
130 .push_str(&format!("Proxy-Authorization: Basic {}\r\n", encoded));
131 }
132
133 connect_request.push_str("\r\n");
134
135 debug!("Sending HTTP CONNECT request to proxy");
136
137 stream
139 .write_all(connect_request.as_bytes())
140 .await
141 .map_err(|e| Error::ProxyError {
142 message: format!("Failed to send CONNECT request: {}", e),
143 })?;
144
145 let mut reader = BufReader::new(&mut stream);
147 let mut response_line = String::new();
148
149 match tokio::time::timeout(timeout, reader.read_line(&mut response_line)).await {
150 Ok(Ok(0)) => {
151 return Err(Error::ProxyError {
152 message: "Proxy closed connection without response".to_string(),
153 })
154 }
155 Ok(Ok(_)) => {}
156 Ok(Err(e)) => {
157 return Err(Error::ProxyError {
158 message: format!("Failed to read proxy response: {}", e),
159 })
160 }
161 Err(_) => return Err(Error::Timeout { timeout_secs }),
162 }
163
164 if !response_line.contains("200") {
166 return Err(Error::ProxyError {
167 message: format!("Proxy CONNECT failed: {}", response_line.trim()),
168 });
169 }
170
171 debug!(response = %response_line.trim(), "Proxy tunnel established");
172
173 loop {
175 let mut header_line = String::new();
176 match tokio::time::timeout(timeout, reader.read_line(&mut header_line)).await {
177 Ok(Ok(0)) => break,
178 Ok(Ok(_)) => {
179 if header_line.trim().is_empty() {
180 break;
181 }
182 }
183 Ok(Err(e)) => {
184 return Err(Error::ProxyError {
185 message: format!("Failed to read proxy headers: {}", e),
186 })
187 }
188 Err(_) => return Err(Error::Timeout { timeout_secs }),
189 }
190 }
191
192 info!("HTTP proxy tunnel established successfully");
193 Ok(stream)
194 }
195 None => {
196 let addr = format!("{}:{}", target_host, target_port);
198 match tokio::time::timeout(timeout, TcpStream::connect(&addr)).await {
199 Ok(Ok(s)) => Ok(s),
200 Ok(Err(e)) => Err(Error::connection_failed(target_host, target_port, e)),
201 Err(_) => Err(Error::Timeout { timeout_secs }),
202 }
203 }
204 }
205 }
206
207 pub async fn connect_with_gga(
215 &mut self,
216 initial_gga: Option<&GgaSentence>,
217 ) -> Result<(), Error> {
218 let host = &self.config.host;
219 let port = self.config.port;
220 let addr = format!("{}:{}", host, port);
221
222 info!(addr = %addr, "Connecting to NTRIP caster");
223
224 let tcp_stream = Self::establish_tcp_connection(
226 host,
227 port,
228 self.config.proxy.as_ref(),
229 self.config.connection.timeout_secs,
230 )
231 .await?;
232
233 let mut stream: NtripStream = if self.config.use_tls {
235 if self.config.tls_skip_verify {
236 warn!("TLS certificate verification is disabled - connection is not fully secure");
237 }
238 NtripStream::connect_tls(tcp_stream, host, self.config.tls_skip_verify).await?
239 } else {
240 NtripStream::plain(tcp_stream)
241 };
242
243 let mountpoint = &self.config.mountpoint;
245
246 let auth_header =
247 if let (Some(user), Some(pass)) = (&self.config.username, &self.config.password) {
248 let credentials = format!("{}:{}", user, pass);
249 let encoded = BASE64.encode(credentials);
250 format!("Authorization: Basic {}\r\n", encoded)
251 } else {
252 String::new()
253 };
254
255 let gga_header = match (&self.config.ntrip_version, initial_gga) {
257 (NtripVersion::V2, Some(gga)) | (NtripVersion::Auto, Some(gga)) => {
258 let nmea = gga.to_nmea();
259 let nmea_trimmed = nmea.trim();
260 debug!(gga = %nmea_trimmed, "Including initial GGA in request header");
261 format!("Ntrip-GGA: {}\r\n", nmea_trimmed)
262 }
263 _ => String::new(),
264 };
265
266 if let Some(gga) = initial_gga {
268 self.last_gga = Some(gga.clone());
269 }
270
271 let request = match self.config.ntrip_version {
273 NtripVersion::V1 => {
274 debug!("Using NTRIP v1 protocol");
275 format!(
276 "GET /{} HTTP/1.0\r\n\
277 User-Agent: {}\r\n\
278 Host: {}:{}\r\n\
279 Accept: */*\r\n\
280 Connection: close\r\n\
281 {}\r\n",
282 mountpoint, USER_AGENT, host, port, auth_header
283 )
284 }
285 NtripVersion::V2 | NtripVersion::Auto => {
286 debug!("Using NTRIP v2 protocol request");
287 format!(
288 "GET /{} HTTP/1.1\r\n\
289 Host: {}:{}\r\n\
290 User-Agent: {}\r\n\
291 Ntrip-Version: Ntrip/2.0\r\n\
292 {}\
293 Accept: */*\r\n\
294 Connection: close\r\n\
295 {}\r\n",
296 mountpoint, host, port, USER_AGENT, gga_header, auth_header
297 )
298 }
299 };
300
301 let redacted_request = if auth_header.is_empty() {
303 request.clone()
304 } else {
305 request.replace(&auth_header, "Authorization: Basic [REDACTED]\r\n")
306 };
307 debug!(request = %redacted_request, "Sending NTRIP request");
308
309 if let Err(e) = stream.write_all(request.as_bytes()).await {
311 return Err(Error::NetworkError { source: e });
312 }
313
314 const MAX_HEADER_SIZE: usize = 4096;
316 let mut headers = Vec::with_capacity(512);
317 let mut buffer = [0u8; 512];
318
319 let header_end_found = loop {
320 if headers.len() >= MAX_HEADER_SIZE {
321 break false;
322 }
323
324 let bytes_to_read = std::cmp::min(buffer.len(), MAX_HEADER_SIZE - headers.len());
326 match stream.read(&mut buffer[..bytes_to_read]).await {
327 Ok(0) => break false, Ok(n) => {
329 headers.extend_from_slice(&buffer[..n]);
330
331 if headers.len() >= 4 {
333 if let Some(pos) = headers.windows(4).position(|w| w == b"\r\n\r\n") {
334 let body_start = pos + 4;
336 if headers.len() > body_start {
337 self.stream_buffer.extend_from_slice(&headers[body_start..]);
338 }
339 headers.truncate(body_start);
341 break true;
342 }
343 }
344
345 if headers.len() >= 12 && headers.starts_with(b"ICY 200 OK\r\n") {
347 debug!("Detected legacy ICY 200 OK response, starting stream");
348 let icy_header_len = 12; if headers.len() > icy_header_len {
351 self.stream_buffer
352 .extend_from_slice(&headers[icy_header_len..]);
353 }
354 headers.truncate(icy_header_len);
355 break true;
356 }
357 }
358 Err(e) => return Err(Error::NetworkError { source: e }),
359 }
360 };
361
362 if !header_end_found {
363 return Err(Error::NetworkError {
364 source: std::io::Error::new(
365 std::io::ErrorKind::InvalidData,
366 "Header too large or incomplete",
367 ),
368 });
369 }
370
371 let response = String::from_utf8_lossy(&headers);
373 let status_line = response.lines().next().unwrap_or("");
374
375 debug!(status_line = %status_line, "Received NTRIP response");
376
377 if status_line.starts_with("ICY 200") || status_line.contains("200 OK") {
378 let detected = self.detect_protocol(&response, status_line);
379 info!(
380 mountpoint = %mountpoint,
381 protocol = ?detected,
382 "Connected to NTRIP caster"
383 );
384
385 self.stream = Some(stream);
386 self.protocol = Some(detected);
387 self.chunk_remaining = 0;
389 Ok(())
390 } else if status_line.contains("401 Unauthorized") {
391 error!("NTRIP authentication failed");
392 Err(Error::AuthenticationFailed {
393 host: host.clone(),
394 username: self.config.username.clone().unwrap_or_default(),
395 })
396 } else if status_line.contains("404 Not Found") {
397 error!(mountpoint = %mountpoint, "Mountpoint not found");
398 Err(Error::MountpointNotFound {
399 host: host.clone(),
400 mountpoint: mountpoint.clone(),
401 })
402 } else {
403 error!(status = %status_line, "NTRIP caster error");
404 Err(Error::HttpError {
405 host: host.clone(),
406 status_code: Self::parse_status_code(status_line),
407 reason: status_line.to_string(),
408 })
409 }
410 }
411
412 pub async fn read_chunk(&mut self, buf: &mut [u8]) -> Result<usize, Error> {
430 let max_attempts = self.config.connection.max_reconnect_attempts;
431 let reconnect_delay = self.config.connection.reconnect_delay_ms;
432 let mut attempts = 0;
433
434 loop {
435 if self.stream.is_none() {
437 if attempts > 0 && attempts <= max_attempts {
438 warn!(
440 attempt = attempts,
441 max_attempts = max_attempts,
442 "Attempting automatic reconnection"
443 );
444
445 tokio::time::sleep(Duration::from_millis(reconnect_delay)).await;
447
448 let gga_clone = self.last_gga.clone();
450 match self.connect_with_gga(gga_clone.as_ref()).await {
451 Ok(()) => {
452 info!(attempt = attempts, "Reconnection successful");
453 if let Some(ref gga) = gga_clone {
455 if let Err(e) = self.send_gga_internal(gga).await {
456 warn!(error = %e, "Failed to resend GGA after reconnect");
457 }
458 }
459 }
460 Err(e) => {
461 warn!(attempt = attempts, error = %e, "Reconnection attempt failed");
462 attempts += 1;
463 continue;
464 }
465 }
466 } else if attempts > max_attempts {
467 return Err(Error::StreamDisconnected {
469 reason: format!(
470 "Connection lost after {} reconnection attempts",
471 max_attempts
472 ),
473 });
474 } else {
475 return Err(Error::StreamDisconnected {
477 reason: "Not connected".to_string(),
478 });
479 }
480 }
481
482 let read_timeout_secs = self.config.connection.read_timeout_secs;
483
484 let read_result = if read_timeout_secs == 0 {
486 self.read_raw_or_chunked(buf).await.map(Some)
487 } else {
488 let timeout_duration = Duration::from_secs(read_timeout_secs as u64);
489 match tokio::time::timeout(timeout_duration, self.read_raw_or_chunked(buf)).await {
490 Ok(result) => result.map(Some),
491 Err(_) => Ok(None), }
493 };
494
495 match read_result {
496 Ok(Some(n)) => {
497 debug!(bytes = n, "Received data");
498 return Ok(n);
499 }
500 Ok(None) => {
501 self.stream = None;
503 self.reset_chunk_state();
504 if max_attempts > 0 {
505 attempts += 1;
506 continue;
507 }
508 return Err(Error::ReadTimeout {
509 timeout_secs: read_timeout_secs,
510 });
511 }
512 Err(e) => {
513 if max_attempts > 0 && Self::is_recoverable_error(&e) {
515 attempts += 1;
516 self.stream = None;
517 self.reset_chunk_state();
518 continue;
519 }
520 return Err(e);
521 }
522 }
523 }
524 }
525
526 fn is_recoverable_error(error: &Error) -> bool {
528 matches!(
529 error,
530 Error::StreamDisconnected { .. }
531 | Error::NetworkError { .. }
532 | Error::ReadTimeout { .. }
533 )
534 }
535
536 fn reset_chunk_state(&mut self) {
538 self.stream_buffer.clear();
539 self.chunk_buffer.clear();
540 self.chunk_remaining = 0;
541 }
542
543 async fn send_gga_internal(&mut self, gga: &GgaSentence) -> Result<(), Error> {
545 let stream = self.stream.as_mut().ok_or(Error::StreamDisconnected {
546 reason: "Not connected".into(),
547 })?;
548
549 let nmea = gga.to_nmea();
550 debug!(gga = %nmea.trim(), "Resending GGA after reconnection");
551
552 stream
553 .write_all(nmea.as_bytes())
554 .await
555 .map_err(|e| Error::NetworkError { source: e })?;
556
557 Ok(())
558 }
559
560 pub fn is_connected(&self) -> bool {
562 self.stream.is_some()
563 }
564
565 pub fn disconnect(&mut self) {
567 if self.stream.take().is_some() {
568 debug!("Disconnected from NTRIP caster");
569 }
570 }
571
572 pub async fn send_gga(&mut self, gga: &GgaSentence) -> Result<(), Error> {
574 let stream = self.stream.as_mut().ok_or(Error::StreamDisconnected {
575 reason: "Not connected".into(),
576 })?;
577
578 let nmea = gga.to_nmea();
579 debug!(gga = %nmea.trim(), "Sending GGA to caster");
580
581 stream
582 .write_all(nmea.as_bytes())
583 .await
584 .map_err(|e| Error::NetworkError { source: e })?;
585
586 self.last_gga = Some(gga.clone());
588
589 Ok(())
590 }
591
592 pub fn config(&self) -> &NtripConfig {
594 &self.config
595 }
596
597 pub async fn get_sourcetable(config: &NtripConfig) -> Result<Sourcetable, Error> {
602 config.validate()?;
603 let host = &config.host;
604 let port = config.port;
605 let addr = format!("{}:{}", host, port);
606
607 info!(addr = %addr, "Fetching sourcetable from NTRIP caster");
608
609 let tcp_stream = Self::establish_tcp_connection(
611 host,
612 port,
613 config.proxy.as_ref(),
614 config.connection.timeout_secs,
615 )
616 .await?;
617
618 let mut stream: NtripStream = if config.use_tls {
620 NtripStream::connect_tls(tcp_stream, host, config.tls_skip_verify).await?
621 } else {
622 NtripStream::plain(tcp_stream)
623 };
624
625 let auth_header = if let (Some(user), Some(pass)) = (&config.username, &config.password) {
627 let credentials = format!("{}:{}", user, pass);
628 let encoded = BASE64.encode(credentials);
629 format!("Authorization: Basic {}\r\n", encoded)
630 } else {
631 String::new()
632 };
633
634 let request = format!(
635 "GET / HTTP/1.0\r\n\
636 User-Agent: {}\r\n\
637 Host: {}:{}\r\n\
638 Accept: */*\r\n\
639 Connection: close\r\n\
640 {}\r\n",
641 USER_AGENT, host, port, auth_header
642 );
643
644 let redacted_request = if auth_header.is_empty() {
646 request.clone()
647 } else {
648 request.replace(&auth_header, "Authorization: Basic [REDACTED]\r\n")
649 };
650 debug!(request = %redacted_request, "Sending sourcetable request");
651
652 if let Err(e) = stream.write_all(request.as_bytes()).await {
653 return Err(Error::NetworkError { source: e });
654 }
655
656 let mut response = Vec::new();
658 let mut buf = [0u8; 4096];
659
660 loop {
661 match tokio::time::timeout(
662 Duration::from_secs(config.connection.timeout_secs as u64),
663 stream.read(&mut buf),
664 )
665 .await
666 {
667 Ok(Ok(0)) => break,
668 Ok(Ok(n)) => response.extend_from_slice(&buf[..n]),
669 Ok(Err(e)) => return Err(Error::NetworkError { source: e }),
670 Err(_) => {
671 return Err(Error::Timeout {
672 timeout_secs: config.connection.timeout_secs,
673 })
674 }
675 }
676
677 if response.len() > 1_000_000 {
678 return Err(Error::SourcetableParseError {
679 message: "Sourcetable too large (>1MB)".to_string(),
680 });
681 }
682 }
683
684 let response_str = String::from_utf8_lossy(&response);
686
687 let first_line = response_str.lines().next().unwrap_or("");
688 if !first_line.contains("200") {
689 return Err(Error::HttpError {
690 host: host.clone(),
691 status_code: Self::parse_status_code(first_line),
692 reason: first_line.to_string(),
693 });
694 }
695
696 let body = if let Some(idx) = response_str.find("\r\n\r\n") {
697 &response_str[idx + 4..]
698 } else {
699 &response_str[..]
700 };
701
702 let table = Sourcetable::parse(body);
703
704 info!(
705 streams = table.streams.len(),
706 casters = table.casters.len(),
707 networks = table.networks.len(),
708 "Parsed sourcetable"
709 );
710
711 Ok(table)
712 }
713
714 fn parse_status_code(status_line: &str) -> u16 {
716 status_line
717 .split_whitespace()
718 .nth(1)
719 .and_then(|s| s.parse().ok())
720 .unwrap_or(0)
721 }
722
723 fn detect_protocol(&self, response: &str, status_line: &str) -> DetectedProtocol {
725 if status_line.starts_with("ICY") {
726 debug!("Detected NTRIP v1 (ICY response)");
727 return DetectedProtocol::V1;
728 }
729
730 let response_lower = response.to_lowercase();
731 if response_lower.contains("transfer-encoding: chunked") {
732 debug!("Detected NTRIP v2 (chunked transfer encoding)");
733 return DetectedProtocol::V2Chunked;
734 }
735
736 if response_lower.contains("ntrip-version:") {
737 debug!("Server indicates NTRIP v2 but response is not chunked; treating as raw stream");
738 return DetectedProtocol::V1;
739 }
740
741 debug!("Defaulting to NTRIP v1 protocol (raw stream)");
742 DetectedProtocol::V1
743 }
744
745 async fn read_raw_or_chunked(&mut self, buf: &mut [u8]) -> Result<usize, Error> {
747 let protocol = self.protocol.unwrap_or(DetectedProtocol::V1);
748
749 match protocol {
750 DetectedProtocol::V1 => self.read_raw(buf).await,
751 DetectedProtocol::V2Chunked => self.read_chunked(buf).await,
752 }
753 }
754
755 async fn read_raw(&mut self, buf: &mut [u8]) -> Result<usize, Error> {
757 if !self.stream_buffer.is_empty() {
759 let to_copy = std::cmp::min(buf.len(), self.stream_buffer.len());
760 buf[..to_copy].copy_from_slice(&self.stream_buffer[..to_copy]);
761 self.stream_buffer.drain(..to_copy);
762 return Ok(to_copy);
763 }
764
765 let stream = self
766 .stream
767 .as_mut()
768 .ok_or_else(|| Error::StreamDisconnected {
769 reason: "Not connected".to_string(),
770 })?;
771
772 match stream.read(buf).await {
773 Ok(0) => {
774 self.stream = None;
775 Err(Error::StreamDisconnected {
776 reason: "Server closed connection".to_string(),
777 })
778 }
779 Ok(n) => Ok(n),
780 Err(e) => {
781 self.stream = None;
782 Err(Error::NetworkError { source: e })
783 }
784 }
785 }
786
787 async fn read_chunked(&mut self, buf: &mut [u8]) -> Result<usize, Error> {
789 if !self.chunk_buffer.is_empty() {
791 let to_copy = std::cmp::min(buf.len(), self.chunk_buffer.len());
792 buf[..to_copy].copy_from_slice(&self.chunk_buffer[..to_copy]);
793 self.chunk_buffer.drain(..to_copy);
794 return Ok(to_copy);
795 }
796
797 if self.chunk_remaining > 0 {
799 let to_read = std::cmp::min(buf.len(), self.chunk_remaining);
800 let n = self.read_from_stream(&mut buf[..to_read]).await?;
801 if n == 0 {
802 self.stream = None;
803 return Err(Error::StreamDisconnected {
804 reason: "Server closed connection".to_string(),
805 });
806 }
807 self.chunk_remaining -= n;
808 if self.chunk_remaining == 0 {
809 self.read_and_validate_crlf().await?;
811 }
812 return Ok(n);
813 }
814
815 let mut size_line = Vec::new();
817 loop {
818 let byte = self.read_one_byte().await?;
819 if byte == b'\n' {
820 break;
821 }
822 if byte != b'\r' {
823 size_line.push(byte);
824 }
825 if size_line.len() > 16 {
826 return Err(Error::NetworkError {
827 source: std::io::Error::new(
828 std::io::ErrorKind::InvalidData,
829 "Chunk size line too long",
830 ),
831 });
832 }
833 }
834
835 let size_str = String::from_utf8_lossy(&size_line);
836 let size_part = size_str.trim().split(';').next().unwrap_or("");
838 let chunk_size =
839 usize::from_str_radix(size_part.trim(), 16).map_err(|_| Error::NetworkError {
840 source: std::io::Error::new(
841 std::io::ErrorKind::InvalidData,
842 format!("Invalid chunk size: {}", size_str),
843 ),
844 })?;
845
846 if chunk_size == 0 {
847 self.stream = None;
848 return Err(Error::StreamDisconnected {
849 reason: "Chunked stream ended".to_string(),
850 });
851 }
852
853 debug!(chunk_size = chunk_size, "Reading chunked data");
854
855 self.chunk_remaining = chunk_size;
856 let to_read = std::cmp::min(buf.len(), self.chunk_remaining);
857
858 let n = self.read_from_stream(&mut buf[..to_read]).await?;
859 if n == 0 {
860 self.stream = None;
861 return Err(Error::StreamDisconnected {
862 reason: "Server closed connection".to_string(),
863 });
864 }
865 self.chunk_remaining -= n;
866 if self.chunk_remaining == 0 {
867 self.read_and_validate_crlf().await?;
869 }
870 Ok(n)
871 }
872
873 async fn read_from_stream(&mut self, buf: &mut [u8]) -> Result<usize, Error> {
875 if !self.stream_buffer.is_empty() {
877 let to_copy = std::cmp::min(buf.len(), self.stream_buffer.len());
878 buf[..to_copy].copy_from_slice(&self.stream_buffer[..to_copy]);
879 self.stream_buffer.drain(..to_copy);
880 return Ok(to_copy);
881 }
882
883 let stream = self
884 .stream
885 .as_mut()
886 .ok_or_else(|| Error::StreamDisconnected {
887 reason: "Not connected".to_string(),
888 })?;
889
890 stream.read(buf).await.map_err(|e| {
891 self.stream = None;
892 Error::NetworkError { source: e }
893 })
894 }
895
896 async fn read_one_byte(&mut self) -> Result<u8, Error> {
898 if !self.stream_buffer.is_empty() {
900 return Ok(self.stream_buffer.remove(0));
901 }
902
903 let stream = self
904 .stream
905 .as_mut()
906 .ok_or_else(|| Error::StreamDisconnected {
907 reason: "Not connected".to_string(),
908 })?;
909
910 let mut byte = [0u8; 1];
911 stream.read_exact(&mut byte).await.map_err(|e| {
912 self.stream = None;
913 Error::NetworkError { source: e }
914 })?;
915 Ok(byte[0])
916 }
917
918 async fn read_and_validate_crlf(&mut self) -> Result<(), Error> {
920 let cr = self.read_one_byte().await?;
921 let lf = self.read_one_byte().await?;
922 if cr != b'\r' || lf != b'\n' {
923 self.stream = None;
924 return Err(Error::NetworkError {
925 source: std::io::Error::new(
926 std::io::ErrorKind::InvalidData,
927 "Expected CRLF after chunk data",
928 ),
929 });
930 }
931 Ok(())
932 }
933}
934
935impl Drop for NtripClient {
936 fn drop(&mut self) {
937 self.disconnect();
938 }
939}