Skip to main content

mc_minder/
command_sender.rs

1use anyhow::{Context, Result};
2use log::{debug, warn, info};
3use tokio::io::{AsyncReadExt, AsyncWriteExt};
4use tokio::process::ChildStdin;
5use tokio::sync::Mutex;
6use tokio::time::{timeout, Duration};
7
8// ============================================================
9// CommandSenderMode - Enum for different transport modes
10// ============================================================
11
12#[derive(Clone)]
13#[allow(dead_code)]
14pub enum CommandSenderMode {
15    Rcon { host: String, port: u16, password: String },
16    PooledRcon { host: String, port: u16, password: String },
17    Stdin { stdin: std::sync::Arc<Mutex<ChildStdin>> },
18}
19
20pub struct CommandSender {
21    mode: CommandSenderMode,
22}
23
24#[allow(dead_code)]
25impl CommandSender {
26    pub fn new(mode: CommandSenderMode) -> Self {
27        Self { mode }
28    }
29
30    pub fn rcon(host: String, port: u16, password: String) -> Self {
31        Self { mode: CommandSenderMode::Rcon { host, port, password } }
32    }
33
34    /// Create a pooled RCON sender that maintains a persistent connection.
35    /// This is more efficient than creating a new connection per command.
36    pub fn pooled_rcon(host: String, port: u16, password: String) -> Self {
37        Self { mode: CommandSenderMode::PooledRcon { host, port, password } }
38    }
39
40    pub fn stdin(stdin: ChildStdin) -> Self {
41        Self { mode: CommandSenderMode::Stdin { stdin: std::sync::Arc::new(Mutex::new(stdin)) } }
42    }
43
44    /// Send a command and return the response text (if available).
45    /// For RCON, this returns the server's response to the command.
46    /// For Stdin, this returns a confirmation string.
47    pub async fn send_command(&mut self, command: &str) -> Result<String> {
48        match &mut self.mode {
49            CommandSenderMode::Rcon { host, port, password } => {
50                RconCommandSender::new(host.clone(), *port, password.clone())
51                    .send_command(command)
52                    .await
53            }
54            CommandSenderMode::PooledRcon { host, port, password } => {
55                PooledRconSender::new(host.clone(), *port, password.clone())
56                    .send_command(command)
57                    .await
58            }
59            CommandSenderMode::Stdin { stdin } => {
60                StdinCommandSender::new(stdin.clone()).send_command(command).await
61            }
62        }
63    }
64
65    /// Send a command without caring about the response.
66    /// Useful for fire-and-forget commands like `say` and `tellraw`.
67    #[allow(dead_code)]
68    pub async fn send_command_ignore_response(&mut self, command: &str) -> Result<()> {
69        match self.send_command(command).await {
70            Ok(_) => Ok(()),
71            Err(e) => Err(e),
72        }
73    }
74
75    #[allow(dead_code)]
76    pub fn name(&self) -> &'static str {
77        match &self.mode {
78            CommandSenderMode::Rcon { .. } => "RCON",
79            CommandSenderMode::PooledRcon { .. } => "PooledRCON",
80            CommandSenderMode::Stdin { .. } => "Stdin",
81        }
82    }
83}
84
85// ============================================================
86// RconCommandSender - Single-shot RCON connection per command
87// ============================================================
88
89pub struct RconCommandSender {
90    host: String,
91    port: u16,
92    password: String,
93}
94
95impl RconCommandSender {
96    pub fn new(host: String, port: u16, password: String) -> Self {
97        Self { host, port, password }
98    }
99
100    pub async fn send_command(&mut self, command: &str) -> Result<String> {
101        const MAX_RETRIES: u32 = 3;
102
103        for attempt in 1..=MAX_RETRIES {
104            match self.execute_with_response(command).await {
105                Ok(response) => {
106                    debug!("[RconSender] Command sent successfully: {}", command);
107                    return Ok(response);
108                }
109                Err(e) => {
110                    warn!("[RconSender] Attempt {}/{} failed: {}", attempt, MAX_RETRIES, e);
111                    if attempt < MAX_RETRIES {
112                        tokio::time::sleep(std::time::Duration::from_secs(1)).await;
113                    }
114                }
115            }
116        }
117
118        Err(anyhow::anyhow!(
119            "[RconSender] Failed to send command after {} attempts: {}",
120            MAX_RETRIES, command
121        ))
122    }
123
124    async fn execute_with_response(&mut self, command: &str) -> Result<String> {
125        use tokio::net::TcpStream;
126
127        const CONNECT_TIMEOUT: Duration = Duration::from_secs(5);
128
129        let addr = format!("{}:{}", self.host, self.port);
130        debug!("[RconSender] Connecting to RCON at {}", addr);
131
132        let mut stream = timeout(CONNECT_TIMEOUT, TcpStream::connect(&addr))
133            .await
134            .context("RCON connection timeout")?
135            .context(format!(
136                "[RconSender] Failed to connect to RCON at {}. Please check:\n\
137                 - enable-rcon=true\n\
138                 - rcon.port={}\n\
139                 - rcon.password=<password>",
140                addr, self.port
141            ))?;
142
143        self.authenticate(&mut stream).await?;
144        let response = self.send_packet(&mut stream, 1, 2, command).await?;
145
146        Ok(response)
147    }
148
149    async fn authenticate(&self, stream: &mut tokio::net::TcpStream) -> Result<()> {
150        const PACKET_TYPE_AUTH: i32 = 3;
151
152        let payload_bytes = self.password.as_bytes();
153        let length = 4 + 4 + payload_bytes.len() as i32 + 1;
154
155        let mut buf = Vec::with_capacity(4 + length as usize);
156        buf.extend_from_slice(&length.to_le_bytes());
157        buf.extend_from_slice(&1_i32.to_le_bytes());
158        buf.extend_from_slice(&PACKET_TYPE_AUTH.to_le_bytes());
159        buf.extend_from_slice(payload_bytes);
160        buf.push(0);
161
162        stream.write_all(&buf).await.context("[RconSender] Auth write failed")?;
163        stream.flush().await.context("[RconSender] Auth flush failed")?;
164
165        let mut len_buf = [0u8; 4];
166        stream.read_exact(&mut len_buf).await.context("[RconSender] Auth read length failed")?;
167        let resp_len = i32::from_le_bytes(len_buf);
168
169        if resp_len < 10 {
170            return Err(anyhow::anyhow!("[RconSender] Invalid auth response length: {}", resp_len));
171        }
172
173        let mut data = vec![0u8; resp_len as usize];
174        stream.read_exact(&mut data).await.context("[RconSender] Auth read data failed")?;
175
176        let id = i32::from_le_bytes([data[0], data[1], data[2], data[3]]);
177        if id == -1 {
178            return Err(anyhow::anyhow!("[RconSender] RCON authentication rejected"));
179        }
180
181        debug!("[RconSender] Authenticated successfully");
182        Ok(())
183    }
184
185    async fn send_packet(&self, stream: &mut tokio::net::TcpStream, packet_id: i32, packet_type: i32, payload: &str) -> Result<String> {
186        const READ_TIMEOUT: Duration = Duration::from_secs(10);
187
188        let payload_bytes = payload.as_bytes();
189        let length = 4 + 4 + payload_bytes.len() as i32 + 1;
190
191        let mut buf = Vec::with_capacity(4 + length as usize);
192        buf.extend_from_slice(&length.to_le_bytes());
193        buf.extend_from_slice(&packet_id.to_le_bytes());
194        buf.extend_from_slice(&packet_type.to_le_bytes());
195        buf.extend_from_slice(payload_bytes);
196        buf.push(0);
197
198        let write_result = timeout(Duration::from_secs(10), stream.write_all(&buf))
199            .await
200            .context("[RconSender] Write timeout")?;
201        write_result.context("[RconSender] Write failed")?;
202
203        stream.flush().await.context("[RconSender] Flush failed")?;
204
205        let mut len_buf = [0u8; 4];
206        let read_result = timeout(READ_TIMEOUT, stream.read_exact(&mut len_buf))
207            .await
208            .context("[RconSender] Read timeout")?;
209        read_result.context("[RconSender] Read length failed")?;
210
211        let resp_len = i32::from_le_bytes(len_buf);
212        let mut response = String::new();
213
214        if resp_len >= 10 {
215            let mut data = vec![0u8; resp_len as usize];
216            let read_result = timeout(READ_TIMEOUT, stream.read_exact(&mut data))
217                .await
218                .context("[RconSender] Read timeout")?;
219            read_result.context("[RconSender] Read data failed")?;
220
221            // Extract response text (skip 8 bytes: request_id + packet_type, remove trailing null)
222            if data.len() > 9 {
223                response = String::from_utf8_lossy(&data[8..data.len().saturating_sub(1)]).into_owned();
224            }
225        }
226
227        debug!("[RconSender] Packet sent: id={}, type={}, cmd={}", packet_id, packet_type, payload);
228        Ok(response)
229    }
230}
231
232// ============================================================
233// PooledRconSender - Persistent RCON connection with auto-reconnect
234// ============================================================
235
236pub struct PooledRconSender {
237    host: String,
238    port: u16,
239    password: String,
240    stream: Option<tokio::net::TcpStream>,
241}
242
243impl PooledRconSender {
244    pub fn new(host: String, port: u16, password: String) -> Self {
245        Self {
246            host,
247            port,
248            password,
249            stream: None,
250        }
251    }
252
253    pub async fn send_command(&mut self, command: &str) -> Result<String> {
254        const MAX_RETRIES: u32 = 3;
255
256        for attempt in 1..=MAX_RETRIES {
257            // Try existing connection first
258            if self.stream.is_some() {
259                match self.execute_command(command).await {
260                    Ok(response) => return Ok(response),
261                    Err(e) => {
262                        // Connection lost, reconnect
263                        self.stream = None;
264                        warn!("[PooledRconSender] Connection lost, reconnect attempt {}/{}: {}", attempt, MAX_RETRIES, e);
265                        // Fall through to reconnect attempt
266                    }
267                }
268            }
269
270            // No connection or lost connection, establish new one
271            if let Err(e) = self.connect().await {
272                warn!("[PooledRconSender] Connect attempt {}/{} failed: {}", attempt, MAX_RETRIES, e);
273                if attempt < MAX_RETRIES {
274                    tokio::time::sleep(Duration::from_secs(1)).await;
275                }
276                continue;
277            }
278
279            // Send command on fresh connection
280            match self.execute_command(command).await {
281                Ok(response) => return Ok(response),
282                Err(e) => {
283                    warn!("[PooledRconSender] Command failed on attempt {}: {}", attempt, e);
284                    self.stream = None;
285                }
286            }
287        }
288
289        Err(anyhow::anyhow!(
290            "[PooledRconSender] Failed after {} attempts: {}",
291            MAX_RETRIES, command
292        ))
293    }
294
295    async fn connect(&mut self) -> Result<()> {
296        use tokio::net::TcpStream;
297
298        const PACKET_TYPE_AUTH: i32 = 3;
299        const CONNECT_TIMEOUT: Duration = Duration::from_secs(5);
300
301        let addr = format!("{}:{}", self.host, self.port);
302        debug!("[PooledRconSender] Connecting to {}", addr);
303
304        let mut stream = timeout(CONNECT_TIMEOUT, TcpStream::connect(&addr))
305            .await
306            .context(format!("[PooledRconSender] Connection timeout to {}", addr))?
307            .context(format!("[PooledRconSender] Failed to connect to RCON at {}", addr))?;
308
309        // Authenticate
310        let payload_bytes = self.password.as_bytes();
311        let length = 4 + 4 + payload_bytes.len() as i32 + 1;
312
313        let mut buf = Vec::with_capacity(4 + length as usize);
314        buf.extend_from_slice(&length.to_le_bytes());
315        buf.extend_from_slice(&1_i32.to_le_bytes());
316        buf.extend_from_slice(&PACKET_TYPE_AUTH.to_le_bytes());
317        buf.extend_from_slice(payload_bytes);
318        buf.push(0);
319
320        stream.write_all(&buf).await.context("[PooledRconSender] Auth write failed")?;
321        stream.flush().await.context("[PooledRconSender] Auth flush failed")?;
322
323        // Read auth response
324        let mut len_buf = [0u8; 4];
325        stream.read_exact(&mut len_buf).await.context("[PooledRconSender] Auth read length failed")?;
326        let resp_len = i32::from_le_bytes(len_buf);
327
328        if resp_len < 10 {
329            return Err(anyhow::anyhow!("[PooledRconSender] Invalid auth response length: {}", resp_len));
330        }
331
332        let mut data = vec![0u8; resp_len as usize];
333        stream.read_exact(&mut data).await.context("[PooledRconSender] Auth read data failed")?;
334
335        let id = i32::from_le_bytes([data[0], data[1], data[2], data[3]]);
336        if id == -1 {
337            return Err(anyhow::anyhow!("[PooledRconSender] RCON authentication rejected"));
338        }
339
340        debug!("[PooledRconSender] Authenticated successfully to {}", addr);
341        self.stream = Some(stream);
342        Ok(())
343    }
344
345    async fn execute_command(&mut self, command: &str) -> Result<String> {
346        const PACKET_TYPE_COMMAND: i32 = 2;
347
348        let stream = self.stream.as_mut()
349            .ok_or_else(|| anyhow::anyhow!("[PooledRconSender] No connection"))?;
350
351        // Send command packet
352        let payload_bytes = command.as_bytes();
353        let length = 4 + 4 + payload_bytes.len() as i32 + 1;
354
355        let mut buf = Vec::with_capacity(4 + length as usize);
356        buf.extend_from_slice(&length.to_le_bytes());
357        buf.extend_from_slice(&1_i32.to_le_bytes());
358        buf.extend_from_slice(&PACKET_TYPE_COMMAND.to_le_bytes());
359        buf.extend_from_slice(payload_bytes);
360        buf.push(0);
361
362        let write_result = timeout(Duration::from_secs(10), stream.write_all(&buf)).await
363            .context("[PooledRconSender] Write timeout")?;
364        write_result.context("[PooledRconSender] Write failed")?;
365        stream.flush().await.context("[PooledRconSender] Flush failed")?;
366
367        // Read response
368        let mut len_buf = [0u8; 4];
369        let read_result = timeout(Duration::from_secs(10), stream.read_exact(&mut len_buf)).await
370            .context("[PooledRconSender] Read timeout")?;
371        read_result.context("[PooledRconSender] Read length failed")?;
372
373        let resp_len = i32::from_le_bytes(len_buf);
374        let mut response = String::new();
375
376        if resp_len >= 10 {
377            let mut data = vec![0u8; resp_len as usize];
378            let read_result = timeout(Duration::from_secs(10), stream.read_exact(&mut data)).await
379                .context("[PooledRconSender] Read timeout")?;
380            read_result.context("[PooledRconSender] Read data failed")?;
381
382            // Extract response text (skip 8 bytes: request_id + packet_type)
383            if data.len() > 9 {
384                response = String::from_utf8_lossy(&data[8..data.len().saturating_sub(1)]).into_owned();
385            }
386        }
387
388        debug!("[PooledRconSender] Command sent: {}", command);
389        Ok(response)
390    }
391}
392
393// ============================================================
394// StdinCommandSender - Sends commands via process stdin
395// ============================================================
396
397pub struct StdinCommandSender {
398    stdin: std::sync::Arc<Mutex<ChildStdin>>,
399}
400
401impl StdinCommandSender {
402    pub fn new(stdin: std::sync::Arc<Mutex<ChildStdin>>) -> Self {
403        Self { stdin }
404    }
405
406    pub async fn send_command(&mut self, command: &str) -> Result<String> {
407        let mut stdin = self.stdin.lock().await;
408        stdin.write_all(command.as_bytes()).await.context("[StdinSender] Write failed")?;
409        stdin.write_all(b"\n").await.context("[StdinSender] Newline failed")?;
410        stdin.flush().await.context("[StdinSender] Flush failed")?;
411        debug!("[StdinSender] Command sent: {}", command);
412        Ok(format!("Command sent: {}", command))
413    }
414}
415
416// ============================================================
417// MultiCommandSender - Tries multiple senders in order
418// ============================================================
419
420pub struct MultiCommandSender {
421    senders: Vec<CommandSender>,
422}
423
424impl MultiCommandSender {
425    pub fn new() -> Self {
426        Self { senders: Vec::new() }
427    }
428
429    pub fn add_sender(&mut self, sender: CommandSender) {
430        self.senders.push(sender);
431    }
432
433    /// Send a command through the first available sender.
434    /// Returns the response text from the first successful sender.
435    pub async fn send_command(&mut self, command: &str) -> Result<String> {
436        if self.senders.is_empty() {
437            return Err(anyhow::anyhow!("[MultiSender] No command senders available"));
438        }
439
440        let mut last_error = None;
441        for sender in &mut self.senders {
442            match sender.send_command(command).await {
443                Ok(response) => {
444                    info!("[MultiSender] Command sent via {}: {}", sender.name(), command);
445                    return Ok(response);
446                }
447                Err(e) => {
448                    warn!("[MultiSender] {} failed: {}", sender.name(), e);
449                    last_error = Some(e);
450                }
451            }
452        }
453
454        Err(last_error.unwrap_or_else(|| anyhow::anyhow!("[MultiSender] All senders failed")))
455    }
456
457    /// Send a command without caring about the response text.
458    pub async fn send_command_ignore_response(&mut self, command: &str) -> Result<()> {
459        self.send_command(command).await.map(|_| ())
460    }
461
462    #[allow(dead_code)]
463    pub fn is_empty(&self) -> bool {
464        self.senders.is_empty()
465    }
466}
467
468impl Default for MultiCommandSender {
469    fn default() -> Self {
470        Self::new()
471    }
472}