Skip to main content

ntrip_client/
client.rs

1//! NTRIP Client implementation
2
3use std::sync::Arc;
4
5use base64::{engine::general_purpose, Engine as _};
6use futures::Stream;
7use http::{header::USER_AGENT, HeaderMap, HeaderValue, Method};
8use rtcm_rs::{Message, MessageFrame};
9use rustls::pki_types::ServerName;
10use tokio::{
11    io::{AsyncRead, AsyncReadExt as _, AsyncWrite, AsyncWriteExt},
12    net::TcpStream,
13    select,
14    sync::{
15        broadcast::Sender as BroadcastSender,
16        mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
17    },
18    task::JoinHandle,
19};
20use tokio_rustls::TlsConnector;
21use tracing::{debug, error, trace, warn};
22
23use crate::{
24    config::{NtripConfig, NtripCredentials},
25    snip::ServerInfo,
26    NtripClientError,
27};
28
29/// NTRIP Client, used to connect to an NTRIP (RTCM) service.
30/// When "mounted", the [NtripHandle] allows real-time messaging
31/// through a [Stream] channel.
32///
33/// ```
34/// use tokio::select;
35/// use tokio::sync; // broadcast channel
36/// use futures::StreamExt; // real-time channel
37///
38/// use ntrip_client::{
39///     NtripClient,
40///     NtripConfig,
41///     RtcmProvider,
42///     NtripCredentials,
43///     NtripClientError,
44/// };
45///
46/// async fn basic_listener() -> Result<(), anyhow::Error> {
47///
48///     // this network does not require SSL
49///     let config = NtripConfig::from_provider(RtcmProvider::Centipede);
50///
51///     // adapt your credentials to the network
52///     let creds = NtripCredentials::default()
53///         .with_username("centipede")
54///         .with_password("password");
55///
56///     // client definition
57///     let mut client = NtripClient::new(config, creds)
58///         .await?;
59///
60///     // list available mountpoints
61///     let mountpoints = client.list_mounts()
62///         .await?;
63///
64///     for remote in mountpoints.services {
65///         println!("{} - {}", remote.name, remote.details);
66///     }
67///
68///     // subscribe to remote server
69///     let mut handle = client.mount("VALDM").await?;
70///
71///     // listening
72///     loop {
73///         select! {
74///             message = handle.next() => match message {
75///                 Some(msg) => {
76///                     println!("received RTCM message: {:?}", msg);
77///                 },
78///                 None => {
79///                     println!("End of stream!");
80///                     break;
81///                 },
82///             },
83///         }
84///     }
85///
86///     Ok(())
87/// }
88///
89/// basic_listener();
90/// ```
pub struct NtripClient {
    /// Server description; the methods of this type read its `host`, `port`
    /// and `use_tls` fields and call `to_url()` on it.
    config: NtripConfig,
    /// Credentials; a `Basic` authorization header is built from `user` and
    /// `pass` when mounting, unless `user` is empty.
    creds: NtripCredentials,
}
95
96/// [NtripHandle] is the Mount handle, it implements [Stream]
97/// which is how you can receive messages in real-time.
98pub struct NtripHandle<RX = UnboundedReceiver<(Message, Vec<u8>)>> {
99    _rx_handle: tokio::task::JoinHandle<()>,
100    ntrip_rx: RX,
101    exit_tx: BroadcastSender<()>,
102}
103
104impl NtripClient {
105    pub async fn new(
106        config: NtripConfig,
107        creds: NtripCredentials,
108    ) -> Result<Self, NtripClientError> {
109        Ok(NtripClient { config, creds })
110    }
111
112    /// List available mounts on the NTRIP server
113    pub async fn list_mounts(&mut self) -> Result<ServerInfo, NtripClientError> {
114        let client = reqwest::Client::builder()
115            .http1_ignore_invalid_headers_in_responses(true)
116            .http09_responses()
117            .user_agent(format!(
118                "NTRIP {}/{}",
119                env!("CARGO_PKG_NAME"),
120                env!("CARGO_PKG_VERSION")
121            ))
122            .build()?;
123
124        // TODO: auth etc.
125        let proto = if self.config.use_tls { "https" } else { "http" };
126
127        let req = client
128            .request(
129                Method::GET,
130                format!("{}://{}:{}", proto, self.config.host, self.config.port),
131            )
132            .header("Ntrip-Version", "NTRIP/2.0")
133            .build()?;
134
135        let res = client.execute(req).await?;
136
137        trace!("Fetched NTRIP response: {:?}", res.status());
138
139        let body = res.text().await?;
140
141        let lines = body.lines().collect::<Vec<&str>>();
142
143        let snip_info = ServerInfo::parse(lines.iter().cloned());
144
145        Ok(snip_info)
146    }
147
148    /// 'Mount' the [NtripClient] from remote $url/$mount service point.
149    /// On success, you can then start listening to messages from the server.
150    ///
151    /// ## Input
152    /// - mount: readable remote mount point (server name)
153    /// - exit_tx: [BroadcastSender] is passed to allow graceful exit on errors
154    ///
155    /// ## Output
156    /// - [NtripHandle] which implements [Stream] to receive messages in real-time.
157    pub async fn mount(
158        &mut self,
159        mount: impl ToString,
160    ) -> Result<NtripHandle<UnboundedReceiver<(Message, Vec<u8>)>>, NtripClientError> {
161        let (ntrip_tx, ntrip_rx) = unbounded_channel();
162
163        let (_rx_handle, exit_tx) = self.mount_internal(mount, ntrip_tx).await?;
164
165        Ok(NtripHandle {
166            _rx_handle: _rx_handle,
167            ntrip_rx: ntrip_rx,
168            exit_tx: exit_tx,
169        })
170    }
171
172    /// 'Mount' the [NtripClient] from remote $url/$mount service point.
173    /// On success, you can then start listening to messages from the server.
174    ///
175    /// ## Input
176    /// - mount: readable remote mount point (server name)
177    /// - exit_tx: [BroadcastSender] is passed to allow graceful exit on errors
178    ///
179    /// ## Output
180    /// - [NtripHandle<()>] which will route received messages throught the provided channel
181    pub async fn mount_with_sink(
182        &mut self,
183        mount: impl ToString,
184        ntrip_tx: UnboundedSender<(Message, Vec<u8>)>,
185    ) -> Result<NtripHandle<()>, NtripClientError> {
186        let (_rx_handle, exit_tx) = self.mount_internal(mount, ntrip_tx).await?;
187
188        Ok(NtripHandle {
189            _rx_handle: _rx_handle,
190            ntrip_rx: (),
191            exit_tx: exit_tx,
192        })
193    }
194
195    /// 'Mount' the [NtripClient] from remote $url/$mount service point.
196    /// On success, you can then start listening to messages from the server.
197    ///
198    /// ## Input
199    /// - mount: readable remote mount point (server name)
200    /// - exit_tx: [BroadcastSender] is passed to allow graceful exit on errors
201    ///
202    /// ## Output
203    /// - [NtripHandle] which will route received messages throught the provided channel
204    async fn mount_internal(
205        &mut self,
206        mount: impl ToString,
207        ntrip_tx: UnboundedSender<(Message, Vec<u8>)>,
208    ) -> Result<(JoinHandle<()>, BroadcastSender<()>), NtripClientError> {
209        debug!(
210            "Connecting to NTRIP server {}/{}",
211            self.config.to_url(),
212            mount.to_string()
213        );
214
215        let (exit_tx, _exit_rx) = tokio::sync::broadcast::channel(1);
216
217        let sock = TcpStream::connect(&self.config.to_url()).await?;
218
219        let rx_handle = match self.config.use_tls {
220            true => {
221                debug!("Using TLS connection");
222
223                let mut root_cert_store = rustls::RootCertStore::empty();
224                root_cert_store.extend(webpki_roots::TLS_SERVER_ROOTS.iter().cloned());
225
226                let tls_config = rustls::ClientConfig::builder()
227                    .with_root_certificates(root_cert_store)
228                    .with_no_client_auth();
229                let connector = TlsConnector::from(Arc::new(tls_config));
230                let dnsname = ServerName::try_from(self.config.host.clone())?;
231
232                let tls_sock = connector.connect(dnsname, sock).await?;
233
234                Self::handle_connection(
235                    &self.config,
236                    &self.creds,
237                    &mount.to_string(),
238                    ntrip_tx,
239                    exit_tx.clone(),
240                    tls_sock,
241                )
242                .await?
243            },
244            false => {
245                debug!("Using plain TCP connection");
246
247                Self::handle_connection(
248                    &self.config,
249                    &self.creds,
250                    &mount.to_string(),
251                    ntrip_tx,
252                    exit_tx.clone(),
253                    sock,
254                )
255                .await?
256            },
257        };
258
259        Ok((rx_handle, exit_tx))
260    }
261
262    pub async fn handle_connection(
263        config: &NtripConfig,
264        creds: &NtripCredentials,
265        mount: &str,
266        ntrip_tx: UnboundedSender<(Message, Vec<u8>)>,
267        exit_tx: BroadcastSender<()>,
268        mut sock: impl AsyncRead + AsyncWrite + Unpin + Send + 'static,
269    ) -> Result<JoinHandle<()>, NtripClientError> {
270        // Setup HTTP headers
271        let mut headers = HeaderMap::new();
272        headers.append(
273            USER_AGENT,
274            HeaderValue::from_str(&format!(
275                "NTRIP {}/{}",
276                env!("CARGO_PKG_NAME"),
277                env!("CARGO_PKG_VERSION")
278            ))?,
279        );
280
281        headers.append("Ntrip-Version", HeaderValue::from_static("NTRIP/2.0"));
282        headers.append("Accept", HeaderValue::from_static("*/*"));
283        headers.append("Connection", HeaderValue::from_static("close"));
284
285        // If we have credentials, add the Authorization header
286        if !creds.user.is_empty() {
287            let auth = general_purpose::STANDARD.encode(format!("{}:{}", creds.user, creds.pass));
288            headers.append(
289                "Authorization",
290                HeaderValue::from_str(&format!("Basic {}", auth))?,
291            );
292        }
293
294        trace!("Headers: {:#?}", headers);
295
296        // Write HTTP request
297        trace!("Write HTTP request");
298        sock.write_all(format!("GET /{} HTTP/1.0\r\n", mount).as_bytes())
299            .await?;
300        sock.write_all(format!("Host: {}\r\n", config.to_url()).as_bytes())
301            .await?;
302
303        // Write HTTP headers
304        trace!("Writing headers");
305        for h in headers.iter() {
306            sock.write_all(format!("{}: {}\r\n", h.0.as_str(), h.1.to_str()?).as_bytes())
307                .await?;
308        }
309
310        sock.write_all(b"\r\n").await?;
311        sock.flush().await?;
312
313        trace!("Reading response");
314        let mut buff = Vec::with_capacity(1024);
315
316        // Perform a first read to get the response status
317        let n = sock.read_buf(&mut buff).await?;
318        trace!("Read {} bytes, current buffer {} bytes", n, buff.len());
319
320        // Parse out response status
321        let r = String::from_utf8_lossy(&buff[..n]);
322        match r.lines().next() {
323            Some(status) if status.contains("200 OK") => {
324                trace!("Got 200 OK response");
325            },
326            Some(status) => {
327                error!("NTRIP server returned error: {}", status);
328                return Err(NtripClientError::ResponseError(status.to_string()));
329            },
330            None => {
331                error!("NTRIP server returned empty response");
332                return Err(NtripClientError::ResponseError("empty response".into()));
333            },
334        }
335
336        // Flush buffer until the first RTCM message (0xd3)
337        if let Some(i) = buff.iter().enumerate().find(|(_i, b)| **b == 0xd3) {
338            trace!(
339                "Trimming buffer to next potential frame start at index {}",
340                i.0
341            );
342            let _ = buff.drain(..i.0);
343        }
344
345        // Spawn a task to handle incoming NTRIP data
346
347        let mut exit_rx = exit_tx.subscribe();
348        let rx_handle = tokio::task::spawn(async move {
349            // Track parse errors so we can drop data (or abort) if needed
350            let mut error_count = 0;
351
352            'listener: loop {
353                select! {
354                    n = sock.read_buf(&mut buff) => match n {
355                        Ok(n) => {
356                            trace!("Read {} bytes, current buffer {} bytes", n, buff.len());
357                            trace!("Appended {:02x?}", &buff[buff.len()-n..][..n]);
358
359                            // Handle zero length read (connection closed)
360                            if n == 0 {
361                                warn!("Zero length response");
362                                break 'listener;
363                            }
364
365                            // Trim any non-message data from the start of the buffer
366                            if buff[0] != 0xd3 {
367                                if let Some(i) = buff.iter().enumerate().find(|(_i, b)| **b == 0xd3) {
368                                    warn!("Trimming buffer to next potential frame start at index {}", i.0);
369                                    buff.drain(..i.0);
370
371                                    assert_eq!(buff[0], 0xd3);
372                                }
373                            }
374
375                            // While we have enough data for a header,
376                            // parse out RTCM messages
377                            while buff.len() > 6 {
378                                // Attempt to parse frames
379                                match MessageFrame::new(&buff[..]) {
380                                    Ok(f) => {
381                                        // Parse out message from frame
382                                        let m = f.get_message();
383
384                                        trace!("Parsed RTCM message: {:?} (consumed {} bytes)", m, f.frame_len());
385
386                                        // Emit message
387                                        let raw_data = buff[..f.frame_len()].to_vec();
388                                        ntrip_tx.send((m, raw_data)).unwrap();
389
390                                        // Remove parsed data from the buffer
391                                        let _ = buff.drain(..f.frame_len());
392
393                                        // Reset error counter
394                                        error_count = 0;
395                                    },
396                                    Err(e) => {
397                                        warn!("RTCM parse error: {} (count: {})", e, error_count);
398
399                                        // Update error counter
400                                        error_count += 1;
401
402                                        // If we keep getting errors, abort the connection
403                                        if error_count >= 5 {
404                                            error!("Too many parse errors, closing connection");
405                                            break 'listener;
406                                        }
407
408                                        break;
409                                    }
410                                }
411                            }
412                        },
413                        Err(e) => {
414                            error!("socket read error: {}", e);
415                            break;
416                        },
417                    },
418                    _ = exit_rx.recv() => {
419                        error!("Exiting NTRIP read loop on signal");
420                        break;
421                    }
422                }
423            }
424
425            warn!("NTRIP read loop exiting");
426
427            if !buff.is_empty() {
428                warn!("Dropping {} bytes of unparsed data", buff.len());
429
430                if let Ok(s) = String::from_utf8(buff) {
431                    debug!("Unparsed data:\r\n{}", s);
432                }
433            }
434        });
435
436        Ok(rx_handle)
437    }
438}
439
440impl<RX> NtripHandle<RX> {
441    /// Check whether the NTRIP connection is still active (i.e. the read task is still running)
442    pub fn is_running(&self) -> bool {
443        !self._rx_handle.is_finished()
444    }
445}
446
447/// [Stream] NTRIP [Message]'s from an [NtripHandle]
448impl Stream for NtripHandle<UnboundedReceiver<(Message, Vec<u8>)>> {
449    type Item = (Message, Vec<u8>);
450
451    fn poll_next(
452        mut self: std::pin::Pin<&mut Self>,
453        cx: &mut std::task::Context<'_>,
454    ) -> std::task::Poll<Option<Self::Item>> {
455        self.ntrip_rx.poll_recv(cx)
456    }
457}
458
459impl<RX> Drop for NtripHandle<RX> {
460    fn drop(&mut self) {
461        let _ = self.exit_tx.send(());
462    }
463}
464
#[cfg(test)]
mod tests {
    use std::env;

