etcd_rs/watch/
mod.rs

1//! The Watch API provides an event-based interface for asynchronously monitoring changes to keys.
2
3mod watch;
4
5pub use watch::{WatchCancelRequest, WatchCreateRequest, WatchResponse};
6
7use std::pin::Pin;
8use std::task::{Context, Poll};
9
10use async_trait::async_trait;
11use futures::Stream;
12use tokio::sync::mpsc::Sender;
13use tonic::Streaming;
14
15use crate::proto::etcdserverpb;
16use crate::proto::mvccpb;
17use crate::{Error, KeyValue, Result};
18
19#[async_trait]
20pub trait WatchOp {
21    async fn watch<R>(&self, req: R) -> Result<(WatchStream, WatchCanceler)>
22    where
23        R: Into<WatchCreateRequest> + Send;
24
25    // async fn cancel_watch<R>(&self, req: R) -> Result<()>
26    // where
27    //     R: Into<WatchCancelRequest> + Send;
28}
29
30#[derive(Debug)]
31pub enum WatchInbound {
32    Ready(WatchResponse),
33    Interrupted(Error),
34    Closed,
35}
36
37pub struct WatchStream {
38    stream: Streaming<etcdserverpb::WatchResponse>,
39    is_closed: bool,
40}
41
42impl WatchStream {
43    pub(crate) fn new(stream: Streaming<etcdserverpb::WatchResponse>) -> Self {
44        Self {
45            stream,
46            is_closed: false,
47        }
48    }
49
50    pub async fn inbound(&mut self) -> WatchInbound {
51        if self.is_closed {
52            return WatchInbound::Closed;
53        }
54
55        match self.stream.message().await {
56            Ok(Some(resp)) => {
57                if resp.canceled {
58                    self.is_closed = true;
59                }
60
61                if resp.canceled && resp.events.is_empty() {
62                    WatchInbound::Closed
63                } else {
64                    WatchInbound::Ready(resp.into())
65                }
66            }
67            Ok(None) => WatchInbound::Interrupted(Error::WatchEventExhausted),
68            Err(e) => WatchInbound::Interrupted(e.into()),
69        }
70    }
71}
72
73impl Stream for WatchStream {
74    type Item = WatchInbound;
75
76    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
77        Pin::new(&mut self.get_mut().stream)
78            .poll_next(cx)
79            .map(|e| match e {
80                Some(Ok(resp)) => Some(WatchInbound::Ready(resp.into())),
81                Some(Err(e)) => Some(WatchInbound::Interrupted(e.into())),
82                None => Some(WatchInbound::Closed),
83            })
84    }
85}
86
87pub struct WatchCanceler {
88    watch_id: i64,
89    tx: Sender<etcdserverpb::WatchRequest>,
90}
91
92impl WatchCanceler {
93    pub(crate) fn new(watch_id: i64, tx: Sender<etcdserverpb::WatchRequest>) -> Self {
94        Self { watch_id, tx }
95    }
96
97    pub async fn cancel(self) -> Result<()> {
98        self.tx
99            .send(WatchCancelRequest::new(self.watch_id).into())
100            .await
101            .map_err(|e| Error::WatchChannelSend(e))
102    }
103}
104
105/// The kind of event.
106#[derive(Debug, PartialEq, Clone)]
107pub enum EventType {
108    Put,
109    Delete,
110}
111
112impl From<mvccpb::event::EventType> for EventType {
113    fn from(event_type: mvccpb::event::EventType) -> Self {
114        use mvccpb::event::EventType;
115        match event_type {
116            EventType::Put => Self::Put,
117            EventType::Delete => Self::Delete,
118        }
119    }
120}
121
122/// Every change to every key is represented with Event messages.
123#[derive(Debug, Clone)]
124pub struct Event {
125    pub event_type: EventType,
126    pub kv: KeyValue,
127    pub prev_kv: Option<KeyValue>,
128}
129
130impl From<mvccpb::Event> for Event {
131    fn from(proto: mvccpb::Event) -> Self {
132        Self {
133            event_type: match proto.r#type {
134                0 => EventType::Put,
135                _ => EventType::Delete, // FIXME: assert valid event type
136            },
137            kv: From::from(proto.kv.expect("must fetch kv")),
138            prev_kv: proto.prev_kv.map(KeyValue::from),
139        }
140    }
141}