mecha10-cli 0.1.47

Mecha10 CLI tool
Documentation
//! Redis pub/sub operations for node discovery

use anyhow::Result;
use futures_util::StreamExt;
use redis;
use std::collections::HashMap;
use std::time::{Duration, SystemTime};

use super::{DiscoveryConfig, NodeInfo};

/// Discover active nodes on the network via Redis pub/sub
///
/// # Arguments
///
/// * `config` - Discovery configuration
/// * `discovered_nodes` - Mutable reference to discovered nodes map
// Note: get_async_connection() is deprecated but required for pub/sub
// The replacement get_multiplexed_async_connection() doesn't support into_pubsub()
#[allow(deprecated)]
pub async fn discover(
    config: &DiscoveryConfig,
    discovered_nodes: &mut HashMap<String, NodeInfo>,
) -> Result<Vec<NodeInfo>> {
    // Connect to Redis for discovery
    let client = redis::Client::open(config.redis_url.as_str())?;
    let conn = client.get_async_connection().await?;

    // Subscribe to discovery topic
    let mut pubsub = conn.into_pubsub();
    pubsub.subscribe(&config.discovery_topic).await?;

    // Publish discovery request
    let client2 = redis::Client::open(config.redis_url.as_str())?;
    let mut pub_conn = client2.get_multiplexed_async_connection().await?;

    let discovery_request = serde_json::json!({
        "type": "discovery_request",
        "timestamp": SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs()
    });

    redis::cmd("PUBLISH")
        .arg(&config.discovery_topic)
        .arg(discovery_request.to_string())
        .query_async::<()>(&mut pub_conn)
        .await?;

    // Wait for responses (with timeout)
    let timeout = tokio::time::sleep(Duration::from_secs(2));
    tokio::pin!(timeout);

    let mut stream = pubsub.on_message();

    loop {
        tokio::select! {
            _ = &mut timeout => break,
            msg = stream.next() => {
                if let Some(msg) = msg {
                    if let Ok(payload) = msg.get_payload::<String>() {
                        if let Ok(data) = serde_json::from_str::<serde_json::Value>(&payload) {
                            if data["type"] == "discovery_response" {
                                let node = super::parser::parse_node_info(&data);
                                if let Some(node) = node {
                                    discovered_nodes.insert(node.node_id.clone(), node);
                                }
                            }
                        }
                    }
                }
            }
        }
    }

    Ok(discovered_nodes.values().cloned().collect())
}

/// Monitor for new nodes via Redis pub/sub
///
/// # Arguments
///
/// * `config` - Discovery configuration
/// * `discovered_nodes` - Mutable reference to discovered nodes map
/// * `callback` - Function to call when a new node is discovered
// Note: get_async_connection() is deprecated but required for pub/sub
#[allow(deprecated)]
pub async fn monitor<F>(
    config: &DiscoveryConfig,
    discovered_nodes: &mut HashMap<String, NodeInfo>,
    mut callback: F,
) -> Result<()>
where
    F: FnMut(&NodeInfo),
{
    let client = redis::Client::open(config.redis_url.as_str())?;
    let conn = client.get_async_connection().await?;
    let mut pubsub = conn.into_pubsub();

    pubsub.subscribe(&config.discovery_topic).await?;

    let mut stream = pubsub.on_message();
    loop {
        if let Some(msg) = stream.next().await {
            if let Ok(payload) = msg.get_payload::<String>() {
                if let Ok(data) = serde_json::from_str::<serde_json::Value>(&payload) {
                    if data["type"] == "discovery_response" || data["type"] == "node_heartbeat" {
                        if let Some(node) = super::parser::parse_node_info(&data) {
                            let is_new = !discovered_nodes.contains_key(&node.node_id);

                            discovered_nodes.insert(node.node_id.clone(), node.clone());

                            if is_new {
                                callback(&node);
                            }
                        }
                    }
                }
            }
        }
    }
}

/// Broadcast presence announcement via Redis
///
/// # Arguments
///
/// * `config` - Discovery configuration
/// * `node_info` - Information about this node
pub async fn announce(config: &DiscoveryConfig, node_info: &NodeInfo) -> Result<()> {
    let client = redis::Client::open(config.redis_url.as_str())?;
    let mut conn = client.get_multiplexed_async_connection().await?;

    let announcement = serde_json::json!({
        "type": "discovery_response",
        "node_id": node_info.node_id,
        "node_type": node_info.node_type,
        "host": node_info.host,
        "port": node_info.port,
        "timestamp": SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs(),
        "metadata": node_info.metadata
    });

    redis::cmd("PUBLISH")
        .arg(&config.discovery_topic)
        .arg(announcement.to_string())
        .query_async::<()>(&mut conn)
        .await?;

    Ok(())
}

/// Send heartbeat for a node via Redis
///
/// # Arguments
///
/// * `config` - Discovery configuration
/// * `node_id` - Node identifier
pub async fn heartbeat(config: &DiscoveryConfig, node_id: &str) -> Result<()> {
    let client = redis::Client::open(config.redis_url.as_str())?;
    let mut conn = client.get_multiplexed_async_connection().await?;

    let heartbeat = serde_json::json!({
        "type": "node_heartbeat",
        "node_id": node_id,
        "timestamp": SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs()
    });

    redis::cmd("PUBLISH")
        .arg(&config.discovery_topic)
        .arg(heartbeat.to_string())
        .query_async::<()>(&mut conn)
        .await?;

    Ok(())
}