etcd_client/rpc/
election.rs

1//! Etcd Election RPC.
2
3use crate::auth::AuthService;
4use crate::error::Result;
5use crate::intercept::InterceptedChannel;
6use crate::rpc::pb::v3electionpb::election_client::ElectionClient as PbElectionClient;
7use crate::rpc::pb::v3electionpb::{
8    CampaignRequest as PbCampaignRequest, CampaignResponse as PbCampaignResponse,
9    LeaderKey as PbLeaderKey, LeaderRequest as PbLeaderRequest, LeaderResponse as PbLeaderResponse,
10    ProclaimRequest as PbProclaimRequest, ProclaimResponse as PbProclaimResponse,
11    ResignRequest as PbResignRequest, ResignResponse as PbResignResponse,
12};
13use crate::rpc::{KeyValue, ResponseHeader};
14use http::HeaderValue;
15use std::sync::RwLock;
16use std::task::{Context, Poll};
17use std::{pin::Pin, sync::Arc};
18use tokio_stream::Stream;
19use tonic::{IntoRequest, Request, Streaming};
20
/// Client for etcd election operations.
///
/// Thin wrapper around the generated gRPC election client, which is
/// instantiated over an `AuthService` wrapping an `InterceptedChannel`.
#[repr(transparent)]
#[derive(Clone)]
pub struct ElectionClient {
    // Generated protobuf client that every election call is forwarded to.
    inner: PbElectionClient<AuthService<InterceptedChannel>>,
}
27
/// Options for `campaign` operation.
///
/// Newtype over the protobuf campaign request, which carries the election
/// name, the lease ID and the value to proclaim.
#[derive(Debug, Default, Clone)]
#[repr(transparent)]
pub struct CampaignOptions(PbCampaignRequest);
32
33impl CampaignOptions {
34    #[inline]
35    pub const fn new() -> Self {
36        Self(PbCampaignRequest {
37            name: Vec::new(),
38            lease: 0,
39            value: Vec::new(),
40        })
41    }
42
43    /// Name is the election's identifier for the campaign.
44    #[inline]
45    fn with_name(mut self, name: impl Into<Vec<u8>>) -> Self {
46        self.0.name = name.into();
47        self
48    }
49
50    /// Lease is the ID of the lease attached to leadership of the election
51    #[inline]
52    const fn with_lease(mut self, lease: i64) -> Self {
53        self.0.lease = lease;
54        self
55    }
56
57    /// Value is the initial proclaimed value set when the campaigner wins the election.
58    #[inline]
59    fn with_value(mut self, value: impl Into<Vec<u8>>) -> Self {
60        self.0.value = value.into();
61        self
62    }
63}
64
65impl From<CampaignOptions> for PbCampaignRequest {
66    #[inline]
67    fn from(options: CampaignOptions) -> Self {
68        options.0
69    }
70}
71
72impl IntoRequest<PbCampaignRequest> for CampaignOptions {
73    #[inline]
74    fn into_request(self) -> Request<PbCampaignRequest> {
75        Request::new(self.into())
76    }
77}
78
/// Options for `proclaim` operation.
///
/// Newtype over the protobuf proclaim request, which carries an optional
/// leader key and the value to proclaim.
#[derive(Debug, Default, Clone)]
#[repr(transparent)]
pub struct ProclaimOptions(PbProclaimRequest);
83
84impl ProclaimOptions {
85    #[inline]
86    pub const fn new() -> Self {
87        Self(PbProclaimRequest {
88            leader: None,
89            value: Vec::new(),
90        })
91    }
92
93    /// The initial proclaimed value set when the campaigner wins the election.
94    #[inline]
95    fn with_value(mut self, value: impl Into<Vec<u8>>) -> Self {
96        self.0.value = value.into();
97        self
98    }
99
100    /// The leadership hold on the election.
101    #[inline]
102    pub fn with_leader(mut self, leader: LeaderKey) -> Self {
103        self.0.leader = Some(leader.into());
104        self
105    }
106}
107
108impl From<ProclaimOptions> for PbProclaimRequest {
109    #[inline]
110    fn from(options: ProclaimOptions) -> Self {
111        options.0
112    }
113}
114
115impl IntoRequest<PbProclaimRequest> for ProclaimOptions {
116    #[inline]
117    fn into_request(self) -> Request<PbProclaimRequest> {
118        Request::new(self.into())
119    }
120}
121
/// Options for `leader` operation.
///
/// Newtype over the protobuf leader request, which carries the election name.
#[derive(Debug, Default, Clone)]
#[repr(transparent)]
pub struct LeaderOptions(PbLeaderRequest);
126
127impl LeaderOptions {
128    #[inline]
129    pub const fn new() -> Self {
130        Self(PbLeaderRequest { name: Vec::new() })
131    }
132
133    /// Name is the election identifier for the leadership information.
134    #[inline]
135    pub fn with_name(mut self, name: impl Into<Vec<u8>>) -> Self {
136        self.0.name = name.into();
137        self
138    }
139}
140
141impl From<LeaderOptions> for PbLeaderRequest {
142    #[inline]
143    fn from(options: LeaderOptions) -> Self {
144        options.0
145    }
146}
147
148impl IntoRequest<PbLeaderRequest> for LeaderOptions {
149    #[inline]
150    fn into_request(self) -> Request<PbLeaderRequest> {
151        Request::new(self.into())
152    }
153}
154
/// Options for `resign` operation.
///
/// Newtype over the protobuf resign request, which carries an optional
/// leader key.
#[derive(Debug, Default, Clone)]
#[repr(transparent)]
pub struct ResignOptions(PbResignRequest);
159
160impl ResignOptions {
161    #[inline]
162    pub const fn new() -> Self {
163        Self(PbResignRequest { leader: None })
164    }
165
166    /// The leadership to relinquish by resignation.
167    #[inline]
168    pub fn with_leader(mut self, leader: LeaderKey) -> Self {
169        self.0.leader = Some(leader.into());
170        self
171    }
172}
173
174impl From<ResignOptions> for PbResignRequest {
175    #[inline]
176    fn from(options: ResignOptions) -> Self {
177        options.0
178    }
179}
180
181impl IntoRequest<PbResignRequest> for ResignOptions {
182    #[inline]
183    fn into_request(self) -> Request<PbResignRequest> {
184        Request::new(self.into())
185    }
186}
187
/// Response for `Campaign` operation.
///
/// Newtype over the protobuf campaign response; the header and the leader key
/// it carries are exposed through accessor methods.
// NOTE(review): with the `pub-response-field` feature the `visible::StructFields(pub)`
// attribute is applied; presumably it makes the inner field public — confirm.
#[cfg_attr(feature = "pub-response-field", visible::StructFields(pub))]
#[derive(Debug, Clone)]
#[repr(transparent)]
pub struct CampaignResponse(PbCampaignResponse);
193
194impl CampaignResponse {
195    #[inline]
196    const fn new(resp: PbCampaignResponse) -> Self {
197        Self(resp)
198    }
199
200    /// Get response header.
201    #[inline]
202    pub fn header(&self) -> Option<&ResponseHeader> {
203        self.0.header.as_ref().map(From::from)
204    }
205
206    /// Takes the header out of the response, leaving a [`None`] in its place.
207    #[inline]
208    pub fn take_header(&mut self) -> Option<ResponseHeader> {
209        self.0.header.take().map(ResponseHeader::new)
210    }
211
212    /// Describes the resources used for holding leadership of the election.
213    #[inline]
214    pub fn leader(&self) -> Option<&LeaderKey> {
215        self.0.leader.as_ref().map(From::from)
216    }
217
218    /// Takes the leader out of the response, leaving a [`None`] in its place.
219    #[inline]
220    pub fn take_leader(&mut self) -> Option<LeaderKey> {
221        self.0.leader.take().map(From::from)
222    }
223}
224
/// Response for `Proclaim` operation.
///
/// Newtype over the protobuf proclaim response; the header it carries is
/// exposed through accessor methods.
// NOTE(review): with the `pub-response-field` feature the `visible::StructFields(pub)`
// attribute is applied; presumably it makes the inner field public — confirm.
#[cfg_attr(feature = "pub-response-field", visible::StructFields(pub))]
#[derive(Debug, Clone)]
#[repr(transparent)]
pub struct ProclaimResponse(PbProclaimResponse);
230
231impl ProclaimResponse {
232    #[inline]
233    const fn new(resp: PbProclaimResponse) -> Self {
234        Self(resp)
235    }
236
237    /// Gets response header.
238    #[inline]
239    pub fn header(&self) -> Option<&ResponseHeader> {
240        self.0.header.as_ref().map(From::from)
241    }
242
243    /// Takes the header out of the response, leaving a [`None`] in its place.
244    #[inline]
245    pub fn take_header(&mut self) -> Option<ResponseHeader> {
246        self.0.header.take().map(ResponseHeader::new)
247    }
248}
249
/// Response for `Leader` operation.
///
/// Newtype over the protobuf leader response; the header and the key-value
/// pair it carries are exposed through accessor methods.
// NOTE(review): with the `pub-response-field` feature the `visible::StructFields(pub)`
// attribute is applied; presumably it makes the inner field public — confirm.
#[cfg_attr(feature = "pub-response-field", visible::StructFields(pub))]
#[derive(Debug, Clone)]
#[repr(transparent)]
pub struct LeaderResponse(PbLeaderResponse);
255
256impl LeaderResponse {
257    #[inline]
258    const fn new(resp: PbLeaderResponse) -> Self {
259        Self(resp)
260    }
261
262    /// Gets response header.
263    #[inline]
264    pub fn header(&self) -> Option<&ResponseHeader> {
265        self.0.header.as_ref().map(From::from)
266    }
267
268    /// Takes the header out of the response, leaving a [`None`] in its place.
269    #[inline]
270    pub fn take_header(&mut self) -> Option<ResponseHeader> {
271        self.0.header.take().map(ResponseHeader::new)
272    }
273
274    /// The key-value pair representing the latest leader update.
275    #[inline]
276    pub fn kv(&self) -> Option<&KeyValue> {
277        self.0.kv.as_ref().map(From::from)
278    }
279
280    /// Takes the kv out of the response, leaving a [`None`] in its place.
281    #[inline]
282    pub fn take_kv(&mut self) -> Option<KeyValue> {
283        self.0.kv.take().map(KeyValue::new)
284    }
285}
286
/// Response for `Observe` operation.
///
/// Wraps the tonic stream of protobuf leader responses; items are surfaced
/// as `LeaderResponse` values.
// NOTE(review): with the `pub-response-field` feature the `visible::StructFields(pub)`
// attribute is applied; presumably it makes `stream` public — confirm.
#[cfg_attr(feature = "pub-response-field", visible::StructFields(pub))]
#[derive(Debug)]
pub struct ObserveStream {
    // Underlying gRPC response stream.
    stream: Streaming<PbLeaderResponse>,
}
293
294impl ObserveStream {
295    #[inline]
296    const fn new(stream: Streaming<PbLeaderResponse>) -> Self {
297        Self { stream }
298    }
299
300    /// Fetches the next message from this stream.
301    #[inline]
302    pub async fn message(&mut self) -> Result<Option<LeaderResponse>> {
303        match self.stream.message().await? {
304            Some(resp) => Ok(Some(LeaderResponse::new(resp))),
305            None => Ok(None),
306        }
307    }
308}
309
310impl Stream for ObserveStream {
311    type Item = Result<LeaderResponse>;
312
313    #[inline]
314    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
315        Pin::new(&mut self.get_mut().stream)
316            .poll_next(cx)
317            .map(|t| match t {
318                Some(Ok(resp)) => Some(Ok(LeaderResponse::new(resp))),
319                Some(Err(e)) => Some(Err(From::from(e))),
320                None => None,
321            })
322    }
323}
324
/// Response for `Resign` operation.
///
/// Newtype over the protobuf resign response; the header it carries is
/// exposed through accessor methods.
// NOTE(review): with the `pub-response-field` feature the `visible::StructFields(pub)`
// attribute is applied; presumably it makes the inner field public — confirm.
#[cfg_attr(feature = "pub-response-field", visible::StructFields(pub))]
#[derive(Debug, Clone)]
#[repr(transparent)]
pub struct ResignResponse(PbResignResponse);
330
331impl ResignResponse {
332    #[inline]
333    const fn new(resp: PbResignResponse) -> Self {
334        Self(resp)
335    }
336
337    /// Gets response header.
338    #[inline]
339    pub fn header(&self) -> Option<&ResponseHeader> {
340        self.0.header.as_ref().map(From::from)
341    }
342
343    /// Takes the header out of the response, leaving a [`None`] in its place.
344    #[inline]
345    pub fn take_header(&mut self) -> Option<ResponseHeader> {
346        self.0.header.take().map(ResponseHeader::new)
347    }
348}
349
/// Leader key of an election.
///
/// Newtype over the protobuf leader key, which holds the election name, the
/// ownership key, the key's creation revision and the lease ID.
// `repr(transparent)` must stay: the `From<&PbLeaderKey> for &LeaderKey` impl in this
// file casts a pointer to the protobuf type into a pointer to this wrapper.
#[derive(Debug, Clone)]
#[repr(transparent)]
pub struct LeaderKey(PbLeaderKey);
354
355impl LeaderKey {
356    /// Creates a new leader key.
357    #[inline]
358    pub const fn new() -> Self {
359        Self(PbLeaderKey {
360            name: Vec::new(),
361            key: Vec::new(),
362            rev: 0,
363            lease: 0,
364        })
365    }
366
367    /// The election identifier that corresponds to the leadership key.
368    #[inline]
369    pub fn with_name(mut self, name: impl Into<Vec<u8>>) -> Self {
370        self.0.name = name.into();
371        self
372    }
373
374    /// An opaque key representing the ownership of the election.
375    #[inline]
376    pub fn with_key(mut self, key: impl Into<Vec<u8>>) -> Self {
377        self.0.key = key.into();
378        self
379    }
380
381    /// The creation revision of the key
382    #[inline]
383    pub const fn with_rev(mut self, rev: i64) -> Self {
384        self.0.rev = rev;
385        self
386    }
387
388    /// The lease ID of the election leader.
389    #[inline]
390    pub const fn with_lease(mut self, lease: i64) -> Self {
391        self.0.lease = lease;
392        self
393    }
394
395    /// The name in byte. name is the election identifier that corresponds to the leadership key.
396    #[inline]
397    pub fn name(&self) -> &[u8] {
398        &self.0.name
399    }
400
401    /// The name in string. name is the election identifier that corresponds to the leadership key.
402    #[inline]
403    pub fn name_str(&self) -> Result<&str> {
404        std::str::from_utf8(self.name()).map_err(From::from)
405    }
406
407    /// The name in string. name is the election identifier that corresponds to the leadership key.
408    ///
409    /// # Safety
410    /// This function is unsafe because it does not check that the bytes of the key are valid UTF-8.
411    /// If this constraint is violated, undefined behavior results,
412    /// as the rest of Rust assumes that [`&str`]s are valid UTF-8.
413    #[inline]
414    pub unsafe fn name_str_unchecked(&self) -> &str {
415        std::str::from_utf8_unchecked(self.name())
416    }
417
418    /// The key in byte. key is an opaque key representing the ownership of the election. If the key
419    /// is deleted, then leadership is lost.
420    #[inline]
421    pub fn key(&self) -> &[u8] {
422        &self.0.key
423    }
424
425    /// The key in string. key is an opaque key representing the ownership of the election. If the key
426    /// is deleted, then leadership is lost.
427    #[inline]
428    pub fn key_str(&self) -> Result<&str> {
429        std::str::from_utf8(self.key()).map_err(From::from)
430    }
431
432    /// The key in string. key is an opaque key representing the ownership of the election. If the key
433    /// is deleted, then leadership is lost.
434    ///
435    /// # Safety
436    /// This function is unsafe because it does not check that the bytes of the key are valid UTF-8.
437    /// If this constraint is violated, undefined behavior results,
438    /// as the rest of Rust assumes that [`&str`]s are valid UTF-8.
439    #[inline]
440    pub unsafe fn key_str_unchecked(&self) -> &str {
441        std::str::from_utf8_unchecked(self.key())
442    }
443
444    /// The creation revision of the key.  It can be used to test for ownership
445    /// of an election during transactions by testing the key's creation revision
446    /// matches rev.
447    #[inline]
448    pub const fn rev(&self) -> i64 {
449        self.0.rev
450    }
451
452    /// The lease ID of the election leader.
453    #[inline]
454    pub const fn lease(&self) -> i64 {
455        self.0.lease
456    }
457}
458
459impl Default for LeaderKey {
460    #[inline]
461    fn default() -> Self {
462        Self::new()
463    }
464}
465
466impl From<LeaderKey> for PbLeaderKey {
467    #[inline]
468    fn from(leader_key: LeaderKey) -> Self {
469        leader_key.0
470    }
471}
472
473impl From<PbLeaderKey> for LeaderKey {
474    #[inline]
475    fn from(key: PbLeaderKey) -> Self {
476        Self(key)
477    }
478}
479
impl From<&PbLeaderKey> for &LeaderKey {
    /// Reinterprets a borrowed protobuf leader key as a borrowed [`LeaderKey`]
    /// without copying.
    #[inline]
    fn from(src: &PbLeaderKey) -> Self {
        // SAFETY: `LeaderKey` is declared `#[repr(transparent)]` over a single
        // `PbLeaderKey` field, so both types share the same layout, and the
        // returned reference keeps the lifetime of `src`.
        unsafe { &*(src as *const _ as *const LeaderKey) }
    }
}
486
487impl ElectionClient {
488    /// Creates a election
489    #[inline]
490    pub(crate) fn new(
491        channel: InterceptedChannel,
492        auth_token: Arc<RwLock<Option<HeaderValue>>>,
493    ) -> Self {
494        let inner = PbElectionClient::new(AuthService::new(channel, auth_token));
495        Self { inner }
496    }
497
498    /// Puts a value as eligible for the election on the prefix key.
499    /// Multiple sessions can participate in the election for the
500    /// same prefix, but only one can be the leader at a time.
501    #[inline]
502    pub async fn campaign(
503        &mut self,
504        name: impl Into<Vec<u8>>,
505        value: impl Into<Vec<u8>>,
506        lease: i64,
507    ) -> Result<CampaignResponse> {
508        let resp = self
509            .inner
510            .campaign(
511                CampaignOptions::new()
512                    .with_name(name)
513                    .with_value(value)
514                    .with_lease(lease),
515            )
516            .await?
517            .into_inner();
518        Ok(CampaignResponse::new(resp))
519    }
520
521    /// Lets the leader announce a new value without another election.
522    #[inline]
523    pub async fn proclaim(
524        &mut self,
525        value: impl Into<Vec<u8>>,
526        options: Option<ProclaimOptions>,
527    ) -> Result<ProclaimResponse> {
528        let resp = self
529            .inner
530            .proclaim(options.unwrap_or_default().with_value(value))
531            .await?
532            .into_inner();
533        Ok(ProclaimResponse::new(resp))
534    }
535
536    /// Returns the leader value for the current election.
537    #[inline]
538    pub async fn leader(&mut self, name: impl Into<Vec<u8>>) -> Result<LeaderResponse> {
539        let resp = self
540            .inner
541            .leader(LeaderOptions::new().with_name(name))
542            .await?
543            .into_inner();
544        Ok(LeaderResponse::new(resp))
545    }
546
547    /// Returns a channel that reliably observes ordered leader proposals
548    /// as GetResponse values on every current elected leader key.
549    #[inline]
550    pub async fn observe(&mut self, name: impl Into<Vec<u8>>) -> Result<ObserveStream> {
551        let resp = self
552            .inner
553            .observe(LeaderOptions::new().with_name(name))
554            .await?
555            .into_inner();
556
557        Ok(ObserveStream::new(resp))
558    }
559
560    /// Releases election leadership and then start a new election
561    #[inline]
562    pub async fn resign(&mut self, option: Option<ResignOptions>) -> Result<ResignResponse> {
563        let resp = self
564            .inner
565            .resign(option.unwrap_or_default())
566            .await?
567            .into_inner();
568        Ok(ResignResponse::new(resp))
569    }
570}