crossflow/
chain.rs

1/*
2 * Copyright (C) 2023 Open Source Robotics Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *     http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 *
16*/
17
18use std::future::Future;
19
20use bevy_ecs::prelude::Entity;
21
22use smallvec::SmallVec;
23
24use std::error::Error;
25
26use crate::{
27    Accessing, AddOperation, AsMap, Buffer, BufferKey, BufferKeys, Bufferable, Buffering, Builder,
28    Collect, CreateCancelFilter, CreateDisposalFilter, ForkTargetStorage, Gate, GateRequest,
29    InputSlot, IntoAsyncMap, IntoBlockingCallback, IntoBlockingMap, Node, Noop,
30    OperateBufferAccess, OperateCancel, OperateDynamicGate, OperateQuietCancel, OperateSplit,
31    OperateStaticGate, Output, ProvideOnce, Provider, Scope, ScopeSettings, Sendish,
32    ServiceInstructions, Spread, StreamPack, StreamTargetMap, Trim, TrimBranch, UnusedTarget,
33    make_option_branching, make_result_branching,
34};
35
36pub mod fork_clone_builder;
37pub use fork_clone_builder::*;
38
39pub(crate) mod premade;
40use premade::*;
41
42pub mod split;
43pub use split::*;
44
45pub mod unzip;
46pub use unzip::*;
47
/// Chain operations onto the output of a workflow node.
///
/// Make sure to use [`Self::connect`] when you're done chaining so that the
/// final output of the chain gets connected into another node. If the final
/// output of the chain is meant to be the final output of your workflow then
/// you should connect it to [`Scope::terminate`].
#[must_use]
pub struct Chain<'w, 's, 'a, 'b, T> {
    // The entity that will receive the latest output of this chain.
    target: Entity,
    // Builder used to spawn and connect the operations added by this chain.
    builder: &'b mut Builder<'w, 's, 'a>,
    // PhantomData ties the chain to its response type `T` without storing a
    // value; the `fn(T)` form avoids imposing auto-trait requirements on `T`.
    _ignore: std::marker::PhantomData<fn(T)>,
}
60
impl<'w, 's, 'a, 'b, T: 'static + Send + Sync> Chain<'w, 's, 'a, 'b, T> {
    /// Get the raw [`Output`] slot for the current link in the chain. You can
    /// use this to resume building this chain later.
    ///
    /// Note that if you do not connect some path of your workflow into the
    /// `terminate` slot of your [`Scope`] then the workflow will not be able
    /// to run.
    pub fn output(self) -> Output<T> {
        Output::new(self.scope(), self.target)
    }

    /// Get the raw [`Output`] slot for the current link in the chain, along with
    /// the builder. This can be used to do more complex building inside of
    /// chain builder functions.
    pub fn unpack(self) -> (Output<T>, &'b mut Builder<'w, 's, 'a>) {
        (Output::new(self.scope(), self.target), self.builder)
    }

    /// Connect this output into an input slot.
    ///
    /// Pass a [terminate](crate::Scope::terminate) into this function to
    /// end a chain.
    pub fn connect(self, input: InputSlot<T>) {
        let output = Output::new(self.scope(), self.target);
        self.builder.connect(output, input)
    }

    /// Explicitly terminate the chain by indicating that you want it to remain
    /// unused.
    pub fn unused(self) {
        // Do nothing
    }

    /// Connect the response at the end of the chain into a new provider. Get
    /// the response of the new provider as a chain so you can continue chaining
    /// operations.
    pub fn then<P: Provider<Request = T>>(self, provider: P) -> Chain<'w, 's, 'a, 'b, P::Response>
    where
        P::Response: 'static + Send + Sync,
    {
        let source = self.target;
        // Spawn a placeholder entity for the provider's output; the next link
        // in the chain will connect to it.
        let target = self.builder.commands.spawn(UnusedTarget).id();
        provider.connect(
            Some(self.builder.scope()),
            source,
            target,
            self.builder.commands,
        );
        Chain::new(target, self.builder)
    }

    /// Connect the response in the chain into a new provider. Get the node
    /// slots that wrap around the new provider.
    pub fn then_node<P: Provider<Request = T>>(
        self,
        provider: P,
    ) -> Node<T, P::Response, P::Streams>
    where
        P::Response: 'static + Send + Sync,
        P::Streams: StreamPack,
    {
        let source = self.target;
        let target = self.builder.commands.spawn(UnusedTarget).id();
        provider.connect(Some(self.scope()), source, target, self.builder.commands);

        // Spawn output slots for each of the provider's streams and record
        // their targets on the source entity.
        let mut map = StreamTargetMap::default();
        let streams =
            <P::Streams as StreamPack>::spawn_node_streams(source, &mut map, self.builder);
        self.builder.commands.entity(source).insert(map);
        Node {
            input: InputSlot::new(self.builder.scope(), source),
            output: Output::new(self.builder.scope(), target),
            streams,
        }
    }

    /// Apply a function whose input is [`BlockingMap<T>`](crate::BlockingMap)
    /// or [`AsyncMap<T>`](crate::AsyncMap).
    pub fn map<M, F: AsMap<M>>(
        self,
        f: F,
    ) -> Chain<'w, 's, 'a, 'b, <F::MapType as ProvideOnce>::Response>
    where
        F::MapType: Provider<Request = T>,
        <F::MapType as ProvideOnce>::Response: 'static + Send + Sync,
    {
        self.then(f.as_map())
    }

    /// Same as [`Self::map`] but receive the new node instead of continuing a
    /// chain.
    pub fn map_node<M, F: AsMap<M>>(
        self,
        f: F,
    ) -> Node<T, <F::MapType as ProvideOnce>::Response, <F::MapType as ProvideOnce>::Streams>
    where
        F::MapType: Provider<Request = T>,
        <F::MapType as ProvideOnce>::Response: 'static + Send + Sync,
        <F::MapType as ProvideOnce>::Streams: StreamPack,
    {
        self.then_node(f.as_map())
    }

    /// Apply a function whose input is the Response of the current Chain. The
    /// output of the map will be the Response of the returned Chain.
    ///
    /// This takes in a regular blocking function rather than an async function,
    /// so while the function is executing, it will block all systems and all
    /// other workflows from running.
    pub fn map_block<U>(
        self,
        f: impl FnMut(T) -> U + 'static + Send + Sync,
    ) -> Chain<'w, 's, 'a, 'b, U>
    where
        U: 'static + Send + Sync,
    {
        self.then(f.into_blocking_map())
    }

    /// Same as [`Self::map_block`] but receive the new node instead of
    /// continuing a chain.
    pub fn map_block_node<U>(self, f: impl FnMut(T) -> U + 'static + Send + Sync) -> Node<T, U, ()>
    where
        U: 'static + Send + Sync,
    {
        self.then_node(f.into_blocking_map())
    }

    /// Apply a map whose output is a Future that will be run in the
    /// [`AsyncComputeTaskPool`](bevy_tasks::AsyncComputeTaskPool) (unless
    /// the `single_threaded_async` feature is active). The output of the Future
    /// will be the Response of the returned Chain.
    pub fn map_async<Task>(
        self,
        f: impl FnMut(T) -> Task + 'static + Send + Sync,
    ) -> Chain<'w, 's, 'a, 'b, Task::Output>
    where
        Task: Future + 'static + Sendish,
        Task::Output: 'static + Send + Sync,
    {
        self.then(f.into_async_map())
    }

    /// Same as [`Self::map_async`] but receive the new node instead of
    /// continuing a chain.
    pub fn map_async_node<Task>(
        self,
        f: impl FnMut(T) -> Task + 'static + Send + Sync,
    ) -> Node<T, Task::Output, ()>
    where
        Task: Future + 'static + Sendish,
        Task::Output: 'static + Send + Sync,
    {
        self.then_node(f.into_async_map())
    }

    /// Build a workflow scope to be used as an element in this chain.
    ///
    /// If you want to connect to the stream outputs or be able to loop back
    /// to the input of this scope, use [`Self::then_scope_node`] instead.
    pub fn then_scope<Response, Streams, Settings>(
        self,
        build: impl FnOnce(Scope<T, Response, Streams>, &mut Builder) -> Settings,
    ) -> Chain<'w, 's, 'a, 'b, Response>
    where
        Response: 'static + Send + Sync,
        Streams: StreamPack,
        Settings: Into<ScopeSettings>,
    {
        let exit_scope = self.builder.commands.spawn(UnusedTarget).id();
        self.builder
            .create_scope_impl::<T, Response, Streams, Settings>(self.target, exit_scope, build)
            .output
            .chain(self.builder)
    }

    /// Simplified version of [`Self::then_scope`] limited to a simple input and
    /// output.
    ///
    /// Unlike `then_scope`, this function can infer the types for the generics
    /// so you don't need to explicitly specify them.
    pub fn then_io_scope<Response, Settings>(
        self,
        build: impl FnOnce(Scope<T, Response>, &mut Builder) -> Settings,
    ) -> Chain<'w, 's, 'a, 'b, Response>
    where
        Response: 'static + Send + Sync,
        Settings: Into<ScopeSettings>,
    {
        self.then_scope(build)
    }

    /// From the current target in the chain, build a [scoped](Scope) workflow
    /// and then get back a node that represents that scoped workflow.
    pub fn then_scope_node<Response, Streams, Settings>(
        self,
        build: impl FnOnce(Scope<T, Response, Streams>, &mut Builder) -> Settings,
    ) -> Node<T, Response, Streams>
    where
        Response: 'static + Send + Sync,
        Streams: StreamPack,
        Settings: Into<ScopeSettings>,
    {
        let exit_scope = self.builder.commands.spawn(UnusedTarget).id();
        self.builder
            .create_scope_impl::<T, Response, Streams, Settings>(self.target, exit_scope, build)
    }

    /// Simplified version of [`Self::then_scope_node`] limited to a simple
    /// input and output.
    ///
    /// Unlike `then_scope_node`, this function can infer the types for the
    /// generics so you don't need to explicitly specify them.
    pub fn then_io_scope_node<Response, Settings>(
        self,
        build: impl FnOnce(Scope<T, Response>, &mut Builder) -> Settings,
    ) -> Node<T, Response, ()>
    where
        Response: 'static + Send + Sync,
        Settings: Into<ScopeSettings>,
    {
        self.then_scope_node(build)
    }

    /// Many services just need to be triggered without being given any particular
    /// input. The convention for those services is to take an input of `()`.
    ///
    /// We call this "triggering", and this function is a convenient way to turn
    /// any output type into a trigger.
    pub fn trigger(self) -> Chain<'w, 's, 'a, 'b, ()> {
        self.map_block(|_| ())
    }

    /// Combine the output with access to some buffers. You must specify one or
    /// more buffers to access. For multiple buffers, combine them into a tuple
    /// or an [`Iterator`]. Tuples of buffers can be nested inside each other.
    ///
    /// Other [outputs](Output) can also be passed in as buffers. Those outputs
    /// will be transformed into a buffer with default buffer settings.
    ///
    /// To obtain a set of buffer keys each time a buffer is modified, use
    /// [`listen`](crate::Accessible::listen).
    pub fn with_access<B>(self, buffers: B) -> Chain<'w, 's, 'a, 'b, (T, BufferKeys<B>)>
    where
        B: Bufferable,
        B::BufferType: Accessing,
    {
        let buffers = buffers.into_buffer(self.builder);
        // Buffers from a different scope cannot be accessed from this chain.
        buffers.verify_scope(self.builder.scope());

        let source = self.target;
        let target = self.builder.commands.spawn(UnusedTarget).id();
        self.builder.commands.queue(AddOperation::new(
            Some(self.builder.scope()),
            source,
            OperateBufferAccess::<T, B::BufferType>::new(buffers, target),
        ));

        Chain::new(target, self.builder)
    }

    /// After the previous operation is finished, trigger a new operation whose
    /// input is simply the access keys for one or more buffers.
    pub fn then_access<B>(self, buffers: B) -> Chain<'w, 's, 'a, 'b, BufferKeys<B>>
    where
        B: Bufferable,
        B::BufferType: Accessing,
    {
        self.with_access(buffers).map_block(|(_, key)| key)
    }

    /// Apply a [`Provider`] that filters the response by returning an [`Option`].
    /// If the filter returns [`None`] then a [`Cancellation`](crate::Cancellation)
    /// is triggered. Otherwise the chain continues with the value that was
    /// inside [`Some`].
    ///
    /// This is conceptually similar to [`Iterator::filter_map`]. You can also
    /// use [`Chain::disposal_filter`] to dispose of the value instead of
    /// cancelling the entire scope.
    pub fn cancellation_filter<ThenResponse, F>(
        self,
        filter_provider: F,
    ) -> Chain<'w, 's, 'a, 'b, ThenResponse>
    where
        ThenResponse: 'static + Send + Sync,
        F: Provider<Request = T, Response = Option<ThenResponse>>,
        F::Response: 'static + Send + Sync,
        F::Streams: StreamPack,
    {
        self.then(filter_provider).cancel_on_none()
    }

    /// When the chain reaches this point, cancel the workflow and include
    /// information about the value that triggered the cancellation. The input
    /// type must implement [`ToString`].
    ///
    /// If you want to trigger a cancellation with a type that does not
    /// implement [`ToString`] then use [`Self::trigger`] and then
    /// [`Self::then_quiet_cancel`].
    pub fn then_cancel(self)
    where
        T: ToString,
    {
        self.builder.commands.queue(AddOperation::new(
            Some(self.scope()),
            self.target,
            OperateCancel::<T>::new(),
        ));
    }

    /// Same as [`Chain::cancellation_filter`] but the chain will be disposed
    /// instead of cancelled, so the workflow may continue if the termination
    /// node can still be reached.
    pub fn disposal_filter<ThenResponse, F>(
        self,
        filter_provider: F,
    ) -> Chain<'w, 's, 'a, 'b, ThenResponse>
    where
        ThenResponse: 'static + Send + Sync,
        F: Provider<Request = T, Response = Option<ThenResponse>>,
        F::Response: 'static + Send + Sync,
        F::Streams: StreamPack,
    {
        self.then(filter_provider).dispose_on_none()
    }

    /// When the response is delivered, we will make a clone of it and
    /// simultaneously pass that clone along two different branch chains: one
    /// determined by the `build` function passed into this operation and the
    /// other determined by the [`Chain`] that gets returned.
    ///
    /// This can only be applied when `Response` can be cloned.
    ///
    /// See also [`Chain::fork_clone`]
    pub fn branch_clone(self, build: impl FnOnce(Chain<T>)) -> Chain<'w, 's, 'a, 'b, T>
    where
        T: Clone,
    {
        // Fork into two branches: one handed to `build`, one resumed as the
        // returned chain.
        Chain::<T>::new(self.target, self.builder)
            .fork_clone((|chain: Chain<T>| chain.output(), build))
            .0
            .chain(self.builder)
    }

    /// When the response is delivered, we will make clones of it and
    /// simultaneously pass that clone along multiple branches, each one
    /// determined by a different element of the tuple that gets passed in as
    /// a builder.
    ///
    /// The return values of the individual chain builders will be zipped into
    /// one tuple return value by this function. If all of the builders return
    /// [`Output`] then you can easily continue chaining more operations using
    /// [`join`](crate::Joinable::join), or destructure them into individual
    /// outputs that you can continue to build with.
    pub fn fork_clone<Build: ForkCloneBuilder<T>>(self, build: Build) -> Build::Outputs
    where
        T: Clone,
    {
        build.build_fork_clone(Output::new(self.scope(), self.target), self.builder)
    }

    /// If you have a `Chain<(A, B, C, ...)>` with a tuple response then
    /// `unzip` allows you to convert it into a tuple of chains:
    /// `(Output<A>, Output<B>, Output<C>, ...)`.
    ///
    /// You can also consider using `fork_unzip` to continue building each
    /// chain in the tuple independently by providing a builder function for
    /// each element of the tuple.
    pub fn unzip(self) -> T::Unzipped
    where
        T: Unzippable,
    {
        T::unzip_output(Output::<T>::new(self.scope(), self.target), self.builder)
    }

    /// If you have a `Chain<(A, B, C, ...)>` with a tuple response then
    /// `fork_unzip` allows you to split it into multiple chains (one for each
    /// tuple element) and apply a separate builder function to each chain. You
    /// will be passed back the zipped return values of all the builder functions.
    pub fn fork_unzip<Build>(self, build: Build) -> Build::ReturnType
    where
        Build: UnzipBuilder<T>,
    {
        build.fork_unzip(Output::<T>::new(self.scope(), self.target), self.builder)
    }

    /// If `T` implements [`Iterator`] then you can fire off each of its elements
    /// as a new thread within the workflow. Each thread will still have the same
    /// session ID.
    ///
    /// This is similar to streams which can produce multiple outputs, which also
    /// potentially creates multiple threads in the workflow. If the input to
    /// the spread operator is empty, then a disposal notice will go out, and
    /// the workflow will be cancelled if it is no longer possible to reach the
    /// terminal node.
    pub fn spread(self) -> Chain<'w, 's, 'a, 'b, T::Item>
    where
        T: IntoIterator,
        T::Item: 'static + Send + Sync,
    {
        let source = self.target;
        let target = self.builder.commands.spawn(UnusedTarget).id();
        self.builder.commands.queue(AddOperation::new(
            Some(self.builder.scope()),
            source,
            Spread::<T>::new(target),
        ));

        Chain::new(target, self.builder)
    }

    /// Collect incoming workflow threads into a container.
    ///
    /// If `max` is specified, the collection will always be sent out once it
    /// reaches that maximum value.
    ///
    /// If `min` is greater than 0 then the collection will not be sent out unless
    /// it is equal to or greater than that value. Note that this means the
    /// collect operation could force the workflow into cancelling if it cannot
    /// reach the minimum number of elements. A `min` of 0 means that if an
    /// upstream thread is disposed and the collect node is no longer reachable
    /// then it will fire off with an empty collection.
    ///
    /// If the `min` limit is satisfied and there are no remaining workflow
    /// threads that can reach this collect operation, then the collection will
    /// be sent out with however many elements it happens to have collected.
    pub fn collect<const N: usize>(
        self,
        min: usize,
        max: Option<usize>,
    ) -> Chain<'w, 's, 'a, 'b, SmallVec<[T; N]>> {
        // A max of 0 could never emit anything, and min must not exceed max.
        if let Some(max) = max {
            assert!(0 < max);
            assert!(min <= max);
        }

        let source = self.target;
        let target = self.builder.commands.spawn(UnusedTarget).id();
        self.builder.commands.queue(AddOperation::new(
            Some(self.builder.scope()),
            source,
            Collect::<T, N>::new(target, min, max),
        ));

        Chain::new(target, self.builder)
    }

    /// Collect all workflow threads that are moving towards this node.
    pub fn collect_all<const N: usize>(self) -> Chain<'w, 's, 'a, 'b, SmallVec<[T; N]>> {
        self.collect::<N>(0, None)
    }

    /// Collect an exact number of threads that are moving towards this node.
    pub fn collect_n<const N: usize>(self, n: usize) -> Chain<'w, 's, 'a, 'b, SmallVec<[T; N]>> {
        self.collect::<N>(n, Some(n))
    }

    // TODO(@mxgrey): We could offer a collect_array that always collects exactly
    // N, defined at compile time, and outputs a fixed-size array. This will require
    // a second implementation of Collect, or a more generic implementation of it.

    /// Run a trimming operation when the workflow reaches this point.
    ///
    /// See also: [`Self::then_trim_node`], [`Builder::create_trim`].
    pub fn then_trim(
        self,
        branches: impl IntoIterator<Item = TrimBranch>,
    ) -> Chain<'w, 's, 'a, 'b, T> {
        let branches: SmallVec<[_; 16]> = branches.into_iter().collect();
        // Every trim branch must belong to the same scope as this chain.
        for branch in &branches {
            branch.verify_scope(self.builder.scope());
        }

        let source = self.target;
        let target = self.builder.commands.spawn(UnusedTarget).id();
        self.builder.commands.queue(AddOperation::new(
            Some(self.builder.scope()),
            source,
            Trim::<T>::new(branches, target),
        ));

        Chain::new(target, self.builder)
    }

    /// Run a trimming operation when the workflow reaches this point. This will
    /// return a [`Node`] so you can connect other inputs into the operation.
    ///
    /// See also: [`Self::then_trim`], [`Builder::create_trim`].
    pub fn then_trim_node(self, branches: impl IntoIterator<Item = TrimBranch>) -> Node<T, T> {
        let source = self.target;
        let scope = self.builder.scope();
        let target = self.then_trim(branches).output().id();
        Node {
            input: InputSlot::new(scope, source),
            output: Output::new(scope, target),
            streams: (),
        }
    }

    /// Push the value into a buffer then emit a trigger once the value is
    /// inside the buffer.
    ///
    /// If the buffer is broken (e.g. its operation has been despawned) the
    /// workflow will be cancelled.
    pub fn then_push(self, buffer: Buffer<T>) -> Chain<'w, 's, 'a, 'b, ()> {
        assert_eq!(self.scope(), buffer.scope());
        self.with_access(buffer)
            .then(push_into_buffer.into_blocking_callback())
            .cancel_on_err()
    }

    /// Apply a [gate action](Gate) to one or more buffers at this point
    /// in the workflow.
    pub fn then_gate_action<B>(self, action: Gate, buffers: B) -> Chain<'w, 's, 'a, 'b, T>
    where
        B: Bufferable,
    {
        let buffers = buffers.into_buffer(self.builder);
        buffers.verify_scope(self.builder.scope());

        let source = self.target;
        let target = self.builder.commands.spawn(UnusedTarget).id();
        self.builder.commands.queue(AddOperation::new(
            Some(self.builder.scope()),
            source,
            OperateStaticGate::<T, _>::new(buffers, target, action),
        ));

        Chain::new(target, self.builder)
    }

    /// [Open the gates](Gate) of one or more buffers at this point in the
    /// workflow.
    pub fn then_gate_open<B>(self, buffers: B) -> Chain<'w, 's, 'a, 'b, T>
    where
        B: Bufferable,
    {
        self.then_gate_action(Gate::Open, buffers)
    }

    /// [Close the gates](Gate) of one or more buffers at this point in
    /// the workflow.
    pub fn then_gate_close<B>(self, buffers: B) -> Chain<'w, 's, 'a, 'b, T>
    where
        B: Bufferable,
    {
        self.then_gate_action(Gate::Closed, buffers)
    }

    /// If the chain's response implements the [`Future`] trait, applying
    /// `.flatten()` to the chain will yield the output of that Future as the
    /// chain's response.
    pub fn flatten(self) -> Chain<'w, 's, 'a, 'b, T::Output>
    where
        T: Future,
        T::Output: 'static + Send + Sync,
    {
        // The identity async map awaits the incoming future for us.
        self.map_async(|r| r)
    }

    /// If the chain's response implements the [`Splittable`] trait, then this
    /// will insert a split operation and provide your `build` function with the
    /// [`SplitBuilder`] for it. This returns the return value of your build
    /// function.
    pub fn split<U>(self, build: impl FnOnce(SplitBuilder<'w, 's, 'a, 'b, T>) -> U) -> U
    where
        T: Splittable,
    {
        // No new target is spawned here; the split's outputs are connected
        // through the SplitBuilder instead.
        let source = self.target;
        self.builder.commands.queue(AddOperation::new(
            Some(self.builder.scope()),
            source,
            OperateSplit::<T>::default(),
        ));

        build(SplitBuilder::new(source, self.builder))
    }

    /// If the chain's response implements the [`Splittable`] trait, then this
    /// will insert a split and provide a container for its available outputs.
    /// To build connections to these outputs later, use [`SplitOutputs::build`].
    ///
    /// This is equivalent to
    /// ```text
    /// .split(|split| split.outputs())
    /// ```
    pub fn split_outputs(self) -> SplitOutputs<T>
    where
        T: Splittable,
    {
        self.split(|b| b.outputs())
    }

    /// If the chain's response can be turned into an iterator with an appropriate
    /// item type, this will allow it to be split in a list-like way.
    ///
    /// This is equivalent to
    /// ```text
    /// .map_block(SplitAsList::new).split(build)
    /// ```
    pub fn split_as_list<U>(
        self,
        build: impl FnOnce(SplitBuilder<'w, 's, 'a, 'b, SplitAsList<T>>) -> U,
    ) -> U
    where
        T: IntoIterator,
        T::Item: 'static + Send + Sync,
    {
        self.map_block(SplitAsList::new).split(build)
    }

    /// If the chain's response can be turned into an iterator with an appropriate
    /// item type, this will insert a split and provide a container for its
    /// available outputs. To build connections to these outputs later, use
    /// [`SplitOutputs::build`].
    ///
    /// This is equivalent to
    /// ```text
    /// .split_as_list(|split| split.outputs())
    /// ```
    pub fn split_as_list_outputs(self) -> SplitOutputs<SplitAsList<T>>
    where
        T: IntoIterator,
        T::Item: 'static + Send + Sync,
    {
        self.split_as_list(|b| b.outputs())
    }

    /// Add a [no-op][1] to the current end of the chain.
    ///
    /// As the name suggests, a no-op will not actually do anything, but it adds
    /// a new operation point into the workflow. That operation point could be
    /// used as a reference point for other operations, like
    /// [trimming](Self::then_trim).
    ///
    /// [1]: `<https://en.wikipedia.org/wiki/NOP_(code)>`
    pub fn noop(self) -> Chain<'w, 's, 'a, 'b, T> {
        let source = self.target;
        let target = self.builder.commands.spawn(UnusedTarget).id();

        self.builder.commands.queue(AddOperation::new(
            Some(self.scope()),
            source,
            Noop::<T>::new(target),
        ));
        Chain::new(target, self.builder)
    }

    /// Get a whole node that is simply a [no-op](Self::noop).
    pub fn noop_node(self) -> Node<T, T> {
        let source = self.target;
        let scope = self.builder.scope();
        let target = self.noop().output().id();
        Node {
            input: InputSlot::new(scope, source),
            output: Output::new(scope, target),
            streams: (),
        }
    }

    /// The scope that the chain is building inside of.
    pub fn scope(&self) -> Entity {
        self.builder.scope()
    }

    /// The target where the chain will be sending its latest output.
    pub fn target(&self) -> Entity {
        self.target
    }
}
731
732impl<'w, 's, 'a, 'b, T, E> Chain<'w, 's, 'a, 'b, Result<T, E>>
733where
734    T: 'static + Send + Sync,
735    E: 'static + Send + Sync,
736{
    /// Build a chain that activates if the response is an [`Err`]. If the
    /// response is [`Ok`] then this branch will not be activated.
    ///
    /// This function returns a chain that will be activated if the result was
    /// [`Ok`] so you can continue building your response to the [`Ok`] case.
    pub fn branch_for_err(self, build_err: impl FnOnce(Chain<E>)) -> Chain<'w, 's, 'a, 'b, T> {
        // Fork the result: the Err branch is handed to the caller's builder,
        // while the Ok branch is captured as a raw output and resumed as the
        // returned chain.
        Chain::<Result<T, E>>::new(self.target, self.builder)
            .fork_result(|chain| chain.output(), build_err)
            .0
            .chain(self.builder)
    }
748
749    /// Build two branching chains, one for the case where the response is [`Ok`]
750    /// and one if the response is [`Err`]. Whichever chain does not get activated
751    /// will be disposed, which means it gets dropped without triggering any
752    /// cancellation effects.
753    ///
754    /// The outputs of both builder functions will be zipped as the return value
755    /// of this function.
756    pub fn fork_result<U, V>(
757        self,
758        build_ok: impl FnOnce(Chain<T>) -> U,
759        build_err: impl FnOnce(Chain<E>) -> V,
760    ) -> (U, V) {
761        let source = self.target;
762        let target_ok = self.builder.commands.spawn(UnusedTarget).id();
763        let target_err = self.builder.commands.spawn(UnusedTarget).id();
764
765        self.builder.commands.queue(AddOperation::new(
766            Some(self.scope()),
767            source,
768            make_result_branching::<T, E>(ForkTargetStorage::from_iter([target_ok, target_err])),
769        ));
770
771        let u = build_ok(Chain::new(target_ok, self.builder));
772        let v = build_err(Chain::new(target_err, self.builder));
773        (u, v)
774    }
775
776    /// If the result contains an [`Err`] value then connect the error result
777    /// to an input that takes in a [`Result`] with the same [`Err`] variant.
778    /// If the result contained an [`Ok`] value then it will be passed along to
779    /// the next step in the chain.
780    ///
781    /// This is meant to replicate the `?` operator used in normal Rust
782    /// programming.
783    pub fn connect_on_err<U>(self, input: InputSlot<Result<U, E>>) -> Chain<'w, 's, 'a, 'b, T>
784    where
785        U: 'static + Send + Sync,
786    {
787        self.branch_for_err(move |chain: Chain<E>| chain.map_block(|err| Err(err)).connect(input))
788    }
789
    /// If the result contains an [`Err`] value then the entire scope that
    /// contains this operation will be immediately cancelled. If the scope is
    /// within a node of an outer workflow, then the node will emit a disposal
    /// for its outer workflow. Otherwise if this is the root scope of a workflow
    /// then the whole workflow is immediately cancelled. This effect will happen
    /// even if the scope is set to be uninterruptible.
    ///
    /// This operation only works for results with an [`Err`] variant that
    /// implements the [`Error`] trait. If your [`Err`] variant does not implement
    /// that trait, then you can use [`Self::cancel_on_quiet_err`] instead.
    ///
    /// ```
    /// use crossflow::{prelude::*, testing::*};
    ///
    /// let mut context = TestingContext::minimal_plugins();
    ///
    /// let workflow = context.spawn_io_workflow(|scope, builder| {
    ///     builder
    ///         .chain(scope.start)
    ///         .map_block(produce_err)
    ///         .cancel_on_err()
    ///         .connect(scope.terminate);
    /// });
    ///
    /// let outcome = context.try_resolve_request("hello", workflow, ());
    /// assert!(outcome.is_err());
    /// ```
    pub fn cancel_on_err(self) -> Chain<'w, 's, 'a, 'b, T>
    where
        E: Error,
    {
        let source = self.target;
        let target = self.builder.commands.spawn(UnusedTarget).id();

        // The filter forwards Ok values to `target` and cancels the scope
        // when an Err arrives.
        self.builder.commands.queue(AddOperation::new(
            Some(self.scope()),
            source,
            CreateCancelFilter::on_err::<T, E>(target),
        ));

        Chain::new(target, self.builder)
    }
832
833    /// Same as [`Self::cancel_on_err`] except it also works for [`Err`] variants
834    /// that do not implement [`Error`]. The catch is that their error message
835    /// will not be included in the [`Filtered`](crate::Filtered) information
836    /// that gets propagated outward.
837    pub fn cancel_on_quiet_err(self) -> Chain<'w, 's, 'a, 'b, T> {
838        let source = self.target;
839        let target = self.builder.commands.spawn(UnusedTarget).id();
840
841        self.builder.commands.queue(AddOperation::new(
842            Some(self.scope()),
843            source,
844            CreateCancelFilter::on_quiet_err::<T, E>(target),
845        ));
846
847        Chain::new(target, self.builder)
848    }
849
850    /// If the output contains an [`Err`] value then the output will be disposed.
851    ///
852    /// Disposal means that the node that the output is connected to will simply
853    /// not be triggered, but the workflow is not necessarily cancelled. If a
854    /// disposal makes it impossible for the workflow to terminate, then the
855    /// workflow will be cancelled immediately.
856    pub fn dispose_on_err(self) -> Chain<'w, 's, 'a, 'b, T>
857    where
858        E: Error,
859    {
860        let source = self.target;
861        let target = self.builder.commands.spawn(UnusedTarget).id();
862
863        self.builder.commands.queue(AddOperation::new(
864            Some(self.scope()),
865            source,
866            CreateDisposalFilter::on_err::<T, E>(target),
867        ));
868
869        Chain::new(target, self.builder)
870    }
871
872    /// Same as [`Self::dispose_on_err`] except it also works for [`Err`] variants
873    /// that do not implement [`Error`]. The catch is that their error message
874    /// will not be included in the [`Filtered`](crate::Filtered) information
875    /// that gets propagated outward.
876    pub fn dispose_on_quiet_err(self) -> Chain<'w, 's, 'a, 'b, T> {
877        let source = self.target;
878        let target = self.builder.commands.spawn(UnusedTarget).id();
879
880        self.builder.commands.queue(AddOperation::new(
881            Some(self.scope()),
882            source,
883            CreateDisposalFilter::on_quiet_err::<T, E>(target),
884        ));
885
886        Chain::new(target, self.builder)
887    }
888
889    /// Inverse of [`Self::dispose_on_err`], this will dispose [`Ok`] results
890    /// and pass along [`Err`] values. This can be used in cases where you are
891    /// monitoring only for errors and are not concerned about any [`Ok`] results.
892    pub fn dispose_on_ok(self) -> Chain<'w, 's, 'a, 'b, E> {
893        let source = self.target;
894        let target = self.builder.commands.spawn(UnusedTarget).id();
895
896        self.builder.commands.queue(AddOperation::new(
897            Some(self.scope()),
898            source,
899            CreateDisposalFilter::on_ok::<T, E>(target),
900        ));
901
902        Chain::new(target, self.builder)
903    }
904}
905
906impl<'w, 's, 'a, 'b, T> Chain<'w, 's, 'a, 'b, Option<T>>
907where
908    T: 'static + Send + Sync,
909{
910    /// Build a chain that activates if the response is [`None`]. If the response
911    /// is [`Some`] then the branch built by this function will not be activated.
912    ///
913    /// This function returns a chain that will be activated if the result was
914    /// [`Some`] so you can continue building your response to the [`Some`] case.
915    pub fn branch_for_none(self, build_none: impl FnOnce(Chain<()>)) -> Chain<'w, 's, 'a, 'b, T> {
916        Chain::<Option<T>>::new(self.target, self.builder)
917            .fork_option(|chain| chain.output(), build_none)
918            .0
919            .chain(self.builder)
920    }
921
922    /// Build two branching chains, one for the case where the response is [`Some`]
923    /// and one if the response is [`None`]. Whichever chain does not get activated
924    /// will be disposed, which means it gets dropped without triggering any
925    /// cancellation effects.
926    ///
927    /// The outputs of both builder functions will be zipped as the return value
928    /// of this function.
929    pub fn fork_option<U, V>(
930        self,
931        build_some: impl FnOnce(Chain<T>) -> U,
932        build_none: impl FnOnce(Chain<()>) -> V,
933    ) -> (U, V) {
934        let source = self.target;
935        let target_some = self.builder.commands.spawn(UnusedTarget).id();
936        let target_none = self.builder.commands.spawn(UnusedTarget).id();
937
938        self.builder.commands.queue(AddOperation::new(
939            Some(self.scope()),
940            source,
941            make_option_branching::<T>(ForkTargetStorage::from_iter([target_some, target_none])),
942        ));
943
944        let u = build_some(Chain::new(target_some, self.builder));
945        let v = build_none(Chain::new(target_none, self.builder));
946        (u, v)
947    }
948
949    /// If the result contains a [`None`] value then connect the trigger to
950    /// an input that takes in an [`Option`] with any [`Some`] variant.
951    /// If the result contained a [`Some`] value then it will be passed along to
952    /// the next step in the chain.
953    ///
954    /// This is meant to replicate the `?` operator used in normal Rust
955    /// programming.
956    pub fn connect_on_none<U>(self, input: InputSlot<Option<U>>) -> Chain<'w, 's, 'a, 'b, T>
957    where
958        U: 'static + Send + Sync,
959    {
960        self.branch_for_none(move |chain: Chain<()>| chain.map_block(|_| None).connect(input))
961    }
962
963    /// If the result contains a [`None`] value then the chain will be cancelled
964    /// from this link onwards. The next link in the chain will receive a `T` if
965    /// the chain is not cancelled.
966    pub fn cancel_on_none(self) -> Chain<'w, 's, 'a, 'b, T> {
967        let source = self.target;
968        let target = self.builder.commands.spawn(UnusedTarget).id();
969
970        self.builder.commands.queue(AddOperation::new(
971            Some(self.scope()),
972            source,
973            CreateCancelFilter::on_none::<T>(target),
974        ));
975
976        Chain::new(target, self.builder)
977    }
978
979    /// If the output contains [`None`] value then the output will be disposed.
980    ///
981    /// Disposal means that the node that the output is connected to will simply
982    /// not be triggered, but the workflow is not necessarily cancelled. If a
983    /// disposal makes it impossible for the workflow to terminate, then the
984    /// workflow will be cancelled immediately.
985    pub fn dispose_on_none(self) -> Chain<'w, 's, 'a, 'b, T> {
986        let source = self.target;
987        let target = self.builder.commands.spawn(UnusedTarget).id();
988
989        self.builder.commands.queue(AddOperation::new(
990            Some(self.scope()),
991            source,
992            CreateDisposalFilter::on_none::<T>(target),
993        ));
994
995        Chain::new(target, self.builder)
996    }
997
998    /// Inverse of [`Self::dispose_on_none`], this will dispose [`Some`] values
999    /// and pass along a trigger `()` for [`None`] values. This can be used in
1000    /// cases where you are monitoring only for [`None`] values and are not
1001    /// concerned about any [`Some`] values.
1002    pub fn dispose_on_some(self) -> Chain<'w, 's, 'a, 'b, ()> {
1003        let source = self.target;
1004        let target = self.builder.commands.spawn(UnusedTarget).id();
1005
1006        self.builder.commands.queue(AddOperation::new(
1007            Some(self.scope()),
1008            source,
1009            CreateDisposalFilter::on_some::<T>(target),
1010        ));
1011
1012        Chain::new(target, self.builder)
1013    }
1014}
1015
1016impl<'w, 's, 'a, 'b, K, V, T> Chain<'w, 's, 'a, 'b, T>
1017where
1018    K: 'static + Send + Sync + Eq + std::hash::Hash + Clone + std::fmt::Debug,
1019    V: 'static + Send + Sync,
1020    T: 'static + Send + Sync + IntoIterator<Item = (K, V)>,
1021{
1022    /// If the chain's response type can be turned into an iterator that returns
1023    /// `(key, value)` pairs, then this will split it in a map-like way, whether
1024    /// or not it is a conventional map data structure.
1025    ///
1026    /// This is equivalent to
1027    /// ```text
1028    /// .map_block(SplitAsMap::new).split(build)
1029    /// ```
1030    pub fn split_as_map<U>(
1031        self,
1032        build: impl FnOnce(SplitBuilder<'w, 's, 'a, 'b, SplitAsMap<K, V, T>>) -> U,
1033    ) -> U {
1034        self.map_block(SplitAsMap::new).split(build)
1035    }
1036
1037    /// If the chain's response type can be turned into an iterator that returns
1038    /// `(key, value)` pairs, then this will split it in a map-like way and
1039    /// provide a container for its available outputs. To build connections to
1040    /// these outputs later, use [`SplitOutputs::build`].
1041    ///
1042    /// This is equivalent to
1043    /// ```text
1044    /// .split_as_map(|split| split.outputs())
1045    /// ```
1046    pub fn split_as_map_outputs(self) -> SplitOutputs<SplitAsMap<K, V, T>> {
1047        self.split_as_map(|b| b.outputs())
1048    }
1049}
1050
1051impl<'w, 's, 'a, 'b, Request, Response, Streams>
1052    Chain<'w, 's, 'a, 'b, (Request, ServiceInstructions<Request, Response, Streams>)>
1053where
1054    Request: 'static + Send + Sync,
1055    Response: 'static + Send + Sync + Unpin,
1056    Streams: StreamPack,
1057{
1058    /// Given the input `(request, service)`, pass `request` into `service` and
1059    /// forward its response. This is called `injection` because it's a
1060    /// [dependency injection](https://en.wikipedia.org/wiki/Dependency_injection)
1061    /// operation.
1062    ///
1063    /// Since it's possible for `service` to fail for
1064    /// various reasons, this returns a [`Result`]. Follow this with
1065    /// `.dispose_on_err` to filter away errors.
1066    ///
1067    /// To access the streams of the service, use [`Chain::then_injection_node`].
1068    pub fn then_injection(self) -> Chain<'w, 's, 'a, 'b, Response> {
1069        let source = self.target;
1070        let node = self
1071            .builder
1072            .create_injection_impl::<Request, Response, Streams>(source);
1073        node.output.chain(self.builder)
1074    }
1075
1076    /// Given the input `(request, service)`, pass `request` into `service` and
1077    /// forward its streams and response. This is called `injection` because it's a
1078    /// [dependency injection](https://en.wikipedia.org/wiki/Dependency_injection)
1079    /// operation.
1080    ///
1081    /// Since it's possible for `service` to
1082    /// fail for various reasons, this returns a [`Result`]. Follow this with
1083    /// `.dispose_on_err` to filter away errors.
1084    pub fn then_injection_node(
1085        self,
1086    ) -> Node<(Request, ServiceInstructions<Request, Response, Streams>), Response, Streams> {
1087        let source = self.target;
1088        self.builder
1089            .create_injection_impl::<Request, Response, Streams>(source)
1090    }
1091}
1092
1093impl<'w, 's, 'a, 'b, T> Chain<'w, 's, 'a, 'b, GateRequest<T>>
1094where
1095    T: 'static + Send + Sync,
1096{
1097    pub fn then_gate<B>(self, buffers: B) -> Chain<'w, 's, 'a, 'b, T>
1098    where
1099        B: Bufferable,
1100        B::BufferType: 'static + Send + Sync,
1101    {
1102        let buffers = buffers.into_buffer(self.builder);
1103        buffers.verify_scope(self.builder.scope());
1104
1105        let source = self.target;
1106        let target = self.builder.commands.spawn(UnusedTarget).id();
1107        self.builder.commands.queue(AddOperation::new(
1108            Some(self.builder.scope()),
1109            source,
1110            OperateDynamicGate::<T, _>::new(buffers, target),
1111        ));
1112
1113        Chain::new(target, self.builder)
1114    }
1115}
1116
1117impl<'w, 's, 'a, 'b, T> Chain<'w, 's, 'a, 'b, BufferKey<T>>
1118where
1119    T: 'static + Send + Sync,
1120{
1121    pub fn consume_buffer<const N: usize>(self) -> Chain<'w, 's, 'a, 'b, SmallVec<[T; N]>> {
1122        self.then(consume_buffer.into_blocking_callback())
1123    }
1124}
1125
impl<'w, 's, 'a, 'b, T: 'static + Send + Sync> Chain<'w, 's, 'a, 'b, T> {
    /// Used internally to create a [`Chain`] that continues building the
    /// workflow from the given `target` entity using the given `builder`.
    pub(crate) fn new(target: Entity, builder: &'b mut Builder<'w, 's, 'a>) -> Self {
        Self {
            target,
            builder,
            // NOTE(review): _ignore appears to be a type-marker field with no
            // runtime data — confirm at the struct definition.
            _ignore: Default::default(),
        }
    }
}
1137
1138impl<'w, 's, 'a, 'b, K, V> Chain<'w, 's, 'a, 'b, (K, V)>
1139where
1140    K: 'static + Send + Sync,
1141    V: 'static + Send + Sync,
1142{
1143    /// If the chain's response contains a `(key, value)` pair, get the `key`
1144    /// component from it (the first element of the tuple).
1145    pub fn key(self) -> Chain<'w, 's, 'a, 'b, K> {
1146        self.map_block(|(key, _)| key)
1147    }
1148
1149    /// If the chain's response contains a `(key, value)` pair, get the `value`
1150    /// component from it (the second element of the tuple).
1151    pub fn value(self) -> Chain<'w, 's, 'a, 'b, V> {
1152        self.map_block(|(_, value)| value)
1153    }
1154}
1155
1156impl<'w, 's, 'a, 'b> Chain<'w, 's, 'a, 'b, ()> {
1157    /// When the chain reaches this point, cancel the workflow.
1158    ///
1159    /// If you want to include information about the value that triggered the
1160    /// cancellation, use [`Self::then_cancel`].
1161    pub fn then_quiet_cancel(self) {
1162        self.builder.commands.queue(AddOperation::new(
1163            Some(self.scope()),
1164            self.target,
1165            OperateQuietCancel,
1166        ));
1167    }
1168}
1169
#[cfg(test)]
mod tests {
    use crate::{prelude::*, testing::*};
    use smallvec::SmallVec;

