// Install mimalloc as the process-wide heap allocator: every allocation made
// through Rust's global allocator API is routed to `mimalloc::MiMalloc`.
#[global_allocator]
static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc;
use std::collections::HashMap;
use std::sync::atomic::{AtomicI32, Ordering};
use std::sync::mpsc as block_mpsc;
use std::sync::Arc;
use anyhow::{Context, Result};
use axum::{
extract::ws::{Message, WebSocket, WebSocketUpgrade},
extract::{Request, State},
http::StatusCode,
middleware::{self, Next},
response::{Html, IntoResponse, Response},
routing::get,
Router,
};
use clap::Parser;
use serde::{Deserialize, Serialize};
use tokio::sync::mpsc;
use tokio::time::{self, Duration};
use futures_util::StreamExt;
use std::os::unix::net::UnixStream;
use x11rb::connection::Connection;
use x11rb::protocol::xproto::{self, Window};
use x11rb::protocol::shm::{self, Seg};
use x11rb::protocol::xtest;
use x11rb::rust_connection::{DefaultStream, RustConnection};
use x11rb_protocol::xauth::get_auth;
use webrtc::peer_connection::{
PeerConnection, PeerConnectionBuilder, PeerConnectionEventHandler,
RTCConfigurationBuilder, RTCIceServer, RTCPeerConnectionIceEvent,
RTCSessionDescription, RTCIceGatheringState, RTCPeerConnectionState,
RTCIceCandidateInit, MediaEngine, register_default_interceptors, Registry,
};
use webrtc::media_stream::track_local::static_sample::TrackLocalStaticSample;
use webrtc::media_stream::track_local::TrackLocal;
use webrtc::media_stream::{MediaStreamTrack, Track};
use webrtc::runtime;
use rtc_media::Sample;
use rtc::rtp_transceiver::rtp_sender::{
RtpCodecKind, RTCRtpCodec, RTCRtpCodecParameters, RTCRtpCodingParameters,
RTCRtpEncodingParameters,
};
use rtc::peer_connection::configuration::media_engine::{MIME_TYPE_H264, MIME_TYPE_OPUS};
use openh264::encoder::{Encoder, EncoderConfig, BitRate, FrameRate, UsageType, RateControlMode, IntraFramePeriod, Profile};
use openh264::formats::YUVBuffer;
use openh264::OpenH264API;
use bytes::Bytes;
use rand::Rng;
use async_trait::async_trait;
use std::os::fd::IntoRawFd;
use shiguredo_libyuv::{self, FilterMode, ImageSize, ArgbImage, I420Image, I420ImageMut};
/// Resizes `buf` to exactly `size` bytes and lets `write` fill the whole slice.
///
/// Any previous contents of `buf` are discarded. The slice handed to `write`
/// is zero-initialised, so bytes the callback does not overwrite (for example
/// when a conversion routine fails part-way) read back as `0` instead of being
/// indeterminate.
///
/// The name is kept for existing callers; the buffer is no longer left
/// uninitialised, because exposing uninitialised memory through `&mut [u8]`
/// is unsound.
fn with_resize_uninit(buf: &mut Vec<u8>, size: usize, write: impl FnOnce(&mut [u8])) {
    buf.clear();
    // `resize` reuses the existing allocation whenever the capacity suffices.
    buf.resize(size, 0);
    write(buf.as_mut_slice());
}
/// State shared with the axum handlers and the auth middleware.
///
/// Cloning is cheap on the argument side: `args` sits behind an `Arc`.
#[derive(Clone)]
struct ServerState {
/// Parsed command-line arguments.
args: Arc<Args>,
/// Copy of `args.token` taken at startup. When `None`, the auth middleware
/// forwards every request without checking anything.
token: Option<String>,
}
// Numeric event type codes. The values match the X11 core protocol event
// codes (KeyPress = 2 ... MotionNotify = 6).
// NOTE(review): presumably passed as the event type when injecting synthetic
// input through XTest — the call sites are outside this part of the file.
const X11_KEY_PRESS: u8 = 2;
const X11_KEY_RELEASE: u8 = 3;
const X11_BUTTON_PRESS: u8 = 4;
const X11_BUTTON_RELEASE: u8 = 5;
const X11_MOTION_NOTIFY: u8 = 6;
// Command-line configuration, parsed by clap.
//
// Plain `//` comments are used in this block on purpose: clap's derive turns
// `///` doc comments into help text, which would alter the `--help` output.
//
// The numeric options are range-checked at parse time so that nonsensical
// values (zero or negative framerate/bitrate, negative height) are rejected
// with a usage error instead of reaching the capture/encode code.
#[derive(Parser, Clone)]
#[command(
    name = "vnrit",
    version,
    about = "Lightweight X11 WebRTC streaming server (pure Rust)",
    long_about = "\
vnrit streams an X11 display to one or more browsers over WebRTC.
1. Start the server: vnrit --display :1
2. Open the URL in a browser (printed on startup, default http://0.0.0.0:8080)
3. Click to send keyboard/mouse events back to the X11 display.
The frontend supports touch-to-mouse translation (one-finger move,
two-finger scroll, tap = left click, long-press = right click).
Uses pure Rust: webrtc-rs for WebRTC, openh264 for H.264 encoding,
x11rb for X11 screen capture and input injection.
No GStreamer dependency.
",
    after_help = "\
══════════════════════════════════════════════════════════════════
V N R I T G U I D E
══════════════════════════════════════════════════════════════════
─── CODEC ────────────────────────────────────────────────────────
H.264 (only) built-in via openh264 (Cisco OpenH264).
Constrained Baseline profile, real-time screen content mode.
─── RECOMMENDED COMMAND ────────────────────────────────────────
vnrit --height 720 --bitrate 500
• 720p downscale (good clarity, low bandwidth)
• 500 kbps bitrate (smooth GUI at ~3 MB/min)
─── BITRATE RECOMMENDATIONS ─────────────────────────────────────
720p @ 24 fps:
300 kbps Low quality, usable for text terminals
500 kbps Good quality for GUI desktops (recommended)
1000 kbps High quality, default setting
2000+ kbps Near-lossless on static content
─── STREAM SCALING ──────────────────────────────────────────────
By default vnrit streams at the desktop's native resolution
(e.g. 1920x1080). Use --height to downscale on the server side:
vnrit --height 720 # stream at 720p (maintains aspect ratio)
vnrit --height 480 # stream at 480p (low bandwidth)
─── INPUT (WebSocket Protocol) ──────────────────────────────────
The frontend sends keyboard/mouse input as CSV lines over the
same WebSocket used for WebRTC signaling:
mouse,<x>,<y>,<button>,<pressed>
x/y = absolute pixel coordinates
button: 1=left, 2=middle, 3=right
pressed: 1=down, 0=up
Example: mouse,800,600,1,1
key,<keycode>,<pressed>
keycode = X11 keysym (see /usr/include/X11/keysymdef.h)
pressed: 1=down, 0=up
Example: key,65,1 (space bar press)
─── EXAMPLES ────────────────────────────────────────────────────
vnrit --display :1 -p 8080 --height 720 --bitrate 500
vnrit --display :0 --stun \"\" # LAN only, no STUN
─── NOTES ───────────────────────────────────────────────────────
- vnrit requires a running X11 server (Xvnc, Xvfb, or real X).
- On Termux, it connects via the Unix socket at
/data/data/com.termux/files/usr/tmp/.X11-unix/X<display>.
- No audio supported (video-only).
- Each browser tab creates a separate WebRTC connection.
"
)]
struct Args {
    // X11 display string, e.g. ":1".
    #[arg(
        long,
        default_value = ":1",
        help = "X11 display to capture (e.g. :0, :1)",
        long_help = "X11 display identifier to capture. Uses the standard X11 \
display format :<number>. On Termux the connection is made via a Unix socket \
at /data/data/com.termux/files/usr/tmp/.X11-unix/X<number>."
    )]
    display: String,
    // TCP port the HTTP/WebSocket server listens on.
    #[arg(
        long,
        short = 'p',
        default_value = "8080",
        help = "HTTP/WebSocket listen port",
    )]
    port: u16,
    // Frames per second. Must be at least 1: the frame interval is computed
    // as `1000 / framerate`, so 0 would divide by zero.
    #[arg(
        long,
        default_value = "24",
        value_parser = clap::value_parser!(i32).range(1..),
        help = "Capture framerate in fps",
    )]
    framerate: i32,
    // STUN server URL; an empty string disables STUN.
    #[arg(
        long,
        default_value = "stun:stun.cloudflare.com:3478",
        help = "STUN server URL (set empty string to disable)"
    )]
    stun: String,
    // Encoder target bitrate in kbps. Must be at least 1.
    #[arg(
        long,
        default_value = "1000",
        value_parser = clap::value_parser!(i32).range(1..),
        help = "Target bitrate in kbps",
    )]
    bitrate: i32,
    // Output height in pixels; 0 keeps the native resolution. Negative values
    // are rejected.
    #[arg(
        long,
        default_value = "0",
        value_parser = clap::value_parser!(i32).range(0..),
        help = "Downscale stream height in pixels (0 = no scaling)",
        long_help = "If non-zero, the video stream is scaled down to the given height while \
maintaining aspect ratio. This reduces bandwidth and encoding CPU usage."
    )]
    height: i32,
    // Optional shared secret checked by the auth middleware.
    #[arg(
        long,
        help = "Authentication token (if set, all connections require this token)",
        long_help = "If set, all HTTP and WebSocket connections must include a 'token' query parameter \
or a 'token' cookie matching this value. The server sets a cookie on first successful \
authentication so subsequent requests (including the WebSocket upgrade) can reuse it."
    )]
    token: Option<String>,
    // Default log filter, used when the environment does not override it.
    #[arg(
        long,
        default_value = "info",
        help = "Log level (off, error, warn, info, debug, trace)",
    )]
    log_level: String,
}
/// JSON signaling messages carried over the WebSocket.
///
/// Messages are internally tagged: the `type` field holds the lower-cased
/// variant name (`"offer"`, `"answer"`, `"ice"`, `"ready"`) and the variant's
/// fields sit next to it in the same object.
#[derive(Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "lowercase")]
enum SignalingMessage {
    /// SDP offer.
    Offer { sdp: String },
    /// SDP answer.
    Answer { sdp: String },
    /// A single ICE candidate and the media line it belongs to.
    Ice {
        candidate: String,
        sdp_mline_index: u32,
    },
    /// Readiness notification; carries no payload.
    Ready,
}
/// X11 connection used for screen capture, together with the root window
/// that is grabbed.
struct CaptureState {
/// Connection used for the capture requests (`get_image` and MIT-SHM).
conn: RustConnection,
/// Root window of the selected screen.
root: Window,
}
/// State of the second X11 connection, the one opened for input handling.
///
/// NOTE(review): the code that consumes most of these fields lies outside this
/// part of the file; the field notes below describe only how they are
/// initialised.
struct InputState {
/// Dedicated X11 connection, separate from the capture connection.
conn: RustConnection,
/// Root window of the screen.
root: Window,
/// Screen width in pixels as reported by the server setup.
screen_w: u16,
/// Screen height in pixels as reported by the server setup.
screen_h: u16,
/// Pointer position; seeded from a `query_pointer` round-trip at connect time.
cursor_x: AtomicI32,
cursor_y: AtomicI32,
/// Initialised to -1.
/// NOTE(review): presumably "nothing sent yet" — confirm against the sender.
last_sent_x: AtomicI32,
last_sent_y: AtomicI32,
/// Keysym → keycode lookup built from the server's keyboard mapping; the
/// first keycode that produces a keysym wins.
keycode_cache: HashMap<u32, u8>,
/// Set of keycodes, empty at construction.
/// NOTE(review): presumably tracks keys currently held down — confirm.
pressed_keys: std::sync::Mutex<std::collections::HashSet<u8>>,
}
/// Screen grabber backed by the MIT-SHM extension: the X server writes each
/// frame into a shared-memory segment that is also mapped into this process.
struct ShmScreenCapture {
/// Shared capture connection and root window.
conn: Arc<CaptureState>,
/// Capture width in pixels.
width: u16,
/// Capture height in pixels.
height: u16,
/// X11 resource id of the shared-memory segment.
shmseg: Seg,
/// Start of the local mapping of the segment (obtained from `mmap`).
shm_ptr: *mut u8,
/// Length of the mapping in bytes (`width * height * bpp`).
shm_size: usize,
/// Bytes per pixel: 4 for depths >= 24, otherwise the depth rounded up to
/// whole bytes.
bpp: u8,
}
// The raw pointer field suppresses the automatic `Send`/`Sync` impls, so they
// are asserted by hand here.
// NOTE(review): soundness relies on `shm_ptr` staying valid for the lifetime
// of the value and on callers not issuing overlapping captures that race on
// the shared buffer — confirm against the call sites.
unsafe impl Send for ShmScreenCapture {}
unsafe impl Sync for ShmScreenCapture {}
impl ShmScreenCapture {
fn try_new(capture: Arc<CaptureState>, width: u16, height: u16, depth: u8) -> Result<Option<Self>> {
let bpp = if depth >= 24 { 4u8 } else { ((depth as u32 + 7) / 8) as u8 };
let shm_size = (width as usize) * (height as usize) * (bpp as usize);
let ver = match shm::query_version(&capture.conn) {
Ok(cookie) => match cookie.reply() {
Ok(reply) => reply,
Err(e) => {
log::debug!("[shm] MIT-SHM reply error: {:?}, falling back to get_image", e);
return Ok(None);
}
},
Err(e) => {
log::debug!("[shm] MIT-SHM query failed: {}, falling back to get_image", e);
return Ok(None);
}
};
if ver.major_version == 0 && ver.minor_version == 0 {
log::debug!("[shm] MIT-SHM extension missing, falling back to get_image");
return Ok(None);
}
log::debug!("[shm] MIT-SHM v{}.{}, allocating {} bytes ({}x{}x{})",
ver.major_version, ver.minor_version, shm_size, width, height, depth);
let shmseg = capture.conn.generate_id()
.context("failed to generate SHM seg ID")?;
let cookie = shm::create_segment(&capture.conn, shmseg, shm_size as u32, false)
.context("SHM create_segment failed")?;
let reply = cookie.reply()
.context("SHM create_segment reply failed")?;
let raw_fd = reply.shm_fd.into_raw_fd();
let shm_ptr = unsafe {
let ptr = libc::mmap(
std::ptr::null_mut(),
shm_size,
libc::PROT_READ | libc::PROT_WRITE,
libc::MAP_SHARED,
raw_fd,
0,
);
if ptr == libc::MAP_FAILED {
libc::close(raw_fd);
return Err(anyhow::anyhow!("mmap failed for SHM segment: size={}", shm_size));
}
ptr as *mut u8
};
unsafe { libc::close(raw_fd); }
log::debug!("[shm] segment allocated at {:?} ({} bytes)", shm_ptr, shm_size);
Ok(Some(ShmScreenCapture {
conn: capture,
width,
height,
shmseg,
shm_ptr,
shm_size,
bpp,
}))
}
fn capture(&self, out: &mut Vec<u8>) -> Result<()> {
let cookie = shm::get_image(
&self.conn.conn,
self.conn.root, 0, 0, self.width,
self.height,
!0, 2, self.shmseg,
0, ).context("SHM get_image failed")?;
let _reply = cookie.reply().context("SHM get_image reply failed")?;
let size = (self.width as usize) * (self.height as usize) * (self.bpp as usize);
out.clear();
out.reserve(size);
unsafe {
out.set_len(size);
std::ptr::copy_nonoverlapping(self.shm_ptr, out.as_mut_ptr(), size);
}
Ok(())
}
fn dimensions(&self) -> (u16, u16) {
(self.width, self.height)
}
}
impl Drop for ShmScreenCapture {
    /// Releases the local mapping, then asks the X server to detach the segment.
    /// Both steps are best-effort: failures are ignored.
    fn drop(&mut self) {
        let mapping = self.shm_ptr as *mut libc::c_void;
        let mapping_len = self.shm_size;
        // SAFETY: NOTE(review) — relies on `shm_ptr`/`shm_size` still describing
        // the mapping created at construction and on nothing else having
        // unmapped it.
        unsafe {
            libc::munmap(mapping, mapping_len);
        }
        let _ = shm::detach(&self.conn.conn, self.shmseg);
        log::debug!("[shm] cleaned up segment {:?}", self.shm_ptr);
    }
}
struct FallbackCapture {
conn: Arc<CaptureState>,
width: u16,
height: u16,
}
impl FallbackCapture {
fn capture(&self, out: &mut Vec<u8>) -> Result<()> {
let cookie = xproto::get_image(
&self.conn.conn,
xproto::ImageFormat::Z_PIXMAP,
self.conn.root,
0, 0,
self.width, self.height,
!0, ).context("get_image failed")?;
let reply = cookie.reply().context("get_image reply failed")?;
*out = reply.data;
Ok(())
}
fn dimensions(&self) -> (u16, u16) {
(self.width, self.height)
}
}
/// The capture backend in use: shared-memory accelerated or plain `GetImage`.
enum ScreenCapture {
    /// MIT-SHM backed capture.
    Shm(ShmScreenCapture),
    /// Core-protocol capture.
    Fallback(FallbackCapture),
}

