Skip to main content

proc_daemon/
daemon.rs

1//! Core daemon implementation with builder pattern.
2//!
3//! This module provides the main `Daemon` struct and `DaemonBuilder` for creating
4//! high-performance, resilient daemon services. The builder pattern allows for
5//! flexible configuration while maintaining zero-copy performance characteristics.
6
7use std::future::Future;
8use std::io::{self, Write};
9use std::path::{Path, PathBuf};
10use std::sync::Arc;
11use std::time::Instant;
12use tracing::{debug, error, info, instrument, warn};
13
14use crate::config::Config;
15use crate::error::{Error, Result};
16use crate::shutdown::{ShutdownCoordinator, ShutdownReason};
17use crate::signal::{ConfigurableSignalHandler, SignalConfig, SignalHandler};
18use crate::subsystem::{Subsystem, SubsystemId, SubsystemManager};
19
20#[cfg(feature = "config-watch")]
21use arc_swap::ArcSwap;
22#[cfg(feature = "config-watch")]
23use notify::RecommendedWatcher;
24
25/// Type alias for subsystem registration function
26type SubsystemRegistrationFn = Box<dyn FnOnce(&SubsystemManager) -> SubsystemId + Send + 'static>;
27
28/// Main daemon instance that coordinates all subsystems and handles lifecycle.
29pub struct Daemon {
30    /// Configuration
31    config: Arc<Config>,
32    /// Live-updating configuration snapshot (when config-watch is enabled)
33    #[cfg(feature = "config-watch")]
34    config_shared: Arc<ArcSwap<Config>>,
35    /// Shutdown coordination
36    shutdown_coordinator: ShutdownCoordinator,
37    /// Subsystem management
38    subsystem_manager: SubsystemManager,
39    /// Signal handling
40    signal_handler: Option<SignalHandlerKind>,
41    /// Keep the config watcher alive (when enabled)
42    #[cfg(feature = "config-watch")]
43    _config_watcher: Option<RecommendedWatcher>,
44    /// Start time
45    started_at: Option<Instant>,
46}
47
48struct PidFileGuard {
49    path: PathBuf,
50}
51
52impl PidFileGuard {
53    fn create(path: &Path) -> Result<Self> {
54        let pid = std::process::id();
55        let mut file = std::fs::File::create(path).map_err(|e| {
56            Error::io_with_source(
57                format!("Failed to create PID file at {}", path.display()),
58                e,
59            )
60        })?;
61        writeln!(file, "{pid}").map_err(|e| {
62            Error::io_with_source(format!("Failed to write PID file at {}", path.display()), e)
63        })?;
64        Ok(Self {
65            path: path.to_path_buf(),
66        })
67    }
68}
69
70impl Drop for PidFileGuard {
71    fn drop(&mut self) {
72        let _ = std::fs::remove_file(&self.path);
73    }
74}
75
/// Mutable state of the rotating log writer, shared behind a mutex.
struct RotatingFileWriterInner {
    /// Handle to the currently active log file (opened in append mode).
    file: std::fs::File,
    /// Path of the active log file; rotated files are `<path>.1`, `<path>.2`, ...
    path: PathBuf,
    /// Size threshold in bytes that triggers a rotation (`u64::MAX` = unlimited).
    max_size: u64,
    /// Number of rotated files to keep; `0` disables rotation.
    max_files: u32,
    /// Bytes accounted to the active file so far.
    size: u64,
}

/// Cheaply clonable handle to a size-based rotating log file.
#[derive(Clone)]
struct RotatingFileWriter {
    inner: Arc<std::sync::Mutex<RotatingFileWriterInner>>,
}

impl RotatingFileWriter {
    /// Opens (creating if necessary) the log file at `path` in append mode.
    ///
    /// `max_size` defaults to `u64::MAX` and `max_files` to `0` (no rotation)
    /// when not given. The initial size is read from the file's metadata and
    /// falls back to `0` if the metadata is unavailable.
    ///
    /// # Errors
    ///
    /// Returns the I/O error from opening the file.
    fn new(path: PathBuf, max_size: Option<u64>, max_files: Option<u32>) -> io::Result<Self> {
        let file = std::fs::OpenOptions::new()
            .create(true)
            .append(true)
            .open(&path)?;
        let size = file.metadata().map(|m| m.len()).unwrap_or(0);

        let max_size = max_size.unwrap_or(u64::MAX);
        let max_files = max_files.unwrap_or(0);

        Ok(Self {
            inner: Arc::new(std::sync::Mutex::new(RotatingFileWriterInner {
                file,
                path,
                max_size,
                max_files,
                size,
            })),
        })
    }

    /// Rotates the log files while the caller holds the lock on `inner`.
    ///
    /// Shifts `<path>.(n-1)` to `<path>.n` from the oldest index down, so the
    /// active file ends up as `<path>.1` and anything previously stored at
    /// `<path>.<max_files>` is replaced. Afterwards the active path is reopened
    /// and the size counter reset. Does nothing when `max_files` is `0`.
    ///
    /// # Errors
    ///
    /// Returns the I/O error from a failed rename or from reopening the file.
    fn rotate_locked(inner: &mut RotatingFileWriterInner) -> io::Result<()> {
        if inner.max_files == 0 {
            return Ok(());
        }

        // Walk from the oldest slot to the newest so no file is overwritten
        // before it has been moved out of the way.
        for idx in (1..=inner.max_files).rev() {
            let from = Self::rotated_path(&inner.path, idx - 1);
            let to = Self::rotated_path(&inner.path, idx);
            if from.exists() {
                // Best effort: the destination may simply not exist yet.
                let _ = std::fs::remove_file(&to);
                std::fs::rename(&from, &to)?;
            }
        }
        inner.file = std::fs::OpenOptions::new()
            .create(true)
            .append(true)
            .open(&inner.path)?;
        inner.size = 0;
        Ok(())
    }

