1use std::future::Future;
19
20use bevy_ecs::prelude::Entity;
21
22use smallvec::SmallVec;
23
24use std::error::Error;
25
26use crate::{
27 make_option_branching, make_result_branching, Accessing, AddOperation, AsMap, Buffer,
28 BufferKey, BufferKeys, Bufferable, Buffering, Builder, Collect, CreateCancelFilter,
29 CreateDisposalFilter, ForkTargetStorage, Gate, GateRequest, InputSlot, IntoAsyncMap,
30 IntoBlockingCallback, IntoBlockingMap, Node, Noop, OperateBufferAccess, OperateCancel,
31 OperateDynamicGate, OperateQuietCancel, OperateSplit, OperateStaticGate, Output, ProvideOnce,
32 Provider, Scope, ScopeSettings, Sendish, ServiceInstructions, Spread, StreamPack,
33 StreamTargetMap, Trim, TrimBranch, UnusedTarget,
34};
35
36pub mod fork_clone_builder;
37pub use fork_clone_builder::*;
38
39pub(crate) mod premade;
40use premade::*;
41
42pub mod split;
43pub use split::*;
44
45pub mod unzip;
46pub use unzip::*;
47
48#[must_use]
55pub struct Chain<'w, 's, 'a, 'b, T> {
56 target: Entity,
57 builder: &'b mut Builder<'w, 's, 'a>,
58 _ignore: std::marker::PhantomData<fn(T)>,
59}
60
61impl<'w, 's, 'a, 'b, T: 'static + Send + Sync> Chain<'w, 's, 'a, 'b, T> {
62 pub fn output(self) -> Output<T> {
69 Output::new(self.scope(), self.target)
70 }
71
72 pub fn unpack(self) -> (Output<T>, &'b mut Builder<'w, 's, 'a>) {
76 (Output::new(self.scope(), self.target), self.builder)
77 }
78
79 pub fn connect(self, input: InputSlot<T>) {
84 let output = Output::new(self.scope(), self.target);
85 self.builder.connect(output, input)
86 }
87
88 pub fn unused(self) {
91 }
93
94 pub fn then<P: Provider<Request = T>>(self, provider: P) -> Chain<'w, 's, 'a, 'b, P::Response>
98 where
99 P::Response: 'static + Send + Sync,
100 {
101 let source = self.target;
102 let target = self.builder.commands.spawn(UnusedTarget).id();
103 provider.connect(
104 Some(self.builder.scope()),
105 source,
106 target,
107 self.builder.commands,
108 );
109 Chain::new(target, self.builder)
110 }
111
112 pub fn then_node<P: Provider<Request = T>>(
115 self,
116 provider: P,
117 ) -> Node<T, P::Response, P::Streams>
118 where
119 P::Response: 'static + Send + Sync,
120 P::Streams: StreamPack,
121 {
122 let source = self.target;
123 let target = self.builder.commands.spawn(UnusedTarget).id();
124 provider.connect(Some(self.scope()), source, target, self.builder.commands);
125
126 let mut map = StreamTargetMap::default();
127 let streams =
128 <P::Streams as StreamPack>::spawn_node_streams(source, &mut map, self.builder);
129 self.builder.commands.entity(source).insert(map);
130 Node {
131 input: InputSlot::new(self.builder.scope(), source),
132 output: Output::new(self.builder.scope(), target),
133 streams,
134 }
135 }
136
137 pub fn map<M, F: AsMap<M>>(
140 self,
141 f: F,
142 ) -> Chain<'w, 's, 'a, 'b, <F::MapType as ProvideOnce>::Response>
143 where
144 F::MapType: Provider<Request = T>,
145 <F::MapType as ProvideOnce>::Response: 'static + Send + Sync,
146 {
147 self.then(f.as_map())
148 }
149
150 pub fn map_node<M, F: AsMap<M>>(
153 self,
154 f: F,
155 ) -> Node<T, <F::MapType as ProvideOnce>::Response, <F::MapType as ProvideOnce>::Streams>
156 where
157 F::MapType: Provider<Request = T>,
158 <F::MapType as ProvideOnce>::Response: 'static + Send + Sync,
159 <F::MapType as ProvideOnce>::Streams: StreamPack,
160 {
161 self.then_node(f.as_map())
162 }
163
164 pub fn map_block<U>(
171 self,
172 f: impl FnMut(T) -> U + 'static + Send + Sync,
173 ) -> Chain<'w, 's, 'a, 'b, U>
174 where
175 U: 'static + Send + Sync,
176 {
177 self.then(f.into_blocking_map())
178 }
179
180 pub fn map_block_node<U>(self, f: impl FnMut(T) -> U + 'static + Send + Sync) -> Node<T, U, ()>
183 where
184 U: 'static + Send + Sync,
185 {
186 self.then_node(f.into_blocking_map())
187 }
188
189 pub fn map_async<Task>(
194 self,
195 f: impl FnMut(T) -> Task + 'static + Send + Sync,
196 ) -> Chain<'w, 's, 'a, 'b, Task::Output>
197 where
198 Task: Future + 'static + Sendish,
199 Task::Output: 'static + Send + Sync,
200 {
201 self.then(f.into_async_map())
202 }
203
204 pub fn map_async_node<Task>(
207 self,
208 f: impl FnMut(T) -> Task + 'static + Send + Sync,
209 ) -> Node<T, Task::Output, ()>
210 where
211 Task: Future + 'static + Sendish,
212 Task::Output: 'static + Send + Sync,
213 {
214 self.then_node(f.into_async_map())
215 }
216
217 pub fn then_scope<Response, Streams, Settings>(
222 self,
223 build: impl FnOnce(Scope<T, Response, Streams>, &mut Builder) -> Settings,
224 ) -> Chain<'w, 's, 'a, 'b, Response>
225 where
226 Response: 'static + Send + Sync,
227 Streams: StreamPack,
228 Settings: Into<ScopeSettings>,
229 {
230 let exit_scope = self.builder.commands.spawn(UnusedTarget).id();
231 self.builder
232 .create_scope_impl::<T, Response, Streams, Settings>(self.target, exit_scope, build)
233 .output
234 .chain(self.builder)
235 }
236
237 pub fn then_io_scope<Response, Settings>(
243 self,
244 build: impl FnOnce(Scope<T, Response>, &mut Builder) -> Settings,
245 ) -> Chain<'w, 's, 'a, 'b, Response>
246 where
247 Response: 'static + Send + Sync,
248 Settings: Into<ScopeSettings>,
249 {
250 self.then_scope(build)
251 }
252
253 pub fn then_scope_node<Response, Streams, Settings>(
256 self,
257 build: impl FnOnce(Scope<T, Response, Streams>, &mut Builder) -> Settings,
258 ) -> Node<T, Response, Streams>
259 where
260 Response: 'static + Send + Sync,
261 Streams: StreamPack,
262 Settings: Into<ScopeSettings>,
263 {
264 let exit_scope = self.builder.commands.spawn(UnusedTarget).id();
265 self.builder
266 .create_scope_impl::<T, Response, Streams, Settings>(self.target, exit_scope, build)
267 }
268
269 pub fn then_io_scope_node<Response, Settings>(
275 self,
276 build: impl FnOnce(Scope<T, Response>, &mut Builder) -> Settings,
277 ) -> Node<T, Response, ()>
278 where
279 Response: 'static + Send + Sync,
280 Settings: Into<ScopeSettings>,
281 {
282 self.then_scope_node(build)
283 }
284
285 pub fn trigger(self) -> Chain<'w, 's, 'a, 'b, ()> {
291 self.map_block(|_| ())
292 }
293
294 pub fn with_access<B>(self, buffers: B) -> Chain<'w, 's, 'a, 'b, (T, BufferKeys<B>)>
304 where
305 B: Bufferable,
306 B::BufferType: Accessing,
307 {
308 let buffers = buffers.into_buffer(self.builder);
309 buffers.verify_scope(self.builder.scope());
310
311 let source = self.target;
312 let target = self.builder.commands.spawn(UnusedTarget).id();
313 self.builder.commands.queue(AddOperation::new(
314 Some(self.builder.scope()),
315 source,
316 OperateBufferAccess::<T, B::BufferType>::new(buffers, target),
317 ));
318
319 Chain::new(target, self.builder)
320 }
321
322 pub fn then_access<B>(self, buffers: B) -> Chain<'w, 's, 'a, 'b, BufferKeys<B>>
325 where
326 B: Bufferable,
327 B::BufferType: Accessing,
328 {
329 self.with_access(buffers).map_block(|(_, key)| key)
330 }
331
332 pub fn cancellation_filter<ThenResponse, F>(
341 self,
342 filter_provider: F,
343 ) -> Chain<'w, 's, 'a, 'b, ThenResponse>
344 where
345 ThenResponse: 'static + Send + Sync,
346 F: Provider<Request = T, Response = Option<ThenResponse>>,
347 F::Response: 'static + Send + Sync,
348 F::Streams: StreamPack,
349 {
350 self.then(filter_provider).cancel_on_none()
351 }
352
353 pub fn then_cancel(self)
361 where
362 T: ToString,
363 {
364 self.builder.commands.queue(AddOperation::new(
365 Some(self.scope()),
366 self.target,
367 OperateCancel::<T>::new(),
368 ));
369 }
370
371 pub fn disposal_filter<ThenResponse, F>(
375 self,
376 filter_provider: F,
377 ) -> Chain<'w, 's, 'a, 'b, ThenResponse>
378 where
379 ThenResponse: 'static + Send + Sync,
380 F: Provider<Request = T, Response = Option<ThenResponse>>,
381 F::Response: 'static + Send + Sync,
382 F::Streams: StreamPack,
383 {
384 self.then(filter_provider).dispose_on_none()
385 }
386
387 pub fn branch_clone(self, build: impl FnOnce(Chain<T>)) -> Chain<'w, 's, 'a, 'b, T>
396 where
397 T: Clone,
398 {
399 Chain::<T>::new(self.target, self.builder)
400 .fork_clone((|chain: Chain<T>| chain.output(), build))
401 .0
402 .chain(self.builder)
403 }
404
405 pub fn fork_clone<Build: ForkCloneBuilder<T>>(self, build: Build) -> Build::Outputs
416 where
417 T: Clone,
418 {
419 build.build_fork_clone(Output::new(self.scope(), self.target), self.builder)
420 }
421
422 pub fn unzip(self) -> T::Unzipped
430 where
431 T: Unzippable,
432 {
433 T::unzip_output(Output::<T>::new(self.scope(), self.target), self.builder)
434 }
435
436 pub fn fork_unzip<Build>(self, build: Build) -> Build::ReturnType
441 where
442 Build: UnzipBuilder<T>,
443 {
444 build.fork_unzip(Output::<T>::new(self.scope(), self.target), self.builder)
445 }
446
447 pub fn spread(self) -> Chain<'w, 's, 'a, 'b, T::Item>
457 where
458 T: IntoIterator,
459 T::Item: 'static + Send + Sync,
460 {
461 let source = self.target;
462 let target = self.builder.commands.spawn(UnusedTarget).id();
463 self.builder.commands.queue(AddOperation::new(
464 Some(self.builder.scope()),
465 source,
466 Spread::<T>::new(target),
467 ));
468
469 Chain::new(target, self.builder)
470 }
471
472 pub fn collect<const N: usize>(
488 self,
489 min: usize,
490 max: Option<usize>,
491 ) -> Chain<'w, 's, 'a, 'b, SmallVec<[T; N]>> {
492 if let Some(max) = max {
493 assert!(0 < max);
494 assert!(min <= max);
495 }
496
497 let source = self.target;
498 let target = self.builder.commands.spawn(UnusedTarget).id();
499 self.builder.commands.queue(AddOperation::new(
500 Some(self.builder.scope()),
501 source,
502 Collect::<T, N>::new(target, min, max),
503 ));
504
505 Chain::new(target, self.builder)
506 }
507
508 pub fn collect_all<const N: usize>(self) -> Chain<'w, 's, 'a, 'b, SmallVec<[T; N]>> {
510 self.collect::<N>(0, None)
511 }
512
513 pub fn collect_n<const N: usize>(self, n: usize) -> Chain<'w, 's, 'a, 'b, SmallVec<[T; N]>> {
515 self.collect::<N>(n, Some(n))
516 }
517
518 pub fn then_trim(
526 self,
527 branches: impl IntoIterator<Item = TrimBranch>,
528 ) -> Chain<'w, 's, 'a, 'b, T> {
529 let branches: SmallVec<[_; 16]> = branches.into_iter().collect();
530 for branch in &branches {
531 branch.verify_scope(self.builder.scope());
532 }
533
534 let source = self.target;
535 let target = self.builder.commands.spawn(UnusedTarget).id();
536 self.builder.commands.queue(AddOperation::new(
537 Some(self.builder.scope()),
538 source,
539 Trim::<T>::new(branches, target),
540 ));
541
542 Chain::new(target, self.builder)
543 }
544
545 pub fn then_trim_node(self, branches: impl IntoIterator<Item = TrimBranch>) -> Node<T, T> {
550 let source = self.target;
551 let scope = self.builder.scope();
552 let target = self.then_trim(branches).output().id();
553 Node {
554 input: InputSlot::new(scope, source),
555 output: Output::new(scope, target),
556 streams: (),
557 }
558 }
559
560 pub fn then_push(self, buffer: Buffer<T>) -> Chain<'w, 's, 'a, 'b, ()> {
566 assert_eq!(self.scope(), buffer.scope());
567 self.with_access(buffer)
568 .then(push_into_buffer.into_blocking_callback())
569 .cancel_on_err()
570 }
571
572 pub fn then_gate_action<B>(self, action: Gate, buffers: B) -> Chain<'w, 's, 'a, 'b, T>
575 where
576 B: Bufferable,
577 {
578 let buffers = buffers.into_buffer(self.builder);
579 buffers.verify_scope(self.builder.scope());
580
581 let source = self.target;
582 let target = self.builder.commands.spawn(UnusedTarget).id();
583 self.builder.commands.queue(AddOperation::new(
584 Some(self.builder.scope()),
585 source,
586 OperateStaticGate::<T, _>::new(buffers, target, action),
587 ));
588
589 Chain::new(target, self.builder)
590 }
591
592 pub fn then_gate_open<B>(self, buffers: B) -> Chain<'w, 's, 'a, 'b, T>
595 where
596 B: Bufferable,
597 {
598 self.then_gate_action(Gate::Open, buffers)
599 }
600
601 pub fn then_gate_close<B>(self, buffers: B) -> Chain<'w, 's, 'a, 'b, T>
604 where
605 B: Bufferable,
606 {
607 self.then_gate_action(Gate::Closed, buffers)
608 }
609
610 pub fn flatten(self) -> Chain<'w, 's, 'a, 'b, T::Output>
614 where
615 T: Future,
616 T::Output: 'static + Send + Sync,
617 {
618 self.map_async(|r| r)
619 }
620
621 pub fn split<U>(self, build: impl FnOnce(SplitBuilder<'w, 's, 'a, 'b, T>) -> U) -> U
626 where
627 T: Splittable,
628 {
629 let source = self.target;
630 self.builder.commands.queue(AddOperation::new(
631 Some(self.builder.scope()),
632 source,
633 OperateSplit::<T>::default(),
634 ));
635
636 build(SplitBuilder::new(source, self.builder))
637 }
638
639 pub fn split_outputs(self) -> SplitOutputs<T>
648 where
649 T: Splittable,
650 {
651 self.split(|b| b.outputs())
652 }
653
654 pub fn split_as_list<U>(
662 self,
663 build: impl FnOnce(SplitBuilder<'w, 's, 'a, 'b, SplitAsList<T>>) -> U,
664 ) -> U
665 where
666 T: IntoIterator,
667 T::Item: 'static + Send + Sync,
668 {
669 self.map_block(SplitAsList::new).split(build)
670 }
671
672 pub fn split_as_list_outputs(self) -> SplitOutputs<SplitAsList<T>>
682 where
683 T: IntoIterator,
684 T::Item: 'static + Send + Sync,
685 {
686 self.split_as_list(|b| b.outputs())
687 }
688
689 pub fn noop(self) -> Chain<'w, 's, 'a, 'b, T> {
698 let source = self.target;
699 let target = self.builder.commands.spawn(UnusedTarget).id();
700
701 self.builder.commands.queue(AddOperation::new(
702 Some(self.scope()),
703 source,
704 Noop::<T>::new(target),
705 ));
706 Chain::new(target, self.builder)
707 }
708
709 pub fn noop_node(self) -> Node<T, T> {
711 let source = self.target;
712 let scope = self.builder.scope();
713 let target = self.noop().output().id();
714 Node {
715 input: InputSlot::new(scope, source),
716 output: Output::new(scope, target),
717 streams: (),
718 }
719 }
720
721 pub fn scope(&self) -> Entity {
723 self.builder.scope()
724 }
725
726 pub fn target(&self) -> Entity {
728 self.target
729 }
730}
731
732impl<'w, 's, 'a, 'b, T, E> Chain<'w, 's, 'a, 'b, Result<T, E>>
733where
734 T: 'static + Send + Sync,
735 E: 'static + Send + Sync,
736{
737 pub fn branch_for_err(self, build_err: impl FnOnce(Chain<E>)) -> Chain<'w, 's, 'a, 'b, T> {
743 Chain::<Result<T, E>>::new(self.target, self.builder)
744 .fork_result(|chain| chain.output(), build_err)
745 .0
746 .chain(self.builder)
747 }
748
749 pub fn fork_result<U, V>(
757 self,
758 build_ok: impl FnOnce(Chain<T>) -> U,
759 build_err: impl FnOnce(Chain<E>) -> V,
760 ) -> (U, V) {
761 let source = self.target;
762 let target_ok = self.builder.commands.spawn(UnusedTarget).id();
763 let target_err = self.builder.commands.spawn(UnusedTarget).id();
764
765 self.builder.commands.queue(AddOperation::new(
766 Some(self.scope()),
767 source,
768 make_result_branching::<T, E>(ForkTargetStorage::from_iter([target_ok, target_err])),
769 ));
770
771 let u = build_ok(Chain::new(target_ok, self.builder));
772 let v = build_err(Chain::new(target_err, self.builder));
773 (u, v)
774 }
775
776 pub fn connect_on_err<U>(self, input: InputSlot<Result<U, E>>) -> Chain<'w, 's, 'a, 'b, T>
784 where
785 U: 'static + Send + Sync,
786 {
787 self.branch_for_err(move |chain: Chain<E>| chain.map_block(|err| Err(err)).connect(input))
788 }
789
790 pub fn cancel_on_err(self) -> Chain<'w, 's, 'a, 'b, T>
823 where
824 E: Error,
825 {
826 let source = self.target;
827 let target = self.builder.commands.spawn(UnusedTarget).id();
828
829 self.builder.commands.queue(AddOperation::new(
830 Some(self.scope()),
831 source,
832 CreateCancelFilter::on_err::<T, E>(target),
833 ));
834
835 Chain::new(target, self.builder)
836 }
837
838 pub fn cancel_on_quiet_err(self) -> Chain<'w, 's, 'a, 'b, T> {
843 let source = self.target;
844 let target = self.builder.commands.spawn(UnusedTarget).id();
845
846 self.builder.commands.queue(AddOperation::new(
847 Some(self.scope()),
848 source,
849 CreateCancelFilter::on_quiet_err::<T, E>(target),
850 ));
851
852 Chain::new(target, self.builder)
853 }
854
855 pub fn dispose_on_err(self) -> Chain<'w, 's, 'a, 'b, T>
862 where
863 E: Error,
864 {
865 let source = self.target;
866 let target = self.builder.commands.spawn(UnusedTarget).id();
867
868 self.builder.commands.queue(AddOperation::new(
869 Some(self.scope()),
870 source,
871 CreateDisposalFilter::on_err::<T, E>(target),
872 ));
873
874 Chain::new(target, self.builder)
875 }
876
877 pub fn dispose_on_quiet_err(self) -> Chain<'w, 's, 'a, 'b, T> {
882 let source = self.target;
883 let target = self.builder.commands.spawn(UnusedTarget).id();
884
885 self.builder.commands.queue(AddOperation::new(
886 Some(self.scope()),
887 source,
888 CreateDisposalFilter::on_quiet_err::<T, E>(target),
889 ));
890
891 Chain::new(target, self.builder)
892 }
893
894 pub fn dispose_on_ok(self) -> Chain<'w, 's, 'a, 'b, E> {
898 let source = self.target;
899 let target = self.builder.commands.spawn(UnusedTarget).id();
900
901 self.builder.commands.queue(AddOperation::new(
902 Some(self.scope()),
903 source,
904 CreateDisposalFilter::on_ok::<T, E>(target),
905 ));
906
907 Chain::new(target, self.builder)
908 }
909}
910
911impl<'w, 's, 'a, 'b, T> Chain<'w, 's, 'a, 'b, Option<T>>
912where
913 T: 'static + Send + Sync,
914{
915 pub fn branch_for_none(self, build_none: impl FnOnce(Chain<()>)) -> Chain<'w, 's, 'a, 'b, T> {
921 Chain::<Option<T>>::new(self.target, self.builder)
922 .fork_option(|chain| chain.output(), build_none)
923 .0
924 .chain(self.builder)
925 }
926
927 pub fn fork_option<U, V>(
935 self,
936 build_some: impl FnOnce(Chain<T>) -> U,
937 build_none: impl FnOnce(Chain<()>) -> V,
938 ) -> (U, V) {
939 let source = self.target;
940 let target_some = self.builder.commands.spawn(UnusedTarget).id();
941 let target_none = self.builder.commands.spawn(UnusedTarget).id();
942
943 self.builder.commands.queue(AddOperation::new(
944 Some(self.scope()),
945 source,
946 make_option_branching::<T>(ForkTargetStorage::from_iter([target_some, target_none])),
947 ));
948
949 let u = build_some(Chain::new(target_some, self.builder));
950 let v = build_none(Chain::new(target_none, self.builder));
951 (u, v)
952 }
953
954 pub fn connect_on_none<U>(self, input: InputSlot<Option<U>>) -> Chain<'w, 's, 'a, 'b, T>
962 where
963 U: 'static + Send + Sync,
964 {
965 self.branch_for_none(move |chain: Chain<()>| chain.map_block(|_| None).connect(input))
966 }
967
968 pub fn cancel_on_none(self) -> Chain<'w, 's, 'a, 'b, T> {
972 let source = self.target;
973 let target = self.builder.commands.spawn(UnusedTarget).id();
974
975 self.builder.commands.queue(AddOperation::new(
976 Some(self.scope()),
977 source,
978 CreateCancelFilter::on_none::<T>(target),
979 ));
980
981 Chain::new(target, self.builder)
982 }
983
984 pub fn dispose_on_none(self) -> Chain<'w, 's, 'a, 'b, T> {
991 let source = self.target;
992 let target = self.builder.commands.spawn(UnusedTarget).id();
993
994 self.builder.commands.queue(AddOperation::new(
995 Some(self.scope()),
996 source,
997 CreateDisposalFilter::on_none::<T>(target),
998 ));
999
1000 Chain::new(target, self.builder)
1001 }
1002
1003 pub fn dispose_on_some(self) -> Chain<'w, 's, 'a, 'b, ()> {
1008 let source = self.target;
1009 let target = self.builder.commands.spawn(UnusedTarget).id();
1010
1011 self.builder.commands.queue(AddOperation::new(
1012 Some(self.scope()),
1013 source,
1014 CreateDisposalFilter::on_some::<T>(target),
1015 ));
1016
1017 Chain::new(target, self.builder)
1018 }
1019}
1020
1021impl<'w, 's, 'a, 'b, K, V, T> Chain<'w, 's, 'a, 'b, T>
1022where
1023 K: 'static + Send + Sync + Eq + std::hash::Hash + Clone + std::fmt::Debug,
1024 V: 'static + Send + Sync,
1025 T: 'static + Send + Sync + IntoIterator<Item = (K, V)>,
1026{
1027 pub fn split_as_map<U>(
1036 self,
1037 build: impl FnOnce(SplitBuilder<'w, 's, 'a, 'b, SplitAsMap<K, V, T>>) -> U,
1038 ) -> U {
1039 self.map_block(SplitAsMap::new).split(build)
1040 }
1041
1042 pub fn split_as_map_outputs(self) -> SplitOutputs<SplitAsMap<K, V, T>> {
1052 self.split_as_map(|b| b.outputs())
1053 }
1054}
1055
1056impl<'w, 's, 'a, 'b, Request, Response, Streams>
1057 Chain<'w, 's, 'a, 'b, (Request, ServiceInstructions<Request, Response, Streams>)>
1058where
1059 Request: 'static + Send + Sync,
1060 Response: 'static + Send + Sync + Unpin,
1061 Streams: StreamPack,
1062{
1063 pub fn then_injection(self) -> Chain<'w, 's, 'a, 'b, Response> {
1074 let source = self.target;
1075 let node = self
1076 .builder
1077 .create_injection_impl::<Request, Response, Streams>(source);
1078 node.output.chain(self.builder)
1079 }
1080
1081 pub fn then_injection_node(
1090 self,
1091 ) -> Node<(Request, ServiceInstructions<Request, Response, Streams>), Response, Streams> {
1092 let source = self.target;
1093 self.builder
1094 .create_injection_impl::<Request, Response, Streams>(source)
1095 }
1096}
1097
1098impl<'w, 's, 'a, 'b, T> Chain<'w, 's, 'a, 'b, GateRequest<T>>
1099where
1100 T: 'static + Send + Sync,
1101{
1102 pub fn then_gate<B>(self, buffers: B) -> Chain<'w, 's, 'a, 'b, T>
1103 where
1104 B: Bufferable,
1105 B::BufferType: 'static + Send + Sync,
1106 {
1107 let buffers = buffers.into_buffer(self.builder);
1108 buffers.verify_scope(self.builder.scope());
1109
1110 let source = self.target;
1111 let target = self.builder.commands.spawn(UnusedTarget).id();
1112 self.builder.commands.queue(AddOperation::new(
1113 Some(self.builder.scope()),
1114 source,
1115 OperateDynamicGate::<T, _>::new(buffers, target),
1116 ));
1117
1118 Chain::new(target, self.builder)
1119 }
1120}
1121
1122impl<'w, 's, 'a, 'b, T> Chain<'w, 's, 'a, 'b, BufferKey<T>>
1123where
1124 T: 'static + Send + Sync,
1125{
1126 pub fn consume_buffer<const N: usize>(self) -> Chain<'w, 's, 'a, 'b, SmallVec<[T; N]>> {
1127 self.then(consume_buffer.into_blocking_callback())
1128 }
1129}
1130
1131impl<'w, 's, 'a, 'b, T: 'static + Send + Sync> Chain<'w, 's, 'a, 'b, T> {
1132 pub(crate) fn new(target: Entity, builder: &'b mut Builder<'w, 's, 'a>) -> Self {
1135 Self {
1136 target,
1137 builder,
1138 _ignore: Default::default(),
1139 }
1140 }
1141}
1142
1143impl<'w, 's, 'a, 'b, K, V> Chain<'w, 's, 'a, 'b, (K, V)>
1144where
1145 K: 'static + Send + Sync,
1146 V: 'static + Send + Sync,
1147{
1148 pub fn key(self) -> Chain<'w, 's, 'a, 'b, K> {
1151 self.map_block(|(key, _)| key)
1152 }
1153
1154 pub fn value(self) -> Chain<'w, 's, 'a, 'b, V> {
1157 self.map_block(|(_, value)| value)
1158 }
1159}
1160
1161impl<'w, 's, 'a, 'b> Chain<'w, 's, 'a, 'b, ()> {
1162 pub fn then_quiet_cancel(self) {
1167 self.builder.commands.queue(AddOperation::new(
1168 Some(self.scope()),
1169 self.target,
1170 OperateQuietCancel,
1171 ));
1172 }
1173}
1174
#[cfg(test)]
mod tests {
    use crate::{prelude::*, testing::*};
    use smallvec::SmallVec;

