actr_runtime/outbound/
inproc_out_gate.rs

1//! InprocOutGate - Inproc transport adapter (outbound)
2//!
3//! # Responsibilities
4//! - Wrap InprocTransportManager (zero serialization, direct RpcEnvelope passing)
5//! - Used for intra-process communication (e.g., Shell ↔ Workload)
6//! - Support PayloadType routing (default Reliable)
7
8use crate::transport::InprocTransportManager;
9use actr_framework::Bytes;
10use actr_protocol::{ActorResult, ActrId, PayloadType, ProtocolError, RpcEnvelope};
11use std::sync::Arc;
12
13/// InprocOutGate - Inproc transport adapter (outbound)
14///
15/// # Features
16/// - Zero serialization: directly pass `RpcEnvelope` objects
17/// - Zero copy: use mpsc channel for in-process passing
18/// - PayloadType routing: defaults to Reliable, can specify other types via extension methods
19/// - High performance: latency < 10μs
20pub struct InprocOutGate {
21    transport: Arc<InprocTransportManager>,
22}
23
24impl InprocOutGate {
25    /// Create new InprocOutGate
26    ///
27    /// # Arguments
28    /// - `transport`: InprocTransportManager instance
29    pub fn new(transport: Arc<InprocTransportManager>) -> Self {
30        Self { transport }
31    }
32
33    /// Send request and wait for response (with specified PayloadType and identifier)
34    ///
35    /// # Extension Method
36    /// Used for scenarios requiring non-default PayloadType
37    ///
38    /// # Arguments
39    /// - `_target`: Target ActorId (only for logging, not needed for intra-process communication)
40    /// - `payload_type`: PayloadType (Reliable, Signal, LatencyFirst, MediaTrack)
41    /// - `identifier`: Optional identifier (LatencyFirst needs channel_id, MediaTrack needs track_id)
42    /// - `envelope`: Message envelope
43    pub async fn send_request_with_type(
44        &self,
45        _target: &ActrId,
46        payload_type: PayloadType,
47        identifier: Option<String>,
48        envelope: RpcEnvelope,
49    ) -> ActorResult<Bytes> {
50        tracing::debug!(
51            "📤 InprocOutGate::send_request_with_type to {:?} (type={:?}, id={:?})",
52            _target,
53            payload_type,
54            identifier
55        );
56
57        self.transport
58            .send_request(payload_type, identifier, envelope)
59            .await
60            .map_err(|e| ProtocolError::TransportError(e.to_string()))
61    }
62
63    /// Send one-way message (with specified PayloadType and identifier)
64    ///
65    /// # Arguments
66    /// - `_target`: Target ActorId (only for logging, not needed for intra-process communication)
67    /// - `payload_type`: PayloadType
68    /// - `identifier`: Optional identifier
69    /// - `envelope`: Message envelope
70    pub async fn send_message_with_type(
71        &self,
72        _target: &ActrId,
73        payload_type: PayloadType,
74        identifier: Option<String>,
75        envelope: RpcEnvelope,
76    ) -> ActorResult<()> {
77        tracing::debug!(
78            "📤 InprocOutGate::send_message_with_type to {:?} (type={:?}, id={:?})",
79            _target,
80            payload_type,
81            identifier
82        );
83
84        self.transport
85            .send_message(payload_type, identifier, envelope)
86            .await
87            .map_err(|e| ProtocolError::TransportError(e.to_string()))
88    }
89
90    /// Send request and wait for response (defaults to Reliable)
91    ///
92    /// # Arguments
93    /// - `target`: Target ActorId (for logging only)
94    /// - `envelope`: Message envelope
95    ///
96    /// # Default behavior
97    /// Uses PayloadType::RpcReliable with no identifier
98    pub async fn send_request(&self, target: &ActrId, envelope: RpcEnvelope) -> ActorResult<Bytes> {
99        tracing::info!(
100            "📤 InprocOutGate::send_request to {:?}, request_id={}",
101            target,
102            envelope.request_id
103        );
104
105        // Default to Reliable (no identifier)
106        let result = self
107            .transport
108            .send_request(PayloadType::RpcReliable, None, envelope)
109            .await
110            .map_err(|e| ProtocolError::TransportError(e.to_string()));
111
112        match &result {
113            Ok(_) => tracing::info!("✅ InprocOutGate::send_request completed successfully"),
114            Err(e) => tracing::error!("❌ InprocOutGate::send_request failed: {:?}", e),
115        }
116
117        result
118    }
119
120    /// Send one-way message (defaults to Reliable)
121    ///
122    /// # Arguments
123    /// - `target`: Target ActorId (for logging only)
124    /// - `envelope`: Message envelope
125    ///
126    /// # Default behavior
127    /// Uses PayloadType::RpcReliable with no identifier
128    pub async fn send_message(&self, target: &ActrId, envelope: RpcEnvelope) -> ActorResult<()> {
129        tracing::debug!("InprocOutGate::send_message to {:?}", target);
130
131        // Default to Reliable (no identifier)
132        self.transport
133            .send_message(PayloadType::RpcReliable, None, envelope)
134            .await
135            .map_err(|e| ProtocolError::TransportError(e.to_string()))
136    }
137
138    /// Send DataStream (Fast Path)
139    ///
140    /// # Arguments
141    /// - `_target`: Target ActorId (for logging only, not needed for intra-process)
142    /// - `payload_type`: PayloadType (StreamReliable or StreamLatencyFirst)
143    /// - `data`: Serialized DataStream bytes
144    ///
145    /// # Note
146    /// For inproc, DataStream is sent via LatencyFirst channel with stream_id as identifier
147    pub async fn send_data_stream(
148        &self,
149        _target: &ActrId,
150        payload_type: PayloadType,
151        data: Bytes,
152    ) -> ActorResult<()> {
153        use actr_protocol::prost::Message as ProstMessage;
154
155        // Deserialize to get stream_id
156        let stream = actr_protocol::DataStream::decode(&*data)
157            .map_err(|e| ProtocolError::DecodeError(format!("Failed to decode DataStream: {e}")))?;
158
159        tracing::debug!(
160            "📤 InprocOutGate::send_data_stream stream_id={}, sequence={}",
161            stream.stream_id,
162            stream.sequence
163        );
164
165        // Wrap in RpcEnvelope for transport
166        #[cfg_attr(not(feature = "opentelemetry"), allow(unused_mut))]
167        let mut envelope = RpcEnvelope {
168            route_key: "fast_path.data_stream".to_string(),
169            payload: Some(data),
170            error: None,
171            traceparent: None,
172            tracestate: None,
173            request_id: uuid::Uuid::new_v4().to_string(),
174            metadata: vec![],
175            timeout_ms: 0,
176        };
177        // Inject tracing context
178        #[cfg(feature = "opentelemetry")]
179        {
180            use crate::wire::webrtc::trace::inject_span_context_to_rpc;
181            inject_span_context_to_rpc(&tracing::Span::current(), &mut envelope);
182        }
183
184        self.transport
185            .send_message(payload_type, Some(stream.stream_id), envelope)
186            .await
187            .map_err(|e| ProtocolError::TransportError(e.to_string()))
188    }
189}