crossflow/
builder.rs

1/*
2 * Copyright (C) 2023 Open Source Robotics Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *     http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 *
16*/
17
18use bevy_ecs::prelude::{Commands, Entity};
19
20use std::future::Future;
21
22use smallvec::SmallVec;
23
24use crate::{
25    Accessible, Accessing, Accessor, AddOperation, AsMap, Buffer, BufferKeys, BufferLocation,
26    BufferMap, BufferSettings, Bufferable, Buffering, Chain, Collect, ForkClone, ForkCloneOutput,
27    ForkOptionOutput, ForkResultOutput, ForkTargetStorage, Gate, GateRequest, IncompatibleLayout,
28    Injection, InputSlot, IntoAsyncMap, IntoBlockingMap, Joinable, Joined, Node, OperateBuffer,
29    OperateCancel, OperateDynamicGate, OperateQuietCancel, OperateScope, OperateSplit,
30    OperateStaticGate, Output, Provider, RequestOfMap, ResponseOfMap, Scope, ScopeEndpoints,
31    ScopeSettings, ScopeSettingsStorage, Sendish, ServiceInstructions, SplitOutputs, Splittable,
32    StreamPack, StreamTargetMap, StreamsOfMap, Trim, TrimBranch, UnusedTarget, Unzippable,
33    make_option_branching, make_result_branching,
34};
35
36pub(crate) mod connect;
37pub(crate) use connect::*;
38
/// Device used for building a workflow. Simply pass a mutable borrow of this
/// into any functions which ask for it.
///
/// Note that each scope has its own [`Builder`], and a panic will occur if a
/// [`Builder`] gets used in the wrong scope. As of right now there is no known
/// way to trick the compiler into using a [`Builder`] in the wrong scope, but
/// please open an issue with a minimal reproducible example if you find a way
/// to make it panic.
pub struct Builder<'w, 's, 'a> {
    // Identifies the scope this builder adds operations to, plus the target
    // used for that scope's cancellation workflows.
    pub(crate) context: BuilderScopeContext,
    // Command queue used to spawn operation entities and defer connections.
    pub(crate) commands: &'a mut Commands<'w, 's>,
}
51
/// Scope-related context carried by a [`Builder`]. This is `Copy` so it can be
/// cheaply captured and later used to reconstruct a builder for the same scope.
#[derive(Clone, Copy, Debug)]
pub struct BuilderScopeContext {
    /// The scope that this builder is meant to help build
    pub(crate) scope: Entity,
    /// The target for cancellation workflows
    pub(crate) finish_scope_cancel: Entity,
}
59
60impl<'w, 's, 'a> Builder<'w, 's, 'a> {
    /// Begin building a chain of operations off of an output.
    pub fn chain<'b, Response: 'static + Send + Sync>(
        &'b mut self,
        output: Output<Response>,
    ) -> Chain<'w, 's, 'a, 'b, Response> {
        // Delegates to Output::chain, which consumes the output and borrows
        // this builder for the lifetime of the returned chain.
        output.chain(self)
    }
68
69    /// Create a node for a provider. This will give access to an input slot, an
70    /// output slots, and a pack of stream outputs which can all be connected to
71    /// other nodes.
72    pub fn create_node<P: Provider>(
73        &mut self,
74        provider: P,
75    ) -> Node<P::Request, P::Response, P::Streams>
76    where
77        P::Request: 'static + Send + Sync,
78        P::Response: 'static + Send + Sync,
79        P::Streams: StreamPack,
80    {
81        let source = self.commands.spawn(()).id();
82        let target = self.commands.spawn(UnusedTarget).id();
83        provider.connect(Some(self.scope()), source, target, self.commands);
84
85        let mut map = StreamTargetMap::default();
86        let streams = <P::Streams as StreamPack>::spawn_node_streams(source, &mut map, self);
87        self.commands.entity(source).insert(map);
88        Node {
89            input: InputSlot::new(self.scope(), source),
90            output: Output::new(self.scope(), target),
91            streams,
92        }
93    }
94
    /// Create a [node](Node) that provides a [blocking map](crate::BlockingMap).
    pub fn create_map_block<T, U>(
        &mut self,
        f: impl FnMut(T) -> U + 'static + Send + Sync,
    ) -> Node<T, U, ()>
    where
        T: 'static + Send + Sync,
        U: 'static + Send + Sync,
    {
        // Wrap the plain function as a blocking-map provider and spawn a node
        // for it.
        self.create_node(f.into_blocking_map())
    }
106
    /// Create a [node](Node) that provides an [async map](crate::AsyncMap).
    pub fn create_map_async<T, Task>(
        &mut self,
        f: impl FnMut(T) -> Task + 'static + Send + Sync,
    ) -> Node<T, Task::Output, ()>
    where
        T: 'static + Send + Sync,
        Task: Future + 'static + Sendish,
        Task::Output: 'static + Send + Sync,
    {
        // Wrap the future-returning function as an async-map provider and
        // spawn a node for it. The node's output is the future's output.
        self.create_node(f.into_async_map())
    }
119
    /// Create a map (either a [blocking map][1] or an
    /// [async map][2]) by providing a function that takes [`BlockingMap`][1] or
    /// [`AsyncMap`][2] as its only argument.
    ///
    /// [1]: crate::BlockingMap
    /// [2]: crate::AsyncMap
    pub fn create_map<M, F: AsMap<M>>(
        &mut self,
        f: F,
    ) -> Node<RequestOfMap<M, F>, ResponseOfMap<M, F>, StreamsOfMap<M, F>>
    where
        F::MapType: Provider,
        RequestOfMap<M, F>: 'static + Send + Sync,
        ResponseOfMap<M, F>: 'static + Send + Sync,
        StreamsOfMap<M, F>: StreamPack,
    {
        // The AsMap trait decides (at compile time, via M) whether this is a
        // blocking or async map, then we spawn a node for the resulting provider.
        self.create_node(f.as_map())
    }
138
    /// Create a node that takes in a `(request, service)` at runtime and then
    /// passes the `request` into the `service`. All streams will be forwarded
    /// and the response of the service will be the node's output.
    ///
    /// This allows services to be injected into workflows as input, or for a
    /// service to be chosen during runtime.
    pub fn create_injection_node<Request, Response, Streams>(
        &mut self,
    ) -> Node<(Request, ServiceInstructions<Request, Response, Streams>), Response, Streams>
    where
        Request: 'static + Send + Sync,
        // NOTE(review): `Unpin` is required here but not on
        // `create_injection_impl` — confirm whether this bound is still needed.
        Response: 'static + Send + Sync + Unpin,
        Streams: StreamPack,
    {
        // Spawn the operation entity here; the shared impl wires up the
        // target, streams, and Injection operation.
        let source = self.commands.spawn(()).id();
        self.create_injection_impl::<Request, Response, Streams>(source)
    }
156
157    /// Connect the output of one into the input slot of another node.
158    pub fn connect<T: 'static + Send + Sync>(&mut self, output: Output<T>, input: InputSlot<T>) {
159        assert_eq!(output.scope(), input.scope());
160        self.commands.queue(Connect {
161            original_target: output.id(),
162            new_target: input.id(),
163        });
164    }
165
166    /// Create a [`Buffer`] which can be used to store and pull data within
167    /// a scope. This is often used along with joining to synchronize multiple
168    /// branches.
169    pub fn create_buffer<T: 'static + Send + Sync>(
170        &mut self,
171        settings: BufferSettings,
172    ) -> Buffer<T> {
173        let source = self.commands.spawn(()).id();
174        self.commands.queue(AddOperation::new(
175            Some(self.scope()),
176            source,
177            OperateBuffer::<T>::new(settings),
178        ));
179
180        Buffer {
181            location: BufferLocation {
182                scope: self.scope(),
183                source,
184            },
185            _ignore: Default::default(),
186        }
187    }
188
    /// Create an isolated scope within the workflow. This can be useful for
    /// racing multiple branches, creating an uninterruptible segment within
    /// your workflow, or being able to run multiple instances of the same
    /// sub-workflow in parallel without them interfering with each other.
    ///
    /// A value can be sent into the scope by connecting an [`Output`] of a node
    /// in the parent scope to the [`InputSlot`] of the node which gets returned
    /// by this function. Each time a value is sent into the scope, it will run
    /// through the workflow of the scope with a unique session ID. Even if
    /// multiple values are sent in from the same session, they will each be
    /// assigned their own unique session ID while inside of this scope.
    pub fn create_scope<Request, Response, Streams, Settings>(
        &mut self,
        build: impl FnOnce(Scope<Request, Response, Streams>, &mut Builder) -> Settings,
    ) -> Node<Request, Response, Streams>
    where
        Request: 'static + Send + Sync,
        Response: 'static + Send + Sync,
        Streams: StreamPack,
        Settings: Into<ScopeSettings>,
    {
        // Spawn the scope's own entity and its exit target in the parent
        // scope, then defer to the shared implementation.
        let scope_id = self.commands.spawn(()).id();
        let exit_scope = self.commands.spawn(UnusedTarget).id();
        self.create_scope_impl(scope_id, exit_scope, build)
    }
