Function add_pub_sub

Source
pub fn add_pub_sub<T: Send + 'static>(
    io: &mut MetaIoHandler<Option<Session>>,
    subscribe_method: &str,
    notify_method: &str,
    unsubscribe_method: &str,
    pubsub: impl PubSub<T> + Clone + Send + Sync + 'static,
)
Available on crate feature `server` only.
Expand description

Add subscribe and unsubscribe methods to the jsonrpc handler.

The `pubsub` handler responds to each subscription call with either a stream or an error. If a stream is returned, a subscription id is automatically generated. Any results produced by the stream are sent to the client along with the subscription id. The stream is dropped when the client calls the unsubscribe method with the subscription id or when the client disconnects.

Examples found in repository?
examples/broadcast.rs (lines 32-48)
16async fn main() {
17    let mut rpc = MetaIoHandler::with_compatibility(jsonrpc_core::Compatibility::V2);
18    let (tx, _) = broadcast::channel(8);
19    tokio::spawn({
20        let tx = tx.clone();
21        async move {
22            for i in 0u64.. {
23                // Error can be ignored.
24                //
25                // It is recommended to broadcast an already serialized
26                // `PublishMsg`. This way it only needs to be serialized once.
27                drop(tx.send(PublishMsg::result(&i)));
28                tokio::time::sleep(Duration::from_secs(1)).await;
29            }
30        }
31    });
32    add_pub_sub(
33        &mut rpc,
34        "subscribe",
35        "subscription",
36        "unsubscribe",
37        move |_params: Params| {
38            Ok(BroadcastStream::new(tx.subscribe()).map(|result| {
39                result.unwrap_or_else(|_| {
40                    PublishMsg::error(&jsonrpc_core::Error {
41                        code: jsonrpc_core::ErrorCode::ServerError(-32000),
42                        message: "lagged".into(),
43                        data: None,
44                    })
45                })
46            }))
47        },
48    );
49    let rpc = Arc::new(rpc);
50    let config = StreamServerConfig::default()
51        .with_channel_size(4)
52        .with_pipeline_size(4)
53        .with_keep_alive(true);
54    let app = jsonrpc_router("/rpc", rpc.clone(), config);
55    let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap();
56    axum::serve(listener, app).await.unwrap();
57}
More examples
Hide additional examples
examples/server.rs (lines 34-57)
16async fn main() {
17    let mut rpc = MetaIoHandler::with_compatibility(jsonrpc_core::Compatibility::V2);
18    rpc.add_method("sleep", |params: Params| async move {
19        let (x,): (u64,) = params.parse()?;
20        tokio::time::sleep(Duration::from_secs(x)).await;
21        Ok(x.into())
22    });
23    rpc.add_method("@ping", |_| async move { Ok("pong".into()) });
24    rpc.add_method("value", |params: Params| async move {
25        let x: Option<u64> = params.parse()?;
26        Ok(x.unwrap_or_default().into())
27    });
28    rpc.add_method("add", |params: Params| async move {
29        let ((x, y), z): ((i32, i32), i32) = params.parse()?;
30        let sum = x + y + z;
31        Ok(sum.into())
32    });
33
34    add_pub_sub(
35        &mut rpc,
36        "subscribe",
37        "subscription",
38        "unsubscribe",
39        |params: Params| {
40            let (interval,): (u64,) = params.parse()?;
41            if interval > 0 {
42                Ok(async_stream::stream! {
43                    for i in 0..10 {
44                        tokio::time::sleep(Duration::from_secs(interval)).await;
45                        yield PublishMsg::result(&i);
46                    }
47                    yield PublishMsg::error(&jsonrpc_core::Error {
48                        code: jsonrpc_core::ErrorCode::ServerError(-32000),
49                        message: "ended".into(),
50                        data: None,
51                    });
52                })
53            } else {
54                Err(jsonrpc_core::Error::invalid_params("invalid interval"))
55            }
56        },
57    );
58    let rpc = Arc::new(rpc);
59    let stream_config = StreamServerConfig::default()
60        .with_channel_size(4)
61        .with_pipeline_size(4);
62
63    // HTTP and WS server.
64    let ws_config = stream_config.clone().with_keep_alive(true);
65    let app = jsonrpc_router("/rpc", rpc.clone(), ws_config);
66
67    // You can use additional tower-http middlewares to add e.g. CORS.
68    tokio::spawn(async move {
69        let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap();
70        axum::serve(listener, app).await.unwrap();
71    });
72
73    // TCP server with line delimited json codec.
74    //
75    // You can also use other transports (e.g. TLS, unix socket) and codecs
76    // (e.g. netstring, JSON splitter).
77    let listener = TcpListener::bind("0.0.0.0:3001").await.unwrap();
78    let codec = LinesCodec::new_with_max_length(2 * 1024 * 1024);
79    while let Ok((s, _)) = listener.accept().await {
80        let rpc = rpc.clone();
81        let stream_config = stream_config.clone();
82        let codec = codec.clone();
83        tokio::spawn(async move {
84            let (r, w) = s.into_split();
85            let r = FramedRead::new(r, codec.clone()).map_ok(StreamMsg::Str);
86            let w = FramedWrite::new(w, codec).with(|msg| async move {
87                Ok::<_, LinesCodecError>(match msg {
88                    StreamMsg::Str(msg) => msg,
89                    _ => "".into(),
90                })
91            });
92            tokio::pin!(w);
93            drop(serve_stream_sink(&rpc, w, r, stream_config).await);
94        });
95    }
96}