openstack_keystone_distributed_storage/
network.rs1use std::future::Future;
15use std::time::Duration;
16
17use futures::SinkExt;
18use futures::Stream;
19use futures::StreamExt;
20use futures::channel::mpsc;
21use openraft::AnyError;
22use openraft::OptionalSend;
23use openraft::RaftNetworkFactory;
24use openraft::base::BoxFuture;
25use openraft::base::BoxStream;
26use openraft::error::NetworkError;
27use openraft::error::ReplicationClosed;
28use openraft::error::Unreachable;
29use openraft::network::Backoff;
30use openraft::network::NetBackoff;
31use openraft::network::NetSnapshot;
32use openraft::network::NetStreamAppend;
33use openraft::network::NetTransferLeader;
34use openraft::network::NetVote;
35use openraft::network::RPCOption;
36use openraft::raft::StreamAppendError;
37use openraft::raft::StreamAppendResult;
38use openraft::raft::TransferLeaderRequest;
39use tonic::transport::Channel;
40
41use crate::protobuf as pb;
42use crate::protobuf::raft::VoteRequest as PbVoteRequest;
43use crate::protobuf::raft::VoteResponse as PbVoteResponse;
44use crate::protobuf::raft::raft_service_client::RaftServiceClient;
45use crate::types::NodeId;
46use crate::types::TypeConfig;
47use crate::types::*;
48
49pub struct Network {}
52
53impl Network {}
54
55impl RaftNetworkFactory<TypeConfig> for Network {
59 type Network = NetworkConnection;
60
61 #[tracing::instrument(level = "debug", skip_all)]
62 async fn new_client(&mut self, _: NodeId, node: &Node) -> Self::Network {
63 NetworkConnection::new(node.clone())
64 }
65}
66
67pub struct NetworkConnection {
70 target_node: pb::raft::Node,
71}
72
73impl NetworkConnection {
74 pub fn new(target_node: Node) -> Self {
76 NetworkConnection { target_node }
77 }
78
79 async fn make_client(&self) -> Result<RaftServiceClient<Channel>, RPCError> {
81 let server_addr = &self.target_node.rpc_addr;
82 let channel = Channel::builder(
83 format!("http://{}", server_addr)
84 .parse()
85 .map_err(|e| RPCError::Network(NetworkError::<TypeConfig>::new(&e)))?,
86 )
87 .connect()
88 .await
89 .map_err(|e| RPCError::Unreachable(Unreachable::<TypeConfig>::new(&e)))?;
90 Ok(RaftServiceClient::new(channel))
91 }
92
93 fn pb_to_stream_result(
98 resp: pb::raft::AppendEntriesResponse,
99 ) -> Result<StreamAppendResult<TypeConfig>, RPCError> {
100 if let Some(higher_vote) = resp.rejected_by {
101 return Ok(Err(StreamAppendError::HigherVote(higher_vote)));
102 }
103
104 if resp.conflict {
105 let conflict_log_id = resp.last_log_id.ok_or_else(|| {
106 RPCError::Network(NetworkError::<TypeConfig>::new(&AnyError::error(
107 "Missing `last_log_id` in conflict stream-append response",
108 )))
109 })?;
110 return Ok(Err(StreamAppendError::Conflict(conflict_log_id.into())));
111 }
112
113 Ok(Ok(resp.last_log_id.map(Into::into)))
114 }
115
116 async fn send_snapshot_chunks(
118 tx: &mut mpsc::Sender<pb::raft::SnapshotRequest>,
119 snapshot_data: &[u8],
120 ) -> Result<(), NetworkError<TypeConfig>> {
121 let chunk_size = 1024 * 1024;
122 for chunk in snapshot_data.chunks(chunk_size) {
123 let request = pb::raft::SnapshotRequest {
124 payload: Some(pb::raft::snapshot_request::Payload::Chunk(chunk.to_vec())),
125 };
126 tx.send(request)
127 .await
128 .map_err(|e| NetworkError::<TypeConfig>::new(&e))?;
129 }
130 Ok(())
131 }
132}
133
134impl NetStreamAppend<TypeConfig> for NetworkConnection {
145 fn stream_append<'s, S>(
146 &'s mut self,
147 input: S,
148 _option: RPCOption,
149 ) -> BoxFuture<
150 's,
151 Result<BoxStream<'s, Result<StreamAppendResult<TypeConfig>, RPCError>>, RPCError>,
152 >
153 where
154 S: Stream<Item = AppendEntriesRequest> + OptionalSend + Unpin + 'static,
155 {
156 let fu = async move {
157 let mut client = self.make_client().await?;
158
159 let response = client
160 .stream_append(input.map(pb::raft::AppendEntriesRequest::from))
161 .await
162 .map_err(|e| RPCError::Network(NetworkError::<TypeConfig>::new(&e)))?;
163
164 let output = response.into_inner().map(|result| {
165 let resp =
166 result.map_err(|e| RPCError::Network(NetworkError::<TypeConfig>::new(&e)))?;
167 Self::pb_to_stream_result(resp)
168 });
169
170 Ok(Box::pin(output) as BoxStream<'s, _>)
171 };
172
173 Box::pin(fu)
174 }
175}
176
177impl NetVote<TypeConfig> for NetworkConnection {
178 async fn vote(
179 &mut self,
180 req: VoteRequest,
181 _option: RPCOption,
182 ) -> Result<VoteResponse, RPCError> {
183 let mut client = self.make_client().await?;
184
185 let proto_vote_req: PbVoteRequest = req.into();
186 let response = client
187 .vote(proto_vote_req)
188 .await
189 .map_err(|e| RPCError::Network(NetworkError::<TypeConfig>::new(&e)))?;
190
191 let proto_vote_resp: PbVoteResponse = response.into_inner();
192 #[allow(clippy::result_large_err)]
193 proto_vote_resp
194 .try_into()
195 .map_err(|e| RPCError::Network(NetworkError::<TypeConfig>::new(&e)))
196 }
197}
198
199impl NetSnapshot<TypeConfig> for NetworkConnection {
200 async fn full_snapshot(
201 &mut self,
202 vote: Vote,
203 snapshot: Snapshot,
204 _cancel: impl Future<Output = ReplicationClosed> + OptionalSend + 'static,
205 _option: RPCOption,
206 ) -> Result<SnapshotResponse, StreamingError> {
207 let mut client = self.make_client().await?;
208
209 let (mut tx, rx) = mpsc::channel(1024);
210 let response = client
211 .snapshot(rx)
212 .await
213 .map_err(|e| NetworkError::<TypeConfig>::new(&e))?;
214
215 let meta = &snapshot.meta;
217
218 let request = pb::raft::SnapshotRequest {
219 payload: Some(pb::raft::snapshot_request::Payload::Meta(
220 pb::raft::SnapshotRequestMeta {
221 vote: Some(vote),
222 last_log_id: meta.last_log_id.map(|log_id| log_id.into()),
223 last_membership_log_id: meta
224 .last_membership
225 .log_id()
226 .map(|log_id| log_id.into()),
227 last_membership: Some(meta.last_membership.membership().clone().into()),
228 snapshot_id: meta.snapshot_id.to_string(),
229 },
230 )),
231 };
232
233 tx.send(request)
234 .await
235 .map_err(|e| NetworkError::<TypeConfig>::new(&e))?;
236
237 Self::send_snapshot_chunks(&mut tx, &snapshot.snapshot).await?;
239
240 let message = response.into_inner();
242
243 Ok(SnapshotResponse {
244 vote: message.vote.ok_or_else(|| {
245 NetworkError::<TypeConfig>::new(&AnyError::error(
246 "Missing `vote` in snapshot response",
247 ))
248 })?,
249 })
250 }
251}
252
253impl NetBackoff<TypeConfig> for NetworkConnection {
254 fn backoff(&self) -> Backoff {
255 Backoff::new(std::iter::repeat(Duration::from_millis(200)))
256 }
257}
258
259impl NetTransferLeader<TypeConfig> for NetworkConnection {
260 async fn transfer_leader(
261 &mut self,
262 _req: TransferLeaderRequest<TypeConfig>,
263 _option: RPCOption,
264 ) -> Result<(), RPCError> {
265 Err(RPCError::Unreachable(Unreachable::new(&AnyError::error(
266 "transfer_leader not implemented",
267 ))))
268 }
269}