1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
use std::{fmt, sync::Arc};
use tokio::sync::mpsc::Sender;
use crate::{
ActorId, Envelope, EventId, IntoEnvelope, Result,
internal::{Command, CommandSender},
};
/// Runtime-provided context for an actor to interact with the system.
///
/// Use it to:
/// - `send(event)`: emit events into the broker tagged with this actor's ID
/// - `send_child_event(event, parent_id)`: emit an event linked to a parent event
/// - `stop()`: stop this actor (other actors continue running)
/// - `stop_runtime()`: initiate shutdown of the entire runtime
/// - `actor_id()`: retrieve the actor's identity
/// - `is_sender_full()`: check channel congestion before sending
///
/// See also: [`Envelope`], [`crate::Meta`], [`crate::Supervisor`].
#[derive(Clone)]
pub struct Context<E> {
actor_id: ActorId,
sender: Sender<Arc<Envelope<E>>>,
cmd_tx: CommandSender,
}
impl<E> Context<E> {
pub(crate) fn new(
actor_id: ActorId,
sender: Sender<Arc<Envelope<E>>>,
cmd_tx: CommandSender,
) -> Self {
Self {
actor_id,
sender,
cmd_tx,
}
}
/// Send an event to the broker. Accepts any type that converts into `E`.
/// The envelope will carry this actor's name.
/// This awaits channel capacity (backpressure) to avoid silent drops.
///
/// # Errors
///
/// Returns [`Error::MailboxClosed`](crate::Error::MailboxClosed) if the broker
/// channel is closed.
pub async fn send<IE: Into<IntoEnvelope<E>>>(&self, into_envelope: IE) -> Result {
let envelope = into_envelope
.into()
.with_actor_id(self.actor_id.clone())
.build();
self.send_envelope(envelope).await
}
/// Emit a child event linked to the given parent event ID.
///
/// # Errors
///
/// Returns [`Error::MailboxClosed`](crate::Error::MailboxClosed) if the broker
/// channel is closed.
pub async fn send_child_event<IE: Into<IntoEnvelope<E>>>(
&self,
into_envelope: IE,
parent_id: EventId,
) -> Result {
let envelope = into_envelope
.into()
.with_actor_id(self.actor_id.clone())
.with_parent_id(parent_id)
.build();
self.send_envelope(envelope).await
}
#[inline]
async fn send_envelope<T: Into<Envelope<E>>>(&self, envelope: T) -> Result {
self.sender.send(Arc::new(envelope.into())).await?;
Ok(())
}
/// Stop this actor.
///
/// The actor's event loop will exit after the current tick completes,
/// and [`on_shutdown`](crate::Actor::on_shutdown) will be called.
/// Other actors continue running.
///
/// To shut down the entire runtime instead, use [`stop_runtime()`](Self::stop_runtime).
///
/// # Errors
///
/// Returns [`Error::Internal`](crate::Error::Internal) if the command
/// channel is closed (runtime already shut down).
#[inline]
pub fn stop(&self) -> Result {
self.cmd_tx.send(Command::StopActor(self.actor_id.clone()))
}
/// Initiate shutdown of the entire runtime.
///
/// All actors will be cancelled and the supervisor's
/// [`join()`](crate::Supervisor::join) or [`run()`](crate::Supervisor::run)
/// call will return.
///
/// To stop only this actor, use [`stop()`](Self::stop).
///
/// # Errors
///
/// Returns [`Error::Internal`](crate::Error::Internal) if the command
/// channel is closed (runtime already shut down).
#[inline]
pub fn stop_runtime(&self) -> Result {
self.cmd_tx.send(Command::StopRuntime)
}
/// The identity of this actor.
#[inline]
pub fn actor_id(&self) -> &ActorId {
&self.actor_id
}
/// The actor's name as registered with the supervisor.
#[inline]
pub fn actor_name(&self) -> &str {
self.actor_id.as_str()
}
/// Whether this actor's channel to the broker has no remaining capacity.
///
/// Producers can use this to skip sending non-essential events when
/// the channel is congested (stage 1). This is a cooperative
/// mechanism - the producer decides what to skip.
///
/// ```rust,ignore
/// // Skip telemetry when the channel is busy
/// if !ctx.is_sender_full() {
/// ctx.send(Event::Telemetry(stats)).await?;
/// }
/// ```
///
/// Note: each actor has its own channel to the broker (stage 1).
/// This reflects the sending actor's individual backlog, not
/// global system pressure or subscriber-side congestion (stage 2).
#[inline]
pub fn is_sender_full(&self) -> bool {
self.sender.capacity() == 0
}
}
impl<E> fmt::Debug for Context<E> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Context")
.field("actor_id", &self.actor_id)
.field("sender", &self.sender)
.finish_non_exhaustive()
}
}