fastcgi_connect/client.rs
1// Copyright 2022 jmjoy
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! FastCGI client implementation for async communication with FastCGI servers.
16//!
17//! This module provides the main `Client` struct that handles communication
18//! with FastCGI servers in both short connection and keep-alive modes.
19//! The client can execute requests and receive responses or response streams.
20
21use crate::{
22 conn::{KeepAlive, Mode, ShortConn},
23 meta::{BeginRequestRec, EndRequestRec, Header, ParamPairs, RequestType, Role},
24 params::Params,
25 request::Request,
26 response::ResponseStream,
27 ClientError, ClientResult, Response,
28};
29#[cfg(feature = "smol")]
30use smol::io::{AsyncRead, AsyncWrite, AsyncWriteExt};
31use std::marker::PhantomData;
32#[cfg(feature = "tokio")]
33use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt};
34use tracing::debug;
35
/// Request id passed to every request/response helper in this module.
///
/// The value follows the nginx FastCGI implementation, where the request id
/// is always 1.
///
/// <https://github.com/nginx/nginx/blob/f7ea8c76b55f730daa3b63f5511feb564b44d901/src/http/modules/ngx_http_fastcgi_module.c>
const REQUEST_ID: u16 = 1;
40
/// Async client for communicating with a FastCGI server.
///
/// `S` is the type of the underlying stream. `M` is a marker type parameter:
/// the constructors in this file build `Client<S, ShortConn>` and
/// `Client<S, KeepAlive>`.
pub struct Client<S, M> {
    /// Stream that requests are written to and responses are read from.
    stream: S,
    /// Zero-sized marker that carries the type parameter `M`; no value of
    /// type `M` is stored.
    _mode: PhantomData<M>,
}
46
47impl<S: AsyncRead + AsyncWrite + Unpin> Client<S, ShortConn> {
48 /// Construct a `Client` Object with stream, such as `tokio::net::TcpStream`
49 /// or `tokio::net::UnixStream`, under short connection mode.
50 pub fn new(stream: S) -> Self {
51 Self {
52 stream,
53 _mode: PhantomData,
54 }
55 }
56
57 /// Send request and receive response from fastcgi server, under short
58 /// connection mode.
59 pub async fn execute_once<I: AsyncRead + Unpin>(
60 mut self, request: Request<'_, I>,
61 ) -> ClientResult<Response> {
62 self.inner_execute(request).await
63 }
64
65 /// Send request and receive response stream from fastcgi server, under
66 /// short connection mode.
67 ///
68 /// # Examples
69 ///
70 /// ```
71 /// use fastcgi_connect::{response::Content, Client, Params, Request, StreamExt};
72 /// #[cfg(feature = "smol")]
73 /// use smol::{io, net::TcpStream};
74 /// #[cfg(feature = "tokio")]
75 /// use tokio::{io, net::TcpStream};
76 ///
77 /// async fn stream() {
78 /// let stream = TcpStream::connect(("127.0.0.1", 9000)).await.unwrap();
79 /// let client = Client::new(stream);
80 /// let mut stream = client
81 /// .execute_once_stream(Request::new(Params::default(), &mut io::empty()))
82 /// .await
83 /// .unwrap();
84 ///
85 /// while let Some(content) = stream.next().await {
86 /// let content = content.unwrap();
87 ///
88 /// match content {
89 /// Content::Stdout(out) => todo!(),
90 /// Content::Stderr(out) => todo!(),
91 /// }
92 /// }
93 /// }
94 /// ```
95 pub async fn execute_once_stream<I: AsyncRead + Unpin>(
96 mut self, request: Request<'_, I>,
97 ) -> ClientResult<ResponseStream<S>> {
98 Self::handle_request(&mut self.stream, REQUEST_ID, request.params, request.stdin).await?;
99 Ok(ResponseStream::new(self.stream, REQUEST_ID))
100 }
101}
102
103impl<S: AsyncRead + AsyncWrite + Unpin> Client<S, KeepAlive> {
104 /// Construct a `Client` Object with stream, such as `tokio::net::TcpStream`
105 /// or `tokio::net::UnixStream`, under keep alive connection mode.
106 pub fn new_keep_alive(stream: S) -> Self {
107 Self {
108 stream,
109 _mode: PhantomData,
110 }
111 }
112
113 /// Send request and receive response from fastcgi server, under keep alive
114 /// connection mode.
115 pub async fn execute<I: AsyncRead + Unpin>(
116 &mut self, request: Request<'_, I>,
117 ) -> ClientResult<Response> {
118 self.inner_execute(request).await
119 }
120
121 /// Send request and receive response stream from fastcgi server, under
122 /// keep alive connection mode.
123 ///
124 /// # Examples
125 ///
126 /// ```
127 /// use fastcgi_connect::{response::Content, Client, Params, Request, StreamExt};
128 /// #[cfg(feature = "smol")]
129 /// use smol::{io, net::TcpStream};
130 /// #[cfg(feature = "tokio")]
131 /// use tokio::{io, net::TcpStream};
132 ///
133 /// async fn stream() {
134 /// let stream = TcpStream::connect(("127.0.0.1", 9000)).await.unwrap();
135 /// let mut client = Client::new_keep_alive(stream);
136 ///
137 /// for _ in (0..3) {
138 /// let mut stream = client
139 /// .execute_stream(Request::new(Params::default(), &mut io::empty()))
140 /// .await
141 /// .unwrap();
142 ///
143 /// while let Some(content) = stream.next().await {
144 /// let content = content.unwrap();
145 ///
146 /// match content {
147 /// Content::Stdout(out) => todo!(),
148 /// Content::Stderr(out) => todo!(),
149 /// }
150 /// }
151 /// }
152 /// }
153 /// ```
154 pub async fn execute_stream<I: AsyncRead + Unpin>(
155 &mut self, request: Request<'_, I>,
156 ) -> ClientResult<ResponseStream<&mut S>> {
157 Self::handle_request(&mut self.stream, REQUEST_ID, request.params, request.stdin).await?;
158 Ok(ResponseStream::new(&mut self.stream, REQUEST_ID))
159 }
160}
161
162impl<S: AsyncRead + AsyncWrite + Unpin, M: Mode> Client<S, M> {
163 /// Internal method to execute a request and return a complete response.
164 ///
165 /// # Arguments
166 ///
167 /// * `request` - The request to execute
168 async fn inner_execute<I: AsyncRead + Unpin>(
169 &mut self, request: Request<'_, I>,
170 ) -> ClientResult<Response> {
171 Self::handle_request(&mut self.stream, REQUEST_ID, request.params, request.stdin).await?;
172 Self::handle_response(&mut self.stream, REQUEST_ID).await
173 }
174
175 /// Handles the complete request process.
176 ///
177 /// # Arguments
178 ///
179 /// * `stream` - The stream to write to
180 /// * `id` - The request ID
181 /// * `params` - The request parameters
182 /// * `body` - The request body stream
183 async fn handle_request<'a, I: AsyncRead + Unpin>(
184 stream: &mut S, id: u16, params: Params<'a>, mut body: I,
185 ) -> ClientResult<()> {
186 Self::handle_request_start(stream, id).await?;
187 Self::handle_request_params(stream, id, params).await?;
188 Self::handle_request_body(stream, id, &mut body).await?;
189 Self::handle_request_flush(stream).await?;
190 Ok(())
191 }
192
193 /// Handles the start of a request by sending the begin request record.
194 ///
195 /// # Arguments
196 ///
197 /// * `stream` - The stream to write to
198 /// * `id` - The request ID
199 async fn handle_request_start(stream: &mut S, id: u16) -> ClientResult<()> {
200 debug!(id, "Start handle request");
201
202 let begin_request_rec =
203 BeginRequestRec::new(id, Role::Responder, <M>::is_keep_alive()).await?;
204
205 debug!(id, ?begin_request_rec, "Send to stream.");
206
207 begin_request_rec.write_to_stream(stream).await?;
208
209 Ok(())
210 }
211
212 /// Handles sending request parameters to the stream.
213 ///
214 /// # Arguments
215 ///
216 /// * `stream` - The stream to write to
217 /// * `id` - The request ID
218 /// * `params` - The request parameters
219 async fn handle_request_params<'a>(
220 stream: &mut S, id: u16, params: Params<'a>,
221 ) -> ClientResult<()> {
222 let param_pairs = ParamPairs::new(params);
223 debug!(id, ?param_pairs, "Params will be sent.");
224
225 Header::write_to_stream_batches(
226 RequestType::Params,
227 id,
228 stream,
229 &mut ¶m_pairs.to_content().await?[..],
230 Some(|header| {
231 debug!(id, ?header, "Send to stream for Params.");
232 header
233 }),
234 )
235 .await?;
236
237 #[cfg(feature = "smol")]
238 let mut empty = smol::io::empty();
239 #[cfg(feature = "tokio")]
240 let mut empty = tokio::io::empty();
241
242 Header::write_to_stream_batches(
243 RequestType::Params,
244 id,
245 stream,
246 &mut empty,
247 Some(|header| {
248 debug!(id, ?header, "Send to stream for Params.");
249 header
250 }),
251 )
252 .await?;
253
254 Ok(())
255 }
256
257 /// Handles sending the request body to the stream.
258 ///
259 /// # Arguments
260 ///
261 /// * `stream` - The stream to write to
262 /// * `id` - The request ID
263 /// * `body` - The request body stream
264 async fn handle_request_body<I: AsyncRead + Unpin>(
265 stream: &mut S, id: u16, body: &mut I,
266 ) -> ClientResult<()> {
267 Header::write_to_stream_batches(
268 RequestType::Stdin,
269 id,
270 stream,
271 body,
272 Some(|header| {
273 debug!(id, ?header, "Send to stream for Stdin.");
274 header
275 }),
276 )
277 .await?;
278
279 #[cfg(feature = "smol")]
280 let mut empty = smol::io::empty();
281 #[cfg(feature = "tokio")]
282 let mut empty = tokio::io::empty();
283
284 Header::write_to_stream_batches(
285 RequestType::Stdin,
286 id,
287 stream,
288 &mut empty,
289 Some(|header| {
290 debug!(id, ?header, "Send to stream for Stdin.");
291 header
292 }),
293 )
294 .await?;
295
296 Ok(())
297 }
298
299 /// Flushes the stream to ensure all data is sent.
300 ///
301 /// # Arguments
302 ///
303 /// * `stream` - The stream to flush
304 async fn handle_request_flush(stream: &mut S) -> ClientResult<()> {
305 stream.flush().await?;
306
307 Ok(())
308 }
309
310 /// Handles reading and processing the response from the stream.
311 ///
312 /// # Arguments
313 ///
314 /// * `stream` - The stream to read from
315 /// * `id` - The request ID to match
316 async fn handle_response(stream: &mut S, id: u16) -> ClientResult<Response> {
317 let mut response = Response::default();
318
319 let mut stderr = Vec::new();
320 let mut stdout = Vec::new();
321
322 loop {
323 let header = Header::new_from_stream(stream).await?;
324 if header.request_id != id {
325 return Err(ClientError::ResponseNotFound { id });
326 }
327 debug!(id, ?header, "Receive from stream.");
328
329 match header.r#type {
330 RequestType::Stdout => {
331 stdout.extend(header.read_content_from_stream(stream).await?);
332 }
333 RequestType::Stderr => {
334 stderr.extend(header.read_content_from_stream(stream).await?);
335 }
336 RequestType::EndRequest => {
337 let end_request_rec = EndRequestRec::from_header(&header, stream).await?;
338 debug!(id, ?end_request_rec, "Receive from stream.");
339
340 end_request_rec
341 .end_request
342 .protocol_status
343 .convert_to_client_result(end_request_rec.end_request.app_status)?;
344
345 response.stdout = if stdout.is_empty() {
346 None
347 } else {
348 Some(stdout)
349 };
350 response.stderr = if stderr.is_empty() {
351 None
352 } else {
353 Some(stderr)
354 };
355
356 return Ok(response);
357 }
358 r#type => {
359 return Err(ClientError::UnknownRequestType {
360 request_type: r#type,
361 })
362 }
363 }
364 }
365 }
366}