monolake_services/thrift/ttheader.rs
1//! Core Thrift THeader protocol service implementation for handling downstream client connections.
2//!
3//! This module provides a high-performance, asynchronous Thrift service that handles
4//! connections from downstream clients using the THeader protocol. It is designed to work
5//! with monoio's asynchronous runtime, providing fine-grained control over various timeouts.
6//!
7//! # Key Components
8//!
9//! - [`TtheaderCoreService`]: The main service component responsible for handling Thrift THeader
10//! connections from downstream clients. It can be composed of a stack of handlers implementing
11//! the [`ThriftHandler`] trait.
12//! - [`ThriftServerTimeout`]: Configuration for various timeout settings in the Thrift server.
13//!
14//! # Features
15//!
16//! - Support for Thrift THeader protocol
17//! - Composable design allowing a stack of [`ThriftHandler`] implementations
18//! - Efficient handling of concurrent requests using asynchronous I/O
19//! - Configurable timeout settings for different stages of request processing
20//! - Automatic message framing and error handling
21//!
22//! # Usage
23//!
24//! [`TtheaderCoreService`] is typically used as part of a larger service stack. Here's a basic
25//! example:
26//!
27//! ```ignore
28//! use service_async::{layer::FactoryLayer, stack::FactoryStack};
29//!
30//! use crate::thrift::TtheaderCoreService;
31//!
32//! let config = Config { /* ... */ };
33//! let proxy_config = Config { /* ... */ };
34//! let stack = FactoryStack::new(config)
35//! .replace(TProxyHandler::factory(proxy_config))
36//! .push(TtheaderCoreService::layer());
37//!
38//! let service = stack.make_async().await.unwrap();
39//! // Use the service to handle incoming Thrift THeader connections from downstream clients
40//! ```
41//!
42//! # Handler Composition
43//!
44//! [`TtheaderCoreService`] can be composed of multiple handlers implementing the [`ThriftHandler`]
45//! trait. This allows for a flexible and modular approach to request processing. Handlers can be
46//! chained together to form a processing pipeline, each handling a specific aspect of the
47//! Thrift request/response cycle.
48//!
49//! # Performance Considerations
50//!
51//! - Uses monoio's efficient async I/O operations for improved performance
52//! - Implements connection keep-alive to reduce connection overhead
53//! - Efficient message framing and decoding using the THeader protocol
54
55use std::{convert::Infallible, fmt::Debug, time::Duration};
56
57use certain_map::{Attach, Fork};
58use monoio::io::{sink::SinkExt, stream::Stream, AsyncReadRent, AsyncWriteRent};
59use monoio_codec::Framed;
60use monoio_thrift::codec::ttheader::{RawPayloadCodec, TTHeaderPayloadCodec};
61use monolake_core::{context::PeerAddr, thrift::ThriftHandler, AnyError};
62use service_async::{
63 layer::{layer_fn, FactoryLayer},
64 AsyncMakeService, MakeService, Param, ParamRef, Service,
65};
66use tracing::{error, info, trace, warn};
67
/// Core Thrift service handler supporting the THeader protocol.
///
/// `TtheaderCoreService` is responsible for accepting Thrift connections, decoding requests,
/// routing them through a handler chain, and encoding responses. It supports the THeader
/// protocol for efficient message framing and metadata handling.
///
/// The same type doubles as a factory: when `H` implements `MakeService` /
/// `AsyncMakeService`, the wrapper builds a new `TtheaderCoreService` around the handler
/// produced by `H`, carrying the timeout configuration over unchanged.
///
/// For implementation details and example usage, see the
/// [module level documentation](crate::thrift::ttheader).
#[derive(Clone)]
pub struct TtheaderCoreService<H> {
    /// Handler invoked once per decoded request (or the factory producing that handler when
    /// this value is used as a `MakeService`).
    handler_chain: H,
    /// Keepalive and message-read timeouts applied while serving a connection.
    thrift_timeout: ThriftServerTimeout,
}
80
81impl<H> TtheaderCoreService<H> {
82 pub fn new(handler_chain: H, thrift_timeout: ThriftServerTimeout) -> Self {
83 TtheaderCoreService {
84 handler_chain,
85 thrift_timeout,
86 }
87 }
88}
89
90impl<H, Stream, CXIn, CXStore, CXState, ERR> Service<(Stream, CXIn)> for TtheaderCoreService<H>
91where
92 CXIn: ParamRef<PeerAddr> + Fork<Store = CXStore, State = CXState>,
93 CXStore: 'static,
94 for<'a> CXState: Attach<CXStore>,
95 for<'a> H: ThriftHandler<<CXState as Attach<CXStore>>::Hdr<'a>, Error = ERR>,
96 ERR: Into<AnyError> + Debug,
97 Stream: AsyncReadRent + AsyncWriteRent + Unpin + 'static,
98{
99 type Response = ();
100 type Error = Infallible;
101
102 async fn call(&self, (stream, ctx): (Stream, CXIn)) -> Result<Self::Response, Self::Error> {
103 let mut codec = Framed::new(stream, TTHeaderPayloadCodec::new(RawPayloadCodec::new()));
104 loop {
105 if let Some(keepalive_timeout) = self.thrift_timeout.keepalive_timeout {
106 match monoio::time::timeout(keepalive_timeout, codec.peek_data()).await {
107 Ok(Ok([])) => {
108 // Connection closed normally.
109 info!("Connection closed due to keepalive timeout");
110 break;
111 }
112 Ok(Err(io_error)) => {
113 error!(
114 "Connection {:?} io error: {io_error}",
115 ParamRef::<PeerAddr>::param_ref(&ctx)
116 );
117 break;
118 }
119 Err(_) => {
120 info!(
121 "Connection {:?} keepalive timed out",
122 ParamRef::<PeerAddr>::param_ref(&ctx),
123 );
124 break;
125 }
126 _ => {}
127 }
128 }
129
130 // decode request with message timeout
131 let decoded = match self.thrift_timeout.message_timeout {
132 Some(message_timeout) => {
133 match monoio::time::timeout(message_timeout, codec.next()).await {
134 Ok(x) => x,
135 Err(_) => {
136 info!(
137 "Connection {:?} message timed out",
138 ParamRef::<PeerAddr>::param_ref(&ctx),
139 );
140 break;
141 }
142 }
143 }
144 None => codec.next().await,
145 };
146
147 let req = match decoded {
148 Some(Ok(req)) => req,
149 Some(Err(err)) => {
150 // decode error
151 error!("decode thrift message failed: {err}");
152 break;
153 }
154 None => {
155 // Connection closed normally.
156 trace!("Connection closed normally due to read EOF");
157 break;
158 }
159 };
160
161 // fork ctx
162 let (mut store, state) = ctx.fork();
163 let forked_ctx = unsafe { state.attach(&mut store) };
164
165 // handle request and reply response
166 match self.handler_chain.handle(req, forked_ctx).await {
167 Ok(resp) => {
168 if let Err(e) = codec.send_and_flush(resp).await {
169 warn!("error when reply client: {e}");
170 break;
171 }
172 trace!("sent thrift response");
173 }
174 Err(e) => {
175 // something error when process request(not a biz error)
176 error!("error when processing request: {e:?}");
177 // todo: error resp
178 // if let Err(e) = encoder
179 // .send_and_flush(generate_response(StatusCode::INTERNAL_SERVER_ERROR, true))
180 // .await
181 // {
182 // warn!("error when reply client: {e}");
183 // }
184 break;
185 }
186 }
187 }
188 Ok(())
189 }
190}
191
192// TtheaderCoreService is a Service and a MakeService.
193impl<F> MakeService for TtheaderCoreService<F>
194where
195 F: MakeService,
196{
197 type Service = TtheaderCoreService<F::Service>;
198 type Error = F::Error;
199
200 fn make_via_ref(&self, old: Option<&Self::Service>) -> Result<Self::Service, Self::Error> {
201 Ok(TtheaderCoreService {
202 handler_chain: self
203 .handler_chain
204 .make_via_ref(old.map(|o| &o.handler_chain))?,
205 thrift_timeout: self.thrift_timeout,
206 })
207 }
208}
209
210impl<F: AsyncMakeService> AsyncMakeService for TtheaderCoreService<F> {
211 type Service = TtheaderCoreService<F::Service>;
212 type Error = F::Error;
213
214 async fn make_via_ref(
215 &self,
216 old: Option<&Self::Service>,
217 ) -> Result<Self::Service, Self::Error> {
218 Ok(TtheaderCoreService {
219 handler_chain: self
220 .handler_chain
221 .make_via_ref(old.map(|o| &o.handler_chain))
222 .await?,
223 thrift_timeout: self.thrift_timeout,
224 })
225 }
226}
227
/// Configuration for Thrift server timeouts.
///
/// This struct allows setting timeouts for connection keepalive and message reading,
/// providing fine-grained control over the Thrift server's behavior. Both fields are
/// optional; the derived `Default` leaves them as `None`, which disables the corresponding
/// timeout.
#[derive(Default, Debug, Copy, Clone, PartialEq, Eq, Hash)]
pub struct ThriftServerTimeout {
    /// Connection keepalive timeout: if no byte arrives while the decoder is waiting for the
    /// next request, the connection is closed. Like Nginx `keepalive_timeout`.
    /// `None` skips the keepalive check entirely.
    pub keepalive_timeout: Option<Duration>,
    /// Maximum time allowed for reading and decoding one full Thrift message.
    /// `None` waits for the message without a deadline.
    pub message_timeout: Option<Duration>,
}
240
241impl<F> TtheaderCoreService<F> {
242 pub fn layer<C>() -> impl FactoryLayer<C, F, Factory = Self>
243 where
244 C: Param<ThriftServerTimeout>,
245 {
246 layer_fn(|c: &C, inner| Self::new(inner, c.param()))
247 }
248}