use data::*;
use control::*;
use super::{thread_namer, SendData};
use std::thread::{Builder, JoinHandle};
use futures::{Future, Sink};
use futures::sync::mpsc::*;
use errors::ChaseError;
impl Chaser {
    /// Runs this chaser on a dedicated thread and exposes what it reads as a
    /// channel receiver.
    ///
    /// A channel is created with a buffer argument of `0`. The spawned thread,
    /// named by `thread_namer` from `self.path`, calls `self.run` with a
    /// callback that forwards every `(line, num, pos)` it receives into the
    /// channel (the line is converted to an owned `String`), blocks until that
    /// send has completed, and then answers `Control::Continue`.
    ///
    /// # Returns
    ///
    /// The receiving half of the channel together with the `JoinHandle` of the
    /// spawned thread. Joining the handle yields `Ok(())` once `run` returns
    /// successfully, or the error produced by `run` or by a failed send.
    ///
    /// # Errors
    ///
    /// Returns an error if the thread could not be spawned.
    pub fn run_stream(
        mut self,
    ) -> Result<(Receiver<SendData>, JoinHandle<Result<(), ChaseError>>), ChaseError> {
        let (mut sender, receiver) = channel(0);
        let thread_name = thread_namer(&self.path);

        let handle = Builder::new().name(thread_name).spawn(move || {
            self.run(|line, num, pos| {
                // `send` consumes the sender it is called on, so the send goes
                // through a clone and the sender handed back on completion
                // replaces the one we keep for the next line.
                sender = sender.clone().send((line.to_string(), num, pos)).wait()?;
                Ok(Control::Continue)
            })?;
            Ok(())
        })?;

        Ok((receiver, handle))
    }
}
#[cfg(test)]
mod tests {
    use super::super::super::data::*;
    use tempdir::*;
    use std::io::Write;
    use futures::{Future, Stream};
    use futures::future;
    use std::thread::sleep;
    use std::time::Duration;
    use std::fs::{rename, OpenOptions};

    /// Drives `run_stream` against a log file that is renamed and recreated
    /// part-way through, and expects all four written lines to arrive in order.
    #[test]
    fn run_stream_test() {
        let temp_dir = TempDir::new("chase-test-stream").unwrap();
        let log_path = temp_dir.path().join("test.log");
        let rotated_path = temp_dir.path().join("test.log.bk");

        // Opens (creating if necessary) the log file at `log_path` in append mode.
        let open_for_append = || {
            OpenOptions::new()
                .write(true)
                .append(true)
                .create(true)
                .open(&log_path)
                .unwrap()
        };

        let chaser = Chaser::new(&log_path);
        let mut file_write = open_for_append();

        // Only the receiving side is kept; the join handle is discarded.
        let (stream, _) = chaser.run_stream().unwrap();

        file_write.write_all(b"Hello, world 1\n").unwrap();
        file_write.write_all(b"Hello, world 2\n").unwrap();
        file_write.write_all(b"Hello, world 3\n").unwrap();

        // Concatenate the text of the first four items the stream yields.
        let accumulated = stream.take(4).fold(String::new(), |mut joined, (line, _, _)| {
            joined.push_str(line.as_str());
            future::ok(joined)
        });

        // Presumably gives the chaser thread time to pick up the first three
        // lines before the file is moved away.
        sleep(Duration::from_millis(250));

        // Move the original file aside, then create a fresh one at the same path.
        rename(&log_path, rotated_path).unwrap();
        let mut file_write_new = open_for_append();
        file_write_new.write_all(b"Hello, world 4\n").unwrap();

        // The expected text has no newline characters between the lines.
        let expected = "Hello, world 1Hello, world 2Hello, world 3Hello, world 4".to_string();
        assert_eq!(accumulated.wait(), Ok(expected));

        drop(file_write);
        temp_dir.close().unwrap();
    }
}