1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
use crate::common::{MessageHeader, MESSAGE_TYPE_RESPONSE};
use crate::error::{Error, Result};
use crate::ttrpc::{Request, Response};
use protobuf::Message;
/// Serializes `res` and queues it on `tx` as a response frame.
///
/// The response is protobuf-encoded into a freshly allocated buffer, a
/// [`MessageHeader`] describing that buffer is built (type
/// `MESSAGE_TYPE_RESPONSE`, flags `0`, the caller-supplied `stream_id`), and
/// the `(header, bytes)` pair is sent through `tx`.
///
/// # Errors
///
/// Returns the error produced by `err_to_Others!` when:
/// - encoding or flushing the protobuf stream fails,
/// - the encoded length does not fit in the header's `u32` `length` field,
/// - `tx.send` fails (for a std `mpsc` sender: the receiver was dropped).
pub fn response_to_channel(
    stream_id: u32,
    res: Response,
    tx: std::sync::mpsc::Sender<(MessageHeader, Vec<u8>)>,
) -> Result<()> {
    let mut buf = Vec::with_capacity(res.compute_size() as usize);
    let mut s = protobuf::CodedOutputStream::vec(&mut buf);
    res.write_to(&mut s).map_err(err_to_Others!(e, ""))?;
    s.flush().map_err(err_to_Others!(e, ""))?;

    // A plain `as u32` cast would silently wrap for buffers larger than
    // `u32::MAX`, producing a header whose length disagrees with the payload.
    // Fail loudly instead. The fully qualified form avoids needing a
    // `use std::convert::TryFrom` on pre-2021 editions.
    let length = <u32 as std::convert::TryFrom<usize>>::try_from(buf.len())
        .map_err(err_to_Others!(e, "response too large for frame header: "))?;

    let mh = MessageHeader {
        length,
        stream_id,
        type_: MESSAGE_TYPE_RESPONSE,
        flags: 0,
    };
    tx.send((mh, buf)).map_err(err_to_Others!(e, ""))?;

    Ok(())
}
/// Expands to the statements of a server-side method handler: decode the
/// request payload, invoke the service method, and queue the response.
///
/// Arguments:
/// - `$class`: value whose `service` field provides the method `$req_fn`.
/// - `$ctx`: context value; `&$ctx` is passed to the service method, and
///   `$ctx.mh.stream_id` and `$ctx.res_tx` are used to send the response.
/// - `$req`: incoming request whose `payload` bytes are decoded.
/// - `$server`, `$req_type`: the request message type, resolved as
///   `super::$server::$req_type`.
/// - `$req_fn`: name of the service method to call.
///
/// Outcome mapping:
/// - `Ok(reply)`: status `Code::OK` with an empty message; `reply` is encoded
///   into the response payload.
/// - `Err(Error::RpcStatus(status))`: `status` is forwarded unchanged.
/// - any other `Err`: status `Code::UNKNOWN` carrying the error's `Debug` text.
///
/// The expansion uses `?`, so it must appear inside a function whose return
/// type accepts the resulting errors. `CodedInputStream` and `protobuf` are
/// written unqualified here and therefore resolve at the call site.
#[macro_export]
macro_rules! request_handler {
    ($class: ident, $ctx: ident, $req: ident, $server: ident, $req_type: ident, $req_fn: ident) => {
        // Decode the raw payload into the typed request message.
        let mut input = CodedInputStream::from_bytes(&$req.payload);
        let mut decoded = super::$server::$req_type::new();
        decoded
            .merge_from(&mut input)
            .map_err(::ttrpc::Err_to_Others!(e, ""))?;

        let mut response = ::ttrpc::Response::new();
        match $class.service.$req_fn(&$ctx, decoded) {
            Ok(reply) => {
                response.set_status(::ttrpc::get_status(::ttrpc::Code::OK, "".to_string()));
                response.payload.reserve(reply.compute_size() as usize);
                let mut output = protobuf::CodedOutputStream::vec(&mut response.payload);
                reply
                    .write_to(&mut output)
                    .map_err(::ttrpc::Err_to_Others!(e, ""))?;
                output.flush().map_err(::ttrpc::Err_to_Others!(e, ""))?;
            }
            // The service already produced an RPC status: pass it through as is.
            Err(::ttrpc::Error::RpcStatus(status)) => {
                response.set_status(status);
            }
            // Anything else is reported as UNKNOWN with its Debug rendering.
            Err(other) => {
                response.set_status(::ttrpc::get_status(
                    ::ttrpc::Code::UNKNOWN,
                    format!("{:?}", other),
                ));
            }
        }

        ::ttrpc::response_to_channel($ctx.mh.stream_id, response, $ctx.res_tx)?
    };
}
/// Expands to the statements of a client-side call: build a `::ttrpc::Request`,
/// send it through `$self.client`, and decode the reply into `$cres`.
///
/// Arguments:
/// - `$self`: value whose `client` field provides `request(..)`.
/// - `$req`: message to encode into the request payload.
/// - `$timeout_nano`: value passed to `set_timeout_nano`.
/// - `$server`, `$method`: expressions converted with `to_string()` and stored
///   as the request's service and method.
/// - `$cres`: message that the response payload is merged into.
///
/// The expansion uses `?`, so it must appear inside a function whose return
/// type accepts the resulting errors. `CodedOutputStream` and
/// `CodedInputStream` are written unqualified here and therefore resolve at
/// the call site.
#[macro_export]
macro_rules! client_request {
    ($self: ident, $req: ident, $timeout_nano: ident, $server: expr, $method: expr, $cres: ident) => {
        // Fill in the request envelope.
        let mut request = ::ttrpc::Request::new();
        request.set_service($server.to_string());
        request.set_method($method.to_string());
        request.set_timeout_nano($timeout_nano);

        // Encode the caller's message straight into the envelope payload.
        request.payload.reserve($req.compute_size() as usize);
        let mut output = CodedOutputStream::vec(&mut request.payload);
        $req.write_to(&mut output)
            .map_err(::ttrpc::Err_to_Others!(e, ""))?;
        output.flush().map_err(::ttrpc::Err_to_Others!(e, ""))?;

        // Perform the call and decode the reply payload into `$cres`.
        let response = $self.client.request(request)?;
        let mut input = CodedInputStream::from_bytes(&response.payload);
        $cres
            .merge_from(&mut input)
            .map_err(::ttrpc::Err_to_Others!(e, "Unpack get error "))?;
    };
}
/// Context handed to [`MethodHandler::handler`] together with the request.
///
/// `request_handler!` reads `mh.stream_id` and `res_tx` from its context
/// argument when queueing the response.
#[derive(Debug)]
pub struct TtrpcContext {
/// Raw Unix file descriptor associated with this context.
/// NOTE(review): presumably the connection the request arrived on — confirm
/// against the code that constructs the context.
pub fd: std::os::unix::io::RawFd,
/// Message header stored with the context.
/// NOTE(review): presumably the header of the request being handled — confirm.
pub mh: MessageHeader,
/// Sending half of a std `mpsc` channel carrying `(header, payload bytes)`
/// pairs; same element type as the `tx` parameter of `response_to_channel`.
pub res_tx: std::sync::mpsc::Sender<(MessageHeader, Vec<u8>)>,
}
/// Interface for handling a single request.
///
/// NOTE(review): implementors are presumably generated per service method and
/// built around `request_handler!` — confirm against the code generator.
pub trait MethodHandler {
/// Handles `req` using the supplied `ctx`; both are taken by value.
///
/// Returns `Ok(())` on success, or the crate's `Error` through `Result`.
fn handler(&self, ctx: TtrpcContext, req: Request) -> Result<()>;
}