1use snafu::ResultExt;
2use std::collections::{HashMap, HashSet};
3use std::sync::Arc;
4use tokio::sync::broadcast;
5use tracing::Level;
6use wasmind_actor_utils::STARTING_SCOPE;
7use wasmind_actor_utils::common_messages::actors;
8use wasmind_actor_utils::messages::Message;
9
10use crate::SerializationSnafu;
11use crate::{WasmindResult, actors::MessageEnvelope, context::WasmindContext, scope::Scope};
12
13pub struct WasmindCoordinator {
15 rx: broadcast::Receiver<MessageEnvelope>,
17
18 context: Arc<WasmindContext>,
20
21 ready_actors: HashMap<Scope, HashSet<String>>,
23
24 replayable: Vec<MessageEnvelope>,
26}
27
28impl WasmindCoordinator {
29 pub fn new(context: Arc<WasmindContext>) -> Self {
30 let rx = context.tx.subscribe();
31 Self {
32 rx,
33 context,
34 ready_actors: HashMap::new(),
35 replayable: vec![],
36 }
37 }
38
39 pub async fn start_wasmind(
40 &self,
41 starting_actors: &[&str],
42 root_agent_name: String,
43 ) -> WasmindResult<Scope> {
44 self.context
45 .spawn_agent_in_scope(
46 starting_actors,
47 STARTING_SCOPE.to_string(),
48 root_agent_name,
49 None,
50 )
51 .await?;
52 Ok(STARTING_SCOPE.to_string())
53 }
54
55 pub async fn run(mut self) -> WasmindResult<()> {
57 loop {
58 match self.rx.recv().await {
59 Ok(msg) => {
60 let span = tracing::span!(
61 Level::ERROR,
62 "wasmind_coordinator_run",
63 correlation_id = msg.id
64 );
65 let _enter = span.enter();
66
67 let message_json =
68 if let Ok(json_string) = String::from_utf8(msg.payload.clone()) {
69 json_string
70 } else {
71 "na".to_string()
72 };
73 tracing::debug!(
74 name = "wasmind_coordinator_received_message",
75 scope = msg.from_scope,
76 actor_id = msg.from_actor_id,
77 message_type = msg.message_type,
78 message = %message_json
79 );
80
81 match msg.message_type.as_str() {
82 actors::ActorReady::MESSAGE_TYPE => {
83 self.handle_actor_ready(msg)?;
84 }
85 actors::Exit::MESSAGE_TYPE => {
86 if msg.from_scope == STARTING_SCOPE {
87 tracing::info!("Starting scope exited, shutting down system");
88 return Ok(());
89 }
90 tracing::info!("Scope {} is shutting down", msg.from_scope);
91 }
92 _ => {}
93 }
94 }
95 Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
96 tracing::error!("Coordinator receiver lagged by {} messages", n);
97 }
98 Err(tokio::sync::broadcast::error::RecvError::Closed) => {
99 tracing::error!("Channel closed");
100 return Err(crate::Error::ChannelClosed);
101 }
102 }
103 }
104 }
105
106 fn handle_actor_ready(&mut self, msg: MessageEnvelope) -> WasmindResult<()> {
107 let scope = msg.from_scope.clone();
108
109 self.ready_actors
110 .entry(scope.clone())
111 .or_default()
112 .insert(msg.from_actor_id.clone());
113
114 if let Some(expected_actors) = self.context.scope_tracking.lock().get(&scope) {
115 let ready_count = self.ready_actors.get(&scope).map(|s| s.len()).unwrap_or(0);
116 let expected_count = expected_actors.len();
117
118 tracing::debug!(
119 "Scope {} has {}/{} actors ready",
120 scope,
121 ready_count,
122 expected_count
123 );
124
125 if ready_count == expected_count {
126 tracing::info!("All actors ready for scope {}", scope);
127
128 let all_ready_msg = MessageEnvelope {
129 id: crate::utils::generate_root_correlation_id(),
130 message_type: actors::AllActorsReady::MESSAGE_TYPE.to_string(),
131 from_actor_id: "wasmind_coordinator".to_string(),
132 from_scope: scope.to_string(),
133 payload: serde_json::to_string(&actors::AllActorsReady)
134 .unwrap()
135 .into_bytes(),
136 };
137
138 if let Err(e) = self.context.tx.send(all_ready_msg) {
139 tracing::error!("Failed to broadcast AllActorsReady: {}", e);
140 }
141
142 for message in &self.replayable {
143 if let Err(e) = self.context.tx.send(message.clone()) {
144 tracing::error!("Failed to broadcast Replayable Message: {}", e);
145 }
146 }
147 }
148 }
149
150 Ok(())
151 }
152
153 pub fn broadcast_common_message<T>(&mut self, message: T, replayable: bool) -> WasmindResult<()>
154 where
155 T: wasmind_actor_utils::common_messages::Message + Clone,
156 {
157 self.broadcast_common_message_in_scope(message, &STARTING_SCOPE.to_string(), replayable)
158 }
159
160 pub fn broadcast_common_message_in_scope<T>(
161 &mut self,
162 message: T,
163 scope: &Scope,
164 replayable: bool,
165 ) -> WasmindResult<()>
166 where
167 T: wasmind_actor_utils::common_messages::Message + Clone,
168 {
169 let message_envelope = MessageEnvelope {
170 id: crate::utils::generate_root_correlation_id(),
171 from_actor_id: "wasmind__coordinator".to_string(),
172 from_scope: scope.to_owned(),
173 message_type: T::MESSAGE_TYPE.to_string(),
174 payload: serde_json::to_vec(&message).context(SerializationSnafu {
175 message: "Failed to serialize message for broadcast",
176 })?,
177 };
178 if replayable {
179 self.replayable.push(message_envelope.clone());
180 }
181 self.context.broadcast(message_envelope)
182 }
183
184 pub fn get_sender(&self) -> broadcast::Sender<MessageEnvelope> {
186 self.context.tx.clone()
187 }
188}