autoagents_core/runtime/
mod.rs1use crate::actor::{AnyActor, CloneableMessage, Transport};
2use async_trait::async_trait;
3use autoagents_protocol::{Event, RuntimeID};
4use ractor::ActorRef;
5use std::any::{Any, TypeId};
6use std::fmt::Debug;
7use std::sync::Arc;
8use tokio::sync::mpsc;
9use tokio::sync::mpsc::error::SendError;
10use tokio::task::JoinError;
11
12pub(crate) mod manager;
13mod single_threaded;
14use crate::actor::Topic;
15use crate::utils::BoxEventStream;
16pub use single_threaded::SingleThreadedRuntime;
17
/// Construction-time settings for a runtime.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RuntimeConfig {
    /// Optional queue capacity.
    ///
    /// How `None` is interpreted is decided by the runtime that consumes this
    /// config; that code is not part of this module.
    pub queue_size: Option<usize>,
}

impl RuntimeConfig {
    /// Queue size that [`RuntimeConfig::default`] places in `queue_size`.
    pub const DEFAULT_QUEUE_SIZE: usize = 100;
}

impl Default for RuntimeConfig {
    /// Builds a config whose `queue_size` is `Some(Self::DEFAULT_QUEUE_SIZE)`.
    fn default() -> Self {
        Self {
            queue_size: Some(Self::DEFAULT_QUEUE_SIZE),
        }
    }
}
31
32#[derive(Debug, thiserror::Error)]
34pub enum RuntimeError {
35 #[error("Send Message Error: {0}")]
36 SendMessage(String),
37
38 #[error("TopicTypeMismatch")]
39 TopicTypeMismatch(String, TypeId),
40
41 #[error("Join Error: {0}")]
42 JoinError(JoinError),
43
44 #[error("Event error: {0}")]
45 EventError(#[from] SendError<Event>),
46}
47
48#[async_trait]
52pub trait Runtime: Send + Sync {
53 fn id(&self) -> RuntimeID;
54
55 async fn subscribe_any(
56 &self,
57 topic_name: &str,
58 topic_type: TypeId,
59 actor: Arc<dyn AnyActor>,
60 ) -> Result<(), RuntimeError>;
61
62 async fn publish_any(
63 &self,
64 topic_name: &str,
65 topic_type: TypeId,
66 message: Arc<dyn Any + Send + Sync>,
67 ) -> Result<(), RuntimeError>;
68
69 fn tx(&self) -> mpsc::Sender<Event>;
73 async fn transport(&self) -> Arc<dyn Transport>;
74 async fn take_event_receiver(&self) -> Option<BoxEventStream<Event>>;
75 async fn subscribe_events(&self) -> BoxEventStream<Event>;
77 async fn run(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>>;
79 async fn stop(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>>;
81}
82
83#[async_trait]
86pub trait TypedRuntime: Runtime {
87 async fn subscribe<M>(&self, topic: &Topic<M>, actor: ActorRef<M>) -> Result<(), RuntimeError>
88 where
89 M: CloneableMessage + 'static,
90 {
91 let arc_actor = Arc::new(actor) as Arc<dyn AnyActor>;
92 self.subscribe_any(topic.name(), TypeId::of::<M>(), arc_actor)
93 .await
94 }
95
96 async fn publish<M>(&self, topic: &Topic<M>, message: M) -> Result<(), RuntimeError>
97 where
98 M: CloneableMessage + 'static,
99 {
100 let arc_msg = Arc::new(message) as Arc<dyn Any + Send + Sync>;
101 self.publish_any(topic.name(), TypeId::of::<M>(), arc_msg)
102 .await
103 }
104
105 async fn send_message<M: CloneableMessage + 'static>(
106 &self,
107 message: M,
108 addr: ActorRef<M>,
109 ) -> Result<(), RuntimeError> {
110 addr.cast(message)
111 .map_err(|e| RuntimeError::SendMessage(e.to_string()))
112 }
113}
114
115impl<T: Runtime + ?Sized> TypedRuntime for T {}