Skip to main content

sdr_acars/
channel.rs

1//! Multi-channel ACARS decoder. Source-rate complex IQ feeds
2//! N parallel per-channel pipelines (oscillator + decimator
3//! → AM detect → MSK demod → frame parser).
4//!
5//! Faithful port of acarsdec's `rtl.c` per-channel
6//! decimation — the IQ-fork pattern. Single-threaded inline
7//! processing per `process()` call; no internal threads, no
8//! mutex.
9//!
10//! # Magnitude-calibration deviation
11//!
12//! The C divides each `wf[ind]` by `rtlMult` (= `decim_factor`)
13//! AND by `127.5` (RTL-SDR's u8 sample normalization). We do
14//! NEITHER — our IQ source produces pre-normalized
15//! `Complex<f32>` (no `/127.5` needed), and skipping
16//! `/decim_factor` scales `accum.norm()` up by a constant
17//! factor of `decim_factor`. This affects ONLY the level-dB
18//! metadata reported per message (volatile in the e2e diff,
19//! stripped before comparing); decode correctness is
20//! unaffected because [`crate::msk::MskDemod`] normalizes the
21//! matched-filter output internally.
22
23use num_complex::Complex32;
24
25use crate::error::AcarsError;
26use crate::frame::{AcarsMessage, FrameParser};
27use crate::msk::{IF_RATE_HZ, MskDemod};
28
29/// Per-channel state. Owns its oscillator, decimator
30/// accumulator, MSK demod, and frame parser. Private — only
31/// [`ChannelBank`] composes one.
32struct Channel {
33    /// Pre-computed complex exponential at `-offset_hz`,
34    /// sampled at source rate. Length = `decim_factor`. The
35    /// "free running" oscillator extension uses
36    /// `(osc_idx + n) mod decim_factor` wrap-around — at the
37    /// instant `decim_count` reaches `decim_factor`,
38    /// `osc_idx` has also wrapped back to 0, matching the
39    /// C's per-block `for (ind = 0; ind < rtlMult; ind++)`
40    /// init in `rtl.c::in_callback`.
41    oscillator: Vec<Complex32>,
42    /// Where in `oscillator` we are this block. Persists
43    /// across `process()` calls so the oscillator is
44    /// continuous across IQ-block boundaries.
45    osc_idx: usize,
46    /// Decimation accumulator state. Mirrors the C's local
47    /// `float complex D` in `rtl.c::in_callback`, but lifted
48    /// to per-Channel state so it survives partial blocks.
49    accum: Complex32,
50    /// Counter within the current decim period.
51    decim_count: u32,
52    /// Decimation factor (`source_rate / IF_RATE_HZ`). Mirrors
53    /// C `rtlMult`.
54    decim_factor: u32,
55    /// Buffer of decimated IF samples (AM-detected real
56    /// `f32`) to feed into [`MskDemod::process`]. Cleared at
57    /// the start of each `ChannelBank::process` call.
58    if_buffer: Vec<f32>,
59    msk: MskDemod,
60    parser: FrameParser,
61    /// Per-channel multi-block reassembler.
62    /// Frames emerging from `parser.drain` flow through here
63    /// before reaching `process`'s `on_message` callback —
64    /// ETB blocks park; ETX blocks emit reassembled (or
65    /// pass-through if no pending). Per-channel rather than
66    /// shared because aircraft don't normally hop channels
67    /// mid-message and channel-isolated state machines are
68    /// simpler to reason about.
69    assembler: crate::reassembly::MessageAssembler,
70}
71
/// Idle baseline (dBFS) for the `level_db` field of [`ChannelStats`].
/// Readings at or below this value are treated as the noise floor;
/// anything above it means RF energy is present. Defined once so that
/// `ChannelStats::default()` and the per-channel initialization in
/// `ChannelBank::new` cannot drift apart.
pub const NO_SIGNAL_FLOOR_DB: f32 = -120.0_f32;
79
80/// Per-channel statistics for the UI panel and CLI status.
81#[derive(Clone, Copy, Debug)]
82pub struct ChannelStats {
83    /// Channel center frequency (Hz).
84    pub freq_hz: f64,
85    /// Wall-clock time of the most recent decoded message.
86    pub last_msg_at: Option<std::time::SystemTime>,
87    /// Total messages decoded on this channel since startup.
88    pub msg_count: u32,
89    /// Most-recent level estimate (dB). Reported but not yet
90    /// computed — placeholder until level metering lands.
91    pub level_db: f32,
92    /// Three-state lock indicator for the sidebar glyph.
93    pub lock_state: ChannelLockState,
94}
95
96impl Default for ChannelStats {
97    /// Idle baseline — matches `ChannelBank::new`'s
98    /// per-channel initialization. `level_db` defaults to
99    /// [`NO_SIGNAL_FLOOR_DB`] (the dBFS noise floor), NOT 0.0
100    /// which would inaccurately read as a strong present
101    /// signal in any UI gauge consuming the field.
102    fn default() -> Self {
103        Self {
104            freq_hz: 0.0,
105            last_msg_at: None,
106            msg_count: 0,
107            level_db: NO_SIGNAL_FLOOR_DB,
108            lock_state: ChannelLockState::Idle,
109        }
110    }
111}
112
/// Lock indicator behind the sidebar glyph (●/○/⚠); one of three
/// states.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub enum ChannelLockState {
    /// The channel shows no RF energy. This is the default state.
    #[default]
    Idle,
    /// RF energy is present, but no frame has been decoded within
    /// the recent window.
    Signal,
    /// Frames have been decoded successfully in the recent window.
    Locked,
}
125
126/// Multi-channel orchestrator. One source-rate IQ stream feeds
127/// N narrowband channels in parallel.
128pub struct ChannelBank {
129    channels: Vec<Channel>,
130    stats: Vec<ChannelStats>,
131}
132
133impl ChannelBank {
134    /// Build a bank for `channels` (Hz), where the source IQ is
135    /// at `source_rate_hz` centered on `center_hz`. Source rate
136    /// must be an integer multiple of [`IF_RATE_HZ`] (12500 Hz).
137    /// Each channel's offset from `center_hz` must fit within
138    /// the source bandwidth (`±source_rate_hz / 2`).
139    ///
140    /// # Errors
141    ///
142    /// - [`AcarsError::InvalidChannelConfig`] if the channel
143    ///   list is empty or any channel falls outside the source
144    ///   bandwidth.
145    /// - [`AcarsError::NonIntegerDecimation`] if
146    ///   `source_rate_hz` is not an integer multiple of
147    ///   [`IF_RATE_HZ`].
148    pub fn new(source_rate_hz: f64, center_hz: f64, channels: &[f64]) -> Result<Self, AcarsError> {
149        if channels.is_empty() {
150            return Err(AcarsError::InvalidChannelConfig(
151                "channel list is empty".into(),
152            ));
153        }
154        let if_rate = f64::from(IF_RATE_HZ);
155        // Reject zero / negative / NaN source rates up front.
156        // Without this guard, `source_rate_hz == 0.0` passes the
157        // integer-multiple check (decim_f = 0.0, fract = 0.0),
158        // produces `decim_factor == 0`, builds a zero-length
159        // oscillator, and crashes `process()` on the first sample
160        // access. Library-crate rule: surface as a typed error,
161        // not a deferred panic.
162        if !source_rate_hz.is_finite() || source_rate_hz <= 0.0 {
163            return Err(AcarsError::InvalidChannelConfig(format!(
164                "source rate {source_rate_hz} Hz must be finite and positive"
165            )));
166        }
167        let decim_f = source_rate_hz / if_rate;
168        if decim_f.fract().abs() > 1e-6 {
169            return Err(AcarsError::NonIntegerDecimation {
170                source_rate_hz,
171                if_rate_hz: if_rate,
172            });
173        }
174        #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
175        let decim_factor = decim_f.round() as u32;
176
177        let mut built = Vec::with_capacity(channels.len());
178        let mut stats = Vec::with_capacity(channels.len());
179        for (idx, &freq_hz) in channels.iter().enumerate() {
180            let offset_hz = freq_hz - center_hz;
181            // Channel must fit in source bandwidth (Nyquist).
182            if offset_hz.abs() > source_rate_hz / 2.0 {
183                return Err(AcarsError::InvalidChannelConfig(format!(
184                    "channel {freq_hz} Hz outside source bandwidth ({source_rate_hz} Hz centered on {center_hz} Hz)"
185                )));
186            }
187            // Build the oscillator: complex exp at -offset_hz,
188            // sampled at the source rate. Length = decim_factor
189            // (one decim period). The "free running" extension
190            // is (osc_idx + n) mod decim_factor — see
191            // `process()`.
192            let mut oscillator = Vec::with_capacity(decim_factor as usize);
193            for n in 0..decim_factor {
194                let phase =
195                    -2.0 * core::f64::consts::PI * offset_hz * f64::from(n) / source_rate_hz;
196                #[allow(clippy::cast_possible_truncation)]
197                oscillator.push(Complex32::new(phase.cos() as f32, phase.sin() as f32));
198            }
199            #[allow(clippy::cast_possible_truncation)]
200            let idx_u8 = idx as u8;
201            built.push(Channel {
202                oscillator,
203                osc_idx: 0,
204                accum: Complex32::new(0.0, 0.0),
205                decim_count: 0,
206                decim_factor,
207                if_buffer: Vec::with_capacity(4096),
208                msk: MskDemod::new(),
209                parser: FrameParser::new(idx_u8, freq_hz),
210                assembler: crate::reassembly::MessageAssembler::new(),
211            });
212            stats.push(ChannelStats {
213                freq_hz,
214                last_msg_at: None,
215                msg_count: 0,
216                level_db: NO_SIGNAL_FLOOR_DB,
217                lock_state: ChannelLockState::Idle,
218            });
219        }
220        Ok(Self {
221            channels: built,
222            stats,
223        })
224    }
225
226    /// Drain `iq` through every channel's pipeline, emitting
227    /// any decoded messages via `on_message`. Mirrors
228    /// `rtl.c::in_callback`'s per-block accumulator loop, then
229    /// drives MSK + frame parsing per channel. The polarity-
230    /// flip handshake (`FrameParser::take_polarity_flip` →
231    /// `MskDemod::toggle_polarity`) handles 180° phase slip
232    /// detected on inverted-SYN.
233    pub fn process<F: FnMut(AcarsMessage)>(&mut self, iq: &[Complex32], mut on_message: F) {
234        for (idx, ch) in self.channels.iter_mut().enumerate() {
235            ch.if_buffer.clear();
236            for &sample in iq {
237                let osc = ch.oscillator[ch.osc_idx];
238                ch.osc_idx = (ch.osc_idx + 1) % ch.oscillator.len();
239                ch.accum += sample * osc;
240                ch.decim_count += 1;
241                if ch.decim_count >= ch.decim_factor {
242                    // AM-detect: magnitude of the complex
243                    // accumulator (matches C `cabsf(D)`).
244                    let am_sample = ch.accum.norm();
245                    ch.if_buffer.push(am_sample);
246                    ch.accum = Complex32::new(0.0, 0.0);
247                    ch.decim_count = 0;
248                }
249            }
250            // Drive the MSK demod with the decimated IF.
251            ch.msk.process(&ch.if_buffer, &mut ch.parser);
252            // Drain any complete bytes accumulated in the
253            // parser. Stamp live stats (msg_count, last_msg_at,
254            // level_db) on every emitted message so
255            // `channels()` reflects real state, not the
256            // construction placeholders.
257            //
258            // Each parser-emitted frame flows through the
259            // per-channel `MessageAssembler`. ETBs
260            // park silently (the assembler returns 0 messages);
261            // ETXs emit the reassembled message + any timed-out
262            // partial sweeps. Stats are stamped once per
263            // assembler-emitted message so a multi-block
264            // reassembly counts as one in `msg_count`.
265            let stats = &mut self.stats[idx];
266            ch.parser.drain(|msg| {
267                let now = msg.timestamp;
268                for mut emitted in ch.assembler.observe(msg, now) {
269                    stats.msg_count = stats.msg_count.saturating_add(1);
270                    stats.last_msg_at = Some(emitted.timestamp);
271                    // level_db on the message is currently 0.0
272                    // (a future enhancement will fill it from the
273                    // MSK matched-filter output); keep stats.level_db
274                    // pinned to the latest emitted value so the
275                    // contract stays "stats reflect the latest
276                    // decoded message" once levels start landing.
277                    stats.level_db = emitted.level_db;
278                    stats.lock_state = ChannelLockState::Locked;
279                    // Populate OOOI metadata after reassembly so the
280                    // parser sees the full concatenated text for
281                    // multi-block messages.
282                    emitted.parsed =
283                        crate::label_parsers::decode_label(emitted.label, &emitted.text);
284                    on_message(emitted);
285                }
286            });
287            // Drive timeout emission on every IQ-block tick, not
288            // just when the parser produces a frame. A channel
289            // that observed one ETB and then went silent would
290            // never get an `observe()` call to internally sweep
291            // — the partial reassembly would stay parked
292            // indefinitely. The check is cheap (HashMap
293            // iteration capped at MAX_PENDING_MESSAGES) and
294            // runs at IQ-block cadence, well above the 30 s
295            // recency window.
296            for mut emitted in ch.assembler.drain_timeouts(std::time::SystemTime::now()) {
297                stats.msg_count = stats.msg_count.saturating_add(1);
298                stats.last_msg_at = Some(emitted.timestamp);
299                stats.level_db = emitted.level_db;
300                stats.lock_state = ChannelLockState::Locked;
301                // Populate OOOI metadata after reassembly so the
302                // parser sees the full concatenated text for
303                // multi-block messages.
304                emitted.parsed = crate::label_parsers::decode_label(emitted.label, &emitted.text);
305                on_message(emitted);
306            }
307            // Apply pending polarity flip if the parser
308            // detected an inverted-SYN at frame start
309            // (`acars.c:259,274`).
310            if ch.parser.take_polarity_flip() {
311                ch.msk.toggle_polarity();
312            }
313        }
314        // Stats refresh (level, lock state) lands later — for
315        // now we leave `stats` static post-construction. The
316        // field is kept reachable via `channels()` so consumers
317        // can already enumerate per-channel frequencies.
318        let _ = &self.stats;
319    }
320
321    /// Snapshot of per-channel stats.
322    #[must_use]
323    pub fn channels(&self) -> &[ChannelStats] {
324        &self.stats
325    }
326}
327
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::panic)]
mod tests {
    use super::*;

