1use crate::SimulateCli;
6use ratatui::crossterm::{
7 event::{self, Event, KeyCode, KeyEventKind, KeyModifiers},
8 terminal::{disable_raw_mode, enable_raw_mode},
9};
10use std::io::{self, Write};
11use std::net::{SocketAddr, UdpSocket};
12use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
13use twinleaf::device::RpcMetaFlags;
14use twinleaf::tio::proto::{self, meta};
15
16pub fn run_simulate(cli: SimulateCli) -> eyre::Result<()> {
17 let mut device = TestDevice::new(cli)?;
18 device.run()?;
19 Ok(())
20}
21
// Formats its arguments like `format_args!` and hands them to
// `terminal_print_line`, which writes the text to stdout followed by "\r\n".
macro_rules! terminal_println {
    ($($arg:tt)*) => {
        terminal_print_line(format_args!($($arg)*))
    };
}
28
// Formats its arguments like `format_args!` and hands them to
// `terminal_error_line`, which writes the text to stderr followed by "\r\n".
macro_rules! terminal_eprintln {
    ($($arg:tt)*) => {
        terminal_error_line(format_args!($($arg)*))
    };
}
34
// Stream identifiers. The startup banner describes stream 1 as the two
// waveform channels, stream 2 as status/signal level and stream 3 as the
// auxiliary triangle/sawtooth stream.
const SINE_STREAM_ID: u8 = 1;
const STATUS_STREAM_ID: u8 = 2;
const AUX_STREAM_ID: u8 = 3;
const N_SEGMENTS: u8 = 16;

// Identity strings reported by the simulated device.
const DEVICE_NAME: &str = "tio-test";
const DEVICE_DESC: &str = "Twinleaf tio-test R1 ((null)) [2026-06-08/000001-DEV]";
const DEVICE_SERIAL: &str = "SIM0001";
const DEVICE_FIRMWARE: &str = "twinleaf-rust-test";

// Fixed value printed as `signal_level` for stream 2.
const SIGNAL_LEVEL: u8 = 234;

// Auxiliary stream: sample rate and waveform frequency, both in Hz.
const AUX_SAMPLE_RATE: u32 = 25;
const AUX_WAVE_FREQUENCY: f64 = 0.25;

// Timing of the simulated random sample drops ("about once per minute"
// according to the startup banner).
const SAMPLE_DROP_INTERVAL_SECONDS: f64 = 60.0;
const SAMPLE_DROP_JITTER_SECONDS: f64 = 30.0;

// A client that has been silent for longer than this is considered gone.
const CLIENT_TIMEOUT: Duration = Duration::from_secs(2);
const HEARTBEAT_INTERVAL: Duration = Duration::from_millis(500);
const LOG_MESSAGE_MIN_INTERVAL: Duration = Duration::from_millis(1500);
const LOG_MESSAGE_JITTER: Duration = Duration::from_millis(4000);

// Capture buffer: delay between a trigger and the data becoming readable,
// block size used when exporting, and the range of generated sample counts.
const CAPTURE_TRIGGER_DELAY: Duration = Duration::from_millis(500);
const CAPTURE_DEFAULT_BLOCK_SIZE: u16 = 256;
const CAPTURE_SAMPLE_COUNT_MIN: usize = 800;
const CAPTURE_SAMPLE_COUNT_MAX: usize = 1200;
const CAPTURE_SAMPLE_BYTES: usize = std::mem::size_of::<f32>();

// Capture metadata record: version byte, length of the fixed-size part and
// the strings/calibration it reports.
const CAPTURE_METADATA_VERSION: u8 = 1;
const CAPTURE_METADATA_FIXED_LEN: u8 = 30;
const CAPTURE_Y_CALIBRATION: f32 = 1.0;
const CAPTURE_NAME: &str = "Test Signal";
const CAPTURE_UNITS: &str = "V";
const CAPTURE_X_NAME: &str = "Time";
const CAPTURE_X_UNITS: &str = "s";

// Values returned by the capture status query.
const CAPTURE_STATUS_IDLE: u8 = 0;
const CAPTURE_STATUS_CAPTURING: u8 = 1;
const CAPTURE_STATUS_DONE: u8 = 2;

// Largest sample number (24 bits); segments holding more samples than this
// are rejected when the device is constructed.
const MAX_SAMPLE_NUMBER: u32 = 0x00ff_ffff;

// Byte sizes used when sizing stream data packets.
const STREAM_DATA_HEADER_BYTES: usize = 4;
const SINE_SAMPLE_BYTES: usize = std::mem::size_of::<f64>() * 2;
const STATUS_SAMPLE_BYTES: usize = 2;
const AUX_SAMPLE_BYTES: usize = std::mem::size_of::<f64>() * 2;
74
// Layout of the packed 32-bit RPC flag word used by `RpcSpec::flags`.

// Method kind, stored in the lowest bits.
const TL_RPC_METHOD_STD: u32 = 0x1;
const TL_RPC_METHOD_ACTION: u32 = 0x2;
const TL_RPC_METHOD_PROP: u32 = 0x3;

// Value type: a 3-bit field starting at bit 4.
const TL_RPC_TYPE_OFFSET: u32 = 4;
const TL_RPC_TYPE_MASK: u32 = 0x7 << TL_RPC_TYPE_OFFSET;
const TL_RPC_TYPE_ANY: u32 = 0x0 << TL_RPC_TYPE_OFFSET;
const TL_RPC_TYPE_VOID: u32 = 0x1 << TL_RPC_TYPE_OFFSET;
const TL_RPC_TYPE_UINT: u32 = 0x2 << TL_RPC_TYPE_OFFSET;
#[allow(dead_code)]
const TL_RPC_TYPE_INT: u32 = 0x3 << TL_RPC_TYPE_OFFSET;
const TL_RPC_TYPE_FLOAT: u32 = 0x4 << TL_RPC_TYPE_OFFSET;
const TL_RPC_TYPE_STRING: u32 = 0x5 << TL_RPC_TYPE_OFFSET;

// Value size: a 9-bit field starting at bit 8.
const TL_RPC_SIZE_OFFSET: u32 = 8;
const TL_RPC_SIZE_MASK: u32 = 0x1FF << TL_RPC_SIZE_OFFSET;

// Access/persistence flags, starting at bit 24.
const TL_RPC_FLAGS_OFFSET: u32 = 24;
const TL_RPC_PUBLIC_READ: u32 = 0x1 << TL_RPC_FLAGS_OFFSET;
const TL_RPC_PUBLIC_WRITE: u32 = 0x2 << TL_RPC_FLAGS_OFFSET;
const TL_RPC_PERSISTENT: u32 = 0x20 << TL_RPC_FLAGS_OFFSET;

// Convenience combination: publicly readable and writable.
const TL_RPC_PUBLIC_RW: u32 = TL_RPC_PUBLIC_READ | TL_RPC_PUBLIC_WRITE;
102
103const fn tl_rpc_mk_uint(size: u32) -> u32 {
104 TL_RPC_TYPE_UINT | (size << TL_RPC_SIZE_OFFSET)
105}
106const fn tl_rpc_mk_float(size: u32) -> u32 {
107 TL_RPC_TYPE_FLOAT | (size << TL_RPC_SIZE_OFFSET)
108}
109
/// Parameters of the generated waveform. The startup banner prints them as
/// amplitude in V, frequency in Hz and noise in V/sqrt(Hz).
#[derive(Clone, Copy)]
struct SineParams {
    amplitude: f64,
    frequency: f64,
    noise: f64,
}
116
117#[derive(Clone)]
120struct RpcSpec {
121 name: &'static str,
122 flags: u32,
123 desc: &'static str,
124 signature: &'static str,
125 extra_meta: u16,
128}
129
130impl RpcSpec {
131 const fn new(name: &'static str, flags: u32) -> Self {
132 Self {
133 name,
134 flags,
135 desc: "",
136 signature: "",
137 extra_meta: 0,
138 }
139 }
140
141 const fn with_extra_meta(name: &'static str, flags: u32, extra_meta: u16) -> Self {
142 Self {
143 name,
144 flags,
145 desc: "",
146 signature: "",
147 extra_meta,
148 }
149 }
150
151 fn rpc_type(&self) -> u32 {
152 self.flags & TL_RPC_TYPE_MASK
153 }
154
155 fn rpc_size(&self) -> u32 {
156 (self.flags & TL_RPC_SIZE_MASK) >> TL_RPC_SIZE_OFFSET
157 }
158
159 fn legacy_metadata(&self) -> u16 {
163 let rpc_type = self.rpc_type();
164 if rpc_type == TL_RPC_TYPE_ANY {
165 return self.extra_meta;
166 }
167 if rpc_type == TL_RPC_TYPE_VOID {
168 return 0x8000 | self.extra_meta;
169 }
170
171 let size = self.rpc_size();
172 if size > 0xF {
173 return self.extra_meta;
174 }
175
176 let mut meta: u16 = 0x8000 | ((size as u16) << 4);
177 if (TL_RPC_TYPE_UINT..=TL_RPC_TYPE_STRING).contains(&rpc_type) {
178 meta |= ((rpc_type - TL_RPC_TYPE_UINT) >> TL_RPC_TYPE_OFFSET) as u16;
179 }
180
181 if self.flags & TL_RPC_PUBLIC_READ != 0 {
182 meta |= RpcMetaFlags::READABLE.bits();
183 }
184 if self.flags & TL_RPC_PUBLIC_WRITE != 0 {
185 meta |= RpcMetaFlags::WRITABLE.bits();
186 }
187 if self.flags & TL_RPC_PERSISTENT != 0 {
188 meta |= RpcMetaFlags::PERSISTENT.bits();
189 }
190
191 meta | self.extra_meta
192 }
193}
194
/// Feeds `data` into a running reflected CRC-32 (polynomial `0xEDB8_8320`)
/// and returns the updated register.
///
/// No initial or final inversion is applied here; the caller chooses the
/// starting value and any final XOR.
fn crc32_update(crc: u32, data: &[u8]) -> u32 {
    const POLYNOMIAL: u32 = 0xEDB8_8320;

    data.iter().fold(crc, |register, &byte| {
        (0..8).fold(register ^ u32::from(byte), |bits, _| {
            // All-ones when the low bit is set, zero otherwise.
            let mask = (bits & 1).wrapping_neg();
            (bits >> 1) ^ (POLYNOMIAL & mask)
        })
    })
}
210
211fn rpc_table_hash(rpcs: &[RpcSpec]) -> u32 {
215 let mut crc = 0xFFFF_FFFF;
216 for spec in rpcs {
217 crc = crc32_update(crc, spec.name.as_bytes());
218 crc = crc32_update(crc, &spec.flags.to_le_bytes());
219 crc = crc32_update(crc, spec.desc.as_bytes());
220 crc = crc32_update(crc, spec.signature.as_bytes());
221 }
222 !crc
223}
224
/// The single peer currently being served.
#[derive(Clone, Copy)]
struct Client {
    /// Address the peer's datagrams arrive from.
    addr: SocketAddr,
    /// When the most recent datagram from this peer was received; used for
    /// the inactivity timeout.
    last_rx: Instant,
}
230
/// Numeric description of a capture, as reported in the capture metadata
/// reply.
#[derive(Clone, Copy)]
struct CaptureInfo {
    length: u32,
    y_calibration: f32,
    x_offset: f32,
    x_stride: f32,
}

