actr_runtime/transport/
wire_builder.rs

1//! WireBuilder - Wire layer component builder
2//!
3//! Provides default Wire component builder implementation, supporting:
4//! - WebRTC P2P connections (through WebRtcCoordinator)
5//! - WebSocket C/S connections
6//! - CancellationToken for terminating in-progress connection creation
7
8use super::Dest; // Re-exported from actr-framework
9use super::error::{NetworkError, NetworkResult};
10use super::manager::WireBuilder;
11use super::wire_handle::WireHandle;
12use crate::wire::webrtc::WebRtcCoordinator;
13use crate::wire::websocket::WebSocketConnection;
14use async_trait::async_trait;
15use std::sync::Arc;
16use tokio_util::sync::CancellationToken;
17
/// Configuration for [`DefaultWireBuilder`].
///
/// Selects which transports the builder attempts and, for WebSocket, how the
/// server URL is derived from the destination.
#[derive(Debug, Clone)]
pub struct DefaultWireBuilderConfig {
    /// WebSocket server URL template.
    ///
    /// May contain an `{actor_id}` placeholder, which is replaced with the
    /// destination actor's serial number when a URL is built. With `None`,
    /// no WebSocket URL can be built.
    pub websocket_url_template: Option<String>,

    /// Enable WebRTC
    pub enable_webrtc: bool,

    /// Enable WebSocket
    pub enable_websocket: bool,
}
29
30impl Default for DefaultWireBuilderConfig {
31    fn default() -> Self {
32        Self {
33            websocket_url_template: None,
34            enable_webrtc: true,
35            enable_websocket: false, // WebSocket disabled by default (requires URL configuration)
36        }
37    }
38}
39
40/// default Wire construct build device
41///
42/// based onconfigurationCreate WebRTC and/or WebSocket Wire group file 。
43/// Supportsaturatedand format Connect( same temporal attempt try multiple typeConnectType)。
44pub struct DefaultWireBuilder {
45    /// WebRTC coordinator(optional)
46    webrtc_coordinator: Option<Arc<WebRtcCoordinator>>,
47
48    /// WebSocket URL vague template
49    websocket_url_template: Option<String>,
50
51    /// configuration
52    config: DefaultWireBuilderConfig,
53}
54
55impl DefaultWireBuilder {
56    /// Create new Wire construct build device
57    ///
58    /// # Arguments
59    /// - `webrtc_coordinator`: WebRTC coordinator(If start usage WebRTC)
60    /// - `config`: construct build device configuration
61    pub fn new(
62        webrtc_coordinator: Option<Arc<WebRtcCoordinator>>,
63        config: DefaultWireBuilderConfig,
64    ) -> Self {
65        Self {
66            webrtc_coordinator,
67            websocket_url_template: config.websocket_url_template.clone(),
68            config,
69        }
70    }
71
72    /// Build WebSocket URL from template
73    fn build_websocket_url(&self, dest: &Dest) -> Option<String> {
74        let template = self.websocket_url_template.as_ref()?;
75
76        match dest {
77            Dest::Actor(actor_id) => {
78                // Replace {actor_id} placeholder with serial_number
79                let url = template.replace("{actor_id}", &actor_id.serial_number.to_string());
80                Some(url)
81            }
82            Dest::Shell | Dest::Local => {
83                // Local/Shell calls don't need network connections (should be short-circuited at upper layer)
84                // Return None for type completeness
85                None
86            }
87        }
88    }
89}
90
91#[async_trait]
92impl WireBuilder for DefaultWireBuilder {
93    #[cfg_attr(feature = "opentelemetry", tracing::instrument(skip_all))]
94    async fn create_connections(&self, dest: &Dest) -> NetworkResult<Vec<WireHandle>> {
95        // Delegate to method with no cancel token
96        self.create_connections_with_cancel(dest, None).await
97    }
98
99    #[cfg_attr(feature = "opentelemetry", tracing::instrument(skip_all))]
100    async fn create_connections_with_cancel(
101        &self,
102        dest: &Dest,
103        cancel_token: Option<CancellationToken>,
104    ) -> NetworkResult<Vec<WireHandle>> {
105        let mut connections = Vec::new();
106
107        // Helper to check cancellation
108        let check_cancelled = |token: &Option<CancellationToken>| -> NetworkResult<()> {
109            if let Some(t) = token {
110                if t.is_cancelled() {
111                    return Err(NetworkError::ConnectionClosed(
112                        "Connection creation cancelled".to_string(),
113                    ));
114                }
115            }
116            Ok(())
117        };
118
119        // 1. Check if already cancelled
120        check_cancelled(&cancel_token)?;
121
122        // 2. attempt try Create WebSocket Connect
123        if self.config.enable_websocket {
124            check_cancelled(&cancel_token)?;
125
126            if let Some(url) = self.build_websocket_url(dest) {
127                tracing::debug!("🏭 [Factory] Create WebSocket Connect: {}", url);
128                let ws_conn = WebSocketConnection::new(url);
129                connections.push(WireHandle::WebSocket(ws_conn));
130            } else {
131                tracing::warn!(
132                    "⚠️ [Factory] WebSocket enabled but no method construct build URL: {:?}",
133                    dest
134                );
135            }
136        }
137
138        // 3. Check cancellation before WebRTC
139        check_cancelled(&cancel_token)?;
140
141        // 4. attempt try Create WebRTC Connect
142        if self.config.enable_webrtc {
143            if let Some(coordinator) = &self.webrtc_coordinator {
144                // WebRTC merely Support Actor Type
145                if dest.is_actor() {
146                    tracing::debug!("🏭 [Factory] Create WebRTC Connectto: {:?}", dest);
147
148                    // Check cancellation before long-running operation
149                    check_cancelled(&cancel_token)?;
150
151                    match coordinator
152                        .create_connection(dest, cancel_token.clone())
153                        .await
154                    {
155                        Ok(webrtc_conn) => {
156                            // Check cancellation after creation
157                            if let Err(e) = check_cancelled(&cancel_token) {
158                                // Clean up newly created connection
159                                if let Err(close_err) = webrtc_conn.close().await {
160                                    tracing::warn!(
161                                        "⚠️ [Factory] Failed to close cancelled connection: {}",
162                                        close_err
163                                    );
164                                }
165                                return Err(e);
166                            }
167                            connections.push(WireHandle::WebRTC(webrtc_conn));
168                        }
169                        Err(e) => {
170                            tracing::warn!(
171                                "❌ [Factory] WebRTC ConnectCreatefailure: {:?}: {}",
172                                dest,
173                                e
174                            );
175                            // not ReturnsError,allowusingotherConnectType
176                        }
177                    }
178                } else {
179                    tracing::debug!(
180                        "ℹ️ [Factory] WebRTC not Support Shell item mark ,skip through "
181                    );
182                }
183            } else {
184                tracing::warn!("⚠️ [Factory] WebRTC enabled but not Provide WebRtcCoordinator");
185            }
186        }
187
188        tracing::info!(
189            "✨ [Factory] as {:?} Create done {} Connect",
190            dest,
191            connections.len()
192        );
193
194        Ok(connections)
195    }
196}
197
#[cfg(test)]
mod tests {
    use super::*;
    use actr_protocol::ActrId;

