azalea_reflection_proxy/
lib.rs1mod ids;
32mod local_server;
33pub mod plugin;
34mod reflect;
35mod relay;
36mod session;
37mod snapshot;
38mod upstream;
39
40use std::net::SocketAddr;
41use std::path::PathBuf;
42use std::sync::Arc;
43use std::sync::atomic::{AtomicU32, Ordering};
44
45use eyre::Result;
46use tokio::sync::{Mutex, mpsc};
47use tokio::task::JoinHandle;
48
49pub use plugin::{Frame, Pipeline, ProxyPlugin, Verdict};
50
51pub struct ProxyBuilder {
54 bind: String,
55 target_host: String,
56 target_port: u16,
57 email: String,
58 auth_cache: Option<PathBuf>,
59 plugins: Vec<Box<dyn ProxyPlugin>>,
60}
61
62impl Default for ProxyBuilder {
63 fn default() -> Self {
64 Self {
65 bind: "127.0.0.1:25566".into(),
66 target_host: "localhost".into(),
67 target_port: 25565,
68 email: String::new(),
69 auth_cache: None,
70 plugins: Vec::new(),
71 }
72 }
73}
74
75impl ProxyBuilder {
76 pub fn bind(mut self, addr: impl Into<String>) -> Self {
79 self.bind = addr.into();
80 self
81 }
82
83 pub fn target(mut self, host: impl Into<String>) -> Self {
85 let host = host.into();
86 match host.rsplit_once(':') {
87 Some((h, p)) if p.parse::<u16>().is_ok() => {
88 self.target_host = h.to_string();
89 self.target_port = p.parse().unwrap();
90 }
91 _ => self.target_host = host,
92 }
93 self
94 }
95
96 pub fn email(mut self, email: impl Into<String>) -> Self {
100 self.email = email.into();
101 self
102 }
103
104 pub fn auth_cache(mut self, path: impl Into<PathBuf>) -> Self {
107 self.auth_cache = Some(path.into());
108 self
109 }
110
111 pub fn plugin(mut self, p: Box<dyn ProxyPlugin>) -> Self {
115 self.plugins.push(p);
116 self
117 }
118
119 pub async fn spawn(self) -> Result<ReflectionProxy> {
121 if self.email.is_empty() {
122 eyre::bail!("ProxyBuilder::email is required");
123 }
124 let listener = local_server::listen(&local_server::LocalServerConfig {
125 bind: self.bind.clone(),
126 })
127 .await?;
128 let local_addr = listener.local_addr()?;
129
130 let cfg = Arc::new(upstream::UpstreamConfig {
131 host: self.target_host,
132 port: self.target_port,
133 email: self.email,
134 auth_cache: self.auth_cache,
135 });
136 let pipeline = Arc::new(Pipeline {
137 plugins: self.plugins,
138 });
139 let registry: SessionRegistry = Arc::new(Mutex::new(None));
140
141 let accept_task = tokio::spawn(accept_loop(listener, cfg, pipeline, registry));
142
143 Ok(ReflectionProxy {
144 local_addr,
145 accept_task,
146 })
147 }
148}
149
150pub struct ReflectionProxy {
153 local_addr: SocketAddr,
154 accept_task: JoinHandle<()>,
155}
156
157impl ReflectionProxy {
158 pub fn builder() -> ProxyBuilder {
159 ProxyBuilder::default()
160 }
161
162 pub fn local_addr(&self) -> SocketAddr {
165 self.local_addr
166 }
167
168 pub fn shutdown(&self) {
171 self.accept_task.abort();
172 }
173
174 pub async fn wait(self) {
177 let _ = self.accept_task.await;
178 }
179}
180
181type SessionRegistry = Arc<Mutex<Option<mpsc::Sender<session::SessionMsg>>>>;
185
186static NEXT_CLIENT_ID: AtomicU32 = AtomicU32::new(1);
187
188async fn accept_loop(
189 listener: tokio::net::TcpListener,
190 cfg: Arc<upstream::UpstreamConfig>,
191 pipeline: Arc<Pipeline>,
192 registry: SessionRegistry,
193) {
194 loop {
195 let (stream, addr) = match listener.accept().await {
196 Ok(x) => x,
197 Err(e) => {
198 tracing::error!("accept failed: {e}");
199 break;
200 }
201 };
202 tracing::info!("connection from {addr}");
203 let (cfg, pipeline, registry) = (cfg.clone(), pipeline.clone(), registry.clone());
204 tokio::spawn(async move {
205 if let Err(e) = handle_connection(stream, cfg, pipeline, registry).await {
206 tracing::info!("connection ended: {e:#}");
208 }
209 });
210 }
211}
212
213async fn handle_connection(
214 stream: tokio::net::TcpStream,
215 cfg: Arc<upstream::UpstreamConfig>,
216 pipeline: Arc<Pipeline>,
217 registry: SessionRegistry,
218) -> Result<()> {
219 let local = local_server::accept_login(stream).await?;
220 let username = local.username.clone();
221 let id = NEXT_CLIENT_ID.fetch_add(1, Ordering::Relaxed);
222
223 let mut guard = registry.lock().await;
227
228 if let Some(tx) = guard.as_ref().filter(|tx| !tx.is_closed()).cloned() {
229 drop(guard);
230 session::attach_viewer(&tx, id, local).await?;
231 tracing::info!("'{username}' attached as viewer (client {id})");
232 return Ok(());
233 }
234
235 tracing::info!("'{username}' is the controller (client {id}); connecting upstream");
236 let up = upstream::connect(&cfg).await?;
237 tracing::info!("upstream established as {}", up.profile.name);
238
239 *guard = Some(session::spawn(up, local, id, pipeline));
240 Ok(())
241}