1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
use std::{collections::VecDeque, fmt, marker::PhantomData};
use async_zmq_types::{IntoSocket, Multipart};
use futures::{try_ready, Async, AsyncSink, Sink};
use crate::{async_types::SendState, error::Error, socket::Socket};
/// A buffering sink that queues [`Multipart`] messages and sends them over a
/// wrapped [`Socket`].
///
/// The type parameter `T` is the socket wrapper type that the sink can be
/// converted back into (see the `IntoSocket` implementation); it is only
/// tracked at the type level through `PhantomData`.
pub struct MultipartSink<T: From<Socket>> {
    /// Send state that is polled against the socket; a multipart that is
    /// being sent is stored here as `SendState::Pending`.
    state: SendState,
    /// The socket that multiparts are flushed to.
    sock: Socket,
    /// Multiparts accepted by `start_send` that have not yet been moved
    /// into `state`.
    multiparts: VecDeque<Multipart>,
    /// Queue length at which `start_send` starts refusing new multiparts.
    buffer_size: usize,
    /// Marker tying the sink to the wrapper type `T` without storing one.
    phantom: PhantomData<T>,
}
impl<T: From<Socket>> MultipartSink<T> {
    /// Creates a sink that sends multiparts over `sock`.
    ///
    /// The sink starts with an empty queue and a `SendState::Ready` send
    /// state. `buffer_size` is the queue length at which `start_send`
    /// begins to report `AsyncSink::NotReady`.
    pub fn new(sock: Socket, buffer_size: usize) -> Self {
        Self {
            sock,
            buffer_size,
            state: SendState::Ready,
            multiparts: VecDeque::new(),
            phantom: PhantomData,
        }
    }
}
impl<T: From<Socket>> IntoSocket<T, Socket> for MultipartSink<T> {
    /// Consumes the sink and converts its inner socket into the wrapper
    /// type `T`.
    ///
    /// Only the socket is carried over; the send state and any multiparts
    /// still held in the queue are dropped together with the sink.
    fn into_socket(self) -> T {
        // `T: From<Socket>` gives `Socket: Into<T>` through the blanket impl.
        self.sock.into()
    }
}
/// Buffered `Sink` implementation: multiparts accepted by `start_send` wait
/// in `multiparts` and are handed to `state` one at a time by
/// `poll_complete`, which polls `state` against the socket.
impl<T> Sink for MultipartSink<T>
where
    T: From<Socket>,
{
    type SinkItem = Multipart;
    type SinkError = Error;

    /// Attempts to queue `multipart` for sending.
    ///
    /// Outstanding work is driven first via `poll_complete`. The multipart is
    /// then appended to the queue and `AsyncSink::Ready` is returned, unless
    /// the queue is already at capacity, in which case the multipart is handed
    /// back inside `AsyncSink::NotReady`.
    ///
    /// A `buffer_size` of zero is treated as a capacity of one so that the
    /// sink can always make progress.
    ///
    /// # Errors
    ///
    /// Returns any error produced by `poll_complete`.
    fn start_send(
        &mut self,
        multipart: Self::SinkItem,
    ) -> Result<AsyncSink<Self::SinkItem>, Self::SinkError> {
        // Drain as much as possible before checking for free space. Only the
        // error matters here; readiness is judged by the queue length below.
        self.poll_complete()?;

        // When `poll_complete` reports `Ready` the queue has been fully
        // drained, so with a literal capacity of zero the check below would
        // reject every multipart (`0 >= 0`) and nothing visible here would
        // schedule a wake-up for the caller. Clamping to one avoids that
        // stall and changes nothing for `buffer_size >= 1`.
        let capacity = self.buffer_size.max(1);
        if self.multiparts.len() >= capacity {
            return Ok(AsyncSink::NotReady(multipart));
        }

        self.multiparts.push_back(multipart);
        Ok(AsyncSink::Ready)
    }

    /// Drives the in-flight multipart and then every queued multipart
    /// through `state.poll_flush`.
    ///
    /// Returns `Async::Ready(())` once the queue is empty and the last flush
    /// reported ready. `try_ready!` returns early with `Async::NotReady` as
    /// soon as a flush is not ready, leaving the remaining multiparts queued.
    ///
    /// # Errors
    ///
    /// Returns the first error reported by `state.poll_flush`.
    fn poll_complete(&mut self) -> Result<Async<()>, Self::SinkError> {
        // Finish whatever is already held in `state` before replacing it.
        try_ready!(self.state.poll_flush(&self.sock));

        while let Some(multipart) = self.multiparts.pop_front() {
            // The popped multipart now lives in `state`; on an early return
            // it is presumably retried by the flush at the top of the next
            // call (depends on `SendState::poll_flush` - confirm).
            self.state = SendState::Pending(multipart);
            try_ready!(self.state.poll_flush(&self.sock));
        }

        Ok(Async::Ready(()))
    }
}
impl<T: From<Socket>> fmt::Debug for MultipartSink<T> {
    /// Writes `MultipartSink(..)` wrapping the socket's `Debug` output.
    ///
    /// Only the socket is shown; the send state, queue and buffer size are
    /// omitted.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_fmt(format_args!("MultipartSink({:?})", self.sock))
    }
}
impl<T: From<Socket>> fmt::Display for MultipartSink<T> {
    /// Writes `MultipartSink(..)` wrapping the socket's `Display` output.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_fmt(format_args!("MultipartSink({})", self.sock))
    }
}