Skip to main content

datum/graph/
wire.rs

1//! The M8 method-based graph wiring DSL (additive over `connect`).
2//!
3//! [`WireDsl`] adds `shape.to(&other)` plus `.out(i)` / `.in_(i)` cursors so
4//! edges read as `builder.wire(a.to(&b))`. `to` auto-selects the next
5//! unconnected outlet/inlet in declaration order; cursors pick an exact port.
6//! [`GraphBuilder::wire`] defers errors to graph creation, while `try_wire`
7//! fails fast. `MergePreferredShape` and `BidiShape` are explicit-only (no
8//! `AutoOutletEndpoint`/`AutoInletEndpoint` impls — the `compile_fail` doctests
9//! below pin that). All of this is sugar over the same
10//! [`GraphBuilder::connect_any_unrecorded`] validation that `connect` uses.
11
12use super::*;
13
/// A graph wiring specification accepted by [`GraphBuilder::wire`] and
/// [`GraphBuilder::try_wire`].
///
/// The implementor visible in this file is [`WirePair`], which resolves an
/// outlet endpoint and an inlet endpoint and then connects them.
pub trait WireSpec {
    /// Applies this specification to `builder`, consuming the specification.
    ///
    /// # Errors
    ///
    /// Returns whatever error the implementor produces when the wiring cannot
    /// be applied.
    fn apply(self, builder: &mut GraphBuilder) -> StreamResult<()>;
}
19
20/// Shape extension methods for method-based graph wiring.
21///
22/// `to` intentionally supports only shapes with an unambiguous auto-outlet.
23/// `MergePreferredShape` and `BidiShape` are explicit-only.
24///
25/// ```compile_fail
26/// use datum::{Broadcast, GraphDsl, GraphFlowShape, MergePreferred, WireDsl};
27///
28/// let _ = GraphDsl::try_create(|builder| {
29///     let merge = builder.add(MergePreferred::<u64>::new(1));
30///     let bcast = builder.add(Broadcast::<u64>::new(2));
31///
32///     builder.wire(merge.to(&bcast));
33///
34///     Ok(GraphFlowShape::new(merge.secondary(0)?, bcast.outlet(0)?))
35/// });
36/// ```
37///
38/// ```compile_fail
39/// use datum::{BidiShape, FlowShape, GraphDsl, Inlet, Outlet, WireDsl};
40///
41/// let _ = GraphDsl::try_create(|builder| {
42///     let bidi = BidiShape::new(
43///         Inlet::<u64>::new("bidi.in1"),
44///         Outlet::<u64>::new("bidi.out1"),
45///         Inlet::<u64>::new("bidi.in2"),
46///         Outlet::<u64>::new("bidi.out2"),
47///     );
48///     let flow = FlowShape::new(Inlet::<u64>::new("flow.in"), Outlet::<u64>::new("flow.out"));
49///
50///     builder.wire(bidi.to(&flow));
51///
52///     Ok(flow)
53/// });
54/// ```
55pub trait WireDsl: Shape + Clone {
56    fn to<D>(&self, inlet: D) -> WirePair<Self, D>
57    where
58        Self: AutoOutletEndpoint,
59    {
60        WirePair::new(self.clone(), inlet)
61    }
62
63    #[must_use]
64    fn out(&self, index: usize) -> OutletCursor<Self> {
65        OutletCursor::new(self.clone(), index)
66    }
67
68    #[must_use]
69    fn in_(&self, index: usize) -> InletCursor<Self> {
70        InletCursor::new(self.clone(), index)
71    }
72}
73
74impl<S> WireDsl for S where S: Shape + Clone {}
75
76#[diagnostic::on_unimplemented(
77    message = "`{Self}` cannot be auto-wired as a graph DSL outlet",
78    note = "MergePreferred and Bidi shapes are explicit-only; use `.preferred()` / `.secondary(i)` / explicit `.out(i)` cursors"
79)]
80#[doc(hidden)]
81pub trait AutoOutletEndpoint: Shape {
82    fn auto_outlet(&self, builder: &GraphBuilder) -> StreamResult<SelectedOutlet> {
83        select_auto_outlet(&self.outlets(), builder)
84    }
85}
86
87#[diagnostic::on_unimplemented(
88    message = "`{Self}` cannot be auto-wired as a graph DSL inlet",
89    note = "MergePreferred and Bidi shapes are explicit-only; use `.preferred()` / `.secondary(i)` / explicit `.in_(i)` cursors"
90)]
91#[doc(hidden)]
92pub trait AutoInletEndpoint: Shape {
93    fn auto_inlet(&self, builder: &GraphBuilder) -> StreamResult<SelectedInlet> {
94        select_auto_inlet(&self.inlets(), builder)
95    }
96}
97
// Shapes that may be auto-wired as an outlet. All impls rely on the trait's
// default `auto_outlet`. Per the module docs, `MergePreferredShape` and
// `BidiShape` are deliberately absent so that `.to(..)` on them fails to
// compile.
impl<Out: 'static> AutoOutletEndpoint for SourceShape<Out> {}
impl<In: 'static, Out: 'static> AutoOutletEndpoint for FlowShape<In, Out> {}
impl<In: 'static, Out: 'static> AutoOutletEndpoint for FanInShape<In, Out> {}
impl<In: 'static, Out: 'static> AutoOutletEndpoint for FanOutShape<In, Out> {}
impl<In: 'static, Out0: 'static, Out1: 'static> AutoOutletEndpoint
    for FanOutShape2<In, Out0, Out1>
{
}
impl<Left: 'static, Right: 'static> AutoOutletEndpoint for ZipShape<Left, Right> {}

