// autoagents-core 0.3.7
//
// Agent Framework for Building Autonomous Agents
// Documentation
use crate::actor::{AnyActor, CloneableMessage, Transport};
use async_trait::async_trait;
use autoagents_protocol::{Event, RuntimeID};
use ractor::ActorRef;
use std::any::{Any, TypeId};
use std::fmt::Debug;
use std::sync::Arc;
use tokio::sync::mpsc;
use tokio::sync::mpsc::error::SendError;
use tokio::task::JoinError;

pub(crate) mod manager;
mod single_threaded;
use crate::actor::Topic;
use crate::utils::BoxEventStream;
pub use single_threaded::SingleThreadedRuntime;

/// Tunable settings supplied when constructing a runtime instance.
#[derive(Debug, Clone)]
pub struct RuntimeConfig {
    /// Optional queue size.
    ///
    /// NOTE(review): presumably the capacity of the runtime's internal
    /// message/event channels, with `None` left to the implementation to
    /// interpret — confirm against the concrete runtime.
    pub queue_size: Option<usize>,
}

impl Default for RuntimeConfig {
    /// Builds the default configuration, in which `queue_size` is `Some(100)`.
    fn default() -> Self {
        const DEFAULT_QUEUE_SIZE: usize = 100;

        RuntimeConfig {
            queue_size: Some(DEFAULT_QUEUE_SIZE),
        }
    }
}

/// Error types for runtime operations and message routing.
#[derive(Debug, thiserror::Error)]
pub enum RuntimeError {
    #[error("Send Message Error: {0}")]
    SendMessage(String),

    #[error("TopicTypeMismatch")]
    TopicTypeMismatch(String, TypeId),

    #[error("Join Error: {0}")]
    JoinError(JoinError),

    #[error("Event error: {0}")]
    EventError(#[from] SendError<Event>),
}

/// Abstract runtime that manages actor subscriptions, pub/sub delivery, and
/// emission of protocol events. Implementations can provide different threading
/// or transport strategies.
#[async_trait]
pub trait Runtime: Send + Sync {
    /// Returns the identifier of this runtime.
    fn id(&self) -> RuntimeID;

    /// Type-erased subscription: registers `actor` for the topic named
    /// `topic_name`. `topic_type` is the `TypeId` of the topic's message type
    /// (`TypedRuntime::subscribe` passes `TypeId::of::<M>()`).
    ///
    /// NOTE(review): presumably reports `RuntimeError::TopicTypeMismatch` when
    /// the topic is already bound to a different message type — confirm
    /// against the concrete implementations.
    async fn subscribe_any(
        &self,
        topic_name: &str,
        topic_type: TypeId,
        actor: Arc<dyn AnyActor>,
    ) -> Result<(), RuntimeError>;

    /// Type-erased publish: hands `message` to the runtime for the topic named
    /// `topic_name`. `topic_type` is the `TypeId` of the concrete message type
    /// behind the `Arc<dyn Any>` (`TypedRuntime::publish` passes
    /// `TypeId::of::<M>()`).
    async fn publish_any(
        &self,
        topic_name: &str,
        topic_type: TypeId,
        message: Arc<dyn Any + Send + Sync>,
    ) -> Result<(), RuntimeError>;

    /// Local event processing sender. Agents receive this and emit protocol
    /// `Event`s through it. The runtime is responsible for forwarding them to
    /// the owning `Environment`.
    fn tx(&self) -> mpsc::Sender<Event>;
    /// Returns the `Transport` associated with this runtime.
    ///
    /// NOTE(review): presumably the mechanism used to deliver messages to
    /// actors — confirm against the `Transport` trait.
    async fn transport(&self) -> Arc<dyn Transport>;
    /// Takes the protocol event stream out of the runtime, if one is available.
    ///
    /// NOTE(review): the `Option` and the "without consuming" wording on
    /// `subscribe_events` suggest this yields `Some` only once — confirm.
    async fn take_event_receiver(&self) -> Option<BoxEventStream<Event>>;
    /// Subscribe to runtime protocol events without consuming the receiver.
    async fn subscribe_events(&self) -> BoxEventStream<Event>;
    /// Run the runtime event loop and process internal messages until stopped.
    async fn run(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>>;
    /// Request shutdown of the runtime.
    async fn stop(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>>;
}

/// Strongly-typed façade over [`Runtime`]: wraps the type-erased
/// `subscribe_any` / `publish_any` entry points with `Topic<M>`-based methods
/// and adds direct actor messaging.
#[async_trait]
pub trait TypedRuntime: Runtime {
    /// Subscribes `actor` to `topic`.
    ///
    /// The actor reference is type-erased into an `Arc<dyn AnyActor>` and
    /// forwarded to `subscribe_any` together with the topic name and the
    /// `TypeId` of `M`.
    async fn subscribe<M>(&self, topic: &Topic<M>, actor: ActorRef<M>) -> Result<(), RuntimeError>
    where
        M: CloneableMessage + 'static,
    {
        let erased_actor: Arc<dyn AnyActor> = Arc::new(actor);
        let message_type = TypeId::of::<M>();
        self.subscribe_any(topic.name(), message_type, erased_actor).await
    }

    /// Publishes `message` on `topic`.
    ///
    /// The message is type-erased into an `Arc<dyn Any + Send + Sync>` and
    /// forwarded to `publish_any` together with the topic name and the
    /// `TypeId` of `M`.
    async fn publish<M>(&self, topic: &Topic<M>, message: M) -> Result<(), RuntimeError>
    where
        M: CloneableMessage + 'static,
    {
        let payload: Arc<dyn Any + Send + Sync> = Arc::new(message);
        let message_type = TypeId::of::<M>();
        self.publish_any(topic.name(), message_type, payload).await
    }

    /// Sends `message` directly to the actor at `addr`, bypassing topics.
    ///
    /// # Errors
    ///
    /// Returns `RuntimeError::SendMessage` carrying the stringified cause when
    /// the cast to the actor fails.
    async fn send_message<M: CloneableMessage + 'static>(
        &self,
        message: M,
        addr: ActorRef<M>,
    ) -> Result<(), RuntimeError> {
        match addr.cast(message) {
            Ok(()) => Ok(()),
            Err(cause) => Err(RuntimeError::SendMessage(cause.to_string())),
        }
    }
}

// Blanket implementation: every `Runtime` (sized or not) picks up the typed
// convenience methods of `TypedRuntime` without writing an impl of its own.
impl<T> TypedRuntime for T where T: Runtime + ?Sized {}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::actor::{ActorMessage, CloneableMessage, LocalTransport, Topic, Transport};
    use async_trait::async_trait;
    use futures::stream;
    use tokio::sync::Mutex;

