1#![allow(deprecated)]
2
3use std;
4use std::sync::Arc;
5
6use crate::jsonrpc::futures::sync::{mpsc, oneshot};
7use crate::jsonrpc::futures::{future, Future, Sink, Stream};
8use crate::jsonrpc::{middleware, FutureResult, MetaIoHandler, Metadata, Middleware};
9use tokio_service::{self, Service as TokioService};
10
11use crate::server_utils::{
12 codecs, reactor, session,
13 tokio::{self, reactor::Handle, runtime::TaskExecutor},
14 tokio_codec::Framed,
15};
16use parking_lot::Mutex;
17
18use crate::meta::{MetaExtractor, NoopExtractor, RequestContext};
19use crate::select_with_weak::SelectWithWeakExt;
20use susy_tokio_ipc::Endpoint;
21pub use susy_tokio_ipc::SecurityAttributes;
22
23pub struct Service<M: Metadata = (), S: Middleware<M> = middleware::Noop> {
25 handler: Arc<MetaIoHandler<M, S>>,
26 meta: M,
27}
28
29impl<M: Metadata, S: Middleware<M>> Service<M, S> {
30 pub fn new(handler: Arc<MetaIoHandler<M, S>>, meta: M) -> Self {
32 Service { handler, meta }
33 }
34}
35
36impl<M: Metadata, S: Middleware<M>> tokio_service::Service for Service<M, S> {
37 type Request = String;
38 type Response = Option<String>;
39
40 type Error = ();
41
42 type Future = FutureResult<S::Future, S::CallFuture>;
43
44 fn call(&self, req: Self::Request) -> Self::Future {
45 trace!(target: "ipc", "Received request: {}", req);
46 self.handler.handle_request(&req, self.meta.clone())
47 }
48}
49
50pub struct ServerBuilder<M: Metadata = (), S: Middleware<M> = middleware::Noop> {
52 handler: Arc<MetaIoHandler<M, S>>,
53 meta_extractor: Arc<MetaExtractor<M>>,
54 session_stats: Option<Arc<session::SessionStats>>,
55 executor: reactor::UninitializedExecutor,
56 incoming_separator: codecs::Separator,
57 outgoing_separator: codecs::Separator,
58 security_attributes: SecurityAttributes,
59 client_buffer_size: usize,
60}
61
62impl<M: Metadata + Default, S: Middleware<M>> ServerBuilder<M, S> {
63 pub fn new<T>(io_handler: T) -> ServerBuilder<M, S>
65 where
66 T: Into<MetaIoHandler<M, S>>,
67 {
68 Self::with_meta_extractor(io_handler, NoopExtractor)
69 }
70}
71
72impl<M: Metadata, S: Middleware<M>> ServerBuilder<M, S> {
73 pub fn with_meta_extractor<T, E>(io_handler: T, extractor: E) -> ServerBuilder<M, S>
75 where
76 T: Into<MetaIoHandler<M, S>>,
77 E: MetaExtractor<M>,
78 {
79 ServerBuilder {
80 handler: Arc::new(io_handler.into()),
81 meta_extractor: Arc::new(extractor),
82 session_stats: None,
83 executor: reactor::UninitializedExecutor::Unspawned,
84 incoming_separator: codecs::Separator::Empty,
85 outgoing_separator: codecs::Separator::default(),
86 security_attributes: SecurityAttributes::empty(),
87 client_buffer_size: 5,
88 }
89 }
90
91 pub fn event_loop_executor(mut self, executor: TaskExecutor) -> Self {
93 self.executor = reactor::UninitializedExecutor::Shared(executor);
94 self
95 }
96
97 pub fn session_meta_extractor<X>(mut self, meta_extractor: X) -> Self
99 where
100 X: MetaExtractor<M>,
101 {
102 self.meta_extractor = Arc::new(meta_extractor);
103 self
104 }
105
106 pub fn session_stats<T: session::SessionStats>(mut self, stats: T) -> Self {
108 self.session_stats = Some(Arc::new(stats));
109 self
110 }
111
112 pub fn request_separators(mut self, incoming: codecs::Separator, outgoing: codecs::Separator) -> Self {
114 self.incoming_separator = incoming;
115 self.outgoing_separator = outgoing;
116 self
117 }
118
119 pub fn set_security_attributes(mut self, attr: SecurityAttributes) -> Self {
121 self.security_attributes = attr;
122 self
123 }
124
125 pub fn set_client_buffer_size(mut self, buffer_size: usize) -> Self {
127 self.client_buffer_size = buffer_size;
128 self
129 }
130
131 pub fn start(self, path: &str) -> std::io::Result<Server> {
133 let executor = self.executor.initialize()?;
134 let rpc_handler = self.handler;
135 let endpoint_addr = path.to_owned();
136 let meta_extractor = self.meta_extractor;
137 let session_stats = self.session_stats;
138 let incoming_separator = self.incoming_separator;
139 let outgoing_separator = self.outgoing_separator;
140 let (stop_signal, stop_receiver) = oneshot::channel();
141 let (start_signal, start_receiver) = oneshot::channel();
142 let (wait_signal, wait_receiver) = oneshot::channel();
143 let security_attributes = self.security_attributes;
144 let client_buffer_size = self.client_buffer_size;
145
146 executor.spawn(future::lazy(move || {
147 let mut endpoint = Endpoint::new(endpoint_addr);
148 endpoint.set_security_attributes(security_attributes);
149
150 if cfg!(unix) {
151 if ::std::fs::remove_file(endpoint.path()).is_ok() {
153 warn!("Removed existing file '{}'.", endpoint.path());
154 }
155 }
156
157 let endpoint_handle = Handle::current();
158 let connections = match endpoint.incoming(&endpoint_handle) {
159 Ok(connections) => connections,
160 Err(e) => {
161 start_signal
162 .send(Err(e))
163 .expect("Cannot fail since receiver never dropped before receiving");
164 return future::Either::A(future::ok(()));
165 }
166 };
167
168 let mut id = 0u64;
169
170 let server = connections.for_each(move |(io_stream, remote_id)| {
171 id = id.wrapping_add(1);
172 let session_id = id;
173 let session_stats = session_stats.clone();
174 trace!(target: "ipc", "Accepted incoming IPC connection: {}", session_id);
175 if let Some(stats) = session_stats.as_ref() {
176 stats.open_session(session_id)
177 }
178
179 let (sender, receiver) = mpsc::channel(16);
180 let meta = meta_extractor.extract(&RequestContext {
181 endpoint_addr: &remote_id,
182 session_id,
183 sender,
184 });
185 let service = Service::new(rpc_handler.clone(), meta);
186 let (writer, reader) = Framed::new(
187 io_stream,
188 codecs::StreamCodec::new(incoming_separator.clone(), outgoing_separator.clone()),
189 )
190 .split();
191 let responses = reader
192 .map(move |req| {
193 service
194 .call(req)
195 .then(|result| match result {
196 Err(_) => future::ok(None),
197 Ok(some_result) => future::ok(some_result),
198 })
199 .map_err(|_: ()| std::io::ErrorKind::Other.into())
200 })
201 .buffer_unordered(client_buffer_size)
202 .filter_map(|x| x)
203 .select_with_weak(receiver.map_err(|e| {
206 warn!(target: "ipc", "Notification error: {:?}", e);
207 std::io::ErrorKind::Other.into()
208 }));
209
210 let writer = writer.send_all(responses).then(move |_| {
211 trace!(target: "ipc", "Peer: service finished");
212 if let Some(stats) = session_stats.as_ref() {
213 stats.close_session(session_id)
214 }
215 Ok(())
216 });
217
218 tokio::spawn(writer);
219
220 Ok(())
221 });
222 start_signal
223 .send(Ok(()))
224 .expect("Cannot fail since receiver never dropped before receiving");
225
226 let stop = stop_receiver.map_err(|_| std::io::ErrorKind::Interrupted.into());
227 future::Either::B(
228 server
229 .select(stop)
230 .map(|_| {
231 let _ = wait_signal.send(());
232 })
233 .map_err(|_| ()),
234 )
235 }));
236
237 let handle = InnerHandles {
238 executor: Some(executor),
239 stop: Some(stop_signal),
240 path: path.to_owned(),
241 };
242
243 match start_receiver.wait().expect("Message should always be sent") {
244 Ok(()) => Ok(Server {
245 handles: Arc::new(Mutex::new(handle)),
246 wait_handle: Some(wait_receiver),
247 }),
248 Err(e) => Err(e),
249 }
250 }
251}
252
253#[derive(Debug)]
255pub struct Server {
256 handles: Arc<Mutex<InnerHandles>>,
257 wait_handle: Option<oneshot::Receiver<()>>,
258}
259
260impl Server {
261 pub fn close(self) {
263 self.handles.lock().close();
264 }
265
266 pub fn close_handle(&self) -> CloseHandle {
268 CloseHandle {
269 inner: self.handles.clone(),
270 }
271 }
272
273 pub fn wait(mut self) {
275 self.wait_handle.take().map(|wait_receiver| wait_receiver.wait());
276 }
277}
278
279#[derive(Debug)]
280struct InnerHandles {
281 executor: Option<reactor::Executor>,
282 stop: Option<oneshot::Sender<()>>,
283 path: String,
284}
285
286impl InnerHandles {
287 pub fn close(&mut self) {
288 let _ = self.stop.take().map(|stop| stop.send(()));
289 if let Some(executor) = self.executor.take() {
290 executor.close()
291 }
292 let _ = ::std::fs::remove_file(&self.path); }
294}
295
296impl Drop for InnerHandles {
297 fn drop(&mut self) {
298 self.close();
299 }
300}
301#[derive(Clone)]
303pub struct CloseHandle {
304 inner: Arc<Mutex<InnerHandles>>,
305}
306
307impl CloseHandle {
308 pub fn close(self) {
310 self.inner.lock().close();
311 }
312}
313
#[cfg(test)]
#[cfg(not(windows))]
mod tests {
    use tokio_uds;

