acton_core/traits/
subscribable.rs1use std::any::TypeId;
18use std::future::Future;
19
20use async_trait::async_trait;
21use tracing::*;
22
23use crate::message::{SubscribeBroker, UnsubscribeBroker};
24use crate::traits::{ActonMessage, Actor};
25use crate::traits::subscriber::Subscriber;
26
27#[async_trait]
29pub trait Subscribable {
30 fn subscribe<T: ActonMessage + Send + Sync + 'static>(
40 &self,
41 ) -> impl Future<Output=()> + Send + Sync + '_
42 where
43 Self: Actor + Subscriber;
44
45 fn unsubscribe<T: ActonMessage>(&self)
51 where
52 Self: Actor + Subscriber + Send + Sync + 'static;
53}
54
55#[async_trait]
57impl<T> Subscribable for T
58where
59 T: ActonMessage + Send + Sync + 'static,
60{
61 fn subscribe<M: ActonMessage + Send + Sync + 'static>(
62 &self,
63 ) -> impl Future<Output=()> + Send + Sync + '_
64 where
65 Self: Actor + Subscriber + 'static,
66 {
67 let subscriber_id = self.id();
68 let message_type_id = TypeId::of::<M>();
69 let message_type_name = std::any::type_name::<M>().to_string();
70 let subscription = SubscribeBroker {
71 subscriber_id,
72 message_type_id,
73 subscriber_context: self.clone_ref(),
74 };
75 let broker = self.get_broker();
76 let ern = self.id().clone();
77
78 async move {
79 trace!( type_id=?message_type_id, subscriber_ern = ern.to_string(), "Subscribing to type_name {}", message_type_name);
80 if let Some(broadcast_broker) = broker {
81 let broker_key = broadcast_broker.name();
82 trace!(
83 "Subscribing to type_name {} with {}",
84 message_type_name,
85 broker_key
86 );
87 broadcast_broker.send(subscription).await;
88 } else {
89 error!( subscriber_ern = ern.to_string(), "No broker found for type_name {}", message_type_name);
90 }
91 }
92 }
93 fn unsubscribe<M: ActonMessage>(&self)
94 where
95 Self: Actor + Subscriber,
96 {
97 let subscription = UnsubscribeBroker {
99 };
103 let broker = self.get_broker();
104 if let Some(broker) = broker {
105 let broker = broker.clone();
106 tokio::spawn(async move {
107 broker.send(subscription).await;
108 });
109 }
110 trace!(
111 type_id = ?TypeId::of::<M>(),
112 repository_actor = self.id().to_string(),
113 "Unsubscribed to {}",
114 std::any::type_name::<M>()
115 );
116 }
117}