1use std::{sync::Arc, time::Duration};
4
5use futures_util::StreamExt;
6use jsonrpc_core::{MetaIoHandler, Params};
7use jsonrpc_utils::{
8 axum_utils::jsonrpc_router,
9 pub_sub::{add_pub_sub, PublishMsg},
10 stream::StreamServerConfig,
11};
12use tokio::sync::broadcast;
13use tokio_stream::wrappers::BroadcastStream;
14
15#[tokio::main]
16async fn main() {
17 let mut rpc = MetaIoHandler::with_compatibility(jsonrpc_core::Compatibility::V2);
18 let (tx, _) = broadcast::channel(8);
19 tokio::spawn({
20 let tx = tx.clone();
21 async move {
22 for i in 0u64.. {
23 drop(tx.send(PublishMsg::result(&i)));
28 tokio::time::sleep(Duration::from_secs(1)).await;
29 }
30 }
31 });
32 add_pub_sub(
33 &mut rpc,
34 "subscribe",
35 "subscription",
36 "unsubscribe",
37 move |_params: Params| {
38 Ok(BroadcastStream::new(tx.subscribe()).map(|result| {
39 result.unwrap_or_else(|_| {
40 PublishMsg::error(&jsonrpc_core::Error {
41 code: jsonrpc_core::ErrorCode::ServerError(-32000),
42 message: "lagged".into(),
43 data: None,
44 })
45 })
46 }))
47 },
48 );
49 let rpc = Arc::new(rpc);
50 let config = StreamServerConfig::default()
51 .with_channel_size(4)
52 .with_pipeline_size(4)
53 .with_keep_alive(true);
54 let app = jsonrpc_router("/rpc", rpc.clone(), config);
55 let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap();
56 axum::serve(listener, app).await.unwrap();
57}