actix_broker/
subscribe.rs

//! Functions to register an actor's interest in different messages.
2use actix::dev::ToEnvelope;
3use actix::prelude::*;
4
5use std::any::TypeId;
6
7use crate::broker::{ArbiterBroker, RegisteredBroker, SystemBroker};
8use crate::msgs::*;
9
10/// The `BrokerSubscribe` trait has functions to register an actor's interest in different
11/// messages.
12pub trait BrokerSubscribe
13where
14    Self: Actor,
15    <Self as Actor>::Context: AsyncContext<Self>,
16{
17    /// Asynchronously subscribe to a message.
18    fn subscribe_async<T: RegisteredBroker, M: BrokerMsg>(&self, ctx: &mut Self::Context)
19    where
20        Self: Handler<M>,
21        <Self as Actor>::Context: ToEnvelope<Self, M>,
22    {
23        let broker = T::get_broker();
24        let recipient = ctx.address().recipient::<M>();
25        broker.do_send(SubscribeAsync(recipient, TypeId::of::<Self>()));
26    }
27
28    /// Synchronously subscribe to a message.
29    /// This actor will do nothing else until its interest is registered.
30    /// If messages of that type have been sent to the broker previously, a copy of the latest
31    /// message is sent to the calling actor after it has subscribed.
32    fn subscribe_sync<T: RegisteredBroker, M: BrokerMsg>(&self, ctx: &mut Self::Context)
33    where
34        Self: Handler<M>,
35        <Self as Actor>::Context: ToEnvelope<Self, M>,
36    {
37        let broker = T::get_broker();
38        let recipient = ctx.address().recipient::<M>();
39
40        broker
41            .send(SubscribeSync(recipient, TypeId::of::<Self>()))
42            .into_actor(self)
43            .map(move |m, _, ctx| {
44                if let Ok(Some(msg)) = m {
45                    ctx.notify(msg);
46                }
47            })
48            .wait(ctx);
49    }
50
51    /// Helper to asynchronously subscribe to a system broker
52    /// This is the equivalent of `self.subscribe_async::<SystemBroker, M>(ctx);`
53    fn subscribe_system_async<M: BrokerMsg>(&self, ctx: &mut Self::Context)
54    where
55        Self: Handler<M>,
56        <Self as Actor>::Context: ToEnvelope<Self, M>,
57    {
58        self.subscribe_async::<SystemBroker, M>(ctx);
59    }
60
61    /// Helper to synchronously subscribe to a system broker
62    /// This is the equivalent of `self.subscribe_sync::<SystemBroker, M>(ctx);
63    fn subscribe_system_sync<M: BrokerMsg>(&self, ctx: &mut Self::Context)
64    where
65        Self: Handler<M>,
66        <Self as Actor>::Context: ToEnvelope<Self, M>,
67    {
68        self.subscribe_sync::<SystemBroker, M>(ctx);
69    }
70
71    /// Helper to asynchronously subscribe to an arbiter-specific broker
72    /// This is the equivalent of `self.subscribe_async::<ArbiterBroker, M>(ctx);`
73    fn subscribe_arbiter_async<M: BrokerMsg>(&self, ctx: &mut Self::Context)
74    where
75        Self: Handler<M>,
76        <Self as Actor>::Context: ToEnvelope<Self, M>,
77    {
78        self.subscribe_async::<ArbiterBroker, M>(ctx);
79    }
80
81    /// Helper to synchronously subscribe to an arbiter-specific broker
82    /// This is the equivalent of `self.subscribe_sync::<ArbiterBroker, M>(ctx);
83    fn subscribe_arbiter_sync<M: BrokerMsg>(&self, ctx: &mut Self::Context)
84    where
85        Self: Handler<M>,
86        <Self as Actor>::Context: ToEnvelope<Self, M>,
87    {
88        self.subscribe_sync::<ArbiterBroker, M>(ctx);
89    }
90}
91
92impl<A> BrokerSubscribe for A
93where
94    A: Actor,
95    <A as Actor>::Context: AsyncContext<A>,
96{
97}