fcgi_client/client.rs
1// Copyright 2022 jmjoy
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! FastCGI client implementation for async communication with FastCGI servers.
16//!
17//! This module provides the main `Client` struct that handles communication
18//! with FastCGI servers in both short connection and keep-alive modes.
19//! The client can execute requests and receive responses or response streams.
20
21use crate::{
22 ClientError, ClientResult, Response,
23 conn::{KeepAlive, Mode, ShortConn},
24 meta::{BeginRequestRec, EndRequestRec, Header, ParamPairs, RequestType, Role},
25 params::Params,
26 request::Request,
27 response::ResponseStream,
28};
29use bytes::BytesMut;
30use std::{
31 marker::PhantomData,
32 ops::{Deref, DerefMut},
33};
34use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt};
35use tracing::debug;
36
/// Request id placed in every record this client sends.
///
/// It is fixed at 1, following the nginx FastCGI implementation, which
/// always uses request id 1 as well:
///
/// <https://github.com/nginx/nginx/blob/f7ea8c76b55f730daa3b63f5511feb564b44d901/src/http/modules/ngx_http_fastcgi_module.c>
const REQUEST_ID: u16 = 1;
41
/// Asynchronous client that talks to a FastCGI server over a stream.
///
/// `S` is the type of the wrapped stream. `M` is a marker type for the
/// connection mode (`ShortConn` or `KeepAlive`); it is only carried as
/// `PhantomData`, so no value of `M` is ever stored.
pub struct Client<S, M> {
    /// The stream that requests are written to and responses are read from.
    stream: S,
    /// Zero-sized marker tying this client to its connection mode `M`.
    _mode: PhantomData<M>,
}
47
48impl<S: AsyncRead + AsyncWrite + Unpin> Client<S, ShortConn> {
49 /// Construct a `Client` Object with stream, such as `tokio::net::TcpStream`
50 /// or `tokio::net::UnixStream`, under short connection mode.
51 pub fn new(stream: S) -> Self {
52 Self {
53 stream,
54 _mode: PhantomData,
55 }
56 }
57
58 /// Send request and receive response from fastcgi server, under short
59 /// connection mode.
60 pub async fn execute_once<I: AsyncRead + Unpin>(
61 mut self,
62 request: Request<'_, I>,
63 ) -> ClientResult<Response> {
64 self.inner_execute(request).await
65 }
66
67 /// Send request and receive response stream from fastcgi server, under
68 /// short connection mode.
69 ///
70 /// # Examples
71 ///
72 /// ```
73 /// use fastcgi_client::{response::Content, Client, Params, Request, StreamExt};
74 /// use tokio::{io, net::TcpStream};
75 ///
76 /// async fn stream() {
77 /// let stream = TcpStream::connect(("127.0.0.1", 9000)).await.unwrap();
78 /// let client = Client::new(stream);
79 /// let mut stream = client
80 /// .execute_once_stream(Request::new(Params::default(), &mut io::empty()))
81 /// .await
82 /// .unwrap();
83 ///
84 /// while let Some(content) = stream.next().await {
85 /// let content = content.unwrap();
86 ///
87 /// match content {
88 /// Content::Stdout(out) => todo!(),
89 /// Content::Stderr(out) => todo!(),
90 /// }
91 /// }
92 /// }
93 /// ```
94 pub async fn execute_once_stream<I: AsyncRead + Unpin>(
95 mut self,
96 request: Request<'_, I>,
97 ) -> ClientResult<ResponseStream<S>> {
98 Self::handle_request(&mut self.stream, REQUEST_ID, request.params, request.stdin).await?;
99 Ok(ResponseStream::new(self.stream, REQUEST_ID))
100 }
101}
102
103impl<S: AsyncRead + AsyncWrite + Unpin> Client<S, KeepAlive> {
104 /// Construct a `Client` Object with stream, such as `tokio::net::TcpStream`
105 /// or `tokio::net::UnixStream`, under keep alive connection mode.
106 pub fn new_keep_alive(stream: S) -> Self {
107 Self {
108 stream,
109 _mode: PhantomData,
110 }
111 }
112
113 /// Send request and receive response from fastcgi server, under keep alive
114 /// connection mode.
115 pub async fn execute<I: AsyncRead + Unpin>(
116 &mut self,
117 request: Request<'_, I>,
118 ) -> ClientResult<Response> {
119 self.inner_execute(request).await
120 }
121
122 /// Send request and receive response stream from fastcgi server, under
123 /// keep alive connection mode.
124 ///
125 /// # Examples
126 ///
127 /// ```
128 /// use fastcgi_client::{response::Content, Client, Params, Request, StreamExt};
129 /// use tokio::{io, net::TcpStream};
130 ///
131 /// async fn stream() {
132 /// let stream = TcpStream::connect(("127.0.0.1", 9000)).await.unwrap();
133 /// let mut client = Client::new_keep_alive(stream);
134 ///
135 /// for _ in (0..3) {
136 /// let mut stream = client
137 /// .execute_stream(Request::new(Params::default(), &mut io::empty()))
138 /// .await
139 /// .unwrap();
140 ///
141 /// while let Some(content) = stream.next().await {
142 /// let content = content.unwrap();
143 ///
144 /// match content {
145 /// Content::Stdout(out) => todo!(),
146 /// Content::Stderr(out) => todo!(),
147 /// }
148 /// }
149 /// }
150 /// }
151 /// ```
152 pub async fn execute_stream<I: AsyncRead + Unpin>(
153 &mut self,
154 request: Request<'_, I>,
155 ) -> ClientResult<ResponseStream<&mut S>> {
156 Self::handle_request(&mut self.stream, REQUEST_ID, request.params, request.stdin).await?;
157 Ok(ResponseStream::new(&mut self.stream, REQUEST_ID))
158 }
159}
160
161impl<S: AsyncRead + AsyncWrite + Unpin, M: Mode> Client<S, M> {
162 /// Internal method to execute a request and return a complete response.
163 ///
164 /// # Arguments
165 ///
166 /// * `request` - The request to execute
167 async fn inner_execute<I: AsyncRead + Unpin>(
168 &mut self,
169 request: Request<'_, I>,
170 ) -> ClientResult<Response> {
171 Self::handle_request(&mut self.stream, REQUEST_ID, request.params, request.stdin).await?;
172 Self::handle_response(&mut self.stream, REQUEST_ID).await
173 }
174
175 /// Handles the complete request process.
176 ///
177 /// # Arguments
178 ///
179 /// * `stream` - The stream to write to
180 /// * `id` - The request ID
181 /// * `params` - The request parameters
182 /// * `body` - The request body stream
183 async fn handle_request<'a, I: AsyncRead + Unpin>(
184 stream: &mut S,
185 id: u16,
186 params: Params<'a>,
187 mut body: I,
188 ) -> ClientResult<()> {
189 Self::handle_request_start(stream, id).await?;
190
191 Self::handle_request_params(stream, id, params).await?;
192 Self::handle_request_body(stream, id, &mut body).await?;
193 Self::handle_request_flush(stream).await?;
194 Ok(())
195 }
196
197 /// Handles the start of a request by sending the begin request record.
198 ///
199 /// # Arguments
200 ///
201 /// * `stream` - The stream to write to
202 /// * `id` - The request ID
203 async fn handle_request_start(stream: &mut S, id: u16) -> ClientResult<()> {
204 debug!(id, "Start handle request");
205
206 let begin_request_rec = BeginRequestRec::new(id, Role::Responder, <M>::is_keep_alive());
207
208 //debug!(id, ?begin_request_rec, "Send to stream.");
209
210 begin_request_rec.write_to_stream(stream).await?;
211
212 Ok(())
213 }
214
215 /// Handles sending request parameters to the stream.
216 ///
217 /// # Arguments
218 ///
219 /// * `stream` - The stream to write to
220 /// * `id` - The request ID
221 /// * `params` - The request parameters
222 async fn handle_request_params<'a>(
223 stream: &mut S,
224 id: u16,
225 params: Params<'a>,
226 ) -> ClientResult<()> {
227 let param_pairs = ParamPairs::new(params);
228 debug!(id, "Params will be sent {param_pairs:#?}.");
229
230 Header::write_to_stream_batches(
231 RequestType::Params,
232 id,
233 stream,
234 &mut param_pairs.to_content().as_ref(),
235 Some(|header| {
236 debug!(id, ?header, "Send to stream for Params.");
237 header
238 }),
239 )
240 .await?;
241
242 Header::write_to_stream_batches(
243 RequestType::Params,
244 id,
245 stream,
246 &mut tokio::io::empty(),
247 Some(|header| {
248 debug!(id, ?header, "Send to stream for Params.");
249 header
250 }),
251 )
252 .await?;
253
254 Ok(())
255 }
256
257 /// Handles sending the request body to the stream.
258 ///
259 /// # Arguments
260 ///
261 /// * `stream` - The stream to write to
262 /// * `id` - The request ID
263 /// * `body` - The request body stream
264 async fn handle_request_body<I: AsyncRead + Unpin>(
265 stream: &mut S,
266 id: u16,
267 body: &mut I,
268 ) -> ClientResult<()> {
269 Header::write_to_stream_batches(
270 RequestType::Stdin,
271 id,
272 stream,
273 body,
274 Some(|header| {
275 debug!(id, ?header, "Send to stream for Stdin.");
276 header
277 }),
278 )
279 .await?;
280
281 Header::write_to_stream_batches(
282 RequestType::Stdin,
283 id,
284 stream,
285 &mut tokio::io::empty(),
286 Some(|header| {
287 debug!(id, ?header, "Send to stream for Stdin.");
288 header
289 }),
290 )
291 .await?;
292
293 Ok(())
294 }
295
296 /// Flushes the stream to ensure all data is sent.
297 ///
298 /// # Arguments
299 ///
300 /// * `stream` - The stream to flush
301 async fn handle_request_flush(stream: &mut S) -> ClientResult<()> {
302 stream.flush().await?;
303
304 Ok(())
305 }
306
307 /// Handles reading and processing the response from the stream.
308 ///
309 /// # Arguments
310 ///
311 /// * `stream` - The stream to read from
312 /// * `id` - The request ID to match
313 async fn handle_response(stream: &mut S, id: u16) -> ClientResult<Response> {
314 let mut response = Response::default();
315
316 let mut stderr = BytesMut::new();
317 let mut stdout = BytesMut::new();
318
319 loop {
320 let header = Header::new_from_stream(stream).await?;
321 if header.request_id != id {
322 return Err(ClientError::ResponseNotFound { id });
323 }
324 debug!(id, ?header, "Receive from stream.");
325
326 match header.r#type {
327 RequestType::Stdout => {
328 stdout.extend_from_slice(&header.read_content_from_stream(stream).await?);
329 }
330 RequestType::Stderr => {
331 stderr.extend_from_slice(&header.read_content_from_stream(stream).await?);
332 }
333 RequestType::EndRequest => {
334 let end_request_rec = EndRequestRec::from_header(&header, stream).await?;
335 debug!(id, ?end_request_rec, "Receive from stream.");
336
337 end_request_rec
338 .end_request
339 .protocol_status
340 .convert_to_client_result(end_request_rec.end_request.app_status)?;
341
342 response.stdout = if stdout.is_empty() {
343 None
344 } else {
345 Some(stdout.freeze())
346 };
347 response.stderr = if stderr.is_empty() {
348 None
349 } else {
350 Some(stderr.freeze())
351 };
352
353 return Ok(response);
354 }
355 r#type => {
356 return Err(ClientError::UnknownRequestType {
357 request_type: r#type,
358 });
359 }
360 }
361 }
362 }
363}
364
365/// Добавляю реализацию deref в stream потому что Client ведет себя по большей части просто как stream.
366impl<S, M> Deref for Client<S, M> {
367 type Target = S;
368
369 fn deref(&self) -> &Self::Target {
370 &self.stream
371 }
372}
373
374impl<S, M> DerefMut for Client<S, M> {
375 fn deref_mut(&mut self) -> &mut Self::Target {
376 &mut self.stream
377 }
378}