use anyhow::Result;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};
use tokio::sync::Mutex;
use tokio::task::JoinSet;
use xoq::iroh::IrohServerBuilder;
use xoq::MoqBuilder;
/// Current wall-clock time as milliseconds since the Unix epoch.
///
/// Panics only if the system clock is set before 1970.
fn now_ms() -> u64 {
    let since_epoch = SystemTime::now().duration_since(UNIX_EPOCH).unwrap();
    since_epoch.as_millis() as u64
}
/// Prefix `data` with an 8-byte little-endian millisecond timestamp.
///
/// Receivers strip the first 8 bytes to recover the sender's wall clock.
fn stamp(data: Vec<u8>, ms: u64) -> Vec<u8> {
    let mut framed = ms.to_le_bytes().to_vec();
    framed.reserve(data.len());
    framed.extend_from_slice(&data);
    framed
}
#[cfg(feature = "camera")]
use xoq::camera::list_cameras;
#[cfg(feature = "camera")]
use xoq::camera::{Camera, CameraOptions, RawFormat};
#[cfg(feature = "camera-macos")]
use xoq::camera_macos::list_cameras as list_cameras_macos;
#[cfg(feature = "camera-macos")]
use xoq::camera_macos::Camera as CameraMacos;
#[cfg(feature = "nvenc")]
use xoq::nvenc_av1::NvencAv1Encoder;
#[cfg(feature = "vtenc")]
use xoq::vtenc::VtEncoder;
// ALPN protocol identifiers for the direct iroh P2P modes: clients pick the
// matching codec by the ALPN they dial. Encoder-specific ALPNs exist only
// when that encoder feature is compiled in.
const CAMERA_JPEG_ALPN: &[u8] = b"xoq/camera-jpeg/0";
#[cfg(feature = "nvenc")]
const CAMERA_AV1_ALPN: &[u8] = b"xoq/camera-av1/0";
#[cfg(feature = "vtenc")]
const CAMERA_H264_ALPN: &[u8] = b"xoq/camera-h264/0";
#[cfg(feature = "camera")]
/// Resolve `/dev/video<index>` to its udev `ID_PATH` (physical bus topology
/// path), so identity keys can be tied to the USB port rather than the
/// enumeration order, which can change across reboots.
///
/// Returns `None` when `udevadm` cannot be executed or reports no `ID_PATH`.
fn get_usb_path(video_index: u32) -> Option<String> {
    use std::process::Command;
    let device = format!("/dev/video{}", video_index);
    let output = Command::new("udevadm")
        .args(["info", "--query=property", "--name", &device])
        .output()
        .ok()?;
    // udevadm prints KEY=VALUE lines; scan for the ID_PATH property.
    let stdout = String::from_utf8_lossy(&output.stdout);
    for line in stdout.lines() {
        if let Some(path) = line.strip_prefix("ID_PATH=") {
            return Some(path.to_string());
        }
    }
    None
}
/// Derive a stable identity-key filename for a camera.
///
/// With the Linux `camera` feature, the name is based on the udev USB
/// topology path so the same physical port keeps the same key even when
/// `/dev/video*` indices shuffle; otherwise the numeric index is used.
fn make_key_name(video_index: u32) -> String {
    #[cfg(feature = "camera")]
    if let Some(usb_path) = get_usb_path(video_index) {
        // ':' and '.' are awkward in filenames; normalize them.
        let safe = usb_path.replace(':', "-").replace('.', "_");
        return format!(".xoq_camera_key_{}", safe);
    }
    format!(".xoq_camera_key_idx{}", video_index)
}
/// Per-camera runtime settings assembled from the command line by
/// `parse_args`; one clone is handed to each camera's supervisor task.
#[derive(Clone)]
struct CameraConfig {
    // Device index (e.g. 0 for /dev/video0 on Linux).
    index: u32,
    // Human-readable device name, or the "Camera <index>" fallback.
    name: String,
    // Requested capture width; the device may negotiate a different size.
    width: u32,
    // Requested capture height.
    height: u32,
    // Requested framerate.
    fps: u32,
    // JPEG quality (used only by the JPEG paths).
    quality: u8,
    // Target encoder bitrate in bits/s; dead weight unless a hardware
    // encoder feature is compiled in, hence the conditional allow.
    #[cfg_attr(not(any(feature = "nvenc", feature = "vtenc")), allow(dead_code))]
    bitrate: u32,
    // True when --h264/--nvenc/--vtenc requested hardware encoding.
    use_h264: bool,
    // Location of this camera's iroh identity key (P2P modes).
    identity_path: PathBuf,
    // Some(path) selects MoQ relay transport instead of direct P2P.
    moq_path: Option<String>,
    // MoQ relay URL.
    relay: String,
    // Disable TLS certificate verification for the relay connection.
    insecure: bool,
}
fn parse_args() -> Option<(Vec<CameraConfig>, PathBuf)> {
let args: Vec<String> = std::env::args().collect();
if args.len() < 2 {
return None;
}
if args.iter().any(|a| a == "--list") {
print_cameras();
std::process::exit(0);
}
let mut indices = Vec::new();
let mut key_dir = PathBuf::from(".");
let mut width = 640u32;
let mut height = 480u32;
let mut fps = 30u32;
let mut quality = 80u8;
let mut bitrate = 2_000_000u32; let mut use_h264 = false;
let mut moq_path: Option<String> = None;
let mut relay = String::from("https://cdn.1ms.ai");
let mut insecure = false;
let mut i = 1;
while i < args.len() {
let arg = &args[i];
match arg.as_str() {
"--key-dir" if i + 1 < args.len() => {
key_dir = PathBuf::from(&args[i + 1]);
i += 2;
}
"--width" if i + 1 < args.len() => {
width = args[i + 1].parse().unwrap_or(640);
i += 2;
}
"--height" if i + 1 < args.len() => {
height = args[i + 1].parse().unwrap_or(480);
i += 2;
}
"--fps" if i + 1 < args.len() => {
fps = args[i + 1].parse().unwrap_or(30);
i += 2;
}
"--quality" if i + 1 < args.len() => {
quality = args[i + 1].parse().unwrap_or(80);
i += 2;
}
"--bitrate" if i + 1 < args.len() => {
bitrate = args[i + 1].parse().unwrap_or(2_000_000);
i += 2;
}
"--h264" | "--nvenc" | "--vtenc" => {
use_h264 = true;
i += 1;
}
"--relay" if i + 1 < args.len() => {
relay = args[i + 1].clone();
i += 2;
}
"--insecure" => {
insecure = true;
i += 1;
}
"--moq" => {
if i + 1 < args.len()
&& !args[i + 1].starts_with("--")
&& args[i + 1].parse::<u32>().is_err()
{
moq_path = Some(args[i + 1].clone());
i += 2;
} else {
moq_path = Some(String::new());
i += 1;
}
}
_ => {
if let Ok(idx) = arg.parse::<u32>() {
indices.push(idx);
}
i += 1;
}
}
}
if indices.is_empty() {
return None;
}
let name_map = std::collections::HashMap::<u32, String>::new();
let configs = indices
.into_iter()
.map(|index| {
let name = name_map
.get(&index)
.cloned()
.unwrap_or_else(|| format!("Camera {}", index));
let key_name = make_key_name(index);
let identity_path = key_dir.join(&key_name);
let resolved_moq_path = moq_path.as_ref().map(|p| {
if p.is_empty() {
format!("anon/camera-{}", index)
} else {
p.clone()
}
});
CameraConfig {
index,
name,
width,
height,
fps,
quality,
bitrate,
use_h264,
identity_path,
moq_path: resolved_moq_path,
relay: relay.clone(),
insecure,
}
})
.collect();
Some((configs, key_dir))
}
/// Build a map from camera index to human-readable device name.
///
/// Tries whichever enumeration backend is compiled in; returns an empty map
/// when no camera feature is enabled or enumeration fails.
fn get_camera_name_map() -> std::collections::HashMap<u32, String> {
    #[cfg(feature = "camera")]
    if let Ok(cams) = list_cameras() {
        return cams.into_iter().map(|c| (c.index, c.name)).collect();
    }
    #[cfg(feature = "camera-macos")]
    if let Ok(cams) = list_cameras_macos() {
        return cams.into_iter().map(|c| (c.index, c.name)).collect();
    }
    std::collections::HashMap::new()
}
/// Print the enumerable cameras to stdout, or a placeholder when none are
/// found (or no camera backend feature is compiled in).
fn print_cameras() {
    println!("Available cameras:");
    #[cfg(feature = "camera")]
    if let Ok(cameras) = list_cameras() {
        if !cameras.is_empty() {
            for cam in cameras {
                println!(" [{}] {}", cam.index, cam.name);
            }
            return;
        }
    }
    #[cfg(feature = "camera-macos")]
    if let Ok(cameras) = list_cameras_macos() {
        if !cameras.is_empty() {
            for cam in cameras {
                println!(" [{}] {}", cam.index, cam.name);
            }
            return;
        }
    }
    println!(" (none found)");
}
/// Human-readable name of the hardware encoder compiled into this build.
///
/// Feature flags decide at compile time; NVENC takes precedence when both
/// encoder features are enabled, mirroring the runtime dispatch order.
fn encoder_name() -> &'static str {
    #[cfg(feature = "nvenc")]
    return "AV1 (NVENC)";
    #[cfg(feature = "vtenc")]
    return "H.264 (VideoToolbox)";
    #[allow(unreachable_code)]
    "H.264"
}
/// Print CLI usage, feature-appropriate examples, all options, and the list
/// of currently available cameras.
fn print_usage() {
    println!("Usage: camera_server <camera_index>... [options]");
    println!();
    println!("Examples:");
    println!(" camera_server 0 # Single camera (JPEG)");
    // The --h264 example text varies with the encoder feature compiled in.
    #[cfg(feature = "nvenc")]
    println!(" camera_server 0 --h264 # NVENC AV1 encoding");
    #[cfg(feature = "vtenc")]
    println!(" camera_server 0 --h264 # VideoToolbox H.264 encoding");
    #[cfg(not(any(feature = "nvenc", feature = "vtenc")))]
    println!(" camera_server 0 --h264 # H.264/AV1 encoding (requires nvenc or vtenc feature)");
    println!(" camera_server 0 2 4 # Multiple cameras");
    println!(" camera_server 0 --key-dir /etc/xoq # Custom key directory");
    println!(" camera_server --list # List available cameras");
    println!();
    println!("Options:");
    println!(" --list List available cameras and exit");
    println!(" --key-dir <path> Directory for identity keys (default: .)");
    println!(" --width <px> Frame width (default: 640)");
    println!(" --height <px> Frame height (default: 480)");
    println!(" --fps <rate> Framerate (default: 30)");
    println!(" --quality <1-100> JPEG quality (default: 80)");
    println!(" --h264 Use H.264 encoding (NVENC on Linux, VideoToolbox on macOS)");
    println!(" --bitrate <bps> H.264 bitrate in bps (default: 2000000)");
    println!(" --moq [path] Use MoQ relay transport (default: anon/camera-<index>)");
    println!(" --relay <url> MoQ relay URL (default: https://cdn.1ms.ai)");
    println!(" --insecure Disable TLS verification (for self-signed certs)");
    println!();
    print_cameras();
}
/// Supervise a single camera: select the transport/encoder variant implied
/// by the config and compiled features, run it, and restart on failure.
///
/// Only a clean `Ok(())` from the inner server ends the loop; any error
/// triggers a 5-second backoff and retry, so transient camera or network
/// failures do not take the process down.
async fn run_camera_server(config: CameraConfig) -> Result<()> {
    loop {
        tracing::info!("[cam{}] Starting {}...", config.index, config.name);
        // Dispatch on (MoQ relay vs. direct iroh P2P) x (JPEG vs. hardware
        // encoder). Each arm exists only when its features are compiled in.
        // NOTE(review): the nvenc and vtenc arms are written assuming the
        // features are mutually exclusive; enabling both in one build looks
        // like it would not compile (two non-unit block expressions in
        // statement position) — confirm feature constraints in Cargo.toml.
        let result = if let Some(ref moq_path) = config.moq_path {
            if config.use_h264 {
                #[cfg(all(feature = "nvenc", feature = "camera"))]
                {
                    run_camera_server_moq_h264_nvenc(&config, moq_path).await
                }
                #[cfg(all(feature = "vtenc", any(feature = "camera", feature = "camera-macos")))]
                {
                    run_camera_server_moq_h264_vtenc(&config, moq_path).await
                }
                #[cfg(not(any(
                    all(feature = "nvenc", feature = "camera"),
                    all(feature = "vtenc", any(feature = "camera", feature = "camera-macos"))
                )))]
                {
                    anyhow::bail!(
                        "MoQ H.264 requires the 'nvenc' or 'vtenc' feature and a camera feature"
                    )
                }
            } else {
                run_camera_server_moq(&config, moq_path).await
            }
        } else if config.use_h264 {
            #[cfg(feature = "nvenc")]
            {
                run_camera_server_h264_nvenc(&config).await
            }
            #[cfg(feature = "vtenc")]
            {
                run_camera_server_h264_vtenc(&config).await
            }
            #[cfg(not(any(feature = "nvenc", feature = "vtenc")))]
            {
                anyhow::bail!("H.264 requires the 'nvenc' or 'vtenc' feature")
            }
        } else {
            run_camera_server_jpeg(&config).await
        };
        match result {
            Ok(()) => {
                tracing::info!("[cam{}] Stopped cleanly", config.index);
                break;
            }
            Err(e) => {
                tracing::error!("[cam{}] Failed: {}", config.index, e);
                tracing::info!("[cam{}] Restarting in 5s...", config.index);
                tokio::time::sleep(std::time::Duration::from_secs(5)).await;
            }
        }
    }
    Ok(())
}
/// Stub compiled when no camera backend feature is enabled: fail with an
/// explanatory error instead of failing to link.
#[cfg(not(any(feature = "camera", feature = "camera-macos")))]
async fn run_camera_server_moq(_config: &CameraConfig, _moq_path: &str) -> Result<()> {
    anyhow::bail!("MoQ mode requires the 'camera' or 'camera-macos' feature")
}
/// Publish JPEG frames from one camera to a MoQ relay.
///
/// Each frame is written to the "camera" track framed as:
/// width (u32 LE) | height (u32 LE) | capture timestamp in µs truncated to
/// u32 (LE) | JPEG bytes — the whole payload prefixed with a wall-clock
/// millisecond stamp by `stamp()`.
///
/// Runs until a capture or encode error propagates; the supervisor in
/// `run_camera_server` handles restarts.
#[cfg(any(feature = "camera", feature = "camera-macos"))]
async fn run_camera_server_moq(config: &CameraConfig, moq_path: &str) -> Result<()> {
    // Open whichever camera backend is compiled in (V4L2 wins if both are).
    #[cfg(feature = "camera")]
    let camera = Camera::open(config.index, config.width, config.height, config.fps)?;
    #[cfg(all(feature = "camera-macos", not(feature = "camera")))]
    let camera = CameraMacos::open(config.index, config.width, config.height, config.fps)?;
    tracing::info!(
        "[cam{}] Opened: {}x{} ({}) - MoQ JPEG mode",
        config.index,
        camera.width(),
        camera.height(),
        camera.format_name()
    );
    let camera = Arc::new(Mutex::new(camera));
    let mut builder = MoqBuilder::new().relay(&config.relay).path(moq_path);
    if config.insecure {
        builder = builder.disable_tls_verify();
    }
    let mut publisher = builder.connect_publisher().await?;
    tracing::info!(
        "[cam{}] MoQ path: {} (relay: {})",
        config.index,
        moq_path,
        config.relay
    );
    let mut track = publisher.create_track("camera");
    let quality = config.quality;
    loop {
        // Hold the camera lock only for capture + JPEG encode.
        let (jpeg, width, height, timestamp_us) = {
            let mut cam = camera.lock().await;
            let frame = cam.capture()?;
            let jpeg = frame.to_jpeg(quality)?;
            (jpeg, frame.width, frame.height, frame.timestamp_us)
        };
        let wall_ms = now_ms();
        let mut buf = Vec::with_capacity(12 + jpeg.len());
        buf.extend_from_slice(&width.to_le_bytes());
        buf.extend_from_slice(&height.to_le_bytes());
        // NOTE(review): truncating the µs timestamp to u32 wraps ~every
        // 71 minutes; presumably receivers only use it for relative timing
        // — confirm against the consumer side of this protocol.
        buf.extend_from_slice(&(timestamp_us as u32).to_le_bytes());
        buf.extend_from_slice(&jpeg);
        track.write(stamp(buf, wall_ms));
    }
    // (Removed the write-only `frame_count` counter and the unused
    // `cam_idx` local — both were dead code producing warnings.)
}
/// Publish VideoToolbox-encoded H.264 to a MoQ relay as CMAF (fMP4).
///
/// An init segment is emitted once SPS/PPS are available; each keyframe
/// fragment is then prefixed with a copy of the init segment so that
/// subscribers joining mid-stream can start decoding at the next keyframe.
/// Every payload carries a wall-clock stamp via `stamp()`.
#[cfg(all(feature = "vtenc", any(feature = "camera", feature = "camera-macos")))]
async fn run_camera_server_moq_h264_vtenc(config: &CameraConfig, moq_path: &str) -> Result<()> {
    use xoq::cmaf::{CmafConfig, CmafMuxer, NalUnit as CmafNalUnit};
    #[cfg(feature = "camera")]
    let camera = Camera::open(config.index, config.width, config.height, config.fps)?;
    #[cfg(all(feature = "camera-macos", not(feature = "camera")))]
    let camera = CameraMacos::open(config.index, config.width, config.height, config.fps)?;
    // Use the negotiated capture size for the encoder, not the requested one.
    let actual_width = camera.width();
    let actual_height = camera.height();
    tracing::info!(
        "[cam{}] Opened: {}x{} ({}) - MoQ H.264/CMAF mode",
        config.index,
        actual_width,
        actual_height,
        camera.format_name()
    );
    let mut encoder = VtEncoder::new(actual_width, actual_height, config.fps, config.bitrate)?;
    tracing::info!("[cam{}] VideoToolbox encoder initialized", config.index);
    // 90 kHz timescale; 33 ms fragments ≈ one fragment per frame at 30 fps.
    let mut muxer = CmafMuxer::new(CmafConfig {
        fragment_duration_ms: 33, timescale: 90000,
    });
    let camera = Arc::new(Mutex::new(camera));
    let mut builder = MoqBuilder::new().relay(&config.relay).path(moq_path);
    if config.insecure {
        builder = builder.disable_tls_verify();
    }
    let mut publisher = builder.connect_publisher().await?;
    tracing::info!(
        "[cam{}] MoQ path: {} (H.264 CMAF, relay: {})",
        config.index,
        moq_path,
        config.relay
    );
    let mut track = publisher.create_track("video");
    let mut init_segment: Option<Vec<u8>> = None;
    let mut frame_count = 0u64;
    let cam_idx = config.index;
    loop {
        // Capture and encode under the camera lock; release before muxing.
        let encoded = {
            let mut cam = camera.lock().await;
            let pixel_buffer = cam.capture_pixel_buffer()?;
            encoder.encode_pixel_buffer_nals(pixel_buffer.as_ptr(), pixel_buffer.timestamp_us)?
        };
        let wall_ms = now_ms();
        // Build the init segment as soon as the encoder has produced both
        // parameter sets (typically alongside the first keyframe).
        if init_segment.is_none() {
            if let (Some(ref sps), Some(ref pps)) = (&encoded.sps, &encoded.pps) {
                let init = muxer.create_init_segment(sps, pps, actual_width, actual_height);
                track.write(stamp(init.clone(), wall_ms));
                init_segment = Some(init);
                tracing::info!("[cam{}] Sent CMAF init segment", cam_idx);
            }
        }
        // Timestamps are synthesized from the frame counter at the nominal
        // fps; capture timestamps are not used for muxing. No B-frames are
        // assumed here (dts == pts).
        let pts = (frame_count as i64) * 90000 / config.fps as i64;
        let dts = pts;
        let duration = (90000 / config.fps) as u32;
        let cmaf_nals: Vec<CmafNalUnit> = encoded
            .nals
            .iter()
            .map(|n| CmafNalUnit {
                data: n.data.clone(),
                nal_type: n.nal_type,
            })
            .collect();
        if let Some(segment) = muxer.add_frame(&cmaf_nals, pts, dts, duration, encoded.is_keyframe)
        {
            if encoded.is_keyframe {
                // Prefix keyframe segments with the init segment so late
                // joiners can decode without the start of the stream.
                if let Some(ref init) = init_segment {
                    let mut combined = init.clone();
                    combined.extend_from_slice(&segment);
                    track.write(stamp(combined, wall_ms));
                } else {
                    track.write(stamp(segment, wall_ms));
                }
            } else {
                track.write(stamp(segment, wall_ms));
            }
        }
        frame_count += 1;
    }
}
/// Publish NVENC-encoded AV1 to a MoQ relay as CMAF (fMP4).
///
/// Mirrors the VideoToolbox variant but feeds raw YUYV/GREY frames straight
/// to NVENC when the device supports them, avoiding an RGB conversion. An
/// init segment is sent once the AV1 sequence header appears, and keyframe
/// segments are prefixed with it for late joiners.
#[cfg(all(feature = "nvenc", feature = "camera"))]
async fn run_camera_server_moq_h264_nvenc(config: &CameraConfig, moq_path: &str) -> Result<()> {
    use xoq::cmaf::{parse_av1_frame, Av1CmafMuxer, CmafConfig};
    // Prefer YUYV so the encoder can skip the RGB round-trip.
    let camera = Camera::open_with_options(
        config.index,
        config.width,
        config.height,
        config.fps,
        CameraOptions { prefer_yuyv: true },
    )?;
    let actual_width = camera.width();
    let actual_height = camera.height();
    // The raw path is only valid while the device delivers YUYV or GREY;
    // it is disabled at runtime if an unexpected format shows up.
    let mut use_raw = camera.is_yuyv() || camera.is_grey();
    tracing::info!(
        "[cam{}] Opened: {}x{} ({}) - MoQ AV1/CMAF NVENC mode",
        config.index,
        actual_width,
        actual_height,
        camera.format_name()
    );
    // NOTE(review): the trailing `false` argument's meaning is not visible
    // here — confirm against NvencAv1Encoder::new's signature.
    let mut encoder = NvencAv1Encoder::new(
        actual_width,
        actual_height,
        config.fps,
        config.bitrate,
        false,
    )?;
    tracing::info!("[cam{}] NVENC AV1 encoder initialized", config.index);
    // 90 kHz timescale; 33 ms fragments ≈ one fragment per frame at 30 fps.
    let mut muxer = Av1CmafMuxer::new(CmafConfig {
        fragment_duration_ms: 33, timescale: 90000,
    });
    let camera = Arc::new(Mutex::new(camera));
    let mut builder = MoqBuilder::new().relay(&config.relay).path(moq_path);
    if config.insecure {
        builder = builder.disable_tls_verify();
    }
    let mut publisher = builder.connect_publisher().await?;
    tracing::info!(
        "[cam{}] MoQ path: {} (AV1 CMAF NVENC, relay: {})",
        config.index,
        moq_path,
        config.relay
    );
    let mut track = publisher.create_track("video");
    let mut init_segment: Option<Vec<u8>> = None;
    let mut frame_count = 0u64;
    let cam_idx = config.index;
    loop {
        // Capture + encode under the camera lock, choosing the cheapest
        // input path the current format allows.
        let av1_data = {
            let mut cam = camera.lock().await;
            if use_raw {
                let raw_frame = cam.capture_raw()?;
                match raw_frame.format {
                    RawFormat::Yuyv => {
                        encoder.encode_yuyv(&raw_frame.data, raw_frame.timestamp_us)?
                    }
                    RawFormat::Grey => {
                        encoder.encode_grey(&raw_frame.data, raw_frame.timestamp_us)?
                    }
                    _ => {
                        // Unexpected raw format: permanently fall back to RGB.
                        use_raw = false;
                        let frame = cam.capture()?;
                        encoder.encode_rgb(&frame.data, frame.timestamp_us)?
                    }
                }
            } else {
                let frame = cam.capture()?;
                encoder.encode_rgb(&frame.data, frame.timestamp_us)?
            }
        };
        let wall_ms = now_ms();
        let parsed = parse_av1_frame(&av1_data);
        // The AV1 sequence header plays the role SPS/PPS do for H.264.
        if init_segment.is_none() {
            if let Some(ref seq_hdr) = parsed.sequence_header {
                let init = muxer.create_init_segment(seq_hdr, actual_width, actual_height);
                track.write(stamp(init.clone(), wall_ms));
                init_segment = Some(init);
                tracing::info!("[cam{}] Sent AV1 CMAF init segment", cam_idx);
            }
        }
        // Synthesized timestamps at nominal fps; dts == pts (no reordering).
        let pts = (frame_count as i64) * 90000 / config.fps as i64;
        let dts = pts;
        let duration = (90000 / config.fps) as u32;
        if let Some(segment) = muxer.add_frame(&parsed.data, pts, dts, duration, parsed.is_keyframe)
        {
            if parsed.is_keyframe {
                // Prefix keyframe segments with the init segment for late joiners.
                if let Some(ref init) = init_segment {
                    let mut combined = init.clone();
                    combined.extend_from_slice(&segment);
                    track.write(stamp(combined, wall_ms));
                } else {
                    track.write(stamp(segment, wall_ms));
                }
            } else {
                track.write(stamp(segment, wall_ms));
            }
        }
        frame_count += 1;
    }
}
/// Stub compiled when no camera backend feature is enabled: fail with an
/// explanatory error instead of failing to link.
#[cfg(not(any(feature = "camera", feature = "camera-macos")))]
async fn run_camera_server_jpeg(_config: &CameraConfig) -> Result<()> {
    anyhow::bail!("JPEG mode requires the 'camera' or 'camera-macos' feature")
}
/// Serve JPEG frames over direct iroh P2P connections.
///
/// Accepts one client at a time. Each frame is framed as a 20-byte header —
/// width (u32 LE) | height (u32 LE) | timestamp_us (u64 LE) | jpeg length
/// (u32 LE) — followed by the JPEG bytes. A failed write means the client
/// went away; the server then returns to accepting connections.
#[cfg(any(feature = "camera", feature = "camera-macos"))]
async fn run_camera_server_jpeg(config: &CameraConfig) -> Result<()> {
    // Open whichever camera backend is compiled in (V4L2 wins if both are).
    #[cfg(feature = "camera")]
    let camera = Camera::open(config.index, config.width, config.height, config.fps)?;
    #[cfg(all(feature = "camera-macos", not(feature = "camera")))]
    let camera = CameraMacos::open(config.index, config.width, config.height, config.fps)?;
    tracing::info!(
        "[cam{}] Opened: {}x{} ({}) - JPEG mode",
        config.index,
        camera.width(),
        camera.height(),
        camera.format_name()
    );
    let camera = Arc::new(Mutex::new(camera));
    let server = IrohServerBuilder::new()
        .alpn(CAMERA_JPEG_ALPN)
        .identity_path(&config.identity_path)
        .bind()
        .await?;
    tracing::info!("[cam{}] Server ID: {}", config.index, server.id());
    loop {
        let conn = match server.accept().await {
            Ok(Some(c)) => c,
            // None: endpoint closed — shut down cleanly.
            Ok(None) => break,
            Err(e) => {
                tracing::debug!("[cam{}] Accept error (retrying): {}", config.index, e);
                continue;
            }
        };
        tracing::info!("[cam{}] Client: {}", config.index, conn.remote_id());
        let stream = match conn.accept_stream().await {
            Ok(s) => s,
            Err(e) => {
                tracing::debug!("[cam{}] Stream error: {}", config.index, e);
                continue;
            }
        };
        let (mut send, _recv) = stream.split();
        let quality = config.quality;
        let cam_idx = config.index;
        // Removed an unused write-only `frame_count` counter (dead code).
        loop {
            // Hold the camera lock only for capture + JPEG encode.
            let (jpeg, width, height, timestamp_us) = {
                let mut cam = camera.lock().await;
                let frame = cam.capture()?;
                let jpeg = frame.to_jpeg(quality)?;
                (jpeg, frame.width, frame.height, frame.timestamp_us)
            };
            let mut header = Vec::with_capacity(20);
            header.extend_from_slice(&width.to_le_bytes());
            header.extend_from_slice(&height.to_le_bytes());
            header.extend_from_slice(&timestamp_us.to_le_bytes());
            header.extend_from_slice(&(jpeg.len() as u32).to_le_bytes());
            // A write failure means the client disconnected.
            if send.write_all(&header).await.is_err() || send.write_all(&jpeg).await.is_err() {
                break;
            }
        }
        tracing::info!("[cam{}] Client disconnected", cam_idx);
    }
    Ok(())
}
/// Serve NVENC-encoded AV1 over direct iroh P2P connections.
///
/// One client at a time; frames use the same 20-byte header layout as the
/// JPEG path (width | height | timestamp_us | payload length, all LE)
/// followed by the AV1 payload. Writes are bounded by a 5 s timeout so a
/// stalled client cannot block the capture loop forever.
#[cfg(feature = "nvenc")]
async fn run_camera_server_h264_nvenc(config: &CameraConfig) -> Result<()> {
    // Prefer YUYV so the encoder can skip the RGB round-trip.
    let camera = Camera::open_with_options(
        config.index,
        config.width,
        config.height,
        config.fps,
        CameraOptions { prefer_yuyv: true },
    )?;
    let actual_width = camera.width();
    let actual_height = camera.height();
    // Raw path only while the device delivers YUYV or GREY.
    let mut use_raw = camera.is_yuyv() || camera.is_grey();
    tracing::info!(
        "[cam{}] Opened: {}x{} ({}) - AV1/NVENC P2P mode",
        config.index,
        actual_width,
        actual_height,
        camera.format_name()
    );
    // NOTE(review): the trailing `false` argument's meaning is not visible
    // here — confirm against NvencAv1Encoder::new's signature.
    let encoder = NvencAv1Encoder::new(
        actual_width,
        actual_height,
        config.fps,
        config.bitrate,
        false,
    )?;
    tracing::info!("[cam{}] NVENC AV1 encoder initialized", config.index);
    let camera = Arc::new(Mutex::new(camera));
    let encoder = Arc::new(Mutex::new(encoder));
    let server = IrohServerBuilder::new()
        .alpn(CAMERA_AV1_ALPN)
        .identity_path(&config.identity_path)
        .bind()
        .await?;
    tracing::info!("[cam{}] Server ID: {}", config.index, server.id());
    loop {
        let conn = match server.accept().await {
            Ok(Some(c)) => c,
            // None: endpoint closed — shut down cleanly.
            Ok(None) => break,
            Err(e) => {
                tracing::debug!("[cam{}] Accept error (retrying): {}", config.index, e);
                continue;
            }
        };
        tracing::info!("[cam{}] Client: {}", config.index, conn.remote_id());
        let stream = match conn.accept_stream().await {
            Ok(s) => s,
            Err(e) => {
                tracing::debug!("[cam{}] Stream error: {}", config.index, e);
                continue;
            }
        };
        let (mut send, _recv) = stream.split();
        let mut frame_count = 0u64;
        let cam_idx = config.index;
        // Reset the encoder's internal frame counter per client —
        // presumably forces a fresh keyframe/timestamp sequence for the new
        // connection; confirm against NvencAv1Encoder's semantics.
        encoder.lock().await.frame_count = 0;
        loop {
            let av1_data = {
                let mut cam = camera.lock().await;
                let mut enc = encoder.lock().await;
                if use_raw {
                    let raw_frame = cam.capture_raw()?;
                    match raw_frame.format {
                        RawFormat::Yuyv => {
                            enc.encode_yuyv(&raw_frame.data, raw_frame.timestamp_us)?
                        }
                        RawFormat::Grey => {
                            enc.encode_grey(&raw_frame.data, raw_frame.timestamp_us)?
                        }
                        _ => {
                            // Unexpected raw format: permanently fall back to RGB.
                            use_raw = false;
                            let frame = cam.capture()?;
                            enc.encode_rgb(&frame.data, frame.timestamp_us)?
                        }
                    }
                } else {
                    let frame = cam.capture()?;
                    enc.encode_rgb(&frame.data, frame.timestamp_us)?
                }
            };
            // Log sizes for the first few frames only (debug aid).
            if frame_count < 3 {
                tracing::info!(
                    "[cam{}] Frame {}: {} bytes (AV1)",
                    cam_idx,
                    frame_count,
                    av1_data.len(),
                );
            }
            // Timestamp synthesized from the frame counter at nominal fps.
            let timestamp_us = frame_count * 1_000_000 / config.fps as u64;
            let mut header = Vec::with_capacity(20);
            header.extend_from_slice(&actual_width.to_le_bytes());
            header.extend_from_slice(&actual_height.to_le_bytes());
            header.extend_from_slice(&timestamp_us.to_le_bytes());
            header.extend_from_slice(&(av1_data.len() as u32).to_le_bytes());
            // Bound the write so a stalled client gets dropped instead of
            // wedging the capture loop.
            let write_result = tokio::time::timeout(std::time::Duration::from_secs(5), async {
                send.write_all(&header).await?;
                send.write_all(&av1_data).await?;
                Ok::<(), std::io::Error>(())
            })
            .await;
            match write_result {
                Ok(Ok(())) => {}
                Ok(Err(e)) => {
                    tracing::warn!("[cam{}] Write error: {}", cam_idx, e);
                    break;
                }
                Err(_) => {
                    tracing::warn!("[cam{}] Write timeout (5s), dropping connection", cam_idx);
                    break;
                }
            }
            frame_count += 1;
            // Crude ~30 fps pacing between sends.
            tokio::time::sleep(std::time::Duration::from_millis(30)).await;
        }
        tracing::info!("[cam{}] Client disconnected", cam_idx);
    }
    Ok(())
}
/// Serve VideoToolbox-encoded H.264 over direct iroh P2P connections.
///
/// One client at a time; frames use the same 20-byte header layout as the
/// JPEG path (width | height | timestamp_us | payload length, all LE)
/// followed by the H.264 payload.
#[cfg(feature = "vtenc")]
async fn run_camera_server_h264_vtenc(config: &CameraConfig) -> Result<()> {
    let camera = CameraMacos::open(config.index, config.width, config.height, config.fps)?;
    // Use the negotiated capture size for the encoder and headers.
    let actual_width = camera.width();
    let actual_height = camera.height();
    tracing::info!(
        "[cam{}] Opened: {}x{} ({}) - H.264/VideoToolbox mode",
        config.index,
        actual_width,
        actual_height,
        camera.format_name()
    );
    let encoder = VtEncoder::new(actual_width, actual_height, config.fps, config.bitrate)?;
    tracing::info!("[cam{}] VideoToolbox encoder initialized", config.index);
    let camera = Arc::new(Mutex::new(camera));
    let encoder = Arc::new(Mutex::new(encoder));
    let server = IrohServerBuilder::new()
        .alpn(CAMERA_H264_ALPN)
        .identity_path(&config.identity_path)
        .bind()
        .await?;
    tracing::info!("[cam{}] Server ID: {}", config.index, server.id());
    loop {
        let conn = match server.accept().await {
            Ok(Some(c)) => c,
            // None: endpoint closed — shut down cleanly.
            Ok(None) => break,
            Err(e) => {
                tracing::debug!("[cam{}] Accept error (retrying): {}", config.index, e);
                continue;
            }
        };
        tracing::info!("[cam{}] Client: {}", config.index, conn.remote_id());
        let stream = match conn.accept_stream().await {
            Ok(s) => s,
            Err(e) => {
                tracing::debug!("[cam{}] Stream error: {}", config.index, e);
                continue;
            }
        };
        let (mut send, _recv) = stream.split();
        let mut frame_count = 0u64;
        let cam_idx = config.index;
        loop {
            // Capture and encode while holding both locks.
            let h264_data = {
                let mut cam = camera.lock().await;
                let pixel_buffer = cam.capture_pixel_buffer()?;
                let mut enc = encoder.lock().await;
                enc.encode_pixel_buffer(pixel_buffer.as_ptr(), pixel_buffer.timestamp_us)?
            };
            // Timestamp synthesized from the frame counter at nominal fps.
            let timestamp_us = frame_count * 1_000_000 / config.fps as u64;
            let mut header = Vec::with_capacity(20);
            header.extend_from_slice(&actual_width.to_le_bytes());
            header.extend_from_slice(&actual_height.to_le_bytes());
            header.extend_from_slice(&timestamp_us.to_le_bytes());
            header.extend_from_slice(&(h264_data.len() as u32).to_le_bytes());
            // A write failure means the client disconnected.
            if send.write_all(&header).await.is_err() || send.write_all(&h264_data).await.is_err() {
                break;
            }
            use tokio::time::sleep;
            use tokio::time::Duration;
            // Crude ~30 fps pacing between sends.
            sleep(Duration::from_millis(30)).await;
            frame_count += 1;
        }
        tracing::info!("[cam{}] Client disconnected", cam_idx);
    }
    Ok(())
}
/// Entry point: parse arguments, spawn one supervised task per camera on a
/// tokio runtime, and keep the main thread available for the platform loop.
fn main() -> Result<()> {
    tracing_subscriber::fmt()
        .with_env_filter(
            // Defaults: info for this crate, warn for everything else;
            // RUST_LOG can still override via from_default_env.
            tracing_subscriber::EnvFilter::from_default_env()
                .add_directive("xoq=info".parse()?)
                .add_directive("warn".parse()?),
        )
        .init();
    let (configs, key_dir) = match parse_args() {
        Some(r) => r,
        None => {
            print_usage();
            return Ok(());
        }
    };
    // All cameras share the same flags, so inspecting the first suffices.
    let use_h264 = configs.first().map(|c| c.use_h264).unwrap_or(false);
    let use_moq = configs.first().and_then(|c| c.moq_path.as_ref()).is_some();
    // Without an encoder feature, P2P H.264 cannot work — bail early with a
    // build hint rather than failing per-camera at runtime.
    #[cfg(not(any(feature = "nvenc", feature = "vtenc")))]
    if use_h264 && !use_moq {
        eprintln!("Error: H.264 encoding requires the 'nvenc' or 'vtenc' feature.");
        #[cfg(target_os = "macos")]
        eprintln!("Rebuild with: cargo run --example camera_server --features iroh,vtenc");
        #[cfg(not(target_os = "macos"))]
        eprintln!("Rebuild with: cargo run --example camera_server --features iroh,nvenc");
        return Ok(());
    }
    let encoding_str = if use_moq && use_h264 {
        "H.264 CMAF (MoQ relay)"
    } else if use_moq {
        "JPEG (MoQ relay)"
    } else if use_h264 {
        encoder_name()
    } else {
        "JPEG"
    };
    tracing::info!("Camera server starting");
    tracing::info!("Key dir: {}", key_dir.display());
    tracing::info!("Cameras: {}", configs.len());
    tracing::info!("Encoding: {}", encoding_str);
    println!("\n========================================");
    println!("Camera Server Starting...");
    println!("Encoding: {}", encoding_str);
    println!("========================================\n");
    // All camera work runs on the tokio runtime; the main thread is kept
    // free below — presumably the macOS camera stack requires a running
    // main-thread CFRunLoop (TODO confirm).
    let rt = tokio::runtime::Runtime::new()?;
    rt.spawn(async move {
        let mut tasks: JoinSet<Result<()>> = JoinSet::new();
        for config in configs {
            tasks.spawn(run_camera_server(config));
        }
        // Run until either Ctrl-C or every camera task has finished.
        tokio::select! {
            _ = tokio::signal::ctrl_c() => {
                tracing::info!("Shutting down...");
                tasks.abort_all();
            }
            _ = async {
                while tasks.join_next().await.is_some() {}
            } => {
                tracing::info!("All camera servers stopped");
            }
        }
        // Drain any remaining (aborted) tasks before exiting.
        while tasks.join_next().await.is_some() {}
        // The main thread is parked / looping forever, so exit the whole
        // process from here.
        std::process::exit(0);
    });
    // macOS: block the main thread in the CoreFoundation run loop.
    #[cfg(target_os = "macos")]
    unsafe {
        extern "C" {
            fn CFRunLoopRun();
        }
        CFRunLoopRun();
    }
    // Elsewhere: just park; the spawned task exits the process when done.
    #[cfg(not(target_os = "macos"))]
    std::thread::park();
    Ok(())
}