Skip to main content

cortex_runtime/collective/
sync.rs

1//! Remote registry sync — push/pull deltas to remote Cortex registries.
2//!
3//! Handles communication with remote registry servers over HTTPS.
4
5use crate::collective::delta::{self, MapDelta};
6use crate::map::types::SiteMap;
7use anyhow::{Context, Result};
8use chrono::{DateTime, Utc};
9use serde::{Deserialize, Serialize};
10
11/// Remote sync client for pushing/pulling to a remote registry.
12pub struct RemoteSync {
13    /// Remote registry endpoint URL.
14    endpoint: String,
15    /// This Cortex instance's unique ID.
16    instance_id: String,
17    /// Optional API key for authenticated writes.
18    api_key: Option<String>,
19    /// HTTP client.
20    client: reqwest::Client,
21}
22
23/// A remote registry entry listing.
24#[derive(Debug, Clone, Serialize, Deserialize)]
25pub struct RemoteEntry {
26    pub domain: String,
27    pub latest_timestamp: DateTime<Utc>,
28    pub node_count: usize,
29}
30
31impl RemoteSync {
32    /// Create a new remote sync client.
33    pub fn new(endpoint: &str, instance_id: &str, api_key: Option<String>) -> Self {
34        Self {
35            endpoint: endpoint.trim_end_matches('/').to_string(),
36            instance_id: instance_id.to_string(),
37            api_key,
38            client: reqwest::Client::new(),
39        }
40    }
41
42    /// Push a delta to the remote registry.
43    pub async fn push_delta(&self, domain: &str, delta_data: &MapDelta) -> Result<()> {
44        let url = format!("{}/v1/maps/{}/deltas", self.endpoint, domain);
45        let body = delta::serialize_delta(delta_data);
46
47        let hash_hex = hex_encode(&delta_data.base_hash);
48
49        let mut req = self
50            .client
51            .post(&url)
52            .header("Content-Type", "application/octet-stream")
53            .header("X-Cortex-Instance", &self.instance_id)
54            .header("X-Cortex-Base-Hash", &hash_hex)
55            .body(body);
56
57        if let Some(ref key) = self.api_key {
58            req = req.header("Authorization", format!("Bearer {key}"));
59        }
60
61        let resp = req.send().await.context("pushing delta to remote")?;
62        if !resp.status().is_success() {
63            anyhow::bail!(
64                "remote push failed: {} {}",
65                resp.status(),
66                resp.text().await.unwrap_or_default()
67            );
68        }
69
70        Ok(())
71    }
72
73    /// Pull the latest map from the remote registry.
74    pub async fn pull_map(&self, domain: &str) -> Result<Option<SiteMap>> {
75        let url = format!("{}/v1/maps/{}", self.endpoint, domain);
76
77        let resp = self
78            .client
79            .get(&url)
80            .send()
81            .await
82            .context("pulling map from remote")?;
83
84        if resp.status() == reqwest::StatusCode::NOT_FOUND {
85            return Ok(None);
86        }
87
88        if !resp.status().is_success() {
89            anyhow::bail!("remote pull failed: {}", resp.status());
90        }
91
92        let bytes = resp.bytes().await?;
93        let map = SiteMap::deserialize(&bytes)?;
94        Ok(Some(map))
95    }
96
97    /// Pull deltas since a given timestamp.
98    pub async fn pull_since(&self, domain: &str, since: DateTime<Utc>) -> Result<Vec<MapDelta>> {
99        let url = format!(
100            "{}/v1/maps/{}/deltas?since={}",
101            self.endpoint,
102            domain,
103            since.to_rfc3339()
104        );
105
106        let resp = self.client.get(&url).send().await?;
107        if !resp.status().is_success() {
108            return Ok(Vec::new());
109        }
110
111        let bytes = resp.bytes().await?;
112        // Response is a JSON array of deltas
113        let deltas: Vec<MapDelta> = serde_json::from_slice(&bytes).unwrap_or_default();
114        Ok(deltas)
115    }
116
117    /// List all available domains on the remote registry.
118    pub async fn list_available(&self) -> Result<Vec<RemoteEntry>> {
119        let url = format!("{}/v1/maps", self.endpoint);
120        let resp = self.client.get(&url).send().await?;
121
122        if !resp.status().is_success() {
123            return Ok(Vec::new());
124        }
125
126        let entries: Vec<RemoteEntry> = resp.json().await.unwrap_or_default();
127        Ok(entries)
128    }
129}
130
131/// Merkle node for efficient sync between registries.
132#[derive(Debug, Clone, Serialize, Deserialize)]
133pub struct MerkleNode {
134    /// Hash of this node's subtree.
135    pub hash: [u8; 32],
136    /// Alphabetical domain range this node covers.
137    pub domain_range: (String, String),
138    /// Children (None for leaf nodes).
139    pub children: Option<(Box<MerkleNode>, Box<MerkleNode>)>,
140}
141
142impl MerkleNode {
143    /// Build a Merkle tree from a sorted list of (domain, hash) pairs.
144    pub fn build(entries: &[(String, [u8; 32])]) -> Option<Self> {
145        if entries.is_empty() {
146            return None;
147        }
148        if entries.len() == 1 {
149            return Some(MerkleNode {
150                hash: entries[0].1,
151                domain_range: (entries[0].0.clone(), entries[0].0.clone()),
152                children: None,
153            });
154        }
155
156        let mid = entries.len() / 2;
157        let left = Self::build(&entries[..mid]);
158        let right = Self::build(&entries[mid..]);
159
160        match (left, right) {
161            (Some(l), Some(r)) => {
162                let combined_hash = combine_hashes(&l.hash, &r.hash);
163                Some(MerkleNode {
164                    hash: combined_hash,
165                    domain_range: (l.domain_range.0.clone(), r.domain_range.1.clone()),
166                    children: Some((Box::new(l), Box::new(r))),
167                })
168            }
169            (Some(node), None) | (None, Some(node)) => Some(node),
170            (None, None) => None,
171        }
172    }
173
174    /// Find domains that differ between two Merkle trees.
175    pub fn diff(a: &MerkleNode, b: &MerkleNode) -> Vec<String> {
176        if a.hash == b.hash {
177            return Vec::new();
178        }
179
180        match (&a.children, &b.children) {
181            (Some((al, ar)), Some((bl, br))) => {
182                let mut diffs = Self::diff(al, bl);
183                diffs.extend(Self::diff(ar, br));
184                diffs
185            }
186            _ => {
187                // Leaf level — return the domain range
188                vec![a.domain_range.0.clone()]
189            }
190        }
191    }
192}
193
/// Combine two child hashes into the hash of their parent Merkle node.
///
/// The result is order-sensitive (`combine(a, b)` differs from `combine(b, a)`
/// unless `a == b`) and does not cancel when both children are equal. The former
/// XOR-based mix had neither property, so the following all produced identical
/// parent hashes and hid real changes from `MerkleNode::diff`:
/// - swapped children;
/// - equal-hash siblings, which always combined to all zeros;
/// - pairs sharing the same `a ^ b`.
///
/// Each of the four 64-bit output lanes is an independent chain over the 64
/// input bytes (left child first), seeded with a distinct constant and mixed with
/// the SplitMix64 finalizer. The output is deterministic across platforms and
/// builds. It is NOT a cryptographic hash: it detects accidental divergence, not
/// adversarially crafted collisions.
fn combine_hashes(a: &[u8; 32], b: &[u8; 32]) -> [u8; 32] {
    // First 64 bits of the SHA-512 initial hash values. Any constants would do;
    // they only need to differ per lane.
    const LANE_SEEDS: [u64; 4] = [
        0x6a09_e667_f3bc_c908,
        0xbb67_ae85_84ca_a73b,
        0x3c6e_f372_fe94_f82b,
        0xa54f_f53a_5f1d_36f1,
    ];

    let mut result = [0u8; 32];
    for (lane, &seed) in result.chunks_exact_mut(8).zip(LANE_SEEDS.iter()) {
        let acc = a
            .chunks_exact(8)
            .chain(b.chunks_exact(8))
            .fold(seed, |acc, chunk| {
                let mut word = [0u8; 8];
                word.copy_from_slice(chunk);
                splitmix64(acc ^ u64::from_le_bytes(word))
            });
        lane.copy_from_slice(&acc.to_le_bytes());
    }
    result
}

