sameold/
receiver.rs

1//! Full receiver chain
2
3mod agc;
4mod assembler;
5mod builder;
6mod codesquelch;
7mod combiner;
8mod dcblock;
9mod demod;
10mod equalize;
11mod filter;
12mod framing;
13mod output;
14mod symsync;
15mod timeddata;
16mod waveform;
17
18#[cfg(not(test))]
19use log::{debug, info, trace, warn};
20
21#[cfg(test)]
22use std::{println as debug, println as trace, println as info, println as warn};
23
24use std::convert::From;
25use std::iter::{IntoIterator, Iterator};
26
27pub use self::builder::{EqualizerBuilder, SameReceiverBuilder};
28pub use self::output::{LinkState, SameEventType, SameReceiverEvent, TransportState};
29
30use crate::Message;
31
32use self::agc::Agc;
33use self::assembler::Assembler;
34use self::codesquelch::{CodeAndPowerSquelch, SquelchState};
35use self::dcblock::DCBlocker;
36use self::demod::{Demod, FskDemod};
37use self::equalize::Equalizer;
38use self::framing::Framer;
39use self::symsync::{SymbolEstimate, TimingLoop};
40
/// A complete SAME/EAS receiver chain
///
/// The receive chain takes `f32` audio samples and
/// performs the following operations:
///
/// 1. Automatic gain control
/// 2. Demodulation and down-sampling to two samples
///    per symbol, governed by a zero-crossing detector
///    timing recovery loop.
/// 3. Access code correlation and squelch. The code
///    correlator also synchronizes to byte boundaries.
/// 4. (Optional) Adaptive decision feedback equalization
/// 5. Framing and message decoding
///
/// To create the receiver, first create its Builder:
///
/// ```
/// use sameold::SameReceiverBuilder;
///
/// let mut builder = SameReceiverBuilder::default();
/// let receiver = builder.build();
/// assert_eq!(receiver.input_rate(), 22050);
/// ```
///
/// Once created, use the
/// [`iter_messages()`](SameReceiver::iter_messages)
/// method to obtain decoded messages.
///
/// See [module documentation](index.html) for details.
#[derive(Clone, Debug)]
pub struct SameReceiver {
    // DC-blocking filter; the first stage applied to every input sample
    dc_block: DCBlocker,
    // automatic gain control; locked while tracking a burst and
    // unlocked again when the receiver returns to acquisition
    agc: Agc,
    // FSK demodulator; buffers high-rate samples and is evaluated
    // only when the timing loop calls for a low-rate sample
    demod: FskDemod,
    // symbol timing recovery loop
    symsync: TimingLoop,
    // power and access-code correlation squelch
    squelch: CodeAndPowerSquelch,
    // adaptive equalizer which produces the byte estimates
    equalizer: Equalizer,
    // link-layer framer; turns byte estimates into a `LinkState`
    framer: Framer,
    // transport-layer assembler; turns bursts into a `TransportState`
    assembler: Assembler,
    // timing loop bandwidth used during acquisition
    timing_bandwidth_unlocked: f32,
    // timing loop bandwidth used once byte sync is achieved
    timing_bandwidth_locked: f32,
    // input sampling rate, in samples per second
    input_rate: u32,
    // lifetime count of input samples processed (wraps on overflow)
    input_sample_counter: u64,
    // most recently reported link state; used to suppress duplicate events
    link_state: LinkState,
    // most recently reported transport state; used to suppress duplicate events
    transport_state: TransportState,
    // events which have been generated but not yet handed to the caller
    event_queue: std::collections::VecDeque<SameReceiverEvent>,
    // input samples elapsed since the last low-rate (timing error detector) sample
    ted_sample_clock: u32,
    // commanded time of the next low-rate sample, in input samples
    // since the last one; may be fractional
    samples_until_next_ted: f32,
    // value of `input_sample_counter` after which an end-of-message is
    // forced; `Some` only while an end-of-message is awaited
    force_eom_at_sample: Option<u64>,
}
91
92impl SameReceiver {
93    /// Decode events and messages from a source of audio
94    ///
95    /// Bind an iterator which will consume the `input` and
96    /// produce SAME [`SameReceiverEvent`] events, which include:
97    ///
98    /// * notifications about acquired and dropped carrier,
99    /// * attempts to frame messages; and
100    /// * successful framed messages
101    ///
102    /// The `input` must be f32 PCM mono audio at
103    /// the [`input_rate()`](SameReceiver::input_rate) for this
104    /// receiver. Sound cards commonly output audio samples
105    /// in `i16` format. You must perform the conversion to
106    /// floating-point yourself, if needed. It is unnecessary
107    /// to scale the converted values; our AGC algorithm will
108    /// take care of that.
109    ///
110    /// The iterator will consume as many samples of `input`
111    /// that are required to produce the next event. It will
112    /// return `None` if the input is exhausted and there
113    /// are no new events.
114    ///
115    /// You can use [`iter_messages()`](SameReceiver::iter_messages)
116    /// instead if you are only interested in successful
117    /// decodes.
118    #[must_use = "iterators are lazy and do nothing unless consumed"]
119    pub fn iter_events<'rx, I>(
120        &'rx mut self,
121        input: I,
122    ) -> impl Iterator<Item = SameReceiverEvent> + 'rx
123    where
124        I: IntoIterator<Item = f32> + 'rx,
125    {
126        SameReceiverIter {
127            receiver: self,
128            source: input.into_iter(),
129        }
130    }
131
132    /// Receive SAME messages from a source of audio
133    ///
134    /// Bind an iterator which will consume the `input` and
135    /// produce SAME [`Message`] events. Only
136    /// successfully-decoded messages are reported. Other
137    /// events, such as acquisition of signal or decoding
138    /// failures, are not reported. If you are interested in
139    /// these events, use
140    /// [`iter_events()`](SameReceiver::iter_events) instead.
141    ///
142    /// The `input` must be f32 PCM mono audio at
143    /// the [`input_rate()`](SameReceiver::input_rate) for this
144    /// receiver. Sound cards commonly output audio samples
145    /// in `i16` format. You must perform the conversion to
146    /// floating-point yourself, if needed. It is unnecessary
147    /// to scale the converted values; our AGC algorithm will
148    /// take care of that.
149    ///
150    /// The iterator will consume as many samples of `input`
151    /// that are required to produce the next message. It will
152    /// return `None` if the input is exhausted and there
153    /// are no new messages.
154    #[must_use = "iterators are lazy and do nothing unless consumed"]
155    pub fn iter_messages<'rx, I>(&'rx mut self, input: I) -> impl Iterator<Item = Message> + 'rx
156    where
157        I: IntoIterator<Item = f32> + 'rx,
158    {
159        self.iter_events(input)
160            .filter_map(|evt| evt.into_message_ok())
161    }
162
    /// Input sampling rate
    ///
    /// Returns the sampling rate, in samples per second, of the
    /// audio expected by
    /// [`iter_events()`](SameReceiver::iter_events) and
    /// [`iter_messages()`](SameReceiver::iter_messages).
    pub fn input_rate(&self) -> u32 {
        self.input_rate
    }
