#![allow(clippy::disallowed_types)]
mod static_discovery;
pub use static_discovery::{StaticDiscovery, StaticDiscoveryConfig};
mod gossip_discovery;
pub use gossip_discovery::{keys, GossipDiscovery, GossipDiscoveryConfig};
use std::collections::HashMap;
use std::fmt;
use serde::{Deserialize, Serialize};
use tokio::sync::watch;
pub use crate::state::NodeId;
/// State of a cluster node as recorded in [`NodeInfo::state`].
///
/// The `Display` impl renders every variant as its lowercase name, and
/// [`assignable_node_ids`] accepts only nodes in [`NodeState::Active`].
// NOTE(review): the precise lifecycle meaning of each variant (who sets it and
// when) is not visible in this file — confirm against the discovery backends.
#[derive(
Debug,
Clone,
Copy,
PartialEq,
Eq,
Serialize,
Deserialize,
rkyv::Archive,
rkyv::Serialize,
rkyv::Deserialize,
)]
pub enum NodeState {
/// Displayed as `joining`; excluded by [`assignable_node_ids`].
Joining,
/// Displayed as `active`; the only state [`assignable_node_ids`] accepts.
Active,
/// Displayed as `suspected`; excluded by [`assignable_node_ids`].
Suspected,
/// Displayed as `draining`; excluded by [`assignable_node_ids`].
Draining,
/// Displayed as `left`; excluded by [`assignable_node_ids`].
Left,
}
impl fmt::Display for NodeState {
    /// Writes the lowercase name of the state (for example `active`).
    ///
    /// The text is emitted verbatim; width, fill and alignment flags on the
    /// formatter are not applied.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Resolve the label first so there is a single write site.
        let label = match *self {
            Self::Joining => "joining",
            Self::Active => "active",
            Self::Suspected => "suspected",
            Self::Draining => "draining",
            Self::Left => "left",
        };
        f.write_str(label)
    }
}
/// Descriptive attributes of a node, carried in [`NodeInfo::metadata`].
///
/// The `Default` impl yields one core, zero memory bytes, no failure domain,
/// no tags, no owned partitions and an empty version string.
// NOTE(review): field meanings below are inferred from names only; nothing in
// this file reads them — confirm against the code that populates metadata.
#[derive(
Debug,
Clone,
PartialEq,
Serialize,
Deserialize,
rkyv::Archive,
rkyv::Serialize,
rkyv::Deserialize,
)]
pub struct NodeMetadata {
/// Core count; defaults to `1`. Presumably the CPU cores available — confirm.
pub cores: u32,
/// Memory size in bytes; defaults to `0`.
pub memory_bytes: u64,
/// Optional failure-domain label; defaults to `None`.
pub failure_domain: Option<String>,
/// Free-form string key/value pairs; empty by default.
pub tags: HashMap<String, String>,
/// Partition numbers associated with the node; empty by default.
// NOTE(review): presumably the partitions this node currently owns — confirm.
pub owned_partitions: Vec<u32>,
/// Version string; empty by default. Format is not established here.
pub version: String,
}
impl Default for NodeMetadata {
    /// Returns metadata with a single core and every other field empty or
    /// zero: no memory, no failure domain, no tags, no partitions, no version.
    fn default() -> Self {
        Self {
            // The only field that does not use its type's own default value.
            cores: 1,
            memory_bytes: u64::default(),
            failure_domain: Option::default(),
            tags: HashMap::default(),
            owned_partitions: Vec::default(),
            version: String::default(),
        }
    }
}
/// Record describing one cluster member.
///
/// This is the element type exchanged through the [`Discovery`] trait
/// (`peers`, `announce`, `membership_watch`).
#[derive(
Debug,
Clone,
PartialEq,
Serialize,
Deserialize,
rkyv::Archive,
rkyv::Serialize,
rkyv::Deserialize,
)]
pub struct NodeInfo {
/// Identifier of the node; [`assignable_node_ids`] skips unassigned ids.
pub id: NodeId,
/// Node name as a free-form string.
pub name: String,
/// Address string for RPC. NOTE(review): presumably `host:port` — confirm.
pub rpc_address: String,
/// Address string for Raft. NOTE(review): presumably `host:port` — confirm.
pub raft_address: String,
/// Current [`NodeState`] of the node.
pub state: NodeState,
/// Additional attributes of the node; see [`NodeMetadata`].
pub metadata: NodeMetadata,
/// Last heartbeat timestamp.
// NOTE(review): the name suggests milliseconds; the epoch/clock source is not
// visible in this file — confirm.
pub last_heartbeat_ms: i64,
}
/// Collects the ids of members that may receive assignments.
///
/// A member qualifies when its state is [`NodeState::Active`] and its id is
/// not the unassigned placeholder. The returned ids are sorted ascending and
/// contain no duplicates.
#[must_use]
pub fn assignable_node_ids(members: &[NodeInfo]) -> Vec<NodeId> {
    let mut assignable: Vec<NodeId> = Vec::new();
    for member in members {
        // Only fully active members are eligible.
        if member.state != NodeState::Active {
            continue;
        }
        // Ignore members that have not been given a real id.
        if member.id.is_unassigned() {
            continue;
        }
        assignable.push(member.id);
    }
    // Sorting first makes equal ids adjacent so `dedup` removes all repeats.
    assignable.sort_unstable();
    assignable.dedup();
    assignable
}
/// A change in cluster membership.
// NOTE(review): no producer or consumer of these events is visible in this
// file — confirm where they are emitted and which channel carries them.
#[derive(Debug, Clone)]
pub enum MembershipEvent {
/// A node joined; carries the node's full [`NodeInfo`] behind a `Box`.
// NOTE(review): boxing presumably keeps this variant's size close to the
// others — confirm.
NodeJoined(Box<NodeInfo>),
/// The node `node_id` moved from `old_state` to `new_state`.
NodeStateChanged {
node_id: NodeId,
old_state: NodeState,
new_state: NodeState,
},
/// The node with the given id left.
NodeLeft(NodeId),
}
/// Error type returned by the fallible methods of the [`Discovery`] trait.
///
/// `Display` messages come from the `#[error(...)]` attributes below.
// NOTE(review): which backend operations raise which variant is not visible
// in this file — confirm against the implementations.
#[derive(Debug, thiserror::Error)]
pub enum DiscoveryError {
/// Renders as `bind error: <message>`.
#[error("bind error: {0}")]
Bind(String),
/// Renders as `connection error to <address>: <reason>`.
#[error("connection error to {address}: {reason}")]
Connection {
address: String,
reason: String,
},
/// Renders as `serialization error: <message>`.
#[error("serialization error: {0}")]
Serialization(String),
/// Renders as `discovery not started`.
#[error("discovery not started")]
NotStarted,
/// Renders as `discovery shut down`.
#[error("discovery shut down")]
ShutDown,
/// Wraps a [`std::io::Error`]; `#[from]` lets `?` convert I/O errors into
/// this variant automatically.
#[error("I/O error: {0}")]
Io(#[from] std::io::Error),
}
/// Interface of a discovery backend that exchanges [`NodeInfo`] records.
///
/// Implementors must be `Send + Sync + 'static`.
// NOTE(review): this module re-exports `StaticDiscovery` and `GossipDiscovery`;
// presumably both implement this trait — confirm.
// The `async_fn_in_trait` lint is silenced: with plain `async fn` in a trait,
// callers cannot name or require auto-trait bounds (such as `Send`) on the
// returned futures through the trait alone.
#[allow(async_fn_in_trait)]
pub trait Discovery: Send + Sync + 'static {
/// Starts the backend; takes `&mut self`.
// NOTE(review): presumably must be called before the other methods (see
// `DiscoveryError::NotStarted`) — confirm.
async fn start(&mut self) -> Result<(), DiscoveryError>;
/// Returns a list of [`NodeInfo`] records.
// NOTE(review): whether the local node is included is not visible here — confirm.
async fn peers(&self) -> Result<Vec<NodeInfo>, DiscoveryError>;
/// Submits the given [`NodeInfo`] to the backend.
// NOTE(review): presumably publishes the local node's info to peers — confirm.
async fn announce(&self, info: NodeInfo) -> Result<(), DiscoveryError>;
/// Returns a `tokio::sync::watch` receiver whose value is a `Vec<NodeInfo>`.
// NOTE(review): presumably the current membership snapshot — confirm.
fn membership_watch(&self) -> watch::Receiver<Vec<NodeInfo>>;
/// Stops the backend; takes `&mut self`.
async fn stop(&mut self) -> Result<(), DiscoveryError>;
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a `NodeInfo` with the given id and state; every other field is
    /// a placeholder that the tests do not depend on.
    fn info_with(id: u64, state: NodeState) -> NodeInfo {
        NodeInfo {
            id: NodeId(id),
            name: format!("n{id}"),
            rpc_address: String::new(),
            raft_address: String::new(),
            state,
            metadata: NodeMetadata::default(),
            last_heartbeat_ms: 0,
        }
    }

