pub struct StreamServerConfig { /* private fields */ }
Available on crate feature server only.

Implementations§

source§

impl StreamServerConfig

source

pub fn with_channel_size(self, channel_size: usize) -> Self

Set websocket channel size.

Default is 8.

Panics

If channel_size is 0.

Examples found in repository?
examples/broadcast.rs (line 51)
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
async fn main() {
    let mut rpc = MetaIoHandler::with_compatibility(jsonrpc_core::Compatibility::V2);
    let (tx, _) = broadcast::channel(8);
    tokio::spawn({
        let tx = tx.clone();
        async move {
            for i in 0u64.. {
                // Error can be ignored.
                //
                // It is recommended to broadcast already serialized
                // `PublishMsg`. This way it only need to serialized once.
                drop(tx.send(PublishMsg::result(&i)));
                tokio::time::sleep(Duration::from_secs(1)).await;
            }
        }
    });
    add_pub_sub(
        &mut rpc,
        "subscribe",
        "subscription",
        "unsubscribe",
        move |_params: Params| {
            Ok(BroadcastStream::new(tx.subscribe()).map(|result| {
                result.unwrap_or_else(|_| {
                    PublishMsg::error(&jsonrpc_core::Error {
                        code: jsonrpc_core::ErrorCode::ServerError(-32000),
                        message: "lagged".into(),
                        data: None,
                    })
                })
            }))
        },
    );
    let rpc = Arc::new(rpc);
    let config = StreamServerConfig::default()
        .with_channel_size(4)
        .with_pipeline_size(4)
        .with_keep_alive(true);
    let app = jsonrpc_router("/rpc", rpc.clone(), config);
    axum::Server::bind(&"0.0.0.0:3000".parse().unwrap())
        .serve(app.into_make_service())
        .await
        .unwrap();
}
More examples
Hide additional examples
examples/server.rs (line 60)
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
async fn main() {
    let mut rpc = MetaIoHandler::with_compatibility(jsonrpc_core::Compatibility::V2);
    rpc.add_method("sleep", |params: Params| async move {
        let (x,): (u64,) = params.parse()?;
        tokio::time::sleep(Duration::from_secs(x)).await;
        Ok(x.into())
    });
    rpc.add_method("@ping", |_| async move { Ok("pong".into()) });
    rpc.add_method("value", |params: Params| async move {
        let x: Option<u64> = params.parse()?;
        Ok(x.unwrap_or_default().into())
    });
    rpc.add_method("add", |params: Params| async move {
        let ((x, y), z): ((i32, i32), i32) = params.parse()?;
        let sum = x + y + z;
        Ok(sum.into())
    });

    add_pub_sub(
        &mut rpc,
        "subscribe",
        "subscription",
        "unsubscribe",
        |params: Params| {
            let (interval,): (u64,) = params.parse()?;
            if interval > 0 {
                Ok(async_stream::stream! {
                    for i in 0..10 {
                        tokio::time::sleep(Duration::from_secs(interval)).await;
                        yield PublishMsg::result(&i);
                    }
                    yield PublishMsg::error(&jsonrpc_core::Error {
                        code: jsonrpc_core::ErrorCode::ServerError(-32000),
                        message: "ended".into(),
                        data: None,
                    });
                })
            } else {
                Err(jsonrpc_core::Error::invalid_params("invalid interval"))
            }
        },
    );
    let rpc = Arc::new(rpc);
    let stream_config = StreamServerConfig::default()
        .with_channel_size(4)
        .with_pipeline_size(4);

    // HTTP and WS server.
    let ws_config = stream_config.clone().with_keep_alive(true);
    let app = jsonrpc_router("/rpc", rpc.clone(), ws_config);

    // You can use additional tower-http middlewares to add e.g. CORS.
    tokio::spawn(async move {
        axum::Server::bind(&"0.0.0.0:3000".parse().unwrap())
            .serve(app.into_make_service())
            .await
            .unwrap();
    });

    // TCP server with line delimited json codec.
    //
    // You can also use other transports (e.g. TLS, unix socket) and codecs
    // (e.g. netstring, JSON splitter).
    let listener = TcpListener::bind("0.0.0.0:3001").await.unwrap();
    let codec = LinesCodec::new_with_max_length(2 * 1024 * 1024);
    while let Ok((s, _)) = listener.accept().await {
        let rpc = rpc.clone();
        let stream_config = stream_config.clone();
        let codec = codec.clone();
        tokio::spawn(async move {
            let (r, w) = s.into_split();
            let r = FramedRead::new(r, codec.clone()).map_ok(StreamMsg::Str);
            let w = FramedWrite::new(w, codec).with(|msg| async move {
                Ok::<_, LinesCodecError>(match msg {
                    StreamMsg::Str(msg) => msg,
                    _ => "".into(),
                })
            });
            tokio::pin!(w);
            drop(serve_stream_sink(&rpc, w, r, stream_config).await);
        });
    }
}
source

