zookeeper 0.8.0

A minimal ZooKeeper client
Documentation
use zookeeper::{AddWatchMode, Acl, CreateMode, WatcherType};
use zookeeper::{WatchedEvent, ZooKeeper, ZooKeeperExt};

use ZkCluster;

use env_logger;
use std::sync::mpsc;
use std::time::Duration;

#[test]
fn persistent_watch_receives_more_than_one_message_on_modifications() {
    let _ = env_logger::try_init();

    // Create a test cluster
    let cluster = ZkCluster::start(1);

    let zk_watcher = ZooKeeper::connect(
        &cluster.connect_string,
        Duration::from_secs(30),
        |event: WatchedEvent| info!("{:?}", event),
    )
    .unwrap();

    let zk_modifier = ZooKeeper::connect(
        &cluster.connect_string,
        Duration::from_secs(30),
        |event: WatchedEvent| info!("{:?}", event),
    )
    .unwrap();

    zk_modifier.ensure_path("/base").unwrap();

    let (snd, rcv) = mpsc::channel::<()>();
    zk_watcher.add_watch("/base", AddWatchMode::Persistent, move |_| {
        snd.send(()).unwrap();
    }).unwrap();
    zk_modifier.set_data("/base", b"hello1".to_vec(), None).unwrap();
    rcv.recv_timeout(Duration::from_millis(100)).unwrap();
    zk_modifier.set_data("/base", b"hello2".to_vec(), None).unwrap();
    rcv.recv_timeout(Duration::from_millis(100)).unwrap();
}

#[test]
fn persistent_watch_does_not_receive_children_changes() {
    let _ = env_logger::try_init();

    // Create a test cluster
    let cluster = ZkCluster::start(1);

    let zk_watcher = ZooKeeper::connect(
        &cluster.connect_string,
        Duration::from_secs(30),
        |event: WatchedEvent| info!("{:?}", event),
    )
    .unwrap();

    let zk_modifier = ZooKeeper::connect(
        &cluster.connect_string,
        Duration::from_secs(30),
        |event: WatchedEvent| info!("{:?}", event),
    )
    .unwrap();

    zk_modifier.ensure_path("/base").unwrap();
    zk_modifier.ensure_path("/base/child").unwrap();

    let (snd, rcv) = mpsc::channel::<()>();
    zk_watcher.add_watch("/base", AddWatchMode::Persistent, move |_| {
        snd.send(()).unwrap();
    }).unwrap();
    zk_modifier.set_data("/base", b"hello1".to_vec(), None).unwrap();
    rcv.recv_timeout(Duration::from_millis(100)).unwrap();
    zk_modifier.set_data("/base/child", b"hello2".to_vec(), None).unwrap();
    if rcv.recv_timeout(Duration::from_millis(100)).is_ok() {
        panic!("received unexpected event for child");
    }
}

#[test]
fn persistent_recursive_watch_stops_receiving_updates_when_removed() {
    let _ = env_logger::try_init();

    // Create a test cluster
    let cluster = ZkCluster::start(1);

    let zk_watcher = ZooKeeper::connect(
        &cluster.connect_string,
        Duration::from_secs(30),
        |event: WatchedEvent| info!("{:?}", event),
    )
    .unwrap();

    let zk_modifier = ZooKeeper::connect(
        &cluster.connect_string,
        Duration::from_secs(30),
        |event: WatchedEvent| info!("{:?}", event),
    )
    .unwrap();

    zk_modifier.ensure_path("/base").unwrap();

    let (snd, rcv) = mpsc::channel::<()>();
    zk_watcher.add_watch("/base", AddWatchMode::PersistentRecursive, move |_| {
        snd.send(()).unwrap();
    }).unwrap();
    zk_modifier.create("/base/child1", b"hello2".to_vec(), Acl::open_unsafe().clone(), CreateMode::Persistent).unwrap();
    rcv.recv_timeout(Duration::from_millis(100)).unwrap();
    zk_watcher.remove_watches("/base", WatcherType::Any).unwrap();
    zk_modifier.create("/base/child2", b"hello2".to_vec(), Acl::open_unsafe().clone(), CreateMode::Persistent).unwrap();
    if rcv.recv_timeout(Duration::from_millis(100)).is_ok() {
        panic!("received unexpected event for child");
    }
}

#[test]
fn persistent_recursive_watch_receive_children_changes() {
    let _ = env_logger::try_init();

    // Create a test cluster
    let cluster = ZkCluster::start(1);

    let zk_watcher = ZooKeeper::connect(
        &cluster.connect_string,
        Duration::from_secs(30),
        |event: WatchedEvent| info!("{:?}", event),
    )
    .unwrap();

    let zk_modifier = ZooKeeper::connect(
        &cluster.connect_string,
        Duration::from_secs(30),
        |event: WatchedEvent| info!("{:?}", event),
    )
    .unwrap();

    zk_modifier.ensure_path("/base").unwrap();

    let (snd, rcv) = mpsc::channel::<()>();
    zk_watcher.add_watch("/base", AddWatchMode::PersistentRecursive, move |_| {
        snd.send(()).unwrap();
    }).unwrap();
    zk_modifier.create("/base/child1", b"hello2".to_vec(), Acl::open_unsafe().clone(), CreateMode::Persistent).unwrap();
    rcv.recv_timeout(Duration::from_millis(100)).unwrap();
    zk_modifier.create("/base/child2", b"hello2".to_vec(), Acl::open_unsafe().clone(), CreateMode::Persistent).unwrap();
    rcv.recv_timeout(Duration::from_millis(100)).unwrap();
}