// amagi 0.1.6
//
// Rust SDK, CLI, and Web API service skeleton for multi-platform social web adapters.
// Documentation
use std::collections::HashMap;
use std::fmt;
use std::sync::{Arc, RwLock};

use serde::Serialize;
use tokio::sync::{mpsc, oneshot};

use crate::node::NodeRole;
use crate::node::protocol::NodeEnvelope;
use crate::node::session::NodeSessionState;

/// Availability flag stored on every [`NodeRecord`].
///
/// Serialized by serde with lowercase variant names (`ready`, `draining`, `isolated`).
/// In this file only `Ready` nodes are returned by the platform routing lookups
/// (`sender_for_platform` and `sender_for_node_platform`); the other two variants are
/// treated identically there, i.e. the node is skipped.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
#[serde(rename_all = "lowercase")]
pub(crate) enum NodeAvailability {
    /// The node may be selected by the platform routing lookups.
    Ready,
    /// The node is skipped by the platform routing lookups.
    /// NOTE(review): presumably it is finishing in-flight work before leaving — confirm.
    Draining,
    /// The node is skipped by the platform routing lookups.
    /// NOTE(review): presumably it was taken out of rotation by an operator — confirm.
    Isolated,
}

/// Snapshot of one online downstream node.
///
/// Records are stored in the `NodeRegistry` keyed by `node_id` and handed out as clones,
/// so a returned record does not track later registry updates.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct NodeRecord {
    /// Session the record was registered under; `remove_if_session` only removes the
    /// node when this matches the session id it is given.
    pub session_id: String,
    /// Node identifier; used as the key of the registry map.
    pub node_id: String,
    /// Role of the node (see `NodeRole`); not interpreted in this file.
    pub role: NodeRole,
    /// Optional version string; not interpreted in this file.
    pub version: Option<String>,
    /// Stored session state. Lookups that take a heartbeat interval may report
    /// `Degraded` instead of `Ready` when the node has not been seen recently.
    pub session_state: NodeSessionState,
    /// Capability names; replaced wholesale by `update_advertisement`.
    pub capabilities: Vec<String>,
    /// Platform names; a routing lookup matches the requested platform against these
    /// by exact string equality. Replaced wholesale by `update_advertisement`.
    pub platforms: Vec<String>,
    /// Availability flag; only `Ready` nodes are returned by routing lookups.
    pub availability: NodeAvailability,
    /// Upper bound on concurrent tasks; `None` means the routing lookups apply no limit.
    pub max_concurrent_tasks: Option<u32>,
    /// Current task count; compared against `max_concurrent_tasks` and used to prefer
    /// the least-loaded node.
    pub active_tasks: u32,
    /// NOTE(review): presumably the connection time in milliseconds on the same clock
    /// as `last_seen_ms` — it is never read or written in this file, confirm at the caller.
    pub connected_at_ms: u64,
    /// Millisecond timestamp of the last recorded activity; set either from a
    /// caller-supplied value (`update_last_seen`) or from `crate::node::protocol::now_ms()`.
    pub last_seen_ms: u64,
}

/// Registry of online nodes and of requests awaiting a node response.
///
/// Both maps live behind `Arc<RwLock<..>>`, so cloning the registry yields another
/// handle onto the same underlying state rather than a copy of it.
#[derive(Clone)]
pub(crate) struct NodeRegistry {
    /// Online nodes keyed by `NodeRecord::node_id`.
    inner: Arc<RwLock<HashMap<String, RegisteredNode>>>,
    /// Outstanding requests keyed by the request id passed to `register_pending`.
    pending: Arc<RwLock<HashMap<String, PendingNodeRequest>>>,
}

/// Registry entry for one node: its record plus the channel used to send it envelopes.
#[derive(Clone)]
struct RegisteredNode {
    /// Latest stored snapshot of the node.
    record: NodeRecord,
    /// Unbounded sender of `NodeEnvelope`s associated with this node; handed out
    /// (cloned) by the registry's `sender_for*` lookups.
    #[allow(dead_code)]
    sender: mpsc::UnboundedSender<NodeEnvelope>,
}

/// Bookkeeping for one request that is waiting for a response envelope.
struct PendingNodeRequest {
    /// One-shot channel on which `fulfill_pending` delivers the response envelope.
    tx: oneshot::Sender<NodeEnvelope>,
    /// Record of the node the request was registered against (as passed to
    /// `register_pending`).
    target: NodeRecord,
    /// Envelope sender passed to `register_pending` alongside `target`; returned
    /// together with it by `pending_target_for`.
    sender: mpsc::UnboundedSender<NodeEnvelope>,
}

impl Default for NodeRegistry {
    /// Builds a registry with no online nodes and no pending requests.
    fn default() -> Self {
        let inner = Arc::new(RwLock::new(HashMap::new()));
        let pending = Arc::new(RwLock::new(HashMap::new()));
        Self { inner, pending }
    }
}

