1use std::ffi::CStr;
18use std::marker::PhantomData;
19use std::sync::Arc;
20use std::sync::Mutex;
21
22use anyhow::bail;
23use anyhow::Error;
24use anyhow::Result;
25use async_trait::async_trait;
26use futures::stream::BoxStream;
27
28use crate::application_exception::ApplicationException;
29use crate::application_exception::ApplicationExceptionErrorCode;
30use crate::context_stack::ContextStack;
31use crate::exceptions::ExceptionInfo;
32use crate::exceptions::ResultInfo;
33use crate::framing::Framing;
34use crate::framing::FramingDecoded;
35use crate::framing::FramingEncodedFinal;
36use crate::protocol::Protocol;
37use crate::protocol::ProtocolDecoded;
38use crate::protocol::ProtocolReader;
39use crate::protocol::ProtocolWriter;
40use crate::request_context::RequestContext;
41use crate::serialize::Serialize;
42use crate::thrift_protocol::ProtocolID;
43use crate::ttype::TType;
44
/// One element of a serialized response stream.
///
/// `Payload` is the representation used for elements that are carried in
/// already-encoded form (see [`ReplyState::send_stream_reply`], which uses
/// `FramingEncodedFinal<F>` as the payload).
pub enum SerializedStreamElement<Payload> {
    /// A successful stream element, carried as a `Payload`.
    Success(Payload),
    /// An exception carried as a `Payload`.
    // NOTE(review): presumably an exception declared by the service, already
    // serialized by the producer -- confirm against the stream producers.
    DeclaredException(Payload),
    /// An [`ApplicationException`] carried in its typed, non-`Payload` form.
    ApplicationException(ApplicationException),
    /// An error carried in place of an element.
    // NOTE(review): presumably raised while serializing an element -- confirm.
    SerializationError(Error),
}
55
/// Sink through which encoded replies for framing `F` are handed back.
///
/// Services receive this behind an `Arc<Mutex<_>>` (see
/// [`ThriftService::call`]) and use it to send their encoded response.
pub trait ReplyState<F>
where
    F: Framing,
{
    /// Request-context type expected of an interaction processor passed to
    /// [`ReplyState::set_interaction_processor`].
    type RequestContext;

    /// Sends a single, fully encoded reply.
    fn send_reply(&mut self, reply: FramingEncodedFinal<F>);

    /// Sends an encoded `response` together with an optional stream of
    /// serialized elements.
    ///
    /// `protocol_id` identifies the protocol associated with the reply.
    // NOTE(review): presumably `response` is the initial reply and `stream`
    // the follow-up elements, with `None` meaning "no stream" -- confirm
    // against the implementations.
    ///
    /// # Errors
    ///
    /// Returns an error if the implementation fails to send the reply.
    fn send_stream_reply(
        &mut self,
        response: FramingEncodedFinal<F>,
        stream: Option<BoxStream<'static, SerializedStreamElement<FramingEncodedFinal<F>>>>,
        protocol_id: ProtocolID,
    ) -> Result<()>;

    /// Registers the processor for an interaction.
    ///
    /// # Errors
    ///
    /// The default implementation ignores `_processor` and always fails with
    /// "Thrift server does not support interactions"; implementations that
    /// support interactions are expected to override it.
    fn set_interaction_processor(
        &mut self,
        _processor: Arc<
            dyn ThriftService<
                    F,
                    Handler = (),
                    RequestContext = Self::RequestContext,
                    ReplyState = Self,
                > + ::std::marker::Send
                + 'static,
        >,
    ) -> Result<()> {
        bail!("Thrift server does not support interactions");
    }
}
84
/// A service able to handle decoded requests for framing `F`.
///
/// Implementors must be `Send + Sync + 'static`. Forwarding implementations
/// exist in this file for `Box<T>` and `Arc<T>`.
#[async_trait]
pub trait ThriftService<F>: Send + Sync + 'static
where
    F: Framing + Send + 'static,
{
    /// Handler type associated with the service (`()` for interaction
    /// processors, as required by [`ThriftService::create_interaction`]).
    type Handler;
    /// Per-request context type passed to [`ThriftService::call`].
    type RequestContext;
    /// Reply sink type through which responses are sent.
    type ReplyState;

    /// Handles one decoded request.
    ///
    /// * `req` - the decoded request frame.
    /// * `req_ctxt` - the context of this request.
    /// * `reply_state` - shared, mutex-guarded state used to send the reply.
    ///
    /// # Errors
    ///
    /// Returns an error if the request could not be handled.
    async fn call(
        &self,
        req: FramingDecoded<F>,
        req_ctxt: &Self::RequestContext,
        reply_state: Arc<Mutex<Self::ReplyState>>,
    ) -> Result<(), Error>;

    /// Creates the service object for the interaction called `_name`.
    ///
    /// # Errors
    ///
    /// The default implementation ignores `_name` and always fails with
    /// "Thrift server does not support interactions".
    fn create_interaction(
        &self,
        _name: &str,
    ) -> ::anyhow::Result<
        Arc<
            dyn ThriftService<
                    F,
                    Handler = (),
                    RequestContext = Self::RequestContext,
                    ReplyState = Self::ReplyState,
                > + ::std::marker::Send
                + 'static,
        >,
    > {
        bail!("Thrift server does not support interactions");
    }

    /// Returns a static list of method names for this service.
    // NOTE(review): presumably the names of the methods this service can
    // handle -- confirm against the generated implementations.
    fn get_method_names(&self) -> &'static [&'static str];

    /// Asynchronous termination hook.
    // NOTE(review): presumably invoked when an interaction is terminated by
    // the client -- confirm against the server that drives this trait.
    async fn on_termination(&self);
}
134
135#[async_trait]
136impl<F, T> ThriftService<F> for Box<T>
137where
138 T: ThriftService<F> + Send + Sync + ?Sized,
139 F: Framing + Send + 'static,
140 T::RequestContext: Send + Sync + 'static,
141 T::ReplyState: Send + Sync + 'static,
142{
143 type Handler = T::Handler;
144 type RequestContext = T::RequestContext;
145 type ReplyState = T::ReplyState;
146
147 async fn call(
148 &self,
149 req: FramingDecoded<F>,
150 req_ctxt: &Self::RequestContext,
151 reply_state: Arc<Mutex<Self::ReplyState>>,
152 ) -> Result<(), Error> {
153 (**self).call(req, req_ctxt, reply_state).await
154 }
155
156 fn create_interaction(
157 &self,
158 name: &str,
159 ) -> ::anyhow::Result<
160 Arc<
161 dyn ThriftService<
162 F,
163 Handler = (),
164 RequestContext = Self::RequestContext,
165 ReplyState = Self::ReplyState,
166 > + ::std::marker::Send
167 + 'static,
168 >,
169 > {
170 (**self).create_interaction(name)
171 }
172
173 fn get_method_names(&self) -> &'static [&'static str] {
174 (**self).get_method_names()
175 }
176
177 async fn on_termination(&self) {
178 (**self).on_termination().await
179 }
180}
181
182#[async_trait]
183impl<F, T> ThriftService<F> for Arc<T>
184where
185 T: ThriftService<F> + Send + Sync + ?Sized,
186 F: Framing + Send + 'static,
187 T::RequestContext: Send + Sync + 'static,
188 T::ReplyState: Send + Sync + 'static,
189{
190 type Handler = T::Handler;
191 type RequestContext = T::RequestContext;
192 type ReplyState = T::ReplyState;
193
194 async fn call(
195 &self,
196 req: FramingDecoded<F>,
197 req_ctxt: &Self::RequestContext,
198 reply_state: Arc<Mutex<Self::ReplyState>>,
199 ) -> Result<(), Error> {
200 (**self).call(req, req_ctxt, reply_state).await
201 }
202
203 fn create_interaction(
204 &self,
205 name: &str,
206 ) -> ::anyhow::Result<
207 Arc<
208 dyn ThriftService<
209 F,
210 Handler = (),
211 RequestContext = Self::RequestContext,
212 ReplyState = Self::ReplyState,
213 > + ::std::marker::Send
214 + 'static,
215 >,
216 > {
217 (**self).create_interaction(name)
218 }
219
220 fn get_method_names(&self) -> &'static [&'static str] {
221 (**self).get_method_names()
222 }
223
224 async fn on_termination(&self) {
225 (**self).on_termination().await
226 }
227}
228
/// Method-level dispatch interface of a service for protocol `P`.
///
/// A processor maps names to indices ([`ServiceProcessor::method_idx`],
/// [`ServiceProcessor::create_interaction_idx`]) and then handles the item
/// at a given index ([`ServiceProcessor::handle_method`],
/// [`ServiceProcessor::handle_create_interaction`]).
#[async_trait]
pub trait ServiceProcessor<P>
where
    P: Protocol,
{
    /// Per-request context type passed to [`ServiceProcessor::handle_method`].
    type RequestContext;
    /// Reply sink type through which responses are sent.
    type ReplyState;

    /// Maps a method name (raw bytes) to the index of that method.
    ///
    /// # Errors
    ///
    /// Returns an [`ApplicationException`] when the name cannot be mapped to
    /// an index.
    fn method_idx(&self, name: &[u8]) -> Result<usize, ApplicationException>;

    /// Handles the method with index `idx`.
    ///
    /// * `idx` - a method index.
    // NOTE(review): presumably always a value previously returned by
    // `method_idx` -- confirm against callers.
    /// * `d` - deserializer positioned on the request.
    // NOTE(review): presumably positioned just after the message header --
    // confirm against callers.
    /// * `req` - the decoded request the deserializer was built from.
    /// * `req_ctxt` - the context of this request.
    /// * `reply_state` - shared, mutex-guarded state used to send the reply.
    /// * `seqid` - sequence id of the request message.
    ///
    /// # Errors
    ///
    /// Returns an error if the method could not be handled.
    async fn handle_method(
        &self,
        idx: usize,
        d: &mut P::Deserializer,
        req: ProtocolDecoded<P>,
        req_ctxt: &Self::RequestContext,
        reply_state: Arc<Mutex<Self::ReplyState>>,
        seqid: u32,
    ) -> Result<(), Error>;

    /// Maps an interaction name to the index of its creation function.
    ///
    /// # Errors
    ///
    /// The default implementation ignores `_name` and always fails with
    /// "Processor does not support interactions".
    fn create_interaction_idx(&self, _name: &str) -> ::anyhow::Result<::std::primitive::usize> {
        bail!("Processor does not support interactions");
    }

    /// Produces the interaction processor for the creation index `_idx`.
    ///
    /// # Errors
    ///
    /// The default implementation ignores `_idx` and always fails with
    /// "Processor does not support interactions".
    fn handle_create_interaction(
        &self,
        _idx: ::std::primitive::usize,
    ) -> ::anyhow::Result<
        Arc<
            dyn ThriftService<
                    P::Frame,
                    Handler = (),
                    RequestContext = Self::RequestContext,
                    ReplyState = Self::ReplyState,
                > + ::std::marker::Send
                + 'static,
        >,
    > {
        bail!("Processor does not support interactions");
    }

    /// Asynchronous termination hook of the processor.
    // NOTE(review): presumably the processor-level counterpart of
    // `ThriftService::on_termination` -- confirm against generated code.
    async fn handle_on_termination(&self);
}
283
/// Processor that implements no methods and no interactions.
///
/// The type parameters only tie the processor to a protocol `P`, a request
/// context `R` and a reply state `RS`; no value of those types is stored, so
/// the struct is zero-sized.
pub struct NullServiceProcessor<P, R, RS> {
    _phantom: PhantomData<(P, R, RS)>,
}

