forked_tarpc/server/
incoming.rs

1use super::{
2    limits::{channels_per_key::MaxChannelsPerKey, requests_per_channel::MaxRequestsPerChannel},
3    Channel,
4};
5use futures::prelude::*;
6use std::{fmt, hash::Hash};
7
8#[cfg(feature = "tokio1")]
9use super::{tokio::TokioServerExecutor, Serve};
10
11/// An extension trait for [streams](futures::prelude::Stream) of [`Channels`](Channel).
12pub trait Incoming<C>
13where
14    Self: Sized + Stream<Item = C>,
15    C: Channel,
16{
17    /// Enforces channel per-key limits.
18    fn max_channels_per_key<K, KF>(self, n: u32, keymaker: KF) -> MaxChannelsPerKey<Self, K, KF>
19    where
20        K: fmt::Display + Eq + Hash + Clone + Unpin,
21        KF: Fn(&C) -> K,
22    {
23        MaxChannelsPerKey::new(self, n, keymaker)
24    }
25
26    /// Caps the number of concurrent requests per channel.
27    fn max_concurrent_requests_per_channel(self, n: usize) -> MaxRequestsPerChannel<Self> {
28        MaxRequestsPerChannel::new(self, n)
29    }
30
31    /// [Executes](Channel::execute) each incoming channel. Each channel will be handled
32    /// concurrently by spawning on tokio's default executor, and each request will be also
33    /// be spawned on tokio's default executor.
34    #[cfg(feature = "tokio1")]
35    #[cfg_attr(docsrs, doc(cfg(feature = "tokio1")))]
36    fn execute<S>(self, serve: S) -> TokioServerExecutor<Self, S>
37    where
38        S: Serve<C::Req, Resp = C::Resp>,
39    {
40        TokioServerExecutor::new(self, serve)
41    }
42}
43
44impl<S, C> Incoming<C> for S
45where
46    S: Sized + Stream<Item = C>,
47    C: Channel,
48{
49}