    #[test]
    fn test_node_id_display() {
        assert_eq!(NodeId(42).to_string(), "node-42");
    }

    #[test]
    fn test_node_id_unassigned() {
        assert!(NodeId::UNASSIGNED.is_unassigned());
        assert!(!NodeId(1).is_unassigned());
    }

    /// Every variant is covered so a renamed label cannot slip through.
    #[test]
    fn test_node_state_display() {
        assert_eq!(NodeState::Joining.to_string(), "joining");
        assert_eq!(NodeState::Active.to_string(), "active");
        assert_eq!(NodeState::Suspected.to_string(), "suspected");
        assert_eq!(NodeState::Draining.to_string(), "draining");
        assert_eq!(NodeState::Left.to_string(), "left");
    }

    #[test]
    fn assignable_includes_only_active_sorted_deduped() {
        let members = vec![
            info_with(5, NodeState::Active),
            info_with(2, NodeState::Joining),
            info_with(3, NodeState::Suspected),
            info_with(4, NodeState::Draining),
            info_with(6, NodeState::Left),
            info_with(1, NodeState::Active),
            // Duplicate id: must appear only once in the result.
            info_with(1, NodeState::Active),
        ];
        assert_eq!(assignable_node_ids(&members), vec![NodeId(1), NodeId(5)]);
    }

