Skip to main content

twinleaf_tools/tools/
simulate.rs

1//! tio simulate
2//!
3//! Simulates a small Twinleaf device that publishes a noisy sine wave on stream 1.
4
5use crate::SimulateCli;
6use ratatui::crossterm::{
7    event::{self, Event, KeyCode, KeyEventKind, KeyModifiers},
8    terminal::{disable_raw_mode, enable_raw_mode},
9};
10use std::io::{self, Write};
11use std::net::{SocketAddr, UdpSocket};
12use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
13use twinleaf::device::RpcMetaFlags;
14use twinleaf::tio::proto::{self, meta};
15
16pub fn run_simulate(cli: SimulateCli) -> eyre::Result<()> {
17    let mut device = TestDevice::new(cli)?;
18    device.run()?;
19    Ok(())
20}
21
// In raw mode, '\n' does not reliably return the cursor to column 0.
// `println!`-style macro: accepts the usual format string and arguments and
// forwards them, as `format_args!`, to `terminal_print_line`.
macro_rules! terminal_println {
    ($($arg:tt)*) => {
        terminal_print_line(format_args!($($arg)*))
    };
}
28
// `eprintln!`-style macro: accepts the usual format string and arguments and
// forwards them, as `format_args!`, to `terminal_error_line`.
macro_rules! terminal_eprintln {
    ($($arg:tt)*) => {
        terminal_error_line(format_args!($($arg)*))
    };
}
34
// Stream ids. The startup banner describes stream 1 as the waveform stream,
// stream 2 as the status/signal-level stream and stream 3 as the aux
// triangle/sawtooth stream.
const SINE_STREAM_ID: u8 = 1;
const STATUS_STREAM_ID: u8 = 2;
const AUX_STREAM_ID: u8 = 3;
const N_SEGMENTS: u8 = 16;

// Device identity strings.
const DEVICE_NAME: &str = "tio-test";
// The simulator is a development/test device, so its build version is marked
// `DEV`. Format: `{vendor} {name} {revision} ({serial}) [{date}/{build}]`.
const DEVICE_DESC: &str = "Twinleaf tio-test R1 ((null)) [2026-06-08/000001-DEV]";
const DEVICE_SERIAL: &str = "SIM0001";
const DEVICE_FIRMWARE: &str = "twinleaf-rust-test";

// Value printed as `signal_level` for stream 2 in the startup banner.
const SIGNAL_LEVEL: u8 = 234;

// Aux stream: fixed sample rate (Hz) and waveform frequency (Hz), as printed
// in the startup banner.
const AUX_SAMPLE_RATE: u32 = 25;
const AUX_WAVE_FREQUENCY: f64 = 0.25;

// Random sample dropping; the banner advertises roughly one drop per minute
// on each sample clock.
const SAMPLE_DROP_INTERVAL_SECONDS: f64 = 60.0;
const SAMPLE_DROP_JITTER_SECONDS: f64 = 30.0;

// A client that has been silent for longer than this is considered gone and
// may be replaced by a new sender.
const CLIENT_TIMEOUT: Duration = Duration::from_secs(2);
const HEARTBEAT_INTERVAL: Duration = Duration::from_millis(500);
const LOG_MESSAGE_MIN_INTERVAL: Duration = Duration::from_millis(1500);
const LOG_MESSAGE_JITTER: Duration = Duration::from_millis(4000);

// Capture buffer parameters.
const CAPTURE_TRIGGER_DELAY: Duration = Duration::from_millis(500);
const CAPTURE_DEFAULT_BLOCK_SIZE: u16 = 256;
const CAPTURE_SAMPLE_COUNT_MIN: usize = 800;
const CAPTURE_SAMPLE_COUNT_MAX: usize = 1200;
const CAPTURE_SAMPLE_BYTES: usize = std::mem::size_of::<f32>();
const CAPTURE_METADATA_VERSION: u8 = 1;
const CAPTURE_METADATA_FIXED_LEN: u8 = 30;
const CAPTURE_Y_CALIBRATION: f32 = 1.0;
const CAPTURE_NAME: &str = "Test Signal";
const CAPTURE_UNITS: &str = "V";
const CAPTURE_X_NAME: &str = "Time";
const CAPTURE_X_UNITS: &str = "s";
// Values returned by `CaptureBuffer::status()`.
const CAPTURE_STATUS_IDLE: u8 = 0;
const CAPTURE_STATUS_CAPTURING: u8 = 1;
const CAPTURE_STATUS_DONE: u8 = 2;

// Largest sample number usable in TIO sample numbering (24 bits); a segment
// may not contain more samples than this.
const MAX_SAMPLE_NUMBER: u32 = 0x00ff_ffff;
const STREAM_DATA_HEADER_BYTES: usize = 4;
// Per-sample payload sizes for each stream.
const SINE_SAMPLE_BYTES: usize = std::mem::size_of::<f64>() * 2;
const STATUS_SAMPLE_BYTES: usize = 2;
const AUX_SAMPLE_BYTES: usize = std::mem::size_of::<f64>() * 2;
74
// RPC entry flags, ported from tl-chibi lib/core/tlrpc.h. The low bits encode
// the method kind, the value type and the value size; the high byte holds the
// access flags. The same flags word feeds both the legacy metadata replies
// (rpc.info/rpc.listinfo) and the rpc.hash CRC, exactly like the firmware.

// Method kind (lowest bits).
const TL_RPC_METHOD_STD: u32 = 0x1;
const TL_RPC_METHOD_ACTION: u32 = 0x2;
const TL_RPC_METHOD_PROP: u32 = 0x3;

// Value type: a 3-bit field starting at bit 4.
const TL_RPC_TYPE_OFFSET: u32 = 4;
const TL_RPC_TYPE_MASK: u32 = 0x7 << TL_RPC_TYPE_OFFSET;
const TL_RPC_TYPE_ANY: u32 = 0x0 << TL_RPC_TYPE_OFFSET;
const TL_RPC_TYPE_VOID: u32 = 0x1 << TL_RPC_TYPE_OFFSET;
const TL_RPC_TYPE_UINT: u32 = 0x2 << TL_RPC_TYPE_OFFSET;
// NOTE(review): no entry of the simulator's RPC table uses the signed type;
// presumably kept so the type list stays complete relative to the firmware
// header — confirm before removing the allow.
#[allow(dead_code)]
const TL_RPC_TYPE_INT: u32 = 0x3 << TL_RPC_TYPE_OFFSET;
const TL_RPC_TYPE_FLOAT: u32 = 0x4 << TL_RPC_TYPE_OFFSET;
const TL_RPC_TYPE_STRING: u32 = 0x5 << TL_RPC_TYPE_OFFSET;

// Value size: a 9-bit field starting at bit 8.
const TL_RPC_SIZE_OFFSET: u32 = 8;
const TL_RPC_SIZE_MASK: u32 = 0x1FF << TL_RPC_SIZE_OFFSET;

// Access flags live in the top byte.
const TL_RPC_FLAGS_OFFSET: u32 = 24;
const TL_RPC_PUBLIC_READ: u32 = 0x1 << TL_RPC_FLAGS_OFFSET;
const TL_RPC_PUBLIC_WRITE: u32 = 0x2 << TL_RPC_FLAGS_OFFSET;
const TL_RPC_PERSISTENT: u32 = 0x20 << TL_RPC_FLAGS_OFFSET;

// Convenience: publicly readable and writable.
const TL_RPC_PUBLIC_RW: u32 = TL_RPC_PUBLIC_READ | TL_RPC_PUBLIC_WRITE;
102
103const fn tl_rpc_mk_uint(size: u32) -> u32 {
104    TL_RPC_TYPE_UINT | (size << TL_RPC_SIZE_OFFSET)
105}
106const fn tl_rpc_mk_float(size: u32) -> u32 {
107    TL_RPC_TYPE_FLOAT | (size << TL_RPC_SIZE_OFFSET)
108}
109
/// Waveform parameters of stream 1. They are seeded from the CLI options and
/// exposed for reading and writing through the `test.amplitude`,
/// `test.frequency` and `test.noise` RPCs.
#[derive(Clone, Copy)]
struct SineParams {
    // Printed in the startup banner in V.
    amplitude: f64,
    // Printed in the startup banner in Hz.
    frequency: f64,
    // Printed in the startup banner in V/sqrt(Hz).
    noise: f64,
}
116
117/// One entry of the RPC table, mirroring tl-chibi's `struct tl_rpc_entry`
118/// fields that participate in introspection: name, flags, desc and signature.
119#[derive(Clone)]
120struct RpcSpec {
121    name: &'static str,
122    flags: u32,
123    desc: &'static str,
124    signature: &'static str,
125    /// Extra metadata bits the Rust client understands but the legacy
126    /// firmware encoding does not carry in `flags` (bool/capture markers).
127    extra_meta: u16,
128}
129
130impl RpcSpec {
131    const fn new(name: &'static str, flags: u32) -> Self {
132        Self {
133            name,
134            flags,
135            desc: "",
136            signature: "",
137            extra_meta: 0,
138        }
139    }
140
141    const fn with_extra_meta(name: &'static str, flags: u32, extra_meta: u16) -> Self {
142        Self {
143            name,
144            flags,
145            desc: "",
146            signature: "",
147            extra_meta,
148        }
149    }
150
151    fn rpc_type(&self) -> u32 {
152        self.flags & TL_RPC_TYPE_MASK
153    }
154
155    fn rpc_size(&self) -> u32 {
156        (self.flags & TL_RPC_SIZE_MASK) >> TL_RPC_SIZE_OFFSET
157    }
158
159    /// Convert flags to the u16 metadata reported by rpc.info/rpc.listinfo.
160    /// Port of tl-chibi's `legacy_rpc_metadata()` (public, non-privileged),
161    /// extended with the bool/capture bits used by the Rust tooling.
162    fn legacy_metadata(&self) -> u16 {
163        let rpc_type = self.rpc_type();
164        if rpc_type == TL_RPC_TYPE_ANY {
165            return self.extra_meta;
166        }
167        if rpc_type == TL_RPC_TYPE_VOID {
168            return 0x8000 | self.extra_meta;
169        }
170
171        let size = self.rpc_size();
172        if size > 0xF {
173            return self.extra_meta;
174        }
175
176        let mut meta: u16 = 0x8000 | ((size as u16) << 4);
177        if (TL_RPC_TYPE_UINT..=TL_RPC_TYPE_STRING).contains(&rpc_type) {
178            meta |= ((rpc_type - TL_RPC_TYPE_UINT) >> TL_RPC_TYPE_OFFSET) as u16;
179        }
180
181        if self.flags & TL_RPC_PUBLIC_READ != 0 {
182            meta |= RpcMetaFlags::READABLE.bits();
183        }
184        if self.flags & TL_RPC_PUBLIC_WRITE != 0 {
185            meta |= RpcMetaFlags::WRITABLE.bits();
186        }
187        if self.flags & TL_RPC_PERSISTENT != 0 {
188            meta |= RpcMetaFlags::PERSISTENT.bits();
189        }
190
191        meta | self.extra_meta
192    }
193}
194
/// Feeds `data` into a running CRC-32 register (ISO-HDLC flavour: reflected
/// polynomial 0xEDB88320), matching tl-chibi's `tl_crc32_*` in libtio.
///
/// The caller supplies the initial register value (0xFFFFFFFF for a fresh
/// checksum) and applies the final bitwise inversion; this function performs
/// neither. An empty `data` returns `crc` unchanged.
fn crc32_update(crc: u32, data: &[u8]) -> u32 {
    const POLY: u32 = 0xEDB8_8320;

    data.iter().fold(crc, |register, &byte| {
        (0..8).fold(register ^ u32::from(byte), |bits, _| {
            // All-ones when the low bit is set, zero otherwise, so the
            // polynomial is folded in exactly when a one is shifted out.
            let mask = (bits & 1).wrapping_neg();
            (bits >> 1) ^ (POLY & mask)
        })
    })
}
210
211/// Hash of the RPC table, as computed by tl-chibi's `tl_rpc_finalize()` for
212/// the public table: CRC-32 over each entry's name, flags (4 bytes LE), desc
213/// and signature, in table order.
214fn rpc_table_hash(rpcs: &[RpcSpec]) -> u32 {
215    let mut crc = 0xFFFF_FFFF;
216    for spec in rpcs {
217        crc = crc32_update(crc, spec.name.as_bytes());
218        crc = crc32_update(crc, &spec.flags.to_le_bytes());
219        crc = crc32_update(crc, spec.desc.as_bytes());
220        crc = crc32_update(crc, spec.signature.as_bytes());
221    }
222    !crc
223}
224
/// The single remote peer the simulator is currently serving.
#[derive(Clone, Copy)]
struct Client {
    // UDP address packets are accepted from and replies are sent to.
    addr: SocketAddr,
    // When the most recent datagram from `addr` arrived; compared against
    // `CLIENT_TIMEOUT` to decide whether the client is still alive.
    last_rx: Instant,
}
230
/// Numeric description that accompanies a capture's data.
// NOTE(review): field meanings are inferred from their names (sample count,
// y scale factor, x-axis origin and step) — confirm against the code that
// builds and serializes the capture metadata.
#[derive(Clone, Copy)]
struct CaptureInfo {
    length: u32,
    y_calibration: f32,
    x_offset: f32,
    x_stride: f32,
}