pub fn with_pipeline_size(self, pipeline_size: usize) -> Self

Set maximum request pipelining.

Up to pipeline_size number of requests will be handled concurrently.

Default is 1, i.e. no pipelining.

Panics

if pipeline_size is 0.

Examples found in repository?
examples/broadcast.rs (line 52)
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
async fn main() {
    let mut rpc = MetaIoHandler::with_compatibility(jsonrpc_core::Compatibility::V2);
    let (tx, _) = broadcast::channel(8);
    tokio::spawn({
        let tx = tx.clone();
        async move {
            for i in 0u64.. {
                // Error can be ignored.
                //
                // It is recommended to broadcast already serialized
                // `PublishMsg`. This way it only need to serialized once.
                drop(tx.send(PublishMsg::result(&i)));
                tokio::time::sleep(Duration::from_secs(1)).await;
            }
        }
    });
    add_pub_sub(
        &mut rpc,
        "subscribe",
        "subscription",
        "unsubscribe",
        move |_params: Params| {
            Ok(BroadcastStream::new(tx.subscribe()).map(|result| {
                result.unwrap_or_else(|_| {
                    PublishMsg::error(&jsonrpc_core::Error {
                        code: jsonrpc_core::ErrorCode::ServerError(-32000),
                        message: "lagged".into(),
                        data: None,
                    })
                })
            }))
        },
    );
    let rpc = Arc::new(rpc);
    let config = StreamServerConfig::default()
        .with_channel_size(4)
        .with_pipeline_size(4)
        .with_keep_alive(true);
    let app = jsonrpc_router("/rpc", rpc.clone(), config);
    axum::Server::bind(&"0.0.0.0:3000".parse().unwrap())
        .serve(app.into_make_service())
        .await
        .unwrap();
}
More examples
Hide additional examples
examples/server.rs (line 61)
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
async fn main() {
    let mut rpc = MetaIoHandler::with_compatibility(jsonrpc_core::Compatibility::V2);
    rpc.add_method("sleep", |params: Params| async move {
        let (x,): (u64,) = params.parse()?;
        tokio::time::sleep(Duration::from_secs(x)).await;
        Ok(x.into())
    });
    rpc.add_method("@ping", |_| async move { Ok("pong".into()) });
    rpc.add_method("value", |params: Params| async move {
        let x: Option<u64> = params.parse()?;
        Ok(x.unwrap_or_default().into())
    });
    rpc.add_method("add", |params: Params| async move {
        let ((x, y), z): ((i32, i32), i32) = params.parse()?;
        let sum = x + y + z;
        Ok(sum.into())
    });

    add_pub_sub(
        &mut rpc,
        "subscribe",
        "subscription",
        "unsubscribe",
        |params: Params| {
            let (interval,): (u64,) = params.parse()?;
            if interval > 0 {
                Ok(async_stream::stream! {
                    for i in 0..10 {
                        tokio::time::sleep(Duration::from_secs(interval)).await;
                        yield PublishMsg::result(&i);
                    }
                    yield PublishMsg::error(&jsonrpc_core::Error {
                        code: jsonrpc_core::ErrorCode::ServerError(-32000),
                        message: "ended".into(),
                        data: None,
                    });
                })
            } else {
                Err(jsonrpc_core::Error::invalid_params("invalid interval"))
            }
        },
    );
    let rpc = Arc::new(rpc);
    let stream_config = StreamServerConfig::default()
        .with_channel_size(4)
        .with_pipeline_size(4);

    // HTTP and WS server.
    let ws_config = stream_config.clone().with_keep_alive(true);
    let app = jsonrpc_router("/rpc", rpc.clone(), ws_config);

    // You can use additional tower-http middlewares to add e.g. CORS.
    tokio::spawn(async move {
        axum::Server::bind(&"0.0.0.0:3000".parse().unwrap())
            .serve(app.into_make_service())
            .await
            .unwrap();
    });

    // TCP server with line delimited json codec.
    //
    // You can also use other transports (e.g. TLS, unix socket) and codecs
    // (e.g. netstring, JSON splitter).
    let listener = TcpListener::bind("0.0.0.0:3001").await.unwrap();
    let codec = LinesCodec::new_with_max_length(2 * 1024 * 1024);
    while let Ok((s, _)) = listener.accept().await {
        let rpc = rpc.clone();
        let stream_config = stream_config.clone();
        let codec = codec.clone();
        tokio::spawn(async move {
            let (r, w) = s.into_split();
            let r = FramedRead::new(r, codec.clone()).map_ok(StreamMsg::Str);
            let w = FramedWrite::new(w, codec).with(|msg| async move {
                Ok::<_, LinesCodecError>(match msg {
                    StreamMsg::Str(msg) => msg,
                    _ => "".into(),
                })
            });
            tokio::pin!(w);
            drop(serve_stream_sink(&rpc, w, r, stream_config).await);
        });
    }
}
source

