crossflow/
chain.rs

1/*
2 * Copyright (C) 2023 Open Source Robotics Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *     http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 *
16*/
17
18use std::future::Future;
19
20use bevy_ecs::prelude::Entity;
21
22use smallvec::SmallVec;
23
24use std::error::Error;
25
26use crate::{
27    make_option_branching, make_result_branching, Accessing, AddOperation, AsMap, Buffer,
28    BufferKey, BufferKeys, Bufferable, Buffering, Builder, Collect, CreateCancelFilter,
29    CreateDisposalFilter, ForkTargetStorage, Gate, GateRequest, InputSlot, IntoAsyncMap,
30    IntoBlockingCallback, IntoBlockingMap, Node, Noop, OperateBufferAccess, OperateCancel,
31    OperateDynamicGate, OperateQuietCancel, OperateSplit, OperateStaticGate, Output, ProvideOnce,
32    Provider, Scope, ScopeSettings, Sendish, ServiceInstructions, Spread, StreamPack,
33    StreamTargetMap, Trim, TrimBranch, UnusedTarget,
34};
35
36pub mod fork_clone_builder;
37pub use fork_clone_builder::*;
38
39pub(crate) mod premade;
40use premade::*;
41
42pub mod split;
43pub use split::*;
44
45pub mod unzip;
46pub use unzip::*;
47
/// Chain operations onto the output of a workflow node.
///
/// Make sure to use [`Self::connect`] when you're done chaining so that the
/// final output of the chain gets connected into another node. If the final
/// output of the chain is meant to be the final output of your workflow then
/// you should connect it to [`Scope::terminate`].
#[must_use]
pub struct Chain<'w, 's, 'a, 'b, T> {
    // The entity that receives the output of the most recent link in the
    // chain. Each chaining operation attaches itself to this target and then
    // spawns a fresh target for the next link.
    target: Entity,
    // The workflow builder that this chain queues its operations into.
    builder: &'b mut Builder<'w, 's, 'a>,
    // Marks the message type `T` without storing a value of it. Using
    // `fn(T)` means the struct does not own a `T`.
    _ignore: std::marker::PhantomData<fn(T)>,
}
60
impl<'w, 's, 'a, 'b, T: 'static + Send + Sync> Chain<'w, 's, 'a, 'b, T> {
    /// Get the raw [`Output`] slot for the current link in the chain. You can
    /// use this to resume building this chain later.
    ///
    /// Note that if you do not connect some path of your workflow into the
    /// `terminate` slot of your [`Scope`] then the workflow will not be able
    /// to run.
    pub fn output(self) -> Output<T> {
        Output::new(self.scope(), self.target)
    }

    /// Get the raw [`Output`] slot for the current link in the chain, along with
    /// the builder. This can be used to do more complex building inside of
    /// chain builder functions.
    pub fn unpack(self) -> (Output<T>, &'b mut Builder<'w, 's, 'a>) {
        (Output::new(self.scope(), self.target), self.builder)
    }

    /// Connect this output into an input slot.
    ///
    /// Pass a [terminate](crate::Scope::terminate) into this function to
    /// end a chain.
    pub fn connect(self, input: InputSlot<T>) {
        let output = Output::new(self.scope(), self.target);
        self.builder.connect(output, input)
    }

    /// Explicitly terminate the chain by indicating that you want it to remain
    /// unused.
    pub fn unused(self) {
        // Do nothing. Consuming `self` is enough to end the chain; the target
        // entity keeps its `UnusedTarget` marker.
    }

    /// Connect the response at the end of the chain into a new provider. Get
    /// the response of the new provider as a chain so you can continue chaining
    /// operations.
    pub fn then<P: Provider<Request = T>>(self, provider: P) -> Chain<'w, 's, 'a, 'b, P::Response>
    where
        P::Response: 'static + Send + Sync,
    {
        let source = self.target;
        // Spawn a placeholder entity for the provider's output. It stays
        // marked unused until the next link in the chain claims it.
        let target = self.builder.commands.spawn(UnusedTarget).id();
        provider.connect(
            Some(self.builder.scope()),
            source,
            target,
            self.builder.commands,
        );
        Chain::new(target, self.builder)
    }

    /// Connect the response in the chain into a new provider. Get the node
    /// slots that wrap around the new provider.
    pub fn then_node<P: Provider<Request = T>>(
        self,
        provider: P,
    ) -> Node<T, P::Response, P::Streams>
    where
        P::Response: 'static + Send + Sync,
        P::Streams: StreamPack,
    {
        let source = self.target;
        let target = self.builder.commands.spawn(UnusedTarget).id();
        provider.connect(Some(self.scope()), source, target, self.builder.commands);

        // Spawn the stream outputs of the provider and record their targets
        // on the source entity so they can be routed at runtime.
        let mut map = StreamTargetMap::default();
        let streams =
            <P::Streams as StreamPack>::spawn_node_streams(source, &mut map, self.builder);
        self.builder.commands.entity(source).insert(map);
        Node {
            input: InputSlot::new(self.builder.scope(), source),
            output: Output::new(self.builder.scope(), target),
            streams,
        }
    }

    /// Apply a function whose input is [`BlockingMap<T>`](crate::BlockingMap)
    /// or [`AsyncMap<T>`](crate::AsyncMap).
    pub fn map<M, F: AsMap<M>>(
        self,
        f: F,
    ) -> Chain<'w, 's, 'a, 'b, <F::MapType as ProvideOnce>::Response>
    where
        F::MapType: Provider<Request = T>,
        <F::MapType as ProvideOnce>::Response: 'static + Send + Sync,
    {
        self.then(f.as_map())
    }

    /// Same as [`Self::map`] but receive the new node instead of continuing a
    /// chain.
    pub fn map_node<M, F: AsMap<M>>(
        self,
        f: F,
    ) -> Node<T, <F::MapType as ProvideOnce>::Response, <F::MapType as ProvideOnce>::Streams>
    where
        F::MapType: Provider<Request = T>,
        <F::MapType as ProvideOnce>::Response: 'static + Send + Sync,
        <F::MapType as ProvideOnce>::Streams: StreamPack,
    {
        self.then_node(f.as_map())
    }

    /// Apply a function whose input is the Response of the current Chain. The
    /// output of the map will be the Response of the returned Chain.
    ///
    /// This takes in a regular blocking function rather than an async function,
    /// so while the function is executing, it will block all systems and all
    /// other workflows from running.
    pub fn map_block<U>(
        self,
        f: impl FnMut(T) -> U + 'static + Send + Sync,
    ) -> Chain<'w, 's, 'a, 'b, U>
    where
        U: 'static + Send + Sync,
    {
        self.then(f.into_blocking_map())
    }

    /// Same as [`Self::map_block`] but receive the new node instead of
    /// continuing a chain.
    pub fn map_block_node<U>(self, f: impl FnMut(T) -> U + 'static + Send + Sync) -> Node<T, U, ()>
    where
        U: 'static + Send + Sync,
    {
        self.then_node(f.into_blocking_map())
    }

    /// Apply a map whose output is a Future that will be run in the
    /// [`AsyncComputeTaskPool`](bevy_tasks::AsyncComputeTaskPool) (unless
    /// the `single_threaded_async` feature is active). The output of the Future
    /// will be the Response of the returned Chain.
    pub fn map_async<Task>(
        self,
        f: impl FnMut(T) -> Task + 'static + Send + Sync,
    ) -> Chain<'w, 's, 'a, 'b, Task::Output>
    where
        Task: Future + 'static + Sendish,
        Task::Output: 'static + Send + Sync,
    {
        self.then(f.into_async_map())
    }

    /// Same as [`Self::map_async`] but receive the new node instead of
    /// continuing a chain.
    pub fn map_async_node<Task>(
        self,
        f: impl FnMut(T) -> Task + 'static + Send + Sync,
    ) -> Node<T, Task::Output, ()>
    where
        Task: Future + 'static + Sendish,
        Task::Output: 'static + Send + Sync,
    {
        self.then_node(f.into_async_map())
    }

    /// Build a workflow scope to be used as an element in this chain.
    ///
    /// If you want to connect to the stream outputs or be able to loop back
    /// to the input of this scope, use [`Self::then_scope_node`] instead.
    pub fn then_scope<Response, Streams, Settings>(
        self,
        build: impl FnOnce(Scope<T, Response, Streams>, &mut Builder) -> Settings,
    ) -> Chain<'w, 's, 'a, 'b, Response>
    where
        Response: 'static + Send + Sync,
        Streams: StreamPack,
        Settings: Into<ScopeSettings>,
    {
        let exit_scope = self.builder.commands.spawn(UnusedTarget).id();
        self.builder
            .create_scope_impl::<T, Response, Streams, Settings>(self.target, exit_scope, build)
            .output
            .chain(self.builder)
    }

    /// Simplified version of [`Self::then_scope`] limited to a simple input and
    /// output.
    ///
    /// Unlike `then_scope`, this function can infer the types for the generics
    /// so you don't need to explicitly specify them.
    pub fn then_io_scope<Response, Settings>(
        self,
        build: impl FnOnce(Scope<T, Response>, &mut Builder) -> Settings,
    ) -> Chain<'w, 's, 'a, 'b, Response>
    where
        Response: 'static + Send + Sync,
        Settings: Into<ScopeSettings>,
    {
        self.then_scope(build)
    }

    /// From the current target in the chain, build a [scoped](Scope) workflow
    /// and then get back a node that represents that scoped workflow.
    pub fn then_scope_node<Response, Streams, Settings>(
        self,
        build: impl FnOnce(Scope<T, Response, Streams>, &mut Builder) -> Settings,
    ) -> Node<T, Response, Streams>
    where
        Response: 'static + Send + Sync,
        Streams: StreamPack,
        Settings: Into<ScopeSettings>,
    {
        let exit_scope = self.builder.commands.spawn(UnusedTarget).id();
        self.builder
            .create_scope_impl::<T, Response, Streams, Settings>(self.target, exit_scope, build)
    }

    /// Simplified version of [`Self::then_scope_node`] limited to a simple
    /// input and output.
    ///
    /// Unlike `then_scope_node`, this function can infer the types for the
    /// generics so you don't need to explicitly specify them.
    pub fn then_io_scope_node<Response, Settings>(
        self,
        build: impl FnOnce(Scope<T, Response>, &mut Builder) -> Settings,
    ) -> Node<T, Response, ()>
    where
        Response: 'static + Send + Sync,
        Settings: Into<ScopeSettings>,
    {
        self.then_scope_node(build)
    }

    /// Many services just need to be triggered without being given any
    /// particular input. The convention for those services is to take an input
    /// of `()`.
    ///
    /// We call this "triggering", and this function is a convenient way to turn
    /// any output type into a trigger.
    pub fn trigger(self) -> Chain<'w, 's, 'a, 'b, ()> {
        self.map_block(|_| ())
    }

    /// Combine the output with access to some buffers. You must specify one or
    /// more buffers to access. For multiple buffers, combine them into a tuple
    /// or an [`Iterator`]. Tuples of buffers can be nested inside each other.
    ///
    /// Other [outputs](Output) can also be passed in as buffers. Those outputs
    /// will be transformed into a buffer with default buffer settings.
    ///
    /// To obtain a set of buffer keys each time a buffer is modified, use
    /// [`listen`](crate::Accessible::listen).
    pub fn with_access<B>(self, buffers: B) -> Chain<'w, 's, 'a, 'b, (T, BufferKeys<B>)>
    where
        B: Bufferable,
        B::BufferType: Accessing,
    {
        let buffers = buffers.into_buffer(self.builder);
        // Buffers from a different scope cannot be accessed here; this panics
        // (or errors) early instead of producing a broken workflow.
        buffers.verify_scope(self.builder.scope());

        let source = self.target;
        let target = self.builder.commands.spawn(UnusedTarget).id();
        self.builder.commands.queue(AddOperation::new(
            Some(self.builder.scope()),
            source,
            OperateBufferAccess::<T, B::BufferType>::new(buffers, target),
        ));

        Chain::new(target, self.builder)
    }

    /// After the previous operation is finished, trigger a new operation whose
    /// input is simply the access keys for one or more buffers.
    pub fn then_access<B>(self, buffers: B) -> Chain<'w, 's, 'a, 'b, BufferKeys<B>>
    where
        B: Bufferable,
        B::BufferType: Accessing,
    {
        // Attach the keys alongside the value, then discard the value.
        self.with_access(buffers).map_block(|(_, key)| key)
    }

    /// Apply a [`Provider`] that filters the response by returning an [`Option`].
    /// If the filter returns [`None`] then a [`Cancellation`](crate::Cancellation)
    /// is triggered. Otherwise the chain continues with the value that was
    /// inside [`Some`].
    ///
    /// This is conceptually similar to [`Iterator::filter_map`]. You can also
    /// use [`Chain::disposal_filter`] to dispose of the value instead of
    /// cancelling the entire scope.
    pub fn cancellation_filter<ThenResponse, F>(
        self,
        filter_provider: F,
    ) -> Chain<'w, 's, 'a, 'b, ThenResponse>
    where
        ThenResponse: 'static + Send + Sync,
        F: Provider<Request = T, Response = Option<ThenResponse>>,
        F::Response: 'static + Send + Sync,
        F::Streams: StreamPack,
    {
        self.then(filter_provider).cancel_on_none()
    }

    /// When the chain reaches this point, cancel the workflow and include
    /// information about the value that triggered the cancellation. The input
    /// type must implement [`ToString`].
    ///
    /// If you want to trigger a cancellation with a type that does not
    /// implement [`ToString`] then use [`Self::trigger`] and then
    /// [`Self::then_quiet_cancel`].
    pub fn then_cancel(self)
    where
        T: ToString,
    {
        // This operation has no downstream target: reaching it ends the
        // workflow, so the chain is consumed without returning a new link.
        self.builder.commands.queue(AddOperation::new(
            Some(self.scope()),
            self.target,
            OperateCancel::<T>::new(),
        ));
    }

    /// Same as [`Chain::cancellation_filter`] but the chain will be disposed
    /// instead of cancelled, so the workflow may continue if the termination
    /// node can still be reached.
    pub fn disposal_filter<ThenResponse, F>(
        self,
        filter_provider: F,
    ) -> Chain<'w, 's, 'a, 'b, ThenResponse>
    where
        ThenResponse: 'static + Send + Sync,
        F: Provider<Request = T, Response = Option<ThenResponse>>,
        F::Response: 'static + Send + Sync,
        F::Streams: StreamPack,
    {
        self.then(filter_provider).dispose_on_none()
    }

    /// When the response is delivered, we will make a clone of it and
    /// simultaneously pass that clone along two different branch chains: one
    /// determined by the `build` function passed into this operation and the
    /// other determined by the [`Chain`] that gets returned.
    ///
    /// This can only be applied when `Response` can be cloned.
    ///
    /// See also [`Chain::fork_clone`]
    pub fn branch_clone(self, build: impl FnOnce(Chain<T>)) -> Chain<'w, 's, 'a, 'b, T>
    where
        T: Clone,
    {
        // Fork into two clones: the first branch's output is returned as the
        // continuation of this chain, the second is handed to `build`.
        Chain::<T>::new(self.target, self.builder)
            .fork_clone((|chain: Chain<T>| chain.output(), build))
            .0
            .chain(self.builder)
    }

    /// When the response is delivered, we will make clones of it and
    /// simultaneously pass that clone along multiple branches, each one
    /// determined by a different element of the tuple that gets passed in as
    /// a builder.
    ///
    /// The return values of the individual chain builders will be zipped into
    /// one tuple return value by this function. If all of the builders return
    /// [`Output`] then you can easily continue chaining more operations using
    /// [`join`](crate::Joinable::join), or destructure them into individual
    /// outputs that you can continue to build with.
    pub fn fork_clone<Build: ForkCloneBuilder<T>>(self, build: Build) -> Build::Outputs
    where
        T: Clone,
    {
        build.build_fork_clone(Output::new(self.scope(), self.target), self.builder)
    }

    /// If you have a `Chain<(A, B, C, ...)>` with a tuple response then
    /// `unzip` allows you to convert it into a tuple of chains:
    /// `(Output<A>, Output<B>, Output<C>, ...)`.
    ///
    /// You can also consider using `fork_unzip` to continue building each
    /// chain in the tuple independently by providing a builder function for
    /// each element of the tuple.
    pub fn unzip(self) -> T::Unzipped
    where
        T: Unzippable,
    {
        T::unzip_output(Output::<T>::new(self.scope(), self.target), self.builder)
    }

    /// If you have a `Chain<(A, B, C, ...)>` with a tuple response then
    /// `fork_unzip` allows you to split it into multiple chains (one for each
    /// tuple element) and apply a separate builder function to each chain. You
    /// will be passed back the zipped return values of all the builder functions.
    pub fn fork_unzip<Build>(self, build: Build) -> Build::ReturnType
    where
        Build: UnzipBuilder<T>,
    {
        build.fork_unzip(Output::<T>::new(self.scope(), self.target), self.builder)
    }

    /// If `T` implements [`Iterator`] then you can fire off each of its elements
    /// as a new thread within the workflow. Each thread will still have the same
    /// session ID.
    ///
    /// This is similar to streams which can produce multiple outputs, which also
    /// potentially creates multiple threads in the workflow. If the input to
    /// the spread operator is empty, then a disposal notice will go out, and
    /// the workflow will be cancelled if it is no longer possible to reach the
    /// terminal node.
    pub fn spread(self) -> Chain<'w, 's, 'a, 'b, T::Item>
    where
        T: IntoIterator,
        T::Item: 'static + Send + Sync,
    {
        let source = self.target;
        let target = self.builder.commands.spawn(UnusedTarget).id();
        self.builder.commands.queue(AddOperation::new(
            Some(self.builder.scope()),
            source,
            Spread::<T>::new(target),
        ));

        Chain::new(target, self.builder)
    }

    /// Collect incoming workflow threads into a container.
    ///
    /// If `max` is specified, the collection will always be sent out once it
    /// reaches that maximum value.
    ///
    /// If `min` is greater than 0 then the collection will not be sent out unless
    /// it is equal to or greater than that value. Note that this means the
    /// collect operation could force the workflow into cancelling if it cannot
    /// reach the minimum number of elements. A `min` of 0 means that if an
    /// upstream thread is disposed and the collect node is no longer reachable
    /// then it will fire off with an empty collection.
    ///
    /// If the `min` limit is satisfied and there are no remaining workflow
    /// threads that can reach this collect operation, then the collection will
    /// be sent out with however many elements it happens to have collected.
    ///
    /// # Panics
    ///
    /// Panics if `max` is `Some(0)` or if `max` is less than `min`.
    pub fn collect<const N: usize>(
        self,
        min: usize,
        max: Option<usize>,
    ) -> Chain<'w, 's, 'a, 'b, SmallVec<[T; N]>> {
        // A max of 0 could never emit, and min > max could never be
        // satisfied, so both are rejected at build time.
        if let Some(max) = max {
            assert!(0 < max);
            assert!(min <= max);
        }

        let source = self.target;
        let target = self.builder.commands.spawn(UnusedTarget).id();
        self.builder.commands.queue(AddOperation::new(
            Some(self.builder.scope()),
            source,
            Collect::<T, N>::new(target, min, max),
        ));

        Chain::new(target, self.builder)
    }

    /// Collect all workflow threads that are moving towards this node.
    pub fn collect_all<const N: usize>(self) -> Chain<'w, 's, 'a, 'b, SmallVec<[T; N]>> {
        self.collect::<N>(0, None)
    }

    /// Collect an exact number of threads that are moving towards this node.
    ///
    /// NOTE(review): passing `n == 0` will panic inside [`Self::collect`]
    /// because `max` may not be zero — confirm whether that is intended.
    pub fn collect_n<const N: usize>(self, n: usize) -> Chain<'w, 's, 'a, 'b, SmallVec<[T; N]>> {
        self.collect::<N>(n, Some(n))
    }

    // TODO(@mxgrey): We could offer a collect_array that always collects exactly
    // N, defined at compile time, and outputs a fixed-size array. This will require
    // a second implementation of Collect, or a more generic implementation of it.

    /// Run a trimming operation when the workflow reaches this point.
    ///
    /// See also: [`Self::then_trim_node`], [`Builder::create_trim`].
    pub fn then_trim(
        self,
        branches: impl IntoIterator<Item = TrimBranch>,
    ) -> Chain<'w, 's, 'a, 'b, T> {
        let branches: SmallVec<[_; 16]> = branches.into_iter().collect();
        // Every branch must belong to this chain's scope; fail at build time
        // rather than producing a workflow that trims across scopes.
        for branch in &branches {
            branch.verify_scope(self.builder.scope());
        }

        let source = self.target;
        let target = self.builder.commands.spawn(UnusedTarget).id();
        self.builder.commands.queue(AddOperation::new(
            Some(self.builder.scope()),
            source,
            Trim::<T>::new(branches, target),
        ));

        Chain::new(target, self.builder)
    }

    /// Run a trimming operation when the workflow reaches this point. This will
    /// return a [`Node`] so you can connect other inputs into the operation.
    ///
    /// See also: [`Self::then_trim`], [`Builder::create_trim`].
    pub fn then_trim_node(self, branches: impl IntoIterator<Item = TrimBranch>) -> Node<T, T> {
        // Capture the source and scope before `self` is consumed by then_trim.
        let source = self.target;
        let scope = self.builder.scope();
        let target = self.then_trim(branches).output().id();
        Node {
            input: InputSlot::new(scope, source),
            output: Output::new(scope, target),
            streams: (),
        }
    }

    /// Push the value into a buffer then emit a trigger once the value is
    /// inside the buffer.
    ///
    /// If the buffer is broken (e.g. its operation has been despawned) the
    /// workflow will be cancelled.
    ///
    /// # Panics
    ///
    /// Panics if the buffer does not belong to the same scope as this chain.
    pub fn then_push(self, buffer: Buffer<T>) -> Chain<'w, 's, 'a, 'b, ()> {
        assert_eq!(self.scope(), buffer.scope());
        self.with_access(buffer)
            .then(push_into_buffer.into_blocking_callback())
            .cancel_on_err()
    }

    /// Apply a [gate action](Gate) to one or more buffers at this point
    /// in the workflow.
    pub fn then_gate_action<B>(self, action: Gate, buffers: B) -> Chain<'w, 's, 'a, 'b, T>
    where
        B: Bufferable,
    {
        let buffers = buffers.into_buffer(self.builder);
        buffers.verify_scope(self.builder.scope());

        let source = self.target;
        let target = self.builder.commands.spawn(UnusedTarget).id();
        self.builder.commands.queue(AddOperation::new(
            Some(self.builder.scope()),
            source,
            OperateStaticGate::<T, _>::new(buffers, target, action),
        ));

        Chain::new(target, self.builder)
    }

    /// [Open the gates](Gate) of one or more buffers at this point in the
    /// workflow.
    pub fn then_gate_open<B>(self, buffers: B) -> Chain<'w, 's, 'a, 'b, T>
    where
        B: Bufferable,
    {
        self.then_gate_action(Gate::Open, buffers)
    }

    /// [Close the gates](Gate) of one or more buffers at this point in
    /// the workflow.
    pub fn then_gate_close<B>(self, buffers: B) -> Chain<'w, 's, 'a, 'b, T>
    where
        B: Bufferable,
    {
        self.then_gate_action(Gate::Closed, buffers)
    }

    /// If the chain's response implements the [`Future`] trait, applying
    /// `.flatten()` to the chain will yield the output of that Future as the
    /// chain's response.
    pub fn flatten(self) -> Chain<'w, 's, 'a, 'b, T::Output>
    where
        T: Future,
        T::Output: 'static + Send + Sync,
    {
        // The identity async map awaits the future and forwards its output.
        self.map_async(|r| r)
    }

    /// If the chain's response implements the [`Splittable`] trait, then this
    /// will insert a split operation and provide your `build` function with the
    /// [`SplitBuilder`] for it. This returns the return value of your build
    /// function.
    pub fn split<U>(self, build: impl FnOnce(SplitBuilder<'w, 's, 'a, 'b, T>) -> U) -> U
    where
        T: Splittable,
    {
        let source = self.target;
        self.builder.commands.queue(AddOperation::new(
            Some(self.builder.scope()),
            source,
            OperateSplit::<T>::default(),
        ));

        build(SplitBuilder::new(source, self.builder))
    }

    /// If the chain's response implements the [`Splittable`] trait, then this
    /// will insert a split and provide a container for its available outputs.
    /// To build connections to these outputs later, use [`SplitOutputs::build`].
    ///
    /// This is equivalent to
    /// ```text
    /// .split(|split| split.outputs())
    /// ```
    pub fn split_outputs(self) -> SplitOutputs<T>
    where
        T: Splittable,
    {
        self.split(|b| b.outputs())
    }

    /// If the chain's response can be turned into an iterator with an appropriate
    /// item type, this will allow it to be split in a list-like way.
    ///
    /// This is equivalent to
    /// ```text
    /// .map_block(SplitAsList::new).split(build)
    /// ```
    pub fn split_as_list<U>(
        self,
        build: impl FnOnce(SplitBuilder<'w, 's, 'a, 'b, SplitAsList<T>>) -> U,
    ) -> U
    where
        T: IntoIterator,
        T::Item: 'static + Send + Sync,
    {
        self.map_block(SplitAsList::new).split(build)
    }

    /// If the chain's response can be turned into an iterator with an appropriate
    /// item type, this will insert a split and provide a container for its
    /// available outputs. To build connections to these outputs later, use
    /// [`SplitOutputs::build`].
    ///
    /// This is equivalent to
    /// ```text
    /// .split_as_list(|split| split.outputs())
    /// ```
    pub fn split_as_list_outputs(self) -> SplitOutputs<SplitAsList<T>>
    where
        T: IntoIterator,
        T::Item: 'static + Send + Sync,
    {
        self.split_as_list(|b| b.outputs())
    }

    /// Add a [no-op][1] to the current end of the chain.
    ///
    /// As the name suggests, a no-op will not actually do anything, but it adds
    /// a new operation point into the workflow. That operation point could be
    /// used as a reference point for other operations, like
    /// [trimming](Self::then_trim).
    ///
    /// [1]: <https://en.wikipedia.org/wiki/NOP_(code)>
    pub fn noop(self) -> Chain<'w, 's, 'a, 'b, T> {
        let source = self.target;
        let target = self.builder.commands.spawn(UnusedTarget).id();

        self.builder.commands.queue(AddOperation::new(
            Some(self.scope()),
            source,
            Noop::<T>::new(target),
        ));
        Chain::new(target, self.builder)
    }

    /// Get a whole node that is simply a [no-op](Self::noop).
    pub fn noop_node(self) -> Node<T, T> {
        // Capture the source and scope before `self` is consumed by noop.
        let source = self.target;
        let scope = self.builder.scope();
        let target = self.noop().output().id();
        Node {
            input: InputSlot::new(scope, source),
            output: Output::new(scope, target),
            streams: (),
        }
    }

    /// The scope that the chain is building inside of.
    pub fn scope(&self) -> Entity {
        self.builder.scope()
    }

    /// The target where the chain will be sending its latest output.
    pub fn target(&self) -> Entity {
        self.target
    }
}
731
732impl<'w, 's, 'a, 'b, T, E> Chain<'w, 's, 'a, 'b, Result<T, E>>
733where
734    T: 'static + Send + Sync,
735    E: 'static + Send + Sync,
736{
    /// Build a chain that activates if the response is an [`Err`]. If the
    /// response is [`Ok`] then this branch will not be activated.
    ///
    /// This function returns a chain that will be activated if the result was
    /// [`Ok`] so you can continue building your response to the [`Ok`] case.
    pub fn branch_for_err(self, build_err: impl FnOnce(Chain<E>)) -> Chain<'w, 's, 'a, 'b, T> {
        // Fork the current target: element .0 is the Ok branch's output, which
        // becomes the returned chain, while the Err branch is handed to
        // `build_err`. NOTE(review): `self.builder` appears to be implicitly
        // reborrowed for the temporary chain so it can be used again in the
        // final `.chain()` call — confirm before restructuring.
        Chain::<Result<T, E>>::new(self.target, self.builder)
            .fork_result(|chain| chain.output(), build_err)
            .0
            .chain(self.builder)
    }
748
749    /// Build two branching chains, one for the case where the response is [`Ok`]
750    /// and one if the response is [`Err`]. Whichever chain does not get activated
751    /// will be disposed, which means it gets dropped without triggering any
752    /// cancellation effects.
753    ///
754    /// The outputs of both builder functions will be zipped as the return value
755    /// of this function.
756    pub fn fork_result<U, V>(
757        self,
758        build_ok: impl FnOnce(Chain<T>) -> U,
759        build_err: impl FnOnce(Chain<E>) -> V,
760    ) -> (U, V) {
761        let source = self.target;
762        let target_ok = self.builder.commands.spawn(UnusedTarget).id();
763        let target_err = self.builder.commands.spawn(UnusedTarget).id();
764
765        self.builder.commands.queue(AddOperation::new(
766            Some(self.scope()),
767            source,
768            make_result_branching::<T, E>(ForkTargetStorage::from_iter([target_ok, target_err])),
769        ));
770
771        let u = build_ok(Chain::new(target_ok, self.builder));
772        let v = build_err(Chain::new(target_err, self.builder));
773        (u, v)
774    }
775
776    /// If the result contains an [`Err`] value then connect the error result
777    /// to an input that takes in a [`Result`] with the same [`Err`] variant.
778    /// If the result contained an [`Ok`] value then it will be passed along to
779    /// the next step in the chain.
780    ///
781    /// This is meant to replicate the `?` operator used in normal Rust
782    /// programming.
783    pub fn connect_on_err<U>(self, input: InputSlot<Result<U, E>>) -> Chain<'w, 's, 'a, 'b, T>
784    where
785        U: 'static + Send + Sync,
786    {
787        self.branch_for_err(move |chain: Chain<E>| chain.map_block(|err| Err(err)).connect(input))
788    }
789
790    /// If the result contains an [`Err`] value then the entire scope that
791    /// contains this operation will be immediately cancelled. If the scope is
792    /// within a node of an outer workflow, then the node will emit a disposal
793    /// for its outer workflow. Otherwise if this is the root scope of a workflow
794    /// then the whole workflow is immediately cancelled. This effect will happen
795    /// even if the scope is set to be uninterruptible.
796    ///
797    /// This operation only works for results with an [`Err`] variant that
798    /// implements the [`Error`] trait. If your [`Err`] variant does not implement
799    /// that trait, then you can use [`Self::cancel_on_quiet_err`] instead.
800    ///
801    /// ```
802    /// use crossflow::{prelude::*, testing::*};
803    ///
804    /// let mut context = TestingContext::minimal_plugins();
805    ///
806    /// let workflow = context.spawn_io_workflow(|scope, builder| {
807    ///     scope.input.chain(builder)
808    ///         .map_block(produce_err)
809    ///         .cancel_on_err()
810    ///         .connect(scope.terminate);
811    /// });
812    ///
813    /// let mut promise = context.command(|commands| {
814    ///     commands
815    ///     .request("hello", workflow)
816    ///     .take_response()
817    /// });
818    ///
819    /// context.run_while_pending(&mut promise);
820    /// assert!(promise.peek().is_cancelled());
821    /// ```
822    pub fn cancel_on_err(self) -> Chain<'w, 's, 'a, 'b, T>
823    where
824        E: Error,
825    {
826        let source = self.target;
827        let target = self.builder.commands.spawn(UnusedTarget).id();
828
829        self.builder.commands.queue(AddOperation::new(
830            Some(self.scope()),
831            source,
832            CreateCancelFilter::on_err::<T, E>(target),
833        ));
834
835        Chain::new(target, self.builder)
836    }
837
838    /// Same as [`Self::cancel_on_err`] except it also works for [`Err`] variants
839    /// that do not implement [`Error`]. The catch is that their error message
840    /// will not be included in the [`Filtered`](crate::Filtered) information
841    /// that gets propagated outward.
842    pub fn cancel_on_quiet_err(self) -> Chain<'w, 's, 'a, 'b, T> {
843        let source = self.target;
844        let target = self.builder.commands.spawn(UnusedTarget).id();
845
846        self.builder.commands.queue(AddOperation::new(
847            Some(self.scope()),
848            source,
849            CreateCancelFilter::on_quiet_err::<T, E>(target),
850        ));
851
852        Chain::new(target, self.builder)
853    }
854
855    /// If the output contains an [`Err`] value then the output will be disposed.
856    ///
857    /// Disposal means that the node that the output is connected to will simply
858    /// not be triggered, but the workflow is not necessarily cancelled. If a
859    /// disposal makes it impossible for the workflow to terminate, then the
860    /// workflow will be cancelled immediately.
861    pub fn dispose_on_err(self) -> Chain<'w, 's, 'a, 'b, T>
862    where
863        E: Error,
864    {
865        let source = self.target;
866        let target = self.builder.commands.spawn(UnusedTarget).id();
867
868        self.builder.commands.queue(AddOperation::new(
869            Some(self.scope()),
870            source,
871            CreateDisposalFilter::on_err::<T, E>(target),
872        ));
873
874        Chain::new(target, self.builder)
875    }
876
877    /// Same as [`Self::dispose_on_err`] except it also works for [`Err`] variants
878    /// that do not implement [`Error`]. The catch is that their error message
879    /// will not be included in the [`Filtered`](crate::Filtered) information
880    /// that gets propagated outward.
881    pub fn dispose_on_quiet_err(self) -> Chain<'w, 's, 'a, 'b, T> {
882        let source = self.target;
883        let target = self.builder.commands.spawn(UnusedTarget).id();
884
885        self.builder.commands.queue(AddOperation::new(
886            Some(self.scope()),
887            source,
888            CreateDisposalFilter::on_quiet_err::<T, E>(target),
889        ));
890
891        Chain::new(target, self.builder)
892    }
893
894    /// Inverse of [`Self::dispose_on_err`], this will dispose [`Ok`] results
895    /// and pass along [`Err`] values. This can be used in cases where you are
896    /// monitoring only for errors and are not concerned about any [`Ok`] results.
897    pub fn dispose_on_ok(self) -> Chain<'w, 's, 'a, 'b, E> {
898        let source = self.target;
899        let target = self.builder.commands.spawn(UnusedTarget).id();
900
901        self.builder.commands.queue(AddOperation::new(
902            Some(self.scope()),
903            source,
904            CreateDisposalFilter::on_ok::<T, E>(target),
905        ));
906
907        Chain::new(target, self.builder)
908    }
909}
910
911impl<'w, 's, 'a, 'b, T> Chain<'w, 's, 'a, 'b, Option<T>>
912where
913    T: 'static + Send + Sync,
914{
915    /// Build a chain that activates if the response is [`None`]. If the response
916    /// is [`Some`] then the branch built by this function will not be activated.
917    ///
918    /// This function returns a chain that will be activated if the result was
919    /// [`Some`] so you can continue building your response to the [`Some`] case.
920    pub fn branch_for_none(self, build_none: impl FnOnce(Chain<()>)) -> Chain<'w, 's, 'a, 'b, T> {
921        Chain::<Option<T>>::new(self.target, self.builder)
922            .fork_option(|chain| chain.output(), build_none)
923            .0
924            .chain(self.builder)
925    }
926
927    /// Build two branching chains, one for the case where the response is [`Some`]
928    /// and one if the response is [`None`]. Whichever chain does not get activated
929    /// will be disposed, which means it gets dropped without triggering any
930    /// cancellation effects.
931    ///
932    /// The outputs of both builder functions will be zipped as the return value
933    /// of this function.
934    pub fn fork_option<U, V>(
935        self,
936        build_some: impl FnOnce(Chain<T>) -> U,
937        build_none: impl FnOnce(Chain<()>) -> V,
938    ) -> (U, V) {
939        let source = self.target;
940        let target_some = self.builder.commands.spawn(UnusedTarget).id();
941        let target_none = self.builder.commands.spawn(UnusedTarget).id();
942
943        self.builder.commands.queue(AddOperation::new(
944            Some(self.scope()),
945            source,
946            make_option_branching::<T>(ForkTargetStorage::from_iter([target_some, target_none])),
947        ));
948
949        let u = build_some(Chain::new(target_some, self.builder));
950        let v = build_none(Chain::new(target_none, self.builder));
951        (u, v)
952    }
953
954    /// If the result contains a [`None`] value then connect the trigger to
955    /// an input that takes in an [`Option`] with any [`Some`] variant.
956    /// If the result contained a [`Some`] value then it will be passed along to
957    /// the next step in the chain.
958    ///
959    /// This is meant to replicate the `?` operator used in normal Rust
960    /// programming.
961    pub fn connect_on_none<U>(self, input: InputSlot<Option<U>>) -> Chain<'w, 's, 'a, 'b, T>
962    where
963        U: 'static + Send + Sync,
964    {
965        self.branch_for_none(move |chain: Chain<()>| chain.map_block(|_| None).connect(input))
966    }
967
968    /// If the result contains a [`None`] value then the chain will be cancelled
969    /// from this link onwards. The next link in the chain will receive a `T` if
970    /// the chain is not cancelled.
971    pub fn cancel_on_none(self) -> Chain<'w, 's, 'a, 'b, T> {
972        let source = self.target;
973        let target = self.builder.commands.spawn(UnusedTarget).id();
974
975        self.builder.commands.queue(AddOperation::new(
976            Some(self.scope()),
977            source,
978            CreateCancelFilter::on_none::<T>(target),
979        ));
980
981        Chain::new(target, self.builder)
982    }
983
984    /// If the output contains [`None`] value then the output will be disposed.
985    ///
986    /// Disposal means that the node that the output is connected to will simply
987    /// not be triggered, but the workflow is not necessarily cancelled. If a
988    /// disposal makes it impossible for the workflow to terminate, then the
989    /// workflow will be cancelled immediately.
990    pub fn dispose_on_none(self) -> Chain<'w, 's, 'a, 'b, T> {
991        let source = self.target;
992        let target = self.builder.commands.spawn(UnusedTarget).id();
993
994        self.builder.commands.queue(AddOperation::new(
995            Some(self.scope()),
996            source,
997            CreateDisposalFilter::on_none::<T>(target),
998        ));
999
1000        Chain::new(target, self.builder)
1001    }
1002
1003    /// Inverse of [`Self::dispose_on_none`], this will dispose [`Some`] values
1004    /// and pass along a trigger `()` for [`None`] values. This can be used in
1005    /// cases where you are monitoring only for [`None`] values and are not
1006    /// concerned about any [`Some`] values.
1007    pub fn dispose_on_some(self) -> Chain<'w, 's, 'a, 'b, ()> {
1008        let source = self.target;
1009        let target = self.builder.commands.spawn(UnusedTarget).id();
1010
1011        self.builder.commands.queue(AddOperation::new(
1012            Some(self.scope()),
1013            source,
1014            CreateDisposalFilter::on_some::<T>(target),
1015        ));
1016
1017        Chain::new(target, self.builder)
1018    }
1019}
1020
1021impl<'w, 's, 'a, 'b, K, V, T> Chain<'w, 's, 'a, 'b, T>
1022where
1023    K: 'static + Send + Sync + Eq + std::hash::Hash + Clone + std::fmt::Debug,
1024    V: 'static + Send + Sync,
1025    T: 'static + Send + Sync + IntoIterator<Item = (K, V)>,
1026{
1027    /// If the chain's response type can be turned into an iterator that returns
1028    /// `(key, value)` pairs, then this will split it in a map-like way, whether
1029    /// or not it is a conventional map data structure.
1030    ///
1031    /// This is equivalent to
1032    /// ```text
1033    /// .map_block(SplitAsMap::new).split(build)
1034    /// ```
1035    pub fn split_as_map<U>(
1036        self,
1037        build: impl FnOnce(SplitBuilder<'w, 's, 'a, 'b, SplitAsMap<K, V, T>>) -> U,
1038    ) -> U {
1039        self.map_block(SplitAsMap::new).split(build)
1040    }
1041
1042    /// If the chain's response type can be turned into an iterator that returns
1043    /// `(key, value)` pairs, then this will split it in a map-like way and
1044    /// provide a container for its available outputs. To build connections to
1045    /// these outputs later, use [`SplitOutputs::build`].
1046    ///
1047    /// This is equivalent to
1048    /// ```text
1049    /// .split_as_map(|split| split.outputs())
1050    /// ```
1051    pub fn split_as_map_outputs(self) -> SplitOutputs<SplitAsMap<K, V, T>> {
1052        self.split_as_map(|b| b.outputs())
1053    }
1054}
1055
1056impl<'w, 's, 'a, 'b, Request, Response, Streams>
1057    Chain<'w, 's, 'a, 'b, (Request, ServiceInstructions<Request, Response, Streams>)>
1058where
1059    Request: 'static + Send + Sync,
1060    Response: 'static + Send + Sync + Unpin,
1061    Streams: StreamPack,
1062{
1063    /// Given the input `(request, service)`, pass `request` into `service` and
1064    /// forward its response. This is called `injection` because it's a
1065    /// [dependency injection](https://en.wikipedia.org/wiki/Dependency_injection)
1066    /// operation.
1067    ///
1068    /// Since it's possible for `service` to fail for
1069    /// various reasons, this returns a [`Result`]. Follow this with
1070    /// `.dispose_on_err` to filter away errors.
1071    ///
1072    /// To access the streams of the service, use [`Chain::then_injection_node`].
1073    pub fn then_injection(self) -> Chain<'w, 's, 'a, 'b, Response> {
1074        let source = self.target;
1075        let node = self
1076            .builder
1077            .create_injection_impl::<Request, Response, Streams>(source);
1078        node.output.chain(self.builder)
1079    }
1080
1081    /// Given the input `(request, service)`, pass `request` into `service` and
1082    /// forward its streams and response. This is called `injection` because it's a
1083    /// [dependency injection](https://en.wikipedia.org/wiki/Dependency_injection)
1084    /// operation.
1085    ///
1086    /// Since it's possible for `service` to
1087    /// fail for various reasons, this returns a [`Result`]. Follow this with
1088    /// `.dispose_on_err` to filter away errors.
1089    pub fn then_injection_node(
1090        self,
1091    ) -> Node<(Request, ServiceInstructions<Request, Response, Streams>), Response, Streams> {
1092        let source = self.target;
1093        self.builder
1094            .create_injection_impl::<Request, Response, Streams>(source)
1095    }
1096}
1097
1098impl<'w, 's, 'a, 'b, T> Chain<'w, 's, 'a, 'b, GateRequest<T>>
1099where
1100    T: 'static + Send + Sync,
1101{
1102    pub fn then_gate<B>(self, buffers: B) -> Chain<'w, 's, 'a, 'b, T>
1103    where
1104        B: Bufferable,
1105        B::BufferType: 'static + Send + Sync,
1106    {
1107        let buffers = buffers.into_buffer(self.builder);
1108        buffers.verify_scope(self.builder.scope());
1109
1110        let source = self.target;
1111        let target = self.builder.commands.spawn(UnusedTarget).id();
1112        self.builder.commands.queue(AddOperation::new(
1113            Some(self.builder.scope()),
1114            source,
1115            OperateDynamicGate::<T, _>::new(buffers, target),
1116        ));
1117
1118        Chain::new(target, self.builder)
1119    }
1120}
1121
1122impl<'w, 's, 'a, 'b, T> Chain<'w, 's, 'a, 'b, BufferKey<T>>
1123where
1124    T: 'static + Send + Sync,
1125{
1126    pub fn consume_buffer<const N: usize>(self) -> Chain<'w, 's, 'a, 'b, SmallVec<[T; N]>> {
1127        self.then(consume_buffer.into_blocking_callback())
1128    }
1129}
1130
1131impl<'w, 's, 'a, 'b, T: 'static + Send + Sync> Chain<'w, 's, 'a, 'b, T> {
1132    /// Used internally to create a [`Chain`] that can accept a label
1133    /// and hook into streams.
1134    pub(crate) fn new(target: Entity, builder: &'b mut Builder<'w, 's, 'a>) -> Self {
1135        Self {
1136            target,
1137            builder,
1138            _ignore: Default::default(),
1139        }
1140    }
1141}
1142
1143impl<'w, 's, 'a, 'b, K, V> Chain<'w, 's, 'a, 'b, (K, V)>
1144where
1145    K: 'static + Send + Sync,
1146    V: 'static + Send + Sync,
1147{
1148    /// If the chain's response contains a `(key, value)` pair, get the `key`
1149    /// component from it (the first element of the tuple).
1150    pub fn key(self) -> Chain<'w, 's, 'a, 'b, K> {
1151        self.map_block(|(key, _)| key)
1152    }
1153
1154    /// If the chain's response contains a `(key, value)` pair, get the `value`
1155    /// component from it (the second element of the tuple).
1156    pub fn value(self) -> Chain<'w, 's, 'a, 'b, V> {
1157        self.map_block(|(_, value)| value)
1158    }
1159}
1160
1161impl<'w, 's, 'a, 'b> Chain<'w, 's, 'a, 'b, ()> {
1162    /// When the chain reaches this point, cancel the workflow.
1163    ///
1164    /// If you want to include information about the value that triggered the
1165    /// cancellation, use [`Self::then_cancel`].
1166    pub fn then_quiet_cancel(self) {
1167        self.builder.commands.queue(AddOperation::new(
1168            Some(self.scope()),
1169            self.target,
1170            OperateQuietCancel,
1171        ));
1172    }
1173}
1174
#[cfg(test)]
mod tests {
    // Integration tests for the chain operations in this file. Each test
    // spins up a minimal testing context, builds a workflow, issues a
    // request, runs the app until the promise settles, and asserts on the
    // outcome.
    use crate::{prelude::*, testing::*};
    use smallvec::SmallVec;

