1use eyre::{Context, Result};
39use std::str::FromStr;
40
41pub struct RobustNtripClientOptions {
43 pub max_backoff_duration: std::time::Duration,
45
46 pub timeout: Option<std::time::Duration>,
48}
49
50impl std::default::Default for RobustNtripClientOptions {
51 fn default() -> Self {
52 Self {
53 max_backoff_duration: std::time::Duration::from_secs(30),
54 timeout: Some(std::time::Duration::from_secs(10)),
55 }
56 }
57}
58
59pub struct RobustNtripClient {
62 request_url: String,
63 user_pass: Option<(String, String)>,
64 client: reqwest::Client,
65 timeout: Option<std::time::Duration>,
66 max_backoff_duration: std::time::Duration,
67
68 response: reqwest::Response,
69}
70
71impl RobustNtripClient {
72 pub async fn new(url: &str, opts: RobustNtripClientOptions) -> Result<Self> {
74 let uri: http::Uri = url
75 .parse()
76 .with_context(|| format!("While parsing NTRIP URL \"{url}\"."))?;
77
78 let (need_tls, default_port) = if let Some(scheme) = uri.scheme() {
79 let ntrip = http::uri::Scheme::from_str("ntrip").unwrap();
80 let http = http::uri::Scheme::from_str("http").unwrap();
81 let https = http::uri::Scheme::from_str("https").unwrap();
82 if scheme == &ntrip {
83 (false, Some(2101))
84 } else if scheme == &http {
85 (false, None)
86 } else if scheme == &https {
87 (true, None)
88 } else {
89 eyre::bail!("Unexpected URI scheme (found \"{scheme}\").");
90 }
91 } else {
92 eyre::bail!("No URI scheme.");
95 };
96
97 let parts = uri.into_parts();
98 let (host_port, user_pass) = if let Some(auth) = &parts.authority {
99 parse_authority(auth)?
100 } else {
101 eyre::bail!("No authority section of URL");
102 };
103 let auth = http::uri::Authority::from_maybe_shared(host_port)?;
104
105 let host = auth.host();
106 let port = auth.port_u16().or(default_port);
107 let mountpoint = if let Some(pq) = parts.path_and_query {
108 pq.path().to_string()
109 } else {
110 "/".to_string()
111 };
112
113 let scheme = if need_tls { "https" } else { "http" };
114 let port = if let Some(port) = port {
115 format!(":{port}")
116 } else {
117 "".to_string()
118 };
119 let request_url = format!("{scheme}://{host}{port}{mountpoint}");
120
121 let mut headers = reqwest::header::HeaderMap::new();
122 headers.insert(
124 "Ntrip-Version",
125 reqwest::header::HeaderValue::from_static("ntrip/2.0"),
126 );
127
128 let client = reqwest::ClientBuilder::new()
129 .tcp_keepalive(std::time::Duration::from_secs(5))
130 .default_headers(headers)
131 .user_agent(format!(
132 "NTRIP {}/{}",
133 env!("CARGO_PKG_NAME"),
134 env!("CARGO_PKG_VERSION")
135 ))
136 .build()?;
137
138 let max_backoff_duration = opts.max_backoff_duration;
139 let timeout = opts.timeout;
140 let response = establish_connection(
141 &client,
142 &request_url,
143 user_pass.as_ref(),
144 max_backoff_duration,
145 )
146 .await?;
147
148 Ok(Self {
149 request_url,
150 user_pass,
151 client,
152 timeout,
153 max_backoff_duration,
154
155 response,
156 })
157 }
158
159 pub async fn chunk(&mut self) -> Result<bytes::Bytes> {
161 if let Some(duration) = self.timeout {
162 self.next_chunk_with_timeout(duration).await
163 } else {
164 self.next_chunk_infinite_wait().await
165 }
166 }
167
168 async fn next_chunk_with_timeout(
169 &mut self,
170 duration: std::time::Duration,
171 ) -> Result<bytes::Bytes> {
172 match tokio::time::timeout(duration, self.next_chunk_infinite_wait()).await {
173 Ok(next) => next, Err(_) => {
175 tracing::warn!("Reconnecting due to timeout elapsed.");
176 self.reconnect_and_get_first_chunk().await
177 }
178 }
179 }
180
181 async fn next_chunk_infinite_wait(&mut self) -> Result<bytes::Bytes> {
182 match self.response.chunk().await {
183 Ok(Some(next)) => Ok(next), Ok(None) => {
185 tracing::warn!("Reconnecting due to end of HTTP stream.");
186 self.reconnect_and_get_first_chunk().await
187 }
188 Err(_) => {
189 tracing::warn!("Reconnecting due to error with HTTP stream.");
190 self.reconnect_and_get_first_chunk().await
191 }
192 }
193 }
194
195 async fn reconnect_and_get_first_chunk(&mut self) -> Result<bytes::Bytes> {
196 let response = establish_connection(
197 &self.client,
198 &self.request_url,
199 self.user_pass.as_ref(),
200 self.max_backoff_duration,
201 )
202 .await?;
203 self.response = response;
204 let chunk = self
205 .response
206 .chunk()
207 .await?
208 .ok_or_else(|| eyre::eyre!("Could not get first chunk"))?;
209 Ok(chunk)
210 }
211}
212
213async fn establish_connection(
214 client: &reqwest::Client,
215 request_url: &str,
216 user_pass: Option<&(String, String)>,
217 max_backoff_duration: std::time::Duration,
218) -> Result<reqwest::Response> {
219 let mut backoff = std::time::Duration::from_secs(1);
220 loop {
221 tracing::info!("Establishing connection to {request_url}.");
222 let mut req_builder = client.get(request_url);
223 if let Some((username, password)) = &user_pass {
224 req_builder = req_builder.basic_auth(username, Some(password));
225 }
226 let result_response = req_builder.send().await;
227 match result_response {
228 Ok(response) => {
229 tracing::debug!("Sent request");
230
231 if !response.status().is_success() {
232 eyre::bail!("Error getting NTRIP URL: HTTP status {}", response.status());
233 }
234
235 return Ok(response);
236 }
237 Err(e) => {
238 let error = eyre::Report::from(e);
239 let mut err_msg = format!("Could not open NTRIP URL: {error}");
240 for cause in error.chain() {
241 err_msg = format!("{err_msg}\n cause: {cause}");
242 }
243 tracing::warn!("{err_msg}");
244
245 tokio::time::sleep(backoff).await;
246 backoff = min_dur(backoff * 2, max_backoff_duration);
247 }
248 }
249 }
250}
251
252fn min_dur(a: std::time::Duration, b: std::time::Duration) -> std::time::Duration {
253 if a < b { a } else { b }
254}
255
256fn parse_authority(auth: &http::uri::Authority) -> Result<(String, Option<(String, String)>)> {
257 let auth_vec = auth.as_str().split("@").collect::<Vec<_>>();
259 match auth_vec.len() {
260 1 => {
261 let host_port = auth_vec[0].to_string();
263 Ok((host_port, None))
264 }
265 2 => {
266 let user_pass = auth_vec[0];
268 let host_port = auth_vec[1].to_string();
269 let up = user_pass.split(":").collect::<Vec<_>>();
270 if up.len() != 2 {
271 eyre::bail!("Could not parse username and password from URL");
272 }
273 let username = up[0].to_string();
274 let password = up[1].to_string();
275 Ok((host_port, Some((username, password))))
276 }
277 _ => {
278 eyre::bail!("Expected zero or one '@' symbols in authority");
279 }
280 }
281}
282
283#[test]
284fn test_parse_example_url() {
285 let uri: http::Uri = "ntrip://hostname.com:2101/mountpoint".parse().unwrap();
286 let my_scheme = http::uri::Scheme::from_str("ntrip").unwrap();
287 assert_eq!(uri.scheme(), Some(&my_scheme));
288 let authority = uri.authority().unwrap();
289 assert_eq!(authority.host(), "hostname.com");
290 assert_eq!(authority.port_u16(), Some(2101));
291 let (host_port, user_pass) = parse_authority(&authority).unwrap();
292 assert!(user_pass.is_none());
293 assert_eq!(host_port, "hostname.com:2101");
294 let path_and_query = uri.path_and_query().unwrap();
295 assert_eq!(path_and_query.path(), "/mountpoint");
296}
297
298#[test]
299fn test_parse_example_url_with_user_pass() {
300 let uri: http::Uri = "ntrip://username:password@hostname.com:2101/mountpoint"
301 .parse()
302 .unwrap();
303 let my_scheme = http::uri::Scheme::from_str("ntrip").unwrap();
304 assert_eq!(uri.scheme(), Some(&my_scheme));
305 let authority = uri.authority().unwrap();
306 assert_eq!(authority.host(), "hostname.com");
307 assert_eq!(authority.port_u16(), Some(2101));
308 let (host_port, user_pass) = parse_authority(&authority).unwrap();
309 let (username, password) = user_pass.unwrap();
310 assert_eq!(username, "username");
311 assert_eq!(password, "password");
312 assert_eq!(host_port, "hostname.com:2101");
313 let path_and_query = uri.path_and_query().unwrap();
314 assert_eq!(path_and_query.path(), "/mountpoint");
315}
316
317pub struct FrameData {
319 frame_data: bytes::BytesMut,
320 message_number: u16,
321}
322
323impl FrameData {
324 pub fn frame_data(&self) -> &[u8] {
326 &self.frame_data
327 }
328 pub fn message_number(&self) -> u16 {
330 self.message_number
331 }
332}
333
334impl From<FrameData> for Vec<u8> {
335 fn from(val: FrameData) -> Self {
336 val.frame_data.into()
337 }
338}
339
340pub struct ParsingNtripClient {
342 client: RobustNtripClient,
343 buf: bytes::BytesMut,
344}
345
346impl ParsingNtripClient {
347 pub fn new(client: RobustNtripClient) -> Self {
349 let buf = bytes::BytesMut::new();
350 Self { client, buf }
351 }
352
353 pub async fn next(&mut self) -> Result<FrameData> {
355 loop {
356 let mut advance_info = None;
357 for (i, start_byte) in (&self.buf).into_iter().enumerate() {
358 if *start_byte == 0xd3 {
359 match rtcm_rs::MessageFrame::new(&self.buf[i..]) {
360 Ok(m) => {
361 tracing::debug!(
362 "Found RTCM message {} frame with length {}",
363 m.message_number().unwrap(),
364 m.frame_len()
365 );
366 advance_info = Some((
367 i,
368 false,
369 Some((m.frame_len(), m.message_number().unwrap())),
370 ));
371 break;
372 }
373 Err(rtcm_rs::rtcm_error::RtcmError::Incomplete) => {
374 advance_info = Some((i, true, None)); break;
376 }
377 Err(rtcm_rs::rtcm_error::RtcmError::NotValid) => {
378 advance_info = Some((i + 1, false, None)); break;
380 }
381 _ => unreachable!(),
382 }
383 }
384 }
385
386 let (n_discard, do_read_more, msg_info) = if let Some(x) = advance_info {
387 x
388 } else {
389 (self.buf.len(), true, None)
391 };
392
393 let _discard_bytes = self.buf.split_to(n_discard);
394 if let Some((frame_len, message_number)) = msg_info {
395 assert!(!do_read_more);
396 let frame_data = self.buf.split_to(frame_len);
397 return Ok(FrameData {
398 frame_data,
399 message_number,
400 });
401 }
402
403 if do_read_more {
404 let this_buf = self.client.chunk().await?;
406 self.buf.extend_from_slice(&this_buf);
407 }
408 }
409 }
410}