chuchi_core/body/
sync_reader.rs

1use super::{size_limit_reached, BodyAsyncReader, BoxedSyncRead, Constraints};
2
3use std::io;
4use std::io::Read;
5use std::pin::Pin;
6
7use tokio_util::io::SyncIoBridge;
8
9use bytes::Bytes;
10
11/// ## Panics
12/// If not read within `task::spawn_blocking`.
13pub struct BodySyncReader {
14	inner: Inner,
15}
16
17impl BodySyncReader {
18	pub(super) fn new(inner: super::Inner, constraints: Constraints) -> Self {
19		let inner = match inner {
20			super::Inner::Empty => Inner::Empty,
21			super::Inner::Bytes(b) => Inner::Sync(ConstrainedSyncReader::new(
22				InnerSync::Bytes(b),
23				constraints,
24			)),
25			super::Inner::SyncReader(r) => {
26				Inner::Sync(ConstrainedSyncReader::new(
27					InnerSync::SyncReader(r),
28					constraints,
29				))
30			}
31			i => Inner::Async(SyncIoBridge::new(Box::pin(
32				BodyAsyncReader::new(i, constraints),
33			))),
34		};
35
36		Self { inner }
37	}
38
39	/// Returns true if this needs to be run within spawn_blocking.
40	pub fn needs_spawn_blocking(&self) -> bool {
41		matches!(self.inner, Inner::Async(_))
42	}
43}
44
45impl Read for BodySyncReader {
46	fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
47		self.inner.read(buf)
48	}
49}
50
51enum Inner {
52	Empty,
53	Sync(ConstrainedSyncReader<InnerSync>),
54	Async(SyncIoBridge<Pin<Box<BodyAsyncReader>>>),
55}
56
57impl Read for Inner {
58	fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
59		match self {
60			Self::Empty => Ok(0),
61			Self::Sync(r) => r.read(buf),
62			Self::Async(r) => r.read(buf),
63		}
64	}
65}
66
67enum InnerSync {
68	Bytes(Bytes),
69	SyncReader(BoxedSyncRead),
70}
71
72impl Read for InnerSync {
73	fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
74		match self {
75			Self::Bytes(b) if b.is_empty() => Ok(0),
76			Self::Bytes(b) => {
77				let read = buf.len().min(b.len());
78				let r = b.split_to(read);
79				buf[..read].copy_from_slice(&r);
80				Ok(read)
81			}
82			Self::SyncReader(r) => r.read(buf),
83		}
84	}
85}
86
/// Reader wrapper that enforces the size constraint only.
struct ConstrainedSyncReader<R> {
	/// The wrapped reader.
	inner: R,
	/// Bytes that may still be read; `None` means no limit.
	size_limit: Option<usize>,
}
92
93impl<R> ConstrainedSyncReader<R> {
94	pub fn new(reader: R, constraints: Constraints) -> Self {
95		Self {
96			inner: reader,
97			size_limit: constraints.size,
98		}
99	}
100}
101
102impl<R: Read> Read for ConstrainedSyncReader<R> {
103	fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
104		let read = self.inner.read(buf)?;
105
106		if let Some(size_limit) = &mut self.size_limit {
107			match size_limit.checked_sub(read) {
108				Some(ns) => *size_limit = ns,
109				None => return Err(size_limit_reached("sync reader to big")),
110			}
111		}
112
113		Ok(read)
114	}
115}
116
117pub(super) fn sync_reader_into_bytes(
118	r: BoxedSyncRead,
119	constraints: Constraints,
120) -> io::Result<Bytes> {
121	let mut reader = ConstrainedSyncReader::new(r, constraints);
122
123	let mut v = vec![];
124	reader.read_to_end(&mut v)?;
125
126	Ok(v.into())
127}