170
    /// Lifetime total input sample counter
    ///
    /// Reports the lifetime total of input samples which
    /// have been processed. The counter returns to zero
    /// when the receiver is [`reset()`](SameReceiver::reset).
    pub fn input_sample_counter(&self) -> u64 {
        self.input_sample_counter
    }
178
179    /// Clear all DSP states and reset to zero initial conditions
180    ///
181    /// All buffers and states are cleared.
182    pub fn reset(&mut self) {
183        self.dc_block.reset();
184        self.agc.reset();
185        self.demod.reset();
186        self.symsync.reset();
187        self.squelch.reset();
188        self.equalizer.reset();
189        self.framer.reset();
190        self.assembler.reset();
191        self.input_sample_counter = 0;
192        self.link_state = LinkState::NoCarrier;
193        self.transport_state = TransportState::Idle;
194        self.event_queue.clear();
195        self.ted_sample_clock = 0;
196        self.samples_until_next_ted = self.symsync.samples_per_ted();
197        self.force_eom_at_sample = None;
198    }
199
200    /// Flush the DSP buffers and emit any leftover messages
201    ///
202    /// The DSP algorithms impose delay on the input. When
203    /// processing recorded audio that has been "close cut"
204    /// to the extents of a message, the `SameReceiver` might
205    /// not emit the message. This is because not all of the
206    /// data samples from the file have made their way through
207    /// the entire system.
208    ///
209    /// This method flushes the input with an adequate number
210    /// of zeros to ensure all buffered samples have been
211    /// processed. Returns the last `Message` generated, if
212    /// any.
213    ///
214    /// You probably want to [`reset()`](#method.reset) after
215    /// calling this method.
216    pub fn flush(&mut self) -> Option<Message> {
217        let four_seconds_of_zeros = std::iter::repeat(0.0f32)
218            .zip(0..self.input_rate * 4)
219            .map(|(sa, _)| sa);
220        for msg in self.iter_messages(four_seconds_of_zeros) {
221            return Some(msg);
222        }
223        None
224    }
225
226    /// Process a sample
227    ///
228    /// Reads the given iterator of floating-point PCM audio samples.
229    /// The audio is demodulated and processed until it is either
230    /// exhausted or an event of interest to the modem occurs. If
231    /// one does, it is emitted.
232    #[inline]
233    fn process<I>(&mut self, audio_iter: &mut I) -> Option<SameReceiverEvent>
234    where
235        I: Iterator<Item = f32>,
236    {
237        // emit existing events
238        while let Some(evt) = self.event_queue.pop_front() {
239            return Some(evt);
240        }
241
242        // read audio source, process through all layers
243        for sample in audio_iter {
244            // link-layer processing
245            if let Some(link_state) = self.process_linklayer_high_rate(sample) {
246                if link_state != self.link_state {
247                    // report change
248                    self.link_state = link_state.clone();
249                    self.event_queue.push_back(SameReceiverEvent::new(
250                        self.link_state.clone(),
251                        self.input_sample_counter,
252                    ));
253                }
254
255                // transport-layer processing
256                if let Some(transport_state) = self
257                    .process_transportlayer(&link_state)
258                    .filter(|newstate| newstate != &self.transport_state)
259                {
260                    self.transport_state = transport_state;
261                    self.event_queue.push_back(SameReceiverEvent::new(
262                        self.transport_state.clone(),
263                        self.input_sample_counter,
264                    ));
265                }
266
267                if let Some(evt) = self.event_queue.pop_front() {
268                    return Some(evt);
269                }
270            }
271        }
272
273        None
274    }
275
276    // Transport-layer processing
277    //
278    // Accepts the new link state. New Bursts are immediately
279    // processed through the Assembler. Idle checks are also
280    // conducted to:
281    //
282    // 1. Detect "lingering" SAME messages which exceed the
283    //    maximum voice message length
284    //
285    // 2. Advise the Assembler if no further Bursts are
286    //    forthcoming
287    //
288    // Returns new transport-layer state if one is available.
289    #[inline]
290    #[must_use]
291    fn process_transportlayer(&mut self, link_state: &LinkState) -> Option<TransportState> {
292        let transport = match (link_state, self.force_eom_at_sample) {
293            (LinkState::Burst(burst_bytes), _) => {
294                // Process this burst
295                Some(
296                    self.assembler
297                        .assemble(burst_bytes, self.squelch.symbol_count()),
298                )
299            }
300            (LinkState::NoCarrier, Some(eom_timeout))
301                if self.input_sample_counter > eom_timeout =>
302            {
303                // Timed out waiting for EOM. Manually emit one.
304                warn!(
305                    "voice message timeout ({} s) exceeded; forcing end-of-message now",
306                    Self::MAX_MESSAGE_DURATION_SECS
307                );
308                Some(TransportState::Message(Ok(Message::EndOfMessage)))
309            }
310            (LinkState::NoCarrier, _) => {
311                // Perform idle processing
312                Some(self.assembler.idle(self.squelch.symbol_count()))
313            }
314            (_, _) => None,
315        }?;
316
317        match &transport {
318            TransportState::Message(Ok(Message::StartOfMessage(_))) => {
319                // set a timer to ensure we will eventually produce an EOM
320                // if we miss receipt
321                self.force_eom_at_sample = Some(
322                    self.input_sample_counter
323                        + Self::MAX_MESSAGE_DURATION_SECS * self.input_rate as u64,
324                );
325            }
326            TransportState::Message(Ok(Message::EndOfMessage)) => {
327                self.force_eom_at_sample = None;
328            }
329            _ => {}
330        };
331
332        Some(transport)
333    }
334
335    // Link-layer processing of one high-rate sample
336    //
337    // Accepts a floating-point PCM audio sample as `input`
338    // and updates the data link layer. Returns the updated
339    // link state if a "low-rate" sample was processed or
340    // `None` if only high-rate processing was performed.
341    #[inline]
342    #[must_use]
343    fn process_linklayer_high_rate(&mut self, input: f32) -> Option<LinkState> {
344        // high-rate processing: dc block, agc, and push onto demodulator's buffer
345        let sa = self.agc.input(self.dc_block.filter(input));
346        self.demod.push_scalar(sa);
347        self.ted_sample_clock += 1;
348        self.input_sample_counter = self.input_sample_counter.wrapping_add(1);
349
350        // compute time until we sample for the timing error detector
351        // positive → before time, negative → after time
352        let clock_remaining_sa = self.samples_until_next_ted - self.ted_sample_clock as f32;
353        if clock_remaining_sa <= 0.0f32 || clock_remaining_sa.abs() < 0.5f32 {
354            // process low-rate sample and look for state changes
355            self.ted_sample_clock = 0;
356            let symbol_est = self.process_linklayer_low_rate(clock_remaining_sa)?;
357            Some(self.process_linklayer_symbol(&symbol_est))
358        } else {
359            None
360        }
361    }
362
363    // Low-rate DSP at two samples per symbol
364    //
365    // `clock_remaining_sa` is the error between the current
366    // high-rate sample time (must be integer) and the
367    // commanded sample time (may be fractional), in
368    // high-rate samples.
369    // * positive → before time
370    // * negative → after time
371    //
372    // Performs demodulation and bit timing error detection.
373    // If a bit estimate is ready, returns it. Otherwise, returns
374    // `None`.
375    #[must_use]
376    fn process_linklayer_low_rate(&mut self, clock_remaining_sa: f32) -> Option<SymbolEstimate> {
377        // 1. demod from window
378        let sa_low = self.demod.demod();
379
380        // 2. symbol timing error detection
381        let sync_out = self.symsync.input(sa_low, clock_remaining_sa);
382        self.samples_until_next_ted = sync_out.0;
383        let bit_samples = sync_out.1?;
384
385        if self.squelch.symbol_count() % Self::TRACE_LOG_INTERVAL_SYMS == 0 {
386            trace!(
387                "[{:<14}]: signal magnitude {:0.1}, symbol power: {:0.2}",
388                self.input_sample_counter(),
389                1.0f32 / self.agc.gain(),
390                self.squelch.power()
391            );
392        }
393
394        Some(bit_samples)
395    }
396
397    // Process a bit estimate from the bit timing error detector
398    //
399    // Performs byte synchronization against the SAME preamble
400    // (`0xAB`) and framing.
401    //
402    // Returns updated link state if carrier was detected and
403    // the signal was processed all the way through to the framer.
404    // Otherwise, returns `None`.
405    #[inline]
406    #[must_use]
407    fn process_linklayer_symbol(&mut self, symbol: &SymbolEstimate) -> LinkState {
408        // 3. power and access code correlation squelch
409        let (is_resync, squelch_out) = match self.squelch.input(&symbol.data) {
410            SquelchState::NoCarrier => {
411                // end any frame in progress
412                return self.framer.end();
413            }
414            SquelchState::DroppedCarrier => {
415                // end any frame in progress, and reset DSP
416                self.end();
417                return self.framer.end();
418            }
419            SquelchState::Reading => {
420                // byte not yet ready
421                return self.framer.state();
422            }
423            SquelchState::Ready(true, byte_est) => {
424                // when byte sync is achieved, lock down the AGC
425                // and bit synchronizer. Put the equalizer
426                // in training mode
427                debug!(
428                    "[{:<14}]: entering tracking mode",
429                    self.input_sample_counter()
430                );
431                self.agc.lock(true);
432                self.symsync
433                    .set_loop_bandwidth(self.timing_bandwidth_locked);
434                self.equalizer
435                    .train()
436                    .expect("equalizer missing training sequence");
437                (true, byte_est)
438            }
439            SquelchState::Ready(false, byte_est) => {
440                // byte ready, no resync
441                (false, byte_est)
442            }
443        };
444
445        // 4. adaptive equalization
446        let (byte_est, adaptive_err) = self.equalizer.input(&squelch_out.samples);
447
448        trace!(
449            "byte: {:#04x} \"{:?}\", sym pwr: {:0.2}, adapt err: {:0.2}",
450            byte_est,
451            byte_est as char,
452            squelch_out.power,
453            adaptive_err
454        );
455
456        // 5. framing
457        let link_state = self
458            .framer
459            .input(byte_est, squelch_out.symbol_counter, is_resync);
460        match &link_state {
461            LinkState::Reading => {
462                // prevent sync-like sequences in the message data
463                // from changing the sync
464                self.squelch.lock(true);
465            }
466            LinkState::NoCarrier | LinkState::Burst(_) => {
467                // reset DSP
468                self.end()
469            }
470            _ => {}
471        }
472
473        link_state
474    }
475
    // Handle "no carrier" / loss of signal
    //
    // Returns the DSP to acquisition mode: the AGC is unlocked,
    // the squelch is told that the signal has ended, the
    // equalizer is reset, and the timing loop is returned to
    // its unlocked bandwidth and reset.
    fn end(&mut self) {
        self.agc.lock(false);
        self.squelch.end();
        self.equalizer.reset();
        // restore the acquisition bandwidth before resetting the loop
        self.symsync
            .set_loop_bandwidth(self.timing_bandwidth_unlocked);
        self.symsync.reset();
        debug!(
            "[{:<14}]: returning to acquisition mode",
            self.input_sample_counter()
        );
    }
