tokio_fastcgi/lib.rs
1#![warn(missing_docs)]
2#![doc = include_str!("../README.md")]
3//! # Basic Example
4//! ```no_run
5#![doc = include_str!("../examples/simple.rs")]
6//! ```
7use log::{trace, warn};
8use std::fmt::Debug;
9use std::marker::Unpin;
10use std::io::{Cursor, Read, Write};
11use std::collections::{HashMap, hash_map::Entry};
12use std::sync::Arc;
13use tokio::io::{AsyncRead, AsyncWrite, AsyncReadExt, AsyncWriteExt};
14use tokio::sync::{Mutex, MutexGuard};
15use std::convert::TryFrom;
16use byteorder::{BigEndian, ReadBytesExt};
17use std::future::Future;
18
/// Size in bytes of the fixed header that precedes the content of every FastCGI record.
const RECORD_HEADER_SIZE: usize = 8;

/// Bit mask of the keep-connection flag inside the flags byte of a `BeginRequest` record body.
const FCGI_KEEP_CONN: u8 = 0x01;

/// Static panic message used when one of the input stream locks of a request can not be acquired.
const ERR_LOCK_FAILED: &str = "A request must not be processed by multiple threads.";

/// The type of the request id. This is always `u16`; the alias makes external code more readable.
type RequestId = u16;

/// Trait object type of the iterator over all parameters as `(name, raw value)` pairs.
type ParamsIterator<'i> = dyn Iterator<Item=(&'i str, &'i [u8])> + 'i;

/// Trait object type of the iterator over all parameters as `(name, value)` pairs where the
/// value is converted into a string slice. The value is `None` if it is not valid UTF-8.
type StrParamsIterator<'i> = dyn Iterator<Item=(&'i str, Option<&'i str>)> + 'i;

/// Type returned by [`get_stdin`](Request::get_stdin) and [`get_data`](Request::get_data).
/// It is a lock guard that dereferences to the [`InStream`], which makes passing around the
/// streams easier.
pub type OwnedInStream<'a> = MutexGuard<'a, InStream>;
40
/// Error type of the `TryFrom<u8>` conversions of `StdReqType`, `SysReqType` and `Category`.
#[derive(Debug)]
enum TypeError {
	/// The number does not denote a known record type. Carries the rejected number.
	UnknownRecordType(u8)
}
46
/// Request record types that carry the data of one request.
///
/// `BeginRequest` is consumed by `Request::new`, the other variants are
/// consumed by `Request::update`. The discriminant of every variant is the
/// number used for this record type on the wire.
#[derive(Clone, Copy, Debug, PartialEq)]
enum StdReqType {
    BeginRequest = 1,
    Params = 4,
    StdIn = 5,
    Data = 8,
}

impl From<StdReqType> for u8 {
    /// Returns the wire number of the record type.
    fn from(record_type: StdReqType) -> u8 {
        record_type as u8
    }
}
61
62impl TryFrom<u8> for StdReqType {
63 type Error = TypeError;
64 fn try_from(value: u8) -> Result<Self, Self::Error> {
65 match value {
66 1 => Ok(Self::BeginRequest),
67 4 => Ok(Self::Params),
68 5 => Ok(Self::StdIn),
69 8 => Ok(Self::Data),
70 _ => Err(TypeError::UnknownRecordType(value))
71 }
72 }
73}
74
/// Response record types that belong to one request.
///
/// The discriminant of every variant is the number used for this record type
/// on the wire.
#[derive(Clone, Copy, Debug, PartialEq)]
enum StdRespType {
    EndRequest = 3,
    StdOut = 6,
    StdErr = 7,
}

impl From<StdRespType> for u8 {
    /// Returns the wire number of the record type.
    fn from(record_type: StdRespType) -> u8 {
        record_type as u8
    }
}
88
/// Request record types that are not part of the data of a request and must
/// be handled by Request::process_sys.
///
/// The discriminant of every variant is the number used for this record type
/// on the wire.
#[derive(Clone, Copy, Debug, PartialEq)]
enum SysReqType {
    AbortRequest = 2,
    GetValues = 9,
}

impl From<SysReqType> for u8 {
    /// Returns the wire number of the record type.
    fn from(record_type: SysReqType) -> u8 {
        record_type as u8
    }
}
101
102impl TryFrom<u8> for SysReqType {
103 type Error = TypeError;
104 fn try_from(value: u8) -> Result<Self, Self::Error> {
105 match value {
106 2 => Ok(Self::AbortRequest),
107 9 => Ok(Self::GetValues),
108 _ => Err(TypeError::UnknownRecordType(value))
109 }
110 }
111}
112
/// Response record types that can be generated by Request::process_sys.
///
/// The discriminant of every variant is the number used for this record type
/// on the wire.
#[derive(Clone, Copy, Debug, PartialEq)]
enum SysRespType {
    GetValuesResult = 10,
    UnknownType = 11,
}

impl From<SysRespType> for u8 {
    /// Returns the wire number of the record type.
    fn from(record_type: SysRespType) -> u8 {
        record_type as u8
    }
}
125
/// Container for std and sys request and response types.
///
/// `S` is the enum of the record types that belong to a request, `T` the enum of the
/// system record types.
#[derive(Clone, Copy, Debug)]
enum Category<S: Copy, T: Copy> {
	/// A record type that belongs to a request.
	Std(S),
	/// A system record type.
	Sys(T)
}
132
133impl <S: Copy + TryFrom<u8, Error = TypeError>, T: Copy + TryFrom<u8, Error = TypeError>> TryFrom<u8> for Category<S, T> {
134 type Error = TypeError;
135 fn try_from(value: u8) -> Result<Self, Self::Error> {
136 if let Ok(result) = S::try_from(value) {
137 Ok(Self::Std(result))
138 } else {
139 T::try_from(value).map(Self::Sys)
140 }
141 }
142}
143
144impl <S: std::convert::Into<u8> + Copy, T: std::convert::Into<u8> + Copy> From<Category<S, T>> for u8 {
145 fn from(cat: Category<S, T>) -> Self {
146 match cat {
147 Category::<S, T>::Std(std) => std.into(),
148 Category::<S, T>::Sys(sys) => sys.into()
149 }
150 }
151}
152
/// Type for all known request record types, split into request (`Std`) and system (`Sys`) types.
type RequestType = Category<StdReqType, SysReqType>;

/// Type for all known response record types, split into request (`Std`) and system (`Sys`) types.
type ResponseType = Category<StdRespType, SysRespType>;
158
/// The role that the web-server requests from the FastCGI application.
///
/// See the individual variants for a description of each role and of the
/// input and output streams that are available to it.
#[derive(PartialEq, Debug)]
pub enum Role {
    /// A FastCGI responder receives all the information associated with an
    /// HTTP request and generates an HTTP response. A responder receives the
    /// following information from the web-server:
    ///
    /// * Environment variables (see [`get_param`](Request::get_param)/[`get_str_param`](Request::get_str_param))
    /// * StdIn (see [`get_stdin`](Request::get_stdin))
    ///
    /// A responder has the following communication channels at its disposal:
    ///
    /// * Result code (see [`RequestResult`])
    /// * StdOut (see [`get_stdout`](Request::get_stdout))
    /// * StdErr (see [`get_stderr`](Request::get_stderr))
    ///
    /// See the [FastCGI specification](https://fastcgi-archives.github.io/FastCGI_Specification.html#S6.2) for more information.
    Responder,

