conch-runtime 0.1.2

A library for evaluating/executing programs written in the shell programming language.
Documentation
#![cfg(unix)]

extern crate conch_runtime;
extern crate futures;
extern crate tokio_core;
extern crate tokio_io;

#[macro_use]
mod support;
pub use self::support::*;

use conch_runtime::env::AsyncIoEnvironment;
use conch_runtime::io::{FileDesc, Pipe};
use conch_runtime::os::unix::env::EventedAsyncIoEnv;
use conch_runtime::os::unix::io::{FileDescExt, MaybeEventedFd};
use std::fs::File;
use std::io::{ErrorKind, Read, Result, Write};
use std::time::Duration;
use std::thread;
use tokio_io::AsyncRead;
use tokio_io::io::read_to_end;
use tokio_core::reactor::Core;

/// Wrapper around a reader that keeps two tallies about the reads issued
/// through it (the counters are updated by this type's `Read` impl).
struct TimesRead<R> {
    /// Number of reads which returned at least one byte.
    times_read: usize,
    /// Number of reads which failed with `ErrorKind::WouldBlock`.
    times_would_block: usize,
    /// The wrapped reader that all reads are forwarded to.
    reader: R,
}

impl<R> TimesRead<R> {
    /// Wraps `reader`, starting both counters at zero.
    fn new(reader: R) -> Self {
        let (times_read, times_would_block) = (0, 0);

        Self {
            times_read,
            times_would_block,
            reader,
        }
    }
}

// Empty impl: no `AsyncRead` methods are overridden here, the wrapper merely
// opts in to the trait whenever the wrapped reader implements it.
impl<R> AsyncRead for TimesRead<R>
where
    R: AsyncRead,
{
}
impl<R: Read> Read for TimesRead<R> {
    fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
        match self.reader.read(buf) {
            ret@Ok(0) => ret,
            ret@Ok(_) => {
                self.times_read += 1;
                ret
            },
            Err(e) => {
                if e.kind() == ErrorKind::WouldBlock {
                    self.times_would_block += 1;
                }
                Err(e)
            },
        }
    }
}

/// Checks that a pipe reader registered with the event loop is consumed
/// incrementally rather than in one blocking read.
///
/// A background thread writes the message one byte at a time, pausing between
/// bytes, while the event loop drives `read_to_end` over a `TimesRead`
/// wrapper. The test passes when the full message arrives and the wrapper saw
/// more than one successful read and more than one `WouldBlock`.
#[test]
fn evented_is_async() {
    let msg = "hello world";

    let Pipe { reader, mut writer } = Pipe::new().expect("failed to create pipe");

    let mut lp = Core::new().expect("failed to create event loop");
    let reader = reader.into_evented2(&lp.handle())
        .expect("failed to register reader with event loop");

    // Anything other than a registered descriptor means the reader is not
    // being driven by the event loop, which defeats the purpose of this test.
    let reader = match reader {
        MaybeEventedFd::Registered(fd) => fd,
        other => panic!("unexpected result: {:#?}", other),
    };

    // `writer` is moved into the thread and dropped once the closure returns;
    // presumably that is what lets `read_to_end` below observe end-of-input.
    let join_handle = thread::spawn(move || {
        // Give the future a chance to block for the first time
        thread::sleep(Duration::from_millis(10));
        for &byte in msg.as_bytes() {
            // `write_all` (rather than `write`) so that a zero-length write
            // cannot silently drop a byte of the message.
            writer.write_all(&[byte]).expect("failed to write byte");
            // Give event loop a chance to settle and read data one byte at a time
            thread::sleep(Duration::from_millis(10));
        }
    });

    let (tr, data) = lp.run(read_to_end(TimesRead::new(reader), vec!()))
        .map(|(tr, data)| (tr, String::from_utf8(data).expect("invalid utf8")))
        .expect("future did not exit successfully");

    join_handle.join().expect("thread did not exit successfully");

    assert_eq!(data, msg);

    // NB: we used to assert the number of times read equals the number of bytes
    // in the message, but due to seeing some sporadic failures here in the CI,
    // it's probably good enough to ensure we didn't run in a single read.
    assert!(
        tr.times_read > 1,
        "expected more than one successful read, got {}",
        tr.times_read
    );
    assert!(
        tr.times_would_block > 1,
        "expected more than one WouldBlock, got {}",
        tr.times_would_block
    );
}

/// Round-trips a message through a regular file using `EventedAsyncIoEnv`:
/// the data is written via `write_all` and read back via `read_async`, and
/// the result must equal the original message.
#[test]
fn evented_supports_regular_files() {
    let msg = "hello\nworld\n";

    let tempdir = mktmp!();
    let path = tempdir.path().join("sample_file");

    let mut lp = Core::new().expect("failed to create event loop");
    let mut env = EventedAsyncIoEnv::new(lp.remote());

    // Write phase: the file is created and the write is started lazily, i.e.
    // from inside a future that is being run by the event loop.
    lp.run(futures::lazy(|| {
        let file = File::create(&path).expect("failed to create file");
        env.write_all(FileDesc::from(file), msg.to_owned().into_bytes())
    })).expect("failed to write data");

    // Read phase: this time the descriptor is opened and handed to the
    // environment before the event loop starts running anything.
    let file = File::open(path).expect("failed to open file");
    let read_future = read_to_end(env.read_async(FileDesc::from(file)), Vec::new());

    let (_, bytes) = lp.run(read_future)
        .expect("future did not exit successfully");
    let data = String::from_utf8(bytes).expect("invaild utf8");

    assert_eq!(data, msg);
}