hyperstack_server/
projector.rs1use crate::bus::{BusManager, BusMessage};
2use crate::view::{ViewIndex, ViewSpec};
3use crate::websocket::frame::{Frame, Mode};
4use bytes::Bytes;
5use smallvec::SmallVec;
6use std::sync::Arc;
7#[cfg(feature = "otel")]
8use std::time::Instant;
9use tokio::sync::mpsc;
10use tracing::{debug, error};
11
12#[cfg(feature = "otel")]
13use crate::metrics::Metrics;
14
15pub struct Projector {
16 view_index: Arc<ViewIndex>,
17 bus_manager: BusManager,
18 mutations_rx: mpsc::Receiver<SmallVec<[hyperstack_interpreter::Mutation; 6]>>,
19 #[cfg(feature = "otel")]
20 metrics: Option<Arc<Metrics>>,
21}
22
23impl Projector {
24 #[cfg(feature = "otel")]
25 pub fn new(
26 view_index: Arc<ViewIndex>,
27 bus_manager: BusManager,
28 mutations_rx: mpsc::Receiver<SmallVec<[hyperstack_interpreter::Mutation; 6]>>,
29 metrics: Option<Arc<Metrics>>,
30 ) -> Self {
31 Self {
32 view_index,
33 bus_manager,
34 mutations_rx,
35 metrics,
36 }
37 }
38
39 #[cfg(not(feature = "otel"))]
40 pub fn new(
41 view_index: Arc<ViewIndex>,
42 bus_manager: BusManager,
43 mutations_rx: mpsc::Receiver<SmallVec<[hyperstack_interpreter::Mutation; 6]>>,
44 ) -> Self {
45 Self {
46 view_index,
47 bus_manager,
48 mutations_rx,
49 }
50 }
51
52 pub async fn run(mut self) {
53 debug!("Projector started");
54
55 while let Some(mutations) = self.mutations_rx.recv().await {
56 for mutation in mutations.iter() {
57 #[cfg(feature = "otel")]
58 let start = Instant::now();
59
60 if let Err(e) = self.process_mutation(mutation).await {
61 error!("Failed to process mutation: {}", e);
62 }
63
64 #[cfg(feature = "otel")]
65 if let Some(ref metrics) = self.metrics {
66 let latency_ms = start.elapsed().as_secs_f64() * 1000.0;
67 metrics.record_projector_latency(latency_ms);
68 metrics.record_mutation_processed(&mutation.export);
69 }
70 }
71 }
72
73 debug!("Projector stopped");
74 }
75
76 async fn process_mutation(
77 &self,
78 mutation: &hyperstack_interpreter::Mutation,
79 ) -> anyhow::Result<()> {
80 let specs = self.view_index.by_export(&mutation.export);
81
82 for spec in specs {
83 if !self.should_process(spec, mutation) {
84 continue;
85 }
86
87 let frame = self.create_frame(spec, mutation).await?;
88 let key = mutation
89 .key
90 .as_str()
91 .map(|s| s.to_string())
92 .or_else(|| mutation.key.as_u64().map(|n| n.to_string()))
93 .or_else(|| mutation.key.as_i64().map(|n| n.to_string()))
94 .or_else(|| {
95 mutation.key.as_array().and_then(|arr| {
97 let bytes: Vec<u8> = arr
98 .iter()
99 .filter_map(|v| v.as_u64().map(|n| n as u8))
100 .collect();
101 if bytes.len() == arr.len() {
102 Some(hex::encode(&bytes))
103 } else {
104 None
105 }
106 })
107 })
108 .unwrap_or_else(|| mutation.key.to_string());
109
110 let message = Arc::new(BusMessage {
111 key: key.clone(),
112 entity: spec.id.clone(),
113 payload: Arc::new(Bytes::from(serde_json::to_vec(&frame)?)),
114 });
115
116 self.publish_frame(spec, mutation, message).await;
117
118 #[cfg(feature = "otel")]
119 if let Some(ref metrics) = self.metrics {
120 let mode_str = match spec.mode {
121 Mode::Kv => "kv",
122 Mode::List => "list",
123 Mode::State => "state",
124 Mode::Append => "append",
125 };
126 metrics.record_frame_published(mode_str, &spec.export);
127 }
128 }
129
130 Ok(())
131 }
132
133 fn should_process(&self, spec: &ViewSpec, mutation: &hyperstack_interpreter::Mutation) -> bool {
134 let key = mutation
135 .key
136 .as_str()
137 .map(|s| s.to_string())
138 .or_else(|| mutation.key.as_u64().map(|n| n.to_string()))
139 .or_else(|| mutation.key.as_i64().map(|n| n.to_string()))
140 .unwrap_or_else(|| mutation.key.to_string());
141
142 spec.filters.matches(&key)
143 }
144
145 async fn create_frame(
146 &self,
147 spec: &ViewSpec,
148 mutation: &hyperstack_interpreter::Mutation,
149 ) -> anyhow::Result<Frame> {
150 match spec.mode {
151 Mode::Kv | Mode::State => self.create_kv_frame(spec, mutation),
152 Mode::List => self.create_list_frame(spec, mutation),
153 Mode::Append => self.create_append_frame(spec, mutation),
154 }
155 }
156
157 fn create_kv_frame(
158 &self,
159 spec: &ViewSpec,
160 mutation: &hyperstack_interpreter::Mutation,
161 ) -> anyhow::Result<Frame> {
162 let key = mutation
163 .key
164 .as_str()
165 .map(|s| s.to_string())
166 .or_else(|| mutation.key.as_u64().map(|n| n.to_string()))
167 .or_else(|| mutation.key.as_i64().map(|n| n.to_string()))
168 .unwrap_or_else(|| mutation.key.to_string());
169
170 let projected = spec.projection.apply(mutation.patch.clone());
171
172 Ok(Frame {
173 mode: spec.mode,
174 export: spec.id.clone(),
175 op: "patch",
176 key,
177 data: projected,
178 })
179 }
180
181 fn create_list_frame(
182 &self,
183 spec: &ViewSpec,
184 mutation: &hyperstack_interpreter::Mutation,
185 ) -> anyhow::Result<Frame> {
186 let key = mutation
187 .key
188 .as_str()
189 .map(|s| s.to_string())
190 .or_else(|| mutation.key.as_u64().map(|n| n.to_string()))
191 .or_else(|| mutation.key.as_i64().map(|n| n.to_string()))
192 .unwrap_or_else(|| mutation.key.to_string());
193
194 let projected = spec.projection.apply(mutation.patch.clone());
195
196 let list_item = serde_json::json!({
197 "id": key,
198 "order": 0,
199 "item": projected,
200 });
201
202 Ok(Frame {
203 mode: spec.mode,
204 export: spec.id.clone(),
205 op: "patch",
206 key: key.clone(),
207 data: list_item,
208 })
209 }
210
211 fn create_append_frame(
212 &self,
213 spec: &ViewSpec,
214 mutation: &hyperstack_interpreter::Mutation,
215 ) -> anyhow::Result<Frame> {
216 self.create_kv_frame(spec, mutation)
217 }
218
219 async fn publish_frame(
220 &self,
221 spec: &ViewSpec,
222 _mutation: &hyperstack_interpreter::Mutation,
223 message: Arc<BusMessage>,
224 ) {
225 match spec.mode {
226 Mode::State => {
227 self.bus_manager
228 .publish_state(&spec.id, &message.key, message.payload.clone())
229 .await;
230 }
231 Mode::Kv | Mode::Append => {
232 self.bus_manager.publish_kv(&spec.id, message).await;
233 }
234 Mode::List => {
235 self.bus_manager.publish_list(&spec.id, message).await;
236 }
237 }
238 }
239}