clak/
transport.rs

1use std::{fmt::Display, io, net::SocketAddr};
2
3use serde::{Deserialize, Serialize};
4use tokio::net::UdpSocket;
5
6use crate::{message::Message, Address};
7
8pub trait Transport: Sized {
9    type TransportAddress: Address;
10    type Error;
11
12    fn bind(
13        address: Self::TransportAddress,
14    ) -> impl std::future::Future<Output = Result<Self, Self::Error>> + Send;
15
16    /// Wait for the next received message.
17    fn next(
18        &self,
19    ) -> impl std::future::Future<
20        Output = Option<(Message<Self::TransportAddress>, Self::TransportAddress)>,
21    > + Send;
22
23    fn send(
24        &self,
25        msg: &Message<Self::TransportAddress>,
26        target: &Self::TransportAddress,
27    ) -> impl std::future::Future<Output = Result<usize, Self::Error>> + Send;
28
29    fn address(&self) -> Self::TransportAddress;
30}
31
32#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
33pub struct TcpAddress {
34    host: String,
35    port: u16,
36}
37
38impl From<SocketAddr> for TcpAddress {
39    fn from(value: SocketAddr) -> Self {
40        Self {
41            host: value.ip().to_string(),
42            port: value.port(),
43        }
44    }
45}
46
47impl Display for TcpAddress {
48    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
49        f.write_fmt(format_args!("{}:{}", self.host, self.port))
50    }
51}
52
53impl Address for TcpAddress {}
54
55impl TcpAddress {
56    pub fn new(host: String, port: u16) -> Self {
57        Self { host, port }
58    }
59}
60
61pub struct UdpTransport {
62    socket: UdpSocket,
63}
64
65impl Transport for UdpTransport {
66    type TransportAddress = TcpAddress;
67    type Error = io::Error;
68
69    async fn bind(address: Self::TransportAddress) -> Result<Self, Self::Error> {
70        let socket = UdpSocket::bind(address.to_string()).await?;
71
72        log::info!(target: "udp-transport", "📞 Listening on {}", address);
73
74        Ok(Self { socket })
75    }
76
77    async fn next(&self) -> Option<(Message<Self::TransportAddress>, Self::TransportAddress)> {
78        // TODO(alexyer): unnecessary allocation
79        let mut buf = [0; 1024];
80
81        match self.socket.recv_from(&mut buf).await {
82            Ok((size, peer)) => {
83                if let Ok(msg) = bincode::deserialize(&buf[..size]) {
84                    Some((msg, peer.into()))
85                } else {
86                    log::debug!(target: "udp-transport", "invalid message");
87                    None
88                }
89            }
90            Err(e) => {
91                log::error!(target: "udp-transport", "❗️ failed to recv message: {}", e);
92                None
93            }
94        }
95    }
96
97    async fn send(
98        &self,
99        msg: &Message<Self::TransportAddress>,
100        target: &Self::TransportAddress,
101    ) -> Result<usize, Self::Error> {
102        let bytes = bincode::serialize(msg).expect("serialization should not fail");
103        self.socket.send_to(&bytes, target.to_string()).await
104    }
105
106    fn address(&self) -> Self::TransportAddress {
107        self.socket
108            .local_addr()
109            .expect("address must be present")
110            .into()
111    }
112}