acton_core/traits/subscribable.rs
1/*
2 * Copyright (c) 2024. Govcraft
3 *
4 * Licensed under either of
5 * * Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
8 * * MIT license: http://opensource.org/licenses/MIT
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the applicable License for the specific language governing permissions and
14 * limitations under that License.
15 */
16
17use std::any::TypeId;
18use std::future::Future;
19
20use async_trait::async_trait;
21use tracing::*;
22
23use crate::message::{SubscribeBroker, UnsubscribeBroker};
24use crate::traits::{ActonMessage, AgentHandleInterface};
25use crate::traits::subscriber::Subscriber;
26
27/// Enables an entity (typically an agent handle) to manage its subscriptions to message types via the system broker.
28///
29/// This trait provides methods to asynchronously subscribe and unsubscribe from specific
30/// message types ([`ActonMessage`]). Implementors, usually [`AgentHandle`](crate::common::AgentHandle),
31/// interact with the central [`AgentBroker`](crate::common::AgentBroker) to register or
32/// deregister interest in receiving broadcast messages.
33#[async_trait]
34pub trait Subscribable: Send + Sync + 'static { // Added Send + Sync + 'static bounds
35 /// Asynchronously subscribes the agent associated with this handle to messages of type `M`.
36 ///
37 /// After subscribing, the agent will receive copies of messages of type `M` that are
38 /// broadcast via the [`Broker`](super::Broker) trait.
39 ///
40 /// # Type Parameters
41 ///
42 /// * `M`: The concrete message type to subscribe to. Must implement [`ActonMessage`]
43 /// and be `Send + Sync + 'static`.
44 ///
45 /// # Returns
46 ///
47 /// A `Future` that completes once the subscription request has been sent to the broker.
48 /// Completion does not guarantee the subscription is immediately active.
49 ///
50 /// # Requirements
51 ///
52 /// The implementing type `Self` must also implement [`AgentHandleInterface`] and [`Subscriber`].
53 fn subscribe<M: ActonMessage + Send + Sync + 'static>(
54 &self,
55 ) -> impl Future<Output = ()> + Send + Sync + '_
56 where
57 // These bounds are requirements for *calling* the method, enforced by the blanket impl.
58 Self: AgentHandleInterface + Subscriber;
59
60 /// Sends a request to unsubscribe the agent associated with this handle from messages of type `M`.
61 ///
62 /// After unsubscribing, the agent will no longer receive broadcast messages of type `M`.
63 ///
64 /// Note: The default blanket implementation currently spawns a Tokio task to send the
65 /// unsubscribe request asynchronously. The `UnsubscribeBroker` message itself might be incomplete
66 /// in the current implementation (commented-out fields).
67 ///
68 /// # Type Parameters
69 ///
70 /// * `M`: The concrete message type to unsubscribe from. Must implement [`ActonMessage`].
71 ///
72 /// # Requirements
73 ///
74 /// The implementing type `Self` must also implement [`AgentHandleInterface`] and [`Subscriber`].
75 fn unsubscribe<M: ActonMessage>(&self)
76 where
77 // These bounds are requirements for *calling* the method, enforced by the blanket impl.
78 Self: AgentHandleInterface + Subscriber;
79}
80
81/// Blanket implementation of `Subscribable` for types implementing necessary traits.
82///
83/// This implementation provides the `subscribe` and `unsubscribe` methods for any type `T`
84/// that implements [`AgentHandleInterface`] and [`Subscriber`]. It works by sending the
85/// appropriate internal messages ([`SubscribeBroker`] or [`UnsubscribeBroker`]) to the
86/// broker obtained via the [`Subscriber::get_broker`] method.
87#[async_trait]
88impl<T> Subscribable for T
89where
90 // Corrected bounds based on usage within the methods.
91 T: AgentHandleInterface + Subscriber + Send + Sync + 'static,
92{
93 /// Sends a [`SubscribeBroker`] message to the broker.
94 #[instrument(skip(self), fields(message_type = std::any::type_name::<M>(), subscriber = %self.id()))]
95 fn subscribe<M: ActonMessage + Send + Sync + 'static>(
96 &self,
97 ) -> impl Future<Output = ()> + Send + Sync + '_
98 // No need for where clause here as it's enforced by the impl block's bounds
99 {
100 let subscriber_id = self.id();
101 let message_type_id = TypeId::of::<M>();
102 let message_type_name = std::any::type_name::<M>().to_string();
103 // Create the subscription message with the agent's handle as context.
104 let subscription = SubscribeBroker {
105 subscriber_id: subscriber_id.clone(), // Clone Ern for the message
106 message_type_id,
107 subscriber_context: self.clone_ref(), // Clone the handle
108 };
109 let broker_option = self.get_broker(); // Get Option<BrokerRef>
110
111 async move {
112 trace!( type_id=?message_type_id, subscriber = %subscriber_id, "Sending subscription request");
113 if let Some(broker_handle) = broker_option {
114 trace!(broker = %broker_handle.id(), "Sending SubscribeBroker message");
115 // Send the subscription message to the broker.
116 broker_handle.send(subscription).await;
117 } else {
118 // Log an error if no broker is available.
119 error!(subscriber = %subscriber_id, message_type = %message_type_name, "Cannot subscribe: No broker found.");
120 }
121 }
122 }
123
124 /// Spawns a task to send an [`UnsubscribeBroker`] message to the broker.
125 /// Note: The current `UnsubscribeBroker` message structure might be incomplete.
126 #[instrument(skip(self), fields(message_type = std::any::type_name::<M>(), subscriber = %self.id()))]
127 fn unsubscribe<M: ActonMessage>(&self)
128 // No need for where clause here as it's enforced by the impl block's bounds
129 {
130 let type_id = TypeId::of::<M>();
131 let type_name = std::any::type_name::<M>();
132 let subscriber_id = self.id(); // Get subscriber ID
133 let broker_option = self.get_broker(); // Get Option<BrokerRef>
134
135 trace!(type_id = ?type_id, subscriber = %subscriber_id, "Initiating unsubscribe request for type {}", type_name);
136
137 if let Some(broker_handle) = broker_option {
138 // Create the unsubscribe message (currently seems incomplete based on commented fields).
139 let unsubscription = UnsubscribeBroker {
140 // ern: subscriber_id, // Assuming these fields will be added later
141 // message_type_id: type_id,
142 // subscriber_ref: self.clone_ref(),
143 };
144 // Spawn a task to send the message asynchronously.
145 tokio::spawn(async move {
146 trace!(broker = %broker_handle.id(), type_id = ?type_id, "Sending UnsubscribeBroker message");
147 broker_handle.send(unsubscription).await;
148 });
149 } else {
150 error!(subscriber = %subscriber_id, message_type = %type_name, "Cannot unsubscribe: No broker found.");
151 }
152 }
153}