Skip to main content

fastcgi_connect/
client.rs

1// Copyright 2022 jmjoy
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! FastCGI client implementation for async communication with FastCGI servers.
16//!
17//! This module provides the main `Client` struct that handles communication
18//! with FastCGI servers in both short connection and keep-alive modes.
19//! The client can execute requests and receive responses or response streams.
20
21use crate::{
22    conn::{KeepAlive, Mode, ShortConn},
23    meta::{BeginRequestRec, EndRequestRec, Header, ParamPairs, RequestType, Role},
24    params::Params,
25    request::Request,
26    response::ResponseStream,
27    ClientError, ClientResult, Response,
28};
29#[cfg(feature = "smol")]
30use smol::io::{AsyncRead, AsyncWrite, AsyncWriteExt};
31use std::marker::PhantomData;
32#[cfg(feature = "tokio")]
33use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt};
34use tracing::debug;
35
/// Request id attached to every request issued by this client.
///
/// The id is fixed to 1, mirroring the nginx FastCGI implementation, which
/// always uses request id 1.
///
/// <https://github.com/nginx/nginx/blob/f7ea8c76b55f730daa3b63f5511feb564b44d901/src/http/modules/ngx_http_fastcgi_module.c>
const REQUEST_ID: u16 = 1;
40
/// Asynchronous client used to communicate with a FastCGI server.
///
/// `S` is the type of the underlying stream; `M` is a marker type selecting
/// the connection mode (short connection or keep-alive).
pub struct Client<S, M> {
    /// Stream that requests are written to and responses are read from.
    stream: S,
    /// Zero-sized marker that carries the connection mode `M`.
    _mode: PhantomData<M>,
}
46
47impl<S: AsyncRead + AsyncWrite + Unpin> Client<S, ShortConn> {
48    /// Construct a `Client` Object with stream, such as `tokio::net::TcpStream`
49    /// or `tokio::net::UnixStream`, under short connection mode.
50    pub fn new(stream: S) -> Self {
51        Self {
52            stream,
53            _mode: PhantomData,
54        }
55    }
56
57    /// Send request and receive response from fastcgi server, under short
58    /// connection mode.
59    pub async fn execute_once<I: AsyncRead + Unpin>(
60        mut self, request: Request<'_, I>,
61    ) -> ClientResult<Response> {
62        self.inner_execute(request).await
63    }
64
65    /// Send request and receive response stream from fastcgi server, under
66    /// short connection mode.
67    ///
68    /// # Examples
69    ///
70    /// ```
71    /// use fastcgi_connect::{response::Content, Client, Params, Request, StreamExt};
72    /// #[cfg(feature = "smol")]
73    /// use smol::{io, net::TcpStream};
74    /// #[cfg(feature = "tokio")]
75    /// use tokio::{io, net::TcpStream};
76    ///
77    /// async fn stream() {
78    ///     let stream = TcpStream::connect(("127.0.0.1", 9000)).await.unwrap();
79    ///     let client = Client::new(stream);
80    ///     let mut stream = client
81    ///         .execute_once_stream(Request::new(Params::default(), &mut io::empty()))
82    ///         .await
83    ///         .unwrap();
84    ///
85    ///     while let Some(content) = stream.next().await {
86    ///         let content = content.unwrap();
87    ///
88    ///         match content {
89    ///             Content::Stdout(out) => todo!(),
90    ///             Content::Stderr(out) => todo!(),
91    ///         }
92    ///     }
93    /// }
94    /// ```
95    pub async fn execute_once_stream<I: AsyncRead + Unpin>(
96        mut self, request: Request<'_, I>,
97    ) -> ClientResult<ResponseStream<S>> {
98        Self::handle_request(&mut self.stream, REQUEST_ID, request.params, request.stdin).await?;
99        Ok(ResponseStream::new(self.stream, REQUEST_ID))
100    }
101}
102
103impl<S: AsyncRead + AsyncWrite + Unpin> Client<S, KeepAlive> {
104    /// Construct a `Client` Object with stream, such as `tokio::net::TcpStream`
105    /// or `tokio::net::UnixStream`, under keep alive connection mode.
106    pub fn new_keep_alive(stream: S) -> Self {
107        Self {
108            stream,
109            _mode: PhantomData,
110        }
111    }
112
113    /// Send request and receive response from fastcgi server, under keep alive
114    /// connection mode.
115    pub async fn execute<I: AsyncRead + Unpin>(
116        &mut self, request: Request<'_, I>,
117    ) -> ClientResult<Response> {
118        self.inner_execute(request).await
119    }
120
121    /// Send request and receive response stream from fastcgi server, under
122    /// keep alive connection mode.
123    ///
124    /// # Examples
125    ///
126    /// ```
127    /// use fastcgi_connect::{response::Content, Client, Params, Request, StreamExt};
128    /// #[cfg(feature = "smol")]
129    /// use smol::{io, net::TcpStream};
130    /// #[cfg(feature = "tokio")]
131    /// use tokio::{io, net::TcpStream};
132    ///
133    /// async fn stream() {
134    ///     let stream = TcpStream::connect(("127.0.0.1", 9000)).await.unwrap();
135    ///     let mut client = Client::new_keep_alive(stream);
136    ///
137    ///     for _ in (0..3) {
138    ///         let mut stream = client
139    ///             .execute_stream(Request::new(Params::default(), &mut io::empty()))
140    ///             .await
141    ///             .unwrap();
142    ///
143    ///         while let Some(content) = stream.next().await {
144    ///             let content = content.unwrap();
145    ///
146    ///             match content {
147    ///                 Content::Stdout(out) => todo!(),
148    ///                 Content::Stderr(out) => todo!(),
149    ///             }
150    ///         }
151    ///     }
152    /// }
153    /// ```
154    pub async fn execute_stream<I: AsyncRead + Unpin>(
155        &mut self, request: Request<'_, I>,
156    ) -> ClientResult<ResponseStream<&mut S>> {
157        Self::handle_request(&mut self.stream, REQUEST_ID, request.params, request.stdin).await?;
158        Ok(ResponseStream::new(&mut self.stream, REQUEST_ID))
159    }
160}
161
162impl<S: AsyncRead + AsyncWrite + Unpin, M: Mode> Client<S, M> {
163    /// Internal method to execute a request and return a complete response.
164    ///
165    /// # Arguments
166    ///
167    /// * `request` - The request to execute
168    async fn inner_execute<I: AsyncRead + Unpin>(
169        &mut self, request: Request<'_, I>,
170    ) -> ClientResult<Response> {
171        Self::handle_request(&mut self.stream, REQUEST_ID, request.params, request.stdin).await?;
172        Self::handle_response(&mut self.stream, REQUEST_ID).await
173    }
174
175    /// Handles the complete request process.
176    ///
177    /// # Arguments
178    ///
179    /// * `stream` - The stream to write to
180    /// * `id` - The request ID
181    /// * `params` - The request parameters
182    /// * `body` - The request body stream
183    async fn handle_request<'a, I: AsyncRead + Unpin>(
184        stream: &mut S, id: u16, params: Params<'a>, mut body: I,
185    ) -> ClientResult<()> {
186        Self::handle_request_start(stream, id).await?;
187        Self::handle_request_params(stream, id, params).await?;
188        Self::handle_request_body(stream, id, &mut body).await?;
189        Self::handle_request_flush(stream).await?;
190        Ok(())
191    }
192
193    /// Handles the start of a request by sending the begin request record.
194    ///
195    /// # Arguments
196    ///
197    /// * `stream` - The stream to write to
198    /// * `id` - The request ID
199    async fn handle_request_start(stream: &mut S, id: u16) -> ClientResult<()> {
200        debug!(id, "Start handle request");
201
202        let begin_request_rec =
203            BeginRequestRec::new(id, Role::Responder, <M>::is_keep_alive()).await?;
204
205        debug!(id, ?begin_request_rec, "Send to stream.");
206
207        begin_request_rec.write_to_stream(stream).await?;
208
209        Ok(())
210    }
211
212    /// Handles sending request parameters to the stream.
213    ///
214    /// # Arguments
215    ///
216    /// * `stream` - The stream to write to
217    /// * `id` - The request ID
218    /// * `params` - The request parameters
219    async fn handle_request_params<'a>(
220        stream: &mut S, id: u16, params: Params<'a>,
221    ) -> ClientResult<()> {
222        let param_pairs = ParamPairs::new(params);
223        debug!(id, ?param_pairs, "Params will be sent.");
224
225        Header::write_to_stream_batches(
226            RequestType::Params,
227            id,
228            stream,
229            &mut &param_pairs.to_content().await?[..],
230            Some(|header| {
231                debug!(id, ?header, "Send to stream for Params.");
232                header
233            }),
234        )
235        .await?;
236
237        #[cfg(feature = "smol")]
238        let mut empty = smol::io::empty();
239        #[cfg(feature = "tokio")]
240        let mut empty = tokio::io::empty();
241
242        Header::write_to_stream_batches(
243            RequestType::Params,
244            id,
245            stream,
246            &mut empty,
247            Some(|header| {
248                debug!(id, ?header, "Send to stream for Params.");
249                header
250            }),
251        )
252        .await?;
253
254        Ok(())
255    }
256
257    /// Handles sending the request body to the stream.
258    ///
259    /// # Arguments
260    ///
261    /// * `stream` - The stream to write to
262    /// * `id` - The request ID
263    /// * `body` - The request body stream
264    async fn handle_request_body<I: AsyncRead + Unpin>(
265        stream: &mut S, id: u16, body: &mut I,
266    ) -> ClientResult<()> {
267        Header::write_to_stream_batches(
268            RequestType::Stdin,
269            id,
270            stream,
271            body,
272            Some(|header| {
273                debug!(id, ?header, "Send to stream for Stdin.");
274                header
275            }),
276        )
277        .await?;
278
279        #[cfg(feature = "smol")]
280        let mut empty = smol::io::empty();
281        #[cfg(feature = "tokio")]
282        let mut empty = tokio::io::empty();
283
284        Header::write_to_stream_batches(
285            RequestType::Stdin,
286            id,
287            stream,
288            &mut empty,
289            Some(|header| {
290                debug!(id, ?header, "Send to stream for Stdin.");
291                header
292            }),
293        )
294        .await?;
295
296        Ok(())
297    }
298
299    /// Flushes the stream to ensure all data is sent.
300    ///
301    /// # Arguments
302    ///
303    /// * `stream` - The stream to flush
304    async fn handle_request_flush(stream: &mut S) -> ClientResult<()> {
305        stream.flush().await?;
306
307        Ok(())
308    }
309
310    /// Handles reading and processing the response from the stream.
311    ///
312    /// # Arguments
313    ///
314    /// * `stream` - The stream to read from
315    /// * `id` - The request ID to match
316    async fn handle_response(stream: &mut S, id: u16) -> ClientResult<Response> {
317        let mut response = Response::default();
318
319        let mut stderr = Vec::new();
320        let mut stdout = Vec::new();
321
322        loop {
323            let header = Header::new_from_stream(stream).await?;
324            if header.request_id != id {
325                return Err(ClientError::ResponseNotFound { id });
326            }
327            debug!(id, ?header, "Receive from stream.");
328
329            match header.r#type {
330                RequestType::Stdout => {
331                    stdout.extend(header.read_content_from_stream(stream).await?);
332                }
333                RequestType::Stderr => {
334                    stderr.extend(header.read_content_from_stream(stream).await?);
335                }
336                RequestType::EndRequest => {
337                    let end_request_rec = EndRequestRec::from_header(&header, stream).await?;
338                    debug!(id, ?end_request_rec, "Receive from stream.");
339
340                    end_request_rec
341                        .end_request
342                        .protocol_status
343                        .convert_to_client_result(end_request_rec.end_request.app_status)?;
344
345                    response.stdout = if stdout.is_empty() {
346                        None
347                    } else {
348                        Some(stdout)
349                    };
350                    response.stderr = if stderr.is_empty() {
351                        None
352                    } else {
353                        Some(stderr)
354                    };
355
356                    return Ok(response);
357                }
358                r#type => {
359                    return Err(ClientError::UnknownRequestType {
360                        request_type: r#type,
361                    })
362                }
363            }
364        }
365    }
366}