actix_broker/
subscribe.rs

1//! messages.
2use std::any::TypeId;
3
4use actix::{dev::ToEnvelope, prelude::*};
5
6use crate::{
7    broker::{ArbiterBroker, RegisteredBroker, SystemBroker},
8    msgs::*,
9};
10
11/// The `BrokerSubscribe` trait has functions to register an actor's interest in different
12/// messages.
13pub trait BrokerSubscribe
14where
15    Self: Actor,
16    <Self as Actor>::Context: AsyncContext<Self>,
17{
18    /// Asynchronously subscribe to a message.
19    fn subscribe_async<T: RegisteredBroker, M: BrokerMsg>(&self, ctx: &mut Self::Context)
20    where
21        Self: Handler<M>,
22        <Self as Actor>::Context: ToEnvelope<Self, M>,
23    {
24        let broker = T::get_broker();
25        let recipient = ctx.address().recipient::<M>();
26        broker.do_send(SubscribeAsync(recipient, TypeId::of::<Self>()));
27    }
28
29    /// Synchronously subscribe to a message.
30    /// This actor will do nothing else until its interest is registered.
31    /// If messages of that type have been sent to the broker previously, a copy of the latest
32    /// message is sent to the calling actor after it has subscribed.
33    fn subscribe_sync<T: RegisteredBroker, M: BrokerMsg>(&self, ctx: &mut Self::Context)
34    where
35        Self: Handler<M>,
36        <Self as Actor>::Context: ToEnvelope<Self, M>,
37    {
38        let broker = T::get_broker();
39        let recipient = ctx.address().recipient::<M>();
40
41        broker
42            .send(SubscribeSync(recipient, TypeId::of::<Self>()))
43            .into_actor(self)
44            .map(move |m, _, ctx| {
45                if let Ok(Some(msg)) = m {
46                    ctx.notify(msg);
47                }
48            })
49            .wait(ctx);
50    }
51
52    /// Helper to asynchronously subscribe to a system broker
53    /// This is the equivalent of `self.subscribe_async::<SystemBroker, M>(ctx);`
54    fn subscribe_system_async<M: BrokerMsg>(&self, ctx: &mut Self::Context)
55    where
56        Self: Handler<M>,
57        <Self as Actor>::Context: ToEnvelope<Self, M>,
58    {
59        self.subscribe_async::<SystemBroker, M>(ctx);
60    }
61
62    /// Helper to synchronously subscribe to a system broker
63    /// This is the equivalent of `self.subscribe_sync::<SystemBroker, M>(ctx);
64    fn subscribe_system_sync<M: BrokerMsg>(&self, ctx: &mut Self::Context)
65    where
66        Self: Handler<M>,
67        <Self as Actor>::Context: ToEnvelope<Self, M>,
68    {
69        self.subscribe_sync::<SystemBroker, M>(ctx);
70    }
71
72    /// Helper to asynchronously subscribe to an arbiter-specific broker
73    /// This is the equivalent of `self.subscribe_async::<ArbiterBroker, M>(ctx);`
74    fn subscribe_arbiter_async<M: BrokerMsg>(&self, ctx: &mut Self::Context)
75    where
76        Self: Handler<M>,
77        <Self as Actor>::Context: ToEnvelope<Self, M>,
78    {
79        self.subscribe_async::<ArbiterBroker, M>(ctx);
80    }
81
82    /// Helper to synchronously subscribe to an arbiter-specific broker
83    /// This is the equivalent of `self.subscribe_sync::<ArbiterBroker, M>(ctx);
84    fn subscribe_arbiter_sync<M: BrokerMsg>(&self, ctx: &mut Self::Context)
85    where
86        Self: Handler<M>,
87        <Self as Actor>::Context: ToEnvelope<Self, M>,
88    {
89        self.subscribe_sync::<ArbiterBroker, M>(ctx);
90    }
91}
92
93impl<A> BrokerSubscribe for A
94where
95    A: Actor,
96    <A as Actor>::Context: AsyncContext<A>,
97{
98}