Skip to main content

radio_utils_protocol/
protocol1.rs

1use std::collections::HashMap;
2use std::net::{Ipv4Addr, SocketAddr};
3use std::time::Duration;
4
5use num_complex::Complex;
6use ringbuf::traits::{Consumer, Observer, Split};
7use ringbuf::{HeapCons, HeapProd, HeapRb};
8use tokio::sync::mpsc;
9
10use crate::discovery;
11use crate::transport::UdpTransport;
12use crate::types::*;
13
/// UDP port handed to the discovery helpers for Protocol 1 devices.
const PORT: u16 = 1024;
/// Length in bytes of the pre-allocated host-to-radio packet buffers.
const PACKET_SIZE: usize = 1032;
/// Sub-frame length in bytes; the two sub-frames of a host packet are
/// written at byte offsets 8 and 520, i.e. 512 bytes apart.
const SUBFRAME_SIZE: usize = 512;
/// Sync bytes written at the start of every sub-frame.
const SYNC: [u8; 3] = [0x7F, 0x7F, 0x7F];

/// If no packet is received within this duration, the connection is considered lost.
const RECV_TIMEOUT: Duration = Duration::from_secs(10);
21
/// Commands that can be sent to the Protocol 1 client while it is running.
///
/// A sender is obtained from `Protocol1Client::command_sender`; the `run()`
/// loop applies each command as it arrives.
#[derive(Debug)]
pub enum P1Command {
    /// Set the frequency (Hz) of receiver `ddc`. Indices outside the stored
    /// frequency table are ignored.
    SetRxFrequency { ddc: usize, freq_hz: u32 },
    /// Set the transmit frequency in Hz.
    SetTxFrequency(u32),
    /// Set the sample rate.
    SetSampleRate(u32),
    /// Set the number of DDCs; values below 1 are raised to 1.
    SetNddc(u8),
    /// Set the TX drive level.
    SetTxDrive(u8),
    /// Key (`true`) or unkey (`false`) the transmitter. Keying from the
    /// unkeyed state resets the TX pacing estimate and the per-session
    /// clipping / underrun counters; unkeying logs the session statistics
    /// and clears the local TX IQ buffer.
    SetPtt(bool),
    /// Enable or disable CW mode (mic L/R channels are zeroed while enabled).
    SetCwMode(bool),
    /// Set the RX attenuation in dB.
    SetRxAttenuation(u8),
    /// Send the stop command to the radio and end the `run()` loop.
    Stop,
}
35
/// Protocol 1 client for connecting to HPSDR hardware.
pub struct Protocol1Client {
    // UDP transport used for every packet sent to / received from the radio.
    transport: UdpTransport,
    // Device this client talks to; `device.addr` is the packet destination.
    device: DiscoveredDevice,
    // Loop flag: set by `start()`, cleared by `stop()`; `run()` exits when false.
    running: bool,
    // Configuration
    sample_rate: u32,
    nddc: u8,
    rx_frequencies: [u32; 12],
    tx_frequency: u32,
    tx_drive: u8,
    ptt: bool,
    rx_attenuation: u8,
    // Sequence tracking
    tx_seq: u32,
    rx_seq: u32,
    // Control command state: which address to send next
    control_idx: u8,
    // Channels
    iq_senders: HashMap<usize, mpsc::Sender<Vec<Complex<f64>>>>,
    tx_iq_consumer: Option<HeapCons<Complex<f64>>>,
    status_sender: mpsc::Sender<RadioStatus>,
    status_receiver: Option<mpsc::Receiver<RadioStatus>>,
    // Command channel for runtime control
    cmd_sender: mpsc::Sender<P1Command>,
    cmd_receiver: Option<mpsc::Receiver<P1Command>>,
    // CW mode flag: when true, mic L/R channels are zeroed (tone is in IQ only)
    tx_cw_mode: bool,
    // TX IQ buffer: accumulates DSP blocks, drains 63 per subframe
    tx_iq_buffer: Vec<Complex<f64>>,
    // Pre-allocated packet buffer for the streaming hot path
    host_pkt_buf: Vec<u8>,
    // Pre-allocated packet buffer for send_control
    ctrl_pkt_buf: Vec<u8>,
    // Pre-allocated per-DDC sample buffers for parse_radio_subframe
    ddc_sample_bufs: Vec<Vec<Complex<f64>>>,
    // Accumulated status across response address rotation. Each subframe
    // carries the data for ONE response-address slot (0x00 / 0x08 / 0x10
    // / 0x18 in HPSDR P1), so a single subframe never has both fwd and
    // rev fresh. We accumulate values into `accumulated_status` across
    // the rotation and emit only at the cycle boundary — see
    // `last_status_addr` below.
    accumulated_status: RadioStatus,
    // Last response-address byte we saw, used to detect cycle wrap.
    // When the rotation comes back round (current addr <= previous),
    // the just-completed cycle has fully populated fwd / rev / supply /
    // pa_current, so we emit ONE coherent status. Without this every
    // subframe emitted its own status which mixed fresh fields with
    // stale ones from prior cycles — broke SWR (computed from a
    // mismatched fwd/rev pair) and made the rev-power readout look
    // stuck near zero.
    last_status_addr: Option<u8>,
    // TX debug counters
    tx_subframes_with_data: u64,
    tx_subframes_empty: u64,
    /// Voice-TX underrun guard: increments each time the adaptive pacing
    /// asked us to send a packet but the IQ buffer didn't have a full
    /// `samples_per_packet` worth of samples yet. Counts skipped *send*
    /// attempts (one per guard-firing), not subframes — so it's
    /// directly comparable to packets-per-second figures rather than
    /// to `sf_data` / `sf_empty`. Should be ~0 if the wire-pack and
    /// the DSP producer are well-matched; large values mean burst-mode
    /// pacing is outrunning DSP and the guard is doing its job.
    tx_underrun_skips: u64,
    tx_debug_log_counter: u32,
    // FIFO-estimate TX pacing (voice only): accumulates on each packet sent
    // and drains at `sample_rate`. Lets us burst packets at PTT start to
    // build a radio-side jitter cushion, then throttle back to wire rate.
    tx_fifo_estimate: f64,
    // Instant of the previous host-packet send; `None` until the first send
    // of a TX session (reset when PTT is keyed).
    tx_last_send: Option<tokio::time::Instant>,
    // Throttles the periodic "[P1 TX PACE]" debug log.
    tx_fifo_log_counter: u32,
    // Wire-pack clipping diagnostics (per-PTT-session counters).
    tx_samples_total: u64,
    tx_samples_clipped_re: u64,
    tx_samples_clipped_im: u64,
    tx_max_excess: f64,
}
113
114impl Protocol1Client {
115    /// Discover all HPSDR devices on the network using Protocol 1.
116    pub async fn discover(timeout: Duration) -> Result<Vec<DiscoveredDevice>> {
117        discovery::discover_on_interfaces(
118            build_p1_discovery_request,
119            parse_p1_discovery_response,
120            PORT,
121            timeout,
122        )
123        .await
124    }
125
126    /// Send a discovery request to a specific address (unicast).
127    pub async fn discover_at(addr: Ipv4Addr, timeout: Duration) -> Result<Vec<DiscoveredDevice>> {
128        discovery::discover_at_addr(
129            build_p1_discovery_request,
130            parse_p1_discovery_response,
131            addr,
132            PORT,
133            timeout,
134        )
135        .await
136    }
137
138    /// Connect to a discovered Protocol 1 device.
139    pub async fn connect(device: &DiscoveredDevice) -> Result<Self> {
140        let transport = UdpTransport::bind_any().await?;
141        let (status_tx, status_rx) = mpsc::channel(64);
142        let (cmd_tx, cmd_rx) = mpsc::channel(64);
143
144        Ok(Self {
145            transport,
146            device: device.clone(),
147            running: false,
148            sample_rate: 48_000,
149            nddc: 1,
150            rx_frequencies: [7_074_000; 12],
151            tx_frequency: 7_074_000,
152            tx_drive: 128,
153            ptt: false,
154            rx_attenuation: 0,
155            tx_seq: 0,
156            rx_seq: 0,
157            control_idx: 0,
158            iq_senders: HashMap::new(),
159            tx_iq_consumer: None,
160            tx_cw_mode: false,
161            tx_iq_buffer: Vec::new(),
162            status_sender: status_tx,
163            status_receiver: Some(status_rx),
164            cmd_sender: cmd_tx,
165            cmd_receiver: Some(cmd_rx),
166            host_pkt_buf: vec![0u8; PACKET_SIZE],
167            ctrl_pkt_buf: vec![0u8; PACKET_SIZE],
168            ddc_sample_bufs: Vec::new(),
169            accumulated_status: RadioStatus::default(),
170            last_status_addr: None,
171            tx_subframes_with_data: 0,
172            tx_subframes_empty: 0,
173            tx_underrun_skips: 0,
174            tx_debug_log_counter: 0,
175            tx_fifo_estimate: 0.0,
176            tx_last_send: None,
177            tx_fifo_log_counter: 0,
178            tx_samples_total: 0,
179            tx_samples_clipped_re: 0,
180            tx_samples_clipped_im: 0,
181            tx_max_excess: 0.0,
182        })
183    }
184
185    /// Start streaming IQ data from the radio.
186    pub async fn start(&mut self) -> Result<()> {
187        // Send start command
188        let mut pkt = vec![0u8; 64];
189        pkt[0] = 0xEF;
190        pkt[1] = 0xFE;
191        pkt[2] = 0x04;
192        pkt[3] = 0x01; // start
193        self.transport.send_to(&pkt, self.device.addr).await?;
194        self.running = true;
195        log::debug!("P1 Start command sent to {}", self.device.addr);
196
197        // Send initial configuration
198        self.send_config().await?;
199        Ok(())
200    }
201
202    /// Stop streaming.
203    pub async fn stop(&mut self) -> Result<()> {
204        let mut pkt = vec![0u8; 64];
205        pkt[0] = 0xEF;
206        pkt[1] = 0xFE;
207        pkt[2] = 0x04;
208        pkt[3] = 0x00; // stop
209        self.transport.send_to(&pkt, self.device.addr).await?;
210        self.running = false;
211        log::debug!("P1 Stop command sent");
212        Ok(())
213    }
214
215    // -- Configuration commands -----------------------------------------------
216
217    pub fn set_rx_frequency(&mut self, ddc: usize, freq_hz: u32) {
218        if ddc < self.rx_frequencies.len() {
219            self.rx_frequencies[ddc] = freq_hz;
220        }
221    }
222
    /// Store the transmit frequency in Hz.
    ///
    /// Only updates the stored value; nothing is sent to the radio by this call.
    pub fn set_tx_frequency(&mut self, freq_hz: u32) {
        self.tx_frequency = freq_hz;
    }
226
    /// Store the sample rate.
    ///
    /// Only updates the stored value; nothing is sent to the radio by this call.
    pub fn set_sample_rate(&mut self, rate: u32) {
        self.sample_rate = rate;
    }
230
231    pub fn set_nddc(&mut self, count: u8) {
232        self.nddc = count.max(1);
233    }
234
    /// Return the currently stored TX drive level.
    pub fn tx_drive(&self) -> u8 {
        self.tx_drive
    }
238
    /// Store the TX drive level.
    ///
    /// Only updates the stored value; nothing is sent to the radio by this call.
    pub fn set_tx_drive(&mut self, drive: u8) {
        self.tx_drive = drive;
    }
242
    /// Set the PTT flag.
    ///
    /// This only flips the flag. The per-session pacing and counter resets
    /// that accompany `P1Command::SetPtt` are performed by the command
    /// handler, not here.
    pub fn set_ptt(&mut self, on: bool) {
        self.ptt = on;
    }
246
    /// Store the RX attenuation in dB.
    ///
    /// Only updates the stored value; nothing is sent to the radio by this call.
    pub fn set_rx_attenuation(&mut self, db: u8) {
        self.rx_attenuation = db;
    }
250
251    /// Get a sender for runtime commands. The run() loop polls this channel.
252    pub fn command_sender(&self) -> mpsc::Sender<P1Command> {
253        self.cmd_sender.clone()
254    }
255
256    /// Get a receiver for IQ data from a specific DDC.
257    pub fn rx_iq_stream(&mut self, ddc: usize) -> mpsc::Receiver<Vec<Complex<f64>>> {
258        if self.iq_senders.contains_key(&ddc) {
259            log::warn!(
260                "rx_iq_stream called again for DDC {ddc} — previous receiver will be orphaned"
261            );
262        }
263        let (tx, rx) = mpsc::channel(256);
264        self.iq_senders.insert(ddc, tx);
265        rx
266    }
267
268    /// Get a producer for TX IQ data (lock-free ring buffer, zero-copy).
269    pub fn tx_iq_sink(&mut self) -> HeapProd<Complex<f64>> {
270        if self.tx_iq_consumer.is_some() {
271            log::warn!("tx_iq_sink called again — previous consumer will be orphaned");
272        }
273        // 256 blocks * 1024 samples = 262144 — enough to absorb DSP jitter
274        let rb = HeapRb::<Complex<f64>>::new(262_144);
275        let (prod, cons) = rb.split();
276        self.tx_iq_consumer = Some(cons);
277        prod
278    }
279
280    /// Get a receiver for radio status updates.
281    ///
282    /// Returns `None` if already taken (can only be called once).
283    pub fn status_stream(&mut self) -> Option<mpsc::Receiver<RadioStatus>> {
284        self.status_receiver.take()
285    }
286
287    /// Run the main receive/transmit loop. This blocks until stop() is called
288    /// or the connection is lost.
289    pub async fn run(&mut self) -> Result<()> {
290        let mut recv_buf = vec![0u8; 2048];
291        let spr = self.samples_per_subframe();
292        let samples_per_packet = (spr * 2) as f64;
293        // Wire-rate interval: used for RX and CW TX (no cushion needed).
294        let wire_rate_interval =
295            Duration::from_secs_f64(samples_per_packet / self.sample_rate as f64);
296
297        let mut cmd_rx = self.cmd_receiver.take();
298        let mut last_recv = tokio::time::Instant::now();
299        let mut next_tx_send = tokio::time::Instant::now();
300
301        while self.running {
302            tokio::select! {
303                // Send control/TX data at an adaptive cadence (voice TX) or
304                // wire rate (RX / CW TX).
305                _ = tokio::time::sleep_until(next_tx_send) => {
306                    let now = tokio::time::Instant::now();
307
308                    // Voice-TX underrun guard: if we don't have a full packet's
309                    // worth of IQ ready (in either the local refill buffer or
310                    // the lock-free consumer), skip this send and retry in
311                    // 500 µs so the wire-pack never emits a subframe padded
312                    // with zeros.
313                    //
314                    // Why this matters: the adaptive pacing below can drop
315                    // `next_delay` to 0 µs to burst-prime the radio FIFO at
316                    // PTT-on. Once the burst started, it kept re-entering
317                    // burst mode every time `tx_fifo_estimate` (a fictional
318                    // model of the radio FIFO based on packet timing, not on
319                    // actual buffer fill) dipped below `low_water`. The DSP
320                    // produces in chunks (1024-sample blocks every ~21 ms)
321                    // while burst mode pulls 126 samples per packet at
322                    // ~3000 pkt/s, so the consumer regularly drained dry and
323                    // `fill_host_subframe` packed 63 zero samples instead of
324                    // bailing. Those zero subframes injected hard
325                    // discontinuities into the on-air signal at a 19 % duty
326                    // cycle (sf_empty / (sf_empty + sf_data) measured ~412 /
327                    // 2150 in repro logs), creating broadband splatter that
328                    // is invisible to the host's pre-wire spectrum probe but
329                    // very visible to the receiver. CW TX is exempt — its
330                    // modulator generates a continuous carrier and the pacing
331                    // is wire-rate locked, so the underrun pattern doesn't
332                    // exist there.
333                    let voice_tx = self.ptt && !self.tx_cw_mode;
334                    let pkt_iq_samples = samples_per_packet as usize;
335                    let voice_iq_avail = self.tx_iq_buffer.len()
336                        + self
337                            .tx_iq_consumer
338                            .as_ref()
339                            .map(|c| c.occupied_len())
340                            .unwrap_or(0);
341                    let voice_underrun = voice_tx && voice_iq_avail < pkt_iq_samples;
342
343                    let next_delay = if voice_underrun {
344                        // Skip this send entirely: a packet whose IQ section
345                        // is zero-padded would inject 63 silent samples in the
346                        // middle of a continuous transmission and create
347                        // broadband splatter. Re-arm in 500 µs and try again
348                        // when DSP has produced more samples.
349                        self.tx_underrun_skips = self.tx_underrun_skips.saturating_add(1);
350                        Duration::from_micros(500)
351                    } else {
352                        // Update FIFO estimate: the radio drains at sample_rate.
353                        if let Some(last) = self.tx_last_send {
354                            let elapsed = now.duration_since(last).as_secs_f64();
355                            self.tx_fifo_estimate -= elapsed * self.sample_rate as f64;
356                            if self.tx_fifo_estimate < 0.0 {
357                                self.tx_fifo_estimate = 0.0;
358                            }
359                        }
360                        self.tx_last_send = Some(now);
361
362                        self.send_host_data_packet().await?;
363                        self.tx_fifo_estimate += samples_per_packet;
364
365                        // Voice TX: burst at start to prime the radio-side
366                        // FIFO, then throttle to keep ~30 ms of cushion
367                        // against network jitter. CW and RX keep fixed
368                        // wire-rate pacing.
369                        if voice_tx {
370                            let sr = self.sample_rate as f64;
371                            let high_water = 0.030 * sr;
372                            let low_water = 0.006 * sr;
373                            if self.tx_fifo_estimate > high_water {
374                                Duration::from_micros(2000)
375                            } else if self.tx_fifo_estimate > low_water {
376                                Duration::from_micros(500)
377                            } else {
378                                Duration::ZERO
379                            }
380                        } else {
381                            wire_rate_interval
382                        }
383                    };
384                    // Anchor the next-send schedule to the previous deadline
385                    // when on-time, fall back to `now + next_delay` when the
386                    // schedule has slipped into the past.
387                    //
388                    // Why anchor: every iteration's tokio wake + UDP-send
389                    // latency (~1 ms on a busy localhost loop) baked into
390                    // `now + next_delay` made the host's TX rate settle
391                    // around 270 pkt/s instead of the 380 pkt/s wire rate.
392                    // The emulator's RX timer is anchored (it uses
393                    // `tokio::time::interval`), so a drifted host produced
394                    // ~30 % fewer samples per wall-clock second than the
395                    // listener's live-mode read consumed — those reads then
396                    // fell onto the LIVE_DELAY safety margin and emitted
397                    // 63-sample silent subframes, scrambling a CW tone over
398                    // several kHz of bandwidth.
399                    //
400                    // Why fall back when slipping: if we anchor unconditionally
401                    // and the schedule is e.g. 10 ms in the past, sleep_until
402                    // returns immediately for several iterations in a row,
403                    // bursting catch-up sends. Each catch-up send drains 63
404                    // samples from the host's TX ring; if the producer was
405                    // in the middle of a 2 ms gap when the burst hit, the
406                    // ring underflows and the host pads the subframe with
407                    // zeros (loop mode then captures internal zero runs that
408                    // break up the recorded tone). Falling back to
409                    // `now + next_delay` on slip preserves cadence anchoring
410                    // for the steady-state case (the user's reported bug)
411                    // while preventing the burst regression.
412                    let scheduled = next_tx_send + next_delay;
413                    next_tx_send = if scheduled >= now {
414                        scheduled
415                    } else {
416                        now + next_delay
417                    };
418
419                    if voice_tx {
420                        self.tx_fifo_log_counter += 1;
421                        if self.tx_fifo_log_counter.is_multiple_of(400) {
422                            log::debug!(
423                                "[P1 TX PACE] fifo_est={:.0} samp ({:.1} ms), next_sleep={:?}",
424                                self.tx_fifo_estimate,
425                                self.tx_fifo_estimate * 1000.0 / self.sample_rate as f64,
426                                next_delay
427                            );
428                        }
429                    }
430                }
431                // Receive data from radio
432                result = self.transport.recv_from(&mut recv_buf) => {
433                    match result {
434                        Ok((len, _addr)) => {
435                            last_recv = tokio::time::Instant::now();
436                            self.handle_radio_packet(&recv_buf[..len]);
437                        }
438                        Err(e) => {
439                            log::error!("P1 Recv error: {}", e);
440                        }
441                    }
442                }
443                // Process runtime commands
444                Some(cmd) = async {
445                    match &mut cmd_rx {
446                        Some(rx) => rx.recv().await,
447                        None => std::future::pending().await,
448                    }
449                } => {
450                    self.handle_command(cmd).await?;
451                }
452                // Timeout detection
453                _ = tokio::time::sleep_until(last_recv + RECV_TIMEOUT) => {
454                    log::error!("P1 No data received for {:?}, connection lost", RECV_TIMEOUT);
455                    return Err(ProtocolError::ConnectionLost);
456                }
457            }
458        }
459        Ok(())
460    }
461
462    async fn handle_command(&mut self, cmd: P1Command) -> Result<()> {
463        match cmd {
464            P1Command::SetRxFrequency { ddc, freq_hz } => {
465                self.set_rx_frequency(ddc, freq_hz);
466                log::debug!("P1 RX {} freq -> {} Hz", ddc, freq_hz);
467            }
468            P1Command::SetTxFrequency(freq) => {
469                self.set_tx_frequency(freq);
470                log::debug!("P1 TX freq -> {} Hz", freq);
471            }
472            P1Command::SetSampleRate(rate) => {
473                self.set_sample_rate(rate);
474                log::debug!("P1 Sample rate -> {}", rate);
475            }
476            P1Command::SetNddc(n) => {
477                self.set_nddc(n);
478                log::debug!("P1 NDDC -> {}", n);
479            }
480            P1Command::SetTxDrive(drive) => {
481                self.set_tx_drive(drive);
482                log::debug!("P1 TX drive -> {}", drive);
483            }
484            P1Command::SetPtt(on) => {
485                let was = self.ptt;
486                self.set_ptt(on);
487                log::debug!(
488                    "P1 PTT {} -> {} (drive={}, tx_iq_consumer={})",
489                    if was { "ON" } else { "OFF" },
490                    if on { "ON" } else { "OFF" },
491                    self.tx_drive,
492                    if self.tx_iq_consumer.is_some() {
493                        "connected"
494                    } else {
495                        "NONE"
496                    }
497                );
498                if on && !was {
499                    // New TX session — start with empty radio FIFO estimate
500                    // so the sender bursts until the cushion is built.
501                    self.tx_fifo_estimate = 0.0;
502                    self.tx_last_send = None;
503                    self.tx_fifo_log_counter = 0;
504                    self.tx_samples_total = 0;
505                    self.tx_samples_clipped_re = 0;
506                    self.tx_samples_clipped_im = 0;
507                    self.tx_max_excess = 0.0;
508                    self.tx_underrun_skips = 0;
509                }
510                if !on {
511                    log::debug!(
512                        "P1 PTT OFF stats: subframes_with_data={}, subframes_empty={}, underrun_skips={}, buf_remaining={}",
513                        self.tx_subframes_with_data,
514                        self.tx_subframes_empty,
515                        self.tx_underrun_skips,
516                        self.tx_iq_buffer.len()
517                    );
518                    let total = self.tx_samples_total.max(1);
519                    let pct_re = self.tx_samples_clipped_re as f64 * 100.0 / total as f64;
520                    let pct_im = self.tx_samples_clipped_im as f64 * 100.0 / total as f64;
521                    log::info!(
522                        "[P1 TX CLIP] session: total={} clipped_re={} ({:.3}%) clipped_im={} ({:.3}%) max_excess={:.4} (1.0 = wire-pack ceiling)",
523                        self.tx_samples_total,
524                        self.tx_samples_clipped_re,
525                        pct_re,
526                        self.tx_samples_clipped_im,
527                        pct_im,
528                        self.tx_max_excess,
529                    );
530                    self.tx_iq_buffer.clear();
531                    self.tx_subframes_with_data = 0;
532                    self.tx_subframes_empty = 0;
533                    self.tx_underrun_skips = 0;
534                    self.tx_debug_log_counter = 0;
535                }
536            }
537            P1Command::SetCwMode(cw) => {
538                self.tx_cw_mode = cw;
539                log::debug!("P1 TX CW mode -> {}", cw);
540            }
541            P1Command::SetRxAttenuation(db) => {
542                self.set_rx_attenuation(db);
543                log::debug!("P1 RX attenuation -> {} dB", db);
544            }
545            P1Command::Stop => {
546                self.stop().await?;
547            }
548        }
549        Ok(())
550    }
551
552    // -- Internal methods -----------------------------------------------------
553
554    fn samples_per_subframe(&self) -> usize {
555        // Protocol 1 supports 1-8 DDCs; clamp to prevent division yielding 0
556        // (which would cause a busy-loop timer at 0-duration interval).
557        let nddc = (self.nddc.max(1) as usize).min(8);
558        504 / (6 * nddc + 2)
559    }
560
561    /// Send initial configuration commands.
562    async fn send_config(&mut self) -> Result<()> {
563        // Send sample rate + nddc + duplex (address 0x00)
564        let rate_code = sample_rate_to_p1_code(self.sample_rate);
565        let nddc_bits = ((self.nddc.max(1) - 1) & 0x07) << 3;
566        let duplex = 0x04;
567        self.send_control(0x00, rate_code, 0, 0, nddc_bits | duplex)
568            .await?;
569
570        // Send TX frequency (address 0x02)
571        let tx_bytes = self.tx_frequency.to_be_bytes();
572        self.send_control(0x02, tx_bytes[0], tx_bytes[1], tx_bytes[2], tx_bytes[3])
573            .await?;
574
575        // Send RX frequencies
576        for i in 0..self.nddc.min(8) as usize {
577            let addr = 0x04 + (i as u8 * 2);
578            let freq_bytes = self.rx_frequencies[i].to_be_bytes();
579            self.send_control(
580                addr,
581                freq_bytes[0],
582                freq_bytes[1],
583                freq_bytes[2],
584                freq_bytes[3],
585            )
586            .await?;
587        }
588
589        // Send TX drive (address 0x12) + PA enable for HL2.
590        // Hermes leaves C2 zero; the rotating control packet then handles
591        // the Alex filter bits separately.
592        let pa_c2 = match self.device.hw_type {
593            HpsdrHw::HermesLite => 0x08,
594            HpsdrHw::Hermes => 0x00,
595        };
596        self.send_control(0x12, self.tx_drive, pa_c2, 0, 0).await?;
597
598        Ok(())
599    }
600
601    /// Send a single control command embedded in a host data packet.
602    async fn send_control(&mut self, c0: u8, c1: u8, c2: u8, c3: u8, c4: u8) -> Result<()> {
603        self.ctrl_pkt_buf.fill(0);
604        self.ctrl_pkt_buf[0] = 0xEF;
605        self.ctrl_pkt_buf[1] = 0xFE;
606        self.ctrl_pkt_buf[2] = 0x01; // data
607        self.ctrl_pkt_buf[3] = 0x02; // endpoint 2 (host to radio)
608        let seq = self.tx_seq;
609        self.tx_seq = self.tx_seq.wrapping_add(1);
610        self.ctrl_pkt_buf[4..8].copy_from_slice(&seq.to_be_bytes());
611
612        // Sub-frame A: sync + control
613        self.ctrl_pkt_buf[8..11].copy_from_slice(&SYNC);
614        let mox_bit = if self.ptt { 0x01 } else { 0x00 };
615        self.ctrl_pkt_buf[11] = c0 | mox_bit;
616        self.ctrl_pkt_buf[12] = c1;
617        self.ctrl_pkt_buf[13] = c2;
618        self.ctrl_pkt_buf[14] = c3;
619        self.ctrl_pkt_buf[15] = c4;
620        // Rest of sub-frame A: TX IQ silence
621
622        // Sub-frame B: sync + address 0x00 with current sample rate + NDC + duplex
623        self.ctrl_pkt_buf[520..523].copy_from_slice(&SYNC);
624        let nddc_bits_b = ((self.nddc.max(1) - 1) & 0x07) << 3;
625        self.ctrl_pkt_buf[523] = mox_bit; // C0 = 0x00 | mox
626        self.ctrl_pkt_buf[524] = sample_rate_to_p1_code(self.sample_rate); // C1
627                                                                           // C2, C3 = 0 (already zeroed)
628        self.ctrl_pkt_buf[527] = nddc_bits_b | 0x04; // C4: nddc + duplex
629
630        self.transport
631            .send_to(&self.ctrl_pkt_buf, self.device.addr)
632            .await?;
633        Ok(())
634    }
635
636    /// Build and send a host data packet with rotating control commands and TX IQ.
637    async fn send_host_data_packet(&mut self) -> Result<()> {
638        self.host_pkt_buf.fill(0);
639        self.host_pkt_buf[0] = 0xEF;
640        self.host_pkt_buf[1] = 0xFE;
641        self.host_pkt_buf[2] = 0x01;
642        self.host_pkt_buf[3] = 0x02; // endpoint 2
643
644        let seq = self.tx_seq;
645        self.tx_seq = self.tx_seq.wrapping_add(1);
646        self.host_pkt_buf[4..8].copy_from_slice(&seq.to_be_bytes());
647
648        // Fill two sub-frames with control + TX data
649        // Take buffer out to avoid borrow conflict
650        let mut pkt = std::mem::take(&mut self.host_pkt_buf);
651        self.fill_host_subframe(&mut pkt, 8);
652        self.fill_host_subframe(&mut pkt, 520);
653        self.host_pkt_buf = pkt;
654
655        self.transport
656            .send_to(&self.host_pkt_buf, self.device.addr)
657            .await?;
658        Ok(())
659    }
660
661    fn fill_host_subframe(&mut self, buf: &mut [u8], offset: usize) {
662        buf[offset..offset + 3].copy_from_slice(&SYNC);
663
664        // Rotate through control addresses
665        let mox_bit = if self.ptt { 0x01 } else { 0x00 };
666        let (c0, c1, c2, c3, c4) = self.next_control_command();
667        buf[offset + 3] = c0 | mox_bit;
668        buf[offset + 4] = c1;
669        buf[offset + 5] = c2;
670        buf[offset + 6] = c3;
671        buf[offset + 7] = c4;
672
673        // TX IQ data: 63 blocks of 8 bytes each [L(2B) R(2B) I(2B) Q(2B)]
674        // Pop from lock-free ring buffer consumer when PTT is on
675        if self.ptt {
676            // Refill buffer from ring buffer if we need more samples
677            if self.tx_iq_buffer.len() < 63 {
678                if let Some(ref mut cons) = self.tx_iq_consumer {
679                    let avail = cons.occupied_len();
680                    if avail > 0 {
681                        let old_len = self.tx_iq_buffer.len();
682                        self.tx_iq_buffer
683                            .resize(old_len + avail, Complex::new(0.0, 0.0));
684                        let popped = cons.pop_slice(&mut self.tx_iq_buffer[old_len..]);
685                        self.tx_iq_buffer.truncate(old_len + popped);
686                    }
687                }
688            }
689
690            let n = self.tx_iq_buffer.len().min(63);
691            let mut peak = 0.0f64;
692            if n > 0 {
693                for i in 0..n {
694                    let sample = self.tx_iq_buffer[i];
695                    let base = offset + 8 + i * 8;
696                    if base + 8 > buf.len() {
697                        break;
698                    }
699                    // L/R audio (mic): zero in CW mode (tone lives in IQ only),
700                    // otherwise send real part as 16-bit mono on both channels.
701                    if !self.tx_cw_mode {
702                        let mic_val = (sample.re.clamp(-1.0, 1.0) * 32767.0) as i16;
703                        buf[base..base + 2].copy_from_slice(&mic_val.to_be_bytes());
704                        buf[base + 2..base + 4].copy_from_slice(&mic_val.to_be_bytes());
705                    }
706                    // I/Q: 16-bit complex — Q is negated to match HPSDR wire convention.
707                    // The emulator uses non-negated Q internally but this has been
708                    // validated to work correctly with real HPSDR hardware.
709                    let i_val = (sample.re.clamp(-1.0, 1.0) * 32767.0) as i16;
710                    let q_val = ((-sample.im).clamp(-1.0, 1.0) * 32767.0) as i16;
711                    buf[base + 4..base + 6].copy_from_slice(&i_val.to_be_bytes());
712                    buf[base + 6..base + 8].copy_from_slice(&q_val.to_be_bytes());
713                    let re_abs = sample.re.abs();
714                    let im_abs = sample.im.abs();
715                    peak = peak.max(re_abs.max(im_abs));
716                    self.tx_samples_total += 1;
717                    if re_abs > 1.0 {
718                        self.tx_samples_clipped_re += 1;
719                    }
720                    if im_abs > 1.0 {
721                        self.tx_samples_clipped_im += 1;
722                    }
723                    if re_abs > self.tx_max_excess {
724                        self.tx_max_excess = re_abs;
725                    }
726                    if im_abs > self.tx_max_excess {
727                        self.tx_max_excess = im_abs;
728                    }
729                }
730                // Drain consumed samples
731                self.tx_iq_buffer.drain(..n);
732
733                if self.tx_subframes_with_data < 3 {
734                    log::debug!(
735                        "[P1 TX] Subframe packed: {} IQ samples, peak={:.4}, buffer_remaining={}",
736                        n,
737                        peak,
738                        self.tx_iq_buffer.len()
739                    );
740                }
741                self.tx_subframes_with_data += 1;
742            } else {
743                peak = 0.0;
744                self.tx_subframes_empty += 1;
745            }
746
747            // Periodic debug log — every ~50 subframes (~131ms) to track
748            // whether IQ gaps (peak=0) actually reach the wire during CW.
749            self.tx_debug_log_counter += 1;
750            if self.tx_debug_log_counter.is_multiple_of(50) {
751                let total = self.tx_samples_total.max(1);
752                let pct_clipped = (self.tx_samples_clipped_re + self.tx_samples_clipped_im) as f64
753                    * 100.0
754                    / (2 * total) as f64;
755                log::debug!(
756                    "[P1 TX WIRE] peak={:.6} n={} buf={} cw={} drive={} sf_data={} sf_empty={} skips={} clipped={:.3}% max_excess={:.4}",
757                    peak,
758                    n,
759                    self.tx_iq_buffer.len(),
760                    self.tx_cw_mode,
761                    self.tx_drive,
762                    self.tx_subframes_with_data,
763                    self.tx_subframes_empty,
764                    self.tx_underrun_skips,
765                    pct_clipped,
766                    self.tx_max_excess,
767                );
768            }
769        }
770    }
771
772    /// Get the next control command in the rotation.
773    fn next_control_command(&mut self) -> (u8, u8, u8, u8, u8) {
774        let idx = self.control_idx;
775        self.control_idx = (self.control_idx + 1) % 13;
776
777        match idx {
778            0 => {
779                // Address 0x00: sample rate + nddc + duplex + OC outputs
780                let rate_code = sample_rate_to_p1_code(self.sample_rate);
781                let nddc_bits = ((self.nddc.max(1) - 1) & 0x07) << 3;
782                let duplex = 0x04; // always duplex (required by HL2, harmless for Hermes)
783                                   // OC outputs for HL2 N2ADR filter board (C2[7:1])
784                let c2 = match self.device.hw_type {
785                    HpsdrHw::HermesLite => {
786                        let freq = if self.ptt {
787                            self.tx_frequency
788                        } else {
789                            self.rx_frequencies[0]
790                        };
791                        n2adr_oc_for_freq(freq) << 1
792                    }
793                    HpsdrHw::Hermes => 0,
794                };
795                (0x00, rate_code, c2, 0, nddc_bits | duplex)
796            }
797            1 => {
798                // Address 0x02: TX frequency
799                let b = self.tx_frequency.to_be_bytes();
800                (0x02, b[0], b[1], b[2], b[3])
801            }
802            2..=8 => {
803                // Addresses 0x04-0x10: RX frequencies
804                let ddc = (idx - 2) as usize;
805                let addr = 0x04 + (ddc as u8 * 2);
806                let freq = if ddc < self.nddc as usize {
807                    self.rx_frequencies[ddc]
808                } else {
809                    0
810                };
811                let b = freq.to_be_bytes();
812                (addr, b[0], b[1], b[2], b[3])
813            }
814            9 => {
815                // Address 0x12: TX drive + PA enable (HL2) or Alex filters (Hermes).
816                match self.device.hw_type {
817                    HpsdrHw::HermesLite => {
818                        // HL2: PA enable in C2, no Alex filters
819                        (0x12, self.tx_drive, 0x08, 0, 0)
820                    }
821                    HpsdrHw::Hermes => {
822                        // Hermes: Alex RX HPF in C3, Alex TX LPF in C4
823                        let c3 = alex_rx_hpf_for_freq(self.rx_frequencies[0]);
824                        let c4 = alex_tx_lpf_for_freq(self.tx_frequency);
825                        (0x12, self.tx_drive, 0, c3, c4)
826                    }
827                }
828            }
829            10 => {
830                // Address 0x14: RX attenuator
831                match self.device.hw_type {
832                    HpsdrHw::HermesLite => {
833                        // HL2: step attenuator in C4 (direct value, 0-31)
834                        (0x14, 0, 0, 0, self.rx_attenuation & 0x1F)
835                    }
836                    HpsdrHw::Hermes => {
837                        // Hermes: C4 bit 5 = 20 dB attenuator
838                        let c4 = if self.rx_attenuation >= 20 {
839                            0x20
840                        } else {
841                            0x00
842                        };
843                        (0x14, 0, 0, 0, c4)
844                    }
845                }
846            }
847            11 => {
848                // Address 0x16: step attenuator values and CW keyer settings.
849                (0x16, 0, 0, 0, 0)
850            }
851            12 => {
852                // Address 0x1C: ADC assignments + TX attenuation
853                match self.device.hw_type {
854                    HpsdrHw::HermesLite => {
855                        // HL2: C3 = 0xC0 | rx_gain (bit 7=enable, bit 6=full-range, bits 5:0=value)
856                        (0x1C, 0, 0, 0xC0 | (self.rx_attenuation & 0x3F), 0)
857                    }
858                    HpsdrHw::Hermes => (0x1C, 0, 0, 0, 0),
859                }
860            }
861            _ => (0x00, 0, 0, 0, 0),
862        }
863    }
864
865    /// Handle an incoming packet from the radio.
866    fn handle_radio_packet(&mut self, data: &[u8]) {
867        if data.len() < 8 {
868            return;
869        }
870
871        // Check magic bytes
872        if data[0] != 0xEF || data[1] != 0xFE {
873            return;
874        }
875
876        match data[2] {
877            0x02 | 0x03 => {
878                // Discovery response (ignore during streaming)
879                log::debug!("P1 Discovery response received while streaming");
880            }
881            0x01 => {
882                // Data packet from radio (endpoint 6)
883                if data.len() >= PACKET_SIZE {
884                    let seq = u32::from_be_bytes([data[4], data[5], data[6], data[7]]);
885                    if seq != self.rx_seq && self.rx_seq > 0 {
886                        let lost = seq.wrapping_sub(self.rx_seq);
887                        if lost > 1 && lost < 100 {
888                            log::warn!(
889                                "P1 Packet loss: expected seq {}, got {} ({} lost)",
890                                self.rx_seq,
891                                seq,
892                                lost - 1
893                            );
894                        }
895                    }
896                    self.rx_seq = seq.wrapping_add(1);
897
898                    // Parse both sub-frames
899                    self.parse_radio_subframe(&data[8..520]);
900                    self.parse_radio_subframe(&data[520..1032]);
901                }
902            }
903            other => {
904                log::warn!("P1 Unknown packet type 0x{:02X} from radio", other);
905            }
906        }
907    }
908
909    /// Parse a single radio-to-host sub-frame.
910    fn parse_radio_subframe(&mut self, sf: &[u8]) {
911        if sf.len() < SUBFRAME_SIZE || sf[0..3] != SYNC {
912            return;
913        }
914
915        let c0 = sf[3];
916        let c1 = sf[4];
917        let c2 = sf[5];
918        let c3 = sf[6];
919        let c4 = sf[7];
920
921        // Parse control response — accumulate across response address rotation.
922        // The HPSDR P1 status subframe carries data for ONE address slot at a
923        // time, rotating 0x00 → 0x08 → 0x10 → 0x18 → 0x00 → … Each slot
924        // updates DIFFERENT fields of `accumulated_status`:
925        //
926        //   0x00 → adc_overflow
927        //   0x08 → exciter_power, forward_power
928        //   0x10 → reverse_power, supply_voltage
929        //   0x18 → pa_current
930        //
931        // PTT is in c0 of every subframe, so we always update it. Detect
932        // cycle wrap (current addr ≤ previous) and emit the just-completed
933        // cycle's status BEFORE this subframe overwrites its fields. That
934        // way every emitted RadioStatus has fwd, rev, supply, etc. all
935        // sampled from the same rotation — SWR computed from a coherent
936        // pair, not the half-fresh / half-stale mix we used to ship.
937        let _ptt = (c0 & 0x01) != 0;
938        let addr = c0 & 0x7E; // mask out PTT bit and bit 7
939        self.accumulated_status.ptt = _ptt;
940
941        let wrapped = self
942            .last_status_addr
943            .map(|prev| addr <= prev)
944            .unwrap_or(false);
945        if wrapped {
946            // Cycle complete — emit BEFORE we mutate the new cycle's fields.
947            let _ = self.status_sender.try_send(self.accumulated_status.clone());
948        }
949        self.last_status_addr = Some(addr);
950
951        match addr {
952            0x00 => {
953                self.accumulated_status.adc_overflow = c1;
954            }
955            0x08 => {
956                self.accumulated_status.exciter_power = u16::from_be_bytes([c1, c2]);
957                self.accumulated_status.forward_power = u16::from_be_bytes([c3, c4]);
958            }
959            0x10 => {
960                self.accumulated_status.reverse_power = u16::from_be_bytes([c1, c2]);
961                self.accumulated_status.supply_voltage = u16::from_be_bytes([c3, c4]);
962            }
963            0x18 => {
964                self.accumulated_status.pa_current = u16::from_be_bytes([c1, c2]);
965            }
966            _ => {}
967        }
968
969        // Extract IQ samples
970        let nddc = self.nddc.max(1) as usize;
971        let spr = 504 / (6 * nddc + 2);
972        let block_size = 6 * nddc + 2;
973
974        // Resize pre-allocated DDC buffers if nddc changed
975        if self.ddc_sample_bufs.len() != nddc {
976            self.ddc_sample_bufs.clear();
977            for _ in 0..nddc {
978                self.ddc_sample_bufs.push(Vec::with_capacity(spr));
979            }
980        } else {
981            for buf in &mut self.ddc_sample_bufs {
982                buf.clear();
983                if buf.capacity() == 0 {
984                    buf.reserve(spr);
985                }
986            }
987        }
988
989        // Deinterleave IQ for each DDC (reusing pre-allocated buffers)
990        for row in 0..spr {
991            let row_offset = 8 + row * block_size;
992            for (ddc, ddc_vec) in self.ddc_sample_bufs.iter_mut().enumerate().take(nddc) {
993                let sample_offset = row_offset + ddc * 6;
994                if sample_offset + 6 <= sf.len() {
995                    let sample = unpack_iq_24bit(sf, sample_offset);
996                    ddc_vec.push(sample);
997                }
998            }
999        }
1000
1001        // Send to channel receivers — use mem::take to avoid cloning
1002        for (ddc, samples) in self.ddc_sample_bufs.iter_mut().enumerate() {
1003            if let Some(sender) = self.iq_senders.get(&ddc) {
1004                if !samples.is_empty() {
1005                    let _ = sender.try_send(std::mem::take(samples));
1006                }
1007            }
1008        }
1009    }
1010}
1011
/// Build a Protocol 1 discovery request packet (63 bytes).
///
/// The packet is the three-byte header `0xEF 0xFE 0x02` followed by zero
/// padding up to the full 63-byte length.
fn build_p1_discovery_request() -> Vec<u8> {
    const HEADER: [u8; 3] = [0xEF, 0xFE, 0x02];
    const REQUEST_LEN: usize = 63;

    let mut packet = Vec::with_capacity(REQUEST_LEN);
    packet.extend_from_slice(&HEADER);
    packet.resize(REQUEST_LEN, 0);
    packet
}
1020
1021/// Parse a Protocol 1 discovery response.
1022fn parse_p1_discovery_response(data: &[u8], addr: SocketAddr) -> Option<DiscoveredDevice> {
1023    if data.len() < 20 {
1024        return None;
1025    }
1026
1027    // Check magic bytes
1028    if data[0] != 0xEF || data[1] != 0xFE {
1029        return None;
1030    }
1031
1032    // Status byte: 0x02 = normal, 0x03 = busy
1033    let status = data[2];
1034    if status != 0x02 && status != 0x03 {
1035        return None;
1036    }
1037
1038    let mut mac = [0u8; 6];
1039    mac.copy_from_slice(&data[3..9]);
1040
1041    let firmware_version = data[9];
1042    let device_code = data[10];
1043    let protocol_version = data[11];
1044    let num_rxs = if data.len() > 20 { data[20] } else { 1 };
1045
1046    // Only Protocol 1 (protocol_version == 0) is supported.
1047    if protocol_version != 0 {
1048        return None;
1049    }
1050    let hw_type = HpsdrHw::from_p1_code(device_code)?;
1051
1052    // Use the radio's actual address (port 1024)
1053    let radio_addr = SocketAddr::new(addr.ip(), PORT);
1054
1055    Some(DiscoveredDevice {
1056        addr: radio_addr,
1057        mac,
1058        hw_type,
1059        firmware_version,
1060        num_rxs,
1061        status,
1062    })
1063}