use std::rc::Rc;
use std::cell::RefCell;
use std::collections::VecDeque;
use timely_bytes::arc::Bytes;
use crate::allocator::canary::Canary;
use crate::networking::MessageHeader;
use crate::{Bytesable, Push, Pull};
use super::bytes_exchange::{BytesPush, SendEndpoint};
/// Push endpoint that serializes typed elements into a shared [`SendEndpoint`].
///
/// Every element pushed through this type is written as a header followed by
/// the element's own bytes.
pub struct Pusher<T, P>
where
    P: BytesPush,
{
    /// Header template copied for each outgoing element; its `seqno` is
    /// incremented after every copy is taken.
    header: MessageHeader,
    /// Shared byte endpoint that headers and element bytes are written into.
    sender: Rc<RefCell<SendEndpoint<P>>>,
    /// Ties the pusher to the element type `T` without storing a value of it.
    phantom: ::std::marker::PhantomData<T>,
}
impl<T, P> Pusher<T, P>
where
    P: BytesPush,
{
    /// Creates a pusher that stamps outgoing elements with copies of `header`
    /// and writes them into `sender`.
    pub fn new(header: MessageHeader, sender: Rc<RefCell<SendEndpoint<P>>>) -> Self {
        let phantom = ::std::marker::PhantomData;
        Self {
            header,
            sender,
            phantom,
        }
    }
}
impl<T, P> Push<T> for Pusher<T, P>
where
    T: Bytesable,
    P: BytesPush,
{
    /// Serializes the element, if one is present, into the shared send endpoint.
    ///
    /// A `None` is ignored. A `Some` is only borrowed: the element is left in
    /// place for the caller.
    ///
    /// # Panics
    ///
    /// Panics if the element reports a length of zero bytes, if the reserved
    /// buffer is smaller than the header requires, if writing the header
    /// fails, or if the shared endpoint is already borrowed.
    #[inline]
    fn push(&mut self, element: &mut Option<T>) {
        // Nothing to send: leave the endpoint and the sequence number untouched.
        let payload = match element.as_ref() {
            Some(payload) => payload,
            None => return,
        };

        // Take this message's header from the template, then advance the
        // template's sequence number for the next message.
        let mut header = self.header;
        self.header.seqno += 1;
        header.length = payload.length_in_bytes();
        assert!(header.length > 0);

        let mut endpoint = self.sender.borrow_mut();
        {
            // The reserved buffer must be released before the endpoint is
            // told how many bytes became valid.
            let mut bytes = endpoint.reserve(header.required_bytes());
            assert!(bytes.len() >= header.required_bytes());
            header.write_to(&mut bytes).expect("failed to write header!");
            payload.into_bytes(&mut bytes);
        }
        endpoint.make_valid(header.required_bytes());
    }
}
/// Pull endpoint that produces typed elements from a shared queue of byte
/// buffers.
pub struct Puller<T> {
    /// Held for as long as the puller lives and never read here.
    /// NOTE(review): presumably reports the puller's drop; confirm against `Canary`.
    _canary: Canary,
    /// Slot holding the result of the most recent pull.
    current: Option<T>,
    /// Shared queue of received byte buffers, consumed from the front.
    receiver: Rc<RefCell<VecDeque<Bytes>>>,
}
impl<T> Puller<T>
where
    T: Bytesable,
{
    /// Creates a puller that reads byte buffers from `receiver` and keeps
    /// `_canary` alive for as long as the puller exists.
    ///
    /// The puller starts with no current element.
    pub fn new(receiver: Rc<RefCell<VecDeque<Bytes>>>, _canary: Canary) -> Self {
        let current = None;
        Self {
            _canary,
            current,
            receiver,
        }
    }
}
impl<T> Pull<T> for Puller<T>
where
    T: Bytesable,
{
    /// Replaces the current element with the next one from the queue and
    /// returns a reference to that slot.
    ///
    /// The front byte buffer, if any, is converted with `T::from_bytes`; when
    /// the queue is empty the slot becomes `None`.
    ///
    /// # Panics
    ///
    /// Panics if the shared queue is already borrowed.
    #[inline]
    fn pull(&mut self) -> &mut Option<T> {
        {
            // The queue stays borrowed across the conversion and the
            // assignment, and is released before the slot is handed out.
            let mut queue = self.receiver.borrow_mut();
            self.current = queue.pop_front().map(T::from_bytes);
        }
        &mut self.current
    }
}
/// Pull endpoint that consults a boxed inner puller first and falls back to a
/// shared queue of byte buffers when the inner puller yields nothing.
pub struct PullerInner<T> {
    /// Puller that is asked first on every pull.
    inner: Box<dyn Pull<T>>,
    /// Held for as long as the puller lives and never read here.
    /// NOTE(review): presumably reports the puller's drop; confirm against `Canary`.
    _canary: Canary,
    /// Slot holding the most recent element taken from `receiver`.
    current: Option<T>,
    /// Shared queue of received byte buffers, consumed from the front.
    receiver: Rc<RefCell<VecDeque<Bytes>>>,
}
impl<T> PullerInner<T>
where
    T: Bytesable,
{
    /// Creates a puller that prefers `inner`, falls back to byte buffers from
    /// `receiver`, and keeps `_canary` alive for as long as the puller exists.
    ///
    /// The puller starts with no current element.
    pub fn new(
        inner: Box<dyn Pull<T>>,
        receiver: Rc<RefCell<VecDeque<Bytes>>>,
        _canary: Canary,
    ) -> Self {
        let current = None;
        Self {
            inner,
            _canary,
            current,
            receiver,
        }
    }
}
impl<T> Pull<T> for PullerInner<T>
where
    T: Bytesable,
{
    /// Returns the next element, preferring the inner puller over the byte
    /// queue.
    ///
    /// The inner puller is asked exactly once. If it produced an element, that
    /// slot is returned and the byte queue is not touched. Otherwise the front
    /// byte buffer, if any, is converted with `T::from_bytes` and stored in
    /// this puller's own slot, which becomes `None` when the queue is empty.
    ///
    /// # Panics
    ///
    /// Panics if the fallback path is taken while the shared queue is already
    /// borrowed.
    #[inline]
    fn pull(&mut self) -> &mut Option<T> {
        let preferred = self.inner.pull();
        if preferred.is_none() {
            {
                // The queue stays borrowed across the conversion and the
                // assignment, and is released before the slot is handed out.
                let mut queue = self.receiver.borrow_mut();
                self.current = queue.pop_front().map(T::from_bytes);
            }
            return &mut self.current;
        }
        preferred
    }
}