acton_core/traits/
subscribable.rs

1/*
2 * Copyright (c) 2024. Govcraft
3 *
4 * Licensed under either of
5 *   * Apache License, Version 2.0 (the "License");
6 *     you may not use this file except in compliance with the License.
7 *     You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
8 *   * MIT license: http://opensource.org/licenses/MIT
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the applicable License for the specific language governing permissions and
14 * limitations under that License.
15 */
16
17use std::any::TypeId;
18use std::future::Future;
19
20use async_trait::async_trait;
21use tracing::*;
22
23use crate::message::{SubscribeBroker, UnsubscribeBroker};
24use crate::traits::{ActonMessage, Actor};
25use crate::traits::subscriber::Subscriber;
26
27/// Trait for types that can subscribe to and unsubscribe from messages.
28#[async_trait]
29pub trait Subscribable {
30    /// Subscribes the implementing type to messages of type `T`.
31    ///
32    /// # Type Parameters
33    ///
34    /// * `T`: The type of message to subscribe to. Must implement `ActonMessage + Send + Sync + 'static`.
35    ///
36    /// # Returns
37    ///
38    /// A `Future` that resolves to `()` when the subscription is complete.
39    fn subscribe<T: ActonMessage + Send + Sync + 'static>(
40        &self,
41    ) -> impl Future<Output=()> + Send + Sync + '_
42    where
43        Self: Actor + Subscriber;
44
45    /// Unsubscribes the implementing type from messages of type `T`.
46    ///
47    /// # Type Parameters
48    ///
49    /// * `T`: The type of message to unsubscribe from. Must implement `ActonMessage`.
50    fn unsubscribe<T: ActonMessage>(&self)
51    where
52        Self: Actor + Subscriber + Send + Sync + 'static;
53}
54
55/// Implementation of `Subscribable` for any type that implements `ActonMessage + Send + Sync + 'static`.
56#[async_trait]
57impl<T> Subscribable for T
58where
59    T: ActonMessage + Send + Sync + 'static,
60{
61    fn subscribe<M: ActonMessage + Send + Sync + 'static>(
62        &self,
63    ) -> impl Future<Output=()> + Send + Sync + '_
64    where
65        Self: Actor + Subscriber + 'static,
66    {
67        let subscriber_id = self.id();
68        let message_type_id = TypeId::of::<M>();
69        let message_type_name = std::any::type_name::<M>().to_string();
70        let subscription = SubscribeBroker {
71            subscriber_id,
72            message_type_id,
73            subscriber_context: self.clone_ref(),
74        };
75        let broker = self.get_broker();
76        let ern = self.id().clone();
77
78        async move {
79            trace!( type_id=?message_type_id, subscriber_ern = ern.to_string(), "Subscribing to type_name {}", message_type_name);
80            if let Some(broadcast_broker) = broker {
81                let broker_key = broadcast_broker.name();
82                trace!(
83                    "Subscribing to type_name {} with {}",
84                    message_type_name,
85                    broker_key
86                );
87                broadcast_broker.send(subscription).await;
88            } else {
89                error!( subscriber_ern = ern.to_string(), "No broker found for type_name {}", message_type_name);
90            }
91        }
92    }
93    fn unsubscribe<M: ActonMessage>(&self)
94    where
95        Self: Actor + Subscriber,
96    {
97        // let subscriber_id = self.ern();
98        let subscription = UnsubscribeBroker {
99            // ern: subscriber_id,
100            // message_type_id: TypeId::of::<M>(),
101            // subscriber_ref: self.clone_ref(),
102        };
103        let broker = self.get_broker();
104        if let Some(broker) = broker {
105            let broker = broker.clone();
106            tokio::spawn(async move {
107                broker.send(subscription).await;
108            });
109        }
110        trace!(
111            type_id = ?TypeId::of::<M>(),
112            repository_actor = self.id().to_string(),
113            "Unsubscribed to {}",
114            std::any::type_name::<M>()
115        );
116    }
117}