use crate::util::now_secs;
use std::{
collections::HashMap,
sync::{
Arc,
atomic::{AtomicU64, Ordering},
},
};
use tokio::sync::{Mutex, broadcast};
use volli_core::WorkerEntry;
/// Shared, cloneable handle to the worker registry.
///
/// The map is guarded by an async (`tokio`) mutex; the functions in this file
/// insert entries under their `worker_id`.
pub type WorkerTable = Arc<Mutex<HashMap<String, WorkerEntry>>>;
/// How long an entry with `disconnected_at` set stays in the table.
///
/// `list_workers` drops entries whose `disconnected_at` lies more than this
/// far behind `now_secs()` (seconds, going by the names involved).
const DEFAULT_DISCONNECTED_TTL_SECS: u64 = 300;
/// Inserts or refreshes a single worker.
///
/// Convenience wrapper that hands a one-element batch to [`merge_workers`];
/// all merging and notification rules are the ones implemented there.
pub async fn update_worker(
    table: &WorkerTable,
    tx: &broadcast::Sender<u64>,
    worker: WorkerEntry,
    version: &Arc<AtomicU64>,
) {
    let batch = vec![worker];
    merge_workers(table, tx, batch, version).await
}
/// Merges a batch of worker entries into the table.
///
/// For every incoming entry:
/// * a missing `connected_since` is inherited from the entry already stored
///   under the same `worker_id`, if there is one;
/// * a missing `last_seen` is filled in with `now_secs()`;
/// * the entry is then stored, replacing any previous one.
///
/// The version counter is incremented and the new value sent on `tx` only when
/// at least one entry was previously unknown or arrived with a different
/// `manager_id` than the stored one. The result of the send is discarded.
pub async fn merge_workers(
    table: &WorkerTable,
    tx: &broadcast::Sender<u64>,
    workers: Vec<WorkerEntry>,
    version: &Arc<AtomicU64>,
) {
    // Scope the guard so the table is unlocked before subscribers are notified.
    let dirty = {
        let mut guard = table.lock().await;
        let mut dirty = false;
        for mut incoming in workers {
            let previous = guard.get(&incoming.worker_id);

            incoming.connected_since = incoming
                .connected_since
                .or_else(|| previous.and_then(|prev| prev.connected_since));

            // Refreshes from the same manager replace the entry silently.
            let same_manager =
                previous.is_some_and(|prev| prev.manager_id == incoming.manager_id);

            incoming.last_seen = incoming.last_seen.or_else(|| Some(now_secs()));

            guard.insert(incoming.worker_id.clone(), incoming);
            dirty |= !same_manager;
        }
        dirty
    };

    if !dirty {
        return;
    }
    let bumped = version.fetch_add(1, Ordering::SeqCst) + 1;
    let _ = tx.send(bumped);
}
/// Returns a snapshot (clones) of every worker currently in the table.
///
/// As a side effect, entries whose `disconnected_at` is more than
/// `DEFAULT_DISCONNECTED_TTL_SECS` behind `now_secs()` are removed from the
/// table before the snapshot is taken. Entries without `disconnected_at` are
/// always kept.
pub async fn list_workers(table: &WorkerTable) -> Vec<WorkerEntry> {
    let mut guard = table.lock().await;
    let now = now_secs();
    guard.retain(|_, entry| match entry.disconnected_at {
        None => true,
        // saturating_sub: a timestamp ahead of `now` counts as age zero.
        Some(since) => now.saturating_sub(since) <= DEFAULT_DISCONNECTED_TTL_SECS,
    });
    guard.values().cloned().collect()
}
/// Deletes the entry stored under `worker_id`.
///
/// Returns `true` if an entry was present and removed, `false` otherwise.
pub async fn remove_worker(table: &WorkerTable, worker_id: &str) -> bool {
    table.lock().await.remove(worker_id).is_some()
}
/// Stamps the worker's `disconnected_at` with `now_secs()`.
///
/// Returns `true` if `worker_id` was found in the table, `false` if there is
/// no such entry (in which case nothing is modified).
pub async fn mark_worker_disconnected(table: &WorkerTable, worker_id: &str) -> bool {
    let mut guard = table.lock().await;
    let Some(entry) = guard.get_mut(worker_id) else {
        return false;
    };
    entry.disconnected_at = Some(now_secs());
    true
}