1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
use std::sync::Arc;

use actix::prelude::*;

use crate::grpc::handler::RAFT_ROUTE_REQUEST;
use crate::{
    grpc::PayloadUtils,
    raft::{
        cluster::{
            model::{RouteAddr, RouterRequest, RouterResponse},
            route::RaftAddrRouter,
        },
        network::factory::RaftClusterRequestSender,
    },
};

use super::table::{
    TableManager, TableManagerAsyncReq, TableManagerQueryReq, TableManagerReq, TableManagerResult,
};

/// Dispatches table-manager requests to wherever the raft route points.
///
/// Each call first asks the [`RaftAddrRouter`] for the current route and then
/// either talks to the local [`TableManager`] actor or forwards the request to
/// a remote node through the [`RaftClusterRequestSender`].
/// NOTE(review): the remote node is presumably the raft leader (the error
/// text and `get_leader_data` suggest so) — confirm against `RaftAddrRouter`.
pub struct TableRoute {
    /// Address of the local `TableManager` actor, used when the route is `RouteAddr::Local`.
    table_manager: Addr<TableManager>,
    /// Resolves the current route (`Local`, `Remote` or `Unknown`) for every request.
    raft_addr_route: Arc<RaftAddrRouter>,
    /// Sends the encoded request to the remote address when the route is `RouteAddr::Remote`.
    cluster_sender: Arc<RaftClusterRequestSender>,
}

impl TableRoute {
    /// Creates a router from the local table-manager actor address, the raft
    /// route resolver and the sender used for remote requests.
    pub fn new(
        table_manager: Addr<TableManager>,
        raft_addr_route: Arc<RaftAddrRouter>,
        cluster_sender: Arc<RaftClusterRequestSender>,
    ) -> Self {
        Self {
            table_manager,
            raft_addr_route,
            cluster_sender,
        }
    }

    /// Builds the error returned when the route resolves to `RouteAddr::Unknown`.
    fn unknown_err(&self) -> anyhow::Error {
        anyhow::anyhow!("unknown the raft leader addr!")
    }

    /// Converts `req` into a [`RouterRequest`] and serializes it to JSON.
    ///
    /// # Errors
    ///
    /// Returns the `serde_json` error when serialization fails. Failing here is
    /// intentional: substituting an empty string would send a request the
    /// receiving node cannot decode.
    fn encode_route_request(req: impl Into<RouterRequest>) -> anyhow::Result<String> {
        let req: RouterRequest = req.into();
        Ok(serde_json::to_string(&req)?)
    }

    /// Submits a table-manager request, locally or on the remote node the
    /// route points to, without returning the operation's result.
    ///
    /// # Errors
    ///
    /// Fails when the route cannot be resolved or is `RouteAddr::Unknown`,
    /// when the message cannot be delivered to the local actor, when the
    /// request cannot be serialized, or when sending to the remote node fails.
    pub async fn request(&self, req: TableManagerReq) -> anyhow::Result<()> {
        match self.raft_addr_route.get_route_addr().await? {
            RouteAddr::Local => {
                // `?` covers a failed delivery to the actor; `.ok()` then drops the
                // handler's own result.
                // NOTE(review): discarding the handler result looks deliberate (the
                // remote branch ignores its reply as well) — confirm with callers.
                self.table_manager
                    .send(TableManagerAsyncReq(req))
                    .await?
                    .ok();
            }
            RouteAddr::Remote(_, addr) => {
                let request = Self::encode_route_request(req)?;
                let payload = PayloadUtils::build_payload(RAFT_ROUTE_REQUEST, request);
                // Only transport success matters here; the reply body is not inspected.
                let _resp_payload = self.cluster_sender.send_request(addr, payload).await?;
            }
            RouteAddr::Unknown => {
                return Err(self.unknown_err());
            }
        };
        Ok(())
    }

    /// Runs a table-manager query, locally or on the remote node the route
    /// points to, and returns its result.
    ///
    /// # Errors
    ///
    /// Fails when the route cannot be resolved or is `RouteAddr::Unknown`,
    /// when the local actor cannot be reached or reports an error, when the
    /// request cannot be serialized, when sending to the remote node fails,
    /// when the reply body is not a valid [`RouterResponse`], or when the
    /// reply is a different `RouterResponse` variant than
    /// `TableManagerResult`.
    pub async fn get_leader_data(
        &self,
        req: TableManagerQueryReq,
    ) -> anyhow::Result<TableManagerResult> {
        match self.raft_addr_route.get_route_addr().await? {
            RouteAddr::Local => self.table_manager.send(req).await?,
            RouteAddr::Remote(_, addr) => {
                let request = Self::encode_route_request(req)?;
                let payload = PayloadUtils::build_payload(RAFT_ROUTE_REQUEST, request);
                let resp_payload = self.cluster_sender.send_request(addr, payload).await?;
                // A missing body is replaced by its default value before decoding.
                let body_vec = resp_payload.body.unwrap_or_default().value;
                let resp: RouterResponse = serde_json::from_slice(&body_vec)?;
                match resp {
                    RouterResponse::TableManagerResult { result } => Ok(result),
                    _ => Err(anyhow::anyhow!(
                        "TableManagerQueryReq response type is error!"
                    )),
                }
            }
            RouteAddr::Unknown => Err(self.unknown_err()),
        }
    }
}