    /// Minimal message type used to exercise the typed publish path.
    #[derive(Debug, Clone)]
    struct TestMessage {
        value: String,
    }

    impl ActorMessage for TestMessage {}
    impl CloneableMessage for TestMessage {}

    /// Stub runtime that records every `publish_any` call as
    /// `(topic name, topic TypeId, TestMessage value)` and treats every other
    /// operation as a no-op.
    #[derive(Debug)]
    struct TestRuntime {
        published: Arc<Mutex<Vec<(String, TypeId, String)>>>,
        tx: mpsc::Sender<Event>,
    }

    impl TestRuntime {
        /// Creates a runtime with an empty publish log.
        ///
        /// The receiving half of the event channel is dropped immediately, so
        /// sends on `tx` would fail; no test here emits events.
        fn new() -> Self {
            let (tx, _rx) = mpsc::channel(1);
            Self {
                published: Arc::new(Mutex::new(Vec::new())),
                tx,
            }
        }
    }

    #[async_trait]
    impl Runtime for TestRuntime {
        /// Returns a freshly generated ID on every call; the tests never
        /// compare runtime IDs.
        fn id(&self) -> RuntimeID {
            RuntimeID::new_v4()
        }

        /// Accepts every subscription without recording it.
        async fn subscribe_any(
            &self,
            _topic_name: &str,
            _topic_type: TypeId,
            _actor: Arc<dyn AnyActor>,
        ) -> Result<(), RuntimeError> {
            Ok(())
        }

        /// Appends the call to the publish log. Payloads that are not a
        /// `TestMessage` are logged with an empty string as their value.
        async fn publish_any(
            &self,
            topic_name: &str,
            topic_type: TypeId,
            message: Arc<dyn Any + Send + Sync>,
        ) -> Result<(), RuntimeError> {
            let msg = message
                .downcast_ref::<TestMessage>()
                .map(|m| m.value.clone())
                .unwrap_or_default();
            let mut published = self.published.lock().await;
            published.push((topic_name.to_string(), topic_type, msg));
            Ok(())
        }

        fn tx(&self) -> mpsc::Sender<Event> {
            self.tx.clone()
        }

        async fn transport(&self) -> Arc<dyn Transport> {
            Arc::new(LocalTransport)
        }

        async fn take_event_receiver(&self) -> Option<BoxEventStream<Event>> {
            None
        }

        async fn subscribe_events(&self) -> BoxEventStream<Event> {
            Box::pin(stream::empty())
        }

        async fn run(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
            Ok(())
        }

        async fn stop(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
            Ok(())
        }
    }

    /// `TypedRuntime::publish` must forward the topic name, the `TypeId` of
    /// the message type, and the message itself to `publish_any`.
    #[tokio::test]
    async fn test_typed_runtime_publish_records_message() {
        let runtime = TestRuntime::new();
        let topic = Topic::<TestMessage>::new("topic");
        runtime
            .publish(
                &topic,
                TestMessage {
                    value: "hello".to_string(),
                },
            )
            .await
            .unwrap();

        // Inspect the log through the guard; no need to clone the whole Vec.
        let published = runtime.published.lock().await;
        assert_eq!(published.len(), 1);
        assert_eq!(published[0].0, "topic");
        // The recorded TypeId was previously never checked, so a wrong type
        // id forwarded by `publish` would have gone unnoticed.
        assert_eq!(published[0].1, TypeId::of::<TestMessage>());
        assert_eq!(published[0].2, "hello");
    }
}