playit_agent_core/agent_control/
maintained_control.rs

1use std::future::Future;
2use std::net::SocketAddr;
3use std::time::Duration;
4
5use playit_agent_proto::control_feed::{ControlFeed, NewClient};
6use playit_agent_proto::control_messages::{ControlResponse, UdpChannelDetails};
7
8use crate::agent_control::errors::TryTimeoutHelper;
9use crate::agent_control::established_control::EstablishedControl;
10use crate::utils::now_milli;
11
12use super::address_selector::AddressSelector;
13use super::connected_control::ConnectedControl;
14use super::errors::SetupError;
15use super::{AuthResource, PacketIO};
16
17pub struct MaintainedControl<I: PacketIO, A: AuthResource> {
18    control: EstablishedControl<A, I>,
19    last_keep_alive: u64,
20    last_ping: u64,
21    last_pong: u64,
22    last_udp_auth: u64,
23    last_control_targets: Vec<SocketAddr>,
24}
25
26impl<I: PacketIO, A: AuthResource> MaintainedControl<I, A> {
27    pub async fn setup(io: I, auth: A) -> Result<Self, SetupError> {
28        let addresses = auth.get_control_addresses().await?;
29        let setup = AddressSelector::new(addresses.clone(), io)
30            .connect_to_first()
31            .try_timeout(Duration::from_secs(10))
32            .await?;
33
34        let control_channel = setup
35            .auth_into_established(auth)
36            .try_timeout(Duration::from_secs(10))
37            .await?;
38
39        Ok(MaintainedControl {
40            control: control_channel,
41            last_keep_alive: 0,
42            last_ping: 0,
43            last_pong: 0,
44            last_udp_auth: 0,
45            last_control_targets: addresses,
46        })
47    }
48
49    pub async fn reload_control_addr<E: Into<SetupError>, C: Future<Output = Result<I, E>>>(
50        &mut self,
51        create_io: C,
52    ) -> Result<bool, SetupError> {
53        let addresses = self
54            .control
55            .auth
56            .get_control_addresses()
57            .try_timeout(Duration::from_secs(5))
58            .await?;
59
60        if self.last_control_targets == addresses {
61            return Ok(false);
62        }
63
64        let new_io = async { create_io.await.map_err(|e| e.into()) }
65            .try_timeout(Duration::from_secs(5))
66            .await?;
67
68        let connected = AddressSelector::new(addresses.clone(), new_io)
69            .connect_to_first()
70            .try_timeout(Duration::from_secs(10))
71            .await?;
72
73        let updated = self
74            .replace_connection(connected, false)
75            .try_timeout(Duration::from_secs(5))
76            .await?;
77
78        self.last_control_targets = addresses;
79        Ok(updated)
80    }
81
82    pub async fn replace_connection(
83        &mut self,
84        mut connected: ConnectedControl<I>,
85        force: bool,
86    ) -> Result<bool, SetupError> {
87        if !force
88            && self.control.conn.pong_latest.client_addr.ip()
89                == connected.pong_latest.client_addr.ip()
90            && self.control.conn.pong_latest.tunnel_addr == connected.pong_latest.tunnel_addr
91        {
92            return Ok(false);
93        }
94
95        let registered = connected
96            .authenticate(&self.control.auth)
97            .try_timeout(Duration::from_secs(10))
98            .await?;
99
100        tracing::info!(old = %self.control.conn.pong_latest.tunnel_addr, new = %connected.pong_latest.tunnel_addr, "update control address");
101        connected.reset_established(&mut self.control, registered);
102
103        Ok(true)
104    }
105
106    pub async fn send_udp_session_auth(&mut self, now_ms: u64, min_wait_ms: u64) -> bool {
107        if now_ms < self.last_udp_auth + min_wait_ms {
108            return false;
109        }
110
111        self.last_udp_auth = now_ms;
112        if let Err(error) = self
113            .control
114            .send_setup_udp_channel(1)
115            .try_timeout(Duration::from_secs(5))
116            .await
117        {
118            tracing::error!(?error, "failed to send setup udp channel request");
119        }
120
121        true
122    }
123
124    pub async fn update(&mut self) -> Option<TunnelControlEvent> {
125        if let Some(reason) = self.control.is_expired() {
126            tracing::warn!(?reason, "session expired");
127
128            if let Err(error) = self
129                .control
130                .authenticate()
131                .try_timeout(Duration::from_secs(5))
132                .await
133            {
134                tracing::error!(?error, "failed to authenticate");
135                tokio::time::sleep(Duration::from_secs(2)).await;
136                return None;
137            }
138        }
139
140        let now = now_milli();
141        if now - self.last_ping > 1_000 {
142            self.last_ping = now;
143
144            if let Err(error) = self
145                .control
146                .send_ping(200, now)
147                .try_timeout(Duration::from_secs(1))
148                .await
149            {
150                tracing::error!(?error, "failed to send ping");
151            }
152        }
153
154        let time_till_expire = self.control.get_expire_at().max(now) - now;
155        tracing::trace!(time_till_expire, "time till expire");
156
157        /* keep alive every 60s or every 10s if expiring soon */
158        let interval = if time_till_expire < 30_000 {
159            10_000
160        } else {
161            60_000
162        };
163
164        if interval < now - self.last_keep_alive {
165            self.last_keep_alive = now;
166
167            tracing::info!(time_till_expire, "send KeepAlive");
168            if let Err(error) = self
169                .control
170                .send_keep_alive(100)
171                .try_timeout(Duration::from_secs(1))
172                .await
173            {
174                tracing::error!(?error, "failed to send KeepAlive");
175            }
176        }
177
178        let mut timeouts = 0;
179
180        for _ in 0..30 {
181            match tokio::time::timeout(Duration::from_millis(100), self.control.recv_feed_msg())
182                .await
183            {
184                Ok(Ok(ControlFeed::NewClient(new_client))) => {
185                    return Some(TunnelControlEvent::NewClient(new_client))
186                }
187                Ok(Ok(ControlFeed::NewClientOld(new_client))) => {
188                    return Some(TunnelControlEvent::NewClient(new_client.into()))
189                }
190                Ok(Ok(ControlFeed::Response(msg))) => match msg.content {
191                    ControlResponse::UdpChannelDetails(details) => {
192                        return Some(TunnelControlEvent::UdpChannelDetails(details))
193                    }
194                    ControlResponse::Unauthorized => {
195                        tracing::info!("session no longer authorized");
196                        self.control.set_expired();
197                    }
198                    ControlResponse::Pong(pong) => {
199                        self.last_pong = now_milli();
200
201                        if pong.client_addr != self.control.pong_at_auth.client_addr {
202                            tracing::info!(
203                                new_client = %pong.client_addr,
204                                old_client = %self.control.pong_at_auth.client_addr,
205                                "client ip changed"
206                            );
207                        }
208                    }
209                    msg => {
210                        tracing::debug!(?msg, "got response");
211                    }
212                },
213                Ok(Err(error)) => {
214                    tracing::error!(?error, "failed to parse response");
215                }
216                Err(_) => {
217                    timeouts += 1;
218
219                    if timeouts >= 10 {
220                        tracing::trace!("feed recv timeout");
221                        break;
222                    }
223                }
224            }
225        }
226
227        if self.last_pong != 0 && now_milli() - self.last_pong > 6_000 {
228            tracing::info!("timeout waiting for pong");
229
230            self.last_pong = 0;
231            self.control.set_expired();
232        }
233
234        None
235    }
236}
237
238pub enum TunnelControlEvent {
239    NewClient(NewClient),
240    UdpChannelDetails(UdpChannelDetails),
241}