1use std::sync::Arc;
4
5use base64::{engine::general_purpose, Engine as _};
6use futures::Stream;
7use http::{
8 header::{InvalidHeaderValue, ToStrError, USER_AGENT},
9 HeaderMap, HeaderValue, Method,
10};
11use rtcm_rs::{Message, MessageFrame};
12use rustls::pki_types::{InvalidDnsNameError, ServerName};
13use tokio::{
14 io::{AsyncRead, AsyncReadExt as _, AsyncWrite, AsyncWriteExt},
15 net::TcpStream,
16 select,
17 sync::{
18 broadcast::Sender as BroadcastSender,
19 mpsc::{unbounded_channel, UnboundedReceiver},
20 },
21 task::JoinHandle,
22};
23use tokio_rustls::TlsConnector;
24use tracing::{debug, error, trace, warn};
25
26use crate::{
27 config::{NtripConfig, NtripCredentials},
28 snip::ServerInfo,
29};
30
31pub struct NtripClient {
93 config: NtripConfig,
94 creds: NtripCredentials,
95}
96
97pub struct NtripHandle {
100 _rx_handle: tokio::task::JoinHandle<()>,
101 ntrip_rx: UnboundedReceiver<Message>,
102}
103
104#[derive(Debug, thiserror::Error)]
105pub enum NtripClientError {
106 #[error("Io error: {0}")]
107 Io(#[from] std::io::Error),
108
109 #[error("Reqwest error: {0}")]
110 Reqwest(#[from] reqwest::Error),
111
112 #[error("Invalid header value {0}")]
113 InvalidHeaderValue(#[from] InvalidHeaderValue),
114
115 #[error("Invalid DNS name {0}")]
116 InvalidDnsName(#[from] InvalidDnsNameError),
117
118 #[error("Header ToStrError error {0}")]
119 ToStrError(#[from] ToStrError),
120
121 #[error("Response error")]
122 ResponseError(String),
123}
124
125impl NtripClient {
126 pub async fn new(
127 config: NtripConfig,
128 creds: NtripCredentials,
129 ) -> Result<Self, NtripClientError> {
130 Ok(NtripClient { config, creds })
131 }
132
133 pub async fn list_mounts(&mut self) -> Result<ServerInfo, NtripClientError> {
135 let client = reqwest::Client::builder()
136 .http1_ignore_invalid_headers_in_responses(true)
137 .http09_responses()
138 .user_agent(format!(
139 "NTRIP {}/{}",
140 env!("CARGO_PKG_NAME"),
141 env!("CARGO_PKG_VERSION")
142 ))
143 .build()?;
144
145 let proto = if self.config.use_tls { "https" } else { "http" };
147
148 let req = client
149 .request(
150 Method::GET,
151 format!("{}://{}:{}", proto, self.config.host, self.config.port),
152 )
153 .header("Ntrip-Version", "NTRIP/2.0")
154 .build()?;
155
156 let res = client.execute(req).await?;
157
158 debug!("Fetched NTRIP response: {:?}", res.status());
159
160 let body = res.text().await?;
161
162 let lines = body.lines().collect::<Vec<&str>>();
163
164 let snip_info = ServerInfo::parse(lines.iter().cloned());
165
166 Ok(snip_info)
167 }
168
169 pub async fn mount(
179 &mut self,
180 mount: impl ToString,
181 exit_tx: BroadcastSender<()>,
182 ) -> Result<NtripHandle, NtripClientError> {
183 debug!(
184 "Connecting to NTRIP server {}/{}",
185 self.config.to_url(),
186 mount.to_string()
187 );
188
189 let sock = TcpStream::connect(&self.config.to_url()).await?;
190
191 let (rx_handle, ntrip_rx) = match self.config.use_tls {
192 true => {
193 debug!("Using TLS connection");
194
195 let mut root_cert_store = rustls::RootCertStore::empty();
196 root_cert_store.extend(webpki_roots::TLS_SERVER_ROOTS.iter().cloned());
197
198 let tls_config = rustls::ClientConfig::builder()
199 .with_root_certificates(root_cert_store)
200 .with_no_client_auth();
201 let connector = TlsConnector::from(Arc::new(tls_config));
202 let dnsname = ServerName::try_from(self.config.host.clone())?;
203
204 let tls_sock = connector.connect(dnsname, sock).await?;
205
206 Self::handle_connection(
207 &self.config,
208 &self.creds,
209 &mount.to_string(),
210 exit_tx.clone(),
211 tls_sock,
212 )
213 .await?
214 },
215 false => {
216 debug!("Using plain TCP connection");
217
218 Self::handle_connection(
219 &self.config,
220 &self.creds,
221 &mount.to_string(),
222 exit_tx.clone(),
223 sock,
224 )
225 .await?
226 },
227 };
228
229 Ok(NtripHandle {
230 _rx_handle: rx_handle,
231 ntrip_rx,
232 })
233 }
234
235 pub async fn handle_connection(
236 config: &NtripConfig,
237 creds: &NtripCredentials,
238 mount: &str,
239 exit_tx: BroadcastSender<()>,
240 mut sock: impl AsyncRead + AsyncWrite + Unpin + Send + 'static,
241 ) -> Result<(JoinHandle<()>, UnboundedReceiver<Message>), NtripClientError> {
242 let mut headers = HeaderMap::new();
244 headers.append(
245 USER_AGENT,
246 HeaderValue::from_str(&format!(
247 "NTRIP {}/{}",
248 env!("CARGO_PKG_NAME"),
249 env!("CARGO_PKG_VERSION")
250 ))?,
251 );
252
253 headers.append("Ntrip-Version", HeaderValue::from_static("NTRIP/2.0"));
254 headers.append("Accept", HeaderValue::from_static("*/*"));
255 headers.append("Connection", HeaderValue::from_static("close"));
256
257 if !creds.user.is_empty() {
259 let auth = general_purpose::STANDARD.encode(format!("{}:{}", creds.user, creds.pass));
260 headers.append(
261 "Authorization",
262 HeaderValue::from_str(&format!("Basic {}", auth))?,
263 );
264 }
265
266 debug!("Headers: {:#?}", headers);
267
268 debug!("Write HTTP request");
270 sock.write_all(format!("GET /{} HTTP/1.0\r\n", mount).as_bytes())
271 .await?;
272 sock.write_all(format!("Host: {}\r\n", config.to_url()).as_bytes())
273 .await?;
274
275 debug!("Writing headers");
277 for h in headers.iter() {
278 sock.write_all(format!("{}: {}\r\n", h.0.as_str(), h.1.to_str()?).as_bytes())
279 .await?;
280 }
281
282 sock.write_all(b"\r\n").await?;
283 sock.flush().await?;
284
285 debug!("Reading response");
286 let mut buff = Vec::with_capacity(1024);
287
288 let n = sock.read_buf(&mut buff).await?;
290 debug!("Read {} bytes, current buffer {} bytes", n, buff.len());
291
292 let r = String::from_utf8_lossy(&buff[..n]);
294 match r.lines().next() {
295 Some(status) if status.contains("200 OK") => {
296 debug!("Got 200 OK response");
297 },
298 Some(status) => {
299 error!("NTRIP server returned error: {}", status);
300 return Err(NtripClientError::ResponseError(status.to_string()));
301 },
302 None => {
303 error!("NTRIP server returned empty response");
304 return Err(NtripClientError::ResponseError("empty response".into()));
305 },
306 }
307
308 if let Some(i) = buff.iter().enumerate().find(|(_i, b)| **b == 0xd3) {
310 debug!(
311 "Trimming buffer to next potential frame start at index {}",
312 i.0
313 );
314 let _ = buff.drain(..i.0);
315 }
316
317 let (ntrip_tx, ntrip_rx) = unbounded_channel();
320 let mut exit_rx = exit_tx.subscribe();
321 let rx_handle = tokio::task::spawn(async move {
322 let mut error_count = 0;
324
325 'listener: loop {
326 select! {
327 n = sock.read_buf(&mut buff) => match n {
328 Ok(n) => {
329 debug!("Read {} bytes, current buffer {} bytes", n, buff.len());
330 trace!("Appended {:02x?}", &buff[buff.len()-n..][..n]);
331
332 if n == 0 {
334 warn!("Zero length response");
335 break 'listener;
336 }
337
338 if buff[0] != 0xd3 {
340 if let Some(i) = buff.iter().enumerate().find(|(_i, b)| **b == 0xd3) {
341 warn!("Trimming buffer to next potential frame start at index {}", i.0);
342 buff.drain(..i.0);
343
344 assert_eq!(buff[0], 0xd3);
345 }
346 }
347
348 while buff.len() > 6 {
351 match MessageFrame::new(&buff[..]) {
353 Ok(f) => {
354 let m = f.get_message();
356
357 debug!("Parsed RTCM message: {:?} (consumed {} bytes)", m, f.frame_len());
358
359 ntrip_tx.send(m).unwrap();
361
362 let _ = buff.drain(..f.frame_len());
364
365 error_count = 0;
367 },
368 Err(e) => {
369 warn!("RTCM parse error: {} (count: {})", e, error_count);
370
371 error_count += 1;
373
374 if error_count >= 5 {
376 error!("Too many parse errors, closing connection");
377 break 'listener;
378 }
379
380 break;
381 }
382 }
383 }
384 },
385 Err(e) => {
386 error!("socket read error: {}", e);
387 break;
388 },
389 },
390 _ = exit_rx.recv() => {
391 error!("Exiting NTRIP read loop on signal");
392 break;
393 }
394 }
395 }
396
397 warn!("NTRIP read loop exiting");
398
399 if !buff.is_empty() {
400 warn!("Dropping {} bytes of unparsed data", buff.len());
401
402 if let Ok(s) = String::from_utf8(buff) {
403 debug!("Unparsed data:\r\n{}", s);
404 }
405 }
406 });
407
408 Ok((rx_handle, ntrip_rx))
409 }
410}
411
412impl Stream for NtripHandle {
414 type Item = Message;
415
416 fn poll_next(
417 mut self: std::pin::Pin<&mut Self>,
418 cx: &mut std::task::Context<'_>,
419 ) -> std::task::Poll<Option<Self::Item>> {
420 self.ntrip_rx.poll_recv(cx)
421 }
422}
423
424#[cfg(test)]
425mod tests {
426 use std::env;
427
428 use futures::StreamExt;
429 use tracing::debug;
430
431 use super::*;
432 use crate::config::NtripCredentials;
433
434 fn setup_logging() {
435 let _ = tracing_subscriber::FmtSubscriber::builder()
436 .compact()
437 .without_time()
438 .with_max_level(tracing::level_filters::LevelFilter::DEBUG)
439 .try_init();
440 }
441
442 #[tokio::test]
443 #[ignore = "Requires NTRIP config from the environment"]
444 async fn test_ntrip_client() {
445 setup_logging();
446
447 debug!("Connecting to NTRIP server");
448
449 let (exit_tx, _exit_rx) = tokio::sync::broadcast::channel(1);
450
451 let mount = env::var("NTRIP_MOUNT").unwrap_or("ARGOACU".to_string());
452 let config = env::var("NTRIP_HOST")
453 .unwrap_or("rtk2go".to_string())
454 .parse::<NtripConfig>()
455 .unwrap();
456 let creds = NtripCredentials {
457 user: env::var("NTRIP_USER").unwrap_or("user".into()),
458 pass: env::var("NTRIP_PASS").unwrap_or("pass".into()),
459 };
460
461 let mut client = NtripClient::new(config, creds).await.unwrap();
462
463 let mut h = client
464 .mount(mount.to_string(), exit_tx.clone())
465 .await
466 .unwrap();
467
468 for _i in 0..10 {
469 let m = h.next().await.unwrap();
470 debug!("Got RTCM message: {:?}", m);
471 }
472
473 let _ = exit_tx.send(());
474 }
475}