// cortex_runtime/collective/sync.rs
use crate::collective::delta::{self, MapDelta};
use crate::map::types::SiteMap;
use anyhow::{Context, Result};
use chrono::{DateTime, SecondsFormat, Utc};
use serde::{Deserialize, Serialize};

11pub struct RemoteSync {
13 endpoint: String,
15 instance_id: String,
17 api_key: Option<String>,
19 client: reqwest::Client,
21}
22
23#[derive(Debug, Clone, Serialize, Deserialize)]
25pub struct RemoteEntry {
26 pub domain: String,
27 pub latest_timestamp: DateTime<Utc>,
28 pub node_count: usize,
29}
30
31impl RemoteSync {
32 pub fn new(endpoint: &str, instance_id: &str, api_key: Option<String>) -> Self {
34 Self {
35 endpoint: endpoint.trim_end_matches('/').to_string(),
36 instance_id: instance_id.to_string(),
37 api_key,
38 client: reqwest::Client::new(),
39 }
40 }
41
42 pub async fn push_delta(&self, domain: &str, delta_data: &MapDelta) -> Result<()> {
44 let url = format!("{}/v1/maps/{}/deltas", self.endpoint, domain);
45 let body = delta::serialize_delta(delta_data);
46
47 let hash_hex = hex_encode(&delta_data.base_hash);
48
49 let mut req = self
50 .client
51 .post(&url)
52 .header("Content-Type", "application/octet-stream")
53 .header("X-Cortex-Instance", &self.instance_id)
54 .header("X-Cortex-Base-Hash", &hash_hex)
55 .body(body);
56
57 if let Some(ref key) = self.api_key {
58 req = req.header("Authorization", format!("Bearer {key}"));
59 }
60
61 let resp = req.send().await.context("pushing delta to remote")?;
62 if !resp.status().is_success() {
63 anyhow::bail!(
64 "remote push failed: {} {}",
65 resp.status(),
66 resp.text().await.unwrap_or_default()
67 );
68 }
69
70 Ok(())
71 }
72
73 pub async fn pull_map(&self, domain: &str) -> Result<Option<SiteMap>> {
75 let url = format!("{}/v1/maps/{}", self.endpoint, domain);
76
77 let resp = self
78 .client
79 .get(&url)
80 .send()
81 .await
82 .context("pulling map from remote")?;
83
84 if resp.status() == reqwest::StatusCode::NOT_FOUND {
85 return Ok(None);
86 }
87
88 if !resp.status().is_success() {
89 anyhow::bail!("remote pull failed: {}", resp.status());
90 }
91
92 let bytes = resp.bytes().await?;
93 let map = SiteMap::deserialize(&bytes)?;
94 Ok(Some(map))
95 }
96
97 pub async fn pull_since(&self, domain: &str, since: DateTime<Utc>) -> Result<Vec<MapDelta>> {
99 let url = format!(
100 "{}/v1/maps/{}/deltas?since={}",
101 self.endpoint,
102 domain,
103 since.to_rfc3339()
104 );
105
106 let resp = self.client.get(&url).send().await?;
107 if !resp.status().is_success() {
108 return Ok(Vec::new());
109 }
110
111 let bytes = resp.bytes().await?;
112 let deltas: Vec<MapDelta> = serde_json::from_slice(&bytes).unwrap_or_default();
114 Ok(deltas)
115 }
116
117 pub async fn list_available(&self) -> Result<Vec<RemoteEntry>> {
119 let url = format!("{}/v1/maps", self.endpoint);
120 let resp = self.client.get(&url).send().await?;
121
122 if !resp.status().is_success() {
123 return Ok(Vec::new());
124 }
125
126 let entries: Vec<RemoteEntry> = resp.json().await.unwrap_or_default();
127 Ok(entries)
128 }
129}
130
131#[derive(Debug, Clone, Serialize, Deserialize)]
133pub struct MerkleNode {
134 pub hash: [u8; 32],
136 pub domain_range: (String, String),
138 pub children: Option<(Box<MerkleNode>, Box<MerkleNode>)>,
140}
141
142impl MerkleNode {
143 pub fn build(entries: &[(String, [u8; 32])]) -> Option<Self> {
145 if entries.is_empty() {
146 return None;
147 }
148 if entries.len() == 1 {
149 return Some(MerkleNode {
150 hash: entries[0].1,
151 domain_range: (entries[0].0.clone(), entries[0].0.clone()),
152 children: None,
153 });
154 }
155
156 let mid = entries.len() / 2;
157 let left = Self::build(&entries[..mid]);
158 let right = Self::build(&entries[mid..]);
159
160 match (left, right) {
161 (Some(l), Some(r)) => {
162 let combined_hash = combine_hashes(&l.hash, &r.hash);
163 Some(MerkleNode {
164 hash: combined_hash,
165 domain_range: (l.domain_range.0.clone(), r.domain_range.1.clone()),
166 children: Some((Box::new(l), Box::new(r))),
167 })
168 }
169 (Some(node), None) | (None, Some(node)) => Some(node),
170 (None, None) => None,
171 }
172 }
173
174 pub fn diff(a: &MerkleNode, b: &MerkleNode) -> Vec<String> {
176 if a.hash == b.hash {
177 return Vec::new();
178 }
179
180 match (&a.children, &b.children) {
181 (Some((al, ar)), Some((bl, br))) => {
182 let mut diffs = Self::diff(al, bl);
183 diffs.extend(Self::diff(ar, br));
184 diffs
185 }
186 _ => {
187 vec![a.domain_range.0.clone()]
189 }
190 }
191 }
192}
193
/// Derives a parent hash from its `a` (left) and `b` (right) child hashes.
///
/// The result depends on the order of the children and does not collapse when
/// both children carry the same hash, so a change in either child changes the
/// parent. A plain XOR of the children has neither property: equal children
/// always cancel to zero and swapped children are indistinguishable.
///
/// The 32 output bytes are four 64-bit lanes. Each lane runs FNV-1a over the
/// bytes of `a` followed by `b` from a lane-specific starting state and then
/// applies a final avalanche step. The function is deterministic and uses no
/// platform-dependent state.
///
/// NOTE(review): this is a non-cryptographic mix. If the tree must resist
/// deliberately crafted collisions, swap in a cryptographic hash.
fn combine_hashes(a: &[u8; 32], b: &[u8; 32]) -> [u8; 32] {
    const FNV_OFFSET_BASIS: u64 = 0xcbf2_9ce4_8422_2325;
    const FNV_PRIME: u64 = 0x0000_0100_0000_01b3;
    // Odd constant used to give every lane a distinct starting state.
    const LANE_SEED: u64 = 0x9e37_79b9_7f4a_7c15;

    let mut result = [0u8; 32];
    for (lane, out) in (1u64..).zip(result.chunks_exact_mut(8)) {
        let mut state = FNV_OFFSET_BASIS ^ lane.wrapping_mul(LANE_SEED);
        for &byte in a.iter().chain(b.iter()) {
            state = (state ^ u64::from(byte)).wrapping_mul(FNV_PRIME);
        }

        // Final avalanche so late input bytes influence every output bit.
        state ^= state >> 30;
        state = state.wrapping_mul(0xbf58_476d_1ce4_e5b9);
        state ^= state >> 27;
        state = state.wrapping_mul(0x94d0_49bb_1331_11eb);
        state ^= state >> 31;

        out.copy_from_slice(&state.to_le_bytes());
    }
    result
}

