actr_runtime/transport/
lane.rs

1//! DataLane - Business data transport channel
2//!
3//! DataLane is the core abstraction of the transport layer for message/data transmission.
4//! Note: MediaTrack uses a separate MediaFrameRegistry path, not DataLane.
5//!
6//! ## Design Philosophy
7//!
8//! ```text
9//! DataLane features:
10//!   ✓ enum type with 3 variants (WebRtcDataChannel, Mpsc, WebSocket)
11//!   ✓ Unified send/recv API for data messages
12//!   ✓ Cloneable (uses Arc internally for sharing)
13//!   ✓ Multi-consumer pattern (shared receive channel)
14//! ```
15
16use super::error::{NetworkError, NetworkResult};
17use actr_protocol::PayloadType;
18use futures_util::SinkExt;
19use futures_util::stream::SplitSink;
20use std::sync::Arc;
21use tokio::net::TcpStream;
22use tokio::sync::{Mutex, mpsc};
23use tokio_tungstenite::tungstenite::Message as WsMessage;
24use tokio_tungstenite::{MaybeTlsStream, WebSocketStream};
25use webrtc::data_channel::RTCDataChannel;
26
27/// Type alias for WebSocket sink (shared across all PayloadTypes)
28type WsSink = Arc<Mutex<Option<SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, WsMessage>>>>;
29
30/// DataLane - Data transport channel
31///
32/// Each DataLane represents a specific transport path for data/message transmission.
33/// MediaTrack uses a separate path via MediaFrameRegistry, not DataLane.
34#[derive(Clone)]
35pub enum DataLane {
36    /// WebRTC DataChannel Lane
37    ///
38    /// For transmitting messages via WebRTC DataChannel
39    WebRtcDataChannel {
40        /// Underlying DataChannel
41        data_channel: Arc<RTCDataChannel>,
42
43        /// Receive channel (shared, uses Bytes for zero-copy)
44        rx: Arc<Mutex<mpsc::Receiver<bytes::Bytes>>>,
45    },
46
47    /// Mpsc Lane
48    ///
49    /// For intra-process communication (Inproc transport)
50    ///
51    /// Note: directly passes RpcEnvelope objects, zero serialization
52    Mpsc {
53        /// PayloadType identifier
54        payload_type: PayloadType,
55
56        /// Send channel (directly passes RpcEnvelope)
57        tx: mpsc::Sender<actr_protocol::RpcEnvelope>,
58
59        /// Receive channel (shared)
60        rx: Arc<Mutex<mpsc::Receiver<actr_protocol::RpcEnvelope>>>,
61    },
62
63    /// WebSocket Lane
64    ///
65    /// For business data transmission in C/S architecture
66    WebSocket {
67        /// Shared Sink (all PayloadTypes share the same WebSocket connection)
68        /// Uses Option to support lazy initialization
69        sink: WsSink,
70
71        /// PayloadType identifier (used to add message header when sending)
72        payload_type: PayloadType,
73
74        /// Receive channel (independent, routed by dispatcher, uses Bytes for zero-copy)
75        rx: Arc<Mutex<mpsc::Receiver<bytes::Bytes>>>,
76    },
77}
78
79impl DataLane {
80    /// Send message
81    ///
82    /// # Arguments
83    /// - `data`: message data (uses Bytes for zero-copy)
84    ///
85    /// # Example
86    ///
87    /// ```rust,ignore
88    /// use bytes::Bytes;
89    /// data_lane.send(Bytes::from_static(b"hello")).await?;
90    /// ```
91    pub async fn send(&self, data: bytes::Bytes) -> NetworkResult<()> {
92        match self {
93            DataLane::WebRtcDataChannel { data_channel, .. } => {
94                use webrtc::data_channel::data_channel_state::RTCDataChannelState;
95
96                // Wait for DataChannel to open (max 5 seconds)
97                let start = tokio::time::Instant::now();
98                loop {
99                    let state = data_channel.ready_state();
100                    if state == RTCDataChannelState::Open {
101                        break;
102                    }
103                    if state == RTCDataChannelState::Closed || state == RTCDataChannelState::Closing
104                    {
105                        return Err(NetworkError::DataChannelError(format!(
106                            "DataChannel closed: {state:?}"
107                        )));
108                    }
109                    if start.elapsed() > std::time::Duration::from_secs(5) {
110                        return Err(NetworkError::DataChannelError(format!(
111                            "DataChannel open timeout: {state:?}"
112                        )));
113                    }
114                    tokio::time::sleep(std::time::Duration::from_millis(10)).await;
115                }
116                tracing::debug!("🔄 WebRTC DataChannel send");
117                // Zero-copy: directly use the passed Bytes
118                data_channel
119                    .send(&data)
120                    .await
121                    .map_err(|e| NetworkError::DataChannelError(format!("Send failed: {e}")))?;
122
123                tracing::trace!("📤 WebRTC DataChannel sent {} bytes", data.len());
124                Ok(())
125            }
126
127            DataLane::Mpsc { .. } => {
128                // Mpsc DataLane should use send_envelope() instead of send(bytes)
129                Err(NetworkError::InvalidOperation(
130                    "Mpsc DataLane requires send_envelope(), not send(bytes)".to_string(),
131                ))
132            }
133
134            DataLane::WebSocket {
135                sink, payload_type, ..
136            } => {
137                // 1. Encapsulate message (add PayloadType header)
138                let mut buf = Vec::with_capacity(5 + data.len());
139
140                // 1 byte: payload_type
141                buf.push(*payload_type as u8);
142
143                // 4 bytes: data length (big-endian)
144                let len = data.len() as u32;
145                buf.extend_from_slice(&len.to_be_bytes());
146
147                // N bytes: data (copy from Bytes to Vec)
148                buf.extend_from_slice(&data);
149
150                // 2. Send to WebSocket
151                let mut sink_opt = sink.lock().await;
152                if let Some(s) = sink_opt.as_mut() {
153                    s.send(WsMessage::Binary(buf.into())).await.map_err(|e| {
154                        NetworkError::SendError(format!("WebSocket send failed: {e}"))
155                    })?;
156
157                    tracing::trace!(
158                        "📤 WebSocket sent {} bytes (type={:?})",
159                        data.len(),
160                        payload_type
161                    );
162                    Ok(())
163                } else {
164                    Err(NetworkError::ConnectionError(
165                        "WebSocket not connected".to_string(),
166                    ))
167                }
168            }
169        }
170    }
171
172    /// Send RpcEnvelope (Inproc only, zero serialization)
173    ///
174    /// # Arguments
175    /// - `envelope`: RpcEnvelope object
176    ///
177    /// # Description
178    /// This method is only for `DataLane::Mpsc`, directly passing RpcEnvelope objects,
179    /// without serialization/deserialization, achieving zero-copy intra-process communication.
180    ///
181    /// # Example
182    ///
183    /// ```rust,ignore
184    /// use actr_protocol::RpcEnvelope;
185    /// let envelope = RpcEnvelope { /* ... */ };
186    /// data_lane.send_envelope(envelope).await?;
187    /// ```
188    #[cfg_attr(
189        feature = "opentelemetry",
190        tracing::instrument(skip_all, name = "DataLane.send_envelope")
191    )]
192    pub async fn send_envelope(&self, envelope: actr_protocol::RpcEnvelope) -> NetworkResult<()> {
193        match self {
194            DataLane::Mpsc { tx, .. } => {
195                tx.send(envelope)
196                    .await
197                    .map_err(|_| NetworkError::ChannelClosed("Mpsc channel closed".to_string()))?;
198
199                tracing::trace!("📤 Mpsc sent RpcEnvelope");
200                Ok(())
201            }
202            _ => Err(NetworkError::InvalidOperation(
203                "send_envelope() only supports Mpsc DataLane".to_string(),
204            )),
205        }
206    }
207
208    /// Receive message
209    ///
210    /// Blocks until a message is received or the channel is closed.
211    ///
212    /// # Returns
213    /// - `Ok(Bytes)`: received message data (zero-copy)
214    /// - `Err`: channel closed or other error
215    ///
216    /// # Example
217    ///
218    /// ```rust,ignore
219    /// let data = data_lane.recv().await?;
220    /// println!("Received {} bytes", data.len());
221    /// ```
222    pub async fn recv(&self) -> NetworkResult<bytes::Bytes> {
223        match self {
224            DataLane::WebRtcDataChannel { rx, .. } | DataLane::WebSocket { rx, .. } => {
225                let mut receiver = rx.lock().await;
226                tracing::debug!("🔄 WebRTC DataLane recv: {:?}", receiver);
227                receiver.recv().await.ok_or_else(|| {
228                    NetworkError::ChannelClosed("DataLane receiver closed".to_string())
229                })
230            }
231            DataLane::Mpsc { .. } => {
232                // Mpsc DataLane should use recv_envelope() instead of recv()
233                Err(NetworkError::InvalidOperation(
234                    "Mpsc DataLane requires recv_envelope(), not recv()".to_string(),
235                ))
236            }
237        }
238    }
239
240    /// Receive RpcEnvelope (Inproc only)
241    ///
242    /// # Returns
243    /// - `Ok(RpcEnvelope)`: received message object
244    /// - `Err`: channel closed
245    ///
246    /// # Description
247    /// This method is only for `DataLane::Mpsc`, directly receiving RpcEnvelope objects, zero-copy.
248    pub async fn recv_envelope(&self) -> NetworkResult<actr_protocol::RpcEnvelope> {
249        match self {
250            DataLane::Mpsc { rx, .. } => {
251                let mut receiver = rx.lock().await;
252                receiver
253                    .recv()
254                    .await
255                    .ok_or_else(|| NetworkError::ChannelClosed("Mpsc channel closed".to_string()))
256            }
257            _ => Err(NetworkError::InvalidOperation(
258                "recv_envelope() only supports Mpsc DataLane".to_string(),
259            )),
260        }
261    }
262
263    /// Try to receive message (non-blocking)
264    ///
265    /// # Returns
266    /// - `Ok(Some(data))`: received message (zero-copy)
267    /// - `Ok(None)`: no message available
268    /// - `Err`: channel closed or other error
269    pub async fn try_recv(&self) -> NetworkResult<Option<bytes::Bytes>> {
270        match self {
271            DataLane::WebRtcDataChannel { rx, .. } | DataLane::WebSocket { rx, .. } => {
272                let mut receiver = rx.lock().await;
273                match receiver.try_recv() {
274                    Ok(data) => Ok(Some(data)),
275                    Err(mpsc::error::TryRecvError::Empty) => Ok(None),
276                    Err(mpsc::error::TryRecvError::Disconnected) => Err(
277                        NetworkError::ChannelClosed("Lane receiver closed".to_string()),
278                    ),
279                }
280            }
281            DataLane::Mpsc { .. } => {
282                // Mpsc Lane should use try_recv_envelope()
283                Err(NetworkError::InvalidOperation(
284                    "Mpsc Lane requires try_recv_envelope(), not try_recv()".to_string(),
285                ))
286            }
287        }
288    }
289
290    /// Get DataLane type name (for logging)
291    #[inline]
292    pub fn lane_type(&self) -> &'static str {
293        match self {
294            DataLane::WebRtcDataChannel { .. } => "WebRtcDataChannel",
295            DataLane::Mpsc { .. } => "Mpsc",
296            DataLane::WebSocket { .. } => "WebSocket",
297        }
298    }
299}
300
301impl std::fmt::Debug for DataLane {
302    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
303        match self {
304            DataLane::WebRtcDataChannel { .. } => write!(f, "DataLane::WebRtcDataChannel(..)"),
305            DataLane::Mpsc { .. } => write!(f, "DataLane::Mpsc(..)"),
306            DataLane::WebSocket { payload_type, .. } => {
307                write!(f, "DataLane::WebSocket(type={payload_type:?})")
308            }
309        }
310    }
311}
312
313/// DataLane factory methods
314impl DataLane {
315    /// Create Mpsc DataLane (accepts plain Receiver)
316    ///
317    /// # Arguments
318    /// - `payload_type`: PayloadType identifier
319    /// - `tx`: send channel (directly passes RpcEnvelope)
320    /// - `rx`: receive channel (automatically wrapped in Arc<Mutex<>>)
321    #[inline]
322    pub fn mpsc(
323        payload_type: PayloadType,
324        tx: mpsc::Sender<actr_protocol::RpcEnvelope>,
325        rx: mpsc::Receiver<actr_protocol::RpcEnvelope>,
326    ) -> Self {
327        DataLane::Mpsc {
328            payload_type,
329            tx,
330            rx: Arc::new(Mutex::new(rx)),
331        }
332    }
333
334    /// Create Mpsc DataLane (accepts shared Receiver)
335    ///
336    /// # Arguments
337    /// - `payload_type`: PayloadType identifier
338    /// - `tx`: send channel (directly passes RpcEnvelope)
339    /// - `rx`: shared receive channel
340    #[inline]
341    pub fn mpsc_shared(
342        payload_type: PayloadType,
343        tx: mpsc::Sender<actr_protocol::RpcEnvelope>,
344        rx: Arc<Mutex<mpsc::Receiver<actr_protocol::RpcEnvelope>>>,
345    ) -> Self {
346        DataLane::Mpsc {
347            payload_type,
348            tx,
349            rx,
350        }
351    }
352
353    /// Create WebRTC DataChannel DataLane
354    ///
355    /// # Arguments
356    /// - `data_channel`: DataChannel reference
357    /// - `rx`: receive channel (Bytes zero-copy)
358    #[inline]
359    pub fn webrtc_data_channel(
360        data_channel: Arc<RTCDataChannel>,
361        rx: mpsc::Receiver<bytes::Bytes>,
362    ) -> Self {
363        DataLane::WebRtcDataChannel {
364            data_channel,
365            rx: Arc::new(Mutex::new(rx)),
366        }
367    }
368
369    /// Create WebSocket DataLane
370    ///
371    /// # Arguments
372    /// - `sink`: shared WebSocket Sink (may not be connected yet, uses Option)
373    /// - `payload_type`: message type identifier
374    /// - `rx`: receive channel (Bytes zero-copy)
375    #[inline]
376    pub fn websocket(
377        sink: WsSink,
378        payload_type: PayloadType,
379        rx: mpsc::Receiver<bytes::Bytes>,
380    ) -> Self {
381        DataLane::WebSocket {
382            sink,
383            payload_type,
384            rx: Arc::new(Mutex::new(rx)),
385        }
386    }
387}
388
#[cfg(test)]
mod tests {
    use super::*;
    use actr_protocol::RpcEnvelope;
    use bytes::Bytes;

