pub struct StreamServerConfig { /* private fields */ }
Available on crate feature `server` only.
Implementations§
Source§impl StreamServerConfig
impl StreamServerConfig
Sourcepub fn with_channel_size(self, channel_size: usize) -> Self
pub fn with_channel_size(self, channel_size: usize) -> Self
Examples found in repository?
examples/broadcast.rs (line 51)
16async fn main() {
17 let mut rpc = MetaIoHandler::with_compatibility(jsonrpc_core::Compatibility::V2);
18 let (tx, _) = broadcast::channel(8);
19 tokio::spawn({
20 let tx = tx.clone();
21 async move {
22 for i in 0u64.. {
23 // Error can be ignored.
24 //
25 // It is recommended to broadcast already serialized
26 // `PublishMsg`. This way it only needs to be serialized once.
27 drop(tx.send(PublishMsg::result(&i)));
28 tokio::time::sleep(Duration::from_secs(1)).await;
29 }
30 }
31 });
32 add_pub_sub(
33 &mut rpc,
34 "subscribe",
35 "subscription",
36 "unsubscribe",
37 move |_params: Params| {
38 Ok(BroadcastStream::new(tx.subscribe()).map(|result| {
39 result.unwrap_or_else(|_| {
40 PublishMsg::error(&jsonrpc_core::Error {
41 code: jsonrpc_core::ErrorCode::ServerError(-32000),
42 message: "lagged".into(),
43 data: None,
44 })
45 })
46 }))
47 },
48 );
49 let rpc = Arc::new(rpc);
50 let config = StreamServerConfig::default()
51 .with_channel_size(4)
52 .with_pipeline_size(4)
53 .with_keep_alive(true);
54 let app = jsonrpc_router("/rpc", rpc.clone(), config);
55 let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap();
56 axum::serve(listener, app).await.unwrap();
57}
More examples
examples/server.rs (line 60)
16async fn main() {
17 let mut rpc = MetaIoHandler::with_compatibility(jsonrpc_core::Compatibility::V2);
18 rpc.add_method("sleep", |params: Params| async move {
19 let (x,): (u64,) = params.parse()?;
20 tokio::time::sleep(Duration::from_secs(x)).await;
21 Ok(x.into())
22 });
23 rpc.add_method("@ping", |_| async move { Ok("pong".into()) });
24 rpc.add_method("value", |params: Params| async move {
25 let x: Option<u64> = params.parse()?;
26 Ok(x.unwrap_or_default().into())
27 });
28 rpc.add_method("add", |params: Params| async move {
29 let ((x, y), z): ((i32, i32), i32) = params.parse()?;
30 let sum = x + y + z;
31 Ok(sum.into())
32 });
33
34 add_pub_sub(
35 &mut rpc,
36 "subscribe",
37 "subscription",
38 "unsubscribe",
39 |params: Params| {
40 let (interval,): (u64,) = params.parse()?;
41 if interval > 0 {
42 Ok(async_stream::stream! {
43 for i in 0..10 {
44 tokio::time::sleep(Duration::from_secs(interval)).await;
45 yield PublishMsg::result(&i);
46 }
47 yield PublishMsg::error(&jsonrpc_core::Error {
48 code: jsonrpc_core::ErrorCode::ServerError(-32000),
49 message: "ended".into(),
50 data: None,
51 });
52 })
53 } else {
54 Err(jsonrpc_core::Error::invalid_params("invalid interval"))
55 }
56 },
57 );
58 let rpc = Arc::new(rpc);
59 let stream_config = StreamServerConfig::default()
60 .with_channel_size(4)
61 .with_pipeline_size(4);
62
63 // HTTP and WS server.
64 let ws_config = stream_config.clone().with_keep_alive(true);
65 let app = jsonrpc_router("/rpc", rpc.clone(), ws_config);
66
67 // You can use additional tower-http middlewares to add e.g. CORS.
68 tokio::spawn(async move {
69 let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap();
70 axum::serve(listener, app).await.unwrap();
71 });
72
73 // TCP server with line delimited json codec.
74 //
75 // You can also use other transports (e.g. TLS, unix socket) and codecs
76 // (e.g. netstring, JSON splitter).
77 let listener = TcpListener::bind("0.0.0.0:3001").await.unwrap();
78 let codec = LinesCodec::new_with_max_length(2 * 1024 * 1024);
79 while let Ok((s, _)) = listener.accept().await {
80 let rpc = rpc.clone();
81 let stream_config = stream_config.clone();
82 let codec = codec.clone();
83 tokio::spawn(async move {
84 let (r, w) = s.into_split();
85 let r = FramedRead::new(r, codec.clone()).map_ok(StreamMsg::Str);
86 let w = FramedWrite::new(w, codec).with(|msg| async move {
87 Ok::<_, LinesCodecError>(match msg {
88 StreamMsg::Str(msg) => msg,
89 _ => "".into(),
90 })
91 });
92 tokio::pin!(w);
93 drop(serve_stream_sink(&rpc, w, r, stream_config).await);
94 });
95 }
96}
Sourcepub fn with_pipeline_size(self, pipeline_size: usize) -> Self
pub fn with_pipeline_size(self, pipeline_size: usize) -> Self
Set maximum request pipelining.
Up to `pipeline_size` requests will be handled concurrently.
Default is 1, i.e. no pipelining.
§Panics
Panics if `pipeline_size` is 0.
Examples found in repository?
examples/broadcast.rs (line 52)
16async fn main() {
17 let mut rpc = MetaIoHandler::with_compatibility(jsonrpc_core::Compatibility::V2);
18 let (tx, _) = broadcast::channel(8);
19 tokio::spawn({
20 let tx = tx.clone();
21 async move {
22 for i in 0u64.. {
23 // Error can be ignored.
24 //
25 // It is recommended to broadcast already serialized
26 // `PublishMsg`. This way it only needs to be serialized once.
27 drop(tx.send(PublishMsg::result(&i)));
28 tokio::time::sleep(Duration::from_secs(1)).await;
29 }
30 }
31 });
32 add_pub_sub(
33 &mut rpc,
34 "subscribe",
35 "subscription",
36 "unsubscribe",
37 move |_params: Params| {
38 Ok(BroadcastStream::new(tx.subscribe()).map(|result| {
39 result.unwrap_or_else(|_| {
40 PublishMsg::error(&jsonrpc_core::Error {
41 code: jsonrpc_core::ErrorCode::ServerError(-32000),
42 message: "lagged".into(),
43 data: None,
44 })
45 })
46 }))
47 },
48 );
49 let rpc = Arc::new(rpc);
50 let config = StreamServerConfig::default()
51 .with_channel_size(4)
52 .with_pipeline_size(4)
53 .with_keep_alive(true);
54 let app = jsonrpc_router("/rpc", rpc.clone(), config);
55 let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap();
56 axum::serve(listener, app).await.unwrap();
57}
More examples
examples/server.rs (line 61)
16async fn main() {
17 let mut rpc = MetaIoHandler::with_compatibility(jsonrpc_core::Compatibility::V2);
18 rpc.add_method("sleep", |params: Params| async move {
19 let (x,): (u64,) = params.parse()?;
20 tokio::time::sleep(Duration::from_secs(x)).await;
21 Ok(x.into())
22 });
23 rpc.add_method("@ping", |_| async move { Ok("pong".into()) });
24 rpc.add_method("value", |params: Params| async move {
25 let x: Option<u64> = params.parse()?;
26 Ok(x.unwrap_or_default().into())
27 });
28 rpc.add_method("add", |params: Params| async move {
29 let ((x, y), z): ((i32, i32), i32) = params.parse()?;
30 let sum = x + y + z;
31 Ok(sum.into())
32 });
33
34 add_pub_sub(
35 &mut rpc,
36 "subscribe",
37 "subscription",
38 "unsubscribe",
39 |params: Params| {
40 let (interval,): (u64,) = params.parse()?;
41 if interval > 0 {
42 Ok(async_stream::stream! {
43 for i in 0..10 {
44 tokio::time::sleep(Duration::from_secs(interval)).await;
45 yield PublishMsg::result(&i);
46 }
47 yield PublishMsg::error(&jsonrpc_core::Error {
48 code: jsonrpc_core::ErrorCode::ServerError(-32000),
49 message: "ended".into(),
50 data: None,
51 });
52 })
53 } else {
54 Err(jsonrpc_core::Error::invalid_params("invalid interval"))
55 }
56 },
57 );
58 let rpc = Arc::new(rpc);
59 let stream_config = StreamServerConfig::default()
60 .with_channel_size(4)
61 .with_pipeline_size(4);
62
63 // HTTP and WS server.
64 let ws_config = stream_config.clone().with_keep_alive(true);
65 let app = jsonrpc_router("/rpc", rpc.clone(), ws_config);
66
67 // You can use additional tower-http middlewares to add e.g. CORS.
68 tokio::spawn(async move {
69 let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap();
70 axum::serve(listener, app).await.unwrap();
71 });
72
73 // TCP server with line delimited json codec.
74 //
75 // You can also use other transports (e.g. TLS, unix socket) and codecs
76 // (e.g. netstring, JSON splitter).
77 let listener = TcpListener::bind("0.0.0.0:3001").await.unwrap();
78 let codec = LinesCodec::new_with_max_length(2 * 1024 * 1024);
79 while let Ok((s, _)) = listener.accept().await {
80 let rpc = rpc.clone();
81 let stream_config = stream_config.clone();
82 let codec = codec.clone();
83 tokio::spawn(async move {
84 let (r, w) = s.into_split();
85 let r = FramedRead::new(r, codec.clone()).map_ok(StreamMsg::Str);
86 let w = FramedWrite::new(w, codec).with(|msg| async move {
87 Ok::<_, LinesCodecError>(match msg {
88 StreamMsg::Str(msg) => msg,
89 _ => "".into(),
90 })
91 });
92 tokio::pin!(w);
93 drop(serve_stream_sink(&rpc, w, r, stream_config).await);
94 });
95 }
96}
Sourcepub fn with_keep_alive(self, keep_alive: bool) -> Self
pub fn with_keep_alive(self, keep_alive: bool) -> Self
Set whether keep alive is enabled.
Default is false.
Examples found in repository?
More examples
examples/openrpc.rs (line 64)
52async fn main() {
53 let doc = my_rpc_doc();
54
55 let mut rpc = MetaIoHandler::with_compatibility(jsonrpc_core::Compatibility::V2);
56 add_my_rpc_methods(&mut rpc, MyRpcImpl);
57 rpc.add_method("rpc.discover", move |_| {
58 let doc = doc.clone();
59 async move { Ok(doc) }
60 });
61
62 let rpc = Arc::new(rpc);
63
64 let stream_config = StreamServerConfig::default().with_keep_alive(true);
65 let app = jsonrpc_router("/", rpc, stream_config).layer(CorsLayer::permissive());
66
67 let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap();
68 axum::serve(listener, app).await.unwrap();
69}
examples/broadcast.rs (line 53)
16async fn main() {
17 let mut rpc = MetaIoHandler::with_compatibility(jsonrpc_core::Compatibility::V2);
18 let (tx, _) = broadcast::channel(8);
19 tokio::spawn({
20 let tx = tx.clone();
21 async move {
22 for i in 0u64.. {
23 // Error can be ignored.
24 //
25 // It is recommended to broadcast already serialized
26 // `PublishMsg`. This way it only needs to be serialized once.
27 drop(tx.send(PublishMsg::result(&i)));
28 tokio::time::sleep(Duration::from_secs(1)).await;
29 }
30 }
31 });
32 add_pub_sub(
33 &mut rpc,
34 "subscribe",
35 "subscription",
36 "unsubscribe",
37 move |_params: Params| {
38 Ok(BroadcastStream::new(tx.subscribe()).map(|result| {
39 result.unwrap_or_else(|_| {
40 PublishMsg::error(&jsonrpc_core::Error {
41 code: jsonrpc_core::ErrorCode::ServerError(-32000),
42 message: "lagged".into(),
43 data: None,
44 })
45 })
46 }))
47 },
48 );
49 let rpc = Arc::new(rpc);
50 let config = StreamServerConfig::default()
51 .with_channel_size(4)
52 .with_pipeline_size(4)
53 .with_keep_alive(true);
54 let app = jsonrpc_router("/rpc", rpc.clone(), config);
55 let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap();
56 axum::serve(listener, app).await.unwrap();
57}
examples/server.rs (line 64)
16async fn main() {
17 let mut rpc = MetaIoHandler::with_compatibility(jsonrpc_core::Compatibility::V2);
18 rpc.add_method("sleep", |params: Params| async move {
19 let (x,): (u64,) = params.parse()?;
20 tokio::time::sleep(Duration::from_secs(x)).await;
21 Ok(x.into())
22 });
23 rpc.add_method("@ping", |_| async move { Ok("pong".into()) });
24 rpc.add_method("value", |params: Params| async move {
25 let x: Option<u64> = params.parse()?;
26 Ok(x.unwrap_or_default().into())
27 });
28 rpc.add_method("add", |params: Params| async move {
29 let ((x, y), z): ((i32, i32), i32) = params.parse()?;
30 let sum = x + y + z;
31 Ok(sum.into())
32 });
33
34 add_pub_sub(
35 &mut rpc,
36 "subscribe",
37 "subscription",
38 "unsubscribe",
39 |params: Params| {
40 let (interval,): (u64,) = params.parse()?;
41 if interval > 0 {
42 Ok(async_stream::stream! {
43 for i in 0..10 {
44 tokio::time::sleep(Duration::from_secs(interval)).await;
45 yield PublishMsg::result(&i);
46 }
47 yield PublishMsg::error(&jsonrpc_core::Error {
48 code: jsonrpc_core::ErrorCode::ServerError(-32000),
49 message: "ended".into(),
50 data: None,
51 });
52 })
53 } else {
54 Err(jsonrpc_core::Error::invalid_params("invalid interval"))
55 }
56 },
57 );
58 let rpc = Arc::new(rpc);
59 let stream_config = StreamServerConfig::default()
60 .with_channel_size(4)
61 .with_pipeline_size(4);
62
63 // HTTP and WS server.
64 let ws_config = stream_config.clone().with_keep_alive(true);
65 let app = jsonrpc_router("/rpc", rpc.clone(), ws_config);
66
67 // You can use additional tower-http middlewares to add e.g. CORS.
68 tokio::spawn(async move {
69 let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap();
70 axum::serve(listener, app).await.unwrap();
71 });
72
73 // TCP server with line delimited json codec.
74 //
75 // You can also use other transports (e.g. TLS, unix socket) and codecs
76 // (e.g. netstring, JSON splitter).
77 let listener = TcpListener::bind("0.0.0.0:3001").await.unwrap();
78 let codec = LinesCodec::new_with_max_length(2 * 1024 * 1024);
79 while let Ok((s, _)) = listener.accept().await {
80 let rpc = rpc.clone();
81 let stream_config = stream_config.clone();
82 let codec = codec.clone();
83 tokio::spawn(async move {
84 let (r, w) = s.into_split();
85 let r = FramedRead::new(r, codec.clone()).map_ok(StreamMsg::Str);
86 let w = FramedWrite::new(w, codec).with(|msg| async move {
87 Ok::<_, LinesCodecError>(match msg {
88 StreamMsg::Str(msg) => msg,
89 _ => "".into(),
90 })
91 });
92 tokio::pin!(w);
93 drop(serve_stream_sink(&rpc, w, r, stream_config).await);
94 });
95 }
96}
Sourcepub fn with_keep_alive_duration(self, keep_alive_duration: Duration) -> Self
pub fn with_keep_alive_duration(self, keep_alive_duration: Duration) -> Self
Wait for `keep_alive_duration` after the last message is received, then close the connection.
Default is 60 seconds.
Sourcepub fn with_ping_interval(self, ping_interval: Duration) -> Self
pub fn with_ping_interval(self, ping_interval: Duration) -> Self
Set interval to send ping messages.
Default is 19 seconds.
pub fn with_shutdown<S>(self, shutdown: S) -> StreamServerConfig
Trait Implementations§
Source§impl Clone for StreamServerConfig
impl Clone for StreamServerConfig
Source§fn clone(&self) -> StreamServerConfig
fn clone(&self) -> StreamServerConfig
Returns a copy of the value. Read more
1.0.0 · Source§fn clone_from(&mut self, source: &Self)
fn clone_from(&mut self, source: &Self)
Performs copy-assignment from `source`. Read more
Auto Trait Implementations§
impl Freeze for StreamServerConfig
impl !RefUnwindSafe for StreamServerConfig
impl Send for StreamServerConfig
impl Sync for StreamServerConfig
impl Unpin for StreamServerConfig
impl !UnwindSafe for StreamServerConfig
Blanket Implementations§
Source§impl<T> BorrowMut<T> for T
where
    T: ?Sized,
impl<T> BorrowMut<T> for T
where
    T: ?Sized,
Source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
Mutably borrows from an owned value. Read more