1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
//! Caching mechanisms.
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::mpsc::{unbounded_channel, UnboundedSender};
use tokio::sync::Mutex;
use tracing::*;

use crate::listeners::ListenerSet;
use crate::paths::make_path;
use crate::{
    Stat, Subscription, WatchedEvent, WatchedEventType, ZkError, ZkResult, ZkState, ZooKeeper,
    ZooKeeperExt,
};

/// Data contents of a znode and its associated `Stat`, shared behind an `Arc` so that cache
/// snapshots and events can hand out the same value without copying the payload.
pub type ChildData = Arc<(Vec<u8>, Stat)>;

/// Data for all known children of a znode.
///
/// Keys are the child paths as built by `make_path(parent, child_name)`; values are the most
/// recently fetched contents and `Stat` of that child.
pub type Data = HashMap<String, ChildData>;

/// Events delivered to listeners registered with `PathChildrenCache::add_listener`.
#[derive(Debug, Clone)]
pub enum PathChildrenCacheEvent {
    /// Emitted after the initial listing requested by `start` has been attempted (whether or not
    /// it succeeded); carries a snapshot of the cache contents at that moment.
    Initialized(Data),
    /// NOTE(review): not emitted by any code visible in this file.
    ConnectionSuspended,
    /// NOTE(review): not emitted by any code visible in this file.
    ConnectionLost,
    /// NOTE(review): not emitted by any code visible in this file.
    ConnectionReconnected,
    /// The watched child with the given path was deleted and has been removed from the cache.
    ChildRemoved(String),
    /// Data for the child with the given path was fetched while listing children and has been
    /// stored in the cache.
    ChildAdded(String, ChildData),
    /// Data for the child with the given path was re-fetched after a data-change notification
    /// and the cache entry was replaced.
    ChildUpdated(String, ChildData),
}

/// Controls which children have their data fetched when the child list is refreshed.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
enum RefreshMode {
    /// Fetch data only for children that are not yet present in the cache.
    Standard,
    /// Fetch data and `Stat` for every listed child, including ones already cached.
    ForceGetDataAndStat,
}

/// Work items processed one at a time by the worker task spawned in `PathChildrenCache::start`.
// NOTE(review): `Shutdown` is never constructed in the code visible in this file, which is
// presumably why the dead-code lint is silenced — confirm.
#[allow(dead_code)]
#[derive(Debug)]
enum Operation {
    /// List the children with forced data fetches, then notify listeners with `Initialized`.
    Initialize,
    /// Stop the worker loop.
    Shutdown,
    /// List the children again using the given mode.
    Refresh(RefreshMode),
    /// Forward the event to the registered listeners.
    Event(PathChildrenCacheEvent),
    /// Re-fetch the data of the child at the given path and publish `ChildUpdated`.
    GetData(String /* path */),
    /// A ZooKeeper session state change; `Connected` triggers a forced refresh.
    ZkStateEvent(ZkState),
}

/// A [Path Cache](https://curator.apache.org/curator-recipes/path-cache.html) is used to watch a
/// znode.
///
/// A utility that attempts to keep all data from all children of a ZK path locally cached. This
/// will watch the ZK path; whenever a child is added, updated or removed, the Path Cache will
/// change its state to contain the current set of children, the children's data and the children's
/// state. You can register a listener that will get notified when changes occur.
///
/// # Note
/// It is not possible to stay transactionally in sync. Users of this type must be prepared for
/// false-positives and false-negatives. Additionally, always use the version number when updating
/// data to avoid overwriting another process's change.
pub struct PathChildrenCache {
    /// Path of the znode whose children are cached.
    path: Arc<String>,
    /// Client used for all reads and watches.
    zk: Arc<ZooKeeper>,
    /// Cached child data, shared with the worker task and the watcher callbacks.
    data: Arc<Mutex<Data>>,
    /// Sender side of the worker's operation queue; `None` until `start` has been called.
    channel: Option<UnboundedSender<Operation>>,
    /// Subscription returned by `ZooKeeper::add_listener` in `start`; `None` until then.
    listener_subscription: Option<Subscription>,
    /// Listeners notified of `PathChildrenCacheEvent`s.
    event_listeners: ListenerSet<PathChildrenCacheEvent>,
}

impl PathChildrenCache {
    /// Create a new cache instance watching `path`. If `path` does not exist, it will be created
    /// (see `ZooKeeperExt::ensure_path`).
    ///
    /// # Note
    /// After creating the instance, you *must* call `start`.
    pub async fn new(zk: Arc<ZooKeeper>, path: &str) -> ZkResult<PathChildrenCache> {
        let data = Arc::new(Mutex::new(HashMap::new()));

        zk.ensure_path(path).await?;

        Ok(PathChildrenCache {
            path: Arc::new(path.to_owned()),
            zk,
            data,
            channel: None,
            listener_subscription: None,
            event_listeners: ListenerSet::new(),
        })
    }

