Skip to main content

edgestore_repl/
http_server.rs

1//! HTTP replication server exposing 3 pull-only endpoints.
2//!
3//! Endpoints:
4//!   GET /merkle              → MessagePack `{root: Vec<u8>}`
5//!   GET /segments            → MessagePack `[{segment_id, segment_hash}]`
6//!   GET /segments/{hash_hex} → raw segment bytes (application/octet-stream)
7//!
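//! The two MessagePack endpoints (`/merkle` and `/segments`) support `?debug=json` to
//! re-serialize as JSON for human inspection (D07); `/segments/{hash_hex}` always
//! returns raw bytes.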
9//!
10//! Engine access is serialized via `Arc<Mutex<Engine>>` — correct for single-writer (D08).
11
12use std::sync::{Arc, Mutex};
13
14use serde::{Deserialize, Serialize};
15
16use edgestore::Engine;
17use edgestore::EdgestoreError;
18
19/// MessagePack wire struct for GET /merkle response.
20#[derive(Serialize, Deserialize)]
21struct MerkleResponse {
22    root: Vec<u8>,
23}
24
25/// MessagePack wire struct for one item in GET /segments response.
26#[derive(Serialize, Deserialize)]
27struct SegmentEntry {
28    segment_id: u64,
29    segment_hash: Vec<u8>,
30}
31
32/// HTTP replication server wrapping an `Arc<Mutex<Engine>>`.
33///
34/// Call `start(bind_addr)` to spawn the server loop in a background thread.
35pub struct HttpReplicationServer {
36    engine: Arc<Mutex<Engine>>,
37}
38
39impl HttpReplicationServer {
40    /// Create a new server wrapping the provided engine.
41    pub fn new(engine: Arc<Mutex<Engine>>) -> Self {
42        HttpReplicationServer { engine }
43    }
44
45    /// Bind to `bind_addr` and start the request loop in a background thread.
46    ///
47    /// Returns `(JoinHandle, bound_port)`. The bound port is useful when `bind_addr`
48    /// uses port 0 (OS-assigned ephemeral port). The server runs until the process exits.
49    pub fn start(
50        &self,
51        bind_addr: &str,
52    ) -> Result<(std::thread::JoinHandle<()>, u16), EdgestoreError> {
53        let server = tiny_http::Server::http(bind_addr)
54            .map_err(|e| EdgestoreError::ReplicationError(format!("bind error: {}", e)))?;
55
56        // Extract the bound port before moving `server` into the thread.
57        let bound_port = match server.server_addr() {
58            tiny_http::ListenAddr::IP(addr) => addr.port(),
59            #[cfg(unix)]
60            tiny_http::ListenAddr::Unix(_) => {
61                return Err(EdgestoreError::ReplicationError(
62                    "Unix socket bind not supported for replication server".into(),
63                ))
64            }
65        };
66
67        let engine = Arc::clone(&self.engine);
68
69        let handle = std::thread::spawn(move || {
70            for request in server.incoming_requests() {
71                handle_request(request, &engine);
72            }
73        });
74
75        Ok((handle, bound_port))
76    }
77}
78
/// Split a request URL into its path segments and the debug-JSON flag.
///
/// Everything before the first `?` is treated as the path and broken on `/`,
/// dropping empty segments (so leading, trailing and doubled slashes are
/// ignored). Everything after the first `?` is treated as the query string;
/// the flag is `true` only when one `&`-separated item is exactly `debug=json`.
fn parse_url(url: &str) -> (Vec<&str>, bool) {
    // A URL without `?` has an empty query string.
    let (path, query) = url.split_once('?').unwrap_or((url, ""));

    let segments: Vec<&str> = path.split('/').filter(|seg| !seg.is_empty()).collect();
    let wants_json = query.split('&').any(|pair| pair == "debug=json");

    (segments, wants_json)
}
98
99/// Respond with a MessagePack-serialized value, or JSON if `debug_json` is set.
100fn respond_msgpack<T: Serialize>(
101    request: tiny_http::Request,
102    value: &T,
103    debug_json: bool,
104) {
105    if debug_json {
106        match serde_json::to_vec(value) {
107            Ok(json_bytes) => {
108                let response = tiny_http::Response::from_data(json_bytes)
109                    .with_header(
110                        tiny_http::Header::from_bytes(
111                            "Content-Type",
112                            "application/json",
113                        )
114                        .unwrap(),
115                    );
116                let _ = request.respond(response);
117            }
118            Err(e) => {
119                respond_error(request, 500, &format!("json serialization error: {}", e));
120            }
121        }
122    } else {
123        match rmp_serde::to_vec_named(value) {
124            Ok(msgpack_bytes) => {
125                let response = tiny_http::Response::from_data(msgpack_bytes)
126                    .with_header(
127                        tiny_http::Header::from_bytes(
128                            "Content-Type",
129                            "application/msgpack",
130                        )
131                        .unwrap(),
132                    );
133                let _ = request.respond(response);
134            }
135            Err(e) => {
136                respond_error(request, 500, &format!("msgpack serialization error: {}", e));
137            }
138        }
139    }
140}
141
142/// Respond with raw bytes and `application/octet-stream`.
143fn respond_raw(request: tiny_http::Request, data: Vec<u8>) {
144    let response = tiny_http::Response::from_data(data)
145        .with_header(
146            tiny_http::Header::from_bytes(
147                "Content-Type",
148                "application/octet-stream",
149            )
150            .unwrap(),
151        );
152    let _ = request.respond(response);
153}
154
155/// Respond with an error status and plain-text body.
156fn respond_error(request: tiny_http::Request, status: u16, msg: &str) {
157    let response = tiny_http::Response::from_string(msg.to_string())
158        .with_status_code(tiny_http::StatusCode(status));
159    let _ = request.respond(response);
160}
161
162/// Dispatch a single HTTP request.
163fn handle_request(mut request: tiny_http::Request, engine: &Arc<Mutex<Engine>>) {
164    // Drain the request body to avoid blocking the client.
165    let mut _body = Vec::new();
166    let _ = request.as_reader().read_to_end(&mut _body);
167
168    let method = request.method().clone();
169    let url = request.url().to_string();
170
171    let (parts, debug_json) = parse_url(&url);
172
173    match (method, parts.as_slice()) {
174        (tiny_http::Method::Get, ["merkle"]) => {
175            // GET /merkle — return Merkle root as MessagePack.
176            match engine.lock() {
177                Ok(eng) => match eng.range_merkle_root() {
178                    Ok(root) => {
179                        let resp = MerkleResponse { root: root.to_vec() };
180                        respond_msgpack(request, &resp, debug_json);
181                    }
182                    Err(e) => {
183                        respond_error(request, 500, &format!("merkle root error: {}", e));
184                    }
185                },
186                Err(_) => {
187                    respond_error(request, 500, "engine lock poisoned");
188                }
189            }
190        }
191
192        (tiny_http::Method::Get, ["segments"]) => {
193            // GET /segments — return full manifest as MessagePack.
194            match engine.lock() {
195                Ok(eng) => match eng.export_manifest() {
196                    Ok(refs) => {
197                        let entries: Vec<SegmentEntry> = refs
198                            .into_iter()
199                            .map(|sr| SegmentEntry {
200                                segment_id: sr.segment_id,
201                                segment_hash: sr.segment_hash.to_vec(),
202                            })
203                            .collect();
204                        respond_msgpack(request, &entries, debug_json);
205                    }
206                    Err(e) => {
207                        respond_error(request, 500, &format!("export manifest error: {}", e));
208                    }
209                },
210                Err(_) => {
211                    respond_error(request, 500, "engine lock poisoned");
212                }
213            }
214        }
215
216        (tiny_http::Method::Get, ["segments", hash_hex]) => {
217            // GET /segments/{hash_hex} — return raw segment bytes.
218            // Parse 64-char hex string into 32 bytes.
219            let hash_hex = *hash_hex;
220            if hash_hex.len() != 64 {
221                respond_error(request, 400, "hash_hex must be 64 hex characters");
222                return;
223            }
224
225            // Decode hex.
226            let mut hash_bytes = [0u8; 32];
227            let mut valid = true;
228            for i in 0..32 {
229                match u8::from_str_radix(&hash_hex[i * 2..i * 2 + 2], 16) {
230                    Ok(b) => hash_bytes[i] = b,
231                    Err(_) => {
232                        valid = false;
233                        break;
234                    }
235                }
236            }
237            if !valid {
238                respond_error(request, 400, "invalid hex encoding in hash");
239                return;
240            }
241
242            // Determine the segment file path from the engine's db path.
243            // The canonical segment filename is segment-{id:08}.dat, but we need to find it
244            // by content hash. Look up via export_manifest to find the segment_id, then
245            // read the canonical .dat file. If not found, fall back to {hash_hex}.dat.
246            let dat_bytes = {
247                let eng_guard = match engine.lock() {
248                    Ok(g) => g,
249                    Err(_) => {
250                        respond_error(request, 500, "engine lock poisoned");
251                        return;
252                    }
253                };
254
255                // Find the segment_id for this hash.
256                let manifest = match eng_guard.export_manifest() {
257                    Ok(m) => m,
258                    Err(e) => {
259                        respond_error(request, 500, &format!("manifest error: {}", e));
260                        return;
261                    }
262                };
263
264                let segment_id = manifest
265                    .iter()
266                    .find(|sr| sr.segment_hash == hash_bytes)
267                    .map(|sr| sr.segment_id);
268
269                let base_path = eng_guard.db_path().to_path_buf();
270
271                drop(eng_guard); // release lock before file I/O
272
273                match segment_id {
274                    Some(sid) => {
275                        // Read the canonical segment-{id:08}.dat file.
276                        let dat_path = base_path.join(format!("segment-{:08}.dat", sid));
277                        match std::fs::read(&dat_path) {
278                            Ok(bytes) => bytes,
279                            Err(_) => {
280                                // Fall back to hash-named file.
281                                let fallback = base_path.join(format!("{}.dat", hash_hex));
282                                match std::fs::read(&fallback) {
283                                    Ok(b) => b,
284                                    Err(e) => {
285                                        respond_error(
286                                            request,
287                                            404,
288                                            &format!("segment not found: {}", e),
289                                        );
290                                        return;
291                                    }
292                                }
293                            }
294                        }
295                    }
296                    None => {
297                        // Hash not in manifest; try direct hash-named file.
298                        let dat_path = base_path.join(format!("{}.dat", hash_hex));
299                        match std::fs::read(&dat_path) {
300                            Ok(b) => b,
301                            Err(_) => {
302                                respond_error(request, 404, "segment not found");
303                                return;
304                            }
305                        }
306                    }
307                }
308            };
309
310            respond_raw(request, dat_bytes);
311        }
312
313        _ => {
314            respond_error(request, 404, "not found");
315        }
316    }
317}