use bevy_ecs::prelude::{Commands, Entity};
use std::future::Future;
use smallvec::SmallVec;
use crate::{
Accessible, Accessing, Accessor, AddOperation, AsMap, Buffer, BufferKeys, BufferLocation,
BufferMap, BufferSettings, Bufferable, Buffering, Chain, Collect, ForkClone, ForkCloneOutput,
ForkOptionOutput, ForkResultOutput, ForkTargetStorage, Gate, GateRequest, IncompatibleLayout,
Injection, InputSlot, IntoAsyncMap, IntoBlockingMap, Joinable, Joined, Node, OperateBuffer,
OperateCancel, OperateDynamicGate, OperateQuietCancel, OperateScope, OperateSplit,
OperateStaticGate, Output, Provider, RequestOfMap, ResponseOfMap, Scope, ScopeEndpoints,
ScopeSettings, ScopeSettingsStorage, Sendish, ServiceInstructions, SplitOutputs, Splittable,
StreamPack, StreamTargetMap, StreamsOfMap, Trim, TrimBranch, UnusedTarget, Unzippable,
make_option_branching, make_result_branching,
};
pub(crate) mod connect;
pub(crate) use connect::*;
/// Tool for constructing a workflow inside a scope. Wraps [`Commands`] so that
/// operation entities can be spawned and wired together before being flushed.
pub struct Builder<'w, 's, 'a> {
    /// The scope (and its cancel-finishing endpoint) this builder targets.
    pub(crate) context: BuilderScopeContext,
    /// Command queue used to spawn operation entities and defer connections.
    pub(crate) commands: &'a mut Commands<'w, 's>,
}
/// Identifies the scope that a [`Builder`] is building within.
#[derive(Clone, Copy, Debug)]
pub struct BuilderScopeContext {
    /// Entity of the scope operation itself.
    pub(crate) scope: Entity,
    /// Endpoint produced by [`OperateScope::add`] when the scope was created.
    /// NOTE(review): presumably handles wrapping up cancellation workflows —
    /// confirm against the `OperateScope` implementation.
    pub(crate) finish_scope_cancel: Entity,
}
impl<'w, 's, 'a> Builder<'w, 's, 'a> {
    /// Begin building a chain of operations that continues off of an existing
    /// [`Output`]. Delegates to [`Output::chain`].
    pub fn chain<'b, Response: 'static + Send + Sync>(
        &'b mut self,
        output: Output<Response>,
    ) -> Chain<'w, 's, 'a, 'b, Response> {
        output.chain(self)
    }
    /// Create a [`Node`] for any [`Provider`] inside this builder's scope.
    ///
    /// Spawns a source entity and an `UnusedTarget` target entity, connects
    /// the provider between them, then spawns the provider's stream targets.
    /// Note the ordering: the provider is connected *before* the stream
    /// targets are spawned, and the populated [`StreamTargetMap`] is inserted
    /// onto the source entity afterward.
    pub fn create_node<P: Provider>(
        &mut self,
        provider: P,
    ) -> Node<P::Request, P::Response, P::Streams>
    where
        P::Request: 'static + Send + Sync,
        P::Response: 'static + Send + Sync,
        P::Streams: StreamPack,
    {
        let source = self.commands.spawn(()).id();
        let target = self.commands.spawn(UnusedTarget).id();
        provider.connect(Some(self.scope()), source, target, self.commands);
        let mut map = StreamTargetMap::default();
        let streams = <P::Streams as StreamPack>::spawn_node_streams(source, &mut map, self);
        self.commands.entity(source).insert(map);
        Node {
            input: InputSlot::new(self.scope(), source),
            output: Output::new(self.scope(), target),
            streams,
        }
    }
pub fn create_map_block<T, U>(
&mut self,
f: impl FnMut(T) -> U + 'static + Send + Sync,
) -> Node<T, U, ()>
where
T: 'static + Send + Sync,
U: 'static + Send + Sync,
{
self.create_node(f.into_blocking_map())
}
pub fn create_map_async<T, Task>(
&mut self,
f: impl FnMut(T) -> Task + 'static + Send + Sync,
) -> Node<T, Task::Output, ()>
where
T: 'static + Send + Sync,
Task: Future + 'static + Sendish,
Task::Output: 'static + Send + Sync,
{
self.create_node(f.into_async_map())
}
pub fn create_map<M, F: AsMap<M>>(
&mut self,
f: F,
) -> Node<RequestOfMap<M, F>, ResponseOfMap<M, F>, StreamsOfMap<M, F>>
where
F::MapType: Provider,
RequestOfMap<M, F>: 'static + Send + Sync,
ResponseOfMap<M, F>: 'static + Send + Sync,
StreamsOfMap<M, F>: StreamPack,
{
self.create_node(f.as_map())
}
pub fn create_injection_node<Request, Response, Streams>(
&mut self,
) -> Node<(Request, ServiceInstructions<Request, Response, Streams>), Response, Streams>
where
Request: 'static + Send + Sync,
Response: 'static + Send + Sync + Unpin,
Streams: StreamPack,
{
let source = self.commands.spawn(()).id();
self.create_injection_impl::<Request, Response, Streams>(source)
}
pub fn connect<T: 'static + Send + Sync>(&mut self, output: Output<T>, input: InputSlot<T>) {
assert_eq!(output.scope(), input.scope());
self.commands.queue(Connect {
original_target: output.id(),
new_target: input.id(),
});
}
pub fn create_buffer<T: 'static + Send + Sync>(
&mut self,
settings: BufferSettings,
) -> Buffer<T> {
let source = self.commands.spawn(()).id();
self.commands.queue(AddOperation::new(
Some(self.scope()),
source,
OperateBuffer::<T>::new(settings),
));
Buffer {
location: BufferLocation {
scope: self.scope(),
source,
},
_ignore: Default::default(),
}
}
pub fn create_scope<Request, Response, Streams, Settings>(
&mut self,
build: impl FnOnce(Scope<Request, Response, Streams>, &mut Builder) -> Settings,
) -> Node<Request, Response, Streams>
where
Request: 'static + Send + Sync,
Response: 'static + Send + Sync,
Streams: StreamPack,
Settings: Into<ScopeSettings>,
{
let scope_id = self.commands.spawn(()).id();
let exit_scope = self.commands.spawn(UnusedTarget).id();
self.create_scope_impl(scope_id, exit_scope, build)
}
    /// Convenience wrapper around [`Self::create_scope`] for a scope with no
    /// streams. The turbofish pins `Streams = ()` so callers need not name it.
    pub fn create_io_scope<Request, Response, Settings>(
        &mut self,
        build: impl FnOnce(Scope<Request, Response, ()>, &mut Builder) -> Settings,
    ) -> Node<Request, Response, ()>
    where
        Request: 'static + Send + Sync,
        Response: 'static + Send + Sync,
        Settings: Into<ScopeSettings>,
    {
        self.create_scope::<Request, Response, (), Settings>(build)
    }
