nebula_client/v1/storage/
transport_response_handler.rs

1use std::io::{Cursor, Error as IoError, ErrorKind as IoErrorKind};
2
3use fbthrift::{
4    binary_protocol::BinaryProtocolDeserializer, ApplicationException, Deserialize, MessageType,
5    ProtocolReader,
6};
7use fbthrift_transport_response_handler::ResponseHandler;
8use nebula_fbthrift_storage_v1::services::storage_service::{ScanEdgeExn, ScanVertexExn};
9
/// Stateless response handler for the storage service transport.
///
/// The type carries no data; all behaviour lives in its `ResponseHandler`
/// implementation, which recognises the `scanVertex` and `scanEdge` methods.
#[derive(Debug, Clone, Default)]
pub struct StorageTransportResponseHandler;
12
13impl ResponseHandler for StorageTransportResponseHandler {
14    fn try_make_static_response_bytes(
15        &mut self,
16        _service_name: &'static [u8],
17        fn_name: &'static [u8],
18        _request_bytes: &[u8],
19    ) -> Result<Option<Vec<u8>>, IoError> {
20        match fn_name {
21            b"StorageService.scanVertex" | b"StorageService.scanEdge" => Ok(None),
22            _ => Err(IoError::new(
23                IoErrorKind::Other,
24                format!("Unknown method {}", String::from_utf8_lossy(fn_name)),
25            )),
26        }
27    }
28
29    fn parse_response_bytes(&mut self, response_bytes: &[u8]) -> Result<Option<usize>, IoError> {
30        let mut des = BinaryProtocolDeserializer::new(Cursor::new(response_bytes));
31        let (name, message_type, _) = match des.read_message_begin(|v| v.to_vec()) {
32            Ok(v) => v,
33            Err(_) => return Ok(None),
34        };
35
36        match &name[..] {
37            b"scanVertex" | b"scanEdge" => {}
38            _ => return Ok(None),
39        };
40
41        match message_type {
42            MessageType::Reply => {
43                match &name[..] {
44                    b"scanVertex" => {
45                        let _: ScanVertexExn = match Deserialize::read(&mut des) {
46                            Ok(v) => v,
47                            Err(_) => return Ok(None),
48                        };
49                    }
50                    b"scanEdge" => {
51                        let _: ScanEdgeExn = match Deserialize::read(&mut des) {
52                            Ok(v) => v,
53                            Err(_) => return Ok(None),
54                        };
55                    }
56                    _ => unreachable!(),
57                };
58            }
59            MessageType::Exception => {
60                let _: ApplicationException = match Deserialize::read(&mut des) {
61                    Ok(v) => v,
62                    Err(_) => return Ok(None),
63                };
64            }
65            MessageType::Call | MessageType::Oneway | MessageType::InvalidMessageType => {}
66        }
67
68        match des.read_message_end() {
69            Ok(v) => v,
70            Err(_) => return Ok(None),
71        };
72
73        Ok(Some(des.into_inner().position() as usize))
74    }
75}
76
#[cfg(test)]
mod tests {
    use super::*;

    /// Known scan methods produce no static response; an unknown method is
    /// rejected with an `Other` I/O error naming the method.
    #[test]
    fn test_try_make_static_response_bytes() -> Result<(), Box<dyn std::error::Error>> {
        let mut handler = StorageTransportResponseHandler;

        let known_methods: [&'static [u8]; 2] =
            [b"StorageService.scanVertex", b"StorageService.scanEdge"];
        for &method in known_methods.iter() {
            let reply =
                handler.try_make_static_response_bytes(b"StorageService", method, b"FOO")?;
            assert_eq!(reply, None);
        }

        let outcome = handler.try_make_static_response_bytes(
            b"StorageService",
            b"StorageService.foo",
            b"FOO",
        );
        let err = match outcome {
            Ok(_) => panic!(),
            Err(err) => err,
        };
        assert_eq!(err.kind(), IoErrorKind::Other);
        assert_eq!(err.to_string(), "Unknown method StorageService.foo");

        Ok(())
    }
}