    use self::tokio_uds::UnixStream;
    use super::SecurityAttributes;
    use super::{Server, ServerBuilder};
    use crate::jsonrpc::futures::sync::{mpsc, oneshot};
    use crate::jsonrpc::futures::{future, Future, Sink, Stream};
    use crate::jsonrpc::{MetaIoHandler, Value};
    use crate::meta::{MetaExtractor, NoopExtractor, RequestContext};
    use crate::server_utils::codecs;
    use crate::server_utils::{
        tokio::{self, timer::Delay},
        tokio_codec::Decoder,
    };
    use parking_lot::Mutex;
    use std::sync::Arc;
    use std::thread;
    use std::time;
    use std::time::{Duration, Instant};

    /// Builder for a server exposing a single `say_hello` method that returns
    /// the string `"hello"`.
    fn server_builder() -> ServerBuilder {
        let mut io = MetaIoHandler::<()>::default();
        io.add_method("say_hello", |_params| Ok(Value::String("hello".to_string())));
        ServerBuilder::new(io)
    }

    /// Starts the `say_hello` server on `path`, panicking if it cannot start.
    fn run(path: &str) -> Server {
        server_builder().start(path).expect("Server must run with no issues")
    }

    /// Connects to `path`, sends `data` as one request and returns the first
    /// reply received on the connection.
    fn dummy_request_str(path: &str, data: &str) -> String {
        UnixStream::connect(path)
            .and_then(|stream| {
                codecs::StreamCodec::stream_incoming()
                    .framed(stream)
                    .send(data.to_owned())
                    .and_then(move |stream| stream.into_future().map_err(|(err, _)| err))
                    .and_then(|(reply, _)| future::ok(reply.expect("there should be one reply")))
            })
            .wait()
            .expect("wait for reply")
    }