impl ScreenCapture {
    /// Captures one frame into `out` using the active backend.
    fn capture(&self, out: &mut Vec<u8>) -> Result<()> {
        match self {
            Self::Shm(backend) => backend.capture(out),
            Self::Fallback(backend) => backend.capture(out),
        }
    }

    /// Returns the backend's capture size as `(width, height)` in pixels.
    fn dimensions(&self) -> (u16, u16) {
        match self {
            Self::Shm(backend) => backend.dimensions(),
            Self::Fallback(backend) => backend.dimensions(),
        }
    }
}
fn bgra_to_i420(bgra: &[u8], width: u32, height: u32, out: &mut Vec<u8>) {
let w = width as usize;
let h = height as usize;
let y_size = w * h;
let uv_size = (w / 2) * (h / 2);
let total = y_size + 2 * uv_size;
with_resize_uninit(out, total, |buf| {
let src = ArgbImage { data: bgra, stride: (width * 4) as usize };
let (y_plane, rest) = buf.split_at_mut(y_size);
let (u_plane, v_plane) = rest.split_at_mut(uv_size);
let mut dst = I420ImageMut {
y: y_plane, y_stride: width as usize,
u: u_plane, u_stride: (width / 2) as usize,
v: v_plane, v_stride: (width / 2) as usize,
};
let size = ImageSize::new(width as usize, height as usize);
let _ = shiguredo_libyuv::argb_to_i420(&src, &mut dst, size);
});
}
/// Converts a packed 4-bytes-per-pixel frame of `src_w` x `src_h` to I420 and
/// scales it to `dst_w` x `dst_h` with bilinear filtering.
///
/// `temp` receives the native-resolution I420 frame and `i420_out` the scaled
/// one; both are resized as needed. A failure reported by the scaler is
/// ignored.
fn scale_bgra_direct(bgra: &[u8], src_w: u32, src_h: u32, dst_w: u32, dst_h: u32,
                     i420_out: &mut Vec<u8>, temp: &mut Vec<u8>) {
    // Step 1: colour conversion at native resolution into `temp`.
    let src_luma_len = (src_w * src_h) as usize;
    let src_chroma_len = ((src_w / 2) * (src_h / 2)) as usize;
    with_resize_uninit(temp, src_luma_len + 2 * src_chroma_len, |native| {
        bgra_to_i420_into(bgra, src_w, src_h, native);
    });

    // Step 2: plane-wise scale from `temp` into `i420_out`.
    let dst_luma_len = (dst_w * dst_h) as usize;
    let dst_chroma_len = ((dst_w / 2) * (dst_h / 2)) as usize;
    let native: &[u8] = temp.as_slice();
    with_resize_uninit(i420_out, dst_luma_len + 2 * dst_chroma_len, |scaled| {
        let (in_y, in_chroma) = native.split_at(src_luma_len);
        let (in_u, in_v) = in_chroma.split_at(src_chroma_len);
        let source = I420Image {
            y: in_y, y_stride: src_w as usize,
            u: in_u, u_stride: (src_w / 2) as usize,
            v: in_v, v_stride: (src_w / 2) as usize,
        };
        let (out_y, out_chroma) = scaled.split_at_mut(dst_luma_len);
        let (out_u, out_v) = out_chroma.split_at_mut(dst_chroma_len);
        let mut target = I420ImageMut {
            y: out_y, y_stride: dst_w as usize,
            u: out_u, u_stride: (dst_w / 2) as usize,
            v: out_v, v_stride: (dst_w / 2) as usize,
        };
        let from = ImageSize::new(src_w as usize, src_h as usize);
        let to = ImageSize::new(dst_w as usize, dst_h as usize);
        let _ = shiguredo_libyuv::i420_scale(&source, from, &mut target, to, FilterMode::Bilinear);
    });
}
/// Converts a packed 4-bytes-per-pixel frame into I420, writing the Y, U and V
/// planes back to back into the caller-provided slice `out`.
///
/// `out` must be at least `width * height + 2 * (width / 2) * (height / 2)`
/// bytes long, otherwise the plane splits panic. A failure reported by the
/// converter is ignored.
fn bgra_to_i420_into(bgra: &[u8], width: u32, height: u32, out: &mut [u8]) {
    let luma_len = (width * height) as usize;
    let chroma_len = ((width / 2) * (height / 2)) as usize;
    let chroma_stride = (width / 2) as usize;

    let (luma, chroma) = out.split_at_mut(luma_len);
    let (cb, cr) = chroma.split_at_mut(chroma_len);

    let packed = ArgbImage { data: bgra, stride: (width * 4) as usize };
    let mut planar = I420ImageMut {
        y: luma, y_stride: width as usize,
        u: cb, u_stride: chroma_stride,
        v: cr, v_stride: chroma_stride,
    };
    let dims = ImageSize::new(width as usize, height as usize);
    let _ = shiguredo_libyuv::argb_to_i420(&packed, &mut planar, dims);
}
struct VideoEncoder {
inner: Encoder,
width: u32,
height: u32,
}
impl VideoEncoder {
fn new(args: &Args, width: u32, height: u32) -> Result<Self> {
let bitrate_bps = (args.bitrate as u32) * 1000;
let framerate = args.framerate as f32;
let intra_period = (args.framerate as u32) * 10;
let enc_threads = std::thread::available_parallelism()
.map(|n| (n.get() / 2).min(4).max(1) as u16)
.unwrap_or(0);
let config = EncoderConfig::new()
.num_threads(enc_threads)
.bitrate(BitRate::from_bps(bitrate_bps))
.max_frame_rate(FrameRate::from_hz(framerate))
.usage_type(UsageType::ScreenContentRealTime)
.rate_control_mode(RateControlMode::Quality)
.intra_frame_period(IntraFramePeriod::from_num_frames(intra_period))
.profile(Profile::Baseline);
let encoder = Encoder::with_api_config(OpenH264API::from_source(), config)
.context("failed to create openh264 encoder")?;
log::info!("[encoder] {} threads configured", enc_threads);
Ok(VideoEncoder {
inner: encoder,
width,
height,
})
}
fn encode(&mut self, i420: Vec<u8>, out: &mut Vec<u8>) -> Result<()> {
let yuv = YUVBuffer::from_vec(i420, self.width as usize, self.height as usize);
let bitstream = self.inner
.encode(&yuv)
.context("openh264 encode failed")?;
out.clear();
bitstream.write_vec(out);
Ok(())
}
fn force_keyframe(&mut self) {
self.inner.force_intra_frame();
}
}
/// Peer-connection event handler that forwards the events the session cares
/// about into channels.
struct WebrtcHandler {
    /// Serialized `ice` signaling messages, one per local candidate.
    ice_tx: runtime::Sender<String>,
    /// Signalled when ICE gathering reaches `Complete`.
    gather_complete_tx: runtime::Sender<()>,
    /// Signalled when the connection reaches `Connected`.
    connected_tx: runtime::Sender<()>,
    /// Signalled when the connection is `Failed`, `Disconnected` or `Closed`.
    done_tx: runtime::Sender<()>,
}