    // fork_unzip + join: two branches process the unzipped input (one of them
    // asynchronously via `wait`), then their outputs are joined and summed.
    #[test]
    fn test_join() {
        let mut context = TestingContext::minimal_plugins();

        let workflow = context.spawn_io_workflow(|scope, builder| {
            scope
                .input
                .chain(builder)
                // (2.0, 2.0)
                .fork_unzip((
                    |chain: Chain<f64>| {
                        chain
                            // 2.0
                            .map_block(|value| WaitRequest {
                                duration: Duration::from_secs_f64(value / 100.0),
                                value,
                            })
                            .map_async(wait)
                            // 2.0
                            .output()
                    },
                    |chain: Chain<f64>| {
                        chain
                            // 2.0
                            .map_block(|value| 2.0 * value)
                            // 4.0
                            .output()
                    },
                ))
                .join(builder)
                // (2.0, 4.0)
                .map_block(add)
                // 6.0
                .connect(scope.terminate);
        });

        let mut promise =
            context.command(|commands| commands.request((2.0, 2.0), workflow).take_response());

        context.run_with_conditions(&mut promise, Duration::from_secs(2));
        assert!(promise.take().available().is_some_and(|value| value == 6.0));
        assert!(context.no_unhandled_errors());
    }

    // fork_clone inside an inner scope: both branches race to terminate the
    // inner scope; the branch that does not wait is expected to win.
    #[test]
    fn test_race() {
        let mut context = TestingContext::minimal_plugins();

        let workflow = context.spawn_io_workflow(|scope, builder| {
            scope
                .input
                .chain(builder)
                // (2.0, 2.0)
                .map_block(add)
                // 4.0
                .then_io_scope(|scope, builder| {
                    scope
                        .input
                        .chain(builder)
                        // 4.0
                        .fork_clone((
                            |chain: Chain<f64>| {
                                // 4.0
                                chain
                                    .map_block(|value| WaitRequest {
                                        duration: Duration::from_secs_f64(0.01 * value),
                                        value,
                                    })
                                    .map_async(wait)
                                    // 4.0
                                    .connect(scope.terminate);
                            },
                            |chain: Chain<f64>| {
                                // 4.0
                                chain
                                    .map_block(|a| (a, a))
                                    // (4.0, 4.0)
                                    .map_block(add)
                                    // 8.0
                                    .connect(scope.terminate);
                            },
                        ));
                })
                // This should be won by the 8.0 branch because it does not wait,
                // while the 4.0 branch should wait for 0.04s.
                .map_block(|a| (a, a))
                // (8.0, 8.0)
                .map_block(add)
                // 16.0
                .connect(scope.terminate);
        });

        let mut promise =
            context.command(|commands| commands.request((2.0, 2.0), workflow).take_response());

        context.run_with_conditions(&mut promise, Duration::from_secs(2));

        assert_eq!(promise.take().available(), Some(16.0));
        assert!(context.no_unhandled_errors());
    }