impl Default for CaptureInfo {
    /// An empty capture: zero length, zero x offset/stride and the default
    /// `CAPTURE_Y_CALIBRATION`.
    fn default() -> Self {
        Self {
            length: 0,
            y_calibration: CAPTURE_Y_CALIBRATION,
            x_offset: 0.0,
            x_stride: 0.0,
        }
    }
}
249
/// A capture that has been triggered but is not yet readable.
struct CapturingCapture {
    /// Instant at which the capture is promoted to the readable buffer.
    ready_at: Instant,
    /// Bytes that become the buffer contents once ready.
    data: Vec<u8>,
    /// Metadata that accompanies `data`.
    info: CaptureInfo,
}
255
256struct CaptureBuffer {
257 data: Vec<u8>,
258 block_size: u16,
259 capturing: Option<CapturingCapture>,
260 info: CaptureInfo,
261}
262
263impl CaptureBuffer {
264 fn new() -> Self {
265 Self {
266 data: Vec::new(),
267 block_size: CAPTURE_DEFAULT_BLOCK_SIZE,
268 capturing: None,
269 info: CaptureInfo::default(),
270 }
271 }
272
273 fn clear(&mut self) {
274 self.data.clear();
275 self.capturing = None;
276 self.block_size = CAPTURE_DEFAULT_BLOCK_SIZE;
277 self.info = CaptureInfo::default();
278 }
279
280 fn begin_capture(&mut self, data: Vec<u8>, info: CaptureInfo, ready_at: Instant) {
281 self.capturing = Some(CapturingCapture {
282 ready_at,
283 data,
284 info,
285 });
286 }
287
288 fn update(&mut self, now: Instant) {
289 let Some(capturing) = self.capturing.as_ref() else {
290 return;
291 };
292 if now < capturing.ready_at {
293 return;
294 }
295
296 let capturing = self.capturing.take().expect("capturing checked above");
297 self.data = capturing.data;
298 self.info = capturing.info;
299 }
300
301 fn locked(&self) -> bool {
302 self.capturing.is_some()
303 }
304
305 fn status(&self) -> u8 {
306 if self.capturing.is_some() {
307 CAPTURE_STATUS_CAPTURING
308 } else if self.data.is_empty() {
309 CAPTURE_STATUS_IDLE
310 } else {
311 CAPTURE_STATUS_DONE
312 }
313 }
314
315 fn info(&self) -> CaptureInfo {
316 self.capturing
317 .as_ref()
318 .map(|capturing| capturing.info)
319 .unwrap_or(self.info)
320 }
321
322 fn export_size(&self) -> usize {
323 self.capturing
324 .as_ref()
325 .map(|capturing| capturing.data.len())
326 .unwrap_or(self.data.len())
327 }
328
329 #[cfg(test)]
330 fn block_count(&self) -> u16 {
331 let size = self.export_size();
332 if size == 0 {
333 return 0;
334 }
335
336 let block_size = usize::from(self.block_size);
337 let blocks = size.div_ceil(block_size);
338 u16::try_from(blocks).unwrap_or(u16::MAX)
339 }
340
341 fn block(&self, index: u16) -> Option<&[u8]> {
342 let start = usize::from(index) * usize::from(self.block_size);
343 let end = (start + usize::from(self.block_size)).min(self.data.len());
344 if start >= end {
345 None
346 } else {
347 Some(&self.data[start..end])
348 }
349 }
350}
351
352struct RawModeGuard;
353
354impl RawModeGuard {
355 fn enable() -> io::Result<Self> {
356 enable_raw_mode()?;
357 Ok(Self)
358 }
359}
360
361impl Drop for RawModeGuard {
362 fn drop(&mut self) {
363 let _ = disable_raw_mode();
364 }
365}
366
/// Writes `args` to stdout followed by an explicit `"\r\n"` and flushes.
///
/// Write and flush errors are ignored. The explicit carriage return is
/// presumably there because the terminal may be in raw mode while the
/// simulator runs.
fn terminal_print_line(args: std::fmt::Arguments<'_>) {
    let handle = io::stdout();
    let mut out = handle.lock();
    out.write_fmt(format_args!("{args}\r\n")).ok();
    out.flush().ok();
}
372
/// Writes `args` to stderr followed by an explicit `"\r\n"` and flushes.
///
/// Write and flush errors are ignored. The explicit carriage return is
/// presumably there because the terminal may be in raw mode while the
/// simulator runs.
fn terminal_error_line(args: std::fmt::Arguments<'_>) {
    let handle = io::stderr();
    let mut err = handle.lock();
    err.write_fmt(format_args!("{args}\r\n")).ok();
    err.flush().ok();
}
378
/// Small deterministic pseudo-random generator producing uniform and
/// normally distributed values.
///
/// The integer core is an xorshift64* generator; Gaussian values are derived
/// from it with the Box-Muller transform, which yields two values per pair of
/// uniform draws (the second one is cached for the next call).
struct GaussianRng {
    /// Current xorshift state; never zero.
    state: u64,
    /// Second Box-Muller output waiting to be returned.
    cached: Option<f64>,
}

impl GaussianRng {
    /// State used in place of a zero seed.
    const ZERO_SEED_REPLACEMENT: u64 = 0x9E37_79B9_7F4A_7C15;

    /// Creates a generator from `seed`.
    ///
    /// Zero is a fixed point of the xorshift step (the generator would emit 0
    /// forever), so a zero seed is replaced by a fixed non-zero constant.
    /// Every non-zero seed produces exactly the same sequence as before.
    fn new(seed: u64) -> Self {
        let state = if seed == 0 {
            Self::ZERO_SEED_REPLACEMENT
        } else {
            seed
        };
        Self {
            state,
            cached: None,
        }
    }

    /// Advances the xorshift64* state and returns the next 64-bit value.
    fn next_u64(&mut self) -> u64 {
        let mut x = self.state;
        x ^= x >> 12;
        x ^= x << 25;
        x ^= x >> 27;
        self.state = x;
        x.wrapping_mul(0x2545_f491_4f6c_dd1d)
    }

    /// Returns a uniform value that is strictly positive and at most 1.0, so
    /// its logarithm is always finite.
    fn next_unit(&mut self) -> f64 {
        // Keep the top 53 bits, which fit an f64 mantissa exactly.
        let raw = self.next_u64() >> 11;
        ((raw as f64) + 1.0) / ((1u64 << 53) as f64 + 1.0)
    }

