Struct PublishMsg

Source
pub struct PublishMsg<T> { /* private fields */ }
Available on crate feature server only.
Expand description

Inner message published to subscribers.

Implementations§

Source§

impl<T: Serialize> PublishMsg<T>

Source

pub fn result(value: &T) -> Self

Create a new “result” message by serializing the value into JSON.

If serialization fails, an “error” message is created returned instead.

Examples found in repository?
examples/server-macros.rs (line 54)
49    fn subscribe(&self, interval: u64) -> Result<Self::S> {
50        if interval > 0 {
51            Ok(Box::pin(async_stream::stream! {
52                for i in 0..10 {
53                    tokio::time::sleep(Duration::from_secs(interval)).await;
54                    yield PublishMsg::result(&i);
55                }
56                yield PublishMsg::error(&jsonrpc_core::Error {
57                    code: jsonrpc_core::ErrorCode::ServerError(-32000),
58                    message: "ended".into(),
59                    data: None,
60                });
61            }))
62        } else {
63            Err(jsonrpc_core::Error::invalid_params("invalid interval"))
64        }
65    }
More examples
Hide additional examples
examples/server-macros-custom-error.rs (line 83)
78    fn subscribe(&self, interval: u64) -> Result<Self::S> {
79        if interval > 0 {
80            Ok(Box::pin(async_stream::stream! {
81                for i in 0..10 {
82                    tokio::time::sleep(Duration::from_secs(interval)).await;
83                    yield PublishMsg::result(&i);
84                }
85                yield PublishMsg::error(&jsonrpc_core::Error {
86                    code: jsonrpc_core::ErrorCode::ServerError(-32000),
87                    message: "ended".into(),
88                    data: None,
89                });
90            }))
91        } else {
92            Err(MyError {
93                code: ErrorCode::InvalidParams.code(),
94                message: "invalid interval".into(),
95                data: None,
96            })
97        }
98    }
examples/broadcast.rs (line 27)
16async fn main() {
17    let mut rpc = MetaIoHandler::with_compatibility(jsonrpc_core::Compatibility::V2);
18    let (tx, _) = broadcast::channel(8);
19    tokio::spawn({
20        let tx = tx.clone();
21        async move {
22            for i in 0u64.. {
23                // Error can be ignored.
24                //
25                // It is recommended to broadcast already serialized
26                // `PublishMsg`. This way it only need to serialized once.
27                drop(tx.send(PublishMsg::result(&i)));
28                tokio::time::sleep(Duration::from_secs(1)).await;
29            }
30        }
31    });
32    add_pub_sub(
33        &mut rpc,
34        "subscribe",
35        "subscription",
36        "unsubscribe",
37        move |_params: Params| {
38            Ok(BroadcastStream::new(tx.subscribe()).map(|result| {
39                result.unwrap_or_else(|_| {
40                    PublishMsg::error(&jsonrpc_core::Error {
41                        code: jsonrpc_core::ErrorCode::ServerError(-32000),
42                        message: "lagged".into(),
43                        data: None,
44                    })
45                })
46            }))
47        },
48    );
49    let rpc = Arc::new(rpc);
50    let config = StreamServerConfig::default()
51        .with_channel_size(4)
52        .with_pipeline_size(4)
53        .with_keep_alive(true);
54    let app = jsonrpc_router("/rpc", rpc.clone(), config);
55    let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap();
56    axum::serve(listener, app).await.unwrap();
57}
examples/server.rs (line 45)
16async fn main() {
17    let mut rpc = MetaIoHandler::with_compatibility(jsonrpc_core::Compatibility::V2);
18    rpc.add_method("sleep", |params: Params| async move {
19        let (x,): (u64,) = params.parse()?;
20        tokio::time::sleep(Duration::from_secs(x)).await;
21        Ok(x.into())
22    });
23    rpc.add_method("@ping", |_| async move { Ok("pong".into()) });
24    rpc.add_method("value", |params: Params| async move {
25        let x: Option<u64> = params.parse()?;
26        Ok(x.unwrap_or_default().into())
27    });
28    rpc.add_method("add", |params: Params| async move {
29        let ((x, y), z): ((i32, i32), i32) = params.parse()?;
30        let sum = x + y + z;
31        Ok(sum.into())
32    });
33
34    add_pub_sub(
35        &mut rpc,
36        "subscribe",
37        "subscription",
38        "unsubscribe",
39        |params: Params| {
40            let (interval,): (u64,) = params.parse()?;
41            if interval > 0 {
42                Ok(async_stream::stream! {
43                    for i in 0..10 {
44                        tokio::time::sleep(Duration::from_secs(interval)).await;
45                        yield PublishMsg::result(&i);
46                    }
47                    yield PublishMsg::error(&jsonrpc_core::Error {
48                        code: jsonrpc_core::ErrorCode::ServerError(-32000),
49                        message: "ended".into(),
50                        data: None,
51                    });
52                })
53            } else {
54                Err(jsonrpc_core::Error::invalid_params("invalid interval"))
55            }
56        },
57    );
58    let rpc = Arc::new(rpc);
59    let stream_config = StreamServerConfig::default()
60        .with_channel_size(4)
61        .with_pipeline_size(4);
62
63    // HTTP and WS server.
64    let ws_config = stream_config.clone().with_keep_alive(true);
65    let app = jsonrpc_router("/rpc", rpc.clone(), ws_config);
66
67    // You can use additional tower-http middlewares to add e.g. CORS.
68    tokio::spawn(async move {
69        let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap();
70        axum::serve(listener, app).await.unwrap();
71    });
72
73    // TCP server with line delimited json codec.
74    //
75    // You can also use other transports (e.g. TLS, unix socket) and codecs
76    // (e.g. netstring, JSON splitter).
77    let listener = TcpListener::bind("0.0.0.0:3001").await.unwrap();
78    let codec = LinesCodec::new_with_max_length(2 * 1024 * 1024);
79    while let Ok((s, _)) = listener.accept().await {
80        let rpc = rpc.clone();
81        let stream_config = stream_config.clone();
82        let codec = codec.clone();
83        tokio::spawn(async move {
84            let (r, w) = s.into_split();
85            let r = FramedRead::new(r, codec.clone()).map_ok(StreamMsg::Str);
86            let w = FramedWrite::new(w, codec).with(|msg| async move {
87                Ok::<_, LinesCodecError>(match msg {
88                    StreamMsg::Str(msg) => msg,
89                    _ => "".into(),
90                })
91            });
92            tokio::pin!(w);
93            drop(serve_stream_sink(&rpc, w, r, stream_config).await);
94        });
95    }
96}
Source§

