actr_runtime/transport/
wire_builder.rs1use super::Dest; use super::error::{NetworkError, NetworkResult};
10use super::manager::WireBuilder;
11use super::wire_handle::WireHandle;
12use crate::wire::webrtc::WebRtcCoordinator;
13use crate::wire::websocket::WebSocketConnection;
14use async_trait::async_trait;
15use std::sync::Arc;
16use tokio_util::sync::CancellationToken;
17
18pub struct DefaultWireBuilderConfig {
20 pub websocket_url_template: Option<String>,
22
23 pub enable_webrtc: bool,
25
26 pub enable_websocket: bool,
28}
29
30impl Default for DefaultWireBuilderConfig {
31 fn default() -> Self {
32 Self {
33 websocket_url_template: None,
34 enable_webrtc: true,
35 enable_websocket: false, }
37 }
38}
39
40pub struct DefaultWireBuilder {
45 webrtc_coordinator: Option<Arc<WebRtcCoordinator>>,
47
48 websocket_url_template: Option<String>,
50
51 config: DefaultWireBuilderConfig,
53}
54
55impl DefaultWireBuilder {
56 pub fn new(
62 webrtc_coordinator: Option<Arc<WebRtcCoordinator>>,
63 config: DefaultWireBuilderConfig,
64 ) -> Self {
65 Self {
66 webrtc_coordinator,
67 websocket_url_template: config.websocket_url_template.clone(),
68 config,
69 }
70 }
71
72 fn build_websocket_url(&self, dest: &Dest) -> Option<String> {
74 let template = self.websocket_url_template.as_ref()?;
75
76 match dest {
77 Dest::Actor(actor_id) => {
78 let url = template.replace("{actor_id}", &actor_id.serial_number.to_string());
80 Some(url)
81 }
82 Dest::Shell | Dest::Local => {
83 None
86 }
87 }
88 }
89}
90
91#[async_trait]
92impl WireBuilder for DefaultWireBuilder {
93 #[cfg_attr(feature = "opentelemetry", tracing::instrument(skip_all))]
94 async fn create_connections(&self, dest: &Dest) -> NetworkResult<Vec<WireHandle>> {
95 self.create_connections_with_cancel(dest, None).await
97 }
98
99 #[cfg_attr(feature = "opentelemetry", tracing::instrument(skip_all))]
100 async fn create_connections_with_cancel(
101 &self,
102 dest: &Dest,
103 cancel_token: Option<CancellationToken>,
104 ) -> NetworkResult<Vec<WireHandle>> {
105 let mut connections = Vec::new();
106
107 let check_cancelled = |token: &Option<CancellationToken>| -> NetworkResult<()> {
109 if let Some(t) = token {
110 if t.is_cancelled() {
111 return Err(NetworkError::ConnectionClosed(
112 "Connection creation cancelled".to_string(),
113 ));
114 }
115 }
116 Ok(())
117 };
118
119 check_cancelled(&cancel_token)?;
121
122 if self.config.enable_websocket {
124 check_cancelled(&cancel_token)?;
125
126 if let Some(url) = self.build_websocket_url(dest) {
127 tracing::debug!("🏭 [Factory] Create WebSocket Connect: {}", url);
128 let ws_conn = WebSocketConnection::new(url);
129 connections.push(WireHandle::WebSocket(ws_conn));
130 } else {
131 tracing::warn!(
132 "⚠️ [Factory] WebSocket enabled but no method construct build URL: {:?}",
133 dest
134 );
135 }
136 }
137
138 check_cancelled(&cancel_token)?;
140
141 if self.config.enable_webrtc {
143 if let Some(coordinator) = &self.webrtc_coordinator {
144 if dest.is_actor() {
146 tracing::debug!("🏭 [Factory] Create WebRTC Connectto: {:?}", dest);
147
148 check_cancelled(&cancel_token)?;
150
151 match coordinator
152 .create_connection(dest, cancel_token.clone())
153 .await
154 {
155 Ok(webrtc_conn) => {
156 if let Err(e) = check_cancelled(&cancel_token) {
158 if let Err(close_err) = webrtc_conn.close().await {
160 tracing::warn!(
161 "⚠️ [Factory] Failed to close cancelled connection: {}",
162 close_err
163 );
164 }
165 return Err(e);
166 }
167 connections.push(WireHandle::WebRTC(webrtc_conn));
168 }
169 Err(e) => {
170 tracing::warn!(
171 "❌ [Factory] WebRTC ConnectCreatefailure: {:?}: {}",
172 dest,
173 e
174 );
175 }
177 }
178 } else {
179 tracing::debug!(
180 "ℹ️ [Factory] WebRTC not Support Shell item mark ,skip through "
181 );
182 }
183 } else {
184 tracing::warn!("⚠️ [Factory] WebRTC enabled but not Provide WebRtcCoordinator");
185 }
186 }
187
188 tracing::info!(
189 "✨ [Factory] as {:?} Create done {} Connect",
190 dest,
191 connections.len()
192 );
193
194 Ok(connections)
195 }
196}
197
198#[cfg(test)]
199mod tests {
200 use super::*;
201 use actr_protocol::ActrId;
202
203 #[test]
204 fn test_build_websocket_url() {
205 let config = DefaultWireBuilderConfig {
206 websocket_url_template: Some("ws://server:8080/actor/{actor_id}".to_string()),
207 enable_websocket: true,
208 enable_webrtc: false,
209 };
210
211 let factory = DefaultWireBuilder::new(None, config);
212
213 let mut actor_id = ActrId::default();
214 actor_id.serial_number = 12345;
215 let dest = Dest::Actor(actor_id);
216
217 let url = factory.build_websocket_url(&dest);
218 assert_eq!(url, Some("ws://server:8080/actor/12345".to_string()));
219 }
220
221 #[tokio::test]
222 async fn test_create_websocket_connection() {
223 use actr_protocol::ActrId;
224
225 let config = DefaultWireBuilderConfig {
226 websocket_url_template: Some("ws://localhost:8080".to_string()),
227 enable_websocket: true,
228 enable_webrtc: false,
229 };
230
231 let factory = DefaultWireBuilder::new(None, config);
232 let actor_id = ActrId::default();
234 let dest = Dest::actor(actor_id);
235
236 let connections = factory.create_connections(&dest).await.unwrap();
237 assert_eq!(connections.len(), 1);
238
239 if let WireHandle::WebSocket(_ws_conn) = &connections[0] {
240 } else {
242 panic!("Expected WebSocket connection");
243 }
244 }
245}