sim_lib_stream_core/site.rs
1//! Stream placement -- where stream fragments live and how they are wired.
2//!
3//! This module describes the placement surface of the streaming fabric: a
4//! [`StreamEndpoint`] is a site that produces, consumes, or routes a stream; a
5//! [`StreamEdge`] is a typed, rate-contracted port carrying envelopes between
6//! sites; and a [`PlacedFragment`] is a graph node bound to its input and
7//! output edges. The kernel supplies the protocol types (clock domains, rate
8//! contracts, [`Symbol`], [`Expr`]); this module supplies the concrete
9//! placement and routing behavior.
10
11use sim_kernel::{Error, Expr, Result, Symbol};
12
13use crate::{
14 ClockDomain, LatencyClass, RateContract, StreamDirection, StreamEnvelope, StreamItem,
15 StreamMedia, StreamMetadata, StreamPacket, TransportProfile,
16};
17
/// The part a [`StreamEndpoint`] takes on at the site where it is placed.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub enum StreamEndpointKind {
    /// Originates stream packets.
    Producer,
    /// Terminates a stream by consuming its packets.
    Consumer,
    /// Forwards a stream from one site to another.
    Bridge,
    /// Observes a stream while leaving its flow untouched.
    Inspector,
    /// Evaluates over the stream, acting as a distributed eval target.
    EvalSite,
}
32
33impl StreamEndpointKind {
34 /// Returns the stable wire label for this kind.
35 pub fn wire_label(self) -> &'static str {
36 match self {
37 Self::Producer => "producer",
38 Self::Consumer => "consumer",
39 Self::Bridge => "bridge",
40 Self::Inspector => "inspector",
41 Self::EvalSite => "eval-site",
42 }
43 }
44
45 /// Returns this kind as a qualified `stream/endpoint-kind` symbol.
46 pub fn symbol(self) -> Symbol {
47 Symbol::qualified("stream/endpoint-kind", self.wire_label())
48 }
49}
50
51/// A typed, rate-contracted port carrying envelopes between sites.
52///
53/// An edge names its port, the [`RateContract`] (and thus clock domain) it must
54/// honor, the stream [`StreamMetadata`] flowing across it, and any envelopes
55/// already buffered on it.
56#[derive(Clone, Debug, PartialEq, Eq)]
57pub struct StreamEdge {
58 port: Symbol,
59 rate_contract: RateContract,
60 metadata: StreamMetadata,
61 envelopes: Vec<StreamEnvelope>,
62}
63
64impl StreamEdge {
65 /// Creates an empty edge for `port` under the given rate contract and
66 /// metadata.
67 pub fn new(port: Symbol, rate_contract: RateContract, metadata: StreamMetadata) -> Self {
68 Self {
69 port,
70 rate_contract,
71 metadata,
72 envelopes: Vec::new(),
73 }
74 }
75
76 /// Returns the edge with its buffered envelopes replaced.
77 pub fn with_envelopes(mut self, envelopes: Vec<StreamEnvelope>) -> Self {
78 self.envelopes = envelopes;
79 self
80 }
81
82 /// Returns the port symbol naming this edge.
83 pub fn port(&self) -> &Symbol {
84 &self.port
85 }
86
87 /// Returns the rate contract this edge must honor.
88 pub fn rate_contract(&self) -> RateContract {
89 self.rate_contract
90 }
91
92 /// Returns the metadata of the stream flowing across this edge.
93 pub fn metadata(&self) -> &StreamMetadata {
94 &self.metadata
95 }
96
97 /// Returns the envelopes currently buffered on this edge.
98 pub fn envelopes(&self) -> &[StreamEnvelope] {
99 &self.envelopes
100 }
101
102 /// Builds a `site-result` data envelope for this edge.
103 ///
104 /// Wraps `payload` in a `stream/data` packet sequenced at `sequence` under
105 /// a memory-local transport profile, using this edge's metadata.
106 pub fn result_envelope(&self, sequence: u64, payload: Expr) -> Result<StreamEnvelope> {
107 let item = StreamItem::new(StreamPacket::data(
108 Symbol::qualified("stream/data", "site-result"),
109 payload,
110 ));
111 StreamEnvelope::from_item_with_profile(
112 &self.metadata,
113 sequence,
114 &item,
115 TransportProfile::memory_local(),
116 )
117 }
118}
119
120/// A graph node placed at a site with its wired input and output edges.
121///
122/// Identifies the fragment, holds the [`Expr`] node it evaluates, and tracks
123/// the [`StreamEdge`]s feeding into and out of it.
124#[derive(Clone, Debug, PartialEq, Eq)]
125pub struct PlacedFragment {
126 id: Symbol,
127 node: Expr,
128 input_edges: Vec<StreamEdge>,
129 output_edges: Vec<StreamEdge>,
130}
131
132impl PlacedFragment {
133 /// Creates a fragment for `id` evaluating `node`, with no edges yet.
134 pub fn new(id: Symbol, node: Expr) -> Self {
135 Self {
136 id,
137 node,
138 input_edges: Vec::new(),
139 output_edges: Vec::new(),
140 }
141 }
142
143 /// Returns the fragment with `edge` appended to its input edges.
144 pub fn with_input_edge(mut self, edge: StreamEdge) -> Self {
145 self.input_edges.push(edge);
146 self
147 }
148
149 /// Returns the fragment with `edge` appended to its output edges.
150 pub fn with_output_edge(mut self, edge: StreamEdge) -> Self {
151 self.output_edges.push(edge);
152 self
153 }
154
155 /// Returns the fragment identifier.
156 pub fn id(&self) -> &Symbol {
157 &self.id
158 }
159
160 /// Returns the [`Expr`] node this fragment evaluates.
161 pub fn node(&self) -> &Expr {
162 &self.node
163 }
164
165 /// Returns the fragment's input edges.
166 pub fn input_edges(&self) -> &[StreamEdge] {
167 &self.input_edges
168 }
169
170 /// Returns the fragment's output edges.
171 pub fn output_edges(&self) -> &[StreamEdge] {
172 &self.output_edges
173 }
174
175 /// Collects the envelopes buffered across every output edge.
176 pub fn output_envelopes(&self) -> Vec<StreamEnvelope> {
177 self.output_edges
178 .iter()
179 .flat_map(|edge| edge.envelopes().iter().cloned())
180 .collect()
181 }
182}
183
184/// A site that produces, consumes, or routes a stream.
185///
186/// An endpoint declares its identity, [kind](StreamEndpointKind), clock domain,
187/// and latency class, and accepts input edges whose rate contracts must agree
188/// with its clock domain. Implementors supply the concrete placement behavior;
189/// the default methods enforce the clock-domain contract and surface a
190/// fragment's output envelopes.
191pub trait StreamEndpoint: Send + Sync {
192 /// Returns the endpoint's stable identifier.
193 fn endpoint_id(&self) -> Symbol;
194 /// Returns the role this endpoint plays.
195 fn endpoint_kind(&self) -> StreamEndpointKind;
196 /// Returns the clock domain this endpoint runs in.
197 fn clock_domain(&self) -> ClockDomain;
198 /// Returns the latency class this endpoint targets.
199 fn latency_class(&self) -> LatencyClass;
200
201 /// Validates that each input edge's clock domain matches this endpoint.
202 ///
203 /// Returns an error naming the first edge whose rate-contract clock domain
204 /// differs from [`clock_domain`](StreamEndpoint::clock_domain).
205 fn accept_input_edges(&self, edges: &[StreamEdge]) -> Result<()> {
206 for edge in edges {
207 if edge.rate_contract().clock_domain() != self.clock_domain() {
208 return Err(Error::Eval(format!(
209 "stream edge {} clock domain {} does not match endpoint {}",
210 edge.port(),
211 edge.rate_contract().clock_domain().wire_label(),
212 self.clock_domain().wire_label()
213 )));
214 }
215 }
216 Ok(())
217 }
218
219 /// Returns the envelopes this endpoint emits for `fragment`.
220 ///
221 /// Defaults to the fragment's own output envelopes; endpoints that
222 /// transform the stream override this.
223 fn output_envelopes(&self, fragment: &PlacedFragment) -> Result<Vec<StreamEnvelope>> {
224 Ok(fragment.output_envelopes())
225 }
226}
227
228/// Builds a [`StreamEdge`] for `port` from media, direction, and rate contract.
229///
230/// Synthesizes a `stream/edge` metadata record with a bounded single-slot
231/// buffer and the clock domain drawn from `rate_contract`.
232pub fn stream_edge(
233 port: impl Into<String>,
234 media: StreamMedia,
235 direction: StreamDirection,
236 rate_contract: RateContract,
237) -> StreamEdge {
238 let port = Symbol::new(port.into());
239 let metadata = StreamMetadata::new(
240 Symbol::qualified("stream/edge", port.name.to_string()),
241 media,
242 direction,
243 rate_contract.clock_domain().symbol(),
244 crate::BufferPolicy::bounded(1).expect("stream edge helper uses a nonzero buffer"),
245 );
246 StreamEdge::new(port, rate_contract, metadata)
247}