1use std::io::{Read, Write};
2
3use hdfs_types::common::*;
4use hdfs_types::hdfs::*;
5use prost::{
6 bytes::{BufMut, BytesMut},
7 EncodeError, Message,
8};
9
10use crate::HDFSError;
11
/// Fully qualified protocol name sent to the server: used as the `protocol` of
/// the connection context and as the declaring class of every request header.
const PROTOCOL: &str = "org.apache.hadoop.hdfs.protocol.ClientProtocol";
13
14fn into_err(error: RpcResponseHeaderProto) -> Result<RpcResponseHeaderProto, HDFSError> {
15 if error.status() != rpc_response_header_proto::RpcStatusProto::Success {
16 Err(HDFSError::NameNodeError(Box::new(error)))
17 } else {
18 Ok(error)
19 }
20}
21
/// The three configurable bytes of the connection preamble written by
/// `Handshake::encode` right after the `hrpc` magic.
#[derive(Debug, Clone)]
pub struct Handshake {
    /// First byte after the magic.
    pub version: u8,
    /// Second byte after the magic.
    pub server_class: u8,
    /// Third byte after the magic.
    pub auth_protocol: u8,
}

impl Default for Handshake {
    /// Builds the preamble used when the caller does not supply one:
    /// version `9`, server class `0`, auth protocol `0`.
    fn default() -> Self {
        // NOTE(review): presumably Hadoop IPC version 9 with the default
        // service class and no SASL negotiation — confirm against the server.
        const VERSION: u8 = 9;
        const SERVER_CLASS: u8 = 0;
        const AUTH_PROTOCOL: u8 = 0;

        Handshake {
            version: VERSION,
            server_class: SERVER_CLASS,
            auth_protocol: AUTH_PROTOCOL,
        }
    }
}
38
39impl Handshake {
40 pub fn encode<B: BufMut>(&self, buf: &mut B) -> Result<(), EncodeError> {
41 buf.put_slice(b"hrpc");
42 buf.put_u8(self.version);
43 buf.put_u8(self.server_class);
44 buf.put_u8(self.auth_protocol);
45 Ok(())
46 }
47}
48
/// RPC client state bound to a byte stream `S`.
///
/// Created by `HRpc::connect`; each request is written to and its response
/// read from `stream`.
pub struct HRpc<S> {
    /// Underlying transport the frames are written to and read from.
    pub(crate) stream: S,
    /// Call id placed in the next request header; advanced after each request is built.
    pub(crate) call_id: i32,
    /// Random 16-byte identifier (UUID v4 bytes) sent in every request header.
    pub(crate) client_id: Vec<u8>,
    /// Optional caller context copied into every request header.
    pub(crate) context: Option<RpcCallerContextProto>,
}
56
57impl<S: Write + Read> HRpc<S> {
58 pub fn connect(
59 mut stream: S,
60 effective_user: impl Into<Option<String>>,
61 real_user: impl Into<Option<String>>,
62 context: impl Into<Option<RpcCallerContextProto>>,
63 handshake: impl Into<Option<Handshake>>,
64 ) -> Result<Self, HDFSError> {
65 let client_id = uuid::Uuid::new_v4().to_bytes_le().to_vec();
66 let context = context.into();
67 let mut buf = BytesMut::new();
68 let handshake = handshake.into().unwrap_or_default();
69 handshake.encode(&mut buf)?;
70 let req_header = RpcRequestHeaderProto {
71 rpc_kind: Some(RpcKindProto::RpcProtocolBuffer as i32),
72 rpc_op: Some(rpc_request_header_proto::OperationProto::RpcFinalPacket as i32),
73 call_id: -3,
74 client_id: client_id.clone(),
75 retry_count: Some(-1),
76 trace_info: None,
77 caller_context: context.clone(),
78 state_id: None,
79 router_federated_state: None,
80 };
81 let ipc_req = IpcConnectionContextProto {
82 protocol: Some(PROTOCOL.into()),
83 user_info: Some(UserInformationProto {
84 effective_user: effective_user.into(),
85 real_user: real_user.into(),
86 }),
87 };
88 let ori = buf.len();
89 buf.put_u32(0);
90 req_header.encode_length_delimited(&mut buf).unwrap();
91 ipc_req.encode_length_delimited(&mut buf).unwrap();
92 let length = buf.len() - ori - 4;
93 buf[ori..(ori + 4)].copy_from_slice(&(length as u32).to_be_bytes());
94 stream.write_all(&buf)?;
95 stream.flush()?;
96 Ok(Self {
97 stream,
98 call_id: Default::default(),
99 client_id,
100 context,
101 })
102 }
103
104 pub fn send_raw_req(
105 &mut self,
106 method_name: &str,
107 req: &[u8],
108 ) -> Result<(RpcResponseHeaderProto, BytesMut), HDFSError> {
109 let call_id = self.call_id;
110 let rpc_req_header = RpcRequestHeaderProto {
111 rpc_kind: Some(RpcKindProto::RpcProtocolBuffer as i32),
112 rpc_op: Some(rpc_request_header_proto::OperationProto::RpcFinalPacket as i32),
113 call_id,
114 client_id: self.client_id.clone(),
115 retry_count: Some(0),
116 trace_info: None,
117 caller_context: self.context.clone(),
118 state_id: None,
119 router_federated_state: None,
120 };
121 let rpc_req_bytes = rpc_req_header.encode_length_delimited_to_vec();
122 self.call_id += 1;
123 let req_header = RequestHeaderProto {
124 method_name: method_name.into(),
125 declaring_class_protocol_name: PROTOCOL.into(),
126 client_protocol_version: 1,
127 };
128 let header_bytes = req_header.encode_length_delimited_to_vec();
129 let total = rpc_req_bytes.len() + header_bytes.len() + req.len();
130 let mut buf = BytesMut::with_capacity(total + 4);
131 buf.put_u32(total as u32);
132 buf.extend_from_slice(&rpc_req_bytes);
133 buf.extend_from_slice(&header_bytes);
134 buf.extend_from_slice(req);
135 self.stream.write_all(&buf)?;
136 self.stream.flush()?;
137 self.read_resp()
138 }
139
140 pub(crate) fn read_resp(&mut self) -> Result<(RpcResponseHeaderProto, BytesMut), HDFSError> {
141 let mut raw_length = [0u8; 4];
142 self.stream.read_exact(&mut raw_length)?;
143 let length = u32::from_be_bytes(raw_length) as usize;
144 let mut buf = BytesMut::with_capacity(length);
145 buf.resize(length, 0);
146 self.stream.read_exact(&mut buf)?;
147 let header_length = prost::encoding::decode_varint(&mut buf)?;
148 let header_bytes = buf.split_to(header_length as usize);
149 let resp_header = RpcResponseHeaderProto::decode(header_bytes)?;
150 let resp_header = into_err(resp_header)?;
152 Ok((resp_header, buf))
153 }
154}
155
/// Generates typed RPC wrappers around `send_raw_req`.
///
/// Two forms are accepted:
///
/// * `method!(name, "rpcName", ReqTy, RespTy)` expands to a single
///   `pub fn name(&mut self, req: ReqTy)` that encodes `req`
///   length-delimited, sends it under the method name `"rpcName"`, and
///   decodes the length-delimited reply as `RespTy`.
/// * `method! { Ty => name, "rpcName", ReqTy, RespTy; ... }` wraps one such
///   method per entry in `impl<S: Write + Read> Ty`. Every entry, including
///   the last, must end with `;`.
///
/// With the `trace_dbg` or `trace_valuable` feature enabled, the request and
/// the response are also emitted as `tracing` events with target `hrpc`.
macro_rules! method {
    // Single wrapper method; meant to be expanded inside an `impl` block.
    ($name:ident, $raw:literal, $req:ty, $resp:ty) => {
        #[doc = concat!("Sends the `", $raw, "` RPC and decodes its reply.")]
        #[doc = ""]
        #[doc = "Returns the RPC response header together with the decoded response message,"]
        #[doc = "or an error if the exchange or the decoding fails."]
        pub fn $name(&mut self, req: $req) -> Result<(RpcResponseHeaderProto, $resp), HDFSError> {
            #[cfg(feature = "trace_dbg")]
            {
                tracing::trace!(target: "hrpc", "\nmethod: {}\nreq: {:#?}", $raw, req);
            }
            #[cfg(feature = "trace_valuable")]
            {
                use valuable::Valuable;
                tracing::trace!(target: "hrpc", method=$raw, req=req.as_value());
            }
            // The request body goes on the wire length-delimited, after the
            // two headers written by `send_raw_req`.
            let req = req.encode_length_delimited_to_vec();
            let (header, resp) = self.send_raw_req($raw, &req)?;
            let resp = <$resp>::decode_length_delimited(resp)?;
            #[cfg(feature = "trace_dbg")]
            {
                tracing::trace!(target: "hrpc", "\nmethod: {}\nheader: {:#?}\nresp: {:#?}", $raw, header, resp);
            }
            #[cfg(feature = "trace_valuable")]
            {
                use valuable::Valuable;
                tracing::trace!(target: "hrpc", method=$raw, header=header.as_value(), resp=resp.as_value() );
            }
            Ok((header, resp))
        }
    };

    // Batch form: one `impl` block for `$ipc` containing a wrapper per entry.
    ($ipc:ty => $($name:ident, $raw:literal, $req:ty, $resp:ty);+;) => {
        impl <S: std::io::Write + std::io::Read> $ipc {
            $(method!($name, $raw, $req, $resp);)+
        }
    }
}
190
// Typed wrappers generated by `method!`; each entry reads
// `rust_method_name, "wireMethodName", RequestType, ResponseType;`.
//
// NOTE(review): these method names look like datanode-side operations, yet
// `send_raw_req` always declares `PROTOCOL` (ClientProtocol) as the declaring
// class — confirm the calls reach the intended service.
method! { HRpc<S> =>
    get_replica_visible_length, "getReplicaVisibleLength", GetReplicaVisibleLengthRequestProto, GetReplicaVisibleLengthResponseProto;
    refresh_namenodes, "refreshNamenodes", RefreshNamenodesRequestProto, RefreshNamenodesResponseProto;
    delete_block_pool, "deleteBlockPool", DeleteBlockPoolRequestProto, DeleteBlockPoolResponseProto;
    get_block_local_path_info, "getBlockLocalPathInfo", GetBlockLocalPathInfoRequestProto, GetBlockLocalPathInfoResponseProto;
    shutdown_datanode, "shutdownDatanode", ShutdownDatanodeRequestProto, ShutdownDatanodeResponseProto;
    evict_writers, "evictWriters", EvictWritersRequestProto, EvictWritersResponseProto;
    get_datanode_info, "getDatanodeInfo", GetDatanodeInfoRequestProto, GetDatanodeInfoResponseProto;
    get_volume_report, "getVolumeReport", GetVolumeReportRequestProto, GetVolumeReportResponseProto;
    start_reconfiguration, "startReconfiguration", StartReconfigurationRequestProto, StartReconfigurationResponseProto;
    trigger_block_report, "triggerBlockReport", TriggerBlockReportRequestProto, TriggerBlockReportResponseProto;
    get_balancer_bandwidth, "getBalancerBandwidth", GetBalancerBandwidthRequestProto, GetBalancerBandwidthResponseProto;
    get_reconfiguration_status, "getReconfigurationStatus", GetReconfigurationStatusRequestProto, GetReconfigurationStatusResponseProto;
    get_disk_balancer_setting, "getDiskBalancerSetting", DiskBalancerSettingRequestProto, DiskBalancerSettingResponseProto;
}
207
// Typed wrappers generated by `method!`; each entry reads
// `rust_method_name, "wireMethodName", RequestType, ResponseType;`.
// The string literal is what goes on the wire as the method name, so it must
// match the server's spelling exactly.
method! { HRpc<S> =>
    create, "create", CreateRequestProto, CreateResponseProto;
    get_server_defaults, "getServerDefaults", GetServerDefaultsRequestProto,GetServerDefaultsResponseProto;
    get_block_locations, "getBlockLocations", GetBlockLocationsRequestProto, GetBlockLocationsResponseProto;
    append, "append", AppendRequestProto, AppendResponseProto;
    set_replication, "setReplication", SetReplicationRequestProto, SetReplicationResponseProto;
    set_storage_policy, "setStoragePolicy", SetStoragePolicyRequestProto, SetStoragePolicyResponseProto;
    unset_storage_policy, "unsetStoragePolicy", UnsetStoragePolicyRequestProto, UnsetStoragePolicyResponseProto;
    get_storage_policy, "getStoragePolicy", GetStoragePolicyRequestProto, GetStoragePolicyResponseProto;
    get_storage_policies, "getStoragePolicies", GetStoragePoliciesRequestProto, GetStoragePoliciesResponseProto;
    set_permission, "setPermission", SetPermissionRequestProto, SetPermissionResponseProto;
    set_owner, "setOwner", SetOwnerRequestProto, SetOwnerResponseProto;
    abandon_block, "abandonBlock", AbandonBlockRequestProto, AbandonBlockResponseProto;
    add_block, "addBlock", AddBlockRequestProto, AddBlockResponseProto;
    get_additional_datanode, "getAdditionalDatanode", GetAdditionalDatanodeRequestProto, GetAdditionalDatanodeResponseProto;
    complete, "complete", CompleteRequestProto, CompleteResponseProto;
    report_bad_blocks, "reportBadBlocks", ReportBadBlocksRequestProto, ReportBadBlocksResponseProto;
    concat, "concat", ConcatRequestProto, ConcatResponseProto;
    truncate, "truncate", TruncateRequestProto, TruncateResponseProto;
    rename, "rename", RenameRequestProto, RenameResponseProto;
    rename2, "rename2", Rename2RequestProto, Rename2ResponseProto;
    delete, "delete", DeleteRequestProto, DeleteResponseProto;
    mkdirs, "mkdirs", MkdirsRequestProto, MkdirsResponseProto;
    get_listing, "getListing", GetListingRequestProto, GetListingResponseProto;
    renew_lease, "renewLease", RenewLeaseRequestProto, RenewLeaseResponseProto;
    recover_lease, "recoverLease", RecoverLeaseRequestProto, RecoverLeaseResponseProto;
    get_fs_stats, "getFsStats", GetFsStatusRequestProto, GetFsStatsResponseProto;
    get_datanode_report, "getDatanodeReport", GetDatanodeReportRequestProto, GetDatanodeReportResponseProto;
    get_preferred_block_size, "getPreferredBlockSize", GetPreferredBlockSizeRequestProto, GetPreferredBlockSizeResponseProto;
    set_safe_mode, "setSafeMode", SetSafeModeRequestProto, SetSafeModeResponseProto;
    save_namespace, "saveNamespace", SaveNamespaceRequestProto, SaveNamespaceResponseProto;
    roll_edits, "rollEdits", RollEditsRequestProto, RollEditsResponseProto;
    restore_failed_storage, "restoreFailedStorage", RestoreFailedStorageRequestProto, RestoreFailedStorageResponseProto;
    refresh_nodes, "refreshNodes", RefreshNodesRequestProto, RefreshNodesResponseProto;
    finalize_upgrade, "finalizeUpgrade", FinalizeUpgradeRequestProto, FinalizeUpgradeResponseProto;
    upgrade_status, "upgradeStatus", UpgradeStatusRequestProto, UpgradeStatusResponseProto;
    rolling_upgrade, "rollingUpgrade", RollingUpgradeRequestProto, RollingUpgradeResponseProto;
    list_corrupt_file_blocks, "listCorruptFileBlocks", ListCorruptFileBlocksRequestProto, ListCorruptFileBlocksResponseProto;
    meta_save, "metaSave", MetaSaveRequestProto, MetaSaveResponseProto;
    get_file_info, "getFileInfo", GetFileInfoRequestProto, GetFileInfoResponseProto;
    get_located_file_info, "getLocatedFileInfo", GetLocatedFileInfoRequestProto, GetLocatedFileInfoResponseProto;
    add_cache_pool, "addCachePool", AddCachePoolRequestProto, AddCachePoolResponseProto;
    modify_cache_pool, "modifyCachePool", ModifyCachePoolRequestProto, ModifyCachePoolResponseProto;
    remove_cache_pool, "removeCachePool", RemoveCachePoolRequestProto, RemoveCachePoolResponseProto;
    list_cache_pools, "listCachePools", ListCachePoolsRequestProto, ListCachePoolsResponseProto;
    get_file_link_info, "getFileLinkInfo", GetFileLinkInfoRequestProto, GetFileLinkInfoResponseProto;
    get_content_summary, "getContentSummary", GetContentSummaryRequestProto, GetContentSummaryResponseProto;
    set_quota, "setQuota", SetQuotaRequestProto, SetQuotaResponseProto;
    fsync, "fsync", FsyncRequestProto, FsyncResponseProto;
    set_times, "setTimes", SetTimesRequestProto, SetTimesResponseProto;
    create_symlink, "createSymlink", CreateSymlinkRequestProto, CreateSymlinkResponseProto;
    get_link_target, "getLinkTarget", GetLinkTargetRequestProto, GetLinkTargetResponseProto;
    update_block_for_pipeline, "updateBlockForPipeline", UpdateBlockForPipelineRequestProto, UpdateBlockForPipelineResponseProto;
    update_pipeline, "updatePipeline", UpdatePipelineRequestProto, UpdatePipelineResponseProto;
    set_balancer_bandwidth, "setBalancerBandwidth", SetBalancerBandwidthRequestProto, SetBalancerBandwidthResponseProto;
    get_data_encryption_key, "getDataEncryptionKey", GetDataEncryptionKeyRequestProto, GetDataEncryptionKeyResponseProto;
    create_snapshot, "createSnapshot", CreateSnapshotRequestProto, CreateSnapshotResponseProto;
    rename_snapshot, "renameSnapshot", RenameSnapshotRequestProto, RenameSnapshotResponseProto;
    allow_snapshot, "allowSnapshot", AllowSnapshotRequestProto, AllowSnapshotResponseProto;
    disallow_snapshot, "disallowSnapshot", DisallowSnapshotRequestProto, DisallowSnapshotResponseProto;
    get_snapshot_listing, "getSnapshotListing", GetSnapshotListingRequestProto, GetSnapshotListingResponseProto;
    delete_snapshot, "deleteSnapshot", DeleteSnapshotRequestProto, DeleteSnapshotResponseProto;
    get_snapshot_diff_report, "getSnapshotDiffReport", GetSnapshotDiffReportRequestProto, GetSnapshotDiffReportResponseProto;
    is_file_closed, "isFileClosed", IsFileClosedRequestProto, IsFileClosedResponseProto;
    modify_acl_entries, "modifyAclEntries", ModifyAclEntriesRequestProto, ModifyAclEntriesResponseProto;
    remove_acl_entries, "removeAclEntries", RemoveAclEntriesRequestProto, RemoveAclEntriesResponseProto;
    remove_default_acl, "removeDefaultAcl", RemoveDefaultAclRequestProto, RemoveDefaultAclResponseProto;
    remove_acl, "removeAcl", RemoveAclRequestProto, RemoveAclResponseProto;
    set_acl, "setAcl", SetAclRequestProto, SetAclResponseProto;
    get_acl_status, "getAclStatus", GetAclStatusRequestProto, GetAclStatusResponseProto;
    set_x_attr, "setXAttr", SetXAttrRequestProto, SetXAttrResponseProto;
    get_x_attrs, "getXAttrs", GetXAttrsRequestProto, GetXAttrsResponseProto;
    list_x_attrs, "listXAttrs", ListXAttrsRequestProto, ListXAttrsResponseProto;
    remove_x_attr, "removeXAttr", RemoveXAttrRequestProto, RemoveXAttrResponseProto;
    check_access, "checkAccess", CheckAccessRequestProto, CheckAccessResponseProto;
    create_encryption_zone, "createEncryptionZone", CreateEncryptionZoneRequestProto, CreateEncryptionZoneResponseProto;
    list_encryption_zones, "listEncryptionZones", ListEncryptionZonesRequestProto, ListEncryptionZonesResponseProto;
    reencrypt_encryption_zone, "reencryptEncryptionZone", ReencryptEncryptionZoneRequestProto, ReencryptEncryptionZoneResponseProto;
    list_reencryption_status, "listReencryptionStatus", ListReencryptionStatusRequestProto, ListReencryptionStatusResponseProto;
    get_e_z_for_path, "getEZForPath", GetEzForPathRequestProto, GetEzForPathResponseProto;
    set_erasure_coding_policy, "setErasureCodingPolicy", SetErasureCodingPolicyRequestProto, SetErasureCodingPolicyResponseProto;
    get_current_edit_log_txid, "getCurrentEditLogTxid", GetCurrentEditLogTxidRequestProto, GetCurrentEditLogTxidResponseProto;
    get_edits_from_txid, "getEditsFromTxid", GetEditsFromTxidRequestProto, GetEditsFromTxidResponseProto;
    get_erasure_coding_policy, "getErasureCodingPolicy", GetErasureCodingPolicyRequestProto, GetErasureCodingPolicyResponseProto;
    get_erasure_coding_codecs, "getErasureCodingCodecs", GetErasureCodingCodecsRequestProto, GetErasureCodingCodecsResponseProto;
    get_quota_usage, "getQuotaUsage", GetQuotaUsageRequestProto, GetQuotaUsageResponseProto;
    list_open_files, "listOpenFiles", ListOpenFilesRequestProto, ListOpenFilesResponseProto;
    msync, "msync", MsyncRequestProto, MsyncResponseProto;
    satisfy_storage_policy, "satisfyStoragePolicy", SatisfyStoragePolicyRequestProto, SatisfyStoragePolicyResponseProto;
    get_ha_service_state, "getHAServiceState", HaServiceStateRequestProto, HaServiceStateResponseProto;
    get_datanode_storage_report, "getDatanodeStorageReport", GetDatanodeStorageReportRequestProto, GetDatanodeStorageReportResponseProto;
    get_snapshottable_dir_listing, "getSnapshottableDirListing", GetSnapshottableDirListingRequestProto, GetSnapshottableDirListingResponseProto;
    get_snapshot_diff_report_listing, "getSnapshotDiffReportListing", GetSnapshotDiffReportListingRequestProto, GetSnapshotDiffReportListingResponseProto;
    unset_erasure_coding_policy, "unsetErasureCodingPolicy", UnsetErasureCodingPolicyRequestProto, UnsetErasureCodingPolicyResponseProto;
    get_e_c_topology_result_for_policies, "getECTopologyResultForPolicies", GetEcTopologyResultForPoliciesRequestProto, GetEcTopologyResultForPoliciesResponseProto;
    get_erasure_coding_policies, "getErasureCodingPolicies", GetErasureCodingPoliciesRequestProto, GetErasureCodingPoliciesResponseProto;
    add_erasure_coding_policies, "addErasureCodingPolicies", AddErasureCodingPoliciesRequestProto, AddErasureCodingPoliciesResponseProto;
    remove_erasure_coding_policy, "removeErasureCodingPolicy", RemoveErasureCodingPolicyRequestProto, RemoveErasureCodingPolicyResponseProto;
    enable_erasure_coding_policy, "enableErasureCodingPolicy", EnableErasureCodingPolicyRequestProto, EnableErasureCodingPolicyResponseProto;
    disable_erasure_coding_policy, "disableErasureCodingPolicy", DisableErasureCodingPolicyRequestProto, DisableErasureCodingPolicyResponseProto;
    get_slow_datanode_report, "getSlowDatanodeReport", GetSlowDatanodeReportRequestProto, GetSlowDatanodeReportResponseProto;
}