asteroid_mq/protocol/node/raft/
network.rs1use openraft::{
2 error::{
3 ClientWriteError, Fatal, InstallSnapshotError, RPCError, RaftError, RemoteError,
4 Unreachable,
5 },
6 raft::{
7 AppendEntriesRequest, AppendEntriesResponse, ClientWriteResponse, InstallSnapshotRequest,
8 InstallSnapshotResponse, VoteRequest, VoteResponse,
9 },
10 RaftNetwork,
11};
12use serde::{Deserialize, Serialize};
13use tokio::sync::oneshot::Receiver;
14
15use crate::prelude::NodeId;
16
17use super::{
18 network_factory::{RaftNodeInfo, TcpNetworkService},
19 proposal::Proposal,
20 raft_node::TcpNode,
21 TypeConfig,
22};
23
24pub struct TcpNetwork {
25 peer: RaftNodeInfo,
26 source: TcpNetworkService,
27}
28
29#[derive(Debug, Serialize, Deserialize)]
30pub(super) enum Request {
31 Vote(VoteRequest<NodeId>),
32 AppendEntries(AppendEntriesRequest<TypeConfig>),
33 InstallSnapshot(InstallSnapshotRequest<TypeConfig>),
34 Proposal(Proposal),
35}
36
37#[derive(Debug, Serialize, Deserialize)]
38pub(super) enum Response {
39 Vote(Result<VoteResponse<NodeId>, RaftError<NodeId>>),
40 AppendEntries(Result<AppendEntriesResponse<NodeId>, RaftError<NodeId>>),
41 InstallSnapshot(
42 Result<InstallSnapshotResponse<NodeId>, RaftError<NodeId, InstallSnapshotError>>,
43 ),
44 Proposal(
45 Result<
46 ClientWriteResponse<TypeConfig>,
47 RaftError<NodeId, ClientWriteError<NodeId, TcpNode>>,
48 >,
49 ),
50}
51
52impl Response {
53 #[tracing::instrument(skip_all)]
54 pub(crate) fn catch_fatal(&self) -> Option<Fatal<NodeId>> {
55 let fatal = match self {
56 Response::Vote(Err(RaftError::Fatal(f))) => f,
57 Response::AppendEntries(Err(RaftError::Fatal(f))) => f,
58 Response::InstallSnapshot(Err(RaftError::Fatal(f))) => f,
59 Response::Proposal(Err(RaftError::Fatal(f))) => f,
60 _ => {
61 return None;
62 }
63 };
64 return Some(fatal.clone());
65 }
66}
67#[derive(Debug, Serialize, Deserialize)]
68pub(super) enum Payload {
69 Request(Request),
70 Response(Response),
71}
72
73#[derive(Debug, Serialize, Deserialize)]
74pub(super) struct Packet {
75 pub seq_id: u64,
76 pub payload: Payload,
77}
78
/// Error value that displays as `connection not established`.
#[derive(Debug)]
pub struct ConnectionNotEstablished;

impl std::error::Error for ConnectionNotEstablished {}

impl std::fmt::Display for ConnectionNotEstablished {
    /// Writes the fixed, human-readable error message.
    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        formatter.write_str("connection not established")
    }
}
87
88impl TcpNetwork {
89 pub fn new(peer: RaftNodeInfo, source: TcpNetworkService) -> Self {
90 Self { peer, source }
91 }
92
93 #[tracing::instrument(skip_all, fields(peer_id = ?self.peer.id))]
94 async fn send_request(&mut self, req: Request) -> Result<Receiver<Response>, Unreachable> {
95 let connection = self
96 .source
97 .ensure_connection(self.peer.id, self.peer.node.addr.clone())
98 .await
99 .map_err(|e| Unreachable::new(&e))?;
100 connection.send_request(req).await
101 }
102}
103
104impl RaftNetwork<TypeConfig> for TcpNetwork {
105 async fn vote(
106 &mut self,
107 rpc: VoteRequest<<TypeConfig as openraft::RaftTypeConfig>::NodeId>,
108 _option: openraft::network::RPCOption,
109 ) -> Result<
110 VoteResponse<<TypeConfig as openraft::RaftTypeConfig>::NodeId>,
111 RPCError<
112 <TypeConfig as openraft::RaftTypeConfig>::NodeId,
113 <TypeConfig as openraft::RaftTypeConfig>::Node,
114 RaftError<<TypeConfig as openraft::RaftTypeConfig>::NodeId>,
115 >,
116 > {
117 let receiver = self.send_request(Request::Vote(rpc)).await?;
118 let response = receiver.await.map_err(|e| {
119 openraft::error::RPCError::Network(openraft::error::NetworkError::new(&e))
120 })?;
121 let Response::Vote(vote) = response else {
122 unreachable!("wrong implementation: expect vote response")
123 };
124 vote.map_err(|e| RPCError::RemoteError(RemoteError::new(self.peer.id, e)))
125 }
126 async fn append_entries(
127 &mut self,
128 rpc: openraft::raft::AppendEntriesRequest<TypeConfig>,
129 _option: openraft::network::RPCOption,
130 ) -> Result<
131 openraft::raft::AppendEntriesResponse<<TypeConfig as openraft::RaftTypeConfig>::NodeId>,
132 openraft::error::RPCError<
133 <TypeConfig as openraft::RaftTypeConfig>::NodeId,
134 <TypeConfig as openraft::RaftTypeConfig>::Node,
135 openraft::error::RaftError<<TypeConfig as openraft::RaftTypeConfig>::NodeId>,
136 >,
137 > {
138 let receiver = self.send_request(Request::AppendEntries(rpc)).await?;
139 let response = receiver.await.map_err(|e| {
140 openraft::error::RPCError::Network(openraft::error::NetworkError::new(&e))
141 })?;
142 let Response::AppendEntries(append_entries) = response else {
143 unreachable!("wrong implementation: expect vote response")
144 };
145 append_entries.map_err(|e| RPCError::RemoteError(RemoteError::new(self.peer.id, e)))
146 }
147 async fn install_snapshot(
148 &mut self,
149 rpc: openraft::raft::InstallSnapshotRequest<TypeConfig>,
150 _option: openraft::network::RPCOption,
151 ) -> Result<
152 openraft::raft::InstallSnapshotResponse<<TypeConfig as openraft::RaftTypeConfig>::NodeId>,
153 openraft::error::RPCError<
154 <TypeConfig as openraft::RaftTypeConfig>::NodeId,
155 <TypeConfig as openraft::RaftTypeConfig>::Node,
156 openraft::error::RaftError<
157 <TypeConfig as openraft::RaftTypeConfig>::NodeId,
158 openraft::error::InstallSnapshotError,
159 >,
160 >,
161 > {
162 let receiver = self.send_request(Request::InstallSnapshot(rpc)).await?;
163 let response = receiver.await.map_err(|e| {
164 openraft::error::RPCError::Network(openraft::error::NetworkError::new(&e))
165 })?;
166 let Response::InstallSnapshot(install_snapshot) = response else {
167 unreachable!("wrong implementation: expect vote response")
168 };
169
170 install_snapshot.map_err(|e| RPCError::RemoteError(RemoteError::new(self.peer.id, e)))
171 }
172}