1use std::sync::Arc;
2
3use sim_kernel::{Cx, Error, EvalFabric, EvalReply, ReadPolicy, Result, Symbol};
4use sim_lib_stream_core::{ClockDomain, LatencyClass, StreamEndpoint, StreamEndpointKind};
5
6use crate::{
7 Coroutine, FrameKind, ServerAddress, ServerFrame, decode_frame_payload,
8 eval_request_from_frame, server_frame_from_reply,
9};
10
11use super::core::{
12 EvalSite, Site, SiteKind, eval_site_clock_domain, eval_site_endpoint_kind,
13 eval_site_latency_class, reply_codec_for_frame, site_endpoint_id,
14};
15use super::pipeline::{enforce_trigger_eval_policy, eval_request_from_trigger_frame};
16
17#[derive(Clone)]
19pub struct LocalEvalSite {
20 address: ServerAddress,
21 codecs: Vec<Symbol>,
22}
23
24impl LocalEvalSite {
25 pub fn new(address: ServerAddress, codecs: Vec<Symbol>) -> Self {
27 Self { address, codecs }
28 }
29}
30
31impl EvalSite for LocalEvalSite {
32 fn site_kind(&self) -> &'static str {
33 "local"
34 }
35
36 fn address(&self) -> &ServerAddress {
37 &self.address
38 }
39
40 fn codecs(&self) -> &[Symbol] {
41 &self.codecs
42 }
43
44 fn answer(&self, cx: &mut Cx, frame: ServerFrame) -> Result<ServerFrame> {
45 match frame.kind {
46 FrameKind::Request => {
47 let consistency = frame.envelope.consistency;
48 let reply_codec = reply_codec_for_frame(self, &frame);
49 let request = eval_request_from_frame(cx, &frame)?;
50 let reply = realize_locally(cx, request)?;
51 server_frame_from_reply(cx, &reply_codec, reply, consistency)
52 }
53 FrameKind::Trigger { .. } => {
54 realize_trigger_locally(cx, &frame)?;
55 Ok(frame)
56 }
57 FrameKind::Notify => {
58 realize_notify_locally(cx, &frame)?;
59 Ok(frame)
60 }
61 _ => Err(Error::Eval(format!(
62 "local eval site cannot answer frame kind {}",
63 frame.kind.as_symbol()
64 ))),
65 }
66 }
67
68 fn as_any(&self) -> &dyn std::any::Any {
69 self
70 }
71}
72
73impl StreamEndpoint for LocalEvalSite {
74 fn endpoint_id(&self) -> Symbol {
75 site_endpoint_id(SiteKind::Local, &self.address)
76 }
77
78 fn endpoint_kind(&self) -> StreamEndpointKind {
79 eval_site_endpoint_kind()
80 }
81
82 fn clock_domain(&self) -> ClockDomain {
83 eval_site_clock_domain()
84 }
85
86 fn latency_class(&self) -> LatencyClass {
87 eval_site_latency_class()
88 }
89}
90
91impl Site for LocalEvalSite {
92 fn kind(&self) -> SiteKind {
93 SiteKind::Local
94 }
95}
96
97#[derive(Clone)]
99pub struct FabricEvalSite {
100 kind: &'static str,
101 address: ServerAddress,
102 codecs: Vec<Symbol>,
103 fabric: Arc<dyn EvalFabric>,
104}
105
106impl FabricEvalSite {
107 pub fn new(
110 kind: &'static str,
111 address: ServerAddress,
112 codecs: Vec<Symbol>,
113 fabric: Arc<dyn EvalFabric>,
114 ) -> Self {
115 Self {
116 kind,
117 address,
118 codecs,
119 fabric,
120 }
121 }
122}
123
124impl EvalSite for FabricEvalSite {
125 fn site_kind(&self) -> &'static str {
126 self.kind
127 }
128
129 fn address(&self) -> &ServerAddress {
130 &self.address
131 }
132
133 fn codecs(&self) -> &[Symbol] {
134 &self.codecs
135 }
136
137 fn answer(&self, cx: &mut Cx, frame: ServerFrame) -> Result<ServerFrame> {
138 match frame.kind {
139 FrameKind::Trigger { .. } => {
140 let request = eval_request_from_trigger_frame(cx, &frame)?;
141 let _ = self.fabric.realize(cx, request)?;
142 Ok(frame)
143 }
144 _ => {
145 let consistency = frame.envelope.consistency;
146 let reply_codec = reply_codec_for_frame(self, &frame);
147 let request = eval_request_from_frame(cx, &frame)?;
148 let reply = self.fabric.realize(cx, request)?;
149 server_frame_from_reply(cx, &reply_codec, reply, consistency)
150 }
151 }
152 }
153
154 fn as_eval_fabric(&self) -> Option<&dyn EvalFabric> {
155 Some(self.fabric.as_ref())
156 }
157
158 fn as_any(&self) -> &dyn std::any::Any {
159 self
160 }
161}
162
163impl StreamEndpoint for FabricEvalSite {
164 fn endpoint_id(&self) -> Symbol {
165 site_endpoint_id(SiteKind::Fabric, &self.address)
166 }
167
168 fn endpoint_kind(&self) -> StreamEndpointKind {
169 eval_site_endpoint_kind()
170 }
171
172 fn clock_domain(&self) -> ClockDomain {
173 eval_site_clock_domain()
174 }
175
176 fn latency_class(&self) -> LatencyClass {
177 eval_site_latency_class()
178 }
179}
180
181impl Site for FabricEvalSite {
182 fn kind(&self) -> SiteKind {
183 SiteKind::Fabric
184 }
185}
186
187#[derive(Clone)]
189pub struct CoroutineEvalSite {
190 address: ServerAddress,
191 codecs: Vec<Symbol>,
192 coroutine: Arc<Coroutine>,
193}
194
195impl CoroutineEvalSite {
196 pub fn new(address: ServerAddress, codecs: Vec<Symbol>, coroutine: Arc<Coroutine>) -> Self {
199 Self {
200 address,
201 codecs,
202 coroutine,
203 }
204 }
205
206 pub fn coroutine(&self) -> &Arc<Coroutine> {
208 &self.coroutine
209 }
210}
211
212impl EvalSite for CoroutineEvalSite {
213 fn site_kind(&self) -> &'static str {
214 "coroutine"
215 }
216
217 fn address(&self) -> &ServerAddress {
218 &self.address
219 }
220
221 fn codecs(&self) -> &[Symbol] {
222 &self.codecs
223 }
224
225 fn answer(&self, cx: &mut Cx, frame: ServerFrame) -> Result<ServerFrame> {
226 match frame.kind {
227 FrameKind::Request => {
228 let consistency = frame.envelope.consistency;
229 let reply_codec = reply_codec_for_frame(self, &frame);
230 let request = eval_request_from_frame(cx, &frame)?;
231 let input = cx.factory().expr(request.expr)?;
232 let reply = self.coroutine.resume(cx, input)?;
233 let diagnostics = cx.take_diagnostics();
234 server_frame_from_reply(
235 cx,
236 &reply_codec,
237 EvalReply {
238 value: reply,
239 diagnostics,
240 trace: None,
241 },
242 consistency,
243 )
244 }
245 FrameKind::Trigger { .. } => {
246 let expr = frame.decode_expr(cx, ReadPolicy::default())?;
247 enforce_trigger_eval_policy(cx, &expr)?;
248 let input = cx.factory().expr(expr)?;
249 let _ = self.coroutine.resume(cx, input)?;
250 Ok(frame)
251 }
252 FrameKind::Notify => {
253 let expr = decode_frame_payload(
254 cx,
255 &frame.codec,
256 &frame.payload,
257 ReadPolicy::default(),
258 Default::default(),
259 )?;
260 let input = cx.factory().expr(expr)?;
261 let _ = self.coroutine.resume(cx, input)?;
262 Ok(frame)
263 }
264 _ => Err(Error::Eval(format!(
265 "coroutine eval site cannot answer frame kind {}",
266 frame.kind.as_symbol()
267 ))),
268 }
269 }
270
271 fn as_any(&self) -> &dyn std::any::Any {
272 self
273 }
274}
275
276impl StreamEndpoint for CoroutineEvalSite {
277 fn endpoint_id(&self) -> Symbol {
278 site_endpoint_id(SiteKind::Coroutine, &self.address)
279 }
280
281 fn endpoint_kind(&self) -> StreamEndpointKind {
282 eval_site_endpoint_kind()
283 }
284
285 fn clock_domain(&self) -> ClockDomain {
286 eval_site_clock_domain()
287 }
288
289 fn latency_class(&self) -> LatencyClass {
290 eval_site_latency_class()
291 }
292}
293
294impl Site for CoroutineEvalSite {
295 fn kind(&self) -> SiteKind {
296 SiteKind::Coroutine
297 }
298}
299
300fn realize_locally(cx: &mut Cx, request: sim_kernel::EvalRequest) -> Result<EvalReply> {
301 for capability in &request.required_capabilities {
302 cx.require(capability)?;
303 }
304 let value = cx.eval_expr(request.expr)?;
305 if let Some(shape) = &request.result_shape {
306 let Some(shape_object) = shape.object().as_shape() else {
307 return Err(Error::TypeMismatch {
308 expected: "shape",
309 found: "non-shape",
310 });
311 };
312 let matched = shape_object.check_value(cx, value.clone())?;
313 if !matched.accepted {
314 return Err(Error::WrongShape {
315 expected: shape_object.id().unwrap_or(sim_kernel::ShapeId(0)),
316 diagnostics: matched.diagnostics,
317 });
318 }
319 }
320 Ok(EvalReply {
321 value,
322 diagnostics: cx.take_diagnostics(),
323 trace: request
324 .trace
325 .then(|| cx.factory().symbol(Symbol::new("local")).ok())
326 .flatten(),
327 })
328}
329
330fn realize_notify_locally(cx: &mut Cx, frame: &ServerFrame) -> Result<()> {
331 for capability in &frame.envelope.required_capabilities {
332 cx.require(capability)?;
333 }
334 let expr = decode_frame_payload(
335 cx,
336 &frame.codec,
337 &frame.payload,
338 ReadPolicy::default(),
339 Default::default(),
340 )?;
341 cx.eval_expr(expr)?;
342 Ok(())
343}
344
345fn realize_trigger_locally(cx: &mut Cx, frame: &ServerFrame) -> Result<()> {
346 for capability in &frame.envelope.required_capabilities {
347 cx.require(capability)?;
348 }
349 let expr = frame.decode_expr(cx, ReadPolicy::default())?;
350 enforce_trigger_eval_policy(cx, &expr)?;
351 cx.eval_expr(expr)?;
352 Ok(())
353}