    #[test]
    fn start() {
        crate::logger::init_log();

        let _server = server_builder()
            .start("/tmp/test-ipc-20000")
            .expect("Server must run with no issues");
    }

    #[test]
    fn connect() {
        crate::logger::init_log();
        let path = "/tmp/test-ipc-30000";
        let _server = run(path);

        UnixStream::connect(path).wait().expect("Socket should connect");
    }

    #[test]
    fn request() {
        crate::logger::init_log();
        let path = "/tmp/test-ipc-40000";
        let server = run(path);
        let (stop_signal, stop_receiver) = oneshot::channel();

        let t = thread::spawn(move || {
            let result = dummy_request_str(
                path,
                "{\"jsonrpc\": \"2.0\", \"method\": \"say_hello\", \"params\": [42, 23], \"id\": 1}",
            );
            stop_signal.send(result).unwrap();
        });
        t.join().unwrap();

        // The client thread has already sent its result, so a failed receive
        // here means the assertion below never ran; fail loudly in that case.
        stop_receiver
            .map(move |result: String| {
                assert_eq!(
                    result, "{\"jsonrpc\":\"2.0\",\"result\":\"hello\",\"id\":1}",
                    "Response does not exactly match the expected response",
                );
                server.close();
            })
            .wait()
            .expect("the client thread should have delivered its response");
    }

