1use crate::ntrip::chunk::ChunkedDecoder;
2use crate::ntrip::gga::{format_gga, GgaPosition};
3use crate::ntrip::request::{NtripConfig, NtripVersion};
4use crate::ntrip::response::{classify_http_response, HttpClassification, NtripRejection};
5use crate::ntrip::sourcetable::{parse_sourcetable, Sourcetable};
6use crate::Result;
7
8const MAX_LINE: usize = 8 * 1024;
9const MAX_HEADER_BLOCK: usize = 64 * 1024;
10const MAX_SOURCETABLE: usize = 4 * 1024 * 1024;
11const PREFIX_LIMIT: usize = 256;
12
13#[derive(Clone, Copy, Debug, PartialEq, Eq)]
14pub enum NtripState {
15 Idle,
16 AwaitingStatus,
17 AwaitingHeaders,
18 Streaming,
19 Sourcetable,
20 Closed,
21}
22
23#[derive(Clone, Debug, PartialEq, Eq)]
24pub struct NtripHandshake {
25 pub version: NtripVersion,
26 pub chunked: bool,
27 pub headers: Vec<(String, String)>,
28}
29
30#[derive(Clone, Debug, PartialEq)]
31pub enum NtripEvent {
32 Connected(NtripHandshake),
33 Payload(Vec<u8>),
34 Sourcetable(Sourcetable),
35 Rejected(NtripRejection),
36 StreamCorrupted { detail: String },
37 StreamEnded,
38}
39
40#[derive(Clone, Debug)]
41pub struct NtripClientMachine {
42 config: NtripConfig,
43 state: NtripState,
44 carry: Vec<u8>,
45 headers: Vec<(String, String)>,
46 header_bytes: usize,
47 status_version: Option<NtripVersion>,
48 http_status: Option<u16>,
49 http_reason: String,
50 chunked: bool,
51 pending_icy_blank_line: bool,
52 sourcetable_chunked: bool,
53 chunk_decoder: ChunkedDecoder,
54 sourcetable_text: String,
55 sourcetable_carry: Vec<u8>,
56 sourcetable_records_started: bool,
57 last_gga_s: Option<f64>,
58}
59
60impl NtripClientMachine {
61 pub fn new(config: NtripConfig) -> Self {
62 Self {
63 config,
64 state: NtripState::Idle,
65 carry: Vec::new(),
66 headers: Vec::new(),
67 header_bytes: 0,
68 status_version: None,
69 http_status: None,
70 http_reason: String::new(),
71 chunked: false,
72 pending_icy_blank_line: false,
73 sourcetable_chunked: false,
74 chunk_decoder: ChunkedDecoder::new(),
75 sourcetable_text: String::new(),
76 sourcetable_carry: Vec::new(),
77 sourcetable_records_started: false,
78 last_gga_s: None,
79 }
80 }
81
82 pub fn connection_request(&mut self) -> Result<Vec<u8>> {
83 let bytes = self.config.request_bytes()?;
84 self.state = NtripState::AwaitingStatus;
85 Ok(bytes)
86 }
87
88 pub fn push(&mut self, bytes: &[u8]) -> Vec<NtripEvent> {
89 let mut events = Vec::new();
90 if matches!(self.state, NtripState::Closed) {
91 return events;
92 }
93 self.carry.extend_from_slice(bytes);
94
95 loop {
96 match self.state {
97 NtripState::Idle => {
98 self.state = NtripState::AwaitingStatus;
99 }
100 NtripState::AwaitingStatus => {
101 if !self.parse_status(&mut events) {
102 break;
103 }
104 }
105 NtripState::AwaitingHeaders => {
106 if !self.parse_headers(&mut events) {
107 break;
108 }
109 }
110 NtripState::Streaming => {
111 self.drain_payload(&mut events);
112 break;
113 }
114 NtripState::Sourcetable => {
115 if !self.drain_sourcetable(&mut events) {
116 break;
117 }
118 }
119 NtripState::Closed => break,
120 }
121 }
122
123 events
124 }
125
126 pub fn gga_message(
127 &mut self,
128 now_s: f64,
129 position: &GgaPosition,
130 utc_seconds_of_day: f64,
131 ) -> Option<Vec<u8>> {
132 self.try_gga_message(now_s, position, utc_seconds_of_day)
133 .ok()
134 .flatten()
135 }
136
137 pub fn try_gga_message(
138 &mut self,
139 now_s: f64,
140 position: &GgaPosition,
141 utc_seconds_of_day: f64,
142 ) -> Result<Option<Vec<u8>>> {
143 if self.state != NtripState::Streaming {
144 return Ok(None);
145 }
146 let Some(interval) = self.config.gga_interval_s else {
147 return Ok(None);
148 };
149 if !now_s.is_finite() {
150 return Ok(None);
151 }
152 let due = match self.last_gga_s {
153 None => true,
154 Some(last) if now_s >= last => now_s - last >= interval,
155 Some(_) => false,
156 };
157 if !due {
158 return Ok(None);
159 }
160 let bytes = format_gga(position, utc_seconds_of_day)?;
161 self.last_gga_s = Some(now_s);
162 Ok(Some(bytes))
163 }
164
165 pub fn state(&self) -> NtripState {
166 self.state
167 }
168
169 pub fn reset(&mut self) {
170 let config = self.config.clone();
171 *self = Self::new(config);
172 }
173
174 fn parse_status(&mut self, events: &mut Vec<NtripEvent>) -> bool {
175 let Some(line) = self.take_line_or_reject(events) else {
176 return false;
177 };
178 let text = String::from_utf8_lossy(&line).trim().to_string();
179
180 if let Some(rest) = text.strip_prefix("ERROR - ") {
181 let reason = rest.to_string();
182 let rejection = if reason.to_ascii_lowercase().contains("password") {
183 NtripRejection::Unauthorized
184 } else {
185 NtripRejection::CasterError { reason }
186 };
187 self.reject(events, rejection);
188 return true;
189 }
190
191 if let Some((status, reason)) = parse_prefixed_status(&text, "ICY") {
192 if status == 200 {
193 self.state = NtripState::Streaming;
194 self.status_version = Some(NtripVersion::Rev1);
195 self.chunked = false;
196 self.pending_icy_blank_line = true;
197 events.push(NtripEvent::Connected(NtripHandshake {
198 version: NtripVersion::Rev1,
199 chunked: false,
200 headers: Vec::new(),
201 }));
202 return true;
203 }
204 self.reject(
205 events,
206 NtripRejection::HttpError {
207 status,
208 reason: reason.to_string(),
209 },
210 );
211 return true;
212 }
213
214 if let Some((status, reason)) = parse_prefixed_status(&text, "SOURCETABLE") {
215 if status == 200 {
216 self.state = NtripState::Sourcetable;
217 self.status_version = Some(NtripVersion::Rev1);
218 return true;
219 }
220 self.reject(
221 events,
222 NtripRejection::HttpError {
223 status,
224 reason: reason.to_string(),
225 },
226 );
227 return true;
228 }
229
230 if let Some((version, status, reason)) = parse_http_status(&text) {
231 self.status_version = Some(version);
232 self.headers.clear();
233 self.header_bytes = 0;
234 self.http_status = Some(status);
235 self.http_reason = reason.to_string();
236 self.state = NtripState::AwaitingHeaders;
237
238 return true;
239 }
240
241 self.reject(
242 events,
243 NtripRejection::MalformedHandshake {
244 prefix: self.prefix_with_line(&line),
245 },
246 );
247 true
248 }
249
250 fn parse_headers(&mut self, events: &mut Vec<NtripEvent>) -> bool {
251 let Some(line) = self.take_line_or_reject(events) else {
252 return false;
253 };
254 if line.is_empty() {
255 let status = self.http_status.unwrap_or(0);
256 match classify_http_response(status, &self.http_reason, &self.headers) {
257 HttpClassification::Stream { chunked } => {
258 self.chunked = chunked;
259 self.state = NtripState::Streaming;
260 events.push(NtripEvent::Connected(NtripHandshake {
261 version: self.status_version.unwrap_or(NtripVersion::Rev2),
262 chunked,
263 headers: self.headers.clone(),
264 }));
265 true
266 }
267 HttpClassification::Sourcetable { chunked } => {
268 self.state = NtripState::Sourcetable;
269 self.sourcetable_chunked = chunked;
270 self.sourcetable_records_started = true;
271 true
272 }
273 HttpClassification::Rejection(rejection) => {
274 self.reject(events, rejection);
275 true
276 }
277 }
278 } else {
279 self.header_bytes += line.len();
280 if self.header_bytes > MAX_HEADER_BLOCK {
281 self.reject_current_prefix(events);
282 return true;
283 }
284 if let Some((name, value)) = split_header(&line) {
285 self.headers.push((name, value));
286 }
287 true
288 }
289 }
290
291 fn drain_payload(&mut self, events: &mut Vec<NtripEvent>) {
292 if self.pending_icy_blank_line && !self.consume_pending_icy_blank_line() {
293 return;
294 }
295 if self.carry.is_empty() {
296 return;
297 }
298 let bytes: Vec<u8> = self.carry.drain(..).collect();
299 if self.chunked {
300 match self.chunk_decoder.push(&bytes) {
301 Ok(payload) => {
302 if !payload.is_empty() {
303 events.push(NtripEvent::Payload(payload));
304 }
305 if self.chunk_decoder.finished() {
306 self.state = NtripState::Closed;
307 events.push(NtripEvent::StreamEnded);
308 }
309 }
310 Err(err) => {
311 self.state = NtripState::Closed;
312 events.push(NtripEvent::StreamCorrupted {
313 detail: err.to_string(),
314 });
315 }
316 }
317 } else {
318 events.push(NtripEvent::Payload(bytes));
319 }
320 }
321
322 fn drain_sourcetable(&mut self, events: &mut Vec<NtripEvent>) -> bool {
323 if self.sourcetable_chunked {
324 if !self.carry.is_empty() {
325 let bytes: Vec<u8> = self.carry.drain(..).collect();
326 match self.chunk_decoder.push(&bytes) {
327 Ok(decoded) => self.sourcetable_carry.extend_from_slice(&decoded),
328 Err(err) => {
329 self.state = NtripState::Closed;
330 events.push(NtripEvent::StreamCorrupted {
331 detail: err.to_string(),
332 });
333 return true;
334 }
335 }
336 }
337 if self.drain_sourcetable_lines(events) {
338 return true;
339 }
340 if self.chunk_decoder.finished() {
341 if !self.sourcetable_carry.is_empty() {
342 let line: Vec<u8> = self.sourcetable_carry.drain(..).collect();
343 if self.push_sourcetable_line(&line, events) {
344 return true;
345 }
346 }
347 self.finish_sourcetable(events);
348 return true;
349 }
350 false
351 } else {
352 self.drain_sourcetable_lines(events)
353 }
354 }
355
356 pub fn finish(&mut self) -> Vec<NtripEvent> {
357 let mut events = Vec::new();
358 if self.state == NtripState::Sourcetable {
359 if self.sourcetable_chunked {
360 if !self.sourcetable_carry.is_empty() {
361 let line: Vec<u8> = self.sourcetable_carry.drain(..).collect();
362 self.push_sourcetable_line(&line, &mut events);
363 }
364 } else if !self.carry.is_empty() {
365 let line: Vec<u8> = self.carry.drain(..).collect();
366 self.push_sourcetable_line(&line, &mut events);
367 }
368 self.finish_sourcetable(&mut events);
369 }
370 events
371 }
372
373 fn drain_sourcetable_lines(&mut self, events: &mut Vec<NtripEvent>) -> bool {
374 loop {
375 let line = if self.sourcetable_chunked {
376 take_line_from(&mut self.sourcetable_carry)
377 } else {
378 take_line_from(&mut self.carry)
379 };
380 let Some(line) = line else {
381 let len = if self.sourcetable_chunked {
382 self.sourcetable_carry.len()
383 } else {
384 self.carry.len()
385 };
386 if len > MAX_LINE {
387 self.reject_current_prefix(events);
388 return true;
389 }
390 return false;
391 };
392
393 if self.push_sourcetable_line(&line, events) {
394 return true;
395 }
396 }
397 }
398
399 fn push_sourcetable_line(&mut self, line: &[u8], events: &mut Vec<NtripEvent>) -> bool {
400 let text = String::from_utf8_lossy(line).to_string();
401 let first = text.split(';').next().unwrap_or("").trim();
402
403 if !self.sourcetable_records_started {
404 if text.is_empty() {
405 return false;
406 }
407 if is_sourcetable_record_start(first) {
408 self.sourcetable_records_started = true;
409 } else if text.contains(':') {
410 return false;
411 } else {
412 self.sourcetable_records_started = true;
413 }
414 }
415
416 self.sourcetable_text.push_str(&text);
417 self.sourcetable_text.push_str("\r\n");
418 if self.sourcetable_text.len() > MAX_SOURCETABLE {
419 self.reject_current_prefix(events);
420 return true;
421 }
422 if first.eq_ignore_ascii_case("ENDSOURCETABLE") {
423 self.finish_sourcetable(events);
424 return true;
425 }
426 false
427 }
428
429 fn finish_sourcetable(&mut self, events: &mut Vec<NtripEvent>) {
430 match parse_sourcetable(&self.sourcetable_text) {
431 Ok(table) => events.push(NtripEvent::Sourcetable(table)),
432 Err(err) => events.push(NtripEvent::StreamCorrupted {
433 detail: err.to_string(),
434 }),
435 }
436 self.state = NtripState::Closed;
437 }
438
439 fn take_line_or_reject(&mut self, events: &mut Vec<NtripEvent>) -> Option<Vec<u8>> {
440 if let Some(pos) = self.carry.iter().position(|&b| b == b'\n') {
441 let mut line: Vec<u8> = self.carry.drain(..=pos).collect();
442 if line.ends_with(b"\n") {
443 line.pop();
444 }
445 if line.ends_with(b"\r") {
446 line.pop();
447 }
448 Some(line)
449 } else if self.carry.len() > MAX_LINE {
450 self.reject_current_prefix(events);
451 None
452 } else {
453 None
454 }
455 }
456
457 fn consume_pending_icy_blank_line(&mut self) -> bool {
458 if self.carry.is_empty() {
459 return false;
460 }
461 if self.carry[0] == b'\n' {
462 self.carry.drain(..1);
463 self.pending_icy_blank_line = false;
464 return true;
465 }
466 if self.carry[0] == b'\r' {
467 if self.carry.len() == 1 {
468 return false;
469 }
470 if self.carry[1] == b'\n' {
471 self.carry.drain(..2);
472 }
473 self.pending_icy_blank_line = false;
474 return true;
475 }
476 self.pending_icy_blank_line = false;
477 true
478 }
479
480 fn reject_current_prefix(&mut self, events: &mut Vec<NtripEvent>) {
481 let prefix = self.carry.iter().copied().take(PREFIX_LIMIT).collect();
482 self.reject(events, NtripRejection::MalformedHandshake { prefix });
483 }
484
485 fn reject(&mut self, events: &mut Vec<NtripEvent>, rejection: NtripRejection) {
486 self.state = NtripState::Closed;
487 events.push(NtripEvent::Rejected(rejection));
488 }
489
490 fn prefix_with_line(&self, line: &[u8]) -> Vec<u8> {
491 let mut prefix = line.to_vec();
492 prefix.extend_from_slice(&self.carry);
493 prefix.truncate(PREFIX_LIMIT);
494 prefix
495 }
496}
497
498fn parse_prefixed_status<'a>(text: &'a str, prefix: &str) -> Option<(u16, &'a str)> {
499 let rest = text.strip_prefix(prefix)?.trim_start();
500 let mut parts = rest.splitn(2, char::is_whitespace);
501 let status = parts.next()?.parse().ok()?;
502 let reason = parts.next().unwrap_or("").trim();
503 Some((status, reason))
504}
505
506fn parse_http_status(text: &str) -> Option<(NtripVersion, u16, &str)> {
507 let rest = if let Some(rest) = text.strip_prefix("HTTP/1.1 ") {
508 (NtripVersion::Rev2, rest)
509 } else if let Some(rest) = text.strip_prefix("HTTP/1.0 ") {
510 (NtripVersion::Rev1, rest)
511 } else {
512 return None;
513 };
514 let mut parts = rest.1.splitn(2, char::is_whitespace);
515 let status = parts.next()?.parse().ok()?;
516 let reason = parts.next().unwrap_or("").trim();
517 Some((rest.0, status, reason))
518}
519
520fn split_header(line: &[u8]) -> Option<(String, String)> {
521 let pos = line.iter().position(|&b| b == b':')?;
522 let name = String::from_utf8_lossy(&line[..pos]).trim().to_string();
523 let value_bytes = if line.get(pos + 1) == Some(&b' ') {
524 &line[pos + 2..]
525 } else {
526 &line[pos + 1..]
527 };
528 let value = String::from_utf8_lossy(value_bytes).to_string();
529 Some((name, value))
530}
531
532fn take_line_from(buffer: &mut Vec<u8>) -> Option<Vec<u8>> {
533 let pos = buffer.iter().position(|&b| b == b'\n')?;
534 let mut line: Vec<u8> = buffer.drain(..=pos).collect();
535 if line.ends_with(b"\n") {
536 line.pop();
537 }
538 if line.ends_with(b"\r") {
539 line.pop();
540 }
541 Some(line)
542}
543
544fn is_sourcetable_record_start(first: &str) -> bool {
545 ["STR", "CAS", "NET", "ENDSOURCETABLE"]
546 .iter()
547 .any(|tag| first.eq_ignore_ascii_case(tag))
548}