Skip to main content

sim_lib_server/site/
pipeline.rs

1use sim_kernel::{
2    Args, Cx, Error, EvalMode, EvalRequest, Expr, ReadPolicy, Result, Symbol, Value,
3    read_eval_capability,
4};
5use sim_lib_stream_core::{ClockDomain, LatencyClass, StreamEndpoint, StreamEndpointKind};
6
7use crate::{
8    Connection, FrameKind, ServerAddress, ServerFrame, eval_reply_from_frame,
9    eval_request_from_frame, server_frame_from_reply, server_frame_from_request,
10};
11
12use super::core::{
13    EvalSite, Site, SiteKind, eval_site_clock_domain, eval_site_endpoint_kind,
14    eval_site_latency_class, reply_codec_for_frame, site_endpoint_id,
15};
16
17/// An eval site that threads each request through an ordered chain of
18/// connection steps, feeding each step's reply into the next.
19#[derive(Clone)]
20pub struct PipelineEvalSite {
21    address: ServerAddress,
22    codecs: Vec<Symbol>,
23    steps: Vec<Connection>,
24}
25
26impl PipelineEvalSite {
27    /// Creates a pipeline site answering at `address` over `codecs`, running the
28    /// given ordered `steps`.
29    pub fn new(address: ServerAddress, codecs: Vec<Symbol>, steps: Vec<Connection>) -> Self {
30        Self {
31            address,
32            codecs,
33            steps,
34        }
35    }
36
37    /// Returns the pipeline's ordered connection steps.
38    pub fn steps(&self) -> &[Connection] {
39        &self.steps
40    }
41}
42
43impl EvalSite for PipelineEvalSite {
44    fn site_kind(&self) -> &'static str {
45        "pipeline"
46    }
47
48    fn address(&self) -> &ServerAddress {
49        &self.address
50    }
51
52    fn codecs(&self) -> &[Symbol] {
53        &self.codecs
54    }
55
56    fn answer(&self, cx: &mut Cx, frame: ServerFrame) -> Result<ServerFrame> {
57        if self.steps.is_empty() {
58            return Err(Error::Eval(
59                "pipeline eval site requires at least one step".to_owned(),
60            ));
61        }
62
63        match frame.kind {
64            FrameKind::Request => answer_pipeline_request(cx, self, &self.steps, frame),
65            _ => answer_pipeline_passthrough(cx, &self.steps, frame),
66        }
67    }
68
69    fn as_any(&self) -> &dyn std::any::Any {
70        self
71    }
72}
73
74impl StreamEndpoint for PipelineEvalSite {
75    fn endpoint_id(&self) -> Symbol {
76        site_endpoint_id(SiteKind::Pipeline, &self.address)
77    }
78
79    fn endpoint_kind(&self) -> StreamEndpointKind {
80        eval_site_endpoint_kind()
81    }
82
83    fn clock_domain(&self) -> ClockDomain {
84        eval_site_clock_domain()
85    }
86
87    fn latency_class(&self) -> LatencyClass {
88        eval_site_latency_class()
89    }
90}
91
92impl Site for PipelineEvalSite {
93    fn kind(&self) -> SiteKind {
94        SiteKind::Pipeline
95    }
96}
97
98/// An eval site that repeatedly runs its pipeline steps -- up to
99/// `max_iterations` times -- until the `until` predicate fires on a reply.
100#[derive(Clone)]
101pub struct LoopEvalSite {
102    address: ServerAddress,
103    codecs: Vec<Symbol>,
104    steps: Vec<Connection>,
105    max_iterations: usize,
106    until: Value,
107}
108
109impl LoopEvalSite {
110    /// Creates a loop site answering at `address` over `codecs`, running `steps`
111    /// at most `max_iterations` times or until `until` fires.
112    pub fn new(
113        address: ServerAddress,
114        codecs: Vec<Symbol>,
115        steps: Vec<Connection>,
116        max_iterations: usize,
117        until: Value,
118    ) -> Self {
119        Self {
120            address,
121            codecs,
122            steps,
123            max_iterations,
124            until,
125        }
126    }
127}
128
129impl EvalSite for LoopEvalSite {
130    fn site_kind(&self) -> &'static str {
131        "loop"
132    }
133
134    fn address(&self) -> &ServerAddress {
135        &self.address
136    }
137
138    fn codecs(&self) -> &[Symbol] {
139        &self.codecs
140    }
141
142    fn answer(&self, cx: &mut Cx, frame: ServerFrame) -> Result<ServerFrame> {
143        if self.steps.is_empty() {
144            return Err(Error::Eval(
145                "loop eval site requires at least one step".to_owned(),
146            ));
147        }
148
149        let mut current = frame;
150        for _ in 0..self.max_iterations {
151            current = match current.kind {
152                FrameKind::Request => answer_pipeline_request(cx, self, &self.steps, current)?,
153                FrameKind::Response => {
154                    let request_frame = response_as_request(cx, current)?;
155                    answer_pipeline_request(cx, self, &self.steps, request_frame)?
156                }
157                _ => answer_pipeline_passthrough(cx, &self.steps, current)?,
158            };
159            if loop_until_fired(cx, &self.until, &current)? {
160                return Ok(current);
161            }
162        }
163        Ok(current)
164    }
165
166    fn as_any(&self) -> &dyn std::any::Any {
167        self
168    }
169}
170
171impl StreamEndpoint for LoopEvalSite {
172    fn endpoint_id(&self) -> Symbol {
173        site_endpoint_id(SiteKind::Loop, &self.address)
174    }
175
176    fn endpoint_kind(&self) -> StreamEndpointKind {
177        eval_site_endpoint_kind()
178    }
179
180    fn clock_domain(&self) -> ClockDomain {
181        eval_site_clock_domain()
182    }
183
184    fn latency_class(&self) -> LatencyClass {
185        eval_site_latency_class()
186    }
187}
188
189impl Site for LoopEvalSite {
190    fn kind(&self) -> SiteKind {
191        SiteKind::Loop
192    }
193}
194
195fn answer_pipeline_request(
196    cx: &mut Cx,
197    site: &dyn EvalSite,
198    steps: &[Connection],
199    frame: ServerFrame,
200) -> Result<ServerFrame> {
201    let request = eval_request_from_frame(cx, &frame)?;
202    let mut current_expr = request.expr;
203    let mut last_reply = None;
204    let consistency = frame.envelope.consistency;
205
206    for (index, step) in steps.iter().enumerate() {
207        let step_request = EvalRequest {
208            expr: current_expr,
209            result_shape: None,
210            required_capabilities: frame.envelope.required_capabilities.clone(),
211            deadline: frame.envelope.deadline,
212            consistency,
213            mode: EvalMode::Eval,
214            answer_limit: None,
215            stream_buffer: None,
216            stream: false,
217            trace: frame.envelope.trace,
218        };
219        let mut step_frame = server_frame_from_request(cx, step.default_codec(), step_request)?;
220        step_frame.msg_id = frame.msg_id;
221        step_frame.correlate = frame.correlate;
222        step_frame.envelope.reply_codec_hint = frame.envelope.reply_codec_hint.clone();
223        step_frame.envelope.role = step.role().cloned().or_else(|| frame.envelope.role.clone());
224        step_frame.envelope.trigger_source = frame.envelope.trigger_source.clone();
225        step_frame.envelope.hop = frame.envelope.hop.saturating_add(index as u32 + 1);
226
227        let mut reply_frame = step.site().answer(cx, step_frame)?;
228        reply_frame.envelope.role = step.role().cloned().or(reply_frame.envelope.role);
229        reply_frame.envelope.hop = frame.envelope.hop.saturating_add(index as u32 + 1);
230        let reply = eval_reply_from_frame(cx, &reply_frame)?;
231        current_expr = reply.value.object().as_expr(cx)?;
232        last_reply = Some(reply);
233    }
234
235    let mut final_frame = server_frame_from_reply(
236        cx,
237        &reply_codec_for_frame(site, &frame),
238        last_reply.expect("pipeline steps are non-empty"),
239        consistency,
240    )?;
241    final_frame.msg_id = frame.msg_id;
242    final_frame.correlate = frame.correlate;
243    final_frame.envelope.reply_codec_hint = frame.envelope.reply_codec_hint;
244    final_frame.envelope.role = steps
245        .last()
246        .and_then(|step| step.role().cloned())
247        .or(frame.envelope.role);
248    final_frame.envelope.trigger_source = frame.envelope.trigger_source;
249    final_frame.envelope.hop = frame.envelope.hop.saturating_add(steps.len() as u32);
250    Ok(final_frame)
251}
252
253fn answer_pipeline_passthrough(
254    cx: &mut Cx,
255    steps: &[Connection],
256    frame: ServerFrame,
257) -> Result<ServerFrame> {
258    let mut current = frame;
259    for (index, step) in steps.iter().enumerate() {
260        current.codec = step.default_codec().clone();
261        current.envelope.role = step
262            .role()
263            .cloned()
264            .or_else(|| current.envelope.role.clone());
265        current.envelope.hop = current.envelope.hop.saturating_add(1);
266        current = step.site().answer(cx, current)?;
267        current.envelope.hop = index as u32 + 1;
268    }
269    Ok(current)
270}
271
272fn loop_until_fired(cx: &mut Cx, until: &Value, frame: &ServerFrame) -> Result<bool> {
273    let value = match frame.kind {
274        FrameKind::Response => eval_reply_from_frame(cx, frame)?.value,
275        _ => {
276            let expr = frame.decode_expr(cx, ReadPolicy::default())?;
277            cx.factory().expr(expr)?
278        }
279    };
280    let fired = cx.call_value(until.clone(), Args::new(vec![value]))?;
281    fired.object().truth(cx)
282}
283
284fn response_as_request(cx: &mut Cx, frame: ServerFrame) -> Result<ServerFrame> {
285    let reply = eval_reply_from_frame(cx, &frame)?;
286    let request = EvalRequest {
287        expr: reply.value.object().as_expr(cx)?,
288        result_shape: None,
289        required_capabilities: frame.envelope.required_capabilities.clone(),
290        deadline: frame.envelope.deadline,
291        consistency: frame.envelope.consistency,
292        mode: EvalMode::Eval,
293        answer_limit: None,
294        stream_buffer: None,
295        stream: false,
296        trace: frame.envelope.trace,
297    };
298    let mut request_frame = server_frame_from_request(cx, &frame.codec, request)?;
299    request_frame.msg_id = frame.msg_id;
300    request_frame.correlate = frame.correlate;
301    request_frame.envelope.reply_codec_hint = frame.envelope.reply_codec_hint;
302    request_frame.envelope.role = frame.envelope.role;
303    request_frame.envelope.hop = frame.envelope.hop;
304    request_frame.envelope.trigger_source = frame.envelope.trigger_source;
305    Ok(request_frame)
306}
307
308pub(crate) fn eval_request_from_trigger_frame(
309    cx: &mut Cx,
310    frame: &ServerFrame,
311) -> Result<EvalRequest> {
312    let expr = frame.decode_expr(cx, ReadPolicy::default())?;
313    enforce_trigger_eval_policy(cx, &expr)?;
314    Ok(EvalRequest {
315        expr,
316        result_shape: None,
317        required_capabilities: frame.envelope.required_capabilities.clone(),
318        deadline: frame.envelope.deadline,
319        consistency: frame.envelope.consistency,
320        mode: EvalMode::Eval,
321        answer_limit: None,
322        stream_buffer: None,
323        stream: false,
324        trace: frame.envelope.trace,
325    })
326}
327
328pub(crate) fn enforce_trigger_eval_policy(cx: &Cx, expr: &Expr) -> Result<()> {
329    if is_self_evaluating_trigger_expr(expr) {
330        cx.require(&read_eval_capability())?;
331    }
332    Ok(())
333}
334
335fn is_self_evaluating_trigger_expr(expr: &Expr) -> bool {
336    match expr {
337        Expr::Nil
338        | Expr::Bool(_)
339        | Expr::Number(_)
340        | Expr::Symbol(_)
341        | Expr::Local(_)
342        | Expr::String(_)
343        | Expr::Bytes(_)
344        | Expr::List(_)
345        | Expr::Vector(_)
346        | Expr::Map(_)
347        | Expr::Set(_)
348        | Expr::Quote { .. } => true,
349        Expr::Block(_)
350        | Expr::Call { .. }
351        | Expr::Infix { .. }
352        | Expr::Prefix { .. }
353        | Expr::Postfix { .. }
354        | Expr::Annotated { .. }
355        | Expr::Extension { .. } => false,
356    }
357}