1use std::sync::Arc;
2
3use bytes::Buf;
4
5use crate::error::MoqError;
6use crate::ffi::Task;
7use crate::media::*;
8
9#[derive(Clone, uniffi::Object)]
10pub struct MoqBroadcastConsumer {
11 inner: moq_net::BroadcastConsumer,
12}
13
14impl MoqBroadcastConsumer {
15 pub(crate) fn new(inner: moq_net::BroadcastConsumer) -> Self {
16 Self { inner }
17 }
18
19 pub(crate) fn inner(&self) -> &moq_net::BroadcastConsumer {
22 &self.inner
23 }
24}
25
26#[derive(uniffi::Object)]
27pub struct MoqCatalogConsumer {
28 task: Task<Catalog>,
29}
30
31struct Catalog {
32 inner: moq_mux::catalog::hang::Consumer,
33}
34
35impl Catalog {
36 async fn next(&mut self) -> Result<Option<MoqCatalog>, MoqError> {
37 match self.inner.next().await {
38 Ok(Some(catalog)) => Ok(Some(convert_catalog(&catalog))),
39 Ok(None) => Ok(None),
40 Err(e) => Err(e.into()),
41 }
42 }
43}
44
45#[derive(uniffi::Object)]
46pub struct MoqMediaConsumer {
47 task: Task<Media>,
48}
49
50struct Media {
51 inner: moq_mux::container::Consumer<moq_mux::catalog::hang::Container>,
52}
53
54impl Media {
55 async fn next(&mut self) -> Result<Option<MoqFrame>, MoqError> {
56 let frame = self.inner.read().await?;
57
58 let Some(frame) = frame else {
59 return Ok(None);
60 };
61
62 let timestamp_us: u64 = frame
63 .timestamp
64 .as_micros()
65 .try_into()
66 .map_err(|_| MoqError::Codec("timestamp overflow".into()))?;
67
68 let mut buf = frame.payload;
69 let payload = buf.copy_to_bytes(buf.remaining()).to_vec();
70
71 Ok(Some(MoqFrame {
72 payload,
73 timestamp_us,
74 keyframe: frame.keyframe,
75 }))
76 }
77}
78
79#[uniffi::export]
82impl MoqBroadcastConsumer {
83 pub fn subscribe_catalog(&self) -> Result<Arc<MoqCatalogConsumer>, MoqError> {
85 let _guard = crate::ffi::RUNTIME.enter();
86 let track = self.inner.subscribe_track(&hang::catalog::Catalog::default_track())?;
87 let consumer = moq_mux::catalog::hang::Consumer::from(track);
88 Ok(Arc::new(MoqCatalogConsumer {
89 task: Task::new(Catalog { inner: consumer }),
90 }))
91 }
92
93 pub fn subscribe_track(&self, name: String) -> Result<Arc<MoqTrackConsumer>, MoqError> {
97 let _guard = crate::ffi::RUNTIME.enter();
98 let track = self.inner.subscribe_track(&moq_net::Track { name, priority: 0 })?;
99 Ok(Arc::new(MoqTrackConsumer::new(track)))
100 }
101
102 pub fn subscribe_media(
107 &self,
108 name: String,
109 container: Container,
110 max_latency_ms: u64,
111 ) -> Result<Arc<MoqMediaConsumer>, MoqError> {
112 let _guard = crate::ffi::RUNTIME.enter();
113 let container: hang::catalog::Container = container.into();
116 let media: moq_mux::catalog::hang::Container = (&container)
117 .try_into()
118 .map_err(|e| MoqError::Codec(format!("invalid container: {e}")))?;
119 let track = self.inner.subscribe_track(&moq_net::Track { name, priority: 0 })?;
120 let latency = std::time::Duration::from_millis(max_latency_ms);
121 let consumer = moq_mux::container::Consumer::new(track, media).with_latency(latency);
122 Ok(Arc::new(MoqMediaConsumer {
123 task: Task::new(Media { inner: consumer }),
124 }))
125 }
126}
127
128struct TrackInner {
131 track: moq_net::TrackConsumer,
132}
133
134impl TrackInner {
135 async fn recv_group(&mut self) -> Result<Option<moq_net::GroupConsumer>, MoqError> {
136 Ok(self.track.recv_group().await?)
137 }
138
139 async fn next_group(&mut self) -> Result<Option<moq_net::GroupConsumer>, MoqError> {
140 Ok(self.track.next_group().await?)
141 }
142
143 async fn read_frame(&mut self) -> Result<Option<Vec<u8>>, MoqError> {
144 Ok(self.track.read_frame().await?.map(|b| b.to_vec()))
145 }
146}
147
148#[derive(uniffi::Object)]
149pub struct MoqTrackConsumer {
150 task: Task<TrackInner>,
151}
152
153impl MoqTrackConsumer {
154 pub(crate) fn new(track: moq_net::TrackConsumer) -> Self {
155 Self {
156 task: Task::new(TrackInner { track }),
157 }
158 }
159}
160
161#[uniffi::export]
162impl MoqTrackConsumer {
163 pub async fn recv_group(&self) -> Result<Option<Arc<MoqGroupConsumer>>, MoqError> {
168 self.task
169 .run(|mut state| async move {
170 Ok(state.recv_group().await?.map(|group| {
171 Arc::new(MoqGroupConsumer {
172 sequence: group.sequence,
173 task: Task::new(GroupInner { group }),
174 })
175 }))
176 })
177 .await
178 }
179
180 pub async fn next_group(&self) -> Result<Option<Arc<MoqGroupConsumer>>, MoqError> {
183 self.task
184 .run(|mut state| async move {
185 Ok(state.next_group().await?.map(|group| {
186 Arc::new(MoqGroupConsumer {
187 sequence: group.sequence,
188 task: Task::new(GroupInner { group }),
189 })
190 }))
191 })
192 .await
193 }
194
195 pub async fn read_frame(&self) -> Result<Option<Vec<u8>>, MoqError> {
200 self.task.run(|mut state| async move { state.read_frame().await }).await
201 }
202
203 pub fn cancel(&self) {
204 self.task.cancel();
205 }
206}
207
208struct GroupInner {
209 group: moq_net::GroupConsumer,
210}
211
212impl GroupInner {
213 async fn read_frame(&mut self) -> Result<Option<Vec<u8>>, MoqError> {
214 Ok(self.group.read_frame().await?.map(|b| b.to_vec()))
215 }
216}
217
218#[derive(uniffi::Object)]
219pub struct MoqGroupConsumer {
220 sequence: u64,
221 task: Task<GroupInner>,
222}
223
224impl MoqGroupConsumer {
225 pub(crate) fn new(group: moq_net::GroupConsumer) -> Self {
226 Self {
227 sequence: group.sequence,
228 task: Task::new(GroupInner { group }),
229 }
230 }
231}
232
233#[uniffi::export]
234impl MoqGroupConsumer {
235 pub fn sequence(&self) -> u64 {
237 self.sequence
238 }
239
240 pub async fn read_frame(&self) -> Result<Option<Vec<u8>>, MoqError> {
242 self.task.run(|mut state| async move { state.read_frame().await }).await
243 }
244
245 pub fn cancel(&self) {
246 self.task.cancel();
247 }
248}
249
250#[uniffi::export]
253impl MoqCatalogConsumer {
254 pub async fn next(&self) -> Result<Option<MoqCatalog>, MoqError> {
256 self.task.run(|mut state| async move { state.next().await }).await
257 }
258
259 pub fn cancel(&self) {
261 self.task.cancel();
262 }
263}
264
265#[uniffi::export]
268impl MoqMediaConsumer {
269 pub async fn next(&self) -> Result<Option<MoqFrame>, MoqError> {
271 self.task.run(|mut state| async move { state.next().await }).await
272 }
273
274 pub fn cancel(&self) {
276 self.task.cancel();
277 }
278}