autoagents_core/actor/
subscriber.rs1use crate::actor::{AnyActor, CloneableMessage, SharedMessage};
2use ractor::ActorRef;
3use std::any::Any;
4use std::marker::PhantomData;
5use std::sync::Arc;
6
7pub struct TypedSubscriber<M: CloneableMessage> {
8 actors: Vec<Box<dyn AnyActor>>,
9 _marker: PhantomData<M>,
10}
11
12impl<M: CloneableMessage + 'static> Default for TypedSubscriber<M> {
13 fn default() -> Self {
14 Self {
15 actors: Vec::new(),
16 _marker: PhantomData,
17 }
18 }
19}
20
21impl<M: CloneableMessage + 'static> TypedSubscriber<M> {
22 pub fn new() -> Self {
23 Self::default()
24 }
25
26 pub fn add(&mut self, actor: ActorRef<M>) {
27 self.actors.push(Box::new(actor) as Box<dyn AnyActor>);
28 }
29
30 pub async fn publish(&self, message: M) {
31 let arc_msg: Arc<dyn Any + Send + Sync> = Arc::new(message);
32 for actor in &self.actors {
33 let _ = actor.send_any(arc_msg.clone()).await;
34 }
35 }
36}
37
38pub struct SharedSubscriber<M: Send + Sync + 'static> {
40 actors: Vec<Box<dyn AnyActor>>,
41 _marker: PhantomData<M>,
42}
43
44impl<M: Send + Sync + 'static> Default for SharedSubscriber<M> {
45 fn default() -> Self {
46 Self {
47 actors: Vec::new(),
48 _marker: PhantomData,
49 }
50 }
51}
52
53impl<M: Send + Sync + 'static> SharedSubscriber<M> {
54 pub fn new() -> Self {
55 Self::default()
56 }
57
58 pub fn add(&mut self, actor: ActorRef<SharedMessage<M>>) {
59 self.actors.push(Box::new(actor) as Box<dyn AnyActor>);
60 }
61
62 pub async fn publish(&self, message: SharedMessage<M>) {
63 let arc_msg: Arc<dyn Any + Send + Sync> = Arc::new(message);
64 for actor in &self.actors {
65 let _ = actor.send_any(arc_msg.clone()).await;
66 }
67 }
68}
69
#[cfg(test)]
mod tests {
    use super::*;
    use crate::actor::{ActorMessage, CloneableMessage, SharedMessage};
    use async_trait::async_trait;
    use ractor::{Actor, ActorProcessingErr, ActorRef};
    use tokio::sync::Mutex;
    use tokio::time::{Duration, sleep};

    /// Pause between two inspections of the shared buffer.
    const POLL_INTERVAL: Duration = Duration::from_millis(5);
    /// Maximum number of inspections before giving up (about 2 s in total).
    const MAX_POLLS: usize = 400;

    /// Waits until at least `expected` messages were collected, then returns
    /// a snapshot of the buffer.
    ///
    /// Message handling happens asynchronously in the actor task, so a single
    /// fixed sleep is timing-dependent and can fail spuriously on a loaded
    /// machine. Polling returns as soon as the data is there and only gives up
    /// after `MAX_POLLS` attempts, in which case the (too short) snapshot is
    /// returned and the caller's assertion reports the mismatch.
    async fn wait_for_messages(received: &Mutex<Vec<String>>, expected: usize) -> Vec<String> {
        for _ in 0..MAX_POLLS {
            // The guard is a temporary, so the lock is released before sleeping.
            let snapshot = received.lock().await.clone();
            if snapshot.len() >= expected {
                return snapshot;
            }
            sleep(POLL_INTERVAL).await;
        }
        received.lock().await.clone()
    }

    /// Cloneable message used to exercise `TypedSubscriber`.
    #[derive(Clone, Debug)]
    struct TestMessage {
        content: String,
    }

    impl ActorMessage for TestMessage {}
    impl CloneableMessage for TestMessage {}

    /// Actor that appends the content of every received `TestMessage` to a
    /// shared buffer.
    struct CollectActor {
        received: Arc<Mutex<Vec<String>>>,
    }

    #[async_trait]
    impl Actor for CollectActor {
        type Msg = TestMessage;
        type State = ();
        type Arguments = Arc<Mutex<Vec<String>>>;

        async fn pre_start(
            &self,
            _myself: ActorRef<Self::Msg>,
            _args: Self::Arguments,
        ) -> Result<Self::State, ActorProcessingErr> {
            Ok(())
        }

        async fn handle(
            &self,
            _myself: ActorRef<Self::Msg>,
            message: Self::Msg,
            _state: &mut Self::State,
        ) -> Result<(), ActorProcessingErr> {
            let mut received = self.received.lock().await;
            received.push(message.content);
            Ok(())
        }
    }

    #[tokio::test]
    async fn test_typed_subscriber_publish() {
        let received = Arc::new(Mutex::new(Vec::new()));
        let (actor_ref, _handle) = Actor::spawn(
            None,
            CollectActor {
                received: received.clone(),
            },
            received.clone(),
        )
        .await
        .unwrap();

        let mut subscriber = TypedSubscriber::<TestMessage>::new();
        subscriber.add(actor_ref);
        subscriber
            .publish(TestMessage {
                content: "hello".to_string(),
            })
            .await;

        let items = wait_for_messages(&received, 1).await;
        assert_eq!(items, vec!["hello"]);
    }

    /// Non-cloneable payload used to exercise `SharedSubscriber`.
    #[derive(Debug)]
    struct SharedPayload {
        value: String,
    }

    impl ActorMessage for SharedPayload {}

    /// Actor that appends the payload value of every received
    /// `SharedMessage<SharedPayload>` to a shared buffer.
    struct SharedActor {
        received: Arc<Mutex<Vec<String>>>,
    }

    #[async_trait]
    impl Actor for SharedActor {
        type Msg = SharedMessage<SharedPayload>;
        type State = ();
        type Arguments = Arc<Mutex<Vec<String>>>;

        async fn pre_start(
            &self,
            _myself: ActorRef<Self::Msg>,
            _args: Self::Arguments,
        ) -> Result<Self::State, ActorProcessingErr> {
            Ok(())
        }

        async fn handle(
            &self,
            _myself: ActorRef<Self::Msg>,
            message: Self::Msg,
            _state: &mut Self::State,
        ) -> Result<(), ActorProcessingErr> {
            let mut received = self.received.lock().await;
            received.push(message.inner().value.clone());
            Ok(())
        }
    }

    #[tokio::test]
    async fn test_shared_subscriber_publish() {
        let received = Arc::new(Mutex::new(Vec::new()));
        let (actor_ref, _handle) = Actor::spawn(
            None,
            SharedActor {
                received: received.clone(),
            },
            received.clone(),
        )
        .await
        .unwrap();

        let mut subscriber = SharedSubscriber::<SharedPayload>::new();
        subscriber.add(actor_ref);
        subscriber
            .publish(SharedMessage::new(SharedPayload {
                value: "shared".to_string(),
            }))
            .await;

        let items = wait_for_messages(&received, 1).await;
        assert_eq!(items, vec!["shared"]);
    }
}