bitconch_jsonrpc_tcp_server/
server.rs

1use std;
2use std::net::SocketAddr;
3use std::sync::Arc;
4
5use tokio_service::Service as TokioService;
6
7use jsonrpc::{MetaIoHandler, Metadata, Middleware, NoopMiddleware};
8use jsonrpc::futures::{future, Future, Stream, Sink};
9use jsonrpc::futures::sync::{mpsc, oneshot};
10use server_utils::{
11	tokio_codec::Framed,
12	tokio, reactor, codecs,
13};
14
15use dispatch::{Dispatcher, SenderChannels, PeerMessageQueue};
16use meta::{MetaExtractor, RequestContext, NoopExtractor};
17use service::Service;
18
19/// TCP server builder
20pub struct ServerBuilder<M: Metadata = (), S: Middleware<M> = NoopMiddleware> {
21	executor: reactor::UninitializedExecutor,
22	handler: Arc<MetaIoHandler<M, S>>,
23	meta_extractor: Arc<MetaExtractor<M>>,
24	channels: Arc<SenderChannels>,
25	incoming_separator: codecs::Separator,
26	outgoing_separator: codecs::Separator,
27}
28
29impl<M: Metadata + Default, S: Middleware<M> + 'static> ServerBuilder<M, S> {
30	/// Creates new `ServerBuilder` wih given `IoHandler`
31	pub fn new<T>(handler: T) -> Self where
32		T: Into<MetaIoHandler<M, S>>,
33	{
34		Self::with_meta_extractor(handler, NoopExtractor)
35	}
36}
37
38impl<M: Metadata, S: Middleware<M> + 'static> ServerBuilder<M, S> {
39	/// Creates new `ServerBuilder` wih given `IoHandler`
40	pub fn with_meta_extractor<T, E>(handler: T, extractor: E) -> Self where
41		T: Into<MetaIoHandler<M, S>>,
42		E: MetaExtractor<M> + 'static,
43	{
44		ServerBuilder {
45			executor: reactor::UninitializedExecutor::Unspawned,
46			handler: Arc::new(handler.into()),
47			meta_extractor: Arc::new(extractor),
48			channels: Default::default(),
49			incoming_separator: Default::default(),
50			outgoing_separator: Default::default(),
51		}
52	}
53
54	/// Utilize existing event loop executor.
55	pub fn event_loop_executor(mut self, handle: tokio::runtime::TaskExecutor) -> Self {
56		self.executor = reactor::UninitializedExecutor::Shared(handle);
57		self
58	}
59
60	/// Sets session meta extractor
61	pub fn session_meta_extractor<T: MetaExtractor<M> + 'static>(mut self, meta_extractor: T) -> Self {
62		self.meta_extractor = Arc::new(meta_extractor);
63		self
64	}
65
66	/// Sets the incoming and outgoing requests separator
67	pub fn request_separators(mut self, incoming: codecs::Separator, outgoing: codecs::Separator) -> Self {
68		self.incoming_separator = incoming;
69		self.outgoing_separator = outgoing;
70		self
71	}
72
73	/// Starts a new server
74	pub fn start(self, addr: &SocketAddr) -> std::io::Result<Server> {
75		let meta_extractor = self.meta_extractor.clone();
76		let rpc_handler = self.handler.clone();
77		let channels = self.channels.clone();
78		let incoming_separator = self.incoming_separator;
79		let outgoing_separator = self.outgoing_separator;
80		let address = addr.to_owned();
81		let (tx, rx) = std::sync::mpsc::channel();
82		let (stop_tx, stop_rx) = oneshot::channel();
83
84		let executor = self.executor.initialize()?;
85
86		executor.spawn(future::lazy(move || {
87			let start = move || {
88				let listener = tokio::net::TcpListener::bind(&address)?;
89				let connections = listener.incoming();
90
91				let server = connections.for_each(move |socket| {
92					let peer_addr = socket.peer_addr().expect("Unable to determine socket peer address");
93					trace!(target: "tcp", "Accepted incoming connection from {}", &peer_addr);
94					let (sender, receiver) = mpsc::channel(65536);
95
96					let context = RequestContext {
97						peer_addr: peer_addr,
98						sender: sender.clone(),
99					};
100
101					let meta = meta_extractor.extract(&context);
102					let service = Service::new(peer_addr, rpc_handler.clone(), meta);
103					let (writer, reader) = Framed::new(
104		                socket,
105		                codecs::StreamCodec::new(
106		                    incoming_separator.clone(),
107		                    outgoing_separator.clone(),
108		                ),
109		            ).split();
110
111					let responses = reader.and_then(
112						move |req| service.call(req).then(|response| match response {
113							Err(e) => {
114								warn!(target: "tcp", "Error while processing request: {:?}", e);
115								future::ok(String::new())
116							},
117							Ok(None) => {
118								trace!(target: "tcp", "JSON RPC request produced no response");
119								future::ok(String::new())
120							},
121							Ok(Some(response_data)) => {
122								trace!(target: "tcp", "Sent response: {}", &response_data);
123								future::ok(response_data)
124							}
125						})
126					);
127
128					let peer_message_queue = {
129						let mut channels = channels.lock();
130						channels.insert(peer_addr.clone(), sender.clone());
131
132						PeerMessageQueue::new(
133							responses,
134							receiver,
135							peer_addr.clone(),
136						)
137					};
138
139					let shared_channels = channels.clone();
140					let writer = writer.send_all(peer_message_queue).then(move |_| {
141						trace!(target: "tcp", "Peer {}: service finished", peer_addr);
142						let mut channels = shared_channels.lock();
143						channels.remove(&peer_addr);
144						Ok(())
145					});
146
147					tokio::spawn(writer);
148
149					Ok(())
150				});
151
152				Ok(server)
153			};
154
155			let stop = stop_rx.map_err(|_| std::io::ErrorKind::Interrupted.into());
156			match start() {
157				Ok(server) => {
158					tx.send(Ok(())).expect("Rx is blocking parent thread.");
159					future::Either::A(server.select(stop)
160						.map(|_| ())
161						.map_err(|(e, _)| {
162							error!("Error while executing the server: {:?}", e);
163						}))
164				},
165				Err(e) => {
166					tx.send(Err(e)).expect("Rx is blocking parent thread.");
167					future::Either::B(stop
168						.map_err(|e| {
169							error!("Error while executing the server: {:?}", e);
170						}))
171				},
172			}
173		}));
174
175		let res = rx.recv().expect("Response is always sent before tx is dropped.");
176
177		res.map(|_| Server {
178			executor: Some(executor),
179			stop: Some(stop_tx),
180		})
181	}
182
183	/// Returns dispatcher
184	pub fn dispatcher(&self) -> Dispatcher {
185		Dispatcher::new(self.channels.clone())
186	}
187}
188
189/// TCP Server handle
190pub struct Server {
191	executor: Option<reactor::Executor>,
192	stop: Option<oneshot::Sender<()>>,
193}
194
195impl Server {
196	/// Closes the server (waits for finish)
197	pub fn close(mut self) {
198		let _ = self.stop.take().map(|sg| sg.send(()));
199		self.executor.take().unwrap().close();
200	}
201
202	/// Wait for the server to finish
203	pub fn wait(mut self) {
204		self.executor.take().unwrap().wait();
205	}
206}
207
208impl Drop for Server {
209	fn drop(&mut self) {
210		let _ = self.stop.take().map(|sg| sg.send(()));
211		self.executor.take().map(|executor| executor.close());
212	}
213}