    /// Extract the error from a constructor result, panicking with
    /// `on_ok` if construction unexpectedly succeeded.
    ///
    /// `Result::unwrap_err` is not usable here: it needs
    /// `ChannelBank: Debug`, and `ChannelBank` does not derive it.
    /// Matching by hand avoids pushing a `Debug` derive into the
    /// `Channel` internals just for the tests.
    fn construction_error(result: Result<ChannelBank, AcarsError>, on_ok: &str) -> AcarsError {
        match result {
            Err(err) => err,
            Ok(_) => panic!("{on_ok}"),
        }
    }

    /// Build a bank that is expected to be valid, panicking with the
    /// error otherwise.
    fn build_bank(source_rate_hz: f64, center_hz: f64, channels: &[f64]) -> ChannelBank {
        match ChannelBank::new(source_rate_hz, center_hz, channels) {
            Ok(bank) => bank,
            Err(e) => panic!("expected Ok, got {e:?}"),
        }
    }

    #[test]
    fn rejects_empty_channel_list() {
        let err = construction_error(
            ChannelBank::new(2_400_000.0, 130_450_000.0, &[]),
            "expected InvalidChannelConfig, got Ok",
        );
        assert!(
            matches!(err, AcarsError::InvalidChannelConfig(_)),
            "expected InvalidChannelConfig, got {err:?}"
        );
    }

