// loa_core/core/alerts/mod.rs

//! Alerts factory: manages the workers that handle the different alert types.
//!
//! The factory routes alert messages to specific worker implementations:
//! - heartbeat: API authentication and liveness checks
//! - email: send email notifications (planned)
//! - slack: send Slack notifications (planned)
7
8mod heartbeat;
9// Future: mod email;
10// Future: mod slack;
11
12use ractor::async_trait;
13use ractor::factory::*;
14use ractor::{Actor, ActorProcessingErr, ActorRef, SpawnErr};
15use tokio::task::JoinHandle;
16
// ============================================================================
// MESSAGES
// ============================================================================

/// Jobs accepted by the alerts factory.
///
/// Only heartbeats exist today. Planned variants (not yet implemented):
/// `Email { to, subject, body }`, `Slack { channel, message }` and
/// `PagerDuty { severity, summary }`.
#[derive(Clone, Debug)]
pub enum AlertMessage {
    /// Send a heartbeat to the backend API.
    ///
    /// The backend's reply is expected to carry the `ws_url` that
    /// ConfigManager uses to connect its WebSocket.
    Heartbeat,
}
32
33// ============================================================================
34// WORKER STATE
35// ============================================================================
36
37pub struct AlertsState {
38    api_url: String,
39    identity: std::sync::Arc<crate::AgentIdentity>,
40    workspace_id: Option<String>,
41    claim_token: Option<String>,
42    storage_path: std::path::PathBuf,
43    heartbeat_count: u64,
44    http_client: reqwest_middleware::ClientWithMiddleware,
45}
46
// ============================================================================
// CONFIGURATION
// ============================================================================
//
// The default API URL lives in `crate::constants::api_url()`; the worker
// builder falls back to it when no custom URL is supplied.
52
53// ============================================================================
54// WORKER ARGUMENTS
55// ============================================================================
56
57pub struct AlertsWorkerArgs {
58    pub api_url: String,
59    pub identity: std::sync::Arc<crate::AgentIdentity>,
60    pub workspace_id: Option<String>,
61    pub storage_path: std::path::PathBuf,
62}
63
// ============================================================================
// WORKER IMPLEMENTATION
// ============================================================================

/// Zero-sized worker type: all per-worker data lives in its `AlertsState`.
pub struct AlertsWorker;
69
70#[async_trait]
71impl Worker for AlertsWorker {
72    type Key = ();
73    type Message = AlertMessage;
74    type State = AlertsState;
75    type Arguments = AlertsWorkerArgs;
76
77    async fn pre_start(
78        &self,
79        wid: WorkerId,
80        _factory: &ActorRef<FactoryMessage<(), AlertMessage>>,
81        args: AlertsWorkerArgs,
82    ) -> Result<AlertsState, ActorProcessingErr> {
83        tracing::debug!("AlertsWorker {} starting", wid);
84        tracing::debug!("  API URL: {}", args.api_url);
85        tracing::debug!("  PeerId: {}", args.identity.peer_id());
86
87        // Load claim token from storage (TOML or legacy file)
88        let claim_token = crate::identity::read_claim_token(&args.storage_path);
89        if claim_token.is_some() {
90            tracing::debug!("  Loaded claim_token from storage");
91        }
92
93        Ok(AlertsState {
94            api_url: args.api_url,
95            identity: args.identity,
96            workspace_id: args.workspace_id,
97            claim_token,
98            storage_path: args.storage_path,
99            heartbeat_count: 0,
100            http_client: crate::http::build_http_client(),
101        })
102    }
103
104    async fn post_start(
105        &self,
106        wid: WorkerId,
107        _factory: &ActorRef<FactoryMessage<(), AlertMessage>>,
108        _state: &mut AlertsState,
109    ) -> Result<(), ActorProcessingErr> {
110        // Heartbeat-first design: no registration here
111        // First heartbeat will return 404 if unregistered, triggering auto-registration
112        tracing::debug!("AlertsWorker {} ready", wid);
113        Ok(())
114    }
115
116    async fn handle(
117        &self,
118        _wid: WorkerId,
119        _factory: &ActorRef<FactoryMessage<(), AlertMessage>>,
120        Job { msg, key, .. }: Job<(), AlertMessage>,
121        state: &mut AlertsState,
122    ) -> Result<(), ActorProcessingErr> {
123        // Route message to specific handler
124        match msg {
125            AlertMessage::Heartbeat => {
126                // Heartbeat-first design: try heartbeat, if 404 → register → heartbeat
127                match heartbeat::handle_heartbeat(
128                    &state.api_url,
129                    &state.identity,
130                    &state.http_client,
131                    &mut state.heartbeat_count,
132                )
133                .await
134                {
135                    Ok(()) => {
136                        // Success - heartbeat handler already sent CheckTier to Config
137                    }
138                    Err(e) if e.to_string().contains("404") => {
139                        // Not registered - register now
140                        tracing::info!("Heartbeat 404 - registering agent");
141
142                        match heartbeat::handle_register(
143                            &state.api_url,
144                            &state.identity,
145                            &state.workspace_id,
146                            &state.claim_token,
147                            &state.http_client,
148                        )
149                        .await
150                        {
151                            Ok(response) => {
152                                tracing::info!(
153                                    "Agent registered as '{}' (id: {}, is_new: {})",
154                                    response.name,
155                                    response.agent_id,
156                                    response.is_new
157                                );
158
159                                // Update registration data in agent.toml
160                                let name_to_save = Some(response.name.as_str());
161                                let claim_token_to_save = response.claim_token.as_deref();
162
163                                match crate::identity::update_agent_toml_registration(
164                                    &state.storage_path,
165                                    name_to_save,
166                                    claim_token_to_save,
167                                ) {
168                                    Ok(_) => {
169                                        tracing::debug!("Updated registration in agent.toml");
170                                        if let Some(ref new_claim_token) = response.claim_token {
171                                            state.claim_token = Some(new_claim_token.clone());
172                                        }
173                                    }
174                                    Err(e) => {
175                                        tracing::error!("Failed to update agent.toml: {}", e);
176                                    }
177                                }
178
179                                // Immediately send heartbeat to trigger config fetch
180                                // (heartbeat response has tier_id → CheckTier → FetchConfig)
181                                if let Err(e) = heartbeat::handle_heartbeat(
182                                    &state.api_url,
183                                    &state.identity,
184                                    &state.http_client,
185                                    &mut state.heartbeat_count,
186                                )
187                                .await
188                                {
189                                    tracing::warn!("Post-registration heartbeat failed: {}", e);
190                                }
191                            }
192                            Err(e) => {
193                                tracing::warn!("Registration failed: {}", e);
194                                // Will retry on next heartbeat tick
195                            }
196                        }
197                    }
198                    Err(e) => {
199                        // Other error (network, 5xx) - retry middleware handles retries
200                        tracing::warn!("Heartbeat failed: {}", e);
201                    }
202                }
203            }
204            // Future alert types will be routed here
205        }
206
207        Ok(key)
208    }
209
210    async fn post_stop(
211        &self,
212        wid: WorkerId,
213        _factory: &ActorRef<FactoryMessage<(), AlertMessage>>,
214        state: &mut AlertsState,
215    ) -> Result<(), ActorProcessingErr> {
216        tracing::debug!(
217            "AlertsWorker {} stopped (sent {} heartbeats)",
218            wid,
219            state.heartbeat_count
220        );
221        Ok(())
222    }
223}
224
225// ============================================================================
226// WORKER BUILDER
227// ============================================================================
228
229pub struct AlertsWorkerBuilder {
230    identity: std::sync::Arc<crate::AgentIdentity>,
231    workspace_id: Option<String>,
232    api_url: Option<String>,
233    storage_path: std::path::PathBuf,
234}
235
236impl WorkerBuilder<AlertsWorker, AlertsWorkerArgs> for AlertsWorkerBuilder {
237    fn build(&mut self, _wid: WorkerId) -> (AlertsWorker, AlertsWorkerArgs) {
238        (
239            AlertsWorker,
240            AlertsWorkerArgs {
241                // Use custom api_url if provided, otherwise fall back to default
242                api_url: self.api_url.clone().unwrap_or_else(crate::constants::api_url),
243                identity: self.identity.clone(),
244                workspace_id: self.workspace_id.clone(),
245                storage_path: self.storage_path.clone(),
246            },
247        )
248    }
249}
250
// ============================================================================
// FACTORY LIFECYCLE HOOKS
// ============================================================================

