// volli-manager 0.1.12
//
// Manager for volli
// Documentation
#![allow(unused_crate_dependencies)]
use std::collections::HashMap;
use std::sync::{
    Arc,
    atomic::{AtomicU64, Ordering},
};

use tokio::sync::{Mutex, broadcast};
use volli_core::WorkerEntry;
use volli_manager::workers::{
    WorkerTable, list_workers, merge_workers, remove_worker, update_worker,
};

/// Adds one worker with `update_worker` and a second with `merge_workers`,
/// expecting both to show up in `list_workers`.
///
/// Also checks that the shared version counter, seeded at 1, reads 2 after
/// the single `update_worker` call.
#[tokio::test]
async fn worker_updates_merge() {
    let registry: WorkerTable = Arc::new(Mutex::new(HashMap::new()));
    // The receiver half is discarded right away; only the sender is passed on.
    let (events, _) = broadcast::channel(1);
    let counter = Arc::new(AtomicU64::new(1));

    // First worker goes in through the single-entry update path.
    let first = WorkerEntry {
        worker_id: "w1".into(),
        manager_id: "m1".into(),
        worker_name: None,
        last_seen: Some(1),
        connected_since: Some(1),
        disconnected_at: None,
    };
    update_worker(&registry, &events, first, &counter).await;

    let observed_version = counter.load(Ordering::SeqCst);
    assert_eq!(observed_version, 2);
    assert_eq!(list_workers(&registry).await.len(), 1);

    // Second worker (different id and manager) arrives through the batch merge path.
    let second = WorkerEntry {
        worker_id: "w2".into(),
        manager_id: "m2".into(),
        worker_name: None,
        last_seen: Some(2),
        connected_since: Some(2),
        disconnected_at: None,
    };
    merge_workers(&registry, &events, vec![second], &counter).await;

    assert_eq!(list_workers(&registry).await.len(), 2);
}

/// Registers a single worker, removes it by id with `remove_worker`, and
/// expects the call to return `true` and the listing to be empty afterwards.
#[tokio::test]
async fn worker_remove_clears_table_entry() {
    let registry: WorkerTable = Arc::new(Mutex::new(HashMap::new()));
    // The receiver half is discarded right away; only the sender is passed on.
    let (events, _) = broadcast::channel(1);
    let counter = Arc::new(AtomicU64::new(1));

    let only_worker = WorkerEntry {
        worker_id: "w1".into(),
        manager_id: "m1".into(),
        worker_name: None,
        last_seen: Some(1),
        connected_since: Some(1),
        disconnected_at: None,
    };
    update_worker(&registry, &events, only_worker, &counter).await;

    // Precondition: the worker is visible before the removal.
    assert_eq!(list_workers(&registry).await.len(), 1);

    // Removal must report success and leave nothing behind.
    assert!(remove_worker(&registry, "w1").await);
    assert!(list_workers(&registry).await.is_empty());
}

/// Merging an entry for an already-known worker whose `connected_since` is
/// `None` must not wipe the previously stored `connected_since` value.
#[tokio::test]
async fn worker_merge_preserves_connected_since() {
    let registry: WorkerTable = Arc::new(Mutex::new(HashMap::new()));
    // The receiver half is discarded right away; only the sender is passed on.
    let (events, _) = broadcast::channel(1);
    let counter = Arc::new(AtomicU64::new(1));

    // Initial record carries a concrete connection timestamp (10).
    let initial = WorkerEntry {
        worker_id: "w1".into(),
        manager_id: "m1".into(),
        worker_name: None,
        last_seen: Some(1),
        connected_since: Some(10),
        disconnected_at: None,
    };
    update_worker(&registry, &events, initial, &counter).await;

    // Incoming record for the same worker: newer `last_seen`, no `connected_since`.
    let incoming = WorkerEntry {
        worker_id: "w1".into(),
        manager_id: "m1".into(),
        worker_name: None,
        last_seen: Some(2),
        connected_since: None,
        disconnected_at: None,
    };
    merge_workers(&registry, &events, vec![incoming], &counter).await;

    let listed = list_workers(&registry).await;
    assert_eq!(listed.len(), 1);

    // The timestamp from the initial record must still be there.
    let merged = &listed[0];
    assert_eq!(merged.connected_since, Some(10));
}

/// A worker whose `disconnected_at` lies 600 seconds in the past is expected
/// to be absent from the `list_workers` result.
///
/// NOTE(review): the prune TTL itself is defined outside this file; the test
/// presumes 600 seconds exceeds it — confirm against `volli_manager::workers`.
#[tokio::test]
async fn worker_list_prunes_disconnected_after_ttl() {
    let registry: WorkerTable = Arc::new(Mutex::new(HashMap::new()));
    // The receiver half is discarded right away; only the sender is passed on.
    let (events, _) = broadcast::channel(1);
    let counter = Arc::new(AtomicU64::new(1));

    // Current UNIX time in seconds; falls back to 0 if the clock reads before the epoch.
    let since_epoch = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .unwrap_or_default();
    let now_secs = since_epoch.as_secs();
    // Ten minutes ago, clamped at zero rather than underflowing.
    let stale_at = now_secs.saturating_sub(600);

    let long_gone = WorkerEntry {
        worker_id: "w1".into(),
        manager_id: "m1".into(),
        worker_name: None,
        last_seen: Some(1),
        connected_since: Some(1),
        disconnected_at: Some(stale_at),
    };
    update_worker(&registry, &events, long_gone, &counter).await;

    assert_eq!(list_workers(&registry).await.len(), 0);
}