use std::{
collections::{BinaryHeap, HashMap},
sync::{Arc, Mutex},
time::Instant,
};
use tokio::{
sync::{Semaphore, mpsc::Receiver},
time::sleep,
};
use super::super::{
executor::Executor,
types::{CommandQueueConfig, CommandQueueResult, CommandStatus, QueueMessage, QueueProcessor},
};
impl QueueProcessor {
pub(crate) fn new(
config: CommandQueueConfig,
receiver: Receiver<QueueMessage>,
executor: Arc<dyn Executor>,
statuses: Arc<Mutex<HashMap<String, CommandStatus>>>,
results: Arc<Mutex<HashMap<String, CommandQueueResult>>>,
) -> Self {
Self {
concurrency_semaphore: Arc::new(Semaphore::new(config.max_concurrent_commands)),
config,
receiver,
executor,
queue: BinaryHeap::new(),
statuses,
results,
last_execution: None,
running: true,
batch_mode: false,
}
}
pub(crate) async fn process_queue(mut self) {
while self.running {
let mut collected_commands = false;
let collect_deadline = tokio::time::Instant::now()
+ tokio::time::Duration::from_millis(self.config.collection_window_ms);
while tokio::time::Instant::now() < collect_deadline {
match self.receiver.try_recv() {
Ok(QueueMessage::Execute(boxed_cmd)) => {
self.queue.push(*boxed_cmd);
collected_commands = true;
}
Ok(QueueMessage::BatchStart) => {
self.batch_mode = true;
}
Ok(QueueMessage::BatchEnd) => {
self.batch_mode = false;
collected_commands = true; }
Ok(QueueMessage::Shutdown) => {
self.running = false;
break;
}
Err(tokio::sync::mpsc::error::TryRecvError::Empty) => {
break;
}
Err(tokio::sync::mpsc::error::TryRecvError::Disconnected) => {
self.running = false;
break;
}
}
tokio::time::sleep(tokio::time::Duration::from_micros(
self.config.collection_sleep_us,
))
.await;
}
if !self.batch_mode && (collected_commands || !self.queue.is_empty()) {
self.process_next_command().await;
} else if self.running {
tokio::time::sleep(tokio::time::Duration::from_millis(self.config.idle_sleep_ms))
.await;
}
}
while !self.queue.is_empty() {
self.process_next_command().await;
}
log::info!("Command queue processor has shut down");
}
#[allow(clippy::manual_let_else)]
async fn process_next_command(&mut self) {
if let Some(rate_limit) = self.config.rate_limit
&& let Some(last_time) = self.last_execution
{
let elapsed = last_time.elapsed();
if elapsed < rate_limit {
sleep(rate_limit - elapsed).await;
}
}
let Some(cmd) = self.queue.pop() else {
return;
};
let id = cmd.id.clone();
match self.statuses.lock() {
Ok(mut statuses) => {
statuses.insert(id.clone(), CommandStatus::Running);
}
Err(e) => {
log::error!("Failed to acquire status lock for command {id}: {e:?}");
return;
}
}
let executor = Arc::clone(&self.executor);
let statuses = Arc::clone(&self.statuses);
let results = Arc::clone(&self.results);
let semaphore = Arc::clone(&self.concurrency_semaphore);
tokio::spawn(async move {
let permit = match semaphore.acquire().await {
Ok(permit) => permit,
Err(e) => {
log::error!("Failed to acquire semaphore permit for command {id}: {e:?}");
match statuses.lock() {
Ok(mut statuses) => {
statuses.insert(id.clone(), CommandStatus::Failed);
}
Err(lock_err) => {
log::error!(
"Failed to acquire status lock while handling semaphore error for command {id}: {lock_err:?}"
);
}
}
match results.lock() {
Ok(mut results_map) => {
results_map.insert(
id.clone(),
CommandQueueResult::failure(
id,
format!("Failed to acquire execution permit: {e:?}"),
),
);
}
Err(lock_err) => {
log::error!(
"Failed to acquire results lock while handling semaphore error for command {id}: {lock_err:?}"
);
}
}
return;
}
};
let result = executor.execute(cmd.command).await;
let (status, queue_result) = match result {
Ok(output) => {
let status = CommandStatus::Completed;
let result = CommandQueueResult::success(id.clone(), output);
(status, result)
}
Err(err) => {
let status = CommandStatus::Failed;
let result = CommandQueueResult::failure(id.clone(), err.to_string());
(status, result)
}
};
match statuses.lock() {
Ok(mut statuses) => {
statuses.insert(id.clone(), status);
}
Err(e) => {
log::error!(
"Failed to acquire status lock for final status update of command {id}: {e:?}"
);
}
}
match results.lock() {
Ok(mut results_map) => {
results_map.insert(id.clone(), queue_result);
}
Err(e) => {
log::error!(
"Failed to acquire results lock for final result storage of command {id}: {e:?}"
);
}
}
drop(permit);
});
self.last_execution = Some(Instant::now());
}
}