1use std::time::Duration;
16use tokio::io::{AsyncReadExt, AsyncWriteExt};
17use tokio::net::TcpStream;
18use tracing::{debug, error, info, warn};
19
20use base64::{engine::general_purpose::STANDARD as BASE64, Engine as _};
21
22use crate::config::{NtripConfig, NtripVersion};
23use crate::gga::GgaSentence;
24use crate::sourcetable::Sourcetable;
25use crate::stream::NtripStream;
26use crate::Error;
27
28const USER_AGENT: &str = concat!("NTRIP ntrip-core/", env!("CARGO_PKG_VERSION"));
30
31#[derive(Debug, Clone, Copy, PartialEq, Eq)]
33enum DetectedProtocol {
34 V1,
36 V2Chunked,
38}
39
40pub struct NtripClient {
43 config: NtripConfig,
45 stream: Option<NtripStream>,
47 protocol: Option<DetectedProtocol>,
49 stream_buffer: Vec<u8>,
51 chunk_buffer: Vec<u8>,
53 chunk_remaining: usize,
55 last_gga: Option<GgaSentence>,
57}
58
59impl NtripClient {
60 pub fn new(config: NtripConfig) -> Result<Self, Error> {
62 config.validate()?;
63 Ok(Self {
64 config,
65 stream: None,
66 protocol: None,
67 stream_buffer: Vec::new(),
68 chunk_buffer: Vec::new(),
69 chunk_remaining: 0,
70 last_gga: None,
71 })
72 }
73
74 pub async fn connect(&mut self) -> Result<(), Error> {
79 self.connect_with_gga(None).await
80 }
81
82 pub async fn connect_with_gga(
90 &mut self,
91 initial_gga: Option<&GgaSentence>,
92 ) -> Result<(), Error> {
93 let host = &self.config.host;
94 let port = self.config.port;
95 let addr = format!("{}:{}", host, port);
96
97 info!(addr = %addr, "Connecting to NTRIP caster");
98
99 let tcp_stream = match tokio::time::timeout(
101 Duration::from_secs(self.config.connection.timeout_secs as u64),
102 TcpStream::connect(&addr),
103 )
104 .await
105 {
106 Ok(Ok(s)) => s,
107 Ok(Err(e)) => return Err(Error::connection_failed(host, port, e)),
108 Err(_) => {
109 return Err(Error::Timeout {
110 timeout_secs: self.config.connection.timeout_secs,
111 })
112 }
113 };
114
115 let mut stream: NtripStream = if self.config.use_tls {
117 if self.config.tls_skip_verify {
118 warn!("TLS certificate verification is disabled - connection is not fully secure");
119 }
120 NtripStream::connect_tls(tcp_stream, host, self.config.tls_skip_verify).await?
121 } else {
122 NtripStream::plain(tcp_stream)
123 };
124
125 let mountpoint = &self.config.mountpoint;
127
128 let auth_header =
129 if let (Some(user), Some(pass)) = (&self.config.username, &self.config.password) {
130 let credentials = format!("{}:{}", user, pass);
131 let encoded = BASE64.encode(credentials);
132 format!("Authorization: Basic {}\r\n", encoded)
133 } else {
134 String::new()
135 };
136
137 let gga_header = match (&self.config.ntrip_version, initial_gga) {
139 (NtripVersion::V2, Some(gga)) | (NtripVersion::Auto, Some(gga)) => {
140 let nmea = gga.to_nmea();
141 let nmea_trimmed = nmea.trim();
142 debug!(gga = %nmea_trimmed, "Including initial GGA in request header");
143 format!("Ntrip-GGA: {}\r\n", nmea_trimmed)
144 }
145 _ => String::new(),
146 };
147
148 if let Some(gga) = initial_gga {
150 self.last_gga = Some(gga.clone());
151 }
152
153 let request = match self.config.ntrip_version {
155 NtripVersion::V1 => {
156 debug!("Using NTRIP v1 protocol");
157 format!(
158 "GET /{} HTTP/1.0\r\n\
159 User-Agent: {}\r\n\
160 Host: {}:{}\r\n\
161 Accept: */*\r\n\
162 Connection: close\r\n\
163 {}\r\n",
164 mountpoint, USER_AGENT, host, port, auth_header
165 )
166 }
167 NtripVersion::V2 | NtripVersion::Auto => {
168 debug!("Using NTRIP v2 protocol request");
169 format!(
170 "GET /{} HTTP/1.1\r\n\
171 Host: {}:{}\r\n\
172 User-Agent: {}\r\n\
173 Ntrip-Version: Ntrip/2.0\r\n\
174 {}\
175 Accept: */*\r\n\
176 Connection: close\r\n\
177 {}\r\n",
178 mountpoint, host, port, USER_AGENT, gga_header, auth_header
179 )
180 }
181 };
182
183 let redacted_request = if auth_header.is_empty() {
185 request.clone()
186 } else {
187 request.replace(&auth_header, "Authorization: Basic [REDACTED]\r\n")
188 };
189 debug!(request = %redacted_request, "Sending NTRIP request");
190
191 if let Err(e) = stream.write_all(request.as_bytes()).await {
193 return Err(Error::NetworkError { source: e });
194 }
195
196 const MAX_HEADER_SIZE: usize = 4096;
198 let mut headers = Vec::with_capacity(512);
199 let mut buffer = [0u8; 512];
200
201 let header_end_found = loop {
202 if headers.len() >= MAX_HEADER_SIZE {
203 break false;
204 }
205
206 let bytes_to_read = std::cmp::min(buffer.len(), MAX_HEADER_SIZE - headers.len());
208 match stream.read(&mut buffer[..bytes_to_read]).await {
209 Ok(0) => break false, Ok(n) => {
211 headers.extend_from_slice(&buffer[..n]);
212
213 if headers.len() >= 4 {
215 if let Some(pos) = headers.windows(4).position(|w| w == b"\r\n\r\n") {
216 let body_start = pos + 4;
218 if headers.len() > body_start {
219 self.stream_buffer.extend_from_slice(&headers[body_start..]);
220 }
221 headers.truncate(body_start);
223 break true;
224 }
225 }
226
227 if headers.len() >= 12 && headers.starts_with(b"ICY 200 OK\r\n") {
229 debug!("Detected legacy ICY 200 OK response, starting stream");
230 let icy_header_len = 12; if headers.len() > icy_header_len {
233 self.stream_buffer
234 .extend_from_slice(&headers[icy_header_len..]);
235 }
236 headers.truncate(icy_header_len);
237 break true;
238 }
239 }
240 Err(e) => return Err(Error::NetworkError { source: e }),
241 }
242 };
243
244 if !header_end_found {
245 return Err(Error::NetworkError {
246 source: std::io::Error::new(
247 std::io::ErrorKind::InvalidData,
248 "Header too large or incomplete",
249 ),
250 });
251 }
252
253 let response = String::from_utf8_lossy(&headers);
255 let status_line = response.lines().next().unwrap_or("");
256
257 debug!(status_line = %status_line, "Received NTRIP response");
258
259 if status_line.starts_with("ICY 200") || status_line.contains("200 OK") {
260 let detected = self.detect_protocol(&response, status_line);
261 info!(
262 mountpoint = %mountpoint,
263 protocol = ?detected,
264 "Connected to NTRIP caster"
265 );
266
267 self.stream = Some(stream);
268 self.protocol = Some(detected);
269 self.chunk_remaining = 0;
271 Ok(())
272 } else if status_line.contains("401 Unauthorized") {
273 error!("NTRIP authentication failed");
274 Err(Error::AuthenticationFailed {
275 host: host.clone(),
276 username: self.config.username.clone().unwrap_or_default(),
277 })
278 } else if status_line.contains("404 Not Found") {
279 error!(mountpoint = %mountpoint, "Mountpoint not found");
280 Err(Error::MountpointNotFound {
281 host: host.clone(),
282 mountpoint: mountpoint.clone(),
283 })
284 } else {
285 error!(status = %status_line, "NTRIP caster error");
286 Err(Error::HttpError {
287 host: host.clone(),
288 status_code: Self::parse_status_code(status_line),
289 reason: status_line.to_string(),
290 })
291 }
292 }
293
294 pub async fn read_chunk(&mut self, buf: &mut [u8]) -> Result<usize, Error> {
312 let max_attempts = self.config.connection.max_reconnect_attempts;
313 let reconnect_delay = self.config.connection.reconnect_delay_ms;
314 let mut attempts = 0;
315
316 loop {
317 if self.stream.is_none() {
319 if attempts > 0 && attempts <= max_attempts {
320 warn!(
322 attempt = attempts,
323 max_attempts = max_attempts,
324 "Attempting automatic reconnection"
325 );
326
327 tokio::time::sleep(Duration::from_millis(reconnect_delay)).await;
329
330 let gga_clone = self.last_gga.clone();
332 match self.connect_with_gga(gga_clone.as_ref()).await {
333 Ok(()) => {
334 info!(attempt = attempts, "Reconnection successful");
335 if let Some(ref gga) = gga_clone {
337 if let Err(e) = self.send_gga_internal(gga).await {
338 warn!(error = %e, "Failed to resend GGA after reconnect");
339 }
340 }
341 }
342 Err(e) => {
343 warn!(attempt = attempts, error = %e, "Reconnection attempt failed");
344 attempts += 1;
345 continue;
346 }
347 }
348 } else if attempts > max_attempts {
349 return Err(Error::StreamDisconnected {
351 reason: format!(
352 "Connection lost after {} reconnection attempts",
353 max_attempts
354 ),
355 });
356 } else {
357 return Err(Error::StreamDisconnected {
359 reason: "Not connected".to_string(),
360 });
361 }
362 }
363
364 let read_timeout_secs = self.config.connection.read_timeout_secs;
365
366 let read_result = if read_timeout_secs == 0 {
368 self.read_raw_or_chunked(buf).await.map(Some)
369 } else {
370 let timeout_duration = Duration::from_secs(read_timeout_secs as u64);
371 match tokio::time::timeout(timeout_duration, self.read_raw_or_chunked(buf)).await {
372 Ok(result) => result.map(Some),
373 Err(_) => Ok(None), }
375 };
376
377 match read_result {
378 Ok(Some(n)) => {
379 debug!(bytes = n, "Received data");
380 return Ok(n);
381 }
382 Ok(None) => {
383 self.stream = None;
385 self.reset_chunk_state();
386 if max_attempts > 0 {
387 attempts += 1;
388 continue;
389 }
390 return Err(Error::ReadTimeout {
391 timeout_secs: read_timeout_secs,
392 });
393 }
394 Err(e) => {
395 if max_attempts > 0 && Self::is_recoverable_error(&e) {
397 attempts += 1;
398 self.stream = None;
399 self.reset_chunk_state();
400 continue;
401 }
402 return Err(e);
403 }
404 }
405 }
406 }
407
408 fn is_recoverable_error(error: &Error) -> bool {
410 matches!(
411 error,
412 Error::StreamDisconnected { .. }
413 | Error::NetworkError { .. }
414 | Error::ReadTimeout { .. }
415 )
416 }
417
418 fn reset_chunk_state(&mut self) {
420 self.stream_buffer.clear();
421 self.chunk_buffer.clear();
422 self.chunk_remaining = 0;
423 }
424
425 async fn send_gga_internal(&mut self, gga: &GgaSentence) -> Result<(), Error> {
427 let stream = self.stream.as_mut().ok_or(Error::StreamDisconnected {
428 reason: "Not connected".into(),
429 })?;
430
431 let nmea = gga.to_nmea();
432 debug!(gga = %nmea.trim(), "Resending GGA after reconnection");
433
434 stream
435 .write_all(nmea.as_bytes())
436 .await
437 .map_err(|e| Error::NetworkError { source: e })?;
438
439 Ok(())
440 }
441
442 pub fn is_connected(&self) -> bool {
444 self.stream.is_some()
445 }
446
447 pub fn disconnect(&mut self) {
449 if self.stream.take().is_some() {
450 debug!("Disconnected from NTRIP caster");
451 }
452 }
453
454 pub async fn send_gga(&mut self, gga: &GgaSentence) -> Result<(), Error> {
456 let stream = self.stream.as_mut().ok_or(Error::StreamDisconnected {
457 reason: "Not connected".into(),
458 })?;
459
460 let nmea = gga.to_nmea();
461 debug!(gga = %nmea.trim(), "Sending GGA to caster");
462
463 stream
464 .write_all(nmea.as_bytes())
465 .await
466 .map_err(|e| Error::NetworkError { source: e })?;
467
468 self.last_gga = Some(gga.clone());
470
471 Ok(())
472 }
473
474 pub fn config(&self) -> &NtripConfig {
476 &self.config
477 }
478
479 pub async fn get_sourcetable(config: &NtripConfig) -> Result<Sourcetable, Error> {
484 config.validate()?;
485 let host = &config.host;
486 let port = config.port;
487 let addr = format!("{}:{}", host, port);
488
489 info!(addr = %addr, "Fetching sourcetable from NTRIP caster");
490
491 let tcp_stream = match tokio::time::timeout(
493 Duration::from_secs(config.connection.timeout_secs as u64),
494 TcpStream::connect(&addr),
495 )
496 .await
497 {
498 Ok(Ok(s)) => s,
499 Ok(Err(e)) => return Err(Error::connection_failed(host, port, e)),
500 Err(_) => {
501 return Err(Error::Timeout {
502 timeout_secs: config.connection.timeout_secs,
503 })
504 }
505 };
506
507 let mut stream: NtripStream = if config.use_tls {
509 NtripStream::connect_tls(tcp_stream, host, config.tls_skip_verify).await?
510 } else {
511 NtripStream::plain(tcp_stream)
512 };
513
514 let auth_header = if let (Some(user), Some(pass)) = (&config.username, &config.password) {
516 let credentials = format!("{}:{}", user, pass);
517 let encoded = BASE64.encode(credentials);
518 format!("Authorization: Basic {}\r\n", encoded)
519 } else {
520 String::new()
521 };
522
523 let request = format!(
524 "GET / HTTP/1.0\r\n\
525 User-Agent: {}\r\n\
526 Host: {}:{}\r\n\
527 Accept: */*\r\n\
528 Connection: close\r\n\
529 {}\r\n",
530 USER_AGENT, host, port, auth_header
531 );
532
533 let redacted_request = if auth_header.is_empty() {
535 request.clone()
536 } else {
537 request.replace(&auth_header, "Authorization: Basic [REDACTED]\r\n")
538 };
539 debug!(request = %redacted_request, "Sending sourcetable request");
540
541 if let Err(e) = stream.write_all(request.as_bytes()).await {
542 return Err(Error::NetworkError { source: e });
543 }
544
545 let mut response = Vec::new();
547 let mut buf = [0u8; 4096];
548
549 loop {
550 match tokio::time::timeout(
551 Duration::from_secs(config.connection.timeout_secs as u64),
552 stream.read(&mut buf),
553 )
554 .await
555 {
556 Ok(Ok(0)) => break,
557 Ok(Ok(n)) => response.extend_from_slice(&buf[..n]),
558 Ok(Err(e)) => return Err(Error::NetworkError { source: e }),
559 Err(_) => {
560 return Err(Error::Timeout {
561 timeout_secs: config.connection.timeout_secs,
562 })
563 }
564 }
565
566 if response.len() > 1_000_000 {
567 return Err(Error::SourcetableParseError {
568 message: "Sourcetable too large (>1MB)".to_string(),
569 });
570 }
571 }
572
573 let response_str = String::from_utf8_lossy(&response);
575
576 let first_line = response_str.lines().next().unwrap_or("");
577 if !first_line.contains("200") {
578 return Err(Error::HttpError {
579 host: host.clone(),
580 status_code: Self::parse_status_code(first_line),
581 reason: first_line.to_string(),
582 });
583 }
584
585 let body = if let Some(idx) = response_str.find("\r\n\r\n") {
586 &response_str[idx + 4..]
587 } else {
588 &response_str[..]
589 };
590
591 let table = Sourcetable::parse(body);
592
593 info!(
594 streams = table.streams.len(),
595 casters = table.casters.len(),
596 networks = table.networks.len(),
597 "Parsed sourcetable"
598 );
599
600 Ok(table)
601 }
602
603 fn parse_status_code(status_line: &str) -> u16 {
605 status_line
606 .split_whitespace()
607 .nth(1)
608 .and_then(|s| s.parse().ok())
609 .unwrap_or(0)
610 }
611
612 fn detect_protocol(&self, response: &str, status_line: &str) -> DetectedProtocol {
614 if status_line.starts_with("ICY") {
615 debug!("Detected NTRIP v1 (ICY response)");
616 return DetectedProtocol::V1;
617 }
618
619 let response_lower = response.to_lowercase();
620 if response_lower.contains("transfer-encoding: chunked") {
621 debug!("Detected NTRIP v2 (chunked transfer encoding)");
622 return DetectedProtocol::V2Chunked;
623 }
624
625 if response_lower.contains("ntrip-version:") {
626 debug!("Server indicates NTRIP v2 but response is not chunked; treating as raw stream");
627 return DetectedProtocol::V1;
628 }
629
630 debug!("Defaulting to NTRIP v1 protocol (raw stream)");
631 DetectedProtocol::V1
632 }
633
634 async fn read_raw_or_chunked(&mut self, buf: &mut [u8]) -> Result<usize, Error> {
636 let protocol = self.protocol.unwrap_or(DetectedProtocol::V1);
637
638 match protocol {
639 DetectedProtocol::V1 => self.read_raw(buf).await,
640 DetectedProtocol::V2Chunked => self.read_chunked(buf).await,
641 }
642 }
643
644 async fn read_raw(&mut self, buf: &mut [u8]) -> Result<usize, Error> {
646 if !self.stream_buffer.is_empty() {
648 let to_copy = std::cmp::min(buf.len(), self.stream_buffer.len());
649 buf[..to_copy].copy_from_slice(&self.stream_buffer[..to_copy]);
650 self.stream_buffer.drain(..to_copy);
651 return Ok(to_copy);
652 }
653
654 let stream = self
655 .stream
656 .as_mut()
657 .ok_or_else(|| Error::StreamDisconnected {
658 reason: "Not connected".to_string(),
659 })?;
660
661 match stream.read(buf).await {
662 Ok(0) => {
663 self.stream = None;
664 Err(Error::StreamDisconnected {
665 reason: "Server closed connection".to_string(),
666 })
667 }
668 Ok(n) => Ok(n),
669 Err(e) => {
670 self.stream = None;
671 Err(Error::NetworkError { source: e })
672 }
673 }
674 }
675
676 async fn read_chunked(&mut self, buf: &mut [u8]) -> Result<usize, Error> {
678 if !self.chunk_buffer.is_empty() {
680 let to_copy = std::cmp::min(buf.len(), self.chunk_buffer.len());
681 buf[..to_copy].copy_from_slice(&self.chunk_buffer[..to_copy]);
682 self.chunk_buffer.drain(..to_copy);
683 return Ok(to_copy);
684 }
685
686 if self.chunk_remaining > 0 {
688 let to_read = std::cmp::min(buf.len(), self.chunk_remaining);
689 let n = self.read_from_stream(&mut buf[..to_read]).await?;
690 if n == 0 {
691 self.stream = None;
692 return Err(Error::StreamDisconnected {
693 reason: "Server closed connection".to_string(),
694 });
695 }
696 self.chunk_remaining -= n;
697 if self.chunk_remaining == 0 {
698 self.read_and_validate_crlf().await?;
700 }
701 return Ok(n);
702 }
703
704 let mut size_line = Vec::new();
706 loop {
707 let byte = self.read_one_byte().await?;
708 if byte == b'\n' {
709 break;
710 }
711 if byte != b'\r' {
712 size_line.push(byte);
713 }
714 if size_line.len() > 16 {
715 return Err(Error::NetworkError {
716 source: std::io::Error::new(
717 std::io::ErrorKind::InvalidData,
718 "Chunk size line too long",
719 ),
720 });
721 }
722 }
723
724 let size_str = String::from_utf8_lossy(&size_line);
725 let size_part = size_str.trim().split(';').next().unwrap_or("");
727 let chunk_size =
728 usize::from_str_radix(size_part.trim(), 16).map_err(|_| Error::NetworkError {
729 source: std::io::Error::new(
730 std::io::ErrorKind::InvalidData,
731 format!("Invalid chunk size: {}", size_str),
732 ),
733 })?;
734
735 if chunk_size == 0 {
736 self.stream = None;
737 return Err(Error::StreamDisconnected {
738 reason: "Chunked stream ended".to_string(),
739 });
740 }
741
742 debug!(chunk_size = chunk_size, "Reading chunked data");
743
744 self.chunk_remaining = chunk_size;
745 let to_read = std::cmp::min(buf.len(), self.chunk_remaining);
746
747 let n = self.read_from_stream(&mut buf[..to_read]).await?;
748 if n == 0 {
749 self.stream = None;
750 return Err(Error::StreamDisconnected {
751 reason: "Server closed connection".to_string(),
752 });
753 }
754 self.chunk_remaining -= n;
755 if self.chunk_remaining == 0 {
756 self.read_and_validate_crlf().await?;
758 }
759 Ok(n)
760 }
761
762 async fn read_from_stream(&mut self, buf: &mut [u8]) -> Result<usize, Error> {
764 if !self.stream_buffer.is_empty() {
766 let to_copy = std::cmp::min(buf.len(), self.stream_buffer.len());
767 buf[..to_copy].copy_from_slice(&self.stream_buffer[..to_copy]);
768 self.stream_buffer.drain(..to_copy);
769 return Ok(to_copy);
770 }
771
772 let stream = self
773 .stream
774 .as_mut()
775 .ok_or_else(|| Error::StreamDisconnected {
776 reason: "Not connected".to_string(),
777 })?;
778
779 stream.read(buf).await.map_err(|e| {
780 self.stream = None;
781 Error::NetworkError { source: e }
782 })
783 }
784
785 async fn read_one_byte(&mut self) -> Result<u8, Error> {
787 if !self.stream_buffer.is_empty() {
789 return Ok(self.stream_buffer.remove(0));
790 }
791
792 let stream = self
793 .stream
794 .as_mut()
795 .ok_or_else(|| Error::StreamDisconnected {
796 reason: "Not connected".to_string(),
797 })?;
798
799 let mut byte = [0u8; 1];
800 stream.read_exact(&mut byte).await.map_err(|e| {
801 self.stream = None;
802 Error::NetworkError { source: e }
803 })?;
804 Ok(byte[0])
805 }
806
807 async fn read_and_validate_crlf(&mut self) -> Result<(), Error> {
809 let cr = self.read_one_byte().await?;
810 let lf = self.read_one_byte().await?;
811 if cr != b'\r' || lf != b'\n' {
812 self.stream = None;
813 return Err(Error::NetworkError {
814 source: std::io::Error::new(
815 std::io::ErrorKind::InvalidData,
816 "Expected CRLF after chunk data",
817 ),
818 });
819 }
820 Ok(())
821 }
822}
823
824impl Drop for NtripClient {
825 fn drop(&mut self) {
826 self.disconnect();
827 }
828}