wolfengine 3.0.2

Wolf is a set of modules for realtime rendering, realtime streaming and game developing
use crate::system::algorithm::raft::{
    raft_converter, raft_srv::wolf_raft, raft_srv::wolf_raft::raft_client::RaftClient,
};
use crate::system::net::grpc;
use anyhow::{bail, Result};
use async_raft::{
    raft::{self, InstallSnapshotRequest, InstallSnapshotResponse, VoteRequest, VoteResponse},
    Config, NodeId, RaftNetwork,
};
use async_trait::async_trait;
use memstore::{ClientRequest, ClientResponse, MemStore};
use std::sync::Arc;
use uuid::Uuid;

pub type MemRaft = async_raft::Raft<ClientRequest, ClientResponse, RaftRouter, MemStore>;
const BASE_PORT: u64 = 7777;

#[derive(Debug)]
pub struct RaftRouter {}

impl Default for RaftRouter {
    fn default() -> Self {
        Self::new()
    }
}

impl RaftRouter {
    /// Create a new `RaftRouter` instance.
    #[must_use]
    pub const fn new() -> Self {
        Self {}
    }
}

#[async_trait]
impl RaftNetwork<ClientRequest> for RaftRouter {
    /// Send an `AppendEntries` RPC to the target Raft node
    async fn append_entries(
        &self,
        p_target_node: NodeId,
        p_rpc: raft::AppendEntriesRequest<ClientRequest>,
    ) -> Result<raft::AppendEntriesResponse> {
        const TRACE: &str = "raft_imp:append_entries";

        let uuid = Uuid::new_v5(&Uuid::NAMESPACE_X500, b"wolf_raft_append_entries");
        let res_append = raft_converter::raft_append_entries_req_to_grpc_append_entries_req(
            uuid.to_string(),
            &p_rpc,
        );
        let ret = match res_append {
            Ok(raft_req) => {
                //create a channel for grpc
                let uri = format!("http://localhost:{}", BASE_PORT + p_target_node);
                let ret_1 = match grpc::create_channel(uri).await {
                    Ok(c) => {
                        //call request with channel
                        let mut client = RaftClient::new(c).send_gzip().accept_gzip();
                        let ret_2 = match client.append_entries(raft_req).await {
                            Ok(r) => {
                                let ret_3 = match r.into_inner().response {
                                    Some(s) => {
                                        use wolf_raft::raft_append_entries_res::Response;
                                        if let Response::OkRes(ok) = s {
                                            //create AppendEntriesResponse from RaftAppendEntriesOkRes
                                            let ret = raft_converter::grpc_append_entries_ok_res_to_raft_append_entries_res(&ok);
                                            Ok(ret)
                                        } else if let Response::ErrorRes(e) = s {
                                            bail!(
                                                "AppendEntriesResponse for node {} contains Error {:?}. Trace: {}",
                                                p_target_node,
                                                e,
                                                TRACE
                                            )
                                        } else {
                                            bail!(
                                                "AppendEntriesResponse for node {} contains Unknown error. Trace: {}",
                                                p_target_node,
                                                TRACE
                                            )
                                        }
                                    }
                                    None => {
                                        bail!(
                                            "inner message of AppendEntriesResponse is None for node {}. Trace: {}",
                                            p_target_node,
                                            TRACE
                                        )
                                    }
                                };
                                ret_3
                            }
                            Err(e) => {
                                bail!(
                                    "AppendEntriesResponse for node {} contains error status {:?}. Trace: {}",
                                    p_target_node,
                                    e,
                                    TRACE
                                )
                            }
                        };
                        ret_2
                    }
                    Err(e) => {
                        bail!(
                            "could not create a grpc channel while sending raft::AppendEntriesResponse for node {} because: {:?}. Trace: {}",
                            p_target_node,
                            e,
                            TRACE
                        )
                    }
                };
                ret_1
            }
            Err(e) => {
                bail!("{:?}. trace: {}", e, TRACE)
            }
        };
        ret
    }

