micro_tower/session/
tcp.rs1use std::net::SocketAddr;
2
3use bytes::BytesMut;
4use tokio::net::TcpListener;
5use tokio::task::JoinHandle;
6use tower::{BoxError, Service, ServiceExt};
7
8use crate::{api, shutdown::Controller};
9
10pub fn spawn<SB>(
11 addr: SocketAddr,
12 mut builder: SB,
13 controller: &Controller,
14) -> JoinHandle<Result<(), BoxError>>
15where
16 SB: Service<SocketAddr, Error = BoxError> + Send + 'static,
17 SB::Future: Send,
18 SB::Response: Service<BytesMut, Response = BytesMut, Error = api::Error> + Send,
19 <SB::Response as Service<BytesMut>>::Future: Send,
20{
21 let controller = controller.clone();
22 tokio::spawn(async move {
23 let listener = TcpListener::bind(addr).await?;
24 tracing::info!(message = "listening on", port = addr.port());
25
26 loop {
27 tracing::trace!(message = "wait for new connections", port = addr.port());
28
29 let (stream, addr) = tokio::select! {
30 result = listener.accept() => result?,
31 _ = controller.wait_for_shutdown() => return Ok(())
32 };
33
34 let service = match builder.ready().await {
35 Ok(service) => service,
36 Err(err) => {
37 let report = crate::report!(err.as_ref());
38 tracing::error!("{report:?}");
39 continue;
40 }
41 };
42 let service = match service.call(addr).await {
43 Ok(service) => service,
44 Err(err) => {
45 let report = crate::report!(err.as_ref());
46 tracing::error!("{report:?}");
47 continue;
48 }
49 };
50
51 tracing::info!(message = "new connection", addr = format!("{addr}"));
52
53 let controller = controller.clone();
54 tokio::spawn(async move {
55 if let Err(err) = super::stream::spawn_fut(stream, service, controller).await {
56 let report = crate::report!(err.as_ref());
57 tracing::error!("{report:?}");
58 }
59 });
60 }
61 })
62}