hdfs_client/
hrpc.rs

1use std::io::{Read, Write};
2
3use hdfs_types::common::*;
4use hdfs_types::hdfs::*;
5use prost::{
6    bytes::{BufMut, BytesMut},
7    EncodeError, Message,
8};
9
10use crate::HDFSError;
11
12const PROTOCOL: &str = "org.apache.hadoop.hdfs.protocol.ClientProtocol";
13
14fn into_err(error: RpcResponseHeaderProto) -> Result<RpcResponseHeaderProto, HDFSError> {
15    if error.status() != rpc_response_header_proto::RpcStatusProto::Success {
16        Err(HDFSError::NameNodeError(Box::new(error)))
17    } else {
18        Ok(error)
19    }
20}
21
/// The three configurable bytes of the connection preamble.
///
/// [`Handshake::encode`] writes the literal `hrpc` followed by these three
/// bytes, in field order. The type is three plain bytes, so it is `Copy` and
/// comparable.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct Handshake {
    /// RPC protocol version byte (the `Default` impl uses `9`).
    pub version: u8,
    /// Service-class byte (the `Default` impl uses `0`).
    pub server_class: u8,
    /// Auth-protocol byte (the `Default` impl uses `0`).
    // NOTE(review): presumably 0 selects "no authentication" in Hadoop RPC — confirm.
    pub auth_protocol: u8,
}
28
29impl Default for Handshake {
30    fn default() -> Self {
31        Self {
32            version: 9,
33            server_class: 0,
34            auth_protocol: 0,
35        }
36    }
37}
38
39impl Handshake {
40    pub fn encode<B: BufMut>(&self, buf: &mut B) -> Result<(), EncodeError> {
41        buf.put_slice(b"hrpc");
42        buf.put_u8(self.version);
43        buf.put_u8(self.server_class);
44        buf.put_u8(self.auth_protocol);
45        Ok(())
46    }
47}
48
49/// hdfs rpc
50pub struct HRpc<S> {
51    pub(crate) stream: S,
52    pub(crate) call_id: i32,
53    pub(crate) client_id: Vec<u8>,
54    pub(crate) context: Option<RpcCallerContextProto>,
55}
56
57impl<S: Write + Read> HRpc<S> {
58    pub fn connect(
59        mut stream: S,
60        effective_user: impl Into<Option<String>>,
61        real_user: impl Into<Option<String>>,
62        context: impl Into<Option<RpcCallerContextProto>>,
63        handshake: impl Into<Option<Handshake>>,
64    ) -> Result<Self, HDFSError> {
65        let client_id = uuid::Uuid::new_v4().to_bytes_le().to_vec();
66        let context = context.into();
67        let mut buf = BytesMut::new();
68        let handshake = handshake.into().unwrap_or_default();
69        handshake.encode(&mut buf)?;
70        let req_header = RpcRequestHeaderProto {
71            rpc_kind: Some(RpcKindProto::RpcProtocolBuffer as i32),
72            rpc_op: Some(rpc_request_header_proto::OperationProto::RpcFinalPacket as i32),
73            call_id: -3,
74            client_id: client_id.clone(),
75            retry_count: Some(-1),
76            trace_info: None,
77            caller_context: context.clone(),
78            state_id: None,
79            router_federated_state: None,
80        };
81        let ipc_req = IpcConnectionContextProto {
82            protocol: Some(PROTOCOL.into()),
83            user_info: Some(UserInformationProto {
84                effective_user: effective_user.into(),
85                real_user: real_user.into(),
86            }),
87        };
88        let ori = buf.len();
89        buf.put_u32(0);
90        req_header.encode_length_delimited(&mut buf).unwrap();
91        ipc_req.encode_length_delimited(&mut buf).unwrap();
92        let length = buf.len() - ori - 4;
93        buf[ori..(ori + 4)].copy_from_slice(&(length as u32).to_be_bytes());
94        stream.write_all(&buf)?;
95        stream.flush()?;
96        Ok(Self {
97            stream,
98            call_id: Default::default(),
99            client_id,
100            context,
101        })
102    }
103
104    pub fn send_raw_req(
105        &mut self,
106        method_name: &str,
107        req: &[u8],
108    ) -> Result<(RpcResponseHeaderProto, BytesMut), HDFSError> {
109        let call_id = self.call_id;
110        let rpc_req_header = RpcRequestHeaderProto {
111            rpc_kind: Some(RpcKindProto::RpcProtocolBuffer as i32),
112            rpc_op: Some(rpc_request_header_proto::OperationProto::RpcFinalPacket as i32),
113            call_id,
114            client_id: self.client_id.clone(),
115            retry_count: Some(0),
116            trace_info: None,
117            caller_context: self.context.clone(),
118            state_id: None,
119            router_federated_state: None,
120        };
121        let rpc_req_bytes = rpc_req_header.encode_length_delimited_to_vec();
122        self.call_id += 1;
123        let req_header = RequestHeaderProto {
124            method_name: method_name.into(),
125            declaring_class_protocol_name: PROTOCOL.into(),
126            client_protocol_version: 1,
127        };
128        let header_bytes = req_header.encode_length_delimited_to_vec();
129        let total = rpc_req_bytes.len() + header_bytes.len() + req.len();
130        let mut buf = BytesMut::with_capacity(total + 4);
131        buf.put_u32(total as u32);
132        buf.extend_from_slice(&rpc_req_bytes);
133        buf.extend_from_slice(&header_bytes);
134        buf.extend_from_slice(req);
135        self.stream.write_all(&buf)?;
136        self.stream.flush()?;
137        self.read_resp()
138    }
139
140    pub(crate) fn read_resp(&mut self) -> Result<(RpcResponseHeaderProto, BytesMut), HDFSError> {
141        let mut raw_length = [0u8; 4];
142        self.stream.read_exact(&mut raw_length)?;
143        let length = u32::from_be_bytes(raw_length) as usize;
144        let mut buf = BytesMut::with_capacity(length);
145        buf.resize(length, 0);
146        self.stream.read_exact(&mut buf)?;
147        let header_length = prost::encoding::decode_varint(&mut buf)?;
148        let header_bytes = buf.split_to(header_length as usize);
149        let resp_header = RpcResponseHeaderProto::decode(header_bytes)?;
150        // into error if status is not success
151        let resp_header = into_err(resp_header)?;
152        Ok((resp_header, buf))
153    }
154}
155
/// Generates typed RPC wrapper methods on top of `send_raw_req`.
///
/// Two forms are accepted:
///
/// * `method!(name, "wireName", ReqType, RespType)` expands to a single
///   `pub fn name(&mut self, req: ReqType)` and must be used inside an `impl`.
/// * `method! { Type => name, "wireName", ReqType, RespType; ... }` expands to
///   an `impl<S: Write + Read> Type` block with one such method per entry.
macro_rules! method {
    // One method: encode the request, send it, decode the typed response.
    ($name:ident, $raw:literal, $req:ty, $resp:ty) => {
        /// Sends the request over the connection and decodes the typed response.
        pub fn $name(&mut self, req: $req) -> Result<(RpcResponseHeaderProto, $resp), HDFSError> {
            #[cfg(feature = "trace_dbg")]
            {
                tracing::trace!(target: "hrpc", "\nmethod: {}\nreq: {:#?}", $raw, req);
            }
            #[cfg(feature = "trace_valuable")]
            {
                use valuable::Valuable;
                tracing::trace!(target: "hrpc", method=$raw, req=req.as_value());
            }
            // The request body travels with its own length delimiter.
            let payload = req.encode_length_delimited_to_vec();
            let (header, body) = self.send_raw_req($raw, &payload)?;
            let resp = <$resp>::decode_length_delimited(body)?;
            #[cfg(feature = "trace_dbg")]
            {
                tracing::trace!(target: "hrpc", "\nmethod: {}\nheader: {:#?}\nresp: {:#?}", $raw, header, resp);
            }
            #[cfg(feature = "trace_valuable")]
            {
                use valuable::Valuable;
                tracing::trace!(target: "hrpc", method=$raw, header=header.as_value(), resp=resp.as_value() );
            }
            Ok((header, resp))
        }
    };

    // Many methods: wrap a `;`-terminated list of entries in one impl block.
    ($ipc:ty => $($name:ident, $raw:literal, $req:ty, $resp:ty);+;) => {
        impl <S: std::io::Write + std::io::Read> $ipc {
            $(method!($name, $raw, $req, $resp);)+
        }
    }
}
190
191// data node methods
192method! { HRpc<S> =>
193    get_replica_visible_length, "getReplicaVisibleLength", GetReplicaVisibleLengthRequestProto, GetReplicaVisibleLengthResponseProto;
194    refresh_namenodes, "refreshNamenodes", RefreshNamenodesRequestProto, RefreshNamenodesResponseProto;
195    delete_block_pool, "deleteBlockPool", DeleteBlockPoolRequestProto, DeleteBlockPoolResponseProto;
196    get_block_local_path_info, "getBlockLocalPathInfo", GetBlockLocalPathInfoRequestProto, GetBlockLocalPathInfoResponseProto;
197    shutdown_datanode, "shutdownDatanode", ShutdownDatanodeRequestProto, ShutdownDatanodeResponseProto;
198    evict_writers, "evictWriters", EvictWritersRequestProto, EvictWritersResponseProto;
199    get_datanode_info, "getDatanodeInfo", GetDatanodeInfoRequestProto, GetDatanodeInfoResponseProto;
200    get_volume_report, "getVolumeReport", GetVolumeReportRequestProto, GetVolumeReportResponseProto;
201    start_reconfiguration, "startReconfiguration", StartReconfigurationRequestProto, StartReconfigurationResponseProto;
202    trigger_block_report, "triggerBlockReport", TriggerBlockReportRequestProto, TriggerBlockReportResponseProto;
203    get_balancer_bandwidth, "getBalancerBandwidth", GetBalancerBandwidthRequestProto, GetBalancerBandwidthResponseProto;
204    get_reconfiguration_status, "getReconfigurationStatus", GetReconfigurationStatusRequestProto, GetReconfigurationStatusResponseProto;
205    get_disk_balancer_setting, "getDiskBalancerSetting", DiskBalancerSettingRequestProto, DiskBalancerSettingResponseProto;
206}
207
208// name node methods
209method! { HRpc<S> =>
210    create, "create", CreateRequestProto, CreateResponseProto;
211    get_server_defaults, "getServerDefaults", GetServerDefaultsRequestProto,GetServerDefaultsResponseProto;
212    get_block_locations, "getBlockLocations", GetBlockLocationsRequestProto, GetBlockLocationsResponseProto;
213    append, "append", AppendRequestProto, AppendResponseProto;
214    set_replication, "setReplication", SetReplicationRequestProto, SetReplicationResponseProto;
215    set_storage_policy, "setStoragePolicy", SetStoragePolicyRequestProto, SetStoragePolicyResponseProto;
216    unset_storage_policy, "unsetStoragePolicy", UnsetStoragePolicyRequestProto, UnsetStoragePolicyResponseProto;
217    get_storage_policy, "getStoragePolicy", GetStoragePolicyRequestProto, GetStoragePolicyResponseProto;
218    get_storage_policies, "getStoragePolicies", GetStoragePoliciesRequestProto, GetStoragePoliciesResponseProto;
219    set_permission, "setPermission", SetPermissionRequestProto, SetPermissionResponseProto;
220    set_owner, "setOwner", SetOwnerRequestProto, SetOwnerResponseProto;
221    abandon_block, "abandonBlock", AbandonBlockRequestProto, AbandonBlockResponseProto;
222    add_block, "addBlock", AddBlockRequestProto, AddBlockResponseProto;
223    get_additional_datanode, "getAdditionalDatanode", GetAdditionalDatanodeRequestProto, GetAdditionalDatanodeResponseProto;
224    complete, "complete", CompleteRequestProto, CompleteResponseProto;
225    report_bad_blocks, "reportBadBlocks", ReportBadBlocksRequestProto, ReportBadBlocksResponseProto;
226    concat, "concat", ConcatRequestProto, ConcatResponseProto;
227    truncate, "truncate", TruncateRequestProto, TruncateResponseProto;
228    rename, "rename", RenameRequestProto, RenameResponseProto;
229    rename2, "rename2", Rename2RequestProto, Rename2ResponseProto;
230    delete, "delete", DeleteRequestProto, DeleteResponseProto;
231    mkdirs, "mkdirs", MkdirsRequestProto, MkdirsResponseProto;
232    get_listing, "getListing", GetListingRequestProto, GetListingResponseProto;
233    renew_lease, "renewLease", RenewLeaseRequestProto, RenewLeaseResponseProto;
234    recover_lease, "recoverLease", RecoverLeaseRequestProto, RecoverLeaseResponseProto;
235    get_fs_stats, "getFsStats", GetFsStatusRequestProto, GetFsStatsResponseProto;
236    get_datanode_report, "getDatanodeReport", GetDatanodeReportRequestProto, GetDatanodeReportResponseProto;
237    get_preferred_block_size, "getPreferredBlockSize", GetPreferredBlockSizeRequestProto, GetPreferredBlockSizeResponseProto;
238    set_safe_mode, "setSafeMode", SetSafeModeRequestProto, SetSafeModeResponseProto;
239    save_namespace, "saveNamespace", SaveNamespaceRequestProto, SaveNamespaceResponseProto;
240    roll_edits, "rollEdits", RollEditsRequestProto, RollEditsResponseProto;
241    restore_failed_storage, "restoreFailedStorage", RestoreFailedStorageRequestProto, RestoreFailedStorageResponseProto;
242    refresh_nodes, "refreshNodes", RefreshNodesRequestProto, RefreshNodesResponseProto;
243    finalize_upgrade, "finalizeUpgrade", FinalizeUpgradeRequestProto, FinalizeUpgradeResponseProto;
244    upgrade_status, "upgradeStatus", UpgradeStatusRequestProto, UpgradeStatusResponseProto;
245    rolling_upgrade, "rollingUpgrade", RollingUpgradeRequestProto, RollingUpgradeResponseProto;
246    list_corrupt_file_blocks, "listCorruptFileBlocks", ListCorruptFileBlocksRequestProto, ListCorruptFileBlocksResponseProto;
247    meta_save, "metaSave", MetaSaveRequestProto, MetaSaveResponseProto;
248    get_file_info, "getFileInfo", GetFileInfoRequestProto, GetFileInfoResponseProto;
249    get_located_file_info, "getLocatedFileInfo", GetLocatedFileInfoRequestProto, GetLocatedFileInfoResponseProto;
250    add_cache_pool, "addCachePool", AddCachePoolRequestProto, AddCachePoolResponseProto;
251    modify_cache_pool, "modifyCachePool", ModifyCachePoolRequestProto, ModifyCachePoolResponseProto;
252    remove_cache_pool, "removeCachePool", RemoveCachePoolRequestProto, RemoveCachePoolResponseProto;
253    list_cache_pools, "listCachePools", ListCachePoolsRequestProto, ListCachePoolsResponseProto;
254    get_file_link_info, "getFileLinkInfo", GetFileLinkInfoRequestProto, GetFileLinkInfoResponseProto;
255    get_content_summary, "getContentSummary", GetContentSummaryRequestProto, GetContentSummaryResponseProto;
256    set_quota, "setQuota", SetQuotaRequestProto, SetQuotaResponseProto;
257    fsync, "fsync", FsyncRequestProto, FsyncResponseProto;
258    set_times, "setTimes", SetTimesRequestProto, SetTimesResponseProto;
259    create_symlink, "createSymlink", CreateSymlinkRequestProto, CreateSymlinkResponseProto;
260    get_link_target, "getLinkTarget", GetLinkTargetRequestProto, GetLinkTargetResponseProto;
261    update_block_for_pipeline, "updateBlockForPipeline", UpdateBlockForPipelineRequestProto, UpdateBlockForPipelineResponseProto;
262    update_pipeline, "updatePipeline", UpdatePipelineRequestProto, UpdatePipelineResponseProto;
263    set_balancer_bandwidth, "setBalancerBandwidth", SetBalancerBandwidthRequestProto, SetBalancerBandwidthResponseProto;
264    get_data_encryption_key, "getDataEncryptionKey", GetDataEncryptionKeyRequestProto, GetDataEncryptionKeyResponseProto;
265    create_snapshot, "createSnapshot", CreateSnapshotRequestProto, CreateSnapshotResponseProto;
266    rename_snapshot, "renameSnapshot", RenameSnapshotRequestProto, RenameSnapshotResponseProto;
267    allow_snapshot, "allowSnapshot", AllowSnapshotRequestProto, AllowSnapshotResponseProto;
268    disallow_snapshot, "disallowSnapshot", DisallowSnapshotRequestProto, DisallowSnapshotResponseProto;
269    get_snapshot_listing, "getSnapshotListing", GetSnapshotListingRequestProto, GetSnapshotListingResponseProto;
270    delete_snapshot, "deleteSnapshot", DeleteSnapshotRequestProto, DeleteSnapshotResponseProto;
271    get_snapshot_diff_report, "getSnapshotDiffReport", GetSnapshotDiffReportRequestProto, GetSnapshotDiffReportResponseProto;
272    is_file_closed, "isFileClosed", IsFileClosedRequestProto, IsFileClosedResponseProto;
273    modify_acl_entries, "modifyAclEntries", ModifyAclEntriesRequestProto, ModifyAclEntriesResponseProto;
274    remove_acl_entries, "removeAclEntries", RemoveAclEntriesRequestProto, RemoveAclEntriesResponseProto;
275    remove_default_acl, "removeDefaultAcl", RemoveDefaultAclRequestProto, RemoveDefaultAclResponseProto;
276    remove_acl, "removeAcl", RemoveAclRequestProto, RemoveAclResponseProto;
277    set_acl, "setAcl", SetAclRequestProto, SetAclResponseProto;
278    get_acl_status, "getAclStatus", GetAclStatusRequestProto, GetAclStatusResponseProto;
279    set_x_attr, "setXAttr", SetXAttrRequestProto, SetXAttrResponseProto;
280    get_x_attrs, "getXAttrs", GetXAttrsRequestProto, GetXAttrsResponseProto;
281    list_x_attrs, "listXAttrs", ListXAttrsRequestProto, ListXAttrsResponseProto;
282    remove_x_attr, "removeXAttr", RemoveXAttrRequestProto, RemoveXAttrResponseProto;
283    check_access, "checkAccess", CheckAccessRequestProto, CheckAccessResponseProto;
284    create_encryption_zone, "createEncryptionZone", CreateEncryptionZoneRequestProto, CreateEncryptionZoneResponseProto;
285    list_encryption_zones, "listEncryptionZones", ListEncryptionZonesRequestProto, ListEncryptionZonesResponseProto;
286    reencrypt_encryption_zone, "reencryptEncryptionZone", ReencryptEncryptionZoneRequestProto, ReencryptEncryptionZoneResponseProto;
287    list_reencryption_status, "listReencryptionStatus", ListReencryptionStatusRequestProto, ListReencryptionStatusResponseProto;
288    get_e_z_for_path, "getEZForPath", GetEzForPathRequestProto, GetEzForPathResponseProto;
289    set_erasure_coding_policy, "setErasureCodingPolicy", SetErasureCodingPolicyRequestProto, SetErasureCodingPolicyResponseProto;
290    get_current_edit_log_txid, "getCurrentEditLogTxid", GetCurrentEditLogTxidRequestProto, GetCurrentEditLogTxidResponseProto;
291    get_edits_from_txid, "getEditsFromTxid", GetEditsFromTxidRequestProto, GetEditsFromTxidResponseProto;
292    get_erasure_coding_policy, "getErasureCodingPolicy", GetErasureCodingPolicyRequestProto, GetErasureCodingPolicyResponseProto;
293    get_erasure_coding_codecs, "getErasureCodingCodecs", GetErasureCodingCodecsRequestProto, GetErasureCodingCodecsResponseProto;
294    get_quota_usage, "getQuotaUsage", GetQuotaUsageRequestProto, GetQuotaUsageResponseProto;
295    list_open_files, "listOpenFiles", ListOpenFilesRequestProto, ListOpenFilesResponseProto;
296    msync, "msync", MsyncRequestProto, MsyncResponseProto;
297    satisfy_storage_policy, "satisfyStoragePolicy", SatisfyStoragePolicyRequestProto, SatisfyStoragePolicyResponseProto;
298    get_ha_service_state, "getHAServiceState", HaServiceStateRequestProto, HaServiceStateResponseProto;
299    get_datanode_storage_report, "getDatanodeStorageReport", GetDatanodeStorageReportRequestProto, GetDatanodeStorageReportResponseProto;
300    get_snapshottable_dir_listing, "getSnapshottableDirListing", GetSnapshottableDirListingRequestProto, GetSnapshottableDirListingResponseProto;
301    get_snapshot_diff_report_listing, "getSnapshotDiffReportListing", GetSnapshotDiffReportListingRequestProto, GetSnapshotDiffReportListingResponseProto;
302    unset_erasure_coding_policy, "unsetErasureCodingPolicy", UnsetErasureCodingPolicyRequestProto, UnsetErasureCodingPolicyResponseProto;
303    get_e_c_topology_result_for_policies, "getECTopologyResultForPolicies", GetEcTopologyResultForPoliciesRequestProto, GetEcTopologyResultForPoliciesResponseProto;
304    get_erasure_coding_policies, "getErasureCodingPolicies", GetErasureCodingPoliciesRequestProto, GetErasureCodingPoliciesResponseProto;
305    add_erasure_coding_policies, "addErasureCodingPolicies", AddErasureCodingPoliciesRequestProto, AddErasureCodingPoliciesResponseProto;
306    remove_erasure_coding_policy, "removeErasureCodingPolicy", RemoveErasureCodingPolicyRequestProto, RemoveErasureCodingPolicyResponseProto;
307    enable_erasure_coding_policy, "enableErasureCodingPolicy", EnableErasureCodingPolicyRequestProto, EnableErasureCodingPolicyResponseProto;
308    disable_erasure_coding_policy, "disableErasureCodingPolicy", DisableErasureCodingPolicyRequestProto, DisableErasureCodingPolicyResponseProto;
309    get_slow_datanode_report, "getSlowDatanodeReport", GetSlowDatanodeReportRequestProto, GetSlowDatanodeReportResponseProto;
310}