/// Zero-sized lifecycle hook set that logs the alerts factory's state changes.
pub struct AlertsFactoryHooks;
256
257#[async_trait]
258impl FactoryLifecycleHooks<(), AlertMessage> for AlertsFactoryHooks {
259    async fn on_factory_started(
260        &self,
261        _factory: ActorRef<FactoryMessage<(), AlertMessage>>,
262    ) -> Result<(), ActorProcessingErr> {
263        tracing::debug!("Alerts started");
264        Ok(())
265    }
266
267    async fn on_factory_stopped(&self) -> Result<(), ActorProcessingErr> {
268        tracing::info!("Alerts stopped");
269        Ok(())
270    }
271
272    async fn on_factory_draining(
273        &self,
274        _factory: ActorRef<FactoryMessage<(), AlertMessage>>,
275    ) -> Result<(), ActorProcessingErr> {
276        tracing::debug!("AlertsFactory draining");
277        Ok(())
278    }
279}
280
281// ============================================================================
282// FACTORY SPAWN HELPER
283// ============================================================================
284
285/// Spawn the Alerts factory with a single worker
286///
287/// Returns a factory ActorRef and its join handle.
288pub async fn spawn_alerts_factory(
289    identity: std::sync::Arc<crate::AgentIdentity>,
290    workspace_id: Option<String>,
291    api_url: Option<String>,
292    storage_path: std::path::PathBuf,
293) -> Result<(ActorRef<FactoryMessage<(), AlertMessage>>, JoinHandle<()>), SpawnErr> {
294    tracing::debug!("Spawning AlertsFactory...");
295
296    let factory_def = Factory::<
297        (),
298        AlertMessage,
299        AlertsWorkerArgs,
300        AlertsWorker,
301        routing::QueuerRouting<(), AlertMessage>,
302        queues::DefaultQueue<(), AlertMessage>,
303    >::default();
304
305    let factory_args = FactoryArguments::builder()
306        .worker_builder(Box::new(AlertsWorkerBuilder {
307            identity,
308            workspace_id,
309            api_url,
310            storage_path,
311        }))
312        .queue(Default::default())
313        .router(Default::default())
314        .num_initial_workers(1)
315        .lifecycle_hooks(Box::new(AlertsFactoryHooks))
316        .build();
317
318    let (factory, factory_handle) = Actor::spawn(
319        Some("AlertsFactory".to_string()),
320        factory_def,
321        factory_args,
322    )
323    .await
324    .map_err(|e| SpawnErr::StartupFailed(Box::new(e)))?;
325
326    tracing::debug!("\u{2713} AlertsFactory spawned successfully (1 worker)");
327
328    Ok((factory, factory_handle))
329}