    // fork_unzip inside an inner scope, where the two unzipped branches race
    // to terminate: the synchronous (v + 10.0) branch should win over the
    // branch that waits 0.01s.
    #[test]
    fn test_unzip() {
        let mut context = TestingContext::minimal_plugins();

        let workflow = context.spawn_io_workflow(|scope, builder| {
            scope
                .input
                .chain(builder)
                .map_block(add)
                .map_block(|v| (v, 2.0 * v))
                .then_io_scope(|scope, builder| {
                    scope.input.chain(builder).fork_unzip((
                        |chain: Chain<f64>| {
                            chain
                                .map_block(|v| (v, 10.0))
                                .map_block(add)
                                .connect(scope.terminate);
                        },
                        |chain: Chain<f64>| {
                            chain
                                .map_block(|value| WaitRequest {
                                    duration: std::time::Duration::from_secs_f64(0.01),
                                    value,
                                })
                                .map_async(wait)
                                .connect(scope.terminate);
                        },
                    ));
                })
                .connect(scope.terminate);
        });

        let mut promise =
            context.command(|commands| commands.request((2.0, 3.0), workflow).take_response());

        context.run_while_pending(&mut promise);
        assert_eq!(promise.take().available(), Some(15.0));
        assert!(context.no_unhandled_errors());
    }

