etcd_client/rpc/
maintenance.rs

1//! Etcd Maintenance RPC.
2
3pub use crate::rpc::pb::etcdserverpb::alarm_request::AlarmAction;
4pub use crate::rpc::pb::etcdserverpb::AlarmType;
5
6use super::pb::etcdserverpb;
7use crate::auth::AuthService;
8use crate::channel::Channel;
9use crate::error::Result;
10use crate::rpc::pb::etcdserverpb::{
11    AlarmRequest as PbAlarmRequest, AlarmResponse as PbAlarmResponse,
12    DefragmentRequest as PbDefragmentRequest, DefragmentResponse as PbDefragmentResponse,
13    HashKvRequest as PbHashKvRequest, HashKvResponse as PbHashKvResponse,
14    HashRequest as PbHashRequest, HashResponse as PbHashResponse,
15    MoveLeaderRequest as PbMoveLeaderRequest, MoveLeaderResponse as PbMoveLeaderResponse,
16    SnapshotRequest as PbSnapshotRequest, SnapshotResponse as PbSnapshotResponse,
17    StatusRequest as PbStatusRequest, StatusResponse as PbStatusResponse,
18};
19use crate::rpc::ResponseHeader;
20use etcdserverpb::maintenance_client::MaintenanceClient as PbMaintenanceClient;
21use etcdserverpb::AlarmMember as PbAlarmMember;
22use http::HeaderValue;
23use std::sync::{Arc, RwLock};
24use tonic::codec::Streaming as PbStreaming;
25use tonic::{IntoRequest, Request};
26
27/// Client for maintenance operations.
28#[repr(transparent)]
29#[derive(Clone)]
30pub struct MaintenanceClient {
31    inner: PbMaintenanceClient<AuthService<Channel>>,
32}
33
34/// Options for `alarm` operation.
35#[derive(Debug, Default, Clone)]
36#[repr(transparent)]
37pub struct AlarmOptions(PbAlarmRequest);
38
39impl AlarmOptions {
40    /// Creates a new `AlarmOptions`.
41    #[inline]
42    pub const fn new() -> Self {
43        AlarmOptions(PbAlarmRequest {
44            action: AlarmAction::Get as i32,
45            member_id: 0,
46            alarm: AlarmType::None as i32,
47        })
48    }
49
50    /// Sets alarm action and alarm type.
51    #[inline]
52    const fn with_action_and_type(
53        mut self,
54        alarm_action: AlarmAction,
55        alarm_type: AlarmType,
56    ) -> Self {
57        self.0.action = alarm_action as i32;
58        self.0.alarm = alarm_type as i32;
59        self
60    }
61
62    /// Sets alarm member.
63    #[inline]
64    pub fn with_member(&mut self, member: u64) {
65        self.0.member_id = member;
66    }
67}
68
69impl From<AlarmOptions> for PbAlarmRequest {
70    #[inline]
71    fn from(alarm: AlarmOptions) -> Self {
72        alarm.0
73    }
74}
75
76impl IntoRequest<PbAlarmRequest> for AlarmOptions {
77    #[inline]
78    fn into_request(self) -> Request<PbAlarmRequest> {
79        Request::new(self.into())
80    }
81}
82
83/// Options for `status` operation.
84#[derive(Debug, Default, Clone)]
85struct StatusOptions(PbStatusRequest);
86
87impl StatusOptions {
88    #[inline]
89    const fn new() -> Self {
90        Self(PbStatusRequest {})
91    }
92}
93
94impl From<StatusOptions> for PbStatusRequest {
95    #[inline]
96    fn from(status: StatusOptions) -> Self {
97        status.0
98    }
99}
100
101impl IntoRequest<PbStatusRequest> for StatusOptions {
102    #[inline]
103    fn into_request(self) -> Request<PbStatusRequest> {
104        Request::new(self.into())
105    }
106}
107
108/// Options for `defragment` operation.
109#[derive(Debug, Default, Clone)]
110struct DefragmentOptions(PbDefragmentRequest);
111
112impl DefragmentOptions {
113    #[inline]
114    const fn new() -> Self {
115        Self(PbDefragmentRequest {})
116    }
117}
118
119impl From<DefragmentOptions> for PbDefragmentRequest {
120    #[inline]
121    fn from(defragment: DefragmentOptions) -> Self {
122        defragment.0
123    }
124}
125
126impl IntoRequest<PbDefragmentRequest> for DefragmentOptions {
127    #[inline]
128    fn into_request(self) -> Request<PbDefragmentRequest> {
129        Request::new(self.into())
130    }
131}
132
133/// Options for `hash` operation.
134#[derive(Debug, Default, Clone)]
135struct HashOptions(PbHashRequest);
136
137impl HashOptions {
138    #[inline]
139    const fn new() -> Self {
140        Self(PbHashRequest {})
141    }
142}
143
144impl From<HashOptions> for PbHashRequest {
145    #[inline]
146    fn from(hash: HashOptions) -> Self {
147        hash.0
148    }
149}
150
151impl IntoRequest<PbHashRequest> for HashOptions {
152    #[inline]
153    fn into_request(self) -> Request<PbHashRequest> {
154        Request::new(self.into())
155    }
156}
157
158/// Options for `hashkv` operation.
159#[derive(Debug, Default, Clone)]
160#[repr(transparent)]
161struct HashKvOptions(PbHashKvRequest);
162
163impl HashKvOptions {
164    #[inline]
165    const fn new(revision: i64) -> Self {
166        Self(PbHashKvRequest { revision })
167    }
168}
169
170impl From<HashKvOptions> for PbHashKvRequest {
171    #[inline]
172    fn from(hash_kv: HashKvOptions) -> Self {
173        hash_kv.0
174    }
175}
176
177impl IntoRequest<PbHashKvRequest> for HashKvOptions {
178    #[inline]
179    fn into_request(self) -> Request<PbHashKvRequest> {
180        Request::new(self.into())
181    }
182}
183
184/// Options for `snapshot` operation.
185#[derive(Debug, Default, Clone)]
186struct SnapshotOptions(PbSnapshotRequest);
187
188impl SnapshotOptions {
189    #[inline]
190    const fn new() -> Self {
191        Self(PbSnapshotRequest {})
192    }
193}
194
195impl From<SnapshotOptions> for PbSnapshotRequest {
196    #[inline]
197    fn from(snapshot: SnapshotOptions) -> Self {
198        snapshot.0
199    }
200}
201
202impl IntoRequest<PbSnapshotRequest> for SnapshotOptions {
203    #[inline]
204    fn into_request(self) -> Request<PbSnapshotRequest> {
205        Request::new(self.into())
206    }
207}
208
209/// Response for `alarm` operation.
210#[cfg_attr(feature = "pub-response-field", visible::StructFields(pub))]
211#[derive(Debug, Clone)]
212#[repr(transparent)]
213pub struct AlarmResponse(PbAlarmResponse);
214
215/// Alarm member of respond.
216#[cfg_attr(feature = "pub-response-field", visible::StructFields(pub))]
217#[derive(Clone, PartialEq, Eq)]
218pub struct AlarmMember {
219    /// memberID is the ID of the member associated with the raised alarm.
220    member_id: u64,
221    /// alarm is the type of alarm which has been raised.
222    alarm: AlarmType,
223}
224
225impl AlarmMember {
226    /// Get member id.
227    #[inline]
228    pub fn member_id(&self) -> u64 {
229        self.member_id
230    }
231
232    /// Get alarm.
233    #[inline]
234    pub fn alarm(&self) -> AlarmType {
235        self.alarm
236    }
237}
238
239impl AlarmResponse {
240    /// Create a new `AlarmResponse` from pb put response.
241    #[inline]
242    const fn new(resp: PbAlarmResponse) -> Self {
243        Self(resp)
244    }
245
246    /// Get response header.
247    #[inline]
248    pub fn header(&self) -> Option<&ResponseHeader> {
249        self.0.header.as_ref().map(From::from)
250    }
251
252    /// Takes the header out of the response, leaving a [`None`] in its place.
253    #[inline]
254    pub fn take_header(&mut self) -> Option<ResponseHeader> {
255        self.0.header.take().map(ResponseHeader::new)
256    }
257
258    /// Get alarms of members.
259    #[inline]
260    pub fn alarms(&self) -> &[AlarmMember] {
261        unsafe { &*(&self.0.alarms as *const Vec<PbAlarmMember> as *const Vec<AlarmMember>) }
262    }
263}
264
265/// Response for `status` operation.
266#[cfg_attr(feature = "pub-response-field", visible::StructFields(pub))]
267#[derive(Debug, Clone)]
268#[repr(transparent)]
269pub struct StatusResponse(PbStatusResponse);
270
271impl StatusResponse {
272    /// Create a new `StatusResponse` from pb put response.
273    #[inline]
274    const fn new(resp: PbStatusResponse) -> Self {
275        Self(resp)
276    }
277
278    /// Get response header.
279    #[inline]
280    pub fn header(&self) -> Option<&ResponseHeader> {
281        self.0.header.as_ref().map(From::from)
282    }
283
284    /// Takes the header out of the response, leaving a [`None`] in its place.
285    #[inline]
286    pub fn take_header(&mut self) -> Option<ResponseHeader> {
287        self.0.header.take().map(ResponseHeader::new)
288    }
289
290    /// Get version of the member.
291    #[inline]
292    pub fn version(&self) -> &str {
293        &self.0.version
294    }
295
296    /// Get size of db, in bytes.
297    #[inline]
298    pub fn db_size(&self) -> i64 {
299        self.0.db_size
300    }
301
302    /// Get leader of cluster.
303    #[inline]
304    pub fn leader(&self) -> u64 {
305        self.0.leader
306    }
307
308    /// Get raft index of cluster.
309    #[inline]
310    pub fn raft_index(&self) -> u64 {
311        self.0.raft_index
312    }
313
314    /// Get raft term of cluster.
315    #[inline]
316    pub fn raft_term(&self) -> u64 {
317        self.0.raft_term
318    }
319
320    /// Get raft applied of respond member.
321    #[inline]
322    pub fn raft_applied_index(&self) -> u64 {
323        self.0.raft_applied_index
324    }
325
326    /// Get errors of cluster members.
327    #[inline]
328    pub fn errors(&self) -> &[String] {
329        &self.0.errors
330    }
331
332    /// Get raft used db size, in bytes.
333    #[inline]
334    pub fn raft_used_db_size(&self) -> i64 {
335        self.0.db_size_in_use
336    }
337
338    /// Indicate if the member is raft learner.
339    #[inline]
340    pub fn is_learner(&self) -> bool {
341        self.0.is_learner
342    }
343}
344
345/// Response for `defragment` operation.
346#[cfg_attr(feature = "pub-response-field", visible::StructFields(pub))]
347#[derive(Debug, Clone)]
348#[repr(transparent)]
349pub struct DefragmentResponse(PbDefragmentResponse);
350
351impl DefragmentResponse {
352    /// Create a new `DefragmentResponse` from pb put response.
353    #[inline]
354    const fn new(resp: PbDefragmentResponse) -> Self {
355        Self(resp)
356    }
357
358    /// Get response header.
359    #[inline]
360    pub fn header(&self) -> Option<&ResponseHeader> {
361        self.0.header.as_ref().map(From::from)
362    }
363
364    /// Takes the header out of the response, leaving a [`None`] in its place.
365    #[inline]
366    pub fn take_header(&mut self) -> Option<ResponseHeader> {
367        self.0.header.take().map(ResponseHeader::new)
368    }
369}
370
371/// Response for `hash` operation.
372#[cfg_attr(feature = "pub-response-field", visible::StructFields(pub))]
373#[derive(Debug, Clone)]
374#[repr(transparent)]
375pub struct HashResponse(PbHashResponse);
376
377impl HashResponse {
378    /// Create a new `HashResponse` from pb put response.
379    #[inline]
380    const fn new(resp: PbHashResponse) -> Self {
381        Self(resp)
382    }
383
384    /// Get response header.
385    #[inline]
386    pub fn header(&self) -> Option<&ResponseHeader> {
387        self.0.header.as_ref().map(From::from)
388    }
389
390    /// Takes the header out of the response, leaving a [`None`] in its place.
391    #[inline]
392    pub fn take_header(&mut self) -> Option<ResponseHeader> {
393        self.0.header.take().map(ResponseHeader::new)
394    }
395
396    /// Gets the hash value computed from the responding member's KV's backend.
397    #[inline]
398    pub fn hash(&self) -> u32 {
399        self.0.hash
400    }
401}
402
403/// Response for `hash_kv` operation.
404#[cfg_attr(feature = "pub-response-field", visible::StructFields(pub))]
405#[derive(Debug, Clone)]
406#[repr(transparent)]
407pub struct HashKvResponse(PbHashKvResponse);
408
409impl HashKvResponse {
410    /// Create a new `HashKvResponse` from pb put response.
411    #[inline]
412    const fn new(resp: PbHashKvResponse) -> Self {
413        Self(resp)
414    }
415
416    /// Get response header.
417    #[inline]
418    pub fn header(&self) -> Option<&ResponseHeader> {
419        self.0.header.as_ref().map(From::from)
420    }
421
422    /// Takes the header out of the response, leaving a [`None`] in its place.
423    #[inline]
424    pub fn take_header(&mut self) -> Option<ResponseHeader> {
425        self.0.header.take().map(ResponseHeader::new)
426    }
427
428    /// Gets the hash value computed from the responding member's MVCC keys up to a given revision.
429    #[inline]
430    pub fn hash(&self) -> u32 {
431        self.0.hash
432    }
433
434    /// Gets compacted revision of key-value store when hash begins.
435    #[inline]
436    pub fn compact_version(&self) -> i64 {
437        self.0.compact_revision
438    }
439}
440
441/// Response for `snapshot` operation.
442#[cfg_attr(feature = "pub-response-field", visible::StructFields(pub))]
443#[derive(Debug, Clone)]
444#[repr(transparent)]
445pub struct SnapshotResponse(PbSnapshotResponse);
446
447impl SnapshotResponse {
448    /// Create a new `SnapshotResponse` from pb put response.
449    #[inline]
450    const fn new(resp: PbSnapshotResponse) -> Self {
451        Self(resp)
452    }
453
454    /// Get response header.
455    #[inline]
456    pub fn header(&self) -> Option<&ResponseHeader> {
457        self.0.header.as_ref().map(From::from)
458    }
459
460    /// Takes the header out of the response, leaving a [`None`] in its place.
461    #[inline]
462    pub fn take_header(&mut self) -> Option<ResponseHeader> {
463        self.0.header.take().map(ResponseHeader::new)
464    }
465
466    /// Get remaining bytes.
467    #[inline]
468    pub fn remaining_bytes(&self) -> u64 {
469        self.0.remaining_bytes
470    }
471
472    /// The next chunk of the snapshot in the snapshot stream.
473    #[inline]
474    pub fn blob(&self) -> &[u8] {
475        &self.0.blob
476    }
477}
478
479/// Response for `snapshot` operation.
480#[cfg_attr(feature = "pub-response-field", visible::StructFields(pub))]
481#[derive(Debug)]
482#[repr(transparent)]
483pub struct SnapshotStreaming(PbStreaming<PbSnapshotResponse>);
484
485impl SnapshotStreaming {
486    /// Fetches the next message from this stream.
487    #[inline]
488    pub async fn message(&mut self) -> Result<Option<SnapshotResponse>> {
489        let ret = self.0.message().await?;
490        match ret {
491            Some(rsp) => Ok(Some(SnapshotResponse::new(rsp))),
492            None => Ok(None),
493        }
494    }
495}
496
497/// Options for `MoveLeader` operation.
498#[derive(Debug, Default, Clone)]
499#[repr(transparent)]
500pub struct MoveLeaderOptions(PbMoveLeaderRequest);
501
502impl MoveLeaderOptions {
503    /// Sets target_id
504    #[inline]
505    const fn with_target_id(mut self, target_id: u64) -> Self {
506        self.0.target_id = target_id;
507        self
508    }
509
510    /// Creates a `MoveLeaderOptions`.
511    #[inline]
512    pub const fn new() -> Self {
513        Self(PbMoveLeaderRequest { target_id: 0 })
514    }
515}
516
517impl From<MoveLeaderOptions> for PbMoveLeaderRequest {
518    #[inline]
519    fn from(options: MoveLeaderOptions) -> Self {
520        options.0
521    }
522}
523
524impl IntoRequest<PbMoveLeaderRequest> for MoveLeaderOptions {
525    #[inline]
526    fn into_request(self) -> Request<PbMoveLeaderRequest> {
527        Request::new(self.into())
528    }
529}
530
531/// Response for `MoveLeader` operation.
532#[cfg_attr(feature = "pub-response-field", visible::StructFields(pub))]
533#[derive(Debug, Clone)]
534#[repr(transparent)]
535pub struct MoveLeaderResponse(PbMoveLeaderResponse);
536
537impl MoveLeaderResponse {
538    #[inline]
539    const fn new(resp: PbMoveLeaderResponse) -> Self {
540        Self(resp)
541    }
542
543    /// Get response header.
544    #[inline]
545    pub fn header(&self) -> Option<&ResponseHeader> {
546        self.0.header.as_ref().map(From::from)
547    }
548
549    /// Takes the header out of the response, leaving a [`None`] in its place.
550    #[inline]
551    pub fn take_header(&mut self) -> Option<ResponseHeader> {
552        self.0.header.take().map(ResponseHeader::new)
553    }
554}
555
556impl MaintenanceClient {
557    /// Creates a maintenance client.
558    #[inline]
559    pub(crate) fn new(channel: Channel, auth_token: Arc<RwLock<Option<HeaderValue>>>) -> Self {
560        let inner = PbMaintenanceClient::new(AuthService::new(channel, auth_token));
561        Self { inner }
562    }
563
564    /// Get or active or inactive alarm.
565    #[inline]
566    pub async fn alarm(
567        &mut self,
568        alarm_action: AlarmAction,
569        alarm_type: AlarmType,
570        options: Option<AlarmOptions>,
571    ) -> Result<AlarmResponse> {
572        let resp = self
573            .inner
574            .alarm(
575                options
576                    .unwrap_or_default()
577                    .with_action_and_type(alarm_action, alarm_type),
578            )
579            .await?
580            .into_inner();
581        Ok(AlarmResponse::new(resp))
582    }
583
584    /// Get status of a member.
585    #[inline]
586    pub async fn status(&mut self) -> Result<StatusResponse> {
587        let resp = self.inner.status(StatusOptions::new()).await?.into_inner();
588        Ok(StatusResponse::new(resp))
589    }
590
591    /// Defragment a member's backend database to recover storage space.
592    #[inline]
593    pub async fn defragment(&mut self) -> Result<DefragmentResponse> {
594        let resp = self
595            .inner
596            .defragment(DefragmentOptions::new())
597            .await?
598            .into_inner();
599        Ok(DefragmentResponse::new(resp))
600    }
601
602    /// Computes the hash of whole backend keyspace.
603    /// including key, lease, and other buckets in storage.
604    /// This is designed for testing ONLY!
605    #[inline]
606    pub async fn hash(&mut self) -> Result<HashResponse> {
607        let resp = self.inner.hash(HashOptions::new()).await?.into_inner();
608        Ok(HashResponse::new(resp))
609    }
610
611    /// Computes the hash of all MVCC keys up to a given revision.
612    /// It only iterates \"key\" bucket in backend storage.
613    #[inline]
614    pub async fn hash_kv(&mut self, revision: i64) -> Result<HashKvResponse> {
615        let resp = self
616            .inner
617            .hash_kv(HashKvOptions::new(revision))
618            .await?
619            .into_inner();
620        Ok(HashKvResponse::new(resp))
621    }
622
623    /// Gets a snapshot of the entire backend from a member over a stream to a client.
624    #[inline]
625    pub async fn snapshot(&mut self) -> Result<SnapshotStreaming> {
626        let resp = self
627            .inner
628            .snapshot(SnapshotOptions::new())
629            .await?
630            .into_inner();
631        Ok(SnapshotStreaming(resp))
632    }
633
634    /// Moves the current leader node to target node.
635    #[inline]
636    pub async fn move_leader(&mut self, target_id: u64) -> Result<MoveLeaderResponse> {
637        let resp = self
638            .inner
639            .move_leader(MoveLeaderOptions::new().with_target_id(target_id))
640            .await?
641            .into_inner();
642        Ok(MoveLeaderResponse::new(resp))
643    }
644}