    /// Builds the envelope used by the round-trip tests; only the request id and the
    /// payload differ between test cases.
    fn sample_envelope(request_id: &str, payload: &'static [u8]) -> RpcEnvelope {
        RpcEnvelope {
            request_id: request_id.to_string(),
            route_key: "test.route".to_string(),
            payload: Some(Bytes::from_static(payload)),
            traceparent: None,
            tracestate: None,
            metadata: vec![],
            timeout_ms: 30000,
            error: None,
        }
    }

    /// An envelope sent on an Mpsc lane comes back out of the same lane.
    #[tokio::test]
    async fn test_mpsc_lane() {
        let (tx, rx) = mpsc::channel(10);
        let lane = DataLane::mpsc(PayloadType::RpcReliable, tx, rx);

        lane.send_envelope(sample_envelope("test-1", b"hello"))
            .await
            .unwrap();

        let received = lane.recv_envelope().await.unwrap();
        assert_eq!(received.request_id, "test-1");
        assert_eq!(received.payload, Some(Bytes::from_static(b"hello")));
    }

    /// A cloned lane shares the receive queue with the lane it was cloned from.
    #[tokio::test]
    async fn test_mpsc_lane_clone() {
        let (tx, rx) = mpsc::channel(10);
        let sender_lane = DataLane::mpsc(PayloadType::RpcReliable, tx, rx);
        let receiver_lane = sender_lane.clone();

        sender_lane
            .send_envelope(sample_envelope("test-2", b"test"))
            .await
            .unwrap();

        let received = receiver_lane.recv_envelope().await.unwrap();
        assert_eq!(received.request_id, "test-2");
        assert_eq!(received.payload, Some(Bytes::from_static(b"test")));
    }

    /// A lane built from an already-shared receiver behaves like a regular Mpsc lane.
    #[tokio::test]
    async fn test_mpsc_lane_with_shared_rx() {
        let (tx, rx) = mpsc::channel(10);
        let rx_shared = Arc::new(Mutex::new(rx));
        let lane = DataLane::mpsc_shared(PayloadType::RpcReliable, tx, Arc::clone(&rx_shared));

        lane.send_envelope(sample_envelope("test-3", b"shared"))
            .await
            .unwrap();

        let received = lane.recv_envelope().await.unwrap();
        assert_eq!(received.request_id, "test-3");
        assert_eq!(received.payload, Some(Bytes::from_static(b"shared")));
    }

    /// `lane_type()` reports the variant name.
    #[test]
    fn test_lane_type_name() {
        let (tx, rx) = mpsc::channel(10);
        let lane = DataLane::mpsc(PayloadType::RpcReliable, tx, rx);
        assert_eq!(lane.lane_type(), "Mpsc");
    }
}