etcd_client/rpc/
watch.rs

1//! Etcd Watch RPC.
2
3pub use crate::rpc::pb::mvccpb::event::EventType;
4
5use crate::error::{Error, Result};
6use crate::intercept::InterceptedChannel;
7use crate::rpc::pb::etcdserverpb::watch_client::WatchClient as PbWatchClient;
8use crate::rpc::pb::etcdserverpb::watch_request::RequestUnion as WatchRequestUnion;
9use crate::rpc::pb::etcdserverpb::{
10    WatchCancelRequest, WatchCreateRequest, WatchProgressRequest, WatchRequest,
11    WatchResponse as PbWatchResponse,
12};
13use crate::rpc::pb::mvccpb::Event as PbEvent;
14use crate::rpc::{KeyRange, KeyValue, ResponseHeader};
15use std::pin::Pin;
16use std::task::{Context, Poll};
17use tokio::sync::mpsc::{channel, Sender};
18use tokio_stream::{wrappers::ReceiverStream, Stream};
19use tonic::Streaming;
20
21/// Client for watch operations.
22#[repr(transparent)]
23#[derive(Clone)]
24pub struct WatchClient {
25    inner: PbWatchClient<InterceptedChannel>,
26}
27
28impl WatchClient {
29    /// Creates a watch client.
30    #[inline]
31    pub(crate) fn new(channel: InterceptedChannel) -> Self {
32        let inner = PbWatchClient::new(channel);
33        Self { inner }
34    }
35
36    /// Limits the maximum size of a decoded message.
37    ///
38    /// Default: `4MB`
39    pub fn max_decoding_message_size(mut self, limit: usize) -> Self {
40        self.inner = self.inner.max_decoding_message_size(limit);
41        self
42    }
43
44    /// Watches for events happening or that have happened. Both input and output
45    /// are streams; the input stream is for creating and canceling watchers and the output
46    /// stream receives resposnes and events.
47    ///
48    /// One watch stream can watch on multiple key ranges, streaming events for several watches
49    /// are grouped by watch ID. The entire event history can be watched starting from the
50    /// last compaction revision.
51    pub async fn watch(
52        &mut self,
53        key: impl Into<Vec<u8>>,
54        options: Option<WatchOptions>,
55    ) -> Result<WatchStream> {
56        let (request_sender, request_receiver) = channel::<WatchRequest>(100);
57        request_sender
58            .send(options.unwrap_or_default().with_key(key).into())
59            .await
60            .map_err(|e| Error::WatchError(e.to_string()))?;
61        let request_stream = ReceiverStream::new(request_receiver);
62        let response_stream = self.inner.watch(request_stream).await?.into_inner();
63        Ok(WatchStream::new(request_sender, response_stream))
64    }
65}
66
67/// Options for `Watch` operation.
68#[derive(Debug, Default, Clone)]
69pub struct WatchOptions {
70    req: WatchCreateRequest,
71    key_range: KeyRange,
72}
73
74impl WatchOptions {
75    /// Sets key.
76    #[inline]
77    fn with_key(mut self, key: impl Into<Vec<u8>>) -> Self {
78        self.key_range.with_key(key);
79        self
80    }
81
82    /// Creates a new `WatchOptions`.
83    #[inline]
84    pub const fn new() -> Self {
85        Self {
86            req: WatchCreateRequest {
87                key: Vec::new(),
88                range_end: Vec::new(),
89                start_revision: 0,
90                progress_notify: false,
91                filters: Vec::new(),
92                prev_kv: false,
93                watch_id: 0,
94                fragment: false,
95            },
96            key_range: KeyRange::new(),
97        }
98    }
99
100    /// Sets the end of the range [key, end) to watch. If `end` is not given,
101    /// only the key argument is watched. If `end` is equal to '\0', all keys greater than
102    /// or equal to the key argument are watched.
103    #[inline]
104    pub fn with_range(mut self, end: impl Into<Vec<u8>>) -> Self {
105        self.key_range.with_range(end);
106        self
107    }
108
109    /// Watches all keys >= key.
110    #[inline]
111    pub fn with_from_key(mut self) -> Self {
112        self.key_range.with_from_key();
113        self
114    }
115
116    /// Watches all keys prefixed with key.
117    #[inline]
118    pub fn with_prefix(mut self) -> Self {
119        self.key_range.with_prefix();
120        self
121    }
122
123    /// Watches all keys.
124    #[inline]
125    pub fn with_all_keys(mut self) -> Self {
126        self.key_range.with_all_keys();
127        self
128    }
129
130    /// Sets the revision to watch from (inclusive). No `start_revision` is "now".
131    #[inline]
132    pub const fn with_start_revision(mut self, revision: i64) -> Self {
133        self.req.start_revision = revision;
134        self
135    }
136
137    /// `progress_notify` is set so that the etcd server will periodically send a `WatchResponse` with
138    /// no events to the new watcher if there are no recent events. It is useful when clients
139    /// wish to recover a disconnected watcher starting from a recent known revision.
140    /// The etcd server may decide how often it will send notifications based on current load.
141    #[inline]
142    pub const fn with_progress_notify(mut self) -> Self {
143        self.req.progress_notify = true;
144        self
145    }
146
147    /// Filter the events at server side before it sends back to the watcher.
148    #[inline]
149    pub fn with_filters(mut self, filters: impl Into<Vec<WatchFilterType>>) -> Self {
150        self.req.filters = filters.into().into_iter().map(|f| f as i32).collect();
151        self
152    }
153
154    /// If `prev_kv` is set, created watcher gets the previous KV before the event happens.
155    /// If the previous KV is already compacted, nothing will be returned.
156    #[inline]
157    pub const fn with_prev_key(mut self) -> Self {
158        self.req.prev_kv = true;
159        self
160    }
161
162    /// If `watch_id` is provided and non-zero, it will be assigned to this watcher.
163    /// Since creating a watcher in etcd is not a synchronous operation,
164    /// this can be used ensure that ordering is correct when creating multiple
165    /// watchers on the same stream. Creating a watcher with an ID already in
166    /// use on the stream will cause an error to be returned.
167    #[inline]
168    pub const fn with_watch_id(mut self, watch_id: i64) -> Self {
169        self.req.watch_id = watch_id;
170        self
171    }
172
173    /// Enables splitting large revisions into multiple watch responses.
174    #[inline]
175    pub const fn with_fragment(mut self) -> Self {
176        self.req.fragment = true;
177        self
178    }
179}
180
181impl From<WatchOptions> for WatchCreateRequest {
182    #[inline]
183    fn from(mut options: WatchOptions) -> Self {
184        let (key, range_end) = options.key_range.build();
185        options.req.key = key;
186        options.req.range_end = range_end;
187        options.req
188    }
189}
190
191impl From<WatchOptions> for WatchRequest {
192    #[inline]
193    fn from(options: WatchOptions) -> Self {
194        Self {
195            request_union: Some(WatchRequestUnion::CreateRequest(options.into())),
196        }
197    }
198}
199
200impl From<WatchCancelRequest> for WatchRequest {
201    #[inline]
202    fn from(req: WatchCancelRequest) -> Self {
203        Self {
204            request_union: Some(WatchRequestUnion::CancelRequest(req)),
205        }
206    }
207}
208
209impl From<WatchProgressRequest> for WatchRequest {
210    #[inline]
211    fn from(req: WatchProgressRequest) -> Self {
212        Self {
213            request_union: Some(WatchRequestUnion::ProgressRequest(req)),
214        }
215    }
216}
217
218/// Watch filter type.
219#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)]
220#[repr(i32)]
221pub enum WatchFilterType {
222    /// Filter out put event.
223    NoPut = 0,
224    /// Filter out delete event.
225    NoDelete = 1,
226}
227
228/// Response for `Watch` operation.
229#[cfg_attr(feature = "pub-response-field", visible::StructFields(pub))]
230#[derive(Debug, Clone)]
231#[repr(transparent)]
232pub struct WatchResponse(PbWatchResponse);
233
234impl WatchResponse {
235    /// Creates a new `WatchResponse`.
236    #[inline]
237    const fn new(resp: PbWatchResponse) -> Self {
238        Self(resp)
239    }
240
241    /// Watch response header.
242    #[inline]
243    pub fn header(&self) -> Option<&ResponseHeader> {
244        self.0.header.as_ref().map(From::from)
245    }
246
247    /// Takes the header out of the response, leaving a [`None`] in its place.
248    #[inline]
249    pub fn take_header(&mut self) -> Option<ResponseHeader> {
250        self.0.header.take().map(ResponseHeader::new)
251    }
252
253    /// The ID of the watcher that corresponds to the response.
254    #[inline]
255    pub const fn watch_id(&self) -> i64 {
256        self.0.watch_id
257    }
258
259    /// created is set to true if the response is for a create watch request.
260    /// The client should record the watch_id and expect to receive events for
261    /// the created watcher from the same stream.
262    /// All events sent to the created watcher will attach with the same watch_id.
263    #[inline]
264    pub const fn created(&self) -> bool {
265        self.0.created
266    }
267
268    /// `canceled` is set to true if the response is for a cancel watch request.
269    /// No further events will be sent to the canceled watcher.
270    #[inline]
271    pub const fn canceled(&self) -> bool {
272        self.0.canceled
273    }
274
275    /// `compact_revision` is set to the minimum index if a watcher tries to watch
276    /// at a compacted index.
277    ///
278    /// This happens when creating a watcher at a compacted revision or the watcher cannot
279    /// catch up with the progress of the key-value store.
280    ///
281    /// The client should treat the watcher as canceled and should not try to create any
282    /// watcher with the same start_revision again.
283    #[inline]
284    pub const fn compact_revision(&self) -> i64 {
285        self.0.compact_revision
286    }
287
288    /// Indicates the reason for canceling the watcher.
289    #[inline]
290    pub fn cancel_reason(&self) -> &str {
291        &self.0.cancel_reason
292    }
293
294    /// Events happened on the watched keys.
295    #[inline]
296    pub fn events(&self) -> &[Event] {
297        unsafe { &*(self.0.events.as_slice() as *const _ as *const [Event]) }
298    }
299}
300
301/// Watching event.
302#[cfg_attr(feature = "pub-response-field", visible::StructFields(pub))]
303#[derive(Debug, Clone)]
304#[repr(transparent)]
305pub struct Event(PbEvent);
306
307impl Event {
308    /// The kind of event. If type is a `Put`, it indicates
309    /// new data has been stored to the key. If type is a `Delete`,
310    /// it indicates the key was deleted.
311    #[inline]
312    pub fn event_type(&self) -> EventType {
313        match self.0.r#type {
314            0 => EventType::Put,
315            1 => EventType::Delete,
316            i => panic!("unknown event {i}"),
317        }
318    }
319
320    /// The KeyValue for the event.
321    /// A `Put` event contains current kv pair.
322    /// A `Put` event with `kv.version()==1` indicates the creation of a key.
323    /// A `Delete` event contains the deleted key with
324    /// its modification revision set to the revision of deletion.
325    #[inline]
326    pub fn kv(&self) -> Option<&KeyValue> {
327        self.0.kv.as_ref().map(From::from)
328    }
329
330    /// The key-value pair before the event happens.
331    #[inline]
332    pub fn prev_kv(&self) -> Option<&KeyValue> {
333        self.0.prev_kv.as_ref().map(From::from)
334    }
335}
336
337/// The watching handle.
338#[cfg_attr(feature = "pub-response-field", visible::StructFields(pub))]
339#[derive(Debug)]
340pub struct WatchStream {
341    request_stream: Sender<WatchRequest>,
342    response_stream: Streaming<PbWatchResponse>,
343}
344
345impl WatchStream {
346    /// Creates a new `WatchStream`.
347    #[inline]
348    const fn new(
349        request_stream: Sender<WatchRequest>,
350        response_stream: Streaming<PbWatchResponse>,
351    ) -> Self {
352        Self {
353            request_stream,
354            response_stream,
355        }
356    }
357
358    /// Send watch request in the existing watch stream.
359    #[inline]
360    pub async fn watch(
361        &mut self,
362        key: impl Into<Vec<u8>>,
363        options: Option<WatchOptions>,
364    ) -> Result<()> {
365        self.request_stream
366            .send(options.unwrap_or_default().with_key(key).into())
367            .await
368            .map_err(|e| Error::WatchError(e.to_string()))
369    }
370
371    /// Cancels watch by specified `watch_id`.
372    #[inline]
373    pub async fn cancel(&mut self, watch_id: i64) -> Result<()> {
374        let req = WatchCancelRequest { watch_id };
375        self.request_stream
376            .send(req.into())
377            .await
378            .map_err(|e| Error::WatchError(e.to_string()))
379    }
380
381    /// Requests a watch stream progress status be sent in the watch response stream as soon as
382    /// possible.
383    #[inline]
384    pub async fn request_progress(&mut self) -> Result<()> {
385        let req = WatchProgressRequest {};
386        self.request_stream
387            .send(req.into())
388            .await
389            .map_err(|e| Error::WatchError(e.to_string()))
390    }
391
392    /// Receive [`WatchResponse`] from this watch stream.
393    #[inline]
394    pub async fn message(&mut self) -> Result<Option<WatchResponse>> {
395        match self.response_stream.message().await? {
396            Some(resp) => Ok(Some(WatchResponse::new(resp))),
397            None => Ok(None),
398        }
399    }
400
401    /// Splits the watch stream into a sender and a receiver.
402    pub fn split(self) -> (Sender<WatchRequest>, Streaming<PbWatchResponse>) {
403        (self.request_stream, self.response_stream)
404    }
405}
406
407impl Stream for WatchStream {
408    type Item = Result<WatchResponse>;
409
410    #[inline]
411    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
412        Pin::new(&mut self.get_mut().response_stream)
413            .poll_next(cx)
414            .map(|t| match t {
415                Some(Ok(resp)) => Some(Ok(WatchResponse::new(resp))),
416                Some(Err(e)) => Some(Err(From::from(e))),
417                None => None,
418            })
419    }
420}