pub fn create_fork_clone<T>(&mut self) -> (InputSlot<T>, ForkCloneOutput<T>)
where
T: Clone + 'static + Send + Sync,
{
let source = self.commands.spawn(()).id();
self.commands.queue(AddOperation::new(
Some(self.scope()),
source,
ForkClone::<T>::new(ForkTargetStorage::new()),
));
(
InputSlot::new(self.scope(), source),
ForkCloneOutput::new(self.scope(), source),
)
}
    /// Create an unzip operation: a tuple input whose elements are emitted on
    /// separate outputs.
    ///
    /// NOTE(review): no `AddOperation` is queued here — presumably
    /// `T::unzip_output` registers the operation itself; confirm against the
    /// `Unzippable` implementations.
    pub fn create_unzip<T>(&mut self) -> (InputSlot<T>, T::Unzipped)
    where
        T: Unzippable + 'static + Send + Sync,
    {
        let source = self.commands.spawn(()).id();
        (
            InputSlot::new(self.scope(), source),
            T::unzip_output(Output::<T>::new(self.scope(), source), self),
        )
    }
pub fn create_fork_result<T, E>(&mut self) -> (InputSlot<Result<T, E>>, ForkResultOutput<T, E>)
where
T: 'static + Send + Sync,
E: 'static + Send + Sync,
{
let source = self.commands.spawn(()).id();
let target_ok = self.commands.spawn(UnusedTarget).id();
let target_err = self.commands.spawn(UnusedTarget).id();
self.commands.queue(AddOperation::new(
Some(self.scope()),
source,
make_result_branching::<T, E>(ForkTargetStorage::from_iter([target_ok, target_err])),
));
(
InputSlot::new(self.scope(), source),
ForkResultOutput {
ok: Output::new(self.scope(), target_ok),
err: Output::new(self.scope(), target_err),
},
)
}
pub fn create_fork_option<T>(&mut self) -> (InputSlot<Option<T>>, ForkOptionOutput<T>)
where
T: 'static + Send + Sync,
{
let source = self.commands.spawn(()).id();
let target_some = self.commands.spawn(UnusedTarget).id();
let target_none = self.commands.spawn(UnusedTarget).id();
self.commands.queue(AddOperation::new(
Some(self.scope()),
source,
make_option_branching::<T>(ForkTargetStorage::from_iter([target_some, target_none])),
));
(
InputSlot::new(self.scope(), source),
ForkOptionOutput {
some: Output::new(self.scope(), target_some),
none: Output::new(self.scope(), target_none),
},
)
}
    /// Join a set of buffers, continuing the chain once every buffer can
    /// supply a value. Delegates to [`Joinable::join`].
    pub fn join<'b, B: Joinable>(&'b mut self, buffers: B) -> Chain<'w, 's, 'a, 'b, B::Item> {
        buffers.join(self)
    }
    /// Fallible variant of [`Self::join`] for a dynamic [`BufferMap`];
    /// fails with [`IncompatibleLayout`] if the map does not match `J`.
    pub fn try_join<'b, J: Joined>(
        &'b mut self,
        buffers: &BufferMap,
    ) -> Result<Chain<'w, 's, 'a, 'b, J>, IncompatibleLayout> {
        J::try_join_from(buffers, self)
    }
    /// Listen to a set of buffers, continuing the chain with access keys
    /// whenever the buffers change. Delegates to [`Accessible::listen`].
    pub fn listen<'b, B: Accessible>(&'b mut self, buffers: B) -> Chain<'w, 's, 'a, 'b, B::Keys> {
        buffers.listen(self)
    }
    /// Fallible variant of [`Self::listen`] for a dynamic [`BufferMap`];
    /// fails with [`IncompatibleLayout`] if the map does not match `Keys`.
    pub fn try_listen<'b, Keys: Accessor>(
        &'b mut self,
        buffers: &BufferMap,
    ) -> Result<Chain<'w, 's, 'a, 'b, Keys>, IncompatibleLayout> {
        Keys::try_listen_from(buffers, self)
    }
