use std::future::Future;
#[cfg(feature = "net")]
use std::pin::Pin;
#[cfg(feature = "net")]
use std::task::{Context, Poll};
use err_context::AnyError;
#[cfg(feature = "net")]
use super::net::Accept;
use super::FutureInstaller;
#[cfg(feature = "net")]
use log::error;
use log::trace;
use pin_project::pin_project;
use spirit::fragment::Transformation;
pub struct ToFuture<F>(pub F);
impl<F, Fut, R, II, SF> Transformation<R, II, SF> for ToFuture<F>
where
F: FnMut(R, &SF) -> Result<Fut, AnyError>,
Fut: Future<Output = ()> + 'static,
{
type OutputResource = Fut;
type OutputInstaller = FutureInstaller;
fn installer(&mut self, _: II, _: &str) -> FutureInstaller {
FutureInstaller::default()
}
fn transform(&mut self, r: R, cfg: &SF, name: &str) -> Result<Fut, AnyError> {
trace!("Wrapping {} into a future", name);
(self.0)(r, cfg)
}
}
pub struct ToFutureUnconfigured<F>(pub F);
impl<F, Fut, R, II, SF> Transformation<R, II, SF> for ToFutureUnconfigured<F>
where
F: FnMut(R) -> Fut,
Fut: Future<Output = ()> + 'static,
{
type OutputResource = Fut;
type OutputInstaller = FutureInstaller;
fn installer(&mut self, _: II, _: &str) -> FutureInstaller {
FutureInstaller::default()
}
fn transform(&mut self, r: R, _: &SF, name: &str) -> Result<Fut, AnyError> {
trace!("Wrapping {} into a future", name);
Ok((self.0)(r))
}
}
/// A future that keeps accepting connections and spawns a handler task for each.
///
/// Every accepted connection is passed, together with a reference to the stored
/// configuration, to the stored closure. The future the closure returns is
/// handed to `tokio::spawn`. The acceptor completes only when accepting reports
/// an error.
#[cfg(feature = "net")]
#[pin_project]
pub struct Acceptor<A, F, C> {
    /// Source of incoming connections; pinned because `poll_accept` is called
    /// through a pinned reference.
    #[pin]
    accept: A,
    /// Closure producing the handler future for one connection.
    f: F,
    /// Configuration handed by reference to `f` on every connection.
    cfg: C,
    /// Name used in the log messages.
    name: &'static str,
}

#[cfg(feature = "net")]
impl<A, F, C, Fut> Future for Acceptor<A, F, C>
where
    A: Accept,
    F: FnMut(A::Connection, &C) -> Fut,
    Fut: Future<Output = ()> + Send + 'static,
{
    type Output = ();

    /// Drains all connections that are currently ready, spawning one task per
    /// connection.
    ///
    /// Returns `Poll::Pending` once no more connections are ready. Returns
    /// `Poll::Ready(())` after logging the error when accepting fails.
    fn poll(self: Pin<&mut Self>, ctx: &mut Context) -> Poll<()> {
        let mut this = self.project();
        loop {
            let conn = match this.accept.as_mut().poll_accept(ctx) {
                Poll::Pending => return Poll::Pending,
                Poll::Ready(Err(e)) => {
                    error!("Giving up acceptor {}: {}", this.name, e);
                    return Poll::Ready(());
                }
                Poll::Ready(Ok(conn)) => conn,
            };
            trace!("Got a new connection on {}", this.name);
            let handler = (this.f)(conn, this.cfg);
            // The handler is already `Send + 'static`, so it is spawned directly;
            // wrapping it in another `async move` block adds nothing.
            tokio::spawn(handler);
        }
    }
}
/// A [`Transformation`] that turns an acceptor-like resource into an [`Acceptor`]
/// future, using a clone of the wrapped closure to handle each connection.
///
/// Note that [`Acceptor`] implements `Future` only when the future returned by
/// the closure is also `Send`.
#[cfg(feature = "net")]
pub struct PerConnection<F>(pub F);

#[cfg(feature = "net")]
impl<F, Fut, A, II, SF> Transformation<A, II, SF> for PerConnection<F>
where
    A: Accept,
    F: Clone + FnMut(A::Connection, &SF) -> Fut + 'static,
    Fut: Future<Output = ()> + 'static,
    SF: Clone + 'static,
{
    type OutputResource = Acceptor<A, F, SF>;
    type OutputInstaller = FutureInstaller;

    /// Ignores the incoming installer and the name; a default [`FutureInstaller`]
    /// is returned instead.
    fn installer(&mut self, _: II, _: &str) -> Self::OutputInstaller {
        FutureInstaller::default()
    }

    /// Builds an [`Acceptor`] that owns the resource, a clone of the wrapped
    /// closure and a clone of the configuration fragment. Always returns `Ok`.
    fn transform(
        &mut self,
        accept: A,
        cfg: &SF,
        name: &'static str,
    ) -> Result<Self::OutputResource, AnyError> {
        trace!("Creating new acceptor for {}", name);
        let acceptor = Acceptor {
            accept,
            f: self.0.clone(),
            cfg: cfg.clone(),
            name,
        };
        Ok(acceptor)
    }
}
/// A [`Transformation`] that turns an acceptor-like resource into an [`Acceptor`]
/// future, with a per-resource initialization step.
///
/// The wrapped closure is called once per resource with references to the
/// resource and its configuration. It returns the closure that then handles
/// each connection.
///
/// Note that [`Acceptor`] implements `Future` only when the future returned by
/// the per-connection closure is also `Send`.
#[cfg(feature = "net")]
pub struct PerConnectionInit<F>(pub F);

#[cfg(feature = "net")]
impl<FA, FC, Fut, A, II, SF> Transformation<A, II, SF> for PerConnectionInit<FA>
where
    A: Accept,
    FA: FnMut(&A, &SF) -> FC + 'static,
    FC: FnMut(A::Connection, &SF) -> Fut + 'static,
    Fut: Future<Output = ()> + 'static,
    SF: Clone + 'static,
{
    type OutputResource = Acceptor<A, FC, SF>;
    type OutputInstaller = FutureInstaller;

    /// Ignores the incoming installer and the name; a default [`FutureInstaller`]
    /// is returned instead.
    fn installer(&mut self, _: II, _: &str) -> Self::OutputInstaller {
        FutureInstaller::default()
    }

    /// Runs the initialization closure to obtain the per-connection handler,
    /// then builds an [`Acceptor`] that owns the resource, that handler and a
    /// clone of the configuration fragment. Always returns `Ok`.
    fn transform(
        &mut self,
        accept: A,
        cfg: &SF,
        name: &'static str,
    ) -> Result<Self::OutputResource, AnyError> {
        trace!("Creating new acceptor for {}", name);
        // The handler must be created while `accept` can still be borrowed,
        // i.e. before it is moved into the `Acceptor`.
        let init = &mut self.0;
        let per_connection = init(&accept, cfg);
        let acceptor = Acceptor {
            accept,
            f: per_connection,
            cfg: cfg.clone(),
            name,
        };
        Ok(acceptor)
    }
}