timely_communication/allocator/zero_copy/
push_pull.rs

//! Push and Pull implementations wrapping serialized data.
2
3use std::rc::Rc;
4use std::cell::RefCell;
5use std::collections::VecDeque;
6
7use bytes::arc::Bytes;
8
9use crate::allocator::canary::Canary;
10use crate::networking::MessageHeader;
11
12use crate::{Data, Push, Pull};
13use crate::allocator::Message;
14
15use super::bytes_exchange::{BytesPush, SendEndpoint};
16
17/// An adapter into which one may push elements of type `T`.
18///
19/// This pusher has a fixed MessageHeader, and access to a SharedByteBuffer which it uses to
20/// acquire buffers for serialization.
21pub struct Pusher<T, P: BytesPush> {
22    header:     MessageHeader,
23    sender:     Rc<RefCell<SendEndpoint<P>>>,
24    phantom:    ::std::marker::PhantomData<T>,
25}
26
27impl<T, P: BytesPush> Pusher<T, P> {
28    /// Creates a new `Pusher` from a header and shared byte buffer.
29    pub fn new(header: MessageHeader, sender: Rc<RefCell<SendEndpoint<P>>>) -> Pusher<T, P> {
30        Pusher {
31            header,
32            sender,
33            phantom:    ::std::marker::PhantomData,
34        }
35    }
36}
37
38impl<T:Data, P: BytesPush> Push<Message<T>> for Pusher<T, P> {
39    #[inline]
40    fn push(&mut self, element: &mut Option<Message<T>>) {
41        if let Some(ref mut element) = *element {
42
43            // determine byte lengths and build header.
44            let mut header = self.header;
45            self.header.seqno += 1;
46            header.length = element.length_in_bytes();
47            assert!(header.length > 0);
48
49            // acquire byte buffer and write header, element.
50            let mut borrow = self.sender.borrow_mut();
51            {
52                let mut bytes = borrow.reserve(header.required_bytes());
53                assert!(bytes.len() >= header.required_bytes());
54                let writer = &mut bytes;
55                header.write_to(writer).expect("failed to write header!");
56                element.into_bytes(writer);
57            }
58            borrow.make_valid(header.required_bytes());
59        }
60    }
61}
62
63/// An adapter from which one can pull elements of type `T`.
64///
65/// This type is very simple, and just consumes owned `Vec<u8>` allocations. It is
66/// not the most efficient thing possible, which would probably instead be something
67/// like the `bytes` crate (../bytes/) which provides an exclusive view of a shared
68/// allocation.
69pub struct Puller<T> {
70    _canary: Canary,
71    current: Option<Message<T>>,
72    receiver: Rc<RefCell<VecDeque<Bytes>>>,    // source of serialized buffers
73}
74
75impl<T:Data> Puller<T> {
76    /// Creates a new `Puller` instance from a shared queue.
77    pub fn new(receiver: Rc<RefCell<VecDeque<Bytes>>>, _canary: Canary) -> Puller<T> {
78        Puller {
79            _canary,
80            current: None,
81            receiver,
82        }
83    }
84}
85
86impl<T:Data> Pull<Message<T>> for Puller<T> {
87    #[inline]
88    fn pull(&mut self) -> &mut Option<Message<T>> {
89        self.current =
90        self.receiver
91            .borrow_mut()
92            .pop_front()
93            .map(|bytes| unsafe { Message::from_bytes(bytes) });
94
95        &mut self.current
96    }
97}
98
99/// An adapter from which one can pull elements of type `T`.
100///
101/// This type is very simple, and just consumes owned `Vec<u8>` allocations. It is
102/// not the most efficient thing possible, which would probably instead be something
103/// like the `bytes` crate (../bytes/) which provides an exclusive view of a shared
104/// allocation.
105pub struct PullerInner<T> {
106    inner: Box<dyn Pull<Message<T>>>,               // inner pullable (e.g. intra-process typed queue)
107    _canary: Canary,
108    current: Option<Message<T>>,
109    receiver: Rc<RefCell<VecDeque<Bytes>>>,     // source of serialized buffers
110}
111
112impl<T:Data> PullerInner<T> {
113    /// Creates a new `PullerInner` instance from a shared queue.
114    pub fn new(inner: Box<dyn Pull<Message<T>>>, receiver: Rc<RefCell<VecDeque<Bytes>>>, _canary: Canary) -> Self {
115        PullerInner {
116            inner,
117            _canary,
118            current: None,
119            receiver,
120        }
121    }
122}
123
124impl<T:Data> Pull<Message<T>> for PullerInner<T> {
125    #[inline]
126    fn pull(&mut self) -> &mut Option<Message<T>> {
127
128        let inner = self.inner.pull();
129        if inner.is_some() {
130            inner
131        }
132        else {
133            self.current =
134            self.receiver
135                .borrow_mut()
136                .pop_front()
137                .map(|bytes| unsafe { Message::from_bytes(bytes) });
138
139            &mut self.current
140        }
141    }
142}