libpulse_binding/mainloop/threaded.rs
1// Copyright 2017 Lyndon Brown
2//
3// This file is part of the PulseAudio Rust language binding.
4//
5// Licensed under the MIT license or the Apache license (version 2.0), at your option. You may not
6// copy, modify, or distribute this file except in compliance with said license. You can find copies
7// of these licenses either in the LICENSE-MIT and LICENSE-APACHE files, or alternatively at
8// <http://opensource.org/licenses/MIT> and <http://www.apache.org/licenses/LICENSE-2.0>
9// respectively.
10//
11// Portions of documentation are copied from the LGPL 2.1+ licensed PulseAudio C headers on a
12// fair-use basis, as discussed in the overall project readme (available in the git repository).
13
14//! A variation of the standard main loop implementation, using a background thread.
15//!
16//! # Overview
17//!
18//! The threaded main loop implementation is a special version of the standard main loop
19//! implementation. For the basic design, see the standard main loop documentation
20//! ([`mainloop::standard`](mod@crate::mainloop::standard)).
21//!
22//! The added feature in the threaded main loop is that it spawns a new thread that runs the real
23//! main loop in the background. This allows a synchronous application to use the asynchronous API
24//! without risking stalling the PulseAudio library. A few synchronization primitives are available
25//! to access the objects attached to the event loop safely.
26//!
27//! # Creation
28//!
29//! A [`Mainloop`] object is created using [`Mainloop::new()`]. This will only allocate the required
30//! structures though, so to use it the thread must also be started. This is done through
31//! [`Mainloop::start()`], after which you can start using the main loop.
32//!
33//! # Destruction
34//!
35//! When the PulseAudio connection has been terminated, the thread must be stopped and the
36//! resources freed. Stopping the thread is done using [`Mainloop::stop()`], which must be called
37//! without the lock (see below) held. When that function returns, the thread is stopped and the
38//! [`Mainloop`] object can be destroyed.
39//!
40//! Destruction of the [`Mainloop`] object is done automatically when the object falls out of scope.
41//! (Rust’s `Drop` trait has been implemented and takes care of it).
42//!
43//! # Locking
44//!
45//! Since the PulseAudio API doesn’t allow concurrent accesses to objects, a locking scheme must be
46//! used to guarantee safe usage. The threaded main loop API provides such a scheme through the
47//! functions [`Mainloop::lock()`] and [`Mainloop::unlock()`].
48//!
49//! The lock is recursive, so it’s safe to use it multiple times from the same thread. Just make
50//! sure you call [`Mainloop::unlock()`] the same number of times you called [`Mainloop::lock()`].
51//!
52//! The lock needs to be held whenever you call any PulseAudio function that uses an object
53//! associated with this main loop. Those objects include the mainloop, context, stream and
54//! operation objects, and the various event objects (io, time, defer). Make sure you do not hold on
55//! to the lock more than necessary though, as the threaded main loop stops while the lock is held.
56//!
57//! Example:
58//!
59//! ```rust,no_run
60//! extern crate libpulse_binding as pulse;
61//!
62//! use std::rc::Rc;
63//! use std::cell::RefCell;
64//! use pulse::mainloop::threaded::Mainloop;
65//! use pulse::stream::{Stream, State};
66//!
67//! fn check_stream(m: Rc<RefCell<Mainloop>>, s: Rc<RefCell<Stream>>) {
68//! m.borrow_mut().lock();
69//!
70//! let state = s.borrow().get_state();
71//!
72//! m.borrow_mut().unlock();
73//!
74//! match state {
75//! State::Ready => { println!("Stream is ready!"); },
76//! _ => { println!("Stream is not ready!"); },
77//! }
78//! }
79//! ```
80//!
81//! # Callbacks
82//!
83//! Callbacks in PulseAudio are asynchronous, so they require extra care when using them together
84//! with a threaded main loop.
85//!
86//! The easiest way to turn the callback based operations into synchronous ones, is to simply wait
87//! for the callback to be called and continue from there. This is the approach chosen in
88//! PulseAudio’s threaded API.
89//!
90//! ## Basic callbacks
91//!
92//! For the basic case, where all that is required is to wait for the callback to be invoked, the
93//! code should look something like this:
94//!
95//! Example:
96//!
97//! ```rust,no_run
98//! extern crate libpulse_binding as pulse;
99//!
100//! use std::rc::Rc;
101//! use std::cell::RefCell;
102//! use pulse::mainloop::threaded::Mainloop;
103//! use pulse::operation::State;
104//! use pulse::stream::Stream;
105//!
106//! fn drain_stream(m: Rc<RefCell<Mainloop>>, s: Rc<RefCell<Stream>>) {
107//! m.borrow_mut().lock();
108//!
109//! // Drain
110//! let o = {
111//! let ml_ref = Rc::clone(&m);
112//! s.borrow_mut().drain(Some(Box::new(move |_success: bool| {
113//! unsafe { (*ml_ref.as_ptr()).signal(false); }
114//! })))
115//! };
116//! while o.get_state() != pulse::operation::State::Done {
117//! m.borrow_mut().wait();
118//! }
119//!
120//! m.borrow_mut().unlock();
121//! }
122//! ```
123//!
124//! The function `drain_stream` will wait for the callback to be called using [`Mainloop::wait()`].
125//!
126//! If your application is multi-threaded, then this waiting must be done inside a while loop. The
127//! reason for this is that multiple threads might be using [`Mainloop::wait()`] at the same time.
128//! Each thread must therefore verify that it was its callback that was invoked. Also the underlying
129//! OS synchronization primitives are usually not free of spurious wake-ups, so a
130//! [`Mainloop::wait()`] must be called within a loop even if you have only one thread waiting.
131//!
132//! The callback `my_drain_callback` indicates to the main function that it has been called using
133//! [`Mainloop::signal()`].
134//!
135//! As you can see, [`Mainloop::wait()`] may only be called with the lock held. The same thing is
136//! true for [`Mainloop::signal()`], but as the lock is held before the callback is invoked, you do
137//! not have to deal with that.
138//!
139//! The functions will not dead lock because the wait function will release the lock before waiting
140//! and then regrab it once it has been signalled. For those of you familiar with threads, the
141//! behaviour is that of a condition variable.
142//!
143//! ## Data callbacks
144//!
145//! For many callbacks, simply knowing that they have been called is insufficient. The callback also
146//! receives some data that is desired. To access this data safely, we must extend our example a
147//! bit:
148//!
149//! ```rust,no_run
150//! extern crate libpulse_binding as pulse;
151//!
152//! use std::rc::Rc;
153//! use std::cell::RefCell;
154//! use std::sync::atomic::{AtomicBool, Ordering};
155//! use pulse::mainloop::threaded::Mainloop;
156//! use pulse::stream::Stream;
157//!
158//! // A data structure to capture all our data in (currently just a pointer to a bool)
159//! struct DrainCbData(*mut bool);
160//!
161//! fn drain_stream(m: Rc<RefCell<Mainloop>>, s: Rc<RefCell<Stream>>) {
162//! m.borrow_mut().lock();
163//!
164//! // For guarding against spurious wakeups
165//! // Possibly also needed for memory flushing and ordering control
166//! let mut guard = Rc::new(RefCell::new(AtomicBool::new(true)));
167//!
168//! let mut data: Rc<RefCell<Option<DrainCbData>>> = Rc::new(RefCell::new(None));
169//!
170//! // Drain
171//! let o = {
172//! let ml_ref = Rc::clone(&m);
173//! let guard_ref = Rc::clone(&guard);
174//! let data_ref = Rc::clone(&data);
175//! s.borrow_mut().drain(Some(Box::new(move |mut success: bool| {
176//! unsafe {
177//! *data_ref.as_ptr() = Some(DrainCbData(&mut success));
178//! (*guard_ref.as_ptr()).store(false, Ordering::Release);
179//! (*ml_ref.as_ptr()).signal(true);
180//! }
181//! })))
182//! };
183//! while guard.borrow().load(Ordering::Acquire) {
184//! m.borrow_mut().wait();
185//! }
186//!
187//! assert!(!data.borrow().is_none());
188//! let success = unsafe { *(data.borrow_mut().take().unwrap().0) };
189//!
190//! // Allow callback to continue now
191//! m.borrow_mut().accept();
192//!
193//! match success {
194//! false => { println!("Bitter defeat..."); },
195//! true => { println!("Success!"); },
196//! }
197//!
198//! m.borrow_mut().unlock();
199//! }
200//! ```
201//!
202//! The example is a bit silly as it would have been more simple to just copy the contents of
203//! `success`, but for larger data structures this can be wasteful.
204//!
205//! The difference here compared to the basic callback is the value `true` passed to
206//! [`Mainloop::signal()`] and the call to [`Mainloop::accept()`]. What will happen is that
207//! [`Mainloop::signal()`] will signal the main function and then wait. The main function is then
208//! free to use the data in the callback until [`Mainloop::accept()`] is called, which will allow
209//! the callback to continue.
210//!
211//! Note that [`Mainloop::accept()`] must be called some time between exiting the while loop and
212//! unlocking the main loop! Failure to do so will result in a race condition. I.e. it is not okay
213//! to release the lock and regrab it before calling [`Mainloop::accept()`].
214//!
215//! ## Asynchronous callbacks
216//!
217//! PulseAudio also has callbacks that are completely asynchronous, meaning that they can be called
218//! at any time. The threaded main loop API provides the locking mechanism to handle concurrent
219//! accesses, but nothing else. Applications will have to handle communication from the callback to
220//! the main program through their own mechanisms.
221//!
222//! The callbacks that are completely asynchronous are:
223//!
224//! * State callbacks for contexts, streams, etc.
225//! * Subscription notifications.
226//!
227//! # Example
228//!
229//! An example program using the threaded mainloop:
230//!
231//! ```rust
232//! extern crate libpulse_binding as pulse;
233//!
234//! use std::rc::Rc;
235//! use std::cell::RefCell;
236//! use std::ops::Deref;
237//! use pulse::mainloop::threaded::Mainloop;
238//! use pulse::context::{Context, FlagSet as ContextFlagSet};
239//! use pulse::stream::{Stream, FlagSet as StreamFlagSet};
240//! use pulse::sample::{Spec, Format};
241//! use pulse::proplist::Proplist;
242//! use pulse::mainloop::api::Mainloop as MainloopTrait; //Needs to be in scope
243//!
244//! fn main() {
245//! let spec = Spec {
246//! format: Format::S16NE,
247//! channels: 2,
248//! rate: 44100,
249//! };
250//! assert!(spec.is_valid());
251//!
252//! let mut proplist = Proplist::new().unwrap();
253//! proplist.set_str(pulse::proplist::properties::APPLICATION_NAME, "FooApp")
254//! .unwrap();
255//!
256//! let mut mainloop = Rc::new(RefCell::new(Mainloop::new()
257//! .expect("Failed to create mainloop")));
258//!
259//! let mut context = Rc::new(RefCell::new(Context::new_with_proplist(
260//! mainloop.borrow().deref(),
261//! "FooAppContext",
262//! &proplist
263//! ).expect("Failed to create new context")));
264//!
265//! // Context state change callback
266//! {
267//! let ml_ref = Rc::clone(&mainloop);
268//! let context_ref = Rc::clone(&context);
269//! context.borrow_mut().set_state_callback(Some(Box::new(move || {
270//! let state = unsafe { (*context_ref.as_ptr()).get_state() };
271//! match state {
272//! pulse::context::State::Ready |
273//! pulse::context::State::Failed |
274//! pulse::context::State::Terminated => {
275//! unsafe { (*ml_ref.as_ptr()).signal(false); }
276//! },
277//! _ => {},
278//! }
279//! })));
280//! }
281//!
282//! context.borrow_mut().connect(None, ContextFlagSet::NOFLAGS, None)
283//! .expect("Failed to connect context");
284//!
285//! mainloop.borrow_mut().lock();
286//! mainloop.borrow_mut().start().expect("Failed to start mainloop");
287//!
288//! // Wait for context to be ready
289//! loop {
290//! match context.borrow().get_state() {
291//! pulse::context::State::Ready => { break; },
292//! pulse::context::State::Failed |
293//! pulse::context::State::Terminated => {
294//! eprintln!("Context state failed/terminated, quitting...");
295//! mainloop.borrow_mut().unlock();
296//! mainloop.borrow_mut().stop();
297//! return;
298//! },
299//! _ => { mainloop.borrow_mut().wait(); },
300//! }
301//! }
302//! context.borrow_mut().set_state_callback(None);
303//!
304//! let mut stream = Rc::new(RefCell::new(Stream::new(
305//! &mut context.borrow_mut(),
306//! "Music",
307//! &spec,
308//! None
309//! ).expect("Failed to create new stream")));
310//!
311//! // Stream state change callback
312//! {
313//! let ml_ref = Rc::clone(&mainloop);
314//! let stream_ref = Rc::clone(&stream);
315//! stream.borrow_mut().set_state_callback(Some(Box::new(move || {
316//! let state = unsafe { (*stream_ref.as_ptr()).get_state() };
317//! match state {
318//! pulse::stream::State::Ready |
319//! pulse::stream::State::Failed |
320//! pulse::stream::State::Terminated => {
321//! unsafe { (*ml_ref.as_ptr()).signal(false); }
322//! },
323//! _ => {},
324//! }
325//! })));
326//! }
327//!
328//! stream.borrow_mut().connect_playback(None, None, StreamFlagSet::START_CORKED,
329//! None, None).expect("Failed to connect playback");
330//!
331//! // Wait for stream to be ready
332//! loop {
333//! match stream.borrow().get_state() {
334//! pulse::stream::State::Ready => { break; },
335//! pulse::stream::State::Failed |
336//! pulse::stream::State::Terminated => {
337//! eprintln!("Stream state failed/terminated, quitting...");
338//! mainloop.borrow_mut().unlock();
339//! mainloop.borrow_mut().stop();
340//! return;
341//! },
342//! _ => { mainloop.borrow_mut().wait(); },
343//! }
344//! }
345//! stream.borrow_mut().set_state_callback(None);
346//!
347//! mainloop.borrow_mut().unlock();
348//!
349//! // Our main logic (to output a stream of audio data)
350//! # let mut count = 0; // For automatic unit tests, we’ll spin a few times
351//! loop {
352//! mainloop.borrow_mut().lock();
353//!
354//! // Write some data with stream.write()
355//!
356//! if stream.borrow().is_corked().unwrap() {
357//! stream.borrow_mut().uncork(None);
358//! }
359//!
360//! // Drain
361//! let o = {
362//! let ml_ref = Rc::clone(&mainloop);
363//! stream.borrow_mut().drain(Some(Box::new(move |_success: bool| {
364//! unsafe { (*ml_ref.as_ptr()).signal(false); }
365//! })))
366//! };
367//! while o.get_state() != pulse::operation::State::Done {
368//! mainloop.borrow_mut().wait();
369//! }
370//!
371//! mainloop.borrow_mut().unlock();
372//!
373//! // If done writing data, call `mainloop.borrow_mut().stop()` (with lock released), then
374//! // break!
375//! #
376//! # // Hack: Stop test getting stuck in infinite loop!
377//! # count += 1;
378//! # if count == 3 {
379//! # mainloop.borrow_mut().stop();
380//! # break;
381//! # }
382//! }
383//!
384//! // Clean shutdown
385//! mainloop.borrow_mut().lock();
386//! stream.borrow_mut().disconnect().unwrap();
387//! mainloop.borrow_mut().unlock();
388//! }
389//! ```
390
391use std::rc::Rc;
392use std::ffi::CString;
393use crate::def;
394use crate::error::PAErr;
395use crate::mainloop::api::{MainloopInternalType, MainloopInner, MainloopInnerType, MainloopApi,
396 Mainloop as MainloopTrait};
397use crate::mainloop::signal::MainloopSignals;
398
399pub use capi::pa_threaded_mainloop as MainloopInternal;
400
401impl MainloopInternalType for MainloopInternal {}
402
403/// This acts as a safe interface to the internal PA Mainloop.
404///
405/// The mainloop object pointers are further enclosed here in a ref counted wrapper, allowing this
406/// outer wrapper to have clean methods for creating event objects, which can cleanly pass a copy of
407/// the inner ref counted mainloop object to them. Giving this to events serves two purposes,
408/// firstly because they need the API pointer, secondly, it ensures that event objects do not
409/// outlive the mainloop object.
410pub struct Mainloop {
411 /// The ref-counted inner data.
412 pub _inner: Rc<MainloopInner<MainloopInternal>>,
413}
414
415impl MainloopTrait for Mainloop {
416 type MI = MainloopInner<MainloopInternal>;
417
418 #[inline(always)]
419 fn inner(&self) -> Rc<MainloopInner<MainloopInternal>> {
420 Rc::clone(&self._inner)
421 }
422}
423
424impl MainloopSignals for Mainloop {}
425
426impl MainloopInner<MainloopInternal> {
427 #[inline(always)]
428 fn drop_actual(&mut self) {
429 unsafe { capi::pa_threaded_mainloop_free(self.get_ptr()) };
430 }
431}
432
433impl Mainloop {
434 /// Allocates a new threaded main loop object.
435 ///
436 /// You have to call [`start()`](Self::start) before the event loop thread starts running.
437 pub fn new() -> Option<Self> {
438 let ptr = unsafe { capi::pa_threaded_mainloop_new() };
439 if ptr.is_null() {
440 return None;
441 }
442 let api_ptr = unsafe { capi::pa_threaded_mainloop_get_api(ptr) };
443 assert!(!api_ptr.is_null());
444 let ml_inner = unsafe {
445 MainloopInner::<MainloopInternal>::new(ptr, std::mem::transmute(api_ptr),
446 MainloopInner::<MainloopInternal>::drop_actual, true)
447 };
448 Some(Self { _inner: Rc::new(ml_inner) })
449 }
450
451 /// Starts the event loop thread.
452 pub fn start(&mut self) -> Result<(), PAErr> {
453 match unsafe { capi::pa_threaded_mainloop_start(self._inner.get_ptr()) } {
454 0 => Ok(()),
455 e => Err(PAErr(e)),
456 }
457 }
458
459 /// Terminates the event loop thread cleanly.
460 ///
461 /// Make sure to unlock the mainloop object before calling this function.
462 #[inline]
463 pub fn stop(&mut self) {
464 unsafe { capi::pa_threaded_mainloop_stop(self._inner.get_ptr()); }
465 }
466
467 /// Locks the event loop object, effectively blocking the event loop thread from processing
468 /// events.
469 ///
470 /// You can use this to enforce exclusive access to all objects attached to the event loop. This
471 /// lock is recursive. This function may not be called inside the event loop thread. Events that
472 /// are dispatched from the event loop thread are executed with this lock held.
473 #[inline]
474 pub fn lock(&mut self) {
475 assert!(!self.in_thread(), "lock() can not be called from within the event loop thread!");
476 unsafe { capi::pa_threaded_mainloop_lock(self._inner.get_ptr()); }
477 }
478
479 /// Unlocks the event loop object, inverse of [`lock()`](Self::lock).
480 #[inline]
481 pub fn unlock(&mut self) {
482 unsafe { capi::pa_threaded_mainloop_unlock(self._inner.get_ptr()); }
483 }
484
485 /// Waits for an event to be signalled by the event loop thread.
486 ///
487 /// You can use this to pass data from the event loop thread to the main thread in a
488 /// synchronized fashion. This function may not be called inside the event loop thread. Prior to
489 /// this call the event loop object needs to be locked using [`lock()`]. While waiting the lock
490 /// will be released. Immediately before returning it will be acquired again. This function may
491 /// spuriously wake up even without [`signal()`] being called. You need to make sure to handle
492 /// that!
493 ///
494 /// [`lock()`]: Self::lock
495 /// [`signal()`]: Self::signal
496 #[inline]
497 pub fn wait(&mut self) {
498 unsafe { capi::pa_threaded_mainloop_wait(self._inner.get_ptr()); }
499 }
500
501 /// Signals all threads waiting for a signalling event in [`wait()`].
502 ///
503 /// If `wait_for_accept` is `true`, do not return before the signal was accepted by an
504 /// [`accept()`] call. While waiting for that condition the event loop object is unlocked.
505 ///
506 /// [`wait()`]: Self::wait
507 /// [`accept()`]: Self::accept
508 #[inline]
509 pub fn signal(&mut self, wait_for_accept: bool) {
510 unsafe {
511 capi::pa_threaded_mainloop_signal(self._inner.get_ptr(), wait_for_accept as i32);
512 }
513 }
514
515 /// Accepts a signal from the event thread issued with [`signal()`].
516 ///
517 /// This call should only be used in conjunction with [`signal()`] with `wait_for_accept` as
518 /// `true`.
519 ///
520 /// [`signal()`]: Self::signal
521 #[inline]
522 pub fn accept(&mut self) {
523 unsafe { capi::pa_threaded_mainloop_accept(self._inner.get_ptr()); }
524 }
525
526 /// Gets the return value as specified with the main loop’s `quit` routine (used internally by
527 /// threaded mainloop).
528 #[inline]
529 pub fn get_retval(&self) -> def::Retval {
530 def::Retval(unsafe { capi::pa_threaded_mainloop_get_retval(self._inner.get_ptr()) })
531 }
532
533 /// Gets the main loop abstraction layer vtable for this main loop.
534 ///
535 /// There is no need to free this object as it is owned by the loop and is destroyed when the
536 /// loop is freed.
537 ///
538 /// Talking to PA directly with C requires fetching this pointer explicitly via this function.
539 /// This is actually unnecessary through this binding. The pointer is retrieved automatically
540 /// upon Mainloop creation, stored internally, and automatically obtained from it by functions
541 /// that need it.
542 #[inline]
543 pub fn get_api<'a>(&self) -> &'a MainloopApi {
544 let ptr = self._inner.get_api_ptr();
545 assert_eq!(false, ptr.is_null());
546 unsafe { &*ptr }
547 }
548
549 /// Checks whether or not we are in the event loop thread (returns `true` if so).
550 #[inline]
551 pub fn in_thread(&self) -> bool {
552 unsafe { capi::pa_threaded_mainloop_in_thread(self._inner.get_ptr()) != 0 }
553 }
554
555 /// Sets the name of the thread.
556 pub fn set_name(&mut self, name: &str) {
557 // Warning: New CStrings will be immediately freed if not bound to a variable, leading to
558 // as_ptr() giving dangling pointers!
559 let c_name = CString::new(name).unwrap();
560 unsafe { capi::pa_threaded_mainloop_set_name(self._inner.get_ptr(), c_name.as_ptr()); }
561 }
562}