impl fmt::Debug for NodeRegistry {
    /// Renders the registry as `NodeRegistry { online_nodes: <count> }`.
    ///
    /// Only the number of online nodes is shown; the count is obtained through
    /// `count()`, which takes the registry's read lock.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let online_nodes = self.count();
        let mut builder = f.debug_struct("NodeRegistry");
        builder.field("online_nodes", &online_nodes);
        builder.finish()
    }
}

impl NodeRegistry {
    pub(crate) fn upsert(&self, record: NodeRecord, sender: mpsc::UnboundedSender<NodeEnvelope>) {
        let mut guard = self
            .inner
            .write()
            .expect("node registry should be writable");
        guard.insert(record.node_id.clone(), RegisteredNode { record, sender });
    }

    pub(crate) fn remove_if_session(&self, node_id: &str, session_id: &str) -> bool {
        let mut guard = self
            .inner
            .write()
            .expect("node registry should be writable");
        if guard
            .get(node_id)
            .is_some_and(|node| node.record.session_id == session_id)
        {
            guard.remove(node_id);
            return true;
        }
        false
    }

    pub(crate) fn update_last_seen(&self, node_id: &str, timestamp_ms: u64) {
        let mut guard = self
            .inner
            .write()
            .expect("node registry should be writable");
        if let Some(node) = guard.get_mut(node_id) {
            node.record.last_seen_ms = timestamp_ms;
            node.record.session_state = NodeSessionState::Ready;
        }
    }

    pub(crate) fn update_advertisement(
        &self,
        node_id: &str,
        capabilities: Vec<String>,
        platforms: Vec<String>,
    ) -> bool {
        let mut guard = self
            .inner
            .write()
            .expect("node registry should be writable");
        let Some(node) = guard.get_mut(node_id) else {
            return false;
        };
        node.record.capabilities = capabilities;
        node.record.platforms = platforms;
        node.record.last_seen_ms = crate::node::protocol::now_ms();
        node.record.session_state = NodeSessionState::Ready;
        true
    }

    pub(crate) fn set_availability(&self, node_id: &str, availability: NodeAvailability) -> bool {
        let mut guard = self
            .inner
            .write()
            .expect("node registry should be writable");
        let Some(node) = guard.get_mut(node_id) else {
            return false;
        };
        node.record.availability = availability;
        node.record.last_seen_ms = crate::node::protocol::now_ms();
        node.record.session_state = NodeSessionState::Ready;
        true
    }

    pub(crate) fn set_active_tasks(&self, node_id: &str, active_tasks: u32) -> bool {
        let mut guard = self
            .inner
            .write()
            .expect("node registry should be writable");
        let Some(node) = guard.get_mut(node_id) else {
            return false;
        };
        node.record.active_tasks = active_tasks;
        node.record.last_seen_ms = crate::node::protocol::now_ms();
        node.record.session_state = NodeSessionState::Ready;
        true
    }

    pub(crate) fn set_capacity(&self, node_id: &str, max_concurrent_tasks: Option<u32>) -> bool {
        let mut guard = self
            .inner
            .write()
            .expect("node registry should be writable");
        let Some(node) = guard.get_mut(node_id) else {
            return false;
        };
        node.record.max_concurrent_tasks = max_concurrent_tasks;
        node.record.last_seen_ms = crate::node::protocol::now_ms();
        node.record.session_state = NodeSessionState::Ready;
        true
    }

    pub(crate) fn set_session_state(&self, node_id: &str, session_state: NodeSessionState) -> bool {
        let mut guard = self
            .inner
            .write()
            .expect("node registry should be writable");
        let Some(node) = guard.get_mut(node_id) else {
            return false;
        };
        node.record.session_state = session_state;
        node.record.last_seen_ms = crate::node::protocol::now_ms();
        true
    }

    #[allow(dead_code)]
    pub(crate) fn sender_for(&self, node_id: &str) -> Option<mpsc::UnboundedSender<NodeEnvelope>> {
        let guard = self.inner.read().expect("node registry should be readable");
        guard.get(node_id).map(|node| node.sender.clone())
    }

    #[allow(dead_code)]
    pub(crate) fn records(&self, heartbeat_ms: Option<u64>) -> Vec<NodeRecord> {
        let guard = self.inner.read().expect("node registry should be readable");
        let mut records = guard
            .values()
            .map(|node| effective_record(&node.record, heartbeat_ms))
            .collect::<Vec<_>>();
        records.sort_by(|left, right| left.node_id.cmp(&right.node_id));
        records
    }

    pub(crate) fn downstream_senders(
        &self,
    ) -> Vec<(NodeRecord, mpsc::UnboundedSender<NodeEnvelope>)> {
        let guard = self.inner.read().expect("node registry should be readable");
        guard
            .values()
            .map(|node| (node.record.clone(), node.sender.clone()))
            .collect()
    }

    pub(crate) fn count(&self) -> usize {
        let guard = self.inner.read().expect("node registry should be readable");
        guard.len()
    }

