Skip to main content

sidereon_core/ntrip/
machine.rs

1use crate::ntrip::chunk::ChunkedDecoder;
2use crate::ntrip::gga::{format_gga, GgaPosition};
3use crate::ntrip::request::{NtripConfig, NtripVersion};
4use crate::ntrip::response::{classify_http_response, HttpClassification, NtripRejection};
5use crate::ntrip::sourcetable::{parse_sourcetable, Sourcetable};
6use crate::Result;
7
/// Most bytes (8 KiB) that may sit in a buffer without a line terminator
/// before the input is rejected.
const MAX_LINE: usize = 8 << 10;
/// Cap (64 KiB) on the summed length of all response header lines.
const MAX_HEADER_BLOCK: usize = 64 << 10;
/// Cap (4 MiB) on the accumulated sourcetable text.
const MAX_SOURCETABLE: usize = 4 << 20;
/// Number of leading bytes copied into a malformed-handshake rejection.
const PREFIX_LIMIT: usize = 256;
12
/// Phase of the NTRIP client state machine.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum NtripState {
    /// Freshly constructed or reset; nothing has been parsed yet.
    Idle,
    /// Waiting for the first (status) line of the response.
    AwaitingStatus,
    /// An HTTP status line was parsed; header lines are being collected.
    AwaitingHeaders,
    /// Handshake accepted; received bytes are forwarded as payload.
    Streaming,
    /// Sourcetable lines are being collected.
    Sourcetable,
    /// Terminal state; further pushed bytes are ignored.
    Closed,
}
22
23#[derive(Clone, Debug, PartialEq, Eq)]
24pub struct NtripHandshake {
25    pub version: NtripVersion,
26    pub chunked: bool,
27    pub headers: Vec<(String, String)>,
28}
29
30#[derive(Clone, Debug, PartialEq)]
31pub enum NtripEvent {
32    Connected(NtripHandshake),
33    Payload(Vec<u8>),
34    Sourcetable(Sourcetable),
35    Rejected(NtripRejection),
36    StreamCorrupted { detail: String },
37    StreamEnded,
38}
39
40#[derive(Clone, Debug)]
41pub struct NtripClientMachine {
42    config: NtripConfig,
43    state: NtripState,
44    carry: Vec<u8>,
45    headers: Vec<(String, String)>,
46    header_bytes: usize,
47    status_version: Option<NtripVersion>,
48    http_status: Option<u16>,
49    http_reason: String,
50    chunked: bool,
51    pending_icy_blank_line: bool,
52    sourcetable_chunked: bool,
53    chunk_decoder: ChunkedDecoder,
54    sourcetable_text: String,
55    sourcetable_carry: Vec<u8>,
56    sourcetable_records_started: bool,
57    last_gga_s: Option<f64>,
58}
59
60impl NtripClientMachine {
61    pub fn new(config: NtripConfig) -> Self {
62        Self {
63            config,
64            state: NtripState::Idle,
65            carry: Vec::new(),
66            headers: Vec::new(),
67            header_bytes: 0,
68            status_version: None,
69            http_status: None,
70            http_reason: String::new(),
71            chunked: false,
72            pending_icy_blank_line: false,
73            sourcetable_chunked: false,
74            chunk_decoder: ChunkedDecoder::new(),
75            sourcetable_text: String::new(),
76            sourcetable_carry: Vec::new(),
77            sourcetable_records_started: false,
78            last_gga_s: None,
79        }
80    }
81
82    pub fn connection_request(&mut self) -> Result<Vec<u8>> {
83        let bytes = self.config.request_bytes()?;
84        self.state = NtripState::AwaitingStatus;
85        Ok(bytes)
86    }
87
88    pub fn push(&mut self, bytes: &[u8]) -> Vec<NtripEvent> {
89        let mut events = Vec::new();
90        if matches!(self.state, NtripState::Closed) {
91            return events;
92        }
93        self.carry.extend_from_slice(bytes);
94
95        loop {
96            match self.state {
97                NtripState::Idle => {
98                    self.state = NtripState::AwaitingStatus;
99                }
100                NtripState::AwaitingStatus => {
101                    if !self.parse_status(&mut events) {
102                        break;
103                    }
104                }
105                NtripState::AwaitingHeaders => {
106                    if !self.parse_headers(&mut events) {
107                        break;
108                    }
109                }
110                NtripState::Streaming => {
111                    self.drain_payload(&mut events);
112                    break;
113                }
114                NtripState::Sourcetable => {
115                    if !self.drain_sourcetable(&mut events) {
116                        break;
117                    }
118                }
119                NtripState::Closed => break,
120            }
121        }
122
123        events
124    }
125
126    pub fn gga_message(
127        &mut self,
128        now_s: f64,
129        position: &GgaPosition,
130        utc_seconds_of_day: f64,
131    ) -> Option<Vec<u8>> {
132        self.try_gga_message(now_s, position, utc_seconds_of_day)
133            .ok()
134            .flatten()
135    }
136
137    pub fn try_gga_message(
138        &mut self,
139        now_s: f64,
140        position: &GgaPosition,
141        utc_seconds_of_day: f64,
142    ) -> Result<Option<Vec<u8>>> {
143        if self.state != NtripState::Streaming {
144            return Ok(None);
145        }
146        let Some(interval) = self.config.gga_interval_s else {
147            return Ok(None);
148        };
149        if !now_s.is_finite() {
150            return Ok(None);
151        }
152        let due = match self.last_gga_s {
153            None => true,
154            Some(last) if now_s >= last => now_s - last >= interval,
155            Some(_) => false,
156        };
157        if !due {
158            return Ok(None);
159        }
160        let bytes = format_gga(position, utc_seconds_of_day)?;
161        self.last_gga_s = Some(now_s);
162        Ok(Some(bytes))
163    }
164
165    pub fn state(&self) -> NtripState {
166        self.state
167    }
168
169    pub fn reset(&mut self) {
170        let config = self.config.clone();
171        *self = Self::new(config);
172    }
173
174    fn parse_status(&mut self, events: &mut Vec<NtripEvent>) -> bool {
175        let Some(line) = self.take_line_or_reject(events) else {
176            return false;
177        };
178        let text = String::from_utf8_lossy(&line).trim().to_string();
179
180        if let Some(rest) = text.strip_prefix("ERROR - ") {
181            let reason = rest.to_string();
182            let rejection = if reason.to_ascii_lowercase().contains("password") {
183                NtripRejection::Unauthorized
184            } else {
185                NtripRejection::CasterError { reason }
186            };
187            self.reject(events, rejection);
188            return true;
189        }
190
191        if let Some((status, reason)) = parse_prefixed_status(&text, "ICY") {
192            if status == 200 {
193                self.state = NtripState::Streaming;
194                self.status_version = Some(NtripVersion::Rev1);
195                self.chunked = false;
196                self.pending_icy_blank_line = true;
197                events.push(NtripEvent::Connected(NtripHandshake {
198                    version: NtripVersion::Rev1,
199                    chunked: false,
200                    headers: Vec::new(),
201                }));
202                return true;
203            }
204            self.reject(
205                events,
206                NtripRejection::HttpError {
207                    status,
208                    reason: reason.to_string(),
209                },
210            );
211            return true;
212        }
213
214        if let Some((status, reason)) = parse_prefixed_status(&text, "SOURCETABLE") {
215            if status == 200 {
216                self.state = NtripState::Sourcetable;
217                self.status_version = Some(NtripVersion::Rev1);
218                return true;
219            }
220            self.reject(
221                events,
222                NtripRejection::HttpError {
223                    status,
224                    reason: reason.to_string(),
225                },
226            );
227            return true;
228        }
229
230        if let Some((version, status, reason)) = parse_http_status(&text) {
231            self.status_version = Some(version);
232            self.headers.clear();
233            self.header_bytes = 0;
234            self.http_status = Some(status);
235            self.http_reason = reason.to_string();
236            self.state = NtripState::AwaitingHeaders;
237
238            return true;
239        }
240
241        self.reject(
242            events,
243            NtripRejection::MalformedHandshake {
244                prefix: self.prefix_with_line(&line),
245            },
246        );
247        true
248    }
249
250    fn parse_headers(&mut self, events: &mut Vec<NtripEvent>) -> bool {
251        let Some(line) = self.take_line_or_reject(events) else {
252            return false;
253        };
254        if line.is_empty() {
255            let status = self.http_status.unwrap_or(0);
256            match classify_http_response(status, &self.http_reason, &self.headers) {
257                HttpClassification::Stream { chunked } => {
258                    self.chunked = chunked;
259                    self.state = NtripState::Streaming;
260                    events.push(NtripEvent::Connected(NtripHandshake {
261                        version: self.status_version.unwrap_or(NtripVersion::Rev2),
262                        chunked,
263                        headers: self.headers.clone(),
264                    }));
265                    true
266                }
267                HttpClassification::Sourcetable { chunked } => {
268                    self.state = NtripState::Sourcetable;
269                    self.sourcetable_chunked = chunked;
270                    self.sourcetable_records_started = true;
271                    true
272                }
273                HttpClassification::Rejection(rejection) => {
274                    self.reject(events, rejection);
275                    true
276                }
277            }
278        } else {
279            self.header_bytes += line.len();
280            if self.header_bytes > MAX_HEADER_BLOCK {
281                self.reject_current_prefix(events);
282                return true;
283            }
284            if let Some((name, value)) = split_header(&line) {
285                self.headers.push((name, value));
286            }
287            true
288        }
289    }
290
291    fn drain_payload(&mut self, events: &mut Vec<NtripEvent>) {
292        if self.pending_icy_blank_line && !self.consume_pending_icy_blank_line() {
293            return;
294        }
295        if self.carry.is_empty() {
296            return;
297        }
298        let bytes: Vec<u8> = self.carry.drain(..).collect();
299        if self.chunked {
300            match self.chunk_decoder.push(&bytes) {
301                Ok(payload) => {
302                    if !payload.is_empty() {
303                        events.push(NtripEvent::Payload(payload));
304                    }
305                    if self.chunk_decoder.finished() {
306                        self.state = NtripState::Closed;
307                        events.push(NtripEvent::StreamEnded);
308                    }
309                }
310                Err(err) => {
311                    self.state = NtripState::Closed;
312                    events.push(NtripEvent::StreamCorrupted {
313                        detail: err.to_string(),
314                    });
315                }
316            }
317        } else {
318            events.push(NtripEvent::Payload(bytes));
319        }
320    }
321
322    fn drain_sourcetable(&mut self, events: &mut Vec<NtripEvent>) -> bool {
323        if self.sourcetable_chunked {
324            if !self.carry.is_empty() {
325                let bytes: Vec<u8> = self.carry.drain(..).collect();
326                match self.chunk_decoder.push(&bytes) {
327                    Ok(decoded) => self.sourcetable_carry.extend_from_slice(&decoded),
328                    Err(err) => {
329                        self.state = NtripState::Closed;
330                        events.push(NtripEvent::StreamCorrupted {
331                            detail: err.to_string(),
332                        });
333                        return true;
334                    }
335                }
336            }
337            if self.drain_sourcetable_lines(events) {
338                return true;
339            }
340            if self.chunk_decoder.finished() {
341                if !self.sourcetable_carry.is_empty() {
342                    let line: Vec<u8> = self.sourcetable_carry.drain(..).collect();
343                    if self.push_sourcetable_line(&line, events) {
344                        return true;
345                    }
346                }
347                self.finish_sourcetable(events);
348                return true;
349            }
350            false
351        } else {
352            self.drain_sourcetable_lines(events)
353        }
354    }
355
356    pub fn finish(&mut self) -> Vec<NtripEvent> {
357        let mut events = Vec::new();
358        if self.state == NtripState::Sourcetable {
359            if self.sourcetable_chunked {
360                if !self.sourcetable_carry.is_empty() {
361                    let line: Vec<u8> = self.sourcetable_carry.drain(..).collect();
362                    self.push_sourcetable_line(&line, &mut events);
363                }
364            } else if !self.carry.is_empty() {
365                let line: Vec<u8> = self.carry.drain(..).collect();
366                self.push_sourcetable_line(&line, &mut events);
367            }
368            self.finish_sourcetable(&mut events);
369        }
370        events
371    }
372
373    fn drain_sourcetable_lines(&mut self, events: &mut Vec<NtripEvent>) -> bool {
374        loop {
375            let line = if self.sourcetable_chunked {
376                take_line_from(&mut self.sourcetable_carry)
377            } else {
378                take_line_from(&mut self.carry)
379            };
380            let Some(line) = line else {
381                let len = if self.sourcetable_chunked {
382                    self.sourcetable_carry.len()
383                } else {
384                    self.carry.len()
385                };
386                if len > MAX_LINE {
387                    self.reject_current_prefix(events);
388                    return true;
389                }
390                return false;
391            };
392
393            if self.push_sourcetable_line(&line, events) {
394                return true;
395            }
396        }
397    }
398
399    fn push_sourcetable_line(&mut self, line: &[u8], events: &mut Vec<NtripEvent>) -> bool {
400        let text = String::from_utf8_lossy(line).to_string();
401        let first = text.split(';').next().unwrap_or("").trim();
402
403        if !self.sourcetable_records_started {
404            if text.is_empty() {
405                return false;
406            }
407            if is_sourcetable_record_start(first) {
408                self.sourcetable_records_started = true;
409            } else if text.contains(':') {
410                return false;
411            } else {
412                self.sourcetable_records_started = true;
413            }
414        }
415
416        self.sourcetable_text.push_str(&text);
417        self.sourcetable_text.push_str("\r\n");
418        if self.sourcetable_text.len() > MAX_SOURCETABLE {
419            self.reject_current_prefix(events);
420            return true;
421        }
422        if first.eq_ignore_ascii_case("ENDSOURCETABLE") {
423            self.finish_sourcetable(events);
424            return true;
425        }
426        false
427    }
428
429    fn finish_sourcetable(&mut self, events: &mut Vec<NtripEvent>) {
430        match parse_sourcetable(&self.sourcetable_text) {
431            Ok(table) => events.push(NtripEvent::Sourcetable(table)),
432            Err(err) => events.push(NtripEvent::StreamCorrupted {
433                detail: err.to_string(),
434            }),
435        }
436        self.state = NtripState::Closed;
437    }
438
439    fn take_line_or_reject(&mut self, events: &mut Vec<NtripEvent>) -> Option<Vec<u8>> {
440        if let Some(pos) = self.carry.iter().position(|&b| b == b'\n') {
441            let mut line: Vec<u8> = self.carry.drain(..=pos).collect();
442            if line.ends_with(b"\n") {
443                line.pop();
444            }
445            if line.ends_with(b"\r") {
446                line.pop();
447            }
448            Some(line)
449        } else if self.carry.len() > MAX_LINE {
450            self.reject_current_prefix(events);
451            None
452        } else {
453            None
454        }
455    }
456
457    fn consume_pending_icy_blank_line(&mut self) -> bool {
458        if self.carry.is_empty() {
459            return false;
460        }
461        if self.carry[0] == b'\n' {
462            self.carry.drain(..1);
463            self.pending_icy_blank_line = false;
464            return true;
465        }
466        if self.carry[0] == b'\r' {
467            if self.carry.len() == 1 {
468                return false;
469            }
470            if self.carry[1] == b'\n' {
471                self.carry.drain(..2);
472            }
473            self.pending_icy_blank_line = false;
474            return true;
475        }
476        self.pending_icy_blank_line = false;
477        true
478    }
479
480    fn reject_current_prefix(&mut self, events: &mut Vec<NtripEvent>) {
481        let prefix = self.carry.iter().copied().take(PREFIX_LIMIT).collect();
482        self.reject(events, NtripRejection::MalformedHandshake { prefix });
483    }
484
485    fn reject(&mut self, events: &mut Vec<NtripEvent>, rejection: NtripRejection) {
486        self.state = NtripState::Closed;
487        events.push(NtripEvent::Rejected(rejection));
488    }
489
490    fn prefix_with_line(&self, line: &[u8]) -> Vec<u8> {
491        let mut prefix = line.to_vec();
492        prefix.extend_from_slice(&self.carry);
493        prefix.truncate(PREFIX_LIMIT);
494        prefix
495    }
496}
497
/// Parses a status line of the form `<prefix> <code> [reason]`.
///
/// Returns the numeric code and the trimmed reason (empty when absent), or
/// `None` when `text` does not start with `prefix` or the token after it is
/// not a valid `u16`.
fn parse_prefixed_status<'a>(text: &'a str, prefix: &str) -> Option<(u16, &'a str)> {
    let rest = text.strip_prefix(prefix)?.trim_start();
    let (code, reason) = match rest.split_once(char::is_whitespace) {
        Some((code, tail)) => (code, tail.trim()),
        None => (rest, ""),
    };
    let status = code.parse::<u16>().ok()?;
    Some((status, reason))
}
505
506fn parse_http_status(text: &str) -> Option<(NtripVersion, u16, &str)> {
507    let rest = if let Some(rest) = text.strip_prefix("HTTP/1.1 ") {
508        (NtripVersion::Rev2, rest)
509    } else if let Some(rest) = text.strip_prefix("HTTP/1.0 ") {
510        (NtripVersion::Rev1, rest)
511    } else {
512        return None;
513    };
514    let mut parts = rest.1.splitn(2, char::is_whitespace);
515    let status = parts.next()?.parse().ok()?;
516    let reason = parts.next().unwrap_or("").trim();
517    Some((rest.0, status, reason))
518}
519
/// Splits a header line at its first `:` into `(name, value)`.
///
/// The name is trimmed of surrounding whitespace. The value has its optional
/// whitespace (spaces and horizontal tabs) removed on both sides, as HTTP
/// field parsing requires, so `Transfer-Encoding:  chunked ` yields
/// `chunked`. Invalid UTF-8 is replaced lossily. Returns `None` when the line
/// contains no colon.
fn split_header(line: &[u8]) -> Option<(String, String)> {
    let colon = line.iter().position(|&b| b == b':')?;
    let (raw_name, raw_value) = (&line[..colon], &line[colon + 1..]);
    let name = String::from_utf8_lossy(raw_name).trim().to_string();
    // Only SP / HTAB count as optional whitespace; other bytes are content.
    let value = String::from_utf8_lossy(raw_value)
        .trim_matches(|c| c == ' ' || c == '\t')
        .to_string();
    Some((name, value))
}
531
/// Removes the first line from `buffer` and returns it without its `\n` or
/// `\r\n` terminator.
///
/// Returns `None`, leaving `buffer` untouched, when it holds no `\n`.
fn take_line_from(buffer: &mut Vec<u8>) -> Option<Vec<u8>> {
    let newline = buffer.iter().position(|&b| b == b'\n')?;
    let mut line: Vec<u8> = buffer.drain(..=newline).collect();
    // The drained range ends at the `\n`, so it is always the last byte.
    line.truncate(newline);
    if line.last() == Some(&b'\r') {
        line.pop();
    }
    Some(line)
}
543
/// Reports whether `first` (the first `;`-separated field of a line) is one
/// of the sourcetable record tags, compared ASCII case-insensitively.
fn is_sourcetable_record_start(first: &str) -> bool {
    const RECORD_TAGS: [&str; 4] = ["STR", "CAS", "NET", "ENDSOURCETABLE"];
    for tag in RECORD_TAGS.iter() {
        if first.eq_ignore_ascii_case(tag) {
            return true;
        }
    }
    false
}