1mod heartbeat;
9use ractor::async_trait;
13use ractor::factory::*;
14use ractor::{Actor, ActorProcessingErr, ActorRef, SpawnErr};
15use tokio::task::JoinHandle;
16
/// Messages accepted by the alerts worker.
#[derive(Debug, Clone)]
pub enum AlertMessage {
    /// Ask the worker to send a heartbeat; the worker falls back to
    /// registering the agent when the heartbeat reports a 404.
    Heartbeat,
}
32
33pub struct AlertsState {
38 api_url: String,
39 identity: std::sync::Arc<crate::AgentIdentity>,
40 workspace_id: Option<String>,
41 claim_token: Option<String>,
42 storage_path: std::path::PathBuf,
43 heartbeat_count: u64,
44 http_client: reqwest_middleware::ClientWithMiddleware,
45}
46
47pub struct AlertsWorkerArgs {
58 pub api_url: String,
59 pub identity: std::sync::Arc<crate::AgentIdentity>,
60 pub workspace_id: Option<String>,
61 pub storage_path: std::path::PathBuf,
62}
63
/// Stateless factory worker that handles [`AlertMessage`] jobs; all mutable
/// data lives in [`AlertsState`].
pub struct AlertsWorker;
69
70#[async_trait]
71impl Worker for AlertsWorker {
72 type Key = ();
73 type Message = AlertMessage;
74 type State = AlertsState;
75 type Arguments = AlertsWorkerArgs;
76
77 async fn pre_start(
78 &self,
79 wid: WorkerId,
80 _factory: &ActorRef<FactoryMessage<(), AlertMessage>>,
81 args: AlertsWorkerArgs,
82 ) -> Result<AlertsState, ActorProcessingErr> {
83 tracing::debug!("AlertsWorker {} starting", wid);
84 tracing::debug!(" API URL: {}", args.api_url);
85 tracing::debug!(" PeerId: {}", args.identity.peer_id());
86
87 let claim_token = crate::identity::read_claim_token(&args.storage_path);
89 if claim_token.is_some() {
90 tracing::debug!(" Loaded claim_token from storage");
91 }
92
93 Ok(AlertsState {
94 api_url: args.api_url,
95 identity: args.identity,
96 workspace_id: args.workspace_id,
97 claim_token,
98 storage_path: args.storage_path,
99 heartbeat_count: 0,
100 http_client: crate::http::build_http_client(),
101 })
102 }
103
104 async fn post_start(
105 &self,
106 wid: WorkerId,
107 _factory: &ActorRef<FactoryMessage<(), AlertMessage>>,
108 _state: &mut AlertsState,
109 ) -> Result<(), ActorProcessingErr> {
110 tracing::debug!("AlertsWorker {} ready", wid);
113 Ok(())
114 }
115
116 async fn handle(
117 &self,
118 _wid: WorkerId,
119 _factory: &ActorRef<FactoryMessage<(), AlertMessage>>,
120 Job { msg, key, .. }: Job<(), AlertMessage>,
121 state: &mut AlertsState,
122 ) -> Result<(), ActorProcessingErr> {
123 match msg {
125 AlertMessage::Heartbeat => {
126 match heartbeat::handle_heartbeat(
128 &state.api_url,
129 &state.identity,
130 &state.http_client,
131 &mut state.heartbeat_count,
132 )
133 .await
134 {
135 Ok(()) => {
136 }
138 Err(e) if e.to_string().contains("404") => {
139 tracing::info!("Heartbeat 404 - registering agent");
141
142 match heartbeat::handle_register(
143 &state.api_url,
144 &state.identity,
145 &state.workspace_id,
146 &state.claim_token,
147 &state.http_client,
148 )
149 .await
150 {
151 Ok(response) => {
152 tracing::info!(
153 "Agent registered as '{}' (id: {}, is_new: {})",
154 response.name,
155 response.agent_id,
156 response.is_new
157 );
158
159 let name_to_save = Some(response.name.as_str());
161 let claim_token_to_save = response.claim_token.as_deref();
162
163 match crate::identity::update_agent_toml_registration(
164 &state.storage_path,
165 name_to_save,
166 claim_token_to_save,
167 ) {
168 Ok(_) => {
169 tracing::debug!("Updated registration in agent.toml");
170 if let Some(ref new_claim_token) = response.claim_token {
171 state.claim_token = Some(new_claim_token.clone());
172 }
173 }
174 Err(e) => {
175 tracing::error!("Failed to update agent.toml: {}", e);
176 }
177 }
178
179 if let Err(e) = heartbeat::handle_heartbeat(
182 &state.api_url,
183 &state.identity,
184 &state.http_client,
185 &mut state.heartbeat_count,
186 )
187 .await
188 {
189 tracing::warn!("Post-registration heartbeat failed: {}", e);
190 }
191 }
192 Err(e) => {
193 tracing::warn!("Registration failed: {}", e);
194 }
196 }
197 }
198 Err(e) => {
199 tracing::warn!("Heartbeat failed: {}", e);
201 }
202 }
203 }
204 }
206
207 Ok(key)
208 }
209
210 async fn post_stop(
211 &self,
212 wid: WorkerId,
213 _factory: &ActorRef<FactoryMessage<(), AlertMessage>>,
214 state: &mut AlertsState,
215 ) -> Result<(), ActorProcessingErr> {
216 tracing::debug!(
217 "AlertsWorker {} stopped (sent {} heartbeats)",
218 wid,
219 state.heartbeat_count
220 );
221 Ok(())
222 }
223}
224
225pub struct AlertsWorkerBuilder {
230 identity: std::sync::Arc<crate::AgentIdentity>,
231 workspace_id: Option<String>,
232 api_url: Option<String>,
233 storage_path: std::path::PathBuf,
234}
235
236impl WorkerBuilder<AlertsWorker, AlertsWorkerArgs> for AlertsWorkerBuilder {
237 fn build(&mut self, _wid: WorkerId) -> (AlertsWorker, AlertsWorkerArgs) {
238 (
239 AlertsWorker,
240 AlertsWorkerArgs {
241 api_url: self.api_url.clone().unwrap_or_else(crate::constants::api_url),
243 identity: self.identity.clone(),
244 workspace_id: self.workspace_id.clone(),
245 storage_path: self.storage_path.clone(),
246 },
247 )
248 }
249}
250
/// Lifecycle hooks for the alerts factory; each hook only emits a log line.
pub struct AlertsFactoryHooks;
256
257#[async_trait]
258impl FactoryLifecycleHooks<(), AlertMessage> for AlertsFactoryHooks {
259 async fn on_factory_started(
260 &self,
261 _factory: ActorRef<FactoryMessage<(), AlertMessage>>,
262 ) -> Result<(), ActorProcessingErr> {
263 tracing::debug!("Alerts started");
264 Ok(())
265 }
266
267 async fn on_factory_stopped(&self) -> Result<(), ActorProcessingErr> {
268 tracing::info!("Alerts stopped");
269 Ok(())
270 }
271
272 async fn on_factory_draining(
273 &self,
274 _factory: ActorRef<FactoryMessage<(), AlertMessage>>,
275 ) -> Result<(), ActorProcessingErr> {
276 tracing::debug!("AlertsFactory draining");
277 Ok(())
278 }
279}
280
281pub async fn spawn_alerts_factory(
289 identity: std::sync::Arc<crate::AgentIdentity>,
290 workspace_id: Option<String>,
291 api_url: Option<String>,
292 storage_path: std::path::PathBuf,
293) -> Result<(ActorRef<FactoryMessage<(), AlertMessage>>, JoinHandle<()>), SpawnErr> {
294 tracing::debug!("Spawning AlertsFactory...");
295
296 let factory_def = Factory::<
297 (),
298 AlertMessage,
299 AlertsWorkerArgs,
300 AlertsWorker,
301 routing::QueuerRouting<(), AlertMessage>,
302 queues::DefaultQueue<(), AlertMessage>,
303 >::default();
304
305 let factory_args = FactoryArguments::builder()
306 .worker_builder(Box::new(AlertsWorkerBuilder {
307 identity,
308 workspace_id,
309 api_url,
310 storage_path,
311 }))
312 .queue(Default::default())
313 .router(Default::default())
314 .num_initial_workers(1)
315 .lifecycle_hooks(Box::new(AlertsFactoryHooks))
316 .build();
317
318 let (factory, factory_handle) = Actor::spawn(
319 Some("AlertsFactory".to_string()),
320 factory_def,
321 factory_args,
322 )
323 .await
324 .map_err(|e| SpawnErr::StartupFailed(Box::new(e)))?;
325
326 tracing::debug!("\u{2713} AlertsFactory spawned successfully (1 worker)");
327
328 Ok((factory, factory_handle))
329}