Skip to main content

acton_reactive/message/
outbound_envelope.rs

1/*
2 * Copyright (c) 2024. Govcraft
3 *
4 * Licensed under either of
5 *   * Apache License, Version 2.0 (the "License");
6 *     you may not use this file except in compliance with the License.
7 *     You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
8 *   * MIT license: http://opensource.org/licenses/MIT
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the applicable License for the specific language governing permissions and
14 * limitations under that License.
15 */
16
17use std::cmp::PartialEq;
18use std::fmt::Debug; // Import Debug
19use std::hash::{Hash, Hasher}; // Import Hash and Hasher
20use std::sync::{Arc, OnceLock};
21
22use tokio::runtime::{Handle, Runtime};
23use tracing::{debug, error, instrument, trace, warn};
24
25/// Shared runtime for synchronous `reply()` calls made outside of a Tokio context.
26///
27/// This runtime is created lazily on first use and persists for the process lifetime.
28/// Using a shared runtime avoids the expensive overhead of creating a new runtime
29/// (and associated thread pools) for each `reply()` call from non-async code.
30///
31/// The runtime uses the multi-threaded flavor with a single worker thread. This allows
32/// `spawn()` to work without blocking the caller (preserving fire-and-forget semantics),
33/// while keeping resource usage minimal since reply operations are lightweight channel sends.
34static SYNC_REPLY_RUNTIME: OnceLock<Runtime> = OnceLock::new();
35
36/// Gets or creates the shared fallback runtime for synchronous `reply()` calls.
37///
38/// This function is called when `reply()` is invoked outside of any Tokio runtime
39/// context. Rather than creating a new runtime per call (expensive), we lazily
40/// initialize a single shared runtime that handles all sync reply operations.
41fn sync_reply_runtime() -> &'static Runtime {
42    SYNC_REPLY_RUNTIME.get_or_init(|| {
43        debug!("Creating shared fallback runtime for sync reply() calls");
44        tokio::runtime::Builder::new_multi_thread()
45            .worker_threads(1)
46            .enable_all()
47            .thread_name("acton-sync-reply")
48            .build()
49            .expect("Failed to create fallback Tokio runtime for sync reply()")
50    })
51}
52
53use crate::common::{Envelope, MessageError};
54use crate::message::message_address::MessageAddress;
55use crate::traits::ActonMessage;
56
57/// Represents a message prepared for sending, including sender and optional recipient addresses.
58///
59/// An `OutboundEnvelope` is typically created by an actor (using methods like
60/// [`ActorHandle::create_envelope`](crate::common::ActorHandle::create_envelope))
61/// before sending a message. It holds the [`MessageAddress`] of the sender (`return_address`)
62/// and optionally the [`MessageAddress`] of the recipient (`recipient_address`).
63///
64/// The primary methods for dispatching the message are [`OutboundEnvelope::send`] (asynchronous)
65/// and [`OutboundEnvelope::reply`] (synchronous wrapper).
66///
67/// Equality and hashing are based solely on the `return_address`.
68#[derive(Clone, Debug)]
69pub struct OutboundEnvelope {
70    /// The address of the actor sending the message.
71    pub(crate) return_address: MessageAddress,
72    /// The address of the intended recipient actor, if specified directly.
73    /// If `None`, the recipient might be implied (e.g., sending back to `return_address`).
74    pub(crate) recipient_address: Option<MessageAddress>,
75    /// The cancellation token for the sending actor.
76    pub(crate) cancellation_token: tokio_util::sync::CancellationToken,
77}
78
79// Note: The PartialEq impl for MessageAddress is defined here, but ideally should be
80// in message_address.rs if it's generally applicable. Assuming it's needed here for now.
81/// Implements equality comparison for `MessageAddress` based on the sender's `Ern`.
82impl PartialEq for MessageAddress {
83    fn eq(&self, other: &Self) -> bool {
84        self.sender == other.sender // Compare based on Ern
85    }
86}
87
88/// Implements equality comparison for `OutboundEnvelope` based on the `return_address`.
89impl PartialEq for OutboundEnvelope {
90    fn eq(&self, other: &Self) -> bool {
91        self.return_address == other.return_address
92    }
93}
94
95/// Derives `Eq` based on the `PartialEq` implementation.
96impl Eq for OutboundEnvelope {}
97
98/// Implements hashing for `OutboundEnvelope` based on the `return_address`.
99impl Hash for OutboundEnvelope {
100    fn hash<H: Hasher>(&self, state: &mut H) {
101        // Hash only based on the return address's sender Ern, consistent with PartialEq.
102        self.return_address.sender.hash(state);
103    }
104}
105
106impl OutboundEnvelope {
107    /// Creates a new `OutboundEnvelope` with only a return address specified.
108    ///
109    /// The recipient address is initially set to `None`. Use [`OutboundEnvelope::send`]
110    /// or [`OutboundEnvelope::reply`] to send the message, typically back to the
111    /// `return_address` if no recipient is set later (though `send_message_inner` logic defaults to `return_address` if `recipient_address` is `None`).
112    ///
113    /// # Arguments
114    ///
115    /// * `return_address`: The [`MessageAddress`] of the actor creating this envelope (the sender).
116    ///
117    /// # Returns
118    ///
119    /// A new `OutboundEnvelope` instance.
120    #[instrument(skip(return_address))]
121    pub fn new(
122        return_address: MessageAddress,
123        cancellation_token: tokio_util::sync::CancellationToken,
124    ) -> Self {
125        trace!(sender = %return_address.sender, "Creating new OutboundEnvelope");
126        Self {
127            return_address,
128            recipient_address: None,
129            cancellation_token,
130        }
131    }
132
133    /// Returns a clone of the sender's [`MessageAddress`].
134    #[inline]
135    #[must_use]
136    pub fn reply_to(&self) -> MessageAddress {
137        self.return_address.clone()
138    }
139
140    /// Returns a reference to the optional recipient's [`MessageAddress`].
141    #[inline]
142    #[must_use]
143    pub const fn recipient(&self) -> &Option<MessageAddress> {
144        &self.recipient_address
145    }
146
147    /// Crate-internal constructor: Creates a new `OutboundEnvelope` with specified sender and recipient.
148    #[instrument(skip(return_address, recipient_address))]
149    pub(crate) fn new_with_recipient(
150        return_address: MessageAddress,
151        recipient_address: MessageAddress,
152        cancellation_token: tokio_util::sync::CancellationToken,
153    ) -> Self {
154        trace!(sender = %return_address.sender, recipient = %recipient_address.sender, "Creating new OutboundEnvelope with recipient");
155        Self {
156            return_address,
157            recipient_address: Some(recipient_address),
158            cancellation_token,
159        }
160    }
161
162    /// Sends a message using this envelope synchronously.
163    ///
164    /// This method attempts to send a message without requiring an async context.
165    /// It uses the following strategy:
166    ///
167    /// 1. If called from within a Tokio runtime context, it spawns the send operation
168    ///    on the existing runtime (most efficient).
169    /// 2. If called from outside any Tokio context, it creates a minimal runtime
170    ///    to execute the send (fallback for non-async code paths).
171    ///
172    /// **Recommendation:** Prefer using the asynchronous [`OutboundEnvelope::send`] method
173    /// whenever possible, as it integrates better with async workflows.
174    ///
175    /// # Arguments
176    ///
177    /// * `message`: The message payload to send. Must implement [`ActonMessage`] and be `'static`.
178    ///
179    /// # Returns
180    ///
181    /// * `Ok(())`: If the message was successfully scheduled to be sent (actual delivery depends on the recipient).
182    /// * `Err(MessageError)`: Currently, this implementation always returns `Ok(())`, but the signature
183    ///   allows for future error handling. Potential errors (like closed channels) are logged internally.
184    #[instrument(skip(self, message), fields(message_type = std::any::type_name_of_val(&message)))]
185    pub fn reply(&self, message: impl ActonMessage + 'static) -> Result<(), MessageError> {
186        let envelope = self.clone();
187        let message_arc = Arc::new(message);
188
189        // Try to use the existing runtime if we're already in a Tokio context.
190        // This avoids the overhead of creating a new runtime per call.
191        if let Ok(handle) = Handle::try_current() {
192            // We're inside a Tokio runtime - spawn on the existing runtime
193            trace!(
194                sender = %envelope.return_address.sender,
195                recipient = ?envelope.recipient_address.as_ref().map(|r| r.sender.to_string()),
196                "Replying via existing runtime handle"
197            );
198            // Spawn a boxed future to reduce stack usage from large tokio::select! in send_message_inner
199            Self::spawn_reply_task(&handle, envelope, message_arc);
200        } else {
201            // We're outside any Tokio context - use the shared fallback runtime.
202            warn!(
203                sender = %envelope.return_address.sender,
204                "reply() called outside Tokio context; using shared fallback runtime"
205            );
206            Self::spawn_reply_on_fallback(envelope, message_arc);
207        }
208        Ok(())
209    }
210
211    /// Helper to spawn reply task on existing runtime, using boxed future.
212    fn spawn_reply_task(
213        handle: &Handle,
214        envelope: Self,
215        message: Arc<dyn ActonMessage + Send + Sync>,
216    ) {
217        handle.spawn(Box::pin(async move {
218            envelope.send_message_inner(message).await;
219        }));
220    }
221
222    /// Spawns a reply task on the shared fallback runtime.
223    ///
224    /// This is much more efficient than the previous approach of creating a new
225    /// `std::thread` and `Runtime` per call. The shared runtime amortizes the
226    /// cost across all sync `reply()` calls.
227    fn spawn_reply_on_fallback(envelope: Self, message: Arc<dyn ActonMessage + Send + Sync>) {
228        sync_reply_runtime().spawn(async move {
229            envelope.send_message_inner(message).await;
230        });
231    }
232
233    /// Crate-internal: Asynchronously sends the message payload to the recipient.
234    /// Handles channel reservation and error logging.
235    ///
236    /// # Performance
237    ///
238    /// This method uses a fast-path optimization: it first attempts `try_reserve()` which
239    /// is non-blocking and avoids async overhead when the channel has capacity (common case).
240    /// Only when the channel is full does it fall back to the async `reserve()` path.
241    ///
242    /// Clones are minimized on the fast path: `sender`/`recipient` identifiers are borrowed
243    /// from the address structs for logging rather than cloned upfront.
244    async fn send_message_inner(&self, message: Arc<dyn ActonMessage + Send + Sync>) {
245        let target_address = self
246            .recipient_address
247            .as_ref()
248            .unwrap_or(&self.return_address)
249            .clone();
250        let return_address = self.return_address.clone();
251        let channel_sender = target_address.address.clone();
252
253        // Check if cancelled before attempting send
254        if self.cancellation_token.is_cancelled() {
255            error!(sender = %return_address.sender, recipient = %target_address.sender, "Send aborted: cancellation_token triggered");
256            return;
257        }
258
259        // Fast path: try non-blocking reserve first (common case when channel has capacity)
260        match channel_sender.try_reserve() {
261            Ok(permit) => {
262                permit.send(Envelope::new(message, return_address, target_address));
263                return;
264            }
265            Err(tokio::sync::mpsc::error::TrySendError::Closed(())) => {
266                error!(sender = %return_address.sender, recipient = %target_address.sender, "Recipient channel is closed");
267                return;
268            }
269            Err(tokio::sync::mpsc::error::TrySendError::Full(())) => {
270                // Channel is full, fall through to slow path
271            }
272        }
273
274        // Slow path: channel is full, need to wait for capacity
275        match channel_sender.reserve().await {
276            Ok(permit) => {
277                permit.send(Envelope::new(message, return_address, target_address));
278            }
279            Err(e) => {
280                error!(sender = %return_address.sender, recipient = %target_address.sender, error = %e, "Failed to reserve channel capacity");
281            }
282        };
283    }
284
285    /// Sends a message asynchronously using this envelope.
286    ///
287    /// This method takes the message payload, wraps it in an `Arc`, and calls the
288    /// internal `send_message_inner` to dispatch it to the recipient's channel.
289    /// The recipient is determined by `recipient_address` if `Some`, otherwise it
290    /// defaults to `return_address`.
291    ///
292    /// This is the preferred method for sending messages from within an asynchronous context.
293    /// For fire-and-forget scenarios where errors can be ignored, this method logs errors
294    /// internally. For explicit error handling, use [`try_send`](OutboundEnvelope::try_send).
295    ///
296    /// # Arguments
297    ///
298    /// * `message`: The message payload to send. Must implement [`ActonMessage`] and be `'static`.
299    #[instrument(skip(self, message), level = "trace", fields(message_type = std::any::type_name_of_val(&message)))]
300    pub async fn send(&self, message: impl ActonMessage + 'static) {
301        // Arc the message and call the internal async sender.
302        self.send_message_inner(Arc::new(message)).await;
303    }
304
305    /// Sends a message asynchronously with explicit error handling.
306    ///
307    /// This method is similar to [`send`](OutboundEnvelope::send), but returns a `Result`
308    /// indicating whether the message was successfully delivered to the recipient's channel.
309    /// Use this when you need to handle delivery failures explicitly rather than relying
310    /// on internal logging.
311    ///
312    /// # Arguments
313    ///
314    /// * `message`: The message payload to send. Must implement [`ActonMessage`] and be `'static`.
315    ///
316    /// # Returns
317    ///
318    /// * `Ok(())` - The message was successfully queued in the recipient's channel
319    /// * `Err(MessageError)` - The message could not be delivered
320    ///
321    /// # Errors
322    ///
323    /// Returns an error if:
324    /// - The recipient's channel is closed (`MessageError::ChannelClosed`)
325    /// - The operation was cancelled (`MessageError::Cancelled`)
326    /// - The channel capacity could not be reserved (`MessageError::SendFailed`)
327    ///
328    /// # Performance
329    ///
330    /// Uses a fast-path with `try_reserve()` for the common case when the channel has capacity.
331    #[instrument(skip(self, message), level = "trace", fields(message_type = std::any::type_name_of_val(&message)))]
332    pub async fn try_send(&self, message: impl ActonMessage + 'static) -> Result<(), MessageError> {
333        let message = Arc::new(message);
334
335        // Determine the target address: recipient if Some, otherwise return_address.
336        let target_address = self
337            .recipient_address
338            .as_ref()
339            .unwrap_or(&self.return_address);
340
341        // Check cancellation synchronously first
342        if self.cancellation_token.is_cancelled() {
343            return Err(MessageError::Cancelled);
344        }
345
346        let channel_sender = &target_address.address;
347
348        // Fast path: try non-blocking reserve first
349        match channel_sender.try_reserve() {
350            Ok(permit) => {
351                let internal_envelope =
352                    Envelope::new(message, self.return_address.clone(), target_address.clone());
353                permit.send(internal_envelope);
354                return Ok(());
355            }
356            Err(tokio::sync::mpsc::error::TrySendError::Closed(())) => {
357                return Err(MessageError::ChannelClosed);
358            }
359            Err(tokio::sync::mpsc::error::TrySendError::Full(())) => {
360                // Fall through to slow path
361            }
362        }
363
364        // Slow path: channel is full, need to wait for capacity
365        let channel_sender = channel_sender.clone();
366        let cancellation = self.cancellation_token.clone();
367        let return_addr = self.return_address.clone();
368        let target_addr = target_address.clone();
369
370        Box::pin(async move {
371            tokio::select! {
372                () = cancellation.cancelled() => {
373                    Err(MessageError::Cancelled)
374                }
375                permit_result = channel_sender.reserve() => {
376                    match permit_result {
377                        Ok(permit) => {
378                            let internal_envelope = Envelope::new(message, return_addr, target_addr);
379                            permit.send(internal_envelope);
380                            Ok(())
381                        }
382                        Err(e) => {
383                            Err(MessageError::SendFailed(e.to_string()))
384                        }
385                    }
386                }
387            }
388        })
389        .await
390    }
391
392    /// Sends an Arc-wrapped message asynchronously using this envelope.
393    ///
394    /// This method is similar to [`send`](OutboundEnvelope::send), but accepts an
395    /// already-Arc'd message. This is useful when the message is already wrapped
396    /// in an Arc (e.g., from IPC deserialization where Box is converted to Arc).
397    ///
398    /// # Arguments
399    ///
400    /// * `message`: An Arc-wrapped message payload to send.
401    #[cfg(feature = "ipc")]
402    #[instrument(skip(self, message), level = "trace")]
403    pub async fn send_arc(&self, message: Arc<dyn ActonMessage + Send + Sync>) {
404        self.send_message_inner(message).await;
405    }
406
407    /// Tries to send an Arc-wrapped message without blocking.
408    ///
409    /// This method is similar to [`send_arc`](OutboundEnvelope::send_arc), but uses
410    /// `try_reserve()` instead of `reserve()`. It returns immediately with an error
411    /// if the recipient's channel is full, rather than waiting for capacity.
412    ///
413    /// This is useful for IPC scenarios where backpressure feedback is needed
414    /// rather than blocking the IPC listener.
415    ///
416    /// # Arguments
417    ///
418    /// * `message`: An Arc-wrapped message payload to send.
419    ///
420    /// # Errors
421    ///
422    /// Returns an error if:
423    /// - The recipient's channel is closed
424    /// - The recipient's channel is full (backpressure)
425    #[cfg(feature = "ipc")]
426    #[instrument(skip(self, message), level = "trace")]
427    pub fn try_send_arc(
428        &self,
429        message: Arc<dyn ActonMessage + Send + Sync>,
430    ) -> Result<(), crate::common::ipc::IpcError> {
431        use crate::common::ipc::IpcError;
432        use tokio::sync::mpsc::error::TrySendError;
433
434        // Determine the target address
435        let target_address = self
436            .recipient_address
437            .as_ref()
438            .unwrap_or(&self.return_address);
439        let target_id = &target_address.sender;
440        let channel_sender = target_address.address.clone();
441
442        trace!(sender = %self.return_address.sender, recipient = %target_id, "Attempting try_send_arc");
443
444        if channel_sender.is_closed() {
445            tracing::error!(sender = %self.return_address.sender, recipient = %target_id, "Recipient channel is closed");
446            return Err(IpcError::IoError("Recipient channel is closed".to_string()));
447        }
448
449        // Try to reserve a send permit without blocking
450        let permit = match channel_sender.try_reserve() {
451            Ok(permit) => permit,
452            Err(TrySendError::Full(())) => {
453                tracing::warn!(sender = %self.return_address.sender, recipient = %target_id, "Target actor inbox is full");
454                return Err(IpcError::TargetBusy);
455            }
456            Err(TrySendError::Closed(())) => {
457                tracing::error!(sender = %self.return_address.sender, recipient = %target_id, "Recipient channel is closed");
458                return Err(IpcError::IoError("Recipient channel is closed".to_string()));
459            }
460        };
461
462        let internal_envelope =
463            Envelope::new(message, self.return_address.clone(), target_address.clone());
464        trace!(sender = %self.return_address.sender, recipient = %target_id, "Sending message via try_reserve permit");
465        permit.send(internal_envelope);
466        Ok(())
467    }
468}