1use std::{
2    fmt::Debug,
3    ops::{ControlFlow, Deref, DerefMut},
4};
5
6use daggy::{
7    petgraph::{graph::NodeReferences, visit::Topo},
8    Dag, NodeWeightsMut, Walker,
9};
10use fixedbitset::FixedBitSet;
11
12use crate::{Edge, FnId, FnIdInner, Rank};
13
14#[cfg(feature = "async")]
15use daggy::NodeIndex;
16#[cfg(feature = "async")]
17use futures::future::LocalBoxFuture;
18#[cfg(feature = "async")]
19use futures::{
20    future::Future,
21    stream::{self, Stream, StreamExt, TryStreamExt},
22    task::Poll,
23};
24#[cfg(feature = "async")]
25use tokio::sync::{
26    mpsc::{self, error::SendError, Receiver, Sender},
27    RwLock,
28};
29
30#[cfg(feature = "async")]
31use crate::{
32    EdgeCounts, FnRef, FnWrapper, FnWrapperMut, StreamOpts, StreamOrder, StreamOutcome,
33    StreamOutcomeState,
34};
35#[cfg(all(feature = "async", feature = "interruptible"))]
36use interruptible::{InterruptibilityState, InterruptibleStreamExt, PollOutcome};
37
38#[derive(Clone, Debug)]
48pub struct FnGraph<F> {
49    pub graph: Dag<F, Edge, FnIdInner>,
51    pub(crate) graph_structure: Dag<(), Edge, FnIdInner>,
53    pub(crate) graph_structure_rev: Dag<(), Edge, FnIdInner>,
55    pub(crate) ranks: Vec<Rank>,
60    #[cfg(feature = "async")]
62    pub(crate) edge_counts: EdgeCounts,
63}
64
65impl<F> FnGraph<F> {
66    pub fn new() -> Self {
68        Self::default()
69    }
70
71    pub fn ranks(&self) -> &[Rank] {
73        &self.ranks
74    }
75
76    pub fn toposort(&self) -> Topo<FnId, FixedBitSet> {
81        Topo::new(&self.graph_structure)
82    }
83
84    pub fn iter(&self) -> impl Iterator<Item = &F> {
92        Topo::new(&self.graph_structure)
93            .iter(&self.graph_structure)
94            .map(|fn_id| &self.graph[fn_id])
95    }
96
97    pub fn iter_rev(&self) -> impl Iterator<Item = &F> {
108        Topo::new(&self.graph_structure_rev)
109            .iter(&self.graph_structure_rev)
110            .map(|fn_id| &self.graph[fn_id])
111    }
112
113    #[cfg(feature = "async")]
118    pub fn stream(&self) -> impl Stream<Item = FnRef<'_, F>> + '_ {
119        self.stream_internal(StreamOrder::Forward)
120    }
121
122    #[cfg(all(feature = "async", feature = "interruptible"))]
127    pub fn stream_interruptible(&self) -> impl Stream<Item = PollOutcome<FnRef<'_, F>>> + '_ {
128        self.stream_with_interruptible(StreamOpts::default())
129    }
130
131    #[cfg(feature = "async")]
139    pub fn stream_with<'rx>(
140        &'rx self,
141        opts: StreamOpts<'rx, 'rx>,
142    ) -> impl Stream<Item = FnRef<'rx, F>> + 'rx {
143        let StreamOpts {
144            stream_order,
145            #[cfg(feature = "interruptible")]
146                interruptibility_state: _,
147            #[cfg(feature = "interruptible")]
148                interrupted_next_item_include: _,
149            marker: _,
150        } = opts;
151
152        self.stream_internal(stream_order)
153    }
154
155    #[cfg(all(feature = "async", feature = "interruptible"))]
167    pub fn stream_with_interruptible<'rx>(
168        &'rx self,
169        opts: StreamOpts<'rx, 'rx>,
170    ) -> impl Stream<Item = PollOutcome<FnRef<'rx, F>>> + 'rx {
171        let StreamOpts {
172            stream_order,
173            interruptibility_state,
174            interrupted_next_item_include: _,
175            marker: _,
176        } = opts;
177
178        self.stream_internal(stream_order)
179            .interruptible_with(interruptibility_state)
180    }
181
182    #[cfg(feature = "async")]
183    fn stream_internal(&self, stream_order: StreamOrder) -> impl Stream<Item = FnRef<'_, F>> + '_ {
184        let FnGraph {
185            graph,
186            graph_structure,
187            graph_structure_rev,
188            ranks: _,
189            edge_counts,
190        } = self;
191
192        let StreamSetupInit {
193            graph_structure,
194            mut predecessor_counts,
195            fn_ready_tx,
196            mut fn_ready_rx,
197            fn_done_tx,
198            mut fn_done_rx,
199        } = stream_setup_init(
200            graph_structure,
201            graph_structure_rev,
202            edge_counts,
203            stream_order,
204        );
205
206        let mut fns_remaining = graph_structure.node_count();
207        let mut fn_ready_tx = Some(fn_ready_tx);
208        let mut fn_done_tx = Some(fn_done_tx);
209
210        if fns_remaining == 0 {
211            fn_done_tx.take();
212            fn_ready_tx.take();
213        }
214
215        stream::poll_fn(move |context| {
216            match fn_done_rx.poll_recv(context) {
217                Poll::Pending => {}
218                Poll::Ready(None) => {}
219                Poll::Ready(Some(fn_id)) => graph_structure
220                    .children(fn_id)
221                    .iter(graph_structure)
222                    .for_each(|(_edge_id, child_fn_id)| {
223                        predecessor_counts[child_fn_id.index()] -= 1;
224                        if predecessor_counts[child_fn_id.index()] == 0 {
225                            if let Some(fn_ready_tx) = fn_ready_tx.as_ref() {
226                                let _ = fn_ready_tx.try_send(child_fn_id);
229                            }
230                        }
231                    }),
232            }
233
234            let poll = if let Some(fn_done_tx) = fn_done_tx.as_ref() {
235                fn_ready_rx.poll_recv(context).map(|fn_id| {
236                    fn_id.map(|fn_id| {
237                        let r#fn = &graph[fn_id];
238                        FnRef {
239                            fn_id,
240                            r#fn,
241                            fn_done_tx: fn_done_tx.clone(),
242                        }
243                    })
244                })
245            } else {
246                Poll::Ready(None)
247            };
248
249            if let Poll::Ready(Some(..)) = &poll {
251                fns_remaining -= 1;
252
253                if fns_remaining == 0 {
254                    fn_done_tx.take();
255                    fn_ready_tx.take();
256                }
257            }
258
259            poll
260        })
261    }
262
263    #[cfg(feature = "async")]
268    pub async fn fold_async<'f, Seed, FnFold>(
269        &'f self,
270        seed: Seed,
271        fn_fold: FnFold,
272    ) -> StreamOutcome<Seed>
273    where
274        for<'iter> FnFold: Fn(Seed, FnWrapper<'iter, 'f, F>) -> LocalBoxFuture<'iter, Seed>,
275        F: 'f,
276    {
277        self.fold_async_internal(seed, StreamOpts::default(), fn_fold)
278            .await
279    }
280
281    #[cfg(feature = "async")]
286    pub async fn fold_async_with<'f, Seed, FnFold>(
287        &'f self,
288        seed: Seed,
289        opts: StreamOpts<'_, '_>,
290        fn_fold: FnFold,
291    ) -> StreamOutcome<Seed>
292    where
293        for<'iter> FnFold: Fn(Seed, FnWrapper<'iter, 'f, F>) -> LocalBoxFuture<'iter, Seed>,
294        F: 'f,
295    {
296        self.fold_async_internal(seed, opts, fn_fold).await
297    }
298
299    #[cfg(feature = "async")]
300    async fn fold_async_internal<'f, Seed, FnFold>(
301        &'f self,
302        seed: Seed,
303        opts: StreamOpts<'_, '_>,
304        fn_fold: FnFold,
305    ) -> StreamOutcome<Seed>
306    where
307        for<'iter> FnFold: Fn(Seed, FnWrapper<'iter, 'f, F>) -> LocalBoxFuture<'iter, Seed>,
308        F: 'f,
309    {
310        let FnGraph {
311            graph,
312            graph_structure,
313            graph_structure_rev,
314            ranks: _,
315            edge_counts,
316        } = self;
317
318        let StreamOpts {
319            stream_order,
320            #[cfg(feature = "interruptible")]
321            interruptibility_state,
322            #[cfg(feature = "interruptible")]
323            interrupted_next_item_include,
324            marker: _,
325        } = opts;
326
327        let StreamSetupInit {
328            graph_structure,
329            predecessor_counts,
330            fn_ready_tx,
331            fn_ready_rx,
332            fn_done_tx,
333            fn_done_rx,
334        } = stream_setup_init(
335            graph_structure,
336            graph_structure_rev,
337            edge_counts,
338            stream_order,
339        );
340
341        let queuer = fn_ready_queuer(graph_structure, predecessor_counts, fn_done_rx, fn_ready_tx);
342
343        let fns_remaining = graph_structure.node_count();
344        let mut fn_done_tx = Some(fn_done_tx);
345        if fns_remaining == 0 {
346            fn_done_tx.take();
347        }
348        let fold_stream_state = FoldStreamState {
349            graph,
350            fns_remaining,
351            fn_done_tx,
352            seed,
353            fn_fold,
354        };
355        let scheduler = async move {
356            let mut fn_ids_processed = Vec::with_capacity(graph_structure.node_count());
357            let fold_stream_state = poll_and_track_fn_ready(
358                fn_ready_rx,
359                &mut fn_ids_processed,
360                #[cfg(feature = "interruptible")]
361                interruptibility_state,
362                #[cfg(feature = "interruptible")]
363                interrupted_next_item_include,
364            )
365            .fold(
366                fold_stream_state,
367                |fold_stream_state,
368                 #[cfg(not(feature = "interruptible"))] fn_id,
369                 #[cfg(feature = "interruptible")] fn_id_poll_outcome| async move {
370                    #[cfg(not(feature = "interruptible"))]
371                    let fn_id = Some(fn_id);
372                    #[cfg(feature = "interruptible")]
373                    let (fn_id, interrupted) = fn_id_from_interrupt(fn_id_poll_outcome);
374
375                    let FoldStreamState {
376                        graph,
377                        mut fns_remaining,
378                        mut fn_done_tx,
379                        mut seed,
380                        fn_fold,
381                    } = fold_stream_state;
382
383                    if let Some(fn_id) = fn_id {
384                        let r#fn = &graph[fn_id];
385                        seed = fn_fold(seed, FnWrapper::new(r#fn)).await;
386                        if let Some(fn_done_tx) = fn_done_tx.as_ref() {
387                            fn_done_send(fn_done_tx, fn_id).await;
388                        }
389
390                        fns_remaining -= 1;
392                    }
393
394                    if fns_remaining == 0 {
395                        fn_done_tx.take();
396                    }
397
398                    #[cfg(feature = "interruptible")]
399                    if interrupted {
400                        fn_done_tx.take();
401                    }
402
403                    FoldStreamState {
404                        graph,
405                        fns_remaining,
406                        fn_done_tx,
407                        seed,
408                        fn_fold,
409                    }
410                },
411            )
412            .await;
413
414            let FoldStreamState {
415                graph: _,
416                fns_remaining,
417                fn_done_tx: _,
418                seed,
419                fn_fold: _,
420            } = fold_stream_state;
421
422            let stream_outcome_state = stream_outcome_state_after_stream(fns_remaining);
423            StreamOutcome::new(
424                graph_structure,
425                seed,
426                stream_outcome_state,
427                fn_ids_processed,
428            )
429        };
430
431        let ((), stream_outcome) = futures::join!(queuer, scheduler);
432
433        stream_outcome
434    }
435
436    #[cfg(feature = "async")]
441    pub async fn fold_async_mut<'f, Seed, FnFold>(
442        &'f mut self,
443        seed: Seed,
444        fn_fold: FnFold,
445    ) -> StreamOutcome<Seed>
446    where
447        for<'iter> FnFold: FnMut(Seed, FnWrapperMut<'iter, 'f, F>) -> LocalBoxFuture<'iter, Seed>,
448        F: 'f,
449    {
450        self.fold_async_mut_internal(seed, StreamOpts::default(), fn_fold)
451            .await
452    }
453
454    #[cfg(feature = "async")]
459    pub async fn fold_async_mut_with<'f, Seed, FnFold>(
460        &'f mut self,
461        seed: Seed,
462        opts: StreamOpts<'_, '_>,
463        fn_fold: FnFold,
464    ) -> StreamOutcome<Seed>
465    where
466        for<'iter> FnFold: FnMut(Seed, FnWrapperMut<'iter, 'f, F>) -> LocalBoxFuture<'iter, Seed>,
467        F: 'f,
468    {
469        self.fold_async_mut_internal(seed, opts, fn_fold).await
470    }
471
472    #[cfg(feature = "async")]
473    async fn fold_async_mut_internal<'f, Seed, FnFold>(
474        &'f mut self,
475        seed: Seed,
476        opts: StreamOpts<'_, '_>,
477        fn_fold: FnFold,
478    ) -> StreamOutcome<Seed>
479    where
480        for<'iter> FnFold: FnMut(Seed, FnWrapperMut<'iter, 'f, F>) -> LocalBoxFuture<'iter, Seed>,
481        F: 'f,
482    {
483        let &mut FnGraph {
484            ref mut graph,
485            ref graph_structure,
486            ref graph_structure_rev,
487            ranks: _,
488            ref edge_counts,
489        } = self;
490
491        let StreamOpts {
492            stream_order,
493            #[cfg(feature = "interruptible")]
494            interruptibility_state,
495            #[cfg(feature = "interruptible")]
496            interrupted_next_item_include,
497            marker: _,
498        } = opts;
499
500        let StreamSetupInit {
501            graph_structure,
502            predecessor_counts,
503            fn_ready_tx,
504            fn_ready_rx,
505            fn_done_tx,
506            fn_done_rx,
507        } = stream_setup_init(
508            graph_structure,
509            graph_structure_rev,
510            edge_counts,
511            stream_order,
512        );
513
514        let queuer = fn_ready_queuer(graph_structure, predecessor_counts, fn_done_rx, fn_ready_tx);
515
516        let fns_remaining = graph_structure.node_count();
517        let mut fn_done_tx = Some(fn_done_tx);
518        if fns_remaining == 0 {
519            fn_done_tx.take();
520        }
521        let fold_stream_state = FoldStreamStateMut {
522            graph,
523            fns_remaining,
524            fn_done_tx,
525            seed,
526            fn_fold,
527        };
528        let scheduler = async move {
529            let mut fn_ids_processed = Vec::with_capacity(graph_structure.node_count());
530            let fold_stream_state = poll_and_track_fn_ready(
531                fn_ready_rx,
532                &mut fn_ids_processed,
533                #[cfg(feature = "interruptible")]
534                interruptibility_state,
535                #[cfg(feature = "interruptible")]
536                interrupted_next_item_include,
537            )
538            .fold(
539                fold_stream_state,
540                |fold_stream_state,
541                 #[cfg(not(feature = "interruptible"))] fn_id,
542                 #[cfg(feature = "interruptible")] fn_id_poll_outcome| async move {
543                    #[cfg(not(feature = "interruptible"))]
544                    let fn_id = Some(fn_id);
545                    #[cfg(feature = "interruptible")]
546                    let (fn_id, interrupted) = fn_id_from_interrupt(fn_id_poll_outcome);
547
548                    let FoldStreamStateMut {
549                        graph,
550                        mut fns_remaining,
551                        mut fn_done_tx,
552                        mut seed,
553                        mut fn_fold,
554                    } = fold_stream_state;
555
556                    if let Some(fn_id) = fn_id {
557                        let r#fn = &mut graph[fn_id];
558                        seed = fn_fold(seed, FnWrapperMut::new(r#fn)).await;
559                        if let Some(fn_done_tx) = fn_done_tx.as_ref() {
560                            fn_done_send(fn_done_tx, fn_id).await;
561                        }
562
563                        fns_remaining -= 1;
565                    }
566
567                    if fns_remaining == 0 {
568                        fn_done_tx.take();
569                    }
570
571                    #[cfg(feature = "interruptible")]
572                    if interrupted {
573                        fn_done_tx.take();
574                    }
575
576                    FoldStreamStateMut {
577                        graph,
578                        fns_remaining,
579                        fn_done_tx,
580                        seed,
581                        fn_fold,
582                    }
583                },
584            )
585            .await;
586
587            let FoldStreamStateMut {
588                graph: _,
589                fns_remaining,
590                fn_done_tx: _,
591                seed,
592                fn_fold: _,
593            } = fold_stream_state;
594
595            let stream_outcome_state = stream_outcome_state_after_stream(fns_remaining);
596            StreamOutcome::new(
597                graph_structure,
598                seed,
599                stream_outcome_state,
600                fn_ids_processed,
601            )
602        };
603
604        let ((), stream_outcome) = futures::join!(queuer, scheduler);
605
606        stream_outcome
607    }
608
609    #[cfg(feature = "async")]
621    pub async fn for_each_concurrent<'f, FnForEach, Fut>(
622        &'f self,
623        limit: impl Into<Option<usize>>,
624        fn_for_each: FnForEach,
625    ) -> StreamOutcome<()>
626    where
627        FnForEach: Fn(&'f F) -> Fut,
628        Fut: Future<Output = ()> + 'f,
629        F: 'f,
630    {
631        self.for_each_concurrent_internal(limit, StreamOpts::default(), fn_for_each)
632            .await
633    }
634
635    #[cfg(feature = "async")]
647    pub async fn for_each_concurrent_with<'f, FnForEach, Fut>(
648        &'f self,
649        limit: impl Into<Option<usize>>,
650        opts: StreamOpts<'f, 'f>,
651        fn_for_each: FnForEach,
652    ) -> StreamOutcome<()>
653    where
654        FnForEach: Fn(&'f F) -> Fut,
655        Fut: Future<Output = ()> + 'f,
656        F: 'f,
657    {
658        self.for_each_concurrent_internal(limit, opts, fn_for_each)
659            .await
660    }
661
662    #[cfg(feature = "async")]
664    async fn for_each_concurrent_internal<'f, FnForEach, Fut>(
665        &'f self,
666        limit: impl Into<Option<usize>>,
667        opts: StreamOpts<'f, 'f>,
668        fn_for_each: FnForEach,
669    ) -> StreamOutcome<()>
670    where
671        FnForEach: Fn(&'f F) -> Fut,
672        Fut: Future<Output = ()> + 'f,
673        F: 'f,
674    {
675        let FnGraph {
676            graph,
677            graph_structure,
678            graph_structure_rev,
679            ranks: _,
680            edge_counts,
681        } = self;
682
683        let StreamSetupInitConcurrent {
684            graph_structure,
685            fn_ready_rx,
686            queuer,
687            fn_done_tx,
688            fns_remaining,
689        } = stream_setup_init_concurrent(graph_structure, graph_structure_rev, edge_counts, &opts);
690
691        let StreamOpts {
692            stream_order: _,
693            #[cfg(feature = "interruptible")]
694            interruptibility_state,
695            #[cfg(feature = "interruptible")]
696            interrupted_next_item_include,
697            marker: _,
698        } = opts;
699
700        let fn_done_tx = &fn_done_tx;
701        let fn_for_each = &fn_for_each;
702        let fns_remaining = &fns_remaining;
703        let fn_refs = graph;
704
705        if graph_structure.node_count() == 0 {
706            fn_done_tx.write().await.take();
707        }
708        let scheduler = async move {
709            let mut fn_ids_processed = Vec::with_capacity(graph_structure.node_count());
710            poll_and_track_fn_ready(
711                fn_ready_rx,
712                &mut fn_ids_processed,
713                #[cfg(feature = "interruptible")]
714                interruptibility_state,
715                #[cfg(feature = "interruptible")]
716                interrupted_next_item_include,
717            )
718            .for_each_concurrent(
719                limit,
720                |#[cfg(not(feature = "interruptible"))] fn_id,
721                 #[cfg(feature = "interruptible")] fn_id_poll_outcome| async move {
722                    #[cfg(not(feature = "interruptible"))]
723                    let fn_id = Some(fn_id);
724                    #[cfg(feature = "interruptible")]
725                    let (fn_id, interrupted) = fn_id_from_interrupt(fn_id_poll_outcome);
726
727                    if let Some(fn_id) = fn_id {
728                        let r#fn = fn_refs.node_weight(fn_id).expect("Expected to borrow fn.");
729                        fn_for_each(r#fn).await;
730                        fn_done_send_locked(fn_done_tx, fn_id).await;
731                        fns_remaining_decrement(fns_remaining, fn_done_tx).await;
732                    }
733
734                    #[cfg(feature = "interruptible")]
735                    fn_done_tx_drop_if_interrupted(fn_done_tx, interrupted).await;
736                },
737            )
738            .await;
739
740            let stream_outcome_state =
741                stream_outcome_state_after_stream(*fns_remaining.read().await);
742            StreamOutcome::new(graph_structure, (), stream_outcome_state, fn_ids_processed)
743        };
744
745        let ((), stream_outcome) = futures::join!(queuer, scheduler);
746        stream_outcome
747    }
748
749    #[cfg(feature = "async")]
761    pub async fn for_each_concurrent_mut<FnForEach, Fut>(
762        &mut self,
763        limit: impl Into<Option<usize>>,
764        fn_for_each: FnForEach,
765    ) -> StreamOutcome<()>
766    where
767        FnForEach: Fn(&mut F) -> Fut,
768        Fut: Future<Output = ()>,
769    {
770        self.for_each_concurrent_mut_internal(limit, StreamOpts::default(), fn_for_each)
771            .await
772    }
773
774    #[cfg(feature = "async")]
786    pub async fn for_each_concurrent_mut_with<'f, FnForEach, Fut>(
787        &mut self,
788        limit: impl Into<Option<usize>>,
789        opts: StreamOpts<'f, 'f>,
790        fn_for_each: FnForEach,
791    ) -> StreamOutcome<()>
792    where
793        FnForEach: Fn(&mut F) -> Fut,
794        Fut: Future<Output = ()>,
795    {
796        self.for_each_concurrent_mut_internal(limit, opts, fn_for_each)
797            .await
798    }
799
800    #[cfg(feature = "async")]
802    async fn for_each_concurrent_mut_internal<'f, FnForEach, Fut>(
803        &mut self,
804        limit: impl Into<Option<usize>>,
805        opts: StreamOpts<'f, 'f>,
806        fn_for_each: FnForEach,
807    ) -> StreamOutcome<()>
808    where
809        FnForEach: Fn(&mut F) -> Fut,
810        Fut: Future<Output = ()>,
811    {
812        let &mut FnGraph {
813            ref mut graph,
814            ref graph_structure,
815            ref graph_structure_rev,
816            ranks: _,
817            ref edge_counts,
818        } = self;
819
820        let StreamSetupInitConcurrent {
821            graph_structure,
822            fn_ready_rx,
823            queuer,
824            fn_done_tx,
825            fns_remaining,
826        } = stream_setup_init_concurrent(graph_structure, graph_structure_rev, edge_counts, &opts);
827
828        let StreamOpts {
829            stream_order: _,
830            #[cfg(feature = "interruptible")]
831            interruptibility_state,
832            #[cfg(feature = "interruptible")]
833            interrupted_next_item_include,
834            marker: _,
835        } = opts;
836
837        let fn_done_tx = &fn_done_tx;
838        let fn_for_each = &fn_for_each;
839        let fns_remaining = &fns_remaining;
840        let fn_mut_refs = graph
841            .node_weights_mut()
842            .map(RwLock::new)
843            .collect::<Vec<_>>();
844        let fn_mut_refs = &fn_mut_refs;
845
846        if graph_structure.node_count() == 0 {
847            fn_done_tx.write().await.take();
848        }
849        let scheduler = async move {
850            let mut fn_ids_processed = Vec::with_capacity(graph_structure.node_count());
851            poll_and_track_fn_ready(
852                fn_ready_rx,
853                &mut fn_ids_processed,
854                #[cfg(feature = "interruptible")]
855                interruptibility_state,
856                #[cfg(feature = "interruptible")]
857                interrupted_next_item_include,
858            )
859            .for_each_concurrent(
860                limit,
861                |#[cfg(not(feature = "interruptible"))] fn_id,
862                 #[cfg(feature = "interruptible")] fn_id_poll_outcome| async move {
863                    #[cfg(not(feature = "interruptible"))]
864                    let fn_id = Some(fn_id);
865                    #[cfg(feature = "interruptible")]
866                    let (fn_id, interrupted) = fn_id_from_interrupt(fn_id_poll_outcome);
867
868                    if let Some(fn_id) = fn_id {
869                        let mut r#fn = fn_mut_refs[fn_id.index()]
870                            .try_write()
871                            .expect("Expected to borrow fn mutably.");
872                        fn_for_each(&mut r#fn).await;
873                        fn_done_send_locked(fn_done_tx, fn_id).await;
874                        fns_remaining_decrement(fns_remaining, fn_done_tx).await;
875                    }
876
877                    #[cfg(feature = "interruptible")]
878                    fn_done_tx_drop_if_interrupted(fn_done_tx, interrupted).await;
879                },
880            )
881            .await;
882
883            fn_ids_processed
884        };
885
886        let ((), fn_ids_processed) = futures::join!(queuer, scheduler);
887        let stream_outcome_state = stream_outcome_state_after_stream(*fns_remaining.read().await);
888
889        StreamOutcome::new(graph_structure, (), stream_outcome_state, fn_ids_processed)
890    }
891
892    #[cfg(feature = "async")]
897    pub async fn try_fold_async<'f, E, Seed, FnTryFold>(
898        &'f self,
899        seed: Seed,
900        fn_try_fold: FnTryFold,
901    ) -> Result<StreamOutcome<Seed>, E>
902    where
903        for<'iter> FnTryFold:
904            Fn(Seed, FnWrapper<'iter, 'f, F>) -> LocalBoxFuture<'iter, Result<Seed, E>>,
905        F: 'f,
906    {
907        self.try_fold_async_internal(seed, StreamOpts::default(), fn_try_fold)
908            .await
909    }
910
911    #[cfg(feature = "async")]
927    pub async fn try_fold_async_with<'f, E, Seed, FnTryFold>(
928        &'f self,
929        seed: Seed,
930        opts: StreamOpts<'_, '_>,
931        fn_try_fold: FnTryFold,
932    ) -> Result<StreamOutcome<Seed>, E>
933    where
934        for<'iter> FnTryFold:
935            Fn(Seed, FnWrapper<'iter, 'f, F>) -> LocalBoxFuture<'iter, Result<Seed, E>>,
936        F: 'f,
937    {
938        self.try_fold_async_internal(seed, opts, fn_try_fold).await
939    }
940
941    #[cfg(feature = "async")]
942    async fn try_fold_async_internal<'f, E, Seed, FnTryFold>(
943        &'f self,
944        seed: Seed,
945        opts: StreamOpts<'_, '_>,
946        fn_try_fold: FnTryFold,
947    ) -> Result<StreamOutcome<Seed>, E>
948    where
949        for<'iter> FnTryFold:
950            Fn(Seed, FnWrapper<'iter, 'f, F>) -> LocalBoxFuture<'iter, Result<Seed, E>>,
951        F: 'f,
952    {
953        let FnGraph {
954            graph,
955            graph_structure,
956            graph_structure_rev,
957            ranks: _,
958            edge_counts,
959        } = self;
960
961        let StreamOpts {
962            stream_order,
963            #[cfg(feature = "interruptible")]
964            interruptibility_state,
965            #[cfg(feature = "interruptible")]
966            interrupted_next_item_include,
967            marker: _,
968        } = opts;
969
970        let StreamSetupInit {
971            graph_structure,
972            predecessor_counts,
973            fn_ready_tx,
974            fn_ready_rx,
975            fn_done_tx,
976            fn_done_rx,
977        } = stream_setup_init(
978            graph_structure,
979            graph_structure_rev,
980            edge_counts,
981            stream_order,
982        );
983
984        let queuer = fn_ready_queuer(graph_structure, predecessor_counts, fn_done_rx, fn_ready_tx);
985
986        let fns_remaining = graph_structure.node_count();
987        let mut fn_done_tx = Some(fn_done_tx);
988        if fns_remaining == 0 {
989            fn_done_tx.take();
990        }
991        let fold_stream_state = FoldStreamState {
992            graph,
993            fns_remaining,
994            fn_done_tx,
995            seed,
996            fn_fold: fn_try_fold,
997        };
998        let scheduler = async move {
999            let mut fn_ids_processed = Vec::with_capacity(graph_structure.node_count());
1000            let seed_result = poll_and_track_fn_ready(
1001                fn_ready_rx,
1002                &mut fn_ids_processed,
1003                #[cfg(feature = "interruptible")]
1004                interruptibility_state,
1005                #[cfg(feature = "interruptible")]
1006                interrupted_next_item_include,
1007            )
1008            .map(Result::<_, E>::Ok)
1009            .try_fold(
1010                fold_stream_state,
1011                |fold_stream_state,
1012                 #[cfg(not(feature = "interruptible"))] fn_id,
1013                 #[cfg(feature = "interruptible")] fn_id_poll_outcome| async move {
1014                    #[cfg(not(feature = "interruptible"))]
1015                    let fn_id = Some(fn_id);
1016                    #[cfg(feature = "interruptible")]
1017                    let (fn_id, interrupted) = fn_id_from_interrupt(fn_id_poll_outcome);
1018
1019                    let FoldStreamState {
1020                        graph,
1021                        mut fns_remaining,
1022                        mut fn_done_tx,
1023                        mut seed,
1024                        fn_fold: fn_try_fold,
1025                    } = fold_stream_state;
1026
1027                    if let Some(fn_id) = fn_id {
1028                        let r#fn = &graph[fn_id];
1029                        seed = fn_try_fold(seed, FnWrapper::new(r#fn)).await?;
1030                        if let Some(fn_done_tx) = fn_done_tx.as_ref() {
1031                            fn_done_send(fn_done_tx, fn_id).await;
1032                        }
1033
1034                        fns_remaining -= 1;
1036                    }
1037                    if fns_remaining == 0 {
1038                        fn_done_tx.take();
1039                    }
1040
1041                    #[cfg(feature = "interruptible")]
1042                    if interrupted {
1043                        fn_done_tx.take();
1044                    }
1045
1046                    let fold_stream_state = FoldStreamState {
1047                        graph,
1048                        fns_remaining,
1049                        fn_done_tx,
1050                        seed,
1051                        fn_fold: fn_try_fold,
1052                    };
1053
1054                    Ok(fold_stream_state)
1055                },
1056            )
1057            .await
1058            .map(|fold_stream_state| {
1059                let FoldStreamState {
1060                    graph: _,
1061                    fns_remaining,
1062                    fn_done_tx: _,
1063                    seed,
1064                    fn_fold: _,
1065                } = fold_stream_state;
1066
1067                (seed, fns_remaining)
1068            });
1069
1070            seed_result.map(|(seed, fns_remaining)| {
1071                let stream_outcome_state = stream_outcome_state_after_stream(fns_remaining);
1072                StreamOutcome::new(
1073                    graph_structure,
1074                    seed,
1075                    stream_outcome_state,
1076                    fn_ids_processed,
1077                )
1078            })
1079        };
1080
1081        let ((), stream_outcome) = futures::join!(queuer, scheduler);
1082
1083        stream_outcome
1084    }
1085
1086    #[cfg(feature = "async")]
1091    pub async fn try_fold_async_mut<'f, E, Seed, FnTryFold>(
1092        &'f mut self,
1093        seed: Seed,
1094        fn_try_fold: FnTryFold,
1095    ) -> Result<StreamOutcome<Seed>, E>
1096    where
1097        for<'iter> FnTryFold:
1098            FnMut(Seed, FnWrapperMut<'iter, 'f, F>) -> LocalBoxFuture<'iter, Result<Seed, E>>,
1099        F: 'f,
1100    {
1101        self.try_fold_async_mut_internal(seed, StreamOpts::default(), fn_try_fold)
1102            .await
1103    }
1104
1105    #[cfg(feature = "async")]
1121    pub async fn try_fold_async_mut_with<'f, E, Seed, FnTryFold>(
1122        &'f mut self,
1123        seed: Seed,
1124        opts: StreamOpts<'_, '_>,
1125        fn_try_fold: FnTryFold,
1126    ) -> Result<StreamOutcome<Seed>, E>
1127    where
1128        for<'iter> FnTryFold:
1129            FnMut(Seed, FnWrapperMut<'iter, 'f, F>) -> LocalBoxFuture<'iter, Result<Seed, E>>,
1130        F: 'f,
1131    {
1132        self.try_fold_async_mut_internal(seed, opts, fn_try_fold)
1133            .await
1134    }
1135
    /// Shared implementation of the `try_fold_async_mut*` methods.
    ///
    /// Two futures are joined:
    ///
    /// * the *queuer* (`fn_ready_queuer`), which consumes completed function
    ///   IDs from the "done" channel and feeds the "ready" channel, and
    /// * the *scheduler*, which takes function IDs from the "ready" channel
    ///   one at a time, runs `fn_try_fold` on the corresponding function, and
    ///   reports each completed ID on the "done" channel.
    ///
    /// # Returns
    ///
    /// * `Ok(StreamOutcome)` holding the final seed and the IDs of the
    ///   functions that were processed. Its state is derived from the number
    ///   of functions left unprocessed: `Finished` when zero, `Interrupted`
    ///   otherwise.
    /// * `Err(e)` with the first error returned by `fn_try_fold`.
    #[cfg(feature = "async")]
    async fn try_fold_async_mut_internal<'f, E, Seed, FnTryFold>(
        &'f mut self,
        seed: Seed,
        opts: StreamOpts<'_, '_>,
        fn_try_fold: FnTryFold,
    ) -> Result<StreamOutcome<Seed>, E>
    where
        for<'iter> FnTryFold:
            FnMut(Seed, FnWrapperMut<'iter, 'f, F>) -> LocalBoxFuture<'iter, Result<Seed, E>>,
        F: 'f,
    {
        // Split the borrow: the functions are borrowed mutably, while the
        // structural graphs and edge counts are only read.
        let &mut FnGraph {
            ref mut graph,
            ref graph_structure,
            ref graph_structure_rev,
            ranks: _,
            ref edge_counts,
        } = self;

        let StreamOpts {
            stream_order,
            #[cfg(feature = "interruptible")]
            interruptibility_state,
            #[cfg(feature = "interruptible")]
            interrupted_next_item_include,
            marker: _,
        } = opts;

        // Selects the forward or reverse structure graph according to
        // `stream_order`, creates the ready / done channels, and preloads the
        // ready channel with the functions that have no predecessors.
        let StreamSetupInit {
            graph_structure,
            predecessor_counts,
            fn_ready_tx,
            fn_ready_rx,
            fn_done_tx,
            fn_done_rx,
        } = stream_setup_init(
            graph_structure,
            graph_structure_rev,
            edge_counts,
            stream_order,
        );

        let queuer = fn_ready_queuer(graph_structure, predecessor_counts, fn_done_rx, fn_ready_tx);

        let fns_remaining = graph_structure.node_count();
        // Held in an `Option` so the sender can be dropped as soon as no more
        // completions will be reported; this closes the done channel.
        let mut fn_done_tx = Some(fn_done_tx);
        if fns_remaining == 0 {
            // Empty graph: nothing will ever be sent, so close the done channel
            // immediately.
            fn_done_tx.take();
        }
        let fold_stream_state = FoldStreamStateMut {
            graph,
            fns_remaining,
            fn_done_tx,
            seed,
            fn_fold: fn_try_fold,
        };
        let scheduler = async move {
            let mut fn_ids_processed = Vec::with_capacity(graph_structure.node_count());
            let seed_result = poll_and_track_fn_ready(
                fn_ready_rx,
                &mut fn_ids_processed,
                #[cfg(feature = "interruptible")]
                interruptibility_state,
                #[cfg(feature = "interruptible")]
                interrupted_next_item_include,
            )
            .map(Result::<_, E>::Ok)
            .try_fold(
                fold_stream_state,
                |fold_stream_state,
                 #[cfg(not(feature = "interruptible"))] fn_id,
                 #[cfg(feature = "interruptible")] fn_id_poll_outcome| async move {
                    // Normalise both feature configurations to an
                    // `Option<fn_id>`; without `interruptible` there is always
                    // a function to run.
                    #[cfg(not(feature = "interruptible"))]
                    let fn_id = Some(fn_id);
                    #[cfg(feature = "interruptible")]
                    let (fn_id, interrupted) = fn_id_from_interrupt(fn_id_poll_outcome);

                    let FoldStreamStateMut {
                        graph,
                        mut fns_remaining,
                        mut fn_done_tx,
                        mut seed,
                        fn_fold: mut fn_try_fold,
                    } = fold_stream_state;

                    if let Some(fn_id) = fn_id {
                        let r#fn = &mut graph[fn_id];
                        // An `Err` from the fold step short-circuits the whole
                        // fold via `?`.
                        seed = fn_try_fold(seed, FnWrapperMut::new(r#fn)).await?;
                        // Report completion of this function on the done
                        // channel, if it is still open.
                        if let Some(fn_done_tx) = fn_done_tx.as_ref() {
                            fn_done_send(fn_done_tx, fn_id).await;
                        }

                        fns_remaining -= 1;
                    }

                    // All functions processed: close the done channel.
                    if fns_remaining == 0 {
                        fn_done_tx.take();
                    }

                    // Interrupted: close the done channel so that no further
                    // completions are reported.
                    #[cfg(feature = "interruptible")]
                    if interrupted {
                        fn_done_tx.take();
                    }

                    let fold_stream_state = FoldStreamStateMut {
                        graph,
                        fns_remaining,
                        fn_done_tx,
                        seed,
                        fn_fold: fn_try_fold,
                    };

                    Ok(fold_stream_state)
                },
            )
            .await
            .map(|fold_stream_state| {
                // Only the seed and the remaining count are needed past this
                // point; the rest of the state is dropped here.
                let FoldStreamStateMut {
                    graph: _,
                    fns_remaining,
                    fn_done_tx: _,
                    seed,
                    fn_fold: _,
                } = fold_stream_state;

                (seed, fns_remaining)
            });

            seed_result.map(|(seed, fns_remaining)| {
                let stream_outcome_state = stream_outcome_state_after_stream(fns_remaining);
                StreamOutcome::new(
                    graph_structure,
                    seed,
                    stream_outcome_state,
                    fn_ids_processed,
                )
            })
        };

        // Drive the queuer and the scheduler concurrently until both complete.
        let ((), stream_outcome) = futures::join!(queuer, scheduler);

        stream_outcome
    }
1281
1282    #[cfg(feature = "async")]
1298    pub async fn try_for_each_concurrent<'f, E, FnTryForEach, Fut>(
1299        &'f self,
1300        limit: impl Into<Option<usize>>,
1301        fn_try_for_each: FnTryForEach,
1302    ) -> Result<StreamOutcome<()>, (StreamOutcome<()>, Vec<E>)>
1303    where
1304        E: Debug,
1305        FnTryForEach: Fn(&'f F) -> Fut,
1306        Fut: Future<Output = Result<(), E>> + 'f,
1307    {
1308        self.try_for_each_concurrent_internal(limit, StreamOpts::default(), fn_try_for_each)
1309            .await
1310    }
1311
1312    #[cfg(feature = "async")]
1328    pub async fn try_for_each_concurrent_with<'f, E, FnTryForEach, Fut>(
1329        &'f self,
1330        limit: impl Into<Option<usize>>,
1331        opts: StreamOpts<'f, 'f>,
1332        fn_try_for_each: FnTryForEach,
1333    ) -> Result<StreamOutcome<()>, (StreamOutcome<()>, Vec<E>)>
1334    where
1335        E: Debug,
1336        FnTryForEach: Fn(&'f F) -> Fut,
1337        Fut: Future<Output = Result<(), E>> + 'f,
1338    {
1339        self.try_for_each_concurrent_internal(limit, opts, fn_try_for_each)
1340            .await
1341    }
1342
1343    #[cfg(feature = "async")]
1357    pub async fn try_for_each_concurrent_control<'f, E, FnTryForEach, Fut>(
1358        &'f self,
1359        limit: impl Into<Option<usize>>,
1360        fn_try_for_each: FnTryForEach,
1361    ) -> ControlFlow<(StreamOutcome<()>, Vec<E>), StreamOutcome<()>>
1362    where
1363        E: Debug,
1364        FnTryForEach: Fn(&'f F) -> Fut,
1365        Fut: Future<Output = ControlFlow<E, ()>> + 'f,
1366    {
1367        let result = self
1368            .try_for_each_concurrent_internal(limit, StreamOpts::default(), |f| {
1369                let fut = fn_try_for_each(f);
1370                async move {
1371                    match fut.await {
1372                        ControlFlow::Continue(()) => Result::Ok(()),
1373                        ControlFlow::Break(e) => Result::Err(e),
1374                    }
1375                }
1376            })
1377            .await;
1378        match result {
1379            Result::Ok(outcome) => match outcome.state {
1380                StreamOutcomeState::NotStarted | StreamOutcomeState::Interrupted => {
1381                    ControlFlow::Break((outcome, Vec::new()))
1382                }
1383                StreamOutcomeState::Finished => ControlFlow::Continue(outcome),
1384            },
1385            Result::Err(outcome_and_err) => ControlFlow::Break(outcome_and_err),
1386        }
1387    }
1388
1389    #[cfg(feature = "async")]
1403    pub async fn try_for_each_concurrent_control_with<'f, E, FnTryForEach, Fut>(
1404        &'f self,
1405        limit: impl Into<Option<usize>>,
1406        opts: StreamOpts<'f, 'f>,
1407        fn_try_for_each: FnTryForEach,
1408    ) -> ControlFlow<(StreamOutcome<()>, Vec<E>), StreamOutcome<()>>
1409    where
1410        E: Debug,
1411        FnTryForEach: Fn(&'f F) -> Fut,
1412        Fut: Future<Output = ControlFlow<E, ()>> + 'f,
1413    {
1414        let result = self
1415            .try_for_each_concurrent_internal(limit, opts, |f| {
1416                let fut = fn_try_for_each(f);
1417                async move {
1418                    match fut.await {
1419                        ControlFlow::Continue(()) => Result::Ok(()),
1420                        ControlFlow::Break(e) => Result::Err(e),
1421                    }
1422                }
1423            })
1424            .await;
1425        match result {
1426            Result::Ok(outcome) => match outcome.state {
1427                StreamOutcomeState::NotStarted | StreamOutcomeState::Interrupted => {
1428                    ControlFlow::Break((outcome, Vec::new()))
1429                }
1430                StreamOutcomeState::Finished => ControlFlow::Continue(outcome),
1431            },
1432            Result::Err(outcome_and_err) => ControlFlow::Break(outcome_and_err),
1433        }
1434    }
1435
    /// Shared implementation of the immutable `try_for_each_concurrent*`
    /// methods.
    ///
    /// Two futures are joined:
    ///
    /// * the *queuer* created by `stream_setup_init_concurrent`, and
    /// * the *scheduler*, which takes function IDs from the "ready" channel
    ///   and runs `fn_try_for_each` on them with `for_each_concurrent`,
    ///   bounded by `limit`.
    ///
    /// Errors returned by `fn_try_for_each` are sent through a result channel
    /// and collected once both futures have completed.
    ///
    /// # Returns
    ///
    /// `Ok(outcome)` when no error was collected, otherwise
    /// `Err((outcome, errors))`. The outcome state is `Finished` when the
    /// remaining-function counter reached zero, `Interrupted` otherwise.
    ///
    /// # Panics
    ///
    /// Panics if a ready function ID has no node weight in the graph, or if
    /// an error cannot be sent on the result channel.
    #[cfg(feature = "async")]
    async fn try_for_each_concurrent_internal<'f, E, FnTryForEach, Fut>(
        &'f self,
        limit: impl Into<Option<usize>>,
        opts: StreamOpts<'f, 'f>,
        fn_try_for_each: FnTryForEach,
    ) -> Result<StreamOutcome<()>, (StreamOutcome<()>, Vec<E>)>
    where
        E: Debug,
        FnTryForEach: Fn(&'f F) -> Fut,
        Fut: Future<Output = Result<(), E>> + 'f,
    {
        let FnGraph {
            graph: _,
            graph_structure,
            graph_structure_rev,
            ranks: _,
            edge_counts,
        } = self;

        let StreamSetupInitConcurrent {
            graph_structure,
            fn_ready_rx,
            queuer,
            fn_done_tx,
            fns_remaining,
        } = stream_setup_init_concurrent(graph_structure, graph_structure_rev, edge_counts, &opts);

        // `stream_order` has already been consumed by the setup above.
        let StreamOpts {
            stream_order: _,
            #[cfg(feature = "interruptible")]
            interruptibility_state,
            #[cfg(feature = "interruptible")]
            interrupted_next_item_include,
            marker: _,
        } = opts;

        // Result channel with room for one error per function (at least 1,
        // since the capacity is clamped with `max(1, ..)`).
        let channel_capacity = std::cmp::max(1, graph_structure.node_count());
        let (result_tx, mut result_rx) = mpsc::channel(channel_capacity);

        // Rebind as references so that the `async move` blocks below copy the
        // references rather than moving the owned values.
        let fn_done_tx = &fn_done_tx;
        let fn_try_for_each = &fn_try_for_each;
        let fns_remaining = &fns_remaining;
        let fn_refs = &self.graph;

        // Empty graph: no completion will ever be sent, so drop the sender to
        // close the done channel immediately.
        if graph_structure.node_count() == 0 {
            fn_done_tx.write().await.take();
        }
        let scheduler = async move {
            let result_tx_ref = &result_tx;

            let mut fn_ids_processed = Vec::with_capacity(graph_structure.node_count());
            poll_and_track_fn_ready(
                fn_ready_rx,
                &mut fn_ids_processed,
                #[cfg(feature = "interruptible")]
                interruptibility_state,
                #[cfg(feature = "interruptible")]
                interrupted_next_item_include,
            )
            .for_each_concurrent(
                limit,
                |#[cfg(not(feature = "interruptible"))] fn_id,
                 #[cfg(feature = "interruptible")] fn_id_poll_outcome| async move {
                    // Normalise both feature configurations to an
                    // `Option<fn_id>`.
                    #[cfg(not(feature = "interruptible"))]
                    let fn_id = Some(fn_id);
                    #[cfg(feature = "interruptible")]
                    let (fn_id, interrupted) = fn_id_from_interrupt(fn_id_poll_outcome);

                    if let Some(fn_id) = fn_id {
                        let r#fn = fn_refs.node_weight(fn_id).expect("Expected to borrow fn.");
                        if let Err(e) = fn_try_for_each(r#fn).await {
                            result_tx_ref
                                .send(e)
                                .await
                                .expect("Scheduler failed to send Err result in `result_tx`.");

                            // On error, drop the done sender so that no
                            // further completions are reported.
                            fn_done_tx.write().await.take();
                        };

                        // No-op when the done sender has already been dropped
                        // (e.g. by the error branch above).
                        fn_done_send_locked(fn_done_tx, fn_id).await;
                        // The counter is decremented on both success and
                        // failure; reaching zero drops the done sender.
                        fns_remaining_decrement(fns_remaining, fn_done_tx).await;
                    }

                    #[cfg(feature = "interruptible")]
                    fn_done_tx_drop_if_interrupted(fn_done_tx, interrupted).await;
                },
            )
            .await;

            // Close the result channel so that the `collect` below terminates.
            drop(result_tx);

            fn_ids_processed
        };

        // Drive the queuer and the scheduler concurrently until both complete.
        let ((), fn_ids_processed) = futures::join!(queuer, scheduler);
        let stream_outcome_state = stream_outcome_state_after_stream(*fns_remaining.read().await);
        let stream_outcome =
            StreamOutcome::new(graph_structure, (), stream_outcome_state, fn_ids_processed);

        // Drain every error that was reported while streaming.
        let results = stream::poll_fn(move |ctx| result_rx.poll_recv(ctx))
            .collect::<Vec<E>>()
            .await;

        if results.is_empty() {
            Ok(stream_outcome)
        } else {
            Err((stream_outcome, results))
        }
    }
