server/
server.rs

1//! Example HTTP, WebSocket and TCP JSON-RPC server.
2
3use std::{sync::Arc, time::Duration};
4
5use futures_util::{SinkExt, TryStreamExt};
6use jsonrpc_core::{MetaIoHandler, Params};
7use jsonrpc_utils::{
8    axum_utils::jsonrpc_router,
9    pub_sub::{add_pub_sub, PublishMsg},
10    stream::{serve_stream_sink, StreamMsg, StreamServerConfig},
11};
12use tokio::net::TcpListener;
13use tokio_util::codec::{FramedRead, FramedWrite, LinesCodec, LinesCodecError};
14
15#[tokio::main]
16async fn main() {
17    let mut rpc = MetaIoHandler::with_compatibility(jsonrpc_core::Compatibility::V2);
18    rpc.add_method("sleep", |params: Params| async move {
19        let (x,): (u64,) = params.parse()?;
20        tokio::time::sleep(Duration::from_secs(x)).await;
21        Ok(x.into())
22    });
23    rpc.add_method("@ping", |_| async move { Ok("pong".into()) });
24    rpc.add_method("value", |params: Params| async move {
25        let x: Option<u64> = params.parse()?;
26        Ok(x.unwrap_or_default().into())
27    });
28    rpc.add_method("add", |params: Params| async move {
29        let ((x, y), z): ((i32, i32), i32) = params.parse()?;
30        let sum = x + y + z;
31        Ok(sum.into())
32    });
33
34    add_pub_sub(
35        &mut rpc,
36        "subscribe",
37        "subscription",
38        "unsubscribe",
39        |params: Params| {
40            let (interval,): (u64,) = params.parse()?;
41            if interval > 0 {
42                Ok(async_stream::stream! {
43                    for i in 0..10 {
44                        tokio::time::sleep(Duration::from_secs(interval)).await;
45                        yield PublishMsg::result(&i);
46                    }
47                    yield PublishMsg::error(&jsonrpc_core::Error {
48                        code: jsonrpc_core::ErrorCode::ServerError(-32000),
49                        message: "ended".into(),
50                        data: None,
51                    });
52                })
53            } else {
54                Err(jsonrpc_core::Error::invalid_params("invalid interval"))
55            }
56        },
57    );
58    let rpc = Arc::new(rpc);
59    let stream_config = StreamServerConfig::default()
60        .with_channel_size(4)
61        .with_pipeline_size(4);
62
63    // HTTP and WS server.
64    let ws_config = stream_config.clone().with_keep_alive(true);
65    let app = jsonrpc_router("/rpc", rpc.clone(), ws_config);
66
67    // You can use additional tower-http middlewares to add e.g. CORS.
68    tokio::spawn(async move {
69        let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap();
70        axum::serve(listener, app).await.unwrap();
71    });
72
73    // TCP server with line delimited json codec.
74    //
75    // You can also use other transports (e.g. TLS, unix socket) and codecs
76    // (e.g. netstring, JSON splitter).
77    let listener = TcpListener::bind("0.0.0.0:3001").await.unwrap();
78    let codec = LinesCodec::new_with_max_length(2 * 1024 * 1024);
79    while let Ok((s, _)) = listener.accept().await {
80        let rpc = rpc.clone();
81        let stream_config = stream_config.clone();
82        let codec = codec.clone();
83        tokio::spawn(async move {
84            let (r, w) = s.into_split();
85            let r = FramedRead::new(r, codec.clone()).map_ok(StreamMsg::Str);
86            let w = FramedWrite::new(w, codec).with(|msg| async move {
87                Ok::<_, LinesCodecError>(match msg {
88                    StreamMsg::Str(msg) => msg,
89                    _ => "".into(),
90                })
91            });
92            tokio::pin!(w);
93            drop(serve_stream_sink(&rpc, w, r, stream_config).await);
94        });
95    }
96}