Skip to main content

loa_core/core/alerts/
mod.rs

1//! Alerts Factory - Manages workers for different alert types
2//!
3//! The factory routes alert messages to specific worker implementations:
4//! - heartbeat: API authentication and liveness checks
5//! - email: Send email notifications (future)
6//! - slack: Send Slack notifications (future)
7
8mod heartbeat;
9// Future: mod email;
10// Future: mod slack;
11
12use ractor::async_trait;
13use ractor::factory::*;
14use ractor::{Actor, ActorProcessingErr, ActorRef, SpawnErr};
15use tokio::task::JoinHandle;
16
17// ============================================================================
18// MESSAGES
19// ============================================================================
20
/// Messages accepted by the alerts factory; each variant selects one alert type
/// that the worker routes to its handler.
#[derive(Debug, Clone)]
pub enum AlertMessage {
    /// Send a heartbeat to the backend API
    /// Returns ws_url in response for ConfigManager to connect WebSocket
    Heartbeat,

    // Planned alert types (not implemented yet):
    // Email { to: String, subject: String, body: String },
    // Slack { channel: String, message: String },
    // PagerDuty { severity: String, summary: String },
}
32
33// ============================================================================
34// WORKER STATE
35// ============================================================================
36
/// Mutable per-worker state owned by `AlertsWorker` for its whole lifetime.
pub struct AlertsState {
    /// Backend API base URL handed to the register and heartbeat handlers.
    api_url: String,
    /// Shared agent identity handed to the register and heartbeat handlers.
    identity: std::sync::Arc<crate::AgentIdentity>,
    /// Optional workspace id forwarded with registration requests.
    workspace_id: Option<String>,
    /// Claim token loaded from storage at worker start; replaced whenever a
    /// registration response carries a token.
    claim_token: Option<String>,
    /// Location used to read the claim token and to persist registration data.
    storage_path: std::path::PathBuf,
    /// Counter handed mutably to the heartbeat handler; starts at 0 and is
    /// reported in the worker's shutdown log line.
    heartbeat_count: u64,
    /// HTTP client (with middleware) reused for every backend request.
    http_client: reqwest_middleware::ClientWithMiddleware,
}
46
47impl AlertsState {
48    /// Attempt to register the agent with the backend.
49    /// Returns true if registration succeeded, false otherwise.
50    /// On failure, logs warnings but never fails - allows embedded agents to operate without cloud.
51    async fn try_register(&mut self) -> bool {
52        match heartbeat::handle_register(
53            &self.api_url,
54            &self.identity,
55            &self.workspace_id,
56            &self.claim_token,
57            &self.http_client,
58        )
59        .await
60        {
61            Ok(response) => {
62                tracing::info!(
63                    "Agent registered as '{}' (id: {}, is_new: {})",
64                    response.name,
65                    response.agent_id,
66                    response.is_new
67                );
68
69                // Update local state with server response
70                if let Some(ref new_token) = response.claim_token {
71                    self.claim_token = Some(new_token.clone());
72                }
73
74                // Persist to agent.toml
75                if let Err(e) = crate::identity::update_agent_toml_registration(
76                    &self.storage_path,
77                    Some(response.name.as_str()),
78                    response.claim_token.as_deref(),
79                ) {
80                    tracing::error!("Failed to update agent.toml: {}", e);
81                }
82
83                true
84            }
85            Err(e) => {
86                if self.claim_token.is_some() {
87                    tracing::warn!(
88                        "Registration failed: {} - continuing with existing claim_token",
89                        e
90                    );
91                } else {
92                    // No existing claim_token - agent will operate unclaimed until fixed
93                    // This allows embedded agents to function locally without cloud connectivity
94                    tracing::warn!(
95                        "Registration failed and no existing claim_token: {} - agent will remain unclaimed",
96                        e
97                    );
98                }
99                false
100            }
101        }
102    }
103}
104
105// ============================================================================
106// CONFIGURATION
107// ============================================================================
108//
109// Note: API URL is now defined in crate::constants module for centralized management
110
111// ============================================================================
112// WORKER ARGUMENTS
113// ============================================================================
114
/// Startup arguments handed to each `AlertsWorker`; they are moved into the
/// worker's `AlertsState` in `pre_start`.
pub struct AlertsWorkerArgs {
    /// Backend API base URL the worker will talk to.
    pub api_url: String,
    /// Shared agent identity.
    pub identity: std::sync::Arc<crate::AgentIdentity>,
    /// Optional workspace id forwarded with registration requests.
    pub workspace_id: Option<String>,
    /// Location used to read the claim token and to persist registration data.
    pub storage_path: std::path::PathBuf,
}
121
122// ============================================================================
123// WORKER IMPLEMENTATION
124// ============================================================================
125
/// Stateless factory worker that handles `AlertMessage` jobs; all mutable data
/// lives in `AlertsState`.
pub struct AlertsWorker;
127
128#[async_trait]
129impl Worker for AlertsWorker {
130    type Key = ();
131    type Message = AlertMessage;
132    type State = AlertsState;
133    type Arguments = AlertsWorkerArgs;
134
135    async fn pre_start(
136        &self,
137        wid: WorkerId,
138        _factory: &ActorRef<FactoryMessage<(), AlertMessage>>,
139        args: AlertsWorkerArgs,
140    ) -> Result<AlertsState, ActorProcessingErr> {
141        tracing::debug!("AlertsWorker {} starting", wid);
142        tracing::debug!("  API URL: {}", args.api_url);
143        tracing::debug!("  PeerId: {}", args.identity.peer_id());
144
145        // Load claim token from storage (TOML or legacy file)
146        let claim_token = crate::identity::read_claim_token(&args.storage_path);
147        if claim_token.is_some() {
148            tracing::debug!("  Loaded claim_token from storage");
149        }
150
151        Ok(AlertsState {
152            api_url: args.api_url,
153            identity: args.identity,
154            workspace_id: args.workspace_id,
155            claim_token,
156            storage_path: args.storage_path,
157            heartbeat_count: 0,
158            http_client: crate::http::build_http_client(),
159        })
160    }
161
162    async fn post_start(
163        &self,
164        wid: WorkerId,
165        _factory: &ActorRef<FactoryMessage<(), AlertMessage>>,
166        state: &mut AlertsState,
167    ) -> Result<(), ActorProcessingErr> {
168        // Always register on startup to ensure agent is known and claims are applied
169        state.try_register().await;
170        tracing::debug!("AlertsWorker {} ready", wid);
171        Ok(())
172    }
173
174    async fn handle(
175        &self,
176        _wid: WorkerId,
177        _factory: &ActorRef<FactoryMessage<(), AlertMessage>>,
178        Job { msg, key, .. }: Job<(), AlertMessage>,
179        state: &mut AlertsState,
180    ) -> Result<(), ActorProcessingErr> {
181        // Route message to specific handler
182        match msg {
183            AlertMessage::Heartbeat => {
184                // Heartbeat-first design: try heartbeat, if 404 → register → heartbeat
185                match heartbeat::handle_heartbeat(
186                    &state.api_url,
187                    &state.identity,
188                    &state.http_client,
189                    &mut state.heartbeat_count,
190                )
191                .await
192                {
193                    Ok(()) => {
194                        // Success - heartbeat handler already sent CheckTier to Config
195                    }
196                    Err(e) if e.to_string().contains("404") => {
197                        // Not registered - register now
198                        tracing::info!("Heartbeat 404 - re-registering agent");
199
200                        if state.try_register().await {
201                            // Immediately send heartbeat to trigger config fetch
202                            // (heartbeat response has tier_id → CheckTier → FetchConfig)
203                            if let Err(e) = heartbeat::handle_heartbeat(
204                                &state.api_url,
205                                &state.identity,
206                                &state.http_client,
207                                &mut state.heartbeat_count,
208                            )
209                            .await
210                            {
211                                tracing::warn!("Post-registration heartbeat failed: {}", e);
212                            }
213                        }
214                    }
215                    Err(e) => {
216                        // Other error (network, 5xx) - retry middleware handles retries
217                        tracing::warn!("Heartbeat failed: {}", e);
218                    }
219                }
220            }
221            // Future alert types will be routed here
222        }
223
224        Ok(key)
225    }
226
227    async fn post_stop(
228        &self,
229        wid: WorkerId,
230        _factory: &ActorRef<FactoryMessage<(), AlertMessage>>,
231        state: &mut AlertsState,
232    ) -> Result<(), ActorProcessingErr> {
233        tracing::debug!(
234            "AlertsWorker {} stopped (sent {} heartbeats)",
235            wid,
236            state.heartbeat_count
237        );
238        Ok(())
239    }
240}
241
242// ============================================================================
243// WORKER BUILDER
244// ============================================================================
245
/// Builder the factory uses to create `AlertsWorker` instances together with
/// their startup arguments.
pub struct AlertsWorkerBuilder {
    /// Shared agent identity, cloned into every worker's arguments.
    identity: std::sync::Arc<crate::AgentIdentity>,
    /// Optional workspace id, cloned into every worker's arguments.
    workspace_id: Option<String>,
    /// Optional API URL override; when `None`, `build` falls back to
    /// `crate::constants::api_url()`.
    api_url: Option<String>,
    /// Storage location, cloned into every worker's arguments.
    storage_path: std::path::PathBuf,
}
252
253impl WorkerBuilder<AlertsWorker, AlertsWorkerArgs> for AlertsWorkerBuilder {
254    fn build(&mut self, _wid: WorkerId) -> (AlertsWorker, AlertsWorkerArgs) {
255        (
256            AlertsWorker,
257            AlertsWorkerArgs {
258                // Use custom api_url if provided, otherwise fall back to default
259                api_url: self.api_url.clone().unwrap_or_else(crate::constants::api_url),
260                identity: self.identity.clone(),
261                workspace_id: self.workspace_id.clone(),
262                storage_path: self.storage_path.clone(),
263            },
264        )
265    }
266}
267
268// ============================================================================
269// FACTORY LIFECYCLE HOOKS
270// ============================================================================
271
/// Lifecycle hooks for the alerts factory; each hook only emits a log line.
pub struct AlertsFactoryHooks;
273
274#[async_trait]
275impl FactoryLifecycleHooks<(), AlertMessage> for AlertsFactoryHooks {
276    async fn on_factory_started(
277        &self,
278        _factory: ActorRef<FactoryMessage<(), AlertMessage>>,
279    ) -> Result<(), ActorProcessingErr> {
280        tracing::debug!("Alerts started");
281        Ok(())
282    }
283
284    async fn on_factory_stopped(&self) -> Result<(), ActorProcessingErr> {
285        tracing::info!("Alerts stopped");
286        Ok(())
287    }
288
289    async fn on_factory_draining(
290        &self,
291        _factory: ActorRef<FactoryMessage<(), AlertMessage>>,
292    ) -> Result<(), ActorProcessingErr> {
293        tracing::debug!("AlertsFactory draining");
294        Ok(())
295    }
296}
297
298// ============================================================================
299// FACTORY SPAWN HELPER
300// ============================================================================
301
302/// Spawn the Alerts factory with a single worker
303///
304/// Returns a factory ActorRef and its join handle.
305pub async fn spawn_alerts_factory(
306    identity: std::sync::Arc<crate::AgentIdentity>,
307    workspace_id: Option<String>,
308    api_url: Option<String>,
309    storage_path: std::path::PathBuf,
310) -> Result<(ActorRef<FactoryMessage<(), AlertMessage>>, JoinHandle<()>), SpawnErr> {
311    tracing::debug!("Spawning AlertsFactory...");
312
313    let factory_def = Factory::<
314        (),
315        AlertMessage,
316        AlertsWorkerArgs,
317        AlertsWorker,
318        routing::QueuerRouting<(), AlertMessage>,
319        queues::DefaultQueue<(), AlertMessage>,
320    >::default();
321
322    let factory_args = FactoryArguments::builder()
323        .worker_builder(Box::new(AlertsWorkerBuilder {
324            identity,
325            workspace_id,
326            api_url,
327            storage_path,
328        }))
329        .queue(Default::default())
330        .router(Default::default())
331        .num_initial_workers(1)
332        .lifecycle_hooks(Box::new(AlertsFactoryHooks))
333        .build();
334
335    let (factory, factory_handle) = Actor::spawn(
336        Some("AlertsFactory".to_string()),
337        factory_def,
338        factory_args,
339    )
340    .await
341    .map_err(|e| SpawnErr::StartupFailed(Box::new(e)))?;
342
343    tracing::debug!("\u{2713} AlertsFactory spawned successfully (1 worker)");
344
345    Ok((factory, factory_handle))
346}