    #[test]
    fn rejects_zero_or_negative_source_rate() {
        // Zero, negative, NaN and infinite rates must all be refused
        // at construction time rather than failing later.
        for bad in [0.0, -1.0, f64::NAN, f64::INFINITY] {
            let err = construction_error(
                ChannelBank::new(bad, 130_337_500.0, &[131_550_000.0]),
                &format!("rate={bad}: expected error, got Ok"),
            );
            assert!(
                matches!(err, AcarsError::InvalidChannelConfig(_)),
                "rate={bad}: expected InvalidChannelConfig, got {err:?}"
            );
        }
    }

    #[test]
    fn rejects_non_integer_decimation() {
        let err = construction_error(
            ChannelBank::new(2_400_001.0, 130_450_000.0, &[131_550_000.0]),
            "expected NonIntegerDecimation, got Ok",
        );
        assert!(
            matches!(err, AcarsError::NonIntegerDecimation { .. }),
            "expected NonIntegerDecimation, got {err:?}"
        );
    }

    #[test]
    fn rejects_channel_outside_source_bandwidth() {
        // 200 MHz lies far outside a 2.4 MHz window centered on
        // 130.45 MHz.
        let err = construction_error(
            ChannelBank::new(2_400_000.0, 130_450_000.0, &[200_000_000.0]),
            "expected InvalidChannelConfig, got Ok",
        );
        assert!(
            matches!(err, AcarsError::InvalidChannelConfig(_)),
            "expected InvalidChannelConfig, got {err:?}"
        );
    }

