server/
server.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
//! Example HTTP, WebSocket and TCP JSON-RPC server.

use std::{sync::Arc, time::Duration};

use futures_util::{SinkExt, TryStreamExt};
use jsonrpc_core::{MetaIoHandler, Params};
use jsonrpc_utils::{
    axum_utils::jsonrpc_router,
    pub_sub::{add_pub_sub, PublishMsg},
    stream::{serve_stream_sink, StreamMsg, StreamServerConfig},
};
use tokio::net::TcpListener;
use tokio_util::codec::{FramedRead, FramedWrite, LinesCodec, LinesCodecError};

#[tokio::main]
async fn main() {
    let mut rpc = MetaIoHandler::with_compatibility(jsonrpc_core::Compatibility::V2);
    rpc.add_method("sleep", |params: Params| async move {
        let (x,): (u64,) = params.parse()?;
        tokio::time::sleep(Duration::from_secs(x)).await;
        Ok(x.into())
    });
    rpc.add_method("@ping", |_| async move { Ok("pong".into()) });
    rpc.add_method("value", |params: Params| async move {
        let x: Option<u64> = params.parse()?;
        Ok(x.unwrap_or_default().into())
    });
    rpc.add_method("add", |params: Params| async move {
        let ((x, y), z): ((i32, i32), i32) = params.parse()?;
        let sum = x + y + z;
        Ok(sum.into())
    });

    add_pub_sub(
        &mut rpc,
        "subscribe",
        "subscription",
        "unsubscribe",
        |params: Params| {
            let (interval,): (u64,) = params.parse()?;
            if interval > 0 {
                Ok(async_stream::stream! {
                    for i in 0..10 {
                        tokio::time::sleep(Duration::from_secs(interval)).await;
                        yield PublishMsg::result(&i);
                    }
                    yield PublishMsg::error(&jsonrpc_core::Error {
                        code: jsonrpc_core::ErrorCode::ServerError(-32000),
                        message: "ended".into(),
                        data: None,
                    });
                })
            } else {
                Err(jsonrpc_core::Error::invalid_params("invalid interval"))
            }
        },
    );
    let rpc = Arc::new(rpc);
    let stream_config = StreamServerConfig::default()
        .with_channel_size(4)
        .with_pipeline_size(4);

    // HTTP and WS server.
    let ws_config = stream_config.clone().with_keep_alive(true);
    let app = jsonrpc_router("/rpc", rpc.clone(), ws_config);

    // You can use additional tower-http middlewares to add e.g. CORS.
    tokio::spawn(async move {
        let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap();
        axum::serve(listener, app).await.unwrap();
    });

    // TCP server with line delimited json codec.
    //
    // You can also use other transports (e.g. TLS, unix socket) and codecs
    // (e.g. netstring, JSON splitter).
    let listener = TcpListener::bind("0.0.0.0:3001").await.unwrap();
    let codec = LinesCodec::new_with_max_length(2 * 1024 * 1024);
    while let Ok((s, _)) = listener.accept().await {
        let rpc = rpc.clone();
        let stream_config = stream_config.clone();
        let codec = codec.clone();
        tokio::spawn(async move {
            let (r, w) = s.into_split();
            let r = FramedRead::new(r, codec.clone()).map_ok(StreamMsg::Str);
            let w = FramedWrite::new(w, codec).with(|msg| async move {
                Ok::<_, LinesCodecError>(match msg {
                    StreamMsg::Str(msg) => msg,
                    _ => "".into(),
                })
            });
            tokio::pin!(w);
            drop(serve_stream_sink(&rpc, w, r, stream_config).await);
        });
    }
}