Skip to main content

datum/graph/
shapes.rs

1use super::*;
2
3pub trait Shape: Clone + Send + Sync + 'static {
4    fn inlets(&self) -> Vec<AnyInlet>;
5    fn outlets(&self) -> Vec<AnyOutlet>;
6}
7
8#[derive(Debug, PartialEq, Eq)]
9pub struct SourceShape<Out: 'static> {
10    outlet: Outlet<Out>,
11}
12
13impl<Out: 'static> Clone for SourceShape<Out> {
14    fn clone(&self) -> Self {
15        Self {
16            outlet: self.outlet.clone(),
17        }
18    }
19}
20
21impl<Out: 'static> SourceShape<Out> {
22    #[must_use]
23    pub fn new(outlet: Outlet<Out>) -> Self {
24        Self { outlet }
25    }
26
27    #[must_use]
28    pub fn outlet(&self) -> Outlet<Out> {
29        self.outlet.clone()
30    }
31}
32
33impl<Out: 'static> Shape for SourceShape<Out> {
34    fn inlets(&self) -> Vec<AnyInlet> {
35        Vec::new()
36    }
37
38    fn outlets(&self) -> Vec<AnyOutlet> {
39        vec![self.outlet.erase()]
40    }
41}
42
43#[derive(Debug, PartialEq, Eq)]
44pub struct SinkShape<In: 'static> {
45    inlet: Inlet<In>,
46}
47
48impl<In: 'static> Clone for SinkShape<In> {
49    fn clone(&self) -> Self {
50        Self {
51            inlet: self.inlet.clone(),
52        }
53    }
54}
55
56impl<In: 'static> SinkShape<In> {
57    #[must_use]
58    pub fn new(inlet: Inlet<In>) -> Self {
59        Self { inlet }
60    }
61
62    #[must_use]
63    pub fn inlet(&self) -> Inlet<In> {
64        self.inlet.clone()
65    }
66}
67
68impl<In: 'static> Shape for SinkShape<In> {
69    fn inlets(&self) -> Vec<AnyInlet> {
70        vec![self.inlet.erase()]
71    }
72
73    fn outlets(&self) -> Vec<AnyOutlet> {
74        Vec::new()
75    }
76}
77
78#[derive(Debug, PartialEq, Eq)]
79pub struct FlowShape<In: 'static, Out: 'static> {
80    inlet: Inlet<In>,
81    outlet: Outlet<Out>,
82}
83
84impl<In: 'static, Out: 'static> Clone for FlowShape<In, Out> {
85    fn clone(&self) -> Self {
86        Self {
87            inlet: self.inlet.clone(),
88            outlet: self.outlet.clone(),
89        }
90    }
91}
92
93impl<In: 'static, Out: 'static> FlowShape<In, Out> {
94    #[must_use]
95    pub fn new(inlet: Inlet<In>, outlet: Outlet<Out>) -> Self {
96        Self { inlet, outlet }
97    }
98
99    #[must_use]
100    pub fn inlet(&self) -> Inlet<In> {
101        self.inlet.clone()
102    }
103
104    #[must_use]
105    pub fn outlet(&self) -> Outlet<Out> {
106        self.outlet.clone()
107    }
108}
109
110impl<In: 'static, Out: 'static> Shape for FlowShape<In, Out> {
111    fn inlets(&self) -> Vec<AnyInlet> {
112        vec![self.inlet.erase()]
113    }
114
115    fn outlets(&self) -> Vec<AnyOutlet> {
116        vec![self.outlet.erase()]
117    }
118}
119
120#[derive(Debug, PartialEq, Eq)]
121pub struct FanInShape<In: 'static, Out: 'static = In> {
122    inlets: Vec<Inlet<In>>,
123    outlet: Outlet<Out>,
124}
125
126impl<In: 'static, Out: 'static> Clone for FanInShape<In, Out> {
127    fn clone(&self) -> Self {
128        Self {
129            inlets: self.inlets.clone(),
130            outlet: self.outlet.clone(),
131        }
132    }
133}
134
135impl<In: 'static, Out: 'static> FanInShape<In, Out> {
136    #[must_use]
137    pub fn new(inlets: Vec<Inlet<In>>, outlet: Outlet<Out>) -> Self {
138        Self { inlets, outlet }
139    }
140
141    #[must_use]
142    pub fn inlet_count(&self) -> usize {
143        self.inlets.len()
144    }
145
146    pub fn inlet(&self, index: usize) -> StreamResult<Inlet<In>> {
147        self.inlets
148            .get(index)
149            .cloned()
150            .ok_or_else(|| StreamError::GraphValidation(format!("fan-in inlet {index} is missing")))
151    }
152
153    #[must_use]
154    pub fn inlets_vec(&self) -> Vec<Inlet<In>> {
155        self.inlets.clone()
156    }
157
158    #[must_use]
159    pub fn outlet(&self) -> Outlet<Out> {
160        self.outlet.clone()
161    }
162}
163
164impl<In: 'static, Out: 'static> Shape for FanInShape<In, Out> {
165    fn inlets(&self) -> Vec<AnyInlet> {
166        self.inlets.iter().map(Inlet::erase).collect()
167    }
168
169    fn outlets(&self) -> Vec<AnyOutlet> {
170        vec![self.outlet.erase()]
171    }
172}
173
174#[derive(Debug, PartialEq, Eq)]
175pub struct MergePreferredShape<T: 'static> {
176    preferred: Inlet<T>,
177    secondary: Vec<Inlet<T>>,
178    outlet: Outlet<T>,
179}
180
181impl<T: 'static> Clone for MergePreferredShape<T> {
182    fn clone(&self) -> Self {
183        Self {
184            preferred: self.preferred.clone(),
185            secondary: self.secondary.clone(),
186            outlet: self.outlet.clone(),
187        }
188    }
189}
190
191impl<T: 'static> MergePreferredShape<T> {
192    #[must_use]
193    pub fn new(preferred: Inlet<T>, secondary: Vec<Inlet<T>>, outlet: Outlet<T>) -> Self {
194        Self {
195            preferred,
196            secondary,
197            outlet,
198        }
199    }
200
201    #[must_use]
202    pub fn preferred(&self) -> Inlet<T> {
203        self.preferred.clone()
204    }
205
206    #[must_use]
207    pub fn secondary_count(&self) -> usize {
208        self.secondary.len()
209    }
210
211    pub fn secondary(&self, index: usize) -> StreamResult<Inlet<T>> {
212        self.secondary.get(index).cloned().ok_or_else(|| {
213            StreamError::GraphValidation(format!(
214                "merge-preferred secondary inlet {index} is missing"
215            ))
216        })
217    }
218
219    #[must_use]
220    pub fn secondary_vec(&self) -> Vec<Inlet<T>> {
221        self.secondary.clone()
222    }
223
224    #[must_use]
225    pub fn outlet(&self) -> Outlet<T> {
226        self.outlet.clone()
227    }
228}
229
230impl<T: 'static> Shape for MergePreferredShape<T> {
231    fn inlets(&self) -> Vec<AnyInlet> {
232        std::iter::once(self.preferred.erase())
233            .chain(self.secondary.iter().map(Inlet::erase))
234            .collect()
235    }
236
237    fn outlets(&self) -> Vec<AnyOutlet> {
238        vec![self.outlet.erase()]
239    }
240}
241
242#[derive(Debug, PartialEq, Eq)]
243pub struct FanOutShape<In: 'static, Out: 'static = In> {
244    inlet: Inlet<In>,
245    outlets: Vec<Outlet<Out>>,
246}
247
248impl<In: 'static, Out: 'static> Clone for FanOutShape<In, Out> {
249    fn clone(&self) -> Self {
250        Self {
251            inlet: self.inlet.clone(),
252            outlets: self.outlets.clone(),
253        }
254    }
255}
256
257impl<In: 'static, Out: 'static> FanOutShape<In, Out> {
258    #[must_use]
259    pub fn new(inlet: Inlet<In>, outlets: Vec<Outlet<Out>>) -> Self {
260        Self { inlet, outlets }
261    }
262
263    #[must_use]
264    pub fn inlet(&self) -> Inlet<In> {
265        self.inlet.clone()
266    }
267
268    #[must_use]
269    pub fn outlet_count(&self) -> usize {
270        self.outlets.len()
271    }
272
273    pub fn outlet(&self, index: usize) -> StreamResult<Outlet<Out>> {
274        self.outlets.get(index).cloned().ok_or_else(|| {
275            StreamError::GraphValidation(format!("fan-out outlet {index} is missing"))
276        })
277    }
278
279    #[must_use]
280    pub fn outlets_vec(&self) -> Vec<Outlet<Out>> {
281        self.outlets.clone()
282    }
283}
284
285impl<In: 'static, Out: 'static> Shape for FanOutShape<In, Out> {
286    fn inlets(&self) -> Vec<AnyInlet> {
287        vec![self.inlet.erase()]
288    }
289
290    fn outlets(&self) -> Vec<AnyOutlet> {
291        self.outlets.iter().map(Outlet::erase).collect()
292    }
293}
294
295#[derive(Debug, PartialEq, Eq)]
296pub struct FanOutShape2<In: 'static, Out0: 'static, Out1: 'static> {
297    inlet: Inlet<In>,
298    out0: Outlet<Out0>,
299    out1: Outlet<Out1>,
300}
301
302impl<In: 'static, Out0: 'static, Out1: 'static> Clone for FanOutShape2<In, Out0, Out1> {
303    fn clone(&self) -> Self {
304        Self {
305            inlet: self.inlet.clone(),
306            out0: self.out0.clone(),
307            out1: self.out1.clone(),
308        }
309    }
310}
311
312impl<In: 'static, Out0: 'static, Out1: 'static> FanOutShape2<In, Out0, Out1> {
313    #[must_use]
314    pub fn new(inlet: Inlet<In>, out0: Outlet<Out0>, out1: Outlet<Out1>) -> Self {
315        Self { inlet, out0, out1 }
316    }
317
318    #[must_use]
319    pub fn inlet(&self) -> Inlet<In> {
320        self.inlet.clone()
321    }
322
323    #[must_use]
324    pub fn out0(&self) -> Outlet<Out0> {
325        self.out0.clone()
326    }
327
328    #[must_use]
329    pub fn out1(&self) -> Outlet<Out1> {
330        self.out1.clone()
331    }
332}
333
334impl<In: 'static, Out0: 'static, Out1: 'static> Shape for FanOutShape2<In, Out0, Out1> {
335    fn inlets(&self) -> Vec<AnyInlet> {
336        vec![self.inlet.erase()]
337    }
338
339    fn outlets(&self) -> Vec<AnyOutlet> {
340        vec![self.out0.erase(), self.out1.erase()]
341    }
342}
343
344#[derive(Debug, PartialEq, Eq)]
345pub struct ZipShape<Left: 'static, Right: 'static> {
346    left: Inlet<Left>,
347    right: Inlet<Right>,
348    outlet: Outlet<(Left, Right)>,
349}
350
351impl<Left: 'static, Right: 'static> Clone for ZipShape<Left, Right> {
352    fn clone(&self) -> Self {
353        Self {
354            left: self.left.clone(),
355            right: self.right.clone(),
356            outlet: self.outlet.clone(),
357        }
358    }
359}
360
361impl<Left: 'static, Right: 'static> ZipShape<Left, Right> {
362    #[must_use]
363    pub fn new(left: Inlet<Left>, right: Inlet<Right>, outlet: Outlet<(Left, Right)>) -> Self {
364        Self {
365            left,
366            right,
367            outlet,
368        }
369    }
370
371    #[must_use]
372    pub fn in0(&self) -> Inlet<Left> {
373        self.left.clone()
374    }
375
376    #[must_use]
377    pub fn in1(&self) -> Inlet<Right> {
378        self.right.clone()
379    }
380
381    #[must_use]
382    pub fn outlet(&self) -> Outlet<(Left, Right)> {
383        self.outlet.clone()
384    }
385}
386
387impl<Left: 'static, Right: 'static> Shape for ZipShape<Left, Right> {
388    fn inlets(&self) -> Vec<AnyInlet> {
389        vec![self.left.erase(), self.right.erase()]
390    }
391
392    fn outlets(&self) -> Vec<AnyOutlet> {
393        vec![self.outlet.erase()]
394    }
395}
396
397#[derive(Debug, PartialEq, Eq)]
398pub struct BidiShape<I1: 'static, O1: 'static, I2: 'static, O2: 'static> {
399    in1: Inlet<I1>,
400    out1: Outlet<O1>,
401    in2: Inlet<I2>,
402    out2: Outlet<O2>,
403}
404
405impl<I1: 'static, O1: 'static, I2: 'static, O2: 'static> Clone for BidiShape<I1, O1, I2, O2> {
406    fn clone(&self) -> Self {
407        Self {
408            in1: self.in1.clone(),
409            out1: self.out1.clone(),
410            in2: self.in2.clone(),
411            out2: self.out2.clone(),
412        }
413    }
414}
415
416impl<I1: 'static, O1: 'static, I2: 'static, O2: 'static> BidiShape<I1, O1, I2, O2> {
417    #[must_use]
418    pub fn new(in1: Inlet<I1>, out1: Outlet<O1>, in2: Inlet<I2>, out2: Outlet<O2>) -> Self {
419        Self {
420            in1,
421            out1,
422            in2,
423            out2,
424        }
425    }
426
427    #[must_use]
428    pub fn in1(&self) -> Inlet<I1> {
429        self.in1.clone()
430    }
431
432    #[must_use]
433    pub fn out1(&self) -> Outlet<O1> {
434        self.out1.clone()
435    }
436
437    #[must_use]
438    pub fn in2(&self) -> Inlet<I2> {
439        self.in2.clone()
440    }
441
442    #[must_use]
443    pub fn out2(&self) -> Outlet<O2> {
444        self.out2.clone()
445    }
446
447    #[must_use]
448    pub fn from_flows<In1: 'static, Out1: 'static, In2: 'static, Out2: 'static>(
449        top: &FlowShape<In1, Out1>,
450        bottom: &FlowShape<In2, Out2>,
451    ) -> BidiShape<In1, Out1, In2, Out2> {
452        BidiShape::new(top.inlet(), top.outlet(), bottom.inlet(), bottom.outlet())
453    }
454}
455
456impl<I1: 'static, O1: 'static, I2: 'static, O2: 'static> Shape for BidiShape<I1, O1, I2, O2> {
457    fn inlets(&self) -> Vec<AnyInlet> {
458        vec![self.in1.erase(), self.in2.erase()]
459    }
460
461    fn outlets(&self) -> Vec<AnyOutlet> {
462        vec![self.out1.erase(), self.out2.erase()]
463    }
464}
465
466#[derive(Debug, Default)]
467pub struct PortAllocator;
468
469impl PortAllocator {
470    #[must_use]
471    pub fn inlet<T: 'static>(&mut self, name: impl Into<String>) -> Inlet<T> {
472        Inlet::with_id(next_port_id(), name)
473    }
474
475    pub(super) fn inlet_arc<T: 'static>(&mut self, name: Arc<str>) -> Inlet<T> {
476        Inlet::with_arc_name(next_port_id(), name)
477    }
478
479    #[must_use]
480    pub fn outlet<T: 'static>(&mut self, name: impl Into<String>) -> Outlet<T> {
481        Outlet::with_id(next_port_id(), name)
482    }
483
484    pub(super) fn outlet_arc<T: 'static>(&mut self, name: Arc<str>) -> Outlet<T> {
485        Outlet::with_arc_name(next_port_id(), name)
486    }
487}