Skip to main content

autoagents_core/runtime/mod.rs

1use crate::actor::{AnyActor, CloneableMessage, Transport};
2use async_trait::async_trait;
3use autoagents_protocol::{Event, RuntimeID};
4use ractor::ActorRef;
5use std::any::{Any, TypeId};
6use std::fmt::Debug;
7use std::sync::Arc;
8use tokio::sync::mpsc;
9use tokio::sync::mpsc::error::SendError;
10use tokio::task::JoinError;
11
12pub(crate) mod manager;
13mod single_threaded;
14use crate::actor::Topic;
15use crate::utils::BoxEventStream;
16pub use single_threaded::SingleThreadedRuntime;
17
/// Tunable settings supplied when constructing a runtime instance.
#[derive(Debug, Clone)]
pub struct RuntimeConfig {
    /// Optional queue capacity.
    ///
    /// NOTE(review): how a runtime interprets `None` is not visible in this
    /// module — confirm against the concrete runtime implementation.
    pub queue_size: Option<usize>,
}

impl Default for RuntimeConfig {
    /// Builds the default configuration, which sets `queue_size` to `Some(100)`.
    fn default() -> Self {
        // Named locally so the default is not a bare magic number.
        const DEFAULT_QUEUE_SIZE: usize = 100;

        let queue_size = Some(DEFAULT_QUEUE_SIZE);
        Self { queue_size }
    }
}
31
32/// Error types for runtime operations and message routing.
33#[derive(Debug, thiserror::Error)]
34pub enum RuntimeError {
35    #[error("Send Message Error: {0}")]
36    SendMessage(String),
37
38    #[error("TopicTypeMismatch")]
39    TopicTypeMismatch(String, TypeId),
40
41    #[error("Join Error: {0}")]
42    JoinError(JoinError),
43
44    #[error("Event error: {0}")]
45    EventError(#[from] SendError<Event>),
46}
47
48/// Abstract runtime that manages actor subscriptions, pub/sub delivery, and
49/// emission of protocol events. Implementations can provide different threading
50/// or transport strategies.
51#[async_trait]
52pub trait Runtime: Send + Sync {
53    fn id(&self) -> RuntimeID;
54
55    async fn subscribe_any(
56        &self,
57        topic_name: &str,
58        topic_type: TypeId,
59        actor: Arc<dyn AnyActor>,
60    ) -> Result<(), RuntimeError>;
61
62    async fn publish_any(
63        &self,
64        topic_name: &str,
65        topic_type: TypeId,
66        message: Arc<dyn Any + Send + Sync>,
67    ) -> Result<(), RuntimeError>;
68
69    /// Local event processing sender. Agents receive this and emit protocol
70    /// `Event`s through it. The runtime is responsible for forwarding them to
71    /// the owning `Environment`.
72    fn tx(&self) -> mpsc::Sender<Event>;
73    async fn transport(&self) -> Arc<dyn Transport>;
74    async fn take_event_receiver(&self) -> Option<BoxEventStream<Event>>;
75    /// Subscribe to runtime protocol events without consuming the receiver.
76    async fn subscribe_events(&self) -> BoxEventStream<Event>;
77    /// Run the runtime event loop and process internal messages until stopped.
78    async fn run(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>>;
79    /// Request shutdown of the runtime.
80    async fn stop(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>>;
81}
82
83/// Type-safe convenience layer over `Runtime` for strongly-typed topics and
84/// direct messaging to actors.
85#[async_trait]
86pub trait TypedRuntime: Runtime {
87    async fn subscribe<M>(&self, topic: &Topic<M>, actor: ActorRef<M>) -> Result<(), RuntimeError>
88    where
89        M: CloneableMessage + 'static,
90    {
91        let arc_actor = Arc::new(actor) as Arc<dyn AnyActor>;
92        self.subscribe_any(topic.name(), TypeId::of::<M>(), arc_actor)
93            .await
94    }
95
96    async fn publish<M>(&self, topic: &Topic<M>, message: M) -> Result<(), RuntimeError>
97    where
98        M: CloneableMessage + 'static,
99    {
100        let arc_msg = Arc::new(message) as Arc<dyn Any + Send + Sync>;
101        self.publish_any(topic.name(), TypeId::of::<M>(), arc_msg)
102            .await
103    }
104
105    async fn send_message<M: CloneableMessage + 'static>(
106        &self,
107        message: M,
108        addr: ActorRef<M>,
109    ) -> Result<(), RuntimeError> {
110        addr.cast(message)
111            .map_err(|e| RuntimeError::SendMessage(e.to_string()))
112    }
113}
114
115// Auto-implement TypedRuntime for all Runtime implementations
116impl<T: Runtime + ?Sized> TypedRuntime for T {}
117
#[cfg(test)]
mod tests {
    use super::*;
    use crate::actor::{ActorMessage, CloneableMessage, LocalTransport, Topic, Transport};
    use async_trait::async_trait;
    use futures::stream;
    use tokio::sync::Mutex;

    /// Minimal cloneable payload used to exercise typed publishing.
    #[derive(Debug, Clone)]
    struct TestMessage {
        value: String,
    }

    impl ActorMessage for TestMessage {}
    impl CloneableMessage for TestMessage {}

    /// Stub runtime that records every `publish_any` call and treats all
    /// other operations as successful no-ops.
    #[derive(Debug)]
    struct TestRuntime {
        /// One `(topic name, message TypeId, message value)` entry per
        /// `publish_any` call, in call order.
        published: Arc<Mutex<Vec<(String, TypeId, String)>>>,
        tx: mpsc::Sender<Event>,
    }

    impl TestRuntime {
        /// Creates a runtime with an empty publish log.
        fn new() -> Self {
            // The receiver half is dropped immediately; these tests never
            // send events through `tx`.
            let (tx, _rx) = mpsc::channel(1);
            Self {
                published: Arc::new(Mutex::new(Vec::new())),
                tx,
            }
        }
    }

    #[async_trait]
    impl Runtime for TestRuntime {
        fn id(&self) -> RuntimeID {
            RuntimeID::new_v4()
        }

        async fn subscribe_any(
            &self,
            _topic_name: &str,
            _topic_type: TypeId,
            _actor: Arc<dyn AnyActor>,
        ) -> Result<(), RuntimeError> {
            Ok(())
        }

        async fn publish_any(
            &self,
            topic_name: &str,
            topic_type: TypeId,
            message: Arc<dyn Any + Send + Sync>,
        ) -> Result<(), RuntimeError> {
            // Payloads that are not a `TestMessage` are logged as an empty
            // string rather than failing the call.
            let msg = message
                .downcast_ref::<TestMessage>()
                .map(|m| m.value.clone())
                .unwrap_or_default();
            let mut published = self.published.lock().await;
            published.push((topic_name.to_string(), topic_type, msg));
            Ok(())
        }

        fn tx(&self) -> mpsc::Sender<Event> {
            self.tx.clone()
        }

        async fn transport(&self) -> Arc<dyn Transport> {
            Arc::new(LocalTransport)
        }

        async fn take_event_receiver(&self) -> Option<BoxEventStream<Event>> {
            None
        }

        async fn subscribe_events(&self) -> BoxEventStream<Event> {
            Box::pin(stream::empty())
        }

        async fn run(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
            Ok(())
        }

        async fn stop(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
            Ok(())
        }
    }

    /// `TypedRuntime::publish` must forward the topic name, the message's
    /// `TypeId`, and the message payload to `publish_any`.
    #[tokio::test]
    async fn test_typed_runtime_publish_records_message() {
        let runtime = TestRuntime::new();
        let topic = Topic::<TestMessage>::new("topic");
        runtime
            .publish(
                &topic,
                TestMessage {
                    value: "hello".to_string(),
                },
            )
            .await
            .unwrap();

        let published = runtime.published.lock().await.clone();
        assert_eq!(published.len(), 1);
        assert_eq!(published[0].0, "topic");
        // Guards against `publish` forwarding the wrong type id.
        assert_eq!(published[0].1, TypeId::of::<TestMessage>());
        assert_eq!(published[0].2, "hello");
    }
}