1use anyhow::{Context, Result, bail};
2use log::{info, debug, warn};
3use tokio::io::{AsyncReadExt, AsyncWriteExt};
4use tokio::net::TcpStream;
5use tokio::time::{timeout, Duration};
6use std::sync::atomic::{AtomicBool, Ordering};
7
/// Packet type used for the login packet that carries the RCON password.
const PACKET_TYPE_AUTH: i32 = 3;

/// Packet type used for packets that carry a console command to execute.
const PACKET_TYPE_EXEC_COMMAND: i32 = 2;

/// Upper bound on how long establishing the TCP connection may take.
const CONNECT_TIMEOUT: Duration = Duration::from_secs(5);

/// Upper bound applied to each individual read from the RCON stream.
const READ_TIMEOUT: Duration = Duration::from_secs(10);

/// Upper bound applied to each individual write to (and flush of) the RCON stream.
const WRITE_TIMEOUT: Duration = Duration::from_secs(10);
13
/// One RCON packet, either about to be sent or just received.
///
/// `Debug` is intentionally not derived: the payload of an authentication
/// packet is the plain-text password and must not end up in logs by accident.
struct Packet {
    /// Request identifier chosen by the sender of a request.
    id: i32,
    /// Numeric packet type (for example `PACKET_TYPE_AUTH`).
    packet_type: i32,
    /// Packet body as text, without any trailing NUL bytes.
    payload: String,
}
19
/// Client for a single RCON server.
///
/// The client stores the connection parameters so it can re-establish the
/// connection on demand; the live socket (if any) is kept in `stream`.
pub struct RconClient {
    /// Host name or IP address of the RCON server.
    host: String,
    /// TCP port of the RCON server.
    port: u16,
    /// Password sent in the authentication packet.
    password: String,
    /// Open socket, or `None` while disconnected.
    stream: Option<TcpStream>,
    /// Set to `true` once authentication succeeded, cleared when the
    /// connection is dropped.
    connected: AtomicBool,
}
27
28impl RconClient {
29 pub fn new(host: String, port: u16, password: String) -> Self {
30 Self {
31 host,
32 port,
33 password,
34 stream: None,
35 connected: AtomicBool::new(false),
36 }
37 }
38
39 pub fn is_connected(&self) -> bool {
40 self.connected.load(Ordering::Relaxed)
41 }
42
43 pub async fn connect(&mut self) -> Result<()> {
44 let addr = format!("{}:{}", self.host, self.port);
45 info!("Connecting to RCON at {}", addr);
46
47 let stream = timeout(CONNECT_TIMEOUT, TcpStream::connect(&addr))
48 .await
49 .context("RCON connection timeout - please check if Minecraft server is running")?
50 .context(format!(
51 "Failed to connect to RCON at {}. \n\
52 Please check server.properties:\n\
53 - enable-rcon=true\n\
54 - rcon.port=25575\n\
55 - rcon.password=<your_password>",
56 addr
57 ))?;
58
59 self.stream = Some(stream);
60
61 if let Err(e) = self.authenticate().await {
62 self.stream = None;
63 self.connected.store(false, Ordering::Relaxed);
64 bail!(
65 "RCON authentication failed. \n\
66 Please check that the password in config.toml matches rcon.password in server.properties.\n\
67 Error: {}", e
68 );
69 }
70
71 self.connected.store(true, Ordering::Relaxed);
72 info!("RCON authenticated successfully");
73 Ok(())
74 }
75
76 async fn authenticate(&mut self) -> Result<()> {
77 let packet = Packet {
78 id: 1,
79 packet_type: PACKET_TYPE_AUTH,
80 payload: self.password.clone(),
81 };
82
83 self.try_send(&packet).await?;
84 let response = self.read_packet().await?;
85
86 if response.id == -1 {
87 bail!("Authentication rejected by server");
88 }
89
90 Ok(())
91 }
92
93 async fn ensure_connected(&mut self) -> Result<()> {
94 if !self.is_connected() {
95 debug!("RCON not connected, attempting to reconnect...");
96 self.connect().await?;
97 }
98 Ok(())
99 }
100
101 async fn try_send(&mut self, packet: &Packet) -> Result<()> {
103 let stream = self.stream.as_mut().context("Not connected to RCON")?;
104
105 let payload_bytes = packet.payload.as_bytes();
106 let length = 4 + 4 + payload_bytes.len() as i32 + 1;
108
109 let mut buf = Vec::with_capacity(4 + length as usize);
110 buf.extend_from_slice(&length.to_le_bytes());
112 buf.extend_from_slice(&packet.id.to_le_bytes());
113 buf.extend_from_slice(&packet.packet_type.to_le_bytes());
114 buf.extend_from_slice(payload_bytes);
115 buf.push(0); timeout(WRITE_TIMEOUT, stream.write_all(&buf))
118 .await
119 .context("RCON write timeout")?
120 .context("Failed to write to RCON stream")?;
121
122 let _ = timeout(WRITE_TIMEOUT, stream.flush())
123 .await
124 .context("RCON flush timeout")?;
125
126 debug!("Sent RCON packet: id={}, type={}, length={}", packet.id, packet.packet_type, length);
127 Ok(())
128 }
129
130 pub async fn execute(&mut self, command: &str) -> Result<String> {
131 self.ensure_connected().await?;
132
133 let packet = Packet {
134 id: 1,
135 packet_type: PACKET_TYPE_EXEC_COMMAND,
136 payload: command.to_string(),
137 };
138
139 if let Err(e) = self.try_send(&packet).await {
141 warn!("RCON send failed, attempting reconnect: {}", e);
142 self.stream = None;
143 self.connected.store(false, Ordering::Relaxed);
144 self.connect().await?;
146 self.try_send(&packet).await?;
147 }
148
149 match self.read_packet().await {
151 Ok(response) => Ok(response.payload),
152 Err(e) => {
153 warn!("RCON read failed: {}", e);
154 self.stream = None;
155 self.connected.store(false, Ordering::Relaxed);
156 bail!("RCON read failed: {}", e);
157 }
158 }
159 }
160
161 async fn read_packet(&mut self) -> Result<Packet> {
162 let stream = self.stream.as_mut().context("Not connected to RCON")?;
163
164 let mut length_buf = [0u8; 4];
166 timeout(READ_TIMEOUT, stream.read_exact(&mut length_buf))
167 .await
168 .context("RCON read length timeout")?
169 .context("Failed to read packet length")?;
170 let length = i32::from_le_bytes(length_buf);
171
172 if !(10..=4096).contains(&length) {
173 bail!("Invalid RCON packet length: {}", length);
174 }
175
176 let mut data = vec![0u8; length as usize];
178 timeout(READ_TIMEOUT, stream.read_exact(&mut data))
179 .await
180 .context("RCON read packet data timeout")?
181 .context("Failed to read packet data")?;
182
183 let id = i32::from_le_bytes([data[0], data[1], data[2], data[3]]);
185 let packet_type = i32::from_le_bytes([data[4], data[5], data[6], data[7]]);
186
187 let payload_length = (length - 4 - 4 - 1) as usize;
189 let payload = if payload_length > 0 {
190 String::from_utf8_lossy(&data[8..8 + payload_length])
191 .trim_end_matches('\0')
192 .to_string()
193 } else {
194 String::new()
195 };
196
197 debug!("Received RCON packet: id={}, type={}, payload_len={}, payload='{}'",
198 id, packet_type, payload.len(),
199 if payload.len() > 50 { &payload[..50] } else { &payload });
200
201 Ok(Packet {
202 id,
203 packet_type,
204 payload,
205 })
206 }
207
208 #[allow(dead_code)]
209 pub async fn disconnect(&mut self) {
210 if let Some(mut stream) = self.stream.take() {
211 let _ = stream.shutdown().await;
212 self.connected.store(false, Ordering::Relaxed);
213 info!("Disconnected from RCON");
214 }
215 }
216
217 pub async fn say(&mut self, message: &str) -> Result<()> {
218 let command = format!("say {}", message);
219 self.execute(&command).await?;
220 Ok(())
221 }
222
223 pub async fn tell(&mut self, player: &str, message: &str) -> Result<()> {
224 let safe_message = message
225 .replace('\\', "\\\\")
226 .replace('"', "\\\"")
227 .replace('\n', "\\n");
228 let command = format!("tellraw {} {{\"text\":\"{}\"}}", player, safe_message);
229 self.execute(&command).await?;
230 Ok(())
231 }
232}