async_fcgi/client/
con_pool.rs1use crate::client::connection::{Connection, MultiHeaderStrategy, HeaderMultilineStrategy};
15use crate::codec::FCGIWriter;
16use crate::fastcgi::{Body, Record, MAX_CONNS, MAX_REQS, MPXS_CONNS, RecordType};
17use async_stream_connection::{Addr, Stream};
18use bytes::{Buf, BufMut, Bytes, BytesMut};
19use http::{Request, Response};
20use http_body::Body as HttpBody;
21use log::{info, trace};
22use std::error::Error;
23use std::fmt;
24use std::io::Error as IoError;
25use std::iter::IntoIterator;
26use tokio::io::AsyncReadExt;
27
28#[cfg(all(unix, feature = "app_start"))]
29use async_stream_connection::Listener;
30#[cfg(feature = "app_start")]
31use std::ffi::OsStr;
32#[cfg(all(unix, feature = "app_start"))]
33use std::os::unix::io::{AsRawFd, FromRawFd};
34#[cfg(feature = "app_start")]
35use std::process::Stdio;
36#[cfg(feature = "app_start")]
37use tokio::process::Command;
38
39pub struct ConPool {
41 max_cons: u8,
44 max_req_per_con: u16,
46 con_pool: Connection,
48}
49impl ConPool {
50 #[inline]
53 pub async fn new(sock_addr: &Addr) -> Result<ConPool, Box<dyn Error>> {
54 Self::new_with_strategy(
55 sock_addr,
56 MultiHeaderStrategy::OnlyFirst,
57 HeaderMultilineStrategy::Ignore,
58 )
59 .await
60 }
61
62 pub async fn new_with_strategy(
68 sock_addr: &Addr,
69 header_mul: MultiHeaderStrategy,
70 header_nl: HeaderMultilineStrategy,
71 ) -> Result<ConPool, Box<dyn Error>> {
72 let stream = Stream::connect(sock_addr).await?;
74 let mut stream = FCGIWriter::new(stream);
75 let mut kvw = stream.kv_stream(Record::MGMT_REQUEST_ID, RecordType::GetValues);
76 kvw.add_kv(MAX_CONNS, Bytes::new()).await?;
77 kvw.add_kv(MAX_REQS, Bytes::new()).await?;
78 kvw.add_kv(MPXS_CONNS, Bytes::new()).await?;
79 kvw.flush().await?;
80 let mut max_cons = 1;
81 let mut max_req_per_con = 1;
82 for rec in send_and_receive(&mut stream).await? {
83 if let Body::GetValuesResult(kvs) = rec.body {
84 for kv in kvs.drain() {
85 match kv.name_data.chunk() {
86 MAX_CONNS => {
87 if let Some(v) = parse_int::<u8>(kv.value_data) {
88 max_cons = v;
89 }
90 }
91 MAX_REQS => {
92 if let Some(v) = parse_int::<u16>(kv.value_data) {
93 max_req_per_con = v;
94 }
95 }
96 MPXS_CONNS => {
97 if kv.value_data == "0" {
98 max_req_per_con = 1;
99 break;
100 }
101 }
102 _ => {}
103 };
104 }
105 }
106 }
107 info!(
108 "App supports {} connections with {} requests",
109 max_cons, max_req_per_con
110 );
111 let c = Connection::connect_with_strategy(
112 &sock_addr,
113 max_req_per_con,
114 header_mul,
115 header_nl
116 ).await?;
117
118 Ok(ConPool {
119 max_cons,
122 max_req_per_con,
123 con_pool: c,
124 })
125 }
126 pub async fn forward<B, I, P1, P2>(
129 &self,
130 req: Request<B>,
131 dyn_headers: I,
132 ) -> Result<Response<impl HttpBody<Data = Bytes, Error = IoError>>, IoError>
133 where
134 B: HttpBody + Unpin,
135 I: IntoIterator<Item = (P1, P2)>,
136 P1: Buf,
137 P2: Buf,
138 {
139 self.con_pool.forward(req, dyn_headers).await
140 }
141}
142impl fmt::Debug for ConPool {
143 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
144 f.debug_struct("ConPool")
145 .field("max_cons", &self.max_cons)
146 .field("max_req_per_con", &self.max_req_per_con)
147 .finish()
148 }
149}
150
151fn parse_int<I: std::str::FromStr>(bytes: Bytes) -> Option<I> {
152 if let Ok(s) = std::str::from_utf8(bytes.chunk()) {
153 if let Ok(i) = s.parse() {
154 return Some(i);
155 }
156 }
157 return None;
158}
159
160async fn send_and_receive(stream: &mut FCGIWriter<Stream>) -> Result<Vec<Record>, IoError> {
162 let mut recs = Vec::new();
163
164 trace!("prep 4 read");
165 let mut rbuf = BytesMut::with_capacity(4096);
166 loop {
167 stream.read_buf(&mut rbuf).await?;
168 trace!("got {:?}", rbuf);
169 let mut pbuf = rbuf.freeze();
170 while let Some(r) = Record::read(&mut pbuf) {
171 recs.push(r);
172 }
173 if !pbuf.has_remaining() {
174 break;
175 }
176 rbuf = BytesMut::with_capacity(pbuf.len() + 4096);
177 rbuf.put(pbuf);
178 }
179
180 Ok(recs)
181}
182
#[cfg(feature = "app_start")]
#[cfg_attr(docsrs, doc(cfg(feature = "app_start")))]
impl ConPool {
    /// Prepares a `Command` that launches `program` as a FastCGI application.
    ///
    /// On Unix a listener is bound to `sock_addr` and its file descriptor
    /// becomes the standard input of the command, so the started program
    /// finds the listening socket on its stdin. On other platforms stdin is
    /// set to `Stdio::null()` and `sock_addr` is not used.
    ///
    /// The command is only configured, not spawned; the caller can add
    /// arguments or environment variables before running it.
    ///
    /// # Errors
    /// Returns the I/O error of `Listener::bind` if binding fails (Unix only).
    pub async fn prep_server<S>(program: S, sock_addr: &Addr) -> Result<Command, IoError>
    where
        S: AsRef<OsStr>,
    {
        #[cfg(unix)]
        let stdin = {
            let listener = Listener::bind(sock_addr).await?;
            // SAFETY: the descriptor comes from `listener`, which is alive
            // here, so it refers to an open socket. The listener is forgotten
            // right below, so its destructor never runs and the `Stdio`
            // created here is the only value left that owns the descriptor.
            let inherited = unsafe { Stdio::from_raw_fd(listener.as_raw_fd()) };
            std::mem::forget(listener);
            inherited
        };
        #[cfg(not(unix))]
        let stdin = Stdio::null();

        let mut command = Command::new(program);
        command.stdin(stdin);
        Ok(command)
    }
}
229
#[cfg(test)]
mod tests {
    use super::*;
    use crate::client::tests::local_socket_pair;
    use std::collections::HashMap;
    use std::iter::FromIterator;
    use std::process::ExitStatus;
    use tokio::io::AsyncWriteExt;
    use tokio::runtime::Builder;