    #[test]
    fn accepts_valid_us_six_config() {
        // The US-6 set covers 129.125–131.550 MHz, a 2.425 MHz span
        // that does not fit in 2.4 MHz. Use 2.5 MHz instead
        // (decim_factor = 200, an integer multiple of 12.5 kHz) and
        // center on the midpoint of the extremes (130.3375 MHz), so
        // the lowest channel's -1.2125 MHz offset sits inside the
        // ±1.25 MHz window. Same placement as `chooseFc` in
        // acarsdec's `rtl.c:131-165`.
        let us_six = [
            129_125_000.0,
            130_025_000.0,
            130_425_000.0,
            130_450_000.0,
            131_525_000.0,
            131_550_000.0,
        ];
        let bank = build_bank(2_500_000.0, 130_337_500.0, &us_six);
        let reported = bank.channels();
        assert_eq!(reported.len(), 6);
        assert!((reported[0].freq_hz - 129_125_000.0).abs() < f64::EPSILON);
    }

    #[test]
    fn process_silent_iq_doesnt_decode() {
        let mut bank = build_bank(2_500_000.0, 130_450_000.0, &[131_550_000.0]);
        let silent = vec![Complex32::new(0.0, 0.0); 2500];
        bank.process(&silent, |_msg| {
            panic!("silence shouldn't produce messages");
        });
    }
}