use anyhow::Result;
use futures_util::StreamExt;
use redis;
use std::collections::HashMap;
use std::time::{Duration, SystemTime};
use super::{DiscoveryConfig, NodeInfo};
#[allow(deprecated)]
pub async fn discover(
config: &DiscoveryConfig,
discovered_nodes: &mut HashMap<String, NodeInfo>,
) -> Result<Vec<NodeInfo>> {
let client = redis::Client::open(config.redis_url.as_str())?;
let conn = client.get_async_connection().await?;
let mut pubsub = conn.into_pubsub();
pubsub.subscribe(&config.discovery_topic).await?;
let client2 = redis::Client::open(config.redis_url.as_str())?;
let mut pub_conn = client2.get_multiplexed_async_connection().await?;
let discovery_request = serde_json::json!({
"type": "discovery_request",
"timestamp": SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs()
});
redis::cmd("PUBLISH")
.arg(&config.discovery_topic)
.arg(discovery_request.to_string())
.query_async::<()>(&mut pub_conn)
.await?;
let timeout = tokio::time::sleep(Duration::from_secs(2));
tokio::pin!(timeout);
let mut stream = pubsub.on_message();
loop {
tokio::select! {
_ = &mut timeout => break,
msg = stream.next() => {
if let Some(msg) = msg {
if let Ok(payload) = msg.get_payload::<String>() {
if let Ok(data) = serde_json::from_str::<serde_json::Value>(&payload) {
if data["type"] == "discovery_response" {
let node = super::parser::parse_node_info(&data);
if let Some(node) = node {
discovered_nodes.insert(node.node_id.clone(), node);
}
}
}
}
}
}
}
}
Ok(discovered_nodes.values().cloned().collect())
}
#[allow(deprecated)]
pub async fn monitor<F>(
config: &DiscoveryConfig,
discovered_nodes: &mut HashMap<String, NodeInfo>,
mut callback: F,
) -> Result<()>
where
F: FnMut(&NodeInfo),
{
let client = redis::Client::open(config.redis_url.as_str())?;
let conn = client.get_async_connection().await?;
let mut pubsub = conn.into_pubsub();
pubsub.subscribe(&config.discovery_topic).await?;
let mut stream = pubsub.on_message();
loop {
if let Some(msg) = stream.next().await {
if let Ok(payload) = msg.get_payload::<String>() {
if let Ok(data) = serde_json::from_str::<serde_json::Value>(&payload) {
if data["type"] == "discovery_response" || data["type"] == "node_heartbeat" {
if let Some(node) = super::parser::parse_node_info(&data) {
let is_new = !discovered_nodes.contains_key(&node.node_id);
discovered_nodes.insert(node.node_id.clone(), node.clone());
if is_new {
callback(&node);
}
}
}
}
}
}
}
}
pub async fn announce(config: &DiscoveryConfig, node_info: &NodeInfo) -> Result<()> {
let client = redis::Client::open(config.redis_url.as_str())?;
let mut conn = client.get_multiplexed_async_connection().await?;
let announcement = serde_json::json!({
"type": "discovery_response",
"node_id": node_info.node_id,
"node_type": node_info.node_type,
"host": node_info.host,
"port": node_info.port,
"timestamp": SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs(),
"metadata": node_info.metadata
});
redis::cmd("PUBLISH")
.arg(&config.discovery_topic)
.arg(announcement.to_string())
.query_async::<()>(&mut conn)
.await?;
Ok(())
}
pub async fn heartbeat(config: &DiscoveryConfig, node_id: &str) -> Result<()> {
let client = redis::Client::open(config.redis_url.as_str())?;
let mut conn = client.get_multiplexed_async_connection().await?;
let heartbeat = serde_json::json!({
"type": "node_heartbeat",
"node_id": node_id,
"timestamp": SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs()
});
redis::cmd("PUBLISH")
.arg(&config.discovery_topic)
.arg(heartbeat.to_string())
.query_async::<()>(&mut conn)
.await?;
Ok(())
}