1use sim_kernel::{
2 Args, Cx, Error, EvalMode, EvalRequest, Expr, ReadPolicy, Result, Symbol, Value,
3 read_eval_capability,
4};
5use sim_lib_stream_core::{ClockDomain, LatencyClass, StreamEndpoint, StreamEndpointKind};
6
7use crate::{
8 Connection, FrameKind, ServerAddress, ServerFrame, eval_reply_from_frame,
9 eval_request_from_frame, server_frame_from_reply, server_frame_from_request,
10};
11
12use super::core::{
13 EvalSite, Site, SiteKind, eval_site_clock_domain, eval_site_endpoint_kind,
14 eval_site_latency_class, reply_codec_for_frame, site_endpoint_id,
15};
16
/// Eval site that answers a frame by sending it through an ordered list of
/// step connections.
#[derive(Clone)]
pub struct PipelineEvalSite {
    /// Address returned by `EvalSite::address`.
    address: ServerAddress,
    /// Codec symbols returned by `EvalSite::codecs`.
    codecs: Vec<Symbol>,
    /// Step connections, visited front to back when answering a frame.
    steps: Vec<Connection>,
}
25
26impl PipelineEvalSite {
27 pub fn new(address: ServerAddress, codecs: Vec<Symbol>, steps: Vec<Connection>) -> Self {
30 Self {
31 address,
32 codecs,
33 steps,
34 }
35 }
36
37 pub fn steps(&self) -> &[Connection] {
39 &self.steps
40 }
41}
42
43impl EvalSite for PipelineEvalSite {
44 fn site_kind(&self) -> &'static str {
45 "pipeline"
46 }
47
48 fn address(&self) -> &ServerAddress {
49 &self.address
50 }
51
52 fn codecs(&self) -> &[Symbol] {
53 &self.codecs
54 }
55
56 fn answer(&self, cx: &mut Cx, frame: ServerFrame) -> Result<ServerFrame> {
57 if self.steps.is_empty() {
58 return Err(Error::Eval(
59 "pipeline eval site requires at least one step".to_owned(),
60 ));
61 }
62
63 match frame.kind {
64 FrameKind::Request => answer_pipeline_request(cx, self, &self.steps, frame),
65 _ => answer_pipeline_passthrough(cx, &self.steps, frame),
66 }
67 }
68
69 fn as_any(&self) -> &dyn std::any::Any {
70 self
71 }
72}
73
// Stream-endpoint metadata for the pipeline site. Only the endpoint id is
// specific to this site; the other attributes come from the shared eval-site
// helpers.
impl StreamEndpoint for PipelineEvalSite {
    /// Endpoint id built by `site_endpoint_id` from `SiteKind::Pipeline` and
    /// this site's address.
    fn endpoint_id(&self) -> Symbol {
        site_endpoint_id(SiteKind::Pipeline, &self.address)
    }

    /// Endpoint kind supplied by `eval_site_endpoint_kind`.
    fn endpoint_kind(&self) -> StreamEndpointKind {
        eval_site_endpoint_kind()
    }

    /// Clock domain supplied by `eval_site_clock_domain`.
    fn clock_domain(&self) -> ClockDomain {
        eval_site_clock_domain()
    }

