mai/
evented_frame_stream.rs1use lifeguard::{RcRecycled, Pool};
2use mio::{EventLoop, EventSet, Token, PollOpt};
3use mio::Sender as MioSender;
4
5use std::collections::VecDeque;
6use std::io;
7
8use Codec;
9use Command;
10use Context;
11use Handler;
12use Error;
13use Error::*;
14use StreamState::*;
15use Protocol;
16use Buffer;
17use ProtocolEngine;
18
/// Lifecycle marker stored in `EventedFrameStream::state`.
///
/// Within this file a freshly constructed stream starts out as `NotReady`,
/// and `EventedFrameStream::on_error` switches it to `Done` after an I/O
/// error. NOTE(review): `Ready` is not assigned anywhere in this file;
/// presumably the protocol engine sets it once the stream is usable — confirm
/// against the caller.
#[derive(Debug, Clone, Copy)]
pub enum StreamState {
    /// Initial state assigned by `EventedFrameStream::new`.
    NotReady,
    /// Not set in this file; see the note on the enum.
    Ready,
    /// Set by `EventedFrameStream::on_error` when an I/O error is reported.
    Done,
}
25
/// Queue of frames of type `F`, backed by a `VecDeque`.
///
/// `EventedFrameStream::send` appends frames to the back of this queue.
pub type Outbox<F> = VecDeque<F>;
27
28#[derive(Debug)]
29pub struct EventedFrameStream<P: ?Sized> where P: Protocol {
30 pub stream: P::ByteStream,
31 pub state: StreamState,
32 pub codec: P::Codec,
33 pub read_buffer: Option<RcRecycled<Buffer>>,
34 pub write_buffer: Option<RcRecycled<Buffer>>,
35 pub outbox: Option<RcRecycled<Outbox<P::Frame>>>,
36}
37
38impl <P: ?Sized> EventedFrameStream<P> where P: Protocol {
39 pub fn new(ebs: P::ByteStream) -> EventedFrameStream<P> {
40 EventedFrameStream {
41 stream: ebs,
42 state: StreamState::NotReady,
43 codec: P::Codec::new(),
44 read_buffer: None,
45 write_buffer: None,
46 outbox: None,
47 }
48 }
49
50 pub fn release_empty_buffers(&mut self) {
51 let mut drop_read_buffer = false;
52 if self.read_buffer.is_some() {
53 let read_buffer = self.read_buffer.as_mut().unwrap();
54 if read_buffer.len() == 0 {
55 drop_read_buffer = true;
56 }
57 }
58
59 let mut drop_write_buffer = false;
60 if self.write_buffer.is_some() {
61 let write_buffer = self.write_buffer.as_mut().unwrap();
62 if write_buffer.len() == 0 {
63 drop_write_buffer = true;
64 }
65 }
66
67 let mut drop_outbox = false;
68 if self.outbox.is_some() {
69 let outbox = self.outbox.as_mut().unwrap();
70 if outbox.len() == 0 {
71 drop_outbox = true;
72 }
73 }
74
75 if drop_read_buffer {
76 debug!("Read buffer is empty. Releasing it to the pool.");
77 self.read_buffer = None;
78 }
79 if drop_write_buffer {
80 debug!("Write buffer is empty. Releasing it to the pool.");
81 self.write_buffer = None;
82 }
83 if drop_outbox {
84 debug!("Outbox is empty. Releasing it to the pool.");
85 self.write_buffer = None;
86 }
87 }
88
89 pub fn has_bytes_to_write(&self) -> bool {
90 (!self.write_buffer.is_none() && self.write_buffer.as_ref().unwrap().len() > 0)
92 || (!self.outbox.is_none() && self.outbox.as_ref().unwrap().len() > 0)
94 }
95
96 pub fn reading_toolset(&mut self, buffer_pool: &mut Pool<Buffer>) -> (&mut P::Codec, &mut P::ByteStream, &mut Buffer) {
97 if self.read_buffer.is_none() {
98 debug!("Getting a read_buffer from the pool.");
99 self.read_buffer = Some(buffer_pool.new_rc());
100 }
101 let EventedFrameStream {
102 ref mut stream,
103 ref mut read_buffer,
104 ref mut codec,
105 ..
106 } = *self;
107 (codec, stream, read_buffer.as_mut().unwrap())
108 }
109
110 pub fn writing_toolset(&mut self, buffer_pool: &mut Pool<Buffer>, outbox_pool: &mut Pool<Outbox<P::Frame>>) -> (&mut P::Codec, &mut P::ByteStream, &mut Buffer, &mut Outbox<P::Frame>) {
113 if self.write_buffer.is_none() {
114 debug!("Getting a write_buffer from the pool.");
115 self.write_buffer = Some(buffer_pool.new_rc());
116 }
117 if self.outbox.is_none() {
118 debug!("Getting an outbox from the pool.");
119 self.outbox = Some(outbox_pool.new_rc());
120 }
121 let EventedFrameStream {
122 ref mut stream,
123 ref mut write_buffer,
124 ref mut outbox,
125 ref mut codec,
126 ..
127 } = *self;
128 (codec, stream, write_buffer.as_mut().unwrap(), outbox.as_mut().unwrap())
129 }
130
131 pub fn read_buffer(&mut self, buffer_pool: &mut Pool<Buffer>) -> &mut Buffer {
132 if self.read_buffer.is_none() {
133 debug!("Getting a read_buffer from the pool.");
134 self.read_buffer = Some(buffer_pool.new_rc());
135 }
136 self.read_buffer.as_mut().unwrap()
137 }
138
139 pub fn write_buffer(&mut self, buffer_pool: &mut Pool<Buffer>) -> &mut Buffer {
140 if self.write_buffer.is_none() {
141 debug!("Getting a write_buffer from the pool.");
142 self.write_buffer = Some(buffer_pool.new_rc());
143 }
144 self.write_buffer.as_mut().unwrap()
145 }
146
147 pub fn outbox(&mut self, outbox_pool: &mut Pool<Outbox<P::Frame>>) -> &mut Outbox<P::Frame> {
148 if self.outbox.is_none() {
149 debug!("Getting an outbox from the pool.");
150 self.outbox = Some(outbox_pool.new_rc());
151 }
152 self.outbox.as_mut().unwrap()
153 }
154
155 pub fn register_interest_in_writing (
156 &self,
157 event_loop: &mut EventLoop<ProtocolEngine<P>>,
158 token: Token) -> io::Result<()> {
159 debug!("Registering interest in writable event.");
160 event_loop.reregister(
161 &self.stream,
162 token,
163 EventSet::all(),
164 PollOpt::level()
165 )
166 }
167
168 pub fn deregister_interest_in_writing (
169 &self,
170 event_loop:
171 &mut EventLoop<ProtocolEngine<P>>,
172 token: Token) -> io::Result<()> {
173 debug!("De-registering interest in writable event.");
174 let mut interests = EventSet::all();
175 interests.remove(EventSet::writable());
176 event_loop.reregister(
177 &self.stream,
178 token,
179 interests,
180 PollOpt::level()
181 )
182 }
183
184 pub fn on_error(
185 &mut self,
186 event_loop: &mut EventLoop<ProtocolEngine<P>>,
187 token: Token,
188 session: &mut P::Session,
189 outbox_pool: &mut Pool<Outbox<P::Frame>>,
190 command_sender: &mut MioSender<Command<P>>,
191 handler: &mut P::Handler,
192 error: &Error) {
193 if let Io(_) = *error {
194 debug!("{:?} encountered an i/o error. Setting state to 'Done'.", token);
195 self.state = Done;
196 } let context = &mut Context::new(event_loop, self, session, outbox_pool, command_sender, token);
198 handler.on_error(context, error);
199 }
200
201 pub fn send (
202 &mut self,
203 event_loop: &mut EventLoop<ProtocolEngine<P>>,
204 token: Token,
205 outbox_pool: &mut Pool<Outbox<P::Frame>>,
206 frame: P::Frame) -> io::Result<()> {
207
208 if !self.has_bytes_to_write() {
211 try!(self.register_interest_in_writing(event_loop, token))
212 }
213
214 let mut outbox = self.outbox(outbox_pool);
216 outbox.push_back(frame);
217 debug!("Outbox has {} messages.", outbox.len());
218 Ok(())
219 }
220}