server_macros/
server-macros.rs1use std::{sync::Arc, time::Duration};
4
5use async_trait::async_trait;
6use futures_core::{stream::BoxStream, Stream};
7use jsonrpc_core::{MetaIoHandler, Result};
8use jsonrpc_utils::{
9 axum_utils::jsonrpc_router, pub_sub::PublishMsg, rpc, stream::StreamServerConfig,
10};
11
12#[rpc]
13#[async_trait]
14trait MyRpc {
15 async fn sleep(&self, x: u64) -> Result<u64>;
16 async fn value(&self, x: Option<u64>) -> Result<u64>;
17 async fn add(&self, (x, y): (i32, i32), z: Option<i32>) -> Result<i32>;
18 #[rpc(name = "@ping")]
19 fn ping(&self) -> Result<String>;
20
21 type S: Stream<Item = PublishMsg<u64>> + Send + 'static;
22 #[rpc(pub_sub(notify = "subscription", unsubscribe = "unsubscribe"))]
23 fn subscribe(&self, interval: u64) -> Result<Self::S>;
24}
25
26#[derive(Clone)]
27struct RpcImpl;
28
29#[async_trait]
30impl MyRpc for RpcImpl {
31 async fn sleep(&self, x: u64) -> Result<u64> {
32 tokio::time::sleep(Duration::from_secs(x)).await;
33 Ok(x)
34 }
35
36 async fn value(&self, x: Option<u64>) -> Result<u64> {
37 Ok(x.unwrap_or_default())
38 }
39
40 async fn add(&self, (x, y): (i32, i32), z: Option<i32>) -> Result<i32> {
41 Ok(x + y + z.unwrap_or_default())
42 }
43
44 fn ping(&self) -> Result<String> {
45 Ok("pong".into())
46 }
47
48 type S = BoxStream<'static, PublishMsg<u64>>;
49 fn subscribe(&self, interval: u64) -> Result<Self::S> {
50 if interval > 0 {
51 Ok(Box::pin(async_stream::stream! {
52 for i in 0..10 {
53 tokio::time::sleep(Duration::from_secs(interval)).await;
54 yield PublishMsg::result(&i);
55 }
56 yield PublishMsg::error(&jsonrpc_core::Error {
57 code: jsonrpc_core::ErrorCode::ServerError(-32000),
58 message: "ended".into(),
59 data: None,
60 });
61 }))
62 } else {
63 Err(jsonrpc_core::Error::invalid_params("invalid interval"))
64 }
65 }
66}
67
68#[tokio::main]
69async fn main() {
70 let mut rpc = MetaIoHandler::with_compatibility(jsonrpc_core::Compatibility::V2);
71 add_my_rpc_methods(&mut rpc, RpcImpl);
72 let rpc = Arc::new(rpc);
73 let stream_config = StreamServerConfig::default().with_keep_alive(true);
74
75 let app = jsonrpc_router("/rpc", rpc, stream_config);
76 let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap();
77 axum::serve(listener, app).await.unwrap();
78}