    /// Returns a standard-normal value (mean 0, variance 1).
    fn next_gaussian(&mut self) -> f64 {
        if let Some(value) = self.cached.take() {
            return value;
        }

        let u1 = self.next_unit();
        let u2 = self.next_unit();
        let radius = (-2.0 * u1.ln()).sqrt();
        let phase = std::f64::consts::TAU * u2;
        // Keep the sine branch for the next call; return the cosine branch.
        self.cached = Some(radius * phase.sin());
        radius * phase.cos()
    }
}
419
/// State of the simulated device: the UDP socket, the connected client, the
/// RPC-adjustable settings, per-stream sample bookkeeping and the RPC table.
struct TestDevice {
    // Non-blocking UDP socket all traffic goes through.
    socket: UdpSocket,
    // Peer currently being served, if any.
    client: Option<Client>,
    // Each setting is stored twice: the value from start-up (restored by
    // `reboot`) and the live value that RPCs may change.
    initial_params: SineParams,
    params: SineParams,
    initial_status: u8,
    status: u8,
    initial_enable: u8,
    enable: u8,
    // Device description string; replaced by the firmware-upgrade RPC.
    desc: String,
    // Main sample clock configuration.
    sample_rate: u32,
    segment_seconds: u32,
    segment_samples: u32,
    max_samples_per_packet: u64,
    // Auxiliary sample clock configuration.
    aux_segment_samples: u32,
    aux_max_samples_per_packet: u64,
    // Session identity and start of the current run.
    session_id: u32,
    started_at: Instant,
    start_time: u32,
    // Main sample clock progress.
    samples_generated: u64,
    sample_number: u32,
    segment_id: u8,
    segment_start_time: u32,
    pending_segment_update: bool,
    next_drop_sample: u64,
    // Auxiliary sample clock progress.
    aux_samples_generated: u64,
    aux_sample_number: u32,
    aux_segment_id: u8,
    aux_segment_start_time: u32,
    aux_pending_segment_update: bool,
    next_aux_drop_sample: u64,
    // Scheduling of heartbeats and simulated log messages.
    last_heartbeat: Instant,
    next_log_message_at: Instant,
    next_log_level: usize,
    // Capture buffer served by the `test.capture` RPC.
    capture: CaptureBuffer,
    // Random source for noise, drop timing and other jitter.
    rng: GaussianRng,
    // RPC table and its hash (reported by `rpc.hash`).
    rpcs: Vec<RpcSpec>,
    rpc_hash: u32,
}
461
462impl TestDevice {
463 fn new(cli: SimulateCli) -> io::Result<Self> {
464 let socket = UdpSocket::bind(("0.0.0.0", cli.port))?;
465 socket.set_nonblocking(true)?;
466
467 let now = unix_duration();
468 let start_time = unix_time_secs(now);
469 let seed = now.as_nanos() as u64 ^ u64::from(cli.port).rotate_left(32);
470 let session_id = (seed as u32)
471 .wrapping_mul(1_664_525)
472 .wrapping_add(1_013_904_223);
473 let segment_samples = cli
474 .samplerate
475 .checked_mul(cli.segment_seconds)
476 .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidInput, "segment too long"))?;
477 if segment_samples > MAX_SAMPLE_NUMBER {
478 return Err(io::Error::new(
479 io::ErrorKind::InvalidInput,
480 "segment contains too many samples for TIO sample numbering",
481 ));
482 }
483 let aux_segment_samples = AUX_SAMPLE_RATE
484 .checked_mul(cli.segment_seconds)
485 .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidInput, "aux segment too long"))?;
486 if aux_segment_samples > MAX_SAMPLE_NUMBER {
487 return Err(io::Error::new(
488 io::ErrorKind::InvalidInput,
489 "aux segment contains too many samples for TIO sample numbering",
490 ));
491 }
492 let max_samples_per_packet = max_stream_samples_per_packet(SINE_SAMPLE_BYTES)
493 .min(max_stream_samples_per_packet(STATUS_SAMPLE_BYTES));
494 let aux_max_samples_per_packet = max_stream_samples_per_packet(AUX_SAMPLE_BYTES);
495 if max_samples_per_packet == 0 {
496 return Err(io::Error::new(
497 io::ErrorKind::InvalidInput,
498 "stream sample is too large for a TIO packet",
499 ));
500 }
501 if aux_max_samples_per_packet == 0 {
502 return Err(io::Error::new(
503 io::ErrorKind::InvalidInput,
504 "aux stream sample is too large for a TIO packet",
505 ));
506 }
507 let mut rng = GaussianRng::new(seed | 1);
508 let next_drop_sample = next_drop_sample_after(&mut rng, 0, cli.samplerate);
509 let next_aux_drop_sample = next_drop_sample_after(&mut rng, 0, AUX_SAMPLE_RATE);
510
511 let initial_params = SineParams {
512 amplitude: cli.amplitude,
513 frequency: cli.frequency,
514 noise: cli.noise,
515 };
516 let initial_status = 0;
517 let initial_enable = 1;
518
519 let rpcs = vec![
523 RpcSpec::new("rpc.name", TL_RPC_METHOD_STD | TL_RPC_PUBLIC_RW),
524 RpcSpec::new("rpc.id", TL_RPC_METHOD_STD | TL_RPC_PUBLIC_RW),
525 RpcSpec::new("rpc.info", TL_RPC_METHOD_STD | TL_RPC_PUBLIC_RW),
526 RpcSpec::new("rpc.list", TL_RPC_METHOD_STD | TL_RPC_PUBLIC_RW),
527 RpcSpec::new("rpc.listinfo", TL_RPC_METHOD_STD | TL_RPC_PUBLIC_RW),
528 RpcSpec::new(
529 "rpc.hash",
530 TL_RPC_METHOD_PROP | tl_rpc_mk_uint(4) | TL_RPC_PUBLIC_READ,
531 ),
532 RpcSpec::new(
533 "dev.name",
534 TL_RPC_METHOD_PROP | TL_RPC_TYPE_STRING | TL_RPC_PUBLIC_READ,
535 ),
536 RpcSpec::new(
537 "dev.desc",
538 TL_RPC_METHOD_PROP | TL_RPC_TYPE_STRING | TL_RPC_PUBLIC_READ,
539 ),
540 RpcSpec::new(
541 "dev.stop",
542 TL_RPC_METHOD_ACTION | TL_RPC_TYPE_VOID | TL_RPC_PUBLIC_WRITE,
543 ),
544 RpcSpec::new(
545 "dev.firmware.upload",
546 TL_RPC_METHOD_STD | TL_RPC_PUBLIC_WRITE,
547 ),
548 RpcSpec::new(
549 "dev.firmware.upgrade",
550 TL_RPC_METHOD_ACTION | TL_RPC_TYPE_VOID | TL_RPC_PUBLIC_WRITE,
551 ),
552 RpcSpec::new("dev.metadata", TL_RPC_METHOD_STD | TL_RPC_PUBLIC_RW),
553 RpcSpec::new(
554 "test.amplitude",
555 TL_RPC_METHOD_PROP | tl_rpc_mk_float(8) | TL_RPC_PUBLIC_RW,
556 ),
557 RpcSpec::new(
558 "test.frequency",
559 TL_RPC_METHOD_PROP | tl_rpc_mk_float(8) | TL_RPC_PUBLIC_RW,
560 ),
561 RpcSpec::new(
562 "test.noise",
563 TL_RPC_METHOD_PROP | tl_rpc_mk_float(8) | TL_RPC_PUBLIC_RW,
564 ),
565 RpcSpec::new(
566 "test.status",
567 TL_RPC_METHOD_PROP | tl_rpc_mk_uint(1) | TL_RPC_PUBLIC_RW,
568 ),
569 RpcSpec::with_extra_meta(
570 "test.enable",
571 TL_RPC_METHOD_PROP | tl_rpc_mk_uint(1) | TL_RPC_PUBLIC_RW,
572 RpcMetaFlags::BOOL.bits(),
573 ),
574 RpcSpec::new(
575 "test.go",
576 TL_RPC_METHOD_ACTION | TL_RPC_TYPE_VOID | TL_RPC_PUBLIC_WRITE,
577 ),
578 RpcSpec::with_extra_meta(
579 "test.capture",
580 TL_RPC_METHOD_STD | TL_RPC_PUBLIC_READ,
581 (RpcMetaFlags::READABLE | RpcMetaFlags::CAPTURE).bits(),
582 ),
583 ];
584 let rpc_hash = rpc_table_hash(&rpcs);
585
586 Ok(Self {
587 socket,
588 client: None,
589 initial_params,
590 params: initial_params,
591 initial_status,
592 status: initial_status,
593 initial_enable,
594 enable: initial_enable,
595 desc: DEVICE_DESC.to_string(),
596 sample_rate: cli.samplerate,
597 segment_seconds: cli.segment_seconds,
598 segment_samples,
599 max_samples_per_packet,
600 aux_segment_samples,
601 aux_max_samples_per_packet,
602 session_id,
603 started_at: Instant::now(),
604 start_time,
605 samples_generated: 0,
606 sample_number: 0,
607 segment_id: 0,
608 segment_start_time: start_time,
609 pending_segment_update: false,
610 next_drop_sample,
611 aux_samples_generated: 0,
612 aux_sample_number: 0,
613 aux_segment_id: 0,
614 aux_segment_start_time: start_time,
615 aux_pending_segment_update: false,
616 next_aux_drop_sample,
617 last_heartbeat: Instant::now(),
618 next_log_message_at: Instant::now() + next_log_delay(&mut rng),
619 next_log_level: 0,
620 capture: CaptureBuffer::new(),
621 rng,
622 rpcs,
623 rpc_hash,
624 })
625 }
626
627 fn run(&mut self) -> io::Result<()> {
628 let raw_mode = match RawModeGuard::enable() {
629 Ok(guard) => Some(guard),
630 Err(err) => {
631 terminal_eprintln!("keyboard shortcuts disabled: {err}");
632 None
633 }
634 };
635
636 terminal_println!(
637 "tio test listening on udp://0.0.0.0:{}",
638 self.socket.local_addr()?.port()
639 );
640 terminal_println!(
641 " stream 1: 2 waveform channels, amplitude={} V frequency={} Hz noise={} V/sqrt(Hz) samplerate={} Hz segment={} s",
642 self.params.amplitude,
643 self.params.frequency,
644 self.params.noise,
645 self.sample_rate,
646 self.segment_seconds
647 );
648 terminal_println!(
649 " stream 2: status={} signal_level={}",
650 self.status,
651 SIGNAL_LEVEL
652 );
653 terminal_println!(
654 " stream 3: aux triangle/sawtooth at {} Hz sampled at {} Hz",
655 AUX_WAVE_FREQUENCY,
656 AUX_SAMPLE_RATE
657 );
658 terminal_println!(
659 " randomly dropping one sample from each sample clock about once per minute"
660 );
661 terminal_println!(
662 " capture buffer: test.capture(-1) trigger, test.capture(-2) status, \
663 test.capture(-3) metadata, {}-{} f32 samples, ~{:.1}s delay",
664 CAPTURE_SAMPLE_COUNT_MIN,
665 CAPTURE_SAMPLE_COUNT_MAX,
666 CAPTURE_TRIGGER_DELAY.as_secs_f64()
667 );
668 if raw_mode.is_some() {
669 terminal_println!(" press d to drop one sample now, r to reboot, Ctrl-C to quit");
670 }
671 terminal_println!(
672 " connect with: tio proxy udp4://127.0.0.1:{}",
673 self.socket.local_addr()?.port()
674 );
675
676 loop {
677 if raw_mode.is_some() && !self.handle_keyboard()? {
678 terminal_println!("stopping tio test");
679 return Ok(());
680 }
681 self.receive_packets()?;
682 self.expire_client();
683 self.send_periodic_packets()?;
684 std::thread::sleep(Duration::from_millis(1));
685 }
686 }
687
688 fn handle_keyboard(&mut self) -> io::Result<bool> {
689 while event::poll(Duration::from_millis(0))? {
690 if let Event::Key(key) = event::read()? {
691 if key.kind != KeyEventKind::Press {
692 continue;
693 }
694 match key.code {
695 KeyCode::Char('d') => self.drop_samples_now()?,
696 KeyCode::Char('r') => self.reboot()?,
697 KeyCode::Char('c') if key.modifiers.contains(KeyModifiers::CONTROL) => {
698 return Ok(false);
699 }
700 _ => {}
701 }
702 }
703 }
704 Ok(true)
705 }
706
707 fn receive_packets(&mut self) -> io::Result<()> {
708 let mut buf = [0u8; 1024];
709 loop {
710 match self.socket.recv_from(&mut buf) {
711 Ok((size, addr)) => {
712 if !self.accept_packet_from(addr)? {
713 continue;
714 }
715 match proto::Packet::deserialize(&buf[..size]) {
716 Ok((packet, parsed_size)) if parsed_size == size => {
717 self.handle_packet(packet, addr)?;
718 }
719 Ok(_) => {
720 terminal_eprintln!(
721 "Ignoring UDP datagram with trailing bytes from {addr}"
722 );
723 }
724 Err(err) => {
725 terminal_eprintln!("Ignoring malformed packet from {addr}: {err:?}");
726 }
727 }
728 }
729 Err(err) if err.kind() == io::ErrorKind::WouldBlock => return Ok(()),
730 Err(err) => return Err(err),
731 }
732 }
733 }
734
735 fn accept_packet_from(&mut self, addr: SocketAddr) -> io::Result<bool> {
736 let now = Instant::now();
737 match self.client {
738 Some(mut client) if client.addr == addr => {
739 client.last_rx = now;
740 self.client = Some(client);
741 Ok(true)
742 }
743 Some(client) if now.duration_since(client.last_rx) < CLIENT_TIMEOUT => Ok(false),
744 _ => {
745 self.client = Some(Client { addr, last_rx: now });
746 self.reset_run();
747 terminal_println!("client connected: {addr}");
748 self.send_initial_packets(addr)?;
749 Ok(true)
750 }
751 }
752 }
753
754 fn expire_client(&mut self) {
755 if let Some(client) = self.client {
756 if Instant::now().duration_since(client.last_rx) > CLIENT_TIMEOUT {
757 terminal_println!("client disconnected: {}", client.addr);
758 self.client = None;
759 }
760 }
761 }
762
763 fn reset_run(&mut self) {
764 self.started_at = Instant::now();
765 self.start_time = unix_time_secs(unix_duration());
766 self.samples_generated = 0;
767 self.sample_number = 0;
768 self.segment_id = 0;
769 self.segment_start_time = self.start_time;
770 self.pending_segment_update = false;
771 let next_drop_sample = self.next_drop_sample_after(0, self.sample_rate);
772 self.next_drop_sample = next_drop_sample;
773 self.aux_samples_generated = 0;
774 self.aux_sample_number = 0;
775 self.aux_segment_id = 0;
776 self.aux_segment_start_time = self.start_time;
777 self.aux_pending_segment_update = false;
778 let next_aux_drop_sample = self.next_drop_sample_after(0, AUX_SAMPLE_RATE);
779 self.next_aux_drop_sample = next_aux_drop_sample;
780 self.last_heartbeat = Instant::now()
781 .checked_sub(HEARTBEAT_INTERVAL)
782 .unwrap_or_else(Instant::now);
783 self.next_log_message_at = Instant::now() + self.next_log_delay();
784 self.next_log_level = 0;
785 self.capture.clear();
786 }
787
788 fn reboot(&mut self) -> io::Result<()> {
789 self.session_id = self.next_session_id();
790 self.params = self.initial_params;
791 self.status = self.initial_status;
792 self.enable = self.initial_enable;
793 self.reset_run();
794 terminal_println!("rebooted test device; new session id {}", self.session_id);
795
796 if let Some(client) = self.client {
797 self.send_initial_packets(client.addr)?;
798 }
799 Ok(())
800 }
801
802 fn handle_packet(&mut self, packet: proto::Packet, addr: SocketAddr) -> io::Result<()> {
803 if let proto::Payload::RpcRequest(req) = packet.payload {
804 self.handle_rpc(req, packet.routing, addr)?;
805 }
806 Ok(())
807 }
808
809 fn handle_rpc(
810 &mut self,
811 req: proto::RpcRequestPayload,
812 routing: proto::DeviceRoute,
813 addr: SocketAddr,
814 ) -> io::Result<()> {
815 self.update_capture();
816
817 let method = match &req.method {
818 proto::RpcMethod::Name(name) => name.as_str(),
819 proto::RpcMethod::Id(_) => {
820 return self.send_rpc_error(req.id, proto::RpcErrorCode::NotFound, routing, addr)
821 }
822 };
823
824 let result = match method {
825 "dev.name" => self.rpc_read_string(req.id, DEVICE_NAME, &req.arg, routing, addr),
826 "dev.desc" => self.rpc_read_string(req.id, &self.desc.clone(), &req.arg, routing, addr),
827 "dev.stop" => self.send_rpc_reply(req.id, Vec::new(), routing, addr),
828 "dev.firmware.upload" => self.send_rpc_reply(req.id, Vec::new(), routing, addr),
830 "dev.firmware.upgrade" => {
832 self.desc = "Twinleaf tio-test R1 ((null)) [2026-06-08/000002]".to_string();
833 self.send_rpc_reply(req.id, Vec::new(), routing, addr)
834 }
835 "rpc.hash" => self.rpc_read_u32(req.id, self.rpc_hash, &req.arg, routing, addr),
836 "rpc.name" => self.rpc_name(req.id, &req.arg, routing, addr),
837 "rpc.id" => self.rpc_id(req.id, &req.arg, routing, addr),
838 "rpc.info" => self.rpc_info(req.id, &req.arg, routing, addr),
839 "rpc.list" => self.rpc_list_and_info(req.id, &req.arg, false, routing, addr),
840 "rpc.listinfo" => self.rpc_list_and_info(req.id, &req.arg, true, routing, addr),
841 "dev.metadata" => self.rpc_metadata(req.id, &req.arg, routing, addr),
842 "test.amplitude" => {
843 let next = self.read_or_write_nonnegative_f64(
844 req.id,
845 &req.arg,
846 self.params.amplitude,
847 routing.clone(),
848 addr,
849 )?;
850 self.params.amplitude = next;
851 Ok(())
852 }
853 "test.frequency" => {
854 let next = self.read_or_write_nonnegative_f64(
855 req.id,
856 &req.arg,
857 self.params.frequency,
858 routing.clone(),
859 addr,
860 )?;
861 self.params.frequency = next;
862 Ok(())
863 }
864 "test.noise" => {
865 let next = self.read_or_write_nonnegative_f64(
866 req.id,
867 &req.arg,
868 self.params.noise,
869 routing.clone(),
870 addr,
871 )?;
872 self.params.noise = next;
873 Ok(())
874 }
875 "test.status" => {
876 let next =
877 self.read_or_write_u8(req.id, &req.arg, self.status, routing.clone(), addr)?;
878 self.status = next;
879 Ok(())
880 }
881 "test.enable" => {
882 let next =
883 self.read_or_write_u8(req.id, &req.arg, self.enable, routing.clone(), addr)?;
884 self.enable = next;
885 Ok(())
886 }
887 "test.go" => self.rpc_action(req.id, &req.arg, routing, addr),
888 "test.capture" => self.rpc_capture(req.id, &req.arg, routing, addr),
889 _ => self.send_rpc_error(req.id, proto::RpcErrorCode::NotFound, routing, addr),
890 };
891
892 result
893 }
894
895 fn rpc_read_string(
896 &self,
897 id: u16,
898 value: &str,
899 arg: &[u8],
900 routing: proto::DeviceRoute,
901 addr: SocketAddr,
902 ) -> io::Result<()> {
903 if !arg.is_empty() {
904 return self.send_rpc_error(id, proto::RpcErrorCode::ReadOnly, routing, addr);
905 }
906 self.send_rpc_reply(id, value.as_bytes().to_vec(), routing, addr)
907 }
908
909 fn rpc_read_u32(
910 &self,
911 id: u16,
912 value: u32,
913 arg: &[u8],
914 routing: proto::DeviceRoute,
915 addr: SocketAddr,
916 ) -> io::Result<()> {
917 if !arg.is_empty() {
918 return self.send_rpc_error(id, proto::RpcErrorCode::ReadOnly, routing, addr);
919 }
920 self.send_rpc_reply(id, value.to_le_bytes().to_vec(), routing, addr)
921 }
922
923 fn rpc_name(
925 &self,
926 id: u16,
927 arg: &[u8],
928 routing: proto::DeviceRoute,
929 addr: SocketAddr,
930 ) -> io::Result<()> {
931 if arg.len() != 2 {
932 return self.send_rpc_error(id, proto::RpcErrorCode::WrongSizeArgs, routing, addr);
933 }
934 let index = u16::from_le_bytes([arg[0], arg[1]]) as usize;
935 let Some(spec) = self.rpcs.get(index) else {
936 return self.send_rpc_error(id, proto::RpcErrorCode::InvalidArgs, routing, addr);
937 };
938 self.send_rpc_reply(id, spec.name.as_bytes().to_vec(), routing, addr)
939 }
940
941 fn rpc_id(
943 &self,
944 id: u16,
945 arg: &[u8],
946 routing: proto::DeviceRoute,
947 addr: SocketAddr,
948 ) -> io::Result<()> {
949 if arg.is_empty() {
950 return self.send_rpc_error(id, proto::RpcErrorCode::InvalidArgs, routing, addr);
951 }
952 let index = self
953 .rpcs
954 .iter()
955 .position(|spec| spec.name.as_bytes() == arg);
956 match index {
957 Some(index) => {
958 self.send_rpc_reply(id, (index as u16).to_le_bytes().to_vec(), routing, addr)
959 }
960 None => self.send_rpc_error(id, proto::RpcErrorCode::InvalidArgs, routing, addr),
961 }
962 }
963
964 fn rpc_info(
966 &self,
967 id: u16,
968 arg: &[u8],
969 routing: proto::DeviceRoute,
970 addr: SocketAddr,
971 ) -> io::Result<()> {
972 if arg.is_empty() {
973 return self.send_rpc_error(id, proto::RpcErrorCode::InvalidArgs, routing, addr);
974 }
975 let Some(spec) = self.rpcs.iter().find(|spec| spec.name.as_bytes() == arg) else {
976 return self.send_rpc_error(id, proto::RpcErrorCode::InvalidArgs, routing, addr);
977 };
978 self.send_rpc_reply(
979 id,
980 spec.legacy_metadata().to_le_bytes().to_vec(),
981 routing,
982 addr,
983 )
984 }
985
986 fn rpc_list_and_info(
990 &self,
991 id: u16,
992 arg: &[u8],
993 prepend_info: bool,
994 routing: proto::DeviceRoute,
995 addr: SocketAddr,
996 ) -> io::Result<()> {
997 if arg.is_empty() {
998 return self.send_rpc_reply(
999 id,
1000 (self.rpcs.len() as u16).to_le_bytes().to_vec(),
1001 routing,
1002 addr,
1003 );
1004 }
1005 if arg.len() != 2 {
1006 return self.send_rpc_error(id, proto::RpcErrorCode::WrongSizeArgs, routing, addr);
1007 }
1008
1009 let index = u16::from_le_bytes([arg[0], arg[1]]) as usize;
1010 let Some(spec) = self.rpcs.get(index) else {
1011 return self.send_rpc_error(id, proto::RpcErrorCode::InvalidArgs, routing, addr);
1012 };
1013
1014 let mut reply = Vec::new();
1015 if prepend_info {
1016 reply.extend(spec.legacy_metadata().to_le_bytes());
1017 }
1018 reply.extend(spec.name.as_bytes());
1019 self.send_rpc_reply(id, reply, routing, addr)
1020 }
1021
1022 fn rpc_metadata(
1023 &self,
1024 id: u16,
1025 arg: &[u8],
1026 routing: proto::DeviceRoute,
1027 addr: SocketAddr,
1028 ) -> io::Result<()> {
1029 let reply = if arg.is_empty() {
1030 self.all_metadata_reply()?
1031 } else if arg.len() % 3 == 0 {
1032 let mut reply = Vec::new();
1033 for req in arg.chunks_exact(3) {
1034 if self
1035 .append_metadata_record(&mut reply, req[0], req[1], req[2])
1036 .is_err()
1037 {
1038 return self.send_rpc_error(
1039 id,
1040 proto::RpcErrorCode::InvalidArgs,
1041 routing,
1042 addr,
1043 );
1044 }
1045 }
1046 reply
1047 } else {
1048 return self.send_rpc_error(id, proto::RpcErrorCode::WrongSizeArgs, routing, addr);
1049 };
1050
1051 self.send_rpc_reply(id, reply, routing, addr)
1052 }
1053
1054 fn read_or_write_nonnegative_f64(
1055 &self,
1056 id: u16,
1057 arg: &[u8],
1058 current: f64,
1059 routing: proto::DeviceRoute,
1060 addr: SocketAddr,
1061 ) -> io::Result<f64> {
1062 let value = match arg.len() {
1063 0 => current,
1064 8 => f64::from_le_bytes(arg.try_into().unwrap()),
1065 _ => {
1066 self.send_rpc_error(id, proto::RpcErrorCode::WrongSizeArgs, routing, addr)?;
1067 return Ok(current);
1068 }
1069 };
1070
1071 if !value.is_finite() || value < 0.0 {
1072 self.send_rpc_error(id, proto::RpcErrorCode::InvalidArgs, routing, addr)?;
1073 return Ok(current);
1074 }
1075
1076 self.send_rpc_reply(id, value.to_le_bytes().to_vec(), routing, addr)?;
1077 Ok(value)
1078 }
1079
1080 fn read_or_write_u8(
1081 &self,
1082 id: u16,
1083 arg: &[u8],
1084 current: u8,
1085 routing: proto::DeviceRoute,
1086 addr: SocketAddr,
1087 ) -> io::Result<u8> {
1088 let value = match arg.len() {
1089 0 => current,
1090 1 => arg[0],
1091 _ => {
1092 self.send_rpc_error(id, proto::RpcErrorCode::WrongSizeArgs, routing, addr)?;
1093 return Ok(current);
1094 }
1095 };
1096
1097 self.send_rpc_reply(id, vec![value], routing, addr)?;
1098 Ok(value)
1099 }
1100
1101 fn rpc_action(
1102 &self,
1103 id: u16,
1104 arg: &[u8],
1105 routing: proto::DeviceRoute,
1106 addr: SocketAddr,
1107 ) -> io::Result<()> {
1108 if !arg.is_empty() {
1109 return self.send_rpc_error(id, proto::RpcErrorCode::WrongSizeArgs, routing, addr);
1110 }
1111 terminal_println!("test.go action invoked");
1112 self.send_rpc_reply(id, Vec::new(), routing, addr)
1113 }
1114
1115 fn rpc_capture(
1116 &mut self,
1117 id: u16,
1118 arg: &[u8],
1119 routing: proto::DeviceRoute,
1120 addr: SocketAddr,
1121 ) -> io::Result<()> {
1122 let selector = match arg.len() {
1123 0 => -2,
1124 2 => i16::from_le_bytes([arg[0], arg[1]]),
1125 _ => {
1126 self.send_rpc_error(id, proto::RpcErrorCode::WrongSizeArgs, routing, addr)?;
1127 return Ok(());
1128 }
1129 };
1130
1131 match selector {
1132 -1 => self.rpc_capture_trigger(id, routing, addr),
1133 -2 => self.send_rpc_reply(id, vec![self.capture.status()], routing, addr),
1134 -3 => self.send_rpc_reply(id, self.capture_metadata_reply(), routing, addr),
1135 index if index >= 0 => self.rpc_capture_block(id, index as u16, routing, addr),
1136 _ => self.send_rpc_error(id, proto::RpcErrorCode::InvalidArgs, routing, addr),
1137 }
1138 }
1139
1140 fn rpc_capture_trigger(
1141 &mut self,
1142 id: u16,
1143 routing: proto::DeviceRoute,
1144 addr: SocketAddr,
1145 ) -> io::Result<()> {
1146 if self.capture.locked() {
1147 return self.send_rpc_error(id, proto::RpcErrorCode::Busy, routing, addr);
1148 }
1149
1150 let (data, info) = self.generate_capture_data();
1151 self.capture
1152 .begin_capture(data, info, Instant::now() + CAPTURE_TRIGGER_DELAY);
1153 terminal_println!(
1154 "test.capture triggered ({} samples); data available in ~{:.1}s",
1155 info.length,
1156 CAPTURE_TRIGGER_DELAY.as_secs_f64()
1157 );
1158 self.send_rpc_reply(id, Vec::new(), routing, addr)
1159 }
1160
1161 fn rpc_capture_block(
1162 &mut self,
1163 id: u16,
1164 index: u16,
1165 routing: proto::DeviceRoute,
1166 addr: SocketAddr,
1167 ) -> io::Result<()> {
1168 if self.capture.locked() {
1169 return self.send_rpc_error(id, proto::RpcErrorCode::Busy, routing, addr);
1170 }
1171
1172 let Some(block) = self.capture.block(index) else {
1173 return self.send_rpc_error(id, proto::RpcErrorCode::InvalidArgs, routing, addr);
1174 };
1175 self.send_rpc_reply(id, block.to_vec(), routing, addr)
1176 }
1177
1178 fn capture_metadata_reply(&self) -> Vec<u8> {
1179 let info = self.capture.info();
1180 let mut fixed = Vec::with_capacity(usize::from(CAPTURE_METADATA_FIXED_LEN));
1181 let mut varlen = Vec::new();
1182
1183 fixed.push(CAPTURE_METADATA_FIXED_LEN);
1184 fixed.push(CAPTURE_METADATA_VERSION);
1185 fixed.push(u8::from(proto::DataType::Float32));
1186 fixed.push(0);
1187 fixed.extend(
1188 u32::try_from(self.capture.export_size())
1189 .unwrap_or(u32::MAX)
1190 .to_le_bytes(),
1191 );
1192 fixed.extend(self.capture.block_size.to_le_bytes());
1193 fixed.extend(info.length.to_le_bytes());
1194 fixed.extend(info.y_calibration.to_le_bytes());
1195 fixed.extend(info.x_offset.to_le_bytes());
1196 fixed.extend(info.x_stride.to_le_bytes());
1197 fixed.push(append_capture_metadata_string(&mut varlen, CAPTURE_NAME));
1198 fixed.push(append_capture_metadata_string(&mut varlen, CAPTURE_UNITS));
1199 fixed.push(append_capture_metadata_string(&mut varlen, CAPTURE_X_NAME));
1200 fixed.push(append_capture_metadata_string(&mut varlen, CAPTURE_X_UNITS));
1201 debug_assert_eq!(fixed.len(), usize::from(CAPTURE_METADATA_FIXED_LEN));
1202 fixed.extend(varlen);
1203 fixed
1204 }
1205
1206 fn update_capture(&mut self) {
1207 let was_locked = self.capture.locked();
1208 self.capture.update(Instant::now());
1209 if was_locked && !self.capture.locked() {
1210 terminal_println!("test.capture done ({} bytes)", self.capture.export_size());
1211 }
1212 }
1213
1214 fn generate_capture_data(&mut self) -> (Vec<u8>, CaptureInfo) {
1215 let sample_count = next_capture_sample_count(&mut self.rng);
1216 let mut data = Vec::with_capacity(sample_count * CAPTURE_SAMPLE_BYTES);
1217
1218 let noise_sigma = self.params.noise * (f64::from(self.sample_rate) / 2.0).sqrt();
1219 let start_sample = self.samples_generated;
1220 for offset in 0..sample_count as u64 {
1221 let t = (start_sample + offset) as f64 / f64::from(self.sample_rate);
1222 let phase = std::f64::consts::TAU * self.params.frequency * t;
1223 let value =
1224 self.params.amplitude * phase.sin() + noise_sigma * self.rng.next_gaussian();
1225 data.extend((value as f32).to_le_bytes());
1226 }
1227
1228 let info = CaptureInfo {
1229 length: sample_count as u32,
1230 y_calibration: CAPTURE_Y_CALIBRATION,
1231 x_offset: start_sample as f32 / self.sample_rate as f32,
1232 x_stride: 1.0 / self.sample_rate as f32,
1233 };
1234
1235 (data, info)
1236 }
1237
1238 fn send_periodic_packets(&mut self) -> io::Result<()> {
1239 self.update_capture();
1240 let Some(client) = self.client else {
1241 return Ok(());
1242 };
1243
1244 if self.last_heartbeat.elapsed() >= HEARTBEAT_INTERVAL {
1245 self.send_packet(&self.heartbeat_packet(), client.addr)?;
1246 self.last_heartbeat = Instant::now();
1247 }
1248
1249 self.send_log_message_if_due(client.addr)?;
1250 self.send_due_samples(client.addr)?;
1251 self.send_due_aux_samples(client.addr)
1252 }
1253
1254 fn send_log_message_if_due(&mut self, addr: SocketAddr) -> io::Result<()> {
1255 if Instant::now() < self.next_log_message_at {
1256 return Ok(());
1257 }
1258
1259 let level = self.next_log_level();
1260 let lucky_number = (self.rng.next_u64() % 10_000) as u32;
1261 let message = self.random_log_message(level, lucky_number);
1262 self.send_packet(
1263 &proto::Packet {
1264 payload: proto::Payload::LogMessage(proto::LogMessagePayload {
1265 data: lucky_number,
1266 level,
1267 message,
1268 }),
1269 routing: proto::DeviceRoute::root(),
1270 ttl: 0,
1271 },
1272 addr,
1273 )?;
1274 self.next_log_message_at = Instant::now() + self.next_log_delay();
1275 Ok(())
1276 }
1277
1278 fn send_due_samples(&mut self, addr: SocketAddr) -> io::Result<()> {
1279 for _ in 0..4 {
1280 let due = self.due_samples();
1281 if due == 0 {
1282 break;
1283 }
1284
1285 let first_sample_n = self.sample_number;
1286 let samples_until_drop = self.next_drop_sample.saturating_sub(self.samples_generated);
1287 if samples_until_drop == 0 {
1288 self.drop_sample();
1289 self.send_sample_segment_updates_if_needed(first_sample_n, addr)?;
1290 continue;
1291 }
1292
1293 let samples_left_in_segment = u64::from(self.segment_samples - self.sample_number);
1294 let batch_len = due
1295 .min(self.max_samples_per_packet)
1296 .min(samples_left_in_segment)
1297 .min(samples_until_drop);
1298 self.send_sample_batches(batch_len, addr)?;
1299 self.send_sample_segment_updates_if_needed(first_sample_n, addr)?;
1300 }
1301 Ok(())
1302 }
1303
1304 fn due_samples(&self) -> u64 {
1305 let elapsed = self.started_at.elapsed().as_secs_f64();
1306 let target = (elapsed * f64::from(self.sample_rate)).floor() as u64;
1307 target.saturating_sub(self.samples_generated)
1308 }
1309
1310 fn due_aux_samples(&self) -> u64 {
1311 let elapsed = self.started_at.elapsed().as_secs_f64();
1312 let target = (elapsed * f64::from(AUX_SAMPLE_RATE)).floor() as u64;
1313 target.saturating_sub(self.aux_samples_generated)
1314 }
1315
1316 fn send_sample_batches(&mut self, batch_len: u64, addr: SocketAddr) -> io::Result<()> {
1317 let first_sample_n = self.sample_number;
1318 let segment_id = self.segment_id;
1319 let mut waveform_data = Vec::with_capacity((batch_len as usize) * SINE_SAMPLE_BYTES);
1320 let mut status_data = Vec::with_capacity((batch_len as usize) * STATUS_SAMPLE_BYTES);
1321 let noise_sigma = self.params.noise * (f64::from(self.sample_rate) / 2.0).sqrt();
1322
1323 for offset in 0..batch_len {
1324 let t = (self.samples_generated + offset) as f64 / f64::from(self.sample_rate);
1325 let phase = std::f64::consts::TAU * self.params.frequency * t;
1326 let ch1 = self.params.amplitude * phase.sin() + noise_sigma * self.rng.next_gaussian();
1327 let ch2 = self.params.amplitude * phase.cos() + noise_sigma * self.rng.next_gaussian();
1328 waveform_data.extend(ch1.to_le_bytes());
1329 waveform_data.extend(ch2.to_le_bytes());
1330 status_data.push(self.status);
1331 status_data.push(SIGNAL_LEVEL);
1332 }
1333
1334 self.send_stream_packet(
1335 SINE_STREAM_ID,
1336 first_sample_n,
1337 segment_id,
1338 waveform_data,
1339 addr,
1340 )?;
1341 self.send_stream_packet(
1342 STATUS_STREAM_ID,
1343 first_sample_n,
1344 segment_id,
1345 status_data,
1346 addr,
1347 )?;
1348
1349 for _ in 0..batch_len {
1350 self.advance_sample();
1351 }
1352
1353 Ok(())
1354 }
1355
1356 fn send_due_aux_samples(&mut self, addr: SocketAddr) -> io::Result<()> {
1357 for _ in 0..4 {
1358 let due = self.due_aux_samples();
1359 if due == 0 {
1360 break;
1361 }
1362
1363 let first_sample_n = self.aux_sample_number;
1364 let samples_until_drop = self
1365 .next_aux_drop_sample
1366 .saturating_sub(self.aux_samples_generated);
1367 if samples_until_drop == 0 {
1368 self.drop_aux_sample();
1369 self.send_aux_segment_update_if_needed(first_sample_n, addr)?;
1370 continue;
1371 }
1372
1373 let samples_left_in_segment =
1374 u64::from(self.aux_segment_samples - self.aux_sample_number);
1375 let batch_len = due
1376 .min(self.aux_max_samples_per_packet)
1377 .min(samples_left_in_segment)
1378 .min(samples_until_drop);
1379 self.send_aux_sample_batch(batch_len, addr)?;
1380 self.send_aux_segment_update_if_needed(first_sample_n, addr)?;
1381 }
1382 Ok(())
1383 }
1384
1385 fn send_aux_sample_batch(&mut self, batch_len: u64, addr: SocketAddr) -> io::Result<()> {
1386 let first_sample_n = self.aux_sample_number;
1387 let segment_id = self.aux_segment_id;
1388 let mut data = Vec::with_capacity((batch_len as usize) * AUX_SAMPLE_BYTES);
1389
1390 for offset in 0..batch_len {
1391 let t = (self.aux_samples_generated + offset) as f64 / f64::from(AUX_SAMPLE_RATE);
1392 let phase = (AUX_WAVE_FREQUENCY * t).fract();
1393 let triangle = 1.0 - 4.0 * (phase - 0.5).abs();
1394 let sawtooth = 2.0 * phase - 1.0;
1395 data.extend(triangle.to_le_bytes());
1396 data.extend(sawtooth.to_le_bytes());
1397 }
1398
1399 self.send_stream_packet(AUX_STREAM_ID, first_sample_n, segment_id, data, addr)?;
1400
1401 for _ in 0..batch_len {
1402 self.advance_aux_sample();
1403 }
1404
1405 Ok(())
1406 }
1407
1408 fn send_stream_packet(
1409 &self,
1410 stream_id: u8,
1411 first_sample_n: u32,
1412 segment_id: u8,
1413 data: Vec<u8>,
1414 addr: SocketAddr,
1415 ) -> io::Result<()> {
1416 self.send_packet(
1417 &proto::Packet {
1418 payload: proto::Payload::StreamData(proto::StreamDataPayload {
1419 stream_id,
1420 first_sample_n,
1421 segment_id,
1422 data,
1423 }),
1424 routing: proto::DeviceRoute::root(),
1425 ttl: 0,
1426 },
1427 addr,
1428 )
1429 }
1430
1431 fn advance_sample(&mut self) {
1432 self.samples_generated = self.samples_generated.wrapping_add(1);
1433 self.sample_number += 1;
1434 if self.sample_number >= self.segment_samples {
1435 self.sample_number = 0;
1436 self.segment_id = (self.segment_id + 1) % N_SEGMENTS;
1437 self.segment_start_time = self.segment_start_time.saturating_add(self.segment_seconds);
1438 self.pending_segment_update = true;
1439 }
1440 }
1441
1442 fn advance_aux_sample(&mut self) {
1443 self.aux_samples_generated = self.aux_samples_generated.wrapping_add(1);
1444 self.aux_sample_number += 1;
1445 if self.aux_sample_number >= self.aux_segment_samples {
1446 self.aux_sample_number = 0;
1447 self.aux_segment_id = (self.aux_segment_id + 1) % N_SEGMENTS;
1448 self.aux_segment_start_time = self
1449 .aux_segment_start_time
1450 .saturating_add(self.segment_seconds);
1451 self.aux_pending_segment_update = true;
1452 }
1453 }
1454
1455 fn drop_sample(&mut self) {
1456 terminal_println!(
1457 "dropped sample {} from streams {}/{}",
1458 self.samples_generated,
1459 SINE_STREAM_ID,
1460 STATUS_STREAM_ID
1461 );
1462 self.advance_sample();
1463 self.next_drop_sample =
1464 self.next_drop_sample_after(self.samples_generated, self.sample_rate);
1465 }
1466
1467 fn drop_aux_sample(&mut self) {
1468 terminal_println!(
1469 "dropped sample {} from stream {}",
1470 self.aux_samples_generated,
1471 AUX_STREAM_ID
1472 );
1473 self.advance_aux_sample();
1474 self.next_aux_drop_sample =
1475 self.next_drop_sample_after(self.aux_samples_generated, AUX_SAMPLE_RATE);
1476 }
1477
1478 fn drop_samples_now(&mut self) -> io::Result<()> {
1479 let addr = self.client.map(|client| client.addr);
1480 let first_sample_n = self.sample_number;
1481 self.drop_sample();
1482 if let Some(addr) = addr {
1483 self.send_sample_segment_updates_if_needed(first_sample_n, addr)?;
1484 }
1485
1486 let first_aux_sample_n = self.aux_sample_number;
1487 self.drop_aux_sample();
1488 if let Some(addr) = addr {
1489 self.send_aux_segment_update_if_needed(first_aux_sample_n, addr)?;
1490 }
1491
1492 Ok(())
1493 }
1494
1495 fn next_drop_sample_after(&mut self, current_sample: u64, sample_rate: u32) -> u64 {
1496 next_drop_sample_after(&mut self.rng, current_sample, sample_rate)
1497 }
1498
1499 fn next_session_id(&mut self) -> u32 {
1500 let mut session_id = (self.rng.next_u64() as u32)
1501 .wrapping_mul(1_664_525)
1502 .wrapping_add(1_013_904_223);
1503 if session_id == self.session_id {
1504 session_id = session_id.wrapping_add(1);
1505 }
1506 session_id
1507 }
1508
1509 fn next_log_delay(&mut self) -> Duration {
1510 next_log_delay(&mut self.rng)
1511 }
1512
1513 fn next_log_level(&mut self) -> proto::LogLevel {
1514 let levels = [
1515 proto::LogLevel::Critical,
1516 proto::LogLevel::Error,
1517 proto::LogLevel::Warning,
1518 proto::LogLevel::Info,
1519 proto::LogLevel::Debug,
1520 ];
1521 let level = levels[self.next_log_level % levels.len()];
1522 self.next_log_level = self.next_log_level.wrapping_add(1);
1523 level
1524 }
1525
1526 fn random_log_message(&mut self, level: proto::LogLevel, lucky_number: u32) -> String {
1527 let templates = [
1528 "lucky number {lucky} nudged the simulated flux loop",
1529 "telemetry monitor reported lucky number {lucky}",
1530 "calibration check landed on lucky number {lucky}",
1531 "simulated event counter reached lucky number {lucky}",
1532 "operator marker recorded lucky number {lucky}",
1533 "background diagnostic index settled at lucky number {lucky}",
1534 ];
1535 let template = templates[(self.rng.next_u64() as usize) % templates.len()];
1536 format!(
1537 "{}: {}",
1538 log_level_name(level),
1539 template.replace("{lucky}", &lucky_number.to_string())
1540 )
1541 }
1542
1543 fn send_sample_segment_updates_if_needed(
1544 &mut self,
1545 first_sample_n: u32,
1546 addr: SocketAddr,
1547 ) -> io::Result<()> {
1548 if self.pending_segment_update && first_sample_n == 0 {
1549 for stream_id in [SINE_STREAM_ID, STATUS_STREAM_ID] {
1550 self.send_packet(&self.segment_metadata(stream_id).make_update(), addr)?;
1551 }
1552 self.pending_segment_update = false;
1553 }
1554 Ok(())
1555 }
1556
1557 fn send_aux_segment_update_if_needed(
1558 &mut self,
1559 first_sample_n: u32,
1560 addr: SocketAddr,
1561 ) -> io::Result<()> {
1562 if self.aux_pending_segment_update && first_sample_n == 0 {
1563 self.send_packet(&self.segment_metadata(AUX_STREAM_ID).make_update(), addr)?;
1564 self.aux_pending_segment_update = false;
1565 }
1566 Ok(())
1567 }
1568
1569 fn send_initial_packets(&self, addr: SocketAddr) -> io::Result<()> {
1570 self.send_packet(&self.settings_packet(), addr)?;
1571 self.send_packet(&self.heartbeat_packet(), addr)?;
1572 self.send_packet(&self.device_metadata().make_update(), addr)?;
1573 for stream_id in Self::stream_ids() {
1574 self.send_packet(
1575 &self
1576 .stream_metadata(stream_id)
1577 .expect("known stream")
1578 .make_update(),
1579 addr,
1580 )?;
1581 self.send_packet(&self.segment_metadata(stream_id).make_update(), addr)?;
1582 for column_index in 0..Self::column_count(stream_id).expect("known stream") {
1583 self.send_packet(
1584 &self
1585 .column_metadata(stream_id, column_index)
1586 .expect("known column")
1587 .make_update(),
1588 addr,
1589 )?;
1590 }
1591 }
1592 Ok(())
1593 }
1594
1595 fn send_rpc_reply(
1596 &self,
1597 id: u16,
1598 reply: Vec<u8>,
1599 routing: proto::DeviceRoute,
1600 addr: SocketAddr,
1601 ) -> io::Result<()> {
1602 self.send_packet(
1603 &proto::Packet {
1604 payload: proto::Payload::RpcReply(proto::RpcReplyPayload { id, reply }),
1605 routing,
1606 ttl: 0,
1607 },
1608 addr,
1609 )
1610 }
1611
1612 fn send_rpc_error(
1613 &self,
1614 id: u16,
1615 error: proto::RpcErrorCode,
1616 routing: proto::DeviceRoute,
1617 addr: SocketAddr,
1618 ) -> io::Result<()> {
1619 self.send_packet(
1620 &proto::Packet {
1621 payload: proto::Payload::RpcError(proto::RpcErrorPayload {
1622 id,
1623 error,
1624 extra: Vec::new(),
1625 }),
1626 routing,
1627 ttl: 0,
1628 },
1629 addr,
1630 )
1631 }
1632
1633 fn send_packet(&self, packet: &proto::Packet, addr: SocketAddr) -> io::Result<()> {
1634 let raw = packet.serialize().map_err(|_| {
1635 io::Error::new(
1636 io::ErrorKind::InvalidData,
1637 format!("packet too large or invalid: {}", describe_packet(packet)),
1638 )
1639 })?;
1640 self.socket.send_to(&raw, addr)?;
1641 Ok(())
1642 }
1643
1644 fn all_metadata_reply(&self) -> io::Result<Vec<u8>> {
1645 let mut reply = Vec::new();
1646 self.append_metadata_record(&mut reply, u8::from(meta::MetadataType::Device), 0, 0)?;
1647 for stream_id in Self::stream_ids() {
1648 self.append_metadata_record(
1649 &mut reply,
1650 u8::from(meta::MetadataType::Stream),
1651 stream_id,
1652 0,
1653 )?;
1654 self.append_metadata_record(
1655 &mut reply,
1656 u8::from(meta::MetadataType::Segment),
1657 stream_id,
1658 self.segment_id,
1659 )?;
1660 for column_index in 0..Self::column_count(stream_id).expect("known stream") {
1661 self.append_metadata_record(
1662 &mut reply,
1663 u8::from(meta::MetadataType::Column),
1664 stream_id,
1665 column_index,
1666 )?;
1667 }
1668 }
1669 Ok(reply)
1670 }
1671
1672 fn append_metadata_record(
1673 &self,
1674 reply: &mut Vec<u8>,
1675 metadata_type: u8,
1676 stream_id: u8,
1677 index: u8,
1678 ) -> io::Result<()> {
1679 let (mtype, body) = match meta::MetadataType::from(metadata_type) {
1680 meta::MetadataType::Device => {
1681 let (fixed, varlen) = self
1682 .device_metadata()
1683 .serialize(&[], &[])
1684 .map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "device metadata"))?;
1685 (meta::MetadataType::Device, join_metadata(fixed, varlen))
1686 }
1687 meta::MetadataType::Stream => {
1688 let Some(stream) = self.stream_metadata(stream_id) else {
1689 return Err(io::Error::new(
1690 io::ErrorKind::InvalidInput,
1691 "unknown stream metadata",
1692 ));
1693 };
1694 let (fixed, varlen) = stream
1695 .serialize(&[], &[])
1696 .map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "stream metadata"))?;
1697 (meta::MetadataType::Stream, join_metadata(fixed, varlen))
1698 }
1699 meta::MetadataType::Segment if Self::is_known_stream(stream_id) => {
1700 let mut segment = self.segment_metadata(stream_id);
1701 segment.segment_id = index;
1702 let current_segment_id = if stream_id == AUX_STREAM_ID {
1703 self.aux_segment_id
1704 } else {
1705 self.segment_id
1706 };
1707 let current_start_time = if stream_id == AUX_STREAM_ID {
1708 self.aux_segment_start_time
1709 } else {
1710 self.segment_start_time
1711 };
1712 let delta_segments =
1713 u32::from((index + N_SEGMENTS - current_segment_id) % N_SEGMENTS);
1714 segment.start_time = current_start_time
1715 .saturating_add(delta_segments.saturating_mul(self.segment_seconds));
1716 let (fixed, varlen) = segment
1717 .serialize(&[], &[])
1718 .map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "segment metadata"))?;
1719 (meta::MetadataType::Segment, join_metadata(fixed, varlen))
1720 }
1721 meta::MetadataType::Column => {
1722 let Some(column) = self.column_metadata(stream_id, index) else {
1723 return Err(io::Error::new(
1724 io::ErrorKind::InvalidInput,
1725 "unknown column metadata",
1726 ));
1727 };
1728 let (fixed, varlen) = column
1729 .serialize(&[], &[])
1730 .map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "column metadata"))?;
1731 (meta::MetadataType::Column, join_metadata(fixed, varlen))
1732 }
1733 _ => {
1734 return Err(io::Error::new(
1735 io::ErrorKind::InvalidInput,
1736 "unknown metadata request",
1737 ))
1738 }
1739 };
1740
1741 let len = u8::try_from(body.len())
1742 .map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "metadata too large"))?;
1743 reply.push(u8::from(mtype));
1744 reply.push(len);
1745 reply.extend(body);
1746 Ok(())
1747 }
1748
1749 fn device_metadata(&self) -> meta::DeviceMetadata {
1750 meta::DeviceMetadata {
1751 serial_number: DEVICE_SERIAL.to_string(),
1752 firmware_hash: DEVICE_FIRMWARE.to_string(),
1753 n_streams: 3,
1754 session_id: self.session_id,
1755 name: DEVICE_NAME.to_string(),
1756 }
1757 }
1758
1759 fn stream_metadata(&self, stream_id: u8) -> Option<meta::StreamMetadata> {
1760 let stream = match stream_id {
1761 SINE_STREAM_ID => meta::StreamMetadata {
1762 stream_id: SINE_STREAM_ID,
1763 name: "sine".to_string(),
1764 n_columns: 2,
1765 n_segments: N_SEGMENTS as usize,
1766 sample_size: SINE_SAMPLE_BYTES,
1767 buf_samples: self.sample_rate as usize,
1768 },
1769 STATUS_STREAM_ID => meta::StreamMetadata {
1770 stream_id: STATUS_STREAM_ID,
1771 name: "status".to_string(),
1772 n_columns: 2,
1773 n_segments: N_SEGMENTS as usize,
1774 sample_size: STATUS_SAMPLE_BYTES,
1775 buf_samples: self.sample_rate as usize,
1776 },
1777 AUX_STREAM_ID => meta::StreamMetadata {
1778 stream_id: AUX_STREAM_ID,
1779 name: "aux".to_string(),
1780 n_columns: 2,
1781 n_segments: N_SEGMENTS as usize,
1782 sample_size: AUX_SAMPLE_BYTES,
1783 buf_samples: AUX_SAMPLE_RATE as usize,
1784 },
1785 _ => return None,
1786 };
1787 Some(stream)
1788 }
1789
1790 fn segment_metadata(&self, stream_id: u8) -> meta::SegmentMetadata {
1791 let (segment_id, start_time, sampling_rate) = match stream_id {
1792 AUX_STREAM_ID => (
1793 self.aux_segment_id,
1794 self.aux_segment_start_time,
1795 AUX_SAMPLE_RATE,
1796 ),
1797 _ => (self.segment_id, self.segment_start_time, self.sample_rate),
1798 };
1799
1800 meta::SegmentMetadata {
1801 stream_id,
1802 segment_id,
1803 flags: 0x01 | 0x02,
1804 time_ref_epoch: meta::MetadataEpoch::Unix,
1805 time_ref_serial: DEVICE_SERIAL.to_string(),
1806 time_ref_session_id: self.session_id,
1807 start_time,
1808 sampling_rate,
1809 decimation: 1,
1810 filter_cutoff: sampling_rate as f32 / 2.0,
1811 filter_type: meta::MetadataFilter::Unfiltered,
1812 }
1813 }
1814
1815 fn column_metadata(&self, stream_id: u8, index: u8) -> Option<meta::ColumnMetadata> {
1816 let column = match (stream_id, index) {
1817 (SINE_STREAM_ID, 0) => meta::ColumnMetadata {
1818 stream_id,
1819 index: index.into(),
1820 data_type: proto::DataType::Float64,
1821 name: "sine".to_string(),
1822 units: "V".to_string(),
1823 description: "Noisy sine wave".to_string(),
1824 },
1825 (SINE_STREAM_ID, 1) => meta::ColumnMetadata {
1826 stream_id,
1827 index: index.into(),
1828 data_type: proto::DataType::Float64,
1829 name: "cosine".to_string(),
1830 units: "V".to_string(),
1831 description: "Noisy quadrature wave".to_string(),
1832 },
1833 (STATUS_STREAM_ID, 0) => meta::ColumnMetadata {
1834 stream_id,
1835 index: index.into(),
1836 data_type: proto::DataType::UInt8,
1837 name: "status".to_string(),
1838 units: "".to_string(),
1839 description: "Mirrors the test.status RPC".to_string(),
1840 },
1841 (STATUS_STREAM_ID, 1) => meta::ColumnMetadata {
1842 stream_id,
1843 index: index.into(),
1844 data_type: proto::DataType::UInt8,
1845 name: "signal_level".to_string(),
1846 units: "".to_string(),
1847 description: "Fixed simulated signal level".to_string(),
1848 },
1849 (AUX_STREAM_ID, 0) => meta::ColumnMetadata {
1850 stream_id,
1851 index: index.into(),
1852 data_type: proto::DataType::Float64,
1853 name: "triangle".to_string(),
1854 units: "arb".to_string(),
1855 description: "Triangle wave".to_string(),
1856 },
1857 (AUX_STREAM_ID, 1) => meta::ColumnMetadata {
1858 stream_id,
1859 index: index.into(),
1860 data_type: proto::DataType::Float64,
1861 name: "sawtooth".to_string(),
1862 units: "arb".to_string(),
1863 description: "Sawtooth wave".to_string(),
1864 },
1865 _ => return None,
1866 };
1867 Some(column)
1868 }
1869
1870 fn is_known_stream(stream_id: u8) -> bool {
1871 matches!(stream_id, SINE_STREAM_ID | STATUS_STREAM_ID | AUX_STREAM_ID)
1872 }
1873
1874 fn column_count(stream_id: u8) -> Option<u8> {
1875 match stream_id {
1876 SINE_STREAM_ID | STATUS_STREAM_ID | AUX_STREAM_ID => Some(2),
1877 _ => None,
1878 }
1879 }
1880
1881 fn stream_ids() -> [u8; 3] {
1882 [SINE_STREAM_ID, STATUS_STREAM_ID, AUX_STREAM_ID]
1883 }
1884
1885 fn heartbeat_packet(&self) -> proto::Packet {
1886 proto::Packet {
1887 payload: proto::Payload::Heartbeat(proto::HeartbeatPayload::Session(self.session_id)),
1888 routing: proto::DeviceRoute::root(),
1889 ttl: 0,
1890 }
1891 }
1892
1893 fn settings_packet(&self) -> proto::Packet {
1894 proto::Packet {
1895 payload: proto::Payload::Settings(proto::SettingsPayload::RpcHash(self.rpc_hash)),
1896 routing: proto::DeviceRoute::root(),
1897 ttl: 0,
1898 }
1899 }
1900}
1901
/// Concatenates the fixed-size and variable-length parts of a serialized metadata record,
/// fixed part first.
fn join_metadata(mut fixed: Vec<u8>, varlen: Vec<u8>) -> Vec<u8> {
    fixed.extend_from_slice(&varlen);
    fixed
}
1906
1907fn stream_data_max_data_bytes() -> usize {
1908 proto::TIO_PACKET_MAX_TOTAL_SIZE
1909 .saturating_sub(proto::TIO_PACKET_HEADER_SIZE)
1910 .saturating_sub(proto::TIO_PACKET_MAX_ROUTING_SIZE)
1911 .saturating_sub(STREAM_DATA_HEADER_BYTES)
1912}
1913
1914fn max_stream_samples_per_packet(sample_bytes: usize) -> u64 {
1915 if sample_bytes == 0 {
1916 return 0;
1917 }
1918 (stream_data_max_data_bytes() / sample_bytes) as u64
1919}
1920
/// Returns the largest RPC reply body that fits in one packet: the maximum total packet
/// size minus the packet header, the maximum routing size, and two further bytes
/// (presumably the `u16` request id — confirm against the protocol), saturating at zero.
#[cfg(test)]
fn rpc_reply_max_reply_bytes() -> usize {
    let overheads = [
        proto::TIO_PACKET_HEADER_SIZE,
        proto::TIO_PACKET_MAX_ROUTING_SIZE,
        2,
    ];
    overheads
        .into_iter()
        .fold(proto::TIO_PACKET_MAX_TOTAL_SIZE, usize::saturating_sub)
}
1928
/// Appends the UTF-8 bytes of `value` to `varlen` and returns how many bytes were written.
///
/// The length must fit in one byte, so at most `u8::MAX` bytes are written. Longer values
/// are truncated at the last character boundary at or below that limit, so a multi-byte
/// character is never split and the appended bytes are always valid UTF-8.
fn append_capture_metadata_string(varlen: &mut Vec<u8>, value: &str) -> u8 {
    let mut len = value.len().min(usize::from(u8::MAX));
    // Index 0 is always a boundary, so this loop terminates.
    while !value.is_char_boundary(len) {
        len -= 1;
    }
    varlen.extend_from_slice(&value.as_bytes()[..len]);
    len as u8
}
1935
1936fn next_drop_sample_after(rng: &mut GaussianRng, current_sample: u64, sample_rate: u32) -> u64 {
1937 let min_seconds = SAMPLE_DROP_INTERVAL_SECONDS - SAMPLE_DROP_JITTER_SECONDS;
1938 let seconds = min_seconds + rng.next_unit() * SAMPLE_DROP_JITTER_SECONDS * 2.0;
1939 let interval = (seconds * f64::from(sample_rate)).round().max(1.0) as u64;
1940 current_sample.saturating_add(interval)
1941}
1942
1943fn next_log_delay(rng: &mut GaussianRng) -> Duration {
1944 let jitter = LOG_MESSAGE_JITTER.mul_f64(rng.next_unit());
1945 LOG_MESSAGE_MIN_INTERVAL + jitter
1946}
1947
1948fn next_capture_sample_count(rng: &mut GaussianRng) -> usize {
1949 let span = CAPTURE_SAMPLE_COUNT_MAX - CAPTURE_SAMPLE_COUNT_MIN + 1;
1950 CAPTURE_SAMPLE_COUNT_MIN + (rng.next_u64() as usize % span)
1951}
1952
1953fn log_level_name(level: proto::LogLevel) -> &'static str {
1954 match level {
1955 proto::LogLevel::Critical => "critical",
1956 proto::LogLevel::Error => "error",
1957 proto::LogLevel::Warning => "warning",
1958 proto::LogLevel::Info => "info",
1959 proto::LogLevel::Debug => "debug",
1960 proto::LogLevel::Unknown(_) => "unknown",
1961 }
1962}
1963
1964fn describe_packet(packet: &proto::Packet) -> String {
1965 match &packet.payload {
1966 proto::Payload::StreamData(data) => format!(
1967 "stream data stream_id={} segment_id={} first_sample_n={} data_bytes={} max_data_bytes={}",
1968 data.stream_id,
1969 data.segment_id,
1970 data.first_sample_n,
1971 data.data.len(),
1972 stream_data_max_data_bytes()
1973 ),
1974 proto::Payload::Metadata(metadata) => format!("metadata {:?}", metadata.content),
1975 proto::Payload::RpcReply(reply) => {
1976 format!("rpc reply id={} reply_bytes={}", reply.id, reply.reply.len())
1977 }
1978 proto::Payload::RpcError(error) => {
1979 format!("rpc error id={} error={:?}", error.id, error.error)
1980 }
1981 proto::Payload::RpcRequest(request) => format!(
1982 "rpc request id={} method={:?} arg_bytes={}",
1983 request.id,
1984 request.method,
1985 request.arg.len()
1986 ),
1987 other => format!("{other:?}"),
1988 }
1989}
1990
/// Returns the time elapsed since the Unix epoch according to the system clock, or a zero
/// duration when the clock reads earlier than the epoch.
fn unix_duration() -> Duration {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch,
        Err(_) => Duration::from_secs(0),
    }
}
1996
/// Converts a duration since the Unix epoch to whole seconds as a `u32`, saturating at
/// `u32::MAX` when the value does not fit.
fn unix_time_secs(now: Duration) -> u32 {
    let whole_seconds = now.as_secs();
    // Clamp first so the narrowing cast below is lossless.
    whole_seconds.min(u64::from(u32::MAX)) as u32
}
2000
/// Unit tests for the simulator's RPC table helpers and capture support.
#[cfg(test)]
mod tests {
    use super::*;
    use clap::Parser;

