jsonrpc_utils/
axum_utils.rs

1//! Axum JSONRPC handlers.
2use std::sync::Arc;
3
4use axum::{
5    body::Bytes,
6    extract::{ws::Message, WebSocketUpgrade},
7    http::StatusCode,
8    response::{IntoResponse, Response},
9    routing::post,
10    Extension, Json, Router,
11};
12use futures_util::{SinkExt, StreamExt};
13use jsonrpc_core::{MetaIoHandler, Metadata};
14
15use crate::{
16    pub_sub::Session,
17    stream::{serve_stream_sink, StreamMsg, StreamServerConfig},
18};
19
20/// Axum handler for HTTP POST JSONRPC requests.
21pub async fn handle_jsonrpc<T: Default + Metadata>(
22    Extension(io): Extension<Arc<MetaIoHandler<T>>>,
23    req_body: Bytes,
24) -> Response {
25    let req = match std::str::from_utf8(req_body.as_ref()) {
26        Ok(req) => req,
27        Err(_) => {
28            return Json(jsonrpc_core::Failure {
29                jsonrpc: Some(jsonrpc_core::Version::V2),
30                error: jsonrpc_core::Error::parse_error(),
31                id: jsonrpc_core::Id::Null,
32            })
33            .into_response();
34        }
35    };
36
37    if let Some(r) = io.handle_request(req, T::default()).await {
38        ([(axum::http::header::CONTENT_TYPE, "application/json")], r).into_response()
39    } else {
40        StatusCode::NO_CONTENT.into_response()
41    }
42}
43
44/// Axum handler for JSONRPC over WebSocket.
45///
46/// This supports regular jsonrpc calls and notifications, as well as pub/sub
47/// with [`mod@crate::pub_sub`].
48pub async fn handle_jsonrpc_ws<T: Metadata + From<Session>>(
49    Extension(io): Extension<Arc<MetaIoHandler<T>>>,
50    Extension(config): Extension<StreamServerConfig>,
51    ws: WebSocketUpgrade,
52) -> impl IntoResponse {
53    ws.on_upgrade(move |socket| async move {
54        let (socket_write, socket_read) = socket.split();
55        let write = socket_write.with(|msg: StreamMsg| async move {
56            Ok::<_, axum::Error>(match msg {
57                StreamMsg::Str(msg) => Message::Text(msg.into()),
58                StreamMsg::Ping => Message::Ping(b"ping"[..].into()),
59                StreamMsg::Pong => Message::Pong(b""[..].into()),
60            })
61        });
62        let read = socket_read.filter_map(|msg| async move {
63            match msg {
64                Ok(Message::Text(t)) => Some(Ok(StreamMsg::Str(t.as_str().to_string()))),
65                Ok(Message::Pong(_)) => Some(Ok(StreamMsg::Pong)),
66                Ok(_) => None,
67                Err(e) => Some(Err(e)),
68            }
69        });
70        tokio::pin!(write);
71        tokio::pin!(read);
72        drop(serve_stream_sink(&io, write, read, config).await);
73    })
74}
75
76/// Returns an axum Router that handles JSONRPC requests at the specified
77/// `path`. Both HTTP and WebSocket are supported.
78///
79/// Subscription added via [`mod@crate::pub_sub`] is supported on WebSocket
80/// connections.
81pub fn jsonrpc_router(
82    path: &str,
83    rpc: Arc<MetaIoHandler<Option<Session>>>,
84    websocket_config: StreamServerConfig,
85) -> Router {
86    Router::new()
87        .route(
88            path,
89            post(handle_jsonrpc::<Option<Session>>).get(handle_jsonrpc_ws::<Option<Session>>),
90        )
91        .layer(Extension(rpc))
92        .layer(Extension(websocket_config))
93}