use anyhow::Result;
use std::time::{SystemTime, UNIX_EPOCH};
use xoq::cmaf::{parse_av1_frame, Av1CmafMuxer, CmafConfig};
use xoq::nvenc_av1::NvencAv1Encoder;
use xoq::realsense::RealSenseCamera;
use xoq::MoqBuilder;
/// Current wall-clock time as milliseconds since the Unix epoch.
fn now_ms() -> u64 {
    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap();
    since_epoch.as_millis() as u64
}
/// Prefix `data` with an 8-byte little-endian wall-clock timestamp (ms).
fn stamp(data: Vec<u8>, ms: u64) -> Vec<u8> {
    let mut stamped = ms.to_le_bytes().to_vec();
    stamped.reserve(data.len());
    stamped.extend_from_slice(&data);
    stamped
}
/// Right-shift applied to raw depth (mm) before 10-bit quantization.
/// With 0, one gray step == 1 mm, clamped to 1023 (≈1 m max range).
const DEPTH_SHIFT: u32 = 0;

/// Scan `gray` in the order given by `indices`, writing into `out` the most
/// recent nonzero value seen so far; positions before the first valid sample
/// receive 0. One call handles one row or column in one direction.
fn scan_fill(gray: &[u16], out: &mut [u16], indices: impl Iterator<Item = usize>) {
    let mut last_valid = 0u16;
    for i in indices {
        if gray[i] != 0 {
            last_valid = gray[i];
        }
        out[i] = last_valid;
    }
}

/// Fill holes in place: wherever `gray` is 0 (invalid), take the larger of the
/// two directional fill results for that pixel.
fn merge_holes(gray: &mut [u16], a: &[u16], b: &[u16]) {
    for ((g, &fa), &fb) in gray.iter_mut().zip(a).zip(b) {
        if *g == 0 {
            *g = fa.max(fb);
        }
    }
}

