1use bevy_ecs::prelude::{Commands, Entity};
19
20use std::future::Future;
21
22use smallvec::SmallVec;
23
24use crate::{
25 Accessible, Accessing, Accessor, AddOperation, AsMap, Buffer, BufferKeys, BufferLocation,
26 BufferMap, BufferSettings, Bufferable, Buffering, Chain, Collect, ForkClone, ForkCloneOutput,
27 ForkOptionOutput, ForkResultOutput, ForkTargetStorage, Gate, GateRequest, IncompatibleLayout,
28 Injection, InputSlot, IntoAsyncMap, IntoBlockingMap, Joinable, Joined, Node, OperateBuffer,
29 OperateCancel, OperateDynamicGate, OperateQuietCancel, OperateScope, OperateSplit,
30 OperateStaticGate, Output, Provider, RequestOfMap, ResponseOfMap, Scope, ScopeEndpoints,
31 ScopeSettings, ScopeSettingsStorage, Sendish, ServiceInstructions, SplitOutputs, Splittable,
32 StreamPack, StreamTargetMap, StreamsOfMap, Trim, TrimBranch, UnusedTarget, Unzippable,
33 make_option_branching, make_result_branching,
34};
35
36pub(crate) mod connect;
37pub(crate) use connect::*;
38
/// Handle used while constructing the contents of a workflow scope. It pairs
/// the context of the scope being built with the [`Commands`] through which
/// entities are spawned and operations are queued.
pub struct Builder<'w, 's, 'a> {
    /// Identifies the scope that this builder adds operations to.
    pub(crate) context: BuilderScopeContext,
    /// Command queue used to spawn entities and queue operations.
    pub(crate) commands: &'a mut Commands<'w, 's>,
}
51
/// Copyable description of the scope that a [`Builder`] is working inside.
#[derive(Clone, Copy, Debug)]
pub struct BuilderScopeContext {
    /// Entity of the scope whose contents are being built.
    pub(crate) scope: Entity,
    /// The `finish_scope_cancel` endpoint reported when the scope was added.
    // NOTE(review): presumably the operation that finalizes cancellation of
    // the scope; confirm against `OperateScope`.
    pub(crate) finish_scope_cancel: Entity,
}
59
60impl<'w, 's, 'a> Builder<'w, 's, 'a> {
/// Begin a [`Chain`] that continues from `output`.
///
/// This forwards to `output.chain(self)`; the returned chain borrows this
/// builder for the lifetime `'b`.
pub fn chain<'b, Response: 'static + Send + Sync>(
    &'b mut self,
    output: Output<Response>,
) -> Chain<'w, 's, 'a, 'b, Response> {
    output.chain(self)
}
68
69 pub fn create_node<P: Provider>(
73 &mut self,
74 provider: P,
75 ) -> Node<P::Request, P::Response, P::Streams>
76 where
77 P::Request: 'static + Send + Sync,
78 P::Response: 'static + Send + Sync,
79 P::Streams: StreamPack,
80 {
81 let source = self.commands.spawn(()).id();
82 let target = self.commands.spawn(UnusedTarget).id();
83 provider.connect(Some(self.scope()), source, target, self.commands);
84
85 let mut map = StreamTargetMap::default();
86 let streams = <P::Streams as StreamPack>::spawn_node_streams(source, &mut map, self);
87 self.commands.entity(source).insert(map);
88 Node {
89 input: InputSlot::new(self.scope(), source),
90 output: Output::new(self.scope(), target),
91 streams,
92 }
93 }
94
95 pub fn create_map_block<T, U>(
97 &mut self,
98 f: impl FnMut(T) -> U + 'static + Send + Sync,
99 ) -> Node<T, U, ()>
100 where
101 T: 'static + Send + Sync,
102 U: 'static + Send + Sync,
103 {
104 self.create_node(f.into_blocking_map())
105 }
106
107 pub fn create_map_async<T, Task>(
109 &mut self,
110 f: impl FnMut(T) -> Task + 'static + Send + Sync,
111 ) -> Node<T, Task::Output, ()>
112 where
113 T: 'static + Send + Sync,
114 Task: Future + 'static + Sendish,
115 Task::Output: 'static + Send + Sync,
116 {
117 self.create_node(f.into_async_map())
118 }
119
120 pub fn create_map<M, F: AsMap<M>>(
127 &mut self,
128 f: F,
129 ) -> Node<RequestOfMap<M, F>, ResponseOfMap<M, F>, StreamsOfMap<M, F>>
130 where
131 F::MapType: Provider,
132 RequestOfMap<M, F>: 'static + Send + Sync,
133 ResponseOfMap<M, F>: 'static + Send + Sync,
134 StreamsOfMap<M, F>: StreamPack,
135 {
136 self.create_node(f.as_map())
137 }
138
/// Create an injection node: a node whose input is a `(Request,
/// ServiceInstructions)` pair and whose output is the `Response`.
///
/// A fresh source entity is spawned and the rest of the work is delegated to
/// `create_injection_impl`.
///
/// Note that this public entry point additionally requires `Response: Unpin`,
/// which the internal implementation does not.
pub fn create_injection_node<Request, Response, Streams>(
    &mut self,
) -> Node<(Request, ServiceInstructions<Request, Response, Streams>), Response, Streams>
where
    Request: 'static + Send + Sync,
    Response: 'static + Send + Sync + Unpin,
    Streams: StreamPack,
{
    let source = self.commands.spawn(()).id();
    self.create_injection_impl::<Request, Response, Streams>(source)
}
156
157 pub fn connect<T: 'static + Send + Sync>(&mut self, output: Output<T>, input: InputSlot<T>) {
159 assert_eq!(output.scope(), input.scope());
160 self.commands.queue(Connect {
161 original_target: output.id(),
162 new_target: input.id(),
163 });
164 }
165
166 pub fn create_buffer<T: 'static + Send + Sync>(
170 &mut self,
171 settings: BufferSettings,
172 ) -> Buffer<T> {
173 let source = self.commands.spawn(()).id();
174 self.commands.queue(AddOperation::new(
175 Some(self.scope()),
176 source,
177 OperateBuffer::<T>::new(settings),
178 ));
179
180 Buffer {
181 location: BufferLocation {
182 scope: self.scope(),
183 source,
184 },
185 _ignore: Default::default(),
186 }
187 }
188
/// Create a nested scope inside the current scope and expose it as a
/// [`Node`].
///
/// `build` receives the [`Scope`] handles of the new scope and a builder for
/// its contents, and returns the settings for the scope.
///
/// Two entities are spawned here: one for the scope itself and one, marked
/// as [`UnusedTarget`], for its exit. The rest is done by
/// `create_scope_impl`.
pub fn create_scope<Request, Response, Streams, Settings>(
    &mut self,
    build: impl FnOnce(Scope<Request, Response, Streams>, &mut Builder) -> Settings,
) -> Node<Request, Response, Streams>
where
    Request: 'static + Send + Sync,
    Response: 'static + Send + Sync,
    Streams: StreamPack,
    Settings: Into<ScopeSettings>,
{
    let scope_id = self.commands.spawn(()).id();
    let exit_scope = self.commands.spawn(UnusedTarget).id();
    self.create_scope_impl(scope_id, exit_scope, build)
}
214
/// Create a nested scope that has no output streams.
///
/// Equivalent to [`Self::create_scope`] with `Streams` fixed to `()`.
pub fn create_io_scope<Request, Response, Settings>(
    &mut self,
    build: impl FnOnce(Scope<Request, Response, ()>, &mut Builder) -> Settings,
) -> Node<Request, Response, ()>
where
    Request: 'static + Send + Sync,
    Response: 'static + Send + Sync,
    Settings: Into<ScopeSettings>,
{
    self.create_scope::<Request, Response, (), Settings>(build)
}
229
230 pub fn create_fork_clone<T>(&mut self) -> (InputSlot<T>, ForkCloneOutput<T>)
233 where
234 T: Clone + 'static + Send + Sync,
235 {
236 let source = self.commands.spawn(()).id();
237 self.commands.queue(AddOperation::new(
238 Some(self.scope()),
239 source,
240 ForkClone::<T>::new(ForkTargetStorage::new()),
241 ));
242 (
243 InputSlot::new(self.scope(), source),
244 ForkCloneOutput::new(self.scope(), source),
245 )
246 }
247
248 pub fn create_unzip<T>(&mut self) -> (InputSlot<T>, T::Unzipped)
251 where
252 T: Unzippable + 'static + Send + Sync,
253 {
254 let source = self.commands.spawn(()).id();
255 (
256 InputSlot::new(self.scope(), source),
257 T::unzip_output(Output::<T>::new(self.scope(), source), self),
258 )
259 }
260
261 pub fn create_fork_result<T, E>(&mut self) -> (InputSlot<Result<T, E>>, ForkResultOutput<T, E>)
265 where
266 T: 'static + Send + Sync,
267 E: 'static + Send + Sync,
268 {
269 let source = self.commands.spawn(()).id();
270 let target_ok = self.commands.spawn(UnusedTarget).id();
271 let target_err = self.commands.spawn(UnusedTarget).id();
272
273 self.commands.queue(AddOperation::new(
274 Some(self.scope()),
275 source,
276 make_result_branching::<T, E>(ForkTargetStorage::from_iter([target_ok, target_err])),
277 ));
278
279 (
280 InputSlot::new(self.scope(), source),
281 ForkResultOutput {
282 ok: Output::new(self.scope(), target_ok),
283 err: Output::new(self.scope(), target_err),
284 },
285 )
286 }
287
288 pub fn create_fork_option<T>(&mut self) -> (InputSlot<Option<T>>, ForkOptionOutput<T>)
295 where
296 T: 'static + Send + Sync,
297 {
298 let source = self.commands.spawn(()).id();
299 let target_some = self.commands.spawn(UnusedTarget).id();
300 let target_none = self.commands.spawn(UnusedTarget).id();
301
302 self.commands.queue(AddOperation::new(
303 Some(self.scope()),
304 source,
305 make_option_branching::<T>(ForkTargetStorage::from_iter([target_some, target_none])),
306 ));
307
308 (
309 InputSlot::new(self.scope(), source),
310 ForkOptionOutput {
311 some: Output::new(self.scope(), target_some),
312 none: Output::new(self.scope(), target_none),
313 },
314 )
315 }
316
/// Join the given buffers and continue as a [`Chain`] over the joined item.
///
/// Forwards to [`Joinable::join`] on `buffers`.
pub fn join<'b, B: Joinable>(&'b mut self, buffers: B) -> Chain<'w, 's, 'a, 'b, B::Item> {
    buffers.join(self)
}
321
/// Try to join the buffers of a [`BufferMap`] into the [`Joined`] type `J`.
///
/// Forwards to `J::try_join_from`.
///
/// # Errors
///
/// Returns [`IncompatibleLayout`] when `J::try_join_from` rejects the map.
pub fn try_join<'b, J: Joined>(
    &'b mut self,
    buffers: &BufferMap,
) -> Result<Chain<'w, 's, 'a, 'b, J>, IncompatibleLayout> {
    J::try_join_from(buffers, self)
}
329
/// Listen to the given buffers and continue as a [`Chain`] over their keys.
///
/// Forwards to [`Accessible::listen`] on `buffers`.
pub fn listen<'b, B: Accessible>(&'b mut self, buffers: B) -> Chain<'w, 's, 'a, 'b, B::Keys> {
    buffers.listen(self)
}
334
/// Try to listen to the buffers of a [`BufferMap`] using the [`Accessor`]
/// type `Keys`.
///
/// Forwards to `Keys::try_listen_from`.
///
/// # Errors
///
/// Returns [`IncompatibleLayout`] when `Keys::try_listen_from` rejects the
/// map.
pub fn try_listen<'b, Keys: Accessor>(
    &'b mut self,
    buffers: &BufferMap,
) -> Result<Chain<'w, 's, 'a, 'b, Keys>, IncompatibleLayout> {
    Keys::try_listen_from(buffers, self)
}
342
343 pub fn create_buffer_access<T, B: Bufferable>(
351 &mut self,
352 buffers: B,
353 ) -> Node<T, (T, BufferKeys<B>)>
354 where
355 B::BufferType: Accessing,
356 T: 'static + Send + Sync,
357 {
358 let buffers = buffers.into_buffer(self);
359 buffers.access(self)
360 }
361
/// Try to create a buffer-access node from a [`BufferMap`] using the
/// [`Accessor`] type `Keys`.
///
/// Forwards to `Keys::try_buffer_access`.
///
/// # Errors
///
/// Returns [`IncompatibleLayout`] when `Keys::try_buffer_access` rejects the
/// map.
pub fn try_create_buffer_access<T, Keys: Accessor>(
    &mut self,
    buffers: &BufferMap,
) -> Result<Node<T, (T, Keys)>, IncompatibleLayout>
where
    T: 'static + Send + Sync,
{
    Keys::try_buffer_access(buffers, self)
}
374
375 pub fn create_collect<T, const N: usize>(
391 &mut self,
392 min: usize,
393 max: Option<usize>,
394 ) -> Node<T, SmallVec<[T; N]>>
395 where
396 T: 'static + Send + Sync,
397 {
398 if let Some(max) = max {
399 assert!(0 < max);
400 assert!(min <= max);
401 }
402
403 let source = self.commands.spawn(()).id();
404 let target = self.commands.spawn(UnusedTarget).id();
405 self.commands.queue(AddOperation::new(
406 Some(self.scope()),
407 source,
408 Collect::<T, N>::new(target, min, max),
409 ));
410
411 Node {
412 input: InputSlot::new(self.scope(), source),
413 output: Output::new(self.scope(), target),
414 streams: (),
415 }
416 }
417
418 pub fn create_collect_all<T, const N: usize>(&mut self) -> Node<T, SmallVec<[T; N]>>
420 where
421 T: 'static + Send + Sync,
422 {
423 self.create_collect(0, None)
424 }
425
426 pub fn create_collect_n<T, const N: usize>(&mut self, n: usize) -> Node<T, SmallVec<[T; N]>>
428 where
429 T: 'static + Send + Sync,
430 {
431 self.create_collect(n, Some(n))
432 }
433
434 pub fn create_split<T>(&mut self) -> (InputSlot<T>, SplitOutputs<T>)
438 where
439 T: 'static + Send + Sync + Splittable,
440 {
441 let source = self.commands.spawn(()).id();
442 self.commands.queue(AddOperation::new(
443 Some(self.scope()),
444 source,
445 OperateSplit::<T>::default(),
446 ));
447
448 (
449 InputSlot::new(self.scope(), source),
450 SplitOutputs::new(self.scope(), source),
451 )
452 }
453
454 pub fn create_cancel<T>(&mut self) -> InputSlot<T>
461 where
462 T: 'static + Send + Sync + ToString,
463 {
464 let source = self.commands.spawn(()).id();
465 self.commands.queue(AddOperation::new(
466 Some(self.scope()),
467 source,
468 OperateCancel::<T>::new(),
469 ));
470
471 InputSlot::new(self.scope(), source)
472 }
473
474 pub fn create_quiet_cancel(&mut self) -> InputSlot<()> {
480 let source = self.commands.spawn(()).id();
481 self.commands.queue(AddOperation::new(
482 Some(self.scope()),
483 source,
484 OperateQuietCancel,
485 ));
486
487 InputSlot::new(self.scope(), source)
488 }
489
/// Register a cleanup workflow that is given the keys of `from_buffers`.
///
/// `build` constructs the cleanup workflow: it receives a [`Scope`] whose
/// request is the buffer keys and whose response is `()`, plus a builder for
/// that scope. The work is delegated to `on_cleanup` on the converted buffers.
///
/// The tests in this file observe this workflow running both when the
/// enclosing scope is cancelled and when it terminates.
pub fn on_cleanup<B, Settings>(
    &mut self,
    from_buffers: B,
    build: impl FnOnce(Scope<BufferKeys<B>, (), ()>, &mut Builder) -> Settings,
) where
    B: Bufferable,
    B::BufferType: Accessing,
    Settings: Into<ScopeSettings>,
{
    from_buffers.into_buffer(self).on_cleanup(self, build);
}
520
/// Register a cleanup workflow, given the keys of `from_buffers`, for the
/// cancellation case.
///
/// `build` has the same shape as for [`Self::on_cleanup`]. The work is
/// delegated to `on_cancel` on the converted buffers.
///
/// The tests in this file observe this workflow running when the enclosing
/// scope is cancelled and not when it terminates.
pub fn on_cancel<B, Settings>(
    &mut self,
    from_buffers: B,
    build: impl FnOnce(Scope<BufferKeys<B>, (), ()>, &mut Builder) -> Settings,
) where
    B: Bufferable,
    B::BufferType: Accessing,
    Settings: Into<ScopeSettings>,
{
    from_buffers.into_buffer(self).on_cancel(self, build);
}
545
/// Register a cleanup workflow, given the keys of `from_buffers`, for the
/// termination case.
///
/// `build` has the same shape as for [`Self::on_cleanup`]. The work is
/// delegated to `on_terminate` on the converted buffers.
///
/// The tests in this file observe this workflow running when the enclosing
/// scope terminates and not when it is cancelled.
pub fn on_terminate<B, Settings>(
    &mut self,
    from_buffers: B,
    build: impl FnOnce(Scope<BufferKeys<B>, (), ()>, &mut Builder) -> Settings,
) where
    B: Bufferable,
    B::BufferType: Accessing,
    Settings: Into<ScopeSettings>,
{
    from_buffers.into_buffer(self).on_terminate(self, build);
}
564
/// Register a cleanup workflow, given the keys of `from_buffers`, that runs
/// under the given `conditions`.
///
/// `build` has the same shape as for [`Self::on_cleanup`]. The work is
/// delegated to `on_cleanup_if` on the converted buffers, which receives the
/// [`CleanupWorkflowConditions`] unchanged.
pub fn on_cleanup_if<B, Settings>(
    &mut self,
    conditions: CleanupWorkflowConditions,
    from_buffers: B,
    build: impl FnOnce(Scope<BufferKeys<B>, (), ()>, &mut Builder) -> Settings,
) where
    B: Bufferable,
    B::BufferType: Accessing,
    Settings: Into<ScopeSettings>,
{
    from_buffers
        .into_buffer(self)
        .on_cleanup_if(self, conditions, build);
}
582
583 pub fn create_trim<T>(&mut self, branches: impl IntoIterator<Item = TrimBranch>) -> Node<T, T>
587 where
588 T: 'static + Send + Sync,
589 {
590 let branches: SmallVec<[_; 16]> = branches.into_iter().collect();
591 for branch in &branches {
592 branch.verify_scope(self.scope());
593 }
594
595 let source = self.commands.spawn(()).id();
596 let target = self.commands.spawn(UnusedTarget).id();
597 self.commands.queue(AddOperation::new(
598 Some(self.scope()),
599 source,
600 Trim::<T>::new(branches, target),
601 ));
602
603 Node {
604 input: InputSlot::new(self.scope(), source),
605 output: Output::new(self.scope(), target),
606 streams: (),
607 }
608 }
609
610 pub fn create_gate<T, B>(&mut self, buffers: B) -> Node<GateRequest<T>, T>
619 where
620 B: Bufferable,
621 T: 'static + Send + Sync,
622 {
623 let buffers = buffers.into_buffer(self);
624 buffers.verify_scope(self.scope());
625
626 let source = self.commands.spawn(()).id();
627 let target = self.commands.spawn(UnusedTarget).id();
628 self.commands.queue(AddOperation::new(
629 Some(self.scope()),
630 source,
631 OperateDynamicGate::<T, _>::new(buffers, target),
632 ));
633
634 Node {
635 input: InputSlot::new(self.scope(), source),
636 output: Output::new(self.scope(), target),
637 streams: (),
638 }
639 }
640
641 pub fn create_gate_action<T, B>(&mut self, action: Gate, buffers: B) -> Node<T, T>
648 where
649 B: Bufferable,
650 T: 'static + Send + Sync,
651 {
652 let buffers = buffers.into_buffer(self);
653 buffers.verify_scope(self.scope());
654
655 let source = self.commands.spawn(()).id();
656 let target = self.commands.spawn(UnusedTarget).id();
657 self.commands.queue(AddOperation::new(
658 Some(self.scope()),
659 source,
660 OperateStaticGate::<T, _>::new(buffers, target, action),
661 ));
662
663 Node {
664 input: InputSlot::new(self.scope(), source),
665 output: Output::new(self.scope(), target),
666 streams: (),
667 }
668 }
669
670 pub fn create_gate_open<B, T>(&mut self, buffers: B) -> Node<T, T>
674 where
675 B: Bufferable,
676 T: 'static + Send + Sync,
677 {
678 self.create_gate_action(Gate::Open, buffers)
679 }
680
681 pub fn create_gate_close<T, B>(&mut self, buffers: B) -> Node<T, T>
685 where
686 B: Bufferable,
687 T: 'static + Send + Sync,
688 {
689 self.create_gate_action(Gate::Closed, buffers)
690 }
691
/// The entity of the scope that this builder is adding operations to.
pub fn scope(&self) -> Entity {
    self.context.scope
}
696
/// Borrow the [`Commands`] that this builder issues its work through.
pub fn commands(&mut self) -> &mut Commands<'w, 's> {
    self.commands
}
701
702 pub(crate) fn create_scope_impl<Request, Response, Streams, Settings>(
704 &mut self,
705 scope_id: Entity,
706 exit_scope: Entity,
707 build: impl FnOnce(Scope<Request, Response, Streams>, &mut Builder) -> Settings,
708 ) -> Node<Request, Response, Streams>
709 where
710 Request: 'static + Send + Sync,
711 Response: 'static + Send + Sync,
712 Streams: StreamPack,
713 Settings: Into<ScopeSettings>,
714 {
715 let ScopeEndpoints {
718 terminal,
719 enter_scope,
720 finish_scope_cancel,
721 } = OperateScope::add::<Request, Response>(
722 Some(self.scope()),
723 scope_id,
724 Some(exit_scope),
725 self.commands,
726 );
727
728 let (stream_in, stream_out) =
729 Streams::spawn_scope_streams(scope_id, self.scope(), self.commands);
730
731 let mut builder = Builder {
732 context: BuilderScopeContext {
733 scope: scope_id,
734 finish_scope_cancel,
735 },
736 commands: self.commands,
737 };
738
739 let scope = Scope {
740 start: Output::new(scope_id, enter_scope),
741 terminate: InputSlot::new(scope_id, terminal),
742 streams: stream_in,
743 };
744
745 let settings = build(scope, &mut builder).into();
746 self.commands
747 .entity(scope_id)
748 .insert(ScopeSettingsStorage(settings));
749
750 Node {
751 input: InputSlot::new(self.scope(), scope_id),
752 output: Output::new(self.scope(), exit_scope),
753 streams: stream_out,
754 }
755 }
756
757 pub(crate) fn create_injection_impl<Request, Response, Streams>(
758 &mut self,
759 source: Entity,
760 ) -> Node<(Request, ServiceInstructions<Request, Response, Streams>), Response, Streams>
761 where
762 Request: 'static + Send + Sync,
763 Response: 'static + Send + Sync,
764 Streams: StreamPack,
765 {
766 let target = self.commands.spawn(UnusedTarget).id();
767
768 let mut map = StreamTargetMap::default();
769 let streams = Streams::spawn_node_streams(source, &mut map, self);
770 self.commands.entity(source).insert(map);
771 self.commands.queue(AddOperation::new(
772 Some(self.scope()),
773 source,
774 Injection::<Request, Response, Streams>::new(target),
775 ));
776
777 Node {
778 input: InputSlot::new(self.scope(), source),
779 output: Output::new(self.scope(), target),
780 streams,
781 }
782 }
783
/// A copy of the scope context that this builder is working in.
pub fn context(&self) -> BuilderScopeContext {
    self.context
}
787}
788
/// Conditions under which a cleanup workflow should run.
///
/// The two flags are independent: one covers the case where the scope
/// terminates and the other the case where it is cancelled.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct CleanupWorkflowConditions {
    /// Run the cleanup workflow when the scope terminates.
    pub(crate) run_on_terminate: bool,
    /// Run the cleanup workflow when the scope is cancelled.
    pub(crate) run_on_cancel: bool,
}

