1use anyhow::{Context, Result};
2use log::{debug, warn, info};
3use tokio::io::{AsyncReadExt, AsyncWriteExt};
4use tokio::process::ChildStdin;
5use tokio::sync::Mutex;
6use tokio::time::{timeout, Duration};
7
8#[derive(Clone)]
13pub enum CommandSenderMode {
14 Rcon { host: String, port: u16, password: String },
15 PooledRcon { host: String, port: u16, password: String },
16 Stdin { stdin: std::sync::Arc<Mutex<ChildStdin>> },
17}
18
19pub struct CommandSender {
20 mode: CommandSenderMode,
21}
22
23impl CommandSender {
24 pub fn new(mode: CommandSenderMode) -> Self {
25 Self { mode }
26 }
27
28 pub fn rcon(host: String, port: u16, password: String) -> Self {
29 Self { mode: CommandSenderMode::Rcon { host, port, password } }
30 }
31
32 pub fn pooled_rcon(host: String, port: u16, password: String) -> Self {
35 Self { mode: CommandSenderMode::PooledRcon { host, port, password } }
36 }
37
38 pub fn stdin(stdin: ChildStdin) -> Self {
39 Self { mode: CommandSenderMode::Stdin { stdin: std::sync::Arc::new(Mutex::new(stdin)) } }
40 }
41
42 pub async fn send_command(&mut self, command: &str) -> Result<String> {
46 match &mut self.mode {
47 CommandSenderMode::Rcon { host, port, password } => {
48 RconCommandSender::new(host.clone(), *port, password.clone())
49 .send_command(command)
50 .await
51 }
52 CommandSenderMode::PooledRcon { host, port, password } => {
53 PooledRconSender::new(host.clone(), *port, password.clone())
54 .send_command(command)
55 .await
56 }
57 CommandSenderMode::Stdin { stdin } => {
58 StdinCommandSender::new(stdin.clone()).send_command(command).await
59 }
60 }
61 }
62
63 pub async fn send_command_ignore_response(&mut self, command: &str) -> Result<()> {
66 match self.send_command(command).await {
67 Ok(_) => Ok(()),
68 Err(e) => Err(e),
69 }
70 }
71
72 pub fn name(&self) -> &'static str {
73 match &self.mode {
74 CommandSenderMode::Rcon { .. } => "RCON",
75 CommandSenderMode::PooledRcon { .. } => "PooledRCON",
76 CommandSenderMode::Stdin { .. } => "Stdin",
77 }
78 }
79}
80
81pub struct RconCommandSender {
86 host: String,
87 port: u16,
88 password: String,
89}
90
91impl RconCommandSender {
92 pub fn new(host: String, port: u16, password: String) -> Self {
93 Self { host, port, password }
94 }
95
96 pub async fn send_command(&mut self, command: &str) -> Result<String> {
97 const MAX_RETRIES: u32 = 3;
98
99 for attempt in 1..=MAX_RETRIES {
100 match self.execute_with_response(command).await {
101 Ok(response) => {
102 debug!("[RconSender] Command sent successfully: {}", command);
103 return Ok(response);
104 }
105 Err(e) => {
106 warn!("[RconSender] Attempt {}/{} failed: {}", attempt, MAX_RETRIES, e);
107 if attempt < MAX_RETRIES {
108 tokio::time::sleep(std::time::Duration::from_secs(1)).await;
109 }
110 }
111 }
112 }
113
114 Err(anyhow::anyhow!(
115 "[RconSender] Failed to send command after {} attempts: {}",
116 MAX_RETRIES, command
117 ))
118 }
119
120 async fn execute_with_response(&mut self, command: &str) -> Result<String> {
121 use tokio::net::TcpStream;
122
123 const CONNECT_TIMEOUT: Duration = Duration::from_secs(5);
124
125 let addr = format!("{}:{}", self.host, self.port);
126 debug!("[RconSender] Connecting to RCON at {}", addr);
127
128 let mut stream = timeout(CONNECT_TIMEOUT, TcpStream::connect(&addr))
129 .await
130 .context("RCON connection timeout")?
131 .context(format!(
132 "[RconSender] Failed to connect to RCON at {}. Please check:\n\
133 - enable-rcon=true\n\
134 - rcon.port={}\n\
135 - rcon.password=<password>",
136 addr, self.port
137 ))?;
138
139 self.authenticate(&mut stream).await?;
140 let response = self.send_packet(&mut stream, 1, 2, command).await?;
141
142 Ok(response)
143 }
144
145 async fn authenticate(&self, stream: &mut tokio::net::TcpStream) -> Result<()> {
146 const PACKET_TYPE_AUTH: i32 = 3;
147
148 let payload_bytes = self.password.as_bytes();
149 let length = 4 + 4 + payload_bytes.len() as i32 + 1;
150
151 let mut buf = Vec::with_capacity(4 + length as usize);
152 buf.extend_from_slice(&length.to_le_bytes());
153 buf.extend_from_slice(&1_i32.to_le_bytes());
154 buf.extend_from_slice(&PACKET_TYPE_AUTH.to_le_bytes());
155 buf.extend_from_slice(payload_bytes);
156 buf.push(0);
157
158 stream.write_all(&buf).await.context("[RconSender] Auth write failed")?;
159 stream.flush().await.context("[RconSender] Auth flush failed")?;
160
161 let mut len_buf = [0u8; 4];
162 stream.read_exact(&mut len_buf).await.context("[RconSender] Auth read length failed")?;
163 let resp_len = i32::from_le_bytes(len_buf);
164
165 if resp_len < 10 {
166 return Err(anyhow::anyhow!("[RconSender] Invalid auth response length: {}", resp_len));
167 }
168
169 let mut data = vec![0u8; resp_len as usize];
170 stream.read_exact(&mut data).await.context("[RconSender] Auth read data failed")?;
171
172 let id = i32::from_le_bytes([data[0], data[1], data[2], data[3]]);
173 if id == -1 {
174 return Err(anyhow::anyhow!("[RconSender] RCON authentication rejected"));
175 }
176
177 debug!("[RconSender] Authenticated successfully");
178 Ok(())
179 }
180
181 async fn send_packet(&self, stream: &mut tokio::net::TcpStream, packet_id: i32, packet_type: i32, payload: &str) -> Result<String> {
182 const READ_TIMEOUT: Duration = Duration::from_secs(10);
183
184 let payload_bytes = payload.as_bytes();
185 let length = 4 + 4 + payload_bytes.len() as i32 + 1;
186
187 let mut buf = Vec::with_capacity(4 + length as usize);
188 buf.extend_from_slice(&length.to_le_bytes());
189 buf.extend_from_slice(&packet_id.to_le_bytes());
190 buf.extend_from_slice(&packet_type.to_le_bytes());
191 buf.extend_from_slice(payload_bytes);
192 buf.push(0);
193
194 let write_result = timeout(Duration::from_secs(10), stream.write_all(&buf))
195 .await
196 .context("[RconSender] Write timeout")?;
197 write_result.context("[RconSender] Write failed")?;
198
199 stream.flush().await.context("[RconSender] Flush failed")?;
200
201 let mut len_buf = [0u8; 4];
202 let read_result = timeout(READ_TIMEOUT, stream.read_exact(&mut len_buf))
203 .await
204 .context("[RconSender] Read timeout")?;
205 read_result.context("[RconSender] Read length failed")?;
206
207 let resp_len = i32::from_le_bytes(len_buf);
208 let mut response = String::new();
209
210 if resp_len >= 10 {
211 let mut data = vec![0u8; resp_len as usize];
212 let read_result = timeout(READ_TIMEOUT, stream.read_exact(&mut data))
213 .await
214 .context("[RconSender] Read timeout")?;
215 read_result.context("[RconSender] Read data failed")?;
216
217 if data.len() > 9 {
219 response = String::from_utf8_lossy(&data[8..data.len().saturating_sub(1)]).into_owned();
220 }
221 }
222
223 debug!("[RconSender] Packet sent: id={}, type={}, cmd={}", packet_id, packet_type, payload);
224 Ok(response)
225 }
226}
227
228pub struct PooledRconSender {
233 host: String,
234 port: u16,
235 password: String,
236 stream: Option<tokio::net::TcpStream>,
237}
238
239impl PooledRconSender {
240 pub fn new(host: String, port: u16, password: String) -> Self {
241 Self {
242 host,
243 port,
244 password,
245 stream: None,
246 }
247 }
248
249 pub async fn send_command(&mut self, command: &str) -> Result<String> {
250 const MAX_RETRIES: u32 = 3;
251
252 for attempt in 1..=MAX_RETRIES {
253 if self.stream.is_some() {
255 match self.execute_command(command).await {
256 Ok(response) => return Ok(response),
257 Err(e) => {
258 self.stream = None;
260 warn!("[PooledRconSender] Connection lost, reconnect attempt {}/{}: {}", attempt, MAX_RETRIES, e);
261 }
263 }
264 }
265
266 if let Err(e) = self.connect().await {
268 warn!("[PooledRconSender] Connect attempt {}/{} failed: {}", attempt, MAX_RETRIES, e);
269 if attempt < MAX_RETRIES {
270 tokio::time::sleep(Duration::from_secs(1)).await;
271 }
272 continue;
273 }
274
275 match self.execute_command(command).await {
277 Ok(response) => return Ok(response),
278 Err(e) => {
279 warn!("[PooledRconSender] Command failed on attempt {}: {}", attempt, e);
280 self.stream = None;
281 }
282 }
283 }
284
285 Err(anyhow::anyhow!(
286 "[PooledRconSender] Failed after {} attempts: {}",
287 MAX_RETRIES, command
288 ))
289 }
290
291 async fn connect(&mut self) -> Result<()> {
292 use tokio::net::TcpStream;
293
294 const PACKET_TYPE_AUTH: i32 = 3;
295 const CONNECT_TIMEOUT: Duration = Duration::from_secs(5);
296
297 let addr = format!("{}:{}", self.host, self.port);
298 debug!("[PooledRconSender] Connecting to {}", addr);
299
300 let mut stream = timeout(CONNECT_TIMEOUT, TcpStream::connect(&addr))
301 .await
302 .context(format!("[PooledRconSender] Connection timeout to {}", addr))?
303 .context(format!("[PooledRconSender] Failed to connect to RCON at {}", addr))?;
304
305 let payload_bytes = self.password.as_bytes();
307 let length = 4 + 4 + payload_bytes.len() as i32 + 1;
308
309 let mut buf = Vec::with_capacity(4 + length as usize);
310 buf.extend_from_slice(&length.to_le_bytes());
311 buf.extend_from_slice(&1_i32.to_le_bytes());
312 buf.extend_from_slice(&PACKET_TYPE_AUTH.to_le_bytes());
313 buf.extend_from_slice(payload_bytes);
314 buf.push(0);
315
316 stream.write_all(&buf).await.context("[PooledRconSender] Auth write failed")?;
317 stream.flush().await.context("[PooledRconSender] Auth flush failed")?;
318
319 let mut len_buf = [0u8; 4];
321 stream.read_exact(&mut len_buf).await.context("[PooledRconSender] Auth read length failed")?;
322 let resp_len = i32::from_le_bytes(len_buf);
323
324 if resp_len < 10 {
325 return Err(anyhow::anyhow!("[PooledRconSender] Invalid auth response length: {}", resp_len));
326 }
327
328 let mut data = vec![0u8; resp_len as usize];
329 stream.read_exact(&mut data).await.context("[PooledRconSender] Auth read data failed")?;
330
331 let id = i32::from_le_bytes([data[0], data[1], data[2], data[3]]);
332 if id == -1 {
333 return Err(anyhow::anyhow!("[PooledRconSender] RCON authentication rejected"));
334 }
335
336 debug!("[PooledRconSender] Authenticated successfully to {}", addr);
337 self.stream = Some(stream);
338 Ok(())
339 }
340
341 async fn execute_command(&mut self, command: &str) -> Result<String> {
342 const PACKET_TYPE_COMMAND: i32 = 2;
343
344 let stream = self.stream.as_mut()
345 .ok_or_else(|| anyhow::anyhow!("[PooledRconSender] No connection"))?;
346
347 let payload_bytes = command.as_bytes();
349 let length = 4 + 4 + payload_bytes.len() as i32 + 1;
350
351 let mut buf = Vec::with_capacity(4 + length as usize);
352 buf.extend_from_slice(&length.to_le_bytes());
353 buf.extend_from_slice(&1_i32.to_le_bytes());
354 buf.extend_from_slice(&PACKET_TYPE_COMMAND.to_le_bytes());
355 buf.extend_from_slice(payload_bytes);
356 buf.push(0);
357
358 let write_result = timeout(Duration::from_secs(10), stream.write_all(&buf)).await
359 .context("[PooledRconSender] Write timeout")?;
360 write_result.context("[PooledRconSender] Write failed")?;
361 stream.flush().await.context("[PooledRconSender] Flush failed")?;
362
363 let mut len_buf = [0u8; 4];
365 let read_result = timeout(Duration::from_secs(10), stream.read_exact(&mut len_buf)).await
366 .context("[PooledRconSender] Read timeout")?;
367 read_result.context("[PooledRconSender] Read length failed")?;
368
369 let resp_len = i32::from_le_bytes(len_buf);
370 let mut response = String::new();
371
372 if resp_len >= 10 {
373 let mut data = vec![0u8; resp_len as usize];
374 let read_result = timeout(Duration::from_secs(10), stream.read_exact(&mut data)).await
375 .context("[PooledRconSender] Read timeout")?;
376 read_result.context("[PooledRconSender] Read data failed")?;
377
378 if data.len() > 9 {
380 response = String::from_utf8_lossy(&data[8..data.len().saturating_sub(1)]).into_owned();
381 }
382 }
383
384 debug!("[PooledRconSender] Command sent: {}", command);
385 Ok(response)
386 }
387}
388
389pub struct StdinCommandSender {
394 stdin: std::sync::Arc<Mutex<ChildStdin>>,
395}
396
397impl StdinCommandSender {
398 pub fn new(stdin: std::sync::Arc<Mutex<ChildStdin>>) -> Self {
399 Self { stdin }
400 }
401
402 pub async fn send_command(&mut self, command: &str) -> Result<String> {
403 let mut stdin = self.stdin.lock().await;
404 stdin.write_all(command.as_bytes()).await.context("[StdinSender] Write failed")?;
405 stdin.write_all(b"\n").await.context("[StdinSender] Newline failed")?;
406 stdin.flush().await.context("[StdinSender] Flush failed")?;
407 debug!("[StdinSender] Command sent: {}", command);
408 Ok(format!("Command sent: {}", command))
409 }
410}
411
412pub struct MultiCommandSender {
417 senders: Vec<CommandSender>,
418}
419
420impl MultiCommandSender {
421 pub fn new() -> Self {
422 Self { senders: Vec::new() }
423 }
424
425 pub fn add_sender(&mut self, sender: CommandSender) {
426 self.senders.push(sender);
427 }
428
429 pub async fn send_command(&mut self, command: &str) -> Result<String> {
432 if self.senders.is_empty() {
433 return Err(anyhow::anyhow!("[MultiSender] No command senders available"));
434 }
435
436 let mut last_error = None;
437 for sender in &mut self.senders {
438 match sender.send_command(command).await {
439 Ok(response) => {
440 info!("[MultiSender] Command sent via {}: {}", sender.name(), command);
441 return Ok(response);
442 }
443 Err(e) => {
444 warn!("[MultiSender] {} failed: {}", sender.name(), e);
445 last_error = Some(e);
446 }
447 }
448 }
449
450 Err(last_error.unwrap_or_else(|| anyhow::anyhow!("[MultiSender] All senders failed")))
451 }
452
453 pub async fn send_command_ignore_response(&mut self, command: &str) -> Result<()> {
455 self.send_command(command).await.map(|_| ())
456 }
457
458 #[allow(dead_code)]
459 pub fn is_empty(&self) -> bool {
460 self.senders.is_empty()
461 }
462}
463
464impl Default for MultiCommandSender {
465 fn default() -> Self {
466 Self::new()
467 }
468}