use columnar::Columnar;
use serde::{Deserialize, Serialize};
/// An event in a captured stream: either a batch of progress updates or a
/// timestamped container of data.
#[derive(Debug, Clone, Hash, Ord, PartialOrd, Eq, PartialEq, Deserialize, Serialize, Columnar)]
pub enum Event<T, C> {
/// A list of `(T, i64)` pairs.
// NOTE(review): presumably (timestamp, change in outstanding count) — confirm against the producer.
Progress(Vec<(T, i64)>),
/// A value of type `T` paired with a payload of type `C`.
// NOTE(review): presumably the timestamp and the data container sent at that time — confirm.
Messages(T, C),
}
/// A source of `Event<T, C>` values that is polled one event at a time.
pub trait EventIterator<T: Clone, C: Clone> {
/// Returns the next event, or `None` when no event is produced by this call.
///
/// The event is wrapped in a `Cow`, so an implementation may either hand out
/// an owned event or lend one that it continues to hold.
fn next(&mut self) -> Option<std::borrow::Cow<'_, Event<T, C>>>;
}
/// A sink that accepts `Event<T, C>` values.
pub trait EventPusher<T, C> {
/// Hands `event` to the sink, transferring ownership of it.
fn push(&mut self, event: Event<T, C>);
}
/// Forwards events over a standard-library channel.
impl<T, C> EventPusher<T, C> for ::std::sync::mpsc::Sender<Event<T, C>> {
    /// Sends `event` on the channel.
    ///
    /// A failed send (the receiving end has been dropped) is deliberately
    /// ignored; the undelivered event is simply discarded.
    fn push(&mut self, event: Event<T, C>) {
        drop(self.send(event));
    }
}
/// A linked-list event pusher and iterator built on `Rc`, for use within one thread.
pub mod link {
    use std::borrow::Cow;
    use std::rc::Rc;
    use std::cell::RefCell;

    use super::{Event, EventPusher, EventIterator};

    /// One node of a singly linked list of events.
    ///
    /// A writer appends nodes behind the node it currently points at, and a
    /// reader follows the `next` pointers to discover them.
    pub struct EventLink<T, C> {
        /// The event stored in this node, if any. The node produced by `new`
        /// carries no event.
        pub event: Option<Event<T, C>>,
        /// The node that follows this one, once it has been appended.
        pub next: RefCell<Option<Rc<EventLink<T, C>>>>,
    }

    impl<T, C> EventLink<T, C> {
        /// Creates a node that holds no event and has no successor.
        pub fn new() -> EventLink<T, C> {
            Self::with_event(None)
        }

        /// Builds a node without a successor around an optional event.
        fn with_event(event: Option<Event<T, C>>) -> Self {
            EventLink { event, next: RefCell::new(None) }
        }
    }

    impl<T, C> EventPusher<T, C> for Rc<EventLink<T, C>> {
        /// Appends a node holding `event` behind the current node, then
        /// advances this handle to the freshly appended node.
        fn push(&mut self, event: Event<T, C>) {
            let appended = Rc::new(EventLink::with_event(Some(event)));
            *self.next.borrow_mut() = Some(Rc::clone(&appended));
            *self = appended;
        }
    }

    impl<T: Clone, C: Clone> EventIterator<T, C> for Rc<EventLink<T, C>> {
        /// Advances to the following node, if there is one, and yields its event.
        ///
        /// When this handle turns out to be the only reference to that node
        /// the event is moved out and returned owned; otherwise it is lent.
        fn next(&mut self) -> Option<Cow<'_, Event<T, C>>> {
            let successor = self.next.borrow().as_ref().map(Rc::clone)?;
            *self = successor;
            match Rc::get_mut(self) {
                Some(exclusive) => exclusive.event.take().map(Cow::Owned),
                None => self.event.as_ref().map(Cow::Borrowed),
            }
        }
    }

    // Hand-written so that a long chain is torn down in a loop rather than
    // through one nested drop call per node.
    impl<T, C> Drop for EventLink<T, C> {
        fn drop(&mut self) {
            while let Some(successor) = self.next.take() {
                match Rc::try_unwrap(successor) {
                    // We were the sole owner: absorb the successor into `self`
                    // and keep walking from there.
                    Ok(owned) => *self = owned,
                    // Another handle keeps the rest of the chain alive.
                    Err(_still_shared) => break,
                }
            }
        }
    }

    impl<T, C> Default for EventLink<T, C> {
        fn default() -> Self {
            Self::new()
        }
    }

    #[test]
    fn avoid_stack_overflow_in_drop() {
        let limit = if cfg!(miri) { 1_000 } else { 1_000_000 };
        let mut writer = Rc::new(EventLink::<(), ()>::new());
        // Keep the head alive so the whole chain is dropped in one go at the end.
        let _head = Rc::clone(&writer);
        for _ in 0..limit {
            writer.push(Event::Progress(vec![]));
        }
    }
}
/// A linked-list event pusher and iterator built on `Arc` and `Mutex`, so
/// that handles can be held by different threads.
pub mod link_sync {
    use std::borrow::Cow;
    use std::sync::{Arc, Mutex, PoisonError};

