hyperstack_server/
projector.rs1use crate::bus::{BusManager, BusMessage};
2use crate::view::{ViewIndex, ViewSpec};
3use crate::websocket::frame::{Frame, Mode};
4use bytes::Bytes;
5use smallvec::SmallVec;
6use std::sync::Arc;
7use std::time::Instant;
8use tokio::sync::mpsc;
9use tracing::{debug, error};
10
11#[cfg(feature = "otel")]
12use crate::metrics::Metrics;
13
14pub struct Projector {
15 view_index: Arc<ViewIndex>,
16 bus_manager: BusManager,
17 mutations_rx: mpsc::Receiver<SmallVec<[hyperstack_interpreter::Mutation; 6]>>,
18 #[cfg(feature = "otel")]
19 metrics: Option<Arc<Metrics>>,
20}
21
22impl Projector {
23 #[cfg(feature = "otel")]
24 pub fn new(
25 view_index: Arc<ViewIndex>,
26 bus_manager: BusManager,
27 mutations_rx: mpsc::Receiver<SmallVec<[hyperstack_interpreter::Mutation; 6]>>,
28 metrics: Option<Arc<Metrics>>,
29 ) -> Self {
30 Self {
31 view_index,
32 bus_manager,
33 mutations_rx,
34 metrics,
35 }
36 }
37
38 #[cfg(not(feature = "otel"))]
39 pub fn new(
40 view_index: Arc<ViewIndex>,
41 bus_manager: BusManager,
42 mutations_rx: mpsc::Receiver<SmallVec<[hyperstack_interpreter::Mutation; 6]>>,
43 ) -> Self {
44 Self {
45 view_index,
46 bus_manager,
47 mutations_rx,
48 }
49 }
50
51 pub async fn run(mut self) {
52 debug!("Projector started");
53
54 while let Some(mutations) = self.mutations_rx.recv().await {
55 for mutation in mutations.iter() {
56 let _start = Instant::now();
57
58 if let Err(e) = self.process_mutation(mutation).await {
59 error!("Failed to process mutation: {}", e);
60 }
61
62 #[cfg(feature = "otel")]
63 if let Some(ref metrics) = self.metrics {
64 let latency_ms = start.elapsed().as_secs_f64() * 1000.0;
65 metrics.record_projector_latency(latency_ms);
66 metrics.record_mutation_processed(&mutation.export);
67 }
68 }
69 }
70
71 debug!("Projector stopped");
72 }
73
74 async fn process_mutation(
75 &self,
76 mutation: &hyperstack_interpreter::Mutation,
77 ) -> anyhow::Result<()> {
78 let specs = self.view_index.by_export(&mutation.export);
79
80 for spec in specs {
81 if !self.should_process(spec, mutation) {
82 continue;
83 }
84
85 let frame = self.create_frame(spec, mutation).await?;
86 let key = mutation
87 .key
88 .as_str()
89 .map(|s| s.to_string())
90 .or_else(|| mutation.key.as_u64().map(|n| n.to_string()))
91 .or_else(|| mutation.key.as_i64().map(|n| n.to_string()))
92 .or_else(|| {
93 mutation.key.as_array().and_then(|arr| {
95 let bytes: Vec<u8> = arr
96 .iter()
97 .filter_map(|v| v.as_u64().map(|n| n as u8))
98 .collect();
99 if bytes.len() == arr.len() {
100 Some(hex::encode(&bytes))
101 } else {
102 None
103 }
104 })
105 })
106 .unwrap_or_else(|| mutation.key.to_string());
107
108 let message = Arc::new(BusMessage {
109 key: key.clone(),
110 entity: spec.id.clone(),
111 payload: Arc::new(Bytes::from(serde_json::to_vec(&frame)?)),
112 });
113
114 self.publish_frame(spec, mutation, message).await;
115
116 #[cfg(feature = "otel")]
117 if let Some(ref metrics) = self.metrics {
118 let mode_str = match spec.mode {
119 Mode::Kv => "kv",
120 Mode::List => "list",
121 Mode::State => "state",
122 Mode::Append => "append",
123 };
124 metrics.record_frame_published(mode_str, &spec.export);
125 }
126 }
127
128 Ok(())
129 }
130
131 fn should_process(&self, spec: &ViewSpec, mutation: &hyperstack_interpreter::Mutation) -> bool {
132 let key = mutation
133 .key
134 .as_str()
135 .map(|s| s.to_string())
136 .or_else(|| mutation.key.as_u64().map(|n| n.to_string()))
137 .or_else(|| mutation.key.as_i64().map(|n| n.to_string()))
138 .unwrap_or_else(|| mutation.key.to_string());
139
140 spec.filters.matches(&key)
141 }
142
143 async fn create_frame(
144 &self,
145 spec: &ViewSpec,
146 mutation: &hyperstack_interpreter::Mutation,
147 ) -> anyhow::Result<Frame> {
148 match spec.mode {
149 Mode::Kv | Mode::State => self.create_kv_frame(spec, mutation),
150 Mode::List => self.create_list_frame(spec, mutation),
151 Mode::Append => self.create_append_frame(spec, mutation),
152 }
153 }
154
155 fn create_kv_frame(
156 &self,
157 spec: &ViewSpec,
158 mutation: &hyperstack_interpreter::Mutation,
159 ) -> anyhow::Result<Frame> {
160 let key = mutation
161 .key
162 .as_str()
163 .map(|s| s.to_string())
164 .or_else(|| mutation.key.as_u64().map(|n| n.to_string()))
165 .or_else(|| mutation.key.as_i64().map(|n| n.to_string()))
166 .unwrap_or_else(|| mutation.key.to_string());
167
168 let projected = spec.projection.apply(mutation.patch.clone());
169
170 Ok(Frame {
171 mode: spec.mode,
172 export: spec.id.clone(),
173 op: "patch",
174 key,
175 data: projected,
176 })
177 }
178
179 fn create_list_frame(
180 &self,
181 spec: &ViewSpec,
182 mutation: &hyperstack_interpreter::Mutation,
183 ) -> anyhow::Result<Frame> {
184 let key = mutation
185 .key
186 .as_str()
187 .map(|s| s.to_string())
188 .or_else(|| mutation.key.as_u64().map(|n| n.to_string()))
189 .or_else(|| mutation.key.as_i64().map(|n| n.to_string()))
190 .unwrap_or_else(|| mutation.key.to_string());
191
192 let projected = spec.projection.apply(mutation.patch.clone());
193
194 let list_item = serde_json::json!({
195 "id": key,
196 "order": 0,
197 "item": projected,
198 });
199
200 Ok(Frame {
201 mode: spec.mode,
202 export: spec.id.clone(),
203 op: "patch",
204 key: key.clone(),
205 data: list_item,
206 })
207 }
208
209 fn create_append_frame(
210 &self,
211 spec: &ViewSpec,
212 mutation: &hyperstack_interpreter::Mutation,
213 ) -> anyhow::Result<Frame> {
214 self.create_kv_frame(spec, mutation)
215 }
216
217 async fn publish_frame(
218 &self,
219 spec: &ViewSpec,
220 _mutation: &hyperstack_interpreter::Mutation,
221 message: Arc<BusMessage>,
222 ) {
223 match spec.mode {
224 Mode::State => {
225 self.bus_manager
226 .publish_state(&spec.id, &message.key, message.payload.clone())
227 .await;
228 }
229 Mode::Kv | Mode::Append => {
230 self.bus_manager.publish_kv(&spec.id, message).await;
231 }
232 Mode::List => {
233 self.bus_manager.publish_list(&spec.id, message).await;
234 }
235 }
236 }
237}