use crate::actor::{AnyActor, CloneableMessage, Transport};
use async_trait::async_trait;
use autoagents_protocol::{Event, RuntimeID};
use ractor::ActorRef;
use std::any::{Any, TypeId};
use std::fmt::Debug;
use std::sync::Arc;
use tokio::sync::mpsc;
use tokio::sync::mpsc::error::SendError;
use tokio::task::JoinError;
pub(crate) mod manager;
mod single_threaded;
use crate::actor::Topic;
use crate::utils::BoxEventStream;
pub use single_threaded::SingleThreadedRuntime;
#[derive(Debug, Clone)]
pub struct RuntimeConfig {
pub queue_size: Option<usize>,
}
impl Default for RuntimeConfig {
fn default() -> Self {
Self {
queue_size: Some(100),
}
}
}
#[derive(Debug, thiserror::Error)]
pub enum RuntimeError {
#[error("Send Message Error: {0}")]
SendMessage(String),
#[error("TopicTypeMismatch")]
TopicTypeMismatch(String, TypeId),
#[error("Join Error: {0}")]
JoinError(JoinError),
#[error("Event error: {0}")]
EventError(#[from] SendError<Event>),
}
#[async_trait]
pub trait Runtime: Send + Sync {
fn id(&self) -> RuntimeID;
async fn subscribe_any(
&self,
topic_name: &str,
topic_type: TypeId,
actor: Arc<dyn AnyActor>,
) -> Result<(), RuntimeError>;
async fn publish_any(
&self,
topic_name: &str,
topic_type: TypeId,
message: Arc<dyn Any + Send + Sync>,
) -> Result<(), RuntimeError>;
fn tx(&self) -> mpsc::Sender<Event>;
async fn transport(&self) -> Arc<dyn Transport>;
async fn take_event_receiver(&self) -> Option<BoxEventStream<Event>>;
async fn subscribe_events(&self) -> BoxEventStream<Event>;
async fn run(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>>;
async fn stop(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>>;
}
#[async_trait]
pub trait TypedRuntime: Runtime {
async fn subscribe<M>(&self, topic: &Topic<M>, actor: ActorRef<M>) -> Result<(), RuntimeError>
where
M: CloneableMessage + 'static,
{
let arc_actor = Arc::new(actor) as Arc<dyn AnyActor>;
self.subscribe_any(topic.name(), TypeId::of::<M>(), arc_actor)
.await
}
async fn publish<M>(&self, topic: &Topic<M>, message: M) -> Result<(), RuntimeError>
where
M: CloneableMessage + 'static,
{
let arc_msg = Arc::new(message) as Arc<dyn Any + Send + Sync>;
self.publish_any(topic.name(), TypeId::of::<M>(), arc_msg)
.await
}
async fn send_message<M: CloneableMessage + 'static>(
&self,
message: M,
addr: ActorRef<M>,
) -> Result<(), RuntimeError> {
addr.cast(message)
.map_err(|e| RuntimeError::SendMessage(e.to_string()))
}
}
impl<T: Runtime + ?Sized> TypedRuntime for T {}
#[cfg(test)]
mod tests {
use super::*;
use crate::actor::{ActorMessage, CloneableMessage, LocalTransport, Topic, Transport};
use async_trait::async_trait;
use futures::stream;
use tokio::sync::Mutex;
#[derive(Debug, Clone)]
struct TestMessage {
value: String,
}
impl ActorMessage for TestMessage {}
impl CloneableMessage for TestMessage {}
#[derive(Debug)]
struct TestRuntime {
published: Arc<Mutex<Vec<(String, TypeId, String)>>>,
tx: mpsc::Sender<Event>,
}
impl TestRuntime {
fn new() -> Self {
let (tx, _rx) = mpsc::channel(1);
Self {
published: Arc::new(Mutex::new(Vec::new())),
tx,
}
}
}
#[async_trait]
impl Runtime for TestRuntime {
fn id(&self) -> RuntimeID {
RuntimeID::new_v4()
}
async fn subscribe_any(
&self,
_topic_name: &str,
_topic_type: TypeId,
_actor: Arc<dyn AnyActor>,
) -> Result<(), RuntimeError> {
Ok(())
}
async fn publish_any(
&self,
topic_name: &str,
topic_type: TypeId,
message: Arc<dyn Any + Send + Sync>,
) -> Result<(), RuntimeError> {
let msg = message
.downcast_ref::<TestMessage>()
.map(|m| m.value.clone())
.unwrap_or_default();
let mut published = self.published.lock().await;
published.push((topic_name.to_string(), topic_type, msg));
Ok(())
}
fn tx(&self) -> mpsc::Sender<Event> {
self.tx.clone()
}
async fn transport(&self) -> Arc<dyn Transport> {
Arc::new(LocalTransport)
}
async fn take_event_receiver(&self) -> Option<BoxEventStream<Event>> {
None
}
async fn subscribe_events(&self) -> BoxEventStream<Event> {
Box::pin(stream::empty())
}
async fn run(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
Ok(())
}
async fn stop(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
Ok(())
}
}
#[tokio::test]
async fn test_typed_runtime_publish_records_message() {
let runtime = TestRuntime::new();
let topic = Topic::<TestMessage>::new("topic");
runtime
.publish(
&topic,
TestMessage {
value: "hello".to_string(),
},
)
.await
.unwrap();
let published = runtime.published.lock().await.clone();
assert_eq!(published.len(), 1);
assert_eq!(published[0].0, "topic");
assert_eq!(published[0].2, "hello");
}
}