Struct StreamServerConfig

Source
pub struct StreamServerConfig { /* private fields */ }
Available on crate feature server only.

Implementations§

Source§

impl StreamServerConfig

Source

pub fn with_channel_size(self, channel_size: usize) -> Self

Set pub-sub channel buffer size.

Default is 8.

§Panics

If channel_size is 0.

Examples found in repository?
examples/broadcast.rs (line 51)
16async fn main() {
17    let mut rpc = MetaIoHandler::with_compatibility(jsonrpc_core::Compatibility::V2);
18    let (tx, _) = broadcast::channel(8);
19    tokio::spawn({
20        let tx = tx.clone();
21        async move {
22            for i in 0u64.. {
23                // Error can be ignored.
24                //
25                // It is recommended to broadcast already serialized
26                // `PublishMsg`. This way it only needs to be serialized once.
27                drop(tx.send(PublishMsg::result(&i)));
28                tokio::time::sleep(Duration::from_secs(1)).await;
29            }
30        }
31    });
32    add_pub_sub(
33        &mut rpc,
34        "subscribe",
35        "subscription",
36        "unsubscribe",
37        move |_params: Params| {
38            Ok(BroadcastStream::new(tx.subscribe()).map(|result| {
39                result.unwrap_or_else(|_| {
40                    PublishMsg::error(&jsonrpc_core::Error {
41                        code: jsonrpc_core::ErrorCode::ServerError(-32000),
42                        message: "lagged".into(),
43                        data: None,
44                    })
45                })
46            }))
47        },
48    );
49    let rpc = Arc::new(rpc);
50    let config = StreamServerConfig::default()
51        .with_channel_size(4)
52        .with_pipeline_size(4)
53        .with_keep_alive(true);
54    let app = jsonrpc_router("/rpc", rpc.clone(), config);
55    let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap();
56    axum::serve(listener, app).await.unwrap();
57}
More examples
Hide additional examples
examples/server.rs (line 60)
16async fn main() {
17    let mut rpc = MetaIoHandler::with_compatibility(jsonrpc_core::Compatibility::V2);
18    rpc.add_method("sleep", |params: Params| async move {
19        let (x,): (u64,) = params.parse()?;
20        tokio::time::sleep(Duration::from_secs(x)).await;
21        Ok(x.into())
22    });
23    rpc.add_method("@ping", |_| async move { Ok("pong".into()) });
24    rpc.add_method("value", |params: Params| async move {
25        let x: Option<u64> = params.parse()?;
26        Ok(x.unwrap_or_default().into())
27    });
28    rpc.add_method("add", |params: Params| async move {
29        let ((x, y), z): ((i32, i32), i32) = params.parse()?;
30        let sum = x + y + z;
31        Ok(sum.into())
32    });
33
34    add_pub_sub(
35        &mut rpc,
36        "subscribe",
37        "subscription",
38        "unsubscribe",
39        |params: Params| {
40            let (interval,): (u64,) = params.parse()?;
41            if interval > 0 {
42                Ok(async_stream::stream! {
43                    for i in 0..10 {
44                        tokio::time::sleep(Duration::from_secs(interval)).await;
45                        yield PublishMsg::result(&i);
46                    }
47                    yield PublishMsg::error(&jsonrpc_core::Error {
48                        code: jsonrpc_core::ErrorCode::ServerError(-32000),
49                        message: "ended".into(),
50                        data: None,
51                    });
52                })
53            } else {
54                Err(jsonrpc_core::Error::invalid_params("invalid interval"))
55            }
56        },
57    );
58    let rpc = Arc::new(rpc);
59    let stream_config = StreamServerConfig::default()
60        .with_channel_size(4)
61        .with_pipeline_size(4);
62
63    // HTTP and WS server.
64    let ws_config = stream_config.clone().with_keep_alive(true);
65    let app = jsonrpc_router("/rpc", rpc.clone(), ws_config);
66
67    // You can use additional tower-http middlewares to add e.g. CORS.
68    tokio::spawn(async move {
69        let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap();
70        axum::serve(listener, app).await.unwrap();
71    });
72
73    // TCP server with line delimited json codec.
74    //
75    // You can also use other transports (e.g. TLS, unix socket) and codecs
76    // (e.g. netstring, JSON splitter).
77    let listener = TcpListener::bind("0.0.0.0:3001").await.unwrap();
78    let codec = LinesCodec::new_with_max_length(2 * 1024 * 1024);
79    while let Ok((s, _)) = listener.accept().await {
80        let rpc = rpc.clone();
81        let stream_config = stream_config.clone();
82        let codec = codec.clone();
83        tokio::spawn(async move {
84            let (r, w) = s.into_split();
85            let r = FramedRead::new(r, codec.clone()).map_ok(StreamMsg::Str);
86            let w = FramedWrite::new(w, codec).with(|msg| async move {
87                Ok::<_, LinesCodecError>(match msg {
88                    StreamMsg::Str(msg) => msg,
89                    _ => "".into(),
90                })
91            });
92            tokio::pin!(w);
93            drop(serve_stream_sink(&rpc, w, r, stream_config).await);
94        });
95    }
96}
Source

