use alloc::string::String;
use alloc::vec::Vec;
/// EtherType value associated with the raw-Ethernet carrier.
///
/// NOTE(review): presumably the EtherType registered for OPC UA PubSub frames —
/// confirm against the OPC UA PubSub specification before relying on it.
pub const ETHERNET_ETHERTYPE: u16 = 0xB62C;
/// Default port number for the UADP (UDP) carrier.
///
/// NOTE(review): 4840 looks like the registered OPC UA port — confirm.
pub const DEFAULT_UADP_PORT: u16 = 4840;
/// Errors reported by the transports and client abstractions in this file.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TransportError {
/// An underlying I/O operation failed; the string carries the error text.
Io(String),
/// Nothing was available to receive. Also produced for would-block and
/// timed-out socket operations.
Timeout,
/// A datagram was longer than the carrier allows.
TooLarge {
/// Length in bytes of the rejected datagram.
len: usize,
/// Largest length in bytes the carrier accepts.
max: usize,
},
/// The transport can no longer be used (for example, the loopback queue's
/// lock was poisoned).
Closed,
}
/// Human-readable, single-line rendering of each error variant.
impl core::fmt::Display for TransportError {
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        match self {
            // Fixed messages need no formatting machinery.
            Self::Timeout => f.write_str("transport receive timed out"),
            Self::Closed => f.write_str("transport is closed"),
            // Messages that interpolate variant data.
            Self::Io(detail) => write!(f, "transport I/O error: {detail}"),
            Self::TooLarge { len, max } => {
                write!(f, "datagram of {len} bytes exceeds the {max}-byte limit")
            }
        }
    }
}
// Marks `TransportError` as a `std::error::Error`. Only compiled when the
// `std` feature is enabled; the impl relies entirely on the trait's defaults.
#[cfg(feature = "std")]
impl std::error::Error for TransportError {}
/// Common interface of every carrier in this file: push one datagram out,
/// pull one datagram in. Both methods take `&self`.
pub trait PubSubTransport {
/// Hands `datagram` to the carrier as a single message.
fn send(&self, datagram: &[u8]) -> Result<(), TransportError>;
/// Returns the next received message as an owned byte vector.
///
/// The adapters in this file that poll an `Option`-returning client map
/// `None` to `TransportError::Timeout`.
fn receive(&self) -> Result<Vec<u8>, TransportError>;
}
/// Minimal MQTT client abstraction that `MqttTransport` is built on.
pub trait MqttClient {
/// Publishes `payload` on `topic`.
fn publish(&self, topic: &str, payload: &[u8]) -> Result<(), TransportError>;
/// Returns the next incoming message as a `(topic, payload)` pair.
fn next_message(&self) -> Result<(String, Vec<u8>), TransportError>;
}
/// Builds the MQTT topic `<prefix>/<publisher_id>/<writer_group>`.
///
/// The three segments are copied verbatim and joined with single `/`
/// separators; nothing is trimmed, escaped or validated, so empty segments
/// produce adjacent slashes.
#[must_use]
pub fn mqtt_topic(prefix: &str, publisher_id: &str, writer_group: &str) -> String {
    [prefix, publisher_id, writer_group].join("/")
}
/// `PubSubTransport` adapter over an `MqttClient` with a fixed publish topic.
#[derive(Debug, Clone)]
pub struct MqttTransport<C: MqttClient> {
/// The wrapped MQTT client.
client: C,
/// Topic that every `send` publishes on.
publish_topic: String,
}
impl<C: MqttClient> MqttTransport<C> {
    /// Wraps `client` and remembers `publish_topic` as the topic used by
    /// every subsequent `send`.
    pub fn new(client: C, publish_topic: impl Into<String>) -> Self {
        let publish_topic: String = publish_topic.into();
        Self {
            client,
            publish_topic,
        }
    }

    /// Borrows the wrapped MQTT client.
    pub fn client(&self) -> &C {
        let Self { client, .. } = self;
        client
    }
}
impl<C: MqttClient> PubSubTransport for MqttTransport<C> {
    /// Publishes `datagram` on the topic chosen at construction time.
    fn send(&self, datagram: &[u8]) -> Result<(), TransportError> {
        let topic = self.publish_topic.as_str();
        self.client.publish(topic, datagram)
    }

