aegis_replication/
http_transport.rs1use crate::node::NodeId;
15use crate::transport::{Message, Transport, TransportError};
16use std::collections::HashMap;
17use std::sync::mpsc;
18use std::sync::{Mutex, RwLock};
19use std::time::Duration;
20
21pub struct HttpTransport {
27 peer_urls: RwLock<HashMap<NodeId, String>>,
29 client: reqwest::blocking::Client,
31 incoming_tx: mpsc::Sender<Message>,
33 incoming_rx: Mutex<mpsc::Receiver<Message>>,
35}
36
37impl HttpTransport {
38 pub fn new(peer_urls: HashMap<NodeId, String>) -> Self {
55 let (tx, rx) = mpsc::channel();
56 let client = reqwest::blocking::Client::builder()
57 .timeout(Duration::from_secs(5))
58 .connect_timeout(Duration::from_secs(2))
59 .build()
60 .expect("failed to build HTTP client");
61
62 Self {
63 peer_urls: RwLock::new(peer_urls),
64 client,
65 incoming_tx: tx,
66 incoming_rx: Mutex::new(rx),
67 }
68 }
69
70 pub fn sender(&self) -> mpsc::Sender<Message> {
75 self.incoming_tx.clone()
76 }
77
78 pub fn push_message(&self, message: Message) -> Result<(), TransportError> {
83 self.incoming_tx
84 .send(message)
85 .map_err(|e| TransportError::Unknown(format!("Failed to push message: {}", e)))
86 }
87
88 pub fn add_peer(&self, node_id: NodeId, url: String) {
90 self.peer_urls
91 .write()
92 .expect("http_transport peer_urls lock poisoned")
93 .insert(node_id, url);
94 }
95
96 pub fn remove_peer(&self, node_id: &NodeId) {
98 self.peer_urls
99 .write()
100 .expect("http_transport peer_urls lock poisoned")
101 .remove(node_id);
102 }
103
104 pub fn peer_url(&self, node_id: &NodeId) -> Option<String> {
106 self.peer_urls
107 .read()
108 .expect("http_transport peer_urls lock poisoned")
109 .get(node_id)
110 .cloned()
111 }
112
113 fn send_to_url(&self, url: &str, message: &Message) -> Result<(), TransportError> {
115 let endpoint = format!("{}/api/v1/cluster/raft", url.trim_end_matches('/'));
116
117 self.client
118 .post(&endpoint)
119 .json(message)
120 .send()
121 .map_err(|e| TransportError::ConnectionFailed(format!("{}: {}", endpoint, e)))?;
122
123 Ok(())
124 }
125}
126
127impl Transport for HttpTransport {
128 fn send(&self, message: Message) -> Result<(), TransportError> {
133 let url = {
134 let urls = self
135 .peer_urls
136 .read()
137 .expect("http_transport peer_urls lock poisoned");
138 urls.get(&message.to).cloned().ok_or_else(|| {
139 TransportError::ConnectionFailed(format!(
140 "No URL configured for node {}",
141 message.to
142 ))
143 })?
144 };
145
146 self.send_to_url(&url, &message)
147 }
148
149 fn recv(&self) -> Result<Message, TransportError> {
155 let rx = self
156 .incoming_rx
157 .lock()
158 .expect("http_transport incoming_rx lock poisoned");
159 rx.recv().map_err(|_| TransportError::Disconnected)
160 }
161
162 fn try_recv(&self) -> Option<Message> {
166 let rx = self
167 .incoming_rx
168 .lock()
169 .expect("http_transport incoming_rx lock poisoned");
170 rx.try_recv().ok()
171 }
172
173 fn broadcast(&self, message: Message, peers: &[NodeId]) -> Vec<Result<(), TransportError>> {
178 peers
179 .iter()
180 .map(|peer| {
181 let mut msg = message.clone();
182 msg.to = peer.clone();
183 self.send(msg)
184 })
185 .collect()
186 }
187}
188
#[cfg(test)]
mod tests {
    use super::*;
    use crate::transport::MessageType;