    /// Send an `InstallSnapshot` RPC to the target Raft node
    async fn install_snapshot(
        &self,
        p_target_node: u64,
        p_rpc: InstallSnapshotRequest,
    ) -> Result<InstallSnapshotResponse> {
        const TRACE: &str = "raft_imp::install_snapshot";

        //create a channel for grpc
        let uri = format!("http://localhost:{}", BASE_PORT + p_target_node);
        let ret = match crate::system::net::grpc::create_channel(uri).await {
            Ok(c) => {
                //call request with channel
                let uuid = Uuid::new_v5(&Uuid::NAMESPACE_X500, b"wolf_raft_install_snapshot");
                let msg_id = uuid.to_string();
                let rpc_req =
                    raft_converter::raft_install_snapshot_req_to_grpc_install_snapshot_req(
                        msg_id, &p_rpc,
                    );

                let mut client = RaftClient::new(c).send_gzip().accept_gzip();
                let ret_1 = match client.install_snapshot(rpc_req).await {
                    Ok(r) => {
                        let ret_2 = match r.into_inner().response {
                            Some(s) => {
                                use wolf_raft::raft_install_snapshot_res::Response;
                                if let Response::OkRes(ok) = s {
                                    let response = raft_converter::grpc_install_snapshot_ok_res_to_raft_install_snapshot_res(&ok);
                                    Ok(response)
                                } else if let Response::ErrorRes(e) = s {
                                    bail!(
                                        "InstallSnapshotResponse for node {} contains Error {:?}. Trace: {}",
                                        p_target_node,
                                        e,
                                        TRACE
                                    )
                                } else {
                                    bail!(
                                        "InstallSnapshotResponse for node {} contains Unknown error. Trace: {}",
                                        p_target_node,
                                        TRACE
                                    )
                                }
                            }
                            None => {
                                bail!(
                                    "inner message of InstallSnapshotResponse is None for node {}. Trace: {}",
                                    p_target_node,
                                    TRACE
                                )
                            }
                        };
                        ret_2
                    }
                    Err(e) => {
                        bail!(
                            "InstallSnapshotResponse for node {} contains error status {:?}. Trace: {}",
                            p_target_node,
                            e,
                            TRACE
                        )
                    }
                };
                ret_1
            }
            Err(e) => {
                bail!(
                    "could not create grpc channel on sending raft::InstallSnapshotResponse for node {} because: {:?}. Trace: {}",
                    p_target_node,
                    e,
                    TRACE
                )
            }
        };
        ret
    }

    /// Send an `Vote` RPC to the target Raft node
    async fn vote(&self, p_target_node: u64, p_rpc: VoteRequest) -> Result<VoteResponse> {
        const TRACE: &str = "raft_imp::vote";

        //create a channel for grpc
        let uri = format!("http://localhost:{}", BASE_PORT + p_target_node);
        let ret = match crate::system::net::grpc::create_channel(uri).await {
            Ok(c) => {
                //call request with channel
                let uuid = Uuid::new_v5(&Uuid::NAMESPACE_X500, b"wolf_raft_vote");
                let msg_id = uuid.to_string();
                let rpc_req = raft_converter::raft_vote_req_to_grpc_vote_req(msg_id, &p_rpc);
                let mut client = RaftClient::new(c).send_gzip().accept_gzip();
                let ret_1 = match client.vote(rpc_req).await {
                    Ok(r) => {
                        let ret_2 = match r.into_inner().response {
                            Some(s) => {
                                use wolf_raft::raft_vote_res::Response;
                                if let Response::OkRes(ok) = s {
                                    let res_vote =
                                        raft_converter::grpc_vote_ok_res_to_raft_vote_res(&ok);
                                    Ok(res_vote)
                                } else if let Response::ErrorRes(e) = s {
                                    bail!(
                                        "VoteResponse for node {} contains Error {:?}. Trace: {}",
                                        p_target_node,
                                        e,
                                        TRACE
                                    )
                                } else {
                                    bail!(
                                        "VoteResponse for node {} contains Unknown error. Trace: {}",
                                        p_target_node,
                                        TRACE
                                    )
                                }
                            }
                            None => {
                                bail!(
                                    "inner message of VoteResponse is None for node {}. Trace: {}",
                                    p_target_node,
                                    TRACE
                                )
                            }
                        };
                        ret_2
                    }
                    Err(e) => {
                        bail!(
                            "VoteResponse for node {} contains error status {:?}. Trace: {}",
                            p_target_node,
                            e,
                            TRACE
                        )
                    }
                };
                ret_1
            }
            Err(e) => {
                bail!(
                    "could not create grpc channel on sending raft::VoteResponse for node {} because: {:?}. Trace: {}",
                    p_target_node,
                    e,
                    TRACE)
            }
        };
        ret
    }
}

/// create a `MemRaft` node with specific node id and cluster information
/// # Panics
///
/// Will panic if config is not valid
#[must_use]
pub fn new(p_cluster_name: &str, p_node_id: u64) -> MemRaft {
    let config = Config::build(p_cluster_name.into())
        .validate()
        .unwrap_or_else(|e| {
            panic!(
                "failed to build Raft config for cluster:{} and node id:{}. error: {}",
                p_cluster_name, p_node_id, e
            )
        });

    //now create MemRaft node
    let arc_config = Arc::new(config);
    let network = Arc::new(RaftRouter::new());
    let storage = Arc::new(MemStore::new(p_node_id));
    raft::Raft::new(p_node_id, arc_config, network, storage)
}