mai/
evented_frame_stream.rs

1use lifeguard::{RcRecycled, Pool};
2use mio::{EventLoop, EventSet, Token, PollOpt};
3use mio::Sender as MioSender;
4
5use std::collections::VecDeque;
6use std::io;
7
8use Codec;
9use Command;
10use Context;
11use Handler;
12use Error;
13use Error::*;
14use StreamState::*;
15use Protocol;
16use Buffer;
17use ProtocolEngine;
18
19#[derive(Debug,Clone,Copy)]
20pub enum StreamState {
21  NotReady,
22  Ready,
23  Done
24}
25
26pub type Outbox<F> = VecDeque<F>;
27
28#[derive(Debug)]
29pub struct EventedFrameStream<P: ?Sized> where P: Protocol {
30  pub stream: P::ByteStream,
31  pub state: StreamState,
32  pub codec: P::Codec,
33  pub read_buffer: Option<RcRecycled<Buffer>>,
34  pub write_buffer: Option<RcRecycled<Buffer>>,
35  pub outbox: Option<RcRecycled<Outbox<P::Frame>>>,
36}
37
38impl <P: ?Sized> EventedFrameStream<P> where P: Protocol {
39  pub fn new(ebs: P::ByteStream) -> EventedFrameStream<P> {
40    EventedFrameStream {
41      stream: ebs,
42      state: StreamState::NotReady,
43      codec: P::Codec::new(),
44      read_buffer: None,
45      write_buffer: None,
46      outbox: None,
47    }
48  }
49
50  pub fn release_empty_buffers(&mut self) {
51    let mut drop_read_buffer = false;
52    if self.read_buffer.is_some() {
53      let read_buffer = self.read_buffer.as_mut().unwrap();
54      if read_buffer.len() == 0 {
55        drop_read_buffer = true;
56      }
57    }
58
59    let mut drop_write_buffer = false;
60    if self.write_buffer.is_some() {
61      let write_buffer = self.write_buffer.as_mut().unwrap();
62      if write_buffer.len() == 0 {
63        drop_write_buffer = true;
64      }
65    }
66
67    let mut drop_outbox = false;
68    if self.outbox.is_some() {
69      let outbox = self.outbox.as_mut().unwrap();
70      if outbox.len() == 0 {
71        drop_outbox = true;
72      }
73    }
74
75    if drop_read_buffer {
76      debug!("Read buffer is empty. Releasing it to the pool.");
77      self.read_buffer = None;
78    }
79    if drop_write_buffer {
80      debug!("Write buffer is empty. Releasing it to the pool.");
81      self.write_buffer = None;
82    }
83    if drop_outbox {
84      debug!("Outbox is empty. Releasing it to the pool.");
85      self.write_buffer = None;
86    }
87  }
88
89  pub fn has_bytes_to_write(&self) -> bool {
90    // Has bytes in the outbound buffer waiting to be written...
91    (!self.write_buffer.is_none() && self.write_buffer.as_ref().unwrap().len() > 0)
92    // or frames waiting to be serialized and written
93        || (!self.outbox.is_none() && self.outbox.as_ref().unwrap().len() > 0)
94  }
95
96  pub fn reading_toolset(&mut self, buffer_pool: &mut Pool<Buffer>) -> (&mut P::Codec, &mut P::ByteStream, &mut Buffer) {
97    if self.read_buffer.is_none() {
98      debug!("Getting a read_buffer from the pool.");
99      self.read_buffer = Some(buffer_pool.new_rc());
100    }
101    let EventedFrameStream {
102      ref mut stream,
103      ref mut read_buffer,
104      ref mut codec,
105      ..
106    } = *self;
107    (codec, stream, read_buffer.as_mut().unwrap())
108  }
109
110  // TODO: Split into encoding_toolset and writing_toolset?
111  // Currently an outbox will be allocated if absent, which is silly
112  pub fn writing_toolset(&mut self, buffer_pool: &mut Pool<Buffer>, outbox_pool: &mut Pool<Outbox<P::Frame>>) -> (&mut P::Codec, &mut P::ByteStream, &mut Buffer, &mut Outbox<P::Frame>) {
113    if self.write_buffer.is_none() {
114      debug!("Getting a write_buffer from the pool.");
115      self.write_buffer = Some(buffer_pool.new_rc());
116    }
117    if self.outbox.is_none() {
118      debug!("Getting an outbox from the pool.");
119      self.outbox = Some(outbox_pool.new_rc());
120    }
121    let EventedFrameStream {
122      ref mut stream,
123      ref mut write_buffer,
124      ref mut outbox,
125      ref mut codec,
126        ..
127    } = *self;
128    (codec, stream, write_buffer.as_mut().unwrap(), outbox.as_mut().unwrap())
129  }
130
131  pub fn read_buffer(&mut self, buffer_pool: &mut Pool<Buffer>) -> &mut Buffer {
132    if self.read_buffer.is_none() {
133      debug!("Getting a read_buffer from the pool.");
134      self.read_buffer = Some(buffer_pool.new_rc());
135    }
136    self.read_buffer.as_mut().unwrap()
137  }
138
139  pub fn write_buffer(&mut self, buffer_pool: &mut Pool<Buffer>) -> &mut Buffer {
140    if self.write_buffer.is_none() {
141      debug!("Getting a write_buffer from the pool.");
142      self.write_buffer = Some(buffer_pool.new_rc());
143    }
144    self.write_buffer.as_mut().unwrap()
145  }
146
147  pub fn outbox(&mut self, outbox_pool: &mut Pool<Outbox<P::Frame>>) -> &mut Outbox<P::Frame> {
148    if self.outbox.is_none() {
149      debug!("Getting an outbox from the pool.");
150      self.outbox = Some(outbox_pool.new_rc());
151    }
152    self.outbox.as_mut().unwrap()
153  }
154
155  pub fn register_interest_in_writing (
156    &self,
157    event_loop: &mut EventLoop<ProtocolEngine<P>>,
158    token: Token) -> io::Result<()> {
159      debug!("Registering interest in writable event.");
160      event_loop.reregister(
161        &self.stream,
162        token,
163        EventSet::all(),
164        PollOpt::level()
165      )
166  }
167
168  pub fn deregister_interest_in_writing (
169    &self,
170    event_loop:
171    &mut EventLoop<ProtocolEngine<P>>,
172    token: Token) -> io::Result<()> {
173      debug!("De-registering interest in writable event.");
174      let mut interests = EventSet::all();
175      interests.remove(EventSet::writable());
176      event_loop.reregister(
177        &self.stream,
178        token,
179        interests,
180        PollOpt::level()
181      )
182  }
183
184  pub fn on_error(
185    &mut self,
186    event_loop: &mut EventLoop<ProtocolEngine<P>>,
187    token: Token,
188    session: &mut P::Session,
189    outbox_pool: &mut Pool<Outbox<P::Frame>>,
190    command_sender: &mut MioSender<Command<P>>,
191    handler: &mut P::Handler,
192    error: &Error) {
193      if let Io(_) = *error {
194        debug!("{:?} encountered an i/o error. Setting state to 'Done'.", token);
195        self.state = Done;
196      } //TODO: Other non-fatal error types should move state to 'ShuttingDown'
197      let context = &mut Context::new(event_loop, self, session, outbox_pool, command_sender, token);
198      handler.on_error(context, error);
199  }
200
201  pub fn send (
202    &mut self,
203    event_loop: &mut EventLoop<ProtocolEngine<P>>,
204    token: Token,
205    outbox_pool: &mut Pool<Outbox<P::Frame>>,
206    frame: P::Frame) -> io::Result<()> {
207
208    // If we weren't waiting to write before this, register interest
209    // in case we had deregistered it previously.
210    if !self.has_bytes_to_write() {
211      try!(self.register_interest_in_writing(event_loop, token))
212    }
213
214    // Get the outbox (from the pool if necessary) and add our frame
215    let mut outbox = self.outbox(outbox_pool);
216    outbox.push_back(frame);
217    debug!("Outbox has {} messages.", outbox.len());
218    Ok(())
219  }
220}