1pub use crate::rpc::pb::mvccpb::event::EventType;
4
5use crate::auth::AuthService;
6use crate::channel::Channel;
7use crate::error::{Error, Result};
8use crate::rpc::pb::etcdserverpb::watch_client::WatchClient as PbWatchClient;
9use crate::rpc::pb::etcdserverpb::watch_request::RequestUnion as WatchRequestUnion;
10use crate::rpc::pb::etcdserverpb::{
11 WatchCancelRequest, WatchCreateRequest, WatchProgressRequest, WatchRequest,
12 WatchResponse as PbWatchResponse,
13};
14use crate::rpc::pb::mvccpb::Event as PbEvent;
15use crate::rpc::{KeyRange, KeyValue, ResponseHeader};
16use http::HeaderValue;
17use std::pin::Pin;
18use std::sync::{Arc, RwLock};
19use std::task::{Context, Poll};
20use tokio::sync::mpsc::{channel, Sender};
21use tokio_stream::{wrappers::ReceiverStream, Stream};
22use tonic::Streaming;
23
24#[repr(transparent)]
26#[derive(Clone)]
27pub struct WatchClient {
28 inner: PbWatchClient<AuthService<Channel>>,
29}
30
31impl WatchClient {
32 #[inline]
34 pub(crate) fn new(channel: Channel, auth_token: Arc<RwLock<Option<HeaderValue>>>) -> Self {
35 let inner = PbWatchClient::new(AuthService::new(channel, auth_token));
36 Self { inner }
37 }
38
39 pub fn max_decoding_message_size(mut self, limit: usize) -> Self {
43 self.inner = self.inner.max_decoding_message_size(limit);
44 self
45 }
46
47 pub async fn watch(
53 &mut self,
54 key: impl Into<Vec<u8>>,
55 options: Option<WatchOptions>,
56 ) -> Result<(Watcher, WatchStream)> {
57 let (request_sender, request_receiver) = channel::<WatchRequest>(100);
58 let request_stream = ReceiverStream::new(request_receiver);
59
60 request_sender
61 .send(options.unwrap_or_default().with_key(key).into())
62 .await
63 .map_err(|e| Error::WatchError(e.to_string()))?;
64
65 let response_stream = self.inner.watch(request_stream).await?.into_inner();
66 let mut watch_stream = WatchStream::new(response_stream);
67
68 let watch_id = match watch_stream.message().await? {
69 Some(resp) => {
70 assert!(resp.created(), "not a create watch response");
71 resp.watch_id()
72 }
73 None => {
74 return Err(Error::WatchError("failed to create watch".to_string()));
75 }
76 };
77
78 Ok((Watcher::new(watch_id, request_sender), watch_stream))
79 }
80}
81
82#[derive(Debug, Default, Clone)]
84pub struct WatchOptions {
85 req: WatchCreateRequest,
86 key_range: KeyRange,
87}
88
89impl WatchOptions {
90 #[inline]
92 fn with_key(mut self, key: impl Into<Vec<u8>>) -> Self {
93 self.key_range.with_key(key);
94 self
95 }
96
97 #[inline]
99 pub const fn new() -> Self {
100 Self {
101 req: WatchCreateRequest {
102 key: Vec::new(),
103 range_end: Vec::new(),
104 start_revision: 0,
105 progress_notify: false,
106 filters: Vec::new(),
107 prev_kv: false,
108 watch_id: 0,
109 fragment: false,
110 },
111 key_range: KeyRange::new(),
112 }
113 }
114
115 #[inline]
119 pub fn with_range(mut self, end: impl Into<Vec<u8>>) -> Self {
120 self.key_range.with_range(end);
121 self
122 }
123
124 #[inline]
126 pub fn with_from_key(mut self) -> Self {
127 self.key_range.with_from_key();
128 self
129 }
130
131 #[inline]
133 pub fn with_prefix(mut self) -> Self {
134 self.key_range.with_prefix();
135 self
136 }
137
138 #[inline]
140 pub fn with_all_keys(mut self) -> Self {
141 self.key_range.with_all_keys();
142 self
143 }
144
145 #[inline]
147 pub const fn with_start_revision(mut self, revision: i64) -> Self {
148 self.req.start_revision = revision;
149 self
150 }
151
152 #[inline]
157 pub const fn with_progress_notify(mut self) -> Self {
158 self.req.progress_notify = true;
159 self
160 }
161
162 #[inline]
164 pub fn with_filters(mut self, filters: impl Into<Vec<WatchFilterType>>) -> Self {
165 self.req.filters = filters.into().into_iter().map(|f| f as i32).collect();
166 self
167 }
168
169 #[inline]
172 pub const fn with_prev_key(mut self) -> Self {
173 self.req.prev_kv = true;
174 self
175 }
176
177 #[inline]
183 pub const fn with_watch_id(mut self, watch_id: i64) -> Self {
184 self.req.watch_id = watch_id;
185 self
186 }
187
188 #[inline]
190 pub const fn with_fragment(mut self) -> Self {
191 self.req.fragment = true;
192 self
193 }
194}
195
196impl From<WatchOptions> for WatchCreateRequest {
197 #[inline]
198 fn from(mut options: WatchOptions) -> Self {
199 let (key, range_end) = options.key_range.build();
200 options.req.key = key;
201 options.req.range_end = range_end;
202 options.req
203 }
204}
205
206impl From<WatchOptions> for WatchRequest {
207 #[inline]
208 fn from(options: WatchOptions) -> Self {
209 Self {
210 request_union: Some(WatchRequestUnion::CreateRequest(options.into())),
211 }
212 }
213}
214
215impl From<WatchCancelRequest> for WatchRequest {
216 #[inline]
217 fn from(req: WatchCancelRequest) -> Self {
218 Self {
219 request_union: Some(WatchRequestUnion::CancelRequest(req)),
220 }
221 }
222}
223
224impl From<WatchProgressRequest> for WatchRequest {
225 #[inline]
226 fn from(req: WatchProgressRequest) -> Self {
227 Self {
228 request_union: Some(WatchRequestUnion::ProgressRequest(req)),
229 }
230 }
231}
232
233#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)]
235#[repr(i32)]
236pub enum WatchFilterType {
237 NoPut = 0,
239 NoDelete = 1,
241}
242
243#[cfg_attr(feature = "pub-response-field", visible::StructFields(pub))]
245#[derive(Debug, Clone)]
246#[repr(transparent)]
247pub struct WatchResponse(PbWatchResponse);
248
249impl WatchResponse {
250 #[inline]
252 const fn new(resp: PbWatchResponse) -> Self {
253 Self(resp)
254 }
255
256 #[inline]
258 pub fn header(&self) -> Option<&ResponseHeader> {
259 self.0.header.as_ref().map(From::from)
260 }
261
262 #[inline]
264 pub fn take_header(&mut self) -> Option<ResponseHeader> {
265 self.0.header.take().map(ResponseHeader::new)
266 }
267
268 #[inline]
270 pub const fn watch_id(&self) -> i64 {
271 self.0.watch_id
272 }
273
274 #[inline]
279 pub const fn created(&self) -> bool {
280 self.0.created
281 }
282
283 #[inline]
286 pub const fn canceled(&self) -> bool {
287 self.0.canceled
288 }
289
290 #[inline]
299 pub const fn compact_revision(&self) -> i64 {
300 self.0.compact_revision
301 }
302
303 #[inline]
305 pub fn cancel_reason(&self) -> &str {
306 &self.0.cancel_reason
307 }
308
309 #[inline]
311 pub fn events(&self) -> &[Event] {
312 unsafe { &*(self.0.events.as_slice() as *const _ as *const [Event]) }
313 }
314}
315
316#[cfg_attr(feature = "pub-response-field", visible::StructFields(pub))]
318#[derive(Debug, Clone)]
319#[repr(transparent)]
320pub struct Event(PbEvent);
321
322impl Event {
323 #[inline]
327 pub fn event_type(&self) -> EventType {
328 match self.0.r#type {
329 0 => EventType::Put,
330 1 => EventType::Delete,
331 i => panic!("unknown event {}", i),
332 }
333 }
334
335 #[inline]
341 pub fn kv(&self) -> Option<&KeyValue> {
342 self.0.kv.as_ref().map(From::from)
343 }
344
345 #[inline]
347 pub fn prev_kv(&self) -> Option<&KeyValue> {
348 self.0.prev_kv.as_ref().map(From::from)
349 }
350}
351
352#[cfg_attr(feature = "pub-response-field", visible::StructFields(pub))]
354#[derive(Debug)]
355pub struct Watcher {
356 watch_id: i64,
357 sender: Sender<WatchRequest>,
358}
359
360impl Watcher {
361 #[inline]
363 const fn new(watch_id: i64, sender: Sender<WatchRequest>) -> Self {
364 Self { watch_id, sender }
365 }
366
367 #[inline]
369 pub const fn watch_id(&self) -> i64 {
370 self.watch_id
371 }
372
373 #[inline]
375 pub async fn watch(
376 &mut self,
377 key: impl Into<Vec<u8>>,
378 options: Option<WatchOptions>,
379 ) -> Result<()> {
380 self.sender
381 .send(options.unwrap_or_default().with_key(key).into())
382 .await
383 .map_err(|e| Error::WatchError(e.to_string()))
384 }
385
386 #[inline]
388 pub async fn cancel(&mut self) -> Result<()> {
389 let req = WatchCancelRequest {
390 watch_id: self.watch_id,
391 };
392 self.sender
393 .send(req.into())
394 .await
395 .map_err(|e| Error::WatchError(e.to_string()))
396 }
397
398 #[inline]
400 pub async fn cancel_by_id(&mut self, watch_id: i64) -> Result<()> {
401 let req = WatchCancelRequest { watch_id };
402 self.sender
403 .send(req.into())
404 .await
405 .map_err(|e| Error::WatchError(e.to_string()))
406 }
407
408 #[inline]
411 pub async fn request_progress(&mut self) -> Result<()> {
412 let req = WatchProgressRequest {};
413 self.sender
414 .send(req.into())
415 .await
416 .map_err(|e| Error::WatchError(e.to_string()))
417 }
418}
419
420#[cfg_attr(feature = "pub-response-field", visible::StructFields(pub))]
422#[derive(Debug)]
423pub struct WatchStream {
424 stream: Streaming<PbWatchResponse>,
425}
426
427impl WatchStream {
428 #[inline]
430 const fn new(stream: Streaming<PbWatchResponse>) -> Self {
431 Self { stream }
432 }
433
434 #[inline]
436 pub async fn message(&mut self) -> Result<Option<WatchResponse>> {
437 match self.stream.message().await? {
438 Some(resp) => Ok(Some(WatchResponse::new(resp))),
439 None => Ok(None),
440 }
441 }
442}
443
444impl Stream for WatchStream {
445 type Item = Result<WatchResponse>;
446
447 #[inline]
448 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
449 Pin::new(&mut self.get_mut().stream)
450 .poll_next(cx)
451 .map(|t| match t {
452 Some(Ok(resp)) => Some(Ok(WatchResponse::new(resp))),
453 Some(Err(e)) => Some(Err(From::from(e))),
454 None => None,
455 })
456 }
457}