async_fcgi/client/
con_pool.rs

1/*! FCGI Application serving for [Hyper 0.13](https://github.com/hyperium/hyper).
2
3
4
5  This Module consists of the following Objects:
6
7 * [`ConPool`]: supports FCGI_MAX_CONNS Connections
8 * [`Connection`]: handles up to FCGI_MAX_REQS concurrent Requests
9
10[`ConPool`]: ./struct.ConPool.html
11[`Connection`]: ../connection/index.html
12*/
13
14use crate::client::connection::{Connection, MultiHeaderStrategy, HeaderMultilineStrategy};
15use crate::codec::FCGIWriter;
16use crate::fastcgi::{Body, Record, MAX_CONNS, MAX_REQS, MPXS_CONNS, RecordType};
17use async_stream_connection::{Addr, Stream};
18use bytes::{Buf, BufMut, Bytes, BytesMut};
19use http::{Request, Response};
20use http_body::Body as HttpBody;
21use log::{info, trace};
22use std::error::Error;
23use std::fmt;
24use std::io::Error as IoError;
25use std::iter::IntoIterator;
26use tokio::io::AsyncReadExt;
27
28#[cfg(all(unix, feature = "app_start"))]
29use async_stream_connection::Listener;
30#[cfg(feature = "app_start")]
31use std::ffi::OsStr;
32#[cfg(all(unix, feature = "app_start"))]
33use std::os::unix::io::{AsRawFd, FromRawFd};
34#[cfg(feature = "app_start")]
35use std::process::Stdio;
36#[cfg(feature = "app_start")]
37use tokio::process::Command;
38
39/// manage a pool of [`Connection`]s to an Server.
40pub struct ConPool {
41    /*
42    sock_addr: String,*/
43    max_cons: u8,
44    /// The maximum number of concurrent transport connections this application will accept
45    max_req_per_con: u16,
46    /// The maximum number of concurrent requests this application will accept
47    con_pool: Connection,
48}
49impl ConPool {
50    /// Connect to a FCGI server / application with [`MultiHeaderStrategy::OnlyFirst`] & [`HeaderMultilineStrategy::Ignore`].
51    /// See [`ConPool::new_with_strategy`]
52    #[inline]
53    pub async fn new(sock_addr: &Addr) -> Result<ConPool, Box<dyn Error>> {
54        Self::new_with_strategy(
55            sock_addr,
56            MultiHeaderStrategy::OnlyFirst,
57            HeaderMultilineStrategy::Ignore,
58        )
59        .await
60    }
61
62    /// Connect to a FCGI server / application.
63    /// Queries [`MAX_CONNS`],
64    /// [`MAX_REQS`]
65    /// and [`MPXS_CONNS`] from the server
66    /// and uses the values to create a [`Connection`].
67    pub async fn new_with_strategy(
68        sock_addr: &Addr,
69        header_mul: MultiHeaderStrategy,
70        header_nl: HeaderMultilineStrategy,
71    ) -> Result<ConPool, Box<dyn Error>> {
72        // query VALUES from connection
73        let stream = Stream::connect(sock_addr).await?;
74        let mut stream = FCGIWriter::new(stream);
75        let mut kvw = stream.kv_stream(Record::MGMT_REQUEST_ID, RecordType::GetValues);
76        kvw.add_kv(MAX_CONNS, Bytes::new()).await?;
77        kvw.add_kv(MAX_REQS, Bytes::new()).await?;
78        kvw.add_kv(MPXS_CONNS, Bytes::new()).await?;
79        kvw.flush().await?;
80        let mut max_cons = 1;
81        let mut max_req_per_con = 1;
82        for rec in send_and_receive(&mut stream).await? {
83            if let Body::GetValuesResult(kvs) = rec.body {
84                for kv in kvs.drain() {
85                    match kv.name_data.chunk() {
86                        MAX_CONNS => {
87                            if let Some(v) = parse_int::<u8>(kv.value_data) {
88                                max_cons = v;
89                            }
90                        }
91                        MAX_REQS => {
92                            if let Some(v) = parse_int::<u16>(kv.value_data) {
93                                max_req_per_con = v;
94                            }
95                        }
96                        MPXS_CONNS => {
97                            if kv.value_data == "0" {
98                                max_req_per_con = 1;
99                                break;
100                            }
101                        }
102                        _ => {}
103                    };
104                }
105            }
106        }
107        info!(
108            "App supports {} connections with {} requests",
109            max_cons, max_req_per_con
110        );
111        let c = Connection::connect_with_strategy(
112            &sock_addr,
113            max_req_per_con,
114            header_mul,
115            header_nl
116        ).await?;
117
118        Ok(ConPool {
119            /*
120            sock_addr,*/
121            max_cons,
122            max_req_per_con,
123            con_pool: c,
124        })
125    }
126    /// Forwards an HTTP request to a FGCI Application.
127    /// Calls [`Connection::forward`] on an available connection.
128    pub async fn forward<B, I, P1, P2>(
129        &self,
130        req: Request<B>,
131        dyn_headers: I,
132    ) -> Result<Response<impl HttpBody<Data = Bytes, Error = IoError>>, IoError>
133    where
134        B: HttpBody + Unpin,
135        I: IntoIterator<Item = (P1, P2)>,
136        P1: Buf,
137        P2: Buf,
138    {
139        self.con_pool.forward(req, dyn_headers).await
140    }
141}
142impl fmt::Debug for ConPool {
143    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
144        f.debug_struct("ConPool")
145            .field("max_cons", &self.max_cons)
146            .field("max_req_per_con", &self.max_req_per_con)
147            .finish()
148    }
149}
150
151fn parse_int<I: std::str::FromStr>(bytes: Bytes) -> Option<I> {
152    if let Ok(s) = std::str::from_utf8(bytes.chunk()) {
153        if let Ok(i) = s.parse() {
154            return Some(i);
155        }
156    }
157    return None;
158}
159
160/// Note: only use this if there are no requests pending
161async fn send_and_receive(stream: &mut FCGIWriter<Stream>) -> Result<Vec<Record>, IoError> {
162    let mut recs = Vec::new();
163
164    trace!("prep 4 read");
165    let mut rbuf = BytesMut::with_capacity(4096);
166    loop {
167        stream.read_buf(&mut rbuf).await?;
168        trace!("got {:?}", rbuf);
169        let mut pbuf = rbuf.freeze();
170        while let Some(r) = Record::read(&mut pbuf) {
171            recs.push(r);
172        }
173        if !pbuf.has_remaining() {
174            break;
175        }
176        rbuf = BytesMut::with_capacity(pbuf.len() + 4096);
177        rbuf.put(pbuf);
178    }
179
180    Ok(recs)
181}
182
#[cfg(feature = "app_start")]
#[cfg_attr(docsrs, doc(cfg(feature = "app_start")))]
impl ConPool {
    /// Prepares a [`Command`] that launches a FCGI server / application
    /// listening on `sock_addr`.
    ///
    /// On unix a listener is bound to `sock_addr` and handed to the child as
    /// its standard input; on other platforms standard input is set to null.
    /// The command is returned unspawned so the caller can add arguments and
    /// environment variables first.
    /// ```no_run
    /// # use async_fcgi::{client::con_pool::ConPool,FCGIAddr};
    /// # use std::collections::HashMap;
    /// # use std::error::Error;
    /// # #[tokio::main(flavor = "current_thread")]
    /// # async fn main() -> Result<(),Box<dyn Error>> {
    /// let mut env = HashMap::new();
    /// env.insert("PHP_FCGI_CHILDREN", "16");
    /// env.insert("PHP_FCGI_MAX_REQUESTS", "10000");
    /// let addr: FCGIAddr = "127.0.0.1:1236".parse()?;
    /// let php = ConPool::prep_server("/usr/bin/php-cgi7.4", &addr)
    ///             .await?
    ///             .env_clear().envs(env)
    ///             .spawn()?;
    /// # Ok(())
    /// # }
    /// ```
    pub async fn prep_server<S>(program: S, sock_addr: &Addr) -> Result<Command, IoError>
    where
        S: AsRef<OsStr>,
    {
        // The Web server leaves a single file descriptor, FCGI_LISTENSOCK_FILENO,
        // open when the application begins execution. This descriptor refers to
        // a listening socket created by the Web server.
        #[cfg(unix)]
        let listen_sock = {
            let listener = Listener::bind(sock_addr).await?;
            // SAFETY: the descriptor comes from `listener`, which is still alive
            // here, so it is open. The listener is forgotten right below, which
            // leaves the `Stdio` as the only handle that will close it.
            let inherited = unsafe { Stdio::from_raw_fd(listener.as_raw_fd()) };
            // FCGI App closes this - at least php-cgi7.4 does it
            std::mem::forget(listener);
            inherited
        };
        #[cfg(not(unix))]
        let listen_sock = Stdio::null();

        let mut cmd = Command::new(program);
        // FCGI_LISTENSOCK_FILENO equals STDIN_FILENO.
        // STDOUT_FILENO and STDERR_FILENO are deliberately left untouched here.
        cmd.stdin(listen_sock);
        Ok(cmd)
    }
}
229
#[cfg(test)]
mod tests {
    use super::*;
    use crate::client::tests::local_socket_pair;
    use std::iter::FromIterator;
    use tokio::io::AsyncWriteExt;
    use tokio::runtime::Builder;

