cortex_runtime/collective/
registry.rs1use crate::collective::delta::{self, MapDelta};
6use crate::map::types::SiteMap;
7use anyhow::{Context, Result};
8use chrono::{DateTime, Utc};
9use serde::{Deserialize, Serialize};
10use std::collections::HashMap;
11use std::path::PathBuf;
12
13pub struct LocalRegistry {
15 storage_dir: PathBuf,
17 index: HashMap<String, RegistryEntry>,
19}
20
21#[derive(Debug, Clone, Serialize, Deserialize)]
23pub struct RegistryEntry {
24 pub domain: String,
26 pub latest_hash: [u8; 32],
28 pub latest_timestamp: DateTime<Utc>,
30 pub snapshot_path: PathBuf,
32 pub deltas: Vec<DeltaRef>,
34 pub contributed_by: Vec<String>,
36}
37
38#[derive(Debug, Clone, Serialize, Deserialize)]
40pub struct DeltaRef {
41 pub timestamp: DateTime<Utc>,
43 pub path: PathBuf,
45 pub base_hash: [u8; 32],
47}
48
49#[derive(Debug, Clone, Serialize, Deserialize)]
51pub struct RegistryStats {
52 pub domain_count: usize,
54 pub total_snapshot_bytes: u64,
56 pub total_deltas: usize,
58}
59
60impl LocalRegistry {
61 pub fn new(storage_dir: PathBuf) -> Result<Self> {
63 std::fs::create_dir_all(&storage_dir)
64 .with_context(|| format!("creating registry dir: {}", storage_dir.display()))?;
65
66 let mut registry = Self {
67 storage_dir: storage_dir.clone(),
68 index: HashMap::new(),
69 };
70
71 let index_path = storage_dir.join("index.json");
73 if index_path.exists() {
74 let data = std::fs::read_to_string(&index_path)?;
75 registry.index = serde_json::from_str(&data).unwrap_or_default();
76 }
77
78 Ok(registry)
79 }
80
81 pub fn push(&mut self, domain: &str, map: &SiteMap, delta: Option<MapDelta>) -> Result<()> {
83 let domain_dir = self.storage_dir.join(domain.replace('.', "_"));
84 std::fs::create_dir_all(&domain_dir)?;
85
86 let snapshot_path = domain_dir.join("snapshot.ctx");
88 let data = map.serialize();
89 std::fs::write(&snapshot_path, &data)?;
90
91 let hash = delta::hash_map(map);
92
93 let mut deltas = self
95 .index
96 .get(domain)
97 .map(|e| e.deltas.clone())
98 .unwrap_or_default();
99
100 if let Some(d) = delta {
101 let delta_filename = format!("delta_{}.json", d.timestamp.format("%Y%m%d_%H%M%S"));
102 let delta_path = domain_dir.join(&delta_filename);
103 let delta_bytes = delta::serialize_delta(&d);
104 std::fs::write(&delta_path, &delta_bytes)?;
105
106 deltas.push(DeltaRef {
107 timestamp: d.timestamp,
108 path: delta_path,
109 base_hash: d.base_hash,
110 });
111 }
112
113 let contributed_by = self
114 .index
115 .get(domain)
116 .map(|e| e.contributed_by.clone())
117 .unwrap_or_default();
118
119 self.index.insert(
120 domain.to_string(),
121 RegistryEntry {
122 domain: domain.to_string(),
123 latest_hash: hash,
124 latest_timestamp: Utc::now(),
125 snapshot_path,
126 deltas,
127 contributed_by,
128 },
129 );
130
131 self.save_index()?;
132 Ok(())
133 }
134
135 pub fn pull(&self, domain: &str) -> Result<Option<(SiteMap, DateTime<Utc>)>> {
137 let entry = match self.index.get(domain) {
138 Some(e) => e,
139 None => return Ok(None),
140 };
141
142 if !entry.snapshot_path.exists() {
143 return Ok(None);
144 }
145
146 let data = std::fs::read(&entry.snapshot_path)?;
147 let map = SiteMap::deserialize(&data)?;
148
149 Ok(Some((map, entry.latest_timestamp)))
150 }
151
152 pub fn pull_since(&self, domain: &str, since: DateTime<Utc>) -> Result<Option<Vec<MapDelta>>> {
154 let entry = match self.index.get(domain) {
155 Some(e) => e,
156 None => return Ok(None),
157 };
158
159 let mut deltas = Vec::new();
160 for delta_ref in &entry.deltas {
161 if delta_ref.timestamp > since && delta_ref.path.exists() {
162 let bytes = std::fs::read(&delta_ref.path)?;
163 let d = delta::deserialize_delta(&bytes)?;
164 deltas.push(d);
165 }
166 }
167
168 Ok(Some(deltas))
169 }
170
171 pub fn list(&self) -> Vec<&RegistryEntry> {
173 self.index.values().collect()
174 }
175
176 pub fn stats(&self) -> RegistryStats {
178 let total_snapshot_bytes: u64 = self
179 .index
180 .values()
181 .map(|e| {
182 std::fs::metadata(&e.snapshot_path)
183 .map(|m| m.len())
184 .unwrap_or(0)
185 })
186 .sum();
187
188 let total_deltas: usize = self.index.values().map(|e| e.deltas.len()).sum();
189
190 RegistryStats {
191 domain_count: self.index.len(),
192 total_snapshot_bytes,
193 total_deltas,
194 }
195 }
196
197 pub fn gc(&mut self, keep_count: usize) -> Result<usize> {
199 let mut removed = 0;
200 for entry in self.index.values_mut() {
201 while entry.deltas.len() > keep_count {
202 let old = entry.deltas.remove(0);
203 if old.path.exists() {
204 let _ = std::fs::remove_file(&old.path);
205 removed += 1;
206 }
207 }
208 }
209 self.save_index()?;
210 Ok(removed)
211 }
212
213 fn save_index(&self) -> Result<()> {
215 let index_path = self.storage_dir.join("index.json");
216 let data = serde_json::to_string_pretty(&self.index)?;
217 std::fs::write(index_path, data)?;
218 Ok(())
219 }
220}
221
#[cfg(test)]
mod tests {
    use super::*;
    use crate::collective::delta::{compute_delta, MapDelta};
    use crate::map::builder::SiteMapBuilder;
    use crate::map::types::PageType;
    use chrono::{DateTime, Duration, Utc};
    use tempfile::TempDir;