impl CleanupWorkflowConditions {
    /// Build a set of conditions from the two flags, stored unchanged.
    pub fn always_if(run_on_terminate: bool, run_on_cancel: bool) -> Self {
        Self {
            run_on_terminate,
            run_on_cancel,
        }
    }
}
808
809#[cfg(test)]
810mod tests {
811 use crate::{CancellationCause, prelude::*, testing::*};
812 use smallvec::SmallVec;
813 use std::time::Instant;
814
#[test]
fn test_disconnected_workflow() {
    let mut context = TestingContext::minimal_plugins();

    // Nothing at all is connected inside the workflow.
    let workflow = context.spawn_io_workflow(|_, _| {});
    for _ in 0..3 {
        check_unreachable(workflow, 1, &mut context);
    }

    // The start feeds a node whose output loops back into its own input, so
    // nothing is ever connected to the terminate slot.
    let workflow = context.spawn_io_workflow(|scope, builder| {
        let looping_node = builder.create_map_block(|v| v);
        builder.connect(scope.start, looping_node.input);
        builder.connect(looping_node.output, looping_node.input);
    });
    for _ in 0..3 {
        check_unreachable(workflow, 1, &mut context);
    }

    // Every branch from the start ends unused, while the only node connected
    // to terminate has nothing feeding its input.
    let workflow = context.spawn_io_workflow(|scope, builder| {
        builder.chain(scope.start).map_block(|v| v).fork_clone((
            |chain: Chain<()>| chain.map_block(|v| v).map_block(|v| v).unused(),
            |chain: Chain<()>| chain.map_block(|v| v).map_block(|v| v).unused(),
            |chain: Chain<()>| chain.map_block(|v| v).map_block(|v| v).unused(),
        ));

        let orphan_exit = builder.create_map_block(|v| v);
        builder.connect(orphan_exit.output, scope.terminate);
    });
    for _ in 0..3 {
        check_unreachable(workflow, 1, &mut context);
    }

    // Every branch from the start feeds one buffer, while terminate listens
    // to a different buffer that nothing writes to.
    let workflow = context.spawn_io_workflow(|scope, builder| {
        let entry_buffer = builder.create_buffer::<()>(BufferSettings::keep_all());
        builder.chain(scope.start).map_block(|v| v).fork_clone((
            |chain: Chain<()>| chain.map_block(|v| v).connect(entry_buffer.input_slot()),
            |chain: Chain<()>| chain.map_block(|v| v).connect(entry_buffer.input_slot()),
            |chain: Chain<()>| chain.map_block(|v| v).connect(entry_buffer.input_slot()),
        ));

        let exit_buffer = builder.create_buffer::<()>(BufferSettings::keep_all());
        builder
            .listen(exit_buffer)
            .map_block(|_| ())
            .connect(scope.terminate);
    });
    for _ in 0..3 {
        check_unreachable(workflow, 1, &mut context);
    }
}
872
// Send a unit request to `workflow`, passing `flush_cycles` through to
// `try_resolve_request`, and assert that it fails with a cancellation whose
// cause is `CancellationCause::Unreachable`.
fn check_unreachable(
    workflow: Service<(), ()>,
    flush_cycles: usize,
    context: &mut TestingContext,
) {
    let r = context.try_resolve_request((), workflow, flush_cycles);
    assert!(matches!(
        *r.unwrap_err().cause,
        CancellationCause::Unreachable(_)
    ));
}
884
#[test]
fn test_fork_clone() {
    let mut context = TestingContext::minimal_plugins();

    // Two clones of the input are both connected straight to terminate.
    let workflow = context.spawn_io_workflow(|scope, builder| {
        let cloner = scope.start.fork_clone(builder);
        let first = cloner.clone_output(builder);
        let second = cloner.clone_output(builder);
        builder.connect(first, scope.terminate);
        builder.connect(second, scope.terminate);
    });

    let response = context.resolve_request(5.0, workflow);
    assert_eq!(response, 5.0);

    // Same topology, expressed through the chain API.
    let workflow = context.spawn_io_workflow(|scope, builder| {
        builder.chain(scope.start).fork_clone((
            |chain: Chain<f64>| chain.connect(scope.terminate),
            |chain: Chain<f64>| chain.connect(scope.terminate),
        ));
    });

    let response = context.resolve_request(3.0, workflow);
    assert_eq!(response, 3.0);

    // Two branches wait for different durations before terminating. The
    // expected response is the value carried by the shorter wait.
    let workflow = context.spawn_io_workflow(|scope, builder| {
        builder.chain(scope.start).fork_clone((
            |chain: Chain<f64>| {
                chain
                    .map_block(|t| WaitRequest {
                        duration: Duration::from_secs_f64(10.0 * t),
                        value: 10.0 * t,
                    })
                    .map(|r: AsyncMap<WaitRequest<f64>>| wait(r.request))
                    .connect(scope.terminate)
            },
            |chain: Chain<f64>| {
                chain
                    .map_block(|t| WaitRequest {
                        duration: Duration::from_secs_f64(t / 100.0),
                        value: t / 100.0,
                    })
                    .map(|r: AsyncMap<WaitRequest<f64>>| wait(r.request))
                    .connect(scope.terminate)
            },
        ));
    });

    let response = context.resolve_request(1.0, workflow);
    assert_eq!(response, 0.01);

    // A standalone fork-clone whose two outputs are joined back together.
    let workflow = context.spawn_io_workflow(|scope, builder| {
        let (fork_input, fork_output) = builder.create_fork_clone();
        builder.connect(scope.start, fork_input);
        let left = fork_output.clone_output(builder);
        let right = fork_output.clone_output(builder);
        builder.join((left, right)).connect(scope.terminate);
    });

    let response = context.resolve_request(5, workflow);
    assert_eq!(response, (5, 5));
}
947
#[test]
fn test_stream_reachability() {
    let mut context = TestingContext::minimal_plugins();

    // A blocking map that never emits anything on its stream. Terminate is
    // only fed by that stream, so the request is expected to fail.
    let workflow = context.spawn_io_workflow(|scope, builder| {
        let silent_node = builder.create_map(|_: BlockingMap<(), StreamOf<u32>>| {});

        builder.connect(scope.start, silent_node.input);
        builder
            .chain(silent_node.streams)
            .map_block(|value| 2 * value)
            .connect(scope.terminate);
    });

    let r = context.try_resolve_request((), workflow, ());
    assert!(r.is_err());

    // The same arrangement with an async map whose future does nothing.
    let workflow = context.spawn_io_workflow(|scope, builder| {
        let silent_node = builder.create_map(|_: AsyncMap<(), StreamOf<u32>>| async {});

        builder.connect(scope.start, silent_node.input);
        builder
            .chain(silent_node.streams)
            .map_block(|value| 2 * value)
            .connect(scope.terminate);
    });

    let r = context.try_resolve_request((), workflow, ());
    assert!(r.is_err());
}
985
986 use tokio::sync::mpsc::unbounded_channel;
987
#[test]
fn test_on_cleanup() {
    let mut context = TestingContext::minimal_plugins();

    // Part 1: a workflow that is always cancelled. The input is stored in a
    // buffer, and the cancel-time cleanup workflow forwards the buffer
    // contents into a channel so the test can observe them.
    let (sender, mut receiver) = unbounded_channel();
    let workflow = context.spawn_io_workflow(|scope, builder| {
        let input = scope.start.fork_clone(builder);

        let buffer = builder.create_buffer(BufferSettings::default());
        let input_to_buffer = input.clone_output(builder);
        builder.connect(input_to_buffer, buffer.input_slot());

        // This branch goes through `produce_none` and cancels on `None`.
        let none_node = builder.create_map_block(produce_none);
        let input_to_node = input.clone_output(builder);
        builder.connect(input_to_node, none_node.input);
        builder
            .chain(none_node.output)
            .cancel_on_none()
            .connect(scope.terminate);

        builder.on_cancel(buffer, |scope, builder| {
            builder
                .chain(scope.start)
                .consume_buffer::<8>()
                .map_block(move |values| {
                    for value in values {
                        sender.send(value).unwrap();
                    }
                })
                .connect(scope.terminate);
        });
    });

    let r = context.try_resolve_request(5, workflow, ());
    assert!(r.is_err());

    // Exactly one value, the original input, must have been drained.
    let channel_output = receiver.try_recv().unwrap();
    assert_eq!(channel_output, 5);
    assert!(receiver.try_recv().is_err());
    assert!(context.confirm_buffers_empty().is_ok());

    // Part 2: one workflow with three buffers, each drained by a different
    // kind of cleanup workflow (cancel / terminate / cleanup).
    let (cancel_sender, mut cancel_receiver) = unbounded_channel();
    let (terminate_sender, mut terminate_receiver) = unbounded_channel();
    let (cleanup_sender, mut cleanup_receiver) = unbounded_channel();
    let workflow = context.spawn_io_workflow(|scope, builder| {
        let input = scope.start.fork_clone(builder);

        let cancel_buffer = builder.create_buffer(BufferSettings::default());
        let input_to_cancel = input.clone_output(builder);
        builder.connect(input_to_cancel, cancel_buffer.input_slot());

        let terminate_buffer = builder.create_buffer(BufferSettings::default());
        let input_to_terminate = input.clone_output(builder);
        builder.connect(input_to_terminate, terminate_buffer.input_slot());

        let cleanup_buffer = builder.create_buffer(BufferSettings::default());
        let input_to_cleanup = input.clone_output(builder);
        builder.connect(input_to_cleanup, cleanup_buffer.input_slot());

        // Values below 5 become `None`, which cancels the workflow; values
        // of 5 or more reach terminate.
        let filter_node =
            builder.create_map_block(|value: u64| if value >= 5 { Some(value) } else { None });
        let input_to_filter_node = input.clone_output(builder);
        builder.connect(input_to_filter_node, filter_node.input);
        builder
            .chain(filter_node.output)
            .cancel_on_none()
            .connect(scope.terminate);

        builder.on_cancel(cancel_buffer, |scope, builder| {
            builder
                .chain(scope.start)
                .consume_buffer::<8>()
                .map_block(move |values| {
                    for value in values {
                        cancel_sender.send(value).unwrap();
                    }
                })
                .connect(scope.terminate);
        });

        builder.on_terminate(terminate_buffer, |scope, builder| {
            builder
                .chain(scope.start)
                .consume_buffer::<8>()
                .map_block(move |values| {
                    for value in values {
                        terminate_sender.send(value).unwrap();
                    }
                })
                .connect(scope.terminate);
        });

        builder.on_cleanup(cleanup_buffer, |scope, builder| {
            builder
                .chain(scope.start)
                .consume_buffer::<8>()
                .map_block(move |values| {
                    for value in values {
                        cleanup_sender.send(value).unwrap();
                    }
                })
                .connect(scope.terminate);
        });
    });

    // 3 is filtered out, so the workflow is cancelled: the cancel and cleanup
    // workflows run, and the terminate workflow does not.
    let r = context.try_resolve_request(3, workflow, 10);
    assert!(r.is_err());

    assert_eq!(cancel_receiver.try_recv().unwrap(), 3);
    assert!(cancel_receiver.try_recv().is_err());
    assert_eq!(cleanup_receiver.try_recv().unwrap(), 3);
    assert!(cleanup_receiver.try_recv().is_err());
    assert!(terminate_receiver.try_recv().is_err());
    assert!(context.no_unhandled_errors());
    assert!(context.confirm_buffers_empty().is_ok());

    // 6 passes the filter, so the workflow terminates: the terminate and
    // cleanup workflows run, and the cancel workflow does not.
    let r = context.try_resolve_request(6, workflow, 10).unwrap();
    assert_eq!(r, 6);

    assert_eq!(terminate_receiver.try_recv().unwrap(), 6);
    assert!(terminate_receiver.try_recv().is_err());
    assert_eq!(cleanup_receiver.try_recv().unwrap(), 6);
    assert!(cleanup_receiver.try_recv().is_err());
    assert!(cancel_receiver.try_recv().is_err());
    assert!(context.no_unhandled_errors());
    assert!(context.confirm_buffers_empty().is_ok());
}
1118
#[test]
fn test_double_collection() {
    let mut context = TestingContext::minimal_plugins();

    let delay = context.spawn_delay(Duration::from_secs_f32(0.01));

    // Case 1: two collect operations in sequence. The downstream one is
    // deliberately created before the upstream one.
    let workflow = context.spawn_io_workflow(|scope, builder| {
        let later_collect = builder.create_collect_all::<i32, 8>();
        let earlier_collect = builder.create_collect_all::<i32, 8>();

        // Spread the input, delay each element, and dispose of values above 4.
        builder
            .chain(scope.start)
            .spread()
            .then(delay)
            .map_block(|v| if v <= 4 { Some(v) } else { None })
            .dispose_on_none()
            .connect(earlier_collect.input);

        builder
            .chain(earlier_collect.output)
            .spread()
            .connect(later_collect.input);

        builder.connect(later_collect.output, scope.terminate);
    });

    let r = context.resolve_request([1, 2, 3, 4, 5], workflow);
    assert_eq!(r.as_slice(), &[1, 2, 3, 4]);

    // Case 2: the output of the later collect is fed back into the earlier
    // collect, forming a cycle between the two. The request is expected to
    // fail.
    let workflow = context.spawn_io_workflow(|scope, builder| {
        let earlier_collect = builder.create_collect_all::<i32, 8>();
        let later_collect = builder.create_collect_all::<i32, 8>();

        builder
            .chain(scope.start)
            .spread()
            .then(delay)
            .connect(earlier_collect.input);

        builder
            .chain(earlier_collect.output)
            .spread()
            .connect(later_collect.input);

        builder.chain(later_collect.output).spread().fork_clone((
            |chain: Chain<i32>| chain.connect(earlier_collect.input),
            |chain: Chain<i32>| chain.connect(scope.terminate),
        ));
    });

    let r = context.try_resolve_request([1, 2, 3, 4, 5], workflow, ());
    assert!(r.is_err());

    // Case 3: the second collect lives inside its own nested scope. Its
    // output still loops back to the outer collect, but here the requests
    // are expected to succeed.
    let workflow = context.spawn_io_workflow(|scope, builder| {
        let earlier_collect = builder.create_collect_all::<i32, 8>();

        builder
            .chain(scope.start)
            .spread()
            .then(delay)
            .map_block(|v| if v <= 4 { Some(v) } else { None })
            .dispose_on_none()
            .connect(earlier_collect.input);

        let _ = builder
            .chain(earlier_collect.output)
            .then_io_scope(|scope, builder| {
                builder
                    .chain(scope.start)
                    .spread()
                    .collect_all::<8>()
                    .connect(scope.terminate);
            })
            .fork_clone((
                |chain: Chain<_>| chain.spread().connect(earlier_collect.input),
                |chain: Chain<_>| chain.connect(scope.terminate),
            ));
    });

    // Only values of 4 or less survive, in their original order.
    check_collections(workflow, [1, 2, 3, 4], [1, 2, 3, 4], &mut context);
    check_collections(workflow, [1, 2, 3, 4, 5, 6], [1, 2, 3, 4], &mut context);
    check_collections(workflow, [1, 8, 2, 7, 3, 6], [1, 2, 3], &mut context);
    check_collections(
        workflow,
        [8, 7, 6, 5, 4, 3, 2, 1],
        [4, 3, 2, 1],
        &mut context,
    );
    check_collections(workflow, [6, 7, 8, 9, 10], [], &mut context);
}
1223
1224 fn check_collections(
1225 workflow: Service<SmallVec<[i32; 8]>, SmallVec<[i32; 8]>>,
1226 input: impl IntoIterator<Item = i32>,
1227 expectation: impl IntoIterator<Item = i32>,
1228 context: &mut TestingContext,
1229 ) {
1230 let input: SmallVec<[i32; 8]> = SmallVec::from_iter(input);
1231 let expectation: SmallVec<[i32; 8]> = SmallVec::from_iter(expectation);
1232 let r = context.resolve_request(input, workflow);
1233 assert_eq!(r, expectation);
1234 }
1235
#[test]
fn benchmarks() {
    let mut context = TestingContext::minimal_plugins();

    // Baseline: the measured segment is a direct connection.
    let workflow = context.spawn_io_workflow(|scope, builder| {
        let (start_test, end_test) = build_benchmark_fixture(scope, builder);
        builder.connect(start_test, end_test);
    });

    let result = context
        .try_resolve_request((), workflow, Duration::from_secs(10))
        .unwrap();
    println!("Performance for basic connection:\n{result:#?}");

    // The measured segment passes through a nested io scope that connects
    // its start directly to its terminate.
    let workflow = context.spawn_io_workflow(|scope, builder| {
        let (start_test, end_test) = build_benchmark_fixture(scope, builder);
        builder
            .chain(start_test)
            .then_io_scope(|scope, builder| {
                builder.connect(scope.start, scope.terminate);
            })
            .connect(end_test);
    });

    let result = context
        .try_resolve_request((), workflow, Duration::from_secs(10))
        .unwrap();
    // Labelled separately from the baseline so the two reports can be told
    // apart in the test output.
    println!("Performance for connection through an io scope:\n{result:#?}");
}
1265
1266 fn build_benchmark_fixture(
1267 scope: Scope<(), TimeStats>,
1268 builder: &mut Builder,
1269 ) -> (Output<Instant>, InputSlot<Instant>) {
1270 let initial_time = builder
1271 .commands()
1272 .spawn_service(get_initial_time.into_blocking_service());
1273 let finish_time = builder
1274 .commands()
1275 .spawn_service(finish_time_range.into_blocking_service());
1276 let collect_samples = builder
1277 .commands()
1278 .spawn_service(collect_samples.into_blocking_service());
1279
1280 let samples = builder.create_buffer(BufferSettings::keep_all());
1281
1282 let initial_node = builder.create_node(initial_time);
1283 let finish_node = builder.create_node(finish_time);
1284
1285 builder.connect(scope.start, initial_node.input);
1286 builder.connect(finish_node.output, samples.input_slot());
1287
1288 builder
1289 .listen(samples)
1290 .then(collect_samples)
1291 .dispose_on_none()
1292 .connect(scope.terminate);
1293
1294 builder
1295 .listen(samples)
1296 .map_block(|_| ())
1297 .connect(initial_node.input);
1298
1299 (initial_node.output, finish_node.input)
1300 }
1301
// One timing sample: the instants at which a measurement began and ended.
#[derive(Debug, Clone, Copy)]
struct TimeRange {
    initial_time: Instant,
    finish_time: Instant,
}

// Summary statistics over a set of `TimeRange` samples.
//
// The fields are only read through the `Debug` output printed by the
// benchmarks, hence the `allow(unused)` attributes.
#[derive(Debug, Clone, Copy)]
struct TimeStats {
    // Number of samples the statistics were computed from.
    #[allow(unused)]
    sample_count: usize,
    // Mean elapsed time.
    #[allow(unused)]
    average: Duration,
    // Population standard deviation of the elapsed times.
    #[allow(unused)]
    std_dev: Duration,
    // Longest elapsed time.
    #[allow(unused)]
    highest: Duration,
    // Shortest elapsed time.
    #[allow(unused)]
    lowest: Duration,
}

impl TimeStats {
    // Compute statistics over the elapsed time of every sample.
    //
    // An empty input yields a `TimeStats` with `sample_count == 0` and every
    // duration set to zero.
    //
    // Panics if there are more than `u32::MAX` samples.
    fn new(samples: impl IntoIterator<Item = TimeRange>) -> Self {
        let elapsed: Vec<Duration> = samples
            .into_iter()
            .map(|s| s.finish_time - s.initial_time)
            .collect();
        let sample_count = elapsed.len();

        // `max`/`min` only return `None` for an empty input, which has no
        // meaningful statistics and would otherwise divide by zero below.
        let (highest, lowest) = match (elapsed.iter().max(), elapsed.iter().min()) {
            (Some(&highest), Some(&lowest)) => (highest, lowest),
            _ => {
                return TimeStats {
                    sample_count: 0,
                    average: Duration::ZERO,
                    std_dev: Duration::ZERO,
                    highest: Duration::ZERO,
                    lowest: Duration::ZERO,
                };
            }
        };

        let total: Duration = elapsed.iter().sum();
        let divisor =
            u32::try_from(sample_count).expect("sample count must fit in a u32 to be averaged");
        let average = total / divisor;

        // Population variance: the mean of the squared deviations from the
        // average, computed in nanoseconds.
        let average_nanos = average.as_nanos() as f64;
        let sum_of_squares: f64 = elapsed
            .iter()
            .map(|sample| {
                let delta = sample.as_nanos() as f64 - average_nanos;
                delta * delta
            })
            .sum();
        let variance = sum_of_squares / sample_count as f64;

        let std_dev = Duration::from_nanos(variance.sqrt() as u64);
        TimeStats {
            sample_count,
            average,
            std_dev,
            highest,
            lowest,
        }
    }
}
1363
// Blocking service body: ignore the unit input and report the current instant.
fn get_initial_time(_: In<()>) -> Instant {
    Instant::now()
}
1367
1368 fn finish_time_range(In(initial_time): In<Instant>) -> TimeRange {
1369 TimeRange {
1370 initial_time,
1371 finish_time: Instant::now(),
1372 }
1373 }
1374
1375 fn collect_samples(
1376 In(key): In<BufferKey<TimeRange>>,
1377 access: BufferAccess<TimeRange>,
1378 ) -> Option<TimeStats> {
1379 let samples = access.get(&key).unwrap();
1380 if samples.len() >= 1000 {
1381 Some(TimeStats::new(samples.iter().copied()))
1382 } else {
1383 None
1384 }
1385 }
1386}