pub fn with_keep_alive(self, keep_alive: bool) -> Self

Set whether keep alive is enabled.

Default is false.

Examples found in repository?
examples/server-macros-custom-error.rs (line 106)
102
103
104
105
106
107
108
109
110
111
112
113
async fn main() {
    let mut rpc = MetaIoHandler::with_compatibility(jsonrpc_core::Compatibility::V2);
    add_my_rpc_methods(&mut rpc, RpcImpl);
    let rpc = Arc::new(rpc);
    let stream_config = StreamServerConfig::default().with_keep_alive(true);

    let app = jsonrpc_router("/rpc", rpc, stream_config);
    axum::Server::bind(&"0.0.0.0:3000".parse().unwrap())
        .serve(app.into_make_service())
        .await
        .unwrap();
}
More examples
Hide additional examples
examples/server-macros.rs (line 73)
69
70
71
72
73
74
75
76
77
78
79
80
async fn main() {
    let mut rpc = MetaIoHandler::with_compatibility(jsonrpc_core::Compatibility::V2);
    add_my_rpc_methods(&mut rpc, RpcImpl);
    let rpc = Arc::new(rpc);
    let stream_config = StreamServerConfig::default().with_keep_alive(true);

    let app = jsonrpc_router("/rpc", rpc, stream_config);
    axum::Server::bind(&"0.0.0.0:3000".parse().unwrap())
        .serve(app.into_make_service())
        .await
        .unwrap();
}
examples/openrpc.rs (line 64)
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
async fn main() {
    let doc = my_rpc_doc();

    let mut rpc = MetaIoHandler::with_compatibility(jsonrpc_core::Compatibility::V2);
    add_my_rpc_methods(&mut rpc, MyRpcImpl);
    rpc.add_method("rpc.discover", move |_| {
        let doc = doc.clone();
        async move { Ok(doc) }
    });

    let rpc = Arc::new(rpc);

    let stream_config = StreamServerConfig::default().with_keep_alive(true);
    let app = jsonrpc_router("/", rpc, stream_config).layer(CorsLayer::permissive());

    axum::Server::bind(&"0.0.0.0:3000".parse().unwrap())
        .serve(app.into_make_service())
        .await
        .unwrap();
}
examples/broadcast.rs (line 53)
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
async fn main() {
    let mut rpc = MetaIoHandler::with_compatibility(jsonrpc_core::Compatibility::V2);
    let (tx, _) = broadcast::channel(8);
    tokio::spawn({
        let tx = tx.clone();
        async move {
            for i in 0u64.. {
                // Error can be ignored.
                //
                // It is recommended to broadcast already serialized
                // `PublishMsg`. This way it only need to serialized once.
                drop(tx.send(PublishMsg::result(&i)));
                tokio::time::sleep(Duration::from_secs(1)).await;
            }
        }
    });
    add_pub_sub(
        &mut rpc,
        "subscribe",
        "subscription",
        "unsubscribe",
        move |_params: Params| {
            Ok(BroadcastStream::new(tx.subscribe()).map(|result| {
                result.unwrap_or_else(|_| {
                    PublishMsg::error(&jsonrpc_core::Error {
                        code: jsonrpc_core::ErrorCode::ServerError(-32000),
                        message: "lagged".into(),
                        data: None,
                    })
                })
            }))
        },
    );
    let rpc = Arc::new(rpc);
    let config = StreamServerConfig::default()
        .with_channel_size(4)
        .with_pipeline_size(4)
        .with_keep_alive(true);
    let app = jsonrpc_router("/rpc", rpc.clone(), config);
    axum::Server::bind(&"0.0.0.0:3000".parse().unwrap())
        .serve(app.into_make_service())
        .await
        .unwrap();
}
examples/server.rs (line 64)
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
async fn main() {
    let mut rpc = MetaIoHandler::with_compatibility(jsonrpc_core::Compatibility::V2);
    rpc.add_method("sleep", |params: Params| async move {
        let (x,): (u64,) = params.parse()?;
        tokio::time::sleep(Duration::from_secs(x)).await;
        Ok(x.into())
    });
    rpc.add_method("@ping", |_| async move { Ok("pong".into()) });
    rpc.add_method("value", |params: Params| async move {
        let x: Option<u64> = params.parse()?;
        Ok(x.unwrap_or_default().into())
    });
    rpc.add_method("add", |params: Params| async move {
        let ((x, y), z): ((i32, i32), i32) = params.parse()?;
        let sum = x + y + z;
        Ok(sum.into())
    });

    add_pub_sub(
        &mut rpc,
        "subscribe",
        "subscription",
        "unsubscribe",
        |params: Params| {
            let (interval,): (u64,) = params.parse()?;
            if interval > 0 {
                Ok(async_stream::stream! {
                    for i in 0..10 {
                        tokio::time::sleep(Duration::from_secs(interval)).await;
                        yield PublishMsg::result(&i);
                    }
                    yield PublishMsg::error(&jsonrpc_core::Error {
                        code: jsonrpc_core::ErrorCode::ServerError(-32000),
                        message: "ended".into(),
                        data: None,
                    });
                })
            } else {
                Err(jsonrpc_core::Error::invalid_params("invalid interval"))
            }
        },
    );
    let rpc = Arc::new(rpc);
    let stream_config = StreamServerConfig::default()
        .with_channel_size(4)
        .with_pipeline_size(4);

    // HTTP and WS server.
    let ws_config = stream_config.clone().with_keep_alive(true);
    let app = jsonrpc_router("/rpc", rpc.clone(), ws_config);

    // You can use additional tower-http middlewares to add e.g. CORS.
    tokio::spawn(async move {
        axum::Server::bind(&"0.0.0.0:3000".parse().unwrap())
            .serve(app.into_make_service())
            .await
            .unwrap();
    });

    // TCP server with line delimited json codec.
    //
    // You can also use other transports (e.g. TLS, unix socket) and codecs
    // (e.g. netstring, JSON splitter).
    let listener = TcpListener::bind("0.0.0.0:3001").await.unwrap();
    let codec = LinesCodec::new_with_max_length(2 * 1024 * 1024);
    while let Ok((s, _)) = listener.accept().await {
        let rpc = rpc.clone();
        let stream_config = stream_config.clone();
        let codec = codec.clone();
        tokio::spawn(async move {
            let (r, w) = s.into_split();
            let r = FramedRead::new(r, codec.clone()).map_ok(StreamMsg::Str);
            let w = FramedWrite::new(w, codec).with(|msg| async move {
                Ok::<_, LinesCodecError>(match msg {
                    StreamMsg::Str(msg) => msg,
                    _ => "".into(),
                })
            });
            tokio::pin!(w);
            drop(serve_stream_sink(&rpc, w, r, stream_config).await);
        });
    }
}
source

