jsonrpc_utils/
axum_utils.rs

1//! Axum JSONRPC handlers.
2use std::sync::Arc;
3
4use axum::{
5    body::Bytes,
6    extract::{
7        ws::{Message, Utf8Bytes},
8        WebSocketUpgrade,
9    },
10    http::StatusCode,
11    response::{IntoResponse, Response},
12    routing::post,
13    Extension, Json, Router,
14};
15use futures_util::{SinkExt, StreamExt};
16use jsonrpc_core::{MetaIoHandler, Metadata};
17
18use crate::{
19    pub_sub::Session,
20    stream::{serve_stream_sink, StreamMsg, StreamServerConfig},
21};
22
23/// Axum handler for HTTP POST JSONRPC requests.
24pub async fn handle_jsonrpc<T: Default + Metadata>(
25    Extension(io): Extension<Arc<MetaIoHandler<T>>>,
26    req_body: Bytes,
27) -> Response {
28    let req = match std::str::from_utf8(req_body.as_ref()) {
29        Ok(req) => req,
30        Err(_) => {
31            return Json(jsonrpc_core::Failure {
32                jsonrpc: Some(jsonrpc_core::Version::V2),
33                error: jsonrpc_core::Error::parse_error(),
34                id: jsonrpc_core::Id::Null,
35            })
36            .into_response();
37        }
38    };
39
40    if let Some(r) = io.handle_request(req, T::default()).await {
41        ([(axum::http::header::CONTENT_TYPE, "application/json")], r).into_response()
42    } else {
43        StatusCode::NO_CONTENT.into_response()
44    }
45}
46
47/// Axum handler for JSONRPC over WebSocket.
48///
49/// This supports regular jsonrpc calls and notifications, as well as pub/sub
50/// with [`mod@crate::pub_sub`].
51pub async fn handle_jsonrpc_ws<T: Metadata + From<Session>>(
52    Extension(io): Extension<Arc<MetaIoHandler<T>>>,
53    Extension(config): Extension<StreamServerConfig>,
54    ws: WebSocketUpgrade,
55) -> impl IntoResponse {
56    ws.on_upgrade(move |socket| async move {
57        let (socket_write, socket_read) = socket.split();
58        let write = socket_write.with(|msg: StreamMsg<Utf8Bytes>| async move {
59            Ok::<_, axum::Error>(match msg {
60                StreamMsg::Str(msg) => Message::Text(msg),
61                StreamMsg::Ping => Message::Ping(b"ping"[..].into()),
62                StreamMsg::Pong => Message::Pong(b""[..].into()),
63            })
64        });
65        let read = socket_read.filter_map(|msg| async move {
66            match msg {
67                Ok(Message::Text(t)) => Some(Ok(StreamMsg::Str(t))),
68                Ok(Message::Pong(_)) => Some(Ok(StreamMsg::Pong)),
69                Ok(_) => None,
70                Err(e) => Some(Err(e)),
71            }
72        });
73        tokio::pin!(write);
74        tokio::pin!(read);
75        drop(serve_stream_sink(&io, write, read, config).await);
76    })
77}
78
79/// Returns an axum Router that handles JSONRPC requests at the specified
80/// `path`. Both HTTP and WebSocket are supported.
81///
82/// Subscription added via [`mod@crate::pub_sub`] is supported on WebSocket
83/// connections.
84pub fn jsonrpc_router(
85    path: &str,
86    rpc: Arc<MetaIoHandler<Option<Session>>>,
87    websocket_config: StreamServerConfig,
88) -> Router {
89    Router::new()
90        .route(
91            path,
92            post(handle_jsonrpc::<Option<Session>>).get(handle_jsonrpc_ws::<Option<Session>>),
93        )
94        .layer(Extension(rpc))
95        .layer(Extension(websocket_config))
96}