autoagents_core/actor/
mod.rs1mod messaging;
2mod subscriber;
3mod topic;
4mod transport;
5
6use async_trait::async_trait;
7pub use messaging::{ActorMessage, CloneableMessage, SharedMessage};
8use ractor::ActorRef;
9use std::any::Any;
10use std::fmt::Debug;
11use std::sync::Arc;
12pub use subscriber::{SharedSubscriber, TypedSubscriber};
13pub use topic::Topic;
14pub use transport::{LocalTransport, Transport};
15
/// Type-erased handle for delivering a message to an actor.
///
/// The trait hides the actor's concrete message type behind
/// `Arc<dyn Any + Send + Sync>`, so handles for actors with different
/// message types can be stored and used uniformly (for example as
/// `Box<dyn AnyActor>`).
#[async_trait]
pub trait AnyActor: Send + Sync + Debug {
    /// Delivers a type-erased message to the actor behind this handle.
    ///
    /// # Errors
    ///
    /// Returns a boxed error when the message cannot be delivered. The
    /// `ActorRef` implementations in this module fail when `msg` does not
    /// downcast to the actor's message type or when the underlying `cast`
    /// call reports an error.
    async fn send_any(
        &self,
        msg: Arc<dyn Any + Send + Sync>,
    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>>;
}
23
24#[async_trait]
26impl<M: CloneableMessage + 'static> AnyActor for ActorRef<M> {
27 async fn send_any(
28 &self,
29 msg: Arc<dyn Any + Send + Sync>,
30 ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
31 let msg = msg.downcast_ref::<M>().ok_or("Message type mismatch")?;
32
33 self.cast(msg.clone()).map_err(|e| e.into())
34 }
35}
36
37#[async_trait]
40impl<M: Send + Sync + 'static> AnyActor for ActorRef<SharedMessage<M>> {
41 async fn send_any(
42 &self,
43 msg: Arc<dyn Any + Send + Sync>,
44 ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
45 let shared_msg = msg
47 .downcast_ref::<SharedMessage<M>>()
48 .ok_or("Message type mismatch")?;
49
50 self.cast(shared_msg.clone()).map_err(|e| e.into())
52 }
53}
54
/// Unit tests for the `AnyActor` abstraction, driven through an in-memory
/// mock instead of a running actor system.
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::{Arc, Mutex};

    /// Cloneable payload; handled by the first branch of `MockActor::send_any`.
    #[derive(Debug, Clone, PartialEq)]
    struct TestCloneableMessage {
        content: String,
    }
    impl ActorMessage for TestCloneableMessage {}
    impl CloneableMessage for TestCloneableMessage {}

    /// Payload without `Clone`; the tests send it wrapped in `SharedMessage`.
    #[derive(Debug, PartialEq)]
    struct TestNonCloneableMessage {
        data: String,
    }
    impl ActorMessage for TestNonCloneableMessage {}

    /// Test double that records a textual description of every message it
    /// accepts.
    #[derive(Debug)]
    struct MockActor {
        received_messages: Arc<Mutex<Vec<String>>>,
    }

    impl MockActor {
        /// Creates a mock with an empty message log.
        fn new() -> Self {
            Self {
                received_messages: Arc::new(Mutex::new(Vec::new())),
            }
        }

        /// Returns a snapshot of the messages recorded so far.
        fn received_messages(&self) -> Vec<String> {
            self.received_messages.lock().unwrap().clone()
        }
    }

    #[async_trait]
    impl AnyActor for MockActor {
        /// Records known payload types with a `cloneable:` / `shared:` prefix
        /// and rejects everything else with "Unknown message type".
        async fn send_any(
            &self,
            msg: Arc<dyn Any + Send + Sync>,
        ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
            if let Some(cloneable_msg) = msg.downcast_ref::<TestCloneableMessage>() {
                self.received_messages
                    .lock()
                    .unwrap()
                    .push(format!("cloneable:{}", cloneable_msg.content));
                Ok(())
            } else if let Some(shared_msg) =
                msg.downcast_ref::<SharedMessage<TestNonCloneableMessage>>()
            {
                self.received_messages
                    .lock()
                    .unwrap()
                    .push(format!("shared:{}", shared_msg.inner().data));
                Ok(())
            } else {
                Err("Unknown message type".into())
            }
        }
    }

    /// `AnyActor` must be usable as a trait object.
    #[test]
    fn test_any_actor_trait_object_creation() {
        let actor = MockActor::new();
        let _trait_obj: Box<dyn AnyActor> = Box::new(actor);
    }

    #[tokio::test]
    async fn test_any_actor_send_cloneable_message() {
        let actor = MockActor::new();
        let msg = TestCloneableMessage {
            content: "test_message".to_string(),
        };
        let arc_msg: Arc<dyn Any + Send + Sync> = Arc::new(msg);

        let result = actor.send_any(arc_msg).await;

        assert!(result.is_ok());
        assert_eq!(actor.received_messages(), vec!["cloneable:test_message"]);
    }

    #[tokio::test]
    async fn test_any_actor_send_shared_message() {
        let actor = MockActor::new();
        let inner_msg = TestNonCloneableMessage {
            data: "shared_data".to_string(),
        };
        let shared_msg = SharedMessage::new(inner_msg);
        let arc_msg: Arc<dyn Any + Send + Sync> = Arc::new(shared_msg);

        let result = actor.send_any(arc_msg).await;

        assert!(result.is_ok());
        assert_eq!(actor.received_messages(), vec!["shared:shared_data"]);
    }

    #[tokio::test]
    async fn test_any_actor_send_unknown_message() {
        let actor = MockActor::new();

        // The field is never read: the value only has to exist as a payload
        // type the mock does not recognise.
        #[allow(dead_code)]
        #[derive(Debug)]
        struct UnknownMessage {
            value: i32,
        }

        let msg = UnknownMessage { value: 42 };
        let arc_msg: Arc<dyn Any + Send + Sync> = Arc::new(msg);

        let result = actor.send_any(arc_msg).await;

        // Pin the error text as well as the failure itself, and make sure
        // nothing was recorded for the rejected payload.
        let err = result.expect_err("unknown payload types must be rejected");
        assert_eq!(err.to_string(), "Unknown message type");
        assert!(actor.received_messages().is_empty());
    }

    /// Compile-time check of the marker-trait implementations on the fixtures.
    #[test]
    fn test_actor_message_trait_bounds() {
        fn assert_actor_message<T: ActorMessage>() {}
        fn assert_cloneable_message<T: CloneableMessage>() {}

        assert_actor_message::<TestCloneableMessage>();
        assert_actor_message::<TestNonCloneableMessage>();
        assert_cloneable_message::<TestCloneableMessage>();
    }

    /// Compile-time check that `SharedMessage<T>` is itself an `ActorMessage`.
    #[test]
    fn test_shared_message_actor_message_trait() {
        fn assert_actor_message<T: ActorMessage>() {}

        assert_actor_message::<SharedMessage<TestNonCloneableMessage>>();
    }

    #[tokio::test]
    async fn test_multiple_any_actors() {
        let actor1 = MockActor::new();
        let actor2 = MockActor::new();

        // Boxing moves the mocks, so keep handles to their logs in order to
        // verify delivery afterwards.
        let logs = vec![
            Arc::clone(&actor1.received_messages),
            Arc::clone(&actor2.received_messages),
        ];

        let actors: Vec<Box<dyn AnyActor>> = vec![Box::new(actor1), Box::new(actor2)];

        let msg = TestCloneableMessage {
            content: "broadcast".to_string(),
        };
        let arc_msg: Arc<dyn Any + Send + Sync> = Arc::new(msg);

        for actor in &actors {
            let result = actor.send_any(arc_msg.clone()).await;
            assert!(result.is_ok());
        }

        // Every actor must have seen exactly the one broadcast message.
        for log in &logs {
            assert_eq!(*log.lock().unwrap(), vec!["cloneable:broadcast"]);
        }
    }

    #[test]
    fn test_actor_debug_trait() {
        let actor = MockActor::new();
        let debug_str = format!("{actor:?}");
        assert!(debug_str.contains("MockActor"));
    }

    #[tokio::test]
    async fn test_concurrent_any_actor_sends() {
        let actor = Arc::new(MockActor::new());

        let handles = (0..5)
            .map(|i| {
                let actor = Arc::clone(&actor);
                tokio::spawn(async move {
                    let msg = TestCloneableMessage {
                        content: format!("concurrent_{i}"),
                    };
                    let arc_msg: Arc<dyn Any + Send + Sync> = Arc::new(msg);
                    actor.send_any(arc_msg).await
                })
            })
            .collect::<Vec<_>>();

        let results = futures::future::join_all(handles).await;

        for result in results {
            assert!(result.is_ok());
            assert!(result.unwrap().is_ok());
        }

        // Arrival order is not deterministic across tasks; compare the
        // sorted log against the full expected set instead of only its size.
        let mut received = actor.received_messages();
        received.sort();
        let expected: Vec<String> = (0..5)
            .map(|i| format!("cloneable:concurrent_{i}"))
            .collect();
        assert_eq!(received, expected);
    }
}