1use std::collections::BTreeSet;
2
3use super::{
4 Gga, Gll, Gsa, Gst, Gsv, GsvSatellite, NmeaBody, NmeaDate, NmeaSatNumber, NmeaSentence,
5 NmeaSignalId, NmeaTalker, NmeaTime, Rmc, Vtg, Zda,
6};
7use crate::format::{Diagnostics, Parsed, RecordRef, Warning, WarningKind};
8
/// Maximum number of buffered bytes `NmeaAccumulator::push_bytes` tolerates
/// without finding a line terminator. Once the buffer grows past this size it
/// is reported as a "sentence over length cap" framing error and discarded.
const RETAINED_CAP: usize = 1024;
10
/// Everything the accumulator collected for one epoch, i.e. the run of
/// sentences that were grouped together before an epoch boundary was detected.
#[derive(Debug, Clone, PartialEq)]
pub struct EpochSnapshot {
    /// Time of day taken from the first attached sentence that carried one.
    pub time_of_day: Option<NmeaTime>,
    /// Date of the epoch: the most recent date seen in an attached RMC/ZDA,
    /// otherwise the date carried over by the accumulator (if any).
    pub date: Option<NmeaDate>,
    /// First GGA sentence attached to this epoch.
    pub gga: Option<Gga>,
    /// First RMC sentence attached to this epoch.
    pub rmc: Option<Rmc>,
    /// First GLL sentence attached to this epoch.
    pub gll: Option<Gll>,
    /// First GST sentence attached to this epoch.
    pub gst: Option<Gst>,
    /// First VTG sentence attached to this epoch.
    pub vtg: Option<Vtg>,
    /// First ZDA sentence attached to this epoch.
    pub zda: Option<Zda>,
    /// GSA sentences kept for this epoch, in arrival order.
    pub gsa: Vec<GsaEntry>,
    /// GSV satellite lists, one group per (talker, signal) pair.
    pub gsv: Vec<GsvGroup>,
    /// Number of sentences attached to this epoch, including duplicates that
    /// only produced a warning.
    pub sentence_count: usize,
    /// Warnings raised while assembling this epoch.
    pub diagnostics: Diagnostics,
}
26
/// One GSA sentence retained in an [`EpochSnapshot`].
#[derive(Debug, Clone, PartialEq)]
pub struct GsaEntry {
    /// Copy of `gsa.system`, duplicated here so entries can be matched by
    /// system without looking inside the sentence.
    pub system: Option<crate::GnssSystem>,
    /// The GSA sentence as it was received.
    pub gsa: Gsa,
}
32
/// Satellites reported by the GSV messages of one (talker, signal) pair
/// within an epoch.
#[derive(Debug, Clone, PartialEq)]
pub struct GsvGroup {
    /// Talker of the GSV sentences that make up this group.
    pub talker: NmeaTalker,
    /// Signal id of the GSV sentences that make up this group, if present.
    pub signal: Option<NmeaSignalId>,
    /// The "satellites in view" figure taken from the GSV message that
    /// started (or restarted) the group.
    pub claimed_in_view: Option<u16>,
    /// Satellite entries accumulated from the group's messages so far.
    pub satellites: Vec<GsvSatellite>,
    /// `true` once the most recently attached message was the last one of
    /// its sequence (message number equal to the total message count).
    pub complete: bool,
}
41
42impl EpochSnapshot {
43 fn empty(date: Option<NmeaDate>) -> Self {
44 Self {
45 time_of_day: None,
46 date,
47 gga: None,
48 rmc: None,
49 gll: None,
50 gst: None,
51 vtg: None,
52 zda: None,
53 gsa: Vec::new(),
54 gsv: Vec::new(),
55 sentence_count: 0,
56 diagnostics: Diagnostics::new(),
57 }
58 }
59
60 pub fn position(&self) -> Option<crate::Wgs84Geodetic> {
61 if let Some(gga) = &self.gga {
62 let latitude = gga.latitude?;
63 let longitude = gga.longitude?;
64 let altitude_msl_m = gga.altitude_msl_m?;
65 let geoid_separation_m = gga.geoid_separation_m?;
66 return crate::Wgs84Geodetic::new(
67 latitude.radians(),
68 longitude.radians(),
69 altitude_msl_m + geoid_separation_m,
70 )
71 .ok();
72 }
73 if let Some(rmc) = &self.rmc {
74 return crate::Wgs84Geodetic::new(
75 rmc.latitude?.radians(),
76 rmc.longitude?.radians(),
77 0.0,
78 )
79 .ok();
80 }
81 let gll = self.gll.as_ref()?;
82 crate::Wgs84Geodetic::new(gll.latitude?.radians(), gll.longitude?.radians(), 0.0).ok()
83 }
84
85 pub fn instant_utc(&self) -> Option<crate::astro::time::Instant> {
86 let date = self.date?;
87 let time = self.time_of_day?;
88 let second = f64::from(time.second) + f64::from(time.nanos) * 1.0e-9;
89 crate::astro::time::Instant::from_utc_civil(
90 i32::from(date.year),
91 i32::from(date.month),
92 i32::from(date.day),
93 i32::from(time.hour),
94 i32::from(time.minute),
95 second,
96 )
97 .ok()
98 }
99
100 pub fn pdop(&self) -> Option<f64> {
101 self.gsa.iter().find_map(|entry| entry.gsa.pdop)
102 }
103
104 pub fn hdop(&self) -> Option<f64> {
105 self.gsa.iter().find_map(|entry| entry.gsa.hdop)
106 }
107
108 pub fn vdop(&self) -> Option<f64> {
109 self.gsa.iter().find_map(|entry| entry.gsa.vdop)
110 }
111
112 pub fn used_satellites(&self) -> impl Iterator<Item = &NmeaSatNumber> {
113 self.gsa
114 .iter()
115 .flat_map(|entry| entry.gsa.satellites.iter())
116 }
117
118 pub fn satellites_in_view(&self) -> usize {
119 let mut seen = BTreeSet::new();
120 for group in &self.gsv {
121 for sat in &group.satellites {
122 if let Some(number) = sat.sat_number {
123 seen.insert((number.resolved, number.raw));
124 }
125 }
126 }
127 seen.len()
128 }
129}
130
/// The epoch currently being assembled, together with bookkeeping that is
/// not part of the published snapshot.
#[derive(Debug, Clone, PartialEq)]
struct OpenEpoch {
    /// The snapshot that is handed out once the epoch is closed.
    snapshot: EpochSnapshot,
    /// GSV sequencing state, one entry per (talker, signal) pair seen so far.
    gsv_progress: Vec<GsvProgress>,
}
136
/// Sequencing state of the multi-message GSV series for one
/// (talker, signal) pair.
#[derive(Debug, Clone, PartialEq)]
struct GsvProgress {
    /// Talker the series belongs to.
    talker: NmeaTalker,
    /// Signal id the series belongs to, if present.
    signal: Option<NmeaSignalId>,
    /// Total message count announced by the series.
    total: u8,
    /// Message number the next GSV of this series is expected to carry.
    next_expected: u8,
}
144
/// Groups parsed NMEA sentences (or raw bytes) into per-epoch snapshots.
#[derive(Debug, Clone, PartialEq)]
pub struct NmeaAccumulator {
    /// Epoch currently being assembled, if any sentence has been attached
    /// since the last epoch was closed.
    current: Option<OpenEpoch>,
    /// Most recently known date: supplied at construction, taken from an
    /// RMC/ZDA sentence, or advanced by one day at an epoch boundary.
    carried_date: Option<NmeaDate>,
    /// Time of day of the epoch that was closed by the latest time change.
    previous_anchor: Option<NmeaTime>,
    /// Number of sentences after which an epoch is closed forcibly.
    max_sentences_per_epoch: usize,
    /// Bytes received by `push_bytes` that have not been consumed yet.
    retained: Vec<u8>,
    /// Line number assigned to the next line of byte input; starts at 1.
    next_line: usize,
}
154
155impl Default for NmeaAccumulator {
156 fn default() -> Self {
157 Self::new()
158 }
159}
160
161impl NmeaAccumulator {
162 pub fn new() -> Self {
163 Self {
164 current: None,
165 carried_date: None,
166 previous_anchor: None,
167 max_sentences_per_epoch: 256,
168 retained: Vec::new(),
169 next_line: 1,
170 }
171 }
172
173 pub fn with_date(date: NmeaDate) -> Self {
174 Self {
175 carried_date: Some(date),
176 ..Self::new()
177 }
178 }
179
180 pub fn with_max_sentences_per_epoch(mut self, max: usize) -> Self {
181 self.max_sentences_per_epoch = max.max(16);
182 self
183 }
184
185 pub fn push(&mut self, sentence: &NmeaSentence) -> Option<EpochSnapshot> {
186 let incoming_time = sentence_time(sentence);
187 let mut new_epoch_warning = false;
188 let mut completed = None;
189
190 if let Some(current) = &mut self.current {
191 if let (Some(current_time), Some(incoming)) =
192 (current.snapshot.time_of_day, incoming_time)
193 {
194 if current_time.key() != incoming.key() {
195 completed = self.current.take().map(|epoch| epoch.snapshot);
196 self.apply_boundary_date_policy(current_time, incoming, &mut new_epoch_warning);
197 }
198 }
199 }
200
201 if completed.is_none() && self.gsv_cycle_boundary(sentence) {
202 completed = self.current.take().map(|epoch| epoch.snapshot);
203 }
204
205 if completed.is_none()
206 && self
207 .current
208 .as_ref()
209 .is_some_and(|epoch| epoch.snapshot.sentence_count >= self.max_sentences_per_epoch)
210 {
211 if let Some(mut epoch) = self.current.take() {
212 epoch.snapshot.diagnostics.push_warning(Warning {
213 at: RecordRef::default(),
214 kind: WarningKind::Mismatch,
215 });
216 completed = Some(epoch.snapshot);
217 }
218 }
219
220 if self.current.is_none() {
221 self.current = Some(OpenEpoch {
222 snapshot: EpochSnapshot::empty(self.carried_date),
223 gsv_progress: Vec::new(),
224 });
225 if new_epoch_warning {
226 self.warn_current();
227 }
228 }
229 self.attach(sentence);
230 completed
231 }
232
233 pub fn push_bytes(&mut self, chunk: &[u8]) -> NmeaChunkOutput {
234 self.retained.extend_from_slice(chunk);
235 let mut output = NmeaChunkOutput::default();
236 loop {
237 let Some((pos, term_len)) = next_line_end(&self.retained) else {
238 if self.retained.len() > RETAINED_CAP {
239 super::push_error_skip_at(
240 &mut output.diagnostics,
241 super::NmeaError::NotFramed {
242 reason: "sentence over length cap",
243 },
244 RecordRef::at_line(self.next_line),
245 );
246 self.retained.clear();
247 self.next_line += 1;
248 }
249 break;
250 };
251 let line = self.retained.drain(..pos).collect::<Vec<_>>();
252 self.retained.drain(..term_len);
253 let line_no = self.next_line;
254 self.next_line += 1;
255 push_line(self, &line, line_no, &mut output);
256 }
257 output
258 }
259
260 pub fn finish(&mut self) -> Option<EpochSnapshot> {
261 if !self.retained.is_empty() {
262 let line = self.retained.drain(..).collect::<Vec<_>>();
263 let mut output = NmeaChunkOutput::default();
264 push_line(self, &line, self.next_line, &mut output);
265 self.next_line += 1;
266 }
267 self.current.take().map(|epoch| epoch.snapshot)
268 }
269
270 pub fn retained_len(&self) -> usize {
271 self.retained.len()
272 }
273
274 fn attach(&mut self, sentence: &NmeaSentence) {
275 let time = sentence_time(sentence);
276 let date = sentence_date(sentence);
277 if let Some(date) = date {
278 self.carried_date = Some(date);
279 }
280 let epoch = self.current.as_mut().expect("epoch is open");
281 if epoch.snapshot.time_of_day.is_none() {
282 epoch.snapshot.time_of_day = time;
283 }
284 if let Some(date) = date {
285 if epoch.snapshot.date.is_some_and(|existing| existing != date) {
286 epoch.snapshot.diagnostics.push_warning(Warning {
287 at: RecordRef::default(),
288 kind: WarningKind::Mismatch,
289 });
290 }
291 epoch.snapshot.date = Some(date);
292 } else if epoch.snapshot.date.is_none() {
293 epoch.snapshot.date = self.carried_date;
294 }
295 epoch.snapshot.sentence_count += 1;
296
297 match &sentence.body {
298 NmeaBody::Gga(gga) => attach_single(
299 &mut epoch.snapshot.gga,
300 gga,
301 &mut epoch.snapshot.diagnostics,
302 ),
303 NmeaBody::Rmc(rmc) => attach_single(
304 &mut epoch.snapshot.rmc,
305 rmc,
306 &mut epoch.snapshot.diagnostics,
307 ),
308 NmeaBody::Gll(gll) => attach_single(
309 &mut epoch.snapshot.gll,
310 gll,
311 &mut epoch.snapshot.diagnostics,
312 ),
313 NmeaBody::Gst(gst) => attach_single(
314 &mut epoch.snapshot.gst,
315 gst,
316 &mut epoch.snapshot.diagnostics,
317 ),
318 NmeaBody::Vtg(vtg) => attach_single(
319 &mut epoch.snapshot.vtg,
320 vtg,
321 &mut epoch.snapshot.diagnostics,
322 ),
323 NmeaBody::Zda(zda) => attach_single(
324 &mut epoch.snapshot.zda,
325 zda,
326 &mut epoch.snapshot.diagnostics,
327 ),
328 NmeaBody::Gsa(gsa) => attach_gsa(&mut epoch.snapshot, gsa),
329 NmeaBody::Gsv(gsv) => attach_gsv(
330 sentence.talker,
331 &mut epoch.snapshot,
332 &mut epoch.gsv_progress,
333 gsv,
334 ),
335 }
336 }
337
338 fn warn_current(&mut self) {
339 if let Some(current) = &mut self.current {
340 current.snapshot.diagnostics.push_warning(Warning {
341 at: RecordRef::default(),
342 kind: WarningKind::Mismatch,
343 });
344 }
345 }
346
347 fn gsv_cycle_boundary(&self, sentence: &NmeaSentence) -> bool {
348 let NmeaBody::Gsv(gsv) = &sentence.body else {
349 return false;
350 };
351 if gsv.message_number != 1 {
352 return false;
353 }
354 self.current.as_ref().is_some_and(|epoch| {
355 epoch.snapshot.gsv.iter().any(|group| {
356 group.talker == sentence.talker && group.signal == gsv.signal && group.complete
357 })
358 })
359 }
360
361 fn apply_boundary_date_policy(
362 &mut self,
363 current: NmeaTime,
364 incoming: NmeaTime,
365 warning: &mut bool,
366 ) {
367 self.previous_anchor = Some(current);
368 let current_ns = time_of_day_ns(current);
369 let incoming_ns = time_of_day_ns(incoming);
370 if current_ns > incoming_ns {
371 let delta = current_ns - incoming_ns;
372 if delta > 43_200_000_000_000 {
373 if let Some(date) = self.carried_date {
374 self.carried_date = Some(date.next_day());
375 }
376 } else {
377 *warning = true;
378 }
379 }
380 }
381}
382
383fn attach_single<T: Clone>(slot: &mut Option<T>, value: &T, diagnostics: &mut Diagnostics) {
384 if slot.is_none() {
385 *slot = Some(value.clone());
386 } else {
387 diagnostics.push_warning(Warning {
388 at: RecordRef::default(),
389 kind: WarningKind::Mismatch,
390 });
391 }
392}
393
394fn attach_gsa(snapshot: &mut EpochSnapshot, gsa: &Gsa) {
395 if let Some(existing) = snapshot
396 .gsa
397 .iter()
398 .find(|entry| entry.system.is_some() && entry.system == gsa.system)
399 {
400 if existing.gsa.pdop != gsa.pdop
401 || existing.gsa.hdop != gsa.hdop
402 || existing.gsa.vdop != gsa.vdop
403 {
404 snapshot.diagnostics.push_warning(Warning {
405 at: RecordRef::default(),
406 kind: WarningKind::Mismatch,
407 });
408 }
409 snapshot.diagnostics.push_warning(Warning {
410 at: RecordRef::default(),
411 kind: WarningKind::Mismatch,
412 });
413 return;
414 }
415 if let Some(first) = snapshot.gsa.first() {
416 if differing(first.gsa.pdop, gsa.pdop)
417 || differing(first.gsa.hdop, gsa.hdop)
418 || differing(first.gsa.vdop, gsa.vdop)
419 {
420 snapshot.diagnostics.push_warning(Warning {
421 at: RecordRef::default(),
422 kind: WarningKind::Mismatch,
423 });
424 }
425 }
426 snapshot.gsa.push(GsaEntry {
427 system: gsa.system,
428 gsa: gsa.clone(),
429 });
430}
431
432fn attach_gsv(
433 talker: NmeaTalker,
434 snapshot: &mut EpochSnapshot,
435 progress: &mut Vec<GsvProgress>,
436 gsv: &Gsv,
437) {
438 let group_index = snapshot
439 .gsv
440 .iter()
441 .position(|group| group.talker == talker && group.signal == gsv.signal);
442 let progress_index = progress
443 .iter()
444 .position(|entry| entry.talker == talker && entry.signal == gsv.signal);
445 match (group_index, progress_index) {
446 (Some(group_index), Some(progress_index)) => {
447 let expected = progress[progress_index].next_expected;
448 let total = progress[progress_index].total;
449 if gsv.total_messages != total || gsv.message_number != expected {
450 snapshot.diagnostics.push_warning(Warning {
451 at: RecordRef::default(),
452 kind: WarningKind::Mismatch,
453 });
454 snapshot.gsv[group_index].satellites = gsv.satellites.clone();
455 snapshot.gsv[group_index].claimed_in_view = gsv.satellites_in_view;
456 snapshot.gsv[group_index].complete = gsv.message_number == gsv.total_messages;
457 progress[progress_index] = GsvProgress {
458 talker,
459 signal: gsv.signal,
460 total: gsv.total_messages,
461 next_expected: gsv.message_number.saturating_add(1),
462 };
463 } else {
464 snapshot.gsv[group_index]
465 .satellites
466 .extend(gsv.satellites.clone());
467 snapshot.gsv[group_index].complete = gsv.message_number == total;
468 progress[progress_index].next_expected = expected.saturating_add(1);
469 }
470 if snapshot.gsv[group_index].complete {
471 check_gsv_count(snapshot, group_index);
472 }
473 }
474 _ => {
475 snapshot.gsv.push(GsvGroup {
476 talker,
477 signal: gsv.signal,
478 claimed_in_view: gsv.satellites_in_view,
479 satellites: gsv.satellites.clone(),
480 complete: gsv.message_number == gsv.total_messages,
481 });
482 progress.push(GsvProgress {
483 talker,
484 signal: gsv.signal,
485 total: gsv.total_messages,
486 next_expected: gsv.message_number.saturating_add(1),
487 });
488 if snapshot.gsv.last().is_some_and(|group| group.complete) {
489 let index = snapshot.gsv.len() - 1;
490 check_gsv_count(snapshot, index);
491 }
492 }
493 }
494}
495
496fn check_gsv_count(snapshot: &mut EpochSnapshot, group_index: usize) {
497 let group = &snapshot.gsv[group_index];
498 if let Some(claimed) = group.claimed_in_view {
499 let listed = group
500 .satellites
501 .iter()
502 .filter(|sat| sat.sat_number.is_some())
503 .count();
504 if usize::from(claimed) != listed {
505 snapshot.diagnostics.push_warning(Warning {
506 at: RecordRef::default(),
507 kind: WarningKind::Mismatch,
508 });
509 }
510 }
511}
512
/// Returns `true` only when both values are present and compare unequal.
///
/// A missing value on either side never counts as a difference.
fn differing(a: Option<f64>, b: Option<f64>) -> bool {
    a.zip(b).is_some_and(|(left, right)| left != right)
}
516
517fn sentence_time(sentence: &NmeaSentence) -> Option<NmeaTime> {
518 match &sentence.body {
519 NmeaBody::Gga(gga) => gga.time,
520 NmeaBody::Rmc(rmc) => rmc.time,
521 NmeaBody::Gll(gll) => gll.time,
522 NmeaBody::Gst(gst) => gst.time,
523 NmeaBody::Zda(zda) => zda.time,
524 NmeaBody::Gsa(_) | NmeaBody::Gsv(_) | NmeaBody::Vtg(_) => None,
525 }
526}
527
528fn sentence_date(sentence: &NmeaSentence) -> Option<NmeaDate> {
529 match &sentence.body {
530 NmeaBody::Rmc(rmc) => rmc.date,
531 NmeaBody::Zda(zda) => zda.date,
532 _ => None,
533 }
534}
535
536fn time_of_day_ns(time: NmeaTime) -> u64 {
537 ((u64::from(time.hour) * 3600 + u64::from(time.minute) * 60 + u64::from(time.second))
538 * 1_000_000_000)
539 + u64::from(time.nanos)
540}
541
/// Locates the first line terminator in `buffer`.
///
/// Returns `(pos, term_len)`, where `pos` is the index of the first `\n` or
/// `\r` and `term_len` is 2 for a `\r\n` pair and 1 otherwise. Returns
/// `None` when the buffer contains no terminator.
fn next_line_end(buffer: &[u8]) -> Option<(usize, usize)> {
    let pos = buffer
        .iter()
        .position(|&byte| matches!(byte, b'\n' | b'\r'))?;
    let is_crlf = buffer[pos..].starts_with(b"\r\n");
    Some((pos, if is_crlf { 2 } else { 1 }))
}
551
552fn push_line(
553 accumulator: &mut NmeaAccumulator,
554 line: &[u8],
555 line_number: usize,
556 output: &mut NmeaChunkOutput,
557) {
558 if line.is_empty() {
559 return;
560 }
561 let parsed = match std::str::from_utf8(line) {
562 Ok(line) => super::parse_sentence(line),
563 Err(_) => {
564 super::push_error_skip_at(
565 &mut output.diagnostics,
566 super::NmeaError::NotFramed {
567 reason: "non-ASCII byte",
568 },
569 RecordRef::at_line(line_number),
570 );
571 return;
572 }
573 };
574 match parsed {
575 Ok(Parsed {
576 value,
577 mut diagnostics,
578 }) => {
579 super::set_diagnostic_lines(&mut diagnostics, line_number);
580 super::merge_diagnostics(&mut output.diagnostics, diagnostics);
581 if let Some(snapshot) = accumulator.push(&value) {
582 output.snapshots.push(snapshot);
583 }
584 output.sentences.push(value);
585 }
586 Err(error) => super::push_error_skip_at(
587 &mut output.diagnostics,
588 error,
589 RecordRef::at_line(line_number),
590 ),
591 }
592}
593
/// Result of feeding bytes to an `NmeaAccumulator`.
#[derive(Debug, Clone, PartialEq, Default)]
pub struct NmeaChunkOutput {
    /// Snapshots of the epochs that were closed, in the order they closed.
    pub snapshots: Vec<EpochSnapshot>,
    /// Every sentence that parsed successfully, in input order.
    pub sentences: Vec<NmeaSentence>,
    /// Parse diagnostics and line-level errors, tagged with line numbers.
    pub diagnostics: Diagnostics,
}