autoagents_core/runtime/
mod.rs1use crate::actor::{AnyActor, CloneableMessage, Transport};
2use crate::protocol::{Event, RuntimeID};
3use async_trait::async_trait;
4use ractor::ActorRef;
5use std::any::{Any, TypeId};
6use std::fmt::Debug;
7use std::sync::Arc;
8use tokio::sync::mpsc;
9use tokio::sync::mpsc::error::SendError;
10use tokio::task::JoinError;
11
12pub(crate) mod manager;
13mod single_threaded;
14use crate::actor::Topic;
15use crate::utils::BoxEventStream;
16pub use single_threaded::SingleThreadedRuntime;
17
/// Tunable settings for a runtime instance.
///
/// The `Default` implementation in this module sets `queue_size` to `Some(100)`.
#[derive(Clone, Debug)]
pub struct RuntimeConfig {
    /// Optional queue size.
    ///
    /// NOTE(review): presumably the capacity of the runtime's event channel, with `None`
    /// leaving the choice to the runtime implementation — confirm against the runtime that
    /// consumes this config.
    pub queue_size: Option<usize>,
}
23
24impl Default for RuntimeConfig {
25 fn default() -> Self {
26 Self {
27 queue_size: Some(100),
28 }
29 }
30}
31
32#[derive(Debug, thiserror::Error)]
34pub enum RuntimeError {
35 #[error("Send Message Error: {0}")]
36 SendMessage(String),
37
38 #[error("TopicTypeMismatch")]
39 TopicTypeMismatch(String, TypeId),
40
41 #[error("Join Error: {0}")]
42 JoinError(JoinError),
43
44 #[error("Event error: {0}")]
45 EventError(#[from] SendError<Event>),
46}
47
48#[async_trait]
49pub trait Runtime: Send + Sync {
50 fn id(&self) -> RuntimeID;
51
52 async fn subscribe_any(
53 &self,
54 topic_name: &str,
55 topic_type: TypeId,
56 actor: Arc<dyn AnyActor>,
57 ) -> Result<(), RuntimeError>;
58
59 async fn publish_any(
60 &self,
61 topic_name: &str,
62 topic_type: TypeId,
63 message: Arc<dyn Any + Send + Sync>,
64 ) -> Result<(), RuntimeError>;
65
66 fn tx(&self) -> mpsc::Sender<Event>;
68 async fn transport(&self) -> Arc<dyn Transport>;
69 async fn take_event_receiver(&self) -> Option<BoxEventStream<Event>>;
70 async fn run(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>>;
71 async fn stop(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>>;
72}
73
74#[async_trait]
75pub trait TypedRuntime: Runtime {
76 async fn subscribe<M>(&self, topic: &Topic<M>, actor: ActorRef<M>) -> Result<(), RuntimeError>
77 where
78 M: CloneableMessage + 'static,
79 {
80 let arc_actor = Arc::new(actor) as Arc<dyn AnyActor>;
81 self.subscribe_any(topic.name(), TypeId::of::<M>(), arc_actor)
82 .await
83 }
84
85 async fn publish<M>(&self, topic: &Topic<M>, message: M) -> Result<(), RuntimeError>
86 where
87 M: CloneableMessage + 'static,
88 {
89 let arc_msg = Arc::new(message) as Arc<dyn Any + Send + Sync>;
90 self.publish_any(topic.name(), TypeId::of::<M>(), arc_msg)
91 .await
92 }
93
94 async fn send_message<M: CloneableMessage + 'static>(
95 &self,
96 message: M,
97 addr: ActorRef<M>,
98 ) -> Result<(), RuntimeError> {
99 addr.cast(message)
100 .map_err(|e| RuntimeError::SendMessage(e.to_string()))
101 }
102}
103
104impl<T: Runtime + ?Sized> TypedRuntime for T {}