/// Renders `bytes` as lowercase hexadecimal, two digits per byte.
fn hex_encode(bytes: &[u8]) -> String {
    const DIGITS: &[u8; 16] = b"0123456789abcdef";

    let mut encoded = String::with_capacity(bytes.len() * 2);
    for &byte in bytes {
        // High nibble first, then low nibble.
        encoded.push(char::from(DIGITS[usize::from(byte >> 4)]));
        encoded.push(char::from(DIGITS[usize::from(byte & 0x0f)]));
    }
    encoded
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an entry whose 32-byte hash is `fill` repeated.
    fn entry(domain: &str, fill: u8) -> (String, [u8; 32]) {
        (domain.to_string(), [fill; 32])
    }

    #[test]
    fn test_merkle_tree_build() {
        let entries = vec![entry("a.com", 1), entry("b.com", 2), entry("c.com", 3)];

        let tree = MerkleNode::build(&entries).expect("non-empty input yields a tree");

        // The root spans the first through the last entry and is interior.
        assert_eq!(tree.domain_range.0, "a.com");
        assert_eq!(tree.domain_range.1, "c.com");
        assert!(tree.children.is_some());
    }

    #[test]
    fn test_merkle_diff_identical() {
        let entries = vec![entry("a.com", 1), entry("b.com", 2)];

        let tree_a = MerkleNode::build(&entries).unwrap();
        let tree_b = MerkleNode::build(&entries).unwrap();

        assert!(MerkleNode::diff(&tree_a, &tree_b).is_empty());
    }

    #[test]
    fn test_merkle_diff_detects_change() {
        let entries_a = vec![entry("a.com", 1), entry("b.com", 2)];
        // Only b.com carries a different hash.
        let entries_b = vec![entry("a.com", 1), entry("b.com", 9)];

        let tree_a = MerkleNode::build(&entries_a).unwrap();
        let tree_b = MerkleNode::build(&entries_b).unwrap();

        // Exactly the changed domain is reported, not merely "something".
        let diffs = MerkleNode::diff(&tree_a, &tree_b);
        assert_eq!(diffs, vec!["b.com".to_string()]);
    }

    #[test]
    fn test_hex_encode() {
        assert_eq!(hex_encode(&[0xDE, 0xAD]), "dead");
        assert_eq!(hex_encode(&[0x00, 0xFF]), "00ff");
        assert_eq!(hex_encode(&[]), "");
    }

    #[test]
    fn test_merkle_empty() {
        assert!(MerkleNode::build(&[]).is_none());
    }

    #[test]
    fn test_merkle_single() {
        let entries = vec![entry("x.com", 42)];

        let tree = MerkleNode::build(&entries).unwrap();

        // A single entry becomes a leaf that keeps the entry's own hash.
        assert_eq!(tree.domain_range.0, "x.com");
        assert_eq!(tree.domain_range.1, "x.com");
        assert_eq!(tree.hash, [42u8; 32]);
        assert!(tree.children.is_none());
    }
}