// hadoop_common/util/proto_util.rs

1use crate::{
2    ipc::{client_id::BYTE_LENGTH, AlignmentContext, RpcKind},
3    security::UserGroupInformation,
4};
5use hadoop_proto::hadoop::common::{
6    rpc_request_header_proto::OperationProto, IpcConnectionContextProto, RpcKindProto,
7    RpcRequestHeaderProto, UserInformationProto,
8};
9use std::rc::Rc;
10
/// Stateless namespace type for helpers that build Hadoop IPC protobuf messages.
///
/// The type has no fields; its functionality is exposed through associated
/// functions, so a value never needs to be constructed. The common traits are
/// derived so the type can still be printed, copied, and default-constructed
/// when it ends up embedded in other types or generic code.
#[derive(Debug, Clone, Copy, Default)]
pub struct ProtoUtil;
12
13impl ProtoUtil {
14    pub fn make_ipc_connection_context(
15        protocol: Option<&str>,
16        ugi: Option<&UserGroupInformation>,
17        auth_method: &str,
18    ) -> IpcConnectionContextProto {
19        let mut ugi_proto = UserInformationProto {
20            effective_user: None,
21            real_user: None,
22        };
23        if let Some(ugi) = ugi {
24            if auth_method == "kerberos" {
25                // Real user was established as part of the connection.
26                // Send effective user only.
27                ugi_proto.effective_user = Some(ugi.get_user_name());
28            } else if auth_method == "token" {
29                // With token, the connection itself establishes
30                // both real and effective user. Hence send none in header.
31            } else {
32                // Simple authentication
33                // No user info is established as part of the connection.
34                // Send both effective user and real user
35                ugi_proto.effective_user = Some(ugi.get_user_name());
36            }
37        }
38        IpcConnectionContextProto {
39            user_info: Some(ugi_proto),
40            protocol: protocol.map(|p| p.to_string()),
41        }
42    }
43
44    pub fn convert<S, T: From<S>>(value: S) -> T {
45        T::from(value)
46    }
47
48    pub fn make_rpc_request_header(
49        rpc_kind: &RpcKind,
50        operation: OperationProto,
51        call_id: i32,
52        retry_count: i32,
53        uuid: &[u8; BYTE_LENGTH],
54        alignment_context: Option<Rc<dyn AlignmentContext>>,
55    ) -> RpcRequestHeaderProto {
56        let result = RpcRequestHeaderProto {
57            rpc_kind: Some(RpcKindProto::from(rpc_kind).into()),
58            rpc_op: Some(operation.into()),
59            call_id,
60            client_id: uuid.to_vec(),
61            retry_count: Some(retry_count),
62            trace_info: None,
63            caller_context: None,
64            state_id: None,
65            router_federated_state: None,
66        };
67
68        // Add alignment context if it is not null
69        if let Some(_alignment_context) = alignment_context {
70            // TODO
71        }
72
73        result
74    }
75}
76
77impl From<&RpcKind> for RpcKindProto {
78    fn from(kind: &RpcKind) -> Self {
79        match kind {
80            RpcKind::RpcBuiltin => RpcKindProto::RpcBuiltin,
81            RpcKind::RpcWritable => RpcKindProto::RpcWritable,
82            RpcKind::RpcProtocolBuffer => RpcKindProto::RpcProtocolBuffer,
83        }
84    }
85}