hadoop_common/util/
proto_util.rs1use crate::{
2 ipc::{client_id::BYTE_LENGTH, AlignmentContext, RpcKind},
3 security::UserGroupInformation,
4};
5use hadoop_proto::hadoop::common::{
6 rpc_request_header_proto::OperationProto, IpcConnectionContextProto, RpcKindProto,
7 RpcRequestHeaderProto, UserInformationProto,
8};
9use std::rc::Rc;
10
11pub struct ProtoUtil;
12
13impl ProtoUtil {
14 pub fn make_ipc_connection_context(
15 protocol: Option<&str>,
16 ugi: Option<&UserGroupInformation>,
17 auth_method: &str,
18 ) -> IpcConnectionContextProto {
19 let mut ugi_proto = UserInformationProto {
20 effective_user: None,
21 real_user: None,
22 };
23 if let Some(ugi) = ugi {
24 if auth_method == "kerberos" {
25 ugi_proto.effective_user = Some(ugi.get_user_name());
28 } else if auth_method == "token" {
29 } else {
32 ugi_proto.effective_user = Some(ugi.get_user_name());
36 }
37 }
38 IpcConnectionContextProto {
39 user_info: Some(ugi_proto),
40 protocol: protocol.map(|p| p.to_string()),
41 }
42 }
43
44 pub fn convert<S, T: From<S>>(value: S) -> T {
45 T::from(value)
46 }
47
48 pub fn make_rpc_request_header(
49 rpc_kind: &RpcKind,
50 operation: OperationProto,
51 call_id: i32,
52 retry_count: i32,
53 uuid: &[u8; BYTE_LENGTH],
54 alignment_context: Option<Rc<dyn AlignmentContext>>,
55 ) -> RpcRequestHeaderProto {
56 let result = RpcRequestHeaderProto {
57 rpc_kind: Some(RpcKindProto::from(rpc_kind).into()),
58 rpc_op: Some(operation.into()),
59 call_id,
60 client_id: uuid.to_vec(),
61 retry_count: Some(retry_count),
62 trace_info: None,
63 caller_context: None,
64 state_id: None,
65 router_federated_state: None,
66 };
67
68 if let Some(_alignment_context) = alignment_context {
70 }
72
73 result
74 }
75}
76
77impl From<&RpcKind> for RpcKindProto {
78 fn from(kind: &RpcKind) -> Self {
79 match kind {
80 RpcKind::RpcBuiltin => RpcKindProto::RpcBuiltin,
81 RpcKind::RpcWritable => RpcKindProto::RpcWritable,
82 RpcKind::RpcProtocolBuffer => RpcKindProto::RpcProtocolBuffer,
83 }
84 }
85}