sdr_acars/channel.rs
1//! Multi-channel ACARS decoder. Source-rate complex IQ feeds
2//! N parallel per-channel pipelines (oscillator + decimator
3//! → AM detect → MSK demod → frame parser).
4//!
5//! Faithful port of acarsdec's `rtl.c` per-channel
6//! decimation — the IQ-fork pattern. Single-threaded inline
7//! processing per `process()` call; no internal threads, no
8//! mutex.
9//!
10//! # Magnitude-calibration deviation
11//!
12//! The C divides each `wf[ind]` by `rtlMult` (= `decim_factor`)
13//! AND by `127.5` (RTL-SDR's u8 sample normalization). We do
14//! NEITHER — our IQ source produces pre-normalized
15//! `Complex<f32>` (no `/127.5` needed), and skipping
16//! `/decim_factor` scales `accum.norm()` up by a constant
17//! factor of `decim_factor`. This affects ONLY the level-dB
18//! metadata reported per message (volatile in the e2e diff,
19//! stripped before comparing); decode correctness is
20//! unaffected because [`crate::msk::MskDemod`] normalizes the
21//! matched-filter output internally.
22
23use num_complex::Complex32;
24
25use crate::error::AcarsError;
26use crate::frame::{AcarsMessage, FrameParser};
27use crate::msk::{IF_RATE_HZ, MskDemod};
28
29/// Per-channel state. Owns its oscillator, decimator
30/// accumulator, MSK demod, and frame parser. Private — only
31/// [`ChannelBank`] composes one.
32struct Channel {
33 /// Pre-computed complex exponential at `-offset_hz`,
34 /// sampled at source rate. Length = `decim_factor`. The
35 /// "free running" oscillator extension uses
36 /// `(osc_idx + n) mod decim_factor` wrap-around — at the
37 /// instant `decim_count` reaches `decim_factor`,
38 /// `osc_idx` has also wrapped back to 0, matching the
39 /// C's per-block `for (ind = 0; ind < rtlMult; ind++)`
40 /// init in `rtl.c::in_callback`.
41 oscillator: Vec<Complex32>,
42 /// Where in `oscillator` we are this block. Persists
43 /// across `process()` calls so the oscillator is
44 /// continuous across IQ-block boundaries.
45 osc_idx: usize,
46 /// Decimation accumulator state. Mirrors the C's local
47 /// `float complex D` in `rtl.c::in_callback`, but lifted
48 /// to per-Channel state so it survives partial blocks.
49 accum: Complex32,
50 /// Counter within the current decim period.
51 decim_count: u32,
52 /// Decimation factor (`source_rate / IF_RATE_HZ`). Mirrors
53 /// C `rtlMult`.
54 decim_factor: u32,
55 /// Buffer of decimated IF samples (AM-detected real
56 /// `f32`) to feed into [`MskDemod::process`]. Cleared at
57 /// the start of each `ChannelBank::process` call.
58 if_buffer: Vec<f32>,
59 msk: MskDemod,
60 parser: FrameParser,
61 /// Per-channel multi-block reassembler.
62 /// Frames emerging from `parser.drain` flow through here
63 /// before reaching `process`'s `on_message` callback —
64 /// ETB blocks park; ETX blocks emit reassembled (or
65 /// pass-through if no pending). Per-channel rather than
66 /// shared because aircraft don't normally hop channels
67 /// mid-message and channel-isolated state machines are
68 /// simpler to reason about.
69 assembler: crate::reassembly::MessageAssembler,
70}
71
/// Idle baseline (dBFS) for the `level_db` field of
/// [`ChannelStats`] when no signal has been observed.
/// Readings at or below this are effectively noise floor;
/// readings above it indicate RF energy. Defined once so
/// `ChannelStats::default()` and the per-channel
/// initialization in `ChannelBank::new` cannot drift apart.
pub const NO_SIGNAL_FLOOR_DB: f32 = -120.0_f32;
79
80/// Per-channel statistics for the UI panel and CLI status.
81#[derive(Clone, Copy, Debug)]
82pub struct ChannelStats {
83 /// Channel center frequency (Hz).
84 pub freq_hz: f64,
85 /// Wall-clock time of the most recent decoded message.
86 pub last_msg_at: Option<std::time::SystemTime>,
87 /// Total messages decoded on this channel since startup.
88 pub msg_count: u32,
89 /// Most-recent level estimate (dB). Reported but not yet
90 /// computed — placeholder until level metering lands.
91 pub level_db: f32,
92 /// Three-state lock indicator for the sidebar glyph.
93 pub lock_state: ChannelLockState,
94}
95
96impl Default for ChannelStats {
97 /// Idle baseline — matches `ChannelBank::new`'s
98 /// per-channel initialization. `level_db` defaults to
99 /// [`NO_SIGNAL_FLOOR_DB`] (the dBFS noise floor), NOT 0.0
100 /// which would inaccurately read as a strong present
101 /// signal in any UI gauge consuming the field.
102 fn default() -> Self {
103 Self {
104 freq_hz: 0.0,
105 last_msg_at: None,
106 msg_count: 0,
107 level_db: NO_SIGNAL_FLOOR_DB,
108 lock_state: ChannelLockState::Idle,
109 }
110 }
111}
112
/// Lock indicator with three states, rendered as the
/// sidebar glyph (●/○/⚠).
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub enum ChannelLockState {
    /// Nothing on the channel: no RF energy detected. This
    /// is the default state.
    #[default]
    Idle,
    /// RF energy is present, but no frame has decoded within
    /// the recent window.
    Signal,
    /// Frames have decoded successfully recently.
    Locked,
}
125
126/// Multi-channel orchestrator. One source-rate IQ stream feeds
127/// N narrowband channels in parallel.
128pub struct ChannelBank {
129 channels: Vec<Channel>,
130 stats: Vec<ChannelStats>,
131}
132
133impl ChannelBank {
134 /// Build a bank for `channels` (Hz), where the source IQ is
135 /// at `source_rate_hz` centered on `center_hz`. Source rate
136 /// must be an integer multiple of [`IF_RATE_HZ`] (12500 Hz).
137 /// Each channel's offset from `center_hz` must fit within
138 /// the source bandwidth (`±source_rate_hz / 2`).
139 ///
140 /// # Errors
141 ///
142 /// - [`AcarsError::InvalidChannelConfig`] if the channel
143 /// list is empty or any channel falls outside the source
144 /// bandwidth.
145 /// - [`AcarsError::NonIntegerDecimation`] if
146 /// `source_rate_hz` is not an integer multiple of
147 /// [`IF_RATE_HZ`].
148 pub fn new(source_rate_hz: f64, center_hz: f64, channels: &[f64]) -> Result<Self, AcarsError> {
149 if channels.is_empty() {
150 return Err(AcarsError::InvalidChannelConfig(
151 "channel list is empty".into(),
152 ));
153 }
154 let if_rate = f64::from(IF_RATE_HZ);
155 // Reject zero / negative / NaN source rates up front.
156 // Without this guard, `source_rate_hz == 0.0` passes the
157 // integer-multiple check (decim_f = 0.0, fract = 0.0),
158 // produces `decim_factor == 0`, builds a zero-length
159 // oscillator, and crashes `process()` on the first sample
160 // access. Library-crate rule: surface as a typed error,
161 // not a deferred panic.
162 if !source_rate_hz.is_finite() || source_rate_hz <= 0.0 {
163 return Err(AcarsError::InvalidChannelConfig(format!(
164 "source rate {source_rate_hz} Hz must be finite and positive"
165 )));
166 }
167 let decim_f = source_rate_hz / if_rate;
168 if decim_f.fract().abs() > 1e-6 {
169 return Err(AcarsError::NonIntegerDecimation {
170 source_rate_hz,
171 if_rate_hz: if_rate,
172 });
173 }
174 #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
175 let decim_factor = decim_f.round() as u32;
176
177 let mut built = Vec::with_capacity(channels.len());
178 let mut stats = Vec::with_capacity(channels.len());
179 for (idx, &freq_hz) in channels.iter().enumerate() {
180 let offset_hz = freq_hz - center_hz;
181 // Channel must fit in source bandwidth (Nyquist).
182 if offset_hz.abs() > source_rate_hz / 2.0 {
183 return Err(AcarsError::InvalidChannelConfig(format!(
184 "channel {freq_hz} Hz outside source bandwidth ({source_rate_hz} Hz centered on {center_hz} Hz)"
185 )));
186 }
187 // Build the oscillator: complex exp at -offset_hz,
188 // sampled at the source rate. Length = decim_factor
189 // (one decim period). The "free running" extension
190 // is (osc_idx + n) mod decim_factor — see
191 // `process()`.
192 let mut oscillator = Vec::with_capacity(decim_factor as usize);
193 for n in 0..decim_factor {
194 let phase =
195 -2.0 * core::f64::consts::PI * offset_hz * f64::from(n) / source_rate_hz;
196 #[allow(clippy::cast_possible_truncation)]
197 oscillator.push(Complex32::new(phase.cos() as f32, phase.sin() as f32));
198 }
199 #[allow(clippy::cast_possible_truncation)]
200 let idx_u8 = idx as u8;
201 built.push(Channel {
202 oscillator,
203 osc_idx: 0,
204 accum: Complex32::new(0.0, 0.0),
205 decim_count: 0,
206 decim_factor,
207 if_buffer: Vec::with_capacity(4096),
208 msk: MskDemod::new(),
209 parser: FrameParser::new(idx_u8, freq_hz),
210 assembler: crate::reassembly::MessageAssembler::new(),
211 });
212 stats.push(ChannelStats {
213 freq_hz,
214 last_msg_at: None,
215 msg_count: 0,
216 level_db: NO_SIGNAL_FLOOR_DB,
217 lock_state: ChannelLockState::Idle,
218 });
219 }
220 Ok(Self {
221 channels: built,
222 stats,
223 })
224 }
225
226 /// Drain `iq` through every channel's pipeline, emitting
227 /// any decoded messages via `on_message`. Mirrors
228 /// `rtl.c::in_callback`'s per-block accumulator loop, then
229 /// drives MSK + frame parsing per channel. The polarity-
230 /// flip handshake (`FrameParser::take_polarity_flip` →
231 /// `MskDemod::toggle_polarity`) handles 180° phase slip
232 /// detected on inverted-SYN.
233 pub fn process<F: FnMut(AcarsMessage)>(&mut self, iq: &[Complex32], mut on_message: F) {
234 for (idx, ch) in self.channels.iter_mut().enumerate() {
235 ch.if_buffer.clear();
236 for &sample in iq {
237 let osc = ch.oscillator[ch.osc_idx];
238 ch.osc_idx = (ch.osc_idx + 1) % ch.oscillator.len();
239 ch.accum += sample * osc;
240 ch.decim_count += 1;
241 if ch.decim_count >= ch.decim_factor {
242 // AM-detect: magnitude of the complex
243 // accumulator (matches C `cabsf(D)`).
244 let am_sample = ch.accum.norm();
245 ch.if_buffer.push(am_sample);
246 ch.accum = Complex32::new(0.0, 0.0);
247 ch.decim_count = 0;
248 }
249 }
250 // Drive the MSK demod with the decimated IF.
251 ch.msk.process(&ch.if_buffer, &mut ch.parser);
252 // Drain any complete bytes accumulated in the
253 // parser. Stamp live stats (msg_count, last_msg_at,
254 // level_db) on every emitted message so
255 // `channels()` reflects real state, not the
256 // construction placeholders.
257 //
258 // Each parser-emitted frame flows through the
259 // per-channel `MessageAssembler`. ETBs
260 // park silently (the assembler returns 0 messages);
261 // ETXs emit the reassembled message + any timed-out
262 // partial sweeps. Stats are stamped once per
263 // assembler-emitted message so a multi-block
264 // reassembly counts as one in `msg_count`.
265 let stats = &mut self.stats[idx];
266 ch.parser.drain(|msg| {
267 let now = msg.timestamp;
268 for mut emitted in ch.assembler.observe(msg, now) {
269 stats.msg_count = stats.msg_count.saturating_add(1);
270 stats.last_msg_at = Some(emitted.timestamp);
271 // level_db on the message is currently 0.0
272 // (a future enhancement will fill it from the
273 // MSK matched-filter output); keep stats.level_db
274 // pinned to the latest emitted value so the
275 // contract stays "stats reflect the latest
276 // decoded message" once levels start landing.
277 stats.level_db = emitted.level_db;
278 stats.lock_state = ChannelLockState::Locked;
279 // Populate OOOI metadata after reassembly so the
280 // parser sees the full concatenated text for
281 // multi-block messages.
282 emitted.parsed =
283 crate::label_parsers::decode_label(emitted.label, &emitted.text);
284 on_message(emitted);
285 }
286 });
287 // Drive timeout emission on every IQ-block tick, not
288 // just when the parser produces a frame. A channel
289 // that observed one ETB and then went silent would
290 // never get an `observe()` call to internally sweep
291 // — the partial reassembly would stay parked
292 // indefinitely. The check is cheap (HashMap
293 // iteration capped at MAX_PENDING_MESSAGES) and
294 // runs at IQ-block cadence, well above the 30 s
295 // recency window.
296 for mut emitted in ch.assembler.drain_timeouts(std::time::SystemTime::now()) {
297 stats.msg_count = stats.msg_count.saturating_add(1);
298 stats.last_msg_at = Some(emitted.timestamp);
299 stats.level_db = emitted.level_db;
300 stats.lock_state = ChannelLockState::Locked;
301 // Populate OOOI metadata after reassembly so the
302 // parser sees the full concatenated text for
303 // multi-block messages.
304 emitted.parsed = crate::label_parsers::decode_label(emitted.label, &emitted.text);
305 on_message(emitted);
306 }
307 // Apply pending polarity flip if the parser
308 // detected an inverted-SYN at frame start
309 // (`acars.c:259,274`).
310 if ch.parser.take_polarity_flip() {
311 ch.msk.toggle_polarity();
312 }
313 }
314 // Stats refresh (level, lock state) lands later — for
315 // now we leave `stats` static post-construction. The
316 // field is kept reachable via `channels()` so consumers
317 // can already enumerate per-channel frequencies.
318 let _ = &self.stats;
319 }
320
321 /// Snapshot of per-channel stats.
322 #[must_use]
323 pub fn channels(&self) -> &[ChannelStats] {
324 &self.stats
325 }
326}
327
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::panic)]
mod tests {
    use super::*;

