broadcast/
broadcast.rs

1//! Example pub/sub server with broadcast.
2
3use std::{sync::Arc, time::Duration};
4
5use futures_util::StreamExt;
6use jsonrpc_core::{MetaIoHandler, Params};
7use jsonrpc_utils::{
8    axum_utils::jsonrpc_router,
9    pub_sub::{add_pub_sub, PublishMsg},
10    stream::StreamServerConfig,
11};
12use tokio::sync::broadcast;
13use tokio_stream::wrappers::BroadcastStream;
14
15#[tokio::main]
16async fn main() {
17    let mut rpc = MetaIoHandler::with_compatibility(jsonrpc_core::Compatibility::V2);
18    let (tx, _) = broadcast::channel(8);
19    tokio::spawn({
20        let tx = tx.clone();
21        async move {
22            for i in 0u64.. {
23                // Error can be ignored.
24                //
25                // It is recommended to broadcast already serialized
26                // `PublishMsg`. This way it only need to serialized once.
27                drop(tx.send(PublishMsg::result(&i)));
28                tokio::time::sleep(Duration::from_secs(1)).await;
29            }
30        }
31    });
32    add_pub_sub(
33        &mut rpc,
34        "subscribe",
35        "subscription",
36        "unsubscribe",
37        move |_params: Params| {
38            Ok(BroadcastStream::new(tx.subscribe()).map(|result| {
39                result.unwrap_or_else(|_| {
40                    PublishMsg::error(&jsonrpc_core::Error {
41                        code: jsonrpc_core::ErrorCode::ServerError(-32000),
42                        message: "lagged".into(),
43                        data: None,
44                    })
45                })
46            }))
47        },
48    );
49    let rpc = Arc::new(rpc);
50    let config = StreamServerConfig::default()
51        .with_channel_size(4)
52        .with_pipeline_size(4)
53        .with_keep_alive(true);
54    let app = jsonrpc_router("/rpc", rpc.clone(), config);
55    let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap();
56    axum::serve(listener, app).await.unwrap();
57}