Skip to main content

cortex_runtime/collective/
registry.rs

1//! Local map registry — stores and serves map snapshots + deltas.
2//!
3//! Provides push/pull semantics for sharing maps between Cortex operations.
4
5use crate::collective::delta::{self, MapDelta};
6use crate::map::types::SiteMap;
7use anyhow::{Context, Result};
8use chrono::{DateTime, Utc};
9use serde::{Deserialize, Serialize};
10use std::collections::HashMap;
11use std::path::PathBuf;
12
13/// Local registry that stores map snapshots and deltas.
14pub struct LocalRegistry {
15    /// Storage directory (e.g., ~/.cortex/registry/).
16    storage_dir: PathBuf,
17    /// In-memory index: domain → entry.
18    index: HashMap<String, RegistryEntry>,
19}
20
21/// A registry entry for a single domain.
22#[derive(Debug, Clone, Serialize, Deserialize)]
23pub struct RegistryEntry {
24    /// Domain name.
25    pub domain: String,
26    /// Hash of the latest map version.
27    pub latest_hash: [u8; 32],
28    /// When the latest version was stored.
29    pub latest_timestamp: DateTime<Utc>,
30    /// Path to the full map snapshot on disk.
31    pub snapshot_path: PathBuf,
32    /// Ordered list of delta references.
33    pub deltas: Vec<DeltaRef>,
34    /// Instance IDs that contributed to this entry.
35    pub contributed_by: Vec<String>,
36}
37
38/// Reference to a stored delta.
39#[derive(Debug, Clone, Serialize, Deserialize)]
40pub struct DeltaRef {
41    /// When this delta was created.
42    pub timestamp: DateTime<Utc>,
43    /// Path to the delta file.
44    pub path: PathBuf,
45    /// Base hash this delta applies to.
46    pub base_hash: [u8; 32],
47}
48
49/// Registry-wide statistics.
50#[derive(Debug, Clone, Serialize, Deserialize)]
51pub struct RegistryStats {
52    /// Number of domains in the registry.
53    pub domain_count: usize,
54    /// Total size of all snapshots on disk (bytes).
55    pub total_snapshot_bytes: u64,
56    /// Total number of deltas across all domains.
57    pub total_deltas: usize,
58}
59
60impl LocalRegistry {
61    /// Create or open a local registry at the given directory.
62    pub fn new(storage_dir: PathBuf) -> Result<Self> {
63        std::fs::create_dir_all(&storage_dir)
64            .with_context(|| format!("creating registry dir: {}", storage_dir.display()))?;
65
66        let mut registry = Self {
67            storage_dir: storage_dir.clone(),
68            index: HashMap::new(),
69        };
70
71        // Load existing index if present
72        let index_path = storage_dir.join("index.json");
73        if index_path.exists() {
74            let data = std::fs::read_to_string(&index_path)?;
75            registry.index = serde_json::from_str(&data).unwrap_or_default();
76        }
77
78        Ok(registry)
79    }
80
81    /// Push a map (and optional delta) to the registry.
82    pub fn push(&mut self, domain: &str, map: &SiteMap, delta: Option<MapDelta>) -> Result<()> {
83        let domain_dir = self.storage_dir.join(domain.replace('.', "_"));
84        std::fs::create_dir_all(&domain_dir)?;
85
86        // Serialize and save snapshot
87        let snapshot_path = domain_dir.join("snapshot.ctx");
88        let data = map.serialize();
89        std::fs::write(&snapshot_path, &data)?;
90
91        let hash = delta::hash_map(map);
92
93        // Save delta if provided
94        let mut deltas = self
95            .index
96            .get(domain)
97            .map(|e| e.deltas.clone())
98            .unwrap_or_default();
99
100        if let Some(d) = delta {
101            let delta_filename = format!("delta_{}.json", d.timestamp.format("%Y%m%d_%H%M%S"));
102            let delta_path = domain_dir.join(&delta_filename);
103            let delta_bytes = delta::serialize_delta(&d);
104            std::fs::write(&delta_path, &delta_bytes)?;
105
106            deltas.push(DeltaRef {
107                timestamp: d.timestamp,
108                path: delta_path,
109                base_hash: d.base_hash,
110            });
111        }
112
113        let contributed_by = self
114            .index
115            .get(domain)
116            .map(|e| e.contributed_by.clone())
117            .unwrap_or_default();
118
119        self.index.insert(
120            domain.to_string(),
121            RegistryEntry {
122                domain: domain.to_string(),
123                latest_hash: hash,
124                latest_timestamp: Utc::now(),
125                snapshot_path,
126                deltas,
127                contributed_by,
128            },
129        );
130
131        self.save_index()?;
132        Ok(())
133    }
134
135    /// Pull the latest map for a domain.
136    pub fn pull(&self, domain: &str) -> Result<Option<(SiteMap, DateTime<Utc>)>> {
137        let entry = match self.index.get(domain) {
138            Some(e) => e,
139            None => return Ok(None),
140        };
141
142        if !entry.snapshot_path.exists() {
143            return Ok(None);
144        }
145
146        let data = std::fs::read(&entry.snapshot_path)?;
147        let map = SiteMap::deserialize(&data)?;
148
149        Ok(Some((map, entry.latest_timestamp)))
150    }
151
152    /// Pull only deltas since a given timestamp.
153    pub fn pull_since(&self, domain: &str, since: DateTime<Utc>) -> Result<Option<Vec<MapDelta>>> {
154        let entry = match self.index.get(domain) {
155            Some(e) => e,
156            None => return Ok(None),
157        };
158
159        let mut deltas = Vec::new();
160        for delta_ref in &entry.deltas {
161            if delta_ref.timestamp > since && delta_ref.path.exists() {
162                let bytes = std::fs::read(&delta_ref.path)?;
163                let d = delta::deserialize_delta(&bytes)?;
164                deltas.push(d);
165            }
166        }
167
168        Ok(Some(deltas))
169    }
170
171    /// List all entries in the registry.
172    pub fn list(&self) -> Vec<&RegistryEntry> {
173        self.index.values().collect()
174    }
175
176    /// Get registry statistics.
177    pub fn stats(&self) -> RegistryStats {
178        let total_snapshot_bytes: u64 = self
179            .index
180            .values()
181            .map(|e| {
182                std::fs::metadata(&e.snapshot_path)
183                    .map(|m| m.len())
184                    .unwrap_or(0)
185            })
186            .sum();
187
188        let total_deltas: usize = self.index.values().map(|e| e.deltas.len()).sum();
189
190        RegistryStats {
191            domain_count: self.index.len(),
192            total_snapshot_bytes,
193            total_deltas,
194        }
195    }
196
197    /// Garbage collect old deltas (keep only the last N per domain).
198    pub fn gc(&mut self, keep_count: usize) -> Result<usize> {
199        let mut removed = 0;
200        for entry in self.index.values_mut() {
201            while entry.deltas.len() > keep_count {
202                let old = entry.deltas.remove(0);
203                if old.path.exists() {
204                    let _ = std::fs::remove_file(&old.path);
205                    removed += 1;
206                }
207            }
208        }
209        self.save_index()?;
210        Ok(removed)
211    }
212
213    /// Save the index to disk.
214    fn save_index(&self) -> Result<()> {
215        let index_path = self.storage_dir.join("index.json");
216        let data = serde_json::to_string_pretty(&self.index)?;
217        std::fs::write(index_path, data)?;
218        Ok(())
219    }
220}
221
#[cfg(test)]
mod tests {
    use super::*;
    use crate::map::builder::SiteMapBuilder;
    use tempfile::TempDir;

