1use std::future::Future;
8use std::io::{self, Write};
9use std::path::{Path, PathBuf};
10use std::sync::Arc;
11use std::time::Instant;
12use tracing::{debug, error, info, instrument, warn};
13
14use crate::config::Config;
15use crate::error::{Error, Result};
16use crate::shutdown::{ShutdownCoordinator, ShutdownReason};
17use crate::signal::{ConfigurableSignalHandler, SignalConfig, SignalHandler};
18use crate::subsystem::{Subsystem, SubsystemId, SubsystemManager};
19
20#[cfg(feature = "config-watch")]
21use arc_swap::ArcSwap;
22#[cfg(feature = "config-watch")]
23use notify::RecommendedWatcher;
24
25type SubsystemRegistrationFn = Box<dyn FnOnce(&SubsystemManager) -> SubsystemId + Send + 'static>;
27
28pub struct Daemon {
30 config: Arc<Config>,
32 #[cfg(feature = "config-watch")]
34 config_shared: Arc<ArcSwap<Config>>,
35 shutdown_coordinator: ShutdownCoordinator,
37 subsystem_manager: SubsystemManager,
39 signal_handler: Option<SignalHandlerKind>,
41 #[cfg(feature = "config-watch")]
43 _config_watcher: Option<RecommendedWatcher>,
44 started_at: Option<Instant>,
46}
47
48struct PidFileGuard {
49 path: PathBuf,
50}
51
52impl PidFileGuard {
53 fn create(path: &Path) -> Result<Self> {
54 let pid = std::process::id();
55 let mut file = std::fs::File::create(path).map_err(|e| {
56 Error::io_with_source(
57 format!("Failed to create PID file at {}", path.display()),
58 e,
59 )
60 })?;
61 writeln!(file, "{pid}").map_err(|e| {
62 Error::io_with_source(format!("Failed to write PID file at {}", path.display()), e)
63 })?;
64 Ok(Self {
65 path: path.to_path_buf(),
66 })
67 }
68}
69
70impl Drop for PidFileGuard {
71 fn drop(&mut self) {
72 let _ = std::fs::remove_file(&self.path);
73 }
74}
75
76struct RotatingFileWriterInner {
77 file: std::fs::File,
78 path: PathBuf,
79 max_size: u64,
80 max_files: u32,
81 size: u64,
82}
83
84#[derive(Clone)]
85struct RotatingFileWriter {
86 inner: Arc<std::sync::Mutex<RotatingFileWriterInner>>,
87}
88
89impl RotatingFileWriter {
90 fn new(path: PathBuf, max_size: Option<u64>, max_files: Option<u32>) -> io::Result<Self> {
91 let file = std::fs::OpenOptions::new()
92 .create(true)
93 .append(true)
94 .open(&path)?;
95 let size = file.metadata().map(|m| m.len()).unwrap_or(0);
96
97 let max_size = max_size.unwrap_or(u64::MAX);
98 let max_files = max_files.unwrap_or(0);
99
100 Ok(Self {
101 inner: Arc::new(std::sync::Mutex::new(RotatingFileWriterInner {
102 file,
103 path,
104 max_size,
105 max_files,
106 size,
107 })),
108 })
109 }
110
111 fn rotate_locked(inner: &mut RotatingFileWriterInner) -> io::Result<()> {
112 if inner.max_files == 0 {
113 return Ok(());
114 }
115
116 for idx in (1..=inner.max_files).rev() {
117 let from = Self::rotated_path(&inner.path, idx - 1);
118 let to = Self::rotated_path(&inner.path, idx);
119 if from.exists() {
120 let _ = std::fs::remove_file(&to);
121 std::fs::rename(&from, &to)?;
122 }
123 }
124 inner.file = std::fs::OpenOptions::new()
125 .create(true)
126 .append(true)
127 .open(&inner.path)?;
128 inner.size = 0;
129 Ok(())
130 }
131
132 fn rotated_path(path: &Path, idx: u32) -> PathBuf {
133 if idx == 0 {
134 return path.to_path_buf();
135 }
136 PathBuf::from(format!("{}.{}", path.display(), idx))
137 }
138}
139
140struct RotatingFileWriterGuard {
141 inner: Arc<std::sync::Mutex<RotatingFileWriterInner>>,
142}
143
144impl io::Write for RotatingFileWriterGuard {
145 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
146 let mut inner = self
147 .inner
148 .lock()
149 .map_err(|_| io::Error::other("rotating log writer mutex poisoned"))?;
150
151 if inner.size.saturating_add(buf.len() as u64) > inner.max_size {
152 RotatingFileWriter::rotate_locked(&mut inner)?;
153 }
154
155 let written = inner.file.write(buf)?;
156 inner.size = inner.size.saturating_add(written as u64);
157 drop(inner);
158 Ok(written)
159 }
160
161 fn flush(&mut self) -> io::Result<()> {
162 let mut inner = self
163 .inner
164 .lock()
165 .map_err(|_| io::Error::other("rotating log writer mutex poisoned"))?;
166 let result = inner.file.flush();
167 drop(inner);
168 result
169 }
170}
171
172impl<'a> tracing_subscriber::fmt::writer::MakeWriter<'a> for RotatingFileWriter {
173 type Writer = RotatingFileWriterGuard;
174
175 fn make_writer(&'a self) -> Self::Writer {
176 RotatingFileWriterGuard {
177 inner: Arc::clone(&self.inner),
178 }
179 }
180}
181
182#[derive(Clone)]
183enum SignalHandlerKind {
184 Default(Arc<SignalHandler>),
185 Configurable(Arc<ConfigurableSignalHandler>),
186}
187
188impl SignalHandlerKind {
189 #[allow(dead_code)]
190 async fn handle_signals(&self) -> Result<()> {
191 match self {
192 Self::Default(handler) => handler.handle_signals().await,
193 Self::Configurable(handler) => handler.handle_signals().await,
194 }
195 }
196
197 fn stop(&self) {
198 match self {
199 Self::Default(handler) => handler.stop(),
200 Self::Configurable(handler) => handler.stop(),
201 }
202 }
203}
204
205impl Daemon {
206 #[must_use]
208 pub fn builder(config: Config) -> DaemonBuilder {
209 DaemonBuilder::new(config)
210 }
211
212 pub fn with_defaults() -> Result<DaemonBuilder> {
218 let config = Config::new()?;
219 Ok(Self::builder(config))
220 }
221
222 #[instrument(skip(self), fields(daemon_name = %self.config.name))]
230 pub async fn run(mut self) -> Result<()> {
231 info!(daemon_name = %self.config.name, "Starting daemon");
232 self.started_at = Some(Instant::now());
233
234 if let Some(work_dir) = &self.config.work_dir {
236 std::env::set_current_dir(work_dir).map_err(|e| {
237 Error::io_with_source(
238 format!("Failed to set working directory to {}", work_dir.display()),
239 e,
240 )
241 })?;
242 }
243
244 let _pid_guard = if let Some(pid_file) = &self.config.pid_file {
246 Some(PidFileGuard::create(pid_file)?)
247 } else {
248 None
249 };
250
251 self.init_logging()?;
253
254 #[cfg(feature = "scheduler-hints")]
256 {
257 crate::scheduler::apply_process_hints(&self.config);
258 crate::scheduler::apply_runtime_hints();
259 }
260
261 if let Err(e) = self.subsystem_manager.start_all().await {
263 error!(error = %e, "Failed to start all subsystems");
264 return Err(e);
265 }
266
267 #[cfg(any(feature = "tokio", feature = "async-std"))]
270 let signal_task = self.signal_handler.as_ref().map(|signal_handler| {
271 let handler = signal_handler.clone();
272 Self::spawn_signal_handler(handler)
273 });
274
275 #[cfg(not(any(feature = "tokio", feature = "async-std")))]
276 let _signal_task: Option<()> = None;
277
278 info!("Daemon started successfully, waiting for shutdown signal");
280
281 loop {
283 if self.shutdown_coordinator.is_shutdown() {
284 break;
285 }
286
287 if self.config.monitoring.health_checks {
289 let health_results = self.subsystem_manager.run_health_checks();
290 let mut found_unhealthy = false;
292 for (id, name, healthy) in &health_results {
293 if !healthy {
294 if !found_unhealthy {
295 warn!("Unhealthy subsystems detected:");
296 found_unhealthy = true;
297 }
298 warn!(subsystem_id = id, subsystem_name = %name, "Unhealthy subsystem");
299 }
301 }
302 }
303
304 #[cfg(feature = "tokio")]
306 tokio::time::sleep(self.config.health_check_interval()).await;
307
308 #[cfg(all(feature = "async-std", not(feature = "tokio")))]
309 async_std::task::sleep(self.config.health_check_interval()).await;
310
311 #[cfg(not(any(feature = "tokio", feature = "async-std")))]
312 std::thread::sleep(self.config.health_check_interval());
313 }
314
315 info!("Shutdown initiated, beginning graceful shutdown");
317
318 if let Some(signal_handler) = &self.signal_handler {
320 signal_handler.stop();
321 }
322
323 #[cfg(any(feature = "tokio", feature = "async-std"))]
325 if let Some(task) = signal_task {
326 #[cfg(feature = "tokio")]
327 {
328 if let Err(e) = task.await {
329 warn!(error = %e, "Signal handler task failed");
330 }
331 }
332
333 #[cfg(all(feature = "async-std", not(feature = "tokio")))]
334 {
335 if let Err(e) = task.await {
336 warn!(error = %e, "Signal handler task failed");
337 }
338 }
339 }
340
341 if let Err(e) = self.subsystem_manager.stop_all().await {
343 error!(error = %e, "Failed to stop all subsystems gracefully");
344 }
345
346 if let Err(e) = self.shutdown_coordinator.wait_for_shutdown().await {
348 warn!(error = %e, "Graceful shutdown timeout exceeded");
349
350 if let Err(e) = self.shutdown_coordinator.wait_for_force_shutdown().await {
352 error!(error = %e, "Force shutdown timeout exceeded");
353 }
354
355 if let Err(e) = self.shutdown_coordinator.wait_for_kill_shutdown().await {
357 error!(error = %e, "Kill shutdown timeout exceeded, exiting immediately");
358 }
359 }
360
361 let elapsed = self.started_at.map(|t| t.elapsed());
362 info!(uptime = ?elapsed, "Daemon shutdown complete");
363
364 Ok(())
365 }
366
367 fn init_logging(&self) -> Result<()> {
369 use tracing_subscriber::fmt::format::FmtSpan;
370 use tracing_subscriber::fmt::writer::BoxMakeWriter;
371 use tracing_subscriber::{EnvFilter, FmtSubscriber};
372
373 let level: tracing::Level = self.config.logging.level.into();
374 let filter = EnvFilter::from_default_env().add_directive(level.into());
375
376 let writer = if let Some(path) = &self.config.logging.file {
378 Some(
379 RotatingFileWriter::new(
380 path.clone(),
381 self.config.logging.max_file_size,
382 self.config.logging.max_files,
383 )
384 .map_err(|e| {
385 Error::io_with_source(
386 format!("Failed to initialize log file at {}", path.display()),
387 e,
388 )
389 })?,
390 )
391 } else {
392 None
393 };
394
395 let make_writer = |writer: &Option<RotatingFileWriter>| {
396 writer.as_ref().map_or_else(
397 || BoxMakeWriter::new(std::io::stdout),
398 |writer| BoxMakeWriter::new(writer.clone()),
399 )
400 };
401
402 if self.config.is_json_logging() {
404 #[cfg(feature = "json-logs")]
405 {
406 let base_subscriber = FmtSubscriber::builder()
407 .with_env_filter(filter)
408 .with_span_events(FmtSpan::CLOSE)
409 .with_target(true)
410 .with_thread_ids(true)
411 .with_thread_names(true)
412 .with_writer(make_writer(&writer));
413
414 let json_subscriber = base_subscriber
415 .json()
416 .flatten_event(true)
417 .with_current_span(false);
418
419 tracing::subscriber::set_global_default(json_subscriber.finish()).map_err(|e| {
420 Error::config(format!("Failed to initialize JSON logging: {e}"))
421 })?;
422
423 return Ok(()); }
425
426 #[cfg(not(feature = "json-logs"))]
427 {
428 return Err(Error::config(
429 "JSON logging requested but feature not enabled",
430 ));
431 }
432 }
433
434 let base_subscriber = FmtSubscriber::builder()
436 .with_env_filter(filter)
437 .with_span_events(FmtSpan::CLOSE)
438 .with_target(true)
439 .with_thread_ids(true)
440 .with_thread_names(true)
441 .with_writer(make_writer(&writer));
442
443 let regular_subscriber = base_subscriber
444 .with_ansi(self.config.is_colored_logging())
445 .compact();
446
447 tracing::subscriber::set_global_default(regular_subscriber.finish())
448 .map_err(|e| Error::config(format!("Failed to initialize logging: {e}")))?;
449
450 debug!(
451 "Logging initialized with level: {:?}",
452 self.config.logging.level
453 );
454 Ok(())
455 }
456
457 #[cfg(feature = "tokio")]
459 fn spawn_signal_handler(handler: SignalHandlerKind) -> tokio::task::JoinHandle<Result<()>> {
460 tokio::spawn(async move { handler.handle_signals().await })
461 }
462
463 #[cfg(all(feature = "async-std", not(feature = "tokio")))]
464 fn spawn_signal_handler(handler: SignalHandlerKind) -> async_std::task::JoinHandle<Result<()>> {
465 async_std::task::spawn(async move { handler.handle_signals().await })
466 }
467
468 pub fn get_stats(&self) -> DaemonStats {
470 let subsystem_stats = self.subsystem_manager.get_stats();
471 let shutdown_stats = self.shutdown_coordinator.get_stats();
472 let total_restarts = subsystem_stats.total_restarts;
473
474 DaemonStats {
475 name: self.config.name.clone(),
476 uptime: self.started_at.map(|t| t.elapsed()),
477 is_shutdown: shutdown_stats.is_shutdown,
478 shutdown_reason: shutdown_stats.reason,
479 subsystem_stats,
480 total_restarts,
481 }
482 }
483
484 pub fn shutdown(&self) -> bool {
486 self.shutdown_coordinator
487 .initiate_shutdown(ShutdownReason::Requested)
488 }
489
490 pub fn is_running(&self) -> bool {
492 !self.shutdown_coordinator.is_shutdown()
493 }
494
495 pub fn config(&self) -> &Config {
497 &self.config
498 }
499
500 #[cfg(feature = "config-watch")]
504 pub fn config_snapshot(&self) -> Arc<Config> {
505 self.config_shared.load_full()
506 }
507}
508
509impl Clone for Daemon {
510 fn clone(&self) -> Self {
511 Self {
512 config: Arc::clone(&self.config),
513 #[cfg(feature = "config-watch")]
514 config_shared: Arc::clone(&self.config_shared),
515 shutdown_coordinator: self.shutdown_coordinator.clone(),
516 subsystem_manager: self.subsystem_manager.clone(),
517 signal_handler: self.signal_handler.clone(),
518 #[cfg(feature = "config-watch")]
519 _config_watcher: None,
520 started_at: self.started_at,
521 }
522 }
523}
524
525#[derive(Debug, Clone)]
527pub struct DaemonStats {
528 pub name: String,
530 pub uptime: Option<std::time::Duration>,
532 pub is_shutdown: bool,
534 pub shutdown_reason: Option<crate::shutdown::ShutdownReason>,
536 pub subsystem_stats: crate::subsystem::SubsystemStats,
538 pub total_restarts: u64,
540}
541
542pub struct DaemonBuilder {
544 config: Config,
545 subsystems: Vec<SubsystemRegistrationFn>,
547 signal_config: Option<SignalConfig>,
548 enable_signals: bool,
549 config_path: Option<PathBuf>,
550}
551
552impl DaemonBuilder {
553 #[must_use]
555 pub fn new(config: Config) -> Self {
556 Self {
557 config,
558 subsystems: Vec::with_capacity(16),
560 signal_config: None,
561 enable_signals: true,
562 config_path: None,
563 }
564 }
565
566 #[must_use]
568 pub fn with_signal_config(mut self, config: SignalConfig) -> Self {
569 self.signal_config = Some(config);
570 self
571 }
572
573 #[must_use]
575 pub fn with_config_path<P: Into<PathBuf>>(mut self, path: P) -> Self {
576 self.config_path = Some(path.into());
577 self
578 }
579
580 #[must_use]
582 pub const fn without_signals(mut self) -> Self {
583 self.enable_signals = false;
584 self
585 }
586
587 #[must_use]
589 pub fn with_signals(mut self, sigterm: bool, sigint: bool) -> Self {
590 let mut config = SignalConfig::new();
591 if !sigterm {
592 config = config.without_sigterm();
593 }
594 if !sigint {
595 config = config.without_sigint();
596 }
597 self.signal_config = Some(config);
598 self
599 }
600
601 #[must_use]
614 pub fn with_task<F, Fut>(mut self, name: &str, task_fn: F) -> Self
615 where
616 F: Fn(crate::shutdown::ShutdownHandle) -> Fut + Send + Sync + 'static,
617 Fut: Future<Output = Result<()>> + Send + 'static,
618 {
619 let name = name.to_string();
621 let subsystem_fn = Box::new(move |subsystem_manager: &SubsystemManager| {
622 subsystem_manager.register_fn(&name, task_fn)
623 });
624
625 self.subsystems.push(subsystem_fn);
626 self
627 }
628
629 #[must_use]
641 pub fn with_subsystem<S>(mut self, subsystem: S) -> Self
642 where
643 S: Subsystem + Send + Sync + 'static,
644 {
645 let subsystem_fn = Box::new(move |subsystem_manager: &SubsystemManager| {
646 subsystem_manager.register(subsystem)
647 });
648
649 self.subsystems.push(subsystem_fn);
650 self
651 }
652
653 #[must_use]
667 pub fn with_subsystem_fn<F>(mut self, name: &str, register_fn: F) -> Self
668 where
669 F: FnOnce(&SubsystemManager) -> SubsystemId + Send + 'static,
670 {
671 debug!("Adding subsystem registration function for {}", name);
672 self.subsystems.push(Box::new(register_fn));
673 self
674 }
675
676 pub fn build(self) -> Result<Daemon> {
683 self.config.validate()?;
685
686 let shutdown_coordinator = ShutdownCoordinator::new(
688 self.config.shutdown.graceful,
689 self.config.shutdown.force,
690 self.config.shutdown.kill,
691 );
692
693 let subsystem_manager = SubsystemManager::new(shutdown_coordinator.clone());
695
696 for subsystem_fn in self.subsystems {
698 let id = subsystem_fn(&subsystem_manager);
699 debug!(subsystem_id = id, "Registered subsystem");
700 }
701
702 let signal_handler = if self.enable_signals {
704 self.signal_config.clone().map_or_else(
705 || {
706 Some(SignalHandlerKind::Default(Arc::new(SignalHandler::new(
707 shutdown_coordinator.clone(),
708 ))))
709 },
710 |config| {
711 Some(SignalHandlerKind::Configurable(Arc::new(
712 ConfigurableSignalHandler::new(shutdown_coordinator.clone(), config),
713 )))
714 },
715 )
716 } else {
717 None
718 };
719
720 let config_arc = Arc::new(self.config);
722
723 #[cfg(feature = "config-watch")]
724 let config_shared: Arc<ArcSwap<Config>> = Arc::new(ArcSwap::from(config_arc.clone()));
725
726 #[cfg(feature = "config-watch")]
728 let mut config_watcher: Option<RecommendedWatcher> = None;
729
730 #[cfg(feature = "config-watch")]
731 {
732 if config_arc.hot_reload {
733 let swap = Arc::clone(&config_shared);
734 let watch_path = self
735 .config_path
736 .or_else(|| {
737 config_arc
738 .work_dir
739 .as_ref()
740 .map(|d| d.join(crate::DEFAULT_CONFIG_FILE))
741 })
742 .unwrap_or_else(|| PathBuf::from(crate::DEFAULT_CONFIG_FILE));
743
744 let watch_path_for_cb = watch_path.clone();
745
746 match Config::watch_file(&watch_path, move |res| match res {
747 Ok(new_cfg) => {
748 swap.store(Arc::new(new_cfg));
749 info!(
750 "Configuration hot-reloaded from {}",
751 watch_path_for_cb.display()
752 );
753 }
754 Err(e) => {
755 warn!(error = %e, "Configuration reload failed");
756 }
757 }) {
758 Ok(w) => {
759 config_watcher = Some(w);
760 info!("Config watcher started for {}", watch_path.display());
761 }
762 Err(e) => {
763 warn!(error = %e, "Failed to start config watcher; continuing without hot-reload");
764 }
765 }
766 }
767 }
768
769 Ok(Daemon {
770 config: config_arc,
771 #[cfg(feature = "config-watch")]
772 config_shared,
773 shutdown_coordinator,
774 subsystem_manager,
775 signal_handler,
776 #[cfg(feature = "config-watch")]
777 _config_watcher: config_watcher,
778 started_at: None,
779 })
780 }
781
782 pub async fn run(self) -> Result<()> {
789 let daemon = self.build()?;
790 daemon.run().await
791 }
792}
793
794#[macro_export]
796macro_rules! subsystem {
797 ($name:expr, $closure:expr) => {
798 Box::new(move |shutdown: $crate::shutdown::ShutdownHandle| {
799 Box::pin($closure(shutdown)) as Pin<Box<dyn Future<Output = $crate::Result<()>> + Send>>
800 })
801 };
802}
803
804#[macro_export]
806macro_rules! task {
807 ($name:expr, $body:expr) => {
808 |shutdown: $crate::shutdown::ShutdownHandle| async move {
809 #[cfg(feature = "tokio")]
810 let mut shutdown = shutdown;
811 loop {
812 #[cfg(feature = "tokio")]
813 {
814 tokio::select! {
815 _ = shutdown.cancelled() => {
816 tracing::info!("Task '{}' shutting down", $name);
817 break;
818 }
819 _ = async { $body } => {}
820 }
821 }
822
823 #[cfg(all(feature = "async-std", not(feature = "tokio")))]
824 {
825 if shutdown.is_shutdown() {
826 tracing::info!("Task '{}' shutting down", $name);
827 break;
828 }
829 $body;
831 async_std::task::sleep(std::time::Duration::from_millis(10)).await;
833 }
834 }
835 Ok(())
836 }
837 };
838}
839
840#[cfg(test)]
841mod tests {
842 use super::*;
843 use std::pin::Pin;
844 use std::time::Duration;
845
846 async fn test_subsystem(shutdown: crate::shutdown::ShutdownHandle) -> Result<()> {
847 #[cfg(feature = "tokio")]
848 let mut shutdown = shutdown;
849 loop {
850 #[cfg(feature = "tokio")]
851 {
852 tokio::select! {
853 () = shutdown.cancelled() => break,
854 () = tokio::time::sleep(Duration::from_millis(10)) => {}
855 }
856 }
857
858 #[cfg(all(feature = "async-std", not(feature = "tokio")))]
859 {
860 if shutdown.is_shutdown() {
861 break;
862 }
863 async_std::task::sleep(Duration::from_millis(10)).await;
864 }
865 }
866 Ok(())
867 }
868
869 #[cfg(feature = "tokio")]
870 #[cfg_attr(miri, ignore)]
871 #[tokio::test]
872 async fn test_daemon_builder() {
873 let test_result = tokio::time::timeout(Duration::from_secs(5), async {
875 let config = Config::new().unwrap();
876 let daemon = Daemon::builder(config)
877 .with_subsystem_fn("test", |subsystem_manager| {
878 subsystem_manager.register_fn("test_subsystem", test_subsystem)
879 })
880 .build()
881 .unwrap();
882
883 assert!(daemon.is_running());
884 assert_eq!(daemon.config().name, "proc-daemon");
885
886 daemon.shutdown();
888 })
889 .await;
890
891 assert!(test_result.is_ok(), "Test timed out after 5 seconds");
892 }
893
894 #[cfg(all(feature = "async-std", not(feature = "tokio")))]
895 #[async_std::test]
896 async fn test_daemon_builder() {
897 let test_result = async_std::future::timeout(Duration::from_secs(5), async {
899 let config = Config::new().unwrap();
900 let daemon = Daemon::builder(config)
901 .with_subsystem_fn("test", |subsystem_manager| {
902 subsystem_manager.register_fn("test_subsystem", test_subsystem)
903 })
904 .build()
905 .unwrap();
906
907 assert!(daemon.is_running());
908 assert_eq!(daemon.config().name, "proc-daemon");
909
910 daemon.shutdown();
912 })
913 .await;
914
915 assert!(test_result.is_ok(), "Test timed out after 5 seconds");
916 }
917
918 #[cfg(feature = "tokio")]
919 #[cfg_attr(miri, ignore)]
920 #[tokio::test]
921 async fn test_daemon_with_defaults() {
922 let test_result = tokio::time::timeout(Duration::from_secs(5), async {
924 let builder = Daemon::with_defaults().unwrap();
925 let daemon = builder
926 .with_task("simple_task", |_shutdown| async {
927 tokio::time::sleep(Duration::from_millis(10)).await;
928 Ok(())
929 })
930 .build()
931 .unwrap();
932
933 assert!(daemon.is_running());
934
935 daemon.shutdown();
937 })
938 .await;
939
940 assert!(test_result.is_ok(), "Test timed out after 5 seconds");
941 }
942
943 #[cfg(all(feature = "async-std", not(feature = "tokio")))]
944 #[async_std::test]
945 async fn test_daemon_with_defaults() {
946 let test_result = async_std::future::timeout(Duration::from_secs(5), async {
948 let builder = Daemon::with_defaults().unwrap();
949 let daemon = builder
950 .with_task("simple_task", |_shutdown| async {
951 async_std::task::sleep(Duration::from_millis(10)).await;
952 Ok(())
953 })
954 .build()
955 .unwrap();
956
957 assert!(daemon.is_running());
958
959 daemon.shutdown();
961 })
962 .await;
963
964 assert!(test_result.is_ok(), "Test timed out after 5 seconds");
965 }
966
967 #[cfg(feature = "tokio")]
968 #[cfg_attr(miri, ignore)]
969 #[tokio::test]
970 async fn test_daemon_shutdown() {
971 let test_result = tokio::time::timeout(Duration::from_secs(5), async {
973 let config = Config::builder()
974 .name("test-daemon")
975 .shutdown_timeout(Duration::from_millis(100))
976 .unwrap()
977 .build()
978 .unwrap();
979
980 let daemon = Daemon::builder(config)
981 .with_subsystem_fn("test", |subsystem_manager| {
982 subsystem_manager.register_fn("test_subsystem", test_subsystem)
983 })
984 .without_signals()
985 .build()
986 .unwrap();
987
988 daemon.shutdown();
990 assert!(!daemon.is_running());
991 })
992 .await;
993
994 assert!(test_result.is_ok(), "Test timed out after 5 seconds");
995 }
996
997 #[cfg(all(feature = "async-std", not(feature = "tokio")))]
998 #[async_std::test]
999 async fn test_daemon_shutdown() {
1000 let test_result = async_std::future::timeout(Duration::from_secs(5), async {
1002 let config = Config::builder()
1003 .name("test-daemon")
1004 .shutdown_timeout(Duration::from_millis(100))
1005 .unwrap()
1006 .build()
1007 .unwrap();
1008
1009 let daemon = Daemon::builder(config)
1010 .with_subsystem_fn("test", |subsystem_manager| {
1011 subsystem_manager.register_fn("test_subsystem", test_subsystem)
1012 })
1013 .without_signals()
1014 .build()
1015 .unwrap();
1016
1017 daemon.shutdown();
1019 assert!(!daemon.is_running());
1020 })
1021 .await;
1022
1023 assert!(test_result.is_ok(), "Test timed out after 5 seconds");
1024 }
1025
1026 #[test]
1027 fn test_daemon_stats() {
1028 let config = Config::new().unwrap();
1029 let daemon = Daemon::builder(config).build().unwrap();
1030
1031 let stats = daemon.get_stats();
1032 assert_eq!(stats.name, "proc-daemon");
1033 assert!(stats.uptime.is_none()); assert!(!stats.is_shutdown);
1035 }
1036
1037 struct TestSubsystemStruct {
1038 name: String,
1039 }
1040
1041 impl TestSubsystemStruct {
1042 fn new(name: &str) -> Self {
1043 Self {
1044 name: name.to_string(),
1045 }
1046 }
1047 }
1048
1049 impl Subsystem for TestSubsystemStruct {
1050 fn run(
1051 &self,
1052 shutdown: crate::shutdown::ShutdownHandle,
1053 ) -> Pin<Box<dyn Future<Output = Result<()>> + Send>> {
1054 Box::pin(async move {
1055 #[cfg(feature = "tokio")]
1056 let mut shutdown = shutdown;
1057 loop {
1058 #[cfg(feature = "tokio")]
1059 {
1060 tokio::select! {
1061 () = shutdown.cancelled() => break,
1062 () = tokio::time::sleep(Duration::from_millis(10)) => {}
1063 }
1064 }
1065
1066 #[cfg(all(feature = "async-std", not(feature = "tokio")))]
1067 {
1068 if shutdown.is_shutdown() {
1069 break;
1070 }
1071 async_std::task::sleep(Duration::from_millis(10)).await;
1072 }
1073 }
1074 Ok(())
1075 })
1076 }
1077
1078 fn name(&self) -> &str {
1079 &self.name
1080 }
1081 }
1082
1083 #[cfg(feature = "tokio")]
1084 #[cfg_attr(miri, ignore)]
1085 #[tokio::test]
1086 async fn test_daemon_with_struct_subsystem() {
1087 let test_result = tokio::time::timeout(Duration::from_secs(5), async {
1089 let config = Config::new().unwrap();
1090 let subsystem = TestSubsystemStruct::new("struct_test");
1091
1092 let daemon = Daemon::builder(config)
1093 .with_subsystem(subsystem)
1094 .without_signals()
1095 .build()
1096 .unwrap();
1097
1098 let stats = daemon.get_stats();
1099 assert_eq!(stats.subsystem_stats.total_subsystems, 1);
1100
1101 daemon.shutdown();
1103 })
1104 .await;
1105
1106 assert!(test_result.is_ok(), "Test timed out after 5 seconds");
1107 }
1108
1109 #[cfg(all(feature = "async-std", not(feature = "tokio")))]
1110 #[async_std::test]
1111 async fn test_daemon_with_struct_subsystem() {
1112 let test_result = async_std::future::timeout(Duration::from_secs(5), async {
1114 let config = Config::new().unwrap();
1115 let subsystem = TestSubsystemStruct::new("struct_test");
1116
1117 let daemon = Daemon::builder(config)
1118 .with_subsystem(subsystem)
1119 .without_signals()
1120 .build()
1121 .unwrap();
1122
1123 let stats = daemon.get_stats();
1124 assert_eq!(stats.subsystem_stats.total_subsystems, 1);
1125
1126 daemon.shutdown();
1128 })
1129 .await;
1130
1131 assert!(test_result.is_ok(), "Test timed out after 5 seconds");
1132 }
1133
1134 #[cfg(feature = "tokio")]
1135 #[cfg_attr(miri, ignore)]
1136 #[tokio::test]
1137 async fn test_daemon_signal_configuration() {
1138 let test_result = tokio::time::timeout(Duration::from_secs(5), async {
1140 let config = Config::new().unwrap();
1141 let signal_config = SignalConfig::new().with_sighup().without_sigint();
1142
1143 let daemon = Daemon::builder(config)
1144 .with_signal_config(signal_config)
1145 .build()
1146 .unwrap();
1147
1148 assert!(daemon.signal_handler.is_some());
1149
1150 daemon.shutdown();
1152 })
1153 .await;
1154
1155 assert!(test_result.is_ok(), "Test timed out after 5 seconds");
1156 }
1157
1158 #[cfg(all(feature = "async-std", not(feature = "tokio")))]
1159 #[async_std::test]
1160 async fn test_daemon_signal_configuration() {
1161 let test_result = async_std::future::timeout(Duration::from_secs(5), async {
1163 let config = Config::new().unwrap();
1164 let signal_config = SignalConfig::new().with_sighup().without_sigint();
1165
1166 let daemon = Daemon::builder(config)
1167 .with_signal_config(signal_config)
1168 .build()
1169 .unwrap();
1170
1171 assert!(daemon.signal_handler.is_some());
1172
1173 daemon.shutdown();
1175 })
1176 .await;
1177
1178 assert!(test_result.is_ok(), "Test timed out after 5 seconds");
1179 }
1180
1181 #[cfg(feature = "tokio")]
1182 #[cfg_attr(miri, ignore)]
1183 #[tokio::test]
1184 async fn test_macro_usage() {
1185 let test_result = tokio::time::timeout(Duration::from_secs(5), async {
1187 let config = Config::new().unwrap();
1188
1189 let daemon = Daemon::builder(config)
1190 .with_task(
1191 "macro_test",
1192 task!("macro_test", {
1193 tokio::time::sleep(Duration::from_millis(1)).await;
1194 }),
1195 )
1196 .without_signals()
1197 .build()
1198 .unwrap();
1199
1200 let stats = daemon.get_stats();
1201 assert_eq!(stats.subsystem_stats.total_subsystems, 1);
1202
1203 daemon.shutdown();
1205 })
1206 .await;
1207
1208 assert!(test_result.is_ok(), "Test timed out after 5 seconds");
1209 }
1210
1211 #[cfg(all(feature = "async-std", not(feature = "tokio")))]
1212 #[async_std::test]
1213 async fn test_macro_usage() {
1214 let test_result = async_std::future::timeout(Duration::from_secs(5), async {
1216 let config = Config::new().unwrap();
1217
1218 let daemon = Daemon::builder(config)
1219 .with_task(
1220 "macro_test",
1221 task!("macro_test", {
1222 async_std::task::sleep(Duration::from_millis(1)).await;
1223 }),
1224 )
1225 .without_signals()
1226 .build()
1227 .unwrap();
1228
1229 let stats = daemon.get_stats();
1230 assert_eq!(stats.subsystem_stats.total_subsystems, 1);
1231
1232 daemon.shutdown();
1234 })
1235 .await;
1236
1237 assert!(test_result.is_ok(), "Test timed out after 5 seconds");
1238 }
1239}