calloop-subproc 1.0.0

Subprocess support for the Calloop event loop
Documentation
//! Common utilities for testing our subprocess event sources.

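// Cargo compiles this module separately into every integration test crate that
// declares it, so helpers a given test file does not use would otherwise
// trigger dead-code warnings. See:
// https://doc.rust-lang.org/book/ch11-03-test-organization.html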
#![allow(dead_code)]

use calloop::{EventLoop, EventSource};
use std::thread::JoinHandle;
use test_binary::build_mock_binary_once;

// One invocation per mock binary used by the tests. The command helpers below
// call `test_status_path()`, `test_output_path()` and
// `test_output_asap_path()`, which are presumably the functions these macro
// invocations generate (one `<name>_path()` per binary) — confirm against the
// `test_binary` crate documentation.
build_mock_binary_once!(test_status);
build_mock_binary_once!(test_output);
build_mock_binary_once!(test_output_asap);

/// Generate a single command for the test status binary, given the exit status
/// we want for it.
///
/// The requested `status` is passed to the binary as its only argument, in
/// decimal string form.
pub fn test_status_cmd(status: i32) -> calloop_subproc::Command {
    calloop_subproc::Command::new(test_status_path()).with_args([status.to_string()])
}

/// Build the command that runs the test output binary, with no arguments.
pub fn test_output_cmd() -> calloop_subproc::Command {
    let binary = test_output_path();
    calloop_subproc::Command::new(binary)
}

/// Build the command that runs the fast ("asap") variant of the test output
/// binary, with no arguments.
pub fn test_output_asap_cmd() -> calloop_subproc::Command {
    let binary = test_output_asap_path();
    calloop_subproc::Command::new(binary)
}

/// The once-only guard for initialising logging. [`init_logging`] runs its
/// setup through this, so calling it from several tests in the same process
/// performs the initialisation a single time.
static INTEGRATION_TEST_LOGGING_INIT: std::sync::Once = std::sync::Once::new();

/// Set up `env_logger` for the integration tests.
///
/// The setup runs through [`INTEGRATION_TEST_LOGGING_INIT`], so it is safe to
/// call this at the start of every test. The logger is configured from the
/// default `env_logger` environment, with `trace` supplied as the default
/// filter, and is flagged as running under test.
///
/// # Panics
///
/// Panics if `env_logger` reports that initialisation failed.
pub fn init_logging() {
    INTEGRATION_TEST_LOGGING_INIT.call_once(|| {
        let env = env_logger::Env::default().default_filter_or("trace");

        let mut builder = env_logger::Builder::from_env(env);
        builder.is_test(true);

        builder
            .try_init()
            .expect("Could not initialise test logging");
    });
}

/// Our tests mostly do the same thing: run the Calloop event loop in a thread
/// and check that we get the expected events. This struct manages that,
/// stopping the loop and joining the thread when it's dropped, and acting as an
/// iterator over any generated events.
///
/// `T` is the event type produced by the event source under test and `U` is
/// the type its callback must return.
///
/// Note that each call to `next()` must be followed by a call to `send()`,
/// which injects the return value for the callback passed to the event source.
/// In the event of a panic before such a `send()` call, the channel end will be
/// dropped and the event source will also panic.
///
/// Note that the iterator simply passes through to a channel receiver's
/// `recv()` method under the hood, so iterating over it may block.
pub struct SpawnedCalloop<T, U> {
    /// Handle of the thread running the event loop. Wrapped in an `Option` so
    /// the drop handler can take ownership of it in order to join the thread.
    runner: Option<std::thread::JoinHandle<()>>,
    /// Ping used to tell the event loop to stop; its source half is registered
    /// in the loop.
    stopper: calloop::ping::Ping,
    /// Receives the events that the loop's callback forwards from the event
    /// source under test.
    receiver: std::sync::mpsc::Receiver<T>,
    /// Sends the callback's "return" values back to the loop thread. Wrapped in
    /// an `Option` so the drop handler can drop it before joining, which wakes
    /// up a callback that is still waiting for a reply.
    replier: Option<std::sync::mpsc::Sender<U>>,
}

impl<T: Send + 'static, U: Send + 'static> SpawnedCalloop<T, U> {
    /// Spawn a thread running a Calloop event loop around the event source
    /// returned by `creator`.
    ///
    /// `creator` is invoked on the new thread, so the event source itself never
    /// has to cross a thread boundary. Every event the source produces is
    /// forwarded to this struct's iterator; the loop callback then blocks until
    /// a matching [`send()`](Self::send) supplies its return value.
    ///
    /// # Panics
    ///
    /// Panics if the stop ping cannot be created. The spawned thread panics if
    /// the event loop cannot be created or run, if either source cannot be
    /// inserted, or if either channel is disconnected while the callback is
    /// using it.
    pub fn new<F, X>(creator: F) -> Self
    where
        F: FnOnce() -> X + Send + 'static,
        X: EventSource<Event = T, Ret = U>,
    {
        // Lets us stop the loop from outside its thread.
        let (stopper, stop_source) =
            calloop::ping::make_ping().expect("Could not create ping");

        // One channel carries events out of the loop, the other carries the
        // callback's return values back in.
        let (event_tx, receiver) = std::sync::mpsc::channel();
        let (replier, reply_rx) = std::sync::mpsc::channel();

        // Everything below runs on the spawned thread.
        let loop_body = move || {
            // Event source under test, created on this thread.
            let source_under_test = creator();

            let mut event_loop: EventLoop<calloop::LoopSignal> =
                EventLoop::try_new().expect("Could not initialise event loop");

            let loop_handle = event_loop.handle();

            // The ping's only job is to stop the loop.
            loop_handle
                .insert_source(stop_source, |_, _, signal| {
                    log::trace!("Received stop signal");
                    signal.stop();
                })
                .expect("Could not insert ping source");

            // Forward each event to the test and wait for the value to return.
            loop_handle
                .insert_source(source_under_test, |event, _, _| {
                    // Waiting for the reply may block, which is acceptable in
                    // a test loop where we control both ends and always pair
                    // the calls. If a failed assert unwinds the test, the drop
                    // handler drops the reply sender and recv() wakes up with
                    // an error instead of hanging.
                    event_tx
                        .send(event)
                        .expect("Could not send event source data");
                    reply_rx
                        .recv()
                        .expect("Could not receive event source return value")
                })
                .expect("Could not insert event source under test");

            event_loop
                .run(None, &mut event_loop.get_signal(), |_| {})
                .expect("Could not run event loop");
        };

        Self {
            runner: Some(std::thread::spawn(loop_body)),
            stopper,
            receiver,
            replier: Some(replier),
        }
    }
}

impl<T, U> SpawnedCalloop<T, U> {
    /// Supply the value the test loop's callback should return for the event
    /// most recently taken from the iterator.
    ///
    /// # Panics
    ///
    /// Panics if the reply sender has already been taken, or if the value
    /// cannot be sent to the test loop.
    pub fn send(&self, val: U) {
        let reply_tx = self
            .replier
            .as_ref()
            .expect("Reply side of channel should still exist");

        reply_tx
            .send(val)
            .expect("Could not send reply value to test loop");
    }
}

impl<T, U> Iterator for SpawnedCalloop<T, U> {
    type Item = T;

    /// Block until the loop thread forwards the next event, yielding `None`
    /// once the sending side of the channel has gone away.
    fn next(&mut self) -> Option<Self::Item> {
        // The receiver's blocking iterator wraps `recv()` and maps a
        // disconnected channel to `None`.
        self.receiver.iter().next()
    }
}

impl<T, U> Drop for SpawnedCalloop<T, U> {
    fn drop(&mut self) {
        self.stopper.ping();
        self.replier.take();
        self.runner.take().map(JoinHandle::join);
    }
}