    /// Build a map for `domain` containing a single home-page node.
    fn build_test_map(domain: &str) -> SiteMap {
        let mut builder = SiteMapBuilder::new(domain);
        let feats = [0.0f32; 128];
        builder.add_node(
            &format!("https://{domain}/"),
            crate::map::types::PageType::Home,
            feats,
            200,
        );
        builder.build()
    }

    /// Build a delta for `domain` that carries no changes, stamped `timestamp`.
    fn empty_delta(
        domain: &str,
        timestamp: chrono::DateTime<chrono::Utc>,
    ) -> crate::collective::delta::MapDelta {
        crate::collective::delta::MapDelta {
            domain: domain.to_string(),
            base_hash: [0u8; 32],
            timestamp,
            cortex_instance_id: "test".to_string(),
            nodes_added: vec![],
            nodes_removed: vec![],
            nodes_modified: vec![],
            edges_added: vec![],
            edges_removed: vec![],
            schema_delta: None,
        }
    }

    #[test]
    fn test_registry_push_pull() {
        let dir = TempDir::new().unwrap();
        let mut registry = LocalRegistry::new(dir.path().to_path_buf()).unwrap();

        let map = build_test_map("test.com");
        registry.push("test.com", &map, None).unwrap();

        let result = registry.pull("test.com").unwrap();
        assert!(result.is_some());

        let (pulled_map, _ts) = result.unwrap();
        assert_eq!(pulled_map.header.domain, "test.com");
    }

