#[cfg(test)]
mod threaded_system_tests {
use sedsnet::RouteSelectionMode;
use sedsnet::TelemetryResult;
use sedsnet::config::{
DataEndpoint, DataType, data_type_definition_by_name, endpoint_definition_by_name,
register_data_type_with_description, register_endpoint_with_description,
};
use sedsnet::discovery::{DISCOVERY_ROUTE_TTL_MS, build_discovery_announce};
use sedsnet::packet::Packet;
use sedsnet::relay::Relay;
use sedsnet::router::{Clock, EndpointHandler, Router, RouterConfig};
use sedsnet::{MessageClass, MessageDataType, MessageElement, ReliableMode};
use std::sync::Arc;
use std::sync::Mutex;
use std::sync::Once;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::mpsc;
use std::thread;
use std::time::{Duration, Instant};
fn zero_clock() -> Box<dyn Clock + Send + Sync> {
Box::new(|| 0u64)
}
fn ensure_common_test_schema() {
static INIT: Once = Once::new();
INIT.call_once(|| {
if endpoint_definition_by_name("RADIO").is_none() {
register_endpoint_with_description(
"RADIO",
"Radio or external link (telemetry uplink/downlink).",
false,
)
.unwrap();
}
if endpoint_definition_by_name("SD_CARD").is_none() {
register_endpoint_with_description(
"SD_CARD",
"On-board storage (e.g. SD card / flash).",
false,
)
.unwrap();
}
if data_type_definition_by_name("GPS_DATA").is_none() {
register_data_type_with_description(
"GPS_DATA",
"GPS data (typically 3x f32: latitude, longitude, altitude).",
MessageElement::Static(3, MessageDataType::Float32, MessageClass::Data),
&[DataEndpoint::named("RADIO"), DataEndpoint::named("SD_CARD")],
ReliableMode::Ordered,
80,
)
.unwrap();
}
if data_type_definition_by_name("BATTERY_STATUS").is_none() {
register_data_type_with_description(
"BATTERY_STATUS",
"Battery status (e.g. voltage, current, etc.).",
MessageElement::Static(2, MessageDataType::Float32, MessageClass::Data),
&[DataEndpoint::named("RADIO"), DataEndpoint::named("SD_CARD")],
ReliableMode::None,
60,
)
.unwrap();
}
});
}
struct SharedClock {
now_ms: Arc<AtomicU64>,
}
impl Clock for SharedClock {
fn now_ms(&self) -> u64 {
self.now_ms.load(Ordering::SeqCst)
}
}
struct SimNode {
router: Arc<Router>,
radio_hits: Arc<AtomicUsize>,
sd_hits: Arc<AtomicUsize>,
}
fn make_radio_handler(counter: Arc<AtomicUsize>) -> EndpointHandler {
EndpointHandler::new_packet_handler(DataEndpoint::named("RADIO"), move |_pkt: &Packet| {
counter.fetch_add(1, Ordering::SeqCst);
Ok(())
})
}
fn make_sd_handler(counter: Arc<AtomicUsize>) -> EndpointHandler {
EndpointHandler::new_packet_handler(DataEndpoint::named("SD_CARD"), move |_pkt: &Packet| {
counter.fetch_add(1, Ordering::SeqCst);
Ok(())
})
}
fn make_series(buf: &mut [f32], base: f32) {
for (i, v) in buf.iter_mut().enumerate() {
*v = base + (i as f32) * 0.25;
}
}
fn make_packet(ty: DataType, vals: &[f32], ts: u64) -> Packet {
Packet::from_f32_slice(
ty,
vals,
&[DataEndpoint::named("SD_CARD"), DataEndpoint::named("RADIO")],
ts,
)
.unwrap()
}
fn count_packets_of_type(pkts: &[Packet], ty: DataType) -> usize {
pkts.iter().filter(|pkt| pkt.data_type() == ty).count()
}
#[test]
fn router_runtime_route_modes_work_in_system_flow() {
ensure_common_test_schema();
let now_ms = Arc::new(AtomicU64::new(0));
let seen_a: Arc<Mutex<Vec<Packet>>> = Arc::new(Mutex::new(Vec::new()));
let seen_b: Arc<Mutex<Vec<Packet>>> = Arc::new(Mutex::new(Vec::new()));
let seen_a_c = seen_a.clone();
let seen_b_c = seen_b.clone();
let router = Router::new_with_clock(
RouterConfig::default(),
Box::new(SharedClock {
now_ms: now_ms.clone(),
}),
);
let side_a = router.add_side_packet("A", move |pkt: &Packet| -> TelemetryResult<()> {
seen_a_c.lock().unwrap().push(pkt.clone());
Ok(())
});
let side_b = router.add_side_packet("B", move |pkt: &Packet| -> TelemetryResult<()> {
seen_b_c.lock().unwrap().push(pkt.clone());
Ok(())
});
let discovery_a =
build_discovery_announce("REMOTE_A", 0, &[DataEndpoint::named("RADIO")]).unwrap();
router
.rx_packed_queue_from_side(&sedsnet::wire_format::pack_packet(&discovery_a), side_a)
.unwrap();
router.process_all_queues().unwrap();
now_ms.store(DISCOVERY_ROUTE_TTL_MS / 2, Ordering::SeqCst);
let discovery_b = build_discovery_announce(
"REMOTE_B",
DISCOVERY_ROUTE_TTL_MS / 2,
&[DataEndpoint::named("RADIO")],
)
.unwrap();
router
.rx_packed_queue_from_side(&sedsnet::wire_format::pack_packet(&discovery_b), side_b)
.unwrap();
router.process_all_queues().unwrap();
seen_a.lock().unwrap().clear();
seen_b.lock().unwrap().clear();
router
.set_source_route_mode(None, RouteSelectionMode::Weighted)
.unwrap();
router.set_route_weight(None, side_a, 2).unwrap();
router.set_route_weight(None, side_b, 1).unwrap();
for seq in 0..6 {
let pkt = Packet::from_f32_slice(
DataType::named("GPS_DATA"),
&[seq as f32, seq as f32 + 1.0, seq as f32 + 2.0],
&[DataEndpoint::named("RADIO")],
seq as u64,
)
.unwrap();
router.tx_queue(pkt).unwrap();
}
router.process_tx_queue().unwrap();
assert_eq!(
count_packets_of_type(&seen_a.lock().unwrap(), DataType::named("GPS_DATA")),
4
);
assert_eq!(
count_packets_of_type(&seen_b.lock().unwrap(), DataType::named("GPS_DATA")),
2
);
router
.set_source_route_mode(None, RouteSelectionMode::Failover)
.unwrap();
router.set_route_priority(None, side_a, 0).unwrap();
router.set_route_priority(None, side_b, 1).unwrap();
now_ms.store(DISCOVERY_ROUTE_TTL_MS + 1, Ordering::SeqCst);
let failover_pkt = Packet::from_f32_slice(
DataType::named("BATTERY_STATUS"),
&[7.0, 8.0],
&[DataEndpoint::named("RADIO")],
99,
)
.unwrap();
router.tx_queue(failover_pkt).unwrap();
router.process_tx_queue().unwrap();
assert_eq!(
count_packets_of_type(&seen_a.lock().unwrap(), DataType::named("GPS_DATA")),
4
);
assert_eq!(
count_packets_of_type(&seen_b.lock().unwrap(), DataType::named("GPS_DATA")),
2
);
assert_eq!(
count_packets_of_type(&seen_b.lock().unwrap(), DataType::named("BATTERY_STATUS")),
1
);
}
#[test]
fn relay_runtime_route_modes_work_in_system_flow() {
ensure_common_test_schema();
let now_ms = Arc::new(AtomicU64::new(0));
let seen_a: Arc<Mutex<Vec<Packet>>> = Arc::new(Mutex::new(Vec::new()));
let seen_b: Arc<Mutex<Vec<Packet>>> = Arc::new(Mutex::new(Vec::new()));
let seen_a_c = seen_a.clone();
let seen_b_c = seen_b.clone();
let relay = Relay::new(Box::new(SharedClock {
now_ms: now_ms.clone(),
}));
let side_a = relay.add_side_packet("A", move |pkt: &Packet| -> TelemetryResult<()> {
seen_a_c.lock().unwrap().push(pkt.clone());
Ok(())
});
let side_b = relay.add_side_packet("B", move |pkt: &Packet| -> TelemetryResult<()> {
seen_b_c.lock().unwrap().push(pkt.clone());
Ok(())
});
let ingress = relay.add_side_packet("INGRESS", |_pkt: &Packet| Ok(()));
let discovery_a =
build_discovery_announce("REMOTE_A", 0, &[DataEndpoint::named("RADIO")]).unwrap();
relay
.rx_packed_from_side(side_a, &sedsnet::wire_format::pack_packet(&discovery_a))
.unwrap();
relay.process_all_queues().unwrap();
now_ms.store(DISCOVERY_ROUTE_TTL_MS / 2, Ordering::SeqCst);
let discovery_b = build_discovery_announce(
"REMOTE_B",
DISCOVERY_ROUTE_TTL_MS / 2,
&[DataEndpoint::named("RADIO")],
)
.unwrap();
relay
.rx_packed_from_side(side_b, &sedsnet::wire_format::pack_packet(&discovery_b))
.unwrap();
relay.process_all_queues().unwrap();
seen_a.lock().unwrap().clear();
seen_b.lock().unwrap().clear();
relay
.set_source_route_mode(Some(ingress), RouteSelectionMode::Weighted)
.unwrap();
relay.set_route_weight(Some(ingress), side_a, 2).unwrap();
relay.set_route_weight(Some(ingress), side_b, 1).unwrap();
for seq in 0..6 {
let pkt = Packet::from_f32_slice(
DataType::named("GPS_DATA"),
&[seq as f32, seq as f32 + 1.0, seq as f32 + 2.0],
&[DataEndpoint::named("RADIO")],
seq as u64,
)
.unwrap();
relay.rx_from_side(ingress, pkt).unwrap();
}
relay.process_all_queues().unwrap();
assert_eq!(
count_packets_of_type(&seen_a.lock().unwrap(), DataType::named("GPS_DATA")),
4
);
assert_eq!(
count_packets_of_type(&seen_b.lock().unwrap(), DataType::named("GPS_DATA")),
2
);
relay
.set_source_route_mode(Some(ingress), RouteSelectionMode::Failover)
.unwrap();
relay.set_route_priority(Some(ingress), side_a, 0).unwrap();
relay.set_route_priority(Some(ingress), side_b, 1).unwrap();
relay.remove_side(side_a).unwrap();
let failover_pkt = Packet::from_f32_slice(
DataType::named("BATTERY_STATUS"),
&[7.0, 8.0],
&[DataEndpoint::named("RADIO")],
99,
)
.unwrap();
relay.rx_from_side(ingress, failover_pkt).unwrap();
relay.process_all_queues().unwrap();
assert_eq!(
count_packets_of_type(&seen_a.lock().unwrap(), DataType::named("GPS_DATA")),
4
);
assert_eq!(
count_packets_of_type(&seen_b.lock().unwrap(), DataType::named("GPS_DATA")),
2
);
assert_eq!(
count_packets_of_type(&seen_b.lock().unwrap(), DataType::named("BATTERY_STATUS")),
1
);
}
#[test]
fn threaded_system_sim_rust() {
ensure_common_test_schema();
type BusMsg = (usize, Vec<u8>);
let (bus1_tx, bus1_rx) = mpsc::channel::<BusMsg>();
let (bus2_tx, bus2_rx) = mpsc::channel::<BusMsg>();
let relay = Arc::new(Relay::new(zero_clock()));
let relay_bus1_tx = bus1_tx.clone();
let bus1_side_id =
relay.add_side_packed("bus1", move |bytes: &[u8]| -> TelemetryResult<()> {
let _ = relay_bus1_tx.send((usize::MAX, bytes.to_vec()));
Ok(())
});
let relay_bus2_tx = bus2_tx.clone();
let bus2_side_id =
relay.add_side_packed("bus2", move |bytes: &[u8]| -> TelemetryResult<()> {
let _ = relay_bus2_tx.send((usize::MAX, bytes.to_vec()));
Ok(())
});
relay
.set_route(Some(bus1_side_id), bus2_side_id, true)
.unwrap();
relay
.set_route(Some(bus2_side_id), bus1_side_id, true)
.unwrap();
let stop_flag = Arc::new(AtomicBool::new(false));
let mut nodes: Vec<SimNode> = Vec::new();
for (idx, _) in ["Radio Board", "Flight Controller Board", "Power Board"]
.iter()
.enumerate()
{
let radio_hits = Arc::new(AtomicUsize::new(0));
let sd_hits = Arc::new(AtomicUsize::new(0));
let mut handlers = Vec::<EndpointHandler>::new();
match idx {
0 => {
handlers.push(make_radio_handler(radio_hits.clone()));
}
1 => {
handlers.push(make_sd_handler(sd_hits.clone()));
}
_ => {}
}
let clock = zero_clock();
let (local_bus_tx, _relay_side_id) = if idx <= 1 {
(bus1_tx.clone(), bus1_side_id)
} else {
(bus2_tx.clone(), bus2_side_id)
};
let tx = move |bytes: &[u8]| -> TelemetryResult<()> {
let _ = local_bus_tx.send((idx, bytes.to_vec()));
Ok(())
};
let router = if handlers.is_empty() {
Router::new_with_clock(RouterConfig::default(), clock)
} else {
Router::new_with_clock(RouterConfig::new(handlers), clock)
};
router.add_side_packed("bus", tx);
nodes.push(SimNode {
router: Arc::new(router),
radio_hits,
sd_hits,
});
}
let nodes = Arc::new(nodes);
let bus1_nodes = vec![0usize, 1usize];
let bus1_nodes_arc = nodes.clone();
let bus1_stop = stop_flag.clone();
let bus1_relay = relay.clone();
let bus1_handle = thread::spawn(move || {
while !bus1_stop.load(Ordering::SeqCst) {
match bus1_rx.recv_timeout(Duration::from_millis(10)) {
Ok((from, frame)) => {
for idx in &bus1_nodes {
if *idx != from {
bus1_nodes_arc[*idx]
.router
.rx_packed_queue(&frame)
.expect("bus1: rx_packed_packet_to_queue failed");
}
}
bus1_relay
.rx_packed_from_side(bus1_side_id, &frame)
.expect("bus1 -> relay failed");
}
Err(mpsc::RecvTimeoutError::Timeout) => {
}
Err(mpsc::RecvTimeoutError::Disconnected) => break,
}
}
while let Ok((from, frame)) = bus1_rx.try_recv() {
for idx in &bus1_nodes {
if *idx != from {
bus1_nodes_arc[*idx]
.router
.rx_packed_queue(&frame)
.expect("bus1: rx_packed_packet_to_queue failed (drain)");
}
}
bus1_relay
.rx_packed_from_side(bus1_side_id, &frame)
.expect("bus1 -> relay failed (drain)");
}
});
let bus2_nodes = vec![2usize];
let bus2_nodes_arc = nodes.clone();
let bus2_stop = stop_flag.clone();
let bus2_relay = relay.clone();
let bus2_handle = thread::spawn(move || {
while !bus2_stop.load(Ordering::SeqCst) {
match bus2_rx.recv_timeout(Duration::from_millis(10)) {
Ok((from, frame)) => {
for idx in &bus2_nodes {
if *idx != from {
bus2_nodes_arc[*idx]
.router
.rx_packed_queue(&frame)
.expect("bus2: rx_packed_packet_to_queue failed");
}
}
bus2_relay
.rx_packed_from_side(bus2_side_id, &frame)
.expect("bus2 -> relay failed");
}
Err(mpsc::RecvTimeoutError::Timeout) => {}
Err(mpsc::RecvTimeoutError::Disconnected) => break,
}
}
while let Ok((from, frame)) = bus2_rx.try_recv() {
for idx in &bus2_nodes {
if *idx != from {
bus2_nodes_arc[*idx]
.router
.rx_packed_queue(&frame)
.expect("bus2: rx_packed_packet_to_queue failed (drain)");
}
}
bus2_relay
.rx_packed_from_side(bus2_side_id, &frame)
.expect("bus2 -> relay failed (drain)");
}
});
let relay_stop = stop_flag.clone();
let relay_clone = relay.clone();
let relay_handle = thread::spawn(move || {
while !relay_stop.load(Ordering::SeqCst) {
relay_clone
.process_all_queues_with_timeout(5)
.expect("relay process_all_queues_with_timeout failed");
thread::sleep(Duration::from_millis(1));
}
for _ in 0..50 {
relay_clone
.process_all_queues_with_timeout(0)
.expect("relay final drain failed");
thread::sleep(Duration::from_millis(1));
}
});
let mut proc_handles = Vec::new();
for node in nodes.iter() {
let r = node.router.clone();
let stop = stop_flag.clone();
let handle = thread::spawn(move || {
while !stop.load(Ordering::SeqCst) {
r.process_all_queues_with_timeout(5).unwrap();
thread::sleep(Duration::from_millis(1));
}
for _ in 0..50 {
r.process_all_queues_with_timeout(0).unwrap();
thread::sleep(Duration::from_millis(1));
}
});
proc_handles.push(handle);
}
let radio_router = nodes[0].router.clone();
let flight_router = nodes[1].router.clone();
let power_router = nodes[2].router.clone();
let sender_a = thread::spawn(move || {
let mut buf = [0.0_f32; 8];
for i in 0..5 {
make_series(&mut buf[..3], 10.0);
let pkt = make_packet(DataType::named("GPS_DATA"), &buf[..3], i);
radio_router.tx(pkt).unwrap();
thread::sleep(Duration::from_millis(5));
}
});
let sender_b = thread::spawn(move || {
let mut buf = [0.0_f32; 8];
for i in 0..5 {
make_series(&mut buf[..3], 0.5);
let pkt1 = make_packet(DataType::named("GPS_DATA"), &buf[..3], i);
flight_router.tx(pkt1).unwrap();
thread::sleep(Duration::from_millis(5));
make_series(&mut buf[..3], 101.3);
let pkt2 = make_packet(DataType::named("GPS_DATA"), &buf[..3], i + 100);
flight_router.tx(pkt2).unwrap();
thread::sleep(Duration::from_millis(5));
}
});
let sender_c = thread::spawn(move || {
let mut buf = [0.0_f32; 8];
for i in 0..5 {
make_series(&mut buf[..2], 3.7);
let pkt1 = make_packet(DataType::named("BATTERY_STATUS"), &buf[..2], i + 200);
power_router.tx(pkt1).unwrap();
thread::sleep(Duration::from_millis(5));
let msg = "hello world!";
let pkt2 = Packet::from_str_slice(
DataType::TelemetryError,
msg,
&[DataEndpoint::named("SD_CARD"), DataEndpoint::named("RADIO")],
i + 300,
)
.unwrap();
power_router.tx(pkt2).unwrap();
thread::sleep(Duration::from_millis(5));
}
});
sender_a.join().expect("sender A panicked");
sender_b.join().expect("sender B panicked");
sender_c.join().expect("sender C panicked");
let expected_total = 20;
let deadline = Instant::now() + Duration::from_secs(5);
loop {
let a_radio = nodes[0].radio_hits.load(Ordering::SeqCst);
let b_sd = nodes[1].sd_hits.load(Ordering::SeqCst);
if a_radio >= expected_total && b_sd >= expected_total {
break;
}
if Instant::now() > deadline {
eprintln!(
"Timeout waiting for processors: A.radio_hits={}, B.sd_hits={}",
a_radio, b_sd
);
break;
}
thread::sleep(Duration::from_millis(10));
}
stop_flag.store(true, Ordering::SeqCst);
for handle in proc_handles {
handle.join().expect("processor thread panicked");
}
bus1_handle.join().expect("bus1 thread panicked");
bus2_handle.join().expect("bus2 thread panicked");
relay_handle.join().expect("relay thread panicked");
let radio_board = &nodes[0];
let flight_board = &nodes[1];
let power_board = &nodes[2];
println!(
"A.radio_hits={}, B.sd_hits={}, C.radio_hits={}, C.sd_hits={}",
radio_board.radio_hits.load(Ordering::SeqCst),
flight_board.sd_hits.load(Ordering::SeqCst),
power_board.radio_hits.load(Ordering::SeqCst),
power_board.sd_hits.load(Ordering::SeqCst),
);
assert!(
radio_board.radio_hits.load(Ordering::SeqCst) >= expected_total,
"Radio Board should see at least {} packets",
expected_total
);
assert!(
flight_board.sd_hits.load(Ordering::SeqCst) >= expected_total,
"Flight Controller Board should see at least {} SD packets",
expected_total
);
assert_eq!(
power_board.radio_hits.load(Ordering::SeqCst),
0,
"Power Board must not have a radio handler"
);
assert_eq!(
power_board.sd_hits.load(Ordering::SeqCst),
0,
"Power Board must not have an SD handler"
);
}
}