use {
bytes::Bytes,
dashmap::DashMap,
prost::Message,
std::{
sync::{
Arc,
atomic::{AtomicBool, Ordering},
},
time::Duration,
},
tokio::time::interval,
tracing::{error, info, warn},
};
use {
zus_common::{Result, RpcEndpoint, ZusError},
zus_proto::*,
};
#[derive(Debug, Clone)]
pub struct ZooPathNode {
pub path: String,
pub value: Bytes,
pub version: i64,
pub create_flags: i32,
}
pub struct ZusZooClient {
#[allow(dead_code)]
addresses: Vec<String>, sessionid: Arc<parking_lot::RwLock<String>>,
endpoint: Arc<RpcEndpoint>,
path_cache: Arc<DashMap<String, ZooPathNode>>,
running: Arc<AtomicBool>,
}
impl ZusZooClient {
pub async fn new(addresses: Vec<String>) -> Result<Arc<Self>> {
if addresses.is_empty() {
return Err(ZusError::Connection("No ZooServer addresses provided".to_string()));
}
let endpoint = Self::connect_to_server(&addresses).await?;
let client = Arc::new(Self {
addresses,
sessionid: Arc::new(parking_lot::RwLock::new(String::new())),
endpoint: Arc::new(endpoint),
path_cache: Arc::new(DashMap::new()),
running: Arc::new(AtomicBool::new(true)),
});
client.register_client().await?;
let client_clone = client.clone();
tokio::spawn(async move {
client_clone.sync_loop().await;
});
Ok(client)
}
async fn connect_to_server(addresses: &[String]) -> Result<RpcEndpoint> {
for addr in addresses {
let parts: Vec<&str> = addr.split(':').collect();
if parts.len() != 2 {
continue;
}
let host = parts[0].to_string();
let port = parts[1].parse::<u16>().ok().unwrap_or(2181);
match RpcEndpoint::connect(host, port).await {
| Ok(endpoint) => {
info!("Connected to ZooServer at {}", addr);
return Ok(endpoint);
}
| Err(_) => continue,
}
}
Err(ZusError::Connection("Failed to connect to any ZooServer".to_string()))
}
async fn register_client(&self) -> Result<()> {
let req = ZooRegisterClientRequest::default();
let mut buf = Vec::new();
req.encode(&mut buf)?;
let response = self
.endpoint
.sync_call(Bytes::from("RegisterClient"), Bytes::from(buf), 5000)
.await?;
let resp = ZooRegisterClientResponse::decode(response)?;
if resp.ret != constants::ZOO_RET_SUCCESS {
return Err(ZusError::Rpc(format!("Registration failed: {}", resp.ret)));
}
let sessionid = resp.sessionid.clone().unwrap_or_default();
*self.sessionid.write() = sessionid.clone();
info!("Registered with sessionid: {}", sessionid);
Ok(())
}
pub async fn create_path(&self, path: String, value: Bytes, create_flags: i32) -> Result<u64> {
let req = ZooCreatePathRequest {
sessionid: self.sessionid.read().clone(),
path: path.clone(),
flags: Some(create_flags),
val: Some(value.to_vec()),
};
let mut buf = Vec::new();
req.encode(&mut buf)?;
let response = self
.endpoint
.sync_call(Bytes::from("CreatePath"), Bytes::from(buf), 5000)
.await?;
let resp = ZooCreatePathResponse::decode(response)?;
if resp.ret != constants::ZOO_RET_SUCCESS {
return Err(ZusError::Rpc(format!("CreatePath failed: {}", resp.ret)));
}
let version = resp.version.unwrap_or(0);
self.path_cache.insert(
path.clone(),
ZooPathNode {
path,
value,
version: version as i64,
create_flags,
},
);
Ok(version)
}
pub async fn get_path_child(&self, path: &str) -> Result<Vec<ZooPathFileNode>> {
let req = ZooGetPathChildRequest {
sessionid: self.sessionid.read().clone(),
path: path.to_string(),
watch: None,
};
let mut buf = Vec::new();
req.encode(&mut buf)?;
let response = self
.endpoint
.sync_call(Bytes::from("GetPathChild"), Bytes::from(buf), 5000)
.await?;
let resp = ZooGetPathChildResponse::decode(response)?;
if resp.ret != constants::ZOO_RET_SUCCESS {
if resp.ret == constants::ZOO_RET_PATH_NOT_EXIST {
return Ok(Vec::new());
}
return Err(ZusError::Rpc(format!("GetPathChild failed: {}", resp.ret)));
}
Ok(resp.childs)
}
pub async fn get_path_child_ex(&self, path: &str) -> Result<Vec<ZooPathFileNodeEx>> {
let req = ZooGetPathChildExRequest {
sessionid: self.sessionid.read().clone(),
path: path.to_string(),
watch: None,
};
let mut buf = Vec::new();
req.encode(&mut buf)?;
let response = self
.endpoint
.sync_call(Bytes::from("GetPathChildEx"), Bytes::from(buf), 5000)
.await?;
let resp = ZooGetPathChildExResponse::decode(response)?;
if resp.ret != constants::ZOO_RET_SUCCESS {
if resp.ret == constants::ZOO_RET_PATH_NOT_EXIST {
return Ok(Vec::new());
}
return Err(ZusError::Rpc(format!("GetPathChildEx failed: {}", resp.ret)));
}
Ok(resp.childs)
}
pub async fn get_path_value(&self, path: &str) -> Result<(Bytes, u64)> {
let req = ZooGetPathRequest {
sessionid: self.sessionid.read().clone(),
path: path.to_string(),
watch: None,
};
let mut buf = Vec::new();
req.encode(&mut buf)?;
let response = self
.endpoint
.sync_call(Bytes::from("GetPath"), Bytes::from(buf), 5000)
.await?;
let resp = ZooGetPathResponse::decode(response)?;
if resp.ret != constants::ZOO_RET_SUCCESS {
return Err(ZusError::Rpc(format!("GetPath failed: {}", resp.ret)));
}
let value = resp.val.clone().unwrap_or_default();
let version = resp.version.unwrap_or(0);
Ok((Bytes::from(value), version))
}
pub async fn delete_path(&self, path: &str) -> Result<()> {
let req = ZooDeletePathRequest {
sessionid: self.sessionid.read().clone(),
path: path.to_string(),
version: None,
};
let mut buf = Vec::new();
req.encode(&mut buf)?;
let response = self
.endpoint
.sync_call(Bytes::from("DeletePath"), Bytes::from(buf), 5000)
.await?;
let resp = ZooDeletePathResponse::decode(response)?;
if resp.ret != constants::ZOO_RET_SUCCESS {
return Err(ZusError::Rpc(format!("DeletePath failed: {}", resp.ret)));
}
self.path_cache.remove(path);
Ok(())
}
pub async fn set_path(&self, path: &str, value: Bytes, version: u64) -> Result<u64> {
let req = ZooSetPathRequest {
sessionid: self.sessionid.read().clone(),
path: path.to_string(),
watch: None,
val: value.to_vec(),
version: Some(version),
};
let mut buf = Vec::new();
req.encode(&mut buf)?;
let response = self
.endpoint
.sync_call(Bytes::from("SetPath"), Bytes::from(buf), 5000)
.await?;
let resp = ZooSetPathResponse::decode(response)?;
if resp.ret != constants::ZOO_RET_SUCCESS {
return Err(ZusError::Rpc(format!("SetPath failed: {}", resp.ret)));
}
let new_version = resp.version.unwrap_or(version);
if let Some(mut entry) = self.path_cache.get_mut(path) {
entry.value = value;
entry.version = new_version as i64;
}
Ok(new_version)
}
async fn sync_path_versions(&self) -> Result<()> {
if self.path_cache.is_empty() {
return Ok(());
}
let path_nodes: Vec<ZooPathFileNode> = self
.path_cache
.iter()
.map(|entry| ZooPathFileNode {
file: entry.key().clone(),
version: entry.value().version as u64,
val: None,
})
.collect();
let req = ZooSyncPathRequest {
sessionid: self.sessionid.read().clone(),
pathnode: path_nodes,
};
let mut buf = Vec::new();
req.encode(&mut buf)?;
let response = self
.endpoint
.sync_call(Bytes::from("SyncPath"), Bytes::from(buf), 5000)
.await;
match response {
| Ok(data) => {
let resp = ZooSyncPathResponse::decode(data)?;
if resp.ret == constants::ZOO_RET_SERVER_NOT_REG {
warn!("Server not registered, re-registering...");
self.register_client().await?;
self.rebuild_paths().await?;
}
}
| Err(e) => {
warn!("Sync failed: {:?}", e);
}
}
Ok(())
}
async fn rebuild_paths(&self) -> Result<()> {
let paths: Vec<(String, Bytes, i32)> = self
.path_cache
.iter()
.map(|entry| {
(
entry.key().clone(),
entry.value().value.clone(),
entry.value().create_flags,
)
})
.collect();
for (path, value, flags) in paths {
if let Err(e) = self.create_path(path.clone(), value, flags).await {
error!("Failed to rebuild path {}: {:?}", path, e);
}
}
Ok(())
}
async fn sync_loop(self: Arc<Self>) {
let mut tick = interval(Duration::from_secs(3));
while self.running.load(Ordering::SeqCst) {
tick.tick().await;
if let Err(e) = self.sync_path_versions().await {
error!("Sync error: {:?}", e);
}
}
}
pub async fn shutdown(&self) {
self.running.store(false, Ordering::SeqCst);
}
pub fn sessionid(&self) -> String {
self.sessionid.read().clone()
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_path_node() {
let node = ZooPathNode {
path: "/test".to_string(),
value: Bytes::from("hello"),
version: 1,
create_flags: 0,
};
assert_eq!(node.path, "/test");
assert_eq!(node.version, 1);
}
}