actix_broker/issue.rs
1use std::any::TypeId;
2
3use actix::prelude::*;
4
5use crate::{
6 broker::{ArbiterBroker, RegisteredBroker, SystemBroker},
7 msgs::*,
8};
9
10/// The `BrokerIssue` provides functions to issue messages to subscribers.
11///
12/// This will not deliver the message to the actor that sent it.
13pub trait BrokerIssue
14where
15 Self: Actor,
16 <Self as Actor>::Context: AsyncContext<Self>,
17{
18 /// Asynchronously issue a message.
19 /// This bypasses the mailbox capacity, and will always queue the message.
20 /// If the mailbox is closed, the message is silently dropped and the subscriber
21 /// is detached from the broker.
22 fn issue_async<T: RegisteredBroker, M: BrokerMsg>(&self, msg: M) {
23 let broker = T::get_broker();
24 broker.do_send(IssueAsync(msg, TypeId::of::<Self>()));
25 }
26
27 /// Synchronously issue a message.
28 /// This also causes the broker to synchronously forward those messages on to any subscribers
29 /// before handling any other messages.
30 fn issue_sync<T: RegisteredBroker, M: BrokerMsg>(&self, msg: M, ctx: &mut Self::Context) {
31 let broker = T::get_broker();
32 broker
33 .send(IssueSync(msg, TypeId::of::<Self>()))
34 .into_actor(self)
35 .map(|_, _, _| ())
36 .wait(ctx);
37 }
38
39 /// Helper to asynchronously issue to an system broker
40 /// This is the equivalent of `self.issue_async::<SystemBroker, M>(ctx);`
41 fn issue_system_async<M: BrokerMsg>(&self, msg: M) {
42 self.issue_async::<SystemBroker, M>(msg);
43 }
44
45 /// Helper to synchronously issue to an system broker
46 /// This is the equivalent of `self.issue_sync::<SystemBroker, M>(ctx);`
47 fn issue_system_sync<M: BrokerMsg>(&self, msg: M, ctx: &mut Self::Context) {
48 self.issue_sync::<SystemBroker, M>(msg, ctx);
49 }
50
51 /// Helper to asynchronously issue to an arbiter-specific broker
52 /// This is the equivalent of `self.issue_async::<ArbiterBroker, M>(ctx);`
53 fn issue_arbiter_async<M: BrokerMsg>(&self, msg: M) {
54 self.issue_async::<ArbiterBroker, M>(msg);
55 }
56
57 /// Helper to synchronously issue to an arbiter-specific broker
58 /// This is the equivalent of `self.issue_sync::<ArbiterBroker, M>(ctx);`
59 fn issue_arbiter_sync<M: BrokerMsg>(&self, msg: M, ctx: &mut Self::Context) {
60 self.issue_sync::<ArbiterBroker, M>(msg, ctx);
61 }
62}
63
64impl<A> BrokerIssue for A
65where
66 A: Actor,
67 <A as Actor>::Context: AsyncContext<A>,
68{
69}