    // Fork a (f64, f64) input with unzip, run one branch through an async
    // wait, join the two branch outputs, and add them together.
    #[test]
    fn test_join() {
        let mut context = TestingContext::minimal_plugins();

        let workflow = context.spawn_io_workflow(|scope, builder| {
            builder
                .chain(scope.start)
                // (2.0, 2.0)
                .fork_unzip((
                    |chain: Chain<f64>| {
                        chain
                            // 2.0
                            .map_block(|value| WaitRequest {
                                duration: Duration::from_secs_f64(value / 100.0),
                                value,
                            })
                            .map_async(wait)
                            // 2.0
                            .output()
                    },
                    |chain: Chain<f64>| {
                        chain
                            // 2.0
                            .map_block(|value| 2.0 * value)
                            // 4.0
                            .output()
                    },
                ))
                .join(builder)
                // (2.0, 4.0)
                .map_block(add)
                // 6.0
                .connect(scope.terminate);
        });

        let r = context.resolve_request((2.0, 2.0), workflow);
        assert_eq!(r, 6.0);
    }

    // Two cloned branches race to terminate an inner scope; the branch
    // without an async wait should win.
    #[test]
    fn test_race() {
        let mut context = TestingContext::minimal_plugins();

        let workflow = context.spawn_io_workflow(|scope, builder| {
            builder
                .chain(scope.start)
                // (2.0, 2.0)
                .map_block(add)
                // 4.0
                .then_io_scope(|scope, builder| {
                    builder
                        .chain(scope.start)
                        // 4.0
                        .fork_clone((
                            |chain: Chain<f64>| {
                                // 4.0
                                chain
                                    .map_block(|value| WaitRequest {
                                        duration: Duration::from_secs_f64(0.01 * value),
                                        value,
                                    })
                                    .map_async(wait)
                                    // 4.0
                                    .connect(scope.terminate);
                            },
                            |chain: Chain<f64>| {
                                // 4.0
                                chain
                                    .map_block(|a| (a, a))
                                    // (4.0, 4.0)
                                    .map_block(add)
                                    // 8.0
                                    .connect(scope.terminate);
                            },
                        ));
                })
                // This should be won by the 8.0 branch because it does not wait,
                // while the 4.0 branch should wait for 0.04s.
                .map_block(|a| (a, a))
                // (8.0, 8.0)
                .map_block(add)
                // 16.0
                .connect(scope.terminate);
        });

        let r = context.resolve_request((2.0, 2.0), workflow);
        assert_eq!(r, 16.0);
    }

