righvalor 0.1.0

RighValor: AI Infrastructure and Applications Framework for the Far Edge
use std::collections::HashMap;

use tokio::sync::{oneshot, RwLock};

use crate::common::task::{ValorMasterTask, ValorTaskId, ValorTaskStatus};

/// Minimal result aggregator for task terminal notifications.
///
/// Subscribers register a oneshot channel for a given task id; when that task
/// is reported in a terminal state, every registered waiter is sent a clone of
/// the final task snapshot and the task's entry is removed from the map.
#[derive(Default)]
pub struct ValorResultAggregator {
    // Pending waiters keyed by task id. Each id maps to the send halves of the
    // oneshot channels handed out for it; guarded by an async (tokio) RwLock.
    waiters: RwLock<HashMap<ValorTaskId, Vec<oneshot::Sender<ValorMasterTask>>>>,
}

impl ValorResultAggregator {
    pub fn new() -> Self {
        Self {
            waiters: RwLock::new(HashMap::new()),
        }
    }

    /// Register a waiter for a task's terminal result
    pub async fn register_waiter(
        &self,
        task_id: &ValorTaskId,
    ) -> oneshot::Receiver<ValorMasterTask> {
        let (tx, rx) = oneshot::channel();
        let mut map = self.waiters.write().await;
        map.entry(task_id.clone()).or_insert_with(Vec::new).push(tx);
        rx
    }

    /// Notify subscribers if task is terminal; otherwise no-op
    pub async fn notify_if_terminal(&self, task: &ValorMasterTask) {
        if !is_terminal(task.status) {
            return;
        }
        let mut map = self.waiters.write().await;
        if let Some(waiters) = map.remove(&task.task_id) {
            for tx in waiters {
                let _ = tx.send(task.clone());
            }
        }
    }
}

fn is_terminal(status: ValorTaskStatus) -> bool {
    matches!(
        status,
        ValorTaskStatus::Completed | ValorTaskStatus::Failed | ValorTaskStatus::Cancelled
    )
}