use bytes::Bytes;
use dashmap::DashMap;
use prost::Message;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;
use tokio::time::interval;
use tracing::{error, info, warn};
use zus_common::{RpcEndpoint, Result, ZusError};
use zus_proto::*;
#[derive(Debug, Clone)]
pub struct ZooPathNode {
pub path: String,
pub value: Bytes,
pub version: u64,
pub create_flags: i32,
}
pub struct ZooServerClient {
#[allow(dead_code)]
addresses: Vec<String>, sessionid: Arc<parking_lot::RwLock<String>>,
endpoint: Arc<RpcEndpoint>,
path_cache: Arc<DashMap<String, ZooPathNode>>,
running: Arc<AtomicBool>,
}
impl ZooServerClient {
pub async fn new(addresses: Vec<String>) -> Result<Arc<Self>> {
if addresses.is_empty() {
return Err(ZusError::Connection(
"No ZooServer addresses provided".to_string(),
));
}
let endpoint = Self::connect_to_server(&addresses).await?;
let client = Arc::new(Self {
addresses,
sessionid: Arc::new(parking_lot::RwLock::new(String::new())),
endpoint: Arc::new(endpoint),
path_cache: Arc::new(DashMap::new()),
running: Arc::new(AtomicBool::new(true)),
});
client.register_client().await?;
let client_clone = client.clone();
tokio::spawn(async move {
client_clone.sync_loop().await;
});
Ok(client)
}
async fn connect_to_server(addresses: &[String]) -> Result<RpcEndpoint> {
for addr in addresses {
let parts: Vec<&str> = addr.split(':').collect();
if parts.len() != 2 {
continue;
}
let host = parts[0].to_string();
let port = parts[1].parse::<u16>().ok().unwrap_or(2181);
match RpcEndpoint::connect(host, port).await {
Ok(endpoint) => {
info!("Connected to ZooServer at {}", addr);
return Ok(endpoint);
}
Err(_) => continue,
}
}
Err(ZusError::Connection(
"Failed to connect to any ZooServer".to_string(),
))
}
async fn register_client(&self) -> Result<()> {
let req = ZooRegisterClientRequest::default();
let buf = req.encode_to_vec();
let response = self
.endpoint
.sync_call(
Bytes::from("RegisterClient"),
Bytes::from(buf),
5000,
)
.await?;
let resp = ZooRegisterClientResponse::decode(&response[..])
.map_err(|e| ZusError::Rpc(format!("Failed to parse response: {}", e)))?;
let ret = resp.ret; if ret != constants::ZOO_RET_SUCCESS {
return Err(ZusError::Rpc(format!(
"Registration failed: {}",
ret
)));
}
let sessionid = resp.sessionid.clone().unwrap_or_default(); *self.sessionid.write() = sessionid.clone();
info!("Registered with sessionid: {}", sessionid);
Ok(())
}
pub async fn create_path(
&self,
path: String,
value: Bytes,
create_flags: i32,
) -> Result<u64> {
let req = ZooCreatePathRequest {
sessionid: self.sessionid.read().clone(), path: path.clone(), flags: Some(create_flags), val: Some(value.to_vec()), };
let buf = req.encode_to_vec();
let response = self
.endpoint
.sync_call(
Bytes::from("CreatePath"),
Bytes::from(buf),
5000,
)
.await?;
let resp = ZooCreatePathResponse::decode(&response[..])
.map_err(|e| ZusError::Rpc(format!("Failed to parse response: {}", e)))?;
let ret = resp.ret; if ret != constants::ZOO_RET_SUCCESS {
return Err(ZusError::Rpc(format!(
"CreatePath failed: {}",
ret
)));
}
let version = resp.version.unwrap_or(0);
self.path_cache.insert(
path.clone(),
ZooPathNode {
path,
value,
version,
create_flags,
},
);
Ok(version)
}
pub async fn get_path_child(&self, path: &str) -> Result<Vec<ZooPathFileNode>> {
let req = ZooGetPathChildRequest {
sessionid: self.sessionid.read().clone(), path: path.to_string(), watch: None, };
let buf = req.encode_to_vec();
let response = self
.endpoint
.sync_call(
Bytes::from("GetPathChild"),
Bytes::from(buf),
5000,
)
.await?;
let resp = ZooGetPathChildResponse::decode(&response[..])
.map_err(|e| ZusError::Rpc(format!("Failed to parse response: {}", e)))?;
let ret = resp.ret; if ret != constants::ZOO_RET_SUCCESS {
if ret == constants::ZOO_RET_PATH_NOT_EXIST {
return Ok(Vec::new());
}
return Err(ZusError::Rpc(format!(
"GetPathChild failed: {}",
ret
)));
}
Ok(resp.childs) }
pub async fn get_path_value(&self, path: &str) -> Result<(Bytes, u64)> {
let req = ZooGetPathRequest {
sessionid: self.sessionid.read().clone(), path: path.to_string(), watch: None, };
let buf = req.encode_to_vec();
let response = self
.endpoint
.sync_call(
Bytes::from("GetPath"),
Bytes::from(buf),
5000,
)
.await?;
let resp = ZooGetPathResponse::decode(&response[..])
.map_err(|e| ZusError::Rpc(format!("Failed to parse response: {}", e)))?;
let ret = resp.ret; if ret != constants::ZOO_RET_SUCCESS {
return Err(ZusError::Rpc(format!("GetPath failed: {}", ret)));
}
let value = resp.val.clone().unwrap_or_default(); let version = resp.version.unwrap_or(0);
Ok((Bytes::from(value), version))
}
pub async fn delete_path(&self, path: &str) -> Result<()> {
let req = ZooDeletePathRequest {
sessionid: self.sessionid.read().clone(), path: path.to_string(), version: None, };
let buf = req.encode_to_vec();
let response = self
.endpoint
.sync_call(
Bytes::from("DeletePath"),
Bytes::from(buf),
5000,
)
.await?;
let resp = ZooDeletePathResponse::decode(&response[..])
.map_err(|e| ZusError::Rpc(format!("Failed to parse response: {}", e)))?;
let ret = resp.ret; if ret != constants::ZOO_RET_SUCCESS {
return Err(ZusError::Rpc(format!(
"DeletePath failed: {}",
ret
)));
}
self.path_cache.remove(path);
Ok(())
}
async fn sync_path_versions(&self) -> Result<()> {
if self.path_cache.is_empty() {
return Ok(());
}
let path_nodes: Vec<ZooPathFileNode> = self
.path_cache
.iter()
.map(|entry| {
ZooPathFileNode {
file: entry.key().clone(), version: entry.value().version, val: None, }
})
.collect();
let req = ZooSyncPathRequest {
sessionid: self.sessionid.read().clone(), pathnode: path_nodes, };
let buf = req.encode_to_vec();
let response = self
.endpoint
.sync_call(
Bytes::from("SyncPath"),
Bytes::from(buf),
5000,
)
.await;
match response {
Ok(data) => {
let resp = ZooSyncPathResponse::decode(&data[..])
.map_err(|e| ZusError::Rpc(format!("Failed to parse response: {}", e)))?;
let ret = resp.ret; if ret == constants::ZOO_RET_SERVER_NOT_REG {
warn!("Server not registered, re-registering...");
self.register_client().await?;
self.rebuild_paths().await?;
}
}
Err(e) => {
warn!("Sync failed: {:?}", e);
}
}
Ok(())
}
async fn rebuild_paths(&self) -> Result<()> {
let paths: Vec<(String, Bytes, i32)> = self
.path_cache
.iter()
.map(|entry| {
(
entry.key().clone(),
entry.value().value.clone(),
entry.value().create_flags,
)
})
.collect();
for (path, value, flags) in paths {
if let Err(e) = self.create_path(path.clone(), value, flags).await {
error!("Failed to rebuild path {}: {:?}", path, e);
}
}
Ok(())
}
async fn sync_loop(self: Arc<Self>) {
let mut tick = interval(Duration::from_secs(3));
while self.running.load(Ordering::SeqCst) {
tick.tick().await;
if let Err(e) = self.sync_path_versions().await {
error!("Sync error: {:?}", e);
}
}
}
pub async fn shutdown(&self) {
self.running.store(false, Ordering::SeqCst);
}
pub fn sessionid(&self) -> String {
self.sessionid.read().clone()
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_path_node() {
let node = ZooPathNode {
path: "/test".to_string(),
value: Bytes::from("hello"),
version: 1u64,
create_flags: 0,
};
assert_eq!(node.path, "/test");
assert_eq!(node.version, 1u64);
}
}