jsonrpc_utils/
axum_utils.rs1use std::sync::Arc;
3
4use axum::{
5 body::Bytes,
6 extract::{
7 ws::{Message, Utf8Bytes},
8 WebSocketUpgrade,
9 },
10 http::StatusCode,
11 response::{IntoResponse, Response},
12 routing::post,
13 Extension, Json, Router,
14};
15use futures_util::{SinkExt, StreamExt};
16use jsonrpc_core::{MetaIoHandler, Metadata};
17
18use crate::{
19 pub_sub::Session,
20 stream::{serve_stream_sink, StreamMsg, StreamServerConfig},
21};
22
23pub async fn handle_jsonrpc<T: Default + Metadata>(
25 Extension(io): Extension<Arc<MetaIoHandler<T>>>,
26 req_body: Bytes,
27) -> Response {
28 let req = match std::str::from_utf8(req_body.as_ref()) {
29 Ok(req) => req,
30 Err(_) => {
31 return Json(jsonrpc_core::Failure {
32 jsonrpc: Some(jsonrpc_core::Version::V2),
33 error: jsonrpc_core::Error::parse_error(),
34 id: jsonrpc_core::Id::Null,
35 })
36 .into_response();
37 }
38 };
39
40 if let Some(r) = io.handle_request(req, T::default()).await {
41 ([(axum::http::header::CONTENT_TYPE, "application/json")], r).into_response()
42 } else {
43 StatusCode::NO_CONTENT.into_response()
44 }
45}
46
47pub async fn handle_jsonrpc_ws<T: Metadata + From<Session>>(
52 Extension(io): Extension<Arc<MetaIoHandler<T>>>,
53 Extension(config): Extension<StreamServerConfig>,
54 ws: WebSocketUpgrade,
55) -> impl IntoResponse {
56 ws.on_upgrade(move |socket| async move {
57 let (socket_write, socket_read) = socket.split();
58 let write = socket_write.with(|msg: StreamMsg<Utf8Bytes>| async move {
59 Ok::<_, axum::Error>(match msg {
60 StreamMsg::Str(msg) => Message::Text(msg),
61 StreamMsg::Ping => Message::Ping(b"ping"[..].into()),
62 StreamMsg::Pong => Message::Pong(b""[..].into()),
63 })
64 });
65 let read = socket_read.filter_map(|msg| async move {
66 match msg {
67 Ok(Message::Text(t)) => Some(Ok(StreamMsg::Str(t))),
68 Ok(Message::Pong(_)) => Some(Ok(StreamMsg::Pong)),
69 Ok(_) => None,
70 Err(e) => Some(Err(e)),
71 }
72 });
73 tokio::pin!(write);
74 tokio::pin!(read);
75 drop(serve_stream_sink(&io, write, read, config).await);
76 })
77}
78
79pub fn jsonrpc_router(
85 path: &str,
86 rpc: Arc<MetaIoHandler<Option<Session>>>,
87 websocket_config: StreamServerConfig,
88) -> Router {
89 Router::new()
90 .route(
91 path,
92 post(handle_jsonrpc::<Option<Session>>).get(handle_jsonrpc_ws::<Option<Session>>),
93 )
94 .layer(Extension(rpc))
95 .layer(Extension(websocket_config))
96}