1use bytes::{Buf, BufMut, Bytes, BytesMut};
44use http::{
45 header::AUTHORIZATION, header::CONTENT_LENGTH, header::CONTENT_TYPE,
46 Request, Response, StatusCode,
47};
48use http_body::{Body, Frame};
49use slab::Slab;
50use std::marker::Unpin;
51
52use log::{debug, error, info, log_enabled, trace, warn, Level::Trace};
53
54use std::error::Error;
55use std::future::Future;
56use std::io::{Error as IoError, ErrorKind};
57use std::iter::IntoIterator;
58use std::ops::Drop;
59use std::pin::Pin;
60use std::sync::Arc;
61use std::task::Waker;
62use std::task::{Context, Poll};
63use tokio::sync::{Mutex, OwnedSemaphorePermit, Semaphore};
64
65use crate::bufvec::BufList;
66use crate::codec::{FCGIType, FCGIWriter};
67use crate::fastcgi;
68use crate::httpparse::{parse, ParseResult};
69use async_stream_connection::{Addr, Stream};
70use tokio::io::{AsyncBufRead, BufReader};
71
/// Response body handed to the caller; it pulls its data from the shared connection.
struct FCGIBody {
    /// Shared handle to the multiplexed connection state.
    con: Arc<Mutex<InnerConnection>>,
    /// Index of this request in `running_requests` (the FastCGI request id minus one).
    rid: u16,
    /// Set once the end of the body was reported; later polls return `None` immediately.
    done: bool,
    /// Set by `create_response` once header parsing is finished and the body is handed out.
    was_returned: bool,
}
/// Bookkeeping for one in-flight request, stored in the connection's slab.
struct FCGIRequest {
    /// Response data received for this request that was not handed out yet.
    buf: BufList<Bytes>,
    /// Waker of the task that waits for more data, if any.
    waker: Option<Waker>,
    /// Set when an end-of-request record arrived or everyone was notified of a connection loss.
    ended: bool,
    /// Set when the response body was dropped before it was done.
    aborted: bool,
    /// Semaphore permit that is held for as long as this entry exists.
    _permit: OwnedSemaphorePermit,
}
/// Connection state shared (behind a mutex) by all requests multiplexed on one stream.
struct InnerConnection {
    /// Buffered stream wrapped in the FastCGI record writer.
    io: FCGIWriter<BufReader<Stream>>,
    /// In-flight requests; the slab key is the FastCGI request id minus one.
    running_requests: Slab<FCGIRequest>,
    /// Parser for incoming FastCGI records; replaced with a fresh one on reconnect.
    fcgi_parser: fastcgi::RecordReader,
}
/// A connection to a FastCGI application that can carry several requests at once.
pub struct Connection {
    /// Shared stream and per-request state.
    inner: Arc<Mutex<InnerConnection>>,
    /// Limits the number of concurrently running requests.
    sem: Arc<Semaphore>,
    /// Address of the application; kept for reconnecting.
    addr: Addr,
    /// How repeated request headers are forwarded.
    header_mul: MultiHeaderStrategy,
    /// How header values containing a newline are treated.
    header_nl: HeaderMultilineStrategy,
}
/// Selects which value is forwarded when a request header occurs more than once.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum MultiHeaderStrategy {
    /// Join all values with `,` and forward them as a single value.
    Combine,
    /// Forward only the first value.
    OnlyFirst,
    /// Forward only the last value.
    OnlyLast,
}
/// Selects how request header values that contain a newline are treated.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum HeaderMultilineStrategy {
    /// Forward the value unchanged.
    Ignore,
    /// Abort the request and return an `InvalidData` error.
    ReturnError,
}
132impl Connection {
133 #[inline]
135 pub async fn connect(
136 addr: &Addr,
137 max_req_per_con: u16,
138 ) -> Result<Connection, Box<dyn Error>> {
139 Self::connect_with_strategy(
140 addr,
141 max_req_per_con,
142 MultiHeaderStrategy::OnlyFirst,
143 HeaderMultilineStrategy::Ignore,
144 )
145 .await
146 }
147 pub async fn connect_with_strategy(
149 addr: &Addr,
150 max_req_per_con: u16,
151 header_mul: MultiHeaderStrategy,
152 header_nl: HeaderMultilineStrategy,
153 ) -> Result<Connection, Box<dyn Error>> {
154 Ok(Connection {
155 inner: Arc::new(Mutex::new(InnerConnection {
156 io: FCGIWriter::new(BufReader::new(Stream::connect(addr).await?)),
157 running_requests: Slab::with_capacity(max_req_per_con as usize),
158 fcgi_parser: fastcgi::RecordReader::new(),
159 })),
160 sem: Arc::new(Semaphore::new(max_req_per_con as usize)),
161 addr: addr.clone(),
162 header_mul,
163 header_nl,
164 })
165 }
166
167 pub fn is_ready(&self) -> bool {
170 self.sem.available_permits() > 0
171 }
172
173 pub async fn close(self) -> Result<(), IoError> {
174 let mut mut_inner = self.inner.lock().await;
175 mut_inner.io.shutdown().await?;
176 mut_inner.notify_everyone();
177 Ok(())
178 }
179
    // Names of the FastCGI parameters that `forward` derives from the HTTP request.
    const QUERY_STRING: &'static [u8] = b"QUERY_STRING";
    const REQUEST_METHOD: &'static [u8] = b"REQUEST_METHOD";
    const CONTENT_TYPE: &'static [u8] = b"CONTENT_TYPE";
    const CONTENT_LENGTH: &'static [u8] = b"CONTENT_LENGTH";
    // Empty value; also used as the empty chunk that terminates a data stream.
    const NULL: &'static [u8] = b"";
185 pub async fn forward<B, I, P1, P2>(
231 &self,
232 req: Request<B>,
233 dyn_headers: I,
234 ) -> Result<Response<impl Body<Data = Bytes, Error = IoError>>, IoError>
235 where
236 B: Body + Unpin,
237 I: IntoIterator<Item = (P1, P2)>,
238 P1: Buf,
239 P2: Buf,
240 {
241 info!("new request pending");
242 let _permit = self
243 .sem
244 .clone()
245 .acquire_owned()
246 .await
247 .map_err(|_e| IoError::new(ErrorKind::WouldBlock, ""))?;
248 let meta = FCGIRequest {
249 buf: BufList::new(),
250 waker: None,
251 ended: false,
252 aborted: false,
253 _permit,
254 };
255
256 info!("wait for lock");
257 let mut mut_inner = self.inner.lock().await;
258
259 if mut_inner.check_alive().await? == false {
260 info!("reconnect...");
262 if let Err(e) = mut_inner.io.shutdown().await {
263 error!("shutdown old con: {}", e);
264 }
265 mut_inner.notify_everyone();
266 mut_inner.io = FCGIWriter::new(BufReader::new(Stream::connect(&self.addr).await?));
267 mut_inner.fcgi_parser = fastcgi::RecordReader::new();
268 info!("reconnected");
269 }
270
271 let rid = (mut_inner.running_requests.insert(meta) + 1) as u16;
272 info!("started req #{}", rid);
273 let br = FCGIType::BeginRequest {
276 request_id: rid,
277 role: fastcgi::FastCGIRole::Responder,
278 flags: fastcgi::BeginRequestBody::KEEP_CONN,
279 };
280 mut_inner.io.encode(br).await?;
281 let mut kvw = mut_inner.io.kv_stream(rid, fastcgi::RecordType::Params);
283
284 kvw.extend(dyn_headers).await?;
285
286 match req.uri().query() {
287 Some(query) => kvw.add_kv(Self::QUERY_STRING, query.as_bytes()).await?, None => kvw.add_kv(Self::QUERY_STRING, Self::NULL).await?, }
290
291 kvw.add_kv(Self::REQUEST_METHOD, req.method().as_str().as_bytes())
292 .await?; let (parts, body) = req.into_parts();
295 let headers = parts.headers;
296
297 if let Some(value) = headers.get(CONTENT_TYPE) {
298 kvw.add_kv(Self::CONTENT_TYPE, value.as_bytes()).await?;
300 }
301
302 let len: Option<usize> = if Some(0) == body.size_hint().upper() {
303 None
305 } else {
306 let value = body.size_hint().lower(); kvw.add_kv(Self::CONTENT_LENGTH, value.to_string().as_bytes())
310 .await?;
311 Some(value as usize)
312 };
313 let skip = [AUTHORIZATION, CONTENT_LENGTH, CONTENT_TYPE];
314 for key in headers.keys() {
316 if skip.iter().find(|x| x == key).is_some() {
317 continue;
319 }
320 let mut k = BytesMut::with_capacity(key.as_str().len() + 5);
326 k.put(&b"HTTP_"[..]);
327 for &c in key.as_str().as_bytes() {
328 let upper = match c {
329 b'-' => b'_',
330 lower_acii if b'a' <= lower_acii && lower_acii <= b'z' => {
331 lower_acii - (b'a' - b'A')
332 } s => s,
334 };
335 k.put_u8(upper);
336 }
337 let mut value_buf;
349 let value = match self.header_mul {
350 MultiHeaderStrategy::Combine => {
351 value_buf = BytesMut::with_capacity(512);
352 let mut first = false;
353 for v in headers.get_all(key).iter() {
354 if !first {
355 first = true;
356 } else {
357 value_buf.put_u8(b',');
358 }
359 let v = v.as_bytes();
360 value_buf.put_slice(v); }
362 value_buf.as_ref()
363 }
364 MultiHeaderStrategy::OnlyFirst => match headers.get(key) {
365 Some(v) => v.as_bytes(),
366 None => Self::NULL,
367 },
368 MultiHeaderStrategy::OnlyLast => match headers.get_all(key).iter().next_back() {
369 Some(v) => v.as_bytes(),
370 None => Self::NULL,
371 },
372 };
373 if let HeaderMultilineStrategy::ReturnError = self.header_nl {
374 if value.as_ref().contains(&b'\n') {
375 drop(kvw); mut_inner.abort_req(rid).await?; return Err(IoError::new(
378 ErrorKind::InvalidData,
379 "multiline headers are not allowed",
380 ));
381 }
382 }
383 kvw.add_kv(k, value).await?;
384 }
385 kvw.flush().await?;
387 trace!("sent header");
388 if let Some(len) = len {
391 drop(mut_inner); let (_, res) =
395 tokio::try_join!(self.send_body(rid, len, body), self.create_response(rid))?;
396 Ok(res)
397 } else {
398 mut_inner
400 .io
401 .flush_data_chunk(Self::NULL, rid, fastcgi::RecordType::StdIn)
402 .await?;
403 drop(mut_inner); self.create_response(rid).await
405 }
406 }
407 async fn send_body<B>(
409 &self,
410 request_id: u16,
411 mut len: usize,
412 mut body: B,
413 ) -> Result<(), IoError>
414 where
415 B: Body + Unpin,
416 {
417 while let Some(chunk) = body.data().await {
419 if let Ok(data) = chunk {
420 let s = data.remaining();
421 debug!("sent {} body bytes to app", s);
422 if s == 0 {
423 continue;
424 }
425 len -= s;
426 self.inner
427 .lock()
428 .await
429 .io
430 .flush_data_chunk(data, request_id, fastcgi::RecordType::StdIn)
431 .await?;
432 }
433 }
434 if len > 0 {
436 self.inner
437 .lock()
438 .await
439 .abort_req(request_id).await?;
440 return Err(std::io::Error::new(
441 std::io::ErrorKind::ConnectionAborted,
442 "body too short",
443 ));
444 }
445 self.inner
447 .lock()
448 .await
449 .io
450 .flush_data_chunk(Self::NULL, request_id, fastcgi::RecordType::StdIn)
451 .await?;
452
453 debug!("sent req body");
454 Ok(())
455 }
456 async fn create_response(
459 &self,
460 rid: u16,
461 ) -> Result<Response<impl Body<Data = Bytes, Error = IoError>>, IoError> {
462 let mut fcgibody = FCGIBody {
463 con: Arc::clone(&self.inner),
464 rid: (rid - 1),
465 done: false,
466 was_returned: false,
467 };
468 let mut rb = Response::builder();
469 let mut rheaders = rb.headers_mut().unwrap();
470 let mut status = StatusCode::OK;
471 let mut buf: Option<Bytes> = None;
473 while let Some(rbuf) = fcgibody.data().await {
474 if let Ok(mut b) = rbuf {
475 if let Some(left) = buf.take() {
476 let mut c = BytesMut::with_capacity(left.len() + b.len());
478 c.put(left);
479 c.put(b);
480 b = c.freeze();
481 }
482 match parse(b.clone(), &mut rheaders) {
483 ParseResult::Ok(bodydata) => {
484 trace!("read body fragment: {:?}", &bodydata);
485 if bodydata.has_remaining() {
486 let mut mut_inner = self.inner.lock().await;
487 mut_inner.running_requests[fcgibody.rid as usize]
489 .buf
490 .push(bodydata);
491 }
492
493 if let Some(stat) = rheaders.get("Status") {
494 if stat.len() >= 3 {
497 if let Ok(s) = StatusCode::from_bytes(&stat.as_bytes()[..3][..]) {
498 status = s;
499 }
500 }
501 }
502 break;
504 }
505 ParseResult::Pending => {
506 buf = Some(b);
508 trace!("header pending");
509 }
510 ParseResult::Err => {
511 status = StatusCode::INTERNAL_SERVER_ERROR;
512 break;
513 }
514 }
515 } else {
516 error!("{:?}", rbuf);
517 }
518 }
519 fcgibody.was_returned = true;
520 debug!("resp header parsing done");
521
522 match rb.status(status).body(fcgibody) {
523 Ok(v) => Ok(v),
524 Err(_) => {
525 unreachable!();
527 }
528 }
529 }
530}
531
532pub(crate) struct BodyDataFrame<'a, T: ?Sized>(pub(crate) &'a mut T);
534impl<'a, T: Body + Unpin + ?Sized> Future for BodyDataFrame<'a, T> {
535 type Output = Option<Result<T::Data, T::Error>>;
536
537 fn poll(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
538 match Pin::new(&mut self.0).poll_frame(ctx) {
539 Poll::Ready(Some(Ok(a))) => {
540 if let Ok(d) = a.into_data() {
541 Poll::Ready(Some(Ok(d)))
542 }else{
543 ctx.waker().wake_by_ref();
544 Poll::Pending
545 }
546 },
547 Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e))),
548 Poll::Ready(None) => Poll::Ready(None),
549 Poll::Pending => Poll::Pending
550 }
551 }
552}
/// Extension trait that adds a `data()` future to every body.
pub(crate) trait BodyExt: Body {
    /// Returns a future that resolves to the next data frame of this body.
    fn data(&mut self) -> BodyDataFrame<'_, Self>
    where
        Self: Unpin,
    {
        BodyDataFrame(self)
    }
}
// Blanket implementation: every `Body` gets `data()`.
impl<T: ?Sized> BodyExt for T where T: Body {}
565
566impl Future for InnerConnection {
567 type Output = Option<Result<(), IoError>>;
568
569 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Result<(), IoError>>> {
570 self.poll_resp(cx)
571 }
572}
573struct CheckAlive<'a>(&'a mut InnerConnection);
574
575impl<'a> Future for CheckAlive<'a> {
576 type Output = Result<bool, IoError>;
577
578 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<bool, IoError>> {
579 Poll::Ready(match Pin::new(&mut *self.0).poll_resp(cx) {
580 Poll::Ready(None) => Ok(false),
581 Poll::Ready(Some(Err(e))) => {
582 error!("allive: {:?}", e);
583 if e.kind() == ErrorKind::NotConnected {
584 Ok(false)
585 } else {
586 Err(e)
587 }
588 }
589 _ => Ok(true),
590 })
591 }
592}
593
594impl InnerConnection {
595 fn check_alive(&mut self) -> CheckAlive {
597 CheckAlive(self)
598 }
599 async fn abort_req(&mut self, request_id: u16) -> Result<(), IoError> {
600 self.io.encode(FCGIType::AbortRequest { request_id }).await
601 }
    /// Reads the data that is currently buffered on the connection and hands it
    /// to the requests it belongs to.
    ///
    /// Returns `Pending` if no data is available, `Ready(Some(Ok(())))` if data
    /// was consumed, `Ready(Some(Err(_)))` on a read error and `Ready(None)` if
    /// nothing was consumed (for example because the connection was closed). In
    /// the last two cases every request is notified.
    fn poll_resp(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Result<(), IoError>>> {
        // Split the borrow so that the read buffer and the request table can be used together.
        let Self {
            ref mut io,
            ref mut running_requests,
            ref mut fcgi_parser,
        } = *self;
        let read = match Pin::new(io).poll_fill_buf(cx) {
            Poll::Ready(Ok(rbuf)) => {
                let data_available = rbuf.len();
                if data_available == 0 {
                    info!("connection closed");
                    0
                } else {
                    let mut data = Bytes::copy_from_slice(rbuf);
                    if log_enabled!(Trace) {
                        // Long buffers are abbreviated to their first and last 21 bytes.
                        let print = if data.len() > 50 {
                            format!(
                                "({}) {:?}...{:?}",
                                data.len(),
                                data.slice(..21),
                                data.slice(data.len() - 21..)
                            )
                        } else {
                            format!("{:?}", data)
                        };
                        trace!("read conn data {}", print);
                    }
                    InnerConnection::parse_and_distribute(&mut data, running_requests, fcgi_parser);
                    // Number of bytes the parser took out of the copy.
                    let read = data_available - data.remaining();
                    read
                }
            }
            Poll::Ready(Err(e)) => {
                error!("Err {}", e);
                self.notify_everyone();
                return Poll::Ready(Some(Err(e)));
            }
            Poll::Pending => return Poll::Pending,
        };
        if read == 0 {
            self.notify_everyone();
            Poll::Ready(None)
        } else {
            // Mark the parsed bytes as consumed in the read buffer.
            Pin::new(&mut (*self).io).consume(read);
            Poll::Ready(Some(Ok(())))
        }
    }
657}
impl Drop for FCGIRequest {
    /// Logs that the request slot is released.
    fn drop(&mut self) {
        debug!("Req mplex id free");
    }
}
663impl Drop for FCGIBody {
664 fn drop(&mut self) {
665 if self.done {
666 return;
667 }
668 debug!("Dropping FCGIBody #{}!", self.rid + 1);
669 match self.con.try_lock() {
670 Ok(mut mut_inner) => {
671 let rid = self.rid as usize;
672 if let Some(req) = mut_inner.running_requests.get_mut(rid) {
673 req.aborted = true;
674 req.waker = None;
675 }
677 }
678 Err(e) => error!("{}", e),
679 }
680 }
681}
682
683impl Body for FCGIBody {
684 type Data = Bytes;
685 type Error = IoError;
686 fn poll_frame(
688 mut self: Pin<&mut Self>,
689 cx: &mut Context,
690 ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
691 let Self {
700 ref con,
701 rid,
702 ref mut done,
703 was_returned,
704 } = *self;
705
706 if *done {
707 debug!("body #{} is already done", rid + 1);
708 return Poll::Ready(None);
709 }
710
711 trace!("read resp body");
712 let fut = con.lock();
713 match Box::pin(fut).as_mut().poll(cx) {
714 Poll::Pending => Poll::Pending,
715 Poll::Ready(mut mut_inner) => {
716 let _con_stat = Pin::new(&mut *mut_inner).poll_resp(cx);
720
721 let slab = match mut_inner.running_requests.get_mut(rid as usize) {
723 Some(slab) => slab,
724 None => {
725 warn!("#{} not in slab", rid + 1);
726 *done = true;
727 return Poll::Ready(None);
728 }
729 };
730
731 if slab.buf.remaining() >= 1 {
741 trace!("body #{} has data and is {} closed", rid + 1, slab.ended);
742 let retdata = Poll::Ready(Some(Ok(Frame::data(slab.buf.oldest().unwrap()))));
743 if was_returned && slab.ended && slab.buf.remaining() < 1 {
744 trace!("next read on #{} will not have data -> release", rid + 1);
747 mut_inner.running_requests.remove(rid as usize);
748 *done = true;
749 }
750 retdata
751 } else {
752 let req_done = slab.ended;
754 if req_done {
755 debug!("body #{} is done", rid + 1);
756 if was_returned {
757 mut_inner.running_requests.remove(rid as usize);
758 *done = true;
759 } else {
760 warn!("#{} closed before handover", rid + 1);
761 }
762 Poll::Ready(None)
763 } else {
764 trace!("body waits");
765 slab.waker = Some(cx.waker().clone());
767 Poll::Pending
768 }
769 }
770 }
771 }
772 }
773}
774
775impl InnerConnection {
776 fn notify_everyone(&mut self) {
778 for (rid, mpxs) in self.running_requests.iter_mut() {
779 if let Some(waker) = mpxs.waker.take() {
780 waker.wake()
781 }
782 if mpxs.aborted {
783 continue;
784 }
785 if !mpxs.ended {
786 error!("body #{} not done", rid + 1);
787 }
788 mpxs.ended = true;
789 }
790 }
    /// Parses all complete records from `data` and hands them to their requests.
    ///
    /// Stdout data is appended to the request's buffer, stderr data is logged,
    /// and an end-of-request record marks the request as ended. Waiting tasks
    /// are woken when data or the end arrives. Records for unknown request ids
    /// and records with request id 0 are only logged.
    fn parse_and_distribute(
        data: &mut Bytes,
        running_requests: &mut Slab<FCGIRequest>,
        fcgi_parser: &mut fastcgi::RecordReader,
    ) {
        while let Some(r) = fcgi_parser.read(data) {
            // Request ids are the slab key plus one; id 0 underflows here.
            let (req_no, ovr) = r.get_request_id().overflowing_sub(1);
            if ovr {
                error!("got mgmt record");
                continue;
            }
            debug!("record for #{}", req_no + 1);
            if let Some(mpxs) = running_requests.get_mut(req_no as usize) {
                match r.body {
                    fastcgi::Body::EndRequest(status) => {
                        match status.protocol_status {
                            fastcgi::ProtoStatus::Complete => {
                                info!("Req #{} ended with {}", req_no + 1, status.app_status)
                            }
                            _ => error!(
                                "Req #{} ended with fcgi error {}",
                                req_no + 1,
                                status.protocol_status
                            ),
                        };
                        mpxs.ended = true;
                        if let Some(waker) = mpxs.waker.take() {
                            waker.wake()
                        }
                        // Nobody reads an aborted request any more: free its slot now.
                        if mpxs.aborted {
                            running_requests.remove(req_no as usize);
                        }
                    }
                    // Any other record for an aborted request is discarded.
                    _ if mpxs.aborted => {
                        continue;
                    }
                    fastcgi::Body::StdOut(s) => {
                        if log_enabled!(Trace) {
                            // Long payloads are abbreviated to their first and last 21 bytes.
                            let print = if s.len() > 50 {
                                format!(
                                    "({}) {:?}...{:?}",
                                    s.len(),
                                    s.slice(..21),
                                    s.slice(s.len() - 21..)
                                )
                            } else {
                                format!("{:?}", s)
                            };
                            trace!("FCGI stdout: {}", print);
                        }
                        if s.has_remaining() {
                            mpxs.buf.push(s);
                            if let Some(waker) = mpxs.waker.take() {
                                waker.wake();
                            }
                        }
                    }
                    fastcgi::Body::StdErr(s) => {
                        error!("FCGI #{} Err: {:?}", req_no + 1, s);
                    }
                    _ => {
                        warn!("type?");
                    }
                }
            } else {
                debug!("not a pending req ID");
            }
        }
    }
867}
868
869#[cfg(test)]
870mod tests {
871 use super::*;
872 use crate::client::tests::local_socket_pair;
873 use http_body::SizeHint;
874 use std::collections::{HashMap, VecDeque};
875 use tokio::{
876 io::{AsyncReadExt, AsyncWriteExt},
877 net::TcpListener,
878 runtime::Builder,
879 };
880
881 struct TestBod {
882 l: VecDeque<Bytes>,
883 }
884 impl Body for TestBod {
885 type Data = Bytes;
886 type Error = IoError;
887 fn poll_frame(
888 mut self: Pin<&mut Self>,
889 _cx: &mut Context,
890 ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
891 let Self { ref mut l } = *self;
892 match l.pop_front() {
893 None => Poll::Ready(None),
894 Some(i) => Poll::Ready(Some(Ok(Frame::data(i)))),
895 }
896 }
897 fn size_hint(&self) -> SizeHint {
898 let mut sh = SizeHint::default();
899 let s: usize = self.l.iter().map(|b| b.remaining()).sum();
900 sh.set_exact(s as u64);
901 sh
902 }
903 }
904 fn init_log() {
905 let mut builder = pretty_env_logger::formatted_timed_builder();
906 builder.is_test(true);
907 if let Ok(s) = ::std::env::var("RUST_LOG") {
908 builder.parse_filters(&s);
909 }
910 let _ = builder.try_init();
911 }
912
    /// A GET request is encoded as expected and the canned response (status,
    /// header and body) is decoded.
    #[test]
    fn simple_get() {
        init_log();
        let rt = Builder::new_current_thread().enable_all().build().unwrap();
        // Mock application: checks the exact request bytes and answers with a canned response.
        async fn mock_app(app_listener: TcpListener) {
            let (mut app_socket, _) = app_listener.accept().await.unwrap();
            let mut buf = BytesMut::with_capacity(4096);
            app_socket.read_buf(&mut buf).await.unwrap();
            trace!("app read {:?}", buf);
            let to_php = b"\x01\x01\0\x01\0\x08\0\0\0\x01\x01\0\0\0\0\0\x01\x04\0\x01\0i\x07\0\x0f\x1cSCRIPT_FILENAME/home/daniel/Public/test.php\x0c\x05QUERY_STRINGlol=1\x0e\x03REQUEST_METHODGET\x0b\tHTTP_ACCEPTtext/html\x01\x04\0\x01\0i\x07\x01\x04\0\x01\0\0\0\0\x01\x05\0\x01\0\0\0\0";
            assert_eq!(buf, Bytes::from(&to_php[..]));
            trace!("app answers on get");
            let from_php = b"\x01\x07\0\x01\0W\x01\0PHP Fatal error: Kann nicht durch 0 teilen in /home/daniel/Public/test.php on line 14\n\0\x01\x06\0\x01\x01\xf7\x01\0Status: 404 Not Found\r\nX-Powered-By: PHP/7.3.16\r\nX-Authenticate: NTLM\r\nContent-type: text/html; charset=UTF-8\r\n\r\n<html><body>\npub\n<pre>Array\n(\n)\nArray\n(\n [lol] => 1\n)\nArray\n(\n [lol] => 1\n)\nArray\n(\n [HTTP_accept] => text/html\n [REQUEST_METHOD] => GET\n [QUERY_STRING] => lol=1\n [SCRIPT_NAME] => /test\n [SCRIPT_FILENAME] => /home/daniel/Public/test.php\n [FCGI_ROLE] => RESPONDER\n [PHP_SELF] => /test\n [REQUEST_TIME_FLOAT] => 1587740954.2741\n [REQUEST_TIME] => 1587740954\n)\n\0\x01\x03\0\x01\0\x08\0\0\0\0\0\0\0\0\0\0";
            app_socket
                .write_buf(&mut Bytes::from(&from_php[..]))
                .await
                .unwrap();
        }

        // Client side: forwards the request and checks the decoded response.
        async fn con() {
            let (app_listener, a) = local_socket_pair().await.unwrap();
            let m = tokio::spawn(mock_app(app_listener));

            let fcgi_con = Connection::connect(&a, 1).await.unwrap();
            trace!("new connection obj");
            let b = TestBod { l: VecDeque::new() };
            let req = Request::get("/test?lol=1")
                .header("Accept", "text/html")
                .body(b)
                .unwrap();
            trace!("new req obj");
            let mut params = HashMap::new();
            params.insert(
                &b"SCRIPT_FILENAME"[..],
                &b"/home/daniel/Public/test.php"[..],
            );
            let mut res = fcgi_con.forward(req, params).await.expect("forward failed");
            trace!("got res obj");
            assert_eq!(res.status(), StatusCode::NOT_FOUND);
            assert_eq!(
                res.headers()
                    .get("X-Powered-By")
                    .expect("powered by header missing"),
                "PHP/7.3.16"
            );
            let read1 = res.data().await;
            assert!(read1.is_some());
            let read1 = read1.unwrap();
            assert!(read1.is_ok());
            if let Ok(d) = read1 {
                let body = b"<html><body>\npub\n<pre>Array\n(\n)\nArray\n(\n [lol] => 1\n)\nArray\n(\n [lol] => 1\n)\nArray\n(\n [HTTP_accept] => text/html\n [REQUEST_METHOD] => GET\n [QUERY_STRING] => lol=1\n [SCRIPT_NAME] => /test\n [SCRIPT_FILENAME] => /home/daniel/Public/test.php\n [FCGI_ROLE] => RESPONDER\n [PHP_SELF] => /test\n [REQUEST_TIME_FLOAT] => 1587740954.2741\n [REQUEST_TIME] => 1587740954\n)\n";
                assert_eq!(d, &body[..]);
            }
            // After the single body chunk the stream must be finished.
            let read2 = res.data().await;
            assert!(read2.is_none());
            m.await.unwrap();
        }
        rt.block_on(con());
    }
    /// The header block and the body arrive in separate stdout records; the body
    /// must still be readable.
    #[test]
    fn app_answer_split_mid_record() {
        init_log();
        let rt = Builder::new_current_thread().enable_all().build().unwrap();
        // Mock application: reads the request without checking it and answers with two stdout records.
        async fn mock_app(app_listener: TcpListener) {
            let (mut app_socket, _) = app_listener.accept().await.unwrap();
            let mut buf = BytesMut::with_capacity(4096);
            app_socket.read_buf(&mut buf).await.unwrap();
            trace!("app read {:?}", buf);
            trace!("app answers on get");
            let from_flup = b"\x01\x06\0\x01\0@\0\0Status: 200 OK\r\nContent-Type: text/plain\r\nContent-Length: 13\r\n\r\n\x01\x06\0\x01\0\r\x03\0Hello World!\n";
            app_socket
                .write_buf(&mut Bytes::from(&from_flup[..]))
                .await
                .unwrap();
        }

        // Client side: forwards a bare GET and reads the first body chunk.
        async fn con() {
            let (app_listener, a) = local_socket_pair().await.unwrap();
            let m = tokio::spawn(mock_app(app_listener));

            let fcgi_con = Connection::connect(&a, 1).await.unwrap();
            trace!("new connection obj");
            let b = TestBod { l: VecDeque::new() };
            let req = Request::get("/").body(b).unwrap();
            trace!("new req obj");
            let params: HashMap<Bytes, Bytes> = HashMap::new();
            let mut res = fcgi_con.forward(req, params).await.expect("forward failed");
            trace!("got res obj");
            let read1 = res.data().await;
            assert!(read1.is_some());
            let read1 = read1.unwrap();
            assert!(read1.is_ok());
            if let Ok(d) = read1 {
                let body = b"Hello World!\n";
                assert_eq!(d, &body[..]);
            }
            m.await.unwrap();
        }
        rt.block_on(con());
    }
1016
    /// The HTTP header block is split across two stdout records and two writes;
    /// all headers must still be parsed.
    #[test]
    fn app_http_headers_split() {
        init_log();
        let rt = Builder::new_current_thread().enable_all().build().unwrap();
        // Mock application: sends the header block in two pieces, then the end-of-request record.
        async fn mock_app(app_listener: TcpListener) {
            let (mut app_socket, _) = app_listener.accept().await.unwrap();
            let mut buf = BytesMut::with_capacity(4096);
            app_socket.read_buf(&mut buf).await.unwrap();
            trace!("app read {:?}", buf);
            trace!("app answers on get");
            let from_flup = b"\x01\x06\0\x01\0\x1e\0\0Status: 200 OK\r\nContent-Type: ";
            app_socket
                .write_buf(&mut Bytes::from(&from_flup[..]))
                .await
                .unwrap();
            let from_flup = b"\x01\x06\0\x01\0\"\0\0text/plain\r\nContent-Length: 13\r\n\r\n\x01\x03\0\x01\0\x08\0\0\0\0\0\0\0\0\0\0";
            app_socket
                .write_buf(&mut Bytes::from(&from_flup[..]))
                .await
                .unwrap();
        }

        // Client side: checks status and headers; the response carries no body data.
        async fn con() {
            let (app_listener, a) = local_socket_pair().await.unwrap();
            let m = tokio::spawn(mock_app(app_listener));

            let fcgi_con = Connection::connect(&a, 1).await.unwrap();
            trace!("new connection obj");
            let b = TestBod { l: VecDeque::new() };
            let req = Request::get("/").body(b).unwrap();
            trace!("new req obj");
            let params: HashMap<Bytes, Bytes> = HashMap::new();
            let mut res = fcgi_con.forward(req, params).await.expect("forward failed");
            trace!("got res obj");
            assert_eq!(res.status(), StatusCode::OK);
            assert_eq!(
                res.headers()
                    .get("Content-Length")
                    .expect("len header missing"),
                "13"
            );
            assert_eq!(
                res.headers()
                    .get("Content-Type")
                    .expect("type header missing"),
                "text/plain"
            );

            let read1 = res.data().await;
            assert!(read1.is_none());
            m.await.unwrap();
        }
        rt.block_on(con());
    }
    /// A POST request with a body is encoded as expected (content type, content
    /// length and stdin data) and the response is decoded.
    #[test]
    fn simple_post() {
        init_log();
        let rt = Builder::new_current_thread().enable_all().build().unwrap();
        // Mock application: checks the exact request bytes and answers with a canned response.
        async fn mock_app(app_listener: TcpListener) {
            let (mut app_socket, _) = app_listener.accept().await.unwrap();
            let mut buf = BytesMut::with_capacity(4096);
            app_socket.read_buf(&mut buf).await.unwrap();
            trace!("app read {:?}", buf);
            let to_php = b"\x01\x01\0\x01\0\x08\0\0\0\x01\x01\0\0\0\0\0\x01\x04\0\x01\0\x81\x07\0\x0f\x1cSCRIPT_FILENAME/home/daniel/Public/test.php\x0c\0QUERY_STRING\x0e\x04REQUEST_METHODPOST\x0c\x13CONTENT_TYPEmultipart/form-data\x0e\x01CONTENT_LENGTH8\x01\x04\0\x01\0\x81\x07\x01\x04\0\x01\0\0\0\0\x01\x05\0\x01\0\x08\0\0test=123\x01\x05\0\x01\0\0\0\0";
            assert_eq!(buf, Bytes::from(&to_php[..]));
            trace!("app answers on get");
            let from_php = b"\x01\x06\0\x01\x00\x23\x05\0Status: 201 Created\r\n\r\n<html><body>#+#+#\x01\x03\0\x01\0\x08\0\0\0\0\0\0\0\0\0\0";
            app_socket
                .write_buf(&mut Bytes::from(&from_php[..]))
                .await
                .unwrap();
        }

        // Client side: posts an 8 byte body and checks the decoded response.
        async fn con() {
            let (app_listener, a) = local_socket_pair().await.unwrap();
            let m = tokio::spawn(mock_app(app_listener));

            let fcgi_con = Connection::connect(&a, 1).await.unwrap();
            trace!("new connection obj");
            let mut l = VecDeque::new();
            l.push_back(Bytes::from(&"test=123"[..]));
            let b = TestBod { l };

            let req = Request::post("/test")
                .header("Content-Length", "8")
                .header("Content-Type", "multipart/form-data")
                .body(b)
                .unwrap();
            trace!("new req obj");
            let mut params = HashMap::new();
            params.insert(
                &b"SCRIPT_FILENAME"[..],
                &b"/home/daniel/Public/test.php"[..],
            );
            let mut res = fcgi_con.forward(req, params).await.expect("forward failed");
            trace!("got res obj");
            assert_eq!(res.status(), StatusCode::CREATED);
            let read1 = res.data().await;
            assert!(read1.is_some());
            let read1 = read1.unwrap();
            assert!(read1.is_ok());
            if let Ok(d) = read1 {
                // The bytes after the body in the record are padding and must not show up.
                let body = b"<html><body>";
                assert_eq!(d, &body[..]);
            }
            let read2 = res.data().await;
            assert!(read2.is_none());
            m.await.unwrap();
        }
        rt.block_on(con());
    }
    /// A header value longer than 127 bytes is encoded with the four byte length form.
    #[test]
    fn long_header() {
        init_log();
        let rt = Builder::new_current_thread().enable_all().build().unwrap();
        // Mock application: checks the exact request bytes and answers with a canned response.
        async fn mock_app(app_listener: TcpListener) {
            let (mut app_socket, _) = app_listener.accept().await.unwrap();
            let mut buf = BytesMut::with_capacity(4096);
            app_socket.read_buf(&mut buf).await.unwrap();
            trace!("app read {:?}", buf);
            let to_php = b"\x01\x01\0\x01\0\x08\0\0\0\x01\x01\0\0\0\0\0\x01\x04\0\x01\0\xb8\0\0\x0c\0QUERY_STRING\x0e\x03REQUEST_METHODGET\x0b\x80\0\0\x87HTTP_ACCEPTtext/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7\x01\x04\0\x01\0\0\0\0\x01\x05\0\x01\0\0\0\0";
            assert_eq!(buf, Bytes::from(&to_php[..]));
            trace!("app answers on get");
            let from_php = b"\x01\x06\0\x01\0\x1b\x05\0Status: 404 Not Found\r\n\r\n\r\n\x01\x06\0\x01\0\x01\x03\0\x01\0\x08\0\0\0\0\0\0\0\0\0\0";
            app_socket
                .write_buf(&mut Bytes::from(&from_php[..]))
                .await
                .unwrap();
        }

        // Client side: sends a GET with a long `Accept` header and checks the status only.
        async fn con() {
            let (app_listener, a) = local_socket_pair().await.unwrap();
            let m = tokio::spawn(mock_app(app_listener));

            let fcgi_con = Connection::connect(&a, 1).await.unwrap();
            trace!("new connection obj");
            let b = TestBod { l: VecDeque::new() };
            let req = Request::get("/")
                .header("Accept", "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7")
                .body(b)
                .unwrap();
            trace!("new req obj");
            let params: HashMap<Bytes, Bytes> = HashMap::new();
            let res = fcgi_con.forward(req, params).await.expect("forward failed");
            trace!("got res obj");
            assert_eq!(res.status(), StatusCode::NOT_FOUND);
            m.await.unwrap();
        }
        rt.block_on(con());
    }
1170}