rnacos 0.8.3

Nacos server re-implemented in Rust.
Documentation
use crate::cache::core::DirectCacheManager;
use crate::common::byte_utils::bin_to_id;
use crate::common::constant::{
    CACHE_TREE_NAME, CONFIG_TREE_NAME, DIRECT_CACHE_TABLE_NAME, MCP_SERVER_TABLE_NAME,
    MCP_TOOL_SPEC_TABLE_NAME, NAMESPACE_TREE_NAME, NAMING_INSTANCE_TABLE, SEQUENCE_TREE_NAME,
    SEQ_KEY_CONFIG, USER_TREE_NAME,
};
use crate::config::core::{ConfigActor, ConfigCmd, ConfigKey, ConfigValue};
use crate::config::model::{ConfigRaftCmd, ConfigValueDO};
use crate::mcp::core::McpManager;
use crate::namespace::NamespaceActor;
use crate::naming::core::NamingActor;
use crate::raft::db::table::{TableManager, TableManagerInnerReq, TableManagerReq};
use crate::raft::filestore::model::SnapshotRecordDto;
use crate::raft::filestore::raftapply::RaftApplyDataRequest;
use crate::raft::filestore::raftindex::{RaftIndexManager, RaftIndexRequest};
use crate::raft::filestore::raftsnapshot::SnapshotWriterActor;
use crate::raft::store::{ClientRequest, ClientResponse};
use crate::sequence::core::SequenceDbManager;
use actix::prelude::*;

#[derive(Clone)]
pub struct RaftDataHandler {
    pub config: Addr<ConfigActor>,
    pub table: Addr<TableManager>,
    pub namespace: Addr<NamespaceActor>,
    pub sequence_db: Addr<SequenceDbManager>,
    pub mcp_manager: Addr<McpManager>,
    pub naming_actor: Addr<NamingActor>,
    pub direct_cache_manager: Addr<DirectCacheManager>,
}

impl RaftDataHandler {
    pub async fn build_snapshot(&self, writer: Addr<SnapshotWriterActor>) -> anyhow::Result<()> {
        log::info!("RaftDataHandler|build_snapshot");
        self.sequence_db
            .send(RaftApplyDataRequest::BuildSnapshot(writer.clone()))
            .await??;
        self.config
            .send(ConfigCmd::BuildSnapshot(writer.clone()))
            .await??;
        self.table
            .send(TableManagerInnerReq::BuildSnapshot(writer.clone()))
            .await??;
        self.namespace
            .send(RaftApplyDataRequest::BuildSnapshot(writer.clone()))
            .await??;
        self.mcp_manager
            .send(RaftApplyDataRequest::BuildSnapshot(writer.clone()))
            .await??;
        self.naming_actor
            .send(RaftApplyDataRequest::BuildSnapshot(writer.clone()))
            .await??;
        self.direct_cache_manager
            .send(RaftApplyDataRequest::BuildSnapshot(writer.clone()))
            .await??;
        Ok(())
    }

    pub async fn load_snapshot(&self, record: SnapshotRecordDto) -> anyhow::Result<()> {
        if record.tree.as_str() == CONFIG_TREE_NAME.as_str() {
            let config_key = ConfigKey::from(&String::from_utf8(record.key)? as &str);
            let value_do = ConfigValueDO::from_bytes(&record.value)?;
            self.config
                .send(ConfigCmd::SetFullValue(config_key, value_do.into()))
                .await??;
        } else if record.tree.as_str() == SEQUENCE_TREE_NAME.as_str() {
            let key = String::from_utf8_lossy(&record.key);
            let last_id = bin_to_id(&record.value);
            if &key as &str == SEQ_KEY_CONFIG {
                self.config
                    .send(ConfigCmd::InnerSetLastId(last_id))
                    .await??;
            } else {
                let req = RaftApplyDataRequest::LoadSnapshotRecord(record);
                self.sequence_db.send(req).await??;
            }
        } else if record.tree.as_str() == USER_TREE_NAME.as_str() {
            let key = record.key;
            let value = record.value;
            let req = TableManagerReq::Set {
                table_name: USER_TREE_NAME.clone(),
                key,
                value,
                last_seq_id: None,
            };
            self.table.send(req).await??;
        } else if record.tree.as_str() == CACHE_TREE_NAME.as_str() {
            let key = record.key;
            let value = record.value;
            // 兼容旧版本
            //if let Ok(direct_req) = crate::cache::adaptation::AdaptationUtils::build_raft_req_from_old(&key, &value){
            //    self.direct_cache_manager.send(direct_req).await??;
            //}
            let req = TableManagerReq::Set {
                table_name: CACHE_TREE_NAME.clone(),
                key,
                value,
                last_seq_id: None,
            };
            self.table.send(req).await??;
        } else if record.tree.as_str() == DIRECT_CACHE_TABLE_NAME.as_str() {
            let req = RaftApplyDataRequest::LoadSnapshotRecord(record);
            self.direct_cache_manager.send(req).await??;
        } else if record.tree.as_str() == NAMESPACE_TREE_NAME.as_str() {
            let req = RaftApplyDataRequest::LoadSnapshotRecord(record);
            self.namespace.send(req).await??;
        } else if record.tree.as_str() == MCP_SERVER_TABLE_NAME.as_str()
            || record.tree.as_str() == MCP_TOOL_SPEC_TABLE_NAME.as_str()
        {
            let req = RaftApplyDataRequest::LoadSnapshotRecord(record);
            self.mcp_manager.send(req).await??;
        } else if record.tree.as_str() == NAMING_INSTANCE_TABLE.as_str() {
            let req = RaftApplyDataRequest::LoadSnapshotRecord(record);
            self.naming_actor.send(req).await??;
        } else {
            log::warn!(
                "do_load_snapshot ignore data,table name:{}",
                record.tree.as_str()
            );
        }
        Ok(())
    }

