playit_agent_core/agent_control/
mod.rs

1use std::{future::Future, net::{Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6}, sync::{atomic::AtomicUsize, Arc}, task::Poll};
2
3use playit_agent_proto::control_messages::Pong;
4use errors::SetupError;
5use tokio::{io::ReadBuf, net::UdpSocket};
6use version::get_version;
7
8use playit_api_client::{api::{ReqAgentsRoutingGet, ReqProtoRegister}, PlayitApi};
9pub use playit_api_client::api::SignedAgentKey;
10
11use crate::utils::error_helper::ErrorHelper;
12
13pub mod errors;
14
15pub mod address_selector;
16pub mod connected_control;
17pub mod established_control;
18pub mod maintained_control;
19pub mod version;
20pub mod platform;
21
22pub trait PacketIO: Send + Sync + 'static {
23    fn send_to(&self, buf: &[u8], target: SocketAddr) -> impl Future<Output = std::io::Result<usize>> + Sync + Send;
24
25    fn recv_from(&self, buf: &mut [u8]) -> impl Future<Output = std::io::Result<(usize, SocketAddr)>> + Sync + Send;
26}
27
28pub trait PacketRx: Send + Sync + 'static {
29    fn recv_from(&self, buf: &mut [u8]) -> impl Future<Output = std::io::Result<(usize, SocketAddr)>> + Sync + Send;
30}
31
32impl<T: PacketIO> PacketRx for T {
33    fn recv_from(&self, buf: &mut [u8]) -> impl Future<Output = std::io::Result<(usize, SocketAddr)>> + Sync + Send {
34        T::recv_from(self, buf)
35    }
36}
37
38impl<T: PacketIO> PacketRx for Arc<T> {
39    fn recv_from(&self, buf: &mut [u8]) -> impl Future<Output = std::io::Result<(usize, SocketAddr)>> + Sync + Send {
40        T::recv_from(self, buf)
41    }
42}
43
44pub trait PacketTx {
45    fn send_to(&self, buf: &[u8], target: SocketAddr) -> impl Future<Output = std::io::Result<usize>> + Sync + Send;
46}
47
48impl<T: PacketIO> PacketTx for T {
49    fn send_to(&self, buf: &[u8], target: SocketAddr) -> impl Future<Output = std::io::Result<usize>> + Sync + Send {
50        T::send_to(self, buf, target)
51    }
52}
53
54impl<T: PacketIO> PacketTx for Arc<T> {
55    fn send_to(&self, buf: &[u8], target: SocketAddr) -> impl Future<Output = std::io::Result<usize>> + Sync + Send {
56        T::send_to(self, buf, target)
57    }
58}
59
60pub struct DualStackUdpSocket {
61    ip4: UdpSocket,
62    ip6: Option<UdpSocket>,
63    next: AtomicUsize,
64}
65
66impl DualStackUdpSocket {
67    pub async fn new() -> std::io::Result<Self> {
68        let ip4 = UdpSocket::bind(SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0))).await?;
69        let ip6 = UdpSocket::bind(SocketAddr::V6(SocketAddrV6::new(Ipv6Addr::UNSPECIFIED, 0, 0, 0))).await.ok();
70
71        Ok(DualStackUdpSocket {
72            ip4,
73            ip6,
74            next: AtomicUsize::new(0),
75        })
76    }
77
78    pub fn local_ip4_port(&self) -> Option<u16> {
79        Some(self.ip4.local_addr().ok()?.port())
80    }
81
82    pub fn local_ip6_port(&self) -> Option<u16> {
83        Some(self.ip6.as_ref()?.local_addr().ok()?.port())
84    }
85}
86
87impl PacketIO for DualStackUdpSocket {
88    async fn send_to(&self, buf: &[u8], target: SocketAddr) -> std::io::Result<usize> {
89        if target.is_ipv6() {
90            if let Some(ip6) = &self.ip6 {
91                return ip6.send_to(buf, target).await;
92            }
93        }
94        self.ip4.send_to(buf, target).await
95    }
96
97    async fn recv_from(&self, buf: &mut [u8]) -> std::io::Result<(usize, SocketAddr)> {
98        let sel = self.next.fetch_add(1, std::sync::atomic::Ordering::AcqRel);
99        
100        if sel % 2 == 0 {
101            PoolBoth {
102                buffer: buf,
103                a: self.ip6.as_ref(),
104                b: Some(&self.ip4),
105            }.await
106        } else {
107            PoolBoth {
108                buffer: buf,
109                a: Some(&self.ip4),
110                b: self.ip6.as_ref(),
111            }.await
112        }
113    }
114}
115
116struct PoolBoth<'a> {
117    buffer: &'a mut [u8],
118    a: Option<&'a UdpSocket>,
119    b: Option<&'a UdpSocket>
120}
121
122impl Future for PoolBoth<'_> {
123    type Output = std::io::Result<(usize, SocketAddr)>;
124
125    fn poll(mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<Self::Output> {
126        let PoolBoth {
127            buffer,
128            a,
129            b
130        } = &mut *self;
131
132        let mut buf = ReadBuf::new(buffer);
133
134        if let Some(a) = a {
135            if let Poll::Ready(ready) = a.poll_recv_from(cx, &mut buf) {
136                return match ready {
137                    Ok(addr) => Poll::Ready(Ok((buf.filled().len(), addr))),
138                    Err(error) => Poll::Ready(Err(error))
139                };
140            }
141        }
142        
143        if let Some(b) = b {
144            if let Poll::Ready(ready) = b.poll_recv_from(cx, &mut buf) {
145                return match ready {
146                    Ok(addr) => Poll::Ready(Ok((buf.filled().len(), addr))),
147                    Err(error) => Poll::Ready(Err(error))
148                };
149            }
150        }
151
152        Poll::Pending
153    }
154}
155
156impl PacketIO for UdpSocket {
157    fn send_to(&self, buf: &[u8], target: SocketAddr) -> impl Future<Output = std::io::Result<usize>> + Sync {
158        UdpSocket::send_to(self, buf, target)
159    }
160
161    fn recv_from(&self, buf: &mut [u8]) -> impl Future<Output = std::io::Result<(usize, SocketAddr)>> + Sync {
162        UdpSocket::recv_from(self, buf)
163    }
164}
165
166pub trait AuthResource: Clone {
167    fn authenticate(&self, pong: &Pong) -> impl Future<Output = Result<SignedAgentKey, SetupError>> + Sync;
168
169    fn get_control_addresses(&self) -> impl Future<Output = Result<Vec<SocketAddr>, SetupError>> + Sync;
170}
171
172#[derive(Clone)]
173pub struct AuthApi {
174    client: PlayitApi,
175}
176
177impl AuthApi {
178    pub fn new(api_url: String, secret_key: String) -> Self {
179        let client = PlayitApi::create(
180            api_url,
181            Some(secret_key)
182        );
183        AuthApi { client }
184    }
185}
186
187impl AuthResource for AuthApi {
188    async fn authenticate(&self, pong: &Pong) -> Result<SignedAgentKey, SetupError> {
189        let res = self.client.proto_register(ReqProtoRegister {
190            agent_version: get_version(),
191            client_addr: pong.client_addr,
192            tunnel_addr: pong.tunnel_addr,
193        }).await.with_error(|error| tracing::error!(?error, "failed to sign and register"))?;
194
195        Ok(res)
196    }
197
198    async fn get_control_addresses(&self) -> Result<Vec<SocketAddr>, SetupError> {
199        let routing = self.client.agents_routing_get(ReqAgentsRoutingGet { agent_id: None }).await?;
200
201        let mut addresses = vec![];
202        for ip6 in routing.targets6 {
203            addresses.push(SocketAddr::new(ip6.into(), 5525));
204        }
205        for ip4 in routing.targets4 {
206            addresses.push(SocketAddr::new(ip4.into(), 5525));
207        }
208
209        Ok(addresses)
210    }
211}