Skip to main content

rnacos/raft/db/
route.rs

1use std::sync::Arc;
2
3use actix::prelude::*;
4
5use crate::grpc::handler::RAFT_ROUTE_REQUEST;
6use crate::{
7    grpc::PayloadUtils,
8    raft::{
9        cluster::{
10            model::{RouteAddr, RouterRequest, RouterResponse},
11            route::RaftAddrRouter,
12        },
13        network::factory::RaftClusterRequestSender,
14    },
15};
16
17use super::table::{
18    TableManager, TableManagerAsyncReq, TableManagerQueryReq, TableManagerReq, TableManagerResult,
19};
20
21pub struct TableRoute {
22    table_manager: Addr<TableManager>,
23    raft_addr_route: Arc<RaftAddrRouter>,
24    cluster_sender: Arc<RaftClusterRequestSender>,
25}
26
27impl TableRoute {
28    pub fn new(
29        table_manager: Addr<TableManager>,
30        raft_addr_route: Arc<RaftAddrRouter>,
31        cluster_sender: Arc<RaftClusterRequestSender>,
32    ) -> Self {
33        Self {
34            table_manager,
35            raft_addr_route,
36            cluster_sender,
37        }
38    }
39
40    fn unknown_err(&self) -> anyhow::Error {
41        anyhow::anyhow!("unknown the raft leader addr!")
42    }
43
44    pub async fn request(&self, req: TableManagerReq) -> anyhow::Result<()> {
45        match self.raft_addr_route.get_route_addr().await? {
46            RouteAddr::Local => {
47                self.table_manager
48                    .send(TableManagerAsyncReq(req))
49                    .await?
50                    .ok();
51            }
52            RouteAddr::Remote(_, addr) => {
53                let req: RouterRequest = req.into();
54                let request = serde_json::to_string(&req).unwrap_or_default();
55                let payload = PayloadUtils::build_payload(RAFT_ROUTE_REQUEST, request);
56                let _resp_payload = self.cluster_sender.send_request(addr, payload).await?;
57            }
58            RouteAddr::Unknown => {
59                return Err(self.unknown_err());
60            }
61        };
62        Ok(())
63    }
64
65    pub async fn get_leader_data(
66        &self,
67        req: TableManagerQueryReq,
68    ) -> anyhow::Result<TableManagerResult> {
69        match self.raft_addr_route.get_route_addr().await? {
70            RouteAddr::Local => self.table_manager.send(req).await?,
71            RouteAddr::Remote(_, addr) => {
72                let req: RouterRequest = req.into();
73                let request = serde_json::to_string(&req).unwrap_or_default();
74                let payload = PayloadUtils::build_payload(RAFT_ROUTE_REQUEST, request);
75                let resp_payload = self.cluster_sender.send_request(addr, payload).await?;
76                let body_vec = resp_payload.body.unwrap_or_default().value;
77                let resp: RouterResponse = serde_json::from_slice(&body_vec)?;
78                match resp {
79                    RouterResponse::TableManagerResult { result } => Ok(result),
80                    _ => Err(anyhow::anyhow!(
81                        "TableManagerQueryReq response type is error!"
82                    )),
83                }
84            }
85            RouteAddr::Unknown => Err(self.unknown_err()),
86        }
87    }
88}