    /// Unzips a pair, processes each half on its own branch, joins the
    /// branches again, and adds the two results.
    #[test]
    fn test_join() {
        let mut context = TestingContext::minimal_plugins();

        let workflow = context.spawn_io_workflow(|scope, builder| {
            scope
                .input
                .chain(builder)
                .fork_unzip((
                    // First element: wait for a duration derived from the
                    // value, then pass the value along.
                    |first: Chain<f64>| {
                        first
                            .map_block(|value| WaitRequest {
                                duration: Duration::from_secs_f64(value / 100.0),
                                value,
                            })
                            .map_async(wait)
                            .output()
                    },
                    // Second element: double it.
                    |second: Chain<f64>| second.map_block(|value| 2.0 * value).output(),
                ))
                .join(builder)
                .map_block(add)
                .connect(scope.terminate);
        });

        let mut promise =
            context.command(|commands| commands.request((2.0, 2.0), workflow).take_response());

        context.run_with_conditions(&mut promise, Duration::from_secs(2));
        assert_eq!(promise.take().available(), Some(6.0));
        assert!(context.no_unhandled_errors());
    }

    /// Runs two cloned branches inside a nested scope that both connect to
    /// the scope's terminate slot, then post-processes the scope's response.
    #[test]
    fn test_race() {
        let mut context = TestingContext::minimal_plugins();

        let workflow = context.spawn_io_workflow(|scope, builder| {
            scope
                .input
                .chain(builder)
                .map_block(add)
                .then_io_scope(|scope, builder| {
                    scope.input.chain(builder).fork_clone((
                        // Branch that waits before responding.
                        |slow: Chain<f64>| {
                            slow.map_block(|value| WaitRequest {
                                duration: Duration::from_secs_f64(0.01 * value),
                                value,
                            })
                            .map_async(wait)
                            .connect(scope.terminate);
                        },
                        // Branch that doubles the value without waiting.
                        |fast: Chain<f64>| {
                            fast.map_block(|a| (a, a))
                                .map_block(add)
                                .connect(scope.terminate);
                        },
                    ));
                })
                .map_block(|a| (a, a))
                .map_block(add)
                .connect(scope.terminate);
        });

        let mut promise =
            context.command(|commands| commands.request((2.0, 2.0), workflow).take_response());

        context.run_with_conditions(&mut promise, Duration::from_secs(2));

        assert_eq!(promise.take().available(), Some(16.0));
        assert!(context.no_unhandled_errors());
    }