    #[test]
    fn test_registry_pull_nonexistent() {
        let dir = TempDir::new().unwrap();
        let registry = LocalRegistry::new(dir.path().to_path_buf()).unwrap();
        assert!(registry.pull("nope.com").unwrap().is_none());
    }

    #[test]
    fn test_registry_list() {
        let dir = TempDir::new().unwrap();
        let mut registry = LocalRegistry::new(dir.path().to_path_buf()).unwrap();

        registry
            .push("a.com", &build_test_map("a.com"), None)
            .unwrap();
        registry
            .push("b.com", &build_test_map("b.com"), None)
            .unwrap();

        assert_eq!(registry.list().len(), 2);
    }

    #[test]
    fn test_registry_stats() {
        let dir = TempDir::new().unwrap();
        let mut registry = LocalRegistry::new(dir.path().to_path_buf()).unwrap();

        registry
            .push("test.com", &build_test_map("test.com"), None)
            .unwrap();

        let stats = registry.stats();
        assert_eq!(stats.domain_count, 1);
        assert!(stats.total_snapshot_bytes > 0);
        assert_eq!(stats.total_deltas, 0);
    }

    #[test]
    fn test_registry_reopen_preserves_index() {
        let dir = TempDir::new().unwrap();
        {
            let mut registry = LocalRegistry::new(dir.path().to_path_buf()).unwrap();
            registry
                .push("test.com", &build_test_map("test.com"), None)
                .unwrap();
        }

        // A fresh instance over the same directory must see the pushed entry.
        let reopened = LocalRegistry::new(dir.path().to_path_buf()).unwrap();
        assert_eq!(reopened.list().len(), 1);
        assert!(reopened.pull("test.com").unwrap().is_some());
    }

    // ── v4 Test Suite: Phase 2B — Registry Push/Pull ──

    /// Build a map for `domain` with `count` product pages whose price
    /// feature (index 48) increases by 10 per page, starting at 50.
    fn build_product_map(domain: &str, count: usize) -> SiteMap {
        let mut builder = SiteMapBuilder::new(domain);
        for i in 0..count {
            let mut feats = [0.0f32; 128];
            feats[48] = 50.0 + i as f32 * 10.0; // price
            builder.add_node(
                &format!("https://{domain}/p/{i}"),
                crate::map::types::PageType::ProductDetail,
                feats,
                200,
            );
        }
        builder.build()
    }

    #[test]
    fn test_v4_registry_push_pull_round_trip() {
        let dir = TempDir::new().unwrap();
        let mut registry = LocalRegistry::new(dir.path().to_path_buf()).unwrap();

        let map = build_product_map("shop.com", 20);
        registry.push("shop.com", &map, None).unwrap();

        let (pulled, _ts) = registry.pull("shop.com").unwrap().unwrap();
        assert_eq!(
            pulled.nodes.len(),
            map.nodes.len(),
            "pulled map should have same node count"
        );
        assert_eq!(pulled.edges.len(), map.edges.len());
        assert_eq!(pulled.features.len(), map.features.len());

        // Verify feature data preserved
        assert_eq!(
            pulled.features[0][48], map.features[0][48],
            "price should be preserved"
        );
    }

