use std::sync::Arc;
use std::sync::Weak;
use std::time::Duration;
use crate::error::{RepError, Result};
use crate::net::Channel;
use crate::net::service_dispatcher::{ServiceHandler, connect_to_service};
pub const ADMIN_SERVICE_NAME: &str = "ADMIN";
pub const CMD_TRANSFER_MASTER: u8 = 0x01;
pub const CMD_SHUTDOWN_GROUP: u8 = 0x02;
pub const CMD_STEP_DOWN: u8 = 0x03;
pub const ACK_OK: u8 = 0x00;
pub const ACK_REJECTED: u8 = 0x01;
pub struct AdminService {
env: Weak<crate::replicated_environment::ReplicatedEnvironment>,
}
impl AdminService {
pub fn new(
env: Weak<crate::replicated_environment::ReplicatedEnvironment>,
) -> Self {
Self { env }
}
}
impl ServiceHandler for AdminService {
fn service_name(&self) -> &str {
ADMIN_SERVICE_NAME
}
fn handle(&self, channel: Box<dyn Channel>) -> Result<()> {
let msg =
channel.receive(Duration::from_secs(10))?.ok_or_else(|| {
RepError::ProtocolError("ADMIN: empty command frame".into())
})?;
if msg.is_empty() {
let _ = channel.send(&[ACK_REJECTED]);
return Ok(());
}
let env = match self.env.upgrade() {
Some(e) => e,
None => {
let _ = channel.send(&[ACK_REJECTED]);
return Ok(());
}
};
match msg[0] {
CMD_TRANSFER_MASTER => {
if msg.len() < 1 + 8 {
let _ = channel.send(&[ACK_REJECTED]);
return Ok(());
}
let mut t = [0u8; 8];
t.copy_from_slice(&msg[1..9]);
let term = u64::from_le_bytes(t);
let new_master =
String::from_utf8(msg[9..].to_vec()).map_err(|_| {
RepError::ProtocolError(
"ADMIN: TRANSFER non-UTF8 master".into(),
)
})?;
let result = if new_master == env.get_node_name() {
env.become_master(term)
} else {
env.become_replica(&new_master)
};
let ack = if result.is_ok() { ACK_OK } else { ACK_REJECTED };
let _ = channel.send(&[ack]);
}
CMD_SHUTDOWN_GROUP => {
let result = env.close();
let ack = if result.is_ok() { ACK_OK } else { ACK_REJECTED };
let _ = channel.send(&[ack]);
}
CMD_STEP_DOWN => {
if msg.len() < 1 + 8 {
let _ = channel.send(&[ACK_REJECTED]);
return Ok(());
}
let res = env.ensure_unknown_state();
let ack = if res.is_ok() { ACK_OK } else { ACK_REJECTED };
let _ = channel.send(&[ack]);
}
other => {
log::warn!("ADMIN: unknown command 0x{:02x}", other);
let _ = channel.send(&[ACK_REJECTED]);
}
}
Ok(())
}
}
pub fn send_transfer_master(
peer_addr: std::net::SocketAddr,
new_master: &str,
term: u64,
) -> Result<bool> {
let channel = connect_to_service(peer_addr, ADMIN_SERVICE_NAME)?;
let mut buf = Vec::with_capacity(1 + 8 + new_master.len());
buf.push(CMD_TRANSFER_MASTER);
buf.extend_from_slice(&term.to_le_bytes());
buf.extend_from_slice(new_master.as_bytes());
channel.send(&buf)?;
let reply = channel.receive(Duration::from_secs(10))?.unwrap_or_default();
Ok(matches!(reply.first(), Some(&ACK_OK)))
}
pub fn send_shutdown_group(peer_addr: std::net::SocketAddr) -> Result<bool> {
let channel = connect_to_service(peer_addr, ADMIN_SERVICE_NAME)?;
channel.send(&[CMD_SHUTDOWN_GROUP])?;
let reply = channel.receive(Duration::from_secs(10))?.unwrap_or_default();
Ok(matches!(reply.first(), Some(&ACK_OK)))
}
pub fn send_step_down(
peer_addr: std::net::SocketAddr,
term: u64,
) -> Result<bool> {
let channel = connect_to_service(peer_addr, ADMIN_SERVICE_NAME)?;
let mut buf = Vec::with_capacity(1 + 8);
buf.push(CMD_STEP_DOWN);
buf.extend_from_slice(&term.to_le_bytes());
channel.send(&buf)?;
let reply = channel.receive(Duration::from_secs(10))?.unwrap_or_default();
Ok(matches!(reply.first(), Some(&ACK_OK)))
}
pub(crate) fn register_admin_service(
dispatcher: &crate::net::service_dispatcher::AnyServiceDispatcher,
env: Weak<crate::replicated_environment::ReplicatedEnvironment>,
) {
let svc = AdminService::new(env);
dispatcher.register(ADMIN_SERVICE_NAME, Arc::new(svc));
}