actr_runtime/wire/webrtc/
signaling.rs

//! Signaling client implementation.
//!
//! Built on the protobuf-defined signaling protocol; every message is exchanged inside a
//! `SignalingEnvelope` structure.
4
5use crate::transport::error::{NetworkError, NetworkResult};
6use actr_protocol::prost::Message as ProstMessage;
7use actr_protocol::{
8    AIdCredential, ActrId, ActrToSignaling, PeerToSignaling, Ping, RegisterRequest,
9    RegisterResponse, ServiceAvailabilityState, SignalingEnvelope, actr_to_signaling,
10    peer_to_signaling, signaling_envelope, signaling_to_actr,
11};
12use async_trait::async_trait;
13use futures_util::{SinkExt, StreamExt};
14use serde::{Deserialize, Serialize};
15use std::collections::HashMap;
16use tokio::net::TcpStream;
17use tokio_tungstenite::{MaybeTlsStream, WebSocketStream, connect_async};
18use url::Url;
19
20// ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
// Configuration types
22// ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
23
24/// signalingconfiguration
25#[derive(Debug, Clone)]
26pub struct SignalingConfig {
27    /// signaling server URL
28    pub server_url: Url,
29
30    /// Connecttimeout temporal duration (seconds)
31    pub connection_timeout: u64,
32
33    /// center skipinterval(seconds)
34    pub heartbeat_interval: u64,
35
36    /// reconnection configuration
37    pub reconnect_config: ReconnectConfig,
38
39    /// acknowledge verify configuration
40    pub auth_config: Option<AuthConfig>,
41}
42
/// Reconnection policy.
#[derive(Clone, Debug)]
pub struct ReconnectConfig {
    /// Whether automatic reconnection is enabled.
    pub enabled: bool,

    /// Maximum number of reconnection attempts.
    pub max_attempts: u32,

    /// Delay before the first reconnection attempt, in seconds.
    pub initial_delay: u64,

    /// Upper bound on the reconnection delay, in seconds.
    pub max_delay: u64,

    /// Factor by which the delay is multiplied for backoff.
    pub backoff_multiplier: f64,
}

impl Default for ReconnectConfig {
    /// Reconnection enabled, at most 10 attempts, delay starting at 1 second and capped at
    /// 60 seconds, with a backoff multiplier of 2.
    fn default() -> Self {
        ReconnectConfig {
            backoff_multiplier: 2.0,
            max_delay: 60,
            initial_delay: 1,
            max_attempts: 10,
            enabled: true,
        }
    }
}
73
/// Authentication settings.
///
/// `Debug` is implemented by hand rather than derived so that credential values (tokens,
/// API keys, ...) never end up in logs or panic messages: only the credential *names* are
/// printed, each value is replaced by a placeholder.
#[derive(Clone)]
pub struct AuthConfig {
    /// Authentication scheme.
    pub auth_type: AuthType,

    /// Credential data, keyed by name.
    pub credentials: HashMap<String, String>,
}

impl std::fmt::Debug for AuthConfig {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        /// Renders a credential map with every value masked.
        struct Redacted<'a>(&'a HashMap<String, String>);

        impl std::fmt::Debug for Redacted<'_> {
            fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
                // Sort the names so the output is stable across runs despite HashMap ordering.
                let mut names: Vec<&String> = self.0.keys().collect();
                names.sort_unstable();
                f.debug_map()
                    .entries(names.into_iter().map(|name| (name, "<redacted>")))
                    .finish()
            }
        }

        f.debug_struct("AuthConfig")
            .field("auth_type", &self.auth_type)
            .field("credentials", &Redacted(&self.credentials))
            .finish()
    }
}

