use super::{CleanupConfig, DisconnectTask, worker::CleanupWorker};
use crate::adapter::connection_manager::ConnectionManager;
use crate::app::manager::AppManager;
use crate::webhook::integration::WebhookIntegration;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use tokio::sync::{Mutex, mpsc};
use tracing::{error, info, warn};
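/// A pool of cleanup workers, each draining its own bounded queue of
/// [`DisconnectTask`]s dispatched round-robin by a [`MultiWorkerSender`].
///
/// A minimal usage sketch (the `conn_mgr`, `app_mgr`, `config`, and `task`
/// values are placeholders, not part of this module):
///
/// ```ignore
/// let system = MultiWorkerCleanupSystem::new(conn_mgr, app_mgr, None, config);
/// let sender = system.get_sender();
/// sender.send(task).expect("no cleanup worker could accept the task");
/// ```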
pub struct MultiWorkerCleanupSystem {
senders: Vec<mpsc::Sender<DisconnectTask>>,
worker_handles: Vec<tokio::task::JoinHandle<()>>,
round_robin_counter: Arc<AtomicUsize>,
config: CleanupConfig,
}
impl MultiWorkerCleanupSystem {
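    /// Spawns `config.worker_threads.resolve()` workers, each reading from
    /// its own bounded queue of `config.queue_buffer_size` tasks.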
pub fn new(
connection_manager: Arc<Mutex<dyn ConnectionManager + Send + Sync>>,
app_manager: Arc<dyn AppManager + Send + Sync>,
webhook_integration: Option<Arc<WebhookIntegration>>,
config: CleanupConfig,
) -> Self {
let num_workers = config.worker_threads.resolve();
info!(
"Initializing multi-worker cleanup system with {} workers",
num_workers
);
let mut senders = Vec::with_capacity(num_workers);
let mut worker_handles = Vec::with_capacity(num_workers);
for worker_id in 0..num_workers {
            let (sender, receiver) = mpsc::channel(config.queue_buffer_size);
            // Each worker gets its own queue plus clones of the shared state.
            let worker = CleanupWorker::new(
                connection_manager.clone(),
                app_manager.clone(),
                webhook_integration.clone(),
                config.clone(),
            );
let handle = tokio::spawn(async move {
info!("Cleanup worker {} starting", worker_id);
worker.run(receiver).await;
info!("Cleanup worker {} stopped", worker_id);
});
senders.push(sender);
worker_handles.push(handle);
}
info!(
"Multi-worker cleanup system initialized with {} workers, batch_size={} per worker",
num_workers, config.batch_size
);
Self {
senders,
worker_handles,
round_robin_counter: Arc::new(AtomicUsize::new(0)),
config,
}
}
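    /// Returns a cloneable handle that distributes tasks across all workers.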
pub fn get_sender(&self) -> MultiWorkerSender {
MultiWorkerSender {
senders: self.senders.clone(),
round_robin_counter: self.round_robin_counter.clone(),
}
}
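    /// Single-worker fast path: returns the lone queue sender directly,
    /// bypassing round-robin dispatch. Returns `None` when more than one
    /// worker is running.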
    pub fn get_direct_sender(&self) -> Option<mpsc::Sender<DisconnectTask>> {
if self.senders.len() == 1 {
self.senders.first().cloned()
} else {
None
}
}
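    /// Consumes the system and hands back the worker join handles. The
    /// queue senders held by the system are dropped in the process, so the
    /// workers will wind down unless a `MultiWorkerSender` is still alive.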
pub fn get_worker_handles(self) -> Vec<tokio::task::JoinHandle<()>> {
self.worker_handles
}
pub fn get_config(&self) -> &CleanupConfig {
&self.config
}
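    /// Closes the worker queues and waits for every worker to finish,
    /// collecting any join errors.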
pub async fn shutdown(self) -> Result<(), String> {
info!("Shutting down multi-worker cleanup system...");
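        // Dropping our senders closes each worker's queue once no
        // `MultiWorkerSender` clones remain alive; workers exit after
        // draining whatever is left.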
drop(self.senders);
let mut shutdown_errors = Vec::new();
for (i, handle) in self.worker_handles.into_iter().enumerate() {
if let Err(e) = handle.await {
let error_msg = format!("Worker {} shutdown error: {}", i, e);
error!("{}", error_msg);
shutdown_errors.push(error_msg);
}
}
if shutdown_errors.is_empty() {
info!("Multi-worker cleanup system shutdown complete");
Ok(())
} else {
Err(format!(
"Shutdown completed with {} errors: {:?}",
shutdown_errors.len(),
shutdown_errors
))
}
}
}
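/// Cloneable handle for enqueuing [`DisconnectTask`]s: picks a worker
/// round-robin and falls back to the remaining workers when the chosen
/// queue is full or closed.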
#[derive(Clone)]
pub struct MultiWorkerSender {
senders: Vec<mpsc::Sender<DisconnectTask>>,
round_robin_counter: Arc<AtomicUsize>,
}
impl MultiWorkerSender {
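    /// Enqueues a task without blocking. The round-robin target is tried
    /// first; if its queue is full or closed, every other worker is tried
    /// once before the task is handed back inside the error.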
pub fn send(
&self,
task: DisconnectTask,
) -> Result<(), Box<mpsc::error::SendError<DisconnectTask>>> {
if self.senders.is_empty() {
return Err(Box::new(mpsc::error::SendError(task)));
}
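        // Advance the round-robin counter; the closure always returns
        // `Some`, so `fetch_update` cannot fail and `unwrap_or(0)` is
        // purely defensive.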
let worker_index = self
.round_robin_counter
.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |current| {
Some((current + 1) % self.senders.len())
})
.unwrap_or(0);
        match self.senders[worker_index].try_send(task) {
            Ok(()) => Ok(()),
            Err(err) => {
                // Recover the task from the error and report why the
                // round-robin target was skipped.
                let task = match err {
                    mpsc::error::TrySendError::Full(task) => {
                        warn!("Worker {} queue is full, trying next worker", worker_index);
                        task
                    }
                    mpsc::error::TrySendError::Closed(task) => {
                        warn!("Worker {} channel closed, trying next worker", worker_index);
                        task
                    }
                };
                // Try every other worker once before giving up.
                for offset in 1..self.senders.len() {
                    let next_index = (worker_index + offset) % self.senders.len();
                    if self.senders[next_index].try_send(task.clone()).is_ok() {
                        return Ok(());
                    }
                }
                error!("All cleanup worker queues are full or closed");
                Err(Box::new(mpsc::error::SendError(task)))
            }
        }
}
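    /// Delegates to [`send`](Self::send), which already falls back across
    /// workers when the round-robin target is unavailable.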
pub fn send_with_fallback(
&self,
task: DisconnectTask,
) -> Result<(), Box<mpsc::error::SendError<DisconnectTask>>> {
self.send(task)
}
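    /// True while at least one worker queue is still open.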
pub fn is_available(&self) -> bool {
self.senders.iter().any(|sender| !sender.is_closed())
}
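    /// Number of worker queues this sender fans out to.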
pub fn worker_count(&self) -> usize {
self.senders.len()
}
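    /// Point-in-time snapshot of open vs. closed worker queues.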
pub fn get_worker_stats(&self) -> WorkerStats {
let total = self.senders.len();
let available = self
.senders
.iter()
.filter(|sender| !sender.is_closed())
.count();
let closed = total - available;
WorkerStats {
total_workers: total,
available_workers: available,
closed_workers: closed,
}
}
}
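/// Availability counts reported by [`MultiWorkerSender::get_worker_stats`].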
#[derive(Debug, Clone)]
pub struct WorkerStats {
pub total_workers: usize,
pub available_workers: usize,
pub closed_workers: usize,
}
impl MultiWorkerSender {
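    /// Builds a sender over pre-constructed channels so unit tests can
    /// exercise the dispatch logic without spawning real workers.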
#[cfg(test)]
pub fn new_for_test(senders: Vec<mpsc::Sender<DisconnectTask>>) -> Self {
Self {
senders,
round_robin_counter: Arc::new(AtomicUsize::new(0)),
}
}
}