impl Default for CaptureInfo {
    /// An empty capture: zero length, zero x offset/stride and the default
    /// `CAPTURE_Y_CALIBRATION`.
    fn default() -> Self {
        Self {
            length: 0,
            y_calibration: CAPTURE_Y_CALIBRATION,
            x_offset: 0.0,
            x_stride: 0.0,
        }
    }
}
249
/// A capture that has been triggered but is not yet readable.
/// `CaptureBuffer::update` publishes it once `ready_at` has been reached.
struct CapturingCapture {
    // Instant from which the capture counts as finished.
    ready_at: Instant,
    // Bytes that become the buffer contents on completion.
    data: Vec<u8>,
    // Metadata that becomes the buffer's info on completion.
    info: CaptureInfo,
}
255
/// State of the simulated capture buffer: the last completed capture plus an
/// optional capture that is still in progress.
struct CaptureBuffer {
    // Bytes of the last completed capture; empty when nothing was captured.
    data: Vec<u8>,
    // Size in bytes of the blocks handed out by `block()`.
    block_size: u16,
    // In-progress capture, if any; while set the buffer reports as locked.
    capturing: Option<CapturingCapture>,
    // Metadata of the last completed capture.
    info: CaptureInfo,
}
262
263impl CaptureBuffer {
264    fn new() -> Self {
265        Self {
266            data: Vec::new(),
267            block_size: CAPTURE_DEFAULT_BLOCK_SIZE,
268            capturing: None,
269            info: CaptureInfo::default(),
270        }
271    }
272
273    fn clear(&mut self) {
274        self.data.clear();
275        self.capturing = None;
276        self.block_size = CAPTURE_DEFAULT_BLOCK_SIZE;
277        self.info = CaptureInfo::default();
278    }
279
280    fn begin_capture(&mut self, data: Vec<u8>, info: CaptureInfo, ready_at: Instant) {
281        self.capturing = Some(CapturingCapture {
282            ready_at,
283            data,
284            info,
285        });
286    }
287
288    fn update(&mut self, now: Instant) {
289        let Some(capturing) = self.capturing.as_ref() else {
290            return;
291        };
292        if now < capturing.ready_at {
293            return;
294        }
295
296        let capturing = self.capturing.take().expect("capturing checked above");
297        self.data = capturing.data;
298        self.info = capturing.info;
299    }
300
301    fn locked(&self) -> bool {
302        self.capturing.is_some()
303    }
304
305    fn status(&self) -> u8 {
306        if self.capturing.is_some() {
307            CAPTURE_STATUS_CAPTURING
308        } else if self.data.is_empty() {
309            CAPTURE_STATUS_IDLE
310        } else {
311            CAPTURE_STATUS_DONE
312        }
313    }
314
315    fn info(&self) -> CaptureInfo {
316        self.capturing
317            .as_ref()
318            .map(|capturing| capturing.info)
319            .unwrap_or(self.info)
320    }
321
322    fn export_size(&self) -> usize {
323        self.capturing
324            .as_ref()
325            .map(|capturing| capturing.data.len())
326            .unwrap_or(self.data.len())
327    }
328
329    #[cfg(test)]
330    fn block_count(&self) -> u16 {
331        let size = self.export_size();
332        if size == 0 {
333            return 0;
334        }
335
336        let block_size = usize::from(self.block_size);
337        let blocks = size.div_ceil(block_size);
338        u16::try_from(blocks).unwrap_or(u16::MAX)
339    }
340
341    fn block(&self, index: u16) -> Option<&[u8]> {
342        let start = usize::from(index) * usize::from(self.block_size);
343        let end = (start + usize::from(self.block_size)).min(self.data.len());
344        if start >= end {
345            None
346        } else {
347            Some(&self.data[start..end])
348        }
349    }
350}
351
352struct RawModeGuard;
353
354impl RawModeGuard {
355    fn enable() -> io::Result<Self> {
356        enable_raw_mode()?;
357        Ok(Self)
358    }
359}
360
361impl Drop for RawModeGuard {
362    fn drop(&mut self) {
363        let _ = disable_raw_mode();
364    }
365}
366
/// Writes one formatted line to stdout, terminated with an explicit `\r\n`,
/// and flushes. Write and flush errors are deliberately ignored.
fn terminal_print_line(args: std::fmt::Arguments<'_>) {
    let mut out = io::stdout().lock();
    out.write_fmt(format_args!("{args}\r\n")).ok();
    out.flush().ok();
}
372
/// Writes one formatted line to stderr, terminated with an explicit `\r\n`,
/// and flushes. Write and flush errors are deliberately ignored.
fn terminal_error_line(args: std::fmt::Arguments<'_>) {
    let mut err_out = io::stderr().lock();
    err_out.write_fmt(format_args!("{args}\r\n")).ok();
    err_out.flush().ok();
}
378
/// Small deterministic pseudo-random generator producing uniform and
/// normally distributed values.
///
/// The integer core is an xorshift64* generator; Gaussian samples come from
/// the Box-Muller transform, which yields two values per pair of uniforms, so
/// the second one is cached for the next call.
struct GaussianRng {
    /// Xorshift state. Must never be zero: zero is a fixed point of the
    /// xorshift step and would make every output zero.
    state: u64,
    /// Second Box-Muller sample awaiting the next `next_gaussian` call.
    cached: Option<f64>,
}

impl GaussianRng {
    /// Creates a generator from `seed`.
    ///
    /// A zero seed is replaced by a fixed non-zero constant, because the
    /// all-zero state would otherwise make the generator emit zeros forever.
    /// Every non-zero seed is used as is.
    fn new(seed: u64) -> Self {
        const ZERO_SEED_REPLACEMENT: u64 = 0x9E37_79B9_7F4A_7C15;

        Self {
            state: if seed == 0 {
                ZERO_SEED_REPLACEMENT
            } else {
                seed
            },
            cached: None,
        }
    }

    /// Advances the xorshift64* state and returns the next 64-bit output.
    fn next_u64(&mut self) -> u64 {
        let mut x = self.state;
        x ^= x >> 12;
        x ^= x << 25;
        x ^= x >> 27;
        self.state = x;
        x.wrapping_mul(0x2545_f491_4f6c_dd1d)
    }

    /// Returns a uniform value in `(0, 1]`, built from the top 53 bits of the
    /// next output. The result is never zero, so its logarithm is finite.
    fn next_unit(&mut self) -> f64 {
        let raw = self.next_u64() >> 11;
        ((raw as f64) + 1.0) / ((1u64 << 53) as f64 + 1.0)
    }

