wasmind/
coordinator.rs

1use snafu::ResultExt;
2use std::collections::{HashMap, HashSet};
3use std::sync::Arc;
4use tokio::sync::broadcast;
5use tracing::Level;
6use wasmind_actor_utils::STARTING_SCOPE;
7use wasmind_actor_utils::common_messages::actors;
8use wasmind_actor_utils::messages::Message;
9
10use crate::SerializationSnafu;
11use crate::{WasmindResult, actors::MessageEnvelope, context::WasmindContext, scope::Scope};
12
13/// Coordinator that monitors actor lifecycle and system exit
14pub struct WasmindCoordinator {
15    /// Receiver for monitoring messages
16    rx: broadcast::Receiver<MessageEnvelope>,
17
18    /// Reference to context for scope tracking info
19    context: Arc<WasmindContext>,
20
21    /// Track which actors have sent ActorReady per scope
22    ready_actors: HashMap<Scope, HashSet<String>>,
23
24    /// Replayable messages which are broadcasted everytime a new agent is spawned
25    replayable: Vec<MessageEnvelope>,
26}
27
28impl WasmindCoordinator {
29    pub fn new(context: Arc<WasmindContext>) -> Self {
30        let rx = context.tx.subscribe();
31        Self {
32            rx,
33            context,
34            ready_actors: HashMap::new(),
35            replayable: vec![],
36        }
37    }
38
39    pub async fn start_wasmind(
40        &self,
41        starting_actors: &[&str],
42        root_agent_name: String,
43    ) -> WasmindResult<Scope> {
44        self.context
45            .spawn_agent_in_scope(
46                starting_actors,
47                STARTING_SCOPE.to_string(),
48                root_agent_name,
49                None,
50            )
51            .await?;
52        Ok(STARTING_SCOPE.to_string())
53    }
54
55    /// Run the coordinator until system exit
56    pub async fn run(mut self) -> WasmindResult<()> {
57        loop {
58            match self.rx.recv().await {
59                Ok(msg) => {
60                    let span = tracing::span!(
61                        Level::ERROR,
62                        "wasmind_coordinator_run",
63                        correlation_id = msg.id
64                    );
65                    let _enter = span.enter();
66
67                    let message_json =
68                        if let Ok(json_string) = String::from_utf8(msg.payload.clone()) {
69                            json_string
70                        } else {
71                            "na".to_string()
72                        };
73                    tracing::debug!(
74                        name = "wasmind_coordinator_received_message",
75                        scope = msg.from_scope,
76                        actor_id = msg.from_actor_id,
77                        message_type = msg.message_type,
78                        message = %message_json
79                    );
80
81                    match msg.message_type.as_str() {
82                        actors::ActorReady::MESSAGE_TYPE => {
83                            self.handle_actor_ready(msg)?;
84                        }
85                        actors::Exit::MESSAGE_TYPE => {
86                            if msg.from_scope == STARTING_SCOPE {
87                                tracing::info!("Starting scope exited, shutting down system");
88                                return Ok(());
89                            }
90                            tracing::info!("Scope {} is shutting down", msg.from_scope);
91                        }
92                        _ => {}
93                    }
94                }
95                Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
96                    tracing::error!("Coordinator receiver lagged by {} messages", n);
97                }
98                Err(tokio::sync::broadcast::error::RecvError::Closed) => {
99                    tracing::error!("Channel closed");
100                    return Err(crate::Error::ChannelClosed);
101                }
102            }
103        }
104    }
105
106    fn handle_actor_ready(&mut self, msg: MessageEnvelope) -> WasmindResult<()> {
107        let scope = msg.from_scope.clone();
108
109        self.ready_actors
110            .entry(scope.clone())
111            .or_default()
112            .insert(msg.from_actor_id.clone());
113
114        if let Some(expected_actors) = self.context.scope_tracking.lock().get(&scope) {
115            let ready_count = self.ready_actors.get(&scope).map(|s| s.len()).unwrap_or(0);
116            let expected_count = expected_actors.len();
117
118            tracing::debug!(
119                "Scope {} has {}/{} actors ready",
120                scope,
121                ready_count,
122                expected_count
123            );
124
125            if ready_count == expected_count {
126                tracing::info!("All actors ready for scope {}", scope);
127
128                let all_ready_msg = MessageEnvelope {
129                    id: crate::utils::generate_root_correlation_id(),
130                    message_type: actors::AllActorsReady::MESSAGE_TYPE.to_string(),
131                    from_actor_id: "wasmind_coordinator".to_string(),
132                    from_scope: scope.to_string(),
133                    payload: serde_json::to_string(&actors::AllActorsReady)
134                        .unwrap()
135                        .into_bytes(),
136                };
137
138                if let Err(e) = self.context.tx.send(all_ready_msg) {
139                    tracing::error!("Failed to broadcast AllActorsReady: {}", e);
140                }
141
142                for message in &self.replayable {
143                    if let Err(e) = self.context.tx.send(message.clone()) {
144                        tracing::error!("Failed to broadcast Replayable Message: {}", e);
145                    }
146                }
147            }
148        }
149
150        Ok(())
151    }
152
153    pub fn broadcast_common_message<T>(&mut self, message: T, replayable: bool) -> WasmindResult<()>
154    where
155        T: wasmind_actor_utils::common_messages::Message + Clone,
156    {
157        self.broadcast_common_message_in_scope(message, &STARTING_SCOPE.to_string(), replayable)
158    }
159
160    pub fn broadcast_common_message_in_scope<T>(
161        &mut self,
162        message: T,
163        scope: &Scope,
164        replayable: bool,
165    ) -> WasmindResult<()>
166    where
167        T: wasmind_actor_utils::common_messages::Message + Clone,
168    {
169        let message_envelope = MessageEnvelope {
170            id: crate::utils::generate_root_correlation_id(),
171            from_actor_id: "wasmind__coordinator".to_string(),
172            from_scope: scope.to_owned(),
173            message_type: T::MESSAGE_TYPE.to_string(),
174            payload: serde_json::to_vec(&message).context(SerializationSnafu {
175                message: "Failed to serialize message for broadcast",
176            })?,
177        };
178        if replayable {
179            self.replayable.push(message_envelope.clone());
180        }
181        self.context.broadcast(message_envelope)
182    }
183
184    /// Get the broadcast sender for sending messages to the system
185    pub fn get_sender(&self) -> broadcast::Sender<MessageEnvelope> {
186        self.context.tx.clone()
187    }
188}