#[derive(Debug, Clone, Abomonation, Hash, Ord, PartialOrd, Eq, PartialEq)]
pub enum Event<T, D> {
Progress(Vec<(T, i64)>),
Messages(T, Vec<D>),
}
pub trait EventIterator<T, D> {
fn next(&mut self) -> Option<&Event<T, D>>;
}
pub trait EventPusher<T, D> {
fn push(&mut self, event: Event<T, D>);
}
impl<T, D> EventPusher<T, D> for ::std::sync::mpsc::Sender<Event<T, D>> {
fn push(&mut self, event: Event<T, D>) {
let _ = self.send(event);
}
}
pub mod link {
use std::rc::Rc;
use std::cell::RefCell;
use super::{Event, EventPusher, EventIterator};
pub struct EventLink<T, D> {
pub event: Option<Event<T, D>>,
pub next: RefCell<Option<Rc<EventLink<T, D>>>>,
}
impl<T, D> EventLink<T, D> {
pub fn new() -> EventLink<T, D> {
EventLink { event: None, next: RefCell::new(None) }
}
}
impl<T, D> EventPusher<T, D> for Rc<EventLink<T, D>> {
fn push(&mut self, event: Event<T, D>) {
*self.next.borrow_mut() = Some(Rc::new(EventLink { event: Some(event), next: RefCell::new(None) }));
let next = self.next.borrow().as_ref().unwrap().clone();
*self = next;
}
}
impl<T, D> EventIterator<T, D> for Rc<EventLink<T, D>> {
fn next(&mut self) -> Option<&Event<T, D>> {
let is_some = self.next.borrow().is_some();
if is_some {
let next = self.next.borrow().as_ref().unwrap().clone();
*self = next;
self.event.as_ref()
}
else {
None
}
}
}
impl<T, D> Drop for EventLink<T, D> {
fn drop(&mut self) {
while let Some(link) = self.next.replace(None) {
if let Ok(head) = Rc::try_unwrap(link) {
*self = head;
}
}
}
}
#[test]
fn avoid_stack_overflow_in_drop() {
let mut event1 = Rc::new(EventLink::<(),()>::new());
let _event2 = event1.clone();
for _ in 0 .. 1_000_000 {
event1.push(Event::Progress(vec![]));
}
}
}
pub mod binary {
use std::io::Write;
use abomonation::Abomonation;
use super::{Event, EventPusher, EventIterator};
pub struct EventWriter<T, D, W: ::std::io::Write> {
stream: W,
phant: ::std::marker::PhantomData<(T,D)>,
}
impl<T, D, W: ::std::io::Write> EventWriter<T, D, W> {
pub fn new(w: W) -> EventWriter<T, D, W> {
EventWriter {
stream: w,
phant: ::std::marker::PhantomData,
}
}
}
impl<T: Abomonation, D: Abomonation, W: ::std::io::Write> EventPusher<T, D> for EventWriter<T, D, W> {
fn push(&mut self, event: Event<T, D>) {
unsafe { ::abomonation::encode(&event, &mut self.stream).expect("Event abomonation/write failed"); }
}
}
pub struct EventReader<T, D, R: ::std::io::Read> {
reader: R,
bytes: Vec<u8>,
buff1: Vec<u8>,
buff2: Vec<u8>,
consumed: usize,
valid: usize,
phant: ::std::marker::PhantomData<(T,D)>,
}
impl<T, D, R: ::std::io::Read> EventReader<T, D, R> {
pub fn new(r: R) -> EventReader<T, D, R> {
EventReader {
reader: r,
bytes: vec![0u8; 1 << 20],
buff1: vec![],
buff2: vec![],
consumed: 0,
valid: 0,
phant: ::std::marker::PhantomData,
}
}
}
impl<T: Abomonation, D: Abomonation, R: ::std::io::Read> EventIterator<T, D> for EventReader<T, D, R> {
fn next(&mut self) -> Option<&Event<T, D>> {
if unsafe { ::abomonation::decode::<Event<T,D>>(&mut self.buff1[self.consumed..]) }.is_some() {
let (item, rest) = unsafe { ::abomonation::decode::<Event<T,D>>(&mut self.buff1[self.consumed..]) }.unwrap();
self.consumed = self.valid - rest.len();
return Some(item);
}
if self.consumed > 0 {
self.buff2.clear();
self.buff2.write_all(&self.buff1[self.consumed..]).unwrap();
::std::mem::swap(&mut self.buff1, &mut self.buff2);
self.valid = self.buff1.len();
self.consumed = 0;
}
if let Ok(len) = self.reader.read(&mut self.bytes[..]) {
self.buff1.write_all(&self.bytes[..len]).unwrap();
self.valid = self.buff1.len();
}
None
}
}
}