// Shapes that may be auto-wired as an inlet, again via the default
// `auto_inlet`.
impl<In: 'static> AutoInletEndpoint for SinkShape<In> {}
impl<In: 'static, Out: 'static> AutoInletEndpoint for FlowShape<In, Out> {}
impl<In: 'static, Out: 'static> AutoInletEndpoint for FanInShape<In, Out> {}
impl<In: 'static, Out: 'static> AutoInletEndpoint for FanOutShape<In, Out> {}
impl<In: 'static, Out0: 'static, Out1: 'static> AutoInletEndpoint for FanOutShape2<In, Out0, Out1> {}
impl<Left: 'static, Right: 'static> AutoInletEndpoint for ZipShape<Left, Right> {}
114
115#[derive(Clone, Debug)]
116pub struct OutletCursor<S> {
117    shape: S,
118    index: usize,
119}
120
121impl<S> OutletCursor<S> {
122    #[must_use]
123    pub fn new(shape: S, index: usize) -> Self {
124        Self { shape, index }
125    }
126
127    #[must_use]
128    pub const fn index(&self) -> usize {
129        self.index
130    }
131
132    #[must_use]
133    pub fn to<D>(self, inlet: D) -> WirePair<Self, D> {
134        WirePair::new(self, inlet)
135    }
136}
137
/// An explicit reference to the inlet at position `index` of `shape`.
///
/// Construction never validates the index; an out-of-range cursor is only
/// rejected when it is resolved during wiring.
#[derive(Clone, Debug)]
pub struct InletCursor<S> {
    shape: S,
    index: usize,
}

impl<S> InletCursor<S> {
    /// Creates a cursor over `shape` pointing at inlet number `index`.
    #[must_use]
    pub fn new(shape: S, index: usize) -> Self {
        InletCursor { shape, index }
    }

    /// Returns the inlet position this cursor points at.
    #[must_use]
    pub const fn index(&self) -> usize {
        self.index
    }
}
155
/// One edge specification: an outlet-side endpoint paired with an inlet-side
/// endpoint.
///
/// The pair is inert data; it only connects anything once applied through its
/// [`WireSpec`] implementation.
#[derive(Clone, Debug)]
pub struct WirePair<Out, In> {
    outlet: Out,
    inlet: In,
}

