1use thiserror::Error;
2
3mod credentials;
4use credentials::NTRIPCredentials;
5
6use tokio::{
15 io::{AsyncReadExt, AsyncWriteExt},
16 net::TcpStream,
17};
18
19use rtcm_rs::next_msg_frame;
20
21#[derive(Debug, Error)]
22pub enum NTRIPClientError {
23 #[error("I/O error: {0}")]
24 IoError(#[from] std::io::Error),
25
26 #[error("failed to connect to server")]
27 Connection,
28
29 #[error("failed to send data to server")]
30 Send,
31
32 #[error("invalid response from server")]
33 BadResponse,
34}
35
36#[cfg(feature = "log")]
37use log::{error, info};
38
39#[derive(Clone)]
42pub struct NTRIPClient {
43 host: String,
45
46 port: u16,
48
49 mountpoint: String,
51
52 credentials: Option<NTRIPCredentials>,
54}
55
56impl NTRIPClient {
57 const GET_ICY_RESPONSE: &str = "ICY 200 OK\r\n";
58 const GET_HTTPOK_RESPONSE: &str = "HTTP/1.1 200 OK\r\n";
59
60 pub fn new(host: &str, port: u16, mountpoint: &str) -> Self {
66 Self {
67 port,
68 credentials: None,
69 host: host.to_string(),
70 mountpoint: mountpoint.to_string(),
71 }
72 }
73
74 pub fn with_credentials(&self, user: &str, password: &str) -> Self {
76 let mut s = self.clone();
77 s.credentials = Some(NTRIPCredentials::new(user, password));
78 s
79 }
80
81 pub fn set_credentials(&mut self, user: &str, password: &str) {
83 self.credentials = Some(NTRIPCredentials::new(user, password));
84 }
85
86 pub async fn run(&mut self) -> Result<(), NTRIPClientError> {
88 let mut ptr = 0;
89
90 let mut buffer = [0u8; 1024];
91
92 let get_icy_response_len: usize = Self::GET_ICY_RESPONSE.len();
93 let get_httpok_response_len = Self::GET_HTTPOK_RESPONSE.len();
94
95 let pkg_version = env!("CARGO_PKG_VERSION");
96
97 #[cfg(feature = "log")]
98 let mut stream = TcpStream::connect((self.host.as_str(), self.port))
99 .await
100 .map_err(|e| {
101 error!("connection failed with: {}", e);
102 NTRIPClientError::Connection
103 })?;
104
105 #[cfg(not(feature = "log"))]
106 let mut stream = TcpStream::connect((self.host.as_str(), self.port))
107 .await
108 .map_err(|_| NTRIPClientError::Connection)?;
109
110 let mut request = format!(
112 "GET /{} HTTP/1.0\r\n
113 Host: {}\r\nNtrip-version: Ntrip/2.0\r\n
114 User-Agent: rtk-rs/ntrip-client v{}\r\n
115 Connection: close\r\n
116 Accept: */*\r\n",
117 self.mountpoint, self.host, pkg_version,
118 );
119
120 if let Some(creds) = &self.credentials {
121 request.push_str(&format!("Authorization: Basic{}\r\n", &creds.encode()));
122 }
123
124 #[cfg(feature = "log")]
125 stream.write_all(request.as_bytes()).await.map_err(|e| {
126 #[cfg(feature = "log")]
127 error!("write error: {}", e);
128 NTRIPClientError::Send
129 })?;
130
131 #[cfg(not(feature = "log"))]
132 stream
133 .write_all(request.as_bytes())
134 .await
135 .map_err(|_| NTRIPClientError::Send)?;
136
137 loop {
139 let size = stream.read(&mut buffer[ptr..]).await?;
140 if size == 0 {
141 break;
142 }
143 ptr += size;
144 }
145
146 if ptr < get_icy_response_len && ptr < get_httpok_response_len {
147 #[cfg(feature = "log")]
148 error!("invalid server response");
149 return Err(NTRIPClientError::BadResponse);
150 }
151
152 let response = String::from_utf8_lossy(&buffer[..ptr]);
153
154 if !response.starts_with(Self::GET_ICY_RESPONSE) {
155 if !response.starts_with(Self::GET_HTTPOK_RESPONSE) {
156 println!("invalid response from server: \"{}\"", response);
158 return Err(NTRIPClientError::BadResponse);
159 }
160 }
161
162 #[cfg(feature = "log")]
163 info!(
164 "rtk-rs/ntrip-client v{} - connected to {}",
165 pkg_version, self.host
166 );
167
168 loop {
169 ptr = 0;
170 let size = stream.read(&mut buffer[ptr..]).await?;
171
172 if size == 0 {
173 #[cfg(feature = "log")]
174 error!("{} - connectoion closed", self.host);
175 return Ok(());
176 }
177
178 loop {
179 let (consumed, msg) = next_msg_frame(&buffer[ptr..]);
180
181 if consumed == 0 {
182 break;
183 }
184
185 ptr += consumed;
186
187 if let Some(msg) = msg {
188 println!("Found {:?}", msg.get_message());
189 }
190 }
191 }
192 }
193}
194
195