use super::limits::channels_per_key::MaxChannelsPerKey;
use super::limits::requests_per_channel::MaxRequestsPerChannel;
use super::{Channel, RequestName, Serve};
use futures::prelude::*;
use std::fmt;
use std::hash::Hash;
/// Extension methods for any stream whose items are [`Channel`]s.
///
/// Every method consumes the stream (`self` by value) and returns a new,
/// wrapped stream, so calls can be chained. All methods have default bodies;
/// implementors do not need to provide anything.
pub trait Incoming<C>
where
Self: Sized + Stream<Item = C>,
C: Channel,
{
/// Wraps this stream in a [`MaxChannelsPerKey`] constructed from `n` and
/// `keymaker`.
///
/// `keymaker` derives a key of type `K` from a borrowed channel.
/// NOTE(review): presumably at most `n` channels sharing the same key are
/// admitted at once; the limiter lives in `limits::channels_per_key` and is
/// not visible here, so confirm the exact semantics there.
fn max_channels_per_key<K, KF>(self, n: u32, keymaker: KF) -> MaxChannelsPerKey<Self, K, KF>
where
K: fmt::Display + Eq + Hash + Clone + Unpin,
KF: Fn(&C) -> K,
{
MaxChannelsPerKey::new(self, n, keymaker)
}
/// Wraps this stream in a [`MaxRequestsPerChannel`] constructed from `n`.
///
/// NOTE(review): presumably `n` caps the number of in-flight requests on each
/// yielded channel; the limiter lives in `limits::requests_per_channel` and
/// is not visible here, so confirm the exact semantics there.
fn max_concurrent_requests_per_channel(self, n: usize) -> MaxRequestsPerChannel<Self> {
MaxRequestsPerChannel::new(self, n)
}
/// Maps every incoming channel to `channel.execute(serve.clone())`.
///
/// The result is a stream with one item per channel; each item is itself a
/// stream of futures with `Output = ()`. This method only builds the mapped
/// stream: nothing here polls or spawns the inner streams or futures, so the
/// caller is responsible for driving them.
fn execute<S>(self, serve: S) -> impl Stream<Item = impl Stream<Item = impl Future<Output = ()>>>
where
C::Req: RequestName,
S: Serve<Req = C::Req, Resp = C::Resp> + Clone,
{
// `serve` is moved into the closure and cloned once per channel, so every
// channel receives its own copy of the handler.
self.map(move |channel| channel.execute(serve.clone()))
}
}
/// Drives a stream of streams of futures to completion on the Tokio runtime.
///
/// Each item yielded by `incoming` is spawned as its own Tokio task. That task
/// drains the item and spawns every future it yields as a further task. All
/// `JoinHandle`s are dropped, so the spawned tasks are detached: the returned
/// future resolves once `incoming` is exhausted and does not wait for the
/// tasks it started.
///
/// Only compiled when the `tokio1` feature is enabled.
///
/// # Panics
///
/// `tokio::spawn` panics when called outside the context of a Tokio runtime,
/// so this future must be polled from within one.
#[cfg(feature = "tokio1")]
pub async fn spawn_incoming(
    incoming: impl Stream<
        Item = impl Stream<Item = impl Future<Output = ()> + Send + 'static> + Send + 'static,
    >,
) {
    incoming
        .for_each(|requests| async move {
            // One detached task per channel; it lives until the channel's
            // request stream ends.
            tokio::spawn(requests.for_each(|in_flight| async move {
                // One detached task per request, so a slow request does not
                // hold up the ones behind it on the same channel.
                tokio::spawn(in_flight);
            }));
        })
        .await
}
/// Blanket implementation: every sized stream whose items implement
/// [`Channel`] gets the `Incoming` methods.
///
/// The body is empty because all of the trait's methods have default
/// implementations.
impl<S, C> Incoming<C> for S
where
S: Sized + Stream<Item = C>,
C: Channel,
{
}