actr_runtime/inbound/
inbound_packet_dispatcher.rs

1//! InboundPacketDispatcher - Inbound packet dispatcher
2//!
3//! # Responsibilities
4//! - Receive packets from three inbound paths:
5//!   - InprocChannel (intra-process)
6//!   - WebRtcCoordinator (WebRTC)
7//!   - WebSocketConnection (WebSocket)
8//! - Decode RpcEnvelope
9//! - Route to different handlers based on PayloadType:
10//!   - Signal/Reliable → Mailbox
11//!   - LatencyFirst → DataStreamRegistry
12//!   - MediaTrack → MediaFrameRegistry
13//! - For response messages, wake up OutprocOutGate.pending_requests
14
15use super::data_stream_registry::DataStreamRegistry;
16use crate::outbound::OutprocOutGate;
17use actr_mailbox::{Mailbox, MessagePriority};
18use actr_protocol::{ActrId, DataStream, PayloadType, RpcEnvelope};
19use std::sync::Arc;
20
21/// Inbound packet metadata
22///
23/// Packet contains essential information about incoming data packets for dispatch decisions
24#[derive(Debug, Clone)]
25pub struct InboundPacket {
26    /// PayloadType (determines dispatch path)
27    pub payload_type: PayloadType,
28
29    /// Message payload (serialized)
30    pub data: Vec<u8>,
31
32    /// Sender ActrId (Protobuf bytes)
33    pub from: Vec<u8>,
34}
35
36/// InboundPacketDispatcher - Inbound packet dispatcher
37///
38/// # Design Principles
39/// - **Single Responsibility**: Only responsible for routing inbound packets
40/// - **Zero-copy**: Pass bytes directly, no deserialization until needed
41/// - **Thread-safe**: All components wrapped in Arc
42/// - **Response matching**: Wake up pending_requests via OutprocOutGate
43///
44/// # Note on MediaTrack
45/// MediaTrack (PayloadType::MediaRtp) is NOT handled here because:
46/// - MediaTrack uses WebRTC native RTP channels, not DataChannel
47/// - Media frames are delivered directly via RTCTrackRemote callbacks
48/// - No protobuf serialization involved
49/// - MediaFrameRegistry is registered at WebRTC PeerConnection level
50pub struct InboundPacketDispatcher {
51    /// Mailbox (state path: Signal/Reliable)
52    mailbox: Arc<dyn Mailbox>,
53
54    /// DataStreamRegistry (fast path: LatencyFirst)
55    data_stream_registry: Arc<DataStreamRegistry>,
56
57    /// OutprocOutGate (for waking up pending_requests)
58    outproc_out_gate: Option<Arc<OutprocOutGate>>,
59}
60
61impl InboundPacketDispatcher {
62    /// Create new InboundPacketDispatcher
63    ///
64    /// # Arguments
65    /// - `mailbox`: Mailbox instance
66    /// - `data_stream_registry`: DataStream registry
67    /// - `outproc_out_gate`: OutprocOutGate instance (optional, for RPC response matching)
68    pub fn new(
69        mailbox: Arc<dyn Mailbox>,
70        data_stream_registry: Arc<DataStreamRegistry>,
71        outproc_out_gate: Option<Arc<OutprocOutGate>>,
72    ) -> Self {
73        Self {
74            mailbox,
75            data_stream_registry,
76            outproc_out_gate,
77        }
78    }
79
80    /// Dispatch inbound packet
81    ///
82    /// # Core Logic
83    /// 1. Check if this is an RPC response (has request_id)
84    /// 2. If response, wake up OutprocOutGate.pending_requests
85    /// 3. If request, route to appropriate handler by PayloadType
86    ///
87    /// # Arguments
88    /// - `packet`: Inbound packet
89    pub async fn dispatch(&self, packet: InboundPacket) {
90        tracing::debug!(
91            "📥 InboundPacketDispatcher::dispatch: payload_type={:?}, size={}",
92            packet.payload_type,
93            packet.data.len()
94        );
95
96        // Route based on PayloadType
97        match packet.payload_type {
98            PayloadType::RpcReliable | PayloadType::RpcSignal => {
99                // State path: enqueue to Mailbox (RpcEnvelope only)
100                self.dispatch_to_mailbox(packet).await;
101            }
102            PayloadType::StreamReliable | PayloadType::StreamLatencyFirst => {
103                // Fast path: DataStream (both reliable and low-latency)
104                self.dispatch_to_data_stream(packet).await;
105            }
106            PayloadType::MediaRtp => {
107                // MediaRtp packets should NOT arrive here!
108                // MediaTrack uses WebRTC native RTP channels (RTCTrackRemote),
109                // not DataChannel, so they bypass InboundPacketDispatcher entirely.
110                tracing::error!(
111                    "❌ MediaRtp packet received in DataChannel dispatcher! \
112                     This should never happen. MediaTrack frames are delivered \
113                     via RTCTrackRemote callbacks, not through InboundPacketDispatcher."
114                );
115            }
116        }
117    }
118
119    /// Dispatch to Mailbox (state path)
120    ///
121    /// # Design
122    /// - Signal → High Priority
123    /// - Reliable → Normal Priority
124    /// - Store raw bytes directly, no RpcEnvelope deserialization
125    async fn dispatch_to_mailbox(&self, packet: InboundPacket) {
126        let priority = match packet.payload_type {
127            PayloadType::RpcSignal => MessagePriority::High,
128            PayloadType::RpcReliable => MessagePriority::Normal,
129            PayloadType::StreamReliable
130            | PayloadType::StreamLatencyFirst
131            | PayloadType::MediaRtp => {
132                tracing::error!(
133                    "❌ Invalid PayloadType for Mailbox: {:?}",
134                    packet.payload_type
135                );
136                return;
137            }
138        };
139
140        match self
141            .mailbox
142            .enqueue(packet.from, packet.data, priority)
143            .await
144        {
145            Ok(msg_id) => {
146                tracing::debug!("✅ Packet enqueued: id={}, priority={:?}", msg_id, priority);
147            }
148            Err(e) => {
149                tracing::error!("❌ Failed to enqueue packet: {:?}", e);
150            }
151        }
152    }
153
154    /// Dispatch to DataStreamRegistry (fast path)
155    ///
156    /// # Design
157    /// - Decode DataStream
158    /// - Decode sender ActrId
159    /// - Invoke callback concurrently
160    async fn dispatch_to_data_stream(&self, packet: InboundPacket) {
161        use actr_protocol::prost::Message as ProstMessage;
162
163        // Decode DataStream
164        match DataStream::decode(&packet.data[..]) {
165            Ok(chunk) => {
166                tracing::debug!("📦 Dispatching DataStream: stream_id={}", chunk.stream_id);
167
168                // Decode sender ActrId
169                match ActrId::decode(&packet.from[..]) {
170                    Ok(sender_id) => {
171                        self.data_stream_registry.dispatch(chunk, sender_id).await;
172                    }
173                    Err(e) => {
174                        tracing::error!("❌ Failed to decode sender ActrId: {:?}", e);
175                    }
176                }
177            }
178            Err(e) => {
179                tracing::error!("❌ Failed to decode DataStream: {:?}", e);
180            }
181        }
182    }
183
184    /// Handle RPC response (wake up pending_requests)
185    ///
186    /// # Design
187    /// - Only handle if outproc_out_gate exists
188    /// - Decode RpcEnvelope, extract request_id
189    /// - Call OutprocOutGate::handle_response to wake up waiting request
190    ///
191    /// # Returns
192    /// - `true`: Successfully matched and woke up
193    /// - `false`: No matching pending request found
194    pub async fn handle_response(&self, envelope: RpcEnvelope) -> bool {
195        if let Some(outproc_out_gate) = &self.outproc_out_gate {
196            // Convert envelope to result (success or error)
197            let result = match (envelope.payload, envelope.error) {
198                (Some(payload), None) => Ok(payload),
199                (None, Some(error)) => Err(actr_protocol::ProtocolError::TransportError(format!(
200                    "RPC error {}: {}",
201                    error.code, error.message
202                ))),
203                _ => Err(actr_protocol::ProtocolError::DecodeError(
204                    "Invalid RpcEnvelope: payload and error fields inconsistent".to_string(),
205                )),
206            };
207
208            match outproc_out_gate
209                .handle_response(&envelope.request_id, result)
210                .await
211            {
212                Ok(matched) => matched,
213                Err(e) => {
214                    tracing::error!("❌ Failed to handle response: {:?}", e);
215                    false
216                }
217            }
218        } else {
219            tracing::warn!("⚠️ No OutprocOutGate available for response handling");
220            false
221        }
222    }
223}