use anyhow::Result;
use crate::av1_decoder::{self, Av1Decoder};
use crate::moq::{MoqBuilder, MoqSubscriber, MoqTrackReader};
/// Camera intrinsic parameters, parsed from the stream's "metadata" track
/// (see `parse_intrinsics_json`).
#[derive(Debug, Clone, Copy)]
pub struct Intrinsics {
    /// Focal length, x axis — presumably in pixels; TODO confirm units.
    pub fx: f32,
    /// Focal length, y axis — presumably in pixels; TODO confirm units.
    pub fy: f32,
    /// Principal point, x coordinate.
    pub ppx: f32,
    /// Principal point, y coordinate.
    pub ppy: f32,
    /// Sensor image width in pixels.
    pub width: u32,
    /// Sensor image height in pixels.
    pub height: u32,
    /// Shift passed to `p010_y_to_depth_mm` when converting decoded depth
    /// samples to millimeters (defaults to 0 when absent from metadata).
    pub depth_shift: u32,
}
/// One decoded color + depth frame pair returned by `read_frames`.
pub struct RealSenseFrames {
    /// Decoded color pixel data from the "video" track, as produced by
    /// `Av1Decoder` (presumably interleaved RGB — confirm against decoder).
    pub color_rgb: Vec<u8>,
    /// Per-pixel depth in millimeters, converted from the decoded depth
    /// frame via `p010_y_to_depth_mm`.
    pub depth_mm: Vec<u16>,
    /// Width of the color frame in pixels.
    pub width: u32,
    /// Height of the color frame in pixels.
    pub height: u32,
    /// Timestamp of the color frame, taken from the 8-byte little-endian
    /// millisecond prefix of the video payload.
    pub timestamp_ms: u64,
}
/// Async client that subscribes to RealSense video/depth/metadata tracks
/// over MoQ and decodes them into paired frames.
pub struct RealSenseClient {
    /// Reader for the AV1-encoded color ("video") track.
    video_reader: MoqTrackReader,
    /// Reader for the AV1-encoded "depth" track.
    depth_reader: MoqTrackReader,
    /// Optional reader for the JSON "metadata" track (camera intrinsics).
    metadata_reader: Option<MoqTrackReader>,
    /// Decoder for color frames (constructed with `Av1Decoder::new(false)`).
    video_decoder: Box<Av1Decoder>,
    /// Decoder for depth frames (constructed with `Av1Decoder::new(true)`).
    depth_decoder: Box<Av1Decoder>,
    /// Intrinsics cached once seen on the metadata track.
    intrinsics: Option<Intrinsics>,
    /// Never read; retained only for ownership so the track readers above
    /// presumably stay valid for the client's lifetime.
    _subscriber: MoqSubscriber,
}
impl RealSenseClient {
    /// Connects to a MoQ relay and subscribes to the "video", "depth" and
    /// (best-effort) "metadata" tracks.
    ///
    /// `path` may be a full URL (`scheme://host[:port]/path`), in which case
    /// the relay is rebuilt from its scheme/host/port and the MoQ path taken
    /// from its path component; otherwise `path` is used as a path on the
    /// default relay `https://cdn.1ms.ai`.
    ///
    /// # Errors
    /// Fails on URL parse errors, relay connection errors, or when either
    /// required track ("video"/"depth") cannot be subscribed. Metadata
    /// subscription failures are swallowed (the track is optional).
    pub async fn connect_moq(path: &str) -> Result<Self> {
        // Bare paths (no scheme) are addressed on the default public relay.
        let relay = if path.contains("://") {
            path.to_string()
        } else {
            format!("https://cdn.1ms.ai/{}", path)
        };
        let (relay_url, moq_path) = if relay.contains("://") {
            let url = url::Url::parse(&relay)?;
            // Rebuild just scheme://host:port; `url.port()` is None when the
            // URL uses the scheme's default port, so fall back to 443.
            let base = format!(
                "{}://{}:{}",
                url.scheme(),
                url.host_str().unwrap_or("localhost"),
                url.port().unwrap_or(443)
            );
            let p = url.path().trim_start_matches('/').to_string();
            // A URL with an empty path keeps the caller's original input as
            // the MoQ path.
            (base, if p.is_empty() { path.to_string() } else { p })
        } else {
            ("https://cdn.1ms.ai".to_string(), path.to_string())
        };
        // NOTE(review): TLS certificate verification is disabled here —
        // confirm this is intended outside development setups.
        let mut subscriber = MoqBuilder::new()
            .relay(&relay_url)
            .path(&moq_path)
            .disable_tls_verify()
            .connect_subscriber()
            .await?;
        let video_reader = subscriber
            .subscribe_track("video")
            .await?
            .ok_or_else(|| anyhow::anyhow!("Failed to subscribe to video track"))?;
        let depth_reader = subscriber
            .subscribe_track("depth")
            .await?
            .ok_or_else(|| anyhow::anyhow!("Failed to subscribe to depth track"))?;
        // Optional: both a subscription error and a missing track collapse
        // to `None` here.
        let metadata_reader = subscriber.subscribe_track("metadata").await.ok().flatten();
        // The boolean presumably selects a depth-oriented decoder
        // configuration — see `Av1Decoder::new`.
        let video_decoder = Box::new(Av1Decoder::new(false)?);
        let depth_decoder = Box::new(Av1Decoder::new(true)?);
        Ok(Self {
            video_reader,
            depth_reader,
            metadata_reader,
            video_decoder,
            depth_decoder,
            intrinsics: None,
            _subscriber: subscriber,
        })
    }

