fastcgi_client/client.rs
1// Copyright 2022 jmjoy
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! FastCGI client implementation for async communication with FastCGI servers.
16//!
17//! This module provides the main `Client` struct that handles communication
18//! with FastCGI servers in both short connection and keep-alive modes.
19//! The client can execute requests and receive responses or response streams.
20
21use crate::{
22 ClientError, ClientResult, Response,
23 conn::{KeepAlive, Mode, ShortConn},
24 io::{self, AsyncRead, AsyncWrite, AsyncWriteExt},
25 meta::{BeginRequestRec, EndRequestRec, Header, ParamPairs, RequestType, Role},
26 params::Params,
27 request::Request,
28 response::ResponseStream,
29};
30use std::marker::PhantomData;
31use tracing::debug;
32
33#[cfg(feature = "runtime-tokio")]
34use crate::io::{TokioAsyncReadCompatExt, TokioCompat};
35
/// Request id used for every request issued by this client.
///
/// This follows nginx's FastCGI module, where the request id is always 1.
///
/// <https://github.com/nginx/nginx/blob/f7ea8c76b55f730daa3b63f5511feb564b44d901/src/http/modules/ngx_http_fastcgi_module.c>
const REQUEST_ID: u16 = 1;
40
/// Asynchronous client for communicating with a FastCGI server.
///
/// `S` is the underlying stream. `M` is a marker type that selects the
/// connection mode; it is only ever stored as `PhantomData`, so it adds no
/// runtime data to the client.
pub struct Client<S, M> {
    /// Stream over which requests are written and responses are read.
    stream: S,
    /// Compile-time connection-mode marker.
    _mode: PhantomData<M>,
}
46
47impl<S, M> Client<S, M> {
48 fn from_stream(stream: S) -> Self {
49 Self {
50 stream,
51 _mode: PhantomData,
52 }
53 }
54}
55
#[cfg(feature = "runtime-tokio")]
impl<S> Client<TokioCompat<S>, ShortConn>
where
    S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin,
{
    /// Creates a short-connection `Client` from a Tokio stream.
    ///
    /// The stream is wrapped with `.compat()` before it is stored, which is
    /// why the resulting client is typed over `TokioCompat<S>`.
    ///
    /// # Examples
    ///
    /// ```
    /// # #[cfg(feature = "runtime-tokio")]
    /// # async fn example() {
    /// use fastcgi_client::Client;
    /// use tokio::net::TcpStream;
    /// # #[cfg(unix)]
    /// # use tokio::net::UnixStream;
    ///
    /// let tcp_stream = TcpStream::connect(("127.0.0.1", 9000)).await.unwrap();
    /// let _tcp_client = Client::new_tokio(tcp_stream);
    ///
    /// # #[cfg(unix)]
    /// # {
    /// let unix_stream = UnixStream::connect("/run/php-fpm.sock").await.unwrap();
    /// let _unix_client = Client::new_tokio(unix_stream);
    /// # }
    /// # }
    /// # #[cfg(not(feature = "runtime-tokio"))]
    /// # fn example() {}
    /// ```
    pub fn new_tokio(stream: S) -> Self {
        let compat_stream = stream.compat();
        Self::from_stream(compat_stream)
    }
}
90
#[cfg(feature = "runtime-smol")]
impl<S> Client<S, ShortConn>
where
    S: AsyncRead + AsyncWrite + Unpin,
{
    /// Creates a short-connection `Client` from a Smol-compatible stream.
    ///
    /// The stream is stored as-is, without any adapter.
    ///
    /// # Examples
    ///
    /// ```
    /// # #[cfg(feature = "runtime-smol")]
    /// # async fn example() {
    /// use fastcgi_client::Client;
    /// use smol::net::TcpStream;
    /// # #[cfg(unix)]
    /// # use smol::net::unix::UnixStream;
    ///
    /// let tcp_stream = TcpStream::connect(("127.0.0.1", 9000)).await.unwrap();
    /// let _tcp_client = Client::new_smol(tcp_stream);
    ///
    /// # #[cfg(unix)]
    /// # {
    /// let unix_stream = UnixStream::connect("/run/php-fpm.sock").await.unwrap();
    /// let _unix_client = Client::new_smol(unix_stream);
    /// # }
    /// # }
    /// # #[cfg(not(feature = "runtime-smol"))]
    /// # fn example() {}
    /// ```
    pub fn new_smol(stream: S) -> Self {
        Client::from_stream(stream)
    }
}
125
126impl<S: AsyncRead + AsyncWrite + Unpin> Client<S, ShortConn> {
127 /// Send request and receive response from fastcgi server, under short
128 /// connection mode.
129 pub async fn execute_once<I: AsyncRead + Unpin>(
130 mut self, request: Request<'_, I>,
131 ) -> ClientResult<Response> {
132 self.inner_execute(request).await
133 }
134
135 /// Send request and receive response stream from fastcgi server, under
136 /// short connection mode.
137 ///
138 /// # Examples
139 ///
140 /// ```
141 /// # #[cfg(feature = "runtime-tokio")]
142 /// # async fn stream() {
143 /// use fastcgi_client::{io, response::Content, Client, Params, Request, StreamExt};
144 /// use tokio::net::TcpStream;
145 ///
146 /// let stream = TcpStream::connect(("127.0.0.1", 9000)).await.unwrap();
147 /// let client = Client::new_tokio(stream);
148 /// let mut stream = client
149 /// .execute_once_stream(Request::new(Params::default(), io::empty()))
150 /// .await
151 /// .unwrap();
152 ///
153 /// while let Some(content) = stream.next().await {
154 /// let content = content.unwrap();
155 ///
156 /// match content {
157 /// Content::Stdout(out) => todo!(),
158 /// Content::Stderr(out) => todo!(),
159 /// }
160 /// }
161 /// }
162 /// # #[cfg(not(feature = "runtime-tokio"))]
163 /// # fn stream() {}
164 /// ```
165 pub async fn execute_once_stream<I: AsyncRead + Unpin>(
166 mut self, request: Request<'_, I>,
167 ) -> ClientResult<ResponseStream<S>> {
168 Self::handle_request(&mut self.stream, REQUEST_ID, request.params, request.stdin).await?;
169 Ok(ResponseStream::new(self.stream, REQUEST_ID))
170 }
171}
172
#[cfg(feature = "runtime-tokio")]
impl<S> Client<TokioCompat<S>, KeepAlive>
where
    S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin,
{
    /// Creates a keep-alive `Client` from a Tokio stream.
    ///
    /// The stream is wrapped with `.compat()` before it is stored, which is
    /// why the resulting client is typed over `TokioCompat<S>`.
    ///
    /// # Examples
    ///
    /// ```
    /// # #[cfg(feature = "runtime-tokio")]
    /// # async fn example() {
    /// use fastcgi_client::Client;
    /// use tokio::net::TcpStream;
    /// # #[cfg(unix)]
    /// # use tokio::net::UnixStream;
    ///
    /// let tcp_stream = TcpStream::connect(("127.0.0.1", 9000)).await.unwrap();
    /// let _tcp_client = Client::new_keep_alive_tokio(tcp_stream);
    ///
    /// # #[cfg(unix)]
    /// # {
    /// let unix_stream = UnixStream::connect("/run/php-fpm.sock").await.unwrap();
    /// let _unix_client = Client::new_keep_alive_tokio(unix_stream);
    /// # }
    /// # }
    /// # #[cfg(not(feature = "runtime-tokio"))]
    /// # fn example() {}
    /// ```
    pub fn new_keep_alive_tokio(stream: S) -> Self {
        let compat_stream = stream.compat();
        Self::from_stream(compat_stream)
    }
}
206
#[cfg(feature = "runtime-smol")]
impl<S> Client<S, KeepAlive>
where
    S: AsyncRead + AsyncWrite + Unpin,
{
    /// Creates a keep-alive `Client` from a Smol-compatible stream.
    ///
    /// The stream is stored as-is, without any adapter.
    ///
    /// # Examples
    ///
    /// ```
    /// # #[cfg(feature = "runtime-smol")]
    /// # async fn example() {
    /// use fastcgi_client::Client;
    /// use smol::net::TcpStream;
    /// # #[cfg(unix)]
    /// # use smol::net::unix::UnixStream;
    ///
    /// let tcp_stream = TcpStream::connect(("127.0.0.1", 9000)).await.unwrap();
    /// let _tcp_client = Client::new_keep_alive_smol(tcp_stream);
    ///
    /// # #[cfg(unix)]
    /// # {
    /// let unix_stream = UnixStream::connect("/run/php-fpm.sock").await.unwrap();
    /// let _unix_client = Client::new_keep_alive_smol(unix_stream);
    /// # }
    /// # }
    /// # #[cfg(not(feature = "runtime-smol"))]
    /// # fn example() {}
    /// ```
    pub fn new_keep_alive_smol(stream: S) -> Self {
        Client::from_stream(stream)
    }
}
241
242impl<S: AsyncRead + AsyncWrite + Unpin> Client<S, KeepAlive> {
243 /// Send request and receive response from fastcgi server, under keep alive
244 /// connection mode.
245 pub async fn execute<I: AsyncRead + Unpin>(
246 &mut self, request: Request<'_, I>,
247 ) -> ClientResult<Response> {
248 self.inner_execute(request).await
249 }
250
251 /// Send request and receive response stream from fastcgi server, under
252 /// keep alive connection mode.
253 ///
254 /// # Examples
255 ///
256 /// ```
257 /// # #[cfg(feature = "runtime-tokio")]
258 /// # async fn stream() {
259 /// use fastcgi_client::{io, response::Content, Client, Params, Request, StreamExt};
260 /// use tokio::net::TcpStream;
261 ///
262 /// let stream = TcpStream::connect(("127.0.0.1", 9000)).await.unwrap();
263 /// let mut client = Client::new_keep_alive_tokio(stream);
264 ///
265 /// for _ in (0..3) {
266 /// let mut stream = client
267 /// .execute_stream(Request::new(Params::default(), io::empty()))
268 /// .await
269 /// .unwrap();
270 ///
271 /// while let Some(content) = stream.next().await {
272 /// let content = content.unwrap();
273 ///
274 /// match content {
275 /// Content::Stdout(out) => todo!(),
276 /// Content::Stderr(out) => todo!(),
277 /// }
278 /// }
279 /// }
280 /// }
281 /// # #[cfg(not(feature = "runtime-tokio"))]
282 /// # fn stream() {}
283 /// ```
284 pub async fn execute_stream<I: AsyncRead + Unpin>(
285 &mut self, request: Request<'_, I>,
286 ) -> ClientResult<ResponseStream<&mut S>> {
287 Self::handle_request(&mut self.stream, REQUEST_ID, request.params, request.stdin).await?;
288 Ok(ResponseStream::new(&mut self.stream, REQUEST_ID))
289 }
290}
291
292impl<S: AsyncRead + AsyncWrite + Unpin, M: Mode> Client<S, M> {
293 /// Internal method to execute a request and return a complete response.
294 ///
295 /// # Arguments
296 ///
297 /// * `request` - The request to execute
298 async fn inner_execute<I: AsyncRead + Unpin>(
299 &mut self, request: Request<'_, I>,
300 ) -> ClientResult<Response> {
301 Self::handle_request(&mut self.stream, REQUEST_ID, request.params, request.stdin).await?;
302 Self::handle_response(&mut self.stream, REQUEST_ID).await
303 }
304
305 /// Handles the complete request process.
306 ///
307 /// # Arguments
308 /// * `stream` - The stream to write to
309 /// * `id` - The request ID
310 /// * `params` - The request parameters
311 /// * `body` - The request body stream
312 async fn handle_request<I: AsyncRead + Unpin>(
313 stream: &mut S, id: u16, params: Params<'_>, mut body: I,
314 ) -> ClientResult<()> {
315 Self::handle_request_start(stream, id).await?;
316 Self::handle_request_params(stream, id, params).await?;
317 Self::handle_request_body(stream, id, &mut body).await?;
318 Self::handle_request_flush(stream).await?;
319 Ok(())
320 }
321
322 /// Handles the start of a request by sending the begin request record.
323 ///
324 /// # Arguments
325 ///
326 /// * `stream` - The stream to write to
327 /// * `id` - The request ID
328 async fn handle_request_start(stream: &mut S, id: u16) -> ClientResult<()> {
329 debug!(id, "Start handle request");
330
331 let begin_request_rec =
332 BeginRequestRec::new(id, Role::Responder, <M>::is_keep_alive()).await?;
333
334 debug!(id, ?begin_request_rec, "Send to stream.");
335
336 begin_request_rec.write_to_stream(stream).await?;
337
338 Ok(())
339 }
340
341 /// Handles sending request parameters to the stream.
342 ///
343 /// # Arguments
344 ///
345 /// * `stream` - The stream to write to
346 /// * `id` - The request ID
347 /// * `params` - The request parameters
348 async fn handle_request_params(
349 stream: &mut S, id: u16, params: Params<'_>,
350 ) -> ClientResult<()> {
351 let param_pairs = ParamPairs::new(params);
352 debug!(id, ?param_pairs, "Params will be sent.");
353
354 Header::write_to_stream_batches(
355 RequestType::Params,
356 id,
357 stream,
358 &mut ¶m_pairs.to_content().await?[..],
359 Some(|header| {
360 debug!(id, ?header, "Send to stream for Params.");
361 header
362 }),
363 )
364 .await?;
365
366 Header::write_to_stream_batches(
367 RequestType::Params,
368 id,
369 stream,
370 &mut io::empty(),
371 Some(|header| {
372 debug!(id, ?header, "Send to stream for Params.");
373 header
374 }),
375 )
376 .await?;
377
378 Ok(())
379 }
380
381 /// Handles sending the request body to the stream.
382 ///
383 /// # Arguments
384 ///
385 /// * `stream` - The stream to write to
386 /// * `id` - The request ID
387 /// * `body` - The request body stream
388 async fn handle_request_body<I: AsyncRead + Unpin>(
389 stream: &mut S, id: u16, body: &mut I,
390 ) -> ClientResult<()> {
391 Header::write_to_stream_batches(
392 RequestType::Stdin,
393 id,
394 stream,
395 body,
396 Some(|header| {
397 debug!(id, ?header, "Send to stream for Stdin.");
398 header
399 }),
400 )
401 .await?;
402
403 Header::write_to_stream_batches(
404 RequestType::Stdin,
405 id,
406 stream,
407 &mut io::empty(),
408 Some(|header| {
409 debug!(id, ?header, "Send to stream for Stdin.");
410 header
411 }),
412 )
413 .await?;
414
415 Ok(())
416 }
417
418 /// Flushes the stream to ensure all data is sent.
419 ///
420 /// # Arguments
421 ///
422 /// * `stream` - The stream to flush
423 async fn handle_request_flush(stream: &mut S) -> ClientResult<()> {
424 stream.flush().await?;
425
426 Ok(())
427 }
428
429 /// Handles reading and processing the response from the stream.
430 ///
431 /// # Arguments
432 ///
433 /// * `stream` - The stream to read from
434 /// * `id` - The request ID to match
435 async fn handle_response(stream: &mut S, id: u16) -> ClientResult<Response> {
436 let mut response = Response::default();
437
438 let mut stderr = Vec::new();
439 let mut stdout = Vec::new();
440
441 loop {
442 let header = Header::new_from_stream(stream).await?;
443 if header.request_id != id {
444 return Err(ClientError::ResponseNotFound { id });
445 }
446 debug!(id, ?header, "Receive from stream.");
447
448 match header.r#type {
449 RequestType::Stdout => {
450 stdout.extend(header.read_content_from_stream(stream).await?);
451 }
452 RequestType::Stderr => {
453 stderr.extend(header.read_content_from_stream(stream).await?);
454 }
455 RequestType::EndRequest => {
456 let end_request_rec = EndRequestRec::from_header(&header, stream).await?;
457 debug!(id, ?end_request_rec, "Receive from stream.");
458
459 end_request_rec
460 .end_request
461 .protocol_status
462 .convert_to_client_result(end_request_rec.end_request.app_status)?;
463
464 response.stdout = if stdout.is_empty() {
465 None
466 } else {
467 Some(stdout)
468 };
469 response.stderr = if stderr.is_empty() {
470 None
471 } else {
472 Some(stderr)
473 };
474
475 return Ok(response);
476 }
477 r#type => {
478 return Err(ClientError::UnknownRequestType {
479 request_type: r#type,
480 });
481 }
482 }
483 }
484 }
485}