//! syspulse_core/manager.rs — daemon lifecycle manager.

1use std::collections::HashMap;
2use std::path::PathBuf;
3use std::sync::Arc;
4use std::time::Duration;
5
6use chrono::Utc;
7use tokio::sync::{broadcast, Mutex, RwLock};
8use tracing::{error, info, warn};
9
10use crate::daemon::{DaemonInstance, DaemonSpec, HealthStatus};
11use crate::error::{Result, SyspulseError};
12use crate::ipc::protocol::{Request, Response};
13use crate::ipc::server::IpcServer;
14use crate::lifecycle::LifecycleState;
15use crate::logs::LogManager;
16use crate::paths;
17use crate::process::{self, ProcessDriver};
18use crate::registry::Registry;
19use crate::restart::RestartEvaluator;
20use crate::scheduler::Scheduler;
21
/// Central coordinator for daemon lifecycles: owns the persistent registry,
/// the process driver, the log manager, and the in-memory runtime state.
pub struct DaemonManager {
    /// Persistent store of daemon specs and saved instance states.
    registry: Arc<Mutex<Registry>>,
    /// Platform-specific backend for spawning/stopping/killing processes.
    process_driver: Arc<dyn ProcessDriver>,
    /// Creates and reads per-daemon stdout/stderr log files.
    log_manager: Arc<LogManager>,
    /// In-memory runtime state, keyed by daemon name.
    instances: Arc<RwLock<HashMap<String, DaemonInstance>>>,
    /// Join handles of per-daemon health-check background tasks.
    health_handles: Arc<Mutex<HashMap<String, tokio::task::JoinHandle<()>>>>,
    /// Broadcast channel used to signal shutdown to all background tasks.
    shutdown_tx: broadcast::Sender<()>,
}
30
31impl DaemonManager {
32    /// Create a new DaemonManager. If `data_dir` is None, uses the default.
33    pub fn new(data_dir: Option<PathBuf>) -> Result<Self> {
34        let data = data_dir.unwrap_or_else(paths::data_dir);
35        paths::ensure_dirs()?;
36
37        let db_path = data.join("syspulse.db");
38        let registry = Registry::new(&db_path)?;
39        let process_driver = process::create_driver();
40        let log_manager = LogManager::new(&data);
41        let (shutdown_tx, _) = broadcast::channel(16);
42
43        // Load existing instance states from the registry.
44        let saved_states = registry.list_states().unwrap_or_default();
45        let mut instances = HashMap::new();
46        for inst in saved_states {
47            instances.insert(inst.spec_name.clone(), inst);
48        }
49
50        Ok(Self {
51            registry: Arc::new(Mutex::new(registry)),
52            process_driver: Arc::from(process_driver),
53            log_manager: Arc::new(log_manager),
54            instances: Arc::new(RwLock::new(instances)),
55            health_handles: Arc::new(Mutex::new(HashMap::new())),
56            shutdown_tx,
57        })
58    }
59
60    /// Start a daemon by name.
61    pub async fn start_daemon(&self, name: &str) -> Result<DaemonInstance> {
62        let spec = {
63            let reg = self.registry.lock().await;
64            reg.get_spec(name)?
65        };
66
67        // Get or create the instance.
68        let mut instances = self.instances.write().await;
69        let instance = instances
70            .entry(name.to_string())
71            .or_insert_with(|| DaemonInstance::new(name));
72
73        // Validate state transition.
74        let new_state = instance.state.transition_to(LifecycleState::Starting)?;
75        instance.state = new_state;
76
77        // Set up log files.
78        let (stdout_path, stderr_path) = self.log_manager.setup_log_files(name)?;
79        instance.stdout_log = Some(stdout_path.clone());
80        instance.stderr_log = Some(stderr_path.clone());
81
82        // Spawn the process.
83        let proc_info = self
84            .process_driver
85            .spawn(&spec, &stdout_path, &stderr_path)
86            .await?;
87
88        instance.pid = Some(proc_info.pid);
89        instance.started_at = Some(Utc::now());
90        instance.stopped_at = None;
91        instance.exit_code = None;
92        instance.state = instance.state.transition_to(LifecycleState::Running)?;
93
94        if spec.health_check.is_some() {
95            instance.health_status = HealthStatus::Unknown;
96        } else {
97            instance.health_status = HealthStatus::NotConfigured;
98        }
99
100        // Persist state.
101        {
102            let reg = self.registry.lock().await;
103            reg.update_state(instance)?;
104        }
105
106        let result = instance.clone();
107
108        // Drop the write lock before spawning health check.
109        drop(instances);
110
111        // Start health check background task if configured.
112        if let Some(ref health_spec) = spec.health_check {
113            let daemon_name = name.to_string();
114            let shutdown_rx = self.shutdown_tx.subscribe();
115            let instances = Arc::clone(&self.instances);
116            let registry = Arc::clone(&self.registry);
117            let health_spec: crate::daemon::HealthCheckSpec = health_spec.clone();
118
119            let handle = tokio::spawn(async move {
120                Self::run_health_check(instances, registry, daemon_name, health_spec, shutdown_rx)
121                    .await;
122            });
123
124            let mut handles = self.health_handles.lock().await;
125            handles.insert(name.to_string(), handle);
126        }
127
128        info!(
129            "Started daemon '{}' with PID {}",
130            name,
131            result.pid.unwrap_or(0)
132        );
133        Ok(result)
134    }
135
136    /// Stop a running daemon.
137    pub async fn stop_daemon(&self, name: &str, force: bool) -> Result<DaemonInstance> {
138        let mut instances = self.instances.write().await;
139        let instance = instances
140            .get_mut(name)
141            .ok_or_else(|| SyspulseError::DaemonNotFound(name.to_string()))?;
142
143        if !instance.state.is_active() {
144            return Err(SyspulseError::InvalidStateTransition {
145                from: format!("{:?}", instance.state),
146                to: "Stopping".to_string(),
147            });
148        }
149
150        instance.state = instance.state.transition_to(LifecycleState::Stopping)?;
151
152        // Cancel health check task.
153        {
154            let mut handles = self.health_handles.lock().await;
155            if let Some(handle) = handles.remove(name) {
156                handle.abort();
157            }
158        }
159
160        // Stop the process.
161        if let Some(pid) = instance.pid {
162            if force {
163                self.process_driver.kill(pid).await?;
164            } else {
165                let timeout = {
166                    let reg = self.registry.lock().await;
167                    reg.get_spec(name)
168                        .map(|s| s.stop_timeout_secs)
169                        .unwrap_or(30)
170                };
171                self.process_driver.stop(pid, timeout).await?;
172            }
173            // Try to get exit code.
174            let exit_code = self.process_driver.wait(pid).await.ok().flatten();
175            instance.exit_code = exit_code;
176        }
177
178        instance.state = instance.state.transition_to(LifecycleState::Stopped)?;
179        instance.stopped_at = Some(Utc::now());
180        instance.pid = None;
181        instance.health_status = HealthStatus::Unknown;
182
183        // Persist state.
184        {
185            let reg = self.registry.lock().await;
186            reg.update_state(instance)?;
187        }
188
189        info!("Stopped daemon '{}'", name);
190        Ok(instance.clone())
191    }
192
193    /// Restart a daemon (stop then start).
194    pub async fn restart_daemon(&self, name: &str, force: bool) -> Result<DaemonInstance> {
195        // Only stop if active.
196        {
197            let instances = self.instances.read().await;
198            if let Some(inst) = instances.get(name) {
199                if inst.state.is_active() {
200                    drop(instances);
201                    self.stop_daemon(name, force).await?;
202                }
203            }
204        }
205        self.start_daemon(name).await
206    }
207
208    /// Get the current status of a daemon.
209    pub async fn status(&self, name: &str) -> Result<DaemonInstance> {
210        let instances = self.instances.read().await;
211        instances
212            .get(name)
213            .cloned()
214            .ok_or_else(|| SyspulseError::DaemonNotFound(name.to_string()))
215    }
216
217    /// List all daemon instances.
218    pub async fn list(&self) -> Result<Vec<DaemonInstance>> {
219        let instances = self.instances.read().await;
220        Ok(instances.values().cloned().collect())
221    }
222
223    /// Register a new daemon spec.
224    pub async fn add_daemon(&self, spec: DaemonSpec) -> Result<()> {
225        let name = spec.name.clone();
226
227        {
228            let reg = self.registry.lock().await;
229            reg.register(&spec)?;
230        }
231
232        // Initialize instance in Stopped state (or Scheduled if it has a cron).
233        let mut instance = DaemonInstance::new(&name);
234        if spec.schedule.is_some() {
235            instance.state = LifecycleState::Scheduled;
236        }
237
238        let mut instances = self.instances.write().await;
239        instances.insert(name.clone(), instance);
240
241        info!("Added daemon '{}'", name);
242        Ok(())
243    }
244
245    /// Remove a daemon. If `force` is true, stop it first if running.
246    pub async fn remove_daemon(&self, name: &str, force: bool) -> Result<()> {
247        // Check if running.
248        {
249            let instances = self.instances.read().await;
250            if let Some(inst) = instances.get(name) {
251                if inst.state.is_active() {
252                    if !force {
253                        return Err(SyspulseError::Process(format!(
254                            "Daemon '{}' is still running. Use force to stop and remove.",
255                            name
256                        )));
257                    }
258                }
259            }
260        }
261
262        // Stop if active and force.
263        if force {
264            let instances = self.instances.read().await;
265            let is_active = instances
266                .get(name)
267                .map(|i| i.state.is_active())
268                .unwrap_or(false);
269            drop(instances);
270            if is_active {
271                self.stop_daemon(name, true).await?;
272            }
273        }
274
275        // Unregister.
276        {
277            let reg = self.registry.lock().await;
278            reg.unregister(name)?;
279        }
280
281        // Remove from in-memory map.
282        {
283            let mut instances = self.instances.write().await;
284            instances.remove(name);
285        }
286
287        // Cancel any health check.
288        {
289            let mut handles = self.health_handles.lock().await;
290            if let Some(handle) = handles.remove(name) {
291                handle.abort();
292            }
293        }
294
295        info!("Removed daemon '{}'", name);
296        Ok(())
297    }
298
299    /// Read logs for a daemon.
300    pub async fn get_logs(&self, name: &str, lines: usize, stderr: bool) -> Result<Vec<String>> {
301        // Verify the daemon exists.
302        {
303            let instances = self.instances.read().await;
304            if !instances.contains_key(name) {
305                return Err(SyspulseError::DaemonNotFound(name.to_string()));
306            }
307        }
308        self.log_manager.read_logs(name, lines, stderr)
309    }
310
    /// Dispatch an IPC request to the appropriate method and return a response.
    ///
    /// Pure request router: each arm delegates to the matching manager
    /// method and converts its `Result` into an IPC `Response`; errors are
    /// mapped through `error_response`. `Shutdown` only signals the
    /// broadcast channel — the caller drives the actual teardown.
    pub async fn handle_request(self: &Arc<Self>, request: Request) -> Response {
        match request {
            Request::Start { name, .. } => match self.start_daemon(&name).await {
                Ok(inst) => Response::Ok {
                    message: format!("Daemon '{}' started (PID {})", name, inst.pid.unwrap_or(0)),
                },
                Err(e) => error_response(e),
            },
            Request::Stop { name, force, .. } => match self.stop_daemon(&name, force).await {
                Ok(_) => Response::Ok {
                    message: format!("Daemon '{}' stopped", name),
                },
                Err(e) => error_response(e),
            },
            Request::Restart { name, force, .. } => match self.restart_daemon(&name, force).await {
                Ok(inst) => Response::Ok {
                    message: format!(
                        "Daemon '{}' restarted (PID {})",
                        name,
                        inst.pid.unwrap_or(0)
                    ),
                },
                Err(e) => error_response(e),
            },
            // Status with a name returns one instance; without a name it
            // behaves like List.
            Request::Status { name } => match name {
                Some(name) => match self.status(&name).await {
                    Ok(instance) => Response::Status { instance },
                    Err(e) => error_response(e),
                },
                None => match self.list().await {
                    Ok(instances) => Response::List { instances },
                    Err(e) => error_response(e),
                },
            },
            Request::List => match self.list().await {
                Ok(instances) => Response::List { instances },
                Err(e) => error_response(e),
            },
            Request::Logs {
                name,
                lines,
                stderr,
            } => match self.get_logs(&name, lines, stderr).await {
                Ok(log_lines) => Response::Logs { lines: log_lines },
                Err(e) => error_response(e),
            },
            Request::Add { spec } => match self.add_daemon(spec).await {
                Ok(()) => Response::Ok {
                    message: "Daemon added".to_string(),
                },
                Err(e) => error_response(e),
            },
            Request::Remove { name, force } => match self.remove_daemon(&name, force).await {
                Ok(()) => Response::Ok {
                    message: format!("Daemon '{}' removed", name),
                },
                Err(e) => error_response(e),
            },
            Request::Shutdown => {
                info!("Shutdown requested via IPC");
                // The actual shutdown is triggered by the caller seeing this response.
                // We signal the broadcast channel.
                let _ = self.shutdown_tx.send(());
                Response::Ok {
                    message: "Shutting down".to_string(),
                }
            }
            Request::Ping => Response::Pong,
        }
    }
382
    /// Main entry point for the daemon manager. Called by `syspulse daemon`.
    ///
    /// Startup order: write the PID file, restore previously-running
    /// daemons, start the cron scheduler, then spawn the IPC server and
    /// the process-monitor task. Blocks until Ctrl+C or an internal
    /// shutdown broadcast, then stops all daemons, shuts down the
    /// scheduler and background tasks, and removes the PID file.
    pub async fn run(self: Arc<Self>) -> Result<()> {
        info!("Starting syspulse daemon manager");

        // Write PID file.
        let pid_path = paths::pid_path();
        std::fs::write(&pid_path, std::process::id().to_string())?;

        // Restore daemons that were Running before a crash/restart.
        self.restore_running_daemons().await;

        // Set up cron scheduler for scheduled daemons.
        let mut scheduler = Scheduler::new().await?;
        self.setup_scheduled_daemons(&mut scheduler).await?;
        scheduler.start().await?;

        // Start the IPC server.
        let socket_path = paths::socket_path();
        let ipc_server = IpcServer::new(socket_path);
        let shutdown_rx_ipc = self.shutdown_tx.subscribe();

        // The IPC task owns its own Arc clone of the manager; each request
        // is dispatched through handle_request.
        let manager_for_ipc = Arc::clone(&self);
        let ipc_handle = tokio::spawn(async move {
            let handler = Arc::new(move |req: Request| {
                let mgr = Arc::clone(&manager_for_ipc);
                async move { mgr.handle_request(req).await }
            });
            if let Err(e) = ipc_server.run(handler, shutdown_rx_ipc).await {
                error!("IPC server error: {}", e);
            }
        });

        // Start the process monitor background task.
        let manager_for_monitor = Arc::clone(&self);
        let shutdown_rx_monitor = self.shutdown_tx.subscribe();
        let monitor_handle = tokio::spawn(async move {
            Self::monitor_processes(manager_for_monitor, shutdown_rx_monitor).await;
        });

        // Wait for shutdown signal (Ctrl+C / SIGTERM).
        let shutdown_tx = self.shutdown_tx.clone();
        tokio::select! {
            _ = tokio::signal::ctrl_c() => {
                info!("Received Ctrl+C, initiating shutdown");
                // Rebroadcast so every subscriber (IPC server, monitor,
                // health-check tasks) observes the shutdown.
                let _ = shutdown_tx.send(());
            }
            _ = self.wait_for_shutdown() => {
                info!("Shutdown signal received");
            }
        }

        // Graceful shutdown: stop all running daemons.
        info!("Stopping all running daemons...");
        self.stop_all_daemons().await;

        // Shut down scheduler.
        scheduler.shutdown().await.ok();

        // Wait for background tasks to finish. The join results are
        // ignored because aborted tasks return a JoinError by design.
        ipc_handle.abort();
        monitor_handle.abort();
        let _ = tokio::join!(ipc_handle, monitor_handle);

        // Clean up PID file.
        std::fs::remove_file(&pid_path).ok();

        info!("Daemon manager shut down cleanly");
        Ok(())
    }
452
453    /// Wait until a shutdown signal is received on the broadcast channel.
454    async fn wait_for_shutdown(&self) {
455        let mut rx = self.shutdown_tx.subscribe();
456        let _ = rx.recv().await;
457    }
458
459    /// Attempt to restore daemons that were in Running state when we last shut down.
460    async fn restore_running_daemons(&self) {
461        let instances = self.instances.read().await;
462        let to_restart: Vec<String> = instances
463            .iter()
464            .filter(|(_, inst)| inst.state == LifecycleState::Running)
465            .map(|(name, _)| name.clone())
466            .collect();
467        drop(instances);
468
469        for name in to_restart {
470            // First mark as stopped (the old process is gone), then start fresh.
471            {
472                let mut instances = self.instances.write().await;
473                if let Some(inst) = instances.get_mut(&name) {
474                    inst.state = LifecycleState::Stopped;
475                    inst.pid = None;
476                }
477            }
478            info!("Restoring previously running daemon '{}'", name);
479            if let Err(e) = self.start_daemon(&name).await {
480                error!("Failed to restore daemon '{}': {}", name, e);
481            }
482        }
483    }
484
    /// Set up cron schedules for all daemons that have a schedule field.
    ///
    /// For each scheduled spec, registers a callback that starts the daemon
    /// via `cron_start_daemon`. The callback captures a `ManagerComponents`
    /// bundle (clones of the shared Arcs) rather than `Arc<Self>`, because
    /// `self` is only borrowed here.
    async fn setup_scheduled_daemons(&self, scheduler: &mut Scheduler) -> Result<()> {
        let specs = {
            let reg = self.registry.lock().await;
            reg.list_specs().unwrap_or_default()
        };

        for spec in specs {
            if let Some(ref cron_expr) = spec.schedule {
                let manager = Arc::new({
                    // We need a reference to self for the callback, but we're behind Arc<Self>
                    // in run(). The callback captures the instances/registry/etc. via clones.
                    let instances = Arc::clone(&self.instances);
                    let registry = Arc::clone(&self.registry);
                    let process_driver = Arc::clone(&self.process_driver);
                    let log_manager = Arc::clone(&self.log_manager);
                    let shutdown_tx = self.shutdown_tx.clone();
                    let health_handles = Arc::clone(&self.health_handles);
                    ManagerComponents {
                        instances,
                        registry,
                        process_driver,
                        log_manager,
                        shutdown_tx,
                        health_handles,
                    }
                });

                // Each cron tick clones the bundle and attempts a start;
                // failures are logged rather than propagated.
                scheduler
                    .schedule_daemon(&spec.name, cron_expr, move |name| {
                        let mgr = Arc::clone(&manager);
                        async move {
                            info!("Cron trigger: starting daemon '{}'", name);
                            if let Err(e) = cron_start_daemon(&mgr, &name).await {
                                error!("Cron failed to start '{}': {}", name, e);
                            }
                        }
                    })
                    .await?;
            }
        }

        Ok(())
    }
529
530    /// Stop all currently running daemons (used during shutdown).
531    async fn stop_all_daemons(&self) {
532        let names: Vec<String> = {
533            let instances = self.instances.read().await;
534            instances
535                .iter()
536                .filter(|(_, inst)| inst.state.is_active())
537                .map(|(name, _)| name.clone())
538                .collect()
539        };
540
541        for name in names {
542            if let Err(e) = self.stop_daemon(&name, false).await {
543                warn!("Failed to stop daemon '{}' during shutdown: {}", name, e);
544                // Try force kill.
545                if let Err(e2) = self.stop_daemon(&name, true).await {
546                    error!("Failed to force-stop daemon '{}': {}", name, e2);
547                }
548            }
549        }
550    }
551
    /// Background task: monitors running processes, detects unexpected exits,
    /// and handles restart policies.
    ///
    /// Polls once per second. For each `Running` instance whose PID is no
    /// longer alive, it records the exit, persists the `Failed` state,
    /// consults the spec's restart policy, and — when the policy allows —
    /// spawns a delayed task that resets the instance to `Stopped` and
    /// calls `start_daemon` after the backoff.
    async fn monitor_processes(
        manager: Arc<DaemonManager>,
        mut shutdown_rx: broadcast::Receiver<()>,
    ) {
        let mut interval = tokio::time::interval(Duration::from_secs(1));

        loop {
            // Tick every second, or exit immediately on shutdown.
            tokio::select! {
                _ = interval.tick() => {}
                _ = shutdown_rx.recv() => {
                    info!("Process monitor shutting down");
                    break;
                }
            }

            // Collect names of daemons in Running state.
            // Snapshot under the read lock so the per-PID liveness checks
            // below run without holding any lock.
            let running: Vec<(String, u32)> = {
                let instances = manager.instances.read().await;
                instances
                    .iter()
                    .filter_map(|(name, inst)| {
                        if inst.state == LifecycleState::Running {
                            inst.pid.map(|pid| (name.clone(), pid))
                        } else {
                            None
                        }
                    })
                    .collect()
            };

            for (name, pid) in running {
                let alive = manager.process_driver.is_alive(pid).await;
                if alive {
                    continue;
                }

                // Process has exited unexpectedly.
                warn!("Daemon '{}' (PID {}) has exited unexpectedly", name, pid);

                let exit_code = manager.process_driver.wait(pid).await.ok().flatten();

                // Update instance state.
                let (should_restart, backoff) = {
                    let mut instances = manager.instances.write().await;
                    if let Some(inst) = instances.get_mut(&name) {
                        inst.state = LifecycleState::Failed;
                        inst.pid = None;
                        inst.exit_code = exit_code;
                        inst.stopped_at = Some(Utc::now());
                        inst.health_status = HealthStatus::Unknown;

                        // Persist the failed state.
                        // try_lock: skip persistence rather than block while
                        // holding the instances write lock.
                        if let Ok(reg) = manager.registry.try_lock() {
                            reg.update_state(inst).ok();
                        }

                        // Check restart policy.
                        let spec = {
                            if let Ok(reg) = manager.registry.try_lock() {
                                reg.get_spec(&name).ok()
                            } else {
                                None
                            }
                        };

                        if let Some(spec) = spec {
                            let should = RestartEvaluator::should_restart(
                                &spec.restart_policy,
                                exit_code,
                                inst.restart_count,
                            );
                            let backoff = RestartEvaluator::backoff_duration(
                                &spec.restart_policy,
                                inst.restart_count,
                            );
                            inst.restart_count += 1;
                            (should, backoff)
                        } else {
                            // No spec available (registry busy or daemon
                            // removed): do not restart.
                            (false, Duration::ZERO)
                        }
                    } else {
                        (false, Duration::ZERO)
                    }
                };

                // Cancel health check.
                {
                    let mut handles = manager.health_handles.lock().await;
                    if let Some(handle) = handles.remove(&name) {
                        handle.abort();
                    }
                }

                if should_restart {
                    info!("Restarting daemon '{}' after {:?} backoff", name, backoff);

                    let mgr = Arc::clone(&manager);
                    let daemon_name = name.clone();
                    tokio::spawn(async move {
                        tokio::time::sleep(backoff).await;
                        // Reset state to Stopped so we can transition to Starting.
                        {
                            let mut instances = mgr.instances.write().await;
                            if let Some(inst) = instances.get_mut(&daemon_name) {
                                inst.state = LifecycleState::Stopped;
                            }
                        }
                        if let Err(e) = mgr.start_daemon(&daemon_name).await {
                            error!("Failed to restart daemon '{}': {}", daemon_name, e);
                        }
                    });
                }
            }
        }
    }
669
    /// Background task: runs periodic health checks for a daemon.
    ///
    /// Waits out the configured start period, then checks every
    /// `interval_secs`. The daemon is only marked `Unhealthy` after
    /// `retries` consecutive failures; any success resets the counter.
    /// The task exits when the shutdown broadcast fires (it is also
    /// aborted externally when the daemon is stopped or removed).
    async fn run_health_check(
        instances: Arc<RwLock<HashMap<String, DaemonInstance>>>,
        registry: Arc<Mutex<Registry>>,
        daemon_name: String,
        health_spec: crate::daemon::HealthCheckSpec,
        mut shutdown_rx: broadcast::Receiver<()>,
    ) {
        use crate::health;

        // Wait for the start period before beginning checks.
        if health_spec.start_period_secs > 0 {
            tokio::select! {
                _ = tokio::time::sleep(Duration::from_secs(health_spec.start_period_secs)) => {}
                _ = shutdown_rx.recv() => return,
            }
        }

        let checker = health::create_checker(health_spec.clone());
        let interval = Duration::from_secs(health_spec.interval_secs);
        let max_failures = health_spec.retries;
        let mut consecutive_failures: u32 = 0;

        loop {
            tokio::select! {
                _ = tokio::time::sleep(interval) => {}
                _ = shutdown_rx.recv() => break,
            }

            // A checker error is treated the same as an unhealthy result.
            let result = checker.check().await;
            let status = match result {
                Ok(s) => s,
                Err(e) => {
                    warn!("Health check error for '{}': {}", daemon_name, e);
                    HealthStatus::Unhealthy
                }
            };

            match status {
                HealthStatus::Healthy => {
                    consecutive_failures = 0;
                    let mut insts = instances.write().await;
                    if let Some(inst) = insts.get_mut(&daemon_name) {
                        // Only mark Healthy while actually Running, so a
                        // stale check cannot overwrite a stopped state.
                        if inst.state == LifecycleState::Running {
                            inst.health_status = HealthStatus::Healthy;
                        }
                    }
                }
                HealthStatus::Unhealthy => {
                    consecutive_failures += 1;
                    if consecutive_failures >= max_failures {
                        warn!(
                            "Daemon '{}' is unhealthy after {} consecutive failures",
                            daemon_name, consecutive_failures
                        );
                        let mut insts = instances.write().await;
                        if let Some(inst) = insts.get_mut(&daemon_name) {
                            inst.health_status = HealthStatus::Unhealthy;
                            // try_lock: best-effort persistence to avoid
                            // blocking while holding the instances lock.
                            if let Ok(reg) = registry.try_lock() {
                                reg.update_state(inst).ok();
                            }
                        }
                    }
                }
                // Other statuses leave the recorded state untouched.
                _ => {}
            }
        }
    }
738}
739
/// Internal helper struct for passing manager components into cron callbacks
/// without requiring Arc<DaemonManager>.
///
/// Each field is a clone of the corresponding `DaemonManager` field, so the
/// callbacks share the same underlying state.
struct ManagerComponents {
    // Shared runtime state map, keyed by daemon name.
    instances: Arc<RwLock<HashMap<String, DaemonInstance>>>,
    // Persistent spec/state store.
    registry: Arc<Mutex<Registry>>,
    // Process spawn/stop backend.
    process_driver: Arc<dyn ProcessDriver>,
    // Log-file setup and reading.
    log_manager: Arc<LogManager>,
    // Used to hand shutdown receivers to spawned health-check tasks.
    shutdown_tx: broadcast::Sender<()>,
    // Health-check task handles, keyed by daemon name.
    health_handles: Arc<Mutex<HashMap<String, tokio::task::JoinHandle<()>>>>,
}
750
751/// Start a daemon using raw components (for cron callbacks).
752async fn cron_start_daemon(components: &ManagerComponents, name: &str) -> Result<DaemonInstance> {
753    let spec = {
754        let reg = components.registry.lock().await;
755        reg.get_spec(name)?
756    };
757
758    let mut instances = components.instances.write().await;
759    let instance = instances
760        .entry(name.to_string())
761        .or_insert_with(|| DaemonInstance::new(name));
762
763    // For scheduled daemons, allow Scheduled -> Starting or Stopped -> Starting.
764    if instance.state == LifecycleState::Scheduled || instance.state == LifecycleState::Stopped {
765        instance.state = instance.state.transition_to(LifecycleState::Starting)?;
766    } else if instance.state == LifecycleState::Running {
767        // Already running, nothing to do.
768        return Ok(instance.clone());
769    } else {
770        return Err(SyspulseError::InvalidStateTransition {
771            from: format!("{:?}", instance.state),
772            to: "Starting".to_string(),
773        });
774    }
775
776    let (stdout_path, stderr_path) = components.log_manager.setup_log_files(name)?;
777    instance.stdout_log = Some(stdout_path.clone());
778    instance.stderr_log = Some(stderr_path.clone());
779
780    let proc_info = components
781        .process_driver
782        .spawn(&spec, &stdout_path, &stderr_path)
783        .await?;
784
785    instance.pid = Some(proc_info.pid);
786    instance.started_at = Some(Utc::now());
787    instance.stopped_at = None;
788    instance.exit_code = None;
789    instance.state = instance.state.transition_to(LifecycleState::Running)?;
790
791    if spec.health_check.is_some() {
792        instance.health_status = HealthStatus::Unknown;
793    } else {
794        instance.health_status = HealthStatus::NotConfigured;
795    }
796
797    {
798        let reg = components.registry.lock().await;
799        reg.update_state(instance)?;
800    }
801
802    let result = instance.clone();
803    drop(instances);
804
805    // Start health check if configured.
806    if let Some(ref health_spec) = spec.health_check {
807        let daemon_name = name.to_string();
808        let shutdown_rx = components.shutdown_tx.subscribe();
809        let insts = Arc::clone(&components.instances);
810        let registry = Arc::clone(&components.registry);
811        let hs: crate::daemon::HealthCheckSpec = health_spec.clone();
812
813        let handle = tokio::spawn(async move {
814            DaemonManager::run_health_check(insts, registry, daemon_name, hs, shutdown_rx).await;
815        });
816
817        let mut handles = components.health_handles.lock().await;
818        handles.insert(name.to_string(), handle);
819    }
820
821    info!(
822        "Cron-started daemon '{}' with PID {}",
823        name,
824        result.pid.unwrap_or(0)
825    );
826    Ok(result)
827}
828
829fn error_response(e: SyspulseError) -> Response {
830    let code = match &e {
831        SyspulseError::DaemonNotFound(_) => 404,
832        SyspulseError::DaemonAlreadyExists(_) => 409,
833        SyspulseError::InvalidStateTransition { .. } => 409,
834        SyspulseError::Process(_) => 500,
835        SyspulseError::HealthCheck(_) => 500,
836        SyspulseError::Ipc(_) => 500,
837        SyspulseError::Registry(_) => 500,
838        SyspulseError::Config(_) => 400,
839        SyspulseError::Scheduler(_) => 500,
840        SyspulseError::Io(_) => 500,
841        SyspulseError::Serialization(_) => 400,
842        SyspulseError::Database(_) => 500,
843        SyspulseError::Timeout(_) => 504,
844    };
845    Response::Error {
846        code,
847        message: e.to_string(),
848    }
849}