playit_agent_core/agent_control/
established_control.rs1use playit_agent_proto::control_feed::ControlFeed;
2use playit_agent_proto::control_messages::{AgentRegistered, ControlRequest, ControlResponse, Ping, Pong};
3use playit_agent_proto::rpc::ControlRpcMessage;
4
5use crate::utils::now_milli;
6
7use super::connected_control::ConnectedControl;
8use super::errors::{ControlError, SetupError};
9use super::{AuthResource, PacketIO};
10
11pub struct EstablishedControl<A: AuthResource, IO: PacketIO> {
12 pub(super) auth: A,
13 pub(super) conn: ConnectedControl<IO>,
14 pub(super) pong_at_auth: Pong,
15 pub(super) registered: AgentRegistered,
16 pub(super) current_ping: Option<u32>,
17 pub(super) clock_offset: i64,
18 pub(super) force_expired: bool,
19}
20
21impl<A: AuthResource, IO: PacketIO> EstablishedControl<A, IO> {
22 pub async fn send_keep_alive(&mut self, request_id: u64) -> Result<(), ControlError> {
23 self.send(ControlRpcMessage {
24 request_id,
25 content: ControlRequest::AgentKeepAlive(self.registered.id.clone()),
26 }).await
27 }
28
29 pub async fn send_setup_udp_channel(&mut self, request_id: u64) -> Result<(), ControlError> {
30 self.send(ControlRpcMessage {
31 request_id,
32 content: ControlRequest::SetupUdpChannel(self.registered.id.clone()),
33 }).await
34 }
35
36 pub async fn send_ping(&mut self, request_id: u64, now: u64) -> Result<(), ControlError> {
37 self.send(ControlRpcMessage {
38 request_id,
39 content: ControlRequest::Ping(Ping { now, current_ping: self.current_ping, session_id: Some(self.registered.id.clone()) }),
40 }).await
41 }
42
43 pub fn get_expire_at(&self) -> u64 {
44 self.registered.expires_at
45 }
46
47 pub fn is_expired(&self) -> Option<ExpiredReason> {
48 if self.force_expired {
49 return Some(ExpiredReason::Forced);
50 }
51 if self.pong_at_auth.session_expire_at.is_none() {
52 return Some(ExpiredReason::SessionNotSetup);
53 }
54 if self.flow_changed() {
55 return Some(ExpiredReason::FlowChanged);
56 }
57 None
58 }
59
60 pub fn set_expired(&mut self) {
61 self.force_expired = true;
62 }
63
64 fn flow_changed(&self) -> bool {
65 self.conn.pong_latest.client_addr != self.pong_at_auth.client_addr
66 || self.conn.pong_latest.tunnel_addr != self.pong_at_auth.tunnel_addr
67 }
68
69 async fn send(&mut self, req: ControlRpcMessage<ControlRequest>) -> Result<(), ControlError> {
70 self.conn.send(&req).await?;
71 Ok(())
72 }
73
74 pub async fn authenticate(&mut self) -> Result<(), SetupError> {
75 let registered = self.conn.authenticate(&self.auth).await?;
76
77 self.force_expired = false;
78 self.registered = registered;
79 self.pong_at_auth = self.conn.pong_latest.clone();
80
81 tracing::info!(
82 last_pong = ?self.pong_at_auth,
83 "authenticate control"
84 );
85
86 Ok(())
87 }
88
89 pub fn into_connected(self) -> ConnectedControl<IO> {
90 self.conn
91 }
92
93 pub async fn recv_feed_msg(&mut self) -> Result<ControlFeed, ControlError> {
94 let feed = self.conn.recv().await?;
95
96 if let ControlFeed::Response(res) = &feed {
97 match &res.content {
98 ControlResponse::AgentRegistered(registered) => {
99 tracing::info!(details = ?registered, "agent registered");
100 self.registered = registered.clone();
101 }
102 ControlResponse::Pong(pong) => {
103 let now = now_milli();
104 let rtt = (now.max(pong.request_now) - pong.request_now) as u32;
105
106 let server_ts = pong.server_now - (rtt / 2) as u64;
107 let local_ts = pong.request_now;
108 self.clock_offset = local_ts as i64 - server_ts as i64;
109
110 if 10_000 < self.clock_offset.abs() {
111 tracing::warn!(offset = self.clock_offset, "local timestamp if over 10 seconds off");
112 }
113
114 self.current_ping = Some(rtt);
115
116 if let Some(expires_at) = pong.session_expire_at {
117 self.registered.expires_at = pong.request_now + (expires_at - pong.server_now).max(rtt as u64) - rtt as u64;
119 }
120 }
121 _ => {}
122 }
123 }
124
125 Ok(feed)
126 }
127}
128
129
130#[derive(Debug, PartialEq, Eq)]
131pub enum ExpiredReason {
132 Forced,
133 SessionNotSetup,
134 FlowChanged,
135}