1#![allow(missing_docs)]
4
5use crate::http::response_entity::ResponseEntity;
6use crate::stream::ConnectionStreamWrite;
7use crate::util::unwrap_some;
8use crate::{
9 trace_log, EntitySerializer, MimeType, TiiError, TiiResult, TypeSystem, TypeSystemError,
10};
11use defer_heavy::defer;
12use libflate::gzip;
13use std::any::{Any, TypeId};
14use std::cell::RefCell;
15use std::fmt::{Debug, Formatter};
16use std::fs::File;
17use std::io;
18use std::io::{BufWriter, Read, Seek, SeekFrom, Write};
19use std::path::{Path, PathBuf};
20
21pub(crate) type ResponseBodyHandler = dyn FnOnce(&dyn ResponseBodySink) -> io::Result<()> + Send;
22
23#[repr(transparent)]
24#[derive(Debug)]
25pub struct ResponseBody(ResponseBodyInner);
26
27enum ResponseBodyInner {
29 Entity(ResponseEntity),
30
31 FixedSizeBinaryData(Vec<u8>),
33
34 FixedSizeBinaryDataStaticSlice(&'static [u8]),
36
37 FixedSizeTextData(String),
39
40 FixedSizeFile(Box<dyn ReadAndSeek>, u64),
43
44 Stream(Option<Box<ResponseBodyHandler>>),
49
50 ChunkedStream(Option<Box<ResponseBodyHandler>>),
53
54 ExternallyGzippedData(Vec<u8>),
56
57 ExternallyGzippedFile(Box<dyn ReadAndSeek>, u64),
59
60 ChunkedGzipStream(Option<Box<ResponseBodyHandler>>),
62
63 ChunkedGzipFile(Box<dyn ReadAndSeek>),
65}
66
67impl Debug for ResponseBodyInner {
68 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
69 match self {
70 ResponseBodyInner::FixedSizeBinaryDataStaticSlice(data) => {
71 f.write_fmt(format_args!("ResponseBody::FixedSizeBinaryDataStaticSlice({data:?})"))
72 }
73 ResponseBodyInner::FixedSizeBinaryData(data) => {
74 f.write_fmt(format_args!("ResponseBody::FixedSizeBinaryData({data:?})"))
75 }
76 ResponseBodyInner::FixedSizeTextData(data) => {
77 f.write_fmt(format_args!("ResponseBody::FixedSizeTextData({data:?})"))
78 }
79 ResponseBodyInner::FixedSizeFile(_, size) => {
80 f.write_fmt(format_args!("ResponseBody::FixedSizeFile(file, {size})"))
81 }
82 ResponseBodyInner::Stream(_) => f.write_str("ResponseBody::Stream(...)"),
83 ResponseBodyInner::ChunkedStream(_) => f.write_str("ResponseBody::ChunkedStream(...)"),
84 ResponseBodyInner::ExternallyGzippedData(_) => {
85 f.write_str("ResponseBody::ExternallyGzippedData(...)")
86 }
87 ResponseBodyInner::ExternallyGzippedFile(_, _) => {
88 f.write_str("ResponseBody::ExternallyGzippedFile(...)")
89 }
90 ResponseBodyInner::ChunkedGzipStream(_) => {
91 f.write_str("ResponseBody::ChunkedGzipStream(...)")
92 }
93 ResponseBodyInner::ChunkedGzipFile(_) => f.write_str("ResponseBody::ChunkedGzipFile(...)"),
94 ResponseBodyInner::Entity(entity) => {
95 f.write_fmt(format_args!("ResponseBody::Entity({entity:?})"))
96 }
97 }
98 }
99}
100
101pub(crate) trait ReadAndSeek: Read + Seek + Send {}
103
104impl<T> ReadAndSeek for T where T: Read + Seek + Send {}
105
106pub trait ResponseBodySink {
107 fn write(&self, buffer: &[u8]) -> io::Result<usize>;
108 fn write_all(&self, buffer: &[u8]) -> io::Result<()>;
109
110 fn as_write(&self) -> ResponseBodySinkAsWrite<'_>;
111}
112impl ResponseBody {
113 pub fn from_entity<T: Any + Send + Debug + 'static>(
114 entity: T,
115 serializer: impl EntitySerializer<T> + 'static,
116 ) -> Self {
117 Self(ResponseBodyInner::Entity(ResponseEntity::new(entity, serializer)))
118 }
119 pub fn from_data(data: Vec<u8>) -> Self {
120 Self(ResponseBodyInner::FixedSizeBinaryData(data))
121 }
122
123 pub fn from_externally_gzipped_data(data_in_gzip_format: Vec<u8>) -> Self {
125 Self(ResponseBodyInner::ExternallyGzippedData(data_in_gzip_format))
126 }
127
128 pub fn from_data_with_gzip_in_memory(data: impl AsRef<[u8]>) -> Self {
130 let data = data.as_ref();
131 let mut encoder =
133 crate::util::unwrap_ok(crate::util::new_gzip_encoder(Vec::with_capacity(data.len() + 128)));
134 crate::util::unwrap_ok(encoder.write_all(data));
135 let buffer = crate::util::unwrap_ok(encoder.finish().into_result());
136 Self(ResponseBodyInner::ExternallyGzippedData(buffer))
137 }
138
139 pub fn from_string(data: impl ToString) -> Self {
140 Self(ResponseBodyInner::FixedSizeTextData(data.to_string()))
141 }
142
143 pub fn from_slice<T: AsRef<[u8]> + ?Sized>(data: &T) -> Self {
144 Self(ResponseBodyInner::FixedSizeBinaryData(data.as_ref().to_vec()))
145 }
146
147 pub fn from_static_slice(data: &'static [u8]) -> Self {
153 Self(ResponseBodyInner::FixedSizeBinaryDataStaticSlice(data))
154 }
155
156 pub fn from_file<T: Read + Seek + Send + 'static>(mut file: T) -> io::Result<Self> {
157 file.seek(SeekFrom::End(0))?;
158 let size = file.stream_position()?;
159 Ok(Self(ResponseBodyInner::FixedSizeFile(Box::new(file), size)))
160 }
161
162 pub fn from_file_with_chunked_gzip<T: Read + Seek + Send + 'static>(file: T) -> Self {
163 Self(ResponseBodyInner::ChunkedGzipFile(Box::new(file)))
164 }
165
166 pub fn from_externally_gzipped_file<T: Read + Seek + Send + 'static>(
167 mut file_in_gzip_format: T,
168 ) -> io::Result<Self> {
169 file_in_gzip_format.seek(SeekFrom::End(0))?;
170 let size = file_in_gzip_format.stream_position()?;
171 Ok(Self(ResponseBodyInner::ExternallyGzippedFile(Box::new(file_in_gzip_format), size)))
172 }
173
174 pub fn chunked<T: FnOnce(&dyn ResponseBodySink) -> io::Result<()> + Send + 'static>(
175 streamer: T,
176 ) -> Self {
177 Self(ResponseBodyInner::ChunkedStream(Some(Box::new(streamer))))
178 }
179
180 pub fn streamed<T: FnOnce(&dyn ResponseBodySink) -> io::Result<()> + Send + 'static>(
181 streamer: T,
182 ) -> Self {
183 Self(ResponseBodyInner::Stream(Some(Box::new(streamer))))
184 }
185
186 pub fn chunked_gzip<T: FnOnce(&dyn ResponseBodySink) -> io::Result<()> + Send + 'static>(
190 streamer: T,
191 ) -> Self {
192 Self(ResponseBodyInner::ChunkedGzipStream(Some(Box::new(streamer))))
193 }
194
195 pub fn serialize_entity(self, mime: &MimeType) -> TiiResult<ResponseBody> {
200 Ok(match self.0 {
201 ResponseBodyInner::Entity(entity) => {
202 ResponseBody(ResponseBodyInner::FixedSizeBinaryData(entity.serialize(mime)?))
203 }
204 other => ResponseBody(other),
205 })
206 }
207
208 pub fn get_entity(&self) -> Option<&dyn Any> {
210 match &self.0 {
211 ResponseBodyInner::Entity(entity) => Some(entity.get_entity()),
212 _ => None,
213 }
214 }
215
216 pub fn get_entity_mut(&mut self) -> Option<&mut dyn Any> {
218 match &mut self.0 {
219 ResponseBodyInner::Entity(entity) => Some(entity.get_entity_mut()),
220 _ => None,
221 }
222 }
223
224 pub fn get_entity_serializer(&self) -> Option<&dyn Any> {
225 match &self.0 {
226 ResponseBodyInner::Entity(entity) => Some(entity.get_serializer()),
227 _ => None,
228 }
229 }
230
231 pub fn get_entity_serializer_mut(&mut self) -> Option<&mut dyn Any> {
232 match &mut self.0 {
233 ResponseBodyInner::Entity(entity) => Some(entity.get_serializer_mut()),
234 _ => None,
235 }
236 }
237
238 #[allow(clippy::type_complexity)] pub fn try_into_entity(self) -> Result<(Box<dyn Any>, Box<dyn Any>), Self> {
242 match self.0 {
243 ResponseBodyInner::Entity(entity) => Ok(entity.into_inner()),
244 _ => Err(self),
245 }
246 }
247
248 pub(crate) fn entity_cast<DST: Any + ?Sized + 'static, RET: Any + 'static>(
249 &self,
250 type_system: &TypeSystem,
251 receiver: impl FnOnce(&DST) -> RET + 'static,
252 ) -> Result<RET, TypeSystemError> {
253 let ety = self.get_entity().ok_or(TypeSystemError::SourceTypeUnknown)?;
254 let caster = type_system.type_cast_wrapper(ety.type_id(), TypeId::of::<DST>())?;
255 caster.call(ety, receiver)
256 }
257 pub(crate) fn entity_cast_mut<DST: Any + ?Sized + 'static, RET: Any + 'static>(
258 &mut self,
259 type_system: &TypeSystem,
260 receiver: impl FnOnce(&mut DST) -> RET + 'static,
261 ) -> Result<RET, TypeSystemError> {
262 let ety = self.get_entity_mut().ok_or(TypeSystemError::SourceTypeUnknown)?;
263 let caster = type_system.type_cast_wrapper_mut(Any::type_id(ety), TypeId::of::<DST>())?;
264 caster.call(ety, receiver)
265 }
266
267 pub fn write_to_raw(self, mime: &MimeType, stream: &mut impl Write) -> TiiResult<()> {
273 match self.0 {
274 ResponseBodyInner::FixedSizeBinaryDataStaticSlice(data) => stream.write_all(data)?,
275 ResponseBodyInner::FixedSizeBinaryData(data) => stream.write_all(data.as_ref())?,
276 ResponseBodyInner::FixedSizeTextData(text) => stream.write_all(text.as_ref())?,
277 ResponseBodyInner::FixedSizeFile(mut data, _)
278 | ResponseBodyInner::ChunkedGzipFile(mut data) => {
279 let mut io_buf = [0u8; 0x1_00_00];
280 loop {
281 let len = data.read(&mut io_buf)?;
282 if len == 0 {
283 return Ok(());
284 }
285 stream.write_all(unwrap_some(io_buf.get(..len)))?
286 }
287 }
288 ResponseBodyInner::Stream(mut handler)
289 | ResponseBodyInner::ChunkedStream(mut handler)
290 | ResponseBodyInner::ChunkedGzipStream(mut handler) => {
291 let sink = RawSink(RefCell::new(stream));
292 handler.take().ok_or_else(|| {
293 io::Error::new(io::ErrorKind::UnexpectedEof, "stream can only be written once")
294 })?(&sink)?;
295 }
296 ResponseBodyInner::ExternallyGzippedData(data) => {
297 let mut io_buf = [0u8; 0x1_00_00];
298 let mut dec = gzip::Decoder::new(&*data)?;
299 loop {
300 let len = dec.read(&mut io_buf)?;
301 if len == 0 {
302 return Ok(());
303 }
304 stream.write_all(unwrap_some(io_buf.get(..len)))?
305 }
306 }
307 ResponseBodyInner::ExternallyGzippedFile(data, _) => {
308 let mut io_buf = [0u8; 0x1_00_00];
309 let mut dec = gzip::Decoder::new(data)?;
310 loop {
311 let len = dec.read(&mut io_buf)?;
312 if len == 0 {
313 return Ok(());
314 }
315 stream.write_all(unwrap_some(io_buf.get(..len)))?
316 }
317 }
318 ResponseBodyInner::Entity(entity) => stream.write_all(&entity.serialize(mime)?)?,
319 };
320
321 Ok(())
322 }
323
324 pub fn write_to_http<T: ConnectionStreamWrite + ?Sized>(
329 self,
330 request_id: u128,
331 stream: &T,
332 ) -> TiiResult<()> {
333 match self.0 {
334 ResponseBodyInner::FixedSizeBinaryDataStaticSlice(data) => stream.write_all(data)?,
335 ResponseBodyInner::FixedSizeBinaryData(data)
336 | ResponseBodyInner::ExternallyGzippedData(data) => stream.write_all(data.as_slice())?,
337 ResponseBodyInner::FixedSizeTextData(text) => stream.write_all(text.as_bytes())?,
338 ResponseBodyInner::FixedSizeFile(mut file, size)
339 | ResponseBodyInner::ExternallyGzippedFile(mut file, size) => {
340 let mut io_buf = [0u8; 0x1_00_00];
342 let mut written = 0u64;
343 file.seek(io::SeekFrom::Start(0))?;
344 loop {
345 let read = file.read(io_buf.as_mut_slice())?;
346 if read == 0 {
347 if written != size {
348 return Err(TiiError::from_io_kind(io::ErrorKind::InvalidData));
349 }
350 return Ok(());
351 }
352
353 stream.write_all(io_buf.get_mut(..read).ok_or(io::Error::other("buffer overflow"))?)?;
354 written = written
355 .checked_add(u64::try_from(read).map_err(|_| io::Error::other("usize->u64 failed"))?)
356 .ok_or(io::Error::other("u64 overflow"))?;
357 }
358 }
359 ResponseBodyInner::Stream(mut handler) => {
360 handler.take().ok_or_else(|| TiiError::from_io_kind(io::ErrorKind::UnexpectedEof))?(
361 &StreamSink(stream.as_stream_write()),
362 )?
363 }
364
365 ResponseBodyInner::ChunkedStream(mut handler) => {
366 let sink = ChunkedSink(request_id, stream.as_stream_write());
367 handler.take().ok_or_else(|| {
368 io::Error::new(io::ErrorKind::UnexpectedEof, "stream can only be written once")
369 })?(&sink)?;
370 sink.finish()?
371 }
372 ResponseBodyInner::ChunkedGzipStream(mut handler) => {
373 let sink = GzipChunkedSink::new(request_id, stream.as_stream_write())?;
374 handler.take().ok_or_else(|| {
375 io::Error::new(io::ErrorKind::UnexpectedEof, "stream can only be written once")
376 })?(&sink)?;
377 sink.finish()?
378 }
379 ResponseBodyInner::ChunkedGzipFile(mut file) => {
380 file.seek(io::SeekFrom::Start(0))?;
381 let sink = GzipChunkedSink::new(request_id, stream.as_stream_write())?;
382 let mut io_buf = [0u8; 0x1_00_00];
383 loop {
384 let count = file.read(io_buf.as_mut_slice())?;
385 if count == 0 {
386 break;
387 }
388 sink.write_all(unwrap_some(io_buf.get(..count)))?;
389 }
390 sink.finish()?
391 }
392 ResponseBodyInner::Entity(entity) => {
393 let sink = ChunkedSink(request_id, stream.as_stream_write());
396 sink.write_all(&entity.serialize(&MimeType::ApplicationOctetStream)?)?;
397 sink.finish()?
398 }
399 };
400
401 Ok(())
402 }
403
404 pub fn is_entity(&self) -> bool {
405 matches!(self.0, ResponseBodyInner::Entity(_))
406 }
407
408 pub fn is_chunked(&self) -> bool {
409 matches!(
410 self.0,
411 ResponseBodyInner::ChunkedStream(_)
412 | ResponseBodyInner::ChunkedGzipStream(_)
413 | ResponseBodyInner::ChunkedGzipFile(_)
414 | ResponseBodyInner::Entity(_)
415 )
416 }
417
418 pub fn get_content_encoding(&self) -> Option<&'static str> {
419 Some(match self.0 {
420 ResponseBodyInner::ExternallyGzippedData(_) => "gzip",
421 ResponseBodyInner::ExternallyGzippedFile(_, _) => "gzip",
422 ResponseBodyInner::ChunkedGzipStream(_) => "gzip",
423 ResponseBodyInner::ChunkedGzipFile(_) => "gzip",
424 _ => return None,
425 })
426 }
427
428 pub fn content_length(&self) -> Option<u64> {
429 match &self.0 {
430 ResponseBodyInner::FixedSizeBinaryDataStaticSlice(data) => u64::try_from(data.len()).ok(),
431 ResponseBodyInner::FixedSizeBinaryData(data) => u64::try_from(data.len()).ok(),
432 ResponseBodyInner::FixedSizeTextData(data) => u64::try_from(data.len()).ok(),
433 ResponseBodyInner::FixedSizeFile(_, sz) => Some(*sz),
434 ResponseBodyInner::ExternallyGzippedData(data) => u64::try_from(data.len()).ok(),
435 ResponseBodyInner::ExternallyGzippedFile(_, sz) => Some(*sz),
436 _ => None,
437 }
438 }
439}
440
441pub struct ResponseBodySinkAsWrite<'a>(&'a dyn ResponseBodySink);
442
443impl Write for ResponseBodySinkAsWrite<'_> {
444 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
445 self.0.write(buf)
446 }
447
448 fn flush(&mut self) -> io::Result<()> {
449 Ok(())
451 }
452}
453
454struct StreamSink<'a>(&'a dyn ConnectionStreamWrite);
455
456impl ResponseBodySink for StreamSink<'_> {
457 fn write(&self, buffer: &[u8]) -> io::Result<usize> {
458 self.0.write(buffer)
459 }
460
461 fn write_all(&self, buffer: &[u8]) -> io::Result<()> {
462 self.0.write_all(buffer)
463 }
464
465 fn as_write(&self) -> ResponseBodySinkAsWrite<'_> {
466 ResponseBodySinkAsWrite(self)
467 }
468}
469
470struct GzipChunkedSink<'a>(u128, RefCell<Option<gzip::Encoder<BufWriter<ChunkedSink<'a>>>>>);
471
472impl<'a> GzipChunkedSink<'a> {
473 fn new(
474 request_id: u128,
475 stream: &'a dyn ConnectionStreamWrite,
476 ) -> io::Result<GzipChunkedSink<'a>> {
477 Ok(Self(
483 request_id,
484 RefCell::new(Some(crate::util::new_gzip_encoder(BufWriter::new(ChunkedSink(
485 request_id, stream,
486 )))?)),
487 ))
488 }
489}
490
491impl GzipChunkedSink<'_> {
492 fn finish(&self) -> io::Result<()> {
493 trace_log!("tii: Request {} GzipChunkedSink::finish", self.0);
494 defer! {
495 trace_log!("tii: Request {} GzipChunkedSink::finish done", self.0);
496 }
497 unwrap_some(self.1.borrow_mut().take()).finish().into_result()?.into_inner()?.finish()
499 }
500}
501
502impl ResponseBodySink for GzipChunkedSink<'_> {
503 fn write(&self, buffer: &[u8]) -> io::Result<usize> {
504 trace_log!("tii: Request {} GzipChunkedSink::write with {} bytes", self.0, buffer.len());
505 unwrap_some(self.1.borrow_mut().as_mut()).write(buffer)
507 }
508
509 fn write_all(&self, buffer: &[u8]) -> io::Result<()> {
510 trace_log!("tii: Request {} GzipChunkedSink::write_all with {} bytes", self.0, buffer.len());
511 unwrap_some(self.1.borrow_mut().as_mut()).write_all(buffer)
513 }
514
515 fn as_write(&self) -> ResponseBodySinkAsWrite<'_> {
516 ResponseBodySinkAsWrite(self)
517 }
518}
519
520struct RawSink<'a>(RefCell<&'a mut dyn Write>);
521
522impl ResponseBodySink for RawSink<'_> {
523 fn write(&self, buffer: &[u8]) -> io::Result<usize> {
524 self.0.borrow_mut().write(buffer)
525 }
526
527 fn write_all(&self, buffer: &[u8]) -> io::Result<()> {
528 self.0.borrow_mut().write_all(buffer)
529 }
530
531 fn as_write(&self) -> ResponseBodySinkAsWrite<'_> {
532 ResponseBodySinkAsWrite(self)
533 }
534}
535static CHUNK_LUT: [&[u8]; 8096] = tii_procmacro::hex_chunked_lut!(8096);
536
537struct ChunkedSink<'a>(u128, &'a dyn ConnectionStreamWrite);
538
539impl Write for ChunkedSink<'_> {
540 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
541 ResponseBodySink::write(self, buf)
542 }
543
544 fn flush(&mut self) -> io::Result<()> {
545 Ok(())
547 }
548}
549
550impl ResponseBodySink for ChunkedSink<'_> {
551 fn write(&self, buffer: &[u8]) -> io::Result<usize> {
552 self.write_all(buffer)?;
553 Ok(buffer.len())
554 }
555
556 fn write_all(&self, buffer: &[u8]) -> io::Result<()> {
557 if buffer.is_empty() {
558 return Ok(());
559 }
560
561 trace_log!(
562 "tii: Request {} ChunkedSink -> Emitting a HTTP chunk with {} bytes",
563 self.0,
564 buffer.len()
565 );
566
567 if let Some(lut) = CHUNK_LUT.get(buffer.len()) {
568 self.1.write_all(lut)?;
569 } else {
570 self.1.write_all(format!("{:X}\r\n", buffer.len()).as_bytes())?;
571 }
572
573 self.1.write_all(buffer)?;
574 self.1.write_all(b"\r\n")
575 }
576
577 fn as_write(&self) -> ResponseBodySinkAsWrite<'_> {
578 ResponseBodySinkAsWrite(self)
579 }
580}
581
582impl ChunkedSink<'_> {
583 fn finish(&self) -> io::Result<()> {
584 trace_log!("tii: Request {} ChunkedSink -> Emitting trailer", self.0);
585 self.1.write_all(b"0\r\n\r\n")
586 }
587}
588
589impl From<Vec<u8>> for ResponseBody {
590 fn from(value: Vec<u8>) -> Self {
591 ResponseBody::from_data(value)
592 }
593}
594
595impl From<String> for ResponseBody {
596 fn from(value: String) -> Self {
597 ResponseBody::from_data(value.into_bytes())
598 }
599}
600
601impl From<&str> for ResponseBody {
602 fn from(value: &str) -> Self {
603 ResponseBody::from_slice(value)
604 }
605}
606
607impl From<&[u8]> for ResponseBody {
608 fn from(value: &[u8]) -> Self {
609 ResponseBody::from_slice(value)
610 }
611}
612
613impl TryFrom<File> for ResponseBody {
614 type Error = io::Error;
615 fn try_from(value: File) -> Result<Self, Self::Error> {
616 ResponseBody::from_file(value)
617 }
618}
619
620impl TryFrom<&Path> for ResponseBody {
621 type Error = io::Error;
622 fn try_from(value: &Path) -> Result<Self, Self::Error> {
623 File::open(Path::new(value))?.try_into()
624 }
625}
626
627impl TryFrom<&PathBuf> for ResponseBody {
628 type Error = io::Error;
629 fn try_from(value: &PathBuf) -> Result<Self, Self::Error> {
630 File::open(Path::new(value))?.try_into()
631 }
632}
633
634impl TryFrom<PathBuf> for ResponseBody {
635 type Error = io::Error;
636 fn try_from(value: PathBuf) -> Result<Self, Self::Error> {
637 File::open(Path::new(value.as_path()))?.try_into()
638 }
639}