use std::io::Read;
use std::io::Write;
use std::net::TcpStream;
use zookeeper_client::CreateSequence;
use zookeeper_client::Error;
use zookeeper_client::OneshotWatcher;
use zookeeper_client::Stat;
use zookeeper_client as zk;
use crate::commons::split_str;
/// Result of a single, non-retrying lock attempt made by `ZkClient::lock`.
///
/// Both variants carry the node path the attempt was made for, as reported
/// by `ZkClient::lock`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum LockOutcome {
    /// The lock node was created; the payload is the reported node path.
    Locked(String),
    /// The lock node could not be created; the payload is the reported node path.
    NotLocked(String),
}
/// Thin wrapper around a connected ZooKeeper client plus the address string
/// it was configured with.
pub struct ZkClient {
/// Server address string; `server_info` splits it on `,` and contacts the
/// first entry only.
pub server_address: String,
/// Underlying ZooKeeper client used for the lock, create and watch calls.
pub client: zookeeper_client::Client,
}
impl ZkClient {
pub async fn server_info(&self) {
let cmd = "envi";
let server_address = split_str(&self.server_address, ",".to_string())[0];
match TcpStream::connect(server_address) {
Ok(mut stream) => {
if let Err(e) = stream.write_all(cmd.as_bytes()) {
log::error!(
"server_info-write_all-failed: server_address={}, err: {:?}",
server_address,
e
);
} else {
let mut server_info = String::new();
if stream.read_to_string(&mut server_info).is_ok() {
log::info!(
"Zookeeper has been successfully connected: server_address={}, server_info={}",
server_address,
get_server_info(server_info)
);
}
}
}
Err(e) => {
log::error!(
"server_info-connect-failed: server_address={}, err={:?}",
server_address,
e
);
}
}
}
pub async fn try_lock(&self, prefix: &str, path: &str) {
loop {
match self.lock(prefix, path, "").await {
Ok(LockOutcome::Locked(_)) => {
break;
}
Ok(LockOutcome::NotLocked(path)) => {
log::debug!(
"zk-try_lock-failed: path={}, will be lock again after 2 secsons.",
path
);
tokio::time::sleep(std::time::Duration::from_secs(2)).await;
}
Err(err) => {
log::debug!("zk-try_lock-failed: path={}, error={:?}", path, err);
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
}
}
}
}
pub async fn lock(&self, dir: &str, path: &str, data: &str) -> Result<LockOutcome, Error> {
let dir = format!("/{}", dir).replace("//", "/");
let path = format!("/{}", path.replace('/', ""));
let p = format!("{}{}", dir, path);
let prefix = zk::LockPrefix::new_curator(&dir, "latch-").unwrap();
let options = zk::LockOptions::new(zk::Acls::anyone_all())
.with_ancestor_options(zk::CreateMode::Persistent.with_acls(zk::Acls::anyone_all()))
.unwrap();
match self.client.lock(prefix, b"", options).await {
Ok(latch) => {
match latch
.create(
&path,
data.as_bytes(),
&zk::CreateMode::Ephemeral.with_acls(zk::Acls::anyone_all()),
)
.await
{
Ok((_, sequence)) => {
log::info!("zk-locked-success: path={}, seq={:?}", p, sequence);
Ok(LockOutcome::Locked(p))
}
Err(e) => {
log::info!("zk-locked-failed: path={}, error={:?}", p, e);
Ok(LockOutcome::NotLocked(p))
}
}
}
Err(e) => Err(e),
}
}
pub async fn check_and_watch_stat(&self, path: &str) -> Result<OneshotWatcher, Error> {
match self.client.check_and_watch_stat(path).await {
Ok((_, stat_watcher)) => Ok(stat_watcher),
Err(e) => Err(e),
}
}
pub async fn create(&self, path: &str, data: &str) -> Result<(Stat, CreateSequence), Error> {
let data = data.as_bytes().to_vec();
let create_options = zk::CreateMode::Persistent.with_acls(zk::Acls::anyone_all());
match self.client.create(path, &data, &create_options).await {
Ok((stat, sequence)) => {
log::info!(
"zk-create-success: path={}, stat={:?}, sequence={:?}",
path,
stat,
sequence
);
Ok((stat, sequence))
}
Err(e) => {
log::error!("zk-create-failed: path={}, error={:?}", path, e);
Err(e)
}
}
}
}
/// Condenses the text reply to the `envi` command into a short summary.
///
/// Each line is trimmed; a line that starts with one of the known property
/// names contributes the text between its first and second `=`. For
/// `zookeeper.version` only the part before the first `-` is kept. Lines
/// without a value after `=` are skipped.
///
/// The collected pairs are rendered with `crate::commons::generic_to_string`
/// (presumably a serialised form of the map — confirm in `commons`).
///
/// # Panics
/// Panics if `generic_to_string` fails.
fn get_server_info(server_info: String) -> String {
    // Property-name prefix in the reply -> key used in the summary.
    const WANTED: [(&str, &str); 4] = [
        ("zookeeper.version", "zookeeper_version"),
        ("java.version", "java_version"),
        ("java.home", "java_home"),
        ("os.version", "os_version"),
    ];

    let mut info_map = std::collections::HashMap::new();
    for line in server_info.lines().map(str::trim) {
        let key = match WANTED.iter().find(|(prefix, _)| line.starts_with(*prefix)) {
            Some((_, key)) => *key,
            None => continue,
        };
        // Checked access: a matching line with nothing after `=` must not
        // bring the process down.
        let value = match split_str(line, "=".to_string()).get(1).copied() {
            Some(value) => value,
            None => continue,
        };
        let value = if key == "zookeeper_version" {
            // Keep only the part before the first `-`.
            split_str(value, "-".to_string())
                .first()
                .copied()
                .unwrap_or(value)
        } else {
            value
        };
        info_map.insert(key, value);
    }
    crate::commons::generic_to_string(&info_map).unwrap()
}