etcd_client/rpc/
kv.rs

1//! Etcd KV Operations.
2
3pub use crate::rpc::pb::etcdserverpb::compare::CompareResult as CompareOp;
4pub use crate::rpc::pb::etcdserverpb::range_request::{SortOrder, SortTarget};
5
6use crate::auth::AuthService;
7use crate::channel::Channel;
8use crate::error::Result;
9use crate::rpc::pb::etcdserverpb::compare::{CompareTarget, TargetUnion};
10use crate::rpc::pb::etcdserverpb::kv_client::KvClient as PbKvClient;
11use crate::rpc::pb::etcdserverpb::request_op::Request as PbTxnOp;
12use crate::rpc::pb::etcdserverpb::response_op::Response as PbTxnOpResponse;
13use crate::rpc::pb::etcdserverpb::{
14    CompactionRequest as PbCompactionRequest, CompactionRequest,
15    CompactionResponse as PbCompactionResponse, Compare as PbCompare,
16    DeleteRangeRequest as PbDeleteRequest, DeleteRangeRequest,
17    DeleteRangeResponse as PbDeleteResponse, PutRequest as PbPutRequest,
18    PutResponse as PbPutResponse, RangeRequest as PbRangeRequest, RangeResponse as PbRangeResponse,
19    RequestOp as PbTxnRequestOp, TxnRequest as PbTxnRequest, TxnResponse as PbTxnResponse,
20};
21use crate::rpc::{get_prefix, KeyRange, KeyValue, ResponseHeader};
22use crate::vec::VecExt;
23use http::HeaderValue;
24use std::mem::ManuallyDrop;
25use std::sync::{Arc, RwLock};
26use tonic::{IntoRequest, Request};
27
28/// Client for KV operations.
29#[repr(transparent)]
30#[derive(Clone)]
31pub struct KvClient {
32    inner: PbKvClient<AuthService<Channel>>,
33}
34
35impl KvClient {
36    /// Creates a kv client.
37    #[inline]
38    pub(crate) fn new(channel: Channel, auth_token: Arc<RwLock<Option<HeaderValue>>>) -> Self {
39        let inner = PbKvClient::new(AuthService::new(channel, auth_token));
40        Self { inner }
41    }
42
43    /// Limits the maximum size of a decoded message.
44    ///
45    /// Default: `4MB`
46    pub fn max_decoding_message_size(mut self, limit: usize) -> Self {
47        self.inner = self.inner.max_decoding_message_size(limit);
48        self
49    }
50
51    /// Limits the maximum size of an encoded message.
52    ///
53    /// Default: `usize::MAX`
54    pub fn max_encoding_message_size(mut self, limit: usize) -> Self {
55        self.inner = self.inner.max_encoding_message_size(limit);
56        self
57    }
58
59    /// Puts the given key into the key-value store.
60    /// A put request increments the revision of the key-value store
61    /// and generates one event in the event history.
62    #[inline]
63    pub async fn put(
64        &mut self,
65        key: impl Into<Vec<u8>>,
66        value: impl Into<Vec<u8>>,
67        options: Option<PutOptions>,
68    ) -> Result<PutResponse> {
69        let resp = self
70            .inner
71            .put(options.unwrap_or_default().with_kv(key, value))
72            .await?
73            .into_inner();
74        Ok(PutResponse::new(resp))
75    }
76
77    /// Gets the key or a range of keys from the store.
78    #[inline]
79    pub async fn get(
80        &mut self,
81        key: impl Into<Vec<u8>>,
82        options: Option<GetOptions>,
83    ) -> Result<GetResponse> {
84        let resp = self
85            .inner
86            .range(options.unwrap_or_default().with_key(key.into()))
87            .await?
88            .into_inner();
89        Ok(GetResponse::new(resp))
90    }
91
92    /// Deletes the given key or a range of keys from the key-value store.
93    #[inline]
94    pub async fn delete(
95        &mut self,
96        key: impl Into<Vec<u8>>,
97        options: Option<DeleteOptions>,
98    ) -> Result<DeleteResponse> {
99        let resp = self
100            .inner
101            .delete_range(options.unwrap_or_default().with_key(key.into()))
102            .await?
103            .into_inner();
104        Ok(DeleteResponse::new(resp))
105    }
106
107    /// Compacts the event history in the etcd key-value store. The key-value
108    /// store should be periodically compacted or the event history will continue to grow
109    /// indefinitely.
110    #[inline]
111    pub async fn compact(
112        &mut self,
113        revision: i64,
114        options: Option<CompactionOptions>,
115    ) -> Result<CompactionResponse> {
116        let resp = self
117            .inner
118            .compact(options.unwrap_or_default().with_revision(revision))
119            .await?
120            .into_inner();
121        Ok(CompactionResponse::new(resp))
122    }
123
124    /// Processes multiple operations in a single transaction.
125    /// A txn request increments the revision of the key-value store
126    /// and generates events with the same revision for every completed operation.
127    /// It is not allowed to modify the same key several times within one txn.
128    #[inline]
129    pub async fn txn(&mut self, txn: Txn) -> Result<TxnResponse> {
130        let resp = self.inner.txn(txn).await?.into_inner();
131        Ok(TxnResponse::new(resp))
132    }
133}
134
135/// Options for `Put` operation.
136#[derive(Debug, Default, Clone)]
137#[repr(transparent)]
138pub struct PutOptions(PbPutRequest);
139
140impl PutOptions {
141    /// Set key-value pair.
142    #[inline]
143    fn with_kv(mut self, key: impl Into<Vec<u8>>, value: impl Into<Vec<u8>>) -> Self {
144        self.0.key = key.into();
145        self.0.value = value.into();
146        self
147    }
148
149    /// Creates a `PutOptions`.
150    #[inline]
151    pub const fn new() -> Self {
152        Self(PbPutRequest {
153            key: Vec::new(),
154            value: Vec::new(),
155            lease: 0,
156            prev_kv: false,
157            ignore_value: false,
158            ignore_lease: false,
159        })
160    }
161
162    /// Lease is the lease ID to associate with the key in the key-value store. A lease
163    /// value of 0 indicates no lease.
164    #[inline]
165    pub const fn with_lease(mut self, lease: i64) -> Self {
166        self.0.lease = lease;
167        self
168    }
169
170    /// If prev_kv is set, etcd gets the previous key-value pair before changing it.
171    /// The previous key-value pair will be returned in the put response.
172    #[inline]
173    pub const fn with_prev_key(mut self) -> Self {
174        self.0.prev_kv = true;
175        self
176    }
177
178    /// If ignore_value is set, etcd updates the key using its current value.
179    /// Returns an error if the key does not exist.
180    #[inline]
181    pub const fn with_ignore_value(mut self) -> Self {
182        self.0.ignore_value = true;
183        self
184    }
185
186    /// If ignore_lease is set, etcd updates the key using its current lease.
187    /// Returns an error if the key does not exist.
188    #[inline]
189    pub const fn with_ignore_lease(mut self) -> Self {
190        self.0.ignore_lease = true;
191        self
192    }
193}
194
195impl From<PutOptions> for PbPutRequest {
196    #[inline]
197    fn from(options: PutOptions) -> Self {
198        options.0
199    }
200}
201
202impl IntoRequest<PbPutRequest> for PutOptions {
203    #[inline]
204    fn into_request(self) -> Request<PbPutRequest> {
205        Request::new(self.into())
206    }
207}
208
209/// Response for `Put` operation.
210#[cfg_attr(feature = "pub-response-field", visible::StructFields(pub))]
211#[derive(Debug, Clone)]
212#[repr(transparent)]
213pub struct PutResponse(PbPutResponse);
214
215impl PutResponse {
216    /// Create a new `PutResponse` from pb put response.
217    #[inline]
218    const fn new(resp: PbPutResponse) -> Self {
219        Self(resp)
220    }
221
222    /// Get response header.
223    #[inline]
224    pub fn header(&self) -> Option<&ResponseHeader> {
225        self.0.header.as_ref().map(From::from)
226    }
227
228    /// Takes the header out of the response, leaving a [`None`] in its place.
229    #[inline]
230    pub fn take_header(&mut self) -> Option<ResponseHeader> {
231        self.0.header.take().map(ResponseHeader::new)
232    }
233
234    /// If prev_kv is set in the request, the previous key-value pair will be returned.
235    #[inline]
236    pub fn prev_key(&self) -> Option<&KeyValue> {
237        self.0.prev_kv.as_ref().map(From::from)
238    }
239
240    /// Takes the prev_key out of the response, leaving a [`None`] in its place.
241    #[inline]
242    pub fn take_prev_key(&mut self) -> Option<KeyValue> {
243        self.0.prev_kv.take().map(KeyValue::new)
244    }
245
246    #[inline]
247    pub(crate) fn strip_prev_key_prefix(&mut self, prefix: &[u8]) {
248        if let Some(kv) = self.0.prev_kv.as_mut() {
249            kv.key.strip_key_prefix(prefix);
250        }
251    }
252}
253
254/// Options for `Get` operation.
255#[derive(Debug, Default, Clone)]
256pub struct GetOptions {
257    req: PbRangeRequest,
258    key_range: KeyRange,
259}
260
261impl GetOptions {
262    /// Sets key.
263    #[inline]
264    fn with_key(mut self, key: impl Into<Vec<u8>>) -> Self {
265        self.key_range.with_key(key);
266        self
267    }
268
269    /// Creates a `GetOptions`.
270    #[inline]
271    pub const fn new() -> Self {
272        Self {
273            req: PbRangeRequest {
274                key: Vec::new(),
275                range_end: Vec::new(),
276                limit: 0,
277                revision: 0,
278                sort_order: 0,
279                sort_target: 0,
280                serializable: false,
281                keys_only: false,
282                count_only: false,
283                min_mod_revision: 0,
284                max_mod_revision: 0,
285                min_create_revision: 0,
286                max_create_revision: 0,
287            },
288            key_range: KeyRange::new(),
289        }
290    }
291
292    /// Specifies the range of 'Get'.
293    /// Returns the keys in the range [key, end_key).
294    /// `end_key` must be lexicographically greater than start key.
295    #[inline]
296    pub fn with_range(mut self, end_key: impl Into<Vec<u8>>) -> Self {
297        self.key_range.with_range(end_key);
298        self
299    }
300
301    /// Gets all keys >= key.
302    #[inline]
303    pub fn with_from_key(mut self) -> Self {
304        self.key_range.with_from_key();
305        self
306    }
307
308    /// Gets all keys prefixed with key.
309    #[inline]
310    pub fn with_prefix(mut self) -> Self {
311        self.key_range.with_prefix();
312        self
313    }
314
315    /// Gets all keys.
316    #[inline]
317    pub fn with_all_keys(mut self) -> Self {
318        self.key_range.with_all_keys();
319        self
320    }
321
322    /// Limits the number of keys returned for the request. When limit is set to 0,
323    /// it is treated as no limit.
324    #[inline]
325    pub const fn with_limit(mut self, limit: i64) -> Self {
326        self.req.limit = limit;
327        self
328    }
329
330    /// The point-in-time of the key-value store to use for the range.
331    /// If revision is less or equal to zero, the range is over the newest key-value store.
332    /// If the revision has been compacted, ErrCompacted is returned as a response.
333    #[inline]
334    pub const fn with_revision(mut self, revision: i64) -> Self {
335        self.req.revision = revision;
336        self
337    }
338
339    /// Sets the order for returned sorted results.
340    /// It requires 'with_range' and/or 'with_prefix' to be specified too.
341    #[inline]
342    pub fn with_sort(mut self, target: SortTarget, order: SortOrder) -> Self {
343        if target == SortTarget::Key && order == SortOrder::Ascend {
344            // If order != SortOrder::None, server fetches the entire key-space,
345            // and then applies the sort and limit, if provided.
346            // Since by default the server returns results sorted by keys
347            // in lexicographically ascending order, the client should ignore
348            // SortOrder if the target is SortTarget::Key.
349            self.req.sort_order = SortOrder::None as i32;
350        } else {
351            self.req.sort_order = order as i32;
352        }
353        self.req.sort_target = target as i32;
354        self
355    }
356
357    /// Sets the get request to use serializable member-local reads.
358    /// Get requests are linearizable by default; linearizable requests have higher
359    /// latency and lower throughput than serializable requests but reflect the current
360    /// consensus of the cluster. For better performance, in exchange for possible stale reads,
361    /// a serializable get request is served locally without needing to reach consensus
362    /// with other nodes in the cluster.
363    #[inline]
364    pub const fn with_serializable(mut self) -> Self {
365        self.req.serializable = true;
366        self
367    }
368
369    /// Returns only the keys and not the values.
370    #[inline]
371    pub const fn with_keys_only(mut self) -> Self {
372        self.req.keys_only = true;
373        self
374    }
375
376    /// Returns only the count of the keys in the range.
377    #[inline]
378    pub const fn with_count_only(mut self) -> Self {
379        self.req.count_only = true;
380        self
381    }
382
383    /// Sets the lower bound for returned key mod revisions; all keys with
384    /// lesser mod revisions will be filtered away.
385    #[inline]
386    pub const fn with_min_mod_revision(mut self, revision: i64) -> Self {
387        self.req.min_mod_revision = revision;
388        self
389    }
390
391    /// Sets the upper bound for returned key mod revisions; all keys with
392    /// greater mod revisions will be filtered away.
393    #[inline]
394    pub const fn with_max_mod_revision(mut self, revision: i64) -> Self {
395        self.req.max_mod_revision = revision;
396        self
397    }
398
399    /// Sets the lower bound for returned key create revisions; all keys with
400    /// lesser create revisions will be filtered away.
401    #[inline]
402    pub const fn with_min_create_revision(mut self, revision: i64) -> Self {
403        self.req.min_create_revision = revision;
404        self
405    }
406
407    /// `max_create_revision` is the upper bound for returned key create revisions; all keys with
408    /// greater create revisions will be filtered away.
409    #[inline]
410    pub const fn with_max_create_revision(mut self, revision: i64) -> Self {
411        self.req.max_create_revision = revision;
412        self
413    }
414
415    #[inline]
416    pub(crate) fn key_range_end_mut(&mut self) -> &mut Vec<u8> {
417        &mut self.key_range.range_end
418    }
419}
420
421impl From<GetOptions> for PbRangeRequest {
422    #[inline]
423    fn from(mut options: GetOptions) -> Self {
424        let (key, rang_end) = options.key_range.build();
425        options.req.key = key;
426        options.req.range_end = rang_end;
427        options.req
428    }
429}
430
431impl IntoRequest<PbRangeRequest> for GetOptions {
432    #[inline]
433    fn into_request(self) -> Request<PbRangeRequest> {
434        Request::new(self.into())
435    }
436}
437
438/// Response for `Get` operation.
439#[cfg_attr(feature = "pub-response-field", visible::StructFields(pub))]
440#[derive(Debug, Clone)]
441#[repr(transparent)]
442pub struct GetResponse(PbRangeResponse);
443
444impl GetResponse {
445    /// Create a new `GetResponse` from pb get response.
446    #[inline]
447    const fn new(resp: PbRangeResponse) -> Self {
448        Self(resp)
449    }
450
451    /// Get response header.
452    #[inline]
453    pub fn header(&self) -> Option<&ResponseHeader> {
454        self.0.header.as_ref().map(From::from)
455    }
456
457    /// Takes the header out of the response, leaving a [`None`] in its place.
458    #[inline]
459    pub fn take_header(&mut self) -> Option<ResponseHeader> {
460        self.0.header.take().map(ResponseHeader::new)
461    }
462
463    /// The list of key-value pairs matched by the `Get` request.
464    /// kvs is empty when count is requested.
465    #[inline]
466    pub fn kvs(&self) -> &[KeyValue] {
467        unsafe { &*(self.0.kvs.as_slice() as *const _ as *const [KeyValue]) }
468    }
469
470    /// If `kvs` is set in the request, take the key-value pairs, leaving an empty vector in its place.
471    #[inline]
472    pub fn take_kvs(&mut self) -> Vec<KeyValue> {
473        let kvs = ManuallyDrop::new(std::mem::take(&mut self.0.kvs));
474        unsafe { Vec::from_raw_parts(kvs.as_ptr() as *mut KeyValue, kvs.len(), kvs.capacity()) }
475    }
476
477    #[inline]
478    pub(crate) fn strip_kvs_prefix(&mut self, prefix: &[u8]) {
479        for kv in self.0.kvs.iter_mut() {
480            kv.key.strip_key_prefix(prefix);
481        }
482    }
483
484    /// Indicates if there are more keys to return in the requested range.
485    #[inline]
486    pub const fn more(&self) -> bool {
487        self.0.more
488    }
489
490    /// The number of keys within the range when requested.
491    #[inline]
492    pub const fn count(&self) -> i64 {
493        self.0.count
494    }
495}
496
497/// Options for `Delete` operation.
498#[derive(Debug, Default, Clone)]
499pub struct DeleteOptions {
500    req: PbDeleteRequest,
501    key_range: KeyRange,
502}
503
504impl DeleteOptions {
505    /// Sets key.
506    #[inline]
507    fn with_key(mut self, key: impl Into<Vec<u8>>) -> Self {
508        self.key_range.with_key(key);
509        self
510    }
511
512    /// Creates a `DeleteOptions`.
513    #[inline]
514    pub const fn new() -> Self {
515        Self {
516            req: PbDeleteRequest {
517                key: Vec::new(),
518                range_end: Vec::new(),
519                prev_kv: false,
520            },
521            key_range: KeyRange::new(),
522        }
523    }
524
525    /// `end_key` is the key following the last key to delete for the range [key, end_key).
526    #[inline]
527    pub fn with_range(mut self, end_key: impl Into<Vec<u8>>) -> Self {
528        self.key_range.with_range(end_key);
529        self
530    }
531
532    /// Deletes all keys >= key.
533    #[inline]
534    pub fn with_from_key(mut self) -> Self {
535        self.key_range.with_from_key();
536        self
537    }
538
539    /// Deletes all keys prefixed with key.
540    #[inline]
541    pub fn with_prefix(mut self) -> Self {
542        self.key_range.with_prefix();
543        self
544    }
545
546    /// Deletes all keys.
547    #[inline]
548    pub fn with_all_keys(mut self) -> Self {
549        self.key_range.with_all_keys();
550        self
551    }
552
553    /// If `prev_kv` is set, etcd gets the previous key-value pairs before deleting it.
554    /// The previous key-value pairs will be returned in the delete response.
555    #[inline]
556    pub const fn with_prev_key(mut self) -> Self {
557        self.req.prev_kv = true;
558        self
559    }
560
561    #[inline]
562    pub(crate) fn key_range_end_mut(&mut self) -> &mut Vec<u8> {
563        &mut self.key_range.range_end
564    }
565}
566
567impl From<DeleteOptions> for PbDeleteRequest {
568    #[inline]
569    fn from(mut options: DeleteOptions) -> Self {
570        let (key, rang_end) = options.key_range.build();
571        options.req.key = key;
572        options.req.range_end = rang_end;
573        options.req
574    }
575}
576
577impl IntoRequest<PbDeleteRequest> for DeleteOptions {
578    #[inline]
579    fn into_request(self) -> Request<DeleteRangeRequest> {
580        Request::new(self.into())
581    }
582}
583
584/// Response for `Delete` operation.
585#[cfg_attr(feature = "pub-response-field", visible::StructFields(pub))]
586#[derive(Debug, Clone)]
587#[repr(transparent)]
588pub struct DeleteResponse(PbDeleteResponse);
589
590impl DeleteResponse {
591    /// Create a new `DeleteResponse` from pb delete response.
592    #[inline]
593    const fn new(resp: PbDeleteResponse) -> Self {
594        Self(resp)
595    }
596
597    /// Delete response header.
598    #[inline]
599    pub fn header(&self) -> Option<&ResponseHeader> {
600        self.0.header.as_ref().map(From::from)
601    }
602
603    /// Takes the header out of the response, leaving a [`None`] in its place.
604    #[inline]
605    pub fn take_header(&mut self) -> Option<ResponseHeader> {
606        self.0.header.take().map(ResponseHeader::new)
607    }
608
609    /// The number of keys deleted by the delete request.
610    #[inline]
611    pub const fn deleted(&self) -> i64 {
612        self.0.deleted
613    }
614
615    /// If `prev_kv` is set in the request, the previous key-value pairs will be returned.
616    #[inline]
617    pub fn prev_kvs(&self) -> &[KeyValue] {
618        unsafe { &*(self.0.prev_kvs.as_slice() as *const _ as *const [KeyValue]) }
619    }
620
621    /// If `prev_kvs` is set in the request, take the previous key-value pairs, leaving an empty vector in its place.
622    #[inline]
623    pub fn take_prev_kvs(&mut self) -> Vec<KeyValue> {
624        let kvs = ManuallyDrop::new(std::mem::take(&mut self.0.prev_kvs));
625        unsafe { Vec::from_raw_parts(kvs.as_ptr() as *mut KeyValue, kvs.len(), kvs.capacity()) }
626    }
627
628    #[inline]
629    pub(crate) fn strip_prev_kvs_prefix(&mut self, prefix: &[u8]) {
630        for kv in self.0.prev_kvs.iter_mut() {
631            kv.key.strip_key_prefix(prefix);
632        }
633    }
634}
635
636/// Options for `Compact` operation.
637#[derive(Debug, Default, Clone)]
638#[repr(transparent)]
639pub struct CompactionOptions(PbCompactionRequest);
640
641impl CompactionOptions {
642    /// Creates a `CompactionOptions`.
643    #[inline]
644    pub const fn new() -> Self {
645        Self(PbCompactionRequest {
646            revision: 0,
647            physical: false,
648        })
649    }
650
651    /// The key-value store revision for the compaction operation.
652    #[inline]
653    const fn with_revision(mut self, revision: i64) -> Self {
654        self.0.revision = revision;
655        self
656    }
657
658    /// Physical is set so the RPC will wait until the compaction is physically
659    /// applied to the local database such that compacted entries are totally
660    /// removed from the backend database.
661    #[inline]
662    pub const fn with_physical(mut self) -> Self {
663        self.0.physical = true;
664        self
665    }
666}
667
668impl IntoRequest<PbCompactionRequest> for CompactionOptions {
669    #[inline]
670    fn into_request(self) -> Request<CompactionRequest> {
671        Request::new(self.0)
672    }
673}
674
675/// Response for `Compact` operation.
676#[cfg_attr(feature = "pub-response-field", visible::StructFields(pub))]
677#[derive(Debug, Clone)]
678#[repr(transparent)]
679pub struct CompactionResponse(PbCompactionResponse);
680
681impl CompactionResponse {
682    /// Create a new `CompactionResponse` from pb compaction response.
683    #[inline]
684    const fn new(resp: PbCompactionResponse) -> Self {
685        Self(resp)
686    }
687
688    /// Compact response header.
689    #[inline]
690    pub fn header(&self) -> Option<&ResponseHeader> {
691        self.0.header.as_ref().map(From::from)
692    }
693
694    /// Takes the header out of the response, leaving a [`None`] in its place.
695    #[inline]
696    pub fn take_header(&mut self) -> Option<ResponseHeader> {
697        self.0.header.take().map(ResponseHeader::new)
698    }
699}
700
701/// Transaction comparison.
702#[derive(Debug, Clone)]
703#[repr(transparent)]
704pub struct Compare(PbCompare);
705
706impl Compare {
707    /// Creates a new `Compare`.
708    #[inline]
709    fn new(
710        key: impl Into<Vec<u8>>,
711        cmp: CompareOp,
712        target: CompareTarget,
713        target_union: TargetUnion,
714    ) -> Self {
715        Self(PbCompare {
716            result: cmp as i32,
717            target: target as i32,
718            key: key.into(),
719            range_end: Vec::new(),
720            target_union: Some(target_union),
721        })
722    }
723
724    /// Compares the version of the given key.
725    #[inline]
726    pub fn version(key: impl Into<Vec<u8>>, cmp: CompareOp, version: i64) -> Self {
727        Self::new(
728            key,
729            cmp,
730            CompareTarget::Version,
731            TargetUnion::Version(version),
732        )
733    }
734
735    /// Compares the creation revision of the given key.
736    #[inline]
737    pub fn create_revision(key: impl Into<Vec<u8>>, cmp: CompareOp, revision: i64) -> Self {
738        Self::new(
739            key,
740            cmp,
741            CompareTarget::Create,
742            TargetUnion::CreateRevision(revision),
743        )
744    }
745
746    /// Compares the last modified revision of the given key.
747    #[inline]
748    pub fn mod_revision(key: impl Into<Vec<u8>>, cmp: CompareOp, revision: i64) -> Self {
749        Self::new(
750            key,
751            cmp,
752            CompareTarget::Mod,
753            TargetUnion::ModRevision(revision),
754        )
755    }
756
757    /// Compares the value of the given key.
758    #[inline]
759    pub fn value(key: impl Into<Vec<u8>>, cmp: CompareOp, value: impl Into<Vec<u8>>) -> Self {
760        Self::new(
761            key,
762            cmp,
763            CompareTarget::Value,
764            TargetUnion::Value(value.into()),
765        )
766    }
767
768    /// Compares the lease id of the given key.
769    #[inline]
770    pub fn lease(key: impl Into<Vec<u8>>, cmp: CompareOp, lease: i64) -> Self {
771        Self::new(key, cmp, CompareTarget::Lease, TargetUnion::Lease(lease))
772    }
773
774    /// Sets the comparison to scan the range [key, end).
775    #[inline]
776    pub fn with_range(mut self, end: impl Into<Vec<u8>>) -> Self {
777        self.0.range_end = end.into();
778        self
779    }
780
781    /// Sets the comparison to scan all keys prefixed by the key.
782    #[inline]
783    pub fn with_prefix(mut self) -> Self {
784        self.0.range_end = get_prefix(&self.0.key);
785        self
786    }
787}
788
789/// Transaction operation.
790#[derive(Debug, Clone)]
791#[repr(transparent)]
792pub struct TxnOp(PbTxnOp);
793
794impl TxnOp {
795    /// `Put` operation.
796    #[inline]
797    pub fn put(
798        key: impl Into<Vec<u8>>,
799        value: impl Into<Vec<u8>>,
800        options: Option<PutOptions>,
801    ) -> Self {
802        TxnOp(PbTxnOp::RequestPut(
803            options.unwrap_or_default().with_kv(key, value).into(),
804        ))
805    }
806
807    /// `Get` operation.
808    #[inline]
809    pub fn get(key: impl Into<Vec<u8>>, options: Option<GetOptions>) -> Self {
810        TxnOp(PbTxnOp::RequestRange(
811            options.unwrap_or_default().with_key(key).into(),
812        ))
813    }
814
815    /// `Delete` operation.
816    #[inline]
817    pub fn delete(key: impl Into<Vec<u8>>, options: Option<DeleteOptions>) -> Self {
818        TxnOp(PbTxnOp::RequestDeleteRange(
819            options.unwrap_or_default().with_key(key).into(),
820        ))
821    }
822
823    /// `Txn` operation.
824    #[inline]
825    pub fn txn(txn: Txn) -> Self {
826        TxnOp(PbTxnOp::RequestTxn(txn.into()))
827    }
828}
829
830impl From<TxnOp> for PbTxnOp {
831    #[inline]
832    fn from(op: TxnOp) -> Self {
833        op.0
834    }
835}
836
837/// Transaction of multiple operations.
838#[derive(Debug, Default, Clone)]
839pub struct Txn {
840    req: PbTxnRequest,
841    c_when: bool,
842    c_then: bool,
843    c_else: bool,
844}
845
846impl Txn {
847    /// Creates a new transaction.
848    #[inline]
849    pub const fn new() -> Self {
850        Self {
851            req: PbTxnRequest {
852                compare: Vec::new(),
853                success: Vec::new(),
854                failure: Vec::new(),
855            },
856            c_when: false,
857            c_then: false,
858            c_else: false,
859        }
860    }
861
862    /// Takes a list of comparison. If all comparisons passed in succeed,
863    /// the operations passed into `and_then()` will be executed. Or the operations
864    /// passed into `or_else()` will be executed.
865    #[inline]
866    pub fn when(mut self, compares: impl Into<Vec<Compare>>) -> Self {
867        assert!(!self.c_when, "cannot call when twice");
868        assert!(!self.c_then, "cannot call when after and_then");
869        assert!(!self.c_else, "cannot call when after or_else");
870
871        self.c_when = true;
872
873        let compares = ManuallyDrop::new(compares.into());
874        self.req.compare = unsafe {
875            Vec::from_raw_parts(
876                compares.as_ptr() as *mut PbCompare,
877                compares.len(),
878                compares.capacity(),
879            )
880        };
881
882        self
883    }
884
885    /// Takes a list of operations. The operations list will be executed, if the
886    /// comparisons passed in `when()` succeed.
887    #[inline]
888    pub fn and_then(mut self, operations: impl Into<Vec<TxnOp>>) -> Self {
889        assert!(!self.c_then, "cannot call and_then twice");
890        assert!(!self.c_else, "cannot call and_then after or_else");
891
892        self.c_then = true;
893        self.req.success = operations
894            .into()
895            .into_iter()
896            .map(|op| PbTxnRequestOp {
897                request: Some(op.into()),
898            })
899            .collect();
900        self
901    }
902
903    /// Takes a list of operations. The operations list will be executed, if the
904    /// comparisons passed in `when()` fail.
905    #[inline]
906    pub fn or_else(mut self, operations: impl Into<Vec<TxnOp>>) -> Self {
907        assert!(!self.c_else, "cannot call or_else twice");
908
909        self.c_else = true;
910        self.req.failure = operations
911            .into()
912            .into_iter()
913            .map(|op| PbTxnRequestOp {
914                request: Some(op.into()),
915            })
916            .collect();
917        self
918    }
919
920    #[inline]
921    pub(crate) fn prefix_with(&mut self, prefix: &[u8]) {
922        self.req.prefix_with(prefix);
923    }
924}
925
926impl PbTxnRequest {
927    fn prefix_with(&mut self, prefix: &[u8]) {
928        let prefix_op = |op: &mut PbTxnRequestOp| {
929            if let Some(request) = &mut op.request {
930                match request {
931                    PbTxnOp::RequestRange(req) => {
932                        req.key.prefix_with(prefix);
933                        req.range_end.prefix_range_end_with(prefix);
934                    }
935                    PbTxnOp::RequestPut(req) => {
936                        req.key.prefix_with(prefix);
937                    }
938                    PbTxnOp::RequestDeleteRange(req) => {
939                        req.key.prefix_with(prefix);
940                        req.range_end.prefix_range_end_with(prefix);
941                    }
942                    PbTxnOp::RequestTxn(req) => {
943                        req.prefix_with(prefix);
944                    }
945                }
946            }
947        };
948
949        self.compare.iter_mut().for_each(|cmp| {
950            cmp.key.prefix_with(prefix);
951            cmp.range_end.prefix_range_end_with(prefix);
952        });
953        self.success.iter_mut().for_each(prefix_op);
954        self.failure.iter_mut().for_each(prefix_op);
955    }
956}
957
958impl From<Txn> for PbTxnRequest {
959    #[inline]
960    fn from(txn: Txn) -> Self {
961        txn.req
962    }
963}
964
965impl IntoRequest<PbTxnRequest> for Txn {
966    #[inline]
967    fn into_request(self) -> Request<PbTxnRequest> {
968        Request::new(self.into())
969    }
970}
971
972/// Transaction operation response.
973#[cfg_attr(feature = "pub-response-field", visible::StructFields(pub))]
974#[derive(Debug, Clone)]
975pub enum TxnOpResponse {
976    Put(PutResponse),
977    Get(GetResponse),
978    Delete(DeleteResponse),
979    Txn(TxnResponse),
980}
981
982/// Response for `Txn` operation.
983#[cfg_attr(feature = "pub-response-field", visible::StructFields(pub))]
984#[derive(Debug, Clone)]
985#[repr(transparent)]
986pub struct TxnResponse(PbTxnResponse);
987
988impl TxnResponse {
989    /// Creates a new `Txn` response.
990    #[inline]
991    const fn new(resp: PbTxnResponse) -> Self {
992        Self(resp)
993    }
994
995    /// Transaction response header.
996    #[inline]
997    pub fn header(&self) -> Option<&ResponseHeader> {
998        self.0.header.as_ref().map(From::from)
999    }
1000
1001    /// Takes the header out of the response, leaving a [`None`] in its place.
1002    #[inline]
1003    pub fn take_header(&mut self) -> Option<ResponseHeader> {
1004        self.0.header.take().map(ResponseHeader::new)
1005    }
1006
1007    /// Returns `true` if the compare evaluated to true or `false` otherwise.
1008    #[inline]
1009    pub const fn succeeded(&self) -> bool {
1010        self.0.succeeded
1011    }
1012
1013    /// Returns responses of transaction operations.
1014    #[inline]
1015    pub fn op_responses(&self) -> Vec<TxnOpResponse> {
1016        self.0
1017            .responses
1018            .iter()
1019            .map(|resp| match resp.response.as_ref().unwrap() {
1020                PbTxnOpResponse::ResponsePut(put) => {
1021                    TxnOpResponse::Put(PutResponse::new(put.clone()))
1022                }
1023                PbTxnOpResponse::ResponseRange(get) => {
1024                    TxnOpResponse::Get(GetResponse::new(get.clone()))
1025                }
1026                PbTxnOpResponse::ResponseDeleteRange(delete) => {
1027                    TxnOpResponse::Delete(DeleteResponse::new(delete.clone()))
1028                }
1029                PbTxnOpResponse::ResponseTxn(txn) => {
1030                    TxnOpResponse::Txn(TxnResponse::new(txn.clone()))
1031                }
1032            })
1033            .collect()
1034    }
1035
1036    #[inline]
1037    pub(crate) fn strip_key_prefix(&mut self, prefix: &[u8]) {
1038        self.0.strip_key_prefix(prefix);
1039    }
1040}
1041
1042impl PbTxnResponse {
1043    fn strip_key_prefix(&mut self, prefix: &[u8]) {
1044        self.responses.iter_mut().for_each(|op| {
1045            if let Some(resp) = &mut op.response {
1046                match resp {
1047                    PbTxnOpResponse::ResponseRange(r) => {
1048                        for kv in r.kvs.iter_mut() {
1049                            kv.key.strip_key_prefix(prefix);
1050                        }
1051                    }
1052                    PbTxnOpResponse::ResponsePut(r) => {
1053                        if let Some(kv) = r.prev_kv.as_mut() {
1054                            kv.key.strip_key_prefix(prefix);
1055                        }
1056                    }
1057                    PbTxnOpResponse::ResponseDeleteRange(r) => {
1058                        for kv in r.prev_kvs.iter_mut() {
1059                            kv.key.strip_key_prefix(prefix);
1060                        }
1061                    }
1062                    PbTxnOpResponse::ResponseTxn(r) => {
1063                        r.strip_key_prefix(prefix);
1064                    }
1065                }
1066            }
1067        });
1068    }
1069}