tetsy_jsonrpc_tcp_server/
server.rs1use std;
2use std::net::SocketAddr;
3use std::sync::Arc;
4
5use tokio_service::Service as TokioService;
6
7use crate::jsonrpc::futures::sync::{mpsc, oneshot};
8use crate::jsonrpc::futures::{future, Future, Sink, Stream};
9use crate::jsonrpc::{middleware, MetaIoHandler, Metadata, Middleware};
10use crate::server_utils::{codecs, reactor, tokio, tokio_codec::Framed, SuspendableStream};
11
12use crate::dispatch::{Dispatcher, PeerMessageQueue, SenderChannels};
13use crate::meta::{MetaExtractor, NoopExtractor, RequestContext};
14use crate::service::Service;
15
16pub struct ServerBuilder<M: Metadata = (), S: Middleware<M> = middleware::Noop> {
18 executor: reactor::UninitializedExecutor,
19 handler: Arc<MetaIoHandler<M, S>>,
20 meta_extractor: Arc<dyn MetaExtractor<M>>,
21 channels: Arc<SenderChannels>,
22 incoming_separator: codecs::Separator,
23 outgoing_separator: codecs::Separator,
24}
25
26impl<M: Metadata + Default, S: Middleware<M> + 'static> ServerBuilder<M, S> {
27 pub fn new<T>(handler: T) -> Self
29 where
30 T: Into<MetaIoHandler<M, S>>,
31 {
32 Self::with_meta_extractor(handler, NoopExtractor)
33 }
34}
35
36impl<M: Metadata, S: Middleware<M> + 'static> ServerBuilder<M, S> {
37 pub fn with_meta_extractor<T, E>(handler: T, extractor: E) -> Self
39 where
40 T: Into<MetaIoHandler<M, S>>,
41 E: MetaExtractor<M> + 'static,
42 {
43 ServerBuilder {
44 executor: reactor::UninitializedExecutor::Unspawned,
45 handler: Arc::new(handler.into()),
46 meta_extractor: Arc::new(extractor),
47 channels: Default::default(),
48 incoming_separator: Default::default(),
49 outgoing_separator: Default::default(),
50 }
51 }
52
53 pub fn event_loop_executor(mut self, handle: tokio::runtime::TaskExecutor) -> Self {
55 self.executor = reactor::UninitializedExecutor::Shared(handle);
56 self
57 }
58
59 pub fn session_meta_extractor<T: MetaExtractor<M> + 'static>(mut self, meta_extractor: T) -> Self {
61 self.meta_extractor = Arc::new(meta_extractor);
62 self
63 }
64
65 pub fn request_separators(mut self, incoming: codecs::Separator, outgoing: codecs::Separator) -> Self {
67 self.incoming_separator = incoming;
68 self.outgoing_separator = outgoing;
69 self
70 }
71
72 pub fn start(self, addr: &SocketAddr) -> std::io::Result<Server> {
74 let meta_extractor = self.meta_extractor.clone();
75 let rpc_handler = self.handler.clone();
76 let channels = self.channels.clone();
77 let incoming_separator = self.incoming_separator;
78 let outgoing_separator = self.outgoing_separator;
79 let address = addr.to_owned();
80 let (tx, rx) = std::sync::mpsc::channel();
81 let (stop_tx, stop_rx) = oneshot::channel();
82
83 let executor = self.executor.initialize()?;
84
85 executor.spawn(future::lazy(move || {
86 let start = move || {
87 let listener = tokio::net::TcpListener::bind(&address)?;
88 let connections = SuspendableStream::new(listener.incoming());
89
90 let server = connections.map(move |socket| {
91 let peer_addr = match socket.peer_addr() {
92 Ok(addr) => addr,
93 Err(e) => {
94 warn!(target: "tcp", "Unable to determine socket peer address, ignoring connection {}", e);
95 return future::Either::A(future::ok(()));
96 }
97 };
98 trace!(target: "tcp", "Accepted incoming connection from {}", &peer_addr);
99 let (sender, receiver) = mpsc::channel(65536);
100
101 let context = RequestContext {
102 peer_addr,
103 sender: sender.clone(),
104 };
105
106 let meta = meta_extractor.extract(&context);
107 let service = Service::new(peer_addr, rpc_handler.clone(), meta);
108 let (writer, reader) = Framed::new(
109 socket,
110 codecs::StreamCodec::new(incoming_separator.clone(), outgoing_separator.clone()),
111 )
112 .split();
113
114 let responses = reader.and_then(move |req| {
115 service.call(req).then(|response| match response {
116 Err(e) => {
117 warn!(target: "tcp", "Error while processing request: {:?}", e);
118 future::ok(String::new())
119 }
120 Ok(None) => {
121 trace!(target: "tcp", "JSON RPC request produced no response");
122 future::ok(String::new())
123 }
124 Ok(Some(response_data)) => {
125 trace!(target: "tcp", "Sent response: {}", &response_data);
126 future::ok(response_data)
127 }
128 })
129 });
130
131 let peer_message_queue = {
132 let mut channels = channels.lock();
133 channels.insert(peer_addr, sender.clone());
134
135 PeerMessageQueue::new(responses, receiver, peer_addr)
136 };
137
138 let shared_channels = channels.clone();
139 let writer = writer.send_all(peer_message_queue).then(move |_| {
140 trace!(target: "tcp", "Peer {}: service finished", peer_addr);
141 let mut channels = shared_channels.lock();
142 channels.remove(&peer_addr);
143 Ok(())
144 });
145
146 future::Either::B(writer)
147 });
148
149 Ok(server)
150 };
151
152 let stop = stop_rx.map_err(|_| ());
153 match start() {
154 Ok(server) => {
155 tx.send(Ok(())).expect("Rx is blocking parent thread.");
156 future::Either::A(
157 server
158 .buffer_unordered(1024)
159 .for_each(|_| Ok(()))
160 .select(stop)
161 .map(|_| ())
162 .map_err(|(e, _)| {
163 error!("Error while executing the server: {:?}", e);
164 }),
165 )
166 }
167 Err(e) => {
168 tx.send(Err(e)).expect("Rx is blocking parent thread.");
169 future::Either::B(stop.map_err(|e| {
170 error!("Error while executing the server: {:?}", e);
171 }))
172 }
173 }
174 }));
175
176 let res = rx.recv().expect("Response is always sent before tx is dropped.");
177
178 res.map(|_| Server {
179 executor: Some(executor),
180 stop: Some(stop_tx),
181 })
182 }
183
184 pub fn dispatcher(&self) -> Dispatcher {
186 Dispatcher::new(self.channels.clone())
187 }
188}
189
190pub struct Server {
192 executor: Option<reactor::Executor>,
193 stop: Option<oneshot::Sender<()>>,
194}
195
196impl Server {
197 pub fn close(mut self) {
199 let _ = self.stop.take().map(|sg| sg.send(()));
200 self.executor.take().unwrap().close();
201 }
202
203 pub fn wait(mut self) {
205 self.executor.take().unwrap().wait();
206 }
207}
208
209impl Drop for Server {
210 fn drop(&mut self) {
211 let _ = self.stop.take().map(|sg| sg.send(()));
212 if let Some(executor) = self.executor.take() {
213 executor.close()
214 }
215 }
216}
217
218#[cfg(test)]
219mod tests {
220 use super::*;
221
222 #[test]
223 fn server_is_send_and_sync() {
224 fn is_send_and_sync<T: Send + Sync>() {}
225
226 is_send_and_sync::<Server>();
227 }
228}