etcd_client/rpc/
election.rs

1//! Etcd Election RPC.
2
3use crate::error::Result;
4use crate::intercept::InterceptedChannel;
5use crate::rpc::pb::v3electionpb::election_client::ElectionClient as PbElectionClient;
6use crate::rpc::pb::v3electionpb::{
7    CampaignRequest as PbCampaignRequest, CampaignResponse as PbCampaignResponse,
8    LeaderKey as PbLeaderKey, LeaderRequest as PbLeaderRequest, LeaderResponse as PbLeaderResponse,
9    ProclaimRequest as PbProclaimRequest, ProclaimResponse as PbProclaimResponse,
10    ResignRequest as PbResignRequest, ResignResponse as PbResignResponse,
11};
12use crate::rpc::{KeyValue, ResponseHeader};
13use std::pin::Pin;
14use std::task::{Context, Poll};
15use tokio_stream::Stream;
16use tonic::{IntoRequest, Request, Streaming};
17
18/// Client for Elect operations.
19#[repr(transparent)]
20#[derive(Clone)]
21pub struct ElectionClient {
22    inner: PbElectionClient<InterceptedChannel>,
23}
24
25/// Options for `campaign` operation.
26#[derive(Debug, Default, Clone)]
27#[repr(transparent)]
28pub struct CampaignOptions(PbCampaignRequest);
29
30impl CampaignOptions {
31    #[inline]
32    pub const fn new() -> Self {
33        Self(PbCampaignRequest {
34            name: Vec::new(),
35            lease: 0,
36            value: Vec::new(),
37        })
38    }
39
40    /// Name is the election's identifier for the campaign.
41    #[inline]
42    fn with_name(mut self, name: impl Into<Vec<u8>>) -> Self {
43        self.0.name = name.into();
44        self
45    }
46
47    /// Lease is the ID of the lease attached to leadership of the election
48    #[inline]
49    const fn with_lease(mut self, lease: i64) -> Self {
50        self.0.lease = lease;
51        self
52    }
53
54    /// Value is the initial proclaimed value set when the campaigner wins the election.
55    #[inline]
56    fn with_value(mut self, value: impl Into<Vec<u8>>) -> Self {
57        self.0.value = value.into();
58        self
59    }
60}
61
62impl From<CampaignOptions> for PbCampaignRequest {
63    #[inline]
64    fn from(options: CampaignOptions) -> Self {
65        options.0
66    }
67}
68
69impl IntoRequest<PbCampaignRequest> for CampaignOptions {
70    #[inline]
71    fn into_request(self) -> Request<PbCampaignRequest> {
72        Request::new(self.into())
73    }
74}
75
76/// Options for `proclaim` operation.
77#[derive(Debug, Default, Clone)]
78#[repr(transparent)]
79pub struct ProclaimOptions(PbProclaimRequest);
80
81impl ProclaimOptions {
82    #[inline]
83    pub const fn new() -> Self {
84        Self(PbProclaimRequest {
85            leader: None,
86            value: Vec::new(),
87        })
88    }
89
90    /// The initial proclaimed value set when the campaigner wins the election.
91    #[inline]
92    fn with_value(mut self, value: impl Into<Vec<u8>>) -> Self {
93        self.0.value = value.into();
94        self
95    }
96
97    /// The leadership hold on the election.
98    #[inline]
99    pub fn with_leader(mut self, leader: LeaderKey) -> Self {
100        self.0.leader = Some(leader.into());
101        self
102    }
103}
104
105impl From<ProclaimOptions> for PbProclaimRequest {
106    #[inline]
107    fn from(options: ProclaimOptions) -> Self {
108        options.0
109    }
110}
111
112impl IntoRequest<PbProclaimRequest> for ProclaimOptions {
113    #[inline]
114    fn into_request(self) -> Request<PbProclaimRequest> {
115        Request::new(self.into())
116    }
117}
118
119/// Options for `leader` operation.
120#[derive(Debug, Default, Clone)]
121#[repr(transparent)]
122pub struct LeaderOptions(PbLeaderRequest);
123
124impl LeaderOptions {
125    #[inline]
126    pub const fn new() -> Self {
127        Self(PbLeaderRequest { name: Vec::new() })
128    }
129
130    /// Name is the election identifier for the leadership information.
131    #[inline]
132    pub fn with_name(mut self, name: impl Into<Vec<u8>>) -> Self {
133        self.0.name = name.into();
134        self
135    }
136}
137
138impl From<LeaderOptions> for PbLeaderRequest {
139    #[inline]
140    fn from(options: LeaderOptions) -> Self {
141        options.0
142    }
143}
144
145impl IntoRequest<PbLeaderRequest> for LeaderOptions {
146    #[inline]
147    fn into_request(self) -> Request<PbLeaderRequest> {
148        Request::new(self.into())
149    }
150}
151
152/// Options for `resign` operation.
153#[derive(Debug, Default, Clone)]
154#[repr(transparent)]
155pub struct ResignOptions(PbResignRequest);
156
157impl ResignOptions {
158    #[inline]
159    pub const fn new() -> Self {
160        Self(PbResignRequest { leader: None })
161    }
162
163    /// The leadership to relinquish by resignation.
164    #[inline]
165    pub fn with_leader(mut self, leader: LeaderKey) -> Self {
166        self.0.leader = Some(leader.into());
167        self
168    }
169}
170
171impl From<ResignOptions> for PbResignRequest {
172    #[inline]
173    fn from(options: ResignOptions) -> Self {
174        options.0
175    }
176}
177
178impl IntoRequest<PbResignRequest> for ResignOptions {
179    #[inline]
180    fn into_request(self) -> Request<PbResignRequest> {
181        Request::new(self.into())
182    }
183}
184
185/// Response for `Campaign` operation.
186#[cfg_attr(feature = "pub-response-field", visible::StructFields(pub))]
187#[derive(Debug, Clone)]
188#[repr(transparent)]
189pub struct CampaignResponse(PbCampaignResponse);
190
191impl CampaignResponse {
192    #[inline]
193    const fn new(resp: PbCampaignResponse) -> Self {
194        Self(resp)
195    }
196
197    /// Get response header.
198    #[inline]
199    pub fn header(&self) -> Option<&ResponseHeader> {
200        self.0.header.as_ref().map(From::from)
201    }
202
203    /// Takes the header out of the response, leaving a [`None`] in its place.
204    #[inline]
205    pub fn take_header(&mut self) -> Option<ResponseHeader> {
206        self.0.header.take().map(ResponseHeader::new)
207    }
208
209    /// Describes the resources used for holding leadership of the election.
210    #[inline]
211    pub fn leader(&self) -> Option<&LeaderKey> {
212        self.0.leader.as_ref().map(From::from)
213    }
214
215    /// Takes the leader out of the response, leaving a [`None`] in its place.
216    #[inline]
217    pub fn take_leader(&mut self) -> Option<LeaderKey> {
218        self.0.leader.take().map(From::from)
219    }
220}
221
222/// Response for `Proclaim` operation.
223#[cfg_attr(feature = "pub-response-field", visible::StructFields(pub))]
224#[derive(Debug, Clone)]
225#[repr(transparent)]
226pub struct ProclaimResponse(PbProclaimResponse);
227
228impl ProclaimResponse {
229    #[inline]
230    const fn new(resp: PbProclaimResponse) -> Self {
231        Self(resp)
232    }
233
234    /// Gets response header.
235    #[inline]
236    pub fn header(&self) -> Option<&ResponseHeader> {
237        self.0.header.as_ref().map(From::from)
238    }
239
240    /// Takes the header out of the response, leaving a [`None`] in its place.
241    #[inline]
242    pub fn take_header(&mut self) -> Option<ResponseHeader> {
243        self.0.header.take().map(ResponseHeader::new)
244    }
245}
246
247/// Response for `Leader` operation.
248#[cfg_attr(feature = "pub-response-field", visible::StructFields(pub))]
249#[derive(Debug, Clone)]
250#[repr(transparent)]
251pub struct LeaderResponse(PbLeaderResponse);
252
253impl LeaderResponse {
254    #[inline]
255    const fn new(resp: PbLeaderResponse) -> Self {
256        Self(resp)
257    }
258
259    /// Gets response header.
260    #[inline]
261    pub fn header(&self) -> Option<&ResponseHeader> {
262        self.0.header.as_ref().map(From::from)
263    }
264
265    /// Takes the header out of the response, leaving a [`None`] in its place.
266    #[inline]
267    pub fn take_header(&mut self) -> Option<ResponseHeader> {
268        self.0.header.take().map(ResponseHeader::new)
269    }
270
271    /// The key-value pair representing the latest leader update.
272    #[inline]
273    pub fn kv(&self) -> Option<&KeyValue> {
274        self.0.kv.as_ref().map(From::from)
275    }
276
277    /// Takes the kv out of the response, leaving a [`None`] in its place.
278    #[inline]
279    pub fn take_kv(&mut self) -> Option<KeyValue> {
280        self.0.kv.take().map(KeyValue::new)
281    }
282}
283
284/// Response for `Observe` operation.
285#[cfg_attr(feature = "pub-response-field", visible::StructFields(pub))]
286#[derive(Debug)]
287pub struct ObserveStream {
288    stream: Streaming<PbLeaderResponse>,
289}
290
291impl ObserveStream {
292    #[inline]
293    const fn new(stream: Streaming<PbLeaderResponse>) -> Self {
294        Self { stream }
295    }
296
297    /// Fetches the next message from this stream.
298    #[inline]
299    pub async fn message(&mut self) -> Result<Option<LeaderResponse>> {
300        match self.stream.message().await? {
301            Some(resp) => Ok(Some(LeaderResponse::new(resp))),
302            None => Ok(None),
303        }
304    }
305}
306
307impl Stream for ObserveStream {
308    type Item = Result<LeaderResponse>;
309
310    #[inline]
311    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
312        Pin::new(&mut self.get_mut().stream)
313            .poll_next(cx)
314            .map(|t| match t {
315                Some(Ok(resp)) => Some(Ok(LeaderResponse::new(resp))),
316                Some(Err(e)) => Some(Err(From::from(e))),
317                None => None,
318            })
319    }
320}
321
322/// Response for `Resign` operation.
323#[cfg_attr(feature = "pub-response-field", visible::StructFields(pub))]
324#[derive(Debug, Clone)]
325#[repr(transparent)]
326pub struct ResignResponse(PbResignResponse);
327
328impl ResignResponse {
329    #[inline]
330    const fn new(resp: PbResignResponse) -> Self {
331        Self(resp)
332    }
333
334    /// Gets response header.
335    #[inline]
336    pub fn header(&self) -> Option<&ResponseHeader> {
337        self.0.header.as_ref().map(From::from)
338    }
339
340    /// Takes the header out of the response, leaving a [`None`] in its place.
341    #[inline]
342    pub fn take_header(&mut self) -> Option<ResponseHeader> {
343        self.0.header.take().map(ResponseHeader::new)
344    }
345}
346
347/// Leader key of election
348#[derive(Debug, Clone)]
349#[repr(transparent)]
350pub struct LeaderKey(PbLeaderKey);
351
352impl LeaderKey {
353    /// Creates a new leader key.
354    #[inline]
355    pub const fn new() -> Self {
356        Self(PbLeaderKey {
357            name: Vec::new(),
358            key: Vec::new(),
359            rev: 0,
360            lease: 0,
361        })
362    }
363
364    /// The election identifier that corresponds to the leadership key.
365    #[inline]
366    pub fn with_name(mut self, name: impl Into<Vec<u8>>) -> Self {
367        self.0.name = name.into();
368        self
369    }
370
371    /// An opaque key representing the ownership of the election.
372    #[inline]
373    pub fn with_key(mut self, key: impl Into<Vec<u8>>) -> Self {
374        self.0.key = key.into();
375        self
376    }
377
378    /// The creation revision of the key
379    #[inline]
380    pub const fn with_rev(mut self, rev: i64) -> Self {
381        self.0.rev = rev;
382        self
383    }
384
385    /// The lease ID of the election leader.
386    #[inline]
387    pub const fn with_lease(mut self, lease: i64) -> Self {
388        self.0.lease = lease;
389        self
390    }
391
392    /// The name in byte. name is the election identifier that corresponds to the leadership key.
393    #[inline]
394    pub fn name(&self) -> &[u8] {
395        &self.0.name
396    }
397
398    /// The name in string. name is the election identifier that corresponds to the leadership key.
399    #[inline]
400    pub fn name_str(&self) -> Result<&str> {
401        std::str::from_utf8(self.name()).map_err(From::from)
402    }
403
404    /// The name in string. name is the election identifier that corresponds to the leadership key.
405    ///
406    /// # Safety
407    /// This function is unsafe because it does not check that the bytes of the key are valid UTF-8.
408    /// If this constraint is violated, undefined behavior results,
409    /// as the rest of Rust assumes that [`&str`]s are valid UTF-8.
410    #[inline]
411    pub unsafe fn name_str_unchecked(&self) -> &str {
412        std::str::from_utf8_unchecked(self.name())
413    }
414
415    /// The key in byte. key is an opaque key representing the ownership of the election. If the key
416    /// is deleted, then leadership is lost.
417    #[inline]
418    pub fn key(&self) -> &[u8] {
419        &self.0.key
420    }
421
422    /// The key in string. key is an opaque key representing the ownership of the election. If the key
423    /// is deleted, then leadership is lost.
424    #[inline]
425    pub fn key_str(&self) -> Result<&str> {
426        std::str::from_utf8(self.key()).map_err(From::from)
427    }
428
429    /// The key in string. key is an opaque key representing the ownership of the election. If the key
430    /// is deleted, then leadership is lost.
431    ///
432    /// # Safety
433    /// This function is unsafe because it does not check that the bytes of the key are valid UTF-8.
434    /// If this constraint is violated, undefined behavior results,
435    /// as the rest of Rust assumes that [`&str`]s are valid UTF-8.
436    #[inline]
437    pub unsafe fn key_str_unchecked(&self) -> &str {
438        std::str::from_utf8_unchecked(self.key())
439    }
440
441    /// The creation revision of the key.  It can be used to test for ownership
442    /// of an election during transactions by testing the key's creation revision
443    /// matches rev.
444    #[inline]
445    pub const fn rev(&self) -> i64 {
446        self.0.rev
447    }
448
449    /// The lease ID of the election leader.
450    #[inline]
451    pub const fn lease(&self) -> i64 {
452        self.0.lease
453    }
454}
455
456impl Default for LeaderKey {
457    #[inline]
458    fn default() -> Self {
459        Self::new()
460    }
461}
462
463impl From<LeaderKey> for PbLeaderKey {
464    #[inline]
465    fn from(leader_key: LeaderKey) -> Self {
466        leader_key.0
467    }
468}
469
470impl From<PbLeaderKey> for LeaderKey {
471    #[inline]
472    fn from(key: PbLeaderKey) -> Self {
473        Self(key)
474    }
475}
476
477impl From<&PbLeaderKey> for &LeaderKey {
478    #[inline]
479    fn from(src: &PbLeaderKey) -> Self {
480        unsafe { &*(src as *const _ as *const LeaderKey) }
481    }
482}
483
484impl ElectionClient {
485    /// Creates a election
486    #[inline]
487    pub(crate) fn new(channel: InterceptedChannel) -> Self {
488        let inner = PbElectionClient::new(channel);
489        Self { inner }
490    }
491
492    /// Puts a value as eligible for the election on the prefix key.
493    /// Multiple sessions can participate in the election for the
494    /// same prefix, but only one can be the leader at a time.
495    #[inline]
496    pub async fn campaign(
497        &mut self,
498        name: impl Into<Vec<u8>>,
499        value: impl Into<Vec<u8>>,
500        lease: i64,
501    ) -> Result<CampaignResponse> {
502        let resp = self
503            .inner
504            .campaign(
505                CampaignOptions::new()
506                    .with_name(name)
507                    .with_value(value)
508                    .with_lease(lease),
509            )
510            .await?
511            .into_inner();
512        Ok(CampaignResponse::new(resp))
513    }
514
515    /// Lets the leader announce a new value without another election.
516    #[inline]
517    pub async fn proclaim(
518        &mut self,
519        value: impl Into<Vec<u8>>,
520        options: Option<ProclaimOptions>,
521    ) -> Result<ProclaimResponse> {
522        let resp = self
523            .inner
524            .proclaim(options.unwrap_or_default().with_value(value))
525            .await?
526            .into_inner();
527        Ok(ProclaimResponse::new(resp))
528    }
529
530    /// Returns the leader value for the current election.
531    #[inline]
532    pub async fn leader(&mut self, name: impl Into<Vec<u8>>) -> Result<LeaderResponse> {
533        let resp = self
534            .inner
535            .leader(LeaderOptions::new().with_name(name))
536            .await?
537            .into_inner();
538        Ok(LeaderResponse::new(resp))
539    }
540
541    /// Returns a channel that reliably observes ordered leader proposals
542    /// as GetResponse values on every current elected leader key.
543    #[inline]
544    pub async fn observe(&mut self, name: impl Into<Vec<u8>>) -> Result<ObserveStream> {
545        let resp = self
546            .inner
547            .observe(LeaderOptions::new().with_name(name))
548            .await?
549            .into_inner();
550
551        Ok(ObserveStream::new(resp))
552    }
553
554    /// Releases election leadership and then start a new election
555    #[inline]
556    pub async fn resign(&mut self, option: Option<ResignOptions>) -> Result<ResignResponse> {
557        let resp = self
558            .inner
559            .resign(option.unwrap_or_default())
560            .await?
561            .into_inner();
562        Ok(ResignResponse::new(resp))
563    }
564}