    /// Runs `ls` through [`ConPool::prep_server`] with a unix socket as stdin.
    ///
    /// Unix only: the socket file this test cleans up is only created by the
    /// unix branch of `prep_server`.
    #[cfg(all(unix, feature = "app_start"))]
    #[test]
    fn start_app() {
        use std::collections::HashMap;
        use std::process::ExitStatus;

        const SOCK_PATH: &str = "/tmp/jo";

        // A socket file left behind by an earlier failed run would make the bind fail.
        let _ = std::fs::remove_file(SOCK_PATH);

        let rt = Builder::new_current_thread().enable_all().build().unwrap();
        async fn spawn() {
            let mut env = HashMap::new();
            env.insert("PATH", "/usr/bin");
            let a: Addr = SOCK_PATH.parse().unwrap();
            let s: ExitStatus = ConPool::prep_server("ls", &a)
                .await
                .expect("prep_server error")
                .args(&["-l", "-a"])
                .env_clear()
                .envs(env)
                .status()
                .await
                .expect("ls failed");
            assert!(s.success())
        }
        rt.block_on(spawn());
        std::fs::remove_file(SOCK_PATH).unwrap();
    }

    /// An application that answers the value query with empty values must
    /// leave both limits at their default of 1.
    #[test]
    fn no_vals() {
        //extern crate pretty_env_logger;
        //pretty_env_logger::init();
        use tokio::net::TcpListener;
        // Create the runtime
        let rt = Builder::new_current_thread().enable_all().build().unwrap();

        /// Plays the application side: checks the query, answers it and then
        /// accepts the second connection opened by the pool.
        async fn mock_app(app_listener: TcpListener) {
            let (mut app_socket, _) = app_listener.accept().await.unwrap();
            let mut buf = BytesMut::with_capacity(4096);
            info!("accepted");

            if let Err(e) = app_socket.read_buf(&mut buf).await {
                info!("{}", e);
                panic!("could not read");
            }

            let mut buf = buf.freeze();
            trace!("app read {:?}", buf);
            let rec = Record::read(&mut buf).unwrap(); //val stream
            assert_eq!(rec.get_request_id(), 0);
            let v = match rec.body {
                Body::GetValues(v) => v,
                _ => panic!("wrong body"),
            };
            let names = Vec::from_iter(v.drain());
            assert_eq!(names.len(), 3);

            let _ = Record::read(&mut buf).unwrap(); //val stream end

            assert!(!buf.has_remaining());

            trace!("app answers on get");
            // One GetValuesResult record naming all three variables with empty values.
            let from_php =
                b"\x01\x0a\0\0\0!\x07\0\n\0MPXS_CONNS\x08\0MAX_REQS\t\0MAX_CONNS\0\0\0\0\0\0\0";
            app_socket
                .write_buf(&mut Bytes::from(&from_php[..]))
                .await
                .unwrap();

            let _ = app_listener.accept().await.unwrap();
            info!("accepted2");
        }

        /// Runs the pool setup concurrently with the mock application.
        async fn con() {
            let (app_listener, a) = local_socket_pair().await.unwrap();
            info!("bound");
            let m = tokio::spawn(async move {
                let a = a.into();
                let cp = ConPool::new(&a).await.unwrap();
                assert_eq!(cp.max_cons, 1);
                assert_eq!(cp.max_req_per_con, 1);
            });
            mock_app(app_listener).await;
            m.await.unwrap();
        }
        rt.block_on(con());
    }
}