Skip to main content

aegis_replication/
http_transport.rs

1//! HTTP Transport for Raft Communication
2//!
3//! Implements the `Transport` trait using HTTP (reqwest) for inter-node
4//! Raft message passing. Each node is identified by a `NodeId` mapped to
5//! an HTTP URL. Messages are sent as JSON POSTs to `{url}/api/v1/cluster/raft`.
6//!
7//! Incoming messages are received via an internal channel — an external HTTP
8//! handler (e.g., an Axum route) should push received messages into the channel
9//! using `push_message()`.
10//!
11//! @version 0.1.0
12//! @author AutomataNexus Development Team
13
14use crate::node::NodeId;
15use crate::transport::{Message, Transport, TransportError};
16use std::collections::HashMap;
17use std::sync::mpsc;
18use std::sync::{Mutex, RwLock};
19use std::time::Duration;
20
21/// HTTP-based transport for Raft consensus communication.
22///
23/// Sends outgoing Raft messages as JSON POST requests to peer nodes.
24/// Receives incoming messages via an internal channel that should be
25/// fed by an external HTTP endpoint handler.
26pub struct HttpTransport {
27    /// Mapping from node IDs to their HTTP base URLs (e.g., "http://127.0.0.1:9091").
28    peer_urls: RwLock<HashMap<NodeId, String>>,
29    /// HTTP client for sending requests to peers.
30    client: reqwest::blocking::Client,
31    /// Sender side of the incoming message channel.
32    incoming_tx: mpsc::Sender<Message>,
33    /// Receiver side of the incoming message channel.
34    incoming_rx: Mutex<mpsc::Receiver<Message>>,
35}
36
37impl HttpTransport {
38    /// Create a new HTTP transport with the given peer URL mappings.
39    ///
40    /// # Arguments
41    /// * `peer_urls` - A map of `NodeId` to base HTTP URL for each peer node.
42    ///
43    /// # Example
44    /// ```ignore
45    /// use std::collections::HashMap;
46    /// use aegis_replication::node::NodeId;
47    /// use aegis_replication::http_transport::HttpTransport;
48    ///
49    /// let mut peers = HashMap::new();
50    /// peers.insert(NodeId::new("node2"), "http://127.0.0.1:9091".to_string());
51    /// peers.insert(NodeId::new("node3"), "http://127.0.0.1:7001".to_string());
52    /// let transport = HttpTransport::new(peers);
53    /// ```
54    pub fn new(peer_urls: HashMap<NodeId, String>) -> Self {
55        let (tx, rx) = mpsc::channel();
56        let client = reqwest::blocking::Client::builder()
57            .timeout(Duration::from_secs(5))
58            .connect_timeout(Duration::from_secs(2))
59            .build()
60            .expect("failed to build HTTP client");
61
62        Self {
63            peer_urls: RwLock::new(peer_urls),
64            client,
65            incoming_tx: tx,
66            incoming_rx: Mutex::new(rx),
67        }
68    }
69
70    /// Get a sender handle for pushing incoming messages into this transport.
71    ///
72    /// Use this to integrate with an HTTP server endpoint. When a Raft message
73    /// arrives via HTTP, deserialize it and call `sender.send(message)`.
74    pub fn sender(&self) -> mpsc::Sender<Message> {
75        self.incoming_tx.clone()
76    }
77
78    /// Push an incoming message into the receive channel.
79    ///
80    /// This is the primary way to feed messages from an external HTTP handler
81    /// into the transport layer.
82    pub fn push_message(&self, message: Message) -> Result<(), TransportError> {
83        self.incoming_tx
84            .send(message)
85            .map_err(|e| TransportError::Unknown(format!("Failed to push message: {}", e)))
86    }
87
88    /// Add or update a peer URL mapping.
89    pub fn add_peer(&self, node_id: NodeId, url: String) {
90        self.peer_urls
91            .write()
92            .expect("http_transport peer_urls lock poisoned")
93            .insert(node_id, url);
94    }
95
96    /// Remove a peer URL mapping.
97    pub fn remove_peer(&self, node_id: &NodeId) {
98        self.peer_urls
99            .write()
100            .expect("http_transport peer_urls lock poisoned")
101            .remove(node_id);
102    }
103
104    /// Get the URL for a peer node.
105    pub fn peer_url(&self, node_id: &NodeId) -> Option<String> {
106        self.peer_urls
107            .read()
108            .expect("http_transport peer_urls lock poisoned")
109            .get(node_id)
110            .cloned()
111    }
112
113    /// Send a message to a specific URL endpoint.
114    fn send_to_url(&self, url: &str, message: &Message) -> Result<(), TransportError> {
115        let endpoint = format!("{}/api/v1/cluster/raft", url.trim_end_matches('/'));
116
117        self.client
118            .post(&endpoint)
119            .json(message)
120            .send()
121            .map_err(|e| TransportError::ConnectionFailed(format!("{}: {}", endpoint, e)))?;
122
123        Ok(())
124    }
125}
126
127impl Transport for HttpTransport {
128    /// Send a message to the target node via HTTP POST.
129    ///
130    /// The message's `to` field is used to look up the peer's URL.
131    /// The message is serialized as JSON and sent to `{peer_url}/api/v1/cluster/raft`.
132    fn send(&self, message: Message) -> Result<(), TransportError> {
133        let url = {
134            let urls = self
135                .peer_urls
136                .read()
137                .expect("http_transport peer_urls lock poisoned");
138            urls.get(&message.to).cloned().ok_or_else(|| {
139                TransportError::ConnectionFailed(format!(
140                    "No URL configured for node {}",
141                    message.to
142                ))
143            })?
144        };
145
146        self.send_to_url(&url, &message)
147    }
148
149    /// Receive a message (blocking).
150    ///
151    /// Blocks until a message is available in the incoming channel.
152    /// Messages are pushed into this channel by external HTTP handlers
153    /// via `push_message()` or the `sender()` handle.
154    fn recv(&self) -> Result<Message, TransportError> {
155        let rx = self
156            .incoming_rx
157            .lock()
158            .expect("http_transport incoming_rx lock poisoned");
159        rx.recv().map_err(|_| TransportError::Disconnected)
160    }
161
162    /// Try to receive a message (non-blocking).
163    ///
164    /// Returns `None` if no message is currently available.
165    fn try_recv(&self) -> Option<Message> {
166        let rx = self
167            .incoming_rx
168            .lock()
169            .expect("http_transport incoming_rx lock poisoned");
170        rx.try_recv().ok()
171    }
172
173    /// Broadcast a message to all specified peers via HTTP POST.
174    ///
175    /// Sends the message to each peer independently. Failures for individual
176    /// peers do not affect delivery to other peers.
177    fn broadcast(&self, message: Message, peers: &[NodeId]) -> Vec<Result<(), TransportError>> {
178        peers
179            .iter()
180            .map(|peer| {
181                let mut msg = message.clone();
182                msg.to = peer.clone();
183                self.send(msg)
184            })
185            .collect()
186    }
187}
188
189// =============================================================================
190// Tests
191// =============================================================================
192
193#[cfg(test)]
194mod tests {
195    use super::*;
196    use crate::transport::MessageType;
197
198    #[test]
199    fn test_http_transport_creation() {
200        let mut peers = HashMap::new();
201        peers.insert(NodeId::new("node2"), "http://127.0.0.1:9091".to_string());
202        peers.insert(NodeId::new("node3"), "http://127.0.0.1:7001".to_string());
203
204        let transport = HttpTransport::new(peers);
205
206        assert_eq!(
207            transport.peer_url(&NodeId::new("node2")),
208            Some("http://127.0.0.1:9091".to_string())
209        );
210        assert_eq!(
211            transport.peer_url(&NodeId::new("node3")),
212            Some("http://127.0.0.1:7001".to_string())
213        );
214        assert_eq!(transport.peer_url(&NodeId::new("node4")), None);
215    }
216
217    #[test]
218    fn test_http_transport_add_remove_peer() {
219        let transport = HttpTransport::new(HashMap::new());
220
221        transport.add_peer(NodeId::new("node2"), "http://127.0.0.1:9091".to_string());
222        assert_eq!(
223            transport.peer_url(&NodeId::new("node2")),
224            Some("http://127.0.0.1:9091".to_string())
225        );
226
227        transport.remove_peer(&NodeId::new("node2"));
228        assert_eq!(transport.peer_url(&NodeId::new("node2")), None);
229    }
230
231    #[test]
232    fn test_http_transport_push_and_recv() {
233        let transport = HttpTransport::new(HashMap::new());
234
235        let msg = Message::heartbeat(NodeId::new("node1"), NodeId::new("node2"), 1);
236        transport.push_message(msg).unwrap();
237
238        let received = transport.try_recv().unwrap();
239        assert_eq!(received.message_type, MessageType::Heartbeat);
240        assert_eq!(received.from.as_str(), "node1");
241        assert_eq!(received.to.as_str(), "node2");
242    }
243
244    #[test]
245    fn test_http_transport_try_recv_empty() {
246        let transport = HttpTransport::new(HashMap::new());
247        assert!(transport.try_recv().is_none());
248    }
249
250    #[test]
251    fn test_http_transport_sender_channel() {
252        let transport = HttpTransport::new(HashMap::new());
253        let sender = transport.sender();
254
255        let msg = Message::heartbeat(NodeId::new("node1"), NodeId::new("node2"), 5);
256        sender.send(msg).unwrap();
257
258        let received = transport.try_recv().unwrap();
259        assert_eq!(received.term, 5);
260    }
261
262    #[test]
263    fn test_http_transport_send_no_peer_url() {
264        let transport = HttpTransport::new(HashMap::new());
265
266        let msg = Message::heartbeat(NodeId::new("node1"), NodeId::new("unknown"), 1);
267        let result = transport.send(msg);
268
269        assert!(result.is_err());
270        match result.unwrap_err() {
271            TransportError::ConnectionFailed(s) => {
272                assert!(s.contains("No URL configured"));
273            }
274            other => panic!("Expected ConnectionFailed, got {:?}", other),
275        }
276    }
277
278    #[test]
279    fn test_http_transport_multiple_messages() {
280        let transport = HttpTransport::new(HashMap::new());
281
282        for i in 0..5 {
283            let msg = Message::heartbeat(NodeId::new("node1"), NodeId::new("node2"), i);
284            transport.push_message(msg).unwrap();
285        }
286
287        for i in 0..5 {
288            let received = transport.try_recv().unwrap();
289            assert_eq!(received.term, i);
290        }
291
292        assert!(transport.try_recv().is_none());
293    }
294}