orca_control/raft/
network.rs1use openraft::error::{InstallSnapshotError, RPCError, RaftError, Unreachable};
4use openraft::network::RPCOption;
5use openraft::raft::{
6 AppendEntriesRequest, AppendEntriesResponse, InstallSnapshotRequest, InstallSnapshotResponse,
7 VoteRequest, VoteResponse,
8};
9use openraft::{BasicNode, RaftNetwork, RaftNetworkFactory};
10
11use super::type_config::OrcaTypeConfig;
12
13type C = OrcaTypeConfig;
14
15pub struct NetworkFactory {
17 client: reqwest::Client,
18}
19
20impl Default for NetworkFactory {
21 fn default() -> Self {
22 Self::new()
23 }
24}
25
26impl NetworkFactory {
27 pub fn new() -> Self {
28 Self {
29 client: reqwest::Client::new(),
30 }
31 }
32}
33
34impl RaftNetworkFactory<C> for NetworkFactory {
35 type Network = NetworkConnection;
36
37 async fn new_client(&mut self, _target: u64, node: &BasicNode) -> Self::Network {
38 NetworkConnection {
39 addr: node.addr.clone(),
40 client: self.client.clone(),
41 }
42 }
43}
44
45pub struct NetworkConnection {
47 addr: String,
48 client: reqwest::Client,
49}
50
51impl NetworkConnection {
52 fn url(&self, path: &str) -> String {
53 format!("http://{}/raft/{}", self.addr, path)
54 }
55}
56
57impl RaftNetwork<C> for NetworkConnection {
58 async fn append_entries(
59 &mut self,
60 rpc: AppendEntriesRequest<C>,
61 _option: RPCOption,
62 ) -> Result<AppendEntriesResponse<u64>, RPCError<u64, BasicNode, RaftError<u64>>> {
63 let resp = self
64 .client
65 .post(self.url("append"))
66 .json(&rpc)
67 .send()
68 .await
69 .map_err(|e| RPCError::Unreachable(Unreachable::new(&e)))?;
70
71 let body = resp
72 .json()
73 .await
74 .map_err(|e| RPCError::Unreachable(Unreachable::new(&e)))?;
75 Ok(body)
76 }
77
78 async fn install_snapshot(
79 &mut self,
80 rpc: InstallSnapshotRequest<C>,
81 _option: RPCOption,
82 ) -> Result<
83 InstallSnapshotResponse<u64>,
84 RPCError<u64, BasicNode, RaftError<u64, InstallSnapshotError>>,
85 > {
86 let resp = self
87 .client
88 .post(self.url("snapshot"))
89 .json(&rpc)
90 .send()
91 .await
92 .map_err(|e| RPCError::Unreachable(Unreachable::new(&e)))?;
93
94 let body: InstallSnapshotResponse<u64> = resp
95 .json()
96 .await
97 .map_err(|e| RPCError::Unreachable(Unreachable::new(&e)))?;
98 Ok(body)
99 }
100
101 async fn vote(
102 &mut self,
103 rpc: VoteRequest<u64>,
104 _option: RPCOption,
105 ) -> Result<VoteResponse<u64>, RPCError<u64, BasicNode, RaftError<u64>>> {
106 let resp = self
107 .client
108 .post(self.url("vote"))
109 .json(&rpc)
110 .send()
111 .await
112 .map_err(|e| RPCError::Unreachable(Unreachable::new(&e)))?;
113
114 let body = resp
115 .json()
116 .await
117 .map_err(|e| RPCError::Unreachable(Unreachable::new(&e)))?;
118 Ok(body)
119 }
120}