micro_tower/session/
stream.rs1use bytes::BytesMut;
2use tokio::io::{AsyncReadExt, AsyncWriteExt};
3use tower::ServiceExt;
4
5use crate::api;
6use crate::shutdown::Controller;
7use crate::util::BoxError;
8
9const BUF_SIZE: usize = 1024;
10
11pub async fn spawn_fut<St, Sv>(
17 mut stream: St,
18 mut service: Sv,
19 controller: Controller,
20) -> Result<(), BoxError>
21where
22 St: AsyncReadExt + AsyncWriteExt + Unpin + Send + 'static,
23 Sv: tower::Service<BytesMut, Response = BytesMut, Error = api::Error> + Send + 'static,
24 Sv::Future: Send,
25{
26 let mut buf = BytesMut::new();
27 let mut local_buf = [0_u8; BUF_SIZE];
28 loop {
29 buf.clear();
30 loop {
31 let num = tokio::select! {
32 res = stream.read(&mut local_buf) => res,
33 _ = controller.wait_for_shutdown() => return Ok(())
34 };
35 let num = num?;
36 buf.extend_from_slice(&local_buf[..num]);
37 if num < BUF_SIZE {
38 break;
39 }
40 }
41 tracing::trace!(message = "buffer read", size = buf.len());
42 let ready = match service.ready().await {
43 Ok(service) => service,
44 Err(err) => {
45 return Err(err.err);
46 }
47 };
48 buf = match ready.call(buf).await {
49 Ok(buf) => buf,
50 Err(err) => {
51 let report = crate::report!(err.err.as_ref());
52 tracing::error!("{report:?}");
53 err.buf
54 }
55 };
56 tracing::trace!(message = "write buffer", size = buf.len());
57 stream.write_buf(&mut buf).await?;
58 }
59}