hyperstack_server/
projector.rs

1use crate::bus::{BusManager, BusMessage};
2use crate::view::{ViewIndex, ViewSpec};
3use crate::websocket::frame::{Frame, Mode};
4use bytes::Bytes;
5use smallvec::SmallVec;
6use std::sync::Arc;
7#[cfg(feature = "otel")]
8use std::time::Instant;
9use tokio::sync::mpsc;
10use tracing::{debug, error};
11
12#[cfg(feature = "otel")]
13use crate::metrics::Metrics;
14
15pub struct Projector {
16    view_index: Arc<ViewIndex>,
17    bus_manager: BusManager,
18    mutations_rx: mpsc::Receiver<SmallVec<[hyperstack_interpreter::Mutation; 6]>>,
19    #[cfg(feature = "otel")]
20    metrics: Option<Arc<Metrics>>,
21}
22
23impl Projector {
24    #[cfg(feature = "otel")]
25    pub fn new(
26        view_index: Arc<ViewIndex>,
27        bus_manager: BusManager,
28        mutations_rx: mpsc::Receiver<SmallVec<[hyperstack_interpreter::Mutation; 6]>>,
29        metrics: Option<Arc<Metrics>>,
30    ) -> Self {
31        Self {
32            view_index,
33            bus_manager,
34            mutations_rx,
35            metrics,
36        }
37    }
38
39    #[cfg(not(feature = "otel"))]
40    pub fn new(
41        view_index: Arc<ViewIndex>,
42        bus_manager: BusManager,
43        mutations_rx: mpsc::Receiver<SmallVec<[hyperstack_interpreter::Mutation; 6]>>,
44    ) -> Self {
45        Self {
46            view_index,
47            bus_manager,
48            mutations_rx,
49        }
50    }
51
52    pub async fn run(mut self) {
53        debug!("Projector started");
54
55        while let Some(mutations) = self.mutations_rx.recv().await {
56            for mutation in mutations.iter() {
57                #[cfg(feature = "otel")]
58                let start = Instant::now();
59
60                if let Err(e) = self.process_mutation(mutation).await {
61                    error!("Failed to process mutation: {}", e);
62                }
63
64                #[cfg(feature = "otel")]
65                if let Some(ref metrics) = self.metrics {
66                    let latency_ms = start.elapsed().as_secs_f64() * 1000.0;
67                    metrics.record_projector_latency(latency_ms);
68                    metrics.record_mutation_processed(&mutation.export);
69                }
70            }
71        }
72
73        debug!("Projector stopped");
74    }
75
76    async fn process_mutation(
77        &self,
78        mutation: &hyperstack_interpreter::Mutation,
79    ) -> anyhow::Result<()> {
80        let specs = self.view_index.by_export(&mutation.export);
81
82        for spec in specs {
83            if !self.should_process(spec, mutation) {
84                continue;
85            }
86
87            let frame = self.create_frame(spec, mutation).await?;
88            let key = mutation
89                .key
90                .as_str()
91                .map(|s| s.to_string())
92                .or_else(|| mutation.key.as_u64().map(|n| n.to_string()))
93                .or_else(|| mutation.key.as_i64().map(|n| n.to_string()))
94                .or_else(|| {
95                    // Handle byte arrays by converting to hex string
96                    mutation.key.as_array().and_then(|arr| {
97                        let bytes: Vec<u8> = arr
98                            .iter()
99                            .filter_map(|v| v.as_u64().map(|n| n as u8))
100                            .collect();
101                        if bytes.len() == arr.len() {
102                            Some(hex::encode(&bytes))
103                        } else {
104                            None
105                        }
106                    })
107                })
108                .unwrap_or_else(|| mutation.key.to_string());
109
110            let message = Arc::new(BusMessage {
111                key: key.clone(),
112                entity: spec.id.clone(),
113                payload: Arc::new(Bytes::from(serde_json::to_vec(&frame)?)),
114            });
115
116            self.publish_frame(spec, mutation, message).await;
117
118            #[cfg(feature = "otel")]
119            if let Some(ref metrics) = self.metrics {
120                let mode_str = match spec.mode {
121                    Mode::Kv => "kv",
122                    Mode::List => "list",
123                    Mode::State => "state",
124                    Mode::Append => "append",
125                };
126                metrics.record_frame_published(mode_str, &spec.export);
127            }
128        }
129
130        Ok(())
131    }
132
133    fn should_process(&self, spec: &ViewSpec, mutation: &hyperstack_interpreter::Mutation) -> bool {
134        let key = mutation
135            .key
136            .as_str()
137            .map(|s| s.to_string())
138            .or_else(|| mutation.key.as_u64().map(|n| n.to_string()))
139            .or_else(|| mutation.key.as_i64().map(|n| n.to_string()))
140            .unwrap_or_else(|| mutation.key.to_string());
141
142        spec.filters.matches(&key)
143    }
144
145    async fn create_frame(
146        &self,
147        spec: &ViewSpec,
148        mutation: &hyperstack_interpreter::Mutation,
149    ) -> anyhow::Result<Frame> {
150        match spec.mode {
151            Mode::Kv | Mode::State => self.create_kv_frame(spec, mutation),
152            Mode::List => self.create_list_frame(spec, mutation),
153            Mode::Append => self.create_append_frame(spec, mutation),
154        }
155    }
156
157    fn create_kv_frame(
158        &self,
159        spec: &ViewSpec,
160        mutation: &hyperstack_interpreter::Mutation,
161    ) -> anyhow::Result<Frame> {
162        let key = mutation
163            .key
164            .as_str()
165            .map(|s| s.to_string())
166            .or_else(|| mutation.key.as_u64().map(|n| n.to_string()))
167            .or_else(|| mutation.key.as_i64().map(|n| n.to_string()))
168            .unwrap_or_else(|| mutation.key.to_string());
169
170        let projected = spec.projection.apply(mutation.patch.clone());
171
172        Ok(Frame {
173            mode: spec.mode,
174            export: spec.id.clone(),
175            op: "patch",
176            key,
177            data: projected,
178        })
179    }
180
181    fn create_list_frame(
182        &self,
183        spec: &ViewSpec,
184        mutation: &hyperstack_interpreter::Mutation,
185    ) -> anyhow::Result<Frame> {
186        let key = mutation
187            .key
188            .as_str()
189            .map(|s| s.to_string())
190            .or_else(|| mutation.key.as_u64().map(|n| n.to_string()))
191            .or_else(|| mutation.key.as_i64().map(|n| n.to_string()))
192            .unwrap_or_else(|| mutation.key.to_string());
193
194        let projected = spec.projection.apply(mutation.patch.clone());
195
196        let list_item = serde_json::json!({
197            "id": key,
198            "order": 0,
199            "item": projected,
200        });
201
202        Ok(Frame {
203            mode: spec.mode,
204            export: spec.id.clone(),
205            op: "patch",
206            key: key.clone(),
207            data: list_item,
208        })
209    }
210
211    fn create_append_frame(
212        &self,
213        spec: &ViewSpec,
214        mutation: &hyperstack_interpreter::Mutation,
215    ) -> anyhow::Result<Frame> {
216        self.create_kv_frame(spec, mutation)
217    }
218
219    async fn publish_frame(
220        &self,
221        spec: &ViewSpec,
222        _mutation: &hyperstack_interpreter::Mutation,
223        message: Arc<BusMessage>,
224    ) {
225        match spec.mode {
226            Mode::State => {
227                self.bus_manager
228                    .publish_state(&spec.id, &message.key, message.payload.clone())
229                    .await;
230            }
231            Mode::Kv | Mode::Append => {
232                self.bus_manager.publish_kv(&spec.id, message).await;
233            }
234            Mode::List => {
235                self.bus_manager.publish_list(&spec.id, message).await;
236            }
237        }
238    }
239}