use std::sync::{Arc, Mutex};
use serde::{Deserialize, Serialize};
use edgestore::Engine;
use edgestore::EdgestoreError;
/// Serialized body of the `GET /merkle` response.
///
/// `handle_request` fills it from the value returned by the engine's
/// `range_merkle_root()` call.
#[derive(Serialize, Deserialize)]
struct MerkleResponse {
// Bytes of the Merkle root, copied out with `to_vec()`.
root: Vec<u8>,
}
/// One element of the list returned by `GET /segments`.
///
/// `handle_request` builds one entry per item yielded by the engine's
/// `export_manifest()` call.
#[derive(Serialize, Deserialize)]
struct SegmentEntry {
// Identifier copied from the manifest item's `segment_id`.
segment_id: u64,
// Bytes of the manifest item's `segment_hash`, copied out with `to_vec()`.
segment_hash: Vec<u8>,
}
/// HTTP front end that serves the `GET /merkle`, `GET /segments` and
/// `GET /segments/<hash>` routes from a shared [`Engine`].
pub struct HttpReplicationServer {
// Engine handle shared with the request-serving thread; every access goes
// through the mutex.
engine: Arc<Mutex<Engine>>,
}
impl HttpReplicationServer {
    /// Creates a server that will answer requests using the given shared engine.
    pub fn new(engine: Arc<Mutex<Engine>>) -> Self {
        Self { engine }
    }

