fastcgi_client/client.rs
1// Copyright 2022 jmjoy
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! FastCGI client implementation for async communication with FastCGI servers.
16//!
17//! This module provides the main `Client` struct that handles communication
18//! with FastCGI servers in both short connection and keep-alive modes.
19//! The client can execute requests and receive responses or response streams.
20
21use crate::{
22 conn::{KeepAlive, Mode, ShortConn},
23 meta::{BeginRequestRec, EndRequestRec, Header, ParamPairs, RequestType, Role},
24 params::Params,
25 request::Request,
26 response::ResponseStream,
27 ClientError, ClientResult, Response,
28};
29use std::marker::PhantomData;
30use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt};
31use tracing::debug;
32
/// Request id attached to every record this client sends.
///
/// The value follows the nginx FastCGI implementation, which always uses
/// request id 1.
///
/// <https://github.com/nginx/nginx/blob/f7ea8c76b55f730daa3b63f5511feb564b44d901/src/http/modules/ngx_http_fastcgi_module.c>
const REQUEST_ID: u16 = 1;
37
/// Async client for communicating with a FastCGI server.
///
/// `S` is the type of the underlying stream and `M` is a marker type that
/// selects the connection mode at compile time.
pub struct Client<S, M> {
    /// Stream over which FastCGI records are written and read.
    stream: S,
    /// Zero-sized marker recording the connection mode `M`.
    _mode: PhantomData<M>,
}
43
44impl<S: AsyncRead + AsyncWrite + Unpin> Client<S, ShortConn> {
45 /// Construct a `Client` Object with stream, such as `tokio::net::TcpStream`
46 /// or `tokio::net::UnixStream`, under short connection mode.
47 pub fn new(stream: S) -> Self {
48 Self {
49 stream,
50 _mode: PhantomData,
51 }
52 }
53
54 /// Send request and receive response from fastcgi server, under short
55 /// connection mode.
56 pub async fn execute_once<I: AsyncRead + Unpin>(
57 mut self, request: Request<'_, I>,
58 ) -> ClientResult<Response> {
59 self.inner_execute(request).await
60 }
61
62 /// Send request and receive response stream from fastcgi server, under
63 /// short connection mode.
64 ///
65 /// # Examples
66 ///
67 /// ```
68 /// use fastcgi_client::{response::Content, Client, Params, Request, StreamExt};
69 /// use tokio::{io, net::TcpStream};
70 ///
71 /// async fn stream() {
72 /// let stream = TcpStream::connect(("127.0.0.1", 9000)).await.unwrap();
73 /// let client = Client::new(stream);
74 /// let mut stream = client
75 /// .execute_once_stream(Request::new(Params::default(), &mut io::empty()))
76 /// .await
77 /// .unwrap();
78 ///
79 /// while let Some(content) = stream.next().await {
80 /// let content = content.unwrap();
81 ///
82 /// match content {
83 /// Content::Stdout(out) => todo!(),
84 /// Content::Stderr(out) => todo!(),
85 /// }
86 /// }
87 /// }
88 /// ```
89 pub async fn execute_once_stream<I: AsyncRead + Unpin>(
90 mut self, request: Request<'_, I>,
91 ) -> ClientResult<ResponseStream<S>> {
92 Self::handle_request(&mut self.stream, REQUEST_ID, request.params, request.stdin).await?;
93 Ok(ResponseStream::new(self.stream, REQUEST_ID))
94 }
95}
96
97impl<S: AsyncRead + AsyncWrite + Unpin> Client<S, KeepAlive> {
98 /// Construct a `Client` Object with stream, such as `tokio::net::TcpStream`
99 /// or `tokio::net::UnixStream`, under keep alive connection mode.
100 pub fn new_keep_alive(stream: S) -> Self {
101 Self {
102 stream,
103 _mode: PhantomData,
104 }
105 }
106
107 /// Send request and receive response from fastcgi server, under keep alive
108 /// connection mode.
109 pub async fn execute<I: AsyncRead + Unpin>(
110 &mut self, request: Request<'_, I>,
111 ) -> ClientResult<Response> {
112 self.inner_execute(request).await
113 }
114
115 /// Send request and receive response stream from fastcgi server, under
116 /// keep alive connection mode.
117 ///
118 /// # Examples
119 ///
120 /// ```
121 /// use fastcgi_client::{response::Content, Client, Params, Request, StreamExt};
122 /// use tokio::{io, net::TcpStream};
123 ///
124 /// async fn stream() {
125 /// let stream = TcpStream::connect(("127.0.0.1", 9000)).await.unwrap();
126 /// let mut client = Client::new_keep_alive(stream);
127 ///
128 /// for _ in (0..3) {
129 /// let mut stream = client
130 /// .execute_stream(Request::new(Params::default(), &mut io::empty()))
131 /// .await
132 /// .unwrap();
133 ///
134 /// while let Some(content) = stream.next().await {
135 /// let content = content.unwrap();
136 ///
137 /// match content {
138 /// Content::Stdout(out) => todo!(),
139 /// Content::Stderr(out) => todo!(),
140 /// }
141 /// }
142 /// }
143 /// }
144 /// ```
145 pub async fn execute_stream<I: AsyncRead + Unpin>(
146 &mut self, request: Request<'_, I>,
147 ) -> ClientResult<ResponseStream<&mut S>> {
148 Self::handle_request(&mut self.stream, REQUEST_ID, request.params, request.stdin).await?;
149 Ok(ResponseStream::new(&mut self.stream, REQUEST_ID))
150 }
151}
152
153impl<S: AsyncRead + AsyncWrite + Unpin, M: Mode> Client<S, M> {
154 /// Internal method to execute a request and return a complete response.
155 ///
156 /// # Arguments
157 ///
158 /// * `request` - The request to execute
159 async fn inner_execute<I: AsyncRead + Unpin>(
160 &mut self, request: Request<'_, I>,
161 ) -> ClientResult<Response> {
162 Self::handle_request(&mut self.stream, REQUEST_ID, request.params, request.stdin).await?;
163 Self::handle_response(&mut self.stream, REQUEST_ID).await
164 }
165
166 /// Handles the complete request process.
167 ///
168 /// # Arguments
169 ///
170 /// * `stream` - The stream to write to
171 /// * `id` - The request ID
172 /// * `params` - The request parameters
173 /// * `body` - The request body stream
174 async fn handle_request<'a, I: AsyncRead + Unpin>(
175 stream: &mut S, id: u16, params: Params<'a>, mut body: I,
176 ) -> ClientResult<()> {
177 Self::handle_request_start(stream, id).await?;
178 Self::handle_request_params(stream, id, params).await?;
179 Self::handle_request_body(stream, id, &mut body).await?;
180 Self::handle_request_flush(stream).await?;
181 Ok(())
182 }
183
184 /// Handles the start of a request by sending the begin request record.
185 ///
186 /// # Arguments
187 ///
188 /// * `stream` - The stream to write to
189 /// * `id` - The request ID
190 async fn handle_request_start(stream: &mut S, id: u16) -> ClientResult<()> {
191 debug!(id, "Start handle request");
192
193 let begin_request_rec =
194 BeginRequestRec::new(id, Role::Responder, <M>::is_keep_alive()).await?;
195
196 debug!(id, ?begin_request_rec, "Send to stream.");
197
198 begin_request_rec.write_to_stream(stream).await?;
199
200 Ok(())
201 }
202
203 /// Handles sending request parameters to the stream.
204 ///
205 /// # Arguments
206 ///
207 /// * `stream` - The stream to write to
208 /// * `id` - The request ID
209 /// * `params` - The request parameters
210 async fn handle_request_params<'a>(
211 stream: &mut S, id: u16, params: Params<'a>,
212 ) -> ClientResult<()> {
213 let param_pairs = ParamPairs::new(params);
214 debug!(id, ?param_pairs, "Params will be sent.");
215
216 Header::write_to_stream_batches(
217 RequestType::Params,
218 id,
219 stream,
220 &mut ¶m_pairs.to_content().await?[..],
221 Some(|header| {
222 debug!(id, ?header, "Send to stream for Params.");
223 header
224 }),
225 )
226 .await?;
227
228 Header::write_to_stream_batches(
229 RequestType::Params,
230 id,
231 stream,
232 &mut tokio::io::empty(),
233 Some(|header| {
234 debug!(id, ?header, "Send to stream for Params.");
235 header
236 }),
237 )
238 .await?;
239
240 Ok(())
241 }
242
243 /// Handles sending the request body to the stream.
244 ///
245 /// # Arguments
246 ///
247 /// * `stream` - The stream to write to
248 /// * `id` - The request ID
249 /// * `body` - The request body stream
250 async fn handle_request_body<I: AsyncRead + Unpin>(
251 stream: &mut S, id: u16, body: &mut I,
252 ) -> ClientResult<()> {
253 Header::write_to_stream_batches(
254 RequestType::Stdin,
255 id,
256 stream,
257 body,
258 Some(|header| {
259 debug!(id, ?header, "Send to stream for Stdin.");
260 header
261 }),
262 )
263 .await?;
264
265 Header::write_to_stream_batches(
266 RequestType::Stdin,
267 id,
268 stream,
269 &mut tokio::io::empty(),
270 Some(|header| {
271 debug!(id, ?header, "Send to stream for Stdin.");
272 header
273 }),
274 )
275 .await?;
276
277 Ok(())
278 }
279
280 /// Flushes the stream to ensure all data is sent.
281 ///
282 /// # Arguments
283 ///
284 /// * `stream` - The stream to flush
285 async fn handle_request_flush(stream: &mut S) -> ClientResult<()> {
286 stream.flush().await?;
287
288 Ok(())
289 }
290
291 /// Handles reading and processing the response from the stream.
292 ///
293 /// # Arguments
294 ///
295 /// * `stream` - The stream to read from
296 /// * `id` - The request ID to match
297 async fn handle_response(stream: &mut S, id: u16) -> ClientResult<Response> {
298 let mut response = Response::default();
299
300 let mut stderr = Vec::new();
301 let mut stdout = Vec::new();
302
303 loop {
304 let header = Header::new_from_stream(stream).await?;
305 if header.request_id != id {
306 return Err(ClientError::ResponseNotFound { id });
307 }
308 debug!(id, ?header, "Receive from stream.");
309
310 match header.r#type {
311 RequestType::Stdout => {
312 stdout.extend(header.read_content_from_stream(stream).await?);
313 }
314 RequestType::Stderr => {
315 stderr.extend(header.read_content_from_stream(stream).await?);
316 }
317 RequestType::EndRequest => {
318 let end_request_rec = EndRequestRec::from_header(&header, stream).await?;
319 debug!(id, ?end_request_rec, "Receive from stream.");
320
321 end_request_rec
322 .end_request
323 .protocol_status
324 .convert_to_client_result(end_request_rec.end_request.app_status)?;
325
326 response.stdout = if stdout.is_empty() {
327 None
328 } else {
329 Some(stdout)
330 };
331 response.stderr = if stderr.is_empty() {
332 None
333 } else {
334 Some(stderr)
335 };
336
337 return Ok(response);
338 }
339 r#type => {
340 return Err(ClientError::UnknownRequestType {
341 request_type: r#type,
342 })
343 }
344 }
345 }
346 }
347}