    #[test]
    fn assignable_drops_unassigned() {
        let mut unassigned = info_with(7, NodeState::Active);
        unassigned.id = NodeId::UNASSIGNED;
        let members = vec![unassigned, info_with(7, NodeState::Active)];
        assert_eq!(assignable_node_ids(&members), vec![NodeId(7)]);
    }

    #[test]
    fn test_node_metadata_default() {
        let meta = NodeMetadata::default();
        assert_eq!(meta.cores, 1);
        assert_eq!(meta.memory_bytes, 0);
        assert!(meta.failure_domain.is_none());
        assert!(meta.tags.is_empty());
        assert!(meta.owned_partitions.is_empty());
        assert!(meta.version.is_empty());
    }

    #[test]
    fn test_node_id_serialization() {
        let id = NodeId(123);
        let json = serde_json::to_string(&id).unwrap();
        let back: NodeId = serde_json::from_str(&json).unwrap();
        assert_eq!(id, back);
    }

    /// The JSON round trip must preserve every field, not just id and name.
    #[test]
    fn test_node_info_serialization() {
        let info = NodeInfo {
            id: NodeId(1),
            name: "test-node".into(),
            rpc_address: "127.0.0.1:9000".into(),
            raft_address: "127.0.0.1:9001".into(),
            state: NodeState::Active,
            metadata: NodeMetadata::default(),
            last_heartbeat_ms: 1000,
        };
        let json = serde_json::to_string(&info).unwrap();
        let back: NodeInfo = serde_json::from_str(&json).unwrap();
        assert_eq!(back, info);
    }
}