// openstack_keystone_distributed_storage/grpc/raft_service.rs

// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// SPDX-License-Identifier: Apache-2.0
use std::pin::Pin;

use futures::Stream;
use futures::StreamExt;
use openraft::Snapshot;
use tonic::Request;
use tonic::Response;
use tonic::Status;
use tonic::Streaming;
use tracing::trace;
use tracing::warn;

use crate::protobuf as pb;
use crate::protobuf::raft::VoteRequest;
use crate::protobuf::raft::VoteResponse;
use crate::protobuf::raft::raft_service_server::RaftService;
use crate::types::*;

31/// Internal gRPC service implementation for Raft protocol communications.
32/// This service handles the core Raft consensus protocol operations between
33/// cluster nodes.
34///
35/// # Responsibilities
36/// - Vote requests/responses during leader election
37/// - Log replication between nodes
38/// - Snapshot installation for state synchronization
39///
40/// # Protocol Safety
41/// This service implements critical consensus protocol operations and should
42/// only be exposed to other trusted Raft cluster nodes, never to external
43/// clients.
44pub struct RaftServiceImpl {
45    /// The local Raft node instance that this service operates on
46    raft_node: Raft,
47}
48
49impl RaftServiceImpl {
50    /// Creates a new instance of the internal service
51    ///
52    /// # Arguments
53    /// * `raft_node` - The Raft node instance this service will operate on
54    pub fn new(raft_node: Raft) -> Self {
55        RaftServiceImpl { raft_node }
56    }
57}
58
59#[tonic::async_trait]
60impl RaftService for RaftServiceImpl {
61    /// Handles vote requests during leader election.
62    ///
63    /// # Arguments
64    /// * `request` - The vote request containing candidate information
65    ///
66    /// # Returns
67    /// * `Ok(Response)` - Vote response indicating whether the vote was granted
68    /// * `Err(Status)` - Error status if the vote operation fails
69    ///
70    /// # Protocol Details
71    /// This implements the RequestVote RPC from the Raft protocol.
72    /// Nodes vote for candidates based on log completeness and term numbers.
73    #[tracing::instrument(level = "trace", skip(self))]
74    async fn vote(&self, request: Request<VoteRequest>) -> Result<Response<VoteResponse>, Status> {
75        let vote_resp = self
76            .raft_node
77            .vote(
78                request
79                    .into_inner()
80                    .try_into()
81                    .map_err(|e| Status::internal(format!("Vote operation failed: {}", e)))?,
82            )
83            .await
84            .map_err(|e| Status::internal(format!("Vote operation failed: {}", e)))?;
85
86        trace!("Vote request processed successfully");
87        Ok(Response::new(vote_resp.into()))
88    }
89
90    /// Handles append entries requests for log replication.
91    ///
92    /// # Arguments
93    /// * `request` - The append entries request containing log entries to
94    ///   replicate
95    ///
96    /// # Returns
97    /// * `Ok(Response)` - Response indicating success/failure of the append
98    ///   operation
99    /// * `Err(Status)` - Error status if the append operation fails
100    ///
101    /// # Protocol Details
102    /// This implements the AppendEntries RPC from the Raft protocol.
103    /// Used for both log replication and as heartbeat mechanism.
104    #[tracing::instrument(level = "trace", skip(self))]
105    async fn append_entries(
106        &self,
107        request: Request<pb::raft::AppendEntriesRequest>,
108    ) -> Result<Response<pb::raft::AppendEntriesResponse>, Status> {
109        let append_resp =
110            self.raft_node
111                .append_entries(request.into_inner().try_into().map_err(|e| {
112                    Status::internal(format!("Append entries operation failed: {}", e))
113                })?)
114                .await
115                .map_err(|e| Status::internal(format!("Append entries operation failed: {}", e)))?;
116
117        trace!("Append entries request processed successfully");
118        Ok(Response::new(append_resp.into()))
119    }
120
121    /// Handles snapshot installation requests for state transfer using
122    /// streaming.
123    ///
124    /// # Arguments
125    /// * `request` - Stream of snapshot chunks with metadata
126    ///
127    /// # Returns
128    /// * `Ok(Response)` - Response indicating success/failure of snapshot
129    ///   installation
130    /// * `Err(Status)` - Error status if the snapshot operation fails
131    #[tracing::instrument(level = "trace", skip(self))]
132    async fn snapshot(
133        &self,
134        request: Request<Streaming<pb::raft::SnapshotRequest>>,
135    ) -> Result<Response<pb::raft::SnapshotResponse>, Status> {
136        let mut stream = request.into_inner();
137
138        // Get the first chunk which contains metadata
139        let first_chunk = stream
140            .next()
141            .await
142            .ok_or_else(|| Status::invalid_argument("Empty snapshot stream"))??;
143
144        let vote;
145        let snapshot_meta;
146        {
147            let meta = first_chunk
148                .into_meta()
149                .ok_or_else(|| Status::invalid_argument("First snapshot chunk must be metadata"))?;
150
151            trace!("Received snapshot metadata chunk: {:?}", meta);
152
153            vote = meta
154                .vote
155                .ok_or_else(|| Status::invalid_argument("Missing `Vote`"))?;
156
157            snapshot_meta = SnapshotMeta {
158                last_log_id: meta.last_log_id.map(|log_id| log_id.into()),
159                last_membership: StoredMembership::new(
160                    meta.last_membership_log_id.map(|x| x.into()),
161                    meta.last_membership
162                        .ok_or_else(|| Status::invalid_argument("Membership information missing"))?
163                        .try_into()
164                        .map_err(|e| {
165                            Status::invalid_argument(format!("invalid membership: {:?}", e))
166                        })?,
167                ),
168                snapshot_id: meta.snapshot_id,
169            };
170        }
171
172        // Collect snapshot data
173        let mut snapshot_data_bytes = Vec::new();
174
175        while let Some(chunk) = stream.next().await {
176            let data = chunk?
177                .into_data_chunk()
178                .ok_or_else(|| Status::invalid_argument("Snapshot chunk must be data"))?;
179            snapshot_data_bytes.extend_from_slice(&data);
180        }
181
182        let snapshot = Snapshot {
183            meta: snapshot_meta,
184            snapshot: snapshot_data_bytes,
185        };
186
187        // Install the full snapshot
188        let snapshot_resp = self
189            .raft_node
190            .install_full_snapshot(vote, snapshot)
191            .await
192            .map_err(|e| Status::internal(format!("Snapshot installation failed: {}", e)))?;
193
194        trace!("Streaming snapshot installation request processed successfully");
195        Ok(Response::new(pb::raft::SnapshotResponse {
196            vote: Some(snapshot_resp.vote),
197        }))
198    }
199
200    type StreamAppendStream =
201        Pin<Box<dyn Stream<Item = Result<pb::raft::AppendEntriesResponse, Status>> + Send>>;
202
203    /// Handles streaming append entries requests for pipeline replication.
204    ///
205    /// This enables efficient pipelining of log replication where multiple
206    /// AppendEntries requests can be in-flight simultaneously.
207    #[tracing::instrument(level = "trace", skip(self))]
208    async fn stream_append(
209        &self,
210        request: Request<Streaming<pb::raft::AppendEntriesRequest>>,
211    ) -> Result<Response<Self::StreamAppendStream>, Status> {
212        let input = request.into_inner();
213
214        // Convert pb stream to openraft AppendEntriesRequest stream
215        let input_stream = input.filter_map(|r| async move {
216            r.ok().map(TryInto::try_into).transpose().unwrap_or(None)
217        });
218
219        // Call Raft::stream_append
220        let output = self.raft_node.stream_append(input_stream);
221
222        // Convert StreamAppendResult to pb::AppendEntriesResponse
223        #[allow(clippy::result_large_err)]
224        let output_stream = output.map(|result| match result {
225            Ok(stream_result) => Ok(stream_result.into()),
226            Err(fatal) => Err(Status::internal(format!("Fatal Raft error: {}", fatal))),
227        });
228
229        Ok(Response::new(Box::pin(output_stream)))
230    }
231}