timely_communication/allocator/zero_copy/
push_pull.rs1use std::rc::Rc;
4use std::cell::RefCell;
5use std::collections::VecDeque;
6
7use bytes::arc::Bytes;
8
9use crate::allocator::canary::Canary;
10use crate::networking::MessageHeader;
11
12use crate::{Data, Push, Pull};
13use crate::allocator::Message;
14
15use super::bytes_exchange::{BytesPush, SendEndpoint};
16
17pub struct Pusher<T, P: BytesPush> {
22 header: MessageHeader,
23 sender: Rc<RefCell<SendEndpoint<P>>>,
24 phantom: ::std::marker::PhantomData<T>,
25}
26
27impl<T, P: BytesPush> Pusher<T, P> {
28 pub fn new(header: MessageHeader, sender: Rc<RefCell<SendEndpoint<P>>>) -> Pusher<T, P> {
30 Pusher {
31 header,
32 sender,
33 phantom: ::std::marker::PhantomData,
34 }
35 }
36}
37
38impl<T:Data, P: BytesPush> Push<Message<T>> for Pusher<T, P> {
39 #[inline]
40 fn push(&mut self, element: &mut Option<Message<T>>) {
41 if let Some(ref mut element) = *element {
42
43 let mut header = self.header;
45 self.header.seqno += 1;
46 header.length = element.length_in_bytes();
47 assert!(header.length > 0);
48
49 let mut borrow = self.sender.borrow_mut();
51 {
52 let mut bytes = borrow.reserve(header.required_bytes());
53 assert!(bytes.len() >= header.required_bytes());
54 let writer = &mut bytes;
55 header.write_to(writer).expect("failed to write header!");
56 element.into_bytes(writer);
57 }
58 borrow.make_valid(header.required_bytes());
59 }
60 }
61}
62
63pub struct Puller<T> {
70 _canary: Canary,
71 current: Option<Message<T>>,
72 receiver: Rc<RefCell<VecDeque<Bytes>>>, }
74
75impl<T:Data> Puller<T> {
76 pub fn new(receiver: Rc<RefCell<VecDeque<Bytes>>>, _canary: Canary) -> Puller<T> {
78 Puller {
79 _canary,
80 current: None,
81 receiver,
82 }
83 }
84}
85
86impl<T:Data> Pull<Message<T>> for Puller<T> {
87 #[inline]
88 fn pull(&mut self) -> &mut Option<Message<T>> {
89 self.current =
90 self.receiver
91 .borrow_mut()
92 .pop_front()
93 .map(|bytes| unsafe { Message::from_bytes(bytes) });
94
95 &mut self.current
96 }
97}
98
99pub struct PullerInner<T> {
106 inner: Box<dyn Pull<Message<T>>>, _canary: Canary,
108 current: Option<Message<T>>,
109 receiver: Rc<RefCell<VecDeque<Bytes>>>, }
111
112impl<T:Data> PullerInner<T> {
113 pub fn new(inner: Box<dyn Pull<Message<T>>>, receiver: Rc<RefCell<VecDeque<Bytes>>>, _canary: Canary) -> Self {
115 PullerInner {
116 inner,
117 _canary,
118 current: None,
119 receiver,
120 }
121 }
122}
123
124impl<T:Data> Pull<Message<T>> for PullerInner<T> {
125 #[inline]
126 fn pull(&mut self) -> &mut Option<Message<T>> {
127
128 let inner = self.inner.pull();
129 if inner.is_some() {
130 inner
131 }
132 else {
133 self.current =
134 self.receiver
135 .borrow_mut()
136 .pop_front()
137 .map(|bytes| unsafe { Message::from_bytes(bytes) });
138
139 &mut self.current
140 }
141 }
142}