    /// Returns the payload of the client's next message. The topic it
    /// arrived on is discarded, and client errors pass through unchanged.
    fn receive(&self) -> Result<Vec<u8>, TransportError> {
        let (_topic, payload) = self.client.next_message()?;
        Ok(payload)
    }
}
/// Minimal AMQP client abstraction that `AmqpTransport` is built on.
pub trait AmqpClient {
/// Sends `payload` to `address`.
fn send_to(&self, address: &str, payload: &[u8]) -> Result<(), TransportError>;
/// Fetches the next payload for `address`, or `Ok(None)` when there is none
/// (`AmqpTransport` turns that into `TransportError::Timeout`).
fn recv_from(&self, address: &str) -> Result<Option<Vec<u8>>, TransportError>;
}
/// `PubSubTransport` adapter over an `AmqpClient` bound to a single address.
#[derive(Debug, Clone)]
pub struct AmqpTransport<C: AmqpClient> {
/// The wrapped AMQP client.
client: C,
/// Address used for both sending and receiving.
address: String,
}
impl<C: AmqpClient> AmqpTransport<C> {
    /// Wraps `client` and fixes `address` as the address used for every
    /// send and receive.
    pub fn new(client: C, address: impl Into<String>) -> Self {
        let address: String = address.into();
        Self { client, address }
    }

    /// Borrows the wrapped AMQP client.
    pub fn client(&self) -> &C {
        let Self { client, .. } = self;
        client
    }
}
impl<C: AmqpClient> PubSubTransport for AmqpTransport<C> {
    /// Sends `datagram` to the configured address.
    fn send(&self, datagram: &[u8]) -> Result<(), TransportError> {
        let address = self.address.as_str();
        self.client.send_to(address, datagram)
    }

    /// Fetches the next payload for the configured address. An empty
    /// mailbox is reported as `TransportError::Timeout`; client errors pass
    /// through unchanged.
    fn receive(&self) -> Result<Vec<u8>, TransportError> {
        match self.client.recv_from(&self.address)? {
            Some(payload) => Ok(payload),
            None => Err(TransportError::Timeout),
        }
    }
}
/// Minimal Kafka client abstraction that `KafkaTransport` is built on.
pub trait KafkaClient {
/// Produces `payload` to `topic`.
fn produce(&self, topic: &str, payload: &[u8]) -> Result<(), TransportError>;
/// Polls for the next record as a `(topic, payload)` pair, or `Ok(None)`
/// when there is none (`KafkaTransport` turns that into
/// `TransportError::Timeout`).
fn poll(&self) -> Result<Option<(String, Vec<u8>)>, TransportError>;
}
/// `PubSubTransport` adapter over a `KafkaClient` with a fixed produce topic.
#[derive(Debug, Clone)]
pub struct KafkaTransport<C: KafkaClient> {
/// The wrapped Kafka client.
client: C,
/// Topic that every `send` produces to.
topic: String,
}
impl<C: KafkaClient> KafkaTransport<C> {
    /// Wraps `client` and remembers `topic` as the topic every `send`
    /// produces to.
    pub fn new(client: C, topic: impl Into<String>) -> Self {
        let topic: String = topic.into();
        Self { client, topic }
    }

    /// Borrows the wrapped Kafka client.
    pub fn client(&self) -> &C {
        let Self { client, .. } = self;
        client
    }
}
impl<C: KafkaClient> PubSubTransport for KafkaTransport<C> {
    /// Produces `datagram` to the configured topic.
    fn send(&self, datagram: &[u8]) -> Result<(), TransportError> {
        let topic = self.topic.as_str();
        self.client.produce(topic, datagram)
    }

    /// Polls the client once. The record's topic is discarded; an empty
    /// poll is reported as `TransportError::Timeout`, and client errors pass
    /// through unchanged.
    fn receive(&self) -> Result<Vec<u8>, TransportError> {
        if let Some((_topic, payload)) = self.client.poll()? {
            Ok(payload)
        } else {
            Err(TransportError::Timeout)
        }
    }
}
/// Minimal raw-frame interface that `EthernetTransport` is built on.
pub trait EthernetInterface {
/// Transmits `payload` as one frame.
fn send_frame(&self, payload: &[u8]) -> Result<(), TransportError>;
/// Returns the next received frame payload, or `Ok(None)` when there is
/// none (`EthernetTransport` turns that into `TransportError::Timeout`).
fn recv_frame(&self) -> Result<Option<Vec<u8>>, TransportError>;
}
/// `PubSubTransport` adapter over an `EthernetInterface`.
#[derive(Debug, Clone)]
pub struct EthernetTransport<E: EthernetInterface> {
/// The wrapped frame interface.
interface: E,
}
impl<E: EthernetInterface> EthernetTransport<E> {
pub fn new(interface: E) -> Self {
Self { interface }
}
pub fn interface(&self) -> &E {
&self.interface
}
}
impl<E: EthernetInterface> PubSubTransport for EthernetTransport<E> {
    /// Transmits `datagram` as one frame through the wrapped interface.
    fn send(&self, datagram: &[u8]) -> Result<(), TransportError> {
        let nic = &self.interface;
        nic.send_frame(datagram)
    }

