tk_carbon/
proto.rs

1use std::io;
2use std::sync::Arc;
3
4use futures::{Stream, Future, Async};
5use tk_bufstream::IoBuf;
6use tokio_io::{AsyncRead, AsyncWrite};
7use tokio_core::reactor::{Handle, Timeout};
8
9use channel::Receiver;
10use {Init, Config};
11
12
13/// Low-level interface to a single carbon connection
14pub struct Proto<T> {
15    io: IoBuf<T>,
16    channel: Receiver,
17    config: Arc<Config>,
18    timeo: Timeout,
19    handle: Handle,
20}
21
22impl Init {
23    /// Wrap existing connection into a future that implements carbon protocol
24    pub fn from_connection<T>(self, conn: T, handle: &Handle)
25        -> Proto<T>
26    {
27        Proto {
28            io: IoBuf::new(conn),
29            channel: self.chan,
30            timeo: Timeout::new(self.config.write_timeout, &handle)
31                .expect("can always set a timeout"),
32            handle: handle.clone(),
33            config: self.config,
34        }
35    }
36}
37
38impl<T: AsyncRead+AsyncWrite> Future for Proto<T> {
39    type Item = ();
40    type Error = ();
41    fn poll(&mut self) -> Result<Async<()>, ()> {
42        self.io.read().map_err(|_| ())?;
43        if self.io.in_buf.len() > 0 {
44            // invalid protocol is an error
45            return Err(());
46        }
47        if self.io.done() {
48            // connection closed by peer is just finish of a future
49            return Ok(Async::Ready(()));
50        }
51        if self.io.out_buf.len() >= self.config.watermarks.0 {
52            self.flush_output().map_err(|_| ())?;
53            if self.io.out_buf.len() >= self.config.watermarks.0 {
54                return Ok(Async::NotReady);
55            }
56        }
57        while let Async::Ready(Some(metric)) = self.channel.poll()?  {
58            self.io.out_buf.extend(&metric.0);
59            if self.io.out_buf.len() >= self.config.watermarks.0 {
60                break;
61            }
62        }
63        self.flush_output().map_err(|_| ())?;
64        if self.channel.is_done() && self.io.out_buf.len() == 0 {
65            return Ok(Async::Ready(()));
66        }
67        return Ok(Async::NotReady);
68    }
69}
70
71impl<T: AsyncWrite> Proto<T> {
72
73    fn flush_output(&mut self) -> io::Result<()> {
74        let old_out = self.io.out_buf.len();
75        if old_out > 0 {
76            self.io.flush()?;
77            let new_out = self.io.out_buf.len();
78            if new_out != old_out {
79                if new_out != 0 {
80                    self.timeo = Timeout::new(
81                        self.config.write_timeout, &self.handle)?;
82                    self.timeo.poll()?;  // schedule a timeout
83                }
84            } else {
85                let poll_result = self.timeo.poll()?;
86                if poll_result.is_ready() {
87                    // timeout, no byte is written within the period
88                    return Err(io::ErrorKind::TimedOut.into());
89                }
90            }
91        }
92        Ok(())
93    }
94}