tetsy_jsonrpc_tcp_server/
server.rs

1use std;
2use std::net::SocketAddr;
3use std::sync::Arc;
4
5use tokio_service::Service as TokioService;
6
7use crate::jsonrpc::futures::sync::{mpsc, oneshot};
8use crate::jsonrpc::futures::{future, Future, Sink, Stream};
9use crate::jsonrpc::{middleware, MetaIoHandler, Metadata, Middleware};
10use crate::server_utils::{codecs, reactor, tokio, tokio_codec::Framed, SuspendableStream};
11
12use crate::dispatch::{Dispatcher, PeerMessageQueue, SenderChannels};
13use crate::meta::{MetaExtractor, NoopExtractor, RequestContext};
14use crate::service::Service;
15
16/// TCP server builder
17pub struct ServerBuilder<M: Metadata = (), S: Middleware<M> = middleware::Noop> {
18	executor: reactor::UninitializedExecutor,
19	handler: Arc<MetaIoHandler<M, S>>,
20	meta_extractor: Arc<dyn MetaExtractor<M>>,
21	channels: Arc<SenderChannels>,
22	incoming_separator: codecs::Separator,
23	outgoing_separator: codecs::Separator,
24}
25
26impl<M: Metadata + Default, S: Middleware<M> + 'static> ServerBuilder<M, S> {
27	/// Creates new `ServerBuilder` wih given `IoHandler`
28	pub fn new<T>(handler: T) -> Self
29	where
30		T: Into<MetaIoHandler<M, S>>,
31	{
32		Self::with_meta_extractor(handler, NoopExtractor)
33	}
34}
35
36impl<M: Metadata, S: Middleware<M> + 'static> ServerBuilder<M, S> {
37	/// Creates new `ServerBuilder` wih given `IoHandler`
38	pub fn with_meta_extractor<T, E>(handler: T, extractor: E) -> Self
39	where
40		T: Into<MetaIoHandler<M, S>>,
41		E: MetaExtractor<M> + 'static,
42	{
43		ServerBuilder {
44			executor: reactor::UninitializedExecutor::Unspawned,
45			handler: Arc::new(handler.into()),
46			meta_extractor: Arc::new(extractor),
47			channels: Default::default(),
48			incoming_separator: Default::default(),
49			outgoing_separator: Default::default(),
50		}
51	}
52
53	/// Utilize existing event loop executor.
54	pub fn event_loop_executor(mut self, handle: tokio::runtime::TaskExecutor) -> Self {
55		self.executor = reactor::UninitializedExecutor::Shared(handle);
56		self
57	}
58
59	/// Sets session meta extractor
60	pub fn session_meta_extractor<T: MetaExtractor<M> + 'static>(mut self, meta_extractor: T) -> Self {
61		self.meta_extractor = Arc::new(meta_extractor);
62		self
63	}
64
65	/// Sets the incoming and outgoing requests separator
66	pub fn request_separators(mut self, incoming: codecs::Separator, outgoing: codecs::Separator) -> Self {
67		self.incoming_separator = incoming;
68		self.outgoing_separator = outgoing;
69		self
70	}
71
72	/// Starts a new server
73	pub fn start(self, addr: &SocketAddr) -> std::io::Result<Server> {
74		let meta_extractor = self.meta_extractor.clone();
75		let rpc_handler = self.handler.clone();
76		let channels = self.channels.clone();
77		let incoming_separator = self.incoming_separator;
78		let outgoing_separator = self.outgoing_separator;
79		let address = addr.to_owned();
80		let (tx, rx) = std::sync::mpsc::channel();
81		let (stop_tx, stop_rx) = oneshot::channel();
82
83		let executor = self.executor.initialize()?;
84
85		executor.spawn(future::lazy(move || {
86			let start = move || {
87				let listener = tokio::net::TcpListener::bind(&address)?;
88				let connections = SuspendableStream::new(listener.incoming());
89
90				let server = connections.map(move |socket| {
91					let peer_addr = match socket.peer_addr() {
92						Ok(addr) => addr,
93						Err(e) => {
94							warn!(target: "tcp", "Unable to determine socket peer address, ignoring connection {}", e);
95							return future::Either::A(future::ok(()));
96						}
97					};
98					trace!(target: "tcp", "Accepted incoming connection from {}", &peer_addr);
99					let (sender, receiver) = mpsc::channel(65536);
100
101					let context = RequestContext {
102						peer_addr,
103						sender: sender.clone(),
104					};
105
106					let meta = meta_extractor.extract(&context);
107					let service = Service::new(peer_addr, rpc_handler.clone(), meta);
108					let (writer, reader) = Framed::new(
109						socket,
110						codecs::StreamCodec::new(incoming_separator.clone(), outgoing_separator.clone()),
111					)
112					.split();
113
114					let responses = reader.and_then(move |req| {
115						service.call(req).then(|response| match response {
116							Err(e) => {
117								warn!(target: "tcp", "Error while processing request: {:?}", e);
118								future::ok(String::new())
119							}
120							Ok(None) => {
121								trace!(target: "tcp", "JSON RPC request produced no response");
122								future::ok(String::new())
123							}
124							Ok(Some(response_data)) => {
125								trace!(target: "tcp", "Sent response: {}", &response_data);
126								future::ok(response_data)
127							}
128						})
129					});
130
131					let peer_message_queue = {
132						let mut channels = channels.lock();
133						channels.insert(peer_addr, sender.clone());
134
135						PeerMessageQueue::new(responses, receiver, peer_addr)
136					};
137
138					let shared_channels = channels.clone();
139					let writer = writer.send_all(peer_message_queue).then(move |_| {
140						trace!(target: "tcp", "Peer {}: service finished", peer_addr);
141						let mut channels = shared_channels.lock();
142						channels.remove(&peer_addr);
143						Ok(())
144					});
145
146					future::Either::B(writer)
147				});
148
149				Ok(server)
150			};
151
152			let stop = stop_rx.map_err(|_| ());
153			match start() {
154				Ok(server) => {
155					tx.send(Ok(())).expect("Rx is blocking parent thread.");
156					future::Either::A(
157						server
158							.buffer_unordered(1024)
159							.for_each(|_| Ok(()))
160							.select(stop)
161							.map(|_| ())
162							.map_err(|(e, _)| {
163								error!("Error while executing the server: {:?}", e);
164							}),
165					)
166				}
167				Err(e) => {
168					tx.send(Err(e)).expect("Rx is blocking parent thread.");
169					future::Either::B(stop.map_err(|e| {
170						error!("Error while executing the server: {:?}", e);
171					}))
172				}
173			}
174		}));
175
176		let res = rx.recv().expect("Response is always sent before tx is dropped.");
177
178		res.map(|_| Server {
179			executor: Some(executor),
180			stop: Some(stop_tx),
181		})
182	}
183
184	/// Returns dispatcher
185	pub fn dispatcher(&self) -> Dispatcher {
186		Dispatcher::new(self.channels.clone())
187	}
188}
189
/// TCP Server handle
///
/// Holds what is needed to stop a running server: the executor it was
/// spawned on and the sender half of its stop channel.
pub struct Server {
	/// Executor running the server task; `None` once taken by `close`, `wait` or `drop`.
	executor: Option<reactor::Executor>,
	/// One-shot sender used to signal the server task to stop; `None` once taken.
	stop: Option<oneshot::Sender<()>>,
}
195
196impl Server {
197	/// Closes the server (waits for finish)
198	pub fn close(mut self) {
199		let _ = self.stop.take().map(|sg| sg.send(()));
200		self.executor.take().unwrap().close();
201	}
202
203	/// Wait for the server to finish
204	pub fn wait(mut self) {
205		self.executor.take().unwrap().wait();
206	}
207}
208
209impl Drop for Server {
210	fn drop(&mut self) {
211		let _ = self.stop.take().map(|sg| sg.send(()));
212		if let Some(executor) = self.executor.take() {
213			executor.close()
214		}
215	}
216}
217
#[cfg(test)]
mod tests {
	use super::*;

	/// Compile-time check that the `Server` handle is both `Send` and `Sync`.
	#[test]
	fn server_is_send_and_sync() {
		// Only compiles when `T` satisfies both bounds; does nothing at runtime.
		fn assert_send_sync<T>()
		where
			T: Send + Sync,
		{
		}

		assert_send_sync::<Server>();
	}
}