use snafu::ResultExt;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use tokio::sync::broadcast;
use tracing::Level;
use wasmind_actor_utils::STARTING_SCOPE;
use wasmind_actor_utils::common_messages::actors;
use wasmind_actor_utils::messages::Message;
use crate::SerializationSnafu;
use crate::{WasmindResult, actors::MessageEnvelope, context::WasmindContext, scope::Scope};
pub struct WasmindCoordinator {
rx: broadcast::Receiver<MessageEnvelope>,
context: Arc<WasmindContext>,
ready_actors: HashMap<Scope, HashSet<String>>,
replayable: Vec<MessageEnvelope>,
}
impl WasmindCoordinator {
pub fn new(context: Arc<WasmindContext>) -> Self {
let rx = context.tx.subscribe();
Self {
rx,
context,
ready_actors: HashMap::new(),
replayable: vec![],
}
}
pub async fn start_wasmind(
&self,
starting_actors: &[&str],
root_agent_name: String,
) -> WasmindResult<Scope> {
self.context
.spawn_agent_in_scope(
starting_actors,
STARTING_SCOPE.to_string(),
root_agent_name,
None,
)
.await?;
Ok(STARTING_SCOPE.to_string())
}
pub async fn run(mut self) -> WasmindResult<()> {
loop {
match self.rx.recv().await {
Ok(msg) => {
let span = tracing::span!(
Level::ERROR,
"wasmind_coordinator_run",
correlation_id = msg.id
);
let _enter = span.enter();
let message_json =
if let Ok(json_string) = String::from_utf8(msg.payload.clone()) {
json_string
} else {
"na".to_string()
};
tracing::debug!(
name = "wasmind_coordinator_received_message",
scope = msg.from_scope,
actor_id = msg.from_actor_id,
message_type = msg.message_type,
message = %message_json
);
match msg.message_type.as_str() {
actors::ActorReady::MESSAGE_TYPE => {
self.handle_actor_ready(msg)?;
}
actors::Exit::MESSAGE_TYPE => {
if msg.from_scope == STARTING_SCOPE {
tracing::info!("Starting scope exited, shutting down system");
return Ok(());
}
tracing::info!("Scope {} is shutting down", msg.from_scope);
}
_ => {}
}
}
Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
tracing::error!("Coordinator receiver lagged by {} messages", n);
}
Err(tokio::sync::broadcast::error::RecvError::Closed) => {
tracing::error!("Channel closed");
return Err(crate::Error::ChannelClosed);
}
}
}
}
fn handle_actor_ready(&mut self, msg: MessageEnvelope) -> WasmindResult<()> {
let scope = msg.from_scope.clone();
self.ready_actors
.entry(scope.clone())
.or_default()
.insert(msg.from_actor_id.clone());
if let Some(expected_actors) = self.context.scope_tracking.lock().get(&scope) {
let ready_count = self.ready_actors.get(&scope).map(|s| s.len()).unwrap_or(0);
let expected_count = expected_actors.len();
tracing::debug!(
"Scope {} has {}/{} actors ready",
scope,
ready_count,
expected_count
);
if ready_count == expected_count {
tracing::info!("All actors ready for scope {}", scope);
let all_ready_msg = MessageEnvelope {
id: crate::utils::generate_root_correlation_id(),
message_type: actors::AllActorsReady::MESSAGE_TYPE.to_string(),
from_actor_id: "wasmind_coordinator".to_string(),
from_scope: scope.to_string(),
payload: serde_json::to_string(&actors::AllActorsReady)
.unwrap()
.into_bytes(),
};
if let Err(e) = self.context.tx.send(all_ready_msg) {
tracing::error!("Failed to broadcast AllActorsReady: {}", e);
}
for message in &self.replayable {
if let Err(e) = self.context.tx.send(message.clone()) {
tracing::error!("Failed to broadcast Replayable Message: {}", e);
}
}
}
}
Ok(())
}
pub fn broadcast_common_message<T>(&mut self, message: T, replayable: bool) -> WasmindResult<()>
where
T: wasmind_actor_utils::common_messages::Message + Clone,
{
self.broadcast_common_message_in_scope(message, &STARTING_SCOPE.to_string(), replayable)
}
pub fn broadcast_common_message_in_scope<T>(
&mut self,
message: T,
scope: &Scope,
replayable: bool,
) -> WasmindResult<()>
where
T: wasmind_actor_utils::common_messages::Message + Clone,
{
let message_envelope = MessageEnvelope {
id: crate::utils::generate_root_correlation_id(),
from_actor_id: "wasmind__coordinator".to_string(),
from_scope: scope.to_owned(),
message_type: T::MESSAGE_TYPE.to_string(),
payload: serde_json::to_vec(&message).context(SerializationSnafu {
message: "Failed to serialize message for broadcast",
})?,
};
if replayable {
self.replayable.push(message_envelope.clone());
}
self.context.broadcast(message_envelope)
}
pub fn get_sender(&self) -> broadcast::Sender<MessageEnvelope> {
self.context.tx.clone()
}
}