sma_proto/client/
session.rs1use byteorder_cursor::Cursor;
20use socket2::{Domain, Socket, Type};
22use std::net::{Ipv4Addr, SocketAddrV4};
23use tokio::net::UdpSocket;
24
25use super::{AnySmaMessage, ClientError, Error, SmaSerde};
26
27#[derive(Debug)]
31pub struct SmaSession {
32 multicast: bool,
33 dst_sockaddr: SocketAddrV4,
34 socket: UdpSocket,
35}
36
37impl SmaSession {
38 const BUFFER_SIZE: usize = 1030;
40
41 const SMA_PORT: u16 = 9522;
42 const SMA_MCAST_ADDR: Ipv4Addr = Ipv4Addr::new(239, 12, 255, 254);
43
44 pub fn open_unicast(remote_addr: Ipv4Addr) -> Result<Self, ClientError> {
47 let socket = Socket::new(Domain::IPV4, Type::DGRAM, None)?;
48 socket.bind(&SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), 0).into())?;
49 socket.set_nonblocking(true)?;
50
51 Ok(Self {
52 multicast: false,
53 socket: UdpSocket::from_std(socket.into())?,
54 dst_sockaddr: SocketAddrV4::new(remote_addr, Self::SMA_PORT),
55 })
56 }
57
58 pub fn open_multicast(local_addr: Ipv4Addr) -> Result<Self, ClientError> {
61 let socket = Socket::new(Domain::IPV4, Type::DGRAM, None)?;
62 socket.set_reuse_address(true)?;
63 socket.bind(
64 &SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), Self::SMA_PORT)
65 .into(),
66 )?;
67 socket.set_nonblocking(true)?;
68
69 socket.set_multicast_loop_v4(false)?;
70 socket.set_multicast_if_v4(&local_addr)?;
71 socket.join_multicast_v4(&Self::SMA_MCAST_ADDR, &local_addr)?;
72
73 Ok(Self {
74 multicast: true,
75 socket: UdpSocket::from_std(socket.into())?,
76 dst_sockaddr: SocketAddrV4::new(
77 Self::SMA_MCAST_ADDR,
78 Self::SMA_PORT,
79 ),
80 })
81 }
82
83 pub(crate) async fn write<T: SmaSerde>(
84 &self,
85 msg: T,
86 ) -> Result<(), ClientError> {
87 let mut buffer = [0u8; Self::BUFFER_SIZE];
88 let mut cursor = Cursor::new(&mut buffer[..]);
89
90 msg.serialize(&mut cursor)?;
91 let len = cursor.position();
92
93 Ok(self
94 .socket
95 .send_to(&buffer[..len], self.dst_sockaddr)
96 .await
97 .map(|_| ())?)
98 }
99
100 pub(crate) async fn read<T: SmaSerde>(
101 &self,
102 predicate: impl Fn(AnySmaMessage) -> Option<T>,
103 ) -> Result<T, ClientError> {
104 let mut buffer = [0u8; Self::BUFFER_SIZE];
105
106 loop {
107 let (rx_len, rx_addr) = self.socket.recv_from(&mut buffer).await?;
108
109 if self.multicast || rx_addr.ip() == *self.dst_sockaddr.ip() {
110 let mut cursor = Cursor::new(&buffer[..rx_len]);
114 let message = match AnySmaMessage::deserialize(&mut cursor) {
115 Ok(x) => x,
116 Err(Error::UnsupportedProtocol { .. })
118 if self.multicast =>
119 {
120 continue
121 }
122 Err(e) => return Err(e.into()),
123 };
124
125 if let Some(x) = predicate(message) {
126 return Ok(x);
127 }
128 }
129 }
130 }
131}