hadoop_common/ipc/client/
connection_id.rs1use super::Client;
2use crate::{
3 conf::Configuration,
4 fs::{common_configuration_keys, common_configuration_keys_public},
5 io::retry::RetryPolicy,
6 security::UserGroupInformation,
7};
8use anyhow::Error;
9use std::{net::SocketAddr, rc::Rc};
10
11pub struct ConnectionId {
14 address: SocketAddr,
15 ticket: UserGroupInformation,
16 rpc_timeout: i32,
17 max_idle_time: i32,
19 connection_retry_policy: Option<Rc<dyn RetryPolicy>>,
20 max_retries_on_sasl: i32,
21 max_retries_on_socket_timeouts: i32,
23 tcp_no_delay: bool,
25 tcp_low_latency: bool,
27 do_ping: bool,
29 ping_interval: i32,
31 _conf: Configuration,
33}
34
35impl ConnectionId {
36 fn new(
37 address: &SocketAddr,
38 ticket: &UserGroupInformation,
39 rpc_timeout: i32,
40 connection_retry_policy: Option<Rc<dyn RetryPolicy>>,
41 conf: &Configuration,
42 ) -> anyhow::Result<Self> {
43 let max_idle_time = conf.get_int(
44 common_configuration_keys_public::IPC_CLIENT_CONNECTION_MAXIDLETIME_KEY,
45 common_configuration_keys_public::IPC_CLIENT_CONNECTION_MAXIDLETIME_DEFAULT,
46 )?;
47 let max_retries_on_sasl = conf.get_int(
48 common_configuration_keys::IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SASL_KEY,
49 common_configuration_keys::IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SASL_DEFAULT,
50 )?;
51 let max_retries_on_socket_timeouts = conf.get_int(
52 common_configuration_keys_public::IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_KEY,
53 common_configuration_keys_public::IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_DEFAULT,
54 )?;
55 let tcp_no_delay = conf.get_bool(
56 common_configuration_keys_public::IPC_CLIENT_TCPNODELAY_KEY,
57 common_configuration_keys_public::IPC_CLIENT_TCPNODELAY_DEFAULT,
58 );
59 let tcp_low_latency = conf.get_bool(
60 common_configuration_keys_public::IPC_CLIENT_LOW_LATENCY,
61 common_configuration_keys_public::IPC_CLIENT_LOW_LATENCY_DEFAULT,
62 );
63 let do_ping = conf.get_bool(
64 common_configuration_keys::IPC_CLIENT_PING_KEY,
65 common_configuration_keys::IPC_CLIENT_PING_DEFAULT,
66 );
67 Ok(Self {
68 address: address.to_owned(),
69 ticket: ticket.to_owned(),
70 rpc_timeout,
71 max_idle_time,
72 connection_retry_policy,
73 max_retries_on_sasl,
74 max_retries_on_socket_timeouts,
75 tcp_no_delay,
76 tcp_low_latency,
77 do_ping,
78 ping_interval: if do_ping {
79 Client::get_ping_interval(conf)?
80 } else {
81 0
82 },
83 _conf: conf.to_owned(),
84 })
85 }
86
87 pub fn get_address(&self) -> &SocketAddr {
88 &self.address
89 }
90
91 fn _set_address(&mut self, address: &SocketAddr) -> anyhow::Result<()> {
93 if self.address.port() != address.port() {
95 return Err(Error::msg(format!(
96 "Port must match: {} vs {}",
97 self.address, address
98 )));
99 }
100 self.address = address.to_owned();
101 Ok(())
102 }
103
104 pub fn get_ticket(&self) -> &UserGroupInformation {
105 &self.ticket
106 }
107
108 pub fn get_rpc_timeout(&self) -> i32 {
109 self.rpc_timeout
110 }
111
112 pub fn get_max_idle_time(&self) -> i32 {
113 self.max_idle_time
114 }
115
116 pub fn get_max_retries_on_sasl(&self) -> i32 {
117 self.max_retries_on_sasl
118 }
119
120 pub fn get_max_retries_on_socket_timeouts(&self) -> i32 {
121 self.max_retries_on_socket_timeouts
122 }
123
124 pub fn get_tcp_no_delay(&self) -> bool {
125 self.tcp_no_delay
126 }
127
128 pub fn get_tcp_low_latency(&self) -> bool {
129 self.tcp_low_latency
130 }
131
132 pub fn get_do_ping(&self) -> bool {
133 self.do_ping
134 }
135
136 pub fn get_ping_interval(&self) -> i32 {
137 self.ping_interval
138 }
139
140 pub fn get_retry_policy(&self) -> Option<Rc<dyn RetryPolicy>> {
141 self.connection_retry_policy.as_ref().map(Rc::clone)
142 }
143
144 pub fn get_connection_id(
146 addr: &SocketAddr,
147 ticket: &UserGroupInformation,
148 rpc_timeout: i32,
149 connection_retry_policy: Option<Rc<dyn RetryPolicy>>,
150 conf: &Configuration,
151 ) -> anyhow::Result<Self> {
152 Self::new(addr, ticket, rpc_timeout, connection_retry_policy, conf)
155 }
156}