Skip to main content

sim_lib_server/site/
single.rs

1use std::sync::Arc;
2
3use sim_kernel::{Cx, Error, EvalFabric, EvalReply, ReadPolicy, Result, Symbol};
4use sim_lib_stream_core::{ClockDomain, LatencyClass, StreamEndpoint, StreamEndpointKind};
5
6use crate::{
7    Coroutine, FrameKind, ServerAddress, ServerFrame, decode_frame_payload,
8    eval_request_from_frame, server_frame_from_reply,
9};
10
11use super::core::{
12    EvalSite, Site, SiteKind, eval_site_clock_domain, eval_site_endpoint_kind,
13    eval_site_latency_class, reply_codec_for_frame, site_endpoint_id,
14};
15use super::pipeline::{enforce_trigger_eval_policy, eval_request_from_trigger_frame};
16
17/// An eval site that evaluates requests directly in the local runtime.
18#[derive(Clone)]
19pub struct LocalEvalSite {
20    address: ServerAddress,
21    codecs: Vec<Symbol>,
22}
23
24impl LocalEvalSite {
25    /// Creates a local site answering at `address` over `codecs`.
26    pub fn new(address: ServerAddress, codecs: Vec<Symbol>) -> Self {
27        Self { address, codecs }
28    }
29}
30
31impl EvalSite for LocalEvalSite {
32    fn site_kind(&self) -> &'static str {
33        "local"
34    }
35
36    fn address(&self) -> &ServerAddress {
37        &self.address
38    }
39
40    fn codecs(&self) -> &[Symbol] {
41        &self.codecs
42    }
43
44    fn answer(&self, cx: &mut Cx, frame: ServerFrame) -> Result<ServerFrame> {
45        match frame.kind {
46            FrameKind::Request => {
47                let consistency = frame.envelope.consistency;
48                let reply_codec = reply_codec_for_frame(self, &frame);
49                let request = eval_request_from_frame(cx, &frame)?;
50                let reply = realize_locally(cx, request)?;
51                server_frame_from_reply(cx, &reply_codec, reply, consistency)
52            }
53            FrameKind::Trigger { .. } => {
54                realize_trigger_locally(cx, &frame)?;
55                Ok(frame)
56            }
57            FrameKind::Notify => {
58                realize_notify_locally(cx, &frame)?;
59                Ok(frame)
60            }
61            _ => Err(Error::Eval(format!(
62                "local eval site cannot answer frame kind {}",
63                frame.kind.as_symbol()
64            ))),
65        }
66    }
67
68    fn as_any(&self) -> &dyn std::any::Any {
69        self
70    }
71}
72
73impl StreamEndpoint for LocalEvalSite {
74    fn endpoint_id(&self) -> Symbol {
75        site_endpoint_id(SiteKind::Local, &self.address)
76    }
77
78    fn endpoint_kind(&self) -> StreamEndpointKind {
79        eval_site_endpoint_kind()
80    }
81
82    fn clock_domain(&self) -> ClockDomain {
83        eval_site_clock_domain()
84    }
85
86    fn latency_class(&self) -> LatencyClass {
87        eval_site_latency_class()
88    }
89}
90
91impl Site for LocalEvalSite {
92    fn kind(&self) -> SiteKind {
93        SiteKind::Local
94    }
95}
96
97/// An eval site that delegates requests to a distributed [`EvalFabric`].
98#[derive(Clone)]
99pub struct FabricEvalSite {
100    kind: &'static str,
101    address: ServerAddress,
102    codecs: Vec<Symbol>,
103    fabric: Arc<dyn EvalFabric>,
104}
105
106impl FabricEvalSite {
107    /// Creates a fabric site labeled `kind`, answering at `address` over
108    /// `codecs`, backed by `fabric`.
109    pub fn new(
110        kind: &'static str,
111        address: ServerAddress,
112        codecs: Vec<Symbol>,
113        fabric: Arc<dyn EvalFabric>,
114    ) -> Self {
115        Self {
116            kind,
117            address,
118            codecs,
119            fabric,
120        }
121    }
122}
123
124impl EvalSite for FabricEvalSite {
125    fn site_kind(&self) -> &'static str {
126        self.kind
127    }
128
129    fn address(&self) -> &ServerAddress {
130        &self.address
131    }
132
133    fn codecs(&self) -> &[Symbol] {
134        &self.codecs
135    }
136
137    fn answer(&self, cx: &mut Cx, frame: ServerFrame) -> Result<ServerFrame> {
138        match frame.kind {
139            FrameKind::Trigger { .. } => {
140                let request = eval_request_from_trigger_frame(cx, &frame)?;
141                let _ = self.fabric.realize(cx, request)?;
142                Ok(frame)
143            }
144            _ => {
145                let consistency = frame.envelope.consistency;
146                let reply_codec = reply_codec_for_frame(self, &frame);
147                let request = eval_request_from_frame(cx, &frame)?;
148                let reply = self.fabric.realize(cx, request)?;
149                server_frame_from_reply(cx, &reply_codec, reply, consistency)
150            }
151        }
152    }
153
154    fn as_eval_fabric(&self) -> Option<&dyn EvalFabric> {
155        Some(self.fabric.as_ref())
156    }
157
158    fn as_any(&self) -> &dyn std::any::Any {
159        self
160    }
161}
162
163impl StreamEndpoint for FabricEvalSite {
164    fn endpoint_id(&self) -> Symbol {
165        site_endpoint_id(SiteKind::Fabric, &self.address)
166    }
167
168    fn endpoint_kind(&self) -> StreamEndpointKind {
169        eval_site_endpoint_kind()
170    }
171
172    fn clock_domain(&self) -> ClockDomain {
173        eval_site_clock_domain()
174    }
175
176    fn latency_class(&self) -> LatencyClass {
177        eval_site_latency_class()
178    }
179}
180
181impl Site for FabricEvalSite {
182    fn kind(&self) -> SiteKind {
183        SiteKind::Fabric
184    }
185}
186
187/// An eval site that resumes a [`Coroutine`] with each request's expression.
188#[derive(Clone)]
189pub struct CoroutineEvalSite {
190    address: ServerAddress,
191    codecs: Vec<Symbol>,
192    coroutine: Arc<Coroutine>,
193}
194
195impl CoroutineEvalSite {
196    /// Creates a coroutine site answering at `address` over `codecs`, resuming
197    /// `coroutine`.
198    pub fn new(address: ServerAddress, codecs: Vec<Symbol>, coroutine: Arc<Coroutine>) -> Self {
199        Self {
200            address,
201            codecs,
202            coroutine,
203        }
204    }
205
206    /// Returns the coroutine this site resumes.
207    pub fn coroutine(&self) -> &Arc<Coroutine> {
208        &self.coroutine
209    }
210}
211
212impl EvalSite for CoroutineEvalSite {
213    fn site_kind(&self) -> &'static str {
214        "coroutine"
215    }
216
217    fn address(&self) -> &ServerAddress {
218        &self.address
219    }
220
221    fn codecs(&self) -> &[Symbol] {
222        &self.codecs
223    }
224
225    fn answer(&self, cx: &mut Cx, frame: ServerFrame) -> Result<ServerFrame> {
226        match frame.kind {
227            FrameKind::Request => {
228                let consistency = frame.envelope.consistency;
229                let reply_codec = reply_codec_for_frame(self, &frame);
230                let request = eval_request_from_frame(cx, &frame)?;
231                let input = cx.factory().expr(request.expr)?;
232                let reply = self.coroutine.resume(cx, input)?;
233                let diagnostics = cx.take_diagnostics();
234                server_frame_from_reply(
235                    cx,
236                    &reply_codec,
237                    EvalReply {
238                        value: reply,
239                        diagnostics,
240                        trace: None,
241                    },
242                    consistency,
243                )
244            }
245            FrameKind::Trigger { .. } => {
246                let expr = frame.decode_expr(cx, ReadPolicy::default())?;
247                enforce_trigger_eval_policy(cx, &expr)?;
248                let input = cx.factory().expr(expr)?;
249                let _ = self.coroutine.resume(cx, input)?;
250                Ok(frame)
251            }
252            FrameKind::Notify => {
253                let expr = decode_frame_payload(
254                    cx,
255                    &frame.codec,
256                    &frame.payload,
257                    ReadPolicy::default(),
258                    Default::default(),
259                )?;
260                let input = cx.factory().expr(expr)?;
261                let _ = self.coroutine.resume(cx, input)?;
262                Ok(frame)
263            }
264            _ => Err(Error::Eval(format!(
265                "coroutine eval site cannot answer frame kind {}",
266                frame.kind.as_symbol()
267            ))),
268        }
269    }
270
271    fn as_any(&self) -> &dyn std::any::Any {
272        self
273    }
274}
275
276impl StreamEndpoint for CoroutineEvalSite {
277    fn endpoint_id(&self) -> Symbol {
278        site_endpoint_id(SiteKind::Coroutine, &self.address)
279    }
280
281    fn endpoint_kind(&self) -> StreamEndpointKind {
282        eval_site_endpoint_kind()
283    }
284
285    fn clock_domain(&self) -> ClockDomain {
286        eval_site_clock_domain()
287    }
288
289    fn latency_class(&self) -> LatencyClass {
290        eval_site_latency_class()
291    }
292}
293
294impl Site for CoroutineEvalSite {
295    fn kind(&self) -> SiteKind {
296        SiteKind::Coroutine
297    }
298}
299
300fn realize_locally(cx: &mut Cx, request: sim_kernel::EvalRequest) -> Result<EvalReply> {
301    for capability in &request.required_capabilities {
302        cx.require(capability)?;
303    }
304    let value = cx.eval_expr(request.expr)?;
305    if let Some(shape) = &request.result_shape {
306        let Some(shape_object) = shape.object().as_shape() else {
307            return Err(Error::TypeMismatch {
308                expected: "shape",
309                found: "non-shape",
310            });
311        };
312        let matched = shape_object.check_value(cx, value.clone())?;
313        if !matched.accepted {
314            return Err(Error::WrongShape {
315                expected: shape_object.id().unwrap_or(sim_kernel::ShapeId(0)),
316                diagnostics: matched.diagnostics,
317            });
318        }
319    }
320    Ok(EvalReply {
321        value,
322        diagnostics: cx.take_diagnostics(),
323        trace: request
324            .trace
325            .then(|| cx.factory().symbol(Symbol::new("local")).ok())
326            .flatten(),
327    })
328}
329
330fn realize_notify_locally(cx: &mut Cx, frame: &ServerFrame) -> Result<()> {
331    for capability in &frame.envelope.required_capabilities {
332        cx.require(capability)?;
333    }
334    let expr = decode_frame_payload(
335        cx,
336        &frame.codec,
337        &frame.payload,
338        ReadPolicy::default(),
339        Default::default(),
340    )?;
341    cx.eval_expr(expr)?;
342    Ok(())
343}
344
345fn realize_trigger_locally(cx: &mut Cx, frame: &ServerFrame) -> Result<()> {
346    for capability in &frame.envelope.required_capabilities {
347        cx.require(capability)?;
348    }
349    let expr = frame.decode_expr(cx, ReadPolicy::default())?;
350    enforce_trigger_eval_policy(cx, &expr)?;
351    cx.eval_expr(expr)?;
352    Ok(())
353}