use zookeeper::{AddWatchMode, Acl, CreateMode, WatcherType};
use zookeeper::{WatchedEvent, ZooKeeper, ZooKeeperExt};
use ZkCluster;
use env_logger;
use std::sync::mpsc;
use std::time::Duration;
#[test]
fn persistent_watch_receives_more_than_one_message_on_modifications() {
let _ = env_logger::try_init();
let cluster = ZkCluster::start(1);
let zk_watcher = ZooKeeper::connect(
&cluster.connect_string,
Duration::from_secs(30),
|event: WatchedEvent| info!("{:?}", event),
)
.unwrap();
let zk_modifier = ZooKeeper::connect(
&cluster.connect_string,
Duration::from_secs(30),
|event: WatchedEvent| info!("{:?}", event),
)
.unwrap();
zk_modifier.ensure_path("/base").unwrap();
let (snd, rcv) = mpsc::channel::<()>();
zk_watcher.add_watch("/base", AddWatchMode::Persistent, move |_| {
snd.send(()).unwrap();
}).unwrap();
zk_modifier.set_data("/base", b"hello1".to_vec(), None).unwrap();
rcv.recv_timeout(Duration::from_millis(100)).unwrap();
zk_modifier.set_data("/base", b"hello2".to_vec(), None).unwrap();
rcv.recv_timeout(Duration::from_millis(100)).unwrap();
}
#[test]
fn persistent_watch_does_not_receive_children_changes() {
let _ = env_logger::try_init();
let cluster = ZkCluster::start(1);
let zk_watcher = ZooKeeper::connect(
&cluster.connect_string,
Duration::from_secs(30),
|event: WatchedEvent| info!("{:?}", event),
)
.unwrap();
let zk_modifier = ZooKeeper::connect(
&cluster.connect_string,
Duration::from_secs(30),
|event: WatchedEvent| info!("{:?}", event),
)
.unwrap();
zk_modifier.ensure_path("/base").unwrap();
zk_modifier.ensure_path("/base/child").unwrap();
let (snd, rcv) = mpsc::channel::<()>();
zk_watcher.add_watch("/base", AddWatchMode::Persistent, move |_| {
snd.send(()).unwrap();
}).unwrap();
zk_modifier.set_data("/base", b"hello1".to_vec(), None).unwrap();
rcv.recv_timeout(Duration::from_millis(100)).unwrap();
zk_modifier.set_data("/base/child", b"hello2".to_vec(), None).unwrap();
if rcv.recv_timeout(Duration::from_millis(100)).is_ok() {
panic!("received unexpected event for child");
}
}
#[test]
fn persistent_recursive_watch_stops_receiving_updates_when_removed() {
let _ = env_logger::try_init();
let cluster = ZkCluster::start(1);
let zk_watcher = ZooKeeper::connect(
&cluster.connect_string,
Duration::from_secs(30),
|event: WatchedEvent| info!("{:?}", event),
)
.unwrap();
let zk_modifier = ZooKeeper::connect(
&cluster.connect_string,
Duration::from_secs(30),
|event: WatchedEvent| info!("{:?}", event),
)
.unwrap();
zk_modifier.ensure_path("/base").unwrap();
let (snd, rcv) = mpsc::channel::<()>();
zk_watcher.add_watch("/base", AddWatchMode::PersistentRecursive, move |_| {
snd.send(()).unwrap();
}).unwrap();
zk_modifier.create("/base/child1", b"hello2".to_vec(), Acl::open_unsafe().clone(), CreateMode::Persistent).unwrap();
rcv.recv_timeout(Duration::from_millis(100)).unwrap();
zk_watcher.remove_watches("/base", WatcherType::Any).unwrap();
zk_modifier.create("/base/child2", b"hello2".to_vec(), Acl::open_unsafe().clone(), CreateMode::Persistent).unwrap();
if rcv.recv_timeout(Duration::from_millis(100)).is_ok() {
panic!("received unexpected event for child");
}
}
#[test]
fn persistent_recursive_watch_receive_children_changes() {
let _ = env_logger::try_init();
let cluster = ZkCluster::start(1);
let zk_watcher = ZooKeeper::connect(
&cluster.connect_string,
Duration::from_secs(30),
|event: WatchedEvent| info!("{:?}", event),
)
.unwrap();
let zk_modifier = ZooKeeper::connect(
&cluster.connect_string,
Duration::from_secs(30),
|event: WatchedEvent| info!("{:?}", event),
)
.unwrap();
zk_modifier.ensure_path("/base").unwrap();
let (snd, rcv) = mpsc::channel::<()>();
zk_watcher.add_watch("/base", AddWatchMode::PersistentRecursive, move |_| {
snd.send(()).unwrap();
}).unwrap();
zk_modifier.create("/base/child1", b"hello2".to_vec(), Acl::open_unsafe().clone(), CreateMode::Persistent).unwrap();
rcv.recv_timeout(Duration::from_millis(100)).unwrap();
zk_modifier.create("/base/child2", b"hello2".to_vec(), Acl::open_unsafe().clone(), CreateMode::Persistent).unwrap();
rcv.recv_timeout(Duration::from_millis(100)).unwrap();
}