1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
use Stream;
/// Represents a fundamental computation unit in the `acty` framework.
///
/// An `Actor` is an entity that implements specific business logic by processing
/// an asynchronous message stream (the `inbox`).
///
/// ## Core Concept
///
/// The core of an Actor is its [`run`] method. This method contains the Actor's
/// main loop, which continuously pulls and processes messages from the `inbox`
/// until the stream ends.
///
/// ## Actor Lifecycle
///
/// An Actor's lifecycle is determined by the lifecycle of its `inbox`, which is a
/// "sender-driven" pattern. When all [`Outbox`](crate::UnboundedOutbox) instances
/// associated with the Actor (i.e., the message senders) are destroyed, the
/// `inbox` stream naturally closes. This causes the message loop in the `run`
/// method to finish, leading to the Actor's `tokio` task exiting gracefully.
/// This design avoids the need to manually send a "stop" message, making
/// lifecycle management simpler and safer.
///
/// ## State Management
///
/// An Actor's state usually exists as local variables within the asynchronous
/// scope of the `run` method. The Actor structure itself (`Self`) is often
/// only used to carry initial configuration or handles for result communication
/// (e.g., a `tokio::sync::oneshot::Sender`). When the `run` method starts,
/// `self` is consumed, and its fields can be moved into the `run` method's scope.
///
/// ## Trait Bounds: `Sized + Send + 'static`
///
/// - `Sized`: This is a standard requirement, meaning the Actor's size must be known at compile time.
/// - `Send`: Since the Actor will be moved to a new task by `tokio::spawn` (potentially on a different thread), it must be thread-safe itself.
/// - `'static`: The Actor must not contain any non-static references, ensuring it remains valid throughout its entire execution lifecycle.
///
/// The `#[trait_variant::make(Send)]` macro automatically ensures that implementors
/// of this trait satisfy the `Send` constraint.
///
/// ## Example
///
/// Here is an example of a `SummarizerActor` that receives a series of text fragments
/// and returns them concatenated into an article upon completion.
///
/// ```
/// use acty::{Actor, ActorExt, AsyncClose};
/// use futures::{Stream, StreamExt};
/// use std::pin::pin;
/// use tokio::sync::oneshot;
///
/// /// An Actor used to concatenate strings.
/// /// It holds a `oneshot::Sender` in its structure to return the final result
/// /// when the task is complete.
/// struct SummarizerActor {
/// result_sender: oneshot::Sender<String>,
/// }
///
/// /// Implement the Actor trait
/// impl Actor for SummarizerActor {
/// // This Actor handles messages of type String
/// type Message = String;
///
/// /// The Actor's main logic
/// async fn run(self, inbox: impl Stream<Item = Self::Message> + Send) {
/// // Pin the inbox to the stack to enable use of .next()
/// let mut inbox = pin!(inbox);
///
/// // Initialize the Actor's internal state
/// let mut summary = String::new();
///
/// // Loop to process messages from the inbox
/// while let Some(fragment) = inbox.next().await {
/// summary.push_str(&fragment);
/// summary.push('\n');
/// }
///
/// // The loop finishes when the inbox closes (all Outboxes are dropped).
/// // At this point, send the final result via the oneshot channel.
/// // Ignore the error if the receiver has been dropped.
/// self.result_sender.send(summary).unwrap_or(());
/// }
/// }
///
/// #[tokio::main]
/// async fn main() {
/// // 1. Create the oneshot channel to receive the result
/// let (tx, rx) = oneshot::channel();
///
/// // 2. Create the Actor instance
/// let actor = SummarizerActor { result_sender: tx };
///
/// // 3. Launch the Actor and get an Outbox for sending messages
/// let outbox = actor.start();
///
/// // 4. Send messages to the Actor
/// outbox.send("This is the first part.".to_string()).unwrap();
/// outbox.send("This is the second part.".to_string()).unwrap();
///
/// // 5. Close the outbox, which will cause the Actor's inbox stream to end
/// outbox.close().await;
///
/// // 6. Wait for and retrieve the final result returned by the Actor
/// let result = rx.await.expect("Actor failed to send the result");
///
/// assert_eq!(result, "This is the first part.\nThis is the second part.\n");
/// println!("Final summary:\n{}", result);
/// }
/// ```