// server_macros_custom_error/server-macros-custom-error.rs
use std::{fmt::Display, io, sync::Arc, time::Duration};

use async_trait::async_trait;
use futures_core::{stream::BoxStream, Stream};
use jsonrpc_core::{ErrorCode, MetaIoHandler};
use jsonrpc_utils::{
    axum_utils::jsonrpc_router, pub_sub::PublishMsg, rpc, stream::StreamServerConfig,
};
use serde_json::Value;
12
13struct MyError {
14 code: i64,
15 message: String,
16 data: Option<Value>,
17}
18
19impl<E: Display> From<E> for MyError {
20 fn from(e: E) -> Self {
21 Self {
22 code: ErrorCode::InternalError.code(),
23 message: e.to_string(),
24 data: None,
25 }
26 }
27}
28
29impl From<MyError> for jsonrpc_core::Error {
30 fn from(e: MyError) -> Self {
31 Self {
32 code: e.code.into(),
33 message: e.message,
34 data: e.data,
35 }
36 }
37}
38
39type Result<T, E = MyError> = std::result::Result<T, E>;
40
41#[rpc]
42#[async_trait]
43trait MyRpc {
44 async fn sleep(&self, x: u64) -> Result<u64>;
45 async fn value(&self, x: Option<u64>) -> Result<u64>;
46 async fn add(&self, (x, y): (i32, i32), z: i32) -> Result<i32>;
47 #[rpc(name = "@ping")]
48 fn ping(&self) -> Result<String>;
49
50 type S: Stream<Item = PublishMsg<u64>> + Send + 'static;
51 #[rpc(pub_sub(notify = "subscription", unsubscribe = "unsubscribe"))]
52 fn subscribe(&self, interval: u64) -> Result<Self::S>;
53}
54
/// Field-less implementor of `MyRpc`; an instance is handed to
/// `add_my_rpc_methods` in `main`.
#[derive(Clone)]
struct RpcImpl;
57
58#[async_trait]
59impl MyRpc for RpcImpl {
60 async fn sleep(&self, x: u64) -> Result<u64> {
61 tokio::time::sleep(Duration::from_secs(x)).await;
62 Ok(x)
63 }
64
65 async fn value(&self, x: Option<u64>) -> Result<u64> {
66 Ok(x.unwrap_or_default())
67 }
68
69 async fn add(&self, (x, y): (i32, i32), z: i32) -> Result<i32> {
70 Ok(x + y + z)
71 }
72
73 fn ping(&self) -> Result<String> {
74 Ok("pong".into())
75 }
76
77 type S = BoxStream<'static, PublishMsg<u64>>;
78 fn subscribe(&self, interval: u64) -> Result<Self::S> {
79 if interval > 0 {
80 Ok(Box::pin(async_stream::stream! {
81 for i in 0..10 {
82 tokio::time::sleep(Duration::from_secs(interval)).await;
83 yield PublishMsg::result(&i);
84 }
85 yield PublishMsg::error(&jsonrpc_core::Error {
86 code: jsonrpc_core::ErrorCode::ServerError(-32000),
87 message: "ended".into(),
88 data: None,
89 });
90 }))
91 } else {
92 Err(MyError {
93 code: ErrorCode::InvalidParams.code(),
94 message: "invalid interval".into(),
95 data: None,
96 })
97 }
98 }
99}
100
101#[tokio::main]
102async fn main() {
103 let mut rpc = MetaIoHandler::with_compatibility(jsonrpc_core::Compatibility::V2);
104 add_my_rpc_methods(&mut rpc, RpcImpl);
105 let rpc = Arc::new(rpc);
106 let stream_config = StreamServerConfig::default().with_keep_alive(true);
107
108 let app = jsonrpc_router("/rpc", rpc, stream_config);
109 let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap();
110 axum::serve(listener, app).await.unwrap();
111}