    async fn get_children(
        zk: Arc<ZooKeeper>,
        path: &str,
        data: Arc<Mutex<Data>>,
        ops_chan: UnboundedSender<Operation>,
        mode: RefreshMode,
    ) -> ZkResult<()> {
        let ops_chan1 = ops_chan.clone();

        let watcher = move |event: WatchedEvent| {
            match event.event_type {
                WatchedEventType::NodeChildrenChanged => {
                    let _path = event.path.as_ref().expect("Path absent");

                    // Subscribe to new changes recursively
                    if let Err(err) = ops_chan1.send(Operation::Refresh(RefreshMode::Standard)) {
                        warn!("error sending Refresh operation to ops channel: {:?}", err);
                    }
                }
                _ => error!("Unexpected: {:?}", event),
            };
        };

        let children = zk.get_children_w(path, watcher).await?;

        let mut data_locked = data.lock().await;

        for child in &children {
            let child_path = make_path(path, child);

            if mode == RefreshMode::ForceGetDataAndStat || !data_locked.contains_key(&child_path) {
                let child_data = Arc::new(
                    Self::get_data(zk.clone(), &child_path, data.clone(), ops_chan.clone()).await?,
                );

                data_locked.insert(child_path.clone(), child_data.clone());

                ops_chan
                    .send(Operation::Event(PathChildrenCacheEvent::ChildAdded(
                        child_path, child_data,
                    )))
                    .map_err(|err| {
                        info!("error sending ChildAdded event: {:?}", err);
                        ZkError::APIError
                    })?;
            }
        }

        trace!("New data: {:?}", *data_locked);

        Ok(())
    }

    /// Read the data and `Stat` of the znode at `path`, installing a data watcher on it.
    ///
    /// When the watcher fires it spawns a task that takes the cache lock and then:
    /// * on `NodeDeleted`, removes `path` from `data` and queues a `ChildRemoved` event;
    /// * on `NodeDataChanged`, queues `Operation::GetData` so the worker re-reads the node;
    /// * on any other event type, logs an error.
    ///
    /// The fetched value is returned to the caller; this function does not store it in `data`.
    async fn get_data(
        zk: Arc<ZooKeeper>,
        path: &str,
        data: Arc<Mutex<Data>>,
        ops_chan: UnboundedSender<Operation>,
    ) -> ZkResult<(Vec<u8>, Stat)> {
        let watched_path = path.to_owned();

        let on_node_event = move |event: WatchedEvent| {
            // Each invocation hands its own copies of the shared handles to the spawned task.
            let cache = data.clone();
            let ops = ops_chan.clone();
            let node = watched_path.clone();

            tokio::spawn(async move {
                let mut entries = cache.lock().await;

                match event.event_type {
                    WatchedEventType::NodeDataChanged => {
                        // Subscribe to new changes recursively
                        if let Err(err) = ops.send(Operation::GetData(node)) {
                            warn!("error sending GetData to op channel: {:?}", err);
                        }
                    }
                    WatchedEventType::NodeDeleted => {
                        entries.remove(&node);

                        let removed = Operation::Event(PathChildrenCacheEvent::ChildRemoved(node));
                        if let Err(err) = ops.send(removed) {
                            warn!("error sending ChildRemoved event: {:?}", err);
                        }
                    }
                    _ => error!("Unexpected: {:?}", event),
                };

                trace!("New data: {:?}", *entries);
            });
        };

        zk.get_data_w(path, on_node_event).await
    }

    async fn update_data(
        zk: Arc<ZooKeeper>,
        path: &str,
        data: Arc<Mutex<Data>>,
        ops_chan_tx: UnboundedSender<Operation>,
    ) -> ZkResult<()> {
        let mut data_locked = data.lock().await;

        let path = path.to_owned();

        let result = Self::get_data(zk.clone(), &path, data.clone(), ops_chan_tx.clone()).await;

        match result {
            Ok(child_data) => {
                trace!("got data {:?}", child_data);

                let child_data = Arc::new(child_data);

                data_locked.insert(path.clone(), child_data.clone());

                ops_chan_tx
                    .send(Operation::Event(PathChildrenCacheEvent::ChildUpdated(
                        path, child_data,
                    )))
                    .map_err(|err| {
                        warn!("error sending ChildUpdated event: {:?}", err);
                        ZkError::APIError
                    })
            }
            Err(err) => {
                warn!("error getting child data: {:?}", err);
                Err(ZkError::APIError)
            }
        }
    }

    /// Return a copy of the cache as it is right now.
    ///
    /// The result is only the most recent view this cache has; it carries no guarantee of
    /// matching the state on the server.
    pub async fn get_current_data(&self) -> Data {
        let guard = self.data.lock().await;
        guard.clone()
    }

    /// Drop every entry from the local cache.
    ///
    /// Only the in-memory map is emptied; no listener is notified by this call.
    pub async fn clear(&self) {
        let mut guard = self.data.lock().await;
        guard.clear();
    }

