server_macros/
server-macros.rs

1//! Example server with macros.
2
3use std::{sync::Arc, time::Duration};
4
5use async_trait::async_trait;
6use futures_core::{stream::BoxStream, Stream};
7use jsonrpc_core::{MetaIoHandler, Result};
8use jsonrpc_utils::{
9    axum_utils::jsonrpc_router, pub_sub::PublishMsg, rpc, stream::StreamServerConfig,
10};
11
12#[rpc]
13#[async_trait]
14trait MyRpc {
15    async fn sleep(&self, x: u64) -> Result<u64>;
16    async fn value(&self, x: Option<u64>) -> Result<u64>;
17    async fn add(&self, (x, y): (i32, i32), z: Option<i32>) -> Result<i32>;
18    #[rpc(name = "@ping")]
19    fn ping(&self) -> Result<String>;
20
21    type S: Stream<Item = PublishMsg<u64>> + Send + 'static;
22    #[rpc(pub_sub(notify = "subscription", unsubscribe = "unsubscribe"))]
23    fn subscribe(&self, interval: u64) -> Result<Self::S>;
24}
25
26#[derive(Clone)]
27struct RpcImpl;
28
29#[async_trait]
30impl MyRpc for RpcImpl {
31    async fn sleep(&self, x: u64) -> Result<u64> {
32        tokio::time::sleep(Duration::from_secs(x)).await;
33        Ok(x)
34    }
35
36    async fn value(&self, x: Option<u64>) -> Result<u64> {
37        Ok(x.unwrap_or_default())
38    }
39
40    async fn add(&self, (x, y): (i32, i32), z: Option<i32>) -> Result<i32> {
41        Ok(x + y + z.unwrap_or_default())
42    }
43
44    fn ping(&self) -> Result<String> {
45        Ok("pong".into())
46    }
47
48    type S = BoxStream<'static, PublishMsg<u64>>;
49    fn subscribe(&self, interval: u64) -> Result<Self::S> {
50        if interval > 0 {
51            Ok(Box::pin(async_stream::stream! {
52                for i in 0..10 {
53                    tokio::time::sleep(Duration::from_secs(interval)).await;
54                    yield PublishMsg::result(&i);
55                }
56                yield PublishMsg::error(&jsonrpc_core::Error {
57                    code: jsonrpc_core::ErrorCode::ServerError(-32000),
58                    message: "ended".into(),
59                    data: None,
60                });
61            }))
62        } else {
63            Err(jsonrpc_core::Error::invalid_params("invalid interval"))
64        }
65    }
66}
67
68#[tokio::main]
69async fn main() {
70    let mut rpc = MetaIoHandler::with_compatibility(jsonrpc_core::Compatibility::V2);
71    add_my_rpc_methods(&mut rpc, RpcImpl);
72    let rpc = Arc::new(rpc);
73    let stream_config = StreamServerConfig::default().with_keep_alive(true);
74
75    let app = jsonrpc_router("/rpc", rpc, stream_config);
76    let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap();
77    axum::serve(listener, app).await.unwrap();
78}