    pub fn load_complete(&self) -> anyhow::Result<()> {
        log::info!("RaftDataHandler|load_complete");
        self.namespace.do_send(RaftApplyDataRequest::LoadCompleted);
        self.sequence_db
            .do_send(RaftApplyDataRequest::LoadCompleted);
        self.mcp_manager
            .do_send(RaftApplyDataRequest::LoadCompleted);
        self.naming_actor
            .do_send(RaftApplyDataRequest::LoadCompleted);
        self.direct_cache_manager
            .do_send(RaftApplyDataRequest::LoadCompleted);
        Ok(())
    }

    /// 启动时加载日志
    pub async fn load_log(
        &self,
        req: ClientRequest,
        index_manager: &Addr<RaftIndexManager>,
    ) -> anyhow::Result<()> {
        match req {
            ClientRequest::NodeAddr { id, addr } => {
                index_manager
                    .send(RaftIndexRequest::AddNodeAddr(id, addr))
                    .await
                    .ok();
            }
            ClientRequest::Members(member) => {
                index_manager
                    .send(RaftIndexRequest::SaveMember {
                        member: member.clone(),
                        member_after_consensus: None,
                        node_addr: None,
                    })
                    .await
                    .ok();
            }
            ClientRequest::SequenceReq { req } => {
                self.sequence_db.send(req).await.ok();
            }
            ClientRequest::ConfigSet {
                key,
                value,
                config_type,
                desc,
                history_id,
                history_table_id,
                op_time,
                op_user,
            } => {
                let cmd = ConfigRaftCmd::ConfigAdd {
                    key,
                    value,
                    config_type,
                    desc,
                    history_id,
                    history_table_id,
                    op_time,
                    op_user,
                };
                self.config.send(cmd).await.ok();
            }
            ClientRequest::ConfigFullValue {
                key,
                value,
                last_seq_id: last_id,
            } => {
                let key = String::from_utf8_lossy(&key).to_string();
                let key: ConfigKey = (&key as &str).into();
                let value_do = ConfigValueDO::from_bytes(&value)?;
                let config_value: ConfigValue = value_do.into();
                let cmd = ConfigRaftCmd::SetFullValue {
                    key,
                    value: config_value,
                    last_id,
                };
                self.config.send(cmd).await.ok();
            }
            ClientRequest::ConfigRemove { key } => {
                let cmd = ConfigRaftCmd::ConfigRemove { key };
                self.config.send(cmd).await.ok();
            }
            ClientRequest::TableManagerReq(req) => {
                self.table.send(req).await.ok();
            }
            ClientRequest::NamespaceReq(req) => {
                self.namespace.send(req).await.ok();
            }
            ClientRequest::McpReq { req } => {
                self.mcp_manager.send(req).await.ok();
            }
            ClientRequest::NamingReq { req } => {
                self.naming_actor.send(req).await.ok();
            }
            ClientRequest::CacheReq { req } => {
                self.direct_cache_manager.send(req).await.ok();
            }
        }
        Ok(())
    }

