monolake_services/thrift/
ttheader.rs

//! Core Thrift THeader protocol service implementation for handling downstream client connections.
//!
//! This module provides a high-performance, asynchronous Thrift service that handles
//! connections from downstream clients using the THeader protocol. It is designed to work
//! with monoio's asynchronous runtime, providing fine-grained control over various timeouts.
//!
//! # Key Components
//!
//! - [`TtheaderCoreService`]: The main service component responsible for handling Thrift THeader
//!   connections from downstream clients. It can be composed of a stack of handlers implementing
//!   the [`ThriftHandler`] trait.
//! - [`ThriftServerTimeout`]: Configuration for various timeout settings in the Thrift server.
//!
//! # Features
//!
//! - Support for the Thrift THeader protocol
//! - Composable design allowing a stack of [`ThriftHandler`] implementations
//! - Efficient handling of concurrent requests using asynchronous I/O
//! - Configurable timeout settings for different stages of request processing
//! - Automatic message framing and error handling
//!
//! # Usage
//!
//! [`TtheaderCoreService`] is typically used as part of a larger service stack. Here's a basic
//! example:
//!
//! ```ignore
//! use service_async::{layer::FactoryLayer, stack::FactoryStack};
//!
//! use crate::thrift::TtheaderCoreService;
//!
//! let config = Config { /* ... */ };
//! let proxy_config = Config { /* ... */ };
//! let stack = FactoryStack::new(config)
//!     .replace(TProxyHandler::factory(proxy_config))
//!     .push(TtheaderCoreService::layer());
//!
//! let service = stack.make_async().await.unwrap();
//! // Use the service to handle incoming Thrift THeader connections from downstream clients
//! ```
//!
//! # Handler Composition
//!
//! [`TtheaderCoreService`] can be composed of multiple handlers implementing the [`ThriftHandler`]
//! trait. This allows for a flexible and modular approach to request processing. Handlers can be
//! chained together to form a processing pipeline, each handling a specific aspect of the
//! Thrift request/response cycle.
//!
//! # Performance Considerations
//!
//! - Uses monoio's efficient async I/O operations for improved performance
//! - Implements connection keep-alive to reduce connection overhead
//! - Efficient message framing and decoding using the THeader protocol

