hyperstack_server/
projector.rs

1use crate::bus::{BusManager, BusMessage};
2use crate::cache::EntityCache;
3use crate::mutation_batch::MutationBatch;
4use crate::view::{ViewIndex, ViewSpec};
5use crate::websocket::frame::{Frame, Mode};
6use bytes::Bytes;
7use hyperstack_interpreter::CanonicalLog;
8use smallvec::SmallVec;
9use std::sync::Arc;
10use tokio::sync::mpsc;
11use tracing::{debug, error, instrument};
12
13#[cfg(feature = "otel")]
14use crate::metrics::Metrics;
15
16pub struct Projector {
17    view_index: Arc<ViewIndex>,
18    bus_manager: BusManager,
19    entity_cache: EntityCache,
20    mutations_rx: mpsc::Receiver<MutationBatch>,
21    #[cfg(feature = "otel")]
22    metrics: Option<Arc<Metrics>>,
23}
24
25impl Projector {
26    #[cfg(feature = "otel")]
27    pub fn new(
28        view_index: Arc<ViewIndex>,
29        bus_manager: BusManager,
30        entity_cache: EntityCache,
31        mutations_rx: mpsc::Receiver<MutationBatch>,
32        metrics: Option<Arc<Metrics>>,
33    ) -> Self {
34        Self {
35            view_index,
36            bus_manager,
37            entity_cache,
38            mutations_rx,
39            metrics,
40        }
41    }
42
43    #[cfg(not(feature = "otel"))]
44    pub fn new(
45        view_index: Arc<ViewIndex>,
46        bus_manager: BusManager,
47        entity_cache: EntityCache,
48        mutations_rx: mpsc::Receiver<MutationBatch>,
49    ) -> Self {
50        Self {
51            view_index,
52            bus_manager,
53            entity_cache,
54            mutations_rx,
55        }
56    }
57
58    pub async fn run(mut self) {
59        debug!("Projector started");
60
61        let mut json_buffer = Vec::with_capacity(4096);
62
63        while let Some(batch) = self.mutations_rx.recv().await {
64            let _span_guard = batch.span.enter();
65
66            let mut log = CanonicalLog::new();
67            log.set("phase", "projector");
68
69            let batch_size = batch.len();
70            let mut frames_published = 0u32;
71            let mut errors = 0u32;
72
73            for mutation in batch.mutations.into_iter() {
74                #[cfg(feature = "otel")]
75                let export = mutation.export.clone();
76
77                match self.process_mutation(mutation, &mut json_buffer).await {
78                    Ok(count) => frames_published += count,
79                    Err(e) => {
80                        error!("Failed to process mutation: {}", e);
81                        errors += 1;
82                    }
83                }
84
85                #[cfg(feature = "otel")]
86                if let Some(ref metrics) = self.metrics {
87                    metrics.record_mutation_processed(&export);
88                }
89            }
90
91            log.set("batch_size", batch_size)
92                .set("frames_published", frames_published)
93                .set("errors", errors);
94
95            #[cfg(feature = "otel")]
96            if let Some(ref metrics) = self.metrics {
97                metrics.record_projector_latency(log.duration_ms());
98            }
99
100            log.emit();
101        }
102
103        debug!("Projector stopped");
104    }
105
106    #[instrument(
107        name = "projector.mutation",
108        skip(self, mutation, json_buffer),
109        fields(export = %mutation.export)
110    )]
111    async fn process_mutation(
112        &self,
113        mutation: hyperstack_interpreter::Mutation,
114        json_buffer: &mut Vec<u8>,
115    ) -> anyhow::Result<u32> {
116        let specs = self.view_index.by_export(&mutation.export);
117
118        if specs.is_empty() {
119            return Ok(0);
120        }
121
122        let key = Self::extract_key(&mutation.key);
123        let hyperstack_interpreter::Mutation {
124            mut patch, append, ..
125        } = mutation;
126
127        let matching_specs: SmallVec<[&ViewSpec; 4]> = specs
128            .iter()
129            .filter(|spec| spec.filters.matches(&key))
130            .collect();
131
132        let match_count = matching_specs.len();
133        if match_count == 0 {
134            return Ok(0);
135        }
136
137        let mut frames_published = 0u32;
138
139        for (i, spec) in matching_specs.into_iter().enumerate() {
140            let is_last = i == match_count - 1;
141            let patch_data = if is_last {
142                std::mem::take(&mut patch)
143            } else {
144                patch.clone()
145            };
146
147            let projected = spec.projection.apply(patch_data);
148
149            let frame = Frame {
150                mode: spec.mode,
151                export: spec.id.clone(),
152                op: "patch",
153                key: key.clone(),
154                data: projected,
155                append: append.clone(),
156            };
157
158            json_buffer.clear();
159            serde_json::to_writer(&mut *json_buffer, &frame)?;
160            let payload = Arc::new(Bytes::copy_from_slice(json_buffer));
161
162            self.entity_cache
163                .upsert_with_append(&spec.id, &key, frame.data.clone(), &frame.append)
164                .await;
165
166            let message = Arc::new(BusMessage {
167                key: key.clone(),
168                entity: spec.id.clone(),
169                payload,
170            });
171
172            self.publish_frame(spec, message).await;
173            frames_published += 1;
174
175            #[cfg(feature = "otel")]
176            if let Some(ref metrics) = self.metrics {
177                let mode_str = match spec.mode {
178                    Mode::List => "list",
179                    Mode::State => "state",
180                    Mode::Append => "append",
181                };
182                metrics.record_frame_published(mode_str, &spec.export);
183            }
184        }
185
186        Ok(frames_published)
187    }
188
189    fn extract_key(key: &serde_json::Value) -> String {
190        key.as_str()
191            .map(|s| s.to_string())
192            .or_else(|| key.as_u64().map(|n| n.to_string()))
193            .or_else(|| key.as_i64().map(|n| n.to_string()))
194            .or_else(|| {
195                key.as_array().and_then(|arr| {
196                    let bytes: Vec<u8> = arr
197                        .iter()
198                        .filter_map(|v| v.as_u64().map(|n| n as u8))
199                        .collect();
200                    if bytes.len() == arr.len() {
201                        Some(hex::encode(&bytes))
202                    } else {
203                        None
204                    }
205                })
206            })
207            .unwrap_or_else(|| key.to_string())
208    }
209
210    #[instrument(
211        name = "projector.publish",
212        skip(self, spec, message),
213        fields(view_id = %spec.id, mode = ?spec.mode)
214    )]
215    async fn publish_frame(&self, spec: &ViewSpec, message: Arc<BusMessage>) {
216        match spec.mode {
217            Mode::State => {
218                self.bus_manager
219                    .publish_state(&spec.id, &message.key, message.payload.clone())
220                    .await;
221            }
222            Mode::List | Mode::Append => {
223                self.bus_manager.publish_list(&spec.id, message).await;
224            }
225        }
226    }
227}