witty_jsonrpc/
server.rs

1#[cfg(feature = "with_actix")]
2use actix::System;
3use jsonrpc_core::{Metadata, RpcMethodSimple};
4use jsonrpc_pubsub::{PubSubHandler, SubscribeRpcMethod, UnsubscribeRpcMethod};
5use jsonrpc_ws_server::tokio;
6
7use std::{
8    fmt,
9    sync::{Arc, Mutex},
10};
11
12use crate::{
13    handler::{Handler, Session},
14    transports::{Transport, TransportError},
15};
16
17/// A convenient type alias for a single transport server that supports PubSub.
18pub type WittyMonoServer = SingleTransportServer<PubSubHandler<Session>>;
19/// A convenient type alias for a multiple transports server that supports PubSub.
20pub type WittyMultiServer = MultipleTransportsServer<PubSubHandler<Session>>;
21
22/// Enumerates all the different errors that a `Server` can get into.
23#[derive(Debug)]
24pub enum ServerError {
25    /// An error that happened in one of the underlaying transports
26    Transport(TransportError),
27}
28
29impl std::error::Error for ServerError {}
30
31impl fmt::Display for ServerError {
32    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
33        match self {
34            ServerError::Transport(error) => write!(f, "{}", error),
35        }
36    }
37}
38
39impl From<TransportError> for ServerError {
40    fn from(value: TransportError) -> Self {
41        Self::Transport(value)
42    }
43}
44
45/// Trait defining a JSON-RPC server.
46pub trait Server<H>
47where
48    H: Handler,
49{
50    /// The type to use as the error type within the `Result`s used by an implementation of this
51    /// trait.
52    type Error;
53
54    /// Start the server.
55    ///
56    /// This is expexted to be highly side effected, i.e. this is where sockets and listeners are
57    /// started.
58    fn start(&mut self) -> Result<(), Self::Error>;
59
60    /// Stop the server.
61    fn stop(&mut self) -> Result<(), Self::Error>;
62
63    /// Add a JSON-RPC method to the server.
64    fn add_method<F>(&mut self, name: &str, method: F)
65    where
66        F: RpcMethodSimple;
67
68    /// Add a JSON-RPC subscription so the server.
69    fn add_subscription<F, G>(
70        &mut self,
71        notification: &str,
72        subscribe: (&str, F),
73        unsubscribe: (&str, G),
74    ) where
75        F: SubscribeRpcMethod<H::Metadata>,
76        G: UnsubscribeRpcMethod<H::Metadata>;
77
78    /// Get a list of all the supported JSON-RPC methods.
79    fn describe_api(&self) -> Vec<String>;
80}
81
82/// A little extension of `Server` that allows seamless compatibility with the Actix framework.
83///
84/// This trait is only conditionally compiled. To use it, make sure to enable the `with_actix`
85/// feature in `Cargo.toml`.
86#[cfg(feature = "with_actix")]
87pub trait ActixServer<H>: Server<H>
88where
89    H: Handler,
90{
91    /// Add a JSON-RPC method that when executed will be spawned into an Actix `arbiter` if a
92    /// `system` is provided.
93    fn add_actix_method<F>(&mut self, system: &Option<actix::System>, name: &str, method: F)
94    where
95        F: RpcMethodSimple;
96
97    /// Add a JSON-RPC subscription that when executed will be spawned into an Actix `arbiter` if a
98    /// `system` is provided.
99    fn add_actix_subscription<F, G>(
100        &mut self,
101        system: &Option<actix::System>,
102        notification: &str,
103        subscribe: (&str, Arc<F>),
104        unsubscribe: (&str, Arc<G>),
105    ) where
106        F: SubscribeRpcMethod<H::Metadata>,
107        G: UnsubscribeRpcMethod<
108            H::Metadata,
109            Out = jsonrpc_core::BoxFuture<jsonrpc_core::Result<jsonrpc_core::Value>>,
110        >;
111}
112
113/// A JSON-RPC server that supports using multiple transports at once.
114///
115/// All the transports share the same underlying IO handler.
116#[derive(Default)]
117pub struct MultipleTransportsServer<H>
118where
119    H: Handler,
120{
121    transports: Vec<Box<dyn Transport<H>>>,
122    // TODO: Change Mutex for RwLock
123    io_handler: Arc<Mutex<H>>,
124    runtime: Option<tokio::runtime::Handle>,
125}
126
127impl<H> MultipleTransportsServer<H>
128where
129    H: Handler,
130{
131    /// Add a transport to the server.
132    pub fn add_transport<T>(&mut self, mut transport: T)
133    where
134        T: Transport<H> + 'static,
135    {
136        transport
137            .set_handler(self.io_handler.clone(), self.runtime.clone())
138            .ok();
139        self.transports.push(Box::new(transport));
140    }
141
142    /// Programmatically trigger the handling of a JSON-RPC message inside the IO handler that the
143    /// server wraps.
144    pub fn handle_request_sync(&self, request: &str, meta: H::Metadata) -> Option<String> {
145        self.io_handler
146            .lock()
147            .unwrap()
148            .handle_request_sync(request, meta)
149    }
150
151    /// Apply the same closure on every single transport added to this server.
152    fn on_every_transport<'a, F, O>(&mut self, mut operation: F) -> Result<Vec<O>, TransportError>
153    where
154        F: FnMut(&mut (dyn Transport<H> + 'a)) -> Result<O, TransportError>,
155    {
156        self.transports
157            .iter_mut()
158            .map(|transport| operation(&mut **transport))
159            .collect::<Result<Vec<_>, _>>()
160    }
161
162    /// Create a new server with everything set to its defaults.
163    pub fn new() -> Self {
164        Self {
165            transports: vec![],
166            io_handler: Arc::new(Mutex::new(H::new())),
167            runtime: None,
168        }
169    }
170
171    /// Stop, reconfigure and re-start all the transports added to this server.
172    ///
173    /// This is especially needed for transports that use a external server builder and therefore
174    /// cannot benefit from the `Arc` around the IO handler.
175    fn reset_all_transports(&mut self) -> Result<(), TransportError> {
176        let handler = self.io_handler.clone();
177        let runtime = self.runtime.clone();
178
179        self.on_every_transport(|transport| {
180            if transport.requires_reset() {
181                let running = transport.running();
182                if running {
183                    transport.stop()?;
184                }
185                transport.set_handler(handler.clone(), runtime.clone())?;
186                if running {
187                    transport.start()?;
188                }
189            }
190            Ok(())
191        })?;
192
193        Ok(())
194    }
195
196    /// Attach a tokio runtime to the server.
197    pub fn with_runtime(mut self, runtime: tokio::runtime::Handle) -> Self {
198        self.runtime = Some(runtime);
199
200        self
201    }
202}
203
204impl<H> Server<H> for MultipleTransportsServer<H>
205where
206    H: Handler,
207    H::Metadata: Metadata,
208{
209    type Error = ServerError;
210
211    fn start(&mut self) -> Result<(), Self::Error> {
212        let _ = &self.on_every_transport(|transport| transport.start())?;
213
214        Ok(())
215    }
216
217    fn stop(&mut self) -> Result<(), Self::Error> {
218        let _ = &self.on_every_transport(Transport::stop)?;
219
220        Ok(())
221    }
222
223    fn add_method<F>(&mut self, name: &str, method: F)
224    where
225        F: RpcMethodSimple,
226    {
227        (*self.io_handler.lock().unwrap()).add_method(name, method);
228        self.reset_all_transports().ok();
229    }
230
231    fn add_subscription<F, G>(
232        &mut self,
233        notification: &str,
234        subscribe: (&str, F),
235        unsubscribe: (&str, G),
236    ) where
237        F: SubscribeRpcMethod<H::Metadata>,
238        G: UnsubscribeRpcMethod<H::Metadata>,
239    {
240        (*self.io_handler.lock().unwrap()).add_subscription(notification, subscribe, unsubscribe);
241        self.reset_all_transports().ok();
242    }
243
244    fn describe_api(&self) -> Vec<String> {
245        self.io_handler.lock().unwrap().describe_api()
246    }
247}
248
249#[cfg(feature = "with_actix")]
250impl<H> ActixServer<H> for MultipleTransportsServer<H>
251where
252    H: Handler,
253{
254    fn add_actix_method<F>(&mut self, system: &Option<actix::System>, name: &str, method: F)
255    where
256        F: RpcMethodSimple,
257    {
258        let system = system.clone();
259
260        self.add_method(name, move |params| {
261            let system = system.clone();
262            let execution = method.call(params);
263            let (tx, rx) = futures::channel::oneshot::channel();
264
265            Box::pin(async move {
266                // The future that will actually execute the method
267                let fut = async move {
268                    let response = execution.await;
269                    tx.send(response)
270                        .expect("Should be able to send result back to spawner");
271                };
272
273                // If an actix system is available, spawn there, otherwise simply wait on the future
274                if let Some(system) = system.clone() {
275                    system.arbiter().spawn(fut);
276                } else {
277                    fut.await;
278                }
279
280                rx.await
281                    .expect("Should be able to await the oneshot channel")
282            })
283        })
284    }
285
286    fn add_actix_subscription<F, G>(
287        &mut self,
288        system: &Option<System>,
289        notification: &str,
290        subscribe: (&str, Arc<F>),
291        unsubscribe: (&str, Arc<G>),
292    ) where
293        F: SubscribeRpcMethod<H::Metadata>,
294        G: UnsubscribeRpcMethod<
295            H::Metadata,
296            Out = jsonrpc_core::BoxFuture<jsonrpc_core::Result<jsonrpc_core::Value>>,
297        >,
298    {
299        let subscribe_system = system.clone();
300        let unsubscribe_system = system.clone();
301        let (subscribe_name, subscribe_method) = subscribe;
302        let (unsubscribe_name, unsubscribe_method) = unsubscribe;
303
304        self.add_subscription(
305            notification,
306            (subscribe_name, move |params, meta, subscriber| {
307                let method = subscribe_method.clone();
308
309                // If an actix system is available, spawn there, otherwise simply wait on the future
310                if let Some(system) = subscribe_system.clone() {
311                    system.arbiter().spawn(async move {
312                        method.call(params, meta, subscriber);
313                    });
314                } else {
315                    method.call(params, meta, subscriber);
316                }
317            }),
318            (unsubscribe_name, move |id, meta| {
319                let system = unsubscribe_system.clone();
320                let method = unsubscribe_method.clone();
321                let execution = method.call(id, meta);
322                let (tx, rx) = futures::channel::oneshot::channel();
323
324                Box::pin(async move {
325                    // The future that will actually execute the method
326                    let fut = async move {
327                        let response = execution.await;
328                        tx.send(response)
329                            .expect("Should be able to send result back to spawner");
330                    };
331
332                    // If an actix system is available, spawn there, otherwise simply wait on the future
333                    if let Some(system) = system.clone() {
334                        system.arbiter().spawn(fut);
335                    } else {
336                        fut.await;
337                    }
338
339                    rx.await
340                        .expect("Should be able to await the oneshot channel")
341                })
342            }),
343        )
344    }
345}
346
347/// A simple JSON-RPC server that only uses one transport.
348pub struct SingleTransportServer<H>
349where
350    H: Handler,
351{
352    inner: MultipleTransportsServer<H>,
353}
354
355impl<H> SingleTransportServer<H>
356where
357    H: Handler,
358{
359    /// Create a simple server around an already existing instance of a transport.
360    pub fn from_transport<T>(transport: T) -> Self
361    where
362        T: Transport<H> + 'static,
363    {
364        let mut inner = MultipleTransportsServer::new();
365        inner.add_transport(transport);
366
367        Self { inner }
368    }
369
370    /// Programmatically trigger the handling of a JSON-RPC message on this server.
371    pub fn handle_request_sync(&self, request: &str, meta: H::Metadata) -> Option<String> {
372        self.inner.handle_request_sync(request, meta)
373    }
374
375    /// Create a new server.
376    pub fn new<T>() -> Self
377    where
378        T: Transport<H> + 'static,
379    {
380        let inner = MultipleTransportsServer::new();
381
382        Self { inner }
383    }
384
385    /// Attach a tokio runtime to the server.
386    pub fn with_runtime(mut self, runtime: tokio::runtime::Handle) -> Self {
387        self.inner = self.inner.with_runtime(runtime);
388
389        self
390    }
391}
392
393impl<H> Server<H> for SingleTransportServer<H>
394where
395    H: Handler,
396{
397    type Error = ServerError;
398
399    fn start(&mut self) -> Result<(), Self::Error> {
400        Server::start(&mut self.inner)
401    }
402
403    fn stop(&mut self) -> Result<(), Self::Error> {
404        Server::stop(&mut self.inner)
405    }
406
407    fn add_method<F>(&mut self, name: &str, method: F)
408    where
409        F: RpcMethodSimple,
410    {
411        Server::add_method(&mut self.inner, name, method)
412    }
413
414    fn add_subscription<F, G>(
415        &mut self,
416        notification: &str,
417        subscribe: (&str, F),
418        unsubscribe: (&str, G),
419    ) where
420        F: SubscribeRpcMethod<H::Metadata>,
421        G: UnsubscribeRpcMethod<H::Metadata>,
422    {
423        Server::add_subscription(&mut self.inner, notification, subscribe, unsubscribe)
424    }
425
426    fn describe_api(&self) -> Vec<String> {
427        Server::describe_api(&self.inner)
428    }
429}
430
431#[cfg(feature = "with_actix")]
432impl<H> ActixServer<H> for SingleTransportServer<H>
433where
434    H: Handler,
435{
436    fn add_actix_method<F>(&mut self, system: &Option<System>, name: &str, method: F)
437    where
438        F: RpcMethodSimple,
439    {
440        ActixServer::add_actix_method(&mut self.inner, system, name, method)
441    }
442
443    fn add_actix_subscription<F, G>(
444        &mut self,
445        system: &Option<System>,
446        notification: &str,
447        subscribe: (&str, Arc<F>),
448        unsubscribe: (&str, Arc<G>),
449    ) where
450        F: SubscribeRpcMethod<H::Metadata>,
451        G: UnsubscribeRpcMethod<
452            H::Metadata,
453            Out = jsonrpc_core::BoxFuture<jsonrpc_core::Result<jsonrpc_core::Value>>,
454        >,
455    {
456        ActixServer::add_actix_subscription::<F, G>(
457            &mut self.inner,
458            system,
459            notification,
460            subscribe,
461            unsubscribe,
462        )
463    }
464}