actix_broker/
issue.rs

1use std::any::TypeId;
2
3use actix::prelude::*;
4
5use crate::{
6    broker::{ArbiterBroker, RegisteredBroker, SystemBroker},
7    msgs::*,
8};
9
10/// The `BrokerIssue` provides functions to issue messages to subscribers.
11///
12/// This will not deliver the message to the actor that sent it.
13pub trait BrokerIssue
14where
15    Self: Actor,
16    <Self as Actor>::Context: AsyncContext<Self>,
17{
18    /// Asynchronously issue a message.
19    /// This bypasses the mailbox capacity, and  will always queue the message.
20    /// If the mailbox is closed, the message is silently dropped and the subscriber
21    /// is detached from the broker.
22    fn issue_async<T: RegisteredBroker, M: BrokerMsg>(&self, msg: M) {
23        let broker = T::get_broker();
24        broker.do_send(IssueAsync(msg, TypeId::of::<Self>()));
25    }
26
27    /// Synchronously issue a message.
28    /// This also causes the broker to synchronously forward those messages on to any subscribers
29    /// before handling any other messages.
30    fn issue_sync<T: RegisteredBroker, M: BrokerMsg>(&self, msg: M, ctx: &mut Self::Context) {
31        let broker = T::get_broker();
32        broker
33            .send(IssueSync(msg, TypeId::of::<Self>()))
34            .into_actor(self)
35            .map(|_, _, _| ())
36            .wait(ctx);
37    }
38
39    /// Helper to asynchronously issue to an system broker
40    /// This is the equivalent of `self.issue_async::<SystemBroker, M>(ctx);`
41    fn issue_system_async<M: BrokerMsg>(&self, msg: M) {
42        self.issue_async::<SystemBroker, M>(msg);
43    }
44
45    /// Helper to synchronously issue to an system broker
46    /// This is the equivalent of `self.issue_sync::<SystemBroker, M>(ctx);`
47    fn issue_system_sync<M: BrokerMsg>(&self, msg: M, ctx: &mut Self::Context) {
48        self.issue_sync::<SystemBroker, M>(msg, ctx);
49    }
50
51    /// Helper to asynchronously issue to an arbiter-specific broker
52    /// This is the equivalent of `self.issue_async::<ArbiterBroker, M>(ctx);`
53    fn issue_arbiter_async<M: BrokerMsg>(&self, msg: M) {
54        self.issue_async::<ArbiterBroker, M>(msg);
55    }
56
57    /// Helper to synchronously issue to an arbiter-specific broker
58    /// This is the equivalent of `self.issue_sync::<ArbiterBroker, M>(ctx);`
59    fn issue_arbiter_sync<M: BrokerMsg>(&self, msg: M, ctx: &mut Self::Context) {
60        self.issue_sync::<ArbiterBroker, M>(msg, ctx);
61    }
62}
63
64impl<A> BrokerIssue for A
65where
66    A: Actor,
67    <A as Actor>::Context: AsyncContext<A>,
68{
69}