    // cancel_on_none and cancel_on_quiet_err: producing the special value
    // (None / Err) mid-chain should cancel the whole workflow.
    #[test]
    fn test_cancel_on_special_case() {
        let mut context = TestingContext::minimal_plugins();

        let workflow = context.spawn_io_workflow(|scope, builder| {
            scope
                .input
                .chain(builder)
                .map_block(duplicate)
                .map_block(add)
                .map_block(produce_none)
                .cancel_on_none()
                .map_block(duplicate)
                .map_block(add)
                .connect(scope.terminate);
        });

        let mut promise =
            context.command(|commands| commands.request(2.0, workflow).take_response());

        context.run_with_conditions(&mut promise, Duration::from_secs(2));
        assert!(promise.peek().is_cancelled());
        assert!(context.no_unhandled_errors());

        let workflow = context.spawn_io_workflow(|scope, builder| {
            scope
                .input
                .chain(builder)
                .map_block(duplicate)
                .map_block(add)
                .map_block(produce_err)
                .cancel_on_quiet_err()
                .map_block(duplicate)
                .map_block(add)
                .connect(scope.terminate);
        });

        let mut promise =
            context.command(|commands| commands.request(2.0, workflow).take_response());

        context.run_with_conditions(&mut promise, Duration::from_secs(2));
        assert!(promise.peek().is_cancelled());
        assert!(context.no_unhandled_errors());
    }