pub fn with_pipeline_size(self, pipeline_size: usize) -> Self

Set maximum request pipelining.

Up to pipeline_size requests will be handled concurrently.

Default is 1, i.e. no pipelining.

§Panics

If pipeline_size is 0.

Examples found in repository?
examples/broadcast.rs (line 52)
16async fn main() {
17    let mut rpc = MetaIoHandler::with_compatibility(jsonrpc_core::Compatibility::V2);
18    let (tx, _) = broadcast::channel(8);
19    tokio::spawn({
20        let tx = tx.clone();
21        async move {
22            for i in 0u64.. {
23                // Error can be ignored.
24                //
25                // It is recommended to broadcast already serialized
26                // `PublishMsg`. This way it only needs to be serialized once.
27                drop(tx.send(PublishMsg::result(&i)));
28                tokio::time::sleep(Duration::from_secs(1)).await;
29            }
30        }
31    });
32    add_pub_sub(
33        &mut rpc,
34        "subscribe",
35        "subscription",
36        "unsubscribe",
37        move |_params: Params| {
38            Ok(BroadcastStream::new(tx.subscribe()).map(|result| {
39                result.unwrap_or_else(|_| {
40                    PublishMsg::error(&jsonrpc_core::Error {
41                        code: jsonrpc_core::ErrorCode::ServerError(-32000),
42                        message: "lagged".into(),
43                        data: None,
44                    })
45                })
46            }))
47        },
48    );
49    let rpc = Arc::new(rpc);
50    let config = StreamServerConfig::default()
51        .with_channel_size(4)
52        .with_pipeline_size(4)
53        .with_keep_alive(true);
54    let app = jsonrpc_router("/rpc", rpc.clone(), config);
55    let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap();
56    axum::serve(listener, app).await.unwrap();
57}
More examples
Hide additional examples
examples/server.rs (line 61)
16async fn main() {
17    let mut rpc = MetaIoHandler::with_compatibility(jsonrpc_core::Compatibility::V2);
18    rpc.add_method("sleep", |params: Params| async move {
19        let (x,): (u64,) = params.parse()?;
20        tokio::time::sleep(Duration::from_secs(x)).await;
21        Ok(x.into())
22    });
23    rpc.add_method("@ping", |_| async move { Ok("pong".into()) });
24    rpc.add_method("value", |params: Params| async move {
25        let x: Option<u64> = params.parse()?;
26        Ok(x.unwrap_or_default().into())
27    });
28    rpc.add_method("add", |params: Params| async move {
29        let ((x, y), z): ((i32, i32), i32) = params.parse()?;
30        let sum = x + y + z;
31        Ok(sum.into())
32    });
33
34    add_pub_sub(
35        &mut rpc,
36        "subscribe",
37        "subscription",
38        "unsubscribe",
39        |params: Params| {
40            let (interval,): (u64,) = params.parse()?;
41            if interval > 0 {
42                Ok(async_stream::stream! {
43                    for i in 0..10 {
44                        tokio::time::sleep(Duration::from_secs(interval)).await;
45                        yield PublishMsg::result(&i);
46                    }
47                    yield PublishMsg::error(&jsonrpc_core::Error {
48                        code: jsonrpc_core::ErrorCode::ServerError(-32000),
49                        message: "ended".into(),
50                        data: None,
51                    });
52                })
53            } else {
54                Err(jsonrpc_core::Error::invalid_params("invalid interval"))
55            }
56        },
57    );
58    let rpc = Arc::new(rpc);
59    let stream_config = StreamServerConfig::default()
60        .with_channel_size(4)
61        .with_pipeline_size(4);
62
63    // HTTP and WS server.
64    let ws_config = stream_config.clone().with_keep_alive(true);
65    let app = jsonrpc_router("/rpc", rpc.clone(), ws_config);
66
67    // You can use additional tower-http middlewares to add e.g. CORS.
68    tokio::spawn(async move {
69        let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap();
70        axum::serve(listener, app).await.unwrap();
71    });
72
73    // TCP server with line delimited json codec.
74    //
75    // You can also use other transports (e.g. TLS, unix socket) and codecs
76    // (e.g. netstring, JSON splitter).
77    let listener = TcpListener::bind("0.0.0.0:3001").await.unwrap();
78    let codec = LinesCodec::new_with_max_length(2 * 1024 * 1024);
79    while let Ok((s, _)) = listener.accept().await {
80        let rpc = rpc.clone();
81        let stream_config = stream_config.clone();
82        let codec = codec.clone();
83        tokio::spawn(async move {
84            let (r, w) = s.into_split();
85            let r = FramedRead::new(r, codec.clone()).map_ok(StreamMsg::Str);
86            let w = FramedWrite::new(w, codec).with(|msg| async move {
87                Ok::<_, LinesCodecError>(match msg {
88                    StreamMsg::Str(msg) => msg,
89                    _ => "".into(),
90                })
91            });
92            tokio::pin!(w);
93            drop(serve_stream_sink(&rpc, w, r, stream_config).await);
94        });
95    }
96}
Source