    /// Returns the next received frame. No pending frame is reported as
    /// `TransportError::Timeout`; interface errors pass through unchanged.
    fn receive(&self) -> Result<Vec<u8>, TransportError> {
        let frame = self.interface.recv_frame()?;
        frame.ok_or(TransportError::Timeout)
    }
}
#[cfg(feature = "std")]
pub use std_carriers::{LoopbackTransport, UdpTransport};
#[cfg(feature = "std")]
mod std_carriers {
use super::{PubSubTransport, TransportError};
use alloc::collections::VecDeque;
use alloc::sync::Arc;
use alloc::vec;
use alloc::vec::Vec;
use std::io::ErrorKind;
use std::net::{Ipv4Addr, SocketAddr, UdpSocket};
use std::string::ToString;
use std::sync::Mutex;
// Largest datagram, in bytes, that `UdpTransport::send` accepts, and the size
// of the scratch buffer `UdpTransport::receive` reads into.
// 65_507 = 65_535 - 20 - 8.
// NOTE(review): presumably the IPv4 UDP payload ceiling (IP header 20 bytes,
// UDP header 8 bytes); the limit may differ for IPv6 sockets — confirm.
const UDP_MAX_DATAGRAM: usize = 65_507;
/// Converts a `std::io::Error` into a `TransportError`.
///
/// `WouldBlock` and `TimedOut` both become `TransportError::Timeout`; every
/// other kind becomes `TransportError::Io` carrying the error's display text.
fn io(e: std::io::Error) -> TransportError {
    if matches!(e.kind(), ErrorKind::WouldBlock | ErrorKind::TimedOut) {
        TransportError::Timeout
    } else {
        TransportError::Io(e.to_string())
    }
}
/// `PubSubTransport` over a bound `UdpSocket` with a fixed destination.
#[derive(Debug)]
pub struct UdpTransport {
/// Locally bound socket used for both sending and receiving.
socket: UdpSocket,
/// Address every `send` is directed to.
destination: SocketAddr,
}
impl UdpTransport {
pub fn bind(local: SocketAddr, destination: SocketAddr) -> Result<Self, TransportError> {
let socket = UdpSocket::bind(local).map_err(io)?;
Ok(Self {
socket,
destination,
})
}
pub fn join_multicast_v4(
&self,
group: Ipv4Addr,
interface: Ipv4Addr,
) -> Result<(), TransportError> {
self.socket
.join_multicast_v4(&group, &interface)
.map_err(io)
}
pub fn set_read_timeout(
&self,
timeout: Option<core::time::Duration>,
) -> Result<(), TransportError> {
self.socket.set_read_timeout(timeout).map_err(io)
}
pub fn local_addr(&self) -> Result<SocketAddr, TransportError> {
self.socket.local_addr().map_err(io)
}
}
impl PubSubTransport for UdpTransport {
    /// Sends `datagram` as a single UDP packet to the configured destination.
    ///
    /// # Errors
    /// * `TransportError::TooLarge` when `datagram` is longer than
    ///   `UDP_MAX_DATAGRAM`; the socket is not touched in that case.
    /// * The socket error converted by `io` when `send_to` fails.
    /// * `TransportError::Io("short UDP send")` when the socket reports
    ///   fewer bytes sent than were requested.
    fn send(&self, datagram: &[u8]) -> Result<(), TransportError> {
        if datagram.len() > UDP_MAX_DATAGRAM {
            return Err(TransportError::TooLarge {
                len: datagram.len(),
                max: UDP_MAX_DATAGRAM,
            });
        }
        let sent = self
            .socket
            .send_to(datagram, self.destination)
            .map_err(io)?;
        if sent != datagram.len() {
            return Err(TransportError::Io("short UDP send".to_string()));
        }
        Ok(())
    }

