use std::sync::Arc;
use tokio::sync::broadcast;
use tracing::{Instrument, Level};
use wasmind::actor::{agent, command, host_info, http, logger, messaging};
use wasmind_actor_utils::{common_messages::actors, messages::Message};
use wasmtime::{
Engine, Store,
component::{Component, HasSelf, Linker, ResourceAny, bindgen},
};
use crate::{context::WasmindContext, scope::Scope};
pub mod actor_state;
pub use actor_state::{ActorState, command::CommandResource};
use super::MessageEnvelope;
/// Identifier of an actor; a plain `String` alias.
pub type ActorId = String;
// Generates the host-side bindings (including `ActorWorld`, used below) for the
// `actor-world` world declared in `wit/world.wit`.
bindgen!({
world: "actor-world",
// Map these WIT resources onto host-defined Rust types instead of generated ones.
with: {
"wasmind:actor/command/cmd": CommandResource,
"wasmind:actor/http/request": actor_state::http::HttpRequestResource,
},
path: "wit/world.wit",
// Both imported and exported functions are configured as async by default;
// this is why the generated calls further down are `.await`ed.
imports: { default: async },
exports: { default: async },
});
/// Envelopes are compared by `id` only; every other field is ignored.
impl PartialEq for MessageEnvelope {
    fn eq(&self, rhs: &Self) -> bool {
        let (lhs_id, rhs_id) = (&self.id, &rhs.id);
        lhs_id == rhs_id
    }
}
/// Envelopes have no meaningful ordering, so two different envelopes are
/// always incomparable (`None`).
///
/// Envelopes that are equal under `PartialEq` (same `id`) compare as
/// `Some(Ordering::Equal)`. The `PartialOrd` contract requires that
/// `a == b` holds exactly when `partial_cmp(a, b) == Some(Equal)`; returning
/// `None` unconditionally would break it (e.g. `a <= a` would be `false`).
impl PartialOrd for MessageEnvelope {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        (self == other).then_some(std::cmp::Ordering::Equal)
    }
}
// Marker impl: declares the `PartialEq` relation on `MessageEnvelope` to be a
// full equivalence relation.
impl Eq for MessageEnvelope {}
/// Everything `Manager::new` needs to build and start a single actor.
pub struct ManagerParams<'a> {
/// Identifier of the actor being created.
pub actor_id: ActorId,
/// Binary of the WebAssembly component; compiled with `Component::from_binary`.
pub wasm: &'a [u8],
/// Scope the actor runs in; handed to the actor state and, as a string, to
/// the guest constructor.
pub scope: Scope,
/// Sending half of the broadcast channel; cloned into the actor state and
/// kept by the manager.
pub tx: broadcast::Sender<MessageEnvelope>,
/// Receiving half of the broadcast channel the manager reads messages from.
pub rx: broadcast::Receiver<MessageEnvelope>,
/// Shared context passed into the actor state.
pub context: Arc<WasmindContext>,
/// Optional actor configuration; serialized to a TOML string for the guest
/// constructor (an empty string is passed when absent).
pub actor_config: Option<toml::Table>,
/// Wasmtime engine used to compile the component and create the store and linker.
pub engine: Engine,
}
/// Owns one instantiated actor component and the channel endpoints used to
/// feed it messages.
pub struct Manager {
/// Identifier of the managed actor.
actor_id: ActorId,
/// Instantiated bindings for the `actor-world` component.
actor_world: ActorWorld,
/// Handle to the guest-side actor resource returned by the guest constructor.
actor_resource: ResourceAny,
/// Wasmtime store holding the actor's host-side `ActorState`.
store: Store<ActorState>,
/// Broadcast sender, used to announce `ActorReady`.
tx: broadcast::Sender<MessageEnvelope>,
/// Broadcast receiver the message loop reads from.
rx: broadcast::Receiver<MessageEnvelope>,
/// Scope of this actor; compared against incoming `Exit` messages.
scope: Scope,
}
impl Manager {
pub async fn new(params: ManagerParams<'_>) -> Self {
let component = match Component::from_binary(¶ms.engine, params.wasm) {
Ok(component) => component,
Err(e) => panic!(
"Error creating wasm component for: {} - {e:?}",
params.actor_id
),
};
let mut store = Store::new(
¶ms.engine,
ActorState::new(
params.actor_id.clone(),
params.scope.clone(),
params.tx.clone(),
params.context,
),
);
let mut linker = Linker::new(¶ms.engine);
wasmtime_wasi::p2::add_to_linker_async(&mut linker).unwrap();
messaging::add_to_linker::<_, HasSelf<_>>(&mut linker, |state| state).unwrap();
command::add_to_linker::<_, HasSelf<_>>(&mut linker, |state| state).unwrap();
http::add_to_linker::<_, HasSelf<_>>(&mut linker, |state| state).unwrap();
logger::add_to_linker::<_, HasSelf<_>>(&mut linker, |state| state).unwrap();
agent::add_to_linker::<_, HasSelf<_>>(&mut linker, |state| state).unwrap();
host_info::add_to_linker::<_, HasSelf<_>>(&mut linker, |state| state).unwrap();
let actor_world = ActorWorld::instantiate_async(&mut store, &component, &linker)
.await
.unwrap();
let config_str = params
.actor_config
.map(|c| toml::to_string(&c).unwrap_or_default())
.unwrap_or_default();
let actor_resource = actor_world
.wasmind_actor_actor()
.actor()
.call_constructor(&mut store, ¶ms.scope.to_string(), &config_str)
.await
.expect("Actor constructor (new/main function) failed - check logs for detailed error message");
Manager {
actor_id: params.actor_id,
store,
actor_resource,
actor_world,
tx: params.tx,
rx: params.rx,
scope: params.scope,
}
}
pub fn run(mut self) {
tracing::info_span!("actor_lifecycle", actor_id = self.actor_id).in_scope(move || {
tokio::spawn(async move {
let _ = self.tx.send(MessageEnvelope {
id: crate::utils::generate_root_correlation_id(),
message_type: actors::ActorReady::MESSAGE_TYPE.to_string(),
from_actor_id: self.actor_id.to_string(),
from_scope: self.scope.to_string(),
payload: serde_json::to_string(&actors::ActorReady)
.unwrap()
.into_bytes(),
});
loop {
match self.rx.recv().await {
Ok(msg) => {
if msg.from_scope == self.scope
&& msg.message_type == actors::Exit::MESSAGE_TYPE
{
break;
} else {
let span = tracing::span!(
Level::ERROR,
"wasmind_actor_manager",
correlation_id = msg.id
);
self.store.data_mut().current_message_id = Some(msg.id.clone());
if let Err(e) = self
.actor_world
.wasmind_actor_actor()
.actor()
.call_handle_message(&mut self.store, self.actor_resource, &msg)
.instrument(span)
.await
{
tracing::error!("Calling handle_message: {e:?}");
}
self.store.data_mut().current_message_id = None;
}
}
Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
tracing::error!(
"Receiver lagged by {n} messages! This was unexpected.",
);
}
Err(tokio::sync::broadcast::error::RecvError::Closed) => {
tracing::error!("Channel closed");
}
}
}
if let Err(e) = self
.actor_world
.wasmind_actor_actor()
.actor()
.call_destructor(&mut self.store, self.actor_resource)
.await
{
tracing::error!("Calling destructor: {e:?}");
}
});
});
}
}