Function jsonrpc_router

Source
pub fn jsonrpc_router(
    path: &str,
    rpc: Arc<MetaIoHandler<Option<Session>>>,
    websocket_config: StreamServerConfig,
) -> Router
Available on crate feature axum only.
Expand description

Returns an axum `Router` that handles JSON-RPC requests at the specified path. Both HTTP and WebSocket transports are supported.

Subscriptions added via `crate::pub_sub` are supported on WebSocket connections.

Examples found in repository?
examples/server-macros-custom-error.rs (line 108)
102async fn main() {
103    let mut rpc = MetaIoHandler::with_compatibility(jsonrpc_core::Compatibility::V2);
104    add_my_rpc_methods(&mut rpc, RpcImpl);
105    let rpc = Arc::new(rpc);
106    let stream_config = StreamServerConfig::default().with_keep_alive(true);
107
108    let app = jsonrpc_router("/rpc", rpc, stream_config);
109    let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap();
110    axum::serve(listener, app).await.unwrap();
111}
More examples
Hide additional examples
examples/server-macros.rs (line 75)
69async fn main() {
70    let mut rpc = MetaIoHandler::with_compatibility(jsonrpc_core::Compatibility::V2);
71    add_my_rpc_methods(&mut rpc, RpcImpl);
72    let rpc = Arc::new(rpc);
73    let stream_config = StreamServerConfig::default().with_keep_alive(true);
74
75    let app = jsonrpc_router("/rpc", rpc, stream_config);
76    let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap();
77    axum::serve(listener, app).await.unwrap();
78}
examples/openrpc.rs (line 65)
52async fn main() {
53    let doc = my_rpc_doc();
54
55    let mut rpc = MetaIoHandler::with_compatibility(jsonrpc_core::Compatibility::V2);
56    add_my_rpc_methods(&mut rpc, MyRpcImpl);
57    rpc.add_method("rpc.discover", move |_| {
58        let doc = doc.clone();
59        async move { Ok(doc) }
60    });
61
62    let rpc = Arc::new(rpc);
63
64    let stream_config = StreamServerConfig::default().with_keep_alive(true);
65    let app = jsonrpc_router("/", rpc, stream_config).layer(CorsLayer::permissive());
66
67    let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap();
68    axum::serve(listener, app).await.unwrap();
69}
examples/broadcast.rs (line 54)
16async fn main() {
17    let mut rpc = MetaIoHandler::with_compatibility(jsonrpc_core::Compatibility::V2);
18    let (tx, _) = broadcast::channel(8);
19    tokio::spawn({
20        let tx = tx.clone();
21        async move {
22            for i in 0u64.. {
23                // Error can be ignored.
24                //
25                // It is recommended to broadcast already serialized
26                // `PublishMsg`. This way it only needs to be serialized once.
27                drop(tx.send(PublishMsg::result(&i)));
28                tokio::time::sleep(Duration::from_secs(1)).await;
29            }
30        }
31    });
32    add_pub_sub(
33        &mut rpc,
34        "subscribe",
35        "subscription",
36        "unsubscribe",
37        move |_params: Params| {
38            Ok(BroadcastStream::new(tx.subscribe()).map(|result| {
39                result.unwrap_or_else(|_| {
40                    PublishMsg::error(&jsonrpc_core::Error {
41                        code: jsonrpc_core::ErrorCode::ServerError(-32000),
42                        message: "lagged".into(),
43                        data: None,
44                    })
45                })
46            }))
47        },
48    );
49    let rpc = Arc::new(rpc);
50    let config = StreamServerConfig::default()
51        .with_channel_size(4)
52        .with_pipeline_size(4)
53        .with_keep_alive(true);
54    let app = jsonrpc_router("/rpc", rpc.clone(), config);
55    let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap();
56    axum::serve(listener, app).await.unwrap();
57}
examples/server.rs (line 65)
16async fn main() {
17    let mut rpc = MetaIoHandler::with_compatibility(jsonrpc_core::Compatibility::V2);
18    rpc.add_method("sleep", |params: Params| async move {
19        let (x,): (u64,) = params.parse()?;
20        tokio::time::sleep(Duration::from_secs(x)).await;
21        Ok(x.into())
22    });
23    rpc.add_method("@ping", |_| async move { Ok("pong".into()) });
24    rpc.add_method("value", |params: Params| async move {
25        let x: Option<u64> = params.parse()?;
26        Ok(x.unwrap_or_default().into())
27    });
28    rpc.add_method("add", |params: Params| async move {
29        let ((x, y), z): ((i32, i32), i32) = params.parse()?;
30        let sum = x + y + z;
31        Ok(sum.into())
32    });
33
34    add_pub_sub(
35        &mut rpc,
36        "subscribe",
37        "subscription",
38        "unsubscribe",
39        |params: Params| {
40            let (interval,): (u64,) = params.parse()?;
41            if interval > 0 {
42                Ok(async_stream::stream! {
43                    for i in 0..10 {
44                        tokio::time::sleep(Duration::from_secs(interval)).await;
45                        yield PublishMsg::result(&i);
46                    }
47                    yield PublishMsg::error(&jsonrpc_core::Error {
48                        code: jsonrpc_core::ErrorCode::ServerError(-32000),
49                        message: "ended".into(),
50                        data: None,
51                    });
52                })
53            } else {
54                Err(jsonrpc_core::Error::invalid_params("invalid interval"))
55            }
56        },
57    );
58    let rpc = Arc::new(rpc);
59    let stream_config = StreamServerConfig::default()
60        .with_channel_size(4)
61        .with_pipeline_size(4);
62
63    // HTTP and WS server.
64    let ws_config = stream_config.clone().with_keep_alive(true);
65    let app = jsonrpc_router("/rpc", rpc.clone(), ws_config);
66
67    // You can use additional tower-http middlewares to add e.g. CORS.
68    tokio::spawn(async move {
69        let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap();
70        axum::serve(listener, app).await.unwrap();
71    });
72
73    // TCP server with line delimited json codec.
74    //
75    // You can also use other transports (e.g. TLS, unix socket) and codecs
76    // (e.g. netstring, JSON splitter).
77    let listener = TcpListener::bind("0.0.0.0:3001").await.unwrap();
78    let codec = LinesCodec::new_with_max_length(2 * 1024 * 1024);
79    while let Ok((s, _)) = listener.accept().await {
80        let rpc = rpc.clone();
81        let stream_config = stream_config.clone();
82        let codec = codec.clone();
83        tokio::spawn(async move {
84            let (r, w) = s.into_split();
85            let r = FramedRead::new(r, codec.clone()).map_ok(StreamMsg::Str);
86            let w = FramedWrite::new(w, codec).with(|msg| async move {
87                Ok::<_, LinesCodecError>(match msg {
88                    StreamMsg::Str(msg) => msg,
89                    _ => "".into(),
90                })
91            });
92            tokio::pin!(w);
93            drop(serve_stream_sink(&rpc, w, r, stream_config).await);
94        });
95    }
96}