Skip to main content

noxu_rep/
group_admin.rs

1//! Group-administration service: master transfer and group shutdown.
2//!
3//! Closes findings F7 (`transfer_master`) and F8 (`shutdown_group`)
4//! of `docs/src/internal/api-audit-2026-05-rep.md`.
5//!
6//! # Wire protocol (over `TcpServiceDispatcher` `ADMIN` channel)
7//!
8//! A single framed message from caller → recipient:
9//!
10//! ```text
11//!   byte 0      : command code
12//!     0x01 = TRANSFER_MASTER
13//!     0x02 = SHUTDOWN_GROUP
14//!     0x03 = STEP_DOWN
15//!   bytes 1..9  : term       (u64 LE) — for TRANSFER and STEP_DOWN
16//!   bytes 9..   : master_name (UTF-8) — for TRANSFER (the new master)
17//! ```
18//!
19//! The recipient applies the command to its local `ReplicatedEnvironment`
20//! and replies with a single-byte ack:
21//!
22//! ```text
23//!   byte 0 : ack
24//!     0x00 = OK
25//!     0x01 = REJECTED (e.g., recipient is not in a state to honour
26//!                       the request; details in the log)
27//! ```
28
29use std::sync::Arc;
30use std::sync::Weak;
31use std::time::Duration;
32
33use crate::error::{RepError, Result};
34use crate::net::Channel;
35use crate::net::service_dispatcher::{ServiceHandler, connect_to_service};
36
/// Service name under which the admin handler is registered and to which
/// the `send_*` helpers connect.
pub const ADMIN_SERVICE_NAME: &str = "ADMIN";

/// Command code (frame byte 0): transfer mastership to the node named in
/// the payload, at the term carried in bytes 1..9.
pub const CMD_TRANSFER_MASTER: u8 = 0x01;
/// Command code (frame byte 0): shut the group down; the frame carries no
/// further payload.
pub const CMD_SHUTDOWN_GROUP: u8 = 0x02;
/// Command code (frame byte 0): ask the recipient to step down; the frame
/// carries a term in bytes 1..9.
pub const CMD_STEP_DOWN: u8 = 0x03;

