// crossflow/series.rs

/*
 * Copyright (C) 2024 Open Source Robotics Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
*/
17
18use bevy_ecs::{
19    hierarchy::ChildOf,
20    prelude::{Bundle, Commands, Component, Entity, Event},
21};
22
23use std::future::Future;
24
25use crate::{
26    AsMapOnce, Cancellable, IntoAsyncMapOnce, IntoBlockingMapOnce, Outcome, Promise, ProvideOnce,
27    Sendish, StreamPack, StreamTargetMap, UnusedTarget,
28};
29
30mod detach;
31pub(crate) use detach::*;
32
33mod finished;
34pub(crate) use finished::*;
35
36mod insert;
37pub(crate) use insert::*;
38
39mod internal;
40pub(crate) use internal::*;
41
42mod map;
43pub(crate) use map::*;
44
45mod push;
46pub(crate) use push::*;
47
48mod capture;
49pub(crate) use capture::*;
50
51mod send_event;
52pub(crate) use send_event::*;
53
54mod store;
55pub(crate) use store::*;
56
57mod taken;
58pub(crate) use taken::*;
59
60/// A series of one or more workflow execution sessions. You can use a [`Series`]
61/// to chain the output of one workflow session into the input of another workflow
62/// session, and then eventually receive the final output of the whole series.
63///
64/// You can begin creating a series using [`RequestExt`][crate::RequestExt].
65///
66/// A series like this can only be a linear sequence---it does not support
67/// conditional branching or cycles. If you want a more complex structure than
68/// a linear sequence, you will need to spawn a workflow with the structure
69/// that you want, and then issue a request into that workflow.
70///
71/// A series is a one-time-use sequence of sessions. You will have to reconstruct
72/// the series each time you want to call it.
73///
74/// Note that the entire series of sessions will automatically cancel if you
75/// drop the final promise of the last session in the series. To prevent that
76/// from happening, you can use [`Series::detach`] to lock in the execution of
77/// the series (or a subset of the series) no matter what happens downstream.
pub struct Series<'w, 's, 'a, Response, Streams> {
    /// Entity of the latest session in the series so far (see
    /// [`Self::session_id`]).
    pub(crate) source: Entity,
    /// Entity that the output of `source` is directed at; used as the target
    /// of every queued execution in this series.
    pub(crate) target: Entity,
    /// Used to queue the deferred commands that wire the series together.
    pub(crate) commands: &'a mut Commands<'w, 's>,
    /// `fn(Response, Streams)` retains the type parameters without affecting
    /// auto traits such as `Send`/`Sync`.
    pub(crate) _ignore: std::marker::PhantomData<fn(Response, Streams)>,
}
84
impl<'w, 's, 'a, Response, Streams> Series<'w, 's, 'a, Response, Streams>
where
    Response: 'static + Send + Sync,
    Streams: StreamPack,
{
    /// Keep executing the series up to here even if a downstream dependent gets
    /// dropped. If you continue building the series from this point, then the
    /// later sessions will not be affected by this use of `.detach()` and may
    /// be dropped if its downstream dependent gets dropped.
    ///
    /// Dependency gets dropped in the following situations:
    ///
    /// | Operation                                                 | Drop condition                                        |
    /// |-----------------------------------------------------------|-------------------------------------------------------|
    /// | [`Self::capture`] <br> [`Self::outcome`]                  | The [`Outcome`] is dropped.                           |
    /// | [`Self::store`] <br> [`Self::push`] <br> [`Self::insert`] | The target entity of the operation is despawned.      |
    /// | [`Self::detach`] <br> [`Self::send_event`]                | This will never be dropped                            |
    /// | Using none of the above                                   | The series will immediately be dropped during a flush, so it will never be run at all. <br> This will also push an error into [`UnhandledErrors`](crate::UnhandledErrors). |
    pub fn detach(self) -> Series<'w, 's, 'a, Response, Streams> {
        // Queuing a Detach for the current target locks in everything built so
        // far; the builder is returned unchanged so chaining can continue.
        self.commands.queue(Detach {
            target: self.target,
        });
        self
    }

    /// This is the session ID of the last request so far in the series.
    ///
    /// NOTE(review): this returns `source`, whereas [`Self::capture`] and
    /// [`Self::take`] report `target` as the session entity — confirm that
    /// this asymmetry is intended.
    pub fn session_id(&self) -> Entity {
        self.source
    }

    /// Capture the outcome of the series and all the stream data of the final
    /// provider.
    #[must_use]
    pub fn capture(self) -> Capture<Response, Streams> {
        let target = self.target;
        let mut map = StreamTargetMap::default();
        // Redirect each stream of the final provider into receivers that the
        // caller can poll through the returned Capture.
        let stream_receivers = Streams::take_streams(target, &mut map, self.commands);
        // The stream target map lives on the source entity so the provider can
        // look up where to send its stream data.
        self.commands.entity(self.source).insert(map);

        let (outcome, capture_outcome) = Outcome::new();
        self.send_outcome(capture_outcome);

        Capture {
            outcome,
            streams: stream_receivers,
            session: target,
        }
    }

    /// Take the data that comes out of the request, including both the response
    /// and the streams.
    #[must_use]
    #[deprecated(since = "0.0.6", note = "Use .capture() instead")]
    pub fn take(self) -> Recipient<Response, Streams> {
        // Legacy equivalent of capture() built on Promise instead of Outcome.
        let (response_sender, response_promise) = Promise::<Response>::new();
        self.commands.queue(AddExecution::new(
            Some(self.source),
            self.target,
            TakenResponse::<Response>::new(response_sender),
        ));
        let mut map = StreamTargetMap::default();
        let stream_receivers = Streams::take_streams(self.target, &mut map, self.commands);
        self.commands.entity(self.source).insert(map);

        Recipient {
            response: response_promise,
            streams: stream_receivers,
            session: self.target,
        }
    }

    /// Capture only the outcome (response) of the series.
    #[must_use]
    pub fn outcome(self) -> Outcome<Response> {
        let (outcome, capture) = Outcome::new();
        self.send_outcome(capture);
        outcome
    }

    /// Take only the response data that comes out of the request.
    #[deprecated(since = "0.0.6", note = "Use .outcome() instead")]
    pub fn take_response(self) -> Promise<Response> {
        let (response_sender, response_promise) = Promise::<Response>::new();
        self.commands.queue(AddExecution::new(
            Some(self.source),
            self.target,
            TakenResponse::<Response>::new(response_sender),
        ));
        response_promise
    }

    /// Pass the outcome of the request to another provider.
    #[must_use]
    pub fn then<P: ProvideOnce<Request = Response>>(
        self,
        provider: P,
    ) -> Series<'w, 's, 'a, P::Response, P::Streams> {
        // The old target becomes the source (input) of the next session, and a
        // fresh entity is spawned to receive the next session's output.
        let source = self.target;
        let target = self
            .commands
            .spawn((Detached::default(), UnusedTarget, SeriesMarker))
            .id();

        // We should automatically delete the previous step in the chain once
        // this one is finished.
        // Parenting `source` under `target` means despawning the tail of the
        // series cleans up every earlier session as well.
        self.commands
            .entity(source)
            .insert((Cancellable::new(cancel_execution), SeriesMarker))
            .remove::<UnusedTarget>()
            .insert(ChildOf(target));
        provider.connect(None, source, target, self.commands);
        Series {
            source,
            target,
            commands: self.commands,
            _ignore: Default::default(),
        }
    }

    /// Apply a one-time callback whose input is the Response of the current
    /// target. The output of the map will become the Response of the returned
    /// target.
    ///
    /// This takes in a regular blocking function, which means all systems will
    /// be blocked from running while the function gets executed.
    #[must_use]
    pub fn map_block<U>(
        self,
        f: impl FnOnce(Response) -> U + 'static + Send + Sync,
    ) -> Series<'w, 's, 'a, U, ()>
    where
        U: 'static + Send + Sync,
    {
        self.then(f.into_blocking_map_once())
    }

    /// Apply a one-time callback whose output is a [`Future`] that will be run
    /// in the [`AsyncComputeTaskPool`][1] (unless the `single_threaded_async`
    /// feature is active). The output of the [`Future`] will be the Response of
    /// the returned Series.
    ///
    /// [1]: bevy_tasks::AsyncComputeTaskPool
    #[must_use]
    pub fn map_async<Task>(
        self,
        f: impl FnOnce(Response) -> Task + 'static + Send + Sync,
    ) -> Series<'w, 's, 'a, Task::Output, ()>
    where
        Task: Future + 'static + Sendish,
        Task::Output: 'static + Send + Sync,
    {
        self.then(f.into_async_map_once())
    }

    /// Apply a one-time map that implements one of
    /// - [`FnOnce(BlockingMap<Request, Streams>) -> Response`](crate::BlockingMap)
    /// - [`FnOnce(AsyncMap<Request, Streams>) -> impl Future<Response>`](crate::AsyncMap)
    ///
    /// If you do not care about providing streams then you can use
    /// [`Self::map_block`] or [`Self::map_async`] instead.
    pub fn map<M, F: AsMapOnce<M>>(
        self,
        f: F,
    ) -> Series<
        'w,
        's,
        'a,
        <F::MapType as ProvideOnce>::Response,
        <F::MapType as ProvideOnce>::Streams,
    >
    where
        F::MapType: ProvideOnce<Request = Response>,
        <F::MapType as ProvideOnce>::Response: 'static + Send + Sync,
        <F::MapType as ProvideOnce>::Streams: StreamPack,
    {
        self.then(f.as_map_once())
    }

    /// Store the response in a [`Storage`] component in the specified entity.
    ///
    /// Each stream will be collected into [`Collection`] components in the
    /// specified entity, one for each stream type. To store the streams in a
    /// different entity, call [`Self::collect_streams`] before this.
    ///
    /// If the entity despawns then the request gets cancelled unless you used
    /// [`Self::detach`] before calling this.
    pub fn store(self, target: Entity) {
        self.commands.queue(AddExecution::new(
            Some(self.source),
            self.target,
            Store::<Response>::new(target),
        ));

        // Streams are collected into the same entity that stores the response.
        let mut map = StreamTargetMap::default();
        let stream_targets = Streams::collect_streams(self.source, target, &mut map, self.commands);
        self.commands
            .entity(self.source)
            .insert((stream_targets, map));
    }

    /// Collect the stream data into [`Collection<T>`] components in the
    /// specified target, one collection for each stream data type. You must
    /// still decide what to do with the final response data.
    ///
    /// Note that the returned [`Series`] has `()` for its stream pack, since
    /// the streams have been redirected into `target`.
    #[must_use]
    pub fn collect_streams(self, target: Entity) -> Series<'w, 's, 'a, Response, ()> {
        let mut map = StreamTargetMap::default();
        let stream_targets = Streams::collect_streams(self.source, target, &mut map, self.commands);
        self.commands
            .entity(self.source)
            .insert((stream_targets, map));

        Series {
            source: self.source,
            target: self.target,
            commands: self.commands,
            _ignore: Default::default(),
        }
    }

    /// Push the response to the back of a [`Collection<T>`] component in an
    /// entity.
    ///
    /// Similar to [`Self::store`] this will also collect streams into this
    /// entity.
    ///
    /// If the entity despawns then the request gets cancelled unless you used
    /// [`Self::detach`] before calling this.
    pub fn push(self, target: Entity) {
        self.commands.queue(AddExecution::new(
            Some(self.source),
            self.target,
            Push::<Response>::new(target, false),
        ));

        let mut map = StreamTargetMap::default();
        let stream_targets = Streams::collect_streams(self.source, target, &mut map, self.commands);
        self.commands
            .entity(self.source)
            .insert((stream_targets, map));
    }

    /// Used internally to implement various ways of capturing an outcome.
    pub(crate) fn send_outcome(self, capture: CaptureOutcome<Response>) {
        self.commands
            .queue(AddExecution::new(Some(self.source), self.target, capture));
    }

    // TODO(@mxgrey): Consider offering ways for users to respond to cancellations.
    // For example, offer an on_cancel method that lets users provide a callback
    // to be triggered when a cancellation happens. Or focus on the terminal end
    // of a series, like offer store_or_else(~), push_or_else(~) etc which accept
    // a callback that will be triggered after a cancellation.
}
338
339impl<'w, 's, 'a, Response, Streams> Series<'w, 's, 'a, Response, Streams>
340where
341    Response: Bundle,
342{
343    /// Insert the response as a bundle in the specified entity. Stream data
344    /// will be dropped unless you use [`Self::collect_streams`] before this.
345    ///
346    /// If the entity despawns then the request gets cancelled unless you used
347    /// [`Self::detach`] before calling this.
348    ///
349    /// If the response is not a bundle then you can store it in an entity using
350    /// [`Self::store`] or [`Self::push`]. Alternatively you can transform it
351    /// into a bundle using [`Self::map_block`] or [`Self::map_async`].
352    pub fn insert(self, target: Entity) {
353        self.commands.queue(AddExecution::new(
354            Some(self.source),
355            self.target,
356            Insert::<Response>::new(target),
357        ));
358    }
359}
360
361impl<'w, 's, 'a, Response, Streams> Series<'w, 's, 'a, Response, Streams>
362where
363    Response: Event,
364{
365    /// Send the response out as an event once it is ready. Stream data will be
366    /// dropped unless you use [`Self::collect_streams`] before this.
367    ///
368    /// Using this will also effectively [detach](Self::detach) the series.
369    pub fn send_event(self) {
370        self.commands.queue(AddExecution::new(
371            Some(self.source),
372            self.target,
373            SendEvent::<Response>::new(),
374        ));
375    }
376}
377
/// Contains the final response and streams produced at the end of a series.
pub struct Recipient<Response, Streams: StreamPack> {
    /// Promise that resolves with the final response of the series.
    pub response: Promise<Response>,
    /// Receivers for the stream data emitted by the final provider.
    pub streams: Streams::StreamReceivers,
    /// The root session ID of the entire series. Every session ID related to
    /// this series is a descendent of this entity.
    ///
    /// This may be counter-intuitive because this is the last session in the
    /// series but structurally we have made it the parent of all the sessions
    /// that will be executed before it. This is done so that despawning behavior
    /// has a more logical relationship with the dependencies between the sessions.
    pub session: Entity,
}
391
/// Contains the captured outcome and streams at the end of a series. Produced
/// by [`Series::capture`].
pub struct Capture<Response, Streams: StreamPack> {
    /// Resolves with the final response of the series.
    pub outcome: Outcome<Response>,
    /// Receivers for the stream data emitted by the final provider.
    pub streams: Streams::StreamReceivers,
    /// The session entity of the final session in the series.
    pub session: Entity,
}
397
/// Used to store a response of a series as a component of an entity.
#[derive(Component)]
pub struct Storage<T> {
    /// The response data that was stored.
    pub data: T,
    /// The session entity associated with this data.
    pub session: Entity,
}
404
/// Used to collect responses from multiple series chains into a container
/// attached to an entity.
//
// TODO(@mxgrey): Consider allowing the user to choose the container type.
#[derive(Component)]
pub struct Collection<T> {
    /// The items that have been collected.
    pub items: Vec<Storage<T>>,
}
414
415impl<T> Default for Collection<T> {
416    fn default() -> Self {
417        Self {
418            items: Default::default(),
419        }
420    }
421}
422
423#[cfg(test)]
424mod tests {
425    use crate::{ContinuousQueueView, prelude::*, testing::*};
426    use smallvec::SmallVec;
427    use std::{
428        sync::{Arc, Mutex},
429        time::{Duration, Instant},
430    };
431    use tokio::sync::mpsc::unbounded_channel;
432
    #[test]
    fn test_dropped_chain() {
        let mut context = TestingContext::minimal_plugins();

        let (detached_sender, mut detached_receiver) = unbounded_channel();
        let (attached_sender, mut attached_receiver) = unbounded_channel();

        // The result of the series is never captured, so everything after the
        // .detach() should get dropped, while everything before it still runs.
        context.command(|commands| {
            let _ = commands
                .request("hello".to_owned(), to_uppercase.into_blocking_map())
                .map_block(move |value| {
                    detached_sender.send(value.clone()).unwrap();
                    value
                })
                .detach()
                .map_block(to_lowercase)
                .map_block(move |value| {
                    attached_sender.send(value.clone()).unwrap();
                    value
                })
                .map_block(to_uppercase);
        });

        context.run(1);
        // The detached portion executed...
        assert_eq!(detached_receiver.try_recv().unwrap(), "HELLO");
        // ...while the portion after .detach() was dropped without running,
        // which should be reported as exactly one unused target error.
        assert!(attached_receiver.try_recv().is_err());
        assert!(
            context
                .get_unhandled_errors()
                .is_some_and(|e| e.unused_targets.len() == 1)
        );
    }
465
    #[test]
    fn test_blocking_map() {
        let mut context = TestingContext::minimal_plugins();

        // Reusable and one-shot blocking maps should both resolve directly.
        let r = context.resolve_request(String::from("hello"), to_uppercase.into_blocking_map());
        assert_eq!(r, "HELLO");

        let r =
            context.resolve_request(String::from("hello"), to_uppercase.into_blocking_map_once());
        assert_eq!(r, "HELLO");

        // map_block with a named function.
        let mut outcome = context.command(|commands| {
            commands
                .provide(String::from("hello"))
                .map_block(to_uppercase)
                .outcome()
        });

        context.run_while_pending(&mut outcome);
        context.assert_no_errors();
        assert_eq!(outcome.try_recv().unwrap().unwrap(), "HELLO");

        // map_block with a closure.
        let mut outcome = context.command(|commands| {
            commands
                .provide(String::from("hello"))
                .map_block(|request| request.to_uppercase())
                .outcome()
        });

        context.run_while_pending(&mut outcome);
        context.assert_no_errors();
        assert_eq!(outcome.try_recv().unwrap().unwrap(), "HELLO");
    }
499
    #[test]
    fn test_async_map() {
        let mut context = TestingContext::minimal_plugins();

        let request = WaitRequest {
            duration: Duration::from_secs_f64(0.001),
            value: "hello".to_owned(),
        };

        // Generous timeout so slow CI machines don't produce false failures.
        let conditions = FlushConditions::new().with_timeout(Duration::from_secs_f64(5.0));

        // Reusable and one-shot async maps should both resolve.
        let r = context
            .try_resolve_request(request.clone(), wait.into_async_map(), conditions.clone())
            .unwrap();
        assert_eq!(r, "hello");

        let r = context
            .try_resolve_request(
                request.clone(),
                wait.into_async_map_once(),
                conditions.clone(),
            )
            .unwrap();
        assert_eq!(r, "hello");

        // map_async with a named async function.
        let mut outcome =
            context.command(|commands| commands.provide(request.clone()).map_async(wait).outcome());

        assert!(context.run_with_conditions(&mut outcome, conditions.clone()));
        context.assert_no_errors();
        assert_eq!(outcome.try_recv().unwrap().unwrap(), "hello");

        // map_async with a closure returning an async block.
        let mut outcome = context.command(|commands| {
            commands
                .provide(request.clone())
                .map_async(|request| {
                    async move {
                        let t = Instant::now();
                        while t.elapsed() < request.duration {
                            // Busy wait
                        }
                        request.value
                    }
                })
                .outcome()
        });

        assert!(context.run_with_conditions(&mut outcome, conditions.clone()));
        context.assert_no_errors();
        assert_eq!(outcome.try_recv().unwrap().unwrap(), "hello");
    }
551
    #[test]
    fn test_detach() {
        // This is a regression test that covers a bug which existed due to
        // an incorrect handling of detached series when giving input.
        let mut context = TestingContext::minimal_plugins();
        let service = context.spawn_delayed_map(Duration::from_millis(1), |n| *n + 1);

        // Run a detached series whose result is never captured; this must not
        // produce any unhandled errors.
        context.command(|commands| {
            commands.provide(0).then(service).detach();
        });

        let (sender, mut receiver) = tokio::sync::oneshot::channel();
        context.run_with_conditions(&mut receiver, Duration::from_millis(5));
        assert!(
            context.no_unhandled_errors(),
            "Unhandled errors: {:#?}",
            context.get_unhandled_errors(),
        );

        // The promise and sender only exist because run_with_conditions requires
        // them. Moreover we need to make sure that sender does not get dropped
        // prematurely by the compiler, otherwise the promise will have the run
        // exit prematurely. Therefore we call .send(()) here to guarantee the
        // compiler knows to keep it alive until the running is finished.
        //
        // We have observed that using `let (_, mut promise) = ` will cause the
        // sender to drop prematurely, so we don't want to risk that there are
        // other cases where that may happen. It is important for the run to
        // last multiple cycles.
        let _ = sender.send(());
    }
583
    // A stateless delivery label: all requests using it share one queue.
    #[derive(Clone, Debug, PartialEq, Eq, Hash, DeliveryLabel)]
    struct UnitLabel;

    // A stateful delivery label: requests only share a queue when the inner
    // value matches.
    #[derive(Clone, Debug, PartialEq, Eq, Hash, DeliveryLabel)]
    struct StatefulLabel(u64);
589
    #[test]
    fn test_delivery_instructions() {
        let mut context = TestingContext::minimal_plugins();
        // Continuous service: the viewer asserts the queue never holds more
        // than one request at a time.
        let service = context.spawn_delayed_map_with_viewer(
            Duration::from_secs_f32(0.01),
            |counter: &Arc<Mutex<u64>>| {
                *counter.lock().unwrap() += 1;
            },
            |view: &ContinuousQueueView<_, ()>| {
                assert!(view.len() <= 1);
            },
        );

        verify_delivery_instruction_matrix(service.optional_stream_cast(), &mut context);

        // Async service variant of the same counter increment.
        let service = context.spawn_async_delayed_map(
            Duration::from_secs_f32(0.01),
            |counter: Arc<Mutex<u64>>| {
                *counter.lock().unwrap() += 1;
            },
        );

        verify_delivery_instruction_matrix(service, &mut context);

        // Workflow that wraps the async service.
        let async_service = service;
        let service = context.spawn_io_workflow(|scope, builder| {
            builder
                .chain(scope.start)
                .then(async_service)
                .connect(scope.terminate);
        });

        verify_delivery_instruction_matrix(service, &mut context);

        // We don't test blocking services because blocking services are always
        // serial no matter what, so delivery instructions have no effect for them.
    }
627
    // Runs the full preemption/queuing/ensured test suite against a service,
    // using both a stateless and a stateful delivery label.
    fn verify_delivery_instruction_matrix(
        service: Service<Arc<Mutex<u64>>, ()>,
        context: &mut TestingContext,
    ) {
        // Test for a unit struct
        verify_preemption_matrix(
            service.instruct(UnitLabel),
            service.instruct(UnitLabel.preempt()),
            context,
        );

        // Test for a stateful struct
        verify_preemption_matrix(
            service.instruct(StatefulLabel(5)),
            service.instruct(StatefulLabel(5).preempt()),
            context,
        );

        // Test for a unit struct
        verify_queuing_matrix(service.instruct(UnitLabel), context);

        // Test for a stateful struct
        verify_queuing_matrix(service.instruct(StatefulLabel(7)), context);

        // Test for a unit struct
        verify_ensured_matrix(service, UnitLabel, context);

        // Test for a stateful struct
        verify_ensured_matrix(service, StatefulLabel(2), context);
    }
658
659    fn verify_preemption_matrix(
660        queuing_service: ServiceInstructions<Arc<Mutex<u64>>, ()>,
661        preempting_service: ServiceInstructions<Arc<Mutex<u64>>, ()>,
662        context: &mut TestingContext,
663    ) {
664        // Test by queuing up a bunch of requests before preempting them all at once.
665        verify_preemption(
666            1,
667            queuing_service.clone(),
668            preempting_service.clone(),
669            context,
670        );
671        verify_preemption(
672            2,
673            queuing_service.clone(),
674            preempting_service.clone(),
675            context,
676        );
677        verify_preemption(
678            3,
679            queuing_service.clone(),
680            preempting_service.clone(),
681            context,
682        );
683        verify_preemption(
684            4,
685            queuing_service.clone(),
686            preempting_service.clone(),
687            context,
688        );
689
690        // Test by repeatedly preempting each request with the next.
691        verify_preemption(
692            1,
693            preempting_service.clone(),
694            preempting_service.clone(),
695            context,
696        );
697        verify_preemption(
698            2,
699            preempting_service.clone(),
700            preempting_service.clone(),
701            context,
702        );
703        verify_preemption(
704            3,
705            preempting_service.clone(),
706            preempting_service.clone(),
707            context,
708        );
709        verify_preemption(
710            4,
711            preempting_service.clone(),
712            preempting_service.clone(),
713            context,
714        );
715    }
716
    // Queues `preemptions` requests on preempted_service, then issues one
    // request on preempting_service. Asserts that every queued request is
    // cancelled and only the preempting request runs (counter ends at 1).
    fn verify_preemption(
        preemptions: usize,
        preempted_service: ServiceInstructions<Arc<Mutex<u64>>, ()>,
        preempting_service: ServiceInstructions<Arc<Mutex<u64>>, ()>,
        context: &mut TestingContext,
    ) {
        let counter = Arc::new(Mutex::new(0_u64));
        let mut preempted: SmallVec<[Outcome<()>; 16]> = SmallVec::new();
        for _ in 0..preemptions {
            let outcome = context.command(|commands| {
                commands
                    .request(Arc::clone(&counter), preempted_service.clone())
                    .outcome()
            });
            preempted.push(outcome);
        }

        let mut final_outcome = context.command(|commands| {
            commands
                .request(Arc::clone(&counter), preempting_service)
                .outcome()
        });

        // Every earlier request must have been cancelled by the preempter.
        for outcome in &mut preempted {
            context.run_with_conditions(outcome, Duration::from_secs(2));
            assert!(outcome.try_recv().unwrap().is_err());
        }

        // Only the preempting request actually ran.
        context.run_with_conditions(&mut final_outcome, Duration::from_secs(2));
        assert!(final_outcome.try_recv().unwrap().is_ok());
        assert_eq!(*counter.lock().unwrap(), 1);
        context.assert_no_errors();
    }
750
751    fn verify_queuing_matrix(
752        queuing_service: ServiceInstructions<Arc<Mutex<u64>>, ()>,
753        context: &mut TestingContext,
754    ) {
755        // Test by queuing up a bunch of requests and making sure they all get
756        // delivered.
757        verify_queuing(2, queuing_service.clone(), context);
758        verify_queuing(3, queuing_service.clone(), context);
759        verify_queuing(4, queuing_service.clone(), context);
760        verify_queuing(5, queuing_service.clone(), context);
761    }
762
    // Queues `queue_size` requests and asserts every one of them gets
    // delivered (counter ends at queue_size).
    fn verify_queuing(
        queue_size: usize,
        queuing_service: ServiceInstructions<Arc<Mutex<u64>>, ()>,
        context: &mut TestingContext,
    ) {
        let counter = Arc::new(Mutex::new(0_u64));
        let mut queued: SmallVec<[Outcome<()>; 16]> = SmallVec::new();
        for _ in 0..queue_size {
            let outcome = context.command(|commands| {
                commands
                    .request(Arc::clone(&counter), queuing_service.clone())
                    .outcome()
            });
            queued.push(outcome);
        }

        for outcome in &mut queued {
            context.run_with_conditions(outcome, Duration::from_secs(2));
            assert!(outcome.try_recv().unwrap().is_ok());
        }

        assert_eq!(*counter.lock().unwrap(), queue_size as u64);
        context.assert_no_errors();
    }
787
788    fn verify_ensured_matrix<L: DeliveryLabel + Clone>(
789        service: Service<Arc<Mutex<u64>>, ()>,
790        label: L,
791        context: &mut TestingContext,
792    ) {
793        // Test by queuing up a mix of ensured and unensured requests, and then
794        // sending in one that preempts them all. The ensured requests should
795        // remain in the queue and execute despite the preempter. The unensured
796        // requests should all be cancelled.
797        verify_ensured([false, true, false, true], service, label.clone(), context);
798        verify_ensured([true, false, false, false], service, label.clone(), context);
799        verify_ensured([true, true, false, false], service, label.clone(), context);
800        verify_ensured([false, false, true, true], service, label.clone(), context);
801        verify_ensured([true, false, false, true], service, label.clone(), context);
802        verify_ensured(
803            [false, false, false, false],
804            service,
805            label.clone(),
806            context,
807        );
808        verify_ensured([true, true, true, true], service, label.clone(), context);
809    }
810
    // Queues one request per entry of `queued` (ensured when the entry is
    // true), then sends a preempting request. Ensured requests must survive
    // the preemption and run; unensured ones must be cancelled.
    fn verify_ensured<L: DeliveryLabel + Clone>(
        queued: impl IntoIterator<Item = bool>,
        service: Service<Arc<Mutex<u64>>, ()>,
        label: L,
        context: &mut TestingContext,
    ) {
        let counter = Arc::new(Mutex::new(0_u64));
        let mut queued_outcomes: SmallVec<[(Outcome<()>, bool); 16]> = SmallVec::new();
        // This counter starts out at 1 to account for the preempting request.
        let mut expected_count = 1;
        for ensured in queued {
            let srv = if ensured {
                expected_count += 1;
                service.instruct(label.clone().ensure())
            } else {
                service.instruct(label.clone())
            };

            let outcome =
                context.command(|commands| commands.request(Arc::clone(&counter), srv).outcome());

            queued_outcomes.push((outcome, ensured));
        }

        let mut preempter = context.command(|commands| {
            commands
                .request(
                    Arc::clone(&counter),
                    service.instruct(label.clone().preempt()),
                )
                .outcome()
        });

        // Ensured requests succeed; unensured ones are cancelled by the preempter.
        for (outcome, ensured) in &mut queued_outcomes {
            context.run_with_conditions(outcome, Duration::from_secs(2));
            if *ensured {
                assert!(outcome.try_recv().unwrap().is_ok());
            } else {
                assert!(outcome.try_recv().unwrap().is_err());
            }
        }

        context.run_with_conditions(&mut preempter, Duration::from_secs(2));
        assert!(preempter.is_available());
        assert_eq!(*counter.lock().unwrap(), expected_count);
        context.assert_no_errors();
    }
858}