1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
use super::{size_limit_reached, BodyAsyncReader, BoxedSyncRead, Constraints};

use std::io;
use std::io::Read;
use std::pin::Pin;

use tokio_util::io::SyncIoBridge;

use bytes::Bytes;

/// Blocking [`Read`] view of a body.
///
/// Depending on the body it was created from, reads are served from nothing
/// (empty body), from a size-limited synchronous source, or from an async
/// reader that is bridged to blocking reads through `SyncIoBridge`.
///
/// ## Panics
/// If not read within `task::spawn_blocking`.
///
/// NOTE(review): presumably this only applies to the async-backed case, i.e.
/// when `needs_spawn_blocking` returns `true` — confirm.
pub struct BodySyncReader {
	inner: Inner,
}

impl BodySyncReader {
	/// Builds the blocking reader for the given body representation.
	///
	/// - An empty body needs no reader at all; `constraints` is unused.
	/// - In-memory bytes and synchronous readers are wrapped in a
	///   `ConstrainedSyncReader`, which only applies the size constraint.
	/// - Every other representation is turned into a `BodyAsyncReader`
	///   (which receives `constraints`) and bridged with `SyncIoBridge`.
	pub(super) fn new(inner: super::Inner, constraints: Constraints) -> Self {
		let inner = match inner {
			super::Inner::Empty => Inner::Empty,
			super::Inner::Bytes(bytes) => {
				let source = InnerSync::Bytes(bytes);
				Inner::Sync(ConstrainedSyncReader::new(source, constraints))
			}
			super::Inner::SyncReader(reader) => {
				let source = InnerSync::SyncReader(reader);
				Inner::Sync(ConstrainedSyncReader::new(source, constraints))
			}
			// Everything that is not already synchronous goes through the
			// async reader and gets bridged back to blocking reads.
			other => {
				let async_reader = BodyAsyncReader::new(other, constraints);
				Inner::Async(SyncIoBridge::new(Box::pin(async_reader)))
			}
		};

		Self { inner }
	}

	/// Tells whether reading has to happen inside `spawn_blocking`.
	///
	/// This is the case exactly when the body is backed by the async bridge.
	pub fn needs_spawn_blocking(&self) -> bool {
		match &self.inner {
			Inner::Async(_) => true,
			Inner::Empty | Inner::Sync(_) => false,
		}
	}
}

impl Read for BodySyncReader {
	/// Forwards the read to the underlying body source.
	fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
		Read::read(&mut self.inner, buf)
	}
}

/// The source a `BodySyncReader` reads from.
enum Inner {
	/// No body; every read yields `Ok(0)`.
	Empty,
	/// A synchronous source wrapped in the size-limit check.
	Sync(ConstrainedSyncReader<InnerSync>),
	/// An async body reader exposed as a blocking reader via `SyncIoBridge`.
	Async(SyncIoBridge<Pin<Box<BodyAsyncReader>>>),
}

impl Read for Inner {
	/// Hands the read to whichever source backs the body.
	fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
		match self {
			Self::Sync(limited) => Read::read(limited, buf),
			Self::Async(bridge) => Read::read(bridge, buf),
			// An empty body is always at end-of-stream.
			Self::Empty => Ok(0),
		}
	}
}

/// Body sources that can be read synchronously.
enum InnerSync {
	/// In-memory bytes; the front of the buffer is split off as it is read.
	Bytes(Bytes),
	/// A boxed synchronous reader; reads are forwarded to it unchanged.
	SyncReader(BoxedSyncRead),
}

impl Read for InnerSync {
	/// Reads from the boxed reader, or copies the next chunk out of the
	/// in-memory bytes and removes it from the front of the buffer.
	fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
		let bytes = match self {
			Self::SyncReader(reader) => return reader.read(buf),
			Self::Bytes(bytes) => bytes,
		};

		// Hand out as much as both the remaining bytes and `buf` allow.
		let count = bytes.len().min(buf.len());
		if count > 0 {
			let chunk = bytes.split_to(count);
			buf[..count].copy_from_slice(&chunk);
		}

		Ok(count)
	}
}

/// Reader wrapper that enforces a size limit on the wrapped reader.
///
/// Only the size constraint of `Constraints` is used.
struct ConstrainedSyncReader<R> {
	/// The wrapped reader.
	inner: R,
	/// Number of bytes that may still be read; `None` disables the check.
	size_limit: Option<usize>,
}

impl<R> ConstrainedSyncReader<R> {
	/// Wraps `reader`, taking the size limit from `constraints.size`.
	pub fn new(reader: R, constraints: Constraints) -> Self {
		let size_limit = constraints.size;

		Self {
			inner: reader,
			size_limit,
		}
	}
}

impl<R: Read> Read for ConstrainedSyncReader<R> {
	/// Reads from the wrapped reader and charges the bytes read against the
	/// remaining size budget.
	///
	/// Without a size limit the read is passed through unchanged.
	///
	/// # Errors
	/// Forwards any error of the wrapped reader. Returns the error created
	/// by `size_limit_reached` when a read delivers more bytes than the
	/// budget still allows. From then on the budget is exhausted, so every
	/// later read that yields data fails as well.
	fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
		let read = self.inner.read(buf)?;

		if let Some(size_limit) = &mut self.size_limit {
			match size_limit.checked_sub(read) {
				Some(remaining) => *size_limit = remaining,
				None => {
					// Latch the failure. The rejected bytes are already
					// consumed from the source, so letting a later, smaller
					// read succeed would hand out data from beyond the limit
					// as if nothing had happened.
					*size_limit = 0;
					return Err(size_limit_reached("sync reader too big"));
				}
			}
		}

		Ok(read)
	}
}

pub(super) fn sync_reader_into_bytes(
	r: BoxedSyncRead,
	constraints: Constraints,
) -> io::Result<Bytes> {
	let mut reader = ConstrainedSyncReader::new(r, constraints);

	let mut v = vec![];
	reader.read_to_end(&mut v)?;

	Ok(v.into())
}