    /// A FastCGI authorizer receives all the information associated with an
    /// HTTP request and generates an authorized/unauthorized decision. In case
    /// of an authorized decision the authorizer can also associate name-value
    /// pairs with the HTTP request. An authorizer receives the following
    /// information from the web-server:
    ///
    /// * Environment variables and request parameters (see
    ///   [`get_param`](Request::get_param)/[`get_str_param`](Request::get_str_param))
    ///
    /// An authorizer has the following communication channels at its disposal:
    ///
    /// * Result code (see [`RequestResult`])
    /// * StdOut (see [`get_stdout`](Request::get_stdout))
    /// * StdErr (see [`get_stderr`](Request::get_stderr))
    ///
    /// See the [FastCGI specification](https://fastcgi-archives.github.io/FastCGI_Specification.html#S6.3) for more information.
    Authorizer,

    /// A FastCGI filter receives all the information associated with an HTTP
    /// request, plus an extra stream of data from a file stored on the
    /// web-server, and generates a “filtered” version of the data stream as an
    /// HTTP response. A filter receives the following information from the
    /// web-server:
    ///
    /// * Environment variables, request parameters and additional information
    ///   (`FCGI_DATA_LAST_MOD` and `FCGI_DATA_LENGTH`) (see
    ///   [`get_param`](Request::get_param)/[`get_str_param`](Request::get_str_param))
    /// * StdIn (see [`get_stdin`](Request::get_stdin))
    /// * File data from the web-server (see [`get_data`](Request::get_data))
    ///
    /// A filter has the following communication channels at its disposal:
    ///
    /// * Result code (see [`RequestResult`])
    /// * StdOut (see [`get_stdout`](Request::get_stdout))
    /// * StdErr (see [`get_stderr`](Request::get_stderr))
    ///
    /// See the [FastCGI specification](https://fastcgi-archives.github.io/FastCGI_Specification.html#S6.4) for more information.
    Filter,
}

impl Role {
    /// Translates the role number sent by the web-server into a [`Role`].
    ///
    /// Returns `None` for every number that is not 1 (responder),
    /// 2 (authorizer) or 3 (filter).
    fn from_number(rl_num: u16) -> Option<Self> {
        let role = match rl_num {
            1 => Role::Responder,
            2 => Role::Authorizer,
            3 => Role::Filter,
            _ => return None,
        };

        Some(role)
    }
}
227
228
/// The result of a FastCGI request.
///
/// This enum is the output of the callback that is passed to the
/// [`process`](Request::process) method of the [`Request`] struct. The meaning
/// of the values is defined by the FastCGI specification.
#[derive(Copy, Clone)]
pub enum RequestResult {
    /// The request completed successfully. The returned status value is
    /// defined by the [role](Role) of the FastCGI application.
    ///
    /// # Result codes
    ///
    /// The application returns the status code that the CGI program would have
    /// returned via the `exit` system call.
    ///
    Complete(u32),
    /// The application ran out of resources (for example database
    /// connections). The request is rejected.
    Overloaded,
    /// The application is not prepared to handle the role requested by the
    /// web-server. For example if a FastCGI responder is called as a filter or
    /// an authorizer.
    UnknownRole,
}

impl RequestResult {
    /// Returns the application status carried by `Complete`. Every other
    /// variant yields 0.
    fn app_status(self) -> u32 {
        if let Self::Complete(status) = self {
            status
        } else {
            0
        }
    }
}

impl From<RequestResult> for u8 {
    /// Allow the RequestResult to be converted into a u8.
    ///
    /// The returned value is the magic number that must be used as the
    /// result field of the FastCGI protocol.
    fn from(rr: RequestResult) -> Self {
        // Names follow the protocol status constants of the FastCGI specification.
        const FCGI_REQUEST_COMPLETE: u8 = 0;
        const FCGI_OVERLOADED: u8 = 2;
        const FCGI_UNKNOWN_ROLE: u8 = 3;

        match rr {
            RequestResult::Complete(_) => FCGI_REQUEST_COMPLETE,
            RequestResult::Overloaded => FCGI_OVERLOADED,
            RequestResult::UnknownRole => FCGI_UNKNOWN_ROLE,
        }
    }
}
275
/// Errors that can occur while reading FastCGI records, assembling requests and
/// processing them (see [`process`](Request::process)).
#[derive(Debug)]
pub enum Error {
	/// The input stream was already closed and can not be reused. This indicates
	/// an error within the call sequence, like calling `process` twice or the
	/// web-server sending more data after closing `StdIn` or `Data`.
	StreamAlreadyDone,

	/// `write` was called on an output stream that was already closed by a call to
	/// `close`.
	StreamAlreadyClosed,

	/// The web-server violated the FastCGI specification. For example by sending a
	/// `StdIn` record before sending a `BeginRequest` record.
	SequenceError,

	/// The record version is not 1. This should never happen since the FastCGI
	/// specification does not define any other record versions.
	InvalidRecordVersion,

	/// The web-server sent an unknown role number. This is most likely a bug in the
	/// FastCGI implementation of the web-server.
	InvalidRoleNumber,

	/// A record with an unknown record type was received. The fields are the id of
	/// the request the record belongs to and the unknown record type number.
	///
	/// This error is never returned to the user of the library. It is internally
	/// handled by the `tokio-fastcgi` crate. The library returns a
	/// `FCGI_UNKNOWN_TYPE` record to the web-server.
	UnknownRecordType(RequestId, u8),

