ntrip_client/
lib.rs

1use thiserror::Error;
2
3mod credentials;
4use credentials::NTRIPCredentials;
5
6// use std::io::{Read, Write};
7
8// use crate::coordinate::Coordinate;
9// use crate::tcp_handler::TcpHandler;
10// use base64::Engine as _;
11// use std::io::{Read, Write};
12// use std::time::{Duration, Instant};
13
14use tokio::{
15    io::{AsyncReadExt, AsyncWriteExt},
16    net::TcpStream,
17};
18
19use rtcm_rs::next_msg_frame;
20
21#[derive(Debug, Error)]
22pub enum NTRIPClientError {
23    #[error("I/O error: {0}")]
24    IoError(#[from] std::io::Error),
25
26    #[error("failed to connect to server")]
27    Connection,
28
29    #[error("failed to send data to server")]
30    Send,
31
32    #[error("invalid response from server")]
33    BadResponse,
34}
35
36#[cfg(feature = "log")]
37use log::{error, info};
38
39/// [NTRIPClient] allows to connect to a remote NTRIP server (v1 and v2 both supported),
40/// to receiver RTCM messages. [NTRIPClient] supports both V1 and V2 NTRIP.
41#[derive(Clone)]
42pub struct NTRIPClient {
43    /// Host (url)
44    host: String,
45
46    /// Network port
47    port: u16,
48
49    /// Name of the mountpoint
50    mountpoint: String,
51
52    /// Optional [NTRIPCredentials]
53    credentials: Option<NTRIPCredentials>,
54}
55
56impl NTRIPClient {
57    const GET_ICY_RESPONSE: &str = "ICY 200 OK\r\n";
58    const GET_HTTPOK_RESPONSE: &str = "HTTP/1.1 200 OK\r\n";
59
60    /// Creates a new [NTRIPClient]
61    /// ## Input
62    /// - host: url
63    /// - port: network port
64    /// - mountpoint: remote NTRIP "mountpoint"
65    pub fn new(host: &str, port: u16, mountpoint: &str) -> Self {
66        Self {
67            port,
68            credentials: None,
69            host: host.to_string(),
70            mountpoint: mountpoint.to_string(),
71        }
72    }
73
74    /// Update [NTRIPClient] with desired credentials
75    pub fn with_credentials(&self, user: &str, password: &str) -> Self {
76        let mut s = self.clone();
77        s.credentials = Some(NTRIPCredentials::new(user, password));
78        s
79    }
80
81    /// Define [NTRIPClient] credentials, with mutable access.
82    pub fn set_credentials(&mut self, user: &str, password: &str) {
83        self.credentials = Some(NTRIPCredentials::new(user, password));
84    }
85
86    /// Deploy this [NTRIPClient] using tokio framework.
87    pub async fn run(&mut self) -> Result<(), NTRIPClientError> {
88        let mut ptr = 0;
89
90        let mut buffer = [0u8; 1024];
91
92        let get_icy_response_len: usize = Self::GET_ICY_RESPONSE.len();
93        let get_httpok_response_len = Self::GET_HTTPOK_RESPONSE.len();
94
95        let pkg_version = env!("CARGO_PKG_VERSION");
96
97        #[cfg(feature = "log")]
98        let mut stream = TcpStream::connect((self.host.as_str(), self.port))
99            .await
100            .map_err(|e| {
101                error!("connection failed with: {}", e);
102                NTRIPClientError::Connection
103            })?;
104
105        #[cfg(not(feature = "log"))]
106        let mut stream = TcpStream::connect((self.host.as_str(), self.port))
107            .await
108            .map_err(|_| NTRIPClientError::Connection)?;
109
110        // initial $GET request
111        let mut request = format!(
112            "GET /{} HTTP/1.0\r\n
113            Host: {}\r\nNtrip-version: Ntrip/2.0\r\n
114            User-Agent: rtk-rs/ntrip-client v{}\r\n
115            Connection: close\r\n
116            Accept: */*\r\n",
117            self.mountpoint, self.host, pkg_version,
118        );
119
120        if let Some(creds) = &self.credentials {
121            request.push_str(&format!("Authorization: Basic{}\r\n", &creds.encode()));
122        }
123
124        #[cfg(feature = "log")]
125        stream.write_all(request.as_bytes()).await.map_err(|e| {
126            #[cfg(feature = "log")]
127            error!("write error: {}", e);
128            NTRIPClientError::Send
129        })?;
130
131        #[cfg(not(feature = "log"))]
132        stream
133            .write_all(request.as_bytes())
134            .await
135            .map_err(|_| NTRIPClientError::Send)?;
136
137        // response verification
138        loop {
139            let size = stream.read(&mut buffer[ptr..]).await?;
140            if size == 0 {
141                break;
142            }
143            ptr += size;
144        }
145
146        if ptr < get_icy_response_len && ptr < get_httpok_response_len {
147            #[cfg(feature = "log")]
148            error!("invalid server response");
149            return Err(NTRIPClientError::BadResponse);
150        }
151
152        let response = String::from_utf8_lossy(&buffer[..ptr]);
153
154        if !response.starts_with(Self::GET_ICY_RESPONSE) {
155            if !response.starts_with(Self::GET_HTTPOK_RESPONSE) {
156                // #[cfg(feature = "log")]
157                println!("invalid response from server: \"{}\"", response);
158                return Err(NTRIPClientError::BadResponse);
159            }
160        }
161
162        #[cfg(feature = "log")]
163        info!(
164            "rtk-rs/ntrip-client v{} - connected to {}",
165            pkg_version, self.host
166        );
167
168        loop {
169            ptr = 0;
170            let size = stream.read(&mut buffer[ptr..]).await?;
171
172            if size == 0 {
173                #[cfg(feature = "log")]
174                error!("{} - connectoion closed", self.host);
175                return Ok(());
176            }
177
178            loop {
179                let (consumed, msg) = next_msg_frame(&buffer[ptr..]);
180
181                if consumed == 0 {
182                    break;
183                }
184
185                ptr += consumed;
186
187                if let Some(msg) = msg {
188                    println!("Found {:?}", msg.get_message());
189                }
190            }
191        }
192    }
193}
194
195// #[cfg(test)]
196// mod test {
197//     use crate::NTRIPClient;
198//
199//     #[tokio::test]
200//     async fn test_simple_connection() {
201//         let mut client = NTRIPClient::new("caster.centipede.fr", 2101, "ENSMM")
202//             .with_credentials("centipede", "centipede");
203//
204//         client.run().await.unwrap_or_else(|e| {
205//             panic!("run() failed with {}", e);
206//         });
207//     }
208// }