zookeeper-async 5.0.0

An async ZooKeeper client
Documentation
#![deny(unused_mut)]

use std::env;
use std::io;
use std::sync::mpsc;
use std::sync::Arc;
use std::thread;
use std::time::Duration;
use zookeeper_async::recipes::cache::PathChildrenCache;
use zookeeper_async::{Acl, CreateMode, WatchedEvent, Watcher, ZooKeeper};

/// Watcher that simply dumps every event it is handed to standard output.
///
/// It carries no state; it is passed to `ZooKeeper::connect` as the
/// connection's watcher.
struct LoggingWatcher;

impl Watcher for LoggingWatcher {
    /// Prints the `Debug` representation of the received event on its own line.
    fn handle(&self, event: WatchedEvent) {
        println!("{:?}", event);
    }
}

/// Returns the ZooKeeper connection string to use.
///
/// The value of the `ZOOKEEPER_SERVERS` environment variable is returned when
/// it can be read; in every other case (unset or not valid Unicode) the
/// function falls back to `localhost:2181`.
fn zk_server_urls() -> String {
    env::var("ZOOKEEPER_SERVERS").unwrap_or_else(|_| String::from("localhost:2181"))
}

/// Runs the interactive walkthrough of the client API.
///
/// Steps, in order:
/// 1. connect to the servers returned by `zk_server_urls` with a 15 second
///    timeout and a `LoggingWatcher`;
/// 2. register a state listener and issue a series of calls (`add_auth`,
///    `create`, `exists`, `get_acl`, `set_acl`, `get_children`, `set_data`,
///    `get_data`, `get_children_w`), printing each result with `{:?}`;
/// 3. start a `PathChildrenCache` on `/` and forward its events through a
///    channel to a printing thread;
/// 4. after the user presses enter, close the client from a spawned task and
///    show that a later call on it returns an error;
/// 5. after a second enter, attempt a final close and print its result.
///
/// # Panics
///
/// Panics if the initial connection fails, if the cache cannot be created, or
/// if reading from standard input fails.
async fn zk_example() {
    let zk_urls = zk_server_urls();
    println!("connecting to {}", zk_urls);

    let zk = ZooKeeper::connect(&zk_urls, Duration::from_secs(15), LoggingWatcher)
        .await
        .unwrap();

    zk.add_listener(|zk_state| println!("New ZkState is {:?}", zk_state));

    let auth = zk.add_auth("digest", vec![1, 2, 3, 4]).await;

    println!("authenticated -> {:?}", auth);

    let path = zk
        .create(
            "/test",
            vec![1, 2],
            Acl::open_unsafe().clone(),
            CreateMode::Ephemeral,
        )
        .await;

    println!("created -> {:?}", path);

    let exists = zk.exists("/test", true).await;

    println!("exists -> {:?}", exists);

    let doesnt_exist = zk.exists("/blabla", true).await;

    println!("don't exists path -> {:?}", doesnt_exist);

    let get_acl = zk.get_acl("/test").await;

    println!("get_acl -> {:?}", get_acl);

    let set_acl = zk.set_acl("/test", Acl::open_unsafe().clone(), None).await;

    println!("set_acl -> {:?}", set_acl);

    let children = zk.get_children("/", true).await;

    println!("children of / -> {:?}", children);

    let set_data = zk.set_data("/test", vec![6, 5, 4, 3], None).await;

    println!("set_data -> {:?}", set_data);

    let get_data = zk.get_data("/test", true).await;

    println!("get_data -> {:?}", get_data);

    // Deletion is intentionally left disabled so that "/test" is still present
    // for the steps below:
    // let delete = zk.delete("/test", -1);
    // println!("deleted /test -> {:?}", delete);

    let watch_children = zk
        .get_children_w("/", |event: WatchedEvent| {
            println!("watched event {:?}", event);
        })
        .await;

    println!("watch children -> {:?}", watch_children);

    // From here on the client is shared, so move it behind an `Arc`.
    let zk_arc = Arc::new(zk);

    let mut pcc = PathChildrenCache::new(zk_arc.clone(), "/").await.unwrap();
    if let Err(err) = pcc.start() {
        println!("error starting cache: {:?}", err);
        return;
    }
    println!("cache started");

    // Cache events are handed to a dedicated thread through a channel; the
    // thread's loop ends once the sending side is dropped.
    let (ev_tx, ev_rx) = mpsc::channel();
    pcc.add_listener(move |e| ev_tx.send(e).unwrap());
    thread::spawn(move || {
        for ev in ev_rx {
            println!("received event {:?}", ev);
        }
    });

    // Scratch buffer for the "press enter" prompts; its contents are ignored.
    let mut tmp = String::new();

    println!("press enter to close client");
    io::stdin().read_line(&mut tmp).unwrap();

    // The client can be shared between tasks
    let zk_arc_captured = zk_arc.clone();
    tokio::spawn(async move {
        zk_arc_captured.close().await.unwrap();

        // And operations return error after closed
        match zk_arc_captured.exists("/test", false).await {
            Err(err) => println!("Usage after closed should end up with error: {:?}", err),
            Ok(_) => panic!("Shouldn't happen"),
        }
    });

    println!("press enter to exit example");
    io::stdin().read_line(&mut tmp).unwrap();

    // The spawned task above has normally closed the client by now, and calls
    // on a closed client return an error (as that task demonstrates). Print the
    // outcome instead of unwrapping it so the example does not panic on its
    // regular exit path.
    let final_close = zk_arc.close().await;
    println!("close on exit -> {:?}", final_close);
}

/// Program entry point: awaits `zk_example` until it finishes, running inside
/// the async context provided by the `#[tokio::main]` attribute.
#[tokio::main]
async fn main() {
    zk_example().await
}