cortex_runtime/temporal/
store.rs1use crate::collective::delta::{self, MapDelta};
4use crate::collective::registry::LocalRegistry;
5use anyhow::Result;
6use chrono::{DateTime, Utc};
7use serde::{Deserialize, Serialize};
8use std::collections::HashMap;
9use std::sync::Arc;
10
11type TimeSeries = HashMap<String, Vec<(DateTime<Utc>, f32)>>;
13
14pub struct TemporalStore {
16 registry: Arc<LocalRegistry>,
17}
18
19#[derive(Debug, Clone, Serialize, Deserialize)]
21pub struct NodeDiff {
22 pub timestamp: DateTime<Utc>,
24 pub changed_features: Vec<(u8, f32, f32)>,
26 pub contributed_by: String,
28}
29
30impl TemporalStore {
31 pub fn new(registry: Arc<LocalRegistry>) -> Self {
33 Self { registry }
34 }
35
36 pub fn history(
40 &self,
41 domain: &str,
42 _node_url: &str,
43 feature_dim: u8,
44 since: DateTime<Utc>,
45 ) -> Result<Vec<(DateTime<Utc>, f32)>> {
46 let deltas = self.registry.pull_since(domain, since)?.unwrap_or_default();
47
48 let mut points: Vec<(DateTime<Utc>, f32)> = Vec::new();
49
50 for delta_data in &deltas {
51 for (_idx, feature_delta) in &delta_data.nodes_modified {
52 for &(dim, value) in &feature_delta.changed_dims {
53 if dim == feature_dim {
54 points.push((delta_data.timestamp, value));
55 }
56 }
57 }
58 }
59
60 points.sort_by_key(|(ts, _)| *ts);
61 Ok(points)
62 }
63
64 pub fn diff(
66 &self,
67 domain: &str,
68 _node_url: &str,
69 since: DateTime<Utc>,
70 ) -> Result<Vec<NodeDiff>> {
71 let deltas = self.registry.pull_since(domain, since)?.unwrap_or_default();
72
73 let mut diffs: Vec<NodeDiff> = Vec::new();
74
75 for delta_data in &deltas {
76 for (_idx, feature_delta) in &delta_data.nodes_modified {
77 if !feature_delta.changed_dims.is_empty() {
78 diffs.push(NodeDiff {
79 timestamp: delta_data.timestamp,
80 changed_features: feature_delta
81 .changed_dims
82 .iter()
83 .map(|&(dim, new)| (dim, 0.0, new)) .collect(),
85 contributed_by: delta_data.cortex_instance_id.clone(),
86 });
87 }
88 }
89 }
90
91 Ok(diffs)
92 }
93
94 pub fn history_compare(
96 &self,
97 node_urls: &[(String, String)], feature_dim: u8,
99 since: DateTime<Utc>,
100 ) -> Result<TimeSeries> {
101 let mut result: TimeSeries = HashMap::new();
102
103 for (domain, url) in node_urls {
104 let points = self.history(domain, url, feature_dim, since)?;
105 let key = format!("{domain}:{url}");
106 result.insert(key, points);
107 }
108
109 Ok(result)
110 }
111}
112
113#[cfg(test)]
114mod tests {
115 use super::*;
116 use crate::collective::registry::LocalRegistry;
117 use tempfile::TempDir;
118
119 #[test]
120 fn test_temporal_store_empty_history() {
121 let dir = TempDir::new().unwrap();
122 let registry = Arc::new(LocalRegistry::new(dir.path().to_path_buf()).unwrap());
123 let store = TemporalStore::new(registry);
124
125 let history = store
126 .history(
127 "test.com",
128 "/page",
129 48,
130 Utc::now() - chrono::Duration::days(30),
131 )
132 .unwrap();
133 assert!(history.is_empty());
134 }
135
136 #[test]
139 fn test_v4_temporal_store_with_delta_history() {
140 use crate::collective::delta::compute_delta;
141 use crate::map::builder::SiteMapBuilder;
142 use crate::map::types::*;
143
144 let dir = TempDir::new().unwrap();
145
146 let mut builder1 = SiteMapBuilder::new("shop.com");
148 let mut feats1 = [0.0f32; FEATURE_DIM];
149 feats1[FEAT_PRICE] = 100.0;
150 builder1.add_node(
151 "https://shop.com/product/1",
152 PageType::ProductDetail,
153 feats1,
154 200,
155 );
156 let map1 = builder1.build();
157
158 let mut push_registry = LocalRegistry::new(dir.path().to_path_buf()).unwrap();
160 push_registry.push("shop.com", &map1, None).unwrap();
161
162 let mut builder2 = SiteMapBuilder::new("shop.com");
164 let mut feats2 = [0.0f32; FEATURE_DIM];
165 feats2[FEAT_PRICE] = 80.0;
166 builder2.add_node(
167 "https://shop.com/product/1",
168 PageType::ProductDetail,
169 feats2,
170 200,
171 );
172 let map2 = builder2.build();
173
174 let delta = compute_delta(&map1, &map2, "test");
175 push_registry.push("shop.com", &map2, Some(delta)).unwrap();
176
177 let registry2 = Arc::new(LocalRegistry::new(dir.path().to_path_buf()).unwrap());
179 let store = TemporalStore::new(registry2);
180
181 let _history = store
183 .history(
184 "shop.com",
185 "https://shop.com/product/1",
186 FEAT_PRICE as u8,
187 Utc::now() - chrono::Duration::days(30),
188 )
189 .unwrap();
190
191 }
194
195 #[test]
196 fn test_v4_temporal_store_history_compare() {
197 let dir = TempDir::new().unwrap();
198 let registry = Arc::new(LocalRegistry::new(dir.path().to_path_buf()).unwrap());
199 let store = TemporalStore::new(registry);
200
201 let pairs = vec![
202 ("a.com".to_string(), "/p1".to_string()),
203 ("b.com".to_string(), "/p2".to_string()),
204 ];
205
206 let result = store
207 .history_compare(&pairs, 48, Utc::now() - chrono::Duration::days(30))
208 .unwrap();
209
210 assert_eq!(result.len(), 2);
212 assert!(result.contains_key("a.com:/p1"));
213 assert!(result.contains_key("b.com:/p2"));
214 }
215}