1use std::{future::Future, sync::Arc};
2
3use bytes::{Buf, Bytes};
4pub use pilota::thrift::Message;
5use pilota::thrift::{
6 ProtocolException, TAsyncInputProtocol, TInputProtocol, TLengthProtocol, TMessageIdentifier,
7 TOutputProtocol, ThriftException,
8};
9
/// A message that can be written to and read from a Thrift protocol.
///
/// Implementors must be `Sized + Send`. This file provides implementations
/// for `Arc<M>` (where `M: EntryMessage + Sync`) and for `Bytes`.
pub trait EntryMessage: Sized + Send {
 /// Writes `self` to `protocol`.
 ///
 /// # Errors
 ///
 /// Returns a [`ThriftException`] if the message cannot be written.
 fn encode<T: TOutputProtocol>(&self, protocol: &mut T) -> Result<(), ThriftException>;

 /// Reads a message from a synchronous input protocol.
 ///
 /// `msg_ident` is the message identifier associated with the message being
 /// decoded; how it is used is up to the implementation (the `Bytes`
 /// implementation in this file ignores it).
 ///
 /// # Errors
 ///
 /// Returns a [`ThriftException`] if decoding fails.
 fn decode<T: TInputProtocol>(
 protocol: &mut T,
 msg_ident: &TMessageIdentifier,
 ) -> Result<Self, ThriftException>;

 /// Asynchronous counterpart of [`EntryMessage::decode`].
 ///
 /// The returned future is required to be `Send`.
 ///
 /// # Errors
 ///
 /// The future resolves to a [`ThriftException`] if decoding fails or if the
 /// implementation does not support asynchronous decoding.
 fn decode_async<T: TAsyncInputProtocol>(
 protocol: &mut T,
 msg_ident: &TMessageIdentifier,
 ) -> impl Future<Output = Result<Self, ThriftException>> + Send;

 /// Returns the size of this message as computed with `protocol`.
 ///
 /// NOTE(review): presumably the number of bytes `encode` would write for
 /// the same protocol flavor; confirm against the `TLengthProtocol` contract.
 fn size<T: TLengthProtocol>(&self, protocol: &mut T) -> usize;
}
25
26impl<Message> EntryMessage for Arc<Message>
27where
28 Message: EntryMessage + Sync,
29{
30 #[inline]
31 fn encode<T: TOutputProtocol>(&self, protocol: &mut T) -> Result<(), ThriftException> {
32 (**self).encode(protocol)
33 }
34
35 #[inline]
36 fn decode<T: TInputProtocol>(
37 protocol: &mut T,
38 msg_ident: &TMessageIdentifier,
39 ) -> Result<Self, ThriftException> {
40 Message::decode(protocol, msg_ident).map(Arc::new)
41 }
42
43 #[inline]
44 async fn decode_async<T: TAsyncInputProtocol>(
45 protocol: &mut T,
46 msg_ident: &TMessageIdentifier,
47 ) -> Result<Self, ThriftException> {
48 Message::decode_async(protocol, msg_ident)
49 .await
50 .map(Arc::new)
51 }
52
53 #[inline]
54 fn size<T: TLengthProtocol>(&self, protocol: &mut T) -> usize {
55 (**self).size(protocol)
56 }
57}
58
59impl EntryMessage for Bytes {
60 fn encode<T: TOutputProtocol>(&self, protocol: &mut T) -> Result<(), ThriftException> {
61 protocol.write_bytes_without_len(self.clone())
62 }
63
64 fn decode<T: TInputProtocol>(
65 protocol: &mut T,
66 _msg_ident: &TMessageIdentifier,
67 ) -> Result<Self, ThriftException> {
68 let ptr = protocol.buf().chunk().as_ptr();
69 let len = protocol.buf().remaining();
70 let buf = protocol.get_bytes(Some(ptr), len)?;
71
72 Ok(buf)
73 }
74
75 async fn decode_async<T: TAsyncInputProtocol>(
76 _protocol: &mut T,
77 _msg_ident: &TMessageIdentifier,
78 ) -> Result<Self, ThriftException> {
79 Err(ThriftException::Protocol(ProtocolException::new(
80 pilota::thrift::ProtocolExceptionKind::NotImplemented,
81 "Binary response decode is not supported for pure Buffered protocol since we don't \
82 know the length of the message",
83 )))
84 }
85
86 fn size<T: TLengthProtocol>(&self, _protocol: &mut T) -> usize {
87 self.as_ref().len()
88 }
89}