Skip to main content

edgestore_repl/
http_client.rs

1//! HTTP replication client implementing the `ReplicationProtocol` trait.
2//!
3//! Uses MessagePack (rmp-serde) for control messages (D07).
4//! Raw bytes for segment data transfer.
5//!
6//! All network errors are mapped to `EdgestoreError::ReplicationError`.
7
8use std::io::Read;
9
10use serde::{Deserialize, Serialize};
11
12use edgestore::replication::{ReplicationProtocol, SegmentRef};
13use edgestore::EdgestoreError;
14
15/// MessagePack wire struct for GET /merkle response.
16#[derive(Serialize, Deserialize)]
17struct MerkleResponse {
18    root: Vec<u8>,
19}
20
21/// MessagePack wire struct for one item in GET /segments response.
22#[derive(Serialize, Deserialize)]
23struct SegmentEntry {
24    segment_id: u64,
25    segment_hash: Vec<u8>,
26}
27
/// HTTP replication client that implements `ReplicationProtocol` against an `HttpReplicationServer`.
///
/// Uses MessagePack (rmp-serde) for control messages and raw bytes for segment transfer.
#[derive(Debug, Clone)]
pub struct HttpReplicationClient {
    /// Base URL of the remote peer, stored without any trailing `/` so that
    /// endpoint paths can be appended as `"{base_url}/..."`.
    base_url: String,
}

impl HttpReplicationClient {
    /// Create a new client pointing at `base_url` (e.g. `"http://127.0.0.1:8900"`).
    ///
    /// Trailing slashes are stripped, so `"http://host:8900/"` and
    /// `"http://host:8900"` address the same endpoints instead of the former
    /// producing request URLs such as `http://host:8900//merkle`.
    pub fn new(base_url: impl Into<String>) -> Self {
        let mut base_url = base_url.into();
        // Truncate in place to reuse the caller's allocation.
        let trimmed_len = base_url.trim_end_matches('/').len();
        base_url.truncate(trimmed_len);
        HttpReplicationClient { base_url }
    }
}
43
44impl ReplicationProtocol for HttpReplicationClient {
45    /// Fetch the remote peer's current Merkle root.
46    ///
47    /// Calls `GET {base_url}/merkle`, deserializes MessagePack body as `{root: Vec<u8>}`,
48    /// and converts to `[u8; 32]`.
49    fn merkle_root(&self) -> Result<[u8; 32], EdgestoreError> {
50        let url = format!("{}/merkle", self.base_url);
51        let response = ureq::get(&url)
52            .call()
53            .map_err(|e| EdgestoreError::ReplicationError(format!("GET /merkle: {}", e)))?;
54
55        let resp: MerkleResponse = rmp_serde::from_read(response.into_reader())
56            .map_err(|e| {
57                EdgestoreError::ReplicationError(format!("GET /merkle decode: {}", e))
58            })?;
59
60        if resp.root.len() != 32 {
61            return Err(EdgestoreError::ReplicationError(format!(
62                "GET /merkle: expected 32-byte root, got {} bytes",
63                resp.root.len()
64            )));
65        }
66
67        let mut hash = [0u8; 32];
68        hash.copy_from_slice(&resp.root);
69        Ok(hash)
70    }
71
72    /// Fetch the remote peer's full segment manifest.
73    ///
74    /// Calls `GET {base_url}/segments`, deserializes MessagePack as a list of segment entries,
75    /// and converts each to `SegmentRef`.
76    fn list_segments(&self) -> Result<Vec<SegmentRef>, EdgestoreError> {
77        let url = format!("{}/segments", self.base_url);
78        let response = ureq::get(&url)
79            .call()
80            .map_err(|e| EdgestoreError::ReplicationError(format!("GET /segments: {}", e)))?;
81
82        let entries: Vec<SegmentEntry> = rmp_serde::from_read(response.into_reader())
83            .map_err(|e| {
84                EdgestoreError::ReplicationError(format!("GET /segments decode: {}", e))
85            })?;
86
87        let refs = entries
88            .into_iter()
89            .map(|e| {
90                let mut hash = [0u8; 32];
91                let copy_len = e.segment_hash.len().min(32);
92                hash[..copy_len].copy_from_slice(&e.segment_hash[..copy_len]);
93                SegmentRef {
94                    segment_hash: hash,
95                    segment_id: e.segment_id,
96                }
97            })
98            .collect();
99
100        Ok(refs)
101    }
102
103    /// Fetch one segment's raw bytes by content hash.
104    ///
105    /// Calls `GET {base_url}/segments/{hash_hex}`, reads raw bytes from body.
106    /// Caller MUST verify BLAKE3 before applying (T-04-01).
107    fn fetch_segment(&self, hash: &[u8; 32]) -> Result<Vec<u8>, EdgestoreError> {
108        let hash_hex: String = hash.iter().map(|b| format!("{:02x}", b)).collect();
109        let url = format!("{}/segments/{}", self.base_url, hash_hex);
110
111        let response = ureq::get(&url)
112            .call()
113            .map_err(|e| {
114                EdgestoreError::ReplicationError(format!("GET /segments/{}: {}", hash_hex, e))
115            })?;
116
117        let mut data = Vec::new();
118        response
119            .into_reader()
120            .read_to_end(&mut data)
121            .map_err(|e| {
122                EdgestoreError::ReplicationError(format!(
123                    "GET /segments/{} read body: {}",
124                    hash_hex, e
125                ))
126            })?;
127
128        Ok(data)
129    }
130}