pub mod multi_worker;
pub mod worker;
pub use sockudo_adapter::cleanup::{
AuthInfo, CleanupChannelFlavor, CleanupReceiverHandle, CleanupSender, CleanupSenderHandle,
ConnectionCleanupInfo, DisconnectTask, MultiWorkerSender, WorkerStats,
};
pub use sockudo_core::options::{CleanupConfig, WorkerThreadsConfig};
pub use tokio_util::sync::CancellationToken;
pub trait WorkerThreadsResolve {
fn resolve(&self) -> usize;
}
impl WorkerThreadsResolve for WorkerThreadsConfig {
fn resolve(&self) -> usize {
match self {
WorkerThreadsConfig::Auto => {
let cpu_count = num_cpus::get();
let auto_threads = (cpu_count / 4).clamp(1, 4);
tracing::info!(
"Auto-detected {} CPUs, using {} cleanup worker threads",
cpu_count,
auto_threads
);
auto_threads
}
WorkerThreadsConfig::Fixed(n) => *n,
}
}
}
pub struct CleanupSystem {
cancel_token: CancellationToken,
worker_handles: Vec<tokio::task::JoinHandle<()>>,
}
impl CleanupSystem {
pub fn new() -> Self {
Self {
cancel_token: CancellationToken::new(),
worker_handles: Vec::new(),
}
}
pub fn cancel_token(&self) -> CancellationToken {
self.cancel_token.clone()
}
pub fn add_worker_handle(&mut self, handle: tokio::task::JoinHandle<()>) {
self.worker_handles.push(handle);
}
pub async fn shutdown(self) {
tracing::info!("Initiating cleanup system shutdown...");
self.cancel_token.cancel();
for (i, handle) in self.worker_handles.into_iter().enumerate() {
match handle.await {
Ok(()) => {
tracing::debug!("Cleanup worker {} shut down successfully", i);
}
Err(e) => {
tracing::warn!("Cleanup worker {} task failed: {}", i, e);
}
}
}
tracing::info!("Cleanup system shutdown complete");
}
pub fn is_shutting_down(&self) -> bool {
self.cancel_token.is_cancelled()
}
}
impl Default for CleanupSystem {
fn default() -> Self {
Self::new()
}
}
#[derive(Debug, Clone)]
pub struct WebhookEvent {
pub event_type: String,
pub app_id: String,
pub channel: String,
pub user_id: Option<String>,
pub data: sonic_rs::Value,
}