rust_etcd_utils/
watcher.rs

1use {
2    super::{retry::retry_etcd_legacy, Revision},
3    crate::retry::is_transient,
4    etcd_client::{EventType, WatchClient, WatchFilterType, WatchOptions},
5    retry::delay::Exponential,
6    serde::de::DeserializeOwned,
7    tokio::sync::{broadcast, mpsc},
8    tokio_stream::StreamExt,
9    tracing::{error, info, warn},
10};
11
12///
13/// Custom types for watch events.
14///
15/// Unwrap the etcd watch event to a more user-friendly event.
16///
17pub enum WatchEvent<V> {
18    Put {
19        key: Vec<u8>,
20        value: V,
21        revision: Revision,
22    },
23    Delete {
24        key: Vec<u8>,
25        prev_value: Option<V>,
26        revision: Revision,
27    },
28}
29
30///
31/// Extension trait for [`WatchClient`].
32///
33/// This trait provides utility methods for working with [`WatchClient`].
34///
35/// This extension trait provides rust channel of watch stream and more reliability in case of transient errors.
36///
37/// On transient errors, the watch stream will be retried and resume where you left off.
38///
39#[async_trait::async_trait]
40pub trait WatchClientExt {
41    fn get_watch_client(&self) -> WatchClient;
42
43    ///
44    /// Creates a channel that watches for changes to a key in etcd.
45    ///
46    /// The channel will send a [`WatchEvent`] for each change to the key.
47    /// The channel will be retried on transient errors.
48    ///
49    /// The channel will be closed if the watch is cancelled or if the stream is closed.
50    ///
51    /// The watch expect value to be JSON encoded.
52    fn json_watch_channel<V>(
53        &self,
54        key: impl Into<Vec<u8>>,
55        watch_options: Option<WatchOptions>,
56    ) -> mpsc::Receiver<WatchEvent<V>>
57    where
58        V: DeserializeOwned + Send + 'static,
59    {
60        let wc = self.get_watch_client();
61        let (tx, rx) = tokio::sync::mpsc::channel(10);
62        let key: Vec<u8> = key.into();
63        tokio::spawn(async move {
64            let wopts_prototype = watch_options.unwrap_or_default().with_prev_key();
65            let mut last_revision = None; // 0 = latest revision
66            'outer: loop {
67                let mut wopts = wopts_prototype.clone();
68                if let Some(rev) = last_revision {
69                    wopts = wopts.with_start_revision(rev);
70                }
71                let wc2 = wc.clone();
72                let key2 = key.clone();
73                let retry_strategy = Exponential::from_millis_with_factor(10, 10.0).take(3);
74
75                let (mut watcher, mut stream) = retry_etcd_legacy(retry_strategy, move || {
76                    let mut wc = wc2.clone();
77                    let key = key2.clone();
78                    let wopts = wopts.clone();
79                    async move { wc.watch(key.clone(), Some(wopts)).await }
80                })
81                .await
82                .expect("watch retry failed");
83
84                'inner: while let Some(watch_resp) = stream.next().await {
85                    match watch_resp {
86                        Ok(watch_resp) => {
87                            if watch_resp.canceled() {
88                                // This is probably because the compaction_revision < initial revision
89                                error!("watch cancelled: {watch_resp:?}");
90                                break 'outer;
91                            }
92                            for event in watch_resp.events() {
93                                let watch_event = match event.event_type() {
94                                    EventType::Put => {
95                                        let kv = event.kv().expect("put event with no kv");
96                                        let key = Vec::from(kv.key());
97                                        let value = serde_json::from_slice::<V>(kv.value())
98                                            .expect("failed to deserialize controller state");
99                                        last_revision.replace(kv.mod_revision());
100                                        WatchEvent::Put {
101                                            key,
102                                            value,
103                                            revision: kv.mod_revision(),
104                                        }
105                                    }
106                                    EventType::Delete => {
107                                        let kv = event.kv().expect("delete event with no kv");
108                                        let prev_value = event
109                                            .prev_kv()
110                                            .map(|prev_kv| prev_kv.value())
111                                            .map(|prev_v| serde_json::from_slice::<V>(prev_v))
112                                            .transpose()
113                                            .expect("failed to deserialize prev controller state");
114                                        let key = Vec::from(kv.key());
115                                        last_revision.replace(kv.mod_revision());
116                                        WatchEvent::Delete {
117                                            key,
118                                            prev_value,
119                                            revision: kv.mod_revision(),
120                                        }
121                                    }
122                                };
123                                if tx.send(watch_event).await.is_err() {
124                                    warn!("closed watch event receiver");
125                                    break 'outer;
126                                }
127                            }
128                        }
129                        Err(e) => {
130                            error!("watch stream error: {:?}", e);
131                            break 'inner;
132                        }
133                    }
134                }
135                let _ = watcher.cancel().await;
136            }
137        });
138        rx
139    }
140
141    fn json_put_watch_channel<T>(
142        &self,
143        key: impl Into<Vec<u8>>,
144        watch_options: Option<WatchOptions>,
145    ) -> mpsc::Receiver<(Revision, T)>
146    where
147        T: DeserializeOwned + Send + 'static,
148    {
149        let wc = self.get_watch_client();
150        let (tx, rx) = tokio::sync::mpsc::channel(10);
151        let key: Vec<u8> = key.into();
152        tokio::spawn(async move {
153            let mut last_revision = None; // 0 = latest revision
154            let wopts_prototype = watch_options
155                .unwrap_or_default()
156                .with_filters(vec![WatchFilterType::NoDelete]);
157            'outer: loop {
158                let mut wopts = wopts_prototype.clone();
159                if let Some(rev) = last_revision {
160                    wopts = wopts.with_start_revision(rev);
161                }
162
163                let retry_strategy = Exponential::from_millis_with_factor(10, 10.0).take(3);
164                let wc2 = wc.clone();
165                let key2 = key.clone();
166                let (mut watcher, mut stream) = retry_etcd_legacy(retry_strategy, move || {
167                    let mut wc = wc2.clone();
168                    let key = key2.clone();
169                    let wopts = wopts.clone();
170                    async move { wc.watch(key.clone(), Some(wopts)).await }
171                })
172                .await
173                .expect("watch retry failed");
174
175                'inner: while let Some(watch_resp) = stream.next().await {
176                    match watch_resp {
177                        Ok(watch_resp) => {
178                            let max_kv = watch_resp
179                                .events()
180                                .iter()
181                                .filter_map(|ev| ev.kv())
182                                .max_by_key(|kv| kv.mod_revision());
183                            if let Some(kv) = max_kv {
184                                let revision = kv.mod_revision();
185                                last_revision.replace(revision);
186
187                                let state = serde_json::from_slice::<T>(kv.value())
188                                    .expect("failed to deserialize kv value");
189                                if tx.send((revision, state)).await.is_err() {
190                                    let key_str = String::from_utf8(key).expect("key is not utf8");
191                                    warn!("json watch channel closed its receiving half for {key_str}");
192                                    break 'outer;
193                                }
194                            } else if watch_resp.canceled() {
195                                // This is probably because the compaction_revision < initial revision
196                                error!("watch cancelled: {watch_resp:?}");
197                                break 'outer;
198                            }
199                        }
200                        Err(e) => {
201                            error!("watch stream error: {:?}", e);
202                            break 'inner;
203                        }
204                    }
205                }
206                let _ = watcher.cancel().await;
207            }
208        });
209        rx
210    }
211
212    ///
213    /// Creates a broadcast channel that watches for a lock key that gets deleted.
214    ///
215    fn watch_lock_key_change(
216        &self,
217        key: impl Into<Vec<u8>>,
218        key_mod_revision: Revision,
219    ) -> broadcast::Sender<Revision> {
220        let wc = self.get_watch_client();
221        let key: Vec<u8> = key.into();
222
223        let (tx, _) = broadcast::channel(1);
224
225        let tx2 = tx.clone();
226        tokio::spawn(async move {
227            let tx = tx2;
228            'outer: loop {
229                let key2 = key.clone();
230                let key = key.clone();
231                let wopts = WatchOptions::new().with_start_revision(key_mod_revision);
232                let retry_strategy = Exponential::from_millis_with_factor(10, 10.0).take(3);
233                let wc2 = wc.clone();
234                let (mut watcher, mut stream) = retry_etcd_legacy(retry_strategy, move || {
235                    let mut wc = wc2.clone();
236                    let key = key2.clone();
237                    let wopts = wopts.clone();
238                    async move { wc.watch(key.clone(), Some(wopts)).await }
239                })
240                .await
241                .expect("watch retry failed");
242
243                'inner: while let Some(result) = stream.next().await {
244                    match result {
245                        Ok(watch_resp) => {
246                            if watch_resp.canceled() {
247                                // This is probably because the compaction_revision < initial revision
248                                error!("watch cancelled: {watch_resp:?}");
249                                break 'inner;
250                            }
251                            for event in watch_resp.events() {
252                                match event.event_type() {
253                                    EventType::Put => {
254                                        let kv = event.kv().expect("put event with no kv");
255                                        if kv.key() == key {
256                                            continue;
257                                        }
258                                        let revision = kv.mod_revision();
259                                        if revision <= key_mod_revision {
260                                            continue 'inner;
261                                        }
262                                        info!("watcher detected put event on key {key:?} with revision {revision} > {key_mod_revision}");
263                                        let _ = tx.send(revision);
264                                        let _ = watcher.cancel().await;
265                                        break 'outer;
266                                    }
267                                    EventType::Delete => {
268                                        let kv = event.kv().expect("delete event with no kv");
269                                        let revision = kv.mod_revision();
270                                        if revision < key_mod_revision {
271                                            continue;
272                                        }
273
274                                        if kv.key() == key {
275                                            let key_label = String::from_utf8_lossy(&key);
276                                            info!("watcher detected delete event on key {key_label:?} with revision {revision} >= {key_mod_revision}");
277                                            let _ = tx.send(revision);
278                                            let _ = watcher.cancel().await;
279                                            break 'outer;
280                                        }
281                                    }
282                                }
283                            }
284                        }
285                        Err(e) => {
286                            if !is_transient(&e) {
287                                tracing::error!("watch stream error: {e}");
288                                break 'outer;
289                            }
290                        }
291                    }
292                }
293                let _ = watcher.cancel().await;
294            }
295        });
296        tx
297    }
298}
299
300impl WatchClientExt for WatchClient {
301    fn get_watch_client(&self) -> WatchClient {
302        self.clone()
303    }
304}