1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
//! Example demonstrating selective receive with RPC-style pattern.
//!
//! This example shows how to use selective receive (similar to Erlang's
//! `receive` with pattern matching) to implement RPC with correlation IDs.
//! Actors can wait for specific messages while leaving others in the mailbox.
//!
//! Run with: cargo run --example actor_selective_receive
use async_trait::async_trait;
use joerl::{Actor, ActorContext, ActorSystem, Message, Pid};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Mutex;
/// Request message with correlation ID
#[derive(Debug, Clone)]
struct Request {
correlation_id: u64,
from: Pid,
query: String,
}
/// Response message with correlation ID
#[derive(Debug, Clone)]
struct Response {
correlation_id: u64,
result: String,
}
/// Command to trigger RPC
#[derive(Debug, Clone)]
enum Command {
MakeRequest(String),
MakeMultipleRequests,
}
/// Server actor that responds to requests
struct ServerActor {
name: String,
}
#[async_trait]
impl Actor for ServerActor {
async fn handle_message(&mut self, msg: Message, ctx: &mut ActorContext) {
if let Some(req) = msg.downcast_ref::<Request>() {
println!(
"🔵 [{}] Received request {} from {:?}: '{}'",
self.name, req.correlation_id, req.from, req.query
);
// Simulate processing time
tokio::time::sleep(Duration::from_millis(100)).await;
// Send response back
let response = Response {
correlation_id: req.correlation_id,
result: format!("[{}] Processed: {}", self.name, req.query),
};
ctx.send(req.from, Box::new(response)).await.ok();
println!(
"✅ [{}] Sent response for request {}",
self.name, req.correlation_id
);
}
}
}
/// Client actor that makes RPC calls using selective receive
struct ClientActor {
name: String,
server: Pid,
next_id: u64,
results: Arc<Mutex<Vec<(u64, String)>>>,
}
#[async_trait]
impl Actor for ClientActor {
async fn handle_message(&mut self, msg: Message, ctx: &mut ActorContext) {
// Handle commands
if let Some(cmd) = msg.downcast_ref::<Command>() {
match cmd {
Command::MakeRequest(query) => {
let request_id = self.next_id;
self.next_id += 1;
println!(
"🟢 [{}] Making RPC call {} with query: '{}'",
self.name, request_id, query
);
// Send request
let request = Request {
correlation_id: request_id,
from: ctx.pid(),
query: query.clone(),
};
ctx.send(self.server, Box::new(request)).await.ok();
// Use selective receive to wait for specific response
// Other messages will remain in mailbox
let response = ctx
.receive_timeout(
|msg| {
msg.downcast_ref::<Response>()
.filter(|r| r.correlation_id == request_id)
.cloned()
},
Duration::from_secs(2),
)
.await;
match response {
Some(resp) => {
println!(
"✨ [{}] Got response for request {}: '{}'",
self.name, request_id, resp.result
);
self.results
.lock()
.await
.push((resp.correlation_id, resp.result));
}
None => {
println!(
"⏰ [{}] Timeout waiting for response to request {}",
self.name, request_id
);
}
}
}
Command::MakeMultipleRequests => {
// Make multiple concurrent requests
// Each will wait for its specific response using correlation ID
let queries = vec![
"What is 2+2?",
"What is the capital of France?",
"What is Rust?",
];
for query in queries {
let request_id = self.next_id;
self.next_id += 1;
println!(
"🟢 [{}] Making RPC call {} with query: '{}'",
self.name, request_id, query
);
let request = Request {
correlation_id: request_id,
from: ctx.pid(),
query: query.to_string(),
};
ctx.send(self.server, Box::new(request)).await.ok();
}
// Now collect all responses - selective receive ensures
// we get them in any order they arrive
for _ in 0..3 {
let response = ctx
.receive_timeout(
|msg| msg.downcast_ref::<Response>().cloned(),
Duration::from_secs(2),
)
.await;
if let Some(resp) = response {
println!(
"✨ [{}] Got response {}: '{}'",
self.name, resp.correlation_id, resp.result
);
self.results
.lock()
.await
.push((resp.correlation_id, resp.result));
}
}
}
}
}
}
}
/// Actor that demonstrates try_receive (non-blocking)
struct MonitorActor {
stats: Arc<Mutex<Vec<String>>>,
}
#[async_trait]
impl Actor for MonitorActor {
async fn handle_message(&mut self, msg: Message, ctx: &mut ActorContext) {
if let Some(cmd) = msg.downcast_ref::<&str>()
&& *cmd == "check"
{
println!("👀 [Monitor] Checking for any responses...");
// Try to receive without blocking
// This checks pending queue and current mailbox
let response = ctx.try_receive(|msg| msg.downcast_ref::<Response>().cloned());
match response {
Some(resp) => {
let stat = format!("Found response {} in mailbox", resp.correlation_id);
println!("📊 [Monitor] {}", stat);
self.stats.lock().await.push(stat);
}
None => {
println!("📊 [Monitor] No responses in mailbox");
}
}
}
}
}
#[tokio::main]
async fn main() {
println!("=== Selective Receive Example ===\n");
let system = Arc::new(ActorSystem::new());
// Spawn server
let server = system.spawn(ServerActor {
name: "Server".to_string(),
});
// Spawn client
let results = Arc::new(Mutex::new(Vec::new()));
let client = system.spawn(ClientActor {
name: "Client".to_string(),
server: server.pid(),
next_id: 1,
results: results.clone(),
});
println!("=== Single RPC Call ===\n");
// Make a single RPC call
client
.send(Box::new(Command::MakeRequest("Hello, Server!".to_string())))
.await
.unwrap();
tokio::time::sleep(Duration::from_millis(200)).await;
println!("\n=== Multiple Concurrent RPC Calls ===\n");
// Make multiple concurrent calls
client
.send(Box::new(Command::MakeMultipleRequests))
.await
.unwrap();
tokio::time::sleep(Duration::from_millis(500)).await;
println!("\n=== Try Receive (Non-blocking) ===\n");
// Spawn monitor that uses try_receive
let monitor_stats = Arc::new(Mutex::new(Vec::new()));
let monitor = system.spawn(MonitorActor {
stats: monitor_stats.clone(),
});
// Send some responses to server's mailbox for monitor to find
for i in 1..=3 {
monitor
.send(Box::new(Response {
correlation_id: 100 + i,
result: format!("Background response {}", i),
}))
.await
.unwrap();
}
// Check multiple times
for _ in 0..4 {
monitor.send(Box::new("check")).await.unwrap();
tokio::time::sleep(Duration::from_millis(50)).await;
}
tokio::time::sleep(Duration::from_millis(200)).await;
// Print results
println!("\n=== Results ===");
let all_results = results.lock().await;
println!("Client received {} responses:", all_results.len());
for (id, result) in all_results.iter() {
println!(" [{}] {}", id, result);
}
let monitor_results = monitor_stats.lock().await;
println!("\nMonitor stats: {} items found", monitor_results.len());
for stat in monitor_results.iter() {
println!(" {}", stat);
}
println!("\n=== Key Concepts Demonstrated ===");
println!("1. RPC with correlation IDs - wait for specific responses");
println!("2. Selective receive with timeout - don't block forever");
println!("3. Multiple concurrent requests - each gets its own response");
println!("4. Non-blocking try_receive - check mailbox without waiting");
println!("5. Messages not matching pattern stay in mailbox for later\n");
}