    /// Unzips a pair inside a nested scope where both branches connect to the
    /// scope's terminate slot.
    #[test]
    fn test_unzip() {
        let mut context = TestingContext::minimal_plugins();

        let workflow = context.spawn_io_workflow(|scope, builder| {
            scope
                .input
                .chain(builder)
                .map_block(add)
                .map_block(|v| (v, 2.0 * v))
                .then_io_scope(|scope, builder| {
                    scope.input.chain(builder).fork_unzip((
                        // Adds ten to the first element without waiting.
                        |first: Chain<f64>| {
                            first
                                .map_block(|v| (v, 10.0))
                                .map_block(add)
                                .connect(scope.terminate);
                        },
                        // Passes the second element along after a short wait.
                        |second: Chain<f64>| {
                            second
                                .map_block(|value| WaitRequest {
                                    duration: std::time::Duration::from_secs_f64(0.01),
                                    value,
                                })
                                .map_async(wait)
                                .connect(scope.terminate);
                        },
                    ));
                })
                .connect(scope.terminate);
        });

        let mut promise =
            context.command(|commands| commands.request((2.0, 3.0), workflow).take_response());

        context.run_while_pending(&mut promise);
        assert_eq!(promise.take().available(), Some(15.0));
        assert!(context.no_unhandled_errors());
    }

