use futures::StreamExt;
use serde::{Deserialize, Serialize};
use crate::{Request, capability::Operation};
use super::super::Command;
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
enum Op {
Basic,
Abort,
}
impl Operation for Op {
type Output = usize;
}
enum Effect {
Op(Request<Op>),
}
impl From<Request<Op>> for Effect {
fn from(value: Request<Op>) -> Self {
Effect::Op(value)
}
}
#[derive(Debug, PartialEq)]
enum Event {
OpDone(usize),
Ping,
}
#[test]
fn spawn_returns_join_handle() {
let mut cmd = Command::new(|ctx| async move {
let task_join = ctx.spawn(|ctx| async move {
ctx.request_from_shell(Op::Basic).await;
});
task_join.await;
ctx.send_event(Event::Ping);
});
assert!(cmd.events().next().is_none());
let effect = cmd.effects().next().unwrap();
let Effect::Op(mut request) = effect;
request.resolve(1).expect("should resolve");
let event = cmd.events().next().unwrap();
assert_eq!(event, Event::Ping);
assert!(cmd.is_done());
}
#[test]
fn all_join_handles_get_notified() {
let mut cmd = Command::new(|ctx| async move {
let task_join = ctx.spawn(|ctx| async move {
ctx.request_from_shell(Op::Basic).await;
});
ctx.spawn({
let task_join = task_join.clone();
|ctx| async move {
task_join.await;
ctx.send_event(Event::OpDone(1));
}
});
ctx.spawn({
let task_join = task_join.clone();
|ctx| async move {
task_join.await;
ctx.send_event(Event::OpDone(2));
}
});
});
assert!(cmd.events().next().is_none());
let effect = cmd.effects().next().unwrap();
let Effect::Op(mut request) = effect;
request.resolve(1).expect("should resolve");
let events: Vec<_> = cmd.events().collect();
assert_eq!(events.len(), 2);
assert_eq!(events[0], Event::OpDone(1));
assert_eq!(events[1], Event::OpDone(2));
assert!(cmd.is_done());
}
#[test]
fn awaiting_multiple_copies_of_handle_works() {
let mut cmd = Command::new(|ctx| async move {
let task_join = ctx.spawn(|ctx| async move {
ctx.request_from_shell(Op::Basic).await;
});
let join_one = task_join.clone();
let join_two = task_join.clone();
let join_three = task_join.clone();
futures::join!(join_one, join_two, join_three);
ctx.send_event(Event::Ping);
});
assert!(cmd.events().next().is_none());
let effect = cmd.effects().next().unwrap();
let Effect::Op(mut request) = effect;
request.resolve(1).expect("should resolve");
let event = cmd.events().next().unwrap();
assert_eq!(event, Event::Ping);
assert!(cmd.is_done());
}
#[test]
fn join_handle_can_abort_a_task() {
let mut cmd = Command::new(|ctx| async move {
let stream_handle = ctx.spawn(|ctx| async move {
let mut stream = ctx.stream_from_shell(Op::Basic);
while stream.next().await.is_some() {
ctx.send_event(Event::Ping);
}
});
ctx.spawn(|ctx| async move {
ctx.request_from_shell(Op::Abort).await;
stream_handle.abort();
});
});
assert!(cmd.events().next().is_none());
let mut effects: Vec<_> = cmd.effects().collect();
let Effect::Op(mut stream_request) = effects.remove(0);
let Effect::Op(mut abort_request) = effects.remove(0);
assert_eq!(abort_request.operation, Op::Abort);
assert_eq!(stream_request.operation, Op::Basic);
for i in 1..10 {
stream_request.resolve(i).expect("to resolve");
let event = cmd.events().next().unwrap();
assert_eq!(event, Event::Ping);
}
assert!(!cmd.is_done());
abort_request.resolve(0).expect("to resolve");
stream_request.resolve(1).expect("to resolve"); assert!(cmd.events().next().is_none());
assert!(cmd.is_done());
}
#[test]
fn tasks_can_be_aborted_immediately() {
let mut cmd: Command<Effect, Event> = Command::new(|ctx| async move {
let handle = ctx.spawn(|ctx| async move {
ctx.request_from_shell(Op::Abort).await;
});
handle.abort();
});
assert!(cmd.effects().next().is_none());
assert!(cmd.events().next().is_none());
assert!(cmd.is_done());
}
#[test]
fn aborted_tasks_notify_their_join_handles() {
let mut cmd = Command::new(|ctx| async move {
let stream_handle = ctx.spawn(|ctx| async move {
let mut stream = ctx.stream_from_shell(Op::Basic);
while stream.next().await.is_some() {
ctx.send_event(Event::Ping);
}
});
ctx.spawn({
let stream_handle = stream_handle.clone();
|ctx| async move {
ctx.request_from_shell(Op::Abort).await;
stream_handle.abort();
}
});
ctx.spawn(|ctx| async move {
stream_handle.await;
ctx.send_event(Event::OpDone(3));
});
});
assert!(cmd.events().next().is_none());
let mut effects: Vec<_> = cmd.effects().collect();
let Effect::Op(mut stream_request) = effects.remove(0);
let Effect::Op(mut abort_request) = effects.remove(0);
assert_eq!(abort_request.operation, Op::Abort);
assert_eq!(stream_request.operation, Op::Basic);
for i in 1..10 {
stream_request.resolve(i).expect("to resolve");
let event = cmd.events().next().unwrap();
assert_eq!(event, Event::Ping);
}
assert!(!cmd.is_done());
abort_request.resolve(0).expect("to resolve");
stream_request.resolve(1).expect("to resolve");
assert_eq!(Event::OpDone(3), cmd.events().next().unwrap());
assert!(cmd.is_done());
}
#[test]
fn commands_can_be_aborted() {
let mut cmd: Command<Effect, Event> = Command::all([
Command::request_from_shell(Op::Basic).then_send(Event::OpDone),
Command::request_from_shell(Op::Basic).then_send(Event::OpDone),
]);
let handle = cmd.abort_handle();
assert!(!cmd.was_aborted());
let mut effects: Vec<_> = cmd.effects().collect();
assert_eq!(effects.len(), 2);
handle.abort();
assert!(cmd.is_done());
assert!(cmd.was_aborted());
let Effect::Op(mut first_request) = effects.remove(0);
let Effect::Op(mut second_request) = effects.remove(0);
first_request.resolve(1).expect("to resolve");
second_request.resolve(2).expect("to resolve");
assert!(cmd.events().next().is_none());
assert!(cmd.effects().next().is_none());
}
#[test]
fn dropping_request_cancels_its_future() {
let mut cmd: Command<Effect, Event> = Command::new(|ctx| async move {
ctx.request_from_shell(Op::Basic).await;
ctx.send_event(Event::Ping);
});
assert!(cmd.events().next().is_none());
let Effect::Op(request) = cmd.effects().next().unwrap();
drop(request);
assert!(cmd.is_done());
}