    /// `crc32_update` must reproduce the standard CRC-32 check value for "123456789".
    #[test]
    fn crc32_matches_standard_check_value() {
        // Start from all ones and invert the result, as the standard CRC-32 does.
        let crc = !crc32_update(0xFFFF_FFFF, b"123456789");
        assert_eq!(crc, 0xCBF4_3926);
    }

    /// `legacy_metadata` must produce the expected bit patterns for representative specs.
    #[test]
    fn legacy_metadata_matches_firmware_encoding() {
        // Read/write float property of size 8.
        let spec = RpcSpec::new(
            "test.amplitude",
            TL_RPC_METHOD_PROP | tl_rpc_mk_float(8) | TL_RPC_PUBLIC_RW,
        );
        assert_eq!(
            spec.legacy_metadata(),
            0x8000 | (8 << 4) | 2 | 0x0100 | 0x0200
        );

        // Read-only unsigned property of size 4.
        let spec = RpcSpec::new(
            "rpc.hash",
            TL_RPC_METHOD_PROP | tl_rpc_mk_uint(4) | TL_RPC_PUBLIC_READ,
        );
        assert_eq!(spec.legacy_metadata(), 0x8000 | (4 << 4) | 0x0100);

        // Void action: only the top bit is expected.
        let spec = RpcSpec::new(
            "dev.stop",
            TL_RPC_METHOD_ACTION | TL_RPC_TYPE_VOID | TL_RPC_PUBLIC_WRITE,
        );
        assert_eq!(spec.legacy_metadata(), 0x8000);

        // Standard method: expected to encode as zero.
        let spec = RpcSpec::new("rpc.list", TL_RPC_METHOD_STD | TL_RPC_PUBLIC_RW);
        assert_eq!(spec.legacy_metadata(), 0);

        // Extra metadata flags are expected to be OR-ed into the encoded value.
        let spec = RpcSpec::with_extra_meta(
            "test.enable",
            TL_RPC_METHOD_PROP | tl_rpc_mk_uint(1) | TL_RPC_PUBLIC_RW,
            RpcMetaFlags::BOOL.bits(),
        );
        assert_eq!(
            spec.legacy_metadata(),
            0x8000 | (1 << 4) | 0x0100 | 0x0200 | RpcMetaFlags::BOOL.bits()
        );
    }

