Skip to main content

orca_control/raft/
network.rs

1//! HTTP-based Raft RPC transport using reqwest.
2
3use openraft::error::{InstallSnapshotError, RPCError, RaftError, Unreachable};
4use openraft::network::RPCOption;
5use openraft::raft::{
6    AppendEntriesRequest, AppendEntriesResponse, InstallSnapshotRequest, InstallSnapshotResponse,
7    VoteRequest, VoteResponse,
8};
9use openraft::{BasicNode, RaftNetwork, RaftNetworkFactory};
10
11use super::type_config::OrcaTypeConfig;
12
13type C = OrcaTypeConfig;
14
15/// Factory that creates HTTP network connections to Raft peers.
16pub struct NetworkFactory {
17    client: reqwest::Client,
18}
19
20impl Default for NetworkFactory {
21    fn default() -> Self {
22        Self::new()
23    }
24}
25
26impl NetworkFactory {
27    pub fn new() -> Self {
28        Self {
29            client: reqwest::Client::new(),
30        }
31    }
32}
33
34impl RaftNetworkFactory<C> for NetworkFactory {
35    type Network = NetworkConnection;
36
37    async fn new_client(&mut self, _target: u64, node: &BasicNode) -> Self::Network {
38        NetworkConnection {
39            addr: node.addr.clone(),
40            client: self.client.clone(),
41        }
42    }
43}
44
45/// A single HTTP connection to a Raft peer node.
46pub struct NetworkConnection {
47    addr: String,
48    client: reqwest::Client,
49}
50
51impl NetworkConnection {
52    fn url(&self, path: &str) -> String {
53        format!("http://{}/raft/{}", self.addr, path)
54    }
55}
56
57impl RaftNetwork<C> for NetworkConnection {
58    async fn append_entries(
59        &mut self,
60        rpc: AppendEntriesRequest<C>,
61        _option: RPCOption,
62    ) -> Result<AppendEntriesResponse<u64>, RPCError<u64, BasicNode, RaftError<u64>>> {
63        let resp = self
64            .client
65            .post(self.url("append"))
66            .json(&rpc)
67            .send()
68            .await
69            .map_err(|e| RPCError::Unreachable(Unreachable::new(&e)))?;
70
71        let body = resp
72            .json()
73            .await
74            .map_err(|e| RPCError::Unreachable(Unreachable::new(&e)))?;
75        Ok(body)
76    }
77
78    async fn install_snapshot(
79        &mut self,
80        rpc: InstallSnapshotRequest<C>,
81        _option: RPCOption,
82    ) -> Result<
83        InstallSnapshotResponse<u64>,
84        RPCError<u64, BasicNode, RaftError<u64, InstallSnapshotError>>,
85    > {
86        let resp = self
87            .client
88            .post(self.url("snapshot"))
89            .json(&rpc)
90            .send()
91            .await
92            .map_err(|e| RPCError::Unreachable(Unreachable::new(&e)))?;
93
94        let body: InstallSnapshotResponse<u64> = resp
95            .json()
96            .await
97            .map_err(|e| RPCError::Unreachable(Unreachable::new(&e)))?;
98        Ok(body)
99    }
100
101    async fn vote(
102        &mut self,
103        rpc: VoteRequest<u64>,
104        _option: RPCOption,
105    ) -> Result<VoteResponse<u64>, RPCError<u64, BasicNode, RaftError<u64>>> {
106        let resp = self
107            .client
108            .post(self.url("vote"))
109            .json(&rpc)
110            .send()
111            .await
112            .map_err(|e| RPCError::Unreachable(Unreachable::new(&e)))?;
113
114        let body = resp
115            .json()
116            .await
117            .map_err(|e| RPCError::Unreachable(Unreachable::new(&e)))?;
118        Ok(body)
119    }
120}