hyperstack_server/
projector.rs

1use crate::bus::{BusManager, BusMessage};
2use crate::cache::EntityCache;
3use crate::view::{ViewIndex, ViewSpec};
4use crate::websocket::frame::{Frame, Mode};
5use bytes::Bytes;
6use smallvec::SmallVec;
7use std::sync::Arc;
8#[cfg(feature = "otel")]
9use std::time::Instant;
10use tokio::sync::mpsc;
11use tracing::{debug, error};
12
13#[cfg(feature = "otel")]
14use crate::metrics::Metrics;
15
16pub struct Projector {
17    view_index: Arc<ViewIndex>,
18    bus_manager: BusManager,
19    entity_cache: EntityCache,
20    mutations_rx: mpsc::Receiver<SmallVec<[hyperstack_interpreter::Mutation; 6]>>,
21    #[cfg(feature = "otel")]
22    metrics: Option<Arc<Metrics>>,
23}
24
25impl Projector {
26    #[cfg(feature = "otel")]
27    pub fn new(
28        view_index: Arc<ViewIndex>,
29        bus_manager: BusManager,
30        entity_cache: EntityCache,
31        mutations_rx: mpsc::Receiver<SmallVec<[hyperstack_interpreter::Mutation; 6]>>,
32        metrics: Option<Arc<Metrics>>,
33    ) -> Self {
34        Self {
35            view_index,
36            bus_manager,
37            entity_cache,
38            mutations_rx,
39            metrics,
40        }
41    }
42
43    #[cfg(not(feature = "otel"))]
44    pub fn new(
45        view_index: Arc<ViewIndex>,
46        bus_manager: BusManager,
47        entity_cache: EntityCache,
48        mutations_rx: mpsc::Receiver<SmallVec<[hyperstack_interpreter::Mutation; 6]>>,
49    ) -> Self {
50        Self {
51            view_index,
52            bus_manager,
53            entity_cache,
54            mutations_rx,
55        }
56    }
57
58    pub async fn run(mut self) {
59        debug!("Projector started");
60
61        while let Some(mutations) = self.mutations_rx.recv().await {
62            for mutation in mutations.iter() {
63                #[cfg(feature = "otel")]
64                let start = Instant::now();
65
66                if let Err(e) = self.process_mutation(mutation).await {
67                    error!("Failed to process mutation: {}", e);
68                }
69
70                #[cfg(feature = "otel")]
71                if let Some(ref metrics) = self.metrics {
72                    let latency_ms = start.elapsed().as_secs_f64() * 1000.0;
73                    metrics.record_projector_latency(latency_ms);
74                    metrics.record_mutation_processed(&mutation.export);
75                }
76            }
77        }
78
79        debug!("Projector stopped");
80    }
81
82    async fn process_mutation(
83        &self,
84        mutation: &hyperstack_interpreter::Mutation,
85    ) -> anyhow::Result<()> {
86        let specs = self.view_index.by_export(&mutation.export);
87
88        for spec in specs {
89            if !self.should_process(spec, mutation) {
90                continue;
91            }
92
93            let frame = self.create_frame(spec, mutation).await?;
94            let key = mutation
95                .key
96                .as_str()
97                .map(|s| s.to_string())
98                .or_else(|| mutation.key.as_u64().map(|n| n.to_string()))
99                .or_else(|| mutation.key.as_i64().map(|n| n.to_string()))
100                .or_else(|| {
101                    mutation.key.as_array().and_then(|arr| {
102                        let bytes: Vec<u8> = arr
103                            .iter()
104                            .filter_map(|v| v.as_u64().map(|n| n as u8))
105                            .collect();
106                        if bytes.len() == arr.len() {
107                            Some(hex::encode(&bytes))
108                        } else {
109                            None
110                        }
111                    })
112                })
113                .unwrap_or_else(|| mutation.key.to_string());
114
115            self.entity_cache.upsert(&spec.id, &key, &frame.data).await;
116
117            let message = Arc::new(BusMessage {
118                key: key.clone(),
119                entity: spec.id.clone(),
120                payload: Arc::new(Bytes::from(serde_json::to_vec(&frame)?)),
121            });
122
123            self.publish_frame(spec, mutation, message).await;
124
125            #[cfg(feature = "otel")]
126            if let Some(ref metrics) = self.metrics {
127                let mode_str = match spec.mode {
128                    Mode::List => "list",
129                    Mode::State => "state",
130                    Mode::Append => "append",
131                };
132                metrics.record_frame_published(mode_str, &spec.export);
133            }
134        }
135
136        Ok(())
137    }
138
139    fn should_process(&self, spec: &ViewSpec, mutation: &hyperstack_interpreter::Mutation) -> bool {
140        let key = mutation
141            .key
142            .as_str()
143            .map(|s| s.to_string())
144            .or_else(|| mutation.key.as_u64().map(|n| n.to_string()))
145            .or_else(|| mutation.key.as_i64().map(|n| n.to_string()))
146            .unwrap_or_else(|| mutation.key.to_string());
147
148        spec.filters.matches(&key)
149    }
150
151    async fn create_frame(
152        &self,
153        spec: &ViewSpec,
154        mutation: &hyperstack_interpreter::Mutation,
155    ) -> anyhow::Result<Frame> {
156        let key = mutation
157            .key
158            .as_str()
159            .map(|s| s.to_string())
160            .or_else(|| mutation.key.as_u64().map(|n| n.to_string()))
161            .or_else(|| mutation.key.as_i64().map(|n| n.to_string()))
162            .unwrap_or_else(|| mutation.key.to_string());
163
164        let projected = spec.projection.apply(mutation.patch.clone());
165
166        Ok(Frame {
167            mode: spec.mode,
168            export: spec.id.clone(),
169            op: "patch",
170            key,
171            data: projected,
172        })
173    }
174
175    async fn publish_frame(
176        &self,
177        spec: &ViewSpec,
178        _mutation: &hyperstack_interpreter::Mutation,
179        message: Arc<BusMessage>,
180    ) {
181        match spec.mode {
182            Mode::State => {
183                self.bus_manager
184                    .publish_state(&spec.id, &message.key, message.payload.clone())
185                    .await;
186            }
187            Mode::List | Mode::Append => {
188                self.bus_manager.publish_list(&spec.id, message).await;
189            }
190        }
191    }
192}