etcd_rs/watch/
watch.rs

1use crate::proto::etcdserverpb;
2use crate::proto::etcdserverpb::watch_request::RequestUnion;
3use crate::{Event, KeyRange, ResponseHeader};
4
5#[derive(Debug)]
6pub struct WatchCreateRequest {
7    proto: crate::proto::etcdserverpb::WatchCreateRequest,
8}
9
10impl WatchCreateRequest {
11    /// Creates a new WatchRequest which will subscribe events of the specified key.
12    pub fn create(key_range: KeyRange) -> Self {
13        Self {
14            proto: etcdserverpb::WatchCreateRequest {
15                key: key_range.key,
16                range_end: key_range.range_end,
17                start_revision: 0,
18                progress_notify: false,
19                filters: vec![], // TODO support filters
20                prev_kv: false,
21                fragment: false, // TODO support fragment
22                watch_id: 0,     // TODO support watch_id
23            },
24        }
25    }
26
27    /// Sets the revision to watch from (inclusive). No start_revision is "now".
28    pub fn start_revision(mut self, revision: i64) -> Self {
29        self.proto.start_revision = revision;
30        self
31    }
32
33    pub fn progress_notify(mut self) -> Self {
34        self.proto.progress_notify = true;
35        self
36    }
37
38    /// Sets previous key value.
39    pub fn prev_kv(mut self) -> Self {
40        self.proto.prev_kv = true;
41        self
42    }
43}
44impl Into<etcdserverpb::WatchCreateRequest> for WatchCreateRequest {
45    fn into(self) -> etcdserverpb::WatchCreateRequest {
46        self.proto
47    }
48}
49
50impl Into<etcdserverpb::WatchRequest> for WatchCreateRequest {
51    fn into(self) -> etcdserverpb::WatchRequest {
52        etcdserverpb::WatchRequest {
53            request_union: Some(RequestUnion::CreateRequest(self.into())),
54        }
55    }
56}
57
58impl From<KeyRange> for WatchCreateRequest {
59    fn from(key_range: KeyRange) -> Self {
60        Self::create(key_range)
61    }
62}
63
64#[derive(Debug, Clone)]
65pub struct WatchCancelRequest {
66    proto: etcdserverpb::WatchCancelRequest,
67}
68
69impl WatchCancelRequest {
70    /// Creates a new WatchRequest which will unsubscribe the specified watch.
71    pub fn new(watch_id: i64) -> Self {
72        Self {
73            proto: etcdserverpb::WatchCancelRequest { watch_id },
74        }
75    }
76}
77
78impl From<i64> for WatchCancelRequest {
79    fn from(watch_id: i64) -> Self {
80        Self::new(watch_id)
81    }
82}
83
84impl Into<etcdserverpb::WatchCancelRequest> for WatchCancelRequest {
85    fn into(self) -> etcdserverpb::WatchCancelRequest {
86        self.proto
87    }
88}
89
90impl Into<etcdserverpb::WatchRequest> for WatchCancelRequest {
91    fn into(self) -> etcdserverpb::WatchRequest {
92        etcdserverpb::WatchRequest {
93            request_union: Some(RequestUnion::CancelRequest(self.into())),
94        }
95    }
96}
97
98#[derive(Debug, Clone)]
99pub struct WatchResponse {
100    pub header: ResponseHeader,
101    pub watch_id: i64,
102    pub created: bool,
103    pub canceled: bool,
104    pub events: Vec<Event>,
105}
106
107impl From<etcdserverpb::WatchResponse> for WatchResponse {
108    fn from(proto: etcdserverpb::WatchResponse) -> Self {
109        Self {
110            header: From::from(proto.header.expect("must fetch header")),
111            watch_id: proto.watch_id,
112            created: proto.created,
113            canceled: proto.canceled,
114            events: proto.events.into_iter().map(From::from).collect(),
115        }
116    }
117}