actr_runtime/inbound/inbound_packet_dispatcher.rs
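//! InboundPacketDispatcher - Inbound packet dispatcher
//!
//! # Responsibilities
//! - Receive packets from three inbound paths:
//!   - InprocChannel (intra-process)
//!   - WebRtcCoordinator (WebRTC)
//!   - WebSocketConnection (WebSocket)
//! - Decode payloads where this module needs them (DataStream on the stream
//!   path; an already-decoded RpcEnvelope is accepted by `handle_response`)
//! - Route to different handlers based on PayloadType:
//!   - RpcSignal / RpcReliable → Mailbox
//!   - StreamReliable / StreamLatencyFirst → DataStreamRegistry
//!   - MediaRtp → not routed here; it is logged as an error (media frames are
//!     delivered via RTCTrackRemote callbacks / MediaFrameRegistry instead)
//! - For response messages, wake up OutprocOutGate.pending_requests
//!   (via `InboundPacketDispatcher::handle_response`)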
14
15use super::data_stream_registry::DataStreamRegistry;
16use crate::outbound::OutprocOutGate;
17use actr_mailbox::{Mailbox, MessagePriority};
18use actr_protocol::{ActrId, DataStream, PayloadType, RpcEnvelope};
19use std::sync::Arc;
20
21/// Inbound packet metadata
22///
23/// Packet contains essential information about incoming data packets for dispatch decisions
24#[derive(Debug, Clone)]
25pub struct InboundPacket {
26 /// PayloadType (determines dispatch path)
27 pub payload_type: PayloadType,
28
29 /// Message payload (serialized)
30 pub data: Vec<u8>,
31
32 /// Sender ActrId (Protobuf bytes)
33 pub from: Vec<u8>,
34}
35
36/// InboundPacketDispatcher - Inbound packet dispatcher
37///
38/// # Design Principles
39/// - **Single Responsibility**: Only responsible for routing inbound packets
40/// - **Zero-copy**: Pass bytes directly, no deserialization until needed
41/// - **Thread-safe**: All components wrapped in Arc
42/// - **Response matching**: Wake up pending_requests via OutprocOutGate
43///
44/// # Note on MediaTrack
45/// MediaTrack (PayloadType::MediaRtp) is NOT handled here because:
46/// - MediaTrack uses WebRTC native RTP channels, not DataChannel
47/// - Media frames are delivered directly via RTCTrackRemote callbacks
48/// - No protobuf serialization involved
49/// - MediaFrameRegistry is registered at WebRTC PeerConnection level
50pub struct InboundPacketDispatcher {
51 /// Mailbox (state path: Signal/Reliable)
52 mailbox: Arc<dyn Mailbox>,
53
54 /// DataStreamRegistry (fast path: LatencyFirst)
55 data_stream_registry: Arc<DataStreamRegistry>,
56
57 /// OutprocOutGate (for waking up pending_requests)
58 outproc_out_gate: Option<Arc<OutprocOutGate>>,
59}
60
61impl InboundPacketDispatcher {
62 /// Create new InboundPacketDispatcher
63 ///
64 /// # Arguments
65 /// - `mailbox`: Mailbox instance
66 /// - `data_stream_registry`: DataStream registry
67 /// - `outproc_out_gate`: OutprocOutGate instance (optional, for RPC response matching)
68 pub fn new(
69 mailbox: Arc<dyn Mailbox>,
70 data_stream_registry: Arc<DataStreamRegistry>,
71 outproc_out_gate: Option<Arc<OutprocOutGate>>,
72 ) -> Self {
73 Self {
74 mailbox,
75 data_stream_registry,
76 outproc_out_gate,
77 }
78 }
79
80 /// Dispatch inbound packet
81 ///
82 /// # Core Logic
83 /// 1. Check if this is an RPC response (has request_id)
84 /// 2. If response, wake up OutprocOutGate.pending_requests
85 /// 3. If request, route to appropriate handler by PayloadType
86 ///
87 /// # Arguments
88 /// - `packet`: Inbound packet
89 pub async fn dispatch(&self, packet: InboundPacket) {
90 tracing::debug!(
91 "📥 InboundPacketDispatcher::dispatch: payload_type={:?}, size={}",
92 packet.payload_type,
93 packet.data.len()
94 );
95
96 // Route based on PayloadType
97 match packet.payload_type {
98 PayloadType::RpcReliable | PayloadType::RpcSignal => {
99 // State path: enqueue to Mailbox (RpcEnvelope only)
100 self.dispatch_to_mailbox(packet).await;
101 }
102 PayloadType::StreamReliable | PayloadType::StreamLatencyFirst => {
103 // Fast path: DataStream (both reliable and low-latency)
104 self.dispatch_to_data_stream(packet).await;
105 }
106 PayloadType::MediaRtp => {
107 // MediaRtp packets should NOT arrive here!
108 // MediaTrack uses WebRTC native RTP channels (RTCTrackRemote),
109 // not DataChannel, so they bypass InboundPacketDispatcher entirely.
110 tracing::error!(
111 "❌ MediaRtp packet received in DataChannel dispatcher! \
112 This should never happen. MediaTrack frames are delivered \
113 via RTCTrackRemote callbacks, not through InboundPacketDispatcher."
114 );
115 }
116 }
117 }
118
119 /// Dispatch to Mailbox (state path)
120 ///
121 /// # Design
122 /// - Signal → High Priority
123 /// - Reliable → Normal Priority
124 /// - Store raw bytes directly, no RpcEnvelope deserialization
125 async fn dispatch_to_mailbox(&self, packet: InboundPacket) {
126 let priority = match packet.payload_type {
127 PayloadType::RpcSignal => MessagePriority::High,
128 PayloadType::RpcReliable => MessagePriority::Normal,
129 PayloadType::StreamReliable
130 | PayloadType::StreamLatencyFirst
131 | PayloadType::MediaRtp => {
132 tracing::error!(
133 "❌ Invalid PayloadType for Mailbox: {:?}",
134 packet.payload_type
135 );
136 return;
137 }
138 };
139
140 match self
141 .mailbox
142 .enqueue(packet.from, packet.data, priority)
143 .await
144 {
145 Ok(msg_id) => {
146 tracing::debug!("✅ Packet enqueued: id={}, priority={:?}", msg_id, priority);
147 }
148 Err(e) => {
149 tracing::error!("❌ Failed to enqueue packet: {:?}", e);
150 }
151 }
152 }
153
154 /// Dispatch to DataStreamRegistry (fast path)
155 ///
156 /// # Design
157 /// - Decode DataStream
158 /// - Decode sender ActrId
159 /// - Invoke callback concurrently
160 async fn dispatch_to_data_stream(&self, packet: InboundPacket) {
161 use actr_protocol::prost::Message as ProstMessage;
162
163 // Decode DataStream
164 match DataStream::decode(&packet.data[..]) {
165 Ok(chunk) => {
166 tracing::debug!("📦 Dispatching DataStream: stream_id={}", chunk.stream_id);
167
168 // Decode sender ActrId
169 match ActrId::decode(&packet.from[..]) {
170 Ok(sender_id) => {
171 self.data_stream_registry.dispatch(chunk, sender_id).await;
172 }
173 Err(e) => {
174 tracing::error!("❌ Failed to decode sender ActrId: {:?}", e);
175 }
176 }
177 }
178 Err(e) => {
179 tracing::error!("❌ Failed to decode DataStream: {:?}", e);
180 }
181 }
182 }
183
184 /// Handle RPC response (wake up pending_requests)
185 ///
186 /// # Design
187 /// - Only handle if outproc_out_gate exists
188 /// - Decode RpcEnvelope, extract request_id
189 /// - Call OutprocOutGate::handle_response to wake up waiting request
190 ///
191 /// # Returns
192 /// - `true`: Successfully matched and woke up
193 /// - `false`: No matching pending request found
194 pub async fn handle_response(&self, envelope: RpcEnvelope) -> bool {
195 if let Some(outproc_out_gate) = &self.outproc_out_gate {
196 // Convert envelope to result (success or error)
197 let result = match (envelope.payload, envelope.error) {
198 (Some(payload), None) => Ok(payload),
199 (None, Some(error)) => Err(actr_protocol::ProtocolError::TransportError(format!(
200 "RPC error {}: {}",
201 error.code, error.message
202 ))),
203 _ => Err(actr_protocol::ProtocolError::DecodeError(
204 "Invalid RpcEnvelope: payload and error fields inconsistent".to_string(),
205 )),
206 };
207
208 match outproc_out_gate
209 .handle_response(&envelope.request_id, result)
210 .await
211 {
212 Ok(matched) => matched,
213 Err(e) => {
214 tracing::error!("❌ Failed to handle response: {:?}", e);
215 false
216 }
217 }
218 } else {
219 tracing::warn!("⚠️ No OutprocOutGate available for response handling");
220 false
221 }
222 }
223}