1use std::sync::Arc;
2
3use actix::prelude::*;
4
5use crate::grpc::handler::RAFT_ROUTE_REQUEST;
6use crate::{
7 grpc::PayloadUtils,
8 raft::{
9 cluster::{
10 model::{RouteAddr, RouterRequest, RouterResponse},
11 route::RaftAddrRouter,
12 },
13 network::factory::RaftClusterRequestSender,
14 },
15};
16
17use super::table::{
18 TableManager, TableManagerAsyncReq, TableManagerQueryReq, TableManagerReq, TableManagerResult,
19};
20
21pub struct TableRoute {
22 table_manager: Addr<TableManager>,
23 raft_addr_route: Arc<RaftAddrRouter>,
24 cluster_sender: Arc<RaftClusterRequestSender>,
25}
26
27impl TableRoute {
28 pub fn new(
29 table_manager: Addr<TableManager>,
30 raft_addr_route: Arc<RaftAddrRouter>,
31 cluster_sender: Arc<RaftClusterRequestSender>,
32 ) -> Self {
33 Self {
34 table_manager,
35 raft_addr_route,
36 cluster_sender,
37 }
38 }
39
40 fn unknown_err(&self) -> anyhow::Error {
41 anyhow::anyhow!("unknown the raft leader addr!")
42 }
43
44 pub async fn request(&self, req: TableManagerReq) -> anyhow::Result<()> {
45 match self.raft_addr_route.get_route_addr().await? {
46 RouteAddr::Local => {
47 self.table_manager
48 .send(TableManagerAsyncReq(req))
49 .await?
50 .ok();
51 }
52 RouteAddr::Remote(_, addr) => {
53 let req: RouterRequest = req.into();
54 let request = serde_json::to_string(&req).unwrap_or_default();
55 let payload = PayloadUtils::build_payload(RAFT_ROUTE_REQUEST, request);
56 let _resp_payload = self.cluster_sender.send_request(addr, payload).await?;
57 }
58 RouteAddr::Unknown => {
59 return Err(self.unknown_err());
60 }
61 };
62 Ok(())
63 }
64
65 pub async fn get_leader_data(
66 &self,
67 req: TableManagerQueryReq,
68 ) -> anyhow::Result<TableManagerResult> {
69 match self.raft_addr_route.get_route_addr().await? {
70 RouteAddr::Local => self.table_manager.send(req).await?,
71 RouteAddr::Remote(_, addr) => {
72 let req: RouterRequest = req.into();
73 let request = serde_json::to_string(&req).unwrap_or_default();
74 let payload = PayloadUtils::build_payload(RAFT_ROUTE_REQUEST, request);
75 let resp_payload = self.cluster_sender.send_request(addr, payload).await?;
76 let body_vec = resp_payload.body.unwrap_or_default().value;
77 let resp: RouterResponse = serde_json::from_slice(&body_vec)?;
78 match resp {
79 RouterResponse::TableManagerResult { result } => Ok(result),
80 _ => Err(anyhow::anyhow!(
81 "TableManagerQueryReq response type is error!"
82 )),
83 }
84 }
85 RouteAddr::Unknown => Err(self.unknown_err()),
86 }
87 }
88}