Skip to main content

hyperstack_server/
projector.rs

1use crate::bus::{BusManager, BusMessage};
2use crate::cache::EntityCache;
3use crate::mutation_batch::{MutationBatch, SlotContext};
4use crate::view::{ViewIndex, ViewSpec};
5use crate::websocket::frame::{transform_large_u64_to_strings, Frame, Mode};
6use bytes::Bytes;
7use hyperstack_interpreter::CanonicalLog;
8use serde_json::Value;
9use smallvec::SmallVec;
10use std::sync::Arc;
11use tokio::sync::mpsc;
12use tracing::{debug, error, instrument};
13
14#[cfg(feature = "otel")]
15use crate::metrics::Metrics;
16
17pub struct Projector {
18    view_index: Arc<ViewIndex>,
19    bus_manager: BusManager,
20    entity_cache: EntityCache,
21    mutations_rx: mpsc::Receiver<MutationBatch>,
22    #[cfg(feature = "otel")]
23    metrics: Option<Arc<Metrics>>,
24}
25
26impl Projector {
27    #[cfg(feature = "otel")]
28    pub fn new(
29        view_index: Arc<ViewIndex>,
30        bus_manager: BusManager,
31        entity_cache: EntityCache,
32        mutations_rx: mpsc::Receiver<MutationBatch>,
33        metrics: Option<Arc<Metrics>>,
34    ) -> Self {
35        Self {
36            view_index,
37            bus_manager,
38            entity_cache,
39            mutations_rx,
40            metrics,
41        }
42    }
43
44    #[cfg(not(feature = "otel"))]
45    pub fn new(
46        view_index: Arc<ViewIndex>,
47        bus_manager: BusManager,
48        entity_cache: EntityCache,
49        mutations_rx: mpsc::Receiver<MutationBatch>,
50    ) -> Self {
51        Self {
52            view_index,
53            bus_manager,
54            entity_cache,
55            mutations_rx,
56        }
57    }
58
59    pub async fn run(mut self) {
60        debug!("Projector started");
61
62        let mut json_buffer = Vec::with_capacity(4096);
63
64        while let Some(batch) = self.mutations_rx.recv().await {
65            let _span_guard = batch.span.enter();
66
67            let mut log = CanonicalLog::new();
68            log.set("phase", "projector");
69
70            let batch_size = batch.len();
71            let slot_context = batch.slot_context;
72            let mut frames_published = 0u32;
73            let mut errors = 0u32;
74
75            if let Some(ctx) = batch.event_context.as_ref() {
76                log.set("program", &ctx.program)
77                    .set("event_kind", &ctx.event_kind)
78                    .set("event_type", &ctx.event_type)
79                    .set("account", &ctx.account)
80                    .set("accounts_count", ctx.accounts_count);
81            }
82
83            for mutation in batch.mutations.into_iter() {
84                #[cfg(feature = "otel")]
85                let export = mutation.export.clone();
86
87                match self
88                    .process_mutation(mutation, slot_context, &mut json_buffer)
89                    .await
90                {
91                    Ok(count) => frames_published += count,
92                    Err(e) => {
93                        error!("Failed to process mutation: {}", e);
94                        errors += 1;
95                    }
96                }
97
98                #[cfg(feature = "otel")]
99                if let Some(ref metrics) = self.metrics {
100                    metrics.record_mutation_processed(&export);
101                }
102            }
103
104            log.set("batch_size", batch_size)
105                .set("frames_published", frames_published)
106                .set("errors", errors);
107
108            #[cfg(feature = "otel")]
109            if let Some(ref metrics) = self.metrics {
110                metrics.record_projector_latency(log.duration_ms());
111            }
112
113            log.emit();
114        }
115
116        debug!("Projector stopped");
117    }
118
119    #[instrument(
120        name = "projector.mutation",
121        skip(self, mutation, slot_context, json_buffer),
122        fields(export = %mutation.export)
123    )]
124    async fn process_mutation(
125        &self,
126        mutation: hyperstack_interpreter::Mutation,
127        slot_context: Option<SlotContext>,
128        json_buffer: &mut Vec<u8>,
129    ) -> anyhow::Result<u32> {
130        let specs = self.view_index.by_export(&mutation.export);
131
132        if specs.is_empty() {
133            return Ok(0);
134        }
135
136        let key = Self::extract_key(&mutation.key);
137        let hyperstack_interpreter::Mutation {
138            mut patch, append, ..
139        } = mutation;
140
141        // Inject _seq for recency sorting if slot context is available
142        if let Some(ctx) = slot_context {
143            if let Value::Object(ref mut map) = patch {
144                map.insert("_seq".to_string(), Value::String(ctx.to_seq_string()));
145            }
146        }
147
148        let matching_specs: SmallVec<[&ViewSpec; 4]> = specs
149            .iter()
150            .filter(|spec| spec.filters.matches(&key))
151            .collect();
152
153        let match_count = matching_specs.len();
154        if match_count == 0 {
155            return Ok(0);
156        }
157
158        let mut frames_published = 0u32;
159
160        for (i, spec) in matching_specs.into_iter().enumerate() {
161            let is_last = i == match_count - 1;
162            let patch_data = if is_last {
163                std::mem::take(&mut patch)
164            } else {
165                patch.clone()
166            };
167
168            let mut projected = spec.projection.apply(patch_data);
169            transform_large_u64_to_strings(&mut projected);
170
171            // Extract _seq from the patch data to include in the frame
172            let seq = slot_context.map(|ctx| ctx.to_seq_string());
173
174            let frame = Frame {
175                mode: spec.mode,
176                export: spec.id.clone(),
177                op: "patch",
178                key: key.clone(),
179                data: projected,
180                append: append.clone(),
181                seq,
182            };
183
184            json_buffer.clear();
185            serde_json::to_writer(&mut *json_buffer, &frame)?;
186            let payload = Arc::new(Bytes::copy_from_slice(json_buffer));
187
188            self.entity_cache
189                .upsert_with_append(&spec.id, &key, frame.data.clone(), &frame.append)
190                .await;
191
192            if spec.mode == Mode::List {
193                self.update_derived_view_caches(&spec.id, &key).await;
194            }
195
196            let message = Arc::new(BusMessage {
197                key: key.clone(),
198                entity: spec.id.clone(),
199                payload,
200            });
201
202            self.publish_frame(spec, message).await;
203            frames_published += 1;
204
205            #[cfg(feature = "otel")]
206            if let Some(ref metrics) = self.metrics {
207                let mode_str = match spec.mode {
208                    Mode::List => "list",
209                    Mode::State => "state",
210                    Mode::Append => "append",
211                };
212                metrics.record_frame_published(mode_str, &spec.export);
213            }
214        }
215
216        Ok(frames_published)
217    }
218
219    fn extract_key(key: &serde_json::Value) -> String {
220        key.as_str()
221            .map(|s| s.to_string())
222            .or_else(|| key.as_u64().map(|n| n.to_string()))
223            .or_else(|| key.as_i64().map(|n| n.to_string()))
224            .or_else(|| {
225                key.as_array().and_then(|arr| {
226                    let bytes: Vec<u8> = arr
227                        .iter()
228                        .filter_map(|v| v.as_u64().map(|n| n as u8))
229                        .collect();
230                    if bytes.len() == arr.len() {
231                        Some(hex::encode(&bytes))
232                    } else {
233                        None
234                    }
235                })
236            })
237            .unwrap_or_else(|| key.to_string())
238    }
239
240    async fn update_derived_view_caches(&self, source_view_id: &str, entity_key: &str) {
241        let derived_views = self.view_index.get_derived_views_for_source(source_view_id);
242        if derived_views.is_empty() {
243            return;
244        }
245
246        let entity_data = match self.entity_cache.get(source_view_id, entity_key).await {
247            Some(data) => data,
248            None => return,
249        };
250
251        let sorted_caches = self.view_index.sorted_caches();
252        let mut caches = sorted_caches.write().await;
253
254        for derived_spec in derived_views {
255            if let Some(cache) = caches.get_mut(&derived_spec.id) {
256                cache.upsert(entity_key.to_string(), entity_data.clone());
257                debug!(
258                    "Updated sorted cache for derived view {} with key {}",
259                    derived_spec.id, entity_key
260                );
261            }
262        }
263    }
264
265    #[instrument(
266        name = "projector.publish",
267        skip(self, spec, message),
268        fields(view_id = %spec.id, mode = ?spec.mode)
269    )]
270    async fn publish_frame(&self, spec: &ViewSpec, message: Arc<BusMessage>) {
271        match spec.mode {
272            Mode::State => {
273                self.bus_manager
274                    .publish_state(&spec.id, &message.key, message.payload.clone())
275                    .await;
276            }
277            Mode::List | Mode::Append => {
278                self.bus_manager.publish_list(&spec.id, message).await;
279            }
280        }
281    }
282}