use std::{any::Any, time::Duration};
use sim_kernel::{Consistency, Cx, EvalFabric, EvalMode, EvalReply, EvalRequest, Result, Symbol};
use sim_lib_stream_core::{
ClockDomain, LatencyClass, PlacedFragment, StreamEndpoint, StreamEndpointKind, StreamEnvelope,
};
use crate::helpers::default_server_codec;
use crate::{ServerAddress, ServerFrame, eval_reply_from_frame, server_frame_from_request};
/// Receiver for a streamed reply: frames arrive through `chunk`, and `end` closes the stream.
///
/// The default `EvalSite::stream` implementation in this file drives a sink by calling
/// `chunk` once with the reply frame and then `end`.
pub trait StreamSink {
/// Hands one `ServerFrame` to the sink. Returns `Err` if the sink could not accept it.
fn chunk(&mut self, cx: &mut Cx, frame: ServerFrame) -> Result<()>;
/// Signals that no further frames will be delivered for this stream.
fn end(&mut self, cx: &mut Cx) -> Result<()>;
}
/// A site that answers evaluation requests delivered as `ServerFrame`s.
///
/// Implementors must be `Send + Sync`. Only `site_kind`, `address`, `codecs`, `answer`
/// and `as_any` are required; every other method has a default built on top of them.
pub trait EvalSite: Send + Sync {
    /// Static string naming the kind of site.
    fn site_kind(&self) -> &'static str;

    /// Address of this site.
    fn address(&self) -> &ServerAddress;

    /// Codec symbols this site supports.
    fn codecs(&self) -> &[Symbol];

    /// Answers a single request frame with a single reply frame.
    fn answer(&self, cx: &mut Cx, frame: ServerFrame) -> Result<ServerFrame>;

    /// Answers a request frame, optionally bounded by a timeout.
    ///
    /// The default implementation ignores the timeout and forwards to [`EvalSite::answer`];
    /// implementors that can enforce a deadline should override it.
    fn answer_with_timeout(
        &self,
        cx: &mut Cx,
        frame: ServerFrame,
        _timeout: Option<Duration>,
    ) -> Result<ServerFrame> {
        self.answer(cx, frame)
    }

    /// Closes the site's connection. The default implementation does nothing and succeeds.
    fn close_connection(&self, _cx: &mut Cx) -> Result<()> {
        Ok(())
    }

    /// Exposes the site as `&dyn Any`.
    fn as_any(&self) -> &dyn Any;

    /// Streams the reply for `frame` into `sink`.
    ///
    /// The default implementation produces exactly one chunk: it obtains the reply from
    /// [`EvalSite::answer`], passes it to `sink.chunk`, and then calls `sink.end`. The first
    /// error encountered is returned and later steps are skipped.
    fn stream(&self, cx: &mut Cx, frame: ServerFrame, sink: &mut dyn StreamSink) -> Result<()> {
        let reply = self.answer(cx, frame)?;
        // `end` only runs once the single chunk has been accepted.
        sink.chunk(cx, reply).and_then(|()| sink.end(cx))
    }

    /// Returns this site as an `EvalFabric` when it is one. Defaults to `None`.
    fn as_eval_fabric(&self) -> Option<&dyn EvalFabric> {
        None
    }
}
/// The kinds of site distinguished by this module.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum SiteKind {
    Local,
    Coroutine,
    Pipeline,
    Loop,
    Fabric,
}

impl SiteKind {
    /// Lower-case name of the variant (`"local"`, `"coroutine"`, `"pipeline"`, `"loop"`,
    /// `"fabric"`).
    pub fn as_str(self) -> &'static str {
        // Exhaustive on purpose: adding a variant must force a name to be chosen here.
        match self {
            SiteKind::Local => "local",
            SiteKind::Coroutine => "coroutine",
            SiteKind::Pipeline => "pipeline",
            SiteKind::Loop => "loop",
            SiteKind::Fabric => "fabric",
        }
    }

    /// Symbol for this kind: the `as_str` name qualified under `"server/site"`.
    pub fn symbol(self) -> Symbol {
        let name = self.as_str();
        Symbol::qualified("server/site", name)
    }
}
/// An [`EvalSite`] that is also a `StreamEndpoint` and can run placed fragments.
pub trait Site: EvalSite + StreamEndpoint {
    /// Reports which [`SiteKind`] this site is.
    fn kind(&self) -> SiteKind;