    #[test]
    fn req_parallel() {
        crate::logger::init_log();
        let path = "/tmp/test-ipc-45000";
        let server = run(path);
        let (stop_signal, stop_receiver) = mpsc::channel(400);

        let mut handles = Vec::new();
        for _ in 0..4 {
            let mut stop_signal = stop_signal.clone();
            handles.push(thread::spawn(move || {
                for _ in 0..100 {
                    let result = dummy_request_str(
                        path,
                        "{\"jsonrpc\": \"2.0\", \"method\": \"say_hello\", \"params\": [42, 23], \"id\": 1}",
                    );
                    stop_signal.try_send(result).unwrap();
                }
            }));
        }

        for handle in handles {
            handle.join().unwrap();
        }

        // 4 threads x 100 requests = 400 responses. `take(400)` ends the
        // stream even though the original sender is still alive.
        stop_receiver
            .map(|result| {
                assert_eq!(
                    result, "{\"jsonrpc\":\"2.0\",\"result\":\"hello\",\"id\":1}",
                    "Response does not exactly match the expected response",
                );
            })
            .take(400)
            .collect()
            .wait()
            .expect("all responses should have been delivered");
        server.close();
    }

    #[test]
    fn close() {
        crate::logger::init_log();
        let path = "/tmp/test-ipc-50000";
        let server = run(path);
        server.close();

        assert!(
            ::std::fs::metadata(path).is_err(),
            "There should be no socket file left"
        );
        assert!(
            UnixStream::connect(path).wait().is_err(),
            "Connection to the closed socket should fail"
        );
    }

    /// Large payload: `begin_hello` twice, 16384 spaces, then `end_hello`.
    fn huge_response_test_str() -> String {
        let mut result = String::from("begin_hello");
        result.push_str("begin_hello");
        result.push_str(&" ".repeat(16384));
        result.push_str("end_hello");
        result
    }

    /// The JSON-RPC response expected to carry `huge_response_test_str`.
    fn huge_response_test_json() -> String {
        let mut result = String::from("{\"jsonrpc\":\"2.0\",\"result\":\"");
        result.push_str(&huge_response_test_str());
        result.push_str("\",\"id\":1}");

        result
    }

    #[test]
    fn test_huge_response() {
        crate::logger::init_log();
        let path = "/tmp/test-ipc-60000";

        let mut io = MetaIoHandler::<()>::default();
        io.add_method("say_huge_hello", |_params| Ok(Value::String(huge_response_test_str())));
        let builder = ServerBuilder::new(io);

        let server = builder.start(path).expect("Server must run with no issues");
        let (stop_signal, stop_receiver) = oneshot::channel();

        let t = thread::spawn(move || {
            let result = dummy_request_str(
                path,
                "{\"jsonrpc\": \"2.0\", \"method\": \"say_huge_hello\", \"params\": [], \"id\": 1}",
            );

            stop_signal.send(result).unwrap();
        });
        t.join().unwrap();

        // As in `request`: a failed receive would skip the assertion, so it
        // must not be ignored.
        stop_receiver
            .map(move |result: String| {
                assert_eq!(
                    result,
                    huge_response_test_json(),
                    "Response does not exactly match the expected response",
                );
                server.close();
            })
            .wait()
            .expect("the client thread should have delivered its response");
    }