    /// The RPC table hash must be stable and must change when a name, flag set,
    /// description, or signature changes.
    #[test]
    fn rpc_table_hash_covers_name_flags_desc_signature() {
        let table = vec![
            RpcSpec::new(
                "a.b",
                TL_RPC_METHOD_PROP | tl_rpc_mk_uint(4) | TL_RPC_PUBLIC_READ,
            ),
            RpcSpec::new(
                "c.d",
                TL_RPC_METHOD_ACTION | TL_RPC_TYPE_VOID | TL_RPC_PUBLIC_WRITE,
            ),
        ];
        let base = rpc_table_hash(&table);

        // Hashing an identical copy gives the same value.
        assert_eq!(base, rpc_table_hash(&table.clone()));

        // Changing a name changes the hash.
        let mut renamed = table.clone();
        renamed[0].name = "a.x";
        assert_ne!(base, rpc_table_hash(&renamed));

        // Changing the flags changes the hash.
        let mut reflagged = table.clone();
        reflagged[1].flags |= TL_RPC_PERSISTENT;
        assert_ne!(base, rpc_table_hash(&reflagged));

        // Changing the description changes the hash.
        let mut redesc = table.clone();
        redesc[0].desc = "described";
        assert_ne!(base, rpc_table_hash(&redesc));

        // Changing the signature changes the hash.
        let mut resig = table;
        resig[0].signature = "u32";
        assert_ne!(base, rpc_table_hash(&resig));
    }

