use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use crate::elections::acceptor_state::PersistentAcceptorState;
use crate::elections::paxos::run_acceptor_with_state;
use crate::error::Result;
use crate::net::channel::Channel;
use crate::net::service_dispatcher::ServiceHandler;
/// Service name that `ElectionService::service_name` returns; the tests in
/// this file also use it as the key when registering and connecting.
pub const ELECTION_SERVICE_NAME: &str = "ELECTION";
/// Per-node values handed to the election acceptor.
///
/// `own_vlsn` and `own_term` are atomics, so they can be updated through a
/// shared reference via `set_vlsn` / `set_term`; `node_name` and
/// `own_priority` have no setter in this file.
pub struct ElectionAcceptorState {
/// Node name passed to the acceptor on every handled connection.
pub node_name: String,
/// VLSN value reported by `snapshot()`; initialised to 0 by the constructors
/// and overwritten by `set_vlsn`.
pub own_vlsn: AtomicU64,
/// Priority value reported by `snapshot()`.
pub own_priority: u32,
/// Term value reported by `snapshot()`; initialised to 0 by the constructors
/// and overwritten by `set_term`.
pub own_term: AtomicU64,
/// Reference-counted acceptor state: built with `in_memory()` by `new` and
/// with `load_or_default(env_home)` by `with_env_home`.
pub persistent: Arc<PersistentAcceptorState>,
}
impl ElectionAcceptorState {
pub fn new(node_name: String, own_priority: u32) -> Self {
Self {
node_name,
own_vlsn: AtomicU64::new(0),
own_priority,
own_term: AtomicU64::new(0),
persistent: Arc::new(PersistentAcceptorState::in_memory()),
}
}
pub fn with_env_home(
node_name: String,
own_priority: u32,
env_home: &std::path::Path,
) -> Self {
Self {
node_name,
own_vlsn: AtomicU64::new(0),
own_priority,
own_term: AtomicU64::new(0),
persistent: Arc::new(PersistentAcceptorState::load_or_default(
env_home,
)),
}
}
pub fn set_vlsn(&self, vlsn: u64) {
self.own_vlsn.store(vlsn, Ordering::SeqCst);
}
pub fn set_term(&self, term: u64) {
self.own_term.store(term, Ordering::SeqCst);
}
pub fn snapshot(&self) -> (u64, u32, u64) {
(
self.own_vlsn.load(Ordering::SeqCst),
self.own_priority,
self.own_term.load(Ordering::SeqCst),
)
}
}
/// Service handler that runs the election acceptor on each channel it is
/// given, using values read from a shared `ElectionAcceptorState`.
pub struct ElectionService {
/// Shared acceptor state; `handle` takes a fresh `snapshot()` of it per call.
state: Arc<ElectionAcceptorState>,
}
impl ElectionService {
pub fn new(state: Arc<ElectionAcceptorState>) -> Self {
Self { state }
}
}
impl ServiceHandler for ElectionService {
    /// Runs one acceptor exchange over `channel`.
    ///
    /// A snapshot of the current VLSN, priority and term is taken up front and
    /// passed to `run_acceptor_with_state` together with the node name and the
    /// persistent acceptor state. An acceptor error is logged at debug level
    /// and not propagated, so this method always returns `Ok(())`.
    fn handle(&self, channel: Box<dyn Channel>) -> Result<()> {
        let shared = &self.state;
        let (current_vlsn, current_priority, current_term) = shared.snapshot();

        if let Err(e) = run_acceptor_with_state(
            &*channel,
            &shared.node_name,
            current_vlsn,
            current_priority,
            current_term,
            &shared.persistent,
        ) {
            log::debug!("ELECTION service: acceptor returned error: {}", e);
        }

        Ok(())
    }

    /// Returns the constant `ELECTION_SERVICE_NAME`.
    fn service_name(&self) -> &str {
        ELECTION_SERVICE_NAME
    }
}
// Unit tests: one end-to-end round trip through a TCP service dispatcher and
// one pure check of the state setters/snapshot.
#[cfg(test)]
mod tests {
use super::*;
use crate::elections::paxos::run_election;
use crate::net::channel::TcpChannel;
use crate::net::service_dispatcher::{
TcpServiceDispatcher, connect_to_service,
};
use crate::node_type::NodeType;
use crate::rep_group::RepGroup;
use crate::rep_node::RepNode;
use std::sync::Arc;
/// Builds a group named "g" containing two `Electable` nodes on 127.0.0.1,
/// `self_name` on port 5001 and `peer_name` on port 5002.
///
/// NOTE(review): the trailing `1` / `2` arguments to `RepNode::new` and the `1`
/// passed to `RepGroup::new` are presumably ids — confirm against those
/// constructors.
fn make_group_2(self_name: &str, peer_name: &str) -> RepGroup {
let mut g = RepGroup::new("g".into(), 1);
g.add_node(RepNode::new(
self_name.into(),
NodeType::Electable,
"127.0.0.1".into(),
5_001,
1,
));
g.add_node(RepNode::new(
peer_name.into(),
NodeType::Electable,
"127.0.0.1".into(),
5_002,
2,
));
g
}
/// Registers an `ElectionService` on a local dispatcher, connects to it and
/// runs an election through that single channel.
#[test]
fn election_service_handles_acceptor_round_trip() {
// Acceptor side: node "peer", priority 1, VLSN 50, term 1.
let acceptor_state =
Arc::new(ElectionAcceptorState::new("peer".into(), 1));
acceptor_state.set_vlsn(50);
acceptor_state.set_term(1);
let svc = Arc::new(ElectionService::new(acceptor_state));
// Port 0 in the bind address: the address actually bound is whatever
// `start()` returns, and that is what the client connects to below.
let sd =
TcpServiceDispatcher::new("127.0.0.1:0".parse().unwrap()).unwrap();
sd.register(ELECTION_SERVICE_NAME, svc);
let bound = sd.start().unwrap();
// Presumably gives the dispatcher time to begin accepting — TODO confirm
// whether `start()` already guarantees readiness.
std::thread::sleep(std::time::Duration::from_millis(20));
let group = make_group_2("self", "peer");
let ch = connect_to_service(bound, ELECTION_SERVICE_NAME).unwrap();
let ch_arc: Arc<dyn Channel> = Arc::new(ch);
// NOTE(review): the meaning of the numeric arguments (1, 100, 1, 1) is
// defined by `run_election` and is not visible from this file.
let winner = run_election(1, "self", &group, &[ch_arc], 100, 1, 1);
assert_eq!(winner, Some(1));
// Presumably lets the server-side handler finish before the dispatcher is
// stopped — TODO confirm.
std::thread::sleep(std::time::Duration::from_millis(50));
sd.stop();
}
/// `snapshot()` reports zeros for a fresh state and the most recently stored
/// VLSN/term afterwards; the priority is unchanged throughout.
#[test]
fn election_service_state_snapshot_consistency() {
let s = ElectionAcceptorState::new("n".into(), 5);
assert_eq!(s.snapshot(), (0, 5, 0));
s.set_vlsn(42);
s.set_term(7);
assert_eq!(s.snapshot(), (42, 5, 7));
}
// Never called (hence the `dead_code` allowance). It is the only place in
// this module that names `TcpChannel`, so it appears to exist solely to keep
// that import referenced.
#[allow(dead_code)]
fn _ensure_tcp_channel_in_scope() -> Option<TcpChannel> {
None
}
}