radio_utils_protocol/protocol1.rs
1use std::collections::HashMap;
2use std::net::{Ipv4Addr, SocketAddr};
3use std::time::Duration;
4
5use num_complex::Complex;
6use ringbuf::traits::{Consumer, Observer, Split};
7use ringbuf::{HeapCons, HeapProd, HeapRb};
8use tokio::sync::mpsc;
9
10use crate::discovery;
11use crate::transport::UdpTransport;
12use crate::types::*;
13
/// UDP port used for Protocol 1 discovery and as the radio's destination port.
const PORT: u16 = 1024;
/// Size in bytes of a full data packet: an 8-byte header followed by two
/// 512-byte sub-frames (at offsets 8 and 520).
const PACKET_SIZE: usize = 1032;
/// Size in bytes of one sub-frame (sync + 5 control bytes + sample payload).
const SUBFRAME_SIZE: usize = 512;
/// Sync pattern found at the start of every sub-frame, in both directions.
const SYNC: [u8; 3] = [0x7F, 0x7F, 0x7F];

/// If no packet is received within this duration, the connection is considered lost.
const RECV_TIMEOUT: Duration = Duration::from_secs(10);
21
/// Commands that can be sent to the Protocol 1 client while it is running.
///
/// Obtain a sender with `Protocol1Client::command_sender`; the `run()` loop
/// receives these and applies them via `handle_command`.
#[derive(Debug)]
pub enum P1Command {
    /// Set the RX frequency (in Hz) for the DDC at index `ddc`.
    SetRxFrequency { ddc: usize, freq_hz: u32 },
    /// Set the TX frequency in Hz.
    SetTxFrequency(u32),
    /// Set the sample rate (encoded with `sample_rate_to_p1_code` when sent).
    SetSampleRate(u32),
    /// Set the number of DDCs; a value of 0 is treated as 1.
    SetNddc(u8),
    /// Set the TX drive byte.
    SetTxDrive(u8),
    /// Key (`true`) or unkey (`false`) the transmitter. Also resets or logs
    /// the per-session TX statistics.
    SetPtt(bool),
    /// Enable or disable CW mode (mic L/R channels are zeroed while enabled).
    SetCwMode(bool),
    /// Set the RX attenuation in dB.
    SetRxAttenuation(u8),
    /// Send the stop command to the radio, which ends the `run()` loop.
    Stop,
}
35
/// Protocol 1 client for connecting to HPSDR hardware.
///
/// Holds the UDP transport, the configuration that is mirrored to the radio
/// through rotating control commands, the RX/TX/status channels, and TX
/// pacing and diagnostics state used by `run()`.
pub struct Protocol1Client {
    // UDP socket wrapper used for every send/receive.
    transport: UdpTransport,
    // The device this client talks to (address and hardware type).
    device: DiscoveredDevice,
    // True between `start()` and `stop()`; `run()` loops while this is set.
    running: bool,
    // Configuration
    sample_rate: u32,
    nddc: u8,
    rx_frequencies: [u32; 12],
    tx_frequency: u32,
    tx_drive: u8,
    ptt: bool,
    rx_attenuation: u8,
    // Sequence tracking
    // `tx_seq` is the sequence number for the next outgoing packet;
    // `rx_seq` is the sequence number expected in the next incoming packet.
    tx_seq: u32,
    rx_seq: u32,
    // Control command state: which address to send next
    control_idx: u8,
    // Channels
    iq_senders: HashMap<usize, mpsc::Sender<Vec<Complex<f64>>>>,
    tx_iq_consumer: Option<HeapCons<Complex<f64>>>,
    status_sender: mpsc::Sender<RadioStatus>,
    status_receiver: Option<mpsc::Receiver<RadioStatus>>,
    // Command channel for runtime control
    cmd_sender: mpsc::Sender<P1Command>,
    cmd_receiver: Option<mpsc::Receiver<P1Command>>,
    // CW mode flag: when true, mic L/R channels are zeroed (tone is in IQ only)
    tx_cw_mode: bool,
    // TX IQ buffer: accumulates DSP blocks, drains 63 per subframe
    tx_iq_buffer: Vec<Complex<f64>>,
    // Pre-allocated packet buffer for the streaming hot path
    host_pkt_buf: Vec<u8>,
    // Pre-allocated packet buffer for send_control
    ctrl_pkt_buf: Vec<u8>,
    // Pre-allocated per-DDC sample buffers for parse_radio_subframe
    ddc_sample_bufs: Vec<Vec<Complex<f64>>>,
    // Accumulated status across response address rotation. Each subframe
    // carries the data for ONE response-address slot (0x00 / 0x08 / 0x10
    // / 0x18 in HPSDR P1), so a single subframe never has both fwd and
    // rev fresh. We accumulate values into `accumulated_status` across
    // the rotation and emit only at the cycle boundary — see
    // `last_status_addr` below.
    accumulated_status: RadioStatus,
    // Last response-address byte we saw, used to detect cycle wrap.
    // When the rotation comes back round (current addr <= previous),
    // the just-completed cycle has fully populated fwd / rev / supply /
    // pa_current, so we emit ONE coherent status. Without this every
    // subframe emitted its own status which mixed fresh fields with
    // stale ones from prior cycles — broke SWR (computed from a
    // mismatched fwd/rev pair) and made the rev-power readout look
    // stuck near zero.
    last_status_addr: Option<u8>,
    // TX debug counters
    tx_subframes_with_data: u64,
    tx_subframes_empty: u64,
    /// Voice-TX underrun guard: increments each time the adaptive pacing
    /// asked us to send a packet but the IQ buffer didn't have a full
    /// `samples_per_packet` worth of samples yet. Counts skipped *send*
    /// attempts (one per guard-firing), not subframes — so it's
    /// directly comparable to packets-per-second figures rather than
    /// to `sf_data` / `sf_empty`. Should be ~0 if the wire-pack and
    /// the DSP producer are well-matched; large values mean burst-mode
    /// pacing is outrunning DSP and the guard is doing its job.
    tx_underrun_skips: u64,
    // Counts PTT-on subframes; drives the periodic "[P1 TX WIRE]" debug log.
    tx_debug_log_counter: u32,
    // FIFO-estimate TX pacing (voice only): accumulates on each packet sent
    // and drains at `sample_rate`. Lets us burst packets at PTT start to
    // build a radio-side jitter cushion, then throttle back to wire rate.
    tx_fifo_estimate: f64,
    tx_last_send: Option<tokio::time::Instant>,
    // Counts voice-TX send-loop iterations; drives the "[P1 TX PACE]" debug log.
    tx_fifo_log_counter: u32,
    // Wire-pack clipping diagnostics (per-PTT-session counters).
    tx_samples_total: u64,
    tx_samples_clipped_re: u64,
    tx_samples_clipped_im: u64,
    // Largest |re| or |im| seen this session (values above 1.0 were clipped).
    tx_max_excess: f64,
}
113
114impl Protocol1Client {
    /// Discover all HPSDR devices on the network using Protocol 1.
    ///
    /// Delegates to `discovery::discover_on_interfaces`, passing the
    /// Protocol 1 request builder and response parser together with `PORT`
    /// and the caller's `timeout`, and returns its result unchanged.
    pub async fn discover(timeout: Duration) -> Result<Vec<DiscoveredDevice>> {
        discovery::discover_on_interfaces(
            build_p1_discovery_request,
            parse_p1_discovery_response,
            PORT,
            timeout,
        )
        .await
    }
125
    /// Send a discovery request to a specific address (unicast).
    ///
    /// Delegates to `discovery::discover_at_addr` with the Protocol 1
    /// request builder and response parser, targeting `addr` on `PORT`, and
    /// returns its result unchanged.
    pub async fn discover_at(addr: Ipv4Addr, timeout: Duration) -> Result<Vec<DiscoveredDevice>> {
        discovery::discover_at_addr(
            build_p1_discovery_request,
            parse_p1_discovery_response,
            addr,
            PORT,
            timeout,
        )
        .await
    }
137
138 /// Connect to a discovered Protocol 1 device.
139 pub async fn connect(device: &DiscoveredDevice) -> Result<Self> {
140 let transport = UdpTransport::bind_any().await?;
141 let (status_tx, status_rx) = mpsc::channel(64);
142 let (cmd_tx, cmd_rx) = mpsc::channel(64);
143
144 Ok(Self {
145 transport,
146 device: device.clone(),
147 running: false,
148 sample_rate: 48_000,
149 nddc: 1,
150 rx_frequencies: [7_074_000; 12],
151 tx_frequency: 7_074_000,
152 tx_drive: 128,
153 ptt: false,
154 rx_attenuation: 0,
155 tx_seq: 0,
156 rx_seq: 0,
157 control_idx: 0,
158 iq_senders: HashMap::new(),
159 tx_iq_consumer: None,
160 tx_cw_mode: false,
161 tx_iq_buffer: Vec::new(),
162 status_sender: status_tx,
163 status_receiver: Some(status_rx),
164 cmd_sender: cmd_tx,
165 cmd_receiver: Some(cmd_rx),
166 host_pkt_buf: vec![0u8; PACKET_SIZE],
167 ctrl_pkt_buf: vec![0u8; PACKET_SIZE],
168 ddc_sample_bufs: Vec::new(),
169 accumulated_status: RadioStatus::default(),
170 last_status_addr: None,
171 tx_subframes_with_data: 0,
172 tx_subframes_empty: 0,
173 tx_underrun_skips: 0,
174 tx_debug_log_counter: 0,
175 tx_fifo_estimate: 0.0,
176 tx_last_send: None,
177 tx_fifo_log_counter: 0,
178 tx_samples_total: 0,
179 tx_samples_clipped_re: 0,
180 tx_samples_clipped_im: 0,
181 tx_max_excess: 0.0,
182 })
183 }
184
185 /// Start streaming IQ data from the radio.
186 pub async fn start(&mut self) -> Result<()> {
187 // Send start command
188 let mut pkt = vec![0u8; 64];
189 pkt[0] = 0xEF;
190 pkt[1] = 0xFE;
191 pkt[2] = 0x04;
192 pkt[3] = 0x01; // start
193 self.transport.send_to(&pkt, self.device.addr).await?;
194 self.running = true;
195 log::debug!("P1 Start command sent to {}", self.device.addr);
196
197 // Send initial configuration
198 self.send_config().await?;
199 Ok(())
200 }
201
202 /// Stop streaming.
203 pub async fn stop(&mut self) -> Result<()> {
204 let mut pkt = vec![0u8; 64];
205 pkt[0] = 0xEF;
206 pkt[1] = 0xFE;
207 pkt[2] = 0x04;
208 pkt[3] = 0x00; // stop
209 self.transport.send_to(&pkt, self.device.addr).await?;
210 self.running = false;
211 log::debug!("P1 Stop command sent");
212 Ok(())
213 }
214
215 // -- Configuration commands -----------------------------------------------
216
217 pub fn set_rx_frequency(&mut self, ddc: usize, freq_hz: u32) {
218 if ddc < self.rx_frequencies.len() {
219 self.rx_frequencies[ddc] = freq_hz;
220 }
221 }
222
    /// Set the TX frequency in Hz.
    ///
    /// Only updates local state; the value goes out with the next 0x02
    /// control command in the rotation.
    pub fn set_tx_frequency(&mut self, freq_hz: u32) {
        self.tx_frequency = freq_hz;
    }
226
    /// Set the sample rate.
    ///
    /// Only updates local state; the value is encoded with
    /// `sample_rate_to_p1_code` whenever an address-0x00 control command
    /// is built.
    pub fn set_sample_rate(&mut self, rate: u32) {
        self.sample_rate = rate;
    }
230
231 pub fn set_nddc(&mut self, count: u8) {
232 self.nddc = count.max(1);
233 }
234
    /// Return the currently configured TX drive byte.
    pub fn tx_drive(&self) -> u8 {
        self.tx_drive
    }
238
    /// Set the TX drive byte.
    ///
    /// Only updates local state; the value is sent as C1 of the 0x12
    /// control command.
    pub fn set_tx_drive(&mut self, drive: u8) {
        self.tx_drive = drive;
    }
242
    /// Set the PTT (MOX) flag.
    ///
    /// The flag is OR-ed into bit 0 of C0 in every outgoing sub-frame and
    /// enables TX IQ packing. Unlike `P1Command::SetPtt`, this setter does
    /// not reset or log the per-session TX statistics.
    pub fn set_ptt(&mut self, on: bool) {
        self.ptt = on;
    }
246
    /// Set the RX attenuation in dB.
    ///
    /// Only updates local state; how the value is encoded depends on the
    /// hardware type (see the 0x14 and 0x1C control commands).
    pub fn set_rx_attenuation(&mut self, db: u8) {
        self.rx_attenuation = db;
    }
250
    /// Get a sender for runtime commands. The run() loop polls this channel.
    ///
    /// Each call returns a clone of the same sender, so several producers
    /// may feed the single command receiver.
    pub fn command_sender(&self) -> mpsc::Sender<P1Command> {
        self.cmd_sender.clone()
    }
255
256 /// Get a receiver for IQ data from a specific DDC.
257 pub fn rx_iq_stream(&mut self, ddc: usize) -> mpsc::Receiver<Vec<Complex<f64>>> {
258 if self.iq_senders.contains_key(&ddc) {
259 log::warn!(
260 "rx_iq_stream called again for DDC {ddc} — previous receiver will be orphaned"
261 );
262 }
263 let (tx, rx) = mpsc::channel(256);
264 self.iq_senders.insert(ddc, tx);
265 rx
266 }
267
268 /// Get a producer for TX IQ data (lock-free ring buffer, zero-copy).
269 pub fn tx_iq_sink(&mut self) -> HeapProd<Complex<f64>> {
270 if self.tx_iq_consumer.is_some() {
271 log::warn!("tx_iq_sink called again — previous consumer will be orphaned");
272 }
273 // 256 blocks * 1024 samples = 262144 — enough to absorb DSP jitter
274 let rb = HeapRb::<Complex<f64>>::new(262_144);
275 let (prod, cons) = rb.split();
276 self.tx_iq_consumer = Some(cons);
277 prod
278 }
279
    /// Get a receiver for radio status updates.
    ///
    /// Returns `None` if already taken (can only be called once).
    /// The receiver is moved out of the client, so ownership passes to the
    /// first caller.
    pub fn status_stream(&mut self) -> Option<mpsc::Receiver<RadioStatus>> {
        self.status_receiver.take()
    }
286
287 /// Run the main receive/transmit loop. This blocks until stop() is called
288 /// or the connection is lost.
289 pub async fn run(&mut self) -> Result<()> {
290 let mut recv_buf = vec![0u8; 2048];
291 let spr = self.samples_per_subframe();
292 let samples_per_packet = (spr * 2) as f64;
293 // Wire-rate interval: used for RX and CW TX (no cushion needed).
294 let wire_rate_interval =
295 Duration::from_secs_f64(samples_per_packet / self.sample_rate as f64);
296
297 let mut cmd_rx = self.cmd_receiver.take();
298 let mut last_recv = tokio::time::Instant::now();
299 let mut next_tx_send = tokio::time::Instant::now();
300
301 while self.running {
302 tokio::select! {
303 // Send control/TX data at an adaptive cadence (voice TX) or
304 // wire rate (RX / CW TX).
305 _ = tokio::time::sleep_until(next_tx_send) => {
306 let now = tokio::time::Instant::now();
307
308 // Voice-TX underrun guard: if we don't have a full packet's
309 // worth of IQ ready (in either the local refill buffer or
310 // the lock-free consumer), skip this send and retry in
311 // 500 µs so the wire-pack never emits a subframe padded
312 // with zeros.
313 //
314 // Why this matters: the adaptive pacing below can drop
315 // `next_delay` to 0 µs to burst-prime the radio FIFO at
316 // PTT-on. Once the burst started, it kept re-entering
317 // burst mode every time `tx_fifo_estimate` (a fictional
318 // model of the radio FIFO based on packet timing, not on
319 // actual buffer fill) dipped below `low_water`. The DSP
320 // produces in chunks (1024-sample blocks every ~21 ms)
321 // while burst mode pulls 126 samples per packet at
322 // ~3000 pkt/s, so the consumer regularly drained dry and
323 // `fill_host_subframe` packed 63 zero samples instead of
324 // bailing. Those zero subframes injected hard
325 // discontinuities into the on-air signal at a 19 % duty
326 // cycle (sf_empty / (sf_empty + sf_data) measured ~412 /
327 // 2150 in repro logs), creating broadband splatter that
328 // is invisible to the host's pre-wire spectrum probe but
329 // very visible to the receiver. CW TX is exempt — its
330 // modulator generates a continuous carrier and the pacing
331 // is wire-rate locked, so the underrun pattern doesn't
332 // exist there.
333 let voice_tx = self.ptt && !self.tx_cw_mode;
334 let pkt_iq_samples = samples_per_packet as usize;
335 let voice_iq_avail = self.tx_iq_buffer.len()
336 + self
337 .tx_iq_consumer
338 .as_ref()
339 .map(|c| c.occupied_len())
340 .unwrap_or(0);
341 let voice_underrun = voice_tx && voice_iq_avail < pkt_iq_samples;
342
343 let next_delay = if voice_underrun {
344 // Skip this send entirely: a packet whose IQ section
345 // is zero-padded would inject 63 silent samples in the
346 // middle of a continuous transmission and create
347 // broadband splatter. Re-arm in 500 µs and try again
348 // when DSP has produced more samples.
349 self.tx_underrun_skips = self.tx_underrun_skips.saturating_add(1);
350 Duration::from_micros(500)
351 } else {
352 // Update FIFO estimate: the radio drains at sample_rate.
353 if let Some(last) = self.tx_last_send {
354 let elapsed = now.duration_since(last).as_secs_f64();
355 self.tx_fifo_estimate -= elapsed * self.sample_rate as f64;
356 if self.tx_fifo_estimate < 0.0 {
357 self.tx_fifo_estimate = 0.0;
358 }
359 }
360 self.tx_last_send = Some(now);
361
362 self.send_host_data_packet().await?;
363 self.tx_fifo_estimate += samples_per_packet;
364
365 // Voice TX: burst at start to prime the radio-side
366 // FIFO, then throttle to keep ~30 ms of cushion
367 // against network jitter. CW and RX keep fixed
368 // wire-rate pacing.
369 if voice_tx {
370 let sr = self.sample_rate as f64;
371 let high_water = 0.030 * sr;
372 let low_water = 0.006 * sr;
373 if self.tx_fifo_estimate > high_water {
374 Duration::from_micros(2000)
375 } else if self.tx_fifo_estimate > low_water {
376 Duration::from_micros(500)
377 } else {
378 Duration::ZERO
379 }
380 } else {
381 wire_rate_interval
382 }
383 };
384 // Anchor the next-send schedule to the previous deadline
385 // when on-time, fall back to `now + next_delay` when the
386 // schedule has slipped into the past.
387 //
388 // Why anchor: every iteration's tokio wake + UDP-send
389 // latency (~1 ms on a busy localhost loop) baked into
390 // `now + next_delay` made the host's TX rate settle
391 // around 270 pkt/s instead of the 380 pkt/s wire rate.
392 // The emulator's RX timer is anchored (it uses
393 // `tokio::time::interval`), so a drifted host produced
394 // ~30 % fewer samples per wall-clock second than the
395 // listener's live-mode read consumed — those reads then
396 // fell onto the LIVE_DELAY safety margin and emitted
397 // 63-sample silent subframes, scrambling a CW tone over
398 // several kHz of bandwidth.
399 //
400 // Why fall back when slipping: if we anchor unconditionally
401 // and the schedule is e.g. 10 ms in the past, sleep_until
402 // returns immediately for several iterations in a row,
403 // bursting catch-up sends. Each catch-up send drains 63
404 // samples from the host's TX ring; if the producer was
405 // in the middle of a 2 ms gap when the burst hit, the
406 // ring underflows and the host pads the subframe with
407 // zeros (loop mode then captures internal zero runs that
408 // break up the recorded tone). Falling back to
409 // `now + next_delay` on slip preserves cadence anchoring
410 // for the steady-state case (the user's reported bug)
411 // while preventing the burst regression.
412 let scheduled = next_tx_send + next_delay;
413 next_tx_send = if scheduled >= now {
414 scheduled
415 } else {
416 now + next_delay
417 };
418
419 if voice_tx {
420 self.tx_fifo_log_counter += 1;
421 if self.tx_fifo_log_counter.is_multiple_of(400) {
422 log::debug!(
423 "[P1 TX PACE] fifo_est={:.0} samp ({:.1} ms), next_sleep={:?}",
424 self.tx_fifo_estimate,
425 self.tx_fifo_estimate * 1000.0 / self.sample_rate as f64,
426 next_delay
427 );
428 }
429 }
430 }
431 // Receive data from radio
432 result = self.transport.recv_from(&mut recv_buf) => {
433 match result {
434 Ok((len, _addr)) => {
435 last_recv = tokio::time::Instant::now();
436 self.handle_radio_packet(&recv_buf[..len]);
437 }
438 Err(e) => {
439 log::error!("P1 Recv error: {}", e);
440 }
441 }
442 }
443 // Process runtime commands
444 Some(cmd) = async {
445 match &mut cmd_rx {
446 Some(rx) => rx.recv().await,
447 None => std::future::pending().await,
448 }
449 } => {
450 self.handle_command(cmd).await?;
451 }
452 // Timeout detection
453 _ = tokio::time::sleep_until(last_recv + RECV_TIMEOUT) => {
454 log::error!("P1 No data received for {:?}, connection lost", RECV_TIMEOUT);
455 return Err(ProtocolError::ConnectionLost);
456 }
457 }
458 }
459 Ok(())
460 }
461
462 async fn handle_command(&mut self, cmd: P1Command) -> Result<()> {
463 match cmd {
464 P1Command::SetRxFrequency { ddc, freq_hz } => {
465 self.set_rx_frequency(ddc, freq_hz);
466 log::debug!("P1 RX {} freq -> {} Hz", ddc, freq_hz);
467 }
468 P1Command::SetTxFrequency(freq) => {
469 self.set_tx_frequency(freq);
470 log::debug!("P1 TX freq -> {} Hz", freq);
471 }
472 P1Command::SetSampleRate(rate) => {
473 self.set_sample_rate(rate);
474 log::debug!("P1 Sample rate -> {}", rate);
475 }
476 P1Command::SetNddc(n) => {
477 self.set_nddc(n);
478 log::debug!("P1 NDDC -> {}", n);
479 }
480 P1Command::SetTxDrive(drive) => {
481 self.set_tx_drive(drive);
482 log::debug!("P1 TX drive -> {}", drive);
483 }
484 P1Command::SetPtt(on) => {
485 let was = self.ptt;
486 self.set_ptt(on);
487 log::debug!(
488 "P1 PTT {} -> {} (drive={}, tx_iq_consumer={})",
489 if was { "ON" } else { "OFF" },
490 if on { "ON" } else { "OFF" },
491 self.tx_drive,
492 if self.tx_iq_consumer.is_some() {
493 "connected"
494 } else {
495 "NONE"
496 }
497 );
498 if on && !was {
499 // New TX session — start with empty radio FIFO estimate
500 // so the sender bursts until the cushion is built.
501 self.tx_fifo_estimate = 0.0;
502 self.tx_last_send = None;
503 self.tx_fifo_log_counter = 0;
504 self.tx_samples_total = 0;
505 self.tx_samples_clipped_re = 0;
506 self.tx_samples_clipped_im = 0;
507 self.tx_max_excess = 0.0;
508 self.tx_underrun_skips = 0;
509 }
510 if !on {
511 log::debug!(
512 "P1 PTT OFF stats: subframes_with_data={}, subframes_empty={}, underrun_skips={}, buf_remaining={}",
513 self.tx_subframes_with_data,
514 self.tx_subframes_empty,
515 self.tx_underrun_skips,
516 self.tx_iq_buffer.len()
517 );
518 let total = self.tx_samples_total.max(1);
519 let pct_re = self.tx_samples_clipped_re as f64 * 100.0 / total as f64;
520 let pct_im = self.tx_samples_clipped_im as f64 * 100.0 / total as f64;
521 log::info!(
522 "[P1 TX CLIP] session: total={} clipped_re={} ({:.3}%) clipped_im={} ({:.3}%) max_excess={:.4} (1.0 = wire-pack ceiling)",
523 self.tx_samples_total,
524 self.tx_samples_clipped_re,
525 pct_re,
526 self.tx_samples_clipped_im,
527 pct_im,
528 self.tx_max_excess,
529 );
530 self.tx_iq_buffer.clear();
531 self.tx_subframes_with_data = 0;
532 self.tx_subframes_empty = 0;
533 self.tx_underrun_skips = 0;
534 self.tx_debug_log_counter = 0;
535 }
536 }
537 P1Command::SetCwMode(cw) => {
538 self.tx_cw_mode = cw;
539 log::debug!("P1 TX CW mode -> {}", cw);
540 }
541 P1Command::SetRxAttenuation(db) => {
542 self.set_rx_attenuation(db);
543 log::debug!("P1 RX attenuation -> {} dB", db);
544 }
545 P1Command::Stop => {
546 self.stop().await?;
547 }
548 }
549 Ok(())
550 }
551
552 // -- Internal methods -----------------------------------------------------
553
554 fn samples_per_subframe(&self) -> usize {
555 // Protocol 1 supports 1-8 DDCs; clamp to prevent division yielding 0
556 // (which would cause a busy-loop timer at 0-duration interval).
557 let nddc = (self.nddc.max(1) as usize).min(8);
558 504 / (6 * nddc + 2)
559 }
560
561 /// Send initial configuration commands.
562 async fn send_config(&mut self) -> Result<()> {
563 // Send sample rate + nddc + duplex (address 0x00)
564 let rate_code = sample_rate_to_p1_code(self.sample_rate);
565 let nddc_bits = ((self.nddc.max(1) - 1) & 0x07) << 3;
566 let duplex = 0x04;
567 self.send_control(0x00, rate_code, 0, 0, nddc_bits | duplex)
568 .await?;
569
570 // Send TX frequency (address 0x02)
571 let tx_bytes = self.tx_frequency.to_be_bytes();
572 self.send_control(0x02, tx_bytes[0], tx_bytes[1], tx_bytes[2], tx_bytes[3])
573 .await?;
574
575 // Send RX frequencies
576 for i in 0..self.nddc.min(8) as usize {
577 let addr = 0x04 + (i as u8 * 2);
578 let freq_bytes = self.rx_frequencies[i].to_be_bytes();
579 self.send_control(
580 addr,
581 freq_bytes[0],
582 freq_bytes[1],
583 freq_bytes[2],
584 freq_bytes[3],
585 )
586 .await?;
587 }
588
589 // Send TX drive (address 0x12) + PA enable for HL2.
590 // Hermes leaves C2 zero; the rotating control packet then handles
591 // the Alex filter bits separately.
592 let pa_c2 = match self.device.hw_type {
593 HpsdrHw::HermesLite => 0x08,
594 HpsdrHw::Hermes => 0x00,
595 };
596 self.send_control(0x12, self.tx_drive, pa_c2, 0, 0).await?;
597
598 Ok(())
599 }
600
601 /// Send a single control command embedded in a host data packet.
602 async fn send_control(&mut self, c0: u8, c1: u8, c2: u8, c3: u8, c4: u8) -> Result<()> {
603 self.ctrl_pkt_buf.fill(0);
604 self.ctrl_pkt_buf[0] = 0xEF;
605 self.ctrl_pkt_buf[1] = 0xFE;
606 self.ctrl_pkt_buf[2] = 0x01; // data
607 self.ctrl_pkt_buf[3] = 0x02; // endpoint 2 (host to radio)
608 let seq = self.tx_seq;
609 self.tx_seq = self.tx_seq.wrapping_add(1);
610 self.ctrl_pkt_buf[4..8].copy_from_slice(&seq.to_be_bytes());
611
612 // Sub-frame A: sync + control
613 self.ctrl_pkt_buf[8..11].copy_from_slice(&SYNC);
614 let mox_bit = if self.ptt { 0x01 } else { 0x00 };
615 self.ctrl_pkt_buf[11] = c0 | mox_bit;
616 self.ctrl_pkt_buf[12] = c1;
617 self.ctrl_pkt_buf[13] = c2;
618 self.ctrl_pkt_buf[14] = c3;
619 self.ctrl_pkt_buf[15] = c4;
620 // Rest of sub-frame A: TX IQ silence
621
622 // Sub-frame B: sync + address 0x00 with current sample rate + NDC + duplex
623 self.ctrl_pkt_buf[520..523].copy_from_slice(&SYNC);
624 let nddc_bits_b = ((self.nddc.max(1) - 1) & 0x07) << 3;
625 self.ctrl_pkt_buf[523] = mox_bit; // C0 = 0x00 | mox
626 self.ctrl_pkt_buf[524] = sample_rate_to_p1_code(self.sample_rate); // C1
627 // C2, C3 = 0 (already zeroed)
628 self.ctrl_pkt_buf[527] = nddc_bits_b | 0x04; // C4: nddc + duplex
629
630 self.transport
631 .send_to(&self.ctrl_pkt_buf, self.device.addr)
632 .await?;
633 Ok(())
634 }
635
636 /// Build and send a host data packet with rotating control commands and TX IQ.
637 async fn send_host_data_packet(&mut self) -> Result<()> {
638 self.host_pkt_buf.fill(0);
639 self.host_pkt_buf[0] = 0xEF;
640 self.host_pkt_buf[1] = 0xFE;
641 self.host_pkt_buf[2] = 0x01;
642 self.host_pkt_buf[3] = 0x02; // endpoint 2
643
644 let seq = self.tx_seq;
645 self.tx_seq = self.tx_seq.wrapping_add(1);
646 self.host_pkt_buf[4..8].copy_from_slice(&seq.to_be_bytes());
647
648 // Fill two sub-frames with control + TX data
649 // Take buffer out to avoid borrow conflict
650 let mut pkt = std::mem::take(&mut self.host_pkt_buf);
651 self.fill_host_subframe(&mut pkt, 8);
652 self.fill_host_subframe(&mut pkt, 520);
653 self.host_pkt_buf = pkt;
654
655 self.transport
656 .send_to(&self.host_pkt_buf, self.device.addr)
657 .await?;
658 Ok(())
659 }
660
661 fn fill_host_subframe(&mut self, buf: &mut [u8], offset: usize) {
662 buf[offset..offset + 3].copy_from_slice(&SYNC);
663
664 // Rotate through control addresses
665 let mox_bit = if self.ptt { 0x01 } else { 0x00 };
666 let (c0, c1, c2, c3, c4) = self.next_control_command();
667 buf[offset + 3] = c0 | mox_bit;
668 buf[offset + 4] = c1;
669 buf[offset + 5] = c2;
670 buf[offset + 6] = c3;
671 buf[offset + 7] = c4;
672
673 // TX IQ data: 63 blocks of 8 bytes each [L(2B) R(2B) I(2B) Q(2B)]
674 // Pop from lock-free ring buffer consumer when PTT is on
675 if self.ptt {
676 // Refill buffer from ring buffer if we need more samples
677 if self.tx_iq_buffer.len() < 63 {
678 if let Some(ref mut cons) = self.tx_iq_consumer {
679 let avail = cons.occupied_len();
680 if avail > 0 {
681 let old_len = self.tx_iq_buffer.len();
682 self.tx_iq_buffer
683 .resize(old_len + avail, Complex::new(0.0, 0.0));
684 let popped = cons.pop_slice(&mut self.tx_iq_buffer[old_len..]);
685 self.tx_iq_buffer.truncate(old_len + popped);
686 }
687 }
688 }
689
690 let n = self.tx_iq_buffer.len().min(63);
691 let mut peak = 0.0f64;
692 if n > 0 {
693 for i in 0..n {
694 let sample = self.tx_iq_buffer[i];
695 let base = offset + 8 + i * 8;
696 if base + 8 > buf.len() {
697 break;
698 }
699 // L/R audio (mic): zero in CW mode (tone lives in IQ only),
700 // otherwise send real part as 16-bit mono on both channels.
701 if !self.tx_cw_mode {
702 let mic_val = (sample.re.clamp(-1.0, 1.0) * 32767.0) as i16;
703 buf[base..base + 2].copy_from_slice(&mic_val.to_be_bytes());
704 buf[base + 2..base + 4].copy_from_slice(&mic_val.to_be_bytes());
705 }
706 // I/Q: 16-bit complex — Q is negated to match HPSDR wire convention.
707 // The emulator uses non-negated Q internally but this has been
708 // validated to work correctly with real HPSDR hardware.
709 let i_val = (sample.re.clamp(-1.0, 1.0) * 32767.0) as i16;
710 let q_val = ((-sample.im).clamp(-1.0, 1.0) * 32767.0) as i16;
711 buf[base + 4..base + 6].copy_from_slice(&i_val.to_be_bytes());
712 buf[base + 6..base + 8].copy_from_slice(&q_val.to_be_bytes());
713 let re_abs = sample.re.abs();
714 let im_abs = sample.im.abs();
715 peak = peak.max(re_abs.max(im_abs));
716 self.tx_samples_total += 1;
717 if re_abs > 1.0 {
718 self.tx_samples_clipped_re += 1;
719 }
720 if im_abs > 1.0 {
721 self.tx_samples_clipped_im += 1;
722 }
723 if re_abs > self.tx_max_excess {
724 self.tx_max_excess = re_abs;
725 }
726 if im_abs > self.tx_max_excess {
727 self.tx_max_excess = im_abs;
728 }
729 }
730 // Drain consumed samples
731 self.tx_iq_buffer.drain(..n);
732
733 if self.tx_subframes_with_data < 3 {
734 log::debug!(
735 "[P1 TX] Subframe packed: {} IQ samples, peak={:.4}, buffer_remaining={}",
736 n,
737 peak,
738 self.tx_iq_buffer.len()
739 );
740 }
741 self.tx_subframes_with_data += 1;
742 } else {
743 peak = 0.0;
744 self.tx_subframes_empty += 1;
745 }
746
747 // Periodic debug log — every ~50 subframes (~131ms) to track
748 // whether IQ gaps (peak=0) actually reach the wire during CW.
749 self.tx_debug_log_counter += 1;
750 if self.tx_debug_log_counter.is_multiple_of(50) {
751 let total = self.tx_samples_total.max(1);
752 let pct_clipped = (self.tx_samples_clipped_re + self.tx_samples_clipped_im) as f64
753 * 100.0
754 / (2 * total) as f64;
755 log::debug!(
756 "[P1 TX WIRE] peak={:.6} n={} buf={} cw={} drive={} sf_data={} sf_empty={} skips={} clipped={:.3}% max_excess={:.4}",
757 peak,
758 n,
759 self.tx_iq_buffer.len(),
760 self.tx_cw_mode,
761 self.tx_drive,
762 self.tx_subframes_with_data,
763 self.tx_subframes_empty,
764 self.tx_underrun_skips,
765 pct_clipped,
766 self.tx_max_excess,
767 );
768 }
769 }
770 }
771
772 /// Get the next control command in the rotation.
773 fn next_control_command(&mut self) -> (u8, u8, u8, u8, u8) {
774 let idx = self.control_idx;
775 self.control_idx = (self.control_idx + 1) % 13;
776
777 match idx {
778 0 => {
779 // Address 0x00: sample rate + nddc + duplex + OC outputs
780 let rate_code = sample_rate_to_p1_code(self.sample_rate);
781 let nddc_bits = ((self.nddc.max(1) - 1) & 0x07) << 3;
782 let duplex = 0x04; // always duplex (required by HL2, harmless for Hermes)
783 // OC outputs for HL2 N2ADR filter board (C2[7:1])
784 let c2 = match self.device.hw_type {
785 HpsdrHw::HermesLite => {
786 let freq = if self.ptt {
787 self.tx_frequency
788 } else {
789 self.rx_frequencies[0]
790 };
791 n2adr_oc_for_freq(freq) << 1
792 }
793 HpsdrHw::Hermes => 0,
794 };
795 (0x00, rate_code, c2, 0, nddc_bits | duplex)
796 }
797 1 => {
798 // Address 0x02: TX frequency
799 let b = self.tx_frequency.to_be_bytes();
800 (0x02, b[0], b[1], b[2], b[3])
801 }
802 2..=8 => {
803 // Addresses 0x04-0x10: RX frequencies
804 let ddc = (idx - 2) as usize;
805 let addr = 0x04 + (ddc as u8 * 2);
806 let freq = if ddc < self.nddc as usize {
807 self.rx_frequencies[ddc]
808 } else {
809 0
810 };
811 let b = freq.to_be_bytes();
812 (addr, b[0], b[1], b[2], b[3])
813 }
814 9 => {
815 // Address 0x12: TX drive + PA enable (HL2) or Alex filters (Hermes).
816 match self.device.hw_type {
817 HpsdrHw::HermesLite => {
818 // HL2: PA enable in C2, no Alex filters
819 (0x12, self.tx_drive, 0x08, 0, 0)
820 }
821 HpsdrHw::Hermes => {
822 // Hermes: Alex RX HPF in C3, Alex TX LPF in C4
823 let c3 = alex_rx_hpf_for_freq(self.rx_frequencies[0]);
824 let c4 = alex_tx_lpf_for_freq(self.tx_frequency);
825 (0x12, self.tx_drive, 0, c3, c4)
826 }
827 }
828 }
829 10 => {
830 // Address 0x14: RX attenuator
831 match self.device.hw_type {
832 HpsdrHw::HermesLite => {
833 // HL2: step attenuator in C4 (direct value, 0-31)
834 (0x14, 0, 0, 0, self.rx_attenuation & 0x1F)
835 }
836 HpsdrHw::Hermes => {
837 // Hermes: C4 bit 5 = 20 dB attenuator
838 let c4 = if self.rx_attenuation >= 20 {
839 0x20
840 } else {
841 0x00
842 };
843 (0x14, 0, 0, 0, c4)
844 }
845 }
846 }
847 11 => {
848 // Address 0x16: step attenuator values and CW keyer settings.
849 (0x16, 0, 0, 0, 0)
850 }
851 12 => {
852 // Address 0x1C: ADC assignments + TX attenuation
853 match self.device.hw_type {
854 HpsdrHw::HermesLite => {
855 // HL2: C3 = 0xC0 | rx_gain (bit 7=enable, bit 6=full-range, bits 5:0=value)
856 (0x1C, 0, 0, 0xC0 | (self.rx_attenuation & 0x3F), 0)
857 }
858 HpsdrHw::Hermes => (0x1C, 0, 0, 0, 0),
859 }
860 }
861 _ => (0x00, 0, 0, 0, 0),
862 }
863 }
864
865 /// Handle an incoming packet from the radio.
866 fn handle_radio_packet(&mut self, data: &[u8]) {
867 if data.len() < 8 {
868 return;
869 }
870
871 // Check magic bytes
872 if data[0] != 0xEF || data[1] != 0xFE {
873 return;
874 }
875
876 match data[2] {
877 0x02 | 0x03 => {
878 // Discovery response (ignore during streaming)
879 log::debug!("P1 Discovery response received while streaming");
880 }
881 0x01 => {
882 // Data packet from radio (endpoint 6)
883 if data.len() >= PACKET_SIZE {
884 let seq = u32::from_be_bytes([data[4], data[5], data[6], data[7]]);
885 if seq != self.rx_seq && self.rx_seq > 0 {
886 let lost = seq.wrapping_sub(self.rx_seq);
887 if lost > 1 && lost < 100 {
888 log::warn!(
889 "P1 Packet loss: expected seq {}, got {} ({} lost)",
890 self.rx_seq,
891 seq,
892 lost - 1
893 );
894 }
895 }
896 self.rx_seq = seq.wrapping_add(1);
897
898 // Parse both sub-frames
899 self.parse_radio_subframe(&data[8..520]);
900 self.parse_radio_subframe(&data[520..1032]);
901 }
902 }
903 other => {
904 log::warn!("P1 Unknown packet type 0x{:02X} from radio", other);
905 }
906 }
907 }
908
909 /// Parse a single radio-to-host sub-frame.
910 fn parse_radio_subframe(&mut self, sf: &[u8]) {
911 if sf.len() < SUBFRAME_SIZE || sf[0..3] != SYNC {
912 return;
913 }
914
915 let c0 = sf[3];
916 let c1 = sf[4];
917 let c2 = sf[5];
918 let c3 = sf[6];
919 let c4 = sf[7];
920
921 // Parse control response — accumulate across response address rotation.
922 // The HPSDR P1 status subframe carries data for ONE address slot at a
923 // time, rotating 0x00 → 0x08 → 0x10 → 0x18 → 0x00 → … Each slot
924 // updates DIFFERENT fields of `accumulated_status`:
925 //
926 // 0x00 → adc_overflow
927 // 0x08 → exciter_power, forward_power
928 // 0x10 → reverse_power, supply_voltage
929 // 0x18 → pa_current
930 //
931 // PTT is in c0 of every subframe, so we always update it. Detect
932 // cycle wrap (current addr ≤ previous) and emit the just-completed
933 // cycle's status BEFORE this subframe overwrites its fields. That
934 // way every emitted RadioStatus has fwd, rev, supply, etc. all
935 // sampled from the same rotation — SWR computed from a coherent
936 // pair, not the half-fresh / half-stale mix we used to ship.
937 let _ptt = (c0 & 0x01) != 0;
938 let addr = c0 & 0x7E; // mask out PTT bit and bit 7
939 self.accumulated_status.ptt = _ptt;
940
941 let wrapped = self
942 .last_status_addr
943 .map(|prev| addr <= prev)
944 .unwrap_or(false);
945 if wrapped {
946 // Cycle complete — emit BEFORE we mutate the new cycle's fields.
947 let _ = self.status_sender.try_send(self.accumulated_status.clone());
948 }
949 self.last_status_addr = Some(addr);
950
951 match addr {
952 0x00 => {
953 self.accumulated_status.adc_overflow = c1;
954 }
955 0x08 => {
956 self.accumulated_status.exciter_power = u16::from_be_bytes([c1, c2]);
957 self.accumulated_status.forward_power = u16::from_be_bytes([c3, c4]);
958 }
959 0x10 => {
960 self.accumulated_status.reverse_power = u16::from_be_bytes([c1, c2]);
961 self.accumulated_status.supply_voltage = u16::from_be_bytes([c3, c4]);
962 }
963 0x18 => {
964 self.accumulated_status.pa_current = u16::from_be_bytes([c1, c2]);
965 }
966 _ => {}
967 }
968
969 // Extract IQ samples
970 let nddc = self.nddc.max(1) as usize;
971 let spr = 504 / (6 * nddc + 2);
972 let block_size = 6 * nddc + 2;
973
974 // Resize pre-allocated DDC buffers if nddc changed
975 if self.ddc_sample_bufs.len() != nddc {
976 self.ddc_sample_bufs.clear();
977 for _ in 0..nddc {
978 self.ddc_sample_bufs.push(Vec::with_capacity(spr));
979 }
980 } else {
981 for buf in &mut self.ddc_sample_bufs {
982 buf.clear();
983 if buf.capacity() == 0 {
984 buf.reserve(spr);
985 }
986 }
987 }
988
989 // Deinterleave IQ for each DDC (reusing pre-allocated buffers)
990 for row in 0..spr {
991 let row_offset = 8 + row * block_size;
992 for (ddc, ddc_vec) in self.ddc_sample_bufs.iter_mut().enumerate().take(nddc) {
993 let sample_offset = row_offset + ddc * 6;
994 if sample_offset + 6 <= sf.len() {
995 let sample = unpack_iq_24bit(sf, sample_offset);
996 ddc_vec.push(sample);
997 }
998 }
999 }
1000
1001 // Send to channel receivers — use mem::take to avoid cloning
1002 for (ddc, samples) in self.ddc_sample_bufs.iter_mut().enumerate() {
1003 if let Some(sender) = self.iq_senders.get(&ddc) {
1004 if !samples.is_empty() {
1005 let _ = sender.try_send(std::mem::take(samples));
1006 }
1007 }
1008 }
1009 }
1010}
1011
/// Build a Protocol 1 discovery request packet (63 bytes).
///
/// The packet is the three-byte header `0xEF 0xFE 0x02` followed by zero
/// padding.
fn build_p1_discovery_request() -> Vec<u8> {
    const HEADER: [u8; 3] = [0xEF, 0xFE, 0x02];
    const REQUEST_LEN: usize = 63;

    let mut packet = Vec::with_capacity(REQUEST_LEN);
    packet.extend_from_slice(&HEADER);
    packet.resize(REQUEST_LEN, 0);
    packet
}
1020
1021/// Parse a Protocol 1 discovery response.
1022fn parse_p1_discovery_response(data: &[u8], addr: SocketAddr) -> Option<DiscoveredDevice> {
1023 if data.len() < 20 {
1024 return None;
1025 }
1026
1027 // Check magic bytes
1028 if data[0] != 0xEF || data[1] != 0xFE {
1029 return None;
1030 }
1031
1032 // Status byte: 0x02 = normal, 0x03 = busy
1033 let status = data[2];
1034 if status != 0x02 && status != 0x03 {
1035 return None;
1036 }
1037
1038 let mut mac = [0u8; 6];
1039 mac.copy_from_slice(&data[3..9]);
1040
1041 let firmware_version = data[9];
1042 let device_code = data[10];
1043 let protocol_version = data[11];
1044 let num_rxs = if data.len() > 20 { data[20] } else { 1 };
1045
1046 // Only Protocol 1 (protocol_version == 0) is supported.
1047 if protocol_version != 0 {
1048 return None;
1049 }
1050 let hw_type = HpsdrHw::from_p1_code(device_code)?;
1051
1052 // Use the radio's actual address (port 1024)
1053 let radio_addr = SocketAddr::new(addr.ip(), PORT);
1054
1055 Some(DiscoveredDevice {
1056 addr: radio_addr,
1057 mac,
1058 hw_type,
1059 firmware_version,
1060 num_rxs,
1061 status,
1062 })
1063}