    /// Checks that `cancel_on_none` and `cancel_on_quiet_err` leave the
    /// promise cancelled.
    #[test]
    fn test_cancel_on_special_case() {
        let mut context = TestingContext::minimal_plugins();

        // Case 1: a `None` in the middle of the chain.
        let none_workflow = context.spawn_io_workflow(|scope, builder| {
            scope
                .input
                .chain(builder)
                .map_block(duplicate)
                .map_block(add)
                .map_block(produce_none)
                .cancel_on_none()
                .map_block(duplicate)
                .map_block(add)
                .connect(scope.terminate);
        });

        let mut promise =
            context.command(|commands| commands.request(2.0, none_workflow).take_response());

        context.run_with_conditions(&mut promise, Duration::from_secs(2));
        assert!(promise.peek().is_cancelled());
        assert!(context.no_unhandled_errors());

        // Case 2: an `Err` in the middle of the chain.
        let err_workflow = context.spawn_io_workflow(|scope, builder| {
            scope
                .input
                .chain(builder)
                .map_block(duplicate)
                .map_block(add)
                .map_block(produce_err)
                .cancel_on_quiet_err()
                .map_block(duplicate)
                .map_block(add)
                .connect(scope.terminate);
        });

        let mut promise =
            context.command(|commands| commands.request(2.0, err_workflow).take_response());

        context.run_with_conditions(&mut promise, Duration::from_secs(2));
        assert!(promise.peek().is_cancelled());
        assert!(context.no_unhandled_errors());
    }