55use std::{convert::Infallible, fmt::Debug, time::Duration};
56
57use certain_map::{Attach, Fork};
58use monoio::io::{sink::SinkExt, stream::Stream, AsyncReadRent, AsyncWriteRent};
59use monoio_codec::Framed;
60use monoio_thrift::codec::ttheader::{RawPayloadCodec, TTHeaderPayloadCodec};
61use monolake_core::{context::PeerAddr, thrift::ThriftHandler, AnyError};
62use service_async::{
63    layer::{layer_fn, FactoryLayer},
64    AsyncMakeService, MakeService, Param, ParamRef, Service,
65};
66use tracing::{error, info, trace, warn};
67
/// Core Thrift service handler supporting the THeader protocol.
///
/// `TtheaderCoreService` is responsible for accepting Thrift connections, decoding requests,
/// routing them through a handler chain, and encoding responses. It supports the THeader
/// protocol for efficient message framing and metadata handling.
/// For implementation details and example usage, see the
/// [module level documentation](crate::thrift::ttheader).
#[derive(Clone)]
pub struct TtheaderCoreService<H> {
    /// Chain of handlers (implementing [`ThriftHandler`]) that each decoded request is
    /// routed through.
    handler_chain: H,
    /// Keepalive and per-message timeout configuration applied while serving a connection.
    thrift_timeout: ThriftServerTimeout,
}
80
81impl<H> TtheaderCoreService<H> {
82    pub fn new(handler_chain: H, thrift_timeout: ThriftServerTimeout) -> Self {
83        TtheaderCoreService {
84            handler_chain,
85            thrift_timeout,
86        }
87    }
88}
89
90impl<H, Stream, CXIn, CXStore, CXState, ERR> Service<(Stream, CXIn)> for TtheaderCoreService<H>
91where
92    CXIn: ParamRef<PeerAddr> + Fork<Store = CXStore, State = CXState>,
93    CXStore: 'static,
94    for<'a> CXState: Attach<CXStore>,
95    for<'a> H: ThriftHandler<<CXState as Attach<CXStore>>::Hdr<'a>, Error = ERR>,
96    ERR: Into<AnyError> + Debug,
97    Stream: AsyncReadRent + AsyncWriteRent + Unpin + 'static,
98{
99    type Response = ();
100    type Error = Infallible;
101
102    async fn call(&self, (stream, ctx): (Stream, CXIn)) -> Result<Self::Response, Self::Error> {
103        let mut codec = Framed::new(stream, TTHeaderPayloadCodec::new(RawPayloadCodec::new()));
104        loop {
105            if let Some(keepalive_timeout) = self.thrift_timeout.keepalive_timeout {
106                match monoio::time::timeout(keepalive_timeout, codec.peek_data()).await {
107                    Ok(Ok([])) => {
108                        // Connection closed normally.
109                        info!("Connection closed due to keepalive timeout");
110                        break;
111                    }
112                    Ok(Err(io_error)) => {
113                        error!(
114                            "Connection {:?} io error: {io_error}",
115                            ParamRef::<PeerAddr>::param_ref(&ctx)
116                        );
117                        break;
118                    }
119                    Err(_) => {
120                        info!(
121                            "Connection {:?} keepalive timed out",
122                            ParamRef::<PeerAddr>::param_ref(&ctx),
123                        );
124                        break;
125                    }
126                    _ => {}
127                }
128            }
129
130            // decode request with message timeout
131            let decoded = match self.thrift_timeout.message_timeout {
132                Some(message_timeout) => {
133                    match monoio::time::timeout(message_timeout, codec.next()).await {
134                        Ok(x) => x,
135                        Err(_) => {
136                            info!(
137                                "Connection {:?} message timed out",
138                                ParamRef::<PeerAddr>::param_ref(&ctx),
139                            );
140                            break;
141                        }
142                    }
143                }
144                None => codec.next().await,
145            };
146
147            let req = match decoded {
148                Some(Ok(req)) => req,
149                Some(Err(err)) => {
150                    // decode error
151                    error!("decode thrift message failed: {err}");
152                    break;
153                }
154                None => {
155                    // Connection closed normally.
156                    trace!("Connection closed normally due to read EOF");
157                    break;
158                }
159            };
160
161            // fork ctx
162            let (mut store, state) = ctx.fork();
163            let forked_ctx = unsafe { state.attach(&mut store) };
164
165            // handle request and reply response
166            match self.handler_chain.handle(req, forked_ctx).await {
167                Ok(resp) => {
168                    if let Err(e) = codec.send_and_flush(resp).await {
169                        warn!("error when reply client: {e}");
170                        break;
171                    }
172                    trace!("sent thrift response");
173                }
174                Err(e) => {
175                    // something error when process request(not a biz error)
176                    error!("error when processing request: {e:?}");
177                    // todo: error resp
178                    // if let Err(e) = encoder
179                    // .send_and_flush(generate_response(StatusCode::INTERNAL_SERVER_ERROR, true))
180                    // .await
181                    // {
182                    // warn!("error when reply client: {e}");
183                    // }
184                    break;
185                }
186            }
187        }
188        Ok(())
189    }
190}
191
192// TtheaderCoreService is a Service and a MakeService.
193impl<F> MakeService for TtheaderCoreService<F>
194where
195    F: MakeService,
196{
197    type Service = TtheaderCoreService<F::Service>;
198    type Error = F::Error;
199
200    fn make_via_ref(&self, old: Option<&Self::Service>) -> Result<Self::Service, Self::Error> {
201        Ok(TtheaderCoreService {
202            handler_chain: self
203                .handler_chain
204                .make_via_ref(old.map(|o| &o.handler_chain))?,
205            thrift_timeout: self.thrift_timeout,
206        })
207    }
208}
209
210impl<F: AsyncMakeService> AsyncMakeService for TtheaderCoreService<F> {
211    type Service = TtheaderCoreService<F::Service>;
212    type Error = F::Error;
213
214    async fn make_via_ref(
215        &self,
216        old: Option<&Self::Service>,
217    ) -> Result<Self::Service, Self::Error> {
218        Ok(TtheaderCoreService {
219            handler_chain: self
220                .handler_chain
221                .make_via_ref(old.map(|o| &o.handler_chain))
222                .await?,
223            thrift_timeout: self.thrift_timeout,
224        })
225    }
226}
227
/// Configuration for Thrift server timeouts.
///
/// This struct allows setting timeouts for connection keepalive and message reading,
/// providing fine-grained control over the Thrift server's behavior.
#[derive(Default, Debug, Copy, Clone, PartialEq, Eq, Hash)]
pub struct ThriftServerTimeout {
    /// Connection keepalive timeout: if no byte arrives when the decoder wants the next
    /// request, the connection is closed. Similar to Nginx's `keepalive_timeout`.
    /// `None` disables the keepalive timeout.
    pub keepalive_timeout: Option<Duration>,
    /// Maximum time allowed to read a full Thrift message. `None` disables the
    /// per-message timeout.
    pub message_timeout: Option<Duration>,
}
240
241impl<F> TtheaderCoreService<F> {
242    pub fn layer<C>() -> impl FactoryLayer<C, F, Factory = Self>
243    where
244        C: Param<ThriftServerTimeout>,
245    {
246        layer_fn(|c: &C, inner| Self::new(inner, c.param()))
247    }
248}