    #[test]
    fn test_session_end() {
        /// Metadata that fires `drop_signal` when it is dropped.
        struct SessionEndMeta {
            drop_signal: Option<oneshot::Sender<()>>,
        }

        impl Drop for SessionEndMeta {
            fn drop(&mut self) {
                trace!(target: "ipc", "Dropping session meta");
                self.drop_signal.take().unwrap().send(()).unwrap()
            }
        }

        /// Extractor that publishes, for every extracted metadata value, the
        /// receiver that resolves when that value is dropped.
        struct SessionEndExtractor {
            drop_receivers: Arc<Mutex<mpsc::Sender<oneshot::Receiver<()>>>>,
        }

        impl MetaExtractor<Arc<SessionEndMeta>> for SessionEndExtractor {
            fn extract(&self, _context: &RequestContext) -> Arc<SessionEndMeta> {
                let (signal, receiver) = oneshot::channel();
                self.drop_receivers.lock().try_send(receiver).unwrap();
                let meta = SessionEndMeta {
                    drop_signal: Some(signal),
                };
                Arc::new(meta)
            }
        }

        crate::logger::init_log();
        let path = "/tmp/test-ipc-30009";
        let (signal, receiver) = mpsc::channel(16);
        let session_metadata_extractor = SessionEndExtractor {
            drop_receivers: Arc::new(Mutex::new(signal)),
        };

        let io = MetaIoHandler::<Arc<SessionEndMeta>>::default();
        let builder = ServerBuilder::with_meta_extractor(io, session_metadata_extractor);
        let server = builder.start(path).expect("Server must run with no issues");

        // Open a connection and close it again straight away.
        drop(UnixStream::connect(path).wait().expect("Socket should connect"));

        // Wait for the first session's drop receiver, then for the drop itself.
        receiver
            .into_future()
            .map_err(|_| ())
            .and_then(|drop_receiver| drop_receiver.0.unwrap().map_err(|_| ()))
            .wait()
            .unwrap();
        server.close();
    }

    #[test]
    fn close_handle() {
        crate::logger::init_log();
        let path = "/tmp/test-ipc-90000";
        let server = run(path);
        let handle = server.close_handle();
        handle.close();
        assert!(
            UnixStream::connect(path).wait().is_err(),
            "Connection to the closed socket should fail"
        );
    }

    #[test]
    fn close_when_waiting() {
        crate::logger::init_log();
        let path = "/tmp/test-ipc-70000";
        let server = run(path);
        let close_handle = server.close_handle();
        let (tx, rx) = oneshot::channel();

        // One thread closes the server after a short pause...
        thread::spawn(move || {
            thread::sleep(time::Duration::from_millis(100));
            close_handle.close();
        });
        // ...while another blocks in `wait` and reports once it returns.
        thread::spawn(move || {
            server.wait();
            tx.send(true).expect("failed to report that the server has stopped");
        });

        // Resolves to `false` if `wait` has not returned within 500 ms.
        let delay = Delay::new(Instant::now() + Duration::from_millis(500))
            .map(|_| false)
            .map_err(|err| panic!("{:?}", err));

        let result_fut = rx.map_err(|_| ()).select(delay).then(move |result| match result {
            Ok((result, _)) => {
                assert!(result, "Wait timeout exceeded");
                assert!(
                    UnixStream::connect(path).wait().is_err(),
                    "Connection to the closed socket should fail"
                );
                Ok(())
            }
            Err(_) => Err(()),
        });

        tokio::run(result_fut);
    }

    #[test]
    fn runs_with_security_attributes() {
        let path = "/tmp/test-ipc-9001";
        let io = MetaIoHandler::<Arc<()>>::default();
        ServerBuilder::with_meta_extractor(io, NoopExtractor)
            .set_security_attributes(SecurityAttributes::empty())
            .start(path)
            .expect("Server must run with no issues");
    }
}