    /// Receives one datagram and returns exactly its bytes; the sender's
    /// address is discarded.
    ///
    /// # Errors
    /// Returns the socket error converted by `io`, so a read that times out
    /// or would block surfaces as `TransportError::Timeout`.
    fn receive(&self) -> Result<Vec<u8>, TransportError> {
        // Scratch buffer sized for the largest datagram `send` would accept.
        let mut buf = vec![0u8; UDP_MAX_DATAGRAM];
        let (n, _src) = self.socket.recv_from(&mut buf).map_err(io)?;
        buf.truncate(n);
        // Give the unused tail back to the allocator. Without this every
        // returned message would keep the whole scratch allocation alive for
        // as long as the caller holds on to it.
        buf.shrink_to_fit();
        Ok(buf)
    }
}
/// In-process transport backed by a shared FIFO queue.
///
/// The queue sits behind an `Arc`, so clones of a `LoopbackTransport` share
/// the same queue: what one clone sends, another can receive.
#[derive(Debug, Clone, Default)]
pub struct LoopbackTransport {
/// Pending datagrams, oldest at the front.
queue: Arc<Mutex<VecDeque<Vec<u8>>>>,
}
impl LoopbackTransport {
    /// Creates a transport with an empty queue.
    #[must_use]
    pub fn new() -> Self {
        Default::default()
    }

    /// Number of datagrams currently waiting in the queue.
    ///
    /// Reports `0` when the queue's lock is poisoned.
    #[must_use]
    pub fn pending(&self) -> usize {
        match self.queue.lock() {
            Ok(queue) => queue.len(),
            Err(_poisoned) => 0,
        }
    }
}
impl PubSubTransport for LoopbackTransport {
    /// Appends a copy of `datagram` to the back of the shared queue.
    ///
    /// # Errors
    /// `TransportError::Closed` when the queue's lock is poisoned.
    fn send(&self, datagram: &[u8]) -> Result<(), TransportError> {
        let mut queue = self.queue.lock().map_err(|_| TransportError::Closed)?;
        queue.push_back(datagram.to_vec());
        Ok(())
    }