    /// Returns the path of the `idx`-th rotated file: `path` itself for `0`,
    /// otherwise `path` with `.<idx>` appended.
    ///
    /// The suffix is appended to the raw `OsString` rather than going through
    /// `Path::display`, so paths that are not valid UTF-8 are preserved exactly.
    fn rotated_path(path: &Path, idx: u32) -> PathBuf {
        if idx == 0 {
            return path.to_path_buf();
        }
        let mut rotated = path.as_os_str().to_os_string();
        rotated.push(format!(".{idx}"));
        PathBuf::from(rotated)
    }
}
139
140struct RotatingFileWriterGuard {
141    inner: Arc<std::sync::Mutex<RotatingFileWriterInner>>,
142}
143
144impl io::Write for RotatingFileWriterGuard {
145    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
146        let mut inner = self
147            .inner
148            .lock()
149            .map_err(|_| io::Error::other("rotating log writer mutex poisoned"))?;
150
151        if inner.size.saturating_add(buf.len() as u64) > inner.max_size {
152            RotatingFileWriter::rotate_locked(&mut inner)?;
153        }
154
155        let written = inner.file.write(buf)?;
156        inner.size = inner.size.saturating_add(written as u64);
157        drop(inner);
158        Ok(written)
159    }
160
161    fn flush(&mut self) -> io::Result<()> {
162        let mut inner = self
163            .inner
164            .lock()
165            .map_err(|_| io::Error::other("rotating log writer mutex poisoned"))?;
166        let result = inner.file.flush();
167        drop(inner);
168        result
169    }
170}
171
172impl<'a> tracing_subscriber::fmt::writer::MakeWriter<'a> for RotatingFileWriter {
173    type Writer = RotatingFileWriterGuard;
174
175    fn make_writer(&'a self) -> Self::Writer {
176        RotatingFileWriterGuard {
177            inner: Arc::clone(&self.inner),
178        }
179    }
180}
181
182#[derive(Clone)]
183enum SignalHandlerKind {
184    Default(Arc<SignalHandler>),
185    Configurable(Arc<ConfigurableSignalHandler>),
186}
187
188impl SignalHandlerKind {
189    #[allow(dead_code)]
190    async fn handle_signals(&self) -> Result<()> {
191        match self {
192            Self::Default(handler) => handler.handle_signals().await,
193            Self::Configurable(handler) => handler.handle_signals().await,
194        }
195    }
196
197    fn stop(&self) {
198        match self {
199            Self::Default(handler) => handler.stop(),
200            Self::Configurable(handler) => handler.stop(),
201        }
202    }
203}
204
205impl Daemon {
206    /// Create a new daemon builder with the provided configuration.
207    #[must_use]
208    pub fn builder(config: Config) -> DaemonBuilder {
209        DaemonBuilder::new(config)
210    }
211
212    /// Create a new daemon with default configuration.
213    ///
214    /// # Errors
215    ///
216    /// Will return an error if the default configuration is invalid.
217    pub fn with_defaults() -> Result<DaemonBuilder> {
218        let config = Config::new()?;
219        Ok(Self::builder(config))
220    }
221
222    /// Run the daemon until shutdown is requested.
223    /// This is the main entry point that starts all subsystems and waits for shutdown.
224    ///
225    /// # Errors
226    ///
227    /// Returns an error if logging initialization fails, configuration validation fails,
228    /// subsystem startup fails, or if there is an error during the shutdown sequence.
229    #[instrument(skip(self), fields(daemon_name = %self.config.name))]
230    pub async fn run(mut self) -> Result<()> {
231        info!(daemon_name = %self.config.name, "Starting daemon");
232        self.started_at = Some(Instant::now());
233
234        // Apply working directory before initializing logging or PID files
235        if let Some(work_dir) = &self.config.work_dir {
236            std::env::set_current_dir(work_dir).map_err(|e| {
237                Error::io_with_source(
238                    format!("Failed to set working directory to {}", work_dir.display()),
239                    e,
240                )
241            })?;
242        }
243
244        // Write PID file if configured
245        let _pid_guard = if let Some(pid_file) = &self.config.pid_file {
246            Some(PidFileGuard::create(pid_file)?)
247        } else {
248            None
249        };
250
251        // Initialize logging
252        self.init_logging()?;
253
254        // Apply optional scheduler hints (no-op placeholders for future tuning)
255        #[cfg(feature = "scheduler-hints")]
256        {
257            crate::scheduler::apply_process_hints(&self.config);
258            crate::scheduler::apply_runtime_hints();
259        }
260
261        // Start all subsystems
262        if let Err(e) = self.subsystem_manager.start_all().await {
263            error!(error = %e, "Failed to start all subsystems");
264            return Err(e);
265        }
266
267        // Start signal handling in the background
268        // Only spawn when a supported async runtime is enabled.
269        #[cfg(any(feature = "tokio", feature = "async-std"))]
270        let signal_task = self.signal_handler.as_ref().map(|signal_handler| {
271            let handler = signal_handler.clone();
272            Self::spawn_signal_handler(handler)
273        });
274
275        #[cfg(not(any(feature = "tokio", feature = "async-std")))]
276        let _signal_task: Option<()> = None;
277
278        // Wait for shutdown to be initiated
279        info!("Daemon started successfully, waiting for shutdown signal");
280
281        // Main daemon loop - wait for shutdown
282        loop {
283            if self.shutdown_coordinator.is_shutdown() {
284                break;
285            }
286
287            // Check subsystem health periodically
288            if self.config.monitoring.health_checks {
289                let health_results = self.subsystem_manager.run_health_checks();
290                // Early exit on first unhealthy subsystem to avoid allocations
291                let mut found_unhealthy = false;
292                for (id, name, healthy) in &health_results {
293                    if !healthy {
294                        if !found_unhealthy {
295                            warn!("Unhealthy subsystems detected:");
296                            found_unhealthy = true;
297                        }
298                        warn!(subsystem_id = id, subsystem_name = %name, "Unhealthy subsystem");
299                        // Could implement auto-restart logic here
300                    }
301                }
302            }
303
304            // Sleep for a short interval
305            #[cfg(feature = "tokio")]
306            tokio::time::sleep(self.config.health_check_interval()).await;
307
308            #[cfg(all(feature = "async-std", not(feature = "tokio")))]
309            async_std::task::sleep(self.config.health_check_interval()).await;
310
311            #[cfg(not(any(feature = "tokio", feature = "async-std")))]
312            std::thread::sleep(self.config.health_check_interval());
313        }
314
315        // Graceful shutdown sequence
316        info!("Shutdown initiated, beginning graceful shutdown");
317
318        // Stop signal handling
319        if let Some(signal_handler) = &self.signal_handler {
320            signal_handler.stop();
321        }
322
323        // Wait for signal handler task to complete
324        #[cfg(any(feature = "tokio", feature = "async-std"))]
325        if let Some(task) = signal_task {
326            #[cfg(feature = "tokio")]
327            {
328                if let Err(e) = task.await {
329                    warn!(error = %e, "Signal handler task failed");
330                }
331            }
332
333            #[cfg(all(feature = "async-std", not(feature = "tokio")))]
334            {
335                if let Err(e) = task.await {
336                    warn!(error = %e, "Signal handler task failed");
337                }
338            }
339        }
340
341        // Stop all subsystems
342        if let Err(e) = self.subsystem_manager.stop_all().await {
343            error!(error = %e, "Failed to stop all subsystems gracefully");
344        }
345
346        // Wait for graceful shutdown with timeout
347        if let Err(e) = self.shutdown_coordinator.wait_for_shutdown().await {
348            warn!(error = %e, "Graceful shutdown timeout exceeded");
349
350            // Wait for force shutdown timeout
351            if let Err(e) = self.shutdown_coordinator.wait_for_force_shutdown().await {
352                error!(error = %e, "Force shutdown timeout exceeded");
353            }
354
355            // Wait for kill shutdown timeout
356            if let Err(e) = self.shutdown_coordinator.wait_for_kill_shutdown().await {
357                error!(error = %e, "Kill shutdown timeout exceeded, exiting immediately");
358            }
359        }
360
361        let elapsed = self.started_at.map(|t| t.elapsed());
362        info!(uptime = ?elapsed, "Daemon shutdown complete");
363
364        Ok(())
365    }
366
367    /// Initialize the logging system based on configuration.
368    fn init_logging(&self) -> Result<()> {
369        use tracing_subscriber::fmt::format::FmtSpan;
370        use tracing_subscriber::fmt::writer::BoxMakeWriter;
371        use tracing_subscriber::{EnvFilter, FmtSubscriber};
372
373        let level: tracing::Level = self.config.logging.level.into();
374        let filter = EnvFilter::from_default_env().add_directive(level.into());
375
376        // Configure output writer
377        let writer = if let Some(path) = &self.config.logging.file {
378            Some(
379                RotatingFileWriter::new(
380                    path.clone(),
381                    self.config.logging.max_file_size,
382                    self.config.logging.max_files,
383                )
384                .map_err(|e| {
385                    Error::io_with_source(
386                        format!("Failed to initialize log file at {}", path.display()),
387                        e,
388                    )
389                })?,
390            )
391        } else {
392            None
393        };
394
395        let make_writer = |writer: &Option<RotatingFileWriter>| {
396            writer.as_ref().map_or_else(
397                || BoxMakeWriter::new(std::io::stdout),
398                |writer| BoxMakeWriter::new(writer.clone()),
399            )
400        };
401
402        // Configure output format
403        if self.config.is_json_logging() {
404            #[cfg(feature = "json-logs")]
405            {
406                let base_subscriber = FmtSubscriber::builder()
407                    .with_env_filter(filter)
408                    .with_span_events(FmtSpan::CLOSE)
409                    .with_target(true)
410                    .with_thread_ids(true)
411                    .with_thread_names(true)
412                    .with_writer(make_writer(&writer));
413
414                let json_subscriber = base_subscriber
415                    .json()
416                    .flatten_event(true)
417                    .with_current_span(false);
418
419                tracing::subscriber::set_global_default(json_subscriber.finish()).map_err(|e| {
420                    Error::config(format!("Failed to initialize JSON logging: {e}"))
421                })?;
422
423                return Ok(()); // Return early as logging is initialized
424            }
425
426            #[cfg(not(feature = "json-logs"))]
427            {
428                return Err(Error::config(
429                    "JSON logging requested but feature not enabled",
430                ));
431            }
432        }
433
434        // Regular non-JSON logging
435        let base_subscriber = FmtSubscriber::builder()
436            .with_env_filter(filter)
437            .with_span_events(FmtSpan::CLOSE)
438            .with_target(true)
439            .with_thread_ids(true)
440            .with_thread_names(true)
441            .with_writer(make_writer(&writer));
442
443        let regular_subscriber = base_subscriber
444            .with_ansi(self.config.is_colored_logging())
445            .compact();
446
447        tracing::subscriber::set_global_default(regular_subscriber.finish())
448            .map_err(|e| Error::config(format!("Failed to initialize logging: {e}")))?;
449
450        debug!(
451            "Logging initialized with level: {:?}",
452            self.config.logging.level
453        );
454        Ok(())
455    }
456
457    /// Spawn the signal handler task.
458    #[cfg(feature = "tokio")]
459    fn spawn_signal_handler(handler: SignalHandlerKind) -> tokio::task::JoinHandle<Result<()>> {
460        tokio::spawn(async move { handler.handle_signals().await })
461    }
462
463    #[cfg(all(feature = "async-std", not(feature = "tokio")))]
464    fn spawn_signal_handler(handler: SignalHandlerKind) -> async_std::task::JoinHandle<Result<()>> {
465        async_std::task::spawn(async move { handler.handle_signals().await })
466    }
467
468    /// Get daemon statistics.
469    pub fn get_stats(&self) -> DaemonStats {
470        let subsystem_stats = self.subsystem_manager.get_stats();
471        let shutdown_stats = self.shutdown_coordinator.get_stats();
472        let total_restarts = subsystem_stats.total_restarts;
473
474        DaemonStats {
475            name: self.config.name.clone(),
476            uptime: self.started_at.map(|t| t.elapsed()),
477            is_shutdown: shutdown_stats.is_shutdown,
478            shutdown_reason: shutdown_stats.reason,
479            subsystem_stats,
480            total_restarts,
481        }
482    }
483
484    /// Request graceful shutdown programmatically.
485    pub fn shutdown(&self) -> bool {
486        self.shutdown_coordinator
487            .initiate_shutdown(ShutdownReason::Requested)
488    }
489
490    /// Check if the daemon is running.
491    pub fn is_running(&self) -> bool {
492        !self.shutdown_coordinator.is_shutdown()
493    }
494
495    /// Get the daemon configuration.
496    pub fn config(&self) -> &Config {
497        &self.config
498    }
499
500    /// Get a snapshot of the current configuration. When the `config-watch` feature
501    /// is enabled and hot-reload is active, this reflects the most recent loaded
502    /// configuration; otherwise it returns the initial configuration.
503    #[cfg(feature = "config-watch")]
504    pub fn config_snapshot(&self) -> Arc<Config> {
505        self.config_shared.load_full()
506    }
507}
508
509impl Clone for Daemon {
510    fn clone(&self) -> Self {
511        Self {
512            config: Arc::clone(&self.config),
513            #[cfg(feature = "config-watch")]
514            config_shared: Arc::clone(&self.config_shared),
515            shutdown_coordinator: self.shutdown_coordinator.clone(),
516            subsystem_manager: self.subsystem_manager.clone(),
517            signal_handler: self.signal_handler.clone(),
518            #[cfg(feature = "config-watch")]
519            _config_watcher: None,
520            started_at: self.started_at,
521        }
522    }
523}
524
525/// Statistics about the daemon's current state.
526#[derive(Debug, Clone)]
527pub struct DaemonStats {
528    /// Daemon name
529    pub name: String,
530    /// Time since daemon started
531    pub uptime: Option<std::time::Duration>,
532    /// Whether shutdown has been initiated
533    pub is_shutdown: bool,
534    /// Reason for shutdown (if any)
535    pub shutdown_reason: Option<crate::shutdown::ShutdownReason>,
536    /// Subsystem statistics
537    pub subsystem_stats: crate::subsystem::SubsystemStats,
538    /// Total number of subsystem restarts
539    pub total_restarts: u64,
540}
541
542/// Builder for creating daemon instances with fluent API.
543pub struct DaemonBuilder {
544    config: Config,
545    // Pre-allocate the vector with a reasonable capacity
546    subsystems: Vec<SubsystemRegistrationFn>,
547    signal_config: Option<SignalConfig>,
548    enable_signals: bool,
549    config_path: Option<PathBuf>,
550}
551
552impl DaemonBuilder {
553    /// Create a new daemon builder with the provided configuration.
554    #[must_use]
555    pub fn new(config: Config) -> Self {
556        Self {
557            config,
558            // Pre-allocate the subsystems vector with a reasonable capacity
559            subsystems: Vec::with_capacity(16),
560            signal_config: None,
561            enable_signals: true,
562            config_path: None,
563        }
564    }
565
566    /// Configure signal handling.
567    #[must_use]
568    pub fn with_signal_config(mut self, config: SignalConfig) -> Self {
569        self.signal_config = Some(config);
570        self
571    }
572
573    /// Override the configuration file path used for hot-reload.
574    #[must_use]
575    pub fn with_config_path<P: Into<PathBuf>>(mut self, path: P) -> Self {
576        self.config_path = Some(path.into());
577        self
578    }
579
580    /// Disable signal handling.
581    #[must_use]
582    pub const fn without_signals(mut self) -> Self {
583        self.enable_signals = false;
584        self
585    }
586
587    /// Enable only specific signals.
588    #[must_use]
589    pub fn with_signals(mut self, sigterm: bool, sigint: bool) -> Self {
590        let mut config = SignalConfig::new();
591        if !sigterm {
592            config = config.without_sigterm();
593        }
594        if !sigint {
595            config = config.without_sigint();
596        }
597        self.signal_config = Some(config);
598        self
599    }
600
601    /// Add a task that will be run as part of the daemon.
602    ///
603    /// A task is a function that will be executed repeatedly until shutdown is requested.
604    ///
605    /// # Arguments
606    ///
607    /// * `name` - Name of the task for identification
608    /// * `task_fn` - Function that implements the task logic
609    ///
610    /// # Returns
611    ///
612    /// Updated builder instance
613    #[must_use]
614    pub fn with_task<F, Fut>(mut self, name: &str, task_fn: F) -> Self
615    where
616        F: Fn(crate::shutdown::ShutdownHandle) -> Fut + Send + Sync + 'static,
617        Fut: Future<Output = Result<()>> + Send + 'static,
618    {
619        // Clone the name to avoid lifetime issues
620        let name = name.to_string();
621        let subsystem_fn = Box::new(move |subsystem_manager: &SubsystemManager| {
622            subsystem_manager.register_fn(&name, task_fn)
623        });
624
625        self.subsystems.push(subsystem_fn);
626        self
627    }
628
629    /// Add a subsystem that will be managed by the daemon.
630    ///
631    /// A subsystem is a component that implements the `Subsystem` trait.
632    ///
633    /// # Arguments
634    ///
635    /// * `subsystem` - The subsystem to add
636    ///
637    /// # Returns
638    ///
639    /// Updated builder instance
640    #[must_use]
641    pub fn with_subsystem<S>(mut self, subsystem: S) -> Self
642    where
643        S: Subsystem + Send + Sync + 'static,
644    {
645        let subsystem_fn = Box::new(move |subsystem_manager: &SubsystemManager| {
646            subsystem_manager.register(subsystem)
647        });
648
649        self.subsystems.push(subsystem_fn);
650        self
651    }
652
653    /// Add a subsystem using a registration function.
654    ///
655    /// This is a lower-level method that gives direct access to the `SubsystemManager`
656    /// for registration. It's useful when you need more control over the registration process.
657    ///
658    /// # Arguments
659    ///
660    /// * `name` - Name for identification in logs
661    /// * `register_fn` - Function that handles the subsystem registration
662    ///
663    /// # Returns
664    ///
665    /// Updated builder instance
666    #[must_use]
667    pub fn with_subsystem_fn<F>(mut self, name: &str, register_fn: F) -> Self
668    where
669        F: FnOnce(&SubsystemManager) -> SubsystemId + Send + 'static,
670    {
671        debug!("Adding subsystem registration function for {}", name);
672        self.subsystems.push(Box::new(register_fn));
673        self
674    }
675
676    /// Build the daemon instance.
677    /// Builds a daemon from the configured builder
678    ///
679    /// # Errors
680    ///
681    /// Returns an error if the daemon configuration is invalid or if required components cannot be initialized
682    pub fn build(self) -> Result<Daemon> {
683        // Validate configuration
684        self.config.validate()?;
685
686        // Create shutdown coordinator
687        let shutdown_coordinator = ShutdownCoordinator::new(
688            self.config.shutdown.graceful,
689            self.config.shutdown.force,
690            self.config.shutdown.kill,
691        );
692
693        // Create subsystem manager
694        let subsystem_manager = SubsystemManager::new(shutdown_coordinator.clone());
695
696        // Register all subsystems
697        for subsystem_fn in self.subsystems {
698            let id = subsystem_fn(&subsystem_manager);
699            debug!(subsystem_id = id, "Registered subsystem");
700        }
701
702        // Create signal handler if enabled
703        let signal_handler = if self.enable_signals {
704            self.signal_config.clone().map_or_else(
705                || {
706                    Some(SignalHandlerKind::Default(Arc::new(SignalHandler::new(
707                        shutdown_coordinator.clone(),
708                    ))))
709                },
710                |config| {
711                    Some(SignalHandlerKind::Configurable(Arc::new(
712                        ConfigurableSignalHandler::new(shutdown_coordinator.clone(), config),
713                    )))
714                },
715            )
716        } else {
717            None
718        };
719
720        // Prepare configuration arcs
721        let config_arc = Arc::new(self.config);
722
723        #[cfg(feature = "config-watch")]
724        let config_shared: Arc<ArcSwap<Config>> = Arc::new(ArcSwap::from(config_arc.clone()));
725
726        // Optionally start config watcher when hot_reload is enabled
727        #[cfg(feature = "config-watch")]
728        let mut config_watcher: Option<RecommendedWatcher> = None;
729
730        #[cfg(feature = "config-watch")]
731        {
732            if config_arc.hot_reload {
733                let swap = Arc::clone(&config_shared);
734                let watch_path = self
735                    .config_path
736                    .or_else(|| {
737                        config_arc
738                            .work_dir
739                            .as_ref()
740                            .map(|d| d.join(crate::DEFAULT_CONFIG_FILE))
741                    })
742                    .unwrap_or_else(|| PathBuf::from(crate::DEFAULT_CONFIG_FILE));
743
744                let watch_path_for_cb = watch_path.clone();
745
746                match Config::watch_file(&watch_path, move |res| match res {
747                    Ok(new_cfg) => {
748                        swap.store(Arc::new(new_cfg));
749                        info!(
750                            "Configuration hot-reloaded from {}",
751                            watch_path_for_cb.display()
752                        );
753                    }
754                    Err(e) => {
755                        warn!(error = %e, "Configuration reload failed");
756                    }
757                }) {
758                    Ok(w) => {
759                        config_watcher = Some(w);
760                        info!("Config watcher started for {}", watch_path.display());
761                    }
762                    Err(e) => {
763                        warn!(error = %e, "Failed to start config watcher; continuing without hot-reload");
764                    }
765                }
766            }
767        }
768
769        Ok(Daemon {
770            config: config_arc,
771            #[cfg(feature = "config-watch")]
772            config_shared,
773            shutdown_coordinator,
774            subsystem_manager,
775            signal_handler,
776            #[cfg(feature = "config-watch")]
777            _config_watcher: config_watcher,
778            started_at: None,
779        })
780    }
781
782    /// Build and run the daemon in one step.
783    /// Runs the daemon until completion or error
784    ///
785    /// # Errors
786    ///
787    /// Returns an error if the daemon encounters an unrecoverable error during execution
788    pub async fn run(self) -> Result<()> {
789        let daemon = self.build()?;
790        daemon.run().await
791    }
792}
793
/// Convenience macro for creating subsystems from closures.
///
/// Expands to a boxed closure that takes a `ShutdownHandle`, calls `$closure`
/// with it and returns the resulting future pinned and boxed as
/// `Pin<Box<dyn Future<Output = Result<()>> + Send>>`.
///
/// `$name` is accepted for call-site readability but is not used in the
/// expansion.
///
/// All standard-library items are named by absolute path so the macro works
/// without the caller importing `Pin`, `Future` or `Box`.
#[macro_export]
macro_rules! subsystem {
    ($name:expr, $closure:expr) => {
        ::std::boxed::Box::new(move |shutdown: $crate::shutdown::ShutdownHandle| {
            ::std::boxed::Box::pin($closure(shutdown))
                as ::std::pin::Pin<
                    ::std::boxed::Box<
                        dyn ::std::future::Future<Output = $crate::Result<()>>
                            + ::std::marker::Send,
                    >,
                >
        })
    };
}
803
/// Convenience macro for creating simple task-based subsystems.
///
/// Expands to a closure taking a `ShutdownHandle` and returning an async block
/// that evaluates `$body` in a loop until shutdown, then yields `Ok(())`.
///
/// * With `tokio`: each iteration races `$body` (wrapped in an `async` block)
///   against `shutdown.cancelled()`.
/// * With `async-std` (and no `tokio`): checks `shutdown.is_shutdown()`,
///   evaluates `$body` as a plain expression without awaiting it, then sleeps
///   10 ms.
///
/// NOTE(review): the `#[cfg(feature = ...)]` attributes are part of the
/// expansion, so they presumably test the features of the crate invoking the
/// macro rather than this crate's — confirm callers declare matching features.
#[macro_export]
macro_rules! task {
    ($name:expr, $body:expr) => {
        |shutdown: $crate::shutdown::ShutdownHandle| async move {
            // `cancelled()` is awaited on a mutable binding in the tokio branch.
            #[cfg(feature = "tokio")]
            let mut shutdown = shutdown;
            loop {
                #[cfg(feature = "tokio")]
                {
                    tokio::select! {
                        _ = shutdown.cancelled() => {
                            tracing::info!("Task '{}' shutting down", $name);
                            break;
                        }
                        _ = async { $body } => {}
                    }
                }

                #[cfg(all(feature = "async-std", not(feature = "tokio")))]
                {
                    if shutdown.is_shutdown() {
                        tracing::info!("Task '{}' shutting down", $name);
                        break;
                    }
                    // The body is evaluated as-is here; it is not awaited.
                    $body;
                    // Short pause so the polling loop does not spin.
                    async_std::task::sleep(std::time::Duration::from_millis(10)).await;
                }
            }
            Ok(())
        }
    };
}
839
840#[cfg(test)]
841mod tests {
842    use super::*;
843    use std::pin::Pin;
844    use std::time::Duration;
845
846    async fn test_subsystem(shutdown: crate::shutdown::ShutdownHandle) -> Result<()> {
847        #[cfg(feature = "tokio")]
848        let mut shutdown = shutdown;
849        loop {
850            #[cfg(feature = "tokio")]
851            {
852                tokio::select! {
853                    () = shutdown.cancelled() => break,
854                    () = tokio::time::sleep(Duration::from_millis(10)) => {}
855                }
856            }
857
858            #[cfg(all(feature = "async-std", not(feature = "tokio")))]
859            {
860                if shutdown.is_shutdown() {
861                    break;
862                }
863                async_std::task::sleep(Duration::from_millis(10)).await;
864            }
865        }
866        Ok(())
867    }
868
869    #[cfg(feature = "tokio")]
870    #[cfg_attr(miri, ignore)]
871    #[tokio::test]
872    async fn test_daemon_builder() {
873        // Add a test timeout to prevent freezing
874        let test_result = tokio::time::timeout(Duration::from_secs(5), async {
875            let config = Config::new().unwrap();
876            let daemon = Daemon::builder(config)
877                .with_subsystem_fn("test", |subsystem_manager| {
878                    subsystem_manager.register_fn("test_subsystem", test_subsystem)
879                })
880                .build()
881                .unwrap();
882
883            assert!(daemon.is_running());
884            assert_eq!(daemon.config().name, "proc-daemon");
885
886            // Ensure proper cleanup
887            daemon.shutdown();
888        })
889        .await;
890
891        assert!(test_result.is_ok(), "Test timed out after 5 seconds");
892    }
893
894    #[cfg(all(feature = "async-std", not(feature = "tokio")))]
895    #[async_std::test]
896    async fn test_daemon_builder() {
897        // Add a test timeout to prevent freezing
898        let test_result = async_std::future::timeout(Duration::from_secs(5), async {
899            let config = Config::new().unwrap();
900            let daemon = Daemon::builder(config)
901                .with_subsystem_fn("test", |subsystem_manager| {
902                    subsystem_manager.register_fn("test_subsystem", test_subsystem)
903                })
904                .build()
905                .unwrap();
906
907            assert!(daemon.is_running());
908            assert_eq!(daemon.config().name, "proc-daemon");
909
910            // Ensure proper cleanup
911            daemon.shutdown();
912        })
913        .await;
914
915        assert!(test_result.is_ok(), "Test timed out after 5 seconds");
916    }
917
918    #[cfg(feature = "tokio")]
919    #[cfg_attr(miri, ignore)]
920    #[tokio::test]
921    async fn test_daemon_with_defaults() {
922        // Add a test timeout to prevent freezing
923        let test_result = tokio::time::timeout(Duration::from_secs(5), async {
924            let builder = Daemon::with_defaults().unwrap();
925            let daemon = builder
926                .with_task("simple_task", |_shutdown| async {
927                    tokio::time::sleep(Duration::from_millis(10)).await;
928                    Ok(())
929                })
930                .build()
931                .unwrap();
932
933            assert!(daemon.is_running());
934
935            // Ensure proper cleanup
936            daemon.shutdown();
937        })
938        .await;
939
940        assert!(test_result.is_ok(), "Test timed out after 5 seconds");
941    }
942
943    #[cfg(all(feature = "async-std", not(feature = "tokio")))]
944    #[async_std::test]
945    async fn test_daemon_with_defaults() {
946        // Add a test timeout to prevent freezing
947        let test_result = async_std::future::timeout(Duration::from_secs(5), async {
948            let builder = Daemon::with_defaults().unwrap();
949            let daemon = builder
950                .with_task("simple_task", |_shutdown| async {
951                    async_std::task::sleep(Duration::from_millis(10)).await;
952                    Ok(())
953                })
954                .build()
955                .unwrap();
956
957            assert!(daemon.is_running());
958
959            // Ensure proper cleanup
960            daemon.shutdown();
961        })
962        .await;
963
964        assert!(test_result.is_ok(), "Test timed out after 5 seconds");
965    }
966
967    #[cfg(feature = "tokio")]
968    #[cfg_attr(miri, ignore)]
969    #[tokio::test]
970    async fn test_daemon_shutdown() {
971        // Add a test timeout to prevent freezing
972        let test_result = tokio::time::timeout(Duration::from_secs(5), async {
973            let config = Config::builder()
974                .name("test-daemon")
975                .shutdown_timeout(Duration::from_millis(100))
976                .unwrap()
977                .build()
978                .unwrap();
979
980            let daemon = Daemon::builder(config)
981                .with_subsystem_fn("test", |subsystem_manager| {
982                    subsystem_manager.register_fn("test_subsystem", test_subsystem)
983                })
984                .without_signals()
985                .build()
986                .unwrap();
987
988            // Request shutdown
989            daemon.shutdown();
990            assert!(!daemon.is_running());
991        })
992        .await;
993
994        assert!(test_result.is_ok(), "Test timed out after 5 seconds");
995    }
996
997    #[cfg(all(feature = "async-std", not(feature = "tokio")))]
998    #[async_std::test]
999    async fn test_daemon_shutdown() {
1000        // Add a test timeout to prevent freezing
1001        let test_result = async_std::future::timeout(Duration::from_secs(5), async {
1002            let config = Config::builder()
1003                .name("test-daemon")
1004                .shutdown_timeout(Duration::from_millis(100))
1005                .unwrap()
1006                .build()
1007                .unwrap();
1008
1009            let daemon = Daemon::builder(config)
1010                .with_subsystem_fn("test", |subsystem_manager| {
1011                    subsystem_manager.register_fn("test_subsystem", test_subsystem)
1012                })
1013                .without_signals()
1014                .build()
1015                .unwrap();
1016
1017            // Request shutdown
1018            daemon.shutdown();
1019            assert!(!daemon.is_running());
1020        })
1021        .await;
1022
1023        assert!(test_result.is_ok(), "Test timed out after 5 seconds");
1024    }
1025
1026    #[test]
1027    fn test_daemon_stats() {
1028        let config = Config::new().unwrap();
1029        let daemon = Daemon::builder(config).build().unwrap();
1030
1031        let stats = daemon.get_stats();
1032        assert_eq!(stats.name, "proc-daemon");
1033        assert!(stats.uptime.is_none()); // Not started yet
1034        assert!(!stats.is_shutdown);
1035    }
1036
1037    struct TestSubsystemStruct {
1038        name: String,
1039    }
1040
1041    impl TestSubsystemStruct {
1042        fn new(name: &str) -> Self {
1043            Self {
1044                name: name.to_string(),
1045            }
1046        }
1047    }
1048
1049    impl Subsystem for TestSubsystemStruct {
1050        fn run(
1051            &self,
1052            shutdown: crate::shutdown::ShutdownHandle,
1053        ) -> Pin<Box<dyn Future<Output = Result<()>> + Send>> {
1054            Box::pin(async move {
1055                #[cfg(feature = "tokio")]
1056                let mut shutdown = shutdown;
1057                loop {
1058                    #[cfg(feature = "tokio")]
1059                    {
1060                        tokio::select! {
1061                            () = shutdown.cancelled() => break,
1062                            () = tokio::time::sleep(Duration::from_millis(10)) => {}
1063                        }
1064                    }
1065
1066                    #[cfg(all(feature = "async-std", not(feature = "tokio")))]
1067                    {
1068                        if shutdown.is_shutdown() {
1069                            break;
1070                        }
1071                        async_std::task::sleep(Duration::from_millis(10)).await;
1072                    }
1073                }
1074                Ok(())
1075            })
1076        }
1077
1078        fn name(&self) -> &str {
1079            &self.name
1080        }
1081    }
1082
1083    #[cfg(feature = "tokio")]
1084    #[cfg_attr(miri, ignore)]
1085    #[tokio::test]
1086    async fn test_daemon_with_struct_subsystem() {
1087        // Add a test timeout to prevent freezing
1088        let test_result = tokio::time::timeout(Duration::from_secs(5), async {
1089            let config = Config::new().unwrap();
1090            let subsystem = TestSubsystemStruct::new("struct_test");
1091
1092            let daemon = Daemon::builder(config)
1093                .with_subsystem(subsystem)
1094                .without_signals()
1095                .build()
1096                .unwrap();
1097
1098            let stats = daemon.get_stats();
1099            assert_eq!(stats.subsystem_stats.total_subsystems, 1);
1100
1101            // Ensure proper cleanup
1102            daemon.shutdown();
1103        })
1104        .await;
1105
1106        assert!(test_result.is_ok(), "Test timed out after 5 seconds");
1107    }
1108
1109    #[cfg(all(feature = "async-std", not(feature = "tokio")))]
1110    #[async_std::test]
1111    async fn test_daemon_with_struct_subsystem() {
1112        // Add a test timeout to prevent freezing
1113        let test_result = async_std::future::timeout(Duration::from_secs(5), async {
1114            let config = Config::new().unwrap();
1115            let subsystem = TestSubsystemStruct::new("struct_test");
1116
1117            let daemon = Daemon::builder(config)
1118                .with_subsystem(subsystem)
1119                .without_signals()
1120                .build()
1121                .unwrap();
1122
1123            let stats = daemon.get_stats();
1124            assert_eq!(stats.subsystem_stats.total_subsystems, 1);
1125
1126            // Ensure proper cleanup
1127            daemon.shutdown();
1128        })
1129        .await;
1130
1131        assert!(test_result.is_ok(), "Test timed out after 5 seconds");
1132    }
1133
1134    #[cfg(feature = "tokio")]
1135    #[cfg_attr(miri, ignore)]
1136    #[tokio::test]
1137    async fn test_daemon_signal_configuration() {
1138        // Add a test timeout to prevent freezing
1139        let test_result = tokio::time::timeout(Duration::from_secs(5), async {
1140            let config = Config::new().unwrap();
1141            let signal_config = SignalConfig::new().with_sighup().without_sigint();
1142
1143            let daemon = Daemon::builder(config)
1144                .with_signal_config(signal_config)
1145                .build()
1146                .unwrap();
1147
1148            assert!(daemon.signal_handler.is_some());
1149
1150            // Ensure proper cleanup
1151            daemon.shutdown();
1152        })
1153        .await;
1154
1155        assert!(test_result.is_ok(), "Test timed out after 5 seconds");
1156    }
1157
1158    #[cfg(all(feature = "async-std", not(feature = "tokio")))]
1159    #[async_std::test]
1160    async fn test_daemon_signal_configuration() {
1161        // Add a test timeout to prevent freezing
1162        let test_result = async_std::future::timeout(Duration::from_secs(5), async {
1163            let config = Config::new().unwrap();
1164            let signal_config = SignalConfig::new().with_sighup().without_sigint();
1165
1166            let daemon = Daemon::builder(config)
1167                .with_signal_config(signal_config)
1168                .build()
1169                .unwrap();
1170
1171            assert!(daemon.signal_handler.is_some());
1172
1173            // Ensure proper cleanup
1174            daemon.shutdown();
1175        })
1176        .await;
1177
1178        assert!(test_result.is_ok(), "Test timed out after 5 seconds");
1179    }
1180
1181    #[cfg(feature = "tokio")]
1182    #[cfg_attr(miri, ignore)]
1183    #[tokio::test]
1184    async fn test_macro_usage() {
1185        // Add a test timeout to prevent freezing
1186        let test_result = tokio::time::timeout(Duration::from_secs(5), async {
1187            let config = Config::new().unwrap();
1188
1189            let daemon = Daemon::builder(config)
1190                .with_task(
1191                    "macro_test",
1192                    task!("macro_test", {
1193                        tokio::time::sleep(Duration::from_millis(1)).await;
1194                    }),
1195                )
1196                .without_signals()
1197                .build()
1198                .unwrap();
1199
1200            let stats = daemon.get_stats();
1201            assert_eq!(stats.subsystem_stats.total_subsystems, 1);
1202
1203            // Ensure proper cleanup
1204            daemon.shutdown();
1205        })
1206        .await;
1207
1208        assert!(test_result.is_ok(), "Test timed out after 5 seconds");
1209    }
1210
1211    #[cfg(all(feature = "async-std", not(feature = "tokio")))]
1212    #[async_std::test]
1213    async fn test_macro_usage() {
1214        // Add a test timeout to prevent freezing
1215        let test_result = async_std::future::timeout(Duration::from_secs(5), async {
1216            let config = Config::new().unwrap();
1217
1218            let daemon = Daemon::builder(config)
1219                .with_task(
1220                    "macro_test",
1221                    task!("macro_test", {
1222                        async_std::task::sleep(Duration::from_millis(1)).await;
1223                    }),
1224                )
1225                .without_signals()
1226                .build()
1227                .unwrap();
1228
1229            let stats = daemon.get_stats();
1230            assert_eq!(stats.subsystem_stats.total_subsystems, 1);
1231
1232            // Ensure proper cleanup
1233            daemon.shutdown();
1234        })
1235        .await;
1236
1237        assert!(test_result.is_ok(), "Test timed out after 5 seconds");
1238    }
1239}