    /// A capture stays locked until its ready time and then exposes its data in blocks of
    /// `block_size` bytes, with a shorter final block.
    #[test]
    fn capture_buffer_exports_indexed_blocks_after_delay() {
        let now = Instant::now();
        let mut capture = CaptureBuffer::new();
        capture.block_size = 4;
        capture.begin_capture(
            (0u8..10).collect(),
            CaptureInfo {
                length: 10,
                ..CaptureInfo::default()
            },
            now + Duration::from_millis(500),
        );

        // While capturing, the buffer is locked and no block can be read.
        assert!(capture.locked());
        assert_eq!(capture.status(), CAPTURE_STATUS_CAPTURING);
        assert_eq!(capture.block_count(), 3);
        assert!(capture.block(0).is_none());

        // One millisecond before the ready time the buffer is still locked.
        capture.update(now + Duration::from_millis(499));
        assert!(capture.locked());

        // At the ready time the capture completes and all three blocks are readable.
        capture.update(now + Duration::from_millis(500));
        assert!(!capture.locked());
        assert_eq!(capture.status(), CAPTURE_STATUS_DONE);
        assert_eq!(capture.export_size(), 10);
        assert_eq!(capture.info().length, 10);
        assert_eq!(capture.block(0), Some(&[0, 1, 2, 3][..]));
        assert_eq!(capture.block(1), Some(&[4, 5, 6, 7][..]));
        assert_eq!(capture.block(2), Some(&[8, 9][..]));
        assert!(capture.block(3).is_none());
    }

