robust_ntrip_client/
lib.rs

1//! # `robust-ntrip-client` - Robust NTRIP Client
2//!
3//! This crate provides a client to connect to a Network Transport of RTCM via
4//! Internet Protocol (NTRIP) server. RTCM stands for Radio Technical Commission
5//! for Maritime services and is the message type carrying GNSS correction
6//! signals to enable centimeter-resolution GNSS position finding.
7//!
//! The implementation in this crate attempts to be robust against network
//! interruptions and other transient errors. The [`RobustNtripClient`] handles
//! the low-level interaction with the NTRIP server and yields raw bytes, which
//! allows plugging in any RTCM parsing library. The [`ParsingNtripClient`]
//! wraps this low-level client and parses and validates the RTCM messages.
13//!
14//! See also the [`ntrip-client` crate](https://crates.io/crates/ntrip-client).
15//! I was unaware of this other crate at the time I began writing
16//! `robust-ntrip-client`.
17//!
18//! ## Example usage
19//! ```rust,no_run
20//! #[tokio::main]
21//! async fn main() -> eyre::Result<()> {
22//!    let raw_client = robust_ntrip_client::RobustNtripClient::new(
23//!        "ntrip://username:password@example-ntrip-server.com/mountpoint",
24//!        Default::default()
25//!    ).await?;
26//!    let mut ntrip = robust_ntrip_client::ParsingNtripClient::new(raw_client);
27//!
28//!    loop {
29//!        let msg = ntrip.next().await?;
30//!        println!(
31//!            "message {}: {} bytes",
32//!            msg.message_number(),
33//!            msg.frame_data().len()
34//!        );
35//!    }
36//!}
37//! ```
38use eyre::{Context, Result};
39use std::str::FromStr;
40
/// Options for connecting to the NTRIP server.
///
/// Use [`Default::default`] to obtain the crate's default settings.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RobustNtripClientOptions {
    /// Upper bound on the delay between successive attempts to (re)connect to
    /// the NTRIP server. The delay grows after every failed attempt until it
    /// reaches this value.
    pub max_backoff_duration: std::time::Duration,

    /// Reset the NTRIP connection after this duration of not receiving updates.
    /// `None` disables the timeout, i.e. the client waits indefinitely for data.
    pub timeout: Option<std::time::Duration>,
}
49
50impl std::default::Default for RobustNtripClientOptions {
51    fn default() -> Self {
52        Self {
53            max_backoff_duration: std::time::Duration::from_secs(30),
54            timeout: Some(std::time::Duration::from_secs(10)),
55        }
56    }
57}
58
59/// A client which automatically reconnects to an NTRIP server in case of
60/// interruption.
61pub struct RobustNtripClient {
62    request_url: String,
63    user_pass: Option<(String, String)>,
64    client: reqwest::Client,
65    timeout: Option<std::time::Duration>,
66    max_backoff_duration: std::time::Duration,
67
68    response: reqwest::Response,
69}
70
71impl RobustNtripClient {
72    /// Create a new connection to an NTRIP server.
73    pub async fn new(url: &str, opts: RobustNtripClientOptions) -> Result<Self> {
74        let uri: http::Uri = url
75            .parse()
76            .with_context(|| format!("While parsing NTRIP URL \"{url}\"."))?;
77
78        let (need_tls, default_port) = if let Some(scheme) = uri.scheme() {
79            let ntrip = http::uri::Scheme::from_str("ntrip").unwrap();
80            let http = http::uri::Scheme::from_str("http").unwrap();
81            let https = http::uri::Scheme::from_str("https").unwrap();
82            if scheme == &ntrip {
83                (false, Some(2101))
84            } else if scheme == &http {
85                (false, None)
86            } else if scheme == &https {
87                (true, None)
88            } else {
89                eyre::bail!("Unexpected URI scheme (found \"{scheme}\").");
90            }
91        } else {
92            // I'm not sure how this would be possible. I think parsing above would
93            // fail.
94            eyre::bail!("No URI scheme.");
95        };
96
97        let parts = uri.into_parts();
98        let (host_port, user_pass) = if let Some(auth) = &parts.authority {
99            parse_authority(auth)?
100        } else {
101            eyre::bail!("No authority section of URL");
102        };
103        let auth = http::uri::Authority::from_maybe_shared(host_port)?;
104
105        let host = auth.host();
106        let port = auth.port_u16().or(default_port);
107        let mountpoint = if let Some(pq) = parts.path_and_query {
108            pq.path().to_string()
109        } else {
110            "/".to_string()
111        };
112
113        let scheme = if need_tls { "https" } else { "http" };
114        let port = if let Some(port) = port {
115            format!(":{port}")
116        } else {
117            "".to_string()
118        };
119        let request_url = format!("{scheme}://{host}{port}{mountpoint}");
120
121        let mut headers = reqwest::header::HeaderMap::new();
122        // See https://support.pointonenav.com/polaris-ntrip-api-docs
123        headers.insert(
124            "Ntrip-Version",
125            reqwest::header::HeaderValue::from_static("ntrip/2.0"),
126        );
127
128        let client = reqwest::ClientBuilder::new()
129            .tcp_keepalive(std::time::Duration::from_secs(5))
130            .default_headers(headers)
131            .user_agent(format!(
132                "NTRIP {}/{}",
133                env!("CARGO_PKG_NAME"),
134                env!("CARGO_PKG_VERSION")
135            ))
136            .build()?;
137
138        let max_backoff_duration = opts.max_backoff_duration;
139        let timeout = opts.timeout;
140        let response = establish_connection(
141            &client,
142            &request_url,
143            user_pass.as_ref(),
144            max_backoff_duration,
145        )
146        .await?;
147
148        Ok(Self {
149            request_url,
150            user_pass,
151            client,
152            timeout,
153            max_backoff_duration,
154
155            response,
156        })
157    }
158
159    /// Get the next chunk of raw bytes from the NTRIP server.
160    pub async fn chunk(&mut self) -> Result<bytes::Bytes> {
161        if let Some(duration) = self.timeout {
162            self.next_chunk_with_timeout(duration).await
163        } else {
164            self.next_chunk_infinite_wait().await
165        }
166    }
167
168    async fn next_chunk_with_timeout(
169        &mut self,
170        duration: std::time::Duration,
171    ) -> Result<bytes::Bytes> {
172        match tokio::time::timeout(duration, self.next_chunk_infinite_wait()).await {
173            Ok(next) => next, // normal case: new data before timeout
174            Err(_) => {
175                tracing::warn!("Reconnecting due to timeout elapsed.");
176                self.reconnect_and_get_first_chunk().await
177            }
178        }
179    }
180
181    async fn next_chunk_infinite_wait(&mut self) -> Result<bytes::Bytes> {
182        match self.response.chunk().await {
183            Ok(Some(next)) => Ok(next), // normal case: new data
184            Ok(None) => {
185                tracing::warn!("Reconnecting due to end of HTTP stream.");
186                self.reconnect_and_get_first_chunk().await
187            }
188            Err(_) => {
189                tracing::warn!("Reconnecting due to error with HTTP stream.");
190                self.reconnect_and_get_first_chunk().await
191            }
192        }
193    }
194
195    async fn reconnect_and_get_first_chunk(&mut self) -> Result<bytes::Bytes> {
196        let response = establish_connection(
197            &self.client,
198            &self.request_url,
199            self.user_pass.as_ref(),
200            self.max_backoff_duration,
201        )
202        .await?;
203        self.response = response;
204        let chunk = self
205            .response
206            .chunk()
207            .await?
208            .ok_or_else(|| eyre::eyre!("Could not get first chunk"))?;
209        Ok(chunk)
210    }
211}
212
213async fn establish_connection(
214    client: &reqwest::Client,
215    request_url: &str,
216    user_pass: Option<&(String, String)>,
217    max_backoff_duration: std::time::Duration,
218) -> Result<reqwest::Response> {
219    let mut backoff = std::time::Duration::from_secs(1);
220    loop {
221        tracing::info!("Establishing connection to {request_url}.");
222        let mut req_builder = client.get(request_url);
223        if let Some((username, password)) = &user_pass {
224            req_builder = req_builder.basic_auth(username, Some(password));
225        }
226        let result_response = req_builder.send().await;
227        match result_response {
228            Ok(response) => {
229                tracing::debug!("Sent request");
230
231                if !response.status().is_success() {
232                    eyre::bail!("Error getting NTRIP URL: HTTP status {}", response.status());
233                }
234
235                return Ok(response);
236            }
237            Err(e) => {
238                let error = eyre::Report::from(e);
239                let mut err_msg = format!("Could not open NTRIP URL: {error}");
240                for cause in error.chain() {
241                    err_msg = format!("{err_msg}\n   cause: {cause}");
242                }
243                tracing::warn!("{err_msg}");
244
245                tokio::time::sleep(backoff).await;
246                backoff = min_dur(backoff * 2, max_backoff_duration);
247            }
248        }
249    }
250}
251
/// Return the shorter of the two durations.
fn min_dur(a: std::time::Duration, b: std::time::Duration) -> std::time::Duration {
    std::cmp::min(a, b)
}
255
256fn parse_authority(auth: &http::uri::Authority) -> Result<(String, Option<(String, String)>)> {
257    // Replace when https://github.com/hyperium/http/pull/399 is merged.
258    let auth_vec = auth.as_str().split("@").collect::<Vec<_>>();
259    match auth_vec.len() {
260        1 => {
261            // Only "host:port"
262            let host_port = auth_vec[0].to_string();
263            Ok((host_port, None))
264        }
265        2 => {
266            // "username:password@host:port"
267            let user_pass = auth_vec[0];
268            let host_port = auth_vec[1].to_string();
269            let up = user_pass.split(":").collect::<Vec<_>>();
270            if up.len() != 2 {
271                eyre::bail!("Could not parse username and password from URL");
272            }
273            let username = up[0].to_string();
274            let password = up[1].to_string();
275            Ok((host_port, Some((username, password))))
276        }
277        _ => {
278            eyre::bail!("Expected zero or one '@' symbols in authority");
279        }
280    }
281}
282
/// A URL without credentials is split into scheme, host, port and mountpoint.
#[test]
fn test_parse_example_url() {
    let parsed = "ntrip://hostname.com:2101/mountpoint"
        .parse::<http::Uri>()
        .unwrap();

    let expected_scheme = http::uri::Scheme::from_str("ntrip").unwrap();
    assert_eq!(parsed.scheme(), Some(&expected_scheme));

    let authority = parsed.authority().unwrap();
    assert_eq!(authority.host(), "hostname.com");
    assert_eq!(authority.port_u16(), Some(2101));

    let (host_port, credentials) = parse_authority(authority).unwrap();
    assert!(credentials.is_none());
    assert_eq!(host_port, "hostname.com:2101");

    let mountpoint = parsed.path_and_query().unwrap().path();
    assert_eq!(mountpoint, "/mountpoint");
}
297
/// A URL with embedded credentials yields the username and password
/// separately from the `host:port` part.
#[test]
fn test_parse_example_url_with_user_pass() {
    let parsed = "ntrip://username:password@hostname.com:2101/mountpoint"
        .parse::<http::Uri>()
        .unwrap();

    let expected_scheme = http::uri::Scheme::from_str("ntrip").unwrap();
    assert_eq!(parsed.scheme(), Some(&expected_scheme));

    let authority = parsed.authority().unwrap();
    assert_eq!(authority.host(), "hostname.com");
    assert_eq!(authority.port_u16(), Some(2101));

    let (host_port, credentials) = parse_authority(authority).unwrap();
    let (username, password) = credentials.unwrap();
    assert_eq!(username, "username");
    assert_eq!(password, "password");
    assert_eq!(host_port, "hostname.com:2101");

    let mountpoint = parsed.path_and_query().unwrap().path();
    assert_eq!(mountpoint, "/mountpoint");
}
316
317/// One valid frame of RTCM data from the NTRIP server.
318pub struct FrameData {
319    frame_data: bytes::BytesMut,
320    message_number: u16,
321}
322
323impl FrameData {
324    /// Get the RTCM data.
325    pub fn frame_data(&self) -> &[u8] {
326        &self.frame_data
327    }
328    /// Get the RTCM message number.
329    pub fn message_number(&self) -> u16 {
330        self.message_number
331    }
332}
333
334impl From<FrameData> for Vec<u8> {
335    fn from(val: FrameData) -> Self {
336        val.frame_data.into()
337    }
338}
339
340/// A client which parses RTCM messages from the NTRIP stream.
341pub struct ParsingNtripClient {
342    client: RobustNtripClient,
343    buf: bytes::BytesMut,
344}
345
346impl ParsingNtripClient {
347    /// Create a parsing NTRIP client by wrapping a low-level NTRIP client.
348    pub fn new(client: RobustNtripClient) -> Self {
349        let buf = bytes::BytesMut::new();
350        Self { client, buf }
351    }
352
353    /// Get the next RTCM message from the NTRIP server.
354    pub async fn next(&mut self) -> Result<FrameData> {
355        loop {
356            let mut advance_info = None;
357            for (i, start_byte) in (&self.buf).into_iter().enumerate() {
358                if *start_byte == 0xd3 {
359                    match rtcm_rs::MessageFrame::new(&self.buf[i..]) {
360                        Ok(m) => {
361                            tracing::debug!(
362                                "Found RTCM message {} frame with length {}",
363                                m.message_number().unwrap(),
364                                m.frame_len()
365                            );
366                            advance_info = Some((
367                                i,
368                                false,
369                                Some((m.frame_len(), m.message_number().unwrap())),
370                            ));
371                            break;
372                        }
373                        Err(rtcm_rs::rtcm_error::RtcmError::Incomplete) => {
374                            advance_info = Some((i, true, None)); // discard data prior to the start byte.
375                            break;
376                        }
377                        Err(rtcm_rs::rtcm_error::RtcmError::NotValid) => {
378                            advance_info = Some((i + 1, false, None)); // advance past the invalid "start byte".
379                            break;
380                        }
381                        _ => unreachable!(),
382                    }
383                }
384            }
385
386            let (n_discard, do_read_more, msg_info) = if let Some(x) = advance_info {
387                x
388            } else {
389                // no start byte found, so we need to read more data.
390                (self.buf.len(), true, None)
391            };
392
393            let _discard_bytes = self.buf.split_to(n_discard);
394            if let Some((frame_len, message_number)) = msg_info {
395                assert!(!do_read_more);
396                let frame_data = self.buf.split_to(frame_len);
397                return Ok(FrameData {
398                    frame_data,
399                    message_number,
400                });
401            }
402
403            if do_read_more {
404                // Fetch more data.
405                let this_buf = self.client.chunk().await?;
406                self.buf.extend_from_slice(&this_buf);
407            }
408        }
409    }
410}