    /// React to a ZooKeeper session state change.
    ///
    /// On `ZkState::Connected` a forced refresh is queued on `ops_chan_tx`; every other state is
    /// only logged.
    ///
    /// Returns `true` when the worker should stop, which happens only if the refresh could not
    /// be queued.
    fn handle_state_change(state: ZkState, ops_chan_tx: UnboundedSender<Operation>) -> bool {
        debug!("zk state change {:?}", state);

        match state {
            ZkState::Connected => {
                let refresh = Operation::Refresh(RefreshMode::ForceGetDataAndStat);

                match ops_chan_tx.send(refresh) {
                    Ok(()) => false,
                    Err(err) => {
                        warn!("error sending Refresh to op channel: {:?}", err);
                        true
                    }
                }
            }
            _ => false,
        }
    }

    /// Execute a single queued operation on behalf of the worker task.
    ///
    /// * `Initialize` lists the children with forced data fetches, logs the outcome, and then
    ///   notifies listeners with `Initialized` regardless of that outcome.
    /// * `Refresh` lists the children again with the requested mode and logs the outcome.
    /// * `GetData` re-reads one child through `update_data`, logging a warning on failure.
    /// * `Event` forwards the event to the listeners.
    /// * `ZkStateEvent` is delegated to `handle_state_change`.
    /// * `Shutdown` asks the worker to stop.
    ///
    /// Returns `true` when the worker loop should terminate.
    async fn handle_operation(
        op: Operation,
        zk: Arc<ZooKeeper>,
        path: Arc<String>,
        data: Arc<Mutex<Data>>,
        event_listeners: ListenerSet<PathChildrenCacheEvent>,
        ops_chan_tx: UnboundedSender<Operation>,
    ) -> bool {
        match op {
            Operation::Shutdown => {
                debug!("shutting down worker thread");
                true
            }
            Operation::ZkStateEvent(state) => Self::handle_state_change(state, ops_chan_tx),
            Operation::Event(event) => {
                debug!("received event {:?}", event);
                event_listeners.notify(&event);
                false
            }
            Operation::GetData(child_path) => {
                debug!("getting data");
                if let Err(err) = Self::update_data(zk, &child_path, data, ops_chan_tx).await {
                    warn!("error getting child data: {:?}", err);
                }
                false
            }
            Operation::Refresh(mode) => {
                debug!("getting children");
                let outcome = Self::get_children(zk, &path, data, ops_chan_tx, mode).await;
                debug!("got children {:?}", outcome);
                false
            }
            Operation::Initialize => {
                debug!("initialising...");
                let outcome = Self::get_children(
                    zk,
                    &path,
                    data.clone(),
                    ops_chan_tx,
                    RefreshMode::ForceGetDataAndStat,
                )
                .await;
                debug!("got children {:?}", outcome);

                // The snapshot is taken inside the call expression so the cache lock stays held
                // for the duration of the notification.
                event_listeners.notify(&PathChildrenCacheEvent::Initialized(
                    data.lock().await.clone(),
                ));
                false
            }
        }
    }

    /// Start the cache. The cache is not started automatically. You must call this method.
    pub fn start(&mut self) -> ZkResult<()> {
        let (ops_chan_tx, mut ops_chan_rx) = unbounded_channel();
        let ops_chan_rx_zk_events = ops_chan_tx.clone();

        let sub = self.zk.add_listener(move |s| {
            ops_chan_rx_zk_events
                .send(Operation::ZkStateEvent(s))
                .unwrap()
        });
        self.listener_subscription = Some(sub);

        let zk = self.zk.clone();
        let path = self.path.clone();
        let data = self.data.clone();
        let event_listeners = self.event_listeners.clone();
        self.channel = Some(ops_chan_tx.clone());

        tokio::spawn(async move {
            let mut done = false;

            while !done {
                match ops_chan_rx.recv().await {
                    Some(operation) => {
                        done = Self::handle_operation(
                            operation,
                            zk.clone(),
                            path.clone(),
                            data.clone(),
                            event_listeners.clone(),
                            ops_chan_tx.clone(),
                        )
                        .await;
                    }
                    None => {
                        info!("error receiving from operations channel. shutting down");
                        done = true;
                    }
                }
            }
        });

        self.offer_operation(Operation::Initialize)
    }

    /// Register `subscriber` to be called with every `PathChildrenCacheEvent` this cache
    /// publishes.
    ///
    /// The returned `Subscription` can be passed to `remove_listener` to unregister it again.
    pub fn add_listener<Listener: Fn(PathChildrenCacheEvent) + Send + 'static>(
        &self,
        subscriber: Listener,
    ) -> Subscription {
        let listeners = &self.event_listeners;
        listeners.subscribe(subscriber)
    }

    /// Unregister the listener identified by `sub`, a `Subscription` previously returned by
    /// `add_listener`.
    pub fn remove_listener(&self, sub: Subscription) {
        let listeners = &self.event_listeners;
        listeners.unsubscribe(sub);
    }

    fn offer_operation(&self, op: Operation) -> ZkResult<()> {
        match self.channel {
            Some(ref chan) => chan.send(op).map_err(|err| {
                warn!("error submitting op to channel: {:?}", err);
                ZkError::APIError
            }),
            None => Err(ZkError::APIError),
        }
    }
}