/// Ack byte: the recipient applied the command.
pub const ACK_OK: u8 = 0x00;
/// Ack byte: the recipient could not honour the request.
pub const ACK_REJECTED: u8 = 0x01;
45
/// Service handler for the ADMIN channel.
///
/// Holds a `Weak<ReplicatedEnvironment>` so that handler-spawned per-
/// connection threads can apply commands to the live environment
/// without keeping the env alive past `close()`.
pub struct AdminService {
    // Non-owning handle to the environment.  It is upgraded once per
    // incoming command; if the upgrade fails the command is answered
    // with `ACK_REJECTED`.
    env: Weak<crate::replicated_environment::ReplicatedEnvironment>,
}
54
55impl AdminService {
56    pub fn new(
57        env: Weak<crate::replicated_environment::ReplicatedEnvironment>,
58    ) -> Self {
59        Self { env }
60    }
61}
62
63impl ServiceHandler for AdminService {
64    fn service_name(&self) -> &str {
65        ADMIN_SERVICE_NAME
66    }
67
68    fn handle(&self, channel: Box<dyn Channel>) -> Result<()> {
69        let msg =
70            channel.receive(Duration::from_secs(10))?.ok_or_else(|| {
71                RepError::ProtocolError("ADMIN: empty command frame".into())
72            })?;
73        if msg.is_empty() {
74            let _ = channel.send(&[ACK_REJECTED]);
75            return Ok(());
76        }
77
78        let env = match self.env.upgrade() {
79            Some(e) => e,
80            None => {
81                // Env is gone; reject.
82                let _ = channel.send(&[ACK_REJECTED]);
83                return Ok(());
84            }
85        };
86
87        match msg[0] {
88            CMD_TRANSFER_MASTER => {
89                if msg.len() < 1 + 8 {
90                    let _ = channel.send(&[ACK_REJECTED]);
91                    return Ok(());
92                }
93                let mut t = [0u8; 8];
94                t.copy_from_slice(&msg[1..9]);
95                let term = u64::from_le_bytes(t);
96                let new_master =
97                    String::from_utf8(msg[9..].to_vec()).map_err(|_| {
98                        RepError::ProtocolError(
99                            "ADMIN: TRANSFER non-UTF8 master".into(),
100                        )
101                    })?;
102                let result = if new_master == env.get_node_name() {
103                    // We are the target.  Become master at the new term.
104                    env.become_master(term)
105                } else {
106                    // We are a peer — record the new master.
107                    env.become_replica(&new_master)
108                };
109                let ack = if result.is_ok() { ACK_OK } else { ACK_REJECTED };
110                let _ = channel.send(&[ack]);
111            }
112            CMD_SHUTDOWN_GROUP => {
113                let result = env.close();
114                let ack = if result.is_ok() { ACK_OK } else { ACK_REJECTED };
115                let _ = channel.send(&[ack]);
116            }
117            CMD_STEP_DOWN => {
118                if msg.len() < 1 + 8 {
119                    let _ = channel.send(&[ACK_REJECTED]);
120                    return Ok(());
121                }
122                // Old master self-demotes; target name unused on
123                // step-down — the recipient just transitions out of
124                // mastership.  Caller is expected to still hold
125                // mastership at the time of the call.
126                let res = env.ensure_unknown_state();
127                let ack = if res.is_ok() { ACK_OK } else { ACK_REJECTED };
128                let _ = channel.send(&[ack]);
129            }
130            other => {
131                log::warn!("ADMIN: unknown command 0x{:02x}", other);
132                let _ = channel.send(&[ACK_REJECTED]);
133            }
134        }
135        Ok(())
136    }
137}
138
139/// Send a `TRANSFER_MASTER` command to `peer_addr`.
140pub fn send_transfer_master(
141    peer_addr: std::net::SocketAddr,
142    new_master: &str,
143    term: u64,
144) -> Result<bool> {
145    let channel = connect_to_service(peer_addr, ADMIN_SERVICE_NAME)?;
146    let mut buf = Vec::with_capacity(1 + 8 + new_master.len());
147    buf.push(CMD_TRANSFER_MASTER);
148    buf.extend_from_slice(&term.to_le_bytes());
149    buf.extend_from_slice(new_master.as_bytes());
150    channel.send(&buf)?;
151    let reply = channel.receive(Duration::from_secs(10))?.unwrap_or_default();
152    Ok(matches!(reply.first(), Some(&ACK_OK)))
153}
154
155/// Send a `SHUTDOWN_GROUP` command to `peer_addr`.
156pub fn send_shutdown_group(peer_addr: std::net::SocketAddr) -> Result<bool> {
157    let channel = connect_to_service(peer_addr, ADMIN_SERVICE_NAME)?;
158    channel.send(&[CMD_SHUTDOWN_GROUP])?;
159    let reply = channel.receive(Duration::from_secs(10))?.unwrap_or_default();
160    Ok(matches!(reply.first(), Some(&ACK_OK)))
161}
162
163/// Send a `STEP_DOWN` command to `peer_addr`.
164pub fn send_step_down(
165    peer_addr: std::net::SocketAddr,
166    term: u64,
167) -> Result<bool> {
168    let channel = connect_to_service(peer_addr, ADMIN_SERVICE_NAME)?;
169    let mut buf = Vec::with_capacity(1 + 8);
170    buf.push(CMD_STEP_DOWN);
171    buf.extend_from_slice(&term.to_le_bytes());
172    channel.send(&buf)?;
173    let reply = channel.receive(Duration::from_secs(10))?.unwrap_or_default();
174    Ok(matches!(reply.first(), Some(&ACK_OK)))
175}
176
177/// Shared helper: register the ADMIN service on `dispatcher`, holding a
178/// `Weak<ReplicatedEnvironment>` so the handler outlives no longer
179/// than the env itself.
180pub(crate) fn register_admin_service(
181    dispatcher: &crate::net::service_dispatcher::AnyServiceDispatcher,
182    env: Weak<crate::replicated_environment::ReplicatedEnvironment>,
183) {
184    let svc = AdminService::new(env);
185    dispatcher.register(ADMIN_SERVICE_NAME, Arc::new(svc));
186}