1mod watch;
2
3pub use watch::{WatchCancelRequest, WatchCreateRequest, WatchResponse};
4
5use std::pin::Pin;
6use std::task::{Context, Poll};
7
8use async_trait::async_trait;
9use futures::Stream;
10use tokio::sync::mpsc::Sender;
11use tonic::Streaming;
12
13use crate::proto::etcdserverpb;
14use crate::proto::mvccpb;
15use crate::{Error, KeyValue, Result};
16
17#[async_trait]
18pub trait WatchOp {
19 async fn watch<R>(&self, req: R) -> Result<(WatchStream, WatchCanceler)>
20 where
21 R: Into<WatchCreateRequest> + Send;
22}
23
24#[derive(Debug)]
25pub enum WatchInbound {
26 Ready(WatchResponse),
27 Interrupted(Error),
28 Closed,
29}
30
31pub struct WatchStream {
32 stream: Streaming<etcdserverpb::WatchResponse>,
33 is_closed: bool,
34}
35
36impl WatchStream {
37 pub(crate) fn new(stream: Streaming<etcdserverpb::WatchResponse>) -> Self {
38 Self {
39 stream,
40 is_closed: false,
41 }
42 }
43
44 pub async fn inbound(&mut self) -> WatchInbound {
45 if self.is_closed {
46 return WatchInbound::Closed;
47 }
48
49 match self.stream.message().await {
50 Ok(Some(resp)) => {
51 if resp.canceled {
52 self.is_closed = true;
53 }
54
55 if resp.canceled && resp.events.is_empty() {
56 WatchInbound::Closed
57 } else {
58 WatchInbound::Ready(resp.into())
59 }
60 }
61 Ok(None) => WatchInbound::Interrupted(Error::WatchEventExhausted),
62 Err(e) => WatchInbound::Interrupted(e.into()),
63 }
64 }
65}
66
67impl Stream for WatchStream {
68 type Item = WatchInbound;
69
70 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
71 Pin::new(&mut self.get_mut().stream)
72 .poll_next(cx)
73 .map(|e| match e {
74 Some(Ok(resp)) => Some(WatchInbound::Ready(resp.into())),
75 Some(Err(e)) => Some(WatchInbound::Interrupted(e.into())),
76 None => Some(WatchInbound::Closed),
77 })
78 }
79}
80
81pub struct WatchCanceler {
82 watch_id: i64,
83 tx: Sender<etcdserverpb::WatchRequest>,
84}
85
86impl WatchCanceler {
87 pub(crate) fn new(watch_id: i64, tx: Sender<etcdserverpb::WatchRequest>) -> Self {
88 Self { watch_id, tx }
89 }
90
91 pub async fn cancel(self) -> Result<()> {
92 self.tx
93 .send(WatchCancelRequest::new(self.watch_id).into())
94 .await
95 .map_err(|e| Error::WatchChannelSend(e))
96 }
97}
98
99#[derive(Debug, PartialEq, Clone)]
100pub enum EventType {
101 Put,
102 Delete,
103}
104
105impl From<mvccpb::event::EventType> for EventType {
106 fn from(event_type: mvccpb::event::EventType) -> Self {
107 use mvccpb::event::EventType;
108 match event_type {
109 EventType::Put => Self::Put,
110 EventType::Delete => Self::Delete,
111 }
112 }
113}
114
115#[derive(Debug, Clone)]
116pub struct Event {
117 pub event_type: EventType,
118 pub kv: KeyValue,
119}
120
121impl From<mvccpb::Event> for Event {
122 fn from(proto: mvccpb::Event) -> Self {
123 Self {
124 event_type: match proto.r#type {
125 0 => EventType::Put,
126 _ => EventType::Delete,
127 },
128 kv: From::from(proto.kv.expect("must fetch kv")),
129 }
130 }
131}