    use super::{Event, EventPusher, EventIterator};

    /// One node of a singly linked list of events.
    ///
    /// A writer appends nodes behind the node it currently points at, and a
    /// reader follows the `next` pointers to discover them.
    pub struct EventLink<T, C> {
        /// The event stored in this node, if any. The node produced by `new`
        /// carries no event.
        pub event: Option<Event<T, C>>,
        /// The node that follows this one, once it has been appended.
        pub next: Mutex<Option<Arc<EventLink<T, C>>>>,
    }

    impl<T, C> EventLink<T, C> {
        /// Creates a node that holds no event and has no successor.
        pub fn new() -> EventLink<T, C> {
            Self::with_event(None)
        }

        /// Builds a node without a successor around an optional event.
        fn with_event(event: Option<Event<T, C>>) -> Self {
            EventLink { event, next: Mutex::new(None) }
        }
    }

    impl<T, C> EventPusher<T, C> for Arc<EventLink<T, C>> {
        /// Appends a node holding `event` behind the current node, then
        /// advances this handle to the freshly appended node.
        ///
        /// # Panics
        ///
        /// Panics if the `next` mutex of the current node is poisoned.
        fn push(&mut self, event: Event<T, C>) {
            let appended = Arc::new(EventLink::with_event(Some(event)));
            // The guard is a temporary, so the lock is released before the
            // handle itself is advanced.
            *self.next.lock().unwrap() = Some(Arc::clone(&appended));
            *self = appended;
        }
    }

    impl<T: Clone, C: Clone> EventIterator<T, C> for Arc<EventLink<T, C>> {
        /// Advances to the following node, if there is one, and yields its event.
        ///
        /// When this handle turns out to be the only reference to that node
        /// the event is moved out and returned owned; otherwise it is lent.
        ///
        /// # Panics
        ///
        /// Panics if the `next` mutex of the current node is poisoned.
        fn next(&mut self) -> Option<Cow<'_, Event<T, C>>> {
            // A single lock acquisition both tests for and clones the successor.
            let successor = self.next.lock().unwrap().as_ref().map(Arc::clone)?;
            *self = successor;
            match Arc::get_mut(self) {
                Some(exclusive) => exclusive.event.take().map(Cow::Owned),
                None => self.event.as_ref().map(Cow::Borrowed),
            }
        }
    }

    // Hand-written so that a long chain is torn down in a loop rather than
    // through one nested drop call per node.
    impl<T, C> Drop for EventLink<T, C> {
        fn drop(&mut self) {
            // A poisoned mutex must not make `drop` panic: that could abort the
            // process during unwinding and would skip the iterative teardown.
            // The stored `Option<Arc<_>>` is still a valid value when the lock
            // is poisoned, so it is recovered with `into_inner`.
            while let Some(successor) = self
                .next
                .get_mut()
                .unwrap_or_else(PoisonError::into_inner)
                .take()
            {
                match Arc::try_unwrap(successor) {
                    // We were the sole owner: absorb the successor into `self`
                    // and keep walking from there.
                    Ok(owned) => *self = owned,
                    // Another handle keeps the rest of the chain alive.
                    Err(_still_shared) => break,
                }
            }
        }
    }

    impl<T, C> Default for EventLink<T, C> {
        fn default() -> Self {
            Self::new()
        }
    }

    #[test]
    fn avoid_stack_overflow_in_drop() {
        #[cfg(miri)]
        let limit = 1_000;
        #[cfg(not(miri))]
        let limit = 1_000_000;
        let mut event1 = Arc::new(EventLink::<(),()>::new());
        // Keep the head alive so the whole chain is dropped in one go at the end.
        let _event2 = Arc::clone(&event1);
        for _ in 0 .. limit {
            event1.push(Event::Progress(vec![]));
        }
    }
}
/// A binary event pusher and iterator.
///
/// Each event is framed as an 8-byte little-endian length followed by that
/// many bytes of `bincode`-encoded event.
pub mod binary {
    use std::borrow::Cow;
    use std::convert::TryFrom;
    use std::io::ErrorKind;
    use std::ops::DerefMut;
    use std::sync::Arc;

    use serde::{de::DeserializeOwned, Serialize};
    use timely_communication::allocator::zero_copy::bytes_slab::{BytesRefill, BytesSlab};

