cortex_runtime/live/
watch.rs1use crate::acquisition::http_client::HttpClient;
8use crate::acquisition::structured;
9use crate::cartography::feature_encoder;
10use crate::live::refresh;
11use crate::map::types::{SiteMap, FEATURE_DIM};
12use serde::{Deserialize, Serialize};
13use std::time::SystemTime;
14
/// A change report for a single node.
///
/// `compute_delta` produces this when at least one inspected feature
/// dimension moved by more than the comparison threshold.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WatchDelta {
    /// Node identifier, copied from the `node` argument of `compute_delta`.
    pub node: u32,
    /// Changed dimensions as `(dimension index, old value, new value)`, in the
    /// order the dimensions were inspected.
    pub changed_features: Vec<(usize, f32, f32)>,
    /// Wall-clock time the delta was computed, in seconds since the Unix
    /// epoch. `compute_delta` stores 0.0 if the system clock reads earlier
    /// than the epoch.
    pub timestamp: f64,
}
25
/// Parameters describing what to watch on a site map.
#[derive(Debug, Clone)]
pub struct WatchRequest {
    /// Domain the watch applies to.
    /// NOTE(review): not read by `select_watch_nodes`; presumably the caller
    /// uses it to locate the map — confirm.
    pub domain: String,
    /// Explicit node ids to watch. `select_watch_nodes` forwards this as
    /// `nodes` to the refresh selector.
    pub nodes: Option<Vec<u32>>,
    /// Cluster filter. `select_watch_nodes` forwards this as `cluster` to the
    /// refresh selector.
    pub cluster: Option<u32>,
    /// Feature dimensions to compare. Presumably passed to `compute_delta` as
    /// `watch_features` (`None` = all dimensions) — TODO confirm at call site.
    pub features: Option<Vec<usize>>,
    /// Polling interval, presumably in milliseconds given the name — TODO
    /// confirm against the watch loop.
    pub interval_ms: u64,
}
40
41pub fn compute_delta(
43 node: u32,
44 old_features: &[f32; 128],
45 new_features: &[f32; 128],
46 watch_features: Option<&[usize]>,
47 threshold: f32,
48) -> Option<WatchDelta> {
49 let mut changed = Vec::new();
50
51 let dimensions: Box<dyn Iterator<Item = usize>> = match watch_features {
52 Some(dims) => Box::new(dims.iter().copied()),
53 None => Box::new(0..128),
54 };
55
56 for dim in dimensions {
57 let old = old_features[dim];
58 let new = new_features[dim];
59 if (old - new).abs() > threshold {
60 changed.push((dim, old, new));
61 }
62 }
63
64 if changed.is_empty() {
65 return None;
66 }
67
68 let timestamp = SystemTime::now()
69 .duration_since(SystemTime::UNIX_EPOCH)
70 .map(|d| d.as_secs_f64())
71 .unwrap_or(0.0);
72
73 Some(WatchDelta {
74 node,
75 changed_features: changed,
76 timestamp,
77 })
78}
79
80pub fn select_watch_nodes(map: &SiteMap, request: &WatchRequest) -> Vec<u32> {
82 let refresh_req = refresh::RefreshRequest {
83 nodes: request.nodes.clone(),
84 cluster: request.cluster,
85 stale_threshold: None,
86 };
87 refresh::select_nodes_to_refresh(map, &refresh_req)
88}
89
90pub async fn fetch_node_features_http(
95 url: &str,
96 client: &HttpClient,
97) -> Option<[f32; FEATURE_DIM]> {
98 let resp = client.get(url, 10000).await.ok()?;
99 if resp.status != 200 {
100 return None;
101 }
102
103 let sd = structured::extract_structured_data(&resp.body, &resp.final_url);
104
105 if sd.has_jsonld || sd.has_opengraph {
107 let head = crate::acquisition::http_client::HeadResponse {
108 url: resp.url,
109 status: resp.status,
110 content_type: resp
111 .headers
112 .iter()
113 .find(|(k, _)| k == "content-type")
114 .map(|(_, v)| v.clone()),
115 content_language: resp
116 .headers
117 .iter()
118 .find(|(k, _)| k == "content-language")
119 .map(|(_, v)| v.clone()),
120 last_modified: resp
121 .headers
122 .iter()
123 .find(|(k, _)| k == "last-modified")
124 .map(|(_, v)| v.clone()),
125 cache_control: resp
126 .headers
127 .iter()
128 .find(|(k, _)| k == "cache-control")
129 .map(|(_, v)| v.clone()),
130 };
131
132 Some(feature_encoder::encode_features_from_structured_data(
133 &sd, url, &head,
134 ))
135 } else {
136 None }
138}