playit_agent_core/agent_control/
maintained_control.rs1use std::future::Future;
2use std::net::SocketAddr;
3use std::time::Duration;
4
5use playit_agent_proto::control_feed::{ControlFeed, NewClient};
6use playit_agent_proto::control_messages::{ControlResponse, UdpChannelDetails};
7
8use crate::agent_control::errors::TryTimeoutHelper;
9use crate::agent_control::established_control::EstablishedControl;
10use crate::utils::now_milli;
11
12use super::address_selector::AddressSelector;
13use super::connected_control::ConnectedControl;
14use super::errors::SetupError;
15use super::{AuthResource, PacketIO};
16
17pub struct MaintainedControl<I: PacketIO, A: AuthResource> {
18 control: EstablishedControl<A, I>,
19 last_keep_alive: u64,
20 last_ping: u64,
21 last_pong: u64,
22 last_udp_auth: u64,
23 last_control_targets: Vec<SocketAddr>,
24}
25
26impl<I: PacketIO, A: AuthResource> MaintainedControl<I, A> {
27 pub async fn setup(io: I, auth: A) -> Result<Self, SetupError> {
28 let addresses = auth.get_control_addresses().await?;
29 let setup = AddressSelector::new(addresses.clone(), io)
30 .connect_to_first()
31 .try_timeout(Duration::from_secs(10))
32 .await?;
33
34 let control_channel = setup
35 .auth_into_established(auth)
36 .try_timeout(Duration::from_secs(10))
37 .await?;
38
39 Ok(MaintainedControl {
40 control: control_channel,
41 last_keep_alive: 0,
42 last_ping: 0,
43 last_pong: 0,
44 last_udp_auth: 0,
45 last_control_targets: addresses,
46 })
47 }
48
49 pub async fn reload_control_addr<E: Into<SetupError>, C: Future<Output = Result<I, E>>>(
50 &mut self,
51 create_io: C,
52 ) -> Result<bool, SetupError> {
53 let addresses = self
54 .control
55 .auth
56 .get_control_addresses()
57 .try_timeout(Duration::from_secs(5))
58 .await?;
59
60 if self.last_control_targets == addresses {
61 return Ok(false);
62 }
63
64 let new_io = async { create_io.await.map_err(|e| e.into()) }
65 .try_timeout(Duration::from_secs(5))
66 .await?;
67
68 let connected = AddressSelector::new(addresses.clone(), new_io)
69 .connect_to_first()
70 .try_timeout(Duration::from_secs(10))
71 .await?;
72
73 let updated = self
74 .replace_connection(connected, false)
75 .try_timeout(Duration::from_secs(5))
76 .await?;
77
78 self.last_control_targets = addresses;
79 Ok(updated)
80 }
81
82 pub async fn replace_connection(
83 &mut self,
84 mut connected: ConnectedControl<I>,
85 force: bool,
86 ) -> Result<bool, SetupError> {
87 if !force
88 && self.control.conn.pong_latest.client_addr.ip()
89 == connected.pong_latest.client_addr.ip()
90 && self.control.conn.pong_latest.tunnel_addr == connected.pong_latest.tunnel_addr
91 {
92 return Ok(false);
93 }
94
95 let registered = connected
96 .authenticate(&self.control.auth)
97 .try_timeout(Duration::from_secs(10))
98 .await?;
99
100 tracing::info!(old = %self.control.conn.pong_latest.tunnel_addr, new = %connected.pong_latest.tunnel_addr, "update control address");
101 connected.reset_established(&mut self.control, registered);
102
103 Ok(true)
104 }
105
106 pub async fn send_udp_session_auth(&mut self, now_ms: u64, min_wait_ms: u64) -> bool {
107 if now_ms < self.last_udp_auth + min_wait_ms {
108 return false;
109 }
110
111 self.last_udp_auth = now_ms;
112 if let Err(error) = self
113 .control
114 .send_setup_udp_channel(1)
115 .try_timeout(Duration::from_secs(5))
116 .await
117 {
118 tracing::error!(?error, "failed to send setup udp channel request");
119 }
120
121 true
122 }
123
124 pub async fn update(&mut self) -> Option<TunnelControlEvent> {
125 if let Some(reason) = self.control.is_expired() {
126 tracing::warn!(?reason, "session expired");
127
128 if let Err(error) = self
129 .control
130 .authenticate()
131 .try_timeout(Duration::from_secs(5))
132 .await
133 {
134 tracing::error!(?error, "failed to authenticate");
135 tokio::time::sleep(Duration::from_secs(2)).await;
136 return None;
137 }
138 }
139
140 let now = now_milli();
141 if now - self.last_ping > 1_000 {
142 self.last_ping = now;
143
144 if let Err(error) = self
145 .control
146 .send_ping(200, now)
147 .try_timeout(Duration::from_secs(1))
148 .await
149 {
150 tracing::error!(?error, "failed to send ping");
151 }
152 }
153
154 let time_till_expire = self.control.get_expire_at().max(now) - now;
155 tracing::trace!(time_till_expire, "time till expire");
156
157 let interval = if time_till_expire < 30_000 {
159 10_000
160 } else {
161 60_000
162 };
163
164 if interval < now - self.last_keep_alive {
165 self.last_keep_alive = now;
166
167 tracing::info!(time_till_expire, "send KeepAlive");
168 if let Err(error) = self
169 .control
170 .send_keep_alive(100)
171 .try_timeout(Duration::from_secs(1))
172 .await
173 {
174 tracing::error!(?error, "failed to send KeepAlive");
175 }
176 }
177
178 let mut timeouts = 0;
179
180 for _ in 0..30 {
181 match tokio::time::timeout(Duration::from_millis(100), self.control.recv_feed_msg())
182 .await
183 {
184 Ok(Ok(ControlFeed::NewClient(new_client))) => {
185 return Some(TunnelControlEvent::NewClient(new_client))
186 }
187 Ok(Ok(ControlFeed::NewClientOld(new_client))) => {
188 return Some(TunnelControlEvent::NewClient(new_client.into()))
189 }
190 Ok(Ok(ControlFeed::Response(msg))) => match msg.content {
191 ControlResponse::UdpChannelDetails(details) => {
192 return Some(TunnelControlEvent::UdpChannelDetails(details))
193 }
194 ControlResponse::Unauthorized => {
195 tracing::info!("session no longer authorized");
196 self.control.set_expired();
197 }
198 ControlResponse::Pong(pong) => {
199 self.last_pong = now_milli();
200
201 if pong.client_addr != self.control.pong_at_auth.client_addr {
202 tracing::info!(
203 new_client = %pong.client_addr,
204 old_client = %self.control.pong_at_auth.client_addr,
205 "client ip changed"
206 );
207 }
208 }
209 msg => {
210 tracing::debug!(?msg, "got response");
211 }
212 },
213 Ok(Err(error)) => {
214 tracing::error!(?error, "failed to parse response");
215 }
216 Err(_) => {
217 timeouts += 1;
218
219 if timeouts >= 10 {
220 tracing::trace!("feed recv timeout");
221 break;
222 }
223 }
224 }
225 }
226
227 if self.last_pong != 0 && now_milli() - self.last_pong > 6_000 {
228 tracing::info!("timeout waiting for pong");
229
230 self.last_pong = 0;
231 self.control.set_expired();
232 }
233
234 None
235 }
236}
237
238pub enum TunnelControlEvent {
239 NewClient(NewClient),
240 UdpChannelDetails(UdpChannelDetails),
241}