    /// Returns a standard-normal sample (mean 0, unit variance).
    ///
    /// Each fresh Box-Muller evaluation consumes two uniforms and produces
    /// two samples: the cosine branch is returned immediately and the sine
    /// branch is handed out by the following call.
    fn next_gaussian(&mut self) -> f64 {
        if let Some(value) = self.cached.take() {
            return value;
        }

        let u1 = self.next_unit();
        let u2 = self.next_unit();
        let radius = (-2.0 * u1.ln()).sqrt();
        let phase = std::f64::consts::TAU * u2;
        self.cached = Some(radius * phase.sin());
        radius * phase.cos()
    }
}
419
/// Complete state of the simulated device: its UDP endpoint, the single
/// connected client, user-adjustable settings, per-stream sample bookkeeping,
/// the capture buffer and the RPC table.
struct TestDevice {
    // Non-blocking UDP socket the device listens on.
    socket: UdpSocket,
    // Client currently being served, if any.
    client: Option<Client>,
    // Start-up values of the adjustable settings, restored by `reboot()`,
    // each paired with its live value.
    initial_params: SineParams,
    params: SineParams,
    initial_status: u8,
    status: u8,
    initial_enable: u8,
    enable: u8,
    // Mutable device description (dev.desc). A firmware upgrade rewrites it so
    // a post-upgrade re-read shows a different build.
    desc: String,
    // Primary sample clock configuration, taken from the CLI.
    sample_rate: u32,
    segment_seconds: u32,
    // sample_rate * segment_seconds, validated against MAX_SAMPLE_NUMBER.
    segment_samples: u32,
    // Smaller of the per-packet sample limits of the sine and status streams.
    max_samples_per_packet: u64,
    // Same quantities for the aux stream, which runs at AUX_SAMPLE_RATE.
    aux_segment_samples: u32,
    aux_max_samples_per_packet: u64,
    // Session identifier; replaced on every `reboot()`.
    session_id: u32,
    // Monotonic and Unix-time (seconds) references of the current run; both
    // are refreshed by `reset_run()`.
    started_at: Instant,
    start_time: u32,
    // Primary stream bookkeeping, zeroed by `reset_run()`.
    samples_generated: u64,
    sample_number: u32,
    segment_id: u8,
    segment_start_time: u32,
    pending_segment_update: bool,
    next_drop_sample: u64,
    // Aux stream bookkeeping, zeroed by `reset_run()`.
    aux_samples_generated: u64,
    aux_sample_number: u32,
    aux_segment_id: u8,
    aux_segment_start_time: u32,
    aux_pending_segment_update: bool,
    next_aux_drop_sample: u64,
    // Scheduling state for heartbeats and simulated log messages.
    last_heartbeat: Instant,
    next_log_message_at: Instant,
    next_log_level: usize,
    // Simulated capture buffer behind the `test.capture` RPC.
    capture: CaptureBuffer,
    // Random source for the schedules seeded in `new()` and `reset_run()`.
    rng: GaussianRng,
    // RPC table and its precomputed hash (reported by `rpc.hash`).
    rpcs: Vec<RpcSpec>,
    rpc_hash: u32,
}
461
462impl TestDevice {
    /// Binds the UDP socket, validates the CLI-derived stream configuration,
    /// seeds the random generator and builds the RPC table.
    ///
    /// # Errors
    /// Returns the bind/configuration error from the socket, or
    /// `InvalidInput` when a segment would hold more samples than
    /// `MAX_SAMPLE_NUMBER`, when the sample count overflows `u32`, or when a
    /// single stream sample does not fit in a packet.
    fn new(cli: SimulateCli) -> io::Result<Self> {
        let socket = UdpSocket::bind(("0.0.0.0", cli.port))?;
        socket.set_nonblocking(true)?;

        // The seed mixes the current time with the port number; the session
        // id is derived from its low 32 bits.
        let now = unix_duration();
        let start_time = unix_time_secs(now);
        let seed = now.as_nanos() as u64 ^ u64::from(cli.port).rotate_left(32);
        let session_id = (seed as u32)
            .wrapping_mul(1_664_525)
            .wrapping_add(1_013_904_223);
        // Samples per segment on the primary clock; must fit TIO sample
        // numbering.
        let segment_samples = cli
            .samplerate
            .checked_mul(cli.segment_seconds)
            .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidInput, "segment too long"))?;
        if segment_samples > MAX_SAMPLE_NUMBER {
            return Err(io::Error::new(
                io::ErrorKind::InvalidInput,
                "segment contains too many samples for TIO sample numbering",
            ));
        }
        // Same check for the aux clock, which runs at the fixed aux rate.
        let aux_segment_samples = AUX_SAMPLE_RATE
            .checked_mul(cli.segment_seconds)
            .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidInput, "aux segment too long"))?;
        if aux_segment_samples > MAX_SAMPLE_NUMBER {
            return Err(io::Error::new(
                io::ErrorKind::InvalidInput,
                "aux segment contains too many samples for TIO sample numbering",
            ));
        }
        // The sine and status streams share one per-packet limit: the smaller
        // of their two individual limits.
        let max_samples_per_packet = max_stream_samples_per_packet(SINE_SAMPLE_BYTES)
            .min(max_stream_samples_per_packet(STATUS_SAMPLE_BYTES));
        let aux_max_samples_per_packet = max_stream_samples_per_packet(AUX_SAMPLE_BYTES);
        if max_samples_per_packet == 0 {
            return Err(io::Error::new(
                io::ErrorKind::InvalidInput,
                "stream sample is too large for a TIO packet",
            ));
        }
        if aux_max_samples_per_packet == 0 {
            return Err(io::Error::new(
                io::ErrorKind::InvalidInput,
                "aux stream sample is too large for a TIO packet",
            ));
        }
        // `| 1` guarantees a non-zero generator state. The draws below happen
        // in a fixed order: primary drop, aux drop, then (further down) the
        // first log delay.
        let mut rng = GaussianRng::new(seed | 1);
        let next_drop_sample = next_drop_sample_after(&mut rng, 0, cli.samplerate);
        let next_aux_drop_sample = next_drop_sample_after(&mut rng, 0, AUX_SAMPLE_RATE);

        let initial_params = SineParams {
            amplitude: cli.amplitude,
            frequency: cli.frequency,
            noise: cli.noise,
        };
        let initial_status = 0;
        let initial_enable = 1;

        // RPC table. Like tl-chibi's tl_rpc_finalize(), the introspection
        // methods come first, and the table order determines both the ids
        // reported by rpc.list/rpc.listinfo and the rpc.hash CRC.
        let rpcs = vec![
            RpcSpec::new("rpc.name", TL_RPC_METHOD_STD | TL_RPC_PUBLIC_RW),
            RpcSpec::new("rpc.id", TL_RPC_METHOD_STD | TL_RPC_PUBLIC_RW),
            RpcSpec::new("rpc.info", TL_RPC_METHOD_STD | TL_RPC_PUBLIC_RW),
            RpcSpec::new("rpc.list", TL_RPC_METHOD_STD | TL_RPC_PUBLIC_RW),
            RpcSpec::new("rpc.listinfo", TL_RPC_METHOD_STD | TL_RPC_PUBLIC_RW),
            RpcSpec::new(
                "rpc.hash",
                TL_RPC_METHOD_PROP | tl_rpc_mk_uint(4) | TL_RPC_PUBLIC_READ,
            ),
            RpcSpec::new(
                "dev.name",
                TL_RPC_METHOD_PROP | TL_RPC_TYPE_STRING | TL_RPC_PUBLIC_READ,
            ),
            RpcSpec::new(
                "dev.desc",
                TL_RPC_METHOD_PROP | TL_RPC_TYPE_STRING | TL_RPC_PUBLIC_READ,
            ),
            RpcSpec::new(
                "dev.stop",
                TL_RPC_METHOD_ACTION | TL_RPC_TYPE_VOID | TL_RPC_PUBLIC_WRITE,
            ),
            RpcSpec::new(
                "dev.firmware.upload",
                TL_RPC_METHOD_STD | TL_RPC_PUBLIC_WRITE,
            ),
            RpcSpec::new(
                "dev.firmware.upgrade",
                TL_RPC_METHOD_ACTION | TL_RPC_TYPE_VOID | TL_RPC_PUBLIC_WRITE,
            ),
            RpcSpec::new("dev.metadata", TL_RPC_METHOD_STD | TL_RPC_PUBLIC_RW),
            RpcSpec::new(
                "test.amplitude",
                TL_RPC_METHOD_PROP | tl_rpc_mk_float(8) | TL_RPC_PUBLIC_RW,
            ),
            RpcSpec::new(
                "test.frequency",
                TL_RPC_METHOD_PROP | tl_rpc_mk_float(8) | TL_RPC_PUBLIC_RW,
            ),
            RpcSpec::new(
                "test.noise",
                TL_RPC_METHOD_PROP | tl_rpc_mk_float(8) | TL_RPC_PUBLIC_RW,
            ),
            RpcSpec::new(
                "test.status",
                TL_RPC_METHOD_PROP | tl_rpc_mk_uint(1) | TL_RPC_PUBLIC_RW,
            ),
            RpcSpec::with_extra_meta(
                "test.enable",
                TL_RPC_METHOD_PROP | tl_rpc_mk_uint(1) | TL_RPC_PUBLIC_RW,
                RpcMetaFlags::BOOL.bits(),
            ),
            RpcSpec::new(
                "test.go",
                TL_RPC_METHOD_ACTION | TL_RPC_TYPE_VOID | TL_RPC_PUBLIC_WRITE,
            ),
            RpcSpec::with_extra_meta(
                "test.capture",
                TL_RPC_METHOD_STD | TL_RPC_PUBLIC_READ,
                (RpcMetaFlags::READABLE | RpcMetaFlags::CAPTURE).bits(),
            ),
        ];
        // Computed once up front; the table is not modified afterwards in
        // this function.
        let rpc_hash = rpc_table_hash(&rpcs);

        Ok(Self {
            socket,
            client: None,
            initial_params,
            params: initial_params,
            initial_status,
            status: initial_status,
            initial_enable,
            enable: initial_enable,
            desc: DEVICE_DESC.to_string(),
            sample_rate: cli.samplerate,
            segment_seconds: cli.segment_seconds,
            segment_samples,
            max_samples_per_packet,
            aux_segment_samples,
            aux_max_samples_per_packet,
            session_id,
            started_at: Instant::now(),
            start_time,
            samples_generated: 0,
            sample_number: 0,
            segment_id: 0,
            segment_start_time: start_time,
            pending_segment_update: false,
            next_drop_sample,
            aux_samples_generated: 0,
            aux_sample_number: 0,
            aux_segment_id: 0,
            aux_segment_start_time: start_time,
            aux_pending_segment_update: false,
            next_aux_drop_sample,
            last_heartbeat: Instant::now(),
            // Draws from `rng` before it is moved into the struct below.
            next_log_message_at: Instant::now() + next_log_delay(&mut rng),
            next_log_level: 0,
            capture: CaptureBuffer::new(),
            rng,
            rpcs,
            rpc_hash,
        })
    }