impl<Out, In> WirePair<Out, In> {
    /// Bundles `outlet` and `inlet` without resolving or validating either.
    #[must_use]
    pub fn new(outlet: Out, inlet: In) -> Self {
        WirePair { outlet, inlet }
    }
}
168
169impl<Out, In> WireSpec for WirePair<Out, In>
170where
171    Out: WireOutletEndpoint,
172    In: WireInletEndpoint,
173{
174    fn apply(self, builder: &mut GraphBuilder) -> StreamResult<()> {
175        let outlet = self.outlet.select_outlet(builder)?;
176        let inlet = self.inlet.select_inlet(builder)?;
177        builder
178            .connect_any_unrecorded(outlet.port.clone(), inlet.port.clone())
179            .map_err(|error| {
180                StreamError::GraphValidation(format!(
181                    "{} -> {}: {}",
182                    outlet.label, inlet.label, error
183                ))
184            })
185    }
186}
187
/// An outlet resolved for wiring, together with the label used for it in
/// wiring error messages.
#[doc(hidden)]
#[derive(Clone, Debug)]
pub struct SelectedOutlet {
    // Type-erased outlet that `WirePair::apply` hands to
    // `GraphBuilder::connect_any_unrecorded`.
    port: AnyOutlet,
    // Diagnostic label in the `name[index]` form built by `indexed_port_label`.
    label: String,
}
194
/// An inlet resolved for wiring, together with the label used for it in
/// wiring error messages.
#[doc(hidden)]
#[derive(Clone, Debug)]
pub struct SelectedInlet {
    // Type-erased inlet that `WirePair::apply` hands to
    // `GraphBuilder::connect_any_unrecorded`.
    port: AnyInlet,
    // Diagnostic label in the `name[index]` form built by `indexed_port_label`.
    label: String,
}
201
/// Internal resolution step for the outlet side of a [`WirePair`].
///
/// Implemented in this file for auto-wirable shapes, [`OutletCursor`], and
/// bare `Outlet<T>` ports (each by value and by reference).
trait WireOutletEndpoint {
    /// Resolves this endpoint to a concrete outlet plus its diagnostic label.
    ///
    /// `builder` is consulted by auto-selecting endpoints to skip outlets that
    /// are already connected; cursor and bare-port endpoints ignore it.
    fn select_outlet(self, builder: &GraphBuilder) -> StreamResult<SelectedOutlet>;
}
205
/// Internal resolution step for the inlet side of a [`WirePair`].
///
/// Implemented in this file for auto-wirable shapes, [`InletCursor`], and
/// bare `Inlet<T>` ports (each by value and by reference).
trait WireInletEndpoint {
    /// Resolves this endpoint to a concrete inlet plus its diagnostic label.
    ///
    /// `builder` is consulted by auto-selecting endpoints to skip inlets that
    /// are already connected; cursor and bare-port endpoints ignore it.
    fn select_inlet(self, builder: &GraphBuilder) -> StreamResult<SelectedInlet>;
}
209
// Generates `WireOutletEndpoint` for a shape type and for a shared reference
// to it, both forwarding to `AutoOutletEndpoint::auto_outlet`.
//
// Invocation form: `impl_auto_outlet_endpoint!(A, B; Shape<A, B>)` — the
// identifiers before `;` become the impl's generic parameters, each bounded
// by `'static`.
macro_rules! impl_auto_outlet_endpoint {
    ($($params:ident),*; $shape:ty) => {
        // By-value endpoint: `a.to(b)` style usage.
        impl<$($params: 'static),*> WireOutletEndpoint for $shape {
            fn select_outlet(self, builder: &GraphBuilder) -> StreamResult<SelectedOutlet> {
                self.auto_outlet(builder)
            }
        }

        // By-reference endpoint, so the caller keeps ownership of the shape.
        impl<$($params: 'static),*> WireOutletEndpoint for &$shape {
            fn select_outlet(self, builder: &GraphBuilder) -> StreamResult<SelectedOutlet> {
                (*self).auto_outlet(builder)
            }
        }
    };
}
225
// Generates `WireInletEndpoint` for a shape type and for a shared reference
// to it, both forwarding to `AutoInletEndpoint::auto_inlet`.
//
// Invocation form: `impl_auto_inlet_endpoint!(A, B; Shape<A, B>)` — the
// identifiers before `;` become the impl's generic parameters, each bounded
// by `'static`.
macro_rules! impl_auto_inlet_endpoint {
    ($($params:ident),*; $shape:ty) => {
        // By-value endpoint.
        impl<$($params: 'static),*> WireInletEndpoint for $shape {
            fn select_inlet(self, builder: &GraphBuilder) -> StreamResult<SelectedInlet> {
                self.auto_inlet(builder)
            }
        }

        // By-reference endpoint: this is what `a.to(&b)` resolves through.
        impl<$($params: 'static),*> WireInletEndpoint for &$shape {
            fn select_inlet(self, builder: &GraphBuilder) -> StreamResult<SelectedInlet> {
                (*self).auto_inlet(builder)
            }
        }
    };
}
241
// Outlet-side endpoint impls. The generated bodies call `auto_outlet`, so
// every shape listed here must also implement `AutoOutletEndpoint`.
impl_auto_outlet_endpoint!(Out; SourceShape<Out>);
impl_auto_outlet_endpoint!(In, Out; FlowShape<In, Out>);
impl_auto_outlet_endpoint!(In, Out; FanInShape<In, Out>);
impl_auto_outlet_endpoint!(In, Out; FanOutShape<In, Out>);
impl_auto_outlet_endpoint!(In, Out0, Out1; FanOutShape2<In, Out0, Out1>);
impl_auto_outlet_endpoint!(Left, Right; ZipShape<Left, Right>);

