krpc-core 0.2.0

RPC framework for service registration and discovery through API exposure, compatible with the Dubbo3 protocol and interoperable with Java projects
Documentation
use super::{Register, Resource, SocketInfo};
use crate::support::dubbo::{decode_url, encode_url};
use async_recursion::async_recursion;
use std::{collections::HashMap, sync::Arc, time::Duration};
use tokio::sync::RwLock;
use tracing::info;
use zk::OneshotWatcher;
use zookeeper_client as zk;

/// Create options for an ephemeral node with the `anyone_all` ACL.
///
/// Used by `creat_resource_node` when it registers a resource node.
static EPHEMERAL_OPEN: &zk::CreateOptions<'static> =
    &zk::CreateMode::Ephemeral.with_acls(zk::Acls::anyone_all());
/// Create options for a container node with the `anyone_all` ACL.
///
/// Used by `connect` when it creates each level of the chroot path.
static CONTAINER_OPEN: &zk::CreateOptions<'static> =
    &zk::CreateMode::Container.with_acls(zk::Acls::anyone_all());

/// ZooKeeper-backed implementation of [`Register`].
pub struct KrpcZookeeper {
    /// ZooKeeper address string; passed to `connect` as the cluster to dial.
    addr: String,

    /// Root node under which resource paths are built; `init` sets it to `/dubbo`.
    root_path: String,

    /// Shared cache of discovered servers. `listener_resource_node_change`
    /// overwrites the entry keyed by `server_name` (or `server_name:version`
    /// when a version is set) each time it reads the provider list.
    map: Arc<RwLock<HashMap<String, Vec<SocketInfo>>>>,
}

impl Register for KrpcZookeeper {
    /// Registers `resource` by handing it to `creat_resource_node` together
    /// with owned copies of the address and root path and a new handle to the
    /// shared server map.
    fn add_resource(&self, resource: Resource) {
        let Self {
            addr,
            root_path,
            map,
        } = self;
        creat_resource_node(addr.clone(), root_path.clone(), resource, Arc::clone(map));
    }
}

impl KrpcZookeeper {
    pub fn init(
        addr: &str,
        _name_space: &str,
        map: Arc<RwLock<HashMap<String, Vec<SocketInfo>>>>,
    ) -> Self {
        let root_path = "/dubbo".to_string();
        let krpc_zookeeper = KrpcZookeeper {
            addr: addr.to_string(),
            root_path,
            map,
        };
        return krpc_zookeeper;
    }
}

/// Connects to the ZooKeeper ensemble at `cluster` and returns a client
/// rooted at `chroot`, retrying every 30 seconds until it succeeds.
///
/// When `chroot` is at most one byte long (e.g. `""` or `"/"`), the freshly
/// connected client is returned without any chroot. Otherwise every prefix of
/// `chroot` (`/a`, `/a/b`, ...) is created as a container node, an already
/// existing node counting as success, and the client is then chrooted to
/// `chroot`.
///
/// Any failure (connecting, creating a path level, or chrooting) is logged and
/// the whole sequence is started again after the delay, so this function only
/// returns once a usable client exists.
///
/// NOTE(review): the body no longer recurses. `#[async_recursion]` is kept so
/// the file-level `async_recursion` import stays in use and the function still
/// returns the same boxed future; both can be removed together.
#[async_recursion]
async fn connect(cluster: &str, chroot: &str) -> zk::Client {
    const RETRY_DELAY: Duration = Duration::from_secs(30);

    'attempt: loop {
        let client = match zk::Client::connect(cluster).await {
            Ok(client) => client,
            Err(err) => {
                // Report the failure right away rather than after the delay.
                info!("connect err {:?} ,Try to re-establish the connection", err);
                tokio::time::sleep(RETRY_DELAY).await;
                continue 'attempt;
            }
        };
        if chroot.len() <= 1 {
            return client;
        }

        // Walk `chroot` one segment at a time; `i` is the byte offset just
        // past the previous '/', `j` the end of the current prefix.
        let mut i = 1;
        while i <= chroot.len() {
            let j = chroot[i..]
                .find('/')
                .map_or(chroot.len(), |offset| offset + i);
            let created = client
                .create(&chroot[..j], Default::default(), CONTAINER_OPEN)
                .await;
            match created {
                Ok(_) | Err(zk::Error::NodeExists) => {}
                Err(err) => {
                    info!("connect err {:?} ,Try to re-establish the connection", err);
                    // Do not keep the failed client alive while waiting to retry.
                    drop(client);
                    tokio::time::sleep(RETRY_DELAY).await;
                    continue 'attempt;
                }
            }
            i = j + 1;
        }

        match client.chroot(chroot.to_string()) {
            Ok(client) => return client,
            Err(err) => {
                info!("connect err {:?} ,Try to re-establish the connection", err);
                // Release whatever the failed chroot handed back before sleeping.
                drop(err);
                tokio::time::sleep(RETRY_DELAY).await;
            }
        }
    }
}

