pub struct PublishMsg<T> { /* private fields */ }
Available on crate feature `server` only.
Expand description

Inner message published to subscribers.

Implementations§

source§

impl<T: Serialize> PublishMsg<T>

source

pub fn result(value: &T) -> Self

Create a new “result” message by serializing the value into JSON.

If serialization fails, an “error” message is created and returned instead.

Examples found in repository?
examples/server-macros.rs (line 54)
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
    fn subscribe(&self, interval: u64) -> Result<Self::S> {
        if interval > 0 {
            Ok(Box::pin(async_stream::stream! {
                for i in 0..10 {
                    tokio::time::sleep(Duration::from_secs(interval)).await;
                    yield PublishMsg::result(&i);
                }
                yield PublishMsg::error(&jsonrpc_core::Error {
                    code: jsonrpc_core::ErrorCode::ServerError(-32000),
                    message: "ended".into(),
                    data: None,
                });
            }))
        } else {
            Err(jsonrpc_core::Error::invalid_params("invalid interval"))
        }
    }
More examples
Hide additional examples
examples/server-macros-custom-error.rs (line 83)
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
    fn subscribe(&self, interval: u64) -> Result<Self::S> {
        if interval > 0 {
            Ok(Box::pin(async_stream::stream! {
                for i in 0..10 {
                    tokio::time::sleep(Duration::from_secs(interval)).await;
                    yield PublishMsg::result(&i);
                }
                yield PublishMsg::error(&jsonrpc_core::Error {
                    code: jsonrpc_core::ErrorCode::ServerError(-32000),
                    message: "ended".into(),
                    data: None,
                });
            }))
        } else {
            Err(MyError {
                code: ErrorCode::InvalidParams.code(),
                message: "invalid interval".into(),
                data: None,
            })
        }
    }
