pipewire_native/
main_loop.rs1use pipewire_native_spa as spa;
6
7use spa::flags;
8use spa::interface::r#loop::LoopUtilsSource;
9use spa::{emit_hook, hook::HookList};
10
11use std::os::fd::RawFd;
12use std::pin::Pin;
13use std::sync::atomic::AtomicBool;
14use std::sync::atomic::Ordering;
15use std::sync::{Arc, Mutex};
16
17use crate::{debug, default_topic, log, new_refcounted, properties::Properties, refcounted, trace};
18use crate::{HookId, GLOBAL_SUPPORT};
19
20default_topic!(log::topic::MAIN_LOOP);
21
22pub struct MainLoopEvents {
24 pub destroy: Option<Box<dyn FnMut()>>,
26}
27
28unsafe impl Send for MainLoopEvents {}
29
30#[derive(Clone)]
31pub(crate) struct LoopSupport {
32 #[allow(dead_code)]
33 handle: Arc<Box<dyn spa::interface::plugin::Handle + Send + Sync>>,
34 pub(crate) loop_: Arc<Pin<Box<spa::interface::r#loop::LoopImpl>>>,
35 pub(crate) loop_utils: Arc<Pin<Box<spa::interface::r#loop::LoopUtilsImpl>>>,
36 pub(crate) loop_control: Arc<Pin<Box<spa::interface::r#loop::LoopControlImpl>>>,
37}
38
39pub struct Source {
41 inner: Pin<Box<LoopUtilsSource>>,
42}
43
44impl Source {
45 pub fn mask(&self) -> spa::flags::Io {
47 self.inner.mask
48 }
49
50 fn from_loop_utils(source: Pin<Box<LoopUtilsSource>>) -> Self {
51 Source { inner: source }
52 }
53}
54
55pub type SourceEventFn = spa::interface::r#loop::SourceEventFn;
57pub type SourceIdleFn = spa::interface::r#loop::SourceIdleFn;
59pub type SourceIoFn = spa::interface::r#loop::SourceIoFn;
61pub type SourceSignalFn = spa::interface::r#loop::SourceSignalFn;
63pub type SourceTimerFn = spa::interface::r#loop::SourceTimerFn;
65
66refcounted! {
67 pub struct MainLoop {
77 support: LoopSupport,
78 running: AtomicBool,
81 name: String,
82 hooks: Arc<Mutex<HookList<MainLoopEvents>>>,
83 }
84}
85
86impl MainLoop {
87 pub fn new(props: &Properties) -> Option<MainLoop> {
89 let l = InnerMainLoop::new(props)?;
90
91 debug!("Creating main loop");
92
93 Some(MainLoop {
94 inner: new_refcounted(l),
95 })
96 }
97
98 pub(crate) fn set_running(&self) -> std::io::Result<()> {
99 if self
100 .inner
101 .running
102 .compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed)
103 .is_err()
104 {
105 Err(std::io::Error::from(std::io::ErrorKind::AlreadyExists))
106 } else {
107 Ok(())
108 }
109 }
110
111 pub(crate) fn run_once(&self) -> std::io::Result<i32> {
112 if !self.inner.running.load(Ordering::Relaxed) {
113 return Err(std::io::Error::from(std::io::ErrorKind::NotConnected));
114 }
115
116 self.inner.support.loop_control.enter();
117
118 let res = self
119 .inner
120 .support
121 .loop_control
122 .iterate(Some(std::time::Duration::MAX));
123
124 self.inner.support.loop_control.leave();
125
126 res
127 }
128
129 pub fn run(&self) {
132 if self
133 .inner
134 .running
135 .compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed)
136 .is_err()
137 {
138 return;
139 }
140
141 self.inner.support.loop_control.enter();
142
143 while self.inner.running.load(Ordering::Relaxed) {
144 if let Err(res) = self
145 .inner
146 .support
147 .loop_control
148 .iterate(Some(std::time::Duration::MAX))
149 {
150 if res.kind() == std::io::ErrorKind::Interrupted {
151 continue;
152 }
153 }
154 }
155
156 self.inner.support.loop_control.leave();
157 }
158
159 pub fn quit(&self) {
161 debug!("quit");
162
163 let this = self.clone();
164
165 let stop = move |_block: bool, _seq: u32, _data: &[u8]| {
166 this.inner.running.store(false, Ordering::Relaxed);
167 0
168 };
169
170 let _ = self
171 .inner
172 .support
173 .loop_
174 .invoke(1, &[], false, Box::new(stop));
175 }
176
177 pub fn add_listener(&self, events: MainLoopEvents) -> HookId {
179 self.inner.hooks.lock().unwrap().append(events)
180 }
181
182 pub fn remove_listener(&self, hook_id: HookId) {
184 self.inner.hooks.lock().unwrap().remove(hook_id);
185 }
186
187 #[doc(hidden)]
190 pub fn get_fd(&self) -> RawFd {
191 self.inner.support.loop_control.get_fd() as RawFd
192 }
193
194 #[doc(hidden)]
195 pub fn enter(&self) {
196 trace!("enter");
197 self.inner.support.loop_control.enter()
198 }
199
200 #[doc(hidden)]
201 pub fn leave(&self) {
202 trace!("leave");
203 self.inner.support.loop_control.leave()
204 }
205
206 pub fn iterate(&self, timeout: Option<std::time::Duration>) -> std::io::Result<i32> {
209 trace!("iterate");
210 self.inner.support.loop_control.iterate(timeout)
211 }
212
213 #[doc(hidden)]
214 pub fn check(&self) -> std::io::Result<i32> {
215 self.inner.support.loop_control.check()
216 }
217
218 pub fn lock(&self) -> std::io::Result<()> {
221 trace!("lock");
222 self.inner.support.loop_control.lock()?;
223 Ok(())
224 }
225
226 pub fn unlock(&self) -> std::io::Result<()> {
228 trace!("unlock");
229 self.inner.support.loop_control.unlock()?;
230 Ok(())
231 }
232
233 pub fn get_time(&self, timeout: std::time::Duration) -> std::io::Result<libc::timespec> {
236 self.inner.support.loop_control.get_time(timeout)
237 }
238
239 pub fn wait(&self, abstime: &libc::timespec) -> std::io::Result<()> {
241 debug!("wait");
242 self.inner.support.loop_control.wait(abstime)?;
243 Ok(())
244 }
245
246 pub fn signal(&self, wait_for_accept: bool) -> std::io::Result<()> {
248 debug!("signal");
249 self.inner.support.loop_control.signal(wait_for_accept)?;
250 Ok(())
251 }
252
253 pub fn accept(&self) -> std::io::Result<()> {
256 debug!("accept");
257 self.inner.support.loop_control.accept()?;
258 Ok(())
259 }
260
261 pub fn add_io(
263 &self,
264 fd: RawFd,
265 mask: flags::Io,
266 close: bool,
267 func: Box<SourceIoFn>,
268 ) -> Option<Source> {
269 self.inner
270 .support
271 .loop_utils
272 .add_io(fd, mask, close, func)
273 .map(Source::from_loop_utils)
274 }
275
276 pub fn update_io(&self, source: &mut Source, mask: flags::Io) -> std::io::Result<i32> {
278 self.inner
279 .support
280 .loop_utils
281 .update_io(&mut source.inner, mask)
282 }
283
284 pub fn add_idle(&self, enabled: bool, func: Box<SourceIdleFn>) -> Option<Source> {
286 self.inner
287 .support
288 .loop_utils
289 .add_idle(enabled, func)
290 .map(Source::from_loop_utils)
291 }
292
293 pub fn enable_idle(&self, source: &mut Source, enabled: bool) -> std::io::Result<i32> {
295 debug!("idle {enabled}");
296 self.inner
297 .support
298 .loop_utils
299 .enable_idle(&mut source.inner, enabled)
300 }
301
302 pub fn add_event(&self, func: Box<SourceEventFn>) -> Option<Source> {
304 self.inner
305 .support
306 .loop_utils
307 .add_event(func)
308 .map(Source::from_loop_utils)
309 }
310
311 pub fn signal_event(&self, source: &mut Source) -> std::io::Result<i32> {
313 self.inner
314 .support
315 .loop_utils
316 .signal_event(&mut source.inner)
317 }
318
319 pub fn add_timer(&self, func: Box<SourceTimerFn>) -> Option<Source> {
321 self.inner
322 .support
323 .loop_utils
324 .add_timer(func)
325 .map(Source::from_loop_utils)
326 }
327
328 pub fn update_timer(
330 &self,
331 source: &mut Source,
332 value: &libc::timespec,
333 interval: Option<&libc::timespec>,
334 absolute: bool,
335 ) -> std::io::Result<i32> {
336 self.inner
337 .support
338 .loop_utils
339 .update_timer(&mut source.inner, value, interval, absolute)
340 }
341
342 pub fn add_signal(&self, signal_number: i32, func: Box<SourceSignalFn>) -> Option<Source> {
344 self.inner
345 .support
346 .loop_utils
347 .add_signal(signal_number, func)
348 .map(Source::from_loop_utils)
349 }
350
351 pub fn destroy_source(&self, source: Source) {
353 self.inner.support.loop_utils.destroy_source(source.inner)
354 }
355
356 pub fn set_name(&mut self, name: &str) {
358 debug!("main loop name {name}");
359 if let Some(inner) = Arc::get_mut(&mut self.inner) {
360 inner.name = name.to_string()
361 }
362 }
363
364 pub(crate) fn support(&self) -> LoopSupport {
365 self.inner.support.clone()
366 }
367}
368
369impl Drop for InnerMainLoop {
370 fn drop(&mut self) {
371 self.destroy();
372 }
373}
374
375impl InnerMainLoop {
376 pub fn new(props: &Properties) -> Option<InnerMainLoop> {
377 let support = GLOBAL_SUPPORT
378 .get()
379 .expect("Global support should be initialised");
380
381 let handle = support
382 .load_spa_handle(None, spa::interface::plugin::LOOP_FACTORY, None)
383 .ok()?;
384
385 let loop_ = handle.get_interface(spa::interface::LOOP).and_then(|i| {
386 Arc::new(Box::into_pin(i))
387 .downcast_arc_pin_box::<spa::interface::r#loop::LoopImpl>()
388 .ok()
389 })?;
390 let loop_utils = handle
391 .get_interface(spa::interface::LOOP_UTILS)
392 .and_then(|i| {
393 Arc::new(Box::into_pin(i))
394 .downcast_arc_pin_box::<spa::interface::r#loop::LoopUtilsImpl>()
395 .ok()
396 })?;
397 let loop_control = handle
398 .get_interface(spa::interface::LOOP_CONTROL)
399 .and_then(|i| {
400 Arc::new(Box::into_pin(i))
401 .downcast_arc_pin_box::<spa::interface::r#loop::LoopControlImpl>()
402 .ok()
403 })?;
404
405 let name = if let Some(n) = props.get("loop.name") {
406 n.to_string()
407 } else {
408 "main.loop".to_string()
409 };
410
411 Some(InnerMainLoop {
412 support: LoopSupport {
413 handle: Arc::new(handle),
414 loop_,
415 loop_utils,
416 loop_control,
417 },
418 running: AtomicBool::new(false),
419 name,
420 hooks: HookList::new(),
421 })
422 }
423
424 fn destroy(&self) {
425 emit_hook!(self.hooks, destroy);
426 }
427}