righvalor 0.1.0

RighValor: AI Infrastructure and Applications Framework for the Far Edge
use std::{
    collections::{HashMap, HashSet},
    time::Instant,
};

use crate::{
    config::ValorRuntimeTuning, service::ValorServiceId, types::ValorID,
    worker::ValorWorkerCapacity,
};

// Missed-heartbeat threshold before marking Unreachable: the registry cleanup compares each
// worker's `missed_heartbeat_intervals` against this value (`>=`).
// Starts at 2 and is overwritten by `ValorWorkerRegistry::apply_tuning` (intended to run at startup).
// NOTE: this is a process-wide static, so the value is shared by every registry instance.
// Use an AtomicU32 to avoid unsafe statics.
static MISSED_BEFORE_UNREACHABLE: std::sync::atomic::AtomicU32 =
    std::sync::atomic::AtomicU32::new(2);

/// Liveness state the master tracks for each worker.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ValorWorkerState {
    /// Initial state of a new record; also set again whenever the worker (re)joins.
    Joining,
    /// The worker has reported in via a heartbeat, a capacity update or a service update.
    Ready,
    /// Assigned by the registry's stale-worker cleanup when the worker has been silent for
    /// longer than its threshold or has too many missed heartbeats.
    Unreachable,
}

/// Record of a worker maintained by master
#[derive(Debug, Clone)]
pub struct ValorWorkerRecord {
    /// Most recently reported capacity (the default value until the first capacity update).
    pub capacity: ValorWorkerCapacity,
    /// Local `Instant` of the last join, heartbeat, capacity update or service update.
    pub last_seen: Instant,
    /// Services most recently advertised by the worker; cleared when the worker (re)joins.
    pub services: Vec<ValorServiceId>,
    /// Current liveness state.
    pub state: ValorWorkerState,
    /// Timestamp carried by the last heartbeat (same value as `last_ts_mono_ms`);
    /// `None` until a heartbeat has been processed.
    pub last_heartbeat_ms: Option<u64>,
    /// Sequence number of the last heartbeat; used to detect gaps between heartbeats.
    pub last_seq_no: Option<u64>,
    /// `ts_mono_ms` value of the last heartbeat.
    /// NOTE(review): presumably the worker's monotonic clock in milliseconds — confirm with sender.
    pub last_ts_mono_ms: Option<u64>,
    /// Number of heartbeats inferred as missed from gaps in the heartbeat sequence numbers;
    /// compared against the global missed-heartbeat threshold during cleanup.
    pub missed_heartbeat_intervals: u32,
    /// Threshold: mark worker as Unreachable after this many seconds without updates
    pub unreachable_after_secs: u64,
    /// Threshold: evict worker from registry after this many seconds without updates
    pub eviction_after_secs: u64,
}

impl ValorWorkerRecord {
    /// Builds a fresh record in the `Joining` state whose last-seen time is `now`.
    ///
    /// Capacity is the default value, no services are advertised, no heartbeat data is
    /// recorded yet, and both staleness thresholds are zero — the caller is expected to
    /// fill the thresholds in before the record is used for cleanup decisions.
    pub fn new(now: Instant) -> Self {
        // Nothing has been heard from the worker yet, so all heartbeat bookkeeping is blank.
        let no_heartbeat: Option<u64> = None;

        Self {
            // Liveness
            state: ValorWorkerState::Joining,
            last_seen: now,
            // Advertised resources
            capacity: ValorWorkerCapacity::default(),
            services: Vec::new(),
            // Heartbeat tracking
            last_heartbeat_ms: no_heartbeat,
            last_seq_no: no_heartbeat,
            last_ts_mono_ms: no_heartbeat,
            missed_heartbeat_intervals: 0,
            // Thresholds (seconds); zero until assigned by the owner of the record
            unreachable_after_secs: 0,
            eviction_after_secs: 0,
        }
    }
}

/// ValorWorkerRegistry maintains worker liveness, capacity and services
///
/// Note: the derived `Default` leaves both thresholds at zero; use `new` or `apply_tuning`
/// to set meaningful values before records are created.
#[derive(Debug, Default)]
pub struct ValorWorkerRegistry {
    /// Worker records keyed by worker id.
    workers: HashMap<ValorID, ValorWorkerRecord>,
    /// Default threshold for marking Unreachable (per worker overrideable);
    /// copied into each record when it is created and on every join.
    default_unreachable_after_secs: u64,
    /// Default threshold for eviction/removal (per worker overrideable);
    /// copied into each record when it is created and on every join.
    default_eviction_after_secs: u64,
}

impl ValorWorkerRegistry {
    /// Creates an empty registry with the given default thresholds (in seconds).
    ///
    /// The defaults are copied into every worker record when it is created and again
    /// whenever the worker joins.
    pub fn new(unreachable_after_secs: u64, eviction_after_secs: u64) -> Self {
        Self {
            workers: HashMap::new(),
            default_unreachable_after_secs: unreachable_after_secs,
            default_eviction_after_secs: eviction_after_secs,
        }
    }

