sim_lib_server/site/
core.rs1use std::{any::Any, time::Duration};
2
3use sim_kernel::{Consistency, Cx, EvalFabric, EvalMode, EvalReply, EvalRequest, Result, Symbol};
4use sim_lib_stream_core::{
5 ClockDomain, LatencyClass, PlacedFragment, StreamEndpoint, StreamEndpointKind, StreamEnvelope,
6};
7
8use crate::helpers::default_server_codec;
9use crate::{ServerAddress, ServerFrame, eval_reply_from_frame, server_frame_from_request};
10
11pub trait StreamSink {
13 fn chunk(&mut self, cx: &mut Cx, frame: ServerFrame) -> Result<()>;
20 fn end(&mut self, cx: &mut Cx) -> Result<()>;
25}
26
27pub trait EvalSite: Send + Sync {
30 fn site_kind(&self) -> &'static str;
32 fn address(&self) -> &ServerAddress;
34 fn codecs(&self) -> &[Symbol];
36 fn answer(&self, cx: &mut Cx, frame: ServerFrame) -> Result<ServerFrame>;
38 fn answer_with_timeout(
40 &self,
41 cx: &mut Cx,
42 frame: ServerFrame,
43 _timeout: Option<Duration>,
44 ) -> Result<ServerFrame> {
45 self.answer(cx, frame)
46 }
47 fn close_connection(&self, _cx: &mut Cx) -> Result<()> {
49 Ok(())
50 }
51 fn as_any(&self) -> &dyn Any;
53
54 fn stream(&self, cx: &mut Cx, frame: ServerFrame, sink: &mut dyn StreamSink) -> Result<()> {
57 let reply = self.answer(cx, frame)?;
58 sink.chunk(cx, reply)?;
59 sink.end(cx)
60 }
61
62 fn as_eval_fabric(&self) -> Option<&dyn EvalFabric> {
64 None
65 }
66}
67
68#[derive(Clone, Copy, Debug, PartialEq, Eq)]
70pub enum SiteKind {
71 Local,
73 Coroutine,
75 Pipeline,
77 Loop,
79 Fabric,
81}
82
83impl SiteKind {
84 pub fn as_str(self) -> &'static str {
86 match self {
87 Self::Local => "local",
88 Self::Coroutine => "coroutine",
89 Self::Pipeline => "pipeline",
90 Self::Loop => "loop",
91 Self::Fabric => "fabric",
92 }
93 }
94
95 pub fn symbol(self) -> Symbol {
97 Symbol::qualified("server/site", self.as_str())
98 }
99}
100
101pub trait Site: EvalSite + StreamEndpoint {
104 fn kind(&self) -> SiteKind;
106
107 fn run_fragment(&self, cx: &mut Cx, fragment: &PlacedFragment) -> Result<Vec<StreamEnvelope>> {
110 self.accept_input_edges(fragment.input_edges())?;
111 let reply = self.realize_fragment_node(cx, fragment)?;
112 let payload = reply.value.object().as_expr(cx)?;
113
114 if fragment.output_edges().is_empty() {
115 return self.output_envelopes(fragment);
116 }
117
118 fragment
119 .output_edges()
120 .iter()
121 .enumerate()
122 .map(|(sequence, edge)| edge.result_envelope(sequence as u64, payload.clone()))
123 .collect()
124 }
125
126 fn realize_fragment_node(&self, cx: &mut Cx, fragment: &PlacedFragment) -> Result<EvalReply> {
128 self.realize_fragment_node_with_timeout(cx, fragment, None)
129 }
130
131 fn realize_fragment_node_with_timeout(
133 &self,
134 cx: &mut Cx,
135 fragment: &PlacedFragment,
136 timeout: Option<Duration>,
137 ) -> Result<EvalReply> {
138 let request = EvalRequest {
139 expr: fragment.node().clone(),
140 result_shape: None,
141 required_capabilities: Vec::new(),
142 deadline: timeout,
143 consistency: Consistency::LocalFirst,
144 mode: EvalMode::Eval,
145 answer_limit: None,
146 stream_buffer: None,
147 stream: false,
148 trace: false,
149 };
150 let codec = default_server_codec(self.codecs())?;
151 let frame = server_frame_from_request(cx, &codec, request)?;
152 let reply = self.answer_with_timeout(cx, frame, timeout)?;
153 eval_reply_from_frame(cx, &reply)
154 }
155}
156
157pub(crate) fn reply_codec_for_frame(site: &dyn EvalSite, frame: &ServerFrame) -> Symbol {
158 if let Some(hint) = &frame.envelope.reply_codec_hint
159 && site.codecs().iter().any(|codec| codec == hint)
160 {
161 return hint.clone();
162 }
163 default_server_codec(site.codecs()).unwrap_or_else(|_| frame.codec.clone())
164}
165
166pub(crate) fn site_endpoint_id(kind: SiteKind, address: &ServerAddress) -> Symbol {
167 Symbol::qualified(
168 "server/site-endpoint",
169 format!("{}-{}", kind.as_str(), address.kind_symbol().name),
170 )
171}
172
173pub(crate) fn eval_site_endpoint_kind() -> StreamEndpointKind {
174 StreamEndpointKind::EvalSite
175}
176
177pub(crate) fn eval_site_clock_domain() -> ClockDomain {
178 ClockDomain::Control
179}
180
181pub(crate) fn eval_site_latency_class() -> LatencyClass {
182 LatencyClass::Interactive
183}