tokio_fastcgi/
lib.rs

1#![warn(missing_docs)]
2#![doc = include_str!("../README.md")]
3//! # Basic Example
4//! ```no_run
5#![doc = include_str!("../examples/simple.rs")]
6//! ```
7use log::{trace, warn};
8use std::fmt::Debug;
9use std::marker::Unpin;
10use std::io::{Cursor, Read, Write};
11use std::collections::{HashMap, hash_map::Entry};
12use std::sync::Arc;
13use tokio::io::{AsyncRead, AsyncWrite, AsyncReadExt, AsyncWriteExt};
14use tokio::sync::{Mutex, MutexGuard};
15use std::convert::TryFrom;
16use byteorder::{BigEndian, ReadBytesExt};
17use std::future::Future;
18
19/// The size of the record header is 8 bytes.
20const RECORD_HEADER_SIZE: usize = 8;
21
22/// Flag for FCGI_BeginRequestBody
23const FCGI_KEEP_CONN: u8 = 0x01;
24
25/// Static panic message for a failed lock.
26const ERR_LOCK_FAILED: &str = "A request must not be processed by multiple threads.";
27
28/// The type of the request id. This is always u16 but makes external code more readable.
29type RequestId = u16;
30
31/// Types for the parameter iterator
32type ParamsIterator<'i> = dyn Iterator<Item=(&'i str, &'i [u8])> + 'i;
33
34/// Types for the parameter iterator with string conversion
35type StrParamsIterator<'i> = dyn Iterator<Item=(&'i str, Option<&'i str>)> + 'i;
36
37/// Type returned by [`get_stdin`](Request::get_stdin) and [`get_data`](Request::get_data).
38/// It makes passing around the streams easier.
39pub type OwnedInStream<'a> = MutexGuard<'a, InStream>;
40
41/// Error type for TryFrom on StdReqType and SysReqType
42#[derive(Debug)]
43enum TypeError {
44	UnknownRecordType(u8)
45}
46
47/// Enum containing all request record types that can be handled by Request::Process.
48#[derive(Clone, Copy, Debug, PartialEq)]
49enum StdReqType {
50	BeginRequest = 1,
51	Params = 4,
52	StdIn = 5,
53	Data = 8
54}
55
56impl From<StdReqType> for u8 {
57	fn from(rt: StdReqType) -> Self {
58		rt as u8
59	}
60}
61
62impl TryFrom<u8> for StdReqType {
63	type Error = TypeError;
64	fn try_from(value: u8) -> Result<Self, Self::Error> {
65		match value {
66			1 => Ok(Self::BeginRequest),
67			4 => Ok(Self::Params),
68			5 => Ok(Self::StdIn),
69			8 => Ok(Self::Data),
70			_ => Err(TypeError::UnknownRecordType(value))
71		}
72	}
73}
74
75/// Enum containing all response types generated by Request::Process.
76#[derive(Clone, Copy, Debug, PartialEq)]
77enum StdRespType {
78	EndRequest = 3,
79	StdOut = 6,
80	StdErr = 7
81}
82
83impl From<StdRespType> for u8 {
84	fn from(rt: StdRespType) -> Self {
85		rt as u8
86	}
87}
88
89/// Enum containing all request record types that must be handled by Request::process_sys.
90#[derive(Clone, Copy, Debug, PartialEq)]
91enum SysReqType {
92	AbortRequest = 2,
93	GetValues = 9
94}
95
96impl From<SysReqType> for u8 {
97	fn from(rt: SysReqType) -> Self {
98		rt as u8
99	}
100}
101
102impl TryFrom<u8> for SysReqType {
103	type Error = TypeError;
104	fn try_from(value: u8) -> Result<Self, Self::Error> {
105		match value {
106			2 => Ok(Self::AbortRequest),
107			9 => Ok(Self::GetValues),
108			_ => Err(TypeError::UnknownRecordType(value))
109		}
110	}
111}
112
113/// Enum containing all response record types that can be generated by Request::process_sys.
114#[derive(Clone, Copy, Debug, PartialEq)]
115enum SysRespType {
116	GetValuesResult = 10,
117	UnknownType = 11
118}
119
120impl From<SysRespType> for u8 {
121	fn from(rt: SysRespType) -> Self {
122		rt as u8
123	}
124}
125
126/// Container for std and sys request and response types.
127#[derive(Clone, Copy, Debug)]
128enum Category<S: Copy, T: Copy> {
129	Std(S),
130	Sys(T)
131}
132
133impl <S: Copy + TryFrom<u8, Error = TypeError>, T: Copy + TryFrom<u8, Error = TypeError>> TryFrom<u8> for Category<S, T> {
134	type Error = TypeError;
135	fn try_from(value: u8) -> Result<Self, Self::Error> {
136		if let Ok(result) = S::try_from(value) {
137			Ok(Self::Std(result))
138		} else {
139			T::try_from(value).map(Self::Sys)
140		}
141	}
142}
143
144impl <S: std::convert::Into<u8> + Copy, T: std::convert::Into<u8> + Copy> From<Category<S, T>> for u8 {
145	fn from(cat: Category<S, T>) -> Self {
146		match cat {
147			Category::<S, T>::Std(std) => std.into(),
148			Category::<S, T>::Sys(sys) => sys.into()
149		}
150	}
151}
152
153/// Type for all known request record types
154type RequestType = Category<StdReqType, SysReqType>;
155
156/// Type for all known response record types
157type ResponseType = Category<StdRespType, SysRespType>;
158
159/// Enum containing the role that is requested from the FastCGI client. See the different
160/// variants for a description of the roles and their input and output streams.
161#[derive(PartialEq, Debug)]
162pub enum Role {
163	/// A FastCGI responder receives all the information associated with an HTTP
164	/// request and generates an HTTP response. A responder receives the following
165	/// information from the web-server:
166	///
167	/// * Environment variables (see [`get_param`](Request::get_param)/[`get_str_param`](Request::get_str_param))
168	/// * StdIn (see [`get_stdin`](Request::get_stdin))
169	///
170	/// A responder has the following communication channels at its disposal:
171	/// * Result code (see [`RequestResult`])
172	/// * StdOut (see [`get_stdout`](Request::get_stdout))
173	/// * StdErr (see [`get_stderr`](Request::get_stderr))
174	///
175	/// see the [FastCGI specification](https://fastcgi-archives.github.io/FastCGI_Specification.html#S6.2) for more Information
176	Responder,
177
178	/// A FastCGI authorizer receives all the information associated with an HTTP
179	/// request and generates an authorized/unauthorized decision. In case of an
180	/// authorized decision the authorizer can also associate name-value pairs with
181	/// the HTTP request. A responder receives the following information from the
182	/// web-server:
183	///
184	/// * Environment variables and request parameters (see
185	///   [`get_param`](Request::get_param)/[`get_str_param`](Request::get_str_param))
186	///
187	/// An authorizer has the following communication channels at its disposal:
188	///
189	/// * Result code (see [`RequestResult`])
190	/// * StdOut (see [`get_stdout`](Request::get_stdout))
191	/// * StdErr (see [`get_stderr`](Request::get_stderr))
192	///
193	/// see the [FastCGI specification](https://fastcgi-archives.github.io/FastCGI_Specification.html#S6.3) for more Information
194	Authorizer,
195
196	/// A FastCGI filter receives all the information associated with an HTTP
197	/// request, plus an extra stream of data from a file stored on the Web server,
198	/// and generates a “filtered” version of the data stream as an HTTP response. A
199	/// filter receives the following information from the web-server:
200	///
201	/// * Environment variables, request parameters and additional information
202	///   (`FCGI_DATA_LAST_MOD` and `FCGI_DATA_LENGTH`) (see
203	///   [`get_param`](Request::get_param)/[`get_str_param`](Request::get_str_param))
204	/// * StdIn (see [`get_stdin`](Request::get_stdin))
205	/// * File Data from the web-server (see [`get_data`](Request::get_data))
206	///
207	/// A filter has the following communication channels at its disposal:
208	///
209	/// * Result code (see [`RequestResult`])
210	/// * StdOut (see [`get_stdout`](Request::get_stdout))
211	/// * StdErr (see [`get_stderr`](Request::get_stderr))
212	///
213	/// see the [FastCGI specification](https://fastcgi-archives.github.io/FastCGI_Specification.html#S6.4) for more Information
214	Filter
215}
216
217impl Role {
218	fn from_number(rl_num: u16) -> Option<Self> {
219		match rl_num {
220			1 => Some(Role::Responder),
221			2 => Some(Role::Authorizer),
222			3 => Some(Role::Filter),
223			_ => None
224		}
225	}
226}
227
228
229/// The result of a FastCGI request.
230///
231/// This enum is returned by the [`process`](Request::process) method of the
232///[`Request`] struct.  The meaning of the values is defined by the FastCGI
233/// specification.
234#[derive(Copy, Clone)]
235pub enum RequestResult {
236	/// The request completed successfully. The returned status value is defined by
237	/// the [role](Role) of the FastCGI application.
238	///
239	/// # Result codes
240	///
241	/// The application returns the status code that the CGI program would have
242	/// returned via the `exit` system call.
243	///
244	Complete(u32),
245	/// The application ran out of resources (for example database connections). The
246	/// request is rejected.
247	Overloaded,
248	/// The application is not prepared to handle the role requested by the
249	/// web-server. For example if a FastCGI responder is called as a filter or an
250	/// authorizer.
251	UnknownRole
252}
253
254impl RequestResult {
255	fn app_status(self) -> u32 {
256		match self {
257			Self::Complete(app_status) => app_status,
258			_ => 0
259		}
260	}
261}
262
263impl From<RequestResult> for u8 {
264	/// Allow the RequestResult to be converted into a u8.
265	/// This method returns the magic number that must be used as the
266	/// result field of the FastCGI protocol.
267	fn from(rr: RequestResult) -> Self {
268		match rr {
269			RequestResult::Complete(_) => 0,
270			RequestResult::Overloaded => 2,
271			RequestResult::UnknownRole => 3
272		}
273	}
274}
275
276/// Errors that can be returned by calls to [`process`](Request::process).
277#[derive(Debug)]
278pub enum Error {
279	/// The input stream was already closed and can not be reused. This indicates
280	/// an error within the call sequence, like calling `process` twice or the
281	/// web-server sending more data after closing `StdIn` or `Data`.
282	StreamAlreadyDone,
283
284	/// `write` was called on an output stream that was already closed by a call to
285	/// `close`.
286	StreamAlreadyClosed,
287
288	/// The web-server violated the FastCGI specification. For example by sending a
289	/// `StdIn` record before sending a `BeginRequest` record.
290	SequenceError,
291
292	/// The record version is not 1. This should never happen since the FastCGI
293	/// specification does not define any other record versions.
294	InvalidRecordVersion,
295
296	/// The web-server sent an unknown role number. This is most likely a bug in the
297	/// FastCGI implementation of the web-server.
298	InvalidRoleNumber,
299
300	/// This error is never returned to the user of the library. It is internally
301	/// handled by the `tokio-fastcgi` crate. The library returns a
302	/// `FCGI_UNKNOWN_TYPE` record to the web-server.
303	UnknownRecordType(RequestId, u8),
304
305	/// An IoError occurred. Most likely the connection to the web-server got lost or
306	/// was interrupted. Some I/O errors are handled by `tokio-fastcgi`. If the
307	/// web-server closes the FastCGI connection after all requests have been
308	/// processed no error is returned and the EOF error is just swallowed.
309	IoError(std::io::Error)
310}
311
312impl std::fmt::Display for Error {
313	fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
314		match self {
315			Error::StreamAlreadyDone => write!(f, "Input stream is already done"),
316			Error::StreamAlreadyClosed => write!(f, "Output stream is already closed"),
317			Error::SequenceError => write!(f, "Records out of sequence "),
318			Error::InvalidRecordVersion => write!(f, "Only record version 1 supported"),
319			Error::InvalidRoleNumber => write!(f, "Unkown role pass from server"),
320			Error::UnknownRecordType(request_id, type_id) => write!(f, "Unkown record type {} in request {} received", type_id, request_id),
321			Error::IoError(error) => write!(f, "I/O error: {}", error)
322		}
323	}
324}
325
326impl std::error::Error for Error {
327	fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
328		match self {
329			Error::IoError(source) => Some(source),
330			_ => None
331		}
332	}
333}
334
335impl From<std::io::Error> for Error {
336	fn from(io_error: std::io::Error) -> Self {
337		Error::IoError(io_error)
338	}
339}
340
341/// Represents a record received by the web-server.
342struct Record {
343	record_type: RequestType,
344	request_id: RequestId,
345	content: Vec<u8>
346}
347
348impl Record {
349	async fn new<R: AsyncRead + Unpin>(rd: &mut R) -> Result<Self, Error> {
350		let mut header_buffer = [0; RECORD_HEADER_SIZE];
351
352		rd.read_exact(&mut header_buffer).await?;
353
354		let mut header_slice = &header_buffer[..];
355
356		// Check the FastCGI version
357		if byteorder::ReadBytesExt::read_u8(&mut header_slice).unwrap() != 1 {
358			return Err(Error::InvalidRecordVersion);
359		}
360
361		// Parse the remaining header fields
362		// Unwrap the record_type field not yet. An error on the record_type can be handled
363		// and we must read the remaining data to keep the I/O stream in sync.
364		let record_type = RequestType::try_from(byteorder::ReadBytesExt::read_u8(&mut header_slice).unwrap());
365		let request_id = byteorder::ReadBytesExt::read_u16::<BigEndian>(&mut header_slice)?;
366		let content_length = byteorder::ReadBytesExt::read_u16::<BigEndian>(&mut header_slice).unwrap() as usize;
367		let padding_length = byteorder::ReadBytesExt::read_u8(&mut header_slice).unwrap() as u64;
368
369		// Allocate the buffer for the content and read everything asynchronously.
370		// `with_capacity` can not be used, because Tokio does not support this.
371		let mut content = vec![0; content_length];
372		rd.read_exact(&mut content).await?;
373
374		// If there is some padding at the end of the record. Discard it.
375		if padding_length > 0 {
376			tokio::io::copy(&mut rd.take(padding_length), &mut tokio::io::sink()).await?;
377		}
378
379		trace!("FastCGI: In record {{T:{:?}, ID: {}, L:{}}}", record_type, request_id, RECORD_HEADER_SIZE + content.len() + padding_length as usize);
380
381		// Now we unwrap the record_type. If we fail now, the record as been completely read.
382		// Before we can unwrap the TypeError must be translated into a full blown Error::UnknownRecordType value by adding the request_id.
383		let record_type = record_type.map_err(|error| {
384			let TypeError::UnknownRecordType(record_type_nr) = error;
385			Error::UnknownRecordType(request_id, record_type_nr)
386		})?;
387
388		Ok(Self {
389			record_type,
390			request_id,
391			content
392		})
393	}
394
395	/// Checks if this record is a system record. If that's the case Request::update should not be called
396	/// on this one. Just call Request::process_sys to process the system record.
397	/// This method only returns true if the record type can be processed by Request::sys_process and
398	/// the Record is complete.
399	fn is_sys_record(&self) -> bool {
400		matches!(self.record_type, Category::Sys(_))
401	}
402
403	fn get_content(&self) -> &[u8] {
404		&self.content
405	}
406
407	fn get_request_id(&self) -> RequestId {
408		self.request_id
409	}
410}
411
412/// Implements a data stream from the web-server to the FastCGI application.
413///
414/// All data is buffered in memory before being returned to the FastCGI
415/// application. Therefore only a synchronous interface is implemented.
416///
417/// The data of the stream can be accessed via the methods of the [`Read`
418/// trait](Read).
419#[derive(Debug)]
420pub struct InStream {
421	data: Vec<u8>,
422	read_pos: Option<usize>
423}
424
425impl Read for InStream {
426	/// Read implementation for Stream.
427	///
428	/// *Beware*: Calling read or read_exact on a stream that is not done will panic!
429	fn read(&mut self, out: &mut [u8]) -> std::result::Result<usize, std::io::Error> {
430		let read_pos = self.read_pos.unwrap();
431		let c = std::io::Read::read(&mut &self.data[read_pos..], out)?;
432		self.read_pos = Some(read_pos + c);
433		Ok(c)
434	}
435
436	/// Read_exact implementation for Stream.
437	///
438	/// *Beware*: Calling read or read_exact on a stream that is not done will panic!
439	fn read_exact(&mut self, out: &mut [u8]) -> std::result::Result<(), std::io::Error> {
440		let read_pos = self.read_pos.unwrap();
441		std::io::Read::read_exact(&mut &self.data[read_pos..], out)?;
442		self.read_pos = Some(read_pos + out.len());
443		Ok(())
444	}
445}
446
447impl InStream{
448	/// Creates a new stream waiting for input.
449	///
450	/// If `already_done` is set to true the stream is created done and no data can be added to it.
451	fn new(already_done: bool) -> Self {
452		InStream {
453			data: Vec::new(),
454			read_pos: if already_done { Some(0) } else { None }
455		}
456	}
457
458	/// Appends the passed data slice to the internal vector.
459	///
460	/// If the slice is empty, this is interpreted as an EOF marker and marks this
461	/// stream as done.
462	/// Calling this method on a done stream will always return a StreamAlreadyDone
463	/// error.
464	fn append(&mut self, data: &[u8]) -> Result<(), Error> {
465		if ! data.is_empty() {
466			if self.read_pos.is_none() {
467				self.data.extend_from_slice(data);
468				Ok(())
469			} else {
470				Err(Error::StreamAlreadyDone)
471			}
472		} else {
473			self.read_pos = Some(0);
474			Ok(())
475		}
476	}
477
478	/// Checks if this stream is done (EOF).
479	///
480	/// This is signaled by the web server by passing an empty record for this stream.
481	fn is_done(&self) -> bool {
482		self.read_pos.is_some()
483	}
484}
485
486/// Represents a FastCGI request that can be handled by the application.
487///
488/// An instance of this struct is returned by the [`next`](Requests::next) function
489/// of the [Requests] struct. It represents one request that should be handled
490/// via FastCGI. Normally [`process`](Request::process) is called on every
491/// instance that is returned. The request gets passed to the callback function
492/// and can be used to get the input/output streams and environment values.
493pub struct Request <W: AsyncWrite + Unpin> {
494	/// Contains the role that this request is requesting from the FastCGI
495	/// application.
496	///
497	/// If the FastCGI application can not comply to this role the callback
498	/// passed to [`process`](Request::process) should return
499	/// [`RequestResult::UnknownRole`].
500	pub role: Role,
501	keep_connection: bool,
502	request_id: RequestId,
503	params: HashMap<String, Vec<u8>>,
504	params_done: bool,
505	orw: Arc<OutRecordWriter<W>>,
506	stdin: Mutex<InStream>,
507	data: Mutex<InStream>
508}
509
510impl <W: AsyncWrite + Unpin> Request<W> {
511	fn new(record: &Record, writer: Arc<Mutex<W>>) -> Result<Self, Error> {
512		let mut content = record.get_content();
513
514		if let Category::Std(StdReqType::BeginRequest) = record.record_type {
515			if let Some(role) = Role::from_number(byteorder::ReadBytesExt::read_u16::<BigEndian>(&mut content).unwrap()) { //We're reading from am memory buffer. So there is something deeply wrong if this fails.
516				let keep_connection = (byteorder::ReadBytesExt::read_u8(&mut content)? & FCGI_KEEP_CONN) == FCGI_KEEP_CONN;
517
518				Ok(Self {
519					params: HashMap::new(),
520					params_done: false,
521					orw: Arc::from(OutRecordWriter::new(writer, record.request_id)),
522					stdin: Mutex::from(InStream::new(role == Role::Authorizer)), // Authorizers do not get an stdin stream
523					data: Mutex::from(InStream::new(role != Role::Filter)),      // Only filters get a data stream
524					role,
525					keep_connection,
526					request_id: record.request_id
527				})
528			} else {
529				Err(Error::InvalidRoleNumber)
530			}
531		} else {
532			Err(Error::SequenceError)
533		}
534	}
535
536	fn read_length<T: Read>(src: &mut T) -> Result<u32, std::io::Error> {
537		let length: u32 = u32::from(src.read_u8()?);
538
539		if length & 0x80 == 0 {
540			Ok(length)
541		} else {
542			let length_byte2 = u32::from(src.read_u8()?);
543			let length_byte10 = u32::from(src.read_u16::<BigEndian>()?);
544
545			Ok((length & 0x7f) << 24 | length_byte2 << 16 | length_byte10)
546		}
547	}
548
549	fn add_nv_pairs(params: &mut HashMap<String, Vec<u8>>, src: &[u8], lowercase_keys: bool) -> Result<(), std::io::Error>{
550		let mut src_slice = src;
551
552		while !src_slice.is_empty() {
553			let name_length = Request::<W>::read_length(&mut src_slice)?;
554			let value_length = Request::<W>::read_length(&mut src_slice)?;
555
556			let mut name_buffer = vec![0; name_length as usize];
557			let mut value_buffer = vec![0; value_length as usize];
558
559			std::io::Read::read_exact(&mut src_slice, &mut name_buffer)?;
560			std::io::Read::read_exact(&mut src_slice, &mut value_buffer)?;
561
562			let key = String::from_utf8_lossy(&name_buffer);
563			let key = if lowercase_keys {
564				key.to_ascii_lowercase()
565			} else {
566				key.into_owned()
567			};
568
569			trace!("FastCGI: NV-Pair[\"{}\"]=\"{}\"", key, String::from_utf8_lossy(&value_buffer));
570
571			params.insert(key, value_buffer);
572		}
573
574		Ok(())
575	}
576
577	/// Returns the parameter with the given name as a byte vector.
578	///
579	/// Parameters are passed to the FastCGI application as name value pairs.
580	/// Parameters can contain environment variables or other parameters that
581	/// the web-server wants to pass to the application.
582	///
583	/// If the parameter does not exist `None` is returned.
584	///
585	/// ## Example
586	///
587	/// ```rust
588	/// # use tokio::io::{empty, sink};
589	/// # use tokio_fastcgi::{Requests, RequestResult};
590	/// # #[tokio::main]
591	/// # async fn main() {
592	/// # let mut requests = Requests::new(empty(), sink(), 1, 1);
593	/// # if let Some(request) = requests.next().await.expect("Request could not be constructed.") {
594	/// request.process(|request| async move {
595	///   if let Some(binary_data) = request.get_param("BINARY_DATA") {
596	///     assert_eq!(binary_data, &[10, 20, 30]);
597	///   }
598	///
599	///   RequestResult::Complete(0)
600	/// });
601	/// # } }
602	/// ```
603	pub fn get_param(&self, name: &str) -> Option<&Vec<u8>> {
604		if self.params_done {
605			self.params.get(&name.to_ascii_lowercase())
606		} else {
607			None
608		}
609	}
610
611	/// Returns the parameter with the given name as a UTF-8 string.
612	///
613	/// Parameters are passed to the FastCGI application as name value pairs.
614	/// Parameters can contain environment variables or other parameters that
615	/// the web-server wants to pass to the application.
616	///
617	/// If the parameter does not exist or is not valid UTF-8 `None` is returned.
618	///
619	/// ## Example
620	///
621	/// ```rust
622	/// # use tokio::io::{empty, sink};
623	/// # use tokio_fastcgi::{Requests, RequestResult};
624	/// # #[tokio::main]
625	/// # async fn main() {
626	/// # let mut requests = Requests::new(empty(), sink(), 1, 1);
627	/// # if let Some(request) = requests.next().await.expect("Request could not be constructed.") {
628	/// request.process(|request| async move {
629	///   if let Some(uri) = request.get_str_param("REQUEST_URI") {
630	///     assert_eq!(uri, "/index.html");
631	///   }
632	///
633	///   RequestResult::Complete(0)
634	/// });
635	/// # } }
636	/// ```
637	pub fn get_str_param(&self, name: &str) -> Option<&str> {
638		if self.params_done {
639			match self.params.get(&name.to_ascii_lowercase()).map(|v| std::str::from_utf8(v)) {
640				None => None,
641				Some(Ok(value)) => Some(value),
642				Some(Err(_)) => {
643					warn!("FastCGI: Parameter {} is not valid utf8.", name);
644					None
645				}
646			}
647		} else {
648			None
649		}
650	}
651
652	/// Returns an iterator over all parameters.
653	///
654	/// The parameter value is a [u8] slice containing the raw data for the parameter.
655	/// If you need the parameter values as string, take a look at [str_params_iter](Request::str_params_iter).
656	///
657	/// ## Example
658	///
659	/// ```rust
660	/// # use tokio::io::{empty, sink};
661	/// # use tokio_fastcgi::{Requests, RequestResult};
662	/// # #[tokio::main]
663	/// # async fn main() {
664	/// # let mut requests = Requests::new(empty(), sink(), 1, 1);
665	/// # if let Some(request) = requests.next().await.expect("Request could not be constructed.") {
666	/// request.process(|request| async move {
667	///   if let Some(params) = request.params_iter() {
668	///     // Output a list of all parameters
669	///     for param in params {
670	///       println!("{}: {:?}", param.0, param.1);
671	///     }
672	///   }
673	///
674	///   RequestResult::Complete(0)
675	/// });
676	/// # } }
677	/// ```
678	pub fn params_iter(&self) -> Option<Box<ParamsIterator>> {
679		if self.params_done {
680			Some(Box::new(self.params.iter().map(|v| {
681				(v.0.as_str(), &v.1[..])
682			})))
683		} else {
684			None
685		}
686	}
687
688	/// Returns an iterator over all parameters that tries to convert the parameter
689	/// values into strings.
690	///
691	/// The parameter value is an [Option] containing a [String] reference.
692	/// If the parameter could not be converted into a string (because it is not valid UTF8)
693	/// the [Option] will be [None](Option::None).
694	///
695	/// ## Example
696	///
697	/// ```rust
698	/// # use tokio::io::{empty, sink};
699	/// # use tokio_fastcgi::{Requests, RequestResult};
700	/// # #[tokio::main]
701	/// # async fn main() {
702	/// # let mut requests = Requests::new(empty(), sink(), 1, 1);
703	/// # if let Some(request) = requests.next().await.expect("Request could not be constructed.") {
704	/// request.process(|request| async move {
705	///   if let Some(params) = request.str_params_iter() {
706	///     // Output a list of all parameters
707	///     for param in params {
708	///       println!("{}: {}", param.0, param.1.unwrap_or("[Invalid UTF8]"));
709	///     }
710	///   }
711	///
712	///   RequestResult::Complete(0)
713	/// });
714	/// # } }
715	/// ```
716	pub fn str_params_iter(&self) -> Option<Box<StrParamsIterator>> {
717		if self.params_done {
718			Some(Box::new(self.params.iter().map(|v| {
719				(v.0.as_str(), std::str::from_utf8(v.1).ok())
720			})))
721		} else {
722			None
723		}
724	}
725
726	/// Checks if this record is ready for processing by the client application.
727	/// A record is ready if the stdin, the data and the params stream are done (EOF).
728	fn check_ready(&mut self) -> bool {
729		self.get_stdin().is_done() && self.get_data().is_done() && self.params_done
730	}
731
732	/// Updates the state of the Request instance.
733	/// If the Request instance is ready for processing by the client application this method will
734	/// return true.
735	/// Calling update on an already ready request Err(Error::SequenceError) is returned.
736	fn update(&mut self, record: &Record) -> Result<bool, Error> {
737		assert!(record.request_id == self.request_id);
738
739		if self.check_ready() {
740			return Err(Error::SequenceError);
741		}
742
743		if let Category::Std(record_type) = record.record_type {
744			match record_type {
745				StdReqType::BeginRequest => {
746					return Err(Error::SequenceError);
747				},
748
749				StdReqType::Params => {
750					if record.content.is_empty() {
751						self.params_done = true;
752					} else {
753						if self.params_done { warn!("FastCGI: Protocol error. Params received after params stream was marked as done."); }
754
755						Self::add_nv_pairs(&mut self.params, record.get_content(), true)?;
756					}
757				},
758
759				StdReqType::StdIn => {
760					self.get_stdin().append(record.get_content())?;
761				},
762
763				StdReqType::Data => {
764					self.get_data().append(record.get_content())?;
765				}
766			};
767
768			Ok(self.check_ready())
769		} else {
770			Err(Error::SequenceError)
771		}
772	}
773
774	/// Returns the request id of this request.
775	///
776	/// This id is unique within the current connection. It is managed by the
777	/// web-server.
778	pub fn get_request_id(&self) -> RequestId {
779		self.request_id
780	}
781
782	/// Allows the process closure to write to StdOut.
783	///
784	/// Returns an `OutStream` instance that will send `StdOut` records back to
785	/// the web-server.
786	///
787	///
788	/// ## Example
789	///
790	/// ```rust
791	/// # use tokio::io::{empty, sink};
792	/// # use tokio_fastcgi::{Requests, RequestResult};
793	/// # #[tokio::main]
794	/// # async fn main() {
795	/// # let mut requests = Requests::new(empty(), sink(), 1, 1);
796	/// # if let Some(request) = requests.next().await.expect("Request could not be constructed.") {
797	/// request.process(|request| async move {
798	///   let mut stdout = request.get_stdout();
799	///
800	///   assert!(stdout.write(b"Hello World").await.is_ok());
801	///
802	///   RequestResult::Complete(0)
803	/// });
804	/// # } }
805	/// ```
806	pub fn get_stdout(&self) -> OutStream<W> {
807		OutStream::new(Category::Std(StdRespType::StdOut), self.orw.clone())
808	}
809
810	/// Allows the process closure to write to StdErr.
811	///
812	/// Returns an `OutStream` instance that will send `StdErr` records back to
813	/// the web-server. What is done with the data that is sent to StdErr depends
814	/// on the web-server.
815	///
816	/// ## Example
817	///
818	/// ```rust
819	/// # use tokio::io::{empty, sink, AsyncWriteExt};
820	/// # use tokio_fastcgi::{Requests, RequestResult};
821	/// # #[tokio::main]
822	/// # async fn main() {
823	/// # let mut requests = Requests::new(empty(), sink(), 1, 1);
824	/// # if let Some(request) = requests.next().await.expect("Request could not be constructed.") {
825	/// request.process(|request| async move {
826	///   let mut stderr = request.get_stderr();
827	///
828	///   assert!(stderr.write(b"Hello World").await.is_ok());
829	///
830	///   RequestResult::Complete(0)
831	/// });
832	/// # } }
833	/// ```
834	pub fn get_stderr(&self) -> OutStream<W> {
835		OutStream::new(Category::Std(StdRespType::StdErr), self.orw.clone())
836	}
837
838	/// Allows the process closure to read from StdIn.
839	///
840	/// Returns an `InStream` instance that will read the data passed as StdIn
841	/// by the web-server.
842	///
843	/// ## Example
844	///
845	/// ```rust
846	/// # use tokio::io::{empty, sink};
847	/// # use std::io::Read;
848	/// # use tokio_fastcgi::{Requests, RequestResult};
849	/// # #[tokio::main]
850	/// # async fn main() {
851	/// # let mut requests = Requests::new(empty(), sink(), 1, 1);
852	/// # if let Some(request) = requests.next().await.expect("Request could not be constructed.") {
853	/// request.process(|request| async move {
854	///   let mut stdin = request.get_stdin();
855	///
856	///   let mut buffer = Vec::with_capacity(10);
857	///   assert!(stdin.read(&mut buffer).is_ok());
858	///
859	///   assert_eq!(buffer.len(), 10);
860	///
861	///   RequestResult::Complete(0)
862	/// });
863	/// # } }
864	/// ```
865	pub fn get_stdin(&self) -> OwnedInStream {
866		self.stdin.try_lock().expect(ERR_LOCK_FAILED)
867	}
868
869	/// Allows the process closure to read from the Data stream.
870	///
871	/// Returns an `InStream` instance that will read the data passed as a Data
872	/// stream by the web-server.
873	///
874	/// ## Example
875	///
876	/// ```rust
877	/// # use tokio::io::{empty, sink};
878	/// # use std::io::Read;
879	/// # use tokio_fastcgi::{Requests, RequestResult};
880	/// # #[tokio::main]
881	/// # async fn main() {
882	/// # let mut requests = Requests::new(empty(), sink(), 1, 1);
883	/// # if let Some(request) = requests.next().await.expect("Request could not be constructed.") {
884	/// request.process(|request| async move {
885	///   let mut data = request.get_data();
886	///
887	///   let mut buffer = Vec::with_capacity(10);
888	///   assert!(data.read(&mut buffer).is_ok());
889	///
890	///   assert_eq!(buffer.len(), 10);
891	///
892	///   RequestResult::Complete(0)
893	/// });
894	/// # } }
895	/// ```
896	pub fn get_data(&self) -> OwnedInStream {
897		self.data.try_lock().expect(ERR_LOCK_FAILED)
898	}
899
900	/// Processes a FastCGI request.
901	///
902	/// As soon as a request is completely received it is returned by
903	/// [`Requests::next`]. Calling `process` on this request allows the request
904	/// to be processed. The application logic is passed to `process` via a
905	/// callback function.
906	///
907	/// The callback function gets a reference to the [`Request`] instance that
908	/// contains all necessary information (input-/output-streams, parameters,
909	/// etc.) for processing the request.
910	///
911	/// See the examples directory for a complete example for using this function.
912	///
913	/// ## Callback function
914	///
915	/// The callback function can access all information about the request via
916	/// the passed `request` parameter. The return value can be one of the
917	/// following values:
918	///
919	/// - [`RequestResult::Complete`]
920	/// - [`RequestResult::Overloaded`]
921	/// - [`RequestResult::UnknownRole`]
922	///
923	/// ## Example
924	///
925	/// ```rust
926	/// # use tokio::io::{empty, sink};
927	/// # use std::io::Read;
928	/// # use tokio_fastcgi::{Requests, RequestResult};
929	/// # #[tokio::main]
930	/// # async fn main() {
931	/// # let instream = empty();
932	/// # let outstream = sink();
933	/// let mut requests = Requests::new(instream, outstream, 1, 1);
934	///
935	/// while let Some(request) = requests.next().await.expect("Request could not be constructed.") {
936	///   request.process(|request| async move {
937	///
938	///     // Process request
939	///
940	///     RequestResult::Complete(0)
941	///   });
942	/// }
943	/// # }
944	/// ```
945	pub async fn process<F: Future<Output = RequestResult>, C: FnOnce(Arc<Self>) -> F>(self, callback: C) -> Result<(), Error> {
946		let rc_self = Arc::from(self);
947
948		let result = callback(rc_self.clone()).await;
949
950		if let Ok(this) = Arc::try_unwrap(rc_self) {
951			this.get_stdout().close().await?;
952			this.get_stderr().close().await?;
953
954			this.orw.write_finish(result).await?;
955		} else {
956			panic!("StdErr or StdOut leaked out of process.")
957		}
958
959		Ok(())
960	}
961}
962
963/// Processes records form an input and output stream.
964///
965/// FastCGI allow multiple requests to be interleaved within one data-stream.
966/// This struct reads the FastCGI-records from an input stream and assembles
967/// them into a [`Request`].
968///
969/// *Beware*: Requests are built in memory. Having huge requests can eat up all
970/// of your systems memory.
971pub struct Requests <R: AsyncRead + Unpin + Send, W: AsyncWrite + Unpin + Send> {
972	reader: R,
973	writer: Arc<Mutex<W>>,
974	requests: HashMap<RequestId, Request<W>>,
975	close_on_next: bool,
976	max_conns: u8,
977	max_reqs: u8
978}
979
980impl <'w, R: AsyncRead + Unpin + Send, W: AsyncWrite + Unpin + Send> Requests<R, W> {
981	/// Creates a new [`Requests`] instance.
982	///
983	/// As soon as a new connection is accepted the read and write parts of this
984	/// connection should be passed to this function. It will create a new
985	/// [`Requests`] instance that will handle the communication between the
986	/// web-server and the FastCGI application.
987	///
988	/// In addition to the read and write side of the connection two more
989	/// parameters must be passed:
990	///
991	/// - max_conns \
992	///   Maximum number of concurrent connections. This value will be returned
993	///   to the web-server to allow it to adjust its connection handling.
994	/// - max_reqs \
995	///   Maximum number of concurrent requests. Concurrent requests are
996	///   handled by tokio-fastcgi but they consume memory. This value is used
997	///   to tell the web-server how many concurrent requests he can use per
998	///   connection.
999	pub fn new(rd: R, wr: W, max_conns: u8, max_reqs: u8) -> Self {
1000		Self {
1001			requests: HashMap::with_capacity(1),
1002			reader: rd,
1003			writer: Arc::from(Mutex::from(wr)),
1004			close_on_next: false,
1005			max_conns,
1006			max_reqs
1007		}
1008	}
1009
1010	/// Same as [`new`](Requests::new) but takes a tuple containing the read and write
1011	/// side of the socket instead of two distinct variables
1012	///
1013	/// This is more convenient in combination with the `split` function.
1014	///
1015	/// # Example
1016	///
1017	/// ```rust
1018	/// # use tokio::net::TcpListener;
1019	/// # use tokio_fastcgi::{Requests, RequestResult};
1020	/// # #[tokio::main]
1021	/// # async fn main() {
1022	/// # let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
1023	/// # let server = async move {
1024	/// if let Ok(mut socket) = listener.accept().await {
1025	///   tokio::spawn(async move {
1026	///   // Directly use the result of the split() call to construct a Requests instance
1027	///     let mut requests = Requests::from_split_socket(socket.0.split(), 10, 10);
1028	///
1029	///     // Process the requests
1030	///   });
1031	/// }
1032	/// # }; }
1033	/// ```
1034	pub fn from_split_socket(split_socket: (R, W), max_conns: u8, max_reqs: u8) -> Self {
1035		Self::new(split_socket.0, split_socket.1, max_conns, max_reqs)
1036	}
1037
1038	/// Processes and answers system records.
1039	/// If this method returns Error::Canceled an FCGI_END_REQUEST was already sent. Just discard the Request instance
1040	/// assigned for this connection.
1041	async fn process_sys(&self, record: Record) -> Result<Option<RequestId>, Error> {
1042		if let Category::Sys(record_type) = record.record_type {
1043			let output_stream = OutRecordWriter::new(self.writer.clone(), record.request_id);
1044
1045			let result = match record_type {
1046				SysReqType::GetValues => {
1047					let mut params = HashMap::new();
1048
1049					//TODO: Is this function correctly placed in request?
1050					Request::<W>::add_nv_pairs(&mut params, record.get_content(), false)?;
1051
1052					// If we're testing this library we have to make sure that the output is sorted.
1053					// Otherwise the binary compare of the produced FastCGI response is not stable.
1054					// For production we will not do this, because it is an unnecessary performance bottleneck.
1055					#[cfg(debug_assertions)]
1056					let mut params: Vec<(String, _)> = params.into_iter().collect();
1057					#[cfg(debug_assertions)]
1058					params.sort_by(|a, b| { a.0.cmp(&b.0) });
1059
1060					// Construct a vector containing the known parameters.
1061					// All other parameters are simply ignored.
1062					let mut output = Vec::with_capacity(128);
1063					for (name, _) in params {
1064						let result = match &*name {
1065							"FCGI_MAX_CONNS" => Some(self.max_conns),
1066							"FCGI_MAX_REQS" => Some(self.max_reqs),
1067							"FCGI_MPXS_CONNS" => Some(1),
1068							_ => None
1069						};
1070
1071						if let Some(result) = result {
1072							let result_str = result.to_string();
1073							// We can get away with casting here because we know that the names and values can not get longer than what fits into 7 Bits of an u8.
1074							Write::write_all(&mut output, &[name.len() as u8])?;
1075							Write::write_all(&mut output, &[result_str.len() as u8])?;
1076
1077							Write::write_all(&mut output, name.as_bytes())?;
1078							Write::write_all(&mut output, result_str.as_bytes())?;
1079						}
1080					}
1081
1082					output_stream.write_data(Category::Sys(SysRespType::GetValuesResult), &output[..]).await?;
1083
1084					Ok(None)
1085				},
1086				SysReqType::AbortRequest => {
1087					output_stream.write_finish(RequestResult::Complete(0)).await?;
1088
1089					Ok(Some(record.get_request_id()))
1090				}
1091			};
1092
1093			output_stream.flush().await?;
1094
1095			result
1096		} else {
1097			panic!("process_sys called with non sys record.");
1098		}
1099	}
1100
1101	/// Fetches the next request from this connection
1102	///
1103	/// This function asynchronously fetches FastCGI records and assembles them
1104	/// into requests. It does the de-interlacing to allow multiple requests to
1105	/// be processed in parallel. As soon as the information for a request is
1106	/// complete, it returns a [`Request`] instance for further processing.
1107	///
1108	/// This function will do the book keeping and process system requests like
1109	/// `FCGI_GET_VALUES` or `FCGI_ABORT_REQUEST`.
1110	pub async fn next(&mut self) -> Result<Option<Request<W>>, Error> {
1111		if self.close_on_next {
1112			if !self.requests.is_empty() {
1113				warn!("FastCGI: The web-server interleaved requests on this connection but did not use the FCGI_KEEP_CONN flag. {} requests will get lost.", self.requests.len());
1114			}
1115
1116			// Signal to the user that this connection should be closed.
1117			Ok(None)
1118		} else {
1119			loop
1120			{
1121				match Record::new(&mut self.reader).await {
1122					// Success, a new record hast to be added to its request...
1123					Ok(record) => {
1124						if record.is_sys_record() {
1125							if let Some(canceled_request_id) = self.process_sys(record).await? {
1126								// The request got canceled. Remove it from the list
1127								self.requests.remove(&canceled_request_id);
1128							}
1129						} else {
1130							let request_ready = match self.requests.entry(record.get_request_id()) {
1131								Entry::Occupied(mut e) => { e.get_mut().update(&record) },
1132								Entry::Vacant(e) => { e.insert(Request::new(&record, self.writer.clone())?); Ok(false) }
1133							}?;
1134
1135							if request_ready {
1136								let request = self.requests.remove(&record.get_request_id()).unwrap();
1137
1138								// Store if we should close the connection after handling this request.
1139								self.close_on_next = !request.keep_connection;
1140
1141								// Calling unwrap here is ok because we made sure there is an object for this id.
1142								return Ok(Some(request));
1143							}
1144						}
1145					},
1146					// IoError UnexpectedEof: May be ok or an error. Depends on if requests have been processed.
1147					Err(Error::IoError(err)) if err.kind() == std::io::ErrorKind::UnexpectedEof => {
1148						// An I/O-error signals the end of the stream. On record construction this is ok as long
1149						// as there are no other requests in flight.
1150						if self.requests.is_empty() {
1151							return Ok(None)
1152						} else {
1153							return Err(Error::from(err));
1154						}
1155					},
1156					// UnknownRecordType must be transmitted back to the server. This is not a fatal error.
1157					Err(Error::UnknownRecordType(request_id, type_id)) => {
1158						let output_stream = OutRecordWriter::new(self.writer.clone(), request_id);
1159						output_stream.write_unkown_type(type_id).await?;
1160					},
1161					// An error occurred, exit the loop and return it...
1162					Err(err) => {
1163						return Err(err);
1164					}
1165				}
1166			}
1167		}
1168	}
1169}
1170
1171// Generate nicer debug output for Request. This is useful if you look at the request
1172// from within the `process` function.
1173impl <W: AsyncWrite + Unpin> Debug for Request<W> {
1174	fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1175		write!(f, "Request {{ request_id: {}, keep_connection: {:?}, stdin: {:?}, data: {:?}, params: {{", self.request_id, self.keep_connection, self.stdin, self.data)?;
1176
1177		for (param_index, param_key) in self.params.keys().enumerate() {
1178			let delimiter = if param_index > 0 { ", " } else { "" };
1179
1180			if let Some(str_value) = self.get_str_param(param_key) {
1181				write!(f, "{}{}: \"{}\"", delimiter, param_key, str_value)?;
1182			} else {
1183				write!(f, "{}{}: {:?}", delimiter, param_key, self.get_param(param_key))?;
1184			}
1185		}
1186
1187		writeln!(f, "}}")
1188	}
1189}
1190
1191/// Sends output records to the web-server.
1192#[derive(Debug)]
1193struct OutRecordWriter<W: AsyncWrite> {
1194	inner_stream: Arc<Mutex<W>>,
1195	request_id: RequestId,
1196}
1197
1198impl <W: AsyncWrite + Unpin> OutRecordWriter<W> {
1199	fn new(inner_stream: Arc<Mutex<W>>, request_id: RequestId) -> Self {
1200		Self {
1201			inner_stream,
1202			request_id
1203		}
1204	}
1205
1206	async fn write_data(&self, record_type: ResponseType, data: &[u8]) -> std::result::Result<usize, std::io::Error> {
1207		trace!("FastCGI: Out record {{T:{:?}, ID: {}, L:{}}}", record_type, self.request_id, RECORD_HEADER_SIZE + data.len());
1208
1209		// Construct the header
1210		// We use unwrap here because we're writing to a vec. This must never fail.
1211		let mut message_header = Vec::with_capacity(8);
1212		byteorder::WriteBytesExt::write_u8(&mut message_header, 1).unwrap();                                // Version
1213		byteorder::WriteBytesExt::write_u8(&mut message_header, record_type.into()).unwrap();               // Record Type
1214		byteorder::WriteBytesExt::write_u16::<BigEndian>(&mut message_header, self.request_id).unwrap();    // Request ID
1215		byteorder::WriteBytesExt::write_u16::<BigEndian>(&mut message_header, data.len() as u16).unwrap();  // Content length
1216		byteorder::WriteBytesExt::write_u8(&mut message_header, 0).unwrap();                                // No padding.
1217		byteorder::WriteBytesExt::write_u8(&mut message_header, 0).unwrap();                                // Reserved
1218
1219		// Aquire the mutext guard to prevent the header and the payload to pe torn apart.
1220		let mut is = self.inner_stream.try_lock().expect(ERR_LOCK_FAILED);
1221
1222		// Write the messge header
1223		is.write_all_buf(&mut Cursor::new(message_header)).await?;
1224
1225		// Write the data
1226		// Writing empty data blocks breaks tokio-test. Therefore we only call write if the data-buffer is not empty.
1227		if !data.is_empty() {
1228			is.write_all_buf(&mut Cursor::new(data)).await?;
1229			Ok(data.len())
1230		} else {
1231			Ok(0)
1232		}
1233
1234		// Write no padding
1235	}
1236
1237	/// Sends an `EndRequest` response to the web-server and ends the current
1238	/// request.
1239	async fn write_finish(&self, result: RequestResult) -> Result<(), std::io::Error> {
1240		let mut end_message = Vec::with_capacity(8);
1241
1242		// Unwrap is safe here because we're writing to an in memory buffer. This must never fail.
1243		byteorder::WriteBytesExt::write_u32::<BigEndian>(&mut end_message, result.app_status()).unwrap();
1244		byteorder::WriteBytesExt::write_u8(&mut end_message, result.into()).unwrap();
1245		// Write 3 reserved bytes
1246		std::io::Write::write_all(&mut end_message, &[0u8; 3]).unwrap();
1247
1248		self.write_data(Category::Std(StdRespType::EndRequest), &end_message[..]).await?;
1249
1250		Ok(())
1251	}
1252
1253	/// Sends an `UnknownType` response to the web-server.
1254	async fn write_unkown_type(&self, type_id: u8) -> Result<(), std::io::Error> {
1255		let mut ut_message = Vec::with_capacity(8);
1256
1257		// Unwrap is safe here because we're writing to an in memory buffer. This must never fail.
1258		byteorder::WriteBytesExt::write_u8(&mut ut_message, type_id).unwrap();
1259		// Write 7 reserved bytes
1260		std::io::Write::write_all(&mut ut_message, &[0u8; 7]).unwrap();
1261
1262		self.write_data(ResponseType::Sys(SysRespType::UnknownType), &ut_message[..]).await?;
1263
1264		Ok(())
1265	}
1266
1267	async fn flush(&self) -> std::result::Result<(), std::io::Error> {
1268		self.inner_stream.try_lock().expect(ERR_LOCK_FAILED).flush().await
1269	}
1270}
1271
1272/// Implements a data stream from the FastCGI application to the web-server.
1273///
1274/// The maximum chunk size is 64k. The calls made by this
1275/// interface may block if the web-server is not receiving the data fast enough.
1276/// Therefore all calls are implemented as async functions.
1277pub struct OutStream<W: AsyncWrite + Unpin> {
1278	orw: Arc<OutRecordWriter<W>>,
1279	record_type: ResponseType,
1280	closed: bool
1281}
1282
1283impl <W: AsyncWrite + Unpin> OutStream<W> {
1284	fn new(record_type: ResponseType, orw: Arc<OutRecordWriter<W>>) -> Self {
1285		Self {
1286			orw,
1287			record_type,
1288			closed: false
1289		}
1290	}
1291
1292	/// Send data to the web-server.
1293	///
1294	/// If the data is bigger than 64k the transfer is automatically split into
1295	/// chunks of 64k.
1296	/// If the stream is already closed, the function will always return
1297	/// [`StreamAlreadyClosed`](Error::StreamAlreadyClosed).
1298	pub async fn write(&mut self, data: &[u8]) -> std::result::Result<usize, Error> {
1299		if self.closed {
1300			return Err(Error::StreamAlreadyClosed);
1301		}
1302
1303		// Check if the data can be transmitted in one chunk.
1304		// If not, split the data in chunks of u16 - 1 size.
1305		if data.len() < u16::max_value() as usize {
1306			Ok(self.orw.write_data(self.record_type, data).await?)
1307		} else {
1308			// Transmit large streams in junks of 64k
1309			const JUNK_SIZE: usize = (u16::max_value() - 1) as usize;
1310			for offset in (0..data.len()).step_by(JUNK_SIZE) {
1311				self.orw.write_data(self.record_type, &data[offset..(offset + JUNK_SIZE).min(data.len())]).await?;
1312			}
1313
1314			Ok(data.len())
1315		}
1316	}
1317
1318	/// Flushes the data to the web-server immediately.
1319	///
1320	/// This function also calls flush on the underlying stream.
1321	pub async fn flush(&self) -> std::result::Result<(), std::io::Error> {
1322		self.orw.flush().await
1323	}
1324
1325	/// Closes the output stream
1326	///
1327	/// FastCGI closes a stream by sending an empty packet. After calling this
1328	/// method, further calls to [`write`] will fail with
1329	/// [`StreamAlreadyClosed`](Error::StreamAlreadyClosed).
1330	async fn close(&mut self) -> Result<(), Error>{
1331		// Send an empty record to close the stream.
1332		self.write(&[0u8; 0]).await?;
1333
1334		self.flush().await?;
1335
1336		// Now mark this stream as closed. Do not do it any earlier, because
1337		// we need to call write on the stream to close it.
1338		self.closed = true;
1339
1340		Ok(())
1341	}
1342}
1343
1344#[cfg(test)]
1345mod tests {
1346	use super::*;
1347	use tokio_test::io::Builder;
1348
1349	fn is_send<T: Send>(_: T) { }
1350
1351	/// Verify that the future created by process is Send to allow using it
1352	/// with Tokio.
1353	#[test]
1354	fn check_send() {
1355		is_send(async move {
1356			let mut requests = Requests::new(Builder::new().build(), Builder::new().build(), 10, 10);
1357
1358			is_send(&requests);
1359
1360			while let Ok(Some(request)) = requests.next().await {
1361				request.process(|_request| async move {
1362					RequestResult::Complete(0)
1363				}).await.unwrap();
1364			}
1365		});
1366	}
1367}