/// Registers `resource` in ZooKeeper and keeps the registration alive in a
/// background task.
///
/// The node is created under `<root>/<server_name>/consumers` for a client
/// resource and `<root>/<server_name>/providers` for a server resource. Its
/// name is the URL encoding of `resource` and its data is the JSON form of the
/// resource info. For a client resource, a provider listener that fills `map`
/// is started first.
///
/// The spawned task loops forever: connect, create the ephemeral node (retry
/// after 5 seconds on failure), then wait for a watch event on the node and
/// start over with a fresh connection.
///
/// # Panics
///
/// Panics if the resource info cannot be serialized to JSON.
fn creat_resource_node(
    cluster: String,
    root: String,
    resource: Resource,
    map: Arc<RwLock<HashMap<String, Vec<SocketInfo>>>>,
) {
    let (info, role_dir) = match &resource {
        Resource::Client(info) => (info, "consumers"),
        Resource::Server(info) => (info, "providers"),
    };
    let path = format!("{}/{}/{}", root, info.server_name, role_dir);

    // Only consumers need to track the provider list.
    if let Resource::Client(client_info) = &resource {
        listener_resource_node_change(
            cluster.clone(),
            root,
            Resource::Client(client_info.clone()),
            map,
        );
    }

    let node_name = encode_url(&resource);
    let node_data = serde_json::to_string(info).unwrap();

    tokio::spawn(async move {
        loop {
            let client = connect(&cluster, &path).await;

            if let Err(err) = client
                .create(&node_name, node_data.as_bytes(), EPHEMERAL_OPEN)
                .await
            {
                info!("create node err {:?}", err);
                tokio::time::sleep(Duration::from_secs(5)).await;
                continue;
            }

            // Wait until something happens to our node, then re-register.
            match client.check_and_watch_stat(&node_name).await {
                Ok(watched) => {
                    let event = watched.1.changed().await;
                    info!("resource node event {:?}", event);
                }
                Err(err) => info!("resource node err {:?}", err),
            }

            drop(client);
        }
    });
}

/// Watches the provider list of a client resource and mirrors it into `map`.
///
/// Does nothing for `Resource::Server`. For `Resource::Client`, a background
/// task is spawned that connects with `<root>/<server_name>/providers` as the
/// chroot and then loops forever:
///
/// 1. Read the children of `/` and install a watch on them.
/// 2. Decode every child name; keep the server resources whose `version`
///    equals the client's `version`.
/// 3. Store the resulting list in `map` under `server_name`, or
///    `server_name:version` when the client has a version.
/// 4. Wait for the watch to fire. A children-changed event just loops again;
///    any other event triggers a reconnect first.
///
/// A failure to read the children is logged and followed by a reconnect.
fn listener_resource_node_change(
    cluster: String,
    root: String,
    resource: Resource,
    map: Arc<RwLock<HashMap<String, Vec<SocketInfo>>>>,
) {
    let info = match resource {
        Resource::Client(info) => info,
        Resource::Server(_) => return,
    };
    let path = format!("{}/{}/providers", root, info.server_name);

    // The cache key depends only on the client info, so build it once.
    let mut key = info.server_name.clone();
    if let Some(version) = &info.version {
        key.push(':');
        key.push_str(version);
    }

    tokio::spawn(async move {
        let mut client = connect(&cluster, &path).await;
        loop {
            let (children, _stat, watcher): (Vec<String>, zk::Stat, OneshotWatcher) =
                match client.get_and_watch_children("/").await {
                    Ok(watched) => watched,
                    Err(err) => {
                        // Surface the failure instead of reconnecting silently.
                        info!(
                            "get children err {:?} ,Try to re-establish the connection",
                            err
                        );
                        client = connect(&cluster, &path).await;
                        continue;
                    }
                };

            let mut server_list = vec![];
            for node in children {
                // Children that fail to decode, or that are not server
                // resources, are skipped.
                if let Ok(Resource::Server(server)) = decode_url(&node) {
                    if server.version == info.version {
                        server_list.push(SocketInfo {
                            info: server,
                            sender: Arc::new(RwLock::new(None)),
                        });
                    }
                }
            }

            info!("update server cache {:?} : {:?}", key, server_list);
            // The write guard is a temporary, released at the end of this statement.
            map.write().await.insert(key.clone(), server_list);

            let event: zk::WatchedEvent = watcher.changed().await;
            if matches!(event.event_type, zk::EventType::NodeChildrenChanged) {
                info!("Monitor node changes");
            } else {
                client = connect(&cluster, &path).await;
            }
        }
    });
}