bitconch_jsonrpc_tcp_server/
server.rs1use std;
2use std::net::SocketAddr;
3use std::sync::Arc;
4
5use tokio_service::Service as TokioService;
6
7use jsonrpc::{MetaIoHandler, Metadata, Middleware, NoopMiddleware};
8use jsonrpc::futures::{future, Future, Stream, Sink};
9use jsonrpc::futures::sync::{mpsc, oneshot};
10use server_utils::{
11 tokio_codec::Framed,
12 tokio, reactor, codecs,
13};
14
15use dispatch::{Dispatcher, SenderChannels, PeerMessageQueue};
16use meta::{MetaExtractor, RequestContext, NoopExtractor};
17use service::Service;
18
19pub struct ServerBuilder<M: Metadata = (), S: Middleware<M> = NoopMiddleware> {
21 executor: reactor::UninitializedExecutor,
22 handler: Arc<MetaIoHandler<M, S>>,
23 meta_extractor: Arc<MetaExtractor<M>>,
24 channels: Arc<SenderChannels>,
25 incoming_separator: codecs::Separator,
26 outgoing_separator: codecs::Separator,
27}
28
29impl<M: Metadata + Default, S: Middleware<M> + 'static> ServerBuilder<M, S> {
30 pub fn new<T>(handler: T) -> Self where
32 T: Into<MetaIoHandler<M, S>>,
33 {
34 Self::with_meta_extractor(handler, NoopExtractor)
35 }
36}
37
38impl<M: Metadata, S: Middleware<M> + 'static> ServerBuilder<M, S> {
39 pub fn with_meta_extractor<T, E>(handler: T, extractor: E) -> Self where
41 T: Into<MetaIoHandler<M, S>>,
42 E: MetaExtractor<M> + 'static,
43 {
44 ServerBuilder {
45 executor: reactor::UninitializedExecutor::Unspawned,
46 handler: Arc::new(handler.into()),
47 meta_extractor: Arc::new(extractor),
48 channels: Default::default(),
49 incoming_separator: Default::default(),
50 outgoing_separator: Default::default(),
51 }
52 }
53
54 pub fn event_loop_executor(mut self, handle: tokio::runtime::TaskExecutor) -> Self {
56 self.executor = reactor::UninitializedExecutor::Shared(handle);
57 self
58 }
59
60 pub fn session_meta_extractor<T: MetaExtractor<M> + 'static>(mut self, meta_extractor: T) -> Self {
62 self.meta_extractor = Arc::new(meta_extractor);
63 self
64 }
65
66 pub fn request_separators(mut self, incoming: codecs::Separator, outgoing: codecs::Separator) -> Self {
68 self.incoming_separator = incoming;
69 self.outgoing_separator = outgoing;
70 self
71 }
72
73 pub fn start(self, addr: &SocketAddr) -> std::io::Result<Server> {
75 let meta_extractor = self.meta_extractor.clone();
76 let rpc_handler = self.handler.clone();
77 let channels = self.channels.clone();
78 let incoming_separator = self.incoming_separator;
79 let outgoing_separator = self.outgoing_separator;
80 let address = addr.to_owned();
81 let (tx, rx) = std::sync::mpsc::channel();
82 let (stop_tx, stop_rx) = oneshot::channel();
83
84 let executor = self.executor.initialize()?;
85
86 executor.spawn(future::lazy(move || {
87 let start = move || {
88 let listener = tokio::net::TcpListener::bind(&address)?;
89 let connections = listener.incoming();
90
91 let server = connections.for_each(move |socket| {
92 let peer_addr = socket.peer_addr().expect("Unable to determine socket peer address");
93 trace!(target: "tcp", "Accepted incoming connection from {}", &peer_addr);
94 let (sender, receiver) = mpsc::channel(65536);
95
96 let context = RequestContext {
97 peer_addr: peer_addr,
98 sender: sender.clone(),
99 };
100
101 let meta = meta_extractor.extract(&context);
102 let service = Service::new(peer_addr, rpc_handler.clone(), meta);
103 let (writer, reader) = Framed::new(
104 socket,
105 codecs::StreamCodec::new(
106 incoming_separator.clone(),
107 outgoing_separator.clone(),
108 ),
109 ).split();
110
111 let responses = reader.and_then(
112 move |req| service.call(req).then(|response| match response {
113 Err(e) => {
114 warn!(target: "tcp", "Error while processing request: {:?}", e);
115 future::ok(String::new())
116 },
117 Ok(None) => {
118 trace!(target: "tcp", "JSON RPC request produced no response");
119 future::ok(String::new())
120 },
121 Ok(Some(response_data)) => {
122 trace!(target: "tcp", "Sent response: {}", &response_data);
123 future::ok(response_data)
124 }
125 })
126 );
127
128 let peer_message_queue = {
129 let mut channels = channels.lock();
130 channels.insert(peer_addr.clone(), sender.clone());
131
132 PeerMessageQueue::new(
133 responses,
134 receiver,
135 peer_addr.clone(),
136 )
137 };
138
139 let shared_channels = channels.clone();
140 let writer = writer.send_all(peer_message_queue).then(move |_| {
141 trace!(target: "tcp", "Peer {}: service finished", peer_addr);
142 let mut channels = shared_channels.lock();
143 channels.remove(&peer_addr);
144 Ok(())
145 });
146
147 tokio::spawn(writer);
148
149 Ok(())
150 });
151
152 Ok(server)
153 };
154
155 let stop = stop_rx.map_err(|_| std::io::ErrorKind::Interrupted.into());
156 match start() {
157 Ok(server) => {
158 tx.send(Ok(())).expect("Rx is blocking parent thread.");
159 future::Either::A(server.select(stop)
160 .map(|_| ())
161 .map_err(|(e, _)| {
162 error!("Error while executing the server: {:?}", e);
163 }))
164 },
165 Err(e) => {
166 tx.send(Err(e)).expect("Rx is blocking parent thread.");
167 future::Either::B(stop
168 .map_err(|e| {
169 error!("Error while executing the server: {:?}", e);
170 }))
171 },
172 }
173 }));
174
175 let res = rx.recv().expect("Response is always sent before tx is dropped.");
176
177 res.map(|_| Server {
178 executor: Some(executor),
179 stop: Some(stop_tx),
180 })
181 }
182
183 pub fn dispatcher(&self) -> Dispatcher {
185 Dispatcher::new(self.channels.clone())
186 }
187}
188
189pub struct Server {
191 executor: Option<reactor::Executor>,
192 stop: Option<oneshot::Sender<()>>,
193}
194
195impl Server {
196 pub fn close(mut self) {
198 let _ = self.stop.take().map(|sg| sg.send(()));
199 self.executor.take().unwrap().close();
200 }
201
202 pub fn wait(mut self) {
204 self.executor.take().unwrap().wait();
205 }
206}
207
208impl Drop for Server {
209 fn drop(&mut self) {
210 let _ = self.stop.take().map(|sg| sg.send(()));
211 self.executor.take().map(|executor| executor.close());
212 }
213}