actr_runtime/outbound/
inproc_out_gate.rs1use crate::transport::InprocTransportManager;
9use actr_framework::Bytes;
10use actr_protocol::ActrIdExt;
11use actr_protocol::{ActorResult, ActrId, PayloadType, ProtocolError, RpcEnvelope};
12use std::sync::Arc;
13
14pub struct InprocOutGate {
22 transport: Arc<InprocTransportManager>,
23}
24
25impl InprocOutGate {
26 pub fn new(transport: Arc<InprocTransportManager>) -> Self {
31 Self { transport }
32 }
33
34 pub async fn send_request_with_type(
45 &self,
46 _target: &ActrId,
47 payload_type: PayloadType,
48 identifier: Option<String>,
49 envelope: RpcEnvelope,
50 ) -> ActorResult<Bytes> {
51 tracing::debug!(
52 "📤 InprocOutGate::send_request_with_type to {:?} (type={:?}, id={:?})",
53 _target,
54 payload_type,
55 identifier
56 );
57
58 self.transport
59 .send_request(payload_type, identifier, envelope)
60 .await
61 .map_err(|e| ProtocolError::TransportError(e.to_string()))
62 }
63
64 pub async fn send_message_with_type(
72 &self,
73 _target: &ActrId,
74 payload_type: PayloadType,
75 identifier: Option<String>,
76 envelope: RpcEnvelope,
77 ) -> ActorResult<()> {
78 tracing::debug!(
79 "📤 InprocOutGate::send_message_with_type to {:?} (type={:?}, id={:?})",
80 _target,
81 payload_type,
82 identifier
83 );
84
85 self.transport
86 .send_message(payload_type, identifier, envelope)
87 .await
88 .map_err(|e| ProtocolError::TransportError(e.to_string()))
89 }
90
91 #[cfg_attr(
100 feature = "opentelemetry",
101 tracing::instrument(skip_all, name = "InprocOutGate.send_request")
102 )]
103 pub async fn send_request(&self, target: &ActrId, envelope: RpcEnvelope) -> ActorResult<Bytes> {
104 tracing::info!(
105 "📤 InprocOutGate::send_request to {:?}, request_id={}",
106 target,
107 envelope.request_id
108 );
109
110 let result = self
112 .transport
113 .send_request(PayloadType::RpcReliable, None, envelope)
114 .await
115 .map_err(|e| ProtocolError::TransportError(e.to_string()));
116
117 match &result {
118 Ok(_) => tracing::info!("✅ InprocOutGate::send_request completed successfully"),
119 Err(e) => tracing::error!("❌ InprocOutGate::send_request failed: {:?}", e),
120 }
121
122 result
123 }
124
125 #[cfg_attr(
134 feature = "opentelemetry",
135 tracing::instrument(skip_all, name = "InprocOutGate.send_message", fields(target = ?target.to_string_repr()))
136 )]
137 pub async fn send_message(&self, target: &ActrId, envelope: RpcEnvelope) -> ActorResult<()> {
138 tracing::debug!(
139 "InprocOutGate::send_message to {:?}",
140 target.to_string_repr()
141 );
142
143 self.transport
145 .send_message(PayloadType::RpcReliable, None, envelope)
146 .await
147 .map_err(|e| ProtocolError::TransportError(e.to_string()))
148 }
149
150 pub async fn send_data_stream(
160 &self,
161 _target: &ActrId,
162 payload_type: PayloadType,
163 data: Bytes,
164 ) -> ActorResult<()> {
165 use actr_protocol::prost::Message as ProstMessage;
166
167 let stream = actr_protocol::DataStream::decode(&*data)
169 .map_err(|e| ProtocolError::DecodeError(format!("Failed to decode DataStream: {e}")))?;
170
171 tracing::debug!(
172 "📤 InprocOutGate::send_data_stream stream_id={}, sequence={}",
173 stream.stream_id,
174 stream.sequence
175 );
176
177 #[cfg_attr(not(feature = "opentelemetry"), allow(unused_mut))]
179 let mut envelope = RpcEnvelope {
180 route_key: "fast_path.data_stream".to_string(),
181 payload: Some(data),
182 error: None,
183 traceparent: None,
184 tracestate: None,
185 request_id: uuid::Uuid::new_v4().to_string(),
186 metadata: vec![],
187 timeout_ms: 0,
188 };
189 #[cfg(feature = "opentelemetry")]
191 {
192 use crate::wire::webrtc::trace::inject_span_context_to_rpc;
193 inject_span_context_to_rpc(&tracing::Span::current(), &mut envelope);
194 }
195
196 self.transport
197 .send_message(payload_type, Some(stream.stream_id), envelope)
198 .await
199 .map_err(|e| ProtocolError::TransportError(e.to_string()))
200 }
201}