626
627    fn run(&mut self) -> io::Result<()> {
628        let raw_mode = match RawModeGuard::enable() {
629            Ok(guard) => Some(guard),
630            Err(err) => {
631                terminal_eprintln!("keyboard shortcuts disabled: {err}");
632                None
633            }
634        };
635
636        terminal_println!(
637            "tio test listening on udp://0.0.0.0:{}",
638            self.socket.local_addr()?.port()
639        );
640        terminal_println!(
641            "  stream 1: 2 waveform channels, amplitude={} V frequency={} Hz noise={} V/sqrt(Hz) samplerate={} Hz segment={} s",
642            self.params.amplitude,
643            self.params.frequency,
644            self.params.noise,
645            self.sample_rate,
646            self.segment_seconds
647        );
648        terminal_println!(
649            "  stream 2: status={} signal_level={}",
650            self.status,
651            SIGNAL_LEVEL
652        );
653        terminal_println!(
654            "  stream 3: aux triangle/sawtooth at {} Hz sampled at {} Hz",
655            AUX_WAVE_FREQUENCY,
656            AUX_SAMPLE_RATE
657        );
658        terminal_println!(
659            "  randomly dropping one sample from each sample clock about once per minute"
660        );
661        terminal_println!(
662            "  capture buffer: test.capture(-1) trigger, test.capture(-2) status, \
663             test.capture(-3) metadata, {}-{} f32 samples, ~{:.1}s delay",
664            CAPTURE_SAMPLE_COUNT_MIN,
665            CAPTURE_SAMPLE_COUNT_MAX,
666            CAPTURE_TRIGGER_DELAY.as_secs_f64()
667        );
668        if raw_mode.is_some() {
669            terminal_println!("  press d to drop one sample now, r to reboot, Ctrl-C to quit");
670        }
671        terminal_println!(
672            "  connect with: tio proxy udp4://127.0.0.1:{}",
673            self.socket.local_addr()?.port()
674        );
675
676        loop {
677            if raw_mode.is_some() && !self.handle_keyboard()? {
678                terminal_println!("stopping tio test");
679                return Ok(());
680            }
681            self.receive_packets()?;
682            self.expire_client();
683            self.send_periodic_packets()?;
684            std::thread::sleep(Duration::from_millis(1));
685        }
686    }
687
688    fn handle_keyboard(&mut self) -> io::Result<bool> {
689        while event::poll(Duration::from_millis(0))? {
690            if let Event::Key(key) = event::read()? {
691                if key.kind != KeyEventKind::Press {
692                    continue;
693                }
694                match key.code {
695                    KeyCode::Char('d') => self.drop_samples_now()?,
696                    KeyCode::Char('r') => self.reboot()?,
697                    KeyCode::Char('c') if key.modifiers.contains(KeyModifiers::CONTROL) => {
698                        return Ok(false);
699                    }
700                    _ => {}
701                }
702            }
703        }
704        Ok(true)
705    }
706
707    fn receive_packets(&mut self) -> io::Result<()> {
708        let mut buf = [0u8; 1024];
709        loop {
710            match self.socket.recv_from(&mut buf) {
711                Ok((size, addr)) => {
712                    if !self.accept_packet_from(addr)? {
713                        continue;
714                    }
715                    match proto::Packet::deserialize(&buf[..size]) {
716                        Ok((packet, parsed_size)) if parsed_size == size => {
717                            self.handle_packet(packet, addr)?;
718                        }
719                        Ok(_) => {
720                            terminal_eprintln!(
721                                "Ignoring UDP datagram with trailing bytes from {addr}"
722                            );
723                        }
724                        Err(err) => {
725                            terminal_eprintln!("Ignoring malformed packet from {addr}: {err:?}");
726                        }
727                    }
728                }
729                Err(err) if err.kind() == io::ErrorKind::WouldBlock => return Ok(()),
730                Err(err) => return Err(err),
731            }
732        }
733    }
734
735    fn accept_packet_from(&mut self, addr: SocketAddr) -> io::Result<bool> {
736        let now = Instant::now();
737        match self.client {
738            Some(mut client) if client.addr == addr => {
739                client.last_rx = now;
740                self.client = Some(client);
741                Ok(true)
742            }
743            Some(client) if now.duration_since(client.last_rx) < CLIENT_TIMEOUT => Ok(false),
744            _ => {
745                self.client = Some(Client { addr, last_rx: now });
746                self.reset_run();
747                terminal_println!("client connected: {addr}");
748                self.send_initial_packets(addr)?;
749                Ok(true)
750            }
751        }
752    }
753
754    fn expire_client(&mut self) {
755        if let Some(client) = self.client {
756            if Instant::now().duration_since(client.last_rx) > CLIENT_TIMEOUT {
757                terminal_println!("client disconnected: {}", client.addr);
758                self.client = None;
759            }
760        }
761    }
762
763    fn reset_run(&mut self) {
764        self.started_at = Instant::now();
765        self.start_time = unix_time_secs(unix_duration());
766        self.samples_generated = 0;
767        self.sample_number = 0;
768        self.segment_id = 0;
769        self.segment_start_time = self.start_time;
770        self.pending_segment_update = false;
771        let next_drop_sample = self.next_drop_sample_after(0, self.sample_rate);
772        self.next_drop_sample = next_drop_sample;
773        self.aux_samples_generated = 0;
774        self.aux_sample_number = 0;
775        self.aux_segment_id = 0;
776        self.aux_segment_start_time = self.start_time;
777        self.aux_pending_segment_update = false;
778        let next_aux_drop_sample = self.next_drop_sample_after(0, AUX_SAMPLE_RATE);
779        self.next_aux_drop_sample = next_aux_drop_sample;
780        self.last_heartbeat = Instant::now()
781            .checked_sub(HEARTBEAT_INTERVAL)
782            .unwrap_or_else(Instant::now);
783        self.next_log_message_at = Instant::now() + self.next_log_delay();
784        self.next_log_level = 0;
785        self.capture.clear();
786    }
787
788    fn reboot(&mut self) -> io::Result<()> {
789        self.session_id = self.next_session_id();
790        self.params = self.initial_params;
791        self.status = self.initial_status;
792        self.enable = self.initial_enable;
793        self.reset_run();
794        terminal_println!("rebooted test device; new session id {}", self.session_id);
795
796        if let Some(client) = self.client {
797            self.send_initial_packets(client.addr)?;
798        }
799        Ok(())
800    }
801
802    fn handle_packet(&mut self, packet: proto::Packet, addr: SocketAddr) -> io::Result<()> {
803        if let proto::Payload::RpcRequest(req) = packet.payload {
804            self.handle_rpc(req, packet.routing, addr)?;
805        }
806        Ok(())
807    }
808
809    fn handle_rpc(
810        &mut self,
811        req: proto::RpcRequestPayload,
812        routing: proto::DeviceRoute,
813        addr: SocketAddr,
814    ) -> io::Result<()> {
815        self.update_capture();
816
817        let method = match &req.method {
818            proto::RpcMethod::Name(name) => name.as_str(),
819            proto::RpcMethod::Id(_) => {
820                return self.send_rpc_error(req.id, proto::RpcErrorCode::NotFound, routing, addr)
821            }
822        };
823
824        let result = match method {
825            "dev.name" => self.rpc_read_string(req.id, DEVICE_NAME, &req.arg, routing, addr),
826            "dev.desc" => self.rpc_read_string(req.id, &self.desc.clone(), &req.arg, routing, addr),
827            "dev.stop" => self.send_rpc_reply(req.id, Vec::new(), routing, addr),
828            // Accept and acknowledge each firmware chunk (contents ignored).
829            "dev.firmware.upload" => self.send_rpc_reply(req.id, Vec::new(), routing, addr),
830            // Commit: simulate a reboot into a new build by rewriting dev.desc.
831            "dev.firmware.upgrade" => {
832                self.desc = "Twinleaf tio-test R1 ((null)) [2026-06-08/000002]".to_string();
833                self.send_rpc_reply(req.id, Vec::new(), routing, addr)
834            }
835            "rpc.hash" => self.rpc_read_u32(req.id, self.rpc_hash, &req.arg, routing, addr),
836            "rpc.name" => self.rpc_name(req.id, &req.arg, routing, addr),
837            "rpc.id" => self.rpc_id(req.id, &req.arg, routing, addr),
838            "rpc.info" => self.rpc_info(req.id, &req.arg, routing, addr),
839            "rpc.list" => self.rpc_list_and_info(req.id, &req.arg, false, routing, addr),
840            "rpc.listinfo" => self.rpc_list_and_info(req.id, &req.arg, true, routing, addr),
841            "dev.metadata" => self.rpc_metadata(req.id, &req.arg, routing, addr),
842            "test.amplitude" => {
843                let next = self.read_or_write_nonnegative_f64(
844                    req.id,
845                    &req.arg,
846                    self.params.amplitude,
847                    routing.clone(),
848                    addr,
849                )?;
850                self.params.amplitude = next;
851                Ok(())
852            }
853            "test.frequency" => {
854                let next = self.read_or_write_nonnegative_f64(
855                    req.id,
856                    &req.arg,
857                    self.params.frequency,
858                    routing.clone(),
859                    addr,
860                )?;
861                self.params.frequency = next;
862                Ok(())
863            }
864            "test.noise" => {
865                let next = self.read_or_write_nonnegative_f64(
866                    req.id,
867                    &req.arg,
868                    self.params.noise,
869                    routing.clone(),
870                    addr,
871                )?;
872                self.params.noise = next;
873                Ok(())
874            }
875            "test.status" => {
876                let next =
877                    self.read_or_write_u8(req.id, &req.arg, self.status, routing.clone(), addr)?;
878                self.status = next;
879                Ok(())
880            }
881            "test.enable" => {
882                let next =
883                    self.read_or_write_u8(req.id, &req.arg, self.enable, routing.clone(), addr)?;
884                self.enable = next;
885                Ok(())
886            }
887            "test.go" => self.rpc_action(req.id, &req.arg, routing, addr),
888            "test.capture" => self.rpc_capture(req.id, &req.arg, routing, addr),
889            _ => self.send_rpc_error(req.id, proto::RpcErrorCode::NotFound, routing, addr),
890        };
891
892        result
893    }
894
895    fn rpc_read_string(
896        &self,
897        id: u16,
898        value: &str,
899        arg: &[u8],
900        routing: proto::DeviceRoute,
901        addr: SocketAddr,
902    ) -> io::Result<()> {
903        if !arg.is_empty() {
904            return self.send_rpc_error(id, proto::RpcErrorCode::ReadOnly, routing, addr);
905        }
906        self.send_rpc_reply(id, value.as_bytes().to_vec(), routing, addr)
907    }
908
909    fn rpc_read_u32(
910        &self,
911        id: u16,
912        value: u32,
913        arg: &[u8],
914        routing: proto::DeviceRoute,
915        addr: SocketAddr,
916    ) -> io::Result<()> {
917        if !arg.is_empty() {
918            return self.send_rpc_error(id, proto::RpcErrorCode::ReadOnly, routing, addr);
919        }
920        self.send_rpc_reply(id, value.to_le_bytes().to_vec(), routing, addr)
921    }
922
923    /// `rpc.name`: index (u16) -> RPC name. Port of tl-chibi's `rpc_name()`.
924    fn rpc_name(
925        &self,
926        id: u16,
927        arg: &[u8],
928        routing: proto::DeviceRoute,
929        addr: SocketAddr,
930    ) -> io::Result<()> {
931        if arg.len() != 2 {
932            return self.send_rpc_error(id, proto::RpcErrorCode::WrongSizeArgs, routing, addr);
933        }
934        let index = u16::from_le_bytes([arg[0], arg[1]]) as usize;
935        let Some(spec) = self.rpcs.get(index) else {
936            return self.send_rpc_error(id, proto::RpcErrorCode::InvalidArgs, routing, addr);
937        };
938        self.send_rpc_reply(id, spec.name.as_bytes().to_vec(), routing, addr)
939    }
940
941    /// `rpc.id`: RPC name -> index (u16). Port of tl-chibi's `rpc_id()`.
942    fn rpc_id(
943        &self,
944        id: u16,
945        arg: &[u8],
946        routing: proto::DeviceRoute,
947        addr: SocketAddr,
948    ) -> io::Result<()> {
949        if arg.is_empty() {
950            return self.send_rpc_error(id, proto::RpcErrorCode::InvalidArgs, routing, addr);
951        }
952        let index = self
953            .rpcs
954            .iter()
955            .position(|spec| spec.name.as_bytes() == arg);
956        match index {
957            Some(index) => {
958                self.send_rpc_reply(id, (index as u16).to_le_bytes().to_vec(), routing, addr)
959            }
960            None => self.send_rpc_error(id, proto::RpcErrorCode::InvalidArgs, routing, addr),
961        }
962    }
963
964    /// `rpc.info`: RPC name -> metadata (u16). Port of tl-chibi's `rpc_info()`.
965    fn rpc_info(
966        &self,
967        id: u16,
968        arg: &[u8],
969        routing: proto::DeviceRoute,
970        addr: SocketAddr,
971    ) -> io::Result<()> {
972        if arg.is_empty() {
973            return self.send_rpc_error(id, proto::RpcErrorCode::InvalidArgs, routing, addr);
974        }
975        let Some(spec) = self.rpcs.iter().find(|spec| spec.name.as_bytes() == arg) else {
976            return self.send_rpc_error(id, proto::RpcErrorCode::InvalidArgs, routing, addr);
977        };
978        self.send_rpc_reply(
979            id,
980            spec.legacy_metadata().to_le_bytes().to_vec(),
981            routing,
982            addr,
983        )
984    }
985
986    /// `rpc.list` / `rpc.listinfo`: with no argument, the number of RPCs (u16);
987    /// with an index (u16), the RPC's name, prepended with its metadata (u16)
988    /// for `rpc.listinfo`. Port of tl-chibi's `rpc_list_and_info()`.
989    fn rpc_list_and_info(
990        &self,
991        id: u16,
992        arg: &[u8],
993        prepend_info: bool,
994        routing: proto::DeviceRoute,
995        addr: SocketAddr,
996    ) -> io::Result<()> {
997        if arg.is_empty() {
998            return self.send_rpc_reply(
999                id,
1000                (self.rpcs.len() as u16).to_le_bytes().to_vec(),
1001                routing,
1002                addr,
1003            );
1004        }
1005        if arg.len() != 2 {
1006            return self.send_rpc_error(id, proto::RpcErrorCode::WrongSizeArgs, routing, addr);
1007        }
1008
1009        let index = u16::from_le_bytes([arg[0], arg[1]]) as usize;
1010        let Some(spec) = self.rpcs.get(index) else {
1011            return self.send_rpc_error(id, proto::RpcErrorCode::InvalidArgs, routing, addr);
1012        };
1013
1014        let mut reply = Vec::new();
1015        if prepend_info {
1016            reply.extend(spec.legacy_metadata().to_le_bytes());
1017        }
1018        reply.extend(spec.name.as_bytes());
1019        self.send_rpc_reply(id, reply, routing, addr)
1020    }
1021
1022    fn rpc_metadata(
1023        &self,
1024        id: u16,
1025        arg: &[u8],
1026        routing: proto::DeviceRoute,
1027        addr: SocketAddr,
1028    ) -> io::Result<()> {
1029        let reply = if arg.is_empty() {
1030            self.all_metadata_reply()?
1031        } else if arg.len() % 3 == 0 {
1032            let mut reply = Vec::new();
1033            for req in arg.chunks_exact(3) {
1034                if self
1035                    .append_metadata_record(&mut reply, req[0], req[1], req[2])
1036                    .is_err()
1037                {
1038                    return self.send_rpc_error(
1039                        id,
1040                        proto::RpcErrorCode::InvalidArgs,
1041                        routing,
1042                        addr,
1043                    );
1044                }
1045            }
1046            reply
1047        } else {
1048            return self.send_rpc_error(id, proto::RpcErrorCode::WrongSizeArgs, routing, addr);
1049        };
1050
1051        self.send_rpc_reply(id, reply, routing, addr)
1052    }
1053
1054    fn read_or_write_nonnegative_f64(
1055        &self,
1056        id: u16,
1057        arg: &[u8],
1058        current: f64,
1059        routing: proto::DeviceRoute,
1060        addr: SocketAddr,
1061    ) -> io::Result<f64> {
1062        let value = match arg.len() {
1063            0 => current,
1064            8 => f64::from_le_bytes(arg.try_into().unwrap()),
1065            _ => {
1066                self.send_rpc_error(id, proto::RpcErrorCode::WrongSizeArgs, routing, addr)?;
1067                return Ok(current);
1068            }
1069        };
1070
1071        if !value.is_finite() || value < 0.0 {
1072            self.send_rpc_error(id, proto::RpcErrorCode::InvalidArgs, routing, addr)?;
1073            return Ok(current);
1074        }
1075
1076        self.send_rpc_reply(id, value.to_le_bytes().to_vec(), routing, addr)?;
1077        Ok(value)
1078    }
1079
1080    fn read_or_write_u8(
1081        &self,
1082        id: u16,
1083        arg: &[u8],
1084        current: u8,
1085        routing: proto::DeviceRoute,
1086        addr: SocketAddr,
1087    ) -> io::Result<u8> {
1088        let value = match arg.len() {
1089            0 => current,
1090            1 => arg[0],
1091            _ => {
1092                self.send_rpc_error(id, proto::RpcErrorCode::WrongSizeArgs, routing, addr)?;
1093                return Ok(current);
1094            }
1095        };
1096
1097        self.send_rpc_reply(id, vec![value], routing, addr)?;
1098        Ok(value)
1099    }
1100
1101    fn rpc_action(
1102        &self,
1103        id: u16,
1104        arg: &[u8],
1105        routing: proto::DeviceRoute,
1106        addr: SocketAddr,
1107    ) -> io::Result<()> {
1108        if !arg.is_empty() {
1109            return self.send_rpc_error(id, proto::RpcErrorCode::WrongSizeArgs, routing, addr);
1110        }
1111        terminal_println!("test.go action invoked");
1112        self.send_rpc_reply(id, Vec::new(), routing, addr)
1113    }
1114
1115    fn rpc_capture(
1116        &mut self,
1117        id: u16,
1118        arg: &[u8],
1119        routing: proto::DeviceRoute,
1120        addr: SocketAddr,
1121    ) -> io::Result<()> {
1122        let selector = match arg.len() {
1123            0 => -2,
1124            2 => i16::from_le_bytes([arg[0], arg[1]]),
1125            _ => {
1126                self.send_rpc_error(id, proto::RpcErrorCode::WrongSizeArgs, routing, addr)?;
1127                return Ok(());
1128            }
1129        };
1130
1131        match selector {
1132            -1 => self.rpc_capture_trigger(id, routing, addr),
1133            -2 => self.send_rpc_reply(id, vec![self.capture.status()], routing, addr),
1134            -3 => self.send_rpc_reply(id, self.capture_metadata_reply(), routing, addr),
1135            index if index >= 0 => self.rpc_capture_block(id, index as u16, routing, addr),
1136            _ => self.send_rpc_error(id, proto::RpcErrorCode::InvalidArgs, routing, addr),
1137        }
1138    }
1139
1140    fn rpc_capture_trigger(
1141        &mut self,
1142        id: u16,
1143        routing: proto::DeviceRoute,
1144        addr: SocketAddr,
1145    ) -> io::Result<()> {
1146        if self.capture.locked() {
1147            return self.send_rpc_error(id, proto::RpcErrorCode::Busy, routing, addr);
1148        }
1149
1150        let (data, info) = self.generate_capture_data();
1151        self.capture
1152            .begin_capture(data, info, Instant::now() + CAPTURE_TRIGGER_DELAY);
1153        terminal_println!(
1154            "test.capture triggered ({} samples); data available in ~{:.1}s",
1155            info.length,
1156            CAPTURE_TRIGGER_DELAY.as_secs_f64()
1157        );
1158        self.send_rpc_reply(id, Vec::new(), routing, addr)
1159    }
1160
1161    fn rpc_capture_block(
1162        &mut self,
1163        id: u16,
1164        index: u16,
1165        routing: proto::DeviceRoute,
1166        addr: SocketAddr,
1167    ) -> io::Result<()> {
1168        if self.capture.locked() {
1169            return self.send_rpc_error(id, proto::RpcErrorCode::Busy, routing, addr);
1170        }
1171
1172        let Some(block) = self.capture.block(index) else {
1173            return self.send_rpc_error(id, proto::RpcErrorCode::InvalidArgs, routing, addr);
1174        };
1175        self.send_rpc_reply(id, block.to_vec(), routing, addr)
1176    }
1177
1178    fn capture_metadata_reply(&self) -> Vec<u8> {
1179        let info = self.capture.info();
1180        let mut fixed = Vec::with_capacity(usize::from(CAPTURE_METADATA_FIXED_LEN));
1181        let mut varlen = Vec::new();
1182
1183        fixed.push(CAPTURE_METADATA_FIXED_LEN);
1184        fixed.push(CAPTURE_METADATA_VERSION);
1185        fixed.push(u8::from(proto::DataType::Float32));
1186        fixed.push(0);
1187        fixed.extend(
1188            u32::try_from(self.capture.export_size())
1189                .unwrap_or(u32::MAX)
1190                .to_le_bytes(),
1191        );
1192        fixed.extend(self.capture.block_size.to_le_bytes());
1193        fixed.extend(info.length.to_le_bytes());
1194        fixed.extend(info.y_calibration.to_le_bytes());
1195        fixed.extend(info.x_offset.to_le_bytes());
1196        fixed.extend(info.x_stride.to_le_bytes());
1197        fixed.push(append_capture_metadata_string(&mut varlen, CAPTURE_NAME));
1198        fixed.push(append_capture_metadata_string(&mut varlen, CAPTURE_UNITS));
1199        fixed.push(append_capture_metadata_string(&mut varlen, CAPTURE_X_NAME));
1200        fixed.push(append_capture_metadata_string(&mut varlen, CAPTURE_X_UNITS));
1201        debug_assert_eq!(fixed.len(), usize::from(CAPTURE_METADATA_FIXED_LEN));
1202        fixed.extend(varlen);
1203        fixed
1204    }
1205
1206    fn update_capture(&mut self) {
1207        let was_locked = self.capture.locked();
1208        self.capture.update(Instant::now());
1209        if was_locked && !self.capture.locked() {
1210            terminal_println!("test.capture done ({} bytes)", self.capture.export_size());
1211        }
1212    }
1213
1214    fn generate_capture_data(&mut self) -> (Vec<u8>, CaptureInfo) {
1215        let sample_count = next_capture_sample_count(&mut self.rng);
1216        let mut data = Vec::with_capacity(sample_count * CAPTURE_SAMPLE_BYTES);
1217
1218        let noise_sigma = self.params.noise * (f64::from(self.sample_rate) / 2.0).sqrt();
1219        let start_sample = self.samples_generated;
1220        for offset in 0..sample_count as u64 {
1221            let t = (start_sample + offset) as f64 / f64::from(self.sample_rate);
1222            let phase = std::f64::consts::TAU * self.params.frequency * t;
1223            let value =
1224                self.params.amplitude * phase.sin() + noise_sigma * self.rng.next_gaussian();
1225            data.extend((value as f32).to_le_bytes());
1226        }
1227
1228        let info = CaptureInfo {
1229            length: sample_count as u32,
1230            y_calibration: CAPTURE_Y_CALIBRATION,
1231            x_offset: start_sample as f32 / self.sample_rate as f32,
1232            x_stride: 1.0 / self.sample_rate as f32,
1233        };
1234
1235        (data, info)
1236    }
1237
1238    fn send_periodic_packets(&mut self) -> io::Result<()> {
1239        self.update_capture();
1240        let Some(client) = self.client else {
1241            return Ok(());
1242        };
1243
1244        if self.last_heartbeat.elapsed() >= HEARTBEAT_INTERVAL {
1245            self.send_packet(&self.heartbeat_packet(), client.addr)?;
1246            self.last_heartbeat = Instant::now();
1247        }
1248
1249        self.send_log_message_if_due(client.addr)?;
1250        self.send_due_samples(client.addr)?;
1251        self.send_due_aux_samples(client.addr)
1252    }
1253
1254    fn send_log_message_if_due(&mut self, addr: SocketAddr) -> io::Result<()> {
1255        if Instant::now() < self.next_log_message_at {
1256            return Ok(());
1257        }
1258
1259        let level = self.next_log_level();
1260        let lucky_number = (self.rng.next_u64() % 10_000) as u32;
1261        let message = self.random_log_message(level, lucky_number);
1262        self.send_packet(
1263            &proto::Packet {
1264                payload: proto::Payload::LogMessage(proto::LogMessagePayload {
1265                    data: lucky_number,
1266                    level,
1267                    message,
1268                }),
1269                routing: proto::DeviceRoute::root(),
1270                ttl: 0,
1271            },
1272            addr,
1273        )?;
1274        self.next_log_message_at = Instant::now() + self.next_log_delay();
1275        Ok(())
1276    }
1277
1278    fn send_due_samples(&mut self, addr: SocketAddr) -> io::Result<()> {
1279        for _ in 0..4 {
1280            let due = self.due_samples();
1281            if due == 0 {
1282                break;
1283            }
1284
1285            let first_sample_n = self.sample_number;
1286            let samples_until_drop = self.next_drop_sample.saturating_sub(self.samples_generated);
1287            if samples_until_drop == 0 {
1288                self.drop_sample();
1289                self.send_sample_segment_updates_if_needed(first_sample_n, addr)?;
1290                continue;
1291            }
1292
1293            let samples_left_in_segment = u64::from(self.segment_samples - self.sample_number);
1294            let batch_len = due
1295                .min(self.max_samples_per_packet)
1296                .min(samples_left_in_segment)
1297                .min(samples_until_drop);
1298            self.send_sample_batches(batch_len, addr)?;
1299            self.send_sample_segment_updates_if_needed(first_sample_n, addr)?;
1300        }
1301        Ok(())
1302    }
1303
1304    fn due_samples(&self) -> u64 {
1305        let elapsed = self.started_at.elapsed().as_secs_f64();
1306        let target = (elapsed * f64::from(self.sample_rate)).floor() as u64;
1307        target.saturating_sub(self.samples_generated)
1308    }
1309
1310    fn due_aux_samples(&self) -> u64 {
1311        let elapsed = self.started_at.elapsed().as_secs_f64();
1312        let target = (elapsed * f64::from(AUX_SAMPLE_RATE)).floor() as u64;
1313        target.saturating_sub(self.aux_samples_generated)
1314    }
1315
1316    fn send_sample_batches(&mut self, batch_len: u64, addr: SocketAddr) -> io::Result<()> {
1317        let first_sample_n = self.sample_number;
1318        let segment_id = self.segment_id;
1319        let mut waveform_data = Vec::with_capacity((batch_len as usize) * SINE_SAMPLE_BYTES);
1320        let mut status_data = Vec::with_capacity((batch_len as usize) * STATUS_SAMPLE_BYTES);
1321        let noise_sigma = self.params.noise * (f64::from(self.sample_rate) / 2.0).sqrt();
1322
1323        for offset in 0..batch_len {
1324            let t = (self.samples_generated + offset) as f64 / f64::from(self.sample_rate);
1325            let phase = std::f64::consts::TAU * self.params.frequency * t;
1326            let ch1 = self.params.amplitude * phase.sin() + noise_sigma * self.rng.next_gaussian();
1327            let ch2 = self.params.amplitude * phase.cos() + noise_sigma * self.rng.next_gaussian();
1328            waveform_data.extend(ch1.to_le_bytes());
1329            waveform_data.extend(ch2.to_le_bytes());
1330            status_data.push(self.status);
1331            status_data.push(SIGNAL_LEVEL);
1332        }
1333
1334        self.send_stream_packet(
1335            SINE_STREAM_ID,
1336            first_sample_n,
1337            segment_id,
1338            waveform_data,
1339            addr,
1340        )?;
1341        self.send_stream_packet(
1342            STATUS_STREAM_ID,
1343            first_sample_n,
1344            segment_id,
1345            status_data,
1346            addr,
1347        )?;
1348
1349        for _ in 0..batch_len {
1350            self.advance_sample();
1351        }
1352
1353        Ok(())
1354    }
1355
1356    fn send_due_aux_samples(&mut self, addr: SocketAddr) -> io::Result<()> {
1357        for _ in 0..4 {
1358            let due = self.due_aux_samples();
1359            if due == 0 {
1360                break;
1361            }
1362
1363            let first_sample_n = self.aux_sample_number;
1364            let samples_until_drop = self
1365                .next_aux_drop_sample
1366                .saturating_sub(self.aux_samples_generated);
1367            if samples_until_drop == 0 {
1368                self.drop_aux_sample();
1369                self.send_aux_segment_update_if_needed(first_sample_n, addr)?;
1370                continue;
1371            }
1372
1373            let samples_left_in_segment =
1374                u64::from(self.aux_segment_samples - self.aux_sample_number);
1375            let batch_len = due
1376                .min(self.aux_max_samples_per_packet)
1377                .min(samples_left_in_segment)
1378                .min(samples_until_drop);
1379            self.send_aux_sample_batch(batch_len, addr)?;
1380            self.send_aux_segment_update_if_needed(first_sample_n, addr)?;
1381        }
1382        Ok(())
1383    }
1384
1385    fn send_aux_sample_batch(&mut self, batch_len: u64, addr: SocketAddr) -> io::Result<()> {
1386        let first_sample_n = self.aux_sample_number;
1387        let segment_id = self.aux_segment_id;
1388        let mut data = Vec::with_capacity((batch_len as usize) * AUX_SAMPLE_BYTES);
1389
1390        for offset in 0..batch_len {
1391            let t = (self.aux_samples_generated + offset) as f64 / f64::from(AUX_SAMPLE_RATE);
1392            let phase = (AUX_WAVE_FREQUENCY * t).fract();
1393            let triangle = 1.0 - 4.0 * (phase - 0.5).abs();
1394            let sawtooth = 2.0 * phase - 1.0;
1395            data.extend(triangle.to_le_bytes());
1396            data.extend(sawtooth.to_le_bytes());
1397        }
1398
1399        self.send_stream_packet(AUX_STREAM_ID, first_sample_n, segment_id, data, addr)?;
1400
1401        for _ in 0..batch_len {
1402            self.advance_aux_sample();
1403        }
1404
1405        Ok(())
1406    }
1407
1408    fn send_stream_packet(
1409        &self,
1410        stream_id: u8,
1411        first_sample_n: u32,
1412        segment_id: u8,
1413        data: Vec<u8>,
1414        addr: SocketAddr,
1415    ) -> io::Result<()> {
1416        self.send_packet(
1417            &proto::Packet {
1418                payload: proto::Payload::StreamData(proto::StreamDataPayload {
1419                    stream_id,
1420                    first_sample_n,
1421                    segment_id,
1422                    data,
1423                }),
1424                routing: proto::DeviceRoute::root(),
1425                ttl: 0,
1426            },
1427            addr,
1428        )
1429    }
1430
1431    fn advance_sample(&mut self) {
1432        self.samples_generated = self.samples_generated.wrapping_add(1);
1433        self.sample_number += 1;
1434        if self.sample_number >= self.segment_samples {
1435            self.sample_number = 0;
1436            self.segment_id = (self.segment_id + 1) % N_SEGMENTS;
1437            self.segment_start_time = self.segment_start_time.saturating_add(self.segment_seconds);
1438            self.pending_segment_update = true;
1439        }
1440    }
1441
1442    fn advance_aux_sample(&mut self) {
1443        self.aux_samples_generated = self.aux_samples_generated.wrapping_add(1);
1444        self.aux_sample_number += 1;
1445        if self.aux_sample_number >= self.aux_segment_samples {
1446            self.aux_sample_number = 0;
1447            self.aux_segment_id = (self.aux_segment_id + 1) % N_SEGMENTS;
1448            self.aux_segment_start_time = self
1449                .aux_segment_start_time
1450                .saturating_add(self.segment_seconds);
1451            self.aux_pending_segment_update = true;
1452        }
1453    }
1454
1455    fn drop_sample(&mut self) {
1456        terminal_println!(
1457            "dropped sample {} from streams {}/{}",
1458            self.samples_generated,
1459            SINE_STREAM_ID,
1460            STATUS_STREAM_ID
1461        );
1462        self.advance_sample();
1463        self.next_drop_sample =
1464            self.next_drop_sample_after(self.samples_generated, self.sample_rate);
1465    }
1466
1467    fn drop_aux_sample(&mut self) {
1468        terminal_println!(
1469            "dropped sample {} from stream {}",
1470            self.aux_samples_generated,
1471            AUX_STREAM_ID
1472        );
1473        self.advance_aux_sample();
1474        self.next_aux_drop_sample =
1475            self.next_drop_sample_after(self.aux_samples_generated, AUX_SAMPLE_RATE);
1476    }
1477
1478    fn drop_samples_now(&mut self) -> io::Result<()> {
1479        let addr = self.client.map(|client| client.addr);
1480        let first_sample_n = self.sample_number;
1481        self.drop_sample();
1482        if let Some(addr) = addr {
1483            self.send_sample_segment_updates_if_needed(first_sample_n, addr)?;
1484        }
1485
1486        let first_aux_sample_n = self.aux_sample_number;
1487        self.drop_aux_sample();
1488        if let Some(addr) = addr {
1489            self.send_aux_segment_update_if_needed(first_aux_sample_n, addr)?;
1490        }
1491
1492        Ok(())
1493    }
1494
1495    fn next_drop_sample_after(&mut self, current_sample: u64, sample_rate: u32) -> u64 {
1496        next_drop_sample_after(&mut self.rng, current_sample, sample_rate)
1497    }
1498
1499    fn next_session_id(&mut self) -> u32 {
1500        let mut session_id = (self.rng.next_u64() as u32)
1501            .wrapping_mul(1_664_525)
1502            .wrapping_add(1_013_904_223);
1503        if session_id == self.session_id {
1504            session_id = session_id.wrapping_add(1);
1505        }
1506        session_id
1507    }
1508
1509    fn next_log_delay(&mut self) -> Duration {
1510        next_log_delay(&mut self.rng)
1511    }
1512
1513    fn next_log_level(&mut self) -> proto::LogLevel {
1514        let levels = [
1515            proto::LogLevel::Critical,
1516            proto::LogLevel::Error,
1517            proto::LogLevel::Warning,
1518            proto::LogLevel::Info,
1519            proto::LogLevel::Debug,
1520        ];
1521        let level = levels[self.next_log_level % levels.len()];
1522        self.next_log_level = self.next_log_level.wrapping_add(1);
1523        level
1524    }
1525
1526    fn random_log_message(&mut self, level: proto::LogLevel, lucky_number: u32) -> String {
1527        let templates = [
1528            "lucky number {lucky} nudged the simulated flux loop",
1529            "telemetry monitor reported lucky number {lucky}",
1530            "calibration check landed on lucky number {lucky}",
1531            "simulated event counter reached lucky number {lucky}",
1532            "operator marker recorded lucky number {lucky}",
1533            "background diagnostic index settled at lucky number {lucky}",
1534        ];
1535        let template = templates[(self.rng.next_u64() as usize) % templates.len()];
1536        format!(
1537            "{}: {}",
1538            log_level_name(level),
1539            template.replace("{lucky}", &lucky_number.to_string())
1540        )
1541    }
1542
1543    fn send_sample_segment_updates_if_needed(
1544        &mut self,
1545        first_sample_n: u32,
1546        addr: SocketAddr,
1547    ) -> io::Result<()> {
1548        if self.pending_segment_update && first_sample_n == 0 {
1549            for stream_id in [SINE_STREAM_ID, STATUS_STREAM_ID] {
1550                self.send_packet(&self.segment_metadata(stream_id).make_update(), addr)?;
1551            }
1552            self.pending_segment_update = false;
1553        }
1554        Ok(())
1555    }
1556
1557    fn send_aux_segment_update_if_needed(
1558        &mut self,
1559        first_sample_n: u32,
1560        addr: SocketAddr,
1561    ) -> io::Result<()> {
1562        if self.aux_pending_segment_update && first_sample_n == 0 {
1563            self.send_packet(&self.segment_metadata(AUX_STREAM_ID).make_update(), addr)?;
1564            self.aux_pending_segment_update = false;
1565        }
1566        Ok(())
1567    }
1568
1569    fn send_initial_packets(&self, addr: SocketAddr) -> io::Result<()> {
1570        self.send_packet(&self.settings_packet(), addr)?;
1571        self.send_packet(&self.heartbeat_packet(), addr)?;
1572        self.send_packet(&self.device_metadata().make_update(), addr)?;
1573        for stream_id in Self::stream_ids() {
1574            self.send_packet(
1575                &self
1576                    .stream_metadata(stream_id)
1577                    .expect("known stream")
1578                    .make_update(),
1579                addr,
1580            )?;
1581            self.send_packet(&self.segment_metadata(stream_id).make_update(), addr)?;
1582            for column_index in 0..Self::column_count(stream_id).expect("known stream") {
1583                self.send_packet(
1584                    &self
1585                        .column_metadata(stream_id, column_index)
1586                        .expect("known column")
1587                        .make_update(),
1588                    addr,
1589                )?;
1590            }
1591        }
1592        Ok(())
1593    }
1594
1595    fn send_rpc_reply(
1596        &self,
1597        id: u16,
1598        reply: Vec<u8>,
1599        routing: proto::DeviceRoute,
1600        addr: SocketAddr,
1601    ) -> io::Result<()> {
1602        self.send_packet(
1603            &proto::Packet {
1604                payload: proto::Payload::RpcReply(proto::RpcReplyPayload { id, reply }),
1605                routing,
1606                ttl: 0,
1607            },
1608            addr,
1609        )
1610    }
1611
1612    fn send_rpc_error(
1613        &self,
1614        id: u16,
1615        error: proto::RpcErrorCode,
1616        routing: proto::DeviceRoute,
1617        addr: SocketAddr,
1618    ) -> io::Result<()> {
1619        self.send_packet(
1620            &proto::Packet {
1621                payload: proto::Payload::RpcError(proto::RpcErrorPayload {
1622                    id,
1623                    error,
1624                    extra: Vec::new(),
1625                }),
1626                routing,
1627                ttl: 0,
1628            },
1629            addr,
1630        )
1631    }
1632
1633    fn send_packet(&self, packet: &proto::Packet, addr: SocketAddr) -> io::Result<()> {
1634        let raw = packet.serialize().map_err(|_| {
1635            io::Error::new(
1636                io::ErrorKind::InvalidData,
1637                format!("packet too large or invalid: {}", describe_packet(packet)),
1638            )
1639        })?;
1640        self.socket.send_to(&raw, addr)?;
1641        Ok(())
1642    }
1643
1644    fn all_metadata_reply(&self) -> io::Result<Vec<u8>> {
1645        let mut reply = Vec::new();
1646        self.append_metadata_record(&mut reply, u8::from(meta::MetadataType::Device), 0, 0)?;
1647        for stream_id in Self::stream_ids() {
1648            self.append_metadata_record(
1649                &mut reply,
1650                u8::from(meta::MetadataType::Stream),
1651                stream_id,
1652                0,
1653            )?;
1654            self.append_metadata_record(
1655                &mut reply,
1656                u8::from(meta::MetadataType::Segment),
1657                stream_id,
1658                self.segment_id,
1659            )?;
1660            for column_index in 0..Self::column_count(stream_id).expect("known stream") {
1661                self.append_metadata_record(
1662                    &mut reply,
1663                    u8::from(meta::MetadataType::Column),
1664                    stream_id,
1665                    column_index,
1666                )?;
1667            }
1668        }
1669        Ok(reply)
1670    }
1671
1672    fn append_metadata_record(
1673        &self,
1674        reply: &mut Vec<u8>,
1675        metadata_type: u8,
1676        stream_id: u8,
1677        index: u8,
1678    ) -> io::Result<()> {
1679        let (mtype, body) = match meta::MetadataType::from(metadata_type) {
1680            meta::MetadataType::Device => {
1681                let (fixed, varlen) = self
1682                    .device_metadata()
1683                    .serialize(&[], &[])
1684                    .map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "device metadata"))?;
1685                (meta::MetadataType::Device, join_metadata(fixed, varlen))
1686            }
1687            meta::MetadataType::Stream => {
1688                let Some(stream) = self.stream_metadata(stream_id) else {
1689                    return Err(io::Error::new(
1690                        io::ErrorKind::InvalidInput,
1691                        "unknown stream metadata",
1692                    ));
1693                };
1694                let (fixed, varlen) = stream
1695                    .serialize(&[], &[])
1696                    .map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "stream metadata"))?;
1697                (meta::MetadataType::Stream, join_metadata(fixed, varlen))
1698            }
1699            meta::MetadataType::Segment if Self::is_known_stream(stream_id) => {
1700                let mut segment = self.segment_metadata(stream_id);
1701                segment.segment_id = index;
1702                let current_segment_id = if stream_id == AUX_STREAM_ID {
1703                    self.aux_segment_id
1704                } else {
1705                    self.segment_id
1706                };
1707                let current_start_time = if stream_id == AUX_STREAM_ID {
1708                    self.aux_segment_start_time
1709                } else {
1710                    self.segment_start_time
1711                };
1712                let delta_segments =
1713                    u32::from((index + N_SEGMENTS - current_segment_id) % N_SEGMENTS);
1714                segment.start_time = current_start_time
1715                    .saturating_add(delta_segments.saturating_mul(self.segment_seconds));
1716                let (fixed, varlen) = segment
1717                    .serialize(&[], &[])
1718                    .map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "segment metadata"))?;
1719                (meta::MetadataType::Segment, join_metadata(fixed, varlen))
1720            }
1721            meta::MetadataType::Column => {
1722                let Some(column) = self.column_metadata(stream_id, index) else {
1723                    return Err(io::Error::new(
1724                        io::ErrorKind::InvalidInput,
1725                        "unknown column metadata",
1726                    ));
1727                };
1728                let (fixed, varlen) = column
1729                    .serialize(&[], &[])
1730                    .map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "column metadata"))?;
1731                (meta::MetadataType::Column, join_metadata(fixed, varlen))
1732            }
1733            _ => {
1734                return Err(io::Error::new(
1735                    io::ErrorKind::InvalidInput,
1736                    "unknown metadata request",
1737                ))
1738            }
1739        };
1740
1741        let len = u8::try_from(body.len())
1742            .map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "metadata too large"))?;
1743        reply.push(u8::from(mtype));
1744        reply.push(len);
1745        reply.extend(body);
1746        Ok(())
1747    }
1748
1749    fn device_metadata(&self) -> meta::DeviceMetadata {
1750        meta::DeviceMetadata {
1751            serial_number: DEVICE_SERIAL.to_string(),
1752            firmware_hash: DEVICE_FIRMWARE.to_string(),
1753            n_streams: 3,
1754            session_id: self.session_id,
1755            name: DEVICE_NAME.to_string(),
1756        }
1757    }
1758
1759    fn stream_metadata(&self, stream_id: u8) -> Option<meta::StreamMetadata> {
1760        let stream = match stream_id {
1761            SINE_STREAM_ID => meta::StreamMetadata {
1762                stream_id: SINE_STREAM_ID,
1763                name: "sine".to_string(),
1764                n_columns: 2,
1765                n_segments: N_SEGMENTS as usize,
1766                sample_size: SINE_SAMPLE_BYTES,
1767                buf_samples: self.sample_rate as usize,
1768            },
1769            STATUS_STREAM_ID => meta::StreamMetadata {
1770                stream_id: STATUS_STREAM_ID,
1771                name: "status".to_string(),
1772                n_columns: 2,
1773                n_segments: N_SEGMENTS as usize,
1774                sample_size: STATUS_SAMPLE_BYTES,
1775                buf_samples: self.sample_rate as usize,
1776            },
1777            AUX_STREAM_ID => meta::StreamMetadata {
1778                stream_id: AUX_STREAM_ID,
1779                name: "aux".to_string(),
1780                n_columns: 2,
1781                n_segments: N_SEGMENTS as usize,
1782                sample_size: AUX_SAMPLE_BYTES,
1783                buf_samples: AUX_SAMPLE_RATE as usize,
1784            },
1785            _ => return None,
1786        };
1787        Some(stream)
1788    }
1789
1790    fn segment_metadata(&self, stream_id: u8) -> meta::SegmentMetadata {
1791        let (segment_id, start_time, sampling_rate) = match stream_id {
1792            AUX_STREAM_ID => (
1793                self.aux_segment_id,
1794                self.aux_segment_start_time,
1795                AUX_SAMPLE_RATE,
1796            ),
1797            _ => (self.segment_id, self.segment_start_time, self.sample_rate),
1798        };
1799
1800        meta::SegmentMetadata {
1801            stream_id,
1802            segment_id,
1803            flags: 0x01 | 0x02,
1804            time_ref_epoch: meta::MetadataEpoch::Unix,
1805            time_ref_serial: DEVICE_SERIAL.to_string(),
1806            time_ref_session_id: self.session_id,
1807            start_time,
1808            sampling_rate,
1809            decimation: 1,
1810            filter_cutoff: sampling_rate as f32 / 2.0,
1811            filter_type: meta::MetadataFilter::Unfiltered,
1812        }
1813    }
1814
1815    fn column_metadata(&self, stream_id: u8, index: u8) -> Option<meta::ColumnMetadata> {
1816        let column = match (stream_id, index) {
1817            (SINE_STREAM_ID, 0) => meta::ColumnMetadata {
1818                stream_id,
1819                index: index.into(),
1820                data_type: proto::DataType::Float64,
1821                name: "sine".to_string(),
1822                units: "V".to_string(),
1823                description: "Noisy sine wave".to_string(),
1824            },
1825            (SINE_STREAM_ID, 1) => meta::ColumnMetadata {
1826                stream_id,
1827                index: index.into(),
1828                data_type: proto::DataType::Float64,
1829                name: "cosine".to_string(),
1830                units: "V".to_string(),
1831                description: "Noisy quadrature wave".to_string(),
1832            },
1833            (STATUS_STREAM_ID, 0) => meta::ColumnMetadata {
1834                stream_id,
1835                index: index.into(),
1836                data_type: proto::DataType::UInt8,
1837                name: "status".to_string(),
1838                units: "".to_string(),
1839                description: "Mirrors the test.status RPC".to_string(),
1840            },
1841            (STATUS_STREAM_ID, 1) => meta::ColumnMetadata {
1842                stream_id,
1843                index: index.into(),
1844                data_type: proto::DataType::UInt8,
1845                name: "signal_level".to_string(),
1846                units: "".to_string(),
1847                description: "Fixed simulated signal level".to_string(),
1848            },
1849            (AUX_STREAM_ID, 0) => meta::ColumnMetadata {
1850                stream_id,
1851                index: index.into(),
1852                data_type: proto::DataType::Float64,
1853                name: "triangle".to_string(),
1854                units: "arb".to_string(),
1855                description: "Triangle wave".to_string(),
1856            },
1857            (AUX_STREAM_ID, 1) => meta::ColumnMetadata {
1858                stream_id,
1859                index: index.into(),
1860                data_type: proto::DataType::Float64,
1861                name: "sawtooth".to_string(),
1862                units: "arb".to_string(),
1863                description: "Sawtooth wave".to_string(),
1864            },
1865            _ => return None,
1866        };
1867        Some(column)
1868    }
1869
1870    fn is_known_stream(stream_id: u8) -> bool {
1871        matches!(stream_id, SINE_STREAM_ID | STATUS_STREAM_ID | AUX_STREAM_ID)
1872    }
1873
1874    fn column_count(stream_id: u8) -> Option<u8> {
1875        match stream_id {
1876            SINE_STREAM_ID | STATUS_STREAM_ID | AUX_STREAM_ID => Some(2),
1877            _ => None,
1878        }
1879    }
1880
1881    fn stream_ids() -> [u8; 3] {
1882        [SINE_STREAM_ID, STATUS_STREAM_ID, AUX_STREAM_ID]
1883    }
1884
1885    fn heartbeat_packet(&self) -> proto::Packet {
1886        proto::Packet {
1887            payload: proto::Payload::Heartbeat(proto::HeartbeatPayload::Session(self.session_id)),
1888            routing: proto::DeviceRoute::root(),
1889            ttl: 0,
1890        }
1891    }
1892
1893    fn settings_packet(&self) -> proto::Packet {
1894        proto::Packet {
1895            payload: proto::Payload::Settings(proto::SettingsPayload::RpcHash(self.rpc_hash)),
1896            routing: proto::DeviceRoute::root(),
1897            ttl: 0,
1898        }
1899    }
1900}
1901
/// Concatenates the fixed and variable-length parts of a serialized metadata
/// record into a single body, fixed part first.
fn join_metadata(mut fixed: Vec<u8>, varlen: Vec<u8>) -> Vec<u8> {
    fixed.extend_from_slice(&varlen);
    fixed
}
1906
1907fn stream_data_max_data_bytes() -> usize {
1908    proto::TIO_PACKET_MAX_TOTAL_SIZE
1909        .saturating_sub(proto::TIO_PACKET_HEADER_SIZE)
1910        .saturating_sub(proto::TIO_PACKET_MAX_ROUTING_SIZE)
1911        .saturating_sub(STREAM_DATA_HEADER_BYTES)
1912}
1913
1914fn max_stream_samples_per_packet(sample_bytes: usize) -> u64 {
1915    if sample_bytes == 0 {
1916        return 0;
1917    }
1918    (stream_data_max_data_bytes() / sample_bytes) as u64
1919}
1920
/// Returns how many reply bytes fit in one RPC reply packet: the maximum
/// total packet size minus the packet header, the maximum routing size and a
/// further two bytes, saturating at zero.
// NOTE(review): the two extra bytes are presumably the RPC reply's own header
// (request id) — confirm against the protocol definition.
#[cfg(test)]
fn rpc_reply_max_reply_bytes() -> usize {
    let overheads = [
        proto::TIO_PACKET_HEADER_SIZE,
        proto::TIO_PACKET_MAX_ROUTING_SIZE,
        2,
    ];
    overheads
        .iter()
        .fold(proto::TIO_PACKET_MAX_TOTAL_SIZE, |remaining, &overhead| {
            remaining.saturating_sub(overhead)
        })
}
1928
/// Appends the UTF-8 bytes of `value` to `varlen`, keeping at most the first
/// 255 bytes, and returns the number of bytes appended.
///
/// Truncation is byte-based, so a multi-byte character at the cut-off may be
/// split.
fn append_capture_metadata_string(varlen: &mut Vec<u8>, value: &str) -> u8 {
    let before = varlen.len();
    varlen.extend(value.bytes().take(usize::from(u8::MAX)));
    // At most u8::MAX bytes were appended, so the cast cannot truncate.
    (varlen.len() - before) as u8
}
1935
1936fn next_drop_sample_after(rng: &mut GaussianRng, current_sample: u64, sample_rate: u32) -> u64 {
1937    let min_seconds = SAMPLE_DROP_INTERVAL_SECONDS - SAMPLE_DROP_JITTER_SECONDS;
1938    let seconds = min_seconds + rng.next_unit() * SAMPLE_DROP_JITTER_SECONDS * 2.0;
1939    let interval = (seconds * f64::from(sample_rate)).round().max(1.0) as u64;
1940    current_sample.saturating_add(interval)
1941}
1942
1943fn next_log_delay(rng: &mut GaussianRng) -> Duration {
1944    let jitter = LOG_MESSAGE_JITTER.mul_f64(rng.next_unit());
1945    LOG_MESSAGE_MIN_INTERVAL + jitter
1946}
1947
1948fn next_capture_sample_count(rng: &mut GaussianRng) -> usize {
1949    let span = CAPTURE_SAMPLE_COUNT_MAX - CAPTURE_SAMPLE_COUNT_MIN + 1;
1950    CAPTURE_SAMPLE_COUNT_MIN + (rng.next_u64() as usize % span)
1951}
1952
1953fn log_level_name(level: proto::LogLevel) -> &'static str {
1954    match level {
1955        proto::LogLevel::Critical => "critical",
1956        proto::LogLevel::Error => "error",
1957        proto::LogLevel::Warning => "warning",
1958        proto::LogLevel::Info => "info",
1959        proto::LogLevel::Debug => "debug",
1960        proto::LogLevel::Unknown(_) => "unknown",
1961    }
1962}
1963
1964fn describe_packet(packet: &proto::Packet) -> String {
1965    match &packet.payload {
1966        proto::Payload::StreamData(data) => format!(
1967            "stream data stream_id={} segment_id={} first_sample_n={} data_bytes={} max_data_bytes={}",
1968            data.stream_id,
1969            data.segment_id,
1970            data.first_sample_n,
1971            data.data.len(),
1972            stream_data_max_data_bytes()
1973        ),
1974        proto::Payload::Metadata(metadata) => format!("metadata {:?}", metadata.content),
1975        proto::Payload::RpcReply(reply) => {
1976            format!("rpc reply id={} reply_bytes={}", reply.id, reply.reply.len())
1977        }
1978        proto::Payload::RpcError(error) => {
1979            format!("rpc error id={} error={:?}", error.id, error.error)
1980        }
1981        proto::Payload::RpcRequest(request) => format!(
1982            "rpc request id={} method={:?} arg_bytes={}",
1983            request.id,
1984            request.method,
1985            request.arg.len()
1986        ),
1987        other => format!("{other:?}"),
1988    }
1989}
1990
/// Returns the time elapsed since the Unix epoch according to the system
/// clock, or a zero duration when the clock reads earlier than the epoch.
fn unix_duration() -> Duration {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed,
        Err(_) => Duration::from_secs(0),
    }
}
1996
/// Converts `now` to whole seconds as a `u32`, saturating at `u32::MAX` when
/// the value does not fit. Sub-second precision is discarded.
fn unix_time_secs(now: Duration) -> u32 {
    let clamped = now.as_secs().min(u64::from(u32::MAX));
    // Lossless: `clamped` was just limited to the u32 range.
    clamped as u32
}
2000
#[cfg(test)]
mod tests {
    //! Unit tests for the simulator's RPC table encoding, capture buffer and
    //! capture data generation.