pub fn with_keep_alive_duration(self, keep_alive_duration: Duration) -> Self

Wait for keep_alive_duration after the last message is received, then close the connection.

Default is 60 seconds.

source

pub fn with_ping_interval(self, ping_interval: Duration) -> Self

Set interval to send ping messages.

Default is 19 seconds.

source

pub fn with_shutdown<S>(self, shutdown: S) -> StreamServerConfig
where S: Future<Output = ()> + Send + 'static,

Trait Implementations§

source§

impl Clone for StreamServerConfig

source§

fn clone(&self) -> StreamServerConfig

Returns a copy of the value. Read more
1.0.0 · source§

fn clone_from(&mut self, source: &Self)

Performs copy-assignment from source. Read more
source§

impl Default for StreamServerConfig

source§

fn default() -> Self

Returns the “default value” for a type. Read more

Auto Trait Implementations§

Blanket Implementations§

source§

impl<T> Any for T
where T: 'static + ?Sized,

source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
source§

impl<T> Borrow<T> for T
where T: ?Sized,

source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
source§

impl<T> From<T> for T

source§

fn from(t: T) -> T

Returns the argument unchanged.

§

impl<T> FromRef<T> for T
where T: Clone,

§

fn from_ref(input: &T) -> T

Converts to this type from a reference to the input type.
§

impl<T> Instrument for T

§

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided [Span], returning an Instrumented wrapper. Read more
§

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper. Read more
source§

impl<T, U> Into<U> for T
where U: From<T>,

source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

source§

impl<T> Same for T

§

type Output = T

Should always be Self
source§

impl<T> ToOwned for T
where T: Clone,

§

type Owned = T

The resulting type after obtaining ownership.
source§

fn to_owned(&self) -> T

Creates owned data from borrowed data, usually by cloning. Read more
source§

fn clone_into(&self, target: &mut T)

Uses borrowed data to replace owned data, usually by cloning. Read more
source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

§

type Error = Infallible

The type returned in the event of a conversion error.
source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
§

impl<V, T> VZip<V> for T
where V: MultiLane<T>,

§

fn vzip(self) -> V

§

impl<T> WithSubscriber for T

§

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a [WithDispatch] wrapper. Read more
§

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a [WithDispatch] wrapper. Read more