    /// Configure runtime tuning values (e.g., missed_before_unreachable)
    ///
    /// `missed_before_unreachable` is stored in a process-wide static and therefore affects
    /// every registry. The two second-based thresholds only replace this registry's defaults:
    /// records that already exist keep their thresholds until the worker joins again.
    pub fn apply_tuning(&mut self, tuning: &ValorRuntimeTuning) {
        // Note: missed_before_unreachable is global; others are per-worker defaults set at on_join
        MISSED_BEFORE_UNREACHABLE.store(
            tuning.missed_before_unreachable,
            std::sync::atomic::Ordering::Relaxed,
        );
        self.default_unreachable_after_secs = tuning.unreachable_after_secs;
        self.default_eviction_after_secs = tuning.eviction_after_secs;
    }

    /// Registers a worker, or restarts the session of an already known worker.
    ///
    /// The record is put (back) into `Joining`, its last-seen time is refreshed, the
    /// thresholds are re-read from the registry defaults and, for an existing record, the
    /// advertised services and the missed-heartbeat counter are cleared.
    pub fn on_join(&mut self, id: ValorID) {
        let now = Instant::now();
        // Copy the defaults first so the closures do not need to borrow `self`.
        let unreachable_after_secs = self.default_unreachable_after_secs;
        let eviction_after_secs = self.default_eviction_after_secs;
        self.workers
            .entry(id)
            .and_modify(|rec| {
                rec.last_seen = now;
                rec.state = ValorWorkerState::Joining;
                rec.unreachable_after_secs = unreachable_after_secs;
                rec.eviction_after_secs = eviction_after_secs;
                rec.services.clear();
                // A stale count from the previous session would otherwise let the next
                // cleanup pass mark the freshly joined worker Unreachable straight away.
                rec.missed_heartbeat_intervals = 0;
            })
            .or_insert_with(|| ValorWorkerRecord {
                unreachable_after_secs,
                eviction_after_secs,
                ..ValorWorkerRecord::new(now)
            });
    }

    /// Removes the worker's record, if any.
    pub fn on_leave(&mut self, id: &ValorID) {
        self.workers.remove(id);
    }

    /// Process heartbeat with timestamp and sequence number.
    /// Returns (became_ready, missing_gap)
    ///
    /// * `became_ready` is true when the worker was not `Ready` before this heartbeat
    ///   (this includes a worker that had no record yet).
    /// * `missing_gap` is the number of sequence numbers skipped since the previous
    ///   heartbeat; it is 0 for the first heartbeat and for a non-increasing sequence number.
    ///
    /// The record's missed-heartbeat counter accumulates the gaps of consecutive lossy
    /// heartbeats and is cleared as soon as a heartbeat arrives without a gap, so a worker
    /// that has recovered is not re-marked Unreachable on every cleanup pass.
    pub fn on_heartbeat(&mut self, id: &ValorID, ts_mono_ms: u64, seq_no: u64) -> (bool, u64) {
        let now = Instant::now();
        let unreachable_after_secs = self.default_unreachable_after_secs;
        let eviction_after_secs = self.default_eviction_after_secs;
        let rec = self
            .workers
            .entry(id.clone())
            .or_insert_with(|| ValorWorkerRecord {
                unreachable_after_secs,
                eviction_after_secs,
                ..ValorWorkerRecord::new(now)
            });

        let gap = match rec.last_seq_no {
            // `seq_no > prev_seq` guarantees the subtraction cannot underflow.
            Some(prev_seq) if seq_no > prev_seq => seq_no - prev_seq - 1,
            // First heartbeat, or non-monotonic (restart or wrap): no gap to report.
            _ => 0,
        };

        rec.missed_heartbeat_intervals = if gap == 0 {
            // In-order (or restarted) heartbeat: the loss streak is over.
            0
        } else {
            rec.missed_heartbeat_intervals
                .saturating_add(u32::try_from(gap).unwrap_or(u32::MAX))
        };

        let became_ready_now = rec.state != ValorWorkerState::Ready;
        rec.state = ValorWorkerState::Ready;
        rec.last_seen = now;
        rec.last_seq_no = Some(seq_no);
        rec.last_ts_mono_ms = Some(ts_mono_ms);
        rec.last_heartbeat_ms = Some(ts_mono_ms);

        // No extra debug logs to reduce noise
        (became_ready_now, gap)
    }

    /// Update capacity and return the capacity that was stored before.
    ///
    /// Always returns `Some`: for a worker without a record a new one is created first, and
    /// the returned value is that new record's default capacity. An existing record is also
    /// marked `Ready` and has its last-seen time refreshed; a newly created record keeps the
    /// state assigned by `ValorWorkerRecord::new`.
    pub fn update_capacity(
        &mut self,
        id: &ValorID,
        capacity: ValorWorkerCapacity,
    ) -> Option<ValorWorkerCapacity> {
        let now = Instant::now();
        let unreachable_after_secs = self.default_unreachable_after_secs;
        let eviction_after_secs = self.default_eviction_after_secs;
        let rec = self
            .workers
            .entry(id.clone())
            .and_modify(|rec| {
                rec.last_seen = now;
                rec.state = ValorWorkerState::Ready;
            })
            .or_insert_with(|| ValorWorkerRecord {
                unreachable_after_secs,
                eviction_after_secs,
                ..ValorWorkerRecord::new(now)
            });
        Some(std::mem::replace(&mut rec.capacity, capacity))
    }