    /// 接收raft请求到状态机,需要返回结果到调用端
    pub async fn apply_log_to_state_machine(
        &self,
        req: ClientRequest,
        index_manager: &Addr<RaftIndexManager>,
    ) -> anyhow::Result<ClientResponse> {
        match req {
            ClientRequest::NodeAddr { id, addr } => {
                index_manager.do_send(RaftIndexRequest::AddNodeAddr(id, addr));
                Ok(ClientResponse::Success)
            }
            ClientRequest::Members(member) => {
                index_manager.do_send(RaftIndexRequest::SaveMember {
                    member: member.clone(),
                    member_after_consensus: None,
                    node_addr: None,
                });
                Ok(ClientResponse::Success)
            }
            ClientRequest::SequenceReq { req } => {
                let r = self.sequence_db.send(req).await??;
                Ok(ClientResponse::SequenceResp { resp: r })
            }
            ClientRequest::ConfigSet {
                key,
                value,
                config_type,
                desc,
                history_id,
                history_table_id,
                op_time,
                op_user,
            } => {
                let cmd = ConfigRaftCmd::ConfigAdd {
                    key,
                    value,
                    config_type,
                    desc,
                    history_id,
                    history_table_id,
                    op_time,
                    op_user,
                };
                self.config.send(cmd).await??;
                Ok(ClientResponse::Success)
            }
            ClientRequest::ConfigFullValue {
                key,
                value,
                last_seq_id: last_id,
            } => {
                let key = String::from_utf8_lossy(&key).to_string();
                let key: ConfigKey = (&key as &str).into();
                let value_do = ConfigValueDO::from_bytes(&value)?;
                let config_value: ConfigValue = value_do.into();
                let cmd = ConfigRaftCmd::SetFullValue {
                    key,
                    value: config_value,
                    last_id,
                };
                self.config.send(cmd).await??;
                Ok(ClientResponse::Success)
            }
            ClientRequest::ConfigRemove { key } => {
                let cmd = ConfigRaftCmd::ConfigRemove { key };
                self.config.send(cmd).await??;
                Ok(ClientResponse::Success)
            }
            ClientRequest::TableManagerReq(req) => {
                self.table.send(req).await??;
                Ok(ClientResponse::Success)
            }
            ClientRequest::NamespaceReq(req) => {
                self.namespace.send(req).await??;
                Ok(ClientResponse::Success)
            }
            ClientRequest::McpReq { req } => {
                let resp = self.mcp_manager.send(req).await??;
                Ok(ClientResponse::McpResp { resp })
            }
            ClientRequest::NamingReq { req } => {
                let resp = self.naming_actor.send(req).await??;
                Ok(ClientResponse::NamingResp { resp })
            }
            ClientRequest::CacheReq { req } => {
                let resp = self.direct_cache_manager.send(req).await??;
                Ok(ClientResponse::CacheResp { resp })
            }
        }
    }

    pub fn do_send_log(
        &self,
        req: ClientRequest,
        index_manager: &Addr<RaftIndexManager>,
    ) -> anyhow::Result<()> {
        match req {
            ClientRequest::NodeAddr { id, addr } => {
                index_manager.do_send(RaftIndexRequest::AddNodeAddr(id, addr));
            }
            ClientRequest::Members(member) => {
                index_manager.do_send(RaftIndexRequest::SaveMember {
                    member: member.clone(),
                    member_after_consensus: None,
                    node_addr: None,
                });
            }
            ClientRequest::SequenceReq { req } => {
                self.sequence_db.do_send(req);
            }
            ClientRequest::ConfigSet {
                key,
                value,
                config_type,
                desc,
                history_id,
                history_table_id,
                op_time,
                op_user,
            } => {
                let cmd = ConfigRaftCmd::ConfigAdd {
                    key,
                    value,
                    config_type,
                    desc,
                    history_id,
                    history_table_id,
                    op_time,
                    op_user,
                };
                self.config.do_send(cmd);
            }
            ClientRequest::ConfigFullValue {
                key,
                value,
                last_seq_id: last_id,
            } => {
                let key = String::from_utf8_lossy(&key).to_string();
                let key: ConfigKey = (&key as &str).into();
                let value_do = ConfigValueDO::from_bytes(&value)?;
                let config_value: ConfigValue = value_do.into();
                let cmd = ConfigRaftCmd::SetFullValue {
                    key,
                    value: config_value,
                    last_id,
                };
                self.config.do_send(cmd);
            }
            ClientRequest::ConfigRemove { key } => {
                let cmd = ConfigRaftCmd::ConfigRemove { key };
                self.config.do_send(cmd);
            }
            ClientRequest::TableManagerReq(req) => {
                self.table.do_send(req);
            }
            ClientRequest::NamespaceReq(req) => {
                self.namespace.do_send(req);
            }
            ClientRequest::McpReq { req } => {
                self.mcp_manager.do_send(req);
            }
            ClientRequest::NamingReq { req } => {
                self.naming_actor.do_send(req);
            }
            ClientRequest::CacheReq { req } => {
                self.direct_cache_manager.do_send(req);
            }
        };
        Ok(())
    }
}