1mod agc;
4mod assembler;
5mod builder;
6mod codesquelch;
7mod combiner;
8mod dcblock;
9mod demod;
10mod equalize;
11mod filter;
12mod framing;
13mod output;
14mod symsync;
15mod timeddata;
16mod waveform;
17
18#[cfg(not(test))]
19use log::{debug, info, trace, warn};
20
21#[cfg(test)]
22use std::{println as debug, println as trace, println as info, println as warn};
23
24use std::convert::From;
25use std::iter::{IntoIterator, Iterator};
26
27pub use self::builder::{EqualizerBuilder, SameReceiverBuilder};
28pub use self::output::{LinkState, SameEventType, SameReceiverEvent, TransportState};
29
30use crate::Message;
31
32use self::agc::Agc;
33use self::assembler::Assembler;
34use self::codesquelch::{CodeAndPowerSquelch, SquelchState};
35use self::dcblock::DCBlocker;
36use self::demod::{Demod, FskDemod};
37use self::equalize::Equalizer;
38use self::framing::Framer;
39use self::symsync::{SymbolEstimate, TimingLoop};
40
/// Streaming receiver that turns audio samples into link-layer and
/// transport-layer events.
///
/// Each input sample passes through the DC blocker, the AGC, and the FSK
/// demodulator. At instants chosen by the timing loop, the demodulator output
/// is evaluated and forwarded to the squelch, equalizer, framer, and
/// assembler. State changes are reported as [`SameReceiverEvent`]s.
#[derive(Clone, Debug)]
pub struct SameReceiver {
    /// First processing stage; filters every raw input sample.
    dc_block: DCBlocker,
    /// Gain control fed with the DC blocker output. Locked while tracking a
    /// burst and unlocked again when the burst ends.
    agc: Agc,
    /// FSK demodulator. Receives every gain-controlled sample; its output is
    /// only evaluated when the timing loop calls for it.
    demod: FskDemod,
    /// Symbol timing loop. Schedules the next demodulator evaluation and
    /// emits symbol estimates.
    symsync: TimingLoop,
    /// Squelch that consumes symbol data and decides whether a carrier is
    /// present and synchronized.
    squelch: CodeAndPowerSquelch,
    /// Equalizer that converts squelch output samples into byte estimates.
    /// Trained whenever the squelch reports a resynchronization.
    equalizer: Equalizer,
    /// Converts byte estimates into [`LinkState`] values.
    framer: Framer,
    /// Converts link-layer bursts into [`TransportState`] values.
    assembler: Assembler,
    /// Timing loop bandwidth applied while searching for a signal.
    timing_bandwidth_unlocked: f32,
    /// Timing loop bandwidth applied once the squelch has synchronized.
    timing_bandwidth_locked: f32,
    /// Input sampling rate. Multiplied by durations in seconds to obtain
    /// sample counts.
    input_rate: u32,
    /// Number of input samples processed so far (wrapping).
    input_sample_counter: u64,
    /// Most recently reported link state; used to suppress duplicate events.
    link_state: LinkState,
    /// Most recently reported transport state; used to suppress duplicate
    /// events.
    transport_state: TransportState,
    /// Events that have been generated but not yet handed to the caller.
    event_queue: std::collections::VecDeque<SameReceiverEvent>,
    /// Input samples counted since the last timing-loop evaluation.
    ted_sample_clock: u32,
    /// Number of input samples the timing loop asked to wait before the next
    /// evaluation.
    samples_until_next_ted: f32,
    /// Value of `input_sample_counter` after which an end-of-message is
    /// forced if no carrier is present. `None` when no message is pending.
    force_eom_at_sample: Option<u64>,
}
91
92impl SameReceiver {
93 #[must_use = "iterators are lazy and do nothing unless consumed"]
119 pub fn iter_events<'rx, I>(
120 &'rx mut self,
121 input: I,
122 ) -> impl Iterator<Item = SameReceiverEvent> + 'rx
123 where
124 I: IntoIterator<Item = f32> + 'rx,
125 {
126 SameReceiverIter {
127 receiver: self,
128 source: input.into_iter(),
129 }
130 }
131
132 #[must_use = "iterators are lazy and do nothing unless consumed"]
155 pub fn iter_messages<'rx, I>(&'rx mut self, input: I) -> impl Iterator<Item = Message> + 'rx
156 where
157 I: IntoIterator<Item = f32> + 'rx,
158 {
159 self.iter_events(input)
160 .filter_map(|evt| evt.into_message_ok())
161 }
162
/// Input sampling rate this receiver was configured with.
///
/// The receiver multiplies this value by durations in seconds to obtain
/// sample counts, so it is expressed in samples per second.
pub fn input_rate(&self) -> u32 {
    self.input_rate
}
170
/// Number of input samples processed so far.
///
/// The counter starts at zero, is incremented (with wrap-around) once per
/// input sample, and returns to zero on [`reset`](Self::reset).
pub fn input_sample_counter(&self) -> u64 {
    self.input_sample_counter
}
178
179 pub fn reset(&mut self) {
183 self.dc_block.reset();
184 self.agc.reset();
185 self.demod.reset();
186 self.symsync.reset();
187 self.squelch.reset();
188 self.equalizer.reset();
189 self.framer.reset();
190 self.assembler.reset();
191 self.input_sample_counter = 0;
192 self.link_state = LinkState::NoCarrier;
193 self.transport_state = TransportState::Idle;
194 self.event_queue.clear();
195 self.ted_sample_clock = 0;
196 self.samples_until_next_ted = self.symsync.samples_per_ted();
197 self.force_eom_at_sample = None;
198 }
199
200 pub fn flush(&mut self) -> Option<Message> {
217 let four_seconds_of_zeros = std::iter::repeat(0.0f32)
218 .zip(0..self.input_rate * 4)
219 .map(|(sa, _)| sa);
220 for msg in self.iter_messages(four_seconds_of_zeros) {
221 return Some(msg);
222 }
223 None
224 }
225
226 #[inline]
233 fn process<I>(&mut self, audio_iter: &mut I) -> Option<SameReceiverEvent>
234 where
235 I: Iterator<Item = f32>,
236 {
237 while let Some(evt) = self.event_queue.pop_front() {
239 return Some(evt);
240 }
241
242 for sample in audio_iter {
244 if let Some(link_state) = self.process_linklayer_high_rate(sample) {
246 if link_state != self.link_state {
247 self.link_state = link_state.clone();
249 self.event_queue.push_back(SameReceiverEvent::new(
250 self.link_state.clone(),
251 self.input_sample_counter,
252 ));
253 }
254
255 if let Some(transport_state) = self
257 .process_transportlayer(&link_state)
258 .filter(|newstate| newstate != &self.transport_state)
259 {
260 self.transport_state = transport_state;
261 self.event_queue.push_back(SameReceiverEvent::new(
262 self.transport_state.clone(),
263 self.input_sample_counter,
264 ));
265 }
266
267 if let Some(evt) = self.event_queue.pop_front() {
268 return Some(evt);
269 }
270 }
271 }
272
273 None
274 }
275
276 #[inline]
290 #[must_use]
291 fn process_transportlayer(&mut self, link_state: &LinkState) -> Option<TransportState> {
292 let transport = match (link_state, self.force_eom_at_sample) {
293 (LinkState::Burst(burst_bytes), _) => {
294 Some(
296 self.assembler
297 .assemble(burst_bytes, self.squelch.symbol_count()),
298 )
299 }
300 (LinkState::NoCarrier, Some(eom_timeout))
301 if self.input_sample_counter > eom_timeout =>
302 {
303 warn!(
305 "voice message timeout ({} s) exceeded; forcing end-of-message now",
306 Self::MAX_MESSAGE_DURATION_SECS
307 );
308 Some(TransportState::Message(Ok(Message::EndOfMessage)))
309 }
310 (LinkState::NoCarrier, _) => {
311 Some(self.assembler.idle(self.squelch.symbol_count()))
313 }
314 (_, _) => None,
315 }?;
316
317 match &transport {
318 TransportState::Message(Ok(Message::StartOfMessage(_))) => {
319 self.force_eom_at_sample = Some(
322 self.input_sample_counter
323 + Self::MAX_MESSAGE_DURATION_SECS * self.input_rate as u64,
324 );
325 }
326 TransportState::Message(Ok(Message::EndOfMessage)) => {
327 self.force_eom_at_sample = None;
328 }
329 _ => {}
330 };
331
332 Some(transport)
333 }
334
335 #[inline]
342 #[must_use]
343 fn process_linklayer_high_rate(&mut self, input: f32) -> Option<LinkState> {
344 let sa = self.agc.input(self.dc_block.filter(input));
346 self.demod.push_scalar(sa);
347 self.ted_sample_clock += 1;
348 self.input_sample_counter = self.input_sample_counter.wrapping_add(1);
349
350 let clock_remaining_sa = self.samples_until_next_ted - self.ted_sample_clock as f32;
353 if clock_remaining_sa <= 0.0f32 || clock_remaining_sa.abs() < 0.5f32 {
354 self.ted_sample_clock = 0;
356 let symbol_est = self.process_linklayer_low_rate(clock_remaining_sa)?;
357 Some(self.process_linklayer_symbol(&symbol_est))
358 } else {
359 None
360 }
361 }
362
363 #[must_use]
376 fn process_linklayer_low_rate(&mut self, clock_remaining_sa: f32) -> Option<SymbolEstimate> {
377 let sa_low = self.demod.demod();
379
380 let sync_out = self.symsync.input(sa_low, clock_remaining_sa);
382 self.samples_until_next_ted = sync_out.0;
383 let bit_samples = sync_out.1?;
384
385 if self.squelch.symbol_count() % Self::TRACE_LOG_INTERVAL_SYMS == 0 {
386 trace!(
387 "[{:<14}]: signal magnitude {:0.1}, symbol power: {:0.2}",
388 self.input_sample_counter(),
389 1.0f32 / self.agc.gain(),
390 self.squelch.power()
391 );
392 }
393
394 Some(bit_samples)
395 }
396
397 #[inline]
406 #[must_use]
407 fn process_linklayer_symbol(&mut self, symbol: &SymbolEstimate) -> LinkState {
408 let (is_resync, squelch_out) = match self.squelch.input(&symbol.data) {
410 SquelchState::NoCarrier => {
411 return self.framer.end();
413 }
414 SquelchState::DroppedCarrier => {
415 self.end();
417 return self.framer.end();
418 }
419 SquelchState::Reading => {
420 return self.framer.state();
422 }
423 SquelchState::Ready(true, byte_est) => {
424 debug!(
428 "[{:<14}]: entering tracking mode",
429 self.input_sample_counter()
430 );
431 self.agc.lock(true);
432 self.symsync
433 .set_loop_bandwidth(self.timing_bandwidth_locked);
434 self.equalizer
435 .train()
436 .expect("equalizer missing training sequence");
437 (true, byte_est)
438 }
439 SquelchState::Ready(false, byte_est) => {
440 (false, byte_est)
442 }
443 };
444
445 let (byte_est, adaptive_err) = self.equalizer.input(&squelch_out.samples);
447
448 trace!(
449 "byte: {:#04x} \"{:?}\", sym pwr: {:0.2}, adapt err: {:0.2}",
450 byte_est,
451 byte_est as char,
452 squelch_out.power,
453 adaptive_err
454 );
455
456 let link_state = self
458 .framer
459 .input(byte_est, squelch_out.symbol_counter, is_resync);
460 match &link_state {
461 LinkState::Reading => {
462 self.squelch.lock(true);
465 }
466 LinkState::NoCarrier | LinkState::Burst(_) => {
467 self.end()
469 }
470 _ => {}
471 }
472
473 link_state
474 }
475
476 fn end(&mut self) {
480 self.agc.lock(false);
481 self.squelch.end();
482 self.equalizer.reset();
483 self.symsync
484 .set_loop_bandwidth(self.timing_bandwidth_unlocked);
485 self.symsync.reset();
486 debug!(
487 "[{:<14}]: returning to acquisition mode",
488 self.input_sample_counter()
489 );
490 }
491
/// Longest time, in seconds, a message may stay open after its
/// start-of-message before an end-of-message is forced.
///
/// When a start-of-message is produced, a deadline this many seconds of input
/// samples ahead is recorded. If no carrier is present once the sample
/// counter passes that deadline, an end-of-message is synthesized.
const MAX_MESSAGE_DURATION_SECS: u64 = 135;