    use futures::StreamExt;
    use rustls::crypto::CryptoProvider;
    use tracing::debug;

    use super::*;
    use crate::config::NtripCredentials;

    /// Installs a compact DEBUG-level subscriber; an already installed
    /// subscriber is left in place.
    fn setup_logging() {
        let _ = tracing_subscriber::FmtSubscriber::builder()
            .compact()
            .without_time()
            .with_max_level(tracing::level_filters::LevelFilter::DEBUG)
            .try_init();
    }

    /// Mounts a real server and waits for 10 messages.
    ///
    /// Configured through `NTRIP_HOST`, `NTRIP_MOUNT`, `NTRIP_USER` and
    /// `NTRIP_PASS`; each variable has a fallback value.
    #[tokio::test]
    #[ignore = "Requires NTRIP config from the environment"]
    async fn test_ntrip_client() {
        setup_logging();

        // Install the default crypto provider
        CryptoProvider::install_default(rustls::crypto::ring::default_provider()).ok();

        debug!("Connecting to NTRIP server");

        let mount = env::var("NTRIP_MOUNT").unwrap_or_else(|_| "ARGOACU".to_string());
        let config = env::var("NTRIP_HOST")
            .unwrap_or_else(|_| "rtk2go".to_string())
            .parse::<NtripConfig>()
            .unwrap();
        let creds = NtripCredentials {
            user: env::var("NTRIP_USER").unwrap_or_else(|_| "user".into()),
            pass: env::var("NTRIP_PASS").unwrap_or_else(|_| "pass".into()),
        };

        let mut client = NtripClient::new(config, creds).await.unwrap();

        let mut h = client.mount(mount).await.unwrap();

        for _ in 0..10 {
            let (m, d) = h.next().await.expect("stream ended before 10 messages");
            debug!("Got RTCM message: {:?}", m);
            debug!("Raw data: {:02x?}", d);
        }

        // Dropping the handle sends the exit signal to the reader task.
        drop(h);
    }
}