// zero4rs 2.0.0
//
// zero4rs is a powerful, pragmatic, and extremely fast web framework for Rust.
// Documentation
use std::io::Read;
use std::io::Write;
use std::net::TcpStream;

use zookeeper_client::CreateSequence;
use zookeeper_client::Error;
use zookeeper_client::OneshotWatcher;
use zookeeper_client::Stat;

use zookeeper_client as zk;

use crate::commons::split_str;

/// Result of a single attempt to take a ZooKeeper lock.
///
/// Both variants carry the full lock path (`{dir}/{name}`) that the attempt
/// was made for, so callers can log or compare it.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum LockOutcome {
    /// The lock was obtained and the ephemeral node was created.
    Locked(String),
    /// The lock was contended successfully but creating the node failed.
    NotLocked(String),
}

// https://crates.io/crates/zookeeper-client
/// Wrapper around a `zookeeper_client::Client` together with the address
/// string of the server(s) it talks to.
pub struct ZkClient {
    /// Server address string. `server_info` splits it on `,` and probes the
    /// first entry over plain TCP.
    pub server_address: String,
    /// Underlying client used for the lock, create and watch operations.
    pub client: zookeeper_client::Client,
}

impl ZkClient {
    pub async fn server_info(&self) {
        let cmd = "envi";
        let server_address = split_str(&self.server_address, ",".to_string())[0];

        match TcpStream::connect(server_address) {
            Ok(mut stream) => {
                if let Err(e) = stream.write_all(cmd.as_bytes()) {
                    log::error!(
                        "server_info-write_all-failed: server_address={}, err: {:?}",
                        server_address,
                        e
                    );
                } else {
                    let mut server_info = String::new();

                    if stream.read_to_string(&mut server_info).is_ok() {
                        log::info!(
                            "Zookeeper has been successfully connected: server_address={}, server_info={}",
                            server_address,
                            get_server_info(server_info)
                        );
                    }
                }
            }
            Err(e) => {
                log::error!(
                    "server_info-connect-failed: server_address={}, err={:?}",
                    server_address,
                    e
                );
            }
        }
    }

    pub async fn try_lock(&self, prefix: &str, path: &str) {
        loop {
            match self.lock(prefix, path, "").await {
                Ok(LockOutcome::Locked(_)) => {
                    break;
                }
                Ok(LockOutcome::NotLocked(path)) => {
                    log::debug!(
                        "zk-try_lock-failed: path={}, will be lock again after 2 secsons.",
                        path
                    );
                    tokio::time::sleep(std::time::Duration::from_secs(2)).await;
                }
                Err(err) => {
                    log::debug!("zk-try_lock-failed: path={}, error={:?}", path, err);
                    tokio::time::sleep(std::time::Duration::from_secs(5)).await;
                }
            }
        }
    }

    // `{dir}/_c_{uuid}-{name}{ephemeral_sequence}`.
    // path = /app/data
    pub async fn lock(&self, dir: &str, path: &str, data: &str) -> Result<LockOutcome, Error> {
        let dir = format!("/{}", dir).replace("//", "/");
        let path = format!("/{}", path.replace('/', ""));
        let p = format!("{}{}", dir, path);

        let prefix = zk::LockPrefix::new_curator(&dir, "latch-").unwrap();

        let options = zk::LockOptions::new(zk::Acls::anyone_all())
            .with_ancestor_options(zk::CreateMode::Persistent.with_acls(zk::Acls::anyone_all()))
            .unwrap();

        match self.client.lock(prefix, b"", options).await {
            Ok(latch) => {
                match latch
                    .create(
                        &path,
                        data.as_bytes(),
                        &zk::CreateMode::Ephemeral.with_acls(zk::Acls::anyone_all()),
                    )
                    .await
                {
                    Ok((_, sequence)) => {
                        log::info!("zk-locked-success: path={}, seq={:?}", p, sequence);
                        Ok(LockOutcome::Locked(p))
                    }
                    Err(e) => {
                        log::info!("zk-locked-failed: path={}, error={:?}", p, e);
                        Ok(LockOutcome::NotLocked(p))
                    }
                }
            }
            Err(e) => Err(e),
        }
    }

    pub async fn check_and_watch_stat(&self, path: &str) -> Result<OneshotWatcher, Error> {
        match self.client.check_and_watch_stat(path).await {
            Ok((_, stat_watcher)) => Ok(stat_watcher),
            Err(e) => Err(e),
        }
    }

    pub async fn create(&self, path: &str, data: &str) -> Result<(Stat, CreateSequence), Error> {
        let data = data.as_bytes().to_vec();
        let create_options = zk::CreateMode::Persistent.with_acls(zk::Acls::anyone_all());

        match self.client.create(path, &data, &create_options).await {
            Ok((stat, sequence)) => {
                log::info!(
                    "zk-create-success: path={}, stat={:?}, sequence={:?}",
                    path,
                    stat,
                    sequence
                );
                Ok((stat, sequence))
            }
            Err(e) => {
                log::error!("zk-create-failed: path={}, error={:?}", path, e);
                Err(e)
            }
        }
    }
}

/// Extracts a few well-known entries from a server environment report and
/// renders them as a string via `generic_to_string`.
///
/// `server_info` is scanned line by line. For lines starting with
/// `zookeeper.version`, `java.version`, `java.home` or `os.version`, the text
/// after `=` is stored under the keys `zookeeper_version`, `java_version`,
/// `java_home` and `os_version`; the ZooKeeper version is additionally cut at
/// the first `-`. When a key occurs more than once the last line wins.
///
/// Lines that match a prefix but have no value part are skipped rather than
/// causing a panic, because the text comes straight from the network.
fn get_server_info(server_info: String) -> String {
    /// Returns the `index`-th piece of `text` split on `sep`, if there is one.
    fn piece<'a>(text: &'a str, sep: &str, index: usize) -> Option<&'a str> {
        split_str(text, sep.to_string()).get(index).copied()
    }

    let mut info_map = std::collections::HashMap::new();

    for line in server_info.lines().map(str::trim) {
        let (key, value) = if line.starts_with("zookeeper.version") {
            (
                "zookeeper_version",
                piece(line, "=", 1).and_then(|version| piece(version, "-", 0)),
            )
        } else if line.starts_with("java.version") {
            ("java_version", piece(line, "=", 1))
        } else if line.starts_with("java.home") {
            ("java_home", piece(line, "=", 1))
        } else if line.starts_with("os.version") {
            ("os_version", piece(line, "=", 1))
        } else {
            continue;
        };

        if let Some(value) = value {
            info_map.insert(key, value);
        }
    }

    // Best effort: this string only feeds a log line, so a rendering failure
    // yields an empty string instead of a panic.
    crate::commons::generic_to_string(&info_map).unwrap_or_default()
}