Struct jsonrpc_utils::pub_sub::PublishMsg
source · pub struct PublishMsg<T> { /* private fields */ }
Available on crate feature
server
only.Expand description
Inner message published to subscribers.
Implementations§
source§impl<T: Serialize> PublishMsg<T>
impl<T: Serialize> PublishMsg<T>
sourcepub fn result(value: &T) -> Self
pub fn result(value: &T) -> Self
Create a new “result” message by serializing the value into JSON.
If serialization fails, an “error” message is created returned instead.
Examples found in repository?
examples/server-macros.rs (line 54)
49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65
fn subscribe(&self, interval: u64) -> Result<Self::S> {
if interval > 0 {
Ok(Box::pin(async_stream::stream! {
for i in 0..10 {
tokio::time::sleep(Duration::from_secs(interval)).await;
yield PublishMsg::result(&i);
}
yield PublishMsg::error(&jsonrpc_core::Error {
code: jsonrpc_core::ErrorCode::ServerError(-32000),
message: "ended".into(),
data: None,
});
}))
} else {
Err(jsonrpc_core::Error::invalid_params("invalid interval"))
}
}
More examples
examples/server-macros-custom-error.rs (line 83)
78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98
fn subscribe(&self, interval: u64) -> Result<Self::S> {
if interval > 0 {
Ok(Box::pin(async_stream::stream! {
for i in 0..10 {
tokio::time::sleep(Duration::from_secs(interval)).await;
yield PublishMsg::result(&i);
}
yield PublishMsg::error(&jsonrpc_core::Error {
code: jsonrpc_core::ErrorCode::ServerError(-32000),
message: "ended".into(),
data: None,
});
}))
} else {
Err(MyError {
code: ErrorCode::InvalidParams.code(),
message: "invalid interval".into(),
data: None,
})
}
}
examples/broadcast.rs (line 27)
16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59
async fn main() {
let mut rpc = MetaIoHandler::with_compatibility(jsonrpc_core::Compatibility::V2);
let (tx, _) = broadcast::channel(8);
tokio::spawn({
let tx = tx.clone();
async move {
for i in 0u64.. {
// Error can be ignored.
//
// It is recommended to broadcast already serialized
// `PublishMsg`. This way it only need to serialized once.
drop(tx.send(PublishMsg::result(&i)));
tokio::time::sleep(Duration::from_secs(1)).await;
}
}
});
add_pub_sub(
&mut rpc,
"subscribe",
"subscription",
"unsubscribe",
move |_params: Params| {
Ok(BroadcastStream::new(tx.subscribe()).map(|result| {
result.unwrap_or_else(|_| {
PublishMsg::error(&jsonrpc_core::Error {
code: jsonrpc_core::ErrorCode::ServerError(-32000),
message: "lagged".into(),
data: None,
})
})
}))
},
);
let rpc = Arc::new(rpc);
let config = StreamServerConfig::default()
.with_channel_size(4)
.with_pipeline_size(4)
.with_keep_alive(true);
let app = jsonrpc_router("/rpc", rpc.clone(), config);
axum::Server::bind(&"0.0.0.0:3000".parse().unwrap())
.serve(app.into_make_service())
.await
.unwrap();
}
examples/server.rs (line 45)
16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98
async fn main() {
let mut rpc = MetaIoHandler::with_compatibility(jsonrpc_core::Compatibility::V2);
rpc.add_method("sleep", |params: Params| async move {
let (x,): (u64,) = params.parse()?;
tokio::time::sleep(Duration::from_secs(x)).await;
Ok(x.into())
});
rpc.add_method("@ping", |_| async move { Ok("pong".into()) });
rpc.add_method("value", |params: Params| async move {
let x: Option<u64> = params.parse()?;
Ok(x.unwrap_or_default().into())
});
rpc.add_method("add", |params: Params| async move {
let ((x, y), z): ((i32, i32), i32) = params.parse()?;
let sum = x + y + z;
Ok(sum.into())
});
add_pub_sub(
&mut rpc,
"subscribe",
"subscription",
"unsubscribe",
|params: Params| {
let (interval,): (u64,) = params.parse()?;
if interval > 0 {
Ok(async_stream::stream! {
for i in 0..10 {
tokio::time::sleep(Duration::from_secs(interval)).await;
yield PublishMsg::result(&i);
}
yield PublishMsg::error(&jsonrpc_core::Error {
code: jsonrpc_core::ErrorCode::ServerError(-32000),
message: "ended".into(),
data: None,
});
})
} else {
Err(jsonrpc_core::Error::invalid_params("invalid interval"))
}
},
);
let rpc = Arc::new(rpc);
let stream_config = StreamServerConfig::default()
.with_channel_size(4)
.with_pipeline_size(4);
// HTTP and WS server.
let ws_config = stream_config.clone().with_keep_alive(true);
let app = jsonrpc_router("/rpc", rpc.clone(), ws_config);
// You can use additional tower-http middlewares to add e.g. CORS.
tokio::spawn(async move {
axum::Server::bind(&"0.0.0.0:3000".parse().unwrap())
.serve(app.into_make_service())
.await
.unwrap();
});
// TCP server with line delimited json codec.
//
// You can also use other transports (e.g. TLS, unix socket) and codecs
// (e.g. netstring, JSON splitter).
let listener = TcpListener::bind("0.0.0.0:3001").await.unwrap();
let codec = LinesCodec::new_with_max_length(2 * 1024 * 1024);
while let Ok((s, _)) = listener.accept().await {
let rpc = rpc.clone();
let stream_config = stream_config.clone();
let codec = codec.clone();
tokio::spawn(async move {
let (r, w) = s.into_split();
let r = FramedRead::new(r, codec.clone()).map_ok(StreamMsg::Str);
let w = FramedWrite::new(w, codec).with(|msg| async move {
Ok::<_, LinesCodecError>(match msg {
StreamMsg::Str(msg) => msg,
_ => "".into(),
})
});
tokio::pin!(w);
drop(serve_stream_sink(&rpc, w, r, stream_config).await);
});
}
}
source§impl<T> PublishMsg<T>
impl<T> PublishMsg<T>
sourcepub fn error(err: &Error) -> Self
pub fn error(err: &Error) -> Self
Create a new “error” message by serializing the JSONRPC error object.
Panics
If serializing the error fails.
Examples found in repository?
examples/server-macros.rs (lines 56-60)
49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65
fn subscribe(&self, interval: u64) -> Result<Self::S> {
if interval > 0 {
Ok(Box::pin(async_stream::stream! {
for i in 0..10 {
tokio::time::sleep(Duration::from_secs(interval)).await;
yield PublishMsg::result(&i);
}
yield PublishMsg::error(&jsonrpc_core::Error {
code: jsonrpc_core::ErrorCode::ServerError(-32000),
message: "ended".into(),
data: None,
});
}))
} else {
Err(jsonrpc_core::Error::invalid_params("invalid interval"))
}
}
More examples
examples/server-macros-custom-error.rs (lines 85-89)
78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98
fn subscribe(&self, interval: u64) -> Result<Self::S> {
if interval > 0 {
Ok(Box::pin(async_stream::stream! {
for i in 0..10 {
tokio::time::sleep(Duration::from_secs(interval)).await;
yield PublishMsg::result(&i);
}
yield PublishMsg::error(&jsonrpc_core::Error {
code: jsonrpc_core::ErrorCode::ServerError(-32000),
message: "ended".into(),
data: None,
});
}))
} else {
Err(MyError {
code: ErrorCode::InvalidParams.code(),
message: "invalid interval".into(),
data: None,
})
}
}
examples/broadcast.rs (lines 40-44)
16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59
async fn main() {
let mut rpc = MetaIoHandler::with_compatibility(jsonrpc_core::Compatibility::V2);
let (tx, _) = broadcast::channel(8);
tokio::spawn({
let tx = tx.clone();
async move {
for i in 0u64.. {
// Error can be ignored.
//
// It is recommended to broadcast already serialized
// `PublishMsg`. This way it only need to serialized once.
drop(tx.send(PublishMsg::result(&i)));
tokio::time::sleep(Duration::from_secs(1)).await;
}
}
});
add_pub_sub(
&mut rpc,
"subscribe",
"subscription",
"unsubscribe",
move |_params: Params| {
Ok(BroadcastStream::new(tx.subscribe()).map(|result| {
result.unwrap_or_else(|_| {
PublishMsg::error(&jsonrpc_core::Error {
code: jsonrpc_core::ErrorCode::ServerError(-32000),
message: "lagged".into(),
data: None,
})
})
}))
},
);
let rpc = Arc::new(rpc);
let config = StreamServerConfig::default()
.with_channel_size(4)
.with_pipeline_size(4)
.with_keep_alive(true);
let app = jsonrpc_router("/rpc", rpc.clone(), config);
axum::Server::bind(&"0.0.0.0:3000".parse().unwrap())
.serve(app.into_make_service())
.await
.unwrap();
}
examples/server.rs (lines 47-51)
16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98
async fn main() {
let mut rpc = MetaIoHandler::with_compatibility(jsonrpc_core::Compatibility::V2);
rpc.add_method("sleep", |params: Params| async move {
let (x,): (u64,) = params.parse()?;
tokio::time::sleep(Duration::from_secs(x)).await;
Ok(x.into())
});
rpc.add_method("@ping", |_| async move { Ok("pong".into()) });
rpc.add_method("value", |params: Params| async move {
let x: Option<u64> = params.parse()?;
Ok(x.unwrap_or_default().into())
});
rpc.add_method("add", |params: Params| async move {
let ((x, y), z): ((i32, i32), i32) = params.parse()?;
let sum = x + y + z;
Ok(sum.into())
});
add_pub_sub(
&mut rpc,
"subscribe",
"subscription",
"unsubscribe",
|params: Params| {
let (interval,): (u64,) = params.parse()?;
if interval > 0 {
Ok(async_stream::stream! {
for i in 0..10 {
tokio::time::sleep(Duration::from_secs(interval)).await;
yield PublishMsg::result(&i);
}
yield PublishMsg::error(&jsonrpc_core::Error {
code: jsonrpc_core::ErrorCode::ServerError(-32000),
message: "ended".into(),
data: None,
});
})
} else {
Err(jsonrpc_core::Error::invalid_params("invalid interval"))
}
},
);
let rpc = Arc::new(rpc);
let stream_config = StreamServerConfig::default()
.with_channel_size(4)
.with_pipeline_size(4);
// HTTP and WS server.
let ws_config = stream_config.clone().with_keep_alive(true);
let app = jsonrpc_router("/rpc", rpc.clone(), ws_config);
// You can use additional tower-http middlewares to add e.g. CORS.
tokio::spawn(async move {
axum::Server::bind(&"0.0.0.0:3000".parse().unwrap())
.serve(app.into_make_service())
.await
.unwrap();
});
// TCP server with line delimited json codec.
//
// You can also use other transports (e.g. TLS, unix socket) and codecs
// (e.g. netstring, JSON splitter).
let listener = TcpListener::bind("0.0.0.0:3001").await.unwrap();
let codec = LinesCodec::new_with_max_length(2 * 1024 * 1024);
while let Ok((s, _)) = listener.accept().await {
let rpc = rpc.clone();
let stream_config = stream_config.clone();
let codec = codec.clone();
tokio::spawn(async move {
let (r, w) = s.into_split();
let r = FramedRead::new(r, codec.clone()).map_ok(StreamMsg::Str);
let w = FramedWrite::new(w, codec).with(|msg| async move {
Ok::<_, LinesCodecError>(match msg {
StreamMsg::Str(msg) => msg,
_ => "".into(),
})
});
tokio::pin!(w);
drop(serve_stream_sink(&rpc, w, r, stream_config).await);
});
}
}
sourcepub fn result_raw_json(value: impl Into<Arc<str>>) -> Self
pub fn result_raw_json(value: impl Into<Arc<str>>) -> Self
Create a new “result” message.
value
must be valid JSON.
sourcepub fn error_raw_json(value: impl Into<Arc<str>>) -> Self
pub fn error_raw_json(value: impl Into<Arc<str>>) -> Self
Create a new “error” message.
value
must be valid JSON.
Trait Implementations§
source§impl<T: Clone> Clone for PublishMsg<T>
impl<T: Clone> Clone for PublishMsg<T>
source§fn clone(&self) -> PublishMsg<T>
fn clone(&self) -> PublishMsg<T>
Returns a copy of the value. Read more
1.0.0 · source§fn clone_from(&mut self, source: &Self)
fn clone_from(&mut self, source: &Self)
Performs copy-assignment from
source
. Read moreAuto Trait Implementations§
impl<T> RefUnwindSafe for PublishMsg<T>where
T: RefUnwindSafe,
impl<T> Send for PublishMsg<T>where
T: Send,
impl<T> Sync for PublishMsg<T>where
T: Sync,
impl<T> Unpin for PublishMsg<T>where
T: Unpin,
impl<T> UnwindSafe for PublishMsg<T>where
T: UnwindSafe,
Blanket Implementations§
source§impl<T> BorrowMut<T> for Twhere
T: ?Sized,
impl<T> BorrowMut<T> for Twhere
T: ?Sized,
source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
Mutably borrows from an owned value. Read more