// Inlet-side endpoint impls. The generated bodies call `auto_inlet`, so
// every shape listed here must also implement `AutoInletEndpoint`.
impl_auto_inlet_endpoint!(In; SinkShape<In>);
impl_auto_inlet_endpoint!(In, Out; FlowShape<In, Out>);
impl_auto_inlet_endpoint!(In, Out; FanInShape<In, Out>);
impl_auto_inlet_endpoint!(In, Out; FanOutShape<In, Out>);
impl_auto_inlet_endpoint!(In, Out0, Out1; FanOutShape2<In, Out0, Out1>);
impl_auto_inlet_endpoint!(Left, Right; ZipShape<Left, Right>);
255
256impl<S> WireOutletEndpoint for OutletCursor<S>
257where
258    S: Shape,
259{
260    fn select_outlet(self, _builder: &GraphBuilder) -> StreamResult<SelectedOutlet> {
261        select_indexed_outlet(&self.shape.outlets(), self.index)
262    }
263}
264
265impl<S> WireOutletEndpoint for &OutletCursor<S>
266where
267    S: Shape,
268{
269    fn select_outlet(self, _builder: &GraphBuilder) -> StreamResult<SelectedOutlet> {
270        select_indexed_outlet(&self.shape.outlets(), self.index)
271    }
272}
273
274impl<T: 'static> WireOutletEndpoint for Outlet<T> {
275    fn select_outlet(self, _builder: &GraphBuilder) -> StreamResult<SelectedOutlet> {
276        let port = self.erase();
277        let label = indexed_port_label(port.name(), 0, PortKind::Outlet);
278        Ok(SelectedOutlet { port, label })
279    }
280}
281
282impl<T: 'static> WireOutletEndpoint for &Outlet<T> {
283    fn select_outlet(self, _builder: &GraphBuilder) -> StreamResult<SelectedOutlet> {
284        let port = self.erase();
285        let label = indexed_port_label(port.name(), 0, PortKind::Outlet);
286        Ok(SelectedOutlet { port, label })
287    }
288}
289
290impl<S> WireInletEndpoint for InletCursor<S>
291where
292    S: Shape,
293{
294    fn select_inlet(self, _builder: &GraphBuilder) -> StreamResult<SelectedInlet> {
295        select_indexed_inlet(&self.shape.inlets(), self.index)
296    }
297}
298
299impl<S> WireInletEndpoint for &InletCursor<S>
300where
301    S: Shape,
302{
303    fn select_inlet(self, _builder: &GraphBuilder) -> StreamResult<SelectedInlet> {
304        select_indexed_inlet(&self.shape.inlets(), self.index)
305    }
306}
307
308impl<T: 'static> WireInletEndpoint for Inlet<T> {
309    fn select_inlet(self, _builder: &GraphBuilder) -> StreamResult<SelectedInlet> {
310        let port = self.erase();
311        let label = indexed_port_label(port.name(), 0, PortKind::Inlet);
312        Ok(SelectedInlet { port, label })
313    }
314}
315
316impl<T: 'static> WireInletEndpoint for &Inlet<T> {
317    fn select_inlet(self, _builder: &GraphBuilder) -> StreamResult<SelectedInlet> {
318        let port = self.erase();
319        let label = indexed_port_label(port.name(), 0, PortKind::Inlet);
320        Ok(SelectedInlet { port, label })
321    }
322}
323
324fn select_auto_outlet(
325    outlets: &[AnyOutlet],
326    builder: &GraphBuilder,
327) -> StreamResult<SelectedOutlet> {
328    outlets
329        .iter()
330        .enumerate()
331        .find(|(_, outlet)| !builder.is_outlet_connected(outlet))
332        .map(|(index, outlet)| SelectedOutlet {
333            port: outlet.clone(),
334            label: indexed_port_label(outlet.name(), index, PortKind::Outlet),
335        })
336        .ok_or_else(|| {
337            StreamError::GraphValidation(format!(
338                "{}: no unconnected outlet",
339                missing_port_label(
340                    outlets.iter().map(AnyOutlet::name),
341                    outlets.len(),
342                    PortKind::Outlet
343                )
344            ))
345        })
346}
347
348fn select_auto_inlet(inlets: &[AnyInlet], builder: &GraphBuilder) -> StreamResult<SelectedInlet> {
349    inlets
350        .iter()
351        .enumerate()
352        .find(|(_, inlet)| !builder.is_inlet_connected(inlet))
353        .map(|(index, inlet)| SelectedInlet {
354            port: inlet.clone(),
355            label: indexed_port_label(inlet.name(), index, PortKind::Inlet),
356        })
357        .ok_or_else(|| {
358            StreamError::GraphValidation(format!(
359                "{}: no unconnected inlet",
360                missing_port_label(
361                    inlets.iter().map(AnyInlet::name),
362                    inlets.len(),
363                    PortKind::Inlet
364                )
365            ))
366        })
367}
368
369fn select_indexed_outlet(outlets: &[AnyOutlet], index: usize) -> StreamResult<SelectedOutlet> {
370    outlets
371        .get(index)
372        .map(|outlet| SelectedOutlet {
373            port: outlet.clone(),
374            label: indexed_port_label(outlet.name(), index, PortKind::Outlet),
375        })
376        .ok_or_else(|| {
377            StreamError::GraphValidation(format!(
378                "{}: no outlet at cursor index {index}",
379                missing_port_label(outlets.iter().map(AnyOutlet::name), index, PortKind::Outlet)
380            ))
381        })
382}
383
384fn select_indexed_inlet(inlets: &[AnyInlet], index: usize) -> StreamResult<SelectedInlet> {
385    inlets
386        .get(index)
387        .map(|inlet| SelectedInlet {
388            port: inlet.clone(),
389            label: indexed_port_label(inlet.name(), index, PortKind::Inlet),
390        })
391        .ok_or_else(|| {
392            StreamError::GraphValidation(format!(
393                "{}: no inlet at cursor index {index}",
394                missing_port_label(inlets.iter().map(AnyInlet::name), index, PortKind::Inlet)
395            ))
396        })
397}
398
399fn missing_port_label<'a>(
400    mut names: impl Iterator<Item = &'a str>,
401    index: usize,
402    kind: PortKind,
403) -> String {
404    names
405        .next()
406        .map(|name| indexed_port_label(name, index, kind))
407        .unwrap_or_else(|| match kind {
408            PortKind::Inlet => format!("inlet[{index}]"),
409            PortKind::Outlet => format!("outlet[{index}]"),
410        })
411}
412
413fn indexed_port_label(name: &str, index: usize, _kind: PortKind) -> String {
414    let suffix_start = name
415        .char_indices()
416        .rev()
417        .find_map(|(offset, ch)| (!ch.is_ascii_digit()).then_some(offset + ch.len_utf8()))
418        .unwrap_or(0);
419    if suffix_start < name.len() {
420        format!("{}[{index}]", &name[..suffix_start])
421    } else {
422        format!("{name}[{index}]")
423    }
424}
425
426#[cfg(test)]
427mod tests {
428    use super::*;
429
    // Builds the same identity -> +1 -> *2 chain once with `connect` and once
    // with auto-selecting `wire(a.to(&b))`, then checks that both graphs
    // produce identical output for the inputs 0..32.
    #[test]
    fn wire_dsl_matches_connect_for_linear_identity_map_chain() {
        let connect_graph = GraphDsl::try_create(|builder| {
            let identity = builder.add(Identity::<u64>::new());
            let plus_one = builder.add(MapStage::new(|item: u64| item + 1));
            let times_two = builder.add(MapStage::new(|item: u64| item * 2));

            builder.connect(identity.outlet(), plus_one.inlet())?;
            builder.connect(plus_one.outlet(), times_two.inlet())?;

            Ok(FlowShape::new(identity.inlet(), times_two.outlet()))
        })
        .unwrap();

        let wire_graph = GraphDsl::try_create(|builder| {
            let identity = builder.add(Identity::<u64>::new());
            let plus_one = builder.add(MapStage::new(|item: u64| item + 1));
            let times_two = builder.add(MapStage::new(|item: u64| item * 2));

            builder
                .wire(identity.to(&plus_one))
                .wire(plus_one.to(&times_two));

            Ok(FlowShape::new(identity.inlet(), times_two.outlet()))
        })
        .unwrap();

        let input = 0_u64..32;
        assert_eq!(
            wire_graph.run_with_input(input.clone()).unwrap(),
            connect_graph.run_with_input(input).unwrap()
        );
    }
463
    // Broadcast(2) -> Zip diamond: two identical `broadcast.to(&zip)` calls
    // must behave like the explicit out0->in0 / out1->in1 `connect` wiring.
    #[test]
    fn wire_dsl_matches_connect_for_broadcast_zip_diamond() {
        let connect_graph = GraphDsl::try_create(|builder| {
            let broadcast = builder.add(Broadcast::<i32>::new(2));
            let zip = builder.add(Zip::<i32, i32>::new());

            builder.connect(broadcast.outlet(0)?, zip.in0())?;
            builder.connect(broadcast.outlet(1)?, zip.in1())?;

            Ok(FlowShape::new(broadcast.inlet(), zip.outlet()))
        })
        .unwrap();

        let wire_graph = GraphDsl::try_create(|builder| {
            let broadcast = builder.add(Broadcast::<i32>::new(2));
            let zip = builder.add(Zip::<i32, i32>::new());

            // Each call auto-selects the next unconnected outlet/inlet pair.
            builder.wire(broadcast.to(&zip)).wire(broadcast.to(&zip));

            Ok(FlowShape::new(broadcast.inlet(), zip.outlet()))
        })
        .unwrap();

        let input = 0..16;
        assert_eq!(
            wire_graph.run_with_input(input.clone()).unwrap(),
            connect_graph.run_with_input(input).unwrap()
        );
    }
493
    // Balance(2) -> Merge(2): auto-wired edges must yield the same output as
    // the index-by-index `connect` wiring for the inputs 0..32.
    #[test]
    fn wire_dsl_matches_connect_for_balance_merge() {
        let connect_graph = GraphDsl::try_create(|builder| {
            let balance = builder.add(Balance::<i32>::new(2));
            let merge = builder.add(Merge::<i32>::new(2));

            builder.connect(balance.outlet(0)?, merge.inlet(0)?)?;
            builder.connect(balance.outlet(1)?, merge.inlet(1)?)?;

            Ok(FlowShape::new(balance.inlet(), merge.outlet()))
        })
        .unwrap();

        let wire_graph = GraphDsl::try_create(|builder| {
            let balance = builder.add(Balance::<i32>::new(2));
            let merge = builder.add(Merge::<i32>::new(2));

            builder.wire(balance.to(&merge)).wire(balance.to(&merge));

            Ok(FlowShape::new(balance.inlet(), merge.outlet()))
        })
        .unwrap();

        let input = 0..32;
        assert_eq!(
            wire_graph.run_with_input(input.clone()).unwrap(),
            connect_graph.run_with_input(input).unwrap()
        );
    }
523
    // Partition(2, by parity) -> Merge(2): auto-wired edges must yield the
    // same output as the index-by-index `connect` wiring for the inputs 0..32.
    #[test]
    fn wire_dsl_matches_connect_for_partition_merge() {
        let connect_graph = GraphDsl::try_create(|builder| {
            let partition = builder.add(Partition::<i32>::new(2, |item| (*item % 2) as usize));
            let merge = builder.add(Merge::<i32>::new(2));

            builder.connect(partition.outlet(0)?, merge.inlet(0)?)?;
            builder.connect(partition.outlet(1)?, merge.inlet(1)?)?;

            Ok(FlowShape::new(partition.inlet(), merge.outlet()))
        })
        .unwrap();

        let wire_graph = GraphDsl::try_create(|builder| {
            let partition = builder.add(Partition::<i32>::new(2, |item| (*item % 2) as usize));
            let merge = builder.add(Merge::<i32>::new(2));

            builder
                .wire(partition.to(&merge))
                .wire(partition.to(&merge));

            Ok(FlowShape::new(partition.inlet(), merge.outlet()))
        })
        .unwrap();

        let input = 0..32;
        assert_eq!(
            wire_graph.run_with_input(input.clone()).unwrap(),
            connect_graph.run_with_input(input).unwrap()
        );
    }
555
    // Feedback cycle through MergePreferred: the wire version has to use an
    // explicit `.out(0)` cursor and a bare `merge.preferred()` inlet because
    // MergePreferred is explicit-only, and must match the `connect` version
    // for the single input 5.
    #[test]
    fn wire_dsl_matches_connect_for_merge_preferred_feedback_cycle() {
        let connect_graph = GraphDsl::try_create(|builder| {
            let merge = builder.add(MergePreferred::<u64>::new(1));
            let broadcast = builder.add(Broadcast::<u64>::new(2));
            let buffer = builder.add(Buffer::<u64>::new(8, OverflowStrategy::Backpressure));
            let positive = builder.add(TakeWhile::<u64>::new(|item| *item > 0));
            let decrement = builder.add(MapStage::new(|item: u64| item - 1));

            builder.connect(merge.outlet(), broadcast.inlet())?;
            builder.connect(broadcast.outlet(1)?, buffer.inlet())?;
            builder.connect(buffer.outlet(), positive.inlet())?;
            builder.connect(positive.outlet(), decrement.inlet())?;
            builder.connect(decrement.outlet(), merge.preferred())?;

            Ok(FlowShape::new(merge.secondary(0)?, broadcast.outlet(0)?))
        })
        .unwrap();

        let wire_graph = GraphDsl::try_create(|builder| {
            let merge = builder.add(MergePreferred::<u64>::new(1));
            let broadcast = builder.add(Broadcast::<u64>::new(2));
            let buffer = builder.add(Buffer::<u64>::new(8, OverflowStrategy::Backpressure));
            let positive = builder.add(TakeWhile::<u64>::new(|item| *item > 0));
            let decrement = builder.add(MapStage::new(|item: u64| item - 1));

            builder
                .wire(merge.out(0).to(&broadcast))
                // Outlet 1 feeds the loop; outlet 0 stays open as the graph output.
                .wire(broadcast.out(1).to(&buffer))
                .wire(buffer.to(&positive))
                .wire(positive.to(&decrement))
                .wire(decrement.to(&merge.preferred()));

            Ok(FlowShape::new(merge.secondary(0)?, broadcast.outlet(0)?))
        })
        .unwrap();

        assert_eq!(
            wire_graph.run_with_input([5]).unwrap(),
            connect_graph.run_with_input([5]).unwrap()
        );
    }
598
    // Auto-selection must advance to the next free port on each call (two
    // edges recorded), and explicit cursors given in reverse order must build
    // an equivalent graph.
    #[test]
    fn wire_dsl_advances_auto_ports_and_supports_explicit_cursors() {
        let auto = GraphDsl::try_create(|builder| {
            let broadcast = builder.add(Broadcast::<i32>::new(2));
            let zip = builder.add(Zip::<i32, i32>::new());

            builder.wire(broadcast.to(&zip)).wire(broadcast.to(&zip));

            Ok(FlowShape::new(broadcast.inlet(), zip.outlet()))
        })
        .unwrap();
        assert_eq!(auto.edge_count(), 2);

        let explicit = GraphDsl::try_create(|builder| {
            let broadcast = builder.add(Broadcast::<i32>::new(2));
            let zip = builder.add(Zip::<i32, i32>::new());

            // Same pairing as the auto case (0->0, 1->1), declared out of order.
            builder
                .wire(broadcast.out(1).to(&zip.in_(1)))
                .wire(broadcast.out(0).to(&zip.in_(0)));

            Ok(FlowShape::new(broadcast.inlet(), zip.outlet()))
        })
        .unwrap();
        assert_eq!(explicit.edge_count(), 2);

        let input = [1, 2, 3];
        assert_eq!(
            explicit.run_with_input(input).unwrap(),
            auto.run_with_input(input).unwrap()
        );
    }
631
    // Wiring a u64 outlet into a String inlet must surface an error that
    // carries both the `out[0] -> in[0]` label prefix added by the wire DSL
    // and the underlying "cannot connect outlet ... to inlet ..." message.
    #[test]
    fn wire_dsl_records_type_mismatch_with_selected_port_context() {
        let error = GraphDsl::create(|builder| {
            let first = builder.add(Identity::<u64>::new());
            let second = builder.add(Identity::<String>::new());

            builder.wire(first.to(&second));

            FlowShape::new(first.inlet(), second.outlet())
        })
        .unwrap_err();
        let message = error.to_string();

        assert!(
            message.contains("Identity.out[0] -> Identity.in[0]"),
            "{message}"
        );
        assert!(
            message.contains("cannot connect outlet Identity.out"),
            "{message}"
        );
        assert!(message.contains("to inlet Identity.in"), "{message}");
    }
655
    // Two failure modes: (1) reusing the same explicit outlet twice reports
    // "already connected"; (2) a third auto-wire on a two-outlet Broadcast
    // reports that no unconnected outlet is left.
    #[test]
    fn wire_dsl_rejects_duplicate_ports_and_no_open_auto_port() {
        let duplicate = GraphDsl::create(|builder| {
            let first = builder.add(Identity::<i32>::new());
            let second = builder.add(Identity::<i32>::new());
            let third = builder.add(Identity::<i32>::new());

            builder
                .wire(first.out(0).to(&second))
                .wire(first.out(0).to(&third));

            FlowShape::new(first.inlet(), third.outlet())
        })
        .unwrap_err()
        .to_string();
        assert!(duplicate.contains("Identity.out[0]"), "{duplicate}");
        assert!(duplicate.contains("already connected"), "{duplicate}");

        let no_open = GraphDsl::create(|builder| {
            let broadcast = builder.add(Broadcast::<i32>::new(2));
            let merge = builder.add(Merge::<i32>::new(2));

            builder
                .wire(broadcast.to(&merge))
                .wire(broadcast.to(&merge))
                .wire(broadcast.to(&merge));

            FlowShape::new(broadcast.inlet(), merge.outlet())
        })
        .unwrap_err()
        .to_string();
        // The label index is the outlet count (2), i.e. one past the last outlet.
        assert!(
            no_open.contains("Broadcast.out[2]: no unconnected outlet"),
            "{no_open}"
        );
    }
692
    // `wire` records the mismatch and lets graph creation report it, while
    // `try_wire` returns the error at the call site; the latter must not
    // contain the "result shape" text that the test associates with finish
    // aggregation.
    #[test]
    fn wire_dsl_defers_wire_errors_but_try_wire_fails_fast() {
        let deferred = GraphDsl::try_create(|builder| {
            let first = builder.add(Identity::<i32>::new());
            let second = builder.add(Identity::<u64>::new());

            builder.wire(first.to(&second));

            Ok(FlowShape::new(first.inlet(), second.outlet()))
        })
        .unwrap_err()
        .to_string();
        assert!(
            deferred.contains("Identity.out[0] -> Identity.in[0]"),
            "{deferred}"
        );

        let immediate = GraphDsl::try_create(|builder| {
            let first = builder.add(Identity::<i32>::new());
            let second = builder.add(Identity::<u64>::new());

            // `?` propagates the wiring error out of the closure immediately.
            builder.try_wire(first.to(&second))?;

            Ok(FlowShape::new(first.inlet(), second.outlet()))
        })
        .unwrap_err()
        .to_string();
        assert!(
            immediate.contains("Identity.out[0] -> Identity.in[0]"),
            "{immediate}"
        );
        assert!(
            !immediate.contains("result shape"),
            "try_wire should return before finish aggregation: {immediate}"
        );
    }
729}