    /// A full default-sized capture block must fit in a single RPC reply.
    #[test]
    fn default_capture_block_size_fits_rpc_replies() {
        assert!(usize::from(CAPTURE_DEFAULT_BLOCK_SIZE) <= rpc_reply_max_reply_bytes());
    }

    /// Capture data must follow the sine parameters given on the command line.
    #[test]
    fn capture_data_uses_current_sine_parameters() {
        // 4 samples per second of a 1 Hz, amplitude-2, noise-free sine wave.
        let cli = SimulateCli::parse_from([
            "tio-simulate",
            "--port",
            "0",
            "--samplerate",
            "4",
            "--frequency",
            "1",
            "--amplitude",
            "2",
            "--noise",
            "0",
        ]);
        let mut device = TestDevice::new(cli).unwrap();

        let (data, info) = device.generate_capture_data();

        assert!(
            (CAPTURE_SAMPLE_COUNT_MIN as u32..=CAPTURE_SAMPLE_COUNT_MAX as u32)
                .contains(&info.length)
        );
        assert_eq!(data.len(), info.length as usize * CAPTURE_SAMPLE_BYTES);
        assert_eq!(info.y_calibration, 1.0);
        assert_eq!(info.x_offset, 0.0);
        assert_eq!(info.x_stride, 0.25);

        // Sample 0 is sin(0) = 0; sample 1 is a quarter period later, at the peak.
        let first = f32::from_le_bytes(data[0..4].try_into().unwrap());
        let second = f32::from_le_bytes(data[4..8].try_into().unwrap());
        assert_eq!(first, 0.0);
        assert!((second - 2.0).abs() < f32::EPSILON);
    }

