1use anyhow::{Context, Result};
2use log::{debug, warn, info};
3use tokio::io::{AsyncReadExt, AsyncWriteExt};
4use tokio::process::ChildStdin;
5use tokio::sync::Mutex;
6use tokio::time::{timeout, Duration};
7
8#[derive(Clone)]
13#[allow(dead_code)]
14pub enum CommandSenderMode {
15 Rcon { host: String, port: u16, password: String },
16 PooledRcon { host: String, port: u16, password: String },
17 Stdin { stdin: std::sync::Arc<Mutex<ChildStdin>> },
18}
19
20pub struct CommandSender {
21 mode: CommandSenderMode,
22}
23
24#[allow(dead_code)]
25impl CommandSender {
26 pub fn new(mode: CommandSenderMode) -> Self {
27 Self { mode }
28 }
29
30 pub fn rcon(host: String, port: u16, password: String) -> Self {
31 Self { mode: CommandSenderMode::Rcon { host, port, password } }
32 }
33
34 pub fn pooled_rcon(host: String, port: u16, password: String) -> Self {
37 Self { mode: CommandSenderMode::PooledRcon { host, port, password } }
38 }
39
40 pub fn stdin(stdin: ChildStdin) -> Self {
41 Self { mode: CommandSenderMode::Stdin { stdin: std::sync::Arc::new(Mutex::new(stdin)) } }
42 }
43
44 pub async fn send_command(&mut self, command: &str) -> Result<String> {
48 match &mut self.mode {
49 CommandSenderMode::Rcon { host, port, password } => {
50 RconCommandSender::new(host.clone(), *port, password.clone())
51 .send_command(command)
52 .await
53 }
54 CommandSenderMode::PooledRcon { host, port, password } => {
55 PooledRconSender::new(host.clone(), *port, password.clone())
56 .send_command(command)
57 .await
58 }
59 CommandSenderMode::Stdin { stdin } => {
60 StdinCommandSender::new(stdin.clone()).send_command(command).await
61 }
62 }
63 }
64
65 #[allow(dead_code)]
68 pub async fn send_command_ignore_response(&mut self, command: &str) -> Result<()> {
69 match self.send_command(command).await {
70 Ok(_) => Ok(()),
71 Err(e) => Err(e),
72 }
73 }
74
75 #[allow(dead_code)]
76 pub fn name(&self) -> &'static str {
77 match &self.mode {
78 CommandSenderMode::Rcon { .. } => "RCON",
79 CommandSenderMode::PooledRcon { .. } => "PooledRCON",
80 CommandSenderMode::Stdin { .. } => "Stdin",
81 }
82 }
83}
84
85pub struct RconCommandSender {
90 host: String,
91 port: u16,
92 password: String,
93}
94
95impl RconCommandSender {
96 pub fn new(host: String, port: u16, password: String) -> Self {
97 Self { host, port, password }
98 }
99
100 pub async fn send_command(&mut self, command: &str) -> Result<String> {
101 const MAX_RETRIES: u32 = 3;
102
103 for attempt in 1..=MAX_RETRIES {
104 match self.execute_with_response(command).await {
105 Ok(response) => {
106 debug!("[RconSender] Command sent successfully: {}", command);
107 return Ok(response);
108 }
109 Err(e) => {
110 warn!("[RconSender] Attempt {}/{} failed: {}", attempt, MAX_RETRIES, e);
111 if attempt < MAX_RETRIES {
112 tokio::time::sleep(std::time::Duration::from_secs(1)).await;
113 }
114 }
115 }
116 }
117
118 Err(anyhow::anyhow!(
119 "[RconSender] Failed to send command after {} attempts: {}",
120 MAX_RETRIES, command
121 ))
122 }
123
124 async fn execute_with_response(&mut self, command: &str) -> Result<String> {
125 use tokio::net::TcpStream;
126
127 const CONNECT_TIMEOUT: Duration = Duration::from_secs(5);
128
129 let addr = format!("{}:{}", self.host, self.port);
130 debug!("[RconSender] Connecting to RCON at {}", addr);
131
132 let mut stream = timeout(CONNECT_TIMEOUT, TcpStream::connect(&addr))
133 .await
134 .context("RCON connection timeout")?
135 .context(format!(
136 "[RconSender] Failed to connect to RCON at {}. Please check:\n\
137 - enable-rcon=true\n\
138 - rcon.port={}\n\
139 - rcon.password=<password>",
140 addr, self.port
141 ))?;
142
143 self.authenticate(&mut stream).await?;
144 let response = self.send_packet(&mut stream, 1, 2, command).await?;
145
146 Ok(response)
147 }
148
149 async fn authenticate(&self, stream: &mut tokio::net::TcpStream) -> Result<()> {
150 const PACKET_TYPE_AUTH: i32 = 3;
151
152 let payload_bytes = self.password.as_bytes();
153 let length = 4 + 4 + payload_bytes.len() as i32 + 1;
154
155 let mut buf = Vec::with_capacity(4 + length as usize);
156 buf.extend_from_slice(&length.to_le_bytes());
157 buf.extend_from_slice(&1_i32.to_le_bytes());
158 buf.extend_from_slice(&PACKET_TYPE_AUTH.to_le_bytes());
159 buf.extend_from_slice(payload_bytes);
160 buf.push(0);
161
162 stream.write_all(&buf).await.context("[RconSender] Auth write failed")?;
163 stream.flush().await.context("[RconSender] Auth flush failed")?;
164
165 let mut len_buf = [0u8; 4];
166 stream.read_exact(&mut len_buf).await.context("[RconSender] Auth read length failed")?;
167 let resp_len = i32::from_le_bytes(len_buf);
168
169 if resp_len < 10 {
170 return Err(anyhow::anyhow!("[RconSender] Invalid auth response length: {}", resp_len));
171 }
172
173 let mut data = vec![0u8; resp_len as usize];
174 stream.read_exact(&mut data).await.context("[RconSender] Auth read data failed")?;
175
176 let id = i32::from_le_bytes([data[0], data[1], data[2], data[3]]);
177 if id == -1 {
178 return Err(anyhow::anyhow!("[RconSender] RCON authentication rejected"));
179 }
180
181 debug!("[RconSender] Authenticated successfully");
182 Ok(())
183 }
184
185 async fn send_packet(&self, stream: &mut tokio::net::TcpStream, packet_id: i32, packet_type: i32, payload: &str) -> Result<String> {
186 const READ_TIMEOUT: Duration = Duration::from_secs(10);
187
188 let payload_bytes = payload.as_bytes();
189 let length = 4 + 4 + payload_bytes.len() as i32 + 1;
190
191 let mut buf = Vec::with_capacity(4 + length as usize);
192 buf.extend_from_slice(&length.to_le_bytes());
193 buf.extend_from_slice(&packet_id.to_le_bytes());
194 buf.extend_from_slice(&packet_type.to_le_bytes());
195 buf.extend_from_slice(payload_bytes);
196 buf.push(0);
197
198 let write_result = timeout(Duration::from_secs(10), stream.write_all(&buf))
199 .await
200 .context("[RconSender] Write timeout")?;
201 write_result.context("[RconSender] Write failed")?;
202
203 stream.flush().await.context("[RconSender] Flush failed")?;
204
205 let mut len_buf = [0u8; 4];
206 let read_result = timeout(READ_TIMEOUT, stream.read_exact(&mut len_buf))
207 .await
208 .context("[RconSender] Read timeout")?;
209 read_result.context("[RconSender] Read length failed")?;
210
211 let resp_len = i32::from_le_bytes(len_buf);
212 let mut response = String::new();
213
214 if resp_len >= 10 {
215 let mut data = vec![0u8; resp_len as usize];
216 let read_result = timeout(READ_TIMEOUT, stream.read_exact(&mut data))
217 .await
218 .context("[RconSender] Read timeout")?;
219 read_result.context("[RconSender] Read data failed")?;
220
221 if data.len() > 9 {
223 response = String::from_utf8_lossy(&data[8..data.len().saturating_sub(1)]).into_owned();
224 }
225 }
226
227 debug!("[RconSender] Packet sent: id={}, type={}, cmd={}", packet_id, packet_type, payload);
228 Ok(response)
229 }
230}
231
232pub struct PooledRconSender {
237 host: String,
238 port: u16,
239 password: String,
240 stream: Option<tokio::net::TcpStream>,
241}
242
243impl PooledRconSender {
244 pub fn new(host: String, port: u16, password: String) -> Self {
245 Self {
246 host,
247 port,
248 password,
249 stream: None,
250 }
251 }
252
253 pub async fn send_command(&mut self, command: &str) -> Result<String> {
254 const MAX_RETRIES: u32 = 3;
255
256 for attempt in 1..=MAX_RETRIES {
257 if self.stream.is_some() {
259 match self.execute_command(command).await {
260 Ok(response) => return Ok(response),
261 Err(e) => {
262 self.stream = None;
264 warn!("[PooledRconSender] Connection lost, reconnect attempt {}/{}: {}", attempt, MAX_RETRIES, e);
265 }
267 }
268 }
269
270 if let Err(e) = self.connect().await {
272 warn!("[PooledRconSender] Connect attempt {}/{} failed: {}", attempt, MAX_RETRIES, e);
273 if attempt < MAX_RETRIES {
274 tokio::time::sleep(Duration::from_secs(1)).await;
275 }
276 continue;
277 }
278
279 match self.execute_command(command).await {
281 Ok(response) => return Ok(response),
282 Err(e) => {
283 warn!("[PooledRconSender] Command failed on attempt {}: {}", attempt, e);
284 self.stream = None;
285 }
286 }
287 }
288
289 Err(anyhow::anyhow!(
290 "[PooledRconSender] Failed after {} attempts: {}",
291 MAX_RETRIES, command
292 ))
293 }
294
295 async fn connect(&mut self) -> Result<()> {
296 use tokio::net::TcpStream;
297
298 const PACKET_TYPE_AUTH: i32 = 3;
299 const CONNECT_TIMEOUT: Duration = Duration::from_secs(5);
300
301 let addr = format!("{}:{}", self.host, self.port);
302 debug!("[PooledRconSender] Connecting to {}", addr);
303
304 let mut stream = timeout(CONNECT_TIMEOUT, TcpStream::connect(&addr))
305 .await
306 .context(format!("[PooledRconSender] Connection timeout to {}", addr))?
307 .context(format!("[PooledRconSender] Failed to connect to RCON at {}", addr))?;
308
309 let payload_bytes = self.password.as_bytes();
311 let length = 4 + 4 + payload_bytes.len() as i32 + 1;
312
313 let mut buf = Vec::with_capacity(4 + length as usize);
314 buf.extend_from_slice(&length.to_le_bytes());
315 buf.extend_from_slice(&1_i32.to_le_bytes());
316 buf.extend_from_slice(&PACKET_TYPE_AUTH.to_le_bytes());
317 buf.extend_from_slice(payload_bytes);
318 buf.push(0);
319
320 stream.write_all(&buf).await.context("[PooledRconSender] Auth write failed")?;
321 stream.flush().await.context("[PooledRconSender] Auth flush failed")?;
322
323 let mut len_buf = [0u8; 4];
325 stream.read_exact(&mut len_buf).await.context("[PooledRconSender] Auth read length failed")?;
326 let resp_len = i32::from_le_bytes(len_buf);
327
328 if resp_len < 10 {
329 return Err(anyhow::anyhow!("[PooledRconSender] Invalid auth response length: {}", resp_len));
330 }
331
332 let mut data = vec![0u8; resp_len as usize];
333 stream.read_exact(&mut data).await.context("[PooledRconSender] Auth read data failed")?;
334
335 let id = i32::from_le_bytes([data[0], data[1], data[2], data[3]]);
336 if id == -1 {
337 return Err(anyhow::anyhow!("[PooledRconSender] RCON authentication rejected"));
338 }
339
340 debug!("[PooledRconSender] Authenticated successfully to {}", addr);
341 self.stream = Some(stream);
342 Ok(())
343 }
344
345 async fn execute_command(&mut self, command: &str) -> Result<String> {
346 const PACKET_TYPE_COMMAND: i32 = 2;
347
348 let stream = self.stream.as_mut()
349 .ok_or_else(|| anyhow::anyhow!("[PooledRconSender] No connection"))?;
350
351 let payload_bytes = command.as_bytes();
353 let length = 4 + 4 + payload_bytes.len() as i32 + 1;
354
355 let mut buf = Vec::with_capacity(4 + length as usize);
356 buf.extend_from_slice(&length.to_le_bytes());
357 buf.extend_from_slice(&1_i32.to_le_bytes());
358 buf.extend_from_slice(&PACKET_TYPE_COMMAND.to_le_bytes());
359 buf.extend_from_slice(payload_bytes);
360 buf.push(0);
361
362 let write_result = timeout(Duration::from_secs(10), stream.write_all(&buf)).await
363 .context("[PooledRconSender] Write timeout")?;
364 write_result.context("[PooledRconSender] Write failed")?;
365 stream.flush().await.context("[PooledRconSender] Flush failed")?;
366
367 let mut len_buf = [0u8; 4];
369 let read_result = timeout(Duration::from_secs(10), stream.read_exact(&mut len_buf)).await
370 .context("[PooledRconSender] Read timeout")?;
371 read_result.context("[PooledRconSender] Read length failed")?;
372
373 let resp_len = i32::from_le_bytes(len_buf);
374 let mut response = String::new();
375
376 if resp_len >= 10 {
377 let mut data = vec![0u8; resp_len as usize];
378 let read_result = timeout(Duration::from_secs(10), stream.read_exact(&mut data)).await
379 .context("[PooledRconSender] Read timeout")?;
380 read_result.context("[PooledRconSender] Read data failed")?;
381
382 if data.len() > 9 {
384 response = String::from_utf8_lossy(&data[8..data.len().saturating_sub(1)]).into_owned();
385 }
386 }
387
388 debug!("[PooledRconSender] Command sent: {}", command);
389 Ok(response)
390 }
391}
392
393pub struct StdinCommandSender {
398 stdin: std::sync::Arc<Mutex<ChildStdin>>,
399}
400
401impl StdinCommandSender {
402 pub fn new(stdin: std::sync::Arc<Mutex<ChildStdin>>) -> Self {
403 Self { stdin }
404 }
405
406 pub async fn send_command(&mut self, command: &str) -> Result<String> {
407 let mut stdin = self.stdin.lock().await;
408 stdin.write_all(command.as_bytes()).await.context("[StdinSender] Write failed")?;
409 stdin.write_all(b"\n").await.context("[StdinSender] Newline failed")?;
410 stdin.flush().await.context("[StdinSender] Flush failed")?;
411 debug!("[StdinSender] Command sent: {}", command);
412 Ok(format!("Command sent: {}", command))
413 }
414}
415
416pub struct MultiCommandSender {
421 senders: Vec<CommandSender>,
422}
423
424impl MultiCommandSender {
425 pub fn new() -> Self {
426 Self { senders: Vec::new() }
427 }
428
429 pub fn add_sender(&mut self, sender: CommandSender) {
430 self.senders.push(sender);
431 }
432
433 pub async fn send_command(&mut self, command: &str) -> Result<String> {
436 if self.senders.is_empty() {
437 return Err(anyhow::anyhow!("[MultiSender] No command senders available"));
438 }
439
440 let mut last_error = None;
441 for sender in &mut self.senders {
442 match sender.send_command(command).await {
443 Ok(response) => {
444 info!("[MultiSender] Command sent via {}: {}", sender.name(), command);
445 return Ok(response);
446 }
447 Err(e) => {
448 warn!("[MultiSender] {} failed: {}", sender.name(), e);
449 last_error = Some(e);
450 }
451 }
452 }
453
454 Err(last_error.unwrap_or_else(|| anyhow::anyhow!("[MultiSender] All senders failed")))
455 }
456
457 pub async fn send_command_ignore_response(&mut self, command: &str) -> Result<()> {
459 self.send_command(command).await.map(|_| ())
460 }
461
462 #[allow(dead_code)]
463 pub fn is_empty(&self) -> bool {
464 self.senders.is_empty()
465 }
466}
467
468impl Default for MultiCommandSender {
469 fn default() -> Self {
470 Self::new()
471 }
472}