    use super::*;
    use clap::Parser;

    /// The CRC routine must reproduce the standard CRC-32 check value.
    #[test]
    fn crc32_matches_standard_check_value() {
        // CRC-32/ISO-HDLC check value for "123456789", the same algorithm as
        // tl-chibi's tl_crc32().
        let crc = !crc32_update(0xFFFF_FFFF, b"123456789");
        assert_eq!(crc, 0xCBF4_3926);
    }

    /// Legacy RPC metadata words must match the firmware's bit layout for each
    /// kind of RPC spec.
    #[test]
    fn legacy_metadata_matches_firmware_encoding() {
        // f64 read/write property: 0x8000 | size 8<<4 | type float(2) | RW.
        let spec = RpcSpec::new(
            "test.amplitude",
            TL_RPC_METHOD_PROP | tl_rpc_mk_float(8) | TL_RPC_PUBLIC_RW,
        );
        assert_eq!(
            spec.legacy_metadata(),
            0x8000 | (8 << 4) | 2 | 0x0100 | 0x0200
        );

        // u32 read-only property (rpc.hash).
        let spec = RpcSpec::new(
            "rpc.hash",
            TL_RPC_METHOD_PROP | tl_rpc_mk_uint(4) | TL_RPC_PUBLIC_READ,
        );
        assert_eq!(spec.legacy_metadata(), 0x8000 | (4 << 4) | 0x0100);

        // Void action reports bare 0x8000, like legacy_rpc_metadata().
        let spec = RpcSpec::new(
            "dev.stop",
            TL_RPC_METHOD_ACTION | TL_RPC_TYPE_VOID | TL_RPC_PUBLIC_WRITE,
        );
        assert_eq!(spec.legacy_metadata(), 0x8000);

        // Raw method (type ANY) reports 0 (unknown).
        let spec = RpcSpec::new("rpc.list", TL_RPC_METHOD_STD | TL_RPC_PUBLIC_RW);
        assert_eq!(spec.legacy_metadata(), 0);

        // 1-byte unsigned read/write property; BOOL flag carried via extra meta.
        let spec = RpcSpec::with_extra_meta(
            "test.enable",
            TL_RPC_METHOD_PROP | tl_rpc_mk_uint(1) | TL_RPC_PUBLIC_RW,
            RpcMetaFlags::BOOL.bits(),
        );
        assert_eq!(
            spec.legacy_metadata(),
            0x8000 | (1 << 4) | 0x0100 | 0x0200 | RpcMetaFlags::BOOL.bits()
        );
    }