    // Unzip inside an inner scope where both branches terminate the scope;
    // the synchronous branch ((2+3)+10 = 15) should win over the waiting one.
    #[test]
    fn test_unzip() {
        let mut context = TestingContext::minimal_plugins();

        let workflow = context.spawn_io_workflow(|scope, builder| {
            builder
                .chain(scope.start)
                .map_block(add)
                .map_block(|v| (v, 2.0 * v))
                .then_io_scope(|scope, builder| {
                    builder.chain(scope.start).fork_unzip((
                        |chain: Chain<f64>| {
                            chain
                                .map_block(|v| (v, 10.0))
                                .map_block(add)
                                .connect(scope.terminate);
                        },
                        |chain: Chain<f64>| {
                            chain
                                .map_block(|value| WaitRequest {
                                    duration: std::time::Duration::from_secs_f64(0.01),
                                    value,
                                })
                                .map_async(wait)
                                .connect(scope.terminate);
                        },
                    ));
                })
                .connect(scope.terminate);
        });

        let r = context.resolve_request((2.0, 3.0), workflow);
        assert_eq!(r, 15.0);
    }

    // cancel_on_none and cancel_on_quiet_err should both make the whole
    // request resolve to an error.
    #[test]
    fn test_cancel_on_special_case() {
        let mut context = TestingContext::minimal_plugins();

        let workflow = context.spawn_io_workflow(|scope, builder| {
            builder
                .chain(scope.start)
                .map_block(duplicate)
                .map_block(add)
                .map_block(produce_none)
                .cancel_on_none()
                .map_block(duplicate)
                .map_block(add)
                .connect(scope.terminate);
        });

        let r = context.try_resolve_request(2.0, workflow, ());
        assert!(r.is_err());

        let workflow = context.spawn_io_workflow(|scope, builder| {
            builder
                .chain(scope.start)
                .map_block(duplicate)
                .map_block(add)
                .map_block(produce_err)
                .cancel_on_quiet_err()
                .map_block(duplicate)
                .map_block(add)
                .connect(scope.terminate);
        });

        let r = context.try_resolve_request(2.0, workflow, ());
        assert!(r.is_err());
    }

