Skip to main content

datum/graph/
shapes.rs

//! Port-bundle shapes describing a graph's (or stage's) external ports.
//!
//! A [`Shape`] is the contract returned from a `GraphDsl` builder closure and
//! declared by every [`GraphStage`](super::GraphStage): it enumerates the open
//! inlets and outlets. The builder's `finish` step checks that the result
//! shape's ports exactly match the graph's unconnected ports. Shapes mirror
//! Akka's `Shape` hierarchy (`SourceShape`, `FlowShape`, `FanInShape`, …).
8
9use super::*;
10
11/// A bundle of typed external ports. Implementors expose their inlets/outlets
12/// in a stable order; `inlets()`/`outlets()` return the type-erased views the
13/// builder validates against. Cheap to clone (ports are id + name).
14pub trait Shape: Clone + Send + Sync + 'static {
15    fn inlets(&self) -> Vec<AnyInlet>;
16    fn outlets(&self) -> Vec<AnyOutlet>;
17}
18
19/// One outlet, no inlets — a graph that behaves as a `Source`.
20#[derive(Debug, PartialEq, Eq)]
21pub struct SourceShape<Out: 'static> {
22    outlet: Outlet<Out>,
23}
24
25impl<Out: 'static> Clone for SourceShape<Out> {
26    fn clone(&self) -> Self {
27        Self {
28            outlet: self.outlet.clone(),
29        }
30    }
31}
32
33impl<Out: 'static> SourceShape<Out> {
34    #[must_use]
35    pub fn new(outlet: Outlet<Out>) -> Self {
36        Self { outlet }
37    }
38
39    #[must_use]
40    pub fn outlet(&self) -> Outlet<Out> {
41        self.outlet.clone()
42    }
43}
44
45impl<Out: 'static> Shape for SourceShape<Out> {
46    fn inlets(&self) -> Vec<AnyInlet> {
47        Vec::new()
48    }
49
50    fn outlets(&self) -> Vec<AnyOutlet> {
51        vec![self.outlet.erase()]
52    }
53}
54
55/// One inlet, no outlets — a graph that behaves as a `Sink`.
56#[derive(Debug, PartialEq, Eq)]
57pub struct SinkShape<In: 'static> {
58    inlet: Inlet<In>,
59}
60
61impl<In: 'static> Clone for SinkShape<In> {
62    fn clone(&self) -> Self {
63        Self {
64            inlet: self.inlet.clone(),
65        }
66    }
67}
68
69impl<In: 'static> SinkShape<In> {
70    #[must_use]
71    pub fn new(inlet: Inlet<In>) -> Self {
72        Self { inlet }
73    }
74
75    #[must_use]
76    pub fn inlet(&self) -> Inlet<In> {
77        self.inlet.clone()
78    }
79}
80
81impl<In: 'static> Shape for SinkShape<In> {
82    fn inlets(&self) -> Vec<AnyInlet> {
83        vec![self.inlet.erase()]
84    }
85
86    fn outlets(&self) -> Vec<AnyOutlet> {
87        Vec::new()
88    }
89}
90
91/// One inlet and one outlet — a graph that behaves as a `Flow`. Re-exported as
92/// `GraphFlowShape` from the crate root.
93#[derive(Debug, PartialEq, Eq)]
94pub struct FlowShape<In: 'static, Out: 'static> {
95    inlet: Inlet<In>,
96    outlet: Outlet<Out>,
97}
98
99impl<In: 'static, Out: 'static> Clone for FlowShape<In, Out> {
100    fn clone(&self) -> Self {
101        Self {
102            inlet: self.inlet.clone(),
103            outlet: self.outlet.clone(),
104        }
105    }
106}
107
108impl<In: 'static, Out: 'static> FlowShape<In, Out> {
109    #[must_use]
110    pub fn new(inlet: Inlet<In>, outlet: Outlet<Out>) -> Self {
111        Self { inlet, outlet }
112    }
113
114    #[must_use]
115    pub fn inlet(&self) -> Inlet<In> {
116        self.inlet.clone()
117    }
118
119    #[must_use]
120    pub fn outlet(&self) -> Outlet<Out> {
121        self.outlet.clone()
122    }
123}
124
125impl<In: 'static, Out: 'static> Shape for FlowShape<In, Out> {
126    fn inlets(&self) -> Vec<AnyInlet> {
127        vec![self.inlet.erase()]
128    }
129
130    fn outlets(&self) -> Vec<AnyOutlet> {
131        vec![self.outlet.erase()]
132    }
133}
134
135/// N inlets and one outlet — the shape of merge-style junctions (`Merge`,
136/// `Concat`, `Interleave`, `OrElse`, `MergeSorted`, …). `Out` defaults to `In`;
137/// `MergeLatest` uses `Out = Vec<In>`.
138#[derive(Debug, PartialEq, Eq)]
139pub struct FanInShape<In: 'static, Out: 'static = In> {
140    inlets: Vec<Inlet<In>>,
141    outlet: Outlet<Out>,
142}
143
144impl<In: 'static, Out: 'static> Clone for FanInShape<In, Out> {
145    fn clone(&self) -> Self {
146        Self {
147            inlets: self.inlets.clone(),
148            outlet: self.outlet.clone(),
149        }
150    }
151}
152
153impl<In: 'static, Out: 'static> FanInShape<In, Out> {
154    #[must_use]
155    pub fn new(inlets: Vec<Inlet<In>>, outlet: Outlet<Out>) -> Self {
156        Self { inlets, outlet }
157    }
158
159    #[must_use]
160    pub fn inlet_count(&self) -> usize {
161        self.inlets.len()
162    }
163
164    pub fn inlet(&self, index: usize) -> StreamResult<Inlet<In>> {
165        self.inlets
166            .get(index)
167            .cloned()
168            .ok_or_else(|| StreamError::GraphValidation(format!("fan-in inlet {index} is missing")))
169    }
170
171    #[must_use]
172    pub fn inlets_vec(&self) -> Vec<Inlet<In>> {
173        self.inlets.clone()
174    }
175
176    #[must_use]
177    pub fn outlet(&self) -> Outlet<Out> {
178        self.outlet.clone()
179    }
180}
181
182impl<In: 'static, Out: 'static> Shape for FanInShape<In, Out> {
183    fn inlets(&self) -> Vec<AnyInlet> {
184        self.inlets.iter().map(Inlet::erase).collect()
185    }
186
187    fn outlets(&self) -> Vec<AnyOutlet> {
188        vec![self.outlet.erase()]
189    }
190}
191
192/// One preferred inlet, N secondary inlets, and one outlet — the shape of
193/// `MergePreferred`. Explicit-only in the wiring DSL (use `.preferred()` /
194/// `.secondary(i)`).
195#[derive(Debug, PartialEq, Eq)]
196pub struct MergePreferredShape<T: 'static> {
197    preferred: Inlet<T>,
198    secondary: Vec<Inlet<T>>,
199    outlet: Outlet<T>,
200}
201
202impl<T: 'static> Clone for MergePreferredShape<T> {
203    fn clone(&self) -> Self {
204        Self {
205            preferred: self.preferred.clone(),
206            secondary: self.secondary.clone(),
207            outlet: self.outlet.clone(),
208        }
209    }
210}
211
212impl<T: 'static> MergePreferredShape<T> {
213    #[must_use]
214    pub fn new(preferred: Inlet<T>, secondary: Vec<Inlet<T>>, outlet: Outlet<T>) -> Self {
215        Self {
216            preferred,
217            secondary,
218            outlet,
219        }
220    }
221
222    #[must_use]
223    pub fn preferred(&self) -> Inlet<T> {
224        self.preferred.clone()
225    }
226
227    #[must_use]
228    pub fn secondary_count(&self) -> usize {
229        self.secondary.len()
230    }
231
232    pub fn secondary(&self, index: usize) -> StreamResult<Inlet<T>> {
233        self.secondary.get(index).cloned().ok_or_else(|| {
234            StreamError::GraphValidation(format!(
235                "merge-preferred secondary inlet {index} is missing"
236            ))
237        })
238    }
239
240    #[must_use]
241    pub fn secondary_vec(&self) -> Vec<Inlet<T>> {
242        self.secondary.clone()
243    }
244
245    #[must_use]
246    pub fn outlet(&self) -> Outlet<T> {
247        self.outlet.clone()
248    }
249}
250
251impl<T: 'static> Shape for MergePreferredShape<T> {
252    fn inlets(&self) -> Vec<AnyInlet> {
253        std::iter::once(self.preferred.erase())
254            .chain(self.secondary.iter().map(Inlet::erase))
255            .collect()
256    }
257
258    fn outlets(&self) -> Vec<AnyOutlet> {
259        vec![self.outlet.erase()]
260    }
261}
262
263/// One inlet and N outlets — the shape of fan-out junctions (`Broadcast`,
264/// `Balance`, `Partition`). `Out` defaults to `In`.
265#[derive(Debug, PartialEq, Eq)]
266pub struct FanOutShape<In: 'static, Out: 'static = In> {
267    inlet: Inlet<In>,
268    outlets: Vec<Outlet<Out>>,
269}
270
271impl<In: 'static, Out: 'static> Clone for FanOutShape<In, Out> {
272    fn clone(&self) -> Self {
273        Self {
274            inlet: self.inlet.clone(),
275            outlets: self.outlets.clone(),
276        }
277    }
278}
279
280impl<In: 'static, Out: 'static> FanOutShape<In, Out> {
281    #[must_use]
282    pub fn new(inlet: Inlet<In>, outlets: Vec<Outlet<Out>>) -> Self {
283        Self { inlet, outlets }
284    }
285
286    #[must_use]
287    pub fn inlet(&self) -> Inlet<In> {
288        self.inlet.clone()
289    }
290
291    #[must_use]
292    pub fn outlet_count(&self) -> usize {
293        self.outlets.len()
294    }
295
296    pub fn outlet(&self, index: usize) -> StreamResult<Outlet<Out>> {
297        self.outlets.get(index).cloned().ok_or_else(|| {
298            StreamError::GraphValidation(format!("fan-out outlet {index} is missing"))
299        })
300    }
301
302    #[must_use]
303    pub fn outlets_vec(&self) -> Vec<Outlet<Out>> {
304        self.outlets.clone()
305    }
306}
307
308impl<In: 'static, Out: 'static> Shape for FanOutShape<In, Out> {
309    fn inlets(&self) -> Vec<AnyInlet> {
310        vec![self.inlet.erase()]
311    }
312
313    fn outlets(&self) -> Vec<AnyOutlet> {
314        self.outlets.iter().map(Outlet::erase).collect()
315    }
316}
317
318/// One inlet and two heterogeneously-typed outlets — the shape of `Unzip` and
319/// `UnzipWith`. Outlets are reached via `.out0()` / `.out1()`.
320#[derive(Debug, PartialEq, Eq)]
321pub struct FanOutShape2<In: 'static, Out0: 'static, Out1: 'static> {
322    inlet: Inlet<In>,
323    out0: Outlet<Out0>,
324    out1: Outlet<Out1>,
325}
326
327impl<In: 'static, Out0: 'static, Out1: 'static> Clone for FanOutShape2<In, Out0, Out1> {
328    fn clone(&self) -> Self {
329        Self {
330            inlet: self.inlet.clone(),
331            out0: self.out0.clone(),
332            out1: self.out1.clone(),
333        }
334    }
335}
336
337impl<In: 'static, Out0: 'static, Out1: 'static> FanOutShape2<In, Out0, Out1> {
338    #[must_use]
339    pub fn new(inlet: Inlet<In>, out0: Outlet<Out0>, out1: Outlet<Out1>) -> Self {
340        Self { inlet, out0, out1 }
341    }
342
343    #[must_use]
344    pub fn inlet(&self) -> Inlet<In> {
345        self.inlet.clone()
346    }
347
348    #[must_use]
349    pub fn out0(&self) -> Outlet<Out0> {
350        self.out0.clone()
351    }
352
353    #[must_use]
354    pub fn out1(&self) -> Outlet<Out1> {
355        self.out1.clone()
356    }
357}
358
359impl<In: 'static, Out0: 'static, Out1: 'static> Shape for FanOutShape2<In, Out0, Out1> {
360    fn inlets(&self) -> Vec<AnyInlet> {
361        vec![self.inlet.erase()]
362    }
363
364    fn outlets(&self) -> Vec<AnyOutlet> {
365        vec![self.out0.erase(), self.out1.erase()]
366    }
367}
368
369/// Two heterogeneously-typed inlets and one `(Left, Right)` outlet — the shape
370/// of `Zip`. Inlets are reached via `.in0()` / `.in1()`.
371#[derive(Debug, PartialEq, Eq)]
372pub struct ZipShape<Left: 'static, Right: 'static> {
373    left: Inlet<Left>,
374    right: Inlet<Right>,
375    outlet: Outlet<(Left, Right)>,
376}
377
378impl<Left: 'static, Right: 'static> Clone for ZipShape<Left, Right> {
379    fn clone(&self) -> Self {
380        Self {
381            left: self.left.clone(),
382            right: self.right.clone(),
383            outlet: self.outlet.clone(),
384        }
385    }
386}
387
388impl<Left: 'static, Right: 'static> ZipShape<Left, Right> {
389    #[must_use]
390    pub fn new(left: Inlet<Left>, right: Inlet<Right>, outlet: Outlet<(Left, Right)>) -> Self {
391        Self {
392            left,
393            right,
394            outlet,
395        }
396    }
397
398    #[must_use]
399    pub fn in0(&self) -> Inlet<Left> {
400        self.left.clone()
401    }
402
403    #[must_use]
404    pub fn in1(&self) -> Inlet<Right> {
405        self.right.clone()
406    }
407
408    #[must_use]
409    pub fn outlet(&self) -> Outlet<(Left, Right)> {
410        self.outlet.clone()
411    }
412}
413
414impl<Left: 'static, Right: 'static> Shape for ZipShape<Left, Right> {
415    fn inlets(&self) -> Vec<AnyInlet> {
416        vec![self.left.erase(), self.right.erase()]
417    }
418
419    fn outlets(&self) -> Vec<AnyOutlet> {
420        vec![self.outlet.erase()]
421    }
422}
423
424/// Two inlets and two outlets arranged as two independent directions — the
425/// shape of a bidirectional flow (top: `I1 → O1`, bottom: `I2 → O2`).
426/// Explicit-only in the wiring DSL.
427#[derive(Debug, PartialEq, Eq)]
428pub struct BidiShape<I1: 'static, O1: 'static, I2: 'static, O2: 'static> {
429    in1: Inlet<I1>,
430    out1: Outlet<O1>,
431    in2: Inlet<I2>,
432    out2: Outlet<O2>,
433}
434
435impl<I1: 'static, O1: 'static, I2: 'static, O2: 'static> Clone for BidiShape<I1, O1, I2, O2> {
436    fn clone(&self) -> Self {
437        Self {
438            in1: self.in1.clone(),
439            out1: self.out1.clone(),
440            in2: self.in2.clone(),
441            out2: self.out2.clone(),
442        }
443    }
444}
445
446impl<I1: 'static, O1: 'static, I2: 'static, O2: 'static> BidiShape<I1, O1, I2, O2> {
447    #[must_use]
448    pub fn new(in1: Inlet<I1>, out1: Outlet<O1>, in2: Inlet<I2>, out2: Outlet<O2>) -> Self {
449        Self {
450            in1,
451            out1,
452            in2,
453            out2,
454        }
455    }
456
457    #[must_use]
458    pub fn in1(&self) -> Inlet<I1> {
459        self.in1.clone()
460    }
461
462    #[must_use]
463    pub fn out1(&self) -> Outlet<O1> {
464        self.out1.clone()
465    }
466
467    #[must_use]
468    pub fn in2(&self) -> Inlet<I2> {
469        self.in2.clone()
470    }
471
472    #[must_use]
473    pub fn out2(&self) -> Outlet<O2> {
474        self.out2.clone()
475    }
476
477    #[must_use]
478    pub fn from_flows<In1: 'static, Out1: 'static, In2: 'static, Out2: 'static>(
479        top: &FlowShape<In1, Out1>,
480        bottom: &FlowShape<In2, Out2>,
481    ) -> BidiShape<In1, Out1, In2, Out2> {
482        BidiShape::new(top.inlet(), top.outlet(), bottom.inlet(), bottom.outlet())
483    }
484}
485
486impl<I1: 'static, O1: 'static, I2: 'static, O2: 'static> Shape for BidiShape<I1, O1, I2, O2> {
487    fn inlets(&self) -> Vec<AnyInlet> {
488        vec![self.in1.erase(), self.in2.erase()]
489    }
490
491    fn outlets(&self) -> Vec<AnyOutlet> {
492        vec![self.out1.erase(), self.out2.erase()]
493    }
494}
495
496/// Hands a [`GraphStage`](super::GraphStage) fresh, uniquely-identified ports
497/// when its shape is allocated. Each `inlet`/`outlet` call draws a new global
498/// [`PortId`](super::PortId); the allocator itself is stateless.
499#[derive(Debug, Default)]
500pub struct PortAllocator;
501
502impl PortAllocator {
503    #[must_use]
504    pub fn inlet<T: 'static>(&mut self, name: impl Into<String>) -> Inlet<T> {
505        Inlet::with_id(next_port_id(), name)
506    }
507
508    pub(super) fn inlet_arc<T: 'static>(&mut self, name: Arc<str>) -> Inlet<T> {
509        Inlet::with_arc_name(next_port_id(), name)
510    }
511
512    #[must_use]
513    pub fn outlet<T: 'static>(&mut self, name: impl Into<String>) -> Outlet<T> {
514        Outlet::with_id(next_port_id(), name)
515    }
516
517    pub(super) fn outlet_arc<T: 'static>(&mut self, name: Arc<str>) -> Outlet<T> {
518        Outlet::with_arc_name(next_port_id(), name)
519    }
520}