volli-manager 0.1.12

Manager for volli
Documentation
use crate::util::now_secs;
use std::{
    collections::HashMap,
    sync::{
        Arc,
        atomic::{AtomicU64, Ordering},
    },
};
use tokio::sync::{Mutex, broadcast};
use volli_core::WorkerEntry;

/// Shared worker registry: a `HashMap` from a `String` key to [`WorkerEntry`], guarded by an
/// async `Mutex` and wrapped in an `Arc`. The functions in this file key it by worker id.
pub type WorkerTable = Arc<Mutex<HashMap<String, WorkerEntry>>>;
/// Number of seconds an entry with `disconnected_at` set is retained before `list_workers`
/// prunes it from the table.
const DEFAULT_DISCONNECTED_TTL_SECS: u64 = 300;

/// Inserts or refreshes a single worker entry in `table`.
///
/// This is a convenience wrapper that hands a one-element batch to [`merge_workers`]; the
/// version-bump and notification rules are exactly those of that function.
pub async fn update_worker(
    table: &WorkerTable,
    tx: &broadcast::Sender<u64>,
    worker: WorkerEntry,
    version: &Arc<AtomicU64>,
) {
    let single_batch = Vec::from([worker]);
    merge_workers(table, tx, single_batch, version).await;
}

/// Merges a batch of worker entries into `table`.
///
/// For every incoming entry:
/// * a missing `connected_since` is inherited from the entry already stored under the same
///   `worker_id`, if there is one;
/// * a missing `last_seen` is filled in with the current time from `now_secs()`;
/// * the entry then replaces whatever was stored under its `worker_id`.
///
/// The merge counts as a material change when at least one incoming worker was previously
/// unknown or was stored with a different `manager_id`. Only then is `version` incremented and
/// the new value sent on `tx`; a send error is ignored. Refreshing an already-known worker under
/// the same manager updates the table silently, which avoids version churn from liveness updates.
pub async fn merge_workers(
    table: &WorkerTable,
    tx: &broadcast::Sender<u64>,
    workers: Vec<WorkerEntry>,
    version: &Arc<AtomicU64>,
) {
    // Scope the guard so the table is unlocked before the version is bumped and announced.
    let material_change = {
        let mut entries = table.lock().await;
        let mut material_change = false;
        for mut incoming in workers {
            // A single lookup serves both the `connected_since` carry-over and the
            // same-manager check.
            let same_manager = match entries.get(&incoming.worker_id) {
                Some(known) => {
                    if incoming.connected_since.is_none() {
                        incoming.connected_since = known.connected_since;
                    }
                    known.manager_id == incoming.manager_id
                }
                None => false,
            };
            if incoming.last_seen.is_none() {
                incoming.last_seen = Some(now_secs());
            }
            // New worker, or worker moved between managers: flag the batch as changed.
            material_change |= !same_manager;
            entries.insert(incoming.worker_id.clone(), incoming);
        }
        material_change
    };
    if material_change {
        let bumped = version.fetch_add(1, Ordering::SeqCst) + 1;
        let _ = tx.send(bumped);
    }
}

/// Returns a cloned snapshot of every worker currently held in `table`.
///
/// Note that this call also prunes: before the snapshot is taken, any entry whose
/// `disconnected_at` lies more than `DEFAULT_DISCONNECTED_TTL_SECS` seconds before the current
/// `now_secs()` value is removed from the table. Entries without a `disconnected_at` are always
/// kept.
pub async fn list_workers(table: &WorkerTable) -> Vec<WorkerEntry> {
    let mut entries = table.lock().await;
    let current = now_secs();
    entries.retain(|_, entry| match entry.disconnected_at {
        // `saturating_sub` treats a timestamp ahead of `current` as zero elapsed time.
        Some(since) => current.saturating_sub(since) <= DEFAULT_DISCONNECTED_TTL_SECS,
        None => true,
    });
    let snapshot = entries.values().cloned().collect();
    snapshot
}

/// Removes the entry stored under `worker_id` from `table`.
///
/// Returns `true` if an entry was present and has been removed, `false` otherwise.
pub async fn remove_worker(table: &WorkerTable, worker_id: &str) -> bool {
    table.lock().await.remove(worker_id).is_some()
}

/// Stamps the entry stored under `worker_id` with the current time as its `disconnected_at`.
///
/// Returns `true` if the worker was found and stamped, `false` if `table` holds no entry for
/// `worker_id` (in which case nothing is modified).
pub async fn mark_worker_disconnected(table: &WorkerTable, worker_id: &str) -> bool {
    let mut entries = table.lock().await;
    match entries.get_mut(worker_id) {
        Some(entry) => {
            entry.disconnected_at = Some(now_secs());
            true
        }
        None => false,
    }
}