//! zus-discovery 1.1.4
//!
//! Service discovery client for the ZUS RPC framework with ZooServer integration.

use bytes::Bytes;
use dashmap::DashMap;
use prost::Message;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;
use tokio::time::interval;
use tracing::{error, info, warn};

use zus_common::{RpcEndpoint, Result, ZusError};
use zus_proto::*;

/// ZooServer Path Node (cached path information)
///
/// As populated by `ZooServerClient::create_path`, one node is stored per
/// created path so the client can later report versions during sync and
/// re-create the paths after a re-registration.
#[derive(Debug, Clone)]
pub struct ZooPathNode {
    /// Path string; `create_path` also uses it as the cache key.
    pub path: String,
    /// Value that was sent with the create request.
    pub value: Bytes,
    /// Version taken from the `CreatePath` response (0 when the response omits it).
    pub version: u64,
    /// Flags that were sent with the create request; replayed when paths are rebuilt.
    pub create_flags: i32,
}

/// ZooServer Client (replacing C++'s ZusZooClient, matching Java's ZooServerClient)
///
/// Holds one RPC endpoint, the session id obtained from `RegisterClient`, and a
/// cache of the paths created through this client. A background task started by
/// `new` periodically sends the cached path versions to the server.
pub struct ZooServerClient {
    // Not read anywhere in this file yet, hence the `dead_code` allowance.
    #[allow(dead_code)]
    addresses: Vec<String>, // Kept for potential reconnection logic
    // Session id from the last successful registration; empty until then.
    sessionid: Arc<parking_lot::RwLock<String>>,
    // Connection used for every RPC issued by this client.
    endpoint: Arc<RpcEndpoint>,
    // Paths created via `create_path`, keyed by path string.
    path_cache: Arc<DashMap<String, ZooPathNode>>,
    // Cleared by `shutdown`; the background sync loop stops once it observes `false`.
    running: Arc<AtomicBool>,
}

impl ZooServerClient {
    /// Create new ZooServer client
    pub async fn new(addresses: Vec<String>) -> Result<Arc<Self>> {
        if addresses.is_empty() {
            return Err(ZusError::Connection(
                "No ZooServer addresses provided".to_string(),
            ));
        }

        // Connect to first available server
        let endpoint = Self::connect_to_server(&addresses).await?;

        let client = Arc::new(Self {
            addresses,
            sessionid: Arc::new(parking_lot::RwLock::new(String::new())),
            endpoint: Arc::new(endpoint),
            path_cache: Arc::new(DashMap::new()),
            running: Arc::new(AtomicBool::new(true)),
        });

        // Register client
        client.register_client().await?;

        // Start background sync thread
        let client_clone = client.clone();
        tokio::spawn(async move {
            client_clone.sync_loop().await;
        });

        Ok(client)
    }

    /// Connect to ZooServer
    async fn connect_to_server(addresses: &[String]) -> Result<RpcEndpoint> {
        for addr in addresses {
            let parts: Vec<&str> = addr.split(':').collect();
            if parts.len() != 2 {
                continue;
            }

            let host = parts[0].to_string();
            let port = parts[1].parse::<u16>().ok().unwrap_or(2181);

            match RpcEndpoint::connect(host, port).await {
                Ok(endpoint) => {
                    info!("Connected to ZooServer at {}", addr);
                    return Ok(endpoint);
                }
                Err(_) => continue,
            }
        }

        Err(ZusError::Connection(
            "Failed to connect to any ZooServer".to_string(),
        ))
    }

    /// Performs the `RegisterClient` RPC and stores the session id it returns.
    ///
    /// A default-initialised request is sent. On success the session id from
    /// the response (an empty string if the field is absent) replaces the one
    /// currently held by the client.
    ///
    /// # Errors
    ///
    /// Propagates transport errors from the endpoint, and returns
    /// [`ZusError::Rpc`] when the response cannot be decoded or its return
    /// code is not `ZOO_RET_SUCCESS`.
    async fn register_client(&self) -> Result<()> {
        let payload = ZooRegisterClientRequest::default().encode_to_vec();

        let raw = self
            .endpoint
            .sync_call(Bytes::from("RegisterClient"), Bytes::from(payload), 5000)
            .await?;

        let reply = ZooRegisterClientResponse::decode(&raw[..])
            .map_err(|e| ZusError::Rpc(format!("Failed to parse response: {}", e)))?;

        if reply.ret != constants::ZOO_RET_SUCCESS {
            return Err(ZusError::Rpc(format!(
                "Registration failed: {}",
                reply.ret
            )));
        }

        // `sessionid` is optional on the wire; absence is stored as "".
        let sessionid = reply.sessionid.unwrap_or_default();
        *self.sessionid.write() = sessionid.clone();
        info!("Registered with sessionid: {}", sessionid);

        Ok(())
    }

