Skip to main content

autoagents_core/actor/
subscriber.rs

1use crate::actor::{AnyActor, CloneableMessage, SharedMessage};
2use ractor::ActorRef;
3use std::any::Any;
4use std::marker::PhantomData;
5use std::sync::Arc;
6
7pub struct TypedSubscriber<M: CloneableMessage> {
8    actors: Vec<Box<dyn AnyActor>>,
9    _marker: PhantomData<M>,
10}
11
12impl<M: CloneableMessage + 'static> Default for TypedSubscriber<M> {
13    fn default() -> Self {
14        Self {
15            actors: Vec::new(),
16            _marker: PhantomData,
17        }
18    }
19}
20
21impl<M: CloneableMessage + 'static> TypedSubscriber<M> {
22    pub fn new() -> Self {
23        Self::default()
24    }
25
26    pub fn add(&mut self, actor: ActorRef<M>) {
27        self.actors.push(Box::new(actor) as Box<dyn AnyActor>);
28    }
29
30    pub async fn publish(&self, message: M) {
31        let arc_msg: Arc<dyn Any + Send + Sync> = Arc::new(message);
32        for actor in &self.actors {
33            let _ = actor.send_any(arc_msg.clone()).await;
34        }
35    }
36}
37
38// Subscriber for shared messages
39pub struct SharedSubscriber<M: Send + Sync + 'static> {
40    actors: Vec<Box<dyn AnyActor>>,
41    _marker: PhantomData<M>,
42}
43
44impl<M: Send + Sync + 'static> Default for SharedSubscriber<M> {
45    fn default() -> Self {
46        Self {
47            actors: Vec::new(),
48            _marker: PhantomData,
49        }
50    }
51}
52
53impl<M: Send + Sync + 'static> SharedSubscriber<M> {
54    pub fn new() -> Self {
55        Self::default()
56    }
57
58    pub fn add(&mut self, actor: ActorRef<SharedMessage<M>>) {
59        self.actors.push(Box::new(actor) as Box<dyn AnyActor>);
60    }
61
62    pub async fn publish(&self, message: SharedMessage<M>) {
63        let arc_msg: Arc<dyn Any + Send + Sync> = Arc::new(message);
64        for actor in &self.actors {
65            let _ = actor.send_any(arc_msg.clone()).await;
66        }
67    }
68}
69
#[cfg(test)]
mod tests {
    use super::*;
    use crate::actor::{ActorMessage, CloneableMessage, SharedMessage};
    use async_trait::async_trait;
    use ractor::{Actor, ActorProcessingErr, ActorRef};
    use tokio::sync::Mutex;
    use tokio::time::{Duration, sleep};

    /// Delay between two checks of the received-message log.
    const POLL_INTERVAL: Duration = Duration::from_millis(5);
    /// Maximum number of checks before giving up (about one second in total).
    const MAX_POLLS: usize = 200;

    /// Waits until `received` holds at least `expected` entries and returns a
    /// snapshot of it.
    ///
    /// Delivery to the actor is asynchronous, so the tests cannot read the log
    /// right after `publish` returns. Polling with an upper bound avoids the
    /// race of a single fixed sleep on a slow or loaded machine, while still
    /// returning quickly in the common case. If the bound is reached, the
    /// current contents are returned so the caller's assertion reports what
    /// was actually received.
    async fn wait_for_messages(received: &Mutex<Vec<String>>, expected: usize) -> Vec<String> {
        for _ in 0..MAX_POLLS {
            {
                // Scope the guard so the lock is released before sleeping.
                let items = received.lock().await;
                if items.len() >= expected {
                    return items.clone();
                }
            }
            sleep(POLL_INTERVAL).await;
        }
        received.lock().await.clone()
    }

    /// Cloneable message used to exercise `TypedSubscriber`.
    #[derive(Clone, Debug)]
    struct TestMessage {
        content: String,
    }

    impl ActorMessage for TestMessage {}
    impl CloneableMessage for TestMessage {}

    /// Actor that appends the `content` of every message it handles to a
    /// shared log.
    struct CollectActor {
        received: Arc<Mutex<Vec<String>>>,
    }

    #[async_trait]
    impl Actor for CollectActor {
        type Msg = TestMessage;
        type State = ();
        type Arguments = Arc<Mutex<Vec<String>>>;

        // The startup arguments are unused; the log is reached through `self`.
        async fn pre_start(
            &self,
            _myself: ActorRef<Self::Msg>,
            _args: Self::Arguments,
        ) -> Result<Self::State, ActorProcessingErr> {
            Ok(())
        }

        async fn handle(
            &self,
            _myself: ActorRef<Self::Msg>,
            message: Self::Msg,
            _state: &mut Self::State,
        ) -> Result<(), ActorProcessingErr> {
            let mut received = self.received.lock().await;
            received.push(message.content);
            Ok(())
        }
    }

    /// A message published through `TypedSubscriber` reaches the registered actor.
    #[tokio::test]
    async fn test_typed_subscriber_publish() {
        let received = Arc::new(Mutex::new(Vec::new()));
        let (actor_ref, _handle) = Actor::spawn(
            None,
            CollectActor {
                received: received.clone(),
            },
            received.clone(),
        )
        .await
        .unwrap();

        let mut subscriber = TypedSubscriber::<TestMessage>::new();
        subscriber.add(actor_ref);
        subscriber
            .publish(TestMessage {
                content: "hello".to_string(),
            })
            .await;

        let items = wait_for_messages(&received, 1).await;
        assert_eq!(items, vec!["hello"]);
    }

    /// Non-cloneable payload carried inside a `SharedMessage`.
    #[derive(Debug)]
    struct SharedPayload {
        value: String,
    }

    impl ActorMessage for SharedPayload {}

    /// Actor that appends the payload `value` of every shared message it
    /// handles to a shared log.
    struct SharedActor {
        received: Arc<Mutex<Vec<String>>>,
    }

    #[async_trait]
    impl Actor for SharedActor {
        type Msg = SharedMessage<SharedPayload>;
        type State = ();
        type Arguments = Arc<Mutex<Vec<String>>>;

        // The startup arguments are unused; the log is reached through `self`.
        async fn pre_start(
            &self,
            _myself: ActorRef<Self::Msg>,
            _args: Self::Arguments,
        ) -> Result<Self::State, ActorProcessingErr> {
            Ok(())
        }

        async fn handle(
            &self,
            _myself: ActorRef<Self::Msg>,
            message: Self::Msg,
            _state: &mut Self::State,
        ) -> Result<(), ActorProcessingErr> {
            let mut received = self.received.lock().await;
            received.push(message.inner().value.clone());
            Ok(())
        }
    }

    /// A message published through `SharedSubscriber` reaches the registered actor.
    #[tokio::test]
    async fn test_shared_subscriber_publish() {
        let received = Arc::new(Mutex::new(Vec::new()));
        let (actor_ref, _handle) = Actor::spawn(
            None,
            SharedActor {
                received: received.clone(),
            },
            received.clone(),
        )
        .await
        .unwrap();

        let mut subscriber = SharedSubscriber::<SharedPayload>::new();
        subscriber.add(actor_ref);
        subscriber
            .publish(SharedMessage::new(SharedPayload {
                value: "shared".to_string(),
            }))
            .await;

        let items = wait_for_messages(&received, 1).await;
        assert_eq!(items, vec!["shared"]);
    }
}