1use std::sync::Arc;
30use std::sync::Weak;
31use std::time::Duration;
32
33use crate::error::{RepError, Result};
34use crate::net::Channel;
35use crate::net::service_dispatcher::{ServiceHandler, connect_to_service};
36
37pub const ADMIN_SERVICE_NAME: &str = "ADMIN";
38
39pub const CMD_TRANSFER_MASTER: u8 = 0x01;
40pub const CMD_SHUTDOWN_GROUP: u8 = 0x02;
41pub const CMD_STEP_DOWN: u8 = 0x03;
42
43pub const ACK_OK: u8 = 0x00;
44pub const ACK_REJECTED: u8 = 0x01;
45
46pub struct AdminService {
52 env: Weak<crate::replicated_environment::ReplicatedEnvironment>,
53}
54
55impl AdminService {
56 pub fn new(
57 env: Weak<crate::replicated_environment::ReplicatedEnvironment>,
58 ) -> Self {
59 Self { env }
60 }
61}
62
63impl ServiceHandler for AdminService {
64 fn service_name(&self) -> &str {
65 ADMIN_SERVICE_NAME
66 }
67
68 fn handle(&self, channel: Box<dyn Channel>) -> Result<()> {
69 let msg =
70 channel.receive(Duration::from_secs(10))?.ok_or_else(|| {
71 RepError::ProtocolError("ADMIN: empty command frame".into())
72 })?;
73 if msg.is_empty() {
74 let _ = channel.send(&[ACK_REJECTED]);
75 return Ok(());
76 }
77
78 let env = match self.env.upgrade() {
79 Some(e) => e,
80 None => {
81 let _ = channel.send(&[ACK_REJECTED]);
83 return Ok(());
84 }
85 };
86
87 match msg[0] {
88 CMD_TRANSFER_MASTER => {
89 if msg.len() < 1 + 8 {
90 let _ = channel.send(&[ACK_REJECTED]);
91 return Ok(());
92 }
93 let mut t = [0u8; 8];
94 t.copy_from_slice(&msg[1..9]);
95 let term = u64::from_le_bytes(t);
96 let new_master =
97 String::from_utf8(msg[9..].to_vec()).map_err(|_| {
98 RepError::ProtocolError(
99 "ADMIN: TRANSFER non-UTF8 master".into(),
100 )
101 })?;
102 let result = if new_master == env.get_node_name() {
103 env.become_master(term)
105 } else {
106 env.become_replica(&new_master)
108 };
109 let ack = if result.is_ok() { ACK_OK } else { ACK_REJECTED };
110 let _ = channel.send(&[ack]);
111 }
112 CMD_SHUTDOWN_GROUP => {
113 let result = env.close();
114 let ack = if result.is_ok() { ACK_OK } else { ACK_REJECTED };
115 let _ = channel.send(&[ack]);
116 }
117 CMD_STEP_DOWN => {
118 if msg.len() < 1 + 8 {
119 let _ = channel.send(&[ACK_REJECTED]);
120 return Ok(());
121 }
122 let res = env.ensure_unknown_state();
127 let ack = if res.is_ok() { ACK_OK } else { ACK_REJECTED };
128 let _ = channel.send(&[ack]);
129 }
130 other => {
131 log::warn!("ADMIN: unknown command 0x{:02x}", other);
132 let _ = channel.send(&[ACK_REJECTED]);
133 }
134 }
135 Ok(())
136 }
137}
138
139pub fn send_transfer_master(
141 peer_addr: std::net::SocketAddr,
142 new_master: &str,
143 term: u64,
144) -> Result<bool> {
145 let channel = connect_to_service(peer_addr, ADMIN_SERVICE_NAME)?;
146 let mut buf = Vec::with_capacity(1 + 8 + new_master.len());
147 buf.push(CMD_TRANSFER_MASTER);
148 buf.extend_from_slice(&term.to_le_bytes());
149 buf.extend_from_slice(new_master.as_bytes());
150 channel.send(&buf)?;
151 let reply = channel.receive(Duration::from_secs(10))?.unwrap_or_default();
152 Ok(matches!(reply.first(), Some(&ACK_OK)))
153}
154
155pub fn send_shutdown_group(peer_addr: std::net::SocketAddr) -> Result<bool> {
157 let channel = connect_to_service(peer_addr, ADMIN_SERVICE_NAME)?;
158 channel.send(&[CMD_SHUTDOWN_GROUP])?;
159 let reply = channel.receive(Duration::from_secs(10))?.unwrap_or_default();
160 Ok(matches!(reply.first(), Some(&ACK_OK)))
161}
162
163pub fn send_step_down(
165 peer_addr: std::net::SocketAddr,
166 term: u64,
167) -> Result<bool> {
168 let channel = connect_to_service(peer_addr, ADMIN_SERVICE_NAME)?;
169 let mut buf = Vec::with_capacity(1 + 8);
170 buf.push(CMD_STEP_DOWN);
171 buf.extend_from_slice(&term.to_le_bytes());
172 channel.send(&buf)?;
173 let reply = channel.receive(Duration::from_secs(10))?.unwrap_or_default();
174 Ok(matches!(reply.first(), Some(&ACK_OK)))
175}
176
177pub fn register_admin_service(
181 dispatcher: &crate::net::service_dispatcher::TcpServiceDispatcher,
182 env: Weak<crate::replicated_environment::ReplicatedEnvironment>,
183) {
184 let svc = AdminService::new(env);
185 dispatcher.register(ADMIN_SERVICE_NAME, Arc::new(svc));
186}