#[async_trait]
impl PeerConnectionEventHandler for WebrtcHandler {
    /// Serializes a local ICE candidate as a signaling message and queues it.
    /// Sends are non-blocking; a failed send is ignored.
    async fn on_ice_candidate(&self, event: RTCPeerConnectionIceEvent) {
        // Log at most the first 20 bytes of the address. `get` returns `None`
        // instead of panicking if byte 20 is not a character boundary, in which
        // case the whole address is logged.
        let address: &str = &event.candidate.address;
        let preview = address.get(..20).unwrap_or(address);
        log::debug!("[ice] candidate: {} ...", preview);
        let init = match event.candidate.to_json() {
            Ok(init) => init,
            Err(_) => {
                log::debug!("[ice] candidate could not be converted to JSON, skipping");
                return;
            }
        };
        let serialized = serde_json::to_string(&SignalingMessage::Ice {
            candidate: init.candidate,
            sdp_mline_index: init.sdp_mline_index.unwrap_or(0) as u32,
        });
        match serialized {
            Ok(msg) => {
                let _ = self.ice_tx.try_send(msg);
            }
            Err(e) => log::debug!("[ice] failed to serialize candidate: {}", e),
        }
    }

    /// Signals `gather_complete_tx` once gathering is complete.
    async fn on_ice_gathering_state_change(&self, state: RTCIceGatheringState) {
        log::debug!("[ice] gathering state: {:?}", state);
        if state == RTCIceGatheringState::Complete {
            let _ = self.gather_complete_tx.try_send(());
        }
    }

    /// Maps connection state changes onto the `connected` / `done` channels.
    async fn on_connection_state_change(&self, state: RTCPeerConnectionState) {
        log::info!("[pc] connection state: {:?}", state);
        match state {
            RTCPeerConnectionState::Connected => {
                let _ = self.connected_tx.try_send(());
            }
            RTCPeerConnectionState::Failed
            | RTCPeerConnectionState::Disconnected
            | RTCPeerConnectionState::Closed => {
                let _ = self.done_tx.try_send(());
            }
            _ => {}
        }
    }

