ntrip_client/
client.rs

1//! NTRIP Client implementation
2
3use std::sync::Arc;
4
5use base64::{engine::general_purpose, Engine as _};
6use futures::Stream;
7use http::{
8    header::{InvalidHeaderValue, ToStrError, USER_AGENT},
9    HeaderMap, HeaderValue, Method,
10};
11use rtcm_rs::{Message, MessageFrame};
12use rustls::pki_types::{InvalidDnsNameError, ServerName};
13use tokio::{
14    io::{AsyncRead, AsyncReadExt as _, AsyncWrite, AsyncWriteExt},
15    net::TcpStream,
16    select,
17    sync::{
18        broadcast::Sender as BroadcastSender,
19        mpsc::{unbounded_channel, UnboundedReceiver},
20    },
21    task::JoinHandle,
22};
23use tokio_rustls::TlsConnector;
24use tracing::{debug, error, trace, warn};
25
26use crate::{
27    config::{NtripConfig, NtripCredentials},
28    snip::ServerInfo,
29};
30
/// NTRIP Client, used to connect to an NTRIP (RTCM) service.
/// When "mounted", the [NtripHandle] allows real-time messaging
/// through a [Stream] channel.
///
/// ```
/// use tokio::select;
/// use tokio::sync; // broadcast channel
/// use futures::StreamExt; // real-time channel
///
/// use ntrip_client::{
///     NtripClient,
///     NtripConfig,
///     RtcmProvider,
///     NtripCredentials,
///     NtripClientError,
/// };
///
/// async fn basic_listener() -> Result<(), anyhow::Error> {
///
///     // this network does not require SSL
///     let config = NtripConfig::from_provider(RtcmProvider::Centipede);
///
///     // adapt your credentials to the network
///     let creds = NtripCredentials::default()
///         .with_username("user")
///         .with_password("password");
///
///     // client definition
///     let mut client = NtripClient::new(config, creds)
///         .await?;
///
///     // this channel allows graceful exit
///     let (exit_tx, mut exit_rx) = sync::broadcast::channel(1);
///
///     // subscribe to a mount point of the remote server (the mount point is
///     // requested as `GET /<mount>`, it is not a socket address).
///     // Pass a clone and keep `exit_tx` alive: the listener stops as soon as
///     // every sender of the exit channel has been dropped.
///     let mut mountpoint = client.mount("MOUNTPOINT", exit_tx.clone()).await?;
///
///     // listening
///     loop {
///         select! {
///             message = mountpoint.next() => match message {
///                 Some(msg) => {
///                     println!("received RTCM message: {:?}", msg);
///                 },
///                 None => {
///                     println!("End of stream!");
///                     break;
///                 },
///             },
///             _ = exit_rx.recv() => {
///                 println!("graceful exit");
///                 break;
///             },
///         }
///     }
///
///     Ok(())
/// }
///
/// basic_listener();
/// ```
pub struct NtripClient {
    // Server location (host, port, TLS flag) used for every connection.
    config: NtripConfig,
    // User name / password sent as HTTP Basic authorization when mounting.
    creds: NtripCredentials,
}
96
97/// [NtripHandle] is the Mount handle, it implements [Stream]
98/// which is how you can receiver messages in real-time.
99pub struct NtripHandle {
100    _rx_handle: tokio::task::JoinHandle<()>,
101    ntrip_rx: UnboundedReceiver<Message>,
102}
103
104#[derive(Debug, thiserror::Error)]
105pub enum NtripClientError {
106    #[error("Io error: {0}")]
107    Io(#[from] std::io::Error),
108
109    #[error("Reqwest error: {0}")]
110    Reqwest(#[from] reqwest::Error),
111
112    #[error("Invalid header value {0}")]
113    InvalidHeaderValue(#[from] InvalidHeaderValue),
114
115    #[error("Invalid DNS name {0}")]
116    InvalidDnsName(#[from] InvalidDnsNameError),
117
118    #[error("Header ToStrError error {0}")]
119    ToStrError(#[from] ToStrError),
120
121    #[error("Response error")]
122    ResponseError(String),
123}
124
125impl NtripClient {
126    pub async fn new(
127        config: NtripConfig,
128        creds: NtripCredentials,
129    ) -> Result<Self, NtripClientError> {
130        Ok(NtripClient { config, creds })
131    }
132
133    /// List available mounts on the NTRIP server
134    pub async fn list_mounts(&mut self) -> Result<ServerInfo, NtripClientError> {
135        let client = reqwest::Client::builder()
136            .http1_ignore_invalid_headers_in_responses(true)
137            .http09_responses()
138            .user_agent(format!(
139                "NTRIP {}/{}",
140                env!("CARGO_PKG_NAME"),
141                env!("CARGO_PKG_VERSION")
142            ))
143            .build()?;
144
145        // TODO: auth etc.
146        let proto = if self.config.use_tls { "https" } else { "http" };
147
148        let req = client
149            .request(
150                Method::GET,
151                format!("{}://{}:{}", proto, self.config.host, self.config.port),
152            )
153            .header("Ntrip-Version", "NTRIP/2.0")
154            .build()?;
155
156        let res = client.execute(req).await?;
157
158        debug!("Fetched NTRIP response: {:?}", res.status());
159
160        let body = res.text().await?;
161
162        let lines = body.lines().collect::<Vec<&str>>();
163
164        let snip_info = ServerInfo::parse(lines.iter().cloned());
165
166        Ok(snip_info)
167    }
168
169    /// 'Mount' the [NtripClient] from remote $url/$mount service point.
170    /// On success, you can then start listening to messages from the server.
171    ///
172    /// ## Input
173    /// - mount: readable remote mount point (server name)
174    /// - exit_tx: [BroadcastSender] is passed to allow graceful exit on errors
175    ///
176    /// ## Output
177    /// - [NtripHandle] which implements [Stream] to receive messages in real-time.
178    pub async fn mount(
179        &mut self,
180        mount: impl ToString,
181        exit_tx: BroadcastSender<()>,
182    ) -> Result<NtripHandle, NtripClientError> {
183        debug!(
184            "Connecting to NTRIP server {}/{}",
185            self.config.to_url(),
186            mount.to_string()
187        );
188
189        let sock = TcpStream::connect(&self.config.to_url()).await?;
190
191        let (rx_handle, ntrip_rx) = match self.config.use_tls {
192            true => {
193                debug!("Using TLS connection");
194
195                let mut root_cert_store = rustls::RootCertStore::empty();
196                root_cert_store.extend(webpki_roots::TLS_SERVER_ROOTS.iter().cloned());
197
198                let tls_config = rustls::ClientConfig::builder()
199                    .with_root_certificates(root_cert_store)
200                    .with_no_client_auth();
201                let connector = TlsConnector::from(Arc::new(tls_config));
202                let dnsname = ServerName::try_from(self.config.host.clone())?;
203
204                let tls_sock = connector.connect(dnsname, sock).await?;
205
206                Self::handle_connection(
207                    &self.config,
208                    &self.creds,
209                    &mount.to_string(),
210                    exit_tx.clone(),
211                    tls_sock,
212                )
213                .await?
214            },
215            false => {
216                debug!("Using plain TCP connection");
217
218                Self::handle_connection(
219                    &self.config,
220                    &self.creds,
221                    &mount.to_string(),
222                    exit_tx.clone(),
223                    sock,
224                )
225                .await?
226            },
227        };
228
229        Ok(NtripHandle {
230            _rx_handle: rx_handle,
231            ntrip_rx,
232        })
233    }
234
235    pub async fn handle_connection(
236        config: &NtripConfig,
237        creds: &NtripCredentials,
238        mount: &str,
239        exit_tx: BroadcastSender<()>,
240        mut sock: impl AsyncRead + AsyncWrite + Unpin + Send + 'static,
241    ) -> Result<(JoinHandle<()>, UnboundedReceiver<Message>), NtripClientError> {
242        // Setup HTTP headers
243        let mut headers = HeaderMap::new();
244        headers.append(
245            USER_AGENT,
246            HeaderValue::from_str(&format!(
247                "NTRIP {}/{}",
248                env!("CARGO_PKG_NAME"),
249                env!("CARGO_PKG_VERSION")
250            ))?,
251        );
252
253        headers.append("Ntrip-Version", HeaderValue::from_static("NTRIP/2.0"));
254        headers.append("Accept", HeaderValue::from_static("*/*"));
255        headers.append("Connection", HeaderValue::from_static("close"));
256
257        // If we have credentials, add the Authorization header
258        if !creds.user.is_empty() {
259            let auth = general_purpose::STANDARD.encode(format!("{}:{}", creds.user, creds.pass));
260            headers.append(
261                "Authorization",
262                HeaderValue::from_str(&format!("Basic {}", auth))?,
263            );
264        }
265
266        debug!("Headers: {:#?}", headers);
267
268        // Write HTTP request
269        debug!("Write HTTP request");
270        sock.write_all(format!("GET /{} HTTP/1.0\r\n", mount).as_bytes())
271            .await?;
272        sock.write_all(format!("Host: {}\r\n", config.to_url()).as_bytes())
273            .await?;
274
275        // Write HTTP headers
276        debug!("Writing headers");
277        for h in headers.iter() {
278            sock.write_all(format!("{}: {}\r\n", h.0.as_str(), h.1.to_str()?).as_bytes())
279                .await?;
280        }
281
282        sock.write_all(b"\r\n").await?;
283        sock.flush().await?;
284
285        debug!("Reading response");
286        let mut buff = Vec::with_capacity(1024);
287
288        // Perform a first read to get the response status
289        let n = sock.read_buf(&mut buff).await?;
290        debug!("Read {} bytes, current buffer {} bytes", n, buff.len());
291
292        // Parse out response status
293        let r = String::from_utf8_lossy(&buff[..n]);
294        match r.lines().next() {
295            Some(status) if status.contains("200 OK") => {
296                debug!("Got 200 OK response");
297            },
298            Some(status) => {
299                error!("NTRIP server returned error: {}", status);
300                return Err(NtripClientError::ResponseError(status.to_string()));
301            },
302            None => {
303                error!("NTRIP server returned empty response");
304                return Err(NtripClientError::ResponseError("empty response".into()));
305            },
306        }
307
308        // Flush buffer until the first RTCM message (0xd3)
309        if let Some(i) = buff.iter().enumerate().find(|(_i, b)| **b == 0xd3) {
310            debug!(
311                "Trimming buffer to next potential frame start at index {}",
312                i.0
313            );
314            let _ = buff.drain(..i.0);
315        }
316
317        // Spawn a task to handle incoming NTRIP data
318
319        let (ntrip_tx, ntrip_rx) = unbounded_channel();
320        let mut exit_rx = exit_tx.subscribe();
321        let rx_handle = tokio::task::spawn(async move {
322            // Track parse errors so we can drop data (or abort) if needed
323            let mut error_count = 0;
324
325            'listener: loop {
326                select! {
327                    n = sock.read_buf(&mut buff) => match n {
328                        Ok(n) => {
329                            debug!("Read {} bytes, current buffer {} bytes", n, buff.len());
330                            trace!("Appended {:02x?}", &buff[buff.len()-n..][..n]);
331
332                            // Handle zero length read (connection closed)
333                            if n == 0 {
334                                warn!("Zero length response");
335                                break 'listener;
336                            }
337
338                            // Trim any non-message data from the start of the buffer
339                            if buff[0] != 0xd3 {
340                                if let Some(i) = buff.iter().enumerate().find(|(_i, b)| **b == 0xd3) {
341                                    warn!("Trimming buffer to next potential frame start at index {}", i.0);
342                                    buff.drain(..i.0);
343
344                                    assert_eq!(buff[0], 0xd3);
345                                }
346                            }
347
348                            // While we have enough data for a header,
349                            // parse out RTCM messages
350                            while buff.len() > 6 {
351                                // Attempt to parse frames
352                                match MessageFrame::new(&buff[..]) {
353                                    Ok(f) => {
354                                        // Parse out message from frame
355                                        let m = f.get_message();
356
357                                        debug!("Parsed RTCM message: {:?} (consumed {} bytes)", m, f.frame_len());
358
359                                        // Emit message
360                                        ntrip_tx.send(m).unwrap();
361
362                                        // Remove parsed data from the buffer
363                                        let _ = buff.drain(..f.frame_len());
364
365                                        // Reset error counter
366                                        error_count = 0;
367                                    },
368                                    Err(e) => {
369                                        warn!("RTCM parse error: {} (count: {})", e, error_count);
370
371                                        // Update error counter
372                                        error_count += 1;
373
374                                        // If we keep getting errors, abort the connection
375                                        if error_count >= 5 {
376                                            error!("Too many parse errors, closing connection");
377                                            break 'listener;
378                                        }
379
380                                        break;
381                                    }
382                                }
383                            }
384                        },
385                        Err(e) => {
386                            error!("socket read error: {}", e);
387                            break;
388                        },
389                    },
390                    _ = exit_rx.recv() => {
391                        error!("Exiting NTRIP read loop on signal");
392                        break;
393                    }
394                }
395            }
396
397            warn!("NTRIP read loop exiting");
398
399            if !buff.is_empty() {
400                warn!("Dropping {} bytes of unparsed data", buff.len());
401
402                if let Ok(s) = String::from_utf8(buff) {
403                    debug!("Unparsed data:\r\n{}", s);
404                }
405            }
406        });
407
408        Ok((rx_handle, ntrip_rx))
409    }
410}
411
412/// [Stream] NTRIP [Message]'s from an [NtripHandle]
413impl Stream for NtripHandle {
414    type Item = Message;
415
416    fn poll_next(
417        mut self: std::pin::Pin<&mut Self>,
418        cx: &mut std::task::Context<'_>,
419    ) -> std::task::Poll<Option<Self::Item>> {
420        self.ntrip_rx.poll_recv(cx)
421    }
422}
423
#[cfg(test)]
mod tests {
    use std::env;

    use futures::StreamExt;
    use tracing::debug;

    use super::*;
    use crate::config::NtripCredentials;

    /// Installs a compact DEBUG-level subscriber; a failure to install it
    /// (e.g. one is already set) is ignored.
    fn setup_logging() {
        let subscriber = tracing_subscriber::FmtSubscriber::builder()
            .compact()
            .without_time()
            .with_max_level(tracing::level_filters::LevelFilter::DEBUG);

        let _ = subscriber.try_init();
    }

    /// Reads an environment variable, falling back to `default` when unset.
    fn env_or(key: &str, default: &str) -> String {
        env::var(key).unwrap_or_else(|_| default.to_string())
    }

    /// Live test: mounts a real caster and expects ten RTCM messages.
    /// Server, mount point and credentials come from `NTRIP_HOST`,
    /// `NTRIP_MOUNT`, `NTRIP_USER` and `NTRIP_PASS`.
    #[tokio::test]
    #[ignore = "Requires NTRIP config from the environment"]
    async fn test_ntrip_client() {
        setup_logging();

        debug!("Connecting to NTRIP server");

        // The original sender is kept until the end of the test so the
        // exit channel stays open while messages are being read.
        let (exit_tx, _exit_rx) = tokio::sync::broadcast::channel(1);

        let mount = env_or("NTRIP_MOUNT", "ARGOACU");
        let config = env_or("NTRIP_HOST", "rtk2go")
            .parse::<NtripConfig>()
            .unwrap();
        let creds = NtripCredentials {
            user: env_or("NTRIP_USER", "user"),
            pass: env_or("NTRIP_PASS", "pass"),
        };

        let mut client = NtripClient::new(config, creds).await.unwrap();

        let mut handle = client.mount(mount, exit_tx.clone()).await.unwrap();

        let mut received = 0;
        while received < 10 {
            let message = handle.next().await.unwrap();
            debug!("Got RTCM message: {:?}", message);
            received += 1;
        }

        let _ = exit_tx.send(());
    }
}