use super::*;
use std::ffi::{c_char, c_void};
use std::sync::mpsc;
use std::time::Duration;
const TIMEOUT: Duration = Duration::from_secs(10);
fn id(raw: i32) -> u32 {
assert!(raw > 0, "expected positive id, got {raw}");
raw as u32
}
struct Guard<F: FnOnce()>(Option<F>);
impl<F: FnOnce()> Drop for Guard<F> {
fn drop(&mut self) {
if let Some(f) = self.0.take() {
f();
}
}
}
struct Callback {
rx: mpsc::Receiver<i32>,
ptr: *mut c_void,
}
impl Callback {
fn new() -> Self {
let (tx, rx) = mpsc::channel();
let ptr = Box::into_raw(Box::new(tx)) as *mut c_void;
Self { rx, ptr }
}
fn recv(&self) -> i32 {
self.rx.recv_timeout(TIMEOUT).expect("callback timed out")
}
fn try_recv(&self, timeout: Duration) -> Option<i32> {
self.rx.recv_timeout(timeout).ok()
}
}
impl Drop for Callback {
fn drop(&mut self) {
unsafe { drop(Box::from_raw(self.ptr as *mut mpsc::Sender<i32>)) };
}
}
extern "C" fn channel_callback(user_data: *mut c_void, code: i32) {
let tx = unsafe { &*(user_data as *const mpsc::Sender<i32>) };
let _ = tx.send(code);
}
fn opus_head() -> Vec<u8> {
let mut head = Vec::with_capacity(19);
head.extend_from_slice(b"OpusHead");
head.push(1); head.push(2); head.extend_from_slice(&0u16.to_le_bytes()); head.extend_from_slice(&48000u32.to_le_bytes()); head.extend_from_slice(&0u16.to_le_bytes()); head.push(0); head
}
fn h264_init() -> Vec<u8> {
let mut init = Vec::new();
init.extend_from_slice(&[0x00, 0x00, 0x00, 0x01]);
init.extend_from_slice(&[
0x67, 0x64, 0x00, 0x1f, 0xac, 0x24, 0x84, 0x01, 0x40, 0x16, 0xec, 0x04, 0x40, 0x00, 0x00, 0x03, 0x00, 0x40,
0x00, 0x00, 0x0c, 0x23, 0xc6, 0x0c, 0x92,
]);
init.extend_from_slice(&[0x00, 0x00, 0x00, 0x01]);
init.extend_from_slice(&[0x68, 0xee, 0x32, 0xc8, 0xb0]);
init
}
#[test]
fn origin_lifecycle() {
let origin = id(moq_origin_create());
assert_eq!(moq_origin_close(origin), 0, "moq_origin_close should succeed");
assert!(moq_origin_close(origin) < 0, "double-close should fail");
}
#[test]
fn publish_media_lifecycle() {
let broadcast = id(moq_publish_create());
let _guard = Guard(Some(|| {
moq_publish_close(broadcast);
}));
let init = opus_head();
let format = b"opus";
let media = id(unsafe {
moq_publish_media_ordered(
broadcast,
format.as_ptr() as *const c_char,
format.len(),
init.as_ptr(),
init.len(),
)
});
let payload = b"opus frame";
let ret = unsafe { moq_publish_media_frame(media, payload.as_ptr(), payload.len(), 1000) };
assert_eq!(ret, 0, "moq_publish_media_frame should succeed");
assert_eq!(moq_publish_media_close(media), 0);
assert_eq!(moq_publish_close(broadcast), 0);
}
#[test]
fn close_invalid_or_zero_ids() {
assert!(moq_origin_close(9999) < 0);
assert!(moq_session_close(9999) < 0);
assert!(moq_publish_close(9999) < 0);
assert!(moq_consume_close(9999) < 0);
assert!(moq_consume_frame_close(9999) < 0);
assert!(moq_origin_close(0) < 0);
assert!(moq_session_close(0) < 0);
assert!(moq_publish_close(0) < 0);
}
#[test]
fn double_close_all_resource_types() {
let origin = id(moq_origin_create());
assert_eq!(moq_origin_close(origin), 0);
assert!(moq_origin_close(origin) < 0);
let broadcast = id(moq_publish_create());
let init = opus_head();
let format = b"opus";
let media = id(unsafe {
moq_publish_media_ordered(
broadcast,
format.as_ptr() as *const c_char,
format.len(),
init.as_ptr(),
init.len(),
)
});
assert_eq!(moq_publish_media_close(media), 0);
assert!(moq_publish_media_close(media) < 0);
assert_eq!(moq_publish_close(broadcast), 0);
let origin = id(moq_origin_create());
let broadcast = id(moq_publish_create());
let init = opus_head();
let media = id(unsafe {
moq_publish_media_ordered(
broadcast,
format.as_ptr() as *const c_char,
format.len(),
init.as_ptr(),
init.len(),
)
});
let path = b"double-close-test";
assert_eq!(
unsafe { moq_origin_publish(origin, path.as_ptr() as *const c_char, path.len(), broadcast) },
0
);
let consume = id(unsafe { moq_origin_consume(origin, path.as_ptr() as *const c_char, path.len()) });
let catalog_cb = Callback::new();
let catalog_task = id(unsafe { moq_consume_catalog(consume, Some(channel_callback), catalog_cb.ptr) });
let catalog_id = id(catalog_cb.recv());
let frame_cb = Callback::new();
let track = id(unsafe { moq_consume_audio_ordered(catalog_id, 0, 10_000, Some(channel_callback), frame_cb.ptr) });
let payload = b"test";
assert_eq!(
unsafe { moq_publish_media_frame(media, payload.as_ptr(), payload.len(), 1_000_000) },
0
);
let frame_id = id(frame_cb.recv());
assert_eq!(moq_consume_frame_close(frame_id), 0);
assert!(moq_consume_frame_close(frame_id) < 0);
assert_eq!(moq_consume_audio_close(track), 0);
assert!(moq_consume_audio_close(track) < 0);
assert_eq!(moq_consume_catalog_free(catalog_id), 0);
assert!(moq_consume_catalog_free(catalog_id) < 0);
assert_eq!(moq_consume_catalog_close(catalog_task), 0);
assert!(moq_consume_catalog_close(catalog_task) < 0);
assert_eq!(moq_consume_close(consume), 0);
assert_eq!(moq_publish_media_close(media), 0);
assert_eq!(moq_publish_close(broadcast), 0);
assert_eq!(moq_origin_close(origin), 0);
}
#[test]
fn unknown_format() {
let broadcast = id(moq_publish_create());
let _guard = Guard(Some(|| {
moq_publish_close(broadcast);
}));
let format = b"nope";
let ret = unsafe {
moq_publish_media_ordered(
broadcast,
format.as_ptr() as *const c_char,
format.len(),
std::ptr::null(),
0,
)
};
assert!(ret < 0, "unknown format should fail");
}
#[test]
fn local_announce() {
let origin = id(moq_origin_create());
let cb = Callback::new();
let announced_task = id(unsafe { moq_origin_announced(origin, Some(channel_callback), cb.ptr) });
let broadcast = id(moq_publish_create());
let path = b"test/broadcast";
assert_eq!(
unsafe { moq_origin_publish(origin, path.as_ptr() as *const c_char, path.len(), broadcast) },
0,
"moq_origin_publish should succeed"
);
let announced_id = id(cb.recv());
let mut info = moq_announced {
path: std::ptr::null(),
path_len: 0,
active: false,
};
assert_eq!(unsafe { moq_origin_announced_info(announced_id, &mut info) }, 0);
assert!(info.active, "broadcast should be active");
let announced_path =
unsafe { std::str::from_utf8(std::slice::from_raw_parts(info.path as *const u8, info.path_len)).unwrap() };
assert_eq!(announced_path, "test/broadcast");
assert_eq!(moq_origin_announced_close(announced_task), 0);
assert_eq!(moq_publish_close(broadcast), 0);
assert_eq!(moq_origin_close(origin), 0);
}
#[test]
fn announced_deactivation() {
let origin = id(moq_origin_create());
let cb = Callback::new();
let announced_task = id(unsafe { moq_origin_announced(origin, Some(channel_callback), cb.ptr) });
let broadcast = id(moq_publish_create());
let path = b"deactivate/test";
assert_eq!(
unsafe { moq_origin_publish(origin, path.as_ptr() as *const c_char, path.len(), broadcast) },
0
);
let announced_id = id(cb.recv());
let mut info = moq_announced {
path: std::ptr::null(),
path_len: 0,
active: false,
};
assert_eq!(unsafe { moq_origin_announced_info(announced_id, &mut info) }, 0);
assert!(info.active);
assert_eq!(moq_publish_close(broadcast), 0);
let deactivated_id = id(cb.recv());
assert_eq!(unsafe { moq_origin_announced_info(deactivated_id, &mut info) }, 0);
assert!(!info.active, "broadcast should be inactive after publisher closes");
assert_eq!(moq_origin_announced_close(announced_task), 0);
assert_eq!(moq_origin_close(origin), 0);
}
#[test]
fn local_publish_consume() {
let origin = id(moq_origin_create());
let broadcast = id(moq_publish_create());
let init = opus_head();
let format = b"opus";
let media = id(unsafe {
moq_publish_media_ordered(
broadcast,
format.as_ptr() as *const c_char,
format.len(),
init.as_ptr(),
init.len(),
)
});
let path = b"live";
assert_eq!(
unsafe { moq_origin_publish(origin, path.as_ptr() as *const c_char, path.len(), broadcast) },
0
);
let consume = id(unsafe { moq_origin_consume(origin, path.as_ptr() as *const c_char, path.len()) });
let catalog_cb = Callback::new();
let catalog_task = id(unsafe { moq_consume_catalog(consume, Some(channel_callback), catalog_cb.ptr) });
let catalog_id = id(catalog_cb.recv());
let mut audio_cfg = moq_audio_config {
name: std::ptr::null(),
name_len: 0,
codec: std::ptr::null(),
codec_len: 0,
description: std::ptr::null(),
description_len: 0,
sample_rate: 0,
channel_count: 0,
};
assert_eq!(unsafe { moq_consume_audio_config(catalog_id, 0, &mut audio_cfg) }, 0);
assert_eq!(audio_cfg.sample_rate, 48000);
assert_eq!(audio_cfg.channel_count, 2);
let codec = unsafe {
std::str::from_utf8(std::slice::from_raw_parts(
audio_cfg.codec as *const u8,
audio_cfg.codec_len,
))
}
.unwrap();
assert_eq!(codec, "opus");
let mut video_cfg = moq_video_config {
name: std::ptr::null(),
name_len: 0,
codec: std::ptr::null(),
codec_len: 0,
description: std::ptr::null(),
description_len: 0,
coded_width: std::ptr::null(),
coded_height: std::ptr::null(),
};
assert!(
unsafe { moq_consume_video_config(catalog_id, 0, &mut video_cfg) } < 0,
"video config should fail (no video tracks)"
);
let frame_cb = Callback::new();
let track = id(unsafe { moq_consume_audio_ordered(catalog_id, 0, 10_000, Some(channel_callback), frame_cb.ptr) });
let payload = b"opus audio payload data";
let timestamp_us: u64 = 1_000_000;
assert_eq!(
unsafe { moq_publish_media_frame(media, payload.as_ptr(), payload.len(), timestamp_us) },
0
);
let frame_id = id(frame_cb.recv());
let mut frame = moq_frame {
payload: std::ptr::null(),
payload_size: 0,
timestamp_us: 0,
keyframe: false,
};
assert_eq!(unsafe { moq_consume_frame_chunk(frame_id, 0, &mut frame) }, 0);
assert_eq!(frame.payload_size, payload.len());
assert_eq!(frame.timestamp_us, timestamp_us);
let received = unsafe { std::slice::from_raw_parts(frame.payload, frame.payload_size) };
assert_eq!(received, payload, "frame payload should match");
assert!(unsafe { moq_consume_frame_chunk(frame_id, 999, &mut frame) } < 0);
assert_eq!(moq_consume_frame_close(frame_id), 0);
assert_eq!(moq_consume_audio_close(track), 0);
assert_eq!(moq_consume_catalog_free(catalog_id), 0);
assert_eq!(moq_consume_catalog_close(catalog_task), 0);
assert_eq!(moq_consume_close(consume), 0);
assert_eq!(moq_publish_media_close(media), 0);
assert_eq!(moq_publish_close(broadcast), 0);
assert_eq!(moq_origin_close(origin), 0);
}
#[test]
fn video_publish_consume() {
let origin = id(moq_origin_create());
let broadcast = id(moq_publish_create());
let init = h264_init();
let format = b"avc3";
let media = id(unsafe {
moq_publish_media_ordered(
broadcast,
format.as_ptr() as *const c_char,
format.len(),
init.as_ptr(),
init.len(),
)
});
let path = b"video-test";
assert_eq!(
unsafe { moq_origin_publish(origin, path.as_ptr() as *const c_char, path.len(), broadcast) },
0
);
let consume = id(unsafe { moq_origin_consume(origin, path.as_ptr() as *const c_char, path.len()) });
let catalog_cb = Callback::new();
let catalog_task = id(unsafe { moq_consume_catalog(consume, Some(channel_callback), catalog_cb.ptr) });
let catalog_id = id(catalog_cb.recv());
let mut video_cfg = moq_video_config {
name: std::ptr::null(),
name_len: 0,
codec: std::ptr::null(),
codec_len: 0,
description: std::ptr::null(),
description_len: 0,
coded_width: std::ptr::null(),
coded_height: std::ptr::null(),
};
assert_eq!(
unsafe { moq_consume_video_config(catalog_id, 0, &mut video_cfg) },
0,
"video config should succeed for avc3 H.264 track"
);
let codec = unsafe {
std::str::from_utf8(std::slice::from_raw_parts(
video_cfg.codec as *const u8,
video_cfg.codec_len,
))
}
.unwrap();
assert!(
codec.starts_with("avc1.") || codec.starts_with("avc3."),
"codec should be avc1/avc3, got {codec}"
);
assert!(!video_cfg.coded_width.is_null(), "coded_width should be set");
assert!(!video_cfg.coded_height.is_null(), "coded_height should be set");
let width = unsafe { *video_cfg.coded_width };
let height = unsafe { *video_cfg.coded_height };
assert_eq!(width, 1280);
assert_eq!(height, 720);
let mut audio_cfg = moq_audio_config {
name: std::ptr::null(),
name_len: 0,
codec: std::ptr::null(),
codec_len: 0,
description: std::ptr::null(),
description_len: 0,
sample_rate: 0,
channel_count: 0,
};
assert!(
unsafe { moq_consume_audio_config(catalog_id, 0, &mut audio_cfg) } < 0,
"audio config should fail (no audio tracks)"
);
let frame_cb = Callback::new();
let track = id(unsafe { moq_consume_video_ordered(catalog_id, 0, 10_000, Some(channel_callback), frame_cb.ptr) });
let keyframe = [0x00, 0x00, 0x00, 0x01, 0x65, 0xAA, 0xBB, 0xCC];
assert_eq!(
unsafe { moq_publish_media_frame(media, keyframe.as_ptr(), keyframe.len(), 0) },
0
);
let frame_id = id(frame_cb.recv());
let mut frame = moq_frame {
payload: std::ptr::null(),
payload_size: 0,
timestamp_us: 0,
keyframe: false,
};
assert_eq!(unsafe { moq_consume_frame_chunk(frame_id, 0, &mut frame) }, 0);
assert_eq!(frame.timestamp_us, 0);
assert!(frame.payload_size > 0, "frame should have payload data");
assert_eq!(moq_consume_frame_close(frame_id), 0);
assert_eq!(moq_consume_video_close(track), 0);
assert_eq!(moq_consume_catalog_free(catalog_id), 0);
assert_eq!(moq_consume_catalog_close(catalog_task), 0);
assert_eq!(moq_consume_close(consume), 0);
assert_eq!(moq_publish_media_close(media), 0);
assert_eq!(moq_publish_close(broadcast), 0);
assert_eq!(moq_origin_close(origin), 0);
}
#[test]
fn multiple_frames_ordering() {
let origin = id(moq_origin_create());
let broadcast = id(moq_publish_create());
let init = opus_head();
let format = b"opus";
let media = id(unsafe {
moq_publish_media_ordered(
broadcast,
format.as_ptr() as *const c_char,
format.len(),
init.as_ptr(),
init.len(),
)
});
let path = b"ordering-test";
assert_eq!(
unsafe { moq_origin_publish(origin, path.as_ptr() as *const c_char, path.len(), broadcast) },
0
);
let consume = id(unsafe { moq_origin_consume(origin, path.as_ptr() as *const c_char, path.len()) });
let catalog_cb = Callback::new();
let catalog_task = id(unsafe { moq_consume_catalog(consume, Some(channel_callback), catalog_cb.ptr) });
let catalog_id = id(catalog_cb.recv());
let frame_cb = Callback::new();
let track = id(unsafe { moq_consume_audio_ordered(catalog_id, 0, 10_000, Some(channel_callback), frame_cb.ptr) });
let timestamps: [u64; 5] = [0, 20_000, 40_000, 60_000, 80_000];
for (i, &ts) in timestamps.iter().enumerate() {
let payload = format!("frame-{i}");
assert_eq!(
unsafe { moq_publish_media_frame(media, payload.as_ptr(), payload.len(), ts) },
0
);
}
for (i, &expected_ts) in timestamps.iter().enumerate() {
let frame_id = id(frame_cb.recv());
let mut frame = moq_frame {
payload: std::ptr::null(),
payload_size: 0,
timestamp_us: 0,
keyframe: false,
};
assert_eq!(unsafe { moq_consume_frame_chunk(frame_id, 0, &mut frame) }, 0);
assert_eq!(frame.timestamp_us, expected_ts, "frame {i} has wrong timestamp");
let received = unsafe { std::slice::from_raw_parts(frame.payload, frame.payload_size) };
let expected = format!("frame-{i}");
assert_eq!(received, expected.as_bytes(), "frame {i} has wrong payload");
assert_eq!(moq_consume_frame_close(frame_id), 0);
}
assert_eq!(moq_consume_audio_close(track), 0);
assert_eq!(moq_consume_catalog_free(catalog_id), 0);
assert_eq!(moq_consume_catalog_close(catalog_task), 0);
assert_eq!(moq_consume_close(consume), 0);
assert_eq!(moq_publish_media_close(media), 0);
assert_eq!(moq_publish_close(broadcast), 0);
assert_eq!(moq_origin_close(origin), 0);
}
#[test]
fn catalog_update_on_new_track() {
let origin = id(moq_origin_create());
let broadcast = id(moq_publish_create());
let init = opus_head();
let format = b"opus";
let media1 = id(unsafe {
moq_publish_media_ordered(
broadcast,
format.as_ptr() as *const c_char,
format.len(),
init.as_ptr(),
init.len(),
)
});
let path = b"catalog-update";
assert_eq!(
unsafe { moq_origin_publish(origin, path.as_ptr() as *const c_char, path.len(), broadcast) },
0
);
let consume = id(unsafe { moq_origin_consume(origin, path.as_ptr() as *const c_char, path.len()) });
let catalog_cb = Callback::new();
let catalog_task = id(unsafe { moq_consume_catalog(consume, Some(channel_callback), catalog_cb.ptr) });
let catalog_id1 = id(catalog_cb.recv());
let mut audio_cfg = moq_audio_config {
name: std::ptr::null(),
name_len: 0,
codec: std::ptr::null(),
codec_len: 0,
description: std::ptr::null(),
description_len: 0,
sample_rate: 0,
channel_count: 0,
};
assert_eq!(unsafe { moq_consume_audio_config(catalog_id1, 0, &mut audio_cfg) }, 0);
assert!(unsafe { moq_consume_audio_config(catalog_id1, 1, &mut audio_cfg) } < 0);
let media2 = id(unsafe {
moq_publish_media_ordered(
broadcast,
format.as_ptr() as *const c_char,
format.len(),
init.as_ptr(),
init.len(),
)
});
let catalog_id2 = id(catalog_cb.recv());
assert_eq!(unsafe { moq_consume_audio_config(catalog_id2, 0, &mut audio_cfg) }, 0);
assert_eq!(unsafe { moq_consume_audio_config(catalog_id2, 1, &mut audio_cfg) }, 0);
assert_eq!(moq_consume_catalog_free(catalog_id1), 0);
assert_eq!(moq_consume_catalog_free(catalog_id2), 0);
assert_eq!(moq_consume_catalog_close(catalog_task), 0);
assert_eq!(moq_consume_close(consume), 0);
assert_eq!(moq_publish_media_close(media1), 0);
assert_eq!(moq_publish_media_close(media2), 0);
assert_eq!(moq_publish_close(broadcast), 0);
assert_eq!(moq_origin_close(origin), 0);
}
#[test]
fn null_pointer_handling() {
assert_eq!(
unsafe { moq_consume_frame_chunk(9999, 0, std::ptr::null_mut()) },
-6,
"null dst should return InvalidPointer (-6)"
);
assert_eq!(
unsafe { moq_consume_video_config(9999, 0, std::ptr::null_mut()) },
-6,
"null dst should return InvalidPointer (-6)"
);
assert_eq!(
unsafe { moq_consume_audio_config(9999, 0, std::ptr::null_mut()) },
-6,
"null dst should return InvalidPointer (-6)"
);
assert_eq!(
unsafe { moq_origin_announced_info(9999, std::ptr::null_mut()) },
-6,
"null dst should return InvalidPointer (-6)"
);
}
#[test]
fn session_connect_invalid_url() {
let url = b"not a valid url!!!";
let ret = unsafe {
moq_session_connect(
url.as_ptr() as *const c_char,
url.len(),
0,
0,
None,
std::ptr::null_mut(),
)
};
assert!(ret < 0, "connecting with an invalid URL should fail immediately");
}
#[test]
fn session_connect_and_close() {
let cb = Callback::new();
let url = b"moqt://localhost:1";
let session = id(unsafe {
moq_session_connect(
url.as_ptr() as *const c_char,
url.len(),
0,
0,
Some(channel_callback),
cb.ptr,
)
});
assert_eq!(moq_session_close(session), 0);
assert!(
cb.try_recv(Duration::from_millis(200)).is_none(),
"callback should not fire after session_close"
);
}