1use super::*;
13
14pub trait WireSpec {
17 fn apply(self, builder: &mut GraphBuilder) -> StreamResult<()>;
18}
19
20pub trait WireDsl: Shape + Clone {
56 fn to<D>(&self, inlet: D) -> WirePair<Self, D>
57 where
58 Self: AutoOutletEndpoint,
59 {
60 WirePair::new(self.clone(), inlet)
61 }
62
63 #[must_use]
64 fn out(&self, index: usize) -> OutletCursor<Self> {
65 OutletCursor::new(self.clone(), index)
66 }
67
68 #[must_use]
69 fn in_(&self, index: usize) -> InletCursor<Self> {
70 InletCursor::new(self.clone(), index)
71 }
72}
73
74impl<S> WireDsl for S where S: Shape + Clone {}
75
76#[diagnostic::on_unimplemented(
77 message = "`{Self}` cannot be auto-wired as a graph DSL outlet",
78 note = "MergePreferred and Bidi shapes are explicit-only; use `.preferred()` / `.secondary(i)` / explicit `.out(i)` cursors"
79)]
80#[doc(hidden)]
81pub trait AutoOutletEndpoint: Shape {
82 fn auto_outlet(&self, builder: &GraphBuilder) -> StreamResult<SelectedOutlet> {
83 select_auto_outlet(&self.outlets(), builder)
84 }
85}
86
87#[diagnostic::on_unimplemented(
88 message = "`{Self}` cannot be auto-wired as a graph DSL inlet",
89 note = "MergePreferred and Bidi shapes are explicit-only; use `.preferred()` / `.secondary(i)` / explicit `.in_(i)` cursors"
90)]
91#[doc(hidden)]
92pub trait AutoInletEndpoint: Shape {
93 fn auto_inlet(&self, builder: &GraphBuilder) -> StreamResult<SelectedInlet> {
94 select_auto_inlet(&self.inlets(), builder)
95 }
96}
97
98impl<Out: 'static> AutoOutletEndpoint for SourceShape<Out> {}
99impl<In: 'static, Out: 'static> AutoOutletEndpoint for FlowShape<In, Out> {}
100impl<In: 'static, Out: 'static> AutoOutletEndpoint for FanInShape<In, Out> {}
101impl<In: 'static, Out: 'static> AutoOutletEndpoint for FanOutShape<In, Out> {}
102impl<In: 'static, Out0: 'static, Out1: 'static> AutoOutletEndpoint
103 for FanOutShape2<In, Out0, Out1>
104{
105}
106impl<Left: 'static, Right: 'static> AutoOutletEndpoint for ZipShape<Left, Right> {}
107
108impl<In: 'static> AutoInletEndpoint for SinkShape<In> {}
109impl<In: 'static, Out: 'static> AutoInletEndpoint for FlowShape<In, Out> {}
110impl<In: 'static, Out: 'static> AutoInletEndpoint for FanInShape<In, Out> {}
111impl<In: 'static, Out: 'static> AutoInletEndpoint for FanOutShape<In, Out> {}
112impl<In: 'static, Out0: 'static, Out1: 'static> AutoInletEndpoint for FanOutShape2<In, Out0, Out1> {}
113impl<Left: 'static, Right: 'static> AutoInletEndpoint for ZipShape<Left, Right> {}
114
115#[derive(Clone, Debug)]
116pub struct OutletCursor<S> {
117 shape: S,
118 index: usize,
119}
120
121impl<S> OutletCursor<S> {
122 #[must_use]
123 pub fn new(shape: S, index: usize) -> Self {
124 Self { shape, index }
125 }
126
127 #[must_use]
128 pub const fn index(&self) -> usize {
129 self.index
130 }
131
132 #[must_use]
133 pub fn to<D>(self, inlet: D) -> WirePair<Self, D> {
134 WirePair::new(self, inlet)
135 }
136}
137
138#[derive(Clone, Debug)]
139pub struct InletCursor<S> {
140 shape: S,
141 index: usize,
142}
143
144impl<S> InletCursor<S> {
145 #[must_use]
146 pub fn new(shape: S, index: usize) -> Self {
147 Self { shape, index }
148 }
149
150 #[must_use]
151 pub const fn index(&self) -> usize {
152 self.index
153 }
154}
155
156#[derive(Clone, Debug)]
157pub struct WirePair<Out, In> {
158 outlet: Out,
159 inlet: In,
160}
161
162impl<Out, In> WirePair<Out, In> {
163 #[must_use]
164 pub fn new(outlet: Out, inlet: In) -> Self {
165 Self { outlet, inlet }
166 }
167}
168
169impl<Out, In> WireSpec for WirePair<Out, In>
170where
171 Out: WireOutletEndpoint,
172 In: WireInletEndpoint,
173{
174 fn apply(self, builder: &mut GraphBuilder) -> StreamResult<()> {
175 let outlet = self.outlet.select_outlet(builder)?;
176 let inlet = self.inlet.select_inlet(builder)?;
177 builder
178 .connect_any_unrecorded(outlet.port.clone(), inlet.port.clone())
179 .map_err(|error| {
180 StreamError::GraphValidation(format!(
181 "{} -> {}: {}",
182 outlet.label, inlet.label, error
183 ))
184 })
185 }
186}
187
188#[doc(hidden)]
189#[derive(Clone, Debug)]
190pub struct SelectedOutlet {
191 port: AnyOutlet,
192 label: String,
193}
194
195#[doc(hidden)]
196#[derive(Clone, Debug)]
197pub struct SelectedInlet {
198 port: AnyInlet,
199 label: String,
200}
201
202trait WireOutletEndpoint {
203 fn select_outlet(self, builder: &GraphBuilder) -> StreamResult<SelectedOutlet>;
204}
205
206trait WireInletEndpoint {
207 fn select_inlet(self, builder: &GraphBuilder) -> StreamResult<SelectedInlet>;
208}
209
210macro_rules! impl_auto_outlet_endpoint {
211 ($($params:ident),*; $shape:ty) => {
212 impl<$($params: 'static),*> WireOutletEndpoint for $shape {
213 fn select_outlet(self, builder: &GraphBuilder) -> StreamResult<SelectedOutlet> {
214 self.auto_outlet(builder)
215 }
216 }
217
218 impl<$($params: 'static),*> WireOutletEndpoint for &$shape {
219 fn select_outlet(self, builder: &GraphBuilder) -> StreamResult<SelectedOutlet> {
220 (*self).auto_outlet(builder)
221 }
222 }
223 };
224}
225
226macro_rules! impl_auto_inlet_endpoint {
227 ($($params:ident),*; $shape:ty) => {
228 impl<$($params: 'static),*> WireInletEndpoint for $shape {
229 fn select_inlet(self, builder: &GraphBuilder) -> StreamResult<SelectedInlet> {
230 self.auto_inlet(builder)
231 }
232 }
233
234 impl<$($params: 'static),*> WireInletEndpoint for &$shape {
235 fn select_inlet(self, builder: &GraphBuilder) -> StreamResult<SelectedInlet> {
236 (*self).auto_inlet(builder)
237 }
238 }
239 };
240}
241
242impl_auto_outlet_endpoint!(Out; SourceShape<Out>);
243impl_auto_outlet_endpoint!(In, Out; FlowShape<In, Out>);
244impl_auto_outlet_endpoint!(In, Out; FanInShape<In, Out>);
245impl_auto_outlet_endpoint!(In, Out; FanOutShape<In, Out>);
246impl_auto_outlet_endpoint!(In, Out0, Out1; FanOutShape2<In, Out0, Out1>);
247impl_auto_outlet_endpoint!(Left, Right; ZipShape<Left, Right>);
248
249impl_auto_inlet_endpoint!(In; SinkShape<In>);
250impl_auto_inlet_endpoint!(In, Out; FlowShape<In, Out>);
251impl_auto_inlet_endpoint!(In, Out; FanInShape<In, Out>);
252impl_auto_inlet_endpoint!(In, Out; FanOutShape<In, Out>);
253impl_auto_inlet_endpoint!(In, Out0, Out1; FanOutShape2<In, Out0, Out1>);
254impl_auto_inlet_endpoint!(Left, Right; ZipShape<Left, Right>);
255
256impl<S> WireOutletEndpoint for OutletCursor<S>
257where
258 S: Shape,
259{
260 fn select_outlet(self, _builder: &GraphBuilder) -> StreamResult<SelectedOutlet> {
261 select_indexed_outlet(&self.shape.outlets(), self.index)
262 }
263}
264
265impl<S> WireOutletEndpoint for &OutletCursor<S>
266where
267 S: Shape,
268{
269 fn select_outlet(self, _builder: &GraphBuilder) -> StreamResult<SelectedOutlet> {
270 select_indexed_outlet(&self.shape.outlets(), self.index)
271 }
272}
273
274impl<T: 'static> WireOutletEndpoint for Outlet<T> {
275 fn select_outlet(self, _builder: &GraphBuilder) -> StreamResult<SelectedOutlet> {
276 let port = self.erase();
277 let label = indexed_port_label(port.name(), 0, PortKind::Outlet);
278 Ok(SelectedOutlet { port, label })
279 }
280}
281
282impl<T: 'static> WireOutletEndpoint for &Outlet<T> {
283 fn select_outlet(self, _builder: &GraphBuilder) -> StreamResult<SelectedOutlet> {
284 let port = self.erase();
285 let label = indexed_port_label(port.name(), 0, PortKind::Outlet);
286 Ok(SelectedOutlet { port, label })
287 }
288}
289
290impl<S> WireInletEndpoint for InletCursor<S>
291where
292 S: Shape,
293{
294 fn select_inlet(self, _builder: &GraphBuilder) -> StreamResult<SelectedInlet> {
295 select_indexed_inlet(&self.shape.inlets(), self.index)
296 }
297}
298
299impl<S> WireInletEndpoint for &InletCursor<S>
300where
301 S: Shape,
302{
303 fn select_inlet(self, _builder: &GraphBuilder) -> StreamResult<SelectedInlet> {
304 select_indexed_inlet(&self.shape.inlets(), self.index)
305 }
306}
307
308impl<T: 'static> WireInletEndpoint for Inlet<T> {
309 fn select_inlet(self, _builder: &GraphBuilder) -> StreamResult<SelectedInlet> {
310 let port = self.erase();
311 let label = indexed_port_label(port.name(), 0, PortKind::Inlet);
312 Ok(SelectedInlet { port, label })
313 }
314}
315
316impl<T: 'static> WireInletEndpoint for &Inlet<T> {
317 fn select_inlet(self, _builder: &GraphBuilder) -> StreamResult<SelectedInlet> {
318 let port = self.erase();
319 let label = indexed_port_label(port.name(), 0, PortKind::Inlet);
320 Ok(SelectedInlet { port, label })
321 }
322}
323
324fn select_auto_outlet(
325 outlets: &[AnyOutlet],
326 builder: &GraphBuilder,
327) -> StreamResult<SelectedOutlet> {
328 outlets
329 .iter()
330 .enumerate()
331 .find(|(_, outlet)| !builder.is_outlet_connected(outlet))
332 .map(|(index, outlet)| SelectedOutlet {
333 port: outlet.clone(),
334 label: indexed_port_label(outlet.name(), index, PortKind::Outlet),
335 })
336 .ok_or_else(|| {
337 StreamError::GraphValidation(format!(
338 "{}: no unconnected outlet",
339 missing_port_label(
340 outlets.iter().map(AnyOutlet::name),
341 outlets.len(),
342 PortKind::Outlet
343 )
344 ))
345 })
346}
347
348fn select_auto_inlet(inlets: &[AnyInlet], builder: &GraphBuilder) -> StreamResult<SelectedInlet> {
349 inlets
350 .iter()
351 .enumerate()
352 .find(|(_, inlet)| !builder.is_inlet_connected(inlet))
353 .map(|(index, inlet)| SelectedInlet {
354 port: inlet.clone(),
355 label: indexed_port_label(inlet.name(), index, PortKind::Inlet),
356 })
357 .ok_or_else(|| {
358 StreamError::GraphValidation(format!(
359 "{}: no unconnected inlet",
360 missing_port_label(
361 inlets.iter().map(AnyInlet::name),
362 inlets.len(),
363 PortKind::Inlet
364 )
365 ))
366 })
367}
368
369fn select_indexed_outlet(outlets: &[AnyOutlet], index: usize) -> StreamResult<SelectedOutlet> {
370 outlets
371 .get(index)
372 .map(|outlet| SelectedOutlet {
373 port: outlet.clone(),
374 label: indexed_port_label(outlet.name(), index, PortKind::Outlet),
375 })
376 .ok_or_else(|| {
377 StreamError::GraphValidation(format!(
378 "{}: no outlet at cursor index {index}",
379 missing_port_label(outlets.iter().map(AnyOutlet::name), index, PortKind::Outlet)
380 ))
381 })
382}
383
384fn select_indexed_inlet(inlets: &[AnyInlet], index: usize) -> StreamResult<SelectedInlet> {
385 inlets
386 .get(index)
387 .map(|inlet| SelectedInlet {
388 port: inlet.clone(),
389 label: indexed_port_label(inlet.name(), index, PortKind::Inlet),
390 })
391 .ok_or_else(|| {
392 StreamError::GraphValidation(format!(
393 "{}: no inlet at cursor index {index}",
394 missing_port_label(inlets.iter().map(AnyInlet::name), index, PortKind::Inlet)
395 ))
396 })
397}
398
399fn missing_port_label<'a>(
400 mut names: impl Iterator<Item = &'a str>,
401 index: usize,
402 kind: PortKind,
403) -> String {
404 names
405 .next()
406 .map(|name| indexed_port_label(name, index, kind))
407 .unwrap_or_else(|| match kind {
408 PortKind::Inlet => format!("inlet[{index}]"),
409 PortKind::Outlet => format!("outlet[{index}]"),
410 })
411}
412
413fn indexed_port_label(name: &str, index: usize, _kind: PortKind) -> String {
414 let suffix_start = name
415 .char_indices()
416 .rev()
417 .find_map(|(offset, ch)| (!ch.is_ascii_digit()).then_some(offset + ch.len_utf8()))
418 .unwrap_or(0);
419 if suffix_start < name.len() {
420 format!("{}[{index}]", &name[..suffix_start])
421 } else {
422 format!("{name}[{index}]")
423 }
424}
425
426#[cfg(test)]
427mod tests {
428 use super::*;
429
430 #[test]
431 fn wire_dsl_matches_connect_for_linear_identity_map_chain() {
432 let connect_graph = GraphDsl::try_create(|builder| {
433 let identity = builder.add(Identity::<u64>::new());
434 let plus_one = builder.add(MapStage::new(|item: u64| item + 1));
435 let times_two = builder.add(MapStage::new(|item: u64| item * 2));
436
437 builder.connect(identity.outlet(), plus_one.inlet())?;
438 builder.connect(plus_one.outlet(), times_two.inlet())?;
439
440 Ok(FlowShape::new(identity.inlet(), times_two.outlet()))
441 })
442 .unwrap();
443
444 let wire_graph = GraphDsl::try_create(|builder| {
445 let identity = builder.add(Identity::<u64>::new());
446 let plus_one = builder.add(MapStage::new(|item: u64| item + 1));
447 let times_two = builder.add(MapStage::new(|item: u64| item * 2));
448
449 builder
450 .wire(identity.to(&plus_one))
451 .wire(plus_one.to(×_two));
452
453 Ok(FlowShape::new(identity.inlet(), times_two.outlet()))
454 })
455 .unwrap();
456
457 let input = 0_u64..32;
458 assert_eq!(
459 wire_graph.run_with_input(input.clone()).unwrap(),
460 connect_graph.run_with_input(input).unwrap()
461 );
462 }
463
464 #[test]
465 fn wire_dsl_matches_connect_for_broadcast_zip_diamond() {
466 let connect_graph = GraphDsl::try_create(|builder| {
467 let broadcast = builder.add(Broadcast::<i32>::new(2));
468 let zip = builder.add(Zip::<i32, i32>::new());
469
470 builder.connect(broadcast.outlet(0)?, zip.in0())?;
471 builder.connect(broadcast.outlet(1)?, zip.in1())?;
472
473 Ok(FlowShape::new(broadcast.inlet(), zip.outlet()))
474 })
475 .unwrap();
476
477 let wire_graph = GraphDsl::try_create(|builder| {
478 let broadcast = builder.add(Broadcast::<i32>::new(2));
479 let zip = builder.add(Zip::<i32, i32>::new());
480
481 builder.wire(broadcast.to(&zip)).wire(broadcast.to(&zip));
482
483 Ok(FlowShape::new(broadcast.inlet(), zip.outlet()))
484 })
485 .unwrap();
486
487 let input = 0..16;
488 assert_eq!(
489 wire_graph.run_with_input(input.clone()).unwrap(),
490 connect_graph.run_with_input(input).unwrap()
491 );
492 }
493
494 #[test]
495 fn wire_dsl_matches_connect_for_balance_merge() {
496 let connect_graph = GraphDsl::try_create(|builder| {
497 let balance = builder.add(Balance::<i32>::new(2));
498 let merge = builder.add(Merge::<i32>::new(2));
499
500 builder.connect(balance.outlet(0)?, merge.inlet(0)?)?;
501 builder.connect(balance.outlet(1)?, merge.inlet(1)?)?;
502
503 Ok(FlowShape::new(balance.inlet(), merge.outlet()))
504 })
505 .unwrap();
506
507 let wire_graph = GraphDsl::try_create(|builder| {
508 let balance = builder.add(Balance::<i32>::new(2));
509 let merge = builder.add(Merge::<i32>::new(2));
510
511 builder.wire(balance.to(&merge)).wire(balance.to(&merge));
512
513 Ok(FlowShape::new(balance.inlet(), merge.outlet()))
514 })
515 .unwrap();
516
517 let input = 0..32;
518 assert_eq!(
519 wire_graph.run_with_input(input.clone()).unwrap(),
520 connect_graph.run_with_input(input).unwrap()
521 );
522 }
523
524 #[test]
525 fn wire_dsl_matches_connect_for_partition_merge() {
526 let connect_graph = GraphDsl::try_create(|builder| {
527 let partition = builder.add(Partition::<i32>::new(2, |item| (*item % 2) as usize));
528 let merge = builder.add(Merge::<i32>::new(2));
529
530 builder.connect(partition.outlet(0)?, merge.inlet(0)?)?;
531 builder.connect(partition.outlet(1)?, merge.inlet(1)?)?;
532
533 Ok(FlowShape::new(partition.inlet(), merge.outlet()))
534 })
535 .unwrap();
536
537 let wire_graph = GraphDsl::try_create(|builder| {
538 let partition = builder.add(Partition::<i32>::new(2, |item| (*item % 2) as usize));
539 let merge = builder.add(Merge::<i32>::new(2));
540
541 builder
542 .wire(partition.to(&merge))
543 .wire(partition.to(&merge));
544
545 Ok(FlowShape::new(partition.inlet(), merge.outlet()))
546 })
547 .unwrap();
548
549 let input = 0..32;
550 assert_eq!(
551 wire_graph.run_with_input(input.clone()).unwrap(),
552 connect_graph.run_with_input(input).unwrap()
553 );
554 }
555
556 #[test]
557 fn wire_dsl_matches_connect_for_merge_preferred_feedback_cycle() {
558 let connect_graph = GraphDsl::try_create(|builder| {
559 let merge = builder.add(MergePreferred::<u64>::new(1));
560 let broadcast = builder.add(Broadcast::<u64>::new(2));
561 let buffer = builder.add(Buffer::<u64>::new(8, OverflowStrategy::Backpressure));
562 let positive = builder.add(TakeWhile::<u64>::new(|item| *item > 0));
563 let decrement = builder.add(MapStage::new(|item: u64| item - 1));
564
565 builder.connect(merge.outlet(), broadcast.inlet())?;
566 builder.connect(broadcast.outlet(1)?, buffer.inlet())?;
567 builder.connect(buffer.outlet(), positive.inlet())?;
568 builder.connect(positive.outlet(), decrement.inlet())?;
569 builder.connect(decrement.outlet(), merge.preferred())?;
570
571 Ok(FlowShape::new(merge.secondary(0)?, broadcast.outlet(0)?))
572 })
573 .unwrap();
574
575 let wire_graph = GraphDsl::try_create(|builder| {
576 let merge = builder.add(MergePreferred::<u64>::new(1));
577 let broadcast = builder.add(Broadcast::<u64>::new(2));
578 let buffer = builder.add(Buffer::<u64>::new(8, OverflowStrategy::Backpressure));
579 let positive = builder.add(TakeWhile::<u64>::new(|item| *item > 0));
580 let decrement = builder.add(MapStage::new(|item: u64| item - 1));
581
582 builder
583 .wire(merge.out(0).to(&broadcast))
584 .wire(broadcast.out(1).to(&buffer))
585 .wire(buffer.to(&positive))
586 .wire(positive.to(&decrement))
587 .wire(decrement.to(&merge.preferred()));
588
589 Ok(FlowShape::new(merge.secondary(0)?, broadcast.outlet(0)?))
590 })
591 .unwrap();
592
593 assert_eq!(
594 wire_graph.run_with_input([5]).unwrap(),
595 connect_graph.run_with_input([5]).unwrap()
596 );
597 }
598
599 #[test]
600 fn wire_dsl_advances_auto_ports_and_supports_explicit_cursors() {
601 let auto = GraphDsl::try_create(|builder| {
602 let broadcast = builder.add(Broadcast::<i32>::new(2));
603 let zip = builder.add(Zip::<i32, i32>::new());
604
605 builder.wire(broadcast.to(&zip)).wire(broadcast.to(&zip));
606
607 Ok(FlowShape::new(broadcast.inlet(), zip.outlet()))
608 })
609 .unwrap();
610 assert_eq!(auto.edge_count(), 2);
611
612 let explicit = GraphDsl::try_create(|builder| {
613 let broadcast = builder.add(Broadcast::<i32>::new(2));
614 let zip = builder.add(Zip::<i32, i32>::new());
615
616 builder
617 .wire(broadcast.out(1).to(&zip.in_(1)))
618 .wire(broadcast.out(0).to(&zip.in_(0)));
619
620 Ok(FlowShape::new(broadcast.inlet(), zip.outlet()))
621 })
622 .unwrap();
623 assert_eq!(explicit.edge_count(), 2);
624
625 let input = [1, 2, 3];
626 assert_eq!(
627 explicit.run_with_input(input).unwrap(),
628 auto.run_with_input(input).unwrap()
629 );
630 }
631
632 #[test]
633 fn wire_dsl_records_type_mismatch_with_selected_port_context() {
634 let error = GraphDsl::create(|builder| {
635 let first = builder.add(Identity::<u64>::new());
636 let second = builder.add(Identity::<String>::new());
637
638 builder.wire(first.to(&second));
639
640 FlowShape::new(first.inlet(), second.outlet())
641 })
642 .unwrap_err();
643 let message = error.to_string();
644
645 assert!(
646 message.contains("Identity.out[0] -> Identity.in[0]"),
647 "{message}"
648 );
649 assert!(
650 message.contains("cannot connect outlet Identity.out"),
651 "{message}"
652 );
653 assert!(message.contains("to inlet Identity.in"), "{message}");
654 }
655
656 #[test]
657 fn wire_dsl_rejects_duplicate_ports_and_no_open_auto_port() {
658 let duplicate = GraphDsl::create(|builder| {
659 let first = builder.add(Identity::<i32>::new());
660 let second = builder.add(Identity::<i32>::new());
661 let third = builder.add(Identity::<i32>::new());
662
663 builder
664 .wire(first.out(0).to(&second))
665 .wire(first.out(0).to(&third));
666
667 FlowShape::new(first.inlet(), third.outlet())
668 })
669 .unwrap_err()
670 .to_string();
671 assert!(duplicate.contains("Identity.out[0]"), "{duplicate}");
672 assert!(duplicate.contains("already connected"), "{duplicate}");
673
674 let no_open = GraphDsl::create(|builder| {
675 let broadcast = builder.add(Broadcast::<i32>::new(2));
676 let merge = builder.add(Merge::<i32>::new(2));
677
678 builder
679 .wire(broadcast.to(&merge))
680 .wire(broadcast.to(&merge))
681 .wire(broadcast.to(&merge));
682
683 FlowShape::new(broadcast.inlet(), merge.outlet())
684 })
685 .unwrap_err()
686 .to_string();
687 assert!(
688 no_open.contains("Broadcast.out[2]: no unconnected outlet"),
689 "{no_open}"
690 );
691 }
692
693 #[test]
694 fn wire_dsl_defers_wire_errors_but_try_wire_fails_fast() {
695 let deferred = GraphDsl::try_create(|builder| {
696 let first = builder.add(Identity::<i32>::new());
697 let second = builder.add(Identity::<u64>::new());
698
699 builder.wire(first.to(&second));
700
701 Ok(FlowShape::new(first.inlet(), second.outlet()))
702 })
703 .unwrap_err()
704 .to_string();
705 assert!(
706 deferred.contains("Identity.out[0] -> Identity.in[0]"),
707 "{deferred}"
708 );
709
710 let immediate = GraphDsl::try_create(|builder| {
711 let first = builder.add(Identity::<i32>::new());
712 let second = builder.add(Identity::<u64>::new());
713
714 builder.try_wire(first.to(&second))?;
715
716 Ok(FlowShape::new(first.inlet(), second.outlet()))
717 })
718 .unwrap_err()
719 .to_string();
720 assert!(
721 immediate.contains("Identity.out[0] -> Identity.in[0]"),
722 "{immediate}"
723 );
724 assert!(
725 !immediate.contains("result shape"),
726 "try_wire should return before finish aggregation: {immediate}"
727 );
728 }
729}