/// SplitMix64 finalizer: a bijective 64-bit mixing function.
fn splitmix64(mut z: u64) -> u64 {
    z = z.wrapping_add(0x9e37_79b9_7f4a_7c15);
    z = (z ^ (z >> 30)).wrapping_mul(0xbf58_476d_1ce4_e5b9);
    z = (z ^ (z >> 27)).wrapping_mul(0x94d0_49bb_1331_11eb);
    z ^ (z >> 31)
}
206
/// Render `bytes` as lowercase hexadecimal, two digits per byte, high nibble first.
fn hex_encode(bytes: &[u8]) -> String {
    const DIGITS: &[u8; 16] = b"0123456789abcdef";
    let mut out = String::with_capacity(bytes.len() * 2);
    for &byte in bytes {
        out.push(char::from(DIGITS[usize::from(byte >> 4)]));
        out.push(char::from(DIGITS[usize::from(byte & 0x0f)]));
    }
    out
}
211
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a `(domain, hash)` entry whose hash is `fill` repeated 32 times.
    fn entry(domain: &str, fill: u8) -> (String, [u8; 32]) {
        (domain.to_string(), [fill; 32])
    }

    #[test]
    fn test_merkle_tree_build() {
        let entries = [entry("a.com", 1), entry("b.com", 2), entry("c.com", 3)];

        let root = MerkleNode::build(&entries).expect("non-empty input must yield a tree");

        assert_eq!(root.domain_range.0, "a.com");
        assert_eq!(root.domain_range.1, "c.com");
    }

    #[test]
    fn test_merkle_diff_identical() {
        let entries = [entry("a.com", 1), entry("b.com", 2)];

        let first = MerkleNode::build(&entries).expect("tree");
        let second = MerkleNode::build(&entries).expect("tree");

        assert!(MerkleNode::diff(&first, &second).is_empty());
    }

    #[test]
    fn test_merkle_diff_detects_change() {
        let before = [entry("a.com", 1), entry("b.com", 2)];
        // Same domains, but b.com's hash has changed.
        let after = [entry("a.com", 1), entry("b.com", 9)];

        let old_tree = MerkleNode::build(&before).expect("tree");
        let new_tree = MerkleNode::build(&after).expect("tree");

        assert!(!MerkleNode::diff(&old_tree, &new_tree).is_empty());
    }

    #[test]
    fn test_hex_encode() {
        assert_eq!(hex_encode(&[0xDE, 0xAD]), "dead");
        assert_eq!(hex_encode(&[0x00, 0xFF]), "00ff");
    }

    #[test]
    fn test_merkle_empty() {
        assert!(MerkleNode::build(&[]).is_none());
    }

    #[test]
    fn test_merkle_single() {
        let entries = [entry("x.com", 42)];

        let leaf = MerkleNode::build(&entries).expect("single entry must yield a leaf");

        assert_eq!(leaf.domain_range.0, "x.com");
        assert!(leaf.children.is_none());
    }
}