Skip to main content

mc_gate/
lib.rs

1pub mod mc;
2
3use dashmap::DashMap;
4use futures::future::BoxFuture;
5use std::sync::Arc;
6use std::sync::atomic::{AtomicBool, Ordering};
7use std::time::{Duration, Instant};
8use t_port::{Protocol, identify, tunnel};
9use tokio::net::{TcpListener, TcpStream};
10use tokio::time::timeout;
11
12pub type WakeupCallback = Arc<dyn Fn() -> BoxFuture<'static, ()> + Send + Sync>;
13
/// Selects which client activity is allowed to trigger a wake-up.
///
/// NOTE(review): the variants are interpreted outside this file (presumably in
/// `mc::handler`); the per-variant descriptions below are inferred from the
/// names and should be confirmed there.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum WakeupCondition {
    /// Presumably: wake on a status/MOTD request — TODO confirm.
    Motd,
    /// Presumably: wake only on a join/login attempt — TODO confirm.
    Join,
    /// Presumably: never wake — TODO confirm.
    Disabled,
}
20
21pub struct Config {
22    pub listen: String,
23    pub web: Option<String>,
24    pub mc: String,
25    pub wakeup_on: WakeupCondition,
26    pub debug: bool,
27    pub on_wakeup: Option<WakeupCallback>,
28    pub is_waking: AtomicBool,
29}
30
/// Per-key activity record stored in the shared history map created by `run`.
///
/// NOTE(review): the map is keyed by `String`, and the client IP string is
/// passed alongside it to the handlers, so entries are presumably per client
/// IP — confirm where entries are inserted.
#[derive(Debug, Clone)]
pub struct UserHistory {
    /// Attempt counter; maintained outside this file.
    pub attempts: u32,
    /// Time of the most recent activity. The sweeper in `run` drops entries
    /// whose `last_seen` is 300 seconds old or more.
    pub last_seen: Instant,
}
35
36pub async fn run(cfg: Arc<Config>) -> Result<(), Box<dyn std::error::Error>> {
37    let history: Arc<DashMap<String, UserHistory>> = Arc::new(DashMap::new());
38    let listener = TcpListener::bind(&cfg.listen).await?;
39    println!("Proxy active on {}", cfg.listen);
40
41    let history_gc = Arc::clone(&history);
42    tokio::spawn(async move {
43        let mut interval = tokio::time::interval(Duration::from_secs(60));
44        loop {
45            interval.tick().await;
46            history_gc.retain(|_, v| v.last_seen.elapsed() < Duration::from_secs(300));
47        }
48    });
49
50    loop {
51        let (socket, addr) = listener.accept().await?;
52        let cfg = Arc::clone(&cfg);
53        let history = Arc::clone(&history);
54        let ip = addr.ip().to_string();
55
56        tokio::spawn(async move {
57            let _ = process(socket, cfg, history, ip).await;
58        });
59    }
60}
61
62async fn process(
63    mut socket: TcpStream,
64    cfg: Arc<Config>,
65    history: Arc<DashMap<String, UserHistory>>,
66    ip: String,
67) -> tokio::io::Result<()> {
68    let mut head = [0u8; 8];
69    let n = timeout(Duration::from_secs(2), socket.peek(&mut head[..]))
70        .await
71        .map_err(|_| std::io::Error::new(std::io::ErrorKind::TimedOut, "peek timeout"))??;
72
73    match identify(&head[..n]) {
74        Protocol::Http => {
75            if let Some(web_target) = &cfg.web {
76                tunnel(socket, web_target.clone()).await
77            } else {
78                if cfg.debug {
79                    println!("HTTP request received but no web target configured. Closing.");
80                }
81                Ok(())
82            }
83        }
84        Protocol::Binary => {
85            if let Ok(Ok(mut target)) =
86                timeout(Duration::from_secs(1), TcpStream::connect(&cfg.mc)).await
87            {
88                cfg.is_waking.store(false, Ordering::SeqCst);
89                tokio::io::copy_bidirectional(&mut socket, &mut target).await?;
90                return Ok(());
91            }
92
93            if cfg.on_wakeup.is_some() {
94                let state = mc::inspect_handshake(&socket).await;
95                mc::handler::McHandler::send_fallback(&mut socket, state, cfg, history, ip).await
96            } else {
97                if cfg.debug {
98                    println!(
99                        "Connection to {} failed and no on-wakeup command set. Closing.",
100                        cfg.mc
101                    );
102                }
103                Ok(())
104            }
105        }
106    }
107}