Function jsonrpc_utils::stream::serve_stream_sink
source · pub async fn serve_stream_sink<E, T: Metadata + From<Session>>(
rpc: &MetaIoHandler<T>,
sink: impl Sink<StreamMsg, Error = E> + Unpin,
stream: impl Stream<Item = Result<StreamMsg, E>> + Unpin,
config: StreamServerConfig
) -> Result<(), E>
Available on crate feature
server
only.Expand description
Serve JSON-RPC requests over a bidirectional stream (Stream + Sink).
Keepalive
TODO: document keepalive mechanism.
Examples found in repository?
examples/server.rs (line 95)
16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98
async fn main() {
let mut rpc = MetaIoHandler::with_compatibility(jsonrpc_core::Compatibility::V2);
rpc.add_method("sleep", |params: Params| async move {
let (x,): (u64,) = params.parse()?;
tokio::time::sleep(Duration::from_secs(x)).await;
Ok(x.into())
});
rpc.add_method("@ping", |_| async move { Ok("pong".into()) });
rpc.add_method("value", |params: Params| async move {
let x: Option<u64> = params.parse()?;
Ok(x.unwrap_or_default().into())
});
rpc.add_method("add", |params: Params| async move {
let ((x, y), z): ((i32, i32), i32) = params.parse()?;
let sum = x + y + z;
Ok(sum.into())
});
add_pub_sub(
&mut rpc,
"subscribe",
"subscription",
"unsubscribe",
|params: Params| {
let (interval,): (u64,) = params.parse()?;
if interval > 0 {
Ok(async_stream::stream! {
for i in 0..10 {
tokio::time::sleep(Duration::from_secs(interval)).await;
yield PublishMsg::result(&i);
}
yield PublishMsg::error(&jsonrpc_core::Error {
code: jsonrpc_core::ErrorCode::ServerError(-32000),
message: "ended".into(),
data: None,
});
})
} else {
Err(jsonrpc_core::Error::invalid_params("invalid interval"))
}
},
);
let rpc = Arc::new(rpc);
let stream_config = StreamServerConfig::default()
.with_channel_size(4)
.with_pipeline_size(4);
// HTTP and WS server.
let ws_config = stream_config.clone().with_keep_alive(true);
let app = jsonrpc_router("/rpc", rpc.clone(), ws_config);
// You can use additional tower-http middlewares to add e.g. CORS.
tokio::spawn(async move {
axum::Server::bind(&"0.0.0.0:3000".parse().unwrap())
.serve(app.into_make_service())
.await
.unwrap();
});
// TCP server with line delimited json codec.
//
// You can also use other transports (e.g. TLS, unix socket) and codecs
// (e.g. netstring, JSON splitter).
let listener = TcpListener::bind("0.0.0.0:3001").await.unwrap();
let codec = LinesCodec::new_with_max_length(2 * 1024 * 1024);
while let Ok((s, _)) = listener.accept().await {
let rpc = rpc.clone();
let stream_config = stream_config.clone();
let codec = codec.clone();
tokio::spawn(async move {
let (r, w) = s.into_split();
let r = FramedRead::new(r, codec.clone()).map_ok(StreamMsg::Str);
let w = FramedWrite::new(w, codec).with(|msg| async move {
Ok::<_, LinesCodecError>(match msg {
StreamMsg::Str(msg) => msg,
_ => "".into(),
})
});
tokio::pin!(w);
drop(serve_stream_sink(&rpc, w, r, stream_config).await);
});
}
}