#![allow(unused_crate_dependencies)]
use std::collections::HashMap;
use std::sync::{
Arc,
atomic::{AtomicU64, Ordering},
};
use tokio::sync::{Mutex, broadcast};
use volli_core::WorkerEntry;
use volli_manager::workers::{
WorkerTable, list_workers, merge_workers, remove_worker, update_worker,
};
#[tokio::test]
async fn worker_updates_merge() {
let table: WorkerTable = Arc::new(Mutex::new(HashMap::new()));
let (tx, _) = broadcast::channel(1);
let version = Arc::new(AtomicU64::new(1));
update_worker(
&table,
&tx,
WorkerEntry {
worker_id: "w1".into(),
manager_id: "m1".into(),
worker_name: None,
last_seen: Some(1),
connected_since: Some(1),
disconnected_at: None,
},
&version,
)
.await;
assert_eq!(version.load(Ordering::SeqCst), 2);
assert_eq!(list_workers(&table).await.len(), 1);
merge_workers(
&table,
&tx,
vec![WorkerEntry {
worker_id: "w2".into(),
manager_id: "m2".into(),
worker_name: None,
last_seen: Some(2),
connected_since: Some(2),
disconnected_at: None,
}],
&version,
)
.await;
let workers = list_workers(&table).await;
assert_eq!(workers.len(), 2);
}
#[tokio::test]
async fn worker_remove_clears_table_entry() {
let table: WorkerTable = Arc::new(Mutex::new(HashMap::new()));
let (tx, _) = broadcast::channel(1);
let version = Arc::new(AtomicU64::new(1));
update_worker(
&table,
&tx,
WorkerEntry {
worker_id: "w1".into(),
manager_id: "m1".into(),
worker_name: None,
last_seen: Some(1),
connected_since: Some(1),
disconnected_at: None,
},
&version,
)
.await;
assert_eq!(list_workers(&table).await.len(), 1);
let removed = remove_worker(&table, "w1").await;
assert!(removed);
assert_eq!(list_workers(&table).await.len(), 0);
}
#[tokio::test]
async fn worker_merge_preserves_connected_since() {
let table: WorkerTable = Arc::new(Mutex::new(HashMap::new()));
let (tx, _) = broadcast::channel(1);
let version = Arc::new(AtomicU64::new(1));
update_worker(
&table,
&tx,
WorkerEntry {
worker_id: "w1".into(),
manager_id: "m1".into(),
worker_name: None,
last_seen: Some(1),
connected_since: Some(10),
disconnected_at: None,
},
&version,
)
.await;
merge_workers(
&table,
&tx,
vec![WorkerEntry {
worker_id: "w1".into(),
manager_id: "m1".into(),
worker_name: None,
last_seen: Some(2),
connected_since: None,
disconnected_at: None,
}],
&version,
)
.await;
let workers = list_workers(&table).await;
assert_eq!(workers.len(), 1);
assert_eq!(workers[0].connected_since, Some(10));
}
#[tokio::test]
async fn worker_list_prunes_disconnected_after_ttl() {
let table: WorkerTable = Arc::new(Mutex::new(HashMap::new()));
let (tx, _) = broadcast::channel(1);
let version = Arc::new(AtomicU64::new(1));
let stale_at = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs()
.saturating_sub(600);
update_worker(
&table,
&tx,
WorkerEntry {
worker_id: "w1".into(),
manager_id: "m1".into(),
worker_name: None,
last_seen: Some(1),
connected_since: Some(1),
disconnected_at: Some(stale_at),
},
&version,
)
.await;
let workers = list_workers(&table).await;
assert!(workers.is_empty());
}