flare_dht/rpc_server/
control_api.rs

1use crate::metadata::MetadataManager;
2use flare_pb::flare_control_server::FlareControl;
3use flare_pb::{
4    ClusterMetadata, ClusterMetadataRequest, ClusterTopologyInfo,
5    ClusterTopologyRequest, JoinRequest, JoinResponse, LeaveRequest,
6    LeaveResponse,
7};
8use std::sync::Arc;
9use tonic::{Request, Response, Status};
10use tracing::info;
11
12pub struct FlareControlService {
13    metadata_manager: Arc<dyn MetadataManager>,
14}
15
16impl FlareControlService {
17    pub fn new(metadata_manager: Arc<dyn MetadataManager>) -> Self {
18        Self { metadata_manager }
19    }
20}
21
22#[tonic::async_trait]
23impl FlareControl for FlareControlService {
24    async fn join(
25        &self,
26        request: Request<JoinRequest>,
27    ) -> Result<Response<JoinResponse>, Status> {
28        let join_request = request.into_inner();
29        info!("receive join request {}", &join_request.addr);
30        self.metadata_manager
31            .other_join(join_request)
32            .await
33            .map(|r| Response::new(r))
34            .map_err(|e| e.into())
35    }
36
37    async fn leave(
38        &self,
39        request: Request<LeaveRequest>,
40    ) -> Result<Response<LeaveResponse>, Status> {
41        let leave_req = request.into_inner();
42        info!("receive leave request {}", &leave_req.node_id);
43        self.metadata_manager.other_leave(leave_req.node_id).await?;
44        Ok(Response::new(LeaveResponse::default()))
45    }
46
47    async fn get_topology(
48        &self,
49        _request: Request<ClusterTopologyRequest>,
50    ) -> Result<Response<ClusterTopologyInfo>, Status> {
51        todo!()
52    }
53
54    async fn get_metadata(
55        &self,
56        _req: Request<ClusterMetadataRequest>,
57    ) -> Result<Response<ClusterMetadata>, Status> {
58        let meta = __self.metadata_manager.get_metadata().await?;
59        Ok(Response::new(meta))
60    }
61}