    /// Latency class supplied by `eval_site_latency_class`.
    fn latency_class(&self) -> LatencyClass {
        eval_site_latency_class()
    }
}
91
impl Site for PipelineEvalSite {
    /// Always reports `SiteKind::Pipeline`.
    fn kind(&self) -> SiteKind {
        SiteKind::Pipeline
    }
}
97
/// Eval site that repeatedly runs a frame through an ordered list of step
/// connections until a stop predicate fires or an iteration budget runs out.
#[derive(Clone)]
pub struct LoopEvalSite {
    /// Address returned by `EvalSite::address`.
    address: ServerAddress,
    /// Codec symbols returned by `EvalSite::codecs`.
    codecs: Vec<Symbol>,
    /// Step connections run, in order, on every iteration.
    steps: Vec<Connection>,
    /// Upper bound on the number of passes through `steps`.
    max_iterations: usize,
    /// Value called with the latest result after each pass; a truthy return
    /// ends the loop.
    until: Value,
}
108
109impl LoopEvalSite {
110 pub fn new(
113 address: ServerAddress,
114 codecs: Vec<Symbol>,
115 steps: Vec<Connection>,
116 max_iterations: usize,
117 until: Value,
118 ) -> Self {
119 Self {
120 address,
121 codecs,
122 steps,
123 max_iterations,
124 until,
125 }
126 }
127}
128
129impl EvalSite for LoopEvalSite {
130 fn site_kind(&self) -> &'static str {
131 "loop"
132 }
133
134 fn address(&self) -> &ServerAddress {
135 &self.address
136 }
137
138 fn codecs(&self) -> &[Symbol] {
139 &self.codecs
140 }
141
142 fn answer(&self, cx: &mut Cx, frame: ServerFrame) -> Result<ServerFrame> {
143 if self.steps.is_empty() {
144 return Err(Error::Eval(
145 "loop eval site requires at least one step".to_owned(),
146 ));
147 }
148
149 let mut current = frame;
150 for _ in 0..self.max_iterations {
151 current = match current.kind {
152 FrameKind::Request => answer_pipeline_request(cx, self, &self.steps, current)?,
153 FrameKind::Response => {
154 let request_frame = response_as_request(cx, current)?;
155 answer_pipeline_request(cx, self, &self.steps, request_frame)?
156 }
157 _ => answer_pipeline_passthrough(cx, &self.steps, current)?,
158 };
159 if loop_until_fired(cx, &self.until, ¤t)? {
160 return Ok(current);
161 }
162 }
163 Ok(current)
164 }
165
166 fn as_any(&self) -> &dyn std::any::Any {
167 self
168 }
169}
170
// Stream-endpoint metadata for the loop site. Only the endpoint id is
// specific to this site; the other attributes come from the shared eval-site
// helpers.
impl StreamEndpoint for LoopEvalSite {
    /// Endpoint id built by `site_endpoint_id` from `SiteKind::Loop` and this
    /// site's address.
    fn endpoint_id(&self) -> Symbol {
        site_endpoint_id(SiteKind::Loop, &self.address)
    }

    /// Endpoint kind supplied by `eval_site_endpoint_kind`.
    fn endpoint_kind(&self) -> StreamEndpointKind {
        eval_site_endpoint_kind()
    }

    /// Clock domain supplied by `eval_site_clock_domain`.
    fn clock_domain(&self) -> ClockDomain {
        eval_site_clock_domain()
    }