    // dispose_on_none / dispose_on_err: a disposal that makes termination
    // impossible cancels the workflow, while a disposal with another live
    // path to terminate does not.
    #[test]
    fn test_disposal() {
        let mut context = TestingContext::minimal_plugins();

        let workflow = context.spawn_io_workflow(|scope, builder| {
            scope
                .input
                .chain(builder)
                .map_block(duplicate)
                .map_block(add)
                .map_block(produce_none)
                .dispose_on_none()
                .map_block(duplicate)
                .map_block(add)
                .connect(scope.terminate);
        });

        let mut promise =
            context.command(|commands| commands.request(2.0, workflow).take_response());

        context.run_with_conditions(&mut promise, Duration::from_secs(2));
        assert!(promise.peek().is_cancelled());
        assert!(context.no_unhandled_errors());

        let workflow = context.spawn_io_workflow(
            |scope: Scope<Result<f64, Result<f64, TestError>>, f64>, builder| {
                scope.input.chain(builder).fork_result(
                    |chain| chain.connect(scope.terminate),
                    |chain| chain.dispose_on_err().connect(scope.terminate),
                );
            },
        );

        let mut promise =
            context.command(|commands| commands.request(Ok(1.0), workflow).take_response());

        context.run_with_conditions(&mut promise, Duration::from_secs(2));
        assert!(promise.take().available().is_some_and(|v| v == 1.0));
        assert!(context.no_unhandled_errors());

        let mut promise =
            context.command(|commands| commands.request(Err(Ok(5.0)), workflow).take_response());

        context.run_with_conditions(&mut promise, Duration::from_secs(2));
        assert!(promise.take().available().is_some_and(|v| v == 5.0));
        assert!(context.no_unhandled_errors());

        let mut promise = context.command(|commands| {
            commands
                .request(Err(Err(TestError)), workflow)
                .take_response()
        });

        context.run_with_conditions(&mut promise, Duration::from_secs(2));
        assert!(promise.peek().is_cancelled());
        assert!(context.no_unhandled_errors());
    }

