openstack_keystone_distributed_storage/grpc/
cluster_admin_service.rs1use std::collections::BTreeMap;
15
16use openraft::async_runtime::WatchReceiver;
17use tonic::Request;
18use tonic::Response;
19use tonic::Status;
20use tracing::trace;
21
22use crate::pb;
23use crate::protobuf::raft::cluster_admin_service_server::ClusterAdminService;
24use crate::types::*;
25
26pub struct ClusterAdminServiceImpl {
35 raft_node: Raft,
37}
38
39impl ClusterAdminServiceImpl {
40 pub fn new(raft_node: Raft) -> Self {
45 ClusterAdminServiceImpl { raft_node }
46 }
47}
48
49#[tonic::async_trait]
50impl ClusterAdminService for ClusterAdminServiceImpl {
51 #[tracing::instrument(level = "trace", skip(self))]
60 async fn init(&self, request: Request<pb::raft::InitRequest>) -> Result<Response<()>, Status> {
61 trace!("Initializing Raft cluster");
62 let req = request.into_inner();
63
64 let nodes_map: BTreeMap<u64, pb::raft::Node> = req
66 .nodes
67 .into_iter()
68 .map(|node| (node.node_id, node))
69 .collect();
70
71 let result = self
73 .raft_node
74 .initialize(nodes_map)
75 .await
76 .map_err(|e| Status::internal(format!("Failed to initialize cluster: {}", e)))?;
77
78 trace!("Cluster initialization successful");
79 Ok(Response::new(result))
80 }
81
82 #[tracing::instrument(level = "trace", skip(self))]
91 async fn add_learner(
92 &self,
93 request: Request<pb::raft::AddLearnerRequest>,
94 ) -> Result<Response<pb::raft::AdminResponse>, Status> {
95 let req = request.into_inner();
96
97 let node = req
98 .node
99 .ok_or_else(|| Status::internal("Node information is required"))?;
100
101 trace!("Adding learner node {}", node.node_id);
102
103 let raft_node = Node {
104 rpc_addr: node.rpc_addr.clone(),
105 node_id: node.node_id,
106 };
107
108 let result = self
109 .raft_node
110 .add_learner(node.node_id, raft_node, true)
111 .await
112 .map_err(|e| Status::internal(format!("Failed to add learner node: {}", e)))?;
113
114 trace!("Successfully added learner node {}", node.node_id);
115 Ok(Response::new(result.into()))
116 }
117
118 #[tracing::instrument(level = "trace", skip(self))]
127 async fn change_membership(
128 &self,
129 request: Request<pb::raft::ChangeMembershipRequest>,
130 ) -> Result<Response<pb::raft::AdminResponse>, Status> {
131 let req = request.into_inner();
132
133 trace!(
134 "Changing membership. Members: {:?}, Retain: {}",
135 req.members, req.retain
136 );
137
138 let result = self
139 .raft_node
140 .change_membership(req.members, req.retain)
141 .await
142 .map_err(|e| Status::internal(format!("Failed to change membership: {}", e)))?;
143
144 trace!("Successfully changed cluster membership");
145 Ok(Response::new(result.into()))
146 }
147
148 #[tracing::instrument(level = "trace", skip(self))]
150 async fn metrics(
151 &self,
152 _request: Request<()>,
153 ) -> Result<Response<pb::raft::MetricsResponse>, Status> {
154 trace!("Collecting metrics");
155 let metrics = self.raft_node.metrics().borrow_watched().clone();
156 let resp = pb::raft::MetricsResponse {
157 membership: Some(metrics.membership_config.membership().clone().into()),
158 other_metrics: metrics.to_string(),
159 };
160 Ok(Response::new(resp))
161 }
162}