    /// The RPC table hash must change whenever any hashed field of an entry
    /// changes, and stay the same for an identical table.
    #[test]
    fn rpc_table_hash_covers_name_flags_desc_signature() {
        let table = vec![
            RpcSpec::new(
                "a.b",
                TL_RPC_METHOD_PROP | tl_rpc_mk_uint(4) | TL_RPC_PUBLIC_READ,
            ),
            RpcSpec::new(
                "c.d",
                TL_RPC_METHOD_ACTION | TL_RPC_TYPE_VOID | TL_RPC_PUBLIC_WRITE,
            ),
        ];
        let base = rpc_table_hash(&table);

        // Hash is stable for an identical table.
        assert_eq!(base, rpc_table_hash(&table.clone()));

        // Renaming an entry changes the hash.
        let mut renamed = table.clone();
        renamed[0].name = "a.x";
        assert_ne!(base, rpc_table_hash(&renamed));

        // Changing flags changes the hash.
        let mut reflagged = table.clone();
        reflagged[1].flags |= TL_RPC_PERSISTENT;
        assert_ne!(base, rpc_table_hash(&reflagged));

        // Changing the description changes the hash.
        let mut redesc = table.clone();
        redesc[0].desc = "described";
        assert_ne!(base, rpc_table_hash(&redesc));

        // Changing the signature changes the hash.
        let mut resig = table;
        resig[0].signature = "u32";
        assert_ne!(base, rpc_table_hash(&resig));
    }

