1use std::{sync::Arc, time::Duration};
4
5use futures_util::{SinkExt, TryStreamExt};
6use jsonrpc_core::{MetaIoHandler, Params};
7use jsonrpc_utils::{
8 axum_utils::jsonrpc_router,
9 pub_sub::{add_pub_sub, PublishMsg},
10 stream::{serve_stream_sink, StreamMsg, StreamServerConfig},
11};
12use tokio::net::TcpListener;
13use tokio_util::codec::{FramedRead, FramedWrite, LinesCodec, LinesCodecError};
14
15#[tokio::main]
16async fn main() {
17 let mut rpc = MetaIoHandler::with_compatibility(jsonrpc_core::Compatibility::V2);
18 rpc.add_method("sleep", |params: Params| async move {
19 let (x,): (u64,) = params.parse()?;
20 tokio::time::sleep(Duration::from_secs(x)).await;
21 Ok(x.into())
22 });
23 rpc.add_method("@ping", |_| async move { Ok("pong".into()) });
24 rpc.add_method("value", |params: Params| async move {
25 let x: Option<u64> = params.parse()?;
26 Ok(x.unwrap_or_default().into())
27 });
28 rpc.add_method("add", |params: Params| async move {
29 let ((x, y), z): ((i32, i32), i32) = params.parse()?;
30 let sum = x + y + z;
31 Ok(sum.into())
32 });
33
34 add_pub_sub(
35 &mut rpc,
36 "subscribe",
37 "subscription",
38 "unsubscribe",
39 |params: Params| {
40 let (interval,): (u64,) = params.parse()?;
41 if interval > 0 {
42 Ok(async_stream::stream! {
43 for i in 0..10 {
44 tokio::time::sleep(Duration::from_secs(interval)).await;
45 yield PublishMsg::result(&i);
46 }
47 yield PublishMsg::error(&jsonrpc_core::Error {
48 code: jsonrpc_core::ErrorCode::ServerError(-32000),
49 message: "ended".into(),
50 data: None,
51 });
52 })
53 } else {
54 Err(jsonrpc_core::Error::invalid_params("invalid interval"))
55 }
56 },
57 );
58 let rpc = Arc::new(rpc);
59 let stream_config = StreamServerConfig::default()
60 .with_channel_size(4)
61 .with_pipeline_size(4);
62
63 let ws_config = stream_config.clone().with_keep_alive(true);
65 let app = jsonrpc_router("/rpc", rpc.clone(), ws_config);
66
67 tokio::spawn(async move {
69 let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap();
70 axum::serve(listener, app).await.unwrap();
71 });
72
73 let listener = TcpListener::bind("0.0.0.0:3001").await.unwrap();
78 let codec = LinesCodec::new_with_max_length(2 * 1024 * 1024);
79 while let Ok((s, _)) = listener.accept().await {
80 let rpc = rpc.clone();
81 let stream_config = stream_config.clone();
82 let codec = codec.clone();
83 tokio::spawn(async move {
84 let (r, w) = s.into_split();
85 let r = FramedRead::new(r, codec.clone()).map_ok(StreamMsg::Str);
86 let w = FramedWrite::new(w, codec).with(|msg| async move {
87 Ok::<_, LinesCodecError>(match msg {
88 StreamMsg::Str(msg) => msg,
89 _ => "".into(),
90 })
91 });
92 tokio::pin!(w);
93 drop(serve_stream_sink(&rpc, w, r, stream_config).await);
94 });
95 }
96}