Skip to main content

volo_thrift/
message.rs

1use std::{future::Future, sync::Arc};
2
3use bytes::{Buf, Bytes};
4pub use pilota::thrift::Message;
5use pilota::thrift::{
6    ProtocolException, TAsyncInputProtocol, TInputProtocol, TLengthProtocol, TMessageIdentifier,
7    TOutputProtocol, ThriftException,
8};
9
10pub trait EntryMessage: Sized + Send {
11    fn encode<T: TOutputProtocol>(&self, protocol: &mut T) -> Result<(), ThriftException>;
12
13    fn decode<T: TInputProtocol>(
14        protocol: &mut T,
15        msg_ident: &TMessageIdentifier,
16    ) -> Result<Self, ThriftException>;
17
18    fn decode_async<T: TAsyncInputProtocol>(
19        protocol: &mut T,
20        msg_ident: &TMessageIdentifier,
21    ) -> impl Future<Output = Result<Self, ThriftException>> + Send;
22
23    fn size<T: TLengthProtocol>(&self, protocol: &mut T) -> usize;
24}
25
26impl<Message> EntryMessage for Arc<Message>
27where
28    Message: EntryMessage + Sync,
29{
30    #[inline]
31    fn encode<T: TOutputProtocol>(&self, protocol: &mut T) -> Result<(), ThriftException> {
32        (**self).encode(protocol)
33    }
34
35    #[inline]
36    fn decode<T: TInputProtocol>(
37        protocol: &mut T,
38        msg_ident: &TMessageIdentifier,
39    ) -> Result<Self, ThriftException> {
40        Message::decode(protocol, msg_ident).map(Arc::new)
41    }
42
43    #[inline]
44    async fn decode_async<T: TAsyncInputProtocol>(
45        protocol: &mut T,
46        msg_ident: &TMessageIdentifier,
47    ) -> Result<Self, ThriftException> {
48        Message::decode_async(protocol, msg_ident)
49            .await
50            .map(Arc::new)
51    }
52
53    #[inline]
54    fn size<T: TLengthProtocol>(&self, protocol: &mut T) -> usize {
55        (**self).size(protocol)
56    }
57}
58
59impl EntryMessage for Bytes {
60    fn encode<T: TOutputProtocol>(&self, protocol: &mut T) -> Result<(), ThriftException> {
61        protocol.write_bytes_without_len(self.clone())
62    }
63
64    fn decode<T: TInputProtocol>(
65        protocol: &mut T,
66        _msg_ident: &TMessageIdentifier,
67    ) -> Result<Self, ThriftException> {
68        let ptr = protocol.buf().chunk().as_ptr();
69        let len = protocol.buf().remaining();
70        let buf = protocol.get_bytes(Some(ptr), len)?;
71
72        Ok(buf)
73    }
74
75    async fn decode_async<T: TAsyncInputProtocol>(
76        _protocol: &mut T,
77        _msg_ident: &TMessageIdentifier,
78    ) -> Result<Self, ThriftException> {
79        Err(ThriftException::Protocol(ProtocolException::new(
80            pilota::thrift::ProtocolExceptionKind::NotImplemented,
81            "Binary response decode is not supported for pure Buffered protocol since we don't \
82             know the length of the message",
83        )))
84    }
85
86    fn size<T: TLengthProtocol>(&self, _protocol: &mut T) -> usize {
87        self.as_ref().len()
88    }
89}