use sim_kernel::{Error, Expr, Result, Symbol};
use crate::{
ClockDomain, LatencyClass, RateContract, StreamDirection, StreamEnvelope, StreamItem,
StreamMedia, StreamMetadata, StreamPacket, TransportProfile,
};
/// The kinds of stream endpoint distinguished by this module.
///
/// Each variant maps to a fixed wire label via `wire_label` and to a
/// qualified symbol via `symbol`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum StreamEndpointKind {
/// Endpoint kind whose wire label is `"producer"`.
Producer,
/// Endpoint kind whose wire label is `"consumer"`.
Consumer,
/// Endpoint kind whose wire label is `"bridge"`.
Bridge,
/// Endpoint kind whose wire label is `"inspector"`.
Inspector,
/// Endpoint kind whose wire label is `"eval-site"`.
EvalSite,
}
impl StreamEndpointKind {
pub fn wire_label(self) -> &'static str {
match self {
Self::Producer => "producer",
Self::Consumer => "consumer",
Self::Bridge => "bridge",
Self::Inspector => "inspector",
Self::EvalSite => "eval-site",
}
}
pub fn symbol(self) -> Symbol {
Symbol::qualified("stream/endpoint-kind", self.wire_label())
}
}
/// A stream edge: a port symbol together with its rate contract, stream
/// metadata, and the envelopes currently attached to the edge.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct StreamEdge {
/// Symbol naming the port this edge is attached to.
port: Symbol,
/// Rate contract for the edge; its clock domain is what
/// `StreamEndpoint::accept_input_edges` compares against the endpoint.
rate_contract: RateContract,
/// Metadata passed along when envelopes are built for this edge.
metadata: StreamMetadata,
/// Envelopes attached to the edge; empty after `new`, replaced wholesale
/// by `with_envelopes`.
envelopes: Vec<StreamEnvelope>,
}
impl StreamEdge {
    /// Creates an edge for `port` with the given rate contract and metadata.
    ///
    /// The new edge starts with no envelopes attached.
    pub fn new(port: Symbol, rate_contract: RateContract, metadata: StreamMetadata) -> Self {
        let envelopes = Vec::new();
        Self {
            port,
            rate_contract,
            metadata,
            envelopes,
        }
    }

    /// Consumes the edge and returns it with its envelope list replaced by
    /// `envelopes` (any previously attached envelopes are discarded).
    pub fn with_envelopes(self, envelopes: Vec<StreamEnvelope>) -> Self {
        let mut edge = self;
        edge.envelopes = envelopes;
        edge
    }

    /// Borrows the port symbol of this edge.
    pub fn port(&self) -> &Symbol {
        let Self { port, .. } = self;
        port
    }

    /// Returns a copy of the edge's rate contract.
    pub fn rate_contract(&self) -> RateContract {
        self.rate_contract
    }

    /// Borrows the edge's stream metadata.
    pub fn metadata(&self) -> &StreamMetadata {
        let Self { metadata, .. } = self;
        metadata
    }

    /// Borrows the envelopes attached to this edge as a slice.
    pub fn envelopes(&self) -> &[StreamEnvelope] {
        self.envelopes.as_slice()
    }

    /// Builds an envelope carrying `payload` as a site result on this edge.
    ///
    /// The payload is wrapped in a data packet tagged with the symbol
    /// `Symbol::qualified("stream/data", "site-result")`, turned into a
    /// stream item, and handed to `StreamEnvelope::from_item_with_profile`
    /// along with this edge's metadata, `sequence`, and
    /// `TransportProfile::memory_local()`.
    ///
    /// # Errors
    ///
    /// Propagates whatever error `StreamEnvelope::from_item_with_profile`
    /// returns.
    pub fn result_envelope(&self, sequence: u64, payload: Expr) -> Result<StreamEnvelope> {
        let packet_kind = Symbol::qualified("stream/data", "site-result");
        let packet = StreamPacket::data(packet_kind, payload);
        let item = StreamItem::new(packet);
        let profile = TransportProfile::memory_local();
        StreamEnvelope::from_item_with_profile(&self.metadata, sequence, &item, profile)
    }
}
/// A fragment identified by a symbol, holding a node expression together
/// with the stream edges feeding into it and leading out of it.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct PlacedFragment {
/// Symbol identifying this fragment.
id: Symbol,
/// Expression stored as the fragment's node.
node: Expr,
/// Input edges, in the order they were added via `with_input_edge`.
input_edges: Vec<StreamEdge>,
/// Output edges, in the order they were added via `with_output_edge`.
output_edges: Vec<StreamEdge>,
}
impl PlacedFragment {
    /// Creates a fragment with the given id and node and no edges.
    pub fn new(id: Symbol, node: Expr) -> Self {
        let input_edges = Vec::new();
        let output_edges = Vec::new();
        Self {
            id,
            node,
            input_edges,
            output_edges,
        }
    }