    use super::{Event, EventPusher, EventIterator};

    /// Size in bytes of the little-endian `u64` length that precedes every encoded event.
    const LENGTH_PREFIX_BYTES: usize = std::mem::size_of::<u64>();

    /// A wrapper for `W: Write` implementing `EventPusher<T, C>`.
    pub struct EventWriter<T, C, W: ::std::io::Write> {
        stream: W,
        phant: ::std::marker::PhantomData<(T, C)>,
    }

    impl<T, C, W: ::std::io::Write> EventWriter<T, C, W> {
        /// Allocates a new `EventWriter` wrapping the supplied writer.
        pub fn new(w: W) -> Self {
            Self {
                stream: w,
                phant: ::std::marker::PhantomData,
            }
        }
    }

    impl<T: Serialize, C: Serialize, W: ::std::io::Write> EventPusher<T, C> for EventWriter<T, C, W> {
        /// Writes the encoded size of `event` as a little-endian `u64`,
        /// followed by the `bincode` encoding of the event itself.
        ///
        /// # Panics
        ///
        /// Panics if the size cannot be computed, if writing the length
        /// fails, or if serializing into the stream fails.
        fn push(&mut self, event: Event<T, C>) {
            let len = ::bincode::serialized_size(&event).expect("Event bincode failed");
            self.stream.write_all(&len.to_le_bytes()).expect("Event write failed");
            ::bincode::serialize_into(&mut self.stream, &event).expect("Event bincode failed");
        }
    }

    /// A wrapper for `R: Read` implementing `EventIterator<T, C>`.
    pub struct EventReader<T, C, R: ::std::io::Read> {
        reader: R,
        // Bytes read from `reader` that have not yet been decoded.
        buf: BytesSlab,
        phant: ::std::marker::PhantomData<(T, C)>,
    }

    impl<T, C, R: ::std::io::Read> EventReader<T, C, R> {
        /// Allocates a new `EventReader` wrapping the supplied reader.
        pub fn new(r: R) -> Self {
            // Buffers are plain zero-initialised heap vectors, with no limit configured.
            let refill = BytesRefill {
                logic: Arc::new(|size| {
                    Box::new(vec![0_u8; size]) as Box<dyn DerefMut<Target = [u8]>>
                }),
                limit: None,
            };
            Self {
                reader: r,
                // NOTE(review): `20` is presumably the log2 of the initial slab size (1 MiB) — confirm against `BytesSlab::new`.
                buf: BytesSlab::new(20, refill),
                phant: ::std::marker::PhantomData,
            }
        }
    }

    impl<T: DeserializeOwned + Clone, C: DeserializeOwned + Clone, R: ::std::io::Read> EventIterator<T, C> for EventReader<T, C, R> {
        /// Performs one read from the underlying reader and, if a complete
        /// length-prefixed event is then buffered, decodes and returns it.
        ///
        /// Returns `None` when the buffered bytes do not yet form a complete event.
        ///
        /// # Panics
        ///
        /// Panics if the read fails with anything other than `WouldBlock`, if
        /// the length prefix cannot be represented as a buffer size on this
        /// platform, or if the event bytes fail to decode.
        fn next(&mut self) -> Option<Cow<'_, Event<T, C>>> {
            // NOTE(review): a read is attempted on every call, even when a complete
            // event may already be buffered — confirm this is intended for blocking readers.
            self.buf.ensure_capacity(1);
            match self.reader.read(self.buf.empty()) {
                Ok(n) => self.buf.make_valid(n),
                // Nothing available right now; work with what is already buffered.
                Err(e) if e.kind() == ErrorKind::WouldBlock => {}
                Err(e) => panic!("read failed: {e}"),
            };

            let valid = self.buf.valid();
            if valid.len() < LENGTH_PREFIX_BYTES {
                return None;
            }

            let mut prefix = [0_u8; LENGTH_PREFIX_BYTES];
            prefix.copy_from_slice(&valid[..LENGTH_PREFIX_BYTES]);
            let event_len = u64::from_le_bytes(prefix);

            // Checked arithmetic: a corrupt prefix must not wrap around (release
            // builds) or be truncated (32-bit targets) into a bogus, too-small
            // frame size that would then be sliced and decoded.
            let required_bytes = usize::try_from(event_len)
                .ok()
                .and_then(|len| len.checked_add(LENGTH_PREFIX_BYTES))
                .expect("Event length overflows usize");

            if valid.len() < required_bytes {
                return None;
            }

            let bytes = self.buf.extract(required_bytes);
            let event = ::bincode::deserialize(&bytes[LENGTH_PREFIX_BYTES..]).expect("Event decode failed");
            Some(Cow::Owned(event))
        }
    }
}