hadoop_common/ipc/client/
mod.rs1mod call;
2mod connection;
3mod connection_id;
4
5use super::{client_id::BYTE_LENGTH, AlignmentContext, ClientId, RpcKind, RpcProtocol};
6use crate::{conf::Configuration, fs::common_configuration_keys};
7use anyhow::Error;
8use atomic::Atomic;
9use call::Call;
10use connection::Connection;
11pub use connection_id::ConnectionId;
12use std::{
13 cell::RefCell,
14 io::{Read, Write},
15 net::TcpStream,
16 rc::Rc,
17 sync::{atomic::Ordering, Arc},
18};
19
20static CALL_ID_COUNTER: Atomic<i32> = Atomic::new(0);
22
23thread_local! {
24 static CALL_ID: RefCell<Option<i32>> = RefCell::new(None);
25 static RETRY_COUNT: RefCell<Option<i32>> = RefCell::new(None);
26 static EXTERNAL_CALL_HANDLER: RefCell<Option<String>> = RefCell::new(None);
27}
28
29pub struct Client {
33 _value_class: String,
34 _conf: Configuration,
35 _connection_timeout: i32,
36 _fallback_allowed: bool,
37 _bind_to_wild_card_address: bool,
38 client_id: [u8; BYTE_LENGTH],
39 _max_async_calls: i32,
40}
41
42impl Client {
43 pub fn get_timeout(_conf: &Configuration) -> i32 {
44 3000
46 }
47
48 fn _get_call_id() -> anyhow::Result<i32> {
49 Ok(CALL_ID
50 .try_with(|x| *x.borrow())?
51 .unwrap_or_else(|| Self::next_call_id()))
52 }
53
54 fn take_call_id() -> anyhow::Result<i32> {
55 Ok(CALL_ID
56 .try_with(|x| x.take())?
57 .unwrap_or_else(|| Self::next_call_id()))
58 }
59
60 fn get_retry_count() -> anyhow::Result<i32> {
61 Ok(RETRY_COUNT.try_with(|x| *x.borrow())?.unwrap_or_default())
62 }
63
64 fn get_external_handler() -> anyhow::Result<Option<String>> {
65 Ok(EXTERNAL_CALL_HANDLER.try_with(|x| (*x.borrow()).to_owned())?)
66 }
67
68 fn get_ping_interval(conf: &Configuration) -> anyhow::Result<i32> {
71 conf.get_int(
72 common_configuration_keys::IPC_PING_INTERVAL_KEY,
73 common_configuration_keys::IPC_PING_INTERVAL_DEFAULT,
74 )
75 }
76
77 fn create_call(&self, rpc_kind: &RpcKind, rpc_request: Rc<Vec<u8>>) -> anyhow::Result<Call> {
78 Call::new(rpc_kind, rpc_request)
79 }
80
81 pub fn new(value_class: &str, conf: &Configuration) -> anyhow::Result<Self> {
82 let connection_timeout = conf.get_int(
83 common_configuration_keys::IPC_CLIENT_CONNECT_TIMEOUT_KEY,
84 common_configuration_keys::IPC_CLIENT_CONNECT_TIMEOUT_DEFAULT,
85 )?;
86 let fallback_allowed = conf.get_bool(
87 common_configuration_keys::IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY,
88 common_configuration_keys::IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT,
89 );
90 let bind_to_wild_card_address = conf.get_bool(
91 common_configuration_keys::IPC_CLIENT_BIND_WILDCARD_ADDR_KEY,
92 common_configuration_keys::IPC_CLIENT_BIND_WILDCARD_ADDR_DEFAULT,
93 );
94 let max_async_calls = conf.get_int(
95 common_configuration_keys::IPC_CLIENT_ASYNC_CALLS_MAX_KEY,
96 common_configuration_keys::IPC_CLIENT_ASYNC_CALLS_MAX_DEFAULT,
97 )?;
98 Ok(Self {
99 _value_class: value_class.to_owned(),
100 _conf: conf.to_owned(),
101 _connection_timeout: connection_timeout,
102 _fallback_allowed: fallback_allowed,
103 _bind_to_wild_card_address: bind_to_wild_card_address,
104 client_id: ClientId::get_client_id(),
105 _max_async_calls: max_async_calls,
106 })
107 }
108
109 pub fn call<T: RpcProtocol>(
112 &self,
113 rpc_kind: &RpcKind,
114 rpc_request: Rc<Vec<u8>>,
115 remote_id: Rc<ConnectionId>,
116 service_class: u8,
117 fallback_to_simple_auth: Option<Arc<Atomic<bool>>>,
118 alignment_context: Option<Rc<dyn AlignmentContext>>,
119 ) -> anyhow::Result<Vec<u8>> {
120 let mut call = self.create_call(rpc_kind, rpc_request)?;
123 call.set_alignment_context(alignment_context);
124 let call = Rc::new(call);
125
126 let mut connection = self.get_connection::<T>(
127 remote_id,
128 Rc::clone(&call),
129 service_class,
130 fallback_to_simple_auth,
131 )?;
132
133 connection.send_rpc_request(Rc::clone(&call))?;
134
135 let res: Vec<u8> = self.get_rpc_response(Rc::clone(&call), &mut connection, 0)?;
138 Ok(res)
139 }
140
141 fn get_connection<'a, T: RpcProtocol>(
144 &'a self,
145 remote_id: Rc<ConnectionId>,
146 call: Rc<Call>,
147 service_class: u8,
148 fallback_to_simple_auth: Option<Arc<Atomic<bool>>>,
149 ) -> anyhow::Result<Connection<'a, T>> {
150 let mut connection = Connection::new(self, remote_id, service_class)?;
152
153 connection.add_call(call);
154
155 connection.setup_iostreams(fallback_to_simple_auth)?;
158 Ok(connection)
159 }
160
161 fn get_rpc_response<T: RpcProtocol>(
162 &self,
163 _call: Rc<Call>,
164 connection: &mut Connection<T>,
165 _timeout: i64,
166 ) -> anyhow::Result<Vec<u8>> {
167 Ok(connection.ipc_streams.read_response()?)
169 }
170
171 fn next_call_id() -> i32 {
178 CALL_ID_COUNTER.fetch_add(1, Ordering::SeqCst) & 0x7FFFFFFF
179 }
180}
181
/// Length-prefixed request/response framing over a single TCP stream.
struct IpcStreams {
    /// Underlying socket used for both reading and writing.
    inner: TcpStream,
    /// Upper bound on an accepted response length; the limit is only
    /// enforced when this value is positive.
    max_response_length: i32,
    /// `true` until the first response length has been read.
    first_response: bool,
}
187
188impl IpcStreams {
189 fn new(inner: TcpStream, max_response_length: i32) -> Self {
190 IpcStreams {
191 inner,
192 max_response_length,
193 first_response: true,
194 }
195 }
196
197 fn read_i32(&mut self) -> anyhow::Result<i32> {
198 let mut buf = [0; 4];
199 self.inner.read_exact(&mut buf)?;
200 Ok(i32::from_be_bytes(buf))
201 }
202
203 fn read_response(&mut self) -> anyhow::Result<Vec<u8>> {
204 let length = self.read_i32()?;
205 if self.first_response {
206 self.first_response = false;
207 if length == -1 {
208 self.read_i32()?;
209 return Err(Error::msg("Remote Exception"));
210 }
211 }
212 if length <= 0 {
213 return Err(Error::msg("RPC response has invalid length"));
214 }
215 if self.max_response_length > 0 && length > self.max_response_length {
216 return Err(Error::msg("RPC response exceeds maximum data length"));
217 }
218 let mut buf = vec![0; length as usize];
219 self.inner.read_exact(&mut buf)?;
220 Ok(buf)
221 }
222
223 fn send_request(&mut self, buf: &[u8]) -> anyhow::Result<usize> {
224 Ok(self.inner.write(buf)?)
225 }
226
227 fn _flush(&mut self) -> anyhow::Result<()> {
228 Ok(self.inner.flush()?)
229 }
230}