Skip to main content

moq_ffi/
origin.rs

1use std::sync::Arc;
2
3use crate::consumer::MoqBroadcastConsumer;
4use crate::error::MoqError;
5use crate::ffi::Task;
6use crate::producer::MoqBroadcastProducer;
7
/// FFI object wrapping a `moq_net::OriginProducer`.
///
/// Offers `new`, `consume` and `publish` to foreign callers; the wrapped
/// producer is reachable inside the crate through `inner()`.
#[derive(uniffi::Object)]
pub struct MoqOriginProducer {
	// The wrapped producer every exported method delegates to.
	inner: moq_net::OriginProducer,
}
12
/// FFI object wrapping a `moq_net::OriginConsumer`.
///
/// Obtained from `MoqOriginProducer::consume`; used to subscribe to
/// announcements via `announced` and `announced_broadcast`.
#[derive(uniffi::Object)]
pub struct MoqOriginConsumer {
	// The wrapped consumer; cloned and re-rooted for each subscription.
	inner: moq_net::OriginConsumer,
}
17
/// FFI handle that yields broadcast announcements one at a time.
///
/// Created by `MoqOriginConsumer::announced`; driven with `next()` and
/// stopped with `cancel()`.
#[derive(uniffi::Object)]
pub struct MoqAnnounced {
	// Announcement state wrapped in a `Task`, which runs and cancels the async calls.
	task: Task<Announced>,
}
22
/// Announcement-polling state shared by `MoqAnnounced` and `MoqAnnouncedBroadcast`.
struct Announced {
	// Consumer obtained from `with_root(..)`; its `announced()` events are polled.
	inner: moq_net::OriginConsumer,
}
26
27impl Announced {
28	async fn next(&mut self) -> Result<Option<Arc<MoqAnnouncement>>, MoqError> {
29		loop {
30			match self.inner.announced().await {
31				Some((path, Some(broadcast))) => {
32					return Ok(Some(Arc::new(MoqAnnouncement {
33						path: path.to_string(),
34						broadcast: Arc::new(MoqBroadcastConsumer::new(broadcast)),
35					})));
36				}
37				// TODO moq-lite will change to not emit None (unannounce) events here.
38				Some((_path, None)) => continue,
39				None => return Ok(None),
40			}
41		}
42	}
43
44	async fn available(&mut self) -> Result<Arc<MoqBroadcastConsumer>, MoqError> {
45		loop {
46			match self.inner.announced().await {
47				Some((_path, Some(broadcast))) => {
48					return Ok(Arc::new(MoqBroadcastConsumer::new(broadcast)));
49				}
50				// TODO moq-lite will change to not emit None (unannounce) events here.
51				Some((_path, None)) => continue,
52				None => return Err(MoqError::Closed),
53			}
54		}
55	}
56}
57
/// A broadcast announcement from an origin.
///
/// Pairs the announced path with a consumer for the announced broadcast;
/// both are read through the exported `path()` and `broadcast()` accessors.
#[derive(uniffi::Object)]
pub struct MoqAnnouncement {
	// String form of the path reported by the announcement.
	path: String,
	// Shared handle to the announced broadcast; cloned out by `broadcast()`.
	broadcast: Arc<MoqBroadcastConsumer>,
}
64
/// Waits for a specific broadcast to be announced.
///
/// Created by `MoqOriginConsumer::announced_broadcast`; driven with
/// `available()` and stopped with `cancel()`.
#[derive(uniffi::Object)]
pub struct MoqAnnouncedBroadcast {
	// Same `Announced` state as `MoqAnnounced`, wrapped in a `Task`.
	task: Task<Announced>,
}
70
71impl MoqOriginProducer {
72	pub(crate) fn inner(&self) -> &moq_net::OriginProducer {
73		&self.inner
74	}
75}
76
77#[uniffi::export]
78impl MoqOriginProducer {
79	/// Create a new origin for publishing and/or consuming broadcasts.
80	#[uniffi::constructor]
81	pub fn new() -> Arc<Self> {
82		let _guard = crate::ffi::RUNTIME.enter();
83		Arc::new(Self {
84			inner: moq_net::Origin::random().produce(),
85		})
86	}
87
88	/// Create a consumer for this origin.
89	pub fn consume(&self) -> Arc<MoqOriginConsumer> {
90		let _guard = crate::ffi::RUNTIME.enter();
91		Arc::new(MoqOriginConsumer {
92			inner: self.inner.consume(),
93		})
94	}
95
96	/// Publish a broadcast to this origin under the given path.
97	pub fn publish(&self, path: String, broadcast: &MoqBroadcastProducer) -> Result<(), MoqError> {
98		let _guard = crate::ffi::RUNTIME.enter();
99		let consumer = broadcast.consume_inner()?;
100		if !self.inner.publish_broadcast(path.as_str(), consumer) {
101			return Err(MoqError::Unauthorized);
102		}
103		Ok(())
104	}
105}
106
107#[uniffi::export]
108impl MoqOriginConsumer {
109	/// Subscribe to all broadcast announcements under a prefix.
110	pub fn announced(&self, prefix: String) -> Result<Arc<MoqAnnounced>, MoqError> {
111		let _guard = crate::ffi::RUNTIME.enter();
112		let origin = self.inner.clone().with_root(prefix).ok_or(MoqError::Unauthorized)?;
113		Ok(Arc::new(MoqAnnounced {
114			task: Task::new(Announced { inner: origin }),
115		}))
116	}
117
118	/// Wait for a specific broadcast to be announced by path.
119	pub fn announced_broadcast(&self, path: String) -> Result<Arc<MoqAnnouncedBroadcast>, MoqError> {
120		let _guard = crate::ffi::RUNTIME.enter();
121		let origin = self.inner.clone().with_root(path).ok_or(MoqError::Unauthorized)?;
122		Ok(Arc::new(MoqAnnouncedBroadcast {
123			task: Task::new(Announced { inner: origin }),
124		}))
125	}
126}
127
128// ---- MoqAnnounced ----
129
130#[uniffi::export]
131impl MoqAnnounced {
132	/// Get the next broadcast announcement. Returns `None` when the origin is closed.
133	///
134	/// Use `broadcast.closed()` to learn when a broadcast is unannounced.
135	pub async fn next(&self) -> Result<Option<Arc<MoqAnnouncement>>, MoqError> {
136		self.task.run(|mut state| async move { state.next().await }).await
137	}
138
139	/// Cancel all current and future `next()` calls.
140	pub fn cancel(&self) {
141		self.task.cancel();
142	}
143}
144
145#[uniffi::export]
146impl MoqAnnouncement {
147	/// The path of the announced broadcast.
148	pub fn path(&self) -> String {
149		self.path.clone()
150	}
151
152	/// The broadcast consumer.
153	pub fn broadcast(&self) -> Arc<MoqBroadcastConsumer> {
154		self.broadcast.clone()
155	}
156}
157
158// ---- MoqAnnouncedBroadcast ----
159
160#[uniffi::export]
161impl MoqAnnouncedBroadcast {
162	/// Wait until the broadcast is announced. Returns `Closed` if cancelled or the origin is closed.
163	///
164	/// Use `broadcast.closed()` to learn when a broadcast is unannounced.
165	pub async fn available(&self) -> Result<Arc<MoqBroadcastConsumer>, MoqError> {
166		self.task.run(|mut state| async move { state.available().await }).await
167	}
168
169	/// Cancel all current and future `available()` calls.
170	pub fn cancel(&self) {
171		self.task.cancel();
172	}
173}