use super::{
limits::{channels_per_key::MaxChannelsPerKey, requests_per_channel::MaxRequestsPerChannel},
Channel, RequestName, Serve,
};
use futures::prelude::*;
use std::{fmt, hash::Hash};
/// Extension methods for a [`Stream`] whose items are [`Channel`]s.
///
/// Every method consumes the stream and returns a new value wrapping it, so
/// calls can be chained.
pub trait Incoming<C>
where
    Self: Sized + Stream<Item = C>,
    C: Channel,
{
    /// Wraps this stream in a [`MaxChannelsPerKey`] adapter.
    ///
    /// `n` and `keymaker` are handed to the adapter unchanged; `keymaker`
    /// derives a key of type `K` from a borrowed channel. The limiting policy
    /// itself is implemented by [`MaxChannelsPerKey`], not here.
    fn max_channels_per_key<K, KF>(self, n: u32, keymaker: KF) -> MaxChannelsPerKey<Self, K, KF>
    where
        K: fmt::Display + Eq + Hash + Clone + Unpin,
        KF: Fn(&C) -> K,
    {
        <MaxChannelsPerKey<Self, K, KF>>::new(self, n, keymaker)
    }

    /// Wraps this stream in a [`MaxRequestsPerChannel`] adapter configured
    /// with `n`.
    ///
    /// The limiting policy itself is implemented by
    /// [`MaxRequestsPerChannel`], not here.
    fn max_concurrent_requests_per_channel(self, n: usize) -> MaxRequestsPerChannel<Self> {
        <MaxRequestsPerChannel<Self>>::new(self, n)
    }

    /// Maps every incoming channel to the result of calling its `execute`
    /// method with a clone of `serve`.
    ///
    /// The result is a stream with one item per channel; each item is itself
    /// a stream of futures resolving to `()`. Nothing is driven here: the
    /// caller is responsible for polling both levels of streams and the
    /// futures they yield.
    fn execute<S>(
        self,
        serve: S,
    ) -> impl Stream<Item = impl Stream<Item = impl Future<Output = ()>>>
    where
        C::Req: RequestName,
        S: Serve<Req = C::Req, Resp = C::Resp> + Clone,
    {
        // Each channel gets its own clone of the handler; the original is
        // owned by the closure for the lifetime of the mapped stream.
        let run_channel = move |channel: C| channel.execute(serve.clone());
        self.map(run_channel)
    }
}
/// Drives `incoming` to completion, spawning a Tokio task for every channel
/// and, within that task, another Tokio task for every request future the
/// channel yields.
///
/// The returned future resolves once `incoming` itself is exhausted. The
/// spawned tasks are detached (their join handles are dropped), so this does
/// not wait for channels or requests to finish.
///
/// Uses `tokio::spawn`, so it must be awaited from within a Tokio runtime.
#[cfg(feature = "tokio1")]
pub async fn spawn_incoming(
    incoming: impl Stream<
        Item = impl Stream<Item = impl Future<Output = ()> + Send + 'static> + Send + 'static,
    >,
) {
    incoming
        .for_each(|channel| async move {
            // One task per channel: it pulls request futures off the channel
            // one at a time and hands each to its own task.
            tokio::spawn(channel.for_each(|request| async move {
                tokio::spawn(request);
            }));
        })
        .await;
}
/// Blanket implementation: every stream of [`Channel`]s gets the provided
/// [`Incoming`] methods; nothing is overridden.
impl<S: Stream<Item = C>, C: Channel> Incoming<C> for S {}