autoagents_core/runtime/
mod.rs

1use crate::actor::{AnyActor, CloneableMessage, Transport};
2use crate::protocol::{Event, RuntimeID};
3use async_trait::async_trait;
4use ractor::ActorRef;
5use std::any::{Any, TypeId};
6use std::fmt::Debug;
7use std::sync::Arc;
8use tokio::sync::mpsc;
9use tokio::sync::mpsc::error::SendError;
10use tokio::task::JoinError;
11
12pub(crate) mod manager;
13mod single_threaded;
14use crate::actor::Topic;
15use crate::utils::BoxEventStream;
16pub use single_threaded::SingleThreadedRuntime;
17
/// Settings used when constructing a runtime instance.
#[derive(Clone, Debug)]
pub struct RuntimeConfig {
    /// Optional queue size for the runtime.
    ///
    /// NOTE(review): presumably the capacity of the runtime's internal
    /// channel(s); how `None` is interpreted is up to the runtime
    /// implementation — confirm there.
    pub queue_size: Option<usize>,
}
23
24impl Default for RuntimeConfig {
25    fn default() -> Self {
26        Self {
27            queue_size: Some(100),
28        }
29    }
30}
31
32/// Error types for Session operations
33#[derive(Debug, thiserror::Error)]
34pub enum RuntimeError {
35    #[error("Send Message Error: {0}")]
36    SendMessage(String),
37
38    #[error("TopicTypeMismatch")]
39    TopicTypeMismatch(String, TypeId),
40
41    #[error("Join Error: {0}")]
42    JoinError(JoinError),
43
44    #[error("Event error: {0}")]
45    EventError(#[from] SendError<Event>),
46}
47
48#[async_trait]
49pub trait Runtime: Send + Sync {
50    fn id(&self) -> RuntimeID;
51
52    async fn subscribe_any(
53        &self,
54        topic_name: &str,
55        topic_type: TypeId,
56        actor: Arc<dyn AnyActor>,
57    ) -> Result<(), RuntimeError>;
58
59    async fn publish_any(
60        &self,
61        topic_name: &str,
62        topic_type: TypeId,
63        message: Arc<dyn Any + Send + Sync>,
64    ) -> Result<(), RuntimeError>;
65
66    //Local event processing event handler, This is passed to the agent handler and agents emit events using this, The runtime is responsible to move it to the parent environment
67    fn tx(&self) -> mpsc::Sender<Event>;
68    async fn transport(&self) -> Arc<dyn Transport>;
69    async fn take_event_receiver(&self) -> Option<BoxEventStream<Event>>;
70    async fn run(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>>;
71    async fn stop(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>>;
72}
73
74#[async_trait]
75pub trait TypedRuntime: Runtime {
76    async fn subscribe<M>(&self, topic: &Topic<M>, actor: ActorRef<M>) -> Result<(), RuntimeError>
77    where
78        M: CloneableMessage + 'static,
79    {
80        let arc_actor = Arc::new(actor) as Arc<dyn AnyActor>;
81        self.subscribe_any(topic.name(), TypeId::of::<M>(), arc_actor)
82            .await
83    }
84
85    async fn publish<M>(&self, topic: &Topic<M>, message: M) -> Result<(), RuntimeError>
86    where
87        M: CloneableMessage + 'static,
88    {
89        let arc_msg = Arc::new(message) as Arc<dyn Any + Send + Sync>;
90        self.publish_any(topic.name(), TypeId::of::<M>(), arc_msg)
91            .await
92    }
93
94    async fn send_message<M: CloneableMessage + 'static>(
95        &self,
96        message: M,
97        addr: ActorRef<M>,
98    ) -> Result<(), RuntimeError> {
99        addr.cast(message)
100            .map_err(|e| RuntimeError::SendMessage(e.to_string()))
101    }
102}
103
104// Auto-implement TypedRuntime for all Runtime implementations
105impl<T: Runtime + ?Sized> TypedRuntime for T {}