1use bevy_ecs::{
19 hierarchy::ChildOf,
20 prelude::{Bundle, Commands, Component, Entity, Event},
21};
22
23use std::future::Future;
24
25use crate::{
26 AsMapOnce, Cancellable, IntoAsyncMapOnce, IntoBlockingMapOnce, Outcome, Promise, ProvideOnce,
27 Sendish, StreamPack, StreamTargetMap, UnusedTarget,
28};
29
30mod detach;
31pub(crate) use detach::*;
32
33mod finished;
34pub(crate) use finished::*;
35
36mod insert;
37pub(crate) use insert::*;
38
39mod internal;
40pub(crate) use internal::*;
41
42mod map;
43pub(crate) use map::*;
44
45mod push;
46pub(crate) use push::*;
47
48mod capture;
49pub(crate) use capture::*;
50
51mod send_event;
52pub(crate) use send_event::*;
53
54mod store;
55pub(crate) use store::*;
56
57mod taken;
58pub(crate) use taken::*;
59
60pub struct Series<'w, 's, 'a, Response, Streams> {
79 pub(crate) source: Entity,
80 pub(crate) target: Entity,
81 pub(crate) commands: &'a mut Commands<'w, 's>,
82 pub(crate) _ignore: std::marker::PhantomData<fn(Response, Streams)>,
83}
84
85impl<'w, 's, 'a, Response, Streams> Series<'w, 's, 'a, Response, Streams>
86where
87 Response: 'static + Send + Sync,
88 Streams: StreamPack,
89{
90 pub fn detach(self) -> Series<'w, 's, 'a, Response, Streams> {
104 self.commands.queue(Detach {
105 target: self.target,
106 });
107 self
108 }
109
110 pub fn session_id(&self) -> Entity {
112 self.source
113 }
114
115 #[must_use]
118 pub fn capture(self) -> Capture<Response, Streams> {
119 let target = self.target;
120 let mut map = StreamTargetMap::default();
121 let stream_receivers = Streams::take_streams(target, &mut map, self.commands);
122 self.commands.entity(self.source).insert(map);
123
124 let (outcome, capture_outcome) = Outcome::new();
125 self.send_outcome(capture_outcome);
126
127 Capture {
128 outcome,
129 streams: stream_receivers,
130 session: target,
131 }
132 }
133
134 #[must_use]
137 #[deprecated(since = "0.0.6", note = "Use .capture() instead")]
138 pub fn take(self) -> Recipient<Response, Streams> {
139 let (response_sender, response_promise) = Promise::<Response>::new();
140 self.commands.queue(AddExecution::new(
141 Some(self.source),
142 self.target,
143 TakenResponse::<Response>::new(response_sender),
144 ));
145 let mut map = StreamTargetMap::default();
146 let stream_receivers = Streams::take_streams(self.target, &mut map, self.commands);
147 self.commands.entity(self.source).insert(map);
148
149 Recipient {
150 response: response_promise,
151 streams: stream_receivers,
152 session: self.target,
153 }
154 }
155
156 #[must_use]
158 pub fn outcome(self) -> Outcome<Response> {
159 let (outcome, capture) = Outcome::new();
160 self.send_outcome(capture);
161 outcome
162 }
163
164 #[deprecated(since = "0.0.6", note = "Use .outcome() instead")]
166 pub fn take_response(self) -> Promise<Response> {
167 let (response_sender, response_promise) = Promise::<Response>::new();
168 self.commands.queue(AddExecution::new(
169 Some(self.source),
170 self.target,
171 TakenResponse::<Response>::new(response_sender),
172 ));
173 response_promise
174 }
175
176 #[must_use]
178 pub fn then<P: ProvideOnce<Request = Response>>(
179 self,
180 provider: P,
181 ) -> Series<'w, 's, 'a, P::Response, P::Streams> {
182 let source = self.target;
183 let target = self
184 .commands
185 .spawn((Detached::default(), UnusedTarget, SeriesMarker))
186 .id();
187
188 self.commands
191 .entity(source)
192 .insert((Cancellable::new(cancel_execution), SeriesMarker))
193 .remove::<UnusedTarget>()
194 .insert(ChildOf(target));
195 provider.connect(None, source, target, self.commands);
196 Series {
197 source,
198 target,
199 commands: self.commands,
200 _ignore: Default::default(),
201 }
202 }
203
204 #[must_use]
211 pub fn map_block<U>(
212 self,
213 f: impl FnOnce(Response) -> U + 'static + Send + Sync,
214 ) -> Series<'w, 's, 'a, U, ()>
215 where
216 U: 'static + Send + Sync,
217 {
218 self.then(f.into_blocking_map_once())
219 }
220
221 #[must_use]
228 pub fn map_async<Task>(
229 self,
230 f: impl FnOnce(Response) -> Task + 'static + Send + Sync,
231 ) -> Series<'w, 's, 'a, Task::Output, ()>
232 where
233 Task: Future + 'static + Sendish,
234 Task::Output: 'static + Send + Sync,
235 {
236 self.then(f.into_async_map_once())
237 }
238
239 pub fn map<M, F: AsMapOnce<M>>(
246 self,
247 f: F,
248 ) -> Series<
249 'w,
250 's,
251 'a,
252 <F::MapType as ProvideOnce>::Response,
253 <F::MapType as ProvideOnce>::Streams,
254 >
255 where
256 F::MapType: ProvideOnce<Request = Response>,
257 <F::MapType as ProvideOnce>::Response: 'static + Send + Sync,
258 <F::MapType as ProvideOnce>::Streams: StreamPack,
259 {
260 self.then(f.as_map_once())
261 }
262
263 pub fn store(self, target: Entity) {
272 self.commands.queue(AddExecution::new(
273 Some(self.source),
274 self.target,
275 Store::<Response>::new(target),
276 ));
277
278 let mut map = StreamTargetMap::default();
279 let stream_targets = Streams::collect_streams(self.source, target, &mut map, self.commands);
280 self.commands
281 .entity(self.source)
282 .insert((stream_targets, map));
283 }
284
285 #[must_use]
289 pub fn collect_streams(self, target: Entity) -> Series<'w, 's, 'a, Response, ()> {
290 let mut map = StreamTargetMap::default();
291 let stream_targets = Streams::collect_streams(self.source, target, &mut map, self.commands);
292 self.commands
293 .entity(self.source)
294 .insert((stream_targets, map));
295
296 Series {
297 source: self.source,
298 target: self.target,
299 commands: self.commands,
300 _ignore: Default::default(),
301 }
302 }
303
304 pub fn push(self, target: Entity) {
313 self.commands.queue(AddExecution::new(
314 Some(self.source),
315 self.target,
316 Push::<Response>::new(target, false),
317 ));
318
319 let mut map = StreamTargetMap::default();
320 let stream_targets = Streams::collect_streams(self.source, target, &mut map, self.commands);
321 self.commands
322 .entity(self.source)
323 .insert((stream_targets, map));
324 }
325
326 pub(crate) fn send_outcome(self, capture: CaptureOutcome<Response>) {
328 self.commands
329 .queue(AddExecution::new(Some(self.source), self.target, capture));
330 }
331
332 }
338
339impl<'w, 's, 'a, Response, Streams> Series<'w, 's, 'a, Response, Streams>
340where
341 Response: Bundle,
342{
343 pub fn insert(self, target: Entity) {
353 self.commands.queue(AddExecution::new(
354 Some(self.source),
355 self.target,
356 Insert::<Response>::new(target),
357 ));
358 }
359}
360
361impl<'w, 's, 'a, Response, Streams> Series<'w, 's, 'a, Response, Streams>
362where
363 Response: Event,
364{
365 pub fn send_event(self) {
370 self.commands.queue(AddExecution::new(
371 Some(self.source),
372 self.target,
373 SendEvent::<Response>::new(),
374 ));
375 }
376}
377
378pub struct Recipient<Response, Streams: StreamPack> {
380 pub response: Promise<Response>,
381 pub streams: Streams::StreamReceivers,
382 pub session: Entity,
390}
391
392pub struct Capture<Response, Streams: StreamPack> {
393 pub outcome: Outcome<Response>,
394 pub streams: Streams::StreamReceivers,
395 pub session: Entity,
396}
397
398#[derive(Component)]
400pub struct Storage<T> {
401 pub data: T,
402 pub session: Entity,
403}
404
405#[derive(Component)]
410pub struct Collection<T> {
411 pub items: Vec<Storage<T>>,
413}
414
415impl<T> Default for Collection<T> {
416 fn default() -> Self {
417 Self {
418 items: Default::default(),
419 }
420 }
421}
422
423#[cfg(test)]
424mod tests {
425 use crate::{ContinuousQueueView, prelude::*, testing::*};
426 use smallvec::SmallVec;
427 use std::{
428 sync::{Arc, Mutex},
429 time::{Duration, Instant},
430 };
431 use tokio::sync::mpsc::unbounded_channel;
432
433 #[test]
434 fn test_dropped_chain() {
435 let mut context = TestingContext::minimal_plugins();
436
437 let (detached_sender, mut detached_receiver) = unbounded_channel();
438 let (attached_sender, mut attached_receiver) = unbounded_channel();
439
440 context.command(|commands| {
441 let _ = commands
442 .request("hello".to_owned(), to_uppercase.into_blocking_map())
443 .map_block(move |value| {
444 detached_sender.send(value.clone()).unwrap();
445 value
446 })
447 .detach()
448 .map_block(to_lowercase)
449 .map_block(move |value| {
450 attached_sender.send(value.clone()).unwrap();
451 value
452 })
453 .map_block(to_uppercase);
454 });
455
456 context.run(1);
457 assert_eq!(detached_receiver.try_recv().unwrap(), "HELLO");
458 assert!(attached_receiver.try_recv().is_err());
459 assert!(
460 context
461 .get_unhandled_errors()
462 .is_some_and(|e| e.unused_targets.len() == 1)
463 );
464 }
465
466 #[test]
467 fn test_blocking_map() {
468 let mut context = TestingContext::minimal_plugins();
469
470 let r = context.resolve_request(String::from("hello"), to_uppercase.into_blocking_map());
471 assert_eq!(r, "HELLO");
472
473 let r =
474 context.resolve_request(String::from("hello"), to_uppercase.into_blocking_map_once());
475 assert_eq!(r, "HELLO");
476
477 let mut outcome = context.command(|commands| {
478 commands
479 .provide(String::from("hello"))
480 .map_block(to_uppercase)
481 .outcome()
482 });
483
484 context.run_while_pending(&mut outcome);
485 context.assert_no_errors();
486 assert_eq!(outcome.try_recv().unwrap().unwrap(), "HELLO");
487
488 let mut outcome = context.command(|commands| {
489 commands
490 .provide(String::from("hello"))
491 .map_block(|request| request.to_uppercase())
492 .outcome()
493 });
494
495 context.run_while_pending(&mut outcome);
496 context.assert_no_errors();
497 assert_eq!(outcome.try_recv().unwrap().unwrap(), "HELLO");
498 }
499
500 #[test]
501 fn test_async_map() {
502 let mut context = TestingContext::minimal_plugins();
503
504 let request = WaitRequest {
505 duration: Duration::from_secs_f64(0.001),
506 value: "hello".to_owned(),
507 };
508
509 let conditions = FlushConditions::new().with_timeout(Duration::from_secs_f64(5.0));
510
511 let r = context
512 .try_resolve_request(request.clone(), wait.into_async_map(), conditions.clone())
513 .unwrap();
514 assert_eq!(r, "hello");
515
516 let r = context
517 .try_resolve_request(
518 request.clone(),
519 wait.into_async_map_once(),
520 conditions.clone(),
521 )
522 .unwrap();
523 assert_eq!(r, "hello");
524
525 let mut outcome =
526 context.command(|commands| commands.provide(request.clone()).map_async(wait).outcome());
527
528 assert!(context.run_with_conditions(&mut outcome, conditions.clone()));
529 context.assert_no_errors();
530 assert_eq!(outcome.try_recv().unwrap().unwrap(), "hello");
531
532 let mut outcome = context.command(|commands| {
533 commands
534 .provide(request.clone())
535 .map_async(|request| {
536 async move {
537 let t = Instant::now();
538 while t.elapsed() < request.duration {
539 }
541 request.value
542 }
543 })
544 .outcome()
545 });
546
547 assert!(context.run_with_conditions(&mut outcome, conditions.clone()));
548 context.assert_no_errors();
549 assert_eq!(outcome.try_recv().unwrap().unwrap(), "hello");
550 }
551
552 #[test]
553 fn test_detach() {
554 let mut context = TestingContext::minimal_plugins();
557 let service = context.spawn_delayed_map(Duration::from_millis(1), |n| *n + 1);
558
559 context.command(|commands| {
560 commands.provide(0).then(service).detach();
561 });
562
563 let (sender, mut receiver) = tokio::sync::oneshot::channel();
564 context.run_with_conditions(&mut receiver, Duration::from_millis(5));
565 assert!(
566 context.no_unhandled_errors(),
567 "Unhandled errors: {:#?}",
568 context.get_unhandled_errors(),
569 );
570
571 let _ = sender.send(());
582 }
583
584 #[derive(Clone, Debug, PartialEq, Eq, Hash, DeliveryLabel)]
585 struct UnitLabel;
586
587 #[derive(Clone, Debug, PartialEq, Eq, Hash, DeliveryLabel)]
588 struct StatefulLabel(u64);
589
590 #[test]
591 fn test_delivery_instructions() {
592 let mut context = TestingContext::minimal_plugins();
593 let service = context.spawn_delayed_map_with_viewer(
594 Duration::from_secs_f32(0.01),
595 |counter: &Arc<Mutex<u64>>| {
596 *counter.lock().unwrap() += 1;
597 },
598 |view: &ContinuousQueueView<_, ()>| {
599 assert!(view.len() <= 1);
600 },
601 );
602
603 verify_delivery_instruction_matrix(service.optional_stream_cast(), &mut context);
604
605 let service = context.spawn_async_delayed_map(
606 Duration::from_secs_f32(0.01),
607 |counter: Arc<Mutex<u64>>| {
608 *counter.lock().unwrap() += 1;
609 },
610 );
611
612 verify_delivery_instruction_matrix(service, &mut context);
613
614 let async_service = service;
615 let service = context.spawn_io_workflow(|scope, builder| {
616 builder
617 .chain(scope.start)
618 .then(async_service)
619 .connect(scope.terminate);
620 });
621
622 verify_delivery_instruction_matrix(service, &mut context);
623
624 }
627
628 fn verify_delivery_instruction_matrix(
629 service: Service<Arc<Mutex<u64>>, ()>,
630 context: &mut TestingContext,
631 ) {
632 verify_preemption_matrix(
634 service.instruct(UnitLabel),
635 service.instruct(UnitLabel.preempt()),
636 context,
637 );
638
639 verify_preemption_matrix(
641 service.instruct(StatefulLabel(5)),
642 service.instruct(StatefulLabel(5).preempt()),
643 context,
644 );
645
646 verify_queuing_matrix(service.instruct(UnitLabel), context);
648
649 verify_queuing_matrix(service.instruct(StatefulLabel(7)), context);
651
652 verify_ensured_matrix(service, UnitLabel, context);
654
655 verify_ensured_matrix(service, StatefulLabel(2), context);
657 }
658
659 fn verify_preemption_matrix(
660 queuing_service: ServiceInstructions<Arc<Mutex<u64>>, ()>,
661 preempting_service: ServiceInstructions<Arc<Mutex<u64>>, ()>,
662 context: &mut TestingContext,
663 ) {
664 verify_preemption(
666 1,
667 queuing_service.clone(),
668 preempting_service.clone(),
669 context,
670 );
671 verify_preemption(
672 2,
673 queuing_service.clone(),
674 preempting_service.clone(),
675 context,
676 );
677 verify_preemption(
678 3,
679 queuing_service.clone(),
680 preempting_service.clone(),
681 context,
682 );
683 verify_preemption(
684 4,
685 queuing_service.clone(),
686 preempting_service.clone(),
687 context,
688 );
689
690 verify_preemption(
692 1,
693 preempting_service.clone(),
694 preempting_service.clone(),
695 context,
696 );
697 verify_preemption(
698 2,
699 preempting_service.clone(),
700 preempting_service.clone(),
701 context,
702 );
703 verify_preemption(
704 3,
705 preempting_service.clone(),
706 preempting_service.clone(),
707 context,
708 );
709 verify_preemption(
710 4,
711 preempting_service.clone(),
712 preempting_service.clone(),
713 context,
714 );
715 }
716
717 fn verify_preemption(
718 preemptions: usize,
719 preempted_service: ServiceInstructions<Arc<Mutex<u64>>, ()>,
720 preempting_service: ServiceInstructions<Arc<Mutex<u64>>, ()>,
721 context: &mut TestingContext,
722 ) {
723 let counter = Arc::new(Mutex::new(0_u64));
724 let mut preempted: SmallVec<[Outcome<()>; 16]> = SmallVec::new();
725 for _ in 0..preemptions {
726 let outcome = context.command(|commands| {
727 commands
728 .request(Arc::clone(&counter), preempted_service.clone())
729 .outcome()
730 });
731 preempted.push(outcome);
732 }
733
734 let mut final_outcome = context.command(|commands| {
735 commands
736 .request(Arc::clone(&counter), preempting_service)
737 .outcome()
738 });
739
740 for outcome in &mut preempted {
741 context.run_with_conditions(outcome, Duration::from_secs(2));
742 assert!(outcome.try_recv().unwrap().is_err());
743 }
744
745 context.run_with_conditions(&mut final_outcome, Duration::from_secs(2));
746 assert!(final_outcome.try_recv().unwrap().is_ok());
747 assert_eq!(*counter.lock().unwrap(), 1);
748 context.assert_no_errors();
749 }
750
751 fn verify_queuing_matrix(
752 queuing_service: ServiceInstructions<Arc<Mutex<u64>>, ()>,
753 context: &mut TestingContext,
754 ) {
755 verify_queuing(2, queuing_service.clone(), context);
758 verify_queuing(3, queuing_service.clone(), context);
759 verify_queuing(4, queuing_service.clone(), context);
760 verify_queuing(5, queuing_service.clone(), context);
761 }
762
763 fn verify_queuing(
764 queue_size: usize,
765 queuing_service: ServiceInstructions<Arc<Mutex<u64>>, ()>,
766 context: &mut TestingContext,
767 ) {
768 let counter = Arc::new(Mutex::new(0_u64));
769 let mut queued: SmallVec<[Outcome<()>; 16]> = SmallVec::new();
770 for _ in 0..queue_size {
771 let outcome = context.command(|commands| {
772 commands
773 .request(Arc::clone(&counter), queuing_service.clone())
774 .outcome()
775 });
776 queued.push(outcome);
777 }
778
779 for outcome in &mut queued {
780 context.run_with_conditions(outcome, Duration::from_secs(2));
781 assert!(outcome.try_recv().unwrap().is_ok());
782 }
783
784 assert_eq!(*counter.lock().unwrap(), queue_size as u64);
785 context.assert_no_errors();
786 }
787
788 fn verify_ensured_matrix<L: DeliveryLabel + Clone>(
789 service: Service<Arc<Mutex<u64>>, ()>,
790 label: L,
791 context: &mut TestingContext,
792 ) {
793 verify_ensured([false, true, false, true], service, label.clone(), context);
798 verify_ensured([true, false, false, false], service, label.clone(), context);
799 verify_ensured([true, true, false, false], service, label.clone(), context);
800 verify_ensured([false, false, true, true], service, label.clone(), context);
801 verify_ensured([true, false, false, true], service, label.clone(), context);
802 verify_ensured(
803 [false, false, false, false],
804 service,
805 label.clone(),
806 context,
807 );
808 verify_ensured([true, true, true, true], service, label.clone(), context);
809 }
810
811 fn verify_ensured<L: DeliveryLabel + Clone>(
812 queued: impl IntoIterator<Item = bool>,
813 service: Service<Arc<Mutex<u64>>, ()>,
814 label: L,
815 context: &mut TestingContext,
816 ) {
817 let counter = Arc::new(Mutex::new(0_u64));
818 let mut queued_outcomes: SmallVec<[(Outcome<()>, bool); 16]> = SmallVec::new();
819 let mut expected_count = 1;
821 for ensured in queued {
822 let srv = if ensured {
823 expected_count += 1;
824 service.instruct(label.clone().ensure())
825 } else {
826 service.instruct(label.clone())
827 };
828
829 let outcome =
830 context.command(|commands| commands.request(Arc::clone(&counter), srv).outcome());
831
832 queued_outcomes.push((outcome, ensured));
833 }
834
835 let mut preempter = context.command(|commands| {
836 commands
837 .request(
838 Arc::clone(&counter),
839 service.instruct(label.clone().preempt()),
840 )
841 .outcome()
842 });
843
844 for (outcome, ensured) in &mut queued_outcomes {
845 context.run_with_conditions(outcome, Duration::from_secs(2));
846 if *ensured {
847 assert!(outcome.try_recv().unwrap().is_ok());
848 } else {
849 assert!(outcome.try_recv().unwrap().is_err());
850 }
851 }
852
853 context.run_with_conditions(&mut preempter, Duration::from_secs(2));
854 assert!(preempter.is_available());
855 assert_eq!(*counter.lock().unwrap(), expected_count);
856 context.assert_no_errors();
857 }
858}