    /// Exercises `dispose_on_none` and `dispose_on_err`, including a nested
    /// `Result` whose inner error gets disposed.
    #[test]
    fn test_disposal() {
        let mut context = TestingContext::minimal_plugins();

        let none_workflow = context.spawn_io_workflow(|scope, builder| {
            scope
                .input
                .chain(builder)
                .map_block(duplicate)
                .map_block(add)
                .map_block(produce_none)
                .dispose_on_none()
                .map_block(duplicate)
                .map_block(add)
                .connect(scope.terminate);
        });

        let mut promise =
            context.command(|commands| commands.request(2.0, none_workflow).take_response());

        context.run_with_conditions(&mut promise, Duration::from_secs(2));
        assert!(promise.peek().is_cancelled());
        assert!(context.no_unhandled_errors());

        let nested_workflow = context.spawn_io_workflow(
            |scope: Scope<Result<f64, Result<f64, TestError>>, f64>, builder| {
                scope.input.chain(builder).fork_result(
                    |ok| ok.connect(scope.terminate),
                    |err| err.dispose_on_err().connect(scope.terminate),
                );
            },
        );

        // An outer `Ok` goes straight to the terminate slot.
        let mut promise = context
            .command(|commands| commands.request(Ok(1.0), nested_workflow).take_response());

        context.run_with_conditions(&mut promise, Duration::from_secs(2));
        assert_eq!(promise.take().available(), Some(1.0));
        assert!(context.no_unhandled_errors());

        // An outer `Err` holding an inner `Ok` passes the disposal filter.
        let mut promise = context.command(|commands| {
            commands
                .request(Err(Ok(5.0)), nested_workflow)
                .take_response()
        });

        context.run_with_conditions(&mut promise, Duration::from_secs(2));
        assert_eq!(promise.take().available(), Some(5.0));
        assert!(context.no_unhandled_errors());

        // An outer `Err` holding an inner `Err` never reaches terminate.
        let mut promise = context.command(|commands| {
            commands
                .request(Err(Err(TestError)), nested_workflow)
                .take_response()
        });

        context.run_with_conditions(&mut promise, Duration::from_secs(2));
        assert!(promise.peek().is_cancelled());
        assert!(context.no_unhandled_errors());
    }

