1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111
use std::io; use bytes::{Buf, Bytes, BytesMut}; use fbthrift::{ binary_protocol::{BinaryProtocolDeserializer, BinaryProtocolSerializer}, ApplicationException, Deserialize, MessageType, ProtocolReader, ProtocolWriter, Serialize, }; use fbthrift_transport_response_handler::ResponseHandler; use nebula_fbthrift_graph::services::graph_service::{AuthenticateExn, ExecuteExn, SignoutExn}; #[derive(Clone)] pub struct GraphTransportResponseHandler; impl ResponseHandler for GraphTransportResponseHandler { fn try_make_response_bytes( &self, request_bytes: &[u8], ) -> io::Result<(Vec<u8>, Option<Vec<u8>>)> { let mut des = BinaryProtocolDeserializer::<Bytes>::new(Bytes::from(request_bytes.to_vec())); let (name, message_type, seqid) = des .read_message_begin(|v| v.to_vec()) .map_err(|err| io::Error::new(io::ErrorKind::Other, err))?; match &name[..] { b"authenticate" => Ok((name, None)), b"signout" => { if message_type != MessageType::Call { return Err(io::Error::new( io::ErrorKind::Other, format!("Unexpected message type {:?}", message_type), )); } let buf = BytesMut::with_capacity(1024); let mut ser = BinaryProtocolSerializer::<BytesMut>::with_buffer(buf); ser.write_message_begin("signout", MessageType::Reply, seqid); ser.write_message_end(); SignoutExn::Success(()).write(&mut ser); let res_buf = ser.finish().bytes().to_vec(); return Ok((name, Some(res_buf))); } b"execute" => Ok((name, None)), _ => { return Err(io::Error::new( io::ErrorKind::Other, format!("Unknown method {:?}", name), )) } } } fn parse_response_bytes( &self, _name: &[u8], response_bytes: &[u8], ) -> io::Result<Option<usize>> { let n = response_bytes.len(); let mut des = BinaryProtocolDeserializer::<Bytes>::new(Bytes::from(response_bytes.to_vec())); let (name, message_type, _) = match des.read_message_begin(|v| v.to_vec()) { Ok(v) => v, Err(_) => return Ok(None), }; match &name[..] { b"authenticate" => {} b"signout" => unreachable!(), b"execute" => {} _ => return Ok(None), }; match message_type { MessageType::Reply => { match &name[..] { b"authenticate" => { let _: AuthenticateExn = match Deserialize::read(&mut des) { Ok(v) => v, Err(_) => return Ok(None), }; } b"execute" => { let _: ExecuteExn = match Deserialize::read(&mut des) { Ok(v) => v, Err(_) => return Ok(None), }; } _ => unreachable!(), }; } MessageType::Exception => { let _: ApplicationException = match Deserialize::read(&mut des) { Ok(v) => v, Err(_) => return Ok(None), }; } MessageType::Call | MessageType::Oneway | MessageType::InvalidMessageType => {} } match des.read_message_end() { Ok(v) => v, Err(_) => return Ok(None), }; Ok(Some(n - des.into_inner().len())) } }