autoagents_core/actor/
mod.rs

1mod messaging;
2mod subscriber;
3mod topic;
4mod transport;
5
6use async_trait::async_trait;
7pub use messaging::{ActorMessage, CloneableMessage, SharedMessage};
8use ractor::ActorRef;
9use std::any::Any;
10use std::fmt::Debug;
11use std::sync::Arc;
12pub use subscriber::{SharedSubscriber, TypedSubscriber};
13pub use topic::Topic;
14pub use transport::{LocalTransport, Transport};
15
16#[async_trait]
17pub trait AnyActor: Send + Sync + Debug {
18    async fn send_any(
19        &self,
20        msg: Arc<dyn Any + Send + Sync>,
21    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>>;
22}
23
24// For actors that receive cloneable messages
25#[async_trait]
26impl<M: CloneableMessage + 'static> AnyActor for ActorRef<M> {
27    async fn send_any(
28        &self,
29        msg: Arc<dyn Any + Send + Sync>,
30    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
31        let msg = msg.downcast_ref::<M>().ok_or("Message type mismatch")?;
32
33        self.cast(msg.clone()).map_err(|e| e.into())
34    }
35}
36
37// Special implementation for SharedMessage actors
38// This doesn't conflict because SharedMessage<M> doesn't implement CloneableMessage
39#[async_trait]
40impl<M: Send + Sync + 'static> AnyActor for ActorRef<SharedMessage<M>> {
41    async fn send_any(
42        &self,
43        msg: Arc<dyn Any + Send + Sync>,
44    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
45        // Downcast to SharedMessage<M>
46        let shared_msg = msg
47            .downcast_ref::<SharedMessage<M>>()
48            .ok_or("Message type mismatch")?;
49
50        // Clone the SharedMessage (which clones the Arc, not M)
51        self.cast(shared_msg.clone()).map_err(|e| e.into())
52    }
53}
54
55#[cfg(test)]
56mod tests {
57    use super::*;
58    use std::sync::{Arc, Mutex};
59
60    // Test message types
61    #[derive(Debug, Clone, PartialEq)]
62    struct TestCloneableMessage {
63        content: String,
64    }
65    impl ActorMessage for TestCloneableMessage {}
66    impl CloneableMessage for TestCloneableMessage {}
67
68    #[derive(Debug, PartialEq)]
69    struct TestNonCloneableMessage {
70        data: String,
71    }
72    impl ActorMessage for TestNonCloneableMessage {}
73
74    // Mock actor for testing
75    #[derive(Debug)]
76    struct MockActor {
77        received_messages: Arc<Mutex<Vec<String>>>,
78    }
79
80    impl MockActor {
81        fn new() -> Self {
82            Self {
83                received_messages: Arc::new(Mutex::new(Vec::new())),
84            }
85        }
86
87        fn received_messages(&self) -> Vec<String> {
88            self.received_messages.lock().unwrap().clone()
89        }
90    }
91
92    #[async_trait]
93    impl AnyActor for MockActor {
94        async fn send_any(
95            &self,
96            msg: Arc<dyn Any + Send + Sync>,
97        ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
98            if let Some(cloneable_msg) = msg.downcast_ref::<TestCloneableMessage>() {
99                self.received_messages
100                    .lock()
101                    .unwrap()
102                    .push(format!("cloneable:{}", cloneable_msg.content));
103                Ok(())
104            } else if let Some(shared_msg) =
105                msg.downcast_ref::<SharedMessage<TestNonCloneableMessage>>()
106            {
107                self.received_messages
108                    .lock()
109                    .unwrap()
110                    .push(format!("shared:{}", shared_msg.inner().data));
111                Ok(())
112            } else {
113                Err("Unknown message type".into())
114            }
115        }
116    }
117
118    #[test]
119    fn test_any_actor_trait_object_creation() {
120        let actor = MockActor::new();
121        let _trait_obj: Box<dyn AnyActor> = Box::new(actor);
122    }
123
124    #[tokio::test]
125    async fn test_any_actor_send_cloneable_message() {
126        let actor = MockActor::new();
127        let msg = TestCloneableMessage {
128            content: "test_message".to_string(),
129        };
130        let arc_msg: Arc<dyn Any + Send + Sync> = Arc::new(msg);
131
132        let result = actor.send_any(arc_msg).await;
133
134        assert!(result.is_ok());
135        assert_eq!(actor.received_messages(), vec!["cloneable:test_message"]);
136    }
137
138    #[tokio::test]
139    async fn test_any_actor_send_shared_message() {
140        let actor = MockActor::new();
141        let inner_msg = TestNonCloneableMessage {
142            data: "shared_data".to_string(),
143        };
144        let shared_msg = SharedMessage::new(inner_msg);
145        let arc_msg: Arc<dyn Any + Send + Sync> = Arc::new(shared_msg);
146
147        let result = actor.send_any(arc_msg).await;
148
149        assert!(result.is_ok());
150        assert_eq!(actor.received_messages(), vec!["shared:shared_data"]);
151    }
152
153    #[tokio::test]
154    async fn test_any_actor_send_unknown_message() {
155        let actor = MockActor::new();
156
157        #[allow(dead_code)]
158        #[derive(Debug)]
159        struct UnknownMessage {
160            value: i32,
161        }
162
163        let msg = UnknownMessage { value: 42 };
164        let arc_msg: Arc<dyn Any + Send + Sync> = Arc::new(msg);
165
166        let result = actor.send_any(arc_msg).await;
167
168        assert!(result.is_err());
169        assert_eq!(actor.received_messages().len(), 0);
170    }
171
172    #[test]
173    fn test_actor_message_trait_bounds() {
174        // Test that our message types satisfy the trait bounds
175        fn assert_actor_message<T: ActorMessage>() {}
176        fn assert_cloneable_message<T: CloneableMessage>() {}
177
178        assert_actor_message::<TestCloneableMessage>();
179        assert_actor_message::<TestNonCloneableMessage>();
180        assert_cloneable_message::<TestCloneableMessage>();
181
182        // TestNonCloneableMessage should not satisfy CloneableMessage
183        // assert_cloneable_message::<TestNonCloneableMessage>(); // This would fail to compile
184    }
185
186    #[test]
187    fn test_shared_message_actor_message_trait() {
188        fn assert_actor_message<T: ActorMessage>() {}
189
190        assert_actor_message::<SharedMessage<TestNonCloneableMessage>>();
191
192        // SharedMessage should NOT satisfy CloneableMessage to avoid conflicts
193        // fn assert_cloneable_message<T: CloneableMessage>() {}
194        // assert_cloneable_message::<SharedMessage<TestNonCloneableMessage>>(); // Would fail
195    }
196
197    #[tokio::test]
198    async fn test_multiple_any_actors() {
199        let actor1 = MockActor::new();
200        let actor2 = MockActor::new();
201
202        let actors: Vec<Box<dyn AnyActor>> = vec![Box::new(actor1), Box::new(actor2)];
203
204        let msg = TestCloneableMessage {
205            content: "broadcast".to_string(),
206        };
207        let arc_msg: Arc<dyn Any + Send + Sync> = Arc::new(msg);
208
209        for actor in &actors {
210            let result = actor.send_any(arc_msg.clone()).await;
211            assert!(result.is_ok());
212        }
213    }
214
215    #[test]
216    fn test_actor_debug_trait() {
217        let actor = MockActor::new();
218        let debug_str = format!("{actor:?}");
219        assert!(debug_str.contains("MockActor"));
220    }
221
222    #[tokio::test]
223    async fn test_concurrent_any_actor_sends() {
224        let actor = Arc::new(MockActor::new());
225
226        let handles = (0..5)
227            .map(|i| {
228                let actor = Arc::clone(&actor);
229                tokio::spawn(async move {
230                    let msg = TestCloneableMessage {
231                        content: format!("concurrent_{i}"),
232                    };
233                    let arc_msg: Arc<dyn Any + Send + Sync> = Arc::new(msg);
234                    actor.send_any(arc_msg).await
235                })
236            })
237            .collect::<Vec<_>>();
238
239        let results = futures::future::join_all(handles).await;
240
241        for result in results {
242            assert!(result.is_ok());
243            assert!(result.unwrap().is_ok());
244        }
245
246        // Should have received 5 messages
247        assert_eq!(actor.received_messages().len(), 5);
248    }
249}