examples/broadcast.rs (line 27)
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
/// Runs a JSON-RPC server whose subscriptions all receive the same counter.
///
/// A background task sends `PublishMsg::result(&i)` on a broadcast channel once
/// per second; every `subscribe` call gets its own receiver on that channel.
/// The router is mounted at `/rpc` and served on `0.0.0.0:3000`.
async fn main() {
    let mut rpc = MetaIoHandler::with_compatibility(jsonrpc_core::Compatibility::V2);
    // The initial receiver is discarded; receivers are created per
    // subscription through `tx.subscribe()` below.
    let (tx, _) = broadcast::channel(8);
    tokio::spawn({
        // Clone the sender for the producer task so the original `tx` can be
        // moved into the subscription closure.
        let tx = tx.clone();
        async move {
            for i in 0u64.. {
                // The send error can be ignored.
                //
                // It is recommended to broadcast an already serialized
                // `PublishMsg`. This way it only needs to be serialized once.
                drop(tx.send(PublishMsg::result(&i)));
                tokio::time::sleep(Duration::from_secs(1)).await;
            }
        }
    });
    add_pub_sub(
        &mut rpc,
        "subscribe",
        "subscription",
        "unsubscribe",
        move |_params: Params| {
            // Each item from the receiver stream is a `Result`; a receive
            // error is replaced with a "lagged" error message (code -32000)
            // instead of being propagated.
            Ok(BroadcastStream::new(tx.subscribe()).map(|result| {
                result.unwrap_or_else(|_| {
                    PublishMsg::error(&jsonrpc_core::Error {
                        code: jsonrpc_core::ErrorCode::ServerError(-32000),
                        message: "lagged".into(),
                        data: None,
                    })
                })
            }))
        },
    );
    let rpc = Arc::new(rpc);
    let config = StreamServerConfig::default()
        .with_channel_size(4)
        .with_pipeline_size(4)
        .with_keep_alive(true);
    let app = jsonrpc_router("/rpc", rpc.clone(), config);
    // Panics if the address fails to parse or the server returns an error.
    axum::Server::bind(&"0.0.0.0:3000".parse().unwrap())
        .serve(app.into_make_service())
        .await
        .unwrap();
}
examples/server.rs (line 45)
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
/// Starts the example JSON-RPC server on two transports that share one handler:
/// HTTP and WS on `0.0.0.0:3000` (route `/rpc`) and line-delimited JSON over
/// TCP on `0.0.0.0:3001`.
async fn main() {
    let mut rpc = MetaIoHandler::with_compatibility(jsonrpc_core::Compatibility::V2);
    // "sleep": takes one `u64`, sleeps that many seconds, then returns it.
    rpc.add_method("sleep", |params: Params| async move {
        let (x,): (u64,) = params.parse()?;
        tokio::time::sleep(Duration::from_secs(x)).await;
        Ok(x.into())
    });
    // "@ping": ignores its parameters and returns "pong".
    rpc.add_method("@ping", |_| async move { Ok("pong".into()) });
    // "value": returns the optional `u64` parameter, or the default when absent.
    rpc.add_method("value", |params: Params| async move {
        let x: Option<u64> = params.parse()?;
        Ok(x.unwrap_or_default().into())
    });
    // "add": expects `[[x, y], z]` and returns `x + y + z`.
    rpc.add_method("add", |params: Params| async move {
        let ((x, y), z): ((i32, i32), i32) = params.parse()?;
        let sum = x + y + z;
        Ok(sum.into())
    });

    // Subscription: publishes `0..10`, sleeping `interval` seconds before each
    // value, then a final "ended" error message. A zero interval is rejected.
    add_pub_sub(
        &mut rpc,
        "subscribe",
        "subscription",
        "unsubscribe",
        |params: Params| {
            let (interval,): (u64,) = params.parse()?;
            if interval > 0 {
                Ok(async_stream::stream! {
                    for i in 0..10 {
                        tokio::time::sleep(Duration::from_secs(interval)).await;
                        yield PublishMsg::result(&i);
                    }
                    yield PublishMsg::error(&jsonrpc_core::Error {
                        code: jsonrpc_core::ErrorCode::ServerError(-32000),
                        message: "ended".into(),
                        data: None,
                    });
                })
            } else {
                Err(jsonrpc_core::Error::invalid_params("invalid interval"))
            }
        },
    );
    let rpc = Arc::new(rpc);
    // Base configuration shared by both transports.
    let stream_config = StreamServerConfig::default()
        .with_channel_size(4)
        .with_pipeline_size(4);

    // HTTP and WS server.
    // Only this transport gets keep-alive; the TCP server below uses the base
    // configuration.
    let ws_config = stream_config.clone().with_keep_alive(true);
    let app = jsonrpc_router("/rpc", rpc.clone(), ws_config);

    // You can use additional tower-http middlewares to add e.g. CORS.
    // Spawned as a separate task so the TCP accept loop below runs alongside it.
    tokio::spawn(async move {
        axum::Server::bind(&"0.0.0.0:3000".parse().unwrap())
            .serve(app.into_make_service())
            .await
            .unwrap();
    });

    // TCP server with line delimited json codec.
    //
    // You can also use other transports (e.g. TLS, unix socket) and codecs
    // (e.g. netstring, JSON splitter).
    let listener = TcpListener::bind("0.0.0.0:3001").await.unwrap();
    let codec = LinesCodec::new_with_max_length(2 * 1024 * 1024);
    // The loop (and therefore `main`) ends on the first `accept` error.
    while let Ok((s, _)) = listener.accept().await {
        let rpc = rpc.clone();
        let stream_config = stream_config.clone();
        let codec = codec.clone();
        // One task per connection.
        tokio::spawn(async move {
            let (r, w) = s.into_split();
            // Incoming lines are wrapped as `StreamMsg::Str`.
            let r = FramedRead::new(r, codec.clone()).map_ok(StreamMsg::Str);
            // Outgoing `StreamMsg::Str` payloads are written as-is; any other
            // variant is written as an empty line.
            let w = FramedWrite::new(w, codec).with(|msg| async move {
                Ok::<_, LinesCodecError>(match msg {
                    StreamMsg::Str(msg) => msg,
                    _ => "".into(),
                })
            });
            tokio::pin!(w);
            // The outcome of serving the connection is deliberately discarded.
            drop(serve_stream_sink(&rpc, w, r, stream_config).await);
        });
    }
}
source§

impl<T> PublishMsg<T>

source

pub fn error(err: &Error) -> Self

Create a new “error” message by serializing the JSONRPC error object.

Panics

If serializing the error fails.

Examples found in repository?
examples/server-macros.rs (lines 56-60)
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
    fn subscribe(&self, interval: u64) -> Result<Self::S> {
        if interval > 0 {
            Ok(Box::pin(async_stream::stream! {
                for i in 0..10 {
                    tokio::time::sleep(Duration::from_secs(interval)).await;
                    yield PublishMsg::result(&i);
                }
                yield PublishMsg::error(&jsonrpc_core::Error {
                    code: jsonrpc_core::ErrorCode::ServerError(-32000),
                    message: "ended".into(),
                    data: None,
                });
            }))
        } else {
            Err(jsonrpc_core::Error::invalid_params("invalid interval"))
        }
    }
