flare_dht/rpc_server/
control_api.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
use crate::metadata::MetadataManager;
use flare_pb::flare_control_server::FlareControl;
use flare_pb::{
    ClusterMetadata, ClusterMetadataRequest, ClusterTopologyInfo,
    ClusterTopologyRequest, JoinRequest, JoinResponse, LeaveRequest,
    LeaveResponse,
};
use std::sync::Arc;
use tonic::{Request, Response, Status};
use tracing::info;

/// gRPC service type that implements the `FlareControl` server trait.
///
/// Every RPC handler on this type delegates to the shared
/// [`MetadataManager`] held in `metadata_manager`.
pub struct FlareControlService {
    /// Shared, dynamically dispatched metadata manager that the handlers call.
    metadata_manager: Arc<dyn MetadataManager>,
}

impl FlareControlService {
    pub fn new(metadata_manager: Arc<dyn MetadataManager>) -> Self {
        Self { metadata_manager }
    }
}

#[tonic::async_trait]
impl FlareControl for FlareControlService {
    async fn join(
        &self,
        request: Request<JoinRequest>,
    ) -> Result<Response<JoinResponse>, Status> {
        let join_request = request.into_inner();
        info!("receive join request {}", &join_request.addr);
        self.metadata_manager
            .other_join(join_request)
            .await
            .map(|r| Response::new(r))
            .map_err(|e| e.into())
    }

    async fn leave(
        &self,
        request: Request<LeaveRequest>,
    ) -> Result<Response<LeaveResponse>, Status> {
        let leave_req = request.into_inner();
        info!("receive leave request {}", &leave_req.node_id);
        self.metadata_manager.other_leave(leave_req.node_id).await?;
        Ok(Response::new(LeaveResponse::default()))
    }

    async fn get_topology(
        &self,
        _request: Request<ClusterTopologyRequest>,
    ) -> Result<Response<ClusterTopologyInfo>, Status> {
        todo!()
    }

    async fn get_metadata(
        &self,
        _req: Request<ClusterMetadataRequest>,
    ) -> Result<Response<ClusterMetadata>, Status> {
        let meta = __self.metadata_manager.get_metadata().await?;
        Ok(Response::new(meta))
    }
}