Skip to main content

sidereon_core/nmea/
epoch.rs

1use std::collections::BTreeSet;
2
3use super::{
4    Gga, Gll, Gsa, Gst, Gsv, GsvSatellite, NmeaBody, NmeaDate, NmeaSatNumber, NmeaSentence,
5    NmeaSignalId, NmeaTalker, NmeaTime, Rmc, Vtg, Zda,
6};
7use crate::format::{Diagnostics, Parsed, RecordRef, Warning, WarningKind};
8
/// Size limit, in bytes, for unterminated input buffered by
/// `NmeaAccumulator::push_bytes`: once the buffer holds more than this many
/// bytes without a line terminator it is discarded and a
/// "sentence over length cap" error is recorded.
const RETAINED_CAP: usize = 1024;
10
/// Everything gathered from the NMEA sentences that were assigned to one epoch.
#[derive(Debug, Clone, PartialEq)]
pub struct EpochSnapshot {
    /// Time of day taken from the first attached sentence that carried one.
    pub time_of_day: Option<NmeaTime>,
    /// Date of the epoch: seeded from the accumulator's carried date and
    /// replaced by the date of an attached RMC or ZDA sentence.
    pub date: Option<NmeaDate>,
    /// First GGA sentence attached to the epoch.
    pub gga: Option<Gga>,
    /// First RMC sentence attached to the epoch.
    pub rmc: Option<Rmc>,
    /// First GLL sentence attached to the epoch.
    pub gll: Option<Gll>,
    /// First GST sentence attached to the epoch.
    pub gst: Option<Gst>,
    /// First VTG sentence attached to the epoch.
    pub vtg: Option<Vtg>,
    /// First ZDA sentence attached to the epoch.
    pub zda: Option<Zda>,
    /// GSA sentences attached to the epoch; a repeat for an already recorded,
    /// identified system is not stored.
    pub gsa: Vec<GsaEntry>,
    /// GSV satellite lists, one group per talker/signal pair.
    pub gsv: Vec<GsvGroup>,
    /// Number of sentences attached to the epoch.
    pub sentence_count: usize,
    /// Warnings raised while the epoch was being assembled.
    pub diagnostics: Diagnostics,
}
26
/// One GSA sentence stored in an [`EpochSnapshot`].
#[derive(Debug, Clone, PartialEq)]
pub struct GsaEntry {
    /// Copy of the sentence's `system` field, kept alongside the sentence for
    /// duplicate detection.
    pub system: Option<crate::GnssSystem>,
    /// The GSA sentence itself.
    pub gsa: Gsa,
}
32
/// Satellites collected from a sequence of GSV sentences that share a talker
/// and a signal id.
#[derive(Debug, Clone, PartialEq)]
pub struct GsvGroup {
    /// Talker of the GSV sentences in this group.
    pub talker: NmeaTalker,
    /// Signal id of the GSV sentences in this group.
    pub signal: Option<NmeaSignalId>,
    /// Satellites-in-view count reported by the sentence that started (or
    /// restarted) the group.
    pub claimed_in_view: Option<u16>,
    /// Satellite entries gathered from the group's sentences.
    pub satellites: Vec<GsvSatellite>,
    /// `true` when the most recently attached sentence was the last message of
    /// its sequence (message number equal to the total).
    pub complete: bool,
}
41
42impl EpochSnapshot {
43    fn empty(date: Option<NmeaDate>) -> Self {
44        Self {
45            time_of_day: None,
46            date,
47            gga: None,
48            rmc: None,
49            gll: None,
50            gst: None,
51            vtg: None,
52            zda: None,
53            gsa: Vec::new(),
54            gsv: Vec::new(),
55            sentence_count: 0,
56            diagnostics: Diagnostics::new(),
57        }
58    }
59
60    pub fn position(&self) -> Option<crate::Wgs84Geodetic> {
61        if let Some(gga) = &self.gga {
62            let latitude = gga.latitude?;
63            let longitude = gga.longitude?;
64            let altitude_msl_m = gga.altitude_msl_m?;
65            let geoid_separation_m = gga.geoid_separation_m?;
66            return crate::Wgs84Geodetic::new(
67                latitude.radians(),
68                longitude.radians(),
69                altitude_msl_m + geoid_separation_m,
70            )
71            .ok();
72        }
73        if let Some(rmc) = &self.rmc {
74            return crate::Wgs84Geodetic::new(
75                rmc.latitude?.radians(),
76                rmc.longitude?.radians(),
77                0.0,
78            )
79            .ok();
80        }
81        let gll = self.gll.as_ref()?;
82        crate::Wgs84Geodetic::new(gll.latitude?.radians(), gll.longitude?.radians(), 0.0).ok()
83    }
84
85    pub fn instant_utc(&self) -> Option<crate::astro::time::Instant> {
86        let date = self.date?;
87        let time = self.time_of_day?;
88        let second = f64::from(time.second) + f64::from(time.nanos) * 1.0e-9;
89        crate::astro::time::Instant::from_utc_civil(
90            i32::from(date.year),
91            i32::from(date.month),
92            i32::from(date.day),
93            i32::from(time.hour),
94            i32::from(time.minute),
95            second,
96        )
97        .ok()
98    }
99
100    pub fn pdop(&self) -> Option<f64> {
101        self.gsa.iter().find_map(|entry| entry.gsa.pdop)
102    }
103
104    pub fn hdop(&self) -> Option<f64> {
105        self.gsa.iter().find_map(|entry| entry.gsa.hdop)
106    }
107
108    pub fn vdop(&self) -> Option<f64> {
109        self.gsa.iter().find_map(|entry| entry.gsa.vdop)
110    }
111
112    pub fn used_satellites(&self) -> impl Iterator<Item = &NmeaSatNumber> {
113        self.gsa
114            .iter()
115            .flat_map(|entry| entry.gsa.satellites.iter())
116    }
117
118    pub fn satellites_in_view(&self) -> usize {
119        let mut seen = BTreeSet::new();
120        for group in &self.gsv {
121            for sat in &group.satellites {
122                if let Some(number) = sat.sat_number {
123                    seen.insert((number.resolved, number.raw));
124                }
125            }
126        }
127        seen.len()
128    }
129}
130
/// The epoch currently being assembled by [`NmeaAccumulator`].
#[derive(Debug, Clone, PartialEq)]
struct OpenEpoch {
    /// Data gathered so far; handed to the caller when the epoch is closed.
    snapshot: EpochSnapshot,
    /// Sequencing state for the GSV groups in `snapshot.gsv`; an entry is
    /// pushed together with each new group.
    gsv_progress: Vec<GsvProgress>,
}
136
/// Tracks how far a multi-sentence GSV sequence has progressed for one
/// talker/signal pair.
#[derive(Debug, Clone, PartialEq)]
struct GsvProgress {
    /// Talker the sequence belongs to.
    talker: NmeaTalker,
    /// Signal id the sequence belongs to.
    signal: Option<NmeaSignalId>,
    /// Total number of messages announced by the sequence.
    total: u8,
    /// Message number the next sentence of the sequence should carry.
    next_expected: u8,
}
144
/// Groups a stream of NMEA sentences (or raw bytes) into per-epoch snapshots.
#[derive(Debug, Clone, PartialEq)]
pub struct NmeaAccumulator {
    /// Epoch being assembled, if any sentence has been attached since the
    /// last one was closed.
    current: Option<OpenEpoch>,
    /// Most recent date known to the accumulator; used to seed new epochs.
    carried_date: Option<NmeaDate>,
    /// Time of day of the epoch most recently closed by a time change.
    previous_anchor: Option<NmeaTime>,
    /// Sentence count at which the open epoch is force-closed.
    max_sentences_per_epoch: usize,
    /// Bytes received through `push_bytes` that do not yet form a full line.
    retained: Vec<u8>,
    /// Line number assigned to the next line extracted from the byte stream
    /// (starts at 1).
    next_line: usize,
}
154
155impl Default for NmeaAccumulator {
156    fn default() -> Self {
157        Self::new()
158    }
159}
160
161impl NmeaAccumulator {
162    pub fn new() -> Self {
163        Self {
164            current: None,
165            carried_date: None,
166            previous_anchor: None,
167            max_sentences_per_epoch: 256,
168            retained: Vec::new(),
169            next_line: 1,
170        }
171    }
172
173    pub fn with_date(date: NmeaDate) -> Self {
174        Self {
175            carried_date: Some(date),
176            ..Self::new()
177        }
178    }
179
180    pub fn with_max_sentences_per_epoch(mut self, max: usize) -> Self {
181        self.max_sentences_per_epoch = max.max(16);
182        self
183    }
184
    /// Adds one parsed sentence to the accumulator.
    ///
    /// The sentence is always attached to the open epoch (a new one is opened
    /// when needed). If the sentence closes the previously open epoch, that
    /// epoch's snapshot is returned; otherwise the result is `None`.
    ///
    /// The open epoch is closed, in this order of precedence, when:
    /// 1. the sentence carries a time whose `key()` differs from the epoch's
    ///    time of day;
    /// 2. the sentence is GSV message 1 for a talker/signal group that is
    ///    already complete in the epoch;
    /// 3. the epoch already holds `max_sentences_per_epoch` sentences (a
    ///    `Mismatch` warning is added to the closed snapshot).
    pub fn push(&mut self, sentence: &NmeaSentence) -> Option<EpochSnapshot> {
        let incoming_time = sentence_time(sentence);
        // Set by the date policy when time moved backwards without looking
        // like a day rollover; applied to the epoch opened below.
        let mut new_epoch_warning = false;
        let mut completed = None;

        // 1. Time change: only checked when both the open epoch and the
        //    incoming sentence have a time.
        if let Some(current) = &mut self.current {
            if let (Some(current_time), Some(incoming)) =
                (current.snapshot.time_of_day, incoming_time)
            {
                if current_time.key() != incoming.key() {
                    completed = self.current.take().map(|epoch| epoch.snapshot);
                    self.apply_boundary_date_policy(current_time, incoming, &mut new_epoch_warning);
                }
            }
        }

        // 2. A fresh GSV cycle for a group that was already complete.
        if completed.is_none() && self.gsv_cycle_boundary(sentence) {
            completed = self.current.take().map(|epoch| epoch.snapshot);
        }

        // 3. Safety valve: the open epoch has reached the sentence limit.
        if completed.is_none()
            && self
                .current
                .as_ref()
                .is_some_and(|epoch| epoch.snapshot.sentence_count >= self.max_sentences_per_epoch)
        {
            if let Some(mut epoch) = self.current.take() {
                epoch.snapshot.diagnostics.push_warning(Warning {
                    at: RecordRef::default(),
                    kind: WarningKind::Mismatch,
                });
                completed = Some(epoch.snapshot);
            }
        }

        // Open a new epoch if none is open (first sentence, or one was just
        // closed), seeded with the date known so far.
        if self.current.is_none() {
            self.current = Some(OpenEpoch {
                snapshot: EpochSnapshot::empty(self.carried_date),
                gsv_progress: Vec::new(),
            });
            if new_epoch_warning {
                self.warn_current();
            }
        }
        self.attach(sentence);
        completed
    }
232
233    pub fn push_bytes(&mut self, chunk: &[u8]) -> NmeaChunkOutput {
234        self.retained.extend_from_slice(chunk);
235        let mut output = NmeaChunkOutput::default();
236        loop {
237            let Some((pos, term_len)) = next_line_end(&self.retained) else {
238                if self.retained.len() > RETAINED_CAP {
239                    super::push_error_skip_at(
240                        &mut output.diagnostics,
241                        super::NmeaError::NotFramed {
242                            reason: "sentence over length cap",
243                        },
244                        RecordRef::at_line(self.next_line),
245                    );
246                    self.retained.clear();
247                    self.next_line += 1;
248                }
249                break;
250            };
251            let line = self.retained.drain(..pos).collect::<Vec<_>>();
252            self.retained.drain(..term_len);
253            let line_no = self.next_line;
254            self.next_line += 1;
255            push_line(self, &line, line_no, &mut output);
256        }
257        output
258    }
259
260    pub fn finish(&mut self) -> Option<EpochSnapshot> {
261        if !self.retained.is_empty() {
262            let line = self.retained.drain(..).collect::<Vec<_>>();
263            let mut output = NmeaChunkOutput::default();
264            push_line(self, &line, self.next_line, &mut output);
265            self.next_line += 1;
266        }
267        self.current.take().map(|epoch| epoch.snapshot)
268    }
269
    /// Number of bytes currently buffered that have not yet been processed as
    /// a line.
    pub fn retained_len(&self) -> usize {
        self.retained.len()
    }
273
    /// Records `sentence` in the open epoch.
    ///
    /// Updates the carried date and the epoch's time/date, bumps the sentence
    /// count, then stores the sentence body in the matching slot or list.
    ///
    /// # Panics
    ///
    /// Panics if no epoch is open; `push` opens one before calling this.
    fn attach(&mut self, sentence: &NmeaSentence) {
        let time = sentence_time(sentence);
        let date = sentence_date(sentence);
        // A date-bearing sentence always refreshes the date carried forward to
        // later epochs.
        if let Some(date) = date {
            self.carried_date = Some(date);
        }
        let epoch = self.current.as_mut().expect("epoch is open");
        // The first time seen in the epoch becomes its time of day.
        if epoch.snapshot.time_of_day.is_none() {
            epoch.snapshot.time_of_day = time;
        }
        if let Some(date) = date {
            // The sentence's date wins, but a disagreement with the date the
            // epoch already had is reported.
            if epoch.snapshot.date.is_some_and(|existing| existing != date) {
                epoch.snapshot.diagnostics.push_warning(Warning {
                    at: RecordRef::default(),
                    kind: WarningKind::Mismatch,
                });
            }
            epoch.snapshot.date = Some(date);
        } else if epoch.snapshot.date.is_none() {
            epoch.snapshot.date = self.carried_date;
        }
        epoch.snapshot.sentence_count += 1;

        // Single-slot sentence types keep the first occurrence (duplicates
        // raise a warning); GSA and GSV have their own merge rules.
        match &sentence.body {
            NmeaBody::Gga(gga) => attach_single(
                &mut epoch.snapshot.gga,
                gga,
                &mut epoch.snapshot.diagnostics,
            ),
            NmeaBody::Rmc(rmc) => attach_single(
                &mut epoch.snapshot.rmc,
                rmc,
                &mut epoch.snapshot.diagnostics,
            ),
            NmeaBody::Gll(gll) => attach_single(
                &mut epoch.snapshot.gll,
                gll,
                &mut epoch.snapshot.diagnostics,
            ),
            NmeaBody::Gst(gst) => attach_single(
                &mut epoch.snapshot.gst,
                gst,
                &mut epoch.snapshot.diagnostics,
            ),
            NmeaBody::Vtg(vtg) => attach_single(
                &mut epoch.snapshot.vtg,
                vtg,
                &mut epoch.snapshot.diagnostics,
            ),
            NmeaBody::Zda(zda) => attach_single(
                &mut epoch.snapshot.zda,
                zda,
                &mut epoch.snapshot.diagnostics,
            ),
            NmeaBody::Gsa(gsa) => attach_gsa(&mut epoch.snapshot, gsa),
            NmeaBody::Gsv(gsv) => attach_gsv(
                sentence.talker,
                &mut epoch.snapshot,
                &mut epoch.gsv_progress,
                gsv,
            ),
        }
    }
337
338    fn warn_current(&mut self) {
339        if let Some(current) = &mut self.current {
340            current.snapshot.diagnostics.push_warning(Warning {
341                at: RecordRef::default(),
342                kind: WarningKind::Mismatch,
343            });
344        }
345    }
346
347    fn gsv_cycle_boundary(&self, sentence: &NmeaSentence) -> bool {
348        let NmeaBody::Gsv(gsv) = &sentence.body else {
349            return false;
350        };
351        if gsv.message_number != 1 {
352            return false;
353        }
354        self.current.as_ref().is_some_and(|epoch| {
355            epoch.snapshot.gsv.iter().any(|group| {
356                group.talker == sentence.talker && group.signal == gsv.signal && group.complete
357            })
358        })
359    }
360
361    fn apply_boundary_date_policy(
362        &mut self,
363        current: NmeaTime,
364        incoming: NmeaTime,
365        warning: &mut bool,
366    ) {
367        self.previous_anchor = Some(current);
368        let current_ns = time_of_day_ns(current);
369        let incoming_ns = time_of_day_ns(incoming);
370        if current_ns > incoming_ns {
371            let delta = current_ns - incoming_ns;
372            if delta > 43_200_000_000_000 {
373                if let Some(date) = self.carried_date {
374                    self.carried_date = Some(date.next_day());
375                }
376            } else {
377                *warning = true;
378            }
379        }
380    }
381}
382
383fn attach_single<T: Clone>(slot: &mut Option<T>, value: &T, diagnostics: &mut Diagnostics) {
384    if slot.is_none() {
385        *slot = Some(value.clone());
386    } else {
387        diagnostics.push_warning(Warning {
388            at: RecordRef::default(),
389            kind: WarningKind::Mismatch,
390        });
391    }
392}
393
394fn attach_gsa(snapshot: &mut EpochSnapshot, gsa: &Gsa) {
395    if let Some(existing) = snapshot
396        .gsa
397        .iter()
398        .find(|entry| entry.system.is_some() && entry.system == gsa.system)
399    {
400        if existing.gsa.pdop != gsa.pdop
401            || existing.gsa.hdop != gsa.hdop
402            || existing.gsa.vdop != gsa.vdop
403        {
404            snapshot.diagnostics.push_warning(Warning {
405                at: RecordRef::default(),
406                kind: WarningKind::Mismatch,
407            });
408        }
409        snapshot.diagnostics.push_warning(Warning {
410            at: RecordRef::default(),
411            kind: WarningKind::Mismatch,
412        });
413        return;
414    }
415    if let Some(first) = snapshot.gsa.first() {
416        if differing(first.gsa.pdop, gsa.pdop)
417            || differing(first.gsa.hdop, gsa.hdop)
418            || differing(first.gsa.vdop, gsa.vdop)
419        {
420            snapshot.diagnostics.push_warning(Warning {
421                at: RecordRef::default(),
422                kind: WarningKind::Mismatch,
423            });
424        }
425    }
426    snapshot.gsa.push(GsaEntry {
427        system: gsa.system,
428        gsa: gsa.clone(),
429    });
430}
431
/// Merges one GSV sentence into the group for its talker/signal pair.
///
/// `progress` holds the sequencing state for the groups in `snapshot.gsv`.
/// A sentence that continues its group's sequence as expected has its
/// satellites appended. A sentence whose total or message number is not the
/// expected one raises a `Mismatch` warning and restarts the group from that
/// sentence. A sentence for an unknown talker/signal pair creates a new
/// group. Whenever a group becomes complete its satellite count is checked
/// against the claimed in-view count.
fn attach_gsv(
    talker: NmeaTalker,
    snapshot: &mut EpochSnapshot,
    progress: &mut Vec<GsvProgress>,
    gsv: &Gsv,
) {
    let group_index = snapshot
        .gsv
        .iter()
        .position(|group| group.talker == talker && group.signal == gsv.signal);
    let progress_index = progress
        .iter()
        .position(|entry| entry.talker == talker && entry.signal == gsv.signal);
    match (group_index, progress_index) {
        (Some(group_index), Some(progress_index)) => {
            let expected = progress[progress_index].next_expected;
            let total = progress[progress_index].total;
            if gsv.total_messages != total || gsv.message_number != expected {
                // Out of sequence: warn, discard what was collected and start
                // the group over from this sentence.
                snapshot.diagnostics.push_warning(Warning {
                    at: RecordRef::default(),
                    kind: WarningKind::Mismatch,
                });
                snapshot.gsv[group_index].satellites = gsv.satellites.clone();
                snapshot.gsv[group_index].claimed_in_view = gsv.satellites_in_view;
                snapshot.gsv[group_index].complete = gsv.message_number == gsv.total_messages;
                progress[progress_index] = GsvProgress {
                    talker,
                    signal: gsv.signal,
                    total: gsv.total_messages,
                    next_expected: gsv.message_number.saturating_add(1),
                };
            } else {
                // In sequence: append the satellites and advance the counter.
                snapshot.gsv[group_index]
                    .satellites
                    .extend(gsv.satellites.clone());
                snapshot.gsv[group_index].complete = gsv.message_number == total;
                progress[progress_index].next_expected = expected.saturating_add(1);
            }
            if snapshot.gsv[group_index].complete {
                check_gsv_count(snapshot, group_index);
            }
        }
        _ => {
            // No group for this talker/signal yet: start one, together with
            // its progress entry.
            snapshot.gsv.push(GsvGroup {
                talker,
                signal: gsv.signal,
                claimed_in_view: gsv.satellites_in_view,
                satellites: gsv.satellites.clone(),
                complete: gsv.message_number == gsv.total_messages,
            });
            progress.push(GsvProgress {
                talker,
                signal: gsv.signal,
                total: gsv.total_messages,
                next_expected: gsv.message_number.saturating_add(1),
            });
            if snapshot.gsv.last().is_some_and(|group| group.complete) {
                let index = snapshot.gsv.len() - 1;
                check_gsv_count(snapshot, index);
            }
        }
    }
}
495
496fn check_gsv_count(snapshot: &mut EpochSnapshot, group_index: usize) {
497    let group = &snapshot.gsv[group_index];
498    if let Some(claimed) = group.claimed_in_view {
499        let listed = group
500            .satellites
501            .iter()
502            .filter(|sat| sat.sat_number.is_some())
503            .count();
504        if usize::from(claimed) != listed {
505            snapshot.diagnostics.push_warning(Warning {
506                at: RecordRef::default(),
507                kind: WarningKind::Mismatch,
508            });
509        }
510    }
511}
512
/// Returns `true` only when both values are present and not equal; a missing
/// value on either side never counts as a difference.
fn differing(a: Option<f64>, b: Option<f64>) -> bool {
    match (a, b) {
        (Some(left), Some(right)) => left != right,
        _ => false,
    }
}
516
517fn sentence_time(sentence: &NmeaSentence) -> Option<NmeaTime> {
518    match &sentence.body {
519        NmeaBody::Gga(gga) => gga.time,
520        NmeaBody::Rmc(rmc) => rmc.time,
521        NmeaBody::Gll(gll) => gll.time,
522        NmeaBody::Gst(gst) => gst.time,
523        NmeaBody::Zda(zda) => zda.time,
524        NmeaBody::Gsa(_) | NmeaBody::Gsv(_) | NmeaBody::Vtg(_) => None,
525    }
526}
527
528fn sentence_date(sentence: &NmeaSentence) -> Option<NmeaDate> {
529    match &sentence.body {
530        NmeaBody::Rmc(rmc) => rmc.date,
531        NmeaBody::Zda(zda) => zda.date,
532        _ => None,
533    }
534}
535
536fn time_of_day_ns(time: NmeaTime) -> u64 {
537    ((u64::from(time.hour) * 3600 + u64::from(time.minute) * 60 + u64::from(time.second))
538        * 1_000_000_000)
539        + u64::from(time.nanos)
540}
541
/// Finds the first line terminator in `buffer`.
///
/// Returns `(position, terminator_length)` where `position` is the index of
/// the first `\n` or `\r` and the length is 2 for a `\r` immediately followed
/// by `\n`, 1 otherwise. Returns `None` when the buffer holds no terminator.
fn next_line_end(buffer: &[u8]) -> Option<(usize, usize)> {
    let (pos, &terminator) = buffer
        .iter()
        .enumerate()
        .find(|&(_, &byte)| matches!(byte, b'\n' | b'\r'))?;
    let is_crlf = terminator == b'\r' && buffer.get(pos + 1).copied() == Some(b'\n');
    Some((pos, if is_crlf { 2 } else { 1 }))
}
551
552fn push_line(
553    accumulator: &mut NmeaAccumulator,
554    line: &[u8],
555    line_number: usize,
556    output: &mut NmeaChunkOutput,
557) {
558    if line.is_empty() {
559        return;
560    }
561    let parsed = match std::str::from_utf8(line) {
562        Ok(line) => super::parse_sentence(line),
563        Err(_) => {
564            super::push_error_skip_at(
565                &mut output.diagnostics,
566                super::NmeaError::NotFramed {
567                    reason: "non-ASCII byte",
568                },
569                RecordRef::at_line(line_number),
570            );
571            return;
572        }
573    };
574    match parsed {
575        Ok(Parsed {
576            value,
577            mut diagnostics,
578        }) => {
579            super::set_diagnostic_lines(&mut diagnostics, line_number);
580            super::merge_diagnostics(&mut output.diagnostics, diagnostics);
581            if let Some(snapshot) = accumulator.push(&value) {
582                output.snapshots.push(snapshot);
583            }
584            output.sentences.push(value);
585        }
586        Err(error) => super::push_error_skip_at(
587            &mut output.diagnostics,
588            error,
589            RecordRef::at_line(line_number),
590        ),
591    }
592}
593
/// Result of feeding one chunk of bytes to `NmeaAccumulator::push_bytes`.
#[derive(Debug, Clone, PartialEq, Default)]
pub struct NmeaChunkOutput {
    /// Snapshots of the epochs closed while processing the chunk.
    pub snapshots: Vec<EpochSnapshot>,
    /// Every sentence successfully parsed from the chunk, in input order.
    pub sentences: Vec<NmeaSentence>,
    /// Parse diagnostics for the chunk's lines.
    pub diagnostics: Diagnostics,
}