Skip to main content

bench_cam_rx/
bench_cam_rx.rs

1// SPDX-License-Identifier: AGPL-3.0-only
2// Copyright (C) 2024 Fundació Privada Internet i Innovació Digital a Catalunya (i2CAT)
3
4//! Benchmark: Maximum CAM RX throughput.
5//!
6//! Listens on BTP port 2001, decodes every packet with `CamCoder::decode()`,
7//! and reports packets/s plus UPER decode latency in µs.
8//!
9//! Pair with `bench_cam_tx` (possibly on another machine or loopback) to load
10//! the receiver.
11//!
12//! # Usage
13//! ```text
14//! sudo cargo run --release --example bench_cam_rx -- [interface] [duration_s]
15//! # defaults: lo, 30 s
16//! ```
17
18use rustflexstack::btp::router::{BTPRouterHandle, Router as BTPRouter};
19use rustflexstack::btp::service_access_point::BTPDataIndication;
20use rustflexstack::facilities::ca_basic_service::cam_coder::CamCoder;
21use rustflexstack::geonet::gn_address::{GNAddress, M, MID, ST};
22use rustflexstack::geonet::mib::Mib;
23use rustflexstack::geonet::position_vector::LongPositionVector;
24use rustflexstack::geonet::router::{Router as GNRouter, RouterHandle};
25use rustflexstack::geonet::service_access_point::{GNDataIndication, GNDataRequest};
26use rustflexstack::link_layer::raw_link_layer::RawLinkLayer;
27
28use std::env;
29use std::sync::mpsc;
30use std::thread;
31use std::time::{Duration, Instant};
32
33fn main() {
34    let iface = env::args().nth(1).unwrap_or_else(|| "lo".to_string());
35    let duration_s = env::args()
36        .nth(2)
37        .and_then(|s| s.parse::<u64>().ok())
38        .unwrap_or(30);
39
40    println!("=== Benchmark: Maximum CAM RX throughput ===");
41    println!("Interface : {iface}");
42    println!("Duration  : {duration_s} s\n");
43
44    // ── MAC / MIB ─────────────────────────────────────────────────────────────
45    let mac = random_mac();
46    let mut mib = Mib::new();
47    mib.itsGnLocalGnAddr = GNAddress::new(M::GnMulticast, ST::PassengerCar, MID::new(mac));
48    mib.itsGnBeaconServiceRetransmitTimer = 0;
49
50    // ── Routers + link layer ──────────────────────────────────────────────────
51    let (gn_handle, gn_to_ll_rx, gn_to_btp_rx) = GNRouter::spawn(mib, None, None, None);
52    let (btp_handle, btp_to_gn_rx) = BTPRouter::spawn(mib);
53
54    let (ll_to_gn_tx, ll_to_gn_rx) = mpsc::channel::<Vec<u8>>();
55    RawLinkLayer::new(ll_to_gn_tx, gn_to_ll_rx, &iface, mac).start();
56    wire_routers(
57        &gn_handle,
58        &btp_handle,
59        ll_to_gn_rx,
60        gn_to_btp_rx,
61        btp_to_gn_rx,
62    );
63
64    // Seed position vector.
65    let mut epv = LongPositionVector::decode([0u8; 24]);
66    epv.update_from_gps(41.552, 2.134, 0.0, 0.0, true);
67    gn_handle.update_position_vector(epv);
68    thread::sleep(Duration::from_millis(50));
69
70    // ── Register on BTP port 2001 ─────────────────────────────────────────────
71    let (ind_tx, ind_rx) = mpsc::channel::<BTPDataIndication>();
72    btp_handle.register_port(2001, ind_tx);
73
74    // ── Benchmark loop ────────────────────────────────────────────────────────
75    println!("Waiting for CAMs on port 2001…\n");
76    println!(
77        "{:>7}  {:>10}  {:>12}  {:>12}  {:>10}",
78        "time(s)", "total_recv", "rate(pkt/s)", "avg_dec(µs)", "errors"
79    );
80
81    let coder = CamCoder::new();
82    let mut total_recv: u64 = 0;
83    let mut total_errors: u64 = 0;
84    let mut total_dec_us: u128 = 0;
85    let bench_start = Instant::now();
86    let bench_end = bench_start + Duration::from_secs(duration_s);
87    let mut win_start = Instant::now();
88    let mut win_recv: u64 = 0;
89
90    loop {
91        let now = Instant::now();
92        if now >= bench_end {
93            break;
94        }
95
96        let timeout = (bench_end - now).min(Duration::from_millis(500));
97        match ind_rx.recv_timeout(timeout) {
98            Ok(ind) => {
99                let t0 = Instant::now();
100                match coder.decode(&ind.data) {
101                    Ok(_) => {}
102                    Err(e) => {
103                        total_errors += 1;
104                        eprintln!("[RX] Decode error: {e}");
105                    }
106                }
107                let dec_us = t0.elapsed().as_micros();
108                total_recv += 1;
109                total_dec_us += dec_us;
110                win_recv += 1;
111            }
112            Err(mpsc::RecvTimeoutError::Timeout) => {}
113            Err(mpsc::RecvTimeoutError::Disconnected) => break,
114        }
115
116        let win_elapsed = win_start.elapsed();
117        if win_elapsed >= Duration::from_secs(1) {
118            let pps = win_recv as f64 / win_elapsed.as_secs_f64();
119            let avg_dec = if total_recv > 0 {
120                total_dec_us / total_recv as u128
121            } else {
122                0
123            };
124            println!(
125                "{:>7.1}  {:>10}  {:>12.1}  {:>12}  {:>10}",
126                bench_start.elapsed().as_secs_f64(),
127                total_recv,
128                pps,
129                avg_dec,
130                total_errors
131            );
132            win_start = Instant::now();
133            win_recv = 0;
134        }
135    }
136
137    // ── Summary ───────────────────────────────────────────────────────────────
138    let elapsed = bench_start.elapsed().as_secs_f64();
139    let avg_rate = total_recv as f64 / elapsed;
140    let avg_decode = if total_recv > 0 {
141        total_dec_us / total_recv as u128
142    } else {
143        0
144    };
145
146    println!();
147    println!("=== CAM RX Results ===");
148    println!("  Total received  : {total_recv}");
149    println!("  Decode errors   : {total_errors}");
150    println!("  Elapsed         : {elapsed:.3} s");
151    println!("  Average rate    : {avg_rate:.1} pkt/s");
152    println!("  Avg decode time : {avg_decode} µs");
153}
154
155// ─── Helpers ─────────────────────────────────────────────────────────────────
156
157fn random_mac() -> [u8; 6] {
158    use std::time::{SystemTime, UNIX_EPOCH};
159    let s = SystemTime::now()
160        .duration_since(UNIX_EPOCH)
161        .unwrap()
162        .subsec_nanos();
163    [
164        0x02,
165        (s >> 24) as u8,
166        (s >> 16) as u8,
167        (s >> 8) as u8,
168        s as u8,
169        0xCC,
170    ]
171}
172
173fn wire_routers(
174    gn: &RouterHandle,
175    btp: &BTPRouterHandle,
176    ll_rx: mpsc::Receiver<Vec<u8>>,
177    gn_btp_rx: mpsc::Receiver<GNDataIndication>,
178    btp_gn_rx: mpsc::Receiver<GNDataRequest>,
179) {
180    let g1 = gn.clone();
181    thread::spawn(move || {
182        while let Ok(p) = ll_rx.recv() {
183            g1.send_incoming_packet(p);
184        }
185    });
186    let b1 = btp.clone();
187    thread::spawn(move || {
188        while let Ok(i) = gn_btp_rx.recv() {
189            b1.send_gn_data_indication(i);
190        }
191    });
192    let g2 = gn.clone();
193    thread::spawn(move || {
194        while let Ok(r) = btp_gn_rx.recv() {
195            g2.send_gn_data_request(r);
196        }
197    });
198}