	/// An I/O error occurred. Most likely the connection to the web-server got lost or
	/// was interrupted. Some I/O errors are handled by `tokio-fastcgi`. If the
	/// web-server closes the FastCGI connection after all requests have been
	/// processed no error is returned and the EOF error is just swallowed.
	IoError(std::io::Error)
}
311
312impl std::fmt::Display for Error {
313 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
314 match self {
315 Error::StreamAlreadyDone => write!(f, "Input stream is already done"),
316 Error::StreamAlreadyClosed => write!(f, "Output stream is already closed"),
317 Error::SequenceError => write!(f, "Records out of sequence "),
318 Error::InvalidRecordVersion => write!(f, "Only record version 1 supported"),
319 Error::InvalidRoleNumber => write!(f, "Unkown role pass from server"),
320 Error::UnknownRecordType(request_id, type_id) => write!(f, "Unkown record type {} in request {} received", type_id, request_id),
321 Error::IoError(error) => write!(f, "I/O error: {}", error)
322 }
323 }
324}
325
326impl std::error::Error for Error {
327 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
328 match self {
329 Error::IoError(source) => Some(source),
330 _ => None
331 }
332 }
333}
334
335impl From<std::io::Error> for Error {
336 fn from(io_error: std::io::Error) -> Self {
337 Error::IoError(io_error)
338 }
339}
340
/// Represents a record received from the web-server.
struct Record {
	/// Type of the record, taken from the record header.
	record_type: RequestType,
	/// Id of the request this record belongs to, taken from the record header.
	request_id: RequestId,
	/// Content of the record. The padding that may follow the content is not included.
	content: Vec<u8>
}
347
348impl Record {
349 async fn new<R: AsyncRead + Unpin>(rd: &mut R) -> Result<Self, Error> {
350 let mut header_buffer = [0; RECORD_HEADER_SIZE];
351
352 rd.read_exact(&mut header_buffer).await?;
353
354 let mut header_slice = &header_buffer[..];
355
356 // Check the FastCGI version
357 if byteorder::ReadBytesExt::read_u8(&mut header_slice).unwrap() != 1 {
358 return Err(Error::InvalidRecordVersion);
359 }
360
361 // Parse the remaining header fields
362 // Unwrap the record_type field not yet. An error on the record_type can be handled
363 // and we must read the remaining data to keep the I/O stream in sync.
364 let record_type = RequestType::try_from(byteorder::ReadBytesExt::read_u8(&mut header_slice).unwrap());
365 let request_id = byteorder::ReadBytesExt::read_u16::<BigEndian>(&mut header_slice)?;
366 let content_length = byteorder::ReadBytesExt::read_u16::<BigEndian>(&mut header_slice).unwrap() as usize;
367 let padding_length = byteorder::ReadBytesExt::read_u8(&mut header_slice).unwrap() as u64;
368
369 // Allocate the buffer for the content and read everything asynchronously.
370 // `with_capacity` can not be used, because Tokio does not support this.
371 let mut content = vec![0; content_length];
372 rd.read_exact(&mut content).await?;
373
374 // If there is some padding at the end of the record. Discard it.
375 if padding_length > 0 {
376 tokio::io::copy(&mut rd.take(padding_length), &mut tokio::io::sink()).await?;
377 }
378
379 trace!("FastCGI: In record {{T:{:?}, ID: {}, L:{}}}", record_type, request_id, RECORD_HEADER_SIZE + content.len() + padding_length as usize);
380
381 // Now we unwrap the record_type. If we fail now, the record as been completely read.
382 // Before we can unwrap the TypeError must be translated into a full blown Error::UnknownRecordType value by adding the request_id.
383 let record_type = record_type.map_err(|error| {
384 let TypeError::UnknownRecordType(record_type_nr) = error;
385 Error::UnknownRecordType(request_id, record_type_nr)
386 })?;
387
388 Ok(Self {
389 record_type,
390 request_id,
391 content
392 })
393 }
394
395 /// Checks if this record is a system record. If that's the case Request::update should not be called
396 /// on this one. Just call Request::process_sys to process the system record.
397 /// This method only returns true if the record type can be processed by Request::sys_process and
398 /// the Record is complete.
399 fn is_sys_record(&self) -> bool {
400 matches!(self.record_type, Category::Sys(_))
401 }
402
403 fn get_content(&self) -> &[u8] {
404 &self.content
405 }
406
407 fn get_request_id(&self) -> RequestId {
408 self.request_id
409 }
410}
411
/// Implements a data stream from the web-server to the FastCGI application.
///
/// All data is buffered in memory before being returned to the FastCGI
/// application. Therefore only a synchronous interface is implemented.
///
/// The data of the stream can be accessed via the methods of the [`Read`
/// trait](Read).
#[derive(Debug)]
pub struct InStream {
    /// All bytes received for this stream so far.
    data: Vec<u8>,
    /// Offset into `data` of the next byte to read. `None` as long as the
    /// stream is not done.
    read_pos: Option<usize>,
}

impl Read for InStream {
    /// Copies as many unread bytes as fit into `out` and returns their number.
    ///
    /// # Panics
    ///
    /// Calling read or read_exact on a stream that is not done will panic!
    fn read(&mut self, out: &mut [u8]) -> std::result::Result<usize, std::io::Error> {
        let start = self.read_pos.unwrap();

        let mut unread = &self.data[start..];
        let count = unread.read(out)?;

        self.read_pos = Some(start + count);
        Ok(count)
    }