pub fn create_buffer_access<T, B: Bufferable>(
&mut self,
buffers: B,
) -> Node<T, (T, BufferKeys<B>)>
where
B::BufferType: Accessing,
T: 'static + Send + Sync,
{
let buffers = buffers.into_buffer(self);
buffers.access(self)
}
    /// Fallible variant of [`Self::create_buffer_access`] for a dynamic
    /// [`BufferMap`].
    pub fn try_create_buffer_access<T, Keys: Accessor>(
        &mut self,
        buffers: &BufferMap,
    ) -> Result<Node<T, (T, Keys)>, IncompatibleLayout>
    where
        T: 'static + Send + Sync,
    {
        Keys::try_buffer_access(buffers, self)
    }
pub fn create_collect<T, const N: usize>(
&mut self,
min: usize,
max: Option<usize>,
) -> Node<T, SmallVec<[T; N]>>
where
T: 'static + Send + Sync,
{
if let Some(max) = max {
assert!(0 < max);
assert!(min <= max);
}
let source = self.commands.spawn(()).id();
let target = self.commands.spawn(UnusedTarget).id();
self.commands.queue(AddOperation::new(
Some(self.scope()),
source,
Collect::<T, N>::new(target, min, max),
));
Node {
input: InputSlot::new(self.scope(), source),
output: Output::new(self.scope(), target),
streams: (),
}
}
    /// Collect with no lower or upper bound on the number of values.
    pub fn create_collect_all<T, const N: usize>(&mut self) -> Node<T, SmallVec<[T; N]>>
    where
        T: 'static + Send + Sync,
    {
        self.create_collect(0, None)
    }
    /// Collect exactly `n` values before emitting.
    pub fn create_collect_n<T, const N: usize>(&mut self, n: usize) -> Node<T, SmallVec<[T; N]>>
    where
        T: 'static + Send + Sync,
    {
        self.create_collect(n, Some(n))
    }
