fastcgi_client/
client.rs

1// Copyright 2022 jmjoy
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! FastCGI client implementation for async communication with FastCGI servers.
16//!
17//! This module provides the main `Client` struct that handles communication
18//! with FastCGI servers in both short connection and keep-alive modes.
19//! The client can execute requests and receive responses or response streams.
20
21use crate::{
22    conn::{KeepAlive, Mode, ShortConn},
23    meta::{BeginRequestRec, EndRequestRec, Header, ParamPairs, RequestType, Role},
24    params::Params,
25    request::Request,
26    response::ResponseStream,
27    ClientError, ClientResult, Response,
28};
29use std::marker::PhantomData;
30use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt};
31use tracing::debug;
32
/// Request id placed on every record this client sends and expected on every
/// record it receives.
///
/// The value follows the nginx FastCGI implementation, which always uses a
/// request id of 1.
///
/// <https://github.com/nginx/nginx/blob/f7ea8c76b55f730daa3b63f5511feb564b44d901/src/http/modules/ngx_http_fastcgi_module.c>
const REQUEST_ID: u16 = 1;
37
/// Async client for communicating with a FastCGI server.
///
/// `S` is the underlying stream the client reads from and writes to, and `M`
/// is a marker type selecting the connection mode (short connection or keep
/// alive). The mode only exists at the type level, so it is stored as
/// `PhantomData`.
pub struct Client<S, M> {
    // Transport used for both sending requests and receiving responses.
    stream: S,
    // Type-level connection mode marker; carries no runtime data.
    _mode: PhantomData<M>,
}
43
44impl<S: AsyncRead + AsyncWrite + Unpin> Client<S, ShortConn> {
45    /// Construct a `Client` Object with stream, such as `tokio::net::TcpStream`
46    /// or `tokio::net::UnixStream`, under short connection mode.
47    pub fn new(stream: S) -> Self {
48        Self {
49            stream,
50            _mode: PhantomData,
51        }
52    }
53
54    /// Send request and receive response from fastcgi server, under short
55    /// connection mode.
56    pub async fn execute_once<I: AsyncRead + Unpin>(
57        mut self, request: Request<'_, I>,
58    ) -> ClientResult<Response> {
59        self.inner_execute(request).await
60    }
61
62    /// Send request and receive response stream from fastcgi server, under
63    /// short connection mode.
64    ///
65    /// # Examples
66    ///
67    /// ```
68    /// use fastcgi_client::{response::Content, Client, Params, Request, StreamExt};
69    /// use tokio::{io, net::TcpStream};
70    ///
71    /// async fn stream() {
72    ///     let stream = TcpStream::connect(("127.0.0.1", 9000)).await.unwrap();
73    ///     let client = Client::new(stream);
74    ///     let mut stream = client
75    ///         .execute_once_stream(Request::new(Params::default(), &mut io::empty()))
76    ///         .await
77    ///         .unwrap();
78    ///
79    ///     while let Some(content) = stream.next().await {
80    ///         let content = content.unwrap();
81    ///
82    ///         match content {
83    ///             Content::Stdout(out) => todo!(),
84    ///             Content::Stderr(out) => todo!(),
85    ///         }
86    ///     }
87    /// }
88    /// ```
89    pub async fn execute_once_stream<I: AsyncRead + Unpin>(
90        mut self, request: Request<'_, I>,
91    ) -> ClientResult<ResponseStream<S>> {
92        Self::handle_request(&mut self.stream, REQUEST_ID, request.params, request.stdin).await?;
93        Ok(ResponseStream::new(self.stream, REQUEST_ID))
94    }
95}
96
97impl<S: AsyncRead + AsyncWrite + Unpin> Client<S, KeepAlive> {
98    /// Construct a `Client` Object with stream, such as `tokio::net::TcpStream`
99    /// or `tokio::net::UnixStream`, under keep alive connection mode.
100    pub fn new_keep_alive(stream: S) -> Self {
101        Self {
102            stream,
103            _mode: PhantomData,
104        }
105    }
106
107    /// Send request and receive response from fastcgi server, under keep alive
108    /// connection mode.
109    pub async fn execute<I: AsyncRead + Unpin>(
110        &mut self, request: Request<'_, I>,
111    ) -> ClientResult<Response> {
112        self.inner_execute(request).await
113    }
114
115    /// Send request and receive response stream from fastcgi server, under
116    /// keep alive connection mode.
117    ///
118    /// # Examples
119    ///
120    /// ```
121    /// use fastcgi_client::{response::Content, Client, Params, Request, StreamExt};
122    /// use tokio::{io, net::TcpStream};
123    ///
124    /// async fn stream() {
125    ///     let stream = TcpStream::connect(("127.0.0.1", 9000)).await.unwrap();
126    ///     let mut client = Client::new_keep_alive(stream);
127    ///
128    ///     for _ in (0..3) {
129    ///         let mut stream = client
130    ///             .execute_stream(Request::new(Params::default(), &mut io::empty()))
131    ///             .await
132    ///             .unwrap();
133    ///
134    ///         while let Some(content) = stream.next().await {
135    ///             let content = content.unwrap();
136    ///
137    ///             match content {
138    ///                 Content::Stdout(out) => todo!(),
139    ///                 Content::Stderr(out) => todo!(),
140    ///             }
141    ///         }
142    ///     }
143    /// }
144    /// ```
145    pub async fn execute_stream<I: AsyncRead + Unpin>(
146        &mut self, request: Request<'_, I>,
147    ) -> ClientResult<ResponseStream<&mut S>> {
148        Self::handle_request(&mut self.stream, REQUEST_ID, request.params, request.stdin).await?;
149        Ok(ResponseStream::new(&mut self.stream, REQUEST_ID))
150    }
151}
152
153impl<S: AsyncRead + AsyncWrite + Unpin, M: Mode> Client<S, M> {
154    /// Internal method to execute a request and return a complete response.
155    ///
156    /// # Arguments
157    ///
158    /// * `request` - The request to execute
159    async fn inner_execute<I: AsyncRead + Unpin>(
160        &mut self, request: Request<'_, I>,
161    ) -> ClientResult<Response> {
162        Self::handle_request(&mut self.stream, REQUEST_ID, request.params, request.stdin).await?;
163        Self::handle_response(&mut self.stream, REQUEST_ID).await
164    }
165
166    /// Handles the complete request process.
167    ///
168    /// # Arguments
169    ///
170    /// * `stream` - The stream to write to
171    /// * `id` - The request ID
172    /// * `params` - The request parameters
173    /// * `body` - The request body stream
174    async fn handle_request<'a, I: AsyncRead + Unpin>(
175        stream: &mut S, id: u16, params: Params<'a>, mut body: I,
176    ) -> ClientResult<()> {
177        Self::handle_request_start(stream, id).await?;
178        Self::handle_request_params(stream, id, params).await?;
179        Self::handle_request_body(stream, id, &mut body).await?;
180        Self::handle_request_flush(stream).await?;
181        Ok(())
182    }
183
184    /// Handles the start of a request by sending the begin request record.
185    ///
186    /// # Arguments
187    ///
188    /// * `stream` - The stream to write to
189    /// * `id` - The request ID
190    async fn handle_request_start(stream: &mut S, id: u16) -> ClientResult<()> {
191        debug!(id, "Start handle request");
192
193        let begin_request_rec =
194            BeginRequestRec::new(id, Role::Responder, <M>::is_keep_alive()).await?;
195
196        debug!(id, ?begin_request_rec, "Send to stream.");
197
198        begin_request_rec.write_to_stream(stream).await?;
199
200        Ok(())
201    }
202
203    /// Handles sending request parameters to the stream.
204    ///
205    /// # Arguments
206    ///
207    /// * `stream` - The stream to write to
208    /// * `id` - The request ID
209    /// * `params` - The request parameters
210    async fn handle_request_params<'a>(
211        stream: &mut S, id: u16, params: Params<'a>,
212    ) -> ClientResult<()> {
213        let param_pairs = ParamPairs::new(params);
214        debug!(id, ?param_pairs, "Params will be sent.");
215
216        Header::write_to_stream_batches(
217            RequestType::Params,
218            id,
219            stream,
220            &mut &param_pairs.to_content().await?[..],
221            Some(|header| {
222                debug!(id, ?header, "Send to stream for Params.");
223                header
224            }),
225        )
226        .await?;
227
228        Header::write_to_stream_batches(
229            RequestType::Params,
230            id,
231            stream,
232            &mut tokio::io::empty(),
233            Some(|header| {
234                debug!(id, ?header, "Send to stream for Params.");
235                header
236            }),
237        )
238        .await?;
239
240        Ok(())
241    }
242
243    /// Handles sending the request body to the stream.
244    ///
245    /// # Arguments
246    ///
247    /// * `stream` - The stream to write to
248    /// * `id` - The request ID
249    /// * `body` - The request body stream
250    async fn handle_request_body<I: AsyncRead + Unpin>(
251        stream: &mut S, id: u16, body: &mut I,
252    ) -> ClientResult<()> {
253        Header::write_to_stream_batches(
254            RequestType::Stdin,
255            id,
256            stream,
257            body,
258            Some(|header| {
259                debug!(id, ?header, "Send to stream for Stdin.");
260                header
261            }),
262        )
263        .await?;
264
265        Header::write_to_stream_batches(
266            RequestType::Stdin,
267            id,
268            stream,
269            &mut tokio::io::empty(),
270            Some(|header| {
271                debug!(id, ?header, "Send to stream for Stdin.");
272                header
273            }),
274        )
275        .await?;
276
277        Ok(())
278    }
279
280    /// Flushes the stream to ensure all data is sent.
281    ///
282    /// # Arguments
283    ///
284    /// * `stream` - The stream to flush
285    async fn handle_request_flush(stream: &mut S) -> ClientResult<()> {
286        stream.flush().await?;
287
288        Ok(())
289    }
290
291    /// Handles reading and processing the response from the stream.
292    ///
293    /// # Arguments
294    ///
295    /// * `stream` - The stream to read from
296    /// * `id` - The request ID to match
297    async fn handle_response(stream: &mut S, id: u16) -> ClientResult<Response> {
298        let mut response = Response::default();
299
300        let mut stderr = Vec::new();
301        let mut stdout = Vec::new();
302
303        loop {
304            let header = Header::new_from_stream(stream).await?;
305            if header.request_id != id {
306                return Err(ClientError::ResponseNotFound { id });
307            }
308            debug!(id, ?header, "Receive from stream.");
309
310            match header.r#type {
311                RequestType::Stdout => {
312                    stdout.extend(header.read_content_from_stream(stream).await?);
313                }
314                RequestType::Stderr => {
315                    stderr.extend(header.read_content_from_stream(stream).await?);
316                }
317                RequestType::EndRequest => {
318                    let end_request_rec = EndRequestRec::from_header(&header, stream).await?;
319                    debug!(id, ?end_request_rec, "Receive from stream.");
320
321                    end_request_rec
322                        .end_request
323                        .protocol_status
324                        .convert_to_client_result(end_request_rec.end_request.app_status)?;
325
326                    response.stdout = if stdout.is_empty() {
327                        None
328                    } else {
329                        Some(stdout)
330                    };
331                    response.stderr = if stderr.is_empty() {
332                        None
333                    } else {
334                        Some(stderr)
335                    };
336
337                    return Ok(response);
338                }
339                r#type => {
340                    return Err(ClientError::UnknownRequestType {
341                        request_type: r#type,
342                    })
343                }
344            }
345        }
346    }
347}