use std::collections::{HashMap, HashSet, VecDeque};
use std::sync::Arc;
use tokio::sync::{Mutex, Notify};
use tokio_util::sync::CancellationToken;
#[derive(Clone)]
pub struct DynamicQueue {
inner: Arc<Mutex<VecDeque<String>>>,
removed: Arc<Mutex<HashSet<String>>>,
stopped: Arc<Mutex<HashSet<String>>>,
kill_tokens: Arc<Mutex<HashMap<String, CancellationToken>>>,
notify: Arc<Notify>,
}
impl DynamicQueue {
pub fn new() -> Self {
Self {
inner: Arc::new(Mutex::new(VecDeque::new())),
removed: Arc::new(Mutex::new(HashSet::new())),
stopped: Arc::new(Mutex::new(HashSet::new())),
kill_tokens: Arc::new(Mutex::new(HashMap::new())),
notify: Arc::new(Notify::new()),
}
}
pub async fn push(&self, id: String) -> bool {
{
let mut removed = self.removed.lock().await;
removed.remove(&id);
}
let mut queue = self.inner.lock().await;
if queue.contains(&id) {
return false;
}
queue.push_back(id);
drop(queue);
self.notify.notify_one();
true
}
pub async fn pop(&self) -> Option<String> {
let mut queue = self.inner.lock().await;
queue.pop_front()
}
pub async fn remove(&self, id: &str) -> bool {
let mut queue = self.inner.lock().await;
if let Some(pos) = queue.iter().position(|i| i == id) {
queue.remove(pos);
true
} else {
false
}
}
pub async fn mark_removed(&self, id: String) -> bool {
let mut removed = self.removed.lock().await;
removed.insert(id)
}
pub async fn drain_removed(&self) -> Vec<String> {
let mut removed = self.removed.lock().await;
removed.drain().collect()
}
#[cfg(test)]
pub async fn is_empty(&self) -> bool {
let queue = self.inner.lock().await;
queue.is_empty()
}
#[cfg(test)]
pub async fn contains(&self, id: &str) -> bool {
let queue = self.inner.lock().await;
queue.iter().any(|i| i == id)
}
#[cfg(test)]
pub async fn len(&self) -> usize {
let queue = self.inner.lock().await;
queue.len()
}
pub fn notified(&self) -> tokio::sync::futures::Notified<'_> {
self.notify.notified()
}
pub fn notify_scheduler(&self) {
self.notify.notify_one();
}
pub async fn mark_stopped(&self, id: String) -> bool {
let mut stopped = self.stopped.lock().await;
stopped.insert(id)
}
pub async fn is_stopped(&self, id: &str) -> bool {
let stopped = self.stopped.lock().await;
stopped.contains(id)
}
pub async fn clear_stopped(&self, id: &str) -> bool {
let mut stopped = self.stopped.lock().await;
stopped.remove(id)
}
pub fn try_is_stopped(&self, id: &str) -> bool {
if let Ok(stopped) = self.stopped.try_lock() {
stopped.contains(id)
} else {
false
}
}
pub async fn register_kill_token(&self, id: String, token: CancellationToken) {
let mut tokens = self.kill_tokens.lock().await;
tokens.insert(id, token);
}
pub async fn unregister_kill_token(&self, id: &str) {
let mut tokens = self.kill_tokens.lock().await;
tokens.remove(id);
}
pub async fn force_kill(&self, id: &str) -> bool {
self.mark_stopped(id.to_string()).await;
let tokens = self.kill_tokens.lock().await;
if let Some(token) = tokens.get(id) {
token.cancel();
true
} else {
false
}
}
}
impl Default for DynamicQueue {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn test_dynamic_queue_push_pop() {
let queue = DynamicQueue::new();
assert!(queue.is_empty().await);
assert!(queue.push("a".to_string()).await);
assert!(queue.push("b".to_string()).await);
assert_eq!(queue.len().await, 2);
assert_eq!(queue.pop().await, Some("a".to_string()));
assert_eq!(queue.pop().await, Some("b".to_string()));
assert_eq!(queue.pop().await, None);
}
#[tokio::test]
async fn test_dynamic_queue_dedup() {
let queue = DynamicQueue::new();
assert!(queue.push("a".to_string()).await);
assert!(!queue.push("a".to_string()).await);
assert_eq!(queue.len().await, 1);
}
#[tokio::test]
async fn test_dynamic_queue_contains() {
let queue = DynamicQueue::new();
queue.push("a".to_string()).await;
assert!(queue.contains("a").await);
assert!(!queue.contains("b").await);
}
#[tokio::test]
async fn test_dynamic_queue_remove() {
let queue = DynamicQueue::new();
queue.push("a".to_string()).await;
queue.push("b".to_string()).await;
queue.push("c".to_string()).await;
assert_eq!(queue.len().await, 3);
assert!(queue.remove("b").await);
assert_eq!(queue.len().await, 2);
assert!(!queue.contains("b").await);
assert_eq!(queue.pop().await, Some("a".to_string()));
assert_eq!(queue.pop().await, Some("c".to_string()));
}
#[tokio::test]
async fn test_dynamic_queue_remove_nonexistent() {
let queue = DynamicQueue::new();
queue.push("a".to_string()).await;
assert!(!queue.remove("nonexistent").await);
assert_eq!(queue.len().await, 1);
}
#[tokio::test]
async fn test_dynamic_queue_remove_from_empty() {
let queue = DynamicQueue::new();
assert!(!queue.remove("a").await);
}
#[tokio::test]
async fn test_dynamic_queue_remove_multiple() {
let queue = DynamicQueue::new();
queue.push("a".to_string()).await;
queue.push("b".to_string()).await;
queue.push("c".to_string()).await;
assert!(queue.remove("a").await);
assert!(queue.remove("c").await);
assert_eq!(queue.len().await, 1);
assert_eq!(queue.pop().await, Some("b".to_string()));
}
#[tokio::test]
async fn test_dynamic_queue_remove_then_push_same() {
let queue = DynamicQueue::new();
queue.push("a".to_string()).await;
assert!(queue.remove("a").await);
assert!(queue.push("a".to_string()).await);
assert_eq!(queue.len().await, 1);
}
#[tokio::test]
async fn test_mark_removed_and_drain() {
let queue = DynamicQueue::new();
assert!(queue.mark_removed("a".to_string()).await);
assert!(!queue.mark_removed("a".to_string()).await);
assert!(queue.mark_removed("b".to_string()).await);
let mut removed = queue.drain_removed().await;
removed.sort();
assert_eq!(removed, vec!["a".to_string(), "b".to_string()]);
assert!(queue.drain_removed().await.is_empty());
}
#[tokio::test]
async fn test_push_clears_removed_marker() {
let queue = DynamicQueue::new();
assert!(queue.mark_removed("a".to_string()).await);
assert!(queue.push("a".to_string()).await);
let removed = queue.drain_removed().await;
assert!(removed.is_empty());
}
#[tokio::test]
async fn test_force_kill_marks_stopped_and_cancels_token() {
let queue = DynamicQueue::new();
let token = CancellationToken::new();
queue
.register_kill_token("a".to_string(), token.clone())
.await;
assert!(!token.is_cancelled());
let had_token = queue.force_kill("a").await;
assert!(had_token);
assert!(token.is_cancelled());
assert!(queue.is_stopped("a").await);
}
#[tokio::test]
async fn test_force_kill_without_token_still_marks_stopped() {
let queue = DynamicQueue::new();
let had_token = queue.force_kill("b").await;
assert!(!had_token);
assert!(queue.is_stopped("b").await);
}
#[tokio::test]
async fn test_unregister_kill_token() {
let queue = DynamicQueue::new();
let token = CancellationToken::new();
queue
.register_kill_token("a".to_string(), token.clone())
.await;
queue.unregister_kill_token("a").await;
let had_token = queue.force_kill("a").await;
assert!(!had_token);
assert!(!token.is_cancelled());
}
}