1#[cfg(feature = "with_actix")]
2use actix::System;
3use jsonrpc_core::{Metadata, RpcMethodSimple};
4use jsonrpc_pubsub::{PubSubHandler, SubscribeRpcMethod, UnsubscribeRpcMethod};
5use jsonrpc_ws_server::tokio;
6
7use std::{
8 fmt,
9 sync::{Arc, Mutex},
10};
11
12use crate::{
13 handler::{Handler, Session},
14 transports::{Transport, TransportError},
15};
16
17pub type WittyMonoServer = SingleTransportServer<PubSubHandler<Session>>;
19pub type WittyMultiServer = MultipleTransportsServer<PubSubHandler<Session>>;
21
22#[derive(Debug)]
24pub enum ServerError {
25 Transport(TransportError),
27}
28
29impl std::error::Error for ServerError {}
30
31impl fmt::Display for ServerError {
32 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
33 match self {
34 ServerError::Transport(error) => write!(f, "{}", error),
35 }
36 }
37}
38
39impl From<TransportError> for ServerError {
40 fn from(value: TransportError) -> Self {
41 Self::Transport(value)
42 }
43}
44
45pub trait Server<H>
47where
48 H: Handler,
49{
50 type Error;
53
54 fn start(&mut self) -> Result<(), Self::Error>;
59
60 fn stop(&mut self) -> Result<(), Self::Error>;
62
63 fn add_method<F>(&mut self, name: &str, method: F)
65 where
66 F: RpcMethodSimple;
67
68 fn add_subscription<F, G>(
70 &mut self,
71 notification: &str,
72 subscribe: (&str, F),
73 unsubscribe: (&str, G),
74 ) where
75 F: SubscribeRpcMethod<H::Metadata>,
76 G: UnsubscribeRpcMethod<H::Metadata>;
77
78 fn describe_api(&self) -> Vec<String>;
80}
81
82#[cfg(feature = "with_actix")]
87pub trait ActixServer<H>: Server<H>
88where
89 H: Handler,
90{
91 fn add_actix_method<F>(&mut self, system: &Option<actix::System>, name: &str, method: F)
94 where
95 F: RpcMethodSimple;
96
97 fn add_actix_subscription<F, G>(
100 &mut self,
101 system: &Option<actix::System>,
102 notification: &str,
103 subscribe: (&str, Arc<F>),
104 unsubscribe: (&str, Arc<G>),
105 ) where
106 F: SubscribeRpcMethod<H::Metadata>,
107 G: UnsubscribeRpcMethod<
108 H::Metadata,
109 Out = jsonrpc_core::BoxFuture<jsonrpc_core::Result<jsonrpc_core::Value>>,
110 >;
111}
112
113#[derive(Default)]
117pub struct MultipleTransportsServer<H>
118where
119 H: Handler,
120{
121 transports: Vec<Box<dyn Transport<H>>>,
122 io_handler: Arc<Mutex<H>>,
124 runtime: Option<tokio::runtime::Handle>,
125}
126
127impl<H> MultipleTransportsServer<H>
128where
129 H: Handler,
130{
131 pub fn add_transport<T>(&mut self, mut transport: T)
133 where
134 T: Transport<H> + 'static,
135 {
136 transport
137 .set_handler(self.io_handler.clone(), self.runtime.clone())
138 .ok();
139 self.transports.push(Box::new(transport));
140 }
141
142 pub fn handle_request_sync(&self, request: &str, meta: H::Metadata) -> Option<String> {
145 self.io_handler
146 .lock()
147 .unwrap()
148 .handle_request_sync(request, meta)
149 }
150
151 fn on_every_transport<'a, F, O>(&mut self, mut operation: F) -> Result<Vec<O>, TransportError>
153 where
154 F: FnMut(&mut (dyn Transport<H> + 'a)) -> Result<O, TransportError>,
155 {
156 self.transports
157 .iter_mut()
158 .map(|transport| operation(&mut **transport))
159 .collect::<Result<Vec<_>, _>>()
160 }
161
162 pub fn new() -> Self {
164 Self {
165 transports: vec![],
166 io_handler: Arc::new(Mutex::new(H::new())),
167 runtime: None,
168 }
169 }
170
171 fn reset_all_transports(&mut self) -> Result<(), TransportError> {
176 let handler = self.io_handler.clone();
177 let runtime = self.runtime.clone();
178
179 self.on_every_transport(|transport| {
180 if transport.requires_reset() {
181 let running = transport.running();
182 if running {
183 transport.stop()?;
184 }
185 transport.set_handler(handler.clone(), runtime.clone())?;
186 if running {
187 transport.start()?;
188 }
189 }
190 Ok(())
191 })?;
192
193 Ok(())
194 }
195
196 pub fn with_runtime(mut self, runtime: tokio::runtime::Handle) -> Self {
198 self.runtime = Some(runtime);
199
200 self
201 }
202}
203
204impl<H> Server<H> for MultipleTransportsServer<H>
205where
206 H: Handler,
207 H::Metadata: Metadata,
208{
209 type Error = ServerError;
210
211 fn start(&mut self) -> Result<(), Self::Error> {
212 let _ = &self.on_every_transport(|transport| transport.start())?;
213
214 Ok(())
215 }
216
217 fn stop(&mut self) -> Result<(), Self::Error> {
218 let _ = &self.on_every_transport(Transport::stop)?;
219
220 Ok(())
221 }
222
223 fn add_method<F>(&mut self, name: &str, method: F)
224 where
225 F: RpcMethodSimple,
226 {
227 (*self.io_handler.lock().unwrap()).add_method(name, method);
228 self.reset_all_transports().ok();
229 }
230
231 fn add_subscription<F, G>(
232 &mut self,
233 notification: &str,
234 subscribe: (&str, F),
235 unsubscribe: (&str, G),
236 ) where
237 F: SubscribeRpcMethod<H::Metadata>,
238 G: UnsubscribeRpcMethod<H::Metadata>,
239 {
240 (*self.io_handler.lock().unwrap()).add_subscription(notification, subscribe, unsubscribe);
241 self.reset_all_transports().ok();
242 }
243
244 fn describe_api(&self) -> Vec<String> {
245 self.io_handler.lock().unwrap().describe_api()
246 }
247}
248
249#[cfg(feature = "with_actix")]
250impl<H> ActixServer<H> for MultipleTransportsServer<H>
251where
252 H: Handler,
253{
254 fn add_actix_method<F>(&mut self, system: &Option<actix::System>, name: &str, method: F)
255 where
256 F: RpcMethodSimple,
257 {
258 let system = system.clone();
259
260 self.add_method(name, move |params| {
261 let system = system.clone();
262 let execution = method.call(params);
263 let (tx, rx) = futures::channel::oneshot::channel();
264
265 Box::pin(async move {
266 let fut = async move {
268 let response = execution.await;
269 tx.send(response)
270 .expect("Should be able to send result back to spawner");
271 };
272
273 if let Some(system) = system.clone() {
275 system.arbiter().spawn(fut);
276 } else {
277 fut.await;
278 }
279
280 rx.await
281 .expect("Should be able to await the oneshot channel")
282 })
283 })
284 }
285
286 fn add_actix_subscription<F, G>(
287 &mut self,
288 system: &Option<System>,
289 notification: &str,
290 subscribe: (&str, Arc<F>),
291 unsubscribe: (&str, Arc<G>),
292 ) where
293 F: SubscribeRpcMethod<H::Metadata>,
294 G: UnsubscribeRpcMethod<
295 H::Metadata,
296 Out = jsonrpc_core::BoxFuture<jsonrpc_core::Result<jsonrpc_core::Value>>,
297 >,
298 {
299 let subscribe_system = system.clone();
300 let unsubscribe_system = system.clone();
301 let (subscribe_name, subscribe_method) = subscribe;
302 let (unsubscribe_name, unsubscribe_method) = unsubscribe;
303
304 self.add_subscription(
305 notification,
306 (subscribe_name, move |params, meta, subscriber| {
307 let method = subscribe_method.clone();
308
309 if let Some(system) = subscribe_system.clone() {
311 system.arbiter().spawn(async move {
312 method.call(params, meta, subscriber);
313 });
314 } else {
315 method.call(params, meta, subscriber);
316 }
317 }),
318 (unsubscribe_name, move |id, meta| {
319 let system = unsubscribe_system.clone();
320 let method = unsubscribe_method.clone();
321 let execution = method.call(id, meta);
322 let (tx, rx) = futures::channel::oneshot::channel();
323
324 Box::pin(async move {
325 let fut = async move {
327 let response = execution.await;
328 tx.send(response)
329 .expect("Should be able to send result back to spawner");
330 };
331
332 if let Some(system) = system.clone() {
334 system.arbiter().spawn(fut);
335 } else {
336 fut.await;
337 }
338
339 rx.await
340 .expect("Should be able to await the oneshot channel")
341 })
342 }),
343 )
344 }
345}
346
347pub struct SingleTransportServer<H>
349where
350 H: Handler,
351{
352 inner: MultipleTransportsServer<H>,
353}
354
355impl<H> SingleTransportServer<H>
356where
357 H: Handler,
358{
359 pub fn from_transport<T>(transport: T) -> Self
361 where
362 T: Transport<H> + 'static,
363 {
364 let mut inner = MultipleTransportsServer::new();
365 inner.add_transport(transport);
366
367 Self { inner }
368 }
369
370 pub fn handle_request_sync(&self, request: &str, meta: H::Metadata) -> Option<String> {
372 self.inner.handle_request_sync(request, meta)
373 }
374
375 pub fn new<T>() -> Self
377 where
378 T: Transport<H> + 'static,
379 {
380 let inner = MultipleTransportsServer::new();
381
382 Self { inner }
383 }
384
385 pub fn with_runtime(mut self, runtime: tokio::runtime::Handle) -> Self {
387 self.inner = self.inner.with_runtime(runtime);
388
389 self
390 }
391}
392
393impl<H> Server<H> for SingleTransportServer<H>
394where
395 H: Handler,
396{
397 type Error = ServerError;
398
399 fn start(&mut self) -> Result<(), Self::Error> {
400 Server::start(&mut self.inner)
401 }
402
403 fn stop(&mut self) -> Result<(), Self::Error> {
404 Server::stop(&mut self.inner)
405 }
406
407 fn add_method<F>(&mut self, name: &str, method: F)
408 where
409 F: RpcMethodSimple,
410 {
411 Server::add_method(&mut self.inner, name, method)
412 }
413
414 fn add_subscription<F, G>(
415 &mut self,
416 notification: &str,
417 subscribe: (&str, F),
418 unsubscribe: (&str, G),
419 ) where
420 F: SubscribeRpcMethod<H::Metadata>,
421 G: UnsubscribeRpcMethod<H::Metadata>,
422 {
423 Server::add_subscription(&mut self.inner, notification, subscribe, unsubscribe)
424 }
425
426 fn describe_api(&self) -> Vec<String> {
427 Server::describe_api(&self.inner)
428 }
429}
430
431#[cfg(feature = "with_actix")]
432impl<H> ActixServer<H> for SingleTransportServer<H>
433where
434 H: Handler,
435{
436 fn add_actix_method<F>(&mut self, system: &Option<System>, name: &str, method: F)
437 where
438 F: RpcMethodSimple,
439 {
440 ActixServer::add_actix_method(&mut self.inner, system, name, method)
441 }
442
443 fn add_actix_subscription<F, G>(
444 &mut self,
445 system: &Option<System>,
446 notification: &str,
447 subscribe: (&str, Arc<F>),
448 unsubscribe: (&str, Arc<G>),
449 ) where
450 F: SubscribeRpcMethod<H::Metadata>,
451 G: UnsubscribeRpcMethod<
452 H::Metadata,
453 Out = jsonrpc_core::BoxFuture<jsonrpc_core::Result<jsonrpc_core::Value>>,
454 >,
455 {
456 ActixServer::add_actix_subscription::<F, G>(
457 &mut self.inner,
458 system,
459 notification,
460 subscribe,
461 unsubscribe,
462 )
463 }
464}