1use std::sync::{Arc, Mutex};
13
14use serde::{Deserialize, Serialize};
15
16use edgestore::Engine;
17use edgestore::EdgestoreError;
18
/// Payload returned by the `GET /merkle` route.
///
/// Field names are part of the wire format: the value is serialized with
/// named fields (msgpack map, or JSON when `?debug=json` is requested).
#[derive(Serialize, Deserialize)]
struct MerkleResponse {
    /// Bytes of the root produced by `Engine::range_merkle_root`.
    root: Vec<u8>,
}
24
/// One element of the list returned by the `GET /segments` route.
///
/// Each entry is built from one item of `Engine::export_manifest`.
#[derive(Serialize, Deserialize)]
struct SegmentEntry {
    /// Identifier copied from the manifest item's `segment_id`.
    segment_id: u64,
    /// Bytes of the manifest item's `segment_hash`.
    segment_hash: Vec<u8>,
}
31
/// HTTP front end that serves replication data from a shared [`Engine`].
pub struct HttpReplicationServer {
    /// Shared engine handle; request handlers lock this mutex to query it.
    engine: Arc<Mutex<Engine>>,
}
38
39impl HttpReplicationServer {
40 pub fn new(engine: Arc<Mutex<Engine>>) -> Self {
42 HttpReplicationServer { engine }
43 }
44
45 pub fn start(
50 &self,
51 bind_addr: &str,
52 ) -> Result<(std::thread::JoinHandle<()>, u16), EdgestoreError> {
53 let server = tiny_http::Server::http(bind_addr)
54 .map_err(|e| EdgestoreError::ReplicationError(format!("bind error: {}", e)))?;
55
56 let bound_port = match server.server_addr() {
58 tiny_http::ListenAddr::IP(addr) => addr.port(),
59 #[cfg(unix)]
60 tiny_http::ListenAddr::Unix(_) => {
61 return Err(EdgestoreError::ReplicationError(
62 "Unix socket bind not supported for replication server".into(),
63 ))
64 }
65 };
66
67 let engine = Arc::clone(&self.engine);
68
69 let handle = std::thread::spawn(move || {
70 for request in server.incoming_requests() {
71 handle_request(request, &engine);
72 }
73 });
74
75 Ok((handle, bound_port))
76 }
77}
78
/// Splits a request URL into its non-empty path segments and a debug flag.
///
/// Everything after the first `?` is treated as the query string. The flag is
/// `true` only when one of the `&`-separated query items is exactly
/// `debug=json`. Empty path segments (leading, trailing or doubled `/`) are
/// discarded.
fn parse_url(url: &str) -> (Vec<&str>, bool) {
    let (path, query) = url.split_once('?').unwrap_or((url, ""));

    let segments = path.split('/').filter(|seg| !seg.is_empty()).collect();
    let wants_json = query.split('&').any(|pair| pair == "debug=json");

    (segments, wants_json)
}
98
99fn respond_msgpack<T: Serialize>(
101 request: tiny_http::Request,
102 value: &T,
103 debug_json: bool,
104) {
105 if debug_json {
106 match serde_json::to_vec(value) {
107 Ok(json_bytes) => {
108 let response = tiny_http::Response::from_data(json_bytes)
109 .with_header(
110 tiny_http::Header::from_bytes(
111 "Content-Type",
112 "application/json",
113 )
114 .unwrap(),
115 );
116 let _ = request.respond(response);
117 }
118 Err(e) => {
119 respond_error(request, 500, &format!("json serialization error: {}", e));
120 }
121 }
122 } else {
123 match rmp_serde::to_vec_named(value) {
124 Ok(msgpack_bytes) => {
125 let response = tiny_http::Response::from_data(msgpack_bytes)
126 .with_header(
127 tiny_http::Header::from_bytes(
128 "Content-Type",
129 "application/msgpack",
130 )
131 .unwrap(),
132 );
133 let _ = request.respond(response);
134 }
135 Err(e) => {
136 respond_error(request, 500, &format!("msgpack serialization error: {}", e));
137 }
138 }
139 }
140}
141
142fn respond_raw(request: tiny_http::Request, data: Vec<u8>) {
144 let response = tiny_http::Response::from_data(data)
145 .with_header(
146 tiny_http::Header::from_bytes(
147 "Content-Type",
148 "application/octet-stream",
149 )
150 .unwrap(),
151 );
152 let _ = request.respond(response);
153}
154
155fn respond_error(request: tiny_http::Request, status: u16, msg: &str) {
157 let response = tiny_http::Response::from_string(msg.to_string())
158 .with_status_code(tiny_http::StatusCode(status));
159 let _ = request.respond(response);
160}
161
162fn handle_request(mut request: tiny_http::Request, engine: &Arc<Mutex<Engine>>) {
164 let mut _body = Vec::new();
166 let _ = request.as_reader().read_to_end(&mut _body);
167
168 let method = request.method().clone();
169 let url = request.url().to_string();
170
171 let (parts, debug_json) = parse_url(&url);
172
173 match (method, parts.as_slice()) {
174 (tiny_http::Method::Get, ["merkle"]) => {
175 match engine.lock() {
177 Ok(eng) => match eng.range_merkle_root() {
178 Ok(root) => {
179 let resp = MerkleResponse { root: root.to_vec() };
180 respond_msgpack(request, &resp, debug_json);
181 }
182 Err(e) => {
183 respond_error(request, 500, &format!("merkle root error: {}", e));
184 }
185 },
186 Err(_) => {
187 respond_error(request, 500, "engine lock poisoned");
188 }
189 }
190 }
191
192 (tiny_http::Method::Get, ["segments"]) => {
193 match engine.lock() {
195 Ok(eng) => match eng.export_manifest() {
196 Ok(refs) => {
197 let entries: Vec<SegmentEntry> = refs
198 .into_iter()
199 .map(|sr| SegmentEntry {
200 segment_id: sr.segment_id,
201 segment_hash: sr.segment_hash.to_vec(),
202 })
203 .collect();
204 respond_msgpack(request, &entries, debug_json);
205 }
206 Err(e) => {
207 respond_error(request, 500, &format!("export manifest error: {}", e));
208 }
209 },
210 Err(_) => {
211 respond_error(request, 500, "engine lock poisoned");
212 }
213 }
214 }
215
216 (tiny_http::Method::Get, ["segments", hash_hex]) => {
217 let hash_hex = *hash_hex;
220 if hash_hex.len() != 64 {
221 respond_error(request, 400, "hash_hex must be 64 hex characters");
222 return;
223 }
224
225 let mut hash_bytes = [0u8; 32];
227 let mut valid = true;
228 for i in 0..32 {
229 match u8::from_str_radix(&hash_hex[i * 2..i * 2 + 2], 16) {
230 Ok(b) => hash_bytes[i] = b,
231 Err(_) => {
232 valid = false;
233 break;
234 }
235 }
236 }
237 if !valid {
238 respond_error(request, 400, "invalid hex encoding in hash");
239 return;
240 }
241
242 let dat_bytes = {
247 let eng_guard = match engine.lock() {
248 Ok(g) => g,
249 Err(_) => {
250 respond_error(request, 500, "engine lock poisoned");
251 return;
252 }
253 };
254
255 let manifest = match eng_guard.export_manifest() {
257 Ok(m) => m,
258 Err(e) => {
259 respond_error(request, 500, &format!("manifest error: {}", e));
260 return;
261 }
262 };
263
264 let segment_id = manifest
265 .iter()
266 .find(|sr| sr.segment_hash == hash_bytes)
267 .map(|sr| sr.segment_id);
268
269 let base_path = eng_guard.db_path().to_path_buf();
270
271 drop(eng_guard); match segment_id {
274 Some(sid) => {
275 let dat_path = base_path.join(format!("segment-{:08}.dat", sid));
277 match std::fs::read(&dat_path) {
278 Ok(bytes) => bytes,
279 Err(_) => {
280 let fallback = base_path.join(format!("{}.dat", hash_hex));
282 match std::fs::read(&fallback) {
283 Ok(b) => b,
284 Err(e) => {
285 respond_error(
286 request,
287 404,
288 &format!("segment not found: {}", e),
289 );
290 return;
291 }
292 }
293 }
294 }
295 }
296 None => {
297 let dat_path = base_path.join(format!("{}.dat", hash_hex));
299 match std::fs::read(&dat_path) {
300 Ok(b) => b,
301 Err(_) => {
302 respond_error(request, 404, "segment not found");
303 return;
304 }
305 }
306 }
307 }
308 };
309
310 respond_raw(request, dat_bytes);
311 }
312
313 _ => {
314 respond_error(request, 404, "not found");
315 }
316 }
317}