1use crate::bus::{BusManager, BusMessage};
2use crate::cache::EntityCache;
3use crate::mutation_batch::{MutationBatch, SlotContext};
4use crate::view::{ViewIndex, ViewSpec};
5use crate::websocket::frame::{transform_large_u64_to_strings, Frame, Mode};
6use bytes::Bytes;
7use hyperstack_interpreter::CanonicalLog;
8use serde_json::Value;
9use smallvec::SmallVec;
10use std::sync::Arc;
11use tokio::sync::mpsc;
12use tracing::{debug, error, instrument};
13
14#[cfg(feature = "otel")]
15use crate::metrics::Metrics;
16
17pub struct Projector {
18 view_index: Arc<ViewIndex>,
19 bus_manager: BusManager,
20 entity_cache: EntityCache,
21 mutations_rx: mpsc::Receiver<MutationBatch>,
22 #[cfg(feature = "otel")]
23 metrics: Option<Arc<Metrics>>,
24}
25
26impl Projector {
/// Assembles a projector from its collaborators (`otel` build).
///
/// `metrics` may be `None`, in which case no metrics are recorded.
#[cfg(feature = "otel")]
pub fn new(
    view_index: Arc<ViewIndex>,
    bus_manager: BusManager,
    entity_cache: EntityCache,
    mutations_rx: mpsc::Receiver<MutationBatch>,
    metrics: Option<Arc<Metrics>>,
) -> Self {
    Projector {
        view_index,
        bus_manager,
        entity_cache,
        mutations_rx,
        metrics,
    }
}
43
44 #[cfg(not(feature = "otel"))]
45 pub fn new(
46 view_index: Arc<ViewIndex>,
47 bus_manager: BusManager,
48 entity_cache: EntityCache,
49 mutations_rx: mpsc::Receiver<MutationBatch>,
50 ) -> Self {
51 Self {
52 view_index,
53 bus_manager,
54 entity_cache,
55 mutations_rx,
56 }
57 }
58
59 pub async fn run(mut self) {
60 debug!("Projector started");
61
62 let mut json_buffer = Vec::with_capacity(4096);
63
64 while let Some(batch) = self.mutations_rx.recv().await {
65 let _span_guard = batch.span.enter();
66
67 let mut log = CanonicalLog::new();
68 log.set("phase", "projector");
69
70 let batch_size = batch.len();
71 let slot_context = batch.slot_context;
72 let mut frames_published = 0u32;
73 let mut errors = 0u32;
74
75 if let Some(ctx) = batch.event_context.as_ref() {
76 log.set("program", &ctx.program)
77 .set("event_kind", &ctx.event_kind)
78 .set("event_type", &ctx.event_type)
79 .set("account", &ctx.account)
80 .set("accounts_count", ctx.accounts_count);
81 }
82
83 for mutation in batch.mutations.into_iter() {
84 #[cfg(feature = "otel")]
85 let export = mutation.export.clone();
86
87 match self
88 .process_mutation(mutation, slot_context, &mut json_buffer)
89 .await
90 {
91 Ok(count) => frames_published += count,
92 Err(e) => {
93 error!("Failed to process mutation: {}", e);
94 errors += 1;
95 }
96 }
97
98 #[cfg(feature = "otel")]
99 if let Some(ref metrics) = self.metrics {
100 metrics.record_mutation_processed(&export);
101 }
102 }
103
104 log.set("batch_size", batch_size)
105 .set("frames_published", frames_published)
106 .set("errors", errors);
107
108 #[cfg(feature = "otel")]
109 if let Some(ref metrics) = self.metrics {
110 metrics.record_projector_latency(log.duration_ms());
111 }
112
113 log.emit();
114 }
115
116 debug!("Projector stopped");
117 }
118
119 #[instrument(
120 name = "projector.mutation",
121 skip(self, mutation, slot_context, json_buffer),
122 fields(export = %mutation.export)
123 )]
124 async fn process_mutation(
125 &self,
126 mutation: hyperstack_interpreter::Mutation,
127 slot_context: Option<SlotContext>,
128 json_buffer: &mut Vec<u8>,
129 ) -> anyhow::Result<u32> {
130 let specs = self.view_index.by_export(&mutation.export);
131
132 if specs.is_empty() {
133 return Ok(0);
134 }
135
136 let key = Self::extract_key(&mutation.key);
137 let hyperstack_interpreter::Mutation {
138 mut patch, append, ..
139 } = mutation;
140
141 if let Some(ctx) = slot_context {
143 if let Value::Object(ref mut map) = patch {
144 map.insert("_seq".to_string(), Value::String(ctx.to_seq_string()));
145 }
146 }
147
148 let matching_specs: SmallVec<[&ViewSpec; 4]> = specs
149 .iter()
150 .filter(|spec| spec.filters.matches(&key))
151 .collect();
152
153 let match_count = matching_specs.len();
154 if match_count == 0 {
155 return Ok(0);
156 }
157
158 let mut frames_published = 0u32;
159
160 for (i, spec) in matching_specs.into_iter().enumerate() {
161 let is_last = i == match_count - 1;
162 let patch_data = if is_last {
163 std::mem::take(&mut patch)
164 } else {
165 patch.clone()
166 };
167
168 let mut projected = spec.projection.apply(patch_data);
169 transform_large_u64_to_strings(&mut projected);
170
171 let seq = slot_context.map(|ctx| ctx.to_seq_string());
173
174 let frame = Frame {
175 mode: spec.mode,
176 export: spec.id.clone(),
177 op: "patch",
178 key: key.clone(),
179 data: projected,
180 append: append.clone(),
181 seq,
182 };
183
184 json_buffer.clear();
185 serde_json::to_writer(&mut *json_buffer, &frame)?;
186 let payload = Arc::new(Bytes::copy_from_slice(json_buffer));
187
188 self.entity_cache
189 .upsert_with_append(&spec.id, &key, frame.data.clone(), &frame.append)
190 .await;
191
192 if spec.mode == Mode::List {
193 self.update_derived_view_caches(&spec.id, &key).await;
194 }
195
196 let message = Arc::new(BusMessage {
197 key: key.clone(),
198 entity: spec.id.clone(),
199 payload,
200 });
201
202 self.publish_frame(spec, message).await;
203 frames_published += 1;
204
205 #[cfg(feature = "otel")]
206 if let Some(ref metrics) = self.metrics {
207 let mode_str = match spec.mode {
208 Mode::List => "list",
209 Mode::State => "state",
210 Mode::Append => "append",
211 };
212 metrics.record_frame_published(mode_str, &spec.export);
213 }
214 }
215
216 Ok(frames_published)
217 }
218
219 fn extract_key(key: &serde_json::Value) -> String {
220 key.as_str()
221 .map(|s| s.to_string())
222 .or_else(|| key.as_u64().map(|n| n.to_string()))
223 .or_else(|| key.as_i64().map(|n| n.to_string()))
224 .or_else(|| {
225 key.as_array().and_then(|arr| {
226 let bytes: Vec<u8> = arr
227 .iter()
228 .filter_map(|v| v.as_u64().map(|n| n as u8))
229 .collect();
230 if bytes.len() == arr.len() {
231 Some(hex::encode(&bytes))
232 } else {
233 None
234 }
235 })
236 })
237 .unwrap_or_else(|| key.to_string())
238 }
239
240 async fn update_derived_view_caches(&self, source_view_id: &str, entity_key: &str) {
241 let derived_views = self.view_index.get_derived_views_for_source(source_view_id);
242 if derived_views.is_empty() {
243 return;
244 }
245
246 let entity_data = match self.entity_cache.get(source_view_id, entity_key).await {
247 Some(data) => data,
248 None => return,
249 };
250
251 let sorted_caches = self.view_index.sorted_caches();
252 let mut caches = sorted_caches.write().await;
253
254 for derived_spec in derived_views {
255 if let Some(cache) = caches.get_mut(&derived_spec.id) {
256 cache.upsert(entity_key.to_string(), entity_data.clone());
257 debug!(
258 "Updated sorted cache for derived view {} with key {}",
259 derived_spec.id, entity_key
260 );
261 }
262 }
263 }
264
265 #[instrument(
266 name = "projector.publish",
267 skip(self, spec, message),
268 fields(view_id = %spec.id, mode = ?spec.mode)
269 )]
270 async fn publish_frame(&self, spec: &ViewSpec, message: Arc<BusMessage>) {
271 match spec.mode {
272 Mode::State => {
273 self.bus_manager
274 .publish_state(&spec.id, &message.key, message.payload.clone())
275 .await;
276 }
277 Mode::List | Mode::Append => {
278 self.bus_manager.publish_list(&spec.id, message).await;
279 }
280 }
281 }
282}