pipewire_native/
main_loop.rs

1// SPDX-License-Identifier: MIT
2// SPDX-FileCopyrightText: Copyright (c) 2025 Asymptotic Inc.
3// SPDX-FileCopyrightText: Copyright (c) 2025 Sanchayan Maity
4
5use pipewire_native_spa as spa;
6
7use spa::flags;
8use spa::interface::r#loop::LoopUtilsSource;
9use spa::{emit_hook, hook::HookList};
10
11use std::os::fd::RawFd;
12use std::pin::Pin;
13use std::sync::atomic::AtomicBool;
14use std::sync::atomic::Ordering;
15use std::sync::{Arc, Mutex};
16
17use crate::{debug, default_topic, log, new_refcounted, properties::Properties, refcounted, trace};
18use crate::{HookId, GLOBAL_SUPPORT};
19
20default_topic!(log::topic::MAIN_LOOP);
21
22/// Main loop events
23pub struct MainLoopEvents {
24    /// The main loop was destroyed.
25    pub destroy: Option<Box<dyn FnMut()>>,
26}
27
28unsafe impl Send for MainLoopEvents {}
29
30#[derive(Clone)]
31pub(crate) struct LoopSupport {
32    #[allow(dead_code)]
33    handle: Arc<Box<dyn spa::interface::plugin::Handle + Send + Sync>>,
34    pub(crate) loop_: Arc<Pin<Box<spa::interface::r#loop::LoopImpl>>>,
35    pub(crate) loop_utils: Arc<Pin<Box<spa::interface::r#loop::LoopUtilsImpl>>>,
36    pub(crate) loop_control: Arc<Pin<Box<spa::interface::r#loop::LoopControlImpl>>>,
37}
38
39/// An event source on the mainloop.
40pub struct Source {
41    inner: Pin<Box<LoopUtilsSource>>,
42}
43
44impl Source {
45    /// Mask representing current set of even types the source is interested in.
46    pub fn mask(&self) -> spa::flags::Io {
47        self.inner.mask
48    }
49
50    fn from_loop_utils(source: Pin<Box<LoopUtilsSource>>) -> Self {
51        Source { inner: source }
52    }
53}
54
55/// Callback for events triggered by [MainLoop::signal_event]
56pub type SourceEventFn = spa::interface::r#loop::SourceEventFn;
57/// Callback for events triggered by [MainLoop] idle events.
58pub type SourceIdleFn = spa::interface::r#loop::SourceIdleFn;
59/// Callback for events triggered by [MainLoop] I/O events.
60pub type SourceIoFn = spa::interface::r#loop::SourceIoFn;
61/// Callback for events triggered by signals on the [MainLoop].
62pub type SourceSignalFn = spa::interface::r#loop::SourceSignalFn;
63/// Callback for timer events on the [MainLoop].
64pub type SourceTimerFn = spa::interface::r#loop::SourceTimerFn;
65
66refcounted! {
67    /// An event loop implementation. This event loop is used for communication with PipeWire, and
68    /// provides a number of primitives for integration with applications and any other event loops
69    /// or event sources they might have.
70    ///
71    /// Many of these primitives are quite low-level, and it is also possible to use the main loop
72    /// only for PipeWire-related events. This can be done by creating the [MainLoop] and invoking
73    /// [MainLoop::run] on a separate thread. In this case, it is important to make sure that any
74    /// access to common data structures is protected from concurrent access. The [MainLoop::lock]
75    /// and [MainLoop::unlock] methods provide such a mechanism.
76    pub struct MainLoop {
77        support: LoopSupport,
78        // This is an atomic because it is hard to convince Rust that this will only be mutated on one
79        // thread (i.e. the one on which run() is called
80        running: AtomicBool,
81        name: String,
82        hooks: Arc<Mutex<HookList<MainLoopEvents>>>,
83    }
84}
85
86impl MainLoop {
87    /// Create a new main loop.
88    pub fn new(props: &Properties) -> Option<MainLoop> {
89        let l = InnerMainLoop::new(props)?;
90
91        debug!("Creating main loop");
92
93        Some(MainLoop {
94            inner: new_refcounted(l),
95        })
96    }
97
98    pub(crate) fn set_running(&self) -> std::io::Result<()> {
99        if self
100            .inner
101            .running
102            .compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed)
103            .is_err()
104        {
105            Err(std::io::Error::from(std::io::ErrorKind::AlreadyExists))
106        } else {
107            Ok(())
108        }
109    }
110
111    pub(crate) fn run_once(&self) -> std::io::Result<i32> {
112        if !self.inner.running.load(Ordering::Relaxed) {
113            return Err(std::io::Error::from(std::io::ErrorKind::NotConnected));
114        }
115
116        self.inner.support.loop_control.enter();
117
118        let res = self
119            .inner
120            .support
121            .loop_control
122            .iterate(Some(std::time::Duration::MAX));
123
124        self.inner.support.loop_control.leave();
125
126        res
127    }
128
129    /// Run the main loop. This takes over executation of the current thread until [MainLoop::quit]
130    /// is invoked.
131    pub fn run(&self) {
132        if self
133            .inner
134            .running
135            .compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed)
136            .is_err()
137        {
138            return;
139        }
140
141        self.inner.support.loop_control.enter();
142
143        while self.inner.running.load(Ordering::Relaxed) {
144            if let Err(res) = self
145                .inner
146                .support
147                .loop_control
148                .iterate(Some(std::time::Duration::MAX))
149            {
150                if res.kind() == std::io::ErrorKind::Interrupted {
151                    continue;
152                }
153            }
154        }
155
156        self.inner.support.loop_control.leave();
157    }
158
159    /// Terminate execution of the main loop.
160    pub fn quit(&self) {
161        debug!("quit");
162
163        let this = self.clone();
164
165        let stop = move |_block: bool, _seq: u32, _data: &[u8]| {
166            this.inner.running.store(false, Ordering::Relaxed);
167            0
168        };
169
170        let _ = self
171            .inner
172            .support
173            .loop_
174            .invoke(1, &[], false, Box::new(stop));
175    }
176
177    /// Add a listener for main loop events.
178    pub fn add_listener(&self, events: MainLoopEvents) -> HookId {
179        self.inner.hooks.lock().unwrap().append(events)
180    }
181
182    /// Remove a set of event listeners.
183    pub fn remove_listener(&self, hook_id: HookId) {
184        self.inner.hooks.lock().unwrap().remove(hook_id);
185    }
186
187    // Loop control methods
188    // TODO: decide how we want to expose these, if at all
189    #[doc(hidden)]
190    pub fn get_fd(&self) -> RawFd {
191        self.inner.support.loop_control.get_fd() as RawFd
192    }
193
194    #[doc(hidden)]
195    pub fn enter(&self) {
196        trace!("enter");
197        self.inner.support.loop_control.enter()
198    }
199
200    #[doc(hidden)]
201    pub fn leave(&self) {
202        trace!("leave");
203        self.inner.support.loop_control.leave()
204    }
205
206    /// Run one iteration of the loop, returning if no events occurred in `timeout` time. A value
207    /// of `None` signifies an infinite timeout. Returns the number of iterations performed.
208    pub fn iterate(&self, timeout: Option<std::time::Duration>) -> std::io::Result<i32> {
209        trace!("iterate");
210        self.inner.support.loop_control.iterate(timeout)
211    }
212
213    #[doc(hidden)]
214    pub fn check(&self) -> std::io::Result<i32> {
215        self.inner.support.loop_control.check()
216    }
217
218    /// Take the main loop lock, preventing it from running until the lock is released. The main
219    /// loop takes this lock in each of its iterations.
220    pub fn lock(&self) -> std::io::Result<()> {
221        trace!("lock");
222        self.inner.support.loop_control.lock()?;
223        Ok(())
224    }
225
226    /// Release the main loop lock, allowing it to run again.
227    pub fn unlock(&self) -> std::io::Result<()> {
228        trace!("unlock");
229        self.inner.support.loop_control.unlock()?;
230        Ok(())
231    }
232
233    /// Get the time corresponding to the specified timeout, which can be used with
234    /// [MainLoop::wait].
235    pub fn get_time(&self, timeout: std::time::Duration) -> std::io::Result<libc::timespec> {
236        self.inner.support.loop_control.get_time(timeout)
237    }
238
239    /// Wait until the specified time or until signalled.
240    pub fn wait(&self, abstime: &libc::timespec) -> std::io::Result<()> {
241        debug!("wait");
242        self.inner.support.loop_control.wait(abstime)?;
243        Ok(())
244    }
245
246    /// Signal any threads waiting with [MainLoop::wait] to wake up.
247    pub fn signal(&self, wait_for_accept: bool) -> std::io::Result<()> {
248        debug!("signal");
249        self.inner.support.loop_control.signal(wait_for_accept)?;
250        Ok(())
251    }
252
253    /// Acknowledge a [MainLoop::signal] with `wait_for_accept: true`, allowing that thread to
254    /// proceed.
255    pub fn accept(&self) -> std::io::Result<()> {
256        debug!("accept");
257        self.inner.support.loop_control.accept()?;
258        Ok(())
259    }
260
261    /// Add an I/O event source using the given `fd`.
262    pub fn add_io(
263        &self,
264        fd: RawFd,
265        mask: flags::Io,
266        close: bool,
267        func: Box<SourceIoFn>,
268    ) -> Option<Source> {
269        self.inner
270            .support
271            .loop_utils
272            .add_io(fd, mask, close, func)
273            .map(Source::from_loop_utils)
274    }
275
276    /// Update the set of events of interest (given by `mask`).
277    pub fn update_io(&self, source: &mut Source, mask: flags::Io) -> std::io::Result<i32> {
278        self.inner
279            .support
280            .loop_utils
281            .update_io(&mut source.inner, mask)
282    }
283
284    /// Add an idle event source (called after each iteration of the main loop).
285    pub fn add_idle(&self, enabled: bool, func: Box<SourceIdleFn>) -> Option<Source> {
286        self.inner
287            .support
288            .loop_utils
289            .add_idle(enabled, func)
290            .map(Source::from_loop_utils)
291    }
292
293    /// Enable or disable an idle event source.
294    pub fn enable_idle(&self, source: &mut Source, enabled: bool) -> std::io::Result<i32> {
295        debug!("idle {enabled}");
296        self.inner
297            .support
298            .loop_utils
299            .enable_idle(&mut source.inner, enabled)
300    }
301
302    /// Add a source which can be triggered using [Self::signal_event].
303    pub fn add_event(&self, func: Box<SourceEventFn>) -> Option<Source> {
304        self.inner
305            .support
306            .loop_utils
307            .add_event(func)
308            .map(Source::from_loop_utils)
309    }
310
311    /// Signal a source added with [Self::add_event].
312    pub fn signal_event(&self, source: &mut Source) -> std::io::Result<i32> {
313        self.inner
314            .support
315            .loop_utils
316            .signal_event(&mut source.inner)
317    }
318
319    /// Add a timer source.
320    pub fn add_timer(&self, func: Box<SourceTimerFn>) -> Option<Source> {
321        self.inner
322            .support
323            .loop_utils
324            .add_timer(func)
325            .map(Source::from_loop_utils)
326    }
327
328    /// Update the timer interval of a timer source.
329    pub fn update_timer(
330        &self,
331        source: &mut Source,
332        value: &libc::timespec,
333        interval: Option<&libc::timespec>,
334        absolute: bool,
335    ) -> std::io::Result<i32> {
336        self.inner
337            .support
338            .loop_utils
339            .update_timer(&mut source.inner, value, interval, absolute)
340    }
341
342    /// Add a source triggered by UNIX signals.
343    pub fn add_signal(&self, signal_number: i32, func: Box<SourceSignalFn>) -> Option<Source> {
344        self.inner
345            .support
346            .loop_utils
347            .add_signal(signal_number, func)
348            .map(Source::from_loop_utils)
349    }
350
351    /// Remove and destroy an event source.
352    pub fn destroy_source(&self, source: Source) {
353        self.inner.support.loop_utils.destroy_source(source.inner)
354    }
355
356    /// Set the main loop name.
357    pub fn set_name(&mut self, name: &str) {
358        debug!("main loop name {name}");
359        if let Some(inner) = Arc::get_mut(&mut self.inner) {
360            inner.name = name.to_string()
361        }
362    }
363
364    pub(crate) fn support(&self) -> LoopSupport {
365        self.inner.support.clone()
366    }
367}
368
369impl Drop for InnerMainLoop {
370    fn drop(&mut self) {
371        self.destroy();
372    }
373}
374
375impl InnerMainLoop {
376    pub fn new(props: &Properties) -> Option<InnerMainLoop> {
377        let support = GLOBAL_SUPPORT
378            .get()
379            .expect("Global support should be initialised");
380
381        let handle = support
382            .load_spa_handle(None, spa::interface::plugin::LOOP_FACTORY, None)
383            .ok()?;
384
385        let loop_ = handle.get_interface(spa::interface::LOOP).and_then(|i| {
386            Arc::new(Box::into_pin(i))
387                .downcast_arc_pin_box::<spa::interface::r#loop::LoopImpl>()
388                .ok()
389        })?;
390        let loop_utils = handle
391            .get_interface(spa::interface::LOOP_UTILS)
392            .and_then(|i| {
393                Arc::new(Box::into_pin(i))
394                    .downcast_arc_pin_box::<spa::interface::r#loop::LoopUtilsImpl>()
395                    .ok()
396            })?;
397        let loop_control = handle
398            .get_interface(spa::interface::LOOP_CONTROL)
399            .and_then(|i| {
400                Arc::new(Box::into_pin(i))
401                    .downcast_arc_pin_box::<spa::interface::r#loop::LoopControlImpl>()
402                    .ok()
403            })?;
404
405        let name = if let Some(n) = props.get("loop.name") {
406            n.to_string()
407        } else {
408            "main.loop".to_string()
409        };
410
411        Some(InnerMainLoop {
412            support: LoopSupport {
413                handle: Arc::new(handle),
414                loop_,
415                loop_utils,
416                loop_control,
417            },
418            running: AtomicBool::new(false),
419            name,
420            hooks: HookList::new(),
421        })
422    }
423
424    fn destroy(&self) {
425        emit_hook!(self.hooks, destroy);
426    }
427}