1#![allow(deprecated)]
71
72#[cfg(feature = "mio")]
73mod service_mio;
74#[cfg(not(feature = "mio"))]
75mod service_non_mio;
76#[cfg(feature = "mio")]
77mod worker;
78
79use std::cell::Cell;
80use std::{fmt, error};
81#[cfg(feature = "mio")]
82use mio::deprecated::{EventLoop, NotifyError};
83#[cfg(feature = "mio")]
84use mio::Token;
85
86thread_local! {
87 pub static LOCAL_STACK_SIZE: Cell<usize> = Cell::new(::std::env::var("RUST_MIN_STACK").ok().and_then(|s| s.parse().ok()).unwrap_or(2 * 1024 * 1024));
91}
92
93#[derive(Debug)]
94pub enum IoError {
96 #[cfg(feature = "mio")]
98 Mio(::std::io::Error),
99 StdIo(::std::io::Error),
101}
102
103impl fmt::Display for IoError {
104 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
105 match *self {
108 #[cfg(feature = "mio")]
109 IoError::Mio(ref std_err) => std_err.fmt(f),
110 IoError::StdIo(ref std_err) => std_err.fmt(f),
111 }
112 }
113}
114
115impl error::Error for IoError {
116 fn description(&self) -> &str {
117 "IO error"
118 }
119}
120
121impl From<::std::io::Error> for IoError {
122 fn from(err: ::std::io::Error) -> IoError {
123 IoError::StdIo(err)
124 }
125}
126
127#[cfg(feature = "mio")]
128impl<Message> From<NotifyError<service_mio::IoMessage<Message>>> for IoError where Message: Send {
129 fn from(_err: NotifyError<service_mio::IoMessage<Message>>) -> IoError {
130 IoError::Mio(::std::io::Error::new(::std::io::ErrorKind::ConnectionAborted, "Network IO notification error"))
131 }
132}
133
134pub trait IoHandler<Message>: Send + Sync where Message: Send + Sync + 'static {
138 fn initialize(&self, _io: &IoContext<Message>) {}
140 fn timeout(&self, _io: &IoContext<Message>, _timer: TimerToken) {}
142 fn message(&self, _io: &IoContext<Message>, _message: &Message) {}
144 #[cfg(feature = "mio")]
146 fn stream_hup(&self, _io: &IoContext<Message>, _stream: StreamToken) {}
147 #[cfg(feature = "mio")]
149 fn stream_readable(&self, _io: &IoContext<Message>, _stream: StreamToken) {}
150 #[cfg(feature = "mio")]
152 fn stream_writable(&self, _io: &IoContext<Message>, _stream: StreamToken) {}
153 #[cfg(feature = "mio")]
155 fn register_stream(&self, _stream: StreamToken, _reg: Token, _event_loop: &mut EventLoop<IoManager<Message>>) {}
156 #[cfg(feature = "mio")]
158 fn update_stream(&self, _stream: StreamToken, _reg: Token, _event_loop: &mut EventLoop<IoManager<Message>>) {}
159 #[cfg(feature = "mio")]
161 fn deregister_stream(&self, _stream: StreamToken, _event_loop: &mut EventLoop<IoManager<Message>>) {}
162}
163
164#[cfg(feature = "mio")]
165pub use service_mio::{TimerToken, StreamToken, IoContext, IoService, IoChannel, IoManager, TOKENS_PER_HANDLER};
166#[cfg(not(feature = "mio"))]
167pub use crate::service_non_mio::{TimerToken, IoContext, IoService, IoChannel, TOKENS_PER_HANDLER};
168
169#[cfg(test)]
170mod tests {
171 use std::{
172 sync::{Arc, atomic},
173 thread,
174 time::Duration,
175 };
176 use super::*;
177
178 #[test]
183 #[cfg_attr(feature = "mio", ignore)]
184 fn send_message_to_handler() {
185 struct MyHandler(atomic::AtomicBool);
186
187 #[derive(Clone)]
188 struct MyMessage {
189 data: u32
190 }
191
192 impl IoHandler<MyMessage> for MyHandler {
193 fn message(&self, _io: &IoContext<MyMessage>, message: &MyMessage) {
194 assert_eq!(message.data, 5);
195 self.0.store(true, atomic::Ordering::SeqCst);
196 }
197 }
198
199 let handler = Arc::new(MyHandler(atomic::AtomicBool::new(false)));
200
201 let service = IoService::<MyMessage>::start().expect("Error creating network service");
202 service.register_handler(handler.clone()).unwrap();
203
204 service.send_message(MyMessage { data: 5 }).unwrap();
205
206 thread::sleep(Duration::from_secs(1));
207 assert!(handler.0.load(atomic::Ordering::SeqCst));
208 }
209
210 #[test]
211 fn timeout_working() {
212 struct MyHandler(atomic::AtomicBool);
213
214 #[derive(Clone)]
215 struct MyMessage {
216 data: u32
217 }
218
219 impl IoHandler<MyMessage> for MyHandler {
220 fn initialize(&self, io: &IoContext<MyMessage>) {
221 io.register_timer_once(1234, Duration::from_millis(500)).unwrap();
222 }
223
224 fn timeout(&self, _io: &IoContext<MyMessage>, timer: TimerToken) {
225 assert_eq!(timer, 1234);
226 assert!(!self.0.swap(true, atomic::Ordering::SeqCst));
227 }
228 }
229
230 let handler = Arc::new(MyHandler(atomic::AtomicBool::new(false)));
231
232 let service = IoService::<MyMessage>::start().expect("Error creating network service");
233 service.register_handler(handler.clone()).unwrap();
234
235 thread::sleep(Duration::from_secs(2));
236 assert!(handler.0.load(atomic::Ordering::SeqCst));
237 }
238
239 #[test]
240 fn multi_timeout_working() {
241 struct MyHandler(atomic::AtomicUsize);
242
243 #[derive(Clone)]
244 struct MyMessage {
245 data: u32
246 }
247
248 impl IoHandler<MyMessage> for MyHandler {
249 fn initialize(&self, io: &IoContext<MyMessage>) {
250 io.register_timer(1234, Duration::from_millis(500)).unwrap();
251 }
252
253 fn timeout(&self, _io: &IoContext<MyMessage>, timer: TimerToken) {
254 assert_eq!(timer, 1234);
255 self.0.fetch_add(1, atomic::Ordering::SeqCst);
256 }
257 }
258
259 let handler = Arc::new(MyHandler(atomic::AtomicUsize::new(0)));
260
261 let service = IoService::<MyMessage>::start().expect("Error creating network service");
262 service.register_handler(handler.clone()).unwrap();
263
264 thread::sleep(Duration::from_secs(2));
265 assert!(handler.0.load(atomic::Ordering::SeqCst) >= 2);
266 }
267}