etcd_client/rpc/
watch.rs

1//! Etcd Watch RPC.
2
3pub use crate::rpc::pb::mvccpb::event::EventType;
4
5use crate::auth::AuthService;
6use crate::error::{Error, Result};
7use crate::intercept::InterceptedChannel;
8use crate::rpc::pb::etcdserverpb::watch_client::WatchClient as PbWatchClient;
9use crate::rpc::pb::etcdserverpb::watch_request::RequestUnion as WatchRequestUnion;
10use crate::rpc::pb::etcdserverpb::{
11    WatchCancelRequest, WatchCreateRequest, WatchProgressRequest, WatchRequest,
12    WatchResponse as PbWatchResponse,
13};
14use crate::rpc::pb::mvccpb::Event as PbEvent;
15use crate::rpc::{KeyRange, KeyValue, ResponseHeader};
16use http::HeaderValue;
17use std::pin::Pin;
18use std::sync::{Arc, RwLock};
19use std::task::{Context, Poll};
20use tokio::sync::mpsc::{channel, Sender};
21use tokio_stream::{wrappers::ReceiverStream, Stream};
22use tonic::Streaming;
23
24/// Client for watch operations.
25#[repr(transparent)]
26#[derive(Clone)]
27pub struct WatchClient {
28    inner: PbWatchClient<AuthService<InterceptedChannel>>,
29}
30
31impl WatchClient {
32    /// Creates a watch client.
33    #[inline]
34    pub(crate) fn new(
35        channel: InterceptedChannel,
36        auth_token: Arc<RwLock<Option<HeaderValue>>>,
37    ) -> Self {
38        let inner = PbWatchClient::new(AuthService::new(channel, auth_token));
39        Self { inner }
40    }
41
42    /// Limits the maximum size of a decoded message.
43    ///
44    /// Default: `4MB`
45    pub fn max_decoding_message_size(mut self, limit: usize) -> Self {
46        self.inner = self.inner.max_decoding_message_size(limit);
47        self
48    }
49
50    /// Watches for events happening or that have happened. Both input and output
51    /// are streams; the input stream is for creating and canceling watchers and the output
52    /// stream sends events. One watch RPC can watch on multiple key ranges, streaming events
53    /// for several watches at once. The entire event history can be watched starting from the
54    /// last compaction revision.
55    pub async fn watch(
56        &mut self,
57        key: impl Into<Vec<u8>>,
58        options: Option<WatchOptions>,
59    ) -> Result<(Watcher, WatchStream)> {
60        let (request_sender, request_receiver) = channel::<WatchRequest>(100);
61        let request_stream = ReceiverStream::new(request_receiver);
62
63        request_sender
64            .send(options.unwrap_or_default().with_key(key).into())
65            .await
66            .map_err(|e| Error::WatchError(e.to_string()))?;
67
68        let response_stream = self.inner.watch(request_stream).await?.into_inner();
69        let mut watch_stream = WatchStream::new(response_stream);
70
71        let watch_id = match watch_stream.message().await? {
72            Some(resp) => {
73                assert!(resp.created(), "not a create watch response");
74                resp.watch_id()
75            }
76            None => {
77                return Err(Error::WatchError("failed to create watch".to_string()));
78            }
79        };
80
81        Ok((Watcher::new(watch_id, request_sender), watch_stream))
82    }
83}
84
85/// Options for `Watch` operation.
86#[derive(Debug, Default, Clone)]
87pub struct WatchOptions {
88    req: WatchCreateRequest,
89    key_range: KeyRange,
90}
91
92impl WatchOptions {
93    /// Sets key.
94    #[inline]
95    fn with_key(mut self, key: impl Into<Vec<u8>>) -> Self {
96        self.key_range.with_key(key);
97        self
98    }
99
100    /// Creates a new `WatchOptions`.
101    #[inline]
102    pub const fn new() -> Self {
103        Self {
104            req: WatchCreateRequest {
105                key: Vec::new(),
106                range_end: Vec::new(),
107                start_revision: 0,
108                progress_notify: false,
109                filters: Vec::new(),
110                prev_kv: false,
111                watch_id: 0,
112                fragment: false,
113            },
114            key_range: KeyRange::new(),
115        }
116    }
117
118    /// Sets the end of the range [key, end) to watch. If `end` is not given,
119    /// only the key argument is watched. If `end` is equal to '\0', all keys greater than
120    /// or equal to the key argument are watched.
121    #[inline]
122    pub fn with_range(mut self, end: impl Into<Vec<u8>>) -> Self {
123        self.key_range.with_range(end);
124        self
125    }
126
127    /// Watches all keys >= key.
128    #[inline]
129    pub fn with_from_key(mut self) -> Self {
130        self.key_range.with_from_key();
131        self
132    }
133
134    /// Watches all keys prefixed with key.
135    #[inline]
136    pub fn with_prefix(mut self) -> Self {
137        self.key_range.with_prefix();
138        self
139    }
140
141    /// Watches all keys.
142    #[inline]
143    pub fn with_all_keys(mut self) -> Self {
144        self.key_range.with_all_keys();
145        self
146    }
147
148    /// Sets the revision to watch from (inclusive). No `start_revision` is "now".
149    #[inline]
150    pub const fn with_start_revision(mut self, revision: i64) -> Self {
151        self.req.start_revision = revision;
152        self
153    }
154
155    /// `progress_notify` is set so that the etcd server will periodically send a `WatchResponse` with
156    /// no events to the new watcher if there are no recent events. It is useful when clients
157    /// wish to recover a disconnected watcher starting from a recent known revision.
158    /// The etcd server may decide how often it will send notifications based on current load.
159    #[inline]
160    pub const fn with_progress_notify(mut self) -> Self {
161        self.req.progress_notify = true;
162        self
163    }
164
165    /// Filter the events at server side before it sends back to the watcher.
166    #[inline]
167    pub fn with_filters(mut self, filters: impl Into<Vec<WatchFilterType>>) -> Self {
168        self.req.filters = filters.into().into_iter().map(|f| f as i32).collect();
169        self
170    }
171
172    /// If `prev_kv` is set, created watcher gets the previous KV before the event happens.
173    /// If the previous KV is already compacted, nothing will be returned.
174    #[inline]
175    pub const fn with_prev_key(mut self) -> Self {
176        self.req.prev_kv = true;
177        self
178    }
179
180    /// If `watch_id` is provided and non-zero, it will be assigned to this watcher.
181    /// Since creating a watcher in etcd is not a synchronous operation,
182    /// this can be used ensure that ordering is correct when creating multiple
183    /// watchers on the same stream. Creating a watcher with an ID already in
184    /// use on the stream will cause an error to be returned.
185    #[inline]
186    pub const fn with_watch_id(mut self, watch_id: i64) -> Self {
187        self.req.watch_id = watch_id;
188        self
189    }
190
191    /// Enables splitting large revisions into multiple watch responses.
192    #[inline]
193    pub const fn with_fragment(mut self) -> Self {
194        self.req.fragment = true;
195        self
196    }
197}
198
199impl From<WatchOptions> for WatchCreateRequest {
200    #[inline]
201    fn from(mut options: WatchOptions) -> Self {
202        let (key, range_end) = options.key_range.build();
203        options.req.key = key;
204        options.req.range_end = range_end;
205        options.req
206    }
207}
208
209impl From<WatchOptions> for WatchRequest {
210    #[inline]
211    fn from(options: WatchOptions) -> Self {
212        Self {
213            request_union: Some(WatchRequestUnion::CreateRequest(options.into())),
214        }
215    }
216}
217
218impl From<WatchCancelRequest> for WatchRequest {
219    #[inline]
220    fn from(req: WatchCancelRequest) -> Self {
221        Self {
222            request_union: Some(WatchRequestUnion::CancelRequest(req)),
223        }
224    }
225}
226
227impl From<WatchProgressRequest> for WatchRequest {
228    #[inline]
229    fn from(req: WatchProgressRequest) -> Self {
230        Self {
231            request_union: Some(WatchRequestUnion::ProgressRequest(req)),
232        }
233    }
234}
235
236/// Watch filter type.
237#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)]
238#[repr(i32)]
239pub enum WatchFilterType {
240    /// Filter out put event.
241    NoPut = 0,
242    /// Filter out delete event.
243    NoDelete = 1,
244}
245
246/// Response for `Watch` operation.
247#[cfg_attr(feature = "pub-response-field", visible::StructFields(pub))]
248#[derive(Debug, Clone)]
249#[repr(transparent)]
250pub struct WatchResponse(PbWatchResponse);
251
252impl WatchResponse {
253    /// Creates a new `WatchResponse`.
254    #[inline]
255    const fn new(resp: PbWatchResponse) -> Self {
256        Self(resp)
257    }
258
259    /// Watch response header.
260    #[inline]
261    pub fn header(&self) -> Option<&ResponseHeader> {
262        self.0.header.as_ref().map(From::from)
263    }
264
265    /// Takes the header out of the response, leaving a [`None`] in its place.
266    #[inline]
267    pub fn take_header(&mut self) -> Option<ResponseHeader> {
268        self.0.header.take().map(ResponseHeader::new)
269    }
270
271    /// The ID of the watcher that corresponds to the response.
272    #[inline]
273    pub const fn watch_id(&self) -> i64 {
274        self.0.watch_id
275    }
276
277    /// created is set to true if the response is for a create watch request.
278    /// The client should record the watch_id and expect to receive events for
279    /// the created watcher from the same stream.
280    /// All events sent to the created watcher will attach with the same watch_id.
281    #[inline]
282    pub const fn created(&self) -> bool {
283        self.0.created
284    }
285
286    /// `canceled` is set to true if the response is for a cancel watch request.
287    /// No further events will be sent to the canceled watcher.
288    #[inline]
289    pub const fn canceled(&self) -> bool {
290        self.0.canceled
291    }
292
293    /// `compact_revision` is set to the minimum index if a watcher tries to watch
294    /// at a compacted index.
295    ///
296    /// This happens when creating a watcher at a compacted revision or the watcher cannot
297    /// catch up with the progress of the key-value store.
298    ///
299    /// The client should treat the watcher as canceled and should not try to create any
300    /// watcher with the same start_revision again.
301    #[inline]
302    pub const fn compact_revision(&self) -> i64 {
303        self.0.compact_revision
304    }
305
306    /// Indicates the reason for canceling the watcher.
307    #[inline]
308    pub fn cancel_reason(&self) -> &str {
309        &self.0.cancel_reason
310    }
311
312    /// Events happened on the watched keys.
313    #[inline]
314    pub fn events(&self) -> &[Event] {
315        unsafe { &*(self.0.events.as_slice() as *const _ as *const [Event]) }
316    }
317}
318
319/// Watching event.
320#[cfg_attr(feature = "pub-response-field", visible::StructFields(pub))]
321#[derive(Debug, Clone)]
322#[repr(transparent)]
323pub struct Event(PbEvent);
324
325impl Event {
326    /// The kind of event. If type is a `Put`, it indicates
327    /// new data has been stored to the key. If type is a `Delete`,
328    /// it indicates the key was deleted.
329    #[inline]
330    pub fn event_type(&self) -> EventType {
331        match self.0.r#type {
332            0 => EventType::Put,
333            1 => EventType::Delete,
334            i => panic!("unknown event {i}"),
335        }
336    }
337
338    /// The KeyValue for the event.
339    /// A `Put` event contains current kv pair.
340    /// A `Put` event with `kv.version()==1` indicates the creation of a key.
341    /// A `Delete` event contains the deleted key with
342    /// its modification revision set to the revision of deletion.
343    #[inline]
344    pub fn kv(&self) -> Option<&KeyValue> {
345        self.0.kv.as_ref().map(From::from)
346    }
347
348    /// The key-value pair before the event happens.
349    #[inline]
350    pub fn prev_kv(&self) -> Option<&KeyValue> {
351        self.0.prev_kv.as_ref().map(From::from)
352    }
353}
354
355/// The watching handle.
356#[cfg_attr(feature = "pub-response-field", visible::StructFields(pub))]
357#[derive(Debug)]
358pub struct Watcher {
359    watch_id: i64,
360    sender: Sender<WatchRequest>,
361}
362
363impl Watcher {
364    /// Creates a new `Watcher`.
365    #[inline]
366    const fn new(watch_id: i64, sender: Sender<WatchRequest>) -> Self {
367        Self { watch_id, sender }
368    }
369
370    /// The ID of the watcher.
371    #[inline]
372    pub const fn watch_id(&self) -> i64 {
373        self.watch_id
374    }
375
376    /// Watches for events happening or that have happened.
377    #[inline]
378    pub async fn watch(
379        &mut self,
380        key: impl Into<Vec<u8>>,
381        options: Option<WatchOptions>,
382    ) -> Result<()> {
383        self.sender
384            .send(options.unwrap_or_default().with_key(key).into())
385            .await
386            .map_err(|e| Error::WatchError(e.to_string()))
387    }
388
389    /// Cancels this watcher.
390    #[inline]
391    pub async fn cancel(&mut self) -> Result<()> {
392        let req = WatchCancelRequest {
393            watch_id: self.watch_id,
394        };
395        self.sender
396            .send(req.into())
397            .await
398            .map_err(|e| Error::WatchError(e.to_string()))
399    }
400
401    /// Cancels watch by specified `watch_id`.
402    #[inline]
403    pub async fn cancel_by_id(&mut self, watch_id: i64) -> Result<()> {
404        let req = WatchCancelRequest { watch_id };
405        self.sender
406            .send(req.into())
407            .await
408            .map_err(|e| Error::WatchError(e.to_string()))
409    }
410
411    /// Requests a watch stream progress status be sent in the watch response stream as soon as
412    /// possible.
413    #[inline]
414    pub async fn request_progress(&mut self) -> Result<()> {
415        let req = WatchProgressRequest {};
416        self.sender
417            .send(req.into())
418            .await
419            .map_err(|e| Error::WatchError(e.to_string()))
420    }
421}
422
423/// The watch response stream.
424#[cfg_attr(feature = "pub-response-field", visible::StructFields(pub))]
425#[derive(Debug)]
426pub struct WatchStream {
427    stream: Streaming<PbWatchResponse>,
428}
429
430impl WatchStream {
431    /// Creates a new `WatchStream`.
432    #[inline]
433    const fn new(stream: Streaming<PbWatchResponse>) -> Self {
434        Self { stream }
435    }
436
437    /// Fetch the next message from this stream.
438    #[inline]
439    pub async fn message(&mut self) -> Result<Option<WatchResponse>> {
440        match self.stream.message().await? {
441            Some(resp) => Ok(Some(WatchResponse::new(resp))),
442            None => Ok(None),
443        }
444    }
445}
446
447impl Stream for WatchStream {
448    type Item = Result<WatchResponse>;
449
450    #[inline]
451    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
452        Pin::new(&mut self.get_mut().stream)
453            .poll_next(cx)
454            .map(|t| match t {
455                Some(Ok(resp)) => Some(Ok(WatchResponse::new(resp))),
456                Some(Err(e)) => Some(Err(From::from(e))),
457                None => None,
458            })
459    }
460}