wasmind 0.1.0

Core actor-based coordination runtime for AI agent workflows
Documentation
use std::sync::Arc;
use tokio::sync::broadcast;
use tracing::{Instrument, Level};
use wasmind::actor::{agent, command, host_info, http, logger, messaging};
use wasmind_actor_utils::{common_messages::actors, messages::Message};
use wasmtime::{
    Engine, Store,
    component::{Component, HasSelf, Linker, ResourceAny, bindgen},
};

use crate::{context::WasmindContext, scope::Scope};

pub mod actor_state;
pub use actor_state::{ActorState, command::CommandResource};

use super::MessageEnvelope;

/// Identifier of an actor; a plain `String` alias.
pub type ActorId = String;

// Generates the host-side bindings for the `actor-world` WIT world found in
// `wit/world.wit` (presumably including the `ActorWorld` type used by
// `Manager` below — confirm against the generated code).
bindgen!({
    world: "actor-world",
    // Back these WIT resources with host-defined Rust types instead of
    // letting the macro generate its own representation.
    with: {
        "wasmind:actor/command/cmd": CommandResource,
        "wasmind:actor/http/request": actor_state::http::HttpRequestResource,
    },
    path: "wit/world.wit",
    // Imports and exports default to async; the calls made through the
    // generated bindings in `Manager` are `.await`ed accordingly.
    imports: { default: async },
    exports: { default: async },
});

/// Envelopes are compared by `id` alone; every other field is ignored.
impl PartialEq for MessageEnvelope {
    fn eq(&self, other: &Self) -> bool {
        let (ours, theirs) = (&self.id, &other.id);
        ours == theirs
    }
}

/// Envelopes carry no meaningful ordering: two envelopes with different `id`s
/// are incomparable (`None`).
///
/// Envelopes with the same `id` compare as `Equal`. This keeps `partial_cmp`
/// consistent with the id-based `PartialEq` impl, as the `PartialOrd`
/// contract requires (`a == b` if and only if
/// `partial_cmp(a, b) == Some(Equal)`).
impl PartialOrd for MessageEnvelope {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        (self.id == other.id).then_some(std::cmp::Ordering::Equal)
    }
}

// Marker impl: declares the id-based equality from the `PartialEq` impl above
// to be a full equivalence relation.
impl Eq for MessageEnvelope {}

/// Everything [`Manager::new`] needs to build and start one actor.
pub struct ManagerParams<'a> {
    /// Id of the actor being created; stored in the actor state and the manager.
    pub actor_id: ActorId,
    /// Binary of the actor's wasm component, handed to `Component::from_binary`.
    pub wasm: &'a [u8],
    /// Scope the actor runs in; passed (as a string) to the actor constructor.
    pub scope: Scope,
    /// Sender side of the broadcast channel; a clone is also given to the actor state.
    pub tx: broadcast::Sender<MessageEnvelope>,
    /// Receiver the manager's run loop reads incoming envelopes from.
    pub rx: broadcast::Receiver<MessageEnvelope>,
    /// Shared context handed to `ActorState::new`.
    pub context: Arc<WasmindContext>,
    /// Optional actor configuration; serialized to a TOML string for the actor
    /// constructor. `None` results in an empty string.
    pub actor_config: Option<toml::Table>,
    /// Wasmtime engine used to compile the component and create the store and linker.
    pub engine: Engine,
}

/// Owns one instantiated actor component and drives its message loop.
pub struct Manager {
    /// Id of the managed actor; used as `from_actor_id` on the `ActorReady`
    /// envelope and recorded on the lifecycle span.
    actor_id: ActorId,
    /// Instantiated bindings through which the actor's exports are called.
    actor_world: ActorWorld,
    /// Handle returned by the actor's constructor; passed to `handle_message`
    /// and to the destructor.
    actor_resource: ResourceAny,
    /// Wasmtime store holding the actor's host-side `ActorState`.
    store: Store<ActorState>,
    /// Sender used to broadcast the `ActorReady` envelope.
    tx: broadcast::Sender<MessageEnvelope>,
    /// Receiver the run loop reads envelopes from.
    rx: broadcast::Receiver<MessageEnvelope>,
    /// Scope of the actor; an `Exit` message from this scope stops the run loop.
    scope: Scope,
}