214
    /// Alternative to [`Self::create_scope`] for pure input/output scopes (i.e.
    /// there are no output streams). Using this signature should allow the
    /// compiler to infer all the generic arguments when there are no streams.
    pub fn create_io_scope<Request, Response, Settings>(
        &mut self,
        build: impl FnOnce(Scope<Request, Response, ()>, &mut Builder) -> Settings,
    ) -> Node<Request, Response, ()>
    where
        Request: 'static + Send + Sync,
        Response: 'static + Send + Sync,
        Settings: Into<ScopeSettings>,
    {
        // Fix the stream pack to the unit type so it never needs inference.
        self.create_scope::<Request, Response, (), Settings>(build)
    }
229
230    /// Create an operation that clones its inputs and sends them off to any
231    /// number of targets.
232    pub fn create_fork_clone<T>(&mut self) -> (InputSlot<T>, ForkCloneOutput<T>)
233    where
234        T: Clone + 'static + Send + Sync,
235    {
236        let source = self.commands.spawn(()).id();
237        self.commands.queue(AddOperation::new(
238            Some(self.scope()),
239            source,
240            ForkClone::<T>::new(ForkTargetStorage::new()),
241        ));
242        (
243            InputSlot::new(self.scope(), source),
244            ForkCloneOutput::new(self.scope(), source),
245        )
246    }
247
    /// Create an operation that unzips its inputs and sends each element off to
    /// a different output.
    pub fn create_unzip<T>(&mut self) -> (InputSlot<T>, T::Unzipped)
    where
        T: Unzippable + 'static + Send + Sync,
    {
        let source = self.commands.spawn(()).id();
        // NOTE(review): unlike the other create_* methods, no AddOperation is
        // queued here — presumably `T::unzip_output` registers the operation
        // itself; confirm against the Unzippable implementations.
        (
            InputSlot::new(self.scope(), source),
            T::unzip_output(Output::<T>::new(self.scope(), source), self),
        )
    }
260
261    /// Create an operation that creates a fork for a [`Result`] input. The value
262    /// inside the [`Result`] will be unpacked and sent down a different branch
263    /// depending on whether it was in the [`Ok`] or [`Err`] variant.
264    pub fn create_fork_result<T, E>(&mut self) -> (InputSlot<Result<T, E>>, ForkResultOutput<T, E>)
265    where
266        T: 'static + Send + Sync,
267        E: 'static + Send + Sync,
268    {
269        let source = self.commands.spawn(()).id();
270        let target_ok = self.commands.spawn(UnusedTarget).id();
271        let target_err = self.commands.spawn(UnusedTarget).id();
272
273        self.commands.queue(AddOperation::new(
274            Some(self.scope()),
275            source,
276            make_result_branching::<T, E>(ForkTargetStorage::from_iter([target_ok, target_err])),
277        ));
278
279        (
280            InputSlot::new(self.scope(), source),
281            ForkResultOutput {
282                ok: Output::new(self.scope(), target_ok),
283                err: Output::new(self.scope(), target_err),
284            },
285        )
286    }
287
288    /// Create an operation that creates a fork for an [`Option`] input. The value
289    /// inside the [`Option`] will be unpacked and sent down a different branch
290    /// depending on whether it was in the [`Some`] or [`None`] variant.
291    ///
292    /// For the [`None`] variant a unit `()` output will be sent, also called
293    /// a trigger.
294    pub fn create_fork_option<T>(&mut self) -> (InputSlot<Option<T>>, ForkOptionOutput<T>)
295    where
296        T: 'static + Send + Sync,
297    {
298        let source = self.commands.spawn(()).id();
299        let target_some = self.commands.spawn(UnusedTarget).id();
300        let target_none = self.commands.spawn(UnusedTarget).id();
301
302        self.commands.queue(AddOperation::new(
303            Some(self.scope()),
304            source,
305            make_option_branching::<T>(ForkTargetStorage::from_iter([target_some, target_none])),
306        ));
307
308        (
309            InputSlot::new(self.scope(), source),
310            ForkOptionOutput {
311                some: Output::new(self.scope(), target_some),
312                none: Output::new(self.scope(), target_none),
313            },
314        )
315    }
316
    /// Alternative way of calling [`Joinable::join`]
    pub fn join<'b, B: Joinable>(&'b mut self, buffers: B) -> Chain<'w, 's, 'a, 'b, B::Item> {
        // Delegates to the Joinable implementation of the buffer set.
        buffers.join(self)
    }
321
    /// Try joining a map of buffers into a single value.
    ///
    /// Returns [`IncompatibleLayout`] if the buffers in the map cannot be
    /// joined into the requested `J` type.
    pub fn try_join<'b, J: Joined>(
        &'b mut self,
        buffers: &BufferMap,
    ) -> Result<Chain<'w, 's, 'a, 'b, J>, IncompatibleLayout> {
        J::try_join_from(buffers, self)
    }
329
    /// Alternative way of calling [`Accessible::listen`].
    pub fn listen<'b, B: Accessible>(&'b mut self, buffers: B) -> Chain<'w, 's, 'a, 'b, B::Keys> {
        // Delegates to the Accessible implementation of the buffer set.
        buffers.listen(self)
    }
