micro_tower/session/
stream.rs

1use bytes::BytesMut;
2use tokio::io::{AsyncReadExt, AsyncWriteExt};
3use tower::ServiceExt;
4
5use crate::api;
6use crate::shutdown::Controller;
7use crate::util::BoxError;
8
/// Length in bytes of the fixed-size array that each individual stream read in `spawn_fut` is
/// issued into. A read that returns fewer bytes than this ends the collection of the current
/// request.
9const BUF_SIZE: usize = 1024;
10
11/// Spawns a future to handle streams of requests (e.g. a tcp stream).
12///
13/// # Errors
14///
15/// Will return `Err` if failed to read bytes from stream or send bytes to stream.
16pub async fn spawn_fut<St, Sv>(
17    mut stream: St,
18    mut service: Sv,
19    controller: Controller,
20) -> Result<(), BoxError>
21where
22    St: AsyncReadExt + AsyncWriteExt + Unpin + Send + 'static,
23    Sv: tower::Service<BytesMut, Response = BytesMut, Error = api::Error> + Send + 'static,
24    Sv::Future: Send,
25{
26    let mut buf = BytesMut::new();
27    let mut local_buf = [0_u8; BUF_SIZE];
28    loop {
29        buf.clear();
30        loop {
31            let num = tokio::select! {
32                res = stream.read(&mut local_buf) => res,
33                _ = controller.wait_for_shutdown() => return Ok(())
34            };
35            let num = num?;
36            buf.extend_from_slice(&local_buf[..num]);
37            if num < BUF_SIZE {
38                break;
39            }
40        }
41        tracing::trace!(message = "buffer read", size = buf.len());
42        let ready = match service.ready().await {
43            Ok(service) => service,
44            Err(err) => {
45                return Err(err.err);
46            }
47        };
48        buf = match ready.call(buf).await {
49            Ok(buf) => buf,
50            Err(err) => {
51                let report = crate::report!(err.err.as_ref());
52                tracing::error!("{report:?}");
53                err.buf
54            }
55        };
56        tracing::trace!(message = "write buffer", size = buf.len());
57        stream.write_buf(&mut buf).await?;
58    }
59}