    /// Reads and decodes the next color + depth frame pair.
    ///
    /// Blocks until one full color frame and one full depth frame have been
    /// decoded; payloads that yield no frame (empty `mdat` extraction, or a
    /// decoder that needs more input) are skipped.
    ///
    /// NOTE(review): color and depth are paired by arrival order, not by
    /// timestamp — the depth payload's timestamp is read and discarded.
    ///
    /// # Errors
    /// Fails when either track ends or a decoder reports an error.
    pub async fn read_frames(&mut self) -> Result<RealSenseFrames> {
        // Opportunistically poll the metadata track for intrinsics without
        // blocking: `biased` makes the select poll the read arm first, and
        // the zero-duration sleep completes immediately when the read is
        // still pending, so this never waits for metadata.
        if self.intrinsics.is_none() {
            if let Some(ref mut meta_reader) = self.metadata_reader {
                tokio::select! {
                    biased;
                    result = meta_reader.read() => {
                        if let Ok(Some(data)) = result {
                            self.parse_metadata(&data);
                        }
                    }
                    _ = tokio::time::sleep(std::time::Duration::ZERO) => {}
                }
            }
        }
        // Pull video payloads until the decoder emits a complete frame.
        let color_frame = loop {
            let data = self
                .video_reader
                .read()
                .await?
                .ok_or_else(|| anyhow::anyhow!("Video track ended"))?;
            let (timestamp_ms, payload) = parse_stamped_data(&data);
            let obus = extract_av1_from_cmaf(payload);
            if obus.is_empty() {
                continue;
            }
            match self.video_decoder.decode(&obus)? {
                Some(frame) => break (frame, timestamp_ms),
                None => continue,
            }
        };
        // Same loop for the depth track; its timestamp is intentionally
        // discarded (see NOTE above).
        let depth_frame = loop {
            let data = self
                .depth_reader
                .read()
                .await?
                .ok_or_else(|| anyhow::anyhow!("Depth track ended"))?;
            let (_timestamp_ms, payload) = parse_stamped_data(&data);
            let obus = extract_av1_from_cmaf(payload);
            if obus.is_empty() {
                continue;
            }
            match self.depth_decoder.decode(&obus)? {
                Some(frame) => break frame,
                None => continue,
            }
        };
        let (color_decoded, timestamp_ms) = color_frame;
        // Shift from metadata (0 until intrinsics arrive) — presumably a bit
        // shift applied during conversion; see `p010_y_to_depth_mm`.
        let depth_shift = self.intrinsics.map(|i| i.depth_shift).unwrap_or(0);
        let depth_mm = av1_decoder::p010_y_to_depth_mm(&depth_frame.data, depth_shift);
        Ok(RealSenseFrames {
            color_rgb: color_decoded.data,
            depth_mm,
            width: color_decoded.width,
            height: color_decoded.height,
            timestamp_ms,
        })
    }

    /// Camera intrinsics, if the metadata track has been seen and parsed.
    pub fn intrinsics(&self) -> Option<Intrinsics> {
        self.intrinsics
    }

