autoagents_core/runtime/
mod.rs1use crate::actor::{AnyActor, CloneableMessage, Transport};
2use async_trait::async_trait;
3use autoagents_protocol::{Event, RuntimeID};
4use ractor::ActorRef;
5use std::any::{Any, TypeId};
6use std::fmt::Debug;
7use std::sync::Arc;
8use tokio::sync::mpsc;
9use tokio::sync::mpsc::error::SendError;
10use tokio::task::JoinError;
11
12pub(crate) mod manager;
13mod single_threaded;
14use crate::actor::Topic;
15use crate::utils::BoxEventStream;
16pub use single_threaded::SingleThreadedRuntime;
17
/// Configuration values for a runtime.
#[derive(Debug, Clone)]
pub struct RuntimeConfig {
    /// Optional queue size.
    ///
    /// NOTE(review): presumably the capacity of the runtime's internal
    /// channel(s), with `None` leaving the choice to the implementation —
    /// confirm against the concrete runtimes that consume this value.
    pub queue_size: Option<usize>,
}

impl Default for RuntimeConfig {
    /// Builds the default configuration, in which `queue_size` is `Some(100)`.
    fn default() -> Self {
        let queue_size = Some(100);
        RuntimeConfig { queue_size }
    }
}
31
32#[derive(Debug, thiserror::Error)]
34pub enum RuntimeError {
35 #[error("Send Message Error: {0}")]
36 SendMessage(String),
37
38 #[error("TopicTypeMismatch")]
39 TopicTypeMismatch(String, TypeId),
40
41 #[error("Join Error: {0}")]
42 JoinError(JoinError),
43
44 #[error("Event error: {0}")]
45 EventError(#[from] SendError<Event>),
46}
47
/// Type-erased runtime interface.
///
/// Topics are addressed by name plus the [`TypeId`] of their message type,
/// and actors/messages are passed as trait objects, which keeps this trait
/// usable as `dyn Runtime`. The generic, strongly typed helpers live on
/// [`TypedRuntime`], which forwards to the `*_any` methods declared here.
#[async_trait]
pub trait Runtime: Send + Sync {
    /// Returns the identifier of this runtime.
    fn id(&self) -> RuntimeID;

    /// Subscribes a type-erased actor to the topic called `topic_name`.
    ///
    /// `topic_type` is the [`TypeId`] of the topic's message type.
    ///
    /// # Errors
    ///
    /// Returns a [`RuntimeError`] when the subscription cannot be made; the
    /// exact conditions are defined by the implementation.
    async fn subscribe_any(
        &self,
        topic_name: &str,
        topic_type: TypeId,
        actor: Arc<dyn AnyActor>,
    ) -> Result<(), RuntimeError>;

    /// Publishes a type-erased message to the topic called `topic_name`.
    ///
    /// `topic_type` is the [`TypeId`] of the message type and `message` is
    /// the payload behind `Arc<dyn Any + Send + Sync>`.
    ///
    /// # Errors
    ///
    /// Returns a [`RuntimeError`] when the message cannot be published; the
    /// exact conditions are defined by the implementation.
    async fn publish_any(
        &self,
        topic_name: &str,
        topic_type: TypeId,
        message: Arc<dyn Any + Send + Sync>,
    ) -> Result<(), RuntimeError>;

    /// Returns a sender for pushing [`Event`]s into this runtime.
    fn tx(&self) -> mpsc::Sender<Event>;

    /// Returns the [`Transport`] used by this runtime.
    async fn transport(&self) -> Arc<dyn Transport>;

    /// Returns the runtime's event stream, if one is available.
    ///
    /// NOTE(review): the name suggests the receiver can be taken only once
    /// and later calls yield `None` — confirm against the implementations.
    async fn take_event_receiver(&self) -> Option<BoxEventStream<Event>>;

    /// Returns a stream of [`Event`]s from this runtime.
    ///
    /// NOTE(review): presumably each call creates an independent
    /// subscription — confirm against the implementations.
    async fn subscribe_events(&self) -> BoxEventStream<Event>;

    /// Runs the runtime.
    ///
    /// NOTE(review): presumably this drives the runtime's processing loop
    /// until it is stopped — confirm against the implementations.
    async fn run(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>>;

    /// Stops the runtime.
    async fn stop(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>>;
}
82
83#[async_trait]
86pub trait TypedRuntime: Runtime {
87 async fn subscribe<M>(&self, topic: &Topic<M>, actor: ActorRef<M>) -> Result<(), RuntimeError>
88 where
89 M: CloneableMessage + 'static,
90 {
91 let arc_actor = Arc::new(actor) as Arc<dyn AnyActor>;
92 self.subscribe_any(topic.name(), TypeId::of::<M>(), arc_actor)
93 .await
94 }
95
96 async fn publish<M>(&self, topic: &Topic<M>, message: M) -> Result<(), RuntimeError>
97 where
98 M: CloneableMessage + 'static,
99 {
100 let arc_msg = Arc::new(message) as Arc<dyn Any + Send + Sync>;
101 self.publish_any(topic.name(), TypeId::of::<M>(), arc_msg)
102 .await
103 }
104
105 async fn send_message<M: CloneableMessage + 'static>(
106 &self,
107 message: M,
108 addr: ActorRef<M>,
109 ) -> Result<(), RuntimeError> {
110 addr.cast(message)
111 .map_err(|e| RuntimeError::SendMessage(e.to_string()))
112 }
113}
114
115impl<T: Runtime + ?Sized> TypedRuntime for T {}
117
#[cfg(test)]
mod tests {
    use super::*;
    use crate::actor::{ActorMessage, CloneableMessage, LocalTransport, Topic, Transport};
    use async_trait::async_trait;
    use futures::stream;
    use tokio::sync::Mutex;

    /// Minimal message type used to exercise the typed helpers.
    #[derive(Debug, Clone)]
    struct TestMessage {
        value: String,
    }

    impl ActorMessage for TestMessage {}
    impl CloneableMessage for TestMessage {}

    /// Mock runtime that records every `publish_any` call as
    /// `(topic name, message TypeId, TestMessage value)`.
    #[derive(Debug)]
    struct TestRuntime {
        published: Arc<Mutex<Vec<(String, TypeId, String)>>>,
        tx: mpsc::Sender<Event>,
    }

    impl TestRuntime {
        /// Creates a mock with an empty publish log.
        fn new() -> Self {
            // The receiver is not retained: these tests only need `tx()` to
            // hand back a sender, nothing is ever sent on it.
            let (tx, _rx) = mpsc::channel(1);
            Self {
                published: Arc::new(Mutex::new(Vec::new())),
                tx,
            }
        }
    }

    #[async_trait]
    impl Runtime for TestRuntime {
        fn id(&self) -> RuntimeID {
            RuntimeID::new_v4()
        }

        async fn subscribe_any(
            &self,
            _topic_name: &str,
            _topic_type: TypeId,
            _actor: Arc<dyn AnyActor>,
        ) -> Result<(), RuntimeError> {
            Ok(())
        }

        async fn publish_any(
            &self,
            topic_name: &str,
            topic_type: TypeId,
            message: Arc<dyn Any + Send + Sync>,
        ) -> Result<(), RuntimeError> {
            // Payloads that are not a `TestMessage` are logged as an empty
            // string rather than failing the call.
            let msg = message
                .downcast_ref::<TestMessage>()
                .map(|m| m.value.clone())
                .unwrap_or_default();
            let mut published = self.published.lock().await;
            published.push((topic_name.to_string(), topic_type, msg));
            Ok(())
        }

        fn tx(&self) -> mpsc::Sender<Event> {
            self.tx.clone()
        }

        async fn transport(&self) -> Arc<dyn Transport> {
            Arc::new(LocalTransport)
        }

        async fn take_event_receiver(&self) -> Option<BoxEventStream<Event>> {
            None
        }

        async fn subscribe_events(&self) -> BoxEventStream<Event> {
            Box::pin(stream::empty())
        }

        async fn run(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
            Ok(())
        }

        async fn stop(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
            Ok(())
        }
    }

    /// `TypedRuntime::publish` must forward the topic name, the `TypeId` of
    /// the message type, and the message payload to `publish_any`.
    #[tokio::test]
    async fn test_typed_runtime_publish_records_message() {
        let runtime = TestRuntime::new();
        let topic = Topic::<TestMessage>::new("topic");
        runtime
            .publish(
                &topic,
                TestMessage {
                    value: "hello".to_string(),
                },
            )
            .await
            .unwrap();

        let published = runtime.published.lock().await.clone();
        assert_eq!(published.len(), 1);
        assert_eq!(published[0].0, "topic");
        // The forwarded type id is part of the contract with `publish_any`,
        // so pin it alongside the name and payload.
        assert_eq!(published[0].1, TypeId::of::<TestMessage>());
        assert_eq!(published[0].2, "hello");
    }
}