chuchi_core/body/
sync_reader.rs1use super::{size_limit_reached, BodyAsyncReader, BoxedSyncRead, Constraints};
2
3use std::io;
4use std::io::Read;
5use std::pin::Pin;
6
7use tokio_util::io::SyncIoBridge;
8
9use bytes::Bytes;
10
11pub struct BodySyncReader {
14 inner: Inner,
15}
16
17impl BodySyncReader {
18 pub(super) fn new(inner: super::Inner, constraints: Constraints) -> Self {
19 let inner = match inner {
20 super::Inner::Empty => Inner::Empty,
21 super::Inner::Bytes(b) => Inner::Sync(ConstrainedSyncReader::new(
22 InnerSync::Bytes(b),
23 constraints,
24 )),
25 super::Inner::SyncReader(r) => {
26 Inner::Sync(ConstrainedSyncReader::new(
27 InnerSync::SyncReader(r),
28 constraints,
29 ))
30 }
31 i => Inner::Async(SyncIoBridge::new(Box::pin(
32 BodyAsyncReader::new(i, constraints),
33 ))),
34 };
35
36 Self { inner }
37 }
38
39 pub fn needs_spawn_blocking(&self) -> bool {
41 matches!(self.inner, Inner::Async(_))
42 }
43}
44
45impl Read for BodySyncReader {
46 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
47 self.inner.read(buf)
48 }
49}
50
51enum Inner {
52 Empty,
53 Sync(ConstrainedSyncReader<InnerSync>),
54 Async(SyncIoBridge<Pin<Box<BodyAsyncReader>>>),
55}
56
57impl Read for Inner {
58 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
59 match self {
60 Self::Empty => Ok(0),
61 Self::Sync(r) => r.read(buf),
62 Self::Async(r) => r.read(buf),
63 }
64 }
65}
66
67enum InnerSync {
68 Bytes(Bytes),
69 SyncReader(BoxedSyncRead),
70}
71
72impl Read for InnerSync {
73 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
74 match self {
75 Self::Bytes(b) if b.is_empty() => Ok(0),
76 Self::Bytes(b) => {
77 let read = buf.len().min(b.len());
78 let r = b.split_to(read);
79 buf[..read].copy_from_slice(&r);
80 Ok(read)
81 }
82 Self::SyncReader(r) => r.read(buf),
83 }
84 }
85}
86
87struct ConstrainedSyncReader<R> {
89 inner: R,
90 size_limit: Option<usize>,
91}
92
93impl<R> ConstrainedSyncReader<R> {
94 pub fn new(reader: R, constraints: Constraints) -> Self {
95 Self {
96 inner: reader,
97 size_limit: constraints.size,
98 }
99 }
100}
101
102impl<R: Read> Read for ConstrainedSyncReader<R> {
103 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
104 let read = self.inner.read(buf)?;
105
106 if let Some(size_limit) = &mut self.size_limit {
107 match size_limit.checked_sub(read) {
108 Some(ns) => *size_limit = ns,
109 None => return Err(size_limit_reached("sync reader to big")),
110 }
111 }
112
113 Ok(read)
114 }
115}
116
117pub(super) fn sync_reader_into_bytes(
118 r: BoxedSyncRead,
119 constraints: Constraints,
120) -> io::Result<Bytes> {
121 let mut reader = ConstrainedSyncReader::new(r, constraints);
122
123 let mut v = vec![];
124 reader.read_to_end(&mut v)?;
125
126 Ok(v.into())
127}