491
    // Maximum length of a SAME/EAS voice message, in seconds
    //
    // This is the maximum length of the analog voice message, and
    // *NOT* the length of the digital data. It sets the timeout
    // after which an end-of-message is forced if none was received.
    const MAX_MESSAGE_DURATION_SECS: u64 = 135;

    // Interval between trace-level status messages, in symbols
    //
    // Print trace-level messages about once per second
    const TRACE_LOG_INTERVAL_SYMS: u64 = 520;
500}
501
502impl From<&SameReceiverBuilder> for SameReceiver {
503    /// Create the SAME Receiver from its Builder
504    fn from(cfg: &SameReceiverBuilder) -> Self {
505        let input_rate = cfg.input_rate();
506        let sps = waveform::samples_per_symbol(input_rate);
507        let (timing_bandwidth_unlocked, timing_bandwidth_locked) = cfg.timing_bandwidth();
508        let (power_open, power_close) = cfg.squelch_power();
509        let dc_block = DCBlocker::new((cfg.dc_blocker_length() * sps) as usize);
510        let agc = Agc::new(
511            cfg.agc_bandwidth() * sps / input_rate as f32,
512            cfg.agc_gain_limits()[0],
513            cfg.agc_gain_limits()[1],
514        );
515        let demod = FskDemod::new_from_same(cfg.input_rate());
516        let symsync = TimingLoop::new(sps, timing_bandwidth_unlocked, cfg.timing_max_deviation());
517        let code_squelch = CodeAndPowerSquelch::new(
518            waveform::PREAMBLE_SYNC_WORD,
519            cfg.preamble_max_errors(),
520            power_open,
521            power_close,
522            cfg.squelch_bandwidth(),
523        );
524        let eqcfg = match cfg.adaptive_equalizer() {
525            Some(eqcfg) => *eqcfg,
526            None => disabled_equalizer(),
527        };
528        let equalizer = Equalizer::new(
529            eqcfg.filter_order().0,
530            eqcfg.filter_order().1,
531            eqcfg.relaxation(),
532            eqcfg.regularization(),
533            Some(waveform::PREAMBLE_SYNC_WORD),
534        );
535        let framer = Framer::new(cfg.frame_prefix_max_errors(), cfg.frame_max_invalid());
536
537        let samples_until_next_ted = symsync.samples_per_ted();
538
539        Self {
540            dc_block,
541            agc,
542            demod,
543            symsync,
544            squelch: code_squelch,
545            equalizer,
546            framer,
547            assembler: Assembler::default(),
548            timing_bandwidth_unlocked,
549            timing_bandwidth_locked,
550            input_rate,
551            input_sample_counter: 0,
552            link_state: LinkState::NoCarrier,
553            transport_state: TransportState::Idle,
554            event_queue: std::collections::VecDeque::with_capacity(2),
555            ted_sample_clock: 0,
556            samples_until_next_ted,
557            force_eom_at_sample: None,
558        }
559    }
560}
561
562#[derive(Debug)]
563struct SameReceiverIter<'rx, I>
564where
565    I: Iterator<Item = f32>,
566{
567    source: I,
568    receiver: &'rx mut SameReceiver,
569}
570
571impl<'rx, 'data, I> Iterator for SameReceiverIter<'rx, I>
572where
573    I: Iterator<Item = f32>,
574{
575    type Item = SameReceiverEvent;
576
577    fn next(&mut self) -> Option<Self::Item> {
578        self.receiver.process(&mut self.source).and_then(|evt| {
579            info!("{}", &evt);
580            Some(evt)
581        })
582    }
583}
584
585fn disabled_equalizer() -> EqualizerBuilder {
586    let mut out = EqualizerBuilder::new();
587    out.with_filter_order(1, 1);
588    out.with_relaxation(0.0);
589    out
590}
591
592#[cfg(test)]
593mod tests {
594    use super::*;
595
596    use std::io::Write;
597
    // SAME header text used as the payload of every test burst.
    // It carries 31 location codes.
    const TEST_MESSAGE: &str = "ZCZC-EAS-DMO-372088-091724-919623-645687-745748-175234-039940-955869-091611-304171-931612-334828-179485-569615-809223-830187-611340-014693-472885-084645-977764-466883-406863-390018-701741-058097-752790-311648-820127-255900-581947+0000-0001122-NOCALL00-";
599
600    // this method exists to allow us to dump the modulated
601    // waveform to a file
602    #[allow(dead_code)]
603    fn dump_file(out: &[f32], filename: &str) {
604        let mut f = std::fs::File::create(filename).expect("Unable to create file");
605        for &i in out {
606            f.write_all(&(i as i16).to_ne_bytes())
607                .expect("Unable to write data");
608        }
609    }
610
611    fn make_test_message(payload: &[u8]) -> Vec<u8> {
612        const PREAMBLE: &[u8] = &[waveform::PREAMBLE; 16];
613
614        let mut message: Vec<u8> = vec![];
615        message.extend_from_slice(PREAMBLE);
616        message.extend_from_slice(payload);
617        message
618    }
619
620    // Create test burst
621    //
622    // Returns waveform and number of samples per symbol, at 22.5 kSa/s
623    // The returned waveform has `num_bursts` bursts (minimum 1)
624    fn make_test_burst(msg: &[u8], num_bursts: usize) -> (Vec<f32>, usize) {
625        let sample_low = waveform::bytes_to_samples(msg, 1);
626        let (sample_high, sps) = waveform::modulate_afsk(&sample_low, 22050);
627
628        // scale like we're using i16, deliberately not using full arithmetic range
629        let burst: Vec<f32> = sample_high.iter().map(|&v| (v * 16384.0f32)).collect();
630
631        let mut out = burst.clone();
632        for _i in 1..num_bursts {
633            out.extend(std::iter::repeat(0.0f32).take(22050));
634            out.extend(burst.iter());
635        }
636        out.extend(std::iter::repeat(0.0f32).take(2 * 22050));
637
638        (out, sps)
639    }
640
641    #[test]
642    fn test_iter_events() {
643        let (afsk, _) = make_test_burst(&make_test_message(TEST_MESSAGE.as_bytes()), 1);
644
645        let mut rx = SameReceiverBuilder::new(22050)
646            .with_timing_max_deviation(0.01)
647            .build();
648
649        let mut found = 0usize;
650        for (idx, evt) in rx.iter_events(afsk.iter().map(|sa| *sa)).enumerate() {
651            match (idx, evt.what()) {
652                (0, SameEventType::Link(LinkState::Searching)) => {
653                    found += 1;
654                }
655                (1, SameEventType::Link(LinkState::Reading)) => {
656                    found += 1;
657                }
658                (2, SameEventType::Link(LinkState::Burst(data))) => {
659                    assert!(data.starts_with(TEST_MESSAGE.as_bytes()));
660                    found += 1;
661                }
662                (3, SameEventType::Transport(TransportState::Assembling)) => {
663                    found += 1;
664                }
665                (4, SameEventType::Link(LinkState::NoCarrier)) => {
666                    found += 1;
667                }
668                _ => {
669                    unreachable!()
670                }
671            }
672        }
673
674        assert_eq!(found, 5);
675    }
676
677    #[test]
678    fn test_top_level_receiver() {
679        let (afsk, _) = make_test_burst(&make_test_message(TEST_MESSAGE.as_bytes()), 3);
680
681        // uncomment me to dump the output
682        //dump_file(&afsk, "output.bin");
683
684        let mut rx = SameReceiverBuilder::new(22050)
685            .with_timing_max_deviation(0.01)
686            .build();
687
688        println!("{:?}", rx);
689
690        let out = rx
691            .iter_messages(afsk.iter().map(|sa| *sa))
692            .next()
693            .expect("expected message");
694        assert_eq!(TEST_MESSAGE, out.as_str());
695
696        // we're waiting for EOM
697        assert!(rx.force_eom_at_sample.is_some());
698
699        // force EOM due to timeout
700        //   we flush with four seconds of zeros, so putting us 3 seconds
701        //   away from timeout will get the job done during a flush()
702        rx.input_sample_counter = rx.force_eom_at_sample.unwrap() - 3 * rx.input_rate as u64;
703        let msg = rx.flush();
704        assert_eq!(Some(Message::EndOfMessage), msg);
705    }
706}