sigterm 0.3.10

Signal-aware async control and cancellation primitives for Tokio.
Documentation
/* src/broadcast.rs */

//! Broadcast shutdown signals to multiple tasks.
//!
//! This module allows multiple tasks to subscribe to a single shutdown signal.
//! It is built on top of `tokio::sync::Notify`.

use std::sync::Arc;
use tokio::sync::Notify;

/// A mechanism to broadcast a shutdown signal to multiple subscribers.
#[derive(Clone, Default)]
pub struct Broadcast {
	notify: Arc<Notify>,
}

impl Broadcast {
	/// Creates a new broadcast shutdown mechanism.
	pub fn new() -> Self {
		#[cfg(feature = "tracing")]
		tracing::trace!("creating new broadcast shutdown");

		Self::default()
	}

	/// Creates a subscription that waits for the shutdown signal.
	pub fn subscribe(&self) -> Subscriber {
		#[cfg(feature = "tracing")]
		tracing::trace!("creating new broadcast subscriber");

		Subscriber {
			notify: self.notify.clone(),
		}
	}

	/// Broadcasts the shutdown signal to all subscribers.
	pub fn shutdown(&self) {
		#[cfg(feature = "tracing")]
		tracing::info!("broadcasting shutdown to all subscribers");

		self.notify.notify_waiters();
	}
}

/// A subscriber that waits for the broadcast shutdown signal.
#[derive(Clone)]
pub struct Subscriber {
	notify: Arc<Notify>,
}

impl Subscriber {
	/// Waits for the shutdown signal.
	pub async fn recv(&self) {
		#[cfg(feature = "tracing")]
		tracing::debug!("subscriber waiting for broadcast shutdown");

		self.notify.notified().await;

		#[cfg(feature = "tracing")]
		tracing::debug!("subscriber received broadcast shutdown");
	}
}

#[cfg(test)]
mod tests {
	use super::*;
	use std::time::Duration;
	use tokio::time::timeout;

	#[tokio::test]
	async fn test_broadcast_shutdown() {
		let broadcast = Broadcast::new();
		let sub1 = broadcast.subscribe();
		let sub2 = broadcast.subscribe();

		let task1 = tokio::spawn(async move {
			sub1.recv().await;
		});

		let task2 = tokio::spawn(async move {
			sub2.recv().await;
		});

		// Ensure tasks have started and are waiting
		tokio::time::sleep(Duration::from_millis(10)).await;

		broadcast.shutdown();
		assert!(timeout(Duration::from_millis(100), task1).await.is_ok());
		assert!(timeout(Duration::from_millis(100), task2).await.is_ok());
	}

	#[tokio::test]
	async fn test_late_subscriber() {
		// Broadcast uses Notify, which does NOT retain the signal for future subscribers
		// unless they are waiting *before* notification.
		// This test documents/verifies that behavior.
		let broadcast = Broadcast::new();

		broadcast.shutdown(); // Signal sent now

		let sub = broadcast.subscribe();
		let task = tokio::spawn(async move {
			sub.recv().await;
		});

		// The subscriber should NOT complete because it missed the signal
		assert!(timeout(Duration::from_millis(50), task).await.is_err());
	}
}