    /// Creates a registry in a fresh temporary directory.
    ///
    /// The `TempDir` is returned so the caller keeps the directory alive for the
    /// duration of the test.
    fn temp_registry() -> (TempDir, LocalRegistry) {
        let dir = TempDir::new().unwrap();
        let registry = LocalRegistry::new(dir.path().to_path_buf()).unwrap();
        (dir, registry)
    }

    /// Builds a map for `domain` containing a single home-page node.
    fn build_test_map(domain: &str) -> SiteMap {
        let mut builder = SiteMapBuilder::new(domain);
        let feats = [0.0f32; 128];
        builder.add_node(&format!("https://{domain}/"), PageType::Home, feats, 200);
        builder.build()
    }

    /// Builds a map for `domain` with `count` product pages whose feature 48
    /// (checked as the price in the round-trip test) grows with the index.
    fn build_product_map(domain: &str, count: usize) -> SiteMap {
        let mut builder = SiteMapBuilder::new(domain);
        for i in 0..count {
            let mut feats = [0.0f32; 128];
            feats[48] = 50.0 + i as f32 * 10.0;
            builder.add_node(
                &format!("https://{domain}/p/{i}"),
                PageType::ProductDetail,
                feats,
                200,
            );
        }
        builder.build()
    }

    /// Builds a delta for `domain` that carries no changes, stamped with `timestamp`.
    fn empty_delta(domain: &str, timestamp: DateTime<Utc>) -> MapDelta {
        MapDelta {
            domain: domain.to_string(),
            base_hash: [0u8; 32],
            timestamp,
            cortex_instance_id: "test".to_string(),
            nodes_added: vec![],
            nodes_removed: vec![],
            nodes_modified: vec![],
            edges_added: vec![],
            edges_removed: vec![],
            schema_delta: None,
        }
    }

    #[test]
    fn test_registry_push_pull() {
        let (_dir, mut registry) = temp_registry();

        let map = build_test_map("test.com");
        registry.push("test.com", &map, None).unwrap();

        let result = registry.pull("test.com").unwrap();
        assert!(result.is_some());

        let (pulled_map, _ts) = result.unwrap();
        assert_eq!(pulled_map.header.domain, "test.com");
    }

    #[test]
    fn test_registry_pull_nonexistent() {
        let (_dir, registry) = temp_registry();
        assert!(registry.pull("nope.com").unwrap().is_none());
    }

