hadoop_common/ipc/client/
mod.rs

1mod call;
2mod connection;
3mod connection_id;
4
5use super::{client_id::BYTE_LENGTH, AlignmentContext, ClientId, RpcKind, RpcProtocol};
6use crate::{conf::Configuration, fs::common_configuration_keys};
7use anyhow::Error;
8use atomic::Atomic;
9use call::Call;
10use connection::Connection;
11pub use connection_id::ConnectionId;
12use std::{
13    cell::RefCell,
14    io::{Read, Write},
15    net::TcpStream,
16    rc::Rc,
17    sync::{atomic::Ordering, Arc},
18};
19
20/// A counter for generating call IDs.
21static CALL_ID_COUNTER: Atomic<i32> = Atomic::new(0);
22
thread_local! {
    /// Call ID pinned for the current thread, if any.  `Client::take_call_id`
    /// consumes it; when the slot is empty a fresh ID is generated instead.
    static CALL_ID: RefCell<Option<i32>> = RefCell::new(None);

    /// Retry count recorded for the current thread; `Client::get_retry_count`
    /// treats an empty slot as zero.
    static RETRY_COUNT: RefCell<Option<i32>> = RefCell::new(None);

    /// External call handler recorded for the current thread, if any;
    /// `Client::get_external_handler` returns a copy of it.
    static EXTERNAL_CALL_HANDLER: RefCell<Option<String>> = RefCell::new(None);
}
28
29/// A client for an IPC service.  IPC calls take a single [`Writable`] as a
30/// parameter, and return a [`Writable`] as their value.  A service runs on
31/// a port and is defined by a parameter class and a value class.
32pub struct Client {
33    _value_class: String,
34    _conf: Configuration,
35    _connection_timeout: i32,
36    _fallback_allowed: bool,
37    _bind_to_wild_card_address: bool,
38    client_id: [u8; BYTE_LENGTH],
39    _max_async_calls: i32,
40}
41
42impl Client {
43    pub fn get_timeout(_conf: &Configuration) -> i32 {
44        // TODO
45        3000
46    }
47
48    fn _get_call_id() -> anyhow::Result<i32> {
49        Ok(CALL_ID
50            .try_with(|x| *x.borrow())?
51            .unwrap_or_else(|| Self::next_call_id()))
52    }
53
54    fn take_call_id() -> anyhow::Result<i32> {
55        Ok(CALL_ID
56            .try_with(|x| x.take())?
57            .unwrap_or_else(|| Self::next_call_id()))
58    }
59
60    fn get_retry_count() -> anyhow::Result<i32> {
61        Ok(RETRY_COUNT.try_with(|x| *x.borrow())?.unwrap_or_default())
62    }
63
64    fn get_external_handler() -> anyhow::Result<Option<String>> {
65        Ok(EXTERNAL_CALL_HANDLER.try_with(|x| (*x.borrow()).to_owned())?)
66    }
67
68    /// Get the ping interval from configuration;
69    /// If not set in the configuration, return the default value.
70    fn get_ping_interval(conf: &Configuration) -> anyhow::Result<i32> {
71        conf.get_int(
72            common_configuration_keys::IPC_PING_INTERVAL_KEY,
73            common_configuration_keys::IPC_PING_INTERVAL_DEFAULT,
74        )
75    }
76
77    fn create_call(&self, rpc_kind: &RpcKind, rpc_request: Rc<Vec<u8>>) -> anyhow::Result<Call> {
78        Call::new(rpc_kind, rpc_request)
79    }
80
81    pub fn new(value_class: &str, conf: &Configuration) -> anyhow::Result<Self> {
82        let connection_timeout = conf.get_int(
83            common_configuration_keys::IPC_CLIENT_CONNECT_TIMEOUT_KEY,
84            common_configuration_keys::IPC_CLIENT_CONNECT_TIMEOUT_DEFAULT,
85        )?;
86        let fallback_allowed = conf.get_bool(
87            common_configuration_keys::IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY,
88            common_configuration_keys::IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT,
89        );
90        let bind_to_wild_card_address = conf.get_bool(
91            common_configuration_keys::IPC_CLIENT_BIND_WILDCARD_ADDR_KEY,
92            common_configuration_keys::IPC_CLIENT_BIND_WILDCARD_ADDR_DEFAULT,
93        );
94        let max_async_calls = conf.get_int(
95            common_configuration_keys::IPC_CLIENT_ASYNC_CALLS_MAX_KEY,
96            common_configuration_keys::IPC_CLIENT_ASYNC_CALLS_MAX_DEFAULT,
97        )?;
98        Ok(Self {
99            _value_class: value_class.to_owned(),
100            _conf: conf.to_owned(),
101            _connection_timeout: connection_timeout,
102            _fallback_allowed: fallback_allowed,
103            _bind_to_wild_card_address: bind_to_wild_card_address,
104            client_id: ClientId::get_client_id(),
105            _max_async_calls: max_async_calls,
106        })
107    }
108
109    /// Make a call, passing `rpc_request`, to the IPC server defined by
110    /// `remote_id`, returning the rpc response.
111    pub fn call<T: RpcProtocol>(
112        &self,
113        rpc_kind: &RpcKind,
114        rpc_request: Rc<Vec<u8>>,
115        remote_id: Rc<ConnectionId>,
116        service_class: u8,
117        fallback_to_simple_auth: Option<Arc<Atomic<bool>>>,
118        alignment_context: Option<Rc<dyn AlignmentContext>>,
119    ) -> anyhow::Result<Vec<u8>> {
120        // TODO: return Writable
121
122        let mut call = self.create_call(rpc_kind, rpc_request)?;
123        call.set_alignment_context(alignment_context);
124        let call = Rc::new(call);
125
126        let mut connection = self.get_connection::<T>(
127            remote_id,
128            Rc::clone(&call),
129            service_class,
130            fallback_to_simple_auth,
131        )?;
132
133        connection.send_rpc_request(Rc::clone(&call))?;
134
135        // TODO: support asynchronous mode
136
137        let res: Vec<u8> = self.get_rpc_response(Rc::clone(&call), &mut connection, 0)?;
138        Ok(res)
139    }
140
141    /// Get a connection from the pool, or create a new one and add it to the
142    /// pool.  Connections to a given ConnectionId are reused.
143    fn get_connection<'a, T: RpcProtocol>(
144        &'a self,
145        remote_id: Rc<ConnectionId>,
146        call: Rc<Call>,
147        service_class: u8,
148        fallback_to_simple_auth: Option<Arc<Atomic<bool>>>,
149    ) -> anyhow::Result<Connection<'a, T>> {
150        // TODO: connection pool
151        let mut connection = Connection::new(self, remote_id, service_class)?;
152
153        connection.add_call(call);
154
155        // If the server happens to be slow, the method below will take longer to
156        // establish a connection.
157        connection.setup_iostreams(fallback_to_simple_auth)?;
158        Ok(connection)
159    }
160
161    fn get_rpc_response<T: RpcProtocol>(
162        &self,
163        _call: Rc<Call>,
164        connection: &mut Connection<T>,
165        _timeout: i64,
166    ) -> anyhow::Result<Vec<u8>> {
167        // TODO: get rpc response from call
168        Ok(connection.ipc_streams.read_response()?)
169    }
170
171    /// Returns the next valid sequential call ID by incrementing an atomic counter
172    /// and masking off the sign bit.  Valid call IDs are non-negative integers in
173    /// the range [ 0, 2^31 - 1 ].  Negative numbers are reserved for special
174    /// purposes.  The values can overflow back to 0 and be reused.  Note that prior
175    /// versions of the client did not mask off the sign bit, so a server may still
176    /// see a negative call ID if it receives connections from an old client.
177    fn next_call_id() -> i32 {
178        CALL_ID_COUNTER.fetch_add(1, Ordering::SeqCst) & 0x7FFFFFFF
179    }
180}
181
/// Framed request/response I/O over the TCP stream of one IPC connection.
struct IpcStreams {
    /// Socket the requests are written to and the responses are read from.
    inner: TcpStream,
    /// Largest accepted response length; the limit only applies when it is
    /// greater than zero.
    max_response_length: i32,
    /// `true` until the first response header has been read.
    first_response: bool,
}
187
188impl IpcStreams {
189    fn new(inner: TcpStream, max_response_length: i32) -> Self {
190        IpcStreams {
191            inner,
192            max_response_length,
193            first_response: true,
194        }
195    }
196
197    fn read_i32(&mut self) -> anyhow::Result<i32> {
198        let mut buf = [0; 4];
199        self.inner.read_exact(&mut buf)?;
200        Ok(i32::from_be_bytes(buf))
201    }
202
203    fn read_response(&mut self) -> anyhow::Result<Vec<u8>> {
204        let length = self.read_i32()?;
205        if self.first_response {
206            self.first_response = false;
207            if length == -1 {
208                self.read_i32()?;
209                return Err(Error::msg("Remote Exception"));
210            }
211        }
212        if length <= 0 {
213            return Err(Error::msg("RPC response has invalid length"));
214        }
215        if self.max_response_length > 0 && length > self.max_response_length {
216            return Err(Error::msg("RPC response exceeds maximum data length"));
217        }
218        let mut buf = vec![0; length as usize];
219        self.inner.read_exact(&mut buf)?;
220        Ok(buf)
221    }
222
223    fn send_request(&mut self, buf: &[u8]) -> anyhow::Result<usize> {
224        Ok(self.inner.write(buf)?)
225    }
226
227    fn _flush(&mut self) -> anyhow::Result<()> {
228        Ok(self.inner.flush()?)
229    }
230}