    /// Logs signaling state transitions; no other action is taken.
    async fn on_signaling_state_change(&self, state: webrtc::peer_connection::RTCSignalingState) {
        log::info!("[pc] signaling state: {:?}", state);
    }
}
/// Entry point: parses the command line, configures logging, binds the
/// listener and serves the HTTP/WebSocket routes until the server stops.
///
/// # Errors
/// Returns an error when the listen address cannot be bound or when the server
/// terminates with an I/O error.
#[tokio::main]
async fn main() -> Result<()> {
    let args = Args::parse();
    // The CLI log level is the default filter; the noisy WebRTC/encoder targets
    // are capped at `Error`.
    env_logger::Builder::from_env(
        env_logger::Env::default().default_filter_or(&args.log_level)
    )
    .filter(Some("rtc_ice"), log::LevelFilter::Error)
    .filter(Some("rtc::peer_connection"), log::LevelFilter::Error)
    .filter(Some("rtc_dtls"), log::LevelFilter::Error)
    .filter(Some("openh264"), log::LevelFilter::Error)
    .format_timestamp(None)
    .init();
    let token = args.token.clone();
    let state = ServerState {
        args: Arc::new(args),
        token,
    };
    let addr = format!("0.0.0.0:{}", state.args.port);
    // Bind before announcing, so the banner is printed only when the port is
    // really open and a bind failure names the address.
    let listener = tokio::net::TcpListener::bind(&addr)
        .await
        .with_context(|| format!("failed to bind {}", addr))?;
    println!("vnrit listening on http://{}", addr);
    println!(" Display: {}", state.args.display);
    println!(" FPS : {}", state.args.framerate);
    println!(" Bitrate: {} kbps", state.args.bitrate);
    if state.args.height > 0 {
        println!(" Scale : {}p", state.args.height);
    } else {
        println!(" Scale : native (no scaling)");
    }
    match &state.token {
        Some(t) => println!(" Auth token: {} (required)", t),
        None => println!(" Auth token: none (open access)"),
    }
    let app = Router::new()
        .route("/", get(root_handler))
        .route("/ws", get(ws_handler))
        .layer(middleware::from_fn_with_state(state.clone(), auth_middleware))
        .with_state(state);
    axum::serve(listener, app)
        .await
        .context("HTTP server terminated with an error")?;
    Ok(())
}
/// Serves the embedded front-end page with the `{{STUN_SERVER}}` placeholder
/// replaced by the configured STUN URL.
async fn root_handler(State(state): State<ServerState>) -> Html<String> {
    let template: &str = include_str!("index.html");
    let stun_url: &str = &state.args.stun;
    Html(template.replace("{{STUN_SERVER}}", stun_url))
}
async fn ws_handler(ws: WebSocketUpgrade, State(state): State<ServerState>) -> impl IntoResponse {
ws.on_upgrade(move |ws| handle_ws(ws, state))
}
/// Extracts the value of the first `token=<value>` pair from a raw query string.
fn auth_query_token(query: Option<&str>) -> Option<&str> {
    query?.split('&').find_map(|pair| {
        let (key, value) = pair.split_once('=')?;
        if key == "token" { Some(value) } else { None }
    })
}

/// Extracts the value of the first `token=<value>` cookie from a `Cookie` header.
fn auth_cookie_token(header: Option<&str>) -> Option<&str> {
    header?
        .split(';')
        .find_map(|cookie| cookie.trim().strip_prefix("token="))
}

/// Compares two tokens without returning early at the first differing byte, so
/// the comparison time does not reveal how long a matching prefix is. Only the
/// length is allowed to leak.
fn auth_tokens_match(provided: &str, expected: &str) -> bool {
    let (a, b) = (provided.as_bytes(), expected.as_bytes());
    a.len() == b.len() && a.iter().zip(b).fold(0u8, |acc, (x, y)| acc | (x ^ y)) == 0
}

