1use std::future::Future;
19
20use bevy_ecs::prelude::Entity;
21
22use smallvec::SmallVec;
23
24use std::error::Error;
25
26use crate::{
27 Accessing, AddOperation, AsMap, Buffer, BufferKey, BufferKeys, Bufferable, Buffering, Builder,
28 Collect, CreateCancelFilter, CreateDisposalFilter, ForkTargetStorage, Gate, GateRequest,
29 InputSlot, IntoAsyncMap, IntoBlockingCallback, IntoBlockingMap, Node, Noop,
30 OperateBufferAccess, OperateCancel, OperateDynamicGate, OperateQuietCancel, OperateSplit,
31 OperateStaticGate, Output, ProvideOnce, Provider, Scope, ScopeSettings, Sendish,
32 ServiceInstructions, Spread, StreamPack, StreamTargetMap, Trim, TrimBranch, UnusedTarget,
33 make_option_branching, make_result_branching,
34};
35
36pub mod fork_clone_builder;
37pub use fork_clone_builder::*;
38
39pub(crate) mod premade;
40use premade::*;
41
42pub mod split;
43pub use split::*;
44
45pub mod unzip;
46pub use unzip::*;
47
48#[must_use]
55pub struct Chain<'w, 's, 'a, 'b, T> {
56 target: Entity,
57 builder: &'b mut Builder<'w, 's, 'a>,
58 _ignore: std::marker::PhantomData<fn(T)>,
59}
60
61impl<'w, 's, 'a, 'b, T: 'static + Send + Sync> Chain<'w, 's, 'a, 'b, T> {
62 pub fn output(self) -> Output<T> {
69 Output::new(self.scope(), self.target)
70 }
71
72 pub fn unpack(self) -> (Output<T>, &'b mut Builder<'w, 's, 'a>) {
76 (Output::new(self.scope(), self.target), self.builder)
77 }
78
79 pub fn connect(self, input: InputSlot<T>) {
84 let output = Output::new(self.scope(), self.target);
85 self.builder.connect(output, input)
86 }
87
88 pub fn unused(self) {
91 }
93
94 pub fn then<P: Provider<Request = T>>(self, provider: P) -> Chain<'w, 's, 'a, 'b, P::Response>
98 where
99 P::Response: 'static + Send + Sync,
100 {
101 let source = self.target;
102 let target = self.builder.commands.spawn(UnusedTarget).id();
103 provider.connect(
104 Some(self.builder.scope()),
105 source,
106 target,
107 self.builder.commands,
108 );
109 Chain::new(target, self.builder)
110 }
111
112 pub fn then_node<P: Provider<Request = T>>(
115 self,
116 provider: P,
117 ) -> Node<T, P::Response, P::Streams>
118 where
119 P::Response: 'static + Send + Sync,
120 P::Streams: StreamPack,
121 {
122 let source = self.target;
123 let target = self.builder.commands.spawn(UnusedTarget).id();
124 provider.connect(Some(self.scope()), source, target, self.builder.commands);
125
126 let mut map = StreamTargetMap::default();
127 let streams =
128 <P::Streams as StreamPack>::spawn_node_streams(source, &mut map, self.builder);
129 self.builder.commands.entity(source).insert(map);
130 Node {
131 input: InputSlot::new(self.builder.scope(), source),
132 output: Output::new(self.builder.scope(), target),
133 streams,
134 }
135 }
136
137 pub fn map<M, F: AsMap<M>>(
140 self,
141 f: F,
142 ) -> Chain<'w, 's, 'a, 'b, <F::MapType as ProvideOnce>::Response>
143 where
144 F::MapType: Provider<Request = T>,
145 <F::MapType as ProvideOnce>::Response: 'static + Send + Sync,
146 {
147 self.then(f.as_map())
148 }
149
150 pub fn map_node<M, F: AsMap<M>>(
153 self,
154 f: F,
155 ) -> Node<T, <F::MapType as ProvideOnce>::Response, <F::MapType as ProvideOnce>::Streams>
156 where
157 F::MapType: Provider<Request = T>,
158 <F::MapType as ProvideOnce>::Response: 'static + Send + Sync,
159 <F::MapType as ProvideOnce>::Streams: StreamPack,
160 {
161 self.then_node(f.as_map())
162 }
163
164 pub fn map_block<U>(
171 self,
172 f: impl FnMut(T) -> U + 'static + Send + Sync,
173 ) -> Chain<'w, 's, 'a, 'b, U>
174 where
175 U: 'static + Send + Sync,
176 {
177 self.then(f.into_blocking_map())
178 }
179
180 pub fn map_block_node<U>(self, f: impl FnMut(T) -> U + 'static + Send + Sync) -> Node<T, U, ()>
183 where
184 U: 'static + Send + Sync,
185 {
186 self.then_node(f.into_blocking_map())
187 }
188
189 pub fn map_async<Task>(
194 self,
195 f: impl FnMut(T) -> Task + 'static + Send + Sync,
196 ) -> Chain<'w, 's, 'a, 'b, Task::Output>
197 where
198 Task: Future + 'static + Sendish,
199 Task::Output: 'static + Send + Sync,
200 {
201 self.then(f.into_async_map())
202 }
203
204 pub fn map_async_node<Task>(
207 self,
208 f: impl FnMut(T) -> Task + 'static + Send + Sync,
209 ) -> Node<T, Task::Output, ()>
210 where
211 Task: Future + 'static + Sendish,
212 Task::Output: 'static + Send + Sync,
213 {
214 self.then_node(f.into_async_map())
215 }
216
217 pub fn then_scope<Response, Streams, Settings>(
222 self,
223 build: impl FnOnce(Scope<T, Response, Streams>, &mut Builder) -> Settings,
224 ) -> Chain<'w, 's, 'a, 'b, Response>
225 where
226 Response: 'static + Send + Sync,
227 Streams: StreamPack,
228 Settings: Into<ScopeSettings>,
229 {
230 let exit_scope = self.builder.commands.spawn(UnusedTarget).id();
231 self.builder
232 .create_scope_impl::<T, Response, Streams, Settings>(self.target, exit_scope, build)
233 .output
234 .chain(self.builder)
235 }
236
237 pub fn then_io_scope<Response, Settings>(
243 self,
244 build: impl FnOnce(Scope<T, Response>, &mut Builder) -> Settings,
245 ) -> Chain<'w, 's, 'a, 'b, Response>
246 where
247 Response: 'static + Send + Sync,
248 Settings: Into<ScopeSettings>,
249 {
250 self.then_scope(build)
251 }
252
253 pub fn then_scope_node<Response, Streams, Settings>(
256 self,
257 build: impl FnOnce(Scope<T, Response, Streams>, &mut Builder) -> Settings,
258 ) -> Node<T, Response, Streams>
259 where
260 Response: 'static + Send + Sync,
261 Streams: StreamPack,
262 Settings: Into<ScopeSettings>,
263 {
264 let exit_scope = self.builder.commands.spawn(UnusedTarget).id();
265 self.builder
266 .create_scope_impl::<T, Response, Streams, Settings>(self.target, exit_scope, build)
267 }
268
269 pub fn then_io_scope_node<Response, Settings>(
275 self,
276 build: impl FnOnce(Scope<T, Response>, &mut Builder) -> Settings,
277 ) -> Node<T, Response, ()>
278 where
279 Response: 'static + Send + Sync,
280 Settings: Into<ScopeSettings>,
281 {
282 self.then_scope_node(build)
283 }
284
285 pub fn trigger(self) -> Chain<'w, 's, 'a, 'b, ()> {
291 self.map_block(|_| ())
292 }
293
294 pub fn with_access<B>(self, buffers: B) -> Chain<'w, 's, 'a, 'b, (T, BufferKeys<B>)>
304 where
305 B: Bufferable,
306 B::BufferType: Accessing,
307 {
308 let buffers = buffers.into_buffer(self.builder);
309 buffers.verify_scope(self.builder.scope());
310
311 let source = self.target;
312 let target = self.builder.commands.spawn(UnusedTarget).id();
313 self.builder.commands.queue(AddOperation::new(
314 Some(self.builder.scope()),
315 source,
316 OperateBufferAccess::<T, B::BufferType>::new(buffers, target),
317 ));
318
319 Chain::new(target, self.builder)
320 }
321
322 pub fn then_access<B>(self, buffers: B) -> Chain<'w, 's, 'a, 'b, BufferKeys<B>>
325 where
326 B: Bufferable,
327 B::BufferType: Accessing,
328 {
329 self.with_access(buffers).map_block(|(_, key)| key)
330 }
331
332 pub fn cancellation_filter<ThenResponse, F>(
341 self,
342 filter_provider: F,
343 ) -> Chain<'w, 's, 'a, 'b, ThenResponse>
344 where
345 ThenResponse: 'static + Send + Sync,
346 F: Provider<Request = T, Response = Option<ThenResponse>>,
347 F::Response: 'static + Send + Sync,
348 F::Streams: StreamPack,
349 {
350 self.then(filter_provider).cancel_on_none()
351 }
352
353 pub fn then_cancel(self)
361 where
362 T: ToString,
363 {
364 self.builder.commands.queue(AddOperation::new(
365 Some(self.scope()),
366 self.target,
367 OperateCancel::<T>::new(),
368 ));
369 }
370
371 pub fn disposal_filter<ThenResponse, F>(
375 self,
376 filter_provider: F,
377 ) -> Chain<'w, 's, 'a, 'b, ThenResponse>
378 where
379 ThenResponse: 'static + Send + Sync,
380 F: Provider<Request = T, Response = Option<ThenResponse>>,
381 F::Response: 'static + Send + Sync,
382 F::Streams: StreamPack,
383 {
384 self.then(filter_provider).dispose_on_none()
385 }
386
387 pub fn branch_clone(self, build: impl FnOnce(Chain<T>)) -> Chain<'w, 's, 'a, 'b, T>
396 where
397 T: Clone,
398 {
399 Chain::<T>::new(self.target, self.builder)
400 .fork_clone((|chain: Chain<T>| chain.output(), build))
401 .0
402 .chain(self.builder)
403 }
404
405 pub fn fork_clone<Build: ForkCloneBuilder<T>>(self, build: Build) -> Build::Outputs
416 where
417 T: Clone,
418 {
419 build.build_fork_clone(Output::new(self.scope(), self.target), self.builder)
420 }
421
422 pub fn unzip(self) -> T::Unzipped
430 where
431 T: Unzippable,
432 {
433 T::unzip_output(Output::<T>::new(self.scope(), self.target), self.builder)
434 }
435
436 pub fn fork_unzip<Build>(self, build: Build) -> Build::ReturnType
441 where
442 Build: UnzipBuilder<T>,
443 {
444 build.fork_unzip(Output::<T>::new(self.scope(), self.target), self.builder)
445 }
446
447 pub fn spread(self) -> Chain<'w, 's, 'a, 'b, T::Item>
457 where
458 T: IntoIterator,
459 T::Item: 'static + Send + Sync,
460 {
461 let source = self.target;
462 let target = self.builder.commands.spawn(UnusedTarget).id();
463 self.builder.commands.queue(AddOperation::new(
464 Some(self.builder.scope()),
465 source,
466 Spread::<T>::new(target),
467 ));
468
469 Chain::new(target, self.builder)
470 }
471
472 pub fn collect<const N: usize>(
488 self,
489 min: usize,
490 max: Option<usize>,
491 ) -> Chain<'w, 's, 'a, 'b, SmallVec<[T; N]>> {
492 if let Some(max) = max {
493 assert!(0 < max);
494 assert!(min <= max);
495 }
496
497 let source = self.target;
498 let target = self.builder.commands.spawn(UnusedTarget).id();
499 self.builder.commands.queue(AddOperation::new(
500 Some(self.builder.scope()),
501 source,
502 Collect::<T, N>::new(target, min, max),
503 ));
504
505 Chain::new(target, self.builder)
506 }
507
508 pub fn collect_all<const N: usize>(self) -> Chain<'w, 's, 'a, 'b, SmallVec<[T; N]>> {
510 self.collect::<N>(0, None)
511 }
512
513 pub fn collect_n<const N: usize>(self, n: usize) -> Chain<'w, 's, 'a, 'b, SmallVec<[T; N]>> {
515 self.collect::<N>(n, Some(n))
516 }
517
518 pub fn then_trim(
526 self,
527 branches: impl IntoIterator<Item = TrimBranch>,
528 ) -> Chain<'w, 's, 'a, 'b, T> {
529 let branches: SmallVec<[_; 16]> = branches.into_iter().collect();
530 for branch in &branches {
531 branch.verify_scope(self.builder.scope());
532 }
533
534 let source = self.target;
535 let target = self.builder.commands.spawn(UnusedTarget).id();
536 self.builder.commands.queue(AddOperation::new(
537 Some(self.builder.scope()),
538 source,
539 Trim::<T>::new(branches, target),
540 ));
541
542 Chain::new(target, self.builder)
543 }
544
545 pub fn then_trim_node(self, branches: impl IntoIterator<Item = TrimBranch>) -> Node<T, T> {
550 let source = self.target;
551 let scope = self.builder.scope();
552 let target = self.then_trim(branches).output().id();
553 Node {
554 input: InputSlot::new(scope, source),
555 output: Output::new(scope, target),
556 streams: (),
557 }
558 }
559
560 pub fn then_push(self, buffer: Buffer<T>) -> Chain<'w, 's, 'a, 'b, ()> {
566 assert_eq!(self.scope(), buffer.scope());
567 self.with_access(buffer)
568 .then(push_into_buffer.into_blocking_callback())
569 .cancel_on_err()
570 }
571
572 pub fn then_gate_action<B>(self, action: Gate, buffers: B) -> Chain<'w, 's, 'a, 'b, T>
575 where
576 B: Bufferable,
577 {
578 let buffers = buffers.into_buffer(self.builder);
579 buffers.verify_scope(self.builder.scope());
580
581 let source = self.target;
582 let target = self.builder.commands.spawn(UnusedTarget).id();
583 self.builder.commands.queue(AddOperation::new(
584 Some(self.builder.scope()),
585 source,
586 OperateStaticGate::<T, _>::new(buffers, target, action),
587 ));
588
589 Chain::new(target, self.builder)
590 }
591
592 pub fn then_gate_open<B>(self, buffers: B) -> Chain<'w, 's, 'a, 'b, T>
595 where
596 B: Bufferable,
597 {
598 self.then_gate_action(Gate::Open, buffers)
599 }
600
601 pub fn then_gate_close<B>(self, buffers: B) -> Chain<'w, 's, 'a, 'b, T>
604 where
605 B: Bufferable,
606 {
607 self.then_gate_action(Gate::Closed, buffers)
608 }
609
610 pub fn flatten(self) -> Chain<'w, 's, 'a, 'b, T::Output>
614 where
615 T: Future,
616 T::Output: 'static + Send + Sync,
617 {
618 self.map_async(|r| r)
619 }
620
621 pub fn split<U>(self, build: impl FnOnce(SplitBuilder<'w, 's, 'a, 'b, T>) -> U) -> U
626 where
627 T: Splittable,
628 {
629 let source = self.target;
630 self.builder.commands.queue(AddOperation::new(
631 Some(self.builder.scope()),
632 source,
633 OperateSplit::<T>::default(),
634 ));
635
636 build(SplitBuilder::new(source, self.builder))
637 }
638
639 pub fn split_outputs(self) -> SplitOutputs<T>
648 where
649 T: Splittable,
650 {
651 self.split(|b| b.outputs())
652 }
653
654 pub fn split_as_list<U>(
662 self,
663 build: impl FnOnce(SplitBuilder<'w, 's, 'a, 'b, SplitAsList<T>>) -> U,
664 ) -> U
665 where
666 T: IntoIterator,
667 T::Item: 'static + Send + Sync,
668 {
669 self.map_block(SplitAsList::new).split(build)
670 }
671
672 pub fn split_as_list_outputs(self) -> SplitOutputs<SplitAsList<T>>
682 where
683 T: IntoIterator,
684 T::Item: 'static + Send + Sync,
685 {
686 self.split_as_list(|b| b.outputs())
687 }
688
689 pub fn noop(self) -> Chain<'w, 's, 'a, 'b, T> {
698 let source = self.target;
699 let target = self.builder.commands.spawn(UnusedTarget).id();
700
701 self.builder.commands.queue(AddOperation::new(
702 Some(self.scope()),
703 source,
704 Noop::<T>::new(target),
705 ));
706 Chain::new(target, self.builder)
707 }
708
709 pub fn noop_node(self) -> Node<T, T> {
711 let source = self.target;
712 let scope = self.builder.scope();
713 let target = self.noop().output().id();
714 Node {
715 input: InputSlot::new(scope, source),
716 output: Output::new(scope, target),
717 streams: (),
718 }
719 }
720
721 pub fn scope(&self) -> Entity {
723 self.builder.scope()
724 }
725
726 pub fn target(&self) -> Entity {
728 self.target
729 }
730}
731
732impl<'w, 's, 'a, 'b, T, E> Chain<'w, 's, 'a, 'b, Result<T, E>>
733where
734 T: 'static + Send + Sync,
735 E: 'static + Send + Sync,
736{
737 pub fn branch_for_err(self, build_err: impl FnOnce(Chain<E>)) -> Chain<'w, 's, 'a, 'b, T> {
743 Chain::<Result<T, E>>::new(self.target, self.builder)
744 .fork_result(|chain| chain.output(), build_err)
745 .0
746 .chain(self.builder)
747 }
748
749 pub fn fork_result<U, V>(
757 self,
758 build_ok: impl FnOnce(Chain<T>) -> U,
759 build_err: impl FnOnce(Chain<E>) -> V,
760 ) -> (U, V) {
761 let source = self.target;
762 let target_ok = self.builder.commands.spawn(UnusedTarget).id();
763 let target_err = self.builder.commands.spawn(UnusedTarget).id();
764
765 self.builder.commands.queue(AddOperation::new(
766 Some(self.scope()),
767 source,
768 make_result_branching::<T, E>(ForkTargetStorage::from_iter([target_ok, target_err])),
769 ));
770
771 let u = build_ok(Chain::new(target_ok, self.builder));
772 let v = build_err(Chain::new(target_err, self.builder));
773 (u, v)
774 }
775
776 pub fn connect_on_err<U>(self, input: InputSlot<Result<U, E>>) -> Chain<'w, 's, 'a, 'b, T>
784 where
785 U: 'static + Send + Sync,
786 {
787 self.branch_for_err(move |chain: Chain<E>| chain.map_block(|err| Err(err)).connect(input))
788 }
789
790 pub fn cancel_on_err(self) -> Chain<'w, 's, 'a, 'b, T>
818 where
819 E: Error,
820 {
821 let source = self.target;
822 let target = self.builder.commands.spawn(UnusedTarget).id();
823
824 self.builder.commands.queue(AddOperation::new(
825 Some(self.scope()),
826 source,
827 CreateCancelFilter::on_err::<T, E>(target),
828 ));
829
830 Chain::new(target, self.builder)
831 }
832
833 pub fn cancel_on_quiet_err(self) -> Chain<'w, 's, 'a, 'b, T> {
838 let source = self.target;
839 let target = self.builder.commands.spawn(UnusedTarget).id();
840
841 self.builder.commands.queue(AddOperation::new(
842 Some(self.scope()),
843 source,
844 CreateCancelFilter::on_quiet_err::<T, E>(target),
845 ));
846
847 Chain::new(target, self.builder)
848 }
849
850 pub fn dispose_on_err(self) -> Chain<'w, 's, 'a, 'b, T>
857 where
858 E: Error,
859 {
860 let source = self.target;
861 let target = self.builder.commands.spawn(UnusedTarget).id();
862
863 self.builder.commands.queue(AddOperation::new(
864 Some(self.scope()),
865 source,
866 CreateDisposalFilter::on_err::<T, E>(target),
867 ));
868
869 Chain::new(target, self.builder)
870 }
871
872 pub fn dispose_on_quiet_err(self) -> Chain<'w, 's, 'a, 'b, T> {
877 let source = self.target;
878 let target = self.builder.commands.spawn(UnusedTarget).id();
879
880 self.builder.commands.queue(AddOperation::new(
881 Some(self.scope()),
882 source,
883 CreateDisposalFilter::on_quiet_err::<T, E>(target),
884 ));
885
886 Chain::new(target, self.builder)
887 }
888
889 pub fn dispose_on_ok(self) -> Chain<'w, 's, 'a, 'b, E> {
893 let source = self.target;
894 let target = self.builder.commands.spawn(UnusedTarget).id();
895
896 self.builder.commands.queue(AddOperation::new(
897 Some(self.scope()),
898 source,
899 CreateDisposalFilter::on_ok::<T, E>(target),
900 ));
901
902 Chain::new(target, self.builder)
903 }
904}
905
906impl<'w, 's, 'a, 'b, T> Chain<'w, 's, 'a, 'b, Option<T>>
907where
908 T: 'static + Send + Sync,
909{
910 pub fn branch_for_none(self, build_none: impl FnOnce(Chain<()>)) -> Chain<'w, 's, 'a, 'b, T> {
916 Chain::<Option<T>>::new(self.target, self.builder)
917 .fork_option(|chain| chain.output(), build_none)
918 .0
919 .chain(self.builder)
920 }
921
922 pub fn fork_option<U, V>(
930 self,
931 build_some: impl FnOnce(Chain<T>) -> U,
932 build_none: impl FnOnce(Chain<()>) -> V,
933 ) -> (U, V) {
934 let source = self.target;
935 let target_some = self.builder.commands.spawn(UnusedTarget).id();
936 let target_none = self.builder.commands.spawn(UnusedTarget).id();
937
938 self.builder.commands.queue(AddOperation::new(
939 Some(self.scope()),
940 source,
941 make_option_branching::<T>(ForkTargetStorage::from_iter([target_some, target_none])),
942 ));
943
944 let u = build_some(Chain::new(target_some, self.builder));
945 let v = build_none(Chain::new(target_none, self.builder));
946 (u, v)
947 }
948
949 pub fn connect_on_none<U>(self, input: InputSlot<Option<U>>) -> Chain<'w, 's, 'a, 'b, T>
957 where
958 U: 'static + Send + Sync,
959 {
960 self.branch_for_none(move |chain: Chain<()>| chain.map_block(|_| None).connect(input))
961 }
962
963 pub fn cancel_on_none(self) -> Chain<'w, 's, 'a, 'b, T> {
967 let source = self.target;
968 let target = self.builder.commands.spawn(UnusedTarget).id();
969
970 self.builder.commands.queue(AddOperation::new(
971 Some(self.scope()),
972 source,
973 CreateCancelFilter::on_none::<T>(target),
974 ));
975
976 Chain::new(target, self.builder)
977 }
978
979 pub fn dispose_on_none(self) -> Chain<'w, 's, 'a, 'b, T> {
986 let source = self.target;
987 let target = self.builder.commands.spawn(UnusedTarget).id();
988
989 self.builder.commands.queue(AddOperation::new(
990 Some(self.scope()),
991 source,
992 CreateDisposalFilter::on_none::<T>(target),
993 ));
994
995 Chain::new(target, self.builder)
996 }
997
998 pub fn dispose_on_some(self) -> Chain<'w, 's, 'a, 'b, ()> {
1003 let source = self.target;
1004 let target = self.builder.commands.spawn(UnusedTarget).id();
1005
1006 self.builder.commands.queue(AddOperation::new(
1007 Some(self.scope()),
1008 source,
1009 CreateDisposalFilter::on_some::<T>(target),
1010 ));
1011
1012 Chain::new(target, self.builder)
1013 }
1014}
1015
1016impl<'w, 's, 'a, 'b, K, V, T> Chain<'w, 's, 'a, 'b, T>
1017where
1018 K: 'static + Send + Sync + Eq + std::hash::Hash + Clone + std::fmt::Debug,
1019 V: 'static + Send + Sync,
1020 T: 'static + Send + Sync + IntoIterator<Item = (K, V)>,
1021{
1022 pub fn split_as_map<U>(
1031 self,
1032 build: impl FnOnce(SplitBuilder<'w, 's, 'a, 'b, SplitAsMap<K, V, T>>) -> U,
1033 ) -> U {
1034 self.map_block(SplitAsMap::new).split(build)
1035 }
1036
1037 pub fn split_as_map_outputs(self) -> SplitOutputs<SplitAsMap<K, V, T>> {
1047 self.split_as_map(|b| b.outputs())
1048 }
1049}
1050
1051impl<'w, 's, 'a, 'b, Request, Response, Streams>
1052 Chain<'w, 's, 'a, 'b, (Request, ServiceInstructions<Request, Response, Streams>)>
1053where
1054 Request: 'static + Send + Sync,
1055 Response: 'static + Send + Sync + Unpin,
1056 Streams: StreamPack,
1057{
1058 pub fn then_injection(self) -> Chain<'w, 's, 'a, 'b, Response> {
1069 let source = self.target;
1070 let node = self
1071 .builder
1072 .create_injection_impl::<Request, Response, Streams>(source);
1073 node.output.chain(self.builder)
1074 }
1075
1076 pub fn then_injection_node(
1085 self,
1086 ) -> Node<(Request, ServiceInstructions<Request, Response, Streams>), Response, Streams> {
1087 let source = self.target;
1088 self.builder
1089 .create_injection_impl::<Request, Response, Streams>(source)
1090 }
1091}
1092
1093impl<'w, 's, 'a, 'b, T> Chain<'w, 's, 'a, 'b, GateRequest<T>>
1094where
1095 T: 'static + Send + Sync,
1096{
1097 pub fn then_gate<B>(self, buffers: B) -> Chain<'w, 's, 'a, 'b, T>
1098 where
1099 B: Bufferable,
1100 B::BufferType: 'static + Send + Sync,
1101 {
1102 let buffers = buffers.into_buffer(self.builder);
1103 buffers.verify_scope(self.builder.scope());
1104
1105 let source = self.target;
1106 let target = self.builder.commands.spawn(UnusedTarget).id();
1107 self.builder.commands.queue(AddOperation::new(
1108 Some(self.builder.scope()),
1109 source,
1110 OperateDynamicGate::<T, _>::new(buffers, target),
1111 ));
1112
1113 Chain::new(target, self.builder)
1114 }
1115}
1116
1117impl<'w, 's, 'a, 'b, T> Chain<'w, 's, 'a, 'b, BufferKey<T>>
1118where
1119 T: 'static + Send + Sync,
1120{
1121 pub fn consume_buffer<const N: usize>(self) -> Chain<'w, 's, 'a, 'b, SmallVec<[T; N]>> {
1122 self.then(consume_buffer.into_blocking_callback())
1123 }
1124}
1125
1126impl<'w, 's, 'a, 'b, T: 'static + Send + Sync> Chain<'w, 's, 'a, 'b, T> {
1127 pub(crate) fn new(target: Entity, builder: &'b mut Builder<'w, 's, 'a>) -> Self {
1130 Self {
1131 target,
1132 builder,
1133 _ignore: Default::default(),
1134 }
1135 }
1136}
1137
1138impl<'w, 's, 'a, 'b, K, V> Chain<'w, 's, 'a, 'b, (K, V)>
1139where
1140 K: 'static + Send + Sync,
1141 V: 'static + Send + Sync,
1142{
1143 pub fn key(self) -> Chain<'w, 's, 'a, 'b, K> {
1146 self.map_block(|(key, _)| key)
1147 }
1148
1149 pub fn value(self) -> Chain<'w, 's, 'a, 'b, V> {
1152 self.map_block(|(_, value)| value)
1153 }
1154}
1155
1156impl<'w, 's, 'a, 'b> Chain<'w, 's, 'a, 'b, ()> {
1157 pub fn then_quiet_cancel(self) {
1162 self.builder.commands.queue(AddOperation::new(
1163 Some(self.scope()),
1164 self.target,
1165 OperateQuietCancel,
1166 ));
1167 }
1168}
1169
1170#[cfg(test)]
1171mod tests {
1172 use crate::{prelude::*, testing::*};
1173 use smallvec::SmallVec;
1174
1175 #[test]
1176 fn test_join() {
1177 let mut context = TestingContext::minimal_plugins();
1178
1179 let workflow = context.spawn_io_workflow(|scope, builder| {
1180 builder
1181 .chain(scope.start)
1182 .fork_unzip((
1184 |chain: Chain<f64>| {
1185 chain
1186 .map_block(|value| WaitRequest {
1188 duration: Duration::from_secs_f64(value / 100.0),
1189 value,
1190 })
1191 .map_async(wait)
1192 .output()
1194 },
1195 |chain: Chain<f64>| {
1196 chain
1197 .map_block(|value| 2.0 * value)
1199 .output()
1201 },
1202 ))
1203 .join(builder)
1204 .map_block(add)
1206 .connect(scope.terminate);
1208 });
1209
1210 let r = context.resolve_request((2.0, 2.0), workflow);
1211 assert_eq!(r, 6.0);
1212 }
1213
1214 #[test]
1215 fn test_race() {
1216 let mut context = TestingContext::minimal_plugins();
1217
1218 let workflow = context.spawn_io_workflow(|scope, builder| {
1219 builder
1220 .chain(scope.start)
1221 .map_block(add)
1223 .then_io_scope(|scope, builder| {
1225 builder
1226 .chain(scope.start)
1227 .fork_clone((
1229 |chain: Chain<f64>| {
1230 chain
1232 .map_block(|value| WaitRequest {
1233 duration: Duration::from_secs_f64(0.01 * value),
1234 value,
1235 })
1236 .map_async(wait)
1237 .connect(scope.terminate);
1239 },
1240 |chain: Chain<f64>| {
1241 chain
1243 .map_block(|a| (a, a))
1244 .map_block(add)
1246 .connect(scope.terminate);
1248 },
1249 ));
1250 })
1251 .map_block(|a| (a, a))
1254 .map_block(add)
1256 .connect(scope.terminate);
1258 });
1259
1260 let r = context.resolve_request((2.0, 2.0), workflow);
1261 assert_eq!(r, 16.0);
1262 }
1263
1264 #[test]
1265 fn test_unzip() {
1266 let mut context = TestingContext::minimal_plugins();
1267
1268 let workflow = context.spawn_io_workflow(|scope, builder| {
1269 builder
1270 .chain(scope.start)
1271 .map_block(add)
1272 .map_block(|v| (v, 2.0 * v))
1273 .then_io_scope(|scope, builder| {
1274 builder.chain(scope.start).fork_unzip((
1275 |chain: Chain<f64>| {
1276 chain
1277 .map_block(|v| (v, 10.0))
1278 .map_block(add)
1279 .connect(scope.terminate);
1280 },
1281 |chain: Chain<f64>| {
1282 chain
1283 .map_block(|value| WaitRequest {
1284 duration: std::time::Duration::from_secs_f64(0.01),
1285 value,
1286 })
1287 .map_async(wait)
1288 .connect(scope.terminate);
1289 },
1290 ));
1291 })
1292 .connect(scope.terminate);
1293 });
1294
1295 let r = context.resolve_request((2.0, 3.0), workflow);
1296 assert_eq!(r, 15.0);
1297 }
1298
1299 #[test]
1300 fn test_cancel_on_special_case() {
1301 let mut context = TestingContext::minimal_plugins();
1302
1303 let workflow = context.spawn_io_workflow(|scope, builder| {
1304 builder
1305 .chain(scope.start)
1306 .map_block(duplicate)
1307 .map_block(add)
1308 .map_block(produce_none)
1309 .cancel_on_none()
1310 .map_block(duplicate)
1311 .map_block(add)
1312 .connect(scope.terminate);
1313 });
1314
1315 let r = context.try_resolve_request(2.0, workflow, ());
1316 assert!(r.is_err());
1317
1318 let workflow = context.spawn_io_workflow(|scope, builder| {
1319 builder
1320 .chain(scope.start)
1321 .map_block(duplicate)
1322 .map_block(add)
1323 .map_block(produce_err)
1324 .cancel_on_quiet_err()
1325 .map_block(duplicate)
1326 .map_block(add)
1327 .connect(scope.terminate);
1328 });
1329
1330 let r = context.try_resolve_request(2.0, workflow, ());
1331 assert!(r.is_err());
1332 }
1333
1334 #[test]
1335 fn test_disposal() {
1336 let mut context = TestingContext::minimal_plugins();
1337
1338 let workflow = context.spawn_io_workflow(|scope, builder| {
1339 builder
1340 .chain(scope.start)
1341 .map_block(duplicate)
1342 .map_block(add)
1343 .map_block(produce_none)
1344 .dispose_on_none()
1345 .map_block(duplicate)
1346 .map_block(add)
1347 .connect(scope.terminate);
1348 });
1349
1350 let r = context.try_resolve_request(2.0, workflow, ());
1351 assert!(r.is_err());
1352
1353 let workflow = context.spawn_io_workflow(
1354 |scope: Scope<Result<f64, Result<f64, TestError>>, f64>, builder| {
1355 builder.chain(scope.start).fork_result(
1356 |chain| chain.connect(scope.terminate),
1357 |chain| chain.dispose_on_err().connect(scope.terminate),
1358 );
1359 },
1360 );
1361
1362 let r = context.resolve_request(Ok(1.0), workflow);
1363 assert_eq!(r, 1.0);
1364
1365 let r = context.resolve_request(Err(Ok(5.0)), workflow);
1366 assert_eq!(r, 5.0);
1367
1368 let r = context.try_resolve_request(Err(Err(TestError)), workflow, ());
1369 assert!(r.is_err());
1370 }
1371
1372 #[test]
1373 fn test_spread() {
1374 let mut context = TestingContext::minimal_plugins();
1375
1376 let workflow = context.spawn_io_workflow(|scope, builder| {
1377 let buffer = builder.create_buffer(BufferSettings::keep_all());
1378
1379 builder
1380 .chain(scope.start)
1381 .map_block(|value| {
1382 let mut duplicated_values: SmallVec<[i32; 16]> = SmallVec::new();
1383 for _ in 0..value {
1384 duplicated_values.push(value);
1385 }
1386 duplicated_values
1387 })
1388 .spread()
1389 .connect(buffer.input_slot());
1390
1391 buffer
1392 .listen(builder)
1393 .then(watch_for_quantity.into_blocking_callback())
1394 .dispose_on_none()
1395 .connect(scope.terminate);
1396 });
1397
1398 let r = context.try_resolve_request(7, workflow, 1).unwrap();
1399 assert_eq!(r.len(), 7);
1400 assert!(r.iter().all(|v| *v == 7));
1401 }
1402
1403 fn watch_for_quantity(
1408 In(key): In<BufferKey<i32>>,
1409 mut access: BufferAccessMut<i32>,
1410 ) -> Option<SmallVec<[i32; 16]>> {
1411 let mut buffer = access.get_mut(&key).unwrap();
1412 let expected_count = *buffer.newest()? as usize;
1413 if buffer.len() < expected_count {
1414 return None;
1415 }
1416
1417 Some(buffer.drain(..).collect())
1418 }
1419
1420 #[test]
1421 fn test_collect() {
1422 let mut context = TestingContext::minimal_plugins();
1423
1424 let workflow = context.spawn_io_workflow(|scope, builder| {
1425 let node =
1426 builder
1427 .chain(scope.start)
1428 .map_node(|input: BlockingMap<i32, StreamOf<i32>>| {
1429 for _ in 0..input.request {
1430 input.streams.send(input.request);
1431 }
1432 });
1433
1434 builder
1435 .chain(node.streams)
1436 .collect_all::<16>()
1437 .connect(scope.terminate);
1438 });
1439
1440 let r = context.try_resolve_request(8, workflow, 1).unwrap();
1441 assert_eq!(r.len(), 8);
1442 assert!(r.iter().all(|v| *v == 8));
1443
1444 let workflow = context.spawn_io_workflow(|scope, builder| {
1445 let node =
1446 builder
1447 .chain(scope.start)
1448 .map_node(|input: BlockingMap<i32, StreamOf<i32>>| {
1449 for _ in 0..input.request {
1450 input.streams.send(input.request);
1451 }
1452 });
1453
1454 builder
1455 .chain(node.streams)
1456 .collect::<16>(4, None)
1457 .connect(scope.terminate);
1458 });
1459
1460 check_collection(0, 4, workflow, &mut context);
1461 check_collection(2, 4, workflow, &mut context);
1462 check_collection(3, 4, workflow, &mut context);
1463 check_collection(4, 4, workflow, &mut context);
1464 check_collection(5, 4, workflow, &mut context);
1465 check_collection(8, 4, workflow, &mut context);
1466
1467 let workflow = context.spawn_io_workflow(|scope, builder| {
1468 let bogus_node = builder.create_map_block(|v: i32| v);
1469 builder
1470 .chain(bogus_node.output)
1471 .collect_all::<16>()
1472 .connect(scope.terminate);
1473
1474 builder
1475 .chain(scope.start)
1476 .map_block(|v: i32| Some(v))
1477 .fork_option(
1478 |chain: Chain<i32>| {
1479 chain
1480 .map_async(|v| async move { v })
1481 .collect_all::<16>()
1482 .connect(scope.terminate)
1483 },
1484 |chain: Chain<()>| chain.map_block(|()| unreachable!()).unused(),
1485 );
1486 });
1487
1488 let r = context
1489 .try_resolve_request(2, workflow, Duration::from_secs(30))
1490 .unwrap();
1491 assert_eq!(r.len(), 1);
1492 assert!(r.iter().all(|v| *v == 2));
1493
1494 let workflow = context.spawn_io_workflow(|scope, builder| {
1495 builder
1496 .chain(scope.start)
1497 .map_block(|v| if v < 4 { None } else { Some(v) })
1498 .dispose_on_none()
1499 .collect_all::<8>()
1500 .connect(scope.terminate);
1501 });
1502
1503 let r = context.resolve_request(2, workflow);
1504 assert!(r.is_empty());
1505
1506 let r = context.try_resolve_request(5, workflow, 1).unwrap();
1507 assert_eq!(r.len(), 1);
1508 assert!(r.iter().all(|v| *v == 5));
1509 }
1510
1511 fn check_collection(
1512 value: i32,
1513 min: i32,
1514 workflow: Service<i32, SmallVec<[i32; 16]>>,
1515 context: &mut TestingContext,
1516 ) {
1517 let r = context.try_resolve_request(value, workflow, 1);
1518 if value < min {
1519 assert!(r.is_err());
1520 } else {
1521 let r = r.unwrap();
1522 assert_eq!(r.len(), value as usize);
1523 assert!(r.iter().all(|v| *v == value));
1524 }
1525 }
1526
1527 #[test]
1528 fn test_unused_branch() {
1529 let mut context = TestingContext::minimal_plugins();
1530
1531 let workflow =
1532 context.spawn_io_workflow(|scope: Scope<Vec<Result<i64, ()>>, i64>, builder| {
1533 builder
1534 .chain(scope.start)
1535 .spread()
1536 .fork_result(|ok| ok.connect(scope.terminate), |err| err.unused());
1537 });
1538
1539 let test_set = vec![Err(()), Err(()), Ok(5), Err(()), Ok(10)];
1540
1541 let r = context.resolve_request(test_set, workflow);
1542 assert_eq!(r, 5);
1543 }
1544}