/// Interval, in squelch symbol counts, between periodic signal-level trace
/// log lines.
const TRACE_LOG_INTERVAL_SYMS: u64 = 520;
500}
501
502impl From<&SameReceiverBuilder> for SameReceiver {
503 fn from(cfg: &SameReceiverBuilder) -> Self {
505 let input_rate = cfg.input_rate();
506 let sps = waveform::samples_per_symbol(input_rate);
507 let (timing_bandwidth_unlocked, timing_bandwidth_locked) = cfg.timing_bandwidth();
508 let (power_open, power_close) = cfg.squelch_power();
509 let dc_block = DCBlocker::new((cfg.dc_blocker_length() * sps) as usize);
510 let agc = Agc::new(
511 cfg.agc_bandwidth() * sps / input_rate as f32,
512 cfg.agc_gain_limits()[0],
513 cfg.agc_gain_limits()[1],
514 );
515 let demod = FskDemod::new_from_same(cfg.input_rate());
516 let symsync = TimingLoop::new(sps, timing_bandwidth_unlocked, cfg.timing_max_deviation());
517 let code_squelch = CodeAndPowerSquelch::new(
518 waveform::PREAMBLE_SYNC_WORD,
519 cfg.preamble_max_errors(),
520 power_open,
521 power_close,
522 cfg.squelch_bandwidth(),
523 );
524 let eqcfg = match cfg.adaptive_equalizer() {
525 Some(eqcfg) => *eqcfg,
526 None => disabled_equalizer(),
527 };
528 let equalizer = Equalizer::new(
529 eqcfg.filter_order().0,
530 eqcfg.filter_order().1,
531 eqcfg.relaxation(),
532 eqcfg.regularization(),
533 Some(waveform::PREAMBLE_SYNC_WORD),
534 );
535 let framer = Framer::new(cfg.frame_prefix_max_errors(), cfg.frame_max_invalid());
536
537 let samples_until_next_ted = symsync.samples_per_ted();
538
539 Self {
540 dc_block,
541 agc,
542 demod,
543 symsync,
544 squelch: code_squelch,
545 equalizer,
546 framer,
547 assembler: Assembler::default(),
548 timing_bandwidth_unlocked,
549 timing_bandwidth_locked,
550 input_rate,
551 input_sample_counter: 0,
552 link_state: LinkState::NoCarrier,
553 transport_state: TransportState::Idle,
554 event_queue: std::collections::VecDeque::with_capacity(2),
555 ted_sample_clock: 0,
556 samples_until_next_ted,
557 force_eom_at_sample: None,
558 }
559 }
560}
561
/// Iterator that feeds samples from `source` into a borrowed receiver and
/// yields the events it produces.
///
/// Created by `SameReceiver::iter_events()`.
#[derive(Debug)]
struct SameReceiverIter<'rx, I>
where
    I: Iterator<Item = f32>,
{
    /// Audio samples still to be processed.
    source: I,
    /// Receiver whose state is advanced as samples are consumed.
    receiver: &'rx mut SameReceiver,
}
570
571impl<'rx, 'data, I> Iterator for SameReceiverIter<'rx, I>
572where
573 I: Iterator<Item = f32>,
574{
575 type Item = SameReceiverEvent;
576
577 fn next(&mut self) -> Option<Self::Item> {
578 self.receiver.process(&mut self.source).and_then(|evt| {
579 info!("{}", &evt);
580 Some(evt)
581 })
582 }
583}
584
585fn disabled_equalizer() -> EqualizerBuilder {
586 let mut out = EqualizerBuilder::new();
587 out.with_filter_order(1, 1);
588 out.with_relaxation(0.0);
589 out
590}
591
592#[cfg(test)]
593mod tests {
594 use super::*;
595
596 use std::io::Write;
597
/// Message text used as the payload of the synthetic test transmissions.
const TEST_MESSAGE: &str = "ZCZC-EAS-DMO-372088-091724-919623-645687-745748-175234-039940-955869-091611-304171-931612-334828-179485-569615-809223-830187-611340-014693-472885-084645-977764-466883-406863-390018-701741-058097-752790-311648-820127-255900-581947+0000-0001122-NOCALL00-";
599
/// Write samples to `filename` as raw native-endian 16-bit integers.
///
/// Each sample is converted with `as i16` and written back-to-back with no
/// header. Writes are buffered and flushed before returning.
///
/// # Panics
///
/// Panics if the file cannot be created or written.
// Debugging aid: only called ad hoc when inspecting generated waveforms.
#[allow(dead_code)]
fn dump_file(out: &[f32], filename: &str) {
    let file = std::fs::File::create(filename).expect("Unable to create file");
    // Buffer the output; otherwise every sample is a separate two-byte write.
    let mut writer = std::io::BufWriter::new(file);
    for &sample in out {
        writer
            .write_all(&(sample as i16).to_ne_bytes())
            .expect("Unable to write data");
    }
    // Flush explicitly: errors during the implicit flush on drop are ignored.
    writer.flush().expect("Unable to write data");
}
610
611 fn make_test_message(payload: &[u8]) -> Vec<u8> {
612 const PREAMBLE: &[u8] = &[waveform::PREAMBLE; 16];
613
614 let mut message: Vec<u8> = vec![];
615 message.extend_from_slice(PREAMBLE);
616 message.extend_from_slice(payload);
617 message
618 }
619
620 fn make_test_burst(msg: &[u8], num_bursts: usize) -> (Vec<f32>, usize) {
625 let sample_low = waveform::bytes_to_samples(msg, 1);
626 let (sample_high, sps) = waveform::modulate_afsk(&sample_low, 22050);
627
628 let burst: Vec<f32> = sample_high.iter().map(|&v| (v * 16384.0f32)).collect();
630
631 let mut out = burst.clone();
632 for _i in 1..num_bursts {
633 out.extend(std::iter::repeat(0.0f32).take(22050));
634 out.extend(burst.iter());
635 }
636 out.extend(std::iter::repeat(0.0f32).take(2 * 22050));
637
638 (out, sps)
639 }
640
/// A single burst must produce exactly this event sequence:
/// searching, reading, burst (carrying the message), assembling, no carrier.
#[test]
fn test_iter_events() {
    let (afsk, _) = make_test_burst(&make_test_message(TEST_MESSAGE.as_bytes()), 1);

    let mut rx = SameReceiverBuilder::new(22050)
        .with_timing_max_deviation(0.01)
        .build();

    let mut found = 0usize;
    for (idx, evt) in rx.iter_events(afsk.iter().copied()).enumerate() {
        match (idx, evt.what()) {
            (0, SameEventType::Link(LinkState::Searching))
            | (1, SameEventType::Link(LinkState::Reading))
            | (3, SameEventType::Transport(TransportState::Assembling))
            | (4, SameEventType::Link(LinkState::NoCarrier)) => {}
            (2, SameEventType::Link(LinkState::Burst(data))) => {
                assert!(data.starts_with(TEST_MESSAGE.as_bytes()));
            }
            _ => {
                unreachable!()
            }
        }
        // Reached only when the event matched its expected position.
        found += 1;
    }

    assert_eq!(found, 5);
}
676
/// Three bursts must decode to the test message, arm the forced
/// end-of-message deadline, and yield an end-of-message on flush once the
/// deadline is within reach.
#[test]
fn test_top_level_receiver() {
    let (afsk, _) = make_test_burst(&make_test_message(TEST_MESSAGE.as_bytes()), 3);

    let mut rx = SameReceiverBuilder::new(22050)
        .with_timing_max_deviation(0.01)
        .build();

    println!("{:?}", rx);

    let out = rx
        .iter_messages(afsk.iter().copied())
        .next()
        .expect("expected message");
    assert_eq!(TEST_MESSAGE, out.as_str());

    // Decoding a start-of-message arms the timeout.
    assert!(rx.force_eom_at_sample.is_some());

    // Jump to three seconds before the deadline so that the four seconds of
    // silence fed by flush() cross it.
    let lead_samples = 3 * u64::from(rx.input_rate);
    rx.input_sample_counter = rx.force_eom_at_sample.unwrap() - lead_samples;
    let msg = rx.flush();
    assert_eq!(Some(Message::EndOfMessage), msg);
}
706}