etcd_client/rpc/
kv.rs

1//! Etcd KV Operations.
2
3pub use crate::rpc::pb::etcdserverpb::compare::CompareResult as CompareOp;
4pub use crate::rpc::pb::etcdserverpb::range_request::{SortOrder, SortTarget};
5
6use crate::error::Result;
7use crate::intercept::InterceptedChannel;
8use crate::rpc::pb::etcdserverpb::compare::{CompareTarget, TargetUnion};
9use crate::rpc::pb::etcdserverpb::kv_client::KvClient as PbKvClient;
10use crate::rpc::pb::etcdserverpb::request_op::Request as PbTxnOp;
11use crate::rpc::pb::etcdserverpb::response_op::Response as PbTxnOpResponse;
12use crate::rpc::pb::etcdserverpb::{
13    CompactionRequest as PbCompactionRequest, CompactionRequest,
14    CompactionResponse as PbCompactionResponse, Compare as PbCompare,
15    DeleteRangeRequest as PbDeleteRequest, DeleteRangeRequest,
16    DeleteRangeResponse as PbDeleteResponse, PutRequest as PbPutRequest,
17    PutResponse as PbPutResponse, RangeRequest as PbRangeRequest, RangeResponse as PbRangeResponse,
18    RequestOp as PbTxnRequestOp, TxnRequest as PbTxnRequest, TxnResponse as PbTxnResponse,
19};
20use crate::rpc::{get_prefix, KeyRange, KeyValue, ResponseHeader};
21use crate::vec::VecExt;
22use std::mem::ManuallyDrop;
23use tonic::{IntoRequest, Request};
24
25/// Client for KV operations.
26#[repr(transparent)]
27#[derive(Clone)]
28pub struct KvClient {
29    inner: PbKvClient<InterceptedChannel>,
30}
31
32impl KvClient {
33    /// Creates a kv client.
34    #[inline]
35    pub(crate) fn new(channel: InterceptedChannel) -> Self {
36        let inner = PbKvClient::new(channel);
37        Self { inner }
38    }
39
40    /// Limits the maximum size of a decoded message.
41    ///
42    /// Default: `4MB`
43    pub fn max_decoding_message_size(mut self, limit: usize) -> Self {
44        self.inner = self.inner.max_decoding_message_size(limit);
45        self
46    }
47
48    /// Limits the maximum size of an encoded message.
49    ///
50    /// Default: `usize::MAX`
51    pub fn max_encoding_message_size(mut self, limit: usize) -> Self {
52        self.inner = self.inner.max_encoding_message_size(limit);
53        self
54    }
55
56    /// Puts the given key into the key-value store.
57    /// A put request increments the revision of the key-value store
58    /// and generates one event in the event history.
59    #[inline]
60    pub async fn put(
61        &mut self,
62        key: impl Into<Vec<u8>>,
63        value: impl Into<Vec<u8>>,
64        options: Option<PutOptions>,
65    ) -> Result<PutResponse> {
66        let resp = self
67            .inner
68            .put(options.unwrap_or_default().with_kv(key, value))
69            .await?
70            .into_inner();
71        Ok(PutResponse::new(resp))
72    }
73
74    /// Gets the key or a range of keys from the store.
75    #[inline]
76    pub async fn get(
77        &mut self,
78        key: impl Into<Vec<u8>>,
79        options: Option<GetOptions>,
80    ) -> Result<GetResponse> {
81        let resp = self
82            .inner
83            .range(options.unwrap_or_default().with_key(key.into()))
84            .await?
85            .into_inner();
86        Ok(GetResponse::new(resp))
87    }
88
89    /// Deletes the given key or a range of keys from the key-value store.
90    #[inline]
91    pub async fn delete(
92        &mut self,
93        key: impl Into<Vec<u8>>,
94        options: Option<DeleteOptions>,
95    ) -> Result<DeleteResponse> {
96        let resp = self
97            .inner
98            .delete_range(options.unwrap_or_default().with_key(key.into()))
99            .await?
100            .into_inner();
101        Ok(DeleteResponse::new(resp))
102    }
103
104    /// Compacts the event history in the etcd key-value store. The key-value
105    /// store should be periodically compacted or the event history will continue to grow
106    /// indefinitely.
107    #[inline]
108    pub async fn compact(
109        &mut self,
110        revision: i64,
111        options: Option<CompactionOptions>,
112    ) -> Result<CompactionResponse> {
113        let resp = self
114            .inner
115            .compact(options.unwrap_or_default().with_revision(revision))
116            .await?
117            .into_inner();
118        Ok(CompactionResponse::new(resp))
119    }
120
121    /// Processes multiple operations in a single transaction.
122    /// A txn request increments the revision of the key-value store
123    /// and generates events with the same revision for every completed operation.
124    /// It is not allowed to modify the same key several times within one txn.
125    #[inline]
126    pub async fn txn(&mut self, txn: Txn) -> Result<TxnResponse> {
127        let resp = self.inner.txn(txn).await?.into_inner();
128        Ok(TxnResponse::new(resp))
129    }
130}
131
132/// Options for `Put` operation.
133#[derive(Debug, Default, Clone)]
134#[repr(transparent)]
135pub struct PutOptions(PbPutRequest);
136
137impl PutOptions {
138    /// Set key-value pair.
139    #[inline]
140    fn with_kv(mut self, key: impl Into<Vec<u8>>, value: impl Into<Vec<u8>>) -> Self {
141        self.0.key = key.into();
142        self.0.value = value.into();
143        self
144    }
145
146    /// Creates a `PutOptions`.
147    #[inline]
148    pub const fn new() -> Self {
149        Self(PbPutRequest {
150            key: Vec::new(),
151            value: Vec::new(),
152            lease: 0,
153            prev_kv: false,
154            ignore_value: false,
155            ignore_lease: false,
156        })
157    }
158
159    /// Lease is the lease ID to associate with the key in the key-value store. A lease
160    /// value of 0 indicates no lease.
161    #[inline]
162    pub const fn with_lease(mut self, lease: i64) -> Self {
163        self.0.lease = lease;
164        self
165    }
166
167    /// If prev_kv is set, etcd gets the previous key-value pair before changing it.
168    /// The previous key-value pair will be returned in the put response.
169    #[inline]
170    pub const fn with_prev_key(mut self) -> Self {
171        self.0.prev_kv = true;
172        self
173    }
174
175    /// If ignore_value is set, etcd updates the key using its current value.
176    /// Returns an error if the key does not exist.
177    #[inline]
178    pub const fn with_ignore_value(mut self) -> Self {
179        self.0.ignore_value = true;
180        self
181    }
182
183    /// If ignore_lease is set, etcd updates the key using its current lease.
184    /// Returns an error if the key does not exist.
185    #[inline]
186    pub const fn with_ignore_lease(mut self) -> Self {
187        self.0.ignore_lease = true;
188        self
189    }
190}
191
192impl From<PutOptions> for PbPutRequest {
193    #[inline]
194    fn from(options: PutOptions) -> Self {
195        options.0
196    }
197}
198
199impl IntoRequest<PbPutRequest> for PutOptions {
200    #[inline]
201    fn into_request(self) -> Request<PbPutRequest> {
202        Request::new(self.into())
203    }
204}
205
206/// Response for `Put` operation.
207#[cfg_attr(feature = "pub-response-field", visible::StructFields(pub))]
208#[derive(Debug, Clone)]
209#[repr(transparent)]
210pub struct PutResponse(PbPutResponse);
211
212impl PutResponse {
213    /// Create a new `PutResponse` from pb put response.
214    #[inline]
215    const fn new(resp: PbPutResponse) -> Self {
216        Self(resp)
217    }
218
219    /// Get response header.
220    #[inline]
221    pub fn header(&self) -> Option<&ResponseHeader> {
222        self.0.header.as_ref().map(From::from)
223    }
224
225    /// Takes the header out of the response, leaving a [`None`] in its place.
226    #[inline]
227    pub fn take_header(&mut self) -> Option<ResponseHeader> {
228        self.0.header.take().map(ResponseHeader::new)
229    }
230
231    /// If prev_kv is set in the request, the previous key-value pair will be returned.
232    #[inline]
233    pub fn prev_key(&self) -> Option<&KeyValue> {
234        self.0.prev_kv.as_ref().map(From::from)
235    }
236
237    /// Takes the prev_key out of the response, leaving a [`None`] in its place.
238    #[inline]
239    pub fn take_prev_key(&mut self) -> Option<KeyValue> {
240        self.0.prev_kv.take().map(KeyValue::new)
241    }
242
243    #[inline]
244    pub(crate) fn strip_prev_key_prefix(&mut self, prefix: &[u8]) {
245        if let Some(kv) = self.0.prev_kv.as_mut() {
246            kv.key.strip_key_prefix(prefix);
247        }
248    }
249}
250
251/// Options for `Get` operation.
252#[derive(Debug, Default, Clone)]
253pub struct GetOptions {
254    req: PbRangeRequest,
255    key_range: KeyRange,
256}
257
258impl GetOptions {
259    /// Sets key.
260    #[inline]
261    fn with_key(mut self, key: impl Into<Vec<u8>>) -> Self {
262        self.key_range.with_key(key);
263        self
264    }
265
266    /// Creates a `GetOptions`.
267    #[inline]
268    pub const fn new() -> Self {
269        Self {
270            req: PbRangeRequest {
271                key: Vec::new(),
272                range_end: Vec::new(),
273                limit: 0,
274                revision: 0,
275                sort_order: 0,
276                sort_target: 0,
277                serializable: false,
278                keys_only: false,
279                count_only: false,
280                min_mod_revision: 0,
281                max_mod_revision: 0,
282                min_create_revision: 0,
283                max_create_revision: 0,
284            },
285            key_range: KeyRange::new(),
286        }
287    }
288
289    /// Specifies the range of 'Get'.
290    /// Returns the keys in the range [key, end_key).
291    /// `end_key` must be lexicographically greater than start key.
292    #[inline]
293    pub fn with_range(mut self, end_key: impl Into<Vec<u8>>) -> Self {
294        self.key_range.with_range(end_key);
295        self
296    }
297
298    /// Gets all keys >= key.
299    #[inline]
300    pub fn with_from_key(mut self) -> Self {
301        self.key_range.with_from_key();
302        self
303    }
304
305    /// Gets all keys prefixed with key.
306    #[inline]
307    pub fn with_prefix(mut self) -> Self {
308        self.key_range.with_prefix();
309        self
310    }
311
312    /// Gets all keys.
313    #[inline]
314    pub fn with_all_keys(mut self) -> Self {
315        self.key_range.with_all_keys();
316        self
317    }
318
319    /// Limits the number of keys returned for the request. When limit is set to 0,
320    /// it is treated as no limit.
321    #[inline]
322    pub const fn with_limit(mut self, limit: i64) -> Self {
323        self.req.limit = limit;
324        self
325    }
326
327    /// The point-in-time of the key-value store to use for the range.
328    /// If revision is less or equal to zero, the range is over the newest key-value store.
329    /// If the revision has been compacted, ErrCompacted is returned as a response.
330    #[inline]
331    pub const fn with_revision(mut self, revision: i64) -> Self {
332        self.req.revision = revision;
333        self
334    }
335
336    /// Sets the order for returned sorted results.
337    /// It requires 'with_range' and/or 'with_prefix' to be specified too.
338    #[inline]
339    pub fn with_sort(mut self, target: SortTarget, order: SortOrder) -> Self {
340        if target == SortTarget::Key && order == SortOrder::Ascend {
341            // If order != SortOrder::None, server fetches the entire key-space,
342            // and then applies the sort and limit, if provided.
343            // Since by default the server returns results sorted by keys
344            // in lexicographically ascending order, the client should ignore
345            // SortOrder if the target is SortTarget::Key.
346            self.req.sort_order = SortOrder::None as i32;
347        } else {
348            self.req.sort_order = order as i32;
349        }
350        self.req.sort_target = target as i32;
351        self
352    }
353
354    /// Sets the get request to use serializable member-local reads.
355    /// Get requests are linearizable by default; linearizable requests have higher
356    /// latency and lower throughput than serializable requests but reflect the current
357    /// consensus of the cluster. For better performance, in exchange for possible stale reads,
358    /// a serializable get request is served locally without needing to reach consensus
359    /// with other nodes in the cluster.
360    #[inline]
361    pub const fn with_serializable(mut self) -> Self {
362        self.req.serializable = true;
363        self
364    }
365
366    /// Returns only the keys and not the values.
367    #[inline]
368    pub const fn with_keys_only(mut self) -> Self {
369        self.req.keys_only = true;
370        self
371    }
372
373    /// Returns only the count of the keys in the range.
374    #[inline]
375    pub const fn with_count_only(mut self) -> Self {
376        self.req.count_only = true;
377        self
378    }
379
380    /// Sets the lower bound for returned key mod revisions; all keys with
381    /// lesser mod revisions will be filtered away.
382    #[inline]
383    pub const fn with_min_mod_revision(mut self, revision: i64) -> Self {
384        self.req.min_mod_revision = revision;
385        self
386    }
387
388    /// Sets the upper bound for returned key mod revisions; all keys with
389    /// greater mod revisions will be filtered away.
390    #[inline]
391    pub const fn with_max_mod_revision(mut self, revision: i64) -> Self {
392        self.req.max_mod_revision = revision;
393        self
394    }
395
396    /// Sets the lower bound for returned key create revisions; all keys with
397    /// lesser create revisions will be filtered away.
398    #[inline]
399    pub const fn with_min_create_revision(mut self, revision: i64) -> Self {
400        self.req.min_create_revision = revision;
401        self
402    }
403
404    /// `max_create_revision` is the upper bound for returned key create revisions; all keys with
405    /// greater create revisions will be filtered away.
406    #[inline]
407    pub const fn with_max_create_revision(mut self, revision: i64) -> Self {
408        self.req.max_create_revision = revision;
409        self
410    }
411
412    #[inline]
413    pub(crate) fn key_range_end_mut(&mut self) -> &mut Vec<u8> {
414        &mut self.key_range.range_end
415    }
416}
417
418impl From<GetOptions> for PbRangeRequest {
419    #[inline]
420    fn from(mut options: GetOptions) -> Self {
421        let (key, rang_end) = options.key_range.build();
422        options.req.key = key;
423        options.req.range_end = rang_end;
424        options.req
425    }
426}
427
428impl IntoRequest<PbRangeRequest> for GetOptions {
429    #[inline]
430    fn into_request(self) -> Request<PbRangeRequest> {
431        Request::new(self.into())
432    }
433}
434
435/// Response for `Get` operation.
436#[cfg_attr(feature = "pub-response-field", visible::StructFields(pub))]
437#[derive(Debug, Clone)]
438#[repr(transparent)]
439pub struct GetResponse(PbRangeResponse);
440
441impl GetResponse {
442    /// Create a new `GetResponse` from pb get response.
443    #[inline]
444    const fn new(resp: PbRangeResponse) -> Self {
445        Self(resp)
446    }
447
448    /// Get response header.
449    #[inline]
450    pub fn header(&self) -> Option<&ResponseHeader> {
451        self.0.header.as_ref().map(From::from)
452    }
453
454    /// Takes the header out of the response, leaving a [`None`] in its place.
455    #[inline]
456    pub fn take_header(&mut self) -> Option<ResponseHeader> {
457        self.0.header.take().map(ResponseHeader::new)
458    }
459
460    /// The list of key-value pairs matched by the `Get` request.
461    /// kvs is empty when count is requested.
462    #[inline]
463    pub fn kvs(&self) -> &[KeyValue] {
464        unsafe { &*(self.0.kvs.as_slice() as *const _ as *const [KeyValue]) }
465    }
466
467    /// If `kvs` is set in the request, take the key-value pairs, leaving an empty vector in its place.
468    #[inline]
469    pub fn take_kvs(&mut self) -> Vec<KeyValue> {
470        let kvs = ManuallyDrop::new(std::mem::take(&mut self.0.kvs));
471        unsafe { Vec::from_raw_parts(kvs.as_ptr() as *mut KeyValue, kvs.len(), kvs.capacity()) }
472    }
473
474    #[inline]
475    pub(crate) fn strip_kvs_prefix(&mut self, prefix: &[u8]) {
476        for kv in self.0.kvs.iter_mut() {
477            kv.key.strip_key_prefix(prefix);
478        }
479    }
480
481    /// Indicates if there are more keys to return in the requested range.
482    #[inline]
483    pub const fn more(&self) -> bool {
484        self.0.more
485    }
486
487    /// The number of keys within the range when requested.
488    #[inline]
489    pub const fn count(&self) -> i64 {
490        self.0.count
491    }
492}
493
494/// Options for `Delete` operation.
495#[derive(Debug, Default, Clone)]
496pub struct DeleteOptions {
497    req: PbDeleteRequest,
498    key_range: KeyRange,
499}
500
501impl DeleteOptions {
502    /// Sets key.
503    #[inline]
504    fn with_key(mut self, key: impl Into<Vec<u8>>) -> Self {
505        self.key_range.with_key(key);
506        self
507    }
508
509    /// Creates a `DeleteOptions`.
510    #[inline]
511    pub const fn new() -> Self {
512        Self {
513            req: PbDeleteRequest {
514                key: Vec::new(),
515                range_end: Vec::new(),
516                prev_kv: false,
517            },
518            key_range: KeyRange::new(),
519        }
520    }
521
522    /// `end_key` is the key following the last key to delete for the range [key, end_key).
523    #[inline]
524    pub fn with_range(mut self, end_key: impl Into<Vec<u8>>) -> Self {
525        self.key_range.with_range(end_key);
526        self
527    }
528
529    /// Deletes all keys >= key.
530    #[inline]
531    pub fn with_from_key(mut self) -> Self {
532        self.key_range.with_from_key();
533        self
534    }
535
536    /// Deletes all keys prefixed with key.
537    #[inline]
538    pub fn with_prefix(mut self) -> Self {
539        self.key_range.with_prefix();
540        self
541    }
542
543    /// Deletes all keys.
544    #[inline]
545    pub fn with_all_keys(mut self) -> Self {
546        self.key_range.with_all_keys();
547        self
548    }
549
550    /// If `prev_kv` is set, etcd gets the previous key-value pairs before deleting it.
551    /// The previous key-value pairs will be returned in the delete response.
552    #[inline]
553    pub const fn with_prev_key(mut self) -> Self {
554        self.req.prev_kv = true;
555        self
556    }
557
558    #[inline]
559    pub(crate) fn key_range_end_mut(&mut self) -> &mut Vec<u8> {
560        &mut self.key_range.range_end
561    }
562}
563
564impl From<DeleteOptions> for PbDeleteRequest {
565    #[inline]
566    fn from(mut options: DeleteOptions) -> Self {
567        let (key, rang_end) = options.key_range.build();
568        options.req.key = key;
569        options.req.range_end = rang_end;
570        options.req
571    }
572}
573
574impl IntoRequest<PbDeleteRequest> for DeleteOptions {
575    #[inline]
576    fn into_request(self) -> Request<DeleteRangeRequest> {
577        Request::new(self.into())
578    }
579}
580
581/// Response for `Delete` operation.
582#[cfg_attr(feature = "pub-response-field", visible::StructFields(pub))]
583#[derive(Debug, Clone)]
584#[repr(transparent)]
585pub struct DeleteResponse(PbDeleteResponse);
586
587impl DeleteResponse {
588    /// Create a new `DeleteResponse` from pb delete response.
589    #[inline]
590    const fn new(resp: PbDeleteResponse) -> Self {
591        Self(resp)
592    }
593
594    /// Delete response header.
595    #[inline]
596    pub fn header(&self) -> Option<&ResponseHeader> {
597        self.0.header.as_ref().map(From::from)
598    }
599
600    /// Takes the header out of the response, leaving a [`None`] in its place.
601    #[inline]
602    pub fn take_header(&mut self) -> Option<ResponseHeader> {
603        self.0.header.take().map(ResponseHeader::new)
604    }
605
606    /// The number of keys deleted by the delete request.
607    #[inline]
608    pub const fn deleted(&self) -> i64 {
609        self.0.deleted
610    }
611
612    /// If `prev_kv` is set in the request, the previous key-value pairs will be returned.
613    #[inline]
614    pub fn prev_kvs(&self) -> &[KeyValue] {
615        unsafe { &*(self.0.prev_kvs.as_slice() as *const _ as *const [KeyValue]) }
616    }
617
618    /// If `prev_kvs` is set in the request, take the previous key-value pairs, leaving an empty vector in its place.
619    #[inline]
620    pub fn take_prev_kvs(&mut self) -> Vec<KeyValue> {
621        let kvs = ManuallyDrop::new(std::mem::take(&mut self.0.prev_kvs));
622        unsafe { Vec::from_raw_parts(kvs.as_ptr() as *mut KeyValue, kvs.len(), kvs.capacity()) }
623    }
624
625    #[inline]
626    pub(crate) fn strip_prev_kvs_prefix(&mut self, prefix: &[u8]) {
627        for kv in self.0.prev_kvs.iter_mut() {
628            kv.key.strip_key_prefix(prefix);
629        }
630    }
631}
632
633/// Options for `Compact` operation.
634#[derive(Debug, Default, Clone)]
635#[repr(transparent)]
636pub struct CompactionOptions(PbCompactionRequest);
637
638impl CompactionOptions {
639    /// Creates a `CompactionOptions`.
640    #[inline]
641    pub const fn new() -> Self {
642        Self(PbCompactionRequest {
643            revision: 0,
644            physical: false,
645        })
646    }
647
648    /// The key-value store revision for the compaction operation.
649    #[inline]
650    const fn with_revision(mut self, revision: i64) -> Self {
651        self.0.revision = revision;
652        self
653    }
654
655    /// Physical is set so the RPC will wait until the compaction is physically
656    /// applied to the local database such that compacted entries are totally
657    /// removed from the backend database.
658    #[inline]
659    pub const fn with_physical(mut self) -> Self {
660        self.0.physical = true;
661        self
662    }
663}
664
665impl IntoRequest<PbCompactionRequest> for CompactionOptions {
666    #[inline]
667    fn into_request(self) -> Request<CompactionRequest> {
668        Request::new(self.0)
669    }
670}
671
672/// Response for `Compact` operation.
673#[cfg_attr(feature = "pub-response-field", visible::StructFields(pub))]
674#[derive(Debug, Clone)]
675#[repr(transparent)]
676pub struct CompactionResponse(PbCompactionResponse);
677
678impl CompactionResponse {
679    /// Create a new `CompactionResponse` from pb compaction response.
680    #[inline]
681    const fn new(resp: PbCompactionResponse) -> Self {
682        Self(resp)
683    }
684
685    /// Compact response header.
686    #[inline]
687    pub fn header(&self) -> Option<&ResponseHeader> {
688        self.0.header.as_ref().map(From::from)
689    }
690
691    /// Takes the header out of the response, leaving a [`None`] in its place.
692    #[inline]
693    pub fn take_header(&mut self) -> Option<ResponseHeader> {
694        self.0.header.take().map(ResponseHeader::new)
695    }
696}
697
698/// Transaction comparison.
699#[derive(Debug, Clone)]
700#[repr(transparent)]
701pub struct Compare(PbCompare);
702
703impl Compare {
704    /// Creates a new `Compare`.
705    #[inline]
706    fn new(
707        key: impl Into<Vec<u8>>,
708        cmp: CompareOp,
709        target: CompareTarget,
710        target_union: TargetUnion,
711    ) -> Self {
712        Self(PbCompare {
713            result: cmp as i32,
714            target: target as i32,
715            key: key.into(),
716            range_end: Vec::new(),
717            target_union: Some(target_union),
718        })
719    }
720
721    /// Compares the version of the given key.
722    #[inline]
723    pub fn version(key: impl Into<Vec<u8>>, cmp: CompareOp, version: i64) -> Self {
724        Self::new(
725            key,
726            cmp,
727            CompareTarget::Version,
728            TargetUnion::Version(version),
729        )
730    }
731
732    /// Compares the creation revision of the given key.
733    #[inline]
734    pub fn create_revision(key: impl Into<Vec<u8>>, cmp: CompareOp, revision: i64) -> Self {
735        Self::new(
736            key,
737            cmp,
738            CompareTarget::Create,
739            TargetUnion::CreateRevision(revision),
740        )
741    }
742
743    /// Compares the last modified revision of the given key.
744    #[inline]
745    pub fn mod_revision(key: impl Into<Vec<u8>>, cmp: CompareOp, revision: i64) -> Self {
746        Self::new(
747            key,
748            cmp,
749            CompareTarget::Mod,
750            TargetUnion::ModRevision(revision),
751        )
752    }
753
754    /// Compares the value of the given key.
755    #[inline]
756    pub fn value(key: impl Into<Vec<u8>>, cmp: CompareOp, value: impl Into<Vec<u8>>) -> Self {
757        Self::new(
758            key,
759            cmp,
760            CompareTarget::Value,
761            TargetUnion::Value(value.into()),
762        )
763    }
764
765    /// Compares the lease id of the given key.
766    #[inline]
767    pub fn lease(key: impl Into<Vec<u8>>, cmp: CompareOp, lease: i64) -> Self {
768        Self::new(key, cmp, CompareTarget::Lease, TargetUnion::Lease(lease))
769    }
770
771    /// Sets the comparison to scan the range [key, end).
772    #[inline]
773    pub fn with_range(mut self, end: impl Into<Vec<u8>>) -> Self {
774        self.0.range_end = end.into();
775        self
776    }
777
778    /// Sets the comparison to scan all keys prefixed by the key.
779    #[inline]
780    pub fn with_prefix(mut self) -> Self {
781        self.0.range_end = get_prefix(&self.0.key);
782        self
783    }
784}
785
786/// Transaction operation.
787#[derive(Debug, Clone)]
788#[repr(transparent)]
789pub struct TxnOp(PbTxnOp);
790
791impl TxnOp {
792    /// `Put` operation.
793    #[inline]
794    pub fn put(
795        key: impl Into<Vec<u8>>,
796        value: impl Into<Vec<u8>>,
797        options: Option<PutOptions>,
798    ) -> Self {
799        TxnOp(PbTxnOp::RequestPut(
800            options.unwrap_or_default().with_kv(key, value).into(),
801        ))
802    }
803
804    /// `Get` operation.
805    #[inline]
806    pub fn get(key: impl Into<Vec<u8>>, options: Option<GetOptions>) -> Self {
807        TxnOp(PbTxnOp::RequestRange(
808            options.unwrap_or_default().with_key(key).into(),
809        ))
810    }
811
812    /// `Delete` operation.
813    #[inline]
814    pub fn delete(key: impl Into<Vec<u8>>, options: Option<DeleteOptions>) -> Self {
815        TxnOp(PbTxnOp::RequestDeleteRange(
816            options.unwrap_or_default().with_key(key).into(),
817        ))
818    }
819
820    /// `Txn` operation.
821    #[inline]
822    pub fn txn(txn: Txn) -> Self {
823        TxnOp(PbTxnOp::RequestTxn(txn.into()))
824    }
825}
826
827impl From<TxnOp> for PbTxnOp {
828    #[inline]
829    fn from(op: TxnOp) -> Self {
830        op.0
831    }
832}
833
834/// Transaction of multiple operations.
835#[derive(Debug, Default, Clone)]
836pub struct Txn {
837    req: PbTxnRequest,
838    c_when: bool,
839    c_then: bool,
840    c_else: bool,
841}
842
843impl Txn {
844    /// Creates a new transaction.
845    #[inline]
846    pub const fn new() -> Self {
847        Self {
848            req: PbTxnRequest {
849                compare: Vec::new(),
850                success: Vec::new(),
851                failure: Vec::new(),
852            },
853            c_when: false,
854            c_then: false,
855            c_else: false,
856        }
857    }
858
859    /// Takes a list of comparison. If all comparisons passed in succeed,
860    /// the operations passed into `and_then()` will be executed. Or the operations
861    /// passed into `or_else()` will be executed.
862    #[inline]
863    pub fn when(mut self, compares: impl Into<Vec<Compare>>) -> Self {
864        assert!(!self.c_when, "cannot call when twice");
865        assert!(!self.c_then, "cannot call when after and_then");
866        assert!(!self.c_else, "cannot call when after or_else");
867
868        self.c_when = true;
869
870        let compares = ManuallyDrop::new(compares.into());
871        self.req.compare = unsafe {
872            Vec::from_raw_parts(
873                compares.as_ptr() as *mut PbCompare,
874                compares.len(),
875                compares.capacity(),
876            )
877        };
878
879        self
880    }
881
882    /// Takes a list of operations. The operations list will be executed, if the
883    /// comparisons passed in `when()` succeed.
884    #[inline]
885    pub fn and_then(mut self, operations: impl Into<Vec<TxnOp>>) -> Self {
886        assert!(!self.c_then, "cannot call and_then twice");
887        assert!(!self.c_else, "cannot call and_then after or_else");
888
889        self.c_then = true;
890        self.req.success = operations
891            .into()
892            .into_iter()
893            .map(|op| PbTxnRequestOp {
894                request: Some(op.into()),
895            })
896            .collect();
897        self
898    }
899
900    /// Takes a list of operations. The operations list will be executed, if the
901    /// comparisons passed in `when()` fail.
902    #[inline]
903    pub fn or_else(mut self, operations: impl Into<Vec<TxnOp>>) -> Self {
904        assert!(!self.c_else, "cannot call or_else twice");
905
906        self.c_else = true;
907        self.req.failure = operations
908            .into()
909            .into_iter()
910            .map(|op| PbTxnRequestOp {
911                request: Some(op.into()),
912            })
913            .collect();
914        self
915    }
916
917    #[inline]
918    pub(crate) fn prefix_with(&mut self, prefix: &[u8]) {
919        self.req.prefix_with(prefix);
920    }
921}
922
923impl PbTxnRequest {
924    fn prefix_with(&mut self, prefix: &[u8]) {
925        let prefix_op = |op: &mut PbTxnRequestOp| {
926            if let Some(request) = &mut op.request {
927                match request {
928                    PbTxnOp::RequestRange(req) => {
929                        req.key.prefix_with(prefix);
930                        req.range_end.prefix_range_end_with(prefix);
931                    }
932                    PbTxnOp::RequestPut(req) => {
933                        req.key.prefix_with(prefix);
934                    }
935                    PbTxnOp::RequestDeleteRange(req) => {
936                        req.key.prefix_with(prefix);
937                        req.range_end.prefix_range_end_with(prefix);
938                    }
939                    PbTxnOp::RequestTxn(req) => {
940                        req.prefix_with(prefix);
941                    }
942                }
943            }
944        };
945
946        self.compare.iter_mut().for_each(|cmp| {
947            cmp.key.prefix_with(prefix);
948            cmp.range_end.prefix_range_end_with(prefix);
949        });
950        self.success.iter_mut().for_each(prefix_op);
951        self.failure.iter_mut().for_each(prefix_op);
952    }
953}
954
955impl From<Txn> for PbTxnRequest {
956    #[inline]
957    fn from(txn: Txn) -> Self {
958        txn.req
959    }
960}
961
962impl IntoRequest<PbTxnRequest> for Txn {
963    #[inline]
964    fn into_request(self) -> Request<PbTxnRequest> {
965        Request::new(self.into())
966    }
967}
968
969/// Transaction operation response.
970#[cfg_attr(feature = "pub-response-field", visible::StructFields(pub))]
971#[derive(Debug, Clone)]
972pub enum TxnOpResponse {
973    Put(PutResponse),
974    Get(GetResponse),
975    Delete(DeleteResponse),
976    Txn(TxnResponse),
977}
978
979/// Response for `Txn` operation.
980#[cfg_attr(feature = "pub-response-field", visible::StructFields(pub))]
981#[derive(Debug, Clone)]
982#[repr(transparent)]
983pub struct TxnResponse(PbTxnResponse);
984
985impl TxnResponse {
986    /// Creates a new `Txn` response.
987    #[inline]
988    const fn new(resp: PbTxnResponse) -> Self {
989        Self(resp)
990    }
991
992    /// Transaction response header.
993    #[inline]
994    pub fn header(&self) -> Option<&ResponseHeader> {
995        self.0.header.as_ref().map(From::from)
996    }
997
998    /// Takes the header out of the response, leaving a [`None`] in its place.
999    #[inline]
1000    pub fn take_header(&mut self) -> Option<ResponseHeader> {
1001        self.0.header.take().map(ResponseHeader::new)
1002    }
1003
1004    /// Returns `true` if the compare evaluated to true or `false` otherwise.
1005    #[inline]
1006    pub const fn succeeded(&self) -> bool {
1007        self.0.succeeded
1008    }
1009
1010    /// Returns responses of transaction operations.
1011    #[inline]
1012    pub fn op_responses(&self) -> Vec<TxnOpResponse> {
1013        self.0
1014            .responses
1015            .iter()
1016            .map(|resp| match resp.response.as_ref().unwrap() {
1017                PbTxnOpResponse::ResponsePut(put) => {
1018                    TxnOpResponse::Put(PutResponse::new(put.clone()))
1019                }
1020                PbTxnOpResponse::ResponseRange(get) => {
1021                    TxnOpResponse::Get(GetResponse::new(get.clone()))
1022                }
1023                PbTxnOpResponse::ResponseDeleteRange(delete) => {
1024                    TxnOpResponse::Delete(DeleteResponse::new(delete.clone()))
1025                }
1026                PbTxnOpResponse::ResponseTxn(txn) => {
1027                    TxnOpResponse::Txn(TxnResponse::new(txn.clone()))
1028                }
1029            })
1030            .collect()
1031    }
1032
1033    #[inline]
1034    pub(crate) fn strip_key_prefix(&mut self, prefix: &[u8]) {
1035        self.0.strip_key_prefix(prefix);
1036    }
1037}
1038
1039impl PbTxnResponse {
1040    fn strip_key_prefix(&mut self, prefix: &[u8]) {
1041        self.responses.iter_mut().for_each(|op| {
1042            if let Some(resp) = &mut op.response {
1043                match resp {
1044                    PbTxnOpResponse::ResponseRange(r) => {
1045                        for kv in r.kvs.iter_mut() {
1046                            kv.key.strip_key_prefix(prefix);
1047                        }
1048                    }
1049                    PbTxnOpResponse::ResponsePut(r) => {
1050                        if let Some(kv) = r.prev_kv.as_mut() {
1051                            kv.key.strip_key_prefix(prefix);
1052                        }
1053                    }
1054                    PbTxnOpResponse::ResponseDeleteRange(r) => {
1055                        for kv in r.prev_kvs.iter_mut() {
1056                            kv.key.strip_key_prefix(prefix);
1057                        }
1058                    }
1059                    PbTxnOpResponse::ResponseTxn(r) => {
1060                        r.strip_key_prefix(prefix);
1061                    }
1062                }
1063            }
1064        });
1065    }
1066}