Skip to main content

mc_minder/
command_sender.rs

1use anyhow::{Context, Result};
2use log::{debug, warn, info};
3use tokio::io::{AsyncReadExt, AsyncWriteExt};
4use tokio::process::ChildStdin;
5use tokio::sync::Mutex;
6use tokio::time::{timeout, Duration};
7
8// ============================================================
9// CommandSenderMode - Enum for different transport modes
10// ============================================================
11
12#[derive(Clone)]
13pub enum CommandSenderMode {
14    Rcon { host: String, port: u16, password: String },
15    PooledRcon { host: String, port: u16, password: String },
16    Stdin { stdin: std::sync::Arc<Mutex<ChildStdin>> },
17}
18
19pub struct CommandSender {
20    mode: CommandSenderMode,
21}
22
23impl CommandSender {
24    pub fn new(mode: CommandSenderMode) -> Self {
25        Self { mode }
26    }
27
28    pub fn rcon(host: String, port: u16, password: String) -> Self {
29        Self { mode: CommandSenderMode::Rcon { host, port, password } }
30    }
31
32    /// Create a pooled RCON sender that maintains a persistent connection.
33    /// This is more efficient than creating a new connection per command.
34    pub fn pooled_rcon(host: String, port: u16, password: String) -> Self {
35        Self { mode: CommandSenderMode::PooledRcon { host, port, password } }
36    }
37
38    pub fn stdin(stdin: ChildStdin) -> Self {
39        Self { mode: CommandSenderMode::Stdin { stdin: std::sync::Arc::new(Mutex::new(stdin)) } }
40    }
41
42    /// Send a command and return the response text (if available).
43    /// For RCON, this returns the server's response to the command.
44    /// For Stdin, this returns a confirmation string.
45    pub async fn send_command(&mut self, command: &str) -> Result<String> {
46        match &mut self.mode {
47            CommandSenderMode::Rcon { host, port, password } => {
48                RconCommandSender::new(host.clone(), *port, password.clone())
49                    .send_command(command)
50                    .await
51            }
52            CommandSenderMode::PooledRcon { host, port, password } => {
53                PooledRconSender::new(host.clone(), *port, password.clone())
54                    .send_command(command)
55                    .await
56            }
57            CommandSenderMode::Stdin { stdin } => {
58                StdinCommandSender::new(stdin.clone()).send_command(command).await
59            }
60        }
61    }
62
63    /// Send a command without caring about the response.
64    /// Useful for fire-and-forget commands like `say` and `tellraw`.
65    pub async fn send_command_ignore_response(&mut self, command: &str) -> Result<()> {
66        match self.send_command(command).await {
67            Ok(_) => Ok(()),
68            Err(e) => Err(e),
69        }
70    }
71
72    pub fn name(&self) -> &'static str {
73        match &self.mode {
74            CommandSenderMode::Rcon { .. } => "RCON",
75            CommandSenderMode::PooledRcon { .. } => "PooledRCON",
76            CommandSenderMode::Stdin { .. } => "Stdin",
77        }
78    }
79}
80
81// ============================================================
82// RconCommandSender - Single-shot RCON connection per command
83// ============================================================
84
85pub struct RconCommandSender {
86    host: String,
87    port: u16,
88    password: String,
89}
90
91impl RconCommandSender {
92    pub fn new(host: String, port: u16, password: String) -> Self {
93        Self { host, port, password }
94    }
95
96    pub async fn send_command(&mut self, command: &str) -> Result<String> {
97        const MAX_RETRIES: u32 = 3;
98
99        for attempt in 1..=MAX_RETRIES {
100            match self.execute_with_response(command).await {
101                Ok(response) => {
102                    debug!("[RconSender] Command sent successfully: {}", command);
103                    return Ok(response);
104                }
105                Err(e) => {
106                    warn!("[RconSender] Attempt {}/{} failed: {}", attempt, MAX_RETRIES, e);
107                    if attempt < MAX_RETRIES {
108                        tokio::time::sleep(std::time::Duration::from_secs(1)).await;
109                    }
110                }
111            }
112        }
113
114        Err(anyhow::anyhow!(
115            "[RconSender] Failed to send command after {} attempts: {}",
116            MAX_RETRIES, command
117        ))
118    }
119
120    async fn execute_with_response(&mut self, command: &str) -> Result<String> {
121        use tokio::net::TcpStream;
122
123        const CONNECT_TIMEOUT: Duration = Duration::from_secs(5);
124
125        let addr = format!("{}:{}", self.host, self.port);
126        debug!("[RconSender] Connecting to RCON at {}", addr);
127
128        let mut stream = timeout(CONNECT_TIMEOUT, TcpStream::connect(&addr))
129            .await
130            .context("RCON connection timeout")?
131            .context(format!(
132                "[RconSender] Failed to connect to RCON at {}. Please check:\n\
133                 - enable-rcon=true\n\
134                 - rcon.port={}\n\
135                 - rcon.password=<password>",
136                addr, self.port
137            ))?;
138
139        self.authenticate(&mut stream).await?;
140        let response = self.send_packet(&mut stream, 1, 2, command).await?;
141
142        Ok(response)
143    }
144
145    async fn authenticate(&self, stream: &mut tokio::net::TcpStream) -> Result<()> {
146        const PACKET_TYPE_AUTH: i32 = 3;
147
148        let payload_bytes = self.password.as_bytes();
149        let length = 4 + 4 + payload_bytes.len() as i32 + 1;
150
151        let mut buf = Vec::with_capacity(4 + length as usize);
152        buf.extend_from_slice(&length.to_le_bytes());
153        buf.extend_from_slice(&1_i32.to_le_bytes());
154        buf.extend_from_slice(&PACKET_TYPE_AUTH.to_le_bytes());
155        buf.extend_from_slice(payload_bytes);
156        buf.push(0);
157
158        stream.write_all(&buf).await.context("[RconSender] Auth write failed")?;
159        stream.flush().await.context("[RconSender] Auth flush failed")?;
160
161        let mut len_buf = [0u8; 4];
162        stream.read_exact(&mut len_buf).await.context("[RconSender] Auth read length failed")?;
163        let resp_len = i32::from_le_bytes(len_buf);
164
165        if resp_len < 10 {
166            return Err(anyhow::anyhow!("[RconSender] Invalid auth response length: {}", resp_len));
167        }
168
169        let mut data = vec![0u8; resp_len as usize];
170        stream.read_exact(&mut data).await.context("[RconSender] Auth read data failed")?;
171
172        let id = i32::from_le_bytes([data[0], data[1], data[2], data[3]]);
173        if id == -1 {
174            return Err(anyhow::anyhow!("[RconSender] RCON authentication rejected"));
175        }
176
177        debug!("[RconSender] Authenticated successfully");
178        Ok(())
179    }
180
181    async fn send_packet(&self, stream: &mut tokio::net::TcpStream, packet_id: i32, packet_type: i32, payload: &str) -> Result<String> {
182        const READ_TIMEOUT: Duration = Duration::from_secs(10);
183
184        let payload_bytes = payload.as_bytes();
185        let length = 4 + 4 + payload_bytes.len() as i32 + 1;
186
187        let mut buf = Vec::with_capacity(4 + length as usize);
188        buf.extend_from_slice(&length.to_le_bytes());
189        buf.extend_from_slice(&packet_id.to_le_bytes());
190        buf.extend_from_slice(&packet_type.to_le_bytes());
191        buf.extend_from_slice(payload_bytes);
192        buf.push(0);
193
194        let write_result = timeout(Duration::from_secs(10), stream.write_all(&buf))
195            .await
196            .context("[RconSender] Write timeout")?;
197        write_result.context("[RconSender] Write failed")?;
198
199        stream.flush().await.context("[RconSender] Flush failed")?;
200
201        let mut len_buf = [0u8; 4];
202        let read_result = timeout(READ_TIMEOUT, stream.read_exact(&mut len_buf))
203            .await
204            .context("[RconSender] Read timeout")?;
205        read_result.context("[RconSender] Read length failed")?;
206
207        let resp_len = i32::from_le_bytes(len_buf);
208        let mut response = String::new();
209
210        if resp_len >= 10 {
211            let mut data = vec![0u8; resp_len as usize];
212            let read_result = timeout(READ_TIMEOUT, stream.read_exact(&mut data))
213                .await
214                .context("[RconSender] Read timeout")?;
215            read_result.context("[RconSender] Read data failed")?;
216
217            // Extract response text (skip 8 bytes: request_id + packet_type, remove trailing null)
218            if data.len() > 9 {
219                response = String::from_utf8_lossy(&data[8..data.len().saturating_sub(1)]).into_owned();
220            }
221        }
222
223        debug!("[RconSender] Packet sent: id={}, type={}, cmd={}", packet_id, packet_type, payload);
224        Ok(response)
225    }
226}
227
228// ============================================================
229// PooledRconSender - Persistent RCON connection with auto-reconnect
230// ============================================================
231
232pub struct PooledRconSender {
233    host: String,
234    port: u16,
235    password: String,
236    stream: Option<tokio::net::TcpStream>,
237}
238
239impl PooledRconSender {
240    pub fn new(host: String, port: u16, password: String) -> Self {
241        Self {
242            host,
243            port,
244            password,
245            stream: None,
246        }
247    }
248
249    pub async fn send_command(&mut self, command: &str) -> Result<String> {
250        const MAX_RETRIES: u32 = 3;
251
252        for attempt in 1..=MAX_RETRIES {
253            // Try existing connection first
254            if self.stream.is_some() {
255                match self.execute_command(command).await {
256                    Ok(response) => return Ok(response),
257                    Err(e) => {
258                        // Connection lost, reconnect
259                        self.stream = None;
260                        warn!("[PooledRconSender] Connection lost, reconnect attempt {}/{}: {}", attempt, MAX_RETRIES, e);
261                        // Fall through to reconnect attempt
262                    }
263                }
264            }
265
266            // No connection or lost connection, establish new one
267            if let Err(e) = self.connect().await {
268                warn!("[PooledRconSender] Connect attempt {}/{} failed: {}", attempt, MAX_RETRIES, e);
269                if attempt < MAX_RETRIES {
270                    tokio::time::sleep(Duration::from_secs(1)).await;
271                }
272                continue;
273            }
274
275            // Send command on fresh connection
276            match self.execute_command(command).await {
277                Ok(response) => return Ok(response),
278                Err(e) => {
279                    warn!("[PooledRconSender] Command failed on attempt {}: {}", attempt, e);
280                    self.stream = None;
281                }
282            }
283        }
284
285        Err(anyhow::anyhow!(
286            "[PooledRconSender] Failed after {} attempts: {}",
287            MAX_RETRIES, command
288        ))
289    }
290
291    async fn connect(&mut self) -> Result<()> {
292        use tokio::net::TcpStream;
293
294        const PACKET_TYPE_AUTH: i32 = 3;
295        const CONNECT_TIMEOUT: Duration = Duration::from_secs(5);
296
297        let addr = format!("{}:{}", self.host, self.port);
298        debug!("[PooledRconSender] Connecting to {}", addr);
299
300        let mut stream = timeout(CONNECT_TIMEOUT, TcpStream::connect(&addr))
301            .await
302            .context(format!("[PooledRconSender] Connection timeout to {}", addr))?
303            .context(format!("[PooledRconSender] Failed to connect to RCON at {}", addr))?;
304
305        // Authenticate
306        let payload_bytes = self.password.as_bytes();
307        let length = 4 + 4 + payload_bytes.len() as i32 + 1;
308
309        let mut buf = Vec::with_capacity(4 + length as usize);
310        buf.extend_from_slice(&length.to_le_bytes());
311        buf.extend_from_slice(&1_i32.to_le_bytes());
312        buf.extend_from_slice(&PACKET_TYPE_AUTH.to_le_bytes());
313        buf.extend_from_slice(payload_bytes);
314        buf.push(0);
315
316        stream.write_all(&buf).await.context("[PooledRconSender] Auth write failed")?;
317        stream.flush().await.context("[PooledRconSender] Auth flush failed")?;
318
319        // Read auth response
320        let mut len_buf = [0u8; 4];
321        stream.read_exact(&mut len_buf).await.context("[PooledRconSender] Auth read length failed")?;
322        let resp_len = i32::from_le_bytes(len_buf);
323
324        if resp_len < 10 {
325            return Err(anyhow::anyhow!("[PooledRconSender] Invalid auth response length: {}", resp_len));
326        }
327
328        let mut data = vec![0u8; resp_len as usize];
329        stream.read_exact(&mut data).await.context("[PooledRconSender] Auth read data failed")?;
330
331        let id = i32::from_le_bytes([data[0], data[1], data[2], data[3]]);
332        if id == -1 {
333            return Err(anyhow::anyhow!("[PooledRconSender] RCON authentication rejected"));
334        }
335
336        debug!("[PooledRconSender] Authenticated successfully to {}", addr);
337        self.stream = Some(stream);
338        Ok(())
339    }
340
341    async fn execute_command(&mut self, command: &str) -> Result<String> {
342        const PACKET_TYPE_COMMAND: i32 = 2;
343
344        let stream = self.stream.as_mut()
345            .ok_or_else(|| anyhow::anyhow!("[PooledRconSender] No connection"))?;
346
347        // Send command packet
348        let payload_bytes = command.as_bytes();
349        let length = 4 + 4 + payload_bytes.len() as i32 + 1;
350
351        let mut buf = Vec::with_capacity(4 + length as usize);
352        buf.extend_from_slice(&length.to_le_bytes());
353        buf.extend_from_slice(&1_i32.to_le_bytes());
354        buf.extend_from_slice(&PACKET_TYPE_COMMAND.to_le_bytes());
355        buf.extend_from_slice(payload_bytes);
356        buf.push(0);
357
358        let write_result = timeout(Duration::from_secs(10), stream.write_all(&buf)).await
359            .context("[PooledRconSender] Write timeout")?;
360        write_result.context("[PooledRconSender] Write failed")?;
361        stream.flush().await.context("[PooledRconSender] Flush failed")?;
362
363        // Read response
364        let mut len_buf = [0u8; 4];
365        let read_result = timeout(Duration::from_secs(10), stream.read_exact(&mut len_buf)).await
366            .context("[PooledRconSender] Read timeout")?;
367        read_result.context("[PooledRconSender] Read length failed")?;
368
369        let resp_len = i32::from_le_bytes(len_buf);
370        let mut response = String::new();
371
372        if resp_len >= 10 {
373            let mut data = vec![0u8; resp_len as usize];
374            let read_result = timeout(Duration::from_secs(10), stream.read_exact(&mut data)).await
375                .context("[PooledRconSender] Read timeout")?;
376            read_result.context("[PooledRconSender] Read data failed")?;
377
378            // Extract response text (skip 8 bytes: request_id + packet_type)
379            if data.len() > 9 {
380                response = String::from_utf8_lossy(&data[8..data.len().saturating_sub(1)]).into_owned();
381            }
382        }
383
384        debug!("[PooledRconSender] Command sent: {}", command);
385        Ok(response)
386    }
387}
388
389// ============================================================
390// StdinCommandSender - Sends commands via process stdin
391// ============================================================
392
393pub struct StdinCommandSender {
394    stdin: std::sync::Arc<Mutex<ChildStdin>>,
395}
396
397impl StdinCommandSender {
398    pub fn new(stdin: std::sync::Arc<Mutex<ChildStdin>>) -> Self {
399        Self { stdin }
400    }
401
402    pub async fn send_command(&mut self, command: &str) -> Result<String> {
403        let mut stdin = self.stdin.lock().await;
404        stdin.write_all(command.as_bytes()).await.context("[StdinSender] Write failed")?;
405        stdin.write_all(b"\n").await.context("[StdinSender] Newline failed")?;
406        stdin.flush().await.context("[StdinSender] Flush failed")?;
407        debug!("[StdinSender] Command sent: {}", command);
408        Ok(format!("Command sent: {}", command))
409    }
410}
411
412// ============================================================
413// MultiCommandSender - Tries multiple senders in order
414// ============================================================
415
416pub struct MultiCommandSender {
417    senders: Vec<CommandSender>,
418}
419
420impl MultiCommandSender {
421    pub fn new() -> Self {
422        Self { senders: Vec::new() }
423    }
424
425    pub fn add_sender(&mut self, sender: CommandSender) {
426        self.senders.push(sender);
427    }
428
429    /// Send a command through the first available sender.
430    /// Returns the response text from the first successful sender.
431    pub async fn send_command(&mut self, command: &str) -> Result<String> {
432        if self.senders.is_empty() {
433            return Err(anyhow::anyhow!("[MultiSender] No command senders available"));
434        }
435
436        let mut last_error = None;
437        for sender in &mut self.senders {
438            match sender.send_command(command).await {
439                Ok(response) => {
440                    info!("[MultiSender] Command sent via {}: {}", sender.name(), command);
441                    return Ok(response);
442                }
443                Err(e) => {
444                    warn!("[MultiSender] {} failed: {}", sender.name(), e);
445                    last_error = Some(e);
446                }
447            }
448        }
449
450        Err(last_error.unwrap_or_else(|| anyhow::anyhow!("[MultiSender] All senders failed")))
451    }
452
453    /// Send a command without caring about the response text.
454    pub async fn send_command_ignore_response(&mut self, command: &str) -> Result<()> {
455        self.send_command(command).await.map(|_| ())
456    }
457
458    #[allow(dead_code)]
459    pub fn is_empty(&self) -> bool {
460        self.senders.is_empty()
461    }
462}
463
464impl Default for MultiCommandSender {
465    fn default() -> Self {
466        Self::new()
467    }
468}