1#![warn(missing_docs)]
21
22use std::path::Path;
23
24use rvcsi_core::{
25 AdapterKind, AdapterProfile, CsiFrame, CsiSource, RvcsiError, SessionId, SourceHealth, SourceId,
26};
27
28pub mod chips;
29pub mod ffi;
30pub mod pcap;
31
32pub use chips::{
33 known_chips, known_pi_models, nexmon_adapter_profile, raspberry_pi_profile, NexmonChip,
34 RaspberryPiModel,
35};
36pub use ffi::{
37 decode_chanspec, decode_nexmon_udp, decode_record, encode_nexmon_udp, encode_record,
38 parse_nexmon_udp_header, shim_abi_version, DecodedChanspec, NexmonCsiHeader, NexmonFfiError,
39 NexmonRecord, NEXMON_CSI_FMT_INT16_IQ, NEXMON_HEADER_BYTES, NEXMON_MAGIC, RECORD_HEADER_BYTES,
40};
41pub use pcap::{
42 extract_udp_payload, synthetic_udp_pcap, PcapPacket, PcapReader, LINKTYPE_ETHERNET,
43 LINKTYPE_IPV4, LINKTYPE_LINUX_SLL, LINKTYPE_RAW, NEXMON_DEFAULT_PORT, PCAP_MAGIC_NS,
44 PCAP_MAGIC_US,
45};
46
47pub fn synthetic_nexmon_pcap(
52 frames: &[(u64, NexmonCsiHeader, Vec<f32>, Vec<f32>)],
53 port: u16,
54) -> Result<Vec<u8>, NexmonFfiError> {
55 let payloads: Vec<Vec<u8>> = frames
56 .iter()
57 .map(|(_, h, i, q)| encode_nexmon_udp(h, i, q))
58 .collect::<Result<_, _>>()?;
59 let refs: Vec<(u64, u16, &[u8])> = frames
60 .iter()
61 .zip(payloads.iter())
62 .map(|((ts, ..), p)| (*ts, port, p.as_slice()))
63 .collect();
64 Ok(pcap::synthetic_udp_pcap(&refs))
65}
66
67pub struct NexmonAdapter {
72 source_id: SourceId,
73 session_id: SessionId,
74 profile: AdapterProfile,
75 buf: Vec<u8>,
76 cursor: usize,
77 next_frame_id: u64,
78 delivered: u64,
79 rejected: u64,
80 status: Option<String>,
81}
82
83impl NexmonAdapter {
84 pub fn from_bytes(
86 source_id: impl Into<SourceId>,
87 session_id: SessionId,
88 bytes: impl Into<Vec<u8>>,
89 ) -> Self {
90 debug_assert_eq!(
92 shim_abi_version() >> 16,
93 1,
94 "rvcsi_nexmon_shim major ABI mismatch"
95 );
96 NexmonAdapter {
97 source_id: source_id.into(),
98 session_id,
99 profile: AdapterProfile::nexmon_default(),
100 buf: bytes.into(),
101 cursor: 0,
102 next_frame_id: 0,
103 delivered: 0,
104 rejected: 0,
105 status: None,
106 }
107 }
108
109 pub fn from_file(
111 source_id: impl Into<SourceId>,
112 session_id: SessionId,
113 path: impl AsRef<Path>,
114 ) -> Result<Self, RvcsiError> {
115 let bytes = std::fs::read(path)?;
116 Ok(Self::from_bytes(source_id, session_id, bytes))
117 }
118
119 pub fn with_profile(mut self, profile: AdapterProfile) -> Self {
121 self.profile = profile;
122 self
123 }
124
125 pub fn frames_from_bytes(
131 source_id: impl Into<SourceId>,
132 session_id: SessionId,
133 bytes: &[u8],
134 ) -> Result<Vec<CsiFrame>, RvcsiError> {
135 let mut adapter = NexmonAdapter::from_bytes(source_id, session_id, bytes.to_vec());
136 let mut out = Vec::new();
137 while let Some(frame) = adapter.next_frame()? {
138 out.push(frame);
139 }
140 Ok(out)
141 }
142
143 fn record_to_frame(&mut self, rec: NexmonRecord) -> CsiFrame {
144 let fid = self.next_frame_id;
145 self.next_frame_id += 1;
146 let mut frame = CsiFrame::from_iq(
147 fid.into(),
148 self.session_id,
149 self.source_id.clone(),
150 AdapterKind::Nexmon,
151 rec.timestamp_ns,
152 rec.channel,
153 rec.bandwidth_mhz,
154 rec.i_values,
155 rec.q_values,
156 );
157 if let Some(r) = rec.rssi_dbm {
158 frame.rssi_dbm = Some(r);
159 }
160 if let Some(n) = rec.noise_floor_dbm {
161 frame.noise_floor_dbm = Some(n);
162 }
163 frame
164 }
165}
166
167impl CsiSource for NexmonAdapter {
168 fn profile(&self) -> &AdapterProfile {
169 &self.profile
170 }
171
172 fn session_id(&self) -> SessionId {
173 self.session_id
174 }
175
176 fn source_id(&self) -> &SourceId {
177 &self.source_id
178 }
179
180 fn next_frame(&mut self) -> Result<Option<CsiFrame>, RvcsiError> {
181 if self.cursor >= self.buf.len() {
182 return Ok(None);
183 }
184 let remaining = &self.buf[self.cursor..];
185 match decode_record(remaining) {
186 Ok((rec, consumed)) => {
187 self.cursor += consumed;
188 self.delivered += 1;
189 Ok(Some(self.record_to_frame(rec)))
190 }
191 Err(e) => {
192 self.rejected += 1;
193 self.status = Some(format!("malformed record at byte {}: {e}", self.cursor));
194 self.cursor = self.buf.len();
197 Err(RvcsiError::adapter(
198 "nexmon",
199 format!("malformed record: {e}"),
200 ))
201 }
202 }
203 }
204
205 fn health(&self) -> SourceHealth {
206 SourceHealth {
207 connected: self.cursor < self.buf.len(),
208 frames_delivered: self.delivered,
209 frames_rejected: self.rejected,
210 status: self.status.clone(),
211 }
212 }
213}
214
215pub struct NexmonPcapAdapter {
224 source_id: SourceId,
225 session_id: SessionId,
226 profile: AdapterProfile,
227 detected_chip: NexmonChip,
228 frames: Vec<CsiFrame>,
229 headers: Vec<NexmonCsiHeader>,
230 link_type: u32,
231 cursor: usize,
232 skipped: u64,
233}
234
235fn detect_chip(headers: &[NexmonCsiHeader]) -> NexmonChip {
238 match headers.first() {
239 None => NexmonChip::Bcm43455c0, Some(h0) => {
241 let ver = h0.chip_ver;
242 if headers.iter().all(|h| h.chip_ver == ver) {
243 NexmonChip::from_chip_ver(ver)
244 } else {
245 NexmonChip::Unknown { chip_ver: 0 }
246 }
247 }
248 }
249}
250
251impl NexmonPcapAdapter {
252 pub fn parse(
257 source_id: impl Into<SourceId>,
258 session_id: SessionId,
259 pcap_bytes: &[u8],
260 port: Option<u16>,
261 ) -> Result<Self, RvcsiError> {
262 debug_assert_eq!(shim_abi_version() >> 16, 1, "rvcsi_nexmon_shim major ABI mismatch");
263 let source_id = source_id.into();
264 let reader = PcapReader::parse(pcap_bytes)?;
265 let link_type = reader.link_type();
266 let want_port = port.or(Some(NEXMON_DEFAULT_PORT));
267 let mut frames = Vec::new();
268 let mut headers = Vec::new();
269 let mut skipped = 0u64;
270 let mut next_fid = 0u64;
271 for (ts_ns, _dst_port, payload) in reader.udp_payloads(want_port) {
272 match decode_nexmon_udp(payload, NEXMON_CSI_FMT_INT16_IQ) {
273 Ok((hdr, rec)) => {
274 let mut frame = CsiFrame::from_iq(
275 next_fid.into(),
276 session_id,
277 source_id.clone(),
278 AdapterKind::Nexmon,
279 ts_ns,
280 rec.channel,
281 rec.bandwidth_mhz,
282 rec.i_values,
283 rec.q_values,
284 );
285 next_fid += 1;
286 frame.rssi_dbm = rec.rssi_dbm;
287 frame.noise_floor_dbm = rec.noise_floor_dbm;
288 frames.push(frame);
289 headers.push(hdr);
290 }
291 Err(_) => skipped += 1,
292 }
293 }
294 if let Some(p) = want_port {
296 skipped += reader.udp_payloads(None).filter(|(_, dp, _)| *dp != p).count() as u64;
297 }
298 let detected_chip = detect_chip(&headers);
299 Ok(NexmonPcapAdapter {
300 source_id,
301 session_id,
302 profile: nexmon_adapter_profile(detected_chip),
303 detected_chip,
304 frames,
305 headers,
306 link_type,
307 cursor: 0,
308 skipped,
309 })
310 }
311
312 pub fn with_chip(mut self, chip: NexmonChip) -> Self {
315 self.detected_chip = chip;
316 self.profile = nexmon_adapter_profile(chip);
317 self
318 }
319
320 pub fn with_pi_model(mut self, model: RaspberryPiModel) -> Self {
323 self.detected_chip = model.nexmon_chip();
324 self.profile = raspberry_pi_profile(model);
325 self
326 }
327
328 pub fn detected_chip(&self) -> NexmonChip {
331 self.detected_chip
332 }
333
334 pub fn open(
336 source_id: impl Into<SourceId>,
337 session_id: SessionId,
338 path: impl AsRef<Path>,
339 port: Option<u16>,
340 ) -> Result<Self, RvcsiError> {
341 let bytes = std::fs::read(path)?;
342 Self::parse(source_id, session_id, &bytes, port)
343 }
344
345 pub fn frames_from_pcap_bytes(
347 source_id: impl Into<SourceId>,
348 session_id: SessionId,
349 pcap_bytes: &[u8],
350 port: Option<u16>,
351 ) -> Result<Vec<CsiFrame>, RvcsiError> {
352 Ok(Self::parse(source_id, session_id, pcap_bytes, port)?.frames)
353 }
354
355 pub fn link_type(&self) -> u32 {
357 self.link_type
358 }
359
360 pub fn headers(&self) -> &[NexmonCsiHeader] {
362 &self.headers
363 }
364
365 pub fn frame_count(&self) -> usize {
367 self.frames.len()
368 }
369}
370
371impl CsiSource for NexmonPcapAdapter {
372 fn profile(&self) -> &AdapterProfile {
373 &self.profile
374 }
375
376 fn session_id(&self) -> SessionId {
377 self.session_id
378 }
379
380 fn source_id(&self) -> &SourceId {
381 &self.source_id
382 }
383
384 fn next_frame(&mut self) -> Result<Option<CsiFrame>, RvcsiError> {
385 let frame = self.frames.get(self.cursor).cloned();
386 if frame.is_some() {
387 self.cursor += 1;
388 }
389 Ok(frame)
390 }
391
392 fn health(&self) -> SourceHealth {
393 SourceHealth {
394 connected: self.cursor < self.frames.len(),
395 frames_delivered: self.cursor as u64,
396 frames_rejected: self.skipped,
397 status: Some(format!(
398 "pcap link_type={}, {} CSI frame(s), {} non-CSI/skipped",
399 self.link_type,
400 self.frames.len(),
401 self.skipped
402 )),
403 }
404 }
405}
406
407#[cfg(test)]
408mod tests {
409 use super::*;
410 use rvcsi_core::{validate_frame, ValidationPolicy, ValidationStatus};
411
412 fn make_record(ts: u64, ch: u16, n: usize, rssi: Option<i16>) -> Vec<u8> {
413 let i: Vec<f32> = (0..n).map(|k| (k as f32) * 0.5).collect();
414 let q: Vec<f32> = (0..n).map(|k| -(k as f32) * 0.25).collect();
415 let rec = NexmonRecord {
416 subcarrier_count: n as u16,
417 channel: ch,
418 bandwidth_mhz: 80,
419 rssi_dbm: rssi,
420 noise_floor_dbm: Some(-92),
421 timestamp_ns: ts,
422 i_values: i,
423 q_values: q,
424 };
425 encode_record(&rec).expect("encode")
426 }
427
428 #[test]
429 fn abi_version_is_one_point_one() {
430 assert_eq!(shim_abi_version(), 0x0001_0001);
432 assert_eq!(shim_abi_version() >> 16, 1, "major ABI must stay 1");
433 }
434
435 #[test]
436 fn roundtrip_single_record_via_c_shim() {
437 let bytes = make_record(123_456, 36, 64, Some(-58));
438 let (rec, consumed) = decode_record(&bytes).expect("decode");
439 assert_eq!(consumed, bytes.len());
440 assert_eq!(rec.subcarrier_count, 64);
441 assert_eq!(rec.channel, 36);
442 assert_eq!(rec.bandwidth_mhz, 80);
443 assert_eq!(rec.rssi_dbm, Some(-58));
444 assert_eq!(rec.noise_floor_dbm, Some(-92));
445 assert_eq!(rec.timestamp_ns, 123_456);
446 assert_eq!(rec.i_values.len(), 64);
447 assert_eq!(rec.i_values[1], 0.5);
449 assert_eq!(rec.q_values[1], -0.25);
450 }
451
452 #[test]
453 fn adapter_streams_multiple_records_then_validates() {
454 let mut buf = make_record(1_000, 6, 56, Some(-60));
455 buf.extend(make_record(2_000, 6, 56, Some(-61)));
456 buf.extend(make_record(3_000, 6, 56, None));
457
458 let mut adapter = NexmonAdapter::from_bytes("nexmon-test", SessionId(7), buf);
459 let mut frames = Vec::new();
460 while let Some(f) = adapter.next_frame().unwrap() {
461 frames.push(f);
462 }
463 assert_eq!(frames.len(), 3);
464 assert_eq!(frames[0].timestamp_ns, 1_000);
465 assert_eq!(frames[2].rssi_dbm, None);
466 assert_eq!(adapter.health().frames_delivered, 3);
467 assert!(!adapter.health().connected);
468
469 let mut f = frames[0].clone();
471 let err = validate_frame(&mut f, adapter.profile(), &ValidationPolicy::default(), None);
472 assert!(err.is_err());
473
474 let mut f = frames[0].clone();
476 validate_frame(
477 &mut f,
478 &AdapterProfile::offline(AdapterKind::Nexmon),
479 &ValidationPolicy::default(),
480 None,
481 )
482 .unwrap();
483 assert_eq!(f.validation, ValidationStatus::Accepted);
484 }
485
486 #[test]
487 fn truncated_buffer_is_a_structured_error_not_a_panic() {
488 let bytes = make_record(1, 6, 64, Some(-60));
489 let truncated = &bytes[..bytes.len() - 10];
490 let err = decode_record(truncated).unwrap_err();
491 assert!(err.to_string().to_lowercase().contains("trunc") || err.to_string().to_lowercase().contains("short"));
492
493 let mut adapter = NexmonAdapter::from_bytes("t", SessionId(0), truncated.to_vec());
494 assert!(adapter.next_frame().is_err());
495 assert_eq!(adapter.health().frames_rejected, 1);
496 }
497
498 #[test]
499 fn bad_magic_is_rejected() {
500 let mut bytes = make_record(1, 6, 64, Some(-60));
501 bytes[0] = 0xFF;
502 assert!(decode_record(&bytes).is_err());
503 }
504
505 #[test]
506 fn frames_from_bytes_helper() {
507 let mut buf = make_record(10, 1, 64, Some(-50));
508 buf.extend(make_record(20, 1, 64, Some(-51)));
509 let frames = NexmonAdapter::frames_from_bytes("t", SessionId(1), &buf).unwrap();
510 assert_eq!(frames.len(), 2);
511 assert_eq!(frames[1].timestamp_ns, 20);
512 }
513
514 fn synth_nexmon_payload(rssi: i16, chanspec: u16, nsub: u16, seq: u16) -> Vec<u8> {
518 let hdr = NexmonCsiHeader {
519 rssi_dbm: rssi,
520 fctl: 0x08,
521 src_mac: [0xde, 0xad, 0xbe, 0xef, 0x00, 0x02],
522 seq_cnt: seq,
523 core: 0,
524 spatial_stream: 0,
525 chanspec,
526 chip_ver: 0x4345,
527 channel: 0,
528 bandwidth_mhz: 0,
529 is_5ghz: false,
530 subcarrier_count: nsub,
531 };
532 let i: Vec<f32> = (0..nsub).map(|k| (k as i16 - 32) as f32).collect();
533 let q: Vec<f32> = (0..nsub).map(|k| (seq as i16 + k as i16) as f32).collect();
534 encode_nexmon_udp(&hdr, &i, &q).expect("encode nexmon payload")
535 }
536
537 fn eth_ip_udp(dst_port: u16, payload: &[u8]) -> Vec<u8> {
539 let mut f = vec![
540 1, 2, 3, 4, 5, 6, 10, 11, 12, 13, 14, 15, ];
543 f.extend_from_slice(&0x0800u16.to_be_bytes()); let total = (20 + 8 + payload.len()) as u16;
545 f.extend_from_slice(&[0x45, 0x00]);
546 f.extend_from_slice(&total.to_be_bytes());
547 f.extend_from_slice(&[0, 0, 0, 0, 64, 17, 0, 0]); f.extend_from_slice(&[10, 0, 0, 1, 10, 0, 0, 20]); f.extend_from_slice(&54321u16.to_be_bytes()); f.extend_from_slice(&dst_port.to_be_bytes()); f.extend_from_slice(&((8 + payload.len()) as u16).to_be_bytes()); f.extend_from_slice(&[0, 0]); f.extend_from_slice(payload);
554 f
555 }
556
557 fn pcap_le_us(link_type: u32, recs: &[(u32, u32, Vec<u8>)]) -> Vec<u8> {
559 let mut b = Vec::new();
560 b.extend_from_slice(&0xa1b2_c3d4u32.to_le_bytes());
561 b.extend_from_slice(&[2, 0, 4, 0]); b.extend_from_slice(&0u32.to_le_bytes()); b.extend_from_slice(&0u32.to_le_bytes()); b.extend_from_slice(&65535u32.to_le_bytes()); b.extend_from_slice(&link_type.to_le_bytes());
566 for (s, us, f) in recs {
567 b.extend_from_slice(&s.to_le_bytes());
568 b.extend_from_slice(&us.to_le_bytes());
569 b.extend_from_slice(&(f.len() as u32).to_le_bytes());
570 b.extend_from_slice(&(f.len() as u32).to_le_bytes());
571 b.extend_from_slice(f);
572 }
573 b
574 }
575
576 #[test]
577 fn pcap_adapter_decodes_real_nexmon_csi_packets() {
578 let chanspec = 0xc000u16 | 0x2000 | 36; let nsub = 256u16;
580 let recs = vec![
581 (1_000u32, 100_000u32, eth_ip_udp(5500, &synth_nexmon_payload(-58, chanspec, nsub, 1))),
582 (1_000u32, 600_000u32, eth_ip_udp(9999, &[0xaa; 8])), (1_001u32, 0u32, eth_ip_udp(5500, &synth_nexmon_payload(-61, chanspec, nsub, 2))),
584 (1_001u32, 50_000u32, eth_ip_udp(5500, &[0x42; 30])), ];
586 let pcap = pcap_le_us(LINKTYPE_ETHERNET, &recs);
587
588 let mut adapter = NexmonPcapAdapter::parse("nexmon-pcap", SessionId(9), &pcap, None).unwrap();
589 assert_eq!(adapter.link_type(), LINKTYPE_ETHERNET);
590 assert_eq!(adapter.frame_count(), 2);
591 assert_eq!(adapter.headers().len(), 2);
592 assert_eq!(adapter.headers()[0].chanspec, chanspec);
593 assert_eq!(adapter.headers()[0].channel, 36);
594 assert_eq!(adapter.headers()[0].bandwidth_mhz, 80);
595 assert!(adapter.headers()[0].is_5ghz);
596 assert_eq!(adapter.headers()[1].seq_cnt, 2);
597
598 let mut frames = Vec::new();
599 while let Some(f) = adapter.next_frame().unwrap() {
600 frames.push(f);
601 }
602 assert_eq!(frames.len(), 2);
603 assert_eq!(frames[0].adapter_kind, AdapterKind::Nexmon);
604 assert_eq!(frames[0].channel, 36);
605 assert_eq!(frames[0].bandwidth_mhz, 80);
606 assert_eq!(frames[0].rssi_dbm, Some(-58));
607 assert_eq!(frames[0].subcarrier_count, nsub);
608 assert_eq!(frames[0].timestamp_ns, 1_000 * 1_000_000_000 + 100_000 * 1_000);
610 assert_eq!(frames[1].timestamp_ns, 1_001 * 1_000_000_000);
611
612 let h = adapter.health();
613 assert!(!h.connected);
614 assert_eq!(h.frames_delivered, 2);
615 assert!(h.frames_rejected >= 2); }
617
618 #[test]
619 fn pcap_adapter_validates_decoded_frames() {
620 let pcap = pcap_le_us(
621 LINKTYPE_ETHERNET,
622 &[(1u32, 0u32, eth_ip_udp(5500, &synth_nexmon_payload(-60, 0x1000 | 6, 64, 7)))],
623 );
624 let frames = NexmonPcapAdapter::frames_from_pcap_bytes("p", SessionId(0), &pcap, Some(5500)).unwrap();
625 assert_eq!(frames.len(), 1);
626 let mut f = frames[0].clone();
628 validate_frame(
629 &mut f,
630 &AdapterProfile::offline(AdapterKind::Nexmon),
631 &ValidationPolicy::default(),
632 None,
633 )
634 .unwrap();
635 assert_eq!(f.validation, ValidationStatus::Accepted);
636 assert_eq!(f.channel, 6);
637 assert_eq!(f.bandwidth_mhz, 20);
638 }
639
640 #[test]
641 fn pcap_adapter_rejects_garbage_pcap() {
642 assert!(NexmonPcapAdapter::parse("p", SessionId(0), &[0u8; 8], None).is_err());
643 assert!(NexmonPcapAdapter::open("p", SessionId(0), "/no/such/file.pcap", None).is_err());
644 }
645
646 #[test]
647 fn pcap_adapter_auto_detects_raspberry_pi_5_chip() {
648 let chanspec = 0xc000u16 | 0x2000 | 36; let nsub = 256u16;
652 let pcap = pcap_le_us(
653 LINKTYPE_ETHERNET,
654 &[
655 (1u32, 0u32, eth_ip_udp(5500, &synth_nexmon_payload(-58, chanspec, nsub, 1))),
656 (1u32, 50_000u32, eth_ip_udp(5500, &synth_nexmon_payload(-59, chanspec, nsub, 2))),
657 ],
658 );
659 let adapter = NexmonPcapAdapter::parse("pi5-cap", SessionId(1), &pcap, None).unwrap();
660 assert_eq!(adapter.detected_chip(), NexmonChip::Bcm43455c0);
661 assert_eq!(adapter.headers()[0].chip(), NexmonChip::Bcm43455c0);
662 let p = adapter.profile();
664 assert_eq!(p.supported_bandwidths_mhz, vec![20, 40, 80]);
665 assert!(p.accepts_subcarrier_count(256));
666 assert!(p.accepts_channel(36));
667 let mut f = adapter.frames[0].clone();
669 validate_frame(&mut f, &raspberry_pi_profile(RaspberryPiModel::Pi5), &ValidationPolicy::default(), None).unwrap();
670 assert_eq!(f.validation, ValidationStatus::Accepted);
671
672 let a2 = NexmonPcapAdapter::parse("p", SessionId(0), &pcap, None).unwrap().with_pi_model(RaspberryPiModel::Pi5);
674 assert_eq!(a2.detected_chip(), NexmonChip::Bcm43455c0);
675 assert!(a2.profile().chip.as_deref().unwrap().contains("pi5"));
676 }
677}