    /// An empty channel list must be refused with
    /// `InvalidChannelConfig`.
    #[test]
    fn rejects_empty_channel_list() {
        // `unwrap_err` needs `ChannelBank: Debug`. Matching on
        // the result avoids pushing a Debug derive into the
        // `Channel` internals (MskDemod / FrameParser), which
        // is a separate decision from what this test checks.
        let outcome = ChannelBank::new(2_400_000.0, 130_450_000.0, &[]);
        match outcome {
            Ok(_) => panic!("expected InvalidChannelConfig, got Ok"),
            Err(AcarsError::InvalidChannelConfig(_)) => {}
            Err(other) => panic!("expected InvalidChannelConfig, got {other:?}"),
        }
    }

    /// Zero, negative, NaN and infinite source rates must all
    /// be refused at construction instead of producing a
    /// zero decimation factor that fails later.
    #[test]
    fn rejects_zero_or_negative_source_rate() {
        let bad_rates = [0.0, -1.0, f64::NAN, f64::INFINITY];
        for rate in bad_rates {
            let outcome = ChannelBank::new(rate, 130_337_500.0, &[131_550_000.0]);
            match outcome {
                Ok(_) => panic!("rate={rate}: expected error, got Ok"),
                Err(AcarsError::InvalidChannelConfig(_)) => {}
                Err(other) => {
                    panic!("rate={rate}: expected InvalidChannelConfig, got {other:?}")
                }
            }
        }
    }

