rs_jsonrpc_tcp_server/
server.rs1use std;
2use std::net::SocketAddr;
3use std::sync::Arc;
4
5use tokio_service::Service as TokioService;
6
7use jsonrpc::{MetaIoHandler, Metadata, Middleware, NoopMiddleware};
8use jsonrpc::futures::{future, Future, Stream, Sink};
9use jsonrpc::futures::sync::{mpsc, oneshot};
10use server_utils::{reactor, tokio_core, codecs};
11use server_utils::tokio_io::AsyncRead;
12
13use dispatch::{Dispatcher, SenderChannels, PeerMessageQueue};
14use meta::{MetaExtractor, RequestContext, NoopExtractor};
15use service::Service;
16
17pub struct ServerBuilder<M: Metadata = (), S: Middleware<M> = NoopMiddleware> {
19 remote: reactor::UninitializedRemote,
20 handler: Arc<MetaIoHandler<M, S>>,
21 meta_extractor: Arc<MetaExtractor<M>>,
22 channels: Arc<SenderChannels>,
23 incoming_separator: codecs::Separator,
24 outgoing_separator: codecs::Separator,
25}
26
27impl<M: Metadata, S: Middleware<M> + 'static> ServerBuilder<M, S> {
28 pub fn new<T>(handler: T) -> Self where
30 T: Into<MetaIoHandler<M, S>>
31 {
32 ServerBuilder {
33 remote: reactor::UninitializedRemote::Unspawned,
34 handler: Arc::new(handler.into()),
35 meta_extractor: Arc::new(NoopExtractor),
36 channels: Default::default(),
37 incoming_separator: Default::default(),
38 outgoing_separator: Default::default(),
39 }
40 }
41
42 pub fn event_loop_remote(mut self, remote: tokio_core::reactor::Remote) -> Self {
44 self.remote = reactor::UninitializedRemote::Shared(remote);
45 self
46 }
47
48 pub fn session_meta_extractor<T: MetaExtractor<M> + 'static>(mut self, meta_extractor: T) -> Self {
50 self.meta_extractor = Arc::new(meta_extractor);
51 self
52 }
53
54 pub fn request_separators(mut self, incoming: codecs::Separator, outgoing: codecs::Separator) -> Self {
56 self.incoming_separator = incoming;
57 self.outgoing_separator = outgoing;
58 self
59 }
60
61 pub fn start(self, addr: &SocketAddr) -> std::io::Result<Server> {
63 let meta_extractor = self.meta_extractor.clone();
64 let rpc_handler = self.handler.clone();
65 let channels = self.channels.clone();
66 let incoming_separator = self.incoming_separator;
67 let outgoing_separator = self.outgoing_separator;
68 let address = addr.to_owned();
69 let (tx, rx) = std::sync::mpsc::channel();
70 let (signal, stop) = oneshot::channel();
71
72 let remote = self.remote.initialize()?;
73
74 remote.remote().spawn(move |handle| {
75 let start = move || {
76 let listener = tokio_core::net::TcpListener::bind(&address, handle)?;
77 let connections = listener.incoming();
78 let remote = handle.remote().clone();
79 let server = connections.for_each(move |(socket, peer_addr)| {
80 trace!(target: "tcp", "Accepted incoming connection from {}", &peer_addr);
81 let (sender, receiver) = mpsc::channel(65536);
82
83 let context = RequestContext {
84 peer_addr: peer_addr,
85 sender: sender.clone(),
86 };
87
88 let meta = meta_extractor.extract(&context);
89 let service = Service::new(peer_addr, rpc_handler.clone(), meta);
90 let (writer, reader) = socket.framed(
91 codecs::StreamCodec::new(
92 incoming_separator.clone(),
93 outgoing_separator.clone(),
94 )
95 ).split();
96
97 let responses = reader.and_then(
98 move |req| service.call(req).then(|response| match response {
99 Err(e) => {
100 warn!(target: "tcp", "Error while processing request: {:?}", e);
101 future::ok(String::new())
102 },
103 Ok(None) => {
104 trace!(target: "tcp", "JSON RPC request produced no response");
105 future::ok(String::new())
106 },
107 Ok(Some(response_data)) => {
108 trace!(target: "tcp", "Sent response: {}", &response_data);
109 future::ok(response_data)
110 }
111 })
112 );
113
114 let peer_message_queue = {
115 let mut channels = channels.lock();
116 channels.insert(peer_addr.clone(), sender.clone());
117
118 PeerMessageQueue::new(
119 responses,
120 receiver,
121 peer_addr.clone(),
122 )
123 };
124
125 let shared_channels = channels.clone();
126 let writer = writer.send_all(peer_message_queue).then(move |_| {
127 trace!(target: "tcp", "Peer {}: service finished", peer_addr);
128 let mut channels = shared_channels.lock();
129 channels.remove(&peer_addr);
130 Ok(())
131 });
132
133 remote.spawn(|_| writer);
134
135 Ok(())
136 });
137
138 Ok(server)
139 };
140
141 let stop = stop.map_err(|_| std::io::ErrorKind::Interrupted.into());
142 match start() {
143 Ok(server) => {
144 tx.send(Ok(())).expect("Rx is blocking parent thread.");
145 future::Either::A(server.select(stop)
146 .map(|_| ())
147 .map_err(|(e, _)| {
148 error!("Error while executing the server: {:?}", e);
149 }))
150 },
151 Err(e) => {
152 tx.send(Err(e)).expect("Rx is blocking parent thread.");
153 future::Either::B(stop
154 .map_err(|e| {
155 error!("Error while executing the server: {:?}", e);
156 }))
157 },
158 }
159 });
160
161 let res = rx.recv().expect("Response is always sent before tx is dropped.");
162
163 res.map(|_| Server {
164 remote: Some(remote),
165 stop: Some(signal),
166 })
167 }
168
169 pub fn dispatcher(&self) -> Dispatcher {
171 Dispatcher::new(self.channels.clone())
172 }
173}
174
175pub struct Server {
177 remote: Option<reactor::Remote>,
178 stop: Option<oneshot::Sender<()>>,
179}
180
181impl Server {
182 pub fn close(mut self) {
184 let _ = self.stop.take().map(|sg| sg.send(()));
185 self.remote.take().unwrap().close();
186 }
187
188 pub fn wait(mut self) {
190 self.remote.take().unwrap().wait();
191 }
192}
193
194impl Drop for Server {
195 fn drop(&mut self) {
196 let _ = self.stop.take().map(|sg| sg.send(()));
197 self.remote.take().map(|remote| remote.close());
198 }
199}