actr_runtime/outbound/
inproc_out_gate.rs

1//! InprocOutGate - Inproc transport adapter (outbound)
2//!
3//! # Responsibilities
4//! - Wrap InprocTransportManager (zero serialization, direct RpcEnvelope passing)
5//! - Used for intra-process communication (e.g., Shell ↔ Workload)
6//! - Support PayloadType routing (default Reliable)
7
8use crate::transport::InprocTransportManager;
9use actr_framework::Bytes;
10use actr_protocol::ActrIdExt;
11use actr_protocol::{ActorResult, ActrId, PayloadType, ProtocolError, RpcEnvelope};
12use std::sync::Arc;
13
14/// InprocOutGate - Inproc transport adapter (outbound)
15///
16/// # Features
17/// - Zero serialization: directly pass `RpcEnvelope` objects
18/// - Zero copy: use mpsc channel for in-process passing
19/// - PayloadType routing: defaults to Reliable, can specify other types via extension methods
20/// - High performance: latency < 10μs
21pub struct InprocOutGate {
22    transport: Arc<InprocTransportManager>,
23}
24
25impl InprocOutGate {
26    /// Create new InprocOutGate
27    ///
28    /// # Arguments
29    /// - `transport`: InprocTransportManager instance
30    pub fn new(transport: Arc<InprocTransportManager>) -> Self {
31        Self { transport }
32    }
33
34    /// Send request and wait for response (with specified PayloadType and identifier)
35    ///
36    /// # Extension Method
37    /// Used for scenarios requiring non-default PayloadType
38    ///
39    /// # Arguments
40    /// - `_target`: Target ActorId (only for logging, not needed for intra-process communication)
41    /// - `payload_type`: PayloadType (Reliable, Signal, LatencyFirst, MediaTrack)
42    /// - `identifier`: Optional identifier (LatencyFirst needs channel_id, MediaTrack needs track_id)
43    /// - `envelope`: Message envelope
44    pub async fn send_request_with_type(
45        &self,
46        _target: &ActrId,
47        payload_type: PayloadType,
48        identifier: Option<String>,
49        envelope: RpcEnvelope,
50    ) -> ActorResult<Bytes> {
51        tracing::debug!(
52            "📤 InprocOutGate::send_request_with_type to {:?} (type={:?}, id={:?})",
53            _target,
54            payload_type,
55            identifier
56        );
57
58        self.transport
59            .send_request(payload_type, identifier, envelope)
60            .await
61            .map_err(|e| ProtocolError::TransportError(e.to_string()))
62    }
63
64    /// Send one-way message (with specified PayloadType and identifier)
65    ///
66    /// # Arguments
67    /// - `_target`: Target ActorId (only for logging, not needed for intra-process communication)
68    /// - `payload_type`: PayloadType
69    /// - `identifier`: Optional identifier
70    /// - `envelope`: Message envelope
71    pub async fn send_message_with_type(
72        &self,
73        _target: &ActrId,
74        payload_type: PayloadType,
75        identifier: Option<String>,
76        envelope: RpcEnvelope,
77    ) -> ActorResult<()> {
78        tracing::debug!(
79            "📤 InprocOutGate::send_message_with_type to {:?} (type={:?}, id={:?})",
80            _target,
81            payload_type,
82            identifier
83        );
84
85        self.transport
86            .send_message(payload_type, identifier, envelope)
87            .await
88            .map_err(|e| ProtocolError::TransportError(e.to_string()))
89    }
90
91    /// Send request and wait for response (defaults to Reliable)
92    ///
93    /// # Arguments
94    /// - `target`: Target ActorId (for logging only)
95    /// - `envelope`: Message envelope
96    ///
97    /// # Default behavior
98    /// Uses PayloadType::RpcReliable with no identifier
99    #[cfg_attr(
100        feature = "opentelemetry",
101        tracing::instrument(skip_all, name = "InprocOutGate.send_request")
102    )]
103    pub async fn send_request(&self, target: &ActrId, envelope: RpcEnvelope) -> ActorResult<Bytes> {
104        tracing::info!(
105            "📤 InprocOutGate::send_request to {:?}, request_id={}",
106            target,
107            envelope.request_id
108        );
109
110        // Default to Reliable (no identifier)
111        let result = self
112            .transport
113            .send_request(PayloadType::RpcReliable, None, envelope)
114            .await
115            .map_err(|e| ProtocolError::TransportError(e.to_string()));
116
117        match &result {
118            Ok(_) => tracing::info!("✅ InprocOutGate::send_request completed successfully"),
119            Err(e) => tracing::error!("❌ InprocOutGate::send_request failed: {:?}", e),
120        }
121
122        result
123    }
124
125    /// Send one-way message (defaults to Reliable)
126    ///
127    /// # Arguments
128    /// - `target`: Target ActorId (for logging only)
129    /// - `envelope`: Message envelope
130    ///
131    /// # Default behavior
132    /// Uses PayloadType::RpcReliable with no identifier
133    #[cfg_attr(
134        feature = "opentelemetry",
135        tracing::instrument(skip_all, name = "InprocOutGate.send_message", fields(target = ?target.to_string_repr()))
136    )]
137    pub async fn send_message(&self, target: &ActrId, envelope: RpcEnvelope) -> ActorResult<()> {
138        tracing::debug!(
139            "InprocOutGate::send_message to {:?}",
140            target.to_string_repr()
141        );
142
143        // Default to Reliable (no identifier)
144        self.transport
145            .send_message(PayloadType::RpcReliable, None, envelope)
146            .await
147            .map_err(|e| ProtocolError::TransportError(e.to_string()))
148    }
149
150    /// Send DataStream (Fast Path)
151    ///
152    /// # Arguments
153    /// - `_target`: Target ActorId (for logging only, not needed for intra-process)
154    /// - `payload_type`: PayloadType (StreamReliable or StreamLatencyFirst)
155    /// - `data`: Serialized DataStream bytes
156    ///
157    /// # Note
158    /// For inproc, DataStream is sent via LatencyFirst channel with stream_id as identifier
159    pub async fn send_data_stream(
160        &self,
161        _target: &ActrId,
162        payload_type: PayloadType,
163        data: Bytes,
164    ) -> ActorResult<()> {
165        use actr_protocol::prost::Message as ProstMessage;
166
167        // Deserialize to get stream_id
168        let stream = actr_protocol::DataStream::decode(&*data)
169            .map_err(|e| ProtocolError::DecodeError(format!("Failed to decode DataStream: {e}")))?;
170
171        tracing::debug!(
172            "📤 InprocOutGate::send_data_stream stream_id={}, sequence={}",
173            stream.stream_id,
174            stream.sequence
175        );
176
177        // Wrap in RpcEnvelope for transport
178        #[cfg_attr(not(feature = "opentelemetry"), allow(unused_mut))]
179        let mut envelope = RpcEnvelope {
180            route_key: "fast_path.data_stream".to_string(),
181            payload: Some(data),
182            error: None,
183            traceparent: None,
184            tracestate: None,
185            request_id: uuid::Uuid::new_v4().to_string(),
186            metadata: vec![],
187            timeout_ms: 0,
188        };
189        // Inject tracing context
190        #[cfg(feature = "opentelemetry")]
191        {
192            use crate::wire::webrtc::trace::inject_span_context_to_rpc;
193            inject_span_context_to_rpc(&tracing::Span::current(), &mut envelope);
194        }
195
196        self.transport
197            .send_message(payload_type, Some(stream.stream_id), envelope)
198            .await
199            .map_err(|e| ProtocolError::TransportError(e.to_string()))
200    }
201}