hyperstack_server/
projector.rs1use crate::bus::{BusManager, BusMessage};
2use crate::cache::EntityCache;
3use crate::mutation_batch::MutationBatch;
4use crate::view::{ViewIndex, ViewSpec};
5use crate::websocket::frame::{Frame, Mode};
6use bytes::Bytes;
7use hyperstack_interpreter::CanonicalLog;
8use smallvec::SmallVec;
9use std::sync::Arc;
10use tokio::sync::mpsc;
11use tracing::{debug, error, instrument};
12
13#[cfg(feature = "otel")]
14use crate::metrics::Metrics;
15
16pub struct Projector {
17 view_index: Arc<ViewIndex>,
18 bus_manager: BusManager,
19 entity_cache: EntityCache,
20 mutations_rx: mpsc::Receiver<MutationBatch>,
21 #[cfg(feature = "otel")]
22 metrics: Option<Arc<Metrics>>,
23}
24
25impl Projector {
/// Assembles a projector from its collaborators (variant built with the `otel` feature).
///
/// `metrics` may be `None`; in that case the projector records no metrics.
#[cfg(feature = "otel")]
pub fn new(
    view_index: Arc<ViewIndex>,
    bus_manager: BusManager,
    entity_cache: EntityCache,
    mutations_rx: mpsc::Receiver<MutationBatch>,
    metrics: Option<Arc<Metrics>>,
) -> Self {
    Projector {
        view_index,
        bus_manager,
        entity_cache,
        mutations_rx,
        metrics,
    }
}
42
43 #[cfg(not(feature = "otel"))]
44 pub fn new(
45 view_index: Arc<ViewIndex>,
46 bus_manager: BusManager,
47 entity_cache: EntityCache,
48 mutations_rx: mpsc::Receiver<MutationBatch>,
49 ) -> Self {
50 Self {
51 view_index,
52 bus_manager,
53 entity_cache,
54 mutations_rx,
55 }
56 }
57
58 pub async fn run(mut self) {
59 debug!("Projector started");
60
61 let mut json_buffer = Vec::with_capacity(4096);
62
63 while let Some(batch) = self.mutations_rx.recv().await {
64 let _span_guard = batch.span.enter();
65
66 let mut log = CanonicalLog::new();
67 log.set("phase", "projector");
68
69 let batch_size = batch.len();
70 let mut frames_published = 0u32;
71 let mut errors = 0u32;
72
73 for mutation in batch.mutations.into_iter() {
74 #[cfg(feature = "otel")]
75 let export = mutation.export.clone();
76
77 match self.process_mutation(mutation, &mut json_buffer).await {
78 Ok(count) => frames_published += count,
79 Err(e) => {
80 error!("Failed to process mutation: {}", e);
81 errors += 1;
82 }
83 }
84
85 #[cfg(feature = "otel")]
86 if let Some(ref metrics) = self.metrics {
87 metrics.record_mutation_processed(&export);
88 }
89 }
90
91 log.set("batch_size", batch_size)
92 .set("frames_published", frames_published)
93 .set("errors", errors);
94
95 #[cfg(feature = "otel")]
96 if let Some(ref metrics) = self.metrics {
97 metrics.record_projector_latency(log.duration_ms());
98 }
99
100 log.emit();
101 }
102
103 debug!("Projector stopped");
104 }
105
106 #[instrument(
107 name = "projector.mutation",
108 skip(self, mutation, json_buffer),
109 fields(export = %mutation.export)
110 )]
111 async fn process_mutation(
112 &self,
113 mutation: hyperstack_interpreter::Mutation,
114 json_buffer: &mut Vec<u8>,
115 ) -> anyhow::Result<u32> {
116 let specs = self.view_index.by_export(&mutation.export);
117
118 if specs.is_empty() {
119 return Ok(0);
120 }
121
122 let key = Self::extract_key(&mutation.key);
123 let hyperstack_interpreter::Mutation {
124 mut patch, append, ..
125 } = mutation;
126
127 let matching_specs: SmallVec<[&ViewSpec; 4]> = specs
128 .iter()
129 .filter(|spec| spec.filters.matches(&key))
130 .collect();
131
132 let match_count = matching_specs.len();
133 if match_count == 0 {
134 return Ok(0);
135 }
136
137 let mut frames_published = 0u32;
138
139 for (i, spec) in matching_specs.into_iter().enumerate() {
140 let is_last = i == match_count - 1;
141 let patch_data = if is_last {
142 std::mem::take(&mut patch)
143 } else {
144 patch.clone()
145 };
146
147 let projected = spec.projection.apply(patch_data);
148
149 let frame = Frame {
150 mode: spec.mode,
151 export: spec.id.clone(),
152 op: "patch",
153 key: key.clone(),
154 data: projected,
155 append: append.clone(),
156 };
157
158 json_buffer.clear();
159 serde_json::to_writer(&mut *json_buffer, &frame)?;
160 let payload = Arc::new(Bytes::copy_from_slice(json_buffer));
161
162 self.entity_cache
163 .upsert_with_append(&spec.id, &key, frame.data.clone(), &frame.append)
164 .await;
165
166 let message = Arc::new(BusMessage {
167 key: key.clone(),
168 entity: spec.id.clone(),
169 payload,
170 });
171
172 self.publish_frame(spec, message).await;
173 frames_published += 1;
174
175 #[cfg(feature = "otel")]
176 if let Some(ref metrics) = self.metrics {
177 let mode_str = match spec.mode {
178 Mode::List => "list",
179 Mode::State => "state",
180 Mode::Append => "append",
181 };
182 metrics.record_frame_published(mode_str, &spec.export);
183 }
184 }
185
186 Ok(frames_published)
187 }
188
189 fn extract_key(key: &serde_json::Value) -> String {
190 key.as_str()
191 .map(|s| s.to_string())
192 .or_else(|| key.as_u64().map(|n| n.to_string()))
193 .or_else(|| key.as_i64().map(|n| n.to_string()))
194 .or_else(|| {
195 key.as_array().and_then(|arr| {
196 let bytes: Vec<u8> = arr
197 .iter()
198 .filter_map(|v| v.as_u64().map(|n| n as u8))
199 .collect();
200 if bytes.len() == arr.len() {
201 Some(hex::encode(&bytes))
202 } else {
203 None
204 }
205 })
206 })
207 .unwrap_or_else(|| key.to_string())
208 }
209
210 #[instrument(
211 name = "projector.publish",
212 skip(self, spec, message),
213 fields(view_id = %spec.id, mode = ?spec.mode)
214 )]
215 async fn publish_frame(&self, spec: &ViewSpec, message: Arc<BusMessage>) {
216 match spec.mode {
217 Mode::State => {
218 self.bus_manager
219 .publish_state(&spec.id, &message.key, message.payload.clone())
220 .await;
221 }
222 Mode::List | Mode::Append => {
223 self.bus_manager.publish_list(&spec.id, message).await;
224 }
225 }
226 }
227}