    #[test]
    fn test_v4_registry_push_with_delta() {
        let dir = TempDir::new().unwrap();
        let mut registry = LocalRegistry::new(dir.path().to_path_buf()).unwrap();

        let map1 = build_product_map("shop.com", 5);
        registry.push("shop.com", &map1, None).unwrap();

        let map2 = build_product_map("shop.com", 7);
        let delta = crate::collective::delta::compute_delta(&map1, &map2, "test");
        registry.push("shop.com", &map2, Some(delta)).unwrap();

        // Should have updated map
        let (pulled, _ts) = registry.pull("shop.com").unwrap().unwrap();
        assert_eq!(pulled.nodes.len(), 7);

        // The delta must be recorded against the domain's entry.
        let entry = registry
            .list()
            .into_iter()
            .find(|e| e.domain == "shop.com")
            .expect("shop.com should be listed");
        assert_eq!(entry.deltas.len(), 1);
    }

    #[test]
    fn test_v4_registry_multiple_domains() {
        let dir = TempDir::new().unwrap();
        let mut registry = LocalRegistry::new(dir.path().to_path_buf()).unwrap();

        for domain in &["a.com", "b.com", "c.com", "d.com", "e.com"] {
            registry
                .push(domain, &build_test_map(domain), None)
                .unwrap();
        }

        assert_eq!(registry.list().len(), 5);

        let stats = registry.stats();
        assert_eq!(stats.domain_count, 5);
    }

    #[test]
    fn test_v4_registry_gc() {
        let dir = TempDir::new().unwrap();
        let mut registry = LocalRegistry::new(dir.path().to_path_buf()).unwrap();

        let map = build_test_map("old.com");
        let now = chrono::Utc::now();
        // Timestamps ten seconds apart: delta file names are derived from the
        // timestamp, so each delta is guaranteed its own file.
        for age_secs in [30, 20, 10] {
            let ts = now - chrono::Duration::seconds(age_secs);
            registry
                .push("old.com", &map, Some(empty_delta("old.com", ts)))
                .unwrap();
        }
        registry
            .push("new.com", &build_test_map("new.com"), None)
            .unwrap();
        assert_eq!(registry.stats().total_deltas, 3);

        // Keep only 1 delta per domain: the two oldest of old.com go away.
        let cleaned = registry.gc(1).unwrap();
        assert_eq!(cleaned, 2);
        assert_eq!(registry.stats().total_deltas, 1);

        // The surviving delta is still readable.
        let since = now - chrono::Duration::hours(1);
        let remaining = registry.pull_since("old.com", since).unwrap().unwrap();
        assert_eq!(remaining.len(), 1);

        // Snapshots are untouched by GC.
        assert!(registry.pull("old.com").unwrap().is_some());
        assert!(registry.pull("new.com").unwrap().is_some());
    }

    #[test]
    fn test_v4_registry_pull_since() {
        let dir = TempDir::new().unwrap();
        let mut registry = LocalRegistry::new(dir.path().to_path_buf()).unwrap();

        let map = build_test_map("test.com");
        let delta = empty_delta("test.com", chrono::Utc::now());

        registry.push("test.com", &map, Some(delta)).unwrap();

        // A cutoff before the delta's timestamp returns it.
        let since = chrono::Utc::now() - chrono::Duration::hours(1);
        let deltas = registry.pull_since("test.com", since).unwrap();
        assert_eq!(deltas.map(|d| d.len()), Some(1));

        // A cutoff after the delta's timestamp returns an empty list.
        let future = chrono::Utc::now() + chrono::Duration::hours(1);
        let none_newer = registry.pull_since("test.com", future).unwrap();
        assert_eq!(none_newer.map(|d| d.len()), Some(0));

        // An unknown domain is reported as absent, not as an empty list.
        assert!(registry.pull_since("nope.com", since).unwrap().is_none());
    }
}