    /// Runs `ls -l -a` through a command prepared by `prep_server` and checks
    /// that it exits successfully.
    #[cfg(feature = "app_start")]
    #[test]
    fn start_app() {
        async fn spawn() {
            let socket_path: Addr = "/tmp/jo".parse().unwrap();

            let mut env = HashMap::new();
            env.insert("PATH", "/usr/bin");

            let mut command = ConPool::prep_server("ls", &socket_path)
                .await
                .expect("prep_server error");
            command.args(&["-l", "-a"]).env_clear().envs(env);

            let outcome: ExitStatus = command.status().await.expect("ls failed");
            assert!(outcome.success());
        }

        let runtime = Builder::new_current_thread().enable_all().build().unwrap();
        runtime.block_on(spawn());
        // Remove the socket file that binding the listener left behind.
        std::fs::remove_file("/tmp/jo").unwrap();
    }

    /// An application that answers the value query with empty values must
    /// leave both limits at their default of 1.
    #[test]
    fn no_vals() {
        use tokio::net::TcpListener;

        /// Plays the application side: checks the query, answers it with
        /// empty values and then accepts the second connection.
        async fn mock_app(app_listener: TcpListener) {
            let (mut peer, _) = app_listener.accept().await.unwrap();
            let mut incoming = BytesMut::with_capacity(4096);
            info!("accepted");

            match peer.read_buf(&mut incoming).await {
                Ok(_) => {}
                Err(e) => {
                    info!("{}", e);
                    panic!("could not read");
                }
            }

            let mut incoming = incoming.freeze();
            trace!("app read {:?}", incoming);

            // First record: the management query naming three values.
            let query = Record::read(&mut incoming).unwrap();
            assert_eq!(query.get_request_id(), 0);
            let requested = match query.body {
                Body::GetValues(pairs) => pairs,
                _ => panic!("wrong body"),
            };
            let names = Vec::from_iter(requested.drain());
            assert_eq!(names.len(), 3);

            // Second record must be present and must end the received data.
            let _ = Record::read(&mut incoming).unwrap();
            assert!(!incoming.has_remaining());

            trace!("app answers on get");
            let from_php =
                b"\x01\x0a\0\0\0!\x07\0\n\0MPXS_CONNS\x08\0MAX_REQS\t\0MAX_CONNS\0\0\0\0\0\0\0";
            let mut answer = Bytes::from(&from_php[..]);
            peer.write_buf(&mut answer).await.unwrap();

            // The pool opens a second connection once the query is answered.
            let _ = app_listener.accept().await.unwrap();
            info!("accepted2");
        }

        /// Runs the client in a task while this task plays the application.
        async fn con() {
            let (app_listener, a) = local_socket_pair().await.unwrap();
            info!("bound");
            let client = tokio::spawn(async move {
                let addr = a.into();
                let pool = ConPool::new(&addr).await.unwrap();
                assert_eq!(pool.max_cons, 1);
                assert_eq!(pool.max_req_per_con, 1);
            });
            mock_app(app_listener).await;
            client.await.unwrap();
        }

        let runtime = Builder::new_current_thread().enable_all().build().unwrap();
        runtime.block_on(con());
    }
}