fcgi_client/
client.rs

1// Copyright 2022 jmjoy
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! FastCGI client implementation for async communication with FastCGI servers.
16//!
17//! This module provides the main `Client` struct that handles communication
18//! with FastCGI servers in both short connection and keep-alive modes.
19//! The client can execute requests and receive responses or response streams.
20
21use crate::{
22    ClientError, ClientResult, Response,
23    conn::{KeepAlive, Mode, ShortConn},
24    meta::{BeginRequestRec, EndRequestRec, Header, ParamPairs, RequestType, Role},
25    params::Params,
26    request::Request,
27    response::ResponseStream,
28};
29use bytes::BytesMut;
30use std::{
31    marker::PhantomData,
32    ops::{Deref, DerefMut},
33};
34use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt};
35use tracing::debug;
36
/// Request id used for every request sent by this client.
///
/// The nginx FastCGI implementation always uses a request id of 1, and this
/// client follows the same convention.
///
/// <https://github.com/nginx/nginx/blob/f7ea8c76b55f730daa3b63f5511feb564b44d901/src/http/modules/ngx_http_fastcgi_module.c>
const REQUEST_ID: u16 = 1;
41
/// Async client for communicating with a FastCGI server.
///
/// `S` is the type of the underlying stream and `M` is a marker type that
/// selects the connection mode (`ShortConn` or `KeepAlive`).
pub struct Client<S, M> {
    /// Stream over which requests are written and responses are read.
    stream: S,
    /// Zero-sized marker carrying the connection mode `M`.
    _mode: PhantomData<M>,
}
47
48impl<S: AsyncRead + AsyncWrite + Unpin> Client<S, ShortConn> {
49    /// Construct a `Client` Object with stream, such as `tokio::net::TcpStream`
50    /// or `tokio::net::UnixStream`, under short connection mode.
51    pub fn new(stream: S) -> Self {
52        Self {
53            stream,
54            _mode: PhantomData,
55        }
56    }
57
58    /// Send request and receive response from fastcgi server, under short
59    /// connection mode.
60    pub async fn execute_once<I: AsyncRead + Unpin>(
61        mut self,
62        request: Request<'_, I>,
63    ) -> ClientResult<Response> {
64        self.inner_execute(request).await
65    }
66
67    /// Send request and receive response stream from fastcgi server, under
68    /// short connection mode.
69    ///
70    /// # Examples
71    ///
72    /// ```
73    /// use fastcgi_client::{response::Content, Client, Params, Request, StreamExt};
74    /// use tokio::{io, net::TcpStream};
75    ///
76    /// async fn stream() {
77    ///     let stream = TcpStream::connect(("127.0.0.1", 9000)).await.unwrap();
78    ///     let client = Client::new(stream);
79    ///     let mut stream = client
80    ///         .execute_once_stream(Request::new(Params::default(), &mut io::empty()))
81    ///         .await
82    ///         .unwrap();
83    ///
84    ///     while let Some(content) = stream.next().await {
85    ///         let content = content.unwrap();
86    ///
87    ///         match content {
88    ///             Content::Stdout(out) => todo!(),
89    ///             Content::Stderr(out) => todo!(),
90    ///         }
91    ///     }
92    /// }
93    /// ```
94    pub async fn execute_once_stream<I: AsyncRead + Unpin>(
95        mut self,
96        request: Request<'_, I>,
97    ) -> ClientResult<ResponseStream<S>> {
98        Self::handle_request(&mut self.stream, REQUEST_ID, request.params, request.stdin).await?;
99        Ok(ResponseStream::new(self.stream, REQUEST_ID))
100    }
101}
102
103impl<S: AsyncRead + AsyncWrite + Unpin> Client<S, KeepAlive> {
104    /// Construct a `Client` Object with stream, such as `tokio::net::TcpStream`
105    /// or `tokio::net::UnixStream`, under keep alive connection mode.
106    pub fn new_keep_alive(stream: S) -> Self {
107        Self {
108            stream,
109            _mode: PhantomData,
110        }
111    }
112
113    /// Send request and receive response from fastcgi server, under keep alive
114    /// connection mode.
115    pub async fn execute<I: AsyncRead + Unpin>(
116        &mut self,
117        request: Request<'_, I>,
118    ) -> ClientResult<Response> {
119        self.inner_execute(request).await
120    }
121
122    /// Send request and receive response stream from fastcgi server, under
123    /// keep alive connection mode.
124    ///
125    /// # Examples
126    ///
127    /// ```
128    /// use fastcgi_client::{response::Content, Client, Params, Request, StreamExt};
129    /// use tokio::{io, net::TcpStream};
130    ///
131    /// async fn stream() {
132    ///     let stream = TcpStream::connect(("127.0.0.1", 9000)).await.unwrap();
133    ///     let mut client = Client::new_keep_alive(stream);
134    ///
135    ///     for _ in (0..3) {
136    ///         let mut stream = client
137    ///             .execute_stream(Request::new(Params::default(), &mut io::empty()))
138    ///             .await
139    ///             .unwrap();
140    ///
141    ///         while let Some(content) = stream.next().await {
142    ///             let content = content.unwrap();
143    ///
144    ///             match content {
145    ///                 Content::Stdout(out) => todo!(),
146    ///                 Content::Stderr(out) => todo!(),
147    ///             }
148    ///         }
149    ///     }
150    /// }
151    /// ```
152    pub async fn execute_stream<I: AsyncRead + Unpin>(
153        &mut self,
154        request: Request<'_, I>,
155    ) -> ClientResult<ResponseStream<&mut S>> {
156        Self::handle_request(&mut self.stream, REQUEST_ID, request.params, request.stdin).await?;
157        Ok(ResponseStream::new(&mut self.stream, REQUEST_ID))
158    }
159}
160
161impl<S: AsyncRead + AsyncWrite + Unpin, M: Mode> Client<S, M> {
162    /// Internal method to execute a request and return a complete response.
163    ///
164    /// # Arguments
165    ///
166    /// * `request` - The request to execute
167    async fn inner_execute<I: AsyncRead + Unpin>(
168        &mut self,
169        request: Request<'_, I>,
170    ) -> ClientResult<Response> {
171        Self::handle_request(&mut self.stream, REQUEST_ID, request.params, request.stdin).await?;
172        Self::handle_response(&mut self.stream, REQUEST_ID).await
173    }
174
175    /// Handles the complete request process.
176    ///
177    /// # Arguments
178    ///
179    /// * `stream` - The stream to write to
180    /// * `id` - The request ID
181    /// * `params` - The request parameters
182    /// * `body` - The request body stream
183    async fn handle_request<'a, I: AsyncRead + Unpin>(
184        stream: &mut S,
185        id: u16,
186        params: Params<'a>,
187        mut body: I,
188    ) -> ClientResult<()> {
189        Self::handle_request_start(stream, id).await?;
190
191        Self::handle_request_params(stream, id, params).await?;
192        Self::handle_request_body(stream, id, &mut body).await?;
193        Self::handle_request_flush(stream).await?;
194        Ok(())
195    }
196
197    /// Handles the start of a request by sending the begin request record.
198    ///
199    /// # Arguments
200    ///
201    /// * `stream` - The stream to write to
202    /// * `id` - The request ID
203    async fn handle_request_start(stream: &mut S, id: u16) -> ClientResult<()> {
204        debug!(id, "Start handle request");
205
206        let begin_request_rec = BeginRequestRec::new(id, Role::Responder, <M>::is_keep_alive());
207
208        //debug!(id, ?begin_request_rec, "Send to stream.");
209
210        begin_request_rec.write_to_stream(stream).await?;
211
212        Ok(())
213    }
214
215    /// Handles sending request parameters to the stream.
216    ///
217    /// # Arguments
218    ///
219    /// * `stream` - The stream to write to
220    /// * `id` - The request ID
221    /// * `params` - The request parameters
222    async fn handle_request_params<'a>(
223        stream: &mut S,
224        id: u16,
225        params: Params<'a>,
226    ) -> ClientResult<()> {
227        let param_pairs = ParamPairs::new(params);
228        debug!(id, "Params will be sent {param_pairs:#?}.");
229
230        Header::write_to_stream_batches(
231            RequestType::Params,
232            id,
233            stream,
234            &mut param_pairs.to_content().as_ref(),
235            Some(|header| {
236                debug!(id, ?header, "Send to stream for Params.");
237                header
238            }),
239        )
240        .await?;
241
242        Header::write_to_stream_batches(
243            RequestType::Params,
244            id,
245            stream,
246            &mut tokio::io::empty(),
247            Some(|header| {
248                debug!(id, ?header, "Send to stream for Params.");
249                header
250            }),
251        )
252        .await?;
253
254        Ok(())
255    }
256
257    /// Handles sending the request body to the stream.
258    ///
259    /// # Arguments
260    ///
261    /// * `stream` - The stream to write to
262    /// * `id` - The request ID
263    /// * `body` - The request body stream
264    async fn handle_request_body<I: AsyncRead + Unpin>(
265        stream: &mut S,
266        id: u16,
267        body: &mut I,
268    ) -> ClientResult<()> {
269        Header::write_to_stream_batches(
270            RequestType::Stdin,
271            id,
272            stream,
273            body,
274            Some(|header| {
275                debug!(id, ?header, "Send to stream for Stdin.");
276                header
277            }),
278        )
279        .await?;
280
281        Header::write_to_stream_batches(
282            RequestType::Stdin,
283            id,
284            stream,
285            &mut tokio::io::empty(),
286            Some(|header| {
287                debug!(id, ?header, "Send to stream for Stdin.");
288                header
289            }),
290        )
291        .await?;
292
293        Ok(())
294    }
295
296    /// Flushes the stream to ensure all data is sent.
297    ///
298    /// # Arguments
299    ///
300    /// * `stream` - The stream to flush
301    async fn handle_request_flush(stream: &mut S) -> ClientResult<()> {
302        stream.flush().await?;
303
304        Ok(())
305    }
306
307    /// Handles reading and processing the response from the stream.
308    ///
309    /// # Arguments
310    ///
311    /// * `stream` - The stream to read from
312    /// * `id` - The request ID to match
313    async fn handle_response(stream: &mut S, id: u16) -> ClientResult<Response> {
314        let mut response = Response::default();
315
316        let mut stderr = BytesMut::new();
317        let mut stdout = BytesMut::new();
318
319        loop {
320            let header = Header::new_from_stream(stream).await?;
321            if header.request_id != id {
322                return Err(ClientError::ResponseNotFound { id });
323            }
324            debug!(id, ?header, "Receive from stream.");
325
326            match header.r#type {
327                RequestType::Stdout => {
328                    stdout.extend_from_slice(&header.read_content_from_stream(stream).await?);
329                }
330                RequestType::Stderr => {
331                    stderr.extend_from_slice(&header.read_content_from_stream(stream).await?);
332                }
333                RequestType::EndRequest => {
334                    let end_request_rec = EndRequestRec::from_header(&header, stream).await?;
335                    debug!(id, ?end_request_rec, "Receive from stream.");
336
337                    end_request_rec
338                        .end_request
339                        .protocol_status
340                        .convert_to_client_result(end_request_rec.end_request.app_status)?;
341
342                    response.stdout = if stdout.is_empty() {
343                        None
344                    } else {
345                        Some(stdout.freeze())
346                    };
347                    response.stderr = if stderr.is_empty() {
348                        None
349                    } else {
350                        Some(stderr.freeze())
351                    };
352
353                    return Ok(response);
354                }
355                r#type => {
356                    return Err(ClientError::UnknownRequestType {
357                        request_type: r#type,
358                    });
359                }
360            }
361        }
362    }
363}
364
365/// Добавляю реализацию deref в stream потому что Client ведет себя по большей части просто как stream.
366impl<S, M> Deref for Client<S, M> {
367    type Target = S;
368
369    fn deref(&self) -> &Self::Target {
370        &self.stream
371    }
372}
373
374impl<S, M> DerefMut for Client<S, M> {
375    fn deref_mut(&mut self) -> &mut Self::Target {
376        &mut self.stream
377    }
378}