pub fn with_keep_alive(self, keep_alive: bool) -> Self

Set whether keep alive is enabled.

Default is false.

Examples found in repository?
examples/server-macros-custom-error.rs (line 106)
102async fn main() {
103    let mut rpc = MetaIoHandler::with_compatibility(jsonrpc_core::Compatibility::V2);
104    add_my_rpc_methods(&mut rpc, RpcImpl);
105    let rpc = Arc::new(rpc);
106    let stream_config = StreamServerConfig::default().with_keep_alive(true);
107
108    let app = jsonrpc_router("/rpc", rpc, stream_config);
109    let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap();
110    axum::serve(listener, app).await.unwrap();
111}
More examples
Hide additional examples
examples/server-macros.rs (line 73)
69async fn main() {
70    let mut rpc = MetaIoHandler::with_compatibility(jsonrpc_core::Compatibility::V2);
71    add_my_rpc_methods(&mut rpc, RpcImpl);
72    let rpc = Arc::new(rpc);
73    let stream_config = StreamServerConfig::default().with_keep_alive(true);
74
75    let app = jsonrpc_router("/rpc", rpc, stream_config);
76    let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap();
77    axum::serve(listener, app).await.unwrap();
78}
examples/openrpc.rs (line 64)
52async fn main() {
53    let doc = my_rpc_doc();
54
55    let mut rpc = MetaIoHandler::with_compatibility(jsonrpc_core::Compatibility::V2);
56    add_my_rpc_methods(&mut rpc, MyRpcImpl);
57    rpc.add_method("rpc.discover", move |_| {
58        let doc = doc.clone();
59        async move { Ok(doc) }
60    });
61
62    let rpc = Arc::new(rpc);
63
64    let stream_config = StreamServerConfig::default().with_keep_alive(true);
65    let app = jsonrpc_router("/", rpc, stream_config).layer(CorsLayer::permissive());
66
67    let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap();
68    axum::serve(listener, app).await.unwrap();
69}
examples/broadcast.rs (line 53)
16async fn main() {
17    let mut rpc = MetaIoHandler::with_compatibility(jsonrpc_core::Compatibility::V2);
18    let (tx, _) = broadcast::channel(8);
19    tokio::spawn({
20        let tx = tx.clone();
21        async move {
22            for i in 0u64.. {
23                // Error can be ignored.
24                //
25                // It is recommended to broadcast already serialized
26                // `PublishMsg`. This way it only needs to be serialized once.
27                drop(tx.send(PublishMsg::result(&i)));
28                tokio::time::sleep(Duration::from_secs(1)).await;
29            }
30        }
31    });
32    add_pub_sub(
33        &mut rpc,
34        "subscribe",
35        "subscription",
36        "unsubscribe",
37        move |_params: Params| {
38            Ok(BroadcastStream::new(tx.subscribe()).map(|result| {
39                result.unwrap_or_else(|_| {
40                    PublishMsg::error(&jsonrpc_core::Error {
41                        code: jsonrpc_core::ErrorCode::ServerError(-32000),
42                        message: "lagged".into(),
43                        data: None,
44                    })
45                })
46            }))
47        },
48    );
49    let rpc = Arc::new(rpc);
50    let config = StreamServerConfig::default()
51        .with_channel_size(4)
52        .with_pipeline_size(4)
53        .with_keep_alive(true);
54    let app = jsonrpc_router("/rpc", rpc.clone(), config);
55    let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap();
56    axum::serve(listener, app).await.unwrap();
57}
examples/server.rs (line 64)
16async fn main() {
17    let mut rpc = MetaIoHandler::with_compatibility(jsonrpc_core::Compatibility::V2);
18    rpc.add_method("sleep", |params: Params| async move {
19        let (x,): (u64,) = params.parse()?;
20        tokio::time::sleep(Duration::from_secs(x)).await;
21        Ok(x.into())
22    });
23    rpc.add_method("@ping", |_| async move { Ok("pong".into()) });
24    rpc.add_method("value", |params: Params| async move {
25        let x: Option<u64> = params.parse()?;
26        Ok(x.unwrap_or_default().into())
27    });
28    rpc.add_method("add", |params: Params| async move {
29        let ((x, y), z): ((i32, i32), i32) = params.parse()?;
30        let sum = x + y + z;
31        Ok(sum.into())
32    });
33
34    add_pub_sub(
35        &mut rpc,
36        "subscribe",
37        "subscription",
38        "unsubscribe",
39        |params: Params| {
40            let (interval,): (u64,) = params.parse()?;
41            if interval > 0 {
42                Ok(async_stream::stream! {
43                    for i in 0..10 {
44                        tokio::time::sleep(Duration::from_secs(interval)).await;
45                        yield PublishMsg::result(&i);
46                    }
47                    yield PublishMsg::error(&jsonrpc_core::Error {
48                        code: jsonrpc_core::ErrorCode::ServerError(-32000),
49                        message: "ended".into(),
50                        data: None,
51                    });
52                })
53            } else {
54                Err(jsonrpc_core::Error::invalid_params("invalid interval"))
55            }
56        },
57    );
58    let rpc = Arc::new(rpc);
59    let stream_config = StreamServerConfig::default()
60        .with_channel_size(4)
61        .with_pipeline_size(4);
62
63    // HTTP and WS server.
64    let ws_config = stream_config.clone().with_keep_alive(true);
65    let app = jsonrpc_router("/rpc", rpc.clone(), ws_config);
66
67    // You can use additional tower-http middlewares to add e.g. CORS.
68    tokio::spawn(async move {
69        let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap();
70        axum::serve(listener, app).await.unwrap();
71    });
72
73    // TCP server with line delimited json codec.
74    //
75    // You can also use other transports (e.g. TLS, unix socket) and codecs
76    // (e.g. netstring, JSON splitter).
77    let listener = TcpListener::bind("0.0.0.0:3001").await.unwrap();
78    let codec = LinesCodec::new_with_max_length(2 * 1024 * 1024);
79    while let Ok((s, _)) = listener.accept().await {
80        let rpc = rpc.clone();
81        let stream_config = stream_config.clone();
82        let codec = codec.clone();
83        tokio::spawn(async move {
84            let (r, w) = s.into_split();
85            let r = FramedRead::new(r, codec.clone()).map_ok(StreamMsg::Str);
86            let w = FramedWrite::new(w, codec).with(|msg| async move {
87                Ok::<_, LinesCodecError>(match msg {
88                    StreamMsg::Str(msg) => msg,
89                    _ => "".into(),
90                })
91            });
92            tokio::pin!(w);
93            drop(serve_stream_sink(&rpc, w, r, stream_config).await);
94        });
95    }
96}
Source