/// Token gate for every route.
///
/// When no token is configured the request is forwarded untouched. Otherwise
/// the request must carry the token either as a `token` query parameter or as
/// a `token` cookie; anything else is answered with `401 Unauthorized`. A
/// request authenticated through the query string gets a `Set-Cookie` header
/// on its response so that follow-up requests can rely on the cookie.
async fn auth_middleware(
    State(state): State<ServerState>,
    req: Request,
    next: Next,
) -> Result<Response, Response> {
    let Some(expected_token) = state.token.as_deref() else {
        return Ok(next.run(req).await);
    };

    // Both checks are evaluated up front: `req` is moved into `next.run` below.
    let via_query = auth_query_token(req.uri().query())
        .map_or(false, |t| auth_tokens_match(t, expected_token));
    let cookie_header = req
        .headers()
        .get("Cookie")
        .and_then(|c| c.to_str().ok());
    let via_cookie = auth_cookie_token(cookie_header)
        .map_or(false, |t| auth_tokens_match(t, expected_token));

    if !(via_query || via_cookie) {
        return Err((
            StatusCode::UNAUTHORIZED,
            "unauthorized — provide ?token=<token> or Cookie: token=<token>",
        )
            .into_response());
    }

    let mut response = next.run(req).await;
    if via_query {
        let cookie = format!(
            "token={}; Path=/; HttpOnly; SameSite=Lax; Max-Age=86400",
            expected_token
        );
        // A token that cannot be represented as a header value is simply not
        // persisted; the request itself still succeeds.
        if let Ok(hv) = cookie.parse() {
            response.headers_mut().insert(axum::http::header::SET_COOKIE, hv);
        }
    }
    Ok(response)
}
/// Opens an X11 connection to `display`.
///
/// The standard x11rb connect path is tried first. If it fails, the display
/// number is parsed out of `display` and the Termux-specific Unix socket
/// `/data/data/com.termux/files/usr/tmp/.X11-unix/X<n>` is used instead; that
/// path always reports screen index 0.
fn connect_to_display(display: &str) -> Result<(RustConnection, usize)> {
    let standard_err = match RustConnection::connect(Some(display)) {
        Ok(connected) => return Ok(connected),
        Err(e) => e,
    };
    log::info!("[x11] standard connect failed: {}, trying Termux socket path...", standard_err);

    // ":1.0" -> 1 (leading colons stripped, anything after the first '.' dropped).
    let number_part = display.trim_start_matches(':').split('.').next();
    let display_num: u16 = number_part
        .and_then(|s| s.parse().ok())
        .context("invalid display format")?;

    let sock = format!(
        "/data/data/com.termux/files/usr/tmp/.X11-unix/X{}", display_num
    );
    log::info!("[x11] connecting to {}", sock);
    let unix_stream = UnixStream::connect(&sock)
        .context("cannot connect to Termux X11 socket")?;
    let (stream, (family, address)) = DefaultStream::from_unix_stream(unix_stream)
        .context("from_unix_stream failed")?;

    // A missing or unreadable auth entry falls back to empty credentials.
    let (auth_name, auth_data) = get_auth(family, &address, display_num)
        .ok()
        .flatten()
        .unwrap_or_default();

    let conn = RustConnection::connect_to_stream_with_auth_info(stream, 0, auth_name, auth_data)
        .context("connect_to_stream failed")?;
    log::info!("[x11] connected via Termux socket path");
    Ok((conn, 0usize))
}
/// Opens the two X11 connections used per client, one for capture and one for
/// input, and gathers the screen and keyboard information needed later.
///
/// Returns `(capture_state, input_state, screen_width, screen_height,
/// root_depth)`.
///
/// # Errors
/// Fails when either connection cannot be opened, when the reported screen
/// index does not exist, when XTest is unavailable, or when the pointer or
/// keyboard-mapping queries fail.
fn setup_x11_connections(display: &str) -> Result<(Arc<CaptureState>, Arc<InputState>, u16, u16, u8)> {
    log::info!("[x11] connecting to display {} (capture connection)", display);
    let (cap_conn, screen_num) = connect_to_display(display)?;
    // Checked lookup: an out-of-range screen index becomes an error, not a panic.
    let screen = cap_conn
        .setup()
        .roots
        .get(screen_num)
        .with_context(|| format!("X server reports no screen {}", screen_num))?;
    let root = screen.root;
    let screen_width = screen.width_in_pixels;
    let screen_height = screen.height_in_pixels;
    let screen_depth = screen.root_depth;

    log::info!("[x11] connecting to display {} (input connection)", display);
    let (inp_conn, _) = connect_to_display(display)?;
    let xtest_cookie = xtest::get_version(&inp_conn, 2, 2)
        .context("XTest not available")?;
    xtest_cookie.reply().context("XTest query failed")?;
    let ptr = xproto::query_pointer(&inp_conn, root)
        .context("query_pointer failed")?
        .reply()
        .context("query_pointer reply failed")?;

    let setup = inp_conn.setup();
    let first_keycode = setup.min_keycode;
    // Saturating u8 arithmetic: a degenerate keycode range cannot overflow.
    let keycode_count = setup
        .max_keycode
        .saturating_sub(setup.min_keycode)
        .saturating_add(1);
    let kbd = xproto::get_keyboard_mapping(&inp_conn, first_keycode, keycode_count)
        .context("get_keyboard_mapping failed")?
        .reply()
        .context("get_keyboard_mapping reply failed")?;
    log::info!(
        "[x11] connected, root=0x{:x}, pointer=({},{}), dims={}x{}, keycodes={}-{}",
        root, ptr.root_x, ptr.root_y,
        screen_width, screen_height,
        first_keycode, setup.max_keycode
    );

    // keysym -> first keycode that produces it.
    let keycode_cache = {
        let kpk = kbd.keysyms_per_keycode as usize;
        let mut m = HashMap::new();
        // `chunks(0)` would panic, so an empty mapping yields an empty cache.
        if kpk > 0 {
            for (i, chunk) in kbd.keysyms.chunks(kpk).enumerate() {
                // Stop once the keycode would no longer fit in a u8.
                let Some(kc) = u8::try_from(i)
                    .ok()
                    .and_then(|offset| first_keycode.checked_add(offset))
                else {
                    break;
                };
                for &ks in chunk {
                    if ks != 0 {
                        m.entry(ks).or_insert(kc);
                    }
                }
            }
        }
        m
    };

    let capture_state = Arc::new(CaptureState {
        conn: cap_conn,
        root,
    });
    let input_state = Arc::new(InputState {
        conn: inp_conn,
        root,
        screen_w: screen_width,
        screen_h: screen_height,
        cursor_x: AtomicI32::new(i32::from(ptr.root_x)),
        cursor_y: AtomicI32::new(i32::from(ptr.root_y)),
        last_sent_x: AtomicI32::new(-1),
        last_sent_y: AtomicI32::new(-1),
        keycode_cache,
        pressed_keys: std::sync::Mutex::new(std::collections::HashSet::new()),
    });
    Ok((capture_state, input_state, screen_width, screen_height, screen_depth))
}
/// Asks PulseAudio for the default sink and returns the name of its monitor
/// source (`"<sink>.monitor"`).
///
/// The PulseAudio main loop is polled by hand: up to 200 iterations waiting
/// for the context to become ready, then up to 50 iterations waiting for the
/// server info, sleeping 10 ms between iterations. Returns `None` when the
/// connection fails, the context never becomes ready, or no default sink name
/// arrives in time.
fn find_default_monitor() -> Option<String> {
    use libpulse_binding as pulse;

    const POLL_INTERVAL: std::time::Duration = std::time::Duration::from_millis(10);

    let mut mainloop = pulse::mainloop::standard::Mainloop::new()?;
    let mut ctx = pulse::context::Context::new(&mainloop, "vnrit-monitor-detect")?;

    // Every context state change pushes one unit message.
    let (state_tx, state_rx) = std::sync::mpsc::channel();
    ctx.set_state_callback(Some(Box::new(move || {
        let _ = state_tx.send(());
    })));

    let connected = ctx.connect(None, pulse::context::FlagSet::NOFLAGS, None);
    if connected.is_err() {
        log::info!("[audio] PA connect failed");
        return None;
    }

    // Phase 1: wait for the context to report `Ready`.
    for _attempt in 0..200 {
        if mainloop.iterate(false).is_error() {
            break;
        }
        if state_rx.try_recv().is_ok() {
            if ctx.get_state() == pulse::context::State::Ready {
                break;
            }
        }
        std::thread::sleep(POLL_INTERVAL);
    }
    ctx.set_state_callback(None);
    if ctx.get_state() != pulse::context::State::Ready {
        log::info!("[audio] PA context not ready, fallback to default source");
        return None;
    }

    // Phase 2: request the server info and wait for the default sink name.
    let (sink_tx, sink_rx) = std::sync::mpsc::channel();
    let introspect = ctx.introspect();
    introspect.get_server_info(Box::new(move |info: &pulse::context::introspect::ServerInfo| {
        if let Some(ref sink_name) = info.default_sink_name {
            let _ = sink_tx.send(sink_name.to_string());
        }
    }));
    for _attempt in 0..50 {
        if mainloop.iterate(false).is_error() {
            break;
        }
        match sink_rx.try_recv() {
            Ok(name) => {
                let monitor = format!("{}.monitor", name);
                log::info!("[audio] detected default sink '{}', monitor '{}'", name, monitor);
                return Some(monitor);
            }
            Err(_) => std::thread::sleep(POLL_INTERVAL),
        }
    }
    log::info!("[audio] failed to detect monitor source");
    None
}
async fn handle_ws(ws: WebSocket, state: ServerState) {
log::info!("[ws] client connected");
let (capture_state, input_state, native_w, native_h, depth) = match setup_x11_connections(&state.args.display) {
Ok(v) => v,
Err(e) => {
log::error!("[x11] FATAL: failed to connect: {:#}", e);
return;
}
};
let (out_w, out_h) = if state.args.height > 0 {
let h = state.args.height as u32;
let w = (native_w as u32 * h) / native_h as u32;
(w / 2 * 2, h / 2 * 2)
} else {
(native_w as u32, native_h as u32)
};
let needs_scaling = out_w != native_w as u32 || out_h != native_h as u32;
log::info!("[capture] native={}x{} output={}x{} scaling={}",
native_w, native_h, out_w, out_h, needs_scaling);
let (out_tx, mut out_rx) = mpsc::channel::<Message>(256);
let (in_tx, mut in_rx) = mpsc::channel::<Result<Message, axum::Error>>(256);
let in_tx_task = in_tx.clone();
let io_handle = tokio::spawn(async move {
use futures_util::SinkExt;
let (mut ws_sink, mut ws_stream) = ws.split();
loop {
tokio::select! {
outgoing = out_rx.recv() => {
match outgoing {
Some(msg) => {
if let Err(e) = ws_sink.send(msg).await {
log::debug!("[wsio] send error: {}", e);
break;
}
}
None => break,
}
}
incoming = ws_stream.next() => {
match incoming {
Some(Ok(msg)) => {
if in_tx_task.send(Ok(msg)).await.is_err() {
break;
}
}
Some(Err(e)) => {
log::debug!("[wsio] recv error: {}", e);
break;
}
None => break,
}
}
}
}
log::debug!("[wsio] task ended");
});
loop {
match in_rx.recv().await {
Some(Ok(Message::Text(t))) => {
if let Ok(SignalingMessage::Ready) = serde_json::from_str(&t) {
break;
}
}
Some(Ok(Message::Close(_))) | None => {
log::debug!("[ws] disconnected before ready");
return;
}
_ => {}
}
}
log::info!("[ws] ready received, creating WebRTC peer connection...");
let mut media_engine = MediaEngine::default();
let video_codec = RTCRtpCodecParameters {
rtp_codec: RTCRtpCodec {
mime_type: MIME_TYPE_H264.to_owned(),
clock_rate: 90000,
channels: 0,
sdp_fmtp_line: "level-asymmetry-allowed=1;packetization-mode=1;profile-level-id=42e01f"
.into(),
rtcp_feedback: vec![],
},
payload_type: 102,
..Default::default()
};
media_engine
.register_codec(video_codec.clone(), RtpCodecKind::Video)
.expect("failed to register H264 codec");
let audio_codec = RTCRtpCodecParameters {
rtp_codec: RTCRtpCodec {
mime_type: MIME_TYPE_OPUS.to_owned(),
clock_rate: 48000,
channels: 2,
sdp_fmtp_line: "minptime=10;useinbandfec=1".into(),
rtcp_feedback: vec![],
},
payload_type: 111,
..Default::default()
};
media_engine
.register_codec(audio_codec.clone(), RtpCodecKind::Audio)
.expect("failed to register Opus codec");
let registry = register_default_interceptors(Registry::new(), &mut media_engine)
.expect("failed to register default interceptors");
let ice_servers = if state.args.stun.is_empty() {
vec![]
} else {
vec![RTCIceServer {
urls: vec![state.args.stun.clone()],
..Default::default()
}]
};
let config = RTCConfigurationBuilder::new()
.with_ice_servers(ice_servers)
.build();
let (ice_tx, mut ice_rx) = runtime::channel::<String>(256);
let (gather_complete_tx, mut gather_complete_rx) = runtime::channel::<()>(1);
let (connected_tx, mut connected_rx) = runtime::channel::<()>(1);
let (done_tx, mut done_rx) = runtime::channel::<()>(1);
let handler = Arc::new(WebrtcHandler {
ice_tx,
gather_complete_tx,
connected_tx,
done_tx,
});
let rt = runtime::default_runtime().expect("no webrtc runtime available");
let pc = match PeerConnectionBuilder::new()
.with_configuration(config)
.with_media_engine(media_engine)
.with_interceptor_registry(registry)
.with_handler(handler.clone())
.with_runtime(rt)
.with_udp_addrs(vec![format!("{}:0", get_local_ip())])
.build()
.await
{
Ok(pc) => pc,
Err(e) => {
log::info!("[pc] build failed: {:#}", e);
return;
}
};
log::info!("[pc] PeerConnection created");
let ssrc = rand::rng().random::<u32>();
let rtp_codec = video_codec.rtp_codec.clone();
let track = match TrackLocalStaticSample::new(MediaStreamTrack::new(
"video-stream".to_owned(),
"video-track".to_owned(),
"desktop".to_owned(),
RtpCodecKind::Video,
vec![RTCRtpEncodingParameters {
rtp_coding_parameters: RTCRtpCodingParameters {
ssrc: Some(ssrc),
..Default::default()
},
codec: rtp_codec.clone(),
..Default::default()
}],
)) {
Ok(t) => Arc::new(t),
Err(e) => {
log::info!("[pc] failed to create track: {}", e);
return;
}
};
let track_local: Arc<dyn TrackLocal> = track.clone();
if let Err(e) = pc.add_track(track_local).await {
log::info!("[pc] add_track (video) failed: {}", e);
return;
}
log::info!("[pc] video track added");
let audio_ssrc = rand::rng().random::<u32>();
let audio_rtp_codec = audio_codec.rtp_codec.clone();
let audio_track = match TrackLocalStaticSample::new(MediaStreamTrack::new(
"audio-stream".to_owned(),
"audio-track".to_owned(),
"microphone".to_owned(),
RtpCodecKind::Audio,
vec![RTCRtpEncodingParameters {
rtp_coding_parameters: RTCRtpCodingParameters {
ssrc: Some(audio_ssrc),
..Default::default()
},
codec: audio_rtp_codec,
..Default::default()
}],
)) {
Ok(t) => Arc::new(t),
Err(e) => {
log::info!("[pc] failed to create audio track: {}", e);
return;
}
};
let audio_track_local: Arc<dyn TrackLocal> = audio_track.clone();
if let Err(e) = pc.add_track(audio_track_local).await {
log::info!("[pc] add_track (audio) failed: {}", e);
return;
}
log::info!("[pc] audio track added, creating offer...");
send_cursor_position(&out_tx, &input_state, native_w, native_h, out_w, out_h);
let screen_capture = match ShmScreenCapture::try_new(capture_state.clone(), native_w, native_h, depth) {
Ok(Some(shm)) => {
log::info!("[capture] using MIT-SHM acceleration");
ScreenCapture::Shm(shm)
}
Ok(None) => {
log::info!("[capture] SHM unavailable, using get_image fallback");
ScreenCapture::Fallback(FallbackCapture { conn: capture_state.clone(), width: native_w, height: native_h })
}
Err(e) => {
log::info!("[capture] SHM init failed: {}, using get_image fallback", e);
ScreenCapture::Fallback(FallbackCapture { conn: capture_state.clone(), width: native_w, height: native_h })
}
};
let (cap_w, cap_h) = screen_capture.dimensions();
let cap_w32 = cap_w as u32;
let cap_h32 = cap_h as u32;
let mut encoder = match VideoEncoder::new(&state.args, out_w, out_h) {
Ok(e) => e,
Err(err) => {
log::info!("[encoder] failed to create: {:#}", err);
return;
}
};
log::info!("[encoder] created ({}x{}, {}kbps, {}fps)",
out_w, out_h, state.args.bitrate, state.args.framerate);
let offer = match pc.create_offer(None).await {
Ok(o) => o,
Err(e) => {
log::info!("[pc] create_offer failed: {}", e);
return;
}
};
if let Err(e) = pc.set_local_description(offer).await {
log::info!("[pc] set_local_description failed: {}", e);
return;
}
if let Some(local) = pc.local_description().await {
let offer_msg = match serde_json::to_string(&SignalingMessage::Offer {
sdp: local.sdp.clone(),
}) {
Ok(m) => m,
Err(e) => {
log::error!("[sdp] failed to serialize offer: {}", e);
return;
}
};
log::info!("[sdp] sending offer ({} bytes)", local.sdp.len());
if out_tx.try_send(Message::Text(offer_msg.into())).is_err() {
log::info!("[ws] failed to send offer");
return;
}
} else {
log::info!("[pc] ERROR: no local description after create_offer");
return;
}
let answer_sdp = loop {
match in_rx.recv().await {
Some(Ok(Message::Text(t))) => {
if let Ok(SignalingMessage::Answer { sdp }) = serde_json::from_str(&t) {
log::info!("[sdp] received answer ({} bytes)", sdp.len());
break sdp;
}
}
Some(Ok(Message::Close(_))) | None => {
log::info!("[ws] disconnected waiting for answer");
return;
}
_ => {}
}
};
let answer = RTCSessionDescription::answer(answer_sdp)
.expect("invalid answer SDP");
if let Err(e) = pc.set_remote_description(answer).await {
log::info!("[pc] set_remote_description failed: {}", e);
return;
}
log::info!("[sdp] remote description set");
let ice_out_tx = out_tx.clone();
let ice_forward = tokio::spawn(async move {
while let Some(candidate_msg) = ice_rx.recv().await {
if ice_out_tx.try_send(Message::Text(candidate_msg.into())).is_err() {
break;
}
}
});
tokio::select! {
_ = gather_complete_rx.recv() => { log::debug!("[ice] gathering complete"); }
_ = done_rx.recv() => { log::debug!("[ice] connection ended during gathering"); return; }
}
tokio::select! {
_ = connected_rx.recv() => { log::info!("[pc] connection established!"); }
_ = done_rx.recv() => { log::info!("[pc] connection failed before connected"); return; }
}
let frame_duration = std::time::Duration::from_millis(1000 / state.args.framerate as u64);
let track_ssrc = *track.ssrcs().await.first().unwrap_or(&0);
if track_ssrc == 0 {
log::info!("[pc] ERROR: no SSRC available for video track");
return;
}
log::info!("[pipeline] starting 4-stage pipeline (cap→conv→enc→send), {} fps", state.args.framerate);
let (raw_tx, raw_rx) = block_mpsc::sync_channel::<Bytes>(4);
let (yuv_tx, yuv_rx) = block_mpsc::sync_channel::<Vec<u8>>(4);
let (enc_tx, mut enc_rx) = tokio::sync::mpsc::channel::<Bytes>(32);
let cancel = tokio_util::sync::CancellationToken::new();
let cap_stop = cancel.clone();
let cap_raw_tx = raw_tx.clone();
let cap_handle = tokio::task::spawn_blocking(move || {
let mut raw_buf = vec![0u8; cap_w as usize * cap_h as usize * 4];
let mut last_raw = raw_buf.clone(); loop {
if cap_stop.is_cancelled() { break; }
let frame_start = std::time::Instant::now();
if let Err(e) = screen_capture.capture(&mut raw_buf) {
log::info!("[capture] error: {:#}, repeating last frame", e);
raw_buf.clone_from(&last_raw);
} else {
last_raw.clone_from(&raw_buf);
}
if cap_stop.is_cancelled() { return; }
if cap_raw_tx.send(Bytes::from(std::mem::take(&mut raw_buf))).is_err() { break; }
let elapsed = frame_start.elapsed();
if elapsed < frame_duration {
std::thread::sleep(frame_duration - elapsed);
}
}
log::debug!("[capture] task ended");
});
let conv_stop = cancel.clone();
let conv_yuv_tx = yuv_tx.clone();
let conv_handle = tokio::task::spawn_blocking(move || {
let mut i420_buf = vec![0u8; (out_w * out_h * 3 / 2) as usize];
let mut tmp_argb = Vec::new();
loop {
if conv_stop.is_cancelled() { break; }
match raw_rx.recv_timeout(std::time::Duration::from_millis(5)) {
Ok(raw) => {
if needs_scaling {
scale_bgra_direct(raw.as_ref(), cap_w32, cap_h32,
out_w, out_h, &mut i420_buf, &mut tmp_argb);
} else {
bgra_to_i420(raw.as_ref(), out_w, out_h, &mut i420_buf);
}
if conv_stop.is_cancelled() { return; }
if conv_yuv_tx.send(std::mem::take(&mut i420_buf)).is_err() { return; }
}
Err(block_mpsc::RecvTimeoutError::Timeout) => continue,
Err(block_mpsc::RecvTimeoutError::Disconnected) => break,
}
}
log::debug!("[convert] task ended");
});
let enc_stop = cancel.clone();
let enc_tx_clone = enc_tx.clone();
let enc_handle = tokio::task::spawn_blocking(move || {
let mut enc_buf = Vec::with_capacity(65536);
let mut frame_count: u64 = 0;
loop {
if enc_stop.is_cancelled() { break; }
match yuv_rx.recv_timeout(std::time::Duration::from_millis(5)) {
Ok(yuv) => {
enc_buf.clear();
if let Err(e) = encoder.encode(yuv, &mut enc_buf) {
log::error!("[encoder] error: {:#}, forcing keyframe", e);
encoder.force_keyframe();
continue;
}
let frame = Bytes::from(std::mem::take(&mut enc_buf));
if enc_stop.is_cancelled() { return; }
if enc_tx_clone.blocking_send(frame).is_err() { return; }
frame_count += 1;
let period = state.args.framerate as u64 * 10;
if frame_count % period == 0 {
encoder.force_keyframe();
}
}
Err(block_mpsc::RecvTimeoutError::Timeout) => continue,
Err(block_mpsc::RecvTimeoutError::Disconnected) => break,
}
}
log::debug!("[encoder] task ended");
});
let send_stop = cancel.clone();
let send_handle = tokio::spawn(async move {
loop {
tokio::select! {
Some(h264) = enc_rx.recv() => {
if h264.is_empty() { continue; }
if let Err(e) = track.sample_writer(track_ssrc).write_sample(&Sample {
data: h264,
duration: frame_duration,
..Default::default()
}).await {
log::info!("[send] write_sample error: {}", e);
break;
}
}
_ = send_stop.cancelled() => { break; }
}
}
log::debug!("[send] task ended");
});
use libpulse_binding as pulse;
use libpulse_simple_binding as psimple;
let audio_source = find_default_monitor();
if let Some(ref src) = audio_source {
log::info!("[audio] using monitor source: {}", src);
} else {
log::info!("[audio] no monitor found, falling back to default record source");
}
let audio_frame_duration = std::time::Duration::from_millis(20);
let (pcm_tx, pcm_rx) = block_mpsc::sync_channel::<Vec<u8>>(8);
let (opus_tx, mut opus_rx) = tokio::sync::mpsc::channel::<Bytes>(8);
let audio_cap_stop = cancel.clone();
let audio_pcm_tx = pcm_tx.clone();
let audio_source = audio_source.clone();
let audio_cap_handle = tokio::task::spawn_blocking(move || {
let spec = pulse::sample::Spec {
format: pulse::sample::Format::S16NE,
channels: 2,
rate: 48000,
};
if !spec.is_valid() {
log::info!("[audio] invalid PulseAudio sample spec");
return;
}
let dev = audio_source.as_deref();
let pa = match psimple::Simple::new(
None, "vnrit", pulse::stream::Direction::Record,
dev, "audio-capture", &spec, None, None,
) {
Ok(s) => s,
Err(e) => {
log::error!("[audio] PulseAudio init failed: {} — audio disabled", e);
return;
}
};
log::info!("[audio] PulseAudio capture started");
const PCM_FRAME_BYTES: usize = 3840;
let mut buf = vec![0u8; PCM_FRAME_BYTES];
loop {
if audio_cap_stop.is_cancelled() { break; }
if let Err(e) = pa.read(&mut buf) {
log::info!("[audio] read error: {}", e);
break;
}
if audio_cap_stop.is_cancelled() { return; }
if audio_pcm_tx.send(buf.clone()).is_err() { return; }
}
log::debug!("[audio] capture task ended");
});
let audio_enc_stop = cancel.clone();
let audio_opus_tx = opus_tx.clone();
let audio_enc_handle = tokio::task::spawn_blocking(move || {
let mut encoder = match opus::Encoder::new(48000, opus::Channels::Stereo, opus::Application::Audio) {
Ok(e) => e,
Err(e) => {
log::info!("[audio] Opus encoder init failed: {}", e);
return;
}
};
let mut opus_out = Vec::with_capacity(4096);
loop {
if audio_enc_stop.is_cancelled() { break; }
match pcm_rx.recv_timeout(std::time::Duration::from_millis(5)) {
Ok(pcm) => {
assert_eq!(pcm.len() % 2, 0, "PCM buffer length must be even for i16 samples");
let samples = unsafe {
std::slice::from_raw_parts(pcm.as_ptr() as *const i16, pcm.len() / 2)
};
opus_out.clear();
opus_out.resize(4096, 0);
match encoder.encode(samples, &mut opus_out) {
Ok(n) => {
opus_out.truncate(n);
let frame = Bytes::copy_from_slice(&opus_out);
if audio_enc_stop.is_cancelled() { return; }
let _ = audio_opus_tx.try_send(frame);
}
Err(e) => log::info!("[audio] encode error: {}", e),
}
}
Err(block_mpsc::RecvTimeoutError::Timeout) => continue,
Err(block_mpsc::RecvTimeoutError::Disconnected) => break,
}
}
log::debug!("[audio] encode task ended");
});
let audio_send_stop = cancel.clone();
let audio_send_handle = tokio::spawn(async move {
loop {
tokio::select! {
Some(opus_packet) = opus_rx.recv() => {
if opus_packet.is_empty() { continue; }
if let Err(e) = audio_track.sample_writer(audio_ssrc).write_sample(&Sample {
data: opus_packet,
duration: audio_frame_duration,
..Default::default()
}).await {
log::info!("[audio] write_sample error: {}", e);
break;
}
}
_ = audio_send_stop.cancelled() => { break; }
}
}
log::debug!("[audio] send task ended");
});
let mut cursor_timer = time::interval(Duration::from_millis(50));
loop {
tokio::select! {
_ = cursor_timer.tick() => {
sync_cursor_position(&out_tx, &input_state, native_w, native_h, out_w, out_h);
}
msg = in_rx.recv() => {
match msg {
Some(Ok(Message::Text(t))) => {
if let Ok(sig) = serde_json::from_str::<SignalingMessage>(&t) {
match sig {
SignalingMessage::Ice { candidate, sdp_mline_index } => {
let init = RTCIceCandidateInit {
candidate,
sdp_mline_index: Some(sdp_mline_index as u16),
..Default::default()
};
let _ = pc.add_ice_candidate(init).await;
}
SignalingMessage::Offer { .. } => {
log::debug!("[ws] unexpected duplicate offer, ignoring");
}
_ => {}
}
} else if t.starts_with("mr,") || t.starts_with("ma,")
|| t.starts_with("md,") || t.starts_with("mu,")
|| t.starts_with("ms,") || t.starts_with("kd,")
|| t.starts_with("ku,")
{
let scale_x = native_w as f64 / out_w as f64;
let scale_y = native_h as f64 / out_h as f64;
handle_input_message(&t, &input_state, scale_x, scale_y);
send_cursor_position(&out_tx, &input_state, native_w, native_h, out_w, out_h);
} else {
let preview: String = t.chars().take(100).collect();
log::debug!("[ws] unrecognized message: '{}'", preview);
let _ = out_tx.try_send(Message::Text(
serde_json::json!({"type":"error","message":"invalid message format"}).to_string().into()
));
}
}
Some(Ok(Message::Close(_))) => {
log::debug!("[ws] client sent close");
break;
}
Some(Err(_)) | None => {
log::debug!("[ws] channel disconnected");
break;
}
_ => {}
}
}
_ = async {
match done_rx.try_recv() {
Ok(()) => {}
Err(_) => futures_util::future::pending::<()>().await,
}
} => {
log::debug!("[loop] connection closed, exiting");
break;
}
}
}
log::info!("[cleanup] client disconnected, shutting down pipeline...");
cancel.cancel();
{
let keys = input_state.pressed_keys.lock().unwrap();
for &kc in keys.iter() {
let _ = xtest::fake_input(&input_state.conn, X11_KEY_RELEASE,
kc, 0, input_state.root, 0, 0, 0);
}
if !keys.is_empty() {
let _ = input_state.conn.flush();
}
}
drop(raw_tx);
drop(yuv_tx);
drop(enc_tx);
drop(pcm_tx);
drop(opus_tx);
let _ = cap_handle.await;
let _ = conv_handle.await;
let _ = enc_handle.await;
let _ = send_handle.await;
let _ = audio_cap_handle.await;
let _ = audio_enc_handle.await;
let _ = audio_send_handle.await;
drop(handler);
let _ = ice_forward.await;
io_handle.abort();
let _ = io_handle.await;
let _ = pc.close().await;
log::info!("[ws] cleanup complete");
}
/// Applies one comma-separated input command from the client to the X server
/// through the XTEST extension.
///
/// The first field selects the command; extra trailing fields are ignored:
/// - `mr,dx,dy` — relative pointer motion, added to the tracked cursor position;
/// - `ma,x,y` — absolute pointer motion in client coordinates, multiplied by
///   `scale_x` / `scale_y` to reach native screen coordinates;
/// - `md,btn` / `mu,btn` — button press / release (`"2"` and `"3"` select those
///   buttons, any other value selects button 1);
/// - `ms,delta` — wheel scroll, emitted as press/release pairs of button 5
///   (positive delta) or button 4 (negative delta);
/// - `kd,code` / `ku,code` — key press / release for a code known to
///   `code_to_keysym` and present in the keycode cache.
///
/// Unknown commands, commands with too few fields, and keys without a mapping
/// are silently ignored. Errors returned by `fake_input` / `flush` are
/// discarded (best effort).
///
/// # Panics
/// Panics if the `pressed_keys` mutex is poisoned.
fn handle_input_message(raw: &str, state: &InputState, scale_x: f64, scale_y: f64) {
    let mut fields = raw.split(',');
    let (Some(kind), arg1, arg2) = (fields.next(), fields.next(), fields.next()) else {
        return;
    };

    // Largest valid pointer coordinates. Floored at 0 so that `clamp` can never
    // be called with min > max (which panics) if a screen dimension is 0.
    let max_x = (state.screen_w as i32 - 1).max(0);
    let max_y = (state.screen_h as i32 - 1).max(0);

    // Current tracked pointer position in the form XTEST expects.
    let pointer = || {
        (
            state.cursor_x.load(Ordering::Relaxed) as i16,
            state.cursor_y.load(Ordering::Relaxed) as i16,
        )
    };

    match (kind, arg1, arg2) {
        ("mr" | "ma", Some(a), Some(b)) => {
            // Unparseable numbers are treated as 0.
            let a: i32 = a.parse().unwrap_or(0);
            let b: i32 = b.parse().unwrap_or(0);
            let (new_x, new_y) = if kind == "mr" {
                (
                    state.cursor_x.load(Ordering::Relaxed).saturating_add(a).clamp(0, max_x),
                    state.cursor_y.load(Ordering::Relaxed).saturating_add(b).clamp(0, max_y),
                )
            } else {
                (
                    ((a as f64 * scale_x) as i32).clamp(0, max_x),
                    ((b as f64 * scale_y) as i32).clamp(0, max_y),
                )
            };
            state.cursor_x.store(new_x, Ordering::Relaxed);
            state.cursor_y.store(new_y, Ordering::Relaxed);
            let _ = xtest::fake_input(&state.conn, X11_MOTION_NOTIFY,
                0, 0, state.root, new_x as i16, new_y as i16, 0);
            let _ = state.conn.flush();
        }
        ("md" | "mu", Some(button), _) => {
            let btn: u8 = match button {
                "2" => 2,
                "3" => 3,
                _ => 1,
            };
            let event = if kind == "md" { X11_BUTTON_PRESS } else { X11_BUTTON_RELEASE };
            let (cx, cy) = pointer();
            let _ = xtest::fake_input(&state.conn, event, btn, 0, state.root, cx, cy, 0);
            let _ = state.conn.flush();
        }
        ("ms", Some(delta), _) => {
            let delta: f64 = delta.parse().unwrap_or(0.0);
            // A zero, NaN or unparseable delta has no direction. Without this
            // guard it would be reported to X as one scroll-up click.
            if delta == 0.0 || delta.is_nan() {
                return;
            }
            let steps = (delta.abs() / 40.0).round().clamp(1.0, 20.0) as u32;
            let btn = if delta > 0.0 { 5_u8 } else { 4_u8 };
            let (cx, cy) = pointer();
            for _ in 0..steps {
                let _ = xtest::fake_input(&state.conn, X11_BUTTON_PRESS,
                    btn, 0, state.root, cx, cy, 0);
                let _ = xtest::fake_input(&state.conn, X11_BUTTON_RELEASE,
                    btn, 0, state.root, cx, cy, 0);
            }
            let _ = state.conn.flush();
        }
        ("kd" | "ku", Some(code), _) => {
            let keysym = code_to_keysym(code);
            if keysym == 0 {
                return;
            }
            let kc = find_keycode(state, keysym);
            if kc == 0 {
                return;
            }
            let pressing = kind == "kd";
            {
                // Track held keys; the guard is dropped before talking to X.
                let mut held = state.pressed_keys.lock().unwrap();
                if pressing {
                    held.insert(kc);
                } else {
                    held.remove(&kc);
                }
            }
            let event = if pressing { X11_KEY_PRESS } else { X11_KEY_RELEASE };
            let (cx, cy) = pointer();
            let _ = xtest::fake_input(&state.conn, event, kc, 0, state.root, cx, cy, 0);
            let _ = state.conn.flush();
        }
        _ => {}
    }
}
/// Sends the tracked cursor position to the client as a JSON `cursor` message,
/// but only when it differs from the position that was last sent.
///
/// The stored native coordinates are rescaled to the output resolution
/// (`out_w` x `out_h`); a native dimension of 0 leaves that coordinate
/// unscaled. The message is queued with `try_send` and dropped if that fails.
fn send_cursor_position(out_tx: &mpsc::Sender<Message>, state: &InputState,
    native_w: u16, native_h: u16, out_w: u32, out_h: u32)
{
    let cur_x = state.cursor_x.load(Ordering::Relaxed);
    let cur_y = state.cursor_y.load(Ordering::Relaxed);

    let unchanged = cur_x == state.last_sent_x.load(Ordering::Relaxed)
        && cur_y == state.last_sent_y.load(Ordering::Relaxed);
    if unchanged {
        return;
    }
    state.last_sent_x.store(cur_x, Ordering::Relaxed);
    state.last_sent_y.store(cur_y, Ordering::Relaxed);

    // Maps one native coordinate onto the output resolution.
    let rescale = |pos: i32, out_dim: u32, native_dim: u16| -> u64 {
        let pos = pos as u64;
        if native_dim > 0 {
            pos * out_dim as u64 / native_dim as u64
        } else {
            pos
        }
    };

    let payload = serde_json::json!({
        "type": "cursor",
        "x": rescale(cur_x, out_w, native_w),
        "y": rescale(cur_y, out_h, native_h)
    });
    if let Ok(text) = serde_json::to_string(&payload) {
        let _ = out_tx.try_send(Message::Text(text.into()));
    }
}
/// Refreshes the tracked cursor position from the X server and then forwards
/// it to the client via `send_cursor_position`.
///
/// If the pointer query or its reply fails, the previously tracked position is
/// kept and forwarded instead.
fn sync_cursor_position(out_tx: &mpsc::Sender<Message>, state: &InputState,
    native_w: u16, native_h: u16, out_w: u32, out_h: u32)
{
    let pointer = xproto::query_pointer(&state.conn, state.root)
        .ok()
        .and_then(|cookie| cookie.reply().ok());
    if let Some(reply) = pointer {
        state.cursor_x.store(i32::from(reply.root_x), Ordering::Relaxed);
        state.cursor_y.store(i32::from(reply.root_y), Ordering::Relaxed);
    }
    send_cursor_position(out_tx, state, native_w, native_h, out_w, out_h);
}
/// Looks up the keycode cached for `keysym`, returning 0 when the cache has no
/// entry for it.
fn find_keycode(s: &InputState, keysym: u32) -> u8 {
    match s.keycode_cache.get(&keysym) {
        Some(&keycode) => keycode,
        None => 0,
    }
}
/// Translates a key code string (named like browser `KeyboardEvent.code`
/// values) into an X11 keysym, returning 0 when there is no mapping.
///
/// Besides the fixed table below, three families are recognised:
/// - `F<n>` where `<n>` is 1 to 3 ASCII digits evaluating to 1..=24;
/// - `Digit0`..`Digit9`, mapped to the ASCII value of the digit;
/// - `Key<c>` for a single ASCII letter, mapped to the ASCII value of `<c>`
///   exactly as given.
///
/// Left and right modifier codes both map to the same keysym.
fn code_to_keysym(code: &str) -> u32 {
    match code {
        "Enter" => 0xff0d,
        "Backspace" => 0xff08,
        "Space" => 0x0020,
        "Tab" => 0xff09,
        "Escape" => 0xff1b,
        "ArrowUp" => 0xff52,
        "ArrowDown" => 0xff54,
        "ArrowLeft" => 0xff51,
        "ArrowRight" => 0xff53,
        "ShiftLeft" | "ShiftRight" => 0xffe1,
        "ControlLeft" | "ControlRight" => 0xffe3,
        "AltLeft" | "AltRight" => 0xffe9,
        "MetaLeft" | "MetaRight" => 0xffeb,
        "CapsLock" => 0xffe5,
        "Delete" => 0xffff,
        "Insert" => 0xff63,
        "Home" => 0xff50,
        "End" => 0xff57,
        "PageUp" => 0xff55,
        "PageDown" => 0xff56,
        "Minus" => 0x002d,
        "Equal" => 0x003d,
        "BracketLeft" => 0x005b,
        "BracketRight" => 0x005d,
        "Semicolon" => 0x003b,
        "Quote" => 0x0027,
        "Backquote" => 0x0060,
        "PrintScreen" => 0xff61,
        "ScrollLock" => 0xff14,
        "Pause" => 0xff13,
        "Break" => 0xff6b,
        "SysRq" => 0xff15,
        "NumLock" => 0xff7f,
        "Comma" => 0x002c,
        "Period" => 0x002e,
        "Slash" => 0x002f,
        "Backslash" | "IntlBackslash" => 0x005c,
        "Numpad0" => 0xffb0,
        "Numpad1" => 0xffb1,
        "Numpad2" => 0xffb2,
        "Numpad3" => 0xffb3,
        "Numpad4" => 0xffb4,
        "Numpad5" => 0xffb5,
        "Numpad6" => 0xffb6,
        "Numpad7" => 0xffb7,
        "Numpad8" => 0xffb8,
        "Numpad9" => 0xffb9,
        "NumpadEnter" => 0xff8d,
        "NumpadAdd" => 0xffab,
        "NumpadSubtract" => 0xffad,
        "NumpadMultiply" => 0xffaa,
        "NumpadDivide" => 0xffaf,
        "NumpadDecimal" => 0xffae,
        _ => {
            // Function keys. Checking for digits explicitly matters because
            // integer parsing also accepts a leading '+', which would let a
            // malformed code such as "F+1" pass as F1.
            if let Some(number) = code.strip_prefix('F') {
                let digits_only = !number.is_empty()
                    && number.len() <= 3
                    && number.bytes().all(|b| b.is_ascii_digit());
                if digits_only {
                    if let Ok(n) = number.parse::<u32>() {
                        if (1..=24).contains(&n) {
                            // Function-key keysyms are consecutive, F1 = 0xffbe.
                            return 0xffbe + n - 1;
                        }
                    }
                }
                return 0;
            }
            // Top-row digits: the keysym equals the ASCII code of the digit.
            if let Some(rest) = code.strip_prefix("Digit") {
                return match rest.as_bytes() {
                    [d] if d.is_ascii_digit() => u32::from(*d),
                    _ => 0,
                };
            }
            // Letter keys: exactly one ASCII letter after the "Key" prefix.
            match code.strip_prefix("Key").map(str::as_bytes) {
                Some([b]) if b.is_ascii_alphabetic() => u32::from(*b),
                _ => 0,
            }
        }
    }
}
/// Returns, as a string, the local address of a UDP socket bound to
/// `0.0.0.0:0` and connected to `8.8.8.8:80`.
///
/// Falls back to `"127.0.0.1"` if binding, connecting, or reading the local
/// address fails.
fn get_local_ip() -> String {
    let probe = || -> Option<String> {
        let socket = std::net::UdpSocket::bind("0.0.0.0:0").ok()?;
        socket.connect("8.8.8.8:80").ok()?;
        let local = socket.local_addr().ok()?;
        Some(local.ip().to_string())
    };
    probe().unwrap_or_else(|| "127.0.0.1".to_string())
}