// mc_minder/command_sender.rs

1use anyhow::{Context, Result};
2use log::{debug, warn, info};
3use tokio::io::AsyncWriteExt;
4use tokio::process::ChildStdin;
5use tokio::sync::Mutex;
6use tokio::time::{timeout, Duration};
7
8#[derive(Clone)]
9pub enum CommandSenderMode {
10    Rcon { host: String, port: u16, password: String },
11    Stdin { stdin: std::sync::Arc<Mutex<ChildStdin>> },
12}
13
14pub struct CommandSender {
15    mode: CommandSenderMode,
16}
17
18impl CommandSender {
19    pub fn new(mode: CommandSenderMode) -> Self {
20        Self { mode }
21    }
22
23    pub fn rcon(host: String, port: u16, password: String) -> Self {
24        Self { mode: CommandSenderMode::Rcon { host, port, password } }
25    }
26
27    pub fn stdin(stdin: ChildStdin) -> Self {
28        Self { mode: CommandSenderMode::Stdin { stdin: std::sync::Arc::new(Mutex::new(stdin)) } }
29    }
30
31    pub async fn send_command(&mut self, command: &str) -> Result<()> {
32        match &mut self.mode {
33            CommandSenderMode::Rcon { host, port, password } => {
34                RconCommandSender::new(host.clone(), *port, password.clone()).send_command(command).await
35            }
36            CommandSenderMode::Stdin { stdin } => {
37                StdinCommandSender::new(stdin.clone()).send_command(command).await
38            }
39        }
40    }
41
42    pub fn name(&self) -> &'static str {
43        match &self.mode {
44            CommandSenderMode::Rcon { .. } => "RCON",
45            CommandSenderMode::Stdin { .. } => "Stdin",
46        }
47    }
48}
49
50pub struct RconCommandSender {
51    host: String,
52    port: u16,
53    password: String,
54}
55
56impl RconCommandSender {
57    pub fn new(host: String, port: u16, password: String) -> Self {
58        Self { host, port, password }
59    }
60
61    pub async fn send_command(&mut self, command: &str) -> Result<()> {
62        const MAX_RETRIES: u32 = 3;
63
64        for attempt in 1..=MAX_RETRIES {
65            match self.execute_with_retry(command).await {
66                Ok(_) => {
67                    debug!("[RconSender] Command sent successfully: {}", command);
68                    return Ok(());
69                }
70                Err(e) => {
71                    warn!("[RconSender] Attempt {}/{} failed: {}", attempt, MAX_RETRIES, e);
72                    if attempt < MAX_RETRIES {
73                        tokio::time::sleep(std::time::Duration::from_secs(1)).await;
74                    }
75                }
76            }
77        }
78
79        Err(anyhow::anyhow!(
80            "[RconSender] Failed to send command after {} attempts: {}",
81            MAX_RETRIES, command
82        ))
83    }
84
85    async fn execute_with_retry(&mut self, command: &str) -> Result<()> {
86        use tokio::net::TcpStream;
87
88        const CONNECT_TIMEOUT: Duration = Duration::from_secs(5);
89
90        let addr = format!("{}:{}", self.host, self.port);
91        debug!("[RconSender] Connecting to RCON at {}", addr);
92
93        let mut stream = timeout(CONNECT_TIMEOUT, TcpStream::connect(&addr))
94            .await
95            .context("RCON connection timeout")?
96            .context(format!(
97                "Failed to connect to RCON at {}. Please check:\n\
98                 - enable-rcon=true\n\
99                 - rcon.port={}\n\
100                 - rcon.password=<password>",
101                addr, self.port
102            ))?;
103
104        self.authenticate(&mut stream).await?;
105        self.send_packet(&mut stream, 1, 2, command).await?;
106
107        Ok(())
108    }
109
110    async fn authenticate(&self, stream: &mut tokio::net::TcpStream) -> Result<()> {
111        use tokio::io::{AsyncReadExt, AsyncWriteExt};
112
113        const PACKET_TYPE_AUTH: i32 = 3;
114
115        let payload_bytes = self.password.as_bytes();
116        let length = 4 + 4 + payload_bytes.len() as i32 + 1;
117
118        let mut buf = Vec::with_capacity(4 + length as usize);
119        buf.extend_from_slice(&length.to_le_bytes());
120        buf.extend_from_slice(&1_i32.to_le_bytes());
121        buf.extend_from_slice(&PACKET_TYPE_AUTH.to_le_bytes());
122        buf.extend_from_slice(payload_bytes);
123        buf.push(0);
124
125        stream.write_all(&buf).await.context("RCON auth write failed")?;
126        stream.flush().await.context("RCON auth flush failed")?;
127
128        let mut len_buf = [0u8; 4];
129        stream.read_exact(&mut len_buf).await.context("RCON auth read length failed")?;
130        let resp_len = i32::from_le_bytes(len_buf);
131
132        if resp_len < 10 {
133            return Err(anyhow::anyhow!("Invalid auth response length: {}", resp_len));
134        }
135
136        let mut data = vec![0u8; resp_len as usize];
137        stream.read_exact(&mut data).await.context("RCON auth read data failed")?;
138
139        let id = i32::from_le_bytes([data[0], data[1], data[2], data[3]]);
140        if id == -1 {
141            return Err(anyhow::anyhow!("RCON authentication rejected"));
142        }
143
144        debug!("[RconSender] Authenticated successfully");
145        Ok(())
146    }
147
148    async fn send_packet(&self, stream: &mut tokio::net::TcpStream, packet_id: i32, packet_type: i32, payload: &str) -> Result<()> {
149        use tokio::io::{AsyncReadExt, AsyncWriteExt};
150
151        const READ_TIMEOUT: Duration = Duration::from_secs(10);
152
153        let payload_bytes = payload.as_bytes();
154        let length = 4 + 4 + payload_bytes.len() as i32 + 1;
155
156        let mut buf = Vec::with_capacity(4 + length as usize);
157        buf.extend_from_slice(&length.to_le_bytes());
158        buf.extend_from_slice(&packet_id.to_le_bytes());
159        buf.extend_from_slice(&packet_type.to_le_bytes());
160        buf.extend_from_slice(payload_bytes);
161        buf.push(0);
162
163        let write_result = timeout(Duration::from_secs(10), stream.write_all(&buf))
164            .await
165            .context("RCON write timeout")?;
166        write_result.context("RCON write failed")?;
167
168        stream.flush().await.context("RCON flush failed")?;
169
170        let mut len_buf = [0u8; 4];
171        let read_result = timeout(READ_TIMEOUT, stream.read_exact(&mut len_buf))
172            .await
173            .context("RCON read timeout")?;
174        read_result.context("RCON read length failed")?;
175
176        let resp_len = i32::from_le_bytes(len_buf);
177        if resp_len >= 10 {
178            let mut data = vec![0u8; resp_len as usize];
179            let read_result = timeout(READ_TIMEOUT, stream.read_exact(&mut data))
180                .await
181                .context("RCON read timeout")?;
182            read_result.context("RCON read data failed")?;
183        }
184
185        debug!("[RconSender] Packet sent: id={}, type={}, cmd={}", packet_id, packet_type, payload);
186        Ok(())
187    }
188}
189
190pub struct StdinCommandSender {
191    stdin: std::sync::Arc<Mutex<ChildStdin>>,
192}
193
194impl StdinCommandSender {
195    pub fn new(stdin: std::sync::Arc<Mutex<ChildStdin>>) -> Self {
196        Self { stdin }
197    }
198
199    pub async fn send_command(&mut self, command: &str) -> Result<()> {
200        let mut stdin = self.stdin.lock().await;
201        stdin.write_all(command.as_bytes()).await.context("Stdin write failed")?;
202        stdin.write_all(b"\n").await.context("Stdin newline failed")?;
203        stdin.flush().await.context("Stdin flush failed")?;
204        debug!("[StdinSender] Command sent: {}", command);
205        Ok(())
206    }
207}
208
209pub struct MultiCommandSender {
210    senders: Vec<CommandSender>,
211}
212
213impl MultiCommandSender {
214    pub fn new() -> Self {
215        Self { senders: Vec::new() }
216    }
217
218    pub fn add_sender(&mut self, sender: CommandSender) {
219        self.senders.push(sender);
220    }
221
222    pub async fn send_command(&mut self, command: &str) -> Result<()> {
223        if self.senders.is_empty() {
224            return Err(anyhow::anyhow!("No command senders available"));
225        }
226
227        let mut last_error = None;
228        for sender in &mut self.senders {
229            match sender.send_command(command).await {
230                Ok(_) => {
231                    info!("[MultiSender] Command sent via {}: {}", sender.name(), command);
232                    return Ok(());
233                }
234                Err(e) => {
235                    warn!("[MultiSender] {} failed: {}", sender.name(), e);
236                    last_error = Some(e);
237                }
238            }
239        }
240
241        Err(last_error.unwrap_or_else(|| anyhow::anyhow!("All senders failed")))
242    }
243
244    pub fn is_empty(&self) -> bool {
245        self.senders.is_empty()
246    }
247}
248
249impl Default for MultiCommandSender {
250    fn default() -> Self {
251        Self::new()
252    }
253}