openstack_keystone_distributed_storage/grpc/
raft_service.rs1use std::pin::Pin;
15
16use futures::Stream;
17use futures::StreamExt;
18use openraft::Snapshot;
19use tonic::Request;
20use tonic::Response;
21use tonic::Status;
22use tonic::Streaming;
23use tracing::trace;
24
25use crate::protobuf as pb;
26use crate::protobuf::raft::VoteRequest;
27use crate::protobuf::raft::VoteResponse;
28use crate::protobuf::raft::raft_service_server::RaftService;
29use crate::types::*;
30
31pub struct RaftServiceImpl {
45 raft_node: Raft,
47}
48
49impl RaftServiceImpl {
50 pub fn new(raft_node: Raft) -> Self {
55 RaftServiceImpl { raft_node }
56 }
57}
58
59#[tonic::async_trait]
60impl RaftService for RaftServiceImpl {
61 #[tracing::instrument(level = "trace", skip(self))]
74 async fn vote(&self, request: Request<VoteRequest>) -> Result<Response<VoteResponse>, Status> {
75 let vote_resp = self
76 .raft_node
77 .vote(
78 request
79 .into_inner()
80 .try_into()
81 .map_err(|e| Status::internal(format!("Vote operation failed: {}", e)))?,
82 )
83 .await
84 .map_err(|e| Status::internal(format!("Vote operation failed: {}", e)))?;
85
86 trace!("Vote request processed successfully");
87 Ok(Response::new(vote_resp.into()))
88 }
89
90 #[tracing::instrument(level = "trace", skip(self))]
105 async fn append_entries(
106 &self,
107 request: Request<pb::raft::AppendEntriesRequest>,
108 ) -> Result<Response<pb::raft::AppendEntriesResponse>, Status> {
109 let append_resp =
110 self.raft_node
111 .append_entries(request.into_inner().try_into().map_err(|e| {
112 Status::internal(format!("Append entries operation failed: {}", e))
113 })?)
114 .await
115 .map_err(|e| Status::internal(format!("Append entries operation failed: {}", e)))?;
116
117 trace!("Append entries request processed successfully");
118 Ok(Response::new(append_resp.into()))
119 }
120
121 #[tracing::instrument(level = "trace", skip(self))]
132 async fn snapshot(
133 &self,
134 request: Request<Streaming<pb::raft::SnapshotRequest>>,
135 ) -> Result<Response<pb::raft::SnapshotResponse>, Status> {
136 let mut stream = request.into_inner();
137
138 let first_chunk = stream
140 .next()
141 .await
142 .ok_or_else(|| Status::invalid_argument("Empty snapshot stream"))??;
143
144 let vote;
145 let snapshot_meta;
146 {
147 let meta = first_chunk
148 .into_meta()
149 .ok_or_else(|| Status::invalid_argument("First snapshot chunk must be metadata"))?;
150
151 trace!("Received snapshot metadata chunk: {:?}", meta);
152
153 vote = meta
154 .vote
155 .ok_or_else(|| Status::invalid_argument("Missing `Vote`"))?;
156
157 snapshot_meta = SnapshotMeta {
158 last_log_id: meta.last_log_id.map(|log_id| log_id.into()),
159 last_membership: StoredMembership::new(
160 meta.last_membership_log_id.map(|x| x.into()),
161 meta.last_membership
162 .ok_or_else(|| Status::invalid_argument("Membership information missing"))?
163 .try_into()
164 .map_err(|e| {
165 Status::invalid_argument(format!("invalid membership: {:?}", e))
166 })?,
167 ),
168 snapshot_id: meta.snapshot_id,
169 };
170 }
171
172 let mut snapshot_data_bytes = Vec::new();
174
175 while let Some(chunk) = stream.next().await {
176 let data = chunk?
177 .into_data_chunk()
178 .ok_or_else(|| Status::invalid_argument("Snapshot chunk must be data"))?;
179 snapshot_data_bytes.extend_from_slice(&data);
180 }
181
182 let snapshot = Snapshot {
183 meta: snapshot_meta,
184 snapshot: snapshot_data_bytes,
185 };
186
187 let snapshot_resp = self
189 .raft_node
190 .install_full_snapshot(vote, snapshot)
191 .await
192 .map_err(|e| Status::internal(format!("Snapshot installation failed: {}", e)))?;
193
194 trace!("Streaming snapshot installation request processed successfully");
195 Ok(Response::new(pb::raft::SnapshotResponse {
196 vote: Some(snapshot_resp.vote),
197 }))
198 }
199
200 type StreamAppendStream =
201 Pin<Box<dyn Stream<Item = Result<pb::raft::AppendEntriesResponse, Status>> + Send>>;
202
203 #[tracing::instrument(level = "trace", skip(self))]
208 async fn stream_append(
209 &self,
210 request: Request<Streaming<pb::raft::AppendEntriesRequest>>,
211 ) -> Result<Response<Self::StreamAppendStream>, Status> {
212 let input = request.into_inner();
213
214 let input_stream = input.filter_map(|r| async move {
216 r.ok().map(TryInto::try_into).transpose().unwrap_or(None)
217 });
218
219 let output = self.raft_node.stream_append(input_stream);
221
222 #[allow(clippy::result_large_err)]
224 let output_stream = output.map(|result| match result {
225 Ok(stream_result) => Ok(stream_result.into()),
226 Err(fatal) => Err(Status::internal(format!("Fatal Raft error: {}", fatal))),
227 });
228
229 Ok(Response::new(Box::pin(output_stream)))
230 }
231}