    /// Builds a WebSocket-only builder (WebRTC disabled, no coordinator) for `template`.
    fn websocket_only_builder(template: &str) -> DefaultWireBuilder {
        let config = DefaultWireBuilderConfig {
            websocket_url_template: Some(template.to_string()),
            enable_websocket: true,
            enable_webrtc: false,
        };
        DefaultWireBuilder::new(None, config)
    }

    /// The `{actor_id}` placeholder is replaced with the actor's serial number.
    #[test]
    fn test_build_websocket_url() {
        let factory = websocket_only_builder("ws://server:8080/actor/{actor_id}");

        let mut actor_id = ActrId::default();
        actor_id.serial_number = 12345;

        let url = factory.build_websocket_url(&Dest::Actor(actor_id));
        assert_eq!(url, Some("ws://server:8080/actor/12345".to_string()));
    }

    /// With only WebSocket enabled, exactly one WebSocket handle is produced.
    #[tokio::test]
    async fn test_create_websocket_connection() {
        let factory = websocket_only_builder("ws://localhost:8080");

        // Use an Actor dest instead of Shell (Shell doesn't create network connections).
        let dest = Dest::actor(ActrId::default());

        let connections = factory.create_connections(&dest).await.unwrap();
        assert_eq!(connections.len(), 1);

        assert!(
            matches!(&connections[0], WireHandle::WebSocket(_)),
            "Expected WebSocket connection"
        );
    }
}