zookeeper 0.8.0

A minimal ZooKeeper client
Documentation
#![deny(unused_mut)]
extern crate zookeeper;
#[macro_use]
extern crate log;
extern crate env_logger;

use std::env;
use std::io;
use std::io::BufRead;
use std::time::Duration;
use zookeeper::AddWatchMode;
use zookeeper::WatcherType;
use zookeeper::ZooKeeperExt;
use zookeeper::{Acl, CreateMode, WatchedEvent, Watcher, ZooKeeper};

struct LoggingWatcher;
impl Watcher for LoggingWatcher {
    fn handle(&self, e: WatchedEvent) {
        info!("{:?}", e)
    }
}

fn zk_server_urls() -> String {
    let key = "ZOOKEEPER_SERVERS";
    match env::var(key) {
        Ok(val) => val,
        Err(_) => "localhost:2181".to_string(),
    }
}

fn zk_example() {
    let zk_urls = zk_server_urls();
    println!("connecting to {}", zk_urls);

    let root = format!("/example-{}", uuid::Uuid::new_v4());
    let modifying_zk =
        ZooKeeper::connect(&*zk_urls, Duration::from_secs(15), LoggingWatcher).unwrap();
    let recursive_watch_zk =
        ZooKeeper::connect(&*zk_urls, Duration::from_secs(15), LoggingWatcher).unwrap();
    let persistent_watch_zk =
        ZooKeeper::connect(&*zk_urls, Duration::from_secs(15), LoggingWatcher).unwrap();

    // Creating separate clients to show the example where modifications to the nodes
    // take place in a different session than our own.
    modifying_zk.add_listener(|zk_state| println!("New modifying ZkState is {:?}", zk_state));

    // Also separate clients per type of watch as there is a bug when creating multiple type watchers in the same
    // path in the same session
    // https://issues.apache.org/jira/browse/ZOOKEEPER-4466
    recursive_watch_zk.add_listener(|zk_state| println!("New recursive watch ZkState is {:?}", zk_state));
    persistent_watch_zk.add_listener(|zk_state| println!("New peristent watch ZkState is {:?}", zk_state));

    modifying_zk.ensure_path(&root).unwrap();

    recursive_watch_zk
        .add_watch(&root, AddWatchMode::PersistentRecursive, |event| {
            println!("received persistent recursive watch event {event:?}");
        })
        .unwrap();

    persistent_watch_zk
        .add_watch(&root, AddWatchMode::Persistent, |event| {
            println!("received persistent watch event {event:?}");
        })
        .unwrap();

    println!("press c to add and modify child, e to edit the watched node, anything else to proceed");
    let stdin = io::stdin();
    let inputs = stdin.lock().lines();
    let mut incr = 0;
    for input in inputs {
        incr += 1;
        match input.unwrap().as_str() {
            "c" => {
                let child_path = format!("{root}/child-{incr}");
                modifying_zk
                    .create(
                        &child_path,
                        b"".to_vec(),
                        Acl::open_unsafe().clone(),
                        CreateMode::Ephemeral,
                    )
                    .unwrap();
                modifying_zk
                    .set_data(
                        &child_path,
                        b"new-data".to_vec(),
                        None,
                    )
                    .unwrap();
                modifying_zk
                    .delete(&child_path, None)
                    .unwrap();
            },
            "e" => {
                modifying_zk
                    .set_data(
                        &root,
                        format!("new-data-{incr}").into_bytes(),
                        None,
                    )
                    .unwrap();
            }
            other => {
                println!("received {other}");
                break
            }
        }
    }

    println!("removing watch");
    recursive_watch_zk
        .remove_watches(&root, WatcherType::Any)
        .unwrap();
    persistent_watch_zk
        .remove_watches(&root, WatcherType::Any)
        .unwrap();

    println!("creating child node. This shouldn't have a new event");
    modifying_zk
        .create(
            &format!("{root}/child-no-notification"),
            b"".to_vec(),
            Acl::open_unsafe().clone(),
            CreateMode::Ephemeral,
        )
        .unwrap();

    modifying_zk.delete_recursive(&root).unwrap();
}

fn main() {
    env_logger::init();
    zk_example();
}