    /// Runs `fragment` on this site and returns the resulting envelopes.
    ///
    /// In order: the fragment's input edges are accepted, its node is evaluated, and the
    /// reply value is converted to an expression. If the fragment has no output edges the
    /// result of `output_envelopes` is returned; otherwise one result envelope is built per
    /// output edge, each carrying a clone of the payload and the edge's position as its
    /// sequence number. The first error from any step is returned.
    fn run_fragment(&self, cx: &mut Cx, fragment: &PlacedFragment) -> Result<Vec<StreamEnvelope>> {
        self.accept_input_edges(fragment.input_edges())?;
        let evaluated = self.realize_fragment_node(cx, fragment)?;
        // Converted before the edge check, so a conversion failure is reported even when
        // the fragment has no output edges.
        let payload = evaluated.value.object().as_expr(cx)?;

        let edges = fragment.output_edges();
        if edges.is_empty() {
            self.output_envelopes(fragment)
        } else {
            let mut envelopes = Vec::new();
            for (sequence, edge) in edges.iter().enumerate() {
                envelopes.push(edge.result_envelope(sequence as u64, payload.clone())?);
            }
            Ok(envelopes)
        }
    }

    /// Evaluates the fragment's node without a timeout.
    fn realize_fragment_node(&self, cx: &mut Cx, fragment: &PlacedFragment) -> Result<EvalReply> {
        self.realize_fragment_node_with_timeout(cx, fragment, None)
    }

    /// Evaluates the fragment's node through this site's own frame path.
    ///
    /// Builds a non-streaming, non-tracing `EvalRequest` for a clone of the node, encodes it
    /// with the site's default codec, sends it via `answer_with_timeout`, and decodes the
    /// reply frame. `timeout` is used both as the request deadline and as the timeout passed
    /// to `answer_with_timeout`.
    fn realize_fragment_node_with_timeout(
        &self,
        cx: &mut Cx,
        fragment: &PlacedFragment,
        timeout: Option<Duration>,
    ) -> Result<EvalReply> {
        let request = EvalRequest {
            expr: fragment.node().clone(),
            deadline: timeout,
            mode: EvalMode::Eval,
            consistency: Consistency::LocalFirst,
            required_capabilities: Vec::new(),
            result_shape: None,
            answer_limit: None,
            stream_buffer: None,
            stream: false,
            trace: false,
        };
        let codec = default_server_codec(self.codecs())?;
        let outbound = server_frame_from_request(cx, &codec, request)?;
        let inbound = self.answer_with_timeout(cx, outbound, timeout)?;
        eval_reply_from_frame(cx, &inbound)
    }
}
/// Chooses the codec to use when replying to `frame` from `site`.
///
/// Preference order:
/// 1. the frame's reply-codec hint, when the site lists that codec;
/// 2. the site's default codec as chosen by `default_server_codec`;
/// 3. the codec the frame itself arrived with, if no default can be chosen.
pub(crate) fn reply_codec_for_frame(site: &dyn EvalSite, frame: &ServerFrame) -> Symbol {
    let supported = site.codecs();
    match frame.envelope.reply_codec_hint.as_ref() {
        Some(hint) if supported.contains(hint) => hint.clone(),
        _ => default_server_codec(supported).unwrap_or_else(|_| frame.codec.clone()),
    }
}
/// Builds the stream-endpoint identifier for a site.
///
/// The result is a symbol qualified under `"server/site-endpoint"` whose name is
/// `"<kind>-<address kind>"`, where `<kind>` is `kind.as_str()` and `<address kind>` is the
/// `name` of `address.kind_symbol()`.
pub(crate) fn site_endpoint_id(kind: SiteKind, address: &ServerAddress) -> Symbol {
    let kind_name = kind.as_str();
    let address_kind = address.kind_symbol();
    let endpoint_name = format!("{kind_name}-{}", address_kind.name);
    Symbol::qualified("server/site-endpoint", endpoint_name)
}
/// Returns the stream endpoint kind for eval sites: always `StreamEndpointKind::EvalSite`.
pub(crate) fn eval_site_endpoint_kind() -> StreamEndpointKind {
StreamEndpointKind::EvalSite
}
/// Returns the clock domain for eval sites: always `ClockDomain::Control`.
pub(crate) fn eval_site_clock_domain() -> ClockDomain {
ClockDomain::Control
}
/// Returns the latency class for eval sites: always `LatencyClass::Interactive`.
pub(crate) fn eval_site_latency_class() -> LatencyClass {
LatencyClass::Interactive
}