    /// Fills `out` completely with unread bytes.
    ///
    /// If fewer unread bytes are available than `out` can hold, an error is
    /// returned and the read position stays where it was.
    ///
    /// # Panics
    ///
    /// Calling read or read_exact on a stream that is not done will panic!
    fn read_exact(&mut self, out: &mut [u8]) -> std::result::Result<(), std::io::Error> {
        let start = self.read_pos.unwrap();

        let mut unread = &self.data[start..];
        unread.read_exact(out)?;

        self.read_pos = Some(start + out.len());
        Ok(())
    }
}
446
447impl InStream{
448 /// Creates a new stream waiting for input.
449 ///
450 /// If `already_done` is set to true the stream is created done and no data can be added to it.
451 fn new(already_done: bool) -> Self {
452 InStream {
453 data: Vec::new(),
454 read_pos: if already_done { Some(0) } else { None }
455 }
456 }
457
458 /// Appends the passed data slice to the internal vector.
459 ///
460 /// If the slice is empty, this is interpreted as an EOF marker and marks this
461 /// stream as done.
462 /// Calling this method on a done stream will always return a StreamAlreadyDone
463 /// error.
464 fn append(&mut self, data: &[u8]) -> Result<(), Error> {
465 if ! data.is_empty() {
466 if self.read_pos.is_none() {
467 self.data.extend_from_slice(data);
468 Ok(())
469 } else {
470 Err(Error::StreamAlreadyDone)
471 }
472 } else {
473 self.read_pos = Some(0);
474 Ok(())
475 }
476 }
477
478 /// Checks if this stream is done (EOF).
479 ///
480 /// This is signaled by the web server by passing an empty record for this stream.
481 fn is_done(&self) -> bool {
482 self.read_pos.is_some()
483 }
484}
485
/// Represents a FastCGI request that can be handled by the application.
///
/// An instance of this struct is returned by the [`next`](Requests::next) function
/// of the [Requests] struct. It represents one request that should be handled
/// via FastCGI. Normally [`process`](Request::process) is called on every
/// instance that is returned. The request gets passed to the callback function
/// and can be used to get the input/output streams and environment values.
pub struct Request <W: AsyncWrite + Unpin> {
	/// Contains the role that this request is requesting from the FastCGI
	/// application.
	///
	/// If the FastCGI application can not comply to this role the callback
	/// passed to [`process`](Request::process) should return
	/// [`RequestResult::UnknownRole`].
	pub role: Role,
	/// State of the `FCGI_KEEP_CONN` flag of the `BeginRequest` record.
	keep_connection: bool,
	/// Id of this request, taken from the `BeginRequest` record.
	request_id: RequestId,
	/// Parameters received so far. The names are stored in lower case.
	params: HashMap<String, Vec<u8>>,
	/// Set as soon as an empty `Params` record has been received.
	params_done: bool,
	/// Writer for the output records of this request. It is shared with the output streams.
	orw: Arc<OutRecordWriter<W>>,
	/// Buffer for the data of the `StdIn` stream.
	stdin: Mutex<InStream>,
	/// Buffer for the data of the `Data` stream.
	data: Mutex<InStream>
}
509
510impl <W: AsyncWrite + Unpin> Request<W> {
511 fn new(record: &Record, writer: Arc<Mutex<W>>) -> Result<Self, Error> {
512 let mut content = record.get_content();
513
514 if let Category::Std(StdReqType::BeginRequest) = record.record_type {
515 if let Some(role) = Role::from_number(byteorder::ReadBytesExt::read_u16::<BigEndian>(&mut content).unwrap()) { //We're reading from am memory buffer. So there is something deeply wrong if this fails.
516 let keep_connection = (byteorder::ReadBytesExt::read_u8(&mut content)? & FCGI_KEEP_CONN) == FCGI_KEEP_CONN;
517
518 Ok(Self {
519 params: HashMap::new(),
520 params_done: false,
521 orw: Arc::from(OutRecordWriter::new(writer, record.request_id)),
522 stdin: Mutex::from(InStream::new(role == Role::Authorizer)), // Authorizers do not get an stdin stream
523 data: Mutex::from(InStream::new(role != Role::Filter)), // Only filters get a data stream
524 role,
525 keep_connection,
526 request_id: record.request_id
527 })
528 } else {
529 Err(Error::InvalidRoleNumber)
530 }
531 } else {
532 Err(Error::SequenceError)
533 }
534 }
535
536 fn read_length<T: Read>(src: &mut T) -> Result<u32, std::io::Error> {
537 let length: u32 = u32::from(src.read_u8()?);
538
539 if length & 0x80 == 0 {
540 Ok(length)
541 } else {
542 let length_byte2 = u32::from(src.read_u8()?);
543 let length_byte10 = u32::from(src.read_u16::<BigEndian>()?);
544
545 Ok((length & 0x7f) << 24 | length_byte2 << 16 | length_byte10)
546 }
547 }
548
549 fn add_nv_pairs(params: &mut HashMap<String, Vec<u8>>, src: &[u8], lowercase_keys: bool) -> Result<(), std::io::Error>{
550 let mut src_slice = src;
551
552 while !src_slice.is_empty() {
553 let name_length = Request::<W>::read_length(&mut src_slice)?;
554 let value_length = Request::<W>::read_length(&mut src_slice)?;
555
556 let mut name_buffer = vec![0; name_length as usize];
557 let mut value_buffer = vec![0; value_length as usize];
558
559 std::io::Read::read_exact(&mut src_slice, &mut name_buffer)?;
560 std::io::Read::read_exact(&mut src_slice, &mut value_buffer)?;
561
562 let key = String::from_utf8_lossy(&name_buffer);
563 let key = if lowercase_keys {
564 key.to_ascii_lowercase()
565 } else {
566 key.into_owned()
567 };
568
569 trace!("FastCGI: NV-Pair[\"{}\"]=\"{}\"", key, String::from_utf8_lossy(&value_buffer));
570
571 params.insert(key, value_buffer);
572 }
573
574 Ok(())
575 }
576
577 /// Returns the parameter with the given name as a byte vector.
578 ///
579 /// Parameters are passed to the FastCGI application as name value pairs.
580 /// Parameters can contain environment variables or other parameters that
581 /// the web-server wants to pass to the application.
582 ///
583 /// If the parameter does not exist `None` is returned.
584 ///
585 /// ## Example
586 ///
587 /// ```rust
588 /// # use tokio::io::{empty, sink};
589 /// # use tokio_fastcgi::{Requests, RequestResult};
590 /// # #[tokio::main]
591 /// # async fn main() {
592 /// # let mut requests = Requests::new(empty(), sink(), 1, 1);
593 /// # if let Some(request) = requests.next().await.expect("Request could not be constructed.") {
594 /// request.process(|request| async move {
595 /// if let Some(binary_data) = request.get_param("BINARY_DATA") {
596 /// assert_eq!(binary_data, &[10, 20, 30]);
597 /// }
598 ///
599 /// RequestResult::Complete(0)
600 /// });
601 /// # } }
602 /// ```
603 pub fn get_param(&self, name: &str) -> Option<&Vec<u8>> {
604 if self.params_done {
605 self.params.get(&name.to_ascii_lowercase())
606 } else {
607 None
608 }
609 }
610
611 /// Returns the parameter with the given name as a UTF-8 string.
612 ///
613 /// Parameters are passed to the FastCGI application as name value pairs.
614 /// Parameters can contain environment variables or other parameters that
615 /// the web-server wants to pass to the application.
616 ///
617 /// If the parameter does not exist or is not valid UTF-8 `None` is returned.
618 ///
619 /// ## Example
620 ///
621 /// ```rust
622 /// # use tokio::io::{empty, sink};
623 /// # use tokio_fastcgi::{Requests, RequestResult};
624 /// # #[tokio::main]
625 /// # async fn main() {
626 /// # let mut requests = Requests::new(empty(), sink(), 1, 1);
627 /// # if let Some(request) = requests.next().await.expect("Request could not be constructed.") {
628 /// request.process(|request| async move {
629 /// if let Some(uri) = request.get_str_param("REQUEST_URI") {
630 /// assert_eq!(uri, "/index.html");
631 /// }
632 ///
633 /// RequestResult::Complete(0)
634 /// });
635 /// # } }
636 /// ```
637 pub fn get_str_param(&self, name: &str) -> Option<&str> {
638 if self.params_done {
639 match self.params.get(&name.to_ascii_lowercase()).map(|v| std::str::from_utf8(v)) {
640 None => None,
641 Some(Ok(value)) => Some(value),
642 Some(Err(_)) => {
643 warn!("FastCGI: Parameter {} is not valid utf8.", name);
644 None
645 }
646 }
647 } else {
648 None
649 }
650 }
651
652 /// Returns an iterator over all parameters.
653 ///
654 /// The parameter value is a [u8] slice containing the raw data for the parameter.
655 /// If you need the parameter values as string, take a look at [str_params_iter](Request::str_params_iter).
656 ///
657 /// ## Example
658 ///
659 /// ```rust
660 /// # use tokio::io::{empty, sink};
661 /// # use tokio_fastcgi::{Requests, RequestResult};
662 /// # #[tokio::main]
663 /// # async fn main() {
664 /// # let mut requests = Requests::new(empty(), sink(), 1, 1);
665 /// # if let Some(request) = requests.next().await.expect("Request could not be constructed.") {
666 /// request.process(|request| async move {
667 /// if let Some(params) = request.params_iter() {
668 /// // Output a list of all parameters
669 /// for param in params {
670 /// println!("{}: {:?}", param.0, param.1);
671 /// }
672 /// }
673 ///
674 /// RequestResult::Complete(0)
675 /// });
676 /// # } }
677 /// ```
678 pub fn params_iter(&self) -> Option<Box<ParamsIterator>> {
679 if self.params_done {
680 Some(Box::new(self.params.iter().map(|v| {
681 (v.0.as_str(), &v.1[..])
682 })))
683 } else {
684 None
685 }
686 }
687
688 /// Returns an iterator over all parameters that tries to convert the parameter
689 /// values into strings.
690 ///
691 /// The parameter value is an [Option] containing a [String] reference.
692 /// If the parameter could not be converted into a string (because it is not valid UTF8)
693 /// the [Option] will be [None](Option::None).
694 ///
695 /// ## Example
696 ///
697 /// ```rust
698 /// # use tokio::io::{empty, sink};
699 /// # use tokio_fastcgi::{Requests, RequestResult};
700 /// # #[tokio::main]
701 /// # async fn main() {
702 /// # let mut requests = Requests::new(empty(), sink(), 1, 1);
703 /// # if let Some(request) = requests.next().await.expect("Request could not be constructed.") {
704 /// request.process(|request| async move {
705 /// if let Some(params) = request.str_params_iter() {
706 /// // Output a list of all parameters
707 /// for param in params {
708 /// println!("{}: {}", param.0, param.1.unwrap_or("[Invalid UTF8]"));
709 /// }
710 /// }
711 ///
712 /// RequestResult::Complete(0)
713 /// });
714 /// # } }
715 /// ```
716 pub fn str_params_iter(&self) -> Option<Box<StrParamsIterator>> {
717 if self.params_done {
718 Some(Box::new(self.params.iter().map(|v| {
719 (v.0.as_str(), std::str::from_utf8(v.1).ok())
720 })))
721 } else {
722 None
723 }
724 }
725
726 /// Checks if this record is ready for processing by the client application.
727 /// A record is ready if the stdin, the data and the params stream are done (EOF).
728 fn check_ready(&mut self) -> bool {
729 self.get_stdin().is_done() && self.get_data().is_done() && self.params_done
730 }
731
732 /// Updates the state of the Request instance.
733 /// If the Request instance is ready for processing by the client application this method will
734 /// return true.
735 /// Calling update on an already ready request Err(Error::SequenceError) is returned.
736 fn update(&mut self, record: &Record) -> Result<bool, Error> {
737 assert!(record.request_id == self.request_id);
738
739 if self.check_ready() {
740 return Err(Error::SequenceError);
741 }
742
743 if let Category::Std(record_type) = record.record_type {
744 match record_type {
745 StdReqType::BeginRequest => {
746 return Err(Error::SequenceError);
747 },
748
749 StdReqType::Params => {
750 if record.content.is_empty() {
751 self.params_done = true;
752 } else {
753 if self.params_done { warn!("FastCGI: Protocol error. Params received after params stream was marked as done."); }
754
755 Self::add_nv_pairs(&mut self.params, record.get_content(), true)?;
756 }
757 },
758
759 StdReqType::StdIn => {
760 self.get_stdin().append(record.get_content())?;
761 },
762
763 StdReqType::Data => {
764 self.get_data().append(record.get_content())?;
765 }
766 };
767
768 Ok(self.check_ready())
769 } else {
770 Err(Error::SequenceError)
771 }
772 }
773
	/// Returns the request id of this request.
	///
	/// This is the id that was sent by the web-server with the `BeginRequest`
	/// record. It is unique within the current connection and is managed by the
	/// web-server.
	pub fn get_request_id(&self) -> RequestId {
		self.request_id
	}
781
782 /// Allows the process closure to write to StdOut.
783 ///
784 /// Returns an `OutStream` instance that will send `StdOut` records back to
785 /// the web-server.
786 ///
787 ///
788 /// ## Example
789 ///
790 /// ```rust
791 /// # use tokio::io::{empty, sink};
792 /// # use tokio_fastcgi::{Requests, RequestResult};
793 /// # #[tokio::main]
794 /// # async fn main() {
795 /// # let mut requests = Requests::new(empty(), sink(), 1, 1);
796 /// # if let Some(request) = requests.next().await.expect("Request could not be constructed.") {
797 /// request.process(|request| async move {
798 /// let mut stdout = request.get_stdout();
799 ///
800 /// assert!(stdout.write(b"Hello World").await.is_ok());
801 ///
802 /// RequestResult::Complete(0)
803 /// });
804 /// # } }
805 /// ```
806 pub fn get_stdout(&self) -> OutStream<W> {
807 OutStream::new(Category::Std(StdRespType::StdOut), self.orw.clone())
808 }
809
810 /// Allows the process closure to write to StdErr.
811 ///
812 /// Returns an `OutStream` instance that will send `StdErr` records back to
813 /// the web-server. What is done with the data that is sent to StdErr depends
814 /// on the web-server.
815 ///
816 /// ## Example
817 ///
818 /// ```rust
819 /// # use tokio::io::{empty, sink, AsyncWriteExt};
820 /// # use tokio_fastcgi::{Requests, RequestResult};
821 /// # #[tokio::main]
822 /// # async fn main() {
823 /// # let mut requests = Requests::new(empty(), sink(), 1, 1);
824 /// # if let Some(request) = requests.next().await.expect("Request could not be constructed.") {
825 /// request.process(|request| async move {
826 /// let mut stderr = request.get_stderr();
827 ///
828 /// assert!(stderr.write(b"Hello World").await.is_ok());
829 ///
830 /// RequestResult::Complete(0)
831 /// });
832 /// # } }
833 /// ```
834 pub fn get_stderr(&self) -> OutStream<W> {
835 OutStream::new(Category::Std(StdRespType::StdErr), self.orw.clone())
836 }
837
	/// Allows the process closure to read from StdIn.
	///
	/// Returns a lock guard that dereferences to the [`InStream`] containing
	/// the data passed as StdIn by the web-server.
	///
	/// ## Panics
	///
	/// Panics if the lock of the stream can not be acquired immediately, for
	/// example because a guard returned by a previous call is still alive.
	///
	/// ## Example
	///
	/// ```rust
	/// # use tokio::io::{empty, sink};
	/// # use std::io::Read;
	/// # use tokio_fastcgi::{Requests, RequestResult};
	/// # #[tokio::main]
	/// # async fn main() {
	/// # let mut requests = Requests::new(empty(), sink(), 1, 1);
	/// # if let Some(request) = requests.next().await.expect("Request could not be constructed.") {
	/// request.process(|request| async move {
	///   let mut stdin = request.get_stdin();
	///
	///   // `read` fills the passed slice. Therefore the buffer needs a length, not just a capacity.
	///   let mut buffer = [0u8; 10];
	///   let bytes_read = stdin.read(&mut buffer).expect("Reading from StdIn failed.");
	///
	///   assert!(bytes_read <= buffer.len());
	///
	///   RequestResult::Complete(0)
	/// }).await.expect("Request could not be processed.");
	/// # } }
	/// ```
	pub fn get_stdin(&self) -> OwnedInStream {
		self.stdin.try_lock().expect(ERR_LOCK_FAILED)
	}
868
	/// Allows the process closure to read from the Data stream.
	///
	/// Returns a lock guard that dereferences to the [`InStream`] containing
	/// the data passed as a Data stream by the web-server.
	///
	/// ## Panics
	///
	/// Panics if the lock of the stream can not be acquired immediately, for
	/// example because a guard returned by a previous call is still alive.
	///
	/// ## Example
	///
	/// ```rust
	/// # use tokio::io::{empty, sink};
	/// # use std::io::Read;
	/// # use tokio_fastcgi::{Requests, RequestResult};
	/// # #[tokio::main]
	/// # async fn main() {
	/// # let mut requests = Requests::new(empty(), sink(), 1, 1);
	/// # if let Some(request) = requests.next().await.expect("Request could not be constructed.") {
	/// request.process(|request| async move {
	///   let mut data = request.get_data();
	///
	///   // `read` fills the passed slice. Therefore the buffer needs a length, not just a capacity.
	///   let mut buffer = [0u8; 10];
	///   let bytes_read = data.read(&mut buffer).expect("Reading from the Data stream failed.");
	///
	///   assert!(bytes_read <= buffer.len());
	///
	///   RequestResult::Complete(0)
	/// }).await.expect("Request could not be processed.");
	/// # } }
	/// ```
	pub fn get_data(&self) -> OwnedInStream {
		self.data.try_lock().expect(ERR_LOCK_FAILED)
	}
899
900 /// Processes a FastCGI request.
901 ///
902 /// As soon as a request is completely received it is returned by
903 /// [`Requests::next`]. Calling `process` on this request allows the request
904 /// to be processed. The application logic is passed to `process` via a
905 /// callback function.
906 ///
907 /// The callback function gets a reference to the [`Request`] instance that
908 /// contains all necessary information (input-/output-streams, parameters,
909 /// etc.) for processing the request.
910 ///
911 /// See the examples directory for a complete example for using this function.
912 ///
913 /// ## Callback function
914 ///
915 /// The callback function can access all information about the request via
916 /// the passed `request` parameter. The return value can be one of the
917 /// following values:
918 ///
919 /// - [`RequestResult::Complete`]
920 /// - [`RequestResult::Overloaded`]
921 /// - [`RequestResult::UnknownRole`]
922 ///
923 /// ## Example
924 ///
925 /// ```rust
926 /// # use tokio::io::{empty, sink};
927 /// # use std::io::Read;
928 /// # use tokio_fastcgi::{Requests, RequestResult};
929 /// # #[tokio::main]
930 /// # async fn main() {
931 /// # let instream = empty();
932 /// # let outstream = sink();
933 /// let mut requests = Requests::new(instream, outstream, 1, 1);
934 ///
935 /// while let Some(request) = requests.next().await.expect("Request could not be constructed.") {
936 /// request.process(|request| async move {
937 ///
938 /// // Process request
939 ///
940 /// RequestResult::Complete(0)
941 /// });
942 /// }
943 /// # }
944 /// ```
945 pub async fn process<F: Future<Output = RequestResult>, C: FnOnce(Arc<Self>) -> F>(self, callback: C) -> Result<(), Error> {
946 let rc_self = Arc::from(self);
947
948 let result = callback(rc_self.clone()).await;
949
950 if let Ok(this) = Arc::try_unwrap(rc_self) {
951 this.get_stdout().close().await?;
952 this.get_stderr().close().await?;
953
954 this.orw.write_finish(result).await?;
955 } else {
956 panic!("StdErr or StdOut leaked out of process.")
957 }
958
959 Ok(())
960 }
961}
962
963/// Processes records form an input and output stream.
964///
965/// FastCGI allow multiple requests to be interleaved within one data-stream.
966/// This struct reads the FastCGI-records from an input stream and assembles
967/// them into a [`Request`].
968///
969/// *Beware*: Requests are built in memory. Having huge requests can eat up all
970/// of your systems memory.
971pub struct Requests <R: AsyncRead + Unpin + Send, W: AsyncWrite + Unpin + Send> {
972 reader: R,
973 writer: Arc<Mutex<W>>,
974 requests: HashMap<RequestId, Request<W>>,
975 close_on_next: bool,
976 max_conns: u8,
977 max_reqs: u8
978}
979
980impl <'w, R: AsyncRead + Unpin + Send, W: AsyncWrite + Unpin + Send> Requests<R, W> {
981 /// Creates a new [`Requests`] instance.
982 ///
983 /// As soon as a new connection is accepted the read and write parts of this
984 /// connection should be passed to this function. It will create a new
985 /// [`Requests`] instance that will handle the communication between the
986 /// web-server and the FastCGI application.
987 ///
988 /// In addition to the read and write side of the connection two more
989 /// parameters must be passed:
990 ///
991 /// - max_conns \
992 /// Maximum number of concurrent connections. This value will be returned
993 /// to the web-server to allow it to adjust its connection handling.
994 /// - max_reqs \
995 /// Maximum number of concurrent requests. Concurrent requests are
996 /// handled by tokio-fastcgi but they consume memory. This value is used
997 /// to tell the web-server how many concurrent requests he can use per
998 /// connection.
999 pub fn new(rd: R, wr: W, max_conns: u8, max_reqs: u8) -> Self {
1000 Self {
1001 requests: HashMap::with_capacity(1),
1002 reader: rd,
1003 writer: Arc::from(Mutex::from(wr)),
1004 close_on_next: false,
1005 max_conns,
1006 max_reqs
1007 }
1008 }
1009
1010 /// Same as [`new`](Requests::new) but takes a tuple containing the read and write
1011 /// side of the socket instead of two distinct variables
1012 ///
1013 /// This is more convenient in combination with the `split` function.
1014 ///
1015 /// # Example
1016 ///
1017 /// ```rust
1018 /// # use tokio::net::TcpListener;
1019 /// # use tokio_fastcgi::{Requests, RequestResult};
1020 /// # #[tokio::main]
1021 /// # async fn main() {
1022 /// # let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
1023 /// # let server = async move {
1024 /// if let Ok(mut socket) = listener.accept().await {
1025 /// tokio::spawn(async move {
1026 /// // Directly use the result of the split() call to construct a Requests instance
1027 /// let mut requests = Requests::from_split_socket(socket.0.split(), 10, 10);
1028 ///
1029 /// // Process the requests
1030 /// });
1031 /// }
1032 /// # }; }
1033 /// ```
1034 pub fn from_split_socket(split_socket: (R, W), max_conns: u8, max_reqs: u8) -> Self {
1035 Self::new(split_socket.0, split_socket.1, max_conns, max_reqs)
1036 }
1037
1038 /// Processes and answers system records.
1039 /// If this method returns Error::Canceled an FCGI_END_REQUEST was already sent. Just discard the Request instance
1040 /// assigned for this connection.
1041 async fn process_sys(&self, record: Record) -> Result<Option<RequestId>, Error> {
1042 if let Category::Sys(record_type) = record.record_type {
1043 let output_stream = OutRecordWriter::new(self.writer.clone(), record.request_id);
1044
1045 let result = match record_type {
1046 SysReqType::GetValues => {
1047 let mut params = HashMap::new();
1048
1049 //TODO: Is this function correctly placed in request?
1050 Request::<W>::add_nv_pairs(&mut params, record.get_content(), false)?;
1051
1052 // If we're testing this library we have to make sure that the output is sorted.
1053 // Otherwise the binary compare of the produced FastCGI response is not stable.
1054 // For production we will not do this, because it is an unnecessary performance bottleneck.
1055 #[cfg(debug_assertions)]
1056 let mut params: Vec<(String, _)> = params.into_iter().collect();
1057 #[cfg(debug_assertions)]
1058 params.sort_by(|a, b| { a.0.cmp(&b.0) });
1059
1060 // Construct a vector containing the known parameters.
1061 // All other parameters are simply ignored.
1062 let mut output = Vec::with_capacity(128);
1063 for (name, _) in params {
1064 let result = match &*name {
1065 "FCGI_MAX_CONNS" => Some(self.max_conns),
1066 "FCGI_MAX_REQS" => Some(self.max_reqs),
1067 "FCGI_MPXS_CONNS" => Some(1),
1068 _ => None
1069 };
1070
1071 if let Some(result) = result {
1072 let result_str = result.to_string();
1073 // We can get away with casting here because we know that the names and values can not get longer than what fits into 7 Bits of an u8.
1074 Write::write_all(&mut output, &[name.len() as u8])?;
1075 Write::write_all(&mut output, &[result_str.len() as u8])?;
1076
1077 Write::write_all(&mut output, name.as_bytes())?;
1078 Write::write_all(&mut output, result_str.as_bytes())?;
1079 }
1080 }
1081
1082 output_stream.write_data(Category::Sys(SysRespType::GetValuesResult), &output[..]).await?;
1083
1084 Ok(None)
1085 },
1086 SysReqType::AbortRequest => {
1087 output_stream.write_finish(RequestResult::Complete(0)).await?;
1088
1089 Ok(Some(record.get_request_id()))
1090 }
1091 };
1092
1093 output_stream.flush().await?;
1094
1095 result
1096 } else {
1097 panic!("process_sys called with non sys record.");
1098 }
1099 }
1100
1101 /// Fetches the next request from this connection
1102 ///
1103 /// This function asynchronously fetches FastCGI records and assembles them
1104 /// into requests. It does the de-interlacing to allow multiple requests to
1105 /// be processed in parallel. As soon as the information for a request is
1106 /// complete, it returns a [`Request`] instance for further processing.
1107 ///
1108 /// This function will do the book keeping and process system requests like
1109 /// `FCGI_GET_VALUES` or `FCGI_ABORT_REQUEST`.
1110 pub async fn next(&mut self) -> Result<Option<Request<W>>, Error> {
1111 if self.close_on_next {
1112 if !self.requests.is_empty() {
1113 warn!("FastCGI: The web-server interleaved requests on this connection but did not use the FCGI_KEEP_CONN flag. {} requests will get lost.", self.requests.len());
1114 }
1115
1116 // Signal to the user that this connection should be closed.
1117 Ok(None)
1118 } else {
1119 loop
1120 {
1121 match Record::new(&mut self.reader).await {
1122 // Success, a new record hast to be added to its request...
1123 Ok(record) => {
1124 if record.is_sys_record() {
1125 if let Some(canceled_request_id) = self.process_sys(record).await? {
1126 // The request got canceled. Remove it from the list
1127 self.requests.remove(&canceled_request_id);
1128 }
1129 } else {
1130 let request_ready = match self.requests.entry(record.get_request_id()) {
1131 Entry::Occupied(mut e) => { e.get_mut().update(&record) },
1132 Entry::Vacant(e) => { e.insert(Request::new(&record, self.writer.clone())?); Ok(false) }
1133 }?;
1134
1135 if request_ready {
1136 let request = self.requests.remove(&record.get_request_id()).unwrap();
1137
1138 // Store if we should close the connection after handling this request.
1139 self.close_on_next = !request.keep_connection;
1140
1141 // Calling unwrap here is ok because we made sure there is an object for this id.
1142 return Ok(Some(request));
1143 }
1144 }
1145 },
1146 // IoError UnexpectedEof: May be ok or an error. Depends on if requests have been processed.
1147 Err(Error::IoError(err)) if err.kind() == std::io::ErrorKind::UnexpectedEof => {
1148 // An I/O-error signals the end of the stream. On record construction this is ok as long
1149 // as there are no other requests in flight.
1150 if self.requests.is_empty() {
1151 return Ok(None)
1152 } else {
1153 return Err(Error::from(err));
1154 }
1155 },
1156 // UnknownRecordType must be transmitted back to the server. This is not a fatal error.
1157 Err(Error::UnknownRecordType(request_id, type_id)) => {
1158 let output_stream = OutRecordWriter::new(self.writer.clone(), request_id);
1159 output_stream.write_unkown_type(type_id).await?;
1160 },
1161 // An error occurred, exit the loop and return it...
1162 Err(err) => {
1163 return Err(err);
1164 }
1165 }
1166 }
1167 }
1168 }
1169}
1170
1171// Generate nicer debug output for Request. This is useful if you look at the request
1172// from within the `process` function.
1173impl <W: AsyncWrite + Unpin> Debug for Request<W> {
1174 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1175 write!(f, "Request {{ request_id: {}, keep_connection: {:?}, stdin: {:?}, data: {:?}, params: {{", self.request_id, self.keep_connection, self.stdin, self.data)?;
1176
1177 for (param_index, param_key) in self.params.keys().enumerate() {
1178 let delimiter = if param_index > 0 { ", " } else { "" };
1179
1180 if let Some(str_value) = self.get_str_param(param_key) {
1181 write!(f, "{}{}: \"{}\"", delimiter, param_key, str_value)?;
1182 } else {
1183 write!(f, "{}{}: {:?}", delimiter, param_key, self.get_param(param_key))?;
1184 }
1185 }
1186
1187 writeln!(f, "}}")
1188 }
1189}
1190
1191/// Sends output records to the web-server.
1192#[derive(Debug)]
1193struct OutRecordWriter<W: AsyncWrite> {
1194 inner_stream: Arc<Mutex<W>>,
1195 request_id: RequestId,
1196}
1197
1198impl <W: AsyncWrite + Unpin> OutRecordWriter<W> {
1199 fn new(inner_stream: Arc<Mutex<W>>, request_id: RequestId) -> Self {
1200 Self {
1201 inner_stream,
1202 request_id
1203 }
1204 }
1205
1206 async fn write_data(&self, record_type: ResponseType, data: &[u8]) -> std::result::Result<usize, std::io::Error> {
1207 trace!("FastCGI: Out record {{T:{:?}, ID: {}, L:{}}}", record_type, self.request_id, RECORD_HEADER_SIZE + data.len());
1208
1209 // Construct the header
1210 // We use unwrap here because we're writing to a vec. This must never fail.
1211 let mut message_header = Vec::with_capacity(8);
1212 byteorder::WriteBytesExt::write_u8(&mut message_header, 1).unwrap(); // Version
1213 byteorder::WriteBytesExt::write_u8(&mut message_header, record_type.into()).unwrap(); // Record Type
1214 byteorder::WriteBytesExt::write_u16::<BigEndian>(&mut message_header, self.request_id).unwrap(); // Request ID
1215 byteorder::WriteBytesExt::write_u16::<BigEndian>(&mut message_header, data.len() as u16).unwrap(); // Content length
1216 byteorder::WriteBytesExt::write_u8(&mut message_header, 0).unwrap(); // No padding.
1217 byteorder::WriteBytesExt::write_u8(&mut message_header, 0).unwrap(); // Reserved
1218
1219 // Aquire the mutext guard to prevent the header and the payload to pe torn apart.
1220 let mut is = self.inner_stream.try_lock().expect(ERR_LOCK_FAILED);
1221
1222 // Write the messge header
1223 is.write_all_buf(&mut Cursor::new(message_header)).await?;
1224
1225 // Write the data
1226 // Writing empty data blocks breaks tokio-test. Therefore we only call write if the data-buffer is not empty.
1227 if !data.is_empty() {
1228 is.write_all_buf(&mut Cursor::new(data)).await?;
1229 Ok(data.len())
1230 } else {
1231 Ok(0)
1232 }
1233
1234 // Write no padding
1235 }
1236
1237 /// Sends an `EndRequest` response to the web-server and ends the current
1238 /// request.
1239 async fn write_finish(&self, result: RequestResult) -> Result<(), std::io::Error> {
1240 let mut end_message = Vec::with_capacity(8);
1241
1242 // Unwrap is safe here because we're writing to an in memory buffer. This must never fail.
1243 byteorder::WriteBytesExt::write_u32::<BigEndian>(&mut end_message, result.app_status()).unwrap();
1244 byteorder::WriteBytesExt::write_u8(&mut end_message, result.into()).unwrap();
1245 // Write 3 reserved bytes
1246 std::io::Write::write_all(&mut end_message, &[0u8; 3]).unwrap();
1247
1248 self.write_data(Category::Std(StdRespType::EndRequest), &end_message[..]).await?;
1249
1250 Ok(())
1251 }
1252
1253 /// Sends an `UnknownType` response to the web-server.
1254 async fn write_unkown_type(&self, type_id: u8) -> Result<(), std::io::Error> {
1255 let mut ut_message = Vec::with_capacity(8);
1256
1257 // Unwrap is safe here because we're writing to an in memory buffer. This must never fail.
1258 byteorder::WriteBytesExt::write_u8(&mut ut_message, type_id).unwrap();
1259 // Write 7 reserved bytes
1260 std::io::Write::write_all(&mut ut_message, &[0u8; 7]).unwrap();
1261
1262 self.write_data(ResponseType::Sys(SysRespType::UnknownType), &ut_message[..]).await?;
1263
1264 Ok(())
1265 }
1266
1267 async fn flush(&self) -> std::result::Result<(), std::io::Error> {
1268 self.inner_stream.try_lock().expect(ERR_LOCK_FAILED).flush().await
1269 }
1270}
1271
1272/// Implements a data stream from the FastCGI application to the web-server.
1273///
1274/// The maximum chunk size is 64k. The calls made by this
1275/// interface may block if the web-server is not receiving the data fast enough.
1276/// Therefore all calls are implemented as async functions.
1277pub struct OutStream<W: AsyncWrite + Unpin> {
1278 orw: Arc<OutRecordWriter<W>>,
1279 record_type: ResponseType,
1280 closed: bool
1281}
1282
1283impl <W: AsyncWrite + Unpin> OutStream<W> {
1284 fn new(record_type: ResponseType, orw: Arc<OutRecordWriter<W>>) -> Self {
1285 Self {
1286 orw,
1287 record_type,
1288 closed: false
1289 }
1290 }
1291
1292 /// Send data to the web-server.
1293 ///
1294 /// If the data is bigger than 64k the transfer is automatically split into
1295 /// chunks of 64k.
1296 /// If the stream is already closed, the function will always return
1297 /// [`StreamAlreadyClosed`](Error::StreamAlreadyClosed).
1298 pub async fn write(&mut self, data: &[u8]) -> std::result::Result<usize, Error> {
1299 if self.closed {
1300 return Err(Error::StreamAlreadyClosed);
1301 }
1302
1303 // Check if the data can be transmitted in one chunk.
1304 // If not, split the data in chunks of u16 - 1 size.
1305 if data.len() < u16::max_value() as usize {
1306 Ok(self.orw.write_data(self.record_type, data).await?)
1307 } else {
1308 // Transmit large streams in junks of 64k
1309 const JUNK_SIZE: usize = (u16::max_value() - 1) as usize;
1310 for offset in (0..data.len()).step_by(JUNK_SIZE) {
1311 self.orw.write_data(self.record_type, &data[offset..(offset + JUNK_SIZE).min(data.len())]).await?;
1312 }
1313
1314 Ok(data.len())
1315 }
1316 }
1317
    /// Flushes the data to the web-server immediately.
    ///
    /// This function also calls flush on the underlying stream. It does not
    /// check whether this stream was already closed.
    ///
    /// Returns the I/O error reported by the flush of the record writer, if any.
    pub async fn flush(&self) -> std::result::Result<(), std::io::Error> {
        // All the work is done by the `OutRecordWriter` stored in `orw`.
        self.orw.flush().await
    }
1324
1325 /// Closes the output stream
1326 ///
1327 /// FastCGI closes a stream by sending an empty packet. After calling this
1328 /// method, further calls to [`write`] will fail with
1329 /// [`StreamAlreadyClosed`](Error::StreamAlreadyClosed).
1330 async fn close(&mut self) -> Result<(), Error>{
1331 // Send an empty record to close the stream.
1332 self.write(&[0u8; 0]).await?;
1333
1334 self.flush().await?;
1335
1336 // Now mark this stream as closed. Do not do it any earlier, because
1337 // we need to call write on the stream to close it.
1338 self.closed = true;
1339
1340 Ok(())
1341 }
1342}
1343
#[cfg(test)]
mod tests {
    use super::*;
    use tokio_test::io::Builder;

    /// Compile-time probe: only accepts values whose type is `Send`.
    /// The value itself is dropped unused.
    fn is_send<T: Send>(_: T) { }

    /// Verify that the future created by process is Send to allow using it
    /// with Tokio.
    #[test]
    fn check_send() {
        // The async block is only handed to `is_send` and never polled. The test
        // passes as soon as this compiles; no request is read or processed.
        is_send(async move {
            let mut requests = Requests::new(Builder::new().build(), Builder::new().build(), 10, 10);

            // A shared reference to `Requests` has to be Send as well.
            is_send(&requests);

            // Awaiting `next` and `process` makes their futures part of the
            // async block, so they are covered by the outer `is_send` check.
            while let Ok(Some(request)) = requests.next().await {
                request.process(|_request| async move {
                    RequestResult::Complete(0)
                }).await.unwrap();
            }
        });
    }
}