nebula_client/v1/graph/
transport_response_handler.rs

1use std::io::{Cursor, Error as IoError, ErrorKind as IoErrorKind};
2
3use bytes::BytesMut;
4use fbthrift::{
5    binary_protocol::{BinaryProtocolDeserializer, BinaryProtocolSerializer},
6    ApplicationException, Deserialize, MessageType, ProtocolReader, ProtocolWriter, Serialize,
7};
8use fbthrift_transport_response_handler::ResponseHandler;
9use nebula_fbthrift_graph_v1::services::graph_service::{AuthenticateExn, ExecuteExn, SignoutExn};
10
/// Stateless [`ResponseHandler`] implementation for the Nebula Graph v1
/// `GraphService` methods (`authenticate`, `signout`, `execute`).
///
/// The type carries no data, so cloning it or building it through
/// [`Default`] is free.
#[derive(Debug, Clone, Default)]
pub struct GraphTransportResponseHandler;
13
14impl ResponseHandler for GraphTransportResponseHandler {
15    fn try_make_static_response_bytes(
16        &mut self,
17        _service_name: &'static [u8],
18        fn_name: &'static [u8],
19        request_bytes: &[u8],
20    ) -> Result<Option<Vec<u8>>, IoError> {
21        match fn_name {
22            b"GraphService.authenticate" => Ok(None),
23            b"GraphService.signout" => {
24                let mut des = BinaryProtocolDeserializer::new(Cursor::new(request_bytes));
25                let (name, message_type, seqid) = des
26                    .read_message_begin(|v| v.to_vec())
27                    .map_err(|err| IoError::new(IoErrorKind::Other, err))?;
28
29                if name != b"signout" {
30                    return Err(IoError::new(
31                        IoErrorKind::Other,
32                        format!("Unexpected name {name:?}"),
33                    ));
34                }
35
36                if message_type != MessageType::Call {
37                    return Err(IoError::new(
38                        IoErrorKind::Other,
39                        format!("Unexpected message type {message_type:?}"),
40                    ));
41                }
42
43                let buf = BytesMut::with_capacity(1024);
44                let mut ser = BinaryProtocolSerializer::<BytesMut>::with_buffer(buf);
45
46                ser.write_message_begin("signout", MessageType::Reply, seqid);
47                ser.write_message_end();
48
49                SignoutExn::Success(()).write(&mut ser);
50
51                let res_buf = ser.finish().to_vec();
52
53                Ok(Some(res_buf))
54            }
55            b"GraphService.execute" => Ok(None),
56            _ => Err(IoError::new(
57                IoErrorKind::Other,
58                format!("Unknown method {}", String::from_utf8_lossy(fn_name)),
59            )),
60        }
61    }
62
63    fn parse_response_bytes(&mut self, response_bytes: &[u8]) -> Result<Option<usize>, IoError> {
64        let mut des = BinaryProtocolDeserializer::new(Cursor::new(response_bytes));
65        let (name, message_type, _) = match des.read_message_begin(|v| v.to_vec()) {
66            Ok(v) => v,
67            Err(_) => return Ok(None),
68        };
69
70        match &name[..] {
71            b"authenticate" => {}
72            b"signout" => unreachable!(),
73            b"execute" => {}
74            _ => return Ok(None),
75        };
76
77        match message_type {
78            MessageType::Reply => {
79                match &name[..] {
80                    b"authenticate" => {
81                        let _: AuthenticateExn = match Deserialize::read(&mut des) {
82                            Ok(v) => v,
83                            Err(_) => return Ok(None),
84                        };
85                    }
86                    b"execute" => {
87                        let _: ExecuteExn = match Deserialize::read(&mut des) {
88                            Ok(v) => v,
89                            Err(_) => return Ok(None),
90                        };
91                    }
92                    _ => unreachable!(),
93                };
94            }
95            MessageType::Exception => {
96                let _: ApplicationException = match Deserialize::read(&mut des) {
97                    Ok(v) => v,
98                    Err(_) => return Ok(None),
99                };
100            }
101            MessageType::Call | MessageType::Oneway | MessageType::InvalidMessageType => {}
102        }
103
104        match des.read_message_end() {
105            Ok(v) => v,
106            Err(_) => return Ok(None),
107        };
108        Ok(Some(des.into_inner().position() as usize))
109    }
110}
111
#[cfg(test)]
mod tests {
    use super::*;

    /// `authenticate` and `execute` never get a static response; unknown
    /// method names are rejected with a descriptive error.
    #[test]
    fn test_try_make_static_response_bytes() -> Result<(), Box<dyn std::error::Error>> {
        let mut handler = GraphTransportResponseHandler;

        assert_eq!(
            handler.try_make_static_response_bytes(
                b"GraphService",
                b"GraphService.authenticate",
                b"FOO"
            )?,
            None
        );
        assert_eq!(
            handler.try_make_static_response_bytes(
                b"GraphService",
                b"GraphService.execute",
                b"FOO"
            )?,
            None
        );

        let err = handler
            .try_make_static_response_bytes(b"GraphService", b"GraphService.foo", b"FOO")
            .expect_err("an unknown method must be rejected");
        assert_eq!(err.kind(), IoErrorKind::Other);
        assert_eq!(err.to_string(), "Unknown method GraphService.foo");

        Ok(())
    }

    /// A well-formed `signout` call yields a static reply whose header names
    /// `signout`, has message type `Reply`, and echoes the request's
    /// sequence id.
    #[test]
    fn test_try_make_static_response_bytes_with_signout() -> Result<(), Box<dyn std::error::Error>>
    {
        let mut handler = GraphTransportResponseHandler;

        //
        // Ref https://github.com/bk-rs/nebula-rs/blob/e500e6f93b0ffcd009038c2a51b41a6aa3488b18/nebula-fbthrift/nebula-fbthrift-graph/src/lib.rs#L1634
        //
        let request = ::fbthrift::serialize!(::fbthrift::BinaryProtocol, |p| {
            p.write_message_begin("signout", ::fbthrift::MessageType::Call, 0);

            p.write_struct_begin("args");
            p.write_field_begin("arg_sessionId", ::fbthrift::TType::I64, 1i16);
            // The field is declared as I64, so write an i64 value.
            ::fbthrift::Serialize::write(&1i64, p);
            p.write_field_end();
            p.write_field_stop();
            p.write_struct_end();

            p.write_message_end();
        });

        let response = match handler.try_make_static_response_bytes(
            b"GraphService",
            b"GraphService.signout",
            &request[..],
        ) {
            Ok(Some(bytes)) => bytes,
            other => panic!("expected a static signout response, got {other:?}"),
        };

        // Decode the reply header and check it against the request.
        let mut des = ::fbthrift::binary_protocol::BinaryProtocolDeserializer::new(
            ::std::io::Cursor::new(&response[..]),
        );
        let (name, message_type, seqid) = match des.read_message_begin(|v| v.to_vec()) {
            Ok(header) => header,
            Err(_) => panic!("static signout response has an unreadable message header"),
        };
        assert_eq!(name, b"signout");
        assert_eq!(message_type, ::fbthrift::MessageType::Reply);
        assert_eq!(seqid, 0);

        Ok(())
    }
}