ethcore_io/
lib.rs

1// Copyright 2015-2020 Parity Technologies (UK) Ltd.
2// This file is part of Parity Ethereum.
3
4// Parity Ethereum is free software: you can redistribute it and/or modify
5// it under the terms of the GNU General Public License as published by
6// the Free Software Foundation, either version 3 of the License, or
7// (at your option) any later version.
8
9// Parity Ethereum is distributed in the hope that it will be useful,
10// but WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12// GNU General Public License for more details.
13
14// You should have received a copy of the GNU General Public License
15// along with Parity Ethereum.  If not, see <http://www.gnu.org/licenses/>.
16
17//! General IO module.
18//!
19//! Example usage for creating a network service and adding an IO handler:
20//!
21//! ```rust
22//! extern crate ethcore_io;
23//! use ethcore_io::*;
24//! use std::sync::Arc;
25//! use std::time::Duration;
26//!
27//! struct MyHandler;
28//!
29//! #[derive(Clone)]
30//! struct MyMessage {
31//! 	data: u32
32//! }
33//!
//! impl IoHandler<MyMessage> for MyHandler {
//! 	fn initialize(&self, io: &IoContext<MyMessage>) {
//! 		io.register_timer(0, Duration::from_secs(1)).unwrap();
//! 	}
//!
//! 	fn timeout(&self, _io: &IoContext<MyMessage>, timer: TimerToken) {
//! 		println!("Timeout {}", timer);
//! 	}
//!
//! 	fn message(&self, _io: &IoContext<MyMessage>, message: &MyMessage) {
//! 		println!("Message {}", message.data);
//! 	}
//! }
47//!
48//! fn main () {
49//! 	let mut service = IoService::<MyMessage>::start().expect("Error creating network service");
50//! 	service.register_handler(Arc::new(MyHandler)).unwrap();
51//!
52//! 	// Wait for quit condition
53//! 	// ...
54//! 	// Drop the service
55//! }
56//! ```
57//!
58//! # Mio vs non-mio
59//!
60//! This library has two modes: mio and not mio. The `mio` feature can be activated or deactivated
61//! when compiling or depending on the library.
62//!
63//! Without mio, only timers and message-passing are available. With mio, you can also use
64//! low-level sockets provided by mio.
65//!
66//! The non-mio mode exists because the `mio` library doesn't compile on platforms such as
67//! emscripten.
68
69//TODO: use Poll from mio
70#![allow(deprecated)]
71
72#[cfg(feature = "mio")]
73mod service_mio;
74#[cfg(not(feature = "mio"))]
75mod service_non_mio;
76#[cfg(feature = "mio")]
77mod worker;
78
79use std::cell::Cell;
80use std::{fmt, error};
81#[cfg(feature = "mio")]
82use mio::deprecated::{EventLoop, NotifyError};
83#[cfg(feature = "mio")]
84use mio::Token;
85
thread_local! {
	/// Per-thread record of the stack size.
	///
	/// The initial value is read from the `RUST_MIN_STACK` environment variable
	/// when it is set and parses as a `usize`; otherwise it falls back to
	/// `2 * 1024 * 1024`. The fallback is hard-coded because there is no way to
	/// query the value from Rust, so it should be updated if it changes in Rust.
	pub static LOCAL_STACK_SIZE: Cell<usize> = {
		// A missing, non-unicode or unparsable variable all mean "use the fallback".
		let from_env = match ::std::env::var("RUST_MIN_STACK") {
			Ok(raw) => raw.parse::<usize>().ok(),
			Err(_) => None,
		};
		Cell::new(from_env.unwrap_or(2 * 1024 * 1024))
	}
}
92
/// IO error type of this crate.
///
/// Every variant wraps a standard library `io::Error`.
#[derive(Debug)]
pub enum IoError {
	/// Low level error from the mio crate. Only present when the `mio` feature is enabled.
	#[cfg(feature = "mio")]
	Mio(::std::io::Error),
	/// Error concerning the Rust standard library's IO subsystem.
	StdIo(::std::io::Error),
}
102
103impl fmt::Display for IoError {
104	fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
105		// just defer to the std implementation for now.
106		// we can refine the formatting when more variants are added.
107		match *self {
108			#[cfg(feature = "mio")]
109			IoError::Mio(ref std_err) => std_err.fmt(f),
110			IoError::StdIo(ref std_err) => std_err.fmt(f),
111		}
112	}
113}
114
115impl error::Error for IoError {
116	fn description(&self) -> &str {
117		"IO error"
118	}
119}
120
121impl From<::std::io::Error> for IoError {
122	fn from(err: ::std::io::Error) -> IoError {
123		IoError::StdIo(err)
124	}
125}
126
#[cfg(feature = "mio")]
impl<Message> From<NotifyError<service_mio::IoMessage<Message>>> for IoError
where
	Message: Send,
{
	/// Converts a mio `NotifyError` into `IoError::Mio`.
	///
	/// The contents of the original error are discarded: the result is always a
	/// `ConnectionAborted` IO error carrying a fixed message.
	fn from(_err: NotifyError<service_mio::IoMessage<Message>>) -> IoError {
		let kind = ::std::io::ErrorKind::ConnectionAborted;
		let cause = ::std::io::Error::new(kind, "Network IO notification error");
		IoError::Mio(cause)
	}
}
133
134/// Generic IO handler.
135/// All the handler function are called from within IO event loop.
136/// `Message` type is used as notification data
137pub trait IoHandler<Message>: Send + Sync where Message: Send + Sync + 'static {
138	/// Initialize the handler
139	fn initialize(&self, _io: &IoContext<Message>) {}
140	/// Timer function called after a timeout created with `HandlerIo::timeout`.
141	fn timeout(&self, _io: &IoContext<Message>, _timer: TimerToken) {}
142	/// Called when a broadcasted message is received. The message can only be sent from a different IO handler.
143	fn message(&self, _io: &IoContext<Message>, _message: &Message) {}
144	/// Called when an IO stream gets closed
145	#[cfg(feature = "mio")]
146	fn stream_hup(&self, _io: &IoContext<Message>, _stream: StreamToken) {}
147	/// Called when an IO stream can be read from
148	#[cfg(feature = "mio")]
149	fn stream_readable(&self, _io: &IoContext<Message>, _stream: StreamToken) {}
150	/// Called when an IO stream can be written to
151	#[cfg(feature = "mio")]
152	fn stream_writable(&self, _io: &IoContext<Message>, _stream: StreamToken) {}
153	/// Register a new stream with the event loop
154	#[cfg(feature = "mio")]
155	fn register_stream(&self, _stream: StreamToken, _reg: Token, _event_loop: &mut EventLoop<IoManager<Message>>) {}
156	/// Re-register a stream with the event loop
157	#[cfg(feature = "mio")]
158	fn update_stream(&self, _stream: StreamToken, _reg: Token, _event_loop: &mut EventLoop<IoManager<Message>>) {}
159	/// Deregister a stream. Called when a stream is removed from the event loop
160	#[cfg(feature = "mio")]
161	fn deregister_stream(&self, _stream: StreamToken, _event_loop: &mut EventLoop<IoManager<Message>>) {}
162}
163
164#[cfg(feature = "mio")]
165pub use service_mio::{TimerToken, StreamToken, IoContext, IoService, IoChannel, IoManager, TOKENS_PER_HANDLER};
166#[cfg(not(feature = "mio"))]
167pub use crate::service_non_mio::{TimerToken, IoContext, IoService, IoChannel, TOKENS_PER_HANDLER};
168
#[cfg(test)]
mod tests {
	use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
	use std::sync::Arc;
	use std::thread;
	use std::time::Duration;
	use super::*;

