playit_agent_core/agent_control/
established_control.rs

1use playit_agent_proto::control_feed::ControlFeed;
2use playit_agent_proto::control_messages::{AgentRegistered, ControlRequest, ControlResponse, Ping, Pong};
3use playit_agent_proto::rpc::ControlRpcMessage;
4
5use crate::utils::now_milli;
6
7use super::connected_control::ConnectedControl;
8use super::errors::{ControlError, SetupError};
9use super::{AuthResource, PacketIO};
10
/// Control connection paired with the agent registration obtained by
/// authenticating over it, plus timing state derived from pong responses.
pub struct EstablishedControl<A: AuthResource, IO: PacketIO> {
    /// Auth resource passed to `ConnectedControl::authenticate` whenever
    /// `authenticate` is called on this value.
    pub(super) auth: A,
    /// Underlying connection used for every send and receive.
    pub(super) conn: ConnectedControl<IO>,
    /// Copy of `conn.pong_latest` taken by `authenticate`. Later pongs are
    /// compared against it to detect a client/tunnel address change.
    pub(super) pong_at_auth: Pong,
    /// Current registration. Replaced by `authenticate` and by
    /// `AgentRegistered` responses; its `expires_at` is rewritten from pongs
    /// that carry `session_expire_at`.
    pub(super) registered: AgentRegistered,
    /// Round-trip time computed from the most recently processed pong
    /// (presumably milliseconds, since it is derived from `now_milli` — confirm).
    pub(super) current_ping: Option<u32>,
    /// Local timestamp minus estimated server timestamp, as computed from the
    /// most recently processed pong. Positive when the local clock is ahead.
    pub(super) clock_offset: i64,
    /// Set by `set_expired`, cleared by a successful `authenticate`.
    pub(super) force_expired: bool,
}
20
21impl<A: AuthResource, IO: PacketIO> EstablishedControl<A, IO> {
22    pub async fn send_keep_alive(&mut self, request_id: u64) -> Result<(), ControlError> {
23        self.send(ControlRpcMessage {
24            request_id,
25            content: ControlRequest::AgentKeepAlive(self.registered.id.clone()),
26        }).await
27    }
28
29    pub async fn send_setup_udp_channel(&mut self, request_id: u64) -> Result<(), ControlError> {
30        self.send(ControlRpcMessage {
31            request_id,
32            content: ControlRequest::SetupUdpChannel(self.registered.id.clone()),
33        }).await
34    }
35
36    pub async fn send_ping(&mut self, request_id: u64, now: u64) -> Result<(), ControlError> {
37        self.send(ControlRpcMessage {
38            request_id,
39            content: ControlRequest::Ping(Ping { now, current_ping: self.current_ping, session_id: Some(self.registered.id.clone()) }),
40        }).await
41    }
42
43    pub fn get_expire_at(&self) -> u64 {
44        self.registered.expires_at
45    }
46
47    pub fn is_expired(&self) -> Option<ExpiredReason> {
48        if self.force_expired {
49            return Some(ExpiredReason::Forced);
50        }
51        if self.pong_at_auth.session_expire_at.is_none() {
52            return Some(ExpiredReason::SessionNotSetup);
53        }
54        if self.flow_changed() {
55            return Some(ExpiredReason::FlowChanged);
56        }
57        None
58    }
59
60    pub fn set_expired(&mut self) {
61        self.force_expired = true;
62    }
63
64    fn flow_changed(&self) -> bool {
65        self.conn.pong_latest.client_addr != self.pong_at_auth.client_addr 
66            || self.conn.pong_latest.tunnel_addr != self.pong_at_auth.tunnel_addr
67    }
68
69    async fn send(&mut self, req: ControlRpcMessage<ControlRequest>) -> Result<(), ControlError> {
70        self.conn.send(&req).await?;
71        Ok(())
72    }
73
74    pub async fn authenticate(&mut self) -> Result<(), SetupError> {
75        let registered = self.conn.authenticate(&self.auth).await?;
76
77        self.force_expired = false;
78        self.registered = registered;
79        self.pong_at_auth = self.conn.pong_latest.clone();
80
81        tracing::info!(
82            last_pong = ?self.pong_at_auth,
83            "authenticate control"
84        );
85
86        Ok(())
87    }
88
89    pub fn into_connected(self) -> ConnectedControl<IO> {
90        self.conn
91    }
92
93    pub async fn recv_feed_msg(&mut self) -> Result<ControlFeed, ControlError> {
94        let feed = self.conn.recv().await?;
95        
96        if let ControlFeed::Response(res) = &feed {
97            match &res.content {
98                ControlResponse::AgentRegistered(registered) => {
99                    tracing::info!(details = ?registered, "agent registered");
100                    self.registered = registered.clone();
101                }
102                ControlResponse::Pong(pong) => {
103                    let now = now_milli();
104                    let rtt = (now.max(pong.request_now) - pong.request_now) as u32;
105
106                    let server_ts = pong.server_now - (rtt / 2) as u64;
107                    let local_ts = pong.request_now;
108                    self.clock_offset = local_ts as i64 - server_ts as i64;
109
110                    if 10_000 < self.clock_offset.abs() {
111                        tracing::warn!(offset = self.clock_offset, "local timestamp if over 10 seconds off");
112                    }
113
114                    self.current_ping = Some(rtt);
115
116                    if let Some(expires_at) = pong.session_expire_at {
117                        /* normalize to local timestamp to handle when host clock is wrong */
118                        self.registered.expires_at = pong.request_now + (expires_at - pong.server_now).max(rtt as u64) - rtt as u64;
119                    }
120                }
121                _ => {}
122            }
123        }
124
125        Ok(feed)
126    }
127}
128
129
/// Reason reported by `EstablishedControl::is_expired` for treating a
/// session as expired.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ExpiredReason {
    /// `set_expired` was called and the session has not been authenticated
    /// again since.
    Forced,
    /// The pong captured at authentication carried no `session_expire_at`.
    SessionNotSetup,
    /// The latest pong reports a different client or tunnel address than the
    /// pong captured at authentication.
    FlowChanged,
}