pub async fn serve_stream_sink<E, T: Metadata + From<Session>>(
    rpc: &MetaIoHandler<T>,
    sink: impl Sink<StreamMsg, Error = E> + Unpin,
    stream: impl Stream<Item = Result<StreamMsg, E>> + Unpin,
    config: StreamServerConfig
) -> Result<(), E>
Available on crate feature server only.
Expand description

Serve JSON-RPC requests over a bidirectional stream (Stream + Sink).

Keepalive

TODO: document keepalive mechanism.

Examples found in repository?
examples/server.rs (line 95)
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
async fn main() {
    let mut rpc = MetaIoHandler::with_compatibility(jsonrpc_core::Compatibility::V2);
    rpc.add_method("sleep", |params: Params| async move {
        let (x,): (u64,) = params.parse()?;
        tokio::time::sleep(Duration::from_secs(x)).await;
        Ok(x.into())
    });
    rpc.add_method("@ping", |_| async move { Ok("pong".into()) });
    rpc.add_method("value", |params: Params| async move {
        let x: Option<u64> = params.parse()?;
        Ok(x.unwrap_or_default().into())
    });
    rpc.add_method("add", |params: Params| async move {
        let ((x, y), z): ((i32, i32), i32) = params.parse()?;
        let sum = x + y + z;
        Ok(sum.into())
    });

    add_pub_sub(
        &mut rpc,
        "subscribe",
        "subscription",
        "unsubscribe",
        |params: Params| {
            let (interval,): (u64,) = params.parse()?;
            if interval > 0 {
                Ok(async_stream::stream! {
                    for i in 0..10 {
                        tokio::time::sleep(Duration::from_secs(interval)).await;
                        yield PublishMsg::result(&i);
                    }
                    yield PublishMsg::error(&jsonrpc_core::Error {
                        code: jsonrpc_core::ErrorCode::ServerError(-32000),
                        message: "ended".into(),
                        data: None,
                    });
                })
            } else {
                Err(jsonrpc_core::Error::invalid_params("invalid interval"))
            }
        },
    );
    let rpc = Arc::new(rpc);
    let stream_config = StreamServerConfig::default()
        .with_channel_size(4)
        .with_pipeline_size(4);

    // HTTP and WS server.
    let ws_config = stream_config.clone().with_keep_alive(true);
    let app = jsonrpc_router("/rpc", rpc.clone(), ws_config);

    // You can use additional tower-http middlewares to add e.g. CORS.
    tokio::spawn(async move {
        axum::Server::bind(&"0.0.0.0:3000".parse().unwrap())
            .serve(app.into_make_service())
            .await
            .unwrap();
    });

    // TCP server with line delimited json codec.
    //
    // You can also use other transports (e.g. TLS, unix socket) and codecs
    // (e.g. netstring, JSON splitter).
    let listener = TcpListener::bind("0.0.0.0:3001").await.unwrap();
    let codec = LinesCodec::new_with_max_length(2 * 1024 * 1024);
    while let Ok((s, _)) = listener.accept().await {
        let rpc = rpc.clone();
        let stream_config = stream_config.clone();
        let codec = codec.clone();
        tokio::spawn(async move {
            let (r, w) = s.into_split();
            let r = FramedRead::new(r, codec.clone()).map_ok(StreamMsg::Str);
            let w = FramedWrite::new(w, codec).with(|msg| async move {
                Ok::<_, LinesCodecError>(match msg {
                    StreamMsg::Str(msg) => msg,
                    _ => "".into(),
                })
            });
            tokio::pin!(w);
            drop(serve_stream_sink(&rpc, w, r, stream_config).await);
        });
    }
}