fbthrift_git/
processor.rs

/*
 * Copyright (c) Meta Platforms, Inc. and affiliates.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
17use std::ffi::CStr;
18use std::marker::PhantomData;
19use std::sync::Arc;
20use std::sync::Mutex;
21
22use anyhow::bail;
23use anyhow::Error;
24use anyhow::Result;
25use async_trait::async_trait;
26use futures::stream::BoxStream;
27
28use crate::application_exception::ApplicationException;
29use crate::application_exception::ApplicationExceptionErrorCode;
30use crate::context_stack::ContextStack;
31use crate::exceptions::ExceptionInfo;
32use crate::exceptions::ResultInfo;
33use crate::framing::Framing;
34use crate::framing::FramingDecoded;
35use crate::framing::FramingEncodedFinal;
36use crate::protocol::Protocol;
37use crate::protocol::ProtocolDecoded;
38use crate::protocol::ProtocolReader;
39use crate::protocol::ProtocolWriter;
40use crate::request_context::RequestContext;
41use crate::serialize::Serialize;
42use crate::thrift_protocol::ProtocolID;
43use crate::ttype::TType;
44
45pub enum SerializedStreamElement<Payload> {
46    /// A normal stream response, without any error. Contains the serialized response.
47    Success(Payload),
48    /// Contains the serialized declared exception.
49    DeclaredException(Payload),
50    /// Contains the application exception.
51    ApplicationException(ApplicationException),
52    /// The serialization failed. Contains the error.
53    SerializationError(Error),
54}
55
56pub trait ReplyState<F>
57where
58    F: Framing,
59{
60    type RequestContext;
61
62    fn send_reply(&mut self, reply: FramingEncodedFinal<F>);
63    fn send_stream_reply(
64        &mut self,
65        response: FramingEncodedFinal<F>,
66        stream: Option<BoxStream<'static, SerializedStreamElement<FramingEncodedFinal<F>>>>,
67        protocol_id: ProtocolID,
68    ) -> Result<()>;
69    fn set_interaction_processor(
70        &mut self,
71        _processor: Arc<
72            dyn ThriftService<
73                    F,
74                    Handler = (),
75                    RequestContext = Self::RequestContext,
76                    ReplyState = Self,
77                > + ::std::marker::Send
78                + 'static,
79        >,
80    ) -> Result<()> {
81        bail!("Thrift server does not support interactions");
82    }
83}
84
85#[async_trait]
86pub trait ThriftService<F>: Send + Sync + 'static
87where
88    F: Framing + Send + 'static,
89{
90    type Handler;
91    type RequestContext;
92    type ReplyState;
93
94    async fn call(
95        &self,
96        req: FramingDecoded<F>,
97        req_ctxt: &Self::RequestContext,
98        reply_state: Arc<Mutex<Self::ReplyState>>,
99    ) -> Result<(), Error>;
100
101    fn create_interaction(
102        &self,
103        _name: &str,
104    ) -> ::anyhow::Result<
105        Arc<
106            dyn ThriftService<
107                    F,
108                    Handler = (),
109                    RequestContext = Self::RequestContext,
110                    ReplyState = Self::ReplyState,
111                > + ::std::marker::Send
112                + 'static,
113        >,
114    > {
115        bail!("Thrift server does not support interactions");
116    }
117
118    /// Returns function names this thrift service is able to handle, similar
119    /// to the keys of C++'s createMethodMetadata().
120    ///
121    /// Return value includes inherited functions from parent thrift services,
122    /// and interactions' functions.
123    fn get_method_names(&self) -> &'static [&'static str];
124
125    /// Applies to interactions only
126    ///
127    /// Termination callback is invoked immediately as soon as the client's
128    /// termination signal is received. This differs to the interaction service
129    /// being dropped, which only happens when all outstanding requests and
130    /// streams have been completed. This is not invoked if the connection
131    /// closes without the signal being received.
132    async fn on_termination(&self);
133}
134
135#[async_trait]
136impl<F, T> ThriftService<F> for Box<T>
137where
138    T: ThriftService<F> + Send + Sync + ?Sized,
139    F: Framing + Send + 'static,
140    T::RequestContext: Send + Sync + 'static,
141    T::ReplyState: Send + Sync + 'static,
142{
143    type Handler = T::Handler;
144    type RequestContext = T::RequestContext;
145    type ReplyState = T::ReplyState;
146
147    async fn call(
148        &self,
149        req: FramingDecoded<F>,
150        req_ctxt: &Self::RequestContext,
151        reply_state: Arc<Mutex<Self::ReplyState>>,
152    ) -> Result<(), Error> {
153        (**self).call(req, req_ctxt, reply_state).await
154    }
155
156    fn create_interaction(
157        &self,
158        name: &str,
159    ) -> ::anyhow::Result<
160        Arc<
161            dyn ThriftService<
162                    F,
163                    Handler = (),
164                    RequestContext = Self::RequestContext,
165                    ReplyState = Self::ReplyState,
166                > + ::std::marker::Send
167                + 'static,
168        >,
169    > {
170        (**self).create_interaction(name)
171    }
172
173    fn get_method_names(&self) -> &'static [&'static str] {
174        (**self).get_method_names()
175    }
176
177    async fn on_termination(&self) {
178        (**self).on_termination().await
179    }
180}
181
182#[async_trait]
183impl<F, T> ThriftService<F> for Arc<T>
184where
185    T: ThriftService<F> + Send + Sync + ?Sized,
186    F: Framing + Send + 'static,
187    T::RequestContext: Send + Sync + 'static,
188    T::ReplyState: Send + Sync + 'static,
189{
190    type Handler = T::Handler;
191    type RequestContext = T::RequestContext;
192    type ReplyState = T::ReplyState;
193
194    async fn call(
195        &self,
196        req: FramingDecoded<F>,
197        req_ctxt: &Self::RequestContext,
198        reply_state: Arc<Mutex<Self::ReplyState>>,
199    ) -> Result<(), Error> {
200        (**self).call(req, req_ctxt, reply_state).await
201    }
202
203    fn create_interaction(
204        &self,
205        name: &str,
206    ) -> ::anyhow::Result<
207        Arc<
208            dyn ThriftService<
209                    F,
210                    Handler = (),
211                    RequestContext = Self::RequestContext,
212                    ReplyState = Self::ReplyState,
213                > + ::std::marker::Send
214                + 'static,
215        >,
216    > {
217        (**self).create_interaction(name)
218    }
219
220    fn get_method_names(&self) -> &'static [&'static str] {
221        (**self).get_method_names()
222    }
223
224    async fn on_termination(&self) {
225        (**self).on_termination().await
226    }
227}
228
229/// Trait implemented by a generated type to implement a service.
230#[async_trait]
231pub trait ServiceProcessor<P>
232where
233    P: Protocol,
234{
235    type RequestContext;
236    type ReplyState;
237
238    /// Given a method name, return a reference to the processor for that index.
239    fn method_idx(&self, name: &[u8]) -> Result<usize, ApplicationException>;
240
241    /// Given a method index and the remains of the message input, get a future
242    /// for the result of the method. This will only be called if the corresponding
243    /// `method_idx()` returns an (index, ServiceProcessor) tuple.
244    /// `frame` is a reference to the frame containing the request.
245    /// `request` is a deserializer instance set up to decode the request.
246    async fn handle_method(
247        &self,
248        idx: usize,
249        //frame: &P::Frame,
250        d: &mut P::Deserializer,
251        req: ProtocolDecoded<P>,
252        req_ctxt: &Self::RequestContext,
253        reply_state: Arc<Mutex<Self::ReplyState>>,
254        seqid: u32,
255    ) -> Result<(), Error>;
256
257    /// Given a method name, return a reference to the interaction creation fn for that index
258    fn create_interaction_idx(&self, _name: &str) -> ::anyhow::Result<::std::primitive::usize> {
259        bail!("Processor does not support interactions");
260    }
261
262    /// Given a creation method index, it produces a fresh interaction processor
263    fn handle_create_interaction(
264        &self,
265        _idx: ::std::primitive::usize,
266    ) -> ::anyhow::Result<
267        Arc<
268            dyn ThriftService<
269                    P::Frame,
270                    Handler = (),
271                    RequestContext = Self::RequestContext,
272                    ReplyState = Self::ReplyState,
273                > + ::std::marker::Send
274                + 'static,
275        >,
276    > {
277        bail!("Processor does not support interactions");
278    }
279
280    /// See [ThriftService::on_termination] docs
281    async fn handle_on_termination(&self);
282}
283
/// Null processor which implements no methods - it acts as the super for any service
/// which has no super-service.
///
/// The type parameters are markers only: the struct stores nothing but a
/// `PhantomData`.
///
/// `Clone` and `Debug` are written by hand instead of derived. The derives would
/// add `P: Clone + Debug`, `R: Clone + Debug` and `RS: Clone + Debug` bounds even
/// though no value of those types is ever stored, which makes the impls unusable
/// with marker types that lack those traits.
pub struct NullServiceProcessor<P, R, RS> {
    _phantom: PhantomData<(P, R, RS)>,
}

impl<P, R, RS> ::std::clone::Clone for NullServiceProcessor<P, R, RS> {
    /// Returns a new, equally empty processor; there is no state to copy.
    fn clone(&self) -> Self {
        Self {
            _phantom: PhantomData,
        }
    }
}

impl<P, R, RS> ::std::fmt::Debug for NullServiceProcessor<P, R, RS> {
    /// Formats the processor the same way the derived impl would, without
    /// requiring the type parameters to be `Debug`.
    fn fmt(&self, f: &mut ::std::fmt::Formatter<'_>) -> ::std::fmt::Result {
        f.debug_struct("NullServiceProcessor")
            .field("_phantom", &self._phantom)
            .finish()
    }
}
290
291impl<P, R, RS> NullServiceProcessor<P, R, RS> {
292    pub fn new() -> Self {
293        Self {
294            _phantom: PhantomData,
295        }
296    }
297}
298
299impl<P, R, RS> Default for NullServiceProcessor<P, R, RS> {
300    fn default() -> Self {
301        Self::new()
302    }
303}
304
305#[async_trait]
306impl<P, R, RS> ServiceProcessor<P> for NullServiceProcessor<P, R, RS>
307where
308    P: Protocol + Sync,
309    P::Deserializer: Send,
310    R: Sync,
311    RS: Sync + Send,
312{
313    type RequestContext = R;
314    type ReplyState = RS;
315
316    #[inline]
317    fn method_idx(&self, name: &[u8]) -> Result<usize, ApplicationException> {
318        Err(ApplicationException::new(
319            ApplicationExceptionErrorCode::UnknownMethod,
320            format!("Unknown method {}", String::from_utf8_lossy(name)),
321        ))
322    }
323
324    async fn handle_method(
325        &self,
326        _idx: usize,
327        //_frame: &P::Frame,
328        _d: &mut P::Deserializer,
329        _req: ProtocolDecoded<P>,
330        _req_ctxt: &R,
331        _reply_state: Arc<Mutex<RS>>,
332        _seqid: u32,
333    ) -> Result<(), Error> {
334        // Should never be called since method_idx() always returns an error
335        unimplemented!("NullServiceProcessor implements no methods")
336    }
337
338    fn create_interaction_idx(&self, name: &str) -> ::anyhow::Result<::std::primitive::usize> {
339        bail!("Unknown interaction {}", name);
340    }
341
342    fn handle_create_interaction(
343        &self,
344        _idx: ::std::primitive::usize,
345    ) -> ::anyhow::Result<
346        Arc<
347            dyn ThriftService<
348                    P::Frame,
349                    Handler = (),
350                    RequestContext = Self::RequestContext,
351                    ReplyState = Self::ReplyState,
352                > + ::std::marker::Send
353                + 'static,
354        >,
355    > {
356        unimplemented!("NullServiceProcessor implements no interactions")
357    }
358
359    async fn handle_on_termination(&self) {}
360}
361
362#[async_trait]
363impl<P, R, RS> ThriftService<P::Frame> for NullServiceProcessor<P, R, RS>
364where
365    P: Protocol + Send + Sync + 'static,
366    P::Frame: Send + 'static,
367    R: RequestContext<Name = CStr> + Send + Sync + 'static,
368    R::ContextStack: ContextStack<Name = CStr>,
369    RS: ReplyState<P::Frame> + Send + Sync + 'static,
370{
371    type Handler = ();
372    type RequestContext = R;
373    type ReplyState = RS;
374
375    async fn call(
376        &self,
377        req: ProtocolDecoded<P>,
378        rctxt: &R,
379        reply_state: Arc<Mutex<RS>>,
380    ) -> Result<(), Error> {
381        let mut p = P::deserializer(req);
382
383        const SERVICE_NAME: &str = "NullService";
384        let ((name, ae), _, seqid) = p.read_message_begin(|name| {
385            let name = String::from_utf8_lossy(name).into_owned();
386            let ae = ApplicationException::unimplemented_method(SERVICE_NAME, &name);
387            (name, ae)
388        })?;
389
390        p.skip(TType::Struct)?;
391        p.read_message_end()?;
392
393        rctxt.set_user_exception_header(ae.exn_name(), &ae.exn_value())?;
394        let res = serialize!(P, |p| {
395            p.write_message_begin(&name, ae.result_type().message_type(), seqid);
396            ae.write(p);
397            p.write_message_end();
398        });
399        reply_state.lock().unwrap().send_reply(res);
400        Ok(())
401    }
402
403    fn create_interaction(
404        &self,
405        name: &str,
406    ) -> ::anyhow::Result<
407        Arc<
408            dyn ThriftService<
409                    P::Frame,
410                    Handler = Self::Handler,
411                    RequestContext = Self::RequestContext,
412                    ReplyState = Self::ReplyState,
413                > + ::std::marker::Send
414                + 'static,
415        >,
416    > {
417        bail!("Unimplemented interaction {}", name);
418    }
419
420    fn get_method_names(&self) -> &'static [&'static str] {
421        &[]
422    }
423
424    async fn on_termination(&self) {}
425}