    // spread: a request of 7 is duplicated into 7 elements that are spread
    // into a buffer; a listener collects them once all 7 have arrived.
    #[test]
    fn test_spread() {
        let mut context = TestingContext::minimal_plugins();

        let workflow = context.spawn_io_workflow(|scope, builder| {
            let buffer = builder.create_buffer(BufferSettings::keep_all());

            scope
                .input
                .chain(builder)
                .map_block(|value| {
                    let mut duplicated_values: SmallVec<[i32; 16]> = SmallVec::new();
                    for _ in 0..value {
                        duplicated_values.push(value);
                    }
                    duplicated_values
                })
                .spread()
                .connect(buffer.input_slot());

            buffer
                .listen(builder)
                .then(watch_for_quantity.into_blocking_callback())
                .dispose_on_none()
                .connect(scope.terminate);
        });

        let mut promise = context.command(|commands| commands.request(7, workflow).take_response());

        context.run_with_conditions(&mut promise, 1);
        assert!(promise
            .take()
            .available()
            .is_some_and(|v| { v.len() == 7 && v.iter().find(|a| **a != 7).is_none() }));
        assert!(context.no_unhandled_errors());
    }

    // This is essentially a collect operation specific to one of our spread
    // operation tests. We expect to gather up a number of elements equal to the
    // integer value of those elements. We don't use the collect operation for
    // this test so that we can test each of those operations in isolation.
    fn watch_for_quantity(
        In(key): In<BufferKey<i32>>,
        mut access: BufferAccessMut<i32>,
    ) -> Option<SmallVec<[i32; 16]>> {
        let mut buffer = access.get_mut(&key).unwrap();
        // The newest element tells us how many elements to expect in total;
        // return None (no result yet) until the buffer has filled up to that
        // count, then drain the whole buffer as the result.
        let expected_count = *buffer.newest()? as usize;
        if buffer.len() < expected_count {
            return None;
        }

        Some(buffer.drain(..).collect())
    }