/// Convert a depth map (mm, 0 = no return) into a P010 frame in `p010_buf`:
/// 10-bit gray in the high bits of 16-bit LE luma samples, neutral (0x8000)
/// chroma. Holes are filled from nearest valid neighbors, first horizontally,
/// then vertically, so the encoder does not waste bits on zero/valid edges.
fn depth_to_p010(depth_mm: &[u16], p010_buf: &mut Vec<u8>, width: u32, height: u32) {
    let w = width as usize;
    let h = height as usize;
    let y_bytes = w * h * 2;
    let uv_bytes = w * (h / 2) * 2;
    p010_buf.resize(y_bytes + uv_bytes, 0);

    // Quantize to 10 bits; 0 stays 0 and marks an invalid pixel for the fill.
    let mut gray = vec![0u16; w * h];
    for i in 0..(w * h).min(depth_mm.len()) {
        let d = depth_mm[i] as u32;
        if d > 0 {
            gray[i] = ((d >> DEPTH_SHIFT).min(1023)) as u16;
        }
    }

    // Pass 1: horizontal hole fill (nearest valid to the left and right).
    let mut fwd = vec![0u16; w * h];
    let mut bwd = vec![0u16; w * h];
    for y in 0..h {
        let row = y * w;
        scan_fill(&gray, &mut fwd, row..row + w);
        scan_fill(&gray, &mut bwd, (row..row + w).rev());
    }
    merge_holes(&mut gray, &fwd, &bwd);

    // Pass 2: vertical hole fill on the result (nearest valid above/below).
    // fwd/bwd are fully overwritten by the column scans, so no re-zeroing.
    for x in 0..w {
        scan_fill(&gray, &mut fwd, (0..h).map(|y| y * w + x));
        scan_fill(&gray, &mut bwd, (0..h).rev().map(|y| y * w + x));
    }
    merge_holes(&mut gray, &fwd, &bwd);

    // Y plane: value << 6 places the 10 bits in the high bits of each 16-bit
    // LE sample (max 1023 << 6 = 65472, no u16 overflow).
    for i in 0..(w * h) {
        let sample = (gray[i] << 6).to_le_bytes();
        p010_buf[i * 2] = sample[0];
        p010_buf[i * 2 + 1] = sample[1];
    }
    // UV plane: every interleaved chroma sample is neutral 0x8000 (LE 00 80).
    for i in 0..(w * (h / 2)) {
        let offset = y_bytes + i * 2;
        p010_buf[offset] = 0x00;
        p010_buf[offset + 1] = 0x80;
    }
}
/// Runtime configuration assembled from command-line flags (see `print_usage`).
struct Args {
/// MoQ relay URL.
relay: String,
/// MoQ broadcast path under the relay.
path: String,
/// RealSense serial number; `None` selects the first device found.
serial: Option<String>,
/// Capture width in pixels.
width: u32,
/// Capture height in pixels.
height: u32,
/// Capture framerate (frames per second).
fps: u32,
/// Target bitrate for the AV1 color stream, in bits per second.
color_bitrate: u32,
/// Constant-QP value for the AV1 depth stream (0 = lossless).
depth_qp: u32,
/// Disable TLS certificate verification when connecting to the relay.
insecure: bool,
/// When set, run tare calibration against a target at this distance (mm).
tare_distance_mm: Option<f32>,
}
/// Parse command-line flags into `Args`.
///
/// Defaults match `print_usage`. A value that fails to parse falls back to
/// that flag's default; unrecognized arguments are reported on stderr and
/// skipped instead of being silently ignored. `--help`/`-h` prints usage and
/// exits the process.
fn parse_args() -> Args {
    let args: Vec<String> = std::env::args().collect();
    let mut result = Args {
        relay: "https://cdn.1ms.ai".to_string(),
        path: "anon/realsense".to_string(),
        serial: None,
        width: 1280,
        height: 720,
        fps: 15,
        color_bitrate: 2_000_000,
        depth_qp: 10,
        insecure: false,
        tare_distance_mm: None,
    };
    let mut i = 1;
    while i < args.len() {
        match args[i].as_str() {
            "--relay" if i + 1 < args.len() => {
                result.relay = args[i + 1].clone();
                i += 2;
            }
            "--path" if i + 1 < args.len() => {
                result.path = args[i + 1].clone();
                i += 2;
            }
            "--serial" if i + 1 < args.len() => {
                result.serial = Some(args[i + 1].clone());
                i += 2;
            }
            "--width" if i + 1 < args.len() => {
                result.width = args[i + 1].parse().unwrap_or(1280);
                i += 2;
            }
            "--height" if i + 1 < args.len() => {
                result.height = args[i + 1].parse().unwrap_or(720);
                i += 2;
            }
            "--fps" if i + 1 < args.len() => {
                // Clamp to >= 1: fps is used downstream as a divisor for the
                // fragment duration and PTS math, where 0 would panic.
                result.fps = args[i + 1].parse().unwrap_or(15).max(1);
                i += 2;
            }
            "--bitrate" if i + 1 < args.len() => {
                result.color_bitrate = args[i + 1].parse().unwrap_or(2_000_000);
                i += 2;
            }
            "--depth-qp" if i + 1 < args.len() => {
                // Bug fix: the parse-failure fallback was 20, inconsistent
                // with the declared default of 10 above.
                result.depth_qp = args[i + 1].parse().unwrap_or(10);
                i += 2;
            }
            "--tare-distance" if i + 1 < args.len() => {
                result.tare_distance_mm = Some(args[i + 1].parse().unwrap_or(0.0));
                i += 2;
            }
            "--insecure" => {
                result.insecure = true;
                i += 1;
            }
            "--help" | "-h" => {
                print_usage();
                std::process::exit(0);
            }
            other => {
                // Surface typos (e.g. "--biterate") instead of silently skipping.
                eprintln!("warning: ignoring unrecognized argument: {}", other);
                i += 1;
            }
        }
    }
    result
}
/// Print CLI usage to stdout. The defaults shown here mirror the initial
/// values constructed in `parse_args`.
fn print_usage() {
    println!("RealSense Server - Color (AV1) + Depth (AV1 10-bit) over MoQ");
    println!();
    println!("Usage: realsense_server [options]");
    println!();
    println!("Options:");
    println!(" --relay <url> MoQ relay URL (default: https://cdn.1ms.ai)");
    println!(" --path <path> MoQ broadcast path (default: anon/realsense)");
    println!(" --width <px> Resolution width (default: 1280)");
    println!(" --height <px> Resolution height (default: 720)");
    println!(" --fps <rate> Framerate (default: 15)");
    println!(" --bitrate <bps> AV1 color bitrate (default: 2000000)");
    // Bug fix: this line previously claimed "default: 20", but the actual
    // default constructed in parse_args is 10.
    println!(" --depth-qp <qp> AV1 depth QP (0=lossless, 20=high quality, default: 10)");
    println!(" --serial <serial> RealSense serial number (default: first device)");
    println!(" --tare-distance <mm> Run tare calibration with known distance (mm)");
    println!(" --insecure Disable TLS verification");
    println!();
    println!("Tracks published:");
    println!(" \"metadata\" - Intrinsics JSON (on keyframes)");
    println!(" \"video\" - Color AV1/CMAF (fMP4), 8-bit");
    println!(
        " \"depth\" - Depth AV1/CMAF (fMP4), 10-bit P010, gray10 = depth_mm >> {} ({}mm/step, max {}mm)",
        DEPTH_SHIFT,
        1u32 << DEPTH_SHIFT,
        1023u32 << DEPTH_SHIFT
    );
}
/// Entry point: opens a RealSense camera, encodes color (AV1 8-bit) and depth
/// (AV1 10-bit P010) via NVENC, packages both as CMAF fragments, and publishes
/// them over MoQ on the "video", "depth" and "metadata" tracks. The outer loop
/// reconnects to the relay with exponential backoff, so this never returns Ok.
#[tokio::main]
async fn main() -> Result<()> {
// Log filter: the xoq crate at info, everything else at warn; RUST_LOG can
// still override both via from_default_env().
tracing_subscriber::fmt()
.with_env_filter(
tracing_subscriber::EnvFilter::from_default_env()
.add_directive("xoq=info".parse()?)
.add_directive("warn".parse()?),
)
.init();
// Mutable because resolution may be downgraded by the USB-2 fallback below.
let mut args = parse_args();
println!();
println!("========================================");
println!("RealSense Server");
println!("========================================");
println!("Relay: {}", args.relay);
println!("Path: {}", args.path);
println!(
"Resolution: {}x{} @ {}fps",
args.width, args.height, args.fps
);
println!(
"Color: AV1 NVENC 8-bit, {} kbps",
args.color_bitrate / 1000
);
println!(
"Depth: AV1 NVENC 10-bit P010, CQP QP={}, P7 high-quality, shift={} ({}mm/step, max {}mm)",
args.depth_qp,
DEPTH_SHIFT,
1u32 << DEPTH_SHIFT,
1023u32 << DEPTH_SHIFT,
);
println!("Tracks: \"video\" (AV1/CMAF), \"depth\" (AV1 10-bit/CMAF), \"metadata\" (JSON)");
println!("========================================");
println!();
// Device enumeration is purely informational; failure here is non-fatal.
match RealSenseCamera::list_devices() {
Ok(devices) => {
println!("Connected RealSense devices:");
for (i, (name, serial)) in devices.iter().enumerate() {
println!(" [{}] {} (serial: {})", i, name, serial);
}
println!();
}
Err(e) => tracing::warn!("Could not list devices: {}", e),
}
// Calibration: tare against a known-distance target when --tare-distance was
// given, otherwise run on-chip self-calibration. Either way, failure only
// logs a warning and streaming continues with the existing calibration.
if let Some(tare_mm) = args.tare_distance_mm {
match RealSenseCamera::run_tare_calibration(args.serial.as_deref(), tare_mm) {
Ok((health, applied)) => {
if applied {
tracing::info!(
"Tare calibration applied (health: {:.3}, distance: {}mm)",
health,
tare_mm
);
} else {
tracing::warn!(
"Tare calibration completed but not applied (health: {:.3})",
health
);
}
}
Err(e) => {
tracing::warn!(
"Tare calibration failed: {}, continuing with existing calibration",
e
);
}
}
} else {
match RealSenseCamera::run_on_chip_calibration(args.serial.as_deref()) {
Ok((health, applied)) => {
if applied {
tracing::info!("On-chip calibration applied (health: {:.3})", health);
} else {
tracing::info!("Calibration already good (health: {:.3}), skipped", health);
}
}
Err(e) => {
tracing::warn!(
"On-chip calibration failed: {}, continuing with existing calibration",
e
);
}
}
}
tracing::info!("Opening RealSense camera...");
// Open at the requested mode; on failure (commonly USB-2 bandwidth limits),
// retry once at 640x480 before giving up. args.width/height are updated so
// the encoders below are created at the actual resolution.
let mut camera =
match RealSenseCamera::open(args.width, args.height, args.fps, args.serial.as_deref()) {
Ok(cam) => cam,
Err(e) if args.width != 640 || args.height != 480 => {
tracing::warn!(
"Failed to open at {}x{}: {}. Falling back to 640x480 (USB 2?)",
args.width,
args.height,
e
);
args.width = 640;
args.height = 480;
RealSenseCamera::open(args.width, args.height, args.fps, args.serial.as_deref())?
}
Err(e) => return Err(e),
};
let intr = camera.intrinsics();
tracing::info!(
"RealSense opened: {}x{} @ {}fps, depth_scale={}, fx={:.1} fy={:.1} ppx={:.1} ppy={:.1}",
camera.width(),
camera.height(),
args.fps,
camera.depth_scale(),
intr.fx,
intr.fy,
intr.ppx,
intr.ppy,
);
tracing::info!("Initializing AV1 encoder for color (8-bit)...");
// Color: bitrate-controlled encoder. NOTE(review): the trailing bool
// presumably selects 10-bit input (false here, true for depth) — confirm
// against NvencAv1Encoder's constructor.
let mut color_encoder =
NvencAv1Encoder::new(args.width, args.height, args.fps, args.color_bitrate, false)?;
tracing::info!("AV1 color encoder ready");
tracing::info!(
"Initializing AV1 encoder for depth (10-bit P010, CQP QP={})...",
args.depth_qp
);
// Depth: constant-QP encoder so depth precision doesn't degrade under motion.
let mut depth_encoder =
NvencAv1Encoder::new_cqp(args.width, args.height, args.fps, args.depth_qp, true)?;
tracing::info!("AV1 depth encoder ready (P7 high-quality, CQP)");
// Scratch P010 buffer reused across frames to avoid per-frame allocation.
let mut depth_p010 = Vec::new();
// Monotonic frame counter driving PTS/DTS; deliberately survives reconnects.
let mut frame_count = 0u64;
// Relay reconnect backoff, doubled per failure and capped at 30 s below.
let mut moq_delay = std::time::Duration::from_secs(1);
// Outer loop: (re)connect to the relay, then stream until the session drops.
loop {
tracing::info!("Connecting to MoQ relay: {}", args.relay);
let mut builder = MoqBuilder::new().relay(&args.relay).path(&args.path);
if args.insecure {
builder = builder.disable_tls_verify();
}
let mut publisher = match builder.connect_publisher().await {
Ok(pub_) => pub_,
Err(e) => {
tracing::error!("MoQ connect failed: {}, retrying in {:?}...", e, moq_delay);
tokio::time::sleep(moq_delay).await;
moq_delay = (moq_delay * 2).min(std::time::Duration::from_secs(30));
continue;
}
};
let mut video_track = publisher.create_track("video");
let mut depth_track = publisher.create_track("depth");
let mut metadata_track = publisher.create_track("metadata");
tracing::info!("MoQ connected, tracks: video + depth + metadata");
// Successful connect resets the backoff.
moq_delay = std::time::Duration::from_secs(1);
// One CMAF fragment per frame.
// NOTE(review): fps == 0 would panic here (divide by zero); parse_args
// defaults to 15 but does not obviously reject an explicit "--fps 0" —
// ensure it is validated at parse time.
let frag_ms = 1000 / args.fps;
let mut color_muxer = Av1CmafMuxer::new(CmafConfig {
fragment_duration_ms: frag_ms,
timescale: 90000,
});
let mut depth_muxer = Av1CmafMuxer::new(CmafConfig {
fragment_duration_ms: frag_ms,
timescale: 90000,
});
depth_muxer.set_high_bitdepth(true);
// Init segments are captured from the first sequence header seen and then
// re-sent ahead of every keyframe fragment so late joiners can decode.
let mut color_init_segment: Option<Vec<u8>> = None;
let mut depth_init_segment: Option<Vec<u8>> = None;
// Inner loop: capture -> encode -> mux -> publish; breaks with a reason
// string once the MoQ session reports closed.
let disconnect_reason = loop {
// NOTE(review): capture() errors abort main via `?` rather than taking
// the reconnect path — presumably camera loss is fatal by design; confirm.
let frames = camera.capture()?;
let timestamp_us = frames.timestamp_us;
// Wall-clock stamp prepended to every track payload (see `stamp`).
let wall_ms = now_ms();
let av1_data = color_encoder.encode_rgb(&frames.color_rgb, timestamp_us)?;
let parsed = parse_av1_frame(&av1_data);
// First sequence header -> build and send the color init segment once.
if color_init_segment.is_none() {
if let Some(ref seq_hdr) = parsed.sequence_header {
let init =
color_muxer.create_init_segment(seq_hdr, frames.width, frames.height);
video_track.write(stamp(init.clone(), wall_ms));
color_init_segment = Some(init);
tracing::info!("Sent AV1 CMAF init segment (color)");
}
}
// Media timeline in 90 kHz ticks derived from the frame counter; no
// frame reordering is produced here, hence DTS == PTS.
let pts = (frame_count as i64) * 90000 / args.fps as i64;
let dts = pts;
let duration = (90000 / args.fps) as u32;
if let Some(segment) =
color_muxer.add_frame(&parsed.data, pts, dts, duration, parsed.is_keyframe)
{
// Keyframe fragments get the init segment prepended so a subscriber
// joining mid-stream can start decoding at the next keyframe.
if parsed.is_keyframe {
if let Some(ref init) = color_init_segment {
let mut combined = init.clone();
combined.extend_from_slice(&segment);
video_track.write(stamp(combined, wall_ms));
} else {
video_track.write(stamp(segment, wall_ms));
}
} else {
video_track.write(stamp(segment, wall_ms));
}
}
// Depth path mirrors the color path, via P010 conversion + 10-bit encoder.
depth_to_p010(
&frames.depth_mm,
&mut depth_p010,
frames.width,
frames.height,
);
let depth_av1 = depth_encoder.encode_p010(&depth_p010, timestamp_us)?;
let depth_parsed = parse_av1_frame(&depth_av1);
if depth_init_segment.is_none() {
if let Some(ref seq_hdr) = depth_parsed.sequence_header {
let init =
depth_muxer.create_init_segment(seq_hdr, frames.width, frames.height);
depth_track.write(stamp(init.clone(), wall_ms));
depth_init_segment = Some(init);
tracing::info!("Sent AV1 CMAF init segment (depth, 10-bit)");
}
}
if let Some(segment) = depth_muxer.add_frame(
&depth_parsed.data,
pts,
dts,
duration,
depth_parsed.is_keyframe,
) {
if depth_parsed.is_keyframe {
if let Some(ref init) = depth_init_segment {
let mut combined = init.clone();
combined.extend_from_slice(&segment);
depth_track.write(stamp(combined, wall_ms));
} else {
depth_track.write(stamp(segment, wall_ms));
}
} else {
depth_track.write(stamp(segment, wall_ms));
}
}
// Intrinsics metadata (plus gravity from the IMU accel, when present) is
// published on color keyframes only, not on every frame.
if parsed.is_keyframe {
let gravity_json = match frames.accel {
Some([ax, ay, az]) => {
// Log raw accel for the first few frames as an IMU sanity check.
if frame_count < 3 {
let len = (ax * ax + ay * ay + az * az).sqrt();
tracing::warn!(
"IMU accel=[{:.3}, {:.3}, {:.3}] |a|={:.2}",
ax,
ay,
az,
len
);
}
format!(r#","gravity":[{:.3},{:.3},{:.3}]"#, ax, ay, az)
}
None => String::new(),
};
let metadata_json = format!(
r#"{{"fx":{:.1},"fy":{:.1},"ppx":{:.1},"ppy":{:.1},"width":{},"height":{},"depth_shift":{}{}}}"#,
intr.fx,
intr.fy,
intr.ppx,
intr.ppy,
camera.width(),
camera.height(),
DEPTH_SHIFT,
gravity_json,
);
metadata_track.write(metadata_json.into_bytes());
}
frame_count += 1;
// Roughly once per second of video: log encoded sizes and a summary of
// depth validity (min/max over nonzero pixels, valid-pixel count).
if (frame_count) % (args.fps as u64) == 0 {
let mut dmin = u16::MAX;
let mut dmax = 0u16;
let mut nonzero = 0u32;
for &d in &frames.depth_mm {
if d > 0 {
nonzero += 1;
if d < dmin {
dmin = d;
}
if d > dmax {
dmax = d;
}
}
}
let accel_str = match frames.accel {
Some([ax, ay, az]) => format!(" | accel: [{:.3}, {:.3}, {:.3}]", ax, ay, az),
None => String::new(),
};
tracing::info!(
"Frame {}: color {}B, depth {}B | depth: {}–{}mm, {}/{} valid{}",
frame_count,
av1_data.len(),
depth_av1.len(),
dmin,
dmax,
nonzero,
frames.depth_mm.len(),
accel_str,
);
}
// Non-blocking session health check: `biased` polls closed() first; the
// zero-duration sleep makes the select return immediately otherwise.
tokio::select! {
biased;
result = publisher.closed() => {
break format!("MoQ session closed: {:?}", result.err());
}
_ = tokio::time::sleep(std::time::Duration::ZERO) => {
}
}
};
tracing::warn!("{}, reconnecting to relay...", disconnect_reason);
}
}