rs_jsonrpc_tcp_server/
server.rs

1use std;
2use std::net::SocketAddr;
3use std::sync::Arc;
4
5use tokio_service::Service as TokioService;
6
7use jsonrpc::{MetaIoHandler, Metadata, Middleware, NoopMiddleware};
8use jsonrpc::futures::{future, Future, Stream, Sink};
9use jsonrpc::futures::sync::{mpsc, oneshot};
10use server_utils::{reactor, tokio_core, codecs};
11use server_utils::tokio_io::AsyncRead;
12
13use dispatch::{Dispatcher, SenderChannels, PeerMessageQueue};
14use meta::{MetaExtractor, RequestContext, NoopExtractor};
15use service::Service;
16
17/// TCP server builder
18pub struct ServerBuilder<M: Metadata = (), S: Middleware<M> = NoopMiddleware> {
19	remote: reactor::UninitializedRemote,
20	handler: Arc<MetaIoHandler<M, S>>,
21	meta_extractor: Arc<MetaExtractor<M>>,
22	channels: Arc<SenderChannels>,
23	incoming_separator: codecs::Separator, 
24	outgoing_separator: codecs::Separator,
25}
26
27impl<M: Metadata, S: Middleware<M> + 'static> ServerBuilder<M, S> {
28	/// Creates new `SeverBuilder` wih given `IoHandler`
29	pub fn new<T>(handler: T) -> Self where
30		T: Into<MetaIoHandler<M, S>>
31	{
32		ServerBuilder {
33			remote: reactor::UninitializedRemote::Unspawned,
34			handler: Arc::new(handler.into()),
35			meta_extractor: Arc::new(NoopExtractor),
36			channels: Default::default(),
37			incoming_separator: Default::default(),
38			outgoing_separator: Default::default(),
39		}
40	}
41
42	/// Utilize existing event loop remote.
43	pub fn event_loop_remote(mut self, remote: tokio_core::reactor::Remote) -> Self {
44		self.remote = reactor::UninitializedRemote::Shared(remote);
45		self
46	}
47
48	/// Sets session meta extractor
49	pub fn session_meta_extractor<T: MetaExtractor<M> + 'static>(mut self, meta_extractor: T) -> Self {
50		self.meta_extractor = Arc::new(meta_extractor);
51		self
52	}
53
54	/// Sets the incoming and outgoing requests separator
55	pub fn request_separators(mut self, incoming: codecs::Separator, outgoing: codecs::Separator) -> Self {
56		self.incoming_separator = incoming;
57		self.outgoing_separator = outgoing;
58		self
59	}
60
61	/// Starts a new server
62	pub fn start(self, addr: &SocketAddr) -> std::io::Result<Server> {
63		let meta_extractor = self.meta_extractor.clone();
64		let rpc_handler = self.handler.clone();
65		let channels = self.channels.clone();
66		let incoming_separator = self.incoming_separator;
67		let outgoing_separator = self.outgoing_separator;
68		let address = addr.to_owned();
69		let (tx, rx) = std::sync::mpsc::channel();
70		let (signal, stop) = oneshot::channel();
71
72		let remote = self.remote.initialize()?;
73
74		remote.remote().spawn(move |handle| {
75			let start = move || {
76				let listener = tokio_core::net::TcpListener::bind(&address, handle)?;
77				let connections = listener.incoming();
78				let remote = handle.remote().clone();
79				let server = connections.for_each(move |(socket, peer_addr)| {
80					trace!(target: "tcp", "Accepted incoming connection from {}", &peer_addr);
81					let (sender, receiver) = mpsc::channel(65536);
82
83					let context = RequestContext {
84						peer_addr: peer_addr,
85						sender: sender.clone(),
86					};
87
88					let meta = meta_extractor.extract(&context);
89					let service = Service::new(peer_addr, rpc_handler.clone(), meta);
90					let (writer, reader) = socket.framed(
91						codecs::StreamCodec::new(
92							incoming_separator.clone(),
93							outgoing_separator.clone(),
94						)
95					).split();
96
97					let responses = reader.and_then(
98						move |req| service.call(req).then(|response| match response {
99							Err(e) => {
100								warn!(target: "tcp", "Error while processing request: {:?}", e);
101								future::ok(String::new())
102							},
103							Ok(None) => {
104								trace!(target: "tcp", "JSON RPC request produced no response");
105								future::ok(String::new())
106							},
107							Ok(Some(response_data)) => {
108								trace!(target: "tcp", "Sent response: {}", &response_data);
109								future::ok(response_data)
110							}
111						})
112					);
113
114					let peer_message_queue = {
115						let mut channels = channels.lock();
116						channels.insert(peer_addr.clone(), sender.clone());
117
118						PeerMessageQueue::new(
119							responses,
120							receiver,
121							peer_addr.clone(),
122						)
123					};
124
125					let shared_channels = channels.clone();
126					let writer = writer.send_all(peer_message_queue).then(move |_| {
127						trace!(target: "tcp", "Peer {}: service finished", peer_addr);
128						let mut channels = shared_channels.lock();
129						channels.remove(&peer_addr);
130						Ok(())
131					});
132
133					remote.spawn(|_| writer);
134
135					Ok(())
136				});
137
138				Ok(server)
139			};
140
141			let stop = stop.map_err(|_| std::io::ErrorKind::Interrupted.into());
142			match start() {
143				Ok(server) => {
144					tx.send(Ok(())).expect("Rx is blocking parent thread.");
145					future::Either::A(server.select(stop)
146						.map(|_| ())
147						.map_err(|(e, _)| {
148							error!("Error while executing the server: {:?}", e);
149						}))
150				},
151				Err(e) => {
152					tx.send(Err(e)).expect("Rx is blocking parent thread.");
153					future::Either::B(stop
154						.map_err(|e| {
155							error!("Error while executing the server: {:?}", e);
156						}))
157				},
158			}
159		});
160
161		let res = rx.recv().expect("Response is always sent before tx is dropped.");
162
163		res.map(|_| Server {
164			remote: Some(remote),
165			stop: Some(signal),
166		})
167	}
168
169	/// Returns dispatcher
170	pub fn dispatcher(&self) -> Dispatcher {
171		Dispatcher::new(self.channels.clone())
172	}
173}
174
175/// TCP Server handle
176pub struct Server {
177	remote: Option<reactor::Remote>,
178	stop: Option<oneshot::Sender<()>>,
179}
180
181impl Server {
182	/// Closes the server (waits for finish)
183	pub fn close(mut self) {
184		let _ = self.stop.take().map(|sg| sg.send(()));
185		self.remote.take().unwrap().close();
186	}
187
188	/// Wait for the server to finish
189	pub fn wait(mut self) {
190		self.remote.take().unwrap().wait();
191	}
192}
193
194impl Drop for Server {
195	fn drop(&mut self) {
196		let _ = self.stop.take().map(|sg| sg.send(()));
197		self.remote.take().map(|remote| remote.close());
198	}
199}