1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
use std::io;
use std::sync::Arc;
use futures::{Stream, Future, Async};
use tk_bufstream::IoBuf;
use tokio_io::{AsyncRead, AsyncWrite};
use tokio_core::reactor::{Handle, Timeout};
use channel::Receiver;
use {Init, Config};
pub struct Proto<T> {
io: IoBuf<T>,
channel: Receiver,
config: Arc<Config>,
timeo: Timeout,
handle: Handle,
}
impl Init {
pub fn from_connection<T>(self, conn: T, handle: &Handle)
-> Proto<T>
{
Proto {
io: IoBuf::new(conn),
channel: self.chan,
timeo: Timeout::new(self.config.write_timeout, &handle)
.expect("can always set a timeout"),
handle: handle.clone(),
config: self.config,
}
}
}
impl<T: AsyncRead+AsyncWrite> Future for Proto<T> {
type Item = ();
type Error = ();
fn poll(&mut self) -> Result<Async<()>, ()> {
self.io.read().map_err(|_| ())?;
if self.io.in_buf.len() > 0 {
return Err(());
}
if self.io.done() {
return Ok(Async::Ready(()));
}
if self.io.out_buf.len() >= self.config.watermarks.0 {
self.flush_output().map_err(|_| ())?;
if self.io.out_buf.len() >= self.config.watermarks.0 {
return Ok(Async::NotReady);
}
}
while let Async::Ready(Some(metric)) = self.channel.poll()? {
self.io.out_buf.extend(&metric.0);
if self.io.out_buf.len() >= self.config.watermarks.0 {
break;
}
}
self.flush_output().map_err(|_| ())?;
if self.channel.is_done() && self.io.out_buf.len() == 0 {
return Ok(Async::Ready(()));
}
return Ok(Async::NotReady);
}
}
impl<T: AsyncWrite> Proto<T> {
fn flush_output(&mut self) -> io::Result<()> {
let old_out = self.io.out_buf.len();
if old_out > 0 {
self.io.flush()?;
let new_out = self.io.out_buf.len();
if new_out != old_out {
if new_out != 0 {
self.timeo = Timeout::new(
self.config.write_timeout, &self.handle)?;
self.timeo.poll()?;
}
} else {
let poll_result = self.timeo.poll()?;
if poll_result.is_ready() {
return Err(io::ErrorKind::TimedOut.into());
}
}
}
Ok(())
}
}