use anyhow::Result;
use quicnet::{Identity, Peer};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
#[cfg(feature = "video")]
use {
openh264::{encoder::Encoder, decoder::Decoder},
x11rb::connection::Connection,
x11rb::protocol::xproto::*,
image::RgbImage,
};
/// Video codec negotiated between client and server.
///
/// The client announces its choice as a single byte on the stream before any
/// frame data is exchanged.
#[derive(Clone, Copy, Debug)]
enum VideoCodec {
    /// Length-prefixed JPEG frames. The client sends byte `0` for this codec;
    /// the server also selects it for any byte other than `1`.
    Mjpeg,
    /// Length-prefixed H.264 bitstream chunks, announced as byte `1`.
    H264,
}
/// Entry point: installs the ring-based rustls crypto provider, then dispatches
/// on the first command-line argument.
///
/// * `server` runs [`run_server`].
/// * `client <addr> [mjpeg|h264]` runs [`run_client`] against `<addr>` with an
///   optional codec preference.
/// * Anything else prints usage to stderr and exits with status 1.
///
/// # Errors
/// Returns an error when `client` is given without a server address, and
/// propagates whatever the selected mode returns.
///
/// # Panics
/// Panics if the crypto provider cannot be installed.
#[tokio::main]
async fn main() -> Result<()> {
    rustls::crypto::ring::default_provider()
        .install_default()
        .expect("failed to install crypto provider");

    let args: Vec<String> = std::env::args().collect();
    match args.get(1).map(String::as_str) {
        Some("server") => run_server().await,
        Some("client") => {
            let server = args.get(2).ok_or_else(|| anyhow::anyhow!("need server"))?;
            let preference = args.get(3).map(String::as_str);
            run_client(server, preference).await
        }
        _ => {
            // argv may be empty, so never index `args[0]` directly.
            let program = args.first().map(String::as_str).unwrap_or("<program>");
            eprintln!("server: {} server", program);
            eprintln!("client: {} client <addr> [mjpeg|h264]", program);
            std::process::exit(1);
        }
    }
}
/// Binds a peer on `[::]:8888` and serves every incoming connection on its own
/// task.
///
/// The local address and peer id are printed to stderr at start-up. Each
/// accepted connection is passed to `handle_session` inside a spawned task; a
/// failing session is logged and does not stop the accept loop. Returns
/// `Ok(())` once `accept` yields `None`.
///
/// # Errors
/// Returns an error if the identity cannot be loaded or generated, the peer
/// cannot be created, or the local address cannot be queried.
async fn run_server() -> Result<()> {
    let identity = Identity::load_or_generate()?;
    let peer = Peer::new("[::]:8888".parse()?, identity)?;

    eprintln!("video stream server on {}", peer.local_addr()?);
    eprintln!("peer: {}", peer.identity().peer_id());

    while let Some(incoming) = peer.accept().await {
        // Build the per-connection future first, then detach it.
        let session = async move {
            let outcome = handle_session(incoming).await;
            if let Err(e) = outcome {
                eprintln!("session error: {e}");
            }
            eprintln!("session ended");
        };
        tokio::spawn(session);
    }

    Ok(())
}
/// Serves one client connection: accepts it, reads the requested codec, and
/// streams frames until the stream fails or ends.
///
/// The client opens a bidirectional stream and sends a single codec byte:
/// `1` selects H.264, any other value selects MJPEG. Errors from the streaming
/// loop are only logged; they do not make this function fail.
///
/// # Errors
/// Returns an error if accepting the connection, accepting the stream, or
/// reading the codec byte fails.
#[cfg(feature = "video")]
async fn handle_session(incoming: quicnet::IncomingConnection) -> Result<()> {
    let (conn, peer_id) = incoming.accept().await?;
    eprintln!("[{}] video session start", peer_id.short());

    let (send, mut recv) = conn.accept_bi().await?;

    eprintln!("[{}] waiting for codec byte from client...", peer_id.short());
    let mut request = [0u8; 1];
    recv.read_exact(&mut request).await?;
    let [requested] = request;
    eprintln!("[{}] received codec byte: {}", peer_id.short(), requested);

    let codec = if requested == 1 {
        VideoCodec::H264
    } else {
        VideoCodec::Mjpeg
    };
    eprintln!("[{}] using codec: {:?}", peer_id.short(), codec);

    let outcome = match codec {
        VideoCodec::H264 => stream_h264(send).await,
        VideoCodec::Mjpeg => stream_mjpeg(send).await,
    };

    // A broken stream is reported but is not treated as a session failure.
    if let Err(e) = outcome {
        eprintln!("[{}] stream error: {}", peer_id.short(), e);
    } else {
        eprintln!("[{}] stream ended normally", peer_id.short());
    }
    Ok(())
}
/// Captures the X11 root window in a loop and sends each frame as a
/// length-prefixed JPEG.
///
/// Wire format per frame: a 4-byte big-endian length followed by that many
/// bytes of JPEG data (quality 85). After each frame the task sleeps for
/// 500 ms. The loop has no normal exit; it ends only when a step fails.
///
/// NOTE(review): the X11 requests and the JPEG encoding run synchronously on
/// the async task. Confirm that is acceptable for the runtime in use.
///
/// # Errors
/// Returns an error if connecting to X11 or any X11 request fails, JPEG
/// encoding fails, the encoded frame does not fit the `u32` length prefix, or
/// writing to the stream fails.
#[cfg(feature = "video")]
async fn stream_mjpeg(mut send: quinn::SendStream) -> Result<()> {
    const FRAME_INTERVAL: std::time::Duration = std::time::Duration::from_millis(500);
    const JPEG_QUALITY: u8 = 85;

    let (conn, screen_num) = x11rb::connect(None)?;
    let root = conn.setup().roots[screen_num].root;

    loop {
        // Geometry is queried for every frame, so the capture follows the
        // current root window size.
        let geometry = conn.get_geometry(root)?.reply()?;
        let image = conn
            .get_image(
                ImageFormat::Z_PIXMAP,
                root,
                0,
                0,
                geometry.width,
                geometry.height,
                !0,
            )?
            .reply()?;

        let width = u32::from(geometry.width);
        let height = u32::from(geometry.height);

        // Assumes the reply holds 4 bytes per pixel in B, G, R, X order (TODO confirm
        // for non-24/32-bit depths). `zip` stops at the shorter side, so missing
        // pixels stay black and surplus bytes are ignored instead of panicking.
        let mut rgb = vec![0u8; usize::from(geometry.width) * usize::from(geometry.height) * 3];
        for (dst, src) in rgb.chunks_exact_mut(3).zip(image.data.chunks_exact(4)) {
            dst[0] = src[2];
            dst[1] = src[1];
            dst[2] = src[0];
        }
        let rgb_image = RgbImage::from_raw(width, height, rgb).ok_or_else(|| {
            anyhow::anyhow!("captured frame does not match {}x{}", width, height)
        })?;

        let mut jpeg_data = Vec::new();
        let mut encoder =
            image::codecs::jpeg::JpegEncoder::new_with_quality(&mut jpeg_data, JPEG_QUALITY);
        encoder.encode_image(&rgb_image)?;

        // A silently truncated length prefix would desynchronise the framing.
        let frame_len = u32::try_from(jpeg_data.len()).map_err(|_| {
            anyhow::anyhow!(
                "JPEG frame of {} bytes does not fit the u32 length prefix",
                jpeg_data.len()
            )
        })?;
        send.write_all(&frame_len.to_be_bytes()).await?;
        send.write_all(&jpeg_data).await?;

        tokio::time::sleep(FRAME_INTERVAL).await;
    }
}
/// Captures the X11 root window in a loop, encodes each frame with OpenH264,
/// and sends the resulting bitstream chunks length-prefixed.
///
/// Wire format per chunk: a 4-byte big-endian length followed by that many
/// bytes of encoder output. Empty encoder output is not sent. After each
/// capture the task sleeps for 500 ms. The loop has no normal exit; it ends
/// only when a step fails.
///
/// # Errors
/// Returns an error if the encoder cannot be created, connecting to X11 or any
/// X11 request fails, encoding fails, the chunk does not fit the `u32` length
/// prefix, or writing to the stream fails.
#[cfg(feature = "video")]
async fn stream_h264(mut send: quinn::SendStream) -> Result<()> {
    const FRAME_INTERVAL: std::time::Duration = std::time::Duration::from_millis(500);

    let mut encoder = Encoder::new()?;
    let (conn, screen_num) = x11rb::connect(None)?;
    let root = conn.setup().roots[screen_num].root;

    loop {
        let geometry = conn.get_geometry(root)?.reply()?;
        let image = conn
            .get_image(
                ImageFormat::Z_PIXMAP,
                root,
                0,
                0,
                geometry.width,
                geometry.height,
                !0,
            )?
            .reply()?;

        let yuv = bgra_to_yuv(
            &image.data,
            usize::from(geometry.width),
            usize::from(geometry.height),
        );

        // Copy the encoder output into an owned buffer inside its own scope so
        // the borrowed bitstream is gone before the next `.await`.
        let bitstream_vec = {
            let bitstream = encoder.encode(&yuv)?;
            bitstream.to_vec()
        };

        if !bitstream_vec.is_empty() {
            // A silently truncated length prefix would desynchronise the framing.
            let chunk_len = u32::try_from(bitstream_vec.len()).map_err(|_| {
                anyhow::anyhow!(
                    "H.264 chunk of {} bytes does not fit the u32 length prefix",
                    bitstream_vec.len()
                )
            })?;
            send.write_all(&chunk_len.to_be_bytes()).await?;
            send.write_all(&bitstream_vec).await?;
        }

        tokio::time::sleep(FRAME_INTERVAL).await;
    }
}
/// Converts a tightly packed BGRA frame into a planar 4:2:0 `YUVBuffer`
/// (Y plane, then U plane, then V plane).
///
/// * `bgra`: source pixels, 4 bytes each in B, G, R, X order, `width` pixels
///   per row with no row padding.
/// * `width`, `height`: dimensions of the source frame in pixels.
///
/// Luma is written in raster order, one value per pixel. Each chroma sample is
/// the average over a 2x2 pixel block. Because 4:2:0 needs whole 2x2 blocks,
/// an odd `width` or `height` is cropped by one pixel, so the returned buffer
/// has dimensions `width & !1` by `height & !1`.
///
/// # Panics
/// Panics if `bgra` holds fewer than `width * height * 4` bytes.
#[cfg(feature = "video")]
fn bgra_to_yuv(bgra: &[u8], width: usize, height: usize) -> openh264::formats::YUVBuffer {
    use openh264::formats::YUVBuffer;

    assert!(
        bgra.len() >= width * height * 4,
        "BGRA buffer holds {} bytes, but a {}x{} frame needs {}",
        bgra.len(),
        width,
        height,
        width * height * 4,
    );

    // Round both dimensions down to even so every chroma sample covers a full
    // 2x2 block and the buffer length is exactly 3/2 * width * height.
    let out_width = width & !1;
    let out_height = height & !1;
    let chroma_width = out_width / 2;

    let luma_len = out_width * out_height;
    let chroma_len = luma_len / 4;

    let mut planes = vec![0u8; luma_len + 2 * chroma_len];
    let (y_plane, chroma) = planes.split_at_mut(luma_len);
    let (u_plane, v_plane) = chroma.split_at_mut(chroma_len);

    for block_y in 0..out_height / 2 {
        for block_x in 0..chroma_width {
            let mut u_sum = 0i32;
            let mut v_sum = 0i32;

            for dy in 0..2 {
                for dx in 0..2 {
                    let px = block_x * 2 + dx;
                    let py = block_y * 2 + dy;

                    // The source row stride is the original width, not the cropped one.
                    let src = (py * width + px) * 4;
                    let b = i32::from(bgra[src]);
                    let g = i32::from(bgra[src + 1]);
                    let r = i32::from(bgra[src + 2]);

                    // Luma goes to its raster position; pushing in block-visit
                    // order would scramble the Y plane.
                    let luma = ((66 * r + 129 * g + 25 * b + 128) >> 8) + 16;
                    y_plane[py * out_width + px] = luma.clamp(0, 255) as u8;

                    u_sum += (-38 * r - 74 * g + 112 * b + 128) >> 8;
                    v_sum += (112 * r - 94 * g - 18 * b + 128) >> 8;
                }
            }

            let chroma_idx = block_y * chroma_width + block_x;
            u_plane[chroma_idx] = (u_sum / 4 + 128).clamp(0, 255) as u8;
            v_plane[chroma_idx] = (v_sum / 4 + 128).clamp(0, 255) as u8;
        }
    }

    YUVBuffer::from_vec(planes, out_width, out_height)
}
/// Connects to `server`, negotiates a codec, and consumes the resulting stream.
///
/// * `server`: socket address of the server, e.g. `[::1]:8888`.
/// * `preference`: `Some("mjpeg")` or `Some("h264")` forces that codec. `None`
///   auto-detects: H.264 when the `video` feature is enabled and a decoder can
///   be created, otherwise MJPEG. Any other value is reported on stderr and
///   then treated like `None`.
///
/// The chosen codec is sent to the server as a single byte (`1` for H.264,
/// `0` for MJPEG) on a freshly opened bidirectional stream. Errors from the
/// receive loop are only logged; they do not make this function fail.
///
/// # Errors
/// Returns an error if `server` cannot be parsed, the identity or peer cannot
/// be set up, dialing or opening the stream fails, or the codec byte cannot be
/// written.
async fn run_client(server: &str, preference: Option<&str>) -> Result<()> {
    let addr: std::net::SocketAddr = server.parse()?;
    let identity = Identity::load_or_generate()?;
    let peer = Peer::new("[::]:0".parse()?, identity)?;

    eprintln!("connecting to {}", addr);
    let (conn, _) = peer.dial(addr, None).await?;
    let (mut send, recv) = conn.open_bi().await?;

    let codec = match preference {
        Some("mjpeg") => {
            eprintln!("user requested MJPEG");
            VideoCodec::Mjpeg
        }
        Some("h264") => {
            eprintln!("user requested H.264");
            VideoCodec::H264
        }
        other => {
            // Report a value that is neither "mjpeg" nor "h264" before falling back.
            if let Some(unknown) = other {
                eprintln!("unrecognized codec preference {:?}, ignoring it", unknown);
            }
            eprintln!("auto-detecting codec support...");
            #[cfg(feature = "video")]
            {
                match openh264::decoder::Decoder::new() {
                    Ok(_) => {
                        eprintln!("H.264 decoder available, using H.264");
                        VideoCodec::H264
                    }
                    Err(_) => {
                        eprintln!("H.264 decoder not available, falling back to MJPEG");
                        VideoCodec::Mjpeg
                    }
                }
            }
            #[cfg(not(feature = "video"))]
            {
                eprintln!("video feature disabled, using MJPEG");
                VideoCodec::Mjpeg
            }
        }
    };

    let codec_byte = match codec {
        VideoCodec::H264 => 1u8,
        VideoCodec::Mjpeg => 0u8,
    };
    eprintln!("sending codec byte: {}", codec_byte);
    send.write_all(&[codec_byte]).await?;
    eprintln!("codec byte sent successfully");
    eprintln!("negotiated codec: {:?}", codec);

    let result = match codec {
        VideoCodec::Mjpeg => receive_mjpeg(recv).await,
        VideoCodec::H264 => receive_h264(recv).await,
    };

    // A broken stream is reported but is not treated as a client failure.
    match result {
        Ok(_) => eprintln!("stream ended normally"),
        Err(e) => eprintln!("stream error: {}", e),
    }
    Ok(())
}
async fn receive_mjpeg(mut recv: quinn::RecvStream) -> Result<()> {
loop {
let mut size_buf = [0u8; 4];
if recv.read_exact(&mut size_buf).await.is_err() {
break;
}
let size = u32::from_be_bytes(size_buf) as usize;
let mut jpeg_data = vec![0u8; size];
recv.read_exact(&mut jpeg_data).await?;
eprintln!("received MJPEG frame: {} bytes", size);
}
Ok(())
}
/// Reads length-prefixed H.264 chunks from `recv`, feeds them to an OpenH264
/// decoder, and logs the outcome for each.
///
/// Each chunk is a 4-byte big-endian length followed by that many bytes. The
/// loop ends with `Ok(())` as soon as a length prefix cannot be read, whatever
/// the cause. Decoded pictures are not displayed or stored.
///
/// # Errors
/// Returns an error if the decoder cannot be created, a chunk announces more
/// than 64 MiB, the chunk body cannot be read in full, or decoding fails.
#[cfg(feature = "video")]
async fn receive_h264(mut recv: quinn::RecvStream) -> Result<()> {
    /// Sanity bound on a single chunk. The length comes from the peer, so it
    /// must not drive an allocation of up to 4 GiB.
    const MAX_FRAME_BYTES: usize = 64 * 1024 * 1024;

    let mut decoder = Decoder::new()?;
    // One buffer, resized per chunk, instead of a fresh allocation each time.
    let mut frame_data: Vec<u8> = Vec::new();

    loop {
        let mut size_buf = [0u8; 4];
        // NOTE(review): every failure here, including transport errors, is
        // treated as end of stream. Confirm that is intended.
        if recv.read_exact(&mut size_buf).await.is_err() {
            break;
        }

        let size = u32::from_be_bytes(size_buf) as usize;
        anyhow::ensure!(
            size <= MAX_FRAME_BYTES,
            "H.264 frame of {} bytes exceeds the {}-byte limit",
            size,
            MAX_FRAME_BYTES
        );

        frame_data.resize(size, 0);
        recv.read_exact(&mut frame_data).await?;
        eprintln!("received H.264 frame: {} bytes", size);

        if frame_data.is_empty() {
            eprintln!("received empty H.264 frame - encoder issue");
            continue;
        }

        if let Some(_yuv) = decoder.decode(&frame_data)? {
            eprintln!("successfully decoded H.264 frame");
        } else {
            eprintln!("H.264 frame received but not ready for display yet");
        }
    }

    Ok(())
}
/// Stand-in used when the `video` feature is disabled: logs that video support
/// is unavailable and returns `Ok(())` without using the connection.
#[cfg(not(feature = "video"))]
async fn handle_session(_incoming: quicnet::IncomingConnection) -> Result<()> {
    eprintln!("video feature not enabled");
    Ok(())
}
/// Stand-in used when the `video` feature is disabled: logs that video support
/// is unavailable and returns `Ok(())` without reading from the stream.
#[cfg(not(feature = "video"))]
async fn receive_h264(_recv: quinn::RecvStream) -> Result<()> {
    eprintln!("video feature not enabled");
    Ok(())
}