use crate::camera::Frame;
use anyhow::Result;
use std::sync::Arc;
use tokio::sync::Mutex;
const CAMERA_ALPN: &[u8] = b"xoq/camera/0";
#[derive(Clone)]
pub enum Transport {
Iroh { server_id: String },
Moq {
path: String,
relay_url: Option<String>,
},
}
pub struct CameraClientBuilder {
transport: Option<Transport>,
}
impl CameraClientBuilder {
pub fn new() -> Self {
Self { transport: None }
}
pub fn iroh(mut self, server_id: &str) -> Self {
self.transport = Some(Transport::Iroh {
server_id: server_id.to_string(),
});
self
}
pub fn moq(mut self, path: &str) -> Self {
self.transport = Some(Transport::Moq {
path: path.to_string(),
relay_url: None,
});
self
}
pub fn moq_with_relay(mut self, path: &str, relay_url: &str) -> Self {
self.transport = Some(Transport::Moq {
path: path.to_string(),
relay_url: Some(relay_url.to_string()),
});
self
}
pub async fn connect(self) -> Result<CameraClient> {
let transport = self
.transport
.ok_or_else(|| anyhow::anyhow!("Transport not specified"))?;
let inner = match transport {
Transport::Iroh { server_id } => {
use crate::iroh::IrohClientBuilder;
let conn = IrohClientBuilder::new()
.alpn(CAMERA_ALPN)
.connect_str(&server_id)
.await?;
let stream = conn.open_stream().await?;
let (_send, recv) = stream.split();
CameraClientInner::Iroh {
recv: Arc::new(Mutex::new(recv)),
_conn: conn,
}
}
Transport::Moq { path, relay_url } => {
use crate::moq::MoqBuilder;
let mut builder = MoqBuilder::new().path(&path);
if let Some(url) = &relay_url {
builder = builder.relay(url);
}
let mut conn = builder.connect_subscriber().await?;
let track = conn
.subscribe_track("camera")
.await?
.ok_or_else(|| anyhow::anyhow!("Camera track not found"))?;
CameraClientInner::Moq { track, _conn: conn }
}
};
Ok(CameraClient { inner })
}
}
impl Default for CameraClientBuilder {
fn default() -> Self {
Self::new()
}
}
enum CameraClientInner {
Iroh {
recv: Arc<Mutex<iroh::endpoint::RecvStream>>,
_conn: crate::iroh::IrohConnection,
},
Moq {
track: crate::moq::MoqTrackReader,
_conn: crate::moq::MoqSubscriber,
},
}
pub struct CameraClient {
inner: CameraClientInner,
}
impl CameraClient {
pub async fn connect(server_id: &str) -> Result<Self> {
CameraClientBuilder::new().iroh(server_id).connect().await
}
pub async fn read_frame(&mut self) -> Result<Frame> {
match &mut self.inner {
CameraClientInner::Iroh { recv, .. } => {
let (width, height, timestamp, jpeg_data) = {
let mut recv = recv.lock().await;
let mut header = [0u8; 20];
recv.read_exact(&mut header).await?;
let width = u32::from_le_bytes([header[0], header[1], header[2], header[3]]);
let height = u32::from_le_bytes([header[4], header[5], header[6], header[7]]);
let timestamp = u64::from_le_bytes([
header[8], header[9], header[10], header[11], header[12], header[13],
header[14], header[15],
]);
let length =
u32::from_le_bytes([header[16], header[17], header[18], header[19]]);
let mut jpeg_data = vec![0u8; length as usize];
recv.read_exact(&mut jpeg_data).await?;
(width, height, timestamp, jpeg_data)
};
let mut frame = Frame::from_jpeg(&jpeg_data)?;
frame.timestamp_us = timestamp;
if frame.width != width || frame.height != height {
tracing::warn!(
"Frame dimension mismatch: expected {}x{}, got {}x{}",
width,
height,
frame.width,
frame.height
);
}
Ok(frame)
}
CameraClientInner::Moq { track, .. } => {
let mut retries = 0;
let data = loop {
match track.read().await? {
Some(data) => break data,
None => {
retries += 1;
if retries > 200 {
anyhow::bail!("No frame available after retries");
}
tokio::time::sleep(std::time::Duration::from_millis(5)).await;
}
}
};
if data.len() < 12 {
anyhow::bail!("Invalid frame data");
}
let width = u32::from_le_bytes([data[0], data[1], data[2], data[3]]);
let height = u32::from_le_bytes([data[4], data[5], data[6], data[7]]);
let timestamp = u32::from_le_bytes([data[8], data[9], data[10], data[11]]);
let jpeg_data = &data[12..];
let mut frame = Frame::from_jpeg(jpeg_data)?;
frame.timestamp_us = timestamp as u64;
if frame.width != width || frame.height != height {
tracing::warn!(
"Frame dimension mismatch: expected {}x{}, got {}x{}",
width,
height,
frame.width,
frame.height
);
}
Ok(frame)
}
}
}
pub async fn read_frames<F>(&mut self, mut callback: F) -> Result<()>
where
F: FnMut(Frame) -> bool,
{
loop {
let frame = self.read_frame().await?;
if !callback(frame) {
break;
}
}
Ok(())
}
}
pub struct RemoteCameraBuilder {
server_id: String,
}
impl RemoteCameraBuilder {
pub fn new(server_id: &str) -> Self {
RemoteCameraBuilder {
server_id: server_id.to_string(),
}
}
pub async fn connect(self) -> Result<CameraClient> {
CameraClient::connect(&self.server_id).await
}
}
pub fn remote_camera(server_id: &str) -> RemoteCameraBuilder {
RemoteCameraBuilder::new(server_id)
}