    /// Removes and returns the oldest queued datagram.
    ///
    /// # Errors
    /// * `TransportError::Timeout` when the queue is empty.
    /// * `TransportError::Closed` when the queue's lock is poisoned.
    fn receive(&self) -> Result<Vec<u8>, TransportError> {
        match self.queue.lock() {
            Ok(mut queue) => queue.pop_front().ok_or(TransportError::Timeout),
            Err(_poisoned) => Err(TransportError::Closed),
        }
    }
}
}
// Unit tests: each broker/interface adapter is exercised against a small
// in-memory mock; the std-only carriers are tested when the `std` feature is on.
#[cfg(test)]
mod tests {
use super::*;
use core::cell::RefCell;
// The topic helper joins its three arguments with `/`.
#[test]
fn mqtt_topic_convention() {
assert_eq!(
mqtt_topic("opcua/json", "pub1", "wg1"),
"opcua/json/pub1/wg1"
);
}
// MQTT mock: published messages land in `inbox`; `next_message` pops the most
// recently pushed one and reports `Timeout` when the inbox is empty.
struct MockMqtt {
inbox: RefCell<Vec<(String, Vec<u8>)>>,
}
impl MqttClient for MockMqtt {
fn publish(&self, topic: &str, payload: &[u8]) -> Result<(), TransportError> {
self.inbox
.borrow_mut()
.push((String::from(topic), payload.to_vec()));
Ok(())
}
fn next_message(&self) -> Result<(String, Vec<u8>), TransportError> {
self.inbox.borrow_mut().pop().ok_or(TransportError::Timeout)
}
}
// A sent datagram comes back through `receive`; a second receive times out.
#[test]
fn mqtt_transport_round_trip() {
let t = MqttTransport::new(
MockMqtt {
inbox: RefCell::new(Vec::new()),
},
"opcua/uadp/pub1/wg1",
);
t.send(&[1, 2, 3, 4]).expect("send");
assert_eq!(t.receive().expect("recv"), alloc::vec![1, 2, 3, 4]);
assert_eq!(t.receive(), Err(TransportError::Timeout));
}
// AMQP mock: messages are stored with their address; `recv_from` removes the
// first stored message whose address matches, or yields `None`.
struct MockAmqp {
inbox: RefCell<Vec<(String, Vec<u8>)>>,
}
impl AmqpClient for MockAmqp {
fn send_to(&self, address: &str, payload: &[u8]) -> Result<(), TransportError> {
self.inbox
.borrow_mut()
.push((String::from(address), payload.to_vec()));
Ok(())
}
fn recv_from(&self, address: &str) -> Result<Option<Vec<u8>>, TransportError> {
let mut inbox = self.inbox.borrow_mut();
if let Some(pos) = inbox.iter().position(|(a, _)| a == address) {
Ok(Some(inbox.remove(pos).1))
} else {
Ok(None)
}
}
}
// Round trip through the AMQP adapter; an empty mailbox maps to `Timeout`.
#[test]
fn amqp_transport_round_trip() {
let t = AmqpTransport::new(
MockAmqp {
inbox: RefCell::new(Vec::new()),
},
"/topic/pub1.wg1",
);
t.send(&[9, 8, 7]).expect("send");
assert_eq!(t.receive().expect("recv"), alloc::vec![9, 8, 7]);
assert_eq!(t.receive(), Err(TransportError::Timeout));
}
// Kafka mock: produced records are appended to `log`; `poll` pops the most
// recently appended record, or yields `None` when the log is empty.
struct MockKafka {
log: RefCell<Vec<(String, Vec<u8>)>>,
}
impl KafkaClient for MockKafka {
fn produce(&self, topic: &str, payload: &[u8]) -> Result<(), TransportError> {
self.log
.borrow_mut()
.push((String::from(topic), payload.to_vec()));
Ok(())
}
fn poll(&self) -> Result<Option<(String, Vec<u8>)>, TransportError> {
Ok(self.log.borrow_mut().pop())
}
}
// Round trip through the Kafka adapter; an empty poll maps to `Timeout`.
#[test]
fn kafka_transport_round_trip() {
let t = KafkaTransport::new(
MockKafka {
log: RefCell::new(Vec::new()),
},
"opcua.uadp.wg1",
);
t.send(&[4, 5, 6]).expect("send");
assert_eq!(t.receive().expect("recv"), alloc::vec![4, 5, 6]);
assert_eq!(t.receive(), Err(TransportError::Timeout));
}
// Ethernet mock: sent frames are appended to `wire`; `recv_frame` removes the
// oldest frame, or yields `None` when the wire is empty.
struct MockEth {
wire: RefCell<Vec<Vec<u8>>>,
}
impl EthernetInterface for MockEth {
fn send_frame(&self, payload: &[u8]) -> Result<(), TransportError> {
self.wire.borrow_mut().push(payload.to_vec());
Ok(())
}
fn recv_frame(&self) -> Result<Option<Vec<u8>>, TransportError> {
let mut w = self.wire.borrow_mut();
Ok((!w.is_empty()).then(|| w.remove(0)))
}
}
// Round trip through the Ethernet adapter; no pending frame maps to `Timeout`.
#[test]
fn ethernet_transport_round_trip() {
let t = EthernetTransport::new(MockEth {
wire: RefCell::new(Vec::new()),
});
t.send(&[0xB6, 0x2C]).expect("send");
assert_eq!(t.receive().expect("recv"), alloc::vec![0xB6, 0x2C]);
assert_eq!(t.receive(), Err(TransportError::Timeout));
}
// Clones of a loopback transport share one queue and deliver in send order.
#[cfg(feature = "std")]
#[test]
fn loopback_round_trip_across_clones() {
let tx = LoopbackTransport::new();
let rx = tx.clone();
assert_eq!(tx.receive(), Err(TransportError::Timeout));
tx.send(&[0xDE, 0xAD]).expect("send");
tx.send(&[0xBE, 0xEF]).expect("send");
assert_eq!(rx.pending(), 2);
assert_eq!(rx.receive().expect("r1"), alloc::vec![0xDE, 0xAD]);
assert_eq!(rx.receive().expect("r2"), alloc::vec![0xBE, 0xEF]);
assert_eq!(rx.receive(), Err(TransportError::Timeout));
}
// Two sockets on localhost: the sender targets the receiver's OS-assigned port.
#[cfg(feature = "std")]
#[test]
fn udp_unicast_localhost_round_trip() {
use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
// The receiver never sends, so its destination address is a placeholder.
let recv = UdpTransport::bind(
SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 0)),
SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 0)),
)
.expect("bind recv");
// Bound the wait so a lost datagram fails the test instead of hanging it.
recv.set_read_timeout(Some(core::time::Duration::from_secs(2)))
.expect("timeout");
let recv_addr = recv.local_addr().expect("addr");
let send = UdpTransport::bind(
SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 0)),
recv_addr,
)
.expect("bind send");
send.send(&[0x01, 0x02, 0x03]).expect("send");
assert_eq!(recv.receive().expect("recv"), alloc::vec![0x01, 0x02, 0x03]);
}
}