    /// Spreads a collection of `n` copies of `n` into a buffer and waits for
    /// the buffer to hold all of them.
    #[test]
    fn test_spread() {
        let mut context = TestingContext::minimal_plugins();

        let workflow = context.spawn_io_workflow(|scope, builder| {
            let buffer = builder.create_buffer(BufferSettings::keep_all());

            scope
                .input
                .chain(builder)
                .map_block(|value: i32| -> SmallVec<[i32; 16]> {
                    (0..value).map(|_| value).collect()
                })
                .spread()
                .connect(buffer.input_slot());

            buffer
                .listen(builder)
                .then(watch_for_quantity.into_blocking_callback())
                .dispose_on_none()
                .connect(scope.terminate);
        });

        let mut promise = context.command(|commands| commands.request(7, workflow).take_response());

        context.run_with_conditions(&mut promise, 1);
        assert!(promise
            .take()
            .available()
            .is_some_and(|v| v.len() == 7 && v.iter().all(|a| *a == 7)));
        assert!(context.no_unhandled_errors());
    }

    /// Buffer callback used by `test_spread`.
    ///
    /// Returns `None` while the buffer is empty or holds fewer entries than
    /// the value of its newest entry; otherwise drains and returns the whole
    /// buffer.
    fn watch_for_quantity(
        In(key): In<BufferKey<i32>>,
        mut access: BufferAccessMut<i32>,
    ) -> Option<SmallVec<[i32; 16]>> {
        let mut buffer = access.get_mut(&key).unwrap();
        let expected_count = *buffer.newest()? as usize;
        if buffer.len() >= expected_count {
            Some(buffer.drain(..).collect())
        } else {
            None
        }
    }