impl<T> PublishMsg<T>

Source

pub fn error(err: &Error) -> Self

Create a new “error” message by serializing the JSONRPC error object.

§Panics

If serializing the error fails.

Examples found in repository?
examples/server-macros.rs (lines 56-60)
49    fn subscribe(&self, interval: u64) -> Result<Self::S> {
50        if interval > 0 {
51            Ok(Box::pin(async_stream::stream! {
52                for i in 0..10 {
53                    tokio::time::sleep(Duration::from_secs(interval)).await;
54                    yield PublishMsg::result(&i);
55                }
56                yield PublishMsg::error(&jsonrpc_core::Error {
57                    code: jsonrpc_core::ErrorCode::ServerError(-32000),
58                    message: "ended".into(),
59                    data: None,
60                });
61            }))
62        } else {
63            Err(jsonrpc_core::Error::invalid_params("invalid interval"))
64        }
65    }
More examples
Hide additional examples
examples/server-macros-custom-error.rs (lines 85-89)
78    fn subscribe(&self, interval: u64) -> Result<Self::S> {
79        if interval > 0 {
80            Ok(Box::pin(async_stream::stream! {
81                for i in 0..10 {
82                    tokio::time::sleep(Duration::from_secs(interval)).await;
83                    yield PublishMsg::result(&i);
84                }
85                yield PublishMsg::error(&jsonrpc_core::Error {
86                    code: jsonrpc_core::ErrorCode::ServerError(-32000),
87                    message: "ended".into(),
88                    data: None,
89                });
90            }))
91        } else {
92            Err(MyError {
93                code: ErrorCode::InvalidParams.code(),
94                message: "invalid interval".into(),
95                data: None,
96            })
97        }
98    }
examples/broadcast.rs (lines 40-44)
16async fn main() {
17    let mut rpc = MetaIoHandler::with_compatibility(jsonrpc_core::Compatibility::V2);
18    let (tx, _) = broadcast::channel(8);
19    tokio::spawn({
20        let tx = tx.clone();
21        async move {
22            for i in 0u64.. {
23                // Error can be ignored.
24                //
25                // It is recommended to broadcast already serialized
26                // `PublishMsg`. This way it only need to serialized once.
27                drop(tx.send(PublishMsg::result(&i)));
28                tokio::time::sleep(Duration::from_secs(1)).await;
29            }
30        }
31    });
32    add_pub_sub(
33        &mut rpc,
34        "subscribe",
35        "subscription",
36        "unsubscribe",
37        move |_params: Params| {
38            Ok(BroadcastStream::new(tx.subscribe()).map(|result| {
39                result.unwrap_or_else(|_| {
40                    PublishMsg::error(&jsonrpc_core::Error {
41                        code: jsonrpc_core::ErrorCode::ServerError(-32000),
42                        message: "lagged".into(),
43                        data: None,
44                    })
45                })
46            }))
47        },
48    );
49    let rpc = Arc::new(rpc);
50    let config = StreamServerConfig::default()
51        .with_channel_size(4)
52        .with_pipeline_size(4)
53        .with_keep_alive(true);
54    let app = jsonrpc_router("/rpc", rpc.clone(), config);
55    let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap();
56    axum::serve(listener, app).await.unwrap();
57}
examples/server.rs (lines 47-51)
16async fn main() {
17    let mut rpc = MetaIoHandler::with_compatibility(jsonrpc_core::Compatibility::V2);
18    rpc.add_method("sleep", |params: Params| async move {
19        let (x,): (u64,) = params.parse()?;
20        tokio::time::sleep(Duration::from_secs(x)).await;
21        Ok(x.into())
22    });
23    rpc.add_method("@ping", |_| async move { Ok("pong".into()) });
24    rpc.add_method("value", |params: Params| async move {
25        let x: Option<u64> = params.parse()?;
26        Ok(x.unwrap_or_default().into())
27    });
28    rpc.add_method("add", |params: Params| async move {
29        let ((x, y), z): ((i32, i32), i32) = params.parse()?;
30        let sum = x + y + z;
31        Ok(sum.into())
32    });
33
34    add_pub_sub(
35        &mut rpc,
36        "subscribe",
37        "subscription",
38        "unsubscribe",
39        |params: Params| {
40            let (interval,): (u64,) = params.parse()?;
41            if interval > 0 {
42                Ok(async_stream::stream! {
43                    for i in 0..10 {
44                        tokio::time::sleep(Duration::from_secs(interval)).await;
45                        yield PublishMsg::result(&i);
46                    }
47                    yield PublishMsg::error(&jsonrpc_core::Error {
48                        code: jsonrpc_core::ErrorCode::ServerError(-32000),
49                        message: "ended".into(),
50                        data: None,
51                    });
52                })
53            } else {
54                Err(jsonrpc_core::Error::invalid_params("invalid interval"))
55            }
56        },
57    );
58    let rpc = Arc::new(rpc);
59    let stream_config = StreamServerConfig::default()
60        .with_channel_size(4)
61        .with_pipeline_size(4);
62
63    // HTTP and WS server.
64    let ws_config = stream_config.clone().with_keep_alive(true);
65    let app = jsonrpc_router("/rpc", rpc.clone(), ws_config);
66
67    // You can use additional tower-http middlewares to add e.g. CORS.
68    tokio::spawn(async move {
69        let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap();
70        axum::serve(listener, app).await.unwrap();
71    });
72
73    // TCP server with line delimited json codec.
74    //
75    // You can also use other transports (e.g. TLS, unix socket) and codecs
76    // (e.g. netstring, JSON splitter).
77    let listener = TcpListener::bind("0.0.0.0:3001").await.unwrap();
78    let codec = LinesCodec::new_with_max_length(2 * 1024 * 1024);
79    while let Ok((s, _)) = listener.accept().await {
80        let rpc = rpc.clone();
81        let stream_config = stream_config.clone();
82        let codec = codec.clone();
83        tokio::spawn(async move {
84            let (r, w) = s.into_split();
85            let r = FramedRead::new(r, codec.clone()).map_ok(StreamMsg::Str);
86            let w = FramedWrite::new(w, codec).with(|msg| async move {
87                Ok::<_, LinesCodecError>(match msg {
88                    StreamMsg::Str(msg) => msg,
89                    _ => "".into(),
90                })
91            });
92            tokio::pin!(w);
93            drop(serve_stream_sink(&rpc, w, r, stream_config).await);
94        });
95    }
96}
Source

