autoagents_core/runtime/
mod.rs

1use crate::actor::{AnyActor, CloneableMessage, Transport};
2use crate::protocol::{Event, RuntimeID};
3use async_trait::async_trait;
4use ractor::ActorRef;
5use std::any::{Any, TypeId};
6use std::fmt::Debug;
7use std::sync::Arc;
8use tokio::sync::mpsc;
9use tokio::sync::mpsc::error::SendError;
10use tokio::task::JoinError;
11
12pub(crate) mod manager;
13mod single_threaded;
14use crate::actor::Topic;
15use crate::utils::BoxEventStream;
16pub use single_threaded::SingleThreadedRuntime;
17
/// Tunable settings handed to a runtime instance when it is created.
#[derive(Clone, Debug)]
pub struct RuntimeConfig {
    /// Optional queue size. The `Default` implementation sets it to `Some(100)`.
    ///
    /// NOTE(review): presumably the capacity of the runtime's internal
    /// channel(s); how `None` is interpreted is decided by the concrete
    /// runtime implementation — confirm there.
    pub queue_size: Option<usize>,
}
23
24impl Default for RuntimeConfig {
25    fn default() -> Self {
26        Self {
27            queue_size: Some(100),
28        }
29    }
30}
31
32/// Error types for runtime operations and message routing.
33#[derive(Debug, thiserror::Error)]
34pub enum RuntimeError {
35    #[error("Send Message Error: {0}")]
36    SendMessage(String),
37
38    #[error("TopicTypeMismatch")]
39    TopicTypeMismatch(String, TypeId),
40
41    #[error("Join Error: {0}")]
42    JoinError(JoinError),
43
44    #[error("Event error: {0}")]
45    EventError(#[from] SendError<Event>),
46}
47
48/// Abstract runtime that manages actor subscriptions, pub/sub delivery, and
49/// emission of protocol events. Implementations can provide different threading
50/// or transport strategies.
51#[async_trait]
52pub trait Runtime: Send + Sync {
53    fn id(&self) -> RuntimeID;
54
55    async fn subscribe_any(
56        &self,
57        topic_name: &str,
58        topic_type: TypeId,
59        actor: Arc<dyn AnyActor>,
60    ) -> Result<(), RuntimeError>;
61
62    async fn publish_any(
63        &self,
64        topic_name: &str,
65        topic_type: TypeId,
66        message: Arc<dyn Any + Send + Sync>,
67    ) -> Result<(), RuntimeError>;
68
69    /// Local event processing sender. Agents receive this and emit protocol
70    /// `Event`s through it. The runtime is responsible for forwarding them to
71    /// the owning `Environment`.
72    fn tx(&self) -> mpsc::Sender<Event>;
73    async fn transport(&self) -> Arc<dyn Transport>;
74    async fn take_event_receiver(&self) -> Option<BoxEventStream<Event>>;
75    /// Run the runtime event loop and process internal messages until stopped.
76    async fn run(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>>;
77    /// Request shutdown of the runtime.
78    async fn stop(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>>;
79}
80
81/// Type-safe convenience layer over `Runtime` for strongly-typed topics and
82/// direct messaging to actors.
83#[async_trait]
84pub trait TypedRuntime: Runtime {
85    async fn subscribe<M>(&self, topic: &Topic<M>, actor: ActorRef<M>) -> Result<(), RuntimeError>
86    where
87        M: CloneableMessage + 'static,
88    {
89        let arc_actor = Arc::new(actor) as Arc<dyn AnyActor>;
90        self.subscribe_any(topic.name(), TypeId::of::<M>(), arc_actor)
91            .await
92    }
93
94    async fn publish<M>(&self, topic: &Topic<M>, message: M) -> Result<(), RuntimeError>
95    where
96        M: CloneableMessage + 'static,
97    {
98        let arc_msg = Arc::new(message) as Arc<dyn Any + Send + Sync>;
99        self.publish_any(topic.name(), TypeId::of::<M>(), arc_msg)
100            .await
101    }
102
103    async fn send_message<M: CloneableMessage + 'static>(
104        &self,
105        message: M,
106        addr: ActorRef<M>,
107    ) -> Result<(), RuntimeError> {
108        addr.cast(message)
109            .map_err(|e| RuntimeError::SendMessage(e.to_string()))
110    }
111}
112
113// Auto-implement TypedRuntime for all Runtime implementations
114impl<T: Runtime + ?Sized> TypedRuntime for T {}