Skip to main content

openstack_keystone_distributed_storage/grpc/
cluster_admin_service.rs

1// Licensed under the Apache License, Version 2.0 (the "License");
2// you may not use this file except in compliance with the License.
3// You may obtain a copy of the License at
4//
5//     http://www.apache.org/licenses/LICENSE-2.0
6//
7// Unless required by applicable law or agreed to in writing, software
8// distributed under the License is distributed on an "AS IS" BASIS,
9// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10// See the License for the specific language governing permissions and
11// limitations under the License.
12//
13// SPDX-License-Identifier: Apache-2.0
14use std::collections::BTreeMap;
15
16use openraft::async_runtime::WatchReceiver;
17use tonic::Request;
18use tonic::Response;
19use tonic::Status;
20use tracing::trace;
21
22use crate::pb;
23use crate::protobuf::raft::cluster_admin_service_server::ClusterAdminService;
24use crate::types::*;
25
26/// Raft cluster administrative operations.
27///
28/// # Responsibilities
29/// - Manages the Raft cluster
30///
31/// # Protocol Safety
32/// This service implements the client-facing API and should validate all inputs
33/// before processing them through the Raft consensus protocol.
34pub struct ClusterAdminServiceImpl {
35    /// The Raft node instance for consensus operations
36    raft_node: Raft,
37}
38
39impl ClusterAdminServiceImpl {
40    /// Creates a new instance of the API service
41    ///
42    /// # Arguments
43    /// * `raft_node` - The Raft node instance this service will use
44    pub fn new(raft_node: Raft) -> Self {
45        ClusterAdminServiceImpl { raft_node }
46    }
47}
48
49#[tonic::async_trait]
50impl ClusterAdminService for ClusterAdminServiceImpl {
51    /// Initializes a new Raft cluster with the specified nodes
52    ///
53    /// # Arguments
54    /// * `request` - Contains the initial set of nodes for the cluster
55    ///
56    /// # Returns
57    /// * Success response with initialization details
58    /// * Error if initialization fails
59    #[tracing::instrument(level = "trace", skip(self))]
60    async fn init(&self, request: Request<pb::raft::InitRequest>) -> Result<Response<()>, Status> {
61        trace!("Initializing Raft cluster");
62        let req = request.into_inner();
63
64        // Convert nodes into required format
65        let nodes_map: BTreeMap<u64, pb::raft::Node> = req
66            .nodes
67            .into_iter()
68            .map(|node| (node.node_id, node))
69            .collect();
70
71        // Initialize the cluster
72        let result = self
73            .raft_node
74            .initialize(nodes_map)
75            .await
76            .map_err(|e| Status::internal(format!("Failed to initialize cluster: {}", e)))?;
77
78        trace!("Cluster initialization successful");
79        Ok(Response::new(result))
80    }
81
82    /// Adds a learner node to the Raft cluster
83    ///
84    /// # Arguments
85    /// * `request` - Contains the node information and blocking preference
86    ///
87    /// # Returns
88    /// * Success response with learner addition details
89    /// * Error if the operation fails
90    #[tracing::instrument(level = "trace", skip(self))]
91    async fn add_learner(
92        &self,
93        request: Request<pb::raft::AddLearnerRequest>,
94    ) -> Result<Response<pb::raft::AdminResponse>, Status> {
95        let req = request.into_inner();
96
97        let node = req
98            .node
99            .ok_or_else(|| Status::internal("Node information is required"))?;
100
101        trace!("Adding learner node {}", node.node_id);
102
103        let raft_node = Node {
104            rpc_addr: node.rpc_addr.clone(),
105            node_id: node.node_id,
106        };
107
108        let result = self
109            .raft_node
110            .add_learner(node.node_id, raft_node, true)
111            .await
112            .map_err(|e| Status::internal(format!("Failed to add learner node: {}", e)))?;
113
114        trace!("Successfully added learner node {}", node.node_id);
115        Ok(Response::new(result.into()))
116    }
117
118    /// Changes the membership of the Raft cluster
119    ///
120    /// # Arguments
121    /// * `request` - Contains the new member set and retention policy
122    ///
123    /// # Returns
124    /// * Success response with membership change details
125    /// * Error if the operation fails
126    #[tracing::instrument(level = "trace", skip(self))]
127    async fn change_membership(
128        &self,
129        request: Request<pb::raft::ChangeMembershipRequest>,
130    ) -> Result<Response<pb::raft::AdminResponse>, Status> {
131        let req = request.into_inner();
132
133        trace!(
134            "Changing membership. Members: {:?}, Retain: {}",
135            req.members, req.retain
136        );
137
138        let result = self
139            .raft_node
140            .change_membership(req.members, req.retain)
141            .await
142            .map_err(|e| Status::internal(format!("Failed to change membership: {}", e)))?;
143
144        trace!("Successfully changed cluster membership");
145        Ok(Response::new(result.into()))
146    }
147
148    /// Retrieves metrics about the Raft node
149    #[tracing::instrument(level = "trace", skip(self))]
150    async fn metrics(
151        &self,
152        _request: Request<()>,
153    ) -> Result<Response<pb::raft::MetricsResponse>, Status> {
154        trace!("Collecting metrics");
155        let metrics = self.raft_node.metrics().borrow_watched().clone();
156        let resp = pb::raft::MetricsResponse {
157            membership: Some(metrics.membership_config.membership().clone().into()),
158            other_metrics: metrics.to_string(),
159        };
160        Ok(Response::new(resp))
161    }
162}