    /// Exercises `collect_all` and `collect` against streams, against an
    /// unreachable upstream node, and downstream of a disposal filter.
    #[test]
    fn test_collect() {
        let mut context = TestingContext::minimal_plugins();

        // Collect everything that a node streams out.
        let workflow = context.spawn_io_workflow(|scope, builder| {
            let node = scope.input.chain(builder).map_node(
                |input: BlockingMap<i32, StreamOf<i32>>| {
                    let count = input.request;
                    for _ in 0..count {
                        input.streams.send(count);
                    }
                },
            );

            node.streams
                .chain(builder)
                .collect_all::<16>()
                .connect(scope.terminate);
        });

        let mut promise = context.command(|commands| commands.request(8, workflow).take_response());

        context.run_with_conditions(&mut promise, 1);
        assert!(promise
            .take()
            .available()
            .is_some_and(|v| v.len() == 8 && v.iter().all(|a| *a == 8)));
        assert!(context.no_unhandled_errors());

        // Collect with a minimum of four entries and no maximum.
        let workflow = context.spawn_io_workflow(|scope, builder| {
            let node = scope.input.chain(builder).map_node(
                |input: BlockingMap<i32, StreamOf<i32>>| {
                    let count = input.request;
                    for _ in 0..count {
                        input.streams.send(count);
                    }
                },
            );

            node.streams
                .chain(builder)
                .collect::<16>(4, None)
                .connect(scope.terminate);
        });

        for value in [0, 2, 3, 4, 5, 8] {
            check_collection(value, 4, workflow, &mut context);
        }

        // A collect fed by a node that nothing connects to, next to a
        // collect that sits downstream of an async map.
        let workflow = context.spawn_io_workflow(|scope, builder| {
            let bogus_node = builder.create_map_block(|v: i32| v);
            bogus_node
                .output
                .chain(builder)
                .collect_all::<16>()
                .connect(scope.terminate);

            scope
                .input
                .chain(builder)
                .map_block(|v: i32| Some(v))
                .fork_option(
                    |some: Chain<i32>| {
                        some.map_async(|v| async move { v })
                            .collect_all::<16>()
                            .connect(scope.terminate)
                    },
                    |none: Chain<()>| none.map_block(|()| unreachable!()).unused(),
                );
        });

        let mut promise = context.command(|commands| commands.request(2, workflow).take_response());

        context.run_with_conditions(&mut promise, Duration::from_secs(2));
        assert!(promise
            .take()
            .available()
            .is_some_and(|v| v.len() == 1 && v.iter().all(|a| *a == 2)));
        assert!(context.no_unhandled_errors());

        // A collect downstream of a disposal filter.
        let workflow = context.spawn_io_workflow(|scope, builder| {
            scope
                .input
                .chain(builder)
                .map_block(|v| if v < 4 { None } else { Some(v) })
                .dispose_on_none()
                .collect_all::<8>()
                .connect(scope.terminate);
        });

        let mut promise = context.command(|commands| commands.request(2, workflow).take_response());

        context.run_with_conditions(&mut promise, 1);
        assert!(promise.take().available().is_some_and(|v| v.is_empty()));
        assert!(context.no_unhandled_errors());

        let mut promise = context.command(|commands| commands.request(5, workflow).take_response());

        context.run_with_conditions(&mut promise, 1);
        assert!(promise
            .take()
            .available()
            .is_some_and(|v| v.len() == 1 && v.iter().all(|a| *a == 5)));
        assert!(context.no_unhandled_errors());
    }

