1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
impl<C> Behaviour<C>
where
C: request_response::Codec<
Protocol = StreamProtocol,
Request = SwarmRequest,
Response = SwarmResponse,
> + Clone
+ Send
+ 'static,
{
/// Creates a new messaging behaviour with a custom codec.
pub fn with_codec(local_peer_id: PeerId, config: Config, codec: C) -> Self {
let request_response = request_response::Behaviour::with_codec(
codec,
[(proto_name(), request_response::ProtocolSupport::Full)],
config.into(),
);
Behaviour {
request_response,
local_peer_id,
next_id: 0,
requests: HashMap::new(),
join_set: JoinSet::new(),
}
}
/// Dispatches an ask (request/reply) message to a remote actor.
///
/// This is a low-level entry point: it sends a request that expects a reply
/// and generates events. Prefer `RemoteActorRef::ask` for higher-level
/// messaging that doesn't emit events. Every argument is forwarded unchanged
/// to `ask_with_reply`, with no reply channel attached.
///
/// # Arguments
///
/// * `actor_id` - The target actor's ID
/// * `actor_remote_id` - The target actor's remote type ID
/// * `message_remote_id` - The message's remote type ID
/// * `payload` - The serialized message payload
/// * `mailbox_timeout` - Optional timeout for mailbox delivery
/// * `reply_timeout` - Optional timeout for receiving a reply
/// * `immediate` - Whether to fail if the mailbox is full
///
/// # Returns
///
/// The request ID used to track the progress of this ask.
///
/// # Panics
///
/// Panics if `ask_with_reply` yields no request ID. `ask_with_reply` itself
/// panics with "swarm should be bootstrapped" when `actor_id.peer_id()` does
/// not provide a peer ID.
// The arguments are forwarded one-to-one to `ask_with_reply`.
#[allow(clippy::too_many_arguments)]
pub fn ask(
    &mut self,
    actor_id: ActorId,
    actor_remote_id: Cow<'static, str>,
    message_remote_id: Cow<'static, str>,
    payload: Vec<u8>,
    mailbox_timeout: Option<Duration>,
    reply_timeout: Option<Duration>,
    immediate: bool,
) -> RequestId {
    // `None`: no oneshot sender is attached to this request.
    let request_id = self.ask_with_reply(
        actor_id,
        actor_remote_id,
        message_remote_id,
        payload,
        mailbox_timeout,
        reply_timeout,
        immediate,
        None,
    );
    request_id.unwrap()
}
/// Dispatches a tell (one-way) message to a remote actor.
///
/// This is a low-level entry point: it sends a one-way message and generates
/// events. Prefer `RemoteActorRef::tell` for higher-level messaging that
/// doesn't emit events. Every argument is forwarded unchanged to
/// `tell_with_reply`, with no reply channel attached.
///
/// # Arguments
///
/// * `actor_id` - The target actor's ID
/// * `actor_remote_id` - The target actor's remote type ID
/// * `message_remote_id` - The message's remote type ID
/// * `payload` - The serialized message payload
/// * `mailbox_timeout` - Optional timeout for mailbox delivery
/// * `immediate` - Whether to fail if the mailbox is full
///
/// # Returns
///
/// The request ID used to track the progress of this tell.
///
/// # Panics
///
/// Panics if `tell_with_reply` yields no request ID. `tell_with_reply` itself
/// panics with "swarm should be bootstrapped" when `actor_id.peer_id()` does
/// not provide a peer ID.
pub fn tell(
    &mut self,
    actor_id: ActorId,
    actor_remote_id: Cow<'static, str>,
    message_remote_id: Cow<'static, str>,
    payload: Vec<u8>,
    mailbox_timeout: Option<Duration>,
    immediate: bool,
) -> RequestId {
    // `None`: no oneshot sender is attached to this request.
    let request_id = self.tell_with_reply(
        actor_id,
        actor_remote_id,
        message_remote_id,
        payload,
        mailbox_timeout,
        immediate,
        None,
    );
    request_id.unwrap()
}
/// Links two actors to each other across the network.
///
/// This is a low-level entry point: it establishes supervision relationships
/// and generates events. Prefer `ActorRef::link` for higher-level linking
/// that doesn't emit events. Every argument is forwarded unchanged to
/// `link_with_reply`, with no reply channel attached.
///
/// # Arguments
///
/// * `actor_id` - The first actor's ID
/// * `actor_remote_id` - The first actor's remote type ID
/// * `sibling_id` - The second actor's ID to link with
/// * `sibling_remote_id` - The second actor's remote type ID
///
/// # Returns
///
/// The request ID used to track the progress of this link.
///
/// # Panics
///
/// Panics if `link_with_reply` yields no request ID. `link_with_reply` itself
/// panics with "swarm should be bootstrapped" when `actor_id.peer_id()` does
/// not provide a peer ID.
pub fn link(
    &mut self,
    actor_id: ActorId,
    actor_remote_id: Cow<'static, str>,
    sibling_id: ActorId,
    sibling_remote_id: Cow<'static, str>,
) -> RequestId {
    // `None`: no oneshot sender is attached to this request.
    let request_id = self.link_with_reply(
        actor_id,
        actor_remote_id,
        sibling_id,
        sibling_remote_id,
        None,
    );
    request_id.unwrap()
}
/// Dissolves a link between two actors across the network.
///
/// This is a low-level entry point: it removes supervision relationships and
/// generates events. Prefer `ActorRef::unlink` for higher-level unlinking
/// that doesn't emit events. Every argument is forwarded unchanged to
/// `unlink_with_reply`, with no reply channel attached.
///
/// # Arguments
///
/// * `actor_id` - The first actor's ID
/// * `actor_remote_id` - The first actor's remote type ID
/// * `sibling_id` - The second actor's ID to unlink from
///
/// # Returns
///
/// The request ID used to track the progress of this unlink.
///
/// # Panics
///
/// Panics if `unlink_with_reply` yields no request ID. `unlink_with_reply`
/// itself panics with "swarm should be bootstrapped" when
/// `actor_id.peer_id()` does not provide a peer ID.
pub fn unlink(
    &mut self,
    actor_id: ActorId,
    actor_remote_id: Cow<'static, str>,
    sibling_id: ActorId,
) -> RequestId {
    // `None`: no oneshot sender is attached to this request.
    let request_id = self.unlink_with_reply(actor_id, actor_remote_id, sibling_id, None);
    request_id.unwrap()
}
/// Notifies an actor that one of its linked actors has died.
///
/// This is a low-level entry point: it reports a link failure to the notified
/// actor and generates events. It is typically called automatically by the
/// actor system when links are broken. Every argument is forwarded unchanged
/// to `signal_link_died_with_reply`, with no reply channel attached.
///
/// # Arguments
///
/// * `dead_actor_id` - The ID of the actor that died
/// * `notified_actor_id` - The ID of the actor to notify
/// * `notified_actor_remote_id` - The remote type ID of the actor to notify
/// * `stop_reason` - The reason the actor stopped
///
/// # Returns
///
/// The request ID used to track the progress of this signal.
///
/// # Panics
///
/// Panics if `signal_link_died_with_reply` yields no request ID.
pub fn signal_link_died(
    &mut self,
    dead_actor_id: ActorId,
    notified_actor_id: ActorId,
    notified_actor_remote_id: Cow<'static, str>,
    stop_reason: ActorStopReason,
) -> RequestId {
    // `None`: no oneshot sender is attached to this request.
    let request_id = self.signal_link_died_with_reply(
        dead_actor_id,
        notified_actor_id,
        notified_actor_remote_id,
        stop_reason,
        None,
    );
    request_id.unwrap()
}
/// Issues an ask to the peer that hosts `actor_id`, optionally attaching a
/// oneshot sender for the response.
///
/// The target peer is read from `actor_id.peer_id()`. The remaining arguments
/// are bundled into a tuple and handed to `request_with_reply` together with
/// two closures over that tuple:
///
/// * one that calls `ask(..)` directly and wraps its outcome in
///   `SwarmResponse::Ask`, converting an error with
///   `WireRemoteSendError::from_bytes_error`;
/// * one that builds the `SwarmRequest::Ask` wire message.
///
/// NOTE(review): which closure actually runs is decided inside
/// `request_with_reply` (presumably the first for an actor on the local peer
/// and the second for a remote peer) — confirm against that method.
///
/// # Returns
///
/// Whatever `request_with_reply` returns. The public `ask` wrapper passes
/// `reply = None` and unwraps the result.
///
/// # Panics
///
/// Panics with "swarm should be bootstrapped" if `actor_id.peer_id()` does
/// not provide a peer ID.
// Each argument maps one-to-one onto a field of `SwarmRequest::Ask`.
#[allow(clippy::too_many_arguments)]
pub(super) fn ask_with_reply(
    &mut self,
    actor_id: ActorId,
    actor_remote_id: Cow<'static, str>,
    message_remote_id: Cow<'static, str>,
    payload: Vec<u8>,
    mailbox_timeout: Option<Duration>,
    reply_timeout: Option<Duration>,
    immediate: bool,
    reply: Option<oneshot::Sender<SwarmResponse>>,
) -> Option<RequestId> {
    let peer_id = actor_id.peer_id().expect("swarm should be bootstrapped");
    self.request_with_reply(
        &peer_id,
        reply,
        (
            actor_id,
            actor_remote_id,
            message_remote_id,
            payload,
            mailbox_timeout,
            reply_timeout,
            immediate,
        ),
        |(
            actor_id,
            actor_remote_id,
            message_remote_id,
            payload,
            mailbox_timeout,
            reply_timeout,
            immediate,
        )| {
            ask(
                actor_id,
                actor_remote_id,
                message_remote_id,
                payload,
                mailbox_timeout,
                reply_timeout,
                immediate,
            )
            .map(|outcome| {
                SwarmResponse::Ask(
                    outcome.map_err(|err| WireRemoteSendError::from_bytes_error(&err)),
                )
            })
        },
        move |(
            actor_id,
            actor_remote_id,
            message_remote_id,
            payload,
            mailbox_timeout,
            reply_timeout,
            immediate,
        )| SwarmRequest::Ask {
            actor_id: WireActorId::from_runtime(&actor_id),
            // The closure owns both `Cow`s, so consume them: an already-owned
            // string is moved instead of being copied into a new allocation.
            actor_remote_id: actor_remote_id.into_owned(),
            message_remote_id: message_remote_id.into_owned(),
            payload,
            mailbox_timeout,
            reply_timeout,
            immediate,
        },
    )
}
/// Issues a tell to the peer that hosts `actor_id`, optionally attaching a
/// oneshot sender for the response.
///
/// The target peer is read from `actor_id.peer_id()`. The remaining arguments
/// are bundled into a tuple and handed to `request_with_reply` together with
/// two closures over that tuple:
///
/// * one that calls `tell(..)` directly and wraps its outcome in
///   `SwarmResponse::Tell`, converting an error with
///   `WireRemoteSendError::from_infallible`;
/// * one that builds the `SwarmRequest::Tell` wire message.
///
/// NOTE(review): which closure actually runs is decided inside
/// `request_with_reply` (presumably the first for an actor on the local peer
/// and the second for a remote peer) — confirm against that method.
///
/// # Returns
///
/// Whatever `request_with_reply` returns. The public `tell` wrapper passes
/// `reply = None` and unwraps the result.
///
/// # Panics
///
/// Panics with "swarm should be bootstrapped" if `actor_id.peer_id()` does
/// not provide a peer ID.
// Apart from `reply`, each argument maps onto a field of `SwarmRequest::Tell`.
#[allow(clippy::too_many_arguments)]
pub(super) fn tell_with_reply(
    &mut self,
    actor_id: ActorId,
    actor_remote_id: Cow<'static, str>,
    message_remote_id: Cow<'static, str>,
    payload: Vec<u8>,
    mailbox_timeout: Option<Duration>,
    immediate: bool,
    reply: Option<oneshot::Sender<SwarmResponse>>,
) -> Option<RequestId> {
    let peer_id = actor_id.peer_id().expect("swarm should be bootstrapped");
    self.request_with_reply(
        &peer_id,
        reply,
        (
            actor_id,
            actor_remote_id,
            message_remote_id,
            payload,
            mailbox_timeout,
            immediate,
        ),
        |(
            actor_id,
            actor_remote_id,
            message_remote_id,
            payload,
            mailbox_timeout,
            immediate,
        )| {
            tell(
                actor_id,
                actor_remote_id,
                message_remote_id,
                payload,
                mailbox_timeout,
                immediate,
            )
            .map(|outcome| {
                SwarmResponse::Tell(
                    outcome.map_err(|err| WireRemoteSendError::from_infallible(&err)),
                )
            })
        },
        move |(
            actor_id,
            actor_remote_id,
            message_remote_id,
            payload,
            mailbox_timeout,
            immediate,
        )| SwarmRequest::Tell {
            actor_id: WireActorId::from_runtime(&actor_id),
            // The closure owns both `Cow`s, so consume them: an already-owned
            // string is moved instead of being copied into a new allocation.
            actor_remote_id: actor_remote_id.into_owned(),
            message_remote_id: message_remote_id.into_owned(),
            payload,
            mailbox_timeout,
            immediate,
        },
    )
}
/// Issues a link request to the peer that hosts `actor_id`, optionally
/// attaching a oneshot sender for the response.
///
/// The target peer is read from `actor_id.peer_id()`. The remaining arguments
/// are bundled into a tuple and handed to `request_with_reply` together with
/// two closures over that tuple:
///
/// * one that calls `link(..)` directly and wraps its outcome in
///   `SwarmResponse::Link`, converting an error with
///   `WireRemoteSendError::from_infallible`;
/// * one that builds the `SwarmRequest::Link` wire message.
///
/// NOTE(review): which closure actually runs is decided inside
/// `request_with_reply` (presumably the first for an actor on the local peer
/// and the second for a remote peer) — confirm against that method.
///
/// # Returns
///
/// Whatever `request_with_reply` returns. The public `link` wrapper passes
/// `reply = None` and unwraps the result.
///
/// # Panics
///
/// Panics with "swarm should be bootstrapped" if `actor_id.peer_id()` does
/// not provide a peer ID.
pub(super) fn link_with_reply(
    &mut self,
    actor_id: ActorId,
    actor_remote_id: Cow<'static, str>,
    sibling_id: ActorId,
    sibling_remote_id: Cow<'static, str>,
    reply: Option<oneshot::Sender<SwarmResponse>>,
) -> Option<RequestId> {
    let peer_id = actor_id.peer_id().expect("swarm should be bootstrapped");
    self.request_with_reply(
        &peer_id,
        reply,
        (actor_id, actor_remote_id, sibling_id, sibling_remote_id),
        |(actor_id, actor_remote_id, sibling_id, sibling_remote_id)| {
            link(actor_id, actor_remote_id, sibling_id, sibling_remote_id).map(|outcome| {
                SwarmResponse::Link(
                    outcome.map_err(|err| WireRemoteSendError::from_infallible(&err)),
                )
            })
        },
        move |(actor_id, actor_remote_id, sibling_id, sibling_remote_id)| SwarmRequest::Link {
            actor_id: WireActorId::from_runtime(&actor_id),
            // The closure owns both `Cow`s, so consume them: an already-owned
            // string is moved instead of being copied into a new allocation.
            actor_remote_id: actor_remote_id.into_owned(),
            sibling_id: WireActorId::from_runtime(&sibling_id),
            sibling_remote_id: sibling_remote_id.into_owned(),
        },
    )
}
/// Issues an unlink request to the peer that hosts `actor_id`, optionally
/// attaching a oneshot sender for the response.
///
/// The target peer is read from `actor_id.peer_id()`. The remaining arguments
/// are bundled into a tuple and handed to `request_with_reply` together with
/// two closures over that tuple:
///
/// * one that calls `unlink(..)` directly and wraps its outcome in
///   `SwarmResponse::Unlink`, converting an error with
///   `WireRemoteSendError::from_infallible`;
/// * one that builds the `SwarmRequest::Unlink` wire message.
///
/// NOTE(review): which closure actually runs is decided inside
/// `request_with_reply` (presumably the first for an actor on the local peer
/// and the second for a remote peer) — confirm against that method.
///
/// # Returns
///
/// Whatever `request_with_reply` returns. The public `unlink` wrapper passes
/// `reply = None` and unwraps the result.
///
/// # Panics
///
/// Panics with "swarm should be bootstrapped" if `actor_id.peer_id()` does
/// not provide a peer ID.
pub(super) fn unlink_with_reply(
    &mut self,
    actor_id: ActorId,
    actor_remote_id: Cow<'static, str>,
    sibling_id: ActorId,
    reply: Option<oneshot::Sender<SwarmResponse>>,
) -> Option<RequestId> {
    let peer_id = actor_id.peer_id().expect("swarm should be bootstrapped");
    self.request_with_reply(
        &peer_id,
        reply,
        (actor_id, actor_remote_id, sibling_id),
        |(actor_id, actor_remote_id, sibling_id)| {
            unlink(actor_id, actor_remote_id, sibling_id).map(|outcome| {
                SwarmResponse::Unlink(
                    outcome.map_err(|err| WireRemoteSendError::from_infallible(&err)),
                )
            })
        },
        move |(actor_id, actor_remote_id, sibling_id)| SwarmRequest::Unlink {
            actor_id: WireActorId::from_runtime(&actor_id),
            // The closure owns the `Cow`, so consume it: an already-owned
            // string is moved instead of being copied into a new allocation.
            actor_remote_id: actor_remote_id.into_owned(),
            sibling_id: WireActorId::from_runtime(&sibling_id),
        },
    )
}
}