	/// Token used for every timer registered by the tests in this module.
	const TIMER_ID: TimerToken = 1234;

	/// Message type shared by all tests in this module.
	#[derive(Clone)]
	struct TestMessage {
		data: u32,
	}

	// Mio's behaviour is too unstable for this test. Sometimes we have to wait a few milliseconds,
	// sometimes more than 5 seconds for the message to arrive.
	// Therefore we ignore this test in order to not have spurious failure when running continuous
	// integration.
	/// A message sent through the service must reach the registered handler.
	#[test]
	#[cfg_attr(feature = "mio", ignore)]
	fn send_message_to_handler() {
		/// Records whether `message` has been called.
		struct FlagHandler {
			received: AtomicBool,
		}

		impl IoHandler<TestMessage> for FlagHandler {
			fn message(&self, _io: &IoContext<TestMessage>, message: &TestMessage) {
				assert_eq!(message.data, 5);
				self.received.store(true, Ordering::SeqCst);
			}
		}

		let handler = Arc::new(FlagHandler { received: AtomicBool::new(false) });

		let service = IoService::<TestMessage>::start().expect("Error creating network service");
		service.register_handler(handler.clone()).unwrap();
		service.send_message(TestMessage { data: 5 }).unwrap();

		// Give the service time to deliver the message before checking the flag.
		thread::sleep(Duration::from_secs(1));
		assert!(handler.received.load(Ordering::SeqCst));
	}

	/// A one-shot timer registered in `initialize` must have fired within two seconds.
	#[test]
	fn timeout_working() {
		/// Records whether `timeout` has been called.
		struct OnceHandler {
			fired: AtomicBool,
		}

		impl IoHandler<TestMessage> for OnceHandler {
			fn initialize(&self, io: &IoContext<TestMessage>) {
				io.register_timer_once(TIMER_ID, Duration::from_millis(500)).unwrap();
			}

			fn timeout(&self, _io: &IoContext<TestMessage>, timer: TimerToken) {
				assert_eq!(timer, TIMER_ID);
				// The flag must not have been set before: a second call trips this assertion.
				let already_fired = self.fired.swap(true, Ordering::SeqCst);
				assert!(!already_fired);
			}
		}

		let handler = Arc::new(OnceHandler { fired: AtomicBool::new(false) });

		let service = IoService::<TestMessage>::start().expect("Error creating network service");
		service.register_handler(handler.clone()).unwrap();

		thread::sleep(Duration::from_secs(2));
		assert!(handler.fired.load(Ordering::SeqCst));
	}

	/// A timer registered with `register_timer` must have fired at least twice within two seconds.
	#[test]
	fn multi_timeout_working() {
		/// Counts the calls to `timeout`.
		struct CountingHandler {
			hits: AtomicUsize,
		}

		impl IoHandler<TestMessage> for CountingHandler {
			fn initialize(&self, io: &IoContext<TestMessage>) {
				io.register_timer(TIMER_ID, Duration::from_millis(500)).unwrap();
			}

			fn timeout(&self, _io: &IoContext<TestMessage>, timer: TimerToken) {
				assert_eq!(timer, TIMER_ID);
				self.hits.fetch_add(1, Ordering::SeqCst);
			}
		}

		let handler = Arc::new(CountingHandler { hits: AtomicUsize::new(0) });

		let service = IoService::<TestMessage>::start().expect("Error creating network service");
		service.register_handler(handler.clone()).unwrap();

		thread::sleep(Duration::from_secs(2));
		assert!(handler.hits.load(Ordering::SeqCst) >= 2);
	}
}