// `Clone` and `Debug` are written by hand instead of derived: the derives
// would require `P`, `R` and `RS` to be `Clone`/`Debug` themselves, although
// only a `PhantomData` marker is stored. These impls work for any type
// parameters and keep the derived `Debug` output format.
impl<P, R, RS> Clone for NullServiceProcessor<P, R, RS> {
    fn clone(&self) -> Self {
        Self {
            _phantom: PhantomData,
        }
    }
}

impl<P, R, RS> ::std::fmt::Debug for NullServiceProcessor<P, R, RS> {
    fn fmt(&self, f: &mut ::std::fmt::Formatter<'_>) -> ::std::fmt::Result {
        f.debug_struct("NullServiceProcessor")
            .field("_phantom", &self._phantom)
            .finish()
    }
}
290
291impl<P, R, RS> NullServiceProcessor<P, R, RS> {
292 pub fn new() -> Self {
293 Self {
294 _phantom: PhantomData,
295 }
296 }
297}
298
299impl<P, R, RS> Default for NullServiceProcessor<P, R, RS> {
300 fn default() -> Self {
301 Self::new()
302 }
303}
304
305#[async_trait]
306impl<P, R, RS> ServiceProcessor<P> for NullServiceProcessor<P, R, RS>
307where
308 P: Protocol + Sync,
309 P::Deserializer: Send,
310 R: Sync,
311 RS: Sync + Send,
312{
313 type RequestContext = R;
314 type ReplyState = RS;
315
316 #[inline]
317 fn method_idx(&self, name: &[u8]) -> Result<usize, ApplicationException> {
318 Err(ApplicationException::new(
319 ApplicationExceptionErrorCode::UnknownMethod,
320 format!("Unknown method {}", String::from_utf8_lossy(name)),
321 ))
322 }
323
324 async fn handle_method(
325 &self,
326 _idx: usize,
327 _d: &mut P::Deserializer,
329 _req: ProtocolDecoded<P>,
330 _req_ctxt: &R,
331 _reply_state: Arc<Mutex<RS>>,
332 _seqid: u32,
333 ) -> Result<(), Error> {
334 unimplemented!("NullServiceProcessor implements no methods")
336 }
337
338 fn create_interaction_idx(&self, name: &str) -> ::anyhow::Result<::std::primitive::usize> {
339 bail!("Unknown interaction {}", name);
340 }
341
342 fn handle_create_interaction(
343 &self,
344 _idx: ::std::primitive::usize,
345 ) -> ::anyhow::Result<
346 Arc<
347 dyn ThriftService<
348 P::Frame,
349 Handler = (),
350 RequestContext = Self::RequestContext,
351 ReplyState = Self::ReplyState,
352 > + ::std::marker::Send
353 + 'static,
354 >,
355 > {
356 unimplemented!("NullServiceProcessor implements no interactions")
357 }
358
359 async fn handle_on_termination(&self) {}
360}
361
362#[async_trait]
363impl<P, R, RS> ThriftService<P::Frame> for NullServiceProcessor<P, R, RS>
364where
365 P: Protocol + Send + Sync + 'static,
366 P::Frame: Send + 'static,
367 R: RequestContext<Name = CStr> + Send + Sync + 'static,
368 R::ContextStack: ContextStack<Name = CStr>,
369 RS: ReplyState<P::Frame> + Send + Sync + 'static,
370{
371 type Handler = ();
372 type RequestContext = R;
373 type ReplyState = RS;
374
375 async fn call(
376 &self,
377 req: ProtocolDecoded<P>,
378 rctxt: &R,
379 reply_state: Arc<Mutex<RS>>,
380 ) -> Result<(), Error> {
381 let mut p = P::deserializer(req);
382
383 const SERVICE_NAME: &str = "NullService";
384 let ((name, ae), _, seqid) = p.read_message_begin(|name| {
385 let name = String::from_utf8_lossy(name).into_owned();
386 let ae = ApplicationException::unimplemented_method(SERVICE_NAME, &name);
387 (name, ae)
388 })?;
389
390 p.skip(TType::Struct)?;
391 p.read_message_end()?;
392
393 rctxt.set_user_exception_header(ae.exn_name(), &ae.exn_value())?;
394 let res = serialize!(P, |p| {
395 p.write_message_begin(&name, ae.result_type().message_type(), seqid);
396 ae.write(p);
397 p.write_message_end();
398 });
399 reply_state.lock().unwrap().send_reply(res);
400 Ok(())
401 }
402
403 fn create_interaction(
404 &self,
405 name: &str,
406 ) -> ::anyhow::Result<
407 Arc<
408 dyn ThriftService<
409 P::Frame,
410 Handler = Self::Handler,
411 RequestContext = Self::RequestContext,
412 ReplyState = Self::ReplyState,
413 > + ::std::marker::Send
414 + 'static,
415 >,
416 > {
417 bail!("Unimplemented interaction {}", name);
418 }
419
420 fn get_method_names(&self) -> &'static [&'static str] {
421 &[]
422 }
423
424 async fn on_termination(&self) {}
425}