Module libpulse_binding::mainloop::threaded

A variation of the standard main loop implementation, using a background thread.

Overview

The threaded main loop implementation is a special version of the standard main loop implementation. For the basic design, see the standard main loop documentation (mainloop::standard).

The added feature in the threaded main loop is that it spawns a new thread that runs the real main loop in the background. This allows a synchronous application to use the asynchronous API without risking stalling the PulseAudio library. A few synchronization primitives are available to access the objects attached to the event loop safely.

Creation

A Mainloop object is created using Mainloop::new. This only allocates the required structures; the background thread must then be started with Mainloop::start before the main loop can be used.

Destruction

When the PulseAudio connection has been terminated, the thread must be stopped and the resources freed. Stopping the thread is done using Mainloop::stop, which must be called without the lock (see below) held. When that function returns, the thread is stopped and the Mainloop object can be destroyed.

Destruction of the Mainloop object is done automatically when the object falls out of scope (Rust’s Drop trait has been implemented and takes care of it).

Locking

Since the PulseAudio API doesn’t allow concurrent accesses to objects, a locking scheme must be used to guarantee safe usage. The threaded main loop API provides such a scheme through the functions Mainloop::lock and Mainloop::unlock.

The lock is recursive, so it’s safe to use it multiple times from the same thread. Just make sure you call Mainloop::unlock the same number of times you called Mainloop::lock.

The lock needs to be held whenever you call any PulseAudio function that uses an object associated with this main loop. Do not hold the lock longer than necessary though, as the threaded main loop stops while the lock is held.

Example:

extern crate libpulse_binding as pulse;

use std::rc::Rc;
use std::cell::RefCell;
use pulse::mainloop::threaded::Mainloop;
use pulse::stream::{Stream, State};

fn check_stream(m: Rc<RefCell<Mainloop>>, s: Rc<RefCell<Stream>>) {
    m.borrow_mut().lock();

    let state = s.borrow().get_state();

    m.borrow_mut().unlock();

    match state {
        State::Ready => { println!("Stream is ready!"); },
        _ => { println!("Stream is not ready!"); },
    }
}
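
Balancing every lock call with an unlock call by hand is easy to get wrong when a function returns early. One possible approach, sketched here, is a small guard that unlocks when it is dropped. The `Lockable` trait, the `LockGuard` type and the `FakeLoop` stand-in are all hypothetical names made up for this illustration; none of them are part of libpulse_binding, and `FakeLoop` only counts calls so that the sketch runs without a PulseAudio server. The real lock and unlock methods are called through a mutable borrow in the example above, so an adaptation would need to account for that.

```rust
use std::cell::Cell;

/// Hypothetical abstraction over anything with paired lock/unlock calls.
trait Lockable {
    fn lock(&self);
    fn unlock(&self);
}

/// Stand-in for a threaded main loop; it only tracks the lock depth.
struct FakeLoop {
    depth: Cell<u32>,
}

impl Lockable for FakeLoop {
    fn lock(&self) {
        self.depth.set(self.depth.get() + 1);
    }
    fn unlock(&self) {
        self.depth.set(self.depth.get() - 1);
    }
}

/// Guard that takes the lock on creation and releases it when dropped.
struct LockGuard<'a, L: Lockable>(&'a L);

impl<'a, L: Lockable> LockGuard<'a, L> {
    fn new(l: &'a L) -> Self {
        l.lock();
        LockGuard(l)
    }
}

impl<'a, L: Lockable> Drop for LockGuard<'a, L> {
    fn drop(&mut self) {
        self.0.unlock();
    }
}

/// Returns the lock depth observed inside and after a guarded scope.
fn guarded_depths(l: &FakeLoop) -> (u32, u32) {
    let inside = {
        let _outer = LockGuard::new(l);
        let _inner = LockGuard::new(l); // recursive use: depth is 2 here
        l.depth.get()
    }; // both guards are dropped here, in reverse order of creation
    (inside, l.depth.get())
}
```

Because the unlock happens in `Drop`, the number of unlock calls always matches the number of lock calls, including on early returns from the guarded scope.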

Callbacks

Callbacks in PulseAudio are asynchronous, so they require extra care when using them together with a threaded main loop.

The easiest way to turn callback-based operations into synchronous ones is to wait for the callback to be called and continue from there. This is the approach chosen in PulseAudio’s threaded API.

Basic callbacks

For the basic case, where all that is required is to wait for the callback to be invoked, the code should look something like this:

Example:

extern crate libpulse_binding as pulse;

use std::rc::Rc;
use std::cell::RefCell;
use pulse::mainloop::threaded::Mainloop;
use pulse::operation::State;
use pulse::stream::Stream;

fn drain_stream(m: Rc<RefCell<Mainloop>>, s: Rc<RefCell<Stream>>) {
    m.borrow_mut().lock();

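    // Start draining; the callback wakes up the waiting thread when it fires
    let o = {
        let ml_ref = Rc::clone(&m);
        s.borrow_mut().drain(Some(Box::new(move |_success: bool| {
            unsafe { (*ml_ref.as_ptr()).signal(false); }
        })))
    };
    // Loop to guard against spurious wake-ups and signals meant for other waiters
    while o.get_state() != State::Done {
        m.borrow_mut().wait();
    }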

    m.borrow_mut().unlock();
}

The function drain_stream will wait for the callback to be called using Mainloop::wait.

If your application is multi-threaded, this waiting must be done inside a while loop, because multiple threads might be using Mainloop::wait at the same time. Each thread must therefore verify that it was its own callback that was invoked. In addition, the underlying OS synchronization primitives are usually not free of spurious wake-ups, so Mainloop::wait must be called within a loop even if only one thread is waiting.

The closure passed to drain indicates to the waiting function that it has been called by using Mainloop::signal.

As you can see, Mainloop::wait may only be called with the lock held. The same thing is true for Mainloop::signal, but as the lock is held before the callback is invoked, you do not have to deal with that.

The functions will not deadlock, because the wait function releases the lock before waiting and then reacquires it once it has been signalled. For those familiar with threads, the behaviour is that of a condition variable.
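
For readers less familiar with condition variables, the wait-in-a-loop pattern can be sketched with the standard library alone. This is an analogy and not the PulseAudio API: a `Mutex` plays the role of the main loop lock, a `Condvar` plays the role of wait and signal, and the function name `wait_for_callback` is made up for this illustration.

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

/// Waits until a background "callback" thread flips a flag, then returns the flag.
fn wait_for_callback() -> bool {
    // The Mutex stands in for the main loop lock, the Condvar for wait/signal.
    let pair = Arc::new((Mutex::new(false), Condvar::new()));
    let pair_cb = Arc::clone(&pair);

    // Take the lock before starting the worker, as a caller of wait would.
    let (lock, cvar) = &*pair;
    let mut done = lock.lock().unwrap();

    let worker = thread::spawn(move || {
        let (lock, cvar) = &*pair_cb;
        // Blocks until the waiter releases the lock inside `wait`.
        let mut done = lock.lock().unwrap();
        *done = true;
        cvar.notify_all();
    });

    // Re-check the predicate after every wake-up to tolerate spurious ones.
    while !*done {
        // `wait` releases the lock while blocked and reacquires it on wake-up.
        done = cvar.wait(done).unwrap();
    }
    let result = *done;
    drop(done);
    worker.join().unwrap();
    result
}
```

The worker can only take the lock once the waiter is blocked inside `wait`, which is why the waiter may safely hold the lock for the whole function without deadlocking.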

Data callbacks

For many callbacks, simply knowing that they have been called is insufficient. The callback also receives data that the caller needs. To access this data safely, we must extend our example a bit:

extern crate libpulse_binding as pulse;

use std::rc::Rc;
use std::cell::RefCell;
use std::sync::atomic::{AtomicBool, Ordering};
use pulse::mainloop::threaded::Mainloop;
use pulse::stream::Stream;

// A data structure to capture all our data in (currently just a pointer to a bool)
struct DrainCbData(*mut bool);

fn drain_stream(m: Rc<RefCell<Mainloop>>, s: Rc<RefCell<Stream>>) {
    m.borrow_mut().lock();

    // For guarding against spurious wakeups
    // Possibly also needed for memory flushing and ordering control
    let guard = Rc::new(RefCell::new(AtomicBool::new(true)));

    let data: Rc<RefCell<Option<DrainCbData>>> = Rc::new(RefCell::new(None));

    // Drain
    let o = {
        let ml_ref = Rc::clone(&m);
        let guard_ref = Rc::clone(&guard);
        let data_ref = Rc::clone(&data);
        s.borrow_mut().drain(Some(Box::new(move |mut success: bool| {
            unsafe {
                *data_ref.as_ptr() = Some(DrainCbData(&mut success));
                (*guard_ref.as_ptr()).store(false, Ordering::Release);
                (*ml_ref.as_ptr()).signal(true);
            }
        })))
    };
    while guard.borrow().load(Ordering::Acquire) {
        m.borrow_mut().wait();
    }

    // The pointer is only valid while the callback is still blocked in signal(true)
    assert!(data.borrow().is_some());
    let success = unsafe { *(data.borrow_mut().take().unwrap().0) };

    // Allow callback to continue now
    m.borrow_mut().accept();

    match success {
        false => { println!("Bitter defeat..."); },
        true => { println!("Success!"); },
    }

    m.borrow_mut().unlock();
}

The example is a bit silly, as it would have been simpler to just copy the contents of success, but for larger data structures copying can be wasteful.

The difference here compared to the basic callback is the value true passed to Mainloop::signal and the call to Mainloop::accept. Mainloop::signal signals the waiting function and then blocks. The waiting function is then free to use the data in the callback until it calls Mainloop::accept, which allows the callback to continue.

Note that Mainloop::accept must be called some time between exiting the while loop and unlocking the main loop! Failure to do so will result in a race condition; that is, it is not okay to release the lock and reacquire it before calling Mainloop::accept.
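
The signal-then-accept handshake can also be modelled with standard library primitives, which may help when reasoning about the ordering. The sketch below is an analogy and not the PulseAudio implementation: the `Shared` struct, the two `Condvar`s and the `handshake` function are assumptions made for illustration. The callback thread publishes a payload, signals, and blocks until the waiter accepts; the waiter reads the payload in place and accepts while still holding the lock.

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

/// State shared between the waiting thread and the callback thread.
struct Shared {
    data: Option<Vec<u8>>, // payload that is only valid until `accepted` is set
    accepted: bool,
}

/// Runs one signal/accept round trip and returns the payload length seen by the waiter.
fn handshake() -> usize {
    let state = Arc::new((
        Mutex::new(Shared { data: None, accepted: false }),
        Condvar::new(), // "signal": callback -> waiter
        Condvar::new(), // "accept": waiter -> callback
    ));
    let state_cb = Arc::clone(&state);

    let (lock, signalled, accept) = &*state;
    let mut guard = lock.lock().unwrap();

    let callback = thread::spawn(move || {
        let (lock, signalled, accept) = &*state_cb;
        let mut guard = lock.lock().unwrap();
        guard.data = Some(vec![1, 2, 3]);
        signalled.notify_all();
        // Comparable to signal(true): block until the waiter has accepted.
        while !guard.accepted {
            guard = accept.wait(guard).unwrap();
        }
        // Only now does the callback invalidate the payload.
        guard.data = None;
    });

    // Wait, holding the lock, until the callback has published its data.
    while guard.data.is_none() {
        guard = signalled.wait(guard).unwrap();
    }
    // Use the payload in place, without copying it.
    let len = guard.data.as_ref().map_or(0, |d| d.len());

    // Accept while still holding the lock, mirroring the rule stated above.
    guard.accepted = true;
    accept.notify_all();
    drop(guard);

    callback.join().unwrap();
    len
}
```

In this model the payload is read strictly between the wake-up and the accept, which is the window in which the callback is guaranteed to be blocked.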

Asynchronous callbacks

PulseAudio also has callbacks that are completely asynchronous, meaning that they can be called at any time. The threaded main loop API provides the locking mechanism to handle concurrent accesses, but nothing else. Applications will have to handle communication from the callback to the main program through their own mechanisms.

The callbacks that are completely asynchronous are:

  • State callbacks for contexts, streams, etc.
  • Subscription notifications.
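
One possible mechanism for passing such events to the main program is a channel, sketched here with the standard library only. The `FakeState` enum and both function names are hypothetical and exist purely for this illustration; a real callback would be registered with the relevant PulseAudio object and would report that object's own state type. Whether a channel suits a given application is a design decision, and this is only one option.

```rust
use std::sync::mpsc::{channel, Sender};
use std::thread;

/// Hypothetical state values; the real crate has its own state enums.
#[derive(Debug, Clone, Copy, PartialEq)]
enum FakeState {
    Connecting,
    Ready,
}

/// Builds a "state callback" that forwards each state to the main program.
fn make_state_callback(tx: Sender<FakeState>) -> impl FnMut(FakeState) + Send {
    move |state: FakeState| {
        // Ignore send errors: the receiver may already have shut down.
        let _ = tx.send(state);
    }
}

/// Simulates a background thread firing the callback at arbitrary times.
fn collect_states() -> Vec<FakeState> {
    let (tx, rx) = channel();
    let mut callback = make_state_callback(tx);

    let background = thread::spawn(move || {
        callback(FakeState::Connecting);
        callback(FakeState::Ready);
        // `callback` and its Sender are dropped here, closing the channel.
    });

    // The main program drains events at its own pace until the channel closes.
    let states: Vec<FakeState> = rx.iter().collect();
    background.join().unwrap();
    states
}
```

The callback itself does no waiting and touches no shared objects, so it returns quickly and leaves all decisions to the receiving side.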

Example

An example program using the threaded mainloop:

extern crate libpulse_binding as pulse;

use std::rc::Rc;
use std::cell::RefCell;
use std::ops::Deref;
use pulse::mainloop::threaded::Mainloop;
use pulse::context::Context;
use pulse::stream::Stream;
use pulse::proplist::Proplist;
use pulse::mainloop::api::Mainloop as MainloopTrait; // Needs to be in scope

fn main() {
    let spec = pulse::sample::Spec {
        format: pulse::sample::SAMPLE_S16NE,
        channels: 2,
        rate: 44100,
    };
    assert!(spec.is_valid());

    let mut proplist = Proplist::new().unwrap();
    proplist.set_str(pulse::proplist::properties::APPLICATION_NAME, "FooApp")
        .unwrap();

    let mainloop = Rc::new(RefCell::new(Mainloop::new()
        .expect("Failed to create mainloop")));

    let context = Rc::new(RefCell::new(Context::new_with_proplist(
        mainloop.borrow().deref(),
        "FooAppContext",
        &proplist
        ).expect("Failed to create new context")));

    // Context state change callback
    {
        let ml_ref = Rc::clone(&mainloop);
        let context_ref = Rc::clone(&context);
        context.borrow_mut().set_state_callback(Some(Box::new(move || {
            let state = unsafe { (*context_ref.as_ptr()).get_state() };
            match state {
                pulse::context::State::Ready |
                pulse::context::State::Failed |
                pulse::context::State::Terminated => {
                    unsafe { (*ml_ref.as_ptr()).signal(false); }
                },
                _ => {},
            }
        })));
    }

    context.borrow_mut().connect(None, pulse::context::flags::NOFLAGS, None)
        .expect("Failed to connect context");

    mainloop.borrow_mut().lock();
    mainloop.borrow_mut().start().expect("Failed to start mainloop");

    // Wait for context to be ready
    loop {
        match context.borrow().get_state() {
            pulse::context::State::Ready => { break; },
            pulse::context::State::Failed |
            pulse::context::State::Terminated => {
                eprintln!("Context state failed/terminated, quitting...");
                mainloop.borrow_mut().unlock();
                mainloop.borrow_mut().stop();
                return;
            },
            _ => { mainloop.borrow_mut().wait(); },
        }
    }
    context.borrow_mut().set_state_callback(None);

    let stream = Rc::new(RefCell::new(Stream::new(
        &mut context.borrow_mut(),
        "Music",
        &spec,
        None
        ).expect("Failed to create new stream")));

    // Stream state change callback
    {
        let ml_ref = Rc::clone(&mainloop);
        let stream_ref = Rc::clone(&stream);
        stream.borrow_mut().set_state_callback(Some(Box::new(move || {
            let state = unsafe { (*stream_ref.as_ptr()).get_state() };
            match state {
                pulse::stream::State::Ready |
                pulse::stream::State::Failed |
                pulse::stream::State::Terminated => {
                    unsafe { (*ml_ref.as_ptr()).signal(false); }
                },
                _ => {},
            }
        })));
    }

    stream.borrow_mut().connect_playback(None, None, pulse::stream::flags::START_CORKED,
        None, None).expect("Failed to connect playback");

    // Wait for stream to be ready
    loop {
        match stream.borrow().get_state() {
            pulse::stream::State::Ready => { break; },
            pulse::stream::State::Failed |
            pulse::stream::State::Terminated => {
                eprintln!("Stream state failed/terminated, quitting...");
                mainloop.borrow_mut().unlock();
                mainloop.borrow_mut().stop();
                return;
            },
            _ => { mainloop.borrow_mut().wait(); },
        }
    }
    stream.borrow_mut().set_state_callback(None);

    mainloop.borrow_mut().unlock();

    // Our main loop
    loop {
        mainloop.borrow_mut().lock();

        // Write some data with stream.write()

        if stream.borrow().is_corked().unwrap() {
            stream.borrow_mut().uncork(None);
        }

        // Drain
        let o = {
            let ml_ref = Rc::clone(&mainloop);
            stream.borrow_mut().drain(Some(Box::new(move |_success: bool| {
                unsafe { (*ml_ref.as_ptr()).signal(false); }
            })))
        };
        while o.get_state() != pulse::operation::State::Done {
            mainloop.borrow_mut().wait();
        }

        mainloop.borrow_mut().unlock();

        // If done writing data, call `mainloop.borrow_mut().stop()` (with lock released), then
        // break!
    }

    // Clean shutdown
    mainloop.borrow_mut().lock();
    stream.borrow_mut().disconnect().unwrap();
    mainloop.borrow_mut().unlock();
}

Structs

Mainloop

This acts as a safe interface to the internal PA Mainloop.

MainloopInternal

An opaque threaded main loop object.