    /// A capture stays locked until its ready time, then exposes its data in
    /// `block_size`-sized blocks with a shorter final block.
    #[test]
    fn capture_buffer_exports_indexed_blocks_after_delay() {
        let now = Instant::now();
        let mut capture = CaptureBuffer::new();
        capture.block_size = 4;
        capture.begin_capture(
            (0u8..10).collect(),
            CaptureInfo {
                length: 10,
                ..CaptureInfo::default()
            },
            now + Duration::from_millis(500),
        );

        assert!(capture.locked());
        assert_eq!(capture.status(), CAPTURE_STATUS_CAPTURING);
        assert_eq!(capture.block_count(), 3);
        assert!(capture.block(0).is_none());

        // One millisecond early: still capturing.
        capture.update(now + Duration::from_millis(499));
        assert!(capture.locked());

        // Exactly at the ready time: capture completes.
        capture.update(now + Duration::from_millis(500));
        assert!(!capture.locked());
        assert_eq!(capture.status(), CAPTURE_STATUS_DONE);
        assert_eq!(capture.export_size(), 10);
        assert_eq!(capture.info().length, 10);
        assert_eq!(capture.block(0), Some(&[0, 1, 2, 3][..]));
        assert_eq!(capture.block(1), Some(&[4, 5, 6, 7][..]));
        assert_eq!(capture.block(2), Some(&[8, 9][..]));
        assert!(capture.block(3).is_none());
    }

