actr_runtime/wire/webrtc/
signaling.rs1use crate::transport::error::{NetworkError, NetworkResult};
6use actr_protocol::prost::Message as ProstMessage;
7use actr_protocol::{
8 AIdCredential, ActrId, ActrToSignaling, PeerToSignaling, Ping, RegisterRequest,
9 RegisterResponse, ServiceAvailabilityState, SignalingEnvelope, actr_to_signaling,
10 peer_to_signaling, signaling_envelope, signaling_to_actr,
11};
12use async_trait::async_trait;
13use futures_util::{SinkExt, StreamExt};
14use serde::{Deserialize, Serialize};
15use std::collections::HashMap;
16use tokio::net::TcpStream;
17use tokio_tungstenite::{MaybeTlsStream, WebSocketStream, connect_async};
18use url::Url;
19
20#[derive(Debug, Clone)]
26pub struct SignalingConfig {
27 pub server_url: Url,
29
30 pub connection_timeout: u64,
32
33 pub heartbeat_interval: u64,
35
36 pub reconnect_config: ReconnectConfig,
38
39 pub auth_config: Option<AuthConfig>,
41}
42
43#[derive(Debug, Clone)]
45pub struct ReconnectConfig {
46 pub enabled: bool,
48
49 pub max_attempts: u32,
51
52 pub initial_delay: u64,
54
55 pub max_delay: u64,
57
58 pub backoff_multiplier: f64,
60}
61
62impl Default for ReconnectConfig {
63 fn default() -> Self {
64 Self {
65 enabled: true,
66 max_attempts: 10,
67 initial_delay: 1,
68 max_delay: 60,
69 backoff_multiplier: 2.0,
70 }
71 }
72}
73
74#[derive(Debug, Clone)]
76pub struct AuthConfig {
77 pub auth_type: AuthType,
79
80 pub credentials: HashMap<String, String>,
82}
83
84#[derive(Debug, Clone)]
86pub enum AuthType {
87 None,
89 BearerToken,
91 ApiKey,
93 Jwt,
95}
96
97#[async_trait]
107pub trait SignalingClient: Send + Sync {
108 async fn connect(&self) -> NetworkResult<()>;
110
111 async fn disconnect(&self) -> NetworkResult<()>;
113
114 async fn send_register_request(
116 &self,
117 request: RegisterRequest,
118 ) -> NetworkResult<RegisterResponse>;
119
120 async fn send_heartbeat(
122 &self,
123 actor_id: ActrId,
124 credential: AIdCredential,
125 availability: ServiceAvailabilityState,
126 power_reserve: f32,
127 mailbox_backlog: f32,
128 ) -> NetworkResult<()>;
129
130 async fn send_envelope(&self, envelope: SignalingEnvelope) -> NetworkResult<()>;
132
133 async fn receive_envelope(&self) -> NetworkResult<Option<SignalingEnvelope>>;
135
136 fn is_connected(&self) -> bool;
138
139 fn get_stats(&self) -> SignalingStats;
141}
142
143pub struct WebSocketSignalingClient {
145 config: SignalingConfig,
146 ws_sink: tokio::sync::Mutex<
148 Option<
149 futures_util::stream::SplitSink<
150 WebSocketStream<MaybeTlsStream<TcpStream>>,
151 tokio_tungstenite::tungstenite::Message,
152 >,
153 >,
154 >,
155 ws_stream: tokio::sync::Mutex<
157 Option<futures_util::stream::SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>>,
158 >,
159 connected: tokio::sync::RwLock<bool>,
161 stats: tokio::sync::RwLock<SignalingStats>,
163 envelope_counter: tokio::sync::Mutex<u64>,
165}
166
167impl WebSocketSignalingClient {
168 pub fn new(config: SignalingConfig) -> Self {
170 Self {
171 config,
172 ws_sink: tokio::sync::Mutex::new(None),
173 ws_stream: tokio::sync::Mutex::new(None),
174 connected: tokio::sync::RwLock::new(false),
175 stats: tokio::sync::RwLock::new(SignalingStats::default()),
176 envelope_counter: tokio::sync::Mutex::new(0),
177 }
178 }
179
180 pub async fn connect_to(url: &str) -> NetworkResult<Self> {
182 let config = SignalingConfig {
183 server_url: url.parse()?,
184 connection_timeout: 30,
185 heartbeat_interval: 30,
186 reconnect_config: ReconnectConfig::default(),
187 auth_config: None,
188 };
189
190 let client = Self::new(config);
191 client.connect().await?;
192 Ok(client)
193 }
194
195 async fn next_envelope_id(&self) -> String {
197 let mut counter = self.envelope_counter.lock().await;
198 *counter += 1;
199 format!("env-{}", *counter)
200 }
201
202 async fn create_envelope(&self, flow: signaling_envelope::Flow) -> SignalingEnvelope {
204 SignalingEnvelope {
205 envelope_version: 1,
206 envelope_id: self.next_envelope_id().await,
207 reply_for: None,
208 timestamp: prost_types::Timestamp {
209 seconds: chrono::Utc::now().timestamp(),
210 nanos: 0,
211 },
212 flow: Some(flow),
213 }
214 }
215}
216
217#[async_trait]
218impl SignalingClient for WebSocketSignalingClient {
219 async fn connect(&self) -> NetworkResult<()> {
220 let (ws_stream, _) = connect_async(self.config.server_url.as_str()).await?;
221
222 let (sink, stream) = ws_stream.split();
224
225 *self.ws_sink.lock().await = Some(sink);
226 *self.ws_stream.lock().await = Some(stream);
227 *self.connected.write().await = true;
228
229 let mut stats = self.stats.write().await;
230 stats.connections += 1;
231
232 Ok(())
233 }
234
235 async fn disconnect(&self) -> NetworkResult<()> {
236 let mut sink_guard = self.ws_sink.lock().await;
238 let mut stream_guard = self.ws_stream.lock().await;
239
240 if let Some(mut sink) = sink_guard.take() {
242 let _ = sink.close().await;
243 }
244
245 stream_guard.take();
247
248 *self.connected.write().await = false;
249
250 let mut stats = self.stats.write().await;
251 stats.disconnections += 1;
252
253 Ok(())
254 }
255
256 async fn send_register_request(
257 &self,
258 request: RegisterRequest,
259 ) -> NetworkResult<RegisterResponse> {
260 let flow = signaling_envelope::Flow::PeerToServer(PeerToSignaling {
262 payload: Some(peer_to_signaling::Payload::RegisterRequest(request)),
263 });
264
265 let envelope = self.create_envelope(flow).await;
266 self.send_envelope(envelope).await?;
267
268 loop {
270 if let Some(response_envelope) = self.receive_envelope().await? {
271 if let Some(signaling_envelope::Flow::ServerToActr(server_to_actr)) =
272 response_envelope.flow
273 {
274 if let Some(signaling_to_actr::Payload::RegisterResponse(response)) =
275 server_to_actr.payload
276 {
277 return Ok(response);
278 }
279 }
280 } else {
281 return Err(NetworkError::ConnectionError(
282 "Connection closed while waiting for registration response".to_string(),
283 ));
284 }
285 }
286 }
287
288 async fn send_heartbeat(
289 &self,
290 actor_id: ActrId,
291 credential: AIdCredential,
292 availability: ServiceAvailabilityState,
293 power_reserve: f32,
294 mailbox_backlog: f32,
295 ) -> NetworkResult<()> {
296 let ping = Ping {
297 availability: availability as i32,
298 power_reserve,
299 mailbox_backlog,
300 sticky_client_ids: vec![], };
302
303 let flow = signaling_envelope::Flow::ActrToServer(ActrToSignaling {
304 source: actor_id,
305 credential,
306 payload: Some(actr_to_signaling::Payload::Ping(ping)),
307 });
308
309 let envelope = self.create_envelope(flow).await;
310 self.send_envelope(envelope).await
311 }
312
313 async fn send_envelope(&self, envelope: SignalingEnvelope) -> NetworkResult<()> {
314 let mut sink_guard = self.ws_sink.lock().await;
315
316 if let Some(sink) = sink_guard.as_mut() {
317 let mut buf = Vec::new();
319 envelope.encode(&mut buf)?;
320 let msg = tokio_tungstenite::tungstenite::Message::Binary(buf.into());
321 sink.send(msg).await?;
322
323 let mut stats = self.stats.write().await;
324 stats.messages_sent += 1;
325
326 Ok(())
327 } else {
328 Err(NetworkError::ConnectionError("Not connected".to_string()))
329 }
330 }
331
332 async fn receive_envelope(&self) -> NetworkResult<Option<SignalingEnvelope>> {
333 let mut stream_guard = self.ws_stream.lock().await;
334
335 if let Some(stream) = stream_guard.as_mut() {
336 if let Some(msg) = stream.next().await {
337 let msg = msg?;
338 match msg {
339 tokio_tungstenite::tungstenite::Message::Binary(data) => {
340 let envelope = SignalingEnvelope::decode(&data[..])?;
342
343 let mut stats = self.stats.write().await;
344 stats.messages_received += 1;
345
346 Ok(Some(envelope))
347 }
348 _ => Ok(None),
349 }
350 } else {
351 Ok(None)
352 }
353 } else {
354 Err(NetworkError::ConnectionError("Not connected".to_string()))
355 }
356 }
357
358 fn is_connected(&self) -> bool {
359 *self.connected.blocking_read()
362 }
363
364 fn get_stats(&self) -> SignalingStats {
365 self.stats.blocking_read().clone()
367 }
368}
369
370#[derive(Debug, Clone, Default, Serialize, Deserialize)]
372pub struct SignalingStats {
373 pub connections: u64,
375
376 pub disconnections: u64,
378
379 pub messages_sent: u64,
381
382 pub messages_received: u64,
384
385 pub heartbeats_sent: u64,
387
388 pub heartbeats_received: u64,
390
391 pub errors: u64,
393}