    // dispose_on_none should make termination impossible (cancelling the
    // workflow), while dispose_on_err should simply drop Err values.
    #[test]
    fn test_disposal() {
        let mut context = TestingContext::minimal_plugins();

        let workflow = context.spawn_io_workflow(|scope, builder| {
            builder
                .chain(scope.start)
                .map_block(duplicate)
                .map_block(add)
                .map_block(produce_none)
                .dispose_on_none()
                .map_block(duplicate)
                .map_block(add)
                .connect(scope.terminate);
        });

        let r = context.try_resolve_request(2.0, workflow, ());
        assert!(r.is_err());

        let workflow = context.spawn_io_workflow(
            |scope: Scope<Result<f64, Result<f64, TestError>>, f64>, builder| {
                builder.chain(scope.start).fork_result(
                    |chain| chain.connect(scope.terminate),
                    |chain| chain.dispose_on_err().connect(scope.terminate),
                );
            },
        );

        let r = context.resolve_request(Ok(1.0), workflow);
        assert_eq!(r, 1.0);

        let r = context.resolve_request(Err(Ok(5.0)), workflow);
        assert_eq!(r, 5.0);

        let r = context.try_resolve_request(Err(Err(TestError)), workflow, ());
        assert!(r.is_err());
    }

    // Spread a SmallVec of duplicated values into a buffer, then listen on
    // the buffer until the expected quantity of items has accumulated.
    #[test]
    fn test_spread() {
        let mut context = TestingContext::minimal_plugins();

        let workflow = context.spawn_io_workflow(|scope, builder| {
            let buffer = builder.create_buffer(BufferSettings::keep_all());

            builder
                .chain(scope.start)
                .map_block(|value| {
                    let mut duplicated_values: SmallVec<[i32; 16]> = SmallVec::new();
                    for _ in 0..value {
                        duplicated_values.push(value);
                    }
                    duplicated_values
                })
                .spread()
                .connect(buffer.input_slot());

            buffer
                .listen(builder)
                .then(watch_for_quantity.into_blocking_callback())
                .dispose_on_none()
                .connect(scope.terminate);
        });

        let r = context.try_resolve_request(7, workflow, 1).unwrap();
        assert_eq!(r.len(), 7);
        assert!(r.iter().all(|v| *v == 7));
    }

