playit_agent_core/agent_control/
mod.rs1use std::{future::Future, net::{Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6}, sync::{atomic::AtomicUsize, Arc}, task::Poll};
2
3use playit_agent_proto::control_messages::Pong;
4use errors::SetupError;
5use tokio::{io::ReadBuf, net::UdpSocket};
6use version::get_version;
7
8use playit_api_client::{api::{ReqAgentsRoutingGet, ReqProtoRegister}, PlayitApi};
9pub use playit_api_client::api::SignedAgentKey;
10
11use crate::utils::error_helper::ErrorHelper;
12
13pub mod errors;
14
15pub mod address_selector;
16pub mod connected_control;
17pub mod established_control;
18pub mod maintained_control;
19pub mod version;
20pub mod platform;
21
22pub trait PacketIO: Send + Sync + 'static {
23 fn send_to(&self, buf: &[u8], target: SocketAddr) -> impl Future<Output = std::io::Result<usize>> + Sync + Send;
24
25 fn recv_from(&self, buf: &mut [u8]) -> impl Future<Output = std::io::Result<(usize, SocketAddr)>> + Sync + Send;
26}
27
28pub trait PacketRx: Send + Sync + 'static {
29 fn recv_from(&self, buf: &mut [u8]) -> impl Future<Output = std::io::Result<(usize, SocketAddr)>> + Sync + Send;
30}
31
32impl<T: PacketIO> PacketRx for T {
33 fn recv_from(&self, buf: &mut [u8]) -> impl Future<Output = std::io::Result<(usize, SocketAddr)>> + Sync + Send {
34 T::recv_from(self, buf)
35 }
36}
37
38impl<T: PacketIO> PacketRx for Arc<T> {
39 fn recv_from(&self, buf: &mut [u8]) -> impl Future<Output = std::io::Result<(usize, SocketAddr)>> + Sync + Send {
40 T::recv_from(self, buf)
41 }
42}
43
44pub trait PacketTx {
45 fn send_to(&self, buf: &[u8], target: SocketAddr) -> impl Future<Output = std::io::Result<usize>> + Sync + Send;
46}
47
48impl<T: PacketIO> PacketTx for T {
49 fn send_to(&self, buf: &[u8], target: SocketAddr) -> impl Future<Output = std::io::Result<usize>> + Sync + Send {
50 T::send_to(self, buf, target)
51 }
52}
53
54impl<T: PacketIO> PacketTx for Arc<T> {
55 fn send_to(&self, buf: &[u8], target: SocketAddr) -> impl Future<Output = std::io::Result<usize>> + Sync + Send {
56 T::send_to(self, buf, target)
57 }
58}
59
60pub struct DualStackUdpSocket {
61 ip4: UdpSocket,
62 ip6: Option<UdpSocket>,
63 next: AtomicUsize,
64}
65
66impl DualStackUdpSocket {
67 pub async fn new() -> std::io::Result<Self> {
68 let ip4 = UdpSocket::bind(SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0))).await?;
69 let ip6 = UdpSocket::bind(SocketAddr::V6(SocketAddrV6::new(Ipv6Addr::UNSPECIFIED, 0, 0, 0))).await.ok();
70
71 Ok(DualStackUdpSocket {
72 ip4,
73 ip6,
74 next: AtomicUsize::new(0),
75 })
76 }
77
78 pub fn local_ip4_port(&self) -> Option<u16> {
79 Some(self.ip4.local_addr().ok()?.port())
80 }
81
82 pub fn local_ip6_port(&self) -> Option<u16> {
83 Some(self.ip6.as_ref()?.local_addr().ok()?.port())
84 }
85}
86
87impl PacketIO for DualStackUdpSocket {
88 async fn send_to(&self, buf: &[u8], target: SocketAddr) -> std::io::Result<usize> {
89 if target.is_ipv6() {
90 if let Some(ip6) = &self.ip6 {
91 return ip6.send_to(buf, target).await;
92 }
93 }
94 self.ip4.send_to(buf, target).await
95 }
96
97 async fn recv_from(&self, buf: &mut [u8]) -> std::io::Result<(usize, SocketAddr)> {
98 let sel = self.next.fetch_add(1, std::sync::atomic::Ordering::AcqRel);
99
100 if sel % 2 == 0 {
101 PoolBoth {
102 buffer: buf,
103 a: self.ip6.as_ref(),
104 b: Some(&self.ip4),
105 }.await
106 } else {
107 PoolBoth {
108 buffer: buf,
109 a: Some(&self.ip4),
110 b: self.ip6.as_ref(),
111 }.await
112 }
113 }
114}
115
116struct PoolBoth<'a> {
117 buffer: &'a mut [u8],
118 a: Option<&'a UdpSocket>,
119 b: Option<&'a UdpSocket>
120}
121
122impl Future for PoolBoth<'_> {
123 type Output = std::io::Result<(usize, SocketAddr)>;
124
125 fn poll(mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<Self::Output> {
126 let PoolBoth {
127 buffer,
128 a,
129 b
130 } = &mut *self;
131
132 let mut buf = ReadBuf::new(buffer);
133
134 if let Some(a) = a {
135 if let Poll::Ready(ready) = a.poll_recv_from(cx, &mut buf) {
136 return match ready {
137 Ok(addr) => Poll::Ready(Ok((buf.filled().len(), addr))),
138 Err(error) => Poll::Ready(Err(error))
139 };
140 }
141 }
142
143 if let Some(b) = b {
144 if let Poll::Ready(ready) = b.poll_recv_from(cx, &mut buf) {
145 return match ready {
146 Ok(addr) => Poll::Ready(Ok((buf.filled().len(), addr))),
147 Err(error) => Poll::Ready(Err(error))
148 };
149 }
150 }
151
152 Poll::Pending
153 }
154}
155
156impl PacketIO for UdpSocket {
157 fn send_to(&self, buf: &[u8], target: SocketAddr) -> impl Future<Output = std::io::Result<usize>> + Sync {
158 UdpSocket::send_to(self, buf, target)
159 }
160
161 fn recv_from(&self, buf: &mut [u8]) -> impl Future<Output = std::io::Result<(usize, SocketAddr)>> + Sync {
162 UdpSocket::recv_from(self, buf)
163 }
164}
165
166pub trait AuthResource: Clone {
167 fn authenticate(&self, pong: &Pong) -> impl Future<Output = Result<SignedAgentKey, SetupError>> + Sync;
168
169 fn get_control_addresses(&self) -> impl Future<Output = Result<Vec<SocketAddr>, SetupError>> + Sync;
170}
171
172#[derive(Clone)]
173pub struct AuthApi {
174 client: PlayitApi,
175}
176
177impl AuthApi {
178 pub fn new(api_url: String, secret_key: String) -> Self {
179 let client = PlayitApi::create(
180 api_url,
181 Some(secret_key)
182 );
183 AuthApi { client }
184 }
185}
186
187impl AuthResource for AuthApi {
188 async fn authenticate(&self, pong: &Pong) -> Result<SignedAgentKey, SetupError> {
189 let res = self.client.proto_register(ReqProtoRegister {
190 agent_version: get_version(),
191 client_addr: pong.client_addr,
192 tunnel_addr: pong.tunnel_addr,
193 }).await.with_error(|error| tracing::error!(?error, "failed to sign and register"))?;
194
195 Ok(res)
196 }
197
198 async fn get_control_addresses(&self) -> Result<Vec<SocketAddr>, SetupError> {
199 let routing = self.client.agents_routing_get(ReqAgentsRoutingGet { agent_id: None }).await?;
200
201 let mut addresses = vec![];
202 for ip6 in routing.targets6 {
203 addresses.push(SocketAddr::new(ip6.into(), 5525));
204 }
205 for ip4 in routing.targets4 {
206 addresses.push(SocketAddr::new(ip4.into(), 5525));
207 }
208
209 Ok(addresses)
210 }
211}