    /// Latency class supplied by `eval_site_latency_class`.
    fn latency_class(&self) -> LatencyClass {
        eval_site_latency_class()
    }
}
188
impl Site for LoopEvalSite {
    /// Always reports `SiteKind::Loop`.
    fn kind(&self) -> SiteKind {
        SiteKind::Loop
    }
}
194
195fn answer_pipeline_request(
196 cx: &mut Cx,
197 site: &dyn EvalSite,
198 steps: &[Connection],
199 frame: ServerFrame,
200) -> Result<ServerFrame> {
201 let request = eval_request_from_frame(cx, &frame)?;
202 let mut current_expr = request.expr;
203 let mut last_reply = None;
204 let consistency = frame.envelope.consistency;
205
206 for (index, step) in steps.iter().enumerate() {
207 let step_request = EvalRequest {
208 expr: current_expr,
209 result_shape: None,
210 required_capabilities: frame.envelope.required_capabilities.clone(),
211 deadline: frame.envelope.deadline,
212 consistency,
213 mode: EvalMode::Eval,
214 answer_limit: None,
215 stream_buffer: None,
216 stream: false,
217 trace: frame.envelope.trace,
218 };
219 let mut step_frame = server_frame_from_request(cx, step.default_codec(), step_request)?;
220 step_frame.msg_id = frame.msg_id;
221 step_frame.correlate = frame.correlate;
222 step_frame.envelope.reply_codec_hint = frame.envelope.reply_codec_hint.clone();
223 step_frame.envelope.role = step.role().cloned().or_else(|| frame.envelope.role.clone());
224 step_frame.envelope.trigger_source = frame.envelope.trigger_source.clone();
225 step_frame.envelope.hop = frame.envelope.hop.saturating_add(index as u32 + 1);
226
227 let mut reply_frame = step.site().answer(cx, step_frame)?;
228 reply_frame.envelope.role = step.role().cloned().or(reply_frame.envelope.role);
229 reply_frame.envelope.hop = frame.envelope.hop.saturating_add(index as u32 + 1);
230 let reply = eval_reply_from_frame(cx, &reply_frame)?;
231 current_expr = reply.value.object().as_expr(cx)?;
232 last_reply = Some(reply);
233 }
234
235 let mut final_frame = server_frame_from_reply(
236 cx,
237 &reply_codec_for_frame(site, &frame),
238 last_reply.expect("pipeline steps are non-empty"),
239 consistency,
240 )?;
241 final_frame.msg_id = frame.msg_id;
242 final_frame.correlate = frame.correlate;
243 final_frame.envelope.reply_codec_hint = frame.envelope.reply_codec_hint;
244 final_frame.envelope.role = steps
245 .last()
246 .and_then(|step| step.role().cloned())
247 .or(frame.envelope.role);
248 final_frame.envelope.trigger_source = frame.envelope.trigger_source;
249 final_frame.envelope.hop = frame.envelope.hop.saturating_add(steps.len() as u32);
250 Ok(final_frame)
251}
252
253fn answer_pipeline_passthrough(
254 cx: &mut Cx,
255 steps: &[Connection],
256 frame: ServerFrame,
257) -> Result<ServerFrame> {
258 let mut current = frame;
259 for (index, step) in steps.iter().enumerate() {
260 current.codec = step.default_codec().clone();
261 current.envelope.role = step
262 .role()
263 .cloned()
264 .or_else(|| current.envelope.role.clone());
265 current.envelope.hop = current.envelope.hop.saturating_add(1);
266 current = step.site().answer(cx, current)?;
267 current.envelope.hop = index as u32 + 1;
268 }
269 Ok(current)
270}
271
272fn loop_until_fired(cx: &mut Cx, until: &Value, frame: &ServerFrame) -> Result<bool> {
273 let value = match frame.kind {
274 FrameKind::Response => eval_reply_from_frame(cx, frame)?.value,
275 _ => {
276 let expr = frame.decode_expr(cx, ReadPolicy::default())?;
277 cx.factory().expr(expr)?
278 }
279 };
280 let fired = cx.call_value(until.clone(), Args::new(vec![value]))?;
281 fired.object().truth(cx)
282}
283
284fn response_as_request(cx: &mut Cx, frame: ServerFrame) -> Result<ServerFrame> {
285 let reply = eval_reply_from_frame(cx, &frame)?;
286 let request = EvalRequest {
287 expr: reply.value.object().as_expr(cx)?,
288 result_shape: None,
289 required_capabilities: frame.envelope.required_capabilities.clone(),
290 deadline: frame.envelope.deadline,
291 consistency: frame.envelope.consistency,
292 mode: EvalMode::Eval,
293 answer_limit: None,
294 stream_buffer: None,
295 stream: false,
296 trace: frame.envelope.trace,
297 };
298 let mut request_frame = server_frame_from_request(cx, &frame.codec, request)?;
299 request_frame.msg_id = frame.msg_id;
300 request_frame.correlate = frame.correlate;
301 request_frame.envelope.reply_codec_hint = frame.envelope.reply_codec_hint;
302 request_frame.envelope.role = frame.envelope.role;
303 request_frame.envelope.hop = frame.envelope.hop;
304 request_frame.envelope.trigger_source = frame.envelope.trigger_source;
305 Ok(request_frame)
306}
307
308pub(crate) fn eval_request_from_trigger_frame(
309 cx: &mut Cx,
310 frame: &ServerFrame,
311) -> Result<EvalRequest> {
312 let expr = frame.decode_expr(cx, ReadPolicy::default())?;
313 enforce_trigger_eval_policy(cx, &expr)?;
314 Ok(EvalRequest {
315 expr,
316 result_shape: None,
317 required_capabilities: frame.envelope.required_capabilities.clone(),
318 deadline: frame.envelope.deadline,
319 consistency: frame.envelope.consistency,
320 mode: EvalMode::Eval,
321 answer_limit: None,
322 stream_buffer: None,
323 stream: false,
324 trace: frame.envelope.trace,
325 })
326}
327
328pub(crate) fn enforce_trigger_eval_policy(cx: &Cx, expr: &Expr) -> Result<()> {
329 if is_self_evaluating_trigger_expr(expr) {
330 cx.require(&read_eval_capability())?;
331 }
332 Ok(())
333}
334
335fn is_self_evaluating_trigger_expr(expr: &Expr) -> bool {
336 match expr {
337 Expr::Nil
338 | Expr::Bool(_)
339 | Expr::Number(_)
340 | Expr::Symbol(_)
341 | Expr::Local(_)
342 | Expr::String(_)
343 | Expr::Bytes(_)
344 | Expr::List(_)
345 | Expr::Vector(_)
346 | Expr::Map(_)
347 | Expr::Set(_)
348 | Expr::Quote { .. } => true,
349 Expr::Block(_)
350 | Expr::Call { .. }
351 | Expr::Infix { .. }
352 | Expr::Prefix { .. }
353 | Expr::Postfix { .. }
354 | Expr::Annotated { .. }
355 | Expr::Extension { .. } => false,
356 }
357}