Skip to main content

openstack_keystone_distributed_storage/
network.rs

1// Licensed under the Apache License, Version 2.0 (the "License");
2// you may not use this file except in compliance with the License.
3// You may obtain a copy of the License at
4//
5//     http://www.apache.org/licenses/LICENSE-2.0
6//
7// Unless required by applicable law or agreed to in writing, software
8// distributed under the License is distributed on an "AS IS" BASIS,
9// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10// See the License for the specific language governing permissions and
11// limitations under the License.
12//
13// SPDX-License-Identifier: Apache-2.0
14use std::future::Future;
15use std::time::Duration;
16
17use futures::SinkExt;
18use futures::Stream;
19use futures::StreamExt;
20use futures::channel::mpsc;
21use openraft::AnyError;
22use openraft::OptionalSend;
23use openraft::RaftNetworkFactory;
24use openraft::base::BoxFuture;
25use openraft::base::BoxStream;
26use openraft::error::NetworkError;
27use openraft::error::ReplicationClosed;
28use openraft::error::Unreachable;
29use openraft::network::Backoff;
30use openraft::network::NetBackoff;
31use openraft::network::NetSnapshot;
32use openraft::network::NetStreamAppend;
33use openraft::network::NetTransferLeader;
34use openraft::network::NetVote;
35use openraft::network::RPCOption;
36use openraft::raft::StreamAppendError;
37use openraft::raft::StreamAppendResult;
38use openraft::raft::TransferLeaderRequest;
39use tonic::transport::Channel;
40
41use crate::protobuf as pb;
42use crate::protobuf::raft::VoteRequest as PbVoteRequest;
43use crate::protobuf::raft::VoteResponse as PbVoteResponse;
44use crate::protobuf::raft::raft_service_client::RaftServiceClient;
45use crate::types::NodeId;
46use crate::types::TypeConfig;
47use crate::types::*;
48
49/// Network implementation for gRPC-based Raft communication.
50/// Provides the networking layer for Raft nodes to communicate with each other.
51pub struct Network {}
52
53impl Network {}
54
55/// Implementation of the RaftNetworkFactory trait for creating new network
56/// connections. This factory creates gRPC client connections to other Raft
57/// nodes.
58impl RaftNetworkFactory<TypeConfig> for Network {
59    type Network = NetworkConnection;
60
61    #[tracing::instrument(level = "debug", skip_all)]
62    async fn new_client(&mut self, _: NodeId, node: &Node) -> Self::Network {
63        NetworkConnection::new(node.clone())
64    }
65}
66
67/// Represents an active network connection to a remote Raft node.
68/// Handles serialization and deserialization of Raft messages over gRPC.
69pub struct NetworkConnection {
70    target_node: pb::raft::Node,
71}
72
73impl NetworkConnection {
74    /// Creates a new NetworkConnection with the provided gRPC client.
75    pub fn new(target_node: Node) -> Self {
76        NetworkConnection { target_node }
77    }
78
79    /// Creates a gRPC client to the target node.
80    async fn make_client(&self) -> Result<RaftServiceClient<Channel>, RPCError> {
81        let server_addr = &self.target_node.rpc_addr;
82        let channel = Channel::builder(
83            format!("http://{}", server_addr)
84                .parse()
85                .map_err(|e| RPCError::Network(NetworkError::<TypeConfig>::new(&e)))?,
86        )
87        .connect()
88        .await
89        .map_err(|e| RPCError::Unreachable(Unreachable::<TypeConfig>::new(&e)))?;
90        Ok(RaftServiceClient::new(channel))
91    }
92
93    /// Convert pb::AppendEntriesResponse to StreamAppendResult.
94    ///
95    /// For `StreamAppend`, conflict is encoded as `conflict = true` plus a
96    /// required `last_log_id` carrying the conflict log id.
97    fn pb_to_stream_result(
98        resp: pb::raft::AppendEntriesResponse,
99    ) -> Result<StreamAppendResult<TypeConfig>, RPCError> {
100        if let Some(higher_vote) = resp.rejected_by {
101            return Ok(Err(StreamAppendError::HigherVote(higher_vote)));
102        }
103
104        if resp.conflict {
105            let conflict_log_id = resp.last_log_id.ok_or_else(|| {
106                RPCError::Network(NetworkError::<TypeConfig>::new(&AnyError::error(
107                    "Missing `last_log_id` in conflict stream-append response",
108                )))
109            })?;
110            return Ok(Err(StreamAppendError::Conflict(conflict_log_id.into())));
111        }
112
113        Ok(Ok(resp.last_log_id.map(Into::into)))
114    }
115
116    /// Sends snapshot data in chunks through the provided channel.
117    async fn send_snapshot_chunks(
118        tx: &mut mpsc::Sender<pb::raft::SnapshotRequest>,
119        snapshot_data: &[u8],
120    ) -> Result<(), NetworkError<TypeConfig>> {
121        let chunk_size = 1024 * 1024;
122        for chunk in snapshot_data.chunks(chunk_size) {
123            let request = pb::raft::SnapshotRequest {
124                payload: Some(pb::raft::snapshot_request::Payload::Chunk(chunk.to_vec())),
125            };
126            tx.send(request)
127                .await
128                .map_err(|e| NetworkError::<TypeConfig>::new(&e))?;
129        }
130        Ok(())
131    }
132}
133
134// =============================================================================
135// Sub-trait implementations for NetworkConnection
136// =============================================================================
137//
138// Instead of implementing RaftNetworkV2 as a monolithic trait, this example
139// demonstrates implementing individual sub-traits directly. This approach:
140// - Shows exactly which network capabilities are provided
141// - Each impl is focused on a single concern
142// - gRPC's native bidirectional streaming maps naturally to NetStreamAppend
143
144impl NetStreamAppend<TypeConfig> for NetworkConnection {
145    fn stream_append<'s, S>(
146        &'s mut self,
147        input: S,
148        _option: RPCOption,
149    ) -> BoxFuture<
150        's,
151        Result<BoxStream<'s, Result<StreamAppendResult<TypeConfig>, RPCError>>, RPCError>,
152    >
153    where
154        S: Stream<Item = AppendEntriesRequest> + OptionalSend + Unpin + 'static,
155    {
156        let fu = async move {
157            let mut client = self.make_client().await?;
158
159            let response = client
160                .stream_append(input.map(pb::raft::AppendEntriesRequest::from))
161                .await
162                .map_err(|e| RPCError::Network(NetworkError::<TypeConfig>::new(&e)))?;
163
164            let output = response.into_inner().map(|result| {
165                let resp =
166                    result.map_err(|e| RPCError::Network(NetworkError::<TypeConfig>::new(&e)))?;
167                Self::pb_to_stream_result(resp)
168            });
169
170            Ok(Box::pin(output) as BoxStream<'s, _>)
171        };
172
173        Box::pin(fu)
174    }
175}
176
177impl NetVote<TypeConfig> for NetworkConnection {
178    async fn vote(
179        &mut self,
180        req: VoteRequest,
181        _option: RPCOption,
182    ) -> Result<VoteResponse, RPCError> {
183        let mut client = self.make_client().await?;
184
185        let proto_vote_req: PbVoteRequest = req.into();
186        let response = client
187            .vote(proto_vote_req)
188            .await
189            .map_err(|e| RPCError::Network(NetworkError::<TypeConfig>::new(&e)))?;
190
191        let proto_vote_resp: PbVoteResponse = response.into_inner();
192        #[allow(clippy::result_large_err)]
193        proto_vote_resp
194            .try_into()
195            .map_err(|e| RPCError::Network(NetworkError::<TypeConfig>::new(&e)))
196    }
197}
198
199impl NetSnapshot<TypeConfig> for NetworkConnection {
200    async fn full_snapshot(
201        &mut self,
202        vote: Vote,
203        snapshot: Snapshot,
204        _cancel: impl Future<Output = ReplicationClosed> + OptionalSend + 'static,
205        _option: RPCOption,
206    ) -> Result<SnapshotResponse, StreamingError> {
207        let mut client = self.make_client().await?;
208
209        let (mut tx, rx) = mpsc::channel(1024);
210        let response = client
211            .snapshot(rx)
212            .await
213            .map_err(|e| NetworkError::<TypeConfig>::new(&e))?;
214
215        // 1. Send meta chunk
216        let meta = &snapshot.meta;
217
218        let request = pb::raft::SnapshotRequest {
219            payload: Some(pb::raft::snapshot_request::Payload::Meta(
220                pb::raft::SnapshotRequestMeta {
221                    vote: Some(vote),
222                    last_log_id: meta.last_log_id.map(|log_id| log_id.into()),
223                    last_membership_log_id: meta
224                        .last_membership
225                        .log_id()
226                        .map(|log_id| log_id.into()),
227                    last_membership: Some(meta.last_membership.membership().clone().into()),
228                    snapshot_id: meta.snapshot_id.to_string(),
229                },
230            )),
231        };
232
233        tx.send(request)
234            .await
235            .map_err(|e| NetworkError::<TypeConfig>::new(&e))?;
236
237        // 2. Send data chunks
238        Self::send_snapshot_chunks(&mut tx, &snapshot.snapshot).await?;
239
240        // 3. Receive response
241        let message = response.into_inner();
242
243        Ok(SnapshotResponse {
244            vote: message.vote.ok_or_else(|| {
245                NetworkError::<TypeConfig>::new(&AnyError::error(
246                    "Missing `vote` in snapshot response",
247                ))
248            })?,
249        })
250    }
251}
252
253impl NetBackoff<TypeConfig> for NetworkConnection {
254    fn backoff(&self) -> Backoff {
255        Backoff::new(std::iter::repeat(Duration::from_millis(200)))
256    }
257}
258
259impl NetTransferLeader<TypeConfig> for NetworkConnection {
260    async fn transfer_leader(
261        &mut self,
262        _req: TransferLeaderRequest<TypeConfig>,
263        _option: RPCOption,
264    ) -> Result<(), RPCError> {
265        Err(RPCError::Unreachable(Unreachable::new(&AnyError::error(
266            "transfer_leader not implemented",
267        ))))
268    }
269}