use std::convert::TryFrom;
use std::sync::atomic::{AtomicBool, Ordering};

use anyhow::{Context, Result, bail};
use log::{info, debug, warn};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;
use tokio::time::{timeout, Duration};

/// Packet type sent with the password when authenticating.
const PACKET_TYPE_AUTH: i32 = 3;
/// Packet type sent with the command text when executing a command.
const PACKET_TYPE_EXEC_COMMAND: i32 = 2;
/// Upper bound on establishing the TCP connection.
const CONNECT_TIMEOUT: Duration = Duration::from_secs(5);
/// Upper bound on each individual read from the socket.
const READ_TIMEOUT: Duration = Duration::from_secs(10);
/// Upper bound on each individual write/flush to the socket.
const WRITE_TIMEOUT: Duration = Duration::from_secs(10);

/// One RCON message as seen by this client, without its length prefix or
/// trailing NUL padding (both are added/removed at the socket boundary).
///
/// Deliberately not `Debug`: for an authentication request `payload` holds
/// the server password.
struct Packet {
    /// Request id chosen by the client; responses carry an id as well.
    id: i32,
    /// Numeric packet type (e.g. authentication or command execution).
    packet_type: i32,
    /// Body text: the password, the command, or the server's reply.
    payload: String,
}

20pub struct RconClient {
21 host: String,
22 port: u16,
23 password: String,
24 stream: Option<TcpStream>,
25 connected: AtomicBool,
26}
27
28impl RconClient {
29 pub fn new(host: String, port: u16, password: String) -> Self {
30 Self {
31 host,
32 port,
33 password,
34 stream: None,
35 connected: AtomicBool::new(false),
36 }
37 }
38
39 pub fn is_connected(&self) -> bool {
40 self.connected.load(Ordering::Relaxed)
41 }
42
43 pub async fn connect(&mut self) -> Result<()> {
44 let addr = format!("{}:{}", self.host, self.port);
45 info!("Connecting to RCON at {}", addr);
46
47 let stream = timeout(CONNECT_TIMEOUT, TcpStream::connect(&addr))
48 .await
49 .context("RCON connection timeout - please check if Minecraft server is running")?
50 .context(format!(
51 "Failed to connect to RCON at {}. \n\
52 Please check server.properties:\n\
53 - enable-rcon=true\n\
54 - rcon.port=25575\n\
55 - rcon.password=<your_password>",
56 addr
57 ))?;
58
59 self.stream = Some(stream);
60
61 if let Err(e) = self.authenticate().await {
62 self.stream = None;
63 self.connected.store(false, Ordering::Relaxed);
64 bail!(
65 "RCON authentication failed. \n\
66 Please check that the password in config.toml matches rcon.password in server.properties.\n\
67 Error: {}", e
68 );
69 }
70
71 self.connected.store(true, Ordering::Relaxed);
72 info!("RCON authenticated successfully");
73 Ok(())
74 }
75
76 async fn authenticate(&mut self) -> Result<()> {
77 let packet = Packet {
78 id: 1,
79 packet_type: PACKET_TYPE_AUTH,
80 payload: self.password.clone(),
81 };
82
83 self.try_send(&packet).await?;
84 let response = self.read_packet().await?;
85
86 if response.id == -1 {
87 bail!("Authentication rejected by server");
88 }
89
90 Ok(())
91 }
92
93 async fn ensure_connected(&mut self) -> Result<()> {
94 if !self.is_connected() {
95 debug!("RCON not connected, attempting to reconnect...");
96 self.connect().await?;
97 }
98 Ok(())
99 }
100
101 async fn try_send(&mut self, packet: &Packet) -> Result<()> {
103 let stream = self.stream.as_mut().context("Not connected to RCON")?;
104
105 let payload_bytes = packet.payload.as_bytes();
106 let length = 10 + payload_bytes.len() as i32;
107
108 let mut buf = Vec::with_capacity(length as usize + 4);
109 buf.extend_from_slice(&length.to_be_bytes());
110 buf.extend_from_slice(&packet.id.to_be_bytes());
111 buf.extend_from_slice(&packet.packet_type.to_be_bytes());
112 buf.extend_from_slice(payload_bytes);
113 buf.extend_from_slice(&[0, 0]);
114
115 timeout(WRITE_TIMEOUT, stream.write_all(&buf))
116 .await
117 .context("RCON write timeout")?
118 .context("Failed to write to RCON stream")?;
119
120 let _ = timeout(WRITE_TIMEOUT, stream.flush())
121 .await
122 .context("RCON flush timeout")?;
123
124 debug!("Sent RCON packet: id={}, type={}", packet.id, packet.packet_type);
125 Ok(())
126 }
127
128 pub async fn execute(&mut self, command: &str) -> Result<String> {
129 self.ensure_connected().await?;
130
131 let packet = Packet {
132 id: 1,
133 packet_type: PACKET_TYPE_EXEC_COMMAND,
134 payload: command.to_string(),
135 };
136
137 if let Err(e) = self.try_send(&packet).await {
139 warn!("RCON send failed, attempting reconnect: {}", e);
140 self.stream = None;
141 self.connected.store(false, Ordering::Relaxed);
142 self.connect().await?;
144 self.try_send(&packet).await?;
145 }
146
147 match self.read_packet().await {
149 Ok(response) => Ok(response.payload),
150 Err(e) => {
151 warn!("RCON read failed: {}", e);
152 self.stream = None;
153 self.connected.store(false, Ordering::Relaxed);
154 bail!("RCON read failed: {}", e);
155 }
156 }
157 }
158
159 async fn read_packet(&mut self) -> Result<Packet> {
160 let stream = self.stream.as_mut().context("Not connected to RCON")?;
161
162 let mut length_buf = [0u8; 4];
163 timeout(READ_TIMEOUT, stream.read_exact(&mut length_buf))
164 .await
165 .context("RCON read length timeout")?
166 .context("Failed to read packet length")?;
167 let length = i32::from_be_bytes(length_buf);
168
169 let mut header_buf = [0u8; 8];
170 timeout(READ_TIMEOUT, stream.read_exact(&mut header_buf))
171 .await
172 .context("RCON read header timeout")?
173 .context("Failed to read packet header")?;
174 let id = i32::from_be_bytes([header_buf[0], header_buf[1], header_buf[2], header_buf[3]]);
175 let packet_type = i32::from_be_bytes([header_buf[4], header_buf[5], header_buf[6], header_buf[7]]);
176
177 let payload_length = (length - 10) as usize;
178 if payload_length > 4096 {
179 bail!("RCON packet too large: {} bytes", payload_length);
180 }
181
182 let mut payload = vec![0u8; payload_length];
183 if payload_length > 0 {
184 timeout(READ_TIMEOUT, stream.read_exact(&mut payload))
185 .await
186 .context("RCON read payload timeout")?
187 .context("Failed to read packet payload")?;
188 }
189
190 let mut padding = [0u8; 2];
191 timeout(READ_TIMEOUT, stream.read_exact(&mut padding))
192 .await
193 .context("RCON read padding timeout")?
194 .context("Failed to read packet padding")?;
195
196 let payload = String::from_utf8_lossy(&payload)
197 .trim_end_matches('\0')
198 .to_string();
199
200 debug!("Received RCON packet: id={}, type={}, payload_len={}", id, packet_type, payload.len());
201
202 Ok(Packet {
203 id,
204 packet_type,
205 payload,
206 })
207 }
208
209 #[allow(dead_code)]
210 pub async fn disconnect(&mut self) {
211 if let Some(mut stream) = self.stream.take() {
212 let _ = stream.shutdown().await;
213 self.connected.store(false, Ordering::Relaxed);
214 info!("Disconnected from RCON");
215 }
216 }
217
218 pub async fn say(&mut self, message: &str) -> Result<()> {
219 let command = format!("say {}", message);
220 self.execute(&command).await?;
221 Ok(())
222 }
223
224 pub async fn tell(&mut self, player: &str, message: &str) -> Result<()> {
225 let safe_message = message
226 .replace('\\', "\\\\")
227 .replace('"', "\\\"")
228 .replace('\n', "\\n");
229 let command = format!("tellraw {} {{\"text\":\"{}\"}}", player, safe_message);
230 self.execute(&command).await?;
231 Ok(())
232 }
233}