server_macros_custom_error/
server-macros-custom-error.rs

//! Example JSON-RPC server using the `rpc` macro with a custom error type.
2
3use std::{fmt::Display, sync::Arc, time::Duration};
4
5use async_trait::async_trait;
6use futures_core::{stream::BoxStream, Stream};
7use jsonrpc_core::{ErrorCode, MetaIoHandler};
8use jsonrpc_utils::{
9    axum_utils::jsonrpc_router, pub_sub::PublishMsg, rpc, stream::StreamServerConfig,
10};
11use serde_json::Value;
12
13struct MyError {
14    code: i64,
15    message: String,
16    data: Option<Value>,
17}
18
19impl<E: Display> From<E> for MyError {
20    fn from(e: E) -> Self {
21        Self {
22            code: ErrorCode::InternalError.code(),
23            message: e.to_string(),
24            data: None,
25        }
26    }
27}
28
29impl From<MyError> for jsonrpc_core::Error {
30    fn from(e: MyError) -> Self {
31        Self {
32            code: e.code.into(),
33            message: e.message,
34            data: e.data,
35        }
36    }
37}
38
/// Result alias for this example; the error type defaults to [`MyError`].
type Result<T, E = MyError> = std::result::Result<T, E>;
40
// RPC interface of the example server.
//
// NOTE(review): `#[rpc]` presumably generates the `add_my_rpc_methods`
// registration function that `main` calls — confirm against the
// `jsonrpc_utils` macro documentation.
#[rpc]
#[async_trait]
trait MyRpc {
    // Async method taking a single `u64` parameter.
    async fn sleep(&self, x: u64) -> Result<u64>;
    // Async method whose only parameter is optional.
    async fn value(&self, x: Option<u64>) -> Result<u64>;
    // Async method whose first parameter is a tuple, destructured in place.
    async fn add(&self, (x, y): (i32, i32), z: i32) -> Result<i32>;
    // Synchronous method.
    // NOTE(review): the `name` argument presumably exposes this method under
    // the RPC name "@ping" instead of "ping" — confirm.
    #[rpc(name = "@ping")]
    fn ping(&self) -> Result<String>;

    // Stream type returned by `subscribe`; it yields `PublishMsg<u64>` items.
    type S: Stream<Item = PublishMsg<u64>> + Send + 'static;
    // Subscription method returning a stream of messages.
    // NOTE(review): `notify` / `unsubscribe` presumably set the notification
    // method name and the unsubscribe method name — confirm.
    #[rpc(pub_sub(notify = "subscription", unsubscribe = "unsubscribe"))]
    fn subscribe(&self, interval: u64) -> Result<Self::S>;
}
54
55#[derive(Clone)]
56struct RpcImpl;
57
58#[async_trait]
59impl MyRpc for RpcImpl {
60    async fn sleep(&self, x: u64) -> Result<u64> {
61        tokio::time::sleep(Duration::from_secs(x)).await;
62        Ok(x)
63    }
64
65    async fn value(&self, x: Option<u64>) -> Result<u64> {
66        Ok(x.unwrap_or_default())
67    }
68
69    async fn add(&self, (x, y): (i32, i32), z: i32) -> Result<i32> {
70        Ok(x + y + z)
71    }
72
73    fn ping(&self) -> Result<String> {
74        Ok("pong".into())
75    }
76
77    type S = BoxStream<'static, PublishMsg<u64>>;
78    fn subscribe(&self, interval: u64) -> Result<Self::S> {
79        if interval > 0 {
80            Ok(Box::pin(async_stream::stream! {
81                for i in 0..10 {
82                    tokio::time::sleep(Duration::from_secs(interval)).await;
83                    yield PublishMsg::result(&i);
84                }
85                yield PublishMsg::error(&jsonrpc_core::Error {
86                    code: jsonrpc_core::ErrorCode::ServerError(-32000),
87                    message: "ended".into(),
88                    data: None,
89                });
90            }))
91        } else {
92            Err(MyError {
93                code: ErrorCode::InvalidParams.code(),
94                message: "invalid interval".into(),
95                data: None,
96            })
97        }
98    }
99}
100
101#[tokio::main]
102async fn main() {
103    let mut rpc = MetaIoHandler::with_compatibility(jsonrpc_core::Compatibility::V2);
104    add_my_rpc_methods(&mut rpc, RpcImpl);
105    let rpc = Arc::new(rpc);
106    let stream_config = StreamServerConfig::default().with_keep_alive(true);
107
108    let app = jsonrpc_router("/rpc", rpc, stream_config);
109    let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap();
110    axum::serve(listener, app).await.unwrap();
111}