libpulse_binding/mainloop/
threaded.rs

1// Copyright 2017 Lyndon Brown
2//
3// This file is part of the PulseAudio Rust language binding.
4//
5// Licensed under the MIT license or the Apache license (version 2.0), at your option. You may not
6// copy, modify, or distribute this file except in compliance with said license. You can find copies
7// of these licenses either in the LICENSE-MIT and LICENSE-APACHE files, or alternatively at
8// <http://opensource.org/licenses/MIT> and <http://www.apache.org/licenses/LICENSE-2.0>
9// respectively.
10//
11// Portions of documentation are copied from the LGPL 2.1+ licensed PulseAudio C headers on a
12// fair-use basis, as discussed in the overall project readme (available in the git repository).
13
14//! A variation of the standard main loop implementation, using a background thread.
15//!
16//! # Overview
17//!
18//! The threaded main loop implementation is a special version of the standard main loop
19//! implementation. For the basic design, see the standard main loop documentation
20//! ([`mainloop::standard`](mod@crate::mainloop::standard)).
21//!
22//! The added feature in the threaded main loop is that it spawns a new thread that runs the real
23//! main loop in the background. This allows a synchronous application to use the asynchronous API
24//! without risking stalling the PulseAudio library. A few synchronization primitives are available
25//! to access the objects attached to the event loop safely.
26//!
27//! # Creation
28//!
29//! A [`Mainloop`] object is created using [`Mainloop::new()`]. This will only allocate the required
30//! structures though, so to use it the thread must also be started. This is done through
31//! [`Mainloop::start()`], after which you can start using the main loop.
32//!
33//! # Destruction
34//!
35//! When the PulseAudio connection has been terminated, the thread must be stopped and the
36//! resources freed. Stopping the thread is done using [`Mainloop::stop()`], which must be called
37//! without the lock (see below) held. When that function returns, the thread is stopped and the
38//! [`Mainloop`] object can be destroyed.
39//!
40//! Destruction of the [`Mainloop`] object is done automatically when the object falls out of scope.
41//! (Rust’s `Drop` trait has been implemented and takes care of it).
42//!
43//! # Locking
44//!
45//! Since the PulseAudio API doesn’t allow concurrent accesses to objects, a locking scheme must be
46//! used to guarantee safe usage. The threaded main loop API provides such a scheme through the
47//! functions [`Mainloop::lock()`] and [`Mainloop::unlock()`].
48//!
49//! The lock is recursive, so it’s safe to use it multiple times from the same thread. Just make
50//! sure you call [`Mainloop::unlock()`] the same number of times you called [`Mainloop::lock()`].
51//!
52//! The lock needs to be held whenever you call any PulseAudio function that uses an object
53//! associated with this main loop. Those objects include the mainloop, context, stream and
54//! operation objects, and the various event objects (io, time, defer). Make sure you do not hold on
55//! to the lock more than necessary though, as the threaded main loop stops while the lock is held.
56//!
57//! Example:
58//!
59//! ```rust,no_run
60//! extern crate libpulse_binding as pulse;
61//!
62//! use std::rc::Rc;
63//! use std::cell::RefCell;
64//! use pulse::mainloop::threaded::Mainloop;
65//! use pulse::stream::{Stream, State};
66//!
67//! fn check_stream(m: Rc<RefCell<Mainloop>>, s: Rc<RefCell<Stream>>) {
68//!     m.borrow_mut().lock();
69//!
70//!     let state = s.borrow().get_state();
71//!
72//!     m.borrow_mut().unlock();
73//!
74//!     match state {
75//!         State::Ready => { println!("Stream is ready!"); },
76//!         _ => { println!("Stream is not ready!"); },
77//!     }
78//! }
79//! ```
80//!
81//! # Callbacks
82//!
83//! Callbacks in PulseAudio are asynchronous, so they require extra care when using them together
84//! with a threaded main loop.
85//!
86//! The easiest way to turn the callback based operations into synchronous ones, is to simply wait
87//! for the callback to be called and continue from there. This is the approach chosen in
88//! PulseAudio’s threaded API.
89//!
90//! ## Basic callbacks
91//!
92//! For the basic case, where all that is required is to wait for the callback to be invoked, the
93//! code should look something like this:
94//!
95//! Example:
96//!
97//! ```rust,no_run
98//! extern crate libpulse_binding as pulse;
99//!
100//! use std::rc::Rc;
101//! use std::cell::RefCell;
102//! use pulse::mainloop::threaded::Mainloop;
103//! use pulse::operation::State;
104//! use pulse::stream::Stream;
105//!
106//! fn drain_stream(m: Rc<RefCell<Mainloop>>, s: Rc<RefCell<Stream>>) {
107//!     m.borrow_mut().lock();
108//!
109//!     // Drain
110//!     let o = {
111//!         let ml_ref = Rc::clone(&m);
112//!         s.borrow_mut().drain(Some(Box::new(move |_success: bool| {
113//!             unsafe { (*ml_ref.as_ptr()).signal(false); }
114//!         })))
115//!     };
116//!     while o.get_state() != pulse::operation::State::Done {
117//!         m.borrow_mut().wait();
118//!     }
119//!
120//!     m.borrow_mut().unlock();
121//! }
122//! ```
123//!
124//! The function `drain_stream` will wait for the callback to be called using [`Mainloop::wait()`].
125//!
126//! If your application is multi-threaded, then this waiting must be done inside a while loop. The
127//! reason for this is that multiple threads might be using [`Mainloop::wait()`] at the same time.
128//! Each thread must therefore verify that it was its callback that was invoked. Also the underlying
129//! OS synchronization primitives are usually not free of spurious wake-ups, so a
130//! [`Mainloop::wait()`] must be called within a loop even if you have only one thread waiting.
131//!
132//! The callback `my_drain_callback` indicates to the main function that it has been called using
133//! [`Mainloop::signal()`].
134//!
135//! As you can see, [`Mainloop::wait()`] may only be called with the lock held. The same thing is
136//! true for [`Mainloop::signal()`], but as the lock is held before the callback is invoked, you do
137//! not have to deal with that.
138//!
139//! The functions will not dead lock because the wait function will release the lock before waiting
140//! and then regrab it once it has been signalled. For those of you familiar with threads, the
141//! behaviour is that of a condition variable.
142//!
143//! ## Data callbacks
144//!
145//! For many callbacks, simply knowing that they have been called is insufficient. The callback also
146//! receives some data that is desired. To access this data safely, we must extend our example a
147//! bit:
148//!
149//! ```rust,no_run
150//! extern crate libpulse_binding as pulse;
151//!
152//! use std::rc::Rc;
153//! use std::cell::RefCell;
154//! use std::sync::atomic::{AtomicBool, Ordering};
155//! use pulse::mainloop::threaded::Mainloop;
156//! use pulse::stream::Stream;
157//!
158//! // A data structure to capture all our data in (currently just a pointer to a bool)
159//! struct DrainCbData(*mut bool);
160//!
161//! fn drain_stream(m: Rc<RefCell<Mainloop>>, s: Rc<RefCell<Stream>>) {
162//!     m.borrow_mut().lock();
163//!
164//!     // For guarding against spurious wakeups
165//!     // Possibly also needed for memory flushing and ordering control
166//!     let mut guard = Rc::new(RefCell::new(AtomicBool::new(true)));
167//!
168//!     let mut data: Rc<RefCell<Option<DrainCbData>>> = Rc::new(RefCell::new(None));
169//!
170//!     // Drain
171//!     let o = {
172//!         let ml_ref = Rc::clone(&m);
173//!         let guard_ref = Rc::clone(&guard);
174//!         let data_ref = Rc::clone(&data);
175//!         s.borrow_mut().drain(Some(Box::new(move |mut success: bool| {
176//!             unsafe {
177//!                 *data_ref.as_ptr() = Some(DrainCbData(&mut success));
178//!                 (*guard_ref.as_ptr()).store(false, Ordering::Release);
179//!                 (*ml_ref.as_ptr()).signal(true);
180//!             }
181//!         })))
182//!     };
183//!     while guard.borrow().load(Ordering::Acquire) {
184//!         m.borrow_mut().wait();
185//!     }
186//!
187//!     assert!(!data.borrow().is_none());
188//!     let success = unsafe { *(data.borrow_mut().take().unwrap().0) };
189//!
190//!     // Allow callback to continue now
191//!     m.borrow_mut().accept();
192//!
193//!     match success {
194//!         false => { println!("Bitter defeat..."); },
195//!         true => { println!("Success!"); },
196//!     }
197//!
198//!     m.borrow_mut().unlock();
199//! }
200//! ```
201//!
202//! The example is a bit silly as it would have been more simple to just copy the contents of
203//! `success`, but for larger data structures this can be wasteful.
204//!
205//! The difference here compared to the basic callback is the value `true` passed to
206//! [`Mainloop::signal()`] and the call to [`Mainloop::accept()`]. What will happen is that
207//! [`Mainloop::signal()`] will signal the main function and then wait. The main function is then
208//! free to use the data in the callback until [`Mainloop::accept()`] is called, which will allow
209//! the callback to continue.
210//!
211//! Note that [`Mainloop::accept()`] must be called some time between exiting the while loop and
212//! unlocking the main loop! Failure to do so will result in a race condition. I.e. it is not okay
213//! to release the lock and regrab it before calling [`Mainloop::accept()`].
214//!
215//! ## Asynchronous callbacks
216//!
217//! PulseAudio also has callbacks that are completely asynchronous, meaning that they can be called
218//! at any time. The threaded main loop API provides the locking mechanism to handle concurrent
219//! accesses, but nothing else. Applications will have to handle communication from the callback to
220//! the main program through their own mechanisms.
221//!
222//! The callbacks that are completely asynchronous are:
223//!
224//! * State callbacks for contexts, streams, etc.
225//! * Subscription notifications.
226//!
227//! # Example
228//!
229//! An example program using the threaded mainloop:
230//!
231//! ```rust
232//! extern crate libpulse_binding as pulse;
233//!
234//! use std::rc::Rc;
235//! use std::cell::RefCell;
236//! use std::ops::Deref;
237//! use pulse::mainloop::threaded::Mainloop;
238//! use pulse::context::{Context, FlagSet as ContextFlagSet};
239//! use pulse::stream::{Stream, FlagSet as StreamFlagSet};
240//! use pulse::sample::{Spec, Format};
241//! use pulse::proplist::Proplist;
242//! use pulse::mainloop::api::Mainloop as MainloopTrait; //Needs to be in scope
243//!
244//! fn main() {
245//!     let spec = Spec {
246//!         format: Format::S16NE,
247//!         channels: 2,
248//!         rate: 44100,
249//!     };
250//!     assert!(spec.is_valid());
251//!
252//!     let mut proplist = Proplist::new().unwrap();
253//!     proplist.set_str(pulse::proplist::properties::APPLICATION_NAME, "FooApp")
254//!         .unwrap();
255//!
256//!     let mut mainloop = Rc::new(RefCell::new(Mainloop::new()
257//!         .expect("Failed to create mainloop")));
258//!
259//!     let mut context = Rc::new(RefCell::new(Context::new_with_proplist(
260//!         mainloop.borrow().deref(),
261//!         "FooAppContext",
262//!         &proplist
263//!         ).expect("Failed to create new context")));
264//!
265//!     // Context state change callback
266//!     {
267//!         let ml_ref = Rc::clone(&mainloop);
268//!         let context_ref = Rc::clone(&context);
269//!         context.borrow_mut().set_state_callback(Some(Box::new(move || {
270//!             let state = unsafe { (*context_ref.as_ptr()).get_state() };
271//!             match state {
272//!                 pulse::context::State::Ready |
273//!                 pulse::context::State::Failed |
274//!                 pulse::context::State::Terminated => {
275//!                     unsafe { (*ml_ref.as_ptr()).signal(false); }
276//!                 },
277//!                 _ => {},
278//!             }
279//!         })));
280//!     }
281//!
282//!     context.borrow_mut().connect(None, ContextFlagSet::NOFLAGS, None)
283//!         .expect("Failed to connect context");
284//!
285//!     mainloop.borrow_mut().lock();
286//!     mainloop.borrow_mut().start().expect("Failed to start mainloop");
287//!
288//!     // Wait for context to be ready
289//!     loop {
290//!         match context.borrow().get_state() {
291//!             pulse::context::State::Ready => { break; },
292//!             pulse::context::State::Failed |
293//!             pulse::context::State::Terminated => {
294//!                 eprintln!("Context state failed/terminated, quitting...");
295//!                 mainloop.borrow_mut().unlock();
296//!                 mainloop.borrow_mut().stop();
297//!                 return;
298//!             },
299//!             _ => { mainloop.borrow_mut().wait(); },
300//!         }
301//!     }
302//!     context.borrow_mut().set_state_callback(None);
303//!
304//!     let mut stream = Rc::new(RefCell::new(Stream::new(
305//!         &mut context.borrow_mut(),
306//!         "Music",
307//!         &spec,
308//!         None
309//!         ).expect("Failed to create new stream")));
310//!
311//!     // Stream state change callback
312//!     {
313//!         let ml_ref = Rc::clone(&mainloop);
314//!         let stream_ref = Rc::clone(&stream);
315//!         stream.borrow_mut().set_state_callback(Some(Box::new(move || {
316//!             let state = unsafe { (*stream_ref.as_ptr()).get_state() };
317//!             match state {
318//!                 pulse::stream::State::Ready |
319//!                 pulse::stream::State::Failed |
320//!                 pulse::stream::State::Terminated => {
321//!                     unsafe { (*ml_ref.as_ptr()).signal(false); }
322//!                 },
323//!                 _ => {},
324//!             }
325//!         })));
326//!     }
327//!
328//!     stream.borrow_mut().connect_playback(None, None, StreamFlagSet::START_CORKED,
329//!         None, None).expect("Failed to connect playback");
330//!
331//!     // Wait for stream to be ready
332//!     loop {
333//!         match stream.borrow().get_state() {
334//!             pulse::stream::State::Ready => { break; },
335//!             pulse::stream::State::Failed |
336//!             pulse::stream::State::Terminated => {
337//!                 eprintln!("Stream state failed/terminated, quitting...");
338//!                 mainloop.borrow_mut().unlock();
339//!                 mainloop.borrow_mut().stop();
340//!                 return;
341//!             },
342//!             _ => { mainloop.borrow_mut().wait(); },
343//!         }
344//!     }
345//!     stream.borrow_mut().set_state_callback(None);
346//!
347//!     mainloop.borrow_mut().unlock();
348//!
349//!     // Our main logic (to output a stream of audio data)
350//! #   let mut count = 0; // For automatic unit tests, we’ll spin a few times
351//!     loop {
352//!         mainloop.borrow_mut().lock();
353//!
354//!         // Write some data with stream.write()
355//!
356//!         if stream.borrow().is_corked().unwrap() {
357//!             stream.borrow_mut().uncork(None);
358//!         }
359//!
360//!         // Drain
361//!         let o = {
362//!             let ml_ref = Rc::clone(&mainloop);
363//!             stream.borrow_mut().drain(Some(Box::new(move |_success: bool| {
364//!                 unsafe { (*ml_ref.as_ptr()).signal(false); }
365//!             })))
366//!         };
367//!         while o.get_state() != pulse::operation::State::Done {
368//!             mainloop.borrow_mut().wait();
369//!         }
370//!
371//!         mainloop.borrow_mut().unlock();
372//!
373//!         // If done writing data, call `mainloop.borrow_mut().stop()` (with lock released), then
374//!         // break!
375//! #
376//! #       // Hack: Stop test getting stuck in infinite loop!
377//! #       count += 1;
378//! #       if count == 3 {
379//! #           mainloop.borrow_mut().stop();
380//! #           break;
381//! #       }
382//!     }
383//!
384//!     // Clean shutdown
385//!     mainloop.borrow_mut().lock();
386//!     stream.borrow_mut().disconnect().unwrap();
387//!     mainloop.borrow_mut().unlock();
388//! }
389//! ```
390
391use std::rc::Rc;
392use std::ffi::CString;
393use crate::def;
394use crate::error::PAErr;
395use crate::mainloop::api::{MainloopInternalType, MainloopInner, MainloopInnerType, MainloopApi,
396                           Mainloop as MainloopTrait};
397use crate::mainloop::signal::MainloopSignals;
398
399pub use capi::pa_threaded_mainloop as MainloopInternal;
400
401impl MainloopInternalType for MainloopInternal {}
402
403/// This acts as a safe interface to the internal PA Mainloop.
404///
405/// The mainloop object pointers are further enclosed here in a ref counted wrapper, allowing this
406/// outer wrapper to have clean methods for creating event objects, which can cleanly pass a copy of
407/// the inner ref counted mainloop object to them. Giving this to events serves two purposes,
408/// firstly because they need the API pointer, secondly, it ensures that event objects do not
409/// outlive the mainloop object.
410pub struct Mainloop {
411    /// The ref-counted inner data.
412    pub _inner: Rc<MainloopInner<MainloopInternal>>,
413}
414
415impl MainloopTrait for Mainloop {
416    type MI = MainloopInner<MainloopInternal>;
417
418    #[inline(always)]
419    fn inner(&self) -> Rc<MainloopInner<MainloopInternal>> {
420        Rc::clone(&self._inner)
421    }
422}
423
424impl MainloopSignals for Mainloop {}
425
426impl MainloopInner<MainloopInternal> {
427    #[inline(always)]
428    fn drop_actual(&mut self) {
429        unsafe { capi::pa_threaded_mainloop_free(self.get_ptr()) };
430    }
431}
432
433impl Mainloop {
434    /// Allocates a new threaded main loop object.
435    ///
436    /// You have to call [`start()`](Self::start) before the event loop thread starts running.
437    pub fn new() -> Option<Self> {
438        let ptr = unsafe { capi::pa_threaded_mainloop_new() };
439        if ptr.is_null() {
440            return None;
441        }
442        let api_ptr = unsafe { capi::pa_threaded_mainloop_get_api(ptr) };
443        assert!(!api_ptr.is_null());
444        let ml_inner = unsafe {
445            MainloopInner::<MainloopInternal>::new(ptr, std::mem::transmute(api_ptr),
446                MainloopInner::<MainloopInternal>::drop_actual, true)
447        };
448        Some(Self { _inner: Rc::new(ml_inner) })
449    }
450
451    /// Starts the event loop thread.
452    pub fn start(&mut self) -> Result<(), PAErr> {
453        match unsafe { capi::pa_threaded_mainloop_start(self._inner.get_ptr()) } {
454            0 => Ok(()),
455            e => Err(PAErr(e)),
456        }
457    }
458
459    /// Terminates the event loop thread cleanly.
460    ///
461    /// Make sure to unlock the mainloop object before calling this function.
462    #[inline]
463    pub fn stop(&mut self) {
464        unsafe { capi::pa_threaded_mainloop_stop(self._inner.get_ptr()); }
465    }
466
467    /// Locks the event loop object, effectively blocking the event loop thread from processing
468    /// events.
469    ///
470    /// You can use this to enforce exclusive access to all objects attached to the event loop. This
471    /// lock is recursive. This function may not be called inside the event loop thread. Events that
472    /// are dispatched from the event loop thread are executed with this lock held.
473    #[inline]
474    pub fn lock(&mut self) {
475        assert!(!self.in_thread(), "lock() can not be called from within the event loop thread!");
476        unsafe { capi::pa_threaded_mainloop_lock(self._inner.get_ptr()); }
477    }
478
479    /// Unlocks the event loop object, inverse of [`lock()`](Self::lock).
480    #[inline]
481    pub fn unlock(&mut self) {
482        unsafe { capi::pa_threaded_mainloop_unlock(self._inner.get_ptr()); }
483    }
484
485    /// Waits for an event to be signalled by the event loop thread.
486    ///
487    /// You can use this to pass data from the event loop thread to the main thread in a
488    /// synchronized fashion. This function may not be called inside the event loop thread. Prior to
489    /// this call the event loop object needs to be locked using [`lock()`]. While waiting the lock
490    /// will be released. Immediately before returning it will be acquired again. This function may
491    /// spuriously wake up even without [`signal()`] being called. You need to make sure to handle
492    /// that!
493    ///
494    /// [`lock()`]: Self::lock
495    /// [`signal()`]: Self::signal
496    #[inline]
497    pub fn wait(&mut self) {
498        unsafe { capi::pa_threaded_mainloop_wait(self._inner.get_ptr()); }
499    }
500
501    /// Signals all threads waiting for a signalling event in [`wait()`].
502    ///
503    /// If `wait_for_accept` is `true`, do not return before the signal was accepted by an
504    /// [`accept()`] call. While waiting for that condition the event loop object is unlocked.
505    ///
506    /// [`wait()`]: Self::wait
507    /// [`accept()`]: Self::accept
508    #[inline]
509    pub fn signal(&mut self, wait_for_accept: bool) {
510        unsafe {
511            capi::pa_threaded_mainloop_signal(self._inner.get_ptr(), wait_for_accept as i32);
512        }
513    }
514
515    /// Accepts a signal from the event thread issued with [`signal()`].
516    ///
517    /// This call should only be used in conjunction with [`signal()`] with `wait_for_accept` as
518    /// `true`.
519    ///
520    /// [`signal()`]: Self::signal
521    #[inline]
522    pub fn accept(&mut self) {
523        unsafe { capi::pa_threaded_mainloop_accept(self._inner.get_ptr()); }
524    }
525
526    /// Gets the return value as specified with the main loop’s `quit` routine (used internally by
527    /// threaded mainloop).
528    #[inline]
529    pub fn get_retval(&self) -> def::Retval {
530        def::Retval(unsafe { capi::pa_threaded_mainloop_get_retval(self._inner.get_ptr()) })
531    }
532
533    /// Gets the main loop abstraction layer vtable for this main loop.
534    ///
535    /// There is no need to free this object as it is owned by the loop and is destroyed when the
536    /// loop is freed.
537    ///
538    /// Talking to PA directly with C requires fetching this pointer explicitly via this function.
539    /// This is actually unnecessary through this binding. The pointer is retrieved automatically
540    /// upon Mainloop creation, stored internally, and automatically obtained from it by functions
541    /// that need it.
542    #[inline]
543    pub fn get_api<'a>(&self) -> &'a MainloopApi {
544        let ptr = self._inner.get_api_ptr();
545        assert_eq!(false, ptr.is_null());
546        unsafe { &*ptr }
547    }
548
549    /// Checks whether or not we are in the event loop thread (returns `true` if so).
550    #[inline]
551    pub fn in_thread(&self) -> bool {
552        unsafe { capi::pa_threaded_mainloop_in_thread(self._inner.get_ptr()) != 0 }
553    }
554
555    /// Sets the name of the thread.
556    pub fn set_name(&mut self, name: &str) {
557        // Warning: New CStrings will be immediately freed if not bound to a variable, leading to
558        // as_ptr() giving dangling pointers!
559        let c_name = CString::new(name).unwrap();
560        unsafe { capi::pa_threaded_mainloop_set_name(self._inner.get_ptr(), c_name.as_ptr()); }
561    }
562}