1use rustflexstack::btp::router::{BTPRouterHandle, Router as BTPRouter};
21use rustflexstack::btp::service_access_point::{BTPDataIndication, BTPDataRequest};
22use rustflexstack::facilities::ca_basic_service::cam_coder::{
23 cam_header, generation_delta_time_now as cam_gdt, AccelerationComponent,
24 AccelerationConfidence, AccelerationValue, Altitude, AltitudeConfidence, AltitudeValue,
25 BasicContainer, BasicVehicleContainerHighFrequency, Cam, CamCoder, CamParameters, CamPayload,
26 Curvature, CurvatureCalculationMode, CurvatureConfidence, CurvatureValue, DriveDirection,
27 Heading, HeadingConfidence, HeadingValue, HighFrequencyContainer, Latitude, Longitude,
28 PositionConfidenceEllipse, ReferencePositionWithConfidence, SemiAxisLength, Speed,
29 SpeedConfidence, SpeedValue, TrafficParticipantType, VehicleLength,
30 VehicleLengthConfidenceIndication, VehicleLengthValue, VehicleWidth, Wgs84AngleValue, YawRate,
31 YawRateConfidence, YawRateValue,
32};
33use rustflexstack::facilities::decentralized_environmental_notification_service::denm_coder::CauseCodeChoice;
34use rustflexstack::facilities::decentralized_environmental_notification_service::{
35 denm_coder::AccidentSubCauseCode, DENRequest, DecentralizedEnvironmentalNotificationService,
36 VehicleData as DenVehicleData,
37};
38use rustflexstack::facilities::vru_awareness_service::vam_coder::{
39 generation_delta_time_now as vam_gdt, vam_header, AccelerationConfidence as VamAccelConf,
40 Altitude as VamAlt, AltitudeConfidence as VamAltConf, AltitudeValue as VamAltVal,
41 BasicContainer as VamBasicContainer, Latitude as VamLat, Longitude as VamLon,
42 LongitudinalAcceleration as VamLongAccel, LongitudinalAccelerationValue as VamLongAccelVal,
43 PositionConfidenceEllipse as VamPCE, ReferencePositionWithConfidence as VamRefPos,
44 SemiAxisLength as VamSAL, Speed as VamSpeed, SpeedConfidence as VamSpeedConf,
45 SpeedValue as VamSpeedVal, TrafficParticipantType as VamTPT, Vam, VamCoder, VamParameters,
46 VruAwareness, VruHighFrequencyContainer, Wgs84Angle as VamWgs84Angle,
47 Wgs84AngleConfidence as VamWgs84AnglConf, Wgs84AngleValue as VamWgs84AngleVal,
48};
49use rustflexstack::geonet::gn_address::{GNAddress, M, MID, ST};
50use rustflexstack::geonet::mib::Mib;
51use rustflexstack::geonet::position_vector::LongPositionVector;
52use rustflexstack::geonet::router::{Router as GNRouter, RouterHandle};
53use rustflexstack::geonet::service_access_point::{
54 Area, CommonNH, CommunicationProfile, GNDataIndication, GNDataRequest, HeaderSubType,
55 HeaderType, PacketTransportType, TopoBroadcastHST, TrafficClass,
56};
57use rustflexstack::link_layer::raw_link_layer::RawLinkLayer;
58use rustflexstack::security::sn_sap::SecurityProfile;
59
60use std::env;
61use std::sync::{
62 atomic::{AtomicU64, Ordering},
63 mpsc, Arc,
64};
65use std::thread;
66use std::time::{Duration, Instant};
67
68fn main() {
69 let iface = env::args().nth(1).unwrap_or_else(|| "lo".to_string());
70 let duration_s = env::args()
71 .nth(2)
72 .and_then(|s| s.parse::<u64>().ok())
73 .unwrap_or(30);
74
75 println!("=== Benchmark: Congested scenario (CAM + VAM + DENM) ===");
76 println!("Interface : {iface}");
77 println!("Duration : {duration_s} s");
78 println!("TX target : 10 CAM/s 10 VAM/s 5 DENM/s\n");
79
80 let mac = random_mac();
82 let mut mib = Mib::new();
83 mib.itsGnLocalGnAddr = GNAddress::new(M::GnMulticast, ST::PassengerCar, MID::new(mac));
84 mib.itsGnBeaconServiceRetransmitTimer = 0;
85
86 let station_id = u32::from_be_bytes([mac[2], mac[3], mac[4], mac[5]]);
87
88 let (gn_handle, gn_to_ll_rx, gn_to_btp_rx) = GNRouter::spawn(mib, None, None, None);
90 let (btp_handle, btp_to_gn_rx) = BTPRouter::spawn(mib);
91
92 let (ll_to_gn_tx, ll_to_gn_rx) = mpsc::channel::<Vec<u8>>();
93 RawLinkLayer::new(ll_to_gn_tx, gn_to_ll_rx, &iface, mac).start();
94 wire_routers(
95 &gn_handle,
96 &btp_handle,
97 ll_to_gn_rx,
98 gn_to_btp_rx,
99 btp_to_gn_rx,
100 );
101
102 let mut epv = LongPositionVector::decode([0u8; 24]);
104 epv.update_from_gps(41.552, 2.134, 0.0, 0.0, true);
105 gn_handle.update_position_vector(epv);
106 thread::sleep(Duration::from_millis(50));
107
108 let vd = DenVehicleData {
110 station_id,
111 station_type: 5,
112 };
113 let (den_svc, denm_rx) =
114 DecentralizedEnvironmentalNotificationService::new(btp_handle.clone(), vd);
115
116 let cam_tx = Arc::new(AtomicU64::new(0));
118 let vam_tx = Arc::new(AtomicU64::new(0));
119 let denm_tx = Arc::new(AtomicU64::new(0));
120 let cam_rx = Arc::new(AtomicU64::new(0));
121 let vam_rx = Arc::new(AtomicU64::new(0));
122 let denm_rx_cnt = Arc::new(AtomicU64::new(0));
123 let rx_err = Arc::new(AtomicU64::new(0));
124 let cam_dec_us_total = Arc::new(AtomicU64::new(0));
125 let vam_dec_us_total = Arc::new(AtomicU64::new(0));
126
127 let bench_end = Instant::now() + Duration::from_secs(duration_s);
128
129 {
131 let btp = btp_handle.clone();
132 let cnt = cam_tx.clone();
133 let end = bench_end;
134 let coder = CamCoder::new();
135 let tmpl = make_cam(station_id);
136 thread::spawn(move || {
137 while Instant::now() < end {
138 let t0 = Instant::now();
139 if let Ok(data) = coder.encode(&tmpl) {
140 btp.send_btp_data_request(cam_btp_request(data));
141 cnt.fetch_add(1, Ordering::Relaxed);
142 }
143 let elapsed = t0.elapsed();
144 if elapsed < Duration::from_millis(100) {
145 thread::sleep(Duration::from_millis(100) - elapsed);
146 }
147 }
148 });
149 }
150
151 {
153 let btp = btp_handle.clone();
154 let cnt = vam_tx.clone();
155 let end = bench_end;
156 let coder = VamCoder::new();
157 let tmpl = make_vam(station_id);
158 thread::spawn(move || {
159 while Instant::now() < end {
160 let t0 = Instant::now();
161 if let Ok(data) = coder.encode(&tmpl) {
162 btp.send_btp_data_request(vam_btp_request(data));
163 cnt.fetch_add(1, Ordering::Relaxed);
164 }
165 let elapsed = t0.elapsed();
166 if elapsed < Duration::from_millis(100) {
167 thread::sleep(Duration::from_millis(100) - elapsed);
168 }
169 }
170 });
171 }
172
173 {
175 let cnt = denm_tx.clone();
176 let total_ms = duration_s * 1000 + 500;
177 let den = den_svc;
178 thread::spawn(move || {
182 den.trigger_denm(DENRequest {
183 event_latitude: 41.552,
184 event_longitude: 2.134,
185 event_altitude_m: 50.0,
186 cause_code: CauseCodeChoice::accident2(AccidentSubCauseCode(0)),
187 information_quality: 3,
188 event_speed_raw: 16383,
189 event_heading_raw: 3601,
190 denm_interval_ms: 200, time_period_ms: total_ms,
192 relevance_radius_m: 1000,
193 });
194 let end = Instant::now() + Duration::from_millis(total_ms);
196 while Instant::now() < end {
197 thread::sleep(Duration::from_millis(200));
198 cnt.fetch_add(1, Ordering::Relaxed);
199 }
200 });
201 }
202
203 {
205 let (cam_ind_tx, cam_ind_rx) = mpsc::channel::<BTPDataIndication>();
206 btp_handle.register_port(2001, cam_ind_tx);
207 let cnt = cam_rx.clone();
208 let err = rx_err.clone();
209 let dec_sum = cam_dec_us_total.clone();
210 let end = bench_end;
211 thread::spawn(move || {
212 let coder = CamCoder::new();
213 loop {
214 let now = Instant::now();
215 if now >= end {
216 break;
217 }
218 let timeout = (end - now).min(Duration::from_millis(500));
219 match cam_ind_rx.recv_timeout(timeout) {
220 Ok(ind) => {
221 let t0 = Instant::now();
222 match coder.decode(&ind.data) {
223 Ok(_) => {
224 cnt.fetch_add(1, Ordering::Relaxed);
225 }
226 Err(_) => {
227 err.fetch_add(1, Ordering::Relaxed);
228 }
229 }
230 dec_sum.fetch_add(t0.elapsed().as_micros() as u64, Ordering::Relaxed);
231 }
232 Err(mpsc::RecvTimeoutError::Timeout) => {}
233 Err(mpsc::RecvTimeoutError::Disconnected) => break,
234 }
235 }
236 });
237 }
238
239 {
241 let (vam_ind_tx, vam_ind_rx) = mpsc::channel::<BTPDataIndication>();
242 btp_handle.register_port(2018, vam_ind_tx);
243 let cnt = vam_rx.clone();
244 let err = rx_err.clone();
245 let dec_sum = vam_dec_us_total.clone();
246 let end = bench_end;
247 thread::spawn(move || {
248 let coder = VamCoder::new();
249 loop {
250 let now = Instant::now();
251 if now >= end {
252 break;
253 }
254 let timeout = (end - now).min(Duration::from_millis(500));
255 match vam_ind_rx.recv_timeout(timeout) {
256 Ok(ind) => {
257 let t0 = Instant::now();
258 match coder.decode(&ind.data) {
259 Ok(_) => {
260 cnt.fetch_add(1, Ordering::Relaxed);
261 }
262 Err(_) => {
263 err.fetch_add(1, Ordering::Relaxed);
264 }
265 }
266 dec_sum.fetch_add(t0.elapsed().as_micros() as u64, Ordering::Relaxed);
267 }
268 Err(mpsc::RecvTimeoutError::Timeout) => {}
269 Err(mpsc::RecvTimeoutError::Disconnected) => break,
270 }
271 }
272 });
273 }
274
275 {
277 let cnt = denm_rx_cnt.clone();
278 let end = bench_end;
279 thread::spawn(move || loop {
280 let now = Instant::now();
281 if now >= end {
282 break;
283 }
284 let timeout = (end - now).min(Duration::from_millis(500));
285 match denm_rx.recv_timeout(timeout) {
286 Ok(_) => {
287 cnt.fetch_add(1, Ordering::Relaxed);
288 }
289 Err(mpsc::RecvTimeoutError::Timeout) => {}
290 Err(mpsc::RecvTimeoutError::Disconnected) => break,
291 }
292 });
293 }
294
295 println!(
297 "{:>6} {:>8} {:>8} {:>8} {:>8} {:>8} {:>8} {:>8} {:>8} {:>8}",
298 "t(s)",
299 "cam_tx",
300 "cam_rx",
301 "cam_dec_µs",
302 "vam_tx",
303 "vam_rx",
304 "vam_dec_µs",
305 "den_tx",
306 "den_rx",
307 "rx_err",
308 );
309
310 let mut prev_cam_rx: u64 = 0;
311 let mut prev_vam_rx: u64 = 0;
312 let mut prev_denm_rx: u64 = 0;
313
314 let bench_start = Instant::now();
315 loop {
316 thread::sleep(Duration::from_secs(1));
317 let t = bench_start.elapsed().as_secs_f64();
318 if t >= duration_s as f64 + 1.5 {
319 break;
320 }
321
322 let ctx = cam_tx.load(Ordering::Relaxed);
323 let crx = cam_rx.load(Ordering::Relaxed);
324 let vtx = vam_tx.load(Ordering::Relaxed);
325 let vrx = vam_rx.load(Ordering::Relaxed);
326 let dtx = denm_tx.load(Ordering::Relaxed);
327 let drx = denm_rx_cnt.load(Ordering::Relaxed);
328 let errs = rx_err.load(Ordering::Relaxed);
329 let cdu = cam_dec_us_total.load(Ordering::Relaxed);
330 let vdu = vam_dec_us_total.load(Ordering::Relaxed);
331
332 let cam_dec_avg = if crx > 0 { cdu / crx } else { 0 };
333 let vam_dec_avg = if vrx > 0 { vdu / vrx } else { 0 };
334
335 let cam_rx_rate = (crx - prev_cam_rx) as f64;
336 let vam_rx_rate = (vrx - prev_vam_rx) as f64;
337 let denm_rx_rate = (drx - prev_denm_rx) as f64;
338 prev_cam_rx = crx;
339 prev_vam_rx = vrx;
340 prev_denm_rx = drx;
341
342 println!(
343 "{:>6.1} {:>8} {:>8.1} {:>10} {:>8} {:>8.1} {:>10} {:>8} {:>8.1} {:>8}",
344 t,
345 ctx,
346 cam_rx_rate,
347 cam_dec_avg,
348 vtx,
349 vam_rx_rate,
350 vam_dec_avg,
351 dtx,
352 denm_rx_rate,
353 errs,
354 );
355
356 if t > duration_s as f64 {
357 break;
358 }
359 }
360
361 let elapsed = bench_start.elapsed().as_secs_f64();
363
364 let ctx = cam_tx.load(Ordering::Relaxed);
365 let crx = cam_rx.load(Ordering::Relaxed);
366 let vtx = vam_tx.load(Ordering::Relaxed);
367 let vrx = vam_rx.load(Ordering::Relaxed);
368 let dtx = denm_tx.load(Ordering::Relaxed);
369 let drx = denm_rx_cnt.load(Ordering::Relaxed);
370 let errs = rx_err.load(Ordering::Relaxed);
371 let cdu = cam_dec_us_total.load(Ordering::Relaxed);
372 let vdu = vam_dec_us_total.load(Ordering::Relaxed);
373
374 let cam_dec_avg = if crx > 0 { cdu / crx } else { 0 };
375 let vam_dec_avg = if vrx > 0 { vdu / vrx } else { 0 };
376
377 let cam_ratio = if ctx > 0 {
378 crx as f64 / ctx as f64 * 100.0
379 } else {
380 0.0
381 };
382 let vam_ratio = if vtx > 0 {
383 vrx as f64 / vtx as f64 * 100.0
384 } else {
385 0.0
386 };
387 let denm_ratio = if dtx > 0 {
388 drx as f64 / dtx as f64 * 100.0
389 } else {
390 0.0
391 };
392
393 println!();
394 println!("=== Congestion Benchmark Results ({elapsed:.1} s) ===");
395 println!();
396 println!(
397 " CAM TX: {ctx:>8} RX: {crx:>8} ratio: {cam_ratio:>6.1}% avg_dec: {cam_dec_avg} µs"
398 );
399 println!(
400 " VAM TX: {vtx:>8} RX: {vrx:>8} ratio: {vam_ratio:>6.1}% avg_dec: {vam_dec_avg} µs"
401 );
402 println!(" DENM TX: {dtx:>8} RX: {drx:>8} ratio: {denm_ratio:>6.1}%");
403 println!(" RX errors: {errs}");
404}
405
406fn make_cam(station_id: u32) -> Cam {
409 let hf = BasicVehicleContainerHighFrequency::new(
410 Heading::new(HeadingValue(900), HeadingConfidence(127)),
411 Speed::new(SpeedValue(0), SpeedConfidence(127)),
412 DriveDirection::unavailable,
413 VehicleLength::new(
414 VehicleLengthValue(1023),
415 VehicleLengthConfidenceIndication::unavailable,
416 ),
417 VehicleWidth(62),
418 AccelerationComponent::new(AccelerationValue(161), AccelerationConfidence(102)),
419 Curvature::new(CurvatureValue(1023), CurvatureConfidence::unavailable),
420 CurvatureCalculationMode::unavailable,
421 YawRate::new(YawRateValue(32767), YawRateConfidence::unavailable),
422 None,
423 None,
424 None,
425 None,
426 None,
427 None,
428 None,
429 );
430 Cam::new(
431 cam_header(station_id),
432 CamPayload::new(
433 cam_gdt(),
434 CamParameters::new(
435 BasicContainer::new(
436 TrafficParticipantType(5),
437 ReferencePositionWithConfidence::new(
438 Latitude(415_520_000),
439 Longitude(21_340_000),
440 PositionConfidenceEllipse::new(
441 SemiAxisLength(4095),
442 SemiAxisLength(4095),
443 Wgs84AngleValue(3601),
444 ),
445 Altitude::new(AltitudeValue(12000), AltitudeConfidence::unavailable),
446 ),
447 ),
448 HighFrequencyContainer::basicVehicleContainerHighFrequency(hf),
449 None,
450 None,
451 None,
452 ),
453 ),
454 )
455}
456
457fn cam_btp_request(data: Vec<u8>) -> BTPDataRequest {
458 BTPDataRequest {
459 btp_type: CommonNH::BtpB,
460 source_port: 0,
461 destination_port: 2001,
462 destination_port_info: 0,
463 gn_packet_transport_type: PacketTransportType {
464 header_type: HeaderType::Tsb,
465 header_sub_type: HeaderSubType::TopoBroadcast(TopoBroadcastHST::SingleHop),
466 },
467 gn_destination_address: GNAddress {
468 m: M::GnMulticast,
469 st: ST::Unknown,
470 mid: MID::new([0xFF; 6]),
471 },
472 communication_profile: CommunicationProfile::Unspecified,
473 gn_area: Area {
474 latitude: 0,
475 longitude: 0,
476 a: 0,
477 b: 0,
478 angle: 0,
479 },
480 traffic_class: TrafficClass {
481 scf: false,
482 channel_offload: false,
483 tc_id: 0,
484 },
485 security_profile: SecurityProfile::NoSecurity,
486 its_aid: 36,
487 security_permissions: vec![],
488 gn_max_hop_limit: 1,
489 gn_max_packet_lifetime: None,
490 gn_repetition_interval: None,
491 gn_max_repetition_time: None,
492 destination: None,
493 length: data.len() as u16,
494 data,
495 }
496}
497
498fn make_vam(station_id: u32) -> Vam {
501 let hf = VruHighFrequencyContainer::new(
508 VamWgs84Angle::new(VamWgs84AngleVal(3601), VamWgs84AnglConf(127)), VamSpeed::new(VamSpeedVal(0), VamSpeedConf(127)), VamLongAccel::new(VamLongAccelVal(161), VamAccelConf(102)), None,
512 None,
513 None,
514 None,
515 None,
516 None,
517 None,
518 None,
519 None,
520 None,
521 None,
522 );
523 Vam::new(
524 vam_header(station_id),
525 VruAwareness::new(
526 vam_gdt(),
527 VamParameters::new(
528 VamBasicContainer::new(
529 VamTPT(1), VamRefPos::new(
531 VamLat(415_520_000),
532 VamLon(21_340_000),
533 VamPCE::new(VamSAL(4095), VamSAL(4095), VamWgs84AngleVal(3601)),
534 VamAlt::new(VamAltVal(12000), VamAltConf::unavailable),
535 ),
536 ),
537 hf,
538 None,
539 None,
540 None,
541 None, ),
543 ),
544 )
545}
546
547fn vam_btp_request(data: Vec<u8>) -> BTPDataRequest {
548 BTPDataRequest {
549 btp_type: CommonNH::BtpB,
550 source_port: 0,
551 destination_port: 2018,
552 destination_port_info: 0,
553 gn_packet_transport_type: PacketTransportType {
554 header_type: HeaderType::Tsb,
555 header_sub_type: HeaderSubType::TopoBroadcast(TopoBroadcastHST::SingleHop),
556 },
557 gn_destination_address: GNAddress {
558 m: M::GnMulticast,
559 st: ST::Unknown,
560 mid: MID::new([0xFF; 6]),
561 },
562 communication_profile: CommunicationProfile::Unspecified,
563 gn_area: Area {
564 latitude: 0,
565 longitude: 0,
566 a: 0,
567 b: 0,
568 angle: 0,
569 },
570 traffic_class: TrafficClass {
571 scf: false,
572 channel_offload: false,
573 tc_id: 0,
574 },
575 security_profile: SecurityProfile::NoSecurity,
576 its_aid: 16513,
577 security_permissions: vec![],
578 gn_max_hop_limit: 1,
579 gn_max_packet_lifetime: None,
580 gn_repetition_interval: None,
581 gn_max_repetition_time: None,
582 destination: None,
583 length: data.len() as u16,
584 data,
585 }
586}
587
588fn random_mac() -> [u8; 6] {
591 use std::time::{SystemTime, UNIX_EPOCH};
592 let s = SystemTime::now()
593 .duration_since(UNIX_EPOCH)
594 .unwrap()
595 .subsec_nanos();
596 [
597 0x02,
598 (s >> 24) as u8,
599 (s >> 16) as u8,
600 (s >> 8) as u8,
601 s as u8,
602 0xDD,
603 ]
604}
605
606fn wire_routers(
607 gn: &RouterHandle,
608 btp: &BTPRouterHandle,
609 ll_rx: mpsc::Receiver<Vec<u8>>,
610 gn_btp_rx: mpsc::Receiver<GNDataIndication>,
611 btp_gn_rx: mpsc::Receiver<GNDataRequest>,
612) {
613 let g1 = gn.clone();
614 thread::spawn(move || {
615 while let Ok(p) = ll_rx.recv() {
616 g1.send_incoming_packet(p);
617 }
618 });
619 let b1 = btp.clone();
620 thread::spawn(move || {
621 while let Ok(i) = gn_btp_rx.recv() {
622 b1.send_gn_data_indication(i);
623 }
624 });
625 let g2 = gn.clone();
626 thread::spawn(move || {
627 while let Ok(r) = btp_gn_rx.recv() {
628 g2.send_gn_data_request(r);
629 }
630 });
631}