    /// Binds an HTTP listener on `bind_addr` and serves requests on a new thread.
    ///
    /// On success returns the handle of the serving thread together with the
    /// port reported by the listener's own address. The thread keeps calling
    /// `handle_request` until the listener's request iterator ends.
    ///
    /// # Errors
    ///
    /// Returns `EdgestoreError::ReplicationError` when the listener cannot be
    /// bound, or when it ends up bound to a Unix socket (no port to report).
    pub fn start(
        &self,
        bind_addr: &str,
    ) -> Result<(std::thread::JoinHandle<()>, u16), EdgestoreError> {
        let listener = match tiny_http::Server::http(bind_addr) {
            Ok(listener) => listener,
            Err(e) => {
                return Err(EdgestoreError::ReplicationError(format!(
                    "bind error: {}",
                    e
                )))
            }
        };

        let port = match listener.server_addr() {
            tiny_http::ListenAddr::IP(addr) => addr.port(),
            #[cfg(unix)]
            tiny_http::ListenAddr::Unix(_) => {
                return Err(EdgestoreError::ReplicationError(
                    "Unix socket bind not supported for replication server".into(),
                ))
            }
        };

        // The worker thread owns the listener and its own reference to the engine.
        let shared_engine = Arc::clone(&self.engine);
        let worker = std::thread::spawn(move || {
            listener
                .incoming_requests()
                .for_each(|req| handle_request(req, &shared_engine));
        });

        Ok((worker, port))
    }
}
/// Splits a request URL into its non-empty path segments and a debug flag.
///
/// Everything after the first `?` is treated as the query string. The flag is
/// `true` only when one of the `&`-separated query items is exactly
/// `debug=json`. Empty path segments (leading, trailing or doubled `/`) are
/// dropped.
fn parse_url(url: &str) -> (Vec<&str>, bool) {
    let (path, query) = url.split_once('?').unwrap_or((url, ""));

    let mut wants_json = false;
    for item in query.split('&') {
        if item == "debug=json" {
            wants_json = true;
            break;
        }
    }

    let mut segments = Vec::new();
    for piece in path.split('/') {
        if !piece.is_empty() {
            segments.push(piece);
        }
    }

    (segments, wants_json)
}
/// Serializes `value` and sends it as the response to `request`.
///
/// The body is MessagePack (`rmp_serde::to_vec_named`, content type
/// `application/msgpack`) unless `debug_json` is set, in which case it is JSON
/// (`application/json`). If serialization fails, a 500 response carrying the
/// serializer's error message is sent instead. Failures while writing the
/// response are ignored.
fn respond_msgpack<T: Serialize>(request: tiny_http::Request, value: &T, debug_json: bool) {
    // Step 1: encode, remembering which content type goes with the bytes.
    let encoded: Result<(Vec<u8>, &'static str), String> = if debug_json {
        match serde_json::to_vec(value) {
            Ok(bytes) => Ok((bytes, "application/json")),
            Err(e) => Err(format!("json serialization error: {}", e)),
        }
    } else {
        match rmp_serde::to_vec_named(value) {
            Ok(bytes) => Ok((bytes, "application/msgpack")),
            Err(e) => Err(format!("msgpack serialization error: {}", e)),
        }
    };

    // Step 2: send either the payload or the error.
    match encoded {
        Ok((payload, mime)) => {
            let content_type = tiny_http::Header::from_bytes("Content-Type", mime).unwrap();
            let reply = tiny_http::Response::from_data(payload).with_header(content_type);
            let _ = request.respond(reply);
        }
        Err(reason) => respond_error(request, 500, &reason),
    }
}
/// Sends `data` unchanged as the response body, labelled
/// `application/octet-stream`. Failures while writing the response are ignored.
fn respond_raw(request: tiny_http::Request, data: Vec<u8>) {
    let content_type =
        tiny_http::Header::from_bytes("Content-Type", "application/octet-stream").unwrap();
    let reply = tiny_http::Response::from_data(data).with_header(content_type);
    let _ = request.respond(reply);
}
/// Sends a response with HTTP status `status` and `msg` as its body.
/// Failures while writing the response are ignored.
fn respond_error(request: tiny_http::Request, status: u16, msg: &str) {
    let code = tiny_http::StatusCode(status);
    let body = String::from(msg);
    let reply = tiny_http::Response::from_string(body).with_status_code(code);
    let _ = request.respond(reply);
}
fn handle_request(mut request: tiny_http::Request, engine: &Arc<Mutex<Engine>>) {
let mut _body = Vec::new();
let _ = request.as_reader().read_to_end(&mut _body);
let method = request.method().clone();
let url = request.url().to_string();
let (parts, debug_json) = parse_url(&url);
match (method, parts.as_slice()) {
(tiny_http::Method::Get, ["merkle"]) => {
match engine.lock() {
Ok(eng) => match eng.range_merkle_root() {
Ok(root) => {
let resp = MerkleResponse { root: root.to_vec() };
respond_msgpack(request, &resp, debug_json);
}
Err(e) => {
respond_error(request, 500, &format!("merkle root error: {}", e));
}
},
Err(_) => {
respond_error(request, 500, "engine lock poisoned");
}
}
}
(tiny_http::Method::Get, ["segments"]) => {
match engine.lock() {
Ok(eng) => match eng.export_manifest() {
Ok(refs) => {
let entries: Vec<SegmentEntry> = refs
.into_iter()
.map(|sr| SegmentEntry {
segment_id: sr.segment_id,
segment_hash: sr.segment_hash.to_vec(),
})
.collect();
respond_msgpack(request, &entries, debug_json);
}
Err(e) => {
respond_error(request, 500, &format!("export manifest error: {}", e));
}
},
Err(_) => {
respond_error(request, 500, "engine lock poisoned");
}
}
}
(tiny_http::Method::Get, ["segments", hash_hex]) => {
let hash_hex = *hash_hex;
if hash_hex.len() != 64 {
respond_error(request, 400, "hash_hex must be 64 hex characters");
return;
}
let mut hash_bytes = [0u8; 32];
let mut valid = true;
for i in 0..32 {
match u8::from_str_radix(&hash_hex[i * 2..i * 2 + 2], 16) {
Ok(b) => hash_bytes[i] = b,
Err(_) => {
valid = false;
break;
}
}
}
if !valid {
respond_error(request, 400, "invalid hex encoding in hash");
return;
}
let dat_bytes = {
let eng_guard = match engine.lock() {
Ok(g) => g,
Err(_) => {
respond_error(request, 500, "engine lock poisoned");
return;
}
};
let manifest = match eng_guard.export_manifest() {
Ok(m) => m,
Err(e) => {
respond_error(request, 500, &format!("manifest error: {}", e));
return;
}
};
let segment_id = manifest
.iter()
.find(|sr| sr.segment_hash == hash_bytes)
.map(|sr| sr.segment_id);
let base_path = eng_guard.db_path().to_path_buf();
drop(eng_guard);
match segment_id {
Some(sid) => {
let dat_path = base_path.join(format!("segment-{:08}.dat", sid));
match std::fs::read(&dat_path) {
Ok(bytes) => bytes,
Err(_) => {
let fallback = base_path.join(format!("{}.dat", hash_hex));
match std::fs::read(&fallback) {
Ok(b) => b,
Err(e) => {
respond_error(
request,
404,
&format!("segment not found: {}", e),
);
return;
}
}
}
}
}
None => {
let dat_path = base_path.join(format!("{}.dat", hash_hex));
match std::fs::read(&dat_path) {
Ok(b) => b,
Err(_) => {
respond_error(request, 404, "segment not found");
return;
}
}
}
}
};
respond_raw(request, dat_bytes);
}
_ => {
respond_error(request, 404, "not found");
}
}
}