use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::{Arc, RwLock};
use std::time::Duration;
use nodedb_types::config::tuning::ClusterTransportTuning;
use tracing::info;
use crate::circuit_breaker::{CircuitBreaker, CircuitBreakerConfig, RetryPolicy};
use crate::error::{ClusterError, Result};
use crate::transport::auth_context::AuthContext;
use crate::transport::config;
use crate::transport::credentials::{self, TransportCredentials};
pub struct NexarTransport {
pub(super) node_id: u64,
pub(super) listener: nexar::TransportListener,
pub(super) client_config: quinn::ClientConfig,
pub(super) peers: RwLock<HashMap<u64, quinn::Connection>>,
pub(super) peer_addrs: RwLock<HashMap<u64, SocketAddr>>,
pub(super) rpc_timeout: Duration,
pub(super) circuit_breaker: CircuitBreaker,
pub(super) retry_policy: RetryPolicy,
pub(super) auth: Arc<AuthContext>,
}
impl NexarTransport {
pub fn new(node_id: u64, listen_addr: SocketAddr, creds: TransportCredentials) -> Result<Self> {
Self::with_timeout(node_id, listen_addr, config::DEFAULT_RPC_TIMEOUT, creds)
}
pub fn with_timeout(
node_id: u64,
listen_addr: SocketAddr,
rpc_timeout: Duration,
creds: TransportCredentials,
) -> Result<Self> {
let defaults = ClusterTransportTuning::default();
Self::build(node_id, listen_addr, rpc_timeout, &defaults, creds)
}
pub fn with_tuning(
node_id: u64,
listen_addr: SocketAddr,
tuning: &ClusterTransportTuning,
creds: TransportCredentials,
) -> Result<Self> {
let rpc_timeout = Duration::from_secs(tuning.rpc_timeout_secs);
Self::build(node_id, listen_addr, rpc_timeout, tuning, creds)
}
fn build(
node_id: u64,
listen_addr: SocketAddr,
rpc_timeout: Duration,
tuning: &ClusterTransportTuning,
creds: TransportCredentials,
) -> Result<Self> {
let (server_config, client_config) = match &creds {
TransportCredentials::Mtls(tls) => (
config::make_raft_server_config_mtls(tls, tuning)?,
config::make_raft_client_config_mtls(tls, tuning)?,
),
TransportCredentials::Insecure => {
credentials::announce_insecure_transport(node_id);
(
config::make_raft_server_config(tuning)?,
config::make_raft_client_config(tuning)?,
)
}
};
let auth = Arc::new(AuthContext::from_credentials(node_id, &creds));
let listener = nexar::TransportListener::bind_with_config(listen_addr, server_config)
.map_err(|e| ClusterError::Transport {
detail: format!("bind {listen_addr}: {e}"),
})?;
info!(
node_id,
addr = %listener.local_addr(),
rpc_timeout_ms = rpc_timeout.as_millis() as u64,
mtls = !creds.is_insecure(),
"raft transport bound"
);
Ok(Self {
node_id,
listener,
client_config,
peers: RwLock::new(HashMap::new()),
peer_addrs: RwLock::new(HashMap::new()),
rpc_timeout,
circuit_breaker: CircuitBreaker::new(CircuitBreakerConfig::default()),
retry_policy: RetryPolicy::default(),
auth,
})
}
pub(super) fn auth(&self) -> &Arc<AuthContext> {
&self.auth
}
pub fn circuit_breaker(&self) -> &CircuitBreaker {
&self.circuit_breaker
}
pub fn local_addr(&self) -> SocketAddr {
self.listener.local_addr()
}
pub fn node_id(&self) -> u64 {
self.node_id
}
pub fn peer_snapshot(&self) -> Vec<TransportPeerSnapshot> {
let addrs = self.peer_addrs.read().unwrap_or_else(|p| p.into_inner());
let peers = self.peers.read().unwrap_or_else(|p| p.into_inner());
let mut out: Vec<TransportPeerSnapshot> = addrs
.iter()
.map(|(id, addr)| TransportPeerSnapshot {
peer_id: *id,
addr: addr.to_string(),
connected: peers.contains_key(id),
})
.collect();
out.sort_by_key(|p| p.peer_id);
out
}
}
#[derive(Debug, Clone, serde::Serialize)]
pub struct TransportPeerSnapshot {
pub peer_id: u64,
pub addr: String,
pub connected: bool,
}