mc_minder/
command_sender.rs1use anyhow::{Context, Result};
2use log::{debug, warn, info};
3use tokio::io::AsyncWriteExt;
4use tokio::process::ChildStdin;
5use tokio::sync::Mutex;
6use tokio::time::{timeout, Duration};
7
8#[derive(Clone)]
9pub enum CommandSenderMode {
10 Rcon { host: String, port: u16, password: String },
11 Stdin { stdin: std::sync::Arc<Mutex<ChildStdin>> },
12}
13
14pub struct CommandSender {
15 mode: CommandSenderMode,
16}
17
18impl CommandSender {
19 pub fn new(mode: CommandSenderMode) -> Self {
20 Self { mode }
21 }
22
23 pub fn rcon(host: String, port: u16, password: String) -> Self {
24 Self { mode: CommandSenderMode::Rcon { host, port, password } }
25 }
26
27 pub fn stdin(stdin: ChildStdin) -> Self {
28 Self { mode: CommandSenderMode::Stdin { stdin: std::sync::Arc::new(Mutex::new(stdin)) } }
29 }
30
31 pub async fn send_command(&mut self, command: &str) -> Result<()> {
32 match &mut self.mode {
33 CommandSenderMode::Rcon { host, port, password } => {
34 RconCommandSender::new(host.clone(), *port, password.clone()).send_command(command).await
35 }
36 CommandSenderMode::Stdin { stdin } => {
37 StdinCommandSender::new(stdin.clone()).send_command(command).await
38 }
39 }
40 }
41
42 pub fn name(&self) -> &'static str {
43 match &self.mode {
44 CommandSenderMode::Rcon { .. } => "RCON",
45 CommandSenderMode::Stdin { .. } => "Stdin",
46 }
47 }
48}
49
50pub struct RconCommandSender {
51 host: String,
52 port: u16,
53 password: String,
54}
55
56impl RconCommandSender {
57 pub fn new(host: String, port: u16, password: String) -> Self {
58 Self { host, port, password }
59 }
60
61 pub async fn send_command(&mut self, command: &str) -> Result<()> {
62 const MAX_RETRIES: u32 = 3;
63
64 for attempt in 1..=MAX_RETRIES {
65 match self.execute_with_retry(command).await {
66 Ok(_) => {
67 debug!("[RconSender] Command sent successfully: {}", command);
68 return Ok(());
69 }
70 Err(e) => {
71 warn!("[RconSender] Attempt {}/{} failed: {}", attempt, MAX_RETRIES, e);
72 if attempt < MAX_RETRIES {
73 tokio::time::sleep(std::time::Duration::from_secs(1)).await;
74 }
75 }
76 }
77 }
78
79 Err(anyhow::anyhow!(
80 "[RconSender] Failed to send command after {} attempts: {}",
81 MAX_RETRIES, command
82 ))
83 }
84
85 async fn execute_with_retry(&mut self, command: &str) -> Result<()> {
86 use tokio::net::TcpStream;
87
88 const CONNECT_TIMEOUT: Duration = Duration::from_secs(5);
89
90 let addr = format!("{}:{}", self.host, self.port);
91 debug!("[RconSender] Connecting to RCON at {}", addr);
92
93 let mut stream = timeout(CONNECT_TIMEOUT, TcpStream::connect(&addr))
94 .await
95 .context("RCON connection timeout")?
96 .context(format!(
97 "Failed to connect to RCON at {}. Please check:\n\
98 - enable-rcon=true\n\
99 - rcon.port={}\n\
100 - rcon.password=<password>",
101 addr, self.port
102 ))?;
103
104 self.authenticate(&mut stream).await?;
105 self.send_packet(&mut stream, 1, 2, command).await?;
106
107 Ok(())
108 }
109
110 async fn authenticate(&self, stream: &mut tokio::net::TcpStream) -> Result<()> {
111 use tokio::io::{AsyncReadExt, AsyncWriteExt};
112
113 const PACKET_TYPE_AUTH: i32 = 3;
114
115 let payload_bytes = self.password.as_bytes();
116 let length = 4 + 4 + payload_bytes.len() as i32 + 1;
117
118 let mut buf = Vec::with_capacity(4 + length as usize);
119 buf.extend_from_slice(&length.to_le_bytes());
120 buf.extend_from_slice(&1_i32.to_le_bytes());
121 buf.extend_from_slice(&PACKET_TYPE_AUTH.to_le_bytes());
122 buf.extend_from_slice(payload_bytes);
123 buf.push(0);
124
125 stream.write_all(&buf).await.context("RCON auth write failed")?;
126 stream.flush().await.context("RCON auth flush failed")?;
127
128 let mut len_buf = [0u8; 4];
129 stream.read_exact(&mut len_buf).await.context("RCON auth read length failed")?;
130 let resp_len = i32::from_le_bytes(len_buf);
131
132 if resp_len < 10 {
133 return Err(anyhow::anyhow!("Invalid auth response length: {}", resp_len));
134 }
135
136 let mut data = vec![0u8; resp_len as usize];
137 stream.read_exact(&mut data).await.context("RCON auth read data failed")?;
138
139 let id = i32::from_le_bytes([data[0], data[1], data[2], data[3]]);
140 if id == -1 {
141 return Err(anyhow::anyhow!("RCON authentication rejected"));
142 }
143
144 debug!("[RconSender] Authenticated successfully");
145 Ok(())
146 }
147
148 async fn send_packet(&self, stream: &mut tokio::net::TcpStream, packet_id: i32, packet_type: i32, payload: &str) -> Result<()> {
149 use tokio::io::{AsyncReadExt, AsyncWriteExt};
150
151 const READ_TIMEOUT: Duration = Duration::from_secs(10);
152
153 let payload_bytes = payload.as_bytes();
154 let length = 4 + 4 + payload_bytes.len() as i32 + 1;
155
156 let mut buf = Vec::with_capacity(4 + length as usize);
157 buf.extend_from_slice(&length.to_le_bytes());
158 buf.extend_from_slice(&packet_id.to_le_bytes());
159 buf.extend_from_slice(&packet_type.to_le_bytes());
160 buf.extend_from_slice(payload_bytes);
161 buf.push(0);
162
163 let write_result = timeout(Duration::from_secs(10), stream.write_all(&buf))
164 .await
165 .context("RCON write timeout")?;
166 write_result.context("RCON write failed")?;
167
168 stream.flush().await.context("RCON flush failed")?;
169
170 let mut len_buf = [0u8; 4];
171 let read_result = timeout(READ_TIMEOUT, stream.read_exact(&mut len_buf))
172 .await
173 .context("RCON read timeout")?;
174 read_result.context("RCON read length failed")?;
175
176 let resp_len = i32::from_le_bytes(len_buf);
177 if resp_len >= 10 {
178 let mut data = vec![0u8; resp_len as usize];
179 let read_result = timeout(READ_TIMEOUT, stream.read_exact(&mut data))
180 .await
181 .context("RCON read timeout")?;
182 read_result.context("RCON read data failed")?;
183 }
184
185 debug!("[RconSender] Packet sent: id={}, type={}, cmd={}", packet_id, packet_type, payload);
186 Ok(())
187 }
188}
189
190pub struct StdinCommandSender {
191 stdin: std::sync::Arc<Mutex<ChildStdin>>,
192}
193
194impl StdinCommandSender {
195 pub fn new(stdin: std::sync::Arc<Mutex<ChildStdin>>) -> Self {
196 Self { stdin }
197 }
198
199 pub async fn send_command(&mut self, command: &str) -> Result<()> {
200 let mut stdin = self.stdin.lock().await;
201 stdin.write_all(command.as_bytes()).await.context("Stdin write failed")?;
202 stdin.write_all(b"\n").await.context("Stdin newline failed")?;
203 stdin.flush().await.context("Stdin flush failed")?;
204 debug!("[StdinSender] Command sent: {}", command);
205 Ok(())
206 }
207}
208
209pub struct MultiCommandSender {
210 senders: Vec<CommandSender>,
211}
212
213impl MultiCommandSender {
214 pub fn new() -> Self {
215 Self { senders: Vec::new() }
216 }
217
218 pub fn add_sender(&mut self, sender: CommandSender) {
219 self.senders.push(sender);
220 }
221
222 pub async fn send_command(&mut self, command: &str) -> Result<()> {
223 if self.senders.is_empty() {
224 return Err(anyhow::anyhow!("No command senders available"));
225 }
226
227 let mut last_error = None;
228 for sender in &mut self.senders {
229 match sender.send_command(command).await {
230 Ok(_) => {
231 info!("[MultiSender] Command sent via {}: {}", sender.name(), command);
232 return Ok(());
233 }
234 Err(e) => {
235 warn!("[MultiSender] {} failed: {}", sender.name(), e);
236 last_error = Some(e);
237 }
238 }
239 }
240
241 Err(last_error.unwrap_or_else(|| anyhow::anyhow!("All senders failed")))
242 }
243
244 pub fn is_empty(&self) -> bool {
245 self.senders.is_empty()
246 }
247}
248
249impl Default for MultiCommandSender {
250 fn default() -> Self {
251 Self::new()
252 }
253}