rust_etcd_utils/
watcher.rs1use {
2 super::{retry::retry_etcd_legacy, Revision},
3 crate::retry::is_transient,
4 etcd_client::{EventType, WatchClient, WatchFilterType, WatchOptions},
5 retry::delay::Exponential,
6 serde::de::DeserializeOwned,
7 tokio::sync::{broadcast, mpsc},
8 tokio_stream::StreamExt,
9 tracing::{error, info, warn},
10};
11
12pub enum WatchEvent<V> {
18 Put {
19 key: Vec<u8>,
20 value: V,
21 revision: Revision,
22 },
23 Delete {
24 key: Vec<u8>,
25 prev_value: Option<V>,
26 revision: Revision,
27 },
28}
29
30#[async_trait::async_trait]
40pub trait WatchClientExt {
41 fn get_watch_client(&self) -> WatchClient;
42
43 fn json_watch_channel<V>(
53 &self,
54 key: impl Into<Vec<u8>>,
55 watch_options: Option<WatchOptions>,
56 ) -> mpsc::Receiver<WatchEvent<V>>
57 where
58 V: DeserializeOwned + Send + 'static,
59 {
60 let wc = self.get_watch_client();
61 let (tx, rx) = tokio::sync::mpsc::channel(10);
62 let key: Vec<u8> = key.into();
63 tokio::spawn(async move {
64 let wopts_prototype = watch_options.unwrap_or_default().with_prev_key();
65 let mut last_revision = None; 'outer: loop {
67 let mut wopts = wopts_prototype.clone();
68 if let Some(rev) = last_revision {
69 wopts = wopts.with_start_revision(rev);
70 }
71 let wc2 = wc.clone();
72 let key2 = key.clone();
73 let retry_strategy = Exponential::from_millis_with_factor(10, 10.0).take(3);
74
75 let (mut watcher, mut stream) = retry_etcd_legacy(retry_strategy, move || {
76 let mut wc = wc2.clone();
77 let key = key2.clone();
78 let wopts = wopts.clone();
79 async move { wc.watch(key.clone(), Some(wopts)).await }
80 })
81 .await
82 .expect("watch retry failed");
83
84 'inner: while let Some(watch_resp) = stream.next().await {
85 match watch_resp {
86 Ok(watch_resp) => {
87 if watch_resp.canceled() {
88 error!("watch cancelled: {watch_resp:?}");
90 break 'outer;
91 }
92 for event in watch_resp.events() {
93 let watch_event = match event.event_type() {
94 EventType::Put => {
95 let kv = event.kv().expect("put event with no kv");
96 let key = Vec::from(kv.key());
97 let value = serde_json::from_slice::<V>(kv.value())
98 .expect("failed to deserialize controller state");
99 last_revision.replace(kv.mod_revision());
100 WatchEvent::Put {
101 key,
102 value,
103 revision: kv.mod_revision(),
104 }
105 }
106 EventType::Delete => {
107 let kv = event.kv().expect("delete event with no kv");
108 let prev_value = event
109 .prev_kv()
110 .map(|prev_kv| prev_kv.value())
111 .map(|prev_v| serde_json::from_slice::<V>(prev_v))
112 .transpose()
113 .expect("failed to deserialize prev controller state");
114 let key = Vec::from(kv.key());
115 last_revision.replace(kv.mod_revision());
116 WatchEvent::Delete {
117 key,
118 prev_value,
119 revision: kv.mod_revision(),
120 }
121 }
122 };
123 if tx.send(watch_event).await.is_err() {
124 warn!("closed watch event receiver");
125 break 'outer;
126 }
127 }
128 }
129 Err(e) => {
130 error!("watch stream error: {:?}", e);
131 break 'inner;
132 }
133 }
134 }
135 let _ = watcher.cancel().await;
136 }
137 });
138 rx
139 }
140
141 fn json_put_watch_channel<T>(
142 &self,
143 key: impl Into<Vec<u8>>,
144 watch_options: Option<WatchOptions>,
145 ) -> mpsc::Receiver<(Revision, T)>
146 where
147 T: DeserializeOwned + Send + 'static,
148 {
149 let wc = self.get_watch_client();
150 let (tx, rx) = tokio::sync::mpsc::channel(10);
151 let key: Vec<u8> = key.into();
152 tokio::spawn(async move {
153 let mut last_revision = None; let wopts_prototype = watch_options
155 .unwrap_or_default()
156 .with_filters(vec![WatchFilterType::NoDelete]);
157 'outer: loop {
158 let mut wopts = wopts_prototype.clone();
159 if let Some(rev) = last_revision {
160 wopts = wopts.with_start_revision(rev);
161 }
162
163 let retry_strategy = Exponential::from_millis_with_factor(10, 10.0).take(3);
164 let wc2 = wc.clone();
165 let key2 = key.clone();
166 let (mut watcher, mut stream) = retry_etcd_legacy(retry_strategy, move || {
167 let mut wc = wc2.clone();
168 let key = key2.clone();
169 let wopts = wopts.clone();
170 async move { wc.watch(key.clone(), Some(wopts)).await }
171 })
172 .await
173 .expect("watch retry failed");
174
175 'inner: while let Some(watch_resp) = stream.next().await {
176 match watch_resp {
177 Ok(watch_resp) => {
178 let max_kv = watch_resp
179 .events()
180 .iter()
181 .filter_map(|ev| ev.kv())
182 .max_by_key(|kv| kv.mod_revision());
183 if let Some(kv) = max_kv {
184 let revision = kv.mod_revision();
185 last_revision.replace(revision);
186
187 let state = serde_json::from_slice::<T>(kv.value())
188 .expect("failed to deserialize kv value");
189 if tx.send((revision, state)).await.is_err() {
190 let key_str = String::from_utf8(key).expect("key is not utf8");
191 warn!("json watch channel closed its receiving half for {key_str}");
192 break 'outer;
193 }
194 } else if watch_resp.canceled() {
195 error!("watch cancelled: {watch_resp:?}");
197 break 'outer;
198 }
199 }
200 Err(e) => {
201 error!("watch stream error: {:?}", e);
202 break 'inner;
203 }
204 }
205 }
206 let _ = watcher.cancel().await;
207 }
208 });
209 rx
210 }
211
212 fn watch_lock_key_change(
216 &self,
217 key: impl Into<Vec<u8>>,
218 key_mod_revision: Revision,
219 ) -> broadcast::Sender<Revision> {
220 let wc = self.get_watch_client();
221 let key: Vec<u8> = key.into();
222
223 let (tx, _) = broadcast::channel(1);
224
225 let tx2 = tx.clone();
226 tokio::spawn(async move {
227 let tx = tx2;
228 'outer: loop {
229 let key2 = key.clone();
230 let key = key.clone();
231 let wopts = WatchOptions::new().with_start_revision(key_mod_revision);
232 let retry_strategy = Exponential::from_millis_with_factor(10, 10.0).take(3);
233 let wc2 = wc.clone();
234 let (mut watcher, mut stream) = retry_etcd_legacy(retry_strategy, move || {
235 let mut wc = wc2.clone();
236 let key = key2.clone();
237 let wopts = wopts.clone();
238 async move { wc.watch(key.clone(), Some(wopts)).await }
239 })
240 .await
241 .expect("watch retry failed");
242
243 'inner: while let Some(result) = stream.next().await {
244 match result {
245 Ok(watch_resp) => {
246 if watch_resp.canceled() {
247 error!("watch cancelled: {watch_resp:?}");
249 break 'inner;
250 }
251 for event in watch_resp.events() {
252 match event.event_type() {
253 EventType::Put => {
254 let kv = event.kv().expect("put event with no kv");
255 if kv.key() == key {
256 continue;
257 }
258 let revision = kv.mod_revision();
259 if revision <= key_mod_revision {
260 continue 'inner;
261 }
262 info!("watcher detected put event on key {key:?} with revision {revision} > {key_mod_revision}");
263 let _ = tx.send(revision);
264 let _ = watcher.cancel().await;
265 break 'outer;
266 }
267 EventType::Delete => {
268 let kv = event.kv().expect("delete event with no kv");
269 let revision = kv.mod_revision();
270 if revision < key_mod_revision {
271 continue;
272 }
273
274 if kv.key() == key {
275 let key_label = String::from_utf8_lossy(&key);
276 info!("watcher detected delete event on key {key_label:?} with revision {revision} >= {key_mod_revision}");
277 let _ = tx.send(revision);
278 let _ = watcher.cancel().await;
279 break 'outer;
280 }
281 }
282 }
283 }
284 }
285 Err(e) => {
286 if !is_transient(&e) {
287 tracing::error!("watch stream error: {e}");
288 break 'outer;
289 }
290 }
291 }
292 }
293 let _ = watcher.cancel().await;
294 }
295 });
296 tx
297 }
298}
299
300impl WatchClientExt for WatchClient {
301 fn get_watch_client(&self) -> WatchClient {
302 self.clone()
303 }
304}