acton_reactive/message/outbound_envelope.rs
1/*
2 * Copyright (c) 2024. Govcraft
3 *
4 * Licensed under either of
5 * * Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
8 * * MIT license: http://opensource.org/licenses/MIT
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the applicable License for the specific language governing permissions and
14 * limitations under that License.
15 */
16
17use std::cmp::PartialEq;
18use std::fmt::Debug; // Import Debug
19use std::hash::{Hash, Hasher}; // Import Hash and Hasher
20use std::sync::{Arc, OnceLock};
21
22use tokio::runtime::{Handle, Runtime};
23use tracing::{debug, error, instrument, trace, warn};
24
25/// Shared runtime for synchronous `reply()` calls made outside of a Tokio context.
26///
27/// This runtime is created lazily on first use and persists for the process lifetime.
28/// Using a shared runtime avoids the expensive overhead of creating a new runtime
29/// (and associated thread pools) for each `reply()` call from non-async code.
30///
31/// The runtime uses the multi-threaded flavor with a single worker thread. This allows
32/// `spawn()` to work without blocking the caller (preserving fire-and-forget semantics),
33/// while keeping resource usage minimal since reply operations are lightweight channel sends.
34static SYNC_REPLY_RUNTIME: OnceLock<Runtime> = OnceLock::new();
35
36/// Gets or creates the shared fallback runtime for synchronous `reply()` calls.
37///
38/// This function is called when `reply()` is invoked outside of any Tokio runtime
39/// context. Rather than creating a new runtime per call (expensive), we lazily
40/// initialize a single shared runtime that handles all sync reply operations.
41fn sync_reply_runtime() -> &'static Runtime {
42 SYNC_REPLY_RUNTIME.get_or_init(|| {
43 debug!("Creating shared fallback runtime for sync reply() calls");
44 tokio::runtime::Builder::new_multi_thread()
45 .worker_threads(1)
46 .enable_all()
47 .thread_name("acton-sync-reply")
48 .build()
49 .expect("Failed to create fallback Tokio runtime for sync reply()")
50 })
51}
52
53use crate::common::{Envelope, MessageError};
54use crate::message::message_address::MessageAddress;
55use crate::traits::ActonMessage;
56
57/// Represents a message prepared for sending, including sender and optional recipient addresses.
58///
59/// An `OutboundEnvelope` is typically created by an actor (using methods like
60/// [`ActorHandle::create_envelope`](crate::common::ActorHandle::create_envelope))
61/// before sending a message. It holds the [`MessageAddress`] of the sender (`return_address`)
62/// and optionally the [`MessageAddress`] of the recipient (`recipient_address`).
63///
64/// The primary methods for dispatching the message are [`OutboundEnvelope::send`] (asynchronous)
65/// and [`OutboundEnvelope::reply`] (synchronous wrapper).
66///
67/// Equality and hashing are based solely on the `return_address`.
68#[derive(Clone, Debug)]
69pub struct OutboundEnvelope {
70 /// The address of the actor sending the message.
71 pub(crate) return_address: MessageAddress,
72 /// The address of the intended recipient actor, if specified directly.
73 /// If `None`, the recipient might be implied (e.g., sending back to `return_address`).
74 pub(crate) recipient_address: Option<MessageAddress>,
75 /// The cancellation token for the sending actor.
76 pub(crate) cancellation_token: tokio_util::sync::CancellationToken,
77}
78
79// Note: The PartialEq impl for MessageAddress is defined here, but ideally should be
80// in message_address.rs if it's generally applicable. Assuming it's needed here for now.
81/// Implements equality comparison for `MessageAddress` based on the sender's `Ern`.
82impl PartialEq for MessageAddress {
83 fn eq(&self, other: &Self) -> bool {
84 self.sender == other.sender // Compare based on Ern
85 }
86}
87
88/// Implements equality comparison for `OutboundEnvelope` based on the `return_address`.
89impl PartialEq for OutboundEnvelope {
90 fn eq(&self, other: &Self) -> bool {
91 self.return_address == other.return_address
92 }
93}
94
95/// Derives `Eq` based on the `PartialEq` implementation.
96impl Eq for OutboundEnvelope {}
97
98/// Implements hashing for `OutboundEnvelope` based on the `return_address`.
99impl Hash for OutboundEnvelope {
100 fn hash<H: Hasher>(&self, state: &mut H) {
101 // Hash only based on the return address's sender Ern, consistent with PartialEq.
102 self.return_address.sender.hash(state);
103 }
104}
105
106impl OutboundEnvelope {
107 /// Creates a new `OutboundEnvelope` with only a return address specified.
108 ///
109 /// The recipient address is initially set to `None`. Use [`OutboundEnvelope::send`]
110 /// or [`OutboundEnvelope::reply`] to send the message, typically back to the
111 /// `return_address` if no recipient is set later (though `send_message_inner` logic defaults to `return_address` if `recipient_address` is `None`).
112 ///
113 /// # Arguments
114 ///
115 /// * `return_address`: The [`MessageAddress`] of the actor creating this envelope (the sender).
116 ///
117 /// # Returns
118 ///
119 /// A new `OutboundEnvelope` instance.
120 #[instrument(skip(return_address))]
121 pub fn new(
122 return_address: MessageAddress,
123 cancellation_token: tokio_util::sync::CancellationToken,
124 ) -> Self {
125 trace!(sender = %return_address.sender, "Creating new OutboundEnvelope");
126 Self {
127 return_address,
128 recipient_address: None,
129 cancellation_token,
130 }
131 }
132
133 /// Returns a clone of the sender's [`MessageAddress`].
134 #[inline]
135 #[must_use]
136 pub fn reply_to(&self) -> MessageAddress {
137 self.return_address.clone()
138 }
139
140 /// Returns a reference to the optional recipient's [`MessageAddress`].
141 #[inline]
142 #[must_use]
143 pub const fn recipient(&self) -> &Option<MessageAddress> {
144 &self.recipient_address
145 }
146
147 /// Crate-internal constructor: Creates a new `OutboundEnvelope` with specified sender and recipient.
148 #[instrument(skip(return_address, recipient_address))]
149 pub(crate) fn new_with_recipient(
150 return_address: MessageAddress,
151 recipient_address: MessageAddress,
152 cancellation_token: tokio_util::sync::CancellationToken,
153 ) -> Self {
154 trace!(sender = %return_address.sender, recipient = %recipient_address.sender, "Creating new OutboundEnvelope with recipient");
155 Self {
156 return_address,
157 recipient_address: Some(recipient_address),
158 cancellation_token,
159 }
160 }
161
162 /// Sends a message using this envelope synchronously.
163 ///
164 /// This method attempts to send a message without requiring an async context.
165 /// It uses the following strategy:
166 ///
167 /// 1. If called from within a Tokio runtime context, it spawns the send operation
168 /// on the existing runtime (most efficient).
169 /// 2. If called from outside any Tokio context, it creates a minimal runtime
170 /// to execute the send (fallback for non-async code paths).
171 ///
172 /// **Recommendation:** Prefer using the asynchronous [`OutboundEnvelope::send`] method
173 /// whenever possible, as it integrates better with async workflows.
174 ///
175 /// # Arguments
176 ///
177 /// * `message`: The message payload to send. Must implement [`ActonMessage`] and be `'static`.
178 ///
179 /// # Returns
180 ///
181 /// * `Ok(())`: If the message was successfully scheduled to be sent (actual delivery depends on the recipient).
182 /// * `Err(MessageError)`: Currently, this implementation always returns `Ok(())`, but the signature
183 /// allows for future error handling. Potential errors (like closed channels) are logged internally.
184 #[instrument(skip(self, message), fields(message_type = std::any::type_name_of_val(&message)))]
185 pub fn reply(&self, message: impl ActonMessage + 'static) -> Result<(), MessageError> {
186 let envelope = self.clone();
187 let message_arc = Arc::new(message);
188
189 // Try to use the existing runtime if we're already in a Tokio context.
190 // This avoids the overhead of creating a new runtime per call.
191 if let Ok(handle) = Handle::try_current() {
192 // We're inside a Tokio runtime - spawn on the existing runtime
193 trace!(
194 sender = %envelope.return_address.sender,
195 recipient = ?envelope.recipient_address.as_ref().map(|r| r.sender.to_string()),
196 "Replying via existing runtime handle"
197 );
198 // Spawn a boxed future to reduce stack usage from large tokio::select! in send_message_inner
199 Self::spawn_reply_task(&handle, envelope, message_arc);
200 } else {
201 // We're outside any Tokio context - use the shared fallback runtime.
202 warn!(
203 sender = %envelope.return_address.sender,
204 "reply() called outside Tokio context; using shared fallback runtime"
205 );
206 Self::spawn_reply_on_fallback(envelope, message_arc);
207 }
208 Ok(())
209 }
210
211 /// Helper to spawn reply task on existing runtime, using boxed future.
212 fn spawn_reply_task(
213 handle: &Handle,
214 envelope: Self,
215 message: Arc<dyn ActonMessage + Send + Sync>,
216 ) {
217 handle.spawn(Box::pin(async move {
218 envelope.send_message_inner(message).await;
219 }));
220 }
221
222 /// Spawns a reply task on the shared fallback runtime.
223 ///
224 /// This is much more efficient than the previous approach of creating a new
225 /// `std::thread` and `Runtime` per call. The shared runtime amortizes the
226 /// cost across all sync `reply()` calls.
227 fn spawn_reply_on_fallback(envelope: Self, message: Arc<dyn ActonMessage + Send + Sync>) {
228 sync_reply_runtime().spawn(async move {
229 envelope.send_message_inner(message).await;
230 });
231 }
232
233 /// Crate-internal: Asynchronously sends the message payload to the recipient.
234 /// Handles channel reservation and error logging.
235 ///
236 /// # Performance
237 ///
238 /// This method uses a fast-path optimization: it first attempts `try_reserve()` which
239 /// is non-blocking and avoids async overhead when the channel has capacity (common case).
240 /// Only when the channel is full does it fall back to the async `reserve()` path.
241 ///
242 /// Clones are minimized on the fast path: `sender`/`recipient` identifiers are borrowed
243 /// from the address structs for logging rather than cloned upfront.
244 async fn send_message_inner(&self, message: Arc<dyn ActonMessage + Send + Sync>) {
245 let target_address = self
246 .recipient_address
247 .as_ref()
248 .unwrap_or(&self.return_address)
249 .clone();
250 let return_address = self.return_address.clone();
251 let channel_sender = target_address.address.clone();
252
253 // Check if cancelled before attempting send
254 if self.cancellation_token.is_cancelled() {
255 error!(sender = %return_address.sender, recipient = %target_address.sender, "Send aborted: cancellation_token triggered");
256 return;
257 }
258
259 // Fast path: try non-blocking reserve first (common case when channel has capacity)
260 match channel_sender.try_reserve() {
261 Ok(permit) => {
262 permit.send(Envelope::new(message, return_address, target_address));
263 return;
264 }
265 Err(tokio::sync::mpsc::error::TrySendError::Closed(())) => {
266 error!(sender = %return_address.sender, recipient = %target_address.sender, "Recipient channel is closed");
267 return;
268 }
269 Err(tokio::sync::mpsc::error::TrySendError::Full(())) => {
270 // Channel is full, fall through to slow path
271 }
272 }
273
274 // Slow path: channel is full, need to wait for capacity
275 match channel_sender.reserve().await {
276 Ok(permit) => {
277 permit.send(Envelope::new(message, return_address, target_address));
278 }
279 Err(e) => {
280 error!(sender = %return_address.sender, recipient = %target_address.sender, error = %e, "Failed to reserve channel capacity");
281 }
282 };
283 }
284
285 /// Sends a message asynchronously using this envelope.
286 ///
287 /// This method takes the message payload, wraps it in an `Arc`, and calls the
288 /// internal `send_message_inner` to dispatch it to the recipient's channel.
289 /// The recipient is determined by `recipient_address` if `Some`, otherwise it
290 /// defaults to `return_address`.
291 ///
292 /// This is the preferred method for sending messages from within an asynchronous context.
293 /// For fire-and-forget scenarios where errors can be ignored, this method logs errors
294 /// internally. For explicit error handling, use [`try_send`](OutboundEnvelope::try_send).
295 ///
296 /// # Arguments
297 ///
298 /// * `message`: The message payload to send. Must implement [`ActonMessage`] and be `'static`.
299 #[instrument(skip(self, message), level = "trace", fields(message_type = std::any::type_name_of_val(&message)))]
300 pub async fn send(&self, message: impl ActonMessage + 'static) {
301 // Arc the message and call the internal async sender.
302 self.send_message_inner(Arc::new(message)).await;
303 }
304
305 /// Sends a message asynchronously with explicit error handling.
306 ///
307 /// This method is similar to [`send`](OutboundEnvelope::send), but returns a `Result`
308 /// indicating whether the message was successfully delivered to the recipient's channel.
309 /// Use this when you need to handle delivery failures explicitly rather than relying
310 /// on internal logging.
311 ///
312 /// # Arguments
313 ///
314 /// * `message`: The message payload to send. Must implement [`ActonMessage`] and be `'static`.
315 ///
316 /// # Returns
317 ///
318 /// * `Ok(())` - The message was successfully queued in the recipient's channel
319 /// * `Err(MessageError)` - The message could not be delivered
320 ///
321 /// # Errors
322 ///
323 /// Returns an error if:
324 /// - The recipient's channel is closed (`MessageError::ChannelClosed`)
325 /// - The operation was cancelled (`MessageError::Cancelled`)
326 /// - The channel capacity could not be reserved (`MessageError::SendFailed`)
327 ///
328 /// # Performance
329 ///
330 /// Uses a fast-path with `try_reserve()` for the common case when the channel has capacity.
331 #[instrument(skip(self, message), level = "trace", fields(message_type = std::any::type_name_of_val(&message)))]
332 pub async fn try_send(&self, message: impl ActonMessage + 'static) -> Result<(), MessageError> {
333 let message = Arc::new(message);
334
335 // Determine the target address: recipient if Some, otherwise return_address.
336 let target_address = self
337 .recipient_address
338 .as_ref()
339 .unwrap_or(&self.return_address);
340
341 // Check cancellation synchronously first
342 if self.cancellation_token.is_cancelled() {
343 return Err(MessageError::Cancelled);
344 }
345
346 let channel_sender = &target_address.address;
347
348 // Fast path: try non-blocking reserve first
349 match channel_sender.try_reserve() {
350 Ok(permit) => {
351 let internal_envelope =
352 Envelope::new(message, self.return_address.clone(), target_address.clone());
353 permit.send(internal_envelope);
354 return Ok(());
355 }
356 Err(tokio::sync::mpsc::error::TrySendError::Closed(())) => {
357 return Err(MessageError::ChannelClosed);
358 }
359 Err(tokio::sync::mpsc::error::TrySendError::Full(())) => {
360 // Fall through to slow path
361 }
362 }
363
364 // Slow path: channel is full, need to wait for capacity
365 let channel_sender = channel_sender.clone();
366 let cancellation = self.cancellation_token.clone();
367 let return_addr = self.return_address.clone();
368 let target_addr = target_address.clone();
369
370 Box::pin(async move {
371 tokio::select! {
372 () = cancellation.cancelled() => {
373 Err(MessageError::Cancelled)
374 }
375 permit_result = channel_sender.reserve() => {
376 match permit_result {
377 Ok(permit) => {
378 let internal_envelope = Envelope::new(message, return_addr, target_addr);
379 permit.send(internal_envelope);
380 Ok(())
381 }
382 Err(e) => {
383 Err(MessageError::SendFailed(e.to_string()))
384 }
385 }
386 }
387 }
388 })
389 .await
390 }
391
392 /// Sends an Arc-wrapped message asynchronously using this envelope.
393 ///
394 /// This method is similar to [`send`](OutboundEnvelope::send), but accepts an
395 /// already-Arc'd message. This is useful when the message is already wrapped
396 /// in an Arc (e.g., from IPC deserialization where Box is converted to Arc).
397 ///
398 /// # Arguments
399 ///
400 /// * `message`: An Arc-wrapped message payload to send.
401 #[cfg(feature = "ipc")]
402 #[instrument(skip(self, message), level = "trace")]
403 pub async fn send_arc(&self, message: Arc<dyn ActonMessage + Send + Sync>) {
404 self.send_message_inner(message).await;
405 }
406
407 /// Tries to send an Arc-wrapped message without blocking.
408 ///
409 /// This method is similar to [`send_arc`](OutboundEnvelope::send_arc), but uses
410 /// `try_reserve()` instead of `reserve()`. It returns immediately with an error
411 /// if the recipient's channel is full, rather than waiting for capacity.
412 ///
413 /// This is useful for IPC scenarios where backpressure feedback is needed
414 /// rather than blocking the IPC listener.
415 ///
416 /// # Arguments
417 ///
418 /// * `message`: An Arc-wrapped message payload to send.
419 ///
420 /// # Errors
421 ///
422 /// Returns an error if:
423 /// - The recipient's channel is closed
424 /// - The recipient's channel is full (backpressure)
425 #[cfg(feature = "ipc")]
426 #[instrument(skip(self, message), level = "trace")]
427 pub fn try_send_arc(
428 &self,
429 message: Arc<dyn ActonMessage + Send + Sync>,
430 ) -> Result<(), crate::common::ipc::IpcError> {
431 use crate::common::ipc::IpcError;
432 use tokio::sync::mpsc::error::TrySendError;
433
434 // Determine the target address
435 let target_address = self
436 .recipient_address
437 .as_ref()
438 .unwrap_or(&self.return_address);
439 let target_id = &target_address.sender;
440 let channel_sender = target_address.address.clone();
441
442 trace!(sender = %self.return_address.sender, recipient = %target_id, "Attempting try_send_arc");
443
444 if channel_sender.is_closed() {
445 tracing::error!(sender = %self.return_address.sender, recipient = %target_id, "Recipient channel is closed");
446 return Err(IpcError::IoError("Recipient channel is closed".to_string()));
447 }
448
449 // Try to reserve a send permit without blocking
450 let permit = match channel_sender.try_reserve() {
451 Ok(permit) => permit,
452 Err(TrySendError::Full(())) => {
453 tracing::warn!(sender = %self.return_address.sender, recipient = %target_id, "Target actor inbox is full");
454 return Err(IpcError::TargetBusy);
455 }
456 Err(TrySendError::Closed(())) => {
457 tracing::error!(sender = %self.return_address.sender, recipient = %target_id, "Recipient channel is closed");
458 return Err(IpcError::IoError("Recipient channel is closed".to_string()));
459 }
460 };
461
462 let internal_envelope =
463 Envelope::new(message, self.return_address.clone(), target_address.clone());
464 trace!(sender = %self.return_address.sender, recipient = %target_id, "Sending message via try_reserve permit");
465 permit.send(internal_envelope);
466 Ok(())
467 }
468}