334
    /// Try listening to a map of buffers.
    ///
    /// Returns [`IncompatibleLayout`] if the buffers in the map are not
    /// compatible with the requested `Keys` type.
    pub fn try_listen<'b, Keys: Accessor>(
        &'b mut self,
        buffers: &BufferMap,
    ) -> Result<Chain<'w, 's, 'a, 'b, Keys>, IncompatibleLayout> {
        Keys::try_listen_from(buffers, self)
    }
342
    /// Create a node that combines its inputs with access to some buffers. You
    /// must specify one or more buffers to access. For multiple buffers,
    /// combine them into a tuple or an [`Iterator`]. Tuples of buffers can be
    /// nested inside each other.
    ///
    /// Other [outputs](Output) can also be passed in as buffers. These outputs
    /// will be transformed into a buffer with default buffer settings.
    pub fn create_buffer_access<T, B: Bufferable>(
        &mut self,
        buffers: B,
    ) -> Node<T, (T, BufferKeys<B>)>
    where
        B::BufferType: Accessing,
        T: 'static + Send + Sync,
    {
        // Convert whatever was passed in (buffers, outputs, tuples) into a
        // concrete buffer set, then build the access node from it.
        let buffers = buffers.into_buffer(self);
        buffers.access(self)
    }
361
    /// Try to create access to some buffers. Same as [`Self::create_buffer_access`]
    /// except it will return an error if the buffers in the [`BufferMap`] are not
    /// compatible with the keys that are being asked for.
    pub fn try_create_buffer_access<T, Keys: Accessor>(
        &mut self,
        buffers: &BufferMap,
    ) -> Result<Node<T, (T, Keys)>, IncompatibleLayout>
    where
        T: 'static + Send + Sync,
    {
        Keys::try_buffer_access(buffers, self)
    }
374
375    /// Collect incoming workflow threads into a container.
376    ///
377    /// If `max` is specified, the collection will always be sent out once it
378    /// reaches that maximum value.
379    ///
380    /// If `min` is greater than 0 then the collection will not be sent out unless
381    /// it is equal to or greater than that value. Note that this means the
382    /// collect operation could force the workflow into cancelling if it cannot
383    /// reach the minimum number of elements. A `min` of 0 means that if an
384    /// upstream thread is disposed and the collect node is no longer reachable
385    /// then it will fire off with an empty collection.
386    ///
387    /// If the `min` limit is satisfied and there are no remaining workflow
388    /// threads that can reach this collect operation, then the collection will
389    /// be sent out with however many elements it happens to have collected.
390    pub fn create_collect<T, const N: usize>(
391        &mut self,
392        min: usize,
393        max: Option<usize>,
394    ) -> Node<T, SmallVec<[T; N]>>
395    where
396        T: 'static + Send + Sync,
397    {
398        if let Some(max) = max {
399            assert!(0 < max);
400            assert!(min <= max);
401        }
402
403        let source = self.commands.spawn(()).id();
404        let target = self.commands.spawn(UnusedTarget).id();
405        self.commands.queue(AddOperation::new(
406            Some(self.scope()),
407            source,
408            Collect::<T, N>::new(target, min, max),
409        ));
410
411        Node {
412            input: InputSlot::new(self.scope(), source),
413            output: Output::new(self.scope(), target),
414            streams: (),
415        }
416    }
417
    /// Collect all workflow threads that are moving towards this node.
    pub fn create_collect_all<T, const N: usize>(&mut self) -> Node<T, SmallVec<[T; N]>>
    where
        T: 'static + Send + Sync,
    {
        // min of 0 with no max: fire whenever no more threads can arrive,
        // even with an empty collection.
        self.create_collect(0, None)
    }
425
    /// Collect an exact number of threads that are moving towards this node.
    ///
    /// Note that `create_collect` panics if `n` is 0, since a maximum of 0
    /// could never be satisfied.
    pub fn create_collect_n<T, const N: usize>(&mut self, n: usize) -> Node<T, SmallVec<[T; N]>>
    where
        T: 'static + Send + Sync,
    {
        // min == max == n forces exactly n elements.
        self.create_collect(n, Some(n))
    }
433
434    /// Create a new split operation in the workflow. The [`InputSlot`] can take
435    /// in values that you want to split, and [`SplitOutputs::build`] will let
436    /// you build connections to the split value.
437    pub fn create_split<T>(&mut self) -> (InputSlot<T>, SplitOutputs<T>)
438    where
439        T: 'static + Send + Sync + Splittable,
440    {
441        let source = self.commands.spawn(()).id();
442        self.commands.queue(AddOperation::new(
443            Some(self.scope()),
444            source,
445            OperateSplit::<T>::default(),
446        ));
447
448        (
449            InputSlot::new(self.scope(), source),
450            SplitOutputs::new(self.scope(), source),
451        )
452    }
453
454    /// Create an input slot that will cancel the current scope when it gets
455    /// triggered. This can be used on types that implement [`ToString`].
456    ///
457    /// If you need to cancel for a type that does not implement [`ToString`]
458    /// then convert it to a trigger `()` and then connect it to
459    /// [`Self::create_quiet_cancel`].
460    pub fn create_cancel<T>(&mut self) -> InputSlot<T>
461    where
462        T: 'static + Send + Sync + ToString,
463    {
464        let source = self.commands.spawn(()).id();
465        self.commands.queue(AddOperation::new(
466            Some(self.scope()),
467            source,
468            OperateCancel::<T>::new(),
469        ));
470
471        InputSlot::new(self.scope(), source)
472    }
473
    /// Create an input slot that will cancel the current scope when it gets
    /// triggered.
    ///
    /// If you want the cancellation message to include information about the
    /// input value that triggered it, use [`Self::create_cancel`].
    pub fn create_quiet_cancel(&mut self) -> InputSlot<()> {
        let source = self.commands.spawn(()).id();
        self.commands.queue(AddOperation::new(
            Some(self.scope()),
            source,
            OperateQuietCancel,
        ));

        InputSlot::new(self.scope(), source)
    }
489
    /// This method allows you to define a cleanup workflow that branches off of
    /// this scope that will activate during the scope's cleanup phase. The
    /// input to the cleanup workflow will be a key to access one or more
    /// buffers from the parent scope.
    ///
    /// Each different cleanup workflow that you define using this function will
    /// be given its own unique session ID when it gets run. You can define any
    /// number of cleanup workflows.
    ///
    /// The parent scope will only finish its cleanup phase after all cleanup
    /// workflows for the scope have finished, either by terminating or by being
    /// cancelled themselves.
    ///
    /// Cleanup workflows that you define with this function will always be run
    /// no matter how the scope finished. If you want a cleanup workflow that
    /// only runs when the scope is cancelled, use [`Self::on_cancel`]. If you
    /// want a cleanup workflow that only runs when the scope terminates
    /// successfully, then use [`Self::on_terminate`]. For easier runtime
    /// flexibility you can also use [`Self::on_cleanup_if`].
    pub fn on_cleanup<B, Settings>(
        &mut self,
        from_buffers: B,
        build: impl FnOnce(Scope<BufferKeys<B>, (), ()>, &mut Builder) -> Settings,
    ) where
        B: Bufferable,
        B::BufferType: Accessing,
        Settings: Into<ScopeSettings>,
    {
        from_buffers.into_buffer(self).on_cleanup(self, build);
    }
