use std::sync::{Arc, Mutex};
use std::time::Duration;
use tokio::sync::oneshot::{Receiver, Sender};
use tracing::debug;
/// Default time budget for a shutdown: five seconds.
///
/// NOTE(review): nothing in the visible part of this file reads this constant;
/// presumably callers use it to bound how long they wait on
/// `ShutdownController::signal_shutdown` — confirm against call sites.
pub const DEFAULT_SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(5);
/// Notification delivered to a subscriber when shutdown is requested.
///
/// Dropping this value sends an acknowledgement over `sender` (when one is
/// present); `ShutdownController::signal_shutdown` awaits that acknowledgement
/// for every subscriber it notified.
#[derive(Debug)]
pub struct ShutdownSignal {
/// The kind of shutdown that was requested.
pub shutdown_type: ShutdownType,
// Acknowledgement channel back to the controller. `None` for the fallback
// signal that `ShutdownReceiver::wait_for_shutdown` builds on channel error.
sender: Option<Sender<()>>,
}
impl Drop for ShutdownSignal {
    /// Acknowledges the shutdown by sending `()` on the stored sender, if any.
    ///
    /// The sender is taken out of the `Option`, so at most one acknowledgement
    /// is ever sent per signal.
    fn drop(&mut self) {
        let ack = match self.sender.take() {
            Some(ack) => ack,
            // Nothing to acknowledge (e.g. the fallback signal has no sender).
            None => return,
        };
        // The result is deliberately ignored: a failed send only means the
        // other side is no longer listening.
        let _ = ack.send(());
    }
}
/// The kind of shutdown being requested from subscribers.
///
/// Derives `PartialEq`/`Eq` so callers can compare values directly
/// (`signal.shutdown_type == ShutdownType::Force`) instead of pattern matching.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ShutdownType {
    /// Graceful shutdown was requested. This is also the value used for the
    /// fallback signal when the shutdown channel closes without a signal.
    Graceful,
    /// Forced shutdown was requested.
    ///
    /// NOTE(review): how subscribers should treat this differently from
    /// `Graceful` is not defined in this file — confirm with the consumers.
    Force,
}
/// Cloneable handle used to register shutdown subscribers and to notify them.
///
/// Clones share a single subscriber list through the `Arc`.
#[derive(Clone)]
pub struct ShutdownController {
// One-shot senders, one pushed per `subscribe` call; drained by
// `signal_shutdown`.
subscribers: Arc<Mutex<Vec<Sender<ShutdownSignal>>>>,
}
impl std::fmt::Debug for ShutdownController {
    /// Formats the controller as `ShutdownController { subscriber_count: N }`.
    ///
    /// The subscriber list is locked to read its length.
    ///
    /// # Panics
    ///
    /// Panics if the subscriber mutex is poisoned.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let subscriber_count = self.subscribers.lock().unwrap().len();
        let mut out = f.debug_struct("ShutdownController");
        out.field("subscriber_count", &subscriber_count);
        out.finish()
    }
}
impl Default for ShutdownController {
fn default() -> Self {
Self::new()
}
}
impl ShutdownController {
pub fn new() -> Self {
Self {
subscribers: Arc::new(Mutex::new(Vec::new())),
}
}
pub fn subscribe(&mut self) -> ShutdownReceiver {
let (sender, receiver) = tokio::sync::oneshot::channel();
self.subscribers.lock().unwrap().push(sender);
ShutdownReceiver { receiver }
}
pub async fn signal_shutdown(self, shutdown_type: ShutdownType) {
debug!("Signaling shutdown to all subscribers");
let subscribers = {
let mut guard = self.subscribers.lock().unwrap();
std::mem::take(&mut *guard)
};
let mut receivers = Vec::new();
for sender in subscribers {
let (responder, receiver) = tokio::sync::oneshot::channel();
receivers.push(receiver);
match sender.send(ShutdownSignal {
shutdown_type,
sender: Some(responder),
}) {
Ok(_) => {
debug!("Shutdown signal sent");
}
Err(e) => {
debug!("Failed to send shutdown signal: {:?}", e);
}
}
}
for receiver in receivers {
if let Err(e) = receiver.await {
debug!("Failed to receive shutdown signal: {:?}", e);
}
}
}
}
/// Subscriber-side handle returned by `ShutdownController::subscribe`.
pub struct ShutdownReceiver {
/// One-shot receiver on which the `ShutdownSignal` arrives.
pub receiver: Receiver<ShutdownSignal>,
}
impl ShutdownReceiver {
pub async fn wait_for_shutdown(self) -> ShutdownSignal {
debug!("Waiting for shutdown signal");
match self.receiver.await {
Ok(signal) => {
debug!("Received shutdown signal");
signal
}
Err(e) => {
debug!("Shutdown channel error: {}, using default signal", e);
ShutdownSignal {
sender: None,
shutdown_type: ShutdownType::Graceful,
}
}
}
}
}