pub fn result_raw_json(value: impl Into<Arc<str>>) -> Self

Create a new “result” message.

value must be valid JSON.

Source

pub fn error_raw_json(value: impl Into<Arc<str>>) -> Self

Create a new “error” message.

value must be valid JSON.

Trait Implementations§

Source§

impl<T: Clone> Clone for PublishMsg<T>

Source§

fn clone(&self) -> PublishMsg<T>

Returns a copy of the value. Read more
1.0.0 · Source§

fn clone_from(&mut self, source: &Self)

Performs copy-assignment from source. Read more

Auto Trait Implementations§

§

impl<T> Freeze for PublishMsg<T>

§

impl<T> RefUnwindSafe for PublishMsg<T>
where T: RefUnwindSafe,

§

impl<T> Send for PublishMsg<T>
where T: Send,

§

impl<T> Sync for PublishMsg<T>
where T: Sync,

§

impl<T> Unpin for PublishMsg<T>
where T: Unpin,

§

impl<T> UnwindSafe for PublishMsg<T>
where T: UnwindSafe,

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> CloneToUninit for T
where T: Clone,

Source§

unsafe fn clone_to_uninit(&self, dest: *mut u8)

🔬This is a nightly-only experimental API. (clone_to_uninit)
Performs copy-assignment from self to dest. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T> FromRef<T> for T
where T: Clone,

Source§

fn from_ref(input: &T) -> T

Converts to this type from a reference to the input type.
Source§

impl<T> Instrument for T

Source§

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper. Read more
Source§

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper. Read more
Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T> Same for T

Source§

type Output = T

Should always be Self
Source§

impl<T> ToOwned for T
where T: Clone,

Source§

type Owned = T

The resulting type after obtaining ownership.
Source§

fn to_owned(&self) -> T

Creates owned data from borrowed data, usually by cloning. Read more
Source§

fn clone_into(&self, target: &mut T)

Uses borrowed data to replace owned data, usually by cloning. Read more
Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
Source§

impl<V, T> VZip<V> for T
where V: MultiLane<T>,

Source§

fn vzip(self) -> V

Source§

impl<T> WithSubscriber for T

Source§

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper. Read more
Source§

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper. Read more
Source§

impl<T> ErasedDestructor for T
where T: 'static,

Source§

impl<A, B, T> HttpServerConnExec<A, B> for T
where B: Body,

Source§

impl<T> MaybeSendSync for T