async_fcgi/client/
connection.rs

1/*! A single connection (TCP or Unix) to an FCGI application.
2
3Multiple Requests can be multiplexed on it.
4
5# Example
6```
7# use std::collections::HashMap;
8# use std::error::Error;
9# use tokio::net::TcpListener;
10# use bytes::BytesMut;
11# use tokio::io::{AsyncReadExt, AsyncWriteExt};
12# use std::net::SocketAddr;
13use http::{Request, StatusCode};
14use http_body::{Body};
15use bytes::Bytes;
16use async_fcgi::client::connection::Connection;
17
18# #[tokio::main(flavor = "current_thread")]
19# async fn main() -> Result<(),Box<dyn Error>> {
20#    let sa: SocketAddr = "127.0.0.1:59000".parse()?;
21#    let app_listener = TcpListener::bind(sa).await?;
22#    tokio::spawn(async move {
23#        let (mut app_socket, _) = app_listener.accept().await.unwrap();
24#        let mut buf = BytesMut::with_capacity(4096);
25#        app_socket.read_buf(&mut buf).await.unwrap();
26#        let from_php = b"\x01\x06\0\x01\x00\x38\0\0Status: 404 Not Found\r\nX-Powered-By: PHP/7.3.16\r\n\r\ntest!\x01\x03\0\x01\0\x08\0\0\0\0\0\0\0\0\0\0";
27#        app_socket.write_buf(&mut Bytes::from(&from_php[..])).await.unwrap();
28#    });
29    let mut fcgi_con = Connection::connect(&"127.0.0.1:59000".parse()?, 1).await?;
30    let req = Request::get("/test?lol=1").header("Accept", "text/html").body(String::new())?;
31    let mut params = HashMap::new();
32    params.insert(
33        Bytes::from(&b"SCRIPT_FILENAME"[..]),
34        Bytes::from(&b"/home/daniel/Public/test.php"[..]),
35    );
36    let mut res = fcgi_con.forward(req, params).await?;
37    assert_eq!(res.status(), StatusCode::NOT_FOUND);
38    assert_eq!(res.headers().get("X-Powered-By").unwrap(), "PHP/7.3.16");
39    # Ok(())
40# }
41```
42*/
43use bytes::{Buf, BufMut, Bytes, BytesMut};
44use http::{
45    header::AUTHORIZATION, header::CONTENT_LENGTH, header::CONTENT_TYPE,
46    Request, Response, StatusCode,
47};
48use http_body::{Body, Frame};
49use slab::Slab;
50use std::marker::Unpin;
51
52use log::{debug, error, info, log_enabled, trace, warn, Level::Trace};
53
54use std::error::Error;
55use std::future::Future;
56use std::io::{Error as IoError, ErrorKind};
57use std::iter::IntoIterator;
58use std::ops::Drop;
59use std::pin::Pin;
60use std::sync::Arc;
61use std::task::Waker;
62use std::task::{Context, Poll};
63use tokio::sync::{Mutex, OwnedSemaphorePermit, Semaphore};
64
65use crate::bufvec::BufList;
66use crate::codec::{FCGIType, FCGIWriter};
67use crate::fastcgi;
68use crate::httpparse::{parse, ParseResult};
69use async_stream_connection::{Addr, Stream};
70use tokio::io::{AsyncBufRead, BufReader};
71
/// [`http_body::Body`] implementation for FCGI.
///
/// This is the STDOUT of an FastCGI Application.
/// STDERR is logged using [log::error](https://doc.rust-lang.org/1.1.0/log/macro.error!.html)
///
/// Dropping the body before it is `done` marks the request as aborted
/// in the shared connection state.
struct FCGIBody {
    con: Arc<Mutex<InnerConnection>>, //shared connection to read from
    rid: u16,                         //slab index of this request (FCGI request id - 1)
    done: bool,                       //no more data will be returned
    was_returned: bool,               //header parsing is finished, the body was handed to the caller
}
/// Request stream
///
/// Manages one request from
/// `FCGI_BEGIN_REQUEST` to `FCGI_END_REQUEST`.
/// One instance lives in the connection's slab for every running request.
struct FCGIRequest {
    buf: BufList<Bytes>,           //buffered STDOUT chunks, filled by whichever task reads the socket
    waker: Option<Waker>,          //task to wake when new data or the end of the request arrives
    /// the FCGI server is done with this request
    ended: bool,                   //no further data will be buffered
    /// was aborted (by dropping the FCGIBody)
    aborted: bool,
    _permit: OwnedSemaphorePermit, //occupies a multiplex slot until this struct is dropped
}
/// Shared object to read from a `Connection`
///
/// Manages all requests on it and distributes data to them
struct InnerConnection {
    /// Buffered transport stream wrapped in the FCGI record writer
    io: FCGIWriter<BufReader<Stream>>,
    /// Running requests; the slab index is the FCGI request id minus one
    running_requests: Slab<FCGIRequest>,
    /// Parser for the records read from `io`
    fcgi_parser: fastcgi::RecordReader,
}
/// Single transport connection to a FCGI application
///
/// Can multiplex `max_req_per_con` simultaneous request streams
pub struct Connection {
    /// Connection state shared with all response bodies
    inner: Arc<Mutex<InnerConnection>>,
    /// Limits the number of simultaneously running requests
    sem: Arc<Semaphore>,
    /// Peer address, kept to be able to reconnect
    addr: Addr,
    /// How repeated request headers are forwarded
    header_mul: MultiHeaderStrategy,
    /// How request header values containing `\n` are handled
    header_nl: HeaderMultilineStrategy,
}
/// Specifies how to handle multiple HTTP Headers with the same name
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum MultiHeaderStrategy {
    /// RFC 3875: Combine them by joining them separated by `,`
    Combine,
    /// Only forward the first occurrence
    OnlyFirst,
    /// Only forward the last occurrence
    OnlyLast,
}
/// Specifies how to handle HTTP Headers that contain `\n`
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum HeaderMultilineStrategy {
    /// Forward it to the FCGI server
    Ignore,
    /// RFC 7230: Return [`std::io::ErrorKind::InvalidData`]
    ReturnError,
}
132impl Connection {
    /// Connect to a peer with [`MultiHeaderStrategy::OnlyFirst`] & [`HeaderMultilineStrategy::Ignore`].
    ///
    /// * `addr` - address of the FCGI application
    /// * `max_req_per_con` - number of requests that may run simultaneously on this connection
    ///
    /// # Errors
    /// Returns the error of [`Connection::connect_with_strategy`], to which this call is delegated.
    #[inline]
    pub async fn connect(
        addr: &Addr,
        max_req_per_con: u16,
    ) -> Result<Connection, Box<dyn Error>> {
        Self::connect_with_strategy(
            addr,
            max_req_per_con,
            MultiHeaderStrategy::OnlyFirst,
            HeaderMultilineStrategy::Ignore,
        )
        .await
    }
147    /// Connect to a peer
148    pub async fn connect_with_strategy(
149        addr: &Addr,
150        max_req_per_con: u16,
151        header_mul: MultiHeaderStrategy,
152        header_nl: HeaderMultilineStrategy,
153    ) -> Result<Connection, Box<dyn Error>> {
154        Ok(Connection {
155            inner: Arc::new(Mutex::new(InnerConnection {
156                io: FCGIWriter::new(BufReader::new(Stream::connect(addr).await?)),
157                running_requests: Slab::with_capacity(max_req_per_con as usize),
158                fcgi_parser: fastcgi::RecordReader::new(),
159            })),
160            sem: Arc::new(Semaphore::new(max_req_per_con as usize)),
161            addr: addr.clone(),
162            header_mul,
163            header_nl,
164        })
165    }
166
167    /// true if the next call to forward does not need to
168    /// wait for the end of some previous request
169    pub fn is_ready(&self) -> bool {
170        self.sem.available_permits() > 0
171    }
172
173    pub async fn close(self) -> Result<(), IoError> {
174        let mut mut_inner = self.inner.lock().await;
175        mut_inner.io.shutdown().await?;
176        mut_inner.notify_everyone();
177        Ok(())
178    }
179
180    const QUERY_STRING: &'static [u8] = b"QUERY_STRING";
181    const REQUEST_METHOD: &'static [u8] = b"REQUEST_METHOD";
182    const CONTENT_TYPE: &'static [u8] = b"CONTENT_TYPE";
183    const CONTENT_LENGTH: &'static [u8] = b"CONTENT_LENGTH";
184    const NULL: &'static [u8] = b"";
185    /// Forwards an HTTP request to a FGCI Application
186    /// ```no_run
187    /// # use std::error::Error;
188    /// # use http::Request;
189    /// # use async_fcgi::client::connection::Connection;
190    /// # #[tokio::main(flavor = "current_thread")]
191    /// # async fn main() -> Result<(),Box<dyn Error>> {
192    /// # let mut fcgi_con = Connection::connect(&"127.0.0.1:59000".parse()?, 1).await?;
193    /// let req = Request::get("/test?lol=1").header("Accept", "text/html").body(String::new())?;
194    /// let mut params = [(
195    ///     &b"SCRIPT_FILENAME"[..],
196    ///     &b"/home/daniel/Public/test.php"[..]
197    /// )];
198    /// let mut res = fcgi_con.forward(req, params).await?;
199    /// # Ok(())
200    /// # }
201    /// ```
202    ///
203    /// Fills `QUERY_STRING`, `REQUEST_METHOD`, `CONTENT_TYPE` and `CONTENT_LENGTH`
204    /// from the corresponding values in the Request.
205    /// Headers from the Request will be added with the `HTTP_` prefix. (CGI/1.1 4.1.18)
206    ///
207    /// Additional Params might be expected from the application (at least the url path):
208    /// 
209    /// |Param             |Specification           |Info                       |
210    /// |------------------|------------------------|---------------------------|
211    /// | SCRIPT_NAME      |**must** CGI/1.1 4.1.13 | **required** in any case |
212    /// | SERVER_NAME      |**must** CGI/1.1 4.1.14 | **required** by flup |
213    /// | SERVER_PORT      |**must** CGI/1.1 4.1.15 | **required** by flup |
214    /// | SERVER_PROTOCOL  |**must** CGI/1.1 4.1.16 | **required** by flup |
215    /// | SERVER_SOFTWARE  |**must** CGI/1.1 4.1.17 | |
216    /// | REMOTE_ADDR      |**must** CGI/1.1 4.1.8  | |
217    /// | GATEWAY_INTERFACE|**must** CGI/1.1 4.1.4  | `"CGI/1.1"` |
218    /// | REMOTE_HOST      |should CGI/1.1  4.1.9 | |
219    /// | REMOTE_IDENT     |may CGI/1.1  4.1.10 | |
220    /// | REMOTE_USER      |opt CGI/1.1 | |
221    /// | AUTH_TYPE        |opt CGI/1.1 | |
222    /// | PATH_INFO        |opt CGI/1.1   4.1.5 |extra-path|
223    /// | PATH_TRANSLATED  |opt CGI/1.1   4.1.6|
224    /// | SCRIPT_FILENAME  | | **required** by PHP |
225    /// | REMOTE_PORT      | | common |
226    /// | SERVER_ADDR      | | common |
227    /// | REQUEST_URI      | | common |
228    /// | DOCUMENT_URI     | | common |
229    /// | DOCUMENT_ROOT    | | common |
230    pub async fn forward<B, I, P1, P2>(
231        &self,
232        req: Request<B>,
233        dyn_headers: I,
234    ) -> Result<Response<impl Body<Data = Bytes, Error = IoError>>, IoError>
235    where
236        B: Body + Unpin,
237        I: IntoIterator<Item = (P1, P2)>,
238        P1: Buf,
239        P2: Buf,
240    {
241        info!("new request pending");
242        let _permit = self
243            .sem
244            .clone()
245            .acquire_owned()
246            .await
247            .map_err(|_e| IoError::new(ErrorKind::WouldBlock, ""))?;
248        let meta = FCGIRequest {
249            buf: BufList::new(),
250            waker: None,
251            ended: false,
252            aborted: false,
253            _permit,
254        };
255
256        info!("wait for lock");
257        let mut mut_inner = self.inner.lock().await;
258
259        if mut_inner.check_alive().await? == false {
260            // we need to connect again
261            info!("reconnect...");
262            if let Err(e) = mut_inner.io.shutdown().await {
263                error!("shutdown old con: {}", e);
264            }
265            mut_inner.notify_everyone();
266            mut_inner.io = FCGIWriter::new(BufReader::new(Stream::connect(&self.addr).await?));
267            mut_inner.fcgi_parser = fastcgi::RecordReader::new();
268            info!("reconnected");
269        }
270
271        let rid = (mut_inner.running_requests.insert(meta) + 1) as u16;
272        info!("started req #{}", rid);
273        //entry.insert(meta);
274
275        let br = FCGIType::BeginRequest {
276            request_id: rid,
277            role: fastcgi::FastCGIRole::Responder,
278            flags: fastcgi::BeginRequestBody::KEEP_CONN,
279        };
280        mut_inner.io.encode(br).await?;
281        //Prepare the CGI headers
282        let mut kvw = mut_inner.io.kv_stream(rid, fastcgi::RecordType::Params);
283
284        kvw.extend(dyn_headers).await?;
285
286        match req.uri().query() {
287            Some(query) => kvw.add_kv(Self::QUERY_STRING, query.as_bytes()).await?, //must CGI1.1 4.1.7
288            None => kvw.add_kv(Self::QUERY_STRING, Self::NULL).await?, //must CGI1.1 4.1.7
289        }
290
291        kvw.add_kv(Self::REQUEST_METHOD, req.method().as_str().as_bytes())
292            .await?; //must CGI1.1 4.1.12
293
294        let (parts, body) = req.into_parts();
295        let headers = parts.headers;
296
297        if let Some(value) = headers.get(CONTENT_TYPE) {
298            //if client CGI1.1 4.1.3.
299            kvw.add_kv(Self::CONTENT_TYPE, value.as_bytes()).await?;
300        }
301
302        let len: Option<usize> = if Some(0) == body.size_hint().upper() {
303            //if exact and 0 -> no body
304            None
305        } else {
306            //if exact (content len present) -> lower==upper
307            //if unknown -> at least lower, maybe 0
308            let value = body.size_hint().lower(); //if body CGI1.1 4.1.2.
309            kvw.add_kv(Self::CONTENT_LENGTH, value.to_string().as_bytes())
310                .await?;
311            Some(value as usize)
312        };
313        let skip = [AUTHORIZATION, CONTENT_LENGTH, CONTENT_TYPE];
314        //append all HTTP headers
315        for key in headers.keys() {
316            if skip.iter().find(|x| x == key).is_some() {
317                //CGI1.1 4.1.18.
318                continue;
319            }
320            /*rfc3875
321            The HTTP header field name is converted to upper case, has all
322            occurrences of "-" replaced with "_" and has "HTTP_" prepended to
323            give the meta-variable name.
324            */
325            let mut k = BytesMut::with_capacity(key.as_str().len() + 5);
326            k.put(&b"HTTP_"[..]);
327            for &c in key.as_str().as_bytes() {
328                let upper = match c {
329                    b'-' => b'_',
330                    lower_acii if b'a' <= lower_acii && lower_acii <= b'z' => {
331                        lower_acii - (b'a' - b'A')
332                    } //a ... z
333                    s => s,
334                };
335                k.put_u8(upper);
336            }
337            /*rfc3875
338            If multiple header fields with the same field-name
339            are received then the server MUST rewrite them as a single value
340            having the same semantics.  Similarly, a header field that spans
341            multiple lines MUST be merged onto a single line.
342
343            RFC 7230, Section 3.2.2, Field Order: Set-Cookie is special
344            -> but not part of a request
345
346            RFC 7230, Section 3.2.4, Field Parsing: multiline header -> 400/502
347            */
348            let mut value_buf;
349            let value = match self.header_mul {
350                MultiHeaderStrategy::Combine => {
351                    value_buf = BytesMut::with_capacity(512);
352                    let mut first = false;
353                    for v in headers.get_all(key).iter() {
354                        if !first {
355                            first = true;
356                        } else {
357                            value_buf.put_u8(b',');
358                        }
359                        let v = v.as_bytes();
360                        value_buf.put_slice(v); //copy
361                    }
362                    value_buf.as_ref()
363                }
364                MultiHeaderStrategy::OnlyFirst => match headers.get(key) {
365                    Some(v) => v.as_bytes(),
366                    None => Self::NULL,
367                },
368                MultiHeaderStrategy::OnlyLast => match headers.get_all(key).iter().next_back() {
369                    Some(v) => v.as_bytes(),
370                    None => Self::NULL,
371                },
372            };
373            if let HeaderMultilineStrategy::ReturnError = self.header_nl {
374                if value.as_ref().contains(&b'\n') {
375                    drop(kvw); //stop mid stream
376                    mut_inner.abort_req(rid).await?; //abort request
377                    return Err(IoError::new(
378                        ErrorKind::InvalidData,
379                        "multiline headers are not allowed",
380                    ));
381                }
382            }
383            kvw.add_kv(k, value).await?;
384        }
385        //send all headers to the FCGI App
386        kvw.flush().await?;
387        trace!("sent header");
388        //Note: Responses might arrive from this point on
389
390        if let Some(len) = len {
391            drop(mut_inner); // close mutex before create_response or/and send_body
392                             // send the body to the FCGI App
393                             // and read responses
394            let (_, res) =
395                tokio::try_join!(self.send_body(rid, len, body), self.create_response(rid))?;
396            Ok(res)
397        } else {
398            //send end of STDIN
399            mut_inner
400                .io
401                .flush_data_chunk(Self::NULL, rid, fastcgi::RecordType::StdIn)
402                .await?;
403            drop(mut_inner); // close mutex before create_response
404            self.create_response(rid).await
405        }
406    }
407    /// send the body to the FCGI App as STDIN
408    async fn send_body<B>(
409        &self,
410        request_id: u16,
411        mut len: usize,
412        mut body: B,
413    ) -> Result<(), IoError>
414    where
415        B: Body + Unpin,
416    {
417        //stream as body comes in
418        while let Some(chunk) = body.data().await {
419            if let Ok(data) = chunk {
420                let s = data.remaining();
421                debug!("sent {} body bytes to app", s);
422                if s == 0 {
423                    continue;
424                }
425                len -= s;
426                self.inner
427                    .lock()
428                    .await
429                    .io
430                    .flush_data_chunk(data, request_id, fastcgi::RecordType::StdIn)
431                    .await?;
432            }
433        }
434        //CGI1.1 4.2 -> at least content-length data
435        if len > 0 {
436            self.inner
437                .lock()
438                .await
439                .abort_req(request_id).await?;
440            return Err(std::io::Error::new(
441                std::io::ErrorKind::ConnectionAborted,
442                "body too short",
443            ));
444        }
445        //empty record to end STDIN steam FCGI1.0
446        self.inner
447            .lock()
448            .await
449            .io
450            .flush_data_chunk(Self::NULL, request_id, fastcgi::RecordType::StdIn)
451            .await?;
452
453        debug!("sent req body");
454        Ok(())
455    }
456    /// Poll the STDOUT response of the FCGI Server
457    /// Parse the Headers and return a body that streams the rest
458    async fn create_response(
459        &self,
460        rid: u16,
461    ) -> Result<Response<impl Body<Data = Bytes, Error = IoError>>, IoError> {
462        let mut fcgibody = FCGIBody {
463            con: Arc::clone(&self.inner),
464            rid: (rid - 1),
465            done: false,
466            was_returned: false,
467        };
468        let mut rb = Response::builder();
469        let mut rheaders = rb.headers_mut().unwrap();
470        let mut status = StatusCode::OK;
471        //read the headers
472        let mut buf: Option<Bytes> = None;
473        while let Some(rbuf) = fcgibody.data().await {
474            if let Ok(mut b) = rbuf {
475                if let Some(left) = buf.take() {
476                    //we have old data -> concat
477                    let mut c = BytesMut::with_capacity(left.len() + b.len());
478                    c.put(left);
479                    c.put(b);
480                    b = c.freeze();
481                }
482                match parse(b.clone(), &mut rheaders) {
483                    ParseResult::Ok(bodydata) => {
484                        trace!("read body fragment: {:?}", &bodydata);
485                        if bodydata.has_remaining() {
486                            let mut mut_inner = self.inner.lock().await;
487                            //was_returned prevents: request might already be done and gone
488                            mut_inner.running_requests[fcgibody.rid as usize]
489                                .buf
490                                .push(bodydata);
491                        }
492
493                        if let Some(stat) = rheaders.get("Status") {
494                            //CGI1.1
495                            //info!("Status header: {:?}", stat);
496                            if stat.len() >= 3 {
497                                if let Ok(s) = StatusCode::from_bytes(&stat.as_bytes()[..3][..]) {
498                                    status = s;
499                                }
500                            }
501                        }
502                        //Location header for local URIs (starting with "/") -> must be done in Webserver
503                        break;
504                    }
505                    ParseResult::Pending => {
506                        //read more
507                        buf = Some(b);
508                        trace!("header pending");
509                    }
510                    ParseResult::Err => {
511                        status = StatusCode::INTERNAL_SERVER_ERROR;
512                        break;
513                    }
514                }
515            } else {
516                error!("{:?}", rbuf);
517            }
518        }
519        fcgibody.was_returned = true;
520        debug!("resp header parsing done");
521
522        match rb.status(status).body(fcgibody) {
523            Ok(v) => Ok(v),
524            Err(_) => {
525                //all headers are parsed ok, so they should be fine
526                unreachable!();
527            }
528        }
529    }
530}
531
532/// `Frame` from `http-body-util` but only returns data frames
533pub(crate) struct BodyDataFrame<'a, T: ?Sized>(pub(crate) &'a mut T);
534impl<'a, T: Body + Unpin + ?Sized> Future for BodyDataFrame<'a, T> {
535    type Output = Option<Result<T::Data, T::Error>>;
536
537    fn poll(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
538        match Pin::new(&mut self.0).poll_frame(ctx) {
539            Poll::Ready(Some(Ok(a))) => {
540                if let Ok(d) = a.into_data() {
541                    Poll::Ready(Some(Ok(d)))
542                }else{
543                    ctx.waker().wake_by_ref();
544                    Poll::Pending
545                }
546            },
547            Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e))),
548            Poll::Ready(None) => Poll::Ready(None),
549            Poll::Pending => Poll::Pending
550        }
551    }
552}
/// Extension trait that adds [`BodyExt::data`] to every [`Body`].
pub(crate) trait BodyExt: Body {
    /// Returns a future that resolves to the data of the next data frame, if any.
    ///
    /// The future is a [`BodyDataFrame`] borrowing this body.
    fn data(&mut self) -> BodyDataFrame<'_, Self>
    where
        Self: Unpin,
    {
        BodyDataFrame(self)
    }
}
// blanket implementation: available on all bodies
impl<T: ?Sized> BodyExt for T where T: Body {}
565
566impl Future for InnerConnection {
567    type Output = Option<Result<(), IoError>>;
568
569    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Result<(), IoError>>> {
570        self.poll_resp(cx)
571    }
572}
573struct CheckAlive<'a>(&'a mut InnerConnection);
574
575impl<'a> Future for CheckAlive<'a> {
576    type Output = Result<bool, IoError>;
577
578    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<bool, IoError>> {
579        Poll::Ready(match Pin::new(&mut *self.0).poll_resp(cx) {
580            Poll::Ready(None) => Ok(false),
581            Poll::Ready(Some(Err(e))) => {
582                error!("allive: {:?}", e);
583                if e.kind() == ErrorKind::NotConnected {
584                    Ok(false)
585                } else {
586                    Err(e)
587                }
588            }
589            _ => Ok(true),
590        })
591    }
592}
593
impl InnerConnection {
    ///returns true if the connection is still alive
    ///
    /// The returned future polls the connection once, see [`CheckAlive`].
    fn check_alive(&mut self) -> CheckAlive {
        CheckAlive(self)
    }
    /// Send an `AbortRequest` record for `request_id` to the application.
    ///
    /// Only writes the record; the bookkeeping in `running_requests` is not touched.
    async fn abort_req(&mut self, request_id: u16) -> Result<(), IoError> {
        self.io.encode(FCGIType::AbortRequest { request_id }).await
    }
    /// drive this connection
    /// Read, parse and distribute data from the socket.
    /// return None if the connection was closed
    ///
    /// Returns `Some(Ok(()))` if data was consumed, `Some(Err(_))` on a read
    /// error and `Poll::Pending` if no data is available yet.
    /// All requests are notified on a read error and on close.
    fn poll_resp(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Result<(), IoError>>> {
        // split the borrow so that the buffer borrowed from `io` can be
        // handed out while the other fields are modified
        let Self {
            ref mut io,
            ref mut running_requests,
            ref mut fcgi_parser,
        } = *self;
        /*
        1. Read from Socket
        2. Parse all the Data and put it in the corresponding OutBuffer
        3. Notify those with new Data
        */
        let read = match Pin::new(io).poll_fill_buf(cx) {
            Poll::Ready(Ok(rbuf)) => {
                let data_available = rbuf.len();
                if data_available == 0 {
                    // an empty buffer means end of stream
                    info!("connection closed");
                    0
                } else {
                    let mut data = Bytes::copy_from_slice(rbuf);
                    if log_enabled!(Trace) {
                        // log only head and tail of long reads
                        let print = if data.len() > 50 {
                            format!(
                                "({}) {:?}...{:?}",
                                data.len(),
                                data.slice(..21),
                                data.slice(data.len() - 21..)
                            )
                        } else {
                            format!("{:?}", data)
                        };
                        trace!("read conn data {}", print);
                    }
                    InnerConnection::parse_and_distribute(&mut data, running_requests, fcgi_parser);
                    // number of bytes the parser took out of `data`
                    // NOTE(review): if the parser consumes nothing of a non-empty
                    // read, this is 0 and handled like a closed connection below
                    // - confirm that `RecordReader::read` always consumes input
                    let read = data_available - data.remaining();
                    read
                }
            }
            Poll::Ready(Err(e)) => {
                error!("Err {}", e);
                self.notify_everyone();
                return Poll::Ready(Some(Err(e)));
            }
            Poll::Pending => return Poll::Pending,
        };
        if read == 0 {
            self.notify_everyone();
            Poll::Ready(None)
        } else {
            // mark the parsed bytes as read in the buffered reader
            Pin::new(&mut (*self).io).consume(read);
            Poll::Ready(Some(Ok(())))
        }
    }
}
/// Logs when a request entry, and with it its multiplex slot permit, is released.
impl Drop for FCGIRequest {
    fn drop(&mut self) {
        debug!("Req mplex id free");
    }
}
663impl Drop for FCGIBody {
664    fn drop(&mut self) {
665        if self.done {
666            return;
667        }
668        debug!("Dropping FCGIBody #{}!", self.rid + 1);
669        match self.con.try_lock() {
670            Ok(mut mut_inner) => {
671                let rid = self.rid as usize;
672                if let Some(req) = mut_inner.running_requests.get_mut(rid) {
673                    req.aborted = true;
674                    req.waker = None;
675                    //TODO drop the data already
676                }
677            }
678            Err(e) => error!("{}", e),
679        }
680    }
681}
682
/// Streams the buffered STDOUT data of one request and drives the shared connection.
impl Body for FCGIBody {
    type Data = Bytes;
    type Error = IoError;
    /// Get a chunk of STDOUT data from this FCGI application request stream
    ///
    /// Returns the oldest buffered chunk if there is one, `None` once the
    /// request has ended and the buffer is empty, and `Poll::Pending`
    /// (after storing the waker) otherwise. This implementation never
    /// returns an error item.
    fn poll_frame(
        mut self: Pin<&mut Self>,
        cx: &mut Context,
    ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
        /*
        We need to read the socket because we
        a. are the only request
        b. have to wake another task

        1. Read InnerConnection
        4. Check if we now have data
        */
        let Self {
            ref con,
            rid,
            ref mut done,
            was_returned,
        } = *self;

        if *done {
            debug!("body #{} is already done", rid + 1);
            return Poll::Ready(None);
        }

        trace!("read resp body");
        // NOTE(review): a fresh lock future is created on every poll and dropped
        // again if it is Pending - confirm that this task is still woken once
        // the mutex gets released
        let fut = con.lock();
        match Box::pin(fut).as_mut().poll(cx) {
            Poll::Pending => Poll::Pending,
            Poll::Ready(mut mut_inner) => {
                // mut_inner: InnerConnection<S>

                //poll connection and distribute new data
                // the result is ignored on purpose: only the buffer state below matters
                let _con_stat = Pin::new(&mut *mut_inner).poll_resp(cx);

                //work with slab buffer
                let slab = match mut_inner.running_requests.get_mut(rid as usize) {
                    Some(slab) => slab,
                    None => {
                        warn!("#{} not in slab", rid + 1);
                        *done = true;
                        return Poll::Ready(None);
                    }
                };

                /*
                if let Poll::Ready(Some(Err(e))) = con_stat {
                    error!("body #{} (done: {}) err {}", rid, slab.ended, e);
                    if !slab.ended {//unreachable
                        //request is not done but an error occured
                        return Poll::Ready(Some(Err(e)));
                    }
                }*/

                if slab.buf.remaining() >= 1 {
                    trace!("body #{} has data and is {} closed", rid + 1, slab.ended);
                    let retdata = Poll::Ready(Some(Ok(Frame::data(slab.buf.oldest().unwrap()))));
                    if was_returned && slab.ended && slab.buf.remaining() < 1 {
                        //get rid of this as fast as possible,
                        //it blocks us and clients might stop reading
                        trace!("next read on #{} will not have data -> release", rid + 1);
                        mut_inner.running_requests.remove(rid as usize);
                        *done = true;
                    }
                    retdata
                } else {
                    //data buffer empty
                    let req_done = slab.ended;
                    if req_done {
                        debug!("body #{} is done", rid + 1);
                        if was_returned {
                            mut_inner.running_requests.remove(rid as usize);
                            *done = true;
                        } else {
                            // still parsing headers: keep the entry, it is needed
                            // to put leftover body data back
                            warn!("#{} closed before handover", rid + 1);
                        }
                        Poll::Ready(None)
                    } else {
                        trace!("body waits");
                        //store waker
                        slab.waker = Some(cx.waker().clone());
                        Poll::Pending
                    }
                }
            }
        }
    }
}
774
775impl InnerConnection {
776    /// Something happened. We are done with everything
777    fn notify_everyone(&mut self) {
778        for (rid, mpxs) in self.running_requests.iter_mut() {
779            if let Some(waker) = mpxs.waker.take() {
780                waker.wake()
781            }
782            if mpxs.aborted {
783                continue;
784            }
785            if !mpxs.ended {
786                error!("body #{} not done", rid + 1);
787            }
788            mpxs.ended = true;
789        }
790    }
791    fn parse_and_distribute(
792        data: &mut Bytes,
793        running_requests: &mut Slab<FCGIRequest>,
794        fcgi_parser: &mut fastcgi::RecordReader,
795    ) {
796        //trace!("parse {:?}", &data);
797        while let Some(r) = fcgi_parser.read(data) {
798            let (req_no, ovr) = r.get_request_id().overflowing_sub(1);
799            if ovr {
800                //req id 0
801                error!("got mgmt record");
802                continue;
803            }
804            debug!("record for #{}", req_no + 1);
805            if let Some(mpxs) = running_requests.get_mut(req_no as usize) {
806                match r.body {
807                    fastcgi::Body::EndRequest(status) => {
808                        match status.protocol_status {
809                            fastcgi::ProtoStatus::Complete => {
810                                info!("Req #{} ended with {}", req_no + 1, status.app_status)
811                            }
812                            //CANT_MPX_CONN => ,
813                            //TODO handle OVERLOADED
814                            _ => error!(
815                                "Req #{} ended with fcgi error {}",
816                                req_no + 1,
817                                status.protocol_status
818                            ),
819                        };
820                        mpxs.ended = true;
821                        if let Some(waker) = mpxs.waker.take() {
822                            waker.wake()
823                        }
824                        if mpxs.aborted {
825                            // fcgi server is also done with it
826                            running_requests.remove(req_no as usize);
827                        }
828                    }
829                    _ if mpxs.aborted => {
830                        //TODO send abort
831                        continue;
832                    }
833                    fastcgi::Body::StdOut(s) => {                        
834                        if log_enabled!(Trace) {
835                            let print = if s.len() > 50 {
836                                format!(
837                                    "({}) {:?}...{:?}",
838                                    s.len(),
839                                    s.slice(..21),
840                                    s.slice(s.len() - 21..)
841                                )
842                            } else {
843                                format!("{:?}", s)
844                            };
845                            trace!("FCGI stdout: {}", print);
846                        }
847                        if s.has_remaining() {
848                            mpxs.buf.push(s);
849                            if let Some(waker) = mpxs.waker.take() {
850                                waker.wake();
851                            }
852                        }
853                    }
854                    fastcgi::Body::StdErr(s) => {
855                        error!("FCGI #{} Err: {:?}", req_no + 1, s);
856                    }
857                    _ => {
858                        warn!("type?");
859                    }
860                }
861            } else {
862                debug!("not a pending req ID");
863                //TODO send abort
864            }
865        }
866    }
867}
868
869#[cfg(test)]
870mod tests {
871    use super::*;
872    use crate::client::tests::local_socket_pair;
873    use http_body::SizeHint;
874    use std::collections::{HashMap, VecDeque};
875    use tokio::{
876        io::{AsyncReadExt, AsyncWriteExt},
877        net::TcpListener,
878        runtime::Builder,
879    };
880
881    struct TestBod {
882        l: VecDeque<Bytes>,
883    }
884    impl Body for TestBod {
885        type Data = Bytes;
886        type Error = IoError;
887        fn poll_frame(
888            mut self: Pin<&mut Self>,
889            _cx: &mut Context,
890        ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
891            let Self { ref mut l } = *self;
892            match l.pop_front() {
893                None => Poll::Ready(None),
894                Some(i) => Poll::Ready(Some(Ok(Frame::data(i)))),
895            }
896        }
897        fn size_hint(&self) -> SizeHint {
898            let mut sh = SizeHint::default();
899            let s: usize = self.l.iter().map(|b| b.remaining()).sum();
900            sh.set_exact(s as u64);
901            sh
902        }
903    }
904    fn init_log() {
905        let mut builder = pretty_env_logger::formatted_timed_builder();
906        builder.is_test(true);
907        if let Ok(s) = ::std::env::var("RUST_LOG") {
908            builder.parse_filters(&s);
909        }
910        let _ = builder.try_init();
911    }
912
913    #[test]
914    fn simple_get() {
915        init_log();
916        // Create the runtime
917        let rt = Builder::new_current_thread().enable_all().build().unwrap();
918        async fn mock_app(app_listener: TcpListener) {
919            let (mut app_socket, _) = app_listener.accept().await.unwrap();
920            let mut buf = BytesMut::with_capacity(4096);
921            app_socket.read_buf(&mut buf).await.unwrap();
922            trace!("app read {:?}", buf);
923            let to_php = b"\x01\x01\0\x01\0\x08\0\0\0\x01\x01\0\0\0\0\0\x01\x04\0\x01\0i\x07\0\x0f\x1cSCRIPT_FILENAME/home/daniel/Public/test.php\x0c\x05QUERY_STRINGlol=1\x0e\x03REQUEST_METHODGET\x0b\tHTTP_ACCEPTtext/html\x01\x04\0\x01\0i\x07\x01\x04\0\x01\0\0\0\0\x01\x05\0\x01\0\0\0\0";
924            assert_eq!(buf, Bytes::from(&to_php[..]));
925            trace!("app answers on get");
926            let from_php = b"\x01\x07\0\x01\0W\x01\0PHP Fatal error:  Kann nicht durch 0 teilen in /home/daniel/Public/test.php on line 14\n\0\x01\x06\0\x01\x01\xf7\x01\0Status: 404 Not Found\r\nX-Powered-By: PHP/7.3.16\r\nX-Authenticate: NTLM\r\nContent-type: text/html; charset=UTF-8\r\n\r\n<html><body>\npub\n<pre>Array\n(\n)\nArray\n(\n    [lol] => 1\n)\nArray\n(\n    [lol] => 1\n)\nArray\n(\n    [HTTP_accept] => text/html\n    [REQUEST_METHOD] => GET\n    [QUERY_STRING] => lol=1\n    [SCRIPT_NAME] => /test\n    [SCRIPT_FILENAME] => /home/daniel/Public/test.php\n    [FCGI_ROLE] => RESPONDER\n    [PHP_SELF] => /test\n    [REQUEST_TIME_FLOAT] => 1587740954.2741\n    [REQUEST_TIME] => 1587740954\n)\n\0\x01\x03\0\x01\0\x08\0\0\0\0\0\0\0\0\0\0";
927            app_socket
928                .write_buf(&mut Bytes::from(&from_php[..]))
929                .await
930                .unwrap();
931        }
932
933        async fn con() {
934            let (app_listener, a) = local_socket_pair().await.unwrap();
935            let m = tokio::spawn(mock_app(app_listener));
936
937            let fcgi_con = Connection::connect(&a, 1).await.unwrap();
938            trace!("new connection obj");
939            let b = TestBod { l: VecDeque::new() };
940            let req = Request::get("/test?lol=1")
941                .header("Accept", "text/html")
942                .body(b)
943                .unwrap();
944            trace!("new req obj");
945            let mut params = HashMap::new();
946            params.insert(
947                &b"SCRIPT_FILENAME"[..],
948                &b"/home/daniel/Public/test.php"[..],
949            );
950            let mut res = fcgi_con.forward(req, params).await.expect("forward failed");
951            trace!("got res obj");
952            assert_eq!(res.status(), StatusCode::NOT_FOUND);
953            assert_eq!(
954                res.headers()
955                    .get("X-Powered-By")
956                    .expect("powered by header missing"),
957                "PHP/7.3.16"
958            );
959            let read1 = res.data().await;
960            assert!(read1.is_some());
961            let read1 = read1.unwrap();
962            assert!(read1.is_ok());
963            if let Ok(d) = read1 {
964                let body = b"<html><body>\npub\n<pre>Array\n(\n)\nArray\n(\n    [lol] => 1\n)\nArray\n(\n    [lol] => 1\n)\nArray\n(\n    [HTTP_accept] => text/html\n    [REQUEST_METHOD] => GET\n    [QUERY_STRING] => lol=1\n    [SCRIPT_NAME] => /test\n    [SCRIPT_FILENAME] => /home/daniel/Public/test.php\n    [FCGI_ROLE] => RESPONDER\n    [PHP_SELF] => /test\n    [REQUEST_TIME_FLOAT] => 1587740954.2741\n    [REQUEST_TIME] => 1587740954\n)\n";
965                assert_eq!(d, &body[..]);
966            }
967            let read2 = res.data().await;
968            assert!(read2.is_none());
969            m.await.unwrap();
970        }
971        rt.block_on(con());
972    }
973    #[test]
974    fn app_answer_split_mid_record() {
975        //flup did this once
976        init_log();
977        // Create the runtime
978        let rt = Builder::new_current_thread().enable_all().build().unwrap();
979        async fn mock_app(app_listener: TcpListener) {
980            let (mut app_socket, _) = app_listener.accept().await.unwrap();
981            let mut buf = BytesMut::with_capacity(4096);
982            app_socket.read_buf(&mut buf).await.unwrap();
983            trace!("app read {:?}", buf);
984            trace!("app answers on get");
985            let from_flup = b"\x01\x06\0\x01\0@\0\0Status: 200 OK\r\nContent-Type: text/plain\r\nContent-Length: 13\r\n\r\n\x01\x06\0\x01\0\r\x03\0Hello World!\n";
986            app_socket
987                .write_buf(&mut Bytes::from(&from_flup[..]))
988                .await
989                .unwrap();
990        }
991
992        async fn con() {
993            let (app_listener, a) = local_socket_pair().await.unwrap();
994            let m = tokio::spawn(mock_app(app_listener));
995
996            let fcgi_con = Connection::connect(&a, 1).await.unwrap();
997            trace!("new connection obj");
998            let b = TestBod { l: VecDeque::new() };
999            let req = Request::get("/").body(b).unwrap();
1000            trace!("new req obj");
1001            let params: HashMap<Bytes, Bytes> = HashMap::new();
1002            let mut res = fcgi_con.forward(req, params).await.expect("forward failed");
1003            trace!("got res obj");
1004            let read1 = res.data().await;
1005            assert!(read1.is_some());
1006            let read1 = read1.unwrap();
1007            assert!(read1.is_ok());
1008            if let Ok(d) = read1 {
1009                let body = b"Hello World!\n";
1010                assert_eq!(d, &body[..]);
1011            }
1012            m.await.unwrap();
1013        }
1014        rt.block_on(con());
1015    }
1016
1017    #[test]
1018    fn app_http_headers_split() {
1019        init_log();
1020        // Create the runtime
1021        let rt = Builder::new_current_thread().enable_all().build().unwrap();
1022        async fn mock_app(app_listener: TcpListener) {
1023            let (mut app_socket, _) = app_listener.accept().await.unwrap();
1024            let mut buf = BytesMut::with_capacity(4096);
1025            app_socket.read_buf(&mut buf).await.unwrap();
1026            trace!("app read {:?}", buf);
1027            trace!("app answers on get");
1028            let from_flup = b"\x01\x06\0\x01\0\x1e\0\0Status: 200 OK\r\nContent-Type: ";
1029            app_socket
1030                .write_buf(&mut Bytes::from(&from_flup[..]))
1031                .await
1032                .unwrap();
1033            let from_flup = b"\x01\x06\0\x01\0\"\0\0text/plain\r\nContent-Length: 13\r\n\r\n\x01\x03\0\x01\0\x08\0\0\0\0\0\0\0\0\0\0";
1034            app_socket
1035                .write_buf(&mut Bytes::from(&from_flup[..]))
1036                .await
1037                .unwrap();
1038        }
1039
1040        async fn con() {
1041            let (app_listener, a) = local_socket_pair().await.unwrap();
1042            let m = tokio::spawn(mock_app(app_listener));
1043
1044            let fcgi_con = Connection::connect(&a, 1).await.unwrap();
1045            trace!("new connection obj");
1046            let b = TestBod { l: VecDeque::new() };
1047            let req = Request::get("/").body(b).unwrap();
1048            trace!("new req obj");
1049            let params: HashMap<Bytes, Bytes> = HashMap::new();
1050            let mut res = fcgi_con.forward(req, params).await.expect("forward failed");
1051            trace!("got res obj");
1052            assert_eq!(res.status(), StatusCode::OK);
1053            assert_eq!(
1054                res.headers()
1055                    .get("Content-Length")
1056                    .expect("len header missing"),
1057                "13"
1058            );
1059            assert_eq!(
1060                res.headers()
1061                    .get("Content-Type")
1062                    .expect("type header missing"),
1063                "text/plain"
1064            );
1065
1066            let read1 = res.data().await;
1067            assert!(read1.is_none());
1068            m.await.unwrap();
1069        }
1070        rt.block_on(con());
1071    }
1072    #[test]
1073    fn simple_post() {
1074        init_log();
1075        // Create the runtime
1076        let rt = Builder::new_current_thread().enable_all().build().unwrap();
1077        async fn mock_app(app_listener: TcpListener) {
1078            let (mut app_socket, _) = app_listener.accept().await.unwrap();
1079            let mut buf = BytesMut::with_capacity(4096);
1080            app_socket.read_buf(&mut buf).await.unwrap();
1081            trace!("app read {:?}", buf);
1082            let to_php = b"\x01\x01\0\x01\0\x08\0\0\0\x01\x01\0\0\0\0\0\x01\x04\0\x01\0\x81\x07\0\x0f\x1cSCRIPT_FILENAME/home/daniel/Public/test.php\x0c\0QUERY_STRING\x0e\x04REQUEST_METHODPOST\x0c\x13CONTENT_TYPEmultipart/form-data\x0e\x01CONTENT_LENGTH8\x01\x04\0\x01\0\x81\x07\x01\x04\0\x01\0\0\0\0\x01\x05\0\x01\0\x08\0\0test=123\x01\x05\0\x01\0\0\0\0";
1083            assert_eq!(buf, Bytes::from(&to_php[..]));
1084            trace!("app answers on get");
1085            let from_php = b"\x01\x06\0\x01\x00\x23\x05\0Status: 201 Created\r\n\r\n<html><body>#+#+#\x01\x03\0\x01\0\x08\0\0\0\0\0\0\0\0\0\0";
1086            app_socket
1087                .write_buf(&mut Bytes::from(&from_php[..]))
1088                .await
1089                .unwrap();
1090        }
1091
1092        async fn con() {
1093            let (app_listener, a) = local_socket_pair().await.unwrap();
1094            let m = tokio::spawn(mock_app(app_listener));
1095
1096            let fcgi_con = Connection::connect(&a, 1).await.unwrap();
1097            trace!("new connection obj");
1098            let mut l = VecDeque::new();
1099            l.push_back(Bytes::from(&"test=123"[..]));
1100            let b = TestBod { l };
1101
1102            let req = Request::post("/test")
1103                .header("Content-Length", "8")
1104                .header("Content-Type", "multipart/form-data")
1105                .body(b)
1106                .unwrap();
1107            trace!("new req obj");
1108            let mut params = HashMap::new();
1109            params.insert(
1110                &b"SCRIPT_FILENAME"[..],
1111                &b"/home/daniel/Public/test.php"[..],
1112            );
1113            let mut res = fcgi_con.forward(req, params).await.expect("forward failed");
1114            trace!("got res obj");
1115            assert_eq!(res.status(), StatusCode::CREATED);
1116            let read1 = res.data().await;
1117            assert!(read1.is_some());
1118            let read1 = read1.unwrap();
1119            assert!(read1.is_ok());
1120            if let Ok(d) = read1 {
1121                let body = b"<html><body>";
1122                assert_eq!(d, &body[..]);
1123            }
1124            let read2 = res.data().await;
1125            assert!(read2.is_none());
1126            m.await.unwrap();
1127        }
1128        rt.block_on(con());
1129    }
1130    #[test]
1131    fn long_header() {
1132        init_log();
1133        // Create the runtime
1134        let rt = Builder::new_current_thread().enable_all().build().unwrap();
1135        async fn mock_app(app_listener: TcpListener) {
1136            let (mut app_socket, _) = app_listener.accept().await.unwrap();
1137            let mut buf = BytesMut::with_capacity(4096);
1138            app_socket.read_buf(&mut buf).await.unwrap();
1139            trace!("app read {:?}", buf);
1140            let to_php = b"\x01\x01\0\x01\0\x08\0\0\0\x01\x01\0\0\0\0\0\x01\x04\0\x01\0\xb8\0\0\x0c\0QUERY_STRING\x0e\x03REQUEST_METHODGET\x0b\x80\0\0\x87HTTP_ACCEPTtext/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7\x01\x04\0\x01\0\0\0\0\x01\x05\0\x01\0\0\0\0";
1141            assert_eq!(buf, Bytes::from(&to_php[..]));
1142            trace!("app answers on get");
1143            let from_php = b"\x01\x06\0\x01\0\x1b\x05\0Status: 404 Not Found\r\n\r\n\r\n\x01\x06\0\x01\0\x01\x03\0\x01\0\x08\0\0\0\0\0\0\0\0\0\0";
1144            app_socket
1145                .write_buf(&mut Bytes::from(&from_php[..]))
1146                .await
1147                .unwrap();
1148        }
1149
1150        async fn con() {
1151            let (app_listener, a) = local_socket_pair().await.unwrap();
1152            let m = tokio::spawn(mock_app(app_listener));
1153
1154            let fcgi_con = Connection::connect(&a, 1).await.unwrap();
1155            trace!("new connection obj");
1156            let b = TestBod { l: VecDeque::new() };
1157            let req = Request::get("/")
1158                .header("Accept", "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7")
1159                .body(b)
1160                .unwrap();
1161            trace!("new req obj");
1162            let params: HashMap<Bytes, Bytes> = HashMap::new();
1163            let res = fcgi_con.forward(req, params).await.expect("forward failed");
1164            trace!("got res obj");
1165            assert_eq!(res.status(), StatusCode::NOT_FOUND);
1166            m.await.unwrap();
1167        }
1168        rt.block_on(con());
1169    }
1170}