1use std::sync::Arc;
4
5use base64::{engine::general_purpose, Engine as _};
6use futures::Stream;
7use http::{header::USER_AGENT, HeaderMap, HeaderValue, Method};
8use rtcm_rs::{Message, MessageFrame};
9use rustls::pki_types::ServerName;
10use tokio::{
11 io::{AsyncRead, AsyncReadExt as _, AsyncWrite, AsyncWriteExt},
12 net::TcpStream,
13 select,
14 sync::{
15 broadcast::Sender as BroadcastSender,
16 mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
17 },
18 task::JoinHandle,
19};
20use tokio_rustls::TlsConnector;
21use tracing::{debug, error, trace, warn};
22
23use crate::{
24 config::{NtripConfig, NtripCredentials},
25 snip::ServerInfo,
26 NtripClientError,
27};
28
29pub struct NtripClient {
92 config: NtripConfig,
93 creds: NtripCredentials,
94}
95
96pub struct NtripHandle<RX = UnboundedReceiver<(Message, Vec<u8>)>> {
99 _rx_handle: tokio::task::JoinHandle<()>,
100 ntrip_rx: RX,
101 exit_tx: BroadcastSender<()>,
102}
103
104impl NtripClient {
105 pub async fn new(
106 config: NtripConfig,
107 creds: NtripCredentials,
108 ) -> Result<Self, NtripClientError> {
109 Ok(NtripClient { config, creds })
110 }
111
112 pub async fn list_mounts(&mut self) -> Result<ServerInfo, NtripClientError> {
114 let client = reqwest::Client::builder()
115 .http1_ignore_invalid_headers_in_responses(true)
116 .http09_responses()
117 .user_agent(format!(
118 "NTRIP {}/{}",
119 env!("CARGO_PKG_NAME"),
120 env!("CARGO_PKG_VERSION")
121 ))
122 .build()?;
123
124 let proto = if self.config.use_tls { "https" } else { "http" };
126
127 let req = client
128 .request(
129 Method::GET,
130 format!("{}://{}:{}", proto, self.config.host, self.config.port),
131 )
132 .header("Ntrip-Version", "NTRIP/2.0")
133 .build()?;
134
135 let res = client.execute(req).await?;
136
137 trace!("Fetched NTRIP response: {:?}", res.status());
138
139 let body = res.text().await?;
140
141 let lines = body.lines().collect::<Vec<&str>>();
142
143 let snip_info = ServerInfo::parse(lines.iter().cloned());
144
145 Ok(snip_info)
146 }
147
148 pub async fn mount(
158 &mut self,
159 mount: impl ToString,
160 ) -> Result<NtripHandle<UnboundedReceiver<(Message, Vec<u8>)>>, NtripClientError> {
161 let (ntrip_tx, ntrip_rx) = unbounded_channel();
162
163 let (_rx_handle, exit_tx) = self.mount_internal(mount, ntrip_tx).await?;
164
165 Ok(NtripHandle {
166 _rx_handle: _rx_handle,
167 ntrip_rx: ntrip_rx,
168 exit_tx: exit_tx,
169 })
170 }
171
172 pub async fn mount_with_sink(
182 &mut self,
183 mount: impl ToString,
184 ntrip_tx: UnboundedSender<(Message, Vec<u8>)>,
185 ) -> Result<NtripHandle<()>, NtripClientError> {
186 let (_rx_handle, exit_tx) = self.mount_internal(mount, ntrip_tx).await?;
187
188 Ok(NtripHandle {
189 _rx_handle: _rx_handle,
190 ntrip_rx: (),
191 exit_tx: exit_tx,
192 })
193 }
194
195 async fn mount_internal(
205 &mut self,
206 mount: impl ToString,
207 ntrip_tx: UnboundedSender<(Message, Vec<u8>)>,
208 ) -> Result<(JoinHandle<()>, BroadcastSender<()>), NtripClientError> {
209 debug!(
210 "Connecting to NTRIP server {}/{}",
211 self.config.to_url(),
212 mount.to_string()
213 );
214
215 let (exit_tx, _exit_rx) = tokio::sync::broadcast::channel(1);
216
217 let sock = TcpStream::connect(&self.config.to_url()).await?;
218
219 let rx_handle = match self.config.use_tls {
220 true => {
221 debug!("Using TLS connection");
222
223 let mut root_cert_store = rustls::RootCertStore::empty();
224 root_cert_store.extend(webpki_roots::TLS_SERVER_ROOTS.iter().cloned());
225
226 let tls_config = rustls::ClientConfig::builder()
227 .with_root_certificates(root_cert_store)
228 .with_no_client_auth();
229 let connector = TlsConnector::from(Arc::new(tls_config));
230 let dnsname = ServerName::try_from(self.config.host.clone())?;
231
232 let tls_sock = connector.connect(dnsname, sock).await?;
233
234 Self::handle_connection(
235 &self.config,
236 &self.creds,
237 &mount.to_string(),
238 ntrip_tx,
239 exit_tx.clone(),
240 tls_sock,
241 )
242 .await?
243 },
244 false => {
245 debug!("Using plain TCP connection");
246
247 Self::handle_connection(
248 &self.config,
249 &self.creds,
250 &mount.to_string(),
251 ntrip_tx,
252 exit_tx.clone(),
253 sock,
254 )
255 .await?
256 },
257 };
258
259 Ok((rx_handle, exit_tx))
260 }
261
262 pub async fn handle_connection(
263 config: &NtripConfig,
264 creds: &NtripCredentials,
265 mount: &str,
266 ntrip_tx: UnboundedSender<(Message, Vec<u8>)>,
267 exit_tx: BroadcastSender<()>,
268 mut sock: impl AsyncRead + AsyncWrite + Unpin + Send + 'static,
269 ) -> Result<JoinHandle<()>, NtripClientError> {
270 let mut headers = HeaderMap::new();
272 headers.append(
273 USER_AGENT,
274 HeaderValue::from_str(&format!(
275 "NTRIP {}/{}",
276 env!("CARGO_PKG_NAME"),
277 env!("CARGO_PKG_VERSION")
278 ))?,
279 );
280
281 headers.append("Ntrip-Version", HeaderValue::from_static("NTRIP/2.0"));
282 headers.append("Accept", HeaderValue::from_static("*/*"));
283 headers.append("Connection", HeaderValue::from_static("close"));
284
285 if !creds.user.is_empty() {
287 let auth = general_purpose::STANDARD.encode(format!("{}:{}", creds.user, creds.pass));
288 headers.append(
289 "Authorization",
290 HeaderValue::from_str(&format!("Basic {}", auth))?,
291 );
292 }
293
294 trace!("Headers: {:#?}", headers);
295
296 trace!("Write HTTP request");
298 sock.write_all(format!("GET /{} HTTP/1.0\r\n", mount).as_bytes())
299 .await?;
300 sock.write_all(format!("Host: {}\r\n", config.to_url()).as_bytes())
301 .await?;
302
303 trace!("Writing headers");
305 for h in headers.iter() {
306 sock.write_all(format!("{}: {}\r\n", h.0.as_str(), h.1.to_str()?).as_bytes())
307 .await?;
308 }
309
310 sock.write_all(b"\r\n").await?;
311 sock.flush().await?;
312
313 trace!("Reading response");
314 let mut buff = Vec::with_capacity(1024);
315
316 let n = sock.read_buf(&mut buff).await?;
318 trace!("Read {} bytes, current buffer {} bytes", n, buff.len());
319
320 let r = String::from_utf8_lossy(&buff[..n]);
322 match r.lines().next() {
323 Some(status) if status.contains("200 OK") => {
324 trace!("Got 200 OK response");
325 },
326 Some(status) => {
327 error!("NTRIP server returned error: {}", status);
328 return Err(NtripClientError::ResponseError(status.to_string()));
329 },
330 None => {
331 error!("NTRIP server returned empty response");
332 return Err(NtripClientError::ResponseError("empty response".into()));
333 },
334 }
335
336 if let Some(i) = buff.iter().enumerate().find(|(_i, b)| **b == 0xd3) {
338 trace!(
339 "Trimming buffer to next potential frame start at index {}",
340 i.0
341 );
342 let _ = buff.drain(..i.0);
343 }
344
345 let mut exit_rx = exit_tx.subscribe();
348 let rx_handle = tokio::task::spawn(async move {
349 let mut error_count = 0;
351
352 'listener: loop {
353 select! {
354 n = sock.read_buf(&mut buff) => match n {
355 Ok(n) => {
356 trace!("Read {} bytes, current buffer {} bytes", n, buff.len());
357 trace!("Appended {:02x?}", &buff[buff.len()-n..][..n]);
358
359 if n == 0 {
361 warn!("Zero length response");
362 break 'listener;
363 }
364
365 if buff[0] != 0xd3 {
367 if let Some(i) = buff.iter().enumerate().find(|(_i, b)| **b == 0xd3) {
368 warn!("Trimming buffer to next potential frame start at index {}", i.0);
369 buff.drain(..i.0);
370
371 assert_eq!(buff[0], 0xd3);
372 }
373 }
374
375 while buff.len() > 6 {
378 match MessageFrame::new(&buff[..]) {
380 Ok(f) => {
381 let m = f.get_message();
383
384 trace!("Parsed RTCM message: {:?} (consumed {} bytes)", m, f.frame_len());
385
386 let raw_data = buff[..f.frame_len()].to_vec();
388 ntrip_tx.send((m, raw_data)).unwrap();
389
390 let _ = buff.drain(..f.frame_len());
392
393 error_count = 0;
395 },
396 Err(e) => {
397 warn!("RTCM parse error: {} (count: {})", e, error_count);
398
399 error_count += 1;
401
402 if error_count >= 5 {
404 error!("Too many parse errors, closing connection");
405 break 'listener;
406 }
407
408 break;
409 }
410 }
411 }
412 },
413 Err(e) => {
414 error!("socket read error: {}", e);
415 break;
416 },
417 },
418 _ = exit_rx.recv() => {
419 error!("Exiting NTRIP read loop on signal");
420 break;
421 }
422 }
423 }
424
425 warn!("NTRIP read loop exiting");
426
427 if !buff.is_empty() {
428 warn!("Dropping {} bytes of unparsed data", buff.len());
429
430 if let Ok(s) = String::from_utf8(buff) {
431 debug!("Unparsed data:\r\n{}", s);
432 }
433 }
434 });
435
436 Ok(rx_handle)
437 }
438}
439
440impl<RX> NtripHandle<RX> {
441 pub fn is_running(&self) -> bool {
443 !self._rx_handle.is_finished()
444 }
445}
446
447impl Stream for NtripHandle<UnboundedReceiver<(Message, Vec<u8>)>> {
449 type Item = (Message, Vec<u8>);
450
451 fn poll_next(
452 mut self: std::pin::Pin<&mut Self>,
453 cx: &mut std::task::Context<'_>,
454 ) -> std::task::Poll<Option<Self::Item>> {
455 self.ntrip_rx.poll_recv(cx)
456 }
457}
458
459impl<RX> Drop for NtripHandle<RX> {
460 fn drop(&mut self) {
461 let _ = self.exit_tx.send(());
462 }
463}
464
465#[cfg(test)]
466mod tests {
467 use std::env;
468
469 use futures::StreamExt;
470 use rustls::crypto::CryptoProvider;
471 use tracing::debug;
472
473 use super::*;
474 use crate::config::NtripCredentials;
475
476 fn setup_logging() {
477 let _ = tracing_subscriber::FmtSubscriber::builder()
478 .compact()
479 .without_time()
480 .with_max_level(tracing::level_filters::LevelFilter::DEBUG)
481 .try_init();
482 }
483
484 #[tokio::test]
485 #[ignore = "Requires NTRIP config from the environment"]
486 async fn test_ntrip_client() {
487 setup_logging();
488
489 CryptoProvider::install_default(rustls::crypto::ring::default_provider()).ok();
491
492 debug!("Connecting to NTRIP server");
493
494 let (exit_tx, _exit_rx) = tokio::sync::broadcast::channel(1);
495
496 let mount = env::var("NTRIP_MOUNT").unwrap_or("ARGOACU".to_string());
497 let config = env::var("NTRIP_HOST")
498 .unwrap_or("rtk2go".to_string())
499 .parse::<NtripConfig>()
500 .unwrap();
501 let creds = NtripCredentials {
502 user: env::var("NTRIP_USER").unwrap_or("user".into()),
503 pass: env::var("NTRIP_PASS").unwrap_or("pass".into()),
504 };
505
506 let mut client = NtripClient::new(config, creds).await.unwrap();
507
508 let mut h = client.mount(mount.to_string()).await.unwrap();
509
510 for _i in 0..10 {
511 let (m, d) = h.next().await.unwrap();
512 debug!("Got RTCM message: {:?}", m);
513 debug!("Raw data: {:02x?}", d);
514 }
515
516 let _ = exit_tx.send(());
517 }
518}