nebula_client/v1/storage/
transport_response_handler.rs1use std::io::{Cursor, Error as IoError, ErrorKind as IoErrorKind};
2
3use fbthrift::{
4 binary_protocol::BinaryProtocolDeserializer, ApplicationException, Deserialize, MessageType,
5 ProtocolReader,
6};
7use fbthrift_transport_response_handler::ResponseHandler;
8use nebula_fbthrift_storage_v1::services::storage_service::{ScanEdgeExn, ScanVertexExn};
9
10#[derive(Clone)]
11pub struct StorageTransportResponseHandler;
12
13impl ResponseHandler for StorageTransportResponseHandler {
14 fn try_make_static_response_bytes(
15 &mut self,
16 _service_name: &'static [u8],
17 fn_name: &'static [u8],
18 _request_bytes: &[u8],
19 ) -> Result<Option<Vec<u8>>, IoError> {
20 match fn_name {
21 b"StorageService.scanVertex" | b"StorageService.scanEdge" => Ok(None),
22 _ => Err(IoError::new(
23 IoErrorKind::Other,
24 format!("Unknown method {}", String::from_utf8_lossy(fn_name)),
25 )),
26 }
27 }
28
29 fn parse_response_bytes(&mut self, response_bytes: &[u8]) -> Result<Option<usize>, IoError> {
30 let mut des = BinaryProtocolDeserializer::new(Cursor::new(response_bytes));
31 let (name, message_type, _) = match des.read_message_begin(|v| v.to_vec()) {
32 Ok(v) => v,
33 Err(_) => return Ok(None),
34 };
35
36 match &name[..] {
37 b"scanVertex" | b"scanEdge" => {}
38 _ => return Ok(None),
39 };
40
41 match message_type {
42 MessageType::Reply => {
43 match &name[..] {
44 b"scanVertex" => {
45 let _: ScanVertexExn = match Deserialize::read(&mut des) {
46 Ok(v) => v,
47 Err(_) => return Ok(None),
48 };
49 }
50 b"scanEdge" => {
51 let _: ScanEdgeExn = match Deserialize::read(&mut des) {
52 Ok(v) => v,
53 Err(_) => return Ok(None),
54 };
55 }
56 _ => unreachable!(),
57 };
58 }
59 MessageType::Exception => {
60 let _: ApplicationException = match Deserialize::read(&mut des) {
61 Ok(v) => v,
62 Err(_) => return Ok(None),
63 };
64 }
65 MessageType::Call | MessageType::Oneway | MessageType::InvalidMessageType => {}
66 }
67
68 match des.read_message_end() {
69 Ok(v) => v,
70 Err(_) => return Ok(None),
71 };
72
73 Ok(Some(des.into_inner().position() as usize))
74 }
75}
76
77#[cfg(test)]
78mod tests {
79 use super::*;
80
81 #[test]
82 fn test_try_make_static_response_bytes() -> Result<(), Box<dyn std::error::Error>> {
83 let mut handler = StorageTransportResponseHandler;
84
85 assert_eq!(
86 handler.try_make_static_response_bytes(
87 b"StorageService",
88 b"StorageService.scanVertex",
89 b"FOO"
90 )?,
91 None
92 );
93 assert_eq!(
94 handler.try_make_static_response_bytes(
95 b"StorageService",
96 b"StorageService.scanEdge",
97 b"FOO"
98 )?,
99 None
100 );
101 match handler.try_make_static_response_bytes(
102 b"StorageService",
103 b"StorageService.foo",
104 b"FOO",
105 ) {
106 Ok(_) => panic!(),
107 Err(err) => {
108 assert_eq!(err.kind(), IoErrorKind::Other);
109
110 assert_eq!(err.to_string(), "Unknown method StorageService.foo");
111 }
112 }
113
114 Ok(())
115 }
116}