    // collect / collect_all over stream outputs and other upstream sources,
    // including the case where an upstream branch produces nothing at all.
    #[test]
    fn test_collect() {
        let mut context = TestingContext::minimal_plugins();

        let workflow =
            context.spawn_io_workflow(|scope, builder| {
                let node = scope.input.chain(builder).map_node(
                    |input: BlockingMap<i32, StreamOf<i32>>| {
                        for _ in 0..input.request {
                            input.streams.send(input.request);
                        }
                    },
                );

                node.streams
                    .chain(builder)
                    .collect_all::<16>()
                    .connect(scope.terminate);
            });

        let mut promise = context.command(|commands| commands.request(8, workflow).take_response());

        context.run_with_conditions(&mut promise, 1);
        assert!(promise
            .take()
            .available()
            .is_some_and(|v| { v.len() == 8 && v.iter().find(|a| **a != 8).is_none() }));
        assert!(context.no_unhandled_errors());

        let workflow =
            context.spawn_io_workflow(|scope, builder| {
                let node = scope.input.chain(builder).map_node(
                    |input: BlockingMap<i32, StreamOf<i32>>| {
                        for _ in 0..input.request {
                            input.streams.send(input.request);
                        }
                    },
                );

                node.streams
                    .chain(builder)
                    .collect::<16>(4, None)
                    .connect(scope.terminate);
            });

        // collect with a minimum of 4: values below the minimum should cancel,
        // values at or above it should produce a full collection.
        check_collection(0, 4, workflow, &mut context);
        check_collection(2, 4, workflow, &mut context);
        check_collection(3, 4, workflow, &mut context);
        check_collection(4, 4, workflow, &mut context);
        check_collection(5, 4, workflow, &mut context);
        check_collection(8, 4, workflow, &mut context);

        let workflow = context.spawn_io_workflow(|scope, builder| {
            let bogus_node = builder.create_map_block(|v: i32| v);
            bogus_node
                .output
                .chain(builder)
                .collect_all::<16>()
                .connect(scope.terminate);

            scope
                .input
                .chain(builder)
                .map_block(|v: i32| Some(v))
                .fork_option(
                    |chain: Chain<i32>| {
                        chain
                            .map_async(|v| async move { v })
                            .collect_all::<16>()
                            .connect(scope.terminate)
                    },
                    |chain: Chain<()>| chain.map_block(|()| unreachable!()).unused(),
                );
        });

        let mut promise = context.command(|commands| commands.request(2, workflow).take_response());

        context.run_with_conditions(&mut promise, Duration::from_secs(2));
        assert!(promise
            .take()
            .available()
            .is_some_and(|v| v.len() == 1 && v.iter().find(|a| **a != 2).is_none()));
        assert!(context.no_unhandled_errors());

        let workflow = context.spawn_io_workflow(|scope, builder| {
            scope
                .input
                .chain(builder)
                .map_block(|v| if v < 4 { None } else { Some(v) })
                .dispose_on_none()
                .collect_all::<8>()
                .connect(scope.terminate);
        });

        let mut promise = context.command(|commands| commands.request(2, workflow).take_response());

        context.run_with_conditions(&mut promise, 1);
        assert!(promise.take().available().is_some_and(|v| v.is_empty()));
        assert!(context.no_unhandled_errors());

        let mut promise = context.command(|commands| commands.request(5, workflow).take_response());

        context.run_with_conditions(&mut promise, 1);
        assert!(promise
            .take()
            .available()
            .is_some_and(|v| v.len() == 1 && v.iter().find(|a| **a != 5).is_none()));
        assert!(context.no_unhandled_errors());
    }

