// flare_dht/rpc_server/control_api.rs
use crate::metadata::MetadataManager;
2use flare_pb::flare_control_server::FlareControl;
3use flare_pb::{
4 ClusterMetadata, ClusterMetadataRequest, ClusterTopologyInfo,
5 ClusterTopologyRequest, JoinRequest, JoinResponse, LeaveRequest,
6 LeaveResponse,
7};
8use std::sync::Arc;
9use tonic::{Request, Response, Status};
10use tracing::info;
11
12pub struct FlareControlService {
13 metadata_manager: Arc<dyn MetadataManager>,
14}
15
16impl FlareControlService {
17 pub fn new(metadata_manager: Arc<dyn MetadataManager>) -> Self {
18 Self { metadata_manager }
19 }
20}
21
22#[tonic::async_trait]
23impl FlareControl for FlareControlService {
24 async fn join(
25 &self,
26 request: Request<JoinRequest>,
27 ) -> Result<Response<JoinResponse>, Status> {
28 let join_request = request.into_inner();
29 info!("receive join request {}", &join_request.addr);
30 self.metadata_manager
31 .other_join(join_request)
32 .await
33 .map(|r| Response::new(r))
34 .map_err(|e| e.into())
35 }
36
37 async fn leave(
38 &self,
39 request: Request<LeaveRequest>,
40 ) -> Result<Response<LeaveResponse>, Status> {
41 let leave_req = request.into_inner();
42 info!("receive leave request {}", &leave_req.node_id);
43 self.metadata_manager.other_leave(leave_req.node_id).await?;
44 Ok(Response::new(LeaveResponse::default()))
45 }
46
47 async fn get_topology(
48 &self,
49 _request: Request<ClusterTopologyRequest>,
50 ) -> Result<Response<ClusterTopologyInfo>, Status> {
51 todo!()
52 }
53
54 async fn get_metadata(
55 &self,
56 _req: Request<ClusterMetadataRequest>,
57 ) -> Result<Response<ClusterMetadata>, Status> {
58 let meta = __self.metadata_manager.get_metadata().await?;
59 Ok(Response::new(meta))
60 }
61}