hadoop_common/ipc/client/
connection_id.rs

1use super::Client;
2use crate::{
3    conf::Configuration,
4    fs::{common_configuration_keys, common_configuration_keys_public},
5    io::retry::RetryPolicy,
6    security::UserGroupInformation,
7};
8use anyhow::Error;
9use std::{net::SocketAddr, rc::Rc};
10
11/// This class holds the address and the user ticket. The client connections
12/// to servers are uniquely identified by `<remoteAddress, protocol, ticket>`
13pub struct ConnectionId {
14    address: SocketAddr,
15    ticket: UserGroupInformation,
16    rpc_timeout: i32,
17    // connections will be culled if it was idle for maxIdleTime msecs
18    max_idle_time: i32,
19    connection_retry_policy: Option<Rc<dyn RetryPolicy>>,
20    max_retries_on_sasl: i32,
21    // the max. no. of retries for socket connections on time out exceptions
22    max_retries_on_socket_timeouts: i32,
23    // if T then disable Nagle's Algorithm
24    tcp_no_delay: bool,
25    // if T then use low-delay QoS
26    tcp_low_latency: bool,
27    // do we need to send ping message
28    do_ping: bool,
29    // how often sends ping to the server in msecs
30    ping_interval: i32,
31    // used to get the expected kerberos principal name
32    _conf: Configuration,
33}
34
35impl ConnectionId {
36    fn new(
37        address: &SocketAddr,
38        ticket: &UserGroupInformation,
39        rpc_timeout: i32,
40        connection_retry_policy: Option<Rc<dyn RetryPolicy>>,
41        conf: &Configuration,
42    ) -> anyhow::Result<Self> {
43        let max_idle_time = conf.get_int(
44            common_configuration_keys_public::IPC_CLIENT_CONNECTION_MAXIDLETIME_KEY,
45            common_configuration_keys_public::IPC_CLIENT_CONNECTION_MAXIDLETIME_DEFAULT,
46        )?;
47        let max_retries_on_sasl = conf.get_int(
48            common_configuration_keys::IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SASL_KEY,
49            common_configuration_keys::IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SASL_DEFAULT,
50        )?;
51        let max_retries_on_socket_timeouts = conf.get_int(
52            common_configuration_keys_public::IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_KEY,
53            common_configuration_keys_public::IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_DEFAULT,
54        )?;
55        let tcp_no_delay = conf.get_bool(
56            common_configuration_keys_public::IPC_CLIENT_TCPNODELAY_KEY,
57            common_configuration_keys_public::IPC_CLIENT_TCPNODELAY_DEFAULT,
58        );
59        let tcp_low_latency = conf.get_bool(
60            common_configuration_keys_public::IPC_CLIENT_LOW_LATENCY,
61            common_configuration_keys_public::IPC_CLIENT_LOW_LATENCY_DEFAULT,
62        );
63        let do_ping = conf.get_bool(
64            common_configuration_keys::IPC_CLIENT_PING_KEY,
65            common_configuration_keys::IPC_CLIENT_PING_DEFAULT,
66        );
67        Ok(Self {
68            address: address.to_owned(),
69            ticket: ticket.to_owned(),
70            rpc_timeout,
71            max_idle_time,
72            connection_retry_policy,
73            max_retries_on_sasl,
74            max_retries_on_socket_timeouts,
75            tcp_no_delay,
76            tcp_low_latency,
77            do_ping,
78            ping_interval: if do_ping {
79                Client::get_ping_interval(conf)?
80            } else {
81                0
82            },
83            _conf: conf.to_owned(),
84        })
85    }
86
87    pub fn get_address(&self) -> &SocketAddr {
88        &self.address
89    }
90
91    /// This is used to update the remote address when an address change is detected.
92    fn _set_address(&mut self, address: &SocketAddr) -> anyhow::Result<()> {
93        // TODO: compare hostname
94        if self.address.port() != address.port() {
95            return Err(Error::msg(format!(
96                "Port must match: {} vs {}",
97                self.address, address
98            )));
99        }
100        self.address = address.to_owned();
101        Ok(())
102    }
103
104    pub fn get_ticket(&self) -> &UserGroupInformation {
105        &self.ticket
106    }
107
108    pub fn get_rpc_timeout(&self) -> i32 {
109        self.rpc_timeout
110    }
111
112    pub fn get_max_idle_time(&self) -> i32 {
113        self.max_idle_time
114    }
115
116    pub fn get_max_retries_on_sasl(&self) -> i32 {
117        self.max_retries_on_sasl
118    }
119
120    pub fn get_max_retries_on_socket_timeouts(&self) -> i32 {
121        self.max_retries_on_socket_timeouts
122    }
123
124    pub fn get_tcp_no_delay(&self) -> bool {
125        self.tcp_no_delay
126    }
127
128    pub fn get_tcp_low_latency(&self) -> bool {
129        self.tcp_low_latency
130    }
131
132    pub fn get_do_ping(&self) -> bool {
133        self.do_ping
134    }
135
136    pub fn get_ping_interval(&self) -> i32 {
137        self.ping_interval
138    }
139
140    pub fn get_retry_policy(&self) -> Option<Rc<dyn RetryPolicy>> {
141        self.connection_retry_policy.as_ref().map(Rc::clone)
142    }
143
144    /// Returns a ConnectionId object.
145    pub fn get_connection_id(
146        addr: &SocketAddr,
147        ticket: &UserGroupInformation,
148        rpc_timeout: i32,
149        connection_retry_policy: Option<Rc<dyn RetryPolicy>>,
150        conf: &Configuration,
151    ) -> anyhow::Result<Self> {
152        // TODO: set connection_retry_policy if not yet
153
154        Self::new(addr, ticket, rpc_timeout, connection_retry_policy, conf)
155    }
156}