Skip to main content

sim_lib_stream_core/
site.rs

1//! Stream placement -- where stream fragments live and how they are wired.
2//!
3//! This module describes the placement surface of the streaming fabric: a
4//! [`StreamEndpoint`] is a site that produces, consumes, or routes a stream; a
5//! [`StreamEdge`] is a typed, rate-contracted port carrying envelopes between
6//! sites; and a [`PlacedFragment`] is a graph node bound to its input and
7//! output edges. The kernel supplies the protocol types (clock domains, rate
8//! contracts, [`Symbol`], [`Expr`]); this module supplies the concrete
9//! placement and routing behavior.
10
11use sim_kernel::{Error, Expr, Result, Symbol};
12
13use crate::{
14    ClockDomain, LatencyClass, RateContract, StreamDirection, StreamEnvelope, StreamItem,
15    StreamMedia, StreamMetadata, StreamPacket, TransportProfile,
16};
17
18/// Role a [`StreamEndpoint`] plays at a placement site.
19#[derive(Clone, Copy, Debug, PartialEq, Eq)]
20pub enum StreamEndpointKind {
21    /// Site that originates stream packets.
22    Producer,
23    /// Site that terminates a stream by consuming packets.
24    Consumer,
25    /// Site that forwards a stream between two other sites.
26    Bridge,
27    /// Site that observes a stream without altering its flow.
28    Inspector,
29    /// Site that evaluates over the stream as a distributed eval target.
30    EvalSite,
31}
32
33impl StreamEndpointKind {
34    /// Returns the stable wire label for this kind.
35    pub fn wire_label(self) -> &'static str {
36        match self {
37            Self::Producer => "producer",
38            Self::Consumer => "consumer",
39            Self::Bridge => "bridge",
40            Self::Inspector => "inspector",
41            Self::EvalSite => "eval-site",
42        }
43    }
44
45    /// Returns this kind as a qualified `stream/endpoint-kind` symbol.
46    pub fn symbol(self) -> Symbol {
47        Symbol::qualified("stream/endpoint-kind", self.wire_label())
48    }
49}
50
51/// A typed, rate-contracted port carrying envelopes between sites.
52///
53/// An edge names its port, the [`RateContract`] (and thus clock domain) it must
54/// honor, the stream [`StreamMetadata`] flowing across it, and any envelopes
55/// already buffered on it.
56#[derive(Clone, Debug, PartialEq, Eq)]
57pub struct StreamEdge {
58    port: Symbol,
59    rate_contract: RateContract,
60    metadata: StreamMetadata,
61    envelopes: Vec<StreamEnvelope>,
62}
63
64impl StreamEdge {
65    /// Creates an empty edge for `port` under the given rate contract and
66    /// metadata.
67    pub fn new(port: Symbol, rate_contract: RateContract, metadata: StreamMetadata) -> Self {
68        Self {
69            port,
70            rate_contract,
71            metadata,
72            envelopes: Vec::new(),
73        }
74    }
75
76    /// Returns the edge with its buffered envelopes replaced.
77    pub fn with_envelopes(mut self, envelopes: Vec<StreamEnvelope>) -> Self {
78        self.envelopes = envelopes;
79        self
80    }
81
82    /// Returns the port symbol naming this edge.
83    pub fn port(&self) -> &Symbol {
84        &self.port
85    }
86
87    /// Returns the rate contract this edge must honor.
88    pub fn rate_contract(&self) -> RateContract {
89        self.rate_contract
90    }
91
92    /// Returns the metadata of the stream flowing across this edge.
93    pub fn metadata(&self) -> &StreamMetadata {
94        &self.metadata
95    }
96
97    /// Returns the envelopes currently buffered on this edge.
98    pub fn envelopes(&self) -> &[StreamEnvelope] {
99        &self.envelopes
100    }
101
102    /// Builds a `site-result` data envelope for this edge.
103    ///
104    /// Wraps `payload` in a `stream/data` packet sequenced at `sequence` under
105    /// a memory-local transport profile, using this edge's metadata.
106    pub fn result_envelope(&self, sequence: u64, payload: Expr) -> Result<StreamEnvelope> {
107        let item = StreamItem::new(StreamPacket::data(
108            Symbol::qualified("stream/data", "site-result"),
109            payload,
110        ));
111        StreamEnvelope::from_item_with_profile(
112            &self.metadata,
113            sequence,
114            &item,
115            TransportProfile::memory_local(),
116        )
117    }
118}
119
120/// A graph node placed at a site with its wired input and output edges.
121///
122/// Identifies the fragment, holds the [`Expr`] node it evaluates, and tracks
123/// the [`StreamEdge`]s feeding into and out of it.
124#[derive(Clone, Debug, PartialEq, Eq)]
125pub struct PlacedFragment {
126    id: Symbol,
127    node: Expr,
128    input_edges: Vec<StreamEdge>,
129    output_edges: Vec<StreamEdge>,
130}
131
132impl PlacedFragment {
133    /// Creates a fragment for `id` evaluating `node`, with no edges yet.
134    pub fn new(id: Symbol, node: Expr) -> Self {
135        Self {
136            id,
137            node,
138            input_edges: Vec::new(),
139            output_edges: Vec::new(),
140        }
141    }
142
143    /// Returns the fragment with `edge` appended to its input edges.
144    pub fn with_input_edge(mut self, edge: StreamEdge) -> Self {
145        self.input_edges.push(edge);
146        self
147    }
148
149    /// Returns the fragment with `edge` appended to its output edges.
150    pub fn with_output_edge(mut self, edge: StreamEdge) -> Self {
151        self.output_edges.push(edge);
152        self
153    }
154
155    /// Returns the fragment identifier.
156    pub fn id(&self) -> &Symbol {
157        &self.id
158    }
159
160    /// Returns the [`Expr`] node this fragment evaluates.
161    pub fn node(&self) -> &Expr {
162        &self.node
163    }
164
165    /// Returns the fragment's input edges.
166    pub fn input_edges(&self) -> &[StreamEdge] {
167        &self.input_edges
168    }
169
170    /// Returns the fragment's output edges.
171    pub fn output_edges(&self) -> &[StreamEdge] {
172        &self.output_edges
173    }
174
175    /// Collects the envelopes buffered across every output edge.
176    pub fn output_envelopes(&self) -> Vec<StreamEnvelope> {
177        self.output_edges
178            .iter()
179            .flat_map(|edge| edge.envelopes().iter().cloned())
180            .collect()
181    }
182}
183
184/// A site that produces, consumes, or routes a stream.
185///
186/// An endpoint declares its identity, [kind](StreamEndpointKind), clock domain,
187/// and latency class, and accepts input edges whose rate contracts must agree
188/// with its clock domain. Implementors supply the concrete placement behavior;
189/// the default methods enforce the clock-domain contract and surface a
190/// fragment's output envelopes.
191pub trait StreamEndpoint: Send + Sync {
192    /// Returns the endpoint's stable identifier.
193    fn endpoint_id(&self) -> Symbol;
194    /// Returns the role this endpoint plays.
195    fn endpoint_kind(&self) -> StreamEndpointKind;
196    /// Returns the clock domain this endpoint runs in.
197    fn clock_domain(&self) -> ClockDomain;
198    /// Returns the latency class this endpoint targets.
199    fn latency_class(&self) -> LatencyClass;
200
201    /// Validates that each input edge's clock domain matches this endpoint.
202    ///
203    /// Returns an error naming the first edge whose rate-contract clock domain
204    /// differs from [`clock_domain`](StreamEndpoint::clock_domain).
205    fn accept_input_edges(&self, edges: &[StreamEdge]) -> Result<()> {
206        for edge in edges {
207            if edge.rate_contract().clock_domain() != self.clock_domain() {
208                return Err(Error::Eval(format!(
209                    "stream edge {} clock domain {} does not match endpoint {}",
210                    edge.port(),
211                    edge.rate_contract().clock_domain().wire_label(),
212                    self.clock_domain().wire_label()
213                )));
214            }
215        }
216        Ok(())
217    }
218
219    /// Returns the envelopes this endpoint emits for `fragment`.
220    ///
221    /// Defaults to the fragment's own output envelopes; endpoints that
222    /// transform the stream override this.
223    fn output_envelopes(&self, fragment: &PlacedFragment) -> Result<Vec<StreamEnvelope>> {
224        Ok(fragment.output_envelopes())
225    }
226}
227
228/// Builds a [`StreamEdge`] for `port` from media, direction, and rate contract.
229///
230/// Synthesizes a `stream/edge` metadata record with a bounded single-slot
231/// buffer and the clock domain drawn from `rate_contract`.
232pub fn stream_edge(
233    port: impl Into<String>,
234    media: StreamMedia,
235    direction: StreamDirection,
236    rate_contract: RateContract,
237) -> StreamEdge {
238    let port = Symbol::new(port.into());
239    let metadata = StreamMetadata::new(
240        Symbol::qualified("stream/edge", port.name.to_string()),
241        media,
242        direction,
243        rate_contract.clock_domain().symbol(),
244        crate::BufferPolicy::bounded(1).expect("stream edge helper uses a nonzero buffer"),
245    );
246    StreamEdge::new(port, rate_contract, metadata)
247}