azalea-reflection-proxy 0.1.0

Spectate and take over an azalea bot session live through a local reflection proxy — Rust port of mineflayer-reflection-proxy
Documentation
//! azalea-reflection-proxy — spectate and control an azalea bot session
//! through a local reflection proxy. Rust port of
//! aesthetic0001/mineflayer-reflection-proxy.
//!
//! The proxy owns the single real (Microsoft-authed) connection to the
//! target server. Your bot connects to the proxy locally as an offline
//! client and becomes the controller; vanilla clients that join the
//! same local address become spectators, see the bot as a live player
//! entity, and can take over with `,acquire`.
//!
//! ```no_run
//! # async fn example() -> eyre::Result<()> {
//! use azalea_reflection_proxy::ReflectionProxy;
//!
//! let proxy = ReflectionProxy::builder()
//!     .target("mc.hypixel.net")
//!     .email("you@example.com")
//!     .spawn()
//!     .await?;
//!
//! // then point your azalea bot at it instead of the real server:
//! //   ClientBuilder::new()
//! //       .set_handler(handle)
//! //       .start(Account::offline("reflected"), proxy.local_addr())
//! // and add a vanilla-client server entry for the same address to
//! // spectate. proxy.local_addr() is a real SocketAddr, so a bound
//! // port of 0 picks a free one.
//! # Ok(()) }
//! ```

mod ids;
mod local_server;
pub mod plugin;
mod reflect;
mod relay;
mod session;
mod snapshot;
mod upstream;

use std::net::SocketAddr;
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::atomic::{AtomicU32, Ordering};

use eyre::Result;
use tokio::sync::{Mutex, mpsc};
use tokio::task::JoinHandle;

pub use plugin::{Frame, Pipeline, ProxyPlugin, Verdict};

/// Configuration for a reflection proxy. Build with
/// [`ReflectionProxy::builder`].
pub struct ProxyBuilder {
    bind: String,
    target_host: String,
    target_port: u16,
    email: String,
    auth_cache: Option<PathBuf>,
    plugins: Vec<Box<dyn ProxyPlugin>>,
}

impl Default for ProxyBuilder {
    fn default() -> Self {
        Self {
            bind: "127.0.0.1:25566".into(),
            target_host: "localhost".into(),
            target_port: 25565,
            email: String::new(),
            auth_cache: None,
            plugins: Vec::new(),
        }
    }
}

impl ProxyBuilder {
    /// Local address the proxy listens on (default `127.0.0.1:25566`;
    /// use port 0 for an OS-assigned free port).
    pub fn bind(mut self, addr: impl Into<String>) -> Self {
        self.bind = addr.into();
        self
    }

    /// The real server, e.g. `"mc.hypixel.net"` or `"host:port"`.
    pub fn target(mut self, host: impl Into<String>) -> Self {
        let host = host.into();
        match host.rsplit_once(':') {
            Some((h, p)) if p.parse::<u16>().is_ok() => {
                self.target_host = h.to_string();
                self.target_port = p.parse().unwrap();
            }
            _ => self.target_host = host,
        }
        self
    }

    /// Microsoft account email. Tokens are cached (and refreshed) in
    /// azalea's standard cache file unless [`Self::auth_cache`] is set,
    /// so interactive login happens at most once per account.
    pub fn email(mut self, email: impl Into<String>) -> Self {
        self.email = email.into();
        self
    }

    /// Override the auth token cache path (default:
    /// `~/.minecraft/azalea-auth.json`, shared with azalea itself).
    pub fn auth_cache(mut self, path: impl Into<PathBuf>) -> Self {
        self.auth_cache = Some(path.into());
        self
    }

    /// Add a frame-level plugin (Forward/Drop/Replace verdicts on raw
    /// packets, in registration order — the port of the original's
    /// plugin pipeline).
    pub fn plugin(mut self, p: Box<dyn ProxyPlugin>) -> Self {
        self.plugins.push(p);
        self
    }