    /// Creates `path` on the ZooServer and records it in the local cache.
    ///
    /// Sends a `CreatePath` request carrying the current session id, the path,
    /// `create_flags` and `value`. On success the path is cached together with
    /// the version from the response.
    ///
    /// # Returns
    ///
    /// The version reported by the server, or 0 if the response omits it.
    ///
    /// # Errors
    ///
    /// Propagates transport errors, and returns [`ZusError::Rpc`] when the
    /// response cannot be decoded or its return code is not `ZOO_RET_SUCCESS`.
    pub async fn create_path(
        &self,
        path: String,
        value: Bytes,
        create_flags: i32,
    ) -> Result<u64> {
        let payload = ZooCreatePathRequest {
            sessionid: self.sessionid.read().clone(),
            path: path.clone(),
            flags: Some(create_flags),
            val: Some(value.to_vec()),
        }
        .encode_to_vec();

        let raw = self
            .endpoint
            .sync_call(Bytes::from("CreatePath"), Bytes::from(payload), 5000)
            .await?;

        let reply = ZooCreatePathResponse::decode(&raw[..])
            .map_err(|e| ZusError::Rpc(format!("Failed to parse response: {}", e)))?;

        if reply.ret != constants::ZOO_RET_SUCCESS {
            return Err(ZusError::Rpc(format!(
                "CreatePath failed: {}",
                reply.ret
            )));
        }

        let version = reply.version.unwrap_or(0);

        // Remember what was created so it can be synced and rebuilt later.
        let node = ZooPathNode {
            path: path.clone(),
            value,
            version,
            create_flags,
        };
        self.path_cache.insert(path, node);

        Ok(version)
    }

    /// Lists the children of `path` via the `GetPathChild` RPC.
    ///
    /// The request is sent without a watch. A `ZOO_RET_PATH_NOT_EXIST` reply is
    /// not treated as an error; it yields an empty vector.
    ///
    /// # Errors
    ///
    /// Propagates transport errors, and returns [`ZusError::Rpc`] when the
    /// response cannot be decoded or carries any other non-success code.
    pub async fn get_path_child(&self, path: &str) -> Result<Vec<ZooPathFileNode>> {
        let payload = ZooGetPathChildRequest {
            sessionid: self.sessionid.read().clone(),
            path: path.to_string(),
            watch: None,
        }
        .encode_to_vec();

        let raw = self
            .endpoint
            .sync_call(Bytes::from("GetPathChild"), Bytes::from(payload), 5000)
            .await?;

        let reply = ZooGetPathChildResponse::decode(&raw[..])
            .map_err(|e| ZusError::Rpc(format!("Failed to parse response: {}", e)))?;

        if reply.ret == constants::ZOO_RET_SUCCESS {
            return Ok(reply.childs);
        }

        // A missing path simply has no children.
        if reply.ret == constants::ZOO_RET_PATH_NOT_EXIST {
            return Ok(Vec::new());
        }

        Err(ZusError::Rpc(format!(
            "GetPathChild failed: {}",
            reply.ret
        )))
    }

    /// Fetches the value and version stored at `path` via the `GetPath` RPC.
    ///
    /// The request is sent without a watch.
    ///
    /// # Returns
    ///
    /// `(value, version)`; a field missing from the response becomes an empty
    /// value or version 0 respectively.
    ///
    /// # Errors
    ///
    /// Propagates transport errors, and returns [`ZusError::Rpc`] when the
    /// response cannot be decoded or its return code is not `ZOO_RET_SUCCESS`.
    pub async fn get_path_value(&self, path: &str) -> Result<(Bytes, u64)> {
        let payload = ZooGetPathRequest {
            sessionid: self.sessionid.read().clone(),
            path: path.to_string(),
            watch: None,
        }
        .encode_to_vec();

        let raw = self
            .endpoint
            .sync_call(Bytes::from("GetPath"), Bytes::from(payload), 5000)
            .await?;

        let reply = ZooGetPathResponse::decode(&raw[..])
            .map_err(|e| ZusError::Rpc(format!("Failed to parse response: {}", e)))?;

        if reply.ret != constants::ZOO_RET_SUCCESS {
            return Err(ZusError::Rpc(format!("GetPath failed: {}", reply.ret)));
        }

        // Both fields are optional on the wire.
        let version = reply.version.unwrap_or(0);
        let value = Bytes::from(reply.val.unwrap_or_default());

        Ok((value, version))
    }

