susydev_jsonrpc_ipc_server/
server.rs

1#![allow(deprecated)]
2
3use std;
4use std::sync::Arc;
5
6use crate::jsonrpc::futures::sync::{mpsc, oneshot};
7use crate::jsonrpc::futures::{future, Future, Sink, Stream};
8use crate::jsonrpc::{middleware, FutureResult, MetaIoHandler, Metadata, Middleware};
9use tokio_service::{self, Service as TokioService};
10
11use crate::server_utils::{
12	codecs, reactor, session,
13	tokio::{self, reactor::Handle, runtime::TaskExecutor},
14	tokio_codec::Framed,
15};
16use parking_lot::Mutex;
17
18use crate::meta::{MetaExtractor, NoopExtractor, RequestContext};
19use crate::select_with_weak::SelectWithWeakExt;
20use susy_tokio_ipc::Endpoint;
21pub use susy_tokio_ipc::SecurityAttributes;
22
23/// IPC server session
24pub struct Service<M: Metadata = (), S: Middleware<M> = middleware::Noop> {
25	handler: Arc<MetaIoHandler<M, S>>,
26	meta: M,
27}
28
29impl<M: Metadata, S: Middleware<M>> Service<M, S> {
30	/// Create new IPC server session with given handler and metadata.
31	pub fn new(handler: Arc<MetaIoHandler<M, S>>, meta: M) -> Self {
32		Service { handler, meta }
33	}
34}
35
36impl<M: Metadata, S: Middleware<M>> tokio_service::Service for Service<M, S> {
37	type Request = String;
38	type Response = Option<String>;
39
40	type Error = ();
41
42	type Future = FutureResult<S::Future, S::CallFuture>;
43
44	fn call(&self, req: Self::Request) -> Self::Future {
45		trace!(target: "ipc", "Received request: {}", req);
46		self.handler.handle_request(&req, self.meta.clone())
47	}
48}
49
50/// IPC server builder
51pub struct ServerBuilder<M: Metadata = (), S: Middleware<M> = middleware::Noop> {
52	handler: Arc<MetaIoHandler<M, S>>,
53	meta_extractor: Arc<MetaExtractor<M>>,
54	session_stats: Option<Arc<session::SessionStats>>,
55	executor: reactor::UninitializedExecutor,
56	incoming_separator: codecs::Separator,
57	outgoing_separator: codecs::Separator,
58	security_attributes: SecurityAttributes,
59	client_buffer_size: usize,
60}
61
62impl<M: Metadata + Default, S: Middleware<M>> ServerBuilder<M, S> {
63	/// Creates new IPC server build given the `IoHandler`.
64	pub fn new<T>(io_handler: T) -> ServerBuilder<M, S>
65	where
66		T: Into<MetaIoHandler<M, S>>,
67	{
68		Self::with_meta_extractor(io_handler, NoopExtractor)
69	}
70}
71
72impl<M: Metadata, S: Middleware<M>> ServerBuilder<M, S> {
73	/// Creates new IPC server build given the `IoHandler` and metadata extractor.
74	pub fn with_meta_extractor<T, E>(io_handler: T, extractor: E) -> ServerBuilder<M, S>
75	where
76		T: Into<MetaIoHandler<M, S>>,
77		E: MetaExtractor<M>,
78	{
79		ServerBuilder {
80			handler: Arc::new(io_handler.into()),
81			meta_extractor: Arc::new(extractor),
82			session_stats: None,
83			executor: reactor::UninitializedExecutor::Unspawned,
84			incoming_separator: codecs::Separator::Empty,
85			outgoing_separator: codecs::Separator::default(),
86			security_attributes: SecurityAttributes::empty(),
87			client_buffer_size: 5,
88		}
89	}
90
91	/// Sets shared different event loop executor.
92	pub fn event_loop_executor(mut self, executor: TaskExecutor) -> Self {
93		self.executor = reactor::UninitializedExecutor::Shared(executor);
94		self
95	}
96
97	/// Sets session metadata extractor.
98	pub fn session_meta_extractor<X>(mut self, meta_extractor: X) -> Self
99	where
100		X: MetaExtractor<M>,
101	{
102		self.meta_extractor = Arc::new(meta_extractor);
103		self
104	}
105
106	/// Session stats
107	pub fn session_stats<T: session::SessionStats>(mut self, stats: T) -> Self {
108		self.session_stats = Some(Arc::new(stats));
109		self
110	}
111
112	/// Sets the incoming and outgoing requests separator
113	pub fn request_separators(mut self, incoming: codecs::Separator, outgoing: codecs::Separator) -> Self {
114		self.incoming_separator = incoming;
115		self.outgoing_separator = outgoing;
116		self
117	}
118
119	/// Sets the security attributes for the underlying IPC socket/pipe
120	pub fn set_security_attributes(mut self, attr: SecurityAttributes) -> Self {
121		self.security_attributes = attr;
122		self
123	}
124
125	/// Sets how many concurrent requests per client can be processed at any one time. Set to 5 by default.
126	pub fn set_client_buffer_size(mut self, buffer_size: usize) -> Self {
127		self.client_buffer_size = buffer_size;
128		self
129	}
130
131	/// Creates a new server from the given endpoint.
132	pub fn start(self, path: &str) -> std::io::Result<Server> {
133		let executor = self.executor.initialize()?;
134		let rpc_handler = self.handler;
135		let endpoint_addr = path.to_owned();
136		let meta_extractor = self.meta_extractor;
137		let session_stats = self.session_stats;
138		let incoming_separator = self.incoming_separator;
139		let outgoing_separator = self.outgoing_separator;
140		let (stop_signal, stop_receiver) = oneshot::channel();
141		let (start_signal, start_receiver) = oneshot::channel();
142		let (wait_signal, wait_receiver) = oneshot::channel();
143		let security_attributes = self.security_attributes;
144		let client_buffer_size = self.client_buffer_size;
145
146		executor.spawn(future::lazy(move || {
147			let mut endpoint = Endpoint::new(endpoint_addr);
148			endpoint.set_security_attributes(security_attributes);
149
150			if cfg!(unix) {
151				// warn about existing file and remove it
152				if ::std::fs::remove_file(endpoint.path()).is_ok() {
153					warn!("Removed existing file '{}'.", endpoint.path());
154				}
155			}
156
157			let endpoint_handle = Handle::current();
158			let connections = match endpoint.incoming(&endpoint_handle) {
159				Ok(connections) => connections,
160				Err(e) => {
161					start_signal
162						.send(Err(e))
163						.expect("Cannot fail since receiver never dropped before receiving");
164					return future::Either::A(future::ok(()));
165				}
166			};
167
168			let mut id = 0u64;
169
170			let server = connections.for_each(move |(io_stream, remote_id)| {
171				id = id.wrapping_add(1);
172				let session_id = id;
173				let session_stats = session_stats.clone();
174				trace!(target: "ipc", "Accepted incoming IPC connection: {}", session_id);
175				if let Some(stats) = session_stats.as_ref() {
176					stats.open_session(session_id)
177				}
178
179				let (sender, receiver) = mpsc::channel(16);
180				let meta = meta_extractor.extract(&RequestContext {
181					endpoint_addr: &remote_id,
182					session_id,
183					sender,
184				});
185				let service = Service::new(rpc_handler.clone(), meta);
186				let (writer, reader) = Framed::new(
187					io_stream,
188					codecs::StreamCodec::new(incoming_separator.clone(), outgoing_separator.clone()),
189				)
190				.split();
191				let responses = reader
192					.map(move |req| {
193						service
194							.call(req)
195							.then(|result| match result {
196								Err(_) => future::ok(None),
197								Ok(some_result) => future::ok(some_result),
198							})
199							.map_err(|_: ()| std::io::ErrorKind::Other.into())
200					})
201					.buffer_unordered(client_buffer_size)
202					.filter_map(|x| x)
203					// we use `select_with_weak` here, instead of `select`, to close the stream
204					// as soon as the ipc pipe is closed
205					.select_with_weak(receiver.map_err(|e| {
206						warn!(target: "ipc", "Notification error: {:?}", e);
207						std::io::ErrorKind::Other.into()
208					}));
209
210				let writer = writer.send_all(responses).then(move |_| {
211					trace!(target: "ipc", "Peer: service finished");
212					if let Some(stats) = session_stats.as_ref() {
213						stats.close_session(session_id)
214					}
215					Ok(())
216				});
217
218				tokio::spawn(writer);
219
220				Ok(())
221			});
222			start_signal
223				.send(Ok(()))
224				.expect("Cannot fail since receiver never dropped before receiving");
225
226			let stop = stop_receiver.map_err(|_| std::io::ErrorKind::Interrupted.into());
227			future::Either::B(
228				server
229					.select(stop)
230					.map(|_| {
231						let _ = wait_signal.send(());
232					})
233					.map_err(|_| ()),
234			)
235		}));
236
237		let handle = InnerHandles {
238			executor: Some(executor),
239			stop: Some(stop_signal),
240			path: path.to_owned(),
241		};
242
243		match start_receiver.wait().expect("Message should always be sent") {
244			Ok(()) => Ok(Server {
245				handles: Arc::new(Mutex::new(handle)),
246				wait_handle: Some(wait_receiver),
247			}),
248			Err(e) => Err(e),
249		}
250	}
251}
252
253/// IPC Server handle
254#[derive(Debug)]
255pub struct Server {
256	handles: Arc<Mutex<InnerHandles>>,
257	wait_handle: Option<oneshot::Receiver<()>>,
258}
259
260impl Server {
261	/// Closes the server (waits for finish)
262	pub fn close(self) {
263		self.handles.lock().close();
264	}
265
266	/// Creates a close handle that can be used to stop the server remotely
267	pub fn close_handle(&self) -> CloseHandle {
268		CloseHandle {
269			inner: self.handles.clone(),
270		}
271	}
272
273	/// Wait for the server to finish
274	pub fn wait(mut self) {
275		self.wait_handle.take().map(|wait_receiver| wait_receiver.wait());
276	}
277}
278
279#[derive(Debug)]
280struct InnerHandles {
281	executor: Option<reactor::Executor>,
282	stop: Option<oneshot::Sender<()>>,
283	path: String,
284}
285
286impl InnerHandles {
287	pub fn close(&mut self) {
288		let _ = self.stop.take().map(|stop| stop.send(()));
289		if let Some(executor) = self.executor.take() {
290			executor.close()
291		}
292		let _ = ::std::fs::remove_file(&self.path); // ignore error, file could have been gone somewhere
293	}
294}
295
296impl Drop for InnerHandles {
297	fn drop(&mut self) {
298		self.close();
299	}
300}
301/// `CloseHandle` allows one to stop an `IpcServer` remotely.
302#[derive(Clone)]
303pub struct CloseHandle {
304	inner: Arc<Mutex<InnerHandles>>,
305}
306
307impl CloseHandle {
308	/// `close` closes the corresponding `IpcServer` instance.
309	pub fn close(self) {
310		self.inner.lock().close();
311	}
312}
313
314#[cfg(test)]
315#[cfg(not(windows))]
316mod tests {
317	use tokio_uds;
318
319	use self::tokio_uds::UnixStream;
320	use super::SecurityAttributes;
321	use super::{Server, ServerBuilder};
322	use crate::jsonrpc::futures::sync::{mpsc, oneshot};
323	use crate::jsonrpc::futures::{future, Future, Sink, Stream};
324	use crate::jsonrpc::{MetaIoHandler, Value};
325	use crate::meta::{MetaExtractor, NoopExtractor, RequestContext};
326	use crate::server_utils::codecs;
327	use crate::server_utils::{
328		tokio::{self, timer::Delay},
329		tokio_codec::Decoder,
330	};
331	use parking_lot::Mutex;
332	use std::sync::Arc;
333	use std::thread;
334	use std::time;
335	use std::time::{Duration, Instant};
336
337	fn server_builder() -> ServerBuilder {
338		let mut io = MetaIoHandler::<()>::default();
339		io.add_method("say_hello", |_params| Ok(Value::String("hello".to_string())));
340		ServerBuilder::new(io)
341	}
342
343	fn run(path: &str) -> Server {
344		let builder = server_builder();
345		let server = builder.start(path).expect("Server must run with no issues");
346		server
347	}
348
349	fn dummy_request_str(path: &str, data: &str) -> String {
350		let stream_future = UnixStream::connect(path);
351		let reply = stream_future.and_then(|stream| {
352			let stream = codecs::StreamCodec::stream_incoming().framed(stream);
353			let reply = stream
354				.send(data.to_owned())
355				.and_then(move |stream| stream.into_future().map_err(|(err, _)| err))
356				.and_then(|(reply, _)| future::ok(reply.expect("there should be one reply")));
357			reply
358		});
359
360		reply.wait().expect("wait for reply")
361	}
362
363	#[test]
364	fn start() {
365		crate::logger::init_log();
366
367		let mut io = MetaIoHandler::<()>::default();
368		io.add_method("say_hello", |_params| Ok(Value::String("hello".to_string())));
369		let server = ServerBuilder::new(io);
370
371		let _server = server
372			.start("/tmp/test-ipc-20000")
373			.expect("Server must run with no issues");
374	}
375
376	#[test]
377	fn connect() {
378		crate::logger::init_log();
379		let path = "/tmp/test-ipc-30000";
380		let _server = run(path);
381
382		UnixStream::connect(path).wait().expect("Socket should connect");
383	}
384
385	#[test]
386	fn request() {
387		crate::logger::init_log();
388		let path = "/tmp/test-ipc-40000";
389		let server = run(path);
390		let (stop_signal, stop_receiver) = oneshot::channel();
391
392		let t = thread::spawn(move || {
393			let result = dummy_request_str(
394				path,
395				"{\"jsonrpc\": \"2.0\", \"method\": \"say_hello\", \"params\": [42, 23], \"id\": 1}",
396			);
397			stop_signal.send(result).unwrap();
398		});
399		t.join().unwrap();
400
401		let _ = stop_receiver
402			.map(move |result: String| {
403				assert_eq!(
404					result, "{\"jsonrpc\":\"2.0\",\"result\":\"hello\",\"id\":1}",
405					"Response does not exactly match the expected response",
406				);
407				server.close();
408			})
409			.wait();
410	}
411
412	#[test]
413	fn req_parallel() {
414		crate::logger::init_log();
415		let path = "/tmp/test-ipc-45000";
416		let server = run(path);
417		let (stop_signal, stop_receiver) = mpsc::channel(400);
418
419		let mut handles = Vec::new();
420		for _ in 0..4 {
421			let path = path.clone();
422			let mut stop_signal = stop_signal.clone();
423			handles.push(thread::spawn(move || {
424				for _ in 0..100 {
425					let result = dummy_request_str(
426						&path,
427						"{\"jsonrpc\": \"2.0\", \"method\": \"say_hello\", \"params\": [42, 23], \"id\": 1}",
428					);
429					stop_signal.try_send(result).unwrap();
430				}
431			}));
432		}
433
434		for handle in handles.drain(..) {
435			handle.join().unwrap();
436		}
437
438		let _ = stop_receiver
439			.map(|result| {
440				assert_eq!(
441					result, "{\"jsonrpc\":\"2.0\",\"result\":\"hello\",\"id\":1}",
442					"Response does not exactly match the expected response",
443				);
444			})
445			.take(400)
446			.collect()
447			.wait();
448		server.close();
449	}
450
451	#[test]
452	fn close() {
453		crate::logger::init_log();
454		let path = "/tmp/test-ipc-50000";
455		let server = run(path);
456		server.close();
457
458		assert!(
459			::std::fs::metadata(path).is_err(),
460			"There should be no socket file left"
461		);
462		assert!(
463			UnixStream::connect(path).wait().is_err(),
464			"Connection to the closed socket should fail"
465		);
466	}
467
468	fn huge_response_test_str() -> String {
469		let mut result = String::from("begin_hello");
470		result.push_str("begin_hello");
471		for _ in 0..16384 {
472			result.push(' ');
473		}
474		result.push_str("end_hello");
475		result
476	}
477
478	fn huge_response_test_json() -> String {
479		let mut result = String::from("{\"jsonrpc\":\"2.0\",\"result\":\"");
480		result.push_str(&huge_response_test_str());
481		result.push_str("\",\"id\":1}");
482
483		result
484	}
485
486	#[test]
487	fn test_huge_response() {
488		crate::logger::init_log();
489		let path = "/tmp/test-ipc-60000";
490
491		let mut io = MetaIoHandler::<()>::default();
492		io.add_method("say_huge_hello", |_params| Ok(Value::String(huge_response_test_str())));
493		let builder = ServerBuilder::new(io);
494
495		let server = builder.start(path).expect("Server must run with no issues");
496		let (stop_signal, stop_receiver) = oneshot::channel();
497
498		let t = thread::spawn(move || {
499			let result = dummy_request_str(
500				&path,
501				"{\"jsonrpc\": \"2.0\", \"method\": \"say_huge_hello\", \"params\": [], \"id\": 1}",
502			);
503
504			stop_signal.send(result).unwrap();
505		});
506		t.join().unwrap();
507
508		let _ = stop_receiver
509			.map(move |result: String| {
510				assert_eq!(
511					result,
512					huge_response_test_json(),
513					"Response does not exactly match the expected response",
514				);
515				server.close();
516			})
517			.wait();
518	}
519
520	#[test]
521	fn test_session_end() {
522		struct SessionEndMeta {
523			drop_signal: Option<oneshot::Sender<()>>,
524		}
525
526		impl Drop for SessionEndMeta {
527			fn drop(&mut self) {
528				trace!(target: "ipc", "Dropping session meta");
529				self.drop_signal.take().unwrap().send(()).unwrap()
530			}
531		}
532
533		struct SessionEndExtractor {
534			drop_receivers: Arc<Mutex<mpsc::Sender<oneshot::Receiver<()>>>>,
535		}
536
537		impl MetaExtractor<Arc<SessionEndMeta>> for SessionEndExtractor {
538			fn extract(&self, _context: &RequestContext) -> Arc<SessionEndMeta> {
539				let (signal, receiver) = oneshot::channel();
540				self.drop_receivers.lock().try_send(receiver).unwrap();
541				let meta = SessionEndMeta {
542					drop_signal: Some(signal),
543				};
544				Arc::new(meta)
545			}
546		}
547
548		crate::logger::init_log();
549		let path = "/tmp/test-ipc-30009";
550		let (signal, receiver) = mpsc::channel(16);
551		let session_metadata_extractor = SessionEndExtractor {
552			drop_receivers: Arc::new(Mutex::new(signal)),
553		};
554
555		let io = MetaIoHandler::<Arc<SessionEndMeta>>::default();
556		let builder = ServerBuilder::with_meta_extractor(io, session_metadata_extractor);
557		let server = builder.start(path).expect("Server must run with no issues");
558		{
559			let _ = UnixStream::connect(path).wait().expect("Socket should connect");
560		}
561
562		receiver
563			.into_future()
564			.map_err(|_| ())
565			.and_then(|drop_receiver| drop_receiver.0.unwrap().map_err(|_| ()))
566			.wait()
567			.unwrap();
568		server.close();
569	}
570
571	#[test]
572	fn close_handle() {
573		crate::logger::init_log();
574		let path = "/tmp/test-ipc-90000";
575		let server = run(path);
576		let handle = server.close_handle();
577		handle.close();
578		assert!(
579			UnixStream::connect(path).wait().is_err(),
580			"Connection to the closed socket should fail"
581		);
582	}
583
584	#[test]
585	fn close_when_waiting() {
586		crate::logger::init_log();
587		let path = "/tmp/test-ipc-70000";
588		let server = run(path);
589		let close_handle = server.close_handle();
590		let (tx, rx) = oneshot::channel();
591
592		thread::spawn(move || {
593			thread::sleep(time::Duration::from_millis(100));
594			close_handle.close();
595		});
596		thread::spawn(move || {
597			server.wait();
598			tx.send(true).expect("failed to report that the server has stopped");
599		});
600
601		let delay = Delay::new(Instant::now() + Duration::from_millis(500))
602			.map(|_| false)
603			.map_err(|err| panic!("{:?}", err));
604
605		let result_fut = rx.map_err(|_| ()).select(delay).then(move |result| match result {
606			Ok((result, _)) => {
607				assert_eq!(result, true, "Wait timeout exceeded");
608				assert!(
609					UnixStream::connect(path).wait().is_err(),
610					"Connection to the closed socket should fail"
611				);
612				Ok(())
613			}
614			Err(_) => Err(()),
615		});
616
617		tokio::run(result_fut);
618	}
619
620	#[test]
621	fn runs_with_security_attributes() {
622		let path = "/tmp/test-ipc-9001";
623		let io = MetaIoHandler::<Arc<()>>::default();
624		ServerBuilder::with_meta_extractor(io, NoopExtractor)
625			.set_security_attributes(SecurityAttributes::empty())
626			.start(path)
627			.expect("Server must run with no issues");
628	}
629}