1pub use crate::rpc::pb::mvccpb::event::EventType;
4
5use crate::auth::AuthService;
6use crate::error::{Error, Result};
7use crate::intercept::InterceptedChannel;
8use crate::rpc::pb::etcdserverpb::watch_client::WatchClient as PbWatchClient;
9use crate::rpc::pb::etcdserverpb::watch_request::RequestUnion as WatchRequestUnion;
10use crate::rpc::pb::etcdserverpb::{
11 WatchCancelRequest, WatchCreateRequest, WatchProgressRequest, WatchRequest,
12 WatchResponse as PbWatchResponse,
13};
14use crate::rpc::pb::mvccpb::Event as PbEvent;
15use crate::rpc::{KeyRange, KeyValue, ResponseHeader};
16use http::HeaderValue;
17use std::pin::Pin;
18use std::sync::{Arc, RwLock};
19use std::task::{Context, Poll};
20use tokio::sync::mpsc::{channel, Sender};
21use tokio_stream::{wrappers::ReceiverStream, Stream};
22use tonic::Streaming;
23
24#[repr(transparent)]
26#[derive(Clone)]
27pub struct WatchClient {
28 inner: PbWatchClient<AuthService<InterceptedChannel>>,
29}
30
31impl WatchClient {
32 #[inline]
34 pub(crate) fn new(
35 channel: InterceptedChannel,
36 auth_token: Arc<RwLock<Option<HeaderValue>>>,
37 ) -> Self {
38 let inner = PbWatchClient::new(AuthService::new(channel, auth_token));
39 Self { inner }
40 }
41
42 pub fn max_decoding_message_size(mut self, limit: usize) -> Self {
46 self.inner = self.inner.max_decoding_message_size(limit);
47 self
48 }
49
50 pub async fn watch(
56 &mut self,
57 key: impl Into<Vec<u8>>,
58 options: Option<WatchOptions>,
59 ) -> Result<(Watcher, WatchStream)> {
60 let (request_sender, request_receiver) = channel::<WatchRequest>(100);
61 let request_stream = ReceiverStream::new(request_receiver);
62
63 request_sender
64 .send(options.unwrap_or_default().with_key(key).into())
65 .await
66 .map_err(|e| Error::WatchError(e.to_string()))?;
67
68 let response_stream = self.inner.watch(request_stream).await?.into_inner();
69 let mut watch_stream = WatchStream::new(response_stream);
70
71 let watch_id = match watch_stream.message().await? {
72 Some(resp) => {
73 assert!(resp.created(), "not a create watch response");
74 resp.watch_id()
75 }
76 None => {
77 return Err(Error::WatchError("failed to create watch".to_string()));
78 }
79 };
80
81 Ok((Watcher::new(watch_id, request_sender), watch_stream))
82 }
83}
84
85#[derive(Debug, Default, Clone)]
87pub struct WatchOptions {
88 req: WatchCreateRequest,
89 key_range: KeyRange,
90}
91
92impl WatchOptions {
93 #[inline]
95 fn with_key(mut self, key: impl Into<Vec<u8>>) -> Self {
96 self.key_range.with_key(key);
97 self
98 }
99
100 #[inline]
102 pub const fn new() -> Self {
103 Self {
104 req: WatchCreateRequest {
105 key: Vec::new(),
106 range_end: Vec::new(),
107 start_revision: 0,
108 progress_notify: false,
109 filters: Vec::new(),
110 prev_kv: false,
111 watch_id: 0,
112 fragment: false,
113 },
114 key_range: KeyRange::new(),
115 }
116 }
117
118 #[inline]
122 pub fn with_range(mut self, end: impl Into<Vec<u8>>) -> Self {
123 self.key_range.with_range(end);
124 self
125 }
126
127 #[inline]
129 pub fn with_from_key(mut self) -> Self {
130 self.key_range.with_from_key();
131 self
132 }
133
134 #[inline]
136 pub fn with_prefix(mut self) -> Self {
137 self.key_range.with_prefix();
138 self
139 }
140
141 #[inline]
143 pub fn with_all_keys(mut self) -> Self {
144 self.key_range.with_all_keys();
145 self
146 }
147
148 #[inline]
150 pub const fn with_start_revision(mut self, revision: i64) -> Self {
151 self.req.start_revision = revision;
152 self
153 }
154
155 #[inline]
160 pub const fn with_progress_notify(mut self) -> Self {
161 self.req.progress_notify = true;
162 self
163 }
164
165 #[inline]
167 pub fn with_filters(mut self, filters: impl Into<Vec<WatchFilterType>>) -> Self {
168 self.req.filters = filters.into().into_iter().map(|f| f as i32).collect();
169 self
170 }
171
172 #[inline]
175 pub const fn with_prev_key(mut self) -> Self {
176 self.req.prev_kv = true;
177 self
178 }
179
180 #[inline]
186 pub const fn with_watch_id(mut self, watch_id: i64) -> Self {
187 self.req.watch_id = watch_id;
188 self
189 }
190
191 #[inline]
193 pub const fn with_fragment(mut self) -> Self {
194 self.req.fragment = true;
195 self
196 }
197}
198
199impl From<WatchOptions> for WatchCreateRequest {
200 #[inline]
201 fn from(mut options: WatchOptions) -> Self {
202 let (key, range_end) = options.key_range.build();
203 options.req.key = key;
204 options.req.range_end = range_end;
205 options.req
206 }
207}
208
209impl From<WatchOptions> for WatchRequest {
210 #[inline]
211 fn from(options: WatchOptions) -> Self {
212 Self {
213 request_union: Some(WatchRequestUnion::CreateRequest(options.into())),
214 }
215 }
216}
217
218impl From<WatchCancelRequest> for WatchRequest {
219 #[inline]
220 fn from(req: WatchCancelRequest) -> Self {
221 Self {
222 request_union: Some(WatchRequestUnion::CancelRequest(req)),
223 }
224 }
225}
226
227impl From<WatchProgressRequest> for WatchRequest {
228 #[inline]
229 fn from(req: WatchProgressRequest) -> Self {
230 Self {
231 request_union: Some(WatchRequestUnion::ProgressRequest(req)),
232 }
233 }
234}
235
236#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)]
238#[repr(i32)]
239pub enum WatchFilterType {
240 NoPut = 0,
242 NoDelete = 1,
244}
245
246#[cfg_attr(feature = "pub-response-field", visible::StructFields(pub))]
248#[derive(Debug, Clone)]
249#[repr(transparent)]
250pub struct WatchResponse(PbWatchResponse);
251
252impl WatchResponse {
253 #[inline]
255 const fn new(resp: PbWatchResponse) -> Self {
256 Self(resp)
257 }
258
259 #[inline]
261 pub fn header(&self) -> Option<&ResponseHeader> {
262 self.0.header.as_ref().map(From::from)
263 }
264
265 #[inline]
267 pub fn take_header(&mut self) -> Option<ResponseHeader> {
268 self.0.header.take().map(ResponseHeader::new)
269 }
270
271 #[inline]
273 pub const fn watch_id(&self) -> i64 {
274 self.0.watch_id
275 }
276
277 #[inline]
282 pub const fn created(&self) -> bool {
283 self.0.created
284 }
285
286 #[inline]
289 pub const fn canceled(&self) -> bool {
290 self.0.canceled
291 }
292
293 #[inline]
302 pub const fn compact_revision(&self) -> i64 {
303 self.0.compact_revision
304 }
305
306 #[inline]
308 pub fn cancel_reason(&self) -> &str {
309 &self.0.cancel_reason
310 }
311
312 #[inline]
314 pub fn events(&self) -> &[Event] {
315 unsafe { &*(self.0.events.as_slice() as *const _ as *const [Event]) }
316 }
317}
318
319#[cfg_attr(feature = "pub-response-field", visible::StructFields(pub))]
321#[derive(Debug, Clone)]
322#[repr(transparent)]
323pub struct Event(PbEvent);
324
325impl Event {
326 #[inline]
330 pub fn event_type(&self) -> EventType {
331 match self.0.r#type {
332 0 => EventType::Put,
333 1 => EventType::Delete,
334 i => panic!("unknown event {i}"),
335 }
336 }
337
338 #[inline]
344 pub fn kv(&self) -> Option<&KeyValue> {
345 self.0.kv.as_ref().map(From::from)
346 }
347
348 #[inline]
350 pub fn prev_kv(&self) -> Option<&KeyValue> {
351 self.0.prev_kv.as_ref().map(From::from)
352 }
353}
354
355#[cfg_attr(feature = "pub-response-field", visible::StructFields(pub))]
357#[derive(Debug)]
358pub struct Watcher {
359 watch_id: i64,
360 sender: Sender<WatchRequest>,
361}
362
363impl Watcher {
364 #[inline]
366 const fn new(watch_id: i64, sender: Sender<WatchRequest>) -> Self {
367 Self { watch_id, sender }
368 }
369
370 #[inline]
372 pub const fn watch_id(&self) -> i64 {
373 self.watch_id
374 }
375
376 #[inline]
378 pub async fn watch(
379 &mut self,
380 key: impl Into<Vec<u8>>,
381 options: Option<WatchOptions>,
382 ) -> Result<()> {
383 self.sender
384 .send(options.unwrap_or_default().with_key(key).into())
385 .await
386 .map_err(|e| Error::WatchError(e.to_string()))
387 }
388
389 #[inline]
391 pub async fn cancel(&mut self) -> Result<()> {
392 let req = WatchCancelRequest {
393 watch_id: self.watch_id,
394 };
395 self.sender
396 .send(req.into())
397 .await
398 .map_err(|e| Error::WatchError(e.to_string()))
399 }
400
401 #[inline]
403 pub async fn cancel_by_id(&mut self, watch_id: i64) -> Result<()> {
404 let req = WatchCancelRequest { watch_id };
405 self.sender
406 .send(req.into())
407 .await
408 .map_err(|e| Error::WatchError(e.to_string()))
409 }
410
411 #[inline]
414 pub async fn request_progress(&mut self) -> Result<()> {
415 let req = WatchProgressRequest {};
416 self.sender
417 .send(req.into())
418 .await
419 .map_err(|e| Error::WatchError(e.to_string()))
420 }
421}
422
423#[cfg_attr(feature = "pub-response-field", visible::StructFields(pub))]
425#[derive(Debug)]
426pub struct WatchStream {
427 stream: Streaming<PbWatchResponse>,
428}
429
430impl WatchStream {
431 #[inline]
433 const fn new(stream: Streaming<PbWatchResponse>) -> Self {
434 Self { stream }
435 }
436
437 #[inline]
439 pub async fn message(&mut self) -> Result<Option<WatchResponse>> {
440 match self.stream.message().await? {
441 Some(resp) => Ok(Some(WatchResponse::new(resp))),
442 None => Ok(None),
443 }
444 }
445}
446
447impl Stream for WatchStream {
448 type Item = Result<WatchResponse>;
449
450 #[inline]
451 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
452 Pin::new(&mut self.get_mut().stream)
453 .poll_next(cx)
454 .map(|t| match t {
455 Some(Ok(resp)) => Some(Ok(WatchResponse::new(resp))),
456 Some(Err(e)) => Some(Err(From::from(e))),
457 None => None,
458 })
459 }
460}