    /// A default-sized capture block must fit inside a single RPC reply.
    #[test]
    fn default_capture_block_size_fits_rpc_replies() {
        assert!(usize::from(CAPTURE_DEFAULT_BLOCK_SIZE) <= rpc_reply_max_reply_bytes());
    }

    /// Generated capture data must reflect the sine parameters given on the
    /// command line.
    #[test]
    fn capture_data_uses_current_sine_parameters() {
        let cli = SimulateCli::parse_from([
            "tio-simulate",
            "--port",
            "0",
            "--samplerate",
            "4",
            "--frequency",
            "1",
            "--amplitude",
            "2",
            "--noise",
            "0",
        ]);
        let mut device = TestDevice::new(cli).unwrap();

        let (data, info) = device.generate_capture_data();

        assert!(
            (CAPTURE_SAMPLE_COUNT_MIN as u32..=CAPTURE_SAMPLE_COUNT_MAX as u32)
                .contains(&info.length)
        );
        assert_eq!(data.len(), info.length as usize * CAPTURE_SAMPLE_BYTES);
        assert_eq!(info.y_calibration, 1.0);
        assert_eq!(info.x_offset, 0.0);
        // 4 samples per second gives a quarter-second stride.
        assert_eq!(info.x_stride, 0.25);

        // With a 1 Hz sine sampled at 4 Hz and no noise, the second sample
        // sits at the peak and equals the amplitude.
        let first = f32::from_le_bytes(data[0..4].try_into().unwrap());
        let second = f32::from_le_bytes(data[4..8].try_into().unwrap());
        assert_eq!(first, 0.0);
        assert!((second - 2.0).abs() < f32::EPSILON);
    }

    /// The capture metadata reply must carry the expected header bytes, data
    /// size, sample count and y calibration at their fixed offsets.
    #[test]
    fn capture_metadata_uses_tl_chibi_type_and_y_calibration() {
        let cli = SimulateCli::parse_from(["tio-simulate", "--port", "0"]);
        let mut device = TestDevice::new(cli).unwrap();
        let (data, info) = device.generate_capture_data();
        let data_len = data.len();
        // Use one instant for both calls so the capture completes exactly at
        // its ready time, independent of how the clock advances in between.
        let ready_at = Instant::now() + Duration::from_millis(1);
        device.capture.begin_capture(data, info, ready_at);
        device.capture.update(ready_at);

        let metadata = device.capture_metadata_reply();

        assert_eq!(metadata[0], CAPTURE_METADATA_FIXED_LEN);
        assert_eq!(metadata[1], CAPTURE_METADATA_VERSION);
        assert_eq!(metadata[2], u8::from(proto::DataType::Float32));
        assert_eq!(
            u32::from_le_bytes(metadata[4..8].try_into().unwrap()),
            u32::try_from(data_len).unwrap()
        );
        assert_eq!(
            u32::from_le_bytes(metadata[10..14].try_into().unwrap()),
            info.length
        );
        assert_eq!(
            f32::from_le_bytes(metadata[14..18].try_into().unwrap()),
            CAPTURE_Y_CALIBRATION
        );
    }

    /// Successive capture lengths must stay within the allowed range and not
    /// all be identical.
    #[test]
    fn capture_sample_count_varies_within_range() {
        let mut rng = GaussianRng::new(1);
        let mut counts = Vec::new();
        for _ in 0..8 {
            counts.push(next_capture_sample_count(&mut rng));
        }

        assert!(counts
            .iter()
            .all(|count| (CAPTURE_SAMPLE_COUNT_MIN..=CAPTURE_SAMPLE_COUNT_MAX).contains(count)));
        assert!(counts.windows(2).any(|pair| pair[0] != pair[1]));
    }
}