More examples
Hide additional examples
examples/server-macros-custom-error.rs (lines 85-89)
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
    fn subscribe(&self, interval: u64) -> Result<Self::S> {
        if interval > 0 {
            Ok(Box::pin(async_stream::stream! {
                for i in 0..10 {
                    tokio::time::sleep(Duration::from_secs(interval)).await;
                    yield PublishMsg::result(&i);
                }
                yield PublishMsg::error(&jsonrpc_core::Error {
                    code: jsonrpc_core::ErrorCode::ServerError(-32000),
                    message: "ended".into(),
                    data: None,
                });
            }))
        } else {
            Err(MyError {
                code: ErrorCode::InvalidParams.code(),
                message: "invalid interval".into(),
                data: None,
            })
        }
    }
examples/broadcast.rs (lines 40-44)
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
/// Runs a JSON-RPC server whose subscriptions all receive the same counter.
///
/// A background task sends `PublishMsg::result(&i)` on a broadcast channel once
/// per second; every `subscribe` call gets its own receiver on that channel.
/// The router is mounted at `/rpc` and served on `0.0.0.0:3000`.
async fn main() {
    let mut rpc = MetaIoHandler::with_compatibility(jsonrpc_core::Compatibility::V2);
    // The initial receiver is discarded; receivers are created per
    // subscription through `tx.subscribe()` below.
    let (tx, _) = broadcast::channel(8);
    tokio::spawn({
        // Clone the sender for the producer task so the original `tx` can be
        // moved into the subscription closure.
        let tx = tx.clone();
        async move {
            for i in 0u64.. {
                // The send error can be ignored.
                //
                // It is recommended to broadcast an already serialized
                // `PublishMsg`. This way it only needs to be serialized once.
                drop(tx.send(PublishMsg::result(&i)));
                tokio::time::sleep(Duration::from_secs(1)).await;
            }
        }
    });
    add_pub_sub(
        &mut rpc,
        "subscribe",
        "subscription",
        "unsubscribe",
        move |_params: Params| {
            // Each item from the receiver stream is a `Result`; a receive
            // error is replaced with a "lagged" error message (code -32000)
            // instead of being propagated.
            Ok(BroadcastStream::new(tx.subscribe()).map(|result| {
                result.unwrap_or_else(|_| {
                    PublishMsg::error(&jsonrpc_core::Error {
                        code: jsonrpc_core::ErrorCode::ServerError(-32000),
                        message: "lagged".into(),
                        data: None,
                    })
                })
            }))
        },
    );
    let rpc = Arc::new(rpc);
    let config = StreamServerConfig::default()
        .with_channel_size(4)
        .with_pipeline_size(4)
        .with_keep_alive(true);
    let app = jsonrpc_router("/rpc", rpc.clone(), config);
    // Panics if the address fails to parse or the server returns an error.
    axum::Server::bind(&"0.0.0.0:3000".parse().unwrap())
        .serve(app.into_make_service())
        .await
        .unwrap();
}
examples/server.rs (lines 47-51)
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
/// Starts the example JSON-RPC server on two transports that share one handler:
/// HTTP and WS on `0.0.0.0:3000` (route `/rpc`) and line-delimited JSON over
/// TCP on `0.0.0.0:3001`.
async fn main() {
    let mut rpc = MetaIoHandler::with_compatibility(jsonrpc_core::Compatibility::V2);
    // "sleep": takes one `u64`, sleeps that many seconds, then returns it.
    rpc.add_method("sleep", |params: Params| async move {
        let (x,): (u64,) = params.parse()?;
        tokio::time::sleep(Duration::from_secs(x)).await;
        Ok(x.into())
    });
    // "@ping": ignores its parameters and returns "pong".
    rpc.add_method("@ping", |_| async move { Ok("pong".into()) });
    // "value": returns the optional `u64` parameter, or the default when absent.
    rpc.add_method("value", |params: Params| async move {
        let x: Option<u64> = params.parse()?;
        Ok(x.unwrap_or_default().into())
    });
    // "add": expects `[[x, y], z]` and returns `x + y + z`.
    rpc.add_method("add", |params: Params| async move {
        let ((x, y), z): ((i32, i32), i32) = params.parse()?;
        let sum = x + y + z;
        Ok(sum.into())
    });

    // Subscription: publishes `0..10`, sleeping `interval` seconds before each
    // value, then a final "ended" error message. A zero interval is rejected.
    add_pub_sub(
        &mut rpc,
        "subscribe",
        "subscription",
        "unsubscribe",
        |params: Params| {
            let (interval,): (u64,) = params.parse()?;
            if interval > 0 {
                Ok(async_stream::stream! {
                    for i in 0..10 {
                        tokio::time::sleep(Duration::from_secs(interval)).await;
                        yield PublishMsg::result(&i);
                    }
                    yield PublishMsg::error(&jsonrpc_core::Error {
                        code: jsonrpc_core::ErrorCode::ServerError(-32000),
                        message: "ended".into(),
                        data: None,
                    });
                })
            } else {
                Err(jsonrpc_core::Error::invalid_params("invalid interval"))
            }
        },
    );
    let rpc = Arc::new(rpc);
    // Base configuration shared by both transports.
    let stream_config = StreamServerConfig::default()
        .with_channel_size(4)
        .with_pipeline_size(4);

    // HTTP and WS server.
    // Only this transport gets keep-alive; the TCP server below uses the base
    // configuration.
    let ws_config = stream_config.clone().with_keep_alive(true);
    let app = jsonrpc_router("/rpc", rpc.clone(), ws_config);

    // You can use additional tower-http middlewares to add e.g. CORS.
    // Spawned as a separate task so the TCP accept loop below runs alongside it.
    tokio::spawn(async move {
        axum::Server::bind(&"0.0.0.0:3000".parse().unwrap())
            .serve(app.into_make_service())
            .await
            .unwrap();
    });

    // TCP server with line delimited json codec.
    //
    // You can also use other transports (e.g. TLS, unix socket) and codecs
    // (e.g. netstring, JSON splitter).
    let listener = TcpListener::bind("0.0.0.0:3001").await.unwrap();
    let codec = LinesCodec::new_with_max_length(2 * 1024 * 1024);
    // The loop (and therefore `main`) ends on the first `accept` error.
    while let Ok((s, _)) = listener.accept().await {
        let rpc = rpc.clone();
        let stream_config = stream_config.clone();
        let codec = codec.clone();
        // One task per connection.
        tokio::spawn(async move {
            let (r, w) = s.into_split();
            // Incoming lines are wrapped as `StreamMsg::Str`.
            let r = FramedRead::new(r, codec.clone()).map_ok(StreamMsg::Str);
            // Outgoing `StreamMsg::Str` payloads are written as-is; any other
            // variant is written as an empty line.
            let w = FramedWrite::new(w, codec).with(|msg| async move {
                Ok::<_, LinesCodecError>(match msg {
                    StreamMsg::Str(msg) => msg,
                    _ => "".into(),
                })
            });
            tokio::pin!(w);
            // The outcome of serving the connection is deliberately discarded.
            drop(serve_stream_sink(&rpc, w, r, stream_config).await);
        });
    }
}
source