    /// Sends `value` to `workflow` and checks the collected response.
    ///
    /// Expects cancellation when `value` is below `min`; otherwise expects
    /// `value` entries that are all equal to `value`.
    fn check_collection(
        value: i32,
        min: i32,
        workflow: Service<i32, SmallVec<[i32; 16]>>,
        context: &mut TestingContext,
    ) {
        let mut promise =
            context.command(|commands| commands.request(value, workflow).take_response());

        context.run_with_conditions(&mut promise, 1);
        if value >= min {
            assert!(promise
                .take()
                .available()
                .is_some_and(|v| v.len() == value as usize && v.iter().all(|a| *a == value)));
        } else {
            assert!(promise.take().is_cancelled());
        }
        assert!(context.no_unhandled_errors());
    }

    /// Marks one fork branch as unused and checks that the workflow still
    /// produces the first `Ok` value.
    #[test]
    fn test_unused_branch() {
        let mut context = TestingContext::minimal_plugins();

        let workflow =
            context.spawn_io_workflow(|scope: Scope<Vec<Result<i64, ()>>, i64>, builder| {
                scope
                    .input
                    .chain(builder)
                    .spread()
                    .fork_result(|ok| ok.connect(scope.terminate), |err| err.unused());
            });

        let test_set = vec![Err(()), Err(()), Ok(5), Err(()), Ok(10)];
        let mut promise =
            context.command(|commands| commands.request(test_set, workflow).take_response());

        context.run_with_conditions(&mut promise, Duration::from_secs(2));
        assert!(context.no_unhandled_errors());
        assert_eq!(promise.take().available().unwrap(), 5);
    }
}