use std::{
fmt::Debug,
marker::PhantomData,
sync::{Arc, Condvar, Mutex},
thread::{self, JoinHandle},
};
#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq, PartialOrd, Ord)]
pub enum DaemonState<QuitReason> {
Holding,
Quitting(Option<QuitReason>),
Quit(Option<QuitReason>),
}
#[derive(Debug)]
pub struct ResourceDaemon<T, QuitReason: Clone + Send + 'static> {
phantom: PhantomData<T>,
state: Arc<(Mutex<DaemonState<QuitReason>>, Condvar)>,
thread_handle: Option<JoinHandle<()>>,
}
unsafe impl<T, QuitReason: Clone + Send + 'static> Send for ResourceDaemon<T, QuitReason> {}
unsafe impl<T, QuitReason: Clone + Send + 'static> Sync for ResourceDaemon<T, QuitReason> {}
#[derive(Debug, Clone)]
pub struct QuitSignal<QuitReason: Clone + Send + 'static>(
Arc<(Mutex<DaemonState<QuitReason>>, Condvar)>,
);
impl<QuitReason: Clone + Send + 'static> QuitSignal<QuitReason> {
pub fn dispatch(&self, reason: QuitReason) {
wake_to_quit(&self.0, Some(reason));
}
}
fn wake_to_quit<QuitReason: Clone + Send + 'static>(
state: &Arc<(Mutex<DaemonState<QuitReason>>, Condvar)>,
reason: Option<QuitReason>,
) {
let mut guard = state.0.lock().unwrap();
if matches!(&*guard, DaemonState::Holding) {
*guard = DaemonState::Quitting(reason);
state.1.notify_one();
}
drop(guard);
}
impl<T, QuitReason: Clone + Send + 'static> ResourceDaemon<T, QuitReason> {
#[must_use]
pub fn new<
Provider: FnOnce(QuitSignal<QuitReason>) -> Result<T, QuitReason> + Send + 'static,
>(
resource_provider: Provider,
) -> Self {
let state = Arc::new((Mutex::new(DaemonState::Holding), Condvar::default()));
Self {
thread_handle: Some(thread::spawn({
let state = state.clone();
move || {
let resource = resource_provider({
let state = state.clone();
QuitSignal(state)
});
match resource {
Err(err) => {
*state.0.lock().unwrap() = DaemonState::Quit(Some(err));
}
Ok(resource) => {
let s = state
.1
.wait_while(state.0.lock().unwrap(), |q| {
matches!(q, DaemonState::Holding)
})
.unwrap();
drop(s);
log::debug!("ResourceDaemon: dropping resource in daemon thread");
drop(resource);
log::debug!("ResourceDaemon: resource dropped, updating state to Quit");
let mut s = state.0.lock().unwrap();
match *s {
DaemonState::Holding => {
*s = DaemonState::Quit(None);
}
DaemonState::Quitting(ref mut reason) => {
*s = DaemonState::Quit(reason.take());
}
DaemonState::Quit(_) => (),
}
log::debug!("ResourceDaemon: daemon thread exiting");
}
}
}
})),
phantom: PhantomData,
state,
}
}
fn wake_to_quit_and_join(&mut self, reason: Option<QuitReason>) {
log::debug!("ResourceDaemon: wake_to_quit_and_join called");
wake_to_quit(&self.state, reason);
if let Some(join_handle) = self.thread_handle.take() {
log::debug!("ResourceDaemon: joining daemon thread...");
let join_result = join_handle.join();
log::debug!("ResourceDaemon: daemon thread join completed: {join_result:?}");
} else {
log::debug!("ResourceDaemon: no thread handle to join");
}
}
pub fn quit(&mut self, reason: QuitReason) {
self.wake_to_quit_and_join(Some(reason));
}
#[must_use]
pub fn state(&self) -> DaemonState<QuitReason> {
self.state.0.lock().unwrap().clone()
}
}
impl<T, QuitReason: Clone + Send + 'static> Drop for ResourceDaemon<T, QuitReason> {
fn drop(&mut self) {
log::debug!("ResourceDaemon: Drop called, shutting down daemon");
self.wake_to_quit_and_join(None);
log::debug!("ResourceDaemon: Drop completed");
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
fn wait_for_state<T, QuitReason: Clone + Send + Debug + 'static>(
daemon: &ResourceDaemon<T, QuitReason>,
predicate: impl Fn(&DaemonState<QuitReason>) -> bool,
timeout: Duration,
) -> Result<DaemonState<QuitReason>, String> {
let start = Instant::now();
let poll_interval = Duration::from_millis(1);
loop {
let state = daemon.state();
if predicate(&state) {
return Ok(state);
}
if start.elapsed() > timeout {
return Err(format!(
"Timeout after {timeout:?} waiting for state condition, current state: {state:?}",
));
}
std::thread::sleep(poll_interval);
}
}
fn wait_for_condition(
condition: impl Fn() -> bool,
timeout: Duration,
description: &str,
) -> Result<(), String> {
let start = Instant::now();
let poll_interval = Duration::from_millis(1);
loop {
if condition() {
return Ok(());
}
if start.elapsed() > timeout {
return Err(format!(
"Timeout after {timeout:?} waiting for: {description}",
));
}
std::thread::sleep(poll_interval);
}
}
#[test_log::test]
fn test_daemon_state_debug() {
let state: DaemonState<String> = DaemonState::Holding;
assert_eq!(format!("{state:?}"), "Holding");
let state: DaemonState<String> = DaemonState::Quitting(Some("reason".to_string()));
assert_eq!(format!("{state:?}"), "Quitting(Some(\"reason\"))");
let state: DaemonState<String> = DaemonState::Quit(None);
assert_eq!(format!("{state:?}"), "Quit(None)");
}
#[test_log::test]
#[allow(clippy::redundant_clone)]
fn test_daemon_state_clone() {
let state: DaemonState<String> = DaemonState::Holding;
let cloned = state.clone();
assert_eq!(cloned, DaemonState::Holding);
let state = DaemonState::Quitting(Some("test".to_string()));
let cloned = state.clone();
assert!(matches!(cloned, DaemonState::Quitting(Some(ref s)) if s == "test"));
}
#[test_log::test]
fn test_daemon_state_equality() {
let state1: DaemonState<String> = DaemonState::Holding;
let state2: DaemonState<String> = DaemonState::Holding;
assert_eq!(state1, state2);
let state1 = DaemonState::Quitting(Some("reason".to_string()));
let state2 = DaemonState::Quitting(Some("reason".to_string()));
assert_eq!(state1, state2);
let state1: DaemonState<String> = DaemonState::Quit(None);
let state2: DaemonState<String> = DaemonState::Quit(None);
assert_eq!(state1, state2);
}
#[test_log::test]
fn test_daemon_state_ordering() {
let holding: DaemonState<String> = DaemonState::Holding;
let quitting: DaemonState<String> = DaemonState::Quitting(None);
let quit: DaemonState<String> = DaemonState::Quit(None);
assert!(holding < quitting);
assert!(quitting < quit);
assert!(holding < quit);
}
#[test_log::test]
fn test_resource_daemon_new_success() {
let daemon = ResourceDaemon::<i32, String>::new(|_signal| Ok(42));
assert_eq!(daemon.state(), DaemonState::Holding);
}
#[test_log::test]
fn test_resource_daemon_new_with_error() {
let daemon = ResourceDaemon::<i32, String>::new(|_signal| Err("error".to_string()));
let state = wait_for_state(
&daemon,
|s| matches!(s, DaemonState::Quit(_)),
Duration::from_secs(1),
)
.expect("Daemon should reach Quit state");
assert!(matches!(state, DaemonState::Quit(Some(ref s)) if s == "error"));
}
#[test_log::test]
fn test_resource_daemon_quit() {
let counter = Arc::new(Mutex::new(0));
let counter_clone = counter.clone();
let mut daemon = ResourceDaemon::<i32, String>::new(move |_signal| {
*counter_clone.lock().unwrap() = 1;
Ok(42)
});
let counter_check = counter.clone();
wait_for_condition(
|| *counter_check.lock().unwrap() == 1,
Duration::from_secs(1),
"resource creation",
)
.expect("Resource should be created");
assert_eq!(*counter.lock().unwrap(), 1);
daemon.quit("test reason".to_string());
let state = daemon.state();
assert!(matches!(state, DaemonState::Quit(Some(ref s)) if s == "test reason"));
}
#[test_log::test]
#[allow(clippy::redundant_clone)]
#[allow(clippy::items_after_statements)]
fn test_resource_daemon_drop() {
let dropped = Arc::new(Mutex::new(false));
let dropped_clone = dropped.clone();
struct DropTracker {
dropped: Arc<Mutex<bool>>,
}
impl Drop for DropTracker {
fn drop(&mut self) {
*self.dropped.lock().unwrap() = true;
}
}
{
let _daemon = ResourceDaemon::<DropTracker, String>::new(move |_signal| {
Ok(DropTracker {
dropped: dropped_clone.clone(),
})
});
assert!(!*dropped.lock().unwrap());
}
let dropped_check = dropped.clone();
wait_for_condition(
|| *dropped_check.lock().unwrap(),
Duration::from_secs(1),
"resource to be dropped",
)
.expect("Resource should be dropped");
assert!(*dropped.lock().unwrap());
}
#[test_log::test]
fn test_quit_signal_dispatch() {
let daemon = ResourceDaemon::<i32, String>::new(|signal| {
std::thread::spawn(move || {
signal.dispatch("internal quit".to_string());
});
Ok(42)
});
let state = wait_for_state(
&daemon,
|s| matches!(s, DaemonState::Quitting(_) | DaemonState::Quit(_)),
Duration::from_secs(1),
)
.expect("Daemon should reach Quitting or Quit state");
assert!(
matches!(state, DaemonState::Quitting(_) | DaemonState::Quit(_)),
"Expected 'Quitting' or 'Quit' state, but got {state:?}"
);
}
#[test_log::test]
fn test_quit_signal_debug() {
let daemon = ResourceDaemon::<i32, String>::new(|signal| {
let debug_str = format!("{signal:?}");
assert!(debug_str.contains("QuitSignal"));
Ok(42)
});
assert_eq!(daemon.state(), DaemonState::Holding);
}
#[test_log::test]
#[allow(clippy::redundant_clone)]
fn test_quit_signal_clone() {
let daemon = ResourceDaemon::<i32, String>::new(|signal| {
let signal_clone = signal.clone();
std::thread::spawn(move || {
signal_clone.dispatch("cloned signal".to_string());
});
Ok(42)
});
let state = wait_for_state(
&daemon,
|s| matches!(s, DaemonState::Quitting(_) | DaemonState::Quit(_)),
Duration::from_secs(1),
)
.expect("Daemon should reach Quitting or Quit state");
assert!(
matches!(state, DaemonState::Quitting(_) | DaemonState::Quit(_)),
"Expected 'Quitting' or 'Quit' state, but got {state:?}"
);
}
#[test_log::test]
#[allow(clippy::redundant_clone)]
fn test_resource_daemon_state_transitions() {
let state_log = Arc::new(Mutex::new(Vec::new()));
let state_log_clone = state_log.clone();
let mut daemon = ResourceDaemon::<i32, String>::new(move |_signal| {
state_log_clone.lock().unwrap().push("created".to_string());
Ok(42)
});
let state_log_check = state_log.clone();
wait_for_condition(
|| {
state_log_check
.lock()
.unwrap()
.contains(&"created".to_string())
},
Duration::from_secs(1),
"resource creation",
)
.expect("Resource should be created");
assert_eq!(daemon.state(), DaemonState::Holding);
daemon.quit("test".to_string());
let state = daemon.state();
assert!(matches!(state, DaemonState::Quit(_)));
}
#[test_log::test]
fn test_multiple_quit_calls() {
let mut daemon = ResourceDaemon::<i32, String>::new(|_signal| Ok(42));
assert_eq!(daemon.state(), DaemonState::Holding);
daemon.quit("first".to_string());
let state1 = daemon.state();
daemon.quit("second".to_string());
let state2 = daemon.state();
assert_eq!(state1, state2);
assert!(matches!(state1, DaemonState::Quit(Some(ref s)) if s == "first"));
}
#[test_log::test]
fn test_resource_daemon_send_sync() {
fn assert_send<T: Send>() {}
fn assert_sync<T: Sync>() {}
assert_send::<ResourceDaemon<i32, String>>();
assert_sync::<ResourceDaemon<i32, String>>();
}
#[test_log::test]
fn test_resource_daemon_debug() {
let daemon = ResourceDaemon::<i32, String>::new(|_signal| Ok(42));
let debug_str = format!("{daemon:?}");
assert!(debug_str.contains("ResourceDaemon"));
}
}