use bevy_ecs::prelude::{Commands, Entity};
use bevy_hierarchy::prelude::BuildChildren;
use std::future::Future;
use smallvec::SmallVec;
use crate::{
AddOperation, AsMap, BeginCleanupWorkflow, Buffer, BufferItem, BufferKeys, BufferSettings,
Bufferable, Buffered, Chain, Collect, ForkClone, ForkCloneOutput, ForkTargetStorage, Gate,
GateRequest, Injection, InputSlot, IntoAsyncMap, IntoBlockingMap, Node, OperateBuffer,
OperateBufferAccess, OperateDynamicGate, OperateScope, OperateStaticGate, Output, Provider,
RequestOfMap, ResponseOfMap, Scope, ScopeEndpoints, ScopeSettings, ScopeSettingsStorage,
Sendish, Service, StreamPack, StreamTargetMap, StreamsOfMap, Trim, TrimBranch, UnusedTarget,
};
pub(crate) mod connect;
pub(crate) use connect::*;
/// Device for building a workflow. Passed into the closures that construct
/// workflows and scopes; every `create_*` method spawns entities and queues
/// deferred operations through the wrapped [`Commands`].
pub struct Builder<'w, 's, 'a> {
/// The scope entity that every operation created by this builder is
/// registered under (passed as `Some(self.scope)` to `AddOperation`).
pub(crate) scope: Entity,
/// Target entity that cleanup workflows exit into; used as the
/// `exit_scope` when `on_cleanup_if` builds its cleanup scope.
pub(crate) finish_scope_cancel: Entity,
/// The command queue used to spawn entities and defer operation setup.
pub(crate) commands: &'a mut Commands<'w, 's>,
}
// All methods here only queue entity spawns and `AddOperation` commands;
// nothing executes until the underlying `Commands` are flushed.
impl<'w, 's, 'a> Builder<'w, 's, 'a> {
/// Create a node for any [`Provider`] (e.g. a service, callback, or map).
/// Spawns a source entity and an [`UnusedTarget`] output entity, connects
/// the provider between them, and spawns the node's stream targets.
pub fn create_node<P: Provider>(
&mut self,
provider: P,
) -> Node<P::Request, P::Response, P::Streams>
where
P::Request: 'static + Send + Sync,
P::Response: 'static + Send + Sync,
P::Streams: StreamPack,
{
let source = self.commands.spawn(()).id();
let target = self.commands.spawn(UnusedTarget).id();
provider.connect(Some(self.scope), source, target, self.commands);
// Each stream output gets its own target entity, tracked in this map
// which is inserted onto the source entity alongside the stream bundle.
let mut map = StreamTargetMap::default();
let (bundle, streams) =
<P::Streams as StreamPack>::spawn_node_streams(source, &mut map, self);
self.commands.entity(source).insert((bundle, map));
Node {
input: InputSlot::new(self.scope, source),
output: Output::new(self.scope, target),
streams,
}
}
/// Create a node from a blocking closure: one input in, one output out,
/// no streams.
pub fn create_map_block<T, U>(
&mut self,
f: impl FnMut(T) -> U + 'static + Send + Sync,
) -> Node<T, U, ()>
where
T: 'static + Send + Sync,
U: 'static + Send + Sync,
{
self.create_node(f.into_blocking_map())
}
/// Create a node from a closure that returns a [`Future`]; the node's
/// output is the future's output.
pub fn create_map_async<T, Task>(
&mut self,
f: impl FnMut(T) -> Task + 'static + Send + Sync,
) -> Node<T, Task::Output, ()>
where
T: 'static + Send + Sync,
Task: Future + 'static + Sendish,
Task::Output: 'static + Send + Sync,
{
self.create_node(f.into_async_map())
}
/// Create a node from anything that implements [`AsMap`], inferring
/// whether it is a blocking or async map from its signature.
pub fn create_map<M, F: AsMap<M>>(
&mut self,
f: F,
) -> Node<RequestOfMap<M, F>, ResponseOfMap<M, F>, StreamsOfMap<M, F>>
where
F::MapType: Provider,
RequestOfMap<M, F>: 'static + Send + Sync,
ResponseOfMap<M, F>: 'static + Send + Sync,
StreamsOfMap<M, F>: StreamPack,
{
self.create_node(f.as_map())
}
/// Create a node that takes both a request and a [`Service`] as input and
/// injects the request into that service at runtime.
pub fn create_injection_node<Request, Response, Streams>(
&mut self,
) -> Node<(Request, Service<Request, Response, Streams>), Response, Streams>
where
Request: 'static + Send + Sync,
Response: 'static + Send + Sync + Unpin,
Streams: StreamPack,
{
let source = self.commands.spawn(()).id();
self.create_injection_impl::<Request, Response, Streams>(source)
}
/// Connect an [`Output`] to an [`InputSlot`] of the same message type.
///
/// # Panics
/// Panics if the output and input belong to different scopes.
pub fn connect<T: 'static + Send + Sync>(&mut self, output: Output<T>, input: InputSlot<T>) {
assert_eq!(output.scope(), input.scope());
// Deferred: the Connect command redirects the output's target entity
// to the input's entity when commands are flushed.
self.commands.add(Connect {
original_target: output.id(),
new_target: input.id(),
});
}
/// Create a buffer that stores items of type `T` inside this scope,
/// with retention behavior controlled by `settings`.
pub fn create_buffer<T: 'static + Send + Sync>(
&mut self,
settings: BufferSettings,
) -> Buffer<T> {
let source = self.commands.spawn(()).id();
self.commands.add(AddOperation::new(
Some(self.scope),
source,
OperateBuffer::<T>::new(settings),
));
Buffer {
scope: self.scope,
source,
_ignore: Default::default(),
}
}
/// Create a nested scope within the current scope. The `build` closure
/// receives the inner [`Scope`] and a [`Builder`] for its interior, and
/// returns the scope's settings.
pub fn create_scope<Request, Response, Streams, Settings>(
&mut self,
build: impl FnOnce(Scope<Request, Response, Streams>, &mut Builder) -> Settings,
) -> Node<Request, Response, Streams>
where
Request: 'static + Send + Sync,
Response: 'static + Send + Sync,
Streams: StreamPack,
Settings: Into<ScopeSettings>,
{
let scope_id = self.commands.spawn(()).id();
let exit_scope = self.commands.spawn(UnusedTarget).id();
self.create_scope_impl(scope_id, exit_scope, build)
}
/// Convenience wrapper around [`Self::create_scope`] for scopes with no
/// streams, which helps the compiler infer the stream pack type.
pub fn create_io_scope<Request, Response, Settings>(
&mut self,
build: impl FnOnce(Scope<Request, Response, ()>, &mut Builder) -> Settings,
) -> Node<Request, Response, ()>
where
Request: 'static + Send + Sync,
Response: 'static + Send + Sync,
Settings: Into<ScopeSettings>,
{
self.create_scope::<Request, Response, (), Settings>(build)
}
/// Create a fork-clone operation: anything sent to the returned
/// [`InputSlot`] is cloned into every output later requested from the
/// returned [`ForkCloneOutput`].
pub fn create_fork_clone<T>(&mut self) -> (InputSlot<T>, ForkCloneOutput<T>)
where
T: Clone + 'static + Send + Sync,
{
let source = self.commands.spawn(()).id();
self.commands.add(AddOperation::new(
Some(self.scope),
source,
ForkClone::<T>::new(ForkTargetStorage::new()),
));
// The same entity serves as both the input and the fork point.
(
InputSlot::new(self.scope, source),
ForkCloneOutput::new(self.scope, source),
)
}
/// Join one or more buffers: the resulting chain fires when every buffer
/// has an item available, producing the combined [`BufferItem`].
pub fn join<'b, B: Bufferable>(&'b mut self, buffers: B) -> Chain<'w, 's, 'a, 'b, BufferItem<B>>
where
B::BufferType: 'static + Send + Sync,
BufferItem<B>: 'static + Send + Sync,
{
buffers.join(self)
}
/// Listen to one or more buffers: the resulting chain fires whenever any
/// of the buffers changes, producing keys to access their contents.
pub fn listen<'b, B: Bufferable>(
&'b mut self,
buffers: B,
) -> Chain<'w, 's, 'a, 'b, BufferKeys<B>>
where
B::BufferType: 'static + Send + Sync,
BufferKeys<B>: 'static + Send + Sync,
{
buffers.listen(self)
}
/// Create a node that passes its input through while attaching keys that
/// grant access to the given buffers.
pub fn create_buffer_access<T, B>(&mut self, buffers: B) -> Node<T, (T, BufferKeys<B>)>
where
T: 'static + Send + Sync,
B: Bufferable,
B::BufferType: 'static + Send + Sync,
BufferKeys<B>: 'static + Send + Sync,
{
let buffers = buffers.into_buffer(self);
let source = self.commands.spawn(()).id();
let target = self.commands.spawn(UnusedTarget).id();
self.commands.add(AddOperation::new(
Some(self.scope),
source,
OperateBufferAccess::<T, B::BufferType>::new(buffers, target),
));
Node {
input: InputSlot::new(self.scope, source),
output: Output::new(self.scope, target),
streams: (),
}
}
/// Create a node that collects incoming items into a [`SmallVec`],
/// emitting once at least `min` and at most `max` items are gathered.
///
/// # Panics
/// Panics if `max` is `Some(0)` or if `min > max`.
pub fn create_collect<T, const N: usize>(
&mut self,
min: usize,
max: Option<usize>,
) -> Node<T, SmallVec<[T; N]>>
where
T: 'static + Send + Sync,
{
// Validate bounds up front; a zero or inverted window can never emit.
if let Some(max) = max {
assert!(0 < max);
assert!(min <= max);
}
let source = self.commands.spawn(()).id();
let target = self.commands.spawn(UnusedTarget).id();
self.commands.add(AddOperation::new(
Some(self.scope),
source,
Collect::<T, N>::new(target, min, max),
));
Node {
input: InputSlot::new(self.scope, source),
output: Output::new(self.scope, target),
streams: (),
}
}
/// Collect with no lower or upper bound (`min = 0`, `max = None`).
pub fn create_collect_all<T, const N: usize>(&mut self) -> Node<T, SmallVec<[T; N]>>
where
T: 'static + Send + Sync,
{
self.create_collect(0, None)
}
/// Collect exactly `n` items (`min = max = n`).
pub fn create_collect_n<T, const N: usize>(&mut self, n: usize) -> Node<T, SmallVec<[T; N]>>
where
T: 'static + Send + Sync,
{
self.create_collect(n, Some(n))
}
/// Register a cleanup workflow that runs whenever this scope winds down,
/// whether by termination or by cancellation (both flags true).
pub fn on_cleanup<B, Settings>(
&mut self,
from_buffers: B,
build: impl FnOnce(Scope<BufferKeys<B>, (), ()>, &mut Builder) -> Settings,
) where
B: Bufferable,
B::BufferType: 'static + Send + Sync,
BufferKeys<B>: 'static + Send + Sync,
Settings: Into<ScopeSettings>,
{
self.on_cleanup_if(
CleanupWorkflowConditions::always_if(true, true),
from_buffers,
build,
)
}
/// Register a cleanup workflow that runs only when this scope is
/// cancelled (run_on_terminate = false, run_on_cancel = true).
pub fn on_cancel<B, Settings>(
&mut self,
from_buffers: B,
build: impl FnOnce(Scope<BufferKeys<B>, (), ()>, &mut Builder) -> Settings,
) where
B: Bufferable,
B::BufferType: 'static + Send + Sync,
BufferKeys<B>: 'static + Send + Sync,
Settings: Into<ScopeSettings>,
{
self.on_cleanup_if(
CleanupWorkflowConditions::always_if(false, true),
from_buffers,
build,
)
}
/// Register a cleanup workflow that runs only when this scope terminates
/// normally (run_on_terminate = true, run_on_cancel = false).
pub fn on_terminate<B, Settings>(
&mut self,
from_buffers: B,
build: impl FnOnce(Scope<BufferKeys<B>, (), ()>, &mut Builder) -> Settings,
) where
B: Bufferable,
B::BufferType: 'static + Send + Sync,
BufferKeys<B>: 'static + Send + Sync,
Settings: Into<ScopeSettings>,
{
self.on_cleanup_if(
CleanupWorkflowConditions::always_if(true, false),
from_buffers,
build,
)
}
/// Register a cleanup workflow with explicit run conditions. The cleanup
/// workflow receives keys to `from_buffers` as its input and exits into
/// `finish_scope_cancel`.
pub fn on_cleanup_if<B, Settings>(
&mut self,
conditions: CleanupWorkflowConditions,
from_buffers: B,
build: impl FnOnce(Scope<BufferKeys<B>, (), ()>, &mut Builder) -> Settings,
) where
B: Bufferable,
B::BufferType: 'static + Send + Sync,
BufferKeys<B>: 'static + Send + Sync,
Settings: Into<ScopeSettings>,
{
let cancelling_scope_id = self.commands.spawn(()).id();
// The cleanup workflow is itself a scope whose exit feeds into the
// parent scope's finish-cancel endpoint.
let _ = self.create_scope_impl::<BufferKeys<B>, (), (), Settings>(
cancelling_scope_id,
self.finish_scope_cancel,
build,
);
// The trigger entity is parented to this scope rather than registered
// as an operation of it, hence `None` for the AddOperation scope below.
let begin_cancel = self.commands.spawn(()).set_parent(self.scope).id();
let buffers = from_buffers.into_buffer(self);
buffers.verify_scope(self.scope);
self.commands.add(AddOperation::new(
None,
begin_cancel,
BeginCleanupWorkflow::<B::BufferType>::new(
self.scope,
buffers,
cancelling_scope_id,
conditions.run_on_terminate,
conditions.run_on_cancel,
),
));
}
/// Create a node that trims (cancels) the given branches of the workflow
/// whenever a message passes through it, then forwards the message.
pub fn create_trim<T>(&mut self, branches: impl IntoIterator<Item = TrimBranch>) -> Node<T, T>
where
T: 'static + Send + Sync,
{
let branches: SmallVec<[_; 16]> = branches.into_iter().collect();
// Every branch must reference points inside this scope.
for branch in &branches {
branch.verify_scope(self.scope);
}
let source = self.commands.spawn(()).id();
let target = self.commands.spawn(UnusedTarget).id();
self.commands.add(AddOperation::new(
Some(self.scope),
source,
Trim::<T>::new(branches, target),
));
Node {
input: InputSlot::new(self.scope, source),
output: Output::new(self.scope, target),
streams: (),
}
}
/// Create a node that applies the gate action carried by each incoming
/// [`GateRequest`] to the given buffers, then forwards the inner value.
pub fn create_gate<T, B>(&mut self, buffers: B) -> Node<GateRequest<T>, T>
where
B: Bufferable,
B::BufferType: 'static + Send + Sync,
T: 'static + Send + Sync,
{
let buffers = buffers.into_buffer(self);
buffers.verify_scope(self.scope);
let source = self.commands.spawn(()).id();
let target = self.commands.spawn(UnusedTarget).id();
self.commands.add(AddOperation::new(
Some(self.scope),
source,
OperateDynamicGate::<T, _>::new(buffers, target),
));
Node {
input: InputSlot::new(self.scope, source),
output: Output::new(self.scope, target),
streams: (),
}
}
/// Create a node that applies a fixed gate `action` to the given buffers
/// every time a message passes through, then forwards the message.
pub fn create_gate_action<T, B>(&mut self, action: Gate, buffers: B) -> Node<T, T>
where
B: Bufferable,
B::BufferType: 'static + Send + Sync,
T: 'static + Send + Sync,
{
let buffers = buffers.into_buffer(self);
buffers.verify_scope(self.scope);
let source = self.commands.spawn(()).id();
let target = self.commands.spawn(UnusedTarget).id();
self.commands.add(AddOperation::new(
Some(self.scope),
source,
OperateStaticGate::<T, _>::new(buffers, target, action),
));
Node {
input: InputSlot::new(self.scope, source),
output: Output::new(self.scope, target),
streams: (),
}
}
/// Shorthand for [`Self::create_gate_action`] with [`Gate::Open`].
// NOTE(review): generic parameters here are ordered <B, T>, the reverse
// of create_gate_close's <T, B>; left as-is because reordering would
// break callers that use turbofish syntax.
pub fn create_gate_open<B, T>(&mut self, buffers: B) -> Node<T, T>
where
B: Bufferable,
B::BufferType: 'static + Send + Sync,
T: 'static + Send + Sync,
{
self.create_gate_action(Gate::Open, buffers)
}
/// Shorthand for [`Self::create_gate_action`] with [`Gate::Closed`].
pub fn create_gate_close<T, B>(&mut self, buffers: B) -> Node<T, T>
where
B: Bufferable,
B::BufferType: 'static + Send + Sync,
T: 'static + Send + Sync,
{
self.create_gate_action(Gate::Closed, buffers)
}
/// Get the entity of the scope that this builder is building inside.
pub fn scope(&self) -> Entity {
self.scope
}
/// Borrow the underlying [`Commands`] for direct entity manipulation.
pub fn commands(&mut self) -> &mut Commands<'w, 's> {
self.commands
}
/// Shared implementation for building a scope: registers the scope
/// operation, spawns its stream endpoints, runs the user's `build`
/// closure with an inner builder, and stores the resulting settings.
pub(crate) fn create_scope_impl<Request, Response, Streams, Settings>(
&mut self,
scope_id: Entity,
exit_scope: Entity,
build: impl FnOnce(Scope<Request, Response, Streams>, &mut Builder) -> Settings,
) -> Node<Request, Response, Streams>
where
Request: 'static + Send + Sync,
Response: 'static + Send + Sync,
Streams: StreamPack,
Settings: Into<ScopeSettings>,
{
let ScopeEndpoints {
terminal,
enter_scope,
finish_scope_cancel,
} = OperateScope::<Request, Response, Streams>::add(
Some(self.scope()),
scope_id,
Some(exit_scope),
self.commands,
);
let (stream_in, stream_out) =
Streams::spawn_scope_streams(scope_id, self.scope, self.commands);
// The inner builder operates within the new scope and routes
// cleanup workflows to the scope's finish-cancel endpoint.
let mut builder = Builder {
scope: scope_id,
finish_scope_cancel,
commands: self.commands,
};
let scope = Scope {
input: Output::new(scope_id, enter_scope),
terminate: InputSlot::new(scope_id, terminal),
streams: stream_in,
};
let settings = build(scope, &mut builder).into();
self.commands
.entity(scope_id)
.insert(ScopeSettingsStorage(settings));
// From the parent's perspective the scope entity itself is the
// node's input, and exit_scope is its output.
Node {
input: InputSlot::new(self.scope, scope_id),
output: Output::new(self.scope, exit_scope),
streams: stream_out,
}
}
/// Shared implementation for injection nodes, given a pre-spawned
/// `source` entity.
pub(crate) fn create_injection_impl<Request, Response, Streams>(
&mut self,
source: Entity,
) -> Node<(Request, Service<Request, Response, Streams>), Response, Streams>
where
Request: 'static + Send + Sync,
Response: 'static + Send + Sync,
Streams: StreamPack,
{
let target = self.commands.spawn(UnusedTarget).id();
let mut map = StreamTargetMap::default();
let (bundle, streams) = Streams::spawn_node_streams(source, &mut map, self);
self.commands.entity(source).insert((bundle, map));
self.commands.add(AddOperation::new(
Some(self.scope),
source,
Injection::<Request, Response, Streams>::new(target),
));
Node {
input: InputSlot::new(self.scope, source),
output: Output::new(self.scope, target),
streams,
}
}
}
/// Conditions controlling when a cleanup workflow registered through
/// [`Builder::on_cleanup_if`] should run: on normal termination of the
/// scope, on cancellation, or both.
// Added `Debug` so this public type can be inspected in logs/assertions;
// deriving it is backward-compatible.
#[derive(Clone, Debug)]
pub struct CleanupWorkflowConditions {
    // True if the cleanup workflow should run when the scope terminates normally.
    run_on_terminate: bool,
    // True if the cleanup workflow should run when the scope is cancelled.
    run_on_cancel: bool,
}

