Skip to main content

cortex_runtime/temporal/
store.rs

1//! Temporal store — time-series access to registry delta history.
2
3use crate::collective::delta::{self, MapDelta};
4use crate::collective::registry::LocalRegistry;
5use anyhow::Result;
6use chrono::{DateTime, Utc};
7use serde::{Deserialize, Serialize};
8use std::collections::HashMap;
9use std::sync::Arc;
10
11/// Time-series data: domain/url key → list of (timestamp, value) points.
12type TimeSeries = HashMap<String, Vec<(DateTime<Utc>, f32)>>;
13
14/// Time-series store backed by the registry's delta history.
15pub struct TemporalStore {
16    registry: Arc<LocalRegistry>,
17}
18
19/// A single change to a node over time.
20#[derive(Debug, Clone, Serialize, Deserialize)]
21pub struct NodeDiff {
22    /// When this change occurred.
23    pub timestamp: DateTime<Utc>,
24    /// Features that changed: (dimension, old_value, new_value).
25    pub changed_features: Vec<(u8, f32, f32)>,
26    /// Which Cortex instance contributed this change.
27    pub contributed_by: String,
28}
29
30impl TemporalStore {
31    /// Create a temporal store backed by the given registry.
32    pub fn new(registry: Arc<LocalRegistry>) -> Self {
33        Self { registry }
34    }
35
36    /// Get the history of a specific feature dimension for a node.
37    ///
38    /// Returns (timestamp, value) pairs ordered by time.
39    pub fn history(
40        &self,
41        domain: &str,
42        _node_url: &str,
43        feature_dim: u8,
44        since: DateTime<Utc>,
45    ) -> Result<Vec<(DateTime<Utc>, f32)>> {
46        let deltas = self.registry.pull_since(domain, since)?.unwrap_or_default();
47
48        let mut points: Vec<(DateTime<Utc>, f32)> = Vec::new();
49
50        for delta_data in &deltas {
51            for (_idx, feature_delta) in &delta_data.nodes_modified {
52                for &(dim, value) in &feature_delta.changed_dims {
53                    if dim == feature_dim {
54                        points.push((delta_data.timestamp, value));
55                    }
56                }
57            }
58        }
59
60        points.sort_by_key(|(ts, _)| *ts);
61        Ok(points)
62    }
63
64    /// Get all changes to a specific node over time.
65    pub fn diff(
66        &self,
67        domain: &str,
68        _node_url: &str,
69        since: DateTime<Utc>,
70    ) -> Result<Vec<NodeDiff>> {
71        let deltas = self.registry.pull_since(domain, since)?.unwrap_or_default();
72
73        let mut diffs: Vec<NodeDiff> = Vec::new();
74
75        for delta_data in &deltas {
76            for (_idx, feature_delta) in &delta_data.nodes_modified {
77                if !feature_delta.changed_dims.is_empty() {
78                    diffs.push(NodeDiff {
79                        timestamp: delta_data.timestamp,
80                        changed_features: feature_delta
81                            .changed_dims
82                            .iter()
83                            .map(|&(dim, new)| (dim, 0.0, new)) // old value not stored in delta
84                            .collect(),
85                        contributed_by: delta_data.cortex_instance_id.clone(),
86                    });
87                }
88            }
89        }
90
91        Ok(diffs)
92    }
93
94    /// Compare a feature dimension across multiple domains over time.
95    pub fn history_compare(
96        &self,
97        node_urls: &[(String, String)], // (domain, url) pairs
98        feature_dim: u8,
99        since: DateTime<Utc>,
100    ) -> Result<TimeSeries> {
101        let mut result: TimeSeries = HashMap::new();
102
103        for (domain, url) in node_urls {
104            let points = self.history(domain, url, feature_dim, since)?;
105            let key = format!("{domain}:{url}");
106            result.insert(key, points);
107        }
108
109        Ok(result)
110    }
111}
112
#[cfg(test)]
mod tests {
    use super::*;
    use crate::collective::registry::LocalRegistry;
    use tempfile::TempDir;

    /// Open a registry rooted at `dir` and wrap it in a `TemporalStore`.
    fn store_in(dir: &TempDir) -> TemporalStore {
        let registry = Arc::new(LocalRegistry::new(dir.path().to_path_buf()).unwrap());
        TemporalStore::new(registry)
    }

    #[test]
    fn test_temporal_store_empty_history() {
        let dir = TempDir::new().unwrap();
        let store = store_in(&dir);

        let since = Utc::now() - chrono::Duration::days(30);
        let history = store.history("test.com", "/page", 48, since).unwrap();

        assert!(history.is_empty());
    }

    // ── v4 Test Suite: Phase 3A — History Queries ──

    #[test]
    fn test_v4_temporal_store_with_delta_history() {
        use crate::collective::delta::compute_delta;
        use crate::map::builder::SiteMapBuilder;
        use crate::map::types::*;

        const PRODUCT_URL: &str = "https://shop.com/product/1";

        // Build a one-product map for shop.com with the given price feature.
        let build_map = |price: f32| {
            let mut builder = SiteMapBuilder::new("shop.com");
            let mut feats = [0.0f32; FEATURE_DIM];
            feats[FEAT_PRICE] = price;
            builder.add_node(PRODUCT_URL, PageType::ProductDetail, feats, 200);
            builder.build()
        };

        let dir = TempDir::new().unwrap();

        // Use a separate mutable registry for pushes.
        let mut push_registry = LocalRegistry::new(dir.path().to_path_buf()).unwrap();

        // Push the initial map, then a second map with a price change.
        let map1 = build_map(100.0);
        push_registry.push("shop.com", &map1, None).unwrap();

        let map2 = build_map(80.0);
        let delta = compute_delta(&map1, &map2, "test");
        push_registry.push("shop.com", &map2, Some(delta)).unwrap();

        // Re-create the registry and store from the same directory.
        let store = store_in(&dir);
        let since = Utc::now() - chrono::Duration::days(30);

        // History may or may not have data depending on delta storage format,
        // but the query must succeed and whatever it returns must be ordered
        // by time.
        let history = store
            .history("shop.com", PRODUCT_URL, FEAT_PRICE as u8, since)
            .unwrap();
        assert!(history.windows(2).all(|pair| pair[0].0 <= pair[1].0));

        // `diff` reads the same deltas; it must succeed too and never emit an
        // entry without changed features.
        let diffs = store.diff("shop.com", PRODUCT_URL, since).unwrap();
        assert!(diffs.iter().all(|d| !d.changed_features.is_empty()));
    }

    #[test]
    fn test_v4_temporal_store_history_compare() {
        let dir = TempDir::new().unwrap();
        let store = store_in(&dir);

        let pairs = vec![
            ("a.com".to_string(), "/p1".to_string()),
            ("b.com".to_string(), "/p2".to_string()),
        ];

        let result = store
            .history_compare(&pairs, 48, Utc::now() - chrono::Duration::days(30))
            .unwrap();

        // Should return a map with entries for both pairs (possibly empty).
        assert_eq!(result.len(), 2);
        assert!(result.contains_key("a.com:/p1"));
        assert!(result.contains_key("b.com:/p2"));
    }
}