pub async fn serve_stream_sink<E, T, S>(
rpc: &MetaIoHandler<T>,
sink: impl Sink<StreamMsg<S>, Error = E> + Unpin,
stream: impl Stream<Item = Result<StreamMsg<S>, E>> + Unpin,
config: StreamServerConfig,
) -> Result<(), E>
Available on crate feature
server
only.Expand description
Serve JSON-RPC requests over a bidirectional stream (Stream + Sink).
§Keepalive
We will response to ping messages with pong messages. We will send out ping
messages at the specified interval if keepalive is enabled. If keepalive is
enabled and we don’t receive any messages over the stream for
keep_alive_duration
, we will stop serving (and this function will return).
Examples found in repository?
examples/server.rs (line 93)
16async fn main() {
17 let mut rpc = MetaIoHandler::with_compatibility(jsonrpc_core::Compatibility::V2);
18 rpc.add_method("sleep", |params: Params| async move {
19 let (x,): (u64,) = params.parse()?;
20 tokio::time::sleep(Duration::from_secs(x)).await;
21 Ok(x.into())
22 });
23 rpc.add_method("@ping", |_| async move { Ok("pong".into()) });
24 rpc.add_method("value", |params: Params| async move {
25 let x: Option<u64> = params.parse()?;
26 Ok(x.unwrap_or_default().into())
27 });
28 rpc.add_method("add", |params: Params| async move {
29 let ((x, y), z): ((i32, i32), i32) = params.parse()?;
30 let sum = x + y + z;
31 Ok(sum.into())
32 });
33
34 add_pub_sub(
35 &mut rpc,
36 "subscribe",
37 "subscription",
38 "unsubscribe",
39 |params: Params| {
40 let (interval,): (u64,) = params.parse()?;
41 if interval > 0 {
42 Ok(async_stream::stream! {
43 for i in 0..10 {
44 tokio::time::sleep(Duration::from_secs(interval)).await;
45 yield PublishMsg::result(&i);
46 }
47 yield PublishMsg::error(&jsonrpc_core::Error {
48 code: jsonrpc_core::ErrorCode::ServerError(-32000),
49 message: "ended".into(),
50 data: None,
51 });
52 })
53 } else {
54 Err(jsonrpc_core::Error::invalid_params("invalid interval"))
55 }
56 },
57 );
58 let rpc = Arc::new(rpc);
59 let stream_config = StreamServerConfig::default()
60 .with_channel_size(4)
61 .with_pipeline_size(4);
62
63 // HTTP and WS server.
64 let ws_config = stream_config.clone().with_keep_alive(true);
65 let app = jsonrpc_router("/rpc", rpc.clone(), ws_config);
66
67 // You can use additional tower-http middlewares to add e.g. CORS.
68 tokio::spawn(async move {
69 let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap();
70 axum::serve(listener, app).await.unwrap();
71 });
72
73 // TCP server with line delimited json codec.
74 //
75 // You can also use other transports (e.g. TLS, unix socket) and codecs
76 // (e.g. netstring, JSON splitter).
77 let listener = TcpListener::bind("0.0.0.0:3001").await.unwrap();
78 let codec = LinesCodec::new_with_max_length(2 * 1024 * 1024);
79 while let Ok((s, _)) = listener.accept().await {
80 let rpc = rpc.clone();
81 let stream_config = stream_config.clone();
82 let codec = codec.clone();
83 tokio::spawn(async move {
84 let (r, w) = s.into_split();
85 let r = FramedRead::new(r, codec.clone()).map_ok(StreamMsg::Str);
86 let w = FramedWrite::new(w, codec).with(|msg| async move {
87 Ok::<_, LinesCodecError>(match msg {
88 StreamMsg::Str(msg) => msg,
89 _ => "".into(),
90 })
91 });
92 tokio::pin!(w);
93 drop(serve_stream_sink(&rpc, w, r, stream_config).await);
94 });
95 }
96}