jsonrpc_utils/
axum_utils.rs1use std::sync::Arc;
3
4use axum::{
5 body::Bytes,
6 extract::{ws::Message, WebSocketUpgrade},
7 http::StatusCode,
8 response::{IntoResponse, Response},
9 routing::post,
10 Extension, Json, Router,
11};
12use futures_util::{SinkExt, StreamExt};
13use jsonrpc_core::{MetaIoHandler, Metadata};
14
15use crate::{
16 pub_sub::Session,
17 stream::{serve_stream_sink, StreamMsg, StreamServerConfig},
18};
19
20pub async fn handle_jsonrpc<T: Default + Metadata>(
22 Extension(io): Extension<Arc<MetaIoHandler<T>>>,
23 req_body: Bytes,
24) -> Response {
25 let req = match std::str::from_utf8(req_body.as_ref()) {
26 Ok(req) => req,
27 Err(_) => {
28 return Json(jsonrpc_core::Failure {
29 jsonrpc: Some(jsonrpc_core::Version::V2),
30 error: jsonrpc_core::Error::parse_error(),
31 id: jsonrpc_core::Id::Null,
32 })
33 .into_response();
34 }
35 };
36
37 if let Some(r) = io.handle_request(req, T::default()).await {
38 ([(axum::http::header::CONTENT_TYPE, "application/json")], r).into_response()
39 } else {
40 StatusCode::NO_CONTENT.into_response()
41 }
42}
43
44pub async fn handle_jsonrpc_ws<T: Metadata + From<Session>>(
49 Extension(io): Extension<Arc<MetaIoHandler<T>>>,
50 Extension(config): Extension<StreamServerConfig>,
51 ws: WebSocketUpgrade,
52) -> impl IntoResponse {
53 ws.on_upgrade(move |socket| async move {
54 let (socket_write, socket_read) = socket.split();
55 let write = socket_write.with(|msg: StreamMsg| async move {
56 Ok::<_, axum::Error>(match msg {
57 StreamMsg::Str(msg) => Message::Text(msg.into()),
58 StreamMsg::Ping => Message::Ping(b"ping"[..].into()),
59 StreamMsg::Pong => Message::Pong(b""[..].into()),
60 })
61 });
62 let read = socket_read.filter_map(|msg| async move {
63 match msg {
64 Ok(Message::Text(t)) => Some(Ok(StreamMsg::Str(t.as_str().to_string()))),
65 Ok(Message::Pong(_)) => Some(Ok(StreamMsg::Pong)),
66 Ok(_) => None,
67 Err(e) => Some(Err(e)),
68 }
69 });
70 tokio::pin!(write);
71 tokio::pin!(read);
72 drop(serve_stream_sink(&io, write, read, config).await);
73 })
74}
75
76pub fn jsonrpc_router(
82 path: &str,
83 rpc: Arc<MetaIoHandler<Option<Session>>>,
84 websocket_config: StreamServerConfig,
85) -> Router {
86 Router::new()
87 .route(
88 path,
89 post(handle_jsonrpc::<Option<Session>>).get(handle_jsonrpc_ws::<Option<Session>>),
90 )
91 .layer(Extension(rpc))
92 .layer(Extension(websocket_config))
93}