    /// A source rate one hertz off a multiple of 12.5 kHz must
    /// be refused with `NonIntegerDecimation`.
    #[test]
    fn rejects_non_integer_decimation() {
        let outcome = ChannelBank::new(2_400_001.0, 130_450_000.0, &[131_550_000.0]);
        match outcome {
            Ok(_) => panic!("expected NonIntegerDecimation, got Ok"),
            Err(AcarsError::NonIntegerDecimation { .. }) => {}
            Err(other) => panic!("expected NonIntegerDecimation, got {other:?}"),
        }
    }

    /// A channel far outside the tuned window must be refused.
    #[test]
    fn rejects_channel_outside_source_bandwidth() {
        // 200 MHz is nowhere near a 2.4 MHz window centered
        // on 130.45 MHz.
        let outcome = ChannelBank::new(2_400_000.0, 130_450_000.0, &[200_000_000.0]);
        match outcome {
            Ok(_) => panic!("expected InvalidChannelConfig, got Ok"),
            Err(AcarsError::InvalidChannelConfig(_)) => {}
            Err(other) => panic!("expected InvalidChannelConfig, got {other:?}"),
        }
    }

    /// The six-channel US set must build and report its
    /// frequencies in input order.
    #[test]
    fn accepts_valid_us_six_config() {
        // The US-6 set spans 129.125–131.550 MHz = 2.425 MHz,
        // which is wider than 2.4 MHz, so the test uses
        // 2.5 MHz (decim_factor = 200, an integer multiple of
        // 12.5 kHz; supported natively by RTL-SDR). Centering
        // on the midpoint of the extremes (130.3375 MHz) keeps
        // the lowest channel's -1.2125 MHz offset inside the
        // ±1.25 MHz window. This mirrors `chooseFc`'s
        // placement in acarsdec's `rtl.c:131-165`.
        const US_SIX_HZ: [f64; 6] = [
            129_125_000.0,
            130_025_000.0,
            130_425_000.0,
            130_450_000.0,
            131_525_000.0,
            131_550_000.0,
        ];
        let outcome = ChannelBank::new(2_500_000.0, 130_337_500.0, &US_SIX_HZ);
        let bank = match outcome {
            Err(e) => panic!("expected Ok, got {e:?}"),
            Ok(b) => b,
        };
        assert_eq!(bank.channels().len(), 6);
        assert!((bank.channels()[0].freq_hz - 129_125_000.0).abs() < f64::EPSILON);
    }

    /// All-zero IQ must never reach the message callback.
    #[test]
    fn process_silent_iq_doesnt_decode() {
        let outcome = ChannelBank::new(2_500_000.0, 130_450_000.0, &[131_550_000.0]);
        let mut bank = match outcome {
            Err(e) => panic!("expected Ok, got {e:?}"),
            Ok(b) => b,
        };
        let zero = Complex32::new(0.0, 0.0);
        let silent = vec![zero; 2500];
        bank.process(&silent, |_msg| {
            panic!("silence shouldn't produce messages");
        });
    }
}
421}