use std::ffi::CStr;
use std::marker::PhantomData;
use std::sync::Arc;
use std::sync::Mutex;
use anyhow::bail;
use anyhow::Error;
use anyhow::Result;
use async_trait::async_trait;
use futures::stream::BoxStream;
use crate::application_exception::ApplicationException;
use crate::application_exception::ApplicationExceptionErrorCode;
use crate::context_stack::ContextStack;
use crate::exceptions::ExceptionInfo;
use crate::exceptions::ResultInfo;
use crate::framing::Framing;
use crate::framing::FramingDecoded;
use crate::framing::FramingEncodedFinal;
use crate::protocol::Protocol;
use crate::protocol::ProtocolDecoded;
use crate::protocol::ProtocolReader;
use crate::protocol::ProtocolWriter;
use crate::request_context::RequestContext;
use crate::serialize::Serialize;
use crate::thrift_protocol::ProtocolID;
use crate::ttype::TType;
/// One element of a serialized response stream, generic over the encoded
/// `Payload` representation.
///
/// Within this file the type is only named in the `stream` parameter of
/// `ReplyState::send_stream_reply`; the variant descriptions below are
/// inferred from variant names and payload types.
pub enum SerializedStreamElement<Payload> {
/// Carries an encoded payload (presumably a successfully produced item).
Success(Payload),
/// Carries an encoded payload (presumably an exception declared by the
/// service definition — TODO confirm against the producer).
DeclaredException(Payload),
/// Carries an un-encoded `ApplicationException` value.
ApplicationException(ApplicationException),
/// Carries an `anyhow::Error`; by its name, a failure raised while
/// serializing an element.
SerializationError(Error),
}
/// Sink through which a service hands finished, framing-encoded replies back
/// to whatever is driving it.
///
/// `F` selects the framing, and therefore the encoded buffer type
/// `FramingEncodedFinal<F>` accepted by the send methods.
pub trait ReplyState<F>
where
    F: Framing,
{
    /// Request-context type required of any interaction processor passed to
    /// [`ReplyState::set_interaction_processor`].
    type RequestContext;

    /// Sends a single encoded reply.
    fn send_reply(&mut self, reply: FramingEncodedFinal<F>);

    /// Sends an encoded `response` together with an optional stream of
    /// already-serialized follow-up elements.
    ///
    /// # Errors
    ///
    /// Implementation-defined; the trait only fixes the `anyhow` result type.
    fn send_stream_reply(
        &mut self,
        response: FramingEncodedFinal<F>,
        stream: Option<BoxStream<'static, SerializedStreamElement<FramingEncodedFinal<F>>>>,
        protocol_id: ProtocolID,
    ) -> Result<()>;

    /// Hands an interaction processor to this reply state.
    ///
    /// # Errors
    ///
    /// The provided implementation ignores `_processor` and always fails;
    /// implementors that support interactions override it.
    fn set_interaction_processor(
        &mut self,
        _processor: Arc<
            dyn ThriftService<
                    F,
                    Handler = (),
                    RequestContext = Self::RequestContext,
                    ReplyState = Self,
                > + Send
                + 'static,
        >,
    ) -> Result<()> {
        bail!("Thrift server does not support interactions")
    }
}
/// A service that consumes framing-decoded requests.
///
/// The trait fixes the handler, request-context and reply-state types as
/// associated types so that services can be passed around as
/// `dyn ThriftService<F, Handler = .., RequestContext = .., ReplyState = ..>`.
#[async_trait]
pub trait ThriftService<F>: Send + Sync + 'static
where
    F: Framing + Send + 'static,
{
    /// Handler type associated with the service (`()` for every trait object
    /// named in this file).
    type Handler;
    /// Per-request context passed by reference to [`ThriftService::call`].
    type RequestContext;
    /// Reply sink shared with [`ThriftService::call`] behind `Arc<Mutex<_>>`.
    type ReplyState;

    /// Processes one decoded request.
    ///
    /// No value is returned for the response itself; implementations are
    /// expected to deliver it through `reply_state` (as the
    /// `NullServiceProcessor` implementation in this file does).
    async fn call(
        &self,
        req: FramingDecoded<F>,
        req_ctxt: &Self::RequestContext,
        reply_state: Arc<Mutex<Self::ReplyState>>,
    ) -> Result<(), Error>;

    /// Creates the interaction called `_name`, returned as a type-erased
    /// service sharing this service's request-context and reply-state types.
    ///
    /// # Errors
    ///
    /// The provided implementation always fails; services that support
    /// interactions override it.
    fn create_interaction(
        &self,
        _name: &str,
    ) -> ::anyhow::Result<
        Arc<
            dyn ThriftService<
                    F,
                    Handler = (),
                    RequestContext = Self::RequestContext,
                    ReplyState = Self::ReplyState,
                > + Send
                + 'static,
        >,
    > {
        bail!("Thrift server does not support interactions")
    }

    /// Returns a static list of method names (by its name, the methods this
    /// service implements).
    fn get_method_names(&self) -> &'static [&'static str];
}
/// Forwarding implementation: a boxed service behaves exactly like the
/// service it owns. Every method dereferences the box and delegates.
#[async_trait]
impl<F, T> ThriftService<F> for Box<T>
where
    T: ThriftService<F> + Send + Sync + ?Sized,
    F: Framing + Send + 'static,
    T::RequestContext: Send + Sync + 'static,
    T::ReplyState: Send + Sync + 'static,
{
    type Handler = T::Handler;
    type RequestContext = T::RequestContext;
    type ReplyState = T::ReplyState;

    /// Delegates to the boxed service's `call`.
    async fn call(
        &self,
        req: FramingDecoded<F>,
        req_ctxt: &Self::RequestContext,
        reply_state: Arc<Mutex<Self::ReplyState>>,
    ) -> Result<(), Error> {
        <T as ThriftService<F>>::call(&**self, req, req_ctxt, reply_state).await
    }

    /// Delegates to the boxed service's `create_interaction`.
    fn create_interaction(
        &self,
        name: &str,
    ) -> ::anyhow::Result<
        Arc<
            dyn ThriftService<
                    F,
                    Handler = (),
                    RequestContext = Self::RequestContext,
                    ReplyState = Self::ReplyState,
                > + Send
                + 'static,
        >,
    > {
        <T as ThriftService<F>>::create_interaction(&**self, name)
    }

    /// Delegates to the boxed service's `get_method_names`.
    fn get_method_names(&self) -> &'static [&'static str] {
        <T as ThriftService<F>>::get_method_names(&**self)
    }
}
/// Forwarding implementation: a reference-counted service behaves exactly
/// like the service it points to. Every method dereferences the `Arc` and
/// delegates.
#[async_trait]
impl<F, T> ThriftService<F> for Arc<T>
where
    T: ThriftService<F> + Send + Sync + ?Sized,
    F: Framing + Send + 'static,
    T::RequestContext: Send + Sync + 'static,
    T::ReplyState: Send + Sync + 'static,
{
    type Handler = T::Handler;
    type RequestContext = T::RequestContext;
    type ReplyState = T::ReplyState;

    /// Delegates to the shared service's `call`.
    async fn call(
        &self,
        req: FramingDecoded<F>,
        req_ctxt: &Self::RequestContext,
        reply_state: Arc<Mutex<Self::ReplyState>>,
    ) -> Result<(), Error> {
        <T as ThriftService<F>>::call(&**self, req, req_ctxt, reply_state).await
    }

    /// Delegates to the shared service's `create_interaction`.
    fn create_interaction(
        &self,
        name: &str,
    ) -> ::anyhow::Result<
        Arc<
            dyn ThriftService<
                    F,
                    Handler = (),
                    RequestContext = Self::RequestContext,
                    ReplyState = Self::ReplyState,
                > + Send
                + 'static,
        >,
    > {
        <T as ThriftService<F>>::create_interaction(&**self, name)
    }

    /// Delegates to the shared service's `get_method_names`.
    fn get_method_names(&self) -> &'static [&'static str] {
        <T as ThriftService<F>>::get_method_names(&**self)
    }
}
/// Index-based dispatch interface for a service speaking protocol `P`.
///
/// Names are first mapped to a `usize` index (`method_idx`,
/// `create_interaction_idx`) and that index is then passed to the matching
/// handler (`handle_method`, `handle_create_interaction`).
#[async_trait]
pub trait ServiceProcessor<P>
where
    P: Protocol,
{
    /// Per-request context passed by reference to `handle_method`.
    type RequestContext;
    /// Reply sink shared with `handle_method` behind `Arc<Mutex<_>>`.
    type ReplyState;

    /// Maps a method name, given as raw bytes, to an index.
    ///
    /// # Errors
    ///
    /// Returns an `ApplicationException` when no index can be produced for
    /// `name`.
    fn method_idx(&self, name: &[u8]) -> Result<usize, ApplicationException>;

    /// Handles the method identified by `idx` (presumably a value previously
    /// returned by `method_idx`), reading from the `request` deserializer.
    ///
    /// `seqid` is passed through from the caller; its use is
    /// implementation-defined.
    async fn handle_method(
        &self,
        idx: usize,
        request: &mut P::Deserializer,
        req_ctxt: &Self::RequestContext,
        reply_state: Arc<Mutex<Self::ReplyState>>,
        seqid: u32,
    ) -> Result<(), Error>;

    /// Maps an interaction name to an index.
    ///
    /// # Errors
    ///
    /// The provided implementation always fails; processors that support
    /// interactions override it.
    fn create_interaction_idx(&self, _name: &str) -> ::anyhow::Result<usize> {
        bail!("Processor does not support interactions")
    }

    /// Creates the interaction identified by `_idx`, returned as a
    /// type-erased service over `P::Frame`.
    ///
    /// # Errors
    ///
    /// The provided implementation always fails; processors that support
    /// interactions override it.
    fn handle_create_interaction(
        &self,
        _idx: usize,
    ) -> ::anyhow::Result<
        Arc<
            dyn ThriftService<
                    P::Frame,
                    Handler = (),
                    RequestContext = Self::RequestContext,
                    ReplyState = Self::ReplyState,
                > + Send
                + 'static,
        >,
    > {
        bail!("Processor does not support interactions")
    }
}
/// A processor that implements no methods and no interactions.
///
/// The type parameters only select the trait implementations provided in
/// this file: `P` is the protocol, `R` the request-context type and `RS` the
/// reply-state type. No values of those types are stored.
pub struct NullServiceProcessor<P, R, RS> {
    _phantom: PhantomData<(P, R, RS)>,
}

