sma_proto/client/
session.rs

1/******************************************************************************\
2    sma-proto - A SMA Speedwire protocol library
3    Copyright (C) 2024 Max Maisel
4
5    This program is free software: you can redistribute it and/or modify
6    it under the terms of the GNU Affero General Public License as published by
7    the Free Software Foundation, either version 3 of the License, or
8    (at your option) any later version.
9
10    This program is distributed in the hope that it will be useful,
11    but WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13    GNU Affero General Public License for more details.
14
15    You should have received a copy of the GNU Affero General Public License
16    along with this program.  If not, see <https://www.gnu.org/licenses/>.
17\******************************************************************************/
18
19use byteorder_cursor::Cursor;
20// Required for set_multicast_if_v4 and set_reuse_address
21use socket2::{Domain, Socket, Type};
22use std::net::{Ipv4Addr, SocketAddrV4};
23use tokio::net::UdpSocket;
24
25use super::{AnySmaMessage, ClientError, Error, SmaSerde};
26
27/// SMA client session instance that holds the network dependent state
28/// for communication with a single unicast device, or a group of multicast
29/// devices.
30#[derive(Debug)]
31pub struct SmaSession {
32    multicast: bool,
33    dst_sockaddr: SocketAddrV4,
34    socket: UdpSocket,
35}
36
37impl SmaSession {
38    /// Largest seen SMA speedwire packet size before fragmentation.
39    const BUFFER_SIZE: usize = 1030;
40
41    const SMA_PORT: u16 = 9522;
42    const SMA_MCAST_ADDR: Ipv4Addr = Ipv4Addr::new(239, 12, 255, 254);
43
44    /// Opens a unicast network socket for communication with a single SMA
45    /// device identified by a IP address.
46    pub fn open_unicast(remote_addr: Ipv4Addr) -> Result<Self, ClientError> {
47        let socket = Socket::new(Domain::IPV4, Type::DGRAM, None)?;
48        socket.bind(&SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), 0).into())?;
49        socket.set_nonblocking(true)?;
50
51        Ok(Self {
52            multicast: false,
53            socket: UdpSocket::from_std(socket.into())?,
54            dst_sockaddr: SocketAddrV4::new(remote_addr, Self::SMA_PORT),
55        })
56    }
57
58    /// Opens a multicast network socket on the given local IPv4 address for
59    /// communication with a group of SMA devices.
60    pub fn open_multicast(local_addr: Ipv4Addr) -> Result<Self, ClientError> {
61        let socket = Socket::new(Domain::IPV4, Type::DGRAM, None)?;
62        socket.set_reuse_address(true)?;
63        socket.bind(
64            &SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), Self::SMA_PORT)
65                .into(),
66        )?;
67        socket.set_nonblocking(true)?;
68
69        socket.set_multicast_loop_v4(false)?;
70        socket.set_multicast_if_v4(&local_addr)?;
71        socket.join_multicast_v4(&Self::SMA_MCAST_ADDR, &local_addr)?;
72
73        Ok(Self {
74            multicast: true,
75            socket: UdpSocket::from_std(socket.into())?,
76            dst_sockaddr: SocketAddrV4::new(
77                Self::SMA_MCAST_ADDR,
78                Self::SMA_PORT,
79            ),
80        })
81    }
82
83    pub(crate) async fn write<T: SmaSerde>(
84        &self,
85        msg: T,
86    ) -> Result<(), ClientError> {
87        let mut buffer = [0u8; Self::BUFFER_SIZE];
88        let mut cursor = Cursor::new(&mut buffer[..]);
89
90        msg.serialize(&mut cursor)?;
91        let len = cursor.position();
92
93        Ok(self
94            .socket
95            .send_to(&buffer[..len], self.dst_sockaddr)
96            .await
97            .map(|_| ())?)
98    }
99
100    pub(crate) async fn read<T: SmaSerde>(
101        &self,
102        predicate: impl Fn(AnySmaMessage) -> Option<T>,
103    ) -> Result<T, ClientError> {
104        let mut buffer = [0u8; Self::BUFFER_SIZE];
105
106        loop {
107            let (rx_len, rx_addr) = self.socket.recv_from(&mut buffer).await?;
108
109            if self.multicast || rx_addr.ip() == *self.dst_sockaddr.ip() {
110                // Since speedwire is a multicast protocol, receiving an
111                // incorrect message type is not necessarily an
112                // error as it could be just another broadcast message.
113                let mut cursor = Cursor::new(&buffer[..rx_len]);
114                let message = match AnySmaMessage::deserialize(&mut cursor) {
115                    Ok(x) => x,
116                    // Ignore unknown SMA protocols in multicast mode.
117                    Err(Error::UnsupportedProtocol { .. })
118                        if self.multicast =>
119                    {
120                        continue
121                    }
122                    Err(e) => return Err(e.into()),
123                };
124
125                if let Some(x) = predicate(message) {
126                    return Ok(x);
127                }
128            }
129        }
130    }
131}