pub fn create_split<T>(&mut self) -> (InputSlot<T>, SplitOutputs<T>)
where
T: 'static + Send + Sync + Splittable,
{
let source = self.commands.spawn(()).id();
self.commands.queue(AddOperation::new(
Some(self.scope()),
source,
OperateSplit::<T>::default(),
));
(
InputSlot::new(self.scope(), source),
SplitOutputs::new(self.scope(), source),
)
}
pub fn create_cancel<T>(&mut self) -> InputSlot<T>
where
T: 'static + Send + Sync + ToString,
{
let source = self.commands.spawn(()).id();
self.commands.queue(AddOperation::new(
Some(self.scope()),
source,
OperateCancel::<T>::new(),
));
InputSlot::new(self.scope(), source)
}
pub fn create_quiet_cancel(&mut self) -> InputSlot<()> {
let source = self.commands.spawn(()).id();
self.commands.queue(AddOperation::new(
Some(self.scope()),
source,
OperateQuietCancel,
));
InputSlot::new(self.scope(), source)
}
pub fn on_cleanup<B, Settings>(
&mut self,
from_buffers: B,
build: impl FnOnce(Scope<BufferKeys<B>, (), ()>, &mut Builder) -> Settings,
) where
B: Bufferable,
B::BufferType: Accessing,
Settings: Into<ScopeSettings>,
{
from_buffers.into_buffer(self).on_cleanup(self, build);
}
pub fn on_cancel<B, Settings>(
&mut self,
from_buffers: B,
build: impl FnOnce(Scope<BufferKeys<B>, (), ()>, &mut Builder) -> Settings,
) where
B: Bufferable,
B::BufferType: Accessing,
Settings: Into<ScopeSettings>,
{
from_buffers.into_buffer(self).on_cancel(self, build);
}
pub fn on_terminate<B, Settings>(
&mut self,
from_buffers: B,
build: impl FnOnce(Scope<BufferKeys<B>, (), ()>, &mut Builder) -> Settings,
) where
B: Bufferable,
B::BufferType: Accessing,
Settings: Into<ScopeSettings>,
{
from_buffers.into_buffer(self).on_terminate(self, build);
}
pub fn on_cleanup_if<B, Settings>(
&mut self,
conditions: CleanupWorkflowConditions,
from_buffers: B,
build: impl FnOnce(Scope<BufferKeys<B>, (), ()>, &mut Builder) -> Settings,
) where
B: Bufferable,
B::BufferType: Accessing,
Settings: Into<ScopeSettings>,
{
from_buffers
.into_buffer(self)
.on_cleanup_if(self, conditions, build);
}
pub fn create_trim<T>(&mut self, branches: impl IntoIterator<Item = TrimBranch>) -> Node<T, T>
where
T: 'static + Send + Sync,
{
let branches: SmallVec<[_; 16]> = branches.into_iter().collect();
for branch in &branches {
branch.verify_scope(self.scope());
}
let source = self.commands.spawn(()).id();
let target = self.commands.spawn(UnusedTarget).id();
self.commands.queue(AddOperation::new(
Some(self.scope()),
source,
Trim::<T>::new(branches, target),
));
Node {
input: InputSlot::new(self.scope(), source),
output: Output::new(self.scope(), target),
streams: (),
}
}
pub fn create_gate<T, B>(&mut self, buffers: B) -> Node<GateRequest<T>, T>
where
B: Bufferable,
T: 'static + Send + Sync,
{
let buffers = buffers.into_buffer(self);
buffers.verify_scope(self.scope());
let source = self.commands.spawn(()).id();
let target = self.commands.spawn(UnusedTarget).id();
self.commands.queue(AddOperation::new(
Some(self.scope()),
source,
OperateDynamicGate::<T, _>::new(buffers, target),
));
Node {
input: InputSlot::new(self.scope(), source),
output: Output::new(self.scope(), target),
streams: (),
}
}
pub fn create_gate_action<T, B>(&mut self, action: Gate, buffers: B) -> Node<T, T>
where
B: Bufferable,
T: 'static + Send + Sync,
{
let buffers = buffers.into_buffer(self);
buffers.verify_scope(self.scope());
let source = self.commands.spawn(()).id();
let target = self.commands.spawn(UnusedTarget).id();
self.commands.queue(AddOperation::new(
Some(self.scope()),
source,
OperateStaticGate::<T, _>::new(buffers, target, action),
));
Node {
input: InputSlot::new(self.scope(), source),
output: Output::new(self.scope(), target),
streams: (),
}
}
    /// Shorthand for [`Self::create_gate_action`] with [`Gate::Open`].
    ///
    /// NOTE(review): the type parameters here are ordered `<B, T>`, unlike
    /// `create_gate_close`/`create_gate_action` which use `<T, B>`. Changing
    /// the order now would break turbofish callers, so it is only flagged.
    pub fn create_gate_open<B, T>(&mut self, buffers: B) -> Node<T, T>
    where
        B: Bufferable,
        T: 'static + Send + Sync,
    {
        self.create_gate_action(Gate::Open, buffers)
    }
    /// Shorthand for [`Self::create_gate_action`] with [`Gate::Closed`].
    pub fn create_gate_close<T, B>(&mut self, buffers: B) -> Node<T, T>
    where
        B: Bufferable,
        T: 'static + Send + Sync,
    {
        self.create_gate_action(Gate::Closed, buffers)
    }
    /// The entity of the scope that this builder is building within.
    pub fn scope(&self) -> Entity {
        self.context.scope
    }
    /// Borrow the underlying [`Commands`] for custom entity manipulation.
    pub fn commands(&mut self) -> &mut Commands<'w, 's> {
        self.commands
    }
    /// Shared implementation for creating a child scope on pre-spawned
    /// `scope_id` / `exit_scope` entities.
    ///
    /// Order matters here: the scope operation is registered first (yielding
    /// its terminal, entry, and cancel-finish endpoints), then the scope's
    /// streams are spawned, then the user's `build` closure runs with a
    /// nested [`Builder`] whose context points at the new scope, and finally
    /// the resulting [`ScopeSettings`] are stored on the scope entity.
    pub(crate) fn create_scope_impl<Request, Response, Streams, Settings>(
        &mut self,
        scope_id: Entity,
        exit_scope: Entity,
        build: impl FnOnce(Scope<Request, Response, Streams>, &mut Builder) -> Settings,
    ) -> Node<Request, Response, Streams>
    where
        Request: 'static + Send + Sync,
        Response: 'static + Send + Sync,
        Streams: StreamPack,
        Settings: Into<ScopeSettings>,
    {
        let ScopeEndpoints {
            terminal,
            enter_scope,
            finish_scope_cancel,
        } = OperateScope::add::<Request, Response>(
            Some(self.scope()),
            scope_id,
            Some(exit_scope),
            self.commands,
        );
        let (stream_in, stream_out) =
            Streams::spawn_scope_streams(scope_id, self.scope(), self.commands);
        // The nested builder operates inside the new scope, not the parent.
        let mut builder = Builder {
            context: BuilderScopeContext {
                scope: scope_id,
                finish_scope_cancel,
            },
            commands: self.commands,
        };
        let scope = Scope {
            start: Output::new(scope_id, enter_scope),
            terminate: InputSlot::new(scope_id, terminal),
            streams: stream_in,
        };
        let settings = build(scope, &mut builder).into();
        self.commands
            .entity(scope_id)
            .insert(ScopeSettingsStorage(settings));
        // The node exposed to the parent scope uses the scope entity as its
        // input and the exit entity as its output.
        Node {
            input: InputSlot::new(self.scope(), scope_id),
            output: Output::new(self.scope(), exit_scope),
            streams: stream_out,
        }
    }
    /// Shared implementation for creating an injection node on a pre-spawned
    /// `source` entity: spawns stream targets, inserts the stream map on the
    /// source, then registers the [`Injection`] operation.
    pub(crate) fn create_injection_impl<Request, Response, Streams>(
        &mut self,
        source: Entity,
    ) -> Node<(Request, ServiceInstructions<Request, Response, Streams>), Response, Streams>
    where
        Request: 'static + Send + Sync,
        Response: 'static + Send + Sync,
        Streams: StreamPack,
    {
        let target = self.commands.spawn(UnusedTarget).id();
        let mut map = StreamTargetMap::default();
        let streams = Streams::spawn_node_streams(source, &mut map, self);
        self.commands.entity(source).insert(map);
        self.commands.queue(AddOperation::new(
            Some(self.scope()),
            source,
            Injection::<Request, Response, Streams>::new(target),
        ));
        Node {
            input: InputSlot::new(self.scope(), source),
            output: Output::new(self.scope(), target),
            streams,
        }
    }
    /// Get a copy of this builder's scope context.
    pub fn context(&self) -> BuilderScopeContext {
        self.context
    }
}
/// Selects which scope outcomes should trigger a cleanup workflow registered
/// via [`Builder::on_cleanup_if`].
#[derive(Clone)]
pub struct CleanupWorkflowConditions {
    /// Run the cleanup workflow when the scope terminates successfully.
    pub(crate) run_on_terminate: bool,
    /// Run the cleanup workflow when the scope is cancelled.
    pub(crate) run_on_cancel: bool,
}
impl CleanupWorkflowConditions {
pub fn always_if(run_on_terminate: bool, run_on_cancel: bool) -> Self {
CleanupWorkflowConditions {
run_on_terminate,
run_on_cancel,
}
}
}
#[cfg(test)]
mod tests {
use crate::{CancellationCause, prelude::*, testing::*};
use smallvec::SmallVec;
use std::time::Instant;
#[test]
fn test_disconnected_workflow() {
let mut context = TestingContext::minimal_plugins();
let workflow = context.spawn_io_workflow(|_, _| {
});
check_unreachable(workflow, 1, &mut context);
check_unreachable(workflow, 1, &mut context);
check_unreachable(workflow, 1, &mut context);
let workflow = context.spawn_io_workflow(|scope, builder| {
let node = builder.create_map_block(|v| v);
builder.connect(scope.start, node.input);
builder.connect(node.output, node.input);
});
check_unreachable(workflow, 1, &mut context);
check_unreachable(workflow, 1, &mut context);
check_unreachable(workflow, 1, &mut context);
let workflow = context.spawn_io_workflow(|scope, builder| {
builder.chain(scope.start).map_block(|v| v).fork_clone((
|chain: Chain<()>| chain.map_block(|v| v).map_block(|v| v).unused(),
|chain: Chain<()>| chain.map_block(|v| v).map_block(|v| v).unused(),
|chain: Chain<()>| chain.map_block(|v| v).map_block(|v| v).unused(),
));
let exit_node = builder.create_map_block(|v| v);
builder.connect(exit_node.output, scope.terminate);
});
check_unreachable(workflow, 1, &mut context);
check_unreachable(workflow, 1, &mut context);
check_unreachable(workflow, 1, &mut context);
let workflow = context.spawn_io_workflow(|scope, builder| {
let entry_buffer = builder.create_buffer::<()>(BufferSettings::keep_all());
builder.chain(scope.start).map_block(|v| v).fork_clone((
|chain: Chain<()>| chain.map_block(|v| v).connect(entry_buffer.input_slot()),
|chain: Chain<()>| chain.map_block(|v| v).connect(entry_buffer.input_slot()),
|chain: Chain<()>| chain.map_block(|v| v).connect(entry_buffer.input_slot()),
));
let exit_buffer = builder.create_buffer::<()>(BufferSettings::keep_all());
builder
.listen(exit_buffer)
.map_block(|_| ())
.connect(scope.terminate);
});
check_unreachable(workflow, 1, &mut context);
check_unreachable(workflow, 1, &mut context);
check_unreachable(workflow, 1, &mut context);
}
    // Assert that running the workflow fails with an Unreachable cancellation.
    fn check_unreachable(
        workflow: Service<(), ()>,
        flush_cycles: usize,
        context: &mut TestingContext,
    ) {
        let r = context.try_resolve_request((), workflow, flush_cycles);
        assert!(matches!(
            *r.unwrap_err().cause,
            CancellationCause::Unreachable(_)
        ));
    }
#[test]
fn test_fork_clone() {
let mut context = TestingContext::minimal_plugins();
let workflow = context.spawn_io_workflow(|scope, builder| {
let fork = scope.start.fork_clone(builder);
let branch_a = fork.clone_output(builder);
let branch_b = fork.clone_output(builder);
builder.connect(branch_a, scope.terminate);
builder.connect(branch_b, scope.terminate);
});
let r = context.resolve_request(5.0, workflow);
assert_eq!(r, 5.0);
let workflow = context.spawn_io_workflow(|scope, builder| {
builder.chain(scope.start).fork_clone((
|chain: Chain<f64>| chain.connect(scope.terminate),
|chain: Chain<f64>| chain.connect(scope.terminate),
));
});
let r = context.resolve_request(3.0, workflow);
assert_eq!(r, 3.0);
let workflow = context.spawn_io_workflow(|scope, builder| {
builder.chain(scope.start).fork_clone((
|chain: Chain<f64>| {
chain
.map_block(|t| WaitRequest {
duration: Duration::from_secs_f64(10.0 * t),
value: 10.0 * t,
})
.map(|r: AsyncMap<WaitRequest<f64>>| wait(r.request))
.connect(scope.terminate)
},
|chain: Chain<f64>| {
chain
.map_block(|t| WaitRequest {
duration: Duration::from_secs_f64(t / 100.0),
value: t / 100.0,
})
.map(|r: AsyncMap<WaitRequest<f64>>| wait(r.request))
.connect(scope.terminate)
},
));
});
let r = context.resolve_request(1.0, workflow);
assert_eq!(r, 0.01);
let workflow = context.spawn_io_workflow(|scope, builder| {
let (fork_input, fork_output) = builder.create_fork_clone();
builder.connect(scope.start, fork_input);
let a = fork_output.clone_output(builder);
let b = fork_output.clone_output(builder);
builder.join((a, b)).connect(scope.terminate);
});
let r = context.resolve_request(5, workflow);
assert_eq!(r, (5, 5));
}
#[test]
fn test_stream_reachability() {
let mut context = TestingContext::minimal_plugins();
let workflow = context.spawn_io_workflow(|scope, builder| {
let stream_node = builder.create_map(|_: BlockingMap<(), StreamOf<u32>>| {
});
builder.connect(scope.start, stream_node.input);
builder
.chain(stream_node.streams)
.map_block(|value| 2 * value)
.connect(scope.terminate);
});
let r = context.try_resolve_request((), workflow, ());
assert!(r.is_err());
let workflow = context.spawn_io_workflow(|scope, builder| {
let stream_node = builder.create_map(|_: AsyncMap<(), StreamOf<u32>>| {
async { }
});
builder.connect(scope.start, stream_node.input);
builder
.chain(stream_node.streams)
.map_block(|value| 2 * value)
.connect(scope.terminate);
});
let r = context.try_resolve_request((), workflow, ());
assert!(r.is_err());
}
use tokio::sync::mpsc::unbounded_channel;
#[test]
fn test_on_cleanup() {
let mut context = TestingContext::minimal_plugins();
let (sender, mut receiver) = unbounded_channel();
let workflow = context.spawn_io_workflow(|scope, builder| {
let input = scope.start.fork_clone(builder);
let buffer = builder.create_buffer(BufferSettings::default());
let input_to_buffer = input.clone_output(builder);
builder.connect(input_to_buffer, buffer.input_slot());
let none_node = builder.create_map_block(produce_none);
let input_to_node = input.clone_output(builder);
builder.connect(input_to_node, none_node.input);
builder
.chain(none_node.output)
.cancel_on_none()
.connect(scope.terminate);
builder.on_cancel(buffer, |scope, builder| {
builder
.chain(scope.start)
.consume_buffer::<8>()
.map_block(move |values| {
for value in values {
sender.send(value).unwrap();
}
})
.connect(scope.terminate);
});
});
let r = context.try_resolve_request(5, workflow, ());
assert!(r.is_err());
let channel_output = receiver.try_recv().unwrap();
assert_eq!(channel_output, 5);
assert!(receiver.try_recv().is_err());
assert!(context.confirm_buffers_empty().is_ok());
let (cancel_sender, mut cancel_receiver) = unbounded_channel();
let (terminate_sender, mut terminate_receiver) = unbounded_channel();
let (cleanup_sender, mut cleanup_receiver) = unbounded_channel();
let workflow = context.spawn_io_workflow(|scope, builder| {
let input = scope.start.fork_clone(builder);
let cancel_buffer = builder.create_buffer(BufferSettings::default());
let input_to_cancel = input.clone_output(builder);
builder.connect(input_to_cancel, cancel_buffer.input_slot());
let terminate_buffer = builder.create_buffer(BufferSettings::default());
let input_to_terminate = input.clone_output(builder);
builder.connect(input_to_terminate, terminate_buffer.input_slot());
let cleanup_buffer = builder.create_buffer(BufferSettings::default());
let input_to_cleanup = input.clone_output(builder);
builder.connect(input_to_cleanup, cleanup_buffer.input_slot());
let filter_node =
builder.create_map_block(|value: u64| if value >= 5 { Some(value) } else { None });
let input_to_filter_node = input.clone_output(builder);
builder.connect(input_to_filter_node, filter_node.input);
builder
.chain(filter_node.output)
.cancel_on_none()
.connect(scope.terminate);
builder.on_cancel(cancel_buffer, |scope, builder| {
builder
.chain(scope.start)
.consume_buffer::<8>()
.map_block(move |values| {
for value in values {
cancel_sender.send(value).unwrap();
}
})
.connect(scope.terminate);
});
builder.on_terminate(terminate_buffer, |scope, builder| {
builder
.chain(scope.start)
.consume_buffer::<8>()
.map_block(move |values| {
for value in values {
terminate_sender.send(value).unwrap();
}
})
.connect(scope.terminate);
});
builder.on_cleanup(cleanup_buffer, |scope, builder| {
builder
.chain(scope.start)
.consume_buffer::<8>()
.map_block(move |values| {
for value in values {
cleanup_sender.send(value).unwrap();
}
})
.connect(scope.terminate);
});
});
let r = context.try_resolve_request(3, workflow, 10);
assert!(r.is_err());
assert_eq!(cancel_receiver.try_recv().unwrap(), 3);
assert!(cancel_receiver.try_recv().is_err());
assert_eq!(cleanup_receiver.try_recv().unwrap(), 3);
assert!(cleanup_receiver.try_recv().is_err());
assert!(terminate_receiver.try_recv().is_err());
assert!(context.no_unhandled_errors());
assert!(context.confirm_buffers_empty().is_ok());
let r = context.try_resolve_request(6, workflow, 10).unwrap();
assert_eq!(r, 6);
assert_eq!(terminate_receiver.try_recv().unwrap(), 6);
assert!(terminate_receiver.try_recv().is_err());
assert_eq!(cleanup_receiver.try_recv().unwrap(), 6);
assert!(cleanup_receiver.try_recv().is_err());
assert!(cancel_receiver.try_recv().is_err());
assert!(context.no_unhandled_errors());
assert!(context.confirm_buffers_empty().is_ok());
}
#[test]
fn test_double_collection() {
let mut context = TestingContext::minimal_plugins();
let delay = context.spawn_delay(Duration::from_secs_f32(0.01));
let workflow = context.spawn_io_workflow(|scope, builder| {
let later_collect = builder.create_collect_all::<i32, 8>();
let earlier_collect = builder.create_collect_all::<i32, 8>();
builder
.chain(scope.start)
.spread()
.then(delay)
.map_block(|v| if v <= 4 { Some(v) } else { None })
.dispose_on_none()
.connect(earlier_collect.input);
builder
.chain(earlier_collect.output)
.spread()
.connect(later_collect.input);
builder.connect(later_collect.output, scope.terminate);
});
let r = context.resolve_request([1, 2, 3, 4, 5], workflow);
assert_eq!(r.as_slice(), &[1, 2, 3, 4]);
let workflow = context.spawn_io_workflow(|scope, builder| {
let earlier_collect = builder.create_collect_all::<i32, 8>();
let later_collect = builder.create_collect_all::<i32, 8>();
builder
.chain(scope.start)
.spread()
.then(delay)
.connect(earlier_collect.input);
builder
.chain(earlier_collect.output)
.spread()
.connect(later_collect.input);
builder.chain(later_collect.output).spread().fork_clone((
|chain: Chain<i32>| chain.connect(earlier_collect.input),
|chain: Chain<i32>| chain.connect(scope.terminate),
));
});
let r = context.try_resolve_request([1, 2, 3, 4, 5], workflow, ());
assert!(r.is_err());
let workflow = context.spawn_io_workflow(|scope, builder| {
let earlier_collect = builder.create_collect_all::<i32, 8>();
builder
.chain(scope.start)
.spread()
.then(delay)
.map_block(|v| if v <= 4 { Some(v) } else { None })
.dispose_on_none()
.connect(earlier_collect.input);
let _ = builder
.chain(earlier_collect.output)
.then_io_scope(|scope, builder| {
builder
.chain(scope.start)
.spread()
.collect_all::<8>()
.connect(scope.terminate);
})
.fork_clone((
|chain: Chain<_>| chain.spread().connect(earlier_collect.input),
|chain: Chain<_>| chain.connect(scope.terminate),
));
});
check_collections(workflow, [1, 2, 3, 4], [1, 2, 3, 4], &mut context);
check_collections(workflow, [1, 2, 3, 4, 5, 6], [1, 2, 3, 4], &mut context);
check_collections(workflow, [1, 8, 2, 7, 3, 6], [1, 2, 3], &mut context);
check_collections(
workflow,
[8, 7, 6, 5, 4, 3, 2, 1],
[4, 3, 2, 1],
&mut context,
);
check_collections(workflow, [6, 7, 8, 9, 10], [], &mut context);
}
    // Run the collection workflow on `input` and assert it yields exactly
    // `expectation`.
    fn check_collections(
        workflow: Service<SmallVec<[i32; 8]>, SmallVec<[i32; 8]>>,
        input: impl IntoIterator<Item = i32>,
        expectation: impl IntoIterator<Item = i32>,
        context: &mut TestingContext,
    ) {
        let input: SmallVec<[i32; 8]> = SmallVec::from_iter(input);
        let expectation: SmallVec<[i32; 8]> = SmallVec::from_iter(expectation);
        let r = context.resolve_request(input, workflow);
        assert_eq!(r, expectation);
    }
#[test]
fn benchmarks() {
let mut context = TestingContext::minimal_plugins();
let workflow = context.spawn_io_workflow(|scope, builder| {
let (start_test, end_test) = build_benchmark_fixture(scope, builder);
builder.connect(start_test, end_test);
});
let result = context
.try_resolve_request((), workflow, Duration::from_secs(10))
.unwrap();
println!("Performance for basic connection:\n{result:#?}");
let workflow = context.spawn_io_workflow(|scope, builder| {
let (start_test, end_test) = build_benchmark_fixture(scope, builder);
builder
.chain(start_test)
.then_io_scope(|scope, builder| {
builder.connect(scope.start, scope.terminate);
})
.connect(end_test);
});
let result = context
.try_resolve_request((), workflow, Duration::from_secs(10))
.unwrap();
println!("Performance for basic connection:\n{result:#?}");
}
    // Build the timing harness: the caller wires the returned output (start
    // timestamp) to the returned input (finish timestamp sink). Samples
    // accumulate in a buffer; once collect_samples yields stats, the workflow
    // terminates. A second listener loops back to start the next sample.
    fn build_benchmark_fixture(
        scope: Scope<(), TimeStats>,
        builder: &mut Builder,
    ) -> (Output<Instant>, InputSlot<Instant>) {
        let initial_time = builder
            .commands()
            .spawn_service(get_initial_time.into_blocking_service());
        let finish_time = builder
            .commands()
            .spawn_service(finish_time_range.into_blocking_service());
        let collect_samples = builder
            .commands()
            .spawn_service(collect_samples.into_blocking_service());
        let samples = builder.create_buffer(BufferSettings::keep_all());
        let initial_node = builder.create_node(initial_time);
        let finish_node = builder.create_node(finish_time);
        builder.connect(scope.start, initial_node.input);
        builder.connect(finish_node.output, samples.input_slot());
        builder
            .listen(samples)
            .then(collect_samples)
            .dispose_on_none()
            .connect(scope.terminate);
        builder
            .listen(samples)
            .map_block(|_| ())
            .connect(initial_node.input);
        (initial_node.output, finish_node.input)
    }
    /// One benchmark sample: when the measured span started and ended.
    #[derive(Debug, Clone, Copy)]
    struct TimeRange {
        initial_time: Instant,
        finish_time: Instant,
    }
    /// Summary statistics over a set of benchmark samples. Fields are only
    /// read via the Debug printout, hence the allow(unused).
    #[derive(Debug, Clone, Copy)]
    struct TimeStats {
        #[allow(unused)]
        sample_count: usize,
        #[allow(unused)]
        average: Duration,
        #[allow(unused)]
        std_dev: Duration,
        #[allow(unused)]
        highest: Duration,
        #[allow(unused)]
        lowest: Duration,
    }
impl TimeStats {
fn new(samples: impl IntoIterator<Item = TimeRange>) -> Self {
let samples: Vec<_> = samples
.into_iter()
.map(|s| s.finish_time - s.initial_time)
.collect();
let sample_count = samples.len();
let mut highest = None;
let mut lowest = None;
let mut average = Duration::new(0, 0);
for sample in &samples {
average += *sample;
if highest.is_none_or(|h| *sample > h) {
highest = Some(*sample);
}
if lowest.is_none_or(|l| *sample < l) {
lowest = Some(*sample);
}
}
let average = average / sample_count as u32;
let mut radicand = 0.0;
for sample in samples {
let delta = (sample.as_nanos() as i64 - average.as_nanos() as i64) as f64;
radicand += f64::powf(delta, 2.0);
}
let std_dev = Duration::from_nanos(f64::sqrt(radicand) as u64);
let highest = highest.unwrap();
let lowest = lowest.unwrap();
TimeStats {
sample_count,
average,
std_dev,
highest,
lowest,
}
}
}
    // Blocking service: stamp the moment a benchmark sample begins.
    fn get_initial_time(_: In<()>) -> Instant {
        Instant::now()
    }
    // Blocking service: close out a sample by pairing its start time with now.
    fn finish_time_range(In(initial_time): In<Instant>) -> TimeRange {
        TimeRange {
            initial_time,
            finish_time: Instant::now(),
        }
    }
    // Blocking service: once at least 1000 samples have accumulated in the
    // buffer, produce the summary stats; otherwise yield None (disposed by
    // the caller's dispose_on_none).
    fn collect_samples(
        In(key): In<BufferKey<TimeRange>>,
        access: BufferAccess<TimeRange>,
    ) -> Option<TimeStats> {
        let samples = access.get(&key).unwrap();
        if samples.len() >= 1000 {
            Some(TimeStats::new(samples.iter().copied()))
        } else {
            None
        }
    }
}