// `Clone` and `Debug` are implemented by hand instead of derived:
// `#[derive]` would require `P`, `R` and `RS` themselves to be
// `Clone`/`Debug`, even though the only field is a `PhantomData` marker
// that needs neither.
impl<P, R, RS> Clone for NullServiceProcessor<P, R, RS> {
    fn clone(&self) -> Self {
        Self {
            _phantom: PhantomData,
        }
    }
}

impl<P, R, RS> ::std::fmt::Debug for NullServiceProcessor<P, R, RS> {
    /// Produces the same layout a derived `Debug` would.
    fn fmt(&self, f: &mut ::std::fmt::Formatter<'_>) -> ::std::fmt::Result {
        f.debug_struct("NullServiceProcessor")
            .field("_phantom", &self._phantom)
            .finish()
    }
}
impl<P, R, RS> NullServiceProcessor<P, R, RS> {
pub fn new() -> Self {
Self {
_phantom: PhantomData,
}
}
}
impl<P, R, RS> Default for NullServiceProcessor<P, R, RS> {
fn default() -> Self {
Self::new()
}
}
/// `ServiceProcessor` for the null processor: every name lookup fails, so
/// the index-based handlers are never expected to be reached.
#[async_trait]
impl<P, R, RS> ServiceProcessor<P> for NullServiceProcessor<P, R, RS>
where
    P: Protocol + Sync,
    P::Deserializer: Send,
    R: Sync,
    RS: Sync + Send,
{
    type RequestContext = R;
    type ReplyState = RS;

    /// Always fails with an `UnknownMethod` application exception whose
    /// message contains `name`, decoded lossily as UTF-8.
    #[inline]
    fn method_idx(&self, name: &[u8]) -> Result<usize, ApplicationException> {
        let printable = String::from_utf8_lossy(name);
        let message = format!("Unknown method {}", printable);
        Err(ApplicationException::new(
            ApplicationExceptionErrorCode::UnknownMethod,
            message,
        ))
    }

    /// # Panics
    ///
    /// Always panics: `method_idx` never yields an index, so there is no
    /// method to handle.
    async fn handle_method(
        &self,
        _idx: usize,
        _request: &mut P::Deserializer,
        _req_ctxt: &R,
        _reply_state: Arc<Mutex<RS>>,
        _seqid: u32,
    ) -> Result<(), Error> {
        unimplemented!("NullServiceProcessor implements no methods")
    }

    /// Always fails, reporting `name` as an unknown interaction.
    fn create_interaction_idx(&self, name: &str) -> ::anyhow::Result<usize> {
        bail!("Unknown interaction {}", name)
    }

    /// # Panics
    ///
    /// Always panics: `create_interaction_idx` never yields an index, so
    /// there is no interaction to create.
    fn handle_create_interaction(
        &self,
        _idx: usize,
    ) -> ::anyhow::Result<
        Arc<
            dyn ThriftService<
                    P::Frame,
                    Handler = (),
                    RequestContext = Self::RequestContext,
                    ReplyState = Self::ReplyState,
                > + Send
                + 'static,
        >,
    > {
        unimplemented!("NullServiceProcessor implements no interactions")
    }
}
/// `ThriftService` for the null processor: every request is answered with
/// an application exception, and no interaction can be created.
#[async_trait]
impl<P, R, RS> ThriftService<P::Frame> for NullServiceProcessor<P, R, RS>
where
    P: Protocol + Send + Sync + 'static,
    P::Frame: Send + 'static,
    R: RequestContext<Name = CStr> + Send + Sync + 'static,
    R::ContextStack: ContextStack<Name = CStr>,
    RS: ReplyState<P::Frame> + Send + Sync + 'static,
{
    type Handler = ();
    type RequestContext = R;
    type ReplyState = RS;

    /// Reads the request's message header, skips its argument struct, and
    /// replies with the exception built by
    /// `ApplicationException::unimplemented_method` for the requested name.
    ///
    /// The exception's name and value are also recorded on `rctxt` through
    /// `set_user_exception_header` before the reply is sent.
    ///
    /// # Errors
    ///
    /// Propagates failures from reading the request and from setting the
    /// exception header.
    ///
    /// # Panics
    ///
    /// Panics if the `reply_state` mutex is poisoned.
    async fn call(
        &self,
        req: ProtocolDecoded<P>,
        rctxt: &R,
        reply_state: Arc<Mutex<RS>>,
    ) -> Result<(), Error> {
        const SERVICE_NAME: &str = "NullService";

        let mut de = P::deserializer(req);

        // The callback turns the raw method name into an owned string plus
        // the exception to report; the message type is discarded.
        let ((method_name, exn), _, seqid) = de.read_message_begin(|raw_name| {
            let owned = String::from_utf8_lossy(raw_name).into_owned();
            let exn = ApplicationException::unimplemented_method(SERVICE_NAME, &owned);
            (owned, exn)
        })?;

        // Consume the rest of the request so the whole message is read
        // before the reply is built.
        de.skip(TType::Struct)?;
        de.read_message_end()?;

        rctxt.set_user_exception_header(exn.exn_name(), &exn.exn_value())?;

        let encoded = serialize!(P, |p| {
            p.write_message_begin(&method_name, exn.result_type().message_type(), seqid);
            exn.write(p);
            p.write_message_end();
        });

        let mut sink = reply_state.lock().unwrap();
        sink.send_reply(encoded);
        Ok(())
    }

    /// Always fails, reporting `name` as an unimplemented interaction.
    fn create_interaction(
        &self,
        name: &str,
    ) -> ::anyhow::Result<
        Arc<
            dyn ThriftService<
                    P::Frame,
                    Handler = Self::Handler,
                    RequestContext = Self::RequestContext,
                    ReplyState = Self::ReplyState,
                > + Send
                + 'static,
        >,
    > {
        bail!("Unimplemented interaction {}", name)
    }

    /// Returns an empty list: this service has no methods.
    fn get_method_names(&self) -> &'static [&'static str] {
        &[]
    }
}