use super::*;
mod communicator;
use communicator::{Communicator, RaftConnection};
use std::collections::HashMap;
/// Per-node Raft state: this node's own address, a cache of connections to
/// other nodes, and the Raft processes currently attached, keyed by shard.
pub struct RaftNode {
/// Address identifying this node; copied into every handle it hands out.
self_node_id: NodeAddress,
/// Connections keyed by destination node address. Each `RaftHandle`
/// created by this node receives a clone of this cache.
cache: moka::sync::Cache<NodeAddress, RaftConnection>,
/// Attached processes keyed by shard index, guarded by a spin read-write
/// lock. Values are `Arc`s so lookups can return an owned reference
/// without keeping the lock held.
process_map: spin::RwLock<HashMap<ShardIndex, Arc<process::RaftProcess>>>,
}
impl RaftNode {
pub fn new(id: NodeAddress) -> Self {
let builder = moka::sync::Cache::builder()
.initial_capacity(3)
.time_to_idle(Duration::from_secs(60));
Self {
self_node_id: id,
cache: builder.build(),
process_map: HashMap::new().into(),
}
}
pub fn get_handle(&self, shard_index: ShardIndex) -> RaftHandle {
RaftHandle {
shard_index,
self_node_id: self.self_node_id.clone(),
connection_cache: self.cache.clone(),
}
}
pub fn attach_process(&self, shard_index: ShardIndex, p: process::RaftProcess) {
self.process_map.write().insert(shard_index, Arc::new(p));
}
pub fn detach_process(&self, shard_index: ShardIndex) {
self.process_map.write().remove(&shard_index);
}
pub(super) fn get_process(&self, shard_index: ShardIndex) -> Option<Arc<process::RaftProcess>> {
self.process_map.read().get(&shard_index).cloned()
}
}
/// Cloneable handle tying a node address to a single shard, with access to a
/// connection cache for reaching other nodes.
#[derive(Clone)]
pub struct RaftHandle {
/// Address of the node this handle was created for.
pub self_node_id: NodeAddress,
/// Shard this handle is bound to; passed to every `Communicator` it creates.
pub shard_index: ShardIndex,
/// Connections keyed by destination node address.
connection_cache: moka::sync::Cache<NodeAddress, RaftConnection>,
}
impl RaftHandle {
    /// Returns an owned copy of this handle's node address.
    pub(super) fn self_node_id(&self) -> NodeAddress {
        let id = &self.self_node_id;
        id.clone()
    }

    /// Produces a `Communicator` for talking to `dest_node_id` on this
    /// handle's shard.
    ///
    /// The connection is obtained from the cache via `get_with`, keyed by the
    /// destination address; the supplied closure builds a new
    /// `RaftConnection` from this node's address to the destination.
    pub(super) fn connect(&self, dest_node_id: NodeAddress) -> Communicator {
        let cache_key = dest_node_id.clone();
        let open_connection =
            || RaftConnection::new(self.self_node_id.clone(), dest_node_id.clone());
        let conn: RaftConnection = self.connection_cache.get_with(cache_key, open_connection);
        Communicator::new(conn, self.shard_index)
    }
}