    pub(crate) fn sender_for_platform(
        &self,
        platform: &str,
        heartbeat_ms: Option<u64>,
    ) -> Option<(NodeRecord, mpsc::UnboundedSender<NodeEnvelope>)> {
        let guard = self.inner.read().expect("node registry should be readable");
        guard
            .values()
            .filter_map(|node| {
                let record = effective_record(&node.record, heartbeat_ms);
                (matches!(record.availability, NodeAvailability::Ready)
                    && matches!(record.session_state, NodeSessionState::Ready)
                    && record.platforms.iter().any(|item| item == platform))
                .then(|| (record, node.sender.clone()))
            })
            .filter(|node| {
                node.0
                    .max_concurrent_tasks
                    .is_none_or(|limit| node.0.active_tasks < limit)
            })
            .min_by(|left, right| {
                left.0
                    .active_tasks
                    .cmp(&right.0.active_tasks)
                    .then_with(|| left.0.node_id.cmp(&right.0.node_id))
            })
    }

    pub(crate) fn sender_for_node_platform(
        &self,
        node_id: &str,
        platform: &str,
        heartbeat_ms: Option<u64>,
    ) -> Option<(NodeRecord, mpsc::UnboundedSender<NodeEnvelope>)> {
        let guard = self.inner.read().expect("node registry should be readable");
        let node = guard.get(node_id)?;
        let record = effective_record(&node.record, heartbeat_ms);
        if !matches!(record.availability, NodeAvailability::Ready)
            || !matches!(record.session_state, NodeSessionState::Ready)
        {
            return None;
        }
        if record
            .max_concurrent_tasks
            .is_some_and(|limit| record.active_tasks >= limit)
        {
            return None;
        }
        record
            .platforms
            .iter()
            .any(|item| item == platform)
            .then(|| (record, node.sender.clone()))
    }

    pub(crate) fn sender_for_node(
        &self,
        node_id: &str,
    ) -> Option<(NodeRecord, mpsc::UnboundedSender<NodeEnvelope>)> {
        let guard = self.inner.read().expect("node registry should be readable");
        guard
            .get(node_id)
            .map(|node| (node.record.clone(), node.sender.clone()))
    }

    pub(crate) fn record_for_node(
        &self,
        node_id: &str,
        heartbeat_ms: Option<u64>,
    ) -> Option<NodeRecord> {
        let guard = self.inner.read().expect("node registry should be readable");
        guard
            .get(node_id)
            .map(|node| effective_record(&node.record, heartbeat_ms))
    }

    pub(crate) fn register_pending(
        &self,
        request_id: impl Into<String>,
        target: NodeRecord,
        sender: mpsc::UnboundedSender<NodeEnvelope>,
    ) -> oneshot::Receiver<NodeEnvelope> {
        let (tx, rx) = oneshot::channel();
        let mut guard = self
            .pending
            .write()
            .expect("pending node requests should be writable");
        guard.insert(request_id.into(), PendingNodeRequest { tx, target, sender });
        rx
    }

    pub(crate) fn cancel_pending(&self, request_id: &str) {
        let mut guard = self
            .pending
            .write()
            .expect("pending node requests should be writable");
        guard.remove(request_id);
    }

    pub(crate) fn fulfill_pending(&self, envelope: NodeEnvelope) -> bool {
        let pending = {
            let mut guard = self
                .pending
                .write()
                .expect("pending node requests should be writable");
            guard.remove(&envelope.id)
        };

        pending.is_some_and(|pending| pending.tx.send(envelope).is_ok())
    }

    pub(crate) fn pending_target_for(
        &self,
        request_id: &str,
    ) -> Option<(NodeRecord, mpsc::UnboundedSender<NodeEnvelope>)> {
        let guard = self
            .pending
            .read()
            .expect("pending node requests should be readable");
        guard
            .get(request_id)
            .map(|pending| (pending.target.clone(), pending.sender.clone()))
    }
}

/// Returns a copy of `record` whose session state reflects heartbeat staleness.
///
/// The copy is reported as `Degraded` when the stored state is `Ready`, a heartbeat
/// interval is supplied, and more than twice that interval (saturating) has elapsed
/// since `last_seen_ms`. In every other case the copy equals the stored record.
fn effective_record(record: &NodeRecord, heartbeat_ms: Option<u64>) -> NodeRecord {
    let mut snapshot = record.clone();

    // Only a Ready node can be downgraded; other states pass through untouched.
    if snapshot.session_state != NodeSessionState::Ready {
        return snapshot;
    }
    // Without a heartbeat interval there is no staleness threshold to apply.
    let Some(interval_ms) = heartbeat_ms else {
        return snapshot;
    };

    let silent_for_ms = crate::node::protocol::now_ms().saturating_sub(snapshot.last_seen_ms);
    if silent_for_ms > interval_ms.saturating_mul(2) {
        snapshot.session_state = NodeSessionState::Degraded;
    }
    snapshot
}