pub struct PublishMsg<T> { /* private fields */ }
Available on crate feature
server
only.Expand description
Inner message published to subscribers.
Implementations§
Source§impl<T: Serialize> PublishMsg<T>
impl<T: Serialize> PublishMsg<T>
Sourcepub fn result(value: &T) -> Self
pub fn result(value: &T) -> Self
Create a new “result” message by serializing the value into JSON.
If serialization fails, an “error” message is created returned instead.
Examples found in repository?
examples/server-macros.rs (line 54)
49 fn subscribe(&self, interval: u64) -> Result<Self::S> {
50 if interval > 0 {
51 Ok(Box::pin(async_stream::stream! {
52 for i in 0..10 {
53 tokio::time::sleep(Duration::from_secs(interval)).await;
54 yield PublishMsg::result(&i);
55 }
56 yield PublishMsg::error(&jsonrpc_core::Error {
57 code: jsonrpc_core::ErrorCode::ServerError(-32000),
58 message: "ended".into(),
59 data: None,
60 });
61 }))
62 } else {
63 Err(jsonrpc_core::Error::invalid_params("invalid interval"))
64 }
65 }
More examples
examples/server-macros-custom-error.rs (line 83)
78 fn subscribe(&self, interval: u64) -> Result<Self::S> {
79 if interval > 0 {
80 Ok(Box::pin(async_stream::stream! {
81 for i in 0..10 {
82 tokio::time::sleep(Duration::from_secs(interval)).await;
83 yield PublishMsg::result(&i);
84 }
85 yield PublishMsg::error(&jsonrpc_core::Error {
86 code: jsonrpc_core::ErrorCode::ServerError(-32000),
87 message: "ended".into(),
88 data: None,
89 });
90 }))
91 } else {
92 Err(MyError {
93 code: ErrorCode::InvalidParams.code(),
94 message: "invalid interval".into(),
95 data: None,
96 })
97 }
98 }
examples/broadcast.rs (line 27)
16async fn main() {
17 let mut rpc = MetaIoHandler::with_compatibility(jsonrpc_core::Compatibility::V2);
18 let (tx, _) = broadcast::channel(8);
19 tokio::spawn({
20 let tx = tx.clone();
21 async move {
22 for i in 0u64.. {
23 // Error can be ignored.
24 //
25 // It is recommended to broadcast already serialized
26 // `PublishMsg`. This way it only need to serialized once.
27 drop(tx.send(PublishMsg::result(&i)));
28 tokio::time::sleep(Duration::from_secs(1)).await;
29 }
30 }
31 });
32 add_pub_sub(
33 &mut rpc,
34 "subscribe",
35 "subscription",
36 "unsubscribe",
37 move |_params: Params| {
38 Ok(BroadcastStream::new(tx.subscribe()).map(|result| {
39 result.unwrap_or_else(|_| {
40 PublishMsg::error(&jsonrpc_core::Error {
41 code: jsonrpc_core::ErrorCode::ServerError(-32000),
42 message: "lagged".into(),
43 data: None,
44 })
45 })
46 }))
47 },
48 );
49 let rpc = Arc::new(rpc);
50 let config = StreamServerConfig::default()
51 .with_channel_size(4)
52 .with_pipeline_size(4)
53 .with_keep_alive(true);
54 let app = jsonrpc_router("/rpc", rpc.clone(), config);
55 let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap();
56 axum::serve(listener, app).await.unwrap();
57}
examples/server.rs (line 45)
16async fn main() {
17 let mut rpc = MetaIoHandler::with_compatibility(jsonrpc_core::Compatibility::V2);
18 rpc.add_method("sleep", |params: Params| async move {
19 let (x,): (u64,) = params.parse()?;
20 tokio::time::sleep(Duration::from_secs(x)).await;
21 Ok(x.into())
22 });
23 rpc.add_method("@ping", |_| async move { Ok("pong".into()) });
24 rpc.add_method("value", |params: Params| async move {
25 let x: Option<u64> = params.parse()?;
26 Ok(x.unwrap_or_default().into())
27 });
28 rpc.add_method("add", |params: Params| async move {
29 let ((x, y), z): ((i32, i32), i32) = params.parse()?;
30 let sum = x + y + z;
31 Ok(sum.into())
32 });
33
34 add_pub_sub(
35 &mut rpc,
36 "subscribe",
37 "subscription",
38 "unsubscribe",
39 |params: Params| {
40 let (interval,): (u64,) = params.parse()?;
41 if interval > 0 {
42 Ok(async_stream::stream! {
43 for i in 0..10 {
44 tokio::time::sleep(Duration::from_secs(interval)).await;
45 yield PublishMsg::result(&i);
46 }
47 yield PublishMsg::error(&jsonrpc_core::Error {
48 code: jsonrpc_core::ErrorCode::ServerError(-32000),
49 message: "ended".into(),
50 data: None,
51 });
52 })
53 } else {
54 Err(jsonrpc_core::Error::invalid_params("invalid interval"))
55 }
56 },
57 );
58 let rpc = Arc::new(rpc);
59 let stream_config = StreamServerConfig::default()
60 .with_channel_size(4)
61 .with_pipeline_size(4);
62
63 // HTTP and WS server.
64 let ws_config = stream_config.clone().with_keep_alive(true);
65 let app = jsonrpc_router("/rpc", rpc.clone(), ws_config);
66
67 // You can use additional tower-http middlewares to add e.g. CORS.
68 tokio::spawn(async move {
69 let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap();
70 axum::serve(listener, app).await.unwrap();
71 });
72
73 // TCP server with line delimited json codec.
74 //
75 // You can also use other transports (e.g. TLS, unix socket) and codecs
76 // (e.g. netstring, JSON splitter).
77 let listener = TcpListener::bind("0.0.0.0:3001").await.unwrap();
78 let codec = LinesCodec::new_with_max_length(2 * 1024 * 1024);
79 while let Ok((s, _)) = listener.accept().await {
80 let rpc = rpc.clone();
81 let stream_config = stream_config.clone();
82 let codec = codec.clone();
83 tokio::spawn(async move {
84 let (r, w) = s.into_split();
85 let r = FramedRead::new(r, codec.clone()).map_ok(StreamMsg::Str);
86 let w = FramedWrite::new(w, codec).with(|msg| async move {
87 Ok::<_, LinesCodecError>(match msg {
88 StreamMsg::Str(msg) => msg,
89 _ => "".into(),
90 })
91 });
92 tokio::pin!(w);
93 drop(serve_stream_sink(&rpc, w, r, stream_config).await);
94 });
95 }
96}
Source§impl<T> PublishMsg<T>
impl<T> PublishMsg<T>
Sourcepub fn error(err: &Error) -> Self
pub fn error(err: &Error) -> Self
Create a new “error” message by serializing the JSONRPC error object.
§Panics
If serializing the error fails.
Examples found in repository?
examples/server-macros.rs (lines 56-60)
49 fn subscribe(&self, interval: u64) -> Result<Self::S> {
50 if interval > 0 {
51 Ok(Box::pin(async_stream::stream! {
52 for i in 0..10 {
53 tokio::time::sleep(Duration::from_secs(interval)).await;
54 yield PublishMsg::result(&i);
55 }
56 yield PublishMsg::error(&jsonrpc_core::Error {
57 code: jsonrpc_core::ErrorCode::ServerError(-32000),
58 message: "ended".into(),
59 data: None,
60 });
61 }))
62 } else {
63 Err(jsonrpc_core::Error::invalid_params("invalid interval"))
64 }
65 }
More examples
examples/server-macros-custom-error.rs (lines 85-89)
78 fn subscribe(&self, interval: u64) -> Result<Self::S> {
79 if interval > 0 {
80 Ok(Box::pin(async_stream::stream! {
81 for i in 0..10 {
82 tokio::time::sleep(Duration::from_secs(interval)).await;
83 yield PublishMsg::result(&i);
84 }
85 yield PublishMsg::error(&jsonrpc_core::Error {
86 code: jsonrpc_core::ErrorCode::ServerError(-32000),
87 message: "ended".into(),
88 data: None,
89 });
90 }))
91 } else {
92 Err(MyError {
93 code: ErrorCode::InvalidParams.code(),
94 message: "invalid interval".into(),
95 data: None,
96 })
97 }
98 }
examples/broadcast.rs (lines 40-44)
16async fn main() {
17 let mut rpc = MetaIoHandler::with_compatibility(jsonrpc_core::Compatibility::V2);
18 let (tx, _) = broadcast::channel(8);
19 tokio::spawn({
20 let tx = tx.clone();
21 async move {
22 for i in 0u64.. {
23 // Error can be ignored.
24 //
25 // It is recommended to broadcast already serialized
26 // `PublishMsg`. This way it only need to serialized once.
27 drop(tx.send(PublishMsg::result(&i)));
28 tokio::time::sleep(Duration::from_secs(1)).await;
29 }
30 }
31 });
32 add_pub_sub(
33 &mut rpc,
34 "subscribe",
35 "subscription",
36 "unsubscribe",
37 move |_params: Params| {
38 Ok(BroadcastStream::new(tx.subscribe()).map(|result| {
39 result.unwrap_or_else(|_| {
40 PublishMsg::error(&jsonrpc_core::Error {
41 code: jsonrpc_core::ErrorCode::ServerError(-32000),
42 message: "lagged".into(),
43 data: None,
44 })
45 })
46 }))
47 },
48 );
49 let rpc = Arc::new(rpc);
50 let config = StreamServerConfig::default()
51 .with_channel_size(4)
52 .with_pipeline_size(4)
53 .with_keep_alive(true);
54 let app = jsonrpc_router("/rpc", rpc.clone(), config);
55 let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap();
56 axum::serve(listener, app).await.unwrap();
57}
examples/server.rs (lines 47-51)
16async fn main() {
17 let mut rpc = MetaIoHandler::with_compatibility(jsonrpc_core::Compatibility::V2);
18 rpc.add_method("sleep", |params: Params| async move {
19 let (x,): (u64,) = params.parse()?;
20 tokio::time::sleep(Duration::from_secs(x)).await;
21 Ok(x.into())
22 });
23 rpc.add_method("@ping", |_| async move { Ok("pong".into()) });
24 rpc.add_method("value", |params: Params| async move {
25 let x: Option<u64> = params.parse()?;
26 Ok(x.unwrap_or_default().into())
27 });
28 rpc.add_method("add", |params: Params| async move {
29 let ((x, y), z): ((i32, i32), i32) = params.parse()?;
30 let sum = x + y + z;
31 Ok(sum.into())
32 });
33
34 add_pub_sub(
35 &mut rpc,
36 "subscribe",
37 "subscription",
38 "unsubscribe",
39 |params: Params| {
40 let (interval,): (u64,) = params.parse()?;
41 if interval > 0 {
42 Ok(async_stream::stream! {
43 for i in 0..10 {
44 tokio::time::sleep(Duration::from_secs(interval)).await;
45 yield PublishMsg::result(&i);
46 }
47 yield PublishMsg::error(&jsonrpc_core::Error {
48 code: jsonrpc_core::ErrorCode::ServerError(-32000),
49 message: "ended".into(),
50 data: None,
51 });
52 })
53 } else {
54 Err(jsonrpc_core::Error::invalid_params("invalid interval"))
55 }
56 },
57 );
58 let rpc = Arc::new(rpc);
59 let stream_config = StreamServerConfig::default()
60 .with_channel_size(4)
61 .with_pipeline_size(4);
62
63 // HTTP and WS server.
64 let ws_config = stream_config.clone().with_keep_alive(true);
65 let app = jsonrpc_router("/rpc", rpc.clone(), ws_config);
66
67 // You can use additional tower-http middlewares to add e.g. CORS.
68 tokio::spawn(async move {
69 let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap();
70 axum::serve(listener, app).await.unwrap();
71 });
72
73 // TCP server with line delimited json codec.
74 //
75 // You can also use other transports (e.g. TLS, unix socket) and codecs
76 // (e.g. netstring, JSON splitter).
77 let listener = TcpListener::bind("0.0.0.0:3001").await.unwrap();
78 let codec = LinesCodec::new_with_max_length(2 * 1024 * 1024);
79 while let Ok((s, _)) = listener.accept().await {
80 let rpc = rpc.clone();
81 let stream_config = stream_config.clone();
82 let codec = codec.clone();
83 tokio::spawn(async move {
84 let (r, w) = s.into_split();
85 let r = FramedRead::new(r, codec.clone()).map_ok(StreamMsg::Str);
86 let w = FramedWrite::new(w, codec).with(|msg| async move {
87 Ok::<_, LinesCodecError>(match msg {
88 StreamMsg::Str(msg) => msg,
89 _ => "".into(),
90 })
91 });
92 tokio::pin!(w);
93 drop(serve_stream_sink(&rpc, w, r, stream_config).await);
94 });
95 }
96}
Sourcepub fn result_raw_json(value: impl Into<Arc<str>>) -> Self
pub fn result_raw_json(value: impl Into<Arc<str>>) -> Self
Create a new “result” message.
value
must be valid JSON.
Sourcepub fn error_raw_json(value: impl Into<Arc<str>>) -> Self
pub fn error_raw_json(value: impl Into<Arc<str>>) -> Self
Create a new “error” message.
value
must be valid JSON.
Trait Implementations§
Source§impl<T: Clone> Clone for PublishMsg<T>
impl<T: Clone> Clone for PublishMsg<T>
Source§fn clone(&self) -> PublishMsg<T>
fn clone(&self) -> PublishMsg<T>
Returns a copy of the value. Read more
1.0.0 · Source§fn clone_from(&mut self, source: &Self)
fn clone_from(&mut self, source: &Self)
Performs copy-assignment from
source
. Read moreAuto Trait Implementations§
impl<T> Freeze for PublishMsg<T>
impl<T> RefUnwindSafe for PublishMsg<T>where
T: RefUnwindSafe,
impl<T> Send for PublishMsg<T>where
T: Send,
impl<T> Sync for PublishMsg<T>where
T: Sync,
impl<T> Unpin for PublishMsg<T>where
T: Unpin,
impl<T> UnwindSafe for PublishMsg<T>where
T: UnwindSafe,
Blanket Implementations§
Source§impl<T> BorrowMut<T> for Twhere
T: ?Sized,
impl<T> BorrowMut<T> for Twhere
T: ?Sized,
Source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
Mutably borrows from an owned value. Read more