    /// Consumes the fragment and returns it with `edge` appended to its
    /// input edges.
    pub fn with_input_edge(self, edge: StreamEdge) -> Self {
        let mut fragment = self;
        fragment.input_edges.push(edge);
        fragment
    }

    /// Consumes the fragment and returns it with `edge` appended to its
    /// output edges.
    pub fn with_output_edge(self, edge: StreamEdge) -> Self {
        let mut fragment = self;
        fragment.output_edges.push(edge);
        fragment
    }

    /// Borrows the fragment's identifying symbol.
    pub fn id(&self) -> &Symbol {
        let Self { id, .. } = self;
        id
    }

    /// Borrows the fragment's node expression.
    pub fn node(&self) -> &Expr {
        let Self { node, .. } = self;
        node
    }

    /// Borrows the input edges as a slice, in insertion order.
    pub fn input_edges(&self) -> &[StreamEdge] {
        self.input_edges.as_slice()
    }

    /// Borrows the output edges as a slice, in insertion order.
    pub fn output_edges(&self) -> &[StreamEdge] {
        self.output_edges.as_slice()
    }

    /// Collects clones of every envelope attached to the output edges.
    ///
    /// Envelopes are returned edge by edge in output-edge order, each edge
    /// contributing its envelopes in their stored order.
    pub fn output_envelopes(&self) -> Vec<StreamEnvelope> {
        let mut collected = Vec::new();
        for edge in &self.output_edges {
            collected.extend_from_slice(edge.envelopes());
        }
        collected
    }
}
pub trait StreamEndpoint: Send + Sync {
fn endpoint_id(&self) -> Symbol;
fn endpoint_kind(&self) -> StreamEndpointKind;
fn clock_domain(&self) -> ClockDomain;
fn latency_class(&self) -> LatencyClass;
fn accept_input_edges(&self, edges: &[StreamEdge]) -> Result<()> {
for edge in edges {
if edge.rate_contract().clock_domain() != self.clock_domain() {
return Err(Error::Eval(format!(
"stream edge {} clock domain {} does not match endpoint {}",
edge.port(),
edge.rate_contract().clock_domain().wire_label(),
self.clock_domain().wire_label()
)));
}
}
Ok(())
}
fn output_envelopes(&self, fragment: &PlacedFragment) -> Result<Vec<StreamEnvelope>> {
Ok(fragment.output_envelopes())
}
}
/// Convenience constructor for a [`StreamEdge`] on the named port.
///
/// The port name becomes a `Symbol` via `Symbol::new`. The edge's metadata
/// is built from a `"stream/edge"`-qualified symbol carrying the port's
/// name, the given `media` and `direction`, the symbol of the rate
/// contract's clock domain, and a `BufferPolicy::bounded(1)` buffer policy.
///
/// # Panics
///
/// Panics if `BufferPolicy::bounded(1)` does not yield a policy; the
/// helper treats a buffer size of one as always valid.
pub fn stream_edge(
    port: impl Into<String>,
    media: StreamMedia,
    direction: StreamDirection,
    rate_contract: RateContract,
) -> StreamEdge {
    let port = Symbol::new(port.into());

    // Gather the metadata inputs in the same order they are passed below.
    let edge_symbol = Symbol::qualified("stream/edge", port.name.to_string());
    let clock_symbol = rate_contract.clock_domain().symbol();
    let buffer_policy =
        crate::BufferPolicy::bounded(1).expect("stream edge helper uses a nonzero buffer");

    let metadata = StreamMetadata::new(edge_symbol, media, direction, clock_symbol, buffer_policy);
    StreamEdge::new(port, rate_contract, metadata)
}