//! In-process test harness for nodedb cluster integration tests: spawns
//! nodes, drives their Raft loops, and provides polling helpers.
#![allow(dead_code)]

use std::net::SocketAddr;
use std::path::{Path, PathBuf};
use std::sync::{Arc, RwLock};
use std::time::Duration;

use nodedb_cluster::{
    CacheApplier, ClusterCatalog, ClusterConfig, ClusterLifecycleState, ClusterLifecycleTracker,
    ClusterTopology, MetadataCache, NexarTransport, RaftLoop, start_cluster,
};
use nodedb_raft::message::LogEntry;
use tempfile::TempDir;
use tokio::sync::watch;

/// Builds a transport bound to an ephemeral loopback port with a short
/// request timeout, suitable for single-process tests.
pub fn test_transport(node_id: u64) -> Result<NexarTransport, nodedb_cluster::ClusterError> {
    NexarTransport::with_timeout(
        node_id,
        "127.0.0.1:0".parse().unwrap(),
        Duration::from_secs(4),
    )
}
/// Commit applier that discards entry payloads and simply acknowledges the
/// highest committed index, letting tests drive Raft without a state machine.
pub struct NoopApplier;

impl nodedb_cluster::CommitApplier for NoopApplier {
    fn apply_committed(&self, _group_id: u64, entries: &[LogEntry]) -> u64 {
        entries.last().map(|e| e.index).unwrap_or(0)
    }
}
/// A single in-process cluster node plus the background tasks that serve its
/// transport and drive its Raft loop. When the node owns its `TempDir`, the
/// on-disk state is removed once the node is dropped.
pub struct TestNode {
    _data_dir: Option<TempDir>,
    pub data_dir_path: PathBuf,
    pub transport: Arc<NexarTransport>,
    pub topology: Arc<RwLock<ClusterTopology>>,
    pub lifecycle: ClusterLifecycleTracker,
    pub node_id: u64,
    pub metadata_cache: Arc<RwLock<MetadataCache>>,
    pub multi_raft: Arc<std::sync::Mutex<nodedb_cluster::MultiRaft>>,
    raft_loop: Arc<RaftLoop<NoopApplier>>,
    shutdown_tx: watch::Sender<bool>,
    serve_handle: tokio::task::JoinHandle<()>,
    run_handle: tokio::task::JoinHandle<()>,
}
impl TestNode {
    /// Spawns a node on a fresh ephemeral port with a temporary data
    /// directory that is cleaned up when the node is dropped.
    pub async fn spawn(
        node_id: u64,
        seed_nodes: Vec<SocketAddr>,
    ) -> Result<Self, Box<dyn std::error::Error + Send + Sync>> {
        let transport = Arc::new(test_transport(node_id)?);
        Self::spawn_with_transport(node_id, transport, seed_nodes).await
    }

    /// Spawns a node on a caller-supplied transport, with a fresh temporary
    /// data directory owned by the node.
    pub async fn spawn_with_transport(
        node_id: u64,
        transport: Arc<NexarTransport>,
        seed_nodes: Vec<SocketAddr>,
    ) -> Result<Self, Box<dyn std::error::Error + Send + Sync>> {
        let data_dir = tempfile::tempdir()?;
        let data_dir_path = data_dir.path().to_path_buf();
        Self::spawn_inner(
            node_id,
            transport,
            seed_nodes,
            data_dir_path,
            Some(data_dir),
        )
        .await
    }

    /// Spawns a node against an existing data directory owned by the caller,
    /// which allows restarting a node over the same persistent state.
    pub async fn spawn_with_data_dir(
        node_id: u64,
        data_dir: &Path,
        seed_nodes: Vec<SocketAddr>,
    ) -> Result<Self, Box<dyn std::error::Error + Send + Sync>> {
        let transport = Arc::new(test_transport(node_id)?);
        Self::spawn_inner(node_id, transport, seed_nodes, data_dir.to_path_buf(), None).await
    }
    async fn spawn_inner(
        node_id: u64,
        transport: Arc<NexarTransport>,
        seed_nodes: Vec<SocketAddr>,
        data_dir_path: PathBuf,
        owned_data_dir: Option<TempDir>,
    ) -> Result<Self, Box<dyn std::error::Error + Send + Sync>> {
        let catalog = Arc::new(ClusterCatalog::open(&data_dir_path.join("cluster.redb"))?);
        let listen_addr = transport.local_addr();
        // With no seeds given, the node seeds itself and bootstraps a
        // single-node cluster.
        let seeds = if seed_nodes.is_empty() {
            vec![listen_addr]
        } else {
            seed_nodes
        };
        let config = ClusterConfig {
            node_id,
            listen_addr,
            seed_nodes: seeds,
            num_groups: 2,
            replication_factor: 3,
            data_dir: data_dir_path.clone(),
            force_bootstrap: false,
            join_retry: nodedb_cluster::JoinRetryPolicy {
                max_attempts: 8,
                max_backoff_secs: 2,
            },
            swim_udp_addr: None,
        };
        let lifecycle = ClusterLifecycleTracker::new();
        let state = start_cluster(&config, &catalog, &transport, &lifecycle).await?;
        lifecycle.to_ready(state.topology.node_count());
        let topology = Arc::new(RwLock::new(state.topology));
        // Metadata commits are mirrored into an in-memory cache that tests
        // can inspect without touching the on-disk catalog.
        let metadata_cache = Arc::new(RwLock::new(MetadataCache::new()));
        let metadata_applier: Arc<dyn nodedb_cluster::MetadataApplier> =
            Arc::new(CacheApplier::new(metadata_cache.clone()));
        let raft_loop = Arc::new(
            RaftLoop::new(
                state.multi_raft,
                transport.clone(),
                topology.clone(),
                NoopApplier,
            )
            .with_metadata_applier(metadata_applier)
            .with_catalog(catalog.clone()),
        );
        let multi_raft = raft_loop.multi_raft_handle();
        // A single watch channel fans out the shutdown signal to both
        // background tasks.
        let (shutdown_tx, shutdown_rx_serve) = watch::channel(false);
        let shutdown_rx_run = shutdown_tx.subscribe();
        let transport_for_serve = transport.clone();
        let handler_for_serve = raft_loop.clone();
        let serve_handle = tokio::spawn(async move {
            let _ = transport_for_serve
                .serve(handler_for_serve, shutdown_rx_serve)
                .await;
        });
        let raft_loop_for_run = raft_loop.clone();
        let run_handle = tokio::spawn(async move {
            raft_loop_for_run.run(shutdown_rx_run).await;
        });
        // The RaftLoop keeps its own Arc clone of the catalog.
        drop(catalog);
        Ok(Self {
            _data_dir: owned_data_dir,
            data_dir_path,
            transport,
            topology,
            lifecycle,
            node_id,
            metadata_cache,
            multi_raft,
            raft_loop,
            shutdown_tx,
            serve_handle,
            run_handle,
        })
    }
    /// Encodes a metadata entry and proposes it to the metadata Raft group.
    pub fn propose_metadata(
        &self,
        entry: &nodedb_cluster::MetadataEntry,
    ) -> Result<u64, Box<dyn std::error::Error + Send + Sync>> {
        let bytes = nodedb_cluster::encode_entry(entry)?;
        Ok(self.raft_loop.propose_to_metadata_group(bytes)?)
    }

    /// Returns true if this node currently leads the metadata Raft group.
    pub fn is_metadata_leader(&self) -> bool {
        for status in self.raft_loop.group_statuses() {
            if status.group_id == nodedb_cluster::METADATA_GROUP_ID {
                return status.leader_id == self.node_id;
            }
        }
        false
    }
    /// Number of catalog entries the metadata cache has applied. Like the
    /// other accessors below, it recovers from a poisoned lock instead of
    /// panicking mid-assertion.
    pub fn catalog_entries_applied(&self) -> u64 {
        self.metadata_cache
            .read()
            .unwrap_or_else(|p| p.into_inner())
            .catalog_entries_applied
    }

    /// The transport's actual bound address, usable as a seed for peers.
    pub fn listen_addr(&self) -> SocketAddr {
        self.transport.local_addr()
    }

    /// Number of nodes this node currently sees in the topology.
    pub fn topology_size(&self) -> usize {
        self.topology
            .read()
            .unwrap_or_else(|p| p.into_inner())
            .node_count()
    }

    pub fn lifecycle_state(&self) -> ClusterLifecycleState {
        self.lifecycle.current()
    }

    pub fn topology_contains(&self, node_id: u64) -> bool {
        self.topology
            .read()
            .unwrap_or_else(|p| p.into_inner())
            .contains(node_id)
    }

    /// Sorted node ids in this node's view of the topology.
    pub fn topology_ids(&self) -> Vec<u64> {
        let topo = self.topology.read().unwrap_or_else(|p| p.into_inner());
        let mut ids: Vec<u64> = topo.all_nodes().map(|n| n.node_id).collect();
        ids.sort_unstable();
        ids
    }
    /// Signals shutdown, aborts the background tasks, and yields a few times
    /// so the aborted tasks can finish unwinding before the next test starts.
    pub async fn shutdown(self) {
        self.raft_loop.begin_shutdown();
        let _ = self.shutdown_tx.send(true);
        self.serve_handle.abort();
        self.run_handle.abort();
        let _ = self.serve_handle.await;
        let _ = self.run_handle.await;
        for _ in 0..32 {
            tokio::task::yield_now().await;
        }
    }
}
/// Polls `pred` every `step` until it returns true, panicking with `desc`
/// if `deadline` elapses first.
pub async fn wait_for<F: FnMut() -> bool>(
    desc: &str,
    deadline: Duration,
    step: Duration,
    mut pred: F,
) {
    let start = std::time::Instant::now();
    while start.elapsed() < deadline {
        if pred() {
            return;
        }
        tokio::time::sleep(step).await;
    }
    panic!("timed out after {:?} waiting for: {}", deadline, desc);
}
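
// Illustrative usage sketch, not part of the harness proper: spins up a
// three-node cluster with node 1 as the seed, waits for membership and
// metadata leadership to converge via `wait_for`, then tears everything
// down. The deadlines and poll intervals are assumptions (slower machines
// may need larger values), and `#[tokio::test]` assumes tokio's `macros`
// feature is enabled for tests.
#[cfg(test)]
mod harness_example {
    use super::*;

    #[tokio::test]
    async fn three_nodes_converge_and_elect_a_metadata_leader() {
        let seed = TestNode::spawn(1, vec![]).await.expect("spawn seed");
        let seeds = vec![seed.listen_addr()];
        let n2 = TestNode::spawn(2, seeds.clone()).await.expect("spawn node 2");
        let n3 = TestNode::spawn(3, seeds).await.expect("spawn node 3");
        let nodes = [&seed, &n2, &n3];

        // Every node should eventually see all three members.
        wait_for(
            "all nodes to observe a 3-node topology",
            Duration::from_secs(15),
            Duration::from_millis(50),
            || nodes.iter().all(|n| n.topology_size() == 3),
        )
        .await;

        // Wait for some node to report metadata-group leadership.
        wait_for(
            "a metadata-group leader to emerge",
            Duration::from_secs(15),
            Duration::from_millis(50),
            || nodes.iter().any(|n| n.is_metadata_leader()),
        )
        .await;

        n3.shutdown().await;
        n2.shutdown().await;
        seed.shutdown().await;
    }
}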