Skip to main content

asteroid_mq/protocol/node/raft/
network.rs

1use openraft::{
2    error::{
3        ClientWriteError, Fatal, InstallSnapshotError, RPCError, RaftError, RemoteError,
4        Unreachable,
5    },
6    raft::{
7        AppendEntriesRequest, AppendEntriesResponse, ClientWriteResponse, InstallSnapshotRequest,
8        InstallSnapshotResponse, VoteRequest, VoteResponse,
9    },
10    RaftNetwork,
11};
12use serde::{Deserialize, Serialize};
13use tokio::sync::oneshot::Receiver;
14
15use crate::prelude::NodeId;
16
17use super::{
18    network_factory::{RaftNodeInfo, TcpNetworkService},
19    proposal::Proposal,
20    raft_node::TcpNode,
21    TypeConfig,
22};
23
24pub struct TcpNetwork {
25    peer: RaftNodeInfo,
26    source: TcpNetworkService,
27}
28
29#[derive(Debug, Serialize, Deserialize)]
30pub(super) enum Request {
31    Vote(VoteRequest<NodeId>),
32    AppendEntries(AppendEntriesRequest<TypeConfig>),
33    InstallSnapshot(InstallSnapshotRequest<TypeConfig>),
34    Proposal(Proposal),
35}
36
37#[derive(Debug, Serialize, Deserialize)]
38pub(super) enum Response {
39    Vote(Result<VoteResponse<NodeId>, RaftError<NodeId>>),
40    AppendEntries(Result<AppendEntriesResponse<NodeId>, RaftError<NodeId>>),
41    InstallSnapshot(
42        Result<InstallSnapshotResponse<NodeId>, RaftError<NodeId, InstallSnapshotError>>,
43    ),
44    Proposal(
45        Result<
46            ClientWriteResponse<TypeConfig>,
47            RaftError<NodeId, ClientWriteError<NodeId, TcpNode>>,
48        >,
49    ),
50}
51
52impl Response {
53    #[tracing::instrument(skip_all)]
54    pub(crate) fn catch_fatal(&self) -> Option<Fatal<NodeId>> {
55        let fatal = match self {
56            Response::Vote(Err(RaftError::Fatal(f))) => f,
57            Response::AppendEntries(Err(RaftError::Fatal(f))) => f,
58            Response::InstallSnapshot(Err(RaftError::Fatal(f))) => f,
59            Response::Proposal(Err(RaftError::Fatal(f))) => f,
60            _ => {
61                return None;
62            }
63        };
64        return Some(fatal.clone());
65    }
66}
67#[derive(Debug, Serialize, Deserialize)]
68pub(super) enum Payload {
69    Request(Request),
70    Response(Response),
71}
72
73#[derive(Debug, Serialize, Deserialize)]
74pub(super) struct Packet {
75    pub seq_id: u64,
76    pub payload: Payload,
77}
78
79#[derive(Debug)]
80pub struct ConnectionNotEstablished;
81impl std::fmt::Display for ConnectionNotEstablished {
82    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
83        write!(f, "connection not established")
84    }
85}
86impl std::error::Error for ConnectionNotEstablished {}
87
88impl TcpNetwork {
89    pub fn new(peer: RaftNodeInfo, source: TcpNetworkService) -> Self {
90        Self { peer, source }
91    }
92
93    #[tracing::instrument(skip_all, fields(peer_id = ?self.peer.id))]
94    async fn send_request(&mut self, req: Request) -> Result<Receiver<Response>, Unreachable> {
95        let connection = self
96            .source
97            .ensure_connection(self.peer.id, self.peer.node.addr.clone())
98            .await
99            .map_err(|e| Unreachable::new(&e))?;
100        connection.send_request(req).await
101    }
102}
103
104impl RaftNetwork<TypeConfig> for TcpNetwork {
105    async fn vote(
106        &mut self,
107        rpc: VoteRequest<<TypeConfig as openraft::RaftTypeConfig>::NodeId>,
108        _option: openraft::network::RPCOption,
109    ) -> Result<
110        VoteResponse<<TypeConfig as openraft::RaftTypeConfig>::NodeId>,
111        RPCError<
112            <TypeConfig as openraft::RaftTypeConfig>::NodeId,
113            <TypeConfig as openraft::RaftTypeConfig>::Node,
114            RaftError<<TypeConfig as openraft::RaftTypeConfig>::NodeId>,
115        >,
116    > {
117        let receiver = self.send_request(Request::Vote(rpc)).await?;
118        let response = receiver.await.map_err(|e| {
119            openraft::error::RPCError::Network(openraft::error::NetworkError::new(&e))
120        })?;
121        let Response::Vote(vote) = response else {
122            unreachable!("wrong implementation: expect vote response")
123        };
124        vote.map_err(|e| RPCError::RemoteError(RemoteError::new(self.peer.id, e)))
125    }
126    async fn append_entries(
127        &mut self,
128        rpc: openraft::raft::AppendEntriesRequest<TypeConfig>,
129        _option: openraft::network::RPCOption,
130    ) -> Result<
131        openraft::raft::AppendEntriesResponse<<TypeConfig as openraft::RaftTypeConfig>::NodeId>,
132        openraft::error::RPCError<
133            <TypeConfig as openraft::RaftTypeConfig>::NodeId,
134            <TypeConfig as openraft::RaftTypeConfig>::Node,
135            openraft::error::RaftError<<TypeConfig as openraft::RaftTypeConfig>::NodeId>,
136        >,
137    > {
138        let receiver = self.send_request(Request::AppendEntries(rpc)).await?;
139        let response = receiver.await.map_err(|e| {
140            openraft::error::RPCError::Network(openraft::error::NetworkError::new(&e))
141        })?;
142        let Response::AppendEntries(append_entries) = response else {
143            unreachable!("wrong implementation: expect vote response")
144        };
145        append_entries.map_err(|e| RPCError::RemoteError(RemoteError::new(self.peer.id, e)))
146    }
147    async fn install_snapshot(
148        &mut self,
149        rpc: openraft::raft::InstallSnapshotRequest<TypeConfig>,
150        _option: openraft::network::RPCOption,
151    ) -> Result<
152        openraft::raft::InstallSnapshotResponse<<TypeConfig as openraft::RaftTypeConfig>::NodeId>,
153        openraft::error::RPCError<
154            <TypeConfig as openraft::RaftTypeConfig>::NodeId,
155            <TypeConfig as openraft::RaftTypeConfig>::Node,
156            openraft::error::RaftError<
157                <TypeConfig as openraft::RaftTypeConfig>::NodeId,
158                openraft::error::InstallSnapshotError,
159            >,
160        >,
161    > {
162        let receiver = self.send_request(Request::InstallSnapshot(rpc)).await?;
163        let response = receiver.await.map_err(|e| {
164            openraft::error::RPCError::Network(openraft::error::NetworkError::new(&e))
165        })?;
166        let Response::InstallSnapshot(install_snapshot) = response else {
167            unreachable!("wrong implementation: expect vote response")
168        };
169
170        install_snapshot.map_err(|e| RPCError::RemoteError(RemoteError::new(self.peer.id, e)))
171    }
172}