    #[test]
    fn test_registry_list() {
        let (_dir, mut registry) = temp_registry();

        registry
            .push("a.com", &build_test_map("a.com"), None)
            .unwrap();
        registry
            .push("b.com", &build_test_map("b.com"), None)
            .unwrap();

        assert_eq!(registry.list().len(), 2);
    }

    #[test]
    fn test_registry_stats() {
        let (_dir, mut registry) = temp_registry();

        registry
            .push("test.com", &build_test_map("test.com"), None)
            .unwrap();

        let stats = registry.stats();
        assert_eq!(stats.domain_count, 1);
        assert!(stats.total_snapshot_bytes > 0);
    }

    #[test]
    fn test_v4_registry_push_pull_round_trip() {
        let (_dir, mut registry) = temp_registry();

        let map = build_product_map("shop.com", 20);
        registry.push("shop.com", &map, None).unwrap();

        let (pulled, _ts) = registry.pull("shop.com").unwrap().unwrap();
        assert_eq!(
            pulled.nodes.len(),
            map.nodes.len(),
            "pulled map should have same node count"
        );
        assert_eq!(pulled.edges.len(), map.edges.len());
        assert_eq!(pulled.features.len(), map.features.len());

        assert_eq!(
            pulled.features[0][48], map.features[0][48],
            "price should be preserved"
        );
    }

    #[test]
    fn test_v4_registry_push_with_delta() {
        let (_dir, mut registry) = temp_registry();

        let map1 = build_product_map("shop.com", 5);
        registry.push("shop.com", &map1, None).unwrap();

        let map2 = build_product_map("shop.com", 7);
        let delta = compute_delta(&map1, &map2, "test");
        registry.push("shop.com", &map2, Some(delta)).unwrap();

        // The snapshot is replaced by the second push.
        let (pulled, _ts) = registry.pull("shop.com").unwrap().unwrap();
        assert_eq!(pulled.nodes.len(), 7);
    }

    #[test]
    fn test_v4_registry_multiple_domains() {
        let (_dir, mut registry) = temp_registry();

        for domain in &["a.com", "b.com", "c.com", "d.com", "e.com"] {
            registry
                .push(domain, &build_test_map(domain), None)
                .unwrap();
        }

        assert_eq!(registry.list().len(), 5);

        let stats = registry.stats();
        assert_eq!(stats.domain_count, 5);
    }

    #[test]
    fn test_v4_registry_gc() {
        let (_dir, mut registry) = temp_registry();

        registry
            .push("old.com", &build_test_map("old.com"), None)
            .unwrap();
        registry
            .push("new.com", &build_test_map("new.com"), None)
            .unwrap();

        // No deltas were pushed, so there is nothing to collect and both
        // domains must survive.
        let cleaned = registry.gc(1).unwrap();
        assert_eq!(cleaned, 0);
        assert_eq!(registry.list().len(), 2);
    }

    #[test]
    fn test_v4_registry_gc_removes_oldest_deltas() {
        let (_dir, mut registry) = temp_registry();

        let map = build_test_map("test.com");
        // Timestamps an hour apart, so the two delta files get distinct names.
        let older = Utc::now() - Duration::hours(2);
        let newer = Utc::now() - Duration::hours(1);

        registry
            .push("test.com", &map, Some(empty_delta("test.com", older)))
            .unwrap();
        registry
            .push("test.com", &map, Some(empty_delta("test.com", newer)))
            .unwrap();
        let stale_path = registry.list()[0].deltas[0].path.clone();

        assert_eq!(registry.gc(1).unwrap(), 1);

        let entries = registry.list();
        let entry = entries[0];
        assert_eq!(entry.deltas.len(), 1);
        assert_eq!(entry.deltas[0].timestamp, newer);
        assert!(entry.deltas[0].path.exists());
        assert!(!stale_path.exists());
        assert_eq!(registry.stats().total_deltas, 1);
    }

    #[test]
    fn test_v4_registry_pull_since() {
        let (_dir, mut registry) = temp_registry();

        let map = build_test_map("test.com");
        let delta = empty_delta("test.com", Utc::now());

        registry.push("test.com", &map, Some(delta)).unwrap();

        let since = Utc::now() - Duration::hours(1);
        let deltas = registry.pull_since("test.com", since).unwrap();
        assert!(deltas.is_some());
        assert_eq!(deltas.unwrap().len(), 1);
    }
}