1549
1550    #[cfg(feature = "async")]
1566    pub async fn try_for_each_concurrent_mut<E, FnTryForEach, Fut>(
1567        &mut self,
1568        limit: impl Into<Option<usize>>,
1569        fn_try_for_each: FnTryForEach,
1570    ) -> Result<StreamOutcome<()>, (StreamOutcome<()>, Vec<E>)>
1571    where
1572        E: Debug,
1573        FnTryForEach: Fn(&mut F) -> Fut,
1574        Fut: Future<Output = Result<(), E>>,
1575    {
1576        self.try_for_each_concurrent_mut_internal(limit, StreamOpts::default(), fn_try_for_each)
1577            .await
1578    }
1579
1580    #[cfg(feature = "async")]
1596    pub async fn try_for_each_concurrent_mut_with<'f, E, FnTryForEach, Fut>(
1597        &mut self,
1598        limit: impl Into<Option<usize>>,
1599        opts: StreamOpts<'f, 'f>,
1600        fn_try_for_each: FnTryForEach,
1601    ) -> Result<StreamOutcome<()>, (StreamOutcome<()>, Vec<E>)>
1602    where
1603        E: Debug,
1604        FnTryForEach: Fn(&mut F) -> Fut,
1605        Fut: Future<Output = Result<(), E>>,
1606    {
1607        self.try_for_each_concurrent_mut_internal(limit, opts, fn_try_for_each)
1608            .await
1609    }
1610
1611    #[cfg(feature = "async")]
1625    pub async fn try_for_each_concurrent_control_mut<E, FnTryForEach, Fut>(
1626        &mut self,
1627        limit: impl Into<Option<usize>>,
1628        fn_try_for_each: FnTryForEach,
1629    ) -> ControlFlow<(StreamOutcome<()>, Vec<E>), StreamOutcome<()>>
1630    where
1631        E: Debug,
1632        FnTryForEach: Fn(&mut F) -> Fut,
1633        Fut: Future<Output = ControlFlow<E, ()>>,
1634    {
1635        let result = self
1636            .try_for_each_concurrent_mut_internal(limit, StreamOpts::default(), |f| {
1637                let fut = fn_try_for_each(f);
1638                async move {
1639                    match fut.await {
1640                        ControlFlow::Continue(()) => Result::Ok(()),
1641                        ControlFlow::Break(e) => Result::Err(e),
1642                    }
1643                }
1644            })
1645            .await;
1646        match result {
1647            Result::Ok(outcome) => match outcome.state {
1648                StreamOutcomeState::NotStarted | StreamOutcomeState::Interrupted => {
1649                    ControlFlow::Break((outcome, Vec::new()))
1650                }
1651                StreamOutcomeState::Finished => ControlFlow::Continue(outcome),
1652            },
1653            Result::Err(outcome_and_err) => ControlFlow::Break(outcome_and_err),
1654        }
1655    }
1656
1657    #[cfg(feature = "async")]
1671    pub async fn try_for_each_concurrent_control_mut_with<'f, E, FnTryForEach, Fut>(
1672        &mut self,
1673        limit: impl Into<Option<usize>>,
1674        opts: StreamOpts<'f, 'f>,
1675        fn_try_for_each: FnTryForEach,
1676    ) -> ControlFlow<(StreamOutcome<()>, Vec<E>), StreamOutcome<()>>
1677    where
1678        E: Debug,
1679        FnTryForEach: Fn(&mut F) -> Fut,
1680        Fut: Future<Output = ControlFlow<E, ()>>,
1681    {
1682        let result = self
1683            .try_for_each_concurrent_mut_internal(limit, opts, |f| {
1684                let fut = fn_try_for_each(f);
1685                async move {
1686                    match fut.await {
1687                        ControlFlow::Continue(()) => Result::Ok(()),
1688                        ControlFlow::Break(e) => Result::Err(e),
1689                    }
1690                }
1691            })
1692            .await;
1693        match result {
1694            Result::Ok(outcome) => match outcome.state {
1695                StreamOutcomeState::NotStarted | StreamOutcomeState::Interrupted => {
1696                    ControlFlow::Break((outcome, Vec::new()))
1697                }
1698                StreamOutcomeState::Finished => ControlFlow::Continue(outcome),
1699            },
1700            Result::Err(outcome_and_err) => ControlFlow::Break(outcome_and_err),
1701        }
1702    }
1703
1704    #[cfg(feature = "async")]
1706    async fn try_for_each_concurrent_mut_internal<'f, E, FnTryForEach, Fut>(
1707        &mut self,
1708        limit: impl Into<Option<usize>>,
1709        opts: StreamOpts<'f, 'f>,
1710        fn_try_for_each: FnTryForEach,
1711    ) -> Result<StreamOutcome<()>, (StreamOutcome<()>, Vec<E>)>
1712    where
1713        E: Debug,
1714        FnTryForEach: Fn(&mut F) -> Fut,
1715        Fut: Future<Output = Result<(), E>>,
1716    {
1717        let &mut FnGraph {
1718            graph: _,
1719            ref graph_structure,
1720            ref graph_structure_rev,
1721            ranks: _,
1722            ref edge_counts,
1723        } = self;
1724
1725        let StreamSetupInitConcurrent {
1726            graph_structure,
1727            fn_ready_rx,
1728            queuer,
1729            fn_done_tx,
1730            fns_remaining,
1731        } = stream_setup_init_concurrent(graph_structure, graph_structure_rev, edge_counts, &opts);
1732
1733        let StreamOpts {
1734            stream_order: _,
1735            #[cfg(feature = "interruptible")]
1736            interruptibility_state,
1737            #[cfg(feature = "interruptible")]
1738            interrupted_next_item_include,
1739            marker: _,
1740        } = opts;
1741
1742        let channel_capacity = std::cmp::max(1, graph_structure.node_count());
1743        let (result_tx, mut result_rx) = mpsc::channel(channel_capacity);
1744
1745        let fn_done_tx = &fn_done_tx;
1746        let fn_try_for_each = &fn_try_for_each;
1747        let fns_remaining = &fns_remaining;
1748        let fn_mut_refs = self
1749            .graph
1750            .node_weights_mut()
1751            .map(RwLock::new)
1752            .collect::<Vec<_>>();
1753        let fn_mut_refs = &fn_mut_refs;
1754        let scheduler = async move {
1755            let result_tx_ref = &result_tx;
1756
1757            let mut fn_ids_processed = Vec::with_capacity(graph_structure.node_count());
1758            poll_and_track_fn_ready(
1759                fn_ready_rx,
1760                &mut fn_ids_processed,
1761                #[cfg(feature = "interruptible")]
1762                interruptibility_state,
1763                #[cfg(feature = "interruptible")]
1764                interrupted_next_item_include,
1765            )
1766            .for_each_concurrent(
1767                limit,
1768                |#[cfg(not(feature = "interruptible"))] fn_id,
1769                 #[cfg(feature = "interruptible")] fn_id_poll_outcome| async move {
1770                    #[cfg(not(feature = "interruptible"))]
1771                    let fn_id = Some(fn_id);
1772                    #[cfg(feature = "interruptible")]
1773                    let (fn_id, interrupted) = fn_id_from_interrupt(fn_id_poll_outcome);
1774
1775                    if let Some(fn_id) = fn_id {
1776                        let mut r#fn = fn_mut_refs[fn_id.index()]
1777                            .try_write()
1778                            .expect("Expected to borrow fn mutably.");
1779                        if let Err(e) = fn_try_for_each(&mut r#fn).await {
1780                            result_tx_ref
1781                                .send(e)
1782                                .await
1783                                .expect("Scheduler failed to send Err result in `result_tx`.");
1784
1785                            fn_done_tx.write().await.take();
1788                        };
1789
1790                        fn_done_send_locked(fn_done_tx, fn_id).await;
1791                        fns_remaining_decrement(fns_remaining, fn_done_tx).await;
1792                    }
1793
1794                    #[cfg(feature = "interruptible")]
1795                    fn_done_tx_drop_if_interrupted(fn_done_tx, interrupted).await;
1796                },
1797            )
1798            .await;
1799
1800            drop(result_tx);
1801
1802            fn_ids_processed
1803        };
1804
1805        let ((), fn_ids_processed) = futures::join!(queuer, scheduler);
1806        let stream_outcome_state = stream_outcome_state_after_stream(*fns_remaining.read().await);
1807        let stream_outcome =
1808            StreamOutcome::new(graph_structure, (), stream_outcome_state, fn_ids_processed);
1809
1810        let results = stream::poll_fn(move |ctx| result_rx.poll_recv(ctx))
1811            .collect::<Vec<E>>()
1812            .await;
1813
1814        if results.is_empty() {
1815            Ok(stream_outcome)
1816        } else {
1817            Err((stream_outcome, results))
1818        }
1819    }
1820
1821    pub fn map<'f, FnMap, FnMapRet>(
1823        &'f mut self,
1824        mut fn_map: FnMap,
1825    ) -> impl Iterator<Item = FnMapRet> + 'f
1826    where
1827        FnMap: FnMut(&mut F) -> FnMapRet + 'f,
1828    {
1829        let mut topo = Topo::new(&self.graph);
1830        let graph = &mut self.graph;
1831        std::iter::from_fn(move || {
1832            topo.next(&*graph).map(|fn_id| {
1833                let r#fn = &mut graph[fn_id];
1834                fn_map(r#fn)
1835            })
1836        })
1837    }
1838
1839    pub fn fold<Seed, FnFold>(&mut self, mut seed: Seed, mut fn_fold: FnFold) -> Seed
1841    where
1842        FnFold: FnMut(Seed, &mut F) -> Seed,
1843    {
1844        let mut topo = Topo::new(&self.graph);
1845        let graph = &mut self.graph;
1846        while let Some(r#fn) = topo.next(&*graph).map(|fn_id| &mut graph[fn_id]) {
1847            seed = fn_fold(seed, r#fn);
1848        }
1849        seed
1850    }
1851
1852    pub fn try_fold<Seed, FnFold, E>(
1855        &mut self,
1856        mut seed: Seed,
1857        mut fn_fold: FnFold,
1858    ) -> Result<Seed, E>
1859    where
1860        FnFold: FnMut(Seed, &mut F) -> Result<Seed, E>,
1861    {
1862        let mut topo = Topo::new(&self.graph);
1863        let graph = &mut self.graph;
1864        while let Some(r#fn) = topo.next(&*graph).map(|fn_id| &mut graph[fn_id]) {
1865            seed = fn_fold(seed, r#fn)?;
1866        }
1867        Ok(seed)
1868    }
1869
1870    pub fn for_each<FnForEach>(&mut self, mut fn_for_each: FnForEach)
1872    where
1873        FnForEach: FnMut(&mut F),
1874    {
1875        let mut topo = Topo::new(&self.graph);
1876        let graph = &mut self.graph;
1877        while let Some(r#fn) = topo.next(&*graph).map(|fn_id| &mut graph[fn_id]) {
1878            fn_for_each(r#fn);
1879        }
1880    }
1881
1882    pub fn try_for_each<FnForEach, E>(&mut self, mut fn_for_each: FnForEach) -> Result<(), E>
1884    where
1885        FnForEach: FnMut(&mut F) -> Result<(), E>,
1886    {
1887        let mut topo = Topo::new(&self.graph);
1888        let graph = &mut self.graph;
1889        while let Some(r#fn) = topo.next(&*graph).map(|fn_id| &mut graph[fn_id]) {
1890            fn_for_each(r#fn)?;
1891        }
1892        Ok(())
1893    }
1894
1895    pub fn iter_insertion(&self) -> impl ExactSizeIterator<Item = &F> + DoubleEndedIterator {
1899        use daggy::petgraph::visit::IntoNodeReferences;
1900        self.graph.node_references().map(|(_, function)| function)
1901    }
1902
1903    pub fn iter_insertion_mut(&mut self) -> NodeWeightsMut<F, FnIdInner> {
1907        self.graph.node_weights_mut()
1908    }
1909
1910    pub fn iter_insertion_with_indices(&self) -> NodeReferences<F, FnIdInner> {
1914        use daggy::petgraph::visit::IntoNodeReferences;
1915        self.graph.node_references()
1916    }
1917}
1918
/// Splits a [`PollOutcome`] into the optional function ID it carries and a
/// flag indicating whether the stream was interrupted.
///
/// * `NoInterrupt(fn_id)` yields `(Some(fn_id), false)`.
/// * `Interrupted(fn_id)` yields `(fn_id, true)`; the ID is passed through
///   as-is and may be `None`.
#[cfg(all(feature = "async", feature = "interruptible"))]
fn fn_id_from_interrupt(
    fn_id_poll_outcome: PollOutcome<NodeIndex<FnIdInner>>,
) -> (Option<NodeIndex<FnIdInner>>, bool) {
    match fn_id_poll_outcome {
        PollOutcome::NoInterrupt(fn_id) => (Some(fn_id), false),
        PollOutcome::Interrupted(fn_id) => (fn_id, true),
    }
}
1930
/// Derives the [`StreamOutcomeState`] from the number of functions that were
/// left unprocessed once streaming stopped.
///
/// Zero remaining functions means the stream `Finished`; any other count is
/// reported as `Interrupted`.
#[cfg(feature = "async")]
fn stream_outcome_state_after_stream(fns_remaining: usize) -> StreamOutcomeState {
    if fns_remaining == 0 {
        StreamOutcomeState::Finished
    } else {
        StreamOutcomeState::Interrupted
    }
}
1944
/// Returns the IDs of the functions whose predecessor count is zero, in the
/// order produced by a [`Topo`] walk over `graph_structure`.
///
/// `predecessor_counts` is indexed by each function ID's node index.
#[cfg(feature = "async")]
fn fns_no_predecessors<'f>(
    graph_structure: &'f Dag<(), Edge, FnIdInner>,
    predecessor_counts: &'f [usize],
) -> impl Iterator<Item = FnId> + 'f {
    let fn_ids_topo = Topo::new(graph_structure).iter(graph_structure);

    fn_ids_topo.filter(move |fn_id| predecessor_counts[fn_id.index()] == 0)
}
1955
/// Queues every function that has no predecessors onto the "ready" channel.
///
/// Uses the non-blocking `try_send`, so the channel must have enough spare
/// capacity for all such functions.
///
/// # Panics
///
/// Panics if any function cannot be queued.
#[cfg(feature = "async")]
fn fns_no_predecessors_preload(
    graph_structure: &Dag<(), Edge, FnIdInner>,
    predecessor_counts: &[usize],
    fn_ready_tx: &Sender<NodeIndex<FnIdInner>>,
) {
    for fn_id in fns_no_predecessors(graph_structure, predecessor_counts) {
        fn_ready_tx
            .try_send(fn_id)
            .expect("Failed to preload function with no predecessors.");
    }
}
1968
/// Reports `fn_id` as completed on the "done" channel, if the sender behind
/// the lock is still present.
///
/// The read lock is held for the duration of the send. When the sender has
/// already been taken out of the `Option`, this does nothing.
#[cfg(feature = "async")]
async fn fn_done_send_locked(
    fn_done_tx: &RwLock<Option<Sender<NodeIndex<FnIdInner>>>>,
    fn_id: NodeIndex<FnIdInner>,
) {
    let fn_done_tx_guard = fn_done_tx.read().await;
    let Some(fn_done_tx) = fn_done_tx_guard.as_ref() else {
        return;
    };

    fn_done_send(fn_done_tx, fn_id).await;
}
1981
/// Reports `fn_id` as completed on the "done" channel.
///
/// A failed send is deliberately ignored: the channel reports an error only
/// when the receiving half is no longer accepting messages, in which case
/// there is nobody left to notify.
#[cfg(feature = "async")]
async fn fn_done_send(fn_done_tx: &Sender<NodeIndex<FnIdInner>>, fn_id: NodeIndex<FnIdInner>) {
    if let Err(SendError(_fn_id)) = fn_done_tx.send(fn_id).await {
        // Intentionally empty: the unsent function ID is discarded.
    }
}
1997
/// Decrements the shared remaining-function counter and, when it reaches
/// zero, drops the "done" channel sender.
///
/// The counter's write lock is released before the sender's write lock is
/// acquired, so the two locks are never held at the same time.
#[cfg(feature = "async")]
async fn fns_remaining_decrement(
    fns_remaining: &RwLock<usize>,
    fn_done_tx: &RwLock<Option<Sender<NodeIndex<FnIdInner>>>>,
) {
    let mut remaining = fns_remaining.write().await;
    *remaining -= 1;
    let all_fns_done = *remaining == 0;
    drop(remaining);

    if all_fns_done {
        fn_done_tx.write().await.take();
    }
}
2014
/// Drops the "done" channel sender when `interrupted` is `true`; otherwise
/// does nothing.
#[cfg(all(feature = "async", feature = "interruptible"))]
async fn fn_done_tx_drop_if_interrupted(
    fn_done_tx: &RwLock<Option<Sender<NodeIndex<FnIdInner>>>>,
    interrupted: bool,
) {
    if !interrupted {
        return;
    }

    fn_done_tx.write().await.take();
}
2025
/// Prepares the channels and bookkeeping needed to stream the functions of a
/// graph.
///
/// * Selects the graph to walk and the matching predecessor counts according
///   to `stream_order`: the forward graph with incoming edge counts, or the
///   reversed graph with outgoing edge counts.
/// * Creates the "ready" and "done" channels, each with a capacity equal to
///   the number of functions (at least 1).
/// * Preloads the "ready" channel with the functions that have no
///   predecessors.
#[cfg(feature = "async")]
fn stream_setup_init<'f>(
    graph_structure: &'f Dag<(), Edge, FnIdInner>,
    graph_structure_rev: &'f Dag<(), Edge, FnIdInner>,
    edge_counts: &EdgeCounts,
    stream_order: StreamOrder,
) -> StreamSetupInit<'f> {
    let (graph_structure, predecessor_counts) = match stream_order {
        StreamOrder::Reverse => (graph_structure_rev, edge_counts.outgoing().to_vec()),
        StreamOrder::Forward => (graph_structure, edge_counts.incoming().to_vec()),
    };

    let channel_capacity = graph_structure.node_count().max(1);
    let (fn_ready_tx, fn_ready_rx) = mpsc::channel(channel_capacity);
    let (fn_done_tx, fn_done_rx) = mpsc::channel::<FnId>(channel_capacity);

    fns_no_predecessors_preload(graph_structure, &predecessor_counts, &fn_ready_tx);

    StreamSetupInit {
        graph_structure,
        predecessor_counts,
        fn_ready_tx,
        fn_ready_rx,
        fn_done_tx,
        fn_done_rx,
    }
}
2055
/// Prepares the state needed to stream the functions of a graph
/// concurrently.
///
/// Builds on `stream_setup_init` using the stream order from `opts`, then:
///
/// * creates the queuer future from the "done" receiver and "ready" sender,
/// * wraps the "done" sender in `RwLock<Option<_>>`, and
/// * wraps the number of functions in a `RwLock` as the remaining counter.
#[cfg(feature = "async")]
fn stream_setup_init_concurrent<'f>(
    graph_structure: &'f Dag<(), Edge, FnIdInner>,
    graph_structure_rev: &'f Dag<(), Edge, FnIdInner>,
    edge_counts: &EdgeCounts,
    opts: &StreamOpts<'f, 'f>,
) -> StreamSetupInitConcurrent<'f, impl Future<Output = ()> + 'f + use<'f>> {
    let StreamSetupInit {
        graph_structure,
        predecessor_counts,
        fn_ready_tx,
        fn_ready_rx,
        fn_done_tx,
        fn_done_rx,
    } = stream_setup_init(
        graph_structure,
        graph_structure_rev,
        edge_counts,
        opts.stream_order,
    );

    StreamSetupInitConcurrent {
        graph_structure,
        fn_ready_rx,
        queuer: fn_ready_queuer(graph_structure, predecessor_counts, fn_done_rx, fn_ready_tx),
        fn_done_tx: RwLock::new(Some(fn_done_tx)),
        fns_remaining: RwLock::new(graph_structure.node_count()),
    }
}
2099
/// Listens for completed functions and queues the functions that become
/// ready as a result.
///
/// Turns `fn_done_rx` into a stream and hands it to `queuer_stream_fold`
/// together with the initial `QueuerStreamState`.
///
/// # Parameters
///
/// * `graph_structure`: Graph structure whose children are looked up for each
///   completed function.
/// * `predecessor_counts`: Initial predecessor count of each function.
/// * `fn_done_rx`: Receives the ID of each function that has completed.
/// * `fn_ready_tx`: Sends the IDs of functions that are ready.
#[cfg(feature = "async")]
async fn fn_ready_queuer(
    graph_structure: &Dag<(), Edge, FnIdInner>,
    predecessor_counts: Vec<usize>,
    mut fn_done_rx: Receiver<FnId>,
    fn_ready_tx: Sender<FnId>,
) {
    let fn_count = graph_structure.node_count();

    // For an empty graph there is nothing to queue, so the sender is dropped
    // straight away instead of being kept in the state.
    let fn_ready_tx = (fn_count != 0).then_some(fn_ready_tx);

    let fn_done_stream = stream::poll_fn(move |cx| fn_done_rx.poll_recv(cx));
    let initial_state = QueuerStreamState::new(fn_count, predecessor_counts, fn_ready_tx);

    queuer_stream_fold(fn_done_stream, initial_state, graph_structure).await;
}
2120
/// Folds over the stream of completed function IDs, queueing each child
/// function whose predecessor count reaches zero.
///
/// # Parameters
///
/// * `stream`: Stream of IDs of functions that have completed.
/// * `queuer_stream_state`: Initial counters and the ready-channel sender.
/// * `graph_structure`: Graph structure used to look up each completed
///   function's children.
#[cfg(feature = "async")]
async fn queuer_stream_fold(
    stream: impl Stream<Item = NodeIndex<FnIdInner>>,
    queuer_stream_state: QueuerStreamState,
    graph_structure: &Dag<(), Edge, FnIdInner>,
) {
    stream
        .fold(queuer_stream_state, move |state, fn_id_done| async move {
            let mut state = state;

            state.fns_remaining -= 1;
            if state.fns_remaining == 0 {
                // Every function has completed: drop the sender so nothing
                // further can be queued through this state.
                state.fn_ready_tx = None;
            }

            let children = graph_structure
                .children(fn_id_done)
                .iter(graph_structure);
            for (_edge_id, child_fn_id) in children {
                let predecessors_left = &mut state.predecessor_counts[child_fn_id.index()];
                *predecessors_left -= 1;

                if *predecessors_left == 0 {
                    if let Some(fn_ready_tx) = state.fn_ready_tx.as_ref() {
                        // Best effort: a failed send is deliberately ignored.
                        let _ = fn_ready_tx.try_send(child_fn_id);
                    }
                }
            }

            state
        })
        .await;
}
2167
/// Returns a stream of the function IDs received on `fn_ready_rx`, recording
/// each received ID in `fn_ids_processed` as it is yielded.
///
/// # Parameters
///
/// * `fn_ready_rx`: Receiver of IDs of functions that are ready.
/// * `fn_ids_processed`: List that every yielded function ID is appended to.
#[cfg(feature = "async")]
fn poll_and_track_fn_ready_common(
    mut fn_ready_rx: Receiver<NodeIndex<FnIdInner>>,
    fn_ids_processed: &mut Vec<NodeIndex<FnIdInner>>,
) -> impl Stream<Item = FnId> + '_ {
    stream::poll_fn(move |context| {
        let poll = fn_ready_rx.poll_recv(context);

        // Only an actually received ID is tracked; pending and end-of-stream
        // polls are passed through untouched.
        if let Poll::Ready(Some(fn_id)) = &poll {
            fn_ids_processed.push(*fn_id);
        }

        poll
    })
}
2181
/// Returns a stream of ready function IDs that records each yielded ID in
/// `fn_ids_processed`.
///
/// This is the variant compiled when the `"interruptible"` feature is
/// disabled; it forwards directly to `poll_and_track_fn_ready_common`.
///
/// # Parameters
///
/// * `fn_ready_rx`: Receiver of IDs of functions that are ready.
/// * `fn_ids_processed`: List that every yielded function ID is appended to.
#[cfg(all(feature = "async", not(feature = "interruptible")))]
fn poll_and_track_fn_ready(
    fn_ready_rx: Receiver<NodeIndex<FnIdInner>>,
    fn_ids_processed: &mut Vec<NodeIndex<FnIdInner>>,
) -> impl Stream<Item = FnId> + '_ {
    poll_and_track_fn_ready_common(fn_ready_rx, fn_ids_processed)
}
2189
/// Returns an interruptible stream of ready function IDs that records the
/// yielded IDs in `fn_ids_processed`.
///
/// # Parameters
///
/// * `fn_ready_rx`: Receiver of IDs of functions that are ready.
/// * `fn_ids_processed`: List that tracked function IDs are appended to.
/// * `interruptibility_state`: State passed to `interruptible_with`.
/// * `interrupted_next_item_include`: When `true`, every received ID is
///   tracked before the stream is made interruptible. When `false`, the ID
///   carried by an `Interrupted` outcome is cleared and not tracked, and only
///   IDs in `NoInterrupt` outcomes are tracked.
#[cfg(all(feature = "async", feature = "interruptible"))]
fn poll_and_track_fn_ready<'f>(
    mut fn_ready_rx: Receiver<NodeIndex<FnIdInner>>,
    fn_ids_processed: &'f mut Vec<NodeIndex<FnIdInner>>,
    interruptibility_state: InterruptibilityState<'f, 'f>,
    interrupted_next_item_include: bool,
) -> impl Stream<Item = PollOutcome<FnId>> + 'f {
    if interrupted_next_item_include {
        let tracked_stream = poll_and_track_fn_ready_common(fn_ready_rx, fn_ids_processed);

        tracked_stream
            .interruptible_with(interruptibility_state)
            .left_stream()
    } else {
        let untracked_stream = stream::poll_fn(move |context| fn_ready_rx.poll_recv(context));

        untracked_stream
            .interruptible_with(interruptibility_state)
            .filter_map(|mut outcome| {
                match &mut outcome {
                    // Clear the item that accompanies the interruption, so it
                    // is neither yielded nor tracked.
                    PollOutcome::Interrupted(next_fn_id) => {
                        *next_fn_id = None;
                    }
                    PollOutcome::NoInterrupt(fn_id) => fn_ids_processed.push(*fn_id),
                }

                // The outcome itself is always passed on.
                futures::future::ready(Some(outcome))
            })
            .right_stream()
    }
}
2222
/// Values produced by `stream_setup_init`.
#[cfg(feature = "async")]
struct StreamSetupInit<'f> {
    /// Graph structure selected for the requested stream order.
    graph_structure: &'f Dag<(), Edge, FnIdInner>,
    /// Predecessor count of each function for the selected stream order.
    predecessor_counts: Vec<usize>,
    /// Sending half of the channel of function IDs that are ready.
    fn_ready_tx: Sender<FnId>,
    /// Receiving half of the channel of function IDs that are ready.
    fn_ready_rx: Receiver<FnId>,
    /// Sending half of the channel of function IDs that are done.
    fn_done_tx: Sender<FnId>,
    /// Receiving half of the channel of function IDs that are done.
    fn_done_rx: Receiver<FnId>,
}
2232
/// Values produced by `stream_setup_init_concurrent`.
#[cfg(feature = "async")]
struct StreamSetupInitConcurrent<'f, QueuerFut> {
    /// Graph structure selected for the requested stream order.
    graph_structure: &'f Dag<(), Edge, FnIdInner>,
    /// Receiving half of the channel of function IDs that are ready.
    fn_ready_rx: Receiver<FnId>,
    /// Future created by `fn_ready_queuer`.
    queuer: QueuerFut,
    /// Sending half of the channel of function IDs that are done, wrapped so
    /// that it can be shared and taken out.
    fn_done_tx: RwLock<Option<Sender<FnId>>>,
    /// Counter initialised to the number of functions in the graph.
    fns_remaining: RwLock<usize>,
}
2241
/// State carried through a fold over the graph's functions, borrowing the
/// graph immutably.
// NOTE(review): the field descriptions below are inferred from names and
// types; the code that uses this struct is outside this view.
#[cfg(feature = "async")]
struct FoldStreamState<'f, F, Seed, FnFold> {
    /// Graph whose functions are folded over.
    graph: &'f Dag<F, Edge, FnIdInner>,
    /// Presumably the number of functions still to be processed.
    fns_remaining: usize,
    /// Optional sender of function IDs that are done.
    fn_done_tx: Option<Sender<FnId>>,
    /// Fold accumulator.
    seed: Seed,
    /// Fold function.
    fn_fold: FnFold,
}
2255
/// State carried through a fold over the graph's functions, borrowing the
/// graph mutably.
// NOTE(review): the field descriptions below are inferred from names and
// types; the code that uses this struct is outside this view.
#[cfg(feature = "async")]
struct FoldStreamStateMut<'f, F, Seed, FnFold> {
    /// Graph whose functions are folded over.
    graph: &'f mut Dag<F, Edge, FnIdInner>,
    /// Presumably the number of functions still to be processed.
    fns_remaining: usize,
    /// Optional sender of function IDs that are done.
    fn_done_tx: Option<Sender<FnId>>,
    /// Fold accumulator.
    seed: Seed,
    /// Fold function.
    fn_fold: FnFold,
}
2269
/// State threaded through `queuer_stream_fold`.
#[cfg(feature = "async")]
struct QueuerStreamState {
    /// Number of functions that have not yet been reported as done.
    fns_remaining: usize,
    /// Remaining predecessor count of each function, indexed by the
    /// function ID's index.
    predecessor_counts: Vec<usize>,
    /// Sender of function IDs that are ready; `None` once it has been
    /// dropped.
    fn_ready_tx: Option<Sender<FnId>>,
}

