1pub mod mc;
2
3use dashmap::DashMap;
4use futures::future::BoxFuture;
5use std::sync::Arc;
6use std::sync::atomic::{AtomicBool, Ordering};
7use std::time::{Duration, Instant};
8use t_port::{Protocol, identify, tunnel};
9use tokio::net::{TcpListener, TcpStream};
10use tokio::time::timeout;
11
/// Shared, thread-safe callback that produces a boxed `'static` future
/// resolving to `()`.
///
/// Stored in [`Config::on_wakeup`]. NOTE(review): presumably invoked to start
/// the backend server when a client arrives while it is down — the call site
/// is not in this part of the file, so confirm against the handler.
pub type WakeupCallback = Arc<dyn Fn() -> BoxFuture<'static, ()> + Send + Sync>;
13
/// Selects which kind of client activity is allowed to trigger a wakeup.
///
/// The enum is fieldless, so equality is total: it derives `Eq` and `Hash` in
/// addition to `PartialEq`, which lets it be used as a map/set key and in
/// generic code bounded on `Eq`.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum WakeupCondition {
    /// NOTE(review): presumably "wake on a status/MOTD ping" — confirm in the handler.
    Motd,
    /// NOTE(review): presumably "wake only on a join/login attempt" — confirm in the handler.
    Join,
    /// NOTE(review): presumably "never wake automatically" — confirm in the handler.
    Disabled,
}
20
/// Runtime settings and shared state for the proxy.
pub struct Config {
    /// Address the proxy listener binds to (passed to `TcpListener::bind`).
    pub listen: String,
    /// Optional target for connections identified as HTTP; when `None`, such
    /// connections are closed without being forwarded.
    pub web: Option<String>,
    /// Backend address that non-HTTP ("binary") connections are forwarded to.
    pub mc: String,
    /// Condition under which a wakeup is triggered.
    /// NOTE(review): not read in this part of the file — presumably consulted
    /// by the fallback handler; confirm.
    pub wakeup_on: WakeupCondition,
    /// Enables diagnostic `println!` output.
    pub debug: bool,
    /// Wakeup callback. When `None`, a failed backend connection is simply
    /// closed instead of being handed to the fallback handler.
    pub on_wakeup: Option<WakeupCallback>,
    /// Cleared (`false`) whenever a connection to the backend succeeds.
    /// NOTE(review): presumably set to `true` by the wakeup path — confirm.
    pub is_waking: AtomicBool,
}
30
/// Per-client bookkeeping stored in the shared history map.
///
/// The map is keyed by the peer's IP address rendered as a string; entries are
/// pruned by the background task in `run` based on `last_seen`.
#[derive(Debug, Clone)]
pub struct UserHistory {
    /// Attempt counter for this client.
    /// NOTE(review): incremented outside this part of the file — confirm semantics.
    pub attempts: u32,
    /// When this client was last observed; entries older than the retention
    /// window are removed from the map.
    pub last_seen: Instant,
}
35
36pub async fn run(cfg: Arc<Config>) -> Result<(), Box<dyn std::error::Error>> {
37 let history: Arc<DashMap<String, UserHistory>> = Arc::new(DashMap::new());
38 let listener = TcpListener::bind(&cfg.listen).await?;
39 println!("Proxy active on {}", cfg.listen);
40
41 let history_gc = Arc::clone(&history);
42 tokio::spawn(async move {
43 let mut interval = tokio::time::interval(Duration::from_secs(60));
44 loop {
45 interval.tick().await;
46 history_gc.retain(|_, v| v.last_seen.elapsed() < Duration::from_secs(300));
47 }
48 });
49
50 loop {
51 let (socket, addr) = listener.accept().await?;
52 let cfg = Arc::clone(&cfg);
53 let history = Arc::clone(&history);
54 let ip = addr.ip().to_string();
55
56 tokio::spawn(async move {
57 let _ = process(socket, cfg, history, ip).await;
58 });
59 }
60}
61
62async fn process(
63 mut socket: TcpStream,
64 cfg: Arc<Config>,
65 history: Arc<DashMap<String, UserHistory>>,
66 ip: String,
67) -> tokio::io::Result<()> {
68 let mut head = [0u8; 8];
69 let n = timeout(Duration::from_secs(2), socket.peek(&mut head[..]))
70 .await
71 .map_err(|_| std::io::Error::new(std::io::ErrorKind::TimedOut, "peek timeout"))??;
72
73 match identify(&head[..n]) {
74 Protocol::Http => {
75 if let Some(web_target) = &cfg.web {
76 tunnel(socket, web_target.clone()).await
77 } else {
78 if cfg.debug {
79 println!("HTTP request received but no web target configured. Closing.");
80 }
81 Ok(())
82 }
83 }
84 Protocol::Binary => {
85 if let Ok(Ok(mut target)) =
86 timeout(Duration::from_secs(1), TcpStream::connect(&cfg.mc)).await
87 {
88 cfg.is_waking.store(false, Ordering::SeqCst);
89 tokio::io::copy_bidirectional(&mut socket, &mut target).await?;
90 return Ok(());
91 }
92
93 if cfg.on_wakeup.is_some() {
94 let state = mc::inspect_handshake(&socket).await;
95 mc::handler::McHandler::send_fallback(&mut socket, state, cfg, history, ip).await
96 } else {
97 if cfg.debug {
98 println!(
99 "Connection to {} failed and no on-wakeup command set. Closing.",
100 cfg.mc
101 );
102 }
103 Ok(())
104 }
105 }
106 }
107}