pub fn with_keep_alive_duration(self, keep_alive_duration: Duration) -> Self

Wait for keep_alive_duration after the last message is received, then close the connection.

Default is 60 seconds.

Source

pub fn with_ping_interval(self, ping_interval: Duration) -> Self

Set interval to send ping messages.

Default is 19 seconds.

Source

pub fn with_shutdown<S>(self, shutdown: S) -> StreamServerConfig
where S: Future<Output = ()> + Send + 'static,

Trait Implementations§

Source§

impl Clone for StreamServerConfig

Source§

fn clone(&self) -> StreamServerConfig

Returns a copy of the value. Read more
1.0.0 · Source§

fn clone_from(&mut self, source: &Self)

Performs copy-assignment from source. Read more
Source§

impl Default for StreamServerConfig

Source§

fn default() -> Self

Returns the “default value” for a type. Read more

Auto Trait Implementations§

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> CloneToUninit for T
where T: Clone,

Source§

unsafe fn clone_to_uninit(&self, dest: *mut u8)

🔬This is a nightly-only experimental API. (clone_to_uninit)
Performs copy-assignment from self to dest. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T> FromRef<T> for T
where T: Clone,

Source§

fn from_ref(input: &T) -> T

Converts to this type from a reference to the input type.
Source§

impl<T> Instrument for T

Source§

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper. Read more
Source§

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper. Read more
Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T> Same for T

Source§

type Output = T

Should always be Self
Source§

impl<T> ToOwned for T
where T: Clone,

Source§

type Owned = T

The resulting type after obtaining ownership.
Source§

fn to_owned(&self) -> T

Creates owned data from borrowed data, usually by cloning. Read more
Source§

fn clone_into(&self, target: &mut T)

Uses borrowed data to replace owned data, usually by cloning. Read more
Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
Source§

impl<V, T> VZip<V> for T
where V: MultiLane<T>,

Source§

fn vzip(self) -> V

Source§

impl<T> WithSubscriber for T

Source§

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper. Read more
Source§

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper. Read more
Source§

impl<T> ErasedDestructor for T
where T: 'static,

Source§

impl<A, B, T> HttpServerConnExec<A, B> for T
where B: Body,

Source§

impl<T> MaybeSendSync for T