1mod heartbeat;
9use ractor::async_trait;
13use ractor::factory::*;
14use ractor::{Actor, ActorProcessingErr, ActorRef, SpawnErr};
15use tokio::task::JoinHandle;
16
/// Messages accepted by [`AlertsWorker`].
///
/// `PartialEq`/`Eq` are derived so messages can be compared directly (for
/// example in assertions) without pattern matching.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum AlertMessage {
    /// Ask the worker to send one heartbeat to the API.
    Heartbeat,
}
32
33pub struct AlertsState {
38 api_url: String,
39 identity: std::sync::Arc<crate::AgentIdentity>,
40 workspace_id: Option<String>,
41 claim_token: Option<String>,
42 storage_path: std::path::PathBuf,
43 heartbeat_count: u64,
44 http_client: reqwest_middleware::ClientWithMiddleware,
45}
46
47impl AlertsState {
48 async fn try_register(&mut self) -> bool {
52 match heartbeat::handle_register(
53 &self.api_url,
54 &self.identity,
55 &self.workspace_id,
56 &self.claim_token,
57 &self.http_client,
58 )
59 .await
60 {
61 Ok(response) => {
62 tracing::info!(
63 "Agent registered as '{}' (id: {}, is_new: {})",
64 response.name,
65 response.agent_id,
66 response.is_new
67 );
68
69 if let Some(ref new_token) = response.claim_token {
71 self.claim_token = Some(new_token.clone());
72 }
73
74 if let Err(e) = crate::identity::update_agent_toml_registration(
76 &self.storage_path,
77 Some(response.name.as_str()),
78 response.claim_token.as_deref(),
79 ) {
80 tracing::error!("Failed to update agent.toml: {}", e);
81 }
82
83 true
84 }
85 Err(e) => {
86 if self.claim_token.is_some() {
87 tracing::warn!(
88 "Registration failed: {} - continuing with existing claim_token",
89 e
90 );
91 } else {
92 tracing::warn!(
95 "Registration failed and no existing claim_token: {} - agent will remain unclaimed",
96 e
97 );
98 }
99 false
100 }
101 }
102 }
103}
104
105pub struct AlertsWorkerArgs {
116 pub api_url: String,
117 pub identity: std::sync::Arc<crate::AgentIdentity>,
118 pub workspace_id: Option<String>,
119 pub storage_path: std::path::PathBuf,
120}
121
/// Stateless factory worker; all mutable data lives in [`AlertsState`].
pub struct AlertsWorker;
127
128#[async_trait]
129impl Worker for AlertsWorker {
130 type Key = ();
131 type Message = AlertMessage;
132 type State = AlertsState;
133 type Arguments = AlertsWorkerArgs;
134
135 async fn pre_start(
136 &self,
137 wid: WorkerId,
138 _factory: &ActorRef<FactoryMessage<(), AlertMessage>>,
139 args: AlertsWorkerArgs,
140 ) -> Result<AlertsState, ActorProcessingErr> {
141 tracing::debug!("AlertsWorker {} starting", wid);
142 tracing::debug!(" API URL: {}", args.api_url);
143 tracing::debug!(" PeerId: {}", args.identity.peer_id());
144
145 let claim_token = crate::identity::read_claim_token(&args.storage_path);
147 if claim_token.is_some() {
148 tracing::debug!(" Loaded claim_token from storage");
149 }
150
151 Ok(AlertsState {
152 api_url: args.api_url,
153 identity: args.identity,
154 workspace_id: args.workspace_id,
155 claim_token,
156 storage_path: args.storage_path,
157 heartbeat_count: 0,
158 http_client: crate::http::build_http_client(),
159 })
160 }
161
162 async fn post_start(
163 &self,
164 wid: WorkerId,
165 _factory: &ActorRef<FactoryMessage<(), AlertMessage>>,
166 state: &mut AlertsState,
167 ) -> Result<(), ActorProcessingErr> {
168 state.try_register().await;
170 tracing::debug!("AlertsWorker {} ready", wid);
171 Ok(())
172 }
173
174 async fn handle(
175 &self,
176 _wid: WorkerId,
177 _factory: &ActorRef<FactoryMessage<(), AlertMessage>>,
178 Job { msg, key, .. }: Job<(), AlertMessage>,
179 state: &mut AlertsState,
180 ) -> Result<(), ActorProcessingErr> {
181 match msg {
183 AlertMessage::Heartbeat => {
184 match heartbeat::handle_heartbeat(
186 &state.api_url,
187 &state.identity,
188 &state.http_client,
189 &mut state.heartbeat_count,
190 )
191 .await
192 {
193 Ok(()) => {
194 }
196 Err(e) if e.to_string().contains("404") => {
197 tracing::info!("Heartbeat 404 - re-registering agent");
199
200 if state.try_register().await {
201 if let Err(e) = heartbeat::handle_heartbeat(
204 &state.api_url,
205 &state.identity,
206 &state.http_client,
207 &mut state.heartbeat_count,
208 )
209 .await
210 {
211 tracing::warn!("Post-registration heartbeat failed: {}", e);
212 }
213 }
214 }
215 Err(e) => {
216 tracing::warn!("Heartbeat failed: {}", e);
218 }
219 }
220 }
221 }
223
224 Ok(key)
225 }
226
227 async fn post_stop(
228 &self,
229 wid: WorkerId,
230 _factory: &ActorRef<FactoryMessage<(), AlertMessage>>,
231 state: &mut AlertsState,
232 ) -> Result<(), ActorProcessingErr> {
233 tracing::debug!(
234 "AlertsWorker {} stopped (sent {} heartbeats)",
235 wid,
236 state.heartbeat_count
237 );
238 Ok(())
239 }
240}
241
242pub struct AlertsWorkerBuilder {
247 identity: std::sync::Arc<crate::AgentIdentity>,
248 workspace_id: Option<String>,
249 api_url: Option<String>,
250 storage_path: std::path::PathBuf,
251}
252
253impl WorkerBuilder<AlertsWorker, AlertsWorkerArgs> for AlertsWorkerBuilder {
254 fn build(&mut self, _wid: WorkerId) -> (AlertsWorker, AlertsWorkerArgs) {
255 (
256 AlertsWorker,
257 AlertsWorkerArgs {
258 api_url: self.api_url.clone().unwrap_or_else(crate::constants::api_url),
260 identity: self.identity.clone(),
261 workspace_id: self.workspace_id.clone(),
262 storage_path: self.storage_path.clone(),
263 },
264 )
265 }
266}
267
/// Lifecycle hooks for the alerts factory; each hook only emits a log line.
pub struct AlertsFactoryHooks;
273
274#[async_trait]
275impl FactoryLifecycleHooks<(), AlertMessage> for AlertsFactoryHooks {
276 async fn on_factory_started(
277 &self,
278 _factory: ActorRef<FactoryMessage<(), AlertMessage>>,
279 ) -> Result<(), ActorProcessingErr> {
280 tracing::debug!("Alerts started");
281 Ok(())
282 }
283
284 async fn on_factory_stopped(&self) -> Result<(), ActorProcessingErr> {
285 tracing::info!("Alerts stopped");
286 Ok(())
287 }
288
289 async fn on_factory_draining(
290 &self,
291 _factory: ActorRef<FactoryMessage<(), AlertMessage>>,
292 ) -> Result<(), ActorProcessingErr> {
293 tracing::debug!("AlertsFactory draining");
294 Ok(())
295 }
296}
297
298pub async fn spawn_alerts_factory(
306 identity: std::sync::Arc<crate::AgentIdentity>,
307 workspace_id: Option<String>,
308 api_url: Option<String>,
309 storage_path: std::path::PathBuf,
310) -> Result<(ActorRef<FactoryMessage<(), AlertMessage>>, JoinHandle<()>), SpawnErr> {
311 tracing::debug!("Spawning AlertsFactory...");
312
313 let factory_def = Factory::<
314 (),
315 AlertMessage,
316 AlertsWorkerArgs,
317 AlertsWorker,
318 routing::QueuerRouting<(), AlertMessage>,
319 queues::DefaultQueue<(), AlertMessage>,
320 >::default();
321
322 let factory_args = FactoryArguments::builder()
323 .worker_builder(Box::new(AlertsWorkerBuilder {
324 identity,
325 workspace_id,
326 api_url,
327 storage_path,
328 }))
329 .queue(Default::default())
330 .router(Default::default())
331 .num_initial_workers(1)
332 .lifecycle_hooks(Box::new(AlertsFactoryHooks))
333 .build();
334
335 let (factory, factory_handle) = Actor::spawn(
336 Some("AlertsFactory".to_string()),
337 factory_def,
338 factory_args,
339 )
340 .await
341 .map_err(|e| SpawnErr::StartupFailed(Box::new(e)))?;
342
343 tracing::debug!("\u{2713} AlertsFactory spawned successfully (1 worker)");
344
345 Ok((factory, factory_handle))
346}