// sim_lib_server/site/core.rs

1use std::{any::Any, time::Duration};
2
3use sim_kernel::{Consistency, Cx, EvalFabric, EvalMode, EvalReply, EvalRequest, Result, Symbol};
4use sim_lib_stream_core::{
5    ClockDomain, LatencyClass, PlacedFragment, StreamEndpoint, StreamEndpointKind, StreamEnvelope,
6};
7
8use crate::helpers::default_server_codec;
9use crate::{ServerAddress, ServerFrame, eval_reply_from_frame, server_frame_from_request};
10
11/// Receiver for streamed server frames produced by [`EvalSite::stream`].
12pub trait StreamSink {
13    /// Accept one streamed server frame.
14    ///
15    /// The default `EvalSite::stream` compatibility path may pass a
16    /// `FrameKind::Response` reply through this method. Real streaming sites
17    /// should emit `FrameKind::StreamStart`, `FrameKind::StreamChunk`, and
18    /// `FrameKind::StreamEnd` frames instead.
19    fn chunk(&mut self, cx: &mut Cx, frame: ServerFrame) -> Result<()>;
20    /// Finish the transport or sink.
21    ///
22    /// Implementations should treat this as idempotent because a
23    /// `FrameKind::StreamEnd` frame may already have closed the logical stream.
24    fn end(&mut self, cx: &mut Cx) -> Result<()>;
25}
26
27/// A request-answering eval endpoint: it accepts server frames and produces
28/// reply frames, optionally streaming or exposing an [`EvalFabric`].
29pub trait EvalSite: Send + Sync {
30    /// Returns a short label naming the kind of site.
31    fn site_kind(&self) -> &'static str;
32    /// Returns the address this site answers at.
33    fn address(&self) -> &ServerAddress;
34    /// Returns the codecs this site can decode and encode.
35    fn codecs(&self) -> &[Symbol];
36    /// Answers `frame`, returning the reply frame.
37    fn answer(&self, cx: &mut Cx, frame: ServerFrame) -> Result<ServerFrame>;
38    /// Answers `frame` with an optional deadline. Defaults to [`EvalSite::answer`].
39    fn answer_with_timeout(
40        &self,
41        cx: &mut Cx,
42        frame: ServerFrame,
43        _timeout: Option<Duration>,
44    ) -> Result<ServerFrame> {
45        self.answer(cx, frame)
46    }
47    /// Closes any connection backing the site. Defaults to a no-op.
48    fn close_connection(&self, _cx: &mut Cx) -> Result<()> {
49        Ok(())
50    }
51    /// Returns the site as `&dyn Any` for downcasting.
52    fn as_any(&self) -> &dyn Any;
53
54    /// Streams the answer to `frame` into `sink`. The default path answers once
55    /// and emits a single chunk followed by `end`.
56    fn stream(&self, cx: &mut Cx, frame: ServerFrame, sink: &mut dyn StreamSink) -> Result<()> {
57        let reply = self.answer(cx, frame)?;
58        sink.chunk(cx, reply)?;
59        sink.end(cx)
60    }
61
62    /// Returns this site as an [`EvalFabric`] when it backs one. Defaults to `None`.
63    fn as_eval_fabric(&self) -> Option<&dyn EvalFabric> {
64        None
65    }
66}
67
/// Identifies the variety of an eval [`Site`].
///
/// The enum is a plain, field-less tag: it is `Copy`, comparable for equality,
/// and printable through `Debug`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum SiteKind {
    /// Evaluates requests in the local runtime.
    Local,
    /// Resumes a coroutine for each request.
    Coroutine,
    /// Chains requests through a sequence of connection steps.
    Pipeline,
    /// Repeats a pipeline until an until-condition fires.
    Loop,
    /// Delegates to a distributed eval fabric.
    Fabric,
}
82
83impl SiteKind {
84    /// Returns the lowercase string label for this kind.
85    pub fn as_str(self) -> &'static str {
86        match self {
87            Self::Local => "local",
88            Self::Coroutine => "coroutine",
89            Self::Pipeline => "pipeline",
90            Self::Loop => "loop",
91            Self::Fabric => "fabric",
92        }
93    }
94
95    /// Returns the `server/site`-qualified symbol naming this kind.
96    pub fn symbol(self) -> Symbol {
97        Symbol::qualified("server/site", self.as_str())
98    }
99}
100
101/// An [`EvalSite`] that is also a stream endpoint and can realize placed
102/// dataflow fragments.
103pub trait Site: EvalSite + StreamEndpoint {
104    /// Returns the [`SiteKind`] of this site.
105    fn kind(&self) -> SiteKind;
106
107    /// Realizes `fragment`: feeds its input edges, evaluates its node, and emits
108    /// a result envelope per output edge.
109    fn run_fragment(&self, cx: &mut Cx, fragment: &PlacedFragment) -> Result<Vec<StreamEnvelope>> {
110        self.accept_input_edges(fragment.input_edges())?;
111        let reply = self.realize_fragment_node(cx, fragment)?;
112        let payload = reply.value.object().as_expr(cx)?;
113
114        if fragment.output_edges().is_empty() {
115            return self.output_envelopes(fragment);
116        }
117
118        fragment
119            .output_edges()
120            .iter()
121            .enumerate()
122            .map(|(sequence, edge)| edge.result_envelope(sequence as u64, payload.clone()))
123            .collect()
124    }
125
126    /// Evaluates the fragment's node and returns its reply, with no deadline.
127    fn realize_fragment_node(&self, cx: &mut Cx, fragment: &PlacedFragment) -> Result<EvalReply> {
128        self.realize_fragment_node_with_timeout(cx, fragment, None)
129    }
130
131    /// Evaluates the fragment's node and returns its reply, honoring `timeout`.
132    fn realize_fragment_node_with_timeout(
133        &self,
134        cx: &mut Cx,
135        fragment: &PlacedFragment,
136        timeout: Option<Duration>,
137    ) -> Result<EvalReply> {
138        let request = EvalRequest {
139            expr: fragment.node().clone(),
140            result_shape: None,
141            required_capabilities: Vec::new(),
142            deadline: timeout,
143            consistency: Consistency::LocalFirst,
144            mode: EvalMode::Eval,
145            answer_limit: None,
146            stream_buffer: None,
147            stream: false,
148            trace: false,
149        };
150        let codec = default_server_codec(self.codecs())?;
151        let frame = server_frame_from_request(cx, &codec, request)?;
152        let reply = self.answer_with_timeout(cx, frame, timeout)?;
153        eval_reply_from_frame(cx, &reply)
154    }
155}
156
157pub(crate) fn reply_codec_for_frame(site: &dyn EvalSite, frame: &ServerFrame) -> Symbol {
158    if let Some(hint) = &frame.envelope.reply_codec_hint
159        && site.codecs().iter().any(|codec| codec == hint)
160    {
161        return hint.clone();
162    }
163    default_server_codec(site.codecs()).unwrap_or_else(|_| frame.codec.clone())
164}
165
166pub(crate) fn site_endpoint_id(kind: SiteKind, address: &ServerAddress) -> Symbol {
167    Symbol::qualified(
168        "server/site-endpoint",
169        format!("{}-{}", kind.as_str(), address.kind_symbol().name),
170    )
171}
172
173pub(crate) fn eval_site_endpoint_kind() -> StreamEndpointKind {
174    StreamEndpointKind::EvalSite
175}
176
177pub(crate) fn eval_site_clock_domain() -> ClockDomain {
178    ClockDomain::Control
179}
180
181pub(crate) fn eval_site_latency_class() -> LatencyClass {
182    LatencyClass::Interactive
183}