    // Helper for test_collect: requesting `value` should cancel when below
    // `min`, and otherwise yield `value` copies of `value`.
    fn check_collection(
        value: i32,
        min: i32,
        workflow: Service<i32, SmallVec<[i32; 16]>>,
        context: &mut TestingContext,
    ) {
        let mut promise =
            context.command(|commands| commands.request(value, workflow).take_response());

        context.run_with_conditions(&mut promise, 1);
        if value < min {
            assert!(promise.take().is_cancelled());
        } else {
            assert!(promise.take().available().is_some_and(|v| {
                v.len() == value as usize && v.iter().find(|a| **a != value).is_none()
            }));
        }
        assert!(context.no_unhandled_errors());
    }

    // fork_result with an unused Err branch: disposals on the unused branch
    // must not produce errors, and the first Ok value terminates the scope.
    #[test]
    fn test_unused_branch() {
        let mut context = TestingContext::minimal_plugins();

        let workflow =
            context.spawn_io_workflow(|scope: Scope<Vec<Result<i64, ()>>, i64>, builder| {
                scope
                    .input
                    .chain(builder)
                    .spread()
                    .fork_result(|ok| ok.connect(scope.terminate), |err| err.unused());
            });

        let test_set = vec![Err(()), Err(()), Ok(5), Err(()), Ok(10)];
        let mut promise =
            context.command(|commands| commands.request(test_set, workflow).take_response());

        context.run_with_conditions(&mut promise, Duration::from_secs(2));
        assert!(context.no_unhandled_errors());
        assert_eq!(promise.take().available().unwrap(), 5);
    }
}
1629}