    /// Peers supplied to the constructor can be looked up; unknown ones cannot.
    #[test]
    fn test_http_transport_creation() {
        let peers = HashMap::from([
            (NodeId::new("node2"), "http://127.0.0.1:9091".to_string()),
            (NodeId::new("node3"), "http://127.0.0.1:7001".to_string()),
        ]);

        let transport = HttpTransport::new(peers);

        let node2_url = transport.peer_url(&NodeId::new("node2"));
        assert_eq!(node2_url.as_deref(), Some("http://127.0.0.1:9091"));

        let node3_url = transport.peer_url(&NodeId::new("node3"));
        assert_eq!(node3_url.as_deref(), Some("http://127.0.0.1:7001"));

        assert!(transport.peer_url(&NodeId::new("node4")).is_none());
    }

    /// A peer is visible after `add_peer` and gone after `remove_peer`.
    #[test]
    fn test_http_transport_add_remove_peer() {
        let transport = HttpTransport::new(HashMap::new());
        let node2 = NodeId::new("node2");

        transport.add_peer(NodeId::new("node2"), "http://127.0.0.1:9091".to_string());
        assert_eq!(
            transport.peer_url(&node2).as_deref(),
            Some("http://127.0.0.1:9091")
        );

        transport.remove_peer(&node2);
        assert!(transport.peer_url(&node2).is_none());
    }

    /// A pushed message comes back unchanged from `try_recv`.
    #[test]
    fn test_http_transport_push_and_recv() {
        let transport = HttpTransport::new(HashMap::new());

        let heartbeat = Message::heartbeat(NodeId::new("node1"), NodeId::new("node2"), 1);
        transport.push_message(heartbeat).unwrap();

        let delivered = transport.try_recv().unwrap();
        assert_eq!(delivered.message_type, MessageType::Heartbeat);
        assert_eq!(delivered.from.as_str(), "node1");
        assert_eq!(delivered.to.as_str(), "node2");
    }

    /// `try_recv` on a fresh transport yields nothing.
    #[test]
    fn test_http_transport_try_recv_empty() {
        let transport = HttpTransport::new(HashMap::new());
        assert!(transport.try_recv().is_none());
    }

    /// Messages sent through a cloned sender reach the transport's queue.
    #[test]
    fn test_http_transport_sender_channel() {
        let transport = HttpTransport::new(HashMap::new());
        let inbound = transport.sender();

        inbound
            .send(Message::heartbeat(
                NodeId::new("node1"),
                NodeId::new("node2"),
                5,
            ))
            .unwrap();

        let delivered = transport.try_recv().unwrap();
        assert_eq!(delivered.term, 5);
    }

    /// Sending to a node without a configured URL fails with `ConnectionFailed`.
    #[test]
    fn test_http_transport_send_no_peer_url() {
        let transport = HttpTransport::new(HashMap::new());

        let heartbeat = Message::heartbeat(NodeId::new("node1"), NodeId::new("unknown"), 1);
        let outcome = transport.send(heartbeat);

        assert!(outcome.is_err());
        match outcome.unwrap_err() {
            TransportError::ConnectionFailed(reason) => {
                assert!(reason.contains("No URL configured"));
            }
            other => panic!("Expected ConnectionFailed, got {:?}", other),
        }
    }

    /// Several pushed messages are received in the order they were pushed.
    #[test]
    fn test_http_transport_multiple_messages() {
        let transport = HttpTransport::new(HashMap::new());

        for term in 0..5 {
            let heartbeat = Message::heartbeat(NodeId::new("node1"), NodeId::new("node2"), term);
            transport.push_message(heartbeat).unwrap();
        }

        for expected_term in 0..5 {
            let delivered = transport.try_recv().unwrap();
            assert_eq!(delivered.term, expected_term);
        }

        assert!(transport.try_recv().is_none());
    }
}