1use std::io;
2use std::sync::Arc;
3
4use futures::{Stream, Future, Async};
5use tk_bufstream::IoBuf;
6use tokio_io::{AsyncRead, AsyncWrite};
7use tokio_core::reactor::{Handle, Timeout};
8
9use channel::Receiver;
10use {Init, Config};
11
12
13pub struct Proto<T> {
15 io: IoBuf<T>,
16 channel: Receiver,
17 config: Arc<Config>,
18 timeo: Timeout,
19 handle: Handle,
20}
21
22impl Init {
23 pub fn from_connection<T>(self, conn: T, handle: &Handle)
25 -> Proto<T>
26 {
27 Proto {
28 io: IoBuf::new(conn),
29 channel: self.chan,
30 timeo: Timeout::new(self.config.write_timeout, &handle)
31 .expect("can always set a timeout"),
32 handle: handle.clone(),
33 config: self.config,
34 }
35 }
36}
37
38impl<T: AsyncRead+AsyncWrite> Future for Proto<T> {
39 type Item = ();
40 type Error = ();
41 fn poll(&mut self) -> Result<Async<()>, ()> {
42 self.io.read().map_err(|_| ())?;
43 if self.io.in_buf.len() > 0 {
44 return Err(());
46 }
47 if self.io.done() {
48 return Ok(Async::Ready(()));
50 }
51 if self.io.out_buf.len() >= self.config.watermarks.0 {
52 self.flush_output().map_err(|_| ())?;
53 if self.io.out_buf.len() >= self.config.watermarks.0 {
54 return Ok(Async::NotReady);
55 }
56 }
57 while let Async::Ready(Some(metric)) = self.channel.poll()? {
58 self.io.out_buf.extend(&metric.0);
59 if self.io.out_buf.len() >= self.config.watermarks.0 {
60 break;
61 }
62 }
63 self.flush_output().map_err(|_| ())?;
64 if self.channel.is_done() && self.io.out_buf.len() == 0 {
65 return Ok(Async::Ready(()));
66 }
67 return Ok(Async::NotReady);
68 }
69}
70
71impl<T: AsyncWrite> Proto<T> {
72
73 fn flush_output(&mut self) -> io::Result<()> {
74 let old_out = self.io.out_buf.len();
75 if old_out > 0 {
76 self.io.flush()?;
77 let new_out = self.io.out_buf.len();
78 if new_out != old_out {
79 if new_out != 0 {
80 self.timeo = Timeout::new(
81 self.config.write_timeout, &self.handle)?;
82 self.timeo.poll()?; }
84 } else {
85 let poll_result = self.timeo.poll()?;
86 if poll_result.is_ready() {
87 return Err(io::ErrorKind::TimedOut.into());
89 }
90 }
91 }
92 Ok(())
93 }
94}