    /// Deletes `path` on the ZooServer and drops it from the local cache.
    ///
    /// The `DeletePath` request is sent without a version. The cache entry is
    /// removed only after the server reports success.
    ///
    /// # Errors
    ///
    /// Propagates transport errors, and returns [`ZusError::Rpc`] when the
    /// response cannot be decoded or its return code is not `ZOO_RET_SUCCESS`.
    pub async fn delete_path(&self, path: &str) -> Result<()> {
        let payload = ZooDeletePathRequest {
            sessionid: self.sessionid.read().clone(),
            path: path.to_string(),
            version: None,
        }
        .encode_to_vec();

        let raw = self
            .endpoint
            .sync_call(Bytes::from("DeletePath"), Bytes::from(payload), 5000)
            .await?;

        let reply = ZooDeletePathResponse::decode(&raw[..])
            .map_err(|e| ZusError::Rpc(format!("Failed to parse response: {}", e)))?;

        if reply.ret != constants::ZOO_RET_SUCCESS {
            return Err(ZusError::Rpc(format!(
                "DeletePath failed: {}",
                reply.ret
            )));
        }

        // Stop syncing and rebuilding a path that no longer exists.
        self.path_cache.remove(path);
        Ok(())
    }

    /// Reports the cached path versions to the server (one `SyncPath` round).
    ///
    /// Does nothing when the cache is empty. A transport failure is logged and
    /// swallowed so the caller can simply try again on its next round. When the
    /// server answers `ZOO_RET_SERVER_NOT_REG`, the client registers again and
    /// re-creates every cached path.
    ///
    /// # Errors
    ///
    /// Returns [`ZusError::Rpc`] when the response cannot be decoded, and
    /// propagates failures from re-registration.
    async fn sync_path_versions(&self) -> Result<()> {
        if self.path_cache.is_empty() {
            return Ok(());
        }

        // Only file name and version are sent; the value is left out.
        let mut pathnode = Vec::with_capacity(self.path_cache.len());
        for entry in self.path_cache.iter() {
            pathnode.push(ZooPathFileNode {
                file: entry.key().clone(),
                version: entry.value().version,
                val: None,
            });
        }

        let payload = ZooSyncPathRequest {
            sessionid: self.sessionid.read().clone(),
            pathnode,
        }
        .encode_to_vec();

        let outcome = self
            .endpoint
            .sync_call(Bytes::from("SyncPath"), Bytes::from(payload), 5000)
            .await;

        let raw = match outcome {
            Ok(raw) => raw,
            Err(e) => {
                // Best effort: a failed round is not an error for the caller.
                warn!("Sync failed: {:?}", e);
                return Ok(());
            }
        };

        let reply = ZooSyncPathResponse::decode(&raw[..])
            .map_err(|e| ZusError::Rpc(format!("Failed to parse response: {}", e)))?;

        if reply.ret == constants::ZOO_RET_SERVER_NOT_REG {
            warn!("Server not registered, re-registering...");
            self.register_client().await?;
            self.rebuild_paths().await?;
        }

        Ok(())
    }

    /// Rebuild all paths after reconnection
    async fn rebuild_paths(&self) -> Result<()> {
        let paths: Vec<(String, Bytes, i32)> = self
            .path_cache
            .iter()
            .map(|entry| {
                (
                    entry.key().clone(),
                    entry.value().value.clone(),
                    entry.value().create_flags,
                )
            })
            .collect();

        for (path, value, flags) in paths {
            if let Err(e) = self.create_path(path.clone(), value, flags).await {
                error!("Failed to rebuild path {}: {:?}", path, e);
            }
        }

        Ok(())
    }

    /// Background sync loop (every 3 seconds, matching Java version)
    ///
    /// Runs one `sync_path_versions` round per tick until `shutdown` clears the
    /// `running` flag. Errors from a round are logged and the loop continues.
    ///
    /// The flag is checked both before and after waiting for the tick, so a
    /// shutdown requested during the wait does not trigger one more RPC.
    async fn sync_loop(self: Arc<Self>) {
        let mut tick = interval(Duration::from_secs(3));

        while self.running.load(Ordering::SeqCst) {
            tick.tick().await;

            // `shutdown` may have been called while this task was waiting.
            if !self.running.load(Ordering::SeqCst) {
                break;
            }

            if let Err(e) = self.sync_path_versions().await {
                error!("Sync error: {:?}", e);
            }
        }
    }

    /// Requests that the background sync loop stop.
    ///
    /// Only clears the `running` flag; the loop exits the next time it reads
    /// the flag. Nothing else is torn down here.
    pub async fn shutdown(&self) {
        let running = &self.running;
        running.store(false, Ordering::SeqCst);
    }

    /// Returns a copy of the current session id (empty before registration).
    pub fn sessionid(&self) -> String {
        let current = self.sessionid.read();
        (*current).clone()
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    /// A `ZooPathNode` built from literals exposes the path and version it was given.
    #[test]
    fn test_path_node() {
        let (path, version) = ("/test".to_string(), 1u64);
        let cached = ZooPathNode {
            path,
            value: Bytes::from("hello"),
            version,
            create_flags: 0,
        };

        assert_eq!(cached.path, "/test");
        assert_eq!(cached.version, 1u64);
    }
}