hyperstack_server/
projector.rs

1use crate::bus::{BusManager, BusMessage};
2use crate::view::{ViewIndex, ViewSpec};
3use crate::websocket::frame::{Frame, Mode};
4use bytes::Bytes;
5use smallvec::SmallVec;
6use std::sync::Arc;
7use std::time::Instant;
8use tokio::sync::mpsc;
9use tracing::{debug, error};
10
11#[cfg(feature = "otel")]
12use crate::metrics::Metrics;
13
14pub struct Projector {
15    view_index: Arc<ViewIndex>,
16    bus_manager: BusManager,
17    mutations_rx: mpsc::Receiver<SmallVec<[hyperstack_interpreter::Mutation; 6]>>,
18    #[cfg(feature = "otel")]
19    metrics: Option<Arc<Metrics>>,
20}
21
22impl Projector {
23    #[cfg(feature = "otel")]
24    pub fn new(
25        view_index: Arc<ViewIndex>,
26        bus_manager: BusManager,
27        mutations_rx: mpsc::Receiver<SmallVec<[hyperstack_interpreter::Mutation; 6]>>,
28        metrics: Option<Arc<Metrics>>,
29    ) -> Self {
30        Self {
31            view_index,
32            bus_manager,
33            mutations_rx,
34            metrics,
35        }
36    }
37
38    #[cfg(not(feature = "otel"))]
39    pub fn new(
40        view_index: Arc<ViewIndex>,
41        bus_manager: BusManager,
42        mutations_rx: mpsc::Receiver<SmallVec<[hyperstack_interpreter::Mutation; 6]>>,
43    ) -> Self {
44        Self {
45            view_index,
46            bus_manager,
47            mutations_rx,
48        }
49    }
50
51    pub async fn run(mut self) {
52        debug!("Projector started");
53
54        while let Some(mutations) = self.mutations_rx.recv().await {
55            for mutation in mutations.iter() {
56                let _start = Instant::now();
57
58                if let Err(e) = self.process_mutation(mutation).await {
59                    error!("Failed to process mutation: {}", e);
60                }
61
62                #[cfg(feature = "otel")]
63                if let Some(ref metrics) = self.metrics {
64                    let latency_ms = start.elapsed().as_secs_f64() * 1000.0;
65                    metrics.record_projector_latency(latency_ms);
66                    metrics.record_mutation_processed(&mutation.export);
67                }
68            }
69        }
70
71        debug!("Projector stopped");
72    }
73
74    async fn process_mutation(
75        &self,
76        mutation: &hyperstack_interpreter::Mutation,
77    ) -> anyhow::Result<()> {
78        let specs = self.view_index.by_export(&mutation.export);
79
80        for spec in specs {
81            if !self.should_process(spec, mutation) {
82                continue;
83            }
84
85            let frame = self.create_frame(spec, mutation).await?;
86            let key = mutation
87                .key
88                .as_str()
89                .map(|s| s.to_string())
90                .or_else(|| mutation.key.as_u64().map(|n| n.to_string()))
91                .or_else(|| mutation.key.as_i64().map(|n| n.to_string()))
92                .or_else(|| {
93                    // Handle byte arrays by converting to hex string
94                    mutation.key.as_array().and_then(|arr| {
95                        let bytes: Vec<u8> = arr
96                            .iter()
97                            .filter_map(|v| v.as_u64().map(|n| n as u8))
98                            .collect();
99                        if bytes.len() == arr.len() {
100                            Some(hex::encode(&bytes))
101                        } else {
102                            None
103                        }
104                    })
105                })
106                .unwrap_or_else(|| mutation.key.to_string());
107
108            let message = Arc::new(BusMessage {
109                key: key.clone(),
110                entity: spec.id.clone(),
111                payload: Arc::new(Bytes::from(serde_json::to_vec(&frame)?)),
112            });
113
114            self.publish_frame(spec, mutation, message).await;
115
116            #[cfg(feature = "otel")]
117            if let Some(ref metrics) = self.metrics {
118                let mode_str = match spec.mode {
119                    Mode::Kv => "kv",
120                    Mode::List => "list",
121                    Mode::State => "state",
122                    Mode::Append => "append",
123                };
124                metrics.record_frame_published(mode_str, &spec.export);
125            }
126        }
127
128        Ok(())
129    }
130
131    fn should_process(&self, spec: &ViewSpec, mutation: &hyperstack_interpreter::Mutation) -> bool {
132        let key = mutation
133            .key
134            .as_str()
135            .map(|s| s.to_string())
136            .or_else(|| mutation.key.as_u64().map(|n| n.to_string()))
137            .or_else(|| mutation.key.as_i64().map(|n| n.to_string()))
138            .unwrap_or_else(|| mutation.key.to_string());
139
140        spec.filters.matches(&key)
141    }
142
143    async fn create_frame(
144        &self,
145        spec: &ViewSpec,
146        mutation: &hyperstack_interpreter::Mutation,
147    ) -> anyhow::Result<Frame> {
148        match spec.mode {
149            Mode::Kv | Mode::State => self.create_kv_frame(spec, mutation),
150            Mode::List => self.create_list_frame(spec, mutation),
151            Mode::Append => self.create_append_frame(spec, mutation),
152        }
153    }
154
155    fn create_kv_frame(
156        &self,
157        spec: &ViewSpec,
158        mutation: &hyperstack_interpreter::Mutation,
159    ) -> anyhow::Result<Frame> {
160        let key = mutation
161            .key
162            .as_str()
163            .map(|s| s.to_string())
164            .or_else(|| mutation.key.as_u64().map(|n| n.to_string()))
165            .or_else(|| mutation.key.as_i64().map(|n| n.to_string()))
166            .unwrap_or_else(|| mutation.key.to_string());
167
168        let projected = spec.projection.apply(mutation.patch.clone());
169
170        Ok(Frame {
171            mode: spec.mode,
172            export: spec.id.clone(),
173            op: "patch",
174            key,
175            data: projected,
176        })
177    }
178
179    fn create_list_frame(
180        &self,
181        spec: &ViewSpec,
182        mutation: &hyperstack_interpreter::Mutation,
183    ) -> anyhow::Result<Frame> {
184        let key = mutation
185            .key
186            .as_str()
187            .map(|s| s.to_string())
188            .or_else(|| mutation.key.as_u64().map(|n| n.to_string()))
189            .or_else(|| mutation.key.as_i64().map(|n| n.to_string()))
190            .unwrap_or_else(|| mutation.key.to_string());
191
192        let projected = spec.projection.apply(mutation.patch.clone());
193
194        let list_item = serde_json::json!({
195            "id": key,
196            "order": 0,
197            "item": projected,
198        });
199
200        Ok(Frame {
201            mode: spec.mode,
202            export: spec.id.clone(),
203            op: "patch",
204            key: key.clone(),
205            data: list_item,
206        })
207    }
208
209    fn create_append_frame(
210        &self,
211        spec: &ViewSpec,
212        mutation: &hyperstack_interpreter::Mutation,
213    ) -> anyhow::Result<Frame> {
214        self.create_kv_frame(spec, mutation)
215    }
216
217    async fn publish_frame(
218        &self,
219        spec: &ViewSpec,
220        _mutation: &hyperstack_interpreter::Mutation,
221        message: Arc<BusMessage>,
222    ) {
223        match spec.mode {
224            Mode::State => {
225                self.bus_manager
226                    .publish_state(&spec.id, &message.key, message.payload.clone())
227                    .await;
228            }
229            Mode::Kv | Mode::Append => {
230                self.bus_manager.publish_kv(&spec.id, message).await;
231            }
232            Mode::List => {
233                self.bus_manager.publish_list(&spec.id, message).await;
234            }
235        }
236    }
237}