impl CleanupWorkflowConditions {
    /// Construct conditions from two unconditional flags.
    ///
    /// * `run_on_terminate` - run the cleanup workflow when the scope terminates.
    /// * `run_on_cancel` - run the cleanup workflow when the scope is cancelled.
    pub fn always_if(run_on_terminate: bool, run_on_cancel: bool) -> Self {
        CleanupWorkflowConditions {
            run_on_terminate,
            run_on_cancel,
        }
    }
}
#[cfg(test)]
mod tests {
use crate::{prelude::*, testing::*, CancellationCause};
use smallvec::SmallVec;
// Workflows whose terminate endpoint can never be reached should get
// cancelled with CancellationCause::Unreachable instead of hanging.
#[test]
fn test_disconnected_workflow() {
let mut context = TestingContext::minimal_plugins();
// Empty workflow: nothing is connected at all.
let workflow = context.spawn_io_workflow(|_, _| {
});
// Repeated requests confirm the cancellation behavior is stable.
check_unreachable(workflow, 1, &mut context);
check_unreachable(workflow, 1, &mut context);
check_unreachable(workflow, 1, &mut context);
// A node connected back to itself in a loop, never reaching terminate.
let workflow = context.spawn_io_workflow(|scope, builder| {
let node = builder.create_map_block(|v| v);
builder.connect(scope.input, node.input);
builder.connect(node.output, node.input);
});
check_unreachable(workflow, 1, &mut context);
check_unreachable(workflow, 1, &mut context);
check_unreachable(workflow, 1, &mut context);
// Forked branches all end unused; the node feeding terminate has no input.
let workflow = context.spawn_io_workflow(|scope, builder| {
scope.input.chain(builder).map_block(|v| v).fork_clone((
|chain: Chain<()>| chain.map_block(|v| v).map_block(|v| v).unused(),
|chain: Chain<()>| chain.map_block(|v| v).map_block(|v| v).unused(),
|chain: Chain<()>| chain.map_block(|v| v).map_block(|v| v).unused(),
));
let exit_node = builder.create_map_block(|v| v);
builder.connect(exit_node.output, scope.terminate);
});
check_unreachable(workflow, 1, &mut context);
check_unreachable(workflow, 1, &mut context);
check_unreachable(workflow, 1, &mut context);
// Input flows into one buffer, but terminate listens to a different,
// never-filled buffer.
let workflow = context.spawn_io_workflow(|scope, builder| {
let entry_buffer = builder.create_buffer::<()>(BufferSettings::keep_all());
scope.input.chain(builder).map_block(|v| v).fork_clone((
|chain: Chain<()>| chain.map_block(|v| v).connect(entry_buffer.input_slot()),
|chain: Chain<()>| chain.map_block(|v| v).connect(entry_buffer.input_slot()),
|chain: Chain<()>| chain.map_block(|v| v).connect(entry_buffer.input_slot()),
));
let exit_buffer = builder.create_buffer::<()>(BufferSettings::keep_all());
builder
.listen(exit_buffer)
.map_block(|_| ())
.connect(scope.terminate);
});
check_unreachable(workflow, 1, &mut context);
check_unreachable(workflow, 1, &mut context);
check_unreachable(workflow, 1, &mut context);
}
// Helper: request the workflow and assert it gets cancelled with an
// Unreachable cause within the given number of flush cycles.
fn check_unreachable(
workflow: Service<(), ()>,
flush_cycles: usize,
context: &mut TestingContext,
) {
let mut promise =
context.command(|commands| commands.request((), workflow).take_response());
context.run_with_conditions(&mut promise, flush_cycles);
assert!(promise
.take()
.cancellation()
.is_some_and(|c| matches!(*c.cause, CancellationCause::Unreachable(_))));
assert!(context.no_unhandled_errors());
}
// Exercises fork_clone in several styles: explicit clone_output calls,
// the tuple-of-closures chain API, racing async branches, and joining
// cloned branches back together.
#[test]
fn test_fork_clone() {
let mut context = TestingContext::minimal_plugins();
// Two cloned branches both feed terminate; the first arrival wins.
let workflow = context.spawn_io_workflow(|scope, builder| {
let fork = scope.input.fork_clone(builder);
let branch_a = fork.clone_output(builder);
let branch_b = fork.clone_output(builder);
builder.connect(branch_a, scope.terminate);
builder.connect(branch_b, scope.terminate);
});
let mut promise =
context.command(|commands| commands.request(5.0, workflow).take_response());
context.run_with_conditions(&mut promise, Duration::from_secs(1));
assert!(promise.take().available().is_some_and(|v| v == 5.0));
assert!(context.no_unhandled_errors());
// Same behavior via the closure-based fork_clone chain API.
let workflow = context.spawn_io_workflow(|scope, builder| {
scope.input.chain(builder).fork_clone((
|chain: Chain<f64>| chain.connect(scope.terminate),
|chain: Chain<f64>| chain.connect(scope.terminate),
));
});
let mut promise =
context.command(|commands| commands.request(3.0, workflow).take_response());
context.run_with_conditions(&mut promise, Duration::from_secs(1));
assert!(promise.take().available().is_some_and(|v| v == 3.0));
assert!(context.no_unhandled_errors());
// Two async branches race: one waits 10*t seconds, the other t/100.
// The faster branch's value should reach terminate first.
let workflow = context.spawn_io_workflow(|scope, builder| {
scope.input.chain(builder).fork_clone((
|chain: Chain<f64>| {
chain
.map_block(|t| WaitRequest {
duration: Duration::from_secs_f64(10.0 * t),
value: 10.0 * t,
})
.map(|r: AsyncMap<WaitRequest<f64>>| wait(r.request))
.connect(scope.terminate)
},
|chain: Chain<f64>| {
chain
.map_block(|t| WaitRequest {
duration: Duration::from_secs_f64(t / 100.0),
value: t / 100.0,
})
.map(|r: AsyncMap<WaitRequest<f64>>| wait(r.request))
.connect(scope.terminate)
},
));
});
let mut promise =
context.command(|commands| commands.request(1.0, workflow).take_response());
context.run_with_conditions(&mut promise, Duration::from_secs_f64(0.5));
assert!(promise.take().available().is_some_and(|v| v == 0.01));
assert!(context.no_unhandled_errors());
// Clone two branches and join them back into a tuple.
let workflow = context.spawn_io_workflow(|scope, builder| {
let (fork_input, fork_output) = builder.create_fork_clone();
builder.connect(scope.input, fork_input);
let a = fork_output.clone_output(builder);
let b = fork_output.clone_output(builder);
builder.join((a, b)).connect(scope.terminate);
});
let mut promise = context.command(|commands| commands.request(5, workflow).take_response());
context.run_with_conditions(&mut promise, 1);
assert!(promise.take().available().is_some_and(|v| v == (5, 5)));
assert!(context.no_unhandled_errors());
}
// A workflow whose terminate is only reachable through a stream that the
// node never sends should be detected as unreachable and cancelled.
#[test]
fn test_stream_reachability() {
let mut context = TestingContext::minimal_plugins();
// Blocking map that declares a StreamOf<u32> but never emits on it.
let workflow = context.spawn_io_workflow(|scope, builder| {
let stream_node = builder.create_map(|_: BlockingMap<(), StreamOf<u32>>| {
});
builder.connect(scope.input, stream_node.input);
stream_node
.streams
.chain(builder)
.inner()
.map_block(|value| 2 * value)
.connect(scope.terminate);
});
let mut promise =
context.command(|commands| commands.request((), workflow).take_response());
context.run_with_conditions(&mut promise, Duration::from_secs(2));
assert!(promise.peek().is_cancelled());
assert!(context.no_unhandled_errors());
// Same scenario with an async map that never emits on its stream.
let workflow = context.spawn_io_workflow(|scope, builder| {
let stream_node = builder.create_map(|_: AsyncMap<(), StreamOf<u32>>| {
async { }
});
builder.connect(scope.input, stream_node.input);
stream_node
.streams
.chain(builder)
.inner()
.map_block(|value| 2 * value)
.connect(scope.terminate);
});
let mut promise =
context.command(|commands| commands.request((), workflow).take_response());
context.run_with_conditions(&mut promise, Duration::from_secs(2));
assert!(promise.peek().is_cancelled());
assert!(context.no_unhandled_errors());
}
use tokio::sync::mpsc::unbounded_channel;
// Verifies on_cancel / on_terminate / on_cleanup run under the right
// conditions, observed through tokio unbounded channels.
#[test]
fn test_on_cleanup() {
let mut context = TestingContext::minimal_plugins();
let (sender, mut receiver) = unbounded_channel();
// The main workflow always cancels (produce_none -> cancel_on_none),
// so the on_cancel workflow should drain the buffer into the channel.
let workflow = context.spawn_io_workflow(|scope, builder| {
let input = scope.input.fork_clone(builder);
let buffer = builder.create_buffer(BufferSettings::default());
let input_to_buffer = input.clone_output(builder);
builder.connect(input_to_buffer, buffer.input_slot());
let none_node = builder.create_map_block(produce_none);
let input_to_node = input.clone_output(builder);
builder.connect(input_to_node, none_node.input);
none_node
.output
.chain(builder)
.cancel_on_none()
.connect(scope.terminate);
builder.on_cancel(buffer, |scope, builder| {
scope
.input
.chain(builder)
.consume_buffer::<8>()
.map_block(move |values| {
for value in values {
sender.send(value).unwrap();
}
})
.connect(scope.terminate);
});
});
let mut promise = context.command(|commands| commands.request(5, workflow).take_response());
context.run_with_conditions(&mut promise, Duration::from_secs(2));
assert!(promise.peek().is_cancelled());
// The buffered input value should have been forwarded exactly once.
let channel_output = receiver.try_recv().unwrap();
assert_eq!(channel_output, 5);
assert!(receiver.try_recv().is_err());
assert!(context.no_unhandled_errors());
assert!(context.confirm_buffers_empty().is_ok());
// Second workflow: three buffers, each observed by a different cleanup
// flavor; a filter decides whether the run terminates or cancels.
let (cancel_sender, mut cancel_receiver) = unbounded_channel();
let (terminate_sender, mut terminate_receiver) = unbounded_channel();
let (cleanup_sender, mut cleanup_receiver) = unbounded_channel();
let workflow = context.spawn_io_workflow(|scope, builder| {
let input = scope.input.fork_clone(builder);
let cancel_buffer = builder.create_buffer(BufferSettings::default());
let input_to_cancel = input.clone_output(builder);
builder.connect(input_to_cancel, cancel_buffer.input_slot());
let terminate_buffer = builder.create_buffer(BufferSettings::default());
let input_to_terminate = input.clone_output(builder);
builder.connect(input_to_terminate, terminate_buffer.input_slot());
let cleanup_buffer = builder.create_buffer(BufferSettings::default());
let input_to_cleanup = input.clone_output(builder);
builder.connect(input_to_cleanup, cleanup_buffer.input_slot());
// Values >= 5 terminate the workflow; smaller values cancel it.
let filter_node =
builder.create_map_block(|value: u64| if value >= 5 { Some(value) } else { None });
let input_to_filter_node = input.clone_output(builder);
builder.connect(input_to_filter_node, filter_node.input);
filter_node
.output
.chain(builder)
.cancel_on_none()
.connect(scope.terminate);
builder.on_cancel(cancel_buffer, |scope, builder| {
scope
.input
.chain(builder)
.consume_buffer::<8>()
.map_block(move |values| {
for value in values {
cancel_sender.send(value).unwrap();
}
})
.connect(scope.terminate);
});
builder.on_terminate(terminate_buffer, |scope, builder| {
scope
.input
.chain(builder)
.consume_buffer::<8>()
.map_block(move |values| {
for value in values {
terminate_sender.send(value).unwrap();
}
})
.connect(scope.terminate);
});
builder.on_cleanup(cleanup_buffer, |scope, builder| {
scope
.input
.chain(builder)
.consume_buffer::<8>()
.map_block(move |values| {
for value in values {
cleanup_sender.send(value).unwrap();
}
})
.connect(scope.terminate);
});
});
// Request 3 (< 5): cancelled, so on_cancel and on_cleanup fire,
// on_terminate does not.
let mut promise = context.command(|commands| commands.request(3, workflow).take_response());
context.run_with_conditions(&mut promise, 10);
assert!(promise.peek().is_cancelled());
assert_eq!(cancel_receiver.try_recv().unwrap(), 3);
assert!(cancel_receiver.try_recv().is_err());
assert_eq!(cleanup_receiver.try_recv().unwrap(), 3);
assert!(cleanup_receiver.try_recv().is_err());
assert!(terminate_receiver.try_recv().is_err());
assert!(context.no_unhandled_errors());
assert!(context.confirm_buffers_empty().is_ok());
// Request 6 (>= 5): terminated, so on_terminate and on_cleanup fire,
// on_cancel does not.
let mut promise = context.command(|commands| commands.request(6, workflow).take_response());
context.run_with_conditions(&mut promise, 10);
assert!(promise.take().available().is_some_and(|v| v == 6));
assert_eq!(terminate_receiver.try_recv().unwrap(), 6);
assert!(terminate_receiver.try_recv().is_err());
assert_eq!(cleanup_receiver.try_recv().unwrap(), 6);
assert!(cleanup_receiver.try_recv().is_err());
assert!(cancel_receiver.try_recv().is_err());
assert!(context.no_unhandled_errors());
assert!(context.confirm_buffers_empty().is_ok());
}
// Exercises chained collect operations: spread -> collect -> spread ->
// collect, including a looping variant that can never settle (cancelled)
// and a variant that isolates the inner collect inside a nested scope.
#[test]
fn test_double_collection() {
let mut context = TestingContext::minimal_plugins();
let delay = context.spawn_delay(Duration::from_secs_f32(0.01));
// Values > 4 are disposed; the rest flow through both collects.
let workflow = context.spawn_io_workflow(|scope, builder| {
let later_collect = builder.create_collect_all::<i32, 8>();
let earlier_collect = builder.create_collect_all::<i32, 8>();
scope
.input
.chain(builder)
.spread()
.then(delay)
.map_block(|v| if v <= 4 { Some(v) } else { None })
.dispose_on_none()
.connect(earlier_collect.input);
earlier_collect
.output
.chain(builder)
.spread()
.connect(later_collect.input);
later_collect.output.chain(builder).connect(scope.terminate);
});
let mut promise =
context.command(|commands| commands.request([1, 2, 3, 4, 5], workflow).take_response());
context.run_with_conditions(&mut promise, Duration::from_secs(5));
assert!(promise
.take()
.available()
.is_some_and(|v| &v[..] == [1, 2, 3, 4]));
assert!(context.no_unhandled_errors());
// Feeding a collect's spread output back into itself creates a loop
// that can never settle, so the run should be cancelled.
let workflow = context.spawn_io_workflow(|scope, builder| {
let earlier_collect = builder.create_collect_all::<i32, 8>();
let later_collect = builder.create_collect_all::<i32, 8>();
scope
.input
.chain(builder)
.spread()
.then(delay)
.connect(earlier_collect.input);
earlier_collect
.output
.chain(builder)
.spread()
.connect(later_collect.input);
later_collect.output.chain(builder).spread().fork_clone((
|chain: Chain<i32>| chain.connect(earlier_collect.input),
|chain: Chain<i32>| chain.connect(scope.terminate),
));
});
let mut promise =
context.command(|commands| commands.request([1, 2, 3, 4, 5], workflow).take_response());
context.run_with_conditions(&mut promise, Duration::from_secs(2));
assert!(promise.take().is_cancelled());
assert!(context.no_unhandled_errors());
// Wrapping the inner collect in a nested scope isolates it, so the
// feedback loop no longer deadlocks the collection.
let workflow = context.spawn_io_workflow(|scope, builder| {
let earlier_collect = builder.create_collect_all::<i32, 8>();
scope
.input
.chain(builder)
.spread()
.then(delay)
.map_block(|v| if v <= 4 { Some(v) } else { None })
.dispose_on_none()
.connect(earlier_collect.input);
let _ = earlier_collect
.output
.chain(builder)
.then_io_scope(|scope, builder| {
scope
.input
.chain(builder)
.spread()
.collect_all::<8>()
.connect(scope.terminate);
})
.fork_clone((
|chain: Chain<_>| chain.spread().connect(earlier_collect.input),
|chain: Chain<_>| chain.connect(scope.terminate),
));
});
check_collections(workflow, [1, 2, 3, 4], [1, 2, 3, 4], &mut context);
check_collections(workflow, [1, 2, 3, 4, 5, 6], [1, 2, 3, 4], &mut context);
check_collections(workflow, [1, 8, 2, 7, 3, 6], [1, 2, 3], &mut context);
check_collections(
workflow,
[8, 7, 6, 5, 4, 3, 2, 1],
[4, 3, 2, 1],
&mut context,
);
check_collections(workflow, [6, 7, 8, 9, 10], [], &mut context);
}
// Helper: run the collection workflow on `input` and assert the response
// equals `expectation`.
fn check_collections(
workflow: Service<SmallVec<[i32; 8]>, SmallVec<[i32; 8]>>,
input: impl IntoIterator<Item = i32>,
expectation: impl IntoIterator<Item = i32>,
context: &mut TestingContext,
) {
let input: SmallVec<[i32; 8]> = SmallVec::from_iter(input);
let expectation: SmallVec<[i32; 8]> = SmallVec::from_iter(expectation);
let mut promise =
context.command(|commands| commands.request(input, workflow).take_response());
context.run_with_conditions(&mut promise, Duration::from_secs(2));
assert!(promise.take().available().is_some_and(|v| v == expectation));
assert!(context.no_unhandled_errors());
}
}