Skip to main content

bench_congestion/
bench_congestion.rs

1// SPDX-License-Identifier: AGPL-3.0-only
2// Copyright (C) 2024 Fundació Privada Internet i Innovació Digital a Catalunya (i2CAT)
3
4//! Benchmark: Congested-scenario TX + RX throughput.
5//!
6//! Transmits simultaneously:
7//!   - 10 CAMs/s  (BTP port 2001, TSB/SingleHop)
8//!   - 10 VAMs/s  (BTP port 2018, TSB/SingleHop)
9//!   -  5 DENMs/s (BTP port 2002, GeoBroadcast-Circle) via DEN service
10//!
11//! Concurrently receives and decodes all three message types, counting successes
12//! and decode latency.  A 1 Hz stats thread prints running totals.
13//!
14//! # Usage
15//! ```text
16//! sudo cargo run --release --example bench_congestion -- [interface] [duration_s]
17//! # defaults: lo, 30 s
18//! ```
19
20use rustflexstack::btp::router::{BTPRouterHandle, Router as BTPRouter};
21use rustflexstack::btp::service_access_point::{BTPDataIndication, BTPDataRequest};
22use rustflexstack::facilities::ca_basic_service::cam_coder::{
23    cam_header, generation_delta_time_now as cam_gdt, AccelerationComponent,
24    AccelerationConfidence, AccelerationValue, Altitude, AltitudeConfidence, AltitudeValue,
25    BasicContainer, BasicVehicleContainerHighFrequency, Cam, CamCoder, CamParameters, CamPayload,
26    Curvature, CurvatureCalculationMode, CurvatureConfidence, CurvatureValue, DriveDirection,
27    Heading, HeadingConfidence, HeadingValue, HighFrequencyContainer, Latitude, Longitude,
28    PositionConfidenceEllipse, ReferencePositionWithConfidence, SemiAxisLength, Speed,
29    SpeedConfidence, SpeedValue, TrafficParticipantType, VehicleLength,
30    VehicleLengthConfidenceIndication, VehicleLengthValue, VehicleWidth, Wgs84AngleValue, YawRate,
31    YawRateConfidence, YawRateValue,
32};
33use rustflexstack::facilities::decentralized_environmental_notification_service::denm_coder::CauseCodeChoice;
34use rustflexstack::facilities::decentralized_environmental_notification_service::{
35    denm_coder::AccidentSubCauseCode, DENRequest, DecentralizedEnvironmentalNotificationService,
36    VehicleData as DenVehicleData,
37};
38use rustflexstack::facilities::vru_awareness_service::vam_coder::{
39    generation_delta_time_now as vam_gdt, vam_header, AccelerationConfidence as VamAccelConf,
40    Altitude as VamAlt, AltitudeConfidence as VamAltConf, AltitudeValue as VamAltVal,
41    BasicContainer as VamBasicContainer, Latitude as VamLat, Longitude as VamLon,
42    LongitudinalAcceleration as VamLongAccel, LongitudinalAccelerationValue as VamLongAccelVal,
43    PositionConfidenceEllipse as VamPCE, ReferencePositionWithConfidence as VamRefPos,
44    SemiAxisLength as VamSAL, Speed as VamSpeed, SpeedConfidence as VamSpeedConf,
45    SpeedValue as VamSpeedVal, TrafficParticipantType as VamTPT, Vam, VamCoder, VamParameters,
46    VruAwareness, VruHighFrequencyContainer, Wgs84Angle as VamWgs84Angle,
47    Wgs84AngleConfidence as VamWgs84AnglConf, Wgs84AngleValue as VamWgs84AngleVal,
48};
49use rustflexstack::geonet::gn_address::{GNAddress, M, MID, ST};
50use rustflexstack::geonet::mib::Mib;
51use rustflexstack::geonet::position_vector::LongPositionVector;
52use rustflexstack::geonet::router::{Router as GNRouter, RouterHandle};
53use rustflexstack::geonet::service_access_point::{
54    Area, CommonNH, CommunicationProfile, GNDataIndication, GNDataRequest, HeaderSubType,
55    HeaderType, PacketTransportType, TopoBroadcastHST, TrafficClass,
56};
57use rustflexstack::link_layer::raw_link_layer::RawLinkLayer;
58use rustflexstack::security::sn_sap::SecurityProfile;
59
60use std::env;
61use std::sync::{
62    atomic::{AtomicU64, Ordering},
63    mpsc, Arc,
64};
65use std::thread;
66use std::time::{Duration, Instant};
67
68fn main() {
69    let iface = env::args().nth(1).unwrap_or_else(|| "lo".to_string());
70    let duration_s = env::args()
71        .nth(2)
72        .and_then(|s| s.parse::<u64>().ok())
73        .unwrap_or(30);
74
75    println!("=== Benchmark: Congested scenario (CAM + VAM + DENM) ===");
76    println!("Interface : {iface}");
77    println!("Duration  : {duration_s} s");
78    println!("TX target : 10 CAM/s  10 VAM/s  5 DENM/s\n");
79
80    // ── MAC / MIB ─────────────────────────────────────────────────────────────
81    let mac = random_mac();
82    let mut mib = Mib::new();
83    mib.itsGnLocalGnAddr = GNAddress::new(M::GnMulticast, ST::PassengerCar, MID::new(mac));
84    mib.itsGnBeaconServiceRetransmitTimer = 0;
85
86    let station_id = u32::from_be_bytes([mac[2], mac[3], mac[4], mac[5]]);
87
88    // ── Routers + link layer ──────────────────────────────────────────────────
89    let (gn_handle, gn_to_ll_rx, gn_to_btp_rx) = GNRouter::spawn(mib, None, None, None);
90    let (btp_handle, btp_to_gn_rx) = BTPRouter::spawn(mib);
91
92    let (ll_to_gn_tx, ll_to_gn_rx) = mpsc::channel::<Vec<u8>>();
93    RawLinkLayer::new(ll_to_gn_tx, gn_to_ll_rx, &iface, mac).start();
94    wire_routers(
95        &gn_handle,
96        &btp_handle,
97        ll_to_gn_rx,
98        gn_to_btp_rx,
99        btp_to_gn_rx,
100    );
101
102    // Seed position vector.
103    let mut epv = LongPositionVector::decode([0u8; 24]);
104    epv.update_from_gps(41.552, 2.134, 0.0, 0.0, true);
105    gn_handle.update_position_vector(epv);
106    thread::sleep(Duration::from_millis(50));
107
108    // ── DEN service (single instance for both TX and RX) ─────────────────────
109    let vd = DenVehicleData {
110        station_id,
111        station_type: 5,
112    };
113    let (den_svc, denm_rx) =
114        DecentralizedEnvironmentalNotificationService::new(btp_handle.clone(), vd);
115
116    // ── Shared atomic counters ────────────────────────────────────────────────
117    let cam_tx = Arc::new(AtomicU64::new(0));
118    let vam_tx = Arc::new(AtomicU64::new(0));
119    let denm_tx = Arc::new(AtomicU64::new(0));
120    let cam_rx = Arc::new(AtomicU64::new(0));
121    let vam_rx = Arc::new(AtomicU64::new(0));
122    let denm_rx_cnt = Arc::new(AtomicU64::new(0));
123    let rx_err = Arc::new(AtomicU64::new(0));
124    let cam_dec_us_total = Arc::new(AtomicU64::new(0));
125    let vam_dec_us_total = Arc::new(AtomicU64::new(0));
126
127    let bench_end = Instant::now() + Duration::from_secs(duration_s);
128
129    // ── CAM TX thread (100 ms interval → 10 Hz) ───────────────────────────────
130    {
131        let btp = btp_handle.clone();
132        let cnt = cam_tx.clone();
133        let end = bench_end;
134        let coder = CamCoder::new();
135        let tmpl = make_cam(station_id);
136        thread::spawn(move || {
137            while Instant::now() < end {
138                let t0 = Instant::now();
139                if let Ok(data) = coder.encode(&tmpl) {
140                    btp.send_btp_data_request(cam_btp_request(data));
141                    cnt.fetch_add(1, Ordering::Relaxed);
142                }
143                let elapsed = t0.elapsed();
144                if elapsed < Duration::from_millis(100) {
145                    thread::sleep(Duration::from_millis(100) - elapsed);
146                }
147            }
148        });
149    }
150
151    // ── VAM TX thread (100 ms interval → 10 Hz) ───────────────────────────────
152    {
153        let btp = btp_handle.clone();
154        let cnt = vam_tx.clone();
155        let end = bench_end;
156        let coder = VamCoder::new();
157        let tmpl = make_vam(station_id);
158        thread::spawn(move || {
159            while Instant::now() < end {
160                let t0 = Instant::now();
161                if let Ok(data) = coder.encode(&tmpl) {
162                    btp.send_btp_data_request(vam_btp_request(data));
163                    cnt.fetch_add(1, Ordering::Relaxed);
164                }
165                let elapsed = t0.elapsed();
166                if elapsed < Duration::from_millis(100) {
167                    thread::sleep(Duration::from_millis(100) - elapsed);
168                }
169            }
170        });
171    }
172
173    // ── DENM TX (via DEN service at 5 Hz for entire duration) ────────────────
174    {
175        let cnt = denm_tx.clone();
176        let total_ms = duration_s * 1000 + 500;
177        let den = den_svc;
178        // Count: spawn a watcher that checks every 200 ms how many intervals fired.
179        // Simpler: trigger once with interval=200 ms, increment counter in a
180        // separate thread counting 200 ms ticks.
181        thread::spawn(move || {
182            den.trigger_denm(DENRequest {
183                event_latitude: 41.552,
184                event_longitude: 2.134,
185                event_altitude_m: 50.0,
186                cause_code: CauseCodeChoice::accident2(AccidentSubCauseCode(0)),
187                information_quality: 3,
188                event_speed_raw: 16383,
189                event_heading_raw: 3601,
190                denm_interval_ms: 200, // 5 Hz
191                time_period_ms: total_ms,
192                relevance_radius_m: 1000,
193            });
194            // Approximate TX count: 1 DENM every 200 ms.
195            let end = Instant::now() + Duration::from_millis(total_ms);
196            while Instant::now() < end {
197                thread::sleep(Duration::from_millis(200));
198                cnt.fetch_add(1, Ordering::Relaxed);
199            }
200        });
201    }
202
203    // ── CAM RX thread ─────────────────────────────────────────────────────────
204    {
205        let (cam_ind_tx, cam_ind_rx) = mpsc::channel::<BTPDataIndication>();
206        btp_handle.register_port(2001, cam_ind_tx);
207        let cnt = cam_rx.clone();
208        let err = rx_err.clone();
209        let dec_sum = cam_dec_us_total.clone();
210        let end = bench_end;
211        thread::spawn(move || {
212            let coder = CamCoder::new();
213            loop {
214                let now = Instant::now();
215                if now >= end {
216                    break;
217                }
218                let timeout = (end - now).min(Duration::from_millis(500));
219                match cam_ind_rx.recv_timeout(timeout) {
220                    Ok(ind) => {
221                        let t0 = Instant::now();
222                        match coder.decode(&ind.data) {
223                            Ok(_) => {
224                                cnt.fetch_add(1, Ordering::Relaxed);
225                            }
226                            Err(_) => {
227                                err.fetch_add(1, Ordering::Relaxed);
228                            }
229                        }
230                        dec_sum.fetch_add(t0.elapsed().as_micros() as u64, Ordering::Relaxed);
231                    }
232                    Err(mpsc::RecvTimeoutError::Timeout) => {}
233                    Err(mpsc::RecvTimeoutError::Disconnected) => break,
234                }
235            }
236        });
237    }
238
239    // ── VAM RX thread ─────────────────────────────────────────────────────────
240    {
241        let (vam_ind_tx, vam_ind_rx) = mpsc::channel::<BTPDataIndication>();
242        btp_handle.register_port(2018, vam_ind_tx);
243        let cnt = vam_rx.clone();
244        let err = rx_err.clone();
245        let dec_sum = vam_dec_us_total.clone();
246        let end = bench_end;
247        thread::spawn(move || {
248            let coder = VamCoder::new();
249            loop {
250                let now = Instant::now();
251                if now >= end {
252                    break;
253                }
254                let timeout = (end - now).min(Duration::from_millis(500));
255                match vam_ind_rx.recv_timeout(timeout) {
256                    Ok(ind) => {
257                        let t0 = Instant::now();
258                        match coder.decode(&ind.data) {
259                            Ok(_) => {
260                                cnt.fetch_add(1, Ordering::Relaxed);
261                            }
262                            Err(_) => {
263                                err.fetch_add(1, Ordering::Relaxed);
264                            }
265                        }
266                        dec_sum.fetch_add(t0.elapsed().as_micros() as u64, Ordering::Relaxed);
267                    }
268                    Err(mpsc::RecvTimeoutError::Timeout) => {}
269                    Err(mpsc::RecvTimeoutError::Disconnected) => break,
270                }
271            }
272        });
273    }
274
275    // ── DENM RX thread ────────────────────────────────────────────────────────
276    {
277        let cnt = denm_rx_cnt.clone();
278        let end = bench_end;
279        thread::spawn(move || loop {
280            let now = Instant::now();
281            if now >= end {
282                break;
283            }
284            let timeout = (end - now).min(Duration::from_millis(500));
285            match denm_rx.recv_timeout(timeout) {
286                Ok(_) => {
287                    cnt.fetch_add(1, Ordering::Relaxed);
288                }
289                Err(mpsc::RecvTimeoutError::Timeout) => {}
290                Err(mpsc::RecvTimeoutError::Disconnected) => break,
291            }
292        });
293    }
294
295    // ── 1 Hz stats printer (main thread) ─────────────────────────────────────
296    println!(
297        "{:>6}  {:>8}  {:>8}  {:>8}  {:>8}  {:>8}  {:>8}  {:>8}  {:>8}  {:>8}",
298        "t(s)",
299        "cam_tx",
300        "cam_rx",
301        "cam_dec_µs",
302        "vam_tx",
303        "vam_rx",
304        "vam_dec_µs",
305        "den_tx",
306        "den_rx",
307        "rx_err",
308    );
309
310    let mut prev_cam_rx: u64 = 0;
311    let mut prev_vam_rx: u64 = 0;
312    let mut prev_denm_rx: u64 = 0;
313
314    let bench_start = Instant::now();
315    loop {
316        thread::sleep(Duration::from_secs(1));
317        let t = bench_start.elapsed().as_secs_f64();
318        if t >= duration_s as f64 + 1.5 {
319            break;
320        }
321
322        let ctx = cam_tx.load(Ordering::Relaxed);
323        let crx = cam_rx.load(Ordering::Relaxed);
324        let vtx = vam_tx.load(Ordering::Relaxed);
325        let vrx = vam_rx.load(Ordering::Relaxed);
326        let dtx = denm_tx.load(Ordering::Relaxed);
327        let drx = denm_rx_cnt.load(Ordering::Relaxed);
328        let errs = rx_err.load(Ordering::Relaxed);
329        let cdu = cam_dec_us_total.load(Ordering::Relaxed);
330        let vdu = vam_dec_us_total.load(Ordering::Relaxed);
331
332        let cam_dec_avg = if crx > 0 { cdu / crx } else { 0 };
333        let vam_dec_avg = if vrx > 0 { vdu / vrx } else { 0 };
334
335        let cam_rx_rate = (crx - prev_cam_rx) as f64;
336        let vam_rx_rate = (vrx - prev_vam_rx) as f64;
337        let denm_rx_rate = (drx - prev_denm_rx) as f64;
338        prev_cam_rx = crx;
339        prev_vam_rx = vrx;
340        prev_denm_rx = drx;
341
342        println!(
343            "{:>6.1}  {:>8}  {:>8.1}  {:>10}  {:>8}  {:>8.1}  {:>10}  {:>8}  {:>8.1}  {:>8}",
344            t,
345            ctx,
346            cam_rx_rate,
347            cam_dec_avg,
348            vtx,
349            vam_rx_rate,
350            vam_dec_avg,
351            dtx,
352            denm_rx_rate,
353            errs,
354        );
355
356        if t > duration_s as f64 {
357            break;
358        }
359    }
360
361    // ── Final summary ─────────────────────────────────────────────────────────
362    let elapsed = bench_start.elapsed().as_secs_f64();
363
364    let ctx = cam_tx.load(Ordering::Relaxed);
365    let crx = cam_rx.load(Ordering::Relaxed);
366    let vtx = vam_tx.load(Ordering::Relaxed);
367    let vrx = vam_rx.load(Ordering::Relaxed);
368    let dtx = denm_tx.load(Ordering::Relaxed);
369    let drx = denm_rx_cnt.load(Ordering::Relaxed);
370    let errs = rx_err.load(Ordering::Relaxed);
371    let cdu = cam_dec_us_total.load(Ordering::Relaxed);
372    let vdu = vam_dec_us_total.load(Ordering::Relaxed);
373
374    let cam_dec_avg = if crx > 0 { cdu / crx } else { 0 };
375    let vam_dec_avg = if vrx > 0 { vdu / vrx } else { 0 };
376
377    let cam_ratio = if ctx > 0 {
378        crx as f64 / ctx as f64 * 100.0
379    } else {
380        0.0
381    };
382    let vam_ratio = if vtx > 0 {
383        vrx as f64 / vtx as f64 * 100.0
384    } else {
385        0.0
386    };
387    let denm_ratio = if dtx > 0 {
388        drx as f64 / dtx as f64 * 100.0
389    } else {
390        0.0
391    };
392
393    println!();
394    println!("=== Congestion Benchmark Results ({elapsed:.1} s) ===");
395    println!();
396    println!(
397        "  CAM   TX: {ctx:>8}  RX: {crx:>8}  ratio: {cam_ratio:>6.1}%  avg_dec: {cam_dec_avg} µs"
398    );
399    println!(
400        "  VAM   TX: {vtx:>8}  RX: {vrx:>8}  ratio: {vam_ratio:>6.1}%  avg_dec: {vam_dec_avg} µs"
401    );
402    println!("  DENM  TX: {dtx:>8}  RX: {drx:>8}  ratio: {denm_ratio:>6.1}%");
403    println!("  RX errors: {errs}");
404}
405
406// ─── CAM helpers ─────────────────────────────────────────────────────────────
407
408fn make_cam(station_id: u32) -> Cam {
409    let hf = BasicVehicleContainerHighFrequency::new(
410        Heading::new(HeadingValue(900), HeadingConfidence(127)),
411        Speed::new(SpeedValue(0), SpeedConfidence(127)),
412        DriveDirection::unavailable,
413        VehicleLength::new(
414            VehicleLengthValue(1023),
415            VehicleLengthConfidenceIndication::unavailable,
416        ),
417        VehicleWidth(62),
418        AccelerationComponent::new(AccelerationValue(161), AccelerationConfidence(102)),
419        Curvature::new(CurvatureValue(1023), CurvatureConfidence::unavailable),
420        CurvatureCalculationMode::unavailable,
421        YawRate::new(YawRateValue(32767), YawRateConfidence::unavailable),
422        None,
423        None,
424        None,
425        None,
426        None,
427        None,
428        None,
429    );
430    Cam::new(
431        cam_header(station_id),
432        CamPayload::new(
433            cam_gdt(),
434            CamParameters::new(
435                BasicContainer::new(
436                    TrafficParticipantType(5),
437                    ReferencePositionWithConfidence::new(
438                        Latitude(415_520_000),
439                        Longitude(21_340_000),
440                        PositionConfidenceEllipse::new(
441                            SemiAxisLength(4095),
442                            SemiAxisLength(4095),
443                            Wgs84AngleValue(3601),
444                        ),
445                        Altitude::new(AltitudeValue(12000), AltitudeConfidence::unavailable),
446                    ),
447                ),
448                HighFrequencyContainer::basicVehicleContainerHighFrequency(hf),
449                None,
450                None,
451                None,
452            ),
453        ),
454    )
455}
456
457fn cam_btp_request(data: Vec<u8>) -> BTPDataRequest {
458    BTPDataRequest {
459        btp_type: CommonNH::BtpB,
460        source_port: 0,
461        destination_port: 2001,
462        destination_port_info: 0,
463        gn_packet_transport_type: PacketTransportType {
464            header_type: HeaderType::Tsb,
465            header_sub_type: HeaderSubType::TopoBroadcast(TopoBroadcastHST::SingleHop),
466        },
467        gn_destination_address: GNAddress {
468            m: M::GnMulticast,
469            st: ST::Unknown,
470            mid: MID::new([0xFF; 6]),
471        },
472        communication_profile: CommunicationProfile::Unspecified,
473        gn_area: Area {
474            latitude: 0,
475            longitude: 0,
476            a: 0,
477            b: 0,
478            angle: 0,
479        },
480        traffic_class: TrafficClass {
481            scf: false,
482            channel_offload: false,
483            tc_id: 0,
484        },
485        security_profile: SecurityProfile::NoSecurity,
486        its_aid: 36,
487        security_permissions: vec![],
488        gn_max_hop_limit: 1,
489        gn_max_packet_lifetime: None,
490        gn_repetition_interval: None,
491        gn_max_repetition_time: None,
492        destination: None,
493        length: data.len() as u16,
494        data,
495    }
496}
497
498// ─── VAM helpers ─────────────────────────────────────────────────────────────
499
500fn make_vam(station_id: u32) -> Vam {
501    // VruHighFrequencyContainer::new takes 14 args:
502    // heading, speed, longitudinal_acceleration,
503    // curvature, curvature_calculation_mode, yaw_rate,
504    // lateral_acceleration, vertical_acceleration,
505    // vru_lane_position, environment, movement_control,
506    // orientation, roll_angle, device_usage
507    let hf = VruHighFrequencyContainer::new(
508        VamWgs84Angle::new(VamWgs84AngleVal(3601), VamWgs84AnglConf(127)), // heading unavailable
509        VamSpeed::new(VamSpeedVal(0), VamSpeedConf(127)),                  // speed 0
510        VamLongAccel::new(VamLongAccelVal(161), VamAccelConf(102)),        // accel unavailable
511        None,
512        None,
513        None,
514        None,
515        None,
516        None,
517        None,
518        None,
519        None,
520        None,
521        None,
522    );
523    Vam::new(
524        vam_header(station_id),
525        VruAwareness::new(
526            vam_gdt(),
527            VamParameters::new(
528                VamBasicContainer::new(
529                    VamTPT(1), // pedestrian
530                    VamRefPos::new(
531                        VamLat(415_520_000),
532                        VamLon(21_340_000),
533                        VamPCE::new(VamSAL(4095), VamSAL(4095), VamWgs84AngleVal(3601)),
534                        VamAlt::new(VamAltVal(12000), VamAltConf::unavailable),
535                    ),
536                ),
537                hf,
538                None,
539                None,
540                None,
541                None, // lf, cluster_info, cluster_op, motion_pred
542            ),
543        ),
544    )
545}
546
547fn vam_btp_request(data: Vec<u8>) -> BTPDataRequest {
548    BTPDataRequest {
549        btp_type: CommonNH::BtpB,
550        source_port: 0,
551        destination_port: 2018,
552        destination_port_info: 0,
553        gn_packet_transport_type: PacketTransportType {
554            header_type: HeaderType::Tsb,
555            header_sub_type: HeaderSubType::TopoBroadcast(TopoBroadcastHST::SingleHop),
556        },
557        gn_destination_address: GNAddress {
558            m: M::GnMulticast,
559            st: ST::Unknown,
560            mid: MID::new([0xFF; 6]),
561        },
562        communication_profile: CommunicationProfile::Unspecified,
563        gn_area: Area {
564            latitude: 0,
565            longitude: 0,
566            a: 0,
567            b: 0,
568            angle: 0,
569        },
570        traffic_class: TrafficClass {
571            scf: false,
572            channel_offload: false,
573            tc_id: 0,
574        },
575        security_profile: SecurityProfile::NoSecurity,
576        its_aid: 16513,
577        security_permissions: vec![],
578        gn_max_hop_limit: 1,
579        gn_max_packet_lifetime: None,
580        gn_repetition_interval: None,
581        gn_max_repetition_time: None,
582        destination: None,
583        length: data.len() as u16,
584        data,
585    }
586}
587
588// ─── Shared helpers ───────────────────────────────────────────────────────────
589
590fn random_mac() -> [u8; 6] {
591    use std::time::{SystemTime, UNIX_EPOCH};
592    let s = SystemTime::now()
593        .duration_since(UNIX_EPOCH)
594        .unwrap()
595        .subsec_nanos();
596    [
597        0x02,
598        (s >> 24) as u8,
599        (s >> 16) as u8,
600        (s >> 8) as u8,
601        s as u8,
602        0xDD,
603    ]
604}
605
606fn wire_routers(
607    gn: &RouterHandle,
608    btp: &BTPRouterHandle,
609    ll_rx: mpsc::Receiver<Vec<u8>>,
610    gn_btp_rx: mpsc::Receiver<GNDataIndication>,
611    btp_gn_rx: mpsc::Receiver<GNDataRequest>,
612) {
613    let g1 = gn.clone();
614    thread::spawn(move || {
615        while let Ok(p) = ll_rx.recv() {
616            g1.send_incoming_packet(p);
617        }
618    });
619    let b1 = btp.clone();
620    thread::spawn(move || {
621        while let Ok(i) = gn_btp_rx.recv() {
622            b1.send_gn_data_indication(i);
623        }
624    });
625    let g2 = gn.clone();
626    thread::spawn(move || {
627        while let Ok(r) = btp_gn_rx.recv() {
628            g2.send_gn_data_request(r);
629        }
630    });
631}