etcd_client/rpc/
maintenance.rs

1//! Etcd Maintenance RPC.
2
3pub use crate::rpc::pb::etcdserverpb::alarm_request::AlarmAction;
4pub use crate::rpc::pb::etcdserverpb::AlarmType;
5
6use super::pb::etcdserverpb;
7use crate::error::Result;
8use crate::intercept::InterceptedChannel;
9use crate::rpc::pb::etcdserverpb::{
10    AlarmRequest as PbAlarmRequest, AlarmResponse as PbAlarmResponse,
11    DefragmentRequest as PbDefragmentRequest, DefragmentResponse as PbDefragmentResponse,
12    HashKvRequest as PbHashKvRequest, HashKvResponse as PbHashKvResponse,
13    HashRequest as PbHashRequest, HashResponse as PbHashResponse,
14    MoveLeaderRequest as PbMoveLeaderRequest, MoveLeaderResponse as PbMoveLeaderResponse,
15    SnapshotRequest as PbSnapshotRequest, SnapshotResponse as PbSnapshotResponse,
16    StatusRequest as PbStatusRequest, StatusResponse as PbStatusResponse,
17};
18use crate::rpc::ResponseHeader;
19use etcdserverpb::maintenance_client::MaintenanceClient as PbMaintenanceClient;
20use etcdserverpb::AlarmMember as PbAlarmMember;
21use tonic::codec::Streaming as PbStreaming;
22use tonic::{IntoRequest, Request};
23
24/// Client for maintenance operations.
25#[repr(transparent)]
26#[derive(Clone)]
27pub struct MaintenanceClient {
28    inner: PbMaintenanceClient<InterceptedChannel>,
29}
30
31/// Options for `alarm` operation.
32#[derive(Debug, Default, Clone)]
33#[repr(transparent)]
34pub struct AlarmOptions(PbAlarmRequest);
35
36impl AlarmOptions {
37    /// Creates a new `AlarmOptions`.
38    #[inline]
39    pub const fn new() -> Self {
40        AlarmOptions(PbAlarmRequest {
41            action: AlarmAction::Get as i32,
42            member_id: 0,
43            alarm: AlarmType::None as i32,
44        })
45    }
46
47    /// Sets alarm action and alarm type.
48    #[inline]
49    const fn with_action_and_type(
50        mut self,
51        alarm_action: AlarmAction,
52        alarm_type: AlarmType,
53    ) -> Self {
54        self.0.action = alarm_action as i32;
55        self.0.alarm = alarm_type as i32;
56        self
57    }
58
59    /// Sets alarm member.
60    #[inline]
61    pub fn with_member(&mut self, member: u64) {
62        self.0.member_id = member;
63    }
64}
65
66impl From<AlarmOptions> for PbAlarmRequest {
67    #[inline]
68    fn from(alarm: AlarmOptions) -> Self {
69        alarm.0
70    }
71}
72
73impl IntoRequest<PbAlarmRequest> for AlarmOptions {
74    #[inline]
75    fn into_request(self) -> Request<PbAlarmRequest> {
76        Request::new(self.into())
77    }
78}
79
80/// Options for `status` operation.
81#[derive(Debug, Default, Clone)]
82struct StatusOptions(PbStatusRequest);
83
84impl StatusOptions {
85    #[inline]
86    const fn new() -> Self {
87        Self(PbStatusRequest {})
88    }
89}
90
91impl From<StatusOptions> for PbStatusRequest {
92    #[inline]
93    fn from(status: StatusOptions) -> Self {
94        status.0
95    }
96}
97
98impl IntoRequest<PbStatusRequest> for StatusOptions {
99    #[inline]
100    fn into_request(self) -> Request<PbStatusRequest> {
101        Request::new(self.into())
102    }
103}
104
105/// Options for `defragment` operation.
106#[derive(Debug, Default, Clone)]
107struct DefragmentOptions(PbDefragmentRequest);
108
109impl DefragmentOptions {
110    #[inline]
111    const fn new() -> Self {
112        Self(PbDefragmentRequest {})
113    }
114}
115
116impl From<DefragmentOptions> for PbDefragmentRequest {
117    #[inline]
118    fn from(defragment: DefragmentOptions) -> Self {
119        defragment.0
120    }
121}
122
123impl IntoRequest<PbDefragmentRequest> for DefragmentOptions {
124    #[inline]
125    fn into_request(self) -> Request<PbDefragmentRequest> {
126        Request::new(self.into())
127    }
128}
129
130/// Options for `hash` operation.
131#[derive(Debug, Default, Clone)]
132struct HashOptions(PbHashRequest);
133
134impl HashOptions {
135    #[inline]
136    const fn new() -> Self {
137        Self(PbHashRequest {})
138    }
139}
140
141impl From<HashOptions> for PbHashRequest {
142    #[inline]
143    fn from(hash: HashOptions) -> Self {
144        hash.0
145    }
146}
147
148impl IntoRequest<PbHashRequest> for HashOptions {
149    #[inline]
150    fn into_request(self) -> Request<PbHashRequest> {
151        Request::new(self.into())
152    }
153}
154
155/// Options for `hashkv` operation.
156#[derive(Debug, Default, Clone)]
157#[repr(transparent)]
158struct HashKvOptions(PbHashKvRequest);
159
160impl HashKvOptions {
161    #[inline]
162    const fn new(revision: i64) -> Self {
163        Self(PbHashKvRequest { revision })
164    }
165}
166
167impl From<HashKvOptions> for PbHashKvRequest {
168    #[inline]
169    fn from(hash_kv: HashKvOptions) -> Self {
170        hash_kv.0
171    }
172}
173
174impl IntoRequest<PbHashKvRequest> for HashKvOptions {
175    #[inline]
176    fn into_request(self) -> Request<PbHashKvRequest> {
177        Request::new(self.into())
178    }
179}
180
181/// Options for `snapshot` operation.
182#[derive(Debug, Default, Clone)]
183struct SnapshotOptions(PbSnapshotRequest);
184
185impl SnapshotOptions {
186    #[inline]
187    const fn new() -> Self {
188        Self(PbSnapshotRequest {})
189    }
190}
191
192impl From<SnapshotOptions> for PbSnapshotRequest {
193    #[inline]
194    fn from(snapshot: SnapshotOptions) -> Self {
195        snapshot.0
196    }
197}
198
199impl IntoRequest<PbSnapshotRequest> for SnapshotOptions {
200    #[inline]
201    fn into_request(self) -> Request<PbSnapshotRequest> {
202        Request::new(self.into())
203    }
204}
205
206/// Response for `alarm` operation.
207#[cfg_attr(feature = "pub-response-field", visible::StructFields(pub))]
208#[derive(Debug, Clone)]
209#[repr(transparent)]
210pub struct AlarmResponse(PbAlarmResponse);
211
212/// Alarm member of respond.
213#[cfg_attr(feature = "pub-response-field", visible::StructFields(pub))]
214#[derive(Clone, PartialEq, Eq)]
215pub struct AlarmMember {
216    /// memberID is the ID of the member associated with the raised alarm.
217    member_id: u64,
218    /// alarm is the type of alarm which has been raised.
219    alarm: AlarmType,
220}
221
222impl AlarmMember {
223    /// Get member id.
224    #[inline]
225    pub fn member_id(&self) -> u64 {
226        self.member_id
227    }
228
229    /// Get alarm.
230    #[inline]
231    pub fn alarm(&self) -> AlarmType {
232        self.alarm
233    }
234}
235
236impl AlarmResponse {
237    /// Create a new `AlarmResponse` from pb put response.
238    #[inline]
239    const fn new(resp: PbAlarmResponse) -> Self {
240        Self(resp)
241    }
242
243    /// Get response header.
244    #[inline]
245    pub fn header(&self) -> Option<&ResponseHeader> {
246        self.0.header.as_ref().map(From::from)
247    }
248
249    /// Takes the header out of the response, leaving a [`None`] in its place.
250    #[inline]
251    pub fn take_header(&mut self) -> Option<ResponseHeader> {
252        self.0.header.take().map(ResponseHeader::new)
253    }
254
255    /// Get alarms of members.
256    #[inline]
257    pub fn alarms(&self) -> &[AlarmMember] {
258        unsafe { &*(&self.0.alarms as *const Vec<PbAlarmMember> as *const Vec<AlarmMember>) }
259    }
260}
261
262/// Response for `status` operation.
263#[cfg_attr(feature = "pub-response-field", visible::StructFields(pub))]
264#[derive(Debug, Clone)]
265#[repr(transparent)]
266pub struct StatusResponse(PbStatusResponse);
267
268impl StatusResponse {
269    /// Create a new `StatusResponse` from pb put response.
270    #[inline]
271    const fn new(resp: PbStatusResponse) -> Self {
272        Self(resp)
273    }
274
275    /// Get response header.
276    #[inline]
277    pub fn header(&self) -> Option<&ResponseHeader> {
278        self.0.header.as_ref().map(From::from)
279    }
280
281    /// Takes the header out of the response, leaving a [`None`] in its place.
282    #[inline]
283    pub fn take_header(&mut self) -> Option<ResponseHeader> {
284        self.0.header.take().map(ResponseHeader::new)
285    }
286
287    /// Get version of the member.
288    #[inline]
289    pub fn version(&self) -> &str {
290        &self.0.version
291    }
292
293    /// Get size of db, in bytes.
294    #[inline]
295    pub fn db_size(&self) -> i64 {
296        self.0.db_size
297    }
298
299    /// Get leader of cluster.
300    #[inline]
301    pub fn leader(&self) -> u64 {
302        self.0.leader
303    }
304
305    /// Get raft index of cluster.
306    #[inline]
307    pub fn raft_index(&self) -> u64 {
308        self.0.raft_index
309    }
310
311    /// Get raft term of cluster.
312    #[inline]
313    pub fn raft_term(&self) -> u64 {
314        self.0.raft_term
315    }
316
317    /// Get raft applied of respond member.
318    #[inline]
319    pub fn raft_applied_index(&self) -> u64 {
320        self.0.raft_applied_index
321    }
322
323    /// Get errors of cluster members.
324    #[inline]
325    pub fn errors(&self) -> &[String] {
326        &self.0.errors
327    }
328
329    /// Get raft used db size, in bytes.
330    #[inline]
331    pub fn raft_used_db_size(&self) -> i64 {
332        self.0.db_size_in_use
333    }
334
335    /// Indicate if the member is raft learner.
336    #[inline]
337    pub fn is_learner(&self) -> bool {
338        self.0.is_learner
339    }
340}
341
342/// Response for `defragment` operation.
343#[cfg_attr(feature = "pub-response-field", visible::StructFields(pub))]
344#[derive(Debug, Clone)]
345#[repr(transparent)]
346pub struct DefragmentResponse(PbDefragmentResponse);
347
348impl DefragmentResponse {
349    /// Create a new `DefragmentResponse` from pb put response.
350    #[inline]
351    const fn new(resp: PbDefragmentResponse) -> Self {
352        Self(resp)
353    }
354
355    /// Get response header.
356    #[inline]
357    pub fn header(&self) -> Option<&ResponseHeader> {
358        self.0.header.as_ref().map(From::from)
359    }
360
361    /// Takes the header out of the response, leaving a [`None`] in its place.
362    #[inline]
363    pub fn take_header(&mut self) -> Option<ResponseHeader> {
364        self.0.header.take().map(ResponseHeader::new)
365    }
366}
367
368/// Response for `hash` operation.
369#[cfg_attr(feature = "pub-response-field", visible::StructFields(pub))]
370#[derive(Debug, Clone)]
371#[repr(transparent)]
372pub struct HashResponse(PbHashResponse);
373
374impl HashResponse {
375    /// Create a new `HashResponse` from pb put response.
376    #[inline]
377    const fn new(resp: PbHashResponse) -> Self {
378        Self(resp)
379    }
380
381    /// Get response header.
382    #[inline]
383    pub fn header(&self) -> Option<&ResponseHeader> {
384        self.0.header.as_ref().map(From::from)
385    }
386
387    /// Takes the header out of the response, leaving a [`None`] in its place.
388    #[inline]
389    pub fn take_header(&mut self) -> Option<ResponseHeader> {
390        self.0.header.take().map(ResponseHeader::new)
391    }
392
393    /// Gets the hash value computed from the responding member's KV's backend.
394    #[inline]
395    pub fn hash(&self) -> u32 {
396        self.0.hash
397    }
398}
399
400/// Response for `hash_kv` operation.
401#[cfg_attr(feature = "pub-response-field", visible::StructFields(pub))]
402#[derive(Debug, Clone)]
403#[repr(transparent)]
404pub struct HashKvResponse(PbHashKvResponse);
405
406impl HashKvResponse {
407    /// Create a new `HashKvResponse` from pb put response.
408    #[inline]
409    const fn new(resp: PbHashKvResponse) -> Self {
410        Self(resp)
411    }
412
413    /// Get response header.
414    #[inline]
415    pub fn header(&self) -> Option<&ResponseHeader> {
416        self.0.header.as_ref().map(From::from)
417    }
418
419    /// Takes the header out of the response, leaving a [`None`] in its place.
420    #[inline]
421    pub fn take_header(&mut self) -> Option<ResponseHeader> {
422        self.0.header.take().map(ResponseHeader::new)
423    }
424
425    /// Gets the hash value computed from the responding member's MVCC keys up to a given revision.
426    #[inline]
427    pub fn hash(&self) -> u32 {
428        self.0.hash
429    }
430
431    /// Gets compacted revision of key-value store when hash begins.
432    #[inline]
433    pub fn compact_version(&self) -> i64 {
434        self.0.compact_revision
435    }
436}
437
438/// Response for `snapshot` operation.
439#[cfg_attr(feature = "pub-response-field", visible::StructFields(pub))]
440#[derive(Debug, Clone)]
441#[repr(transparent)]
442pub struct SnapshotResponse(PbSnapshotResponse);
443
444impl SnapshotResponse {
445    /// Create a new `SnapshotResponse` from pb put response.
446    #[inline]
447    const fn new(resp: PbSnapshotResponse) -> Self {
448        Self(resp)
449    }
450
451    /// Get response header.
452    #[inline]
453    pub fn header(&self) -> Option<&ResponseHeader> {
454        self.0.header.as_ref().map(From::from)
455    }
456
457    /// Takes the header out of the response, leaving a [`None`] in its place.
458    #[inline]
459    pub fn take_header(&mut self) -> Option<ResponseHeader> {
460        self.0.header.take().map(ResponseHeader::new)
461    }
462
463    /// Get remaining bytes.
464    #[inline]
465    pub fn remaining_bytes(&self) -> u64 {
466        self.0.remaining_bytes
467    }
468
469    /// The next chunk of the snapshot in the snapshot stream.
470    #[inline]
471    pub fn blob(&self) -> &[u8] {
472        &self.0.blob
473    }
474}
475
476/// Response for `snapshot` operation.
477#[cfg_attr(feature = "pub-response-field", visible::StructFields(pub))]
478#[derive(Debug)]
479#[repr(transparent)]
480pub struct SnapshotStreaming(PbStreaming<PbSnapshotResponse>);
481
482impl SnapshotStreaming {
483    /// Fetches the next message from this stream.
484    #[inline]
485    pub async fn message(&mut self) -> Result<Option<SnapshotResponse>> {
486        let ret = self.0.message().await?;
487        match ret {
488            Some(rsp) => Ok(Some(SnapshotResponse::new(rsp))),
489            None => Ok(None),
490        }
491    }
492}
493
494/// Options for `MoveLeader` operation.
495#[derive(Debug, Default, Clone)]
496#[repr(transparent)]
497pub struct MoveLeaderOptions(PbMoveLeaderRequest);
498
499impl MoveLeaderOptions {
500    /// Sets target_id
501    #[inline]
502    const fn with_target_id(mut self, target_id: u64) -> Self {
503        self.0.target_id = target_id;
504        self
505    }
506
507    /// Creates a `MoveLeaderOptions`.
508    #[inline]
509    pub const fn new() -> Self {
510        Self(PbMoveLeaderRequest { target_id: 0 })
511    }
512}
513
514impl From<MoveLeaderOptions> for PbMoveLeaderRequest {
515    #[inline]
516    fn from(options: MoveLeaderOptions) -> Self {
517        options.0
518    }
519}
520
521impl IntoRequest<PbMoveLeaderRequest> for MoveLeaderOptions {
522    #[inline]
523    fn into_request(self) -> Request<PbMoveLeaderRequest> {
524        Request::new(self.into())
525    }
526}
527
528/// Response for `MoveLeader` operation.
529#[cfg_attr(feature = "pub-response-field", visible::StructFields(pub))]
530#[derive(Debug, Clone)]
531#[repr(transparent)]
532pub struct MoveLeaderResponse(PbMoveLeaderResponse);
533
534impl MoveLeaderResponse {
535    #[inline]
536    const fn new(resp: PbMoveLeaderResponse) -> Self {
537        Self(resp)
538    }
539
540    /// Get response header.
541    #[inline]
542    pub fn header(&self) -> Option<&ResponseHeader> {
543        self.0.header.as_ref().map(From::from)
544    }
545
546    /// Takes the header out of the response, leaving a [`None`] in its place.
547    #[inline]
548    pub fn take_header(&mut self) -> Option<ResponseHeader> {
549        self.0.header.take().map(ResponseHeader::new)
550    }
551}
552
553impl MaintenanceClient {
554    /// Creates a maintenance client.
555    #[inline]
556    pub(crate) fn new(channel: InterceptedChannel) -> Self {
557        let inner = PbMaintenanceClient::new(channel);
558        Self { inner }
559    }
560
561    /// Get or active or inactive alarm.
562    #[inline]
563    pub async fn alarm(
564        &mut self,
565        alarm_action: AlarmAction,
566        alarm_type: AlarmType,
567        options: Option<AlarmOptions>,
568    ) -> Result<AlarmResponse> {
569        let resp = self
570            .inner
571            .alarm(
572                options
573                    .unwrap_or_default()
574                    .with_action_and_type(alarm_action, alarm_type),
575            )
576            .await?
577            .into_inner();
578        Ok(AlarmResponse::new(resp))
579    }
580
581    /// Get status of a member.
582    #[inline]
583    pub async fn status(&mut self) -> Result<StatusResponse> {
584        let resp = self.inner.status(StatusOptions::new()).await?.into_inner();
585        Ok(StatusResponse::new(resp))
586    }
587
588    /// Defragment a member's backend database to recover storage space.
589    #[inline]
590    pub async fn defragment(&mut self) -> Result<DefragmentResponse> {
591        let resp = self
592            .inner
593            .defragment(DefragmentOptions::new())
594            .await?
595            .into_inner();
596        Ok(DefragmentResponse::new(resp))
597    }
598
599    /// Computes the hash of whole backend keyspace.
600    /// including key, lease, and other buckets in storage.
601    /// This is designed for testing ONLY!
602    #[inline]
603    pub async fn hash(&mut self) -> Result<HashResponse> {
604        let resp = self.inner.hash(HashOptions::new()).await?.into_inner();
605        Ok(HashResponse::new(resp))
606    }
607
608    /// Computes the hash of all MVCC keys up to a given revision.
609    /// It only iterates \"key\" bucket in backend storage.
610    #[inline]
611    pub async fn hash_kv(&mut self, revision: i64) -> Result<HashKvResponse> {
612        let resp = self
613            .inner
614            .hash_kv(HashKvOptions::new(revision))
615            .await?
616            .into_inner();
617        Ok(HashKvResponse::new(resp))
618    }
619
620    /// Gets a snapshot of the entire backend from a member over a stream to a client.
621    #[inline]
622    pub async fn snapshot(&mut self) -> Result<SnapshotStreaming> {
623        let resp = self
624            .inner
625            .snapshot(SnapshotOptions::new())
626            .await?
627            .into_inner();
628        Ok(SnapshotStreaming(resp))
629    }
630
631    /// Moves the current leader node to target node.
632    #[inline]
633    pub async fn move_leader(&mut self, target_id: u64) -> Result<MoveLeaderResponse> {
634        let resp = self
635            .inner
636            .move_leader(MoveLeaderOptions::new().with_target_id(target_id))
637            .await?
638            .into_inner();
639        Ok(MoveLeaderResponse::new(resp))
640    }
641}