/// Authentication scheme.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AuthType {
    /// No authentication.
    None,
    /// Bearer token.
    BearerToken,
    /// API key.
    ApiKey,
    /// JWT.
    Jwt,
}
96
97// ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
98// Client interface and implementation
99// ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
100
101/// signaling client connect port
102///
103/// # interior mutability
104/// allMethodusing `&self` and non `&mut self`, with for conveniencein Arc in shared。
105/// Implementation class needs interior mutability ( like Mutex)to manage WebSocket connection status。
106#[async_trait]
107pub trait SignalingClient: Send + Sync {
108    /// Connecttosignaling server
109    async fn connect(&self) -> NetworkResult<()>;
110
111    /// DisconnectConnect
112    async fn disconnect(&self) -> NetworkResult<()>;
113
114    /// SendRegisterrequest(Register front stream process ,using PeerToSignaling)
115    async fn send_register_request(
116        &self,
117        request: RegisterRequest,
118    ) -> NetworkResult<RegisterResponse>;
119
120    /// Send center skip(Registerafter stream process ,using ActrToSignaling)
121    async fn send_heartbeat(
122        &self,
123        actor_id: ActrId,
124        credential: AIdCredential,
125        availability: ServiceAvailabilityState,
126        power_reserve: f32,
127        mailbox_backlog: f32,
128    ) -> NetworkResult<()>;
129
130    /// Sendsignalingsignal seal ( pass usage Method)
131    async fn send_envelope(&self, envelope: SignalingEnvelope) -> NetworkResult<()>;
132
133    /// Receivesignalingsignal seal
134    async fn receive_envelope(&self) -> NetworkResult<Option<SignalingEnvelope>>;
135
136    /// Check connection status
137    fn is_connected(&self) -> bool;
138
139    /// GetConnect statistics info
140    fn get_stats(&self) -> SignalingStats;
141}
142
143/// WebSocket signaling clientImplementation
144pub struct WebSocketSignalingClient {
145    config: SignalingConfig,
146    /// WebSocket write end (using Mutex Implementation interior mutability )
147    ws_sink: tokio::sync::Mutex<
148        Option<
149            futures_util::stream::SplitSink<
150                WebSocketStream<MaybeTlsStream<TcpStream>>,
151                tokio_tungstenite::tungstenite::Message,
152            >,
153        >,
154    >,
155    /// WebSocket read end (using Mutex Implementation interior mutability )
156    ws_stream: tokio::sync::Mutex<
157        Option<futures_util::stream::SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>>,
158    >,
159    /// connection status
160    connected: tokio::sync::RwLock<bool>,
161    /// statistics info
162    stats: tokio::sync::RwLock<SignalingStats>,
163    /// Envelope count number device
164    envelope_counter: tokio::sync::Mutex<u64>,
165}
166
167impl WebSocketSignalingClient {
168    /// Create new WebSocket signaling client
169    pub fn new(config: SignalingConfig) -> Self {
170        Self {
171            config,
172            ws_sink: tokio::sync::Mutex::new(None),
173            ws_stream: tokio::sync::Mutex::new(None),
174            connected: tokio::sync::RwLock::new(false),
175            stats: tokio::sync::RwLock::new(SignalingStats::default()),
176            envelope_counter: tokio::sync::Mutex::new(0),
177        }
178    }
179
180    /// simple for convenience construct create Function
181    pub async fn connect_to(url: &str) -> NetworkResult<Self> {
182        let config = SignalingConfig {
183            server_url: url.parse()?,
184            connection_timeout: 30,
185            heartbeat_interval: 30,
186            reconnect_config: ReconnectConfig::default(),
187            auth_config: None,
188        };
189
190        let client = Self::new(config);
191        client.connect().await?;
192        Ok(client)
193    }
194
195    /// alive integrate down a envelope ID
196    async fn next_envelope_id(&self) -> String {
197        let mut counter = self.envelope_counter.lock().await;
198        *counter += 1;
199        format!("env-{}", *counter)
200    }
201
202    /// Create SignalingEnvelope
203    async fn create_envelope(&self, flow: signaling_envelope::Flow) -> SignalingEnvelope {
204        SignalingEnvelope {
205            envelope_version: 1,
206            envelope_id: self.next_envelope_id().await,
207            reply_for: None,
208            timestamp: prost_types::Timestamp {
209                seconds: chrono::Utc::now().timestamp(),
210                nanos: 0,
211            },
212            flow: Some(flow),
213        }
214    }
215}
216
217#[async_trait]
218impl SignalingClient for WebSocketSignalingClient {
219    async fn connect(&self) -> NetworkResult<()> {
220        let (ws_stream, _) = connect_async(self.config.server_url.as_str()).await?;
221
222        // distribute apart read write stream
223        let (sink, stream) = ws_stream.split();
224
225        *self.ws_sink.lock().await = Some(sink);
226        *self.ws_stream.lock().await = Some(stream);
227        *self.connected.write().await = true;
228
229        let mut stats = self.stats.write().await;
230        stats.connections += 1;
231
232        Ok(())
233    }
234
235    async fn disconnect(&self) -> NetworkResult<()> {
236        // fetch exit sink and stream
237        let mut sink_guard = self.ws_sink.lock().await;
238        let mut stream_guard = self.ws_stream.lock().await;
239
240        // Close sink
241        if let Some(mut sink) = sink_guard.take() {
242            let _ = sink.close().await;
243        }
244
245        // clear blank stream
246        stream_guard.take();
247
248        *self.connected.write().await = false;
249
250        let mut stats = self.stats.write().await;
251        stats.disconnections += 1;
252
253        Ok(())
254    }
255
256    async fn send_register_request(
257        &self,
258        request: RegisterRequest,
259    ) -> NetworkResult<RegisterResponse> {
260        // Create PeerToSignaling stream process (Register front )
261        let flow = signaling_envelope::Flow::PeerToServer(PeerToSignaling {
262            payload: Some(peer_to_signaling::Payload::RegisterRequest(request)),
263        });
264
265        let envelope = self.create_envelope(flow).await;
266        self.send_envelope(envelope).await?;
267
268        // Wait forRegisterresponse respond
269        loop {
270            if let Some(response_envelope) = self.receive_envelope().await? {
271                if let Some(signaling_envelope::Flow::ServerToActr(server_to_actr)) =
272                    response_envelope.flow
273                {
274                    if let Some(signaling_to_actr::Payload::RegisterResponse(response)) =
275                        server_to_actr.payload
276                    {
277                        return Ok(response);
278                    }
279                }
280            } else {
281                return Err(NetworkError::ConnectionError(
282                    "Connection closed while waiting for registration response".to_string(),
283                ));
284            }
285        }
286    }
287
288    async fn send_heartbeat(
289        &self,
290        actor_id: ActrId,
291        credential: AIdCredential,
292        availability: ServiceAvailabilityState,
293        power_reserve: f32,
294        mailbox_backlog: f32,
295    ) -> NetworkResult<()> {
296        let ping = Ping {
297            availability: availability as i32,
298            power_reserve,
299            mailbox_backlog,
300            sticky_client_ids: vec![], // TODO: Implement sticky session tracking
301        };
302
303        let flow = signaling_envelope::Flow::ActrToServer(ActrToSignaling {
304            source: actor_id,
305            credential,
306            payload: Some(actr_to_signaling::Payload::Ping(ping)),
307        });
308
309        let envelope = self.create_envelope(flow).await;
310        self.send_envelope(envelope).await
311    }
312
313    async fn send_envelope(&self, envelope: SignalingEnvelope) -> NetworkResult<()> {
314        let mut sink_guard = self.ws_sink.lock().await;
315
316        if let Some(sink) = sink_guard.as_mut() {
317            // using protobuf binary serialization
318            let mut buf = Vec::new();
319            envelope.encode(&mut buf)?;
320            let msg = tokio_tungstenite::tungstenite::Message::Binary(buf.into());
321            sink.send(msg).await?;
322
323            let mut stats = self.stats.write().await;
324            stats.messages_sent += 1;
325
326            Ok(())
327        } else {
328            Err(NetworkError::ConnectionError("Not connected".to_string()))
329        }
330    }
331
332    async fn receive_envelope(&self) -> NetworkResult<Option<SignalingEnvelope>> {
333        let mut stream_guard = self.ws_stream.lock().await;
334
335        if let Some(stream) = stream_guard.as_mut() {
336            if let Some(msg) = stream.next().await {
337                let msg = msg?;
338                match msg {
339                    tokio_tungstenite::tungstenite::Message::Binary(data) => {
340                        // using protobuf decode
341                        let envelope = SignalingEnvelope::decode(&data[..])?;
342
343                        let mut stats = self.stats.write().await;
344                        stats.messages_received += 1;
345
346                        Ok(Some(envelope))
347                    }
348                    _ => Ok(None),
349                }
350            } else {
351                Ok(None)
352            }
353        } else {
354            Err(NetworkError::ConnectionError("Not connected".to_string()))
355        }
356    }
357
358    fn is_connected(&self) -> bool {
359        // using blocking API read connection status
360        // Note: this inmayinasynccontext in adjust usage , but RwLock::blocking_read() in short temporal duration hold has temporal is safe secure 's
361        *self.connected.blocking_read()
362    }
363
364    fn get_stats(&self) -> SignalingStats {
365        // using blocking API read statistics info
366        self.stats.blocking_read().clone()
367    }
368}
369
370/// signaling statistics info
371#[derive(Debug, Clone, Default, Serialize, Deserialize)]
372pub struct SignalingStats {
373    /// Connect attempts
374    pub connections: u64,
375
376    /// DisconnectConnect attempts
377    pub disconnections: u64,
378
379    /// Send'smessage number
380    pub messages_sent: u64,
381
382    /// Receive'smessage number
383    pub messages_received: u64,
384
385    /// Send's center skip number
386    pub heartbeats_sent: u64,
387
388    /// Receive's center skip number
389    pub heartbeats_received: u64,
390
391    /// Error attempts
392    pub errors: u64,
393}