Skip to main content

fastcgi_client/
client.rs

1// Copyright 2022 jmjoy
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! FastCGI client implementation for async communication with FastCGI servers.
16//!
17//! This module provides the main `Client` struct that handles communication
18//! with FastCGI servers in both short connection and keep-alive modes.
19//! The client can execute requests and receive responses or response streams.
20
21use crate::{
22    ClientError, ClientResult, Response,
23    conn::{KeepAlive, Mode, ShortConn},
24    io::{self, AsyncRead, AsyncWrite, AsyncWriteExt},
25    meta::{BeginRequestRec, EndRequestRec, Header, ParamPairs, RequestType, Role},
26    params::Params,
27    request::Request,
28    response::ResponseStream,
29};
30use std::marker::PhantomData;
31use tracing::debug;
32
33#[cfg(feature = "runtime-tokio")]
34use crate::io::{TokioAsyncReadCompatExt, TokioCompat};
35
36/// I refer to nginx fastcgi implementation, found the request id is always 1.
37///
38/// <https://github.com/nginx/nginx/blob/f7ea8c76b55f730daa3b63f5511feb564b44d901/src/http/modules/ngx_http_fastcgi_module.c>
39const REQUEST_ID: u16 = 1;
40
41/// Async client for handling communication between fastcgi server.
42pub struct Client<S, M> {
43    stream: S,
44    _mode: PhantomData<M>,
45}
46
47impl<S, M> Client<S, M> {
48    fn from_stream(stream: S) -> Self {
49        Self {
50            stream,
51            _mode: PhantomData,
52        }
53    }
54}
55
56#[cfg(feature = "runtime-tokio")]
57impl<S> Client<TokioCompat<S>, ShortConn>
58where
59    S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin,
60{
61    /// Construct a `Client` Object with a Tokio stream under short connection
62    /// mode.
63    ///
64    /// # Examples
65    ///
66    /// ```
67    /// # #[cfg(feature = "runtime-tokio")]
68    /// # async fn example() {
69    /// use fastcgi_client::Client;
70    /// use tokio::net::TcpStream;
71    /// # #[cfg(unix)]
72    /// # use tokio::net::UnixStream;
73    ///
74    /// let tcp_stream = TcpStream::connect(("127.0.0.1", 9000)).await.unwrap();
75    /// let _tcp_client = Client::new_tokio(tcp_stream);
76    ///
77    /// # #[cfg(unix)]
78    /// # {
79    /// let unix_stream = UnixStream::connect("/run/php-fpm.sock").await.unwrap();
80    /// let _unix_client = Client::new_tokio(unix_stream);
81    /// # }
82    /// # }
83    /// # #[cfg(not(feature = "runtime-tokio"))]
84    /// # fn example() {}
85    /// ```
86    pub fn new_tokio(stream: S) -> Self {
87        Self::from_stream(stream.compat())
88    }
89}
90
91#[cfg(feature = "runtime-smol")]
92impl<S> Client<S, ShortConn>
93where
94    S: AsyncRead + AsyncWrite + Unpin,
95{
96    /// Construct a `Client` Object with a Smol-compatible stream under short
97    /// connection mode.
98    ///
99    /// # Examples
100    ///
101    /// ```
102    /// # #[cfg(feature = "runtime-smol")]
103    /// # async fn example() {
104    /// use fastcgi_client::Client;
105    /// use smol::net::TcpStream;
106    /// # #[cfg(unix)]
107    /// # use smol::net::unix::UnixStream;
108    ///
109    /// let tcp_stream = TcpStream::connect(("127.0.0.1", 9000)).await.unwrap();
110    /// let _tcp_client = Client::new_smol(tcp_stream);
111    ///
112    /// # #[cfg(unix)]
113    /// # {
114    /// let unix_stream = UnixStream::connect("/run/php-fpm.sock").await.unwrap();
115    /// let _unix_client = Client::new_smol(unix_stream);
116    /// # }
117    /// # }
118    /// # #[cfg(not(feature = "runtime-smol"))]
119    /// # fn example() {}
120    /// ```
121    pub fn new_smol(stream: S) -> Self {
122        Self::from_stream(stream)
123    }
124}
125
126impl<S: AsyncRead + AsyncWrite + Unpin> Client<S, ShortConn> {
127    /// Send request and receive response from fastcgi server, under short
128    /// connection mode.
129    pub async fn execute_once<I: AsyncRead + Unpin>(
130        mut self, request: Request<'_, I>,
131    ) -> ClientResult<Response> {
132        self.inner_execute(request).await
133    }
134
135    /// Send request and receive response stream from fastcgi server, under
136    /// short connection mode.
137    ///
138    /// # Examples
139    ///
140    /// ```
141    /// # #[cfg(feature = "runtime-tokio")]
142    /// # async fn stream() {
143    /// use fastcgi_client::{io, response::Content, Client, Params, Request, StreamExt};
144    /// use tokio::net::TcpStream;
145    ///
146    ///     let stream = TcpStream::connect(("127.0.0.1", 9000)).await.unwrap();
147    ///     let client = Client::new_tokio(stream);
148    ///     let mut stream = client
149    ///         .execute_once_stream(Request::new(Params::default(), io::empty()))
150    ///         .await
151    ///         .unwrap();
152    ///
153    ///     while let Some(content) = stream.next().await {
154    ///         let content = content.unwrap();
155    ///
156    ///         match content {
157    ///             Content::Stdout(out) => todo!(),
158    ///             Content::Stderr(out) => todo!(),
159    ///         }
160    ///     }
161    /// }
162    /// # #[cfg(not(feature = "runtime-tokio"))]
163    /// # fn stream() {}
164    /// ```
165    pub async fn execute_once_stream<I: AsyncRead + Unpin>(
166        mut self, request: Request<'_, I>,
167    ) -> ClientResult<ResponseStream<S>> {
168        Self::handle_request(&mut self.stream, REQUEST_ID, request.params, request.stdin).await?;
169        Ok(ResponseStream::new(self.stream, REQUEST_ID))
170    }
171}
172
173#[cfg(feature = "runtime-tokio")]
174impl<S> Client<TokioCompat<S>, KeepAlive>
175where
176    S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin,
177{
178    /// Construct a `Client` Object with a Tokio stream under keep-alive mode.
179    ///
180    /// # Examples
181    ///
182    /// ```
183    /// # #[cfg(feature = "runtime-tokio")]
184    /// # async fn example() {
185    /// use fastcgi_client::Client;
186    /// use tokio::net::TcpStream;
187    /// # #[cfg(unix)]
188    /// # use tokio::net::UnixStream;
189    ///
190    /// let tcp_stream = TcpStream::connect(("127.0.0.1", 9000)).await.unwrap();
191    /// let _tcp_client = Client::new_keep_alive_tokio(tcp_stream);
192    ///
193    /// # #[cfg(unix)]
194    /// # {
195    /// let unix_stream = UnixStream::connect("/run/php-fpm.sock").await.unwrap();
196    /// let _unix_client = Client::new_keep_alive_tokio(unix_stream);
197    /// # }
198    /// # }
199    /// # #[cfg(not(feature = "runtime-tokio"))]
200    /// # fn example() {}
201    /// ```
202    pub fn new_keep_alive_tokio(stream: S) -> Self {
203        Self::from_stream(stream.compat())
204    }
205}
206
207#[cfg(feature = "runtime-smol")]
208impl<S> Client<S, KeepAlive>
209where
210    S: AsyncRead + AsyncWrite + Unpin,
211{
212    /// Construct a `Client` Object with a Smol-compatible stream under
213    /// keep-alive mode.
214    ///
215    /// # Examples
216    ///
217    /// ```
218    /// # #[cfg(feature = "runtime-smol")]
219    /// # async fn example() {
220    /// use fastcgi_client::Client;
221    /// use smol::net::TcpStream;
222    /// # #[cfg(unix)]
223    /// # use smol::net::unix::UnixStream;
224    ///
225    /// let tcp_stream = TcpStream::connect(("127.0.0.1", 9000)).await.unwrap();
226    /// let _tcp_client = Client::new_keep_alive_smol(tcp_stream);
227    ///
228    /// # #[cfg(unix)]
229    /// # {
230    /// let unix_stream = UnixStream::connect("/run/php-fpm.sock").await.unwrap();
231    /// let _unix_client = Client::new_keep_alive_smol(unix_stream);
232    /// # }
233    /// # }
234    /// # #[cfg(not(feature = "runtime-smol"))]
235    /// # fn example() {}
236    /// ```
237    pub fn new_keep_alive_smol(stream: S) -> Self {
238        Self::from_stream(stream)
239    }
240}
241
242impl<S: AsyncRead + AsyncWrite + Unpin> Client<S, KeepAlive> {
243    /// Send request and receive response from fastcgi server, under keep alive
244    /// connection mode.
245    pub async fn execute<I: AsyncRead + Unpin>(
246        &mut self, request: Request<'_, I>,
247    ) -> ClientResult<Response> {
248        self.inner_execute(request).await
249    }
250
251    /// Send request and receive response stream from fastcgi server, under
252    /// keep alive connection mode.
253    ///
254    /// # Examples
255    ///
256    /// ```
257    /// # #[cfg(feature = "runtime-tokio")]
258    /// # async fn stream() {
259    /// use fastcgi_client::{io, response::Content, Client, Params, Request, StreamExt};
260    /// use tokio::net::TcpStream;
261    ///
262    ///     let stream = TcpStream::connect(("127.0.0.1", 9000)).await.unwrap();
263    ///     let mut client = Client::new_keep_alive_tokio(stream);
264    ///
265    ///     for _ in (0..3) {
266    ///         let mut stream = client
267    ///             .execute_stream(Request::new(Params::default(), io::empty()))
268    ///             .await
269    ///             .unwrap();
270    ///
271    ///         while let Some(content) = stream.next().await {
272    ///             let content = content.unwrap();
273    ///
274    ///             match content {
275    ///                 Content::Stdout(out) => todo!(),
276    ///                 Content::Stderr(out) => todo!(),
277    ///             }
278    ///         }
279    ///     }
280    /// }
281    /// # #[cfg(not(feature = "runtime-tokio"))]
282    /// # fn stream() {}
283    /// ```
284    pub async fn execute_stream<I: AsyncRead + Unpin>(
285        &mut self, request: Request<'_, I>,
286    ) -> ClientResult<ResponseStream<&mut S>> {
287        Self::handle_request(&mut self.stream, REQUEST_ID, request.params, request.stdin).await?;
288        Ok(ResponseStream::new(&mut self.stream, REQUEST_ID))
289    }
290}
291
292impl<S: AsyncRead + AsyncWrite + Unpin, M: Mode> Client<S, M> {
293    /// Internal method to execute a request and return a complete response.
294    ///
295    /// # Arguments
296    ///
297    /// * `request` - The request to execute
298    async fn inner_execute<I: AsyncRead + Unpin>(
299        &mut self, request: Request<'_, I>,
300    ) -> ClientResult<Response> {
301        Self::handle_request(&mut self.stream, REQUEST_ID, request.params, request.stdin).await?;
302        Self::handle_response(&mut self.stream, REQUEST_ID).await
303    }
304
305    /// Handles the complete request process.
306    ///
307    /// # Arguments
308    /// * `stream` - The stream to write to
309    /// * `id` - The request ID
310    /// * `params` - The request parameters
311    /// * `body` - The request body stream
312    async fn handle_request<I: AsyncRead + Unpin>(
313        stream: &mut S, id: u16, params: Params<'_>, mut body: I,
314    ) -> ClientResult<()> {
315        Self::handle_request_start(stream, id).await?;
316        Self::handle_request_params(stream, id, params).await?;
317        Self::handle_request_body(stream, id, &mut body).await?;
318        Self::handle_request_flush(stream).await?;
319        Ok(())
320    }
321
322    /// Handles the start of a request by sending the begin request record.
323    ///
324    /// # Arguments
325    ///
326    /// * `stream` - The stream to write to
327    /// * `id` - The request ID
328    async fn handle_request_start(stream: &mut S, id: u16) -> ClientResult<()> {
329        debug!(id, "Start handle request");
330
331        let begin_request_rec =
332            BeginRequestRec::new(id, Role::Responder, <M>::is_keep_alive()).await?;
333
334        debug!(id, ?begin_request_rec, "Send to stream.");
335
336        begin_request_rec.write_to_stream(stream).await?;
337
338        Ok(())
339    }
340
341    /// Handles sending request parameters to the stream.
342    ///
343    /// # Arguments
344    ///
345    /// * `stream` - The stream to write to
346    /// * `id` - The request ID
347    /// * `params` - The request parameters
348    async fn handle_request_params(
349        stream: &mut S, id: u16, params: Params<'_>,
350    ) -> ClientResult<()> {
351        let param_pairs = ParamPairs::new(params);
352        debug!(id, ?param_pairs, "Params will be sent.");
353
354        Header::write_to_stream_batches(
355            RequestType::Params,
356            id,
357            stream,
358            &mut &param_pairs.to_content().await?[..],
359            Some(|header| {
360                debug!(id, ?header, "Send to stream for Params.");
361                header
362            }),
363        )
364        .await?;
365
366        Header::write_to_stream_batches(
367            RequestType::Params,
368            id,
369            stream,
370            &mut io::empty(),
371            Some(|header| {
372                debug!(id, ?header, "Send to stream for Params.");
373                header
374            }),
375        )
376        .await?;
377
378        Ok(())
379    }
380
381    /// Handles sending the request body to the stream.
382    ///
383    /// # Arguments
384    ///
385    /// * `stream` - The stream to write to
386    /// * `id` - The request ID
387    /// * `body` - The request body stream
388    async fn handle_request_body<I: AsyncRead + Unpin>(
389        stream: &mut S, id: u16, body: &mut I,
390    ) -> ClientResult<()> {
391        Header::write_to_stream_batches(
392            RequestType::Stdin,
393            id,
394            stream,
395            body,
396            Some(|header| {
397                debug!(id, ?header, "Send to stream for Stdin.");
398                header
399            }),
400        )
401        .await?;
402
403        Header::write_to_stream_batches(
404            RequestType::Stdin,
405            id,
406            stream,
407            &mut io::empty(),
408            Some(|header| {
409                debug!(id, ?header, "Send to stream for Stdin.");
410                header
411            }),
412        )
413        .await?;
414
415        Ok(())
416    }
417
418    /// Flushes the stream to ensure all data is sent.
419    ///
420    /// # Arguments
421    ///
422    /// * `stream` - The stream to flush
423    async fn handle_request_flush(stream: &mut S) -> ClientResult<()> {
424        stream.flush().await?;
425
426        Ok(())
427    }
428
429    /// Handles reading and processing the response from the stream.
430    ///
431    /// # Arguments
432    ///
433    /// * `stream` - The stream to read from
434    /// * `id` - The request ID to match
435    async fn handle_response(stream: &mut S, id: u16) -> ClientResult<Response> {
436        let mut response = Response::default();
437
438        let mut stderr = Vec::new();
439        let mut stdout = Vec::new();
440
441        loop {
442            let header = Header::new_from_stream(stream).await?;
443            if header.request_id != id {
444                return Err(ClientError::ResponseNotFound { id });
445            }
446            debug!(id, ?header, "Receive from stream.");
447
448            match header.r#type {
449                RequestType::Stdout => {
450                    stdout.extend(header.read_content_from_stream(stream).await?);
451                }
452                RequestType::Stderr => {
453                    stderr.extend(header.read_content_from_stream(stream).await?);
454                }
455                RequestType::EndRequest => {
456                    let end_request_rec = EndRequestRec::from_header(&header, stream).await?;
457                    debug!(id, ?end_request_rec, "Receive from stream.");
458
459                    end_request_rec
460                        .end_request
461                        .protocol_status
462                        .convert_to_client_result(end_request_rec.end_request.app_status)?;
463
464                    response.stdout = if stdout.is_empty() {
465                        None
466                    } else {
467                        Some(stdout)
468                    };
469                    response.stderr = if stderr.is_empty() {
470                        None
471                    } else {
472                        Some(stderr)
473                    };
474
475                    return Ok(response);
476                }
477                r#type => {
478                    return Err(ClientError::UnknownRequestType {
479                        request_type: r#type,
480                    });
481                }
482            }
483        }
484    }
485}