    /// Replaces the list of services advertised by a worker.
    ///
    /// An existing record gets its last-seen time refreshed and moves from `Joining` to
    /// `Ready` (other states are left unchanged). A worker without a record is inserted
    /// directly as `Ready` with the registry's default thresholds.
    pub fn update_services(&mut self, id: &ValorID, services: Vec<ValorServiceId>) {
        let now = Instant::now();
        let unreachable_after_secs = self.default_unreachable_after_secs;
        let eviction_after_secs = self.default_eviction_after_secs;
        // Matching on the entry lets `services` be moved into whichever branch runs,
        // instead of cloning the vector for the "already known" case.
        match self.workers.entry(id.clone()) {
            std::collections::hash_map::Entry::Occupied(mut slot) => {
                let rec = slot.get_mut();
                rec.last_seen = now;
                rec.services = services;
                if rec.state == ValorWorkerState::Joining {
                    rec.state = ValorWorkerState::Ready;
                }
            }
            std::collections::hash_map::Entry::Vacant(slot) => {
                slot.insert(ValorWorkerRecord {
                    services,
                    state: ValorWorkerState::Ready,
                    unreachable_after_secs,
                    eviction_after_secs,
                    ..ValorWorkerRecord::new(now)
                });
            }
        }
    }

    /// Cleanup with details.
    ///
    /// Evicts every worker whose last update is more than `eviction_after_secs` whole
    /// seconds old, then marks each remaining worker `Unreachable` if it has been silent
    /// for more than `unreachable_after_secs` whole seconds or its missed-heartbeat counter
    /// has reached the global threshold.
    ///
    /// Returns `(workers_before, workers_after, removed_ids)`; the order of `removed_ids`
    /// is unspecified.
    pub fn cleanup_stale_detailed(&mut self) -> (usize, usize, Vec<ValorID>) {
        let now = Instant::now();
        let before = self.workers.len();
        let threshold = MISSED_BEFORE_UNREACHABLE.load(std::sync::atomic::Ordering::Relaxed);

        let mut removed: Vec<ValorID> = Vec::new();
        self.workers.retain(|id, rec| {
            let idle_secs = now.saturating_duration_since(rec.last_seen).as_secs();
            if idle_secs > rec.eviction_after_secs {
                removed.push(id.clone());
                return false;
            }
            if idle_secs > rec.unreachable_after_secs
                || rec.missed_heartbeat_intervals >= threshold
            {
                rec.state = ValorWorkerState::Unreachable;
            }
            true
        });

        (before, self.workers.len(), removed)
    }

    /// Number of workers currently tracked (in any state).
    pub fn len(&self) -> usize {
        self.workers.len()
    }

    /// Capacity currently stored for the worker, or `None` if the worker is unknown.
    pub fn get_capacity(&self, id: &ValorID) -> Option<ValorWorkerCapacity> {
        self.workers.get(id).map(|r| r.capacity)
    }

    /// Current state of the worker, or `None` if the worker is unknown.
    pub fn get_state(&self, id: &ValorID) -> Option<ValorWorkerState> {
        self.workers.get(id).map(|r| r.state)
    }

    /// Current missed-heartbeat counter of the worker, or `None` if the worker is unknown.
    pub fn get_missed_heartbeat_intervals(&self, id: &ValorID) -> Option<u32> {
        self.workers.get(id).map(|r| r.missed_heartbeat_intervals)
    }

    /// Counts workers per state; returns `(joining, ready, unreachable, total)`.
    pub fn state_counts(&self) -> (usize, usize, usize, usize) {
        let mut joining = 0usize;
        let mut ready = 0usize;
        let mut unreachable = 0usize;
        for rec in self.workers.values() {
            match rec.state {
                ValorWorkerState::Joining => joining += 1,
                ValorWorkerState::Ready => ready += 1,
                ValorWorkerState::Unreachable => unreachable += 1,
            }
        }
        (joining, ready, unreachable, self.workers.len())
    }

    /// Iterates over all `(id, record)` pairs in unspecified order.
    pub fn iter(&self) -> impl Iterator<Item = (&ValorID, &ValorWorkerRecord)> {
        self.workers.iter()
    }

    /// Union of services advertised by READY workers
    pub fn union_services(&self) -> HashSet<ValorServiceId> {
        self.workers
            .values()
            .filter(|rec| rec.state == ValorWorkerState::Ready)
            .flat_map(|rec| rec.services.iter().cloned())
            .collect()
    }
}