actr_runtime/outbound/inproc_out_gate.rs
1//! InprocOutGate - Inproc transport adapter (outbound)
2//!
3//! # Responsibilities
4//! - Wrap InprocTransportManager (zero serialization, direct RpcEnvelope passing)
5//! - Used for intra-process communication (e.g., Shell ↔ Workload)
6//! - Support PayloadType routing (default Reliable)
7
8use crate::transport::InprocTransportManager;
9use actr_framework::Bytes;
10use actr_protocol::{ActorResult, ActrId, PayloadType, ProtocolError, RpcEnvelope};
11use std::sync::Arc;
12
13/// InprocOutGate - Inproc transport adapter (outbound)
14///
15/// # Features
16/// - Zero serialization: directly pass `RpcEnvelope` objects
17/// - Zero copy: use mpsc channel for in-process passing
18/// - PayloadType routing: defaults to Reliable, can specify other types via extension methods
19/// - High performance: latency < 10μs
20pub struct InprocOutGate {
21 transport: Arc<InprocTransportManager>,
22}
23
24impl InprocOutGate {
25 /// Create new InprocOutGate
26 ///
27 /// # Arguments
28 /// - `transport`: InprocTransportManager instance
29 pub fn new(transport: Arc<InprocTransportManager>) -> Self {
30 Self { transport }
31 }
32
33 /// Send request and wait for response (with specified PayloadType and identifier)
34 ///
35 /// # Extension Method
36 /// Used for scenarios requiring non-default PayloadType
37 ///
38 /// # Arguments
39 /// - `_target`: Target ActorId (only for logging, not needed for intra-process communication)
40 /// - `payload_type`: PayloadType (Reliable, Signal, LatencyFirst, MediaTrack)
41 /// - `identifier`: Optional identifier (LatencyFirst needs channel_id, MediaTrack needs track_id)
42 /// - `envelope`: Message envelope
43 pub async fn send_request_with_type(
44 &self,
45 _target: &ActrId,
46 payload_type: PayloadType,
47 identifier: Option<String>,
48 envelope: RpcEnvelope,
49 ) -> ActorResult<Bytes> {
50 tracing::debug!(
51 "📤 InprocOutGate::send_request_with_type to {:?} (type={:?}, id={:?})",
52 _target,
53 payload_type,
54 identifier
55 );
56
57 self.transport
58 .send_request(payload_type, identifier, envelope)
59 .await
60 .map_err(|e| ProtocolError::TransportError(e.to_string()))
61 }
62
63 /// Send one-way message (with specified PayloadType and identifier)
64 ///
65 /// # Arguments
66 /// - `_target`: Target ActorId (only for logging, not needed for intra-process communication)
67 /// - `payload_type`: PayloadType
68 /// - `identifier`: Optional identifier
69 /// - `envelope`: Message envelope
70 pub async fn send_message_with_type(
71 &self,
72 _target: &ActrId,
73 payload_type: PayloadType,
74 identifier: Option<String>,
75 envelope: RpcEnvelope,
76 ) -> ActorResult<()> {
77 tracing::debug!(
78 "📤 InprocOutGate::send_message_with_type to {:?} (type={:?}, id={:?})",
79 _target,
80 payload_type,
81 identifier
82 );
83
84 self.transport
85 .send_message(payload_type, identifier, envelope)
86 .await
87 .map_err(|e| ProtocolError::TransportError(e.to_string()))
88 }
89
90 /// Send request and wait for response (defaults to Reliable)
91 ///
92 /// # Arguments
93 /// - `target`: Target ActorId (for logging only)
94 /// - `envelope`: Message envelope
95 ///
96 /// # Default behavior
97 /// Uses PayloadType::RpcReliable with no identifier
98 pub async fn send_request(&self, target: &ActrId, envelope: RpcEnvelope) -> ActorResult<Bytes> {
99 tracing::info!(
100 "📤 InprocOutGate::send_request to {:?}, request_id={}",
101 target,
102 envelope.request_id
103 );
104
105 // Default to Reliable (no identifier)
106 let result = self
107 .transport
108 .send_request(PayloadType::RpcReliable, None, envelope)
109 .await
110 .map_err(|e| ProtocolError::TransportError(e.to_string()));
111
112 match &result {
113 Ok(_) => tracing::info!("✅ InprocOutGate::send_request completed successfully"),
114 Err(e) => tracing::error!("❌ InprocOutGate::send_request failed: {:?}", e),
115 }
116
117 result
118 }
119
120 /// Send one-way message (defaults to Reliable)
121 ///
122 /// # Arguments
123 /// - `target`: Target ActorId (for logging only)
124 /// - `envelope`: Message envelope
125 ///
126 /// # Default behavior
127 /// Uses PayloadType::RpcReliable with no identifier
128 pub async fn send_message(&self, target: &ActrId, envelope: RpcEnvelope) -> ActorResult<()> {
129 tracing::debug!("InprocOutGate::send_message to {:?}", target);
130
131 // Default to Reliable (no identifier)
132 self.transport
133 .send_message(PayloadType::RpcReliable, None, envelope)
134 .await
135 .map_err(|e| ProtocolError::TransportError(e.to_string()))
136 }
137
138 /// Send DataStream (Fast Path)
139 ///
140 /// # Arguments
141 /// - `_target`: Target ActorId (for logging only, not needed for intra-process)
142 /// - `payload_type`: PayloadType (StreamReliable or StreamLatencyFirst)
143 /// - `data`: Serialized DataStream bytes
144 ///
145 /// # Note
146 /// For inproc, DataStream is sent via LatencyFirst channel with stream_id as identifier
147 pub async fn send_data_stream(
148 &self,
149 _target: &ActrId,
150 payload_type: PayloadType,
151 data: Bytes,
152 ) -> ActorResult<()> {
153 use actr_protocol::prost::Message as ProstMessage;
154
155 // Deserialize to get stream_id
156 let stream = actr_protocol::DataStream::decode(&*data)
157 .map_err(|e| ProtocolError::DecodeError(format!("Failed to decode DataStream: {e}")))?;
158
159 tracing::debug!(
160 "📤 InprocOutGate::send_data_stream stream_id={}, sequence={}",
161 stream.stream_id,
162 stream.sequence
163 );
164
165 // Wrap in RpcEnvelope for transport
166 let envelope = RpcEnvelope {
167 route_key: "fast_path.data_stream".to_string(),
168 payload: Some(data),
169 error: None,
170 trace_id: uuid::Uuid::new_v4().to_string(),
171 request_id: uuid::Uuid::new_v4().to_string(),
172 metadata: vec![],
173 timeout_ms: 0,
174 };
175
176 self.transport
177 .send_message(payload_type, Some(stream.stream_id), envelope)
178 .await
179 .map_err(|e| ProtocolError::TransportError(e.to_string()))
180 }
181}