1use std::{fmt::Display, io, net::SocketAddr};
2
3use serde::{Deserialize, Serialize};
4use tokio::net::UdpSocket;
5
6use crate::{message::Message, Address};
7
8pub trait Transport: Sized {
9 type TransportAddress: Address;
10 type Error;
11
12 fn bind(
13 address: Self::TransportAddress,
14 ) -> impl std::future::Future<Output = Result<Self, Self::Error>> + Send;
15
16 fn next(
18 &self,
19 ) -> impl std::future::Future<
20 Output = Option<(Message<Self::TransportAddress>, Self::TransportAddress)>,
21 > + Send;
22
23 fn send(
24 &self,
25 msg: &Message<Self::TransportAddress>,
26 target: &Self::TransportAddress,
27 ) -> impl std::future::Future<Output = Result<usize, Self::Error>> + Send;
28
29 fn address(&self) -> Self::TransportAddress;
30}
31
32#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
33pub struct TcpAddress {
34 host: String,
35 port: u16,
36}
37
38impl From<SocketAddr> for TcpAddress {
39 fn from(value: SocketAddr) -> Self {
40 Self {
41 host: value.ip().to_string(),
42 port: value.port(),
43 }
44 }
45}
46
47impl Display for TcpAddress {
48 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
49 f.write_fmt(format_args!("{}:{}", self.host, self.port))
50 }
51}
52
53impl Address for TcpAddress {}
54
55impl TcpAddress {
56 pub fn new(host: String, port: u16) -> Self {
57 Self { host, port }
58 }
59}
60
61pub struct UdpTransport {
62 socket: UdpSocket,
63}
64
65impl Transport for UdpTransport {
66 type TransportAddress = TcpAddress;
67 type Error = io::Error;
68
69 async fn bind(address: Self::TransportAddress) -> Result<Self, Self::Error> {
70 let socket = UdpSocket::bind(address.to_string()).await?;
71
72 log::info!(target: "udp-transport", "📞 Listening on {}", address);
73
74 Ok(Self { socket })
75 }
76
77 async fn next(&self) -> Option<(Message<Self::TransportAddress>, Self::TransportAddress)> {
78 let mut buf = [0; 1024];
80
81 match self.socket.recv_from(&mut buf).await {
82 Ok((size, peer)) => {
83 if let Ok(msg) = bincode::deserialize(&buf[..size]) {
84 Some((msg, peer.into()))
85 } else {
86 log::debug!(target: "udp-transport", "invalid message");
87 None
88 }
89 }
90 Err(e) => {
91 log::error!(target: "udp-transport", "❗️ failed to recv message: {}", e);
92 None
93 }
94 }
95 }
96
97 async fn send(
98 &self,
99 msg: &Message<Self::TransportAddress>,
100 target: &Self::TransportAddress,
101 ) -> Result<usize, Self::Error> {
102 let bytes = bincode::serialize(msg).expect("serialization should not fail");
103 self.socket.send_to(&bytes, target.to_string()).await
104 }
105
106 fn address(&self) -> Self::TransportAddress {
107 self.socket
108 .local_addr()
109 .expect("address must be present")
110 .into()
111 }
112}