#[cfg(feature = "async")]
impl QueuerStreamState {
    /// Bundles the given counters and sender into a new state.
    fn new(
        fns_remaining: usize,
        predecessor_counts: Vec<usize>,
        fn_ready_tx: Option<Sender<NodeIndex<FnIdInner>>>,
    ) -> Self {
        QueuerStreamState {
            fn_ready_tx,
            predecessor_counts,
            fns_remaining,
        }
    }
}
2294
2295impl<F> Default for FnGraph<F> {
2296    fn default() -> Self {
2297        Self {
2298            graph: Dag::new(),
2299            graph_structure: Dag::new(),
2300            graph_structure_rev: Dag::new(),
2301            ranks: Vec::new(),
2302            #[cfg(feature = "async")]
2303            edge_counts: EdgeCounts::default(),
2304        }
2305    }
2306}
2307
2308impl<F> Deref for FnGraph<F> {
2309    type Target = Dag<F, Edge, FnIdInner>;
2310
2311    fn deref(&self) -> &Self::Target {
2312        &self.graph
2313    }
2314}
2315
2316impl<F> DerefMut for FnGraph<F> {
2317    fn deref_mut(&mut self) -> &mut Self::Target {
2318        &mut self.graph
2319    }
2320}
2321
2322impl<F> PartialEq for FnGraph<F>
2323where
2324    F: PartialEq,
2325{
2326    fn eq(&self, other: &Self) -> bool {
2327        if self.graph.node_count() == other.graph.node_count()
2328            && self.graph.edge_count() == other.graph.edge_count()
2329        {
2330            let (ControlFlow::Continue(edge_eq) | ControlFlow::Break(edge_eq)) = self
2331                .graph
2332                .raw_edges()
2333                .iter()
2334                .zip(other.graph.raw_edges().iter())
2335                .try_fold(true, |_, (edge_self, edge_other)| {
2336                    if edge_self.source() == edge_other.source()
2337                        && edge_self.target() == edge_other.target()
2338                        && edge_self.weight == edge_other.weight
2339                    {
2340                        ControlFlow::Continue(true)
2341                    } else {
2342                        ControlFlow::Break(false)
2343                    }
2344                });
2345
2346            if !edge_eq {
2347                return false;
2348            }
2349
2350            let (ControlFlow::Continue(f_eq) | ControlFlow::Break(f_eq)) = self
2351                .iter_insertion()
2352                .zip(other.iter_insertion())
2353                .try_fold(true, |_, (f_self, f_other)| {
2354                    if f_self == f_other {
2355                        ControlFlow::Continue(true)
2356                    } else {
2357                        ControlFlow::Break(false)
2358                    }
2359                });
2360
2361            f_eq
2362        } else {
2363            false
2364        }
2365    }
2366}
2367
2368impl<F> Eq for FnGraph<F> where F: Eq {}
2369
2370#[cfg(feature = "fn_meta")]
2371#[cfg(test)]
2372mod tests {
2373    use daggy::WouldCycle;
2374    use resman::{FnRes, IntoFnRes, Resources};
2375
2376    use super::FnGraph;
2377    use crate::{Edge, FnGraphBuilder, FnId};
2378
2379    #[test]
2380    fn clone() {
2381        let mut fn_graph = FnGraph::<u32>::new();
2382        fn_graph.add_node(0);
2383
2384        let fn_graph_clone = fn_graph.clone();
2385
2386        assert_eq!(fn_graph, fn_graph_clone);
2387    }
2388
2389    #[test]
2390    fn debug() {
2391        let mut fn_graph = FnGraph::<u32>::new();
2392        fn_graph.add_node(0);
2393
2394        let debug_str = format!("{fn_graph:?}");
2395        assert!(debug_str.starts_with("FnGraph {"));
2396    }
2397
2398    #[test]
2399    fn new_returns_empty_graph() {
2400        let fn_graph = FnGraph::<Box<dyn FnRes<Ret = ()>>>::new();
2401
2402        assert_eq!(0, fn_graph.node_count());
2403        assert!(fn_graph.ranks().is_empty());
2404    }
2405
2406    #[test]
2407    fn partial_eq_returns_true_for_empty_graphs() {
2408        let fn_graph_a = FnGraph::<u32>::new();
2409        let fn_graph_b = FnGraph::<u32>::new();
2410
2411        assert_eq!(fn_graph_a, fn_graph_b);
2412    }
2413
2414    #[test]
2415    fn partial_eq_returns_false_for_different_value_graphs() {
2416        let mut fn_graph_a = FnGraph::<u32>::new();
2417        fn_graph_a.add_node(0);
2418
2419        let mut fn_graph_b = FnGraph::<u32>::new();
2420        fn_graph_b.add_node(1);
2421
2422        assert_ne!(fn_graph_a, fn_graph_b);
2423    }
2424
2425    #[test]
2426    fn partial_eq_returns_true_for_same_value_same_edges_graphs() -> Result<(), WouldCycle<Edge>> {
2427        let mut fn_graph_a = FnGraph::<u32>::new();
2428        let a_0 = fn_graph_a.add_node(0);
2429        let a_1 = fn_graph_a.add_node(1);
2430        fn_graph_a.add_edge(a_0, a_1, Edge::Logic)?;
2431
2432        let mut fn_graph_b = FnGraph::<u32>::new();
2433        let b_0 = fn_graph_b.add_node(0);
2434        let b_1 = fn_graph_b.add_node(1);
2435        fn_graph_b.add_edge(b_0, b_1, Edge::Logic)?;
2436
2437        assert_eq!(fn_graph_a, fn_graph_b);
2438        Ok(())
2439    }
2440
2441    #[test]
2442    fn partial_eq_returns_false_for_same_value_different_edge_direction_graphs(
2443    ) -> Result<(), WouldCycle<Edge>> {
2444        let mut fn_graph_a = FnGraph::<u32>::new();
2445        let a_0 = fn_graph_a.add_node(0);
2446        let a_1 = fn_graph_a.add_node(1);
2447        fn_graph_a.add_edge(a_0, a_1, Edge::Logic)?;
2448
2449        let mut fn_graph_b = FnGraph::<u32>::new();
2450        let b_0 = fn_graph_b.add_node(0);
2451        let b_1 = fn_graph_b.add_node(1);
2452        fn_graph_b.add_edge(b_1, b_0, Edge::Logic)?;
2453
2454        assert_ne!(fn_graph_a, fn_graph_b);
2455        Ok(())
2456    }
2457
2458    #[test]
2459    fn partial_eq_returns_false_for_same_value_different_edge_type_graphs(
2460    ) -> Result<(), WouldCycle<Edge>> {
2461        let mut fn_graph_a = FnGraph::<u32>::new();
2462        let a_0 = fn_graph_a.add_node(0);
2463        let a_1 = fn_graph_a.add_node(1);
2464        fn_graph_a.add_edge(a_0, a_1, Edge::Logic)?;
2465
2466        let mut fn_graph_b = FnGraph::<u32>::new();
2467        let b_0 = fn_graph_b.add_node(0);
2468        let b_1 = fn_graph_b.add_node(1);
2469        fn_graph_b.add_edge(b_0, b_1, Edge::Data)?;
2470
2471        assert_ne!(fn_graph_a, fn_graph_b);
2472        Ok(())
2473    }
2474
2475    #[test]
2476    fn iter_returns_fns_in_dep_order() -> Result<(), WouldCycle<Edge>> {
2477        let fn_graph = complex_graph()?;
2478
2479        let mut resources = Resources::new();
2480        resources.insert(0u8);
2481        resources.insert(0u16);
2482        let fn_iter_order = fn_graph
2483            .iter()
2484            .map(|f| f.call(&resources))
2485            .collect::<Vec<_>>();
2486
2487        assert_eq!(["f", "a", "b", "c", "d", "e"], fn_iter_order.as_slice());
2488        Ok(())
2489    }
2490
2491    #[test]
2492    fn iter_rev_returns_fns_in_dep_rev_order() -> Result<(), WouldCycle<Edge>> {
2493        let fn_graph = complex_graph()?;
2494
2495        let mut resources = Resources::new();
2496        resources.insert(0u8);
2497        resources.insert(0u16);
2498        let fn_iter_order = fn_graph
2499            .iter_rev()
2500            .map(|f| f.call(&resources))
2501            .collect::<Vec<_>>();
2502
2503        assert_eq!(["e", "d", "c", "b", "a", "f"], fn_iter_order.as_slice());
2504        Ok(())
2505    }
2506
2507    #[test]
2508    fn map_iterates_over_fns_in_dep_order() -> Result<(), WouldCycle<Edge>> {
2509        let mut fn_graph = complex_graph()?;
2510
2511        let mut resources = Resources::new();
2512        resources.insert(0u8);
2513        resources.insert(0u16);
2514        let fn_iter_order = fn_graph.map(|f| f.call(&resources)).collect::<Vec<_>>();
2515
2516        assert_eq!(["f", "a", "b", "c", "d", "e"], fn_iter_order.as_slice());
2517        Ok(())
2518    }
2519
2520    #[test]
2521    fn fold_iterates_over_fns_in_dep_order() -> Result<(), WouldCycle<Edge>> {
2522        let mut fn_graph = complex_graph()?;
2523
2524        let mut resources = Resources::new();
2525        resources.insert(0u8);
2526        resources.insert(0u16);
2527        let fn_iter_order = fn_graph.fold(
2528            Vec::with_capacity(fn_graph.node_count()),
2529            |mut fn_iter_order, f| {
2530                fn_iter_order.push(f.call(&resources));
2531                fn_iter_order
2532            },
2533        );
2534
2535        assert_eq!(["f", "a", "b", "c", "d", "e"], fn_iter_order.as_slice());
2536        Ok(())
2537    }
2538
2539    #[test]
2540    fn try_fold_iterates_over_fns_in_dep_order() -> Result<(), WouldCycle<Edge>> {
2541        let mut fn_graph = complex_graph()?;
2542
2543        let mut resources = Resources::new();
2544        resources.insert(0u8);
2545        resources.insert(0u16);
2546        let fn_iter_order = fn_graph.try_fold(
2547            Vec::with_capacity(fn_graph.node_count()),
2548            |mut fn_iter_order, f| {
2549                fn_iter_order.push(f.call(&resources));
2550                Ok(fn_iter_order)
2551            },
2552        )?;
2553
2554        assert_eq!(["f", "a", "b", "c", "d", "e"], fn_iter_order.as_slice());
2555        Ok(())
2556    }
2557
2558    #[test]
2559    fn for_each_iterates_over_fns_in_dep_order() -> Result<(), WouldCycle<Edge>> {
2560        let mut fn_graph = complex_graph()?;
2561
2562        let mut resources = Resources::new();
2563        resources.insert(0u8);
2564        resources.insert(0u16);
2565
2566        let mut fn_iter_order = Vec::with_capacity(fn_graph.node_count());
2567        fn_graph.for_each(|f| fn_iter_order.push(f.call(&resources)));
2568
2569        assert_eq!(["f", "a", "b", "c", "d", "e"], fn_iter_order.as_slice());
2570        Ok(())
2571    }
2572
2573    #[test]
2574    fn try_for_each_iterates_over_fns_in_dep_order() -> Result<(), WouldCycle<Edge>> {
2575        let mut fn_graph = complex_graph()?;
2576
2577        let mut resources = Resources::new();
2578        resources.insert(0u8);
2579        resources.insert(0u16);
2580
2581        let mut fn_iter_order = Vec::with_capacity(fn_graph.node_count());
2582        fn_graph.try_for_each(|f| {
2583            fn_iter_order.push(f.call(&resources));
2584            Ok(())
2585        })?;
2586
2587        assert_eq!(["f", "a", "b", "c", "d", "e"], fn_iter_order.as_slice());
2588        Ok(())
2589    }
2590
2591    #[test]
2592    fn iter_insertion_returns_fns() {
2593        let resources = Resources::new();
2594        let fn_graph = {
2595            let mut fn_graph_builder = FnGraphBuilder::new();
2596            fn_graph_builder.add_fn((|| 1).into_fn_res());
2597            fn_graph_builder.add_fn((|| 2).into_fn_res());
2598            fn_graph_builder.build()
2599        };
2600
2601        let call_fn = |f: &dyn FnRes<Ret = u32>| f.call(&resources);
2602        let mut fn_iter = fn_graph.iter_insertion();
2603        assert_eq!(Some(1), fn_iter.next().map(|f| call_fn(f.as_ref())));
2604        assert_eq!(Some(2), fn_iter.next().map(|f| call_fn(f.as_ref())));
2605        assert_eq!(None, fn_iter.next().map(|f| call_fn(f.as_ref())));
2606    }
2607
2608    #[test]
2609    fn iter_insertion_mut_returns_fns() {
2610        let mut resources = Resources::new();
2611        resources.insert(0usize);
2612        resources.insert(0u32);
2613        let mut fn_graph = {
2614            let mut fn_graph_builder = FnGraphBuilder::new();
2615            fn_graph_builder.add_fn(
2616                (|a: &mut usize| {
2617                    *a += 1;
2618                    *a as u32
2619                })
2620                .into_fn_res(),
2621            );
2622            fn_graph_builder.add_fn(
2623                (|b: &mut u32| {
2624                    *b += 2;
2625                    *b
2626                })
2627                .into_fn_res(),
2628            );
2629            fn_graph_builder.build()
2630        };
2631
2632        let call_fn = |f: &mut Box<dyn FnRes<Ret = u32>>| f.call(&resources);
2633        let mut fn_iter = fn_graph.iter_insertion_mut();
2634        assert_eq!(Some(1), fn_iter.next().map(call_fn));
2635        assert_eq!(Some(2), fn_iter.next().map(call_fn));
2636        assert_eq!(None, fn_iter.next().map(call_fn));
2637
2638        let mut fn_iter = fn_graph.iter_insertion_mut();
2639        assert_eq!(Some(2), fn_iter.next().map(call_fn));
2640        assert_eq!(Some(4), fn_iter.next().map(call_fn));
2641        assert_eq!(None, fn_iter.next().map(call_fn));
2642    }
2643
2644    #[test]
2645    fn iter_with_indices_returns_fns_with_indices() {
2646        let resources = Resources::new();
2647        let fn_graph = {
2648            let mut fn_graph_builder = FnGraphBuilder::new();
2649            fn_graph_builder.add_fn((|| 1u32).into_fn_res());
2650            fn_graph_builder.add_fn((|| 2u32).into_fn_res());
2651            fn_graph_builder.build()
2652        };
2653
2654        let call_fn = |(fn_id, f): (FnId, &dyn FnRes<Ret = u32>)| (fn_id, f.call(&resources));
2655        let mut fn_iter = fn_graph.iter_insertion_with_indices();
2656
2657        assert_eq!(
2658            Some((FnId::new(0), 1)),
2659            fn_iter.next().map(|(id, f)| call_fn((id, f.as_ref())))
2660        );
2661        assert_eq!(
2662            Some((FnId::new(1), 2)),
2663            fn_iter.next().map(|(id, f)| call_fn((id, f.as_ref())))
2664        );
2665        assert_eq!(
2666            None,
2667            fn_iter.next().map(|(id, f)| call_fn((id, f.as_ref())))
2668        );
2669    }
2670
2671    #[cfg(feature = "async")]
2672    mod async_tests {
2673        use std::{fmt, ops::ControlFlow};
2674
2675        use daggy::{NodeIndex, WouldCycle};
2676        use futures::{future::BoxFuture, stream, Future, FutureExt, StreamExt};
2677        use resman::{FnRes, FnResMut, IntoFnRes, IntoFnResMut, Resources};
2678        use tokio::{
2679            sync::mpsc::{self, error::TryRecvError, Receiver},
2680            time::{self, Duration, Instant},
2681        };
2682
2683        use crate::{
2684            Edge, FnGraph, FnGraphBuilder, FnIdInner, StreamOpts, StreamOutcome, StreamOutcomeState,
2685        };
2686
        /// Expands to a 50 millisecond `Duration`.
        // NOTE(review): presumably the per-function sleep used by the graph
        // helpers further down in this module -- confirm against its callers.
        macro_rules! sleep_duration {
            () => {
                Duration::from_millis(50)
            };
        }
2692
2693        #[tokio::test]
2694        async fn stream_returns_when_graph_is_empty() {
2695            let fn_graph = FnGraph::<Box<dyn FnRes<Ret = ()>>>::new();
2696
2697            fn_graph
2698                .stream()
2699                .for_each(
2700                    #[cfg_attr(coverage_nightly, coverage(off))]
2701                    |_f| async {},
2702                )
2703                .await;
2704        }
2705
2706        #[tokio::test]
2707        async fn stream_returns_fns_in_dep_order_concurrently(
2708        ) -> Result<(), Box<dyn std::error::Error>> {
2709            let (fn_graph, mut seq_rx) = complex_graph_unit()?;
2710
2711            let mut resources = Resources::new();
2712            resources.insert(0u8);
2713            resources.insert(0u16);
2714            let resources = &resources;
2715            test_timeout(
2716                Duration::from_millis(200),
2717                Duration::from_millis(265),
2718                fn_graph.stream().for_each_concurrent(None, |f| async move {
2719                    let _ = f.call(resources).await;
2720                }),
2721            )
2722            .await;
2723
2724            let fn_iter_order = stream::poll_fn(|context| seq_rx.poll_recv(context))
2725                .take(fn_graph.node_count())
2726                .collect::<Vec<&'static str>>()
2727                .await;
2728
2729            assert_eq!(["f", "a", "c", "b", "d", "e"], fn_iter_order.as_slice());
2730            Ok(())
2731        }
2732
2733        #[tokio::test]
2734        async fn stream_with_rev_returns_when_graph_is_empty() {
2735            let fn_graph = FnGraph::<Box<dyn FnRes<Ret = ()>>>::new();
2736
2737            fn_graph
2738                .stream_with(StreamOpts::new().rev())
2739                .for_each(
2740                    #[cfg_attr(coverage_nightly, coverage(off))]
2741                    |_f| async {},
2742                )
2743                .await;
2744        }
2745
2746        #[tokio::test]
2747        async fn stream_with_rev_returns_fns_in_dep_rev_order_concurrently(
2748        ) -> Result<(), Box<dyn std::error::Error>> {
2749            let (fn_graph, mut seq_rx) = complex_graph_unit()?;
2750
2751            let mut resources = Resources::new();
2752            resources.insert(0u8);
2753            resources.insert(0u16);
2754            let resources = &resources;
2755            test_timeout(
2756                Duration::from_millis(200),
2757                Duration::from_millis(265),
2758                fn_graph
2759                    .stream_with(StreamOpts::new().rev())
2760                    .for_each_concurrent(None, |f| async move {
2761                        let _ = f.call(resources).await;
2762                    }),
2763            )
2764            .await;
2765
2766            let fn_iter_order = stream::poll_fn(|context| seq_rx.poll_recv(context))
2767                .take(fn_graph.node_count())
2768                .collect::<Vec<&'static str>>()
2769                .await;
2770
2771            assert_eq!(["e", "d", "b", "c", "f", "a"], fn_iter_order.as_slice());
2772            Ok(())
2773        }
2774
2775        #[cfg(feature = "interruptible")]
2776        #[tokio::test]
2777        async fn stream_with_interruptible_rev_returns_fns_in_dep_rev_order_concurrently(
2778        ) -> Result<(), Box<dyn std::error::Error>> {
2779            let (fn_graph, mut seq_rx) = complex_graph_unit()?;
2780
2781            let mut resources = Resources::new();
2782            resources.insert(0u8);
2783            resources.insert(0u16);
2784            let resources = &resources;
2785            test_timeout(
2786                Duration::from_millis(200),
2787                Duration::from_millis(265),
2788                fn_graph
2789                    .stream_with_interruptible(StreamOpts::new().rev())
2790                    .for_each_concurrent(None, |f| async move {
2791                        use interruptible::PollOutcome;
2792
2793                        match f {
2794                            PollOutcome::Interrupted(None) => {}
2795                            PollOutcome::Interrupted(Some(f)) | PollOutcome::NoInterrupt(f) => {
2796                                let _ = f.call(resources).await;
2797                            }
2798                        }
2799                    }),
2800            )
2801            .await;
2802
2803            let fn_iter_order = stream::poll_fn(|context| seq_rx.poll_recv(context))
2804                .take(fn_graph.node_count())
2805                .collect::<Vec<&'static str>>()
2806                .await;
2807
2808            assert_eq!(["e", "d", "b", "c", "f", "a"], fn_iter_order.as_slice());
2809            Ok(())
2810        }
2811
2812        #[tokio::test]
2813        async fn fold_async_returns_when_graph_is_empty() {
2814            let fn_graph = FnGraph::<Box<dyn FnRes<Ret = ()>>>::new();
2815
2816            fn_graph
2817                .fold_async(
2818                    (),
2819                    #[cfg_attr(coverage_nightly, coverage(off))]
2820                    |(), _f| async {}.boxed_local(),
2821                )
2822                .await;
2823        }
2824
2825        #[tokio::test]
2826        async fn fold_async_runs_fns_in_dep_order() -> Result<(), Box<dyn std::error::Error>> {
2827            let (fn_graph, mut seq_rx) = complex_graph_unit()?;
2828
2829            let mut resources = Resources::new();
2830            resources.insert(0u8);
2831            resources.insert(0u16);
2832            test_timeout(
2833                Duration::from_millis(200),
2834                Duration::from_millis(385), fn_graph.fold_async(resources, |resources, f| {
2836                    async move {
2837                        f.call(&resources).await;
2838                        resources
2839                    }
2840                    .boxed_local()
2841                }),
2842            )
2843            .await;
2844
2845            let fn_iter_order = stream::poll_fn(|context| seq_rx.poll_recv(context))
2846                .take(6)
2847                .collect::<Vec<&'static str>>()
2848                .await;
2849
2850            assert_eq!(["f", "a", "c", "b", "d", "e"], fn_iter_order.as_slice());
2851            Ok(())
2852        }
2853
2854        #[tokio::test]
2855        async fn fold_async_with_returns_when_graph_is_empty() {
2856            let fn_graph = FnGraph::<Box<dyn FnRes<Ret = ()>>>::new();
2857
2858            fn_graph
2859                .fold_async_with(
2860                    (),
2861                    StreamOpts::new().rev(),
2862                    #[cfg_attr(coverage_nightly, coverage(off))]
2863                    |(), _f| Box::pin(async {}),
2864                )
2865                .await;
2866        }
2867
2868        #[tokio::test]
2869        async fn fold_async_with_runs_fns_in_dep_rev_order(
2870        ) -> Result<(), Box<dyn std::error::Error>> {
2871            let (fn_graph, mut seq_rx) = complex_graph_unit()?;
2872
2873            let mut resources = Resources::new();
2874            resources.insert(0u8);
2875            resources.insert(0u16);
2876            test_timeout(
2877                Duration::from_millis(200),
2878                Duration::from_millis(385), fn_graph.fold_async_with(resources, StreamOpts::new().rev(), |resources, f| {
2880                    async move {
2881                        f.call(&resources).await;
2882                        resources
2883                    }
2884                    .boxed_local()
2885                }),
2886            )
2887            .await;
2888
2889            let fn_iter_order = stream::poll_fn(|context| seq_rx.poll_recv(context))
2890                .take(6)
2891                .collect::<Vec<&'static str>>()
2892                .await;
2893
2894            assert_eq!(["e", "d", "b", "c", "f", "a"], fn_iter_order.as_slice());
2895            Ok(())
2896        }
2897
2898        #[tokio::test]
2899        async fn fold_async_mut_returns_when_graph_is_empty() {
2900            let mut fn_graph = FnGraph::<Box<dyn FnRes<Ret = ()>>>::new();
2901
2902            fn_graph
2903                .fold_async_mut(
2904                    (),
2905                    #[cfg_attr(coverage_nightly, coverage(off))]
2906                    |(), _f| async {}.boxed_local(),
2907                )
2908                .await;
2909        }
2910
2911        #[tokio::test]
2912        async fn fold_async_mut_runs_fns_in_dep_order() -> Result<(), Box<dyn std::error::Error>> {
2913            let (mut fn_graph, mut seq_rx) = complex_graph_unit_mut()?;
2914
2915            let mut resources = Resources::new();
2916            resources.insert(0u8);
2917            resources.insert(0u16);
2918            test_timeout(
2919                Duration::from_millis(200),
2920                Duration::from_millis(385), fn_graph.fold_async_mut(resources, |resources, mut f| {
2922                    async move {
2923                        f.call_mut(&resources).await;
2924                        resources
2925                    }
2926                    .boxed_local()
2927                }),
2928            )
2929            .await;
2930
2931            let fn_iter_order = stream::poll_fn(|context| seq_rx.poll_recv(context))
2932                .collect::<Vec<&'static str>>()
2933                .await;
2934
2935            assert_eq!(["f", "a", "c", "b", "d", "e"], fn_iter_order.as_slice());
2936            Ok(())
2937        }
2938
2939        #[tokio::test]
2940        async fn fold_async_mut_with_returns_when_graph_is_empty() {
2941            let mut fn_graph = FnGraph::<Box<dyn FnRes<Ret = ()>>>::new();
2942
2943            fn_graph
2944                .fold_async_mut_with(
2945                    (),
2946                    StreamOpts::new().rev(),
2947                    #[cfg_attr(coverage_nightly, coverage(off))]
2948                    |(), _f| async {}.boxed_local(),
2949                )
2950                .await;
2951        }
2952
2953        #[tokio::test]
2954        async fn fold_async_mut_with_runs_fns_in_dep_rev_order(
2955        ) -> Result<(), Box<dyn std::error::Error>> {
2956            let (mut fn_graph, mut seq_rx) = complex_graph_unit_mut()?;
2957
2958            let mut resources = Resources::new();
2959            resources.insert(0u8);
2960            resources.insert(0u16);
2961            test_timeout(
2962                Duration::from_millis(200),
2963                Duration::from_millis(385), fn_graph.fold_async_mut_with(
2965                    resources,
2966                    StreamOpts::new().rev(),
2967                    |resources, mut f| {
2968                        async move {
2969                            f.call_mut(&resources).await;
2970                            resources
2971                        }
2972                        .boxed_local()
2973                    },
2974                ),
2975            )
2976            .await;
2977
2978            let fn_iter_order = stream::poll_fn(|context| seq_rx.poll_recv(context))
2979                .collect::<Vec<&'static str>>()
2980                .await;
2981
2982            assert_eq!(["e", "d", "b", "c", "f", "a"], fn_iter_order.as_slice());
2983            Ok(())
2984        }
2985
2986        #[tokio::test]
2987        async fn for_each_concurrent_returns_when_graph_is_empty() {
2988            let fn_graph = FnGraph::<Box<dyn FnRes<Ret = ()>>>::new();
2989
2990            fn_graph
2991                .for_each_concurrent(
2992                    None,
2993                    #[cfg_attr(coverage_nightly, coverage(off))]
2994                    |_f| async {},
2995                )
2996                .await;
2997        }
2998
2999        #[tokio::test]
3000        async fn for_each_concurrent_runs_fns_concurrently(
3001        ) -> Result<(), Box<dyn std::error::Error>> {
3002            let (fn_graph, mut seq_rx) = complex_graph_unit()?;
3003
3004            let mut resources = Resources::new();
3005            resources.insert(0u8);
3006            resources.insert(0u16);
3007            let resources = &resources;
3008            test_timeout(
3009                Duration::from_millis(200),
3010                Duration::from_millis(265),
3011                fn_graph.for_each_concurrent(None, |f| {
3012                    let fut = f.call(resources);
3013                    async move {
3014                        let _ = fut.await;
3015                    }
3016                }),
3017            )
3018            .await;
3019
3020            seq_rx.close();
3021            let fn_iter_order = stream::poll_fn(|context| seq_rx.poll_recv(context))
3022                .collect::<Vec<&'static str>>()
3023                .await;
3024
3025            assert_eq!(["f", "a", "c", "b", "d", "e"], fn_iter_order.as_slice());
3026            Ok(())
3027        }
3028
3029        #[tokio::test]
3030        async fn for_each_concurrent_with_returns_when_graph_is_empty() {
3031            let fn_graph = FnGraph::<Box<dyn FnRes<Ret = ()>>>::new();
3032
3033            fn_graph
3034                .for_each_concurrent_with(
3035                    None,
3036                    StreamOpts::new().rev(),
3037                    #[cfg_attr(coverage_nightly, coverage(off))]
3038                    |_f| async {},
3039                )
3040                .await;
3041        }
3042
3043        #[tokio::test]
3044        async fn for_each_concurrent_with_runs_fns_concurrently(
3045        ) -> Result<(), Box<dyn std::error::Error>> {
3046            let (fn_graph, mut seq_rx) = complex_graph_unit()?;
3047
3048            let mut resources = Resources::new();
3049            resources.insert(0u8);
3050            resources.insert(0u16);
3051            let resources = &resources;
3052            test_timeout(
3053                Duration::from_millis(200),
3054                Duration::from_millis(265),
3055                fn_graph.for_each_concurrent_with(None, StreamOpts::new().rev(), |f| {
3056                    let fut = f.call(resources);
3057                    async move {
3058                        let _ = fut.await;
3059                    }
3060                }),
3061            )
3062            .await;
3063
3064            seq_rx.close();
3065            let fn_iter_order = stream::poll_fn(|context| seq_rx.poll_recv(context))
3066                .collect::<Vec<&'static str>>()
3067                .await;
3068
3069            assert_eq!(["e", "d", "b", "c", "f", "a"], fn_iter_order.as_slice());
3070            Ok(())
3071        }
3072
3073        #[tokio::test]
3074        #[cfg(feature = "interruptible")]
3075        async fn for_each_concurrent_with_interrupt_returns_fn_ids_not_processed(
3076        ) -> Result<(), Box<dyn std::error::Error>> {
3077            use interruptible::{InterruptSignal, Interruptibility};
3078
3079            let (fn_graph, mut seq_rx) = complex_graph_unit()?;
3080
3081            let mut resources = Resources::new();
3082            resources.insert(0u8);
3083            resources.insert(0u16);
3084            let resources = &resources;
3085
3086            let (interrupt_tx, interrupt_rx) = mpsc::channel::<InterruptSignal>(16);
3087            interrupt_tx
3088                .send(InterruptSignal)
3089                .await
3090                .expect("Expected `InterruptSignal` to be sent successfully.");
3091            let stream_outcome = test_timeout(
3092                Duration::from_millis(100),
3093                Duration::from_millis(155),
3094                fn_graph.for_each_concurrent_with(
3095                    None,
3096                    StreamOpts::new().interruptibility_state(
3097                        Interruptibility::poll_next_n(interrupt_rx.into(), 4).into(),
3098                    ),
3099                    |f| {
3100                        let fut = f.call(resources);
3101                        async move {
3102                            let _ = fut.await;
3103                        }
3104                    },
3105                ),
3106            )
3107            .await;
3108
3109            let fn_ids_processed = [5, 0, 2, 1] .into_iter()
3111                .map(NodeIndex::new)
3112                .collect::<Vec<NodeIndex<FnIdInner>>>();
3113
3114            let fn_ids_not_processed = [3, 4] .into_iter()
3117                .map(NodeIndex::new)
3118                .collect::<Vec<NodeIndex<FnIdInner>>>();
3119            assert_eq!(
3120                StreamOutcome {
3121                    value: (),
3122                    state: StreamOutcomeState::Interrupted,
3123                    fn_ids_processed,
3124                    fn_ids_not_processed,
3125                },
3126                stream_outcome
3127            );
3128
3129            seq_rx.close();
3130            let fn_iter_order = stream::poll_fn(|context| seq_rx.poll_recv(context))
3131                .collect::<Vec<&'static str>>()
3132                .await;
3133
3134            assert_eq!(["f", "a", "c", "b"], fn_iter_order.as_slice());
3141
3142            Ok(())
3143        }
3144
3145        #[tokio::test]
3146        #[cfg(feature = "interruptible")]
3147        async fn for_each_concurrent_with_interrupt_finish_current_also_interrupts_preloaded_fns(
3148        ) -> Result<(), Box<dyn std::error::Error>> {
3149            use interruptible::{InterruptSignal, InterruptibilityState};
3150
3151            let (fn_graph, mut seq_rx) = complex_graph_unit()?;
3152
3153            let mut resources = Resources::new();
3154            resources.insert(0u8);
3155            resources.insert(0u16);
3156            let resources = &resources;
3157
3158            let (interrupt_tx, interrupt_rx) = mpsc::channel::<InterruptSignal>(16);
3159            interrupt_tx
3160                .send(InterruptSignal)
3161                .await
3162                .expect("Expected `InterruptSignal` to be sent successfully.");
3163            let stream_outcome = test_timeout(
3164                Duration::from_millis(0),
3165                Duration::from_millis(25),
3166                fn_graph.for_each_concurrent_with(
3167                    None,
3168                    StreamOpts::new().interruptibility_state(
3169                        InterruptibilityState::new_finish_current(interrupt_rx.into()),
3170                    ),
3171                    |f| {
3172                        let fut = f.call(resources);
3173                        async move {
3174                            let _ = fut.await;
3175                        }
3176                    },
3177                ),
3178            )
3179            .await;
3180
3181            let fn_ids_processed = []
3183                .into_iter()
3184                .map(NodeIndex::new)
3185                .collect::<Vec<NodeIndex<FnIdInner>>>();
3186
3187            let fn_ids_not_processed = [0, 1, 2, 3, 4, 5] .into_iter()
3190                .map(NodeIndex::new)
3191                .collect::<Vec<NodeIndex<FnIdInner>>>();
3192            assert_eq!(
3193                StreamOutcome {
3194                    value: (),
3195                    state: StreamOutcomeState::Interrupted,
3196                    fn_ids_processed,
3197                    fn_ids_not_processed,
3198                },
3199                stream_outcome
3200            );
3201
3202            seq_rx.close();
3203            let fn_iter_order = stream::poll_fn(|context| seq_rx.poll_recv(context))
3204                .collect::<Vec<&'static str>>()
3205                .await;
3206
3207            assert_eq!(<[&str; 0]>::default(), fn_iter_order.as_slice());
3208
3209            Ok(())
3210        }
3211
3212        #[tokio::test]
3213        #[cfg(feature = "interruptible")]
3214        async fn for_each_concurrent_with_interrupt_finish_current_with_sleep(
3215        ) -> Result<(), Box<dyn std::error::Error>> {
3216            use interruptible::{InterruptSignal, InterruptibilityState};
3217
3218            let (fn_graph, mut seq_rx) = complex_graph_unit()?;
3219
3220            let mut resources = Resources::new();
3221            resources.insert(0u8);
3222            resources.insert(0u16);
3223            let resources = &resources;
3224
3225            let (interrupt_tx, interrupt_rx) = mpsc::channel::<InterruptSignal>(16);
3226
3227            let interrupt_after_delay = async move {
3228                tokio::time::sleep(sleep_duration!() / 2).await;
3229                interrupt_tx
3230                    .send(InterruptSignal)
3231                    .await
3232                    .expect("Expected `InterruptSignal` to be sent successfully.");
3233            };
3234            let stream_outcome_task = async move {
3235                test_timeout(
3236                    Duration::from_millis(50),
3237                    Duration::from_millis(130),
3238                    fn_graph.for_each_concurrent_with(
3239                        None,
3240                        StreamOpts::new().interruptibility_state(
3241                            InterruptibilityState::new_finish_current(interrupt_rx.into()),
3242                        ),
3243                        |f| {
3244                            let fut = f.call(resources);
3245                            async move {
3246                                let _ = fut.await;
3247                            }
3248                        },
3249                    ),
3250                )
3251                .await
3252            };
3253            let (stream_outcome, ()) = tokio::join!(stream_outcome_task, interrupt_after_delay);
3254
3255            let fn_ids_processed = [5, 0, 2] .into_iter()
3258                .map(NodeIndex::new)
3259                .collect::<Vec<NodeIndex<FnIdInner>>>();
3260
3261            let fn_ids_not_processed = [1, 3, 4] .into_iter()
3264                .map(NodeIndex::new)
3265                .collect::<Vec<NodeIndex<FnIdInner>>>();
3266            assert_eq!(
3267                StreamOutcome {
3268                    value: (),
3269                    state: StreamOutcomeState::Interrupted,
3270                    fn_ids_processed,
3271                    fn_ids_not_processed,
3272                },
3273                stream_outcome
3274            );
3275
3276            seq_rx.close();
3277            let fn_iter_order = stream::poll_fn(|context| seq_rx.poll_recv(context))
3278                .collect::<Vec<&'static str>>()
3279                .await;
3280
3281            assert_eq!(["f", "a", "c"], fn_iter_order.as_slice());
3282
3283            Ok(())
3284        }
3285
3286        #[tokio::test]
3287        #[cfg(feature = "interruptible")]
3288        async fn for_each_concurrent_with_interrupt_finish_current_with_sleep_include_interrupted_false(
3289        ) -> Result<(), Box<dyn std::error::Error>> {
3290            use interruptible::{InterruptSignal, InterruptibilityState};
3291
3292            let (fn_graph, mut seq_rx) = complex_graph_unit()?;
3293
3294            let mut resources = Resources::new();
3295            resources.insert(0u8);
3296            resources.insert(0u16);
3297            let resources = &resources;
3298
3299            let (interrupt_tx, interrupt_rx) = mpsc::channel::<InterruptSignal>(16);
3300
3301            let interrupt_after_delay = async move {
3302                tokio::time::sleep(sleep_duration!() / 2).await;
3303                interrupt_tx
3304                    .send(InterruptSignal)
3305                    .await
3306                    .expect("Expected `InterruptSignal` to be sent successfully.");
3307            };
3308            let stream_outcome_task = async move {
3309                test_timeout(
3310                    Duration::from_millis(50),
3311                    Duration::from_millis(75),
3312                    fn_graph.for_each_concurrent_with(
3313                        None,
3314                        StreamOpts::new()
3315                            .interruptibility_state(InterruptibilityState::new_finish_current(
3316                                interrupt_rx.into(),
3317                            ))
3318                            .interrupted_next_item_include(false),
3319                        |f| {
3320                            let fut = f.call(resources);
3321                            async move {
3322                                let _ = fut.await;
3323                            }
3324                        },
3325                    ),
3326                )
3327                .await
3328            };
3329            let (stream_outcome, ()) = tokio::join!(stream_outcome_task, interrupt_after_delay);
3330
3331            let fn_ids_processed = [5, 0] .into_iter()
3334                .map(NodeIndex::new)
3335                .collect::<Vec<NodeIndex<FnIdInner>>>();
3336
3337            let fn_ids_not_processed = [1, 2, 3, 4] .into_iter()
3340                .map(NodeIndex::new)
3341                .collect::<Vec<NodeIndex<FnIdInner>>>();
3342            assert_eq!(
3343                StreamOutcome {
3344                    value: (),
3345                    state: StreamOutcomeState::Interrupted,
3346                    fn_ids_processed,
3347                    fn_ids_not_processed,
3348                },
3349                stream_outcome
3350            );
3351
3352            seq_rx.close();
3353            let fn_iter_order = stream::poll_fn(|context| seq_rx.poll_recv(context))
3354                .collect::<Vec<&'static str>>()
3355                .await;
3356
3357            assert_eq!(["f", "a"], fn_iter_order.as_slice());
3358
3359            Ok(())
3360        }
3361
3362        #[tokio::test]
3363        #[cfg(feature = "interruptible")]
3364        async fn for_each_concurrent_with_interrupt_poll_next_n_also_interrupts_preloaded_fns(
3365        ) -> Result<(), Box<dyn std::error::Error>> {
3366            use interruptible::{InterruptSignal, InterruptibilityState};
3367
3368            let (fn_graph, mut seq_rx) = complex_graph_unit()?;
3369
3370            let mut resources = Resources::new();
3371            resources.insert(0u8);
3372            resources.insert(0u16);
3373            let resources = &resources;
3374
3375            let (interrupt_tx, interrupt_rx) = mpsc::channel::<InterruptSignal>(16);
3376            interrupt_tx
3377                .send(InterruptSignal)
3378                .await
3379                .expect("Expected `InterruptSignal` to be sent successfully.");
3380            let stream_outcome = test_timeout(
3381                Duration::from_millis(50),
3382                Duration::from_millis(75),
3383                fn_graph.for_each_concurrent_with(
3384                    None,
3385                    StreamOpts::new().interruptibility_state(
3386                        InterruptibilityState::new_poll_next_n(interrupt_rx.into(), 2),
3387                    ),
3388                    |f| {
3389                        let fut = f.call(resources);
3390                        async move {
3391                            let _ = fut.await;
3392                        }
3393                    },
3394                ),
3395            )
3396            .await;
3397
3398            let fn_ids_processed = [5, 0] .into_iter()
3401                .map(NodeIndex::new)
3402                .collect::<Vec<NodeIndex<FnIdInner>>>();
3403
3404            let fn_ids_not_processed = [1, 2, 3, 4] .into_iter()
3407                .map(NodeIndex::new)
3408                .collect::<Vec<NodeIndex<FnIdInner>>>();
3409            assert_eq!(
3410                StreamOutcome {
3411                    value: (),
3412                    state: StreamOutcomeState::Interrupted,
3413                    fn_ids_processed,
3414                    fn_ids_not_processed,
3415                },
3416                stream_outcome
3417            );
3418
3419            seq_rx.close();
3420            let fn_iter_order = stream::poll_fn(|context| seq_rx.poll_recv(context))
3421                .collect::<Vec<&'static str>>()
3422                .await;
3423
3424            assert_eq!(["f", "a"], fn_iter_order.as_slice());
3425
3426            Ok(())
3427        }
3428
3429        #[tokio::test]
3430        async fn try_for_each_concurrent_returns_when_graph_is_empty() -> Result<(), ()> {
3431            let fn_graph = FnGraph::<Box<dyn FnRes<Ret = ()>>>::new();
3432
3433            let stream_outcome = fn_graph
3434                .try_for_each_concurrent(
3435                    None,
3436                    #[cfg_attr(coverage_nightly, coverage(off))]
3437                    |_f| async { Ok::<_, ()>(()) },
3438                )
3439                .await
3440                .map_err(|_| ())?;
3441
3442            assert_eq!(
3443                StreamOutcome {
3444                    value: (),
3445                    state: StreamOutcomeState::Finished,
3446                    fn_ids_processed: Vec::new(),
3447                    fn_ids_not_processed: Vec::new(),
3448                },
3449                stream_outcome
3450            );
3451
3452            Ok(())
3453        }
3454
3455        #[tokio::test]
3456        async fn try_fold_async_returns_when_graph_is_empty() -> Result<(), TestError> {
3457            let fn_graph = FnGraph::<Box<dyn FnRes<Ret = ()>>>::new();
3458
3459            fn_graph
3460                .try_fold_async(
3461                    (),
3462                    #[cfg_attr(coverage_nightly, coverage(off))]
3463                    |(), _f| async { Result::<_, TestError>::Ok(()) }.boxed_local(),
3464                )
3465                .await?;
3466
3467            Ok(())
3468        }
3469
3470        #[tokio::test]
3471        async fn try_fold_async_runs_fns_in_dep_order() -> Result<(), Box<dyn std::error::Error>> {
3472            let (fn_graph, mut seq_rx) = complex_graph_unit()?;
3473
3474            let mut resources = Resources::new();
3475            resources.insert(0u8);
3476            resources.insert(0u16);
3477            test_timeout(
3478                Duration::from_millis(200),
3479                Duration::from_millis(385), fn_graph.try_fold_async(resources, |resources, f| {
3481                    async move {
3482                        f.call(&resources).await;
3483                        Result::<_, TestError>::Ok(resources)
3484                    }
3485                    .boxed_local()
3486                }),
3487            )
3488            .await?;
3489
3490            let fn_iter_order = stream::poll_fn(|context| seq_rx.poll_recv(context))
3491                .take(6)
3492                .collect::<Vec<&'static str>>()
3493                .await;
3494
3495            assert_eq!(["f", "a", "c", "b", "d", "e"], fn_iter_order.as_slice());
3496            Ok(())
3497        }
3498
3499        #[tokio::test]
3500        async fn try_fold_async_with_returns_when_graph_is_empty() -> Result<(), TestError> {
3501            let fn_graph = FnGraph::<Box<dyn FnRes<Ret = ()>>>::new();
3502
3503            fn_graph
3504                .try_fold_async_with(
3505                    (),
3506                    StreamOpts::new().rev(),
3507                    #[cfg_attr(coverage_nightly, coverage(off))]
3508                    |(), _f| async { Result::<_, TestError>::Ok(()) }.boxed_local(),
3509                )
3510                .await?;
3511
3512            Ok(())
3513        }
3514
3515        #[tokio::test]
3516        async fn try_fold_async_with_runs_fns_in_dep_rev_order(
3517        ) -> Result<(), Box<dyn std::error::Error>> {
3518            let (fn_graph, mut seq_rx) = complex_graph_unit()?;
3519
3520            let mut resources = Resources::new();
3521            resources.insert(0u8);
3522            resources.insert(0u16);
3523            test_timeout(
3524                Duration::from_millis(200),
3525                Duration::from_millis(385), fn_graph.try_fold_async_with(resources, StreamOpts::new().rev(), |resources, f| {
3527                    async move {
3528                        f.call(&resources).await;
3529                        Result::<_, TestError>::Ok(resources)
3530                    }
3531                    .boxed_local()
3532                }),
3533            )
3534            .await?;
3535
3536            let fn_iter_order = stream::poll_fn(|context| seq_rx.poll_recv(context))
3537                .take(6)
3538                .collect::<Vec<&'static str>>()
3539                .await;
3540
3541            assert_eq!(["e", "d", "b", "c", "f", "a"], fn_iter_order.as_slice());
3542            Ok(())
3543        }
3544
3545        #[tokio::test]
3546        async fn try_fold_async_mut_returns_when_graph_is_empty() -> Result<(), TestError> {
3547            let mut fn_graph = FnGraph::<Box<dyn FnRes<Ret = ()>>>::new();
3548
3549            fn_graph
3550                .try_fold_async_mut(
3551                    (),
3552                    #[cfg_attr(coverage_nightly, coverage(off))]
3553                    |(), _f| async { Result::<_, TestError>::Ok(()) }.boxed_local(),
3554                )
3555                .await?;
3556
3557            Ok(())
3558        }
3559
3560        #[tokio::test]
3561        async fn try_fold_async_mut_runs_fns_in_dep_order() -> Result<(), Box<dyn std::error::Error>>
3562        {
3563            let (mut fn_graph, mut seq_rx) = complex_graph_unit_mut()?;
3564
3565            let mut resources = Resources::new();
3566            resources.insert(0u8);
3567            resources.insert(0u16);
3568            test_timeout(
3569                Duration::from_millis(200),
3570                Duration::from_millis(385), fn_graph.try_fold_async_mut(resources, |resources, mut f| {
3572                    async move {
3573                        f.call_mut(&resources).await;
3574                        Result::<_, TestError>::Ok(resources)
3575                    }
3576                    .boxed_local()
3577                }),
3578            )
3579            .await?;
3580
3581            let fn_iter_order = stream::poll_fn(|context| seq_rx.poll_recv(context))
3582                .collect::<Vec<&'static str>>()
3583                .await;
3584
3585            assert_eq!(["f", "a", "c", "b", "d", "e"], fn_iter_order.as_slice());
3586            Ok(())
3587        }
3588
3589        #[tokio::test]
3590        async fn try_fold_async_mut_with_returns_when_graph_is_empty() -> Result<(), TestError> {
3591            let mut fn_graph = FnGraph::<Box<dyn FnRes<Ret = ()>>>::new();
3592
3593            fn_graph
3594                .try_fold_async_mut_with(
3595                    (),
3596                    StreamOpts::new().rev(),
3597                    #[cfg_attr(coverage_nightly, coverage(off))]
3598                    |(), _f| async { Result::<_, TestError>::Ok(()) }.boxed_local(),
3599                )
3600                .await?;
3601
3602            Ok(())
3603        }
3604
3605        #[tokio::test]
3606        async fn try_fold_async_mut_with_runs_fns_in_dep_rev_order(
3607        ) -> Result<(), Box<dyn std::error::Error>> {
3608            let (mut fn_graph, mut seq_rx) = complex_graph_unit_mut()?;
3609
3610            let mut resources = Resources::new();
3611            resources.insert(0u8);
3612            resources.insert(0u16);
3613            test_timeout(
3614                Duration::from_millis(200),
3615                Duration::from_millis(385), fn_graph.try_fold_async_mut_with(
3617                    resources,
3618                    StreamOpts::new().rev(),
3619                    |resources, mut f| {
3620                        async move {
3621                            f.call_mut(&resources).await;
3622                            Result::<_, TestError>::Ok(resources)
3623                        }
3624                        .boxed_local()
3625                    },
3626                ),
3627            )
3628            .await?;
3629
3630            let fn_iter_order = stream::poll_fn(|context| seq_rx.poll_recv(context))
3631                .collect::<Vec<&'static str>>()
3632                .await;
3633
3634            assert_eq!(["e", "d", "b", "c", "f", "a"], fn_iter_order.as_slice());
3635            Ok(())
3636        }
3637
3638        #[tokio::test]
3639        async fn try_for_each_concurrent_runs_fns_concurrently(
3640        ) -> Result<(), Box<dyn std::error::Error>> {
3641            let (fn_graph, mut seq_rx) = complex_graph_unit()?;
3642
3643            let mut resources = Resources::new();
3644            resources.insert(0u8);
3645            resources.insert(0u16);
3646            let resources = &resources;
3647
3648            test_timeout(
3649                Duration::from_millis(200),
3650                Duration::from_millis(265),
3651                fn_graph.try_for_each_concurrent(None, |f| {
3652                    let fut = f.call(resources);
3653                    async move {
3654                        let _ = fut.await;
3655                        Result::<_, TestError>::Ok(())
3656                    }
3657                }),
3658            )
3659            .await
3660            .unwrap();
3661
3662            seq_rx.close();
3663            let fn_iter_order = stream::poll_fn(|context| seq_rx.poll_recv(context))
3664                .collect::<Vec<&'static str>>()
3665                .await;
3666
3667            assert_eq!(["f", "a", "c", "b", "d", "e"], fn_iter_order.as_slice());
3668
3669            Ok(())
3670        }
3671
3672        #[tokio::test]
3673        async fn try_for_each_concurrent_gracefully_ends_when_one_function_returns_failure(
3674        ) -> Result<(), Box<dyn std::error::Error>> {
3675            let (fn_graph, mut seq_rx) = complex_graph_unit()?;
3676
3677            let mut resources = Resources::new();
3678            resources.insert(0u8);
3679            resources.insert(0u16);
3680            let resources = &resources;
3681
3682            let result = test_timeout(
3683                Duration::from_millis(50),
3684                Duration::from_millis(70),
3685                fn_graph.try_for_each_concurrent(None, |f| {
3686                    let fut = f.call(resources);
3687                    async move {
3688                        match fut.await {
3689                            "a" => Err(TestError("a")),
3690                            _ => Ok(()),
3691                        }
3692                    }
3693                }),
3694            )
3695            .await;
3696
3697            assert_eq!([TestError("a")], result.unwrap_err().1.as_slice());
3698            assert_eq!("f", seq_rx.try_recv().unwrap());
3699            assert_eq!("a", seq_rx.try_recv().unwrap()); assert_eq!(TryRecvError::Empty, seq_rx.try_recv().unwrap_err());
3701
3702            Ok(())
3703        }
3704
3705        #[tokio::test]
3706        async fn try_for_each_concurrent_gracefully_ends_when_one_function_returns_failure_variation(
3707        ) -> Result<(), Box<dyn std::error::Error>> {
3708            let (fn_graph, mut seq_rx) = complex_graph_unit()?;
3709
3710            let mut resources = Resources::new();
3711            resources.insert(0u8);
3712            resources.insert(0u16);
3713            let resources = &resources;
3714
3715            let result = test_timeout(
3716                Duration::from_millis(100),
3717                Duration::from_millis(130),
3718                fn_graph.try_for_each_concurrent(None, |f| {
3719                    let fut = f.call(resources);
3720                    async move {
3721                        match fut.await {
3722                            "c" => Err(TestError("c")),
3723                            _ => Ok(()),
3724                        }
3725                    }
3726                }),
3727            )
3728            .await;
3729
3730            assert_eq!([TestError("c")], result.unwrap_err().1.as_slice());
3731            assert_eq!("f", seq_rx.try_recv().unwrap());
3732            assert_eq!("a", seq_rx.try_recv().unwrap());
3733            assert_eq!("c", seq_rx.try_recv().unwrap());
3734            assert_eq!("b", seq_rx.try_recv().unwrap());
3735            assert_eq!(TryRecvError::Empty, seq_rx.try_recv().unwrap_err());
3736
3737            Ok(())
3738        }
3739
3740        #[tokio::test]
3741        async fn try_for_each_concurrent_gracefully_ends_when_multiple_functions_return_failure(
3742        ) -> Result<(), Box<dyn std::error::Error>> {
3743            let (fn_graph, mut seq_rx) = complex_graph_unit()?;
3744
3745            let mut resources = Resources::new();
3746            resources.insert(0u8);
3747            resources.insert(0u16);
3748            let resources = &resources;
3749
3750            let result = test_timeout(
3751                Duration::from_millis(100),
3752                Duration::from_millis(130),
3753                fn_graph.try_for_each_concurrent(None, |f| {
3754                    let fut = f.call(resources);
3755                    async move {
3756                        match fut.await {
3757                            "b" => Err(TestError("b")),
3758                            "c" => Err(TestError("c")),
3759                            _ => Ok(()),
3760                        }
3761                    }
3762                }),
3763            )
3764            .await;
3765
3766            assert_eq!(
3769                [TestError("c"), TestError("b")],
3770                result.unwrap_err().1.as_slice()
3771            );
3772            assert_eq!("f", seq_rx.try_recv().unwrap());
3773            assert_eq!("a", seq_rx.try_recv().unwrap());
3774            assert_eq!("c", seq_rx.try_recv().unwrap());
3775            assert_eq!("b", seq_rx.try_recv().unwrap());
3776            assert_eq!(TryRecvError::Empty, seq_rx.try_recv().unwrap_err());
3777
3778            Ok(())
3779        }
3780
3781        #[tokio::test]
3782        async fn try_for_each_concurrent_with_returns_when_graph_is_empty() -> Result<(), ()> {
3783            let fn_graph = FnGraph::<Box<dyn FnRes<Ret = ()>>>::new();
3784
3785            let stream_outcome = fn_graph
3786                .try_for_each_concurrent_with(
3787                    None,
3788                    StreamOpts::new().rev(),
3789                    #[cfg_attr(coverage_nightly, coverage(off))]
3790                    |_f| async { Ok::<_, ()>(()) },
3791                )
3792                .await
3793                .map_err(|_| ())?;
3794
3795            assert_eq!(
3796                StreamOutcome {
3797                    value: (),
3798                    state: StreamOutcomeState::Finished,
3799                    fn_ids_processed: Vec::new(),
3800                    fn_ids_not_processed: Vec::new(),
3801                },
3802                stream_outcome
3803            );
3804
3805            Ok(())
3806        }
3807
3808        #[tokio::test]
3809        async fn try_for_each_concurrent_with_runs_fns_concurrently(
3810        ) -> Result<(), Box<dyn std::error::Error>> {
3811            let (fn_graph, mut seq_rx) = complex_graph_unit()?;
3812
3813            let mut resources = Resources::new();
3814            resources.insert(0u8);
3815            resources.insert(0u16);
3816            let resources = &resources;
3817
3818            let stream_outcome = test_timeout(
3819                Duration::from_millis(200),
3820                Duration::from_millis(265),
3821                fn_graph.try_for_each_concurrent_with(None, StreamOpts::new().rev(), |f| {
3822                    let fut = f.call(resources);
3823                    async move {
3824                        let _ = fut.await;
3825                        Result::<_, TestError>::Ok(())
3826                    }
3827                }),
3828            )
3829            .await
3830            .unwrap();
3831
3832            let fn_ids_processed = [4, 3, 1, 2, 5, 0]
3833                .into_iter()
3834                .map(NodeIndex::new)
3835                .collect::<Vec<NodeIndex<FnIdInner>>>();
3836            assert_eq!(
3837                StreamOutcome {
3838                    value: (),
3839                    state: StreamOutcomeState::Finished,
3840                    fn_ids_processed,
3841                    fn_ids_not_processed: Vec::new(),
3842                },
3843                stream_outcome
3844            );
3845
3846            seq_rx.close();
3847            let fn_iter_order = stream::poll_fn(|context| seq_rx.poll_recv(context))
3848                .collect::<Vec<&'static str>>()
3849                .await;
3850
3851            assert_eq!(["e", "d", "b", "c", "f", "a"], fn_iter_order.as_slice());
3852
3853            Ok(())
3854        }
3855
3856        #[tokio::test]
3857        #[cfg(feature = "interruptible")]
3858        async fn try_for_each_concurrent_with_interrupt_returns_fn_ids_not_processed(
3859        ) -> Result<(), Box<dyn std::error::Error>> {
3860            use interruptible::{InterruptSignal, Interruptibility};
3861
3862            let (fn_graph, mut seq_rx) = complex_graph_unit()?;
3863
3864            let mut resources = Resources::new();
3865            resources.insert(0u8);
3866            resources.insert(0u16);
3867            let resources = &resources;
3868
3869            let (interrupt_tx, interrupt_rx) = mpsc::channel::<InterruptSignal>(16);
3870            interrupt_tx
3871                .send(InterruptSignal)
3872                .await
3873                .expect("Expected `InterruptSignal` to be sent successfully.");
3874            let stream_outcome = test_timeout(
3875                Duration::from_millis(100),
3876                Duration::from_millis(155),
3877                fn_graph.try_for_each_concurrent_with(
3878                    None,
3879                    StreamOpts::new().interruptibility_state(
3880                        Interruptibility::poll_next_n(interrupt_rx.into(), 4).into(),
3881                    ),
3882                    |f| {
3883                        let fut = f.call(resources);
3884                        async move {
3885                            let _ = fut.await;
3886                            Result::<_, TestError>::Ok(())
3887                        }
3888                    },
3889                ),
3890            )
3891            .await
3892            .unwrap();
3893
3894            let fn_ids_processed = [5, 0, 2, 1] .into_iter()
3896                .map(NodeIndex::new)
3897                .collect::<Vec<NodeIndex<FnIdInner>>>();
3898
3899            let fn_ids_not_processed = [3, 4] .into_iter()
3902                .map(NodeIndex::new)
3903                .collect::<Vec<NodeIndex<FnIdInner>>>();
3904            assert_eq!(
3905                StreamOutcome {
3906                    value: (),
3907                    state: StreamOutcomeState::Interrupted,
3908                    fn_ids_processed,
3909                    fn_ids_not_processed,
3910                },
3911                stream_outcome
3912            );
3913
3914            seq_rx.close();
3915            let fn_iter_order = stream::poll_fn(|context| seq_rx.poll_recv(context))
3916                .collect::<Vec<&'static str>>()
3917                .await;
3918
3919            assert_eq!(["f", "a", "c", "b"], fn_iter_order.as_slice());
3926
3927            Ok(())
3928        }
3929
3930        #[tokio::test]
3931        #[cfg(feature = "interruptible")]
3932        async fn try_for_each_concurrent_with_interrupt_finish_current_also_interrupts_preloaded_fns(
3933        ) -> Result<(), Box<dyn std::error::Error>> {
3934            use interruptible::{InterruptSignal, InterruptibilityState};
3935
3936            let (fn_graph, mut seq_rx) = complex_graph_unit()?;
3937
3938            let mut resources = Resources::new();
3939            resources.insert(0u8);
3940            resources.insert(0u16);
3941            let resources = &resources;
3942
3943            let (interrupt_tx, interrupt_rx) = mpsc::channel::<InterruptSignal>(16);
3944            interrupt_tx
3945                .send(InterruptSignal)
3946                .await
3947                .expect("Expected `InterruptSignal` to be sent successfully.");
3948            let stream_outcome = test_timeout(
3949                Duration::from_millis(0),
3950                Duration::from_millis(25),
3951                fn_graph.try_for_each_concurrent_with(
3952                    None,
3953                    StreamOpts::new().interruptibility_state(
3954                        InterruptibilityState::new_finish_current(interrupt_rx.into()),
3955                    ),
3956                    |f| {
3957                        let fut = f.call(resources);
3958                        async move {
3959                            let _ = fut.await;
3960                            Result::<_, TestError>::Ok(())
3961                        }
3962                    },
3963                ),
3964            )
3965            .await
3966            .unwrap();
3967
3968            let fn_ids_processed = []
3970                .into_iter()
3971                .map(NodeIndex::new)
3972                .collect::<Vec<NodeIndex<FnIdInner>>>();
3973
3974            let fn_ids_not_processed = [0, 1, 2, 3, 4, 5] .into_iter()
3977                .map(NodeIndex::new)
3978                .collect::<Vec<NodeIndex<FnIdInner>>>();
3979            assert_eq!(
3980                StreamOutcome {
3981                    value: (),
3982                    state: StreamOutcomeState::Interrupted,
3983                    fn_ids_processed,
3984                    fn_ids_not_processed,
3985                },
3986                stream_outcome
3987            );
3988
3989            seq_rx.close();
3990            let fn_iter_order = stream::poll_fn(|context| seq_rx.poll_recv(context))
3991                .collect::<Vec<&'static str>>()
3992                .await;
3993
3994            assert_eq!(<[&str; 0]>::default(), fn_iter_order.as_slice());
3995
3996            Ok(())
3997        }
3998
3999        #[tokio::test]
4000        #[cfg(feature = "interruptible")]
4001        async fn try_for_each_concurrent_with_interrupt_finish_current_with_sleep(
4002        ) -> Result<(), Box<dyn std::error::Error>> {
4003            use interruptible::{InterruptSignal, InterruptibilityState};
4004
4005            let (fn_graph, mut seq_rx) = complex_graph_unit()?;
4006
4007            let mut resources = Resources::new();
4008            resources.insert(0u8);
4009            resources.insert(0u16);
4010            let resources = &resources;
4011
4012            let (interrupt_tx, interrupt_rx) = mpsc::channel::<InterruptSignal>(16);
4013
4014            let interrupt_after_delay = async move {
4015                tokio::time::sleep(sleep_duration!() / 2).await;
4016                interrupt_tx
4017                    .send(InterruptSignal)
4018                    .await
4019                    .expect("Expected `InterruptSignal` to be sent successfully.");
4020            };
4021            let stream_outcome_task = async move {
4022                test_timeout(
4023                    Duration::from_millis(50),
4024                    Duration::from_millis(130),
4025                    fn_graph.try_for_each_concurrent_with(
4026                        None,
4027                        StreamOpts::new().interruptibility_state(
4028                            InterruptibilityState::new_finish_current(interrupt_rx.into()),
4029                        ),
4030                        |f| {
4031                            let fut = f.call(resources);
4032                            async move {
4033                                let _ = fut.await;
4034                                Result::<_, TestError>::Ok(())
4035                            }
4036                        },
4037                    ),
4038                )
4039                .await
4040                .unwrap()
4041            };
4042            let (stream_outcome, ()) = tokio::join!(stream_outcome_task, interrupt_after_delay);
4043
4044            let fn_ids_processed = [5, 0, 2] .into_iter()
4047                .map(NodeIndex::new)
4048                .collect::<Vec<NodeIndex<FnIdInner>>>();
4049
4050            let fn_ids_not_processed = [1, 3, 4] .into_iter()
4053                .map(NodeIndex::new)
4054                .collect::<Vec<NodeIndex<FnIdInner>>>();
4055            assert_eq!(
4056                StreamOutcome {
4057                    value: (),
4058                    state: StreamOutcomeState::Interrupted,
4059                    fn_ids_processed,
4060                    fn_ids_not_processed,
4061                },
4062                stream_outcome
4063            );
4064
4065            seq_rx.close();
4066            let fn_iter_order = stream::poll_fn(|context| seq_rx.poll_recv(context))
4067                .collect::<Vec<&'static str>>()
4068                .await;
4069
4070            assert_eq!(["f", "a", "c"], fn_iter_order.as_slice());
4071
4072            Ok(())
4073        }
4074
4075        #[tokio::test]
4076        #[cfg(feature = "interruptible")]
4077        async fn try_for_each_concurrent_with_interrupt_finish_current_with_sleep_include_interrupted_false(
4078        ) -> Result<(), Box<dyn std::error::Error>> {
4079            use interruptible::{InterruptSignal, InterruptibilityState};
4080
4081            let (fn_graph, mut seq_rx) = complex_graph_unit()?;
4082
4083            let mut resources = Resources::new();
4084            resources.insert(0u8);
4085            resources.insert(0u16);
4086            let resources = &resources;
4087
4088            let (interrupt_tx, interrupt_rx) = mpsc::channel::<InterruptSignal>(16);
4089
4090            let interrupt_after_delay = async move {
4091                tokio::time::sleep(sleep_duration!() / 2).await;
4092                interrupt_tx
4093                    .send(InterruptSignal)
4094                    .await
4095                    .expect("Expected `InterruptSignal` to be sent successfully.");
4096            };
4097            let stream_outcome_task = async move {
4098                test_timeout(
4099                    Duration::from_millis(50),
4100                    Duration::from_millis(75),
4101                    fn_graph.try_for_each_concurrent_with(
4102                        None,
4103                        StreamOpts::new()
4104                            .interruptibility_state(InterruptibilityState::new_finish_current(
4105                                interrupt_rx.into(),
4106                            ))
4107                            .interrupted_next_item_include(false),
4108                        |f| {
4109                            let fut = f.call(resources);
4110                            async move {
4111                                let _ = fut.await;
4112                                Result::<_, TestError>::Ok(())
4113                            }
4114                        },
4115                    ),
4116                )
4117                .await
4118                .unwrap()
4119            };
4120            let (stream_outcome, ()) = tokio::join!(stream_outcome_task, interrupt_after_delay);
4121
4122            let fn_ids_processed = [5, 0] .into_iter()
4125                .map(NodeIndex::new)
4126                .collect::<Vec<NodeIndex<FnIdInner>>>();
4127
4128            let fn_ids_not_processed = [1, 2, 3, 4] .into_iter()
4131                .map(NodeIndex::new)
4132                .collect::<Vec<NodeIndex<FnIdInner>>>();
4133            assert_eq!(
4134                StreamOutcome {
4135                    value: (),
4136                    state: StreamOutcomeState::Interrupted,
4137                    fn_ids_processed,
4138                    fn_ids_not_processed,
4139                },
4140                stream_outcome
4141            );
4142
4143            seq_rx.close();
4144            let fn_iter_order = stream::poll_fn(|context| seq_rx.poll_recv(context))
4145                .collect::<Vec<&'static str>>()
4146                .await;
4147
4148            assert_eq!(["f", "a"], fn_iter_order.as_slice());
4149
4150            Ok(())
4151        }
4152
4153        #[tokio::test]
4154        #[cfg(feature = "interruptible")]
4155        async fn try_for_each_concurrent_with_interrupt_poll_next_n_also_interrupts_preloaded_fns(
4156        ) -> Result<(), Box<dyn std::error::Error>> {
4157            use interruptible::{InterruptSignal, InterruptibilityState};
4158
4159            let (fn_graph, mut seq_rx) = complex_graph_unit()?;
4160
4161            let mut resources = Resources::new();
4162            resources.insert(0u8);
4163            resources.insert(0u16);
4164            let resources = &resources;
4165
4166            let (interrupt_tx, interrupt_rx) = mpsc::channel::<InterruptSignal>(16);
4167            interrupt_tx
4168                .send(InterruptSignal)
4169                .await
4170                .expect("Expected `InterruptSignal` to be sent successfully.");
4171            let stream_outcome = test_timeout(
4172                Duration::from_millis(50),
4173                Duration::from_millis(75),
4174                fn_graph.try_for_each_concurrent_with(
4175                    None,
4176                    StreamOpts::new().interruptibility_state(
4177                        InterruptibilityState::new_poll_next_n(interrupt_rx.into(), 2),
4178                    ),
4179                    |f| {
4180                        let fut = f.call(resources);
4181                        async move {
4182                            let _ = fut.await;
4183                            Result::<_, TestError>::Ok(())
4184                        }
4185                    },
4186                ),
4187            )
4188            .await
4189            .unwrap();
4190
4191            let fn_ids_processed = [5, 0] .into_iter()
4194                .map(NodeIndex::new)
4195                .collect::<Vec<NodeIndex<FnIdInner>>>();
4196
4197            let fn_ids_not_processed = [1, 2, 3, 4] .into_iter()
4200                .map(NodeIndex::new)
4201                .collect::<Vec<NodeIndex<FnIdInner>>>();
4202            assert_eq!(
4203                StreamOutcome {
4204                    value: (),
4205                    state: StreamOutcomeState::Interrupted,
4206                    fn_ids_processed,
4207                    fn_ids_not_processed,
4208                },
4209                stream_outcome
4210            );
4211
4212            seq_rx.close();
4213            let fn_iter_order = stream::poll_fn(|context| seq_rx.poll_recv(context))
4214                .collect::<Vec<&'static str>>()
4215                .await;
4216
4217            assert_eq!(["f", "a"], fn_iter_order.as_slice());
4218
4219            Ok(())
4220        }
4221
4222        #[tokio::test]
4223        async fn try_for_each_concurrent_with_gracefully_ends_when_one_function_returns_failure(
4224        ) -> Result<(), Box<dyn std::error::Error>> {
4225            let (fn_graph, mut seq_rx) = complex_graph_unit()?;
4226
4227            let mut resources = Resources::new();
4228            resources.insert(0u8);
4229            resources.insert(0u16);
4230            let resources = &resources;
4231
4232            let result = test_timeout(
4233                Duration::from_millis(50),
4234                Duration::from_millis(70),
4235                fn_graph.try_for_each_concurrent_with(None, StreamOpts::new().rev(), |f| {
4236                    let fut = f.call(resources);
4237                    async move {
4238                        match fut.await {
4239                            "e" => Err(TestError("e")),
4240                            _ => Ok(()),
4241                        }
4242                    }
4243                }),
4244            )
4245            .await;
4246
4247            assert_eq!([TestError("e")], result.unwrap_err().1.as_slice());
4248            assert_eq!("e", seq_rx.try_recv().unwrap()); assert_eq!(TryRecvError::Empty, seq_rx.try_recv().unwrap_err());
4250
4251            Ok(())
4252        }
4253
4254        #[tokio::test]
4255        async fn try_for_each_concurrent_with_gracefully_ends_when_one_function_returns_failure_variation(
4256        ) -> Result<(), Box<dyn std::error::Error>> {
4257            let (fn_graph, mut seq_rx) = complex_graph_unit()?;
4258
4259            let mut resources = Resources::new();
4260            resources.insert(0u8);
4261            resources.insert(0u16);
4262            let resources = &resources;
4263
4264            let result = test_timeout(
4265                Duration::from_millis(150),
4266                Duration::from_millis(197),
4267                fn_graph.try_for_each_concurrent_with(None, StreamOpts::new().rev(), |f| {
4268                    let fut = f.call(resources);
4269                    async move {
4270                        match fut.await {
4271                            "b" => Err(TestError("b")),
4272                            _ => Ok(()),
4273                        }
4274                    }
4275                }),
4276            )
4277            .await;
4278
4279            assert_eq!([TestError("b")], result.unwrap_err().1.as_slice());
4280            assert_eq!("e", seq_rx.try_recv().unwrap());
4281            assert_eq!("d", seq_rx.try_recv().unwrap());
4282            assert_eq!("b", seq_rx.try_recv().unwrap());
4283            assert_eq!("c", seq_rx.try_recv().unwrap());
4284            assert_eq!(TryRecvError::Empty, seq_rx.try_recv().unwrap_err());
4285
4286            Ok(())
4287        }
4288
4289        #[tokio::test]
4290        async fn try_for_each_concurrent_with_gracefully_ends_when_multiple_functions_return_failure(
4291        ) -> Result<(), Box<dyn std::error::Error>> {
4292            let (fn_graph, mut seq_rx) = complex_graph_unit()?;
4293
4294            let mut resources = Resources::new();
4295            resources.insert(0u8);
4296            resources.insert(0u16);
4297            let resources = &resources;
4298
4299            let result = test_timeout(
4300                Duration::from_millis(150),
4301                Duration::from_millis(197),
4302                fn_graph.try_for_each_concurrent_with(None, StreamOpts::new().rev(), |f| {
4303                    let fut = f.call(resources);
4304                    async move {
4305                        match fut.await {
4306                            "b" => Err(TestError("b")),
4307                            "c" => Err(TestError("c")),
4308                            _ => Ok(()),
4309                        }
4310                    }
4311                }),
4312            )
4313            .await;
4314
4315            assert_eq!(
4316                [TestError("b"), TestError("c")],
4317                result.unwrap_err().1.as_slice()
4318            );
4319            assert_eq!("e", seq_rx.try_recv().unwrap());
4320            assert_eq!("d", seq_rx.try_recv().unwrap());
4321            assert_eq!("b", seq_rx.try_recv().unwrap());
4322            assert_eq!("c", seq_rx.try_recv().unwrap());
4323            assert_eq!(TryRecvError::Empty, seq_rx.try_recv().unwrap_err());
4324
4325            Ok(())
4326        }
4327
4328        #[tokio::test]
4329        async fn for_each_concurrent_mut_runs_fns_concurrently(
4330        ) -> Result<(), Box<dyn std::error::Error>> {
4331            let (mut fn_graph, mut seq_rx) = complex_graph_unit_mut()?;
4332
4333            let mut resources = Resources::new();
4334            resources.insert(0u8);
4335            resources.insert(0u16);
4336            let resources = &resources;
4337            test_timeout(
4338                Duration::from_millis(200),
4339                Duration::from_millis(265),
4340                fn_graph.for_each_concurrent_mut(None, |f| {
4341                    let fut = f.call_mut(resources);
4342                    async move {
4343                        let _ = fut.await;
4344                    }
4345                }),
4346            )
4347            .await;
4348
4349            let fn_iter_order = stream::poll_fn(|context| seq_rx.poll_recv(context))
4350                .collect::<Vec<&'static str>>()
4351                .await;
4352
4353            assert_eq!(["f", "a", "c", "b", "d", "e"], fn_iter_order.as_slice());
4354            Ok(())
4355        }
4356
4357        #[tokio::test]
4358        async fn for_each_concurrent_mut_with_runs_fns_concurrently(
4359        ) -> Result<(), Box<dyn std::error::Error>> {
4360            let (mut fn_graph, mut seq_rx) = complex_graph_unit_mut()?;
4361
4362            let mut resources = Resources::new();
4363            resources.insert(0u8);
4364            resources.insert(0u16);
4365            let resources = &resources;
4366            test_timeout(
4367                Duration::from_millis(200),
4368                Duration::from_millis(265),
4369                fn_graph.for_each_concurrent_mut_with(None, StreamOpts::new().rev(), |f| {
4370                    let fut = f.call_mut(resources);
4371                    async move {
4372                        let _ = fut.await;
4373                    }
4374                }),
4375            )
4376            .await;
4377
4378            let fn_iter_order = stream::poll_fn(|context| seq_rx.poll_recv(context))
4379                .collect::<Vec<&'static str>>()
4380                .await;
4381
4382            assert_eq!(["e", "d", "b", "c", "f", "a"], fn_iter_order.as_slice());
4383            Ok(())
4384        }
4385
4386        #[tokio::test]
4387        async fn try_for_each_concurrent_mut_runs_fns_concurrently_mut(
4388        ) -> Result<(), Box<dyn std::error::Error>> {
4389            let (mut fn_graph, mut seq_rx) = complex_graph_unit_mut()?;
4390
4391            let mut resources = Resources::new();
4392            resources.insert(0u8);
4393            resources.insert(0u16);
4394            let resources = &resources;
4395
4396            test_timeout(
4397                Duration::from_millis(200),
4398                Duration::from_millis(265),
4399                fn_graph.try_for_each_concurrent_mut(None, |f| {
4400                    let fut = f.call_mut(resources);
4401                    async move {
4402                        let _ = fut.await;
4403                        Result::<_, TestError>::Ok(())
4404                    }
4405                }),
4406            )
4407            .await
4408            .unwrap();
4409
4410            let fn_iter_order = stream::poll_fn(|context| seq_rx.poll_recv(context))
4411                .collect::<Vec<&'static str>>()
4412                .await;
4413
4414            assert_eq!(["f", "a", "c", "b", "d", "e"], fn_iter_order.as_slice());
4415
4416            Ok(())
4417        }
4418
4419        #[tokio::test]
4420        async fn try_for_each_concurrent_mut_gracefully_ends_when_one_function_returns_failure(
4421        ) -> Result<(), Box<dyn std::error::Error>> {
4422            let (mut fn_graph, mut seq_rx) = complex_graph_unit_mut()?;
4423
4424            let mut resources = Resources::new();
4425            resources.insert(0u8);
4426            resources.insert(0u16);
4427            let resources = &resources;
4428
4429            let result = test_timeout(
4430                Duration::from_millis(50),
4431                Duration::from_millis(70),
4432                fn_graph.try_for_each_concurrent_mut(None, |f| {
4433                    let fut = f.call_mut(resources);
4434                    async move {
4435                        match fut.await {
4436                            "a" => Err(TestError("a")),
4437                            _ => Ok(()),
4438                        }
4439                    }
4440                }),
4441            )
4442            .await;
4443
4444            assert_eq!([TestError("a")], result.unwrap_err().1.as_slice());
4445            assert_eq!("f", seq_rx.try_recv().unwrap());
4446            assert_eq!("a", seq_rx.try_recv().unwrap()); assert_eq!(TryRecvError::Empty, seq_rx.try_recv().unwrap_err());
4448
4449            Ok(())
4450        }
4451
4452        #[tokio::test]
4453        async fn try_for_each_concurrent_mut_gracefully_ends_when_one_function_returns_failure_variation(
4454        ) -> Result<(), Box<dyn std::error::Error>> {
4455            let (mut fn_graph, mut seq_rx) = complex_graph_unit_mut()?;
4456
4457            let mut resources = Resources::new();
4458            resources.insert(0u8);
4459            resources.insert(0u16);
4460            let resources = &resources;
4461
4462            let result = test_timeout(
4463                Duration::from_millis(100),
4464                Duration::from_millis(130),
4465                fn_graph.try_for_each_concurrent_mut(None, |f| {
4466                    let fut = f.call_mut(resources);
4467                    async move {
4468                        match fut.await {
4469                            "c" => Err(TestError("c")),
4470                            _ => Ok(()),
4471                        }
4472                    }
4473                }),
4474            )
4475            .await;
4476
4477            assert_eq!([TestError("c")], result.unwrap_err().1.as_slice());
4478            assert_eq!("f", seq_rx.try_recv().unwrap());
4479            assert_eq!("a", seq_rx.try_recv().unwrap());
4480            assert_eq!("c", seq_rx.try_recv().unwrap());
4481            assert_eq!("b", seq_rx.try_recv().unwrap());
4482            assert_eq!(TryRecvError::Empty, seq_rx.try_recv().unwrap_err());
4483
4484            Ok(())
4485        }
4486
4487        #[tokio::test]
4488        async fn try_for_each_concurrent_mut_gracefully_ends_when_multiple_functions_return_failure(
4489        ) -> Result<(), Box<dyn std::error::Error>> {
4490            let (mut fn_graph, mut seq_rx) = complex_graph_unit_mut()?;
4491
4492            let mut resources = Resources::new();
4493            resources.insert(0u8);
4494            resources.insert(0u16);
4495            let resources = &resources;
4496
4497            let result = test_timeout(
4498                Duration::from_millis(100),
4499                Duration::from_millis(130),
4500                fn_graph.try_for_each_concurrent_mut(None, |f| {
4501                    let fut = f.call_mut(resources);
4502                    async move {
4503                        match fut.await {
4504                            "b" => Err(TestError("b")),
4505                            "c" => Err(TestError("c")),
4506                            _ => Ok(()),
4507                        }
4508                    }
4509                }),
4510            )
4511            .await;
4512
4513            assert_eq!(
4516                [TestError("c"), TestError("b")],
4517                result.unwrap_err().1.as_slice()
4518            );
4519            assert_eq!("f", seq_rx.try_recv().unwrap());
4520            assert_eq!("a", seq_rx.try_recv().unwrap());
4521            assert_eq!("c", seq_rx.try_recv().unwrap());
4522            assert_eq!("b", seq_rx.try_recv().unwrap());
4523            assert_eq!(TryRecvError::Empty, seq_rx.try_recv().unwrap_err());
4524
4525            Ok(())
4526        }
4527
4528        #[tokio::test]
4529        async fn try_for_each_concurrent_mut_with_runs_fns_concurrently_mut(
4530        ) -> Result<(), Box<dyn std::error::Error>> {
4531            let (mut fn_graph, mut seq_rx) = complex_graph_unit_mut()?;
4532
4533            let mut resources = Resources::new();
4534            resources.insert(0u8);
4535            resources.insert(0u16);
4536            let resources = &resources;
4537
4538            test_timeout(
4539                Duration::from_millis(200),
4540                Duration::from_millis(265),
4541                fn_graph.try_for_each_concurrent_mut_with(None, StreamOpts::new().rev(), |f| {
4542                    let fut = f.call_mut(resources);
4543                    async move {
4544                        let _ = fut.await;
4545                        Result::<_, TestError>::Ok(())
4546                    }
4547                }),
4548            )
4549            .await
4550            .unwrap();
4551
4552            let fn_iter_order = stream::poll_fn(|context| seq_rx.poll_recv(context))
4553                .collect::<Vec<&'static str>>()
4554                .await;
4555
4556            assert_eq!(["e", "d", "b", "c", "f", "a"], fn_iter_order.as_slice());
4557
4558            Ok(())
4559        }
4560
4561        #[tokio::test]
4562        async fn try_for_each_concurrent_mut_with_gracefully_ends_when_one_function_returns_failure(
4563        ) -> Result<(), Box<dyn std::error::Error>> {
4564            let (mut fn_graph, mut seq_rx) = complex_graph_unit_mut()?;
4565
4566            let mut resources = Resources::new();
4567            resources.insert(0u8);
4568            resources.insert(0u16);
4569            let resources = &resources;
4570
4571            let result = test_timeout(
4572                Duration::from_millis(50),
4573                Duration::from_millis(70),
4574                fn_graph.try_for_each_concurrent_mut_with(None, StreamOpts::new().rev(), |f| {
4575                    let fut = f.call_mut(resources);
4576                    async move {
4577                        match fut.await {
4578                            "e" => Err(TestError("e")),
4579                            _ => Ok(()),
4580                        }
4581                    }
4582                }),
4583            )
4584            .await;
4585
4586            assert_eq!([TestError("e")], result.unwrap_err().1.as_slice());
4587            assert_eq!("e", seq_rx.try_recv().unwrap()); assert_eq!(TryRecvError::Empty, seq_rx.try_recv().unwrap_err());
4589
4590            Ok(())
4591        }
4592
4593        #[tokio::test]
4594        async fn try_for_each_concurrent_mut_with_gracefully_ends_when_one_function_returns_failure_variation(
4595        ) -> Result<(), Box<dyn std::error::Error>> {
4596            let (mut fn_graph, mut seq_rx) = complex_graph_unit_mut()?;
4597
4598            let mut resources = Resources::new();
4599            resources.insert(0u8);
4600            resources.insert(0u16);
4601            let resources = &resources;
4602
4603            let result = test_timeout(
4604                Duration::from_millis(150),
4605                Duration::from_millis(197),
4606                fn_graph.try_for_each_concurrent_mut_with(None, StreamOpts::new().rev(), |f| {
4607                    let fut = f.call_mut(resources);
4608                    async move {
4609                        match fut.await {
4610                            "b" => Err(TestError("b")),
4611                            _ => Ok(()),
4612                        }
4613                    }
4614                }),
4615            )
4616            .await;
4617
4618            assert_eq!([TestError("b")], result.unwrap_err().1.as_slice());
4619            assert_eq!("e", seq_rx.try_recv().unwrap());
4620            assert_eq!("d", seq_rx.try_recv().unwrap());
4621            assert_eq!("b", seq_rx.try_recv().unwrap());
4622            assert_eq!("c", seq_rx.try_recv().unwrap());
4623            assert_eq!(TryRecvError::Empty, seq_rx.try_recv().unwrap_err());
4624
4625            Ok(())
4626        }
4627
4628        #[tokio::test]
4629        async fn try_for_each_concurrent_mut_with_gracefully_ends_when_multiple_functions_return_failure(
4630        ) -> Result<(), Box<dyn std::error::Error>> {
4631            let (mut fn_graph, mut seq_rx) = complex_graph_unit_mut()?;
4632
4633            let mut resources = Resources::new();
4634            resources.insert(0u8);
4635            resources.insert(0u16);
4636            let resources = &resources;
4637
4638            let result = test_timeout(
4639                Duration::from_millis(150),
4640                Duration::from_millis(197),
4641                fn_graph.try_for_each_concurrent_mut_with(None, StreamOpts::new().rev(), |f| {
4642                    let fut = f.call_mut(resources);
4643                    async move {
4644                        match fut.await {
4645                            "b" => Err(TestError("b")),
4646                            "c" => Err(TestError("c")),
4647                            _ => Ok(()),
4648                        }
4649                    }
4650                }),
4651            )
4652            .await;
4653
4654            assert_eq!(
4655                [TestError("b"), TestError("c")],
4656                result.unwrap_err().1.as_slice()
4657            );
4658            assert_eq!("e", seq_rx.try_recv().unwrap());
4659            assert_eq!("d", seq_rx.try_recv().unwrap());
4660            assert_eq!("b", seq_rx.try_recv().unwrap());
4661            assert_eq!("c", seq_rx.try_recv().unwrap());
4662            assert_eq!(TryRecvError::Empty, seq_rx.try_recv().unwrap_err());
4663
4664            Ok(())
4665        }
4666
4667        #[tokio::test]
4668        async fn try_for_each_concurrent_control_returns_when_graph_is_empty() {
4669            let fn_graph = FnGraph::<Box<dyn FnRes<Ret = ()>>>::new();
4670
4671            let (ControlFlow::Continue(_) | ControlFlow::Break(_)) = fn_graph
4672                .try_for_each_concurrent_control(
4673                    None,
4674                    #[cfg_attr(coverage_nightly, coverage(off))]
4675                    |_f| async { ControlFlow::<(), ()>::Continue(()) },
4676                )
4677                .await;
4678        }
4679
4680        #[tokio::test]
4681        async fn try_for_each_concurrent_control_runs_fns_concurrently(
4682        ) -> Result<(), Box<dyn std::error::Error>> {
4683            let (fn_graph, mut seq_rx) = complex_graph_unit()?;
4684
4685            let mut resources = Resources::new();
4686            resources.insert(0u8);
4687            resources.insert(0u16);
4688            let resources = &resources;
4689
4690            let (ControlFlow::Continue(_) | ControlFlow::Break(_)) = test_timeout(
4691                Duration::from_millis(200),
4692                Duration::from_millis(265),
4693                fn_graph.try_for_each_concurrent_control(None, |f| {
4694                    let fut = f.call(resources);
4695                    async move {
4696                        let _ = fut.await;
4697                        ControlFlow::<(), ()>::Continue(())
4698                    }
4699                }),
4700            )
4701            .await;
4702
4703            seq_rx.close();
4704            let fn_iter_order = stream::poll_fn(|context| seq_rx.poll_recv(context))
4705                .collect::<Vec<&'static str>>()
4706                .await;
4707
4708            assert_eq!(["f", "a", "c", "b", "d", "e"], fn_iter_order.as_slice());
4709
4710            Ok(())
4711        }
4712
4713        #[tokio::test]
4714        async fn try_for_each_concurrent_control_gracefully_ends_when_one_function_returns_failure(
4715        ) -> Result<(), Box<dyn std::error::Error>> {
4716            let (fn_graph, mut seq_rx) = complex_graph_unit()?;
4717
4718            let mut resources = Resources::new();
4719            resources.insert(0u8);
4720            resources.insert(0u16);
4721            let resources = &resources;
4722
4723            let (ControlFlow::Continue(_) | ControlFlow::Break(_)) = test_timeout(
4724                Duration::from_millis(50),
4725                Duration::from_millis(70),
4726                fn_graph.try_for_each_concurrent_control(None, |f| {
4727                    let fut = f.call(resources);
4728                    async move {
4729                        match fut.await {
4730                            "a" => ControlFlow::Break(()),
4731                            _ => ControlFlow::Continue(()),
4732                        }
4733                    }
4734                }),
4735            )
4736            .await;
4737
4738            assert_eq!("f", seq_rx.try_recv().unwrap());
4739            assert_eq!("a", seq_rx.try_recv().unwrap()); assert_eq!(TryRecvError::Empty, seq_rx.try_recv().unwrap_err());
4741
4742            Ok(())
4743        }
4744
4745        #[tokio::test]
4746        async fn try_for_each_concurrent_control_gracefully_ends_when_one_function_returns_failure_variation(
4747        ) -> Result<(), Box<dyn std::error::Error>> {
4748            let (fn_graph, mut seq_rx) = complex_graph_unit()?;
4749
4750            let mut resources = Resources::new();
4751            resources.insert(0u8);
4752            resources.insert(0u16);
4753            let resources = &resources;
4754
4755            let (ControlFlow::Continue(_) | ControlFlow::Break(_)) = test_timeout(
4756                Duration::from_millis(100),
4757                Duration::from_millis(130),
4758                fn_graph.try_for_each_concurrent_control(None, |f| {
4759                    let fut = f.call(resources);
4760                    async move {
4761                        match fut.await {
4762                            "c" => ControlFlow::Break(()),
4763                            _ => ControlFlow::Continue(()),
4764                        }
4765                    }
4766                }),
4767            )
4768            .await;
4769
4770            assert_eq!("f", seq_rx.try_recv().unwrap());
4771            assert_eq!("a", seq_rx.try_recv().unwrap());
4772            assert_eq!("c", seq_rx.try_recv().unwrap());
4773            assert_eq!("b", seq_rx.try_recv().unwrap());
4774            assert_eq!(TryRecvError::Empty, seq_rx.try_recv().unwrap_err());
4775
4776            Ok(())
4777        }
4778
4779        #[tokio::test]
4780        async fn try_for_each_concurrent_control_gracefully_ends_when_multiple_functions_return_failure(
4781        ) -> Result<(), Box<dyn std::error::Error>> {
4782            let (fn_graph, mut seq_rx) = complex_graph_unit()?;
4783
4784            let mut resources = Resources::new();
4785            resources.insert(0u8);
4786            resources.insert(0u16);
4787            let resources = &resources;
4788
4789            let (ControlFlow::Continue(_) | ControlFlow::Break(_)) = test_timeout(
4790                Duration::from_millis(100),
4791                Duration::from_millis(130),
4792                fn_graph.try_for_each_concurrent_control(None, |f| {
4793                    let fut = f.call(resources);
4794                    async move {
4795                        match fut.await {
4796                            "b" => ControlFlow::Break(()),
4797                            "c" => ControlFlow::Break(()),
4798                            _ => ControlFlow::Continue(()),
4799                        }
4800                    }
4801                }),
4802            )
4803            .await;
4804
4805            assert_eq!("f", seq_rx.try_recv().unwrap());
4806            assert_eq!("a", seq_rx.try_recv().unwrap());
4807            assert_eq!("c", seq_rx.try_recv().unwrap());
4808            assert_eq!("b", seq_rx.try_recv().unwrap());
4809            assert_eq!(TryRecvError::Empty, seq_rx.try_recv().unwrap_err());
4810
4811            Ok(())
4812        }
4813
4814        #[tokio::test]
4815        async fn try_for_each_concurrent_control_with_returns_when_graph_is_empty() {
4816            let fn_graph = FnGraph::<Box<dyn FnRes<Ret = ()>>>::new();
4817
4818            let (ControlFlow::Continue(_) | ControlFlow::Break(_)) = fn_graph
4819                .try_for_each_concurrent_control_with(
4820                    None,
4821                    StreamOpts::new().rev(),
4822                    #[cfg_attr(coverage_nightly, coverage(off))]
4823                    |_f| async { ControlFlow::<(), ()>::Continue(()) },
4824                )
4825                .await;
4826        }
4827
4828        #[tokio::test]
4829        async fn try_for_each_concurrent_control_with_runs_fns_concurrently(
4830        ) -> Result<(), Box<dyn std::error::Error>> {
4831            let (fn_graph, mut seq_rx) = complex_graph_unit()?;
4832
4833            let mut resources = Resources::new();
4834            resources.insert(0u8);
4835            resources.insert(0u16);
4836            let resources = &resources;
4837
4838            let (ControlFlow::Continue(_) | ControlFlow::Break(_)) = test_timeout(
4839                Duration::from_millis(200),
4840                Duration::from_millis(265),
4841                fn_graph.try_for_each_concurrent_control_with(None, StreamOpts::new().rev(), |f| {
4842                    let fut = f.call(resources);
4843                    async move {
4844                        let _ = fut.await;
4845                        ControlFlow::<(), ()>::Continue(())
4846                    }
4847                }),
4848            )
4849            .await;
4850
4851            seq_rx.close();
4852            let fn_iter_order = stream::poll_fn(|context| seq_rx.poll_recv(context))
4853                .collect::<Vec<&'static str>>()
4854                .await;
4855
4856            assert_eq!(["e", "d", "b", "c", "f", "a"], fn_iter_order.as_slice());
4857
4858            Ok(())
4859        }
4860
4861        #[tokio::test]
4862        async fn try_for_each_concurrent_control_with_gracefully_ends_when_one_function_returns_failure(
4863        ) -> Result<(), Box<dyn std::error::Error>> {
4864            let (fn_graph, mut seq_rx) = complex_graph_unit()?;
4865
4866            let mut resources = Resources::new();
4867            resources.insert(0u8);
4868            resources.insert(0u16);
4869            let resources = &resources;
4870
4871            let (ControlFlow::Continue(_) | ControlFlow::Break(_)) = test_timeout(
4872                Duration::from_millis(50),
4873                Duration::from_millis(70),
4874                fn_graph.try_for_each_concurrent_control_with(None, StreamOpts::new().rev(), |f| {
4875                    let fut = f.call(resources);
4876                    async move {
4877                        match fut.await {
4878                            "e" => ControlFlow::Break(()),
4879                            _ => ControlFlow::Continue(()),
4880                        }
4881                    }
4882                }),
4883            )
4884            .await;
4885
4886            assert_eq!("e", seq_rx.try_recv().unwrap()); assert_eq!(TryRecvError::Empty, seq_rx.try_recv().unwrap_err());
4888
4889            Ok(())
4890        }
4891
4892        #[tokio::test]
4893        async fn try_for_each_concurrent_control_with_gracefully_ends_when_one_function_returns_failure_variation(
4894        ) -> Result<(), Box<dyn std::error::Error>> {
4895            let (fn_graph, mut seq_rx) = complex_graph_unit()?;
4896
4897            let mut resources = Resources::new();
4898            resources.insert(0u8);
4899            resources.insert(0u16);
4900            let resources = &resources;
4901
4902            let (ControlFlow::Continue(_) | ControlFlow::Break(_)) = test_timeout(
4903                Duration::from_millis(150),
4904                Duration::from_millis(197),
4905                fn_graph.try_for_each_concurrent_control_with(None, StreamOpts::new().rev(), |f| {
4906                    let fut = f.call(resources);
4907                    async move {
4908                        match fut.await {
4909                            "b" => ControlFlow::Break(()),
4910                            _ => ControlFlow::Continue(()),
4911                        }
4912                    }
4913                }),
4914            )
4915            .await;
4916
4917            assert_eq!("e", seq_rx.try_recv().unwrap());
4918            assert_eq!("d", seq_rx.try_recv().unwrap());
4919            assert_eq!("b", seq_rx.try_recv().unwrap());
4920            assert_eq!("c", seq_rx.try_recv().unwrap());
4921            assert_eq!(TryRecvError::Empty, seq_rx.try_recv().unwrap_err());
4922
4923            Ok(())
4924        }
4925
4926        #[tokio::test]
4927        async fn try_for_each_concurrent_control_with_gracefully_ends_when_multiple_functions_return_failure(
4928        ) -> Result<(), Box<dyn std::error::Error>> {
4929            let (fn_graph, mut seq_rx) = complex_graph_unit()?;
4930
4931            let mut resources = Resources::new();
4932            resources.insert(0u8);
4933            resources.insert(0u16);
4934            let resources = &resources;
4935
4936            let (ControlFlow::Continue(_) | ControlFlow::Break(_)) = test_timeout(
4937                Duration::from_millis(150),
4938                Duration::from_millis(197),
4939                fn_graph.try_for_each_concurrent_control_with(None, StreamOpts::new().rev(), |f| {
4940                    let fut = f.call(resources);
4941                    async move {
4942                        match fut.await {
4943                            "b" => ControlFlow::Break(()),
4944                            "c" => ControlFlow::Break(()),
4945                            _ => ControlFlow::Continue(()),
4946                        }
4947                    }
4948                }),
4949            )
4950            .await;
4951
4952            assert_eq!("e", seq_rx.try_recv().unwrap());
4953            assert_eq!("d", seq_rx.try_recv().unwrap());
4954            assert_eq!("b", seq_rx.try_recv().unwrap());
4955            assert_eq!("c", seq_rx.try_recv().unwrap());
4956            assert_eq!(TryRecvError::Empty, seq_rx.try_recv().unwrap_err());
4957
4958            Ok(())
4959        }
4960
4961        #[tokio::test]
4962        async fn try_for_each_concurrent_control_mut_runs_fns_concurrently_mut(
4963        ) -> Result<(), Box<dyn std::error::Error>> {
4964            let (mut fn_graph, mut seq_rx) = complex_graph_unit_mut()?;
4965
4966            let mut resources = Resources::new();
4967            resources.insert(0u8);
4968            resources.insert(0u16);
4969            let resources = &resources;
4970
4971            let (ControlFlow::Continue(_) | ControlFlow::Break(_)) = test_timeout(
4972                Duration::from_millis(200),
4973                Duration::from_millis(265),
4974                fn_graph.try_for_each_concurrent_control_mut(None, |f| {
4975                    let fut = f.call_mut(resources);
4976                    async move {
4977                        let _ = fut.await;
4978                        ControlFlow::<(), ()>::Continue(())
4979                    }
4980                }),
4981            )
4982            .await;
4983
4984            let fn_iter_order = stream::poll_fn(|context| seq_rx.poll_recv(context))
4985                .collect::<Vec<&'static str>>()
4986                .await;
4987
4988            assert_eq!(["f", "a", "c", "b", "d", "e"], fn_iter_order.as_slice());
4989
4990            Ok(())
4991        }
4992
4993        #[tokio::test]
4994        async fn try_for_each_concurrent_control_mut_gracefully_ends_when_one_function_returns_failure(
4995        ) -> Result<(), Box<dyn std::error::Error>> {
4996            let (mut fn_graph, mut seq_rx) = complex_graph_unit_mut()?;
4997
4998            let mut resources = Resources::new();
4999            resources.insert(0u8);
5000            resources.insert(0u16);
5001            let resources = &resources;
5002
5003            let (ControlFlow::Continue(_) | ControlFlow::Break(_)) = test_timeout(
5004                Duration::from_millis(50),
5005                Duration::from_millis(70),
5006                fn_graph.try_for_each_concurrent_control_mut(None, |f| {
5007                    let fut = f.call_mut(resources);
5008                    async move {
5009                        match fut.await {
5010                            "a" => ControlFlow::Break(()),
5011                            _ => ControlFlow::Continue(()),
5012                        }
5013                    }
5014                }),
5015            )
5016            .await;
5017
5018            assert_eq!("f", seq_rx.try_recv().unwrap());
5019            assert_eq!("a", seq_rx.try_recv().unwrap()); assert_eq!(TryRecvError::Empty, seq_rx.try_recv().unwrap_err());
5021
5022            Ok(())
5023        }
5024
5025        #[tokio::test]
5026        async fn try_for_each_concurrent_control_mut_gracefully_ends_when_one_function_returns_failure_variation(
5027        ) -> Result<(), Box<dyn std::error::Error>> {
5028            let (mut fn_graph, mut seq_rx) = complex_graph_unit_mut()?;
5029
5030            let mut resources = Resources::new();
5031            resources.insert(0u8);
5032            resources.insert(0u16);
5033            let resources = &resources;
5034
5035            let (ControlFlow::Continue(_) | ControlFlow::Break(_)) = test_timeout(
5036                Duration::from_millis(100),
5037                Duration::from_millis(130),
5038                fn_graph.try_for_each_concurrent_control_mut(None, |f| {
5039                    let fut = f.call_mut(resources);
5040                    async move {
5041                        match fut.await {
5042                            "c" => ControlFlow::Break(()),
5043                            _ => ControlFlow::Continue(()),
5044                        }
5045                    }
5046                }),
5047            )
5048            .await;
5049
5050            assert_eq!("f", seq_rx.try_recv().unwrap());
5051            assert_eq!("a", seq_rx.try_recv().unwrap());
5052            assert_eq!("c", seq_rx.try_recv().unwrap());
5053            assert_eq!("b", seq_rx.try_recv().unwrap());
5054            assert_eq!(TryRecvError::Empty, seq_rx.try_recv().unwrap_err());
5055
5056            Ok(())
5057        }
5058
5059        #[tokio::test]
5060        async fn try_for_each_concurrent_control_mut_gracefully_ends_when_multiple_functions_return_failure(
5061        ) -> Result<(), Box<dyn std::error::Error>> {
5062            let (mut fn_graph, mut seq_rx) = complex_graph_unit_mut()?;
5063
5064            let mut resources = Resources::new();
5065            resources.insert(0u8);
5066            resources.insert(0u16);
5067            let resources = &resources;
5068
5069            let (ControlFlow::Continue(_) | ControlFlow::Break(_)) = test_timeout(
5070                Duration::from_millis(100),
5071                Duration::from_millis(130),
5072                fn_graph.try_for_each_concurrent_control_mut(None, |f| {
5073                    let fut = f.call_mut(resources);
5074                    async move {
5075                        match fut.await {
5076                            "b" => ControlFlow::Break(()),
5077                            "c" => ControlFlow::Break(()),
5078                            _ => ControlFlow::Continue(()),
5079                        }
5080                    }
5081                }),
5082            )
5083            .await;
5084
5085            assert_eq!("f", seq_rx.try_recv().unwrap());
5086            assert_eq!("a", seq_rx.try_recv().unwrap());
5087            assert_eq!("c", seq_rx.try_recv().unwrap());
5088            assert_eq!("b", seq_rx.try_recv().unwrap());
5089            assert_eq!(TryRecvError::Empty, seq_rx.try_recv().unwrap_err());
5090
5091            Ok(())
5092        }
5093
5094        #[tokio::test]
5095        async fn try_for_each_concurrent_control_mut_with_runs_fns_concurrently_mut(
5096        ) -> Result<(), Box<dyn std::error::Error>> {
5097            let (mut fn_graph, mut seq_rx) = complex_graph_unit_mut()?;
5098
5099            let mut resources = Resources::new();
5100            resources.insert(0u8);
5101            resources.insert(0u16);
5102            let resources = &resources;
5103
5104            let (ControlFlow::Continue(_) | ControlFlow::Break(_)) = test_timeout(
5105                Duration::from_millis(200),
5106                Duration::from_millis(265),
5107                fn_graph.try_for_each_concurrent_control_mut_with(
5108                    None,
5109                    StreamOpts::new().rev(),
5110                    |f| {
5111                        let fut = f.call_mut(resources);
5112                        async move {
5113                            let _ = fut.await;
5114                            ControlFlow::<(), ()>::Continue(())
5115                        }
5116                    },
5117                ),
5118            )
5119            .await;
5120
5121            let fn_iter_order = stream::poll_fn(|context| seq_rx.poll_recv(context))
5122                .collect::<Vec<&'static str>>()
5123                .await;
5124
5125            assert_eq!(["e", "d", "b", "c", "f", "a"], fn_iter_order.as_slice());
5126
5127            Ok(())
5128        }
5129
5130        #[tokio::test]
5131        async fn try_for_each_concurrent_control_mut_with_gracefully_ends_when_one_function_returns_failure(
5132        ) -> Result<(), Box<dyn std::error::Error>> {
5133            let (mut fn_graph, mut seq_rx) = complex_graph_unit_mut()?;
5134
5135            let mut resources = Resources::new();
5136            resources.insert(0u8);
5137            resources.insert(0u16);
5138            let resources = &resources;
5139
5140            let (ControlFlow::Continue(_) | ControlFlow::Break(_)) = test_timeout(
5141                Duration::from_millis(50),
5142                Duration::from_millis(70),
5143                fn_graph.try_for_each_concurrent_control_mut_with(
5144                    None,
5145                    StreamOpts::new().rev(),
5146                    |f| {
5147                        let fut = f.call_mut(resources);
5148                        async move {
5149                            match fut.await {
5150                                "e" => ControlFlow::Break(()),
5151                                _ => ControlFlow::Continue(()),
5152                            }
5153                        }
5154                    },
5155                ),
5156            )
5157            .await;
5158
5159            assert_eq!("e", seq_rx.try_recv().unwrap()); assert_eq!(TryRecvError::Empty, seq_rx.try_recv().unwrap_err());
5161
5162            Ok(())
5163        }
5164
5165        #[tokio::test]
5166        async fn try_for_each_concurrent_control_mut_with_gracefully_ends_when_one_function_returns_failure_variation(
5167        ) -> Result<(), Box<dyn std::error::Error>> {
5168            let (mut fn_graph, mut seq_rx) = complex_graph_unit_mut()?;
5169
5170            let mut resources = Resources::new();
5171            resources.insert(0u8);
5172            resources.insert(0u16);
5173            let resources = &resources;
5174
5175            let (ControlFlow::Continue(_) | ControlFlow::Break(_)) = test_timeout(
5176                Duration::from_millis(150),
5177                Duration::from_millis(197),
5178                fn_graph.try_for_each_concurrent_control_mut_with(
5179                    None,
5180                    StreamOpts::new().rev(),
5181                    |f| {
5182                        let fut = f.call_mut(resources);
5183                        async move {
5184                            match fut.await {
5185                                "b" => ControlFlow::Break(()),
5186                                _ => ControlFlow::Continue(()),
5187                            }
5188                        }
5189                    },
5190                ),
5191            )
5192            .await;
5193
5194            assert_eq!("e", seq_rx.try_recv().unwrap());
5195            assert_eq!("d", seq_rx.try_recv().unwrap());
5196            assert_eq!("b", seq_rx.try_recv().unwrap());
5197            assert_eq!("c", seq_rx.try_recv().unwrap());
5198            assert_eq!(TryRecvError::Empty, seq_rx.try_recv().unwrap_err());
5199
5200            Ok(())
5201        }
5202
5203        #[tokio::test]
5204        async fn try_for_each_concurrent_control_mut_with_gracefully_ends_when_multiple_functions_return_failure(
5205        ) -> Result<(), Box<dyn std::error::Error>> {
5206            let (mut fn_graph, mut seq_rx) = complex_graph_unit_mut()?;
5207
5208            let mut resources = Resources::new();
5209            resources.insert(0u8);
5210            resources.insert(0u16);
5211            let resources = &resources;
5212
5213            let (ControlFlow::Continue(_) | ControlFlow::Break(_)) = test_timeout(
5214                Duration::from_millis(150),
5215                Duration::from_millis(197),
5216                fn_graph.try_for_each_concurrent_control_mut_with(
5217                    None,
5218                    StreamOpts::new().rev(),
5219                    |f| {
5220                        let fut = f.call_mut(resources);
5221                        async move {
5222                            match fut.await {
5223                                "b" => ControlFlow::Break(()),
5224                                "c" => ControlFlow::Break(()),
5225                                _ => ControlFlow::Continue(()),
5226                            }
5227                        }
5228                    },
5229                ),
5230            )
5231            .await;
5232
5233            assert_eq!("e", seq_rx.try_recv().unwrap());
5234            assert_eq!("d", seq_rx.try_recv().unwrap());
5235            assert_eq!("b", seq_rx.try_recv().unwrap());
5236            assert_eq!("c", seq_rx.try_recv().unwrap());
5237            assert_eq!(TryRecvError::Empty, seq_rx.try_recv().unwrap_err());
5238
5239            Ok(())
5240        }
5241
5242        async fn test_timeout<T, Fut>(lower: Duration, upper: Duration, fut: Fut) -> T
5243        where
5244            Fut: Future<Output = T>,
5245        {
5246            let timestamp_begin = Instant::now();
5247            let t = tokio::time::timeout(Duration::from_millis(750), fut)
5248                .await
5249                .unwrap();
5250            let duration_elapsed = timestamp_begin.elapsed();
5251
5252            assert!(
5255                duration_elapsed < upper,
5256                "expected duration to be less than {} ms, duration was {} ms",
5257                upper.as_millis(),
5258                duration_elapsed.as_millis()
5259            );
5260            assert!(
5261                duration_elapsed > lower,
5262                "expected duration to be more than {} ms, duration was {} ms",
5263                lower.as_millis(),
5264                duration_elapsed.as_millis()
5265            );
5266
5267            t
5268        }
5269
5270        type BoxFnRes = Box<dyn FnRes<Ret = BoxFuture<'static, &'static str>>>;
5271        fn complex_graph_unit(
5272        ) -> Result<(FnGraph<BoxFnRes>, Receiver<&'static str>), WouldCycle<Edge>> {
5273            let (seq_tx, seq_rx) = mpsc::channel(6);
5284            let seq_tx_a = seq_tx.clone();
5287            let seq_tx_b = seq_tx.clone();
5288            let seq_tx_c = seq_tx.clone();
5289            let seq_tx_d = seq_tx.clone();
5290            let seq_tx_e = seq_tx.clone();
5291            let seq_tx_f = seq_tx;
5292
5293            let mut fn_graph_builder = FnGraphBuilder::new();
5294            let [fn_id_a, fn_id_b, fn_id_c, fn_id_d, fn_id_e, fn_id_f] =
5295                fn_graph_builder.add_fns([
5296                    (move |_: &u8| -> BoxFuture<'_, &'static str> {
5297                        seq_tx_a
5298                            .try_send("a")
5299                            .expect("Failed to send sequence `a`.");
5300                        async {
5301                            time::sleep(sleep_duration!()).await;
5302                            "a"
5303                        }
5304                        .boxed()
5305                    })
5306                    .into_fn_res(),
5307                    (move |_: &mut u16| -> BoxFuture<'_, &'static str> {
5308                        seq_tx_b
5309                            .try_send("b")
5310                            .expect("Failed to send sequence `b`.");
5311                        async {
5312                            time::sleep(sleep_duration!()).await;
5313                            "b"
5314                        }
5315                        .boxed()
5316                    })
5317                    .into_fn_res(),
5318                    (move || -> BoxFuture<'_, &'static str> {
5319                        seq_tx_c
5320                            .try_send("c")
5321                            .expect("Failed to send sequence `c`.");
5322                        async {
5323                            time::sleep(sleep_duration!()).await;
5324                            "c"
5325                        }
5326                        .boxed()
5327                    })
5328                    .into_fn_res(),
5329                    (move |_: &u8, _: &mut u16| -> BoxFuture<'_, &'static str> {
5330                        seq_tx_d
5331                            .try_send("d")
5332                            .expect("Failed to send sequence `d`.");
5333                        async {
5334                            time::sleep(sleep_duration!()).await;
5335                            "d"
5336                        }
5337                        .boxed()
5338                    })
5339                    .into_fn_res(),
5340                    (move || -> BoxFuture<'_, &'static str> {
5341                        seq_tx_e
5342                            .try_send("e")
5343                            .expect("Failed to send sequence `e`.");
5344                        async {
5345                            time::sleep(sleep_duration!()).await;
5346                            "e"
5347                        }
5348                        .boxed()
5349                    })
5350                    .into_fn_res(),
5351                    (move |_: &mut u16| -> BoxFuture<'_, &'static str> {
5352                        seq_tx_f
5353                            .try_send("f")
5354                            .expect("Failed to send sequence `f`.");
5355                        async {
5356                            time::sleep(sleep_duration!()).await;
5357                            "f"
5358                        }
5359                        .boxed()
5360                    })
5361                    .into_fn_res(),
5362                ]);
5363            fn_graph_builder.add_logic_edges([
5364                (fn_id_a, fn_id_b),
5365                (fn_id_a, fn_id_c),
5366                (fn_id_b, fn_id_e),
5367                (fn_id_c, fn_id_d),
5368                (fn_id_d, fn_id_e),
5369                (fn_id_f, fn_id_e),
5370            ])?;
5371            let fn_graph = fn_graph_builder.build();
5372            Ok((fn_graph, seq_rx))
5373        }
5374
        /// Boxed [`FnResMut`] trait object whose `Ret` type is a boxed future that resolves to
        /// a `&'static str`.
        type BoxFnResMut = Box<dyn FnResMut<Ret = BoxFuture<'static, &'static str>>>;
5376
5377        fn complex_graph_unit_mut(
5378        ) -> Result<(FnGraph<BoxFnResMut>, Receiver<&'static str>), WouldCycle<Edge>> {
5379            let (seq_tx, seq_rx) = mpsc::channel(6);
5390            let mut seq_tx_a = Some(seq_tx.clone());
5393            let mut seq_tx_b = Some(seq_tx.clone());
5394            let mut seq_tx_c = Some(seq_tx.clone());
5395            let mut seq_tx_d = Some(seq_tx.clone());
5396            let mut seq_tx_e = Some(seq_tx.clone());
5397            let mut seq_tx_f = Some(seq_tx);
5398
5399            let mut fn_graph_builder = FnGraphBuilder::new();
5400            let [fn_id_a, fn_id_b, fn_id_c, fn_id_d, fn_id_e, fn_id_f] =
5401                fn_graph_builder.add_fns([
5402                    (move |_: &u8| -> BoxFuture<&'static str> {
5403                        if let Some(seq_tx_a) = seq_tx_a.take() {
5404                            seq_tx_a
5405                                .try_send("a")
5406                                .expect("Failed to send sequence `a`.");
5407                        }
5408                        Box::pin(async {
5409                            time::sleep(sleep_duration!()).await;
5410                            "a"
5411                        })
5412                    })
5413                    .into_fn_res_mut(),
5414                    (move |_: &mut u16| -> BoxFuture<&'static str> {
5415                        if let Some(seq_tx_b) = seq_tx_b.take() {
5416                            seq_tx_b
5417                                .try_send("b")
5418                                .expect("Failed to send sequence `b`.");
5419                        }
5420                        Box::pin(async {
5421                            time::sleep(sleep_duration!()).await;
5422                            "b"
5423                        })
5424                    })
5425                    .into_fn_res_mut(),
5426                    (move || -> BoxFuture<&'static str> {
5427                        if let Some(seq_tx_c) = seq_tx_c.take() {
5428                            seq_tx_c
5429                                .try_send("c")
5430                                .expect("Failed to send sequence `c`.");
5431                        }
5432                        Box::pin(async {
5433                            time::sleep(sleep_duration!()).await;
5434                            "c"
5435                        })
5436                    })
5437                    .into_fn_res_mut(),
5438                    (move |_: &u8, _: &mut u16| -> BoxFuture<&'static str> {
5439                        if let Some(seq_tx_d) = seq_tx_d.take() {
5440                            seq_tx_d
5441                                .try_send("d")
5442                                .expect("Failed to send sequence `d`.");
5443                        }
5444                        Box::pin(async {
5445                            time::sleep(sleep_duration!()).await;
5446                            "d"
5447                        })
5448                    })
5449                    .into_fn_res_mut(),
5450                    (move || -> BoxFuture<&'static str> {
5451                        if let Some(seq_tx_e) = seq_tx_e.take() {
5452                            seq_tx_e
5453                                .try_send("e")
5454                                .expect("Failed to send sequence `e`.");
5455                        }
5456                        Box::pin(async {
5457                            time::sleep(sleep_duration!()).await;
5458                            "e"
5459                        })
5460                    })
5461                    .into_fn_res_mut(),
5462                    (move |_: &mut u16| -> BoxFuture<&'static str> {
5463                        if let Some(seq_tx_f) = seq_tx_f.take() {
5464                            seq_tx_f
5465                                .try_send("f")
5466                                .expect("Failed to send sequence `f`.");
5467                        }
5468                        Box::pin(async {
5469                            time::sleep(sleep_duration!()).await;
5470                            "f"
5471                        })
5472                    })
5473                    .into_fn_res_mut(),
5474                ]);
5475            fn_graph_builder.add_contains_edges([
5476                (fn_id_a, fn_id_b),
5477                (fn_id_a, fn_id_c),
5478                (fn_id_b, fn_id_e),
5479                (fn_id_c, fn_id_d),
5480                (fn_id_d, fn_id_e),
5481                (fn_id_f, fn_id_e),
5482            ])?;
5483            let fn_graph = fn_graph_builder.build();
5484            Ok((fn_graph, seq_rx))
5485        }
5486
5487        #[derive(Debug, PartialEq, Eq)]
5488        struct TestError(&'static str);
5489
5490        impl fmt::Display for TestError {
5491            #[cfg_attr(coverage_nightly, coverage(off))]
5492            fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
5493                self.0.fmt(f)
5494            }
5495        }
5496
5497        impl std::error::Error for TestError {}
5498    }
5499
5500    fn complex_graph() -> Result<FnGraph<Box<dyn FnRes<Ret = &'static str>>>, WouldCycle<Edge>> {
5501        let mut fn_graph_builder = FnGraphBuilder::new();
5511        let [fn_id_a, fn_id_b, fn_id_c, fn_id_d, fn_id_e, fn_id_f] = fn_graph_builder.add_fns([
5512            (|_: &u8| "a").into_fn_res(),
5513            (|_: &mut u16| "b").into_fn_res(),
5514            (|| "c").into_fn_res(),
5515            (|_: &u8, _: &mut u16| "d").into_fn_res(),
5516            (|| "e").into_fn_res(),
5517            (|_: &mut u16| "f").into_fn_res(),
5518        ]);
5519        fn_graph_builder.add_logic_edges([
5520            (fn_id_a, fn_id_b),
5521            (fn_id_a, fn_id_c),
5522            (fn_id_b, fn_id_e),
5523            (fn_id_c, fn_id_d),
5524            (fn_id_d, fn_id_e),
5525            (fn_id_f, fn_id_e),
5526        ])?;
5527        let fn_graph = fn_graph_builder.build();
5528        Ok(fn_graph)
5529    }
5530}