micro_tower/session/
tcp.rs

1use std::net::SocketAddr;
2
3use bytes::BytesMut;
4use tokio::net::TcpListener;
5use tokio::task::JoinHandle;
6use tower::{BoxError, Service, ServiceExt};
7
8use crate::{api, shutdown::Controller};
9
10pub fn spawn<SB>(
11    addr: SocketAddr,
12    mut builder: SB,
13    controller: &Controller,
14) -> JoinHandle<Result<(), BoxError>>
15where
16    SB: Service<SocketAddr, Error = BoxError> + Send + 'static,
17    SB::Future: Send,
18    SB::Response: Service<BytesMut, Response = BytesMut, Error = api::Error> + Send,
19    <SB::Response as Service<BytesMut>>::Future: Send,
20{
21    let controller = controller.clone();
22    tokio::spawn(async move {
23        let listener = TcpListener::bind(addr).await?;
24        tracing::info!(message = "listening on", port = addr.port());
25
26        loop {
27            tracing::trace!(message = "wait for new connections", port = addr.port());
28
29            let (stream, addr) = tokio::select! {
30                result = listener.accept() => result?,
31                _ = controller.wait_for_shutdown() => return Ok(())
32            };
33
34            let service = match builder.ready().await {
35                Ok(service) => service,
36                Err(err) => {
37                    let report = crate::report!(err.as_ref());
38                    tracing::error!("{report:?}");
39                    continue;
40                }
41            };
42            let service = match service.call(addr).await {
43                Ok(service) => service,
44                Err(err) => {
45                    let report = crate::report!(err.as_ref());
46                    tracing::error!("{report:?}");
47                    continue;
48                }
49            };
50
51            tracing::info!(message = "new connection", addr = format!("{addr}"));
52
53            let controller = controller.clone();
54            tokio::spawn(async move {
55                if let Err(err) = super::stream::spawn_fut(stream, service, controller).await {
56                    let report = crate::report!(err.as_ref());
57                    tracing::error!("{report:?}");
58                }
59            });
60        }
61    })
62}