520
    /// Define a cleanup workflow that only gets run if the scope was cancelled.
    ///
    /// When the scope gets dropped before it terminates (i.e. the parent scope
    /// finished while this scope was active, and this scope is interruptible)
    /// that also counts as cancelled.
    ///
    /// A scope which is set to be uninterruptible will still experience a
    /// cancellation if its terminal node becomes unreachable.
    ///
    /// If you want a cleanup workflow that only runs when the scope terminates
    /// successfully then use [`Self::on_terminate`]. If you want a cleanup
    /// workflow that always runs when the scope is finished, then use
    /// [`Self::on_cleanup`].
    pub fn on_cancel<B, Settings>(
        &mut self,
        from_buffers: B,
        build: impl FnOnce(Scope<BufferKeys<B>, (), ()>, &mut Builder) -> Settings,
    ) where
        B: Bufferable,
        B::BufferType: Accessing,
        Settings: Into<ScopeSettings>,
    {
        from_buffers.into_buffer(self).on_cancel(self, build);
    }
545
    /// Define a cleanup workflow that only gets run if the scope was successfully
    /// terminated. That means an input reached its terminal node, and now the
    /// scope is cleaning up.
    ///
    /// If you want a cleanup workflow that only runs when the scope is cancelled
    /// then use [`Self::on_cancel`]. If you want a cleanup workflow that always
    /// runs when the scope is finished, then use [`Self::on_cleanup`].
    pub fn on_terminate<B, Settings>(
        &mut self,
        from_buffers: B,
        build: impl FnOnce(Scope<BufferKeys<B>, (), ()>, &mut Builder) -> Settings,
    ) where
        B: Bufferable,
        B::BufferType: Accessing,
        Settings: Into<ScopeSettings>,
    {
        from_buffers.into_buffer(self).on_terminate(self, build);
    }
564
    /// Define a sub-workflow that will be run when this workflow is being cleaned
    /// up if the conditions are met. A workflow enters the cleanup stage when
    /// its terminal node is reached or if it gets cancelled.
    ///
    /// See [`CleanupWorkflowConditions`] for choosing which of those two cases
    /// should trigger the cleanup workflow.
    pub fn on_cleanup_if<B, Settings>(
        &mut self,
        conditions: CleanupWorkflowConditions,
        from_buffers: B,
        build: impl FnOnce(Scope<BufferKeys<B>, (), ()>, &mut Builder) -> Settings,
    ) where
        B: Bufferable,
        B::BufferType: Accessing,
        Settings: Into<ScopeSettings>,
    {
        from_buffers
            .into_buffer(self)
            .on_cleanup_if(self, conditions, build);
    }
582
583    /// Create a node that trims (cancels) other nodes in the workflow when it
584    /// gets activated. The input into the node will be passed along as output
585    /// after the trimming is confirmed to be completed.
586    pub fn create_trim<T>(&mut self, branches: impl IntoIterator<Item = TrimBranch>) -> Node<T, T>
587    where
588        T: 'static + Send + Sync,
589    {
590        let branches: SmallVec<[_; 16]> = branches.into_iter().collect();
591        for branch in &branches {
592            branch.verify_scope(self.scope());
593        }
594
595        let source = self.commands.spawn(()).id();
596        let target = self.commands.spawn(UnusedTarget).id();
597        self.commands.queue(AddOperation::new(
598            Some(self.scope()),
599            source,
600            Trim::<T>::new(branches, target),
601        ));
602
603        Node {
604            input: InputSlot::new(self.scope(), source),
605            output: Output::new(self.scope(), target),
606            streams: (),
607        }
608    }
609
610    /// Create a gate node that can open and close the gates on one or more
611    /// buffers. Feed a [`GateRequest`] into the node and all the associated
612    /// buffers will be opened or closed based on the action inside the request.
613    ///
614    /// The data inside the request will be passed along as output once the
615    /// gate action is finished.
616    ///
617    /// See [`Gate`] to understand what happens when a gate is opened or closed.
618    pub fn create_gate<T, B>(&mut self, buffers: B) -> Node<GateRequest<T>, T>
619    where
620        B: Bufferable,
621        T: 'static + Send + Sync,
622    {
623        let buffers = buffers.into_buffer(self);
624        buffers.verify_scope(self.scope());
625
626        let source = self.commands.spawn(()).id();
627        let target = self.commands.spawn(UnusedTarget).id();
628        self.commands.queue(AddOperation::new(
629            Some(self.scope()),
630            source,
631            OperateDynamicGate::<T, _>::new(buffers, target),
632        ));
633
634        Node {
635            input: InputSlot::new(self.scope(), source),
636            output: Output::new(self.scope(), target),
637            streams: (),
638        }
639    }
640
641    /// Create a gate node that always opens or always closes the gates on one
642    /// or more buffers.
643    ///
644    /// The data sent into the node will be passed back out as input, unchanged.
645    ///
646    /// See [`Gate`] to understand what happens when a gate is opened or closed.
647    pub fn create_gate_action<T, B>(&mut self, action: Gate, buffers: B) -> Node<T, T>
648    where
649        B: Bufferable,
650        T: 'static + Send + Sync,
651    {
652        let buffers = buffers.into_buffer(self);
653        buffers.verify_scope(self.scope());
654
655        let source = self.commands.spawn(()).id();
656        let target = self.commands.spawn(UnusedTarget).id();
657        self.commands.queue(AddOperation::new(
658            Some(self.scope()),
659            source,
660            OperateStaticGate::<T, _>::new(buffers, target, action),
661        ));
662
663        Node {
664            input: InputSlot::new(self.scope(), source),
665            output: Output::new(self.scope(), target),
666            streams: (),
667        }
668    }
669
    /// Create a gate node that always opens the gates on one or more buffers.
    ///
    /// See [`Gate`] to understand what happens when a gate is opened or closed.
    // NOTE(review): generic order here is `<B, T>` while the sibling methods
    // use `<T, B>`. Swapping it would break turbofish callers, so it is left
    // as-is; consider harmonizing in the next breaking release.
    pub fn create_gate_open<B, T>(&mut self, buffers: B) -> Node<T, T>
    where
        B: Bufferable,
        T: 'static + Send + Sync,
    {
        self.create_gate_action(Gate::Open, buffers)
    }
680
    /// Create a gate node that always closes the gates on one or more buffers.
    ///
    /// See [`Gate`] to understand what happens when a gate is opened or closed.
    pub fn create_gate_close<T, B>(&mut self, buffers: B) -> Node<T, T>
    where
        B: Bufferable,
        T: 'static + Send + Sync,
    {
        self.create_gate_action(Gate::Closed, buffers)
    }
691
    /// Get the scope that this builder is building for.
    pub fn scope(&self) -> Entity {
        self.context.scope
    }