    // This is essentially a collect operation specific to one of our spread
    // operation tests. We expect to gather up a number of elements equal to the
    // integer value of those elements. We don't use the collect operation for
    // this test so that we can test each of those operations in isolation.
    fn watch_for_quantity(
        In(key): In<BufferKey<i32>>,
        mut access: BufferAccessMut<i32>,
    ) -> Option<SmallVec<[i32; 16]>> {
        let mut buffer = access.get_mut(&key).unwrap();
        // The newest element tells us how many elements to expect in total.
        let expected_count = *buffer.newest()? as usize;
        if buffer.len() < expected_count {
            return None;
        }

        Some(buffer.drain(..).collect())
    }

    // Exercise collect_all and bounded collect over stream outputs, including
    // cases where a disconnected or disposed input affects the collection.
    #[test]
    fn test_collect() {
        let mut context = TestingContext::minimal_plugins();

        let workflow = context.spawn_io_workflow(|scope, builder| {
            let node =
                builder
                    .chain(scope.start)
                    .map_node(|input: BlockingMap<i32, StreamOf<i32>>| {
                        for _ in 0..input.request {
                            input.streams.send(input.request);
                        }
                    });

            builder
                .chain(node.streams)
                .collect_all::<16>()
                .connect(scope.terminate);
        });

        let r = context.try_resolve_request(8, workflow, 1).unwrap();
        assert_eq!(r.len(), 8);
        assert!(r.iter().all(|v| *v == 8));

        let workflow = context.spawn_io_workflow(|scope, builder| {
            let node =
                builder
                    .chain(scope.start)
                    .map_node(|input: BlockingMap<i32, StreamOf<i32>>| {
                        for _ in 0..input.request {
                            input.streams.send(input.request);
                        }
                    });

            builder
                .chain(node.streams)
                .collect::<16>(4, None)
                .connect(scope.terminate);
        });

        check_collection(0, 4, workflow, &mut context);
        check_collection(2, 4, workflow, &mut context);
        check_collection(3, 4, workflow, &mut context);
        check_collection(4, 4, workflow, &mut context);
        check_collection(5, 4, workflow, &mut context);
        check_collection(8, 4, workflow, &mut context);

        let workflow = context.spawn_io_workflow(|scope, builder| {
            let bogus_node = builder.create_map_block(|v: i32| v);
            builder
                .chain(bogus_node.output)
                .collect_all::<16>()
                .connect(scope.terminate);

            builder
                .chain(scope.start)
                .map_block(|v: i32| Some(v))
                .fork_option(
                    |chain: Chain<i32>| {
                        chain
                            .map_async(|v| async move { v })
                            .collect_all::<16>()
                            .connect(scope.terminate)
                    },
                    |chain: Chain<()>| chain.map_block(|()| unreachable!()).unused(),
                );
        });

        let r = context
            .try_resolve_request(2, workflow, Duration::from_secs(30))
            .unwrap();
        assert_eq!(r.len(), 1);
        assert!(r.iter().all(|v| *v == 2));

        let workflow = context.spawn_io_workflow(|scope, builder| {
            builder
                .chain(scope.start)
                .map_block(|v| if v < 4 { None } else { Some(v) })
                .dispose_on_none()
                .collect_all::<8>()
                .connect(scope.terminate);
        });

        let r = context.resolve_request(2, workflow);
        assert!(r.is_empty());

        let r = context.try_resolve_request(5, workflow, 1).unwrap();
        assert_eq!(r.len(), 1);
        assert!(r.iter().all(|v| *v == 5));
    }

    // Helper for test_collect: a request below `min` should fail to resolve,
    // while one at or above `min` should collect `value` copies of itself.
    fn check_collection(
        value: i32,
        min: i32,
        workflow: Service<i32, SmallVec<[i32; 16]>>,
        context: &mut TestingContext,
    ) {
        let r = context.try_resolve_request(value, workflow, 1);
        if value < min {
            assert!(r.is_err());
        } else {
            let r = r.unwrap();
            assert_eq!(r.len(), value as usize);
            assert!(r.iter().all(|v| *v == value));
        }
    }

    // An unused Err branch should be silently dropped; the first Ok value
    // spread from the input vector terminates the workflow.
    #[test]
    fn test_unused_branch() {
        let mut context = TestingContext::minimal_plugins();

        let workflow =
            context.spawn_io_workflow(|scope: Scope<Vec<Result<i64, ()>>, i64>, builder| {
                builder
                    .chain(scope.start)
                    .spread()
                    .fork_result(|ok| ok.connect(scope.terminate), |err| err.unused());
            });

        let test_set = vec![Err(()), Err(()), Ok(5), Err(()), Ok(10)];

        let r = context.resolve_request(test_set, workflow);
        assert_eq!(r, 5);
    }
}