Skip to main content

moq_ffi/
consumer.rs

1use std::sync::Arc;
2
3use bytes::Buf;
4
5use crate::error::MoqError;
6use crate::ffi::Task;
7use crate::media::*;
8
9#[derive(Clone, uniffi::Object)]
10pub struct MoqBroadcastConsumer {
11	inner: moq_net::BroadcastConsumer,
12}
13
14impl MoqBroadcastConsumer {
15	pub(crate) fn new(inner: moq_net::BroadcastConsumer) -> Self {
16		Self { inner }
17	}
18
19	/// Access the underlying `moq_net::BroadcastConsumer` for sibling
20	/// modules (e.g. `audio`) that need to subscribe a typed track.
21	pub(crate) fn inner(&self) -> &moq_net::BroadcastConsumer {
22		&self.inner
23	}
24}
25
26#[derive(uniffi::Object)]
27pub struct MoqCatalogConsumer {
28	task: Task<Catalog>,
29}
30
31struct Catalog {
32	inner: moq_mux::catalog::hang::Consumer,
33}
34
35impl Catalog {
36	async fn next(&mut self) -> Result<Option<MoqCatalog>, MoqError> {
37		match self.inner.next().await {
38			Ok(Some(catalog)) => Ok(Some(convert_catalog(&catalog))),
39			Ok(None) => Ok(None),
40			Err(e) => Err(e.into()),
41		}
42	}
43}
44
45#[derive(uniffi::Object)]
46pub struct MoqMediaConsumer {
47	task: Task<Media>,
48}
49
50struct Media {
51	inner: moq_mux::container::Consumer<moq_mux::catalog::hang::Container>,
52}
53
54impl Media {
55	async fn next(&mut self) -> Result<Option<MoqFrame>, MoqError> {
56		let frame = self.inner.read().await?;
57
58		let Some(frame) = frame else {
59			return Ok(None);
60		};
61
62		let timestamp_us: u64 = frame
63			.timestamp
64			.as_micros()
65			.try_into()
66			.map_err(|_| MoqError::Codec("timestamp overflow".into()))?;
67
68		let mut buf = frame.payload;
69		let payload = buf.copy_to_bytes(buf.remaining()).to_vec();
70
71		Ok(Some(MoqFrame {
72			payload,
73			timestamp_us,
74			keyframe: frame.keyframe,
75		}))
76	}
77}
78
79// ---- Broadcast ----
80
81#[uniffi::export]
82impl MoqBroadcastConsumer {
83	/// Subscribe to the catalog for this broadcast.
84	pub fn subscribe_catalog(&self) -> Result<Arc<MoqCatalogConsumer>, MoqError> {
85		let _guard = crate::ffi::RUNTIME.enter();
86		let track = self.inner.subscribe_track(&hang::catalog::Catalog::default_track())?;
87		let consumer = moq_mux::catalog::hang::Consumer::from(track);
88		Ok(Arc::new(MoqCatalogConsumer {
89			task: Task::new(Catalog { inner: consumer }),
90		}))
91	}
92
93	/// Subscribe to a track by name — same pattern as moq-boy's command/status tracks.
94	///
95	/// Frames are returned as plain byte payloads with no codec or container parsing.
96	pub fn subscribe_track(&self, name: String) -> Result<Arc<MoqTrackConsumer>, MoqError> {
97		let _guard = crate::ffi::RUNTIME.enter();
98		let track = self.inner.subscribe_track(&moq_net::Track { name, priority: 0 })?;
99		Ok(Arc::new(MoqTrackConsumer::new(track)))
100	}
101
102	/// Subscribe to a track by name, delivering frames in decode order.
103	///
104	/// `container` is the track container from the catalog.
105	/// `max_latency_ms` controls the maximum buffering before skipping a GoP.
106	pub fn subscribe_media(
107		&self,
108		name: String,
109		container: Container,
110		max_latency_ms: u64,
111	) -> Result<Arc<MoqMediaConsumer>, MoqError> {
112		let _guard = crate::ffi::RUNTIME.enter();
113		// Parse the container before subscribing so we don't leave a dangling
114		// subscription if init parsing fails.
115		let container: hang::catalog::Container = container.into();
116		let media: moq_mux::catalog::hang::Container = (&container)
117			.try_into()
118			.map_err(|e| MoqError::Codec(format!("invalid container: {e}")))?;
119		let track = self.inner.subscribe_track(&moq_net::Track { name, priority: 0 })?;
120		let latency = std::time::Duration::from_millis(max_latency_ms);
121		let consumer = moq_mux::container::Consumer::new(track, media).with_latency(latency);
122		Ok(Arc::new(MoqMediaConsumer {
123			task: Task::new(Media { inner: consumer }),
124		}))
125	}
126}
127
128// ---- Track Consumer ----
129
130struct TrackInner {
131	track: moq_net::TrackConsumer,
132}
133
134impl TrackInner {
135	async fn recv_group(&mut self) -> Result<Option<moq_net::GroupConsumer>, MoqError> {
136		Ok(self.track.recv_group().await?)
137	}
138
139	async fn next_group(&mut self) -> Result<Option<moq_net::GroupConsumer>, MoqError> {
140		Ok(self.track.next_group().await?)
141	}
142
143	async fn read_frame(&mut self) -> Result<Option<Vec<u8>>, MoqError> {
144		Ok(self.track.read_frame().await?.map(|b| b.to_vec()))
145	}
146}
147
148#[derive(uniffi::Object)]
149pub struct MoqTrackConsumer {
150	task: Task<TrackInner>,
151}
152
153impl MoqTrackConsumer {
154	pub(crate) fn new(track: moq_net::TrackConsumer) -> Self {
155		Self {
156			task: Task::new(TrackInner { track }),
157		}
158	}
159}
160
161#[uniffi::export]
162impl MoqTrackConsumer {
163	/// Return the next group in arrival order. Returns `None` when the track ends.
164	///
165	/// Groups are returned as they arrive on the wire, which may be out of sequence
166	/// order (e.g. if a later group lands before an earlier one on a separate stream).
167	pub async fn recv_group(&self) -> Result<Option<Arc<MoqGroupConsumer>>, MoqError> {
168		self.task
169			.run(|mut state| async move {
170				Ok(state.recv_group().await?.map(|group| {
171					Arc::new(MoqGroupConsumer {
172						sequence: group.sequence,
173						task: Task::new(GroupInner { group }),
174					})
175				}))
176			})
177			.await
178	}
179
180	/// Return the next group in sequence order, skipping forward if the reader
181	/// has fallen behind. Returns `None` when the track ends.
182	pub async fn next_group(&self) -> Result<Option<Arc<MoqGroupConsumer>>, MoqError> {
183		self.task
184			.run(|mut state| async move {
185				Ok(state.next_group().await?.map(|group| {
186					Arc::new(MoqGroupConsumer {
187						sequence: group.sequence,
188						task: Task::new(GroupInner { group }),
189					})
190				}))
191			})
192			.await
193	}
194
195	/// Read the first frame of the next group.
196	///
197	/// Convenience for tracks using one-frame-per-group (like moq-boy's
198	/// status/command tracks). Returns `None` when the track ends.
199	pub async fn read_frame(&self) -> Result<Option<Vec<u8>>, MoqError> {
200		self.task.run(|mut state| async move { state.read_frame().await }).await
201	}
202
203	pub fn cancel(&self) {
204		self.task.cancel();
205	}
206}
207
208struct GroupInner {
209	group: moq_net::GroupConsumer,
210}
211
212impl GroupInner {
213	async fn read_frame(&mut self) -> Result<Option<Vec<u8>>, MoqError> {
214		Ok(self.group.read_frame().await?.map(|b| b.to_vec()))
215	}
216}
217
218#[derive(uniffi::Object)]
219pub struct MoqGroupConsumer {
220	sequence: u64,
221	task: Task<GroupInner>,
222}
223
224impl MoqGroupConsumer {
225	pub(crate) fn new(group: moq_net::GroupConsumer) -> Self {
226		Self {
227			sequence: group.sequence,
228			task: Task::new(GroupInner { group }),
229		}
230	}
231}
232
233#[uniffi::export]
234impl MoqGroupConsumer {
235	/// The sequence number of this group within the track.
236	pub fn sequence(&self) -> u64 {
237		self.sequence
238	}
239
240	/// Read the next frame in this group. Returns `None` when the group ends.
241	pub async fn read_frame(&self) -> Result<Option<Vec<u8>>, MoqError> {
242		self.task.run(|mut state| async move { state.read_frame().await }).await
243	}
244
245	pub fn cancel(&self) {
246		self.task.cancel();
247	}
248}
249
250// ---- Catalog Consumer ----
251
252#[uniffi::export]
253impl MoqCatalogConsumer {
254	/// Get the next catalog update. Returns `None` when the track ends or is closed.
255	pub async fn next(&self) -> Result<Option<MoqCatalog>, MoqError> {
256		self.task.run(|mut state| async move { state.next().await }).await
257	}
258
259	/// Cancel all current and future `next()` calls.
260	pub fn cancel(&self) {
261		self.task.cancel();
262	}
263}
264
265// ---- Media Consumer ----
266
267#[uniffi::export]
268impl MoqMediaConsumer {
269	/// Get the next frame. Returns `None` when the track ends or is closed.
270	pub async fn next(&self) -> Result<Option<MoqFrame>, MoqError> {
271		self.task.run(|mut state| async move { state.next().await }).await
272	}
273
274	/// Cancel all current and future `next()` calls.
275	pub fn cancel(&self) {
276		self.task.cancel();
277	}
278}