impl Manager {
    pub async fn new(params: ManagerParams<'_>) -> Self {
        let component = match Component::from_binary(&params.engine, params.wasm) {
            Ok(component) => component,
            Err(e) => panic!(
                "Error creating wasm component for: {} - {e:?}",
                params.actor_id
            ),
        };

        let mut store = Store::new(
            &params.engine,
            ActorState::new(
                params.actor_id.clone(),
                params.scope.clone(),
                params.tx.clone(),
                params.context,
            ),
        );

        let mut linker = Linker::new(&params.engine);
        wasmtime_wasi::p2::add_to_linker_async(&mut linker).unwrap();
        messaging::add_to_linker::<_, HasSelf<_>>(&mut linker, |state| state).unwrap();
        command::add_to_linker::<_, HasSelf<_>>(&mut linker, |state| state).unwrap();
        http::add_to_linker::<_, HasSelf<_>>(&mut linker, |state| state).unwrap();
        logger::add_to_linker::<_, HasSelf<_>>(&mut linker, |state| state).unwrap();
        agent::add_to_linker::<_, HasSelf<_>>(&mut linker, |state| state).unwrap();
        host_info::add_to_linker::<_, HasSelf<_>>(&mut linker, |state| state).unwrap();

        let actor_world = ActorWorld::instantiate_async(&mut store, &component, &linker)
            .await
            .unwrap();

        let config_str = params
            .actor_config
            .map(|c| toml::to_string(&c).unwrap_or_default())
            .unwrap_or_default();

        let actor_resource = actor_world
            .wasmind_actor_actor()
            .actor()
            .call_constructor(&mut store, &params.scope.to_string(), &config_str)
            .await
            .expect("Actor constructor (new/main function) failed - check logs for detailed error message");

        Manager {
            actor_id: params.actor_id,
            store,
            actor_resource,
            actor_world,
            tx: params.tx,
            rx: params.rx,
            scope: params.scope,
        }
    }

    pub fn run(mut self) {
        tracing::info_span!("actor_lifecycle", actor_id = self.actor_id).in_scope(move || {
            tokio::spawn(async move {
                let _ = self.tx.send(MessageEnvelope {
                    id: crate::utils::generate_root_correlation_id(),
                    message_type: actors::ActorReady::MESSAGE_TYPE.to_string(),
                    from_actor_id: self.actor_id.to_string(),
                    from_scope: self.scope.to_string(),
                    payload: serde_json::to_string(&actors::ActorReady)
                        .unwrap()
                        .into_bytes(),
                });

                loop {
                    match self.rx.recv().await {
                        Ok(msg) => {
                            if msg.from_scope == self.scope
                                && msg.message_type == actors::Exit::MESSAGE_TYPE
                            {
                                break;
                            } else {
                                let span = tracing::span!(
                                    Level::ERROR,
                                    "wasmind_actor_manager",
                                    correlation_id = msg.id
                                );

                                self.store.data_mut().current_message_id = Some(msg.id.clone());

                                if let Err(e) = self
                                    .actor_world
                                    .wasmind_actor_actor()
                                    .actor()
                                    .call_handle_message(&mut self.store, self.actor_resource, &msg)
                                    .instrument(span)
                                    .await
                                {
                                    tracing::error!("Calling handle_message: {e:?}");
                                }

                                self.store.data_mut().current_message_id = None;
                            }
                        }
                        Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
                            tracing::error!(
                                "Receiver lagged by {n} messages! This was unexpected.",
                            );
                        }
                        Err(tokio::sync::broadcast::error::RecvError::Closed) => {
                            tracing::error!("Channel closed");
                        }
                    }
                }

                if let Err(e) = self
                    .actor_world
                    .wasmind_actor_actor()
                    .actor()
                    .call_destructor(&mut self.store, self.actor_resource)
                    .await
                {
                    tracing::error!("Calling destructor: {e:?}");
                }
            });
        });
    }
}