pub fn result_raw_json(value: impl Into<Arc<str>>) -> Self

Create a new “result” message.

`value` must be valid JSON.

source

pub fn error_raw_json(value: impl Into<Arc<str>>) -> Self

Create a new “error” message.

`value` must be valid JSON.

Trait Implementations§

source§

impl<T: Clone> Clone for PublishMsg<T>

source§

fn clone(&self) -> PublishMsg<T>

Returns a copy of the value. Read more
1.0.0 · source§

fn clone_from(&mut self, source: &Self)

Performs copy-assignment from source. Read more

Auto Trait Implementations§

§

impl<T> RefUnwindSafe for PublishMsg<T>
where T: RefUnwindSafe,

§

impl<T> Send for PublishMsg<T>
where T: Send,

§

impl<T> Sync for PublishMsg<T>
where T: Sync,

§

impl<T> Unpin for PublishMsg<T>
where T: Unpin,

§

impl<T> UnwindSafe for PublishMsg<T>
where T: UnwindSafe,

Blanket Implementations§

source§

impl<T> Any for T
where T: 'static + ?Sized,

source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
source§

impl<T> Borrow<T> for T
where T: ?Sized,

source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
source§

impl<T> From<T> for T

source§

fn from(t: T) -> T

Returns the argument unchanged.

§

impl<T> FromRef<T> for T
where T: Clone,

§

fn from_ref(input: &T) -> T

Converts to this type from a reference to the input type.
§

impl<T> Instrument for T

§

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided [Span], returning an Instrumented wrapper. Read more
§

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper. Read more
source§

impl<T, U> Into<U> for T
where U: From<T>,

source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

source§

impl<T> Same for T

§

type Output = T

Should always be Self
source§

impl<T> ToOwned for T
where T: Clone,

§

type Owned = T

The resulting type after obtaining ownership.
source§

fn to_owned(&self) -> T

Creates owned data from borrowed data, usually by cloning. Read more
source§

fn clone_into(&self, target: &mut T)

Uses borrowed data to replace owned data, usually by cloning. Read more
source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

§

type Error = Infallible

The type returned in the event of a conversion error.
source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
§

impl<V, T> VZip<V> for T
where V: MultiLane<T>,

§

fn vzip(self) -> V

§

impl<T> WithSubscriber for T

§

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a [WithDispatch] wrapper. Read more
§

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a [WithDispatch] wrapper. Read more