696
    /// Borrow the commands for the builder
    pub fn commands(&mut self) -> &mut Commands<'w, 's> {
        self.commands
    }
701
    /// Used internally to create scopes in different ways.
    ///
    /// `scope_id` is the pre-spawned entity for the scope operation itself and
    /// `exit_scope` is the pre-spawned target that receives the scope's final
    /// response in the parent scope. `build` is given a child [`Builder`] whose
    /// context points at the new scope, not the parent.
    pub(crate) fn create_scope_impl<Request, Response, Streams, Settings>(
        &mut self,
        scope_id: Entity,
        exit_scope: Entity,
        build: impl FnOnce(Scope<Request, Response, Streams>, &mut Builder) -> Settings,
    ) -> Node<Request, Response, Streams>
    where
        Request: 'static + Send + Sync,
        Response: 'static + Send + Sync,
        Streams: StreamPack,
        Settings: Into<ScopeSettings>,
    {
        // NOTE(@mxgrey): When changing the implementation of this function,
        // remember to similarly update the implementation of IncrementalScopeBuilder
        let ScopeEndpoints {
            terminal,
            enter_scope,
            finish_scope_cancel,
        } = OperateScope::add::<Request, Response>(
            Some(self.scope()),
            scope_id,
            Some(exit_scope),
            self.commands,
        );

        // Stream inputs live inside the new scope; stream outputs are exposed
        // to the parent scope.
        let (stream_in, stream_out) =
            Streams::spawn_scope_streams(scope_id, self.scope(), self.commands);

        // A fresh builder whose context is the new scope, so everything the
        // user builds inside `build` is attached to the child scope.
        let mut builder = Builder {
            context: BuilderScopeContext {
                scope: scope_id,
                finish_scope_cancel,
            },
            commands: self.commands,
        };

        let scope = Scope {
            start: Output::new(scope_id, enter_scope),
            terminate: InputSlot::new(scope_id, terminal),
            streams: stream_in,
        };

        // Run the user's build closure, then persist whatever settings it
        // chose onto the scope entity.
        let settings = build(scope, &mut builder).into();
        self.commands
            .entity(scope_id)
            .insert(ScopeSettingsStorage(settings));

        // From the parent's perspective the whole scope is just one node.
        Node {
            input: InputSlot::new(self.scope(), scope_id),
            output: Output::new(self.scope(), exit_scope),
            streams: stream_out,
        }
    }
756
    /// Shared implementation for building an injection node around a
    /// pre-spawned `source` entity. See [`Self::create_injection_node`].
    pub(crate) fn create_injection_impl<Request, Response, Streams>(
        &mut self,
        source: Entity,
    ) -> Node<(Request, ServiceInstructions<Request, Response, Streams>), Response, Streams>
    where
        Request: 'static + Send + Sync,
        Response: 'static + Send + Sync,
        Streams: StreamPack,
    {
        let target = self.commands.spawn(UnusedTarget).id();

        // Spawn the stream outputs and record their targets on the source
        // entity before registering the Injection operation.
        let mut map = StreamTargetMap::default();
        let streams = Streams::spawn_node_streams(source, &mut map, self);
        self.commands.entity(source).insert(map);
        self.commands.queue(AddOperation::new(
            Some(self.scope()),
            source,
            Injection::<Request, Response, Streams>::new(target),
        ));

        Node {
            input: InputSlot::new(self.scope(), source),
            output: Output::new(self.scope(), target),
            streams,
        }
    }
783
    /// Get a copy of the scope context this builder is operating on. This can
    /// be used later to reconstruct a builder for the same scope.
    pub fn context(&self) -> BuilderScopeContext {
        self.context
    }