    /// The capture metadata header must carry the expected length, version, data type,
    /// export size, sample count, and y calibration at their fixed offsets.
    #[test]
    fn capture_metadata_uses_tl_chibi_type_and_y_calibration() {
        let cli = SimulateCli::parse_from(["tio-simulate", "--port", "0"]);
        let mut device = TestDevice::new(cli).unwrap();
        let (data, info) = device.generate_capture_data();
        let data_len = data.len();
        device
            .capture
            .begin_capture(data, info, Instant::now() + Duration::from_millis(1));
        // Update with an instant at or past the ready time so the capture completes.
        device
            .capture
            .update(Instant::now() + Duration::from_millis(1));

        let metadata = device.capture_metadata_reply();

        assert_eq!(metadata[0], CAPTURE_METADATA_FIXED_LEN);
        assert_eq!(metadata[1], CAPTURE_METADATA_VERSION);
        assert_eq!(metadata[2], u8::from(proto::DataType::Float32));
        // Bytes 4..8: export size in bytes.
        assert_eq!(
            u32::from_le_bytes(metadata[4..8].try_into().unwrap()),
            u32::try_from(data_len).unwrap()
        );
        // Bytes 10..14: sample count (bytes 8..10 hold the block size).
        assert_eq!(
            u32::from_le_bytes(metadata[10..14].try_into().unwrap()),
            info.length
        );
        // Bytes 14..18: y calibration.
        assert_eq!(
            f32::from_le_bytes(metadata[14..18].try_into().unwrap()),
            CAPTURE_Y_CALIBRATION
        );
    }

    /// Capture sample counts must stay within the configured range and must not all be
    /// the same.
    #[test]
    fn capture_sample_count_varies_within_range() {
        let mut rng = GaussianRng::new(1);
        let mut counts = Vec::new();
        for _ in 0..8 {
            counts.push(next_capture_sample_count(&mut rng));
        }

        assert!(counts
            .iter()
            .all(|count| (CAPTURE_SAMPLE_COUNT_MIN..=CAPTURE_SAMPLE_COUNT_MAX).contains(count)));
        // At least one adjacent pair differs.
        assert!(counts.windows(2).any(|pair| pair[0] != pair[1]));
    }
}