timely_communication/allocator/zero_copy/
bytes_exchange.rs1use std::sync::{Arc, Mutex};
4use std::collections::VecDeque;
5
6use bytes::arc::Bytes;
7use super::bytes_slab::BytesSlab;
8
9pub trait BytesPush {
11    fn extend<I: IntoIterator<Item=Bytes>>(&mut self, iter: I);
15}
16pub trait BytesPull {
18    fn drain_into(&mut self, vec: &mut Vec<Bytes>);
22}
23
24use std::sync::atomic::{AtomicBool, Ordering};
25#[derive(Clone)]
30pub struct MergeQueue {
31    queue: Arc<Mutex<VecDeque<Bytes>>>, buzzer: crate::buzzer::Buzzer,  panic: Arc<AtomicBool>,
34}
35
36impl MergeQueue {
37    pub fn new(buzzer: crate::buzzer::Buzzer) -> Self {
39        MergeQueue {
40            queue: Arc::new(Mutex::new(VecDeque::new())),
41            buzzer,
42            panic: Arc::new(AtomicBool::new(false)),
43        }
44    }
45    pub fn is_complete(&self) -> bool {
47        if self.panic.load(Ordering::SeqCst) { panic!("MergeQueue poisoned."); }
48        Arc::strong_count(&self.queue) == 1 && self.queue.lock().expect("Failed to acquire lock").is_empty()
49    }
50}
51
52impl BytesPush for MergeQueue {
53    fn extend<I: IntoIterator<Item=Bytes>>(&mut self, iterator: I) {
54
55        if self.panic.load(Ordering::SeqCst) { panic!("MergeQueue poisoned."); }
56
57        let mut lock_ok = self.queue.try_lock();
59        while let Result::Err(::std::sync::TryLockError::WouldBlock) = lock_ok {
60            lock_ok = self.queue.try_lock();
61        }
62        let mut queue = lock_ok.expect("MergeQueue mutex poisoned.");
63
64        let mut iterator = iterator.into_iter();
65        let mut should_ping = false;
66        if let Some(bytes) = iterator.next() {
67            let mut tail = if let Some(mut tail) = queue.pop_back() {
68                if let Err(bytes) = tail.try_merge(bytes) {
69                    queue.push_back(::std::mem::replace(&mut tail, bytes));
70                }
71                tail
72            }
73            else {
74                should_ping = true;
75                bytes
76            };
77
78            for bytes in iterator {
79                if let Err(bytes) = tail.try_merge(bytes) {
80                    queue.push_back(::std::mem::replace(&mut tail, bytes));
81                }
82            }
83            queue.push_back(tail);
84        }
85
86        ::std::mem::drop(queue);
88        if should_ping {
89            self.buzzer.buzz();  }
91    }
92}
93
94impl BytesPull for MergeQueue {
95    fn drain_into(&mut self, vec: &mut Vec<Bytes>) {
96        if self.panic.load(Ordering::SeqCst) { panic!("MergeQueue poisoned."); }
97
98        let mut lock_ok = self.queue.try_lock();
100        while let Result::Err(::std::sync::TryLockError::WouldBlock) = lock_ok {
101            lock_ok = self.queue.try_lock();
102        }
103        let mut queue = lock_ok.expect("MergeQueue mutex poisoned.");
104
105        vec.extend(queue.drain(..));
106    }
107}
108
109impl Drop for MergeQueue {
112    fn drop(&mut self) {
113        if ::std::thread::panicking() {
115            self.panic.store(true, Ordering::SeqCst);
116        }
117        else {
118            if self.panic.load(Ordering::SeqCst) { panic!("MergeQueue poisoned."); }
120        }
121        self.queue = Arc::new(Mutex::new(VecDeque::new()));
123        self.buzzer.buzz();
124    }
125}
126
127
128pub struct SendEndpoint<P: BytesPush> {
130    send: P,
131    buffer: BytesSlab,
132}
133
134impl<P: BytesPush> SendEndpoint<P> {
135
136    fn send_buffer(&mut self) {
138        let valid_len = self.buffer.valid().len();
139        if valid_len > 0 {
140            self.send.extend(Some(self.buffer.extract(valid_len)));
141        }
142    }
143
144    pub fn new(queue: P) -> Self {
146        SendEndpoint {
147            send: queue,
148            buffer: BytesSlab::new(20),
149        }
150    }
151    pub fn make_valid(&mut self, bytes: usize) {
155        self.buffer.make_valid(bytes);
156        self.send_buffer();
157    }
158    pub fn reserve(&mut self, capacity: usize) -> &mut [u8] {
160
161        if self.buffer.empty().len() < capacity {
162            self.send_buffer();
163            self.buffer.ensure_capacity(capacity);
164        }
165
166        assert!(self.buffer.empty().len() >= capacity);
167        self.buffer.empty()
168    }
169    pub fn publish(&mut self) {
171        self.send_buffer();
172    }
173}
174
175impl<P: BytesPush> Drop for SendEndpoint<P> {
176    fn drop(&mut self) {
177        self.send_buffer();
178    }
179}
180