acton_core/traits/
subscribable.rs

1/*
2 * Copyright (c) 2024. Govcraft
3 *
4 * Licensed under either of
5 *   * Apache License, Version 2.0 (the "License");
6 *     you may not use this file except in compliance with the License.
7 *     You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
8 *   * MIT license: http://opensource.org/licenses/MIT
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the applicable License for the specific language governing permissions and
14 * limitations under that License.
15 */
16
17use std::any::TypeId;
18use std::future::Future;
19
20use async_trait::async_trait;
21use tracing::*;
22
23use crate::message::{SubscribeBroker, UnsubscribeBroker};
24use crate::traits::{ActonMessage, AgentHandleInterface};
25use crate::traits::subscriber::Subscriber;
26
27/// Enables an entity (typically an agent handle) to manage its subscriptions to message types via the system broker.
28///
29/// This trait provides methods to asynchronously subscribe and unsubscribe from specific
30/// message types ([`ActonMessage`]). Implementors, usually [`AgentHandle`](crate::common::AgentHandle),
31/// interact with the central [`AgentBroker`](crate::common::AgentBroker) to register or
32/// deregister interest in receiving broadcast messages.
33#[async_trait]
34pub trait Subscribable: Send + Sync + 'static { // Added Send + Sync + 'static bounds
35    /// Asynchronously subscribes the agent associated with this handle to messages of type `M`.
36    ///
37    /// After subscribing, the agent will receive copies of messages of type `M` that are
38    /// broadcast via the [`Broker`](super::Broker) trait.
39    ///
40    /// # Type Parameters
41    ///
42    /// * `M`: The concrete message type to subscribe to. Must implement [`ActonMessage`]
43    ///   and be `Send + Sync + 'static`.
44    ///
45    /// # Returns
46    ///
47    /// A `Future` that completes once the subscription request has been sent to the broker.
48    /// Completion does not guarantee the subscription is immediately active.
49    ///
50    /// # Requirements
51    ///
52    /// The implementing type `Self` must also implement [`AgentHandleInterface`] and [`Subscriber`].
53    fn subscribe<M: ActonMessage + Send + Sync + 'static>(
54        &self,
55    ) -> impl Future<Output = ()> + Send + Sync + '_
56    where
57        // These bounds are requirements for *calling* the method, enforced by the blanket impl.
58        Self: AgentHandleInterface + Subscriber;
59
60    /// Sends a request to unsubscribe the agent associated with this handle from messages of type `M`.
61    ///
62    /// After unsubscribing, the agent will no longer receive broadcast messages of type `M`.
63    ///
64    /// Note: The default blanket implementation currently spawns a Tokio task to send the
65    /// unsubscribe request asynchronously. The `UnsubscribeBroker` message itself might be incomplete
66    /// in the current implementation (commented-out fields).
67    ///
68    /// # Type Parameters
69    ///
70    /// * `M`: The concrete message type to unsubscribe from. Must implement [`ActonMessage`].
71    ///
72    /// # Requirements
73    ///
74    /// The implementing type `Self` must also implement [`AgentHandleInterface`] and [`Subscriber`].
75    fn unsubscribe<M: ActonMessage>(&self)
76    where
77        // These bounds are requirements for *calling* the method, enforced by the blanket impl.
78        Self: AgentHandleInterface + Subscriber;
79}
80
81/// Blanket implementation of `Subscribable` for types implementing necessary traits.
82///
83/// This implementation provides the `subscribe` and `unsubscribe` methods for any type `T`
84/// that implements [`AgentHandleInterface`] and [`Subscriber`]. It works by sending the
85/// appropriate internal messages ([`SubscribeBroker`] or [`UnsubscribeBroker`]) to the
86/// broker obtained via the [`Subscriber::get_broker`] method.
87#[async_trait]
88impl<T> Subscribable for T
89where
90    // Corrected bounds based on usage within the methods.
91    T: AgentHandleInterface + Subscriber + Send + Sync + 'static,
92{
93    /// Sends a [`SubscribeBroker`] message to the broker.
94    #[instrument(skip(self), fields(message_type = std::any::type_name::<M>(), subscriber = %self.id()))]
95    fn subscribe<M: ActonMessage + Send + Sync + 'static>(
96        &self,
97    ) -> impl Future<Output = ()> + Send + Sync + '_
98    // No need for where clause here as it's enforced by the impl block's bounds
99    {
100        let subscriber_id = self.id();
101        let message_type_id = TypeId::of::<M>();
102        let message_type_name = std::any::type_name::<M>().to_string();
103        // Create the subscription message with the agent's handle as context.
104        let subscription = SubscribeBroker {
105            subscriber_id: subscriber_id.clone(), // Clone Ern for the message
106            message_type_id,
107            subscriber_context: self.clone_ref(), // Clone the handle
108        };
109        let broker_option = self.get_broker(); // Get Option<BrokerRef>
110
111        async move {
112            trace!( type_id=?message_type_id, subscriber = %subscriber_id, "Sending subscription request");
113            if let Some(broker_handle) = broker_option {
114                trace!(broker = %broker_handle.id(), "Sending SubscribeBroker message");
115                // Send the subscription message to the broker.
116                broker_handle.send(subscription).await;
117            } else {
118                // Log an error if no broker is available.
119                error!(subscriber = %subscriber_id, message_type = %message_type_name, "Cannot subscribe: No broker found.");
120            }
121        }
122    }
123
124    /// Spawns a task to send an [`UnsubscribeBroker`] message to the broker.
125    /// Note: The current `UnsubscribeBroker` message structure might be incomplete.
126    #[instrument(skip(self), fields(message_type = std::any::type_name::<M>(), subscriber = %self.id()))]
127    fn unsubscribe<M: ActonMessage>(&self)
128    // No need for where clause here as it's enforced by the impl block's bounds
129    {
130        let type_id = TypeId::of::<M>();
131        let type_name = std::any::type_name::<M>();
132        let subscriber_id = self.id(); // Get subscriber ID
133        let broker_option = self.get_broker(); // Get Option<BrokerRef>
134
135        trace!(type_id = ?type_id, subscriber = %subscriber_id, "Initiating unsubscribe request for type {}", type_name);
136
137        if let Some(broker_handle) = broker_option {
138            // Create the unsubscribe message (currently seems incomplete based on commented fields).
139            let unsubscription = UnsubscribeBroker {
140                // ern: subscriber_id, // Assuming these fields will be added later
141                // message_type_id: type_id,
142                // subscriber_ref: self.clone_ref(),
143            };
144            // Spawn a task to send the message asynchronously.
145            tokio::spawn(async move {
146                trace!(broker = %broker_handle.id(), type_id = ?type_id, "Sending UnsubscribeBroker message");
147                broker_handle.send(unsubscription).await;
148            });
149        } else {
150            error!(subscriber = %subscriber_id, message_type = %type_name, "Cannot unsubscribe: No broker found.");
151        }
152    }
153}