use tokio::io::AsyncRead;
use tokio::prelude::*;
use std::marker::Unpin;
use bytes::{BytesMut, Bytes, BufMut, Buf};
use http::{Request, Response, StatusCode, header::HeaderMap, header::CONTENT_LENGTH, header::AUTHORIZATION, header::CONTENT_TYPE};
use http_body::Body;
use slab::Slab;
use log::{trace, info, error, debug, warn, log_enabled, Level::Trace};
use std::pin::Pin;
use std::task::{Context, Poll};
use std::io::{Error as IoError, ErrorKind};
use std::error::Error;
use std::task::Waker;
use tokio::sync::{Mutex, Semaphore, OwnedSemaphorePermit};
use std::sync::Arc;
use std::future::Future;
use std::iter::{Extend, IntoIterator};
use std::ops::Drop;
use crate::stream::{FCGIAddr, Stream};
use crate::bufvec::BufList;
use crate::fastcgi;
use crate::httpparse::{parse, ParseResult};
struct FCGIBody
{
con: Arc<Mutex<InnerConnection>>,
rid: u16,
done: bool,
was_returned: bool
}
struct FCGIRequest
{
buf: BufList<Bytes>,
waker: Option<Waker>,
ended: bool,
_permit: OwnedSemaphorePermit
}
struct InnerConnection
{
io: Stream,
running_requests: Slab<FCGIRequest>,
con_buf: Option<Bytes>,
fcgi_parser: fastcgi::RecordReader
}
pub struct Connection
{
inner: Arc<Mutex<InnerConnection>>,
sem: Arc<Semaphore>,
addr: FCGIAddr
}
impl Connection
{
pub async fn connect(addr: &FCGIAddr, max_req_per_con: u16) -> Result<Connection, Box<dyn Error>> {
Ok(Connection {
inner: Arc::new(Mutex::new(InnerConnection{
io: Stream::connect(addr).await?,
running_requests: Slab::with_capacity(max_req_per_con as usize),
con_buf: None,
fcgi_parser: fastcgi::RecordReader::new()
})),
sem: Arc::new(Semaphore::new(max_req_per_con as usize)),
addr: addr.clone()
})
}
pub fn is_ready(&self) -> bool {
self.sem.available_permits() > 0
}
pub async fn close(self) -> Result<(), IoError> {
let mut mut_inner = self.inner.lock().await;
mut_inner.io.shutdown().await?;
mut_inner.notify_everyone();
Ok(())
}
pub async fn forward<B, I>(&self,
req: Request<B>,
dyn_headers: I)
-> Result<Response<impl Body<Data = Bytes,Error = IoError>>, IoError>
where B: Body+Unpin,
I: IntoIterator<Item = (Bytes, Bytes)>
{
let rid: u16;
{
info!("new request pending");
let _permit = self.sem.clone().acquire_owned().await;
let meta = FCGIRequest {
buf: BufList::new(),
waker: None,
ended: false,
_permit
};
info!("wait for lock");
let mut mut_inner = self.inner.lock().await;
if mut_inner.check_alive().await?==false {
info!("reconnect...");
if let Err(e) = mut_inner.io.shutdown().await {
error!("shutdown old con: {}", e);
}
mut_inner.notify_everyone();
mut_inner.io = Stream::connect(&self.addr).await?;
mut_inner.fcgi_parser = fastcgi::RecordReader::new();
info!("reconnected");
}
rid = (mut_inner.running_requests.insert(meta)+1) as u16;
info!("started req #{}", rid);
let mut wbuf = BufList::new();
fastcgi::BeginRequestBody::new(fastcgi::BeginRequestBody::RESPONDER,
fastcgi::BeginRequestBody::KEEP_CONN,
rid).append(&mut wbuf);
let mut nv = fastcgi::NVBodyList::new();
nv.extend(dyn_headers);
let query = match req.uri().query() {
Some(query) => BytesMut::from(query.as_bytes()).freeze(),
None => Bytes::new()
};
nv.add(fastcgi::NameValuePair::new(Bytes::from(&b"QUERY_STRING"[..]),query));
let method = BytesMut::from(req.method().as_str().as_bytes()).freeze();
nv.add(fastcgi::NameValuePair::new(Bytes::from(&b"REQUEST_METHOD"[..]),method));
if let Some(value) = req.headers().get(CONTENT_TYPE) {
nv.add(fastcgi::NameValuePair::new(
BytesMut::from(CONTENT_TYPE.as_str().as_bytes()).freeze(),
BytesMut::from(value.as_bytes()).freeze()
));
}
let len = req.headers().get(CONTENT_LENGTH);
if let Some(value) = len {
nv.add(fastcgi::NameValuePair::new(
BytesMut::from(CONTENT_LENGTH.as_str().as_bytes()).freeze(),
BytesMut::from(value.as_bytes()).freeze()
));
}
let skip = [AUTHORIZATION, CONTENT_LENGTH, CONTENT_TYPE];
for (key, value) in req.headers().iter() {
if skip.iter().find(|x| x == key).is_some() {
continue;
}
let mut k = BytesMut::with_capacity(key.as_str().len()+5);
k.put(&b"HTTP_"[..]);
k.put(key.as_str().as_bytes());
let v = BytesMut::from(value.as_bytes()).freeze();
let p = fastcgi::NameValuePair::new(k.freeze(),v);
nv.add(p);
}
nv.append_records(fastcgi::Record::PARAMS, rid, &mut wbuf);
fastcgi::NVBody::new().to_record(fastcgi::Record::PARAMS, rid).append(&mut wbuf);
mut_inner.io.write_buf(&mut wbuf).await?;
trace!("sent header");
if let Some(value) = len {
let mut len: usize = if let Ok(vstr) = value.to_str() {
vstr.parse().unwrap_or(0)
}else{
0
};
let mut body: B = req.into_body();
while let Some(buf) = body.data().await {
if let Ok(mut b) = buf {
let mut b = b.to_bytes();
len = len.saturating_sub(b.len());
while b.remaining() > 0 {
fastcgi::STDINBody::new(rid, &mut b).append(&mut wbuf);
}
mut_inner.io.write_buf(&mut wbuf).await?;
}
}
if len > 0 {
error!("body to short. abort");
fastcgi::Record::abort(rid).append(&mut wbuf);
}
}
fastcgi::STDINBody::new(rid, &mut Bytes::new()).append(&mut wbuf);
mut_inner.io.write_buf(&mut wbuf).await?;
debug!("sent req body");
}
let mut fcgibody = FCGIBody
{
con: Arc::clone(&self.inner),
rid: (rid-1),
done: false,
was_returned: false
};
let mut rb = Response::builder();
let mut rheaders = rb.headers_mut().unwrap();
let mut status = StatusCode::OK;
let mut buf: Option<Bytes> = None;
while let Some(rbuf) = fcgibody.data().await {
if let Ok(mut b) = rbuf {
if let Some(left) = buf.take() {
let mut c = BytesMut::with_capacity(left.len()+b.len());
c.put(left);
c.put(b);
b = c.freeze();
}
match parse(b.clone(), &mut rheaders){
ParseResult::Ok(bodydata) => {
trace!("read body fragment: {:?}", &bodydata);
if bodydata.has_remaining() {
let mut mut_inner = self.inner.lock().await;
mut_inner.running_requests[fcgibody.rid as usize].buf.push(bodydata);
}
if let Some(stat) = rheaders.get("Status") {
if stat.len() >= 3 {
if let Ok(s) = StatusCode::from_bytes(&stat.as_bytes()[..3][..]) {
status = s;
}
}
}
break;
},
ParseResult::Pending => {
buf = Some(b);
trace!("header pending");
}
ParseResult::Err => {
status = StatusCode::INTERNAL_SERVER_ERROR;
break;
}
}
}else{
error!("{:?}", rbuf);
}
}
fcgibody.was_returned = true;
debug!("resp header parsing done");
match rb.status(status).body(fcgibody) {
Ok(v) => Ok(v),
Err(_) => {
unreachable!();
}
}
}
}
impl Future for InnerConnection {
type Output = Option<Result<(), IoError>>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>)
-> Poll<Option<Result<(), IoError>>>
{
self.poll_resp(cx)
}
}
struct CheckAlive<'a>(&'a mut InnerConnection);
impl<'a> Future for CheckAlive<'a> {
type Output = Result<bool, IoError>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>)
-> Poll<Result<bool, IoError>>
{
Poll::Ready(match Pin::new(&mut *self.0).poll_resp(cx) {
Poll::Ready(None) => Ok(false),
Poll::Ready(Some(Err(e))) => {
error!("allive: {:?}", e);
if e.kind() == ErrorKind::NotConnected {
Ok(false)
}else{
Err(e)
}
},
_ => Ok(true)
})
}
}
impl InnerConnection
{
fn check_alive(&mut self) -> CheckAlive {
CheckAlive(self)
}
fn poll_resp(
mut self: Pin<&mut Self>,
cx: &mut Context
) -> Poll<Option<Result<(), IoError>>>
{
let Self {
ref mut io,
ref mut running_requests,
ref mut con_buf,
ref mut fcgi_parser,
} = *self;
let mut rbuf = BytesMut::with_capacity(4096);
match Pin::new(io).poll_read_buf(cx, &mut rbuf) {
Poll::Ready(Ok(0)) => {info!("connection closed");self.notify_everyone();Poll::Ready(None)},
Poll::Ready(Ok(size)) => {
let mut data = rbuf.freeze().slice(..size);
if log_enabled!(Trace) {
let print = if data.len() > 50 {
format!("({}) {:?}...{:?}", data.len(), data.slice(..21), data.slice(data.len()-21..))
}else{
format!("{:?}", data)
};
trace!("read conn data {}", print);
}
if let Some(left) = con_buf.take() {
let mut c = BytesMut::with_capacity(left.len()+data.len());
c.put(left);
c.put(data);
data = c.freeze();
trace!("data with leftover {:?}", data);
}
InnerConnection::parse_and_distribute(&mut data, running_requests, fcgi_parser);
if data.remaining() > 0{
*con_buf = Some(data);
}
Poll::Ready(Some(Ok(())))
},
Poll::Ready(Err(e)) => {error!("Err {}",e);self.notify_everyone();Poll::Ready(Some(Err(e)))},
Poll::Pending => Poll::Pending,
}
}
}
impl Drop for FCGIRequest {
fn drop(&mut self) {
debug!("Req mplex id free");
}
}
impl Drop for FCGIBody {
fn drop(&mut self) {
if self.done {
return;
}
debug!("Dropping FCGIBody #{}!", self.rid+1);
match self.con.try_lock() {
Ok(mut mut_inner) => {
let rid = self.rid as usize;
if mut_inner.running_requests.contains(rid) {
mut_inner.running_requests.remove(rid);
}
},
Err(e) => error!("{}",e),
}
}
}
impl Body for FCGIBody
{
type Data = Bytes;
type Error = IoError;
fn poll_data(
mut self: Pin<&mut Self>,
cx: &mut Context
) -> Poll<Option<Result<Self::Data, Self::Error>>>
{
let Self {
ref con,
rid,
ref mut done,
was_returned
} = *self;
if *done {
debug!("body #{} is already done", rid+1);
return Poll::Ready(None);
}
trace!("read resp body");
let fut = con.lock();
match Box::pin(fut).as_mut().poll(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(mut mut_inner) => {
let con_stat = Pin::new(&mut *mut_inner).poll_resp(cx);
let slab = match mut_inner.running_requests.get_mut(rid as usize) {
Some(slab) => slab,
None => {
warn!("#{} not in slab", rid+1);
*done = true;
return Poll::Ready(None);
},
};
if slab.buf.remaining() >= 1 {
trace!("body #{} has data and is {} closed", rid+1, slab.ended);
let retdata = Poll::Ready(Some(Ok(slab.buf.oldest().unwrap())));
if was_returned && slab.ended && slab.buf.remaining() < 1 {
trace!("next read on #{} will not have data -> release", rid+1);
mut_inner.running_requests.remove(rid as usize);
*done = true;
}
retdata
}else{
let req_done = slab.ended;
if req_done {
debug!("body #{} is done", rid+1);
if was_returned {
mut_inner.running_requests.remove(rid as usize);
*done = true;
}else{
warn!("#{} closed before handover", rid+1);
}
Poll::Ready(None)
}else{
trace!("body waits");
slab.waker = Some(cx.waker().clone());
Poll::Pending
}
}
}
}
}
fn poll_trailers(
self: Pin<&mut Self>,
_cx: &mut Context
) -> Poll<Result<Option<HeaderMap>, Self::Error>>
{
Poll::Ready(Ok(None))
}
}
impl InnerConnection {
fn notify_everyone(&mut self) {
for (rid, mpxs) in self.running_requests.iter_mut() {
if let Some(waker) = mpxs.waker.take() {
waker.wake()
}
if !mpxs.ended {
error!("body #{} not done", rid+1);
}
mpxs.ended = true;
}
}
fn parse_and_distribute(data: &mut Bytes, running_requests: &mut Slab<FCGIRequest>, fcgi_parser: &mut fastcgi::RecordReader) -> Option<Bytes> {
while let Some(r) = fcgi_parser.read(data) {
let (req_no, ovr) = r.get_request_id().overflowing_sub(1);
if ovr {
error!("got mgmt record");
continue;
}
debug!("record for #{}", req_no+1);
if let Some(mpxs) = running_requests.get_mut(req_no as usize) {
match r.body {
fastcgi::Body::StdOut(s) => {
if log_enabled!(Trace) {
let print = if s.len() > 50 {
format!("({}) {:?}...{:?}", s.len(), s.slice(..21), s.slice(s.len()-21..))
}else{
format!("{:?}", s)
};
trace!("FCGI stdout: {}", print);
}
if s.has_remaining() {
mpxs.buf.push(s);
if let Some(waker) = mpxs.waker.take() {
waker.wake();
}
}
},
fastcgi::Body::StdErr(s) => {error!("FCGI #{} Err: {:?}", req_no+1, s);}
fastcgi::Body::EndRequest(status) => {
match status.protocol_status {
fastcgi::EndRequestBody::REQUEST_COMPLETE => info!("Req #{} ended with {}", req_no+1, status.app_status),
_ => error!("Req #{} ended with fcgi error {}", req_no+1, status.protocol_status)
};
mpxs.ended = true;
if let Some(waker) = mpxs.waker.take() {
waker.wake()
}
},
_ => {warn!("type?");}
}
}else{
debug!("not a pending red ID");
}
};
None
}
}
#[cfg(test)]
mod tests {
use super::*;
use tokio::runtime::Runtime;
use tokio::net::TcpListener;
use std::collections::{VecDeque,HashMap};
use std::net::SocketAddr;
struct TestBod{
l: VecDeque<Bytes>
}
impl Body for TestBod{
type Data = Bytes;
type Error = IoError;
fn poll_data(
mut self: Pin<&mut Self>,
_cx: &mut Context
) -> Poll<Option<Result<Self::Data, Self::Error>>>
{
let Self {
ref mut l
} = *self;
match l.pop_front()
{
None => Poll::Ready(None),
Some(i) => Poll::Ready(Some(Ok(i)))
}
}
fn poll_trailers(
self: Pin<&mut Self>,
_cx: &mut Context
) -> Poll<Result<Option<HeaderMap>, Self::Error>>
{
Poll::Ready(Ok(None))
}
}
#[test]
fn simple_get() {
let mut rt = Runtime::new().unwrap();
async fn mock_app(mut app_listener: TcpListener) {
let (mut app_socket, _) = app_listener.accept().await.unwrap();
let mut buf = BytesMut::with_capacity(4096);
app_socket.read_buf(&mut buf).await.unwrap();
trace!("app read {:?}", buf);
let to_php = b"\x01\x01\0\x01\0\x08\0\0\0\x01\x01\0\0\0\0\0\x01\x04\0\x01\0i\x07\0\x0f\x1cSCRIPT_FILENAME/home/daniel/Public/test.php\x0c\x05QUERY_STRINGlol=1\x0e\x03REQUEST_METHODGET\x0b\tHTTP_accepttext/html\x01\x04\0\x01\0i\x07\x01\x04\0\x01\0\0\0\0\x01\x05\0\x01\0\0\0\0";
assert_eq!(buf.to_bytes(), &to_php[..]);
trace!("app answers on get");
let from_php = b"\x01\x07\0\x01\0W\x01\0PHP Fatal error: Kann nicht durch 0 teilen in /home/daniel/Public/test.php on line 14\n\0\x01\x06\0\x01\x01\xf7\x01\0Status: 404 Not Found\r\nX-Powered-By: PHP/7.3.16\r\nX-Authenticate: NTLM\r\nContent-type: text/html; charset=UTF-8\r\n\r\n<html><body>\npub\n<pre>Array\n(\n)\nArray\n(\n [lol] => 1\n)\nArray\n(\n [lol] => 1\n)\nArray\n(\n [HTTP_accept] => text/html\n [REQUEST_METHOD] => GET\n [QUERY_STRING] => lol=1\n [SCRIPT_NAME] => /test\n [SCRIPT_FILENAME] => /home/daniel/Public/test.php\n [FCGI_ROLE] => RESPONDER\n [PHP_SELF] => /test\n [REQUEST_TIME_FLOAT] => 1587740954.2741\n [REQUEST_TIME] => 1587740954\n)\n\0\x01\x03\0\x01\0\x08\0\0\0\0\0\0\0\0\0\0";
app_socket.write_buf(&mut Bytes::from(&from_php[..])).await.unwrap();
}
async fn con() {
let a: SocketAddr = "127.0.0.1:59000".parse().unwrap();
let app_listener = TcpListener::bind(a).await.unwrap();
let m = tokio::spawn(mock_app(app_listener));
let fcgi_con = Connection::connect(&"127.0.0.1:59000".parse().unwrap(), 1).await.unwrap();
trace!("new connection obj");
let b = TestBod{
l: VecDeque::new()
};
let req = Request::get("/test?lol=1").header("Accept", "text/html").body(b).unwrap();
trace!("new req obj");
let mut params = HashMap::new();
params.insert(
Bytes::from(&b"SCRIPT_FILENAME"[..]),
Bytes::from(&b"/home/daniel/Public/test.php"[..]),
);
let mut res = fcgi_con.forward(req,params).await.expect("forward failed");
trace!("got res obj");
assert_eq!(res.status(), StatusCode::NOT_FOUND);
assert_eq!(res.headers().get("X-Powered-By").expect("powered by header missing"), "PHP/7.3.16");
let read1 = res.data().await;
assert!(read1.is_some());
let read1 = read1.unwrap();
assert!(read1.is_ok());
if let Ok(mut d) = read1 {
let body = b"<html><body>\npub\n<pre>Array\n(\n)\nArray\n(\n [lol] => 1\n)\nArray\n(\n [lol] => 1\n)\nArray\n(\n [HTTP_accept] => text/html\n [REQUEST_METHOD] => GET\n [QUERY_STRING] => lol=1\n [SCRIPT_NAME] => /test\n [SCRIPT_FILENAME] => /home/daniel/Public/test.php\n [FCGI_ROLE] => RESPONDER\n [PHP_SELF] => /test\n [REQUEST_TIME_FLOAT] => 1587740954.2741\n [REQUEST_TIME] => 1587740954\n)\n";
assert_eq!(d.to_bytes(), &body[..] );
}
let read2 = res.data().await;
assert!(read2.is_none());
m.await.unwrap();
}
rt.block_on(con());
}
#[test]
fn app_answer_split_mid_record() {
let mut rt = Runtime::new().unwrap();
async fn mock_app(mut app_listener: TcpListener) {
let (mut app_socket, _) = app_listener.accept().await.unwrap();
let mut buf = BytesMut::with_capacity(4096);
app_socket.read_buf(&mut buf).await.unwrap();
trace!("app read {:?}", buf);
trace!("app answers on get");
let from_flup = b"\x01\x06\0\x01\0@\0\0Status: 200 OK\r\nContent-Type: text/plain\r\nContent-Length: 13\r\n\r\n\x01\x06\0\x01\0\r\x03\0Hello World!\n";
app_socket.write_buf(&mut Bytes::from(&from_flup[..])).await.unwrap();
}
async fn con() {
let a: SocketAddr = "127.0.0.1:59001".parse().unwrap();
let app_listener = TcpListener::bind(a).await.unwrap();
let m = tokio::spawn(mock_app(app_listener));
let fcgi_con = Connection::connect(&"127.0.0.1:59001".parse().unwrap(), 1).await.unwrap();
trace!("new connection obj");
let b = TestBod{
l: VecDeque::new()
};
let req = Request::get("/").body(b).unwrap();
trace!("new req obj");
let params = HashMap::new();
let mut res = fcgi_con.forward(req,params).await.expect("forward failed");
trace!("got res obj");
let read1 = res.data().await;
assert!(read1.is_some());
let read1 = read1.unwrap();
assert!(read1.is_ok());
if let Ok(mut d) = read1 {
let body = b"Hello World!\n";
assert_eq!(d.to_bytes(), &body[..] );
}
m.await.unwrap();
}
rt.block_on(con());
}
#[test]
fn app_http_headers_split() {
let mut rt = Runtime::new().unwrap();
async fn mock_app(mut app_listener: TcpListener) {
let (mut app_socket, _) = app_listener.accept().await.unwrap();
let mut buf = BytesMut::with_capacity(4096);
app_socket.read_buf(&mut buf).await.unwrap();
trace!("app read {:?}", buf);
trace!("app answers on get");
let from_flup = b"\x01\x06\0\x01\0\x1e\0\0Status: 200 OK\r\nContent-Type: ";
app_socket.write_buf(&mut Bytes::from(&from_flup[..])).await.unwrap();
let from_flup = b"\x01\x06\0\x01\0\"\0\0text/plain\r\nContent-Length: 13\r\n\r\n\x01\x03\0\x01\0\x08\0\0\0\0\0\0\0\0\0\0";
app_socket.write_buf(&mut Bytes::from(&from_flup[..])).await.unwrap();
}
async fn con() {
let a: SocketAddr = "127.0.0.1:59002".parse().unwrap();
let app_listener = TcpListener::bind(a).await.unwrap();
let m = tokio::spawn(mock_app(app_listener));
let fcgi_con = Connection::connect(&"127.0.0.1:59002".parse().unwrap(), 1).await.unwrap();
trace!("new connection obj");
let b = TestBod{
l: VecDeque::new()
};
let req = Request::get("/").body(b).unwrap();
trace!("new req obj");
let params = HashMap::new();
let mut res = fcgi_con.forward(req,params).await.expect("forward failed");
trace!("got res obj");
assert_eq!(res.status(), StatusCode::OK);
assert_eq!(res.headers().get("Content-Length").expect("len header missing"), "13");
assert_eq!(res.headers().get("Content-Type").expect("type header missing"), "text/plain");
let read1 = res.data().await;
assert!(read1.is_none());
m.await.unwrap();
}
rt.block_on(con());
}
}