use bytes::Bytes;
use mavlink::{MavHeader, MavlinkVersion};
use std::fmt;
use tokio::sync::broadcast;
use crate::mavlink_utils::MessageTarget;
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct EndpointId(pub usize);
impl fmt::Display for EndpointId {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "Endpoint({})", self.0)
}
}
#[derive(Clone, Debug)]
pub struct RoutedMessage {
pub source_id: EndpointId,
pub header: MavHeader,
pub message_id: u32,
#[allow(dead_code)]
pub version: MavlinkVersion,
pub timestamp_us: u64,
pub serialized_bytes: Bytes,
pub target: MessageTarget,
}
#[derive(Clone)]
pub struct MessageBus {
pub tx: broadcast::Sender<RoutedMessage>,
}
impl MessageBus {
pub fn subscribe(&self) -> broadcast::Receiver<RoutedMessage> {
self.tx.subscribe()
}
pub fn sender(&self) -> broadcast::Sender<RoutedMessage> {
self.tx.clone()
}
}
pub fn create_bus(capacity: usize) -> MessageBus {
let (tx, _rx) = broadcast::channel(capacity);
MessageBus { tx }
}
#[cfg(test)]
#[allow(clippy::expect_used)]
mod tests {
use super::*;
#[tokio::test]
async fn test_bus_filtering() {
let bus = create_bus(1000); let mut rx = bus.subscribe();
let msg = RoutedMessage {
source_id: EndpointId(1),
header: MavHeader::default(),
message_id: 0, version: MavlinkVersion::V2,
timestamp_us: 0,
serialized_bytes: Bytes::new(),
target: MessageTarget {
system_id: 0,
component_id: 0,
},
};
bus.tx
.send(msg.clone())
.expect("Failed to send test message");
let received = rx.recv().await.expect("Failed to receive test message");
assert_eq!(received.source_id, EndpointId(1));
}
}