    /// Parses a metadata payload as UTF-8 JSON and caches any intrinsics
    /// found in it; non-UTF-8 or unparsable payloads are silently ignored.
    fn parse_metadata(&mut self, data: &[u8]) {
        if let Ok(text) = std::str::from_utf8(data) {
            if let Some(intr) = parse_intrinsics_json(text) {
                self.intrinsics = Some(intr);
            }
        }
    }
}
/// Splits a track payload into its leading little-endian `u64` millisecond
/// timestamp and the remaining bytes.
///
/// Payloads shorter than 8 bytes cannot carry a timestamp and are returned
/// whole with a timestamp of 0.
fn parse_stamped_data(data: &[u8]) -> (u64, &[u8]) {
    if data.len() >= 8 {
        let (stamp, rest) = data.split_at(8);
        let ms = u64::from_le_bytes(stamp.try_into().expect("split_at(8) yields 8 bytes"));
        (ms, rest)
    } else {
        (0, data)
    }
}
/// Extracts the raw AV1 payload from a CMAF/MP4 fragment by concatenating
/// the contents of every top-level `mdat` box.
///
/// Scanning stops (returning whatever was gathered so far) at anything this
/// minimal parser does not support: a size-0 box ("extends to end of file"),
/// the 64-bit extended-size marker (size 1), any size below the 8-byte
/// header minimum, or a box that overruns the buffer.
fn extract_av1_from_cmaf(data: &[u8]) -> Vec<u8> {
    let mut pos = 0;
    let mut result = Vec::new();
    while pos + 8 <= data.len() {
        // Box header: 32-bit big-endian size (which includes the header
        // itself) followed by a 4-byte type tag.
        let box_size =
            u32::from_be_bytes([data[pos], data[pos + 1], data[pos + 2], data[pos + 3]]) as usize;
        // `box_size < 8` also rejects sizes 1..8, which previously produced
        // an inverted slice range (`pos + 8 .. pos + box_size`) and panicked
        // on malformed `mdat` boxes.
        if box_size < 8 || pos + box_size > data.len() {
            break;
        }
        let box_type = &data[pos + 4..pos + 8];
        if box_type == b"mdat" {
            result.extend_from_slice(&data[pos + 8..pos + box_size]);
        }
        pos += box_size;
    }
    result
}
/// Extracts camera intrinsics from a JSON string using lightweight
/// substring scanning (no JSON parser dependency).
///
/// Requires `fx`, `fy`, `ppx`, `ppy`, `width` and `height`; `depth_shift`
/// defaults to 0 when absent. Returns `None` when any required key is
/// missing or its value does not parse as a number.
fn parse_intrinsics_json(json: &str) -> Option<Intrinsics> {
    // Finds `"key":` and parses the numeric token that follows. The value
    // ends at the first character that cannot appear in a JSON number, so
    // pretty-printed JSON (newline/tab after a value) and a value at the
    // very end of the input both work — the previous version only accepted
    // ',', '}' or ' ' as terminators and returned None otherwise.
    fn extract_f32(json: &str, key: &str) -> Option<f32> {
        let pattern = format!("\"{}\":", key);
        let start = json.find(&pattern)? + pattern.len();
        let rest = json[start..].trim_start();
        let end = rest
            .find(|c: char| !matches!(c, '0'..='9' | '.' | '-' | '+' | 'e' | 'E'))
            .unwrap_or(rest.len());
        rest[..end].parse().ok()
    }
    // Fractional values are truncated toward zero.
    fn extract_u32(json: &str, key: &str) -> Option<u32> {
        extract_f32(json, key).map(|v| v as u32)
    }
    Some(Intrinsics {
        fx: extract_f32(json, "fx")?,
        fy: extract_f32(json, "fy")?,
        ppx: extract_f32(json, "ppx")?,
        ppy: extract_f32(json, "ppy")?,
        width: extract_u32(json, "width")?,
        height: extract_u32(json, "height")?,
        depth_shift: extract_u32(json, "depth_shift").unwrap_or(0),
    })
}
/// Blocking wrapper around [`RealSenseClient`] that owns its own Tokio
/// runtime, so callers need no async context.
pub struct SyncRealSenseClient {
    /// The async client being driven.
    inner: RealSenseClient,
    /// Dedicated runtime used to block on the async operations.
    runtime: tokio::runtime::Runtime,
    /// Original connect path, kept so a failed first read can reconnect.
    source: String,
    /// Whether a read has completed (or the single reconnect has been
    /// spent); gates the first-read reconnect in `read_frames`.
    has_read: bool,
}
impl SyncRealSenseClient {
    /// Blocking connect: spins up a Tokio runtime and drives the async
    /// client's connection to completion on it.
    pub fn connect_moq(path: &str) -> Result<Self> {
        let runtime = tokio::runtime::Runtime::new()?;
        let inner = runtime.block_on(RealSenseClient::connect_moq(path))?;
        Ok(Self {
            inner,
            runtime,
            source: path.to_string(),
            has_read: false,
        })
    }

    /// Alias for [`Self::connect_moq`]; `source` is treated as a MoQ path.
    pub fn connect_auto(source: &str) -> Result<Self> {
        Self::connect_moq(source)
    }

    /// Drops the current async client and establishes a fresh connection
    /// to the original source path.
    fn reconnect(&mut self) -> Result<()> {
        let fresh = self
            .runtime
            .block_on(RealSenseClient::connect_moq(&self.source))?;
        self.inner = fresh;
        Ok(())
    }

    /// Blocking frame read. A failure on the very first read triggers one
    /// reconnect-and-retry; any later failure is returned as-is.
    pub fn read_frames(&mut self) -> Result<RealSenseFrames> {
        let attempt = self.runtime.block_on(self.inner.read_frames());
        match attempt {
            Ok(frames) => {
                self.has_read = true;
                Ok(frames)
            }
            Err(e) => {
                if self.has_read {
                    return Err(e);
                }
                eprintln!("[xoq] First read failed ({e}), reconnecting...");
                self.reconnect()?;
                // Mark the reconnect as spent so a failing retry does not
                // loop back here on the next call.
                self.has_read = true;
                self.runtime.block_on(self.inner.read_frames())
            }
        }
    }

    /// Latest camera intrinsics seen by the underlying client, if any.
    pub fn intrinsics(&self) -> Option<Intrinsics> {
        self.inner.intrinsics()
    }
}