use std::collections::HashMap;
use std::f64::consts::PI;
use std::net::SocketAddr;
use web_time::{Duration, Instant};
use num_complex::Complex;
use rand::rngs::StdRng;
use rand::{Rng, SeedableRng};
use rand_distr::{Distribution, Normal};
pub use radio_utils_protocol::{
p1_code_to_sample_rate as code_to_sample_rate, pack_iq_24bit_into,
sample_rate_to_p1_code as sample_rate_to_code, HpsdrHw, SAMPLE_RATES_P1,
};
pub struct HwInfo {
pub hw: HpsdrHw,
pub mac: [u8; 6],
pub firmware_version: u8,
pub mercury_versions: [u8; 4],
pub penny_version: u8,
pub metis_version: u8,
}
impl HwInfo {
pub fn new(hw: HpsdrHw, mac: [u8; 6]) -> Self {
Self {
hw,
mac,
firmware_version: 25,
mercury_versions: [25, 25, 25, 25],
penny_version: 25,
metis_version: 25,
}
}
pub fn mac_string(&self) -> String {
self.mac
.iter()
.map(|b| format!("{:02x}", b))
.collect::<Vec<_>>()
.join(":")
}
pub fn random_mac() -> [u8; 6] {
let mut rng = StdRng::from_os_rng();
let mut mac = [0u8; 6];
rng.fill(&mut mac);
mac[0] = (mac[0] | 0x02) & 0xFE; mac
}
}
pub struct SignalGenerator {
pub sample_rate: u32,
pub noise_level: f64,
normal: Normal<f64>,
rng: StdRng,
}
impl SignalGenerator {
pub fn new(sample_rate: u32, noise_level: f64) -> Self {
Self {
sample_rate,
noise_level,
normal: Normal::new(0.0, 1.0).unwrap(),
rng: StdRng::from_os_rng(),
}
}
pub fn add_noise_into(&mut self, out: &mut [Complex<f64>]) {
if self.noise_level <= 0.0 {
return;
}
for s in out.iter_mut() {
*s += Complex::new(
self.normal.sample(&mut self.rng) * self.noise_level,
self.normal.sample(&mut self.rng) * self.noise_level,
);
}
}
}
const ECHO_ATTENUATION_DB: f64 = 30.0;
const LIVE_DELAY: usize = 1024;
const LOOP_TAIL_SILENCE_SEC: f64 = 0.5;
const LOOP_SESSION_GAP: Duration = Duration::from_secs(30);
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
pub enum EchoMode {
Loop,
Live,
}
struct LiveBuffer {
data: Vec<Complex<f64>>,
drained: usize,
epoch: u64,
max_samples: usize,
client_offsets: HashMap<SocketAddr, usize>,
}
impl LiveBuffer {
fn new(max_samples: usize) -> Self {
Self {
data: Vec::new(),
drained: 0,
epoch: 0,
max_samples,
client_offsets: HashMap::new(),
}
}
fn restart_with_epoch(&mut self, epoch: u64) {
self.data.clear();
self.drained = 0;
self.epoch = epoch;
self.client_offsets.clear();
}
fn start_client(&mut self, client_id: SocketAddr) {
let abs_tail = self.drained + self.data.len();
self.client_offsets.insert(client_id, abs_tail);
}
fn stop_client(&mut self, client_id: SocketAddr) {
self.client_offsets.remove(&client_id);
}
fn feed_from(&mut self, client_id: SocketAddr, samples: &[Complex<f64>]) {
if samples.is_empty() {
return;
}
let Some(abs_offset) = self.client_offsets.get_mut(&client_id) else {
return;
};
if *abs_offset < self.drained {
*abs_offset = self.drained;
}
let cur_abs = *abs_offset;
let rel_offset = cur_abs - self.drained;
let needed = rel_offset + samples.len();
if self.data.len() < needed {
self.data.resize(needed, Complex::new(0.0, 0.0));
}
for (i, &s) in samples.iter().enumerate() {
self.data[rel_offset + i] += s;
}
*abs_offset = cur_abs + samples.len();
if self.data.len() > self.max_samples {
let excess = self.data.len() - self.max_samples;
self.data.copy_within(excess.., 0);
self.data.truncate(self.max_samples);
self.drained += excess;
}
}
}
#[derive(Default)]
pub struct EchoPlaybackState {
playback_pos: HashMap<u32, usize>,
shift_phase: HashMap<u32, f64>,
live_pos: HashMap<u32, usize>,
live_shift_phase: HashMap<u32, f64>,
live_epoch: HashMap<u32, u64>,
scratch: Vec<Complex<f64>>,
}
impl EchoPlaybackState {
pub fn new() -> Self {
Self::default()
}
}
struct FreqRecorder {
active_count: u32,
sample_rate: u32,
recording: Vec<Complex<f64>>,
last_ptt_off: Option<Instant>,
}
pub struct EchoBuffer {
pub max_duration: f64,
mode: EchoMode,
attenuation: f64,
echoes: HashMap<u32, Vec<Complex<f64>>>,
recorders: HashMap<u32, FreqRecorder>,
live: HashMap<u32, LiveBuffer>,
next_epoch: u64,
}
impl EchoBuffer {
pub fn new(mode: EchoMode) -> Self {
let attenuation = 10.0_f64.powf(-ECHO_ATTENUATION_DB / 20.0);
Self {
max_duration: 10.0,
mode,
attenuation,
echoes: HashMap::new(),
recorders: HashMap::new(),
live: HashMap::new(),
next_epoch: 1,
}
}
pub fn mode(&self) -> EchoMode {
self.mode
}
pub fn has_data(&self) -> bool {
match self.mode {
EchoMode::Loop => !self.echoes.is_empty(),
EchoMode::Live => !self.live.is_empty(),
}
}
pub fn start_recording(&mut self, freq: u32, client_id: SocketAddr, sample_rate: u32) {
let mode = self.mode;
let max_duration = self.max_duration;
let rec = self.recorders.entry(freq).or_insert_with(|| FreqRecorder {
active_count: 0,
sample_rate,
recording: Vec::new(),
last_ptt_off: None,
});
if rec.active_count == 0 {
rec.sample_rate = sample_rate;
let resume = mode == EchoMode::Loop
&& rec
.last_ptt_off
.is_some_and(|t| t.elapsed() < LOOP_SESSION_GAP);
if resume {
let elapsed = rec.last_ptt_off.unwrap().elapsed();
let gap_samples = (elapsed.as_secs_f64() * sample_rate as f64) as usize;
let max_samples = (sample_rate as f64 * max_duration) as usize;
let pad_len = gap_samples.min(max_samples.saturating_sub(rec.recording.len()));
rec.recording
.extend(std::iter::repeat_n(Complex::new(0.0, 0.0), pad_len));
log::info!(
"Echo: resuming recording on {} Hz (+{} ms silence, total {:.2}s)",
freq,
elapsed.as_millis(),
rec.recording.len() as f64 / sample_rate as f64,
);
} else {
rec.recording.clear();
}
if mode == EchoMode::Live {
let max_samples = (sample_rate as f64 * self.max_duration) as usize;
let epoch = self.next_epoch;
self.next_epoch += 1;
let live_buf = self
.live
.entry(freq)
.or_insert_with(|| LiveBuffer::new(max_samples));
live_buf.max_samples = max_samples;
live_buf.restart_with_epoch(epoch);
}
log::info!(
"Echo: recording started on {} Hz @ {} Hz (mode={:?})",
freq,
sample_rate,
self.mode
);
} else {
log::info!(
"Echo: additional recorder on {} Hz (count={})",
freq,
rec.active_count + 1
);
}
rec.active_count += 1;
if self.mode == EchoMode::Live {
if let Some(live_buf) = self.live.get_mut(&freq) {
live_buf.start_client(client_id);
}
}
}
pub fn feed(&mut self, freq: u32, client_id: SocketAddr, samples: &[Complex<f64>]) {
if samples.is_empty() {
return;
}
let is_active = self
.recorders
.get(&freq)
.is_some_and(|r| r.active_count > 0);
if !is_active {
return;
}
if self.mode == EchoMode::Live {
let max_samples = self.recorders.get(&freq).map_or(480_000, |r| {
(r.sample_rate as f64 * self.max_duration) as usize
});
let live_buf = self
.live
.entry(freq)
.or_insert_with(|| LiveBuffer::new(max_samples));
live_buf.feed_from(client_id, samples);
} else if let Some(rec) = self.recorders.get_mut(&freq) {
let max_samples = (rec.sample_rate as f64 * self.max_duration) as usize;
rec.recording.extend_from_slice(samples);
if rec.recording.len() > max_samples {
let excess = rec.recording.len() - max_samples;
rec.recording.drain(..excess);
}
}
}
pub fn stop_recording(&mut self, freq: u32, client_id: SocketAddr) {
let count = match self.recorders.get_mut(&freq) {
Some(rec) if rec.active_count > 0 => {
rec.active_count -= 1;
rec.active_count
}
_ => return,
};
if self.mode == EchoMode::Live {
if let Some(live_buf) = self.live.get_mut(&freq) {
live_buf.stop_client(client_id);
}
}
if count == 0 {
if let Some(rec) = self.recorders.get_mut(&freq) {
rec.last_ptt_off = Some(Instant::now());
}
if self.mode == EchoMode::Loop {
self.commit_freq(freq);
} else {
}
log::info!(
"Echo: recording stopped on {} Hz (mode={:?})",
freq,
self.mode
);
} else {
log::info!("Echo: recorder left {} Hz ({} remain)", freq, count);
}
}
fn is_recording_on(&self, freq: u32) -> bool {
self.recorders
.get(&freq)
.is_some_and(|r| r.active_count > 0)
}
fn commit_freq(&mut self, freq: u32) {
let Some(rec) = self.recorders.get_mut(&freq) else {
return;
};
if rec.recording.is_empty() {
return;
}
if freq == 0 {
log::debug!("Echo: discarding recording with freq=0");
rec.recording.clear();
return;
}
let sample_rate = rec.sample_rate;
let max_samples = (sample_rate as f64 * self.max_duration) as usize;
let mut buf = rec.recording.clone();
buf.truncate(max_samples);
if buf.is_empty() {
return;
}
let first_nz = buf.iter().position(|s| s.re != 0.0 || s.im != 0.0);
let last_nz = buf.iter().rposition(|s| s.re != 0.0 || s.im != 0.0);
match (first_nz, last_nz) {
(Some(f), Some(l)) => {
if f > 0 || l + 1 < buf.len() {
buf = buf[f..=l].to_vec();
}
}
_ => {
log::debug!("Echo: recording on {} Hz was all-zero, discarding", freq);
return;
}
}
if buf.is_empty() {
return;
}
let tail_pad = (sample_rate as f64 * LOOP_TAIL_SILENCE_SEC).round() as usize;
let pad_room = max_samples.saturating_sub(buf.len());
let pad = tail_pad.min(pad_room);
if pad > 0 {
buf.extend(std::iter::repeat_n(Complex::new(0.0, 0.0), pad));
}
let fade_samples = ((sample_rate as f64 * 0.040) as usize).min(buf.len() / 4);
if fade_samples > 1 {
let n = fade_samples as f64;
let buf_len = buf.len();
for i in 0..fade_samples {
let w = 0.5 * (1.0 - (PI * i as f64 / n).cos());
let tail_idx = buf_len - fade_samples + i;
buf[i] = buf[i] * w + buf[tail_idx] * (1.0 - w);
}
buf.truncate(buf_len - fade_samples);
}
let len = buf.len();
log::info!(
"Echo: committed {} samples ({:.2}s) on {} Hz",
len,
len as f64 / sample_rate as f64,
freq,
);
self.echoes.insert(freq, buf);
}
pub fn generate_echo_into(
&self,
ps: &mut EchoPlaybackState,
out: &mut [Complex<f64>],
rx_freq: u32,
sample_rate: u32,
) {
let n_samples = out.len();
for s in out.iter_mut() {
*s = Complex::new(0.0, 0.0);
}
if self.echoes.is_empty() {
return;
}
let mut scratch = std::mem::take(&mut ps.scratch);
if scratch.len() < n_samples {
scratch.resize(n_samples, Complex::new(0.0, 0.0));
}
let half_bw = sample_rate as f64 / 2.0;
for (&freq, echo_buf) in &self.echoes {
let offset_hz = rx_freq as f64 - freq as f64;
if offset_hz.abs() > half_bw {
continue;
}
let echo_len = echo_buf.len();
let mut pos = *ps.playback_pos.get(&freq).unwrap_or(&0) % echo_len;
let mut remaining = n_samples;
let mut write_pos = 0;
while remaining > 0 {
let available = remaining.min(echo_len - pos);
scratch[write_pos..write_pos + available]
.copy_from_slice(&echo_buf[pos..pos + available]);
pos = (pos + available) % echo_len;
write_pos += available;
remaining -= available;
}
ps.playback_pos.insert(freq, pos);
if offset_hz != 0.0 {
let sr = sample_rate as f64;
let phase0 = *ps.shift_phase.get(&freq).unwrap_or(&0.0);
let step = 2.0 * PI * offset_hz / sr;
let phasor = Complex::new(step.cos(), step.sin());
let mut osc = Complex::new(phase0.cos(), phase0.sin());
for s in scratch[..n_samples].iter_mut() {
*s *= osc;
osc *= phasor;
}
let new_phase = (phase0 + step * n_samples as f64).rem_euclid(2.0 * PI);
ps.shift_phase.insert(freq, new_phase);
}
for (o, s) in out.iter_mut().zip(scratch[..n_samples].iter()) {
*o += *s;
}
}
ps.scratch = scratch;
for s in out.iter_mut() {
*s *= self.attenuation;
}
}
pub fn generate_live_echo_into(
&self,
ps: &mut EchoPlaybackState,
out: &mut [Complex<f64>],
rx_freq: u32,
sample_rate: u32,
) {
let n_samples = out.len();
for s in out.iter_mut() {
*s = Complex::new(0.0, 0.0);
}
if self.live.is_empty() {
return;
}
let half_bw = sample_rate as f64 / 2.0;
let mut scratch = std::mem::take(&mut ps.scratch);
if scratch.len() < n_samples {
scratch.resize(n_samples, Complex::new(0.0, 0.0));
}
for (&freq, live_buf) in &self.live {
let offset_hz = rx_freq as f64 - freq as f64;
if offset_hz.abs() > half_bw {
continue;
}
let data = &live_buf.data;
let drained = live_buf.drained;
let write_head = drained + data.len();
let current_epoch = live_buf.epoch;
let client_epoch = ps.live_epoch.entry(freq).or_insert(0);
if *client_epoch != current_epoch {
*client_epoch = current_epoch;
ps.live_pos
.insert(freq, write_head.saturating_sub(LIVE_DELAY));
ps.live_shift_phase.insert(freq, 0.0);
}
let default_pos = write_head.saturating_sub(LIVE_DELAY);
let mut abs_pos = (*ps.live_pos.get(&freq).unwrap_or(&default_pos)).max(drained);
let idx = abs_pos - drained;
let readable_end = if self.is_recording_on(freq) {
data.len().saturating_sub(LIVE_DELAY)
} else {
data.len()
};
for s in scratch[..n_samples].iter_mut() {
*s = Complex::new(0.0, 0.0);
}
if idx < readable_end {
let n = n_samples.min(readable_end - idx);
scratch[..n].copy_from_slice(&data[idx..idx + n]);
abs_pos += n;
}
ps.live_pos.insert(freq, abs_pos);
if offset_hz != 0.0 {
let sr = sample_rate as f64;
let phase0 = *ps.live_shift_phase.get(&freq).unwrap_or(&0.0);
let step = 2.0 * PI * offset_hz / sr;
let phasor = Complex::new(step.cos(), step.sin());
let mut osc = Complex::new(phase0.cos(), phase0.sin());
for s in scratch[..n_samples].iter_mut() {
*s *= osc;
osc *= phasor;
}
let new_phase = (phase0 + step * n_samples as f64).rem_euclid(2.0 * PI);
ps.live_shift_phase.insert(freq, new_phase);
}
for (o, s) in out.iter_mut().zip(scratch[..n_samples].iter()) {
*o += *s;
}
}
ps.scratch = scratch;
for s in out.iter_mut() {
*s *= self.attenuation;
}
}
}
pub fn unpack_tx_iq_16bit_into(data: &[u8], out: &mut [Complex<f64>]) -> usize {
let n_blocks = (data.len() / 8).min(out.len());
#[allow(clippy::needless_range_loop)]
for k in 0..n_blocks {
let off = k * 8;
let i_val = i16::from_be_bytes([data[off + 4], data[off + 5]]);
let q_val = i16::from_be_bytes([data[off + 6], data[off + 7]]);
out[k] = Complex::new(i_val as f64 / 32768.0, q_val as f64 / 32768.0);
}
n_blocks
}
#[cfg(test)]
#[allow(clippy::needless_range_loop, clippy::manual_div_ceil)]
mod tests {
use super::*;
#[test]
fn live_echo_passes_continuous_tone_unchanged() {
let sr: u32 = 48_000;
let freq: u32 = 7_074_000;
let addr: SocketAddr = "127.0.0.1:1234".parse().unwrap();
let tone_hz = 1000.0;
let amp = 0.5_f64;
let n_subframes_total = 1500; let sub_n = 63usize;
let mut echo = EchoBuffer::new(EchoMode::Live);
echo.start_recording(freq, addr, sr);
let mut ps = EchoPlaybackState::new();
let mut rx_out: Vec<Complex<f64>> = Vec::new();
let mut tx_buf = vec![Complex::new(0.0, 0.0); sub_n];
let mut rx_buf = vec![Complex::new(0.0, 0.0); sub_n];
let mut tx_off = 0usize;
for _ in 0..n_subframes_total / 2 {
for _ in 0..2 {
for k in 0..sub_n {
let phi = 2.0 * PI * tone_hz * (tx_off + k) as f64 / sr as f64;
tx_buf[k] = Complex::new(amp * phi.cos(), amp * phi.sin());
}
tx_off += sub_n;
echo.feed(freq, addr, &tx_buf);
}
for _ in 0..2 {
echo.generate_live_echo_into(&mut ps, &mut rx_buf, freq, sr);
rx_out.extend_from_slice(&rx_buf);
}
}
echo.stop_recording(freq, addr);
let warmup_end = 2 * LIVE_DELAY;
let head = ((warmup_end + 62) / 63) * 63;
let active = &rx_out[head..];
assert!(
active.len() >= 8192,
"active buf too short: {}",
active.len()
);
let buf = &active[..8192];
let total_power: f64 = buf.iter().map(|s| s.norm_sqr()).sum::<f64>() / buf.len() as f64;
let fund_mag: f64 = goertzel_mag(buf, tone_hz, sr as f64);
let fund_power = fund_mag * fund_mag;
let nad_power = (total_power - fund_power).max(1e-30);
let sinad_db = 10.0 * (fund_power / nad_power).log10();
eprintln!(
"[unit] live-echo: total_power={:.3e} fund_power={:.3e} sinad={:.1} dB \
fund_mag={:.3e}",
total_power, fund_power, sinad_db, fund_mag
);
assert!(
sinad_db > 30.0,
"live-echo unit test SINAD too low: {:.1} dB",
sinad_db
);
}
fn goertzel_mag(buf: &[Complex<f64>], freq_hz: f64, sample_rate: f64) -> f64 {
let omega = 2.0 * PI * freq_hz / sample_rate;
let mut acc = Complex::new(0.0, 0.0);
for (k, s) in buf.iter().enumerate() {
let phi = -omega * k as f64;
acc += *s * Complex::new(phi.cos(), phi.sin());
}
acc.norm() / buf.len() as f64
}
#[test]
fn live_echo_handles_bursty_writes() {
let sr: u32 = 48_000;
let freq: u32 = 7_074_000;
let addr: SocketAddr = "127.0.0.1:1234".parse().unwrap();
let tone_hz = 1000.0;
let amp = 0.5_f64;
let sub_n = 63usize;
const BURST_FEEDS: usize = 8;
let n_iters = 200;
let mut echo = EchoBuffer::new(EchoMode::Live);
echo.start_recording(freq, addr, sr);
let mut ps = EchoPlaybackState::new();
let mut rx_out: Vec<Complex<f64>> = Vec::new();
let mut tx_buf = vec![Complex::new(0.0, 0.0); sub_n];
let mut rx_buf = vec![Complex::new(0.0, 0.0); sub_n];
let mut tx_off = 0usize;
for _ in 0..n_iters {
for _ in 0..BURST_FEEDS {
for k in 0..sub_n {
let phi = 2.0 * PI * tone_hz * (tx_off + k) as f64 / sr as f64;
tx_buf[k] = Complex::new(amp * phi.cos(), amp * phi.sin());
}
tx_off += sub_n;
echo.feed(freq, addr, &tx_buf);
}
for _ in 0..2 {
echo.generate_live_echo_into(&mut ps, &mut rx_buf, freq, sr);
rx_out.extend_from_slice(&rx_buf);
}
}
echo.stop_recording(freq, addr);
let head = 2048usize.min(rx_out.len() / 4);
let active = &rx_out[head..];
let analysis_len = 8192.min(active.len());
let buf = &active[..analysis_len];
let total_power: f64 = buf.iter().map(|s| s.norm_sqr()).sum::<f64>() / buf.len() as f64;
let fund_mag: f64 = goertzel_mag(buf, tone_hz, sr as f64);
let fund_power = fund_mag * fund_mag;
let nad_power = (total_power - fund_power).max(1e-30);
let sinad_db = 10.0 * (fund_power / nad_power).log10();
eprintln!(
"[unit-burst] feeds_per_iter={} reads_per_iter=2 \
total_power={:.3e} fund_power={:.3e} sinad={:.1} dB",
BURST_FEEDS, total_power, fund_power, sinad_db
);
assert!(
sinad_db > 30.0,
"live-echo bursty SINAD too low: {:.1} dB (writes:reads = {}:2)",
sinad_db,
BURST_FEEDS
);
}
}