787}
788
/// This struct is used to describe when a cleanup workflow should run. Currently
/// this is only two simple booleans, but in the future it could potentially
/// contain systems that make decisions at runtime, so for now we encapsulate
/// the idea of cleanup conditions in this struct so we can enhance the
/// capabilities later without breaking API.
#[derive(Clone)]
pub struct CleanupWorkflowConditions {
    // Run the cleanup workflow when the scope terminates successfully.
    pub(crate) run_on_terminate: bool,
    // Run the cleanup workflow when the scope is cancelled.
    pub(crate) run_on_cancel: bool,
}
799
800impl CleanupWorkflowConditions {
801    pub fn always_if(run_on_terminate: bool, run_on_cancel: bool) -> Self {
802        CleanupWorkflowConditions {
803            run_on_terminate,
804            run_on_cancel,
805        }
806    }
807}
808
809#[cfg(test)]
810mod tests {
811    use crate::{CancellationCause, prelude::*, testing::*};
812    use smallvec::SmallVec;
813    use std::time::Instant;
814
    /// Workflows whose terminal node can never be reached should be cancelled
    /// with an `Unreachable` cause, and that result should be stable across
    /// repeated runs (the reachability result is cached after the first run).
    #[test]
    fn test_disconnected_workflow() {
        let mut context = TestingContext::minimal_plugins();

        // Case 1: nothing at all is connected.
        let workflow = context.spawn_io_workflow(|_, _| {
            // Do nothing. Totally empty workflow.
        });
        // Test this repeatedly because we cache the result after the first time.
        check_unreachable(workflow, 1, &mut context);
        check_unreachable(workflow, 1, &mut context);
        check_unreachable(workflow, 1, &mut context);

        // Case 2: the start is connected, but only into an infinite loop.
        let workflow = context.spawn_io_workflow(|scope, builder| {
            let node = builder.create_map_block(|v| v);
            builder.connect(scope.start, node.input);
            // Create a tight infinite loop that will never reach the terminal
            builder.connect(node.output, node.input);
        });
        check_unreachable(workflow, 1, &mut context);
        check_unreachable(workflow, 1, &mut context);
        check_unreachable(workflow, 1, &mut context);

        // Case 3: the terminal is connected, but only to a node that nothing
        // ever feeds into.
        let workflow = context.spawn_io_workflow(|scope, builder| {
            builder.chain(scope.start).map_block(|v| v).fork_clone((
                |chain: Chain<()>| chain.map_block(|v| v).map_block(|v| v).unused(),
                |chain: Chain<()>| chain.map_block(|v| v).map_block(|v| v).unused(),
                |chain: Chain<()>| chain.map_block(|v| v).map_block(|v| v).unused(),
            ));

            // Create an exit node that never connects to the scope's input.
            let exit_node = builder.create_map_block(|v| v);
            builder.connect(exit_node.output, scope.terminate);
        });
        check_unreachable(workflow, 1, &mut context);
        check_unreachable(workflow, 1, &mut context);
        check_unreachable(workflow, 1, &mut context);

        // Case 4: same as case 3 but with buffers instead of nodes — the
        // buffer feeding the terminal never receives anything.
        let workflow = context.spawn_io_workflow(|scope, builder| {
            let entry_buffer = builder.create_buffer::<()>(BufferSettings::keep_all());
            builder.chain(scope.start).map_block(|v| v).fork_clone((
                |chain: Chain<()>| chain.map_block(|v| v).connect(entry_buffer.input_slot()),
                |chain: Chain<()>| chain.map_block(|v| v).connect(entry_buffer.input_slot()),
                |chain: Chain<()>| chain.map_block(|v| v).connect(entry_buffer.input_slot()),
            ));

            // Create an exit buffer with no relationship to the entry buffer
            // which is the only thing that connects to the terminal node.
            let exit_buffer = builder.create_buffer::<()>(BufferSettings::keep_all());
            builder
                .listen(exit_buffer)
                .map_block(|_| ())
                .connect(scope.terminate);
        });
        check_unreachable(workflow, 1, &mut context);
        check_unreachable(workflow, 1, &mut context);
        check_unreachable(workflow, 1, &mut context);
    }
872
    /// Run the workflow for the given number of flush cycles and assert that
    /// it gets cancelled with [`CancellationCause::Unreachable`].
    fn check_unreachable(
        workflow: Service<(), ()>,
        flush_cycles: usize,
        context: &mut TestingContext,
    ) {
        let r = context.try_resolve_request((), workflow, flush_cycles);
        assert!(matches!(
            *r.unwrap_err().cause,
            CancellationCause::Unreachable(_)
        ));
    }
884
    /// Exercise the various ways of forking a cloneable value: the explicit
    /// fork object, the chain-based `fork_clone`, racing branches of different
    /// durations, and joining the cloned branches back together.
    #[test]
    fn test_fork_clone() {
        let mut context = TestingContext::minimal_plugins();

        // Explicit fork object: both branches feed the terminal, so whichever
        // arrives first terminates the workflow with the original value.
        let workflow = context.spawn_io_workflow(|scope, builder| {
            let fork = scope.start.fork_clone(builder);
            let branch_a = fork.clone_output(builder);
            let branch_b = fork.clone_output(builder);
            builder.connect(branch_a, scope.terminate);
            builder.connect(branch_b, scope.terminate);
        });

        let r = context.resolve_request(5.0, workflow);
        assert_eq!(r, 5.0);

        // Same behavior using the chain-based fork_clone API.
        let workflow = context.spawn_io_workflow(|scope, builder| {
            builder.chain(scope.start).fork_clone((
                |chain: Chain<f64>| chain.connect(scope.terminate),
                |chain: Chain<f64>| chain.connect(scope.terminate),
            ));
        });

        let r = context.resolve_request(3.0, workflow);
        assert_eq!(r, 3.0);

        // Race two async branches: the slow branch waits 10*t and yields 10*t,
        // the fast branch waits t/100 and yields t/100. The fast branch should
        // win, so the result is t/100.
        let workflow = context.spawn_io_workflow(|scope, builder| {
            builder.chain(scope.start).fork_clone((
                |chain: Chain<f64>| {
                    chain
                        .map_block(|t| WaitRequest {
                            duration: Duration::from_secs_f64(10.0 * t),
                            value: 10.0 * t,
                        })
                        .map(|r: AsyncMap<WaitRequest<f64>>| wait(r.request))
                        .connect(scope.terminate)
                },
                |chain: Chain<f64>| {
                    chain
                        .map_block(|t| WaitRequest {
                            duration: Duration::from_secs_f64(t / 100.0),
                            value: t / 100.0,
                        })
                        .map(|r: AsyncMap<WaitRequest<f64>>| wait(r.request))
                        .connect(scope.terminate)
                },
            ));
        });

        let r = context.resolve_request(1.0, workflow);
        assert_eq!(r, 0.01);

        // Fork then join: both cloned outputs get joined into a tuple.
        let workflow = context.spawn_io_workflow(|scope, builder| {
            let (fork_input, fork_output) = builder.create_fork_clone();
            builder.connect(scope.start, fork_input);
            let a = fork_output.clone_output(builder);
            let b = fork_output.clone_output(builder);
            builder.join((a, b)).connect(scope.terminate);
        });

        let r = context.resolve_request(5, workflow);
        assert_eq!(r, (5, 5));
    }
947
    /// If the terminal node is only reachable through a stream, and the node
    /// never actually sends any stream data, the workflow must be treated as
    /// unreachable (i.e. the request errors out) rather than hanging.
    #[test]
    fn test_stream_reachability() {
        let mut context = TestingContext::minimal_plugins();

        // Test for streams from a blocking node
        let workflow = context.spawn_io_workflow(|scope, builder| {
            let stream_node = builder.create_map(|_: BlockingMap<(), StreamOf<u32>>| {
                // Do nothing. The purpose of this node is to just return without
                // sending off any streams.
            });

            builder.connect(scope.start, stream_node.input);
            builder
                .chain(stream_node.streams)
                .map_block(|value| 2 * value)
                .connect(scope.terminate);
        });

        let r = context.try_resolve_request((), workflow, ());
        assert!(r.is_err());

        // Test for streams from an async node
        let workflow = context.spawn_io_workflow(|scope, builder| {
            let stream_node = builder.create_map(|_: AsyncMap<(), StreamOf<u32>>| {
                async { /* Do nothing */ }
            });

            builder.connect(scope.start, stream_node.input);
            builder
                .chain(stream_node.streams)
                .map_block(|value| 2 * value)
                .connect(scope.terminate);
        });

        let r = context.try_resolve_request((), workflow, ());
        assert!(r.is_err());
    }
985
986    use tokio::sync::mpsc::unbounded_channel;
987
    /// Verify the three cleanup-workflow triggers: `on_cancel` runs only when
    /// the scope is cancelled, `on_terminate` runs only when it terminates
    /// successfully, and `on_cleanup` runs in both cases. Each cleanup
    /// workflow drains a buffer and reports its contents over a channel so we
    /// can observe exactly which ones ran.
    #[test]
    fn test_on_cleanup() {
        let mut context = TestingContext::minimal_plugins();

        let (sender, mut receiver) = unbounded_channel();
        let workflow = context.spawn_io_workflow(|scope, builder| {
            let input = scope.start.fork_clone(builder);

            // One copy of the input goes into the buffer that the cleanup
            // workflow will later drain.
            let buffer = builder.create_buffer(BufferSettings::default());
            let input_to_buffer = input.clone_output(builder);
            builder.connect(input_to_buffer, buffer.input_slot());

            // The other copy goes through a node that always produces None,
            // which forces a cancellation.
            let none_node = builder.create_map_block(produce_none);
            let input_to_node = input.clone_output(builder);
            builder.connect(input_to_node, none_node.input);
            builder
                .chain(none_node.output)
                .cancel_on_none()
                .connect(scope.terminate);

            // The chain coming out of the none_node will result in the scope
            // being cancelled. After that, this scope should run, and the value
            // that went into the buffer should get sent over the channel.
            builder.on_cancel(buffer, |scope, builder| {
                builder
                    .chain(scope.start)
                    .consume_buffer::<8>()
                    .map_block(move |values| {
                        for value in values {
                            sender.send(value).unwrap();
                        }
                    })
                    .connect(scope.terminate);
            });
        });

        let r = context.try_resolve_request(5, workflow, ());
        assert!(r.is_err());

        // Exactly one value — the buffered input — should come through.
        let channel_output = receiver.try_recv().unwrap();
        assert_eq!(channel_output, 5);
        assert!(receiver.try_recv().is_err());
        assert!(context.confirm_buffers_empty().is_ok());

        // Second workflow: values < 5 trigger cancellation, values >= 5
        // terminate successfully. Three separate buffers feed the three
        // different cleanup triggers.
        let (cancel_sender, mut cancel_receiver) = unbounded_channel();
        let (terminate_sender, mut terminate_receiver) = unbounded_channel();
        let (cleanup_sender, mut cleanup_receiver) = unbounded_channel();
        let workflow = context.spawn_io_workflow(|scope, builder| {
            let input = scope.start.fork_clone(builder);

            let cancel_buffer = builder.create_buffer(BufferSettings::default());
            let input_to_cancel = input.clone_output(builder);
            builder.connect(input_to_cancel, cancel_buffer.input_slot());

            let terminate_buffer = builder.create_buffer(BufferSettings::default());
            let input_to_terminate = input.clone_output(builder);
            builder.connect(input_to_terminate, terminate_buffer.input_slot());

            let cleanup_buffer = builder.create_buffer(BufferSettings::default());
            let input_to_cleanup = input.clone_output(builder);
            builder.connect(input_to_cleanup, cleanup_buffer.input_slot());

            // Values below 5 become None, which cancels the scope; 5 and
            // above pass through to the terminal.
            let filter_node =
                builder.create_map_block(|value: u64| if value >= 5 { Some(value) } else { None });
            let input_to_filter_node = input.clone_output(builder);
            builder.connect(input_to_filter_node, filter_node.input);
            builder
                .chain(filter_node.output)
                .cancel_on_none()
                .connect(scope.terminate);

            builder.on_cancel(cancel_buffer, |scope, builder| {
                builder
                    .chain(scope.start)
                    .consume_buffer::<8>()
                    .map_block(move |values| {
                        for value in values {
                            cancel_sender.send(value).unwrap();
                        }
                    })
                    .connect(scope.terminate);
            });

            builder.on_terminate(terminate_buffer, |scope, builder| {
                builder
                    .chain(scope.start)
                    .consume_buffer::<8>()
                    .map_block(move |values| {
                        for value in values {
                            terminate_sender.send(value).unwrap();
                        }
                    })
                    .connect(scope.terminate);
            });

            builder.on_cleanup(cleanup_buffer, |scope, builder| {
                builder
                    .chain(scope.start)
                    .consume_buffer::<8>()
                    .map_block(move |values| {
                        for value in values {
                            cleanup_sender.send(value).unwrap();
                        }
                    })
                    .connect(scope.terminate);
            });
        });

        // A cancelled run should trigger on_cancel and on_cleanup but not
        // on_terminate.
        let r = context.try_resolve_request(3, workflow, 10);
        assert!(r.is_err());

        assert_eq!(cancel_receiver.try_recv().unwrap(), 3);
        assert!(cancel_receiver.try_recv().is_err());
        assert_eq!(cleanup_receiver.try_recv().unwrap(), 3);
        assert!(cleanup_receiver.try_recv().is_err());
        assert!(terminate_receiver.try_recv().is_err());
        assert!(context.no_unhandled_errors());
        assert!(context.confirm_buffers_empty().is_ok());

        // A successful run should trigger on_terminate and on_cleanup but not
        // on_cancel.
        let r = context.try_resolve_request(6, workflow, 10).unwrap();
        assert_eq!(r, 6);

        assert_eq!(terminate_receiver.try_recv().unwrap(), 6);
        assert!(terminate_receiver.try_recv().is_err());
        assert_eq!(cleanup_receiver.try_recv().unwrap(), 6);
        assert!(cleanup_receiver.try_recv().is_err());
        assert!(cancel_receiver.try_recv().is_err());
        assert!(context.no_unhandled_errors());
        assert!(context.confirm_buffers_empty().is_ok());
    }
1118
    /// Exercise edge cases of chained collect operations: disposal updates
    /// arriving in the "wrong" order, circular dependencies between collects,
    /// and disambiguation of a circular dependency via a nested scope.
    #[test]
    fn test_double_collection() {
        let mut context = TestingContext::minimal_plugins();

        let delay = context.spawn_delay(Duration::from_secs_f32(0.01));

        let workflow = context.spawn_io_workflow(|scope, builder| {
            // We make the later collect node first so that its disposal update
            // gets triggered first. If we don't implement collect correctly
            // then the later collect may think it's unreachable and send out
            // its collection prematurely. This test is checking for that edge
            // case.
            let later_collect = builder.create_collect_all::<i32, 8>();
            let earlier_collect = builder.create_collect_all::<i32, 8>();

            // Values greater than 4 are disposed, so only 1..=4 should reach
            // the collector.
            builder
                .chain(scope.start)
                .spread()
                .then(delay)
                .map_block(|v| if v <= 4 { Some(v) } else { None })
                .dispose_on_none()
                .connect(earlier_collect.input);

            builder
                .chain(earlier_collect.output)
                .spread()
                .connect(later_collect.input);

            builder.connect(later_collect.output, scope.terminate);
        });

        let r = context.resolve_request([1, 2, 3, 4, 5], workflow);
        assert_eq!(r.as_slice(), &[1, 2, 3, 4]);

        let workflow = context.spawn_io_workflow(|scope, builder| {
            // We create a circular dependency between two collect operations.
            // This makes it impossible to determine which should fire when the
            // upstream is exhausted because both collect operations are upstream
            // of each other. This workflow should be cancelled before it even
            // starts.
            let earlier_collect = builder.create_collect_all::<i32, 8>();
            let later_collect = builder.create_collect_all::<i32, 8>();

            builder
                .chain(scope.start)
                .spread()
                .then(delay)
                .connect(earlier_collect.input);

            builder
                .chain(earlier_collect.output)
                .spread()
                .connect(later_collect.input);

            builder.chain(later_collect.output).spread().fork_clone((
                |chain: Chain<i32>| chain.connect(earlier_collect.input),
                |chain: Chain<i32>| chain.connect(scope.terminate),
            ));
        });

        let r = context.try_resolve_request([1, 2, 3, 4, 5], workflow, ());
        assert!(r.is_err());

        let workflow = context.spawn_io_workflow(|scope, builder| {
            // We create a circular dependency between two collect operations,
            // but this time one of those collect operations is scoped, and that
            // disambiguates the behavior of the collect operations. We should
            // get the intuitive workflow output from this.
            let earlier_collect = builder.create_collect_all::<i32, 8>();

            builder
                .chain(scope.start)
                .spread()
                .then(delay)
                .map_block(|v| if v <= 4 { Some(v) } else { None })
                .dispose_on_none()
                .connect(earlier_collect.input);

            // The inner collect is isolated inside its own scope, which breaks
            // the ambiguity of the outer circular connection.
            let _ = builder
                .chain(earlier_collect.output)
                .then_io_scope(|scope, builder| {
                    builder
                        .chain(scope.start)
                        .spread()
                        .collect_all::<8>()
                        .connect(scope.terminate);
                })
                .fork_clone((
                    |chain: Chain<_>| chain.spread().connect(earlier_collect.input),
                    |chain: Chain<_>| chain.connect(scope.terminate),
                ));
        });

        check_collections(workflow, [1, 2, 3, 4], [1, 2, 3, 4], &mut context);
        check_collections(workflow, [1, 2, 3, 4, 5, 6], [1, 2, 3, 4], &mut context);
        check_collections(workflow, [1, 8, 2, 7, 3, 6], [1, 2, 3], &mut context);
        check_collections(
            workflow,
            [8, 7, 6, 5, 4, 3, 2, 1],
            [4, 3, 2, 1],
            &mut context,
        );
        check_collections(workflow, [6, 7, 8, 9, 10], [], &mut context);
    }
1223
1224    fn check_collections(
1225        workflow: Service<SmallVec<[i32; 8]>, SmallVec<[i32; 8]>>,
1226        input: impl IntoIterator<Item = i32>,
1227        expectation: impl IntoIterator<Item = i32>,
1228        context: &mut TestingContext,
1229    ) {
1230        let input: SmallVec<[i32; 8]> = SmallVec::from_iter(input);
1231        let expectation: SmallVec<[i32; 8]> = SmallVec::from_iter(expectation);
1232        let r = context.resolve_request(input, workflow);
1233        assert_eq!(r, expectation);
1234    }
1235
1236    #[test]
1237    fn benchmarks() {
1238        let mut context = TestingContext::minimal_plugins();
1239
1240        let workflow = context.spawn_io_workflow(|scope, builder| {
1241            let (start_test, end_test) = build_benchmark_fixture(scope, builder);
1242            builder.connect(start_test, end_test);
1243        });
1244
1245        let result = context
1246            .try_resolve_request((), workflow, Duration::from_secs(10))
1247            .unwrap();
1248        println!("Performance for basic connection:\n{result:#?}");
1249
1250        let workflow = context.spawn_io_workflow(|scope, builder| {
1251            let (start_test, end_test) = build_benchmark_fixture(scope, builder);
1252            builder
1253                .chain(start_test)
1254                .then_io_scope(|scope, builder| {
1255                    builder.connect(scope.start, scope.terminate);
1256                })
1257                .connect(end_test);
1258        });
1259
1260        let result = context
1261            .try_resolve_request((), workflow, Duration::from_secs(10))
1262            .unwrap();
1263        println!("Performance for basic connection:\n{result:#?}");
1264    }
1265
    /// Build a self-sampling benchmark harness inside the given scope.
    ///
    /// Returns an `(Output<Instant>, InputSlot<Instant>)` pair: the caller
    /// wires these together (directly or through whatever construct is being
    /// benchmarked). Each traversal produces a [`TimeRange`] sample that is
    /// pushed into a buffer; a listener on that buffer immediately restarts
    /// the loop, and another listener terminates the scope with aggregate
    /// [`TimeStats`] once `collect_samples` decides enough samples exist.
    fn build_benchmark_fixture(
        scope: Scope<(), TimeStats>,
        builder: &mut Builder,
    ) -> (Output<Instant>, InputSlot<Instant>) {
        let initial_time = builder
            .commands()
            .spawn_service(get_initial_time.into_blocking_service());
        let finish_time = builder
            .commands()
            .spawn_service(finish_time_range.into_blocking_service());
        let collect_samples = builder
            .commands()
            .spawn_service(collect_samples.into_blocking_service());

        // keep_all so every timing sample survives until collection.
        let samples = builder.create_buffer(BufferSettings::keep_all());

        let initial_node = builder.create_node(initial_time);
        let finish_node = builder.create_node(finish_time);

        builder.connect(scope.start, initial_node.input);
        builder.connect(finish_node.output, samples.input_slot());

        // When enough samples have accumulated, emit the statistics and
        // terminate; until then collect_samples yields None, which is disposed.
        builder
            .listen(samples)
            .then(collect_samples)
            .dispose_on_none()
            .connect(scope.terminate);

        // Every new sample also kicks off another measurement pass, keeping
        // the loop running until the terminal listener fires.
        builder
            .listen(samples)
            .map_block(|_| ())
            .connect(initial_node.input);

        (initial_node.output, finish_node.input)
    }
1301
    /// The start and end instants of a single pass through the benchmark
    /// fixture.
    #[derive(Debug, Clone, Copy)]
    struct TimeRange {
        initial_time: Instant,
        finish_time: Instant,
    }
1307
    /// Aggregate timing statistics computed from a batch of [`TimeRange`]
    /// samples. The fields are only read through the `Debug` printout, hence
    /// the `allow(unused)` annotations.
    #[derive(Debug, Clone, Copy)]
    struct TimeStats {
        /// Number of samples in the batch.
        #[allow(unused)]
        sample_count: usize,
        /// Mean duration across all samples.
        #[allow(unused)]
        average: Duration,
        /// Standard deviation of the sample durations.
        #[allow(unused)]
        std_dev: Duration,
        /// Longest observed duration.
        #[allow(unused)]
        highest: Duration,
        /// Shortest observed duration.
        #[allow(unused)]
        lowest: Duration,
    }
1321
1322    impl TimeStats {
1323        fn new(samples: impl IntoIterator<Item = TimeRange>) -> Self {
1324            let samples: Vec<_> = samples
1325                .into_iter()
1326                .map(|s| s.finish_time - s.initial_time)
1327                .collect();
1328            let sample_count = samples.len();
1329
1330            let mut highest = None;
1331            let mut lowest = None;
1332            let mut average = Duration::new(0, 0);
1333            for sample in &samples {
1334                average += *sample;
1335                if highest.is_none_or(|h| *sample > h) {
1336                    highest = Some(*sample);
1337                }
1338
1339                if lowest.is_none_or(|l| *sample < l) {
1340                    lowest = Some(*sample);
1341                }
1342            }
1343
1344            let average = average / sample_count as u32;
1345            let mut radicand = 0.0;
1346            for sample in samples {
1347                let delta = (sample.as_nanos() as i64 - average.as_nanos() as i64) as f64;
1348                radicand += f64::powf(delta, 2.0);
1349            }
1350
1351            let std_dev = Duration::from_nanos(f64::sqrt(radicand) as u64);
1352            let highest = highest.unwrap();
1353            let lowest = lowest.unwrap();
1354            TimeStats {
1355                sample_count,
1356                average,
1357                std_dev,
1358                highest,
1359                lowest,
1360            }
1361        }
1362    }
1363
    /// Blocking service that stamps the moment a benchmark pass begins.
    fn get_initial_time(_: In<()>) -> Instant {
        Instant::now()
    }
1367
    /// Blocking service that pairs the given start instant with the current
    /// time to produce one completed timing sample.
    fn finish_time_range(In(initial_time): In<Instant>) -> TimeRange {
        TimeRange {
            initial_time,
            finish_time: Instant::now(),
        }
    }
1374
1375    fn collect_samples(
1376        In(key): In<BufferKey<TimeRange>>,
1377        access: BufferAccess<TimeRange>,
1378    ) -> Option<TimeStats> {
1379        let samples = access.get(&key).unwrap();
1380        if samples.len() >= 1000 {
1381            Some(TimeStats::new(samples.iter().copied()))
1382        } else {
1383            None
1384        }
1385    }
1386}