use std::future::Future;
use super::{LoopHandle, LoopRegistry, ShutdownReceiver, ShutdownWatch};
/// Spawns an async loop on the Tokio runtime and records it in `registry`.
///
/// A fresh [`ShutdownReceiver`] is obtained from `shutdown` *before* the task
/// is spawned and handed to `body`; the future that `body` returns is then
/// driven inside the spawned task. The resulting join handle is registered
/// under `name` as a [`LoopHandle::Async`].
///
/// If registration fails, the task has already been spawned and is not
/// cancelled here; the failure is only reported through a `tracing` warning.
///
/// # Panics
///
/// `tokio::spawn` panics when called outside a Tokio runtime context (see
/// Tokio's documentation).
pub fn spawn_loop<F, Fut>(
    registry: &LoopRegistry,
    shutdown: &ShutdownWatch,
    name: &'static str,
    body: F,
) where
    F: FnOnce(ShutdownReceiver) -> Fut + Send + 'static,
    Fut: Future<Output = ()> + Send + 'static,
{
    // Subscribe on the caller's side so the receiver exists before the task
    // gets its first poll.
    let receiver = shutdown.subscribe();

    // `body` itself is invoked inside the task, not on the caller's thread.
    let task = tokio::spawn(async move { body(receiver).await });

    let registration = registry.register(name, LoopHandle::Async(task));
    if let Err(err) = registration {
        tracing::warn!(
            error = %err,
            "spawn_loop after registry close — task will run to completion \
             but shutdown_all will not wait for it"
        );
    }
}
/// Spawns a synchronous loop via Tokio's blocking task API and records it in
/// `registry`.
///
/// A fresh [`ShutdownReceiver`] is obtained from `shutdown` and moved into the
/// closure that runs `body` through `tokio::task::spawn_blocking`. The
/// resulting join handle is registered under `name` as a
/// [`LoopHandle::Blocking`].
///
/// If registration fails, the closure has already been submitted and is not
/// stopped here; the failure is only reported through a `tracing` warning.
///
/// # Panics
///
/// `tokio::task::spawn_blocking` panics when called outside a Tokio runtime
/// context (see Tokio's documentation).
pub fn spawn_blocking_loop<F>(
    registry: &LoopRegistry,
    shutdown: &ShutdownWatch,
    name: &'static str,
    body: F,
) where
    F: FnOnce(ShutdownReceiver) + Send + 'static,
{
    // Subscribe before handing the work off so the receiver already exists
    // when `body` starts.
    let receiver = shutdown.subscribe();
    let worker = tokio::task::spawn_blocking(move || body(receiver));

    let registration = registry.register(name, LoopHandle::Blocking(worker));
    if let Err(err) = registration {
        tracing::warn!(
            error = %err,
            "spawn_blocking_loop after registry close — thread will run to \
             completion but shutdown_all will not wait for it"
        );
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::time::Duration;

    /// An async loop spawned through `spawn_loop` is counted by the registry,
    /// actually executes its body, and exits cleanly once shutdown is signalled.
    #[tokio::test]
    async fn spawn_loop_registers_and_runs() {
        let registry = LoopRegistry::new();
        let watch = Arc::new(ShutdownWatch::new());
        let counter = Arc::new(AtomicUsize::new(0));
        let counter_c = Arc::clone(&counter);

        spawn_loop(&registry, &watch, "test_loop", |mut rx| async move {
            loop {
                tokio::select! {
                    _ = rx.wait_cancelled() => break,
                    _ = tokio::time::sleep(Duration::from_millis(5)) => {
                        counter_c.fetch_add(1, Ordering::Relaxed);
                    }
                }
            }
        });

        // Give the loop time for a few 5 ms ticks before inspecting it.
        tokio::time::sleep(Duration::from_millis(30)).await;
        assert_eq!(registry.live_count(), 1);
        assert!(counter.load(Ordering::Relaxed) >= 1);

        watch.signal();
        let report = registry.shutdown_all(Duration::from_millis(100)).await;
        assert!(report.is_clean(), "{report}");
        assert_eq!(report.exited_clean, vec!["test_loop"]);
    }

    /// A body that does nothing but wait for cancellation finishes within the
    /// shutdown timeout when the signal is sent right after spawning.
    #[tokio::test]
    async fn spawn_loop_body_receives_signal() {
        let registry = LoopRegistry::new();
        let watch = Arc::new(ShutdownWatch::new());

        spawn_loop(&registry, &watch, "wait_only", |mut rx| async move {
            rx.wait_cancelled().await;
        });

        // No sleep here: the signal is sent immediately after the spawn call.
        watch.signal();
        let report = registry.shutdown_all(Duration::from_millis(50)).await;
        assert!(report.is_clean(), "{report}");
    }

    /// A synchronous body spawned through `spawn_blocking_loop` polls the
    /// receiver, observes the shutdown signal, and runs to its final statement.
    #[tokio::test]
    async fn spawn_blocking_loop_runs_on_blocking_thread() {
        let registry = LoopRegistry::new();
        let watch = Arc::new(ShutdownWatch::new());
        let done = Arc::new(AtomicUsize::new(0));
        let done_c = Arc::clone(&done);

        spawn_blocking_loop(&registry, &watch, "blocker", move |rx| {
            // Synchronous polling with a blocking sleep, as a blocking-loop
            // body would do.
            while !rx.is_cancelled() {
                std::thread::sleep(Duration::from_millis(2));
            }
            // Marks that the body ran past its loop.
            done_c.store(1, Ordering::SeqCst);
        });

        tokio::time::sleep(Duration::from_millis(10)).await;
        watch.signal();
        let report = registry.shutdown_all(Duration::from_millis(200)).await;
        assert!(report.is_clean(), "{report}");
        assert_eq!(done.load(Ordering::SeqCst), 1);
    }
}