    /// Bind the listener and start accepting clients in the background.
    pub async fn spawn(self) -> Result<ReflectionProxy> {
        if self.email.is_empty() {
            eyre::bail!("ProxyBuilder::email is required");
        }
        let listener = local_server::listen(&local_server::LocalServerConfig {
            bind: self.bind.clone(),
        })
        .await?;
        let local_addr = listener.local_addr()?;

        let cfg = Arc::new(upstream::UpstreamConfig {
            host: self.target_host,
            port: self.target_port,
            email: self.email,
            auth_cache: self.auth_cache,
        });
        let pipeline = Arc::new(Pipeline {
            plugins: self.plugins,
        });
        let registry: SessionRegistry = Arc::new(Mutex::new(None));

        let accept_task = tokio::spawn(accept_loop(listener, cfg, pipeline, registry));

        Ok(ReflectionProxy {
            local_addr,
            accept_task,
        })
    }
}

/// A running reflection proxy. Dropping the handle does NOT stop it;
/// call [`Self::shutdown`] for that.
pub struct ReflectionProxy {
    local_addr: SocketAddr,
    accept_task: JoinHandle<()>,
}

impl ReflectionProxy {
    pub fn builder() -> ProxyBuilder {
        ProxyBuilder::default()
    }

    /// The address your bot (`Account::offline(...)`) and any vanilla
    /// spectator clients should connect to.
    pub fn local_addr(&self) -> SocketAddr {
        self.local_addr
    }

    /// Stop accepting new clients. Live sessions keep running until
    /// their connections close.
    pub fn shutdown(&self) {
        self.accept_task.abort();
    }

    /// Run until the accept loop ends (i.e. forever, unless shutdown()
    /// is called or the listener fails). Handy for binary main().
    pub async fn wait(self) {
        let _ = self.accept_task.await;
    }
}

/// At most one live session; new connections attach to it as viewers.
/// When its sender reports closed the session task has exited, and the
/// next connection becomes a fresh controller.
type SessionRegistry = Arc<Mutex<Option<mpsc::Sender<session::SessionMsg>>>>;

static NEXT_CLIENT_ID: AtomicU32 = AtomicU32::new(1);

async fn accept_loop(
    listener: tokio::net::TcpListener,
    cfg: Arc<upstream::UpstreamConfig>,
    pipeline: Arc<Pipeline>,
    registry: SessionRegistry,
) {
    loop {
        let (stream, addr) = match listener.accept().await {
            Ok(x) => x,
            Err(e) => {
                tracing::error!("accept failed: {e}");
                break;
            }
        };
        tracing::info!("connection from {addr}");
        let (cfg, pipeline, registry) = (cfg.clone(), pipeline.clone(), registry.clone());
        tokio::spawn(async move {
            if let Err(e) = handle_connection(stream, cfg, pipeline, registry).await {
                // status pings land here too, so this is not an error
                tracing::info!("connection ended: {e:#}");
            }
        });
    }
}

async fn handle_connection(
    stream: tokio::net::TcpStream,
    cfg: Arc<upstream::UpstreamConfig>,
    pipeline: Arc<Pipeline>,
    registry: SessionRegistry,
) -> Result<()> {
    let local = local_server::accept_login(stream).await?;
    let username = local.username.clone();
    let id = NEXT_CLIENT_ID.fetch_add(1, Ordering::Relaxed);

    // Held across the upstream connect on purpose: a second client that
    // races in while the controller is still authenticating waits here,
    // then attaches as a viewer instead of spawning a second session.
    let mut guard = registry.lock().await;

    if let Some(tx) = guard.as_ref().filter(|tx| !tx.is_closed()).cloned() {
        drop(guard);
        session::attach_viewer(&tx, id, local).await?;
        tracing::info!("'{username}' attached as viewer (client {id})");
        return Ok(());
    }

    tracing::info!("'{username}' is the controller (client {id}); connecting upstream");
    let up = upstream::connect(&cfg).await?;
    tracing::info!("upstream established as {}", up.profile.name);

    *guard = Some(session::spawn(up, local, id, pipeline));
    Ok(())
}