ya_etcd_rs/watch/
watch.rs

1use crate::proto::etcdserverpb;
2use crate::proto::etcdserverpb::watch_request::RequestUnion;
3use crate::{Event, KeyRange, ResponseHeader};
4
5#[derive(Debug)]
6pub struct WatchCreateRequest {
7    proto: crate::proto::etcdserverpb::WatchCreateRequest,
8}
9
10impl WatchCreateRequest {
11    /// Creates a new WatchRequest which will subscribe events of the specified key.
12    pub fn create(key_range: KeyRange) -> Self {
13        Self {
14            proto: etcdserverpb::WatchCreateRequest {
15                key: key_range.key,
16                range_end: key_range.range_end,
17                start_revision: 0,
18                progress_notify: false,
19                filters: vec![], // TODO support filters
20                prev_kv: false,
21                fragment: false, // TODO support fragment
22                watch_id: 0,     // TODO support watch_id
23            },
24        }
25    }
26
27    /// Sets the revision to watch from (inclusive). No start_revision is "now".
28    pub fn start_revision(mut self, revision: i64) -> Self {
29        self.proto.start_revision = revision;
30        self
31    }
32
33    pub fn progress_notify(mut self) -> Self {
34        self.proto.progress_notify = true;
35        self
36    }
37
38    /// Sets previous key value.
39    pub fn prev_kv(mut self) -> Self {
40        self.proto.prev_kv = true;
41        self
42    }
43}
44
45impl From<WatchCreateRequest> for etcdserverpb::WatchCreateRequest {
46    fn from(value: WatchCreateRequest) -> Self {
47        value.proto
48    }
49}
50
51impl From<WatchCreateRequest> for etcdserverpb::WatchRequest {
52    fn from(value: WatchCreateRequest) -> Self {
53        etcdserverpb::WatchRequest {
54            request_union: Some(RequestUnion::CreateRequest(value.into())),
55        }
56    }
57}
58
59impl From<KeyRange> for WatchCreateRequest {
60    fn from(key_range: KeyRange) -> Self {
61        Self::create(key_range)
62    }
63}
64
65#[derive(Debug, Clone)]
66pub struct WatchCancelRequest {
67    proto: etcdserverpb::WatchCancelRequest,
68}
69
70impl WatchCancelRequest {
71    /// Creates a new WatchRequest which will unsubscribe the specified watch.
72    pub fn new(watch_id: i64) -> Self {
73        Self {
74            proto: etcdserverpb::WatchCancelRequest { watch_id },
75        }
76    }
77}
78
79impl From<i64> for WatchCancelRequest {
80    fn from(watch_id: i64) -> Self {
81        Self::new(watch_id)
82    }
83}
84
85impl From<WatchCancelRequest> for etcdserverpb::WatchCancelRequest {
86    fn from(value: WatchCancelRequest) -> Self {
87        value.proto
88    }
89}
90
91impl From<WatchCancelRequest> for etcdserverpb::WatchRequest {
92    fn from(value: WatchCancelRequest) -> Self {
93        etcdserverpb::WatchRequest {
94            request_union: Some(RequestUnion::CancelRequest(value.into())),
95        }
96    }
97}
98
99#[derive(Debug, Clone)]
100pub struct WatchResponse {
101    pub header: ResponseHeader,
102    pub watch_id: i64,
103    pub created: bool,
104    pub canceled: bool,
105    pub events: Vec<Event>,
106}
107
108impl From<etcdserverpb::WatchResponse> for WatchResponse {
109    fn from(proto: etcdserverpb::WatchResponse) -> Self {
110        Self {
111            header: From::from(proto.header.expect("must fetch header")),
112            watch_id: proto.watch_id,
113            created: proto.created,
114            canceled: proto.canceled,
115            events: proto.events.into_iter().map(From::from).collect(),
116        }
117    }
118}