hyperstack_server/
projector.rs1use crate::bus::{BusManager, BusMessage};
2use crate::cache::EntityCache;
3use crate::view::{ViewIndex, ViewSpec};
4use crate::websocket::frame::{Frame, Mode};
5use bytes::Bytes;
6use smallvec::SmallVec;
7use std::sync::Arc;
8#[cfg(feature = "otel")]
9use std::time::Instant;
10use tokio::sync::mpsc;
11use tracing::{debug, error};
12
13#[cfg(feature = "otel")]
14use crate::metrics::Metrics;
15
16pub struct Projector {
17 view_index: Arc<ViewIndex>,
18 bus_manager: BusManager,
19 entity_cache: EntityCache,
20 mutations_rx: mpsc::Receiver<SmallVec<[hyperstack_interpreter::Mutation; 6]>>,
21 #[cfg(feature = "otel")]
22 metrics: Option<Arc<Metrics>>,
23}
24
25impl Projector {
26 #[cfg(feature = "otel")]
27 pub fn new(
28 view_index: Arc<ViewIndex>,
29 bus_manager: BusManager,
30 entity_cache: EntityCache,
31 mutations_rx: mpsc::Receiver<SmallVec<[hyperstack_interpreter::Mutation; 6]>>,
32 metrics: Option<Arc<Metrics>>,
33 ) -> Self {
34 Self {
35 view_index,
36 bus_manager,
37 entity_cache,
38 mutations_rx,
39 metrics,
40 }
41 }
42
43 #[cfg(not(feature = "otel"))]
44 pub fn new(
45 view_index: Arc<ViewIndex>,
46 bus_manager: BusManager,
47 entity_cache: EntityCache,
48 mutations_rx: mpsc::Receiver<SmallVec<[hyperstack_interpreter::Mutation; 6]>>,
49 ) -> Self {
50 Self {
51 view_index,
52 bus_manager,
53 entity_cache,
54 mutations_rx,
55 }
56 }
57
58 pub async fn run(mut self) {
59 debug!("Projector started");
60
61 while let Some(mutations) = self.mutations_rx.recv().await {
62 for mutation in mutations.iter() {
63 #[cfg(feature = "otel")]
64 let start = Instant::now();
65
66 if let Err(e) = self.process_mutation(mutation).await {
67 error!("Failed to process mutation: {}", e);
68 }
69
70 #[cfg(feature = "otel")]
71 if let Some(ref metrics) = self.metrics {
72 let latency_ms = start.elapsed().as_secs_f64() * 1000.0;
73 metrics.record_projector_latency(latency_ms);
74 metrics.record_mutation_processed(&mutation.export);
75 }
76 }
77 }
78
79 debug!("Projector stopped");
80 }
81
82 async fn process_mutation(
83 &self,
84 mutation: &hyperstack_interpreter::Mutation,
85 ) -> anyhow::Result<()> {
86 let specs = self.view_index.by_export(&mutation.export);
87
88 for spec in specs {
89 if !self.should_process(spec, mutation) {
90 continue;
91 }
92
93 let frame = self.create_frame(spec, mutation).await?;
94 let key = mutation
95 .key
96 .as_str()
97 .map(|s| s.to_string())
98 .or_else(|| mutation.key.as_u64().map(|n| n.to_string()))
99 .or_else(|| mutation.key.as_i64().map(|n| n.to_string()))
100 .or_else(|| {
101 mutation.key.as_array().and_then(|arr| {
102 let bytes: Vec<u8> = arr
103 .iter()
104 .filter_map(|v| v.as_u64().map(|n| n as u8))
105 .collect();
106 if bytes.len() == arr.len() {
107 Some(hex::encode(&bytes))
108 } else {
109 None
110 }
111 })
112 })
113 .unwrap_or_else(|| mutation.key.to_string());
114
115 self.entity_cache.upsert(&spec.id, &key, &frame.data).await;
116
117 let message = Arc::new(BusMessage {
118 key: key.clone(),
119 entity: spec.id.clone(),
120 payload: Arc::new(Bytes::from(serde_json::to_vec(&frame)?)),
121 });
122
123 self.publish_frame(spec, mutation, message).await;
124
125 #[cfg(feature = "otel")]
126 if let Some(ref metrics) = self.metrics {
127 let mode_str = match spec.mode {
128 Mode::List => "list",
129 Mode::State => "state",
130 Mode::Append => "append",
131 };
132 metrics.record_frame_published(mode_str, &spec.export);
133 }
134 }
135
136 Ok(())
137 }
138
139 fn should_process(&self, spec: &ViewSpec, mutation: &hyperstack_interpreter::Mutation) -> bool {
140 let key = mutation
141 .key
142 .as_str()
143 .map(|s| s.to_string())
144 .or_else(|| mutation.key.as_u64().map(|n| n.to_string()))
145 .or_else(|| mutation.key.as_i64().map(|n| n.to_string()))
146 .unwrap_or_else(|| mutation.key.to_string());
147
148 spec.filters.matches(&key)
149 }
150
151 async fn create_frame(
152 &self,
153 spec: &ViewSpec,
154 mutation: &hyperstack_interpreter::Mutation,
155 ) -> anyhow::Result<Frame> {
156 let key = mutation
157 .key
158 .as_str()
159 .map(|s| s.to_string())
160 .or_else(|| mutation.key.as_u64().map(|n| n.to_string()))
161 .or_else(|| mutation.key.as_i64().map(|n| n.to_string()))
162 .unwrap_or_else(|| mutation.key.to_string());
163
164 let projected = spec.projection.apply(mutation.patch.clone());
165
166 Ok(Frame {
167 mode: spec.mode,
168 export: spec.id.clone(),
169 op: "patch",
170 key,
171 data: projected,
172 })
173 }
174
175 async fn publish_frame(
176 &self,
177 spec: &ViewSpec,
178 _mutation: &hyperstack_interpreter::Mutation,
179 message: Arc<BusMessage>,
180 ) {
181 match spec.mode {
182 Mode::State => {
183 self.bus